Redis HSET always returns a value — it never signals failure just because the key you meant to update doesn't exist. This led to a subtle bug in our production system: state updates were logging success when they were actually failing silently.
This guide covers Redis state management patterns for long-running Cloud Run tasks, including how to detect missing keys, handle socket timeouts, and properly configure VPC networking for Cloud Run + Memorystore.
The Problem: Silent Failures
We use Redis (Memorystore) to track state for long-running evaluation tasks. Each task updates its progress periodically:
# ❌ BAD: This logs success even when the key doesn't exist
import redis
import logging

r = redis.Redis(host='10.x.x.x', port=6379)

def update_task_progress(task_id: str, progress: int):
    # HSET returns the number of *new* fields added (1), or 0 when the
    # field already existed and was merely updated — it is NOT a
    # success/failure flag, so the truthiness check below is misleading.
    result = r.hset(f"task:{task_id}", "progress", progress)
    if result:
        # Also reached when the key was missing and HSET silently created it.
        logging.info(f"Updated progress for {task_id}: {progress}%")
    else:
        # Reached on every normal update of an existing field — a false alarm.
        logging.warning(f"Failed to update progress for {task_id}")
The bug: HSET returns 1 if a new field was added, 0 if the field already existed. But if the key task:{task_id} doesn't exist, HSET creates it and returns 1. So the code logs success even when the key was missing.
Understanding Redis HSET Behavior
Let's clarify what HSET actually does:
# Key exists, field exists → returns 0 (field updated)
r.hset("task:123", "progress", 50)  # Returns 0
# Key exists, field doesn't exist → returns 1 (new field added)
r.hset("task:123", "status", "running")  # Returns 1
# Key doesn't exist → creates key, returns 1 (new field added)
# (this is the case that masquerades as success in the buggy code above)
r.hset("task:456", "progress", 25)  # Returns 1 (key didn't exist!)
The return value tells you if a field was added, not if the key existed.
The Fix: Check Key Existence First
For state management, you usually want to ensure the key exists before updating:
# ✅ GOOD: Check key existence first
def update_task_progress(task_id: str, progress: int) -> bool:
    """Update a task's progress field in Redis.

    Returns True on success; returns False (after creating a recovery
    record) when the task key is missing. EXISTS is what distinguishes
    "missing key" — HSET's return value only reports whether a *field*
    was newly added, never whether the key was there.
    """
    key = f"task:{task_id}"
    # Check if key exists
    if not r.exists(key):
        logging.error(f"Task {task_id} not found in Redis")
        # Persist the failure somewhere durable so it can be recovered later.
        create_recovery_record(task_id, "missing_key")
        return False
    # Update progress. Return value intentionally ignored: 0 just means
    # the field already existed, which is the normal update path.
    r.hset(key, "progress", progress)
    logging.info(f"Updated progress for {task_id}: {progress}%")
    return True
Creating Recovery Records
When you detect a missing key, log it somewhere persistent (not just Redis):
from datetime import datetime
import json

def create_recovery_record(task_id: str, error_type: str):
    """Log missing keys to Firestore or Cloud SQL for recovery."""
    recovery_record = {
        "task_id": task_id,
        "error_type": error_type,
        # NOTE(review): utcnow() returns a naive datetime (no tzinfo);
        # consider datetime.now(timezone.utc) for an offset-aware stamp.
        "timestamp": datetime.utcnow().isoformat(),
        "status": "needs_recovery"
    }
    # Write to Firestore or Cloud SQL
    # `db` is assumed to be an initialized Firestore client created
    # elsewhere — TODO confirm it is in scope before this runs.
    db.collection("recovery_records").add(recovery_record)
    logging.error(f"Created recovery record: {recovery_record}")
Handling Socket Timeouts on Startup
Cloud Run services can start cold. If Redis isn't ready (or VPC networking isn't established), you'll get socket timeouts:
# ✅ GOOD: Retry with exponential backoff on startup
import os
import time
from redis.exceptions import ConnectionError, TimeoutError

def get_redis_client(max_retries=5):
    """Get Redis client with retry logic for cold starts.

    Builds the client from REDIS_HOST / REDIS_PORT env vars, then PINGs
    it up to ``max_retries`` times with exponential backoff (1s, 2s,
    4s, ...). Re-raises the last ConnectionError/TimeoutError if the
    server never becomes reachable.
    """
    r = redis.Redis(
        host=os.getenv("REDIS_HOST"),
        port=int(os.getenv("REDIS_PORT", 6379)),
        socket_connect_timeout=5,
        socket_timeout=5,
        retry_on_timeout=True,
        health_check_interval=30,
    )
    # Retry on connection failure (cold start / VPC not yet established)
    for attempt in range(max_retries):
        try:
            r.ping()  # Test connection
            return r
        except (ConnectionError, TimeoutError):
            # Out of retries: surface the failure to the caller.
            if attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt  # Exponential backoff
            logging.warning(
                f"Redis connection failed (attempt {attempt + 1}/{max_retries}), "
                f"retrying in {wait_time}s..."
            )
            time.sleep(wait_time)
# Use in your service
# Call once at startup; this blocks (with backoff) until Redis is
# reachable, or raises after the final retry.
r = get_redis_client()
Cloud Run + Memorystore VPC Requirements
Memorystore Redis uses private IPs. Your Cloud Run service needs VPC access:
Option 1: VPC Connector (Recommended)
# Create VPC Connector (if you don't have one)
# min/max instances bound the connector's autoscaling (and its cost)
gcloud compute networks vpc-access connectors create redis-connector \
  --region us-central1 \
  --subnet default \
  --subnet-project PROJECT_ID \
  --min-instances 2 \
  --max-instances 3
# Deploy Cloud Run service with VPC Connector
# private-ranges-only: only private-IP traffic (Redis) routes through the
# connector; public egress keeps the default internet path.
gcloud run deploy evaluation-service \
  --image gcr.io/PROJECT/service:latest \
  --vpc-connector projects/PROJECT/locations/us-central1/connectors/redis-connector \
  --vpc-egress private-ranges-only \
  --set-env-vars REDIS_HOST=10.x.x.x,REDIS_PORT=6379
Why private-ranges-only? Your service only needs VPC access for Redis. Public internet traffic (API calls, Cloud Tasks) can go through the default route.
Option 2: Direct VPC Egress
# Deploy with Direct VPC Egress
# Attaches the service directly to the VPC subnet; do not combine this
# with --vpc-connector (the two options are mutually exclusive).
gcloud run deploy evaluation-service \
  --image gcr.io/PROJECT/service:latest \
  --network projects/PROJECT/global/networks/VPC_NAME \
  --subnet projects/PROJECT/regions/us-central1/subnetworks/SUBNET_NAME \
  --set-env-vars REDIS_HOST=10.x.x.x,REDIS_PORT=6379
Note: You can't use both VPC Connector and Direct VPC Egress. Pick one.
Complete Example: State Management with Error Handling
Here's a production-ready pattern:
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

import redis
class TaskStateManager:
    """Redis-backed state store for long-running tasks.

    Keys are ``task:{task_id}`` hashes. Every mutating method verifies
    the key exists first, because HSET's return value only reports
    whether a *field* was newly added — it cannot distinguish "updated
    an existing task" from "silently created a key that should already
    have existed" (e.g. after TTL expiry).
    """

    def __init__(self):
        self.redis = redis.Redis(
            host=os.getenv("REDIS_HOST"),
            port=int(os.getenv("REDIS_PORT", 6379)),
            socket_connect_timeout=5,
            socket_timeout=5,
            retry_on_timeout=True,
            health_check_interval=30,
            decode_responses=True,  # Return strings, not bytes
        )
        # TTL guards against orphaned keys from crashed tasks.
        self.ttl = int(os.getenv("REDIS_TTL_SECONDS", 3600))  # 1 hour default

    def initialize_task(self, task_id: str, initial_state: Dict) -> bool:
        """Create a new task state record. Returns False if it already exists."""
        key = f"task:{task_id}"
        # Check if already exists
        if self.redis.exists(key):
            logging.warning(f"Task {task_id} already exists, skipping initialization")
            return False
        # Set initial state
        self.redis.hset(key, mapping=initial_state)
        self.redis.expire(key, self.ttl)  # Set TTL
        logging.info(f"Initialized task {task_id}")
        return True

    def update_progress(self, task_id: str, progress: int) -> bool:
        """Update task progress. Returns False if task doesn't exist."""
        key = f"task:{task_id}"
        # Check existence first
        if not self.redis.exists(key):
            logging.error(f"Task {task_id} not found in Redis")
            self._create_recovery_record(task_id, "missing_key_on_update")
            return False
        # One HSET with a mapping: a single round trip, and both fields
        # land together instead of via two separate commands.
        self.redis.hset(key, mapping={
            "progress": progress,
            "last_updated": datetime.now(timezone.utc).isoformat(),
        })
        logging.info(f"Updated progress for {task_id}: {progress}%")
        return True

    def get_state(self, task_id: str) -> Optional[Dict]:
        """Get current task state, or None if the key is missing."""
        key = f"task:{task_id}"
        if not self.redis.exists(key):
            return None
        return self.redis.hgetall(key)

    def complete_task(self, task_id: str, final_state: Dict) -> bool:
        """Mark task as complete and store final state."""
        key = f"task:{task_id}"
        if not self.redis.exists(key):
            logging.error(f"Task {task_id} not found when completing")
            self._create_recovery_record(task_id, "missing_key_on_complete")
            return False
        # Copy so the caller's dict isn't mutated as a side effect.
        state = dict(final_state)
        state["status"] = "completed"
        state["completed_at"] = datetime.now(timezone.utc).isoformat()
        self.redis.hset(key, mapping=state)
        # Extend TTL for completed tasks (keep for 24 hours)
        self.redis.expire(key, 86400)
        logging.info(f"Completed task {task_id}")
        return True

    def _create_recovery_record(self, task_id: str, error_type: str):
        """Log missing keys for recovery."""
        # In production, write to Firestore or Cloud SQL
        logging.error(f"Recovery needed: task_id={task_id}, error_type={error_type}")
# Usage
state_manager = TaskStateManager()

# Initialize task
state_manager.initialize_task("eval-123", {
    "status": "running",
    "progress": 0,
    "started_at": datetime.utcnow().isoformat()
})

# Update progress
state_manager.update_progress("eval-123", 50)

# Get state (decode_responses=True means values come back as strings)
state = state_manager.get_state("eval-123")
print(state)  # {'status': 'running', 'progress': '50', ...}

# Complete task
state_manager.complete_task("eval-123", {
    "score": 0.95,
    "duration_seconds": 120
})
Testing Redis State Management
import pytest
from unittest.mock import Mock, patch
def test_update_progress_missing_key():
    """A missing Redis key must be reported as a failure, never written to."""
    manager = TaskStateManager()
    fake_redis = Mock()
    fake_redis.exists.return_value = False  # Key doesn't exist
    manager.redis = fake_redis
    assert manager.update_progress("task-123", 50) is False
    fake_redis.hset.assert_not_called()  # Should not update
def test_update_progress_existing_key():
    """An existing Redis key should be updated and reported as success."""
    manager = TaskStateManager()
    fake_redis = Mock()
    fake_redis.exists.return_value = True  # Key exists
    manager.redis = fake_redis
    assert manager.update_progress("task-123", 50) is True
    fake_redis.hset.assert_called()  # Should update
TL;DR
| Problem | Solution |
|---|---|
| HSET returns 1 even when key doesn't exist | Check EXISTS before updating |
| Silent failures in production | Create recovery records for missing keys |
| Socket timeouts on cold start | Retry with exponential backoff |
| VPC networking issues | Use VPC Connector or Direct VPC Egress |
| No visibility into Redis state | Log all state operations, use structured logging |
Key takeaway: Always check key existence before updating state. Redis operations are fast, but missing keys indicate a deeper problem (task initialization failed, TTL expired, or data corruption).
More production GCP articles on my blog. I write about patterns from real infrastructure — find me at humzakt.github.io.
Top comments (0)