DEV Community

drake
drake

Posted on

Claude Code 写的代码

Image description

  • 需求是补充原表的粉丝数数据
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import csv
import json
import re
import time
import requests
import os
import pandas as pd
from tqdm import tqdm

class TwitterFollowerExtractor:
    """
    Fill in the follower-count column of a CSV file of Twitter/X accounts.

    For every row the profile URL is read from the ``twitterUrl`` key of the
    JSON stored in the ``ext_info`` column, falling back to the ``url``
    column.  Follower counts are fetched through RapidAPI and written to the
    ``粉丝数`` column of the same file, after a ``.backup`` copy of the file
    has been created.
    """

    # API configuration
    X_RAPIDAPI_KEY = "xxx"  # placeholder value; a real RapidAPI key is required
    RAPIDAPI_HOST = "twitter-v1-1-v2-api.p.rapidapi.com"
    ENDPOINT = "https://twitter-v1-1-v2-api.p.rapidapi.com/graphql/UserByScreenName"

    # Seconds to wait for one HTTP attempt.  Without a timeout a stalled
    # connection would block the whole run indefinitely.
    REQUEST_TIMEOUT = 15
    MAX_RETRIES = 3
    RETRY_DELAY = 2  # seconds between attempts

    # Name of the CSV column that holds the follower counts.
    FOLLOWER_COLUMN = '粉丝数'

    # First path segment after an x.com / twitter.com host.  The look-behind
    # stops the pattern from matching inside other hosts ("netflix.com/...").
    _PROFILE_RE = re.compile(
        r'(?<![\w-])(?:x|twitter)\.com/([^/?#\s]+)', re.IGNORECASE
    )

    # Site routes that occupy the username position but are not accounts.
    _RESERVED_PATHS = frozenset({
        'status', 'i', 'home', 'intent', 'share', 'search', 'hashtag',
        'explore', 'settings', 'messages', 'notifications',
    })

    def __init__(self, csv_file_path):
        """
        Initialize the extractor with the path to the CSV file.

        :param csv_file_path: The path to the CSV file.
        """
        self.csv_file_path = csv_file_path
        self.df = None

    def _extract_twitter_username(self, url):
        """
        Extract the account name from a Twitter/X profile or status URL.

        :param url: URL text such as ``https://x.com/name/status/1``.
        :return: The username, or ``None`` when ``url`` is empty, is not a
            string, does not point at x.com / twitter.com, or names a site
            route (``status``, ``i``, ``home``, ...) instead of an account.
        """
        if not url or not isinstance(url, str):
            return None

        match = self._PROFILE_RE.search(url)
        if not match:
            return None

        # Profile and status URLs both carry the username as the first path
        # segment; a leading "@" is tolerated and dropped.
        username = match.group(1).lstrip('@')
        if not username or username.lower() in self._RESERVED_PATHS:
            return None
        return username

    def _get_follower_count(self, username):
        """
        Get the follower count for a username through RapidAPI, with retries.

        :param username: Twitter/X screen name.
        :return: The ``followers_count`` value from the API response, or
            ``None`` when ``username`` is empty or every attempt failed.
        """
        if not username:
            return None

        headers = {
            "X-RapidAPI-Key": self.X_RAPIDAPI_KEY,
            "X-RapidAPI-Host": self.RAPIDAPI_HOST
        }

        # Prepare variables according to the API's query format
        variables = {
            "screen_name": username,
            "withSafetyModeUserFields": True,
            "withHighlightedLabel": True
        }
        querystring = {"variables": json.dumps(variables)}

        max_retries = self.MAX_RETRIES
        retry_delay = self.RETRY_DELAY

        for attempt in range(max_retries):
            try:
                response = requests.get(
                    self.ENDPOINT,
                    headers=headers,
                    params=querystring,
                    timeout=self.REQUEST_TIMEOUT,
                )
                if response.status_code == 200:
                    data = response.json()
                    user = (data.get("data") or {}).get("user") or {}
                    legacy = (user.get("result") or {}).get("legacy") or {}
                    if "followers_count" in legacy:
                        return legacy["followers_count"]
                    print(f"No user data found for {username}")
                else:
                    print(f"API request failed for {username} (Attempt {attempt+1}/{max_retries}): Status code {response.status_code}")
            except Exception as e:
                # Best-effort boundary: network errors and malformed payloads
                # are reported and retried rather than aborting the run.
                print(f"Error fetching data for {username} (Attempt {attempt+1}/{max_retries}): {e}")

            # If we're not on the last attempt, wait before retrying
            if attempt < max_retries - 1:
                print(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)

        return None

    def _backup_file(self):
        """
        Create a byte-for-byte copy of the CSV file at ``<path>.backup``.

        A failure is reported as a warning only; processing continues.
        """
        backup_file = self.csv_file_path + '.backup'
        try:
            with open(self.csv_file_path, 'rb') as src, open(backup_file, 'wb') as dst:
                dst.write(src.read())
            print(f"Created backup of original file at {backup_file}")
        except Exception as e:
            print(f"Warning: Could not create backup file: {e}")

    def _load_csv(self):
        """
        Load the CSV file into ``self.df`` and clean it.

        Several encodings are tried in order; the first one that decodes the
        file wins.

        :return: ``True`` when the file was loaded and cleaned, else ``False``.
        """
        encodings = ['utf-8-sig', 'utf-8', 'gbk', 'gb2312', 'latin-1']
        try:
            for encoding in encodings:
                try:
                    self.df = pd.read_csv(self.csv_file_path, encoding=encoding)
                except UnicodeDecodeError:
                    continue
                except Exception as e:
                    print(f"Error with {encoding} encoding: {e}")
                    continue
                print(f"Successfully loaded CSV with {len(self.df)} rows using {encoding} encoding.")
                break
            else:
                print("Failed to load CSV with any encoding method.")
                return False

            # Clean up the DataFrame columns and data
            self._clean_dataframe()
            return True
        except Exception as e:
            print(f"Error reading CSV file: {e}")
            return False

    @staticmethod
    def _clean_cell(value):
        """
        Normalise a single cell.

        Strings are stripped and have CR/LF removed; the text ``'nan'`` and
        missing values (``None``/NaN) become ``None``.  Any other value is
        returned unchanged.
        """
        if isinstance(value, str):
            cleaned = value.strip().replace('\n', '').replace('\r', '')
            return None if cleaned == 'nan' else cleaned
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return None
        return value

    @staticmethod
    def _parse_follower_count(value):
        """
        Interpret ``value`` as a follower count.

        :return: The count as ``int``, or ``None`` when the value is missing
            or is not a finite number (``''``, ``'#VALUE!'``, NaN, ``inf``...).
        """
        try:
            return int(float(str(value).strip()))
        except (TypeError, ValueError, OverflowError):
            return None

    def _clean_dataframe(self):
        """
        Clean column names and text cells of ``self.df`` in place.

        Missing cells stay missing: values are cleaned cell by cell instead
        of being cast to ``str`` (which would turn NaN into the text
        ``'nan'``) and patched with ``Series.replace(..., None)``, whose
        result depends on the pandas version.
        """
        # Clean column names by removing newlines and extra whitespace
        self.df.columns = [
            col.strip().replace('\n', '').replace('\r', '') if isinstance(col, str) else col
            for col in self.df.columns
        ]

        for col in self.df.columns:
            series = self.df[col]
            is_follower_col = col == self.FOLLOWER_COLUMN
            is_text = series.dtype == object or pd.api.types.is_string_dtype(series)
            if not (is_text or is_follower_col):
                continue

            cleaned = [self._clean_cell(value) for value in series]

            if is_follower_col:
                normalised = []
                for value in cleaned:
                    if value is None or value == '':
                        normalised.append(None)
                    elif isinstance(value, str):
                        # Keep the text as read so unparseable entries are
                        # written back untouched if no new count is fetched.
                        normalised.append(value)
                    else:
                        # Numeric cell (e.g. 1234.0 from a float column):
                        # store a plain int so it is saved as "1234".
                        parsed = self._parse_follower_count(value)
                        normalised.append(value if parsed is None else parsed)
                cleaned = normalised

            # Explicit object dtype keeps ints, strings and None side by side
            # without pandas re-inferring a float column.
            self.df[col] = pd.Series(cleaned, index=series.index, dtype=object)

    def _save_csv(self):
        """Write ``self.df`` back to the CSV file as UTF-8 with BOM."""
        try:
            self.df.to_csv(self.csv_file_path, index=False, encoding='utf-8-sig')
            print(f"Process completed. Follower counts have been updated in {self.csv_file_path}.")
        except Exception as e:
            print(f"Error saving updated CSV: {e}")
            print("Please check the backup file if needed.")

    def _find_twitter_url(self, row, idx=None, log_errors=True):
        """
        Return the Twitter/X URL stored in a row.

        :param row: One DataFrame row (a ``Series``).
        :param idx: Row label, used only in the error message.
        :param log_errors: Print a message when ``ext_info`` is not valid JSON.
        :return: ``ext_info['twitterUrl']`` when present and non-empty,
            otherwise the ``url`` cell, otherwise ``None``.
        """
        twitter_url = None

        if 'ext_info' in row and pd.notna(row['ext_info']):
            try:
                ext_info = json.loads(row['ext_info'])
            except (TypeError, ValueError) as e:
                if log_errors:
                    print(f"Error parsing ext_info for row {idx}: {e}")
            else:
                if isinstance(ext_info, dict) and ext_info.get('twitterUrl'):
                    twitter_url = ext_info['twitterUrl']

        if not twitter_url and 'url' in row and pd.notna(row['url']):
            twitter_url = row['url']

        return twitter_url

    def _generate_summary(self, processed_count, updated_count=None):
        """
        Print run statistics and the ten accounts with the most followers.

        :param processed_count: Number of accounts that were processed.
        :param updated_count: Number of processed accounts that ended up with
            a follower count.  When omitted, the number of rows holding a
            numeric follower count is used instead.
        """
        if self.FOLLOWER_COLUMN not in self.df.columns:
            return

        # Rank on numeric values: the column can mix text read from the CSV
        # with ints from the API, which cannot be compared to each other and
        # would otherwise sort lexicographically ('999' > '1000000').
        counts = pd.Series(
            [self._parse_follower_count(value) for value in self.df[self.FOLLOWER_COLUMN]],
            index=self.df.index,
            dtype=float,
        )
        valid_counts = counts.dropna()

        total_updated = len(valid_counts) if updated_count is None else updated_count
        failed = max(processed_count - total_updated, 0)

        print(f"\nSummary:")
        print(f"Total Twitter accounts processed: {processed_count}")
        print(f"Successfully updated follower counts: {total_updated}")
        print(f"Failed to update follower counts: {failed}")

        if valid_counts.empty:
            return

        print("\nTop 10 accounts by follower count:")
        top_accounts = valid_counts.sort_values(ascending=False).head(10)
        for idx, followers in top_accounts.items():
            row = self.df.loc[idx]
            url_value = self._find_twitter_url(row, idx, log_errors=False)
            username = self._extract_twitter_username(url_value) or "N/A"
            print(f"- {username}: {int(followers)} followers")

    def process_followers(self):
        """
        Main method to run the follower extraction process.

        Backs up the CSV, loads it, fetches a follower count for every row
        that has a usable Twitter/X URL and no valid count yet, saves the
        file and prints a summary.
        """
        print("Starting Twitter follower count extraction...")

        if not os.path.exists(self.csv_file_path):
            print(f"Error: File {self.csv_file_path} not found.")
            return

        self._backup_file()

        if not self._load_csv():
            return

        usernames_to_process = []
        for idx, row in self.df.iterrows():
            twitter_url = self._find_twitter_url(row, idx)
            username = self._extract_twitter_username(twitter_url)
            if username:
                usernames_to_process.append((idx, username, twitter_url))

        print(f"Found {len(usernames_to_process)} Twitter usernames to process.")

        column = self.FOLLOWER_COLUMN
        fetched = {}        # lower-cased username -> API result, one call per account
        resolved_count = 0  # processed rows that end up with a follower count

        for idx, username, _url in tqdm(usernames_to_process, desc="Fetching follower counts"):
            # Skip rows that already hold a valid follower count
            if column in self.df.columns:
                existing = self.df.at[idx, column]
                if self._parse_follower_count(existing) is not None:
                    existing_value = str(existing).strip()
                    print(f"Skipping {username} - already has follower count: {existing_value}")
                    resolved_count += 1
                    continue

            cache_key = username.lower()
            if cache_key not in fetched:
                fetched[cache_key] = self._get_follower_count(username)
            follower_count = fetched[cache_key]

            if follower_count is None:
                print(f"Could not get follower count for {username}")
                continue

            # The column is created lazily so a run without any result does
            # not add an empty column to the file.
            if column not in self.df.columns:
                self.df[column] = None

            self.df.at[idx, column] = follower_count
            resolved_count += 1
            print(f"Updated {username} with {follower_count} followers")

        self._save_csv()
        self._generate_summary(len(usernames_to_process), resolved_count)

if __name__ == "__main__":
    # Script entry point: enrich the sign-up sheet, resolved relative to the
    # current working directory, in place.
    TwitterFollowerExtractor("用户活动报名信息.csv").process_followers()
Enter fullscreen mode Exit fullscreen mode

Top comments (1)

Collapse
 
dragon72463399 profile image
drake

总体来说没啥毛病,就是代码有点冗长。