Muhammad Ikramullah Khan

Scrapy-Playwright Advanced Patterns: Master Complex Scraping Scenarios

After learning basic Playwright with Scrapy, I thought I could scrape any JavaScript site. Then I hit a site with infinite scroll that loaded data in batches. My simple scroll-and-wait approach loaded 20 items when there were 10,000.

I needed advanced patterns: detecting when new content loaded, handling dynamic URLs, managing browser state. Once I learned these patterns, complex sites became manageable.

Let me show you the advanced techniques that separate beginners from experts.


Pattern 1: Infinite Scroll with Smart Detection

Don't just scroll and hope. Detect when new content actually loads.

Problem with Basic Approach

# BAD: Scrolls but might miss content or wait too long
for i in range(100):
    await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
    await page.wait_for_timeout(2000)  # Arbitrary wait

Issues:

  • Fixed number of scrolls might not be enough
  • Fixed timeout wastes time or misses content
  • No way to know when done

Smart Infinite Scroll

async def parse(self, response):
    page = response.meta['playwright_page']

    previous_count = 0
    no_change_count = 0
    max_no_change = 3  # Stop after 3 scrolls with no new items

    while no_change_count < max_no_change:
        # Count current items
        current_count = await page.locator('.product').count()

        self.logger.info(f'Items loaded: {current_count}')

        # Check if new items loaded
        if current_count == previous_count:
            no_change_count += 1
            self.logger.warning(f'No new items ({no_change_count}/{max_no_change})')
        else:
            no_change_count = 0  # Reset counter
            previous_count = current_count

        # Scroll to bottom
        await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')

        # Wait for potential new content
        try:
            # Wait for count to change or timeout
            await page.wait_for_function(
                f'document.querySelectorAll(".product").length > {current_count}',
                timeout=5000
            )
        except Exception:
            # Timeout means no new items loaded
            pass

    self.logger.info(f'Finished scrolling. Total items: {previous_count}')

    # Get final content
    content = await page.content()
    await page.close()

    # Parse
    from scrapy.http import HtmlResponse
    final_response = HtmlResponse(url=response.url, body=content.encode())

    for product in final_response.css('.product'):
        yield {
            'name': product.css('h2::text').get(),
            'price': product.css('.price::text').get()
        }

What this does better:

  • Counts actual items, not arbitrary scrolls
  • Detects when no new items load
  • Uses wait_for_function to wait for specific condition
  • Stops automatically when done
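
If you don't need the smart detection loop above (the page loads everything after a couple of scrolls) and you don't need the page object in your callback, scrapy-playwright's PageMethod helper can do the scrolling declaratively. A minimal sketch; the URL and selector are placeholders:

from scrapy_playwright.page import PageMethod

def start_requests(self):
    yield scrapy.Request(
        'https://example.com/products',  # placeholder URL
        meta={
            'playwright': True,
            'playwright_page_methods': [
                # Scroll once, then wait until at least 20 products are in the DOM
                PageMethod('evaluate', 'window.scrollTo(0, document.body.scrollHeight)'),
                PageMethod('wait_for_selector', '.product:nth-child(20)'),
            ],
        },
        callback=self.parse
    )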

Pattern 2: Load More Button with Dynamic Loading

Many sites have "Load More" buttons that trigger AJAX requests.

Naive Approach

# BAD: Clicks button without waiting properly
await page.click('button.load-more')
await page.wait_for_timeout(2000)  # Hope it loaded

Smart Load More

async def parse(self, response):
    page = response.meta['playwright_page']

    while True:
        # Get current item count
        current_count = await page.locator('.product').count()

        # Check if "Load More" button exists and is visible
        load_more = page.locator('button.load-more')

        if not await load_more.is_visible():
            self.logger.info('No more "Load More" button')
            break

        # Click button
        await load_more.click()

        # Wait for new items to appear
        try:
            await page.wait_for_function(
                f'document.querySelectorAll(".product").length > {current_count}',
                timeout=10000
            )

            new_count = await page.locator('.product').count()
            self.logger.info(f'Loaded {new_count - current_count} new items')

        except Exception as e:
            self.logger.error(f'Failed to load more items: {e}')
            break

        # Small delay to avoid overwhelming server
        await page.wait_for_timeout(500)

    content = await page.content()
    await page.close()

    # Parse all items
    final_response = HtmlResponse(url=response.url, body=content.encode())

    for product in final_response.css('.product'):
        yield {'name': product.css('h2::text').get()}

Pattern 3: Multi-Step Forms and Workflows

Navigate through complex multi-step processes.

Example: Product Search with Filters

async def parse(self, response):
    page = response.meta['playwright_page']

    # Step 1: Search
    await page.fill('input#search', 'laptop')
    await page.click('button.search')
    await page.wait_for_selector('.search-results')

    # Step 2: Apply filters
    await page.click('input#filter-brand-dell')
    await page.click('input#filter-price-500-1000')

    # Wait for results to update (AJAX)
    await page.wait_for_function(
        'document.querySelector(".loading-indicator") === null'
    )

    # Step 3: Sort results
    await page.select_option('select#sort', 'price-low-high')
    await page.wait_for_load_state('networkidle')

    # Step 4: Extract data
    content = await page.content()
    await page.close()

    # Parse
    final_response = HtmlResponse(url=response.url, body=content.encode())

    for product in final_response.css('.product'):
        yield {
            'name': product.css('h2::text').get(),
            'price': product.css('.price::text').get(),
            'brand': 'Dell',
            'price_range': '500-1000'
        }
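
If you know which endpoint the filter AJAX call hits, waiting for that specific response is usually more reliable than polling for a loading indicator. A sketch; the '/api/search' endpoint is hypothetical:

    # Wait for the filter request itself rather than the spinner
    async with page.expect_response(
        lambda r: '/api/search' in r.url and r.status == 200
    ) as response_info:
        await page.click('input#filter-brand-dell')

    await response_info.value  # Raises on timeout if the response never arrives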

Pattern 4: Handling Popups and Modals

Deal with annoying popups that block content.

Smart Popup Handler

async def parse(self, response):
    page = response.meta['playwright_page']

    # Define popup handler
    async def close_popups():
        # Common popup close selectors
        close_selectors = [
            '.popup-close',
            '.modal-close',
            'button.close',
            '[aria-label="Close"]',
            '.cookie-accept',
            '#newsletter-close'
        ]

        for selector in close_selectors:
            try:
                locator = page.locator(selector)
                # is_visible() checks the current state and returns immediately
                if await locator.is_visible():
                    await locator.click()
                    self.logger.info(f'Closed popup: {selector}')
                    await page.wait_for_timeout(500)
            except Exception:
                pass  # Popup not found, continue

    # Close popups immediately
    await close_popups()

    # Wait for main content
    await page.wait_for_selector('.product-list')

    # Check for popups again (some appear on scroll)
    await page.evaluate('window.scrollTo(0, 500)')
    await close_popups()

    # Now scrape
    content = await page.content()
    await page.close()

    final_response = HtmlResponse(url=response.url, body=content.encode())

    for product in final_response.css('.product'):
        yield {'name': product.css('h2::text').get()}
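
Selector-based close buttons don't help with native JavaScript dialogs (alert/confirm/prompt), which block the page until they are handled. A small sketch for dismissing them automatically:

    # Dismiss native alert/confirm/prompt dialogs so they don't block clicks
    async def handle_dialog(dialog):
        self.logger.info(f'Dismissing dialog: {dialog.message}')
        await dialog.dismiss()

    page.on('dialog', handle_dialog)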

Pattern 5: File Downloads

Download files (PDFs, images, etc.) using Playwright.

Download Handler

async def parse(self, response):
    page = response.meta['playwright_page']

    # Set up download handler
    downloads = []

    async def handle_download(download):
        # Get suggested filename
        filename = download.suggested_filename

        # Save to specific path
        path = f'/tmp/downloads/{filename}'
        await download.save_as(path)

        downloads.append({
            'filename': filename,
            'path': path,
            'url': download.url
        })

        self.logger.info(f'Downloaded: {filename}')

    # Listen for downloads
    page.on('download', handle_download)

    # Click download links
    download_buttons = await page.locator('a.download').all()

    for button in download_buttons:
        await button.click()
        await page.wait_for_timeout(1000)  # Wait for download to start

    # Wait a bit for all downloads to complete
    await page.wait_for_timeout(3000)

    await page.close()

    # Yield download information
    for download_info in downloads:
        yield download_info
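
If each click triggers exactly one download, Playwright's expect_download() context manager avoids the fixed timeouts above; a sketch:

    for button in await page.locator('a.download').all():
        async with page.expect_download() as download_info:
            await button.click()

        download = await download_info.value
        path = f'/tmp/downloads/{download.suggested_filename}'
        await download.save_as(path)  # Waits for the download to finish
        self.logger.info(f'Downloaded: {path}')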

Pattern 6: Dealing with Shadow DOM

Access elements inside Shadow DOM (Web Components).

Shadow DOM Extraction

async def parse(self, response):
    page = response.meta['playwright_page']

    # Method 1: Using evaluate
    shadow_content = await page.evaluate('''
        () => {
            const host = document.querySelector('my-custom-element');
            if (!host || !host.shadowRoot) return null;

            const data = host.shadowRoot.querySelector('.data');
            return data ? data.textContent : null;
        }
    ''')

    self.logger.info(f'Shadow DOM data: {shadow_content}')

    # Method 2: Using Playwright's piercing selectors
    # (Works in newer versions)
    try:
        data = await page.locator('my-custom-element').locator('.data').text_content()
        self.logger.info(f'Data from piercing: {data}')
    except Exception:
        pass

    await page.close()

    yield {'shadow_data': shadow_content}

Pattern 7: Extracting Data from Canvas/Charts

Some sites render data in Canvas elements (charts, graphs).

Canvas Data Extraction

async def parse(self, response):
    page = response.meta['playwright_page']

    # Wait for chart to render
    await page.wait_for_selector('canvas#chart')

    # Extract data from JavaScript
    chart_data = await page.evaluate('''
        () => {
            // Many chart libraries store data in global variables
            if (window.chartData) {
                return window.chartData;
            }

            // Or access through chart instance
            if (window.myChart) {
                return window.myChart.data;
            }

            return null;
        }
    ''')

    if chart_data:
        for data_point in chart_data.get('datasets', [{}])[0].get('data', []):
            yield {'value': data_point}
    else:
        # Fallback: take screenshot and use OCR (covered in another blog)
        await page.screenshot(path='chart.png')

    await page.close()

Pattern 8: Monitoring Network Requests

Intercept AJAX/API calls to extract data directly.

Network Request Monitoring

async def parse(self, response):
    page = response.meta['playwright_page']

    api_data = []

    # Monitor network requests
    async def handle_response(resp):
        # 'resp' is the Playwright response, not the Scrapy response passed to parse()
        # Look for API calls
        if '/api/' in resp.url and resp.status == 200:
            try:
                # Get JSON response (non-JSON bodies raise and are skipped)
                data = await resp.json()
                api_data.append(data)
                self.logger.info(f'Captured API call: {resp.url}')
            except Exception:
                pass

    page.on('response', handle_response)

    # Navigate or interact with page
    await page.goto('https://example.com/products')
    await page.wait_for_load_state('networkidle')

    # Scroll to trigger more API calls
    for i in range(5):
        await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
        await page.wait_for_timeout(2000)

    await page.close()

    # Extract data from captured API responses
    for api_response in api_data:
        for item in api_response.get('items', []):
            yield {
                'name': item.get('name'),
                'price': item.get('price')
            }

Pro tip: This is often better than parsing HTML!
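
Once the network monitor has revealed the underlying endpoint, you can often drop the browser entirely and hit the API with plain Scrapy requests, which is much faster. A sketch; the endpoint, pagination parameter, and JSON layout are hypothetical:

import json
import scrapy

class ApiDirectSpider(scrapy.Spider):
    name = 'api_direct'

    def start_requests(self):
        # Hypothetical endpoint discovered with the network monitor above
        for page_num in range(1, 6):
            yield scrapy.Request(
                f'https://example.com/api/products?page={page_num}',
                callback=self.parse_api
            )

    def parse_api(self, response):
        data = json.loads(response.text)
        for item in data.get('items', []):
            yield {'name': item.get('name'), 'price': item.get('price')}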


Pattern 9: Session Persistence Across Requests

Maintain login sessions across multiple pages.

Context Reuse

class SessionSpider(scrapy.Spider):
    name = 'session'

    def start_requests(self):
        # First request: login
        yield scrapy.Request(
            'https://example.com/login',
            meta={
                'playwright': True,
                'playwright_include_page': True,
                'playwright_context': 'persistent_context'  # Use named context
            },
            callback=self.login
        )

    async def login(self, response):
        page = response.meta['playwright_page']

        # Login
        await page.fill('input#username', 'myuser')
        await page.fill('input#password', 'mypass')
        await page.click('button.login')
        await page.wait_for_selector('.dashboard')

        self.logger.info('Login successful')
        await page.close()

        # Now scrape protected pages (using same context)
        for i in range(1, 11):
            yield scrapy.Request(
                f'https://example.com/protected/page{i}',
                meta={
                    'playwright': True,
                    'playwright_context': 'persistent_context'  # Same context!
                },
                callback=self.parse_protected
            )

    def parse_protected(self, response):
        # Session is maintained, no need to login again!
        for item in response.css('.item'):
            yield {'data': item.css('.data::text').get()}
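
If the session lives entirely in cookies and localStorage, another option is to capture a Playwright storage state file once (for example with `playwright codegen --save-storage=state.json`) and load it into the context, so the spider never has to log in at all. A sketch, assuming state.json exists:

# settings.py
PLAYWRIGHT_CONTEXTS = {
    'default': {
        # Cookies/localStorage exported earlier, e.g. via
        # `playwright codegen --save-storage=state.json`
        'storage_state': 'state.json',
    }
}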

Pattern 10: Parallel Browser Contexts

Run multiple browser contexts for speed.

Concurrent Contexts

# settings.py
PLAYWRIGHT_MAX_CONTEXTS = 5  # Run up to 5 browser contexts in parallel

# spider.py
def start_requests(self):
    urls = [f'https://example.com/page{i}' for i in range(1, 101)]

    for url in urls:
        yield scrapy.Request(
            url,
            meta={'playwright': True, 'playwright_include_page': True},  # page object needed in parse()
            callback=self.parse
        )

async def parse(self, response):
    page = response.meta['playwright_page']

    # Each request gets its own context
    # Up to PLAYWRIGHT_MAX_CONTEXTS run in parallel

    await page.wait_for_selector('.product')
    content = await page.content()
    await page.close()

    final_response = HtmlResponse(url=response.url, body=content.encode())

    for product in final_response.css('.product'):
        yield {'name': product.css('h2::text').get()}

Pattern 11: Custom User Agents and Headers

Set custom headers for each Playwright request.

Custom Headers

async def parse(self, response):
    page = response.meta['playwright_page']

    # Set extra headers
    await page.set_extra_http_headers({
        'User-Agent': 'MyCustomBot/1.0',
        'Accept-Language': 'en-US,en;q=0.9',
        'Referer': 'https://google.com'
    })

    # Re-navigate so the new headers apply (scrapy-playwright already fetched the initial response)
    await page.goto('https://example.com')

    content = await page.content()
    await page.close()

    # Parse...

Better approach (set in settings):

# settings.py
PLAYWRIGHT_LAUNCH_OPTIONS = {
    'headless': True
}

PLAYWRIGHT_CONTEXTS = {
    'default': {
        'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)...',
        'viewport': {'width': 1920, 'height': 1080},
        'locale': 'en-US'
    }
}
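
These options can also be applied per request by creating a named context on the fly with the playwright_context_kwargs meta key (the kwargs are only used the first time that named context is created). A sketch:

yield scrapy.Request(
    'https://example.com',
    meta={
        'playwright': True,
        'playwright_context': 'custom_headers',  # new named context
        'playwright_context_kwargs': {
            'user_agent': 'MyCustomBot/1.0',
            'locale': 'en-US',
        },
    },
    callback=self.parse
)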

Pattern 12: Mobile Device Emulation

Scrape mobile versions of sites.

Mobile Emulation

# settings.py
PLAYWRIGHT_CONTEXTS = {
    'default': {
        'viewport': {'width': 375, 'height': 667},  # iPhone SE size
        'user_agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X)...',
        'is_mobile': True,
        'has_touch': True
    }
}

Or per-request:

async def parse(self, response):
    page = response.meta['playwright_page']

    # Emulate an iPhone-sized viewport; full device emulation (touch, user agent) needs the context settings above
    await page.set_viewport_size({'width': 375, 'height': 667})

    await page.goto('https://example.com')

    # Mobile version loaded
    content = await page.content()
    await page.close()

Pattern 13: Screenshot on Error for Debugging

Automatically capture screenshots when things go wrong.

Error Screenshot Handler

import time  # used for timestamping error screenshots

async def parse(self, response):
    page = response.meta['playwright_page']

    try:
        await page.wait_for_selector('.product', timeout=10000)
        content = await page.content()

        # Parse content
        final_response = HtmlResponse(url=response.url, body=content.encode())

        for product in final_response.css('.product'):
            yield {'name': product.css('h2::text').get()}

    except Exception as e:
        # Error occurred, take screenshot
        screenshot_path = f'errors/error_{response.url.split("/")[-1]}_{int(time.time())}.png'
        await page.screenshot(path=screenshot_path, full_page=True)

        self.logger.error(f'Error on {response.url}: {e}')
        self.logger.error(f'Screenshot saved: {screenshot_path}')

    finally:
        await page.close()
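
The try/except above only catches errors inside your callback. If the request itself fails, scrapy-playwright still exposes the page to your errback through the request meta, so you can screenshot and close it there too. A sketch following the errback pattern from the scrapy-playwright docs (the URL is a placeholder, and `time` is imported as above):

def start_requests(self):
    yield scrapy.Request(
        'https://example.com/products',  # placeholder URL
        meta={'playwright': True, 'playwright_include_page': True},
        callback=self.parse,
        errback=self.errback_close_page
    )

async def errback_close_page(self, failure):
    page = failure.request.meta['playwright_page']
    await page.screenshot(path=f'errors/request_failed_{int(time.time())}.png', full_page=True)
    await page.close()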

Pattern 14: Extracting Data Before Full Page Load

Sometimes you don't need the full page. Extract data as soon as it appears.

Early Extraction

async def parse(self, response):
    page = response.meta['playwright_page']

    # Don't wait for full page load
    await page.goto('https://example.com', wait_until='domcontentloaded')

    # Or even earlier
    # await page.goto('https://example.com', wait_until='commit')

    # Extract specific element as soon as it appears
    await page.wait_for_selector('.main-content')

    # Don't wait for images, ads, etc
    content = await page.content()
    await page.close()

    # Parse
    final_response = HtmlResponse(url=response.url, body=content.encode())

    for item in final_response.css('.item'):
        yield {'data': item.css('.data::text').get()}

Speed improvement: this can be 2-5x faster for heavy pages!
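
Another big speed win is aborting requests for resources you don't need at all (images, fonts, media). scrapy-playwright supports this through the PLAYWRIGHT_ABORT_REQUEST setting; a minimal sketch:

# settings.py
def should_abort_request(request):
    # Drop images, fonts and media; keep documents, scripts and XHR
    return request.resource_type in ('image', 'font', 'media')

PLAYWRIGHT_ABORT_REQUEST = should_abort_request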


Pattern 15: Handling Dynamic URLs (SPA Routing)

Single-page apps change the URL without a full page reload.

SPA Navigation

async def parse(self, response):
    page = response.meta['playwright_page']

    # Initial page
    await page.goto('https://example.com')

    # Click through different routes (no page reload)
    routes = ['#/products', '#/about', '#/contact']

    for route in routes:
        # Click link or change URL directly
        await page.evaluate(f'window.location.hash = "{route}"')

        # Wait for content to change
        await page.wait_for_load_state('networkidle')

        # Extract data for this route
        content = await page.content()
        route_response = HtmlResponse(
            url=f'https://example.com{route}',
            body=content.encode()
        )

        # Parse route-specific content
        for item in route_response.css('.route-content'):
            yield {
                'route': route,
                'data': item.css('.data::text').get()
            }

    await page.close()
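
One caveat: a hash change may not trigger any network activity, in which case 'networkidle' resolves immediately, possibly before the new route has rendered. Waiting for an element that only the new route renders is more robust; a sketch with a placeholder selector:

    # Inside the route loop: wait for something the new route actually renders
    await page.wait_for_selector(f'[data-route="{route}"] .route-content', timeout=10000)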

Complete Real-World Example: Amazon-Like Site

Putting it all together:

import scrapy
from scrapy.http import HtmlResponse
import time

class AdvancedEcommerceSpider(scrapy.Spider):
    name = 'advanced_ecommerce'

    custom_settings = {
        'DOWNLOAD_HANDLERS': {
            'http': 'scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler',
            'https': 'scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler',
        },
        # scrapy-playwright requires the asyncio reactor
        'TWISTED_REACTOR': 'twisted.internet.asyncioreactor.AsyncioSelectorReactor',
        'PLAYWRIGHT_MAX_CONTEXTS': 3,
        'CONCURRENT_REQUESTS': 3
    }

    def start_requests(self):
        yield scrapy.Request(
            'https://example.com/electronics',
            meta={
                'playwright': True,
                'playwright_include_page': True
            },
            callback=self.parse
        )

    async def parse(self, response):
        page = response.meta['playwright_page']

        try:
            # Close popups
            await self.close_popups(page)

            # Apply filters
            await self.apply_filters(page)

            # Load all products
            await self.load_all_products(page)

            # Extract product data
            content = await page.content()

        except Exception as e:
            self.logger.error(f'Error: {e}')
            await page.screenshot(path=f'error_{int(time.time())}.png')
            return  # Nothing to parse if the page failed to load

        finally:
            await page.close()

        # Parse
        final_response = HtmlResponse(url=response.url, body=content.encode())

        for product in final_response.css('.product'):
            yield {
                'name': product.css('h2::text').get(),
                'price': product.css('.price::text').get(),
                'rating': product.css('.rating::text').get(),
                'reviews': product.css('.review-count::text').get()
            }

    async def close_popups(self, page):
        """Close any popups that might appear"""
        close_selectors = [
            '.popup-close',
            '.cookie-accept',
            '[aria-label="Close"]'
        ]

        for selector in close_selectors:
            try:
                locator = page.locator(selector)
                if await locator.is_visible():
                    await locator.click()
                    await page.wait_for_timeout(500)
            except Exception:
                pass

    async def apply_filters(self, page):
        """Apply product filters"""
        # Price filter
        try:
            await page.click('input#price-under-100')
            await page.wait_for_load_state('networkidle')
        except Exception:
            pass

        # Brand filter
        try:
            await page.click('input#brand-samsung')
            await page.wait_for_load_state('networkidle')
        except Exception:
            pass

    async def load_all_products(self, page):
        """Load all products using infinite scroll"""
        previous_count = 0
        no_change_count = 0

        while no_change_count < 3:
            current_count = await page.locator('.product').count()

            if current_count == previous_count:
                no_change_count += 1
            else:
                no_change_count = 0
                previous_count = current_count

            await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')

            try:
                await page.wait_for_function(
                    f'document.querySelectorAll(".product").length > {current_count}',
                    timeout=5000
                )
            except Exception:
                pass

        self.logger.info(f'Loaded {previous_count} total products')

Summary

Advanced patterns covered:

  1. Smart infinite scroll detection
  2. Load more button handling
  3. Multi-step forms
  4. Popup management
  5. File downloads
  6. Shadow DOM access
  7. Canvas/chart data
  8. Network monitoring
  9. Session persistence
  10. Parallel contexts
  11. Custom headers
  12. Mobile emulation
  13. Error screenshots
  14. Early extraction
  15. SPA routing

Key takeaways:

  • Don't use arbitrary timeouts
  • Detect state changes programmatically
  • Handle errors gracefully with screenshots
  • Monitor network for hidden APIs
  • Use contexts for session management

Remember:

  • Always close pages in finally blocks
  • Use wait_for_function for dynamic conditions
  • Take screenshots when debugging
  • Monitor item counts, not time
  • Network monitoring often beats HTML parsing

These patterns cover the vast majority of complex scraping scenarios you'll run into!

Happy scraping! 🕷️
