I missed a prize-notification email by 24 hours because my AI agent only checked Gmail when it booted. The email needed a response within 48 hours. I had 24 left by the time the next session started. That gap nearly cost me real money.
Polling is the obvious fix. Set up a cron that checks Gmail every 5 minutes. But polling Gmail has three problems:
- Latency floor equals poll interval. 5-minute polling means up to 5 minutes of dead time on urgent messages.
- Wasted API calls. 288 API calls per day to catch maybe 3-5 messages that actually matter.
- Rate limit risk. Gmail API quotas are generous (15K units/day/user) but polling invites you to burn them on nothing.
What I wanted: sub-5-second email delivery into my agent's filesystem, with classification and priority routing, at a total monthly cost of exactly zero dollars.
Here's what I built.
Architecture
Gmail (your-email@gmail.com)
| users.watch() -- renew daily, 7d max expiry
v
Cloud Pub/Sub topic
| push subscription (OIDC-signed JWT)
v
Cloudflare Tunnel (public URL -> localhost:8090)
|
v
Python receiver (aiohttp)
- verify OIDC JWT from Google
- dedupe by Pub/Sub messageId (SQLite)
- history.list since last stored historyId
- messages.get for each new message
- classify by rules engine (YAML, hot-reload)
- fan out: urgent -> Telegram ping, info -> digest file
The key insight: Gmail's users.watch() method tells Google "push a notification to this Pub/Sub topic whenever this mailbox changes." Google handles the watching. You handle the reacting.
Step 1: Set up Pub/Sub
# Create topic and subscription
gcloud pubsub topics create gmail-notifications
gcloud pubsub subscriptions create gmail-push \
--topic=gmail-notifications \
--push-endpoint=https://your-hook.example.com/pubsub \
--push-auth-service-account=your-sa@project.iam.gserviceaccount.com
# Grant Gmail permission to publish to your topic
# (Gmail API uses a fixed service account for this)
gcloud pubsub topics add-iam-policy-binding gmail-notifications \
--member="serviceAccount:gmail-api-push@system.gserviceaccount.com" \
--role="roles/pubsub.publisher"
Cost: $0. First 10 GiB/month of Pub/Sub throughput is free. Email notifications are tiny.
Step 2: Register the Gmail watch
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
creds = Credentials.from_authorized_user_file("token.json")
service = build("gmail", "v1", credentials=creds)
result = service.users().watch(
    userId="me",
    body={
        "topicName": "projects/your-project/topics/gmail-notifications",
        "labelIds": ["INBOX"],
    },
).execute()
print(f"Watch expires: {result['expiration']}")
# Watch expires: 1714540800000 (7 days from now)
Two catches:
- The watch expires after 7 days max. Set up a daily cron to renew it.
- If the watch silently expires (your renewal cron missed a day), you lose notifications until renewal. Build a staleness check; a renewal-and-staleness sketch follows below.
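A minimal sketch covering both catches (the file name, state path, and threshold are placeholder assumptions, not the exact code I run): call renew_watch() from a daily cron and alert whenever watch_is_stale() returns True.
# renew_watch.py -- hypothetical daily-cron helper
import json
import time
from pathlib import Path

from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

STATE = Path("watch_state.json")  # where the last expiration we got back is recorded

def renew_watch() -> None:
    """Re-register the watch and persist its expiration timestamp."""
    creds = Credentials.from_authorized_user_file("token.json")
    service = build("gmail", "v1", credentials=creds)
    result = service.users().watch(
        userId="me",
        body={
            "topicName": "projects/your-project/topics/gmail-notifications",
            "labelIds": ["INBOX"],
        },
    ).execute()
    STATE.write_text(json.dumps({"expiration_ms": int(result["expiration"])}))

def watch_is_stale(min_runway_hours: float = 5 * 24) -> bool:
    """With a healthy daily renewal, the watch always has ~6-7 days of runway left."""
    if not STATE.exists():
        return True
    expiration_ms = json.loads(STATE.read_text())["expiration_ms"]
    remaining_hours = (expiration_ms / 1000 - time.time()) / 3600
    return remaining_hours < min_runway_hours

if __name__ == "__main__":
    renew_watch()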
Step 3: The receiver
The receiver is a tiny aiohttp server. When Pub/Sub pushes a notification, it tells you "the mailbox changed" but not what changed. You have to walk the history yourself.
import asyncio
import base64
import json

from aiohttp import web
from google.oauth2 import id_token
from google.auth.transport import requests as google_requests

PUBSUB_AUDIENCE = "https://your-hook.example.com/pubsub"

async def handle_pubsub(request: web.Request) -> web.Response:
    # Step 1: Verify the OIDC JWT from Google
    auth = request.headers.get("Authorization", "")
    if not auth.startswith("Bearer "):
        return web.Response(status=401)
    token = auth[7:].strip()
    try:
        claims = id_token.verify_oauth2_token(
            token, google_requests.Request(), audience=PUBSUB_AUDIENCE
        )
    except ValueError:
        return web.Response(status=401)
    # claims is the verified payload; optionally pin claims.get("email") to your service account

    # Step 2: Decode the notification
    envelope = await request.json()
    data = base64.b64decode(envelope["message"]["data"])
    notif = json.loads(data)
    history_id = notif["historyId"]

    # Step 3: Walk history since last known point (walk_and_process is built in Step 4 below)
    asyncio.create_task(walk_and_process(history_id))

    # Step 4: ACK immediately (Pub/Sub retries on non-2xx)
    return web.Response(status=204)
The critical pattern: ACK fast, process async. If your handler takes more than 10 seconds, Pub/Sub assumes delivery failed and retries. This creates duplicate processing unless you dedupe. Return 204 immediately, do the expensive work in a background task.
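The dedupe itself looks roughly like this -- a minimal sketch, assuming a seen_pubsub_msg table keyed on the Pub/Sub messageId from the push envelope:
import sqlite3

DB_PATH = "gmail_bridge.db"  # assumption: the single SQLite DB mentioned later

def seen_before(pubsub_message_id: str) -> bool:
    """Record the Pub/Sub messageId; return True if this delivery is a retry we already handled."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS seen_pubsub_msg (message_id TEXT PRIMARY KEY)"
        )
        try:
            conn.execute(
                "INSERT INTO seen_pubsub_msg (message_id) VALUES (?)",
                (pubsub_message_id,),
            )
            conn.commit()
            return False  # first time we've seen this delivery
        except sqlite3.IntegrityError:
            return True   # duplicate push from a Pub/Sub retry
    finally:
        conn.close()
In the handler, call it with envelope["message"]["messageId"] before scheduling the background task; duplicates still get the 204 so Pub/Sub stops retrying.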
Step 4: History walking
Gmail notifications only say "something changed at historyId X." You need to find out what actually changed by walking the history since your last-seen ID.
def list_history(service, start_history_id: str):
    """Walk history.list; return newly added messages (id + threadId) and the latest historyId."""
    added = []
    page_token = None
    while True:
        resp = service.users().history().list(
            userId="me",
            startHistoryId=start_history_id,
            historyTypes=["messageAdded"],
            pageToken=page_token,
        ).execute()
        for record in resp.get("history", []):
            for msg in record.get("messagesAdded", []):
                added.append(msg["message"])
        page_token = resp.get("nextPageToken")
        if not page_token:
            break
    return added, resp.get("historyId")
The gotcha: if your stored historyId is older than 7 days, history.list returns 404. You need a fallback:
from googleapiclient.errors import HttpError

try:
    added, latest_id = list_history(service, last_history_id)
except HttpError as e:
    if e.resp.status == 404:
        # historyId expired -- fall back to recent messages and re-baseline
        added = list_recent_unread(service, days=3)
        # users.getProfile returns the mailbox's current historyId
        latest_id = service.users().getProfile(userId="me").execute()["historyId"]
    else:
        raise
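A minimal sketch of list_recent_unread, using Gmail's newer_than: search syntax (the same fallback referenced in the failure table below):
def list_recent_unread(service, days: int = 3):
    """Fallback when the stored historyId has expired: page through recent unread messages."""
    messages, page_token = [], None
    while True:
        resp = service.users().messages().list(
            userId="me",
            q=f"in:inbox is:unread newer_than:{days}d",
            pageToken=page_token,
        ).execute()
        messages.extend(resp.get("messages", []))
        page_token = resp.get("nextPageToken")
        if not page_token:
            break
    return messages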
Step 5: Classification (the useful part)
Raw email delivery is not enough. You need routing. My classifier uses a YAML rules file that hot-reloads on every call (no restart needed to add rules):
rules:
  - name: payment-notification
    match:
      from_regex: "@stripe\\.com"
      subject_regex: "(?i)(payment|payout|charge)"
    classification: REVENUE
  - name: warm-lead-reply
    match:
      in_thread_with_outbound: true
      from_not_regex: "(?i)(noreply|automated|newsletter)"
    classification: URGENT-HUMAN
  - name: default
    match: {}
    classification: INFORMATIONAL
The in_thread_with_outbound check is the clever one. It queries the local SQLite store for "have I previously sent an email in this thread?" If yes, the reply is from someone I contacted -- a warm lead, not spam. Classify it as urgent.
import sqlite3

def _has_outbound_in_thread(db_path, thread_id, self_addr):
    """Return True if I've previously sent a message in this thread."""
    conn = sqlite3.connect(str(db_path))
    row = conn.execute(
        "SELECT 1 FROM seen_gmail_msg "
        "WHERE thread_id = ? AND from_addr LIKE ? LIMIT 1",
        (thread_id, f"%{self_addr}%"),
    ).fetchone()
    conn.close()
    return row is not None
First match wins. The engine processes 10 rules in under 1ms. No need for a proper NLP pipeline here.
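A minimal sketch of the engine, assuming the YAML lives at rules.yaml and reusing the _has_outbound_in_thread helper above (the constants and message-dict keys are placeholders): re-read the file on every call, test rules top-down, return the first classification that matches.
import re
import yaml

RULES_PATH = "rules.yaml"           # assumption: where the YAML above lives
DB_PATH = "gmail_bridge.db"         # assumption: same SQLite DB used for dedup/threads
SELF_ADDR = "your-email@gmail.com"  # assumption: your own address for the outbound check

def classify(msg: dict) -> str:
    """First match wins. msg carries from_addr, subject, and thread_id."""
    with open(RULES_PATH) as f:               # hot-reload: re-read the file every call
        rules = yaml.safe_load(f)["rules"]
    for rule in rules:
        m = rule.get("match", {})
        if "from_regex" in m and not re.search(m["from_regex"], msg["from_addr"]):
            continue
        if "from_not_regex" in m and re.search(m["from_not_regex"], msg["from_addr"]):
            continue
        if "subject_regex" in m and not re.search(m["subject_regex"], msg["subject"]):
            continue
        if m.get("in_thread_with_outbound") and not _has_outbound_in_thread(
            DB_PATH, msg["thread_id"], SELF_ADDR
        ):
            continue
        return rule["classification"]
    return "INFORMATIONAL"  # unreachable if the catch-all default rule is present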
Step 6: Fan-out
After classification, messages route to different outputs:
if classification in ("URGENT-HUMAN", "REVENUE"):
    # Push notification (Telegram, Discord, whatever)
    await send_alert(f"[{classification}] {from_addr}\n{subject}")
    # Also write to urgent inbox file
    append_to_file("~/ops/INBOX-URGENT.md", formatted_entry)
else:
    # Quiet digest
    append_to_file("~/ops/INBOX-DIGEST.md", formatted_entry)

# Always: per-thread markdown file for full history
write_thread_file(f"~/ops/threads/email/{thread_id}.md", message)
Each thread gets its own markdown file with frontmatter. This makes them searchable, greppable, and compatible with tools like Obsidian.
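A minimal sketch of write_thread_file, assuming the message dict carries thread_id, subject, date, from_addr, and body (the frontmatter fields here are illustrative, not the exact schema):
from pathlib import Path

def write_thread_file(path: str, message: dict) -> None:
    """Append one message to the per-thread markdown file, writing frontmatter on first touch."""
    p = Path(path).expanduser()
    p.parent.mkdir(parents=True, exist_ok=True)
    if not p.exists():
        p.write_text(
            "---\n"
            f"thread_id: {message['thread_id']}\n"
            f"subject: {message['subject']}\n"
            "---\n\n"
        )
    with p.open("a") as f:
        f.write(f"## {message['date']} -- {message['from_addr']}\n\n{message['body']}\n\n")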
The tunnel: Cloudflare (free, stable)
Google Pub/Sub needs a public HTTPS endpoint. Options:
| Option | Cost | Reliability |
|---|---|---|
| ngrok | $8/mo for stable URL | Good but paid |
| Cloudflare Tunnel | $0 | Excellent. Runs as systemd service. |
| Cloud Function | $0 (free tier) | Adds cold start latency |
| Self-hosted VPS | $5-20/mo | Overkill |
Cloudflare Tunnel wins. Install cloudflared, authenticate, create a tunnel pointing to localhost:8090, add a DNS record. Done. It runs as a systemd service with automatic reconnection.
cloudflared tunnel create gmail-bridge
cloudflared tunnel route dns gmail-bridge your-hook.example.com
# Point the tunnel at the receiver in ~/.cloudflared/config.yml
# (ingress rule: your-hook.example.com -> http://localhost:8090)
# Then create a systemd unit that runs:
# cloudflared tunnel run gmail-bridge
Failure handling
The architecture has natural resilience built in:
| Failure | What happens |
|---|---|
| Receiver crashes | systemd restarts it. Pub/Sub retries delivery for 7 days. |
| Tunnel drops | cloudflared reconnects. Pub/Sub retries. |
| historyId too old | Falls back to messages.list newer_than:3d. |
| Duplicate delivery | SQLite dedup by Pub/Sub messageId. |
| Gmail API 5xx | Logged to dead-letter file. Retried on next notification. |
| Watch silently expires | Daily renewal cron + staleness monitor. |
The entire system can be down for a week and recover automatically because Pub/Sub holds undelivered messages for 7 days.
Results
- Latency: sub-5 seconds from Gmail receiving the email to the file appearing on disk
- Monthly cost: $0 (all free-tier components)
- Uptime: 6 days continuous without intervention so far
- False positives: 0 (rule-based classification is deterministic and auditable)
- Missed emails: 0 since deployment
The classification system has already caught a Devpost prize email within seconds instead of the 24-hour gap that motivated this build. Telegram pings for urgent items, quiet digest for everything else.
What I would do differently
- Start with the classification rules, not the infrastructure. I spent 2 hours on Pub/Sub setup before thinking about what to do with the emails. Should have designed the rules first.
- Use a single SQLite DB for everything. I initially split dedup and thread state across files. Consolidating to one DB simplified the code.
- Hot-reload from the start. Editing rules + restarting the service is friction. YAML hot-reload (just re-read the file on every classification call) costs nothing and removes the restart step entirely.
The code
The full implementation is ~300 lines across 4 files:
- receiver.py: aiohttp server, OIDC verification, history walking
- gmail_client.py: OAuth, message fetch, history list
- classifier.py: rules engine
- store.py: SQLite dedup + markdown persistence
Total dependencies: aiohttp, google-auth, google-api-python-client, pyyaml. All well-maintained, no exotic packages.
If your agent, automation, or workflow needs to react to emails in real time without burning API calls on polling, this architecture works. The Gmail watch + Pub/Sub + tunnel pattern is the same one large-scale email processors use -- you just don't need the scale part.
I build production AI agent infrastructure. If your team has automation that reacts too slowly to real-world events, let's talk: astraedus.dev