Перейти к содержимому

14. Создание своих операторов

Привет! Яша снова здесь. Один из признаков настоящего мастерства в RxJS — умение создавать собственные операторы. Это позволяет инкапсулировать повторяющуюся логику, делать цепочки читаемыми и переиспользовать код между проектами.

Любой оператор RxJS — это просто функция, которая принимает Observable и возвращает Observable.

import { Observable } from 'rxjs';
// Простейший кастомный оператор — вручную
function double() {
return function<T extends number>(source: Observable<T>): Observable<number> {
return new Observable<number>(observer => {
const sub = source.subscribe({
next: value => observer.next(value * 2),
error: err => observer.error(err),
complete: () => observer.complete(),
});
return () => sub.unsubscribe(); // обязательно!
});
};
}
// Использование
of(1, 2, 3).pipe(double()).subscribe(console.log); // 2, 4, 6

Лучший подход — использовать существующие операторы внутри кастомного. pipe() делает это элегантно:

import { pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';
import { MonoTypeOperatorFunction, OperatorFunction } from 'rxjs';
// MonoTypeOperatorFunction<T> — вход и выход одного типа
function filterNil<T>(): MonoTypeOperatorFunction<T | null | undefined> {
return filter((value): value is T => value !== null && value !== undefined);
}
// OperatorFunction<T, R> — вход типа T, выход типа R
function parseJSON<T>(): OperatorFunction<string, T> {
return pipe(
map(str => JSON.parse(str) as T)
);
}
// Использование
of(1, null, 3, undefined, 5).pipe(
filterNil<number>()
).subscribe(console.log); // 1, 3, 5

import { MonoTypeOperatorFunction, OperatorFunction, UnaryFunction } from 'rxjs';
// MonoTypeOperatorFunction<T> — тип входа = тип выхода
// Пример: filter, tap, distinctUntilChanged
function myFilter<T>(pred: (v: T) => boolean): MonoTypeOperatorFunction<T> {
return filter(pred);
}
// OperatorFunction<T, R> — преобразует тип
// Пример: map, switchMap, mergeMap
function toString<T>(): OperatorFunction<T, string> {
return map(v => String(v));
}
// UnaryFunction<T, R> — для pipe() вне контекста Observable
// Пример: чистые трансформации данных
const double: UnaryFunction<number, number> = n => n * 2;

Стандартный distinctUntilChanged сравнивает ссылки. Для объектов нужно глубокое сравнение:

import { pipe } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
import { MonoTypeOperatorFunction } from 'rxjs';
function deepEqual(a: unknown, b: unknown): boolean {
return JSON.stringify(a) === JSON.stringify(b);
}
function distinctUntilChangedDeep<T>(): MonoTypeOperatorFunction<T> {
return distinctUntilChanged((prev, curr) => deepEqual(prev, curr));
}
// Без оператора — каждый рендер новый объект → всегда "новые" данные
user$.pipe(
distinctUntilChangedDeep()
).subscribe(user => heavyRender(user)); // только при реальном изменении

Дебаг-оператор, который логирует всё что проходит через него:

import { pipe } from 'rxjs';
import { tap } from 'rxjs/operators';
import { MonoTypeOperatorFunction } from 'rxjs';
function logOperator<T>(label: string): MonoTypeOperatorFunction<T> {
return pipe(
tap({
subscribe: () => console.log(\`[\${label}] 🔌 подписка\`),
next: v => console.log(\`[${label}] ▶ next:`, v),
error: e => console.error(\`[${label}] ❌ error:`, e),
complete: () => console.log(\`[\${label}] ✅ complete\`),
unsubscribe: () => console.log(\`[\${label}] 🔌 отписка\`),
finalize: () => console.log(\`[\${label}] 🏁 finalize\`),
})
);
}
// Использование — идеально для отладки
source$.pipe(
logOperator('source'),
map(transformData),
logOperator('после map'),
filter(isValid),
logOperator('после filter'),
).subscribe();

import { filter } from 'rxjs/operators';
import { OperatorFunction } from 'rxjs';
// Типизированный guard для null/undefined
function filterNil<T>(): OperatorFunction<T | null | undefined, T> {
return filter((v): v is T => v != null);
}
// Теперь TypeScript знает что после filterNil() значение не может быть null
const names$ = of('Яша', null, 'Петя', undefined, 'Маша').pipe(
filterNil<string>()
);
// Тип: Observable<string> (не Observable<string | null | undefined>)

Кастомный rate-limiter: пропускает не более N значений в указанный промежуток времени.

import { pipe, Subject, timer } from 'rxjs';
import { mergeMap, take, bufferTime, concatMap, from } from 'rxjs/operators';
import { OperatorFunction } from 'rxjs';
function rateLimit<T>(count: number, windowMs: number): OperatorFunction<T, T> {
return pipe(
bufferTime(windowMs), // собираем все значения за промежуток
concatMap(batch => from(batch.slice(0, count))), // берём не более N
);
}
// Не более 3 значений каждые 2 секунды
interval(200).pipe(
rateLimit(3, 2000)
).subscribe(v => console.log('rate-limited:', v));
// 0, 1, 2 (пауза 2с) 3, 4, 5 (пауза 2с) ...

Операторы могут принимать сложные конфигурации и быть параметризованными:

interface RetryConfig {
maxAttempts: number;
delay: number;
onRetry?: (attempt: number, error: unknown) => void;
}
function retryWithDelay<T>(config: RetryConfig): MonoTypeOperatorFunction<T> {
const { maxAttempts, delay: delayMs, onRetry } = config;
return retryWhen(errors$ =>
errors$.pipe(
scan((attempt, error) => {
if (attempt >= maxAttempts) throw error;
onRetry?.(attempt, error);
return attempt + 1;
}, 0),
delayWhen(attempt => timer(attempt * delayMs))
)
);
}
// Элегантное использование
apiRequest$.pipe(
retryWithDelay({
maxAttempts: 3,
delay: 1000,
onRetry: (attempt, err) => analytics.track('retry', { attempt }),
})
).subscribe(handleResponse);

Попробуйте примеры в интерактивном редакторе: