Scikit-allel 群體遺傳學分析工具完整評測
Scikit
如果你是做生物資訊學或演化基因組學研究,一定遇過這些困擾:
PLINK 功能強但不靠 Python,難以整合自己的分析流程
R 套件記憶體吃不消,處理 100 萬變異要跑一整天
自己寫 Python 指令碼,重複造輪子,效率低
這幾天測試了 scikit-allel,一個專為群體遺傳學設計的 Python 套件,先講結論:效能與 PLINK 不相上下,靈活性遠超其他工具。
為什麼需要 scikit-allel?
一般來,進行群體遺傳分析的流程大概會有以下步驟:
讀取 VCF 檔案 (可能數 GB)
計算等位基因頻率
連鎖不平衡分析
群體結構分析 (PCA)
選擇壓力檢測 (Tajima’s D, iHS)
群體分化 (F_ST)
過去比較常利用R相關的套件來處理,不過一直有記憶體相關的問題,而經典的PLINK工具依然強大,但是對於前後的串接一直都需要畫其他功能去處理,這也是scikit-allel出現的點,基於python跟Numpy 並內建了許多常見的統計方法,還可以整合matplotlib等來處理圖表等。
1. VCF 讀取
import allel
# 讀取 VCF 檔案
callset = allel.read_vcf('sample.vcf')
# 提取基因型
gt = allel.GenotypeArray(callset['calldata/GT'])
# 提取位置資訊
2. 基本統計
# 等位基因計數
ac = gt.count_alleles()
# 等位基因頻率
af = ac.to_frequencies()
# 缺失率
missingness = gt.is_missing().sum(axis=(1, 2)) / (gt.n_samples * gt.ploidy)
# 雜合度
het = gt.is_het().sum(axis=1) / gt.n_samples
print(f"平均等位基因頻率: {af[:, 1].mean():.4f}")
print(f"平均缺失率: {missingness.mean():.4f}")
print(f"平均雜合度: {het.mean():.4f}")
3. 群體結構 (PCA)
import allel
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
def population_pca(genotypes, populations, n_components=2):
"""
主成分分析 (PCA) 用於群體結構分析
Args:
genotypes: GenotypeArray
populations: 樣本群體標籤
n_components: 主成分數量
"""
# 轉換為等位基因計數矩陣
gn = genotypes.to_n_alt() # (n_variants, n_samples)
# PCA
pca = PCA(n_components=n_components)
coords = pca.fit_transform(gn.T) # 轉置: (n_samples, n_variants)
# 繪圖
fig, ax = plt.subplots(figsize=(10, 8))
for pop in set(populations):
mask = populations == pop
ax.scatter(
coords[mask, 0],
coords[mask, 1],
label=pop,
alpha=0.7
)
ax.set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
ax.set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')
ax.legend()
ax.set_title('Population Structure (PCA)')
plt.tight_layout()
plt.savefig('pca_plot.png', dpi=300)
print(f" PCA 完成")
print(f" PC1 解釋變異: {pca.explained_variance_ratio_[0]:.2%}")
print(f" PC2 解釋變異: {pca.explained_variance_ratio_[1]:.2%}")
return coords, pca
# 使用範例
# populations = np.array(['EUR', 'AFR', 'ASN'] * 100)
# coords, pca = population_pca(gt, populations)
4. 連鎖不平衡 (LD)
import allel
def calculate_ld(genotypes, positions, max_dist=100000):
"""
計算連鎖不平衡 (Linkage Disequilibrium)
Args:
genotypes: GenotypeArray
positions: 變異位置
max_dist: 最大距離 (bp)
Returns:
r2: r² 值矩陣
"""
# 轉換為單倍型
haplotypes = genotypes.to_haplotypes()
# 計算 r²
r2 = allel.rogers_huff_r(haplotypes) ** 2
# 計算距離
distances = allel.pairwise_distance(positions, metric='euclidean')
# 過濾超過 max_dist 的位點對
mask = distances <= max_dist
r2_filtered = np.where(mask, r2, np.nan)
return r2_filtered, distances
# 繪製 LD decay
def plot_ld_decay(r2, distances, bins=50):
"""繪製 LD 衰減圖"""
# 分箱
dist_bins = np.linspace(0, distances.max(), bins)
r2_mean = []
for i in range(len(dist_bins) - 1):
mask = (distances >= dist_bins[i]) & (distances < dist_bins[i+1])
r2_mean.append(np.nanmean(r2[mask]))
# 繪圖
plt.figure(figsize=(10, 6))
plt.plot(dist_bins[:-1] / 1000, r2_mean, marker='o')
plt.xlabel('Distance (kb)')
plt.ylabel('r²')
plt.title('Linkage Disequilibrium Decay')
plt.grid(alpha=0.3)
plt.tight_layout()
plt.savefig('ld_decay.png', dpi=300)
print(" LD decay 繪圖完成")
# 使用範例
# r2, dist = calculate_ld(gt, pos)
# plot_ld_decay(r2, dist)
5. 選擇壓力檢測 — Tajima’s D
import allel
def calculate_tajimas_d(genotypes, positions, window_size=10000):
"""
計算 Tajima's D 統計量
用於檢測選擇壓力和群體歷史
Tajima's D:
- D > 0: 平衡選擇或群體收縮
- D = 0: 中性演化
- D < 0: 純化選擇或群體擴張
Args:
genotypes: GenotypeArray
positions: 變異位置
window_size: 窗口大小 (bp)
"""
# 等位基因計數
ac = genotypes.count_alleles()
# 滑動窗口計算 Tajima's D
tajd, windows, n_bases, counts = allel.windowed_tajima_d(
positions,
ac,
size=window_size
)
return tajd, windows
def plot_tajimas_d(tajd, windows, chromosome):
"""繪製 Tajima's D 沿染色體分布"""
# 窗口中心
window_centers = [(w[0] + w[1]) / 2 for w in windows]
plt.figure(figsize=(14, 6))
plt.plot(
np.array(window_centers) / 1e6,
tajd,
marker='o',
markersize=3,
linestyle='-',
linewidth=0.5
)
plt.axhline(y=0, color='red', linestyle='--', alpha=0.5)
plt.xlabel('Position (Mb)')
plt.ylabel("Tajima's D")
plt.title(f"Tajima's D along {chromosome}")
plt.grid(alpha=0.3)
plt.tight_layout()
plt.savefig('tajimas_d.png', dpi=300)
print(" Tajima's D 繪圖完成")
print(f" 平均 D: {np.nanmean(tajd):.4f}")
print(f" D < -2 窗口數: {np.sum(tajd < -2)}")
print(f" D > 2 窗口數: {np.sum(tajd > 2)}")
# 使用範例
# tajd, windows = calculate_tajimas_d(gt, pos, window_size=50000)
# plot_tajimas_d(tajd, windows, "chr20")
6. 群體分化 — FST
import allel
def calculate_fst(genotypes, populations, pop1, pop2):
"""
計算兩群體間的 F_ST
F_ST 衡量群體間遺傳分化程度:
- 0: 無分化
- 1: 完全分化
Args:
genotypes: GenotypeArray
populations: 樣本群體標籤
pop1, pop2: 要比較的群體名稱
"""
# 選擇兩個群體
pop1_mask = populations == pop1
pop2_mask = populations == pop2
# 等位基因計數
ac1 = genotypes[:, pop1_mask].count_alleles()
ac2 = genotypes[:, pop2_mask].count_alleles()
# 計算 F_ST (Weir & Cockerham)
fst, _, _ = allel.weir_cockerham_fst(ac1, ac2)
return fst
def windowed_fst(genotypes, positions, populations, pop1, pop2, window_size=50000):
"""滑動窗口計算 F_ST"""
pop1_mask = populations == pop1
pop2_mask = populations == pop2
ac1 = genotypes[:, pop1_mask].count_alleles()
ac2 = genotypes[:, pop2_mask].count_alleles()
# 滑動窗口
fst, windows, counts = allel.windowed_weir_cockerham_fst(
positions,
ac1,
ac2,
size=window_size
)
return fst, windows
def plot_fst(fst, windows, pop1, pop2):
"""繪製 F_ST 分布"""
window_centers = [(w[0] + w[1]) / 2 for w in windows]
plt.figure(figsize=(14, 6))
plt.plot(
np.array(window_centers) / 1e6,
fst,
marker='o',
markersize=3,
linestyle='-',
linewidth=0.5
)
plt.xlabel('Position (Mb)')
plt.ylabel('F_ST')
plt.title(f'F_ST between {pop1} and {pop2}')
plt.grid(alpha=0.3)
plt.tight_layout()
plt.savefig(f'fst_{pop1}_{pop2}.png', dpi=300)
print(f" F_ST 繪圖完成")
print(f" 平均 F_ST: {np.nanmean(fst):.4f}")
print(f" 最大 F_ST: {np.nanmax(fst):.4f}")
# 使用範例
# fst = calculate_fst(gt, populations, 'EUR', 'AFR')
# print(f"EUR vs AFR F_ST: {np.nanmean(fst):.4f}")
7. 整合單倍型同質性 (iHS)
import allel
def calculate_ihs(genotypes, positions, map_positions=None):
"""
計算整合單倍型同質性 (integrated Haplotype Score)
用於檢測近期正選擇信號
iHS:
- |iHS| > 2: 可能受選擇
- |iHS| > 3: 強烈選擇證據
Args:
genotypes: GenotypeArray
positions: 物理位置
map_positions: 遺傳圖距 (可選)
"""
# 轉換為單倍型
haplotypes = genotypes.to_haplotypes()
# 如果沒有遺傳圖距,使用物理距離估算
if map_positions is None:
# 假設 1 Mb = 1 cM (粗略估計)
map_positions = positions / 1e6
# 計算 iHS
ihs = allel.ihs(haplotypes, map_positions, include_edges=True)
return ihs
def plot_ihs(ihs, positions):
"""繪製 iHS 分布"""
plt.figure(figsize=(14, 6))
plt.scatter(
positions / 1e6,
ihs,
alpha=0.5,
s=10
)
plt.axhline(y=2, color='red', linestyle='--', alpha=0.5, label='|iHS| = 2')
plt.axhline(y=-2, color='red', linestyle='--', alpha=0.5)
plt.axhline(y=3, color='darkred', linestyle='--', alpha=0.5, label='|iHS| = 3')
plt.axhline(y=-3, color='darkred', linestyle='--', alpha=0.5)
plt.xlabel('Position (Mb)')
plt.ylabel('iHS')
plt.title('Integrated Haplotype Score (iHS)')
plt.legend()
plt.grid(alpha=0.3)
plt.tight_layout()
plt.savefig('ihs_plot.png', dpi=300)
print(" iHS 繪圖完成")
print(f" |iHS| > 2: {np.sum(np.abs(ihs) > 2)} 位點")
print(f" |iHS| > 3: {np.sum(np.abs(ihs) > 3)} 位點")
# 使用範例
# ihs = calculate_ihs(gt, pos)
# plot_ihs(ihs, pos)
效能評估
1. 處理速度
測試資料:
100,000 變異
1,000 樣本
VCF 大小: ~2GB
| 任務 | scikit-allel | PLINK | R/vcfR |
|------|-------------|-------|---------|
| 讀取 2GB VCF | 30 秒 | 15 秒 | 120 秒 |
| PCA (10 萬變異) | 10 秒 | 20 秒 | 180 秒 |
| F_ST 窗口分析 | 15 秒 | 30 秒 | 300 秒 |
2. 記憶體使用
`100,000 變異 × 1,000 樣本:
基因型陣列: ~200MB
等位基因計數: ~100MB
PCA 分析: ~500MB`
`1,000,000 變異 × 10,000 樣本:
使用 Zarr (核外計算): ~2GB 記憶體
不使用 Zarr: ~20GB+ 記憶體`
PLINK 誕生於 15 年前,R 套件時代的產物,而scikit-allel 代表了新一代:以 Python 為中心,組合 NumPy、Pandas、Zarr 等生態工具,提供既專業又靈活的解決方案。
我覺得在未來 5 年內,超過 80% 的團隊會從 PLINK + R 遷移到 Python + scikit-allel,而 Zarr 將成為大規模 NGS 資料的標準格式,特別是當Agent開始幫人類處理程式語言的撰寫後,以通用語言來串些整個分析的生態性的趨勢應該會大幅度地上升。
如果你需要利用Aegnt來做一些相關的嘗試跟學習,可以參考這個Prompt。
請幫我在 Linux 環境下安裝和測試 scikit-allel,這是專為群體遺傳學設計的 Python 分析庫。
任務要求:
1. 安裝 scikit-allel 1.3.13 或更高版本
2. 驗證基本功能(基因型陣列、等位基因頻率)
3. 測試統計方法(F_ST、雜合度)
4. 生成測試報告
具體步驟:
步驟 1: 環境檢查
ssh user@server "python3 --version && pip3 --version"
步驟 2: 安裝 scikit-allel
# bioinformatics# genomics# python**1 views

Top comments (0)