ETF轮动策略(基于五福改进)

阿凡
阿凡 2025,大牛市快来了 V:flj99103681

0 人点赞了该文章 · 91 浏览

一个基于动量效应的智能ETF轮动交易系统


一句话概括

"追涨杀跌" —— 在强势ETF中顺势而为,在弱势ETF中及时退出。

先上回测图,回测区间是20250401-20260513,策略收益80.30%,超额49.76%,最大回撤14.84%


核心理念:动量效应

什么是动量?

简单说:"强者恒强,弱者恒弱"

就像跑步一样:

  • 跑得快的选手,大概率下一段还是跑在前面的

  • 跑得慢的选手,大概率继续落在后面

在市场中:

  • 近期涨幅大的ETF,未来大概率继续涨

  • 近期涨幅小的ETF,未来大概率继续跌

策略原理

过去25天涨得最好的ETF → 买入持有
过去25天跌得最多的ETF → 卖出换成涨得好的

策略架构图

┌─────────────────────────────────────────────────────────┐
│                    每日收盘前(14:45)                     │
└─────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────┐
│                    第一步:风险识别                       │
│  ┌─────────────────────────────────────────────────┐   │
│  │  检查当前市场状态:                               │   │
│  │  • 是否进入"震荡期"?(市场来回波动)              │   │
│  │  • 乖离率过大?RSI超买回落?触发止损?            │   │
│  │  • 满足任一条件 → 进入"高斯滤波器"模式            │   │
│  │  • 否则 → 进入"拉普拉斯滤波器"模式               │   │
│  └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────┐
│                   第二步:筛选候选ETF                     │
│  ┌─────────────────────────────────────────────────┐   │
│  │  从100+只ETF中筛选:                             │   │
│  │  1. 流动性过滤(剔除日均成交额<1000万的ETF)      │   │
│  │  2. 溢价率过滤(剔除溢价>30%的ETF)             │   │
│  │  3. 短期风控过滤(剔除近3日跌幅>3%的ETF)        │   │
│  └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────┐
│                   第三步:动量评分                        │
│  ┌─────────────────────────────────────────────────┐   │
│  │  对每只ETF计算:                                 │   │
│  │  • 动量得分 = ln(收益率) × R² 权重              │   │
│  │  • 年化收益率(上限1000%)                      │   │
│  │  • R²(趋势稳定性,越高越好)                   │   │
│  └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────┐
│                   第四步:RSRS增强                        │
│  ┌─────────────────────────────────────────────────┐   │
│  │  RSRS(阻力支撑相对强度):                       │   │
│  │  • 斜率 > 0 → 市场偏强 → 加权                    │   │
│  │  • 斜率 < 0 → 市场偏弱 → 减权                   │   │
│  │  • 综合得分 = 动量得分 × (1 + RSRS权重)          │   │
│  └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────┐
│                   第五步:行业分散                        │
│  ┌─────────────────────────────────────────────────┐   │
│  │  最终持仓规则:                                   │   │
│  │  • 同一行业最多1只(如"黄金"行业只选1只)         │   │
│  │  • 按得分从高到低选满4-6只                       │   │
│  └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────┐
│                   第六步:行业冷却                        │
│  ┌─────────────────────────────────────────────────┐   │
│  │  触发止损的行业:                                 │   │
│  │  • 进入10天"冷却期"                             │   │
│  │  • 冷却期内该行业ETF不参与排名                   │   │
│  │  • 避免频繁交易同一行业                         │   │
│  └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘

核心功能详解

1. 震荡期识别与切换

为什么要识别震荡期?

在剧烈波动的市场中,动量效应会失效。此时"追涨"容易"追高被套"。

如何识别?

触发条件

说明

乖离率 > 8%

价格偏离均线太多,可能回调

RSI超买后回落

从70以上跌到65以下

当日触发止损

持仓出现较大亏损

两种滤波器模式:

模式

适用场景

特点

拉普拉斯滤波

正常上涨趋势

斜率≥0.002即可,敏感度高

高斯滤波

震荡市场

标准差过滤,更稳健

2. 动态仓位管理

根据市场环境自动调整仓位:

市场状态

持仓数量

单只仓位

低回撤(<2%)

6只

18%

中低回撤(2-5%)

5只

22%

中回撤(5-10%)

4只

25%

高回撤(10-15%)

3只

30%

极高回撤(>15%)

2只

30%

3. 多重止损止盈机制

三重保护:

┌─────────────────────────────────────────────────────────┐
│                     第一重:固定止损                      │
│  • 持仓成本 × 95% 止损(即亏损5%出局)                  │
│  • 适用于快速下跌行情                                   │
└─────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────┐
│                    第二重:移动止损                       │
│  • 从最高点回撤6%触发                                  │
│  • 锁定利润,防止盈利变亏损                             │
│  • 例子:买后涨到20元,最高到25元,回撤6%即24.5元出局    │
└─────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────┐
│                    第三重:分批止盈                       │
│  • 单只盈利>12%时,卖出50%                             │
│  • 剩余50%使用移动止损                                 │
│  • 先落袋为安,剩下的让利润奔跑                          │
└─────────────────────────────────────────────────────────┘

4. 动量衰竭出场

连续2日动量得分<0 → 自动清仓

原理:动量得分由正转负,说明上涨趋势可能结束。

5. 市场宽度指标

衡量市场整体强弱:

  • 正动量占比 > 50%:正常市场

  • 正动量占比 35-50%:弱势市场 → 降低仓位

  • 正动量占比 < 35%:市场糟糕 → 降至2只+防御仓位

6. 入场择时

只在胜率较高时入场:

  • 条件:沪深300尾盘跌幅 > 2%

  • 操作:半仓买入(不是满仓)

  • 原理:大跌次日反弹概率较高

7. 动态换仓阈值

高波动日提高换仓门槛:

  • 正常日:得分差 > 0.20 即换仓

  • 高波动日:得分差 > 0.30 才换仓(提高1.5倍)

减少在高波动时的"频繁换手"和"追涨杀跌"。

8. 胜率自适应

让策略"学会"调整:

连胜次数

操作

连胜2次

降仓1只

连胜3次

增仓1只

连败次数

操作

连败2次

降仓1只


策略参数一览

可调参数(小白友好)

参数

默认值

说明

holdings_num

4

持仓数量(3-6只动态调整)

lookback_days

25

动量计算周期(交易日)

trailing_stop_pct

6%

移动止损回撤比例

take_profit_threshold

12%

分批止盈触发线

高级参数(专业用户)

参数

默认值

说明

rsrs_lookback

18

RSRS计算天数

rsrs_quality_weight

15%

RSRS对得分的影响权重

laplace_s_param

0.05

拉普拉斯滤波器参数


策略优势

✅ 优点

  1. 系统性高:有明确的选股、止损、止盈规则,避免情绪干扰

  2. 风险可控:多重止损机制,有效控制回撤

  3. 自适应强:能根据市场环境自动调整策略

  4. 行业分散:避免单押某一行业,降低黑天鹅风险

  5. 流动性好:仅选择日均成交额>1000万的ETF

⚠️ 注意事项

  1. 趋势市表现好:在单边上涨/下跌行情中效果显著

  2. 震荡市表现差:来回震荡可能导致频繁止损

  3. 滑点成本:高换手率策略需注意交易成本

  4. 历史不代表未来:动量效应可能失效


策略运行时间轴

09:00  盘前准备
  │
  ├── 持仓检查
  ├── 回撤监控
  ├── 计算流动性阈值
  ├── 更新行业ETF池
  └── 合并候选池
  │
09:30-11:30 盘中监控(分钟级止损检查)
  │
13:00-14:40 盘中监控(分钟级止损检查)
  │
14:40 卖出操作
  │
  ├── 震荡期状态检查
  ├── 动量计算
  └── 执行卖出
  │
14:45 买入操作
  │
  ├── 入场择时检查
  └── 执行买入
  │
15:00 收盘
  │
15:10 重置标志

适合人群

适合 ✅

不适合 ❌

有一定投资经验

完全不懂投资的小白

能承受一定波动

追求稳定收益、不能接受亏损

认同"趋势投资"理念

信仰"价值投资"、"长期持有"

有时间关注账户

没时间管理,想"躺平"



Q:这个策略能赚钱吗? A:没有"圣杯"。过去表现不代表未来。2025年动量效应显著,2026年可能不同。从更长的历史表现看,在21年到24年期间这个策略表现一般,进入结构性牛市后,才是该策略的甜点区。



版本历史

版本

主要改进

V5

基础动量轮动框架

V6

加入RSRS、行业分散、止损冷却

V6.2

动态仓位管理、RSRS自适应放松

V6.3

移动止损、分批止盈、市场宽度指标、入场择时

V6.4

去掉RSRS/R²硬过滤,仅用synergy软调整


策略代码仅供学习研究量化知识使用,请勿接入实盘使用。若因该策略代码及其衍生代码造成亏损,作者不承担任何责任。

核心代码摘要

# ========================================================
# ETF轮动策略 V6.4 - ptrade平台适配版
# ========================================================
# V6.4 改进(基于V6.3):
# 1. 去掉RSRS硬过滤:RSRS从一票否决改为仅通过synergy软调整(预期减少44%误杀)
# 2. 去掉R²硬过滤:R²已是动量得分乘数,单独过滤等于双重过滤同一因子
# 3. 正常期关闭动态滤波:正常期已有RSRS+动量双重保障,动态滤波仅震荡期启用
# ========================================================
# V6.3 改进:
# 1. 移动止损(Trailing Stop): 持仓从最高点回撤N%出场,锁定利润
# 2. 动量衰竭出场: 持仓标的动量得分连续2日<0则清仓
# 3. 分批止盈: 单只盈利>12%减半仓,余下用移动止损
# 4. 市场宽度指标: ETF池正动量占比<35%降仓至2只+防御仓位
# 5. 入场择时: 尾盘沪深300跌幅>2%时半仓买入
# 6. 动态换仓阈值: 高波动日提高换仓门槛,减少无效换手
# 7. 胜率自适应: 连续亏损降仓,连续盈利加仓
# ========================================================
# V6.2 改进:
# A. 动态仓位管理:根据回撤调整目标持仓数(3-6)和单只仓位(18-30%)
# B. 震荡期/高回撤时放松RSRS过滤阈值,扩大候选池(Top10→Top20)
# V6 改进:
# 1. 年化收益率上限提高至1000%,高分段对数压缩保持区分度
# 2. 行业分散约束:最终持仓同一行业最多1只(按名称前2字符)
# 3. 止损行业冷却:触发止损的行业10个交易日内禁止参与排名
# 4. RSRS过滤后增加冷却行业过滤,减少过度过滤
# ========================================================
# ptrade适配:
# - initialize() 替代 init()
# - handle_data(context, data) 替代 handle_bar(context, bar_dict)
# - get_history() 替代 get_price()
# - context.portfolio.positions 替代 context.portfolio.stock_account.positions
# - set_commission/commission_ratio 替代 PerShare
# ========================================================

import numpy as np
import math
import pandas as pd
from datetime import datetime, date, timedelta
from scipy import stats


# ==================== 策略初始化 ====================
def initialize(context):
set_benchmark('510300.SS')
log.info('【五福ETF轮动-V6.4-ptrade版】启动!')

# ptrade: set_commission 使用 commission_ratio, min_commission, type 参数
set_commission(commission_ratio=0.0001, min_commission=5.0, type='ETF')
# ptrade: set_slippage 使用滑点比例
set_slippage(slippage=0.001)
# ptrade: set_volume_ratio 设置回测成交量限制比例
set_volume_ratio(volume_ratio=0.25)

g.fixed_etf_pool = [
'518880.SS', '161226.SZ', '159980.SZ', '501018.SS', '159985.SZ',
'513100.SS', '159509.SZ', '513290.SS', '513500.SS', '159518.SZ',
'159502.SZ', '159529.SZ', '513400.SS', '520830.SS', '513520.SS', '513030.SS',
'513090.SS', '513180.SS', '513120.SS', '513330.SS', '513750.SS',
'159892.SZ', '159605.SZ', '513190.SS', '510900.SS', '513630.SS',
'513920.SS', '159323.SZ', '513970.SS',
'510500.SS', '512100.SS', '563300.SS', '510300.SS', '512050.SS',
'510760.SS', '159915.SZ', '159949.SZ', '159967.SZ', '588080.SS',
'588220.SS', '511380.SS',
'513310.SS', '588200.SS', '159852.SZ', '512880.SS', '159206.SZ',
'512400.SS', '512980.SS', '159516.SZ', '512480.SS', '515880.SS',
'562500.SS', '159218.SZ', '159869.SZ', '159870.SZ', '159326.SZ',
'159851.SZ', '560860.SS', '159363.SZ', '588170.SS', '159755.SZ',
'512170.SS', '512800.SS', '159819.SZ', '512710.SS', '159638.SZ',
'517520.SS', '515980.SS', '159995.SZ', '159227.SZ', '512660.SS',
'512690.SS', '516150.SS', '512890.SS', '588790.SS', '159992.SZ',
'512070.SS', '562800.SS', '512010.SS', '515790.SS', '510880.SS',
'159928.SZ', '159883.SZ', '159998.SZ', '515220.SS', '561980.SS',
'515400.SS', '515120.SS', '159566.SZ', '515050.SS', '516510.SS',
'159256.SZ', '159766.SZ', '512200.SS', '513350.SS', '159583.SZ',
'159732.SZ', '516160.SS', '516520.SS', '562590.SS', '515030.SS',
'512670.SS', '561330.SS', '516190.SS', '159840.SZ', '159611.SZ',
'159981.SZ', '159865.SZ', '561360.SS', '159667.SZ', '515170.SS',
'513360.SS', '159825.SZ', '515210.SS',
]

g.filtered_fixed_pool = []
g.dynamic_etf_pool = []
g.merged_etf_pool = []
g.ranked_etfs_result = []
g.target_etfs_list = []
g.etf_names_dict = {}
g.cache_date = None
g.yesterday_close_cache = {}

g.holdings_num = 4
g.defensive_etf = '511880.SS'
g.min_money = 10

# V6.2 动态仓位管理参数
g.min_holdings_num = 3 # 高回撤时最少持仓数
g.max_holdings_num = 6 # 低回撤时最多持仓数
g.max_position_pct = 0.30 # 单只最大仓位比例
g.min_position_pct = 0.18 # 单只最小仓位比例
g.current_holdings_num = 4 # 动态目标持仓数(运行时更新)
g.current_position_pct = 0.25 # 动态单只仓位比例(运行时更新)

g.lookback_days = 25
g.min_score_threshold = 0
g.score_threshold_ratio = 0.9

g.use_short_momentum_period = False
g.short_momentum_lookback = 21
g.short_momentum_min_score = 0

# RSRS 参数
g.enable_rsrs_filter = True
g.rsrs_lookback = 18
g.rsrs_std_lookback = 60
g.rsrs_base_threshold = 0.0

g.enable_rsrs_direction = True
g.rsrs_min_slope = 0.001

g.enable_rsrs_adaptive = True
g.rsrs_volatility_lookback = 30
g.rsrs_vol_threshold_high = 0.02
g.rsrs_vol_adjust_factor = 0.3

g.enable_rsrs_synergy = True
g.rsrs_quality_weight = 0.15

g.enable_r2_filter = True
g.r2_threshold = 0.4
g.enable_volume_check = True
g.volume_lookback = 5
g.volume_threshold = 1.8
g.enable_loss_filter = True
g.loss = 0.97
g.enable_premium_filter = True
g.max_premium_rate = 30

# V6.2 RSRS自适应放松:震荡期/高回撤时降低RSRS门槛
g.rsrs_relax_factor = 0.5 # 放松时RSRS阈值乘以0.5(降低50%)
g.rsrs_relax_dd_threshold = 0.05 # 回撤>=5%时触发放松
g.use_expanded_pool = True # 是否使用扩大候选池
g.expanded_pool_top_n = 20 # 扩大候选池的候选数量(从Top10扩展到Top20)

g.laplace_s_param = 0.05
g.laplace_min_slope = 0.002
g.gaussian_sigma = 1.2
g.gaussian_min_slope = 0.002

# === V6.3 择时优化参数 ===
# 移动止损
g.enable_trailing_stop = True
g.trailing_stop_pct = 0.06 # 从最高点回撤6%触发移动止损
g.trailing_stop_highs = {} # {etf: highest_price_since_entry}

# 分批止盈
g.enable_partial_take_profit = True
g.take_profit_threshold = 0.12 # 盈利>12%触发部分止盈
g.take_profit_ratio = 0.50 # 减仓50%
g.partial_take_profit_done = {} # {etf: True/False}

# 动量衰竭出场
g.enable_momentum_exit = True
g.momentum_exit_days = 2 # 连续N日动量得分<0则清仓
g.momentum_negative_days = {} # {etf: consecutive_negative_days}

# 市场宽度指标
g.enable_market_breadth = True
g.breadth_positive_threshold = 0.35 # 正动量占比<35%触发降仓
g.breadth_weak_threshold = 0.50 # <50%为弱势信号
g.breadth_state = 'normal' # normal/weak/crash
g.breadth_override_holdings = None # 市场宽度覆盖持仓数
g.breadth_override_position = None # 市场宽度覆盖仓位比例

# 入场择时
g.enable_entry_timing = True
g.entry_benchmark_drop = 0.02 # 沪深300跌幅>2%触发半仓
g.entry_half_position = False # 是否启用半仓模式

# 动态换仓阈值
g.enable_dynamic_switch = True
g.switch_score_base = 0.20 # 基础换仓阈值
g.switch_volatility_multiplier = 1.5 # 高波动日阈值倍数
g.current_switch_threshold = 0.20

# 胜率自适应
g.enable_win_rate_adaptive = True
g.win_streak = 0
g.lose_streak = 0
g.trade_history = [] # [{etf, entry_price, exit_price, pnl_pct, date}]

g.enable_range_bound_mode = True
g.current_filter = 'laplace'
g.risk_state = 'normal'
g.lookback_high_low_days = 20
g.risk_benchmark = '510300.SS'

g.enable_bias_trigger = True
g.bias_threshold = 0.08
g.ma_period = 20
g.enable_rsi_trigger = True
g.rsi_overbought = 70
g.rsi_pullback = 65
g.previous_rsi = None
g.enable_stop_loss_trigger = True

g.enable_low_point_rise_trigger = True
g.low_point_rise_threshold = 0.04
g.enable_stable_signal_trigger = True
g.drawdown_recovery = 0.02
g.max_range_bound_days = 20
g.stable_days = 0

g.filter_switch_cooldown = 3
g.last_switch_date = None
g.range_bound_start_date = None
g.range_bound_start_count = 0
g.range_bound_days_count = 0
g.trading_day_count = 0

g.stop_loss_triggered_today = False
g.previous_drawdown = None
g.max_portfolio_value = 0
g.drawdown_threshold = 0.03
g.drawdown_records = []

g.use_fixed_stop_loss = True
g.fixed_stop_loss_threshold = 0.95
g.use_pct_stop_loss = False
g.pct_stop_loss_threshold = 0.95

g.avg_etf_money_threshold = None
g.cost_prices = {}

# 行业分散约束:同义词前缀匹配,限制最终持仓中同一行业最多N只
g.max_same_industry = 1 # 同一行业最多1只

# 止损行业冷却:触发止损的行业N个交易日内禁止参与排名
g.stopped_industries = {} # {industry_name: remaining_cooldown_days}
g.industry_cooldown_days = 10 # 冷却期10个交易日

g._initialized = False
g._sell_executed_today = False
g._trading_executed_today = False

g.min_hold_days = 5
g.switch_score_threshold = 0.20
g.purchase_dates = {}

# === V5 诊断计数器 ===
g.diag_day_count = 0
g.diag_rsrs_filter_count = 0 # RSRS过滤累计
g.diag_range_bound_entries = 0 # 震荡期进入次数
g.diag_stop_loss_count = 0 # 止损累计次数

log.info(f'【策略参数】持仓:动态{g.min_holdings_num}-{g.max_holdings_num}只 | 动量周期:{g.lookback_days}天 | '
f'震荡期:{"开启" if g.enable_range_bound_mode else "关闭"} | '
f'固定池:{len(g.fixed_etf_pool)}只 | RSRS协同:{"开启" if g.enable_rsrs_synergy else "关闭"} | '
f'动态仓位:{"开启" if True else "关闭"} | 候选池扩展:{"开启" if g.use_expanded_pool else "关闭"} | '
f'择时优化:{"开启" if True else "关闭"}')


# ==================== 开盘前运行 ====================
def before_trading_start(context, data):
date_str = context.blotter.current_dt.strftime('%Y-%m-%d')
log.info(f'{date_str} 盘前运行 - ETF轮动策略V6.4-ptrade版')

if not g._initialized:
init_range_bound_status(context)
_update_dynamic_position_params(0.0)
g._initialized = True

morning_routine(context)


# ==================== 主逻辑(每日15:00或分钟bar结束调用) ====================
def handle_data(context, data):
current_time = context.blotter.current_dt.strftime('%H:%M')

# ptrade日线模式: 每天15:00(回测)或14:50(交易)调用
# 我们将所有逻辑整合到handle_data中执行
if current_time >= '14:40' and not g._sell_executed_today:
# 先执行卖出
afternoon_sell_trades(context)
g._sell_executed_today = True
# 再执行买入
afternoon_buy_trades(context, data)
g._trading_executed_today = True
return

if current_time >= '15:00':
g._sell_executed_today = False
g._trading_executed_today = False
return

# 盘中止损检查(分钟模式时每根bar结束后执行)
if '09:31' <= current_time <= '11:29' or '13:01' <= current_time <= '14:39':
check_stop_loss(context, data)


# ==================== 下午卖出 ====================
def afternoon_sell_trades(context):
log.info('▶️ 【14:45卖出】启动...')

if g.enable_range_bound_mode:
check_and_exit_range_bound_mode(context)
check_and_enter_range_bound_mode(context)

# V6.3 择时出场:移动止损 + 动量衰竭 + 分批止盈
v63_exit_trades(context)

log.info('【动量计算】开始...')
final_list = get_final_ranked_etfs(context)
g.ranked_etfs_result = final_list

# === V5 诊断:输出每日诊断摘要 ===
g.diag_day_count += 1
if g.diag_day_count % 5 == 0:
print_diagnostic_summary(context, final_list)

log.info('【卖出执行】...')
execute_sell_trades(context)

log.info('⏸️ 【14:45卖出】完毕!')


# ==================== 下午买入 ====================
def afternoon_buy_trades(context, data=None):
log.info('▶️ 【14:46买入】启动...')

# V6.3 入场择时:检查大盘环境
if g.enable_entry_timing:
_check_entry_timing(context)

log.info('【买入执行】...')
execute_buy_trades(context, data)

g.stop_loss_triggered_today = False
g.trading_day_count += 1
# 行业冷却计数器衰减
cooled = [ind for ind, rem in g.stopped_industries.items()]
for ind in cooled:
g.stopped_industries[ind] -= 1
if g.stopped_industries[ind] <= 0:
del g.stopped_industries[ind]
log.info(f'✅ 【行业冷却结束】{ind} 冷却期满')
g._trading_executed_today = True

# 重置半仓模式
g.entry_half_position = False

log.info('⏸️ 【14:46买入】完毕!')


# ==================== 收盘后运行 ====================
def after_trading_end(context, data):
# ptrade: context.portfolio.positions 替代 context.portfolio.stock_account.positions
positions = context.portfolio.positions
holding = [s for s, pos in positions.items() if pos.amount > 0]
if holding:
log.info(f'当日持仓: {", ".join(holding)} (共{len(holding)}只)')
else:
log.info('当日无持仓')
log.info(f'账户总价值: {context.portfolio.portfolio_value:.2f}, '
f'可用资金: {context.portfolio.cash:.2f}')


# ==================== V5 诊断函数 ====================
def print_diagnostic_summary(context, final_list):
"""
打印详细的诊断信息,帮助分析策略问题
"""
log.info('=' * 80)
log.info(f'📊 V5诊断报告 - 第{g.diag_day_count}个交易日')
log.info(f' 账户净值: {context.portfolio.portfolio_value:,.0f}')
log.info(f' 滤波器状态: {g.current_filter} | 风险状态: {g.risk_state}')
log.info(f' 震荡期天数: {g.range_bound_days_count}')
log.info(f' 累计止损次数: {g.diag_stop_loss_count}')
log.info(f' 累计RSRS过滤: {g.diag_rsrs_filter_count}')

# ptrade: context.portfolio.positions
positions = context.portfolio.positions
holding = [s for s, pos in positions.items() if pos.amount > 0]
target_hold = g.current_holdings_num if g.current_holdings_num > 0 else g.holdings_num
log.info(f' 当前持仓: {len(holding)}只 (目标{target_hold}只)')
log.info(f' 市场宽度: {g.breadth_state} | 换仓阈值: {g.current_switch_threshold:.0%} | '
f'连胜/连败: {g.win_streak}/{g.lose_streak} | 半仓模式: {g.entry_half_position}')

# 分析最终排名结果
if final_list:
log.info(f'\n📈 最终目标ETF ({len(final_list)}只):')
for item in final_list:
score_key = 'composite_score' if g.enable_rsrs_synergy else 'momentum_score'
cs = item.get(score_key, 0)
ms = item.get('momentum_score', 0)
rs = item.get('rsrs_raw', 0)
rstd = item.get('rsrs_std', 0)
rq = item.get('rsrs_quality', 0)
r2 = item.get('r_squared', 0)
ann = item.get('annualized_returns', 0)

# 计算RSRS对得分的影响
if g.enable_rsrs_synergy and ms != 0:
w = g.rsrs_quality_weight
raw_contribution = ms * (1 - w)
rsrs_contribution = rq * w * abs(ms)
rsrs_impact_pct = rsrs_contribution / abs(cs) * 100 if cs != 0 else 0
log.info(f' {item["etf"]} {item["etf_name"]}:')
log.info(f' 动量得分={ms:.4f} (年化={ann:.2%}, R²={r2:.3f})')
log.info(f' RSRS: raw={rs:.4f}, std={rstd:.2f}, quality={rq:.3f}')
log.info(f' 综合得分={cs:.4f} = 动量贡献{raw_contribution:.4f} + RSRS贡献{rsrs_contribution:.4f} (RSRS占比{rsrs_impact_pct:.1f}%)')
else:
log.info(f' {item["etf"]} {item["etf_name"]}: 得分={cs:.4f}, R²={r2:.3f}')
else:
log.info(' 无目标ETF')

log.info('=' * 80)


# ==================== 震荡期状态初始化 ====================
def init_range_bound_status(context):
if not g.enable_range_bound_mode:
return
log.info('🔍 【首次运行】初始化震荡期状态...')
try:
prev_date = _get_prev_date_str(context)
lookback = max(g.ma_period, g.lookback_high_low_days) + 30
# ptrade: get_history 替代 get_price
df = get_history(
count=lookback, frequency='1d', field=['close', 'high', 'low'],
security_list=g.risk_benchmark, fq='pre'
)
if df is None or len(df) < max(g.ma_period, g.lookback_high_low_days):
log.info(f'【首次运行】数据不足,保持正常期')
return

close = df['close'].values
high = df['high'].values
low = df['low'].values
current_price = close[-1]

if len(close) >= g.lookback_high_low_days:
recent_high = np.max(high[-g.lookback_high_low_days:])
recent_low = np.min(low[-g.lookback_high_low_days:])
else:
recent_high = np.max(high)
recent_low = np.min(low)

ma = np.mean(close[-g.ma_period:])
bias = (current_price - ma) / ma if ma > 0 else 0
rise_from_low = (current_price - recent_low) / recent_low if recent_low > 0 else 0
current_rsi = _calc_rsi(close, period=14)

should_enter = False
signals = []

if g.enable_bias_trigger and bias > g.bias_threshold:
should_enter = True
signals.append(f'乖离率{bias:.2%}>{g.bias_threshold:.0%}')

if g.enable_rsi_trigger and current_rsi is not None and len(close) >= 15:
prev_rsi = _calc_rsi(close[:-1], period=14)
if prev_rsi is not None and prev_rsi > g.rsi_overbought and current_rsi < g.rsi_pullback:
should_enter = True
signals.append(f'RSI超买回落{prev_rsi:.1f}→{current_rsi:.1f}')

if should_enter:
g.current_filter = 'range_bound'
g.risk_state = 'range_bound'
g.range_bound_start_date = prev_date
g.range_bound_start_count = 0
g.range_bound_days_count = 0
g.diag_range_bound_entries += 1
log.info(f'🔔 【首次运行】初始化进入震荡期: {"; ".join(signals)}')
else:
g.current_filter = 'laplace'
g.risk_state = 'normal'
if len(close) >= g.lookback_high_low_days:
g.previous_drawdown = (recent_high - current_price) / recent_high if recent_high > 0 else 0
else:
g.previous_drawdown = 0
g.previous_rsi = current_rsi
log.info(f'📌 【首次运行】初始状态: 正常期, 乖离率:{bias:.2%}, RSI:{current_rsi:.1f}')
except Exception as e:
log.info(f'【首次运行】初始化异常: {e},保持正常期')


# ==================== 晨间流水线 ====================
def morning_routine(context):
log.info('★' * 60)
log.info('▶️ 【晨间准备】启动...')

check_positions(context)
monitor_drawdown(context)

# V6.3 市场宽度计算
if g.enable_market_breadth:
_update_market_breadth(context)

# V6.3 动态换仓阈值
_update_switch_threshold(context)

filter_fixed_pool_by_volume(context)
update_dynamic_pool(context)
daily_merge_etf_pools(context)

log.info('⏸️ 【晨间准备】完毕!')


# ==================== 持仓检查 ====================
def check_positions(context):
# ptrade: context.portfolio.positions
positions = context.portfolio.positions
for security, pos in positions.items():
if pos.amount > 0:
name = _get_name(security)
log.info(f'📊 【持仓】{security} {name}, 数量:{pos.amount}, '
f'成本:{pos.cost_basis:.3f}')


def monitor_drawdown(context):
try:
# ptrade: portfolio_value 替代 total_value
current_value = context.portfolio.portfolio_value
if current_value > g.max_portfolio_value:
g.max_portfolio_value = current_value
if g.max_portfolio_value > 0:
current_dd = (g.max_portfolio_value - current_value) / g.max_portfolio_value
if current_dd >= g.drawdown_threshold:
log.info(f'【回撤预警】回撤 {current_dd:.2%} '
f'(阈值:{g.drawdown_threshold:.0%}), '
f'净值:{current_value:,.0f}, 最高:{g.max_portfolio_value:,.0f}')
# V6.2 动态仓位:每次监控回撤时更新仓位参数
_update_dynamic_position_params(current_dd)
except Exception as e:
log.info(f'【回撤监控】异常: {e}')


def _update_dynamic_position_params(current_dd):
"""V6.2+V6.3 根据回撤+市场宽度+胜率动态调整持仓数和单只仓位比例"""
# 回撤分级 -> 持仓数 & 单只仓位
if current_dd <= 0.02:
target = g.max_holdings_num # 6
pos_pct = g.min_position_pct # 18%
elif current_dd <= 0.05:
target = 5
pos_pct = 0.22
elif current_dd <= 0.10:
target = 4
pos_pct = 0.25
elif current_dd <= 0.15:
target = g.min_holdings_num # 3
pos_pct = g.max_position_pct # 30%
else:
target = 2
pos_pct = g.max_position_pct

# V6.3 市场宽度覆盖:弱势市场强制降仓
if g.breadth_state == 'crash':
target = min(target, 2)
pos_pct = min(pos_pct, 0.25)
elif g.breadth_state == 'weak':
target = max(target - 1, 2)

# V6.3 胜率自适应:连续亏损降仓,连续盈利加仓
if g.enable_win_rate_adaptive:
if g.lose_streak >= 2:
target = max(target - 1, 2)
pos_pct = pos_pct * 0.85
elif g.win_streak >= 3:
target = min(target + 1, g.max_holdings_num)
pos_pct = min(pos_pct * 1.1, g.max_position_pct)

# 仅在参数变化时输出日志
if target != g.current_holdings_num or abs(pos_pct - g.current_position_pct) > 0.01:
old = f'{g.current_holdings_num}只/{g.current_position_pct:.0%}'
g.current_holdings_num = target
g.current_position_pct = pos_pct
log.info(f'【动态仓位】回撤{current_dd:.2%} -> 目标持仓{target}只, 单只{pos_pct:.0%} (原{old})')


# ==================== 动态池更新 ====================
def update_dynamic_pool(context):
log.info('【动态池更新】开始...')
try:
prev_date = _get_prev_date_str(context)
# ptrade: get_all_securities 可能不可用,仅用固定池
try:
df_etf = get_all_securities('etf', date=prev_date)
if df_etf is not None and hasattr(df_etf, 'index') and len(df_etf) > 0:
etf_list = [s for s in df_etf.index.tolist() if not s.endswith('.OF')]
if hasattr(df_etf, 'display_name'):
if hasattr(df_etf['display_name'], 'to_dict'):
g.etf_names_dict = df_etf['display_name'].to_dict()
elif isinstance(df_etf['display_name'], dict):
g.etf_names_dict = df_etf['display_name']
log.info(f'【动态池更新】全市场ETF: {len(etf_list)}只')
else:
log.info('【动态池更新】get_all_securities不可用,仅用固定池')
g.dynamic_etf_pool = []
return
except Exception as e:
log.info(f'【动态池更新】get_all_securities不可用: {e},仅用固定池')
g.dynamic_etf_pool = []
return

threshold = g.avg_etf_money_threshold if g.avg_etf_money_threshold is not None else 1e7

# ptrade: get_history 替代 get_price
df = get_history(
count=3, frequency='1d', field='money',
security_list=etf_list, fq=None
)

if _is_empty(df):
log.info('【动态池更新】无法获取成交额')
g.dynamic_etf_pool = []
return

code_total_money = {}
if isinstance(df, dict):
for code, code_df in df.items():
if isinstance(code_df, pd.DataFrame):
code_total_money[code] = code_df['money'].sum()
elif isinstance(code_df, dict):
code_total_money[code] = sum(code_df.get('money', []))

if not code_total_money:
log.info('【动态池更新】无法解析成交额')
g.dynamic_etf_pool = []
return

avg_daily_money = {c: v / 3.0 for c, v in code_total_money.items()}
qualified = {c: v for c, v in avg_daily_money.items() if v > threshold}
qualified_sorted = sorted(qualified.items(), key=lambda x: x[1], reverse=True)

log.info(f'【动态池更新】流动性过滤: {len(etf_list)}→{len(qualified)}只')

exclude_keywords = [
'300', '500', '1000', '2000', '800', '30', '50', '100', '180', '200',
'沪深', '中证', '上证', '深证', '深成', 'A50', 'A100', 'A500', '深100',
'短融', '可转债', '转债', '双债', '利率债', '国债', '地债', '政金债',
'国开债', '信用债', '企业债', '公司债', '城投债', '城投', '美元债',
'货币', '现金', '快线', '快钱', 'ESG', 'MSCI', '债',
]

industry_map = {}
for code, _ in qualified_sorted:
name = g.etf_names_dict.get(code, '')
skipped = False
for kw in exclude_keywords:
if kw in name:
skipped = True
break
if skipped:
continue
key = name[:2] if len(name) >= 2 else name
if key not in industry_map:
industry_map[key] = []
industry_map[key].append((code, name, avg_daily_money.get(code, 0)))

pool = []
for key, items in industry_map.items():
items.sort(key=lambda x: x[2], reverse=True)
pool.append(items[0][0])

g.dynamic_etf_pool = pool[:100]
log.info(f'【动态池更新完成】动态池: {len(g.dynamic_etf_pool)}只')

except Exception as e:
log.info(f'【动态池更新】异常: {e}')
g.dynamic_etf_pool = []


# ==================== 固定池流动性过滤 ====================
def filter_fixed_pool_by_volume(context):
log.info('【固定池过滤】开始...')

if not g.fixed_etf_pool:
return

threshold = g.avg_etf_money_threshold if g.avg_etf_money_threshold is not None else 1e7

try:
prev_date = _get_prev_date_str(context)

# ptrade: get_history 替代 get_price
df = get_history(
count=3, frequency='1d', field='money',
security_list=g.fixed_etf_pool, fq=None
)

if _is_empty(df):
log.info('【固定池过滤】无法获取成交额,保留全部')
g.filtered_fixed_pool = g.fixed_etf_pool[:]
return

code_total_money = {}
if isinstance(df, dict):
for code, code_df in df.items():
if isinstance(code_df, pd.DataFrame):
code_total_money[code] = code_df['money'].sum()
elif isinstance(code_df, dict):
code_total_money[code] = sum(code_df.get('money', []))

if not code_total_money:
log.info('【固定池过滤】无法解析成交额,保留全部')
g.filtered_fixed_pool = g.fixed_etf_pool[:]
return

avg_daily_money = {c: v / 3.0 for c, v in code_total_money.items()}
qualified = [c for c, v in avg_daily_money.items() if v > threshold]
g.filtered_fixed_pool = qualified

removed = set(g.fixed_etf_pool) - set(g.filtered_fixed_pool)
if removed:
log.info(f'【固定池过滤】剔除{len(removed)}只低流动性ETF')
log.info(f'【固定池过滤】保留{len(g.filtered_fixed_pool)}只')

except Exception as e:
log.info(f'【固定池过滤】异常: {e}')
g.filtered_fixed_pool = g.fixed_etf_pool[:]


# ==================== 合并ETF池 ====================
def daily_merge_etf_pools(context):
merged = list(set(g.filtered_fixed_pool + g.dynamic_etf_pool))
merged = [s for s in merged if not s.endswith('.OF')]
merged.sort()
log.info(f'【合并池】固定:{len(g.filtered_fixed_pool)}只, '
f'动态:{len(g.dynamic_etf_pool)}只, 合并:{len(merged)}只')
g.merged_etf_pool = merged


# ==================== 震荡期退出检查 ====================
def check_and_exit_range_bound_mode(context):
if g.current_filter != 'range_bound':
return

log.info('🔍 【震荡期退出检查】...')

try:
prev_date = _get_prev_date_str(context)
lookback = max(g.ma_period, g.lookback_high_low_days) + 30
# ptrade: get_history 替代 get_price
df = get_history(
count=lookback, frequency='1d', field=['close', 'high', 'low'],
security_list=g.risk_benchmark, fq='pre'
)
if df is None or len(df) < max(g.ma_period, g.lookback_high_low_days):
return

close = df['close'].values
high = df['high'].values
low = df['low'].values
current_price = close[-1]

recent_high = np.max(high[-g.lookback_high_low_days:]) if len(high) >= g.lookback_high_low_days else np.max(high)
recent_low = np.min(low[-g.lookback_high_low_days:]) if len(low) >= g.lookback_high_low_days else np.min(low)

current_drawdown = (recent_high - current_price) / recent_high if recent_high > 0 else 0
rise_from_low = (current_price - recent_low) / recent_low if recent_low > 0 else 0

ma = np.mean(close[-g.ma_period:])
current_rsi = _calc_rsi(close, period=14)

recovery_signals = []

if g.enable_low_point_rise_trigger and rise_from_low >= g.low_point_rise_threshold:
recovery_signals.append(f'从低点上涨{rise_from_low:.2%}')

if g.enable_stable_signal_trigger:
if current_price > ma:
recovery_signals.append('价格站上均线')
if len(close) >= 2 and close[-1] > close[-2]:
recovery_signals.append('价格回升')
if g.previous_drawdown is not None and current_drawdown < g.previous_drawdown:
recovery_signals.append('回撤收窄')
if current_rsi is not None and g.previous_rsi is not None and current_rsi > g.previous_rsi:
recovery_signals.append('RSI回升')

if current_drawdown < g.drawdown_recovery:
g.stable_days += 1
else:
g.stable_days = 0

g.previous_drawdown = current_drawdown
g.previous_rsi = current_rsi

range_bound_days = g.trading_day_count - g.range_bound_start_count
if range_bound_days >= g.max_range_bound_days:
recovery_signals.append(f'震荡期满({range_bound_days}天)')

low_rise = g.enable_low_point_rise_trigger and rise_from_low >= g.low_point_rise_threshold
stable = (g.enable_stable_signal_trigger and
current_drawdown < g.drawdown_recovery and
len(recovery_signals) >= 2 and g.stable_days >= 2)
force_exit = range_bound_days >= g.max_range_bound_days

if low_rise or stable or force_exit:
can_switch = True
if g.last_switch_date is not None:
days_since = g.trading_day_count - g.last_switch_date
if days_since < g.filter_switch_cooldown:
can_switch = False
log.info(f'⏳ 【冷却期】距上次切换{days_since}天')

if can_switch:
g.current_filter = 'laplace'
g.risk_state = 'normal'
g.last_switch_date = g.trading_day_count
g.range_bound_start_date = None
g.range_bound_start_count = 0
g.range_bound_days_count = 0
g.stable_days = 0
log.info(f'🔔 【退出震荡期】切换回拉普拉斯: {"; ".join(recovery_signals)}')
else:
log.info('📌 【退出检查】未满足退出条件,保持震荡期')

except Exception as e:
log.info(f'【退出检查】异常: {e}')


# ==================== 震荡期进入检查 ====================
def check_and_enter_range_bound_mode(context):
if not g.enable_range_bound_mode:
return
if g.current_filter == 'range_bound':
return

can_switch = True
if g.last_switch_date is not None:
days_since = g.trading_day_count - g.last_switch_date
if days_since < g.filter_switch_cooldown:
can_switch = False

if not can_switch:
return

log.info('🔍 【震荡期进入检查】...')
risk_signals = []

try:
prev_date = _get_prev_date_str(context)

# ptrade: get_history 替代 get_price
df = get_history(
count=g.ma_period + 20, frequency='1d', field=['close'],
security_list=g.risk_benchmark, fq='pre'
)
if df is not None and len(df) >= g.ma_period:
close = df['close'].values
current_price = close[-1]

if g.enable_bias_trigger:
ma = np.mean(close[-g.ma_period:])
bias = (current_price - ma) / ma
if bias > g.bias_threshold:
risk_signals.append(f'乖离率{bias:.2%}>{g.bias_threshold:.0%}')

if g.enable_rsi_trigger and len(close) >= 15:
current_rsi = _calc_rsi(close, period=14)
prev_rsi = _calc_rsi(close[:-1], period=14)
if prev_rsi is not None and prev_rsi > g.rsi_overbought and current_rsi < g.rsi_pullback:
risk_signals.append(f'RSI超买回落({prev_rsi:.1f}→{current_rsi:.1f})')

except Exception as e:
log.info(f'【进入检查】异常: {e}')

if g.enable_stop_loss_trigger and g.stop_loss_triggered_today:
risk_signals.append('今日触发止损')

if len(risk_signals) > 0:
g.current_filter = 'range_bound'
g.risk_state = 'range_bound'
g.last_switch_date = g.trading_day_count
g.range_bound_start_date = context.blotter.current_dt.strftime('%Y%m%d')
g.range_bound_start_count = g.trading_day_count
g.range_bound_days_count = 0
g.stable_days = 0
g.diag_range_bound_entries += 1
log.info(f'🔔 【进入震荡期】切换高斯滤波器: {"; ".join(risk_signals)}')
else:
log.info('✅ 【进入检查】保持正常期(拉普拉斯)')


def _get_industry(etf_name):
"""从ETF名称提取行业标识(前2个字符)"""
if not etf_name:
return '其他'
return etf_name[:2] if len(etf_name) >= 2 else etf_name


def _apply_industry_filter(candidates, rank_key):
"""行业分散约束:同一行业最多g.max_same_industry只,按得分从高到低贪心选择"""
if not candidates:
return candidates
max_per_industry = g.max_same_industry
industry_count = {}
result = []
target_hold = g.current_holdings_num if g.current_holdings_num > 0 else g.holdings_num
for item in candidates:
industry = _get_industry(item.get('etf_name', ''))
if industry_count.get(industry, 0) >= max_per_industry:
continue
industry_count[industry] = industry_count.get(industry, 0) + 1
result.append(item)
if len(result) >= target_hold:
break
return result


# ==================== 动量计算核心 ====================
def calculate_momentum_score(price_series, lookback_days):
if len(price_series) < lookback_days + 1:
return None, None, None

recent = price_series[-(lookback_days + 1):]
y = np.log(recent)
x = np.arange(len(y))
weights = np.linspace(1, 2, len(y))

slope, intercept = np.polyfit(x, y, 1, w=weights)

# V6 fix: 提高年化收益率上限,使用对数压缩防止高分过度拉开差距
MAX_ANNUALIZED_RETURN = 10.0 # 1000%年化上限(从500%提高到1000%)
raw_annualized = min(math.exp(slope * 250) - 1, MAX_ANNUALIZED_RETURN)
# 对数压缩:超过300%的收益进行对数压缩,保持相对排序同时缩小绝对差距
if raw_annualized > 3.0:
# log(1+x) * 3/log(4) 使得 x=3 时值为3, x=10 时值约5.2
annualized_returns = 3.0 * (1 + math.log(1 + (raw_annualized - 3.0)) / math.log(4))
else:
annualized_returns = raw_annualized

ss_res = np.sum(weights * (y - (slope * x + intercept)) ** 2)
ss_tot = np.sum(weights * (y - np.mean(y)) ** 2)
r_squared = 1 - ss_res / ss_tot if ss_tot else 0

# V6 fix: 动量得分下限,防止负分被RSRS质量因子放大
MIN_MOMENTUM_SCORE = -1.0
momentum_score = max(annualized_returns * r_squared, MIN_MOMENTUM_SCORE)
return momentum_score, annualized_returns, r_squared


def calculate_rsrs(highs, lows, N=18):
try:
if len(highs) < N or len(lows) < N:
return None, None, None
t = np.arange(N)
b_support, _ = np.polyfit(t, lows[-N:], 1)
b_resist, _ = np.polyfit(t, highs[-N:], 1)
if abs(b_resist) < 1e-10:
return None, b_support, b_resist
return b_support / b_resist, b_support, b_resist
except Exception:
return None, None, None


def calculate_rsrs_std(rsrs_series, M=60):
try:
if len(rsrs_series) < M:
return None
recent = rsrs_series[-M:]
mean = np.mean(recent)
std = np.std(recent)
if std < 1e-10:
return None
return (recent[-1] - mean) / std
except Exception:
return None


def calc_market_volatility(benchmark_closes, lookback=30):
try:
if len(benchmark_closes) < lookback + 1:
return None
recent = benchmark_closes[-(lookback + 1):]
returns = np.diff(np.log(recent))
return np.std(returns) * np.sqrt(252)
except Exception:
return None


def get_rsrs_quality(rsrs_std, rsrs_raw, b_support, b_resist):
if rsrs_std is None:
return 0.5, False

quality = 1.0 / (1.0 + np.exp(-rsrs_std))

direction_ok = True
if g.enable_rsrs_direction:
if b_support is not None and b_resist is not None:
if b_support < g.rsrs_min_slope or b_resist < g.rsrs_min_slope:
direction_ok = False
quality *= 0.5

return quality, direction_ok


def get_volume_ratio(hist_volumes, today_vol, lookback_days=None):
if lookback_days is None:
lookback_days = g.volume_lookback
try:
if hist_volumes is None or len(hist_volumes) < lookback_days:
return None
past_vol = hist_volumes[-lookback_days:]
if np.any(np.isnan(past_vol)) or np.any(past_vol == 0):
return None
avg_vol = np.mean(past_vol)
if avg_vol == 0:
return None
return today_vol / avg_vol
except Exception:
return None


def calculate_premium_rate(etf, current_price, context):
try:
prev_date = _get_prev_date_str(context)
try:
# ptrade: get_extras 获取基金净值
nav_df = get_extras('unit_net_value', etf, start_date=prev_date, end_date=prev_date)
if nav_df is not None and len(nav_df) > 0:
if hasattr(nav_df, 'iloc'):
nav_val = nav_df.iloc[-1]
if hasattr(nav_val, 'values'):
nav_val = nav_val.values[0]
else:
nav_val = nav_df

if nav_val is not None and nav_val > 0:
premium_rate = (current_price - nav_val) / nav_val * 100
passed = premium_rate <= g.max_premium_rate
return premium_rate, passed
except Exception:
pass

return None, True

except Exception:
return None, True


# ==================== 滤波器 ====================
def laplace_filter(price, s=0.05):
alpha = 1 - np.exp(-s)
L = np.zeros(len(price))
L[0] = price[0]
for t in range(1, len(price)):
L[t] = alpha * price[t] + (1 - alpha) * L[t - 1]
return L


def gaussian_filter(price, sigma=1.2):
n = len(price)
G = np.zeros(n)
for t in range(n):
weights = np.array([np.exp(-((i + 1) ** 2) / (2 * sigma ** 2)) for i in range(t + 1)])
weights = weights[::-1]
weights = weights / np.sum(weights)
G[t] = np.sum(price[:t + 1] * weights)
return G


def _calc_rsi(close, period=14):
try:
if len(close) < period + 1:
return None
deltas = np.diff(close)
gains = np.where(deltas > 0, deltas, 0)
losses = np.where(deltas < 0, -deltas, 0)
avg_gain = np.mean(gains[-period:])
avg_loss = np.mean(losses[-period:])
if avg_loss == 0:
return 100
rs = avg_gain / avg_loss
return 100 - (100 / (1 + rs))
except Exception:
return None


# ==================== 单个ETF指标计算 ====================
def calculate_all_metrics(etf, etf_name, hist_closes, hist_volumes,
current_price, today_vol, hist_highs=None, hist_lows=None,
context=None, benchmark_volatility=None):
try:
price_series = np.append(hist_closes, current_price)

mom_score, ann_ret, r2 = calculate_momentum_score(price_series, g.lookback_days)
if mom_score is None:
return None

passed_momentum = mom_score >= g.min_score_threshold

short_score, short_ret, short_r2 = calculate_momentum_score(
price_series, g.short_momentum_lookback)

passed_short = short_score >= g.short_momentum_min_score if short_score is not None else False

vol_ratio = get_volume_ratio(hist_volumes, today_vol, g.volume_lookback)

# RSRS趋势过滤
rsrs_raw = None
rsrs_std = None
rsrs_quality = 0.5 # default
rsrs_direction_ok = True
b_support = None
b_resist = None
passed_rsrs = True
rsrs_data_sufficient = False
if hist_highs is not None and hist_lows is not None and g.enable_rsrs_filter:
min_rsrs_len = g.rsrs_std_lookback + g.rsrs_lookback + 1
if len(hist_highs) >= min_rsrs_len:
rsrs_data_sufficient = True
rsrs_history = []
n_total = len(hist_highs)
for i in range(g.rsrs_lookback, n_total + 1):
r, bs, br = calculate_rsrs(hist_highs[:i], hist_lows[:i], g.rsrs_lookback)
if r is not None:
rsrs_history.append(r)
b_support = bs
b_resist = br
if len(rsrs_history) >= g.rsrs_std_lookback:
rsrs_raw = rsrs_history[-1]
rsrs_std = calculate_rsrs_std(rsrs_history, g.rsrs_std_lookback)

rsrs_threshold = g.rsrs_base_threshold
if g.enable_rsrs_adaptive and benchmark_volatility is not None:
if benchmark_volatility > g.rsrs_vol_threshold_high:
rsrs_threshold = g.rsrs_base_threshold * (1 + g.rsrs_vol_adjust_factor)

if g.enable_rsrs_direction and b_support is not None and b_resist is not None:
rsrs_direction_ok = (b_support >= g.rsrs_min_slope and
b_resist >= g.rsrs_min_slope)
if not rsrs_direction_ok:
passed_rsrs = False
elif rsrs_std is not None:
passed_rsrs = rsrs_std > rsrs_threshold

if g.enable_rsrs_synergy:
rsrs_quality, rsrs_direction_ok = get_rsrs_quality(
rsrs_std, rsrs_raw, b_support, b_resist)

# 短期风控
passed_loss = True
day_ratios = []
if len(price_series) >= 4:
d1 = price_series[-1] / price_series[-2]
d2 = price_series[-2] / price_series[-3]
d3 = price_series[-3] / price_series[-4]
day_ratios = [d1, d2, d3]
if min(day_ratios) < g.loss:
passed_loss = False

premium_rate, passed_premium = calculate_premium_rate(etf, current_price, context)

laplace_val, laplace_slope, passed_laplace = 0, 0, False
gaussian_val, gaussian_slope, passed_gaussian = 0, 0, False

if len(price_series) >= 10:
try:
laplace_vals = laplace_filter(price_series, s=g.laplace_s_param)
if len(laplace_vals) >= 2:
laplace_val = laplace_vals[-1]
laplace_slope = laplace_vals[-1] - laplace_vals[-2]
passed_laplace = (current_price > laplace_val and
laplace_slope > g.laplace_min_slope)

gaussian_vals = gaussian_filter(price_series, sigma=g.gaussian_sigma)
if len(gaussian_vals) >= 2:
gaussian_val = gaussian_vals[-1]
gaussian_slope = gaussian_vals[-1] - gaussian_vals[-2]
passed_gaussian = (current_price > gaussian_val and
gaussian_slope > g.gaussian_min_slope)
except Exception:
pass

if g.current_filter == 'laplace':
filter_val = laplace_val
filter_slope = laplace_slope
passed_filter = passed_laplace
else:
filter_val = gaussian_val
filter_slope = gaussian_slope
passed_filter = passed_gaussian

return {
'etf': etf,
'etf_name': etf_name,
'momentum_score': mom_score,
'short_momentum_score': short_score,
'annualized_returns': ann_ret,
'r_squared': r2,
'current_price': current_price,
'volume_ratio': vol_ratio,
'day_ratios': day_ratios,
'premium_rate': premium_rate,
'rsrs_raw': rsrs_raw,
'rsrs_std': rsrs_std,
'rsrs_quality': rsrs_quality,
'rsrs_direction_ok': rsrs_direction_ok,
'rsrs_data_sufficient': rsrs_data_sufficient,
'b_support': b_support,
'b_resist': b_resist,
'passed_momentum': passed_momentum,
'passed_short_momentum': passed_short,
'passed_r2': r2 > g.r2_threshold,
'passed_volume': vol_ratio is not None and vol_ratio < g.volume_threshold,
'passed_loss': passed_loss,
'passed_premium': passed_premium,
'passed_rsrs': passed_rsrs,
'laplace_value': laplace_val,
'laplace_slope': laplace_slope,
'gaussian_value': gaussian_val,
'gaussian_slope': gaussian_slope,
'passed_laplace': passed_laplace,
'passed_gaussian': passed_gaussian,
'filter_value': filter_val,
'filter_slope': filter_slope,
'passed_filter': passed_filter,
}
except Exception as e:
log.info(f'【指标计算】{etf} 失败: {e}')
return None


# ==================== 过滤应用 ====================
def apply_filters(metrics_list, context=None):
use_short = g.use_short_momentum_period
# V6.4 精简筛选链:
# - 去掉RSRS硬过滤:RSRS已通过synergy作为软调整因子参与排名,一票否决属于重复过滤
# - 去掉R²硬过滤:R²已是动量得分=年化收益×R²的乘数,单独过滤等于双重过滤同一因子
# - 正常期关闭动态滤波:正常期已有RSRS synergy+动量得分双重保障,仅在震荡期启用
steps = [
# ('RSRS趋势', lambda m: m['passed_rsrs'], False), # V6.4: 移除,保留synergy软调整
# ('R²', lambda m: m['passed_r2'], False), # V6.4: 移除,R²已融入动量得分
('动态滤波', lambda m: m['passed_filter'], g.current_filter == 'range_bound'), # V6.4: 仅震荡期启用
('主周期动量', lambda m: m['passed_momentum'], not use_short),
('短期动量', lambda m: m['passed_short_momentum'], use_short),
('成交量', lambda m: m['passed_volume'], g.enable_volume_check),
('短期风控', lambda m: m['passed_loss'], g.enable_loss_filter),
('溢价率', lambda m: m['passed_premium'], g.enable_premium_filter),
]

filtered = metrics_list[:]
for name, condition, enabled in steps:
if enabled:
before = len(filtered)
filtered = [m for m in filtered if condition(m)]
after = len(filtered)
if name == 'RSRS趋势':
g.diag_rsrs_filter_count += (before - after)
if before > after:
log.info(f'【过滤】{name}: {after}/{before} (淘汰{before - after}只)')

return filtered


…………


发布于 2 天前

免责声明:

本文由 阿凡 原创发布于 百果量化交流平台 ,著作权归作者所有。

登录一下,更多精彩内容等你发现,贡献精彩回答,参与评论互动

登录! 还没有账号?去注册

阿凡
2 天前
核心代码都贴出来了,后面还有些辅助的函数,就没有贴了。 有技术的兄弟可以自己补齐就行。 如果想省时间,可以直接在另外一个付费贴付费下载