Перейти к содержимому

7. mergeMap и switchMap

🔀 mergeMap и switchMap — укрощение вложенных потоков

Заголовок раздела «🔀 mergeMap и switchMap — укрощение вложенных потоков»

Яша, это один из самых важных уроков! 🚀 Если освоишь mergeMap и switchMap — ты поймёшь 80% реального кода на RxJS. Погнали!


Представь, что у тебя есть поток кликов. Каждый клик должен запускать HTTP-запрос. HTTP-запрос — это тоже Observable. Как объединить поток кликов с потоком ответов?

Наивное решение (и почему оно плохое):

import { fromEvent, from } from 'rxjs';
import { map } from 'rxjs/operators';
const click$ = fromEvent(document, 'click');
// ❌ ПЛОХО: получаем Observable<Observable<Response>>
const broken$ = click$.pipe(
map(() => from(fetch('/api/data')))
// Здесь тип: Observable<Observable<Response>>
// Нам нужно: Observable<Response>
);

Вот где приходят на помощь higher-order операторы! Они “разворачивают” внутренний Observable. 🎁


mergeMap (он же flatMap) подписывается на КАЖДЫЙ внутренний Observable и объединяет все их результаты. Ни один запрос не отменяется!

Мраморная диаграмма:

Клики: --C1----C2----C3-->
Каждый клик порождает запрос (---R-->):
C1: -----R1-->
C2: -----R2-->
C3: -----R3-->
mergeMap:
Результат: ------R1----R2----R3-->
import { fromEvent, from, of } from 'rxjs';
import { mergeMap, map, delay } from 'rxjs/operators';
// Симуляция API-запроса
const fetchUser = (id: number) =>
from(fetch(`/api/users/${id}`).then(r => r.json()));
// Поток ID пользователей
import { Subject } from 'rxjs';
const userId$ = new Subject<number>();
userId$.pipe(
mergeMap(id => fetchUser(id))
).subscribe(user => {
console.log('Получен пользователь:', user);
});
// Все три запроса летят параллельно!
userId$.next(1);
userId$.next(2);
userId$.next(3);

Идеальный случай для mergeMap — загрузка файлов:

import { from, Subject } from 'rxjs';
import { mergeMap, map, catchError } from 'rxjs/operators';
import { of } from 'rxjs';
interface FileUpload {
file: File;
uploadId: string;
}
const uploadQueue$ = new Subject<FileUpload>();
const uploadFile = (upload: FileUpload) =>
// Симулируем загрузку файла
new Observable<{ uploadId: string; url: string }>(observer => {
const formData = new FormData();
formData.append('file', upload.file);
fetch('/api/upload', { method: 'POST', body: formData })
.then(r => r.json())
.then(data => {
observer.next({ uploadId: upload.uploadId, url: data.url });
observer.complete();
})
.catch(err => observer.error(err));
});
// Все файлы загружаются ПАРАЛЛЕЛЬНО — mergeMap не ждёт!
uploadQueue$.pipe(
mergeMap(upload =>
uploadFile(upload).pipe(
catchError(err => of({ uploadId: upload.uploadId, url: null, error: err.message }))
)
)
).subscribe(result => {
console.log(`Файл ${result.uploadId}:`, result.url ?? result.error);
});
// Добавляем файлы в очередь — все начнут загружаться сразу
uploadQueue$.next({ file: bigFile1, uploadId: 'file-1' });
uploadQueue$.next({ file: bigFile2, uploadId: 'file-2' });
uploadQueue$.next({ file: bigFile3, uploadId: 'file-3' });

💡 Когда использовать mergeMap: когда каждая операция независима и её нельзя отменять. Загрузка файлов, отправка аналитики, независимые API-запросы.


switchMap — это как переключатель телевизора. Когда ты переключаешь канал — старый канал ОТМЕНЯЕТСЯ. Только последний запрос имеет значение!

Мраморная диаграмма:

Поиск: --"r"------"re"----"rea"-->
Каждый запрос:
"r": ----[r запрос]--⚡ ОТМЕНЁН
"re": ----[re запрос]--⚡ ОТМЕНЁН
"rea": ----[rea запрос]--✅ Результат-->
switchMap:
Результат: ----------------------------[результат для "rea"]-->
import { fromEvent } from 'rxjs';
import { switchMap, map, debounceTime, distinctUntilChanged } from 'rxjs/operators';
import { from } from 'rxjs';
// 🔍 Автодополнение — классический пример switchMap
const searchInput = document.getElementById('search') as HTMLInputElement;
fromEvent(searchInput, 'input').pipe(
map(e => (e.target as HTMLInputElement).value),
debounceTime(300), // ждём паузы в печатании
distinctUntilChanged(), // игнорируем одинаковые значения
switchMap(query => // switchMap ОТМЕНЯЕТ предыдущий запрос!
from(
fetch(`/api/search?q=${query}`).then(r => r.json())
)
)
).subscribe(results => {
renderResults(results);
});

Почему это важно? Гонка запросов:

// ❌ ПРОБЛЕМА без switchMap (с mergeMap):
// Запрос "r" летит 500ms
// Запрос "react" летит 100ms
// Ответ "react" приходит первым → рендерим результаты для "react"
// Потом приходит ответ "r" → рендерим УСТАРЕВШИЕ результаты!
// Это называется "race condition" (гонка запросов)
// ✅ РЕШЕНИЕ с switchMap:
// Запрос "r" летит 500ms
// Пользователь продолжает печатать → запрос "r" ОТМЕНЯЕТСЯ
// Запрос "react" летит 100ms → приходит ответ → рендерим
// Никакой гонки! 🎉

import { Subject } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';
import { from } from 'rxjs';
interface Route {
path: string;
params: Record<string, string>;
}
const navigation$ = new Subject<Route>();
navigation$.pipe(
tap(route => console.log('Навигация к:', route.path)),
switchMap(route =>
// При навигации на новую страницу — загрузка данных старой ОТМЕНЯЕТСЯ
from(loadPageData(route))
)
).subscribe(pageData => {
renderPage(pageData);
});
function loadPageData(route: Route) {
return fetch(`/api/pages${route.path}`, {
signal: AbortController.prototype, // добавляем abort signal для реальной отмены
}).then(r => r.json());
}

// 👍 mergeMap — независимые параллельные операции
const like$ = new Subject<number>(); // ID поста
like$.pipe(
mergeMap(postId =>
from(fetch(`/api/posts/${postId}/like`, { method: 'POST' }))
// Каждый лайк независим — не хотим отменять предыдущие!
)
).subscribe();
// 👍 switchMap — только последнее значение важно
const searchQuery$ = new Subject<string>();
searchQuery$.pipe(
switchMap(query =>
from(fetch(`/api/search?q=${query}`).then(r => r.json()))
// Предыдущие поиски устарели — отменяем!
)
).subscribe(results => renderResults(results));
mergeMapswitchMap
СтратегияВсе параллельноТолько последний
Отменяет ли?❌ Нет✅ Да
Используй дляЗагрузка файлов, лайкиПоиск, навигация

Попробуйте примеры в интерактивном редакторе: