Valentina Skakun for HasData


Automating SEO with Python and Scraping API

Software-as-a-Service subscriptions for SEO can hit hundreds of dollars every month. Most of these platforms just wrap search data in a basic UI. You can build the same logic with Python and a scraping API. This approach gives you total control over the data: you skip the marketing and get straight to the insights.


Code Snippets and Resources

The full scripts and detailed data outputs would make this post too long to read. I am sharing the core logic and working snippets of the four main scripts here to show the engineering behind the tools. If you want all the scripts and deep dives into the outputs, you can find the full guide on the HasData Blog.


Build Instead of Buy

Search engines require JavaScript rendering and complex proxy rotation, so writing a DIY scraper is a maintenance nightmare. A better way is to use a dedicated scraping API that handles the browser headers, proxies, and CAPTCHAs for you. This lets you focus on the data logic. You get structured JSON instead of raw HTML, and you pay for the data you use rather than a flat monthly fee for features you might not even need.
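To make that concrete, here is a minimal sketch of a single call to the SERP endpoint used throughout this post. The key and keyword are placeholders, and the field names are assumptions drawn from the scripts below rather than a full API reference.

import requests

API_KEY = "HASDATA_API_KEY"  # placeholder key

# One request returns structured JSON instead of raw HTML.
response = requests.get(
    "https://api.hasdata.com/scrape/google/serp",
    params={"q": "instant coffee", "gl": "us", "hl": "en", "deviceType": "desktop"},
    headers={"x-api-key": API_KEY},
    timeout=30,
)
response.raise_for_status()
data = response.json()

# 'organicResults' is the same field the scripts below rely on.
for result in data.get("organicResults", []):
    print(result.get("link"))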

Recursive Keyword Discovery

Most keyword tools rely on stale databases. You can capture real-time intent by scraping Google Autosuggest recursively. The logic uses a thread pool to append characters to a seed keyword, which forces Autosuggest to reveal long-tail queries.

import requests
import xml.etree.ElementTree as ET
import string
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import quote
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# --- CONFIGURATION ---
API_KEY = "HASDATA_API_KEY"
BASE_KEYWORD = "coffee"
MAX_DEPTH = 2  # Depth of recursion (e.g., 1 = 'coffee a', 2 = 'coffee ab')

# Set this according to your HasData plan's concurrency limit.
# For paid plans, this is 15–1500. For the trial plan, keep it at 1.
MAX_WORKERS = 1 

# --- SESSION SETUP (PERFORMANCE OPTIMIZATION) ---
# We use a global Session object to enable TCP connection reuse (Keep-Alive).
# This significantly reduces the overhead of establishing new SSL handshakes for every request.
session = requests.Session()

# Configure the HTTP Adapter with a connection pool.
# pool_maxsize must be greater than or equal to MAX_WORKERS to prevent blocking.
adapter = HTTPAdapter(
    pool_connections=1, 
    pool_maxsize=MAX_WORKERS + 5,  # Adding a small buffer
    max_retries=Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
)

# Mount the adapter to both HTTP and HTTPS protocols
session.mount("https://", adapter)
session.mount("http://", adapter)

def fetch_suggestions(query):
    """
    Sends a request to Google Suggest via HasData API using a rotating proxy.
    """
    # 1. URL Encode the query to handle spaces and special characters correctly
    # 'coffee ad' becomes 'coffee%20ad'
    encoded_query = quote(query)
    target_url = f"https://suggestqueries.google.com/complete/search?output=toolbar&hl=en&q={encoded_query}"

    # HasData requires the API key in the headers
    headers = {
        "x-api-key": API_KEY,
        "Content-Type": "application/json"
    }

    # payload configuration
    payload = {
        "url": target_url,
        "jsRendering": False,      # JS is not needed for the XML endpoint, saves credits/time
        "outputFormat": ["html"]   # HasData will return the raw response body
    }

    try:
        # IMPORTANT: Use session.post() instead of requests.post() to leverage the connection pool
        response = session.post(
            "https://api.hasdata.com/scrape/web",
            headers=headers,
            json=payload,
            timeout=30
        )

        if response.status_code != 200:
            # Log errors for debugging
            print(f"Error {response.status_code} for '{query}': {response.text}")
            return []

        # 2. Parse the Response
        # Google Suggest returns XML. HasData passes this raw content in the response body.
        xml_content = response.content
        root = ET.fromstring(xml_content)

        suggestions = []
        for child in root:
            if len(child) > 0 and 'data' in child[0].attrib:
                suggestions.append(child[0].attrib['data'])

        return suggestions

    except ET.ParseError:
        return [] # Ignore parsing errors (e.g., malformed XML)
    except Exception as e:
        print(f"Exception for '{query}': {e}")
        return []

def generate_search_terms_suffix(base, current_suffix, depth, max_depth):
    """
    Recursively generates search terms by appending characters to a suffix.
    Example: 
    Depth 1: coffee a ... coffee z
    Depth 2: coffee aa ... coffee zz
    """
    terms = []
    for char in string.ascii_lowercase:
        new_suffix = current_suffix + char
        term = f"{base} {new_suffix}"
        terms.append(term)

        # Recursive call to go deeper if max_depth is not reached
        if depth < max_depth:
            terms.extend(generate_search_terms_suffix(base, new_suffix, depth + 1, max_depth))

    return terms

def run_harvest():
    print(f"Generating queries for Depth {MAX_DEPTH}...")
    queries = generate_search_terms_suffix(BASE_KEYWORD, "", 1, MAX_DEPTH)
    print(f"Total queries to process: {len(queries)}")

    results = set()
    start_time_global = time.time()

    # ThreadPoolExecutor manages concurrent execution
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_query = {executor.submit(fetch_suggestions, q): q for q in queries}

        count = 0
        total = len(queries)

        print(f"\nStarting harvest with {MAX_WORKERS} workers...")

        for future in as_completed(future_to_query):
            query = future_to_query[future]
            count += 1

            # Progress logging every 20 requests to monitor speed
            if count % 20 == 0:
                elapsed = time.time() - start_time_global
                rps = count / elapsed if elapsed > 0 else 0
                print(f"Progress: {count}/{total} | Speed: {rps:.2f} req/s")

            try:
                suggestions = future.result()
                if suggestions:
                    for s in suggestions:
                        results.add(s)
            except Exception:
                pass # Skip errors during mass scraping

    total_time = time.time() - start_time_global
    print(f"\nFinished in {total_time:.2f} seconds. Average Speed: {len(queries)/total_time:.2f} req/s")
    return results

if __name__ == "__main__":
    unique_keywords = run_harvest()

    filename = "long_tail_keywords_hasdata.csv"

    # Save results to CSV
    with open(filename, "w", encoding="utf-8") as f:
        f.write("Keyword\n")
        f.write("\n".join(unique_keywords))

    print(f"Done. Collected {len(unique_keywords)} unique keywords.")
    print(f"Saved to {filename}")

This method collects thousands of unique keywords in seconds. You are getting the data directly from the source.
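If you want to feed this output straight into the next step, a small sketch like the one below could work. The filename matches the CSV written above, while the modifier filter is purely an illustrative assumption.

import pandas as pd

# Load the keywords harvested by the script above.
keywords = pd.read_csv("long_tail_keywords_hasdata.csv")

# Illustrative assumption: keep queries with common intent modifiers
# to shortlist candidates for the intent-mapping script in the next section.
pattern = r"\b(?:how|why|best|vs|without)\b"
candidates = keywords[keywords["Keyword"].str.contains(pattern, case=False, na=False)]

print(f"{len(candidates)} candidate queries selected for intent mapping")
print(candidates.head(10).to_string(index=False))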

Semantic Intent Mapping

Search intent tells you whether a user wants to buy something or just learn. You can automate this check by analyzing the top ten URLs for any query. Scanning each URL path for segments like /shop/ or /blog/ gives you a breakdown of the SERP composition.

import requests
import pandas as pd
from urllib.parse import urlparse

# Configuration
API_KEY = "HASDATA_API_KEY"
KEYWORD = "instant coffee"

def get_serp_links(query):
    url = "https://api.hasdata.com/scrape/google/serp"
    params = {
        "q": query,
        "gl": "us",
        "hl": "en",
        "deviceType": "desktop"
    }
    headers = {"x-api-key": API_KEY}

    try:
        response = requests.get(url, params=params, headers=headers, timeout=20)
        if response.status_code == 200:
            data = response.json()
            # Extract organic links
            return [result['link'] for result in data.get('organicResults', []) if 'link' in result]
        return []
    except Exception as e:
        print(f"Error: {e}")
        return []

def classify_url(url):
    """
    Classifies a URL based on domain reputation, path keywords, and structure.
    """
    try:
        parsed = urlparse(url)
        domain = parsed.netloc.lower()
        path = parsed.path.lower()

        # 1. Specialized Platforms (Domain-based)
        if "wikipedia.org" in domain:
            return "Encyclopedic (Wikipedia)"

        if "youtube.com" in domain or "youtu.be" in domain:
            return "Video Content (YouTube)"

        if any(x in domain for x in ['reddit.com', 'quora.com', 'stackoverflow.com']):
            return "UGC (Forum/Discussion)"

        # 2. General Forum Detection (Path-based)
        if any(x in path for x in ['/forum', '/threads', '/community', '/board']):
            return "UGC (Forum/Discussion)"


        # 3. Homepage Detection
        if path == "" or path == "/":
            return "Homepage (Brand)"

        # 4. Transactional Keywords
        if any(x in path for x in ['/product', '/shop', '/item', '/collections', '/buy', '/pricing', '/store']) or "amazon.com" in domain:
            return "Transactional (Product)"

        # 5. Informational Keywords
        if any(x in path for x in ['/blog', '/guide', '/news', '/article', '/how-to', '/tips', '/wiki']):
            return "Informational (Blog)"

        # 6. Fallback (General Page)
        return "General Page"
    except Exception:
        return "Unknown"

if __name__ == "__main__":
    print(f"Analyzing SERP for: '{KEYWORD}'...\n")
    links = get_serp_links(KEYWORD)

    if links:
        # Create a DataFrame for clean visualization
        df = pd.DataFrame(links, columns=['URL'])
        df['Type'] = df['URL'].apply(classify_url)

        # Calculate percentages
        breakdown = df['Type'].value_counts(normalize=True) * 100

        # Display the Data
        print("--- SERP Composition ---")
        print(df[['Type', 'URL']].to_string(index=True))

        print("\n--- Strategic Verdict ---")
        # Handle cases where multiple types might have the same top percentage
        if not breakdown.empty:
            dominant_type = breakdown.idxmax()
            percent = breakdown.max()

            print(f"Dominant Type: {dominant_type} ({percent:.1f}%)")

            if "Informational" in dominant_type:
                print("Action: Create a long-form Guide or Blog Post.")
            elif "Transactional" in dominant_type:
                print("Action: Create a Product Page or Collection Page.")
            elif "Homepage" in dominant_type:
                print("Action: High difficulty. Requires strong Brand Authority.")
            elif "Video" in dominant_type:
                print("Action: Text alone won't rank. Produce a high-quality Video.")
            elif "UGC" in dominant_type:
                print("Action: Engage in community discussions or create 'Real Review' style content.")
            elif "Encyclopedic" in dominant_type:
                print("Action: Definitional intent. Very hard to outrank Wikipedia directly.")
            else:
                print("Action: Mixed SERP. Manual review recommended.")
        else:
            print("Action: No valid classification data available.")
    else:
        print("No results found.")

If sixty percent of the results are product pages, you know you need a landing page. This removes the guesswork from content strategy.

SERP Similarity Heatmaps

Targeting the same topic with two pages causes keyword cannibalization. You can detect it by comparing the Jaccard Index of their search results: the number of shared URLs divided by the total number of unique URLs across both sets. If two keywords share many of the same URLs, they belong on the same page.

import requests
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from collections import Counter
import time

# --- CONFIGURATION ---
API_KEY = "HASDATA_API_KEY"
KEYWORDS = [
    "health benefits of coffee",
    "benefits of coffee for men",
    "benefits of coffee for women",
    "health benefits of black coffee",
    "health benefits of mushroom coffee",
    "health benefits of decaf coffee",
]
# ---------------------

def get_top_urls_set(query):
    """
    Requests the API and returns a SET of organic URLs.
    Using a set is convenient for mathematical intersection operations.
    """
    url = "https://api.hasdata.com/scrape/google/serp"
    params = {
        "q": query,
        "gl": "us",
        "hl": "en",
        "deviceType": "desktop"
      }
    headers = {"x-api-key": API_KEY}

    print(f"Scanning SERP for: '{query}'...")
    try:
        response = requests.get(url, params=params, headers=headers, timeout=25)
        if response.status_code == 200:
            data = response.json()
            organic = data.get('organicResults', [])
            # Extract links only. Taking the first 10 if more are returned.
            links = [result['link'] for result in organic[:10] if 'link' in result]
            return set(links)
        else:
            print(f"Error {response.status_code} for query '{query}'")
            return set()
    except Exception as e:
        print(f"Exception for query '{query}': {e}")
        return set()

def calculate_jaccard(set1, set2):
    """Calculates Jaccard Index between two sets."""
    intersection = len(set1.intersection(set2))
    union = len(set1.union(set2))
    if union == 0:
        return 0.0
    return intersection / union

def visualize_similarity_heatmap(similarity_df):
    """Builds and displays the heatmap."""
    plt.figure(figsize=(10, 8))
    sns.set_theme(context='notebook', style='whitegrid', font_scale=1.1)

    # Create heatmap
    # annot=True shows numbers in cells
    # fmt=".0%" formats numbers as percentages
    # cmap="YlGnBu" - color scheme (Yellow to Blue)
    ax = sns.heatmap(
        similarity_df, 
        annot=True, 
        fmt=".0%", 
        cmap="YlGnBu", 
        vmin=0, 
        vmax=1,
        cbar_kws={'label': 'Jaccard Similarity Score'}
    )

    plt.title(f"SERP Similarity Heatmap", fontsize=16, pad=20)
    plt.xticks(rotation=45, ha='right')
    plt.yticks(rotation=0)
    plt.tight_layout()
    print("Visualizing heatmap...")
    plt.show()

def show_top_common_urls(results_data):
    """Counts and prints the most frequently occurring URLs."""
    all_urls_flat = []
    # Collect all URLs from all sets into one list
    for url_set in results_data.values():
        all_urls_flat.extend(list(url_set))

    # Count frequency
    url_counts = Counter(all_urls_flat).most_common(15)

    print(f"\n### Top Recurring URLs (across {len(KEYWORDS)} queries)")
    print(f"{'Frequency':<10} | URL")
    print("-" * 80)
    for url, count in url_counts:
        # Show only if it appears more than once, unless we have very few queries
        if count > 1 or len(KEYWORDS) <= 2: 
             print(f"{count:<10} | {url}")      

# --- MAIN LOGIC ---
if __name__ == "__main__":
    # 1. Data Collection
    results_data = {}
    print("--- Start SERP Collection ---\n")
    for keyword in KEYWORDS:
        results_data[keyword] = get_top_urls_set(keyword)
        # Short pause between requests
        time.sleep(1) 
    print("\n--- Collection Finished ---")

    # 2. Calculation of Similarity Matrix (Jaccard)
    n = len(KEYWORDS)
    similarity_matrix = pd.DataFrame(
        index=KEYWORDS, 
        columns=KEYWORDS, 
        dtype=float
    )

    # Double loop to compare every query against every other query
    for i in range(n):
        for j in range(n):
            kw1 = KEYWORDS[i]
            kw2 = KEYWORDS[j]
            score = calculate_jaccard(results_data[kw1], results_data[kw2])
            similarity_matrix.iloc[i, j] = score

    # 3. Output Results to Console (Text Table)
    print("\n### Similarity Matrix (Jaccard Index)")
    pd.set_option('display.max_columns', None)
    pd.set_option('display.width', 1000)
    # Format output as percentages
    print(similarity_matrix.style.format("{:.1%}").to_string())
    print("\n" + "="*50 + "\n")

    # 4. Common URL Analysis
    show_top_common_urls(results_data)

    # 5. Visualization (Heatmap)
    visualize_similarity_heatmap(similarity_matrix)

Using Seaborn to plot these scores as a heatmap makes the clusters obvious.

Similarity Heatmap

You see exactly which topics need their own articles and which should be merged.
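One way to act on the matrix is to flag keyword pairs above a similarity threshold as merge candidates. The sketch below reuses the similarity_matrix DataFrame from the script above; the 0.4 cutoff is an assumption to tune for your own niche, not a rule from this workflow.

# Minimal sketch: list keyword pairs that likely belong on the same page.
# Assumes the `similarity_matrix` DataFrame from the script above is in scope.
# The 0.4 threshold is an illustrative assumption.
MERGE_THRESHOLD = 0.4

merge_candidates = []
for i, kw1 in enumerate(similarity_matrix.index):
    for kw2 in similarity_matrix.columns[i + 1:]:
        score = similarity_matrix.loc[kw1, kw2]
        if score >= MERGE_THRESHOLD:
            merge_candidates.append((kw1, kw2, score))

for kw1, kw2, score in sorted(merge_candidates, key=lambda pair: -pair[2]):
    print(f"{score:.0%} overlap: '{kw1}' + '{kw2}' -> consider one page")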

AI Overview Visibility Tracking

Standard rank trackers often miss AI Overviews, the summaries that push organic results down the page. You can monitor them by parsing the aiOverview object in your API response. The script below checks whether your domain appears in the AI citations.

import requests
import pandas as pd
import time
from urllib.parse import urlparse

# --- CONFIGURATION ---
API_KEY = "HASDATA_API_KEY"
TARGET_DOMAIN = "webmd.com" # The domain we are monitoring for citations

KEYWORDS = [
    "health benefits of coffee",
    "benefits of coffee for men",
    "benefits of coffee for women",
    "health benefits of black coffee",
    "health benefits of mushroom coffee",
    "health benefits of decaf coffee"
]

def normalize_domain(url):
    """
    Extracts the base domain from a URL to ensure accurate matching.
    e.g., 'https://www.sub.example.com/page' -> 'sub.example.com'
    """
    try:
        parsed = urlparse(url)
        # Returns netloc (e.g., www.example.com). We strip 'www.' for broader matching.
        return parsed.netloc.lower().replace('www.', '')
    except Exception:
        return ""

def fetch_serp_data(query):
    """
    Executes a SERP API request targeting Google's AI Overview.
    Note: AI Overviews are volatile; successful triggering depends on
    location (US) and device type (Desktop/Mobile).
    """
    endpoint = "https://api.hasdata.com/scrape/google/serp"
    params = {
        "q": query,
        "gl": "us",       # Geo-location: US is critical for SGE consistency
        "hl": "en",       # Language: English
        "deviceType": "desktop"
    }
    headers = {"x-api-key": API_KEY}

    try:
        response = requests.get(endpoint, params=params, headers=headers, timeout=30)
        if response.status_code == 200:
            return response.json()
        else:
            print(f"[API Error] Status: {response.status_code} for query '{query}'")
            return None
    except Exception as e:
        print(f"[Network Error] {e}")
        return None

def extract_ai_citations(serp_data):
    """
    Parses the JSON response to locate the 'aiOverview' block.
    Extracts structured references (citations) if present.
    """
    # HasData typically returns SGE data in the 'aiOverview' key
    ai_overview = serp_data.get('aiOverview')

    if not ai_overview:
        return None, [] # AI Overview not triggered for this query

    # Extract the 'references' array which contains the source links
    references = ai_overview.get('references', [])

    # Map into a cleaner format for analysis
    citations = []
    for ref in references:
        citations.append({
            'index': ref.get('index'),
            'title': ref.get('title'),
            'url': ref.get('link'),
            'source_name': ref.get('source')
        })

    return ai_overview, citations

def run_monitor():
    results = []
    print(f"--- Starting AI Overview Monitor for: {TARGET_DOMAIN} ---")
    print(f"Processing {len(KEYWORDS)} keywords...\n")

    for query in KEYWORDS:
        print(f"Analyzing: '{query}'...")

        data = fetch_serp_data(query)

        if not data:
            continue

        # 1. Check for AI Overview existence
        ai_block, citations = extract_ai_citations(data)

        is_triggered = ai_block is not None
        is_cited = False
        citation_rank = None
        found_url = None

        # 2. Analyze Citations if AI Overview exists
        if is_triggered:
            target_clean = normalize_domain(f"https://{TARGET_DOMAIN}")

            for cit in citations:
                cited_domain = normalize_domain(cit['url'])

                # Check for substring match (handles subdomains and exact matches)
                if target_clean in cited_domain:
                    is_cited = True
                    citation_rank = cit['index'] # 0-based index in the citation carousel
                    found_url = cit['url']
                    break # Stop after finding the first occurrence

        # 3. Aggregate Data
        results.append({
            "Keyword": query,
            "AI Triggered": is_triggered,
            "Is Cited": is_cited,
            "Citation Index": citation_rank if is_cited else "-",
            "Cited URL": found_url if is_cited else "-"
        })

        # Respect API rate limits
        time.sleep(1)

    # --- REPORTING ---
    df = pd.DataFrame(results)

    print("\n" + "="*60)
    print("AI OVERVIEW VISIBILITY REPORT")
    print("="*60)

    # formatting booleans for readability
    df['AI Triggered'] = df['AI Triggered'].map({True: '✅ Yes', False: '❌ No'})
    df['Is Cited'] = df['Is Cited'].map({True: '✅ YES', False: '❌ No'})

    # Handle display if dataframe is empty
    if not df.empty:
        # Use to_markdown if available, otherwise string
        try:
            print(df.to_markdown(index=False))
        except ImportError:
            print(df.to_string(index=False))

        # Summary Metrics
        total = len(df)
        triggered = len(df[df['AI Triggered'] == '✅ Yes'])
        cited = len(df[df['Is Cited'] == '✅ YES'])

        print("\n--- SUMMARY METRICS ---")
        print(f"AI Coverage: {triggered}/{total} keywords ({(triggered/total)*100:.1f}%)")
        if triggered > 0:
            print(f"Share of Voice: {cited}/{triggered} AI Overviews ({(cited/triggered)*100:.1f}%)")
        else:
            print("Share of Voice: N/A (No AI Overviews generated)")

    else:
        print("No data collected.")

if __name__ == "__main__":
    run_monitor()

Tracking this allows you to see if you are losing traffic to AI even when your organic rankings stay high.
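To see the trend rather than a single snapshot, you could rerun the monitor on a schedule and keep a dated log. This is a rough sketch under the assumption that run_monitor() is adjusted to return the results DataFrame instead of only printing the report.

import os
from datetime import date

# Assumption: run_monitor() has been modified to end with `return df`.
df = run_monitor()
df["Date"] = date.today().isoformat()

# Append today's snapshot to a rolling history file for trend analysis.
history_file = "ai_overview_history.csv"
df.to_csv(
    history_file,
    mode="a",
    header=not os.path.exists(history_file),
    index=False,
)
print(f"Appended {len(df)} rows to {history_file}")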

Conclusion

The repository at HasData Python for SEO contains the full implementation of these features.
