Humza Tareen
Redis State Management on Cloud Run: Handling Missing Keys and Silent Failures

Redis HSET never fails because a key is missing: it simply creates the key and returns a normal result. This led to a subtle bug in our production system: state updates were logging success when they were actually failing silently.

This guide covers Redis state management patterns for long-running Cloud Run tasks, including how to detect missing keys, handle socket timeouts, and properly configure VPC networking for Cloud Run + Memorystore.

The Problem: Silent Failures

We use Redis (Memorystore) to track state for long-running evaluation tasks. Each task updates its progress periodically:

# ❌ BAD: This logs success even when the key doesn't exist
import redis
import logging

r = redis.Redis(host='10.x.x.x', port=6379)

def update_task_progress(task_id: str, progress: int):
    result = r.hset(f"task:{task_id}", "progress", progress)
    if result:
        logging.info(f"Updated progress for {task_id}: {progress}%")
    else:
        logging.warning(f"Failed to update progress for {task_id}")

The bug: HSET returns 1 if a new field was added and 0 if an existing field was updated. Two things go wrong here. If the key task:{task_id} doesn't exist, HSET creates it and returns 1, so the code logs success even though the task state had vanished. And on a routine update of an existing field, HSET returns 0, so the code logs a spurious warning for a write that actually succeeded.

Understanding Redis HSET Behavior

Let's clarify what HSET actually does:

# Key exists, field exists → returns 0 (field updated)
r.hset("task:123", "progress", 50)  # Returns 0

# Key exists, field doesn't exist → returns 1 (new field added)
r.hset("task:123", "status", "running")  # Returns 1

# Key doesn't exist → creates key, returns 1 (new field added)
r.hset("task:456", "progress", 25)  # Returns 1 (key didn't exist!)

The return value tells you if a field was added, not if the key existed.

The Fix: Check Key Existence First

For state management, you usually want to ensure the key exists before updating:

# ✅ GOOD: Check key existence first
def update_task_progress(task_id: str, progress: int):
    key = f"task:{task_id}"

    # Check if key exists
    if not r.exists(key):
        logging.error(f"Task {task_id} not found in Redis")
        # Create recovery record or raise exception
        create_recovery_record(task_id, "missing_key")
        return False

    # Update progress (HSET's return value reflects field creation, not success)
    r.hset(key, "progress", progress)
    logging.info(f"Updated progress for {task_id}: {progress}%")
    return True
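One caveat with check-then-update: EXISTS and HSET are two separate commands, so the key can expire between them. If that race matters for your workload, a short Lua script performs the check and the write atomically on the server. The sketch below is illustrative, not code from our service; `register_script` is redis-py's real API, but the script and wrapper names are my own.

```python
# Update a hash field only if the key already exists.
# Returns 1 if the field was written, 0 if the key was missing.
UPDATE_IF_EXISTS = """
if redis.call('EXISTS', KEYS[1]) == 1 then
    redis.call('HSET', KEYS[1], ARGV[1], ARGV[2])
    return 1
end
return 0
"""

def make_updater(client):
    """Register the script once; redis-py re-sends it via EVALSHA afterwards."""
    script = client.register_script(UPDATE_IF_EXISTS)

    def update_if_exists(task_id: str, progress: int) -> bool:
        # keys/args map to KEYS[] and ARGV[] inside the Lua script
        return script(keys=[f"task:{task_id}"], args=["progress", progress]) == 1

    return update_if_exists
```

With this in place, update_task_progress can call the wrapper instead of the EXISTS/HSET pair and still create a recovery record whenever it returns False.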

Creating Recovery Records

When you detect a missing key, log it somewhere persistent (not just Redis):

from datetime import datetime

from google.cloud import firestore  # pip install google-cloud-firestore

db = firestore.Client()

def create_recovery_record(task_id: str, error_type: str):
    """Log missing keys to Firestore (or Cloud SQL) for later recovery."""
    recovery_record = {
        "task_id": task_id,
        "error_type": error_type,
        "timestamp": datetime.utcnow().isoformat(),
        "status": "needs_recovery"
    }

    # Persist outside Redis so the record survives even when Redis is the problem
    db.collection("recovery_records").add(recovery_record)
    logging.error(f"Created recovery record: {recovery_record}")

Handling Socket Timeouts on Startup

Cloud Run services can start cold. If Redis isn't ready (or VPC networking isn't established), you'll get socket timeouts:

# ✅ GOOD: Retry with exponential backoff on startup
import logging
import os
import time

import redis
from redis.exceptions import ConnectionError, TimeoutError

def get_redis_client(max_retries=5):
    """Get Redis client with retry logic for cold starts."""
    r = redis.Redis(
        host=os.getenv("REDIS_HOST"),
        port=int(os.getenv("REDIS_PORT", 6379)),
        socket_connect_timeout=5,
        socket_timeout=5,
        retry_on_timeout=True,
        health_check_interval=30
    )

    # Retry on connection failure (cold start)
    for attempt in range(max_retries):
        try:
            r.ping()  # Test connection
            return r
        except (ConnectionError, TimeoutError) as e:
            if attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt  # Exponential backoff
            logging.warning(f"Redis connection failed (attempt {attempt + 1}/{max_retries}), retrying in {wait_time}s...")
            time.sleep(wait_time)


# Use in your service
r = get_redis_client()

Cloud Run + Memorystore VPC Requirements

Memorystore Redis uses private IPs. Your Cloud Run service needs VPC access:

Option 1: VPC Connector (Recommended)

# Create a VPC Connector (if you don't have one).
# The connector needs a dedicated, unused /28 range.
gcloud compute networks vpc-access connectors create redis-connector \
  --region us-central1 \
  --network default \
  --range 10.8.0.0/28 \
  --min-instances 2 \
  --max-instances 3

# Deploy Cloud Run service with VPC Connector
gcloud run deploy evaluation-service \
  --image gcr.io/PROJECT/service:latest \
  --vpc-connector projects/PROJECT/locations/us-central1/connectors/redis-connector \
  --vpc-egress private-ranges-only \
  --set-env-vars REDIS_HOST=10.x.x.x,REDIS_PORT=6379

Why private-ranges-only? Your service only needs VPC access for Redis. Public internet traffic (API calls, Cloud Tasks) can go through the default route.

Option 2: Direct VPC Egress

# Deploy with Direct VPC Egress
gcloud run deploy evaluation-service \
  --image gcr.io/PROJECT/service:latest \
  --network projects/PROJECT/global/networks/VPC_NAME \
  --subnet projects/PROJECT/regions/us-central1/subnetworks/SUBNET_NAME \
  --set-env-vars REDIS_HOST=10.x.x.x,REDIS_PORT=6379

Note: You can't use both VPC Connector and Direct VPC Egress. Pick one.
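Either way, you need the instance's private IP for REDIS_HOST. Memorystore exposes it in the instance's host field; the instance name and region below are placeholders:

```shell
# Look up the Memorystore instance's private IP for REDIS_HOST
gcloud redis instances describe INSTANCE_NAME \
  --region us-central1 \
  --format 'value(host)'
```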

Complete Example: State Management with Error Handling

Here's a production-ready pattern:

import redis
import logging
import os
from typing import Optional, Dict
from datetime import datetime

class TaskStateManager:
    def __init__(self):
        self.redis = redis.Redis(
            host=os.getenv("REDIS_HOST"),
            port=int(os.getenv("REDIS_PORT", 6379)),
            socket_connect_timeout=5,
            socket_timeout=5,
            retry_on_timeout=True,
            health_check_interval=30,
            decode_responses=True  # Return strings, not bytes
        )
        self.ttl = int(os.getenv("REDIS_TTL_SECONDS", 3600))  # 1 hour default

    def initialize_task(self, task_id: str, initial_state: Dict) -> bool:
        """Create a new task state record."""
        key = f"task:{task_id}"

        # Check if already exists
        if self.redis.exists(key):
            logging.warning(f"Task {task_id} already exists, skipping initialization")
            return False

        # Set initial state
        self.redis.hset(key, mapping=initial_state)
        self.redis.expire(key, self.ttl)  # Set TTL

        logging.info(f"Initialized task {task_id}")
        return True

    def update_progress(self, task_id: str, progress: int) -> bool:
        """Update task progress. Returns False if task doesn't exist."""
        key = f"task:{task_id}"

        # Check existence first
        if not self.redis.exists(key):
            logging.error(f"Task {task_id} not found in Redis")
            self._create_recovery_record(task_id, "missing_key_on_update")
            return False

        # Update progress
        self.redis.hset(key, "progress", progress)
        self.redis.hset(key, "last_updated", datetime.utcnow().isoformat())

        logging.info(f"Updated progress for {task_id}: {progress}%")
        return True

    def get_state(self, task_id: str) -> Optional[Dict]:
        """Get current task state."""
        key = f"task:{task_id}"

        if not self.redis.exists(key):
            return None

        return self.redis.hgetall(key)

    def complete_task(self, task_id: str, final_state: Dict) -> bool:
        """Mark task as complete and store final state."""
        key = f"task:{task_id}"

        if not self.redis.exists(key):
            logging.error(f"Task {task_id} not found when completing")
            self._create_recovery_record(task_id, "missing_key_on_complete")
            return False

        # Update final state
        final_state["status"] = "completed"
        final_state["completed_at"] = datetime.utcnow().isoformat()
        self.redis.hset(key, mapping=final_state)

        # Extend TTL for completed tasks (keep for 24 hours)
        self.redis.expire(key, 86400)

        logging.info(f"Completed task {task_id}")
        return True

    def _create_recovery_record(self, task_id: str, error_type: str):
        """Log missing keys for recovery."""
        # In production, write to Firestore or Cloud SQL
        logging.error(f"Recovery needed: task_id={task_id}, error_type={error_type}")

# Usage
state_manager = TaskStateManager()

# Initialize task
state_manager.initialize_task("eval-123", {
    "status": "running",
    "progress": 0,
    "started_at": datetime.utcnow().isoformat()
})

# Update progress
state_manager.update_progress("eval-123", 50)

# Get state
state = state_manager.get_state("eval-123")
print(state)  # {'status': 'running', 'progress': '50', ...}

# Complete task
state_manager.complete_task("eval-123", {
    "score": 0.95,
    "duration_seconds": 120
})

Testing Redis State Management

from unittest.mock import Mock

# Adjust the import to wherever TaskStateManager is defined
from state_manager import TaskStateManager

def test_update_progress_missing_key():
    """Test that missing keys are detected."""
    state_manager = TaskStateManager()
    state_manager.redis = Mock()
    state_manager.redis.exists.return_value = False  # Key doesn't exist

    result = state_manager.update_progress("task-123", 50)

    assert result is False
    state_manager.redis.hset.assert_not_called()  # Should not update

def test_update_progress_existing_key():
    """Test that existing keys are updated."""
    state_manager = TaskStateManager()
    state_manager.redis = Mock()
    state_manager.redis.exists.return_value = True  # Key exists

    result = state_manager.update_progress("task-123", 50)

    assert result is True
    state_manager.redis.hset.assert_called()  # Should update

TL;DR

| Problem | Solution |
| --- | --- |
| HSET returns 1 even when the key doesn't exist | Check EXISTS before updating |
| Silent failures in production | Create recovery records for missing keys |
| Socket timeouts on cold start | Retry with exponential backoff |
| VPC networking issues | Use a VPC Connector or Direct VPC Egress |
| No visibility into Redis state | Log all state operations, use structured logging |

Key takeaway: Always check key existence before updating state. The extra EXISTS round trip is cheap, and a missing key signals a deeper problem: task initialization failed, the TTL expired, or data was lost.
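Because the error messages in this article follow a fixed format ("not found in Redis"), the recovery cases are easy to surface from Cloud Logging. A sketch, assuming the evaluation-service deploy from earlier:

```shell
# List recent missing-key errors from the Cloud Run service
gcloud logging read \
  'resource.type="cloud_run_revision"
   AND resource.labels.service_name="evaluation-service"
   AND textPayload:"not found in Redis"' \
  --freshness 1d \
  --limit 20
```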


More production GCP articles on my blog. I write about patterns from real infrastructure — find me at humzakt.github.io.

