DEV Community

drake
drake

Posted on

claude code写的代码(鲁棒性+代码优雅 优化)

Image description

  • 需求:为原表中的每个 Twitter 账号补充粉丝数数据

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json
import os
import re
import shutil
import signal
import sys
import time
from datetime import datetime

import pandas as pd
import requests
from tqdm import tqdm

class TwitterFollowerExtractor:
    """Twitter follower-count extractor with resume support and per-row saving.

    Workflow (see :meth:`process_followers`): back up the CSV once, resolve a
    Twitter username from each row, fetch its follower count through a
    RapidAPI GraphQL endpoint, write the count into the '粉丝数' column, and
    checkpoint progress so an interrupted run can resume.  A lock file guards
    against two instances processing the same CSV concurrently.
    """

    # NOTE(review): hard-coded placeholder key — inject via an env var before real use.
    X_RAPIDAPI_KEY = "xxx"
    ENDPOINT = "https://twitter-v1-1-v2-api.p.rapidapi.com/graphql/UserByScreenName"

    # Network tuning: a bounded timeout so a stalled request cannot hang the run.
    REQUEST_TIMEOUT = 15  # seconds per HTTP attempt
    MAX_ATTEMPTS = 3      # total tries per username
    RETRY_DELAY = 2       # seconds between retries

    # URL patterns, compiled once instead of on every call.  The first form
    # matches tweet URLs (…/<user>/status/…), the second a bare profile URL.
    _URL_PATTERNS = [
        re.compile(r'(?:https?://)?(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/'),
        re.compile(r'(?:https?://)?(?:www\.)?(?:x\.com|twitter\.com)/([^/?]+)'),
    ]

    def __init__(self, csv_file_path):
        """Remember the file paths and install SIGINT/SIGTERM handlers.

        Args:
            csv_file_path: path of the CSV whose '粉丝数' column is filled in.
        """
        self.csv_file_path = csv_file_path
        self.df = None  # loaded lazily by _load_csv()
        self.progress_file = csv_file_path + '.progress'
        self.lock_file = csv_file_path + '.lock'
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """SIGINT/SIGTERM handler: release the lock file and exit cleanly."""
        print("\n收到中断信号,正在安全退出...")
        self._cleanup()
        sys.exit(0)

    def _cleanup(self):
        """Remove the lock file; tolerate it already being gone (no TOCTOU)."""
        try:
            os.remove(self.lock_file)
        except OSError:
            pass

    def _extract_twitter_username(self, url):
        """Extract the screen name from a twitter.com / x.com URL.

        Returns None for empty or non-string input (pandas may hand us NaN),
        for non-Twitter URLs, and when the first path segment is literally
        'status' (i.e. no username is present in the URL).
        """
        # isinstance guard: NaN from a DataFrame is truthy but not a string,
        # and re.search on a float raises TypeError.
        if not url or not isinstance(url, str):
            return None

        for pattern in self._URL_PATTERNS:
            match = pattern.search(url)
            if match:
                # Strip a trailing query string that pattern 1 may have kept.
                username = match.group(1).split('?')[0]
                return username if username.lower() != 'status' else None
        return None

    def _get_follower_count(self, username):
        """Fetch the follower count for *username*, with bounded retries.

        Retries up to MAX_ATTEMPTS times on network errors, timeouts, or
        malformed payloads.  Returns the integer follower count, or None when
        the user cannot be resolved.
        """
        if not username:
            return None

        headers = {
            "X-RapidAPI-Key": self.X_RAPIDAPI_KEY,
            "X-RapidAPI-Host": "twitter-v1-1-v2-api.p.rapidapi.com"
        }

        variables = {
            "screen_name": username,
            "withSafetyModeUserFields": True,
            "withHighlightedLabel": True
        }

        for attempt in range(self.MAX_ATTEMPTS):
            try:
                response = requests.get(
                    self.ENDPOINT,
                    headers=headers,
                    params={"variables": json.dumps(variables)},
                    timeout=self.REQUEST_TIMEOUT,  # original had no timeout: hang risk
                )

                if response.status_code == 200:
                    data = response.json()
                    # Navigate defensively: any level may be missing or null.
                    user = (data.get("data") or {}).get("user") or {}
                    result = user.get("result") or {}
                    legacy = result.get("legacy") or {}
                    if "followers_count" in legacy:
                        return legacy["followers_count"]

                if attempt < self.MAX_ATTEMPTS - 1:  # not the last attempt
                    time.sleep(self.RETRY_DELAY)

            except (requests.RequestException, ValueError, TypeError, AttributeError) as e:
                # RequestException: network/timeout; ValueError: invalid JSON;
                # TypeError/AttributeError: payload not shaped like a dict.
                if attempt < self.MAX_ATTEMPTS - 1:
                    time.sleep(self.RETRY_DELAY)
                else:
                    print(f"获取 {username} 粉丝数失败: {e}")

        return None

    def _load_csv(self):
        """Load the CSV, trying common encodings in order; True on success."""
        for encoding in ['utf-8-sig', 'utf-8', 'gbk']:
            try:
                self.df = pd.read_csv(self.csv_file_path, encoding=encoding)
            except UnicodeDecodeError:
                continue  # wrong encoding — try the next candidate
            except Exception as e:
                print(f"加载CSV失败: {e}")
                return False
            # Normalise headers: exported sheets often embed newlines/padding.
            self.df.columns = [col.strip().replace('\n', '').replace('\r', '')
                               for col in self.df.columns]
            print(f"加载CSV成功: {len(self.df)}")
            return True
        print("无法读取CSV文件")
        return False

    def _save_csv(self):
        """Persist the DataFrame atomically: write a temp file, then rename."""
        temp_file = self.csv_file_path + '.tmp'
        try:
            self.df.to_csv(temp_file, index=False, encoding='utf-8-sig')
            os.replace(temp_file, self.csv_file_path)  # atomic on same filesystem
            return True
        except Exception as e:
            print(f"保存失败: {e}")
            if os.path.exists(temp_file):
                os.remove(temp_file)
            return False

    def _load_progress(self):
        """Return the set of usernames already processed in a previous run.

        A missing or corrupt progress file means "start from scratch", never
        a crash.
        """
        if not os.path.exists(self.progress_file):
            return set()
        try:
            with open(self.progress_file, 'r') as f:
                data = json.load(f)
            processed = set(data.get('processed_usernames', []))
        except (OSError, ValueError, AttributeError, TypeError):
            # unreadable file, invalid JSON, or JSON that is not an object
            return set()
        print(f"从断点继续: 已处理 {len(processed)} 个用户")
        return processed

    def _save_progress(self, processed_usernames, count):
        """Checkpoint progress atomically (temp file + rename, like _save_csv)."""
        temp_file = self.progress_file + '.tmp'
        try:
            with open(temp_file, 'w') as f:
                json.dump({
                    'processed_usernames': list(processed_usernames),
                    'processed_count': count,
                    'last_update': datetime.now().isoformat()
                }, f)
            os.replace(temp_file, self.progress_file)
        except Exception as e:
            print(f"保存进度失败: {e}")

    def _create_lock(self):
        """Create the lock file; refuse to run when another instance holds it.

        open(..., 'x') makes check-and-create a single atomic operation,
        closing the race the original exists()-then-write sequence had.
        """
        try:
            with open(self.lock_file, 'x') as f:
                f.write(f"{os.getpid()}\n{datetime.now()}")
            return True
        except FileExistsError:
            print("检测到锁文件,可能有其他实例运行。如确认没有,请删除锁文件后重试。")
            return False
        except OSError:
            return False

    def _extract_usernames(self):
        """Collect (row_index, username, url) for every row with a Twitter URL.

        The URL is taken from the JSON 'ext_info' column ('twitterUrl' key)
        first, falling back to the plain 'url' column.
        """
        usernames = []
        for idx, row in self.df.iterrows():
            twitter_url = None

            # Preferred source: the structured ext_info payload.
            if 'ext_info' in row and pd.notna(row['ext_info']):
                try:
                    twitter_url = json.loads(row['ext_info']).get('twitterUrl')
                except (ValueError, TypeError, AttributeError):
                    pass  # malformed ext_info: fall back to the url column

            # Fallback: the raw url column.
            if not twitter_url and 'url' in self.df.columns and pd.notna(row['url']):
                twitter_url = row['url']

            if twitter_url:
                username = self._extract_twitter_username(twitter_url)
                if username:
                    usernames.append((idx, username, twitter_url))

        return usernames

    def _is_valid_follower_count(self, value):
        """True when *value* parses as a number (filters NaN/'#VALUE!'/blanks)."""
        if pd.isna(value):
            return False
        value_str = str(value).strip()
        if value_str in ['#VALUE!', 'nan', '', '\n']:
            return False
        try:
            int(float(value_str))
            return True
        except (ValueError, OverflowError):
            # OverflowError covers int(float('inf')); anything else is a bug.
            return False

    def _generate_summary(self, total_processed):
        """Print run statistics and the ten largest accounts found."""
        if '粉丝数' not in self.df.columns:
            return

        valid_mask = self.df['粉丝数'].apply(self._is_valid_follower_count)
        valid_counts = valid_mask.sum()

        print(f"\n{'='*50}")
        print(f"处理完成统计:")
        print(f"总处理用户数: {total_processed}")
        print(f"成功获取粉丝数: {valid_counts}")
        print(f"失败数量: {total_processed - valid_counts}")

        # Show the Top 10 accounts by follower count.
        if valid_counts > 0:
            valid_df = self.df[valid_mask].copy()
            if not valid_df.empty:
                # After a CSV round-trip the column holds strings (object
                # dtype); nlargest on such a column raises TypeError, so
                # rank on a numeric copy instead.
                valid_df['_followers'] = pd.to_numeric(valid_df['粉丝数'], errors='coerce')
                top_accounts = valid_df.nlargest(10, '_followers')
                print(f"\nTop 10 账户:")
                for _, row in top_accounts.iterrows():
                    url_value = row.get('url', "N/A")
                    followers = int(row['_followers'])
                    username = self._extract_twitter_username(url_value)
                    print(f"- {username}: {followers:,} 粉丝")
        print(f"{'='*50}")

    def process_followers(self):
        """Main flow: lock, back up, load, fetch follower counts, summarise."""
        print("开始Twitter粉丝数提取...")

        if not os.path.exists(self.csv_file_path):
            print(f"文件不存在: {self.csv_file_path}")
            return

        # Refuse to run alongside another instance.
        if not self._create_lock():
            return

        try:
            # One-time backup so a bad run can always be rolled back.
            # shutil streams in chunks; the original read the whole file into memory.
            backup_file = self.csv_file_path + '.backup'
            if not os.path.exists(backup_file):
                shutil.copyfile(self.csv_file_path, backup_file)
                print(f"已创建备份: {backup_file}")

            if not self._load_csv():
                return

            all_usernames = self._extract_usernames()
            print(f"找到 {len(all_usernames)} 个Twitter用户")

            processed_usernames = self._load_progress()

            # Skip users already checkpointed or already filled in the CSV.
            remaining_usernames = []
            for idx, username, url in all_usernames:
                if username in processed_usernames:
                    continue

                if '粉丝数' in self.df.columns and self._is_valid_follower_count(self.df.at[idx, '粉丝数']):
                    processed_usernames.add(username)
                    continue

                remaining_usernames.append((idx, username, url))

            print(f"需要处理: {len(remaining_usernames)} 个用户")

            processed_count = len(processed_usernames)
            for idx, username, url in tqdm(remaining_usernames, desc="获取粉丝数"):
                try:
                    follower_count = self._get_follower_count(username)

                    if follower_count is not None:
                        if '粉丝数' not in self.df.columns:
                            self.df['粉丝数'] = None
                        self.df.at[idx, '粉丝数'] = follower_count

                        # Persist every row so a crash never loses fetched data.
                        if self._save_csv():
                            processed_usernames.add(username)
                            processed_count += 1
                            print(f"{username}: {follower_count:,} 粉丝")

                            # Checkpoint every 10 successful rows.
                            if processed_count % 10 == 0:
                                self._save_progress(processed_usernames, processed_count)
                    else:
                        # Mark failures as processed so reruns don't retry forever.
                        processed_usernames.add(username)

                except Exception as e:
                    print(f"处理 {username} 时出错: {e}")
                    continue

            # Final checkpoint and summary.
            self._save_progress(processed_usernames, processed_count)
            self._generate_summary(len(all_usernames))

        finally:
            self._cleanup()

if __name__ == "__main__":
    import argparse

    # Command line: --file selects the CSV, --clean discards any prior state.
    arg_parser = argparse.ArgumentParser(description='Twitter粉丝数提取器')
    arg_parser.add_argument('--clean', action='store_true', help='清理进度文件重新开始')
    arg_parser.add_argument('--file', default='用户活动报名信息.csv', help='CSV文件路径')
    cli_args = arg_parser.parse_args()

    extractor = TwitterFollowerExtractor(cli_args.file)

    # --clean: drop the checkpoint and lock files so the run starts fresh.
    if cli_args.clean:
        for stale in (extractor.progress_file, extractor.lock_file):
            if os.path.exists(stale):
                os.remove(stale)
                print(f"已清理: {stale}")

    extractor.process_followers()

Enter fullscreen mode Exit fullscreen mode

Top comments (0)