Перейти к содержимому

60. WebSockets

Привет! Яша здесь. WebSocket — это постоянное двусторонее соединение между браузером и сервером. Идеально для чатов, дашбордов реального времени, онлайн-игр. RxJS делает работу с WebSocket элегантной 🚀


RxJS предоставляет готовый Subject для WebSocket:

import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
const socket$ = webSocket('wss://api.example.com/ws');
// Отправить сообщение
socket$.next({ type: 'message', text: 'Hello!' });
// Получить сообщения
socket$.subscribe({
next: (message) => console.log('Получено:', message),
error: (err) => console.error('Ошибка:', err),
complete: () => console.log('Соединение закрыто'),
});
// Закрыть соединение
socket$.complete();

websocket.service.ts
import { Injectable, OnDestroy } from '@angular/core';
import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';
import { Observable, Subject, timer, EMPTY } from 'rxjs';
import {
switchMap, catchError, retryWhen, delay,
filter, share, takeUntil, tap
} from 'rxjs/operators';
import { environment } from '../environments/environment';
export interface WsMessage<T = unknown> {
type: string;
payload: T;
timestamp?: number;
}
@Injectable({ providedIn: 'root' })
export class WebSocketService implements OnDestroy {
private socket$!: WebSocketSubject<WsMessage>;
private destroy$ = new Subject<void>();
private messages$!: Observable<WsMessage>;
private reconnectAttempts = 0;
private readonly MAX_RECONNECT_ATTEMPTS = 5;
connect(url: string = environment.wsUrl): void {
const config: WebSocketSubjectConfig<WsMessage> = {
url,
openObserver: {
next: () => {
console.log('WebSocket подключён');
this.reconnectAttempts = 0;
}
},
closeObserver: {
next: () => {
console.log('WebSocket отключён');
}
},
};
this.socket$ = webSocket<WsMessage>(config);
this.messages$ = this.socket$.pipe(
// Автоматическое переподключение
retryWhen(errors => errors.pipe(
tap(err => {
this.reconnectAttempts++;
console.warn(\`WebSocket ошибка (попытка \${this.reconnectAttempts}):\`, err);
}),
// Экспоненциальная задержка: 1s, 2s, 4s, 8s...
delay(Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000)),
switchMap(() => {
if (this.reconnectAttempts >= this.MAX_RECONNECT_ATTEMPTS) {
console.error('Превышено число попыток переподключения');
return EMPTY;
}
return [true]; // продолжаем
}),
)),
catchError(() => EMPTY),
takeUntil(this.destroy$),
share(), // один WebSocket на всех подписчиков
);
}
// Подписаться на конкретный тип сообщений
on<T>(type: string): Observable<T> {
return this.messages$.pipe(
filter(msg => msg.type === type),
// Используем unknown → T для типизации payload
) as Observable<T>;
}
// Отправить сообщение
send<T>(type: string, payload: T): void {
this.socket$.next({
type,
payload,
timestamp: Date.now(),
});
}
ngOnDestroy(): void {
this.destroy$.next();
this.destroy$.complete();
this.socket$.complete();
}
}

chat.component.ts
import { Component, OnInit, OnDestroy, signal } from '@angular/core';
import { WebSocketService, WsMessage } from '../websocket.service';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
interface ChatMessage {
id: string;
userId: string;
userName: string;
text: string;
timestamp: number;
}
@Component({
selector: 'app-chat',
template: `
<div class="chat-container">
<div class="messages">
@for (msg of messages(); track msg.id) {
<div class="message" [class.own]="msg.userId === currentUserId">
<span class="author">{{ msg.userName }}</span>
<span class="text">{{ msg.text }}</span>
<span class="time">{{ msg.timestamp | date:'HH:mm' }}</span>
</div>
}
</div>
<div class="input-area">
<input
[(ngModel)]="messageText"
(keydown.enter)="sendMessage()"
placeholder="Введите сообщение...">
<button (click)="sendMessage()">Отправить</button>
</div>
<div class="status" [class.connected]="isConnected()">
{{ isConnected() ? '🟢 Подключён' : '🔴 Переподключение...' }}
</div>
</div>
`,
})
export class ChatComponent implements OnInit, OnDestroy {
messages = signal<ChatMessage[]>([]);
isConnected = signal(false);
messageText = '';
currentUserId = 'user-1';
private destroy$ = new Subject<void>();
constructor(private ws: WebSocketService) {}
ngOnInit(): void {
this.ws.connect();
// Подписка на сообщения чата
this.ws.on<ChatMessage>('chat:message')
.pipe(takeUntil(this.destroy$))
.subscribe(msg => {
this.messages.update(msgs => [...msgs, msg]);
});
// Подписка на статус подключения
this.ws.on<boolean>('connection:status')
.pipe(takeUntil(this.destroy$))
.subscribe(status => this.isConnected.set(status));
// Уведомления о typing
this.ws.on<{ userId: string; isTyping: boolean }>('chat:typing')
.pipe(takeUntil(this.destroy$))
.subscribe(({ userId, isTyping }) => {
console.log(\`\${userId} \${isTyping ? 'печатает...' : 'перестал печатать'}\`);
});
}
sendMessage(): void {
if (!this.messageText.trim()) return;
this.ws.send('chat:message', {
text: this.messageText,
userId: this.currentUserId,
});
this.messageText = '';
}
onTyping(): void {
this.ws.send('chat:typing', {
userId: this.currentUserId,
isTyping: true,
});
}
ngOnDestroy(): void {
this.destroy$.next();
this.destroy$.complete();
}
}

Окно терминала
npm install @stomp/rx-stomp
stomp.service.ts
import { Injectable } from '@angular/core';
import { RxStomp, RxStompConfig } from '@stomp/rx-stomp';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
@Injectable({ providedIn: 'root' })
export class StompService {
private stomp = new RxStomp();
connect(): void {
const config: RxStompConfig = {
brokerURL: 'wss://api.example.com/stomp',
connectHeaders: {
Authorization: \`Bearer \${localStorage.getItem('token')}\`,
},
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000,
reconnectDelay: 3000,
};
this.stomp.configure(config);
this.stomp.activate();
}
// Подписаться на топик
watch<T>(destination: string): Observable<T> {
return this.stomp.watch(destination).pipe(
map(message => JSON.parse(message.body) as T)
);
}
// Отправить сообщение
publish<T>(destination: string, body: T): void {
this.stomp.publish({
destination,
body: JSON.stringify(body),
});
}
get connected$(): Observable<boolean> {
return this.stomp.connected$;
}
}
// Использование
this.stompService.watch<Notification>('/topic/notifications')
.subscribe(notification => this.showNotification(notification));
this.stompService.publish('/app/chat', { text: 'Hello STOMP!' });

// message-bus.service.ts — внутренняя шина событий
import { Injectable } from '@angular/core';
import { Subject, Observable, filter, map } from 'rxjs';
type EventTypes = 'user:login' | 'user:logout' | 'notification' | 'cart:update';
interface BusEvent<T = unknown> {
type: EventTypes;
data: T;
}
@Injectable({ providedIn: 'root' })
export class MessageBusService {
private bus = new Subject<BusEvent>();
emit<T>(type: EventTypes, data: T): void {
this.bus.next({ type, data });
}
on<T>(type: EventTypes): Observable<T> {
return this.bus.pipe(
filter(event => event.type === type),
map(event => event.data as T),
);
}
}

import { webSocket } from 'rxjs/webSocket';
import { Observable, Subject, timer, EMPTY } from 'rxjs';
import { switchMap, catchError, tap, share } from 'rxjs/operators';
function createReconnectingSocket<T>(url: string): Observable<T> {
return new Observable<T>(observer => {
let socket: WebSocketSubject<T>;
let attempts = 0;
const connect = () => {
socket = webSocket<T>(url);
socket.pipe(
catchError(err => {
attempts++;
const backoff = Math.min(1000 * 2 ** attempts, 30000);
console.log(\`Reconnect in \${backoff}ms (attempt \${attempts})\`);
return timer(backoff).pipe(
switchMap(() => {
connect();
return EMPTY;
})
);
})
).subscribe(observer);
};
connect();
return () => socket?.complete();
}).pipe(share());
}

Симулятор чата с WebSocket в реальном времени: