11. Multicasting: share, publish
12. Multicasting: share, shareReplay, publish 📡
Заголовок раздела «12. Multicasting: share, shareReplay, publish 📡»Привет! Яша на связи. Сегодня разбираем одну из самых «взрывоопасных» тем в RxJS — multicasting. Именно здесь скрывается классическая ошибка с дублирующимися HTTP-запросами.
Cold vs Hot Observable — глубокое погружение
Заголовок раздела «Cold vs Hot Observable — глубокое погружение»Cold Observable — как видеозапись на DVD. Каждый подписчик получает свою копию воспроизведения с самого начала.
Hot Observable — как прямой эфир на телевидении. Все подписчики получают одни и те же данные в реальном времени. Если опоздал — пропустил.
import { Observable, Subject, interval } from 'rxjs';import { take } from 'rxjs/operators';
// ❄️ COLD: каждая подписка запускает новый producerconst cold$ = new Observable<number>(observer => { console.log('🆕 Новый producer запущен!'); // вызывается для КАЖДОГО подписчика let i = 0; const id = setInterval(() => observer.next(i++), 1000); return () => clearInterval(id);});
cold$.subscribe(v => console.log('Подписчик A:', v));// 🆕 Новый producer запущен!setTimeout(() => { cold$.subscribe(v => console.log('Подписчик B:', v)); // 🆕 Новый producer запущен! (ВТОРОЙ независимый!)}, 2000);
// 🔥 HOT: один producer, все подписчики слушают егоconst subject = new Subject<number>();const hot$ = subject.asObservable();
hot$.subscribe(v => console.log('Подписчик A:', v));setTimeout(() => { hot$.subscribe(v => console.log('Подписчик B:', v)); // Подписчик B пропустил первые события}, 2000);
// Один источник для всехsubject.next(1); // только A: 1subject.next(2); // только A: 2subject.next(3); // A: 3, B: 3 (B подписался)Проблема дублирования HTTP-запросов
Заголовок раздела «Проблема дублирования HTTP-запросов»Это самая распространённая боль с Cold Observables:
import { from } from 'rxjs';import { map } from 'rxjs/operators';
// ❌ Плохо: два запроса к APIconst users$ = from(fetch('/api/users').then(r => r.json()));
// Два компонента — два запроса!users$.subscribe(users => renderList(users));users$.subscribe(users => renderCount(users));Решение — превратить холодный Observable в горячий.
share() — самый простой multicast
Заголовок раздела «share() — самый простой multicast»share() — это сокращение для pipe(multicast(() => new Subject()), refCount()). Он делит одну подписку на источник между всеми подписчиками.
import { interval } from 'rxjs';import { share, take, tap } from 'rxjs/operators';
const source$ = interval(1000).pipe( tap(v => console.log(`[source] производим ${v}`)), // выполняется ОДИН раз take(5), share() // делимся потоком);
// Оба подписчика получают ОДИНАКОВЫЕ данные от ОДНОГО источникаsource$.subscribe(v => console.log('A:', v));source$.subscribe(v => console.log('B:', v));// [source] производим 0// A: 0// B: 0// [source] производим 1// A: 1// B: 1Важно: если все подписчики отписались, share() завершает внутреннюю подписку. При новой подписке — всё начинается заново.
// share() с конфигурацией (RxJS 7.4+)const shared$ = source$.pipe( share({ connector: () => new ReplaySubject(1), // запоминает последнее значение resetOnRefCountZero: true, // сбросить при нулевых подписчиках resetOnComplete: false, resetOnError: true, }));shareReplay() — кэш для опоздавших
Заголовок раздела «shareReplay() — кэш для опоздавших»shareReplay(bufferSize) запоминает последние N значений и немедленно отдаёт их новым подписчикам.
import { shareReplay } from 'rxjs/operators';
// ✅ Хорошо: HTTP запрос выполнится ОДИН раз, результат кэшируетсяconst users$ = from(fetch('/api/users').then(r => r.json())).pipe( shareReplay(1) // кэшируем последний ответ);
users$.subscribe(users => renderList(users)); // запрос уходитusers$.subscribe(users => renderCount(users)); // получает кэш, нет нового запроса
// Даже после завершения первого потокаsetTimeout(() => { users$.subscribe(users => console.log('Поздний подписчик:', users)); // тоже кэш!}, 5000);⚠️ Осторожно с
shareReplay! По умолчаниюrefCount: false— подписка на источник не закрывается даже если все подписчики ушли. ИспользуйтеshareReplay({ bufferSize: 1, refCount: true })для предотвращения утечек памяти.
// Безопасный вариантconst safe$ = source$.pipe( shareReplay({ bufferSize: 1, refCount: true }));publish() + refCount() — ручной multicast
Заголовок раздела «publish() + refCount() — ручной multicast»publish() создаёт ConnectableObservable. Подписки накапливаются, но источник не запускается до вызова .connect().
import { interval } from 'rxjs';import { publish, refCount, take } from 'rxjs/operators';
const published$ = interval(1000).pipe( take(5), publish() // возвращает ConnectableObservable);
// Подписываемся — но источник ещё НЕ запущенpublished$.subscribe(v => console.log('A:', v));published$.subscribe(v => console.log('B:', v));
// Запускаем вручнуюpublished$.connect(); // только теперь interval начинает работатьrefCount() автоматизирует connect()/unsubscribe(): запускает при первом подписчике, останавливает при последнем.
const autoConnect$ = interval(1000).pipe( publish(), refCount() // = share() по поведению);multicast() — для продвинутых сценариев
Заголовок раздела «multicast() — для продвинутых сценариев»multicast принимает Subject или фабрику Subject, даёт максимальный контроль:
import { multicast } from 'rxjs/operators';import { ReplaySubject } from 'rxjs';
// Используем ReplaySubject — новые подписчики получат историюconst multicasted$ = source$.pipe( multicast(() => new ReplaySubject<number>(3)), // буфер 3 значения);
// Нужно вручную connect/refCountconst connected$ = multicasted$.pipe(refCount());Таблица сравнения
Заголовок раздела «Таблица сравнения»| Оператор | Тип Subject | refCount | Буфер | Типичное применение |
|---|---|---|---|---|
share() | Subject | Да | 0 | Общие потоки событий |
shareReplay(n) | ReplaySubject(n) | Опц. | n | HTTP кэш, конфигурация |
publish() | Subject | Нет | 0 | Ручное управление запуском |
publish().refCount() | Subject | Да | 0 | ≈ share() |
publishReplay(n).refCount() | ReplaySubject(n) | Да | n | Кэш с авто-очисткой |
Практика
Заголовок раздела «Практика»Попробуйте примеры в интерактивном редакторе: