What are Streams?
A Stream is an append-only log of events. Each event is added with a timestamp and gets a unique ID. Perfect for tracking activities, messaging, and event processing.
Think of it as an audit trail or event log that’s ordered by time.
Basic Commands
# Add event to stream
XADD events * field1 "value1" field2 "value2"
# Returns: 1707300000000-0 (auto-generated ID)
# Add with custom ID
XADD events "2026-02-09-1" field1 "value1"
# Get all events
XRANGE events - +
# Get last 10 events
XREVRANGE events + - COUNT 10
# Get stream length
XLEN events
# Remove old events
XDEL events "1707300000000-0"
Activity Log
Track what users do:
# User logs in
XADD user:1:activity * action "login" ip "192.168.1.1" device "mobile"
# User posts
XADD user:1:activity * action "post_created" post_id 123
# User updates profile
XADD user:1:activity * action "profile_updated" field "bio"
# User logs out
XADD user:1:activity * action "logout"
# View activity history
XRANGE user:1:activity - +
# Last 5 activities
XREVRANGE user:1:activity + - COUNT 5
Node.js (ioredis):
import Redis from 'ioredis';
const redis = new Redis();
// Log user activity
async function logActivity(userId, action, metadata = {}) {
const streamKey = `user:${userId}:activity`;
await redis.xadd(streamKey, '*',
'action', action,
'timestamp', Date.now().toString(),
...Object.entries(metadata).flat()
);
// Trim to keep only last 1000 events
await redis.xtrim(streamKey, 'MAXLEN', '~', 1000);
}
// Get activity history
async function getActivityHistory(userId, count = 50) {
const events = await redis.xrevrange(
`user:${userId}:activity`,
'+', '-',
'COUNT', count
);
return events.map(([id, fields]) => {
const obj = { id };
for (let i = 0; i < fields.length; i += 2) {
obj[fields[i]] = fields[i + 1];
}
return obj;
});
}
// Usage
await logActivity(1, 'login', { ip: '192.168.1.1', device: 'mobile' });
await logActivity(1, 'post_created', { post_id: '123' });
const history = await getActivityHistory(1, 10);
Python (redis-py):
import redis
from datetime import datetime
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def log_activity(user_id: int, action: str, metadata: dict = None) -> dict:
"""Log a user activity to the stream."""
stream_key = f"user:{user_id}:activity"
fields = {
'action': action,
'timestamp': str(datetime.now().timestamp()),
**(metadata or {})
}
event_id = r.xadd(stream_key, fields)
# Trim to keep only last 1000 events
r.xtrim(stream_key, maxlen=1000, approximate=True)
return {'event_id': event_id}
def get_activity_history(user_id: int, count: int = 50) -> list:
"""Get user's activity history."""
events = r.xrevrange(f"user:{user_id}:activity", '+', '-', count=count)
return [
{'id': event_id, **fields}
for event_id, fields in events
]
# Usage
log_activity(1, 'login', {'ip': '192.168.1.1', 'device': 'mobile'})
log_activity(1, 'post_created', {'post_id': '123'})
history = get_activity_history(1, 10)
print(history)
Chat Messages
# New messages arrive
XADD chat:room:1 * user "alice" message "Hi everyone!"
XADD chat:room:1 * user "bob" message "Hello!"
XADD chat:room:1 * user "alice" message "How are you?"
# Get message history
XRANGE chat:room:1 - +
# Get newest messages first
XREVRANGE chat:room:1 + - COUNT 50
Node.js (ioredis):
// Send a chat message
async function sendChatMessage(roomId, userId, message) {
const messageId = await redis.xadd(
`chat:room:${roomId}`,
'*',
'user', userId,
'message', message,
'timestamp', Date.now().toString()
);
return { messageId };
}
// Get chat history
async function getChatHistory(roomId, count = 50) {
const messages = await redis.xrevrange(
`chat:room:${roomId}`,
'+', '-',
'COUNT', count
);
return messages.map(([id, fields]) => {
const msg = { id };
for (let i = 0; i < fields.length; i += 2) {
msg[fields[i]] = fields[i + 1];
}
return msg;
}).reverse(); // Oldest first
}
// Listen for new messages (real-time)
async function listenForMessages(roomId, lastId = '$', callback) {
while (true) {
const results = await redis.xread(
'BLOCK', 0,
'STREAMS', `chat:room:${roomId}`, lastId
);
if (results) {
for (const [, messages] of results) {
for (const [id, fields] of messages) {
const msg = { id };
for (let i = 0; i < fields.length; i += 2) {
msg[fields[i]] = fields[i + 1];
}
callback(msg);
lastId = id;
}
}
}
}
}
Python (redis-py):
import redis
from datetime import datetime
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def send_chat_message(room_id: int, user_id: str, message: str) -> dict:
"""Send a message to a chat room."""
message_id = r.xadd(f"chat:room:{room_id}", {
'user': user_id,
'message': message,
'timestamp': str(datetime.now().timestamp())
})
return {'message_id': message_id}
def get_chat_history(room_id: int, count: int = 50) -> list:
"""Get chat message history for a room."""
messages = r.xrevrange(f"chat:room:{room_id}", '+', '-', count=count)
return list(reversed([
{'id': msg_id, **fields}
for msg_id, fields in messages
]))
def listen_for_messages(room_id: int, last_id: str = '$', callback=None):
"""Listen for new messages in real-time (blocking)."""
while True:
results = r.xread({f"chat:room:{room_id}": last_id}, block=1000)
if results:
for stream, messages in results:
for msg_id, fields in messages:
if callback:
callback({'id': msg_id, **fields})
last_id = msg_id
# Usage
send_chat_message(1, "alice", "Hello everyone!")
send_chat_message(1, "bob", "Hi Alice!")
history = get_chat_history(1, 50)
print(history)
Event Processing with Consumer Groups
Consumer groups let multiple workers process events reliably:
# Create a consumer group
XGROUP CREATE events mygroup $ MKSTREAM
# $ = start from newest message
# Consumer 1 gets events
event = XREADGROUP GROUP mygroup consumer1 STREAMS events >
# ">" means: give me new events I haven't processed
# Process event...
# Acknowledge event (mark as processed)
XACK events mygroup event_id
Node.js (ioredis):
// Create consumer group (run once)
async function createConsumerGroup(streamKey, groupName) {
try {
await redis.xgroup('CREATE', streamKey, groupName, '$', 'MKSTREAM');
console.log(`Consumer group ${groupName} created`);
} catch (err) {
if (!err.message.includes('BUSYGROUP')) throw err;
// Group already exists
}
}
// Consumer worker
async function consumerWorker(streamKey, groupName, consumerName) {
await createConsumerGroup(streamKey, groupName);
while (true) {
try {
// Read new messages for this consumer
const results = await redis.xreadgroup(
'GROUP', groupName, consumerName,
'BLOCK', 5000,
'COUNT', 10,
'STREAMS', streamKey, '>'
);
if (results) {
for (const [, messages] of results) {
for (const [id, fields] of messages) {
try {
// Process the message
const data = {};
for (let i = 0; i < fields.length; i += 2) {
data[fields[i]] = fields[i + 1];
}
await processMessage(data);
// Acknowledge successful processing
await redis.xack(streamKey, groupName, id);
} catch (err) {
console.error(`Failed to process ${id}:`, err);
// Message will be retried (not acknowledged)
}
}
}
}
} catch (err) {
console.error('Consumer error:', err);
await new Promise(r => setTimeout(r, 1000));
}
}
}
// Start multiple consumers
consumerWorker('orders', 'order_processors', 'worker1');
consumerWorker('orders', 'order_processors', 'worker2');
Python (redis-py):
import redis
import threading
import time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def create_consumer_group(stream_key: str, group_name: str):
"""Create a consumer group for the stream."""
try:
r.xgroup_create(stream_key, group_name, id='$', mkstream=True)
print(f"Consumer group {group_name} created")
except redis.ResponseError as e:
if 'BUSYGROUP' not in str(e):
raise
# Group already exists
def consumer_worker(stream_key: str, group_name: str, consumer_name: str):
"""Worker that processes messages from a consumer group."""
create_consumer_group(stream_key, group_name)
while True:
try:
# Read new messages
results = r.xreadgroup(
group_name, consumer_name,
{stream_key: '>'},
count=10,
block=5000
)
if results:
for stream, messages in results:
for msg_id, fields in messages:
try:
# Process the message
process_message(fields)
# Acknowledge
r.xack(stream_key, group_name, msg_id)
except Exception as e:
print(f"Failed to process {msg_id}: {e}")
except Exception as e:
print(f"Consumer error: {e}")
time.sleep(1)
# Usage: Start consumers in threads
for i in range(3):
t = threading.Thread(
target=consumer_worker,
args=('orders', 'order_processors', f'worker{i}'),
daemon=True
)
t.start()
Sensor Data / Metrics
# IoT sensor sends temperature reading
XADD temperature:sensor:1 * location "room1" temp 22.5 humidity 45
# Multiple sensors
XADD temperature:sensor:2 * location "room2" temp 20.1 humidity 50
# Get readings from time period
start_time = "1707300000000"
end_time = "1707400000000"
XRANGE temperature:sensor:1 start_time end_time
# Get latest 100 readings
XREVRANGE temperature:sensor:1 + - COUNT 100
Event Notification System
# Notifications stream
XADD notifications:user:1 * type "like" from "user:2" post_id 456
XADD notifications:user:1 * type "comment" from "user:3" post_id 456
XADD notifications:user:1 * type "follow" from "user:4"
# User sees last 20 notifications
XREVRANGE notifications:user:1 + - COUNT 20
# Check for new notifications after certain ID
last_seen = "1707300000000-5"
XRANGE notifications:user:1 (last_seen + # New since last_seen
Audit Trail
Track all changes to important data:
# Track admin actions
XADD audit:log * \
action "delete_user" \
admin_id "admin:1" \
target "user:123" \
reason "Violated terms"
XADD audit:log * \
action "update_config" \
admin_id "admin:2" \
setting "max_connections" \
old_value 1000 \
new_value 2000
# View audit trail
XRANGE audit:log - +
# Audit trail for specific admin
# (would need to scan and filter)
XRANGE audit:log - +
Node.js (ioredis):
// Log an audit event
async function auditLog(action, adminId, details = {}) {
const eventId = await redis.xadd(
'audit:log',
'*',
'action', action,
'admin_id', adminId,
'timestamp', new Date().toISOString(),
'ip', details.ip || '',
...Object.entries(details).filter(([k]) => k !== 'ip').flat().map(String)
);
return eventId;
}
// Get audit log (with optional filters)
async function getAuditLog(options = {}) {
const { count = 100, startTime, endTime, adminId } = options;
let events = await redis.xrevrange(
'audit:log',
endTime || '+',
startTime || '-',
'COUNT', count
);
// Parse and filter
const logs = events.map(([id, fields]) => {
const log = { id };
for (let i = 0; i < fields.length; i += 2) {
log[fields[i]] = fields[i + 1];
}
return log;
});
// Filter by admin if specified
if (adminId) {
return logs.filter(l => l.admin_id === adminId);
}
return logs;
}
// Usage
await auditLog('delete_user', 'admin:1', {
target: 'user:123',
reason: 'Violated terms',
ip: '10.0.0.1'
});
const recentLogs = await getAuditLog({ count: 50 });
const adminLogs = await getAuditLog({ adminId: 'admin:1' });
Python (redis-py):
import redis
from datetime import datetime
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def audit_log(action: str, admin_id: str, ip: str = None, details: dict = None) -> dict:
"""Log an audit event."""
event_id = r.xadd('audit:log', {
'action': action,
'admin_id': admin_id,
'timestamp': datetime.now().isoformat(),
'ip': ip or '',
**(details or {})
})
return {'event_id': event_id}
def get_audit_log(count: int = 100, admin_id: str = None) -> list:
"""Get audit log entries, optionally filtered by admin."""
events = r.xrevrange('audit:log', '+', '-', count=count)
logs = [
{'id': event_id, **fields}
for event_id, fields in events
]
# Filter by admin if specified
if admin_id:
logs = [l for l in logs if l.get('admin_id') == admin_id]
return logs
# Usage
audit_log(
action='delete_user',
admin_id='admin:1',
ip='10.0.0.1',
details={'target': 'user:123', 'reason': 'Violated terms'}
)
recent_logs = get_audit_log(count=50)
admin_logs = get_audit_log(admin_id='admin:1')
print(recent_logs)
Streams vs Lists
| Feature | Stream | List |
|---|---|---|
| Ordered | By timestamp (automatic) | By insertion |
| Consumer groups | Yes (reliable processing) | No |
| Event history | Keeps everything | Needs LTRIM |
| Use case | Events, logging, messaging | Queues, stacks |
| Durability | Built-in | Need to manage |
Use Streams when:
- You need event history
- Multiple workers process items
- You need guaranteed delivery
- Time ordering is important
Use Lists when:
- Simple queue/stack
- Don’t need history
- Just FIFO/LIFO behavior
Common Patterns
Real-time Dashboard
# Events arrive as they happen
XADD events * event_type "page_view" user_id 1
XADD events * event_type "purchase" user_id 2
# Dashboard reads latest
latest = XREVRANGE events + - COUNT 100
# Display trending items/users in real-time
Event Replay
# Store all events
XADD user:events * action "login"
XADD user:events * action "clicked_item" item_id 123
XADD user:events * action "purchased" item_id 123
# Replay to reconstruct state
events = XRANGE user:events - +
state = {}
for event in events:
if event.action == "purchased":
state["purchase_count"] += 1
Delayed Processing
# Events with timestamps in future
XADD scheduled_events * \
run_at "2026-02-10T10:00:00Z" \
task "send_email" \
to "user:1"
# Worker checks for due tasks
due_events = XRANGEBYSCORE scheduled_events - current_time
# Process and remove
for event in due_events:
execute(event)
XDEL scheduled_events event_id