60. WebSockets
63. WebSockets в Angular 🔌
Заголовок раздела «63. WebSockets в Angular 🔌»Привет! Яша здесь. WebSocket — это постоянное двусторонее соединение между браузером и сервером. Идеально для чатов, дашбордов реального времени, онлайн-игр. RxJS делает работу с WebSocket элегантной 🚀
webSocket() из rxjs/webSocket
Заголовок раздела «webSocket() из rxjs/webSocket»RxJS предоставляет готовый Subject для WebSocket:
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
const socket$ = webSocket('wss://api.example.com/ws');
// Отправить сообщениеsocket$.next({ type: 'message', text: 'Hello!' });
// Получить сообщенияsocket$.subscribe({ next: (message) => console.log('Получено:', message), error: (err) => console.error('Ошибка:', err), complete: () => console.log('Соединение закрыто'),});
// Закрыть соединениеsocket$.complete();WebSocket Service: production-паттерн
Заголовок раздела «WebSocket Service: production-паттерн»import { Injectable, OnDestroy } from '@angular/core';import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';import { Observable, Subject, timer, EMPTY } from 'rxjs';import { switchMap, catchError, retryWhen, delay, filter, share, takeUntil, tap} from 'rxjs/operators';import { environment } from '../environments/environment';
export interface WsMessage<T = unknown> { type: string; payload: T; timestamp?: number;}
@Injectable({ providedIn: 'root' })export class WebSocketService implements OnDestroy { private socket$!: WebSocketSubject<WsMessage>; private destroy$ = new Subject<void>(); private messages$!: Observable<WsMessage>; private reconnectAttempts = 0; private readonly MAX_RECONNECT_ATTEMPTS = 5;
connect(url: string = environment.wsUrl): void { const config: WebSocketSubjectConfig<WsMessage> = { url, openObserver: { next: () => { console.log('WebSocket подключён'); this.reconnectAttempts = 0; } }, closeObserver: { next: () => { console.log('WebSocket отключён'); } }, };
this.socket$ = webSocket<WsMessage>(config);
this.messages$ = this.socket$.pipe( // Автоматическое переподключение retryWhen(errors => errors.pipe( tap(err => { this.reconnectAttempts++; console.warn(\`WebSocket ошибка (попытка \${this.reconnectAttempts}):\`, err); }), // Экспоненциальная задержка: 1s, 2s, 4s, 8s... delay(Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000)), switchMap(() => { if (this.reconnectAttempts >= this.MAX_RECONNECT_ATTEMPTS) { console.error('Превышено число попыток переподключения'); return EMPTY; } return [true]; // продолжаем }), )), catchError(() => EMPTY), takeUntil(this.destroy$), share(), // один WebSocket на всех подписчиков ); }
// Подписаться на конкретный тип сообщений on<T>(type: string): Observable<T> { return this.messages$.pipe( filter(msg => msg.type === type), // Используем unknown → T для типизации payload ) as Observable<T>; }
// Отправить сообщение send<T>(type: string, payload: T): void { this.socket$.next({ type, payload, timestamp: Date.now(), }); }
ngOnDestroy(): void { this.destroy$.next(); this.destroy$.complete(); this.socket$.complete(); }}Использование в компоненте
Заголовок раздела «Использование в компоненте»import { Component, OnInit, OnDestroy, signal } from '@angular/core';import { WebSocketService, WsMessage } from '../websocket.service';import { Subject } from 'rxjs';import { takeUntil } from 'rxjs/operators';
interface ChatMessage { id: string; userId: string; userName: string; text: string; timestamp: number;}
@Component({ selector: 'app-chat', template: ` <div class="chat-container"> <div class="messages"> @for (msg of messages(); track msg.id) { <div class="message" [class.own]="msg.userId === currentUserId"> <span class="author">{{ msg.userName }}</span> <span class="text">{{ msg.text }}</span> <span class="time">{{ msg.timestamp | date:'HH:mm' }}</span> </div> } </div>
<div class="input-area"> <input [(ngModel)]="messageText" (keydown.enter)="sendMessage()" placeholder="Введите сообщение..."> <button (click)="sendMessage()">Отправить</button> </div>
<div class="status" [class.connected]="isConnected()"> {{ isConnected() ? '🟢 Подключён' : '🔴 Переподключение...' }} </div> </div> `,})export class ChatComponent implements OnInit, OnDestroy { messages = signal<ChatMessage[]>([]); isConnected = signal(false); messageText = ''; currentUserId = 'user-1';
private destroy$ = new Subject<void>();
constructor(private ws: WebSocketService) {}
ngOnInit(): void { this.ws.connect();
// Подписка на сообщения чата this.ws.on<ChatMessage>('chat:message') .pipe(takeUntil(this.destroy$)) .subscribe(msg => { this.messages.update(msgs => [...msgs, msg]); });
// Подписка на статус подключения this.ws.on<boolean>('connection:status') .pipe(takeUntil(this.destroy$)) .subscribe(status => this.isConnected.set(status));
// Уведомления о typing this.ws.on<{ userId: string; isTyping: boolean }>('chat:typing') .pipe(takeUntil(this.destroy$)) .subscribe(({ userId, isTyping }) => { console.log(\`\${userId} \${isTyping ? 'печатает...' : 'перестал печатать'}\`); }); }
sendMessage(): void { if (!this.messageText.trim()) return;
this.ws.send('chat:message', { text: this.messageText, userId: this.currentUserId, });
this.messageText = ''; }
onTyping(): void { this.ws.send('chat:typing', { userId: this.currentUserId, isTyping: true, }); }
ngOnDestroy(): void { this.destroy$.next(); this.destroy$.complete(); }}STOMP протокол (Spring/Java backends)
Заголовок раздела «STOMP протокол (Spring/Java backends)»npm install @stomp/rx-stompimport { Injectable } from '@angular/core';import { RxStomp, RxStompConfig } from '@stomp/rx-stomp';import { Observable } from 'rxjs';import { map } from 'rxjs/operators';
@Injectable({ providedIn: 'root' })export class StompService { private stomp = new RxStomp();
connect(): void { const config: RxStompConfig = { brokerURL: 'wss://api.example.com/stomp', connectHeaders: { Authorization: \`Bearer \${localStorage.getItem('token')}\`, }, heartbeatIncoming: 4000, heartbeatOutgoing: 4000, reconnectDelay: 3000, };
this.stomp.configure(config); this.stomp.activate(); }
// Подписаться на топик watch<T>(destination: string): Observable<T> { return this.stomp.watch(destination).pipe( map(message => JSON.parse(message.body) as T) ); }
// Отправить сообщение publish<T>(destination: string, body: T): void { this.stomp.publish({ destination, body: JSON.stringify(body), }); }
get connected$(): Observable<boolean> { return this.stomp.connected$; }}
// Использованиеthis.stompService.watch<Notification>('/topic/notifications') .subscribe(notification => this.showNotification(notification));
this.stompService.publish('/app/chat', { text: 'Hello STOMP!' });Subject-based Message Bus
Заголовок раздела «Subject-based Message Bus»// message-bus.service.ts — внутренняя шина событийimport { Injectable } from '@angular/core';import { Subject, Observable, filter, map } from 'rxjs';
type EventTypes = 'user:login' | 'user:logout' | 'notification' | 'cart:update';
interface BusEvent<T = unknown> { type: EventTypes; data: T;}
@Injectable({ providedIn: 'root' })export class MessageBusService { private bus = new Subject<BusEvent>();
emit<T>(type: EventTypes, data: T): void { this.bus.next({ type, data }); }
on<T>(type: EventTypes): Observable<T> { return this.bus.pipe( filter(event => event.type === type), map(event => event.data as T), ); }}Reconnection Strategy: практический пример
Заголовок раздела «Reconnection Strategy: практический пример»import { webSocket } from 'rxjs/webSocket';import { Observable, Subject, timer, EMPTY } from 'rxjs';import { switchMap, catchError, tap, share } from 'rxjs/operators';
function createReconnectingSocket<T>(url: string): Observable<T> { return new Observable<T>(observer => { let socket: WebSocketSubject<T>; let attempts = 0;
const connect = () => { socket = webSocket<T>(url); socket.pipe( catchError(err => { attempts++; const backoff = Math.min(1000 * 2 ** attempts, 30000); console.log(\`Reconnect in \${backoff}ms (attempt \${attempts})\`); return timer(backoff).pipe( switchMap(() => { connect(); return EMPTY; }) ); }) ).subscribe(observer); };
connect();
return () => socket?.complete(); }).pipe(share());}Playground 🎮
Заголовок раздела «Playground 🎮»Симулятор чата с WebSocket в реальном времени: