交易员智能体
概述
交易员智能体是 TradingAgents 框架的核心决策组件,负责综合分析师报告和研究员辩论结果,制定最终的交易决策。交易员智能体具备专业的交易知识和风险意识,能够在复杂的市场环境中做出明智的投资决策。
交易员架构
基础交易员类
class Trader:
"""交易员智能体 - 负责最终交易决策"""
def __init__(self, llm, config):
self.llm = llm
self.config = config
self.trading_style = config.get("trading_style", "balanced")
self.risk_tolerance = config.get("risk_tolerance", "medium")
self.memory = TradingMemory()
self.position_manager = PositionManager()
def make_decision(self, analysis_data: Dict) -> Dict:
"""制定交易决策"""
# 1. 综合分析所有输入
comprehensive_analysis = self.synthesize_analysis(analysis_data)
# 2. 评估市场条件
market_assessment = self.assess_market_conditions(analysis_data)
# 3. 制定交易策略
trading_strategy = self.develop_trading_strategy(
comprehensive_analysis, market_assessment
)
# 4. 确定仓位大小
position_size = self.calculate_position_size(trading_strategy)
# 5. 设置风险管理参数
risk_parameters = self.set_risk_parameters(trading_strategy)
# 6. 生成最终决策
final_decision = self.generate_final_decision(
trading_strategy, position_size, risk_parameters
)
# 7. 更新交易记忆
self.memory.update_decision(final_decision)
return final_decision
核心功能模块
1. 分析综合模块
def synthesize_analysis(self, analysis_data: Dict) -> Dict:
"""综合分析所有输入数据"""
# 提取各类分析结果
analyst_reports = analysis_data.get("analyst_reports", {})
research_consensus = analysis_data.get("research_consensus", {})
market_data = analysis_data.get("market_data", {})
# 分析师报告权重
analyst_weights = self.config.get("analyst_weights", {
"fundamentals": 0.3,
"technical": 0.3,
"news": 0.2,
"social": 0.2
})
# 计算加权分析评分
weighted_scores = {}
total_score = 0
for analyst_type, weight in analyst_weights.items():
if analyst_type in analyst_reports:
score = analyst_reports[analyst_type].get("overall_score", 0.5)
weighted_scores[analyst_type] = score * weight
total_score += weighted_scores[analyst_type]
# 研究员共识影响
consensus_impact = self._assess_consensus_impact(research_consensus)
# 综合评估
synthesis = {
"analyst_scores": weighted_scores,
"total_analyst_score": total_score,
"consensus_impact": consensus_impact,
"adjusted_score": self._adjust_score_with_consensus(total_score, consensus_impact),
"confidence_level": self._calculate_overall_confidence(analyst_reports, research_consensus),
"key_factors": self._extract_key_factors(analyst_reports, research_consensus)
}
return synthesis
def _assess_consensus_impact(self, research_consensus: Dict) -> Dict:
"""评估研究员共识的影响"""
consensus_strength = research_consensus.get("consensus_level", 0.5)
recommendation = research_consensus.get("recommendation", "neutral")
# 共识强度影响权重
if consensus_strength > 0.8:
impact_weight = 0.3 # 高共识,高影响
elif consensus_strength > 0.6:
impact_weight = 0.2 # 中等共识,中等影响
else:
impact_weight = 0.1 # 低共识,低影响
# 推荐方向影响
direction_impact = {
"谨慎乐观": 0.15,
"谨慎悲观": -0.15,
"中性观望": 0.0
}.get(recommendation, 0.0)
return {
"strength": consensus_strength,
"weight": impact_weight,
"direction": direction_impact,
"net_impact": direction_impact * impact_weight
}
2. 市场条件评估
def assess_market_conditions(self, analysis_data: Dict) -> Dict:
"""评估当前市场条件"""
market_indicators = {
"volatility": self._assess_volatility(analysis_data),
"liquidity": self._assess_liquidity(analysis_data),
"sentiment": self._assess_market_sentiment(analysis_data),
"trend": self._assess_market_trend(analysis_data),
"correlation": self._assess_market_correlation(analysis_data)
}
# 市场环境分类
market_regime = self._classify_market_regime(market_indicators)
# 交易适宜性评估
trading_suitability = self._assess_trading_suitability(market_indicators)
return {
"indicators": market_indicators,
"regime": market_regime,
"suitability": trading_suitability,
"risk_level": self._calculate_market_risk_level(market_indicators)
}
def _classify_market_regime(self, indicators: Dict) -> str:
"""分类市场环境"""
volatility = indicators["volatility"]["level"]
trend = indicators["trend"]["direction"]
sentiment = indicators["sentiment"]["score"]
if volatility == "low" and trend == "uptrend" and sentiment > 0.6:
return "bull_market"
elif volatility == "high" and trend == "downtrend" and sentiment < 0.4:
return "bear_market"
elif volatility == "high":
return "volatile_market"
else:
return "sideways_market"
def _assess_trading_suitability(self, indicators: Dict) -> Dict:
"""评估交易适宜性"""
suitability_score = 0.5
factors = []
# 波动性影响
if indicators["volatility"]["level"] == "low":
suitability_score += 0.1
factors.append("低波动性有利于交易")
elif indicators["volatility"]["level"] == "high":
suitability_score -= 0.1
factors.append("高波动性增加交易风险")
# 流动性影响
if indicators["liquidity"]["level"] == "high":
suitability_score += 0.1
factors.append("高流动性便于进出")
# 趋势明确性影响
if indicators["trend"]["strength"] > 0.7:
suitability_score += 0.1
factors.append("趋势明确有利于交易")
return {
"score": max(0.0, min(1.0, suitability_score)),
"factors": factors,
"recommendation": "适宜" if suitability_score > 0.6 else "谨慎" if suitability_score > 0.4 else "不适宜"
}
3. 交易策略制定
def develop_trading_strategy(self, analysis: Dict, market_conditions: Dict) -> Dict:
"""制定交易策略"""
# 基于分析结果确定基本方向
base_signal = self._determine_base_signal(analysis)
# 根据市场条件调整策略
adjusted_strategy = self._adjust_for_market_conditions(base_signal, market_conditions)
# 选择交易类型
trade_type = self._select_trade_type(adjusted_strategy, market_conditions)
# 设定时间框架
time_horizon = self._determine_time_horizon(adjusted_strategy, market_conditions)
# 制定进出场策略
entry_exit_strategy = self._develop_entry_exit_strategy(adjusted_strategy)
return {
"base_signal": base_signal,
"adjusted_signal": adjusted_strategy,
"trade_type": trade_type,
"time_horizon": time_horizon,
"entry_strategy": entry_exit_strategy["entry"],
"exit_strategy": entry_exit_strategy["exit"],
"confidence": self._calculate_strategy_confidence(analysis, market_conditions)
}
def _determine_base_signal(self, analysis: Dict) -> Dict:
"""确定基本交易信号"""
adjusted_score = analysis.get("adjusted_score", 0.5)
confidence = analysis.get("confidence_level", 0.5)
# 信号强度阈值
strong_buy_threshold = 0.7
buy_threshold = 0.6
sell_threshold = 0.4
strong_sell_threshold = 0.3
if adjusted_score >= strong_buy_threshold and confidence > 0.7:
signal = "strong_buy"
elif adjusted_score >= buy_threshold:
signal = "buy"
elif adjusted_score <= strong_sell_threshold and confidence > 0.7:
signal = "strong_sell"
elif adjusted_score <= sell_threshold:
signal = "sell"
else:
signal = "hold"
return {
"action": signal,
"strength": abs(adjusted_score - 0.5) * 2, # 0-1 scale
"confidence": confidence,
"score": adjusted_score
}
def _select_trade_type(self, strategy: Dict, market_conditions: Dict) -> str:
"""选择交易类型"""
signal_strength = strategy.get("strength", 0.5)
market_regime = market_conditions.get("regime", "sideways_market")
volatility = market_conditions["indicators"]["volatility"]["level"]
# 根据信号强度和市场条件选择交易类型
if signal_strength > 0.8 and market_regime in ["bull_market", "bear_market"]:
return "momentum_trade" # 动量交易
elif signal_strength > 0.6 and volatility == "low":
return "swing_trade" # 波段交易
elif market_regime == "sideways_market":
return "range_trade" # 区间交易
else:
return "position_trade" # 仓位交易
4. 仓位管理
def calculate_position_size(self, strategy: Dict) -> Dict:
"""计算仓位大小"""
# 基础仓位大小 (基于信号强度)
signal_strength = strategy.get("strength", 0.5)
base_position = signal_strength * self.config.get("max_position_size", 0.1)
# 风险调整
risk_adjustment = self._calculate_risk_adjustment(strategy)
# 市场条件调整
market_adjustment = self._calculate_market_adjustment(strategy)
# 最终仓位大小
final_position = base_position * risk_adjustment * market_adjustment
# 确保在限制范围内
max_position = self.config.get("max_position_size", 0.1)
min_position = self.config.get("min_position_size", 0.01)
final_position = max(min_position, min(max_position, final_position))
return {
"base_size": base_position,
"risk_adjustment": risk_adjustment,
"market_adjustment": market_adjustment,
"final_size": final_position,
"size_rationale": self._explain_position_sizing(
base_position, risk_adjustment, market_adjustment
)
}
def _calculate_risk_adjustment(self, strategy: Dict) -> float:
"""计算风险调整系数"""
confidence = strategy.get("confidence", 0.5)
trade_type = strategy.get("trade_type", "position_trade")
# 基于置信度的调整
confidence_adjustment = 0.5 + (confidence - 0.5)
# 基于交易类型的调整
type_adjustments = {
"momentum_trade": 1.2, # 动量交易可以稍大仓位
"swing_trade": 1.0, # 波段交易标准仓位
"range_trade": 0.8, # 区间交易较小仓位
"position_trade": 0.9 # 仓位交易中等仓位
}
type_adjustment = type_adjustments.get(trade_type, 1.0)
return confidence_adjustment * type_adjustment
5. 风险管理参数
def set_risk_parameters(self, strategy: Dict) -> Dict:
"""设置风险管理参数"""
trade_type = strategy.get("trade_type", "position_trade")
signal_strength = strategy.get("strength", 0.5)
time_horizon = strategy.get("time_horizon", "medium")
# 止损设置
stop_loss = self._calculate_stop_loss(trade_type, signal_strength)
# 止盈设置
take_profit = self._calculate_take_profit(trade_type, signal_strength)
# 风险收益比
risk_reward_ratio = take_profit / stop_loss if stop_loss > 0 else 0
# 最大持有时间
max_holding_period = self._determine_max_holding_period(time_horizon)
return {
"stop_loss": stop_loss,
"take_profit": take_profit,
"risk_reward_ratio": risk_reward_ratio,
"max_holding_period": max_holding_period,
"position_monitoring": self._setup_position_monitoring(strategy),
"exit_conditions": self._define_exit_conditions(strategy)
}
def _calculate_stop_loss(self, trade_type: str, signal_strength: float) -> float:
"""计算止损水平"""
base_stop_loss = {
"momentum_trade": 0.03, # 3% 止损
"swing_trade": 0.05, # 5% 止损
"range_trade": 0.02, # 2% 止损
"position_trade": 0.08 # 8% 止损
}.get(trade_type, 0.05)
# 根据信号强度调整
# 信号越强,可以容忍更大的回撤
strength_adjustment = 1.0 + (signal_strength - 0.5) * 0.5
return base_stop_loss * strength_adjustment
def _calculate_take_profit(self, trade_type: str, signal_strength: float) -> float:
"""计算止盈水平"""
base_take_profit = {
"momentum_trade": 0.08, # 8% 止盈
"swing_trade": 0.12, # 12% 止盈
"range_trade": 0.04, # 4% 止盈
"position_trade": 0.20 # 20% 止盈
}.get(trade_type, 0.10)
# 根据信号强度调整
strength_adjustment = 1.0 + signal_strength * 0.5
return base_take_profit * strength_adjustment
6. 最终决策生成
def generate_final_decision(self, strategy: Dict, position: Dict, risk_params: Dict) -> Dict:
"""生成最终交易决策"""
action = strategy["adjusted_signal"]["action"]
# 构建决策结构
decision = {
"action": action,
"quantity": position["final_size"],
"confidence": strategy["confidence"],
"strategy_type": strategy["trade_type"],
"time_horizon": strategy["time_horizon"],
# 风险管理
"stop_loss": risk_params["stop_loss"],
"take_profit": risk_params["take_profit"],
"risk_reward_ratio": risk_params["risk_reward_ratio"],
# 执行细节
"entry_strategy": strategy["entry_strategy"],
"exit_strategy": strategy["exit_strategy"],
"max_holding_period": risk_params["max_holding_period"],
# 决策依据
"reasoning": self._generate_decision_reasoning(strategy, position, risk_params),
"key_factors": strategy.get("key_factors", []),
# 监控要求
"monitoring_requirements": risk_params["position_monitoring"],
"exit_conditions": risk_params["exit_conditions"],
# 元数据
"decision_timestamp": datetime.now().isoformat(),
"decision_id": self._generate_decision_id(),
"trader_style": self.trading_style,
"risk_tolerance": self.risk_tolerance
}
return decision
def _generate_decision_reasoning(self, strategy: Dict, position: Dict, risk_params: Dict) -> str:
"""生成决策推理说明"""
action = strategy["adjusted_signal"]["action"]
confidence = strategy["confidence"]
trade_type = strategy["trade_type"]
reasoning_parts = []
# 基本决策逻辑
if action in ["buy", "strong_buy"]:
reasoning_parts.append(f"基于综合分析,建议{action}该股票")
elif action in ["sell", "strong_sell"]:
reasoning_parts.append(f"基于综合分析,建议{action}该股票")
else:
reasoning_parts.append("当前分析结果建议持有观望")
# 置信度说明
if confidence > 0.8:
reasoning_parts.append(f"决策置信度很高({confidence:.1%})")
elif confidence > 0.6:
reasoning_parts.append(f"决策置信度较高({confidence:.1%})")
else:
reasoning_parts.append(f"决策置信度中等({confidence:.1%}),建议谨慎操作")
# 交易类型说明
reasoning_parts.append(f"采用{trade_type}策略")
# 风险管理说明
reasoning_parts.append(
f"设置{risk_params['stop_loss']:.1%}止损和{risk_params['take_profit']:.1%}止盈"
)
# 仓位说明
reasoning_parts.append(f"建议仓位大小为{position['final_size']:.1%}")
return "。".join(reasoning_parts) + "。"
7. 交易记忆管理
class TradingMemory:
"""交易记忆管理"""
def __init__(self):
self.decision_history = []
self.performance_metrics = {}
self.learning_insights = []
def update_decision(self, decision: Dict):
"""更新决策记录"""
self.decision_history.append(decision)
# 保持最近100个决策
if len(self.decision_history) > 100:
self.decision_history = self.decision_history[-100:]
def learn_from_outcomes(self, decision_id: str, outcome: Dict):
"""从交易结果中学习"""
# 找到对应的决策
decision = self._find_decision(decision_id)
if not decision:
return
# 分析决策质量
decision_quality = self._analyze_decision_quality(decision, outcome)
# 提取学习要点
insights = self._extract_learning_insights(decision, outcome, decision_quality)
# 更新学习记录
self.learning_insights.extend(insights)
# 更新性能指标
self._update_performance_metrics(decision, outcome)
def get_relevant_experience(self, current_situation: Dict) -> List[Dict]:
"""获取相关的历史经验"""
relevant_decisions = []
for decision in self.decision_history:
similarity = self._calculate_situation_similarity(
current_situation, decision
)
if similarity > 0.7: # 相似度阈值
relevant_decisions.append({
"decision": decision,
"similarity": similarity
})
# 按相似度排序
relevant_decisions.sort(key=lambda x: x["similarity"], reverse=True)
return relevant_decisions[:5] # 返回最相似的5个决策
交易员智能体通过综合分析、策略制定、风险管理和持续学习,确保在复杂的市场环境中做出明智的投资决策。