Ever needed to build a system where AI agents can receive emails in real-time and schedule meetings on behalf of users? Here's how I architected a production-grade solution using Gmail Push API, Google Cloud Pub/Sub, and domain-wide delegation.
The Challenge
Traditional email polling is inefficient and introduces latency. We needed:
- Real-time email notifications when messages arrive
- Meeting scheduling on behalf of users without individual OAuth flows
- Reliable message processing with no duplicates or missed emails
Architecture Overview
Gmail Inbox → Gmail API → Pub/Sub Topic → Webhook → Backend → AI Agent
↓
Google Calendar API
↓
Google Meet Link
Part 1: Gmail Push Notifications
Setting Up Pub/Sub
Gmail's Push API publishes notifications to a Pub/Sub topic when emails arrive:
# Create topic and push subscription
gcloud pubsub topics create gmail-inbound
gcloud pubsub subscriptions create gmail-inbound-sub \
--topic=gmail-inbound \
--push-endpoint="https://your-api.com/webhooks/gmail"
# Grant Gmail permission to publish
gcloud pubsub topics add-iam-policy-binding gmail-inbound \
--member="serviceAccount:gmail-api-push@system.gserviceaccount.com" \
--role="roles/pubsub.publisher"
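The push subscription above POSTs a JSON envelope to the webhook. The following is a minimal sketch of such a handler using only the standard library; `pushBody`, `parsePush`, `gmailWebhook`, and the `process` callback are hypothetical names, not part of the original system. It acknowledges unusable payloads with a 200 so Pub/Sub stops redelivering them, and returns a 500 only when a retry might help.

```go
package main

import (
	"encoding/json"
	"io"
	"net/http"
)

// pushBody mirrors the fields of a Pub/Sub push request used in this sketch.
type pushBody struct {
	Message struct {
		Data      string `json:"data"`      // base64-encoded payload
		MessageID string `json:"messageId"` // Pub/Sub message ID
	} `json:"message"`
}

// parsePush decodes the request body. ok is false when the body is not
// valid JSON or carries no message data.
func parsePush(raw []byte) (body pushBody, ok bool) {
	if err := json.Unmarshal(raw, &body); err != nil {
		return pushBody{}, false
	}
	return body, body.Message.Data != ""
}

// gmailWebhook returns a handler for the push endpoint. process is a
// hypothetical callback standing in for the backend logic.
func gmailWebhook(process func(pushBody) error) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		raw, err := io.ReadAll(io.LimitReader(r.Body, 1<<20))
		if err != nil {
			// Non-2xx: Pub/Sub will redeliver the message later.
			http.Error(w, "read error", http.StatusInternalServerError)
			return
		}
		body, ok := parsePush(raw)
		if !ok {
			// Acknowledge a payload that can never succeed.
			w.WriteHeader(http.StatusOK)
			io.WriteString(w, "skipped: malformed payload")
			return
		}
		if err := process(body); err != nil {
			http.Error(w, "processing failed", http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}
```

It could be registered with something like `http.Handle("/webhooks/gmail", gmailWebhook(handleGmailPush))`, where `handleGmailPush` is whatever function hands the message to the backend.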
The historyId Gotcha
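Gmail sends historyId as a JSON number, but it is tempting to model it as a string. In Go, encoding/json refuses to decode a number into a string field, and if the webhook swallows that error the notification is dropped without a trace:
// Broken - Unmarshal returns an error for a numeric historyId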
type gmailNotification struct {
HistoryID string `json:"historyId"` // Wrong!
}
// Fixed - handles both string and number
type gmailNotification struct {
HistoryID json.Number `json:"historyId"` // Correct!
}
// Usage
historyIDStr := notification.HistoryID.String()
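To see the fix end to end, here is a small sketch that unwraps a push body and reads historyId through json.Number. `pushEnvelope` and `decodeNotification` are hypothetical names. Trying the URL-safe base64 alphabet as a fallback is a defensive assumption, not something the original code is known to do.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// pushEnvelope is the outer JSON body that Pub/Sub POSTs to a push endpoint.
type pushEnvelope struct {
	Message struct {
		Data string `json:"data"` // base64-encoded Gmail notification
	} `json:"message"`
}

// gmailNotification is the payload inside message.data.
type gmailNotification struct {
	EmailAddress string      `json:"emailAddress"`
	HistoryID    json.Number `json:"historyId"` // accepts 42 and "42"
}

// decodeNotification returns the mailbox address and the historyId as a string.
func decodeNotification(body []byte) (email, historyID string, err error) {
	var env pushEnvelope
	if err := json.Unmarshal(body, &env); err != nil {
		return "", "", fmt.Errorf("decode envelope: %w", err)
	}
	raw, err := base64.StdEncoding.DecodeString(env.Message.Data)
	if err != nil {
		// Fall back to the URL-safe alphabet in case the payload uses it.
		raw, err = base64.URLEncoding.DecodeString(env.Message.Data)
		if err != nil {
			return "", "", fmt.Errorf("decode data: %w", err)
		}
	}
	var n gmailNotification
	if err := json.Unmarshal(raw, &n); err != nil {
		return "", "", fmt.Errorf("decode notification: %w", err)
	}
	if n.EmailAddress == "" || n.HistoryID == "" {
		return "", "", fmt.Errorf("notification missing emailAddress or historyId")
	}
	return n.EmailAddress, n.HistoryID.String(), nil
}
```

Because json.Number keeps the original digits as text, the value can be passed along as a string or parsed with `Int64()` without losing precision.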
History-Based Incremental Sync
Instead of fetching all emails, we track the last processed historyId in Redis and only fetch new messages:
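func (s *Service) HandleNotification(ctx context.Context, email, historyID string) error {
	key := "gmail:history:" + email
	// Get last processed history ID from Redis (go-redis client assumed)
	lastHistoryID, err := s.redis.Get(ctx, key).Uint64()
	if err != nil {
		return fmt.Errorf("load checkpoint: %w", err)
	}
	// Fetch only new messages since last sync (first page only; follow
	// NextPageToken when a mailbox has many changes)
	resp, err := s.gmail.History.List("me").
		StartHistoryId(lastHistoryID).
		HistoryTypes("messageAdded").
		Context(ctx).Do()
	if err != nil {
		return fmt.Errorf("list history: %w", err)
	}
	// Process each added message
	for _, h := range resp.History {
		for _, added := range h.MessagesAdded {
			s.processMessage(ctx, added.Message)
		}
	}
	// Update checkpoint
	return s.redis.Set(ctx, key, historyID, 0).Err()
}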
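One detail worth guarding against: Pub/Sub delivers at least once, so a notification can show up twice or after a newer one. The sketch below shows a checkpoint that only moves forward. It uses an in-memory map as a stand-in for the Redis key, and `checkpointStore`, `Last`, and `Commit` are hypothetical names, not the production code.

```go
package main

import (
	"strconv"
	"sync"
)

// checkpointStore maps a mailbox address to its last processed historyId.
// It is an in-memory stand-in for the Redis keys used above.
type checkpointStore struct {
	mu   sync.Mutex
	last map[string]uint64
}

func newCheckpointStore() *checkpointStore {
	return &checkpointStore{last: make(map[string]uint64)}
}

// Last returns the stored checkpoint for email (0 if none).
func (c *checkpointStore) Last(email string) uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.last[email]
}

// Commit stores historyID for email only if it is newer than the current
// checkpoint. It reports whether the checkpoint moved.
func (c *checkpointStore) Commit(email, historyID string) (bool, error) {
	id, err := strconv.ParseUint(historyID, 10, 64)
	if err != nil {
		return false, err
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if id <= c.last[email] {
		return false, nil // duplicate or out-of-order notification
	}
	c.last[email] = id
	return true, nil
}
```

With Redis the same compare-and-set would need to be atomic, for example via a Lua script or a WATCH transaction; that part is left out here.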
Watch Renewal
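Gmail watches expire after 7 days (Google recommends renewing daily). A Cloud Scheduler job renews them roughly every 5 days; the */5 day-of-month field fires on days 1, 6, 11, and so on, so the gap between runs never exceeds 5 days: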
gcloud scheduler jobs create http gmail-watch-renewal \
--schedule="0 0 */5 * *" \
--uri="https://your-api.com/internal/gmail/watch/renew" \
--http-method=POST
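Inside the renewal endpoint, one option is to renew only the watches that are close to expiring. The sketch below assumes the expiration returned by users.watch (Unix epoch milliseconds) was stored per user; `renewalMargin` and `needsRenewal` are hypothetical names, and the 48-hour margin is an assumption chosen to tolerate one missed scheduler run.

```go
package main

import "time"

// renewalMargin is an assumed safety window: renew once this much lifetime
// or less remains on the watch.
const renewalMargin = 48 * time.Hour

// needsRenewal reports whether a watch should be renewed. expirationMs is
// the expiration returned by users.watch (Unix epoch milliseconds); nowMs is
// the current time in the same unit, e.g. time.Now().UnixMilli().
func needsRenewal(expirationMs, nowMs int64) bool {
	remaining := time.Duration(expirationMs-nowMs) * time.Millisecond
	return remaining <= renewalMargin
}
```

With a 7-day watch and a job that runs every 5 days, a watch has exactly two days left at the next run, so it falls inside the margin and gets renewed.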
Part 2: Domain-Wide Delegation for Calendar
The magic of domain-wide delegation: once a Workspace admin has authorized the service account's client ID for the required scopes, a single service account can act on behalf of any user in your Google Workspace domain - no individual OAuth flows required.
JWT Bearer Token Flow
func (p *AuthProvider) GetHTTPClient(ctx context.Context, userEmail string) *http.Client {
	// jwt is golang.org/x/oauth2/jwt. It builds the claims, signs the JWT
	// with the RSA private key, exchanges it for an access token, and
	// refreshes the token when it expires.
	conf := &jwt.Config{
		Email:      p.serviceAccountEmail, // "iss" claim
		PrivateKey: p.privateKey,          // PEM-encoded RSA key as []byte (assumed)
		Subject:    userEmail,             // "sub" claim: the user we're acting as
		Scopes:     []string{"https://www.googleapis.com/auth/calendar"},
		TokenURL:   "https://oauth2.googleapis.com/token", // also the "aud" claim
		Expires:    time.Hour,             // "exp" is set to "iat" + 1 hour
	}
	// The returned client attaches the bearer token to every request
	return conf.Client(ctx)
}
Creating Meetings with Meet Links
func (s *SchedulerService) CreateMeeting(ctx context.Context, input CreateMeetingInput) (*Meeting, error) {
// Check for conflicts
if conflicts := s.repo.FindConflicts(ctx, input.HostID, input.Start, input.End); len(conflicts) > 0 {
return nil, ErrTimeSlotConflict
}
// Get authorized client for this user
client := s.authProvider.GetHTTPClient(ctx, input.HostEmail)
// Create event with automatic Meet link
event := &calendar.Event{
Summary: input.Title,
Start: &calendar.EventDateTime{DateTime: input.Start.Format(time.RFC3339)},
End: &calendar.EventDateTime{DateTime: input.End.Format(time.RFC3339)},
ConferenceData: &calendar.ConferenceData{
CreateRequest: &calendar.CreateConferenceRequest{
RequestId: uuid.NewString(),
ConferenceSolutionKey: &calendar.ConferenceSolutionKey{
Type: "hangoutsMeet",
},
},
},
}
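	// Build a Calendar client on top of the impersonating HTTP client
	calendarService, err := calendar.NewService(ctx, option.WithHTTPClient(client))
	if err != nil {
		return nil, fmt.Errorf("create calendar service: %w", err)
	}
	created, err := calendarService.Events.Insert("primary", event).
		ConferenceDataVersion(1).Context(ctx).Do()
	if err != nil {
		return nil, fmt.Errorf("insert event: %w", err)
	}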
return &Meeting{
MeetLink: created.HangoutLink,
// ...
}, nil
}
Part 3: Availability Checking
Calculate available slots by merging busy times from multiple sources:
func (s *AvailabilityService) GetSlots(ctx context.Context, req Request) []Slot {
// 1. Load availability rules (e.g., Mon-Fri 9AM-5PM)
rules := s.repo.GetRules(ctx, req.HostID)
// 2. Fetch Google Calendar busy times
freeBusy := s.calendar.FreeBusy(ctx, req.Start, req.End)
// 3. Fetch local meeting conflicts
localMeetings := s.repo.GetMeetings(ctx, req.HostID, req.Start, req.End)
// 4. Merge and find gaps
busyBlocks := merge(freeBusy, localMeetings)
return generateSlots(rules, busyBlocks, req.SlotDuration)
}
Key Lessons Learned
Always return 200 for invalid webhooks - Pub/Sub retries on non-2xx responses. Return 200 with a skip message for malformed payloads.
Use json.Number for numeric IDs - Google APIs sometimes send numbers where you expect strings.
History can expire - If historyId is too old, Gmail returns 404. Fall back to scanning recent messages.
Buffer your slots - Add configurable buffer time before/after meetings for travel or prep.
Cache access tokens - JWT exchange is expensive. Cache tokens until ~30 seconds before expiry.
Conclusion
This architecture enables AI agents to process emails in real-time and schedule meetings autonomously. The combination of Pub/Sub push notifications and domain-wide delegation eliminates polling overhead and individual auth flows.
The system has been running in production handling thousands of emails daily with sub-second notification latency.
What challenges have you faced with Google API integrations? Share in the comments!