Перейти к содержимому

16. RxJS + React

Привет, кодер! Яша здесь. Ты уже умеешь создавать Observable-потоки, применять операторы и управлять подписками. Настало время соединить RxJS с React — и это дуэт, который реально меняет игру. Реакт отвечает за UI, RxJS — за потоки данных. Идеальное разделение ответственности!


fromEvent превращает любое DOM-событие в Observable. Это один из самых частых паттернов RxJS + React.

import { fromEvent, Observable } from 'rxjs';
import { map, distinctUntilChanged, debounceTime } from 'rxjs/operators';
// Поток изменения размера окна
const resize$ = fromEvent(window, 'resize').pipe(
map(() => ({ width: window.innerWidth, height: window.innerHeight })),
distinctUntilChanged(
(a, b) => a.width === b.width && a.height === b.height
),
debounceTime(200) // не спамим обновлениями
);
// Поток движения мыши
const mousemove$ = fromEvent<MouseEvent>(document, 'mousemove').pipe(
map(e => ({ x: e.clientX, y: e.clientY })),
debounceTime(16) // ~60fps
);
// Поток нажатий клавиш — например, Ctrl+S
const save$ = fromEvent<KeyboardEvent>(document, 'keydown').pipe(
filter(e => e.ctrlKey && e.key === 's')
);

Главное правило: всегда отписывайся при размонтировании компонента. Иначе получишь утечку памяти и ошибки обновления размонтированного компонента.


Вот канонический способ подключить Observable к React-компоненту:

import React, { useState, useEffect } from 'react';
import { fromEvent } from 'rxjs';
import { map, debounceTime, distinctUntilChanged } from 'rxjs/operators';
interface WindowSize {
width: number;
height: number;
}
function useWindowSize(): WindowSize {
const [size, setSize] = useState<WindowSize>({
width: window.innerWidth,
height: window.innerHeight,
});
useEffect(() => {
const resize$ = fromEvent(window, 'resize').pipe(
map(() => ({
width: window.innerWidth,
height: window.innerHeight,
})),
debounceTime(200),
distinctUntilChanged((a, b) => a.width === b.width && a.height === b.height)
);
// Создаём подписку
const subscription = resize$.subscribe(setSize);
// Функция очистки — вызывается при размонтировании!
return () => subscription.unsubscribe();
}, []); // пустой массив зависимостей = один раз при монтировании
return size;
}
function WindowSizeDisplay() {
const { width, height } = useWindowSize();
return <div>Окно: {width} × {height}</div>;
}

Структура useEffect с RxJS всегда одинакова:

  1. Создать Observable внутри эффекта
  2. Подписаться, сохранить subscription
  3. Вернуть () => subscription.unsubscribe()

Subject — это шина событий. Отлично работает для передачи данных между несвязанными компонентами:

import { Subject, BehaviorSubject } from 'rxjs';
import { filter } from 'rxjs/operators';
// Тип события
interface AppEvent {
type: string;
payload?: unknown;
}
// Создаём шину — один раз, вне компонентов
const eventBus$ = new Subject<AppEvent>();
// Отправитель: компонент-кнопка в глубине дерева
function DeepButton() {
const handleClick = () => {
eventBus$.next({ type: 'USER_ACTION', payload: { id: 42 } });
};
return <button onClick={handleClick}>Сделать что-то</button>;
}
// Получатель: компонент где-то в другом месте дерева
function StatusPanel() {
const [lastAction, setLastAction] = useState<string | null>(null);
useEffect(() => {
const sub = eventBus$.pipe(
filter(e => e.type === 'USER_ACTION')
).subscribe(e => {
setLastAction(`Действие с id: ${(e.payload as any).id}`);
});
return () => sub.unsubscribe();
}, []);
return <div>Последнее: {lastAction ?? 'нет'}</div>;
}

Это работает без Redux, Context API и пропов. Просто Subject как глобальная шина.


Реальный пример: симуляция WebSocket-потока данных в React-компоненте:

import { Observable, Subject, interval } from 'rxjs';
import { map, takeUntil, share } from 'rxjs/operators';
interface StockPrice {
symbol: string;
price: number;
change: number;
}
// Фабрика "WebSocket"-потока (симуляция)
function createStockStream(symbol: string): Observable<StockPrice> {
return interval(1000).pipe(
map(() => ({
symbol,
price: 100 + Math.random() * 50,
change: (Math.random() - 0.5) * 5,
})),
share() // многоразовый поток — все подписчики получают одно значение
);
}
function StockTicker({ symbol }: { symbol: string }) {
const [data, setData] = useState<StockPrice | null>(null);
useEffect(() => {
const stock$ = createStockStream(symbol);
const sub = stock$.subscribe(setData);
return () => sub.unsubscribe();
}, [symbol]); // перезапускаем если symbol меняется
if (!data) return <div>Загрузка...</div>;
const color = data.change >= 0 ? 'green' : 'red';
return (
<div>
<b>{data.symbol}</b>: ${data.price.toFixed(2)}
<span style={{ color }}> {data.change >= 0 ? '+' : ''}{data.change.toFixed(2)}</span>
</div>
);
}

Ошибка 1: Забыть отписаться

// ❌ УТЕЧКА ПАМЯТИ!
useEffect(() => {
interval(1000).subscribe(tick => console.log(tick));
}, []);
// ✅ Правильно
useEffect(() => {
const sub = interval(1000).subscribe(tick => console.log(tick));
return () => sub.unsubscribe();
}, []);

Ошибка 2: Создавать Subject внутри компонента

// ❌ Новый Subject при каждом рендере!
function BadComponent() {
const subject$ = new Subject(); // пересоздаётся!
// ...
}
// ✅ useRef или вынести за пределы компонента
function GoodComponent() {
const subject$ = useRef(new Subject()).current;
// ...
}

Ошибка 3: Не учитывать изменение зависимостей

// ❌ Подписка не обновляется при смене userId
useEffect(() => {
userStream(userId).subscribe(setUser);
return () => sub.unsubscribe();
}, []); // забыли userId!
// ✅
useEffect(() => {
const sub = userStream(userId).subscribe(setUser);
return () => sub.unsubscribe();
}, [userId]); // правильно!


Попробуйте примеры в интерактивном редакторе: