Перейти к содержимому

20. Redis: Streams

Redis Streams — это log-based структура данных для reliable messaging, похожая на Apache Kafka, но проще и быстрее.

ХарактеристикаPub/SubStreams
Persistence❌ Нет✅ Да
История❌ Нет✅ Да
Consumer Groups❌ Нет✅ Да
Acknowledgment❌ Нет✅ Да
Replay❌ Нет✅ Да

Вывод: Streams для reliable messaging, Pub/Sub для simple real-time.

import { createClient } from 'redis';
const redis = createClient();
await redis.connect();
// Добавление сообщения в stream
const messageId = await redis.xAdd('events', '*', {
type: 'user_signup',
userId: '123',
timestamp: Date.now().toString()
});
// Возвращает ID: "1640000000000-0"
// Чтение сообщений
const messages = await redis.xRange('events', '-', '+');
// [{ id: '1640000000000-0', message: { type: 'user_signup', ... } }]
// Чтение последних N сообщений
const last10 = await redis.xRevRange('events', '+', '-', { COUNT: 10 });

Consumer Groups позволяют нескольким consumers обрабатывать stream с распределением нагрузки.

// Создание consumer group
await redis.xGroupCreate('events', 'email-workers', '0', {
MKSTREAM: true
});
// Consumer #1
async function emailWorker(consumerId: string) {
while (true) {
const messages = await redis.xReadGroup(
'email-workers',
consumerId,
{ key: 'events', id: '>' },
{ COUNT: 10, BLOCK: 5000 } // Block 5 секунд
);
if (!messages || messages.length === 0) continue;
for (const [stream, streamMessages] of messages) {
for (const { id, message } of streamMessages) {
try {
// Обработка
await sendEmail(message.email, message.type);
// Acknowledgment
await redis.xAck('events', 'email-workers', id);
} catch (error) {
console.error('Failed to process:', id, error);
// Сообщение останется в pending
}
}
}
}
}
// Запуск нескольких workers
emailWorker('worker-1');
emailWorker('worker-2');
emailWorker('worker-3');
// Order Events Stream
class OrderService {
private streamKey = 'orders:events';
async createOrder(userId: string, items: any[]) {
const orderId = randomUUID();
await redis.xAdd(this.streamKey, '*', {
event: 'OrderCreated',
orderId,
userId,
items: JSON.stringify(items),
timestamp: Date.now().toString()
});
return orderId;
}
async confirmPayment(orderId: string, paymentId: string) {
await redis.xAdd(this.streamKey, '*', {
event: 'PaymentConfirmed',
orderId,
paymentId,
timestamp: Date.now().toString()
});
}
async shipOrder(orderId: string, trackingNumber: string) {
await redis.xAdd(this.streamKey, '*', {
event: 'OrderShipped',
orderId,
trackingNumber,
timestamp: Date.now().toString()
});
}
// Восстановление состояния заказа из событий
async getOrderHistory(orderId: string) {
const events = await redis.xRange(this.streamKey, '-', '+');
return events
.filter(({ message }) => message.orderId === orderId)
.map(({ id, message }) => ({
id,
event: message.event,
timestamp: parseInt(message.timestamp),
...message
}));
}
}
// Получение pending messages
const pending = await redis.xPending('events', 'email-workers');
console.log('Pending count:', pending.pending);
// Детали pending для конкретного consumer
const details = await redis.xPendingRange(
'events',
'email-workers',
'-',
'+',
10,
'worker-1'
);
// Claim старых pending messages (если worker умер)
const claimed = await redis.xClaim(
'events',
'email-workers',
'worker-2', // новый owner
60000, // старше 60 секунд
details.map(msg => msg.id)
);
// Оставить только последние 10000 сообщений
await redis.xTrim('events', 'MAXLEN', 10000);
// Приблизительная обрезка (быстрее)
await redis.xTrim('events', 'MAXLEN', '~', 10000);
// Автоматическое trimming при добавлении
await redis.xAdd('events', '*', { data: 'value' }, {
TRIM: { strategy: 'MAXLEN', strategyModifier: '~', threshold: 10000 }
});
// Информация о stream
const info = await redis.xInfoStream('events');
console.log('Length:', info.length);
console.log('First entry:', info['first-entry']);
console.log('Last entry:', info['last-entry']);
// Информация о consumer group
const groupInfo = await redis.xInfoGroups('events');
console.log('Groups:', groupInfo);
// Информация о consumers
const consumers = await redis.xInfoConsumers('events', 'email-workers');
console.log('Consumers:', consumers);
class ActivityFeedService {
async addActivity(userId: string, activity: any) {
await redis.xAdd(`feed:${userId}`, '*', {
type: activity.type,
data: JSON.stringify(activity.data),
timestamp: Date.now().toString()
});
// Ограничение до 100 последних активностей
await redis.xTrim(`feed:${userId}`, 'MAXLEN', '~', 100);
}
async getUserFeed(userId: string, count = 20) {
const messages = await redis.xRevRange(
`feed:${userId}`,
'+',
'-',
{ COUNT: count }
);
return messages.map(({ id, message }) => ({
id,
type: message.type,
data: JSON.parse(message.data),
timestamp: parseInt(message.timestamp)
}));
}
// Real-time подписка на новые активности
async subscribeToFeed(userId: string, callback: Function) {
let lastId = '$'; // только новые сообщения
while (true) {
const messages = await redis.xRead(
{ key: `feed:${userId}`, id: lastId },
{ BLOCK: 5000 }
);
if (messages && messages.length > 0) {
for (const [stream, streamMessages] of messages) {
for (const msg of streamMessages) {
callback(msg);
lastId = msg.id;
}
}
}
}
}
}
  1. Используйте Consumer Groups для scaling
  2. XACK сразу после обработки (или используйте транзакции)
  3. Monitoring pending messages и claim старых
  4. TRIM регулярно для ограничения памяти
  5. Event Sourcing вместо снэпшотов состояния

Хорошо для:

  • Event sourcing
  • Activity feeds
  • Audit logs
  • Task queues с гарантией доставки
  • Metrics/Analytics

Плохо для:

  • Simple pub/sub (используйте Pub/Sub)
  • Очень большие потоки (>1B сообщений) — лучше Kafka

Следующий урок: Нормализация БД