DEV Community

drake
drake

Posted on

加密货币协整检验算法


    @staticmethod
    def _calculate_cointegration_params(base_prices: pd.Series, alt_prices: pd.Series,
                                        coin: str = None, base_symbol: str = None) -> Optional[dict]:
        """
        使用OLS回归计算协整参数(验证性函数)

        通过OLS回归计算截距项α和斜率β,并进行ADF检验验证价差平稳性。
        这是协整检验的标准方法(Engle-Granger两步法)。

        Args:
            base_prices: 基准币种价格序列(pandas Series)
            alt_prices: 山寨币价格序列(pandas Series)
            coin: 币种名称(可选,用于日志)

        Returns:
            dict: {
                'alpha': 截距项(价格溢价/折价),
                'beta': OLS回归系数(对冲比例),
                'spread': OLS价差序列(残差,保留原始索引),
                'adf_pvalue': ADF检验p值(平稳性)
            }
            None: 如果计算失败

        Note:
            - α显著非0表示存在固定溢价/折价
            - β是最优对冲比例
            - ADF p-value < 0.05 表示价差平稳,适合配对交易
            - 此函数用于验证性分析,使用全样本数据是标准做法(Engle-Granger两步法)
            - 返回的价差序列保留原始时间索引,便于后续分析
            - 注意:此函数存在 look-ahead bias(使用全样本),仅用于事后验证,不适用于实时交易
              实时交易应使用 price_diff_spread_ols_window 方法
        """
        try:
            # 1. 数据验证
            if len(base_prices) != len(alt_prices):
                coin_info = f" | 币种: {coin}" if coin else ""
                base_symbol_info = f" | 基准币种: {base_symbol}" if base_symbol else ""
                logger.warning(f"协整参数计算失败:基准币种和ALT数据长度不一致 | "
                              f"基准币种: {len(base_prices)}, ALT: {len(alt_prices)}"
                              f"{coin_info}{base_symbol_info}")
                return None

            if len(base_prices) < 10:  # 最小数据点要求
                coin_info = f" | 币种: {coin}" if coin else ""
                logger.debug(f"协整参数计算失败:数据点不足(需要至少10个点,实际{len(base_prices)}个){coin_info}")
                return None

            # 2. 计算对数价格(保留索引信息)
            log_base_series = np.log(base_prices)
            log_alt_series = np.log(alt_prices)

            # 3. statsmodels OLS回归(带常数项):log_alt = α + β * log_base + ε
            # 注意:此函数用于验证性分析,使用全样本是标准做法(Engle-Granger两步法)
            # 虽然存在 look-ahead bias,但这是协整检验的标准方法
            X = sm.add_constant(log_base_series)
            model = sm.OLS(log_alt_series, X).fit()

            alpha = model.params.iloc[0]      # 常数项
            beta = model.params.iloc[1]       # 斜率
            alpha_pvalue = model.pvalues.iloc[0]  # α的p值
            beta_pvalue = model.pvalues.iloc[1]   # β的p值
            rsquared = model.rsquared    # 拟合优度

            # 4. 根据α显著性和绝对值大小选择价差计算方法(智能模型选择)
            if alpha_pvalue < 0.05 and abs(alpha) > 5:
                # α显著且绝对值很大 → 跨资产类配对(如NEAR/BTC)
                # 使用无α模型更稳健(避免α时变性问题)
                spread_ols = log_alt_series - beta * log_base_series
                model_type = "no_intercept_forced"
                use_alpha = False
                model_reason = f"|α|={abs(alpha):.1f}>5, 跨资产类配对"

            elif alpha_pvalue < 0.05 and abs(alpha) < 2:
                # α显著且绝对值较小 → 同类资产配对(如UNI/SUSHI)
                # α代表真实的溢价关系,应当包含
                spread_ols = log_alt_series - (alpha + beta * log_base_series)
                model_type = "standard_EG"  # 标准Engle-Granger
                use_alpha = True
                model_reason = f"|α|={abs(alpha):.1f}<2, 同类资产配对"

            else:
                # α不显著或中等范围(2<=|α|<=5)→ 使用无α模型
                spread_ols = log_alt_series - beta * log_base_series
                model_type = "no_intercept"
                use_alpha = False
                if alpha_pvalue >= 0.05:
                    model_reason = "α不显著"
                else:
                    model_reason = f"|α|={abs(alpha):.1f}∈[2,5], 中等范围"

            # 5. ADF检验价差平稳性(使用数值数组)
            adf_result = adfuller(spread_ols.values, autolag='AIC')
            adf_pvalue = adf_result[1]

            # 6. 日志输出(便于调试)
            if coin:
                logger.debug(
                    f"协整参数 | 币种: {coin} | α={alpha:.4f} (p={alpha_pvalue:.4f}) | "
                    f"β={beta:.4f} (p={beta_pvalue:.4f}) | R²={rsquared:.4f} | "
                    f"模型: {model_type} | 原因: {model_reason} | ADF p={adf_pvalue:.4f}"
                )

            return {
                'alpha': alpha,
                'beta': beta,
                'spread': spread_ols,  # 保留原始索引的 pandas Series
                'adf_pvalue': adf_pvalue,
                # 新增统计信息
                'alpha_pvalue': alpha_pvalue,
                'beta_pvalue': beta_pvalue,
                'rsquared': rsquared,
                'model_type': model_type,
                'use_alpha': use_alpha,  # 标记是否使用了α
                'model_reason': model_reason  # 模型选择原因
            }
        except Exception as e:
            coin_info = f" | 币种: {coin}" if coin else ""
            logger.debug(f"OLS协整参数计算失败:{type(e).__name__}: {str(e)}{coin_info}", exc_info=True)
            return None

    @staticmethod
    def price_diff_spread_ols_window(base_prices: pd.Series, alt_prices: pd.Series, beta_window: int = 100, zscore_window: int = 30) -> pd.Series:
        """
        计算价格差价(双窗口策略:OLS回归使用长窗口(稳定),统计量使用短窗口(敏感)。)
        """
        # 4. 数据切片:取足够计算OLS回归和统计量的数据
        data_window = max(beta_window, zscore_window)
        recent_base_full = base_prices.iloc[-data_window:]
        recent_alt_full = alt_prices.iloc[-data_window:]

        # 5. OLS回归计算协整参数
        # 使用前 beta_window-1 个点计算OLS参数(避免 look-ahead bias)
        # 公式:log_alt = α + β × log_base + ε
        # 用途:构建价差序列 spread = log(ALT) - (α + β × log(BASE))
        ols_base = recent_base_full.iloc[:-1]
        ols_alt = recent_alt_full.iloc[:-1]

        # 计算对数价格
        log_base_ols = np.log(ols_base)
        log_alt_ols = np.log(ols_alt)

        # statsmodels OLS回归
        X = sm.add_constant(log_base_ols)
        model = sm.OLS(log_alt_ols, X).fit()

        alpha = model.params.iloc[0]          # 常数项
        beta_ols = model.params.iloc[1]       # 斜率
        alpha_pvalue = model.pvalues.iloc[0]  # α的p值
        beta_pvalue = model.pvalues.iloc[1]   # β的p值
        rsquared = model.rsquared         # 拟合优度

        # 根据α显著性和绝对值大小选择价差计算方法(智能模型选择)
        log_base_full = np.log(recent_base_full)  # 全部100期
        log_alt_full = np.log(recent_alt_full)

        if alpha_pvalue < 0.05 and abs(alpha) > 5:
            # α显著且绝对值很大 → 跨资产类配对(如NEAR/BTC)
            # 使用无α模型更稳健(避免α时变性问题)
            spread_full = log_alt_full - beta_ols * log_base_full
            model_type = "no_intercept_forced"
            use_alpha = False
            model_reason = f"|α|={abs(alpha):.1f}>5, 跨资产类配对"

        elif alpha_pvalue < 0.05 and abs(alpha) < 2:
            # α显著且绝对值较小 → 同类资产配对(如UNI/SUSHI)
            # α代表真实的溢价关系,应当包含
            spread_full = log_alt_full - (alpha + beta_ols * log_base_full)
            model_type = "standard_EG"
            use_alpha = True
            model_reason = f"|α|={abs(alpha):.1f}<2, 同类资产配对"

        else:
            # α不显著或中等范围(2<=|α|<=5)→ 使用无α模型
            spread_full = log_alt_full - beta_ols * log_base_full
            model_type = "no_intercept"
            use_alpha = False
            if alpha_pvalue >= 0.05:
                model_reason = "α不显著"
            else:
                model_reason = f"|α|={abs(alpha):.1f}∈[2,5], 中等范围"

        # ADF检验价差平稳性
        adf_result = adfuller(spread_full.values, autolag='AIC')
        adf_pvalue = adf_result[1]

        # 6. 价差构建(用于Z-score计算:使用短窗口保持敏感度)
        # 取最近 zscore_window 期数据,使用长窗口计算的OLS参数构建对数价差
        recent_base = recent_base_full.iloc[-zscore_window:]
        recent_alt = recent_alt_full.iloc[-zscore_window:]
        log_base = np.log(recent_base)
        log_alt = np.log(recent_alt)

        # 使用相同的模型选择
        if use_alpha:
            spread = log_alt - (alpha + beta_ols * log_base)
        else:
            spread = log_alt - beta_ols * log_base

        return {
            'alpha': alpha,           # 截距项(价格溢价/折价)
            'beta': beta_ols,         # OLS回归系数
            'spread': spread,         # 用于Z-score计算的价差序列
            'adf_pvalue': adf_pvalue, # ADF检验价差平稳性
            # 新增统计信息
            'alpha_pvalue': alpha_pvalue,
            'beta_pvalue': beta_pvalue,
            'rsquared': rsquared,
            'model_type': model_type,
            'use_alpha': use_alpha,
            'model_reason': model_reason  # 模型选择原因
        }

    def cointegration_analysis(
        self, cointegration_result: dict, 
        method_type: str, 
        coin: str = None, 
        stats_period_key: tuple = None
        ) -> bool:
        """
        协整分析
        """
        # 协整检验状态
        cointegration_status = False
        # 币种信息
        coin_info = f" | 币种: {coin} | 方法: {method_type}" if coin else ""
        # check_result = ''
        if cointegration_result is None or cointegration_result['adf_pvalue'] >= 0.05:
            if cointegration_result:
                adf_pvalue_str = f"{cointegration_result['adf_pvalue']:.4f}"
                alpha_str = f"{cointegration_result['alpha']:.4f}"
                beta_str = f"{cointegration_result['beta']:.4f}"
                check_result = (f"❌ 协整检验未通过(基于{stats_period_key}周期数据) | "
                                f"α={alpha_str}, β={beta_str} | "
                                f"ADF p-value: {adf_pvalue_str} >= 0.05 | "
                                f"原因: 价差非平稳,不适合配对交易"
                                f"{coin_info}")
            else:
                check_result = (
                    f"❌ 协整检验未通过(基于{stats_period_key}周期数据) | "
                    f"ADF p-value: N/A | "
                    f"原因: OLS参数计算失败"
                    f"{coin_info}"
                )
            # is_anomaly = False  # ⚠️ 协整失败,拒绝信号
        else:
            # 协整检验通过,输出详细信息
            check_result = (f"✅ 协整检验通过(基于{stats_period_key}周期数据) | "
                            f"α={cointegration_result['alpha']:.4f}, β={cointegration_result['beta']:.4f} | "
                            f"ADF p-value={cointegration_result['adf_pvalue']:.4f} < 0.05"
                            f"{coin_info}")
            cointegration_status = True
        self.alert_content += f"\n{check_result}\n"
        logger.info(check_result)

        return cointegration_status
Enter fullscreen mode Exit fullscreen mode

Top comments (0)