TL;DR: Google rankings are a bad abstraction for what Google actually shows searchers. This is an event-driven approach to monitoring SERP changes: tracking features, entries, and exits instead of noisy rank movements.
The SERP (Google's "search engine results page") is what humans actually see when they type in a search and hit enter. It puts ads, featured snippets/AI overviews, carousels, and knowledge panels long before the organic results. In fact, in 2026, raw organic results increasingly feel like a fallback rather than the main event. You can sit at position three for months and still lose traffic overnight because Google introduced a new ad or feature above you.
That's the core problem with naïve rank tracking. It assumes the environment is static and your position within it is the only variable. The truth is that the changes Google makes to the results page often matter more (for market intelligence and the like) than whether you moved up or down a position.
If you're monitoring Google for your brand and care about traffic risk, competitive pressure, or acquisition costs, here are just some of the things that can happen:
- Google adds ads → organic CTR drops
- Google launches a featured snippet → one domain captures disproportionate attention
- A new competitor enters the page → your share of attention changes even if your rank doesn’t
None of this is well represented by a line chart of rankings. I couldn’t find many blogs that discuss this specifically without slipping into generic SEO optimization garbage, so I figured I’d write the one I wish I’d found. 🤷♂️ Let’s get down to it.
Why Kafka?
As long as you’re answering one question for one person, you don’t need Kafka. A cron job + Postgres work just fine.
The moment that stops being enough is when you want any combination of: history, alerts, multiple downstream consumers, or independent evolution of logic over time. Anything that takes this beyond a simple one-off. That’s usually where people keep bolting features onto a single script until it’s doing scraping, diffing, alerting, analytics, and reporting all at once.
Either way, you end up with a system that's hard to reason about and harder to change. How do we avoid this?
Think about the basic unit of information you care about, and design around that. In this case, our "unit" is the change, not the snapshot, because the question we actually want to answer is "what happened over this period?"
So why use Kafka? It’s just the most boring (battle tested, well documented, not flashy) tech I could think of that accomplishes it. These are the benefits:
- Event replay. If you build a new consumer next month, you can replay the last 7 days of SERP changes (or however long you configure retention). Your historical data isn’t trapped in database rows — it’s a stream you can re-process.
- Independent consumers. Each consumer has its own offset. Your alerting system can crash and restart without affecting your analytics pipeline. They process events at their own pace.
- Ordering guarantees. Events for the same keyword go to the same partition (we use {keyword}:{geo} as the key). You’ll never see a “domain entered” event before the “featured snippet appeared” event that caused it.
- Backpressure handling. If your volatility analyzer is slow, Kafka doesn’t care. Events queue up and the producer keeps producing.
Is setting up Kafka more complex than a database? Yep. Is it worth it? Once you're at the point of multiple consumers, replayable history, and alerting, you can't really afford to go with anything less.
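To make the replay point concrete, here's a minimal sketch: a brand-new consumer group pointed at the topic with auto_offset_reset='earliest' re-reads everything still within retention. (The serp-changes topic and event format are built later in this post; the group name here is made up.)
import json
from kafka import KafkaConsumer

# a brand-new group_id has no committed offsets, so 'earliest' replays
# everything still within the topic's retention window (7 days below)
consumer = KafkaConsumer(
    "serp-changes",
    bootstrap_servers="localhost:9092",
    group_id="backfill-experiment",  # hypothetical new consumer group
    auto_offset_reset="earliest",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for message in consumer:
    print(message.value)  # re-process historical change events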
What We’re Building
The architecture is straightforward:
We continuously monitor a set of keywords via a SERP API (I’m using Bright Data), detect when the structure changes (ads appear, featured snippets shift, domains enter/exit), emit those changes (and only the changes — we have to maintain some form of state to spot the delta) as events into Kafka, and fan them out to multiple downstream consumers who can then log or store the data as they like.
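Sketched out, my rendering of that flow looks like this:
keywords
  -> SERP API fetch (Bright Data)
  -> normalize snapshot
  -> diff against previous state (state store)
  -> change events (delta only)
  -> Kafka topic "serp-changes"
     -> consumer 1: SEO alerts
     -> consumer 2: competitive intelligence
     -> consumer 3: volatility analyzer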
Prerequisites
Before we start building, you need Python dependencies and Kafka infrastructure running.
> pip install requests python-dotenv kafka-python
Next, Kafka requires Zookeeper and a broker. Instead of installing them manually, we use Docker Compose with this YAML file.
💡 In newer Kafka versions (≥3.3), KRaft (Kafka without ZooKeeper) replaces the old coordination model.
I’m sticking with what I’m familiar with here, but if you’re setting up a modern cluster — or using an AI agent to generate your setup — it may use KRaft instead of ZooKeeper by default. Both approaches work fine for our use case, and we treat ZooKeeper as largely set-and-forget anyway.
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      # retention: 7 days
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_RETENTION_BYTES: 1073741824
Then, run it with
> docker-compose up -d
This starts:
- Zookeeper (at port 2181) — coordinates Kafka cluster
- Kafka Broker (at port 9092) — stores and distributes events
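If you want a quick sanity check that the broker is up before moving on, listing topics from inside the container works (this assumes the kafka container name from the compose file above):
> docker exec kafka kafka-topics --bootstrap-server localhost:9092 --list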
Finally, for the SERP API, grab your Bright Data API key and zone name from your account dashboard
…and put them in a .env file in the project root like so.
BRIGHT_DATA_API_KEY=your_api_key_here
BRIGHT_DATA_ZONE=your_zone_name_here
BRIGHT_DATA_COUNTRY=us
That’s it. You’re ready to start building.
Step 1: Fetching SERP Results
So we can't scrape Google directly: it blocks automated requests, and ordinary residential or datacenter proxies alone won't reliably get you parsed search results. The solution is a SERP API. These return structured JSON of Google's results page, including organic results, ads, featured snippets, video carousels, and more.
For reusability, we’ll first build a client that wraps the SERP API:
import os
import requests
from typing import Dict, Any, Optional
from dotenv import load_dotenv
load_dotenv()
class BrightDataClient:
"""
Reusable client for Bright Data SERP API.
"""
def __init__( self,
api_key: Optional[str] = None,
zone: Optional[str] = None,
country: Optional[str] = None ):
# load from environment variables if not provided
env_api_key = os.getenv("BRIGHT_DATA_API_KEY")
env_zone = os.getenv("BRIGHT_DATA_ZONE")
env_country = os.getenv("BRIGHT_DATA_COUNTRY")
# use provided values or fall back to environment variables
self.api_key = api_key or env_api_key
self.zone = zone or env_zone
self.country = country or env_country
self.api_endpoint = "https://api.brightdata.com/request"
if not self.api_key:
raise ValueError(
"BRIGHT_DATA_API_KEY must be provided via constructor or environment variable. "
"Get your API key from: https://brightdata.com/cp/setting/users"
)
if not self.zone:
raise ValueError(
"BRIGHT_DATA_ZONE must be provided via constructor or environment variable. "
"Manage zones at: https://brightdata.com/cp/zones"
)
# setup session with API authentication
self.session = requests.Session()
self.session.headers.update({
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}'
})
def search( self,
query: str,
num_results: int = 10,
language: Optional[str] = None,
country: Optional[str] = None ) -> Dict[str, Any]:
"""
Executes a google search.
Args:
query: Search query string
num_results: Number of results to return (default: 10)
language: Language code (e.g., 'en', 'es', 'fr')
country: Country code (e.g., 'us', 'uk', 'ca') - overrides instance default
Returns:
Dictionary containing search results in JSON format
"""
# build Google search URL
search_url = (
f"https://www.google.com/search"
f"?q={requests.utils.quote(query)}"
f"&num={num_results}"
f"&brd_json=1"
)
if language:
search_url += f"&hl={language}&lr=lang_{language}"
# use method parameter country or instance default
target_country = country or self.country
# prepare request payload for SERP API
payload = {
'zone': self.zone,
'url': search_url,
'format': 'json'
}
if target_country:
payload['country'] = target_country
try:
# use POST request to SERP API endpoint
response = self.session.post(
self.api_endpoint,
json=payload,
timeout=30
)
response.raise_for_status()
# this SERP API returns JSON directly when format='json' so just return it
return response.json()
except requests.exceptions.HTTPError as e:
error_msg = f"Search request failed with HTTP {e.response.status_code}"
if e.response.text:
error_msg += f": {e.response.text[:200]}"
raise RuntimeError(error_msg) from e
except requests.exceptions.RequestException as e:
raise RuntimeError(f"Search request failed: {e}") from e
As a preview, here’s how we use it later.
from src.bright_data import BrightDataClient
# initialize client (reads credentials from .env file)
client = BrightDataClient()
# fetch SERP for a keyword
raw_response = client.search(
query="ai crm",
num_results=10,
country="us"
)
# raw_response now contains the full SERP structure
There’s just one problem — the raw Bright Data response you get back from this API is way too verbose:
{
"general": {
"search_engine": "google",
"query": "ai crm",
"language": "en",
"mobile": false,
"basic_view": false,
"search_type": "text",
"page_title": "ai crm - Google Search",
"timestamp": "2026–01–07T11:18:35.216Z"
},
"input": {
"original_url": "https://www.google.com/search?q=ai+crm&brd_json=1&gl=us",
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)...",
"request_id": "some_request_id"
},
"navigation": [ /* 6 navigation links */ ],
"organic": [
{
"link": "https://attio.com/",
"source": "Attio",
"display_link": "https://attio.com",
"title": "Attio: The next gen of CRM",
"description": "Execute your revenue strategy with precision…",
"rank": 1,
"global_rank": 1,
"extensions": [ /* site links, dates, etc */ ]
},
// ...8 more results with full metadata
],
"images": [ /* image results */ ],
"top_ads": [{}, {}],
"bottom_ads": [{}, {}, {}],
"pagination": { /* pagination links */ },
"related": [ /* 8 related searches */ ]
}
That's 350+ lines of JSON for a single search, and most of it is noise. For change detection, we only care about:
- Which domains are present (not full URLs, titles, descriptions)
- Which features exist (ads, featured snippets, video carousels) — boolean flags
- When this snapshot was taken (timestamp)
- What keyword/geo this is for (context)
Everything else — navigation links, user agents, request IDs, pagination, related searches, full result metadata — is irrelevant for detecting changes.
So let’s normalize it into a much leaner format for consumption.
Step 2: Why Normalization Matters
So we transform the verbose raw response into a compact snapshot:
"""
/src/normalizer.py
SERP data normalization
converts raw Bright Data response to structured snapshot format
"""
from datetime import datetime
from urllib.parse import urlparse
from typing import Dict, Any
def extract_domain(url: str) -> str:
"""extract domain from URL"""
try:
parsed = urlparse(url)
domain = parsed.netloc or parsed.path
# remove www. prefix
if domain.startswith('www.'):
domain = domain[4:]
return domain
except Exception:
return url
def normalize_serp_data(raw_response: Dict[str, Any], keyword: str, geo: str = "US") -> Dict[str, Any]:
"""
normalize SERP response into structured format
Args:
raw_response: raw JSON response from Bright Data SERP API
keyword: search keyword
geo: geographic location code (default: "US")
Returns:
normalized dictionary with structured SERP data
"""
# extract organic domains
organic_domains = []
if raw_response.get('organic') and isinstance(raw_response['organic'], list):
for result in raw_response['organic']:
link = result.get('link') or result.get('url', '')
if link:
domain = extract_domain(link)
if domain:
organic_domains.append(domain)
# check for ads (top_ads or bottom_ads)
top_ads = raw_response.get('top_ads', [])
bottom_ads = raw_response.get('bottom_ads', [])
has_ads = bool(
(top_ads and len([a for a in top_ads if a]) > 0) or
(bottom_ads and len([a for a in bottom_ads if a]) > 0) or
(raw_response.get('ads') and len(raw_response.get('ads', [])) > 0)
)
ads_count = len([a for a in top_ads if a]) + len([a for a in bottom_ads if a])
# check for featured snippet (knowledge panel)
has_featured_snippet = bool(raw_response.get('knowledge'))
# check for video carousel
has_video_carousel = bool(raw_response.get('video') and len(raw_response.get('video', [])) > 0)
# check for people also ask
has_people_also_ask = bool(raw_response.get('people_also_ask') and len(raw_response.get('people_also_ask', [])) > 0)
# extract featured snippet owner if available
featured_snippet_owner = None
if has_featured_snippet:
knowledge = raw_response.get('knowledge', {})
# try to extract owner from knowledge panel
if knowledge.get('url'):
featured_snippet_owner = extract_domain(knowledge['url'])
elif knowledge.get('source'):
featured_snippet_owner = extract_domain(knowledge['source'])
# build normalized structure
normalized = {
"keyword": keyword,
"geo": geo,
"features": {
"ads": has_ads,
"ads_count": ads_count if has_ads else 0,
"featured_snippet": has_featured_snippet,
"featured_snippet_owner": featured_snippet_owner,
"video_carousel": has_video_carousel,
"people_also_ask": has_people_also_ask
},
"organic_domains": organic_domains,
"timestamp": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
}
return normalized
Here, we extract just the domain names, not full URLs: "https://www.salesforce.com/crm/" becomes "salesforce.com" (there's a quick demo right after this list). Why? Because:
- URLs change (query params, paths) but domains are stable identifiers
- We care about which sites are ranking, not specific pages
- Domain-level tracking is more resilient to URL structure changes
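A quick illustration of extract_domain in isolation (assuming you import it from the normalizer module above):
from normalizer import extract_domain

print(extract_domain("https://www.salesforce.com/crm/"))  # salesforce.com
print(extract_domain("https://attio.com/pricing?ref=x"))  # attio.com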
We also convert complex nested structures into simple booleans:
# Raw:
{
"top_ads": [{...}, {...}],
"bottom_ads": []
}
# Normalized:
{
"ads": true,
"ads_count": 2
}
This makes some of the comparisons we'll perform later trivial:
if current['features']['ads'] and not previous['features']['ads']:
# Ads appeared! Emit a signal.
Finally, when a featured snippet exists, we extract which domain owns it:
if has_featured_snippet:
knowledge = raw_response.get('knowledge', {})
if knowledge.get('url'):
featured_snippet_owner = extract_domain(knowledge['url'])
This lets us track when featured snippet ownership changes — a critical competitive signal.
Before vs After
Raw response: 350+ lines, ~15 KB. Normalized snapshot: ~22 lines, ~500 bytes.
This way, we store 30x less data per snapshot, and perform comparisons way faster because we’re comparing arrays of domains instead of deep object structures.
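For reference, here's roughly what a normalized snapshot looks like for the "ai crm" example. The values are illustrative; the shape comes straight from the normalizer above.
{
  "keyword": "ai crm",
  "geo": "US",
  "features": {
    "ads": true,
    "ads_count": 2,
    "featured_snippet": false,
    "featured_snippet_owner": null,
    "video_carousel": false,
    "people_also_ask": true
  },
  "organic_domains": [
    "attio.com",
    "salesforce.com",
    "hubspot.com",
    "zoho.com"
    // ...remaining domains from the top 10
  ],
  "timestamp": "2026-01-07T11:18:35Z"
}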
Step 3: Change Detection — Turning Snapshots into Events
At this point we have normalized SERP snapshots. But snapshots alone aren’t very useful for Kafka. We’ve talked about this before — Kafka wants events, not states.
So the job of Step 3 is simple:
Compare the current snapshot with the previous one and emit only what changed i.e. the delta.
"""
/src/change_detector.py
change detection logic for SERP snapshots
compares current vs previous state and emits change events
"""
from typing import Dict, Any, List, Optional
from datetime import datetime
class ChangeDetector:
"""
detects changes between SERP snapshots
generates change events for Kafka
"""
def detect_changes( self,
current: Dict[str, Any],
previous: Optional[Dict[str, Any]] ) -> List[Dict[str, Any]]:
"""
compare current and previous snapshots, return list of change events
Args:
current: current normalized SERP snapshot
previous: previous normalized SERP snapshot (None if first run)
Returns:
list of change event dictionaries
"""
events = []
if previous is None:
# first run - no changes to detect
return events
# detect feature changes
events.extend(self._detect_feature_changes(current, previous))
# detect organic domain changes
events.extend(self._detect_organic_changes(current, previous))
return events
def _detect_feature_changes( self,
current: Dict[str, Any],
previous: Dict[str, Any] ) -> List[Dict[str, Any]]:
"""detect changes in SERP features (ads, featured snippet, etc.)"""
events = []
current_features = current.get('features', {})
previous_features = previous.get('features', {})
feature_types = ['ads', 'featured_snippet', 'video_carousel', 'people_also_ask']
for feature in feature_types:
current_value = current_features.get(feature, False)
previous_value = previous_features.get(feature, False)
# feature appeared
if current_value and not previous_value:
event = {
"event_type": "serp_feature_added",
"keyword": current['keyword'],
"geo": current['geo'],
"feature": feature,
"timestamp": current['timestamp']
}
# add feature-specific metadata
if feature == "ads":
# count ads if available
event["count"] = current.get('features', {}).get('ads_count', 1)
elif feature == "featured_snippet":
# extract owner domain if available
owner = current.get('features', {}).get('featured_snippet_owner')
if owner:
event["owner_domain"] = owner
events.append(event)
# feature disappeared
elif not current_value and previous_value:
event = {
"event_type": "serp_feature_removed",
"keyword": current['keyword'],
"geo": current['geo'],
"feature": feature,
"timestamp": current['timestamp']
}
events.append(event)
return events
def _detect_organic_changes( self,
current: Dict[str, Any],
previous: Dict[str, Any] ) -> List[Dict[str, Any]]:
"""detect changes in organic domain rankings"""
events = []
current_domains = set(current.get('organic_domains', []))
previous_domains = set(previous.get('organic_domains', []))
# domains that exited (were in previous, not in current)
exited = previous_domains - current_domains
for domain in exited:
# find previous position
previous_position = self._get_domain_position(domain, previous)
event = {
"event_type": "organic_exit",
"keyword": current['keyword'],
"geo": current['geo'],
"domain": domain,
"previous_position": previous_position,
"timestamp": current['timestamp']
}
events.append(event)
# domains that entered (in current, not in previous)
entered = current_domains - previous_domains
for domain in entered:
# find current position
current_position = self._get_domain_position(domain, current)
event = {
"event_type": "organic_entry",
"keyword": current['keyword'],
"geo": current['geo'],
"domain": domain,
"current_position": current_position,
"timestamp": current['timestamp']
}
events.append(event)
return events
def _get_domain_position( self,
domain: str,
snapshot: Dict[str, Any] ) -> Optional[int]:
"""get position of domain in organic results (1-indexed)"""
domains = snapshot.get('organic_domains', [])
try:
return domains.index(domain) + 1
except ValueError:
return None
This detector compares two normalized snapshots and emits only the delta. Because we normalized SERP features into simple booleans in Step 2, detecting changes becomes trivial.
# this avoids deep JSON comparisons + false positives from minor SERP variations
if now and not before:
# feature appeared
Also, for organic results, we intentionally ignore position changes. Rank reshuffles inside the top 10 happen constantly and aren't very actionable; a domain entering or disappearing from the SERP is what's actually interesting. Because organic results were normalized into a flat list of domains, this becomes a set comparison: previous - current = domains that exited, and current - previous = domains that entered. That means O(1) membership lookups and an O(n) comparison overall.
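As a standalone illustration of that set arithmetic (the domains here are made up):
previous_domains = {"hubspot.com", "salesforce.com", "pipedrive.com"}
current_domains = {"hubspot.com", "salesforce.com", "attio.com"}

exited = previous_domains - current_domains   # {'pipedrive.com'}
entered = current_domains - previous_domains  # {'attio.com'}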
Each run produces zero or more small events like:
{
"event_type": "serp_feature_added",
"keyword": "ai crm",
"geo": "US",
"feature": "ads",
"count": 3,
"timestamp": "2026-01-07T12:00:00Z"
},
{
"event_type": "serp_feature_added",
"keyword": "ai crm",
"geo": "US",
"feature": "featured_snippet",
"owner_domain": "hubspot.com",
"timestamp": "2026-01-07T12:00:00Z"
}
Instead of storing "what the SERP looks like now", we're storing:
- what happened
- when it happened
- for which keyword + geo it happened
- and, when relevant, extra metadata (ad count, featured snippet owner) so downstream consumers don't need to re-hydrate context
This is the exact shape Kafka wants. At this point, SERP data has stopped being scraped snapshots for us, and is behaving like a true event stream.
Before we start the pipeline, we should briefly cover state management.
Step 4: A Simple State Manager
We’ll use simple file-based storage:
"""
/src/state_manager.py
state management for SERP snapshots
stores previous state to enable change detection
"""
import os
import json
from typing import Dict, Any, Optional
from pathlib import Path
class StateManager:
"""
manages SERP snapshot state storage
stores previous snapshots to compare against current ones
"""
def __init__(self, state_dir: str = "state"):
self.state_dir = Path(state_dir)
self.state_dir.mkdir(exist_ok=True)
def _get_state_file(self, keyword: str, geo: str) -> Path:
"""get state file path for keyword+geo combination"""
safe_keyword = keyword.replace(" ", "_").lower()
safe_geo = geo.lower()
filename = f"{safe_keyword}_{safe_geo}.json"
return self.state_dir / filename
def load_previous_state( self,
keyword: str,
geo: str ) -> Optional[Dict[str, Any]]:
"""
load previous SERP snapshot for keyword+geo
Returns:
previous snapshot dict or None if no previous state exists
"""
state_file = self._get_state_file(keyword, geo)
if not state_file.exists():
return None
try:
with open(state_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"Warning: failed to load state from {state_file}: {e}")
return None
def save_state( self,
keyword: str,
geo: str,
snapshot: Dict[str, Any] ) -> None:
"""
save current SERP snapshot as new state
Args:
keyword: search keyword
geo: geographic location code
snapshot: normalized SERP snapshot to save
"""
state_file = self._get_state_file(keyword, geo)
try:
with open(state_file, 'w', encoding='utf-8') as f:
json.dump(snapshot, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f"Error: failed to save state to {state_file}: {e}")
raise
So we’ll store our state files like this:
./state/
├── ai_crm_us.json
├── best_crm_us.json
└── ai_crm_uk.json
Each state file contains the last normalized snapshot for that keyword + geo combination, as you’ll see when we bring it all together.
Step 5: The Producer Pipeline — Tying It All Together
At this point we have all the individual pieces:
- SERP fetching
- Snapshot normalization
- Change detection
- State management
Step 5 is where they come together into a single producer pipeline, and where events finally get emitted to Kafka.
This producer fetches the latest SERP → normalizes it → compares it to the previous snapshot → emits only the detected changes (the delta) to Kafka → persists state for the next run.
"""
/src/producer.py
Kafka producer for SERP change events
fetches SERP data, detects changes, and emits events to Kafka
"""
import time
import json
from typing import Dict, Any, List
from kafka import KafkaProducer
from kafka.errors import KafkaError
from bright_data import BrightDataClient
from normalizer import normalize_serp_data
from state_manager import StateManager
from change_detector import ChangeDetector
class SERPProducer:
"""
For a Kafka producer that monitors SERP changes and emits events to Kafka
"""
def __init__( self,
kafka_brokers: str = "localhost:9092",
state_dir: str = "state" ):
self.producer = KafkaProducer(
bootstrap_servers=kafka_brokers,
# convert because KafkaProducer expects callable serializers.
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None
)
self.bright_data = BrightDataClient()
self.state_manager = StateManager(state_dir)
self.change_detector = ChangeDetector()
def process_keyword( self,
keyword: str,
geo: str = "US",
num_results: int = 10 ) -> List[Dict[str, Any]]:
"""
process a single keyword: fetch, compare, emit changes
Args:
keyword: search keyword to monitor
geo: geographic location code
num_results: number of results to fetch
Returns:
list of change events emitted
"""
print(f"Processing keyword: {keyword} (geo: {geo})")
# fetch current SERP
print(" Fetching current SERP...")
try:
raw_response = self.bright_data.search(
query=keyword,
num_results=num_results,
country=geo.lower()
)
except Exception as e:
print(f" Error fetching SERP: {e}")
return []
# normalize current snapshot
current_snapshot = normalize_serp_data(raw_response, keyword, geo)
print(f" Current snapshot: {len(current_snapshot['organic_domains'])} domains")
# load previous state
previous_snapshot = self.state_manager.load_previous_state(keyword, geo)
if previous_snapshot:
print(f" Previous snapshot: {len(previous_snapshot['organic_domains'])} domains")
else:
print(" No previous state found (first run)")
# detect changes
change_events = self.change_detector.detect_changes(
current_snapshot,
previous_snapshot
)
print(f" Detected {len(change_events)} change(s)")
# emit events to Kafka
emitted_events = []
for event in change_events:
try:
# use keyword+geo as key for partitioning
key = f"{keyword}:{geo}"
topic = "serp-changes"
future = self.producer.send(topic, key=key, value=event)
# wait for confirmation (optional - can be async)
record_metadata = future.get(timeout=10)
print(f" Emitted: {event['event_type']} -> partition {record_metadata.partition}, offset {record_metadata.offset}")
emitted_events.append(event)
except KafkaError as e:
print(f" Error emitting event: {e}")
# save current state as new previous state
self.state_manager.save_state(keyword, geo, current_snapshot)
print(" State saved")
return emitted_events
def run_monitoring_loop( self,
keywords: List[Dict[str, str]],
interval_seconds: int = 1800 ):  # 30 minutes default
"""
run continuous monitoring loop
Args:
keywords: list of dicts with 'keyword' and 'geo' keys
interval_seconds: seconds between monitoring cycles
"""
print(f"Starting monitoring loop (interval: {interval_seconds}s)")
print(f"Tracking {len(keywords)} keyword(s)")
try:
while True:
print("\\n" + "=" * 60)
print(f"Monitoring cycle at {time.strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 60)
for kw_config in keywords:
keyword = kw_config['keyword']
geo = kw_config.get('geo', 'US')
try:
self.process_keyword(keyword, geo)
# small delay between keywords
time.sleep(2)
except Exception as e:
print(f"Error processing {keyword}: {e}")
print(f"\\nSleeping for {interval_seconds} seconds...")
time.sleep(interval_seconds)
except KeyboardInterrupt:
print("\\nShutting down...")
self.producer.close()
def main():
"""example usage"""
producer = SERPProducer()
# example: monitor single keyword
keywords = [
{"keyword": "ai crm", "geo": "US"}
]
# run once (for testing)
# producer.process_keyword("ai crm", "US")
# or run continuous loop
producer.run_monitoring_loop(keywords, interval_seconds=3600) # 1 hour
if __name__ == "__main__":
main()
Notice the synchronous Kafka sends (future.get(timeout=10))? This is intentional. We’re not optimizing for throughput, we’re monitoring a handful of keywords every 30 minutes. I’d rather wait 10 seconds and know the event was written than fail silently and lose a critical alert.
For a high-throughput system, you’d batch events and send asynchronously.
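A rough sketch of what that could look like with kafka-python's callback API; the callback names are mine, and producer, change_events, keyword, and geo stand in for the objects used in the producer above:
def on_send_success(record_metadata):
    # fire-and-forget: log where the event landed, don't block the loop
    print(f"  queued -> partition {record_metadata.partition}, offset {record_metadata.offset}")

def on_send_error(exc):
    print(f"  failed to emit event: {exc}")

for event in change_events:
    producer.send("serp-changes", key=f"{keyword}:{geo}", value=event) \
        .add_callback(on_send_success) \
        .add_errback(on_send_error)

producer.flush()  # block once per cycle instead of waiting per message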
Also, note the Kafka partitioning key we use: key = f"{keyword}:{geo}". This ensures you'll never see a "domain entered" event before the "featured snippet appeared" event that caused it, because they're in the same partition and processed sequentially.
Here’s how we use run_monitoring_loop() to start the pipeline:
producer = SERPProducer()
keywords = [
{"keyword": "ai crm", "geo": "US"},
{"keyword": "best crm", "geo": "US"}
]
# run continuous loop (every 30 minutes)
producer.run_monitoring_loop(keywords, interval_seconds=1800)
This pipeline runs continuously, emitting events whenever SERP composition changes. Next, we’ll see how multiple independent consumers process these events.
Step 6: The Consumers — Independent Event Processing
This is where Kafka’s value becomes obvious. We have three completely independent consumers processing the same event stream. Each has its own offset, its own logic, and can evolve independently.
But they all follow the same pattern:
- Each consumer has a unique group_id — this is how Kafka tracks offsets per consumer
- auto_offset_reset='earliest' — start from the beginning of the topic on first run (so history can be replayed)
- enable_auto_commit=True — automatically commit offsets after processing
- Each consumer processes events at its own pace — if one is slow, the others aren't affected
Consumer 1: SEO Alerts
This consumer only cares about high-impact changes. Ads appearing? Featured snippets appearing? Those directly affect CTR. Everything else gets ignored.
"""
Consumer 1: SEO Alerting Service
listens for SERP feature changes that impact SEO
"""
import json
import sys
import os
from datetime import datetime
from kafka import KafkaConsumer
# add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))
class SEOAlertConsumer:
"""
consumer that alerts on SERP changes affecting SEO
focuses on ads and featured snippets appearing
"""
def __init__( self,
kafka_brokers: str = "localhost:9092",
topic: str = "serp-changes",
output_file: str = None ):
self.topic = topic
self.consumer = KafkaConsumer(
topic,
bootstrap_servers=kafka_brokers,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='seo-alert-consumer',
auto_offset_reset='earliest', # start from beginning
enable_auto_commit=True
)
# set output file path relative to script directory
if output_file is None:
script_dir = os.path.dirname(os.path.abspath(__file__))
output_file = os.path.join(script_dir, 'seo_alerts.txt')
self.output_file = output_file
def process_event(self, event: dict) -> None:
"""process a single change event"""
event_type = event.get('event_type')
# only care about feature additions (ads, featured snippets)
if event_type == 'serp_feature_added':
feature = event.get('feature')
if feature in ['ads', 'featured_snippet']:
alert = self._create_alert(event, feature)
self._save_alert(alert)
self._print_alert(alert)
def _create_alert(self, event: dict, feature: str) -> dict:
"""create alert message"""
keyword = event.get('keyword')
geo = event.get('geo')
if feature == 'ads':
count = event.get('count', 1)
message = f"Heads up: Google added {count} ad(s) for '{keyword}' in {geo}. Expect organic CTR drop."
elif feature == 'featured_snippet':
owner = event.get('owner_domain', 'unknown')
message = f"Heads up: Google added a featured snippet for '{keyword}' in {geo} (owned by {owner}). Expect organic CTR drop."
else:
message = f"SERP feature '{feature}' appeared for '{keyword}' in {geo}"
return {
'timestamp': event.get('timestamp'),
'keyword': keyword,
'geo': geo,
'feature': feature,
'message': message,
'event': event
}
def _save_alert(self, alert: dict) -> None:
"""save alert to file"""
try:
with open(self.output_file, 'a', encoding='utf-8') as f:
f.write(json.dumps(alert) + '\n')
except Exception as e:
print(f"Error saving alert: {e}")
def _print_alert(self, alert: dict) -> None:
"""print alert to console"""
print("\\n" + "=" * 60)
print("SEO ALERT")
print("=" * 60)
print(f"Time: {alert['timestamp']}")
print(f"Keyword: {alert['keyword']} ({alert['geo']})")
print(f"Feature: {alert['feature']}")
print(f"Message: {alert['message']}")
print("=" * 60)
def run(self):
"""main consumer loop"""
print("SEO Alert Consumer started")
print(f"Listening to topic: {self.topic}")
print(f"Alerts will be saved to: {self.output_file}")
print("\\nWaiting for events...\\n")
try:
for message in self.consumer:
event = message.value
self.process_event(event)
except KeyboardInterrupt:
print("\\nShutting down...")
finally:
self.consumer.close()
def main():
consumer = SEOAlertConsumer()
consumer.run()
if __name__ == "__main__":
main()
This consumer simply filters for serp_feature_added events, only processes the ads and featured_snippet features, creates human-readable alerts, and appends them as JSON lines to a text file. A sample alert:
{
"timestamp": "2026-01-07T12:00:00Z",
"keyword": "ai crm",
"geo": "US",
"feature": "ads",
"message": "Heads up: Google added 3 ad(s) for 'ai crm' in US. Expect organic CTR drop.",
"event": { /* full event data */ }
}
Consumer 2: Competitive Intelligence
This consumer tracks competitors. You can configure a list of domains to watch (hubspot.com, salesforce.com, etc.), and it logs when they enter or exit the SERP, or when they gain/lose the featured snippet.
"""
Consumer 2: Competitive Intelligence Service
tracks competitor movements in SERP
"""
import json
import sys
import os
from kafka import KafkaConsumer
# add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))
class CompetitiveIntelConsumer:
"""
consumer that tracks competitive movements
focuses on featured snippet ownership and domain entries/exits
"""
def __init__( self,
kafka_brokers: str = "localhost:9092",
topic: str = "serp-changes",
output_file: str = "competitive_intel.txt",
tracked_domains: list = None ):
self.topic = topic
self.consumer = KafkaConsumer(
topic,
bootstrap_servers=kafka_brokers,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='competitive-intel-consumer',
auto_offset_reset='earliest',
enable_auto_commit=True
)
self.output_file = output_file
self.tracked_domains = set(tracked_domains or [])
# track featured snippet ownership
self.featured_snippet_owners = {} # keyword+geo -> domain
def process_event(self, event: dict) -> None:
"""process a single change event"""
event_type = event.get('event_type')
keyword = event.get('keyword')
geo = event.get('geo')
key = f"{keyword}:{geo}"
# track featured snippet ownership changes
if event_type == 'serp_feature_added' and event.get('feature') == 'featured_snippet':
owner = event.get('owner_domain')
if owner:
previous_owner = self.featured_snippet_owners.get(key)
if previous_owner != owner:
self._log_featured_snippet_change(keyword, geo, previous_owner, owner, event)
self.featured_snippet_owners[key] = owner
# track domain entries/exits
if event_type == 'organic_entry':
domain = event.get('domain')
if self._should_track(domain):
self._log_domain_entry(keyword, geo, domain, event)
elif event_type == 'organic_exit':
domain = event.get('domain')
if self._should_track(domain):
self._log_domain_exit(keyword, geo, domain, event)
def _should_track(self, domain: str) -> bool:
"""determine if domain should be tracked"""
if not self.tracked_domains:
return True # track all if none specified
return domain in self.tracked_domains
def _log_featured_snippet_change( self,
keyword: str,
geo: str,
previous_owner: str,
new_owner: str,
event: dict ) -> None:
"""log featured snippet ownership change"""
if previous_owner:
message = f"Featured snippet ownership changed: {previous_owner} -> {new_owner}"
else:
message = f"{new_owner} gained SERP ownership via featured snippet"
intel = {
'timestamp': event.get('timestamp'),
'keyword': keyword,
'geo': geo,
'type': 'featured_snippet_ownership',
'previous_owner': previous_owner,
'new_owner': new_owner,
'message': message
}
self._save_intel(intel)
self._print_intel(intel)
def _log_domain_entry( self,
keyword: str,
geo: str,
domain: str,
event: dict ) -> None:
"""log domain entry into SERP"""
position = event.get('current_position')
intel = {
'timestamp': event.get('timestamp'),
'keyword': keyword,
'geo': geo,
'type': 'domain_entry',
'domain': domain,
'position': position,
'message': f"{domain} entered SERP at position {position}"
}
self._save_intel(intel)
self._print_intel(intel)
def _log_domain_exit( self,
keyword: str,
geo: str,
domain: str,
event: dict ) -> None:
"""log domain exit from SERP"""
previous_position = event.get('previous_position')
intel = {
'timestamp': event.get('timestamp'),
'keyword': keyword,
'geo': geo,
'type': 'domain_exit',
'domain': domain,
'previous_position': previous_position,
'message': f"{domain} dropped out of SERP (was at position {previous_position})"
}
self._save_intel(intel)
self._print_intel(intel)
def _save_intel(self, intel: dict) -> None:
"""save intelligence to file"""
try:
with open(self.output_file, 'a', encoding='utf-8') as f:
f.write(json.dumps(intel) + '\n')
except Exception as e:
print(f"Error saving intel: {e}")
def _print_intel(self, intel: dict) -> None:
"""print intelligence to console"""
print("\\n" + "=" * 60)
print("COMPETITIVE INTELLIGENCE")
print("=" * 60)
print(f"Time: {intel['timestamp']}")
print(f"Keyword: {intel['keyword']} ({intel['geo']})")
print(f"Type: {intel['type']}")
print(f"Message: {intel['message']}")
print("=" * 60)
def run(self):
"""main consumer loop"""
print("Competitive Intelligence Consumer started")
print(f"Listening to topic: {self.topic}")
print(f"Intelligence will be saved to: {self.output_file}")
if self.tracked_domains:
print(f"Tracking domains: {', '.join(self.tracked_domains)}")
print("\\nWaiting for events...\\n")
try:
for message in self.consumer:
event = message.value
self.process_event(event)
except KeyboardInterrupt:
print("\\nShutting down...")
finally:
self.consumer.close()
def main():
# example: track specific competitors
tracked_domains = ['hubspot.com', 'salesforce.com', 'zoho.com']
consumer = CompetitiveIntelConsumer(tracked_domains=tracked_domains)
consumer.run()
if __name__ == "__main__":
main()
This consumer maintains an in-memory state — featured_snippet_owners — to detect ownership changes. Beyond that, it can track all domains or just a configured list of competitors, detects when featured snippet ownership changes (e.g. hubspot.com → salesforce.com), and logs when competitors enter or exit the SERP. Sample output:
{
"timestamp": "2026-01-07T12:00:00Z",
"keyword": "ai crm",
"geo": "US",
"type": "featured_snippet_ownership",
"previous_owner": "hubspot.com",
"new_owner": "salesforce.com",
"message": "Featured snippet ownership changed: hubspot.com -> salesforce.com"
},
{
"timestamp": "2026-01-07T12:00:00Z",
"keyword": "ai crm",
"geo": "US",
"type": "domain_entry",
"domain": "zoho.com",
"position": 5,
"message": "zoho.com entered SERP at position 5"
}
Of course, this can be extended to write to a database, generate reports, or trigger alerts.
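For instance, a hypothetical SQLite variant of _save_intel might look like the sketch below. The table name and schema are my own, not part of the consumer above.
import json
import sqlite3

def save_intel_sqlite(intel: dict, db_path: str = "competitive_intel.db") -> None:
    """hypothetical alternative to _save_intel: append intel events to SQLite"""
    conn = sqlite3.connect(db_path)
    try:
        # one flat table; the full intel dict is kept as a JSON payload column
        conn.execute(
            "CREATE TABLE IF NOT EXISTS intel_events ("
            "timestamp TEXT, keyword TEXT, geo TEXT, type TEXT, payload TEXT)"
        )
        conn.execute(
            "INSERT INTO intel_events VALUES (?, ?, ?, ?, ?)",
            (intel.get('timestamp'), intel.get('keyword'), intel.get('geo'),
             intel.get('type'), json.dumps(intel)),
        )
        conn.commit()
    finally:
        conn.close()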
Consumer 3: Volatility Analyzer
This consumer aggregates events over a time window and calculates a volatility score. More changes = higher volatility.
"""
Consumer 3: Historical Volatility Analyzer
aggregates SERP changes over time to calculate volatility scores
"""
import json
import sys
import os
from datetime import datetime, timedelta
from collections import defaultdict
from kafka import KafkaConsumer
# add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))
class VolatilityAnalyzerConsumer:
"""
consumer that calculates SERP volatility scores
tracks change frequency over time windows
"""
def __init__( self,
kafka_brokers: str = "localhost:9092",
topic: str = "serp-changes",
output_file: str = "volatility_scores.txt",
window_days: int = 7 ):
self.topic = topic
self.consumer = KafkaConsumer(
topic,
bootstrap_servers=kafka_brokers,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='volatility-analyzer-consumer',
auto_offset_reset='earliest',
enable_auto_commit=True
)
self.output_file = output_file
self.window_days = window_days
# track events per keyword+geo
self.events_by_keyword = defaultdict(list) # (keyword, geo) -> [events]
def process_event(self, event: dict) -> None:
"""process a single change event"""
keyword = event.get('keyword')
geo = event.get('geo')
key = (keyword, geo)
# store event with timestamp
self.events_by_keyword[key].append({
'event': event,
'timestamp': event.get('timestamp')
})
# calculate volatility for this keyword
volatility = self._calculate_volatility(key)
if volatility is not None:
score_data = {
'keyword': keyword,
'geo': geo,
'volatility_score': volatility,
'window_days': self.window_days,
'change_count': len(self.events_by_keyword[key]),
'timestamp': datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
}
self._save_score(score_data)
self._print_score(score_data)
def _calculate_volatility(self, key: tuple) -> float:
"""
calculate volatility score (0.0 to 1.0)
higher score = more changes = more volatile
"""
events = self.events_by_keyword[key]
if len(events) < 2:
return None # need at least 2 events
# filter events within time window
cutoff_time = datetime.utcnow() - timedelta(days=self.window_days)
recent_events = [
e for e in events
if datetime.fromisoformat(e['timestamp'].replace('Z', '+00:00')).replace(tzinfo=None) > cutoff_time
]
if len(recent_events) < 2:
return None
# simple volatility: change count normalized by time window
# more sophisticated: could weight by change type, magnitude, etc.
change_count = len(recent_events)
# normalize: assume 1 change per day = 0.5 score
# max reasonable: ~10 changes per day = 1.0 score
max_changes_per_day = 10
days_in_window = self.window_days
max_reasonable_changes = max_changes_per_day * days_in_window
volatility = min(change_count / max_reasonable_changes, 1.0)
return round(volatility, 2)
def _save_score(self, score_data: dict) -> None:
"""save volatility score to file"""
try:
with open(self.output_file, 'a', encoding='utf-8') as f:
f.write(json.dumps(score_data) + '\n')
except Exception as e:
print(f"Error saving score: {e}")
def _print_score(self, score_data: dict) -> None:
"""print volatility score to console"""
print("\\n" + "=" * 60)
print("VOLATILITY ANALYSIS")
print("=" * 60)
print(f"Keyword: {score_data['keyword']} ({score_data['geo']})")
print(f"Volatility Score: {score_data['volatility_score']:.2f}")
print(f"Window: {score_data['window_days']} days")
print(f"Change Count: {score_data['change_count']}")
print(f"Timestamp: {score_data['timestamp']}")
print("=" * 60)
def run(self):
"""main consumer loop"""
print("Volatility Analyzer Consumer started")
print(f"Listening to topic: {self.topic}")
print(f"Scores will be saved to: {self.output_file}")
print(f"Analysis window: {self.window_days} days")
print("\\nWaiting for events...\\n")
try:
for message in self.consumer:
event = message.value
self.process_event(event)
except KeyboardInterrupt:
print("\\nShutting down...")
finally:
self.consumer.close()
def main():
consumer = VolatilityAnalyzerConsumer(window_days=7)
consumer.run()
if __name__ == "__main__":
main()
Here's how I'm defining and calculating volatility, capped at 1.0 (i.e. 100%):
volatility = min(change_count / (max_changes_per_day * window_days), 1.0)
And this produces and logs scores like this:
{
"keyword": "ai crm",
"geo": "US",
"volatility_score": 0.35,
"window_days": 7,
"change_count": 25,
"timestamp": "2026-01-07T12:00:00Z"
}
This says: 25 changes in 7 days, against a cap of 70 (10 changes per day × 7 days), works out to a volatility of about 0.36 (moderately volatile).
Is this the best volatility metric? Probably not. But it's a starting point. You could weight by change type (featured snippet changes are more significant than domain entries), factor in change magnitude (position shifts vs entries/exits), or normalize by keyword search volume.
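As a sketch of the "weight by change type" idea, here's a hypothetical variant. The weights are arbitrary and purely illustrative, not what the consumer above implements; recent_events has the same shape as in _calculate_volatility.
# hypothetical weighted volatility: feature changes count more than entries/exits
EVENT_WEIGHTS = {
    "serp_feature_added": 2.0,
    "serp_feature_removed": 1.5,
    "organic_entry": 1.0,
    "organic_exit": 1.0,
}

def weighted_volatility(recent_events, window_days, max_weight_per_day=10.0):
    """sum event weights over the window and normalize to a 0.0-1.0 score"""
    total = sum(
        EVENT_WEIGHTS.get(e['event']['event_type'], 1.0) for e in recent_events
    )
    return round(min(total / (max_weight_per_day * window_days), 1.0), 2)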
The point is this consumer can evolve independently. You can improve the volatility algorithm without touching the producer or other consumers.
Each consumer runs independently:
# Terminal 1: SEO Alerts
python consumers/seo_alert_consumer.py
# Terminal 2: Competitive Intelligence
python consumers/competitive_intel_consumer.py
# Terminal 3: Volatility Analyzer
python consumers/volatility_analyzer_consumer.py
All three consumers read from the same Kafka topic (serp-changes), but:
- each maintains its own offset, so they can process at different speeds
- if one crashes, the others continue processing
- each writes to its own output file/log
- if you restart a consumer, it resumes from its last committed offset
What’s The Payoff?
When you put all three consumers together, you stop asking “what rank are we today?” and start answering:
- How competitive is this market right now?
- Is Google monetizing [insert query here] more aggressively?
- Are our competitors being displaced or reinforced?
- Is this SERP worth investing in, or is it too volatile?
- Are competitors winning because of strategy, or because of SERP structure?
See the shift? You’re no longer tracking positions. You’re observing market dynamics.
And because this is event-driven, every insight is replayable, every consumer can evolve independently, and every new question you want answered is just another consumer you have to write.


