ETF轮动策略(基于五福改进)
一个基于动量效应的智能ETF轮动交易系统
一句话概括
"追涨杀跌" —— 在强势ETF中顺势而为,在弱势ETF中及时退出。
先上回测图,回测区间是20250401-20260513,策略收益80.30%,超额49.76%,最大回撤14.84%

核心理念:动量效应
什么是动量?
简单说:"强者恒强,弱者恒弱"
就像跑步一样:
跑得快的选手,大概率下一段还是跑在前面的
跑得慢的选手,大概率继续落在后面
在市场中:
近期涨幅大的ETF,未来大概率继续涨
近期涨幅小的ETF,未来大概率继续跌
策略原理
过去25天涨得最好的ETF → 买入持有
过去25天跌得最多的ETF → 卖出换成涨得好的策略架构图
┌─────────────────────────────────────────────────────────┐
│ 每日收盘前(14:45) │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 第一步:风险识别 │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 检查当前市场状态: │ │
│ │ • 是否进入"震荡期"?(市场来回波动) │ │
│ │ • 乖离率过大?RSI超买回落?触发止损? │ │
│ │ • 满足任一条件 → 进入"高斯滤波器"模式 │ │
│ │ • 否则 → 进入"拉普拉斯滤波器"模式 │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 第二步:筛选候选ETF │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 从100+只ETF中筛选: │ │
│ │ 1. 流动性过滤(剔除日均成交额<1000万的ETF) │ │
│ │ 2. 溢价率过滤(剔除溢价>30%的ETF) │ │
│ │ 3. 短期风控过滤(剔除近3日跌幅>3%的ETF) │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 第三步:动量评分 │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 对每只ETF计算: │ │
│ │ • 动量得分 = ln(收益率) × R² 权重 │ │
│ │ • 年化收益率(上限1000%) │ │
│ │ • R²(趋势稳定性,越高越好) │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 第四步:RSRS增强 │
│ ┌─────────────────────────────────────────────────┐ │
│ │ RSRS(阻力支撑相对强度): │ │
│ │ • 斜率 > 0 → 市场偏强 → 加权 │ │
│ │ • 斜率 < 0 → 市场偏弱 → 减权 │ │
│ │ • 综合得分 = 动量得分 × (1 + RSRS权重) │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 第五步:行业分散 │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 最终持仓规则: │ │
│ │ • 同一行业最多1只(如"黄金"行业只选1只) │ │
│ │ • 按得分从高到低选满4-6只 │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 第六步:行业冷却 │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 触发止损的行业: │ │
│ │ • 进入10天"冷却期" │ │
│ │ • 冷却期内该行业ETF不参与排名 │ │
│ │ • 避免频繁交易同一行业 │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘核心功能详解
1. 震荡期识别与切换
为什么要识别震荡期?
在剧烈波动的市场中,动量效应会失效。此时"追涨"容易"追高被套"。
如何识别?
触发条件 | 说明 |
|---|---|
乖离率 > 8% | 价格偏离均线太多,可能回调 |
RSI超买后回落 | 从70以上跌到65以下 |
当日触发止损 | 持仓出现较大亏损 |
两种滤波器模式:
模式 | 适用场景 | 特点 |
|---|---|---|
拉普拉斯滤波 | 正常上涨趋势 | 斜率≥0.002即可,敏感度高 |
高斯滤波 | 震荡市场 | 标准差过滤,更稳健 |
2. 动态仓位管理
根据市场环境自动调整仓位:
市场状态 | 持仓数量 | 单只仓位 |
|---|---|---|
低回撤(<2%) | 6只 | 18% |
中低回撤(2-5%) | 5只 | 22% |
中回撤(5-10%) | 4只 | 25% |
高回撤(10-15%) | 3只 | 30% |
极高回撤(>15%) | 2只 | 30% |
3. 多重止损止盈机制
三重保护:
┌─────────────────────────────────────────────────────────┐
│ 第一重:固定止损 │
│ • 持仓成本 × 95% 止损(即亏损5%出局) │
│ • 适用于快速下跌行情 │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 第二重:移动止损 │
│ • 从最高点回撤6%触发 │
│ • 锁定利润,防止盈利变亏损 │
│ • 例子:买后涨到20元,最高到25元,回撤6%即24.5元出局 │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 第三重:分批止盈 │
│ • 单只盈利>12%时,卖出50% │
│ • 剩余50%使用移动止损 │
│ • 先落袋为安,剩下的让利润奔跑 │
└─────────────────────────────────────────────────────────┘4. 动量衰竭出场
连续2日动量得分<0 → 自动清仓
原理:动量得分由正转负,说明上涨趋势可能结束。
5. 市场宽度指标
衡量市场整体强弱:
正动量占比 > 50%:正常市场
正动量占比 35-50%:弱势市场 → 降低仓位
正动量占比 < 35%:市场糟糕 → 降至2只+防御仓位
6. 入场择时
只在胜率较高时入场:
条件:沪深300尾盘跌幅 > 2%
操作:半仓买入(不是满仓)
原理:大跌次日反弹概率较高
7. 动态换仓阈值
高波动日提高换仓门槛:
正常日:得分差 > 0.20 即换仓
高波动日:得分差 > 0.30 才换仓(提高1.5倍)
减少在高波动时的"频繁换手"和"追涨杀跌"。
8. 胜率自适应
让策略"学会"调整:
连胜次数 | 操作 |
|---|---|
连胜2次 | 降仓1只 |
连胜3次 | 增仓1只 |
连败次数 | 操作 |
|---|---|
连败2次 | 降仓1只 |
策略参数一览
可调参数(小白友好)
参数 | 默认值 | 说明 |
|---|---|---|
| 4 | 持仓数量(3-6只动态调整) |
| 25 | 动量计算周期(交易日) |
| 6% | 移动止损回撤比例 |
| 12% | 分批止盈触发线 |
高级参数(专业用户)
参数 | 默认值 | 说明 |
|---|---|---|
| 18 | RSRS计算天数 |
| 15% | RSRS对得分的影响权重 |
| 0.05 | 拉普拉斯滤波器参数 |
策略优势
✅ 优点
系统性高:有明确的选股、止损、止盈规则,避免情绪干扰
风险可控:多重止损机制,有效控制回撤
自适应强:能根据市场环境自动调整策略
行业分散:避免单押某一行业,降低黑天鹅风险
流动性好:仅选择日均成交额>1000万的ETF
⚠️ 注意事项
趋势市表现好:在单边上涨/下跌行情中效果显著
震荡市表现差:来回震荡可能导致频繁止损
滑点成本:高换手率策略需注意交易成本
历史不代表未来:动量效应可能失效
策略运行时间轴
09:00 盘前准备
│
├── 持仓检查
├── 回撤监控
├── 计算流动性阈值
├── 更新行业ETF池
└── 合并候选池
│
09:30-11:30 盘中监控(分钟级止损检查)
│
13:00-14:40 盘中监控(分钟级止损检查)
│
14:40 卖出操作
│
├── 震荡期状态检查
├── 动量计算
└── 执行卖出
│
14:45 买入操作
│
├── 入场择时检查
└── 执行买入
│
15:00 收盘
│
15:10 重置标志适合人群
适合 ✅ | 不适合 ❌ |
|---|---|
有一定投资经验 | 完全不懂投资的小白 |
能承受一定波动 | 追求稳定收益、不能接受亏损 |
认同"趋势投资"理念 | 信仰"价值投资"、"长期持有" |
有时间关注账户 | 没时间管理,想"躺平" |
Q:这个策略能赚钱吗? A:没有"圣杯"。过去表现不代表未来。2025年动量效应显著,2026年可能不同。从更长的历史表现看,在21年到24年期间这个策略表现一般,进入结构性牛市后,才是该策略的甜点区。
版本历史
版本 | 主要改进 |
|---|---|
V5 | 基础动量轮动框架 |
V6 | 加入RSRS、行业分散、止损冷却 |
V6.2 | 动态仓位管理、RSRS自适应放松 |
V6.3 | 移动止损、分批止盈、市场宽度指标、入场择时 |
V6.4 | 去掉RSRS/R²硬过滤,仅用synergy软调整 |
策略代码仅供学习研究量化知识使用,请勿接入实盘使用。若因该策略代码及其衍生代码造成亏损,作者不承担任何责任。
核心代码摘要
# ========================================================
# ETF轮动策略 V6.4 - ptrade平台适配版
# ========================================================
# V6.4 改进(基于V6.3):
# 1. 去掉RSRS硬过滤:RSRS从一票否决改为仅通过synergy软调整(预期减少44%误杀)
# 2. 去掉R²硬过滤:R²已是动量得分乘数,单独过滤等于双重过滤同一因子
# 3. 正常期关闭动态滤波:正常期已有RSRS+动量双重保障,动态滤波仅震荡期启用
# ========================================================
# V6.3 改进:
# 1. 移动止损(Trailing Stop): 持仓从最高点回撤N%出场,锁定利润
# 2. 动量衰竭出场: 持仓标的动量得分连续2日<0则清仓
# 3. 分批止盈: 单只盈利>12%减半仓,余下用移动止损
# 4. 市场宽度指标: ETF池正动量占比<35%降仓至2只+防御仓位
# 5. 入场择时: 尾盘沪深300跌幅>2%时半仓买入
# 6. 动态换仓阈值: 高波动日提高换仓门槛,减少无效换手
# 7. 胜率自适应: 连续亏损降仓,连续盈利加仓
# ========================================================
# V6.2 改进:
# A. 动态仓位管理:根据回撤调整目标持仓数(3-6)和单只仓位(18-30%)
# B. 震荡期/高回撤时放松RSRS过滤阈值,扩大候选池(Top10→Top20)
# V6 改进:
# 1. 年化收益率上限提高至1000%,高分段对数压缩保持区分度
# 2. 行业分散约束:最终持仓同一行业最多1只(按名称前2字符)
# 3. 止损行业冷却:触发止损的行业10个交易日内禁止参与排名
# 4. RSRS过滤后增加冷却行业过滤,减少过度过滤
# ========================================================
# ptrade适配:
# - initialize() 替代 init()
# - handle_data(context, data) 替代 handle_bar(context, bar_dict)
# - get_history() 替代 get_price()
# - context.portfolio.positions 替代 context.portfolio.stock_account.positions
# - set_commission/commission_ratio 替代 PerShare
# ========================================================
import numpy as np
import math
import pandas as pd
from datetime import datetime, date, timedelta
from scipy import stats
# ==================== 策略初始化 ====================
def initialize(context):
set_benchmark('510300.SS')
log.info('【五福ETF轮动-V6.4-ptrade版】启动!')
# ptrade: set_commission 使用 commission_ratio, min_commission, type 参数
set_commission(commission_ratio=0.0001, min_commission=5.0, type='ETF')
# ptrade: set_slippage 使用滑点比例
set_slippage(slippage=0.001)
# ptrade: set_volume_ratio 设置回测成交量限制比例
set_volume_ratio(volume_ratio=0.25)
g.fixed_etf_pool = [
'518880.SS', '161226.SZ', '159980.SZ', '501018.SS', '159985.SZ',
'513100.SS', '159509.SZ', '513290.SS', '513500.SS', '159518.SZ',
'159502.SZ', '159529.SZ', '513400.SS', '520830.SS', '513520.SS', '513030.SS',
'513090.SS', '513180.SS', '513120.SS', '513330.SS', '513750.SS',
'159892.SZ', '159605.SZ', '513190.SS', '510900.SS', '513630.SS',
'513920.SS', '159323.SZ', '513970.SS',
'510500.SS', '512100.SS', '563300.SS', '510300.SS', '512050.SS',
'510760.SS', '159915.SZ', '159949.SZ', '159967.SZ', '588080.SS',
'588220.SS', '511380.SS',
'513310.SS', '588200.SS', '159852.SZ', '512880.SS', '159206.SZ',
'512400.SS', '512980.SS', '159516.SZ', '512480.SS', '515880.SS',
'562500.SS', '159218.SZ', '159869.SZ', '159870.SZ', '159326.SZ',
'159851.SZ', '560860.SS', '159363.SZ', '588170.SS', '159755.SZ',
'512170.SS', '512800.SS', '159819.SZ', '512710.SS', '159638.SZ',
'517520.SS', '515980.SS', '159995.SZ', '159227.SZ', '512660.SS',
'512690.SS', '516150.SS', '512890.SS', '588790.SS', '159992.SZ',
'512070.SS', '562800.SS', '512010.SS', '515790.SS', '510880.SS',
'159928.SZ', '159883.SZ', '159998.SZ', '515220.SS', '561980.SS',
'515400.SS', '515120.SS', '159566.SZ', '515050.SS', '516510.SS',
'159256.SZ', '159766.SZ', '512200.SS', '513350.SS', '159583.SZ',
'159732.SZ', '516160.SS', '516520.SS', '562590.SS', '515030.SS',
'512670.SS', '561330.SS', '516190.SS', '159840.SZ', '159611.SZ',
'159981.SZ', '159865.SZ', '561360.SS', '159667.SZ', '515170.SS',
'513360.SS', '159825.SZ', '515210.SS',
]
g.filtered_fixed_pool = []
g.dynamic_etf_pool = []
g.merged_etf_pool = []
g.ranked_etfs_result = []
g.target_etfs_list = []
g.etf_names_dict = {}
g.cache_date = None
g.yesterday_close_cache = {}
g.holdings_num = 4
g.defensive_etf = '511880.SS'
g.min_money = 10
# V6.2 动态仓位管理参数
g.min_holdings_num = 3 # 高回撤时最少持仓数
g.max_holdings_num = 6 # 低回撤时最多持仓数
g.max_position_pct = 0.30 # 单只最大仓位比例
g.min_position_pct = 0.18 # 单只最小仓位比例
g.current_holdings_num = 4 # 动态目标持仓数(运行时更新)
g.current_position_pct = 0.25 # 动态单只仓位比例(运行时更新)
g.lookback_days = 25
g.min_score_threshold = 0
g.score_threshold_ratio = 0.9
g.use_short_momentum_period = False
g.short_momentum_lookback = 21
g.short_momentum_min_score = 0
# RSRS 参数
g.enable_rsrs_filter = True
g.rsrs_lookback = 18
g.rsrs_std_lookback = 60
g.rsrs_base_threshold = 0.0
g.enable_rsrs_direction = True
g.rsrs_min_slope = 0.001
g.enable_rsrs_adaptive = True
g.rsrs_volatility_lookback = 30
g.rsrs_vol_threshold_high = 0.02
g.rsrs_vol_adjust_factor = 0.3
g.enable_rsrs_synergy = True
g.rsrs_quality_weight = 0.15
g.enable_r2_filter = True
g.r2_threshold = 0.4
g.enable_volume_check = True
g.volume_lookback = 5
g.volume_threshold = 1.8
g.enable_loss_filter = True
g.loss = 0.97
g.enable_premium_filter = True
g.max_premium_rate = 30
# V6.2 RSRS自适应放松:震荡期/高回撤时降低RSRS门槛
g.rsrs_relax_factor = 0.5 # 放松时RSRS阈值乘以0.5(降低50%)
g.rsrs_relax_dd_threshold = 0.05 # 回撤>=5%时触发放松
g.use_expanded_pool = True # 是否使用扩大候选池
g.expanded_pool_top_n = 20 # 扩大候选池的候选数量(从Top10扩展到Top20)
g.laplace_s_param = 0.05
g.laplace_min_slope = 0.002
g.gaussian_sigma = 1.2
g.gaussian_min_slope = 0.002
# === V6.3 择时优化参数 ===
# 移动止损
g.enable_trailing_stop = True
g.trailing_stop_pct = 0.06 # 从最高点回撤6%触发移动止损
g.trailing_stop_highs = {} # {etf: highest_price_since_entry}
# 分批止盈
g.enable_partial_take_profit = True
g.take_profit_threshold = 0.12 # 盈利>12%触发部分止盈
g.take_profit_ratio = 0.50 # 减仓50%
g.partial_take_profit_done = {} # {etf: True/False}
# 动量衰竭出场
g.enable_momentum_exit = True
g.momentum_exit_days = 2 # 连续N日动量得分<0则清仓
g.momentum_negative_days = {} # {etf: consecutive_negative_days}
# 市场宽度指标
g.enable_market_breadth = True
g.breadth_positive_threshold = 0.35 # 正动量占比<35%触发降仓
g.breadth_weak_threshold = 0.50 # <50%为弱势信号
g.breadth_state = 'normal' # normal/weak/crash
g.breadth_override_holdings = None # 市场宽度覆盖持仓数
g.breadth_override_position = None # 市场宽度覆盖仓位比例
# 入场择时
g.enable_entry_timing = True
g.entry_benchmark_drop = 0.02 # 沪深300跌幅>2%触发半仓
g.entry_half_position = False # 是否启用半仓模式
# 动态换仓阈值
g.enable_dynamic_switch = True
g.switch_score_base = 0.20 # 基础换仓阈值
g.switch_volatility_multiplier = 1.5 # 高波动日阈值倍数
g.current_switch_threshold = 0.20
# 胜率自适应
g.enable_win_rate_adaptive = True
g.win_streak = 0
g.lose_streak = 0
g.trade_history = [] # [{etf, entry_price, exit_price, pnl_pct, date}]
g.enable_range_bound_mode = True
g.current_filter = 'laplace'
g.risk_state = 'normal'
g.lookback_high_low_days = 20
g.risk_benchmark = '510300.SS'
g.enable_bias_trigger = True
g.bias_threshold = 0.08
g.ma_period = 20
g.enable_rsi_trigger = True
g.rsi_overbought = 70
g.rsi_pullback = 65
g.previous_rsi = None
g.enable_stop_loss_trigger = True
g.enable_low_point_rise_trigger = True
g.low_point_rise_threshold = 0.04
g.enable_stable_signal_trigger = True
g.drawdown_recovery = 0.02
g.max_range_bound_days = 20
g.stable_days = 0
g.filter_switch_cooldown = 3
g.last_switch_date = None
g.range_bound_start_date = None
g.range_bound_start_count = 0
g.range_bound_days_count = 0
g.trading_day_count = 0
g.stop_loss_triggered_today = False
g.previous_drawdown = None
g.max_portfolio_value = 0
g.drawdown_threshold = 0.03
g.drawdown_records = []
g.use_fixed_stop_loss = True
g.fixed_stop_loss_threshold = 0.95
g.use_pct_stop_loss = False
g.pct_stop_loss_threshold = 0.95
g.avg_etf_money_threshold = None
g.cost_prices = {}
# 行业分散约束:同义词前缀匹配,限制最终持仓中同一行业最多N只
g.max_same_industry = 1 # 同一行业最多1只
# 止损行业冷却:触发止损的行业N个交易日内禁止参与排名
g.stopped_industries = {} # {industry_name: remaining_cooldown_days}
g.industry_cooldown_days = 10 # 冷却期10个交易日
g._initialized = False
g._sell_executed_today = False
g._trading_executed_today = False
g.min_hold_days = 5
g.switch_score_threshold = 0.20
g.purchase_dates = {}
# === V5 诊断计数器 ===
g.diag_day_count = 0
g.diag_rsrs_filter_count = 0 # RSRS过滤累计
g.diag_range_bound_entries = 0 # 震荡期进入次数
g.diag_stop_loss_count = 0 # 止损累计次数
log.info(f'【策略参数】持仓:动态{g.min_holdings_num}-{g.max_holdings_num}只 | 动量周期:{g.lookback_days}天 | '
f'震荡期:{"开启" if g.enable_range_bound_mode else "关闭"} | '
f'固定池:{len(g.fixed_etf_pool)}只 | RSRS协同:{"开启" if g.enable_rsrs_synergy else "关闭"} | '
f'动态仓位:{"开启" if True else "关闭"} | 候选池扩展:{"开启" if g.use_expanded_pool else "关闭"} | '
f'择时优化:{"开启" if True else "关闭"}')
# ==================== 开盘前运行 ====================
def before_trading_start(context, data):
date_str = context.blotter.current_dt.strftime('%Y-%m-%d')
log.info(f'{date_str} 盘前运行 - ETF轮动策略V6.4-ptrade版')
if not g._initialized:
init_range_bound_status(context)
_update_dynamic_position_params(0.0)
g._initialized = True
morning_routine(context)
# ==================== 主逻辑(每日15:00或分钟bar结束调用) ====================
def handle_data(context, data):
current_time = context.blotter.current_dt.strftime('%H:%M')
# ptrade日线模式: 每天15:00(回测)或14:50(交易)调用
# 我们将所有逻辑整合到handle_data中执行
if current_time >= '14:40' and not g._sell_executed_today:
# 先执行卖出
afternoon_sell_trades(context)
g._sell_executed_today = True
# 再执行买入
afternoon_buy_trades(context, data)
g._trading_executed_today = True
return
if current_time >= '15:00':
g._sell_executed_today = False
g._trading_executed_today = False
return
# 盘中止损检查(分钟模式时每根bar结束后执行)
if '09:31' <= current_time <= '11:29' or '13:01' <= current_time <= '14:39':
check_stop_loss(context, data)
# ==================== 下午卖出 ====================
def afternoon_sell_trades(context):
log.info('▶️ 【14:45卖出】启动...')
if g.enable_range_bound_mode:
check_and_exit_range_bound_mode(context)
check_and_enter_range_bound_mode(context)
# V6.3 择时出场:移动止损 + 动量衰竭 + 分批止盈
v63_exit_trades(context)
log.info('【动量计算】开始...')
final_list = get_final_ranked_etfs(context)
g.ranked_etfs_result = final_list
# === V5 诊断:输出每日诊断摘要 ===
g.diag_day_count += 1
if g.diag_day_count % 5 == 0:
print_diagnostic_summary(context, final_list)
log.info('【卖出执行】...')
execute_sell_trades(context)
log.info('⏸️ 【14:45卖出】完毕!')
# ==================== 下午买入 ====================
def afternoon_buy_trades(context, data=None):
log.info('▶️ 【14:46买入】启动...')
# V6.3 入场择时:检查大盘环境
if g.enable_entry_timing:
_check_entry_timing(context)
log.info('【买入执行】...')
execute_buy_trades(context, data)
g.stop_loss_triggered_today = False
g.trading_day_count += 1
# 行业冷却计数器衰减
cooled = [ind for ind, rem in g.stopped_industries.items()]
for ind in cooled:
g.stopped_industries[ind] -= 1
if g.stopped_industries[ind] <= 0:
del g.stopped_industries[ind]
log.info(f'✅ 【行业冷却结束】{ind} 冷却期满')
g._trading_executed_today = True
# 重置半仓模式
g.entry_half_position = False
log.info('⏸️ 【14:46买入】完毕!')
# ==================== 收盘后运行 ====================
def after_trading_end(context, data):
# ptrade: context.portfolio.positions 替代 context.portfolio.stock_account.positions
positions = context.portfolio.positions
holding = [s for s, pos in positions.items() if pos.amount > 0]
if holding:
log.info(f'当日持仓: {", ".join(holding)} (共{len(holding)}只)')
else:
log.info('当日无持仓')
log.info(f'账户总价值: {context.portfolio.portfolio_value:.2f}, '
f'可用资金: {context.portfolio.cash:.2f}')
# ==================== V5 诊断函数 ====================
def print_diagnostic_summary(context, final_list):
"""
打印详细的诊断信息,帮助分析策略问题
"""
log.info('=' * 80)
log.info(f'📊 V5诊断报告 - 第{g.diag_day_count}个交易日')
log.info(f' 账户净值: {context.portfolio.portfolio_value:,.0f}')
log.info(f' 滤波器状态: {g.current_filter} | 风险状态: {g.risk_state}')
log.info(f' 震荡期天数: {g.range_bound_days_count}')
log.info(f' 累计止损次数: {g.diag_stop_loss_count}')
log.info(f' 累计RSRS过滤: {g.diag_rsrs_filter_count}')
# ptrade: context.portfolio.positions
positions = context.portfolio.positions
holding = [s for s, pos in positions.items() if pos.amount > 0]
target_hold = g.current_holdings_num if g.current_holdings_num > 0 else g.holdings_num
log.info(f' 当前持仓: {len(holding)}只 (目标{target_hold}只)')
log.info(f' 市场宽度: {g.breadth_state} | 换仓阈值: {g.current_switch_threshold:.0%} | '
f'连胜/连败: {g.win_streak}/{g.lose_streak} | 半仓模式: {g.entry_half_position}')
# 分析最终排名结果
if final_list:
log.info(f'\n📈 最终目标ETF ({len(final_list)}只):')
for item in final_list:
score_key = 'composite_score' if g.enable_rsrs_synergy else 'momentum_score'
cs = item.get(score_key, 0)
ms = item.get('momentum_score', 0)
rs = item.get('rsrs_raw', 0)
rstd = item.get('rsrs_std', 0)
rq = item.get('rsrs_quality', 0)
r2 = item.get('r_squared', 0)
ann = item.get('annualized_returns', 0)
# 计算RSRS对得分的影响
if g.enable_rsrs_synergy and ms != 0:
w = g.rsrs_quality_weight
raw_contribution = ms * (1 - w)
rsrs_contribution = rq * w * abs(ms)
rsrs_impact_pct = rsrs_contribution / abs(cs) * 100 if cs != 0 else 0
log.info(f' {item["etf"]} {item["etf_name"]}:')
log.info(f' 动量得分={ms:.4f} (年化={ann:.2%}, R²={r2:.3f})')
log.info(f' RSRS: raw={rs:.4f}, std={rstd:.2f}, quality={rq:.3f}')
log.info(f' 综合得分={cs:.4f} = 动量贡献{raw_contribution:.4f} + RSRS贡献{rsrs_contribution:.4f} (RSRS占比{rsrs_impact_pct:.1f}%)')
else:
log.info(f' {item["etf"]} {item["etf_name"]}: 得分={cs:.4f}, R²={r2:.3f}')
else:
log.info(' 无目标ETF')
log.info('=' * 80)
# ==================== 震荡期状态初始化 ====================
def init_range_bound_status(context):
if not g.enable_range_bound_mode:
return
log.info('🔍 【首次运行】初始化震荡期状态...')
try:
prev_date = _get_prev_date_str(context)
lookback = max(g.ma_period, g.lookback_high_low_days) + 30
# ptrade: get_history 替代 get_price
df = get_history(
count=lookback, frequency='1d', field=['close', 'high', 'low'],
security_list=g.risk_benchmark, fq='pre'
)
if df is None or len(df) < max(g.ma_period, g.lookback_high_low_days):
log.info(f'【首次运行】数据不足,保持正常期')
return
close = df['close'].values
high = df['high'].values
low = df['low'].values
current_price = close[-1]
if len(close) >= g.lookback_high_low_days:
recent_high = np.max(high[-g.lookback_high_low_days:])
recent_low = np.min(low[-g.lookback_high_low_days:])
else:
recent_high = np.max(high)
recent_low = np.min(low)
ma = np.mean(close[-g.ma_period:])
bias = (current_price - ma) / ma if ma > 0 else 0
rise_from_low = (current_price - recent_low) / recent_low if recent_low > 0 else 0
current_rsi = _calc_rsi(close, period=14)
should_enter = False
signals = []
if g.enable_bias_trigger and bias > g.bias_threshold:
should_enter = True
signals.append(f'乖离率{bias:.2%}>{g.bias_threshold:.0%}')
if g.enable_rsi_trigger and current_rsi is not None and len(close) >= 15:
prev_rsi = _calc_rsi(close[:-1], period=14)
if prev_rsi is not None and prev_rsi > g.rsi_overbought and current_rsi < g.rsi_pullback:
should_enter = True
signals.append(f'RSI超买回落{prev_rsi:.1f}→{current_rsi:.1f}')
if should_enter:
g.current_filter = 'range_bound'
g.risk_state = 'range_bound'
g.range_bound_start_date = prev_date
g.range_bound_start_count = 0
g.range_bound_days_count = 0
g.diag_range_bound_entries += 1
log.info(f'🔔 【首次运行】初始化进入震荡期: {"; ".join(signals)}')
else:
g.current_filter = 'laplace'
g.risk_state = 'normal'
if len(close) >= g.lookback_high_low_days:
g.previous_drawdown = (recent_high - current_price) / recent_high if recent_high > 0 else 0
else:
g.previous_drawdown = 0
g.previous_rsi = current_rsi
log.info(f'📌 【首次运行】初始状态: 正常期, 乖离率:{bias:.2%}, RSI:{current_rsi:.1f}')
except Exception as e:
log.info(f'【首次运行】初始化异常: {e},保持正常期')
# ==================== 晨间流水线 ====================
def morning_routine(context):
log.info('★' * 60)
log.info('▶️ 【晨间准备】启动...')
check_positions(context)
monitor_drawdown(context)
# V6.3 市场宽度计算
if g.enable_market_breadth:
_update_market_breadth(context)
# V6.3 动态换仓阈值
_update_switch_threshold(context)
filter_fixed_pool_by_volume(context)
update_dynamic_pool(context)
daily_merge_etf_pools(context)
log.info('⏸️ 【晨间准备】完毕!')
# ==================== 持仓检查 ====================
def check_positions(context):
# ptrade: context.portfolio.positions
positions = context.portfolio.positions
for security, pos in positions.items():
if pos.amount > 0:
name = _get_name(security)
log.info(f'📊 【持仓】{security} {name}, 数量:{pos.amount}, '
f'成本:{pos.cost_basis:.3f}')
def monitor_drawdown(context):
try:
# ptrade: portfolio_value 替代 total_value
current_value = context.portfolio.portfolio_value
if current_value > g.max_portfolio_value:
g.max_portfolio_value = current_value
if g.max_portfolio_value > 0:
current_dd = (g.max_portfolio_value - current_value) / g.max_portfolio_value
if current_dd >= g.drawdown_threshold:
log.info(f'【回撤预警】回撤 {current_dd:.2%} '
f'(阈值:{g.drawdown_threshold:.0%}), '
f'净值:{current_value:,.0f}, 最高:{g.max_portfolio_value:,.0f}')
# V6.2 动态仓位:每次监控回撤时更新仓位参数
_update_dynamic_position_params(current_dd)
except Exception as e:
log.info(f'【回撤监控】异常: {e}')
def _update_dynamic_position_params(current_dd):
"""V6.2+V6.3 根据回撤+市场宽度+胜率动态调整持仓数和单只仓位比例"""
# 回撤分级 -> 持仓数 & 单只仓位
if current_dd <= 0.02:
target = g.max_holdings_num # 6
pos_pct = g.min_position_pct # 18%
elif current_dd <= 0.05:
target = 5
pos_pct = 0.22
elif current_dd <= 0.10:
target = 4
pos_pct = 0.25
elif current_dd <= 0.15:
target = g.min_holdings_num # 3
pos_pct = g.max_position_pct # 30%
else:
target = 2
pos_pct = g.max_position_pct
# V6.3 市场宽度覆盖:弱势市场强制降仓
if g.breadth_state == 'crash':
target = min(target, 2)
pos_pct = min(pos_pct, 0.25)
elif g.breadth_state == 'weak':
target = max(target - 1, 2)
# V6.3 胜率自适应:连续亏损降仓,连续盈利加仓
if g.enable_win_rate_adaptive:
if g.lose_streak >= 2:
target = max(target - 1, 2)
pos_pct = pos_pct * 0.85
elif g.win_streak >= 3:
target = min(target + 1, g.max_holdings_num)
pos_pct = min(pos_pct * 1.1, g.max_position_pct)
# 仅在参数变化时输出日志
if target != g.current_holdings_num or abs(pos_pct - g.current_position_pct) > 0.01:
old = f'{g.current_holdings_num}只/{g.current_position_pct:.0%}'
g.current_holdings_num = target
g.current_position_pct = pos_pct
log.info(f'【动态仓位】回撤{current_dd:.2%} -> 目标持仓{target}只, 单只{pos_pct:.0%} (原{old})')
# ==================== 动态池更新 ====================
def update_dynamic_pool(context):
log.info('【动态池更新】开始...')
try:
prev_date = _get_prev_date_str(context)
# ptrade: get_all_securities 可能不可用,仅用固定池
try:
df_etf = get_all_securities('etf', date=prev_date)
if df_etf is not None and hasattr(df_etf, 'index') and len(df_etf) > 0:
etf_list = [s for s in df_etf.index.tolist() if not s.endswith('.OF')]
if hasattr(df_etf, 'display_name'):
if hasattr(df_etf['display_name'], 'to_dict'):
g.etf_names_dict = df_etf['display_name'].to_dict()
elif isinstance(df_etf['display_name'], dict):
g.etf_names_dict = df_etf['display_name']
log.info(f'【动态池更新】全市场ETF: {len(etf_list)}只')
else:
log.info('【动态池更新】get_all_securities不可用,仅用固定池')
g.dynamic_etf_pool = []
return
except Exception as e:
log.info(f'【动态池更新】get_all_securities不可用: {e},仅用固定池')
g.dynamic_etf_pool = []
return
threshold = g.avg_etf_money_threshold if g.avg_etf_money_threshold is not None else 1e7
# ptrade: get_history 替代 get_price
df = get_history(
count=3, frequency='1d', field='money',
security_list=etf_list, fq=None
)
if _is_empty(df):
log.info('【动态池更新】无法获取成交额')
g.dynamic_etf_pool = []
return
code_total_money = {}
if isinstance(df, dict):
for code, code_df in df.items():
if isinstance(code_df, pd.DataFrame):
code_total_money[code] = code_df['money'].sum()
elif isinstance(code_df, dict):
code_total_money[code] = sum(code_df.get('money', []))
if not code_total_money:
log.info('【动态池更新】无法解析成交额')
g.dynamic_etf_pool = []
return
avg_daily_money = {c: v / 3.0 for c, v in code_total_money.items()}
qualified = {c: v for c, v in avg_daily_money.items() if v > threshold}
qualified_sorted = sorted(qualified.items(), key=lambda x: x[1], reverse=True)
log.info(f'【动态池更新】流动性过滤: {len(etf_list)}→{len(qualified)}只')
exclude_keywords = [
'300', '500', '1000', '2000', '800', '30', '50', '100', '180', '200',
'沪深', '中证', '上证', '深证', '深成', 'A50', 'A100', 'A500', '深100',
'短融', '可转债', '转债', '双债', '利率债', '国债', '地债', '政金债',
'国开债', '信用债', '企业债', '公司债', '城投债', '城投', '美元债',
'货币', '现金', '快线', '快钱', 'ESG', 'MSCI', '债',
]
industry_map = {}
for code, _ in qualified_sorted:
name = g.etf_names_dict.get(code, '')
skipped = False
for kw in exclude_keywords:
if kw in name:
skipped = True
break
if skipped:
continue
key = name[:2] if len(name) >= 2 else name
if key not in industry_map:
industry_map[key] = []
industry_map[key].append((code, name, avg_daily_money.get(code, 0)))
pool = []
for key, items in industry_map.items():
items.sort(key=lambda x: x[2], reverse=True)
pool.append(items[0][0])
g.dynamic_etf_pool = pool[:100]
log.info(f'【动态池更新完成】动态池: {len(g.dynamic_etf_pool)}只')
except Exception as e:
log.info(f'【动态池更新】异常: {e}')
g.dynamic_etf_pool = []
# ==================== 固定池流动性过滤 ====================
def filter_fixed_pool_by_volume(context):
log.info('【固定池过滤】开始...')
if not g.fixed_etf_pool:
return
threshold = g.avg_etf_money_threshold if g.avg_etf_money_threshold is not None else 1e7
try:
prev_date = _get_prev_date_str(context)
# ptrade: get_history 替代 get_price
df = get_history(
count=3, frequency='1d', field='money',
security_list=g.fixed_etf_pool, fq=None
)
if _is_empty(df):
log.info('【固定池过滤】无法获取成交额,保留全部')
g.filtered_fixed_pool = g.fixed_etf_pool[:]
return
code_total_money = {}
if isinstance(df, dict):
for code, code_df in df.items():
if isinstance(code_df, pd.DataFrame):
code_total_money[code] = code_df['money'].sum()
elif isinstance(code_df, dict):
code_total_money[code] = sum(code_df.get('money', []))
if not code_total_money:
log.info('【固定池过滤】无法解析成交额,保留全部')
g.filtered_fixed_pool = g.fixed_etf_pool[:]
return
avg_daily_money = {c: v / 3.0 for c, v in code_total_money.items()}
qualified = [c for c, v in avg_daily_money.items() if v > threshold]
g.filtered_fixed_pool = qualified
removed = set(g.fixed_etf_pool) - set(g.filtered_fixed_pool)
if removed:
log.info(f'【固定池过滤】剔除{len(removed)}只低流动性ETF')
log.info(f'【固定池过滤】保留{len(g.filtered_fixed_pool)}只')
except Exception as e:
log.info(f'【固定池过滤】异常: {e}')
g.filtered_fixed_pool = g.fixed_etf_pool[:]
# ==================== 合并ETF池 ====================
def daily_merge_etf_pools(context):
merged = list(set(g.filtered_fixed_pool + g.dynamic_etf_pool))
merged = [s for s in merged if not s.endswith('.OF')]
merged.sort()
log.info(f'【合并池】固定:{len(g.filtered_fixed_pool)}只, '
f'动态:{len(g.dynamic_etf_pool)}只, 合并:{len(merged)}只')
g.merged_etf_pool = merged
# ==================== 震荡期退出检查 ====================
def check_and_exit_range_bound_mode(context):
if g.current_filter != 'range_bound':
return
log.info('🔍 【震荡期退出检查】...')
try:
prev_date = _get_prev_date_str(context)
lookback = max(g.ma_period, g.lookback_high_low_days) + 30
# ptrade: get_history 替代 get_price
df = get_history(
count=lookback, frequency='1d', field=['close', 'high', 'low'],
security_list=g.risk_benchmark, fq='pre'
)
if df is None or len(df) < max(g.ma_period, g.lookback_high_low_days):
return
close = df['close'].values
high = df['high'].values
low = df['low'].values
current_price = close[-1]
recent_high = np.max(high[-g.lookback_high_low_days:]) if len(high) >= g.lookback_high_low_days else np.max(high)
recent_low = np.min(low[-g.lookback_high_low_days:]) if len(low) >= g.lookback_high_low_days else np.min(low)
current_drawdown = (recent_high - current_price) / recent_high if recent_high > 0 else 0
rise_from_low = (current_price - recent_low) / recent_low if recent_low > 0 else 0
ma = np.mean(close[-g.ma_period:])
current_rsi = _calc_rsi(close, period=14)
recovery_signals = []
if g.enable_low_point_rise_trigger and rise_from_low >= g.low_point_rise_threshold:
recovery_signals.append(f'从低点上涨{rise_from_low:.2%}')
if g.enable_stable_signal_trigger:
if current_price > ma:
recovery_signals.append('价格站上均线')
if len(close) >= 2 and close[-1] > close[-2]:
recovery_signals.append('价格回升')
if g.previous_drawdown is not None and current_drawdown < g.previous_drawdown:
recovery_signals.append('回撤收窄')
if current_rsi is not None and g.previous_rsi is not None and current_rsi > g.previous_rsi:
recovery_signals.append('RSI回升')
if current_drawdown < g.drawdown_recovery:
g.stable_days += 1
else:
g.stable_days = 0
g.previous_drawdown = current_drawdown
g.previous_rsi = current_rsi
range_bound_days = g.trading_day_count - g.range_bound_start_count
if range_bound_days >= g.max_range_bound_days:
recovery_signals.append(f'震荡期满({range_bound_days}天)')
low_rise = g.enable_low_point_rise_trigger and rise_from_low >= g.low_point_rise_threshold
stable = (g.enable_stable_signal_trigger and
current_drawdown < g.drawdown_recovery and
len(recovery_signals) >= 2 and g.stable_days >= 2)
force_exit = range_bound_days >= g.max_range_bound_days
if low_rise or stable or force_exit:
can_switch = True
if g.last_switch_date is not None:
days_since = g.trading_day_count - g.last_switch_date
if days_since < g.filter_switch_cooldown:
can_switch = False
log.info(f'⏳ 【冷却期】距上次切换{days_since}天')
if can_switch:
g.current_filter = 'laplace'
g.risk_state = 'normal'
g.last_switch_date = g.trading_day_count
g.range_bound_start_date = None
g.range_bound_start_count = 0
g.range_bound_days_count = 0
g.stable_days = 0
log.info(f'🔔 【退出震荡期】切换回拉普拉斯: {"; ".join(recovery_signals)}')
else:
log.info('📌 【退出检查】未满足退出条件,保持震荡期')
except Exception as e:
log.info(f'【退出检查】异常: {e}')
# ==================== 震荡期进入检查 ====================
def check_and_enter_range_bound_mode(context):
if not g.enable_range_bound_mode:
return
if g.current_filter == 'range_bound':
return
can_switch = True
if g.last_switch_date is not None:
days_since = g.trading_day_count - g.last_switch_date
if days_since < g.filter_switch_cooldown:
can_switch = False
if not can_switch:
return
log.info('🔍 【震荡期进入检查】...')
risk_signals = []
try:
prev_date = _get_prev_date_str(context)
# ptrade: get_history 替代 get_price
df = get_history(
count=g.ma_period + 20, frequency='1d', field=['close'],
security_list=g.risk_benchmark, fq='pre'
)
if df is not None and len(df) >= g.ma_period:
close = df['close'].values
current_price = close[-1]
if g.enable_bias_trigger:
ma = np.mean(close[-g.ma_period:])
bias = (current_price - ma) / ma
if bias > g.bias_threshold:
risk_signals.append(f'乖离率{bias:.2%}>{g.bias_threshold:.0%}')
if g.enable_rsi_trigger and len(close) >= 15:
current_rsi = _calc_rsi(close, period=14)
prev_rsi = _calc_rsi(close[:-1], period=14)
if prev_rsi is not None and prev_rsi > g.rsi_overbought and current_rsi < g.rsi_pullback:
risk_signals.append(f'RSI超买回落({prev_rsi:.1f}→{current_rsi:.1f})')
except Exception as e:
log.info(f'【进入检查】异常: {e}')
if g.enable_stop_loss_trigger and g.stop_loss_triggered_today:
risk_signals.append('今日触发止损')
if len(risk_signals) > 0:
g.current_filter = 'range_bound'
g.risk_state = 'range_bound'
g.last_switch_date = g.trading_day_count
g.range_bound_start_date = context.blotter.current_dt.strftime('%Y%m%d')
g.range_bound_start_count = g.trading_day_count
g.range_bound_days_count = 0
g.stable_days = 0
g.diag_range_bound_entries += 1
log.info(f'🔔 【进入震荡期】切换高斯滤波器: {"; ".join(risk_signals)}')
else:
log.info('✅ 【进入检查】保持正常期(拉普拉斯)')
def _get_industry(etf_name):
"""从ETF名称提取行业标识(前2个字符)"""
if not etf_name:
return '其他'
return etf_name[:2] if len(etf_name) >= 2 else etf_name
def _apply_industry_filter(candidates, rank_key):
"""行业分散约束:同一行业最多g.max_same_industry只,按得分从高到低贪心选择"""
if not candidates:
return candidates
max_per_industry = g.max_same_industry
industry_count = {}
result = []
target_hold = g.current_holdings_num if g.current_holdings_num > 0 else g.holdings_num
for item in candidates:
industry = _get_industry(item.get('etf_name', ''))
if industry_count.get(industry, 0) >= max_per_industry:
continue
industry_count[industry] = industry_count.get(industry, 0) + 1
result.append(item)
if len(result) >= target_hold:
break
return result
# ==================== 动量计算核心 ====================
def calculate_momentum_score(price_series, lookback_days):
if len(price_series) < lookback_days + 1:
return None, None, None
recent = price_series[-(lookback_days + 1):]
y = np.log(recent)
x = np.arange(len(y))
weights = np.linspace(1, 2, len(y))
slope, intercept = np.polyfit(x, y, 1, w=weights)
# V6 fix: 提高年化收益率上限,使用对数压缩防止高分过度拉开差距
MAX_ANNUALIZED_RETURN = 10.0 # 1000%年化上限(从500%提高到1000%)
raw_annualized = min(math.exp(slope * 250) - 1, MAX_ANNUALIZED_RETURN)
# 对数压缩:超过300%的收益进行对数压缩,保持相对排序同时缩小绝对差距
if raw_annualized > 3.0:
# log(1+x) * 3/log(4) 使得 x=3 时值为3, x=10 时值约5.2
annualized_returns = 3.0 * (1 + math.log(1 + (raw_annualized - 3.0)) / math.log(4))
else:
annualized_returns = raw_annualized
ss_res = np.sum(weights * (y - (slope * x + intercept)) ** 2)
ss_tot = np.sum(weights * (y - np.mean(y)) ** 2)
r_squared = 1 - ss_res / ss_tot if ss_tot else 0
# V6 fix: 动量得分下限,防止负分被RSRS质量因子放大
MIN_MOMENTUM_SCORE = -1.0
momentum_score = max(annualized_returns * r_squared, MIN_MOMENTUM_SCORE)
return momentum_score, annualized_returns, r_squared
def calculate_rsrs(highs, lows, N=18):
try:
if len(highs) < N or len(lows) < N:
return None, None, None
t = np.arange(N)
b_support, _ = np.polyfit(t, lows[-N:], 1)
b_resist, _ = np.polyfit(t, highs[-N:], 1)
if abs(b_resist) < 1e-10:
return None, b_support, b_resist
return b_support / b_resist, b_support, b_resist
except Exception:
return None, None, None
def calculate_rsrs_std(rsrs_series, M=60):
try:
if len(rsrs_series) < M:
return None
recent = rsrs_series[-M:]
mean = np.mean(recent)
std = np.std(recent)
if std < 1e-10:
return None
return (recent[-1] - mean) / std
except Exception:
return None
def calc_market_volatility(benchmark_closes, lookback=30):
try:
if len(benchmark_closes) < lookback + 1:
return None
recent = benchmark_closes[-(lookback + 1):]
returns = np.diff(np.log(recent))
return np.std(returns) * np.sqrt(252)
except Exception:
return None
def get_rsrs_quality(rsrs_std, rsrs_raw, b_support, b_resist):
if rsrs_std is None:
return 0.5, False
quality = 1.0 / (1.0 + np.exp(-rsrs_std))
direction_ok = True
if g.enable_rsrs_direction:
if b_support is not None and b_resist is not None:
if b_support < g.rsrs_min_slope or b_resist < g.rsrs_min_slope:
direction_ok = False
quality *= 0.5
return quality, direction_ok
def get_volume_ratio(hist_volumes, today_vol, lookback_days=None):
if lookback_days is None:
lookback_days = g.volume_lookback
try:
if hist_volumes is None or len(hist_volumes) < lookback_days:
return None
past_vol = hist_volumes[-lookback_days:]
if np.any(np.isnan(past_vol)) or np.any(past_vol == 0):
return None
avg_vol = np.mean(past_vol)
if avg_vol == 0:
return None
return today_vol / avg_vol
except Exception:
return None
def calculate_premium_rate(etf, current_price, context):
try:
prev_date = _get_prev_date_str(context)
try:
# ptrade: get_extras 获取基金净值
nav_df = get_extras('unit_net_value', etf, start_date=prev_date, end_date=prev_date)
if nav_df is not None and len(nav_df) > 0:
if hasattr(nav_df, 'iloc'):
nav_val = nav_df.iloc[-1]
if hasattr(nav_val, 'values'):
nav_val = nav_val.values[0]
else:
nav_val = nav_df
if nav_val is not None and nav_val > 0:
premium_rate = (current_price - nav_val) / nav_val * 100
passed = premium_rate <= g.max_premium_rate
return premium_rate, passed
except Exception:
pass
return None, True
except Exception:
return None, True
# ==================== 滤波器 ====================
def laplace_filter(price, s=0.05):
alpha = 1 - np.exp(-s)
L = np.zeros(len(price))
L[0] = price[0]
for t in range(1, len(price)):
L[t] = alpha * price[t] + (1 - alpha) * L[t - 1]
return L
def gaussian_filter(price, sigma=1.2):
n = len(price)
G = np.zeros(n)
for t in range(n):
weights = np.array([np.exp(-((i + 1) ** 2) / (2 * sigma ** 2)) for i in range(t + 1)])
weights = weights[::-1]
weights = weights / np.sum(weights)
G[t] = np.sum(price[:t + 1] * weights)
return G
def _calc_rsi(close, period=14):
try:
if len(close) < period + 1:
return None
deltas = np.diff(close)
gains = np.where(deltas > 0, deltas, 0)
losses = np.where(deltas < 0, -deltas, 0)
avg_gain = np.mean(gains[-period:])
avg_loss = np.mean(losses[-period:])
if avg_loss == 0:
return 100
rs = avg_gain / avg_loss
return 100 - (100 / (1 + rs))
except Exception:
return None
# ==================== 单个ETF指标计算 ====================
def calculate_all_metrics(etf, etf_name, hist_closes, hist_volumes,
current_price, today_vol, hist_highs=None, hist_lows=None,
context=None, benchmark_volatility=None):
try:
price_series = np.append(hist_closes, current_price)
mom_score, ann_ret, r2 = calculate_momentum_score(price_series, g.lookback_days)
if mom_score is None:
return None
passed_momentum = mom_score >= g.min_score_threshold
short_score, short_ret, short_r2 = calculate_momentum_score(
price_series, g.short_momentum_lookback)
passed_short = short_score >= g.short_momentum_min_score if short_score is not None else False
vol_ratio = get_volume_ratio(hist_volumes, today_vol, g.volume_lookback)
# RSRS趋势过滤
rsrs_raw = None
rsrs_std = None
rsrs_quality = 0.5 # default
rsrs_direction_ok = True
b_support = None
b_resist = None
passed_rsrs = True
rsrs_data_sufficient = False
if hist_highs is not None and hist_lows is not None and g.enable_rsrs_filter:
min_rsrs_len = g.rsrs_std_lookback + g.rsrs_lookback + 1
if len(hist_highs) >= min_rsrs_len:
rsrs_data_sufficient = True
rsrs_history = []
n_total = len(hist_highs)
for i in range(g.rsrs_lookback, n_total + 1):
r, bs, br = calculate_rsrs(hist_highs[:i], hist_lows[:i], g.rsrs_lookback)
if r is not None:
rsrs_history.append(r)
b_support = bs
b_resist = br
if len(rsrs_history) >= g.rsrs_std_lookback:
rsrs_raw = rsrs_history[-1]
rsrs_std = calculate_rsrs_std(rsrs_history, g.rsrs_std_lookback)
rsrs_threshold = g.rsrs_base_threshold
if g.enable_rsrs_adaptive and benchmark_volatility is not None:
if benchmark_volatility > g.rsrs_vol_threshold_high:
rsrs_threshold = g.rsrs_base_threshold * (1 + g.rsrs_vol_adjust_factor)
if g.enable_rsrs_direction and b_support is not None and b_resist is not None:
rsrs_direction_ok = (b_support >= g.rsrs_min_slope and
b_resist >= g.rsrs_min_slope)
if not rsrs_direction_ok:
passed_rsrs = False
elif rsrs_std is not None:
passed_rsrs = rsrs_std > rsrs_threshold
if g.enable_rsrs_synergy:
rsrs_quality, rsrs_direction_ok = get_rsrs_quality(
rsrs_std, rsrs_raw, b_support, b_resist)
# 短期风控
passed_loss = True
day_ratios = []
if len(price_series) >= 4:
d1 = price_series[-1] / price_series[-2]
d2 = price_series[-2] / price_series[-3]
d3 = price_series[-3] / price_series[-4]
day_ratios = [d1, d2, d3]
if min(day_ratios) < g.loss:
passed_loss = False
premium_rate, passed_premium = calculate_premium_rate(etf, current_price, context)
laplace_val, laplace_slope, passed_laplace = 0, 0, False
gaussian_val, gaussian_slope, passed_gaussian = 0, 0, False
if len(price_series) >= 10:
try:
laplace_vals = laplace_filter(price_series, s=g.laplace_s_param)
if len(laplace_vals) >= 2:
laplace_val = laplace_vals[-1]
laplace_slope = laplace_vals[-1] - laplace_vals[-2]
passed_laplace = (current_price > laplace_val and
laplace_slope > g.laplace_min_slope)
gaussian_vals = gaussian_filter(price_series, sigma=g.gaussian_sigma)
if len(gaussian_vals) >= 2:
gaussian_val = gaussian_vals[-1]
gaussian_slope = gaussian_vals[-1] - gaussian_vals[-2]
passed_gaussian = (current_price > gaussian_val and
gaussian_slope > g.gaussian_min_slope)
except Exception:
pass
if g.current_filter == 'laplace':
filter_val = laplace_val
filter_slope = laplace_slope
passed_filter = passed_laplace
else:
filter_val = gaussian_val
filter_slope = gaussian_slope
passed_filter = passed_gaussian
return {
'etf': etf,
'etf_name': etf_name,
'momentum_score': mom_score,
'short_momentum_score': short_score,
'annualized_returns': ann_ret,
'r_squared': r2,
'current_price': current_price,
'volume_ratio': vol_ratio,
'day_ratios': day_ratios,
'premium_rate': premium_rate,
'rsrs_raw': rsrs_raw,
'rsrs_std': rsrs_std,
'rsrs_quality': rsrs_quality,
'rsrs_direction_ok': rsrs_direction_ok,
'rsrs_data_sufficient': rsrs_data_sufficient,
'b_support': b_support,
'b_resist': b_resist,
'passed_momentum': passed_momentum,
'passed_short_momentum': passed_short,
'passed_r2': r2 > g.r2_threshold,
'passed_volume': vol_ratio is not None and vol_ratio < g.volume_threshold,
'passed_loss': passed_loss,
'passed_premium': passed_premium,
'passed_rsrs': passed_rsrs,
'laplace_value': laplace_val,
'laplace_slope': laplace_slope,
'gaussian_value': gaussian_val,
'gaussian_slope': gaussian_slope,
'passed_laplace': passed_laplace,
'passed_gaussian': passed_gaussian,
'filter_value': filter_val,
'filter_slope': filter_slope,
'passed_filter': passed_filter,
}
except Exception as e:
log.info(f'【指标计算】{etf} 失败: {e}')
return None
# ==================== 过滤应用 ====================
def apply_filters(metrics_list, context=None):
use_short = g.use_short_momentum_period
# V6.4 精简筛选链:
# - 去掉RSRS硬过滤:RSRS已通过synergy作为软调整因子参与排名,一票否决属于重复过滤
# - 去掉R²硬过滤:R²已是动量得分=年化收益×R²的乘数,单独过滤等于双重过滤同一因子
# - 正常期关闭动态滤波:正常期已有RSRS synergy+动量得分双重保障,仅在震荡期启用
steps = [
# ('RSRS趋势', lambda m: m['passed_rsrs'], False), # V6.4: 移除,保留synergy软调整
# ('R²', lambda m: m['passed_r2'], False), # V6.4: 移除,R²已融入动量得分
('动态滤波', lambda m: m['passed_filter'], g.current_filter == 'range_bound'), # V6.4: 仅震荡期启用
('主周期动量', lambda m: m['passed_momentum'], not use_short),
('短期动量', lambda m: m['passed_short_momentum'], use_short),
('成交量', lambda m: m['passed_volume'], g.enable_volume_check),
('短期风控', lambda m: m['passed_loss'], g.enable_loss_filter),
('溢价率', lambda m: m['passed_premium'], g.enable_premium_filter),
]
filtered = metrics_list[:]
for name, condition, enabled in steps:
if enabled:
before = len(filtered)
filtered = [m for m in filtered if condition(m)]
after = len(filtered)
if name == 'RSRS趋势':
g.diag_rsrs_filter_count += (before - after)
if before > after:
log.info(f'【过滤】{name}: {after}/{before} (淘汰{before - after}只)')
return filtered
…………

2 天前