Baidu Index (百度指数) Crawler

Fetches the data for every province and every keyword, and combines it into a single CSV file.


"""
百度指数爬虫 2025年3月
"""
import json
import random
import time
from datetime import datetime
from traceback import format_exc

import numpy as np
import pandas as pd
import requests

# Copied from the browser (DevTools)
cookies_dict = {
    "BAIDUID_BFESS": ":FG=1",
    "Hm_lvt_d101ea4d2a5c67dab98251f0b5de24dc": "1740900295",
    "HMACCOUNT": "70309D756AB7564A",
    "ppfuid": "/69CStRUAcn/QmhIlFDxPrAc/s5tJmCocrihdwitHd04Lvs3Nfz26Zt2holplnIKVacidp8Sue4dMTyfg65BJnOFhn1HthtSiwtygiD7p=",
    "BDUSS": "",
    "SIGNIN_UC": "70a2711cf1d3d9b1a82d2f87d633bd8a04909129477oUCDVgDuFIWQq0I5Qh%%%2FMKxlLMEZywpREcfeqkBuNDImGT1swOphjUr0m7yoFRuoRONhZO0DhIUp8qMp%2BI%2BGZ9URB2%2FDv3g%2FwZ0nXnjrScjtkdIga7hBOF4Os4RsqXflHU7INYd10uoQ2Ecn99qPwcD5%2BuKJ7%2BtRR94%3D59476045678983651647832308115528",
    "__cas__rn__": "490912947",
    "__cas__st__212": "",
    "__cas__id__212": "40927145",
    "CPTK_212": "1776632285",
    "CPID_212": "40927145",
    "Hm_lpvt_d101ea4d2a5c67dab98251f0b5de24dc": "1740900504",
    "BDUSS_BFESS": "",
    "bdindexid": "jcohnli710phtu4po08tnl0o33",
    "ab_sr": "==",
    "RT": "zzl"
}

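# cipherText is also captured from the browser: the Baidu Index front end sends a
# Cipher-Text request header with its API calls (visible in DevTools), and that
# value is forwarded unchanged by generate_http_headers() below.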
credential = {
    "cipherText": "1740888411355_1740901282164_aLrE9Za0dpKtlO3CQw1IR/Yz3hP8cGXzHI/2BnYqUk5XRMPS4pr5kfk3slC7+G60AS9KjhhlCPNuQnqbFhpZS9Z7MUVTxUeQ8XlgGhrmV+FapK3+nQuTdrm1pz8Jy5qhWO0pOhQyUqv/AR5RFI0hKsasKjMYDQfng+XPMhygTo0rCb3PLrFDflBQ1riNlJ7Bg8s6TfsE3OMaJPAQsjhaZlZO1bXUAhFIY0EMqIxq2DAkMVEatrHKmDbkb0f2NJw988jZkhDEZTAJ06iAXqSLbKnbF0bPCUIqaT/a5yeqr2KtCwbJYH4flHQSoThN40a6t/XiyTqUc1Mdds6w27Q/qOyR+nPe8978fEsEB3UssJ9LPc62xsjzLmY1x5qH7eA/j7eJAgbbWVvYW8H/4N3iaauKg0D1F8NqUHMGoGVpAQSj0/HLx5pUebCoFBVBnbA2kMYD8kvavD1WzPEMte2sp2uhlSGB4IIDMkqz13eaIsc=",
    "cookie_BDUSS": cookies_dict['BDUSS']
}

def generate_http_headers(credential):
    http_headers = {
        'Cookie': 'BDUSS=' + credential["cookie_BDUSS"],
        'Cipher-Text': credential["cipherText"],
        'Accept': 'application/json, text/plain, */*',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Connection': 'keep-alive',
        'Referer': 'https://index.baidu.com/v2/main/index.html',
        'Host': 'index.baidu.com',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }
    return http_headers


def calculate_yearly_averages(start_date, end_date, data_series):
    # Convert the start and end dates to datetime objects
    start = datetime.strptime(start_date, '%Y-%m-%d')
    end = datetime.strptime(end_date, '%Y-%m-%d')
    days_span = (end - start).days + 1

    # Split the data series into a list and replace empty strings with '0'
    data_points = data_series.split(',')
    data_points = ['0' if point == '' else point for point in data_points]
    data_points = np.array(data_points, dtype=float)

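    # Baidu Index returns daily data points for ranges up to about a year and
    # weekly points for longer ranges (the assumption behind the 366-day check
    # below), so build a matching daily or weekly date index.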
    if days_span <= 366:
        dates = pd.date_range(start, periods=len(data_points))
    else:
        weeks_span = len(data_points)
        dates = pd.date_range(start, periods=weeks_span, freq='W')

    # Create a DataFrame with the dates and data points
    df = pd.DataFrame({'Date': dates, 'Data': data_points})
    df.set_index('Date', inplace=True)

    # Calculate the yearly average
    yearly_averages = df.resample('YE').mean().reset_index()
    yearly_averages['Year'] = yearly_averages['Date'].dt.year
    yearly_averages.drop('Date', axis=1, inplace=True)
    yearly_averages.rename(columns={'Data': 'Average'}, inplace=True)
    # Convert DataFrame to list of tuples (year, average)
    yearly_averages_list = list(yearly_averages.itertuples(index=False, name=None))
    print(yearly_averages_list)

    return yearly_averages_list


# Decrypt the index data using the ptbk substitution key
def decrypt(ptbk, index_data):
    n = len(ptbk) // 2
    a = dict(zip(ptbk[:n], ptbk[n:]))
    return "".join([a[s] for s in index_data])


def keywords2json(keyword):
    converted_keywords = [[{"name": keyword, "wordType": 1}]]
    # Convert the list of lists of dictionaries into a JSON string
    json_string = json.dumps(converted_keywords, ensure_ascii=False)
    print(json_string)
    return json_string
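# For a single keyword this produces, e.g., [[{"name": "第三方支付", "wordType": 1}]],
# which is the nested-list JSON passed to the SearchApi/index `word` parameter below.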

#
# def namely(keywords):
#     return '+'.join(keywords)


def crawl_request(keyword, startDate, endDate, regionCode, credential, expectedInterval, autoSave, regionName, data_combine):
    print('Querying:', keyword, startDate, endDate, regionCode)
    words = keywords2json(keyword)

    # First level separated by commas, second level by plus signs
    testwordset = keyword
    max_retries = 3  # maximum number of retries
    retries = 0  # current retry count

    while retries < max_retries:
        try:
            url = f'https://index.baidu.com/api/AddWordApi/checkWordsExists?word={testwordset}'
            rsp = requests.get(url, headers=generate_http_headers(credential), timeout=10).json()
            # If data.result is non-empty, the keyword (or part of the combination) does not exist: report it and bail out
            if rsp['data']['result']:
                print(f'Keyword "{testwordset}" does not exist (or the combination contains a keyword that does not exist); please check')
                return -1

            url = f'http://index.baidu.com/api/SearchApi/index?area=0&word={words}&area={regionCode}&startDate={startDate}&endDate={endDate}'
            rsp = requests.get(url, headers=generate_http_headers(credential), timeout=10).json()

            # Fetch the decryption key (ptbk)
            data = rsp['data']['userIndexes']
            uniqid = rsp['data']['uniqid']
            url = f'https://index.baidu.com/Interface/ptbk?uniqid={uniqid}'
            ptbk = requests.get(url, headers=generate_http_headers(credential), timeout=10).json()['data']

            # Decrypt the data
            res = [0 for _ in range(len(data))]
            for i in range(len(data)):
                index_data = decrypt(ptbk, data[i]['all']['data'])
                yearly_averages = calculate_yearly_averages(startDate, endDate, index_data)
                for tuple_item in yearly_averages:
                    index_d = round(tuple_item[0],2)
                    year = tuple_item[1]
                    if year > 2022:
                        continue
                    if year in data_combine:
                        data_combine[year].append(index_d)
                    else:
                        data_combine[year] = [year, regionName, index_d]
            return res
        except Exception as e:
            print(f'Request failed: {e}')
            retries += 1
            print(f'Retrying, attempt {retries}...')
            time.sleep(random.randint(1, 3))  # wait a bit before retrying
    if retries == max_retries:
        print(f'Too many failed requests; reached the maximum of {max_retries} retries, skipping this one')
        return -1


# regions = {}
provinces = {
            901: "山东",
            902: "贵州",
            903: "江西",
            904: "重庆",
            905: "内蒙古",
            906: "湖北",
            907: "辽宁",
            908: "湖南",
            909: "福建",
            910: "上海",
            911: "北京",
            912: "广西",
            913: "广东",
            914: "四川",
            915: "云南",
            916: "江苏",
            917: "浙江",
            918: "青海",
            919: "宁夏",
            920: "河北",
            921: "黑龙江",
            922: "吉林",
            923: "天津",
            924: "陕西",
            925: "甘肃",
            926: "新疆",
            927: "河南",
            # 928: "安徽",
            929: "山西",
            930: "海南",
            931: "台湾",
            # 932: "西藏",
            933: "香港",
            934: "澳门"
        }

regions = provinces
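# 901-934 are the Baidu Index area codes for the provinces above; 928 (安徽) and
# 932 (西藏) are commented out for this run.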

def crawl(regionCode, credential, expectedInterval, autoSave, regionName, data_combine):
    # Fetch data from 2011 through 2022
    startDate = '2011-01-01'
    endDate = '2022-12-31'
    # Cleaned keyword list
    keywords = ['第三方支付', '在线支付', '移动支付', '网贷', '互联网理财', '互联网保险', '在线理财', '电子银行', '网银', '大数据', '云计算', '人工智能', '区块链', '生物识别']

    # res = {regionCode: []}
    for keyword in keywords:
        if regionCode != '999':
            try:
                crawl_request(keyword, startDate, endDate, regionCode, credential, expectedInterval, autoSave, regionName, data_combine)
            except Exception:
                print(format_exc())
            # res[regionCode].extend(t)
            # Rest one to five seconds after each query; in practice, with many accounts, this interval can be shortened
            time.sleep(expectedInterval / 1000 + random.randint(1, 3) / 2)


if __name__ == '__main__':
    import csv
    # # Column titles (CSV header)
    # titles = ['年份', '区域', '第三方支付', '在线支付', '移动支付', '网贷', '互联网理财', '互联网保险', '在线理财', '电子银行', '网银',
    #             '大数据', '云计算', '人工智能',
    #             '区块链', '生物识别']
    # with open('combine.csv', 'a', encoding='utf-8-sig', newline='') as csvfile:
    #     writer = csv.writer(csvfile)
    #     writer.writerow(titles)

    for regionCode in regions:
        # regionCode = 928
        # regionName = '安徽'
        regionName = regions[regionCode]
        data_combine = {}

        crawl(regionCode, credential, 10, True, regionName, data_combine)
        data_list = []
        for i in data_combine:
            data_list.append(data_combine[i])
        with open('combine.csv', 'a', encoding='utf-8-sig', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerows(data_list)

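With your own cookies and cipherText filled in, each run appends one row per (year, province) pair to combine.csv: [year, region, avg1, ..., avg14], with the averages in the same order as the keywords list (assuming every keyword request succeeds). Since the header-writing code is commented out, the file itself has no header row. A minimal sketch, assuming that column order, for reloading the result with named columns:

import pandas as pd

titles = ['年份', '区域', '第三方支付', '在线支付', '移动支付', '网贷', '互联网理财', '互联网保险',
          '在线理财', '电子银行', '网银', '大数据', '云计算', '人工智能', '区块链', '生物识别']
df = pd.read_csv('combine.csv', header=None, names=titles, encoding='utf-8-sig')
print(df.head())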