Diven Rastdus

Posted on • Originally published at astraedus.dev

How I Built a Push-Based Gmail Bridge for My AI Agent (Zero Polling, Free Tier)

I missed a prize-notification email by 24 hours because my AI agent only checked Gmail when it booted. The email needed a response within 48 hours. I had 24 left by the time the next session started. That gap nearly cost me real money.

Polling is the obvious fix. Set up a cron that checks Gmail every 5 minutes. But polling Gmail has three problems:

  1. Latency floor equals poll interval. 5-minute polling means up to 5 minutes of dead time on urgent messages.
  2. Wasted API calls. 288 API calls per day to catch maybe 3-5 messages that actually matter.
  3. Rate limit risk. Gmail API quotas are generous, but polling invites you to burn them on nothing.

What I wanted: sub-5-second email delivery into my agent's filesystem, with classification and priority routing, at a total monthly cost of exactly zero dollars.

Here's what I built.

Architecture

Gmail (your-email@gmail.com)
  | users.watch() -- renew daily, 7d max expiry
  v
Cloud Pub/Sub topic
  | push subscription (OIDC-signed JWT)
  v
Cloudflare Tunnel (public URL -> localhost:8090)
  v
Python receiver (aiohttp)
  - verify OIDC JWT from Google
  - dedupe by Pub/Sub messageId (SQLite)
  - history.list since last stored historyId
  - messages.get for each new message
  - classify by rules engine (YAML, hot-reload)
  - fan out: urgent -> Telegram ping, info -> digest file

The key insight: Gmail's users.watch() method tells Google "push a notification to this Pub/Sub topic whenever this mailbox changes." Google handles the watching. You handle the reacting.

Step 1: Set up Pub/Sub

# Create topic and subscription
gcloud pubsub topics create gmail-notifications
gcloud pubsub subscriptions create gmail-push \
  --topic=gmail-notifications \
  --push-endpoint=https://your-hook.example.com/pubsub \
  --push-auth-service-account=your-sa@project.iam.gserviceaccount.com

# Grant Gmail permission to publish to your topic
# (Gmail API uses a fixed service account for this)
gcloud pubsub topics add-iam-policy-binding gmail-notifications \
  --member="serviceAccount:gmail-api-push@system.gserviceaccount.com" \
  --role="roles/pubsub.publisher"

Cost: $0. First 10 GiB/month of Pub/Sub throughput is free. Email notifications are tiny.

Step 2: Register the Gmail watch

from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

creds = Credentials.from_authorized_user_file("token.json")
service = build("gmail", "v1", credentials=creds)

result = service.users().watch(
    userId="me",
    body={
        "topicName": "projects/your-project/topics/gmail-notifications",
        "labelIds": ["INBOX"],
    },
).execute()

print(f"Watch expires: {result['expiration']}")
# Watch expires: 1714540800000 (7 days from now)

Two catches:

  • The watch expires after 7 days max. Set up a daily cron to renew it.
  • If the watch silently expires (your renewal cron missed a day), you lose notifications until renewal. Build a staleness check.
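The post doesn't show the renewal or staleness logic, so here is a minimal sketch; the helper names (`watch_is_stale`, `renew_watch`) and the 12-hour slack window are my own assumptions, not the author's code:

```python
import time

# Renew well before the 7-day hard expiry; 12h of slack is an arbitrary choice.
WATCH_SLACK_HOURS = 12

def watch_is_stale(expiration_ms, now=None):
    """True if the stored watch expiration is inside the slack window (or past)."""
    now = time.time() if now is None else now
    return (expiration_ms / 1000.0) - now < WATCH_SLACK_HOURS * 3600

def renew_watch(service, topic):
    """Re-register the Gmail watch; returns the new expiration (ms since epoch)."""
    result = service.users().watch(
        userId="me",
        body={"topicName": topic, "labelIds": ["INBOX"]},
    ).execute()
    return int(result["expiration"])
```

A daily cron can simply call `renew_watch` unconditionally; `watch_is_stale` is the safety net the post recommends, run from a separate monitor so a dead cron gets noticed.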

Step 3: The receiver

The receiver is a tiny aiohttp server. When Pub/Sub pushes a notification, it tells you "the mailbox changed" but not what changed. You have to walk the history yourself.

import asyncio
import base64
import json

from aiohttp import web
from google.oauth2 import id_token
from google.auth.transport import requests as google_requests

PUBSUB_AUDIENCE = "https://your-hook.example.com/pubsub"

async def handle_pubsub(request: web.Request) -> web.Response:
    # Step 1: Verify the OIDC JWT from Google
    auth = request.headers.get("Authorization", "")
    if not auth.startswith("Bearer "):
        return web.Response(status=401)

    token = auth[7:].strip()
    try:
        claims = id_token.verify_oauth2_token(
            token, google_requests.Request(), audience=PUBSUB_AUDIENCE
        )
    except ValueError:
        # Signature or audience check failed -- reject instead of crashing with a 500
        return web.Response(status=403)

    # Step 2: Decode the notification
    envelope = await request.json()
    data = base64.b64decode(envelope["message"]["data"])
    notif = json.loads(data)
    history_id = notif["historyId"]

    # Step 3: Walk history since last known point
    asyncio.create_task(walk_and_process(history_id))

    # Step 4: ACK immediately (Pub/Sub retries on non-2xx)
    return web.Response(status=204)

The critical pattern: ACK fast, process async. If your handler takes more than 10 seconds, Pub/Sub assumes delivery failed and retries. This creates duplicate processing unless you dedupe. Return 204 immediately, do the expensive work in a background task.
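The dedupe step can be a single `INSERT OR IGNORE` against SQLite. A sketch (table and function names are mine; the post only says it dedupes by Pub/Sub `messageId`):

```python
import sqlite3

def init_db(db_path):
    """Open the dedup store, creating the table on first use."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS seen_pubsub_msg (message_id TEXT PRIMARY KEY)"
    )
    conn.commit()
    return conn

def is_new_message(conn, message_id):
    """Record the Pub/Sub messageId; False means we've already processed it."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO seen_pubsub_msg (message_id) VALUES (?)",
        (message_id,),
    )
    conn.commit()
    return cur.rowcount == 1  # 1 row inserted -> first sighting
```

Because the check and the insert are one atomic statement, a retried delivery that races the original still gets deduplicated.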

Step 4: History walking

Gmail notifications only say "something changed at historyId X." You need to find out what actually changed by walking the history since your last-seen ID.

def list_history(service, start_history_id: str):
    """Walk history.list, return new message IDs."""
    added = []
    page_token = None

    while True:
        resp = service.users().history().list(
            userId="me",
            startHistoryId=start_history_id,
            historyTypes=["messageAdded"],
            pageToken=page_token,
        ).execute()

        for record in resp.get("history", []):
            for msg in record.get("messagesAdded", []):
                added.append(msg["message"])

        page_token = resp.get("nextPageToken")
        if not page_token:
            break

    return added, resp.get("historyId")

The gotcha: if your stored historyId is older than 7 days, history.list returns 404. You need a fallback:

from googleapiclient.errors import HttpError

try:
    added, latest_id = list_history(service, last_history_id)
except HttpError as e:
    if e.resp.status == 404:
        # historyId expired -- fall back to recent messages
        added = list_recent_unread(service, days=3)
        latest_id = current_history_id
    else:
        raise
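The fallback itself isn't shown in the post; a plausible sketch of `list_recent_unread` using Gmail's search syntax (`newer_than:3d is:unread`) would be:

```python
def list_recent_unread(service, days=3):
    """Fallback when the stored historyId has expired: fetch recent unread messages."""
    query = f"newer_than:{days}d is:unread"
    msgs, page_token = [], None
    while True:
        resp = service.users().messages().list(
            userId="me", q=query, pageToken=page_token, maxResults=100
        ).execute()
        msgs.extend(resp.get("messages", []))
        page_token = resp.get("nextPageToken")
        if not page_token:
            break
    return msgs
```

This can return messages you already processed, which is fine: the messageId dedup layer downstream filters them out.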

Step 5: Classification (the useful part)

Raw email delivery is not enough. You need routing. My classifier uses a YAML rules file that hot-reloads on every call (no restart needed to add rules):

rules:
  - name: payment-notification
    match:
      from_regex: "@stripe\\.com"
      subject_regex: "(?i)(payment|payout|charge)"
    classification: REVENUE

  - name: warm-lead-reply
    match:
      in_thread_with_outbound: true
      from_not_regex: "(?i)(noreply|automated|newsletter)"
    classification: URGENT-HUMAN

  - name: default
    match: {}
    classification: INFORMATIONAL

The in_thread_with_outbound check is the clever one. It queries the local SQLite store for "have I previously sent an email in this thread?" If yes, the reply is from someone I contacted -- a warm lead, not spam. Classify it as urgent.

def _has_outbound_in_thread(db_path, thread_id, self_addr):
    conn = sqlite3.connect(str(db_path))
    row = conn.execute(
        "SELECT 1 FROM seen_gmail_msg "
        "WHERE thread_id = ? AND from_addr LIKE ? LIMIT 1",
        (thread_id, f"%{self_addr}%"),
    ).fetchone()
    conn.close()
    return row is not None

First match wins. The engine processes 10 rules in under 1ms. No need for a proper NLP pipeline here.
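The matching loop itself is only a few lines. A sketch of first-match-wins evaluation, covering just the two regex conditions from the YAML above (field names `from`/`subject` on the message dict are my assumption):

```python
import re

def classify(rules, msg):
    """Return the classification of the first matching rule.

    An empty `match: {}` matches everything, so a trailing default rule
    acts as the catch-all.
    """
    for rule in rules:
        cond = rule.get("match", {})
        ok = True
        if "from_regex" in cond:
            ok = ok and re.search(cond["from_regex"], msg.get("from", "")) is not None
        if "subject_regex" in cond:
            ok = ok and re.search(cond["subject_regex"], msg.get("subject", "")) is not None
        if ok:
            return rule["classification"]
    return "INFORMATIONAL"  # defensive default if no catch-all rule exists
```

Conditions within a rule AND together; rules OR together via ordering. That composition covers a surprising amount of routing logic without any ML.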

Step 6: Fan-out

After classification, messages route to different outputs:

if classification in ("URGENT-HUMAN", "REVENUE"):
    # Push notification (Telegram, Discord, whatever)
    await send_alert(f"[{classification}] {from_addr}\n{subject}")
    # Also write to urgent inbox file
    append_to_file("~/ops/INBOX-URGENT.md", formatted_entry)
else:
    # Quiet digest
    append_to_file("~/ops/INBOX-DIGEST.md", formatted_entry)

# Always: per-thread markdown file for full history
write_thread_file(f"~/ops/threads/email/{thread_id}.md", message)

Each thread gets its own markdown file with frontmatter. This makes them searchable, greppable, and compatible with tools like Obsidian.
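A minimal sketch of what `write_thread_file` might look like, writing the frontmatter once and appending each message after it (the message dict's keys are my assumption, not the post's schema):

```python
from pathlib import Path

def write_thread_file(path, msg):
    """Append a message to its thread file, creating frontmatter on first write."""
    p = Path(path).expanduser()
    p.parent.mkdir(parents=True, exist_ok=True)
    if not p.exists():
        header = (
            "---\n"
            f"thread_id: {msg['thread_id']}\n"
            f"subject: {msg['subject']}\n"
            "---\n\n"
        )
        p.write_text(header)
    with p.open("a") as f:
        f.write(f"## {msg['date']} | {msg['from']}\n\n{msg['body']}\n\n")
```

Append-only writes keep the full thread history in arrival order, and the frontmatter fields are what make the files queryable from Obsidian or plain grep.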

The tunnel: Cloudflare (free, stable)

Google Pub/Sub needs a public HTTPS endpoint. Options:

| Option | Cost | Reliability |
| --- | --- | --- |
| ngrok | $8/mo for stable URL | Good, but paid |
| Cloudflare Tunnel | $0 | Excellent; runs as a systemd service |
| Cloud Function | $0 (free tier) | Adds cold-start latency |
| Self-hosted VPS | $5-20/mo | Overkill |

Cloudflare Tunnel wins. Install cloudflared, authenticate, create a tunnel pointing to localhost:8090, add a DNS record. Done. It runs as a systemd service with automatic reconnection.

cloudflared tunnel create gmail-bridge
cloudflared tunnel route dns gmail-bridge your-hook.example.com
# Then create a systemd unit that runs:
# cloudflared tunnel run gmail-bridge
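The post leaves the systemd unit as a comment. A minimal unit might look like this (the binary path and service name are placeholders for your setup):

```ini
# /etc/systemd/system/cloudflared-gmail-bridge.service
[Unit]
Description=Cloudflare Tunnel for the Gmail bridge
After=network-online.target
Wants=network-online.target

[Service]
ExecStart=/usr/local/bin/cloudflared tunnel run gmail-bridge
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
```

`Restart=always` plus cloudflared's own reconnection logic is what makes the "tunnel drops" row in the failure table below a non-event.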

Failure handling

The architecture has natural resilience built in:

| Failure | What happens |
| --- | --- |
| Receiver crashes | systemd restarts it; Pub/Sub retries delivery for 7 days |
| Tunnel drops | cloudflared reconnects; Pub/Sub retries |
| historyId too old | Falls back to messages.list with newer_than:3d |
| Duplicate delivery | SQLite dedup by Pub/Sub messageId |
| Gmail API 5xx | Logged to a dead-letter file; retried on next notification |
| Watch silently expires | Daily renewal cron + staleness monitor |

The entire system can be down for a week and recover automatically because Pub/Sub holds undelivered messages for 7 days.

Results

  • Latency: sub-5 seconds from Gmail receiving the email to the file appearing on disk
  • Monthly cost: $0 (all free-tier components)
  • Uptime: 6 days continuous without intervention so far
  • False positives: 0 (rule-based classification is deterministic and auditable)
  • Missed emails: 0 since deployment

The classification system has already caught a Devpost prize email within seconds instead of the 24-hour gap that motivated this build. Telegram pings for urgent items, quiet digest for everything else.

What I would do differently

  1. Start with the classification rules, not the infrastructure. I spent 2 hours on Pub/Sub setup before thinking about what to do with the emails. Should have designed the rules first.
  2. Use a single SQLite DB for everything. I initially split dedup and thread state across files. Consolidating to one DB simplified the code.
  3. Hot-reload from the start. Editing rules + restarting the service is friction. YAML hot-reload (just re-read the file on every classification call) costs nothing and removes the restart step entirely.
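The post's hot-reload is "re-read the file on every classification call"; caching on the file's mtime makes that effectively free. A sketch (function and cache names are mine):

```python
import os
import yaml  # pyyaml, already in the project's dependency list

_cache = {"mtime": None, "rules": []}

def load_rules(path):
    """Re-parse the YAML rules file only when its mtime changes (hot-reload)."""
    mtime = os.path.getmtime(path)
    if mtime != _cache["mtime"]:
        with open(path) as f:
            _cache["rules"] = yaml.safe_load(f).get("rules", [])
        _cache["mtime"] = mtime
    return _cache["rules"]
```

Editing the YAML takes effect on the next incoming message, with no restart and at most one stat() call per classification.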

The code

The full implementation is ~300 lines across 4 files:

  • receiver.py: aiohttp server, OIDC verification, history walking
  • gmail_client.py: OAuth, message fetch, history list
  • classifier.py: rules engine
  • store.py: SQLite dedup + markdown persistence

Total dependencies: aiohttp, google-auth, google-api-python-client, pyyaml. All well-maintained, no exotic packages.

If your agent, automation, or workflow needs to react to email in real time without burning API calls on polling, this architecture works. The Gmail watch + Pub/Sub + tunnel pattern is the same one large-scale email processors use -- you just don't need the scale part.


I build production AI agent infrastructure. If your team has automation that reacts too slowly to real-world events, let's talk: astraedus.dev
