#!/usr/bin/env python
# -*- coding: utf-8 -*-
import csv  # NOTE: imported but not used in the visible code; kept for compatibility
import json
import re
import time
import requests
import os
import pandas as pd
from tqdm import tqdm
import fcntl  # NOTE: unused and Unix-only; kept for compatibility — TODO confirm removable
import signal
import sys
from datetime import datetime


class TwitterFollowerExtractor:
    """
    A class to extract Twitter follower counts from a CSV file.

    Reads Twitter/X profile URLs out of a CSV (from an ``ext_info`` JSON column
    or a plain ``url`` column), queries a RapidAPI Twitter endpoint for each
    username's follower count, and writes the result back into a '粉丝数'
    (follower count) column of the same CSV. Supports resuming via a
    ``.progress`` sidecar file and guards against concurrent runs with a
    ``.lock`` sidecar file.
    """

    # API configuration
    X_RAPIDAPI_KEY = "xxx"
    RAPIDAPI_HOST = "twitter-v1-1-v2-api.p.rapidapi.com"
    ENDPOINT = "https://twitter-v1-1-v2-api.p.rapidapi.com/graphql/UserByScreenName"

    def __init__(self, csv_file_path):
        """
        Initialize the extractor with the path to the CSV file.

        :param csv_file_path: The path to the CSV file.
        """
        self.csv_file_path = csv_file_path
        self.df = None  # populated by _load_csv()
        self.progress_file = csv_file_path + '.progress'
        self.lock_file = csv_file_path + '.lock'
        self.processed_count = 0
        self.setup_signal_handlers()

    def setup_signal_handlers(self):
        """Install SIGINT/SIGTERM handlers so the program exits gracefully."""
        def signal_handler(signum, frame):
            print(f"\n收到信号 {signum},正在安全退出...")
            self._cleanup_and_exit()

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

    def _cleanup_and_exit(self):
        """Remove the lock file (if present) and terminate the process."""
        if os.path.exists(self.lock_file):
            os.remove(self.lock_file)
        print("程序已安全退出。")
        sys.exit(0)

    def _create_lock_file(self):
        """Create a lock file to prevent multiple instances running at once.

        :return: True if the lock was acquired, False otherwise.
        """
        if os.path.exists(self.lock_file):
            print(f"检测到锁文件 {self.lock_file},可能有其他实例正在运行。")
            print("如果确认没有其他实例运行,请删除锁文件后重试。")
            return False
        try:
            with open(self.lock_file, 'w') as f:
                # Record PID and timestamp for manual diagnosis of stale locks.
                f.write(f"{os.getpid()}\n{datetime.now()}")
            return True
        except Exception as e:
            print(f"创建锁文件失败: {e}")
            return False

    def _save_progress(self, processed_usernames):
        """Persist the set of processed usernames to the progress file."""
        try:
            with open(self.progress_file, 'w') as f:
                json.dump({
                    'processed_usernames': list(processed_usernames),
                    'processed_count': self.processed_count,
                    'last_update': datetime.now().isoformat(),
                }, f, indent=2)
        except Exception as e:
            # Best-effort: a failed progress save must not abort the run.
            print(f"保存进度失败: {e}")

    def _load_progress(self):
        """Load previously processed usernames from the progress file.

        :return: A set of usernames already handled in a prior run
                 (empty set when no usable progress file exists).
        """
        if not os.path.exists(self.progress_file):
            return set()
        try:
            with open(self.progress_file, 'r') as f:
                progress_data = json.load(f)
            processed_usernames = set(progress_data.get('processed_usernames', []))
            self.processed_count = progress_data.get('processed_count', 0)
            print(f"从断点继续: 已处理 {len(processed_usernames)} 个用户")
            return processed_usernames
        except Exception as e:
            print(f"加载进度失败: {e}")
            return set()

    def _save_single_row(self, idx, follower_count):
        """Write one follower count into the DataFrame and flush the CSV.

        :param idx: DataFrame index of the row to update.
        :param follower_count: The follower count to store.
        :return: True on success, False on failure.
        """
        try:
            # Update the DataFrame, creating the column on first use.
            if '粉丝数' not in self.df.columns:
                self.df['粉丝数'] = None
            self.df.at[idx, '粉丝数'] = follower_count
            # Write the CSV immediately so a crash loses at most one row.
            self._save_csv()
            return True
        except Exception as e:
            print(f"保存单行数据失败: {e}")
            return False

    def _extract_twitter_username(self, url):
        """Extract Twitter username from URL.

        Handles both profile URLs and status (tweet) URLs on x.com or
        twitter.com; returns None for empty/unrecognized/malformed URLs.
        """
        if not url:
            return None
        # Handle both profile URLs and status URLs
        status_pattern = r'(?:https?://)?(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/'
        profile_pattern = r'(?:https?://)?(?:www\.)?(?:x\.com|twitter\.com)/([^/?]+)'
        # First try to extract from status URL
        match = re.search(status_pattern, url)
        if match:
            username = match.group(1)
            # Skip if username is "status" (malformed URL)
            if username.lower() == 'status':
                return None
            return username
        # Then try to extract from profile URL
        match = re.search(profile_pattern, url)
        if match:
            username = match.group(1)
            # Remove any query parameters if present
            username = username.split('?')[0]
            # Skip if username is "status" (malformed URL)
            if username.lower() == 'status':
                return None
            return username
        return None

    def _get_follower_count(self, username):
        """Get follower count for a Twitter username via RapidAPI with retries.

        :param username: Screen name to look up.
        :return: Follower count as an int, or None on failure.
        """
        if not username:
            return None
        headers = {
            "X-RapidAPI-Key": self.X_RAPIDAPI_KEY,
            "X-RapidAPI-Host": self.RAPIDAPI_HOST,
        }
        # Prepare variables according to the correct API format
        variables = {
            "screen_name": username,
            "withSafetyModeUserFields": True,
            "withHighlightedLabel": True,
        }
        querystring = {"variables": json.dumps(variables)}
        # Implement retry logic
        max_retries = 3
        retry_delay = 2  # seconds
        for attempt in range(max_retries):
            try:
                # timeout added so a stalled connection cannot hang the run
                response = requests.get(
                    self.ENDPOINT, headers=headers, params=querystring, timeout=30
                )
                if response.status_code == 200:
                    data = response.json()
                    # Extract follower count from the response using the correct path
                    if "data" in data and "user" in data["data"] and data["data"]["user"]:
                        user_result = data["data"]["user"]["result"]
                        if "legacy" in user_result:
                            return user_result["legacy"]["followers_count"]
                    else:
                        print(f"No user data found for {username}")
                else:
                    print(f"API request failed for {username} (Attempt {attempt+1}/{max_retries}): Status code {response.status_code}")
                # If we're not on the last attempt, wait before retrying
                if attempt < max_retries - 1:
                    print(f"Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
            except Exception as e:
                print(f"Error fetching data for {username} (Attempt {attempt+1}/{max_retries}): {e}")
                # If we're not on the last attempt, wait before retrying
                if attempt < max_retries - 1:
                    print(f"Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
        return None

    def _backup_file(self):
        """Create a backup of the original CSV file."""
        backup_file = self.csv_file_path + '.backup'
        try:
            with open(self.csv_file_path, 'rb') as src, open(backup_file, 'wb') as dst:
                dst.write(src.read())
            print(f"Created backup of original file at {backup_file}")
        except Exception as e:
            # A missing backup is a warning, not a fatal error.
            print(f"Warning: Could not create backup file: {e}")

    def _load_csv(self):
        """Load the CSV file into a pandas DataFrame with enhanced compatibility.

        Tries a sequence of encodings (BOM-aware UTF-8 first, then common
        Chinese encodings, then latin-1 as a last resort).

        :return: True on success, False otherwise.
        """
        try:
            # Try different encoding methods for better compatibility
            encodings = ['utf-8-sig', 'utf-8', 'gbk', 'gb2312', 'latin-1']
            df_loaded = False
            for encoding in encodings:
                try:
                    self.df = pd.read_csv(self.csv_file_path, encoding=encoding)
                    df_loaded = True
                    print(f"Successfully loaded CSV with {len(self.df)} rows using {encoding} encoding.")
                    break
                except UnicodeDecodeError:
                    continue
                except Exception as e:
                    print(f"Error with {encoding} encoding: {e}")
                    continue
            if not df_loaded:
                print("Failed to load CSV with any encoding method.")
                return False
            # Clean up the DataFrame columns and data
            self._clean_dataframe()
            return True
        except Exception as e:
            print(f"Error reading CSV file: {e}")
            return False

    def _clean_dataframe(self):
        """Clean the DataFrame to handle malformed data."""
        # Clean column names by removing newlines and extra whitespace
        self.df.columns = [
            col.strip().replace('\n', '').replace('\r', '') for col in self.df.columns
        ]
        # Clean the '粉丝数' column if it exists
        if '粉丝数' in self.df.columns:
            # Remove newlines and extra whitespace from the follower count column
            self.df['粉丝数'] = (
                self.df['粉丝数'].astype(str).str.strip()
                .str.replace('\n', '').str.replace('\r', '')
            )
            # Replace empty strings with None
            self.df['粉丝数'] = self.df['粉丝数'].replace('', None)
        # Clean other string columns
        for col in self.df.columns:
            if self.df[col].dtype == 'object':
                self.df[col] = (
                    self.df[col].astype(str).str.strip()
                    .str.replace('\n', '').str.replace('\r', '')
                )
                # Replace 'nan' strings (produced by astype(str)) with None
                self.df[col] = self.df[col].replace('nan', None)

    def _save_csv(self):
        """Save the updated DataFrame back to the CSV file."""
        try:
            # utf-8-sig keeps the file readable in Excel with Chinese headers.
            self.df.to_csv(self.csv_file_path, index=False, encoding='utf-8-sig')
            print(f"Process completed. Follower counts have been updated in {self.csv_file_path}.")
        except Exception as e:
            print(f"Error saving updated CSV: {e}")
            print("Please check the backup file if needed.")

    def _generate_summary(self, processed_count):
        """Generate and print a summary of the results.

        :param processed_count: Total number of Twitter accounts attempted.
        """
        if '粉丝数' in self.df.columns:
            total_updated = self.df['粉丝数'].notna().sum()
            print(f"\nSummary:")
            print(f"Total Twitter accounts processed: {processed_count}")
            print(f"Successfully updated follower counts: {total_updated}")
            print(f"Failed to update follower counts: {processed_count - total_updated}")
            # Print top 10 accounts by follower count
            if total_updated > 0:
                print("\nTop 10 accounts by follower count:")
                top_accounts = (
                    self.df[self.df['粉丝数'].notna()]
                    .sort_values('粉丝数', ascending=False)
                    .head(10)
                )
                for _, row in top_accounts.iterrows():
                    url_value = row['url'] if 'url' in row and pd.notna(row['url']) else "N/A"
                    followers = row['粉丝数'] if pd.notna(row['粉丝数']) else 0
                    # Clean the followers value and convert to int safely
                    try:
                        # Remove any whitespace and newlines
                        followers_str = str(followers).strip()
                        if followers_str and followers_str != 'nan':
                            followers_int = int(float(followers_str))
                            print(f"- {self._extract_twitter_username(url_value)}: {followers_int} followers")
                    except (ValueError, TypeError) as e:
                        print(f"- {self._extract_twitter_username(url_value)}: Unable to parse follower count ({followers})")

    def process_followers(self):
        """
        Main method to run the follower extraction process.

        Backs up the CSV, collects usernames, resumes from any saved progress,
        fetches follower counts one by one (saving after each row), and prints
        a summary. The lock file is always removed at the end.
        """
        print("Starting Twitter follower count extraction...")
        if not os.path.exists(self.csv_file_path):
            print(f"Error: File {self.csv_file_path} not found.")
            return
        self._backup_file()
        if not self._load_csv():
            return
        usernames_to_process = []
        for idx, row in self.df.iterrows():
            twitter_url = None
            try:
                # Prefer the twitterUrl embedded in the ext_info JSON column.
                if 'ext_info' in row and pd.notna(row['ext_info']):
                    ext_info = json.loads(row['ext_info'])
                    if 'twitterUrl' in ext_info and ext_info['twitterUrl']:
                        twitter_url = ext_info['twitterUrl']
            except Exception as e:
                print(f"Error parsing ext_info for row {idx}: {e}")
            # Fall back to the plain 'url' column.
            if not twitter_url and 'url' in self.df.columns and pd.notna(row['url']):
                twitter_url = row['url']
            if twitter_url:
                username = self._extract_twitter_username(twitter_url)
                if username:
                    usernames_to_process.append((idx, username, twitter_url))
        print(f"Found {len(usernames_to_process)} Twitter usernames to process.")
        # Acquire the single-instance lock.
        if not self._create_lock_file():
            return
        try:
            # Load progress from any previous run.
            processed_usernames = self._load_progress()
            # Filter out users that are already handled.
            remaining_usernames = []
            for idx, username, url in usernames_to_process:
                if username not in processed_usernames:
                    # Skip rows that already hold a valid numeric follower count.
                    if '粉丝数' in self.df.columns and pd.notna(self.df.at[idx, '粉丝数']):
                        existing_value = str(self.df.at[idx, '粉丝数']).strip()
                        if existing_value and existing_value not in ['#VALUE!', 'nan', '', '\n']:
                            try:
                                int(float(existing_value))
                                print(f"Skipping {username} - already has follower count: {existing_value}")
                                processed_usernames.add(username)
                                continue
                            except (ValueError, TypeError):
                                pass
                    remaining_usernames.append((idx, username, url))
                else:
                    print(f"Skipping {username} - already processed in previous run")
            print(f"需要处理的用户数: {len(remaining_usernames)} (总计: {len(usernames_to_process)})")
            # Process the remaining users.
            for idx, username, url in tqdm(remaining_usernames, desc="Fetching follower counts"):
                try:
                    follower_count = self._get_follower_count(username)
                    if follower_count is not None:
                        # Save each row immediately.
                        if self._save_single_row(idx, follower_count):
                            processed_usernames.add(username)
                            self.processed_count += 1
                            print(f"Updated {username} with {follower_count} followers")
                            # Save progress every 10 processed users.
                            if self.processed_count % 10 == 0:
                                self._save_progress(processed_usernames)
                        else:
                            print(f"Failed to save data for {username}")
                    else:
                        print(f"Could not get follower count for {username}")
                        # Mark as processed to avoid retrying on resume.
                        processed_usernames.add(username)
                except Exception as e:
                    print(f"处理用户 {username} 时发生错误: {e}")
                    continue
            # Save final progress.
            self._save_progress(processed_usernames)
        finally:
            # Always release the lock file.
            if os.path.exists(self.lock_file):
                os.remove(self.lock_file)
        self._generate_summary(len(usernames_to_process))


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='Twitter Follower Extractor')
    parser.add_argument('--clean', action='store_true', help='清理进度文件重新开始')
    parser.add_argument('--file', default='用户活动报名信息.csv', help='CSV文件路径')
    args = parser.parse_args()

    CSV_FILE_PATH = args.file
    extractor = TwitterFollowerExtractor(CSV_FILE_PATH)
    if args.clean:
        # Remove progress and lock files to start over from scratch.
        if os.path.exists(extractor.progress_file):
            os.remove(extractor.progress_file)
            print("已清理进度文件")
        if os.path.exists(extractor.lock_file):
            os.remove(extractor.lock_file)
            print("已清理锁文件")
    extractor.process_followers()
# NOTE: web-scrape residue removed here ("Top comments (0)" / "Subscribe" /
# site-footer text) — it was not part of the script and broke the Python syntax.