Перейти к содержимому

4. Операторы: введение

Привет, Яша! ⚡ Мы уже знаем что Observable — это поток данных. Но что делает RxJS по-настоящему мощным — это операторы. Их больше 100! 🤯 Сегодня разберём как они работают и какие бывают.


Оператор — это функция, которая принимает Observable и возвращает новый Observable с изменёнными данными.

Аналогия: конвейер на заводе 🏭

  • Observable = лента конвейера с деталями
  • Оператор = станок, который что-то делает с каждой деталью
  • Результат = новая лента с обработанными деталями
import { of } from 'rxjs';
import { map, filter, take } from 'rxjs/operators';
// Без операторов — просто поток чисел
of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.subscribe(n => console.log(n)); // 1, 2, 3, 4, ...
// С операторами — трансформированный поток
of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).pipe(
filter(n => n % 2 === 0), // только чётные: 2, 4, 6, 8, 10
map(n => n * n), // возводим в квадрат: 4, 16, 36, 64, 100
take(3) // берём первые 3: 4, 16, 36
).subscribe(n => console.log(n));
// 4, 16, 36

observable$.pipe(
operator1(),
operator2(),
operator3()
)

Каждый оператор получает Observable от предыдущего и возвращает новый.

import { interval } from 'rxjs';
import { map, filter, take, tap } from 'rxjs/operators';
interval(100).pipe(
take(20), // 0..19
filter(n => n % 2 === 0), // 0, 2, 4, 6, 8, 10, 12, 14, 16, 18
map(n => `Чётное: ${n}`), // 'Чётное: 0', 'Чётное: 2', ...
tap(s => console.log('Debug:', s)) // логируем без изменений
).subscribe(s => console.log('Результат:', s));

💡 tap() — “шпион” оператор. Не изменяет данные, только позволяет заглянуть в поток (для отладки).


Операторы не изменяют исходный Observable — они создают новый:

import { of } from 'rxjs';
import { map } from 'rxjs/operators';
const source$ = of(1, 2, 3);
// map создаёт НОВЫЙ Observable
const doubled$ = source$.pipe(map(n => n * 2));
const tripled$ = source$.pipe(map(n => n * 3));
source$.subscribe(n => console.log('Исходный:', n)); // 1, 2, 3
doubled$.subscribe(n => console.log('Удвоенный:', n)); // 2, 4, 6
tripled$.subscribe(n => console.log('Утроенный:', n)); // 3, 6, 9
// source$ не изменился! Иммутабельность 💪

Marble diagrams показывают как оператор изменяет поток:

Время: ──────────────────────────→
source: ──1──2──3──4──5──|
map(x => x*2):
output: ──2──4──6──8──10─|
filter(x => x % 2 === 0):
output: ─────2─────4─────|
take(3):
output: ──1──2──3|

Создают Observable из разных источников:

import { of, from, interval, timer, fromEvent, range } from 'rxjs';
of(1, 2, 3) // из значений
from([1, 2, 3]) // из массива/промиса
interval(1000) // таймер
timer(2000, 500) // задержанный таймер
fromEvent(btn, 'click') // из DOM событий
range(1, 10) // диапазон чисел (1, 2, 3, ..., 10)

Изменяют значения в потоке:

import { of } from 'rxjs';
import { map, switchMap, mergeMap, concatMap, scan, reduce } from 'rxjs/operators';
// map — трансформирует каждое значение
of(1, 2, 3).pipe(
map(n => n * 10)
); // 10, 20, 30
// scan — как reduce, но эмитит на каждом шаге (аккумулятор)
of(1, 2, 3, 4, 5).pipe(
scan((acc, curr) => acc + curr, 0)
); // 1, 3, 6, 10, 15 ← текущая сумма на каждом шаге
// reduce — только финальное значение
of(1, 2, 3, 4, 5).pipe(
reduce((acc, curr) => acc + curr, 0)
); // 15

Пропускают или блокируют значения:

import { filter, take, takeUntil, skip, debounceTime,
throttleTime, distinctUntilChanged, first, last } from 'rxjs/operators';
// filter — пропускает только подходящие
of(1,2,3,4,5).pipe(filter(n => n > 3)); // 4, 5
// take(n) — первые N значений
interval(100).pipe(take(5)); // 0, 1, 2, 3, 4
// skip(n) — пропустить первые N
of(1,2,3,4,5).pipe(skip(2)); // 3, 4, 5
// debounceTime — ждёт паузу (для поиска!)
fromEvent(input, 'input').pipe(debounceTime(300));
// distinctUntilChanged — пропускает повторяющиеся
of(1,1,2,2,3,1).pipe(distinctUntilChanged()); // 1, 2, 3, 1

Объединяют несколько Observable:

import { merge, combineLatest, forkJoin, zip, concat } from 'rxjs';
// merge — объединяет потоки (все эмиты идут вместе)
merge(
of('A1', 'A2'),
of('B1', 'B2')
); // A1, A2, B1, B2 (или вперемешку)
// combineLatest — эмитит когда ЛЮБОЙ источник обновляется
combineLatest([
userStore$, // последний пользователь
settingsStore$ // последние настройки
]); // [user, settings] при каждом изменении
// forkJoin — ждёт завершения ВСЕХ (как Promise.all)
forkJoin({
user: fetchUser(id),
posts: fetchPosts(id)
}); // { user, posts } — только когда оба завершатся
import { catchError, retry, throwError } from 'rxjs/operators';
import { EMPTY, of } from 'rxjs';
// catchError — перехватывает ошибки
apiRequest$.pipe(
catchError(err => {
console.log('Ошибка:', err);
return of([]); // возвращаем пустой массив вместо ошибки
})
);
// retry(3) — повторить 3 раза перед ошибкой
apiRequest$.pipe(
retry(3) // 3 повторных попытки
);

import { fromEvent } from 'rxjs';
import { map, debounceTime, distinctUntilChanged,
switchMap, catchError } from 'rxjs/operators';
import { from, of } from 'rxjs';
const searchInput = document.getElementById('search');
fromEvent(searchInput, 'input').pipe(
map(e => e.target.value), // 1. берём значение поля
debounceTime(300), // 2. ждём паузу 300мс
distinctUntilChanged(), // 3. если не изменилось — не запрашиваем
switchMap(query => // 4. отменяем предыдущий запрос
query.length > 2 // если меньше 3 символов — пустой результат
? from(searchApi(query))
: of([])
),
catchError(err => of([])) // 5. при ошибке — пустой массив
).subscribe(results => {
renderResults(results);
});
// Всего 8 строк! Без RxJS это было бы 40+ строк 😅

Попробуйте примеры в интерактивном редакторе: