Prithwish Nath

Posted on • Originally published at Medium

Turning Google Search into a Kafka Event Stream for Many Consumers

TL;DR: Google rankings are a bad abstraction for how Google’s results page actually behaves. This is an event-driven approach to monitoring SERP changes: tracking features, entries, and exits instead of noisy rank movements.

The SERP (Google’s “search engine results page”) is what humans actually see when they type in a search and hit enter. It shows ads, featured snippets/AI overviews, carousels, and knowledge panels long before the organic search results. In fact, in 2026, raw organic results increasingly feel like a fallback rather than the main event. You can sit at position three for months and still lose traffic overnight because Google introduced a new ad or feature above you.

That’s the core problem with naïve rank tracking. It assumes the environment is static and your position within it is the only variable. In practice, the changes Google makes to the results page itself often matter more (for market intelligence, among other things) than whether you moved up or down a position.

If you’re monitoring Google for your brand and care about traffic risk, competitive pressure, or acquisition costs, these are just some of the things that can happen:

  • Google adds ads → organic CTR drops
  • Google launches a featured snippet → one domain captures disproportionate attention
  • A new competitor enters the page → your share of attention changes even if your rank doesn’t

None of this is well represented by a line chart of rankings. I couldn’t find many blogs that discuss this specifically without slipping into generic SEO optimization garbage, so I figured I’d write the one I wish I’d found. 🤷‍♂️ Let’s get down to it.

Why Kafka?

As long as you’re answering one question for one person, you don’t need Kafka. A cron job plus Postgres works just fine.

The moment that stops being enough is when you want any combination of: history, alerts, multiple downstream consumers, or independent evolution of logic over time. Anything that takes this beyond a simple one-off. That’s usually where people keep bolting features onto a single script until it’s doing scraping, diffing, alerting, analytics, and reporting all at once.

That path leads to systems that are hard to reason about and harder to change. How do we avoid this?

Think about the basic unit of information you care about, and design around that. In this case, our “unit” is change, not snapshots. Because we want to answer “what happened over this period of X?”

So why use Kafka? It’s just the most boring (battle tested, well documented, not flashy) tech I could think of that accomplishes it. These are the benefits:

  1. Event replay. If you build a new consumer next month, you can replay the last 7 days of SERP changes (or however long you configure retention). Your historical data isn’t trapped in database rows — it’s a stream you can re-process.
  2. Independent consumers. Each consumer has its own offset. Your alerting system can crash and restart without affecting your analytics pipeline. They process events at their own pace.
  3. Ordering guarantees. Events for the same keyword go to the same partition (we use {keyword}:{geo} as the key), and Kafka preserves order within a partition. A consumer will never see a “domain entered” event before the “featured snippet appeared” event that was produced ahead of it.
  4. Backpressure handling. If your volatility analyzer is slow, Kafka doesn’t care. Events queue up and the producer keeps producing.
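
To make the replay, per-consumer offset, and key-based ordering ideas above concrete, here is a toy in-memory model. It is only a sketch: ToyLog and its methods are hypothetical names, not Kafka's API, and crc32 stands in for the murmur2 hash that Kafka's default partitioner applies to the key.

```python
import zlib
from collections import defaultdict


class ToyLog:
    """In-memory stand-in for a partitioned, append-only topic (NOT Kafka's API)."""

    def __init__(self, num_partitions: int = 3):
        self.partitions = [[] for _ in range(num_partitions)]
        # one offset per (consumer group, partition), tracked independently
        self.offsets = defaultdict(int)

    def partition_for(self, key: str) -> int:
        # assumption: crc32 is only a deterministic stand-in for Kafka's murmur2
        return zlib.crc32(key.encode("utf-8")) % len(self.partitions)

    def send(self, key: str, value: dict) -> int:
        # same key -> same partition -> append order is preserved
        partition = self.partition_for(key)
        self.partitions[partition].append(value)
        return partition

    def poll(self, group: str, partition: int) -> list:
        # return everything this group has not read yet, then advance its offset
        start = self.offsets[(group, partition)]
        records = self.partitions[partition][start:]
        self.offsets[(group, partition)] = len(self.partitions[partition])
        return records

    def rewind(self, group: str, partition: int) -> None:
        # replay: move one group's offset back without touching other groups
        self.offsets[(group, partition)] = 0


log = ToyLog()
key = "ai crm:US"
p1 = log.send(key, {"event_type": "serp_feature_added", "feature": "featured_snippet"})
p2 = log.send(key, {"event_type": "organic_entry", "domain": "hubspot.com"})

alerts_batch = log.poll("alerts", p1)        # the alerting consumer reads both events
analytics_batch = log.poll("analytics", p1)  # analytics reads the same events on its own offset
log.rewind("alerts", p1)
replayed = log.poll("alerts", p1)            # alerts re-processes history; analytics is unaffected
```

The real broker adds durability, retention, and consumer-group rebalancing on top, but the mental model (an append-only log per partition plus one cursor per consumer group) is the same.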


Is setting up Kafka more complex than a database? Yep. Is it worth it? At this level, you can’t really afford to go with anything else.

What We’re Building

The architecture is straightforward:

We continuously monitor a set of keywords via a SERP API (I’m using Bright Data), detect when the structure changes (ads appear, featured snippets shift, domains enter/exit), emit those changes (and only the changes — we have to maintain some form of state to spot the delta) as events into Kafka, and fan them out to multiple downstream consumers who can then log or store the data as they like.

Prerequisites

Before we start building, you need Python dependencies and Kafka infrastructure running.

> pip install requests python-dotenv kafka-python

Next, this setup needs ZooKeeper and a Kafka broker. Instead of installing them manually, we use Docker Compose with this YAML file.

💡 KRaft (Kafka without ZooKeeper) has been production-ready since Kafka 3.3, and Kafka 4.0 removes ZooKeeper mode entirely.

I’m sticking with what I’m familiar with here, but if you’re setting up a modern cluster — or using an AI agent to generate your setup — it may use KRaft instead of ZooKeeper by default. Both approaches work fine for our use case, and we treat ZooKeeper as largely set-and-forget anyway.

version: '3.8'  

services:  
  zookeeper:  
    image: confluentinc/cp-zookeeper:7.5.0  
    hostname: zookeeper  
    container_name: zookeeper  
    ports:  
      - "2181:2181"  
    environment:  
      ZOOKEEPER_CLIENT_PORT: 2181  
      ZOOKEEPER_TICK_TIME: 2000  

  kafka:  
    image: confluentinc/cp-kafka:7.5.0  
    hostname: kafka  
    container_name: kafka  
    depends_on:  
      - zookeeper  
    ports:  
      - "9092:9092"  
      - "9101:9101"  
    environment:  
      KAFKA_BROKER_ID: 1  
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'  
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT  
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092  
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1  
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1  
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0  
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'  
      # retention: 7 days  
      KAFKA_LOG_RETENTION_HOURS: 168  
      KAFKA_LOG_RETENTION_BYTES: 1073741824  



Then, run it with

> docker-compose up -d

This starts:

  • Zookeeper (at port 2181) — coordinates Kafka cluster
  • Kafka Broker (at port 9092) — stores and distributes events

Finally, for the SERP API, get your API key and zone name from your Bright Data account and put them in a .env file in the project root like so.

BRIGHT_DATA_API_KEY=your_api_key_here  
BRIGHT_DATA_ZONE=your_zone_name_here  
BRIGHT_DATA_COUNTRY=us

That’s it. You’re ready to start building.
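
If you want to confirm that before writing any pipeline code, a quick preflight check along these lines can help. It is a minimal sketch: missing_settings and broker_reachable are hypothetical helper names, and the check only proves that something accepts TCP connections on the broker port, not that Kafka is healthy.

```python
import os
import socket

# the two settings the client in Step 1 refuses to start without
REQUIRED_SETTINGS = ("BRIGHT_DATA_API_KEY", "BRIGHT_DATA_ZONE")


def missing_settings(env=None) -> list:
    """Return the names of required settings that are unset or empty.

    Reads os.environ by default; call load_dotenv() from python-dotenv first
    if the values live in a .env file.
    """
    env = os.environ if env is None else env
    return [name for name in REQUIRED_SETTINGS if not env.get(name)]


def broker_reachable(host: str = "localhost", port: int = 9092, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Calling missing_settings() and broker_reachable() once at startup turns a confusing mid-pipeline failure into an obvious message up front.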

Step 1: Fetching SERP Results

So we can’t scrape Google directly. Google blocks automated requests, and residential/datacenter proxies don’t work for search. The solution is to use a SERP API. These return structured JSON of Google’s results page, including organic results, ads, featured snippets, video carousels, and more.

For reusability, we’ll first build a client that wraps the SERP API:


import os  
import requests  
from typing import Dict, Any, Optional  
from dotenv import load_dotenv  

load_dotenv()  


class BrightDataClient:  
    """  
    Reusable client for Bright Data SERP API.  
    """  

    def __init__(
        self,
        api_key: Optional[str] = None,
        zone: Optional[str] = None,
        country: Optional[str] = None,
    ):
        # load from environment variables if not provided  
        env_api_key = os.getenv("BRIGHT_DATA_API_KEY")  
        env_zone = os.getenv("BRIGHT_DATA_ZONE")  
        env_country = os.getenv("BRIGHT_DATA_COUNTRY")  

        # use provided values or fall back to environment variables  
        self.api_key = api_key or env_api_key  
        self.zone = zone or env_zone  
        self.country = country or env_country  
        self.api_endpoint = "https://api.brightdata.com/request"  

        if not self.api_key:  
            raise ValueError(  
                "BRIGHT_DATA_API_KEY must be provided via constructor or environment variable. "  
                "Get your API key from: https://brightdata.com/cp/setting/users"  
            )  

        if not self.zone:  
            raise ValueError(  
                "BRIGHT_DATA_ZONE must be provided via constructor or environment variable. "  
                "Manage zones at: https://brightdata.com/cp/zones"  
            )  

        # setup session with API authentication  
        self.session = requests.Session()  
        self.session.headers.update({  
            'Content-Type': 'application/json',  
            'Authorization': f'Bearer {self.api_key}'  
        })  

    def search(
        self,
        query: str,
        num_results: int = 10,
        language: Optional[str] = None,
        country: Optional[str] = None,
    ) -> Dict[str, Any]:
        """  
        Executes a google search.  
        Args:  
            query: Search query string  
            num_results: Number of results to return (default: 10)  
            language: Language code (e.g., 'en', 'es', 'fr')  
            country: Country code (e.g., 'us', 'uk', 'ca') - overrides instance default  

        Returns:  
            Dictionary containing search results in JSON format  
        """  
        # build Google search URL  
        search_url = (  
            f"https://www.google.com/search"  
            f"?q={requests.utils.quote(query)}"  
            f"&num={num_results}"  
            f"&brd_json=1"  
        )  

        if language:  
            search_url += f"&hl={language}&lr=lang_{language}"  

        # use method parameter country or instance default  
        target_country = country or self.country  

        # prepare request payload for SERP API  
        payload = {  
            'zone': self.zone,  
            'url': search_url,  
            'format': 'json'  
        }  

        if target_country:  
            payload['country'] = target_country  

        try:  
            # use POST request to SERP API endpoint  
            response = self.session.post(  
                self.api_endpoint,  
                json=payload,  
                timeout=30  
            )  
            response.raise_for_status()  

            # this SERP API returns JSON directly when format='json' so just return it  
            return response.json()  

        except requests.exceptions.HTTPError as e:  
            error_msg = f"Search request failed with HTTP {e.response.status_code}"  
            if e.response.text:  
                error_msg += f": {e.response.text[:200]}"  
            raise RuntimeError(error_msg) from e  
        except requests.exceptions.RequestException as e:  
            raise RuntimeError(f"Search request failed: {e}") from e

As a preview, here’s how we use it later.

from src.bright_data import BrightDataClient  
# initialize client (reads credentials from .env file)  
client = BrightDataClient()  
# fetch SERP for a keyword  
raw_response = client.search(  
  query="ai crm",  
  num_results=10,  
  country="us"  
)  
# raw_response now contains the full SERP structure

There’s just one problem — the raw Bright Data response you get back from this API is way too verbose:

{  
  "general": {  
    "search_engine": "google",  
    "query": "ai crm",  
    "language": "en",  
    "mobile": false,  
    "basic_view": false,  
    "search_type": "text",  
    "page_title": "ai crm - Google Search",  
    "timestamp": "2026-01-07T11:18:35.216Z"
  },  
  "input": {  
    "original_url": "https://www.google.com/search?q=ai+crm&brd_json=1&gl=us",  
    "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)...",  
    "request_id": "some_request_id"  
  },  
  "navigation": [ /* 6 navigation links */ ],  
  "organic": [  
    {  
      "link": "https://attio.com/",  
      "source": "Attio",  
      "display_link": "https://attio.com",  
      "title": "Attio: The next gen of CRM",  
      "description": "Execute your revenue strategy with precision…",  
      "rank": 1,  
      "global_rank": 1,  
      "extensions": [ /* site links, dates, etc */ ]  
    },  
    // ...8 more results with full metadata  
  ],  
  "images": [ /* image results */ ],  
  "top_ads": [{}, {}],  
  "bottom_ads": [{}, {}, {}],  
  "pagination": { /* pagination links */ },  
  "related": [ /* 8 related searches */ ]  
}

That’s 350+ lines of JSON for a single search, and most of it is noise. For change detection, we only care about:

  1. Which domains are present (not full URLs, titles, descriptions)
  2. Which features exist (ads, featured snippets, video carousels) — boolean flags
  3. When this snapshot was taken (timestamp)
  4. What keyword/geo this is for (context)

Everything else — navigation links, user agents, request IDs, pagination, related searches, full result metadata — is irrelevant for detecting changes.

So let’s normalize it into a much leaner format for consumption.

Step 2: Why Normalization Matters

So we transform the verbose raw response into a compact snapshot:

"""  
/src/normalizer.py   

SERP data normalization  
converts raw Bright Data response to structured snapshot format  
"""  

from datetime import datetime  
from urllib.parse import urlparse  
from typing import Dict, Any  


def extract_domain(url: str) -> str:  
    """extract domain from URL"""  
    try:  
        parsed = urlparse(url)  
        domain = parsed.netloc or parsed.path  
        # remove www. prefix  
        if domain.startswith('www.'):  
            domain = domain[4:]  
        return domain  
    except Exception:  
        return url  


def normalize_serp_data(raw_response: Dict[str, Any], keyword: str, geo: str = "US") -> Dict[str, Any]:  
    """  
    normalize SERP response into structured format  

    Args:  
        raw_response: raw JSON response from Bright Data SERP API  
        keyword: search keyword  
        geo: geographic location code (default: "US")  

    Returns:  
        normalized dictionary with structured SERP data  
    """  

    # extract organic domains  
    organic_domains = []  
    if raw_response.get('organic') and isinstance(raw_response['organic'], list):  
        for result in raw_response['organic']:  
            link = result.get('link') or result.get('url', '')  
            if link:  
                domain = extract_domain(link)  
                if domain:  
                    organic_domains.append(domain)  

    # check for ads (top_ads, bottom_ads, or a generic ads list)
    # missing/null keys become empty lists, and empty entries are not counted
    top_ads = [a for a in (raw_response.get('top_ads') or []) if a]
    bottom_ads = [a for a in (raw_response.get('bottom_ads') or []) if a]
    other_ads = [a for a in (raw_response.get('ads') or []) if a]
    # count every ad source so has_ads and ads_count can never disagree
    ads_count = len(top_ads) + len(bottom_ads) + len(other_ads)
    has_ads = ads_count > 0

    # check for featured snippet (knowledge panel)  
    has_featured_snippet = bool(raw_response.get('knowledge'))  

    # check for video carousel  
    has_video_carousel = bool(raw_response.get('video') and len(raw_response.get('video', [])) > 0)  

    # check for people also ask  
    has_people_also_ask = bool(raw_response.get('people_also_ask') and len(raw_response.get('people_also_ask', [])) > 0)  

    # extract featured snippet owner if available  
    featured_snippet_owner = None  
    if has_featured_snippet:  
        knowledge = raw_response.get('knowledge', {})  
        # try to extract owner from knowledge panel  
        if knowledge.get('url'):  
            featured_snippet_owner = extract_domain(knowledge['url'])  
        elif knowledge.get('source'):  
            featured_snippet_owner = extract_domain(knowledge['source'])  

    # build normalized structure  
    normalized = {  
        "keyword": keyword,  
        "geo": geo,  
        "features": {  
            "ads": has_ads,  
            "ads_count": ads_count if has_ads else 0,  
            "featured_snippet": has_featured_snippet,  
            "featured_snippet_owner": featured_snippet_owner,  
            "video_carousel": has_video_carousel,  
            "people_also_ask": has_people_also_ask  
        },  
        "organic_domains": organic_domains,  
        "timestamp": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")  
    }  

    return normalized  


Here, we extract just the domain names, not full URLs (i.e. “https://www.salesforce.com/crm/” becomes “salesforce.com”). Why? Because:

  • URLs change (query params, paths) but domains are stable identifiers
  • We care about which sites are ranking, not specific pages
  • Domain-level tracking is more resilient to URL structure changes
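
A quick worked example of that reduction, using a compact copy of extract_domain from the normalizer. The sample URLs are illustrative only. Note one assumption baked into this approach: subdomains are kept as distinct identifiers, so collapsing them to a registrable domain would need a public suffix list, which this sketch does not attempt.

```python
from urllib.parse import urlparse


def extract_domain(url: str) -> str:
    """Host name without a leading 'www.' (same logic as the normalizer)."""
    parsed = urlparse(url)
    # bare inputs like "attio.com" have no netloc, so fall back to the path
    domain = parsed.netloc or parsed.path
    return domain[4:] if domain.startswith("www.") else domain


samples = [
    "https://www.salesforce.com/crm/",
    "https://www.salesforce.com/products/?utm_source=x",
    "https://blog.hubspot.com/sales/ai-crm",
]
domains = [extract_domain(u) for u in samples]
# the first two collapse to "salesforce.com"; the third stays "blog.hubspot.com"
```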

We also convert complex nested structures into simple booleans:

# Raw:   
{  
  "top_ads": [{...}, {...}],   
  "bottom_ads": []  
}  
# Normalized:   
{  
  "ads": true,   
  "ads_count": 2  
}

This makes some of the comparisons we’ll perform later trivial:

if current['features']['ads'] and not previous['features']['ads']:
    # Ads appeared! Emit a signal.
    ...

Finally, when a featured snippet exists, we extract which domain owns it:

if has_featured_snippet:
    knowledge = raw_response.get('knowledge', {})
    if knowledge.get('url'):
        featured_snippet_owner = extract_domain(knowledge['url'])

This lets us track when featured snippet ownership changes — a critical competitive signal.

Before vs After

Raw response: 350+ lines, ~15 KB. Normalized snapshot: 22 lines, ~500 bytes.

This way, we store roughly 30x less data per snapshot, and comparisons are much faster because we’re comparing flat lists of domains instead of deep object structures.

Step 3: Change Detection — Turning Snapshots into Events

At this point we have normalized SERP snapshots. But snapshots alone aren’t very useful for Kafka. We’ve talked about this before — Kafka wants events, not states.

So the job of Step 3 is simple:

Compare the current snapshot with the previous one and emit only what changed, i.e. the delta.

"""  
/src/change_detector.py  
change detection logic for SERP snapshots  
compares current vs previous state and emits change events  
"""  

from typing import Dict, Any, List, Optional  
from datetime import datetime  


class ChangeDetector:  
    """  
    detects changes between SERP snapshots  
    generates change events for Kafka  
    """  

    def detect_changes(
        self,
        current: Dict[str, Any],
        previous: Optional[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """  
        compare current and previous snapshots, return list of change events  

        Args:  
            current: current normalized SERP snapshot  
            previous: previous normalized SERP snapshot (None if first run)  

        Returns:  
            list of change event dictionaries  
        """  
        events = []  

        if previous is None:  
            # first run - no changes to detect  
            return events  

        # detect feature changes  
        events.extend(self._detect_feature_changes(current, previous))  

        # detect organic domain changes  
        events.extend(self._detect_organic_changes(current, previous))  

        return events  

    def _detect_feature_changes(
        self,
        current: Dict[str, Any],
        previous: Dict[str, Any],
    ) -> List[Dict[str, Any]]:
        """detect changes in SERP features (ads, featured snippet, etc.)"""  
        events = []  

        current_features = current.get('features', {})  
        previous_features = previous.get('features', {})  

        feature_types = ['ads', 'featured_snippet', 'video_carousel', 'people_also_ask']  

        for feature in feature_types:  
            current_value = current_features.get(feature, False)  
            previous_value = previous_features.get(feature, False)  

            # feature appeared  
            if current_value and not previous_value:  
                event = {  
                    "event_type": "serp_feature_added",  
                    "keyword": current['keyword'],  
                    "geo": current['geo'],  
                    "feature": feature,  
                    "timestamp": current['timestamp']  
                }  

                # add feature-specific metadata  
                if feature == "ads":  
                    # count ads if available  
                    event["count"] = current.get('features', {}).get('ads_count', 1)  
                elif feature == "featured_snippet":  
                    # extract owner domain if available  
                    owner = current.get('features', {}).get('featured_snippet_owner')  
                    if owner:  
                        event["owner_domain"] = owner  

                events.append(event)  

            # feature disappeared  
            elif not current_value and previous_value:  
                event = {  
                    "event_type": "serp_feature_removed",  
                    "keyword": current['keyword'],  
                    "geo": current['geo'],  
                    "feature": feature,  
                    "timestamp": current['timestamp']  
                }  
                events.append(event)  

        return events  

    def _detect_organic_changes(
        self,
        current: Dict[str, Any],
        previous: Dict[str, Any],
    ) -> List[Dict[str, Any]]:
        """detect changes in organic domain rankings"""  
        events = []  

        current_domains = set(current.get('organic_domains', []))  
        previous_domains = set(previous.get('organic_domains', []))  

        # domains that exited (were in previous, not in current)  
        exited = previous_domains - current_domains  
        for domain in exited:  
            # find previous position  
            previous_position = self._get_domain_position(domain, previous)  
            event = {  
                "event_type": "organic_exit",  
                "keyword": current['keyword'],  
                "geo": current['geo'],  
                "domain": domain,  
                "previous_position": previous_position,  
                "timestamp": current['timestamp']  
            }  
            events.append(event)  

        # domains that entered (in current, not in previous)  
        entered = current_domains - previous_domains  
        for domain in entered:  
            # find current position  
            current_position = self._get_domain_position(domain, current)  
            event = {  
                "event_type": "organic_entry",  
                "keyword": current['keyword'],  
                "geo": current['geo'],  
                "domain": domain,  
                "current_position": current_position,  
                "timestamp": current['timestamp']  
            }  
            events.append(event)  

        return events  


    def _get_domain_position(
        self,
        domain: str,
        snapshot: Dict[str, Any],
    ) -> Optional[int]:
        """get position of domain in organic results (1-indexed)"""  
        domains = snapshot.get('organic_domains', [])  
        try:  
            return domains.index(domain) + 1  
        except ValueError:  
            return None  



This detector compares two normalized snapshots and emits only the delta. Because we normalized SERP features into simple booleans in Step 2, detecting changes becomes trivial.

# this avoids deep JSON comparisons + false positives from minor SERP variations
if now and not before:
    # feature appeared
    ...

Also, for organic results, we intentionally ignore position changes. Rank reshuffles inside the top 10 happen constantly and aren’t very actionable. A domain entering or disappearing from the SERP is what’s actually interesting. Because organic results were normalized into a flat list of domains, this becomes a set comparison: previous - current gives the domains that exited, and current - previous gives the domains that entered. Set membership checks are O(1) on average, so the full comparison is O(n) in the number of domains.
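
Here is that set comparison worked through on two small snapshots. The domain lists are made-up sample data, and the position helper is a hypothetical stand-in for _get_domain_position. Sorting the sets before building events is an extra step not in the detector above, added here so the event order is stable from run to run.

```python
previous = ["attio.com", "salesforce.com", "hubspot.com", "zoho.com"]
current = ["salesforce.com", "attio.com", "hubspot.com", "pipedrive.com"]

exited = set(previous) - set(current)    # in previous, not in current
entered = set(current) - set(previous)   # in current, not in previous


def position(domain, domains):
    """1-indexed position of a domain in a snapshot's list, or None if absent."""
    return domains.index(domain) + 1 if domain in domains else None


events = [
    {"event_type": "organic_exit", "domain": d, "previous_position": position(d, previous)}
    for d in sorted(exited)
] + [
    {"event_type": "organic_entry", "domain": d, "current_position": position(d, current)}
    for d in sorted(entered)
]
# attio.com and salesforce.com swapped places, but a reshuffle produces no event
```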

Each run produces zero or more small events like:

{  
  "event_type": "serp_feature_added",  
  "keyword": "ai crm",  
  "geo": "US",  
  "feature": "ads",  
  "count": 3,  
  "timestamp": "2026-01-07T12:00:00Z"  
},  
{  
  "event_type": "serp_feature_added",  
  "keyword": "ai crm",  
  "geo": "US",  
  "feature": "featured_snippet",  
  "owner_domain": "hubspot.com",  
  "timestamp": "2026-01-07T12:00:00Z"  
}

Instead of storing “what the SERP looks like now”, we’re now storing:

  • what happened
  • when it happened
  • for which keyword + geo it happened

When relevant, we also attach extra metadata (ad count, featured snippet owner) so downstream consumers don’t need to re-hydrate context.

This is the exact shape Kafka wants. At this point, SERP data has stopped being scraped snapshots for us, and is behaving like a true event stream.
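
One change the detector above does not emit is a featured snippet that stays on the page but changes hands, since the boolean flag is true in both snapshots. A minimal sketch of how that could be added follows. The function name, the event type "featured_snippet_owner_changed", and the previous_owner field are all hypothetical, not part of the original detector.

```python
from typing import Any, Dict, Optional


def detect_snippet_owner_change(
    current: Dict[str, Any],
    previous: Dict[str, Any],
) -> Optional[Dict[str, Any]]:
    """Return an owner-change event, or None if ownership did not change."""
    cur = current.get("features", {})
    prev = previous.get("features", {})

    # appearance/disappearance is already covered by serp_feature_added/removed,
    # so only look at the case where the snippet exists in both snapshots
    if not (cur.get("featured_snippet") and prev.get("featured_snippet")):
        return None

    old_owner = prev.get("featured_snippet_owner")
    new_owner = cur.get("featured_snippet_owner")
    if old_owner == new_owner:
        return None

    return {
        "event_type": "featured_snippet_owner_changed",  # hypothetical event type
        "keyword": current["keyword"],
        "geo": current["geo"],
        "previous_owner": old_owner,
        "owner_domain": new_owner,
        "timestamp": current["timestamp"],
    }


before = {
    "keyword": "ai crm", "geo": "US", "timestamp": "2026-01-07T11:00:00Z",
    "features": {"featured_snippet": True, "featured_snippet_owner": "hubspot.com"},
}
after = {
    "keyword": "ai crm", "geo": "US", "timestamp": "2026-01-07T12:00:00Z",
    "features": {"featured_snippet": True, "featured_snippet_owner": "salesforce.com"},
}
owner_event = detect_snippet_owner_change(after, before)
```

If you adopt something like this, the natural place to call it is alongside the existing feature checks, so the event lands in the same keyword partition as the others.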

Before we start the pipeline, we should cover state management, briefly.

Step 4: A Simple State Manager

We’ll use simple file-based storage:

"""  
/src/state_manager.py  
state management for SERP snapshots  
stores previous state to enable change detection  
"""  
import json
from typing import Dict, Any, Optional
from pathlib import Path


class StateManager:
    """  
    manages SERP snapshot state storage  
    stores previous snapshots to compare against current ones  
    """  

    def __init__(self, state_dir: str = "state"):  
        self.state_dir = Path(state_dir)  
        # parents=True so nested paths like "data/state" also work
        self.state_dir.mkdir(parents=True, exist_ok=True)

    def _get_state_file(self, keyword: str, geo: str) -> Path:  
        """get state file path for keyword+geo combination"""  
        safe_keyword = keyword.replace(" ", "_").lower()  
        safe_geo = geo.lower()  
        filename = f"{safe_keyword}_{safe_geo}.json"  
        return self.state_dir / filename  

    def load_previous_state(
        self,
        keyword: str,
        geo: str,
    ) -> Optional[Dict[str, Any]]:
        """  
        load previous SERP snapshot for keyword+geo  

        Returns:  
            previous snapshot dict or None if no previous state exists  
        """  
        state_file = self._get_state_file(keyword, geo)  

        if not state_file.exists():  
            return None  

        try:  
            with open(state_file, 'r', encoding='utf-8') as f:  
                return json.load(f)  
        except Exception as e:  
            print(f"Warning: failed to load state from {state_file}: {e}")  
            return None  

    def save_state(
        self,
        keyword: str,
        geo: str,
        snapshot: Dict[str, Any],
    ) -> None:
        """  
        save current SERP snapshot as new state  

        Args:  
            keyword: search keyword  
            geo: geographic location code  
            snapshot: normalized SERP snapshot to save  
        """  
        state_file = self._get_state_file(keyword, geo)  

        try:  
            with open(state_file, 'w', encoding='utf-8') as f:  
                json.dump(snapshot, f, indent=2, ensure_ascii=False)  
        except Exception as e:  
            print(f"Error: failed to save state to {state_file}: {e}")  
            raise

So we’ll store our state files like this:

./state/  
  ├── ai_crm_us.json  
  ├── best_crm_us.json  
  └── ai_crm_uk.json

Each state file contains the last normalized snapshot for that keyword + geo combination, as you’ll see when we bring it all together.
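
One caveat with plain file writes: if the process dies halfway through a write, the state file can be left truncated, and the next run will treat it as "no previous state". A common way around this is to write to a temporary file and rename it into place. The sketch below assumes that approach; save_state_atomically is a hypothetical helper, not part of the StateManager above.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict


def save_state_atomically(state_file: Path, snapshot: Dict[str, Any]) -> None:
    """Write the snapshot to a temp file, then rename it over the target."""
    state_file.parent.mkdir(parents=True, exist_ok=True)
    # the temp file must live in the same directory (same filesystem) as the target
    fd, tmp_name = tempfile.mkstemp(dir=str(state_file.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(snapshot, f, indent=2, ensure_ascii=False)
        # os.replace swaps the file in one step, so readers see old or new, never half
        os.replace(tmp_name, state_file)
    finally:
        # only still present if something failed before the rename
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
```

For a single producer on one machine this is usually enough. Once several producers share state, a database or a compacted Kafka topic is a better home for it.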

Step 5: The Producer Pipeline (Tying It All Together)

At this point we have all the individual pieces:

  • SERP fetching
  • Snapshot normalization
  • Change detection
  • State management

Step 5 is where they come together into a single producer pipeline, which also adds the last missing piece: Kafka event emission.

This producer fetches the latest SERP → normalizes it → compares it to the previous snapshot → emits only the detected changes (the delta) to Kafka → persists state for the next run.

"""  
/src/producer.py  

Kafka producer for SERP change events  
fetches SERP data, detects changes, and emits events to Kafka  
"""  

import time  
import json  
from typing import Dict, Any, List  
from kafka import KafkaProducer  
from kafka.errors import KafkaError  

from bright_data import BrightDataClient  
from normalizer import normalize_serp_data  
from state_manager import StateManager  
from change_detector import ChangeDetector  


class SERPProducer:  
    """  
    Kafka producer that monitors SERP changes and emits them as events to Kafka
    """  

    def __init__(
        self,
        kafka_brokers: str = "localhost:9092",
        state_dir: str = "state",
    ):
        self.producer = KafkaProducer(  
            bootstrap_servers=kafka_brokers,  
            # convert because KafkaProducer expects callable serializers.  
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),  
            key_serializer=lambda k: k.encode('utf-8') if k else None  
        )  
        self.bright_data = BrightDataClient()  
        self.state_manager = StateManager(state_dir)  
        self.change_detector = ChangeDetector()  

    def process_keyword(
        self,
        keyword: str,
        geo: str = "US",
        num_results: int = 10,
    ) -> List[Dict[str, Any]]:
        """  
        process a single keyword: fetch, compare, emit changes  

        Args:  
            keyword: search keyword to monitor  
            geo: geographic location code  
            num_results: number of results to fetch  

        Returns:  
            list of change events emitted  
        """  
        print(f"Processing keyword: {keyword} (geo: {geo})")  

        # fetch current SERP  
        print("  Fetching current SERP...")  
        try:  
            raw_response = self.bright_data.search(  
                query=keyword,  
                num_results=num_results,  
                country=geo.lower()  
            )  
        except Exception as e:  
            print(f"  Error fetching SERP: {e}")  
            return []  

        # normalize current snapshot  
        current_snapshot = normalize_serp_data(raw_response, keyword, geo)  
        print(f"  Current snapshot: {len(current_snapshot['organic_domains'])} domains")  

        # load previous state  
        previous_snapshot = self.state_manager.load_previous_state(keyword, geo)  

        if previous_snapshot:  
            print(f"  Previous snapshot: {len(previous_snapshot['organic_domains'])} domains")  
        else:  
            print("  No previous state found (first run)")  

        # detect changes  
        change_events = self.change_detector.detect_changes(  
            current_snapshot,  
            previous_snapshot  
        )  

        print(f"  Detected {len(change_events)} change(s)")  

        # emit events to Kafka  
        emitted_events = []  
        for event in change_events:  
            try:  
                # use keyword+geo as key for partitioning  
                key = f"{keyword}:{geo}"  
                topic = "serp-changes"                  
                future = self.producer.send(topic, key=key, value=event)  
                # wait for confirmation (optional - can be async)  
                record_metadata = future.get(timeout=10)  

                print(f"    Emitted: {event['event_type']} -> partition {record_metadata.partition}, offset {record_metadata.offset}")  
                emitted_events.append(event)  
            except KafkaError as e:  
                print(f"    Error emitting event: {e}")  

        # save current state as new previous state  
        self.state_manager.save_state(keyword, geo, current_snapshot)  
        print("  State saved")  

        return emitted_events  

    def run_monitoring_loop(
        self,
        keywords: List[Dict[str, str]],
        interval_seconds: int = 1800,  # 30 minutes default
    ):
        """  
        run continuous monitoring loop  

        Args:  
            keywords: list of dicts with 'keyword' and 'geo' keys  
            interval_seconds: seconds between monitoring cycles  
        """  
        print(f"Starting monitoring loop (interval: {interval_seconds}s)")  
        print(f"Tracking {len(keywords)} keyword(s)")  

        try:  
            while True:  
                print("\n" + "=" * 60)
                print(f"Monitoring cycle at {time.strftime('%Y-%m-%d %H:%M:%S')}")  
                print("=" * 60)  

                for kw_config in keywords:  
                    keyword = kw_config['keyword']  
                    geo = kw_config.get('geo', 'US')  

                    try:  
                        self.process_keyword(keyword, geo)  
                        # small delay between keywords  
                        time.sleep(2)  
                    except Exception as e:  
                        print(f"Error processing {keyword}: {e}")  

                print(f"\nSleeping for {interval_seconds} seconds...")
                time.sleep(interval_seconds)  

        except KeyboardInterrupt:  
            print("\nShutting down...")
            self.producer.close()  


def main():  
    """example usage"""  
    producer = SERPProducer()  

    # example: monitor single keyword  
    keywords = [  
        {"keyword": "ai crm", "geo": "US"}  
    ]  

    # run once (for testing)  
    # producer.process_keyword("ai crm", "US")  

    # or run continuous loop  
    producer.run_monitoring_loop(keywords, interval_seconds=3600)  # 1 hour  


if __name__ == "__main__":  
    main()  



Notice the synchronous Kafka sends (future.get(timeout=10))? This is intentional. We’re not optimizing for throughput; we’re monitoring a handful of keywords every 30 minutes. I’d rather block for up to 10 seconds and know the event was written than fail silently and lose a critical alert.

For a high-throughput system, you’d batch events and send asynchronously.
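
As a rough sketch of that asynchronous variant, assuming `producer` behaves like the kafka-python KafkaProducer used above: send() returns a future immediately, add_errback() registers a failure handler, and a single flush() waits for the whole batch. The function name `emit_batch_async` and the `failures` list are illustrative and not part of the original pipeline.

```python
from typing import Any, Dict, List, Tuple


def emit_batch_async(
    producer: Any,
    topic: str,
    key: str,
    events: List[Dict[str, Any]],
) -> List[Tuple[Dict[str, Any], Exception]]:
    """Queue every event without blocking, then wait once for the batch.

    `producer` is assumed to behave like kafka-python's KafkaProducer:
    send() returns a future that supports add_errback(), and flush()
    blocks until all buffered records have been sent or have failed.
    """
    failures: List[Tuple[Dict[str, Any], Exception]] = []

    for event in events:
        future = producer.send(topic, key=key, value=event)
        # bind `event` as a default argument so each errback reports its own event
        future.add_errback(lambda exc, ev=event: failures.append((ev, exc)))

    # one blocking call for the whole batch instead of one per event
    producer.flush()
    return failures
```

The trade-off is that a failure is only discovered after the batch has been flushed, so the caller has to decide what to do with the returned `failures` (retry, log, or hold back the state save).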

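Also, note the Kafka partitioning key we use: key = f"{keyword}:{geo}". Kafka only guarantees ordering within a partition, and messages with the same key land in the same partition. So for any given keyword and geo, consumers read events in the order the producer sent them: you’ll never see a “domain entered” event before the “featured snippet appeared” event that was emitted ahead of it.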
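
To see why a stable key gives per-keyword ordering, here is a toy model of key-based partitioning. Kafka’s default partitioner hashes the key bytes with murmur2; the sketch below swaps in zlib.crc32 purely for illustration, and `partition_for` is a hypothetical helper, so the partition numbers will not match a real cluster.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a message key to a partition index.

    NOTE: stand-in hash (CRC32). Kafka's default partitioner uses murmur2,
    but the property shown here is the same: equal keys map to the same
    partition for a fixed partition count.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


keys = ["ai crm:US", "best crm:US", "ai crm:US", "ai crm:DE"]
assignments = [(k, partition_for(k, num_partitions=3)) for k in keys]
for k, p in assignments:
    print(f"{k!r} -> partition {p}")
```

Both "ai crm:US" entries always print the same partition. Ordering across different keys is not guaranteed, and changing the partition count remaps keys.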

Here’s how we use run_monitoring_loop() to start the pipeline:

producer = SERPProducer()  
keywords = [  
    {"keyword": "ai crm", "geo": "US"},  
    {"keyword": "best crm", "geo": "US"}  
]  
# run continuous loop (every 30 minutes)  
producer.run_monitoring_loop(keywords, interval_seconds=1800)

This pipeline runs continuously, emitting events whenever SERP composition changes. Next, we’ll see how multiple independent consumers process these events.

Step 5: The Consumers — Independent Event Processing

This is where Kafka’s value becomes obvious. We have three completely independent consumers processing the same event stream. Each has its own offset, its own logic, and can evolve independently.

But they all follow the same pattern:

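  • Each consumer has a unique group_id: this is how Kafka tracks committed offsets per consumer group
  • auto_offset_reset='earliest': when no committed offset exists (i.e. the first run), start from the beginning of the topic, so history can be replayed
  • enable_auto_commit=True: offsets are committed automatically in the background at a fixed interval (every 5 seconds by default in kafka-python), not after each processed event
  • Each consumer processes events at its own pace: if one is slow, the others aren’t affected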
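
Auto-commit is convenient, but in kafka-python it commits on a timer in the background rather than per handled event. If an alert must never be skipped, one option is to turn auto-commit off and commit only after an event has been handled, which gives at-least-once processing. A minimal sketch, assuming `consumer` is a kafka-python KafkaConsumer created with enable_auto_commit=False; `consume_with_manual_commit` and `handler` are hypothetical names:

```python
from typing import Any, Callable, Dict


def consume_with_manual_commit(
    consumer: Any,
    handler: Callable[[Dict[str, Any]], None],
) -> int:
    """Process messages one by one, committing only after the handler succeeds.

    `consumer` is assumed to be iterable over messages exposing `.value`
    and to provide commit(), as kafka-python's KafkaConsumer does when
    created with enable_auto_commit=False.
    Returns the number of messages processed.
    """
    processed = 0
    for message in consumer:
        # if handler raises, commit() is never reached, so the message
        # is redelivered after a restart (at-least-once semantics)
        handler(message.value)
        consumer.commit()
        processed += 1
    return processed
```

With this pattern a crash between handling and committing produces a duplicate rather than a gap, so handlers should tolerate seeing the same event twice.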

Consumer 1: SEO Alerts

This consumer only cares about high-impact changes. Ads appearing? Featured snippets appearing? Those directly affect CTR. Everything else gets ignored.

"""  
Consumer 1: SEO Alerting Service  
listens for SERP feature changes that impact SEO  
"""  

import json  
import sys  
import os  
from datetime import datetime  
from kafka import KafkaConsumer  

# add src to path  
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))  


class SEOAlertConsumer:  
    """  
    consumer that alerts on SERP changes affecting SEO  
    focuses on ads and featured snippets appearing  
    """  

    def __init__(
        self,
        kafka_brokers: str = "localhost:9092",
        topic: str = "serp-changes",
        output_file: str = None,
    ):
        self.topic = topic  
        self.consumer = KafkaConsumer(  
            topic,  
            bootstrap_servers=kafka_brokers,  
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),  
            group_id='seo-alert-consumer',  
            auto_offset_reset='earliest',  # start from beginning  
            enable_auto_commit=True  
        )  
        # set output file path relative to script directory  
        if output_file is None:  
            script_dir = os.path.dirname(os.path.abspath(__file__))  
            output_file = os.path.join(script_dir, 'seo_alerts.txt')  
        self.output_file = output_file  

    def process_event(self, event: dict) -> None:  
        """process a single change event"""  
        event_type = event.get('event_type')  

        # only care about feature additions (ads, featured snippets)  
        if event_type == 'serp_feature_added':  
            feature = event.get('feature')  

            if feature in ['ads', 'featured_snippet']:  
                alert = self._create_alert(event, feature)  
                self._save_alert(alert)  
                self._print_alert(alert)  

    def _create_alert(self, event: dict, feature: str) -> dict:  
        """create alert message"""  
        keyword = event.get('keyword')  
        geo = event.get('geo')  

        if feature == 'ads':  
            count = event.get('count', 1)  
            message = f"Heads up: Google added {count} ad(s) for '{keyword}' in {geo}. Expect organic CTR drop."  
        elif feature == 'featured_snippet':  
            owner = event.get('owner_domain', 'unknown')  
            message = f"Heads up: Google added a featured snippet for '{keyword}' in {geo} (owned by {owner}). Expect organic CTR drop."  
        else:  
            message = f"SERP feature '{feature}' appeared for '{keyword}' in {geo}"  

        return {  
            'timestamp': event.get('timestamp'),  
            'keyword': keyword,  
            'geo': geo,  
            'feature': feature,  
            'message': message,  
            'event': event  
        }  

    def _save_alert(self, alert: dict) -> None:  
        """save alert to file"""  
        try:  
            with open(self.output_file, 'a', encoding='utf-8') as f:  
                f.write(json.dumps(alert) + '\n')
        except Exception as e:  
            print(f"Error saving alert: {e}")  

    def _print_alert(self, alert: dict) -> None:  
        """print alert to console"""  
        print("\n" + "=" * 60)
        print("SEO ALERT")  
        print("=" * 60)  
        print(f"Time: {alert['timestamp']}")  
        print(f"Keyword: {alert['keyword']} ({alert['geo']})")  
        print(f"Feature: {alert['feature']}")  
        print(f"Message: {alert['message']}")  
        print("=" * 60)  

    def run(self):  
        """main consumer loop"""  
        print("SEO Alert Consumer started")  
        print(f"Listening to topic: {self.topic}")  
        print(f"Alerts will be saved to: {self.output_file}")  
        print("\nWaiting for events...\n")

        try:  
            for message in self.consumer:  
                event = message.value  
                self.process_event(event)  
        except KeyboardInterrupt:  
            print("\nShutting down...")
        finally:  
            self.consumer.close()  


def main():  
    consumer = SEOAlertConsumer()  
    consumer.run()  


if __name__ == "__main__":  
    main()  


This consumer simply filters for serp_feature_added events and only processes the ads and featured_snippet features. It creates human-readable alerts and appends them to a text file, one JSON object per line:

{  
  "timestamp": "2026-01-07T12:00:00Z",  
  "keyword": "ai crm",  
  "geo": "US",  
  "feature": "ads",  
  "message": "Heads up: Google added 3 ad(s) for 'ai crm' in US. Expect organic CTR drop.",  
  "event": { /* full event data */ }  
}
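
Since _save_alert appends one JSON object per line, the file can be read back with a few lines of Python. A small sketch; `load_alerts` and `alerts_for_keyword` are hypothetical helpers, not part of the original project:

```python
import json
from typing import Any, Dict, List


def load_alerts(path: str) -> List[Dict[str, Any]]:
    """Read a JSON Lines alert file back into a list of dicts."""
    alerts = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:  # skip blank lines
                alerts.append(json.loads(line))
    return alerts


def alerts_for_keyword(alerts: List[Dict[str, Any]], keyword: str) -> List[Dict[str, Any]]:
    """Filter loaded alerts down to a single keyword."""
    return [a for a in alerts if a.get("keyword") == keyword]
```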

Consumer 2: Competitive Intelligence

This consumer tracks competitors. You can configure a list of domains to watch (hubspot.com, salesforce.com, etc.), and it logs when they enter or exit the SERP, or when they gain/lose the featured snippet.

"""  
Consumer 2: Competitive Intelligence Service  
tracks competitor movements in SERP  
"""  

import json  
import sys  
import os  
from kafka import KafkaConsumer  

# add src to path  
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))  


class CompetitiveIntelConsumer:  
    """  
    consumer that tracks competitive movements  
    focuses on featured snippet ownership and domain entries/exits  
    """  

    def __init__(
        self,
        kafka_brokers: str = "localhost:9092",
        topic: str = "serp-changes",
        output_file: str = "competitive_intel.txt",
        tracked_domains: list = None,
    ):
        self.topic = topic  
        self.consumer = KafkaConsumer(  
            topic,  
            bootstrap_servers=kafka_brokers,  
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),  
            group_id='competitive-intel-consumer',  
            auto_offset_reset='earliest',  
            enable_auto_commit=True  
        )  
        self.output_file = output_file  
        self.tracked_domains = set(tracked_domains or [])  
        # track featured snippet ownership  
        self.featured_snippet_owners = {}  # keyword+geo -> domain  

    def process_event(self, event: dict) -> None:  
        """process a single change event"""  
        event_type = event.get('event_type')  
        keyword = event.get('keyword')  
        geo = event.get('geo')  
        key = f"{keyword}:{geo}"  

        # track featured snippet ownership changes  
        if event_type == 'serp_feature_added' and event.get('feature') == 'featured_snippet':  
            owner = event.get('owner_domain')  
            if owner:  
                previous_owner = self.featured_snippet_owners.get(key)  
                if previous_owner != owner:  
                    self._log_featured_snippet_change(keyword, geo, previous_owner, owner, event)  
                    self.featured_snippet_owners[key] = owner  

        # track domain entries/exits  
        if event_type == 'organic_entry':  
            domain = event.get('domain')  
            if self._should_track(domain):  
                self._log_domain_entry(keyword, geo, domain, event)  

        elif event_type == 'organic_exit':  
            domain = event.get('domain')  
            if self._should_track(domain):  
                self._log_domain_exit(keyword, geo, domain, event)  

    def _should_track(self, domain: str) -> bool:  
        """determine if domain should be tracked"""  
        if not self.tracked_domains:  
            return True  # track all if none specified  
        return domain in self.tracked_domains  

    def _log_featured_snippet_change(
        self,
        keyword: str,
        geo: str,
        previous_owner: str,
        new_owner: str,
        event: dict,
    ) -> None:
        """log featured snippet ownership change"""  
        if previous_owner:  
            message = f"Featured snippet ownership changed: {previous_owner} -> {new_owner}"  
        else:  
            message = f"{new_owner} gained SERP ownership via featured snippet"  

        intel = {  
            'timestamp': event.get('timestamp'),  
            'keyword': keyword,  
            'geo': geo,  
            'type': 'featured_snippet_ownership',  
            'previous_owner': previous_owner,  
            'new_owner': new_owner,  
            'message': message  
        }  

        self._save_intel(intel)  
        self._print_intel(intel)  

    def _log_domain_entry(
        self,
        keyword: str,
        geo: str,
        domain: str,
        event: dict,
    ) -> None:
        """log domain entry into SERP"""  
        position = event.get('current_position')  
        intel = {  
            'timestamp': event.get('timestamp'),  
            'keyword': keyword,  
            'geo': geo,  
            'type': 'domain_entry',  
            'domain': domain,  
            'position': position,  
            'message': f"{domain} entered SERP at position {position}"  
        }  

        self._save_intel(intel)  
        self._print_intel(intel)  

    def _log_domain_exit(
        self,
        keyword: str,
        geo: str,
        domain: str,
        event: dict,
    ) -> None:
        """log domain exit from SERP"""  
        previous_position = event.get('previous_position')  
        intel = {  
            'timestamp': event.get('timestamp'),  
            'keyword': keyword,  
            'geo': geo,  
            'type': 'domain_exit',  
            'domain': domain,  
            'previous_position': previous_position,  
            'message': f"{domain} dropped out of SERP (was at position {previous_position})"  
        }  

        self._save_intel(intel)  
        self._print_intel(intel)  

    def _save_intel(self, intel: dict) -> None:  
        """save intelligence to file"""  
        try:  
            with open(self.output_file, 'a', encoding='utf-8') as f:  
                f.write(json.dumps(intel) + '\n')
        except Exception as e:  
            print(f"Error saving intel: {e}")  

    def _print_intel(self, intel: dict) -> None:  
        """print intelligence to console"""  
        print("\n" + "=" * 60)
        print("COMPETITIVE INTELLIGENCE")  
        print("=" * 60)  
        print(f"Time: {intel['timestamp']}")  
        print(f"Keyword: {intel['keyword']} ({intel['geo']})")  
        print(f"Type: {intel['type']}")  
        print(f"Message: {intel['message']}")  
        print("=" * 60)  

    def run(self):  
        """main consumer loop"""  
        print("Competitive Intelligence Consumer started")  
        print(f"Listening to topic: {self.topic}")  
        print(f"Intelligence will be saved to: {self.output_file}")  
        if self.tracked_domains:  
            print(f"Tracking domains: {', '.join(self.tracked_domains)}")  
        print("\nWaiting for events...\n")

        try:  
            for message in self.consumer:  
                event = message.value  
                self.process_event(event)  
        except KeyboardInterrupt:  
            print("\nShutting down...")
        finally:  
            self.consumer.close()  


def main():  
    # example: track specific competitors  
    tracked_domains = ['hubspot.com', 'salesforce.com', 'zoho.com']  

    consumer = CompetitiveIntelConsumer(tracked_domains=tracked_domains)  
    consumer.run()  


if __name__ == "__main__":  
    main()  



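This consumer keeps one piece of in-memory state, featured_snippet_owners, to detect ownership changes. Because that state lives in memory, it starts empty after a restart, so the first snippet event after a restart is logged as a gain rather than a change. For entries and exits, it can track all domains or only specific competitors. It detects when featured snippet ownership changes (HubSpot → Salesforce) and logs when competitors enter or exit the SERP. Example output: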

{  
  "timestamp": "2026-01-07T12:00:00Z",  
  "keyword": "ai crm",  
  "geo": "US",  
  "type": "featured_snippet_ownership",  
  "previous_owner": "hubspot.com",  
  "new_owner": "salesforce.com",  
  "message": "Featured snippet ownership changed: hubspot.com -> salesforce.com"  
},  
{  
  "timestamp": "2026-01-07T12:00:00Z",  
  "keyword": "ai crm",  
  "geo": "US",  
  "type": "domain_entry",  
  "domain": "zoho.com",  
  "position": 5,  
  "message": "zoho.com entered SERP at position 5"  
}

Of course, this can be extended to write to a database, generate reports, or trigger alerts.
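
As one example of the database route, here is a sketch that stores intel records in SQLite using only the standard library. The table name `intel`, its columns, and the helpers `init_db` / `save_intel_row` are assumptions made for illustration; the record fields mirror the dicts built in _log_domain_entry and its siblings.

```python
import json
import sqlite3
from typing import Any, Dict


def init_db(conn: sqlite3.Connection) -> None:
    """Create the (hypothetical) intel table if it does not exist yet."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS intel (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT,
            keyword TEXT,
            geo TEXT,
            type TEXT,
            message TEXT,
            payload TEXT
        )
        """
    )
    conn.commit()


def save_intel_row(conn: sqlite3.Connection, intel: Dict[str, Any]) -> None:
    """Insert one intel record; the full dict is kept as JSON in `payload`."""
    conn.execute(
        "INSERT INTO intel (timestamp, keyword, geo, type, message, payload) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (
            intel.get("timestamp"),
            intel.get("keyword"),
            intel.get("geo"),
            intel.get("type"),
            intel.get("message"),
            json.dumps(intel),
        ),
    )
    conn.commit()
```

Inside the consumer, _save_intel could call save_intel_row(conn, intel) instead of (or in addition to) appending to the text file.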

Consumer 3: Volatility Analyzer

This consumer aggregates events over a time window and calculates a volatility score. More changes = higher volatility.


"""  
Consumer 3: Historical Volatility Analyzer  
aggregates SERP changes over time to calculate volatility scores  
"""  

import json  
import sys  
import os  
from datetime import datetime, timedelta  
from collections import defaultdict  
from kafka import KafkaConsumer  

# add src to path  
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))  


class VolatilityAnalyzerConsumer:  
    """  
    consumer that calculates SERP volatility scores  
    tracks change frequency over time windows  
    """  

    def __init__(
        self,
        kafka_brokers: str = "localhost:9092",
        topic: str = "serp-changes",
        output_file: str = "volatility_scores.txt",
        window_days: int = 7,
    ):
        self.topic = topic  
        self.consumer = KafkaConsumer(  
            topic,  
            bootstrap_servers=kafka_brokers,  
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),  
            group_id='volatility-analyzer-consumer',  
            auto_offset_reset='earliest',  
            enable_auto_commit=True  
        )  
        self.output_file = output_file  
        self.window_days = window_days  
        # track events per keyword+geo  
        self.events_by_keyword = defaultdict(list)  # (keyword, geo) -> [events]  

    def process_event(self, event: dict) -> None:  
        """process a single change event"""  
        keyword = event.get('keyword')  
        geo = event.get('geo')  
        key = (keyword, geo)  

        # store event with timestamp  
        self.events_by_keyword[key].append({  
            'event': event,  
            'timestamp': event.get('timestamp')  
        })  

        # calculate volatility for this keyword  
        volatility = self._calculate_volatility(key)  

        if volatility is not None:  
            score_data = {  
                'keyword': keyword,  
                'geo': geo,  
                'volatility_score': volatility,  
                'window_days': self.window_days,  
                'change_count': len(self.events_by_keyword[key]),  
                'timestamp': datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")  
            }  

            self._save_score(score_data)  
            self._print_score(score_data)  

    def _calculate_volatility(self, key: tuple) -> float:  
        """  
        calculate volatility score (0.0 to 1.0)  
        higher score = more changes = more volatile  
        """  
        events = self.events_by_keyword[key]  

        if len(events) < 2:
            return None  # need at least 2 events

        # filter events within time window
        cutoff_time = datetime.utcnow() - timedelta(days=self.window_days)

        recent_events = [
            e for e in events
            if datetime.fromisoformat(e['timestamp'].replace('Z', '+00:00')).replace(tzinfo=None) > cutoff_time
        ]

        if len(recent_events) < 2:
            return None  

        # simple volatility: change count normalized by time window  
        # more sophisticated: could weight by change type, magnitude, etc.  
        change_count = len(recent_events)  

        # normalize linearly against a cap of ~10 changes per day:
        # 1 change per day = 0.1 score, 10+ changes per day = 1.0 score
        max_changes_per_day = 10  
        days_in_window = self.window_days  
        max_reasonable_changes = max_changes_per_day * days_in_window  

        volatility = min(change_count / max_reasonable_changes, 1.0)  

        return round(volatility, 2)  

    def _save_score(self, score_data: dict) -> None:
        """save volatility score to file"""  
        try:  
            with open(self.output_file, 'a', encoding='utf-8') as f:  
                f.write(json.dumps(score_data) + '\n')
        except Exception as e:  
            print(f"Error saving score: {e}")  

    def _print_score(self, score_data: dict) -> None:  
        """print volatility score to console"""  
        print("\n" + "=" * 60)
        print("VOLATILITY ANALYSIS")  
        print("=" * 60)  
        print(f"Keyword: {score_data['keyword']} ({score_data['geo']})")  
        print(f"Volatility Score: {score_data['volatility_score']:.2f}")  
        print(f"Window: {score_data['window_days']} days")  
        print(f"Change Count: {score_data['change_count']}")  
        print(f"Timestamp: {score_data['timestamp']}")  
        print("=" * 60)  

    def run(self):  
        """main consumer loop"""  
        print("Volatility Analyzer Consumer started")  
        print(f"Listening to topic: {self.topic}")  
        print(f"Scores will be saved to: {self.output_file}")  
        print(f"Analysis window: {self.window_days} days")  
        print("\nWaiting for events...\n")

        try:  
            for message in self.consumer:  
                event = message.value  
                self.process_event(event)  
        except KeyboardInterrupt:  
            print("\nShutting down...")
        finally:  
            self.consumer.close()  


def main():  
    consumer = VolatilityAnalyzerConsumer(window_days=7)  
    consumer.run()  


if __name__ == "__main__":  
    main()  



Here’s how I’m defining volatility (capped at 1.0, i.e. 100%):

volatility = min(change_count / (max_changes_per_day * window_days), 1.0)

And this’ll produce and log scores like this:

{  
  "keyword": "ai crm",  
  "geo": "US",  
  "volatility_score": 0.36,
  "window_days": 7,
  "change_count": 25,
  "timestamp": "2026-01-07T12:00:00Z"
}

This says: 25 changes in 7 days = 25 / (10 × 7) ≈ 0.36 volatility (moderately volatile).
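
To make the arithmetic concrete, here is the same formula as a standalone function. This is a sketch: `volatility_score` is a hypothetical name, and the default of 10 changes per day matches max_changes_per_day in the consumer.

```python
def volatility_score(
    change_count: int,
    window_days: int = 7,
    max_changes_per_day: int = 10,
) -> float:
    """Change count normalized by the window's cap, clamped to 1.0."""
    max_reasonable_changes = max_changes_per_day * window_days
    return round(min(change_count / max_reasonable_changes, 1.0), 2)


print(volatility_score(25))   # 25 / 70  -> 0.36
print(volatility_score(7))    # one change per day -> 0.1
print(volatility_score(200))  # above the cap, clamped -> 1.0
```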

Is this the best volatility metric? Probably not. But it’s a starting point. You could weight by change type (featured snippet changes are more significant than domain entries), factor in change magnitude (position shifts vs entries/exits), or normalize by keyword search volume.
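
A sketch of the first idea, weighting by change type. The weights below are made-up placeholders rather than tuned values, and `weighted_volatility` is a hypothetical function; the event_type names are the ones the consumers above already handle.

```python
from typing import Any, Dict, List

# placeholder weights (assumptions for illustration, not tuned values)
EVENT_WEIGHTS = {
    "serp_feature_added": 2.0,
    "organic_entry": 1.0,
    "organic_exit": 1.0,
}
DEFAULT_WEIGHT = 1.0


def weighted_volatility(
    events: List[Dict[str, Any]],
    window_days: int = 7,
    max_weight_per_day: float = 10.0,
) -> float:
    """Sum per-event weights, normalize by the window's cap, clamp to 1.0."""
    total = sum(
        EVENT_WEIGHTS.get(e.get("event_type"), DEFAULT_WEIGHT) for e in events
    )
    return round(min(total / (max_weight_per_day * window_days), 1.0), 2)
```

With every weight set to 1.0 this reduces to the unweighted score, so it can replace _calculate_volatility without changing the output format.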

The point is this consumer can evolve independently. You can improve the volatility algorithm without touching the producer or other consumers.

Each consumer runs independently:

# Terminal 1: SEO Alerts  
python consumers/seo_alert_consumer.py  
# Terminal 2: Competitive Intelligence  
python consumers/competitive_intel_consumer.py  
# Terminal 3: Volatility Analyzer  
python consumers/volatility_analyzer_consumer.py

A few properties fall out of this setup:

  • All three consumers read from the same Kafka topic (serp-changes)
  • Each maintains its own offset, so they can process at different speeds
  • If one crashes, the others continue processing
  • Each writes to its own output file/log
  • If you restart a consumer, it resumes from its last committed offset
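
The offset behaviour can be pictured with a toy in-memory model. This is not how Kafka is implemented and it uses no Kafka API at all; `ToyTopic` is a hypothetical class that only illustrates one shared log with an independent read position per consumer group.

```python
from collections import defaultdict
from typing import Any, Dict, List


class ToyTopic:
    """One append-only log, one committed offset per consumer group."""

    def __init__(self) -> None:
        self.log: List[Any] = []
        self.offsets: Dict[str, int] = defaultdict(int)  # group_id -> next offset

    def append(self, event: Any) -> None:
        self.log.append(event)

    def poll(self, group_id: str, max_records: int = 10) -> List[Any]:
        """Return unread events for this group and advance only its offset."""
        start = self.offsets[group_id]
        batch = self.log[start:start + max_records]
        self.offsets[group_id] = start + len(batch)
        return batch


topic = ToyTopic()
for i in range(5):
    topic.append({"event_id": i})

# a fast group reads everything; a slow group reads 2, then resumes at offset 2
fast = topic.poll("seo-alert-consumer")
slow = topic.poll("volatility-analyzer-consumer", max_records=2)
resumed = topic.poll("volatility-analyzer-consumer")
```

Neither group’s progress affects the other, and a group that has never read before starts at offset 0, which mirrors auto_offset_reset='earliest'.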

What’s The Payoff?

When you put all three consumers together, you stop asking “what rank are we today?” and start answering:

  • How competitive is this market right now?
  • Is Google monetizing [insert query here] more aggressively?
  • Are our competitors being displaced or reinforced?
  • Is this SERP worth investing in, or is it too volatile?
  • Are competitors winning because of strategy, or because of SERP structure?

See the shift? You’re no longer tracking positions. You’re observing market dynamics.

And because this is event-driven, every insight is replayable, every consumer can evolve independently, and every new question you want answered is just another consumer you have to write.
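
For instance, answering “is Google monetizing this query more aggressively?” could be a fourth consumer that only counts ad events. Below is a sketch of just the processing logic, with the Kafka wiring left out. `AdPressureTracker` is a hypothetical class, and it assumes serp_feature_added events carry the feature and count fields that the SEO alert consumer reads.

```python
from collections import defaultdict
from typing import Any, Dict, Tuple


class AdPressureTracker:
    """Counts how often, and how many, ads get added per (keyword, geo)."""

    def __init__(self) -> None:
        self.ad_events: Dict[Tuple[str, str], int] = defaultdict(int)
        self.ad_slots: Dict[Tuple[str, str], int] = defaultdict(int)

    def process_event(self, event: Dict[str, Any]) -> None:
        # ignore everything except "ads were added" events
        if event.get("event_type") != "serp_feature_added":
            return
        if event.get("feature") != "ads":
            return
        key = (event.get("keyword"), event.get("geo"))
        self.ad_events[key] += 1
        self.ad_slots[key] += event.get("count", 1)

    def summary(self, keyword: str, geo: str) -> Dict[str, int]:
        key = (keyword, geo)
        return {"ad_events": self.ad_events[key], "ad_slots": self.ad_slots[key]}
```

Wrapped in the same KafkaConsumer loop as the other three, with its own group_id, it would replay the full history on first start without any change to the producer.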

Top comments (3)

Martin Miles

What's your actual polling frequency for the SERP API? Article doesn't mention it but I'm guessing you're not hitting every keyword every 5 minutes or your Bright Data bill would be insane. Do you tier keywords by importance or just run everything hourly/daily?

Martijn Assie

Crazy how normalized snapshots make change detection so cheap, saves you from parsing 350+ lines every run... tip, if you scale this, consider batching events per keyword to reduce Kafka overhead!!
