15. RxJS в Angular
🌊 RxJS в Angular
Заголовок раздела «🌊 RxJS в Angular»RxJS — это библиотека реактивного программирования, которая встроена в Angular “из коробки”. Представь, что обычный JavaScript — это один кран с водой (синхронный код), а RxJS — это целая система водопровода: трубы, клапаны, фильтры, смесители 🪣. Данные текут как поток воды, и ты можешь управлять этим потоком.
🤔 Зачем Angular использует RxJS?
Заголовок раздела «🤔 Зачем Angular использует RxJS?»Angular использует RxJS везде:
- HttpClient — каждый HTTP-запрос возвращает
Observable - Router — события навигации (
router.events) — этоObservable - Reactive Forms —
valueChanges,statusChanges— этоObservable - EventEmitter — на самом деле расширяет
Subject
// HttpClient возвращает Observable, не Promisethis.http.get<User[]>('/api/users').subscribe(users => { this.users = users;});
// Router eventsthis.router.events.pipe( filter(e => e instanceof NavigationEnd)).subscribe(e => console.log('Навигация завершена:', e));
// Reactive Formsthis.searchForm.get('query')!.valueChanges.pipe( debounceTime(300)).subscribe(value => this.search(value));⚡ Observable vs Promise: главные отличия
Заголовок раздела «⚡ Observable vs Promise: главные отличия»Promise — это как курьер: один раз привёз посылку, и всё. Observable — это как подписка на газету: получаешь новые выпуски, пока не отпишешься 📰.
| Характеристика | Promise | Observable |
|---|---|---|
| Значений | Одно | Множество (поток) |
| Ленивость | Сразу выполняется | Только при подписке |
| Отмена | Нельзя | unsubscribe() |
| Операторы | .then(), .catch() | Десятки операторов RxJS |
// Promise — один результатfetch('/api/users') .then(res => res.json()) .then(users => console.log(users));
// Observable — поток значений, можно отменитьconst sub = interval(1000).subscribe(n => console.log(n));setTimeout(() => sub.unsubscribe(), 5000); // отменяем через 5 сек
// Observable ленивый — без subscribe ничего не происходит!const obs$ = new Observable(observer => { console.log('Этот код выполнится только при подписке'); observer.next(42); observer.complete();});// obs$; // <- ничего не произошлоobs$.subscribe(v => console.log(v)); // <- теперь выполняется🪄 async pipe: магия шаблонов
Заголовок раздела «🪄 async pipe: магия шаблонов»async pipe — это самый правильный способ работать с Observable в шаблонах Angular. Он автоматически подписывается И отписывается при уничтожении компонента. Забудь про утечки памяти! 🎉
@Component({ template: ` <!-- Простое использование --> <div>Счётчик: {{ counter$ | async }}</div>
<!-- С проверкой на null --> <div *ngIf="user$ | async as user"> Привет, {{ user.name }}! </div>
<!-- Несколько подписок с одним запросом (shareReplay!) --> <div *ngIf="data$ | async as data"> <h2>{{ data.title }}</h2> <p>{{ data.description }}</p> </div> `})export class MyComponent { counter$ = interval(1000); user$ = this.http.get<User>('/api/user');
// Без shareReplay — два отдельных HTTP запроса! data$ = this.http.get<Data>('/api/data').pipe( shareReplay(1) // кешируем последнее значение );}Паттерн as позволяет не дублировать подписку:
<!-- ❌ Плохо — два запроса к API! --><div>{{ data$ | async | json }}</div><div>{{ (data$ | async)?.title }}</div>
<!-- ✅ Хорошо — один запрос, результат в переменной --><ng-container *ngIf="data$ | async as data"> <div>{{ data | json }}</div> <div>{{ data.title }}</div></ng-container>🧹 takeUntilDestroyed: Angular 16+
Заголовок раздела «🧹 takeUntilDestroyed: Angular 16+»До Angular 16 приходилось вручную управлять отпиской через Subject и takeUntil. Теперь есть элегантное решение:
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
@Component({...})export class MyComponent { // DestroyRef внедряется автоматически из контекста компонента constructor() { interval(1000).pipe( takeUntilDestroyed() // автоматически отписывается при destroy ).subscribe(n => console.log(n)); }
// Или явно через inject private destroyRef = inject(DestroyRef);
ngOnInit() { this.someService.data$.pipe( takeUntilDestroyed(this.destroyRef) ).subscribe(data => this.data = data); }}
// Старый способ (Angular < 16)export class OldComponent implements OnInit, OnDestroy { private destroy$ = new Subject<void>();
ngOnInit() { interval(1000).pipe( takeUntil(this.destroy$) ).subscribe(n => console.log(n)); }
ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); }}📡 Типы Subject
Заголовок раздела «📡 Типы Subject»Subject — это одновременно Observable (можно подписаться) и Observer (можно отправлять значения). Как радиостанция: вещает всем подписчикам одновременно 📻.
// 1️⃣ Subject — базовый multicastconst subject = new Subject<number>();subject.subscribe(v => console.log('A:', v));subject.subscribe(v => console.log('B:', v));subject.next(1); // A: 1, B: 1subject.next(2); // A: 2, B: 2// ⚠️ Новый подписчик НЕ получает прошлые значения
// 2️⃣ BehaviorSubject — хранит текущее значение (идеален для стора!)const bs = new BehaviorSubject<number>(0); // начальное значениеbs.subscribe(v => console.log('C:', v)); // C: 0 (сразу!)bs.next(1); // C: 1bs.next(2); // C: 2const newSub = bs.subscribe(v => console.log('D:', v)); // D: 2 (получает текущее!)console.log(bs.getValue()); // 2 — синхронное получение
// 3️⃣ ReplaySubject — воспроизводит N последних значенийconst rs = new ReplaySubject<number>(3); // буфер из 3 значенийrs.next(1);rs.next(2);rs.next(3);rs.next(4);rs.subscribe(v => console.log('E:', v)); // E: 2, E: 3, E: 4 (последние 3)
// 4️⃣ AsyncSubject — только последнее значение при complete()const as = new AsyncSubject<number>();as.subscribe(v => console.log('F:', v));as.next(1); // ничегоas.next(2); // ничегоas.next(3); // ничегоas.complete(); // F: 3 (только последнее!)🔧 Популярные операторы RxJS в Angular
Заголовок раздела «🔧 Популярные операторы RxJS в Angular»import { map, filter, tap, switchMap, mergeMap, concatMap, exhaustMap, debounceTime, distinctUntilChanged, combineLatest, forkJoin, startWith, catchError } from 'rxjs/operators';
// map — трансформация данныхusers$.pipe( map(users => users.filter(u => u.active)))
// tap — side effects без изменения потока (логирование)http.get('/api/data').pipe( tap(data => console.log('Получили:', data)), map(data => data.items))
// switchMap — отменяет предыдущий запрос (для поиска!)searchInput$.pipe( debounceTime(300), switchMap(query => http.get('/api/search?q=' + query)))
// mergeMap — параллельные запросы (все выполняются)ids$.pipe(mergeMap(id => http.get('/api/users/' + id)))
// concatMap — последовательные запросы (по очереди)ids$.pipe(concatMap(id => http.get('/api/users/' + id)))
// exhaustMap — игнорирует новые пока текущий не завершён (для submit)submitBtn$.pipe(exhaustMap(() => http.post('/api/submit', data)))
// combineLatest — объединяет несколько потоков (каждый раз при обновлении любого)combineLatest([users$, roles$]).pipe( map(([users, roles]) => users.map(u => ({ ...u, role: roles[u.id] }))))
// forkJoin — параллельные запросы, ждёт завершения ВСЕХforkJoin({ users: http.get('/api/users'), products: http.get('/api/products'), config: http.get('/api/config'),}).subscribe(({ users, products, config }) => { // Все три ответа получены})
// startWith — начальное значение (для loading state)data$ = http.get('/api/data').pipe( startWith(null) // сначала null, потом данные)🏪 BehaviorSubject как простой стор состояния
Заголовок раздела «🏪 BehaviorSubject как простой стор состояния»@Injectable({ providedIn: 'root' })export class UserStore { private state = new BehaviorSubject<UserState>({ users: [], loading: false, error: null });
// Публичные Observable для чтения (readonly!) users$ = this.state.pipe(map(s => s.users)); loading$ = this.state.pipe(map(s => s.loading)); error$ = this.state.pipe(map(s => s.error));
private setState(patch: Partial<UserState>) { this.state.next({ ...this.state.getValue(), ...patch }); }
loadUsers() { this.setState({ loading: true, error: null }); return this.http.get<User[]>('/api/users').pipe( tap(users => this.setState({ users, loading: false })), catchError(err => { this.setState({ loading: false, error: err.message }); return EMPTY; }) ); }}
// В компоненте — только читаем через async pipe@Component({ template: ` <div *ngIf="store.loading$ | async">Загрузка...</div> <div *ngIf="store.error$ | async as err" class="error">{{ err }}</div> <ul> <li *ngFor="let user of store.users$ | async">{{ user.name }}</li> </ul> `})export class UsersComponent { constructor(public store: UserStore) { store.loadUsers().subscribe(); }}🔍 Поиск с debounce: полный пример
Заголовок раздела «🔍 Поиск с debounce: полный пример»@Component({ template: ` <input [formControl]="searchCtrl" placeholder="Поиск..."> <div *ngIf="loading$ | async">🔄 Ищем...</div> <ul> <li *ngFor="let result of results$ | async">{{ result.name }}</li> </ul> `})export class SearchComponent { searchCtrl = new FormControl(''); loading$ = new BehaviorSubject(false);
results$ = this.searchCtrl.valueChanges.pipe( debounceTime(300), // ждём 300ms после последнего символа distinctUntilChanged(), // игнорируем одинаковые запросы filter(q => q!.length >= 2), // минимум 2 символа tap(() => this.loading$.next(true)), switchMap(query => // отменяем предыдущий запрос! this.http.get<Result[]>('/api/search', { params: { q: query! } }).pipe( catchError(() => of([])) // при ошибке возвращаем пустой массив ) ), tap(() => this.loading$.next(false)), startWith([]) // начальное значение );}💀 Состояние загрузки и ошибки
Заголовок раздела «💀 Состояние загрузки и ошибки»// Паттерн: data / loading / error через один Observableinterface LoadingState<T> { data: T | null; loading: boolean; error: string | null;}
data$ = this.http.get<Product[]>('/api/products').pipe( map(data => ({ data, loading: false, error: null })), startWith({ data: null, loading: true, error: null }), catchError(err => of({ data: null, loading: false, error: err.message })));
// В шаблоне// <ng-container *ngIf="data$ | async as state">// <div *ngIf="state.loading">Загрузка...</div>// <div *ngIf="state.error" class="error">{{ state.error }}</div>// <ul *ngIf="state.data">// <li *ngFor="let item of state.data">{{ item.name }}</li>// </ul>// </ng-container>Практика
Заголовок раздела «Практика»Попробуйте концепцию в интерактивном редакторе: