Перейти к содержимому

13. Schedulers

Привет! Яша на связи. Сегодня разбираем одну из самых «низкоуровневых» и при этом мощных концепций RxJS — Schedulers. Это механизм, который контролирует когда и как выполняется код Observable.

Scheduler — это объект, который определяет:

  1. Когда начать выполнение (сейчас, в следующем таске, в следующем кадре анимации)
  2. Как планировать подписку и передачу значений

Без Scheduler операторы RxJS работают синхронно или используют встроенные таймеры JavaScript. Scheduler позволяет переключать контекст выполнения.

import { of, scheduled } from 'rxjs';
import { asyncScheduler, queueScheduler } from 'rxjs';
// Без scheduler — синхронно
console.log('до подписки');
of(1, 2, 3).subscribe(console.log);
console.log('после подписки');
// до подписки → 1 → 2 → 3 → после подписки
// С asyncScheduler — асинхронно (через setTimeout)
console.log('до подписки');
scheduled([1, 2, 3], asyncScheduler).subscribe(console.log);
console.log('после подписки');
// до подписки → после подписки → 1 → 2 → 3

asyncScheduler использует setTimeout / setInterval. Все операции становятся асинхронными.

import { asyncScheduler } from 'rxjs';
// schedule(task, delay, state)
const subscription = asyncScheduler.schedule(
function(count) {
console.log(`Тик #${count}`);
// Рекурсивный вызов для повторения
this.schedule(count + 1, 1000);
},
0, // первый тик через 0мс
0 // начальное состояние
);
// Отмена через 5 секунд
setTimeout(() => subscription.unsubscribe(), 5000);

interval и timer используют asyncScheduler по умолчанию:

import { interval, timer } from 'rxjs';
import { asyncScheduler } from 'rxjs';
// Эти два эквивалентны:
interval(1000);
interval(1000, asyncScheduler);
// observeOn — переключаем scheduler для уведомлений
import { observeOn } from 'rxjs/operators';
source$.pipe(
observeOn(asyncScheduler) // значения доставляются асинхронно
).subscribe(console.log);

queueScheduler использует очередь (FIFO) и выполняет задачи синхронно, но откладывает рекурсивные вызовы.

import { queueScheduler } from 'rxjs';
queueScheduler.schedule(() => {
console.log('Задача 1');
// Вложенный schedule — добавится в очередь, не выполнится рекурсивно
queueScheduler.schedule(() => console.log('Задача 1.1'));
console.log('Задача 1 (конец)');
});
queueScheduler.schedule(() => console.log('Задача 2'));
// Вывод: Задача 1 → Задача 1 (конец) → Задача 1.1 → Задача 2
// Рекурсия безопасна — нет переполнения стека

Полезен для рекурсивных операций, которые иначе вызвали бы stack overflow:

import { range } from 'rxjs';
import { queueScheduler } from 'rxjs';
// range синхронен, но с queueScheduler не переполнит стек
range(1, 1_000_000, queueScheduler).subscribe(/* ... */);

animationFrameScheduler привязывает выполнение к requestAnimationFrame. Идеален для анимаций — обновления гарантированно происходят перед следующим рисованием браузера (~60fps).

import { animationFrameScheduler, interval } from 'rxjs';
import { observeOn } from 'rxjs/operators';
// Вместо setInterval — используем requestAnimationFrame
interval(0, animationFrameScheduler).subscribe(() => {
// Вызывается ~60 раз в секунду, синхронно с отрисовкой
updateCanvas();
});
// Или с observeOn для плавных обновлений DOM
data$.pipe(
observeOn(animationFrameScheduler)
).subscribe(data => {
element.style.transform = \`translateX(\${data.x}px)\`;
});

import { observeOn, subscribeOn } from 'rxjs/operators';
import { asyncScheduler, queueScheduler } from 'rxjs';
// observeOn — меняет scheduler для ДОСТАВКИ значений (next/error/complete)
source$.pipe(
observeOn(asyncScheduler) // subscribe вызывается синхронно, next — асинхронно
).subscribe(console.log);
// subscribeOn — меняет scheduler для ПОДПИСКИ (когда Observable начинает выполняться)
source$.pipe(
subscribeOn(asyncScheduler) // сама подписка откладывается
).subscribe(console.log);
// Практический пример: тяжелые вычисления не блокируют UI
heavyComputation$.pipe(
subscribeOn(asyncScheduler), // не блокирует текущий поток
observeOn(animationFrameScheduler) // результаты обновляют DOM плавно
).subscribe(updateUI);

TestScheduler позволяет ускорять время в тестах — вместо ожидания реальных секунд, время «прокручивается» мгновенно.

import { TestScheduler } from 'rxjs/testing';
import { delay, debounceTime } from 'rxjs/operators';
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
testScheduler.run(({ cold, hot, expectObservable }) => {
// Мраморные строки — 1 символ = 1 виртуальный фрейм (10мс по умолчанию)
const source$ = cold('--a--b--c|');
const result$ = source$.pipe(delay(20));
// Ожидаем что значения появятся с задержкой 2 фрейма
expectObservable(result$).toBe('----a--b--c|');
});
// Тест выполняется мгновенно, не ждёт реальных задержек!

Попробуйте примеры в интерактивном редакторе: