SEO software subscriptions can hit hundreds of dollars every month, and most of these platforms just wrap search data in a basic UI. You can rebuild the same logic with Python and a scraping API. This approach gives you full control over the data: you skip the marketing layer and get straight to the insights.
Table of Contents
- Build Instead of Buy
- Recursive Keyword Discovery
- Semantic Intent Mapping
- SERP Similarity Heatmaps
- AI Overview Visibility Tracking
- Conclusion
Code Snippets and Resources
Full scripts and detailed data outputs would make this post too long to read, so I am sharing the core logic and functional snippets of the four main scripts to show the engineering behind the tools. If you want the complete scripts and deep dives into the outputs, you can find the full guide on the HasData Blog.
Build Instead of Buy
Scraping search engines requires JavaScript rendering and complex proxy rotation, which makes a DIY scraper a maintenance nightmare. A better way is to use a dedicated scraping API that handles the browser headers, proxies, and CAPTCHAs for you. This lets you focus on the data logic: you get structured JSON instead of raw HTML, and you pay for the data you use rather than a flat monthly fee for features you might never need.
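The workflow behind every tool in this post is the same: send the target query to the API, get structured JSON back, and run your own analysis on it. Here is a minimal sketch using the same SERP endpoint, headers, and parameters as the full scripts below; swap in your own API key before running.

import requests

API_KEY = "HASDATA_API_KEY"

# One request, structured JSON out -- the core of the build-instead-of-buy approach
response = requests.get(
    "https://api.hasdata.com/scrape/google/serp",
    params={"q": "instant coffee", "gl": "us", "hl": "en", "deviceType": "desktop"},
    headers={"x-api-key": API_KEY},
    timeout=30,
)
response.raise_for_status()

# 'organicResults' is the same key the scripts below rely on
for result in response.json().get("organicResults", []):
    print(result.get("link"))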
Recursive Keyword Discovery
Most keyword tools rely on stale databases. You can capture real-time intent by scraping Google Autosuggest recursively. The logic uses a thread pool to append letters to a seed keyword, which forces the suggest endpoint to reveal long-tail queries. At a depth of 2 with 26 letters, one seed expands to 26 + 26² = 702 queries.
import requests
import xml.etree.ElementTree as ET
import csv
import string
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import quote
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# --- CONFIGURATION ---
API_KEY = "HASDATA_API_KEY"
BASE_KEYWORD = "coffee"
MAX_DEPTH = 2 # Depth of recursion (e.g., 1 = 'coffee a', 2 = 'coffee ab')
# Set this according to your HasData plan's concurrency limit.
# For paid plans, this is 15–1500. For the trial plan, keep it at 1.
MAX_WORKERS = 1
# --- SESSION SETUP (PERFORMANCE OPTIMIZATION) ---
# We use a global Session object to enable TCP connection reuse (Keep-Alive).
# This significantly reduces the overhead of establishing new SSL handshakes for every request.
session = requests.Session()
# Configure the HTTP Adapter with a connection pool.
# pool_maxsize must be greater than or equal to MAX_WORKERS to prevent blocking.
adapter = HTTPAdapter(
pool_connections=1,
pool_maxsize=MAX_WORKERS + 5, # Adding a small buffer
max_retries=Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
)
# Mount the adapter to both HTTP and HTTPS protocols
session.mount("https://", adapter)
session.mount("http://", adapter)
def fetch_suggestions(query):
"""
Sends a request to Google Suggest via HasData API using a rotating proxy.
"""
# 1. URL Encode the query to handle spaces and special characters correctly
# 'coffee ad' becomes 'coffee%20ad'
encoded_query = quote(query)
target_url = f"https://suggestqueries.google.com/complete/search?output=toolbar&hl=en&q={encoded_query}"
# HasData requires the API key in the headers
headers = {
"x-api-key": API_KEY,
"Content-Type": "application/json"
}
# payload configuration
payload = {
"url": target_url,
"jsRendering": False, # JS is not needed for the XML endpoint, saves credits/time
"outputFormat": ["html"] # HasData will return the raw response body
}
try:
# IMPORTANT: Use session.post() instead of requests.post() to leverage the connection pool
response = session.post(
"https://api.hasdata.com/scrape/web",
headers=headers,
json=payload,
timeout=30
)
if response.status_code != 200:
# Log errors for debugging
print(f"Error {response.status_code} for '{query}': {response.text}")
return []
# 2. Parse the Response
# Google Suggest returns XML. HasData passes this raw content in the response body.
xml_content = response.content
root = ET.fromstring(xml_content)
suggestions = []
for child in root:
if len(child) > 0 and 'data' in child[0].attrib:
suggestions.append(child[0].attrib['data'])
return suggestions
except ET.ParseError:
return [] # Ignore parsing errors (e.g., malformed XML)
except Exception as e:
print(f"Exception for '{query}': {e}")
return []
def generate_search_terms_suffix(base, current_suffix, depth, max_depth):
"""
Recursively generates search terms by appending characters to a suffix.
Example:
Depth 1: coffee a ... coffee z
Depth 2: coffee aa ... coffee zz
"""
terms = []
for char in string.ascii_lowercase:
new_suffix = current_suffix + char
term = f"{base} {new_suffix}"
terms.append(term)
# Recursive call to go deeper if max_depth is not reached
if depth < max_depth:
terms.extend(generate_search_terms_suffix(base, new_suffix, depth + 1, max_depth))
return terms
def run_harvest():
print(f"Generating queries for Depth {MAX_DEPTH}...")
queries = generate_search_terms_suffix(BASE_KEYWORD, "", 1, MAX_DEPTH)
print(f"Total queries to process: {len(queries)}")
results = set()
start_time_global = time.time()
# ThreadPoolExecutor manages concurrent execution
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_query = {executor.submit(fetch_suggestions, q): q for q in queries}
count = 0
total = len(queries)
print(f"\nStarting harvest with {MAX_WORKERS} workers...")
for future in as_completed(future_to_query):
query = future_to_query[future]
count += 1
# Progress logging every 20 requests to monitor speed
if count % 20 == 0:
elapsed = time.time() - start_time_global
rps = count / elapsed if elapsed > 0 else 0
print(f"Progress: {count}/{total} | Speed: {rps:.2f} req/s")
try:
suggestions = future.result()
if suggestions:
for s in suggestions:
results.add(s)
except Exception:
pass # Skip errors during mass scraping
total_time = time.time() - start_time_global
print(f"\nFinished in {total_time:.2f} seconds. Average Speed: {len(queries)/total_time:.2f} req/s")
return results
if __name__ == "__main__":
unique_keywords = run_harvest()
filename = "long_tail_keywords_hasdata.csv"
# Save results to CSV
    # Use the csv module so keywords containing commas are quoted correctly
    with open(filename, "w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Keyword"])
        for keyword in sorted(unique_keywords):
            writer.writerow([keyword])
print(f"Done. Collected {len(unique_keywords)} unique keywords.")
print(f"Saved to {filename}")
This method collects thousands of unique keywords in seconds. You are getting the data directly from the source.
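For context on what fetch_suggestions is parsing: the toolbar endpoint returns a small XML document in which each CompleteSuggestion node carries the phrase in a data attribute. Here is a quick offline check of that parsing logic; the sample XML is a hand-written illustration, not captured output.

import xml.etree.ElementTree as ET

# Hand-written sample of the toolbar XML shape that fetch_suggestions() expects
sample_xml = """<toplevel>
  <CompleteSuggestion><suggestion data="coffee arabica"/></CompleteSuggestion>
  <CompleteSuggestion><suggestion data="coffee alternatives"/></CompleteSuggestion>
</toplevel>"""

root = ET.fromstring(sample_xml)
suggestions = [
    child[0].attrib["data"]
    for child in root
    if len(child) > 0 and "data" in child[0].attrib
]
print(suggestions)  # ['coffee arabica', 'coffee alternatives']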
Semantic Intent Mapping
Search intent tells you whether a user wants to buy something or just learn. You can automate the check by analyzing the top ten URLs for any query. Scanning the URL paths for segments like /shop/ or /blog/ gives you a breakdown of the SERP composition.
import requests
import pandas as pd
from urllib.parse import urlparse
# Configuration
API_KEY = "HASDATA_API_KEY"
KEYWORD = "instant coffee"
def get_serp_links(query):
url = "https://api.hasdata.com/scrape/google/serp"
params = {
"q": query,
"gl": "us",
"hl": "en",
"deviceType": "desktop"
}
headers = {"x-api-key": API_KEY}
try:
response = requests.get(url, params=params, headers=headers, timeout=20)
if response.status_code == 200:
data = response.json()
# Extract organic links
return [result['link'] for result in data.get('organicResults', [])]
return []
except Exception as e:
print(f"Error: {e}")
return []
def classify_url(url):
"""
Classifies a URL based on domain reputation, path keywords, and structure.
"""
try:
parsed = urlparse(url)
domain = parsed.netloc.lower()
path = parsed.path.lower()
# 1. Specialized Platforms (Domain-based)
if "wikipedia.org" in domain:
return "Encyclopedic (Wikipedia)"
if "youtube.com" in domain or "youtu.be" in domain:
return "Video Content (YouTube)"
if any(x in domain for x in ['reddit.com', 'quora.com', 'stackoverflow.com']):
return "UGC (Forum/Discussion)"
# 2. General Forum Detection (Path-based)
if any(x in path for x in ['/forum', '/threads', '/community', '/board']):
return "UGC (Forum/Discussion)"
# 3. Homepage Detection
if path == "" or path == "/":
return "Homepage (Brand)"
# 4. Transactional Keywords
if any(x in path for x in ['/product', '/shop', '/item', '/collections', '/buy', '/pricing', '/store']) or "amazon.com" in domain:
return "Transactional (Product)"
# 5. Informational Keywords
if any(x in path for x in ['/blog', '/guide', '/news', '/article', '/how-to', '/tips', '/wiki']):
return "Informational (Blog)"
# 6. Fallback (General Page)
return "General Page"
    except Exception:
        return "Unknown"
if __name__ == "__main__":
print(f"Analyzing SERP for: '{KEYWORD}'...\n")
links = get_serp_links(KEYWORD)
if links:
# Create a DataFrame for clean visualization
df = pd.DataFrame(links, columns=['URL'])
df['Type'] = df['URL'].apply(classify_url)
# Calculate percentages
breakdown = df['Type'].value_counts(normalize=True) * 100
# Display the Data
print("--- SERP Composition ---")
print(df[['Type', 'URL']].to_string(index=True))
print("\n--- Strategic Verdict ---")
# Handle cases where multiple types might have the same top percentage
if not breakdown.empty:
dominant_type = breakdown.idxmax()
percent = breakdown.max()
print(f"Dominant Type: {dominant_type} ({percent:.1f}%)")
if "Informational" in dominant_type:
print("Action: Create a long-form Guide or Blog Post.")
elif "Transactional" in dominant_type:
print("Action: Create a Product Page or Collection Page.")
elif "Homepage" in dominant_type:
print("Action: High difficulty. Requires strong Brand Authority.")
elif "Video" in dominant_type:
print("Action: Text alone won't rank. Produce a high-quality Video.")
elif "UGC" in dominant_type:
print("Action: Engage in community discussions or create 'Real Review' style content.")
elif "Encyclopedic" in dominant_type:
print("Action: Definitional intent. Very hard to outrank Wikipedia directly.")
else:
print("Action: Mixed SERP. Manual review recommended.")
else:
print("Action: No valid classification data available.")
else:
print("No results found.")
If sixty percent of the results are product pages, you know you need a landing page. This removes the guesswork from content strategy.
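You can sanity-check the classify_url logic offline before spending API credits. The URLs below are made up for illustration, and the import assumes you saved the script above as intent_mapping.py (the filename is arbitrary).

# Quick offline check of classify_url() from the script above
from intent_mapping import classify_url  # hypothetical module name for the script above

sample_urls = [
    "https://www.example-roaster.com/",                           # expect: Homepage (Brand)
    "https://www.example-roaster.com/shop/instant-coffee",        # expect: Transactional (Product)
    "https://www.example-blog.com/blog/what-is-instant-coffee",   # expect: Informational (Blog)
    "https://www.reddit.com/r/Coffee/comments/abc123/",           # expect: UGC (Forum/Discussion)
]

for url in sample_urls:
    print(f"{classify_url(url):<26} {url}")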
SERP Similarity Heatmaps
Targeting the same topic with two pages causes keyword cannibalization. You can detect it by comparing the Jaccard index of the search results: the number of URLs two SERPs share divided by the total number of distinct URLs across both. If two keywords share many of the same URLs, they belong on the same page. For example, two ten-result SERPs that share seven URLs score 7 / 13 ≈ 0.54.
import requests
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from collections import Counter
import time
# --- CONFIGURATION ---
API_KEY = "HASDATA_API_KEY"
KEYWORDS = [
"health benefits of coffee",
"benefits of coffee for men",
"benefits of coffee for women",
"health benefits of black coffee",
"health benefits of mushroom coffee",
"health benefits of decaf coffee",
]
# ---------------------
def get_top_urls_set(query):
"""
Requests the API and returns a SET of organic URLs.
Using a set is convenient for mathematical intersection operations.
"""
url = "https://api.hasdata.com/scrape/google/serp"
params = {
"q": query,
"gl": "us",
"hl": "en",
"deviceType": "desktop"
}
headers = {"x-api-key": API_KEY}
print(f"Scanning SERP for: '{query}'...")
try:
response = requests.get(url, params=params, headers=headers, timeout=25)
if response.status_code == 200:
data = response.json()
organic = data.get('organicResults', [])
# Extract links only. Taking the first 10 if more are returned.
links = [result['link'] for result in organic[:10] if 'link' in result]
return set(links)
else:
print(f"Error {response.status_code} for query '{query}'")
return set()
except Exception as e:
print(f"Exception for query '{query}': {e}")
return set()
def calculate_jaccard(set1, set2):
"""Calculates Jaccard Index between two sets."""
intersection = len(set1.intersection(set2))
union = len(set1.union(set2))
if union == 0:
return 0.0
return intersection / union
def visualize_similarity_heatmap(similarity_df):
"""Builds and displays the heatmap."""
plt.figure(figsize=(10, 8))
sns.set_theme(context='notebook', style='whitegrid', font_scale=1.1)
# Create heatmap
# annot=True shows numbers in cells
# fmt=".0%" formats numbers as percentages
# cmap="YlGnBu" - color scheme (Yellow to Blue)
ax = sns.heatmap(
similarity_df,
annot=True,
fmt=".0%",
cmap="YlGnBu",
vmin=0,
vmax=1,
cbar_kws={'label': 'Jaccard Similarity Score'}
)
    plt.title("SERP Similarity Heatmap", fontsize=16, pad=20)
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)
plt.tight_layout()
print("Visualizing heatmap...")
plt.show()
def show_top_common_urls(results_data):
"""Counts and prints the most frequently occurring URLs."""
all_urls_flat = []
# Collect all URLs from all sets into one list
for url_set in results_data.values():
all_urls_flat.extend(list(url_set))
# Count frequency
url_counts = Counter(all_urls_flat).most_common(15)
print(f"\n### Top Recurring URLs (across {len(KEYWORDS)} queries)")
print(f"{'Frequency':<10} | URL")
print("-" * 80)
for url, count in url_counts:
# Show only if it appears more than once, unless we have very few queries
if count > 1 or len(KEYWORDS) <= 2:
print(f"{count:<10} | {url}")
# --- MAIN LOGIC ---
if __name__ == "__main__":
# 1. Data Collection
results_data = {}
print("--- Start SERP Collection ---\n")
for keyword in KEYWORDS:
results_data[keyword] = get_top_urls_set(keyword)
# Short pause between requests
time.sleep(1)
print("\n--- Collection Finished ---")
# 2. Calculation of Similarity Matrix (Jaccard)
n = len(KEYWORDS)
similarity_matrix = pd.DataFrame(
index=KEYWORDS,
columns=KEYWORDS,
dtype=float
)
# Double loop to compare every query against every other query
for i in range(n):
for j in range(n):
kw1 = KEYWORDS[i]
kw2 = KEYWORDS[j]
score = calculate_jaccard(results_data[kw1], results_data[kw2])
similarity_matrix.iloc[i, j] = score
# 3. Output Results to Console (Text Table)
print("\n### Similarity Matrix (Jaccard Index)")
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)
# Format output as percentages
print(similarity_matrix.style.format("{:.1%}").to_string())
print("\n" + "="*50 + "\n")
# 4. Common URL Analysis
show_top_common_urls(results_data)
# 5. Visualization (Heatmap)
visualize_similarity_heatmap(similarity_matrix)
Using Seaborn to plot these scores as a heatmap makes the clusters obvious.
You see exactly which topics need their own articles and which should be merged.
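To turn the matrix into concrete groupings instead of eyeballing the heatmap, a simple greedy pass over the scores is enough. This is a minimal sketch; the 0.4 merge threshold is an assumption, so tune it against your own niche.

import pandas as pd

def cluster_keywords(similarity_df: pd.DataFrame, threshold: float = 0.4):
    """Greedily group keywords whose SERP similarity meets the threshold.

    The 0.4 default is an assumption, not a rule -- adjust it for your niche.
    """
    remaining = list(similarity_df.index)
    clusters = []
    while remaining:
        seed = remaining.pop(0)
        cluster = [seed]
        # Pull in every remaining keyword that overlaps enough with the seed
        for kw in remaining[:]:
            if similarity_df.loc[seed, kw] >= threshold:
                cluster.append(kw)
                remaining.remove(kw)
        clusters.append(cluster)
    return clusters

# Usage with the similarity_matrix built in the script above:
# for group in cluster_keywords(similarity_matrix, threshold=0.4):
#     print(" + ".join(group) if len(group) > 1 else f"{group[0]} (own article)")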
AI Overview Visibility Tracking
Standard rank trackers often miss AI Overviews, the AI-generated summaries that push organic results down the page. You can monitor them by parsing the aiOverview object in the API response. This script checks whether your domain appears among the cited sources.
import requests
import pandas as pd
import time
from urllib.parse import urlparse
# --- CONFIGURATION ---
API_KEY = "HASDATA_API_KEY"
TARGET_DOMAIN = "webmd.com" # The domain we are monitoring for citations
KEYWORDS = [
"health benefits of coffee",
"benefits of coffee for men",
"benefits of coffee for women",
"health benefits of black coffee",
"health benefits of mushroom coffee",
"health benefits of decaf coffee"
]
def normalize_domain(url):
"""
Extracts the base domain from a URL to ensure accurate matching.
e.g., 'https://www.sub.example.com/page' -> 'sub.example.com'
"""
try:
parsed = urlparse(url)
# Returns netloc (e.g., www.example.com). We strip 'www.' for broader matching.
return parsed.netloc.lower().replace('www.', '')
except Exception:
return ""
def fetch_serp_data(query):
"""
Executes a SERP API request targeting Google's AI Overview.
Note: AI Overviews are volatile; successful triggering depends on
location (US) and device type (Desktop/Mobile).
"""
endpoint = "https://api.hasdata.com/scrape/google/serp"
params = {
"q": query,
"gl": "us", # Geo-location: US is critical for SGE consistency
"hl": "en", # Language: English
"deviceType": "desktop"
}
headers = {"x-api-key": API_KEY}
try:
response = requests.get(endpoint, params=params, headers=headers, timeout=30)
if response.status_code == 200:
return response.json()
else:
print(f"[API Error] Status: {response.status_code} for query '{query}'")
return None
except Exception as e:
print(f"[Network Error] {e}")
return None
def extract_ai_citations(serp_data):
"""
Parses the JSON response to locate the 'aiOverview' block.
Extracts structured references (citations) if present.
"""
# HasData typically returns SGE data in the 'aiOverview' key
ai_overview = serp_data.get('aiOverview')
if not ai_overview:
return None, [] # AI Overview not triggered for this query
# Extract the 'references' array which contains the source links
references = ai_overview.get('references', [])
# Map into a cleaner format for analysis
citations = []
for ref in references:
citations.append({
'index': ref.get('index'),
'title': ref.get('title'),
'url': ref.get('link'),
'source_name': ref.get('source')
})
return ai_overview, citations
def run_monitor():
results = []
print(f"--- Starting AI Overview Monitor for: {TARGET_DOMAIN} ---")
print(f"Processing {len(KEYWORDS)} keywords...\n")
for query in KEYWORDS:
print(f"Analyzing: '{query}'...")
data = fetch_serp_data(query)
if not data:
continue
# 1. Check for AI Overview existence
ai_block, citations = extract_ai_citations(data)
is_triggered = ai_block is not None
is_cited = False
citation_rank = None
found_url = None
# 2. Analyze Citations if AI Overview exists
if is_triggered:
target_clean = normalize_domain(f"https://{TARGET_DOMAIN}")
for cit in citations:
cited_domain = normalize_domain(cit['url'])
                # Match the exact target domain or any of its subdomains
                if cited_domain == target_clean or cited_domain.endswith("." + target_clean):
is_cited = True
citation_rank = cit['index'] # 0-based index in the citation carousel
found_url = cit['url']
break # Stop after finding the first occurrence
# 3. Aggregate Data
results.append({
"Keyword": query,
"AI Triggered": is_triggered,
"Is Cited": is_cited,
"Citation Index": citation_rank if is_cited else "-",
"Cited URL": found_url if is_cited else "-"
})
# Respect API rate limits
time.sleep(1)
# --- REPORTING ---
df = pd.DataFrame(results)
print("\n" + "="*60)
print("AI OVERVIEW VISIBILITY REPORT")
print("="*60)
# formatting booleans for readability
df['AI Triggered'] = df['AI Triggered'].map({True: '✅ Yes', False: '❌ No'})
df['Is Cited'] = df['Is Cited'].map({True: '✅ YES', False: '❌ No'})
# Handle display if dataframe is empty
if not df.empty:
# Use to_markdown if available, otherwise string
try:
print(df.to_markdown(index=False))
except ImportError:
print(df.to_string(index=False))
# Summary Metrics
total = len(df)
triggered = len(df[df['AI Triggered'] == '✅ Yes'])
cited = len(df[df['Is Cited'] == '✅ YES'])
print("\n--- SUMMARY METRICS ---")
print(f"AI Coverage: {triggered}/{total} keywords ({(triggered/total)*100:.1f}%)")
if triggered > 0:
print(f"Share of Voice: {cited}/{triggered} AI Overviews ({(cited/triggered)*100:.1f}%)")
else:
print("Share of Voice: N/A (No AI Overviews generated)")
else:
print("No data collected.")
if __name__ == "__main__":
run_monitor()
Tracking this allows you to see if you are losing traffic to AI even when your organic rankings stay high.
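Because AI Overviews are volatile, a single snapshot matters less than the trend. A low-effort option is to timestamp each run and append it to a log file. This sketch assumes you change run_monitor() to return the DataFrame it builds instead of only printing it, and the log filename is arbitrary.

import datetime as dt
import os
import pandas as pd

def log_run(df: pd.DataFrame, path: str = "ai_overview_log.csv"):
    """Append one monitoring run to a running CSV log with a date stamp."""
    dated = df.copy()
    dated["Date"] = dt.date.today().isoformat()
    # Write the header only on the first run, when the log does not exist yet
    dated.to_csv(path, mode="a", header=not os.path.exists(path), index=False)

# Example: citation share over time from the accumulated log
# log = pd.read_csv("ai_overview_log.csv")
# print(log.groupby("Date")["Is Cited"].apply(lambda s: (s == "✅ YES").mean()))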
Conclusion
The repository at HasData Python for SEO contains the full implementation of these features.

