Charon XA
A Complete Guide to Building a Walmart Scraper: An Efficient Product Data Collection System in Python

In the cut-throat world of e-commerce, data is the deciding factor. For a platform like Walmart, with its enormous catalog and frequent price changes, collecting product data efficiently and in near real time is a challenge faced by sellers, analysts, and developers alike. In this article I will walk through how to use Python to build a fully featured Walmart product data collection system, from basic setup to advanced optimization, so you can track market trends more closely and make better-informed business decisions.

A Walmart scraper is an important e-commerce data collection tool that helps sellers, analysts, and developers automatically gather product information, price data, and market trends from Walmart's platform. In a fiercely competitive e-commerce environment, up-to-date product data is essential for marketing strategy, price optimization, and competitor analysis. This article explains in detail how to build a fully featured Walmart scraping system with Python, covering everything from basic setup to advanced optimization.
Why Build a Walmart Scraper
Before diving into the implementation, let's look at why a Walmart scraper is worth building. Walmart is one of the world's largest retailers: its platform lists millions of products, prices change frequently, and promotions run constantly. For e-commerce practitioners, getting this data in a timely way enables:
Competitor price monitoring: track competitors' pricing strategies in real time
Market trend analysis: understand best-selling products and consumer preferences
Inventory management optimization: adjust purchasing plans based on supply and demand data
Marketing strategy: plan campaigns around observed promotions
However, collecting this data by hand is slow and error-prone. That is exactly where Python-based Walmart data scraping comes in.
Technical Preparation and Environment Setup

  1. Development environment configuration
First, make sure Python 3.7 or later is installed on your system. We will use the following core libraries to build our Walmart product information crawler:

# requirements.txt
requests==2.31.0
beautifulsoup4==4.12.2
selenium==4.15.0
pandas==2.1.3
fake-useragent==1.4.0
python-dotenv==1.0.0

Install the dependencies:

pip install -r requirements.txt
  2. Basic project structure

walmart_scraper/
├── config/
│   ├── __init__.py
│   └── settings.py
├── scrapers/
│   ├── __init__.py
│   ├── base_scraper.py
│   └── walmart_scraper.py
├── utils/
│   ├── __init__.py
│   ├── proxy_handler.py
│   └── data_processor.py
├── data/
│   └── output/
├── main.py
└── requirements.txt
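
The project layout includes config/settings.py, which the article never shows. Below is a minimal sketch of what it could contain, assuming credentials and paths are read from a .env file via python-dotenv (already in requirements.txt); the variable names CAPTCHA_API_KEY, PROXY_FILE, REQUEST_TIMEOUT, and OUTPUT_DIR are illustrative, not part of the original project.

# config/settings.py (minimal sketch; names are illustrative)
import os
from dotenv import load_dotenv

# Load variables from a .env file in the project root, if one exists
load_dotenv()

CAPTCHA_API_KEY = os.getenv("CAPTCHA_API_KEY", "")        # hypothetical setting
PROXY_FILE = os.getenv("PROXY_FILE", "proxies.txt")       # hypothetical setting
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "10"))
OUTPUT_DIR = os.getenv("OUTPUT_DIR", "data/output")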
Core Scraper Component Development
  3. Base scraper class design
Let's start by creating a base scraper class:

# scrapers/base_scraper.py
import requests
import time
import random
from fake_useragent import UserAgent
from bs4 import BeautifulSoup
import logging

class BaseScraper:
def __init__(self):
    self.session = requests.Session()
    self.ua = UserAgent()
    self.setup_logging()

def setup_logging(self):
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('scraper.log'),
            logging.StreamHandler()
        ]
    )
    self.logger = logging.getLogger(__name__)

def get_headers(self):
    """生成随机请求头"""
    return {
        'User-Agent': self.ua.random,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }

def random_delay(self, min_delay=1, max_delay=3):
    """随机延迟防止被识别"""
    delay = random.uniform(min_delay, max_delay)
    time.sleep(delay)

def make_request(self, url, max_retries=3):
    """发送HTTP请求with重试机制"""
    for attempt in range(max_retries):
        try:
            headers = self.get_headers()
            response = self.session.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            return response
        except requests.RequestException as e:
            self.logger.warning(f"请求失败 (尝试 {attempt + 1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                self.random_delay(2, 5)
            else:
                self.logger.error(f"所有请求尝试失败: {url}")
                raise
  4. Walmart-specific scraper implementation
Next, we implement a scraper class tailored to Walmart:

# scrapers/walmart_scraper.py
from .base_scraper import BaseScraper
from bs4 import BeautifulSoup
import json
import re
from urllib.parse import urljoin, urlparse, parse_qs

class WalmartScraper(BaseScraper):
def __init__(self):
    super().__init__()
    self.base_url = "https://www.walmart.com"

def search_products(self, keyword, page=1, max_results=50):
    """搜索商品列表"""
    search_url = f"{self.base_url}/search?q={keyword}&page={page}"

    try:
        response = self.make_request(search_url)
        soup = BeautifulSoup(response.content, 'html.parser')

        # Extract the product list
        products = self.extract_product_list(soup)
        self.logger.info(f"Extracted {len(products)} products")

        return products[:max_results]

    except Exception as e:
        self.logger.error(f"搜索商品失败: {e}")
        return []

def extract_product_list(self, soup):
    """从搜索结果页面提取商品信息"""
    products = []

    # 查找商品容器
    product_containers = soup.find_all('div', {'data-automation-id': 'product-tile'})

    for container in product_containers:
        try:
            product_data = self.extract_single_product(container)
            if product_data:
                products.append(product_data)
        except Exception as e:
            self.logger.warning(f"提取单个商品失败: {e}")
            continue

    return products

def extract_single_product(self, container):
    """提取单个商品的详细信息"""
    product = {}

    try:
        # Product title
        title_elem = container.find('span', {'data-automation-id': 'product-title'})
        product['title'] = title_elem.get_text(strip=True) if title_elem else ''

        # Price information
        price_elem = container.find('div', {'data-automation-id': 'product-price'})
        if price_elem:
            price_text = price_elem.get_text(strip=True)
            product['price'] = self.clean_price(price_text)

        # Product link
        link_elem = container.find('a', href=True)
        if link_elem:
            product['url'] = urljoin(self.base_url, link_elem['href'])
            # Extract the product ID from the URL
            product['product_id'] = self.extract_product_id(product['url'])

        # Rating information
        rating_elem = container.find('span', class_=re.compile(r'.*rating.*'))
        if rating_elem:
            rating_text = rating_elem.get('aria-label', '')
            product['rating'] = self.extract_rating(rating_text)

        # Image
        img_elem = container.find('img')
        if img_elem:
            product['image_url'] = img_elem.get('src', '')

        # Seller information
        seller_elem = container.find('span', string=re.compile(r'Sold by'))
        if seller_elem:
            product['seller'] = seller_elem.get_text(strip=True)

        return product if product.get('title') else None

    except Exception as e:
        self.logger.warning(f"解析商品数据失败: {e}")
        return None

def get_product_details(self, product_url):
    """获取商品详细页面信息"""
    try:
        response = self.make_request(product_url)
        soup = BeautifulSoup(response.content, 'html.parser')

        details = {}

        # Extract JSON-LD data from script tags
        script_tags = soup.find_all('script', {'type': 'application/ld+json'})
        for script in script_tags:
            try:
                json_data = json.loads(script.string)
                if '@type' in json_data and json_data['@type'] == 'Product':
                    details.update(self.parse_product_json(json_data))
                    break
            except json.JSONDecodeError:
                continue

        # Product description
        desc_elem = soup.find('div', {'data-automation-id': 'product-highlights'})
        if desc_elem:
            details['description'] = desc_elem.get_text(strip=True)

        # Stock status
        stock_elem = soup.find('div', {'data-automation-id': 'fulfillment-section'})
        if stock_elem:
            details['in_stock'] = 'in stock' in stock_elem.get_text().lower()

        return details

    except Exception as e:
        self.logger.error(f"获取商品详情失败: {e}")
        return {}

def clean_price(self, price_text):
    """清理价格文本"""
    if not price_text:
        return None

    # Extract digits and the decimal point
    price_match = re.search(r'\$?(\d+\.?\d*)', price_text.replace(',', ''))
    return float(price_match.group(1)) if price_match else None

def extract_product_id(self, url):
    """从URL中提取商品ID"""
    try:
        parsed_url = urlparse(url)
        path_parts = parsed_url.path.split('/')
        for part in path_parts:
            if part.isdigit():
                return part
    except Exception:
        pass
    return None

def extract_rating(self, rating_text):
    """提取评分数值"""
    rating_match = re.search(r'(\d+\.?\d*)', rating_text)
    return float(rating_match.group(1)) if rating_match else None

def parse_product_json(self, json_data):
    """解析产品JSON数据"""
    details = {}

    if 'name' in json_data:
        details['full_name'] = json_data['name']

    if 'offers' in json_data:
        offer = json_data['offers']
        if isinstance(offer, list):
            offer = offer[0]

        details['availability'] = offer.get('availability', '')
        details['currency'] = offer.get('priceCurrency', 'USD')

        if 'price' in offer:
            details['detailed_price'] = float(offer['price'])

    if 'aggregateRating' in json_data:
        rating_data = json_data['aggregateRating']
        details['average_rating'] = float(rating_data.get('ratingValue', 0))
        details['review_count'] = int(rating_data.get('reviewCount', 0))

    return details
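
Before adding proxies or the rest of the pipeline, the scraper can be exercised on its own. The snippet below is a minimal usage sketch; the keyword and result count are arbitrary, and the data-automation-id selectors used above may need updating whenever Walmart changes its markup.

# Usage sketch for WalmartScraper (selectors may need updating over time)
from scrapers.walmart_scraper import WalmartScraper

if __name__ == "__main__":
    scraper = WalmartScraper()

    # Search for a sample keyword and print a short summary of each hit
    products = scraper.search_products("coffee maker", max_results=10)
    for product in products:
        print(product.get("title"), "-", product.get("price"))

    # Fetch structured details for the first result, if any were found
    if products and products[0].get("url"):
        details = scraper.get_product_details(products[0]["url"])
        print(details)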

Dealing with Anti-Scraping Measures

  1. IP proxy pool integration
Modern e-commerce sites deploy sophisticated anti-bot systems. To build a stable, automated Walmart scraping system, we integrate an IP proxy pool:

# utils/proxy_handler.py
import requests
import random
import threading
from queue import Queue
import time

class ProxyHandler:
def __init__(self, proxy_list=None):
    self.proxy_queue = Queue()
    self.failed_proxies = set()
    self.proxy_stats = {}
    self.lock = threading.Lock()

    if proxy_list:
        self.load_proxies(proxy_list)

def load_proxies(self, proxy_list):
    """加载代理列表"""
    for proxy in proxy_list:
        self.proxy_queue.put(proxy)
        self.proxy_stats[proxy] = {'success': 0, 'failed': 0}

def get_proxy(self):
    """获取可用代理"""
    with self.lock:
        while not self.proxy_queue.empty():
            proxy = self.proxy_queue.get()
            if proxy not in self.failed_proxies:
                return proxy
    return None

def test_proxy(self, proxy, test_url="http://httpbin.org/ip"):
    """测试代理是否可用"""
    try:
        proxies = {
            'http': f'http://{proxy}',
            'https': f'https://{proxy}'
        }

        response = requests.get(
            test_url, 
            proxies=proxies, 
            timeout=10,
            headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
        )

        if response.status_code == 200:
            self.mark_proxy_success(proxy)
            return True

    except requests.RequestException:
        pass

    self.mark_proxy_failed(proxy)
    return False

def mark_proxy_success(self, proxy):
    """标记代理成功"""
    with self.lock:
        if proxy in self.proxy_stats:
            self.proxy_stats[proxy]['success'] += 1
        # Put the working proxy back into the queue
        self.proxy_queue.put(proxy)

def mark_proxy_failed(self, proxy):
    """标记代理失败"""
    with self.lock:
        if proxy in self.proxy_stats:
            self.proxy_stats[proxy]['failed'] += 1

        # Blacklist proxies that fail too often
        if self.proxy_stats[proxy]['failed'] > 3:
            self.failed_proxies.add(proxy)
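
In practice the proxy list usually comes from a plain text file with one host:port entry per line, the same format as the proxies.txt example later in this post. The sketch below (file name assumed) loads such a file and health-checks every proxy in a small thread pool before scraping starts:

# Sketch: load proxies from a file and pre-test them concurrently
from concurrent.futures import ThreadPoolExecutor
from utils.proxy_handler import ProxyHandler

with open("proxies.txt", "r") as f:
    proxy_list = [line.strip() for line in f if line.strip()]

handler = ProxyHandler(proxy_list)

# test_proxy() records successes and blacklists proxies that keep failing
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(handler.test_proxy, proxy_list))

print(f"{sum(results)}/{len(proxy_list)} proxies passed the health check")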

A proxy-enabled scraper class

# scrapers/walmart_scraper.py (continued)
import requests
from utils.proxy_handler import ProxyHandler

class WalmartScraperWithProxy(WalmartScraper):
def __init__(self, proxy_list=None):
    super().__init__()
    self.proxy_handler = ProxyHandler(proxy_list) if proxy_list else None

def make_request_with_proxy(self, url, max_retries=3):
    """使用代理发送请求"""
    for attempt in range(max_retries):
        proxy = self.proxy_handler.get_proxy() if self.proxy_handler else None

        try:
            headers = self.get_headers()
            proxies = None

            if proxy:
                proxies = {
                    'http': f'http://{proxy}',
                    'https': f'https://{proxy}'
                }

            response = self.session.get(
                url, 
                headers=headers, 
                proxies=proxies,
                timeout=15
            )
            response.raise_for_status()

            if proxy and self.proxy_handler:
                self.proxy_handler.mark_proxy_success(proxy)

            return response

        except requests.RequestException as e:
            if proxy and self.proxy_handler:
                self.proxy_handler.mark_proxy_failed(proxy)

            self.logger.warning(f"代理请求失败 {proxy}: {e}")
            self.random_delay(3, 7)

    raise Exception(f"所有代理请求都失败: {url}")
  2. CAPTCHA recognition and handling
Walmart may present CAPTCHA challenges, so we integrate a CAPTCHA-solving service:

# utils/captcha_solver.py
import base64
import requests
from PIL import Image
import io
import time
import logging

class CaptchaSolver:
def __init__(self, api_key=None, service='2captcha'):
    self.api_key = api_key
    self.service = service
    self.base_url = 'http://2captcha.com' if service == '2captcha' else None
    self.logger = logging.getLogger(__name__)

def solve_image_captcha(self, image_data):
    """解决图片验证码"""
    if not self.api_key:
        self.logger.warning("未配置验证码服务API密钥")
        return None

    try:
        # Submit the CAPTCHA
        submit_url = f"{self.base_url}/in.php"

        files = {'file': ('captcha.png', image_data, 'image/png')}
        data = {
            'key': self.api_key,
            'method': 'post'
        }

        response = requests.post(submit_url, files=files, data=data)
        result = response.text

        if 'OK|' in result:
            captcha_id = result.split('|')[1]
            return self.get_captcha_result(captcha_id)

    except Exception as e:
        self.logger.error(f"验证码识别失败: {e}")

    return None

def get_captcha_result(self, captcha_id, max_wait=120):
    """获取验证码识别结果"""
    result_url = f"{self.base_url}/res.php"

    for _ in range(max_wait // 5):
        try:
            response = requests.get(result_url, params={
                'key': self.api_key,
                'action': 'get',
                'id': captcha_id
            })

            result = response.text

            if result == 'CAPCHA_NOT_READY':
                time.sleep(5)
                continue
            elif 'OK|' in result:
                return result.split('|')[1]
            else:
                break

        except Exception as e:
            self.logger.error(f"获取验证码结果失败: {e}")
            break

    return None
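
A hedged usage sketch: assuming the challenge image has already been saved to disk (for example via Selenium's screenshot API), the solver is called like this. The file name captcha.png and the API key placeholder are illustrative.

# Sketch: solve a previously saved CAPTCHA image (file name is illustrative)
from utils.captcha_solver import CaptchaSolver

solver = CaptchaSolver(api_key="YOUR_2CAPTCHA_API_KEY")

with open("captcha.png", "rb") as f:
    image_data = f.read()

answer = solver.solve_image_captcha(image_data)
print(f"CAPTCHA answer: {answer}" if answer else "CAPTCHA could not be solved")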

Data Processing and Storage

  1. Data cleaning and normalization

# utils/data_processor.py
import pandas as pd
import re
from datetime import datetime
import json
import logging

class DataProcessor:
def __init__(self):
    self.cleaned_data = []
    self.logger = logging.getLogger(__name__)

def clean_product_data(self, raw_products):
    """清洗商品数据"""
    cleaned_products = []

    for product in raw_products:
        cleaned_product = {}

        # Clean the title
        title = product.get('title', '').strip()
        cleaned_product['title'] = self.clean_title(title)

        # Normalize the price
        price = product.get('price')
        cleaned_product['price_usd'] = self.standardize_price(price)

        # Normalize the URL
        url = product.get('url', '')
        cleaned_product['product_url'] = self.clean_url(url)

        # Normalize the rating
        rating = product.get('rating')
        cleaned_product['rating_score'] = self.standardize_rating(rating)

        # Add a timestamp
        cleaned_product['scraped_at'] = datetime.now().isoformat()

        # Product ID
        cleaned_product['product_id'] = product.get('product_id', '')

        # Image URL
        cleaned_product['image_url'] = product.get('image_url', '')

        # Seller
        cleaned_product['seller'] = product.get('seller', 'Walmart')

        if cleaned_product['title']:  # keep only products that have a title
            cleaned_products.append(cleaned_product)

    return cleaned_products

def clean_title(self, title):
    """清洗商品标题"""
    if not title:
        return ''

    # Collapse extra whitespace
    title = re.sub(r'\s+', ' ', title).strip()

    # Remove special characters but keep basic punctuation
    title = re.sub(r'[^\w\s\-\(\)\[\]&,.]', '', title)

    return title[:200]  # limit the length

def standardize_price(self, price):
    """标准化价格"""
    if price is None:
        return None

    if isinstance(price, str):
        # Strip currency symbols and commas
        price_clean = re.sub(r'[$,]', '', price)
        try:
            return float(price_clean)
        except ValueError:
            return None

    return float(price) if price else None

def clean_url(self, url):
    """清洗URL"""
    if not url:
        return ''

    # Strip tracking parameters
    if '?' in url:
        base_url = url.split('?')[0]
        return base_url

    return url

def standardize_rating(self, rating):
    """标准化评分"""
    if rating is None:
        return None

    try:
        rating_float = float(rating)
        # Clamp the rating to the 0-5 range
        return max(0, min(5, rating_float))
    except (ValueError, TypeError):
        return None

def save_to_excel(self, products, filename):
    """保存到Excel文件"""
    if not products:
        self.logger.warning("没有数据要保存")
        return

    df = pd.DataFrame(products)

    # Reorder the columns
    column_order = [
        'product_id', 'title', 'price_usd', 'rating_score', 
        'seller', 'product_url', 'image_url', 'scraped_at'
    ]

    df = df.reindex(columns=column_order)

    # Write to Excel
    with pd.ExcelWriter(filename, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='Products', index=False)

        # Add a statistics sheet
        stats_df = pd.DataFrame({
            'Metric': ['Total products', 'Average price', 'Max price', 'Min price', 'Average rating'],
            'Value': [
                len(df),
                df['price_usd'].mean() if df['price_usd'].notna().any() else 0,
                df['price_usd'].max() if df['price_usd'].notna().any() else 0,
                df['price_usd'].min() if df['price_usd'].notna().any() else 0,
                df['rating_score'].mean() if df['rating_score'].notna().any() else 0
            ]
        })
        stats_df.to_excel(writer, sheet_name='Statistics', index=False)

    print(f"数据已保存到 {filename}")

def save_to_json(self, products, filename):
    """保存到JSON文件"""
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(products, f, ensure_ascii=False, indent=2)

    print(f"JSON数据已保存到 {filename}")
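
Tying these pieces together, the raw dictionaries returned by WalmartScraper.search_products() can be cleaned and exported in a few lines. The sketch below uses arbitrary file names; note that save_to_excel() relies on openpyxl, which is not listed in the requirements.txt shown earlier and would need to be installed separately.

# Sketch: clean raw scrape results and export them (file names are arbitrary)
import os
from scrapers.walmart_scraper import WalmartScraper
from utils.data_processor import DataProcessor

os.makedirs("data/output", exist_ok=True)

scraper = WalmartScraper()
raw_products = scraper.search_products("bluetooth speaker", max_results=20)

processor = DataProcessor()
cleaned = processor.clean_product_data(raw_products)

processor.save_to_excel(cleaned, "data/output/bluetooth_speakers.xlsx")
processor.save_to_json(cleaned, "data/output/bluetooth_speakers.json")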
  2. The complete main program
Now let's assemble all of the components into a complete Walmart product list scraping tool:

# main.py
import argparse
import sys
import os
from datetime import datetime
from scrapers.walmart_scraper import WalmartScraperWithProxy
from utils.data_processor import DataProcessor
from utils.captcha_solver import CaptchaSolver
import logging

class WalmartScrapingManager:
def __init__(self, proxy_list=None, captcha_api_key=None):
    self.scraper = WalmartScraperWithProxy(proxy_list)
    self.data_processor = DataProcessor()
    self.captcha_solver = CaptchaSolver(captcha_api_key) if captcha_api_key else None
    self.logger = logging.getLogger(__name__)

def scrape_products(self, keywords, max_products_per_keyword=50, output_format='excel'):
    """批量抓取商品数据"""
    all_products = []

    for keyword in keywords:
        self.logger.info(f"开始抓取关键词: {keyword}")

        try:
            # Search the product listing
            products = self.scraper.search_products(
                keyword=keyword,
                max_results=max_products_per_keyword
            )

            # Fetch detailed information
            detailed_products = []
            for i, product in enumerate(products):
                if product.get('url'):
                    try:
                        details = self.scraper.get_product_details(product['url'])
                        product.update(details)
                        detailed_products.append(product)

                        # Tag the product with the search keyword
                        product['search_keyword'] = keyword

                        self.logger.info(f"已处理 {i+1}/{len(products)} 个商品")

                        # Random delay
                        self.scraper.random_delay(1, 3)

                    except Exception as e:
                        self.logger.warning(f"获取商品详情失败: {e}")
                        continue

            all_products.extend(detailed_products)
            self.logger.info(f"关键词 '{keyword}' 抓取完成,获得 {len(detailed_products)} 个商品")

        except Exception as e:
            self.logger.error(f"抓取关键词 '{keyword}' 失败: {e}")
            continue

    # Clean the data
    cleaned_products = self.data_processor.clean_product_data(all_products)

    # Save the results
    self.save_results(cleaned_products, output_format)

    return cleaned_products

def save_results(self, products, output_format):
    """保存抓取结果"""
    if not products:
        self.logger.warning("没有数据需要保存")
        return

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    if output_format.lower() == 'excel':
        filename = f"data/output/walmart_products_{timestamp}.xlsx"
        self.data_processor.save_to_excel(products, filename)
    elif output_format.lower() == 'json':
        filename = f"data/output/walmart_products_{timestamp}.json"
        self.data_processor.save_to_json(products, filename)
    else:
        # Save in both formats
        excel_filename = f"data/output/walmart_products_{timestamp}.xlsx"
        json_filename = f"data/output/walmart_products_{timestamp}.json"
        self.data_processor.save_to_excel(products, excel_filename)
        self.data_processor.save_to_json(products, json_filename)

def main():
    parser = argparse.ArgumentParser(description='Walmart product scraping tool')
    parser.add_argument('--keywords', nargs='+', required=True, help='List of search keywords')
    parser.add_argument('--max-products', type=int, default=50, help='Maximum number of products per keyword')
    parser.add_argument('--output-format', choices=['excel', 'json', 'both'], default='excel', help='Output format')
    parser.add_argument('--proxy-file', help='Path to a proxy list file')
    parser.add_argument('--captcha-api-key', help='API key for the CAPTCHA-solving service')

    args = parser.parse_args()

    # Make sure the output directory exists
    os.makedirs('data/output', exist_ok=True)

    # Load the proxy list
    proxy_list = None
    if args.proxy_file and os.path.exists(args.proxy_file):
        with open(args.proxy_file, 'r') as f:
            proxy_list = [line.strip() for line in f if line.strip()]

    # Create the scraping manager
    scraper_manager = WalmartScrapingManager(
        proxy_list=proxy_list,
        captcha_api_key=args.captcha_api_key
    )

    # Start scraping
    try:
        products = scraper_manager.scrape_products(
            keywords=args.keywords,
            max_products_per_keyword=args.max_products,
            output_format=args.output_format
        )

        print(f"\nScraping finished! Collected {len(products)} products in total")

        # Print summary statistics
        if products:
            prices = [p['price_usd'] for p in products if p.get('price_usd')]
            ratings = [p['rating_score'] for p in products if p.get('rating_score')]

            print(f"Price summary: average ${sum(prices)/len(prices):.2f}" if prices else "No price data")
            print(f"Rating summary: average {sum(ratings)/len(ratings):.2f}" if ratings else "No rating data")

    except KeyboardInterrupt:
        print("\nScraping interrupted by user")
    except Exception as e:
        print(f"An error occurred during scraping: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
Common Challenges and Solutions

  1. Dynamically loaded content
Modern e-commerce sites load much of their content dynamically with JavaScript. For those cases we fall back to Selenium (note that undetected-chromedriver is an extra dependency not listed in requirements.txt):

# scrapers/selenium_scraper.py
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import undetected_chromedriver as uc
import time

class SeleniumWalmartScraper:
def __init__(self, headless=True, proxy=None):
    self.setup_driver(headless, proxy)

def setup_driver(self, headless=True, proxy=None):
    """配置浏览器驱动"""
    options = uc.ChromeOptions()

    if headless:
        options.add_argument('--headless')

    # Anti-detection options
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--disable-blink-features=AutomationControlled')
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option('useAutomationExtension', False)

    # Proxy settings
    if proxy:
        options.add_argument(f'--proxy-server={proxy}')

    # User agent
    options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')

    self.driver = uc.Chrome(options=options)

    # Run an anti-detection script
    self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")

def scrape_with_javascript(self, url, wait_selector=None):
    """使用Selenium抓取动态内容"""
    try:
        self.driver.get(url)

        # Wait for a specific element to load
        if wait_selector:
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, wait_selector))
            )

        # Scroll to trigger lazy loading
        self.scroll_page()

        # Grab the page source
        html_content = self.driver.page_source
        return html_content

    except Exception as e:
        print(f"Selenium抓取失败: {e}")
        return None

def scroll_page(self):
    """滚动页面以触发懒加载"""
    last_height = self.driver.execute_script("return document.body.scrollHeight")

    while True:
        # Scroll to the bottom of the page
        self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

        # Wait for new content to load
        time.sleep(2)

        # Measure the new page height
        new_height = self.driver.execute_script("return document.body.scrollHeight")

        if new_height == last_height:
            break

        last_height = new_height

def close(self):
    """关闭浏览器"""
    if hasattr(self, 'driver'):
        self.driver.quit()
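
The Selenium scraper only returns raw HTML, so it pairs naturally with the BeautifulSoup parsing logic defined earlier. A rough sketch, assuming the same data-automation-id selectors still apply to the fully rendered page:

# Sketch: render a search page with Selenium and reuse the HTML parser
from bs4 import BeautifulSoup
from scrapers.selenium_scraper import SeleniumWalmartScraper
from scrapers.walmart_scraper import WalmartScraper

selenium_scraper = SeleniumWalmartScraper(headless=True)
parser = WalmartScraper()  # reused only for its extract_product_list() helper

try:
    html = selenium_scraper.scrape_with_javascript(
        "https://www.walmart.com/search?q=laptop",
        wait_selector="[data-automation-id='product-tile']",
    )
    if html:
        soup = BeautifulSoup(html, "html.parser")
        products = parser.extract_product_list(soup)
        print(f"Parsed {len(products)} products from the rendered page")
finally:
    selenium_scraper.close()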
  2. Distributed scraping architecture
For large-scale collection, we can distribute the scraping work:

# distributed/task_manager.py
import redis
import json
import uuid
from datetime import datetime, timedelta

class TaskManager:
def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
    self.redis_client = redis.Redis(host=redis_host, port=redis_port, db=redis_db)
    self.task_queue = 'walmart_scrape_tasks'
    self.result_queue = 'walmart_scrape_results'

def add_task(self, keyword, max_products=50, priority=1):
    """添加抓取任务"""
    task_id = str(uuid.uuid4())
    task_data = {
        'task_id': task_id,
        'keyword': keyword,
        'max_products': max_products,
        'priority': priority,
        'created_at': datetime.now().isoformat(),
        'status': 'pending'
    }

    # Use a priority queue (Redis sorted set)
    self.redis_client.zadd(self.task_queue, {json.dumps(task_data): priority})
    return task_id

def get_task(self):
    """获取待处理任务"""
    # 获取最高优先级任务
    task_data = self.redis_client.zpopmax(self.task_queue)

    if task_data:
        task_json = task_data[0][0].decode('utf-8')
        return json.loads(task_json)

    return None

def save_result(self, task_id, products, status='completed'):
    """保存抓取结果"""
    result_data = {
        'task_id': task_id,
        'products': products,
        'status': status,
        'completed_at': datetime.now().isoformat(),
        'product_count': len(products)
    }

    self.redis_client.lpush(self.result_queue, json.dumps(result_data))

def get_results(self, limit=10):
    """获取抓取结果"""
    results = []
    for _ in range(limit):
        result_data = self.redis_client.rpop(self.result_queue)
        if result_data:
            results.append(json.loads(result_data.decode('utf-8')))
        else:
            break

    return results

# distributed/worker.py

import time
import logging
from task_manager import TaskManager
from scrapers.walmart_scraper import WalmartScraperWithProxy

class ScrapingWorker:
def __init__(self, worker_id, proxy_list=None):
    self.worker_id = worker_id
    self.task_manager = TaskManager()
    self.scraper = WalmartScraperWithProxy(proxy_list)
    self.logger = logging.getLogger(f'Worker-{worker_id}')

def run(self):
    """工作进程主循环"""
    self.logger.info(f"工作进程 {self.worker_id} 启动")

    while True:
        try:
            # Fetch a task
            task = self.task_manager.get_task()

            if task:
                self.logger.info(f"处理任务: {task['task_id']}")
                self.process_task(task)
            else:
                # Sleep when there is no task
                time.sleep(5)

        except KeyboardInterrupt:
            self.logger.info("工作进程停止")
            break
        except Exception as e:
            self.logger.error(f"工作进程异常: {e}")
            time.sleep(10)

def process_task(self, task):
    """处理单个抓取任务"""
    try:
        keyword = task['keyword']
        max_products = task['max_products']

        # Run the scrape
        products = self.scraper.search_products(keyword, max_results=max_products)

        # Store the result
        self.task_manager.save_result(
            task['task_id'], 
            products, 
            'completed'
        )

        self.logger.info(f"任务 {task['task_id']} 完成,抓取 {len(products)} 个商品")

    except Exception as e:
        self.logger.error(f"任务处理失败: {e}")
        self.task_manager.save_result(
            task['task_id'], 
            [], 
            'failed'
        )
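
To run the distributed setup, one process enqueues keywords while one or more workers consume them. The sketch below assumes a Redis instance on localhost:6379 and that the distributed/ directory is importable as a package; adjust the import paths (including the ones inside worker.py) to match your actual layout.

# Sketch: enqueue tasks and launch two workers (assumes Redis on localhost:6379)
import multiprocessing
from distributed.task_manager import TaskManager
from distributed.worker import ScrapingWorker

def start_worker(worker_id):
    ScrapingWorker(worker_id).run()

if __name__ == "__main__":
    # Producer side: push one task per keyword
    manager = TaskManager()
    for keyword in ["wireless headphones", "bluetooth speaker", "laptop stand"]:
        task_id = manager.add_task(keyword, max_products=30)
        print(f"Queued task {task_id} for '{keyword}'")

    # Consumer side: two worker processes pull tasks until interrupted
    workers = [multiprocessing.Process(target=start_worker, args=(i,)) for i in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()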
  3. Monitoring and alerting

# monitoring/scraper_monitor.py
import psutil
import time
import smtplib
from email.mime.text import MIMEText
from datetime import datetime, timedelta

class ScraperMonitor:
def __init__(self, email_config=None):
    self.email_config = email_config
    self.performance_log = []

def monitor_performance(self):
    """监控系统性能"""
    cpu_percent = psutil.cpu_percent(interval=1)
    memory_percent = psutil.virtual_memory().percent
    disk_percent = psutil.disk_usage('/').percent

    performance_data = {
        'timestamp': datetime.now(),
        'cpu_percent': cpu_percent,
        'memory_percent': memory_percent,
        'disk_percent': disk_percent
    }

    self.performance_log.append(performance_data)

    # Check whether an alert is needed
    if cpu_percent > 80 or memory_percent > 80:
        self.send_alert(f"High resource usage: CPU {cpu_percent}%, memory {memory_percent}%")

    return performance_data

def send_alert(self, message):
    """发送告警邮件"""
    if not self.email_config:
        print(f"告警: {message}")
        return

    try:
        msg = MIMEText(f"Walmart scraper system alert\n\n{message}\n\nTime: {datetime.now()}")
        msg['Subject'] = 'Scraper system alert'
        msg['From'] = self.email_config['from']
        msg['To'] = self.email_config['to']

        server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port'])
        server.starttls()
        server.login(self.email_config['username'], self.email_config['password'])
        server.send_message(msg)
        server.quit()

        print(f"告警邮件已发送: {message}")

    except Exception as e:
        print(f"发送告警邮件失败: {e}")
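
One way to use the monitor is to run it in a daemon thread alongside the scraping loop. The 60-second interval below is arbitrary, no email configuration is passed (so alerts are just printed), and psutil is an extra dependency that is not in the requirements.txt shown earlier.

# Sketch: run resource monitoring in a background thread
import threading
import time
from monitoring.scraper_monitor import ScraperMonitor

monitor = ScraperMonitor()  # no email_config: alerts fall back to print()

def monitoring_loop(interval=60):
    while True:
        stats = monitor.monitor_performance()
        print(f"CPU {stats['cpu_percent']}% | RAM {stats['memory_percent']}%")
        time.sleep(interval)

threading.Thread(target=monitoring_loop, daemon=True).start()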

Advanced Optimization Techniques

  1. Smart retry mechanism

# utils/retry_handler.py
import time
import random
from functools import wraps

def smart_retry(max_retries=3, base_delay=1, backoff_factor=2, jitter=True):
    """Smart retry decorator with exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e

                    if attempt < max_retries - 1:
                        # Compute the delay
                        delay = base_delay * (backoff_factor ** attempt)

                        # Add random jitter
                        if jitter:
                            delay += random.uniform(0, delay * 0.1)

                        print(f"Retry {attempt + 1}/{max_retries}, retrying in {delay:.2f}s")
                        time.sleep(delay)
                    else:
                        print(f"All retries failed, last exception: {e}")

            raise last_exception

        return wrapper
    return decorator
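
The decorator can wrap any flaky call. For example, a page fetch (the function below is illustrative, not part of the original code) gets exponential backoff with jitter for free:

# Sketch: apply smart_retry to a flaky request function
import requests
from utils.retry_handler import smart_retry

@smart_retry(max_retries=4, base_delay=2, backoff_factor=2)
def fetch_page(url):
    # raise_for_status() turns HTTP errors into exceptions so the decorator retries
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.text

html = fetch_page("https://www.walmart.com/search?q=monitor")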
  2. Deduplication and caching

# utils/cache_manager.py
import hashlib
import json
import os
from datetime import datetime, timedelta

class CacheManager:
def __init__(self, cache_dir='cache', expire_hours=24):
    self.cache_dir = cache_dir
    self.expire_hours = expire_hours
    os.makedirs(cache_dir, exist_ok=True)

def get_cache_key(self, url):
    """生成缓存键"""
    return hashlib.md5(url.encode()).hexdigest()

def get_cache_file(self, cache_key):
    """获取缓存文件路径"""
    return os.path.join(self.cache_dir, f"{cache_key}.json")

def is_cache_valid(self, cache_file):
    """检查缓存是否有效"""
    if not os.path.exists(cache_file):
        return False

    file_time = datetime.fromtimestamp(os.path.getmtime(cache_file))
    expire_time = datetime.now() - timedelta(hours=self.expire_hours)

    return file_time > expire_time

def get_cached_data(self, url):
    """获取缓存数据"""
    cache_key = self.get_cache_key(url)
    cache_file = self.get_cache_file(cache_key)

    if self.is_cache_valid(cache_file):
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception:
            pass

    return None

def save_to_cache(self, url, data):
    """保存数据到缓存"""
    cache_key = self.get_cache_key(url)
    cache_file = self.get_cache_file(cache_key)

    try:
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"保存缓存失败: {e}")

class DataDeduplicator:
def __init__(self):
    self.seen_products = set()

def is_duplicate(self, product):
    """检查商品是否重复"""
    # 使用商品ID和标题创建唯一标识
    identifier = f"{product.get('product_id', '')}-{product.get('title', '')}"
    identifier_hash = hashlib.md5(identifier.encode()).hexdigest()

    if identifier_hash in self.seen_products:
        return True

    self.seen_products.add(identifier_hash)
    return False

def deduplicate_products(self, products):
    """去重商品列表"""
    unique_products = []

    for product in products:
        if not self.is_duplicate(product):
            unique_products.append(product)

    print(f"去重前: {len(products)} 个商品,去重后: {len(unique_products)} 个商品")
    return unique_products
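
Both helpers sit in front of the network layer: check the cache before hitting a URL, and deduplicate the combined result set before export. A rough sketch, assuming CacheManager and DataDeduplicator both live in utils/cache_manager.py:

# Sketch: combine caching and deduplication around the scraper
from scrapers.walmart_scraper import WalmartScraper
from utils.cache_manager import CacheManager, DataDeduplicator

scraper = WalmartScraper()
cache = CacheManager(cache_dir="cache", expire_hours=12)
deduper = DataDeduplicator()

keyword = "desk lamp"
search_url = f"https://www.walmart.com/search?q={keyword}"

products = cache.get_cached_data(search_url)
if products is None:
    # Cache miss: scrape fresh data and store it for later runs
    products = scraper.search_products(keyword, max_results=30)
    cache.save_to_cache(search_url, products)

unique_products = deduper.deduplicate_products(products)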

Performance Optimization and Scaling

  1. Asynchronous concurrent processing

# async_scraper.py
import asyncio
import aiohttp
from aiohttp import ClientTimeout
import async_timeout

class AsyncWalmartScraper:
def __init__(self, max_concurrent=10):
    self.max_concurrent = max_concurrent
    self.semaphore = asyncio.Semaphore(max_concurrent)

async def fetch_page(self, session, url):
    """异步获取页面"""
    async with self.semaphore:
        try:
            timeout = ClientTimeout(total=30)
            async with session.get(url, timeout=timeout) as response:
                if response.status == 200:
                    return await response.text()
                else:
                    print(f"HTTP错误 {response.status}: {url}")
        except Exception as e:
            print(f"请求失败: {e}")

        return None

async def scrape_multiple_urls(self, urls):
    """并发抓取多个URL"""
    async with aiohttp.ClientSession() as session:
        tasks = [self.fetch_page(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Keep only the successful results
        successful_results = [r for r in results if isinstance(r, str)]
        print(f"Fetched {len(successful_results)}/{len(urls)} pages successfully")

        return successful_results
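
The async scraper is driven with asyncio.run(). Real Walmart pages will usually also need the header and proxy handling shown earlier on top of plain aiohttp requests, so treat this as a skeleton:

# Sketch: drive the async scraper with asyncio.run()
import asyncio
from async_scraper import AsyncWalmartScraper

async def main():
    scraper = AsyncWalmartScraper(max_concurrent=5)
    urls = [
        f"https://www.walmart.com/search?q=keyboard&page={page}"
        for page in range(1, 4)
    ]
    pages = await scraper.scrape_multiple_urls(urls)
    print(f"Fetched {len(pages)} pages")

if __name__ == "__main__":
    asyncio.run(main())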

Real-World Usage Examples

Basic usage

python main.py --keywords "wireless headphones" "bluetooth speaker" --max-products 30

Using proxies

python main.py --keywords "laptop" --proxy-file proxies.txt --output-format both

Large-batch scraping

python main.py --keywords "electronics" "home garden" "sports" --max-products 100 --output-format json
Example proxy file (proxies.txt):
192.168.1.100:8080
203.123.45.67:3128
104.248.63.15:30588
167.172.180.46:41258
Why Choose a Professional API Service
Although we have now covered how to build a fully featured Walmart scraping system, building and maintaining your own scraper in a real business setting comes with significant challenges:
High maintenance cost: e-commerce sites update their anti-bot defenses frequently, so the scraper needs continuous engineering effort to keep up.
Legal and compliance risk: improper scraping can create legal exposure and calls for professional compliance guidance.
Heavy infrastructure investment: reliable proxies, CAPTCHA solving, and a distributed architecture all require substantial spending.
Data quality is hard to guarantee: keeping the data accurate, complete, and fresh requires a dedicated quality-control process.
Pangolin Scrape API: A Professional E-commerce Data Solution
If you would rather focus on Walmart operations and product selection and leave data collection to a dedicated team, the Pangolin Scrape API is a good fit.
Key advantages
Maintenance-free smart parsing: Pangolin Scrape API uses intelligent recognition algorithms to adapt automatically to page structure changes on Walmart and other platforms, so developers do not have to track DOM updates.
Rich data fields: it can capture product ID, images, title, rating, review count, size, color, description, price, stock status, and other product information.
Multiple calling modes: both synchronous and asynchronous API calls are available to suit different business scenarios.
Quick integration example
Fetching Walmart product information with the Pangolin Scrape API is straightforward:
import requests
import json

# Authenticate and obtain a token

auth_url = "http://scrapeapi.pangolinfo.com/api/v1/auth"
auth_data = {
"email": "your_email@gmail.com",
"password": "your_password"
}

response = requests.post(auth_url, json=auth_data)
token = response.json()['data']

# Scrape a Walmart product detail page

scrape_url = "http://scrapeapi.pangolinfo.com/api/v1"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {token}"
}

scrape_data = {
"url": "https://www.walmart.com/ip/your-product-url",
"parserName": "walmProductDetail",
"formats": ["json"]
}

result = requests.post(scrape_url, headers=headers, json=scrape_data)
product_data = result.json()
Service highlights
24/7 stable service: a dedicated operations team keeps the service running
Smart anti-bot handling: built-in IP rotation, header randomization, and other anti-detection mechanisms
Data quality assurance: multiple validation passes ensure accuracy and completeness
Flexible output formats: JSON, Markdown, and raw HTML are all supported
Pay as you go: billing is based on actual usage, which keeps costs down
With the Pangolin Scrape API you can devote more energy to core business logic instead of complex scraping infrastructure and its upkeep.
Summary
This article walked through how to build a professional-grade Walmart scraper in Python, from environment setup to advanced optimization, covering key topics such as anti-bot countermeasures, data processing, and distributed architecture, with plenty of code examples along the way.
Building your own scraper gives you full control and deep customization, but it also brings maintenance burden, compliance risk, and infrastructure cost. For teams focused on growing the business, a professional service such as the Pangolin Scrape API can deliver the required data more efficiently while avoiding the technical pitfalls.
Whether you build or buy, the key is to decide based on your business needs, technical capability, and available resources. In a data-driven e-commerce market, accurate and timely information is what gives you the competitive edge.
As the old saying goes, "to do good work, one must first sharpen one's tools." Choosing the right data collection approach will take you much further on your e-commerce journey.
