Building Scalable Message Queues with Redis Streams and Dynamic Consumers
Learn how to build a production-ready message queue system using Redis Streams that scales from 1 to N workers with guaranteed message isolation and zero coordination overhead.
What You'll Learn
By the end of this guide, you'll understand:
- The Problem: How to handle multiple users sending messages to multiple AI services with guaranteed response isolation
- The Solution: Shared input queues + per-user response streams architecture
- Consumer Groups: How Redis automatically distributes work across workers
- Dynamic Scaling: Adding and removing workers with zero configuration changes
- Real Implementation: Complete working code with practical examples
The Problem: Message Routing at Scale
Imagine you're building an AI chatbot service. Here's the scenario:
- Alice sends "Hello AI!" to your service
- Bob sends "Hi there!" at the same time
- You have 3 AI workers processing messages
- Worker 1 picks up Alice's message
- Worker 2 picks up Bob's message
- Critical Question: How does Alice get only her response and not Bob's?
The Naive Approach (Don't Do This)
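```python
# Bad: single shared response stream with client-side filtering
async def user_reads_responses(user_id):
    last_id = "$"  # XREAD takes "$" (only new entries) or an ID; ">" is XREADGROUP-only
    while True:
        streams = await redis.xread({"shared_responses": last_id}, block=0)
        for stream, messages in streams:
            for msg_id, fields in messages:
                last_id = msg_id
                # Filter through ALL messages to find yours
                if fields["user_id"] == user_id:
                    return fields["response"]
                # Problem: Alice sees Bob's messages pass by
                # Problem: Multiple users competing for the same stream
                # Problem: No message isolation
```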
Why this fails:
- No message isolation (users see each other's messages)
- Inefficient filtering on the client side
- Race conditions when multiple consumers read the same stream
- Messages might be processed multiple times
The Right Architecture
```
Multiple Users  →  Shared Input Queue  →  Multiple AI Workers  →  Per-User Streams  →  Correct User
      ↓                    ↓                       ↓                      ↓
  alice, bob      user_messages_queue       ai-worker-1,2,3        ai_to_user:alice
                                                                   ai_to_user:bob
```
Key Insight: Separate the input aggregation from output distribution.
The Solution: Dual-Stream Architecture
Architecture Component 1: Shared Input Queue
```python
# All users write to ONE shared stream
from datetime import datetime

await redis.xadd(
    "user_messages_queue",
    {
        "user_id": "alice",
        "message": "Hello AI!",
        "timestamp": datetime.now().isoformat(),
    },
)
```
Benefits:
- Natural work aggregation
- Any AI worker can handle any message
- Built-in load balancing via consumer groups
- No coordination between users needed
Architecture Component 2: Per-User Response Streams
```python
# AI worker extracts user_id and writes to a dedicated stream
from datetime import datetime

user_id = message_data["user_id"]
await redis.xadd(
    f"ai_to_user:{user_id}",  # alice gets ai_to_user:alice
    {
        "response": "Echo: Hello AI!",
        "timestamp": datetime.now().isoformat(),
    },
)
```
Benefits:
- Complete isolation - Alice never sees Bob's responses
- No filtering logic needed
- Each user subscribes only to their stream
- Works with any number of AI workers
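The routing step boils down to one string template, which makes it worth guarding. In the worker code, a request without a `user_id` would be answered on a stream literally named `ai_to_user:None`. A minimal sketch of a hypothetical helper (`response_target` is not part of the repository) that builds the target stream and fields and rejects such requests:

```python
from datetime import datetime


def response_target(data: dict, response: str, worker_id: str) -> tuple[str, dict]:
    """Return the per-user stream key and fields for one AI response.

    Hypothetical helper: rejects requests without a user_id instead of
    letting f"ai_to_user:{None}" create a stream named "ai_to_user:None".
    """
    user_id = data.get("user_id")
    if not user_id:
        raise ValueError(f"request has no user_id: {data!r}")
    fields = {
        "response": response,
        "worker_id": worker_id,
        "timestamp": datetime.now().isoformat(),
    }
    return f"ai_to_user:{user_id}", fields


# Inside a worker: stream, fields = response_target(data, response, self.worker_id)
#                  await self.redis.xadd(stream, fields)
stream, fields = response_target(
    {"user_id": "alice", "message": "Hello AI!"}, "Echo: Hello AI!", "ai-worker-1"
)
print(stream)  # ai_to_user:alice
```

Raising here lets the worker leave the bad request unacknowledged (or move it aside) rather than writing a response nobody will ever read.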
The Magic: Consumer Groups
Consumer groups are Redis's built-in mechanism for distributing messages across multiple workers:
```python
# Create the consumer group once (raises a BUSYGROUP error if it already exists)
await redis.xgroup_create(
    "user_messages_queue",
    "ai_service_group",
    id='0',
    mkstream=True,
)

# Multiple workers read from the same group
messages = await redis.xreadgroup(
    groupname="ai_service_group",
    consumername="ai-worker-1",  # Each worker has a unique name
    streams={"user_messages_queue": ">"},  # '>' = only never-delivered messages
    count=1,
    block=5000,  # wait up to 5 seconds for new messages
)
```
What Redis Guarantees:
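- Each new message is delivered to exactly one consumer in the group; redelivery happens only when you explicitly claim or re-read pending entries
- Load spreads across all workers, because each idle worker pulls the next message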
- Message acknowledgment for reliability
- Pending message tracking for failure recovery
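To make these guarantees concrete, here is a toy, in-memory model of a consumer group's bookkeeping. It is a teaching sketch under simplifying assumptions (IDs are plain counters, one group only), not how Redis implements it. It shows the two ideas that matter: a `>` read hands each entry to one consumer only, and an entry stays in the pending list until it is acknowledged or claimed.

```python
class ToyConsumerGroup:
    """Toy in-memory model of one consumer group's bookkeeping.

    Not how Redis implements it; IDs are simple counters ("0-0", "1-0", ...)
    instead of Redis's millisecond-timestamp IDs.
    """

    def __init__(self):
        self.entries = []          # (entry_id, fields) in append order, like the stream
        self.next_undelivered = 0  # first entry never handed out via ">"
        self.pending = {}          # entry_id -> consumer holding it (pending entries list)

    def add(self, fields: dict) -> str:
        """Like XADD: append an entry and return its ID."""
        entry_id = f"{len(self.entries)}-0"
        self.entries.append((entry_id, fields))
        return entry_id

    def read_new(self, consumer: str, count: int = 1) -> list:
        """Like XREADGROUP ... '>': give never-delivered entries to ONE consumer."""
        batch = self.entries[self.next_undelivered:self.next_undelivered + count]
        self.next_undelivered += len(batch)
        for entry_id, _ in batch:
            self.pending[entry_id] = consumer  # delivered, but not yet acknowledged
        return batch

    def ack(self, entry_id: str) -> int:
        """Like XACK: drop the entry from the pending list; returns 1 if it was pending."""
        return 0 if self.pending.pop(entry_id, None) is None else 1

    def claim(self, entry_id: str, consumer: str) -> bool:
        """Like XCLAIM: hand a still-pending entry to another consumer."""
        if entry_id not in self.pending:
            return False
        self.pending[entry_id] = consumer
        return True


group = ToyConsumerGroup()
group.add({"user_id": "alice", "message": "Hello AI!"})
group.add({"user_id": "bob", "message": "Hi there!"})
print(group.read_new("ai-worker-1"))  # [('0-0', {'user_id': 'alice', 'message': 'Hello AI!'})]
print(group.read_new("ai-worker-2"))  # [('1-0', {'user_id': 'bob', 'message': 'Hi there!'})]
group.ack("0-0")
print(group.pending)                  # {'1-0': 'ai-worker-2'}
```

If `ai-worker-2` crashed at this point, entry `1-0` would sit in the pending list indefinitely. That gap is what XPENDING and XCLAIM exist to close.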
Complete Implementation
Let's build a complete working system. You can find the full code here: github.com/rakeshbhugra/dynamic_consumers_for_redis_streams
Project Structure
```
src/
├── user_simulate.py   # User class (produces + consumes)
├── user1.py           # Alice implementation
├── user2.py           # Bob implementation
├── ai_service.py      # AI worker implementation
└── stream_info.py     # Real-time monitoring
```
Step 1: User Implementation
The User class handles both sending messages and receiving responses:
```python
# user_simulate.py
import asyncio
from datetime import datetime

from redis.asyncio import Redis


class User:
    def __init__(self, user_id: str, redis_url: str = "redis://localhost:6379"):
        self.user_id = user_id
        self.redis_url = redis_url
        self.input_queue = "user_messages_queue"
        self.response_stream = f"ai_to_user:{user_id}"

    async def setup(self):
        """Initialize Redis connection and consumer group"""
        self.redis = Redis.from_url(self.redis_url, decode_responses=True)
        # Create consumer group for this user's response stream
        try:
            await self.redis.xgroup_create(
                self.response_stream, f"wsgroup_{self.user_id}", id='0', mkstream=True
            )
        except Exception as e:
            if "BUSYGROUP" not in str(e):  # BUSYGROUP = group already exists
                raise

    async def send_message(self, message: str):
        """Send message to shared input queue"""
        message_id = await self.redis.xadd(
            self.input_queue,
            {
                "user_id": self.user_id,
                "message": message,
                "timestamp": datetime.now().isoformat(),
            },
        )
        print(f"[{self.user_id}] Sent: {message}")
        return message_id

    async def receive_responses(self):
        """Listen for responses on dedicated stream"""
        consumer_name = f"consumer_{self.user_id}"
        while True:
            try:
                # Read from consumer group
                messages = await self.redis.xreadgroup(
                    f"wsgroup_{self.user_id}",
                    consumer_name,
                    {self.response_stream: ">"},
                    count=1,
                    block=5000,
                )
                if messages:
                    for stream, message_list in messages:
                        for message_id, data in message_list:
                            response = data.get("response")
                            print(f"[{self.user_id}] Received: {response}")
                            # Acknowledge message
                            await self.redis.xack(
                                self.response_stream, f"wsgroup_{self.user_id}", message_id
                            )
            except Exception as e:
                print(f"[{self.user_id}] Error: {e}")
                await asyncio.sleep(1)

    async def run(self, messages: list):
        """Run user simulation: send messages and listen for responses"""
        await self.setup()
        # Start response listener in background
        listener_task = asyncio.create_task(self.receive_responses())
        # Send messages with delay
        for msg in messages:
            await self.send_message(msg)
            await asyncio.sleep(2)
        # Keep listening for responses
        await listener_task
```
Step 2: AI Service Implementation
The AI service consumes from the shared queue and routes responses:
```python
# ai_service.py
import asyncio
import sys
from datetime import datetime

from redis.asyncio import Redis


class AIService:
    def __init__(self, worker_id: str, redis_url: str = "redis://localhost:6379"):
        self.worker_id = worker_id
        self.redis_url = redis_url
        self.input_queue = "user_messages_queue"
        self.consumer_group = "ai_service_group"

    async def setup(self):
        """Initialize Redis and create consumer group"""
        self.redis = Redis.from_url(self.redis_url, decode_responses=True)
        try:
            await self.redis.xgroup_create(
                self.input_queue, self.consumer_group, id='0', mkstream=True
            )
            print(f"[{self.worker_id}] Created consumer group")
        except Exception as e:
            if "BUSYGROUP" in str(e):
                print(f"[{self.worker_id}] Consumer group already exists")
            else:
                raise

    async def process_message(self, user_id: str, message: str):
        """Process user message (simulate AI processing)"""
        # Simulate processing time
        await asyncio.sleep(1)
        # Echo the message back (replace with actual AI logic)
        response = f"Echo: {message}"
        return response

    async def start(self):
        """Start consuming messages from the queue"""
        await self.setup()
        print(f"[{self.worker_id}] Started consuming messages...")
        while True:
            try:
                # Read messages from consumer group
                messages = await self.redis.xreadgroup(
                    self.consumer_group,
                    self.worker_id,
                    {self.input_queue: ">"},
                    count=1,
                    block=5000,
                )
                if messages:
                    for stream, message_list in messages:
                        for message_id, data in message_list:
                            user_id = data.get("user_id")
                            message = data.get("message")
                            print(f"[{self.worker_id}] Processing message from {user_id}: {message}")

                            # Process the message
                            response = await self.process_message(user_id, message)

                            # Send response to user-specific stream
                            response_stream = f"ai_to_user:{user_id}"
                            await self.redis.xadd(
                                response_stream,
                                {
                                    "response": response,
                                    "worker_id": self.worker_id,
                                    "timestamp": datetime.now().isoformat(),
                                },
                            )
                            print(f"[{self.worker_id}] Sent response to {user_id}")

                            # Acknowledge message processing
                            await self.redis.xack(
                                self.input_queue, self.consumer_group, message_id
                            )
            except Exception as e:
                print(f"[{self.worker_id}] Error: {e}")
                await asyncio.sleep(1)


async def main():
    worker_id = sys.argv[1] if len(sys.argv) > 1 else "ai-worker-1"
    service = AIService(worker_id)
    await service.start()


if __name__ == "__main__":
    asyncio.run(main())
```
Step 3: User Instances
```python
# user1.py
import asyncio

from user_simulate import User


async def main():
    alice = User("alice")
    messages = [
        "Hello AI!",
        "How are you?",
        "Tell me a joke",
    ]
    await alice.run(messages)


if __name__ == "__main__":
    asyncio.run(main())
```

```python
# user2.py
import asyncio

from user_simulate import User


async def main():
    bob = User("bob")
    messages = [
        "Hi there!",
        "What's the weather?",
        "Tell me a story",
    ]
    await bob.run(messages)


if __name__ == "__main__":
    asyncio.run(main())
```
Step 4: Stream Monitoring
Monitor all streams in real-time:
```python
# stream_info.py
import asyncio

from redis.asyncio import Redis

STREAMS_TO_MONITOR = [
    "user_messages_queue",
    "ai_to_user:alice",
    "ai_to_user:bob",
]

CONSUMER_GROUPS = {
    "user_messages_queue": "ai_service_group",
    "ai_to_user:alice": "wsgroup_alice",
    "ai_to_user:bob": "wsgroup_bob",
}


async def monitor_streams():
    redis = Redis.from_url("redis://localhost:6379", decode_responses=True)
    while True:
        print("\n" + "=" * 80)
        print("REDIS STREAMS MONITORING")
        print("=" * 80)
        for stream in STREAMS_TO_MONITOR:
            try:
                # Get stream length
                length = await redis.xlen(stream)
                print(f"\n📊 Stream: {stream}")
                print(f"   Length: {length} messages")

                # Get consumer group info
                if stream in CONSUMER_GROUPS:
                    group = CONSUMER_GROUPS[stream]
                    try:
                        groups = await redis.xinfo_groups(stream)
                        for g in groups:
                            if g['name'] == group:
                                print(f"   Group: {group}")
                                print(f"   Pending: {g['pending']} messages")
                                print(f"   Consumers: {g['consumers']}")
                                # Get consumer details
                                consumers = await redis.xinfo_consumers(stream, group)
                                for c in consumers:
                                    print(f"     - {c['name']}: idle {c['idle']}ms")
                    except Exception as e:
                        if "no such key" not in str(e).lower():
                            print(f"   Group info unavailable: {e}")
            except Exception as e:
                print(f"   Error: {e}")
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(monitor_streams())
```
Running the Complete System
Prerequisites
```bash
# Start Redis
docker-compose up -d

# Install dependencies
uv sync
```
Terminal Setup
Open 4 terminals and run:
Terminal 1: Stream Monitor
uv run src/stream_info.py
Terminal 2: AI Service
uv run src/ai_service.py ai-worker-1
Terminal 3: Alice
uv run src/user1.py
Terminal 4: Bob
uv run src/user2.py
What You'll See
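Monitor Output (a snapshot; note that XLEN counts every entry still stored in a stream, so acknowledged messages keep counting toward Length until the stream is trimmed):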
```
================================================================================
REDIS STREAMS MONITORING
================================================================================

📊 Stream: user_messages_queue
   Length: 0 messages
   Group: ai_service_group
   Pending: 0 messages
   Consumers: 1
     - ai-worker-1: idle 234ms

📊 Stream: ai_to_user:alice
   Length: 2 messages
   Group: wsgroup_alice
   Pending: 0 messages
   Consumers: 1
     - consumer_alice: idle 156ms

📊 Stream: ai_to_user:bob
   Length: 1 messages
   Group: wsgroup_bob
   Pending: 0 messages
   Consumers: 1
     - consumer_bob: idle 189ms
```
AI Worker Output:
```
[ai-worker-1] Started consuming messages...
[ai-worker-1] Processing message from alice: Hello AI!
[ai-worker-1] Sent response to alice
[ai-worker-1] Processing message from bob: Hi there!
[ai-worker-1] Sent response to bob
```
Alice's Output:
```
[alice] Sent: Hello AI!
[alice] Received: Echo: Hello AI!
[alice] Sent: How are you?
[alice] Received: Echo: How are you?
```
Bob's Output:
```
[bob] Sent: Hi there!
[bob] Received: Echo: Hi there!
[bob] Sent: What's the weather?
[bob] Received: Echo: What's the weather?
```
Notice:
- Alice only sees her responses
- Bob only sees his responses
- Messages are processed by the AI worker
- Complete isolation despite shared infrastructure
Dynamic Scaling: Adding More Workers
One of the most powerful features is dynamic scaling. Let's add more AI workers:
Add Second Worker
Open Terminal 5:
uv run src/ai_service.py ai-worker-2
What Happens:
- Worker 2 joins the ai_service_group
- Redis automatically distributes messages between workers
- Worker 1 might process Alice's message
- Worker 2 might process Bob's message
- No configuration changes needed
- No coordination required
Monitor Output with 2 Workers:
```
📊 Stream: user_messages_queue
   Group: ai_service_group
   Consumers: 2
     - ai-worker-1: idle 123ms
     - ai-worker-2: idle 98ms
```
Add Third Worker
Open Terminal 6:
uv run src/ai_service.py ai-worker-3
Now you have 3 workers competing for messages. Each new message goes to whichever worker asks for it while free, so the load spreads without any extra setup.
Load Distribution Example
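An illustrative run with 3 workers and Alice's and Bob's 6 messages (log lines abbreviated; the actual assignment depends on which worker is free):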
```
[ai-worker-1] Processing: alice - "Hello AI!"
[ai-worker-2] Processing: bob - "Hi there!"
[ai-worker-3] Processing: alice - "How are you?"
[ai-worker-1] Processing: bob - "What's the weather?"
[ai-worker-2] Processing: alice - "Tell me a joke"
[ai-worker-3] Processing: bob - "Tell me a story"
```
Key observations:
- Messages distributed roughly evenly
- Each worker processes different users' messages
- Alice and Bob still get only their responses
- No worker conflicts or coordination needed
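"Roughly evenly" holds when workers are equally fast. Because workers pull messages, a slower worker simply asks less often and ends up with fewer. A small discrete-event sketch illustrates this. It is a simplified model, not Redis itself: all messages are queued at t=0, Redis and network time are ignored, and ties go to the alphabetically first worker.

```python
import heapq


def simulate_distribution(seconds_per_message: dict, n_messages: int) -> dict:
    """Count how many queued messages each worker takes when every free worker
    immediately grabs the next one (a simplified model of competing consumers)."""
    free_at = [(0.0, name) for name in seconds_per_message]  # (time worker is free, worker)
    heapq.heapify(free_at)
    counts = {name: 0 for name in seconds_per_message}
    for _ in range(n_messages):
        t, name = heapq.heappop(free_at)  # earliest-free worker takes the next message
        counts[name] += 1
        heapq.heappush(free_at, (t + seconds_per_message[name], name))
    return counts


# Two 1-second workers and one 3-second worker share 9 queued messages
print(simulate_distribution({"ai-worker-1": 1.0, "ai-worker-2": 1.0, "ai-worker-3": 3.0}, 9))
# {'ai-worker-1': 4, 'ai-worker-2': 4, 'ai-worker-3': 1}
```

This self-balancing is a side effect of the pull model: no worker is handed more than it can take, so a slow or overloaded instance does not become a backlog of its own.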
Advanced: Adding More Users
Create User 3
```python
# user3.py
import asyncio

from user_simulate import User


async def main():
    charlie = User("charlie")
    messages = [
        "Hey everyone!",
        "What's going on?",
        "Anyone there?",
    ]
    await charlie.run(messages)


if __name__ == "__main__":
    asyncio.run(main())
```
Update Monitoring
```python
# stream_info.py - add charlie
STREAMS_TO_MONITOR = [
    "user_messages_queue",
    "ai_to_user:alice",
    "ai_to_user:bob",
    "ai_to_user:charlie",  # Add new user
]

CONSUMER_GROUPS = {
    "user_messages_queue": "ai_service_group",
    "ai_to_user:alice": "wsgroup_alice",
    "ai_to_user:bob": "wsgroup_bob",
    "ai_to_user:charlie": "wsgroup_charlie",  # Add new user
}
```
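Editing both dictionaries for every new user gets tedious. As an alternative, the monitor could discover response streams from the keys that exist. This is a sketch, assuming the naming used in `user_simulate.py` (stream `ai_to_user:{user_id}`, group `wsgroup_{user_id}`), a client created with `decode_responses=True`, and Redis 6.0+ for SCAN's `TYPE` filter. `discover_streams` is a hypothetical name.

```python
async def discover_streams(redis, prefix: str = "ai_to_user:"):
    """Build the stream list and stream -> group mapping from existing keys.

    Assumes the user_simulate.py naming: stream ai_to_user:{user_id}
    read by group wsgroup_{user_id}. SCAN's TYPE filter needs Redis 6.0+.
    """
    response_streams = sorted(
        [key async for key in redis.scan_iter(match=f"{prefix}*", _type="STREAM")]
    )
    streams = ["user_messages_queue", *response_streams]
    groups = {"user_messages_queue": "ai_service_group"}
    for key in response_streams:
        groups[key] = f"wsgroup_{key[len(prefix):]}"
    return streams, groups
```

Inside `monitor_streams()`, calling `streams, groups = await discover_streams(redis)` at the top of each loop iteration and iterating over those instead of the constants would pick up Charlie (and any later user) automatically.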
Run Charlie
```bash
# Terminal 7
uv run src/user3.py
```
Charlie's messages automatically:
- Go to the shared queue
- Get processed by any available worker
- Responses go to ai_to_user:charlie
- Charlie sees only his responses
No changes to AI workers needed!
Why This Architecture Works
1. Clear Separation of Concerns
```
Input Aggregation:   user_messages_queue
        ↓
Work Distribution:   Consumer Group (ai_service_group)
        ↓
Output Isolation:    Per-user streams (ai_to_user:{user_id})
```
2. Zero Coordination Overhead
- Workers don't need to know about each other
- Users don't need to know about workers
- Redis handles all coordination
- Add/remove workers anytime
3. Reliable (At-Least-Once) Delivery
```python
# Acknowledge only after processing succeeds; unacknowledged messages stay pending
await redis.xack(stream, group, message_id)

# Summary of pending (delivered but not yet acknowledged) messages
pending = await redis.xpending(stream, group)

# Claim messages from failed workers (min_idle_time is in milliseconds)
await redis.xclaim(stream, group, consumer, min_idle_time, message_ids)
```
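The worker in `ai_service.py` never reclaims anything, so messages held by a crashed worker stay pending indefinitely. Redis 6.2 added XAUTOCLAIM, which scans the pending list and claims idle entries in one command. Here is a sketch of a recovery pass a worker might run periodically. It assumes `decode_responses=True` (so the cursor comes back as the string `"0-0"`), and `handle` stands in for your own async process-and-respond step; the function name is hypothetical.

```python
async def reclaim_stale_messages(redis, stream, group, consumer, handle, min_idle_ms=60_000):
    """One recovery pass: take over entries idle longer than min_idle_ms in the
    group's pending list, process them, and acknowledge them.

    `handle` is a hypothetical async callback (your process-and-respond step).
    Assumes decode_responses=True, so the cursor comes back as the string "0-0".
    XAUTOCLAIM needs Redis 6.2+.
    """
    start_id = "0-0"
    reclaimed = 0
    while True:
        result = await redis.xautoclaim(
            stream, group, consumer, min_idle_ms, start_id=start_id, count=10
        )
        start_id, messages = result[0], result[1]  # Redis 7 adds a third item: deleted IDs
        for message_id, data in messages:
            if data is None:  # entry was trimmed from the stream; nothing to process
                continue
            await handle(data)
            await redis.xack(stream, group, message_id)
            reclaimed += 1
        if start_id == "0-0":  # cursor wrapped: the whole pending list was scanned
            return reclaimed
```

Each claim increments the entry's delivery counter. A message that keeps failing therefore accumulates deliveries, which is the signal a dead letter check can act on.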
4. Built-in Observability
```python
# Stream length
await redis.xlen(stream)
# Consumer group info
await redis.xinfo_groups(stream)
# Consumer details
await redis.xinfo_consumers(stream, group)
# Pending messages
await redis.xpending(stream, group)
```
Performance Characteristics
Throughput
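The figures below are rough, illustrative numbers for a near-instant handler, not for the demo's one-second asyncio.sleep, which caps each worker at about one message per second. Benchmark your own workload: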
| Workers | Messages/sec | Latency (p95) |
|---|---|---|
| 1 | ~1,000 | 50ms |
| 3 | ~3,000 | 45ms |
| 10 | ~10,000 | 40ms |
| 50 | ~40,000 | 60ms |
Bottlenecks:
- Message processing time (your AI logic)
- Network latency to Redis
- Redis instance limits (use Redis Cluster for more)
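The first bottleneck usually dominates. `ai_service.py` reads one message (`count=1`) and awaits `process_message` before reading the next, so each worker is capped at 1 / processing time messages per second, and N workers at N times that. A small sketch of that upper bound (`throughput_ceiling` is a hypothetical helper):

```python
def throughput_ceiling(workers: int, seconds_per_message: float) -> float:
    """Best-case messages/sec when each worker processes one message at a time.

    Ignores Redis round trips and network latency, so real numbers come in lower.
    """
    if workers < 1 or seconds_per_message <= 0:
        raise ValueError("need at least one worker and a positive processing time")
    return workers / seconds_per_message


print(throughput_ceiling(3, 1.0))    # 3.0  (the demo's 1-second sleep, 3 workers)
print(throughput_ceiling(10, 0.25))  # 40.0
```

Raising `count` alone does not lift this ceiling, because the worker still handles its batch sequentially. Only more workers, faster processing, or concurrent processing inside a worker does.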
Memory Usage
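```python
from datetime import datetime, timedelta

# Prevent unbounded growth: trim old messages
# (caution: trimming also removes entries that are still pending)
await redis.xtrim(
    stream,
    maxlen=10000,      # Keep roughly the last 10k messages
    approximate=True,  # "~" trimming: more efficient, may keep slightly more
)

# Or trim by time: stream IDs start with a millisecond timestamp, so MINID
# drops every entry older than the cutoff (requires Redis 6.2+)
cutoff = datetime.now() - timedelta(hours=24)
await redis.xtrim(stream, minid=f"{int(cutoff.timestamp() * 1000)}-0", approximate=True)
```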
Scaling Limits
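Single Redis instance (rough, hardware-dependent figures):
- Hundreds of thousands of commands per second, more with pipelining; each message in this design costs about six commands (two XADDs, two XREADGROUPs, two XACKs), so message throughput is a fraction of that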
- Millions of streams
- Thousands of consumers
- Memory is the main limit
For more scale:
- Use Redis Cluster
- Shard by user_id
- Multiple input queues with routing
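"Multiple input queues with routing" can be as simple as hashing `user_id` onto a fixed set of shards. The sketch below assumes a hypothetical `user_messages_queue:{n}` naming; each shard would need its own consumer group and workers. In Redis Cluster, a stream is a single key that lives on one node, so splitting the input queue is what spreads the write load.

```python
import zlib


def input_queue_for(user_id: str, num_shards: int, base: str = "user_messages_queue") -> str:
    """Pick a stable input-queue shard for a user (hypothetical naming scheme)."""
    # zlib.crc32 gives the same result in every process, unlike the built-in
    # hash() for str, which is randomized per interpreter run (PYTHONHASHSEED).
    shard = zlib.crc32(user_id.encode("utf-8")) % num_shards
    return f"{base}:{shard}"


# A user's requests always land in the same shard
print(input_queue_for("alice", 4) == input_queue_for("alice", 4))  # True
```

Changing `num_shards` remaps most users to different shards, so in-flight messages on the old shards need to be drained before resizing.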
Real-World Applications
1. Multi-Tenant Chat Applications
```python
# Each user within each tenant gets their own response stream
response_stream = f"ai_to_tenant:{tenant_id}:user:{user_id}"
# AI workers handle any tenant
# Responses automatically isolated
```
2. Task Processing Systems
```python
# Jobs from multiple clients
await redis.xadd("jobs_queue", {
    "client_id": client_id,
    "job_type": "video_encoding",
    "video_url": url,
})
# Results to client-specific streams
result_stream = f"results:{client_id}"
```
3. IoT Data Processing
```python
# Sensor data from multiple devices
await redis.xadd("sensor_data", {
    "device_id": device_id,
    "temperature": temp,
    "humidity": humidity,
})
# Alerts to device-specific streams
alert_stream = f"alerts:{device_id}"
```
4. Microservices Communication
```python
# Request queue shared by all services
request_queue = "service_requests"
# Response streams per requesting service
response_stream = f"responses:{requesting_service_id}"
```
Production Considerations
Error Handling
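```python
import asyncio
import logging

log = logging.getLogger(__name__)


async def robust_message_processing(self, max_retries: int = 3):
    """AIService method: stop after max_retries consecutive processing failures."""
    retry_count = 0
    while retry_count < max_retries:
        try:
            messages = await self.redis.xreadgroup(
                self.consumer_group, self.worker_id,
                {self.input_queue: ">"}, count=1, block=5000,
            )
        except Exception as e:
            log.error(f"Read failed: {e}")
            await asyncio.sleep(1)
            continue
        for stream, message_list in messages or []:
            for message_id, data in message_list:
                try:
                    # Process message (and send the response, as in start())
                    await self.process_message(data.get("user_id"), data.get("message"))
                    # Acknowledge on success
                    await self.redis.xack(stream, self.consumer_group, message_id)
                    retry_count = 0  # Reset on success
                except Exception as e:
                    log.error(f"Processing {message_id} failed: {e}")
                    # Don't ack: the message stays pending for recovery
                    retry_count += 1
```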
Dead Letter Queue
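```python
async def handle_failed_messages(self, max_deliveries: int = 5):
    """Move repeatedly failed messages to a dead letter queue"""
    pending = await self.redis.xpending_range(
        self.input_queue, self.consumer_group, min="-", max="+", count=100
    )
    for entry in pending:
        # times_delivered grows each time the entry is claimed (XCLAIM/XAUTOCLAIM)
        if entry["times_delivered"] > max_deliveries:
            message_id = entry["message_id"]
            # XPENDING returns metadata only, so fetch the body with XRANGE
            found = await self.redis.xrange(self.input_queue, min=message_id, max=message_id)
            if found:
                # Move to dead letter queue
                await self.redis.xadd("dead_letter_queue", found[0][1])
            # Acknowledge to remove from pending
            await self.redis.xack(self.input_queue, self.consumer_group, message_id)
```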
Monitoring and Alerts
```python
async def monitor_queue_health(self, stream: str, group: str, alert):
    """Alert on queue health issues; `alert` is your own notification callback"""
    # Check pending (delivered but unacknowledged) messages per group
    groups = await self.redis.xinfo_groups(stream)
    for group_info in groups:
        pending = group_info["pending"]
        if pending > 1000:
            alert("High pending message count", pending)

    # Check for stalled consumers (idle = ms since the consumer's last interaction)
    consumers = await self.redis.xinfo_consumers(stream, group)
    for consumer in consumers:
        idle_time = consumer["idle"]
        if idle_time > 60000:  # 1 minute
            alert(f"Consumer {consumer['name']} is idle", idle_time)

    # Check stream growth
    length = await self.redis.xlen(stream)
    if length > 100000:
        alert("Stream growing unbounded", length)
```
Graceful Shutdown
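```python
import asyncio
import signal


class AIService:
    def __init__(self):
        self.should_stop = False  # plus the attributes from ai_service.py

    def request_shutdown(self):
        """Signal handler: finish the current message, then stop reading"""
        print("Shutting down...")
        self.should_stop = True

    async def start(self):
        # Handle shutdown signals (loop.add_signal_handler is Unix-only)
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self.request_shutdown)

        while not self.should_stop:
            # One XREADGROUP (block=5000) + process + XACK cycle; the flag is
            # checked between cycles, so no new messages are claimed after a signal
            await self.process_messages()

        # Close the Redis connection only after the in-flight message is done
        await self.redis.aclose()  # redis-py 5.0.1+; older versions use close()
        print("Shutdown complete")
```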
Comparison with Other Solutions
vs RabbitMQ
Redis Streams Advantages:
- Simpler setup and operations
- Better performance for high-throughput scenarios
- Several consumer groups can each read the full stream independently, and history can be replayed by ID
- More flexible message patterns
RabbitMQ Advantages:
- More advanced routing
- Better durability guarantees
- More mature ecosystem
- AMQP protocol support
vs Kafka
Redis Streams Advantages:
- Much simpler to set up and operate
- Lower latency
- Better for transient data
- Easier integration with existing Redis usage
Kafka Advantages:
- Better for long-term data retention
- More powerful partitioning
- Better for event sourcing
- Better for large-scale analytics
vs AWS SQS/SNS
Redis Streams Advantages:
- Lower latency (a persistent connection instead of HTTPS API calls)
- More flexible patterns
- Better for real-time processing
- Can run anywhere (not cloud-locked)
SQS/SNS Advantages:
- Fully managed
- Better for async workflows
- Built-in dead letter queues
- Easier autoscaling integration
Decision Matrix
Use Redis Streams when:
✅ You need low latency (<10ms)
✅ You already use Redis
✅ You want simple operations
✅ Message data is transient
✅ You need flexible patterns

Use Kafka when:
✅ You need long-term retention
✅ You're doing event sourcing
✅ You need complex partitioning
✅ Scale is 100k+ messages/sec

Use RabbitMQ when:
✅ You need complex routing
✅ You need strong durability
✅ You need AMQP protocol
✅ You have mixed message patterns

Use SQS/SNS when:
✅ You're all-in on AWS
✅ You want fully managed
✅ Scale is unpredictable
✅ Cost is not the top priority
Summary
We've built a complete message queue system with:
| Feature | Implementation |
|---|---|
| Input Aggregation | Shared user_messages_queue stream |
| Work Distribution | Consumer Group |
| Output Isolation | Per-user streams |
| Dynamic Scaling | Add workers with same group name |
| Message Guarantees | Acknowledge + pending tracking |
| Monitoring | Built-in XINFO commands |
Key Takeaways
- Separation is Power: Separate input aggregation from output distribution
- Per-User Streams: The simplest way to guarantee message isolation
- Consumer Groups: Redis does the heavy lifting for work distribution
- Zero Coordination: Workers don't need to know about each other
- Dynamic Scaling: Add/remove workers without config changes
- Production Ready: Built-in acknowledgment, pending tracking, and monitoring
When to Use This Pattern
Perfect for:
- Multi-tenant chat applications
- Task processing systems with multiple clients
- IoT data processing
- Microservices request/response patterns
- Any scenario with multiple producers and consumers
Not ideal for:
- Long-term data retention (use Kafka)
- Complex routing logic (use RabbitMQ)
- Extremely high scale (>1M messages/sec, use Kafka)
Next Steps
- Clone the repository: github.com/rakeshbhugra/dynamic_consumers_for_redis_streams
- Run the examples with 1, 2, 3 workers
- Add your own users and watch the isolation
- Replace the echo logic with actual AI processing
- Add monitoring and alerting for production
The best way to learn is by running the code. Start with one worker and one user, then progressively add more. Watch how Redis automatically handles the distribution.
Your assignment: Implement this pattern in your next project that needs message queuing. Start simple with one worker, then scale up when needed.
Good luck building scalable systems!