After learning basic Playwright with Scrapy, I thought I could scrape any JavaScript site. Then I hit a site with infinite scroll that loaded data in batches. My simple scroll-and-wait approach loaded 20 items when there were 10,000.
I needed advanced patterns: detecting when new content loaded, handling dynamic URLs, managing browser state. Once I learned these patterns, complex sites became manageable.
Let me show you the advanced techniques that separate beginners from experts.
Pattern 1: Infinite Scroll with Smart Detection
Don't just scroll and hope. Detect when new content actually loads.
Problem with Basic Approach
# BAD: Scrolls but might miss content or wait too long
for i in range(100):
    await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
    await page.wait_for_timeout(2000)  # Arbitrary wait
Issues:
- Fixed number of scrolls might not be enough
- Fixed timeout wastes time or misses content
- No way to know when done
Smart Infinite Scroll
async def parse(self, response):
    page = response.meta['playwright_page']

    previous_count = 0
    no_change_count = 0
    max_no_change = 3  # Stop after 3 scrolls with no new items

    while no_change_count < max_no_change:
        # Count current items
        current_count = await page.locator('.product').count()
        self.logger.info(f'Items loaded: {current_count}')

        # Check if new items loaded
        if current_count == previous_count:
            no_change_count += 1
            self.logger.warning(f'No new items ({no_change_count}/{max_no_change})')
        else:
            no_change_count = 0  # Reset counter
            previous_count = current_count

        # Scroll to bottom
        await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')

        # Wait for potential new content
        try:
            # Wait for the count to change, or time out
            await page.wait_for_function(
                f'document.querySelectorAll(".product").length > {current_count}',
                timeout=5000
            )
        except Exception:
            # Timeout means no new items loaded
            pass

    self.logger.info(f'Finished scrolling. Total items: {previous_count}')

    # Get final content
    content = await page.content()
    await page.close()

    # Parse
    from scrapy.http import HtmlResponse
    final_response = HtmlResponse(url=response.url, body=content.encode())

    for product in final_response.css('.product'):
        yield {
            'name': product.css('h2::text').get(),
            'price': product.css('.price::text').get()
        }
What this does better:
- Counts actual items, not arbitrary scrolls
- Detects when no new items load
- Uses wait_for_function to wait for a specific condition
- Stops automatically when done
Pattern 2: Load More Button with Dynamic Loading
Many sites have "Load More" buttons that trigger AJAX requests.
Naive Approach
# BAD: Clicks button without waiting properly
await page.click('button.load-more')
await page.wait_for_timeout(2000) # Hope it loaded
Smart Load More
async def parse(self, response):
    page = response.meta['playwright_page']

    while True:
        # Get current item count
        current_count = await page.locator('.product').count()

        # Check if the "Load More" button exists and is visible
        load_more = page.locator('button.load-more')
        if not await load_more.is_visible():
            self.logger.info('No more "Load More" button')
            break

        # Click button
        await load_more.click()

        # Wait for new items to appear
        try:
            await page.wait_for_function(
                f'document.querySelectorAll(".product").length > {current_count}',
                timeout=10000
            )
            new_count = await page.locator('.product').count()
            self.logger.info(f'Loaded {new_count - current_count} new items')
        except Exception as e:
            self.logger.error(f'Failed to load more items: {e}')
            break

        # Small delay to avoid overwhelming the server
        await page.wait_for_timeout(500)

    content = await page.content()
    await page.close()

    # Parse all items
    final_response = HtmlResponse(url=response.url, body=content.encode())
    for product in final_response.css('.product'):
        yield {'name': product.css('h2::text').get()}
Pattern 3: Multi-Step Forms and Workflows
Navigate through complex multi-step processes.
Example: Product Search with Filters
async def parse(self, response):
    page = response.meta['playwright_page']

    # Step 1: Search
    await page.fill('input#search', 'laptop')
    await page.click('button.search')
    await page.wait_for_selector('.search-results')

    # Step 2: Apply filters
    await page.click('input#filter-brand-dell')
    await page.click('input#filter-price-500-1000')

    # Wait for results to update (AJAX)
    await page.wait_for_function(
        'document.querySelector(".loading-indicator") === null'
    )

    # Step 3: Sort results
    await page.select_option('select#sort', 'price-low-high')
    await page.wait_for_load_state('networkidle')

    # Step 4: Extract data
    content = await page.content()
    await page.close()

    # Parse
    final_response = HtmlResponse(url=response.url, body=content.encode())
    for product in final_response.css('.product'):
        yield {
            'name': product.css('h2::text').get(),
            'price': product.css('.price::text').get(),
            'brand': 'Dell',
            'price_range': '500-1000'
        }
Pattern 4: Handling Popups and Modals
Deal with annoying popups that block content.
Smart Popup Handler
async def parse(self, response):
    page = response.meta['playwright_page']

    # Define popup handler
    async def close_popups():
        # Common popup close selectors
        close_selectors = [
            '.popup-close',
            '.modal-close',
            'button.close',
            '[aria-label="Close"]',
            '.cookie-accept',
            '#newsletter-close'
        ]
        for selector in close_selectors:
            try:
                if await page.locator(selector).is_visible(timeout=1000):
                    await page.click(selector)
                    self.logger.info(f'Closed popup: {selector}')
                    await page.wait_for_timeout(500)
            except Exception:
                pass  # Popup not found, continue

    # Close popups immediately
    await close_popups()

    # Wait for main content
    await page.wait_for_selector('.product-list')

    # Check for popups again (some appear on scroll)
    await page.evaluate('window.scrollTo(0, 500)')
    await close_popups()

    # Now scrape
    content = await page.content()
    await page.close()

    final_response = HtmlResponse(url=response.url, body=content.encode())
    for product in final_response.css('.product'):
        yield {'name': product.css('h2::text').get()}
Pattern 5: File Downloads
Download files (PDFs, images, etc.) using Playwright.
Download Handler
async def parse(self, response):
    page = response.meta['playwright_page']

    # Set up download handler
    downloads = []

    async def handle_download(download):
        # Get suggested filename
        filename = download.suggested_filename
        # Save to a specific path
        path = f'/tmp/downloads/{filename}'
        await download.save_as(path)
        downloads.append({
            'filename': filename,
            'path': path,
            'url': download.url
        })
        self.logger.info(f'Downloaded: {filename}')

    # Listen for downloads
    page.on('download', handle_download)

    # Click download links
    download_buttons = await page.locator('a.download').all()
    for button in download_buttons:
        await button.click()
        await page.wait_for_timeout(1000)  # Wait for download to start

    # Wait a bit for all downloads to complete
    await page.wait_for_timeout(3000)

    await page.close()

    # Yield download information
    for download_info in downloads:
        yield download_info
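The fixed waits above are still guesses about when a download finishes. Playwright's expect_download context manager blocks until the download event actually fires, so a more deterministic variant of the click loop looks like the sketch below (same hypothetical a.download selector and /tmp/downloads path as above):

# Sketch: wait for each download event explicitly instead of sleeping
download_links = await page.locator('a.download').all()
for link in download_links:
    async with page.expect_download() as download_info:
        await link.click()
    download = await download_info.value  # resolves once the download starts
    path = f'/tmp/downloads/{download.suggested_filename}'
    await download.save_as(path)  # save_as waits until the file is fully written
    yield {
        'filename': download.suggested_filename,
        'path': path,
        'url': download.url
    }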
Pattern 6: Dealing with Shadow DOM
Access elements inside Shadow DOM (Web Components).
Shadow DOM Extraction
async def parse(self, response):
    page = response.meta['playwright_page']

    # Method 1: Using evaluate
    shadow_content = await page.evaluate('''
        () => {
            const host = document.querySelector('my-custom-element');
            if (!host || !host.shadowRoot) return null;
            const data = host.shadowRoot.querySelector('.data');
            return data ? data.textContent : null;
        }
    ''')
    self.logger.info(f'Shadow DOM data: {shadow_content}')

    # Method 2: Using Playwright's piercing selectors
    # (CSS locators pierce open shadow roots in current versions)
    try:
        data = await page.locator('my-custom-element').locator('.data').text_content()
        self.logger.info(f'Data from piercing: {data}')
    except Exception:
        pass

    await page.close()
    yield {'shadow_data': shadow_content}
Pattern 7: Extracting Data from Canvas/Charts
Some sites render data in Canvas elements (charts, graphs).
Canvas Data Extraction
async def parse(self, response):
    page = response.meta['playwright_page']

    # Wait for chart to render
    await page.wait_for_selector('canvas#chart')

    # Extract data from JavaScript
    chart_data = await page.evaluate('''
        () => {
            // Many chart libraries store data in global variables
            if (window.chartData) {
                return window.chartData;
            }
            // Or access it through the chart instance
            if (window.myChart) {
                return window.myChart.data;
            }
            return null;
        }
    ''')

    if chart_data:
        for data_point in chart_data.get('datasets', [{}])[0].get('data', []):
            yield {'value': data_point}
    else:
        # Fallback: take a screenshot and use OCR (covered in another post)
        await page.screenshot(path='chart.png')

    await page.close()
Pattern 8: Monitoring Network Requests
Intercept AJAX/API calls to extract data directly.
Network Request Monitoring
async def parse(self, response):
    page = response.meta['playwright_page']
    api_data = []

    # Monitor network responses (renamed parameter so it doesn't shadow the Scrapy response)
    async def handle_response(network_response):
        # Look for API calls
        if '/api/' in network_response.url and network_response.status == 200:
            try:
                # Get the JSON body
                data = await network_response.json()
                api_data.append(data)
                self.logger.info(f'Captured API call: {network_response.url}')
            except Exception:
                pass

    page.on('response', handle_response)

    # Navigate or interact with the page
    await page.goto('https://example.com/products')
    await page.wait_for_load_state('networkidle')

    # Scroll to trigger more API calls
    for i in range(5):
        await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
        await page.wait_for_timeout(2000)

    await page.close()

    # Extract data from the captured API responses
    for api_response in api_data:
        for item in api_response.get('items', []):
            yield {
                'name': item.get('name'),
                'price': item.get('price')
            }
Pro tip: This is often better than parsing HTML!
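Taken to its conclusion: once monitoring has revealed the underlying endpoint, you can often skip the browser entirely and request the JSON with plain Scrapy. A minimal sketch, assuming a hypothetical /api/products endpoint and field names matching whatever the network monitor captured:

import scrapy


class DirectApiSpider(scrapy.Spider):
    # Sketch only: the endpoint, query parameter, and JSON keys are assumptions
    # taken from what the network monitor showed for your target site.
    name = 'direct_api'
    start_urls = ['https://example.com/api/products?page=1']

    def parse(self, response):
        data = response.json()
        for item in data.get('items', []):
            yield {
                'name': item.get('name'),
                'price': item.get('price')
            }

        # Follow pagination if the API exposes a next-page URL
        next_url = data.get('next')
        if next_url:
            yield scrapy.Request(next_url, callback=self.parse)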
Pattern 9: Session Persistence Across Requests
Maintain login sessions across multiple pages.
Context Reuse
class SessionSpider(scrapy.Spider):
    name = 'session'

    def start_requests(self):
        # First request: login
        yield scrapy.Request(
            'https://example.com/login',
            meta={
                'playwright': True,
                'playwright_include_page': True,
                'playwright_context': 'persistent_context'  # Use a named context
            },
            callback=self.login
        )

    async def login(self, response):
        page = response.meta['playwright_page']

        # Login
        await page.fill('input#username', 'myuser')
        await page.fill('input#password', 'mypass')
        await page.click('button.login')
        await page.wait_for_selector('.dashboard')

        self.logger.info('Login successful')
        await page.close()

        # Now scrape protected pages (using the same context)
        for i in range(1, 11):
            yield scrapy.Request(
                f'https://example.com/protected/page{i}',
                meta={
                    'playwright': True,
                    'playwright_context': 'persistent_context'  # Same context!
                },
                callback=self.parse_protected
            )

    def parse_protected(self, response):
        # Session is maintained, no need to log in again!
        for item in response.css('.item'):
            yield {'data': item.css('.data::text').get()}
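If the session should survive across crawler runs (not just across requests), Playwright can export a context's cookies and localStorage with storage_state, and scrapy-playwright lets you feed that file back in through PLAYWRIGHT_CONTEXTS. A sketch, assuming auth.json was written by a previous logged-in run (for example via await page.context.storage_state(path='auth.json') right after login):

# settings.py -- sketch: reuse a saved login across runs
PLAYWRIGHT_CONTEXTS = {
    'persistent_context': {
        # auth.json is assumed to have been exported by an earlier run
        'storage_state': 'auth.json',
    }
}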
Pattern 10: Parallel Browser Contexts
Run multiple browser contexts for speed.
Concurrent Contexts
# settings.py
PLAYWRIGHT_MAX_CONTEXTS = 5 # Run 5 browser contexts in parallel
def start_requests(self):
    urls = [f'https://example.com/page{i}' for i in range(1, 101)]
    for i, url in enumerate(urls):
        yield scrapy.Request(
            url,
            meta={
                'playwright': True,
                'playwright_include_page': True,
                # Named contexts are created on demand; PLAYWRIGHT_MAX_CONTEXTS
                # caps how many are open at the same time
                'playwright_context': f'context-{i % 5}'
            },
            callback=self.parse
        )

async def parse(self, response):
    page = response.meta['playwright_page']

    # Requests are spread across contexts, up to PLAYWRIGHT_MAX_CONTEXTS in parallel
    await page.wait_for_selector('.product')
    content = await page.content()
    await page.close()

    final_response = HtmlResponse(url=response.url, body=content.encode())
    for product in final_response.css('.product'):
        yield {'name': product.css('h2::text').get()}
Pattern 11: Custom User Agents and Headers
Set custom headers for each Playwright request.
Custom Headers
async def parse(self, response):
    page = response.meta['playwright_page']

    # Set extra headers
    await page.set_extra_http_headers({
        'User-Agent': 'MyCustomBot/1.0',
        'Accept-Language': 'en-US,en;q=0.9',
        'Referer': 'https://google.com'
    })

    # Now navigate
    await page.goto('https://example.com')

    content = await page.content()
    await page.close()
    # Parse...
Better approach (set in settings):
# settings.py
PLAYWRIGHT_LAUNCH_OPTIONS = {
    'headless': True
}

PLAYWRIGHT_CONTEXTS = {
    'default': {
        'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)...',
        'viewport': {'width': 1920, 'height': 1080},
        'locale': 'en-US'
    }
}
Pattern 12: Mobile Device Emulation
Scrape mobile versions of sites.
Mobile Emulation
# settings.py
PLAYWRIGHT_CONTEXTS = {
    'default': {
        'viewport': {'width': 375, 'height': 667},  # iPhone SE size
        'user_agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X)...',
        'is_mobile': True,
        'has_touch': True
    }
}
Or per-request:
async def parse(self, response):
    page = response.meta['playwright_page']

    # Emulate an iPhone-sized viewport
    await page.set_viewport_size({'width': 375, 'height': 667})
    await page.goto('https://example.com')

    # Mobile version loaded
    content = await page.content()
    await page.close()
Pattern 13: Screenshot on Error for Debugging
Automatically capture screenshots when things go wrong.
Error Screenshot Handler
import time

async def parse(self, response):
    page = response.meta['playwright_page']

    try:
        await page.wait_for_selector('.product', timeout=10000)
        content = await page.content()

        # Parse content
        final_response = HtmlResponse(url=response.url, body=content.encode())
        for product in final_response.css('.product'):
            yield {'name': product.css('h2::text').get()}
    except Exception as e:
        # An error occurred, take a screenshot
        screenshot_path = f'errors/error_{response.url.split("/")[-1]}_{int(time.time())}.png'
        await page.screenshot(path=screenshot_path, full_page=True)
        self.logger.error(f'Error on {response.url}: {e}')
        self.logger.error(f'Screenshot saved: {screenshot_path}')
    finally:
        await page.close()
Pattern 14: Extracting Data Before Full Page Load
Sometimes you don't need the full page. Extract data as soon as it appears.
Early Extraction
async def parse(self, response):
    page = response.meta['playwright_page']

    # Don't wait for the full page load
    await page.goto('https://example.com', wait_until='domcontentloaded')
    # Or even earlier:
    # await page.goto('https://example.com', wait_until='commit')

    # Extract a specific element as soon as it appears
    await page.wait_for_selector('.main-content')
    # No need to wait for images, ads, etc.

    content = await page.content()
    await page.close()

    # Parse
    final_response = HtmlResponse(url=response.url, body=content.encode())
    for item in final_response.css('.item'):
        yield {'data': item.css('.data::text').get()}
Speed improvement: Can be 2-5x faster for heavy pages!
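A complementary trick in the same spirit: instead of merely not waiting for images and ads, you can stop the browser from fetching them at all. scrapy-playwright exposes a PLAYWRIGHT_ABORT_REQUEST hook for this; a minimal sketch (which resource types to block is your call):

# settings.py -- block heavy resources so pages finish loading faster
def should_abort_request(request):
    # Skip images, media, and fonts; HTML and XHR/API responses still load
    return request.resource_type in ('image', 'media', 'font')

PLAYWRIGHT_ABORT_REQUEST = should_abort_request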
Pattern 15: Handling Dynamic URLs (SPA Routing)
Single Page Apps change URL without page reload.
SPA Navigation
async def parse(self, response):
    page = response.meta['playwright_page']

    # Initial page
    await page.goto('https://example.com')

    # Click through different routes (no page reload)
    routes = ['#/products', '#/about', '#/contact']

    for route in routes:
        # Click a link or change the URL fragment directly
        await page.evaluate(f'window.location.hash = "{route}"')

        # Wait for content to change
        await page.wait_for_load_state('networkidle')

        # Extract data for this route
        content = await page.content()
        route_response = HtmlResponse(
            url=f'https://example.com{route}',
            body=content.encode()
        )

        # Parse route-specific content
        for item in route_response.css('.route-content'):
            yield {
                'route': route,
                'data': item.css('.data::text').get()
            }

    await page.close()
Complete Real-World Example: Amazon-Like Site
Putting it all together:
import scrapy
from scrapy.http import HtmlResponse
import time


class AdvancedEcommerceSpider(scrapy.Spider):
    name = 'advanced_ecommerce'

    custom_settings = {
        'DOWNLOAD_HANDLERS': {
            'http': 'scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler',
            'https': 'scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler',
        },
        # scrapy-playwright requires the asyncio reactor
        'TWISTED_REACTOR': 'twisted.internet.asyncioreactor.AsyncioSelectorReactor',
        'PLAYWRIGHT_MAX_CONTEXTS': 3,
        'CONCURRENT_REQUESTS': 3
    }

    def start_requests(self):
        yield scrapy.Request(
            'https://example.com/electronics',
            meta={
                'playwright': True,
                'playwright_include_page': True
            },
            callback=self.parse
        )

    async def parse(self, response):
        page = response.meta['playwright_page']
        content = None

        try:
            # Close popups
            await self.close_popups(page)

            # Apply filters
            await self.apply_filters(page)

            # Load all products
            await self.load_all_products(page)

            # Extract product data
            content = await page.content()
        except Exception as e:
            self.logger.error(f'Error: {e}')
            await page.screenshot(path=f'error_{int(time.time())}.png')
        finally:
            await page.close()

        # Nothing to parse if the page failed before we captured its content
        if content is None:
            return

        # Parse
        final_response = HtmlResponse(url=response.url, body=content.encode())
        for product in final_response.css('.product'):
            yield {
                'name': product.css('h2::text').get(),
                'price': product.css('.price::text').get(),
                'rating': product.css('.rating::text').get(),
                'reviews': product.css('.review-count::text').get()
            }

    async def close_popups(self, page):
        """Close any popups that might appear"""
        close_selectors = [
            '.popup-close',
            '.cookie-accept',
            '[aria-label="Close"]'
        ]
        for selector in close_selectors:
            try:
                if await page.locator(selector).is_visible(timeout=2000):
                    await page.click(selector)
                    await page.wait_for_timeout(500)
            except Exception:
                pass

    async def apply_filters(self, page):
        """Apply product filters"""
        # Price filter
        try:
            await page.click('input#price-under-100')
            await page.wait_for_load_state('networkidle')
        except Exception:
            pass

        # Brand filter
        try:
            await page.click('input#brand-samsung')
            await page.wait_for_load_state('networkidle')
        except Exception:
            pass

    async def load_all_products(self, page):
        """Load all products using infinite scroll"""
        previous_count = 0
        no_change_count = 0

        while no_change_count < 3:
            current_count = await page.locator('.product').count()

            if current_count == previous_count:
                no_change_count += 1
            else:
                no_change_count = 0
                previous_count = current_count

            await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')

            try:
                await page.wait_for_function(
                    f'document.querySelectorAll(".product").length > {current_count}',
                    timeout=5000
                )
            except Exception:
                pass

        self.logger.info(f'Loaded {previous_count} total products')
Summary
Advanced patterns covered:
- Smart infinite scroll detection
- Load more button handling
- Multi-step forms
- Popup management
- File downloads
- Shadow DOM access
- Canvas/chart data
- Network monitoring
- Session persistence
- Parallel contexts
- Custom headers
- Mobile emulation
- Error screenshots
- Early extraction
- SPA routing
Key takeaways:
- Don't use arbitrary timeouts
- Detect state changes programmatically
- Handle errors gracefully with screenshots
- Monitor network for hidden APIs
- Use contexts for session management
Remember:
- Always close pages in finally blocks
- Use wait_for_function for dynamic conditions
- Take screenshots when debugging
- Monitor item counts, not time
- Network monitoring often beats HTML parsing
These patterns handle 95% of complex scraping scenarios!
Happy scraping! 🕷️