12. Higher-Order Observables
13. Higher-Order Observables: потоки потоков 🌊
Заголовок раздела «13. Higher-Order Observables: потоки потоков 🌊»Привет! Яша снова здесь. Сегодня — одна из самых мощных концепций RxJS: Higher-Order Observables. Когда ты понимаешь их, реактивное программирование превращается в суперсилу.
Что такое Higher-Order Observable?
Заголовок раздела «Что такое Higher-Order Observable?»Обычный Observable испускает значения (числа, строки, объекты). Higher-Order Observable испускает другие Observable-ы.
import { of, fromEvent, interval } from 'rxjs';import { map } from 'rxjs/operators';
// Обычный Observableconst numbers$ = of(1, 2, 3); // испускает: 1, 2, 3
// Higher-Order Observable — испускает Observable-ы!const observables$ = of(1, 2, 3).pipe( map(n => interval(n * 1000)) // каждое число → новый interval Observable);// испускает: Observable, Observable, Observable
// Нужен оператор "выравнивания" (flattening), чтобы получить реальные значенияЧтобы работать с Higher-Order Observables, используют операторы выравнивания (flattening operators). Их четыре: mergeMap, switchMap, concatMap, exhaustMap.
Таблица сравнения стратегий
Заголовок раздела «Таблица сравнения стратегий»| Оператор | Параллельность | Отменяет предыдущий? | Игнорирует новые? | Типичный сценарий |
|---|---|---|---|---|
mergeMap | ✅ Все одновременно | ❌ Нет | ❌ Нет | Параллельные запросы |
switchMap | ❌ Только последний | ✅ Да | ❌ Нет | Поиск при вводе |
concatMap | ❌ Последовательно | ❌ Нет | ❌ Нет (ставит в очередь) | Анимации, заказы |
exhaustMap | ❌ Только первый | ❌ Нет | ✅ Да | Форма отправки |
mergeMap — все параллельно
Заголовок раздела «mergeMap — все параллельно»mergeMap (он же flatMap) подписывается на каждый внутренний Observable одновременно. Результаты приходят вперемешку по мере готовности.
import { of, timer } from 'rxjs';import { mergeMap, map } from 'rxjs/operators';
of('A', 'B', 'C').pipe( mergeMap(letter => // Каждый запрос — разное время ответа timer(Math.random() * 2000).pipe(map(() => `Ответ ${letter}`)) )).subscribe(console.log);// Ответ B (через 300мс)// Ответ C (через 700мс)// Ответ A (через 1500мс)
// ⚠️ Порядок НЕ гарантирован — кто быстрее, тот первыйОграничение параллельности:
// Не более 2 параллельных запросовof('A', 'B', 'C', 'D', 'E').pipe( mergeMap(letter => fetchData(letter), 2) // второй аргумент — concurrent).subscribe(console.log);switchMap — только последний
Заголовок раздела «switchMap — только последний»switchMap отменяет предыдущий внутренний Observable при появлении нового. Идеален для поиска — зачем нам старые результаты?
import { fromEvent } from 'rxjs';import { switchMap, debounceTime, map } from 'rxjs/operators';
const searchInput = document.getElementById('search') as HTMLInputElement;
fromEvent(searchInput, 'input').pipe( map(e => (e.target as HTMLInputElement).value), debounceTime(300), switchMap(query => from(fetch(`/api/search?q=${query}`))) // старый запрос отменяется).subscribe(results => renderResults(results));// Пользователь напечатал "rea" → запрос улетел// Сразу напечатал "react" → старый запрос ОТМЕНЯЕТСЯ, новый запрос улетает// Мраморная диаграмма switchMap:// источник: --1-----2--3------4-|// switchMap → timer(1000):// подписка 1: ----1| (отменяется при 2)// подписка 2: ------2| (отменяется при 3)// подписка 3: -------3---| (выживает, т.к. 4 приходит позже)// результат: -----------3---4---|concatMap — очередь
Заголовок раздела «concatMap — очередь»concatMap ставит внутренние Observables в очередь — следующий запускается только после завершения предыдущего. Порядок гарантирован.
import { of, delay } from 'rxjs';import { concatMap, map } from 'rxjs/operators';
// Анимации должны идти строго последовательноconst steps$ = of('шаг 1', 'шаг 2', 'шаг 3');
steps$.pipe( concatMap(step => of(`Выполняю ${step}`).pipe(delay(1000)) // каждый шаг — 1 секунда )).subscribe(console.log);// (0с) ... (1с) Выполняю шаг 1 ... (2с) Выполняю шаг 2 ... (3с) Выполняю шаг 3exhaustMap — игнорируем пока занят
Заголовок раздела «exhaustMap — игнорируем пока занят»exhaustMap игнорирует новые значения пока текущий внутренний Observable ещё не завершился. Идеален для кнопки «Сохранить» — не отправлять повторный запрос пока предыдущий в процессе.
import { fromEvent, from } from 'rxjs';import { exhaustMap } from 'rxjs/operators';
const saveButton = document.getElementById('save')!;
fromEvent(saveButton, 'click').pipe( exhaustMap(() => from(saveToServer())) // повторные клики ИГНОРИРУЮТСЯ во время сохранения).subscribe(() => console.log('Сохранено!'));// Клик → запрос пошёл// Клик, клик, клик → игнорируются (запрос ещё в процессе)// Запрос завершился → Сохранено!// Клик → следующий запрос пошёлДерево решений: что выбрать?
Заголовок раздела «Дерево решений: что выбрать?»Нужно обработать Higher-Order Observable?│├─ Важен ПОРЯДОК результатов?│ ├─ Да → concatMap (очередь, последовательно)│ └─ Нет ↓│├─ Нужно ОТМЕНЯТЬ предыдущее при новом значении?│ ├─ Да → switchMap (поиск, навигация)│ └─ Нет ↓│├─ Нужно ИГНОРИРОВАТЬ новые пока занят?│ ├─ Да → exhaustMap (кнопки, формы)│ └─ Нет ↓│└─ Нужны ВСЕ результаты параллельно? └─ Да → mergeMap (параллельные запросы)Продвинутые паттерны
Заголовок раздела «Продвинутые паттерны»import { interval, Subject } from 'rxjs';import { mergeMap, take, groupBy, toArray } from 'rxjs/operators';
// groupBy — группировка Higher-Order Observableconst clicks$ = new Subject<{ userId: string; action: string }>();
clicks$.pipe( groupBy(click => click.userId), // создаёт Observable для каждого userId mergeMap(group$ => // обрабатываем каждую группу group$.pipe( toArray() // собираем все клики пользователя ) )).subscribe(userClicks => console.log('Клики пользователя:', userClicks));Практика
Заголовок раздела «Практика»Попробуйте примеры в интерактивном редакторе: