Перейти к содержимому

11. Multicasting: share, publish

Привет! Яша на связи. Сегодня разбираем одну из самых «взрывоопасных» тем в RxJS — multicasting. Именно здесь скрывается классическая ошибка с дублирующимися HTTP-запросами.

Cold Observable — как видеозапись на DVD. Каждый подписчик получает свою копию воспроизведения с самого начала.

Hot Observable — как прямой эфир на телевидении. Все подписчики получают одни и те же данные в реальном времени. Если опоздал — пропустил.

import { Observable, Subject, interval } from 'rxjs';
import { take } from 'rxjs/operators';
// ❄️ COLD: каждая подписка запускает новый producer
const cold$ = new Observable<number>(observer => {
console.log('🆕 Новый producer запущен!'); // вызывается для КАЖДОГО подписчика
let i = 0;
const id = setInterval(() => observer.next(i++), 1000);
return () => clearInterval(id);
});
cold$.subscribe(v => console.log('Подписчик A:', v));
// 🆕 Новый producer запущен!
setTimeout(() => {
cold$.subscribe(v => console.log('Подписчик B:', v));
// 🆕 Новый producer запущен! (ВТОРОЙ независимый!)
}, 2000);
// 🔥 HOT: один producer, все подписчики слушают его
const subject = new Subject<number>();
const hot$ = subject.asObservable();
hot$.subscribe(v => console.log('Подписчик A:', v));
setTimeout(() => {
hot$.subscribe(v => console.log('Подписчик B:', v));
// Подписчик B пропустил первые события
}, 2000);
// Один источник для всех
subject.next(1); // только A: 1
subject.next(2); // только A: 2
subject.next(3); // A: 3, B: 3 (B подписался)

Это самая распространённая боль с Cold Observables:

import { from } from 'rxjs';
import { map } from 'rxjs/operators';
// ❌ Плохо: два запроса к API
const users$ = from(fetch('/api/users').then(r => r.json()));
// Два компонента — два запроса!
users$.subscribe(users => renderList(users));
users$.subscribe(users => renderCount(users));

Решение — превратить холодный Observable в горячий.


share() — это сокращение для pipe(multicast(() => new Subject()), refCount()). Он делит одну подписку на источник между всеми подписчиками.

import { interval } from 'rxjs';
import { share, take, tap } from 'rxjs/operators';
const source$ = interval(1000).pipe(
tap(v => console.log(`[source] производим ${v}`)), // выполняется ОДИН раз
take(5),
share() // делимся потоком
);
// Оба подписчика получают ОДИНАКОВЫЕ данные от ОДНОГО источника
source$.subscribe(v => console.log('A:', v));
source$.subscribe(v => console.log('B:', v));
// [source] производим 0
// A: 0
// B: 0
// [source] производим 1
// A: 1
// B: 1

Важно: если все подписчики отписались, share() завершает внутреннюю подписку. При новой подписке — всё начинается заново.

// share() с конфигурацией (RxJS 7.4+)
const shared$ = source$.pipe(
share({
connector: () => new ReplaySubject(1), // запоминает последнее значение
resetOnRefCountZero: true, // сбросить при нулевых подписчиках
resetOnComplete: false,
resetOnError: true,
})
);

shareReplay(bufferSize) запоминает последние N значений и немедленно отдаёт их новым подписчикам.

import { shareReplay } from 'rxjs/operators';
// ✅ Хорошо: HTTP запрос выполнится ОДИН раз, результат кэшируется
const users$ = from(fetch('/api/users').then(r => r.json())).pipe(
shareReplay(1) // кэшируем последний ответ
);
users$.subscribe(users => renderList(users)); // запрос уходит
users$.subscribe(users => renderCount(users)); // получает кэш, нет нового запроса
// Даже после завершения первого потока
setTimeout(() => {
users$.subscribe(users => console.log('Поздний подписчик:', users)); // тоже кэш!
}, 5000);

⚠️ Осторожно с shareReplay! По умолчанию refCount: false — подписка на источник не закрывается даже если все подписчики ушли. Используйте shareReplay({ bufferSize: 1, refCount: true }) для предотвращения утечек памяти.

// Безопасный вариант
const safe$ = source$.pipe(
shareReplay({ bufferSize: 1, refCount: true })
);

publish() создаёт ConnectableObservable. Подписки накапливаются, но источник не запускается до вызова .connect().

import { interval } from 'rxjs';
import { publish, refCount, take } from 'rxjs/operators';
const published$ = interval(1000).pipe(
take(5),
publish() // возвращает ConnectableObservable
);
// Подписываемся — но источник ещё НЕ запущен
published$.subscribe(v => console.log('A:', v));
published$.subscribe(v => console.log('B:', v));
// Запускаем вручную
published$.connect(); // только теперь interval начинает работать

refCount() автоматизирует connect()/unsubscribe(): запускает при первом подписчике, останавливает при последнем.

const autoConnect$ = interval(1000).pipe(
publish(),
refCount() // = share() по поведению
);

multicast принимает Subject или фабрику Subject, даёт максимальный контроль:

import { multicast } from 'rxjs/operators';
import { ReplaySubject } from 'rxjs';
// Используем ReplaySubject — новые подписчики получат историю
const multicasted$ = source$.pipe(
multicast(() => new ReplaySubject<number>(3)), // буфер 3 значения
);
// Нужно вручную connect/refCount
const connected$ = multicasted$.pipe(refCount());

ОператорТип SubjectrefCountБуферТипичное применение
share()SubjectДа0Общие потоки событий
shareReplay(n)ReplaySubject(n)Опц.nHTTP кэш, конфигурация
publish()SubjectНет0Ручное управление запуском
publish().refCount()SubjectДа0≈ share()
publishReplay(n).refCount()ReplaySubject(n)ДаnКэш с авто-очисткой

Попробуйте примеры в интерактивном редакторе: