@staticmethod
def _calculate_cointegration_params(base_prices: pd.Series, alt_prices: pd.Series,
coin: str = None, base_symbol: str = None) -> Optional[dict]:
"""
使用OLS回归计算协整参数(验证性函数)
通过OLS回归计算截距项α和斜率β,并进行ADF检验验证价差平稳性。
这是协整检验的标准方法(Engle-Granger两步法)。
Args:
base_prices: 基准币种价格序列(pandas Series)
alt_prices: 山寨币价格序列(pandas Series)
coin: 币种名称(可选,用于日志)
Returns:
dict: {
'alpha': 截距项(价格溢价/折价),
'beta': OLS回归系数(对冲比例),
'spread': OLS价差序列(残差,保留原始索引),
'adf_pvalue': ADF检验p值(平稳性)
}
None: 如果计算失败
Note:
- α显著非0表示存在固定溢价/折价
- β是最优对冲比例
- ADF p-value < 0.05 表示价差平稳,适合配对交易
- 此函数用于验证性分析,使用全样本数据是标准做法(Engle-Granger两步法)
- 返回的价差序列保留原始时间索引,便于后续分析
- 注意:此函数存在 look-ahead bias(使用全样本),仅用于事后验证,不适用于实时交易
实时交易应使用 price_diff_spread_ols_window 方法
"""
try:
# 1. 数据验证
if len(base_prices) != len(alt_prices):
coin_info = f" | 币种: {coin}" if coin else ""
base_symbol_info = f" | 基准币种: {base_symbol}" if base_symbol else ""
logger.warning(f"协整参数计算失败:基准币种和ALT数据长度不一致 | "
f"基准币种: {len(base_prices)}, ALT: {len(alt_prices)}"
f"{coin_info}{base_symbol_info}")
return None
if len(base_prices) < 10: # 最小数据点要求
coin_info = f" | 币种: {coin}" if coin else ""
logger.debug(f"协整参数计算失败:数据点不足(需要至少10个点,实际{len(base_prices)}个){coin_info}")
return None
# 2. 计算对数价格(保留索引信息)
log_base_series = np.log(base_prices)
log_alt_series = np.log(alt_prices)
# 3. statsmodels OLS回归(带常数项):log_alt = α + β * log_base + ε
# 注意:此函数用于验证性分析,使用全样本是标准做法(Engle-Granger两步法)
# 虽然存在 look-ahead bias,但这是协整检验的标准方法
X = sm.add_constant(log_base_series)
model = sm.OLS(log_alt_series, X).fit()
alpha = model.params.iloc[0] # 常数项
beta = model.params.iloc[1] # 斜率
alpha_pvalue = model.pvalues.iloc[0] # α的p值
beta_pvalue = model.pvalues.iloc[1] # β的p值
rsquared = model.rsquared # 拟合优度
# 4. 根据α显著性和绝对值大小选择价差计算方法(智能模型选择)
if alpha_pvalue < 0.05 and abs(alpha) > 5:
# α显著且绝对值很大 → 跨资产类配对(如NEAR/BTC)
# 使用无α模型更稳健(避免α时变性问题)
spread_ols = log_alt_series - beta * log_base_series
model_type = "no_intercept_forced"
use_alpha = False
model_reason = f"|α|={abs(alpha):.1f}>5, 跨资产类配对"
elif alpha_pvalue < 0.05 and abs(alpha) < 2:
# α显著且绝对值较小 → 同类资产配对(如UNI/SUSHI)
# α代表真实的溢价关系,应当包含
spread_ols = log_alt_series - (alpha + beta * log_base_series)
model_type = "standard_EG" # 标准Engle-Granger
use_alpha = True
model_reason = f"|α|={abs(alpha):.1f}<2, 同类资产配对"
else:
# α不显著或中等范围(2<=|α|<=5)→ 使用无α模型
spread_ols = log_alt_series - beta * log_base_series
model_type = "no_intercept"
use_alpha = False
if alpha_pvalue >= 0.05:
model_reason = "α不显著"
else:
model_reason = f"|α|={abs(alpha):.1f}∈[2,5], 中等范围"
# 5. ADF检验价差平稳性(使用数值数组)
adf_result = adfuller(spread_ols.values, autolag='AIC')
adf_pvalue = adf_result[1]
# 6. 日志输出(便于调试)
if coin:
logger.debug(
f"协整参数 | 币种: {coin} | α={alpha:.4f} (p={alpha_pvalue:.4f}) | "
f"β={beta:.4f} (p={beta_pvalue:.4f}) | R²={rsquared:.4f} | "
f"模型: {model_type} | 原因: {model_reason} | ADF p={adf_pvalue:.4f}"
)
return {
'alpha': alpha,
'beta': beta,
'spread': spread_ols, # 保留原始索引的 pandas Series
'adf_pvalue': adf_pvalue,
# 新增统计信息
'alpha_pvalue': alpha_pvalue,
'beta_pvalue': beta_pvalue,
'rsquared': rsquared,
'model_type': model_type,
'use_alpha': use_alpha, # 标记是否使用了α
'model_reason': model_reason # 模型选择原因
}
except Exception as e:
coin_info = f" | 币种: {coin}" if coin else ""
logger.debug(f"OLS协整参数计算失败:{type(e).__name__}: {str(e)}{coin_info}", exc_info=True)
return None
@staticmethod
def price_diff_spread_ols_window(base_prices: pd.Series, alt_prices: pd.Series, beta_window: int = 100, zscore_window: int = 30) -> pd.Series:
"""
计算价格差价(双窗口策略:OLS回归使用长窗口(稳定),统计量使用短窗口(敏感)。)
"""
# 4. 数据切片:取足够计算OLS回归和统计量的数据
data_window = max(beta_window, zscore_window)
recent_base_full = base_prices.iloc[-data_window:]
recent_alt_full = alt_prices.iloc[-data_window:]
# 5. OLS回归计算协整参数
# 使用前 beta_window-1 个点计算OLS参数(避免 look-ahead bias)
# 公式:log_alt = α + β × log_base + ε
# 用途:构建价差序列 spread = log(ALT) - (α + β × log(BASE))
ols_base = recent_base_full.iloc[:-1]
ols_alt = recent_alt_full.iloc[:-1]
# 计算对数价格
log_base_ols = np.log(ols_base)
log_alt_ols = np.log(ols_alt)
# statsmodels OLS回归
X = sm.add_constant(log_base_ols)
model = sm.OLS(log_alt_ols, X).fit()
alpha = model.params.iloc[0] # 常数项
beta_ols = model.params.iloc[1] # 斜率
alpha_pvalue = model.pvalues.iloc[0] # α的p值
beta_pvalue = model.pvalues.iloc[1] # β的p值
rsquared = model.rsquared # 拟合优度
# 根据α显著性和绝对值大小选择价差计算方法(智能模型选择)
log_base_full = np.log(recent_base_full) # 全部100期
log_alt_full = np.log(recent_alt_full)
if alpha_pvalue < 0.05 and abs(alpha) > 5:
# α显著且绝对值很大 → 跨资产类配对(如NEAR/BTC)
# 使用无α模型更稳健(避免α时变性问题)
spread_full = log_alt_full - beta_ols * log_base_full
model_type = "no_intercept_forced"
use_alpha = False
model_reason = f"|α|={abs(alpha):.1f}>5, 跨资产类配对"
elif alpha_pvalue < 0.05 and abs(alpha) < 2:
# α显著且绝对值较小 → 同类资产配对(如UNI/SUSHI)
# α代表真实的溢价关系,应当包含
spread_full = log_alt_full - (alpha + beta_ols * log_base_full)
model_type = "standard_EG"
use_alpha = True
model_reason = f"|α|={abs(alpha):.1f}<2, 同类资产配对"
else:
# α不显著或中等范围(2<=|α|<=5)→ 使用无α模型
spread_full = log_alt_full - beta_ols * log_base_full
model_type = "no_intercept"
use_alpha = False
if alpha_pvalue >= 0.05:
model_reason = "α不显著"
else:
model_reason = f"|α|={abs(alpha):.1f}∈[2,5], 中等范围"
# ADF检验价差平稳性
adf_result = adfuller(spread_full.values, autolag='AIC')
adf_pvalue = adf_result[1]
# 6. 价差构建(用于Z-score计算:使用短窗口保持敏感度)
# 取最近 zscore_window 期数据,使用长窗口计算的OLS参数构建对数价差
recent_base = recent_base_full.iloc[-zscore_window:]
recent_alt = recent_alt_full.iloc[-zscore_window:]
log_base = np.log(recent_base)
log_alt = np.log(recent_alt)
# 使用相同的模型选择
if use_alpha:
spread = log_alt - (alpha + beta_ols * log_base)
else:
spread = log_alt - beta_ols * log_base
return {
'alpha': alpha, # 截距项(价格溢价/折价)
'beta': beta_ols, # OLS回归系数
'spread': spread, # 用于Z-score计算的价差序列
'adf_pvalue': adf_pvalue, # ADF检验价差平稳性
# 新增统计信息
'alpha_pvalue': alpha_pvalue,
'beta_pvalue': beta_pvalue,
'rsquared': rsquared,
'model_type': model_type,
'use_alpha': use_alpha,
'model_reason': model_reason # 模型选择原因
}
def cointegration_analysis(
self, cointegration_result: dict,
method_type: str,
coin: str = None,
stats_period_key: tuple = None
) -> bool:
"""
协整分析
"""
# 协整检验状态
cointegration_status = False
# 币种信息
coin_info = f" | 币种: {coin} | 方法: {method_type}" if coin else ""
# check_result = ''
if cointegration_result is None or cointegration_result['adf_pvalue'] >= 0.05:
if cointegration_result:
adf_pvalue_str = f"{cointegration_result['adf_pvalue']:.4f}"
alpha_str = f"{cointegration_result['alpha']:.4f}"
beta_str = f"{cointegration_result['beta']:.4f}"
check_result = (f"❌ 协整检验未通过(基于{stats_period_key}周期数据) | "
f"α={alpha_str}, β={beta_str} | "
f"ADF p-value: {adf_pvalue_str} >= 0.05 | "
f"原因: 价差非平稳,不适合配对交易"
f"{coin_info}")
else:
check_result = (
f"❌ 协整检验未通过(基于{stats_period_key}周期数据) | "
f"ADF p-value: N/A | "
f"原因: OLS参数计算失败"
f"{coin_info}"
)
# is_anomaly = False # ⚠️ 协整失败,拒绝信号
else:
# 协整检验通过,输出详细信息
check_result = (f"✅ 协整检验通过(基于{stats_period_key}周期数据) | "
f"α={cointegration_result['alpha']:.4f}, β={cointegration_result['beta']:.4f} | "
f"ADF p-value={cointegration_result['adf_pvalue']:.4f} < 0.05"
f"{coin_info}")
cointegration_status = True
self.alert_content += f"\n{check_result}\n"
logger.info(check_result)
return cointegration_status
For further actions, you may consider blocking this person and/or reporting abuse
Top comments (0)