Перейти к содержимому

12. Higher-Order Observables

Привет! Яша снова здесь. Сегодня — одна из самых мощных концепций RxJS: Higher-Order Observables. Когда ты понимаешь их, реактивное программирование превращается в суперсилу.

Обычный Observable испускает значения (числа, строки, объекты). Higher-Order Observable испускает другие Observable-ы.

import { of, fromEvent, interval } from 'rxjs';
import { map } from 'rxjs/operators';
// Обычный Observable
const numbers$ = of(1, 2, 3); // испускает: 1, 2, 3
// Higher-Order Observable — испускает Observable-ы!
const observables$ = of(1, 2, 3).pipe(
map(n => interval(n * 1000)) // каждое число → новый interval Observable
);
// испускает: Observable, Observable, Observable
// Нужен оператор "выравнивания" (flattening), чтобы получить реальные значения

Чтобы работать с Higher-Order Observables, используют операторы выравнивания (flattening operators). Их четыре: mergeMap, switchMap, concatMap, exhaustMap.


ОператорПараллельностьОтменяет предыдущий?Игнорирует новые?Типичный сценарий
mergeMap✅ Все одновременно❌ Нет❌ НетПараллельные запросы
switchMap❌ Только последний✅ Да❌ НетПоиск при вводе
concatMap❌ Последовательно❌ Нет❌ Нет (ставит в очередь)Анимации, заказы
exhaustMap❌ Только первый❌ Нет✅ ДаФорма отправки

mergeMap (он же flatMap) подписывается на каждый внутренний Observable одновременно. Результаты приходят вперемешку по мере готовности.

import { of, timer } from 'rxjs';
import { mergeMap, map } from 'rxjs/operators';
of('A', 'B', 'C').pipe(
mergeMap(letter =>
// Каждый запрос — разное время ответа
timer(Math.random() * 2000).pipe(map(() => `Ответ ${letter}`))
)
).subscribe(console.log);
// Ответ B (через 300мс)
// Ответ C (через 700мс)
// Ответ A (через 1500мс)
// ⚠️ Порядок НЕ гарантирован — кто быстрее, тот первый

Ограничение параллельности:

// Не более 2 параллельных запросов
of('A', 'B', 'C', 'D', 'E').pipe(
mergeMap(letter => fetchData(letter), 2) // второй аргумент — concurrent
).subscribe(console.log);

switchMap отменяет предыдущий внутренний Observable при появлении нового. Идеален для поиска — зачем нам старые результаты?

import { fromEvent } from 'rxjs';
import { switchMap, debounceTime, map } from 'rxjs/operators';
const searchInput = document.getElementById('search') as HTMLInputElement;
fromEvent(searchInput, 'input').pipe(
map(e => (e.target as HTMLInputElement).value),
debounceTime(300),
switchMap(query => from(fetch(`/api/search?q=${query}`))) // старый запрос отменяется
).subscribe(results => renderResults(results));
// Пользователь напечатал "rea" → запрос улетел
// Сразу напечатал "react" → старый запрос ОТМЕНЯЕТСЯ, новый запрос улетает
// Мраморная диаграмма switchMap:
// источник: --1-----2--3------4-|
// switchMap → timer(1000):
// подписка 1: ----1| (отменяется при 2)
// подписка 2: ------2| (отменяется при 3)
// подписка 3: -------3---| (выживает, т.к. 4 приходит позже)
// результат: -----------3---4---|

concatMap ставит внутренние Observables в очередь — следующий запускается только после завершения предыдущего. Порядок гарантирован.

import { of, delay } from 'rxjs';
import { concatMap, map } from 'rxjs/operators';
// Анимации должны идти строго последовательно
const steps$ = of('шаг 1', 'шаг 2', 'шаг 3');
steps$.pipe(
concatMap(step =>
of(`Выполняю ${step}`).pipe(delay(1000)) // каждый шаг — 1 секунда
)
).subscribe(console.log);
// (0с) ... (1с) Выполняю шаг 1 ... (2с) Выполняю шаг 2 ... (3с) Выполняю шаг 3

exhaustMap игнорирует новые значения пока текущий внутренний Observable ещё не завершился. Идеален для кнопки «Сохранить» — не отправлять повторный запрос пока предыдущий в процессе.

import { fromEvent, from } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';
const saveButton = document.getElementById('save')!;
fromEvent(saveButton, 'click').pipe(
exhaustMap(() => from(saveToServer())) // повторные клики ИГНОРИРУЮТСЯ во время сохранения
).subscribe(() => console.log('Сохранено!'));
// Клик → запрос пошёл
// Клик, клик, клик → игнорируются (запрос ещё в процессе)
// Запрос завершился → Сохранено!
// Клик → следующий запрос пошёл

Нужно обработать Higher-Order Observable?
├─ Важен ПОРЯДОК результатов?
│ ├─ Да → concatMap (очередь, последовательно)
│ └─ Нет ↓
├─ Нужно ОТМЕНЯТЬ предыдущее при новом значении?
│ ├─ Да → switchMap (поиск, навигация)
│ └─ Нет ↓
├─ Нужно ИГНОРИРОВАТЬ новые пока занят?
│ ├─ Да → exhaustMap (кнопки, формы)
│ └─ Нет ↓
└─ Нужны ВСЕ результаты параллельно?
└─ Да → mergeMap (параллельные запросы)

import { interval, Subject } from 'rxjs';
import { mergeMap, take, groupBy, toArray } from 'rxjs/operators';
// groupBy — группировка Higher-Order Observable
const clicks$ = new Subject<{ userId: string; action: string }>();
clicks$.pipe(
groupBy(click => click.userId), // создаёт Observable для каждого userId
mergeMap(group$ => // обрабатываем каждую группу
group$.pipe(
toArray() // собираем все клики пользователя
)
)
).subscribe(userClicks => console.log('Клики пользователя:', userClicks));

Попробуйте примеры в интерактивном редакторе: