13. Schedulers
14. Schedulers: управление временем ⏰
Заголовок раздела «14. Schedulers: управление временем ⏰»Привет! Яша на связи. Сегодня разбираем одну из самых «низкоуровневых» и при этом мощных концепций RxJS — Schedulers. Это механизм, который контролирует когда и как выполняется код Observable.
Что такое Scheduler?
Заголовок раздела «Что такое Scheduler?»Scheduler — это объект, который определяет:
- Когда начать выполнение (сейчас, в следующем таске, в следующем кадре анимации)
- Как планировать подписку и передачу значений
Без Scheduler операторы RxJS работают синхронно или используют встроенные таймеры JavaScript. Scheduler позволяет переключать контекст выполнения.
import { of, scheduled } from 'rxjs';import { asyncScheduler, queueScheduler } from 'rxjs';
// Без scheduler — синхронноconsole.log('до подписки');of(1, 2, 3).subscribe(console.log);console.log('после подписки');// до подписки → 1 → 2 → 3 → после подписки
// С asyncScheduler — асинхронно (через setTimeout)console.log('до подписки');scheduled([1, 2, 3], asyncScheduler).subscribe(console.log);console.log('после подписки');// до подписки → после подписки → 1 → 2 → 3asyncScheduler
Заголовок раздела «asyncScheduler»asyncScheduler использует setTimeout / setInterval. Все операции становятся асинхронными.
import { asyncScheduler } from 'rxjs';
// schedule(task, delay, state)const subscription = asyncScheduler.schedule( function(count) { console.log(`Тик #${count}`); // Рекурсивный вызов для повторения this.schedule(count + 1, 1000); }, 0, // первый тик через 0мс 0 // начальное состояние);
// Отмена через 5 секундsetTimeout(() => subscription.unsubscribe(), 5000);interval и timer используют asyncScheduler по умолчанию:
import { interval, timer } from 'rxjs';import { asyncScheduler } from 'rxjs';
// Эти два эквивалентны:interval(1000);interval(1000, asyncScheduler);
// observeOn — переключаем scheduler для уведомленийimport { observeOn } from 'rxjs/operators';
source$.pipe( observeOn(asyncScheduler) // значения доставляются асинхронно).subscribe(console.log);queueScheduler
Заголовок раздела «queueScheduler»queueScheduler использует очередь (FIFO) и выполняет задачи синхронно, но откладывает рекурсивные вызовы.
import { queueScheduler } from 'rxjs';
queueScheduler.schedule(() => { console.log('Задача 1'); // Вложенный schedule — добавится в очередь, не выполнится рекурсивно queueScheduler.schedule(() => console.log('Задача 1.1')); console.log('Задача 1 (конец)');});queueScheduler.schedule(() => console.log('Задача 2'));
// Вывод: Задача 1 → Задача 1 (конец) → Задача 1.1 → Задача 2// Рекурсия безопасна — нет переполнения стекаПолезен для рекурсивных операций, которые иначе вызвали бы stack overflow:
import { range } from 'rxjs';import { queueScheduler } from 'rxjs';
// range синхронен, но с queueScheduler не переполнит стекrange(1, 1_000_000, queueScheduler).subscribe(/* ... */);animationFrameScheduler
Заголовок раздела «animationFrameScheduler»animationFrameScheduler привязывает выполнение к requestAnimationFrame. Идеален для анимаций — обновления гарантированно происходят перед следующим рисованием браузера (~60fps).
import { animationFrameScheduler, interval } from 'rxjs';import { observeOn } from 'rxjs/operators';
// Вместо setInterval — используем requestAnimationFrameinterval(0, animationFrameScheduler).subscribe(() => { // Вызывается ~60 раз в секунду, синхронно с отрисовкой updateCanvas();});
// Или с observeOn для плавных обновлений DOMdata$.pipe( observeOn(animationFrameScheduler)).subscribe(data => { element.style.transform = \`translateX(\${data.x}px)\`;});observeOn vs subscribeOn
Заголовок раздела «observeOn vs subscribeOn»import { observeOn, subscribeOn } from 'rxjs/operators';import { asyncScheduler, queueScheduler } from 'rxjs';
// observeOn — меняет scheduler для ДОСТАВКИ значений (next/error/complete)source$.pipe( observeOn(asyncScheduler) // subscribe вызывается синхронно, next — асинхронно).subscribe(console.log);
// subscribeOn — меняет scheduler для ПОДПИСКИ (когда Observable начинает выполняться)source$.pipe( subscribeOn(asyncScheduler) // сама подписка откладывается).subscribe(console.log);
// Практический пример: тяжелые вычисления не блокируют UIheavyComputation$.pipe( subscribeOn(asyncScheduler), // не блокирует текущий поток observeOn(animationFrameScheduler) // результаты обновляют DOM плавно).subscribe(updateUI);Virtual Time в тестах
Заголовок раздела «Virtual Time в тестах»TestScheduler позволяет ускорять время в тестах — вместо ожидания реальных секунд, время «прокручивается» мгновенно.
import { TestScheduler } from 'rxjs/testing';import { delay, debounceTime } from 'rxjs/operators';
const testScheduler = new TestScheduler((actual, expected) => { expect(actual).toEqual(expected);});
testScheduler.run(({ cold, hot, expectObservable }) => { // Мраморные строки — 1 символ = 1 виртуальный фрейм (10мс по умолчанию) const source$ = cold('--a--b--c|'); const result$ = source$.pipe(delay(20));
// Ожидаем что значения появятся с задержкой 2 фрейма expectObservable(result$).toBe('----a--b--c|');});// Тест выполняется мгновенно, не ждёт реальных задержек!Практика
Заголовок раздела «Практика»Попробуйте примеры в интерактивном редакторе: