Перейти к содержимому

20. RxJS + MobX

Яша здесь. Оба инструмента — реактивные. Оба отслеживают изменения и уведомляют зависимости. Но они разные по природе: MobX — это реактивная объектная модель (observables как свойства объектов), RxJS — это реактивные потоки (observable как цепочки событий). Разберём разницу и научимся их комбинировать.


MobX превращает обычные JavaScript-объекты в реактивные. Ты меняешь свойство — всё что от него зависит пересчитывается автоматически:

// ── MobX (концептуально) ──────────────────────────────────────
import { makeObservable, observable, computed, action } from 'mobx';
class CartStore {
items: CartItem[] = []; // observable — MobX отслеживает
constructor() {
makeObservable(this, {
items: observable,
total: computed,
addItem: action,
});
}
get total() { // computed — пересчитывается автоматически
return this.items.reduce((s, i) => s + i.price * i.qty, 0);
}
addItem(item: CartItem) { // action — единственный способ мутировать
this.items.push(item);
}
}
// ── RxJS (тот же результат) ────────────────────────────────────
import { BehaviorSubject } from 'rxjs';
import { map, distinctUntilChanged, shareReplay } from 'rxjs/operators';
class CartStore {
private state$ = new BehaviorSubject<CartItem[]>([]);
// "computed" — это просто pipe()
readonly total$ = this.state$.pipe(
map(items => items.reduce((s, i) => s + i.price * i.qty, 0)),
distinctUntilChanged(),
shareReplay(1) // кэш последнего значения
);
// "action" — это next()
addItem(item: CartItem) {
this.state$.next([...this.state$.getValue(), item]);
}
// Доступ к Observable для подписок
get items$() {
return this.state$.asObservable();
}
}

Ключевая разница: MobX мутирует, RxJS заменяет. MobX работает с объектами, RxJS — с потоками.


Можно подключить MobX-стор к RxJS-потокам. Это полезно, когда нужны сложные операторы над MobX-данными:

import { from } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
import { reaction, autorun } from 'mobx';
import { observable, makeAutoObservable } from 'mobx';
class SearchStore {
query = '';
results: string[] = [];
constructor() {
makeAutoObservable(this);
}
}
const searchStore = new SearchStore();
// Конвертируем MobX observable в RxJS Observable
function fromMobX<T>(fn: () => T): Observable<T> {
return new Observable<T>(subscriber => {
// reaction запускается при каждом изменении
const disposer = reaction(
fn,
value => subscriber.next(value),
{ fireImmediately: true }
);
// Функция очистки — отключаем MobX reaction
return () => disposer();
});
}
// Теперь можем использовать все RxJS операторы!
const query$ = fromMobX(() => searchStore.query);
query$.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query =>
from(fetch(\`/api/search?q=\${query}\`).then(r => r.json()))
)
).subscribe(results => {
searchStore.results = results; // записываем обратно в MobX
});

Паттерн: RxJS обрабатывает сложную логику, MobX хранит состояние и предоставляет реактивность для UI:

import { Subject, interval, merge } from 'rxjs';
import { map, scan, bufferTime, filter } from 'rxjs/operators';
import { makeAutoObservable, action } from 'mobx';
interface LogEntry {
timestamp: number;
level: 'info' | 'warn' | 'error';
message: string;
}
class LogStore {
entries: LogEntry[] = [];
stats = { info: 0, warn: 0, error: 0 };
constructor() {
makeAutoObservable(this);
}
// action — единственный способ мутировать в MobX
addEntries = action((newEntries: LogEntry[]) => {
this.entries.push(...newEntries);
// Обновляем статистику
newEntries.forEach(e => this.stats[e.level]++);
// Ограничиваем размер лога
if (this.entries.length > 1000) {
this.entries = this.entries.slice(-500);
}
});
get errorEntries() {
return this.entries.filter(e => e.level === 'error');
}
}
const logStore = new LogStore();
// RxJS обрабатывает поток логов: буферизует, батчит → передаёт в MobX
const rawLog$ = new Subject<LogEntry>();
rawLog$.pipe(
bufferTime(500), // собираем записи каждые 500ms
filter(batch => batch.length > 0), // пропускаем пустые батчи
).subscribe(batch => logStore.addEntries(batch));
// Использование: просто пушим события в Subject
rawLog$.next({ timestamp: Date.now(), level: 'info', message: 'Пользователь вошёл' });
rawLog$.next({ timestamp: Date.now(), level: 'error', message: 'Ошибка API' });

Паттерн MobX computed можно воспроизвести через pipe(map(...), distinctUntilChanged(), shareReplay(1)):

import { BehaviorSubject, combineLatest } from 'rxjs';
import { map, distinctUntilChanged, shareReplay } from 'rxjs/operators';
interface AppState {
products: Product[];
cartIds: number[];
discount: number;
}
const state$ = new BehaviorSubject<AppState>({
products: [],
cartIds: [],
discount: 0,
});
// Derived observables — аналог MobX @computed
const cartItems$ = state$.pipe(
map(s => s.products.filter(p => s.cartIds.includes(p.id))),
distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)),
shareReplay(1) // MobX-like кэширование последнего значения
);
const subtotal$ = cartItems$.pipe(
map(items => items.reduce((s, i) => s + i.price, 0)),
distinctUntilChanged(),
shareReplay(1)
);
// combineLatest — аналог MobX reaction на несколько сторов
const finalPrice$ = combineLatest([subtotal$, state$.pipe(map(s => s.discount))]).pipe(
map(([subtotal, discount]) => subtotal * (1 - discount / 100)),
distinctUntilChanged(),
shareReplay(1)
);
// Автоматически обновляется при любом изменении зависимостей
finalPrice$.subscribe(price => console.log('Итого:', price));

Ошибка 1: Подписываться на MobX внутри RxJS без очистки

// ❌ MobX reaction не удаляется
fromMobX(() => store.count).subscribe(console.log);
// ✅ Храни disposer и отписывайся
const sub = fromMobX(() => store.count).subscribe(console.log);
// При размонтировании:
sub.unsubscribe(); // вызывает disposer внутри Observable

Ошибка 2: Мутировать MobX снаружи action

// ❌ MobX ругается в strict mode
fromRxJS$.subscribe(data => {
store.items = data; // прямая мутация!
});
// ✅ Через action
fromRxJS$.subscribe(action(data => {
store.items = data;
}));

Ошибка 3: Дублировать реактивность

// ❌ Зачем MobX если уже BehaviorSubject?
const mobxStore = makeAutoObservable({ count: 0 });
const rxStore = new BehaviorSubject(0);
// Синхронизировать их — боль
// ✅ Выбери одно: либо MobX хранит стейт, либо BehaviorSubject


Попробуйте примеры в интерактивном редакторе: