Message queues decouple services, handle traffic spikes, and make systems resilient. Here's a practical guide to getting started.
## When Do You Need a Message Queue?
- Processing takes too long for a synchronous response
- You need to handle traffic spikes without dropping requests
- Services need to communicate without tight coupling
- You want retry logic for unreliable operations
## Basic Concepts
A producer sends messages to a queue. A consumer reads and processes them. The queue buffers messages between the two, so the producer can return immediately while the work happens later.
```
[Producer] --> [Queue] --> [Consumer]
     |                          |
"Send email"              Actually sends
Returns immediately         the email
```
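Before reaching for a broker, the flow above can be sketched with Python's standard-library `queue.Queue` as an illustrative in-memory stand-in (not a real message queue, but the same producer/consumer shape):

```python
import queue
import threading

q = queue.Queue()  # in-memory stand-in for the broker
results = []

def consumer():
    while True:
        task = q.get()        # blocks until a message arrives
        if task is None:      # sentinel value: shut down
            break
        results.append(f"sent email to {task['to']}")
        q.task_done()

t = threading.Thread(target=consumer)
t.start()

# The producer returns immediately; the consumer does the slow work.
q.put({"to": "user@example.com"})
q.put(None)
t.join()
print(results)  # ['sent email to user@example.com']
```

A real broker adds what this toy lacks: persistence, delivery across processes and machines, and acknowledgments.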
## RabbitMQ with Python

```bash
pip install pika
```
### Producer

```python
import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='email_queue', durable=True)

def send_email_task(to: str, subject: str, body: str):
    message = json.dumps({"to": to, "subject": subject, "body": body})
    channel.basic_publish(
        exchange='',
        routing_key='email_queue',
        body=message,
        properties=pika.BasicProperties(delivery_mode=2),  # persistent
    )
    print(f"Queued email to {to}")

send_email_task("user@example.com", "Welcome!", "Thanks for signing up")
connection.close()
```
### Consumer

```python
import pika
import json
import smtplib

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='email_queue', durable=True)
channel.basic_qos(prefetch_count=1)  # deliver one unacked message at a time

def process_email(ch, method, properties, body):
    data = json.loads(body)
    try:
        # Actually send the email
        print(f"Sending email to {data['to']}: {data['subject']}")
        # smtp.send(...)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Failed: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_consume(queue='email_queue', on_message_callback=process_email)
print("Waiting for messages...")
channel.start_consuming()
```
## Redis as a Simple Queue
For simpler use cases, Redis lists work great:
```python
import redis
import json

r = redis.Redis()

# Producer
def enqueue(queue_name: str, task: dict):
    r.lpush(queue_name, json.dumps(task))

# Consumer
def dequeue(queue_name: str, timeout: int = 0):
    # blocks until a task arrives (timeout=0 means wait forever)
    _, data = r.brpop(queue_name, timeout=timeout)
    return json.loads(data)

# Producer side
enqueue("tasks", {"type": "resize_image", "path": "/uploads/photo.jpg", "size": [800, 600]})

# Consumer side
while True:
    task = dequeue("tasks")
    process(task)
```
## Pub/Sub Pattern

One message, multiple consumers. Note that Redis pub/sub is fire-and-forget: subscribers that are offline when an event is published never see it.
```python
# Publisher
def publish_event(event_type: str, data: dict):
    r.publish(event_type, json.dumps(data))

publish_event("user.created", {"id": 123, "name": "Alice"})

# Subscriber 1: Send welcome email
# Subscriber 2: Create default settings
# Subscriber 3: Notify admin
```
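To see the fan-out without running a Redis server, here's a minimal in-memory sketch of the same pattern. The `handlers` registry and `subscribe` function are hypothetical stand-ins for Redis channels; real subscribers would use `r.pubsub()` and its `subscribe` method:

```python
import json
from collections import defaultdict

# Hypothetical in-memory registry standing in for Redis channels
handlers = defaultdict(list)

def subscribe(event_type, handler):
    handlers[event_type].append(handler)

def publish_event(event_type, data):
    payload = json.dumps(data)
    for handler in handlers[event_type]:  # every subscriber gets a copy
        handler(json.loads(payload))

log = []
subscribe("user.created", lambda d: log.append(f"welcome email to {d['name']}"))
subscribe("user.created", lambda d: log.append(f"default settings for user {d['id']}"))

publish_event("user.created", {"id": 123, "name": "Alice"})
print(log)  # ['welcome email to Alice', 'default settings for user 123']
```

The key contrast with a work queue: a queued task goes to exactly one consumer, while a published event goes to every subscriber.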
## Dead Letter Queues
Handle messages that repeatedly fail:
```python
MAX_RETRIES = 3

def process_with_retry(message):
    retries = message.get("_retries", 0)
    try:
        do_work(message)
    except Exception:
        if retries < MAX_RETRIES:
            message["_retries"] = retries + 1
            enqueue("tasks", message)        # retry
        else:
            enqueue("dead_letter", message)  # give up
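You can exercise this retry logic end to end with plain lists standing in for the Redis queues, and a `do_work` that always fails (both stand-ins are for illustration only):

```python
MAX_RETRIES = 3
queues = {"tasks": [], "dead_letter": []}

def enqueue(queue_name, task):
    queues[queue_name].append(task)

def do_work(message):
    raise RuntimeError("downstream service unavailable")  # always fails

def process_with_retry(message):
    retries = message.get("_retries", 0)
    try:
        do_work(message)
    except Exception:
        if retries < MAX_RETRIES:
            message["_retries"] = retries + 1
            enqueue("tasks", message)        # retry
        else:
            enqueue("dead_letter", message)  # give up

# Drain the queue until the failing task lands in the dead-letter queue
enqueue("tasks", {"type": "resize_image"})
while queues["tasks"]:
    process_with_retry(queues["tasks"].pop(0))

print(queues["dead_letter"])  # the task, with _retries == 3
```

After `MAX_RETRIES` failed attempts the message stops cycling and sits in the dead-letter queue, where you can inspect or replay it later.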
## Key Takeaways
- Use queues when you need async processing or decoupling
- RabbitMQ for complex routing, Redis for simple queues
- Always acknowledge messages after successful processing
- Implement dead letter queues for failed messages
- Monitor queue depth — growing queues mean consumers can't keep up
- Make consumers idempotent — messages may be delivered more than once
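That last point deserves a sketch: track processed message IDs so a redelivered message is a no-op. Here the IDs live in a Python set for illustration; in production you'd use a persistent store such as a database unique constraint or Redis `SETNX`. The `message_id` field is an assumption — your messages need some unique key for this to work.

```python
processed_ids = set()  # in production: a persistent store, not process memory
side_effects = []

def handle(message):
    msg_id = message["message_id"]  # assumes messages carry a unique ID
    if msg_id in processed_ids:
        return  # duplicate delivery: do nothing
    side_effects.append(f"charged order {message['order']}")
    processed_ids.add(msg_id)

msg = {"message_id": "abc-123", "order": 42}
handle(msg)
handle(msg)  # redelivered: skipped
print(side_effects)  # ['charged order 42'], charged once despite two deliveries
```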