DEV Community

Henry Lin
Henry Lin

Posted on

Qlib源码分析-Top-K实例讲解

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

"""
TopkDropoutStrategy 和 SimulatorExecutor 源码分析演示

本示例展示如何使用 TopkDropoutStrategy 和 SimulatorExecutor 进行量化投资回测,
并深入解析其内部工作机制。

"""

import sys
import warnings
import numpy as np
import pandas as pd
from pathlib import Path

# Put the project root (two directory levels above this file) at the front of
# the import path so project modules can be imported when run as a script.
sys.path.insert(0, str(Path(__file__).parent.parent))

import qlib
from qlib.config import REG_CN
from qlib.contrib.data.handler import Alpha158
from qlib.contrib.model.gbdt import LGBModel
from qlib.contrib.strategy.signal_strategy import TopkDropoutStrategy
from qlib.backtest.executor import SimulatorExecutor
from qlib.backtest.account import Account
from qlib.backtest.exchange import Exchange
from qlib.data.dataset import DatasetH
from qlib.workflow import R
from qlib.backtest import backtest
from qlib.utils import init_instance_by_config
from qlib.model.trainer import TrainerR

# 抑制警告信息

warnings.filterwarnings('ignore')

def initialize_qlib():
    """Initialize the Qlib runtime against the local CN data directory.

    Returns:
        bool: True when ``qlib.init`` completes, False when it raises. The
        error is printed instead of being propagated so the caller can exit
        cleanly.
    """
    print("🚀 初始化Qlib环境...")
    try:
        # Initialize Qlib with locally stored data.
        provider_uri = "~/.qlib/qlib_data/cn_data"  # data directory
        qlib.init(provider_uri=provider_uri, region=REG_CN)
        print("✅ Qlib初始化成功")
        return True
    except Exception as e:
        # Top-level boundary for the demo: report the failure and signal it.
        print(f"❌ Qlib初始化失败: {e}")
        return False

def create_dataset():
    """Build the Alpha158 feature dataset used by the demo.

    Creates an ``Alpha158`` handler over CSI300 for calendar year 2020 and
    wraps it in a ``DatasetH`` with train / valid / test segments.

    Returns:
        DatasetH: The dataset with "train", "valid" and "test" segments.
    """
    print("\n📊 创建Alpha158特征数据集...")

    # Alpha158 data-handler configuration.
    handler_config = {
        "start_time": "2020-01-01",
        "end_time": "2020-12-31",
        "fit_start_time": "2020-01-01",
        "fit_end_time": "2020-08-31",
        "instruments": "csi300",  # CSI300 constituents
        "infer_processors": [
            {
                "class": "RobustZScoreNorm",
                "kwargs": {"fields_group": "feature", "clip_outlier": True},
            },
            {"class": "Fillna", "kwargs": {"fields_group": "feature"}},
        ],
        "learn_processors": [
            {"class": "DropnaLabel"},
            {"class": "CSRankNorm", "kwargs": {"fields_group": "label"}},
        ],
        # NOTE(review): with Qlib's Ref semantics (negative offset = future)
        # this looks like the one-day return from T+1 to T+2, while the
        # message printed below describes it as a 2-day return; confirm.
        "label": ["Ref($close, -2) / Ref($close, -1) - 1"],
    }

    # Create the Alpha158 handler.
    handler = Alpha158(**handler_config)

    # Date ranges for each dataset segment.
    segments = {
        "train": ("2020-01-01", "2020-06-30"),
        "valid": ("2020-07-01", "2020-08-31"),
        "test": ("2020-09-01", "2020-12-31"),
    }

    # Create the dataset.
    dataset = DatasetH(handler, segments=segments)

    print("✅ 数据集创建成功")
    print("   - 特征维度: Alpha158 (158个技术指标)")
    print("   - 标的池: CSI300成分股")
    print("   - 时间范围: 2020-01-01 到 2020-12-31")
    print("   - 标签: 未来2日收益率")

    return dataset
Enter fullscreen mode Exit fullscreen mode

def train_model(dataset):
    """Train a LightGBM model on the given dataset.

    Args:
        dataset: Dataset object passed straight to ``LGBModel.fit``.

    Returns:
        LGBModel: The fitted model instance.
    """
    print("\n🤖 训练LightGBM预测模型...")

    # Instantiate LGBModel directly (no config-driven construction).
    model = LGBModel(
        objective="regression",
        num_leaves=60,
        learning_rate=0.1,
        feature_fraction=0.8,
        bagging_fraction=0.8,
        bagging_freq=5,
        verbose=-1,
        num_boost_round=100,
        early_stopping_rounds=10,
    )

    # Train the model.
    model.fit(dataset)

    # The values echoed below mirror the hyper-parameters hard-coded above.
    print("✅ 模型训练完成")
    print("   - 模型类型: LightGBM")
    print("   - 目标函数: 回归")
    print("   - 叶子数量: 60")
    print("   - 学习率: 0.1")

    return model
Enter fullscreen mode Exit fullscreen mode

def analyze_topk_dropout_strategy():
"""分析TopkDropoutStrategy的核心机制"""
print("\n🔍 TopkDropoutStrategy 源码机制分析")
print("=" * 60)

# 1. 核心参数解析
print("📋 核心参数说明:")
print("   • topk: 持仓股票数量 (如: 50只)")
print("   • n_drop: 每日调仓数量 (如: 5只)")
print("   • method_sell: 卖出策略")
print("     - 'bottom': 卖出得分最低的股票")
print("     - 'random': 随机卖出")
print("   • method_buy: 买入策略") 
print("     - 'top': 买入得分最高的未持有股票")
print("     - 'random': 随机买入")
print("   • hold_thresh: 最小持有天数")
print("   • only_tradable: 是否只考虑可交易股票")
print("   • forbid_all_trade_at_limit: 涨跌停时是否禁止交易")

# 2. 算法流程分析
print("\n🔄 核心算法流程:")
print("   1️⃣ 获取当前交易日的预测信号")
print("   2️⃣ 分析当前持仓,按得分排序")  
print("   3️⃣ 选择买入候选股票 (未持有的高得分股票)")
print("   4️⃣ 确定卖出股票 (避免卖高买低)")
print("   5️⃣ 生成具体的买卖订单")

# 3. 关键设计亮点
print("\n💡 关键设计亮点:")
print("   ✨ 防卖高买低: 通过comb机制确保交易合理性")
print("   ✨ 持有时间控制: hold_thresh避免过度频繁交易")
print("   ✨ 可交易性检查: 考虑股票实际可交易状态")
print("   ✨ 风险管理: risk_degree控制总体仓位")
Enter fullscreen mode Exit fullscreen mode

def analyze_simulator_executor():
    """Print a textual walkthrough of SimulatorExecutor.

    Purely informational: prints the execution modes, the main functional
    pieces and the infrastructure it integrates with. Takes no input and
    returns None.
    """
    print("\n🎮 SimulatorExecutor 执行器机制分析")
    print("=" * 60)

    # 1. Execution modes.
    # NOTE(review): the TT_PARAL text below says sell orders run first; the
    # upstream SimulatorExecutor source appears to sort buy orders first in
    # parallel mode. Confirm against the installed Qlib version before
    # relying on this description.
    print("⚡ 交易执行模式:")
    print("   • TT_SERIAL (串行模式):")
    print("     - 订单按序执行")
    print("     - 支持先卖后买 (释放资金再投资)")
    print("     - 适合日频交易")
    print("   • TT_PARAL (并行模式):")
    print("     - 订单并行执行")
    print("     - 卖单优先执行 (避免资金冲突)")
    print("     - 适合高频交易场景")

    # 2. Core functionality.
    print("\n🛠️ 核心功能模块:")
    print("   📦 订单管理: _get_order_iterator() 处理订单序列")
    print("   💰 资金管理: 实时跟踪账户资金变化")
    print("   📊 交易记录: 详细记录每笔交易的执行情况")
    print("   🔄 日内跟踪: dealt_order_amount 跟踪日内累计交易")

    # 3. Infrastructure integration.
    print("\n🔗 基础设施集成:")
    print("   🏪 Exchange: 模拟真实交易所环境")
    print("   💳 Account: 管理资金和持仓状态")
    print("   📅 Calendar: 管理交易日历和时间")
    print("   📈 Metrics: 生成交易和组合指标")
Enter fullscreen mode Exit fullscreen mode

def create_strategy_config(model, dataset):
    """Build the config dict for TopkDropoutStrategy.

    Args:
        model: Prediction model; stored untouched as the first element of
            the ``signal`` tuple.
        dataset: Dataset paired with the model; stored as the second element
            of the ``signal`` tuple.

    Returns:
        dict: A config with ``class``, ``module_path`` and ``kwargs`` keys.
    """
    print("\n⚙️ 配置TopkDropoutStrategy...")

    strategy_config = {
        "class": "TopkDropoutStrategy",
        # A bare class-name string needs a module path so Qlib's config
        # loader can locate the class; this matches the module the file
        # imports TopkDropoutStrategy from.
        "module_path": "qlib.contrib.strategy.signal_strategy",
        "kwargs": {
            "signal": (model, dataset),  # model and dataset
            "topk": 20,  # hold 20 stocks
            "n_drop": 3,  # replace 3 stocks per day
            "method_sell": "bottom",  # sell the lowest-scored holdings
            "method_buy": "top",  # buy the highest-scored candidates
            "hold_thresh": 1,  # hold for at least 1 day
            "only_tradable": True,  # consider tradable stocks only
            "forbid_all_trade_at_limit": True,  # no trading at price limits
        },
    }

    params = strategy_config["kwargs"]
    turnover_pct = params["n_drop"] / params["topk"] * 100

    print("✅ 策略配置完成")
    print(f"   - 持仓数量: {params['topk']}只")
    print(f"   - 日调仓量: {params['n_drop']}只")
    print(f"   - 换手率: {turnover_pct:.1f}%")

    return strategy_config
Enter fullscreen mode Exit fullscreen mode

def create_executor_config():
    """Build the config dict for SimulatorExecutor.

    Returns:
        dict: A config with ``class``, ``module_path`` and ``kwargs`` keys,
        set up for daily steps in serial trade mode with indicator reporting
        enabled.
    """
    print("\n⚙️ 配置SimulatorExecutor...")

    executor_config = {
        "class": "SimulatorExecutor",
        # A bare class-name string needs a module path so Qlib's config
        # loader can locate the class; this matches the module the file
        # imports SimulatorExecutor from.
        "module_path": "qlib.backtest.executor",
        "kwargs": {
            "time_per_step": "day",  # daily trading steps
            "generate_portfolio_metrics": True,  # produce portfolio metrics
            "verbose": False,  # no per-trade output
            "trade_type": SimulatorExecutor.TT_SERIAL,  # serial execution
            "indicator_config": {  # trade-indicator settings
                "show_indicator": True,
                "pa_config": {
                    "base_price": "twap",  # time-weighted average price
                    "weight_method": "mean",  # equal weighting
                },
                "ffr_config": {
                    "weight_method": "value_weighted",  # weight by value
                },
            },
        },
    }

    params = executor_config["kwargs"]

    print("✅ 执行器配置完成")
    print(f"   - 执行频率: {params['time_per_step']}")
    print(f"   - 执行模式: {params['trade_type']}")
    print("   - 指标监控: 启用价格优势和成交率监控")

    return executor_config
Enter fullscreen mode Exit fullscreen mode

def run_backtest(strategy_config, executor_config):
    """Run the backtest and print an analysis of its results.

    Args:
        strategy_config: Strategy config forwarded to ``backtest`` as
            ``strategy``.
        executor_config: Executor config forwarded to ``backtest`` as
            ``executor``.

    Returns:
        tuple: The two values returned by ``backtest``, or ``(None, None)``
        when the backtest or the analysis raises (the error is printed).
    """
    print("\n🚀 运行量化回测...")
    print("=" * 60)

    # Backtest window, account and exchange settings.
    backtest_config = {
        "start_time": "2020-09-01",
        "end_time": "2020-12-31",
        "account": 1000000,  # initial cash: 1,000,000
        "benchmark": "SH000300",  # CSI300 benchmark
        "exchange_kwargs": {
            "freq": "day",
            "limit_threshold": 0.095,  # price-limit threshold
            "deal_price": "close",  # trade at the close price
            "open_cost": 0.0005,  # 0.05% fee
            "close_cost": 0.0005,
            "min_cost": 5,  # minimum fee of 5
        },
    }

    print("📊 开始回测计算...")

    try:
        portfolio_metric_dict, indicator_dict = backtest(
            executor=executor_config,
            strategy=strategy_config,
            **backtest_config,
        )

        print("✅ 回测计算完成")

        # Summarize the results before handing them back to the caller.
        analyze_backtest_results(portfolio_metric_dict, indicator_dict)

        return portfolio_metric_dict, indicator_dict

    except Exception as e:
        # Demo boundary: report the failure instead of propagating it.
        print(f"❌ 回测执行失败: {e}")
        return None, None
Enter fullscreen mode Exit fullscreen mode

def analyze_backtest_results(portfolio_metric_dict, indicator_dict):
    """Print a summary of portfolio and trade-execution metrics.

    Args:
        portfolio_metric_dict: Mapping that may hold the keys
            ``'excess_return_without_cost'`` and ``'excess_return_with_cost'``,
            each a mapping of metric name to number. ``None`` means there is
            no result to report.
        indicator_dict: Optional mapping that may hold ``'1day.pa'`` and
            ``'1day.ffr'``; a value is printed only when it is an ``int`` or
            ``float``.

    Returns:
        None. Everything is written to stdout; any exception raised while
        reading the metrics is caught and reported together with the
        available keys.
    """
    # NOTE(review): Qlib's backtest() appears to return per-frequency
    # (report, positions) tuples rather than the keys read here; if so none
    # of the metric lines below will ever print. Confirm against the
    # installed Qlib version.
    print("\n📈 回测结果分析")
    print("=" * 60)

    if portfolio_metric_dict is None:
        print("❌ 无有效回测结果")
        return

    try:
        # Pull out the two excess-return groups; a missing group is empty.
        without_cost = portfolio_metric_dict.get('excess_return_without_cost', {})
        with_cost = portfolio_metric_dict.get('excess_return_with_cost', {})

        print("📊 投资组合表现:")
        if 'annualized_return' in without_cost:
            print(f"   📈 年化收益率 (无成本): {without_cost['annualized_return']:.2%}")
        if 'annualized_return' in with_cost:
            print(f"   💰 年化收益率 (含成本): {with_cost['annualized_return']:.2%}")
        if 'information_ratio' in with_cost:
            print(f"   📏 信息比率: {with_cost['information_ratio']:.3f}")
        if 'max_drawdown' in with_cost:
            print(f"   📉 最大回撤: {with_cost['max_drawdown']:.2%}")

        # Trade-execution indicators.
        if indicator_dict:
            print("\n💹 交易执行分析:")
            if '1day.pa' in indicator_dict:
                pa = indicator_dict['1day.pa']
                if isinstance(pa, (int, float)):
                    print(f"   ⚡ 价格优势 (PA): {pa:.4f}")
            if '1day.ffr' in indicator_dict:
                ffr = indicator_dict['1day.ffr']
                if isinstance(ffr, (int, float)):
                    print(f"   ✅ 成交率 (FFR): {ffr:.2%}")

    except Exception as e:
        print(f"⚠️ 结果分析出现问题: {e}")
        print("📋 可用指标键名:", list(portfolio_metric_dict.keys()) if portfolio_metric_dict else "无")
Enter fullscreen mode Exit fullscreen mode

def demonstrate_strategy_mechanism():
    """Walk through one Top-K dropout decision on synthetic data.

    Generates 100 fake stocks with random scores, treats the first 20 as the
    current holdings, and prints each step of the decision: ranking the
    holdings, picking buy candidates, choosing what to sell, and the final
    trade counts. Returns None.
    """
    print("\n🔬 策略机制深度解析")
    print("=" * 60)

    print("🎯 模拟TopkDropoutStrategy决策过程:")

    # Synthetic data. This seeds NumPy's global RNG so the printed numbers
    # are reproducible from run to run.
    np.random.seed(42)
    stocks = [f"股票{i:03d}" for i in range(100)]
    pred_scores = pd.Series(np.random.randn(100), index=stocks, name='score')
    current_holdings = stocks[:20]  # currently holding the first 20

    print("\n📊 当前市场状态:")
    print(f"   • 股票池大小: {len(stocks)}只")
    print(f"   • 当前持仓: {len(current_holdings)}只")
    print(f"   • 预测信号范围: [{pred_scores.min():.3f}, {pred_scores.max():.3f}]")

    # Strategy parameters for the simulation.
    topk, n_drop = 20, 3

    # Step 1: rank the current holdings by score, best first.
    current_scores = pred_scores[current_holdings].sort_values(ascending=False)
    print("\n🔹 步骤1 - 当前持仓分析:")
    print(f"   最佳持仓: {current_scores.index[0]} (得分: {current_scores.iloc[0]:.3f})")
    print(f"   最差持仓: {current_scores.index[-1]} (得分: {current_scores.iloc[-1]:.3f})")

    # Step 2: buy candidates are the best-scored stocks not yet held; the
    # count covers the stocks to be dropped plus any unfilled slots.
    not_holding = pred_scores[~pred_scores.index.isin(current_holdings)]
    buy_candidates = not_holding.sort_values(ascending=False).head(n_drop + topk - len(current_holdings))
    print("\n🔹 步骤2 - 买入候选分析:")
    print(f"   候选数量: {len(buy_candidates)}只")
    print(f"   最佳候选: {buy_candidates.index[0]} (得分: {buy_candidates.iloc[0]:.3f})")

    # Step 3: rank holdings and candidates together and sell only holdings
    # that land in the bottom n_drop, so a holding is never swapped for a
    # lower-scored candidate.
    combined = pd.concat([current_scores, buy_candidates]).sort_values(ascending=False)
    sell_candidates = current_scores[current_scores.index.isin(combined.tail(n_drop).index)]
    print("\n🔹 步骤3 - 卖出决策分析:")
    print(f"   待卖数量: {len(sell_candidates)}只")
    if len(sell_candidates) > 0:
        print(f"   将卖出: {sell_candidates.index[0]} (得分: {sell_candidates.iloc[0]:.3f})")

    # Step 4: buy enough to replace what is sold and fill any unfilled
    # slots, using the same topk - len(holdings) term as step 2 (zero for
    # the constants used here).
    actual_buy = buy_candidates.head(len(sell_candidates) + topk - len(current_holdings))
    print("\n🔹 步骤4 - 最终交易决策:")
    print(f"   卖出股票数: {len(sell_candidates)}")
    print(f"   买入股票数: {len(actual_buy)}")
    print(f"   预期换手率: {len(sell_candidates)/topk*100:.1f}%")
Enter fullscreen mode Exit fullscreen mode

def main():
    """Run the whole demo end to end.

    Initializes Qlib, prints the two explanatory walkthroughs and the
    synthetic decision example, then builds the dataset, trains the model,
    creates the strategy and executor configs and runs the backtest. Returns
    None; a failure in any step after initialization is printed with its
    traceback instead of being propagated.
    """
    print("🎯 TopkDropoutStrategy 和 SimulatorExecutor 源码分析演示")
    print("=" * 80)

    # Nothing else can run without an initialized environment.
    if not initialize_qlib():
        print("❌ 环境初始化失败,程序退出")
        return

    try:
        # Explanatory walkthroughs.
        analyze_topk_dropout_strategy()
        analyze_simulator_executor()

        # Synthetic example of the strategy's decision steps.
        demonstrate_strategy_mechanism()

        # Data and model.
        dataset = create_dataset()
        model = train_model(dataset)

        # Strategy and executor configuration.
        strategy_config = create_strategy_config(model, dataset)
        executor_config = create_executor_config()

        # Backtest; it prints its own analysis, so the result is not kept.
        run_backtest(strategy_config, executor_config)

        print("\n🎉 演示完成!")
        print("=" * 80)
        print("📚 关键知识点总结:")
        print("   1️⃣ TopkDropoutStrategy实现智能选股和动态调仓")
        print("   2️⃣ SimulatorExecutor提供高保真的交易执行模拟")
        print("   3️⃣ 两者协同工作构成完整的量化回测系统")
        print("   4️⃣ 模块化设计支持灵活的策略定制和优化")

        print("\n📖 详细教程文档: tutorial/markdown/topk_dropout_strategy_and_simulator_executor_analysis.md")
        print("💻 演示代码位置: demo/topk_dropout_and_simulator_demo.py")

    except Exception as e:
        # Script-level boundary: show the error and the full traceback.
        print(f"❌ 程序执行出错: {e}")
        import traceback
        traceback.print_exc()
Enter fullscreen mode Exit fullscreen mode

# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

Top comments (0)