
#!/usr/bin/env python
# -*- coding: utf-8 -*-
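"""Twitter follower-count extractor.

Reads a CSV of sign-up records, pulls each row's Twitter/X URL from the
ext_info JSON blob or the url column, queries a RapidAPI Twitter GraphQL proxy
for the follower count, and writes the result back into the CSV. Supports
resuming from a checkpoint, atomic saves, and a lock file against double runs.
"""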
import argparse
import json
import os
import re
import shutil
import signal
import sys
import time
from datetime import datetime

import pandas as pd
import requests
from tqdm import tqdm


class TwitterFollowerExtractor:
    """Twitter follower-count extractor with resume support and per-row saving."""

    # Placeholder key -- replace with your own RapidAPI key before running.
    X_RAPIDAPI_KEY = "xxx"
    ENDPOINT = "https://twitter-v1-1-v2-api.p.rapidapi.com/graphql/UserByScreenName"

    def __init__(self, csv_file_path):
        self.csv_file_path = csv_file_path
        self.df = None
        self.progress_file = csv_file_path + '.progress'
        self.lock_file = csv_file_path + '.lock'
        # Exit cleanly on Ctrl-C / kill so the lock file is always removed.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle termination signals."""
        print("\nInterrupt received, exiting safely...")
        self._cleanup()
        sys.exit(0)

    def _cleanup(self):
        """Release resources (currently just the lock file)."""
        if os.path.exists(self.lock_file):
            os.remove(self.lock_file)

    def _extract_twitter_username(self, url):
        """Extract the Twitter username from a profile or tweet URL."""
        if not url:
            return None
        patterns = [
            # Tweet URLs: the username is the path segment before /status/.
            r'(?:https?://)?(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/',
            # Profile URLs: the username is the first path segment.
            r'(?:https?://)?(?:www\.)?(?:x\.com|twitter\.com)/([^/?]+)'
        ]
        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                username = match.group(1).split('?')[0]
                return username if username.lower() != 'status' else None
        return None
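
    # Illustrative examples (hypothetical URLs, shown for documentation only):
    #   https://x.com/jack/status/20    -> 'jack'
    #   twitter.com/nasa?lang=en        -> 'nasa'
    #   https://x.com/status            -> None ('status' is rejected as a username)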

    def _get_follower_count(self, username):
        """Fetch the follower count for a username, retrying on failure."""
        if not username:
            return None
        headers = {
            "X-RapidAPI-Key": self.X_RAPIDAPI_KEY,
            "X-RapidAPI-Host": "twitter-v1-1-v2-api.p.rapidapi.com"
        }
        variables = {
            "screen_name": username,
            "withSafetyModeUserFields": True,
            "withHighlightedLabel": True
        }
        for attempt in range(3):  # up to 3 attempts
            try:
                response = requests.get(
                    self.ENDPOINT,
                    headers=headers,
                    params={"variables": json.dumps(variables)},
                    timeout=10  # avoid hanging indefinitely on a stalled connection
                )
                if response.status_code == 200:
                    data = response.json()
                    if "data" in data and "user" in data["data"] and data["data"]["user"]:
                        user_result = data["data"]["user"]["result"]
                        if "legacy" in user_result:
                            return user_result["legacy"]["followers_count"]
                if attempt < 2:  # not the last attempt
                    time.sleep(2)
            except Exception as e:
                if attempt < 2:
                    time.sleep(2)
                else:
                    print(f"Failed to fetch follower count for {username}: {e}")
        return None
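
    # Note: the response path data.user.result.legacy.followers_count follows the
    # GraphQL UserByScreenName payload as exposed by this RapidAPI proxy; if the
    # proxy ever changes its schema, only the parsing above needs to change.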

    def _load_csv(self):
        """Load the CSV file, trying several encodings."""
        try:
            for encoding in ['utf-8-sig', 'utf-8', 'gbk']:
                try:
                    self.df = pd.read_csv(self.csv_file_path, encoding=encoding)
                    # Normalize column names: strip whitespace and embedded newlines.
                    self.df.columns = [col.strip().replace('\n', '').replace('\r', '')
                                       for col in self.df.columns]
                    print(f"CSV loaded: {len(self.df)} rows")
                    return True
                except UnicodeDecodeError:
                    continue
            print("Unable to read the CSV file with any supported encoding")
            return False
        except Exception as e:
            print(f"Failed to load CSV: {e}")
            return False

    def _save_csv(self):
        """Save the CSV atomically: write to a temp file, then rename over the original."""
        temp_file = self.csv_file_path + '.tmp'
        try:
            self.df.to_csv(temp_file, index=False, encoding='utf-8-sig')
            os.replace(temp_file, self.csv_file_path)
            return True
        except Exception as e:
            print(f"Save failed: {e}")
            if os.path.exists(temp_file):
                os.remove(temp_file)
            return False
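
    # Design note: os.replace renames over the original in a single step, which is
    # atomic when both paths are on the same filesystem, so an interrupted save
    # never leaves a half-written CSV -- readers see the old file or the new one.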

    def _load_progress(self):
        """Load the set of already-processed usernames from the progress file."""
        if not os.path.exists(self.progress_file):
            return set()
        try:
            with open(self.progress_file, 'r') as f:
                data = json.load(f)
            processed = set(data.get('processed_usernames', []))
            print(f"Resuming from checkpoint: {len(processed)} users already processed")
            return processed
        except Exception:
            return set()

    def _save_progress(self, processed_usernames, count):
        """Persist progress so an interrupted run can resume."""
        try:
            with open(self.progress_file, 'w') as f:
                json.dump({
                    'processed_usernames': list(processed_usernames),
                    'processed_count': count,
                    'last_update': datetime.now().isoformat()
                }, f)
        except Exception as e:
            print(f"Failed to save progress: {e}")

    def _create_lock(self):
        """Create a lock file to guard against concurrent runs."""
        if os.path.exists(self.lock_file):
            print("Lock file found -- another instance may be running. "
                  "If you are sure it is not, delete the lock file and retry.")
            return False
        try:
            with open(self.lock_file, 'w') as f:
                f.write(f"{os.getpid()}\n{datetime.now()}")
            return True
        except Exception:
            return False
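
    # Note: this lock is advisory, and there is a small race window between the
    # exists() check and the write; opening with os.O_CREAT | os.O_EXCL would make
    # check-and-create atomic if stricter guarding were ever needed.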

    def _extract_usernames(self):
        """Collect (row index, username, URL) triples that need processing."""
        usernames = []
        for idx, row in self.df.iterrows():
            twitter_url = None
            # First try the JSON blob in the ext_info column.
            if 'ext_info' in row and pd.notna(row['ext_info']):
                try:
                    ext_info = json.loads(row['ext_info'])
                    twitter_url = ext_info.get('twitterUrl')
                except Exception:
                    pass
            # Fall back to the url column.
            if not twitter_url and 'url' in self.df.columns and pd.notna(row['url']):
                twitter_url = row['url']
            if twitter_url:
                username = self._extract_twitter_username(twitter_url)
                if username:
                    usernames.append((idx, username, twitter_url))
        return usernames
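
    # ext_info is expected to be a JSON object carrying a 'twitterUrl' field
    # (the key used in the .get() lookup above); rows without it fall back to
    # the plain url column.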

    def _is_valid_follower_count(self, value):
        """Return True if the cell holds a usable follower count."""
        if pd.isna(value):
            return False
        value_str = str(value).strip()
        # Reject spreadsheet error markers and empty cells.
        if value_str in ('#VALUE!', 'nan', ''):
            return False
        try:
            int(float(value_str))
            return True
        except (ValueError, TypeError):
            return False

    def _generate_summary(self, total_processed):
        """Print summary statistics after processing."""
        # '粉丝数' is the follower-count column name as it appears in the source CSV.
        if '粉丝数' not in self.df.columns:
            return
        valid_mask = self.df['粉丝数'].apply(self._is_valid_follower_count)
        valid_counts = valid_mask.sum()
        print(f"\n{'='*50}")
        print("Processing summary:")
        print(f"Total users processed: {total_processed}")
        print(f"Follower counts fetched: {valid_counts}")
        print(f"Failures: {total_processed - valid_counts}")
        # Show the top 10 accounts by follower count.
        if valid_counts > 0:
            valid_df = self.df[valid_mask].copy()
            # The column may hold mixed strings/floats after CSV round-trips,
            # so coerce to numeric before ranking.
            valid_df['_followers'] = pd.to_numeric(valid_df['粉丝数'], errors='coerce')
            top_accounts = valid_df.nlargest(10, '_followers')
            print("\nTop 10 accounts:")
            for _, row in top_accounts.iterrows():
                url_value = row.get('url', "N/A")
                followers = int(row['_followers'])
                username = self._extract_twitter_username(url_value)
                print(f"- {username}: {followers:,} followers")
        print(f"{'='*50}")

    def process_followers(self):
        """Main processing pipeline."""
        print("Starting Twitter follower-count extraction...")
        # Check that the input file exists.
        if not os.path.exists(self.csv_file_path):
            print(f"File not found: {self.csv_file_path}")
            return
        # Acquire the lock.
        if not self._create_lock():
            return
        try:
            # Back up the original file once.
            backup_file = self.csv_file_path + '.backup'
            if not os.path.exists(backup_file):
                shutil.copyfile(self.csv_file_path, backup_file)
                print(f"Backup created: {backup_file}")
            # Load the data.
            if not self._load_csv():
                return
            # Extract usernames.
            all_usernames = self._extract_usernames()
            print(f"Found {len(all_usernames)} Twitter users")
            # Load saved progress.
            processed_usernames = self._load_progress()
            # Filter out users that are already done.
            remaining_usernames = []
            for idx, username, url in all_usernames:
                if username in processed_usernames:
                    continue
                # Skip rows that already hold a valid follower count.
                if '粉丝数' in self.df.columns and self._is_valid_follower_count(self.df.at[idx, '粉丝数']):
                    processed_usernames.add(username)
                    continue
                remaining_usernames.append((idx, username, url))
            print(f"Users to process: {len(remaining_usernames)}")
            # Process the remaining users one by one.
            processed_count = len(processed_usernames)
            for idx, username, url in tqdm(remaining_usernames, desc="Fetching follower counts"):
                try:
                    follower_count = self._get_follower_count(username)
                    if follower_count is not None:
                        # Update the DataFrame.
                        if '粉丝数' not in self.df.columns:
                            self.df['粉丝数'] = None
                        self.df.at[idx, '粉丝数'] = follower_count
                        # Save the CSV after every successful fetch.
                        if self._save_csv():
                            processed_usernames.add(username)
                            processed_count += 1
                            # tqdm.write keeps the progress bar intact.
                            tqdm.write(f"{username}: {follower_count:,} followers")
                            # Checkpoint progress every 10 users.
                            if processed_count % 10 == 0:
                                self._save_progress(processed_usernames, processed_count)
                    else:
                        processed_usernames.add(username)  # mark as processed to avoid retrying
                except Exception as e:
                    tqdm.write(f"Error while processing {username}: {e}")
                    continue
            # Save the final progress.
            self._save_progress(processed_usernames, processed_count)
            # Print the summary.
            self._generate_summary(len(all_usernames))
        finally:
            self._cleanup()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Twitter follower-count extractor')
    parser.add_argument('--clean', action='store_true', help='delete progress/lock files and start over')
    # Default input file name kept from the original (a Chinese-named CSV).
    parser.add_argument('--file', default='用户活动报名信息.csv', help='path to the CSV file')
    args = parser.parse_args()
    extractor = TwitterFollowerExtractor(args.file)
    if args.clean:
        for file in [extractor.progress_file, extractor.lock_file]:
            if os.path.exists(file):
                os.remove(file)
                print(f"Removed: {file}")
    extractor.process_followers()
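
# Example invocations (the script filename extract_followers.py is hypothetical):
#
#   python extract_followers.py --file signups.csv          # process signups.csv
#   python extract_followers.py --file signups.csv --clean  # drop checkpoints, restart
#
# Progress is checkpointed to <csv>.progress, a one-time copy of the input is
# kept at <csv>.backup, and <csv>.lock guards against concurrent runs.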