#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import os
import re
import time

import pandas as pd
import requests
from tqdm import tqdm


class TwitterFollowerExtractor:
    """
    A class to extract Twitter follower counts from a CSV file.
    """

    # API configuration
    X_RAPIDAPI_KEY = "xxx"
    RAPIDAPI_HOST = "twitter-v1-1-v2-api.p.rapidapi.com"
    ENDPOINT = "https://twitter-v1-1-v2-api.p.rapidapi.com/graphql/UserByScreenName"

    def __init__(self, csv_file_path):
        """
        Initialize the extractor with the path to the CSV file.

        :param csv_file_path: The path to the CSV file.
        """
        self.csv_file_path = csv_file_path
        self.df = None

    def _extract_twitter_username(self, url):
        """Extract Twitter username from URL."""
        if not url:
            return None
        # Handle both profile URLs and status URLs
        status_pattern = r'(?:https?://)?(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/'
        profile_pattern = r'(?:https?://)?(?:www\.)?(?:x\.com|twitter\.com)/([^/?]+)'
        # First try to extract from a status URL
        match = re.search(status_pattern, url)
        if match:
            username = match.group(1)
            # Skip if username is "status" (malformed URL)
            if username.lower() == 'status':
                return None
            return username
        # Then try to extract from a profile URL
        match = re.search(profile_pattern, url)
        if match:
            username = match.group(1)
            # Remove any query parameters if present
            username = username.split('?')[0]
            # Skip if username is "status" (malformed URL)
            if username.lower() == 'status':
                return None
            return username
        return None
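    # A quick illustration of the extraction rules above (not part of the
    # original script): given the two regexes, you would expect
    #
    #   _extract_twitter_username("https://x.com/jack/status/20")  -> "jack"
    #   _extract_twitter_username("twitter.com/jack?lang=en")      -> "jack"
    #   _extract_twitter_username("https://x.com/status/20")       -> None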
    def _get_follower_count(self, username):
        """Get follower count for a Twitter username using RapidAPI with retry logic."""
        if not username:
            return None
        headers = {
            "X-RapidAPI-Key": self.X_RAPIDAPI_KEY,
            "X-RapidAPI-Host": self.RAPIDAPI_HOST,
        }
        # Prepare variables according to the correct API format
        variables = {
            "screen_name": username,
            "withSafetyModeUserFields": True,
            "withHighlightedLabel": True,
        }
        querystring = {"variables": json.dumps(variables)}
        # Implement retry logic
        max_retries = 3
        retry_delay = 2  # seconds
        for attempt in range(max_retries):
            try:
                response = requests.get(self.ENDPOINT, headers=headers, params=querystring)
                if response.status_code == 200:
                    data = response.json()
                    # Extract follower count from the response using the correct path
                    if "data" in data and "user" in data["data"] and data["data"]["user"]:
                        user_result = data["data"]["user"]["result"]
                        if "legacy" in user_result:
                            return user_result["legacy"]["followers_count"]
                    else:
                        print(f"No user data found for {username}")
                else:
                    print(f"API request failed for {username} (Attempt {attempt + 1}/{max_retries}): Status code {response.status_code}")
                # If we're not on the last attempt, wait before retrying
                if attempt < max_retries - 1:
                    print(f"Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
            except Exception as e:
                print(f"Error fetching data for {username} (Attempt {attempt + 1}/{max_retries}): {e}")
                # If we're not on the last attempt, wait before retrying
                if attempt < max_retries - 1:
                    print(f"Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
        return None
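    # For reference, the parsing above assumes a payload shaped roughly like
    # the following (inferred from the access path in the code; the real
    # response carries many more fields):
    #
    #   {"data": {"user": {"result": {"legacy": {"followers_count": 12345}}}}}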
    def _backup_file(self):
        """Create a backup of the original CSV file."""
        backup_file = self.csv_file_path + '.backup'
        try:
            with open(self.csv_file_path, 'rb') as src, open(backup_file, 'wb') as dst:
                dst.write(src.read())
            print(f"Created backup of original file at {backup_file}")
        except Exception as e:
            print(f"Warning: Could not create backup file: {e}")

    def _load_csv(self):
        """Load the CSV file into a pandas DataFrame with enhanced compatibility."""
        try:
            # Try different encodings for better compatibility
            encodings = ['utf-8-sig', 'utf-8', 'gbk', 'gb2312', 'latin-1']
            df_loaded = False
            for encoding in encodings:
                try:
                    self.df = pd.read_csv(self.csv_file_path, encoding=encoding)
                    df_loaded = True
                    print(f"Successfully loaded CSV with {len(self.df)} rows using {encoding} encoding.")
                    break
                except UnicodeDecodeError:
                    continue
                except Exception as e:
                    print(f"Error with {encoding} encoding: {e}")
                    continue
            if not df_loaded:
                print("Failed to load CSV with any encoding method.")
                return False
            # Clean up the DataFrame columns and data
            self._clean_dataframe()
            return True
        except Exception as e:
            print(f"Error reading CSV file: {e}")
            return False
    def _clean_dataframe(self):
        """Clean the DataFrame to handle malformed data."""
        # Clean column names by removing newlines and extra whitespace
        self.df.columns = [col.strip().replace('\n', '').replace('\r', '') for col in self.df.columns]
        # Clean the '粉丝数' (follower count) column if it exists
        if '粉丝数' in self.df.columns:
            # Remove newlines and extra whitespace from the follower count column
            self.df['粉丝数'] = self.df['粉丝数'].astype(str).str.strip().str.replace('\n', '').str.replace('\r', '')
            # Replace empty strings with None
            self.df['粉丝数'] = self.df['粉丝数'].replace('', None)
        # Clean the other string columns
        for col in self.df.columns:
            if self.df[col].dtype == 'object':
                self.df[col] = self.df[col].astype(str).str.strip().str.replace('\n', '').str.replace('\r', '')
                # Replace 'nan' strings with None
                self.df[col] = self.df[col].replace('nan', None)

    def _save_csv(self):
        """Save the updated DataFrame back to the CSV file."""
        try:
            self.df.to_csv(self.csv_file_path, index=False, encoding='utf-8-sig')
            print(f"Process completed. Follower counts have been updated in {self.csv_file_path}.")
        except Exception as e:
            print(f"Error saving updated CSV: {e}")
            print("Please check the backup file if needed.")
    def _generate_summary(self, processed_count):
        """Generate and print a summary of the results."""
        if '粉丝数' in self.df.columns:
            total_updated = self.df['粉丝数'].notna().sum()
            print("\nSummary:")
            print(f"Total Twitter accounts processed: {processed_count}")
            print(f"Successfully updated follower counts: {total_updated}")
            print(f"Failed to update follower counts: {processed_count - total_updated}")
            # Print the top 10 accounts by follower count
            if total_updated > 0:
                print("\nTop 10 accounts by follower count:")
                # Coerce the column to numbers before sorting; it may hold
                # strings after cleaning, and sorting strings would rank
                # "9" above "10"
                sortable = self.df[self.df['粉丝数'].notna()].copy()
                sortable['粉丝数'] = pd.to_numeric(sortable['粉丝数'], errors='coerce')
                top_accounts = sortable.sort_values('粉丝数', ascending=False).head(10)
                for _, row in top_accounts.iterrows():
                    url_value = row['url'] if 'url' in row and pd.notna(row['url']) else "N/A"
                    followers = row['粉丝数'] if pd.notna(row['粉丝数']) else 0
                    # Clean the followers value and convert to int safely
                    try:
                        followers_str = str(followers).strip()
                        if followers_str and followers_str != 'nan':
                            followers_int = int(float(followers_str))
                            print(f"- {self._extract_twitter_username(url_value)}: {followers_int} followers")
                    except (ValueError, TypeError):
                        print(f"- {self._extract_twitter_username(url_value)}: Unable to parse follower count ({followers})")
    def process_followers(self):
        """
        Main method to run the follower extraction process.
        """
        print("Starting Twitter follower count extraction...")
        if not os.path.exists(self.csv_file_path):
            print(f"Error: File {self.csv_file_path} not found.")
            return
        self._backup_file()
        if not self._load_csv():
            return
        # Collect (row index, username, url) triples to process
        usernames_to_process = []
        for idx, row in self.df.iterrows():
            twitter_url = None
            try:
                # Prefer the twitterUrl embedded in the ext_info JSON column
                if 'ext_info' in row and pd.notna(row['ext_info']):
                    ext_info = json.loads(row['ext_info'])
                    if 'twitterUrl' in ext_info and ext_info['twitterUrl']:
                        twitter_url = ext_info['twitterUrl']
            except Exception as e:
                print(f"Error parsing ext_info for row {idx}: {e}")
            # Fall back to the plain url column
            if not twitter_url and 'url' in self.df.columns and pd.notna(row['url']):
                twitter_url = row['url']
            if twitter_url:
                username = self._extract_twitter_username(twitter_url)
                if username:
                    usernames_to_process.append((idx, username, twitter_url))
        print(f"Found {len(usernames_to_process)} Twitter usernames to process.")
        for idx, username, url in tqdm(usernames_to_process, desc="Fetching follower counts"):
            # Skip rows that already have a valid follower count
            if '粉丝数' in self.df.columns and pd.notna(self.df.at[idx, '粉丝数']):
                existing_value = str(self.df.at[idx, '粉丝数']).strip()
                if existing_value and existing_value not in ['#VALUE!', 'nan', '', '\n']:
                    try:
                        # Try to convert to int to verify it's a valid number
                        int(float(existing_value))
                        print(f"Skipping {username} - already has follower count: {existing_value}")
                        continue
                    except (ValueError, TypeError):
                        # If conversion fails, fetch fresh data
                        pass
            follower_count = self._get_follower_count(username)
            if follower_count is not None:
                if '粉丝数' not in self.df.columns:
                    self.df['粉丝数'] = None
                self.df.at[idx, '粉丝数'] = follower_count
                print(f"Updated {username} with {follower_count} followers")
            else:
                print(f"Could not get follower count for {username}")
        self._save_csv()
        self._generate_summary(len(usernames_to_process))
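# An illustrative ext_info cell that the collection loop above would pick up
# (hypothetical data; only the twitterUrl key is read):
#
#   {"twitterUrl": "https://x.com/jack"}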
if __name__ == "__main__":
    # File path
    CSV_FILE_PATH = "用户活动报名信息.csv"
    extractor = TwitterFollowerExtractor(CSV_FILE_PATH)
    extractor.process_followers()
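The script needs pandas, requests, and tqdm installed, and you have to swap your own RapidAPI key in for the "xxx" placeholder before running it. If you would rather drive the extractor from other code than run the file directly, here is a minimal sketch (the module name twitter_follower_extractor is a hypothetical save name for the script above):

from twitter_follower_extractor import TwitterFollowerExtractor

# Any CSV with a 'url' or 'ext_info' column works; results are written
# into the '粉丝数' (follower count) column, which is created if missing.
extractor = TwitterFollowerExtractor("signups.csv")  # hypothetical file name
extractor.process_followers()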
Top comments (1)
Overall it looks fine; the code is just a bit verbose.