Перейти к содержимому

3. Subjects: горячие потоки

📡 Subjects — Горячие потоки и общие состояния

Заголовок раздела «📡 Subjects — Горячие потоки и общие состояния»

Привет, Яша! 🔥 Сегодня разбираем Subject — особый вид Observable, который умеет быть одновременно источником и получателем данных. Это то, что делает RxJS по-настоящему мощным! 💪


Subject — это Observable, который:

  1. Является горячим (не создаёт отдельный поток для каждого подписчика)
  2. Может сам отправлять значения через .next()
  3. Может иметь несколько подписчиков — и все получают одно и то же значение

Аналогия: радиостанция 📻

  • Observable = DVD (каждый зритель смотрит с начала своего диска)
  • Subject = FM Radio (все слушают одну трансляцию в прямом эфире)
import { Subject } from 'rxjs';
const radio$ = new Subject<string>();
// Два "слушателя" подписываются
radio$.subscribe(msg => console.log('Слушатель 1:', msg));
radio$.subscribe(msg => console.log('Слушатель 2:', msg));
// Subject сам отправляет данные!
radio$.next('🎵 Первая песня');
// Слушатель 1: 🎵 Первая песня
// Слушатель 2: 🎵 Первая песня
radio$.next('🎵 Вторая песня');
// Слушатель 1: 🎵 Вторая песня
// Слушатель 2: 🎵 Вторая песня
radio$.complete(); // Станция закрылась

import { Observable, Subject, interval } from 'rxjs';
import { take } from 'rxjs/operators';
// ❄️ ХОЛОДНЫЙ — каждый подписчик получает свой поток с начала
const cold$ = new Observable(sub => {
console.log('Новый поток создан!');
sub.next(Math.random()); // Случайное число
sub.next(Math.random());
sub.complete();
});
cold$.subscribe(v => console.log('А:', v)); // Новый поток! Своё случайное число
cold$.subscribe(v => console.log('Б:', v)); // Новый поток! Другое случайное число
// A: 0.123 <- разные!
// Б: 0.987 <- разные!
// 🔥 ГОРЯЧИЙ (Subject) — все подписчики получают одни и те же данные
const hot$ = new Subject<number>();
hot$.subscribe(v => console.log('А:', v));
hot$.subscribe(v => console.log('Б:', v));
hot$.next(Math.random()); // Одно значение — оба получают одно и то же!
// A: 0.456 <- одинаковые!
// Б: 0.456 <- одинаковые!

import { Subject } from 'rxjs';
const updates$ = new Subject<string>();
updates$.next('Событие 1'); // ← никто не подписан, потеряно!
updates$.subscribe(v => console.log('Подписчик:', v));
updates$.next('Событие 2'); // ✅ Получит
updates$.next('Событие 3'); // ✅ Получит
// Подписчик: Событие 2
// Подписчик: Событие 3
// Событие 1 потеряно навсегда 😢

Это проблема. Её решают BehaviorSubject и ReplaySubject!


BehaviorSubject хранит последнее значение и сразу отдаёт его новым подписчикам.

Аналогия: термостат 🌡️ — показывает текущую температуру, и любой кто посмотрит — увидит актуальное значение.

import { BehaviorSubject } from 'rxjs';
// Начальное значение ОБЯЗАТЕЛЬНО!
const temperature$ = new BehaviorSubject<number>(22);
// Подписчик 1 сразу получает текущее значение (22)
temperature$.subscribe(t => console.log('Термометр 1:', t));
// Термометр 1: 22
// Температура изменилась
temperature$.next(25);
// Термометр 1: 25
// Новый подписчик — сразу получает ПОСЛЕДНЕЕ значение (25)
temperature$.subscribe(t => console.log('Термометр 2:', t));
// Термометр 2: 25
// Читаем текущее значение без подписки!
console.log('Текущая температура:', temperature$.getValue()); // 25
import { BehaviorSubject } from 'rxjs';
import { map } from 'rxjs/operators';
interface CartState {
items: string[];
total: number;
}
// Простой стор
const cart$ = new BehaviorSubject<CartState>({ items: [], total: 0 });
// Производные потоки
const itemCount$ = cart$.pipe(map(s => s.items.length));
const isEmpty$ = cart$.pipe(map(s => s.items.length === 0));
// Методы обновления
function addItem(item: string, price: number) {
const current = cart$.getValue();
cart$.next({
items: [...current.items, item],
total: current.total + price
});
}
// Используем
itemCount$.subscribe(n => console.log('Товаров в корзине:', n));
cart$.subscribe(s => console.log('Корзина:', s));
addItem('Яблоко', 50); // Товаров: 1
addItem('Банан', 30); // Товаров: 2

ReplaySubject(N) запоминает последние N значений и отдаёт их новым подписчикам.

Аналогия: DVR/TiVo 📹 — записывает трансляцию, и опоздавший может посмотреть последние N минут.

import { ReplaySubject } from 'rxjs';
// Запоминает последние 3 значения
const replayBuffer$ = new ReplaySubject<string>(3);
replayBuffer$.next('🎬 Событие 1');
replayBuffer$.next('🎬 Событие 2');
replayBuffer$.next('🎬 Событие 3');
replayBuffer$.next('🎬 Событие 4');
replayBuffer$.next('🎬 Событие 5');
// Подписываемся ПОСЛЕ всех событий
replayBuffer$.subscribe(v => console.log('Опоздал, но получил:', v));
// Опоздал, но получил: 🎬 Событие 3 (только последние 3!)
// Опоздал, но получил: 🎬 Событие 4
// Опоздал, но получил: 🎬 Событие 5
// ReplaySubject с временным буфером (500мс)
const timedReplay$ = new ReplaySubject<number>(100, 500);
// хранит до 100 значений за последние 500мс

AsyncSubject — только последнее значение при complete 🏁

Заголовок раздела «AsyncSubject — только последнее значение при complete 🏁»

AsyncSubject отдаёт только последнее значение, и только когда вызван complete().

Аналогия: приз за финиш 🏆 — получишь результат только когда гонка завершится.

import { AsyncSubject } from 'rxjs';
const async$ = new AsyncSubject<number>();
async$.subscribe(v => console.log('Получили:', v));
async$.next(1); // Не придёт
async$.next(2); // Не придёт
async$.next(3); // Не придёт
async$.complete(); // Только теперь!
// Получили: 3 ← только последнее значение!

💡 AsyncSubject похож на Promise — одно финальное значение. Используй его для операций типа “дождись конца загрузки”.


SubjectЗапоминаетКогда использовать
SubjectНичегоСобытия (клики, WebSocket)
BehaviorSubjectПоследнее значениеState management, текущий пользователь
ReplaySubject(N)N последних значенийКэш последних событий, логи
AsyncSubjectТолько при completeОдноразовые async операции

Попробуйте примеры в интерактивном редакторе: