19. Redis: Pub/Sub
Publish/Subscribe pattern для real-time messaging между приложениями без постоянного хранения сообщений.
Основы Pub/Sub
Заголовок раздела «Основы Pub/Sub»graph LR A[Publisher] -->|PUBLISH| B[Channel: notifications] B --> C[Subscriber 1] B --> D[Subscriber 2] B --> E[Subscriber 3]Важно: Сообщения НЕ сохраняются! Если подписчик offline — сообщение потеряно.
Базовое использование
Заголовок раздела «Базовое использование»import { createClient } from 'redis';
// Publisherconst publisher = createClient();await publisher.connect();
await publisher.publish('notifications', JSON.stringify({ type: 'new_message', userId: 123, text: 'Hello!'}));
// Subscriberconst subscriber = createClient();await subscriber.connect();
await subscriber.subscribe('notifications', (message) => { const data = JSON.parse(message); console.log('Received:', data); // Отправить push, email, WebSocket, etc.});Real-time чат
Заголовок раздела «Real-time чат»class ChatService { private publisher = createClient(); private subscriber = createClient();
async connect() { await Promise.all([ this.publisher.connect(), this.subscriber.connect() ]); }
// Отправка сообщения в комнату async sendMessage(roomId: string, userId: string, text: string) { const message = { roomId, userId, text, timestamp: Date.now() };
await this.publisher.publish( `chat:room:${roomId}`, JSON.stringify(message) );
// Также сохраняем в БД для истории await db.messages.create(message); }
// Подписка на комнату async subscribeToRoom(roomId: string, callback: (message: any) => void) { await this.subscriber.subscribe(`chat:room:${roomId}`, (msg) => { callback(JSON.parse(msg)); }); }
// Отписка async unsubscribeFromRoom(roomId: string) { await this.subscriber.unsubscribe(`chat:room:${roomId}`); }}
// Использование с WebSocketconst chat = new ChatService();await chat.connect();
io.on('connection', (socket) => { socket.on('join_room', async (roomId) => { await chat.subscribeToRoom(roomId, (message) => { socket.emit('new_message', message); }); });
socket.on('send_message', async ({ roomId, text }) => { await chat.sendMessage(roomId, socket.userId, text); });});Pattern Matching (psubscribe)
Заголовок раздела «Pattern Matching (psubscribe)»// Подписка на все каналы с паттерномawait subscriber.pSubscribe('notifications:*', (message, channel) => { console.log(`Message from ${channel}:`, message);});
// Теперь получим сообщения из:await publisher.publish('notifications:email', 'Email notification');await publisher.publish('notifications:sms', 'SMS notification');await publisher.publish('notifications:push', 'Push notification');Monitoring активности пользователей
Заголовок раздела «Monitoring активности пользователей»// Пользователь онлайн/оффлайнclass PresenceService { private publisher = createClient();
async setUserOnline(userId: string) { await this.publisher.publish('presence', JSON.stringify({ userId, status: 'online', timestamp: Date.now() })); }
async setUserOffline(userId: string) { await this.publisher.publish('presence', JSON.stringify({ userId, status: 'offline', timestamp: Date.now() })); }}
// Подписка на presence eventsawait subscriber.subscribe('presence', (message) => { const { userId, status } = JSON.parse(message);
// Обновить UI, отправить WebSocket всем friends io.to(`friends:${userId}`).emit('user_status', { userId, status });});Live notifications
Заголовок раздела «Live notifications»// Notification Serviceclass NotificationService { private publisher = createClient();
async notify(userId: string, notification: any) { await this.publisher.publish( `notifications:user:${userId}`, JSON.stringify(notification) );
// Также сохраняем в БД await db.notifications.create({ userId, ...notification, read: false }); }}
// Client подписывается на свои уведомленияasync function subscribeToUserNotifications(userId: string, callback: Function) { await subscriber.subscribe(`notifications:user:${userId}`, (msg) => { callback(JSON.parse(msg)); });}
// Использованиеawait notificationService.notify('user123', { type: 'new_follower', fromUserId: 'user456', text: 'Alice started following you'});Broadcast для всех пользователей
Заголовок раздела «Broadcast для всех пользователей»// System-wide announcementawait publisher.publish('announcements', JSON.stringify({ type: 'maintenance', text: 'Scheduled maintenance in 1 hour', duration: '30 minutes'}));
// Все подписчики получатawait subscriber.subscribe('announcements', (message) => { const announcement = JSON.parse(message); io.emit('system_announcement', announcement);});Pub/Sub + Worker Queue
Заголовок раздела «Pub/Sub + Worker Queue»// Комбинирование Pub/Sub для уведомлений и Queue для задач
// Publisher отправляет задачуawait redis.lPush('tasks:queue', JSON.stringify(task));await redis.publish('tasks:new', 'Task added');
// Worker подписан и обрабатываетawait subscriber.subscribe('tasks:new', async () => { const task = await redis.rPop('tasks:queue'); if (task) { await processTask(JSON.parse(task)); }});Ограничения Pub/Sub
Заголовок раздела «Ограничения Pub/Sub»❌ НЕ используйте Pub/Sub когда:
- Нужна надёжная доставка (сообщения не сохраняются)
- Подписчик может быть offline
- Требуется гарантия обработки каждого сообщения
✅ Используйте Pub/Sub для:
- Real-time уведомления
- Чат
- Live updates
- Invalidation кэша
- Event broadcasting
Для надёжного messaging используйте Redis Streams или RabbitMQ/Kafka.
💡 Best Practices
Заголовок раздела «💡 Best Practices»- НЕ храните критичные данные в Pub/Sub (нет persistence)
- Используйте JSON для структурированных сообщений
- Namespace channels (
chat:room:123вместо123) - Reconnection logic при обрыве связи
- Combine с БД для истории сообщений
Следующий урок: Redis Streams →