Skip to main content
Bytes & Beyond

Redis Streams

Streams in Redis - event logs, message queues, activity tracking, and real-time data.

What are Streams?

A Stream is an append-only log of events. Each event is added with a timestamp and gets a unique ID. Perfect for tracking activities, messaging, and event processing.

Think of it as an audit trail or event log that’s ordered by time.

Basic Commands

# Add event to stream
XADD events * field1 "value1" field2 "value2"
# Returns: 1707300000000-0 (auto-generated ID)

# Add with custom ID — must be in <milliseconds>-<sequence> form and greater
# than the stream's last ID (e.g. "1707300000001-0"); arbitrary strings like
# "2026-02-09-1" are rejected
XADD events "1707300000001-0" field1 "value1"

# Get all events
XRANGE events - +

# Get last 10 events
XREVRANGE events + - COUNT 10

# Get stream length
XLEN events

# Remove old events
XDEL events "1707300000000-0"

Activity Log

Track what users do:

# User logs in
XADD user:1:activity * action "login" ip "192.168.1.1" device "mobile"

# User posts
XADD user:1:activity * action "post_created" post_id 123

# User updates profile
XADD user:1:activity * action "profile_updated" field "bio"

# User logs out
XADD user:1:activity * action "logout"

# View activity history
XRANGE user:1:activity - +

# Last 5 activities
XREVRANGE user:1:activity + - COUNT 5

Node.js (ioredis):

import Redis from 'ioredis';
const redis = new Redis();

// Append one activity event to the user's stream, then cap the stream at
// roughly the newest 1000 entries ('~' = approximate trim, which is cheaper).
async function logActivity(userId, action, metadata = {}) {
  const key = `user:${userId}:activity`;

  // Flatten metadata into alternating field/value arguments for XADD.
  const extras = [];
  for (const [field, value] of Object.entries(metadata)) {
    extras.push(field, value);
  }

  await redis.xadd(key, '*',
    'action', action,
    'timestamp', Date.now().toString(),
    ...extras
  );

  await redis.xtrim(key, 'MAXLEN', '~', 1000);
}

// Fetch the newest `count` activity events for a user, newest first,
// each parsed from the flat [field, value, ...] array into a plain object.
async function getActivityHistory(userId, count = 50) {
  const toObject = ([id, flat]) => {
    const event = { id };
    for (let i = 0; i < flat.length; i += 2) {
      event[flat[i]] = flat[i + 1];
    }
    return event;
  };

  const raw = await redis.xrevrange(
    `user:${userId}:activity`, '+', '-', 'COUNT', count
  );
  return raw.map(toObject);
}

// Usage
await logActivity(1, 'login', { ip: '192.168.1.1', device: 'mobile' });
await logActivity(1, 'post_created', { post_id: '123' });
const history = await getActivityHistory(1, 10);

Python (redis-py):

import redis
from datetime import datetime

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def log_activity(user_id: int, action: str, metadata: dict = None) -> dict:
    """Append one activity event to the user's stream and cap its length.

    Args:
        user_id: Numeric user ID used to build the stream key.
        action: Short action name, e.g. "login" or "post_created".
        metadata: Optional extra fields; these override 'action'/'timestamp'
            on key collision (matches the merge order used here).

    Returns:
        Dict with the Redis-assigned 'event_id' of the new entry.
    """
    key = f"user:{user_id}:activity"

    payload = {
        'action': action,
        'timestamp': str(datetime.now().timestamp()),
    }
    payload.update(metadata or {})

    event_id = r.xadd(key, payload)

    # Approximate trim ('~') keeps roughly the newest 1000 events cheaply.
    r.xtrim(key, maxlen=1000, approximate=True)

    return {'event_id': event_id}

def get_activity_history(user_id: int, count: int = 50) -> list:
    """Return the newest `count` activity events for a user, newest first.

    Each event is a dict of its stream fields plus an 'id' key holding the
    Redis stream entry ID.
    """
    raw = r.xrevrange(f"user:{user_id}:activity", '+', '-', count=count)

    history = []
    for event_id, fields in raw:
        entry = {'id': event_id}
        entry.update(fields)
        history.append(entry)
    return history

# Usage
log_activity(1, 'login', {'ip': '192.168.1.1', 'device': 'mobile'})
log_activity(1, 'post_created', {'post_id': '123'})
history = get_activity_history(1, 10)
print(history)

Chat Messages

# New messages arrive
XADD chat:room:1 * user "alice" message "Hi everyone!"
XADD chat:room:1 * user "bob" message "Hello!"
XADD chat:room:1 * user "alice" message "How are you?"

# Get message history
XRANGE chat:room:1 - +

# Get newest messages first
XREVRANGE chat:room:1 + - COUNT 50

Node.js (ioredis):

// Publish one chat message to the room's stream.
// Returns the Redis-assigned stream entry ID wrapped in an object.
async function sendChatMessage(roomId, userId, message) {
  const fields = [
    'user', userId,
    'message', message,
    'timestamp', Date.now().toString(),
  ];

  const messageId = await redis.xadd(`chat:room:${roomId}`, '*', ...fields);
  return { messageId };
}

// Fetch the newest `count` messages for a room and return them oldest-first
// (fetched via XREVRANGE, then reversed for display order).
async function getChatHistory(roomId, count = 50) {
  const newestFirst = await redis.xrevrange(
    `chat:room:${roomId}`, '+', '-', 'COUNT', count
  );

  const history = [];
  for (const [id, flat] of newestFirst) {
    const msg = { id };
    for (let i = 0; i < flat.length; i += 2) {
      msg[flat[i]] = flat[i + 1];
    }
    history.push(msg);
  }

  history.reverse(); // oldest first
  return history;
}

// Tail a room's stream forever, invoking `callback` for each new message.
// `lastId` defaults to '$' (only messages arriving after we start).
// NOTE(review): because `lastId` has a default but precedes `callback`,
// callers must pass it explicitly, e.g. listenForMessages(1, '$', onMsg).
// NOTE(review): XREAD BLOCK 0 monopolizes the connection — presumably this
// should run on a dedicated Redis connection; confirm against usage.
async function listenForMessages(roomId, lastId = '$', callback) {
  const streamKey = `chat:room:${roomId}`;

  for (;;) {
    const results = await redis.xread(
      'BLOCK', 0,
      'STREAMS', streamKey, lastId
    );
    if (!results) continue;

    for (const [, entries] of results) {
      for (const [entryId, flat] of entries) {
        const msg = { id: entryId };
        for (let i = 0; i < flat.length; i += 2) {
          msg[flat[i]] = flat[i + 1];
        }
        callback(msg);
        lastId = entryId; // resume after the last delivered entry
      }
    }
  }
}

Python (redis-py):

import redis
from datetime import datetime

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def send_chat_message(room_id: int, user_id: str, message: str) -> dict:
    """Publish one message to the room's stream.

    Returns:
        Dict with the Redis-assigned 'message_id' of the new entry.
    """
    stream = f"chat:room:{room_id}"
    fields = {
        'user': user_id,
        'message': message,
        'timestamp': str(datetime.now().timestamp()),
    }
    return {'message_id': r.xadd(stream, fields)}

def get_chat_history(room_id: int, count: int = 50) -> list:
    """Return the newest `count` messages for a room, oldest first.

    Fetches newest-first via XREVRANGE, then reverses for display order.
    """
    newest_first = r.xrevrange(f"chat:room:{room_id}", '+', '-', count=count)

    history = []
    for msg_id, fields in newest_first:
        record = {'id': msg_id}
        record.update(fields)
        history.append(record)
    history.reverse()  # oldest first
    return history

def listen_for_messages(room_id: int, last_id: str = '$', callback=None):
    """Poll a room's stream forever, dispatching new messages to `callback`.

    Args:
        room_id: Room whose stream to follow.
        last_id: Stream ID to resume after; '$' means only new messages.
        callback: Called with a dict ({'id': ..., **fields}) per message;
            messages are still consumed (and last_id advanced) if None.
    """
    stream = f"chat:room:{room_id}"
    while True:
        # Blocks up to 1s per call, then loops again if nothing arrived.
        batches = r.xread({stream: last_id}, block=1000)
        if not batches:
            continue
        for _stream, entries in batches:
            for entry_id, fields in entries:
                if callback:
                    callback({'id': entry_id, **fields})
                last_id = entry_id

# Usage
send_chat_message(1, "alice", "Hello everyone!")
send_chat_message(1, "bob", "Hi Alice!")
history = get_chat_history(1, 50)
print(history)

Event Processing with Consumer Groups

Consumer groups let multiple workers process events reliably:

# Create a consumer group
XGROUP CREATE events mygroup $ MKSTREAM
# $ = start from newest message

# Consumer 1 gets events
event = XREADGROUP GROUP mygroup consumer1 STREAMS events >
# ">" means: give me new events I haven't processed

# Process event...

# Acknowledge event (mark as processed)
XACK events mygroup event_id

Node.js (ioredis):

// Idempotently create a consumer group starting at '$' (newest entries);
// MKSTREAM creates the stream too if it does not exist yet.
async function createConsumerGroup(streamKey, groupName) {
  try {
    await redis.xgroup('CREATE', streamKey, groupName, '$', 'MKSTREAM');
  } catch (err) {
    if (err.message.includes('BUSYGROUP')) return; // already exists: fine
    throw err; // anything else is unexpected
  }
  console.log(`Consumer group ${groupName} created`);
}

// Long-running worker: reads batches of up to 10 entries from the consumer
// group and XACKs each entry only after successful processing, so failed
// entries stay in the pending list and can be retried.
async function consumerWorker(streamKey, groupName, consumerName) {
  await createConsumerGroup(streamKey, groupName);

  for (;;) {
    try {
      // '>' asks for entries never delivered to this group before.
      const results = await redis.xreadgroup(
        'GROUP', groupName, consumerName,
        'BLOCK', 5000,
        'COUNT', 10,
        'STREAMS', streamKey, '>'
      );
      if (!results) continue;

      for (const [, entries] of results) {
        for (const [id, flat] of entries) {
          try {
            const data = {};
            for (let i = 0; i < flat.length; i += 2) {
              data[flat[i]] = flat[i + 1];
            }

            await processMessage(data);

            // Mark as done only after processing succeeded.
            await redis.xack(streamKey, groupName, id);
          } catch (err) {
            // No XACK: the entry remains pending and will be retried.
            console.error(`Failed to process ${id}:`, err);
          }
        }
      }
    } catch (err) {
      // Connection-level failure: back off briefly before reconnecting.
      console.error('Consumer error:', err);
      await new Promise(r => setTimeout(r, 1000));
    }
  }
}

// Start multiple consumers
consumerWorker('orders', 'order_processors', 'worker1');
consumerWorker('orders', 'order_processors', 'worker2');

Python (redis-py):

import redis
import threading
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def create_consumer_group(stream_key: str, group_name: str):
    """Idempotently create a consumer group starting at '$' (newest entries).

    mkstream=True also creates the stream if it does not exist yet.
    A BUSYGROUP error means the group already exists and is ignored;
    any other ResponseError is re-raised.
    """
    try:
        r.xgroup_create(stream_key, group_name, id='$', mkstream=True)
    except redis.ResponseError as err:
        if 'BUSYGROUP' not in str(err):
            raise
    else:
        print(f"Consumer group {group_name} created")

def consumer_worker(stream_key: str, group_name: str, consumer_name: str):
    """Long-running worker that processes entries from a consumer group.

    Reads batches of up to 10 entries and acknowledges each one only after
    `process_message` succeeds, so failed entries stay in the pending list
    and can be retried. Connection-level errors back off for one second.
    """
    create_consumer_group(stream_key, group_name)

    while True:
        try:
            # '>' asks for entries never delivered to this group before.
            batches = r.xreadgroup(
                group_name, consumer_name,
                {stream_key: '>'},
                count=10,
                block=5000
            )
            if not batches:
                continue

            for _stream, entries in batches:
                for entry_id, fields in entries:
                    try:
                        process_message(fields)
                        # ACK only after successful processing.
                        r.xack(stream_key, group_name, entry_id)
                    except Exception as e:
                        # No ACK: entry stays pending for retry.
                        print(f"Failed to process {entry_id}: {e}")
        except Exception as e:
            print(f"Consumer error: {e}")
            time.sleep(1)

# Usage: Start consumers in threads
for i in range(3):
    t = threading.Thread(
        target=consumer_worker,
        args=('orders', 'order_processors', f'worker{i}'),
        daemon=True
    )
    t.start()

Sensor Data / Metrics

# IoT sensor sends temperature reading
XADD temperature:sensor:1 * location "room1" temp 22.5 humidity 45

# Multiple sensors
XADD temperature:sensor:2 * location "room2" temp 20.1 humidity 50

# Get readings from time period
start_time = "1707300000000"
end_time = "1707400000000"
XRANGE temperature:sensor:1 start_time end_time

# Get latest 100 readings
XREVRANGE temperature:sensor:1 + - COUNT 100

Event Notification System

# Notifications stream
XADD notifications:user:1 * type "like" from "user:2" post_id 456
XADD notifications:user:1 * type "comment" from "user:3" post_id 456
XADD notifications:user:1 * type "follow" from "user:4"

# User sees last 20 notifications
XREVRANGE notifications:user:1 + - COUNT 20

# Check for new notifications after a certain ID
last_seen = "1707300000000-5"
XRANGE notifications:user:1 (last_seen +  # "(" = exclusive start (Redis 6.2+)

Audit Trail

Track all changes to important data:

# Track admin actions
XADD audit:log * \
  action "delete_user" \
  admin_id "admin:1" \
  target "user:123" \
  reason "Violated terms"

XADD audit:log * \
  action "update_config" \
  admin_id "admin:2" \
  setting "max_connections" \
  old_value 1000 \
  new_value 2000

# View audit trail
XRANGE audit:log - +

# Audit trail for specific admin
# (would need to scan and filter)
XRANGE audit:log - +

Node.js (ioredis):

// Append one audit event to the shared audit:log stream.
// `details.ip` is stored under its own 'ip' field (empty string when absent);
// every remaining detail key/value is stringified and appended.
// Returns the Redis-assigned stream entry ID.
async function auditLog(action, adminId, details = {}) {
  const extras = [];
  for (const [key, value] of Object.entries(details)) {
    if (key === 'ip') continue; // already emitted as a dedicated field
    extras.push(String(key), String(value));
  }

  const eventId = await redis.xadd(
    'audit:log', '*',
    'action', action,
    'admin_id', adminId,
    'timestamp', new Date().toISOString(),
    'ip', details.ip || '',
    ...extras
  );

  return eventId;
}

// Read the newest audit entries, optionally bounded by time and filtered
// to one admin. NOTE: the COUNT limit applies before the admin filter, so
// a filtered result may contain fewer than `count` rows.
async function getAuditLog(options = {}) {
  const { count = 100, startTime, endTime, adminId } = options;

  const raw = await redis.xrevrange(
    'audit:log',
    endTime || '+',
    startTime || '-',
    'COUNT', count
  );

  // Parse each flat [field, value, ...] array into a plain object.
  const logs = raw.map(([id, flat]) => {
    const entry = { id };
    for (let i = 0; i < flat.length; i += 2) {
      entry[flat[i]] = flat[i + 1];
    }
    return entry;
  });

  return adminId ? logs.filter(e => e.admin_id === adminId) : logs;
}

// Usage
await auditLog('delete_user', 'admin:1', {
  target: 'user:123',
  reason: 'Violated terms',
  ip: '10.0.0.1'
});

const recentLogs = await getAuditLog({ count: 50 });
const adminLogs = await getAuditLog({ adminId: 'admin:1' });

Python (redis-py):

import redis
from datetime import datetime

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def audit_log(action: str, admin_id: str, ip: str = None, details: dict = None) -> dict:
    """Append one audit event to the shared audit:log stream.

    Args:
        action: What was done, e.g. "delete_user".
        admin_id: Who did it.
        ip: Source IP; stored as an empty string when None.
        details: Optional extra fields merged in last (they override the
            base fields on key collision, matching the merge order here).

    Returns:
        Dict with the Redis-assigned 'event_id' of the new entry.
    """
    fields = {
        'action': action,
        'admin_id': admin_id,
        'timestamp': datetime.now().isoformat(),
        'ip': ip or '',
    }
    fields.update(details or {})

    event_id = r.xadd('audit:log', fields)
    return {'event_id': event_id}

def get_audit_log(count: int = 100, admin_id: str = None) -> list:
    """Return the newest audit entries, optionally filtered to one admin.

    Note: the `count` limit is applied before the admin filter, so a
    filtered result may contain fewer than `count` rows.
    """
    raw = r.xrevrange('audit:log', '+', '-', count=count)

    entries = []
    for entry_id, fields in raw:
        record = {'id': entry_id}
        record.update(fields)
        entries.append(record)

    if admin_id:
        entries = [e for e in entries if e.get('admin_id') == admin_id]

    return entries

# Usage
audit_log(
    action='delete_user',
    admin_id='admin:1',
    ip='10.0.0.1',
    details={'target': 'user:123', 'reason': 'Violated terms'}
)

recent_logs = get_audit_log(count=50)
admin_logs = get_audit_log(admin_id='admin:1')
print(recent_logs)

Streams vs Lists

| Feature         | Stream                               | List            |
|-----------------|--------------------------------------|-----------------|
| Ordered         | By timestamp (automatic)             | By insertion    |
| Consumer groups | Yes (reliable processing)            | No              |
| Event history   | Keeps everything                     | Needs LTRIM     |
| Use case        | Events, logging, messaging           | Queues, stacks  |
| Durability      | Built-in                             | Need to manage  |

Use Streams when:

  • You need event history
  • Multiple workers process items
  • You need guaranteed delivery
  • Time ordering is important

Use Lists when:

  • Simple queue/stack
  • Don’t need history
  • Just FIFO/LIFO behavior

Common Patterns

Real-time Dashboard

# Events arrive as they happen
XADD events * event_type "page_view" user_id 1
XADD events * event_type "purchase" user_id 2

# Dashboard reads latest
latest = XREVRANGE events + - COUNT 100

# Display trending items/users in real-time

Event Replay

# Store all events
XADD user:events * action "login"
XADD user:events * action "clicked_item" item_id 123
XADD user:events * action "purchased" item_id 123

# Replay to reconstruct state
events = XRANGE user:events - +
state = {"purchase_count": 0}  # initialize counters before replay
for event in events:
  if event.action == "purchased":
    state["purchase_count"] += 1

Delayed Processing

# Events with timestamps in future
XADD scheduled_events * \
  run_at "2026-02-10T10:00:00Z" \
  task "send_email" \
  to "user:1"

# Worker checks for due tasks
# NOTE: XRANGEBYSCORE does not exist — stream entries are ordered by their
# insertion-time IDs, so you cannot range-query on a future "run_at" field.
# For delayed jobs, use a Sorted Set scored by the due timestamp instead:
#   ZADD scheduled_events <run_at_ts> <task_payload>
due_events = ZRANGEBYSCORE scheduled_events -inf current_time

# Process and remove
for event in due_events:
  execute(event)
  ZREM scheduled_events event