Перейти к содержимому

6. merge, concat, zip

Яша, ты уже умеешь работать с одним потоком — отлично! 🎉 Но в реальных приложениях данные приходят из разных источников одновременно. Сегодня учимся их объединять!


Представь: ты пишешь дашборд. Данные о пользователях приходят с одного API, уведомления — с другого, статус системы — с третьего. Как всё это объединить?

RxJS даёт нам 4 главных оператора комбинирования. У каждого — своя стратегия! 🎯


merge — это как несколько полос на шоссе. Автомобили из любой полосы едут вместе, и каждый попадает в результирующий поток в том порядке, в котором он прибыл.

Мраморная диаграмма:

Поток A: --A1-------A2-------A3-->
Поток B: ------B1-------B2------->
merge():
Результат: --A1---B1--A2--B2--A3-->
import { merge, interval, timer } from 'rxjs';
import { map, take } from 'rxjs/operators';
// Два потока с разными интервалами
const slow$ = interval(1000).pipe(
take(3),
map(i => `🐢 Медленный: ${i}`)
);
const fast$ = interval(400).pipe(
take(5),
map(i => `🐇 Быстрый: ${i}`)
);
// merge объединяет — значения появляются в порядке их прихода
merge(slow$, fast$).subscribe(value => {
console.log(value);
});
// 🐇 Быстрый: 0 (400ms)
// 🐇 Быстрый: 1 (800ms)
// 🐢 Медленный: 0 (1000ms)
// 🐇 Быстрый: 2 (1200ms)
// ...
// Реальный пример: объединение UI-событий
import { fromEvent } from 'rxjs';
const button1 = document.getElementById('btn1')!;
const button2 = document.getElementById('btn2')!;
// Слушаем клики на обоих кнопках в одном потоке
merge(
fromEvent(button1, 'click').pipe(map(() => 'Кнопка 1')),
fromEvent(button2, 'click').pipe(map(() => 'Кнопка 2')),
).subscribe(source => {
console.log('Нажата:', source);
});

💡 merge — когда важна реактивность: хочешь получать данные как только они появились, из любого источника.


concat — это как очередь в кассу. Пока первый человек не обслужен — второй не подходит. Потоки выполняются строго последовательно!

Мраморная диаграмма:

Поток A: --A1--A2--A3--|
Поток B: --B1--B2--|
concat():
Результат: --A1--A2--A3----B1--B2--|

⚠️ Важно: concat подписывается на следующий поток ТОЛЬКО после завершения предыдущего. Если первый поток бесконечный — второй никогда не начнётся!

import { concat, of, timer } from 'rxjs';
import { delay, map, tap } from 'rxjs/operators';
// Анимация: шаги выполняются строго по порядку
const step1$ = of('Шаг 1: Загружаем данные...').pipe(delay(1000));
const step2$ = of('Шаг 2: Обрабатываем...').pipe(delay(800));
const step3$ = of('Шаг 3: Сохраняем...').pipe(delay(600));
const done$ = of('✅ Готово!');
concat(step1$, step2$, step3$, done$).subscribe(status => {
console.log(status);
});
// (через 1с) "Шаг 1: Загружаем данные..."
// (через 1.8с) "Шаг 2: Обрабатываем..."
// (через 2.4с) "Шаг 3: Сохраняем..."
// (через 2.4с) "✅ Готово!"
// Реальный пример: последовательная загрузка
import { from } from 'rxjs';
const loadUser$ = from(fetch('/api/user').then(r => r.json()));
const loadPosts$ = from(fetch('/api/posts').then(r => r.json()));
// Сначала загружаем пользователя, потом его посты
concat(loadUser$, loadPosts$).subscribe(data => {
console.log('Получено:', data);
});

zip работает как застёжка-молния: берёт по одному элементу из каждого потока и объединяет их в пару (или тройку). Следующая пара — только когда ВСЕ потоки выдали по элементу.

Мраморная диаграмма:

Поток A: --A1-------A2-------A3-->
Поток B: ------B1-------B2------->
zip():
Результат: ------[A1,B1]---[A2,B2]-->
import { zip, of, interval } from 'rxjs';
import { take, map } from 'rxjs/operators';
// zip синхронизирует потоки "по парам"
const names$ = of('Алиса', 'Боб', 'Чарли');
const scores$ = of(95, 87, 92);
zip(names$, scores$).pipe(
map(([name, score]) => `${name}: ${score} баллов`)
).subscribe(result => {
console.log(result);
});
// "Алиса: 95 баллов"
// "Боб: 87 баллов"
// "Чарли: 92 баллов"
// Реальный пример: загрузка нескольких ресурсов и их попарное объединение
const questions$ = from(fetch('/api/questions').then(r => r.json() as Promise<string[]>));
const answers$ = from(fetch('/api/answers').then(r => r.json() as Promise<string[]>));
zip(questions$, answers$).subscribe(([questions, answers]) => {
// оба запроса завершены, данные синхронизированы
const qa = questions.map((q, i) => ({ q, a: answers[i] }));
console.log(qa);
});

💡 zip редко используется для потоков с разными скоростями — медленный тормозит всех. Отлично подходит для синхронизации завершённых потоков.


race — это соревнование. Побеждает тот поток, который первым выдаст значение. Все остальные игнорируются!

Мраморная диаграмма:

Поток A: ----A1--A2--A3-->
Поток B: --B1--B2------->
race():
Результат: --B1--B2-------> (A проиграл)
import { race, timer } from 'rxjs';
import { map, mapTo } from 'rxjs/operators';
// Таймаут для запроса
import { from } from 'rxjs';
const apiRequest$ = from(fetch('/api/data').then(r => r.json()));
const timeout$ = timer(5000).pipe(
map(() => { throw new Error('Запрос превысил таймаут!'); })
);
// Кто быстрее — API или таймаут?
race(apiRequest$, timeout$).subscribe({
next: data => console.log('Данные:', data),
error: err => console.error(err.message),
});
// Другой пример: резервный сервер
const primaryServer$ = from(fetch('https://api.primary.com/data').then(r => r.json()));
const backupServer$ = from(fetch('https://api.backup.com/data').then(r => r.json()));
race(primaryServer$, backupServer$).subscribe(data => {
console.log('Ответил быстрейший сервер:', data);
});

ОператорАналогияКогда использовать
mergeНесколько полос шоссеОбъединить UI-события, параллельные данные
concatОчередь в кассуПоследовательные шаги, зависимые запросы
zipЗастёжка-молнияПопарное объединение синхронных данных
raceГонкиТаймауты, резервные серверы

Попробуйте примеры в интерактивном редакторе: