跳到主要内容

交易员智能体

概述

交易员智能体是 TradingAgents 框架的核心决策组件,负责综合分析师报告和研究员辩论结果,制定最终的交易决策。交易员智能体具备专业的交易知识和风险意识,能够在复杂的市场环境中做出明智的投资决策。

交易员架构

基础交易员类

class Trader:
"""交易员智能体 - 负责最终交易决策"""

def __init__(self, llm, config):
self.llm = llm
self.config = config
self.trading_style = config.get("trading_style", "balanced")
self.risk_tolerance = config.get("risk_tolerance", "medium")
self.memory = TradingMemory()
self.position_manager = PositionManager()

def make_decision(self, analysis_data: Dict) -> Dict:
"""制定交易决策"""

# 1. 综合分析所有输入
comprehensive_analysis = self.synthesize_analysis(analysis_data)

# 2. 评估市场条件
market_assessment = self.assess_market_conditions(analysis_data)

# 3. 制定交易策略
trading_strategy = self.develop_trading_strategy(
comprehensive_analysis, market_assessment
)

# 4. 确定仓位大小
position_size = self.calculate_position_size(trading_strategy)

# 5. 设置风险管理参数
risk_parameters = self.set_risk_parameters(trading_strategy)

# 6. 生成最终决策
final_decision = self.generate_final_decision(
trading_strategy, position_size, risk_parameters
)

# 7. 更新交易记忆
self.memory.update_decision(final_decision)

return final_decision

核心功能模块

1. 分析综合模块

def synthesize_analysis(self, analysis_data: Dict) -> Dict:
"""综合分析所有输入数据"""

# 提取各类分析结果
analyst_reports = analysis_data.get("analyst_reports", {})
research_consensus = analysis_data.get("research_consensus", {})
market_data = analysis_data.get("market_data", {})

# 分析师报告权重
analyst_weights = self.config.get("analyst_weights", {
"fundamentals": 0.3,
"technical": 0.3,
"news": 0.2,
"social": 0.2
})

# 计算加权分析评分
weighted_scores = {}
total_score = 0

for analyst_type, weight in analyst_weights.items():
if analyst_type in analyst_reports:
score = analyst_reports[analyst_type].get("overall_score", 0.5)
weighted_scores[analyst_type] = score * weight
total_score += weighted_scores[analyst_type]

# 研究员共识影响
consensus_impact = self._assess_consensus_impact(research_consensus)

# 综合评估
synthesis = {
"analyst_scores": weighted_scores,
"total_analyst_score": total_score,
"consensus_impact": consensus_impact,
"adjusted_score": self._adjust_score_with_consensus(total_score, consensus_impact),
"confidence_level": self._calculate_overall_confidence(analyst_reports, research_consensus),
"key_factors": self._extract_key_factors(analyst_reports, research_consensus)
}

return synthesis

def _assess_consensus_impact(self, research_consensus: Dict) -> Dict:
"""评估研究员共识的影响"""

consensus_strength = research_consensus.get("consensus_level", 0.5)
recommendation = research_consensus.get("recommendation", "neutral")

# 共识强度影响权重
if consensus_strength > 0.8:
impact_weight = 0.3 # 高共识,高影响
elif consensus_strength > 0.6:
impact_weight = 0.2 # 中等共识,中等影响
else:
impact_weight = 0.1 # 低共识,低影响

# 推荐方向影响
direction_impact = {
"谨慎乐观": 0.15,
"谨慎悲观": -0.15,
"中性观望": 0.0
}.get(recommendation, 0.0)

return {
"strength": consensus_strength,
"weight": impact_weight,
"direction": direction_impact,
"net_impact": direction_impact * impact_weight
}

2. 市场条件评估

def assess_market_conditions(self, analysis_data: Dict) -> Dict:
"""评估当前市场条件"""

market_indicators = {
"volatility": self._assess_volatility(analysis_data),
"liquidity": self._assess_liquidity(analysis_data),
"sentiment": self._assess_market_sentiment(analysis_data),
"trend": self._assess_market_trend(analysis_data),
"correlation": self._assess_market_correlation(analysis_data)
}

# 市场环境分类
market_regime = self._classify_market_regime(market_indicators)

# 交易适宜性评估
trading_suitability = self._assess_trading_suitability(market_indicators)

return {
"indicators": market_indicators,
"regime": market_regime,
"suitability": trading_suitability,
"risk_level": self._calculate_market_risk_level(market_indicators)
}

def _classify_market_regime(self, indicators: Dict) -> str:
"""分类市场环境"""

volatility = indicators["volatility"]["level"]
trend = indicators["trend"]["direction"]
sentiment = indicators["sentiment"]["score"]

if volatility == "low" and trend == "uptrend" and sentiment > 0.6:
return "bull_market"
elif volatility == "high" and trend == "downtrend" and sentiment < 0.4:
return "bear_market"
elif volatility == "high":
return "volatile_market"
else:
return "sideways_market"

def _assess_trading_suitability(self, indicators: Dict) -> Dict:
"""评估交易适宜性"""

suitability_score = 0.5
factors = []

# 波动性影响
if indicators["volatility"]["level"] == "low":
suitability_score += 0.1
factors.append("低波动性有利于交易")
elif indicators["volatility"]["level"] == "high":
suitability_score -= 0.1
factors.append("高波动性增加交易风险")

# 流动性影响
if indicators["liquidity"]["level"] == "high":
suitability_score += 0.1
factors.append("高流动性便于进出")

# 趋势明确性影响
if indicators["trend"]["strength"] > 0.7:
suitability_score += 0.1
factors.append("趋势明确有利于交易")

return {
"score": max(0.0, min(1.0, suitability_score)),
"factors": factors,
"recommendation": "适宜" if suitability_score > 0.6 else "谨慎" if suitability_score > 0.4 else "不适宜"
}

3. 交易策略制定

def develop_trading_strategy(self, analysis: Dict, market_conditions: Dict) -> Dict:
"""制定交易策略"""

# 基于分析结果确定基本方向
base_signal = self._determine_base_signal(analysis)

# 根据市场条件调整策略
adjusted_strategy = self._adjust_for_market_conditions(base_signal, market_conditions)

# 选择交易类型
trade_type = self._select_trade_type(adjusted_strategy, market_conditions)

# 设定时间框架
time_horizon = self._determine_time_horizon(adjusted_strategy, market_conditions)

# 制定进出场策略
entry_exit_strategy = self._develop_entry_exit_strategy(adjusted_strategy)

return {
"base_signal": base_signal,
"adjusted_signal": adjusted_strategy,
"trade_type": trade_type,
"time_horizon": time_horizon,
"entry_strategy": entry_exit_strategy["entry"],
"exit_strategy": entry_exit_strategy["exit"],
"confidence": self._calculate_strategy_confidence(analysis, market_conditions)
}

def _determine_base_signal(self, analysis: Dict) -> Dict:
"""确定基本交易信号"""

adjusted_score = analysis.get("adjusted_score", 0.5)
confidence = analysis.get("confidence_level", 0.5)

# 信号强度阈值
strong_buy_threshold = 0.7
buy_threshold = 0.6
sell_threshold = 0.4
strong_sell_threshold = 0.3

if adjusted_score >= strong_buy_threshold and confidence > 0.7:
signal = "strong_buy"
elif adjusted_score >= buy_threshold:
signal = "buy"
elif adjusted_score <= strong_sell_threshold and confidence > 0.7:
signal = "strong_sell"
elif adjusted_score <= sell_threshold:
signal = "sell"
else:
signal = "hold"

return {
"action": signal,
"strength": abs(adjusted_score - 0.5) * 2, # 0-1 scale
"confidence": confidence,
"score": adjusted_score
}

def _select_trade_type(self, strategy: Dict, market_conditions: Dict) -> str:
"""选择交易类型"""

signal_strength = strategy.get("strength", 0.5)
market_regime = market_conditions.get("regime", "sideways_market")
volatility = market_conditions["indicators"]["volatility"]["level"]

# 根据信号强度和市场条件选择交易类型
if signal_strength > 0.8 and market_regime in ["bull_market", "bear_market"]:
return "momentum_trade" # 动量交易
elif signal_strength > 0.6 and volatility == "low":
return "swing_trade" # 波段交易
elif market_regime == "sideways_market":
return "range_trade" # 区间交易
else:
return "position_trade" # 仓位交易

4. 仓位管理

def calculate_position_size(self, strategy: Dict) -> Dict:
"""计算仓位大小"""

# 基础仓位大小 (基于信号强度)
signal_strength = strategy.get("strength", 0.5)
base_position = signal_strength * self.config.get("max_position_size", 0.1)

# 风险调整
risk_adjustment = self._calculate_risk_adjustment(strategy)

# 市场条件调整
market_adjustment = self._calculate_market_adjustment(strategy)

# 最终仓位大小
final_position = base_position * risk_adjustment * market_adjustment

# 确保在限制范围内
max_position = self.config.get("max_position_size", 0.1)
min_position = self.config.get("min_position_size", 0.01)

final_position = max(min_position, min(max_position, final_position))

return {
"base_size": base_position,
"risk_adjustment": risk_adjustment,
"market_adjustment": market_adjustment,
"final_size": final_position,
"size_rationale": self._explain_position_sizing(
base_position, risk_adjustment, market_adjustment
)
}

def _calculate_risk_adjustment(self, strategy: Dict) -> float:
"""计算风险调整系数"""

confidence = strategy.get("confidence", 0.5)
trade_type = strategy.get("trade_type", "position_trade")

# 基于置信度的调整
confidence_adjustment = 0.5 + (confidence - 0.5)

# 基于交易类型的调整
type_adjustments = {
"momentum_trade": 1.2, # 动量交易可以稍大仓位
"swing_trade": 1.0, # 波段交易标准仓位
"range_trade": 0.8, # 区间交易较小仓位
"position_trade": 0.9 # 仓位交易中等仓位
}

type_adjustment = type_adjustments.get(trade_type, 1.0)

return confidence_adjustment * type_adjustment

5. 风险管理参数

def set_risk_parameters(self, strategy: Dict) -> Dict:
"""设置风险管理参数"""

trade_type = strategy.get("trade_type", "position_trade")
signal_strength = strategy.get("strength", 0.5)
time_horizon = strategy.get("time_horizon", "medium")

# 止损设置
stop_loss = self._calculate_stop_loss(trade_type, signal_strength)

# 止盈设置
take_profit = self._calculate_take_profit(trade_type, signal_strength)

# 风险收益比
risk_reward_ratio = take_profit / stop_loss if stop_loss > 0 else 0

# 最大持有时间
max_holding_period = self._determine_max_holding_period(time_horizon)

return {
"stop_loss": stop_loss,
"take_profit": take_profit,
"risk_reward_ratio": risk_reward_ratio,
"max_holding_period": max_holding_period,
"position_monitoring": self._setup_position_monitoring(strategy),
"exit_conditions": self._define_exit_conditions(strategy)
}

def _calculate_stop_loss(self, trade_type: str, signal_strength: float) -> float:
"""计算止损水平"""

base_stop_loss = {
"momentum_trade": 0.03, # 3% 止损
"swing_trade": 0.05, # 5% 止损
"range_trade": 0.02, # 2% 止损
"position_trade": 0.08 # 8% 止损
}.get(trade_type, 0.05)

# 根据信号强度调整
# 信号越强,可以容忍更大的回撤
strength_adjustment = 1.0 + (signal_strength - 0.5) * 0.5

return base_stop_loss * strength_adjustment

def _calculate_take_profit(self, trade_type: str, signal_strength: float) -> float:
"""计算止盈水平"""

base_take_profit = {
"momentum_trade": 0.08, # 8% 止盈
"swing_trade": 0.12, # 12% 止盈
"range_trade": 0.04, # 4% 止盈
"position_trade": 0.20 # 20% 止盈
}.get(trade_type, 0.10)

# 根据信号强度调整
strength_adjustment = 1.0 + signal_strength * 0.5

return base_take_profit * strength_adjustment

6. 最终决策生成

def generate_final_decision(self, strategy: Dict, position: Dict, risk_params: Dict) -> Dict:
"""生成最终交易决策"""

action = strategy["adjusted_signal"]["action"]

# 构建决策结构
decision = {
"action": action,
"quantity": position["final_size"],
"confidence": strategy["confidence"],
"strategy_type": strategy["trade_type"],
"time_horizon": strategy["time_horizon"],

# 风险管理
"stop_loss": risk_params["stop_loss"],
"take_profit": risk_params["take_profit"],
"risk_reward_ratio": risk_params["risk_reward_ratio"],

# 执行细节
"entry_strategy": strategy["entry_strategy"],
"exit_strategy": strategy["exit_strategy"],
"max_holding_period": risk_params["max_holding_period"],

# 决策依据
"reasoning": self._generate_decision_reasoning(strategy, position, risk_params),
"key_factors": strategy.get("key_factors", []),

# 监控要求
"monitoring_requirements": risk_params["position_monitoring"],
"exit_conditions": risk_params["exit_conditions"],

# 元数据
"decision_timestamp": datetime.now().isoformat(),
"decision_id": self._generate_decision_id(),
"trader_style": self.trading_style,
"risk_tolerance": self.risk_tolerance
}

return decision

def _generate_decision_reasoning(self, strategy: Dict, position: Dict, risk_params: Dict) -> str:
"""生成决策推理说明"""

action = strategy["adjusted_signal"]["action"]
confidence = strategy["confidence"]
trade_type = strategy["trade_type"]

reasoning_parts = []

# 基本决策逻辑
if action in ["buy", "strong_buy"]:
reasoning_parts.append(f"基于综合分析,建议{action}该股票")
elif action in ["sell", "strong_sell"]:
reasoning_parts.append(f"基于综合分析,建议{action}该股票")
else:
reasoning_parts.append("当前分析结果建议持有观望")

# 置信度说明
if confidence > 0.8:
reasoning_parts.append(f"决策置信度很高({confidence:.1%})")
elif confidence > 0.6:
reasoning_parts.append(f"决策置信度较高({confidence:.1%})")
else:
reasoning_parts.append(f"决策置信度中等({confidence:.1%}),建议谨慎操作")

# 交易类型说明
reasoning_parts.append(f"采用{trade_type}策略")

# 风险管理说明
reasoning_parts.append(
f"设置{risk_params['stop_loss']:.1%}止损和{risk_params['take_profit']:.1%}止盈"
)

# 仓位说明
reasoning_parts.append(f"建议仓位大小为{position['final_size']:.1%}")

return "。".join(reasoning_parts) + "。"

7. 交易记忆管理

class TradingMemory:
"""交易记忆管理"""

def __init__(self):
self.decision_history = []
self.performance_metrics = {}
self.learning_insights = []

def update_decision(self, decision: Dict):
"""更新决策记录"""
self.decision_history.append(decision)

# 保持最近100个决策
if len(self.decision_history) > 100:
self.decision_history = self.decision_history[-100:]

def learn_from_outcomes(self, decision_id: str, outcome: Dict):
"""从交易结果中学习"""

# 找到对应的决策
decision = self._find_decision(decision_id)
if not decision:
return

# 分析决策质量
decision_quality = self._analyze_decision_quality(decision, outcome)

# 提取学习要点
insights = self._extract_learning_insights(decision, outcome, decision_quality)

# 更新学习记录
self.learning_insights.extend(insights)

# 更新性能指标
self._update_performance_metrics(decision, outcome)

def get_relevant_experience(self, current_situation: Dict) -> List[Dict]:
"""获取相关的历史经验"""

relevant_decisions = []

for decision in self.decision_history:
similarity = self._calculate_situation_similarity(
current_situation, decision
)

if similarity > 0.7: # 相似度阈值
relevant_decisions.append({
"decision": decision,
"similarity": similarity
})

# 按相似度排序
relevant_decisions.sort(key=lambda x: x["similarity"], reverse=True)

return relevant_decisions[:5] # 返回最相似的5个决策

交易员智能体通过综合分析、策略制定、风险管理和持续学习,确保在复杂的市场环境中做出明智的投资决策。