20. Redis: Streams
Redis Streams — это log-based структура данных для reliable messaging, похожая на Apache Kafka, но проще и быстрее.
Streams vs Pub/Sub
Заголовок раздела «Streams vs Pub/Sub»| Характеристика | Pub/Sub | Streams |
|---|---|---|
| Persistence | ❌ Нет | ✅ Да |
| История | ❌ Нет | ✅ Да |
| Consumer Groups | ❌ Нет | ✅ Да |
| Acknowledgment | ❌ Нет | ✅ Да |
| Replay | ❌ Нет | ✅ Да |
Вывод: Streams для reliable messaging, Pub/Sub для simple real-time.
Базовое использование
Заголовок раздела «Базовое использование»import { createClient } from 'redis';
const redis = createClient();await redis.connect();
// Добавление сообщения в streamconst messageId = await redis.xAdd('events', '*', { type: 'user_signup', userId: '123', timestamp: Date.now().toString()});// Возвращает ID: "1640000000000-0"
// Чтение сообщенийconst messages = await redis.xRange('events', '-', '+');// [{ id: '1640000000000-0', message: { type: 'user_signup', ... } }]
// Чтение последних N сообщенийconst last10 = await redis.xRevRange('events', '+', '-', { COUNT: 10 });Consumer Groups
Заголовок раздела «Consumer Groups»Consumer Groups позволяют нескольким consumers обрабатывать stream с распределением нагрузки.
// Создание consumer groupawait redis.xGroupCreate('events', 'email-workers', '0', { MKSTREAM: true});
// Consumer #1async function emailWorker(consumerId: string) { while (true) { const messages = await redis.xReadGroup( 'email-workers', consumerId, { key: 'events', id: '>' }, { COUNT: 10, BLOCK: 5000 } // Block 5 секунд );
if (!messages || messages.length === 0) continue;
for (const [stream, streamMessages] of messages) { for (const { id, message } of streamMessages) { try { // Обработка await sendEmail(message.email, message.type);
// Acknowledgment await redis.xAck('events', 'email-workers', id); } catch (error) { console.error('Failed to process:', id, error); // Сообщение останется в pending } } } }}
// Запуск нескольких workersemailWorker('worker-1');emailWorker('worker-2');emailWorker('worker-3');Event Sourcing пример
Заголовок раздела «Event Sourcing пример»// Order Events Streamclass OrderService { private streamKey = 'orders:events';
async createOrder(userId: string, items: any[]) { const orderId = randomUUID();
await redis.xAdd(this.streamKey, '*', { event: 'OrderCreated', orderId, userId, items: JSON.stringify(items), timestamp: Date.now().toString() });
return orderId; }
async confirmPayment(orderId: string, paymentId: string) { await redis.xAdd(this.streamKey, '*', { event: 'PaymentConfirmed', orderId, paymentId, timestamp: Date.now().toString() }); }
async shipOrder(orderId: string, trackingNumber: string) { await redis.xAdd(this.streamKey, '*', { event: 'OrderShipped', orderId, trackingNumber, timestamp: Date.now().toString() }); }
// Восстановление состояния заказа из событий async getOrderHistory(orderId: string) { const events = await redis.xRange(this.streamKey, '-', '+');
return events .filter(({ message }) => message.orderId === orderId) .map(({ id, message }) => ({ id, event: message.event, timestamp: parseInt(message.timestamp), ...message })); }}Pending Messages (не обработанные)
Заголовок раздела «Pending Messages (не обработанные)»// Получение pending messagesconst pending = await redis.xPending('events', 'email-workers');console.log('Pending count:', pending.pending);
// Детали pending для конкретного consumerconst details = await redis.xPendingRange( 'events', 'email-workers', '-', '+', 10, 'worker-1');
// Claim старых pending messages (если worker умер)const claimed = await redis.xClaim( 'events', 'email-workers', 'worker-2', // новый owner 60000, // старше 60 секунд details.map(msg => msg.id));Trimming (ограничение размера)
Заголовок раздела «Trimming (ограничение размера)»// Оставить только последние 10000 сообщенийawait redis.xTrim('events', 'MAXLEN', 10000);
// Приблизительная обрезка (быстрее)await redis.xTrim('events', 'MAXLEN', '~', 10000);
// Автоматическое trimming при добавленииawait redis.xAdd('events', '*', { data: 'value' }, { TRIM: { strategy: 'MAXLEN', strategyModifier: '~', threshold: 10000 }});Monitoring Stream
Заголовок раздела «Monitoring Stream»// Информация о streamconst info = await redis.xInfoStream('events');console.log('Length:', info.length);console.log('First entry:', info['first-entry']);console.log('Last entry:', info['last-entry']);
// Информация о consumer groupconst groupInfo = await redis.xInfoGroups('events');console.log('Groups:', groupInfo);
// Информация о consumersconst consumers = await redis.xInfoConsumers('events', 'email-workers');console.log('Consumers:', consumers);Практический пример: Activity Feed
Заголовок раздела «Практический пример: Activity Feed»class ActivityFeedService { async addActivity(userId: string, activity: any) { await redis.xAdd(`feed:${userId}`, '*', { type: activity.type, data: JSON.stringify(activity.data), timestamp: Date.now().toString() });
// Ограничение до 100 последних активностей await redis.xTrim(`feed:${userId}`, 'MAXLEN', '~', 100); }
async getUserFeed(userId: string, count = 20) { const messages = await redis.xRevRange( `feed:${userId}`, '+', '-', { COUNT: count } );
return messages.map(({ id, message }) => ({ id, type: message.type, data: JSON.parse(message.data), timestamp: parseInt(message.timestamp) })); }
// Real-time подписка на новые активности async subscribeToFeed(userId: string, callback: Function) { let lastId = '$'; // только новые сообщения
while (true) { const messages = await redis.xRead( { key: `feed:${userId}`, id: lastId }, { BLOCK: 5000 } );
if (messages && messages.length > 0) { for (const [stream, streamMessages] of messages) { for (const msg of streamMessages) { callback(msg); lastId = msg.id; } } } } }}💡 Best Practices
Заголовок раздела «💡 Best Practices»- Используйте Consumer Groups для scaling
- XACK сразу после обработки (или используйте транзакции)
- Monitoring pending messages и claim старых
- TRIM регулярно для ограничения памяти
- Event Sourcing вместо снэпшотов состояния
Когда использовать Streams
Заголовок раздела «Когда использовать Streams»✅ Хорошо для:
- Event sourcing
- Activity feeds
- Audit logs
- Task queues с гарантией доставки
- Metrics/Analytics
❌ Плохо для:
- Simple pub/sub (используйте Pub/Sub)
- Очень большие потоки (>1B сообщений) — лучше Kafka
Следующий урок: Нормализация БД →