Перейти к содержимому

15. RxJS в Angular

RxJS — это библиотека реактивного программирования, которая встроена в Angular “из коробки”. Представь, что обычный JavaScript — это один кран с водой (синхронный код), а RxJS — это целая система водопровода: трубы, клапаны, фильтры, смесители 🪣. Данные текут как поток воды, и ты можешь управлять этим потоком.


Angular использует RxJS везде:

  • HttpClient — каждый HTTP-запрос возвращает Observable
  • Router — события навигации (router.events) — это Observable
  • Reactive FormsvalueChanges, statusChanges — это Observable
  • EventEmitter — на самом деле расширяет Subject
// HttpClient возвращает Observable, не Promise
this.http.get<User[]>('/api/users').subscribe(users => {
this.users = users;
});
// Router events
this.router.events.pipe(
filter(e => e instanceof NavigationEnd)
).subscribe(e => console.log('Навигация завершена:', e));
// Reactive Forms
this.searchForm.get('query')!.valueChanges.pipe(
debounceTime(300)
).subscribe(value => this.search(value));

Promise — это как курьер: один раз привёз посылку, и всё. Observable — это как подписка на газету: получаешь новые выпуски, пока не отпишешься 📰.

ХарактеристикаPromiseObservable
ЗначенийОдноМножество (поток)
ЛенивостьСразу выполняетсяТолько при подписке
ОтменаНельзяunsubscribe()
Операторы.then(), .catch()Десятки операторов RxJS
// Promise — один результат
fetch('/api/users')
.then(res => res.json())
.then(users => console.log(users));
// Observable — поток значений, можно отменить
const sub = interval(1000).subscribe(n => console.log(n));
setTimeout(() => sub.unsubscribe(), 5000); // отменяем через 5 сек
// Observable ленивый — без subscribe ничего не происходит!
const obs$ = new Observable(observer => {
console.log('Этот код выполнится только при подписке');
observer.next(42);
observer.complete();
});
// obs$; // <- ничего не произошло
obs$.subscribe(v => console.log(v)); // <- теперь выполняется

async pipe — это самый правильный способ работать с Observable в шаблонах Angular. Он автоматически подписывается И отписывается при уничтожении компонента. Забудь про утечки памяти! 🎉

component.ts
@Component({
template: `
<!-- Простое использование -->
<div>Счётчик: {{ counter$ | async }}</div>
<!-- С проверкой на null -->
<div *ngIf="user$ | async as user">
Привет, {{ user.name }}!
</div>
<!-- Несколько подписок с одним запросом (shareReplay!) -->
<div *ngIf="data$ | async as data">
<h2>{{ data.title }}</h2>
<p>{{ data.description }}</p>
</div>
`
})
export class MyComponent {
counter$ = interval(1000);
user$ = this.http.get<User>('/api/user');
// Без shareReplay — два отдельных HTTP запроса!
data$ = this.http.get<Data>('/api/data').pipe(
shareReplay(1) // кешируем последнее значение
);
}

Паттерн as позволяет не дублировать подписку:

<!-- ❌ Плохо — два запроса к API! -->
<div>{{ data$ | async | json }}</div>
<div>{{ (data$ | async)?.title }}</div>
<!-- ✅ Хорошо — один запрос, результат в переменной -->
<ng-container *ngIf="data$ | async as data">
<div>{{ data | json }}</div>
<div>{{ data.title }}</div>
</ng-container>

До Angular 16 приходилось вручную управлять отпиской через Subject и takeUntil. Теперь есть элегантное решение:

import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
@Component({...})
export class MyComponent {
// DestroyRef внедряется автоматически из контекста компонента
constructor() {
interval(1000).pipe(
takeUntilDestroyed() // автоматически отписывается при destroy
).subscribe(n => console.log(n));
}
// Или явно через inject
private destroyRef = inject(DestroyRef);
ngOnInit() {
this.someService.data$.pipe(
takeUntilDestroyed(this.destroyRef)
).subscribe(data => this.data = data);
}
}
// Старый способ (Angular < 16)
export class OldComponent implements OnInit, OnDestroy {
private destroy$ = new Subject<void>();
ngOnInit() {
interval(1000).pipe(
takeUntil(this.destroy$)
).subscribe(n => console.log(n));
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}

Subject — это одновременно Observable (можно подписаться) и Observer (можно отправлять значения). Как радиостанция: вещает всем подписчикам одновременно 📻.

// 1️⃣ Subject — базовый multicast
const subject = new Subject<number>();
subject.subscribe(v => console.log('A:', v));
subject.subscribe(v => console.log('B:', v));
subject.next(1); // A: 1, B: 1
subject.next(2); // A: 2, B: 2
// ⚠️ Новый подписчик НЕ получает прошлые значения
// 2️⃣ BehaviorSubject — хранит текущее значение (идеален для стора!)
const bs = new BehaviorSubject<number>(0); // начальное значение
bs.subscribe(v => console.log('C:', v)); // C: 0 (сразу!)
bs.next(1); // C: 1
bs.next(2); // C: 2
const newSub = bs.subscribe(v => console.log('D:', v)); // D: 2 (получает текущее!)
console.log(bs.getValue()); // 2 — синхронное получение
// 3️⃣ ReplaySubject — воспроизводит N последних значений
const rs = new ReplaySubject<number>(3); // буфер из 3 значений
rs.next(1);
rs.next(2);
rs.next(3);
rs.next(4);
rs.subscribe(v => console.log('E:', v)); // E: 2, E: 3, E: 4 (последние 3)
// 4️⃣ AsyncSubject — только последнее значение при complete()
const as = new AsyncSubject<number>();
as.subscribe(v => console.log('F:', v));
as.next(1); // ничего
as.next(2); // ничего
as.next(3); // ничего
as.complete(); // F: 3 (только последнее!)

import { map, filter, tap, switchMap, mergeMap, concatMap,
exhaustMap, debounceTime, distinctUntilChanged,
combineLatest, forkJoin, startWith, catchError } from 'rxjs/operators';
// map — трансформация данных
users$.pipe(
map(users => users.filter(u => u.active))
)
// tap — side effects без изменения потока (логирование)
http.get('/api/data').pipe(
tap(data => console.log('Получили:', data)),
map(data => data.items)
)
// switchMap — отменяет предыдущий запрос (для поиска!)
searchInput$.pipe(
debounceTime(300),
switchMap(query => http.get('/api/search?q=' + query))
)
// mergeMap — параллельные запросы (все выполняются)
ids$.pipe(mergeMap(id => http.get('/api/users/' + id)))
// concatMap — последовательные запросы (по очереди)
ids$.pipe(concatMap(id => http.get('/api/users/' + id)))
// exhaustMap — игнорирует новые пока текущий не завершён (для submit)
submitBtn$.pipe(exhaustMap(() => http.post('/api/submit', data)))
// combineLatest — объединяет несколько потоков (каждый раз при обновлении любого)
combineLatest([users$, roles$]).pipe(
map(([users, roles]) => users.map(u => ({ ...u, role: roles[u.id] })))
)
// forkJoin — параллельные запросы, ждёт завершения ВСЕХ
forkJoin({
users: http.get('/api/users'),
products: http.get('/api/products'),
config: http.get('/api/config'),
}).subscribe(({ users, products, config }) => {
// Все три ответа получены
})
// startWith — начальное значение (для loading state)
data$ = http.get('/api/data').pipe(
startWith(null) // сначала null, потом данные
)

user.service.ts
@Injectable({ providedIn: 'root' })
export class UserStore {
private state = new BehaviorSubject<UserState>({
users: [],
loading: false,
error: null
});
// Публичные Observable для чтения (readonly!)
users$ = this.state.pipe(map(s => s.users));
loading$ = this.state.pipe(map(s => s.loading));
error$ = this.state.pipe(map(s => s.error));
private setState(patch: Partial<UserState>) {
this.state.next({ ...this.state.getValue(), ...patch });
}
loadUsers() {
this.setState({ loading: true, error: null });
return this.http.get<User[]>('/api/users').pipe(
tap(users => this.setState({ users, loading: false })),
catchError(err => {
this.setState({ loading: false, error: err.message });
return EMPTY;
})
);
}
}
// В компоненте — только читаем через async pipe
@Component({
template: `
<div *ngIf="store.loading$ | async">Загрузка...</div>
<div *ngIf="store.error$ | async as err" class="error">{{ err }}</div>
<ul>
<li *ngFor="let user of store.users$ | async">{{ user.name }}</li>
</ul>
`
})
export class UsersComponent {
constructor(public store: UserStore) {
store.loadUsers().subscribe();
}
}

@Component({
template: `
<input [formControl]="searchCtrl" placeholder="Поиск...">
<div *ngIf="loading$ | async">🔄 Ищем...</div>
<ul>
<li *ngFor="let result of results$ | async">{{ result.name }}</li>
</ul>
`
})
export class SearchComponent {
searchCtrl = new FormControl('');
loading$ = new BehaviorSubject(false);
results$ = this.searchCtrl.valueChanges.pipe(
debounceTime(300), // ждём 300ms после последнего символа
distinctUntilChanged(), // игнорируем одинаковые запросы
filter(q => q!.length >= 2), // минимум 2 символа
tap(() => this.loading$.next(true)),
switchMap(query => // отменяем предыдущий запрос!
this.http.get<Result[]>('/api/search', { params: { q: query! } }).pipe(
catchError(() => of([])) // при ошибке возвращаем пустой массив
)
),
tap(() => this.loading$.next(false)),
startWith([]) // начальное значение
);
}

// Паттерн: data / loading / error через один Observable
interface LoadingState<T> {
data: T | null;
loading: boolean;
error: string | null;
}
data$ = this.http.get<Product[]>('/api/products').pipe(
map(data => ({ data, loading: false, error: null })),
startWith({ data: null, loading: true, error: null }),
catchError(err => of({ data: null, loading: false, error: err.message }))
);
// В шаблоне
// <ng-container *ngIf="data$ | async as state">
// <div *ngIf="state.loading">Загрузка...</div>
// <div *ngIf="state.error" class="error">{{ state.error }}</div>
// <ul *ngIf="state.data">
// <li *ngFor="let item of state.data">{{ item.name }}</li>
// </ul>
// </ng-container>

Попробуйте концепцию в интерактивном редакторе: