What are Lists?
A List is an ordered collection of items, like an array. You can add items to the left (head) or right (tail), and remove from either end. Redis optimizes Lists for fast operations on both ends.
Think of it as a queue or stack that lives in memory, accessible from anywhere.
Basic Commands
RPUSH list "first" # Add to the right (tail/end)
RPUSH list "second"
LPUSH list "zero" # Add to the left (head/start)
LRANGE list 0 -1 # Get all items
# Returns: ["zero", "first", "second"]
LLEN list # Length: 3
LPOP list # Remove and return left: "zero"
RPOP list # Remove and return right: "second"
LINDEX list 0 # Get item at index 0
LSET list 0 "new" # Set item at index 0
Lists as Queues (FIFO)
FIFO = First In, First Out. Like a line at a coffee shop — first person in is first person served.
Job Queue Example
# Add jobs to the queue (producers add to the right)
RPUSH jobs "send-email" "process-image" "generate-report"
# Workers take jobs from the left
LPOP jobs # Get: "send-email"
LPOP jobs # Get: "process-image"
LPOP jobs # Get: "generate-report"
Node.js (ioredis):
import Redis from 'ioredis';
const redis = new Redis();
// Producer: push a serialized job onto the tail of the queue
async function addEmailJob(userId, type) {
  // Jobs are stored as JSON strings; RPUSH appends at the tail.
  const job = JSON.stringify({ userId, type });
  await redis.rpush('email:queue', job);
}
// Add multiple jobs
await addEmailJob(1, 'welcome');
await addEmailJob(2, 'reset');
await addEmailJob(3, 'notification');
// Worker: poll the head of the queue; idle-wait 1s when it is empty
async function worker() {
  for (;;) {
    const raw = await redis.lpop('email:queue');
    if (!raw) {
      // Nothing queued — back off before polling again.
      await new Promise(resolve => setTimeout(resolve, 1000));
      continue;
    }
    const { userId, type } = JSON.parse(raw);
    await processEmail(userId, type);
  }
}
Python (redis-py):
import redis
import json
import time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# Producer: push a serialized job onto the tail of the queue
def add_email_job(user_id: int, job_type: str):
    """Enqueue an email job for `user_id` of the given type."""
    payload = {'user_id': user_id, 'type': job_type}
    r.rpush('email:queue', json.dumps(payload))
# Add multiple jobs
add_email_job(1, 'welcome')
add_email_job(2, 'reset')
add_email_job(3, 'notification')
# Worker: pull jobs from the head of the queue
def worker():
    """Poll the queue forever, sleeping 1s whenever it is empty."""
    while True:
        raw = r.lpop('email:queue')
        if not raw:
            time.sleep(1)  # nothing queued — back off before retrying
            continue
        data = json.loads(raw)
        process_email(data['user_id'], data['type'])
Lists as Stacks (LIFO)
LIFO = Last In, First Out. Like a plate dispenser — last plate added is first plate taken.
Browser History Example
# User visits pages (each LPUSH puts the newest visit at the head)
LPUSH browser:history "page1"
LPUSH browser:history "page2"
LPUSH browser:history "page3"
# Click back button (pops the most recently visited page first)
LPOP browser:history # page3
LPOP browser:history # page2
LPOP browser:history # page1
Real-world use:
# Undo functionality
LPUSH undo:stack "edited paragraph 1"
LPUSH undo:stack "added image"
LPUSH undo:stack "changed title"
# Undo button clicked
LPOP undo:stack # "changed title" - reverts that action
Lists as Activity Feeds
Store and retrieve recent activities:
# Activity happens, add to right (newest at the end)
RPUSH user:1:activity "logged in"
RPUSH user:1:activity "posted comment"
RPUSH user:1:activity "liked a post"
# Get the 3 most recent activities (LRANGE returns the slice
# oldest-first; reverse on the client if you want newest first)
LRANGE user:1:activity -3 -1 # Last 3 items, oldest to newest
Node.js (ioredis):
// Append an activity entry to the user's feed, capped at the newest 100.
async function logActivity(userId, action, metadata = {}) {
  const activity = JSON.stringify({
    action,
    ...metadata,
    timestamp: Date.now()
  });
  const key = `user:${userId}:activity`;
  // Pipeline the append + trim so both commands go in one round trip.
  await redis.pipeline()
    .rpush(key, activity)
    .ltrim(key, -100, -1)
    .exec();
}
// Fetch up to `count` recent activities, newest first
async function getRecentActivity(userId, count = 10) {
  const raw = await redis.lrange(`user:${userId}:activity`, -count, -1);
  // LRANGE yields oldest-first; reverse so the newest entry comes first.
  return raw.map(item => JSON.parse(item)).reverse();
}
// Usage
await logActivity(1, 'logged_in', { ip: '192.168.1.1' });
await logActivity(1, 'posted_comment', { postId: 123 });
const recent = await getRecentActivity(1, 5);
Python (redis-py):
import json
from datetime import datetime
def log_activity(user_id: int, action: str, metadata: dict = None):
    """Append an activity entry to the user's feed, capped at 100 entries.

    Args:
        user_id: Owner of the activity feed.
        action: Short action name, e.g. 'logged_in'.
        metadata: Optional extra fields merged into the stored entry.
    """
    activity = json.dumps({
        'action': action,
        **(metadata or {}),
        'timestamp': datetime.now().isoformat()
    })
    key = f'user:{user_id}:activity'
    # Pipeline the append + trim so both commands go in one round trip.
    pipe = r.pipeline()
    pipe.rpush(key, activity)
    pipe.ltrim(key, -100, -1)  # keep only the newest 100 entries
    pipe.execute()
def get_recent_activity(user_id: int, count: int = 10) -> list:
    """Return up to `count` most recent activities, newest first."""
    raw_items = r.lrange(f'user:{user_id}:activity', -count, -1)
    decoded = [json.loads(item) for item in raw_items]
    decoded.reverse()  # LRANGE yields oldest-first; flip to newest-first
    return decoded
# Usage
log_activity(1, 'logged_in', {'ip': '192.168.1.1'})
log_activity(1, 'posted_comment', {'post_id': 123})
recent = get_recent_activity(1, 5)
Chat Messages Example
# Messages arrive
RPUSH chat:room:1 "Alice: Hi"
RPUSH chat:room:1 "Bob: Hello"
RPUSH chat:room:1 "Alice: How are you?"
# Get message history (last 50 messages)
LRANGE chat:room:1 -50 -1
# Get newest 10
LRANGE chat:room:1 -10 -1
Node.js (ioredis):
// Append a message to the room's history (capped at the newest 1000)
async function sendMessage(roomId, userId, message) {
  const key = `chat:room:${roomId}`;
  const chatMessage = JSON.stringify({
    userId,
    message,
    timestamp: Date.now()
  });
  await redis.rpush(key, chatMessage);
  await redis.ltrim(key, -1000, -1); // drop all but the newest 1000
  return chatMessage;
}
// Fetch up to `count` most recent messages, oldest first
async function getChatHistory(roomId, count = 50) {
  const raw = await redis.lrange(`chat:room:${roomId}`, -count, -1);
  return raw.map(entry => JSON.parse(entry));
}
// Usage
await sendMessage(1, 'alice', 'Hi everyone!');
await sendMessage(1, 'bob', 'Hello!');
const history = await getChatHistory(1, 50);
Python (redis-py):
def send_message(room_id: int, user_id: str, message: str) -> dict:
    """Append a chat message to the room's history (capped at 1000).

    Args:
        room_id: Target chat room.
        user_id: Sender identifier.
        message: Message text.

    Returns:
        A status dict, {'status': 'sent'}.
    """
    chat_message = json.dumps({
        'user_id': user_id,
        'message': message,
        'timestamp': datetime.now().isoformat()
    })
    key = f'chat:room:{room_id}'
    # Single round trip: append the message, then cap history at 1000.
    pipe = r.pipeline()
    pipe.rpush(key, chat_message)
    pipe.ltrim(key, -1000, -1)
    pipe.execute()
    return {'status': 'sent'}
def get_chat_history(room_id: int, limit: int = 50) -> list:
    """Fetch up to `limit` most recent messages, oldest first."""
    key = f'chat:room:{room_id}'
    return [json.loads(raw) for raw in r.lrange(key, -limit, -1)]
# Usage
send_message(1, 'alice', 'Hi everyone!')
send_message(1, 'bob', 'Hello!')
history = get_chat_history(1, 50)
Blocking Operations
Sometimes you want to wait for an item if the queue is empty:
# BLPOP - Blocking Left Pop
# Waits up to 10 seconds; returns a [key, value] pair, or nil on timeout
item = BLPOP jobs 10
# Or wait forever (0 = infinite)
item = BLPOP jobs 0
Node.js (Blocking Worker):
// Blocking worker: BLPOP parks the connection until a job arrives
async function blockingWorker() {
  console.log('Worker started, waiting for jobs...');
  for (;;) {
    // Timeout 0 = block forever; resolves to [queueName, value]
    const popped = await redis.blpop('jobs', 0);
    if (!popped) continue;
    const [queueName, rawJob] = popped;
    console.log(`Processing job from ${queueName}:`, rawJob);
    await processJob(JSON.parse(rawJob));
  }
}
// Drain 'jobs:high' before 'jobs:normal' — BLPOP respects key order
async function priorityWorker() {
  for (;;) {
    // 5-second timeout so the loop stays responsive when both are empty
    const popped = await redis.blpop('jobs:high', 'jobs:normal', 5);
    if (!popped) continue;
    const [, rawJob] = popped;
    await processJob(JSON.parse(rawJob));
  }
}
Python (Blocking Worker):
import threading
def blocking_worker():
    """Worker that parks on BLPOP instead of polling the queue."""
    print('Worker started, waiting for jobs...')
    while True:
        # timeout=0 blocks indefinitely; returns a (key, value) pair
        popped = r.blpop('jobs', timeout=0)
        if not popped:
            continue
        source_queue, raw_job = popped
        print(f'Processing job from {source_queue}:', raw_job)
        process_job(json.loads(raw_job))
# Run worker in background thread
worker_thread = threading.Thread(target=blocking_worker, daemon=True)
worker_thread.start()
# Priority worker - drains the high-priority queue before the normal one
def priority_worker():
    """BLPOP scans keys in listed order, so 'jobs:high' always wins."""
    while True:
        popped = r.blpop(['jobs:high', 'jobs:normal'], timeout=5)
        if popped is None:
            continue  # timed out after 5s with both queues empty
        _queue, raw_job = popped
        process_job(json.loads(raw_job))
This is better than polling (checking every second) because it’s instant and uses less CPU.
Working with List Ranges
RPUSH numbers 1 2 3 4 5 6 7 8 9 10
LRANGE numbers 0 -1 # All: [1,2,3,4,5,6,7,8,9,10]
LRANGE numbers 0 4 # First 5: [1,2,3,4,5]
LRANGE numbers -5 -1 # Last 5: [6,7,8,9,10]
LRANGE numbers 2 5 # Items 2-5: [3,4,5,6]
Trimming Lists
Keep only a certain number of items:
RPUSH events "e1" "e2" "e3" "e4" "e5"
# Keep only last 3
LTRIM events -3 -1 # Now: ["e3", "e4", "e5"]
Real-world - keep only recent activity:
# Every new activity
RPUSH user:1:feed activity
LTRIM user:1:feed -100 -1 # Keep only last 100 items
Common Patterns
Producer-Consumer Pattern
# Producer (your main app)
RPUSH tasks "task1" "task2" "task3"
# Consumer (background workers)
def worker():
while True:
task = BLPOP tasks 0 # Wait for task
execute(task)
# If error, put it back or in failed queue
RPUSH failed_tasks task
Node.js (Producer-Consumer):
// Producer: HTTP endpoint that enqueues a task.
// Fix: `@app.post('/tasks')` is Python decorator syntax — in
// Express, routes are registered by passing the handler to app.post().
app.post('/tasks', async (req, res) => {
  const task = {
    id: crypto.randomUUID(),
    type: req.body.type,
    payload: req.body.payload,
    createdAt: Date.now()
  };
  await redis.rpush('tasks', JSON.stringify(task));
  res.json({ taskId: task.id, status: 'queued' });
});
// Consumer: process tasks, routing failures to a retry queue.
// Fix: `taskData` was declared inside the try block, so referencing it
// in the catch block threw a ReferenceError instead of re-queueing.
async function consumer() {
  for (;;) {
    let taskData = null;
    try {
      const result = await redis.blpop('tasks', 0);
      [, taskData] = result;
      const task = JSON.parse(taskData);
      await executeTask(task);
      console.log(`Task ${task.id} completed`);
    } catch (error) {
      console.error('Task failed:', error);
      if (taskData !== null) {
        // Move the raw payload to a failed queue for later retry
        await redis.rpush('tasks:failed', taskData);
      }
    }
  }
}
Python (Producer-Consumer):
from uuid import uuid4
from datetime import datetime
def create_task(task_type: str, payload: dict) -> dict:
    """Producer: enqueue a task and report its queued id.

    Args:
        task_type: Kind of work, e.g. 'send_email'.
        payload: Arbitrary task parameters.

    Returns:
        {'task_id': <uuid>, 'status': 'queued'}
    """
    task = {
        'id': str(uuid4()),
        'type': task_type,
        'payload': payload,
        'created_at': datetime.now().isoformat(),
    }
    r.rpush('tasks', json.dumps(task))
    return {'task_id': task['id'], 'status': 'queued'}
def consumer():
    """Consumer: process tasks, routing failures to a retry queue.

    Fix: `task_data` was referenced in the except-block even when the
    exception fired before it was assigned (e.g. a failure inside
    BLPOP), which raised NameError and killed the worker loop.
    """
    while True:
        task_data = None
        try:
            result = r.blpop('tasks', timeout=0)
            if result:
                _, task_data = result
                task = json.loads(task_data)
                execute_task(task)
                print(f"Task {task['id']} completed")
        except Exception as e:
            print(f'Task failed: {e}')
            if task_data is not None:
                # Move the raw payload to a failed queue for later retry
                r.rpush('tasks:failed', task_data)
# Usage
create_task('send_email', {'to': 'user@example.com', 'subject': 'Welcome'})
create_task('process_image', {'image_id': 123})
# Start consumer in separate thread/process
import threading
threading.Thread(target=consumer, daemon=True).start()
Breadcrumb Trail
# User does things
RPUSH user:1:trail "viewed product:123"
RPUSH user:1:trail "added to cart"
RPUSH user:1:trail "checkout started"
# See what led up to an error
LRANGE user:1:trail -10 -1
Rate Limiting with Sliding Window
RPUSH requests:user:1 1707300000
RPUSH requests:user:1 1707300001
RPUSH requests:user:1 1707300002
# Remove requests older than 1 hour
one_hour_ago = now() - 3600
LRANGE requests:user:1 0 -1
# Delete old ones