Перейти к содержимому

2. Observers и Subscriptions

👁️ Observers и Subscriptions — Подписки и утечки памяти

Заголовок раздела «👁️ Observers и Subscriptions — Подписки и утечки памяти»

Привет, Яша! 🎉 В прошлом уроке мы разобрались что такое Observable. Теперь поговорим о том, кто получает данные и как правильно управлять подписками (иначе будут проблемы! 😱)


Observer — это объект с тремя колбэк-функциями:

const myObserver = {
next: (value) => { /* получаем следующее значение */ },
error: (err) => { /* получаем ошибку — поток завершается */ },
complete: () => { /* поток завершён без ошибок */ }
};
import { Observable } from 'rxjs';
const data$ = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.error(new Error('Что-то пошло не так!')); // ❌ Ошибка!
subscriber.next(3); // Это уже не придёт — после error поток мёртв
});
data$.subscribe({
next: v => console.log('📦 Значение:', v),
error: e => console.log('❌ Ошибка:', e.message),
complete: () => console.log('✅ Завершено!') // Не вызовется — была ошибка
});
// 📦 Значение: 1
// 📦 Значение: 2
// ❌ Ошибка: Что-то пошло не так!

Частичный Observer — не все колбэки обязательны!

Заголовок раздела «Частичный Observer — не все колбэки обязательны!»
import { of } from 'rxjs';
// Можно передать только next
of(1, 2, 3).subscribe(v => console.log(v)); // Просто функция!
// Или объект с нужными полями
of(1, 2, 3).subscribe({
next: v => console.log(v),
// error и complete — опциональны
});

Когда вызываешь .subscribe(), ты получаешь объект Subscription:

import { interval } from 'rxjs';
// subscribe() возвращает Subscription
const subscription = interval(1000).subscribe(n => console.log(n));
console.log(subscription.closed); // false — подписка активна
// Через 3 секунды отписываемся
setTimeout(() => {
subscription.unsubscribe(); // 🛑 Остановить поток!
console.log(subscription.closed); // true — подписка закрыта
}, 3000);

Можно объединять несколько подписок в одну для удобной очистки:

import { interval, timer } from 'rxjs';
const mainSub = new Subscription();
// Добавляем дочерние подписки
mainSub.add(
interval(500).subscribe(n => console.log('Быстрый:', n))
);
mainSub.add(
interval(1000).subscribe(n => console.log('Медленный:', n))
);
// Одним вызовом отменяем ВСЕ подписки!
setTimeout(() => mainSub.unsubscribe(), 3000);

Вот типичная ошибка в React-приложениях:

// ❌ ПЛОХО — утечка памяти!
function BadComponent() {
useEffect(() => {
interval(1000).subscribe(n => {
setCount(n);
// Компонент может размонтироваться, но подписка продолжает работать!
// React выдаст ошибку: "Can't perform a state update on unmounted component"
});
// Нет очистки!
}, []);
return <div>{count}</div>;
}
// ✅ ХОРОШО — правильная очистка
function GoodComponent() {
useEffect(() => {
const sub = interval(1000).subscribe(n => setCount(n));
// Возвращаем функцию очистки — React вызовет её при unmount
return () => sub.unsubscribe(); // 🧹 Чистота!
}, []);
return <div>{count}</div>;
}

Когда Observable завершается автоматически? ✅

Заголовок раздела «Когда Observable завершается автоматически? ✅»

Некоторые Observable завершаются сами — тогда утечки нет:

import { of, from, timer } from 'rxjs';
import { take } from 'rxjs/operators';
// of() — завершается после всех значений
of(1, 2, 3).subscribe({
next: v => console.log(v),
complete: () => console.log('Автоматически завершился!')
});
// from(promise) — завершается после resolve/reject
from(fetch('/api/data')).subscribe(...);
// take(N) — завершается после N значений
interval(1000).pipe(take(3)).subscribe(...); // 0, 1, 2 — стоп
// timer(N) — одно значение и complete
timer(1000).subscribe(v => console.log(v)); // Один раз и всё

🔑 Правило: если Observable бесконечный (interval, fromEvent, Subject), всегда сохраняй unsubscribe или используй операторы завершения (take, takeUntil, takeWhile).


import { interval, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
// Создаём "сигнальный" Subject
const destroy$ = new Subject<void>();
// Поток будет работать пока destroy$ не эмитит
interval(500).pipe(
takeUntil(destroy$) // остановись когда destroy$ выстрелит
).subscribe(n => console.log(n));
// Через 3 секунды — отправляем сигнал завершения
setTimeout(() => {
destroy$.next(); // посылаем сигнал
destroy$.complete(); // закрываем Subject
}, 3000);

В Angular это стандартный паттерн для компонентов!


subscribe() → [active] → unsubscribe() или complete()/error() → [closed]
↓ ↓
next() events нет больше событий
import { Subject } from 'rxjs';
const sub$ = new Subject<number>();
const subscription = sub$.subscribe({
next: v => console.log('Получили:', v),
error: e => console.log('Ошибка:', e),
complete: () => console.log('Завершено!')
});
console.log('Закрыта?', subscription.closed); // false
sub$.next(1); // Получили: 1
sub$.next(2); // Получили: 2
subscription.unsubscribe();
console.log('Закрыта?', subscription.closed); // true
sub$.next(3); // Тишина — подписка закрыта!

Попробуйте примеры в интерактивном редакторе: