Перейти к содержимому

18. Event-Driven Architecture

Event-Driven Architecture (EDA) — архитектурный стиль, в котором компоненты системы общаются через события. Компоненты независимы: производитель события не знает кто его обрабатывает.


  1. Слабая связанность — сервисы не вызывают друг друга напрямую
  2. Асинхронность — события обрабатываются независимо
  3. Масштабируемость — можно добавить обработчик без изменения производителя
  4. Аудит — история событий = история изменений системы

// 1. Domain Events — что произошло в бизнес-домене
interface UserRegisteredEvent {
type: 'user.registered';
userId: string;
email: string;
registeredAt: string;
}
// 2. Integration Events — для коммуникации между сервисами
interface OrderShippedIntegrationEvent {
type: 'order.shipped';
orderId: string;
trackingCode: string;
estimatedDelivery: string;
}
// 3. System Events — технические события
interface ServiceHealthCheckEvent {
type: 'service.health_check';
serviceId: string;
status: 'healthy' | 'degraded' | 'unhealthy';
timestamp: string;
}

type EventHandler<T = any> = (event: T) => Promise<void> | void;
class EventBus {
private handlers = new Map<string, Set<EventHandler>>();
private deadLetterQueue: Array<{ event: any; error: Error }> = [];
subscribe<T>(eventType: string, handler: EventHandler<T>): () => void {
if (!this.handlers.has(eventType)) {
this.handlers.set(eventType, new Set());
}
this.handlers.get(eventType)!.add(handler as EventHandler);
return () => this.handlers.get(eventType)?.delete(handler as EventHandler);
}
async publish<T>(eventType: string, payload: T): Promise<void> {
const event = {
id: crypto.randomUUID(),
type: eventType,
payload,
publishedAt: new Date().toISOString(),
};
const handlers = this.handlers.get(eventType);
if (!handlers) return;
const promises = Array.from(handlers).map(async handler => {
try {
await handler(event);
} catch (error) {
console.error(`Handler failed for event ${eventType}:`, error);
this.deadLetterQueue.push({ event, error: error as Error });
}
});
await Promise.allSettled(promises);
}
getDeadLetterQueue() {
return [...this.deadLetterQueue];
}
}

Паттерн, при котором состояние объекта восстанавливается из последовательности событий:

// Каждое событие описывает изменение состояния
type BankAccountEvent =
| { type: 'AccountOpened'; accountId: string; ownerId: string; initialBalance: number }
| { type: 'MoneyDeposited'; amount: number; timestamp: string }
| { type: 'MoneyWithdrawn'; amount: number; timestamp: string }
| { type: 'AccountClosed'; reason: string };
// Состояние восстанавливается из событий
class BankAccount {
private id = '';
private ownerId = '';
private balance = 0;
private isClosed = false;
private transactions: { type: string; amount: number; timestamp: string }[] = [];
// Применяем события к состоянию
private apply(event: BankAccountEvent): void {
switch (event.type) {
case 'AccountOpened':
this.id = event.accountId;
this.ownerId = event.ownerId;
this.balance = event.initialBalance;
break;
case 'MoneyDeposited':
this.balance += event.amount;
this.transactions.push({ type: 'deposit', amount: event.amount, timestamp: event.timestamp });
break;
case 'MoneyWithdrawn':
this.balance -= event.amount;
this.transactions.push({ type: 'withdrawal', amount: event.amount, timestamp: event.timestamp });
break;
case 'AccountClosed':
this.isClosed = true;
break;
}
}
// Восстанавливаем из истории
static fromEvents(events: BankAccountEvent[]): BankAccount {
const account = new BankAccount();
events.forEach(event => account.apply(event));
return account;
}
// Бизнес-операции создают события
deposit(amount: number): BankAccountEvent {
if (this.isClosed) throw new Error('Account is closed');
if (amount <= 0) throw new Error('Amount must be positive');
const event: BankAccountEvent = {
type: 'MoneyDeposited',
amount,
timestamp: new Date().toISOString(),
};
this.apply(event);
return event; // Сохраняем событие, а не состояние!
}
withdraw(amount: number): BankAccountEvent {
if (this.isClosed) throw new Error('Account is closed');
if (amount > this.balance) throw new Error('Insufficient funds');
const event: BankAccountEvent = {
type: 'MoneyWithdrawn',
amount,
timestamp: new Date().toISOString(),
};
this.apply(event);
return event;
}
getBalance(): number { return this.balance; }
getTransactionHistory() { return [...this.transactions]; }
}
// Репозиторий хранит события, а не состояние
class BankAccountRepository {
private eventStore = new Map<string, BankAccountEvent[]>();
async save(accountId: string, newEvents: BankAccountEvent[]): Promise<void> {
const existing = this.eventStore.get(accountId) ?? [];
this.eventStore.set(accountId, [...existing, ...newEvents]);
}
async load(accountId: string): Promise<BankAccount> {
const events = this.eventStore.get(accountId) ?? [];
return BankAccount.fromEvents(events);
}
// Можно воспроизвести состояние на любой момент времени!
async loadAtTimestamp(accountId: string, timestamp: string): Promise<BankAccount> {
const events = this.eventStore.get(accountId) ?? [];
const eventsBeforeTimestamp = events.filter(e => {
if ('timestamp' in e) return e.timestamp <= timestamp;
return true;
});
return BankAccount.fromEvents(eventsBeforeTimestamp);
}
}

// Competing Consumers — несколько воркеров обрабатывают одну очередь
class OrderProcessor {
async startWorker(workerId: string): Promise<void> {
while (true) {
const message = await messageQueue.receive('orders');
if (!message) {
await sleep(1000);
continue;
}
try {
await this.processOrder(message.payload);
await message.ack();
console.log(`Worker ${workerId}: processed order ${message.payload.orderId}`);
} catch (error) {
await message.nack(); // Вернуть в очередь
console.error(`Worker ${workerId}: failed to process order`, error);
}
}
}
}
// Запускаем несколько воркеров для параллельной обработки
for (let i = 0; i < 3; i++) {
new OrderProcessor().startWorker(`worker-${i}`);
}

Используй EDA когда:

  • Нужна слабая связанность между сервисами
  • Обработка задач может быть асинхронной
  • Нужен audit trail (история событий)
  • Большой поток данных требует масштабирования потребителей

Не используй EDA когда:

  • Нужен немедленный ответ (используй REST/gRPC)
  • Простая маленькая система
  • Порядок выполнения критически важен (сложно гарантировать)

  1. Реализуй простой MessageQueue с методами publish(queue, message), receive(queue), ack(messageId).

  2. Создай Event-Driven систему уведомлений: событие order.created → email + SMS + push уведомления.

  3. Реализуй EventSourcing для корзины покупок: события AddItem, RemoveItem, ApplyDiscount, Checkout.

  4. Чем Event Sourcing отличается от обычного логирования действий?

  5. Изучи Apache Kafka или RabbitMQ — какой выбрать и в каких случаях?