Skip to main content

数据流架构

概述

TradingAgents 的数据流架构设计用于高效地获取、处理和分发金融数据。系统支持多种数据源,实现了统一的数据接口,并提供了强大的缓存和处理机制。

数据流架构图

graph TB
subgraph "外部数据源"
FINNHUB[FinnHub API<br/>实时金融数据]
YAHOO[Yahoo Finance<br/>历史价格数据]
REDDIT[Reddit API<br/>社交媒体数据]
GNEWS[Google News<br/>新闻数据]
CUSTOM[自定义数据源<br/>扩展接口]
end

subgraph "数据获取层"
FUTILS[FinnHub Utils]
YUTILS[YFinance Utils]
RUTILS[Reddit Utils]
NUTILS[News Utils]
SUTILS[StockStats Utils]
end

subgraph "数据处理层"
INTERFACE[Data Interface]
PROCESSOR[Data Processor]
VALIDATOR[Data Validator]
TRANSFORMER[Data Transformer]
end

subgraph "缓存层"
CACHE[Data Cache]
REDIS[Redis Cache]
LOCAL[Local Cache]
MEMORY[Memory Cache]
end

subgraph "数据分发层"
DISPATCHER[Data Dispatcher]
ROUTER[Data Router]
FORMATTER[Data Formatter]
end

subgraph "智能体消费层"
ANALYSTS[分析师团队]
RESEARCHERS[研究员团队]
TRADER[交易员]
RISK[风险管理]
end

FINNHUB --> FUTILS
YAHOO --> YUTILS
REDDIT --> RUTILS
GNEWS --> NUTILS
CUSTOM --> SUTILS

FUTILS --> INTERFACE
YUTILS --> INTERFACE
RUTILS --> INTERFACE
NUTILS --> INTERFACE
SUTILS --> INTERFACE

INTERFACE --> PROCESSOR
PROCESSOR --> VALIDATOR
VALIDATOR --> TRANSFORMER

TRANSFORMER --> CACHE
CACHE --> REDIS
CACHE --> LOCAL
CACHE --> MEMORY

CACHE --> DISPATCHER
DISPATCHER --> ROUTER
ROUTER --> FORMATTER

FORMATTER --> ANALYSTS
FORMATTER --> RESEARCHERS
FORMATTER --> TRADER
FORMATTER --> RISK

数据源详解

1. FinnHub API

class FinnHubUtils:
"""FinnHub 数据获取工具"""

支持的数据类型:
- 实时股价数据
- 公司基本信息
- 财务报表数据
- 新闻和公告
- 技术指标
- 市场情绪指标

API限制:
- 免费版: 60 calls/minute
- 付费版: 更高频率限制

数据格式:
{
"symbol": "AAPL",
"price": 150.25,
"change": 2.15,
"changePercent": 1.45,
"timestamp": 1640995200
}

2. Yahoo Finance

class YFinanceUtils:
"""Yahoo Finance 数据获取工具"""

支持的数据类型:
- 历史价格数据
- 股票分割信息
- 股息数据
- 期权数据
- 财务指标

优势:
- 免费使用
- 数据覆盖面广
- 历史数据丰富

数据格式:
{
"Date": "2024-01-01",
"Open": 148.50,
"High": 152.30,
"Low": 147.80,
"Close": 150.25,
"Volume": 45678900
}

3. Reddit API

class RedditUtils:
"""Reddit 社交媒体数据获取工具"""

支持的数据类型:
- 热门帖子
- 评论情感
- 用户讨论热度
- 关键词提及频率

分析维度:
- 情感极性 (正面/负面/中性)
- 讨论热度
- 用户参与度
- 话题趋势

数据格式:
{
"post_id": "abc123",
"title": "AAPL earnings discussion",
"score": 1250,
"comments": 89,
"sentiment": 0.65,
"timestamp": 1640995200
}

4. Google News

class GoogleNewsUtils:
"""Google News 新闻数据获取工具"""

支持的数据类型:
- 相关新闻文章
- 新闻情感分析
- 事件时间线
- 影响力评估

处理流程:
1. 关键词搜索
2. 新闻筛选
3. 内容提取
4. 情感分析
5. 影响力评估

数据格式:
{
"title": "Apple reports strong Q4 earnings",
"source": "Reuters",
"published": "2024-01-01T10:00:00Z",
"sentiment": 0.8,
"relevance": 0.95,
"impact_score": 0.7
}

数据处理流程

1. 数据获取阶段

class DataAcquisition:
"""数据获取协调器"""

def fetch_data(self, symbol: str, date: str) -> Dict:
"""获取指定股票和日期的所有数据"""

# 并行获取各类数据
tasks = [
self.fetch_price_data(symbol, date),
self.fetch_fundamental_data(symbol),
self.fetch_news_data(symbol, date),
self.fetch_social_data(symbol, date),
self.fetch_technical_data(symbol, date)
]

# 等待所有任务完成
results = await asyncio.gather(*tasks)

# 整合数据
return self.integrate_data(results)

2. 数据验证阶段

class DataValidator:
"""数据验证器"""

验证规则:
- 数据完整性检查
- 数据类型验证
- 数值范围检查
- 时间戳验证
- 异常值检测

def validate(self, data: Dict) -> Tuple[bool, List[str]]:
"""验证数据质量"""
errors = []

# 检查必需字段
if not self.check_required_fields(data):
errors.append("Missing required fields")

# 检查数据类型
if not self.check_data_types(data):
errors.append("Invalid data types")

# 检查数值范围
if not self.check_value_ranges(data):
errors.append("Values out of range")

return len(errors) == 0, errors

3. 数据转换阶段

class DataTransformer:
"""数据转换器"""

转换功能:
- 数据标准化
- 单位统一
- 格式转换
- 特征工程
- 数据聚合

def transform(self, raw_data: Dict) -> Dict:
"""转换原始数据为标准格式"""

transformed = {}

# 价格数据标准化
transformed['price_data'] = self.normalize_prices(
raw_data['price_data']
)

# 财务数据转换
transformed['financial_data'] = self.convert_financials(
raw_data['financial_data']
)

# 情感数据聚合
transformed['sentiment_data'] = self.aggregate_sentiment(
raw_data['news_data'],
raw_data['social_data']
)

return transformed

缓存策略

1. 多层缓存架构

class CacheManager:
"""缓存管理器"""

缓存层次:
1. 内存缓存 (最快访问)
2. 本地文件缓存 (持久化)
3. Redis缓存 (分布式)
4. 数据库缓存 (长期存储)

def get_data(self, key: str) -> Optional[Dict]:
"""按优先级获取缓存数据"""

# 1. 检查内存缓存
if data := self.memory_cache.get(key):
return data

# 2. 检查本地缓存
if data := self.local_cache.get(key):
self.memory_cache.set(key, data)
return data

# 3. 检查Redis缓存
if data := self.redis_cache.get(key):
self.local_cache.set(key, data)
self.memory_cache.set(key, data)
return data

return None

2. 缓存策略

缓存配置:
{
"price_data": {
"ttl": 300, # 5分钟过期
"refresh": "auto" # 自动刷新
},
"fundamental_data": {
"ttl": 86400, # 24小时过期
"refresh": "manual" # 手动刷新
},
"news_data": {
"ttl": 3600, # 1小时过期
"refresh": "auto" # 自动刷新
},
"social_data": {
"ttl": 1800, # 30分钟过期
"refresh": "auto" # 自动刷新
}
}

数据分发机制

1. 数据路由

class DataRouter:
"""数据路由器"""

路由规则:
- 基本面数据 → 基本面分析师
- 技术数据 → 技术分析师
- 新闻数据 → 新闻分析师
- 社交数据 → 社交媒体分析师
- 综合数据 → 所有智能体

def route_data(self, data: Dict, agents: List[str]) -> Dict:
"""根据智能体类型分发相应数据"""

routed_data = {}

for agent in agents:
if agent == "fundamentals_analyst":
routed_data[agent] = {
"financial_data": data["financial_data"],
"company_info": data["company_info"],
"industry_data": data["industry_data"]
}
elif agent == "technical_analyst":
routed_data[agent] = {
"price_data": data["price_data"],
"volume_data": data["volume_data"],
"technical_indicators": data["technical_indicators"]
}
# ... 其他智能体的路由规则

return routed_data

2. 数据格式化

class DataFormatter:
"""数据格式化器"""

def format_for_agent(self, data: Dict, agent_type: str) -> Dict:
"""为特定智能体格式化数据"""

if agent_type == "fundamentals_analyst":
return self.format_fundamental_data(data)
elif agent_type == "technical_analyst":
return self.format_technical_data(data)
elif agent_type == "news_analyst":
return self.format_news_data(data)
elif agent_type == "social_analyst":
return self.format_social_data(data)

return data

性能优化

1. 并行处理

  • 多线程数据获取
  • 异步API调用
  • 并行数据处理

2. 智能缓存

  • 预测性缓存
  • 热数据预加载
  • 缓存命中率优化

3. 数据压缩

  • 数据压缩存储
  • 增量数据传输
  • 数据去重

4. 错误处理

  • 数据源故障转移
  • 重试机制
  • 降级策略

这种数据流架构确保了系统能够高效、可靠地处理大量金融数据,为智能体提供高质量的数据支持。