8. Streams и Buffers

Streams позволяют работать с данными по частям, не загружая всё в память. Идеально для больших файлов, загрузок, видео.
Buffer — бинарные данные
Заголовок раздела «Buffer — бинарные данные»// Buffer — область памяти для бинарных данных
// Создание Bufferconst buf1 = Buffer.from('Привет, мир!'); // из строкиconst buf2 = Buffer.from([72, 101, 108, 108, 111]); // из массива байтconst buf3 = Buffer.alloc(10); // пустой, 10 байтconst buf4 = Buffer.alloc(10, 0xff); // заполненный 0xff
// Конвертацияconsole.log(buf1.toString()); // Привет, мир!console.log(buf1.toString('base64')); // 0J/RgNC40LLQtdGC...console.log(buf1.toString('hex')); // d09fd180d0b8d0b2...
// Размерconsole.log(buf1.length); // длина в байтах (не символах!)// Кирилличный символ = 2 байта в UTF-8
// Объединение буферовconst combined = Buffer.concat([buf1, buf2]);
// Сравнениеconsole.log(Buffer.from('abc').equals(Buffer.from('abc'))); // true
// Копированиеconst copy = Buffer.allocUnsafe(5);buf1.copy(copy, 0, 0, 5);Типы Streams
Заголовок раздела «Типы Streams»Readable — только чтение (файл, HTTP запрос, stdin)Writable — только запись (файл, HTTP ответ, stdout)Duplex — чтение и запись (TCP сокет)Transform — преобразование данных (gzip, crypto)Readable Stream — чтение
Заголовок раздела «Readable Stream — чтение»const { createReadStream } = require('fs');const { Readable } = require('stream');
// Читаем файл по частям (16KB по умолчанию)const readable = createReadStream('./large-file.csv', { encoding: 'utf8', highWaterMark: 64 * 1024, // 64KB чанки});
readable.on('data', (chunk) => { console.log(`Получен чанк: ${chunk.length} байт`);});
readable.on('end', () => { console.log('Файл прочитан!');});
readable.on('error', (err) => { console.error('Ошибка чтения:', err);});
// Создание своего Readableconst myReadable = new Readable({ read(size) { this.push('chunk 1\n'); this.push('chunk 2\n'); this.push(null); // конец потока }});
myReadable.pipe(process.stdout);Writable Stream — запись
Заголовок раздела «Writable Stream — запись»const { createWriteStream } = require('fs');const { Writable } = require('stream');
const writable = createWriteStream('./output.txt');
writable.write('Первая строка\n');writable.write('Вторая строка\n');writable.end('Последняя строка\n');
writable.on('finish', () => { console.log('Запись завершена!');});
// Свой Writable (например, для логирования)class Logger extends Writable { _write(chunk, encoding, callback) { process.stdout.write(`[LOG] ${chunk}`); callback(); }}
const logger = new Logger();logger.write('Привет!\n');logger.write('Мир!\n');pipe() — соединение потоков
Заголовок раздела «pipe() — соединение потоков»const { createReadStream, createWriteStream } = require('fs');const { createGzip, createGunzip } = require('zlib');const { pipeline } = require('stream/promises');
// pipe — самый простой способcreateReadStream('./input.txt') .pipe(createWriteStream('./output.txt'));
// Сжатие файла gzipcreateReadStream('./data.csv') .pipe(createGzip()) .pipe(createWriteStream('./data.csv.gz'));
// pipeline — лучше pipe (правильно обрабатывает ошибки)async function compressFile(input, output) { await pipeline( createReadStream(input), createGzip(), createWriteStream(output) ); console.log(`${input} сжат → ${output}`);}
// Распаковкаasync function decompressFile(input, output) { await pipeline( createReadStream(input), createGunzip(), createWriteStream(output) );}Transform Stream — преобразование
Заголовок раздела «Transform Stream — преобразование»const { Transform } = require('stream');
// Преобразование: строки в верхний регистрclass UpperCase extends Transform { _transform(chunk, encoding, callback) { this.push(chunk.toString().toUpperCase()); callback(); }}
// Преобразование: CSV в JSONclass CSVToJSON extends Transform { constructor() { super({ objectMode: true }); this.headers = null; }
_transform(chunk, encoding, callback) { const lines = chunk.toString().split('\n').filter(Boolean);
for (const line of lines) { const values = line.split(','); if (!this.headers) { this.headers = values; } else { const obj = {}; this.headers.forEach((h, i) => obj[h.trim()] = values[i]?.trim()); this.push(JSON.stringify(obj) + '\n'); } } callback(); }}
// Использованиеconst { createReadStream, createWriteStream } = require('fs');const { pipeline } = require('stream/promises');
await pipeline( createReadStream('./users.csv'), new CSVToJSON(), createWriteStream('./users.jsonl'));Streams в HTTP
Заголовок раздела «Streams в HTTP»const http = require('http');const { createReadStream } = require('fs');const { stat } = require('fs/promises');
const server = http.createServer(async (req, res) => { if (req.url === '/download') { const filePath = './large-video.mp4'; const { size } = await stat(filePath);
res.writeHead(200, { 'Content-Type': 'video/mp4', 'Content-Length': size, // Поддержка Range запросов (для видео) 'Accept-Ranges': 'bytes', });
// Стримим файл — не загружаем в память! createReadStream(filePath).pipe(res); }});Async iteration со streams
Заголовок раздела «Async iteration со streams»const { createReadStream } = require('fs');const readline = require('readline');
// Чтение файла построчноasync function processLines(filename) { const fileStream = createReadStream(filename); const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity, });
let lineCount = 0; for await (const line of rl) { lineCount++; if (line.startsWith('ERROR')) { console.log(`Строка ${lineCount}: ${line}`); } } console.log(`Обработано строк: ${lineCount}`);}
await processLines('./app.log');Практика
Заголовок раздела «Практика»- Прочитай большой текстовый файл через ReadStream и посчитай количество строк
- Создай Transform stream, который заменяет все числа в тексте на
[NUMBER] - Реализуй сжатие файла:
input.txt→input.txt.gz - Создай HTTP сервер, который стримит видео файл с поддержкой Range запросов
- Напиши скрипт для построчной обработки CSV файла через readline