Skip to content

Latest commit

 

History

History

stefan-baumgartner-the-definitive-guide-to-object-streams-in-nodejs

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 

Исчерпывающее руководство по объектным потокам в Node.js

Перевод заметки Stefan Baumgartner: The Definitive Guide to Object Streams in Node.js. Опубликовано с разрешения автора.

Потоки в Node.js обладают большой силой: в вашем распоряжении асинхронность в работе с вводом и выводом и вы можете преобразовывать данные в независимых этапах. В этом руководстве я расскажу вам о теории и научу, как использовать трансформаторы объектного потока в стиле Gulp.


Когда я находился в процессе написания моей книги FrontEnd инструментирование с Gulp, Bower и Yeoman, я решил не просто объяснять API и варианты использования этих инструментов, но также сосредоточиться на концепциях, лежащих в их основе.

Вы знаете, что в JavaScript, как нигде больше, инструменты и платформы приходят и уходят быстрее, чем вы сможете зарегистрировать домены и Github группы для них. Для Gulp.js одна из самых важных концепций - потоки!

Около 50 лет c потоками

Используя Gulp, вы хотите считывать входные файлы и преобразовывать их в желаемый результат, загружать множество файлов JavaScript и объединять их в один. API Gulp предоставляет ряд методов для чтения, преобразования и записи файлов, причём все они под капотом используют потоки.

Потоки - довольно старая концепция в вычислительной технике, возникшая с ранних дней Unix в 1960-х годах.

Поток представляет собой последовательность данных, поступающих во времени от источника и направляющихся к месту назначения. Типов источников множество: файлы, память компьютера или устройства ввода, такие как клавиатура или мышь.

Как только поток открывается, данные передаются блоками (chunks) из своего источника в процесс, потребляющий их. Поступая из файла, каждый символ или байт считывается по одному; поступая с клавиатуры, каждое нажатие клавиши передаёт данные потоком.

Самое большое преимущество по сравнению с одновременной загрузкой всех данных состоит в том, что теоретически входные данные могут быть бесконечными и без ограничений.

Например, возвращаясь к клавиатуре, в этом есть смысл: почему кто-то должен закрывать входной поток, который вы используете для управления своим компьютером?

Входные потоки также называются читаемыми потоками, что означает, что они предназначены для чтения данных из источника. С другой стороны, есть исходящие потоки или пункты назначения. Они также могут быть файлами или местом в памяти, но также и устройствами вывода, такими как командная строка, принтер или экран. Их также называют записываемыми потоками, что означает, что они предназначены для хранения данных, поступающих потоком. На рисунке ниже показано, как работают потоки.

Данные представляют собой последовательность элементов, поступающих во времени (например, символы или байты).

Читаемые потоки могут исходить из разных источников, таких как устройства ввода (клавиатура), файлы или данные, хранящиеся в памяти. Записываемые потоки также могут заканчиваться в разных местах, таких как файлы и память, а также в командной строке. Читаемые и записываемые потоки могут быть взаимозаменяемы: ввод с клавиатуры может оказаться в файле, ввод файла в командной строке.

Мало того, что возможно бесконечное количество входных данных, но вы также можете комбинировать различные читаемые и записываемые потоки. Данные нажатой клавиши могут быть напрямую сохранены в файл или вы можете вывести считываемый файл в командной строке или даже подключённом принтере. Интерфейс остаётся неизменным независимо от того, какие источники или места назначения.

Самая простая программа в Node.js, использующая потоки, - пробрасывание (piping) стандартного ввода нажатия клавиши клавиатуры в стандартный вывод (консоль):

process.stdin.pipe(process.stdout);

Мы берём наш читаемый поток (process.stdin) и пробрасываем его в записываемый поток (process.stdout). Как было сказано ранее, мы можем передавать любой контент из любого читаемого источника в любое записываемое место назначения.

Возьмите, например, пакет request, где вы можете выполнить HTTP-запрос к URL-адресу. Почему бы не получить (fetching) какую-либо страницу в Интернете и не вывести её в process.stdout (прим. пер.: в оригинале ошибочно указан process.stdin)?

const request = require('request');

request('https://fettblog.eu').pipe(process.stdout);

Вывод в консоли HTML-страницы может быть не очень полезен, но думайте об этом, как о пробрасывании данных в файл для веб-скрейпинга.

Преобразование данных

Потоки хороши не только для передачи данных между различными источниками и адресатами.

Когда после открытия потока данные становятся доступны, разработчики могут преобразовывать данные, поступающие из потока, до того, как они достигнут своего адресата, например, преобразовывая все символы нижнего регистра в файле в символы верхнего регистра.

Это одна из величайших сил потоков. Как только поток открывается и вы можете прочитать данные по частям, вы можете вставить различные программы в промежутке. Этот процесс показан на рисунке ниже.

Чтобы изменить данные, вы добавляете преобразующие блоки между входом и выходом.

В этом примере вы получаете входные данные из разных источников и передаёте их через преобразующий toUpperCase. Это изменяет символы нижнего регистра на их эквивалент в верхнем регистре. Эти блоки могут быть созданы один раз и использоваться повторно для разных входов и выходов.

В следующем примере мы определяем функцию toUpperCase, которая... преобразует каждую букву в её эквивалент в верхнем регистре. Есть много способов создать эту функциональность, но я всегда был большим поклонником потоковых пакетов Node.js, таких как through2. Они предлагают хорошую оболочку для создания новых трансформаторов:

const through2 = require('through2');

const toUpperCase = through2((data, enc, cb) => {      /* 1 */
  cb(null, new Buffer(data.toString().toUpperCase())); /* 2 */
});

process.stdin.pipe(toUpperCase).pipe(process.stdout);  /* 3 */
  1. Пакет through2 принимает функцию в качестве первого параметра. Эта функция принимает данные (в буфере), информацию о кодировании и функцию обратного вызова, которую мы можем вызвать, как только закончим наше преобразование.
  2. Обычно в потоках Node.js мы передаём Buffers с данными из потока. Исходя из process.stdin, это, скорее всего, текущая строка до того, как мы нажали Return. Исходя из файла, фактически это может быть что угодно. Мы преобразуем текущий буфер в строку, создаём прописную версию и снова конвертируем её в буфер. Функция обратного вызова принимает два аргумента. Первый - возможная ошибка. Поток будет аварийно завершён и программа остановит выполнение, если для отлавливания ошибки нет слушателя события end. Передаём null, если нас это устраивает. Второй аргумент - преобразованные данные.
  3. Мы можем использовать этот трансформатор и прокинуть в него входные данные от читаемого потока. Преобразованные данные пересылаются в наш записываемый поток.

Это полностью в духе функционального программирования. Мы можем использовать и переиспользовать один и тот же трансформатор для любого другого ввода или вывода, если он поступает из читаемого потока. Нам не важен входной источник или адресат. Кроме того, мы не ограничены одним трансформатором. Мы можем связать (chain) столько трансформаторов, сколько пожелаем:

const through2 = require('through2');

const toUpperCase = through2((data, enc, cb) => {
  cb(null, new Buffer(data.toString().toUpperCase()));
});

const dashBetweenWords = through2((data, enc, cb) => {
  cb(null, new Buffer(data.toString().split(' ').join('-')));
});

process.stdin
  .pipe(toUpperCase)
  .pipe(dashBetweenWords)
  .pipe(process.stdout);

Если вы работали с Gulp, приведенный выше код должен казаться вам знакомым. Очень похоже, не правда ли? Однако потоки Gulp различаются в одном специфичном вопросе: мы не передаём данные в буфере, мы используем старые добрые JavaScript объекты.

Объектные потоки

Обычно в стандартных потоках файл рассматривается как возможный входящий источник реальных данных, который необходимо обработать. Вся информация о происхождении, подобно пути или имени файла, теряется после открытия потока.

В Gulp вы работаете не только с содержимым одного или нескольких файлов, вам также нужно имя файла и информация файловой системы о его происхождении.

Представьте 20 файлов JavaScript, которые вы хотите минимизировать. Вы должны запомнить каждое имя файла отдельно и отслеживать, какие данные относятся к каждому из них, чтобы восстановить связь после того, как результат (миниатюрные файлы с таким же именем) должен быть сохранён.

К счастью, Gulp позаботился об этом за вас, создав и новый источник ввода, и тип данных, который можно использовать для ваших потоков: виртуальные файловые объекты.

После открытия Gulp потока все исходные физические файлы оборачиваются в виртуальный файловый объект и обрабатываются в виртуальной файловой системе - Vinyl (соответствующее программное обеспечение вызывается в Gulp).

Vinyl объекты, файловые объекты вашей виртуальной файловой системы, содержат два типа информации: путь, из которого был создан файл, становящийся именем файла, а также поток, отображающий содержимое файла. Эти виртуальные файлы хранятся в памяти вашего компьютера, известной как самый быстрый способ обработки данных.

Именно там происходят все изменения, которые обычно совершаются на вашем жёстком диске. Сохраняя все в памяти и не требуя дорогостоящих операций чтения и записи между процессами, Gulp может вносить изменения чрезвычайно быстро.

Внутри Gulp использует объектные потоки, чтобы выдавать файл за файлом в конвейер обработки. Потоки объектов ведут себя как обычные потоки, но вместо буферов и строк мы проходим через старые добрые объекты JavaScript.

Мы можем создать наш собственный читаемый объектный поток, используя пакет readable-stream:

const through2 = require('through2');
const Readable = require('readable-stream').Readable;

const stream = Readable({objectMode: true});   /* 1 */
stream._read = () => {};                       /* 2 */

setInterval(() => {                            /* 3 */
  stream.push({
    x: Math.random()
  });
}, 100);

const getX = through2.obj((data, enc, cb) => { /* 4 */
  cb(null, `${data.x.toString()}\n`);
});

stream.pipe(getX).pipe(process.stdout);        /* 5 */
  1. Для создания читаемого объекта важно установить флаг objectMode в значение true. При этом поток может передавать объекты JavaScript через конвейер. В противном случае предполагается использование буферов или строк.
  2. Каждому потоку нужна функция _read. Эта функция вызывается, когда поток проверяет данные. Это правильное место, чтобы запустить другие механизмы и вставить обновлённое содержимое в поток. Поскольку мы вставляем данные извне, нам не нужна эта функция, и мы можем оставить её пустой. Однако читаемые потоки должны её реализовывать, иначе мы получим ошибку.
  3. Здесь мы заполняем поток демонстрационными данными. Каждые 100 миллисекунд мы вставляем в наш поток объект со случайным числом.
  4. Так как мы хотим передать результаты объектного потока в process.stdout, а process.stdout принимает только строки, у нас имеется небольшой трансформатор, где мы извлекаем свойство из переданного JavaScript объекта.
  5. Мы создаём конвейер. Наш читаемый объектный поток передаёт все свои данные в трансформатор getX и, наконец, в записываемый process.stdout.

Примечание о пакетах для работы с потоками в Node.js

Вы могли заметить, что для работы с потоками мы используем различные npm-пакеты. Разве это не странно? Потоки очень важны для асинхронного ввода-вывода, не должны ли они быть частью ядра Node.js?

Тем не менее, ядро потоковой передачи постоянно подвергалось изменениям в старые 0.x дни Node.js, поэтому сообщество вмешалось и создало прочный и стабильный API вокруг базовых пакетов. При семантическом версионировании вы можете быть уверены, что экосистема потоков в вашем приложении чувствует себя прекрасно.

Достаточно демо. Давайте сделаем что-нибудь настоящее

Хорошо! Давайте рассмотрим небольшое приложение, считывающее данные CSV и сохраняющее их в JSON. Мы хотим использовать объектные потоки, потому что в некоторых случаях мы можем захотеть изменить данные в зависимости от ситуации. Поскольку потоки очень крутые и позволяют нам это, мы хотим иметь возможность выводить результат в различных форматах.

Во-первых, мы устанавливаем несколько пакетов:

const through2 = require('through2');
const fs = require('fs');
const split = require('split2');
  1. through2 мы уже знаем. Его мы используем для создания всех наших трансформаторов.
  2. Пакет fs очевидно предназначен для чтения и записи файлов. Отличная новость: он позволяет создавать читаемые потоки! Именно то, что нам нужно.
  3. Поскольку вы никогда не знаете, как данные из fs.createReadStream загружаются в вашу память, пакет split2 гарантирует, что вы можете обрабатывать данные построчно. Обратите внимание на «2» в имени этого трансформатора. Она говорит вам, что он является частью более крупной семантически версионируемой экосистемы.

Парсим CSV!

CSV отлично подходит для синтаксического анализа, поскольку он следует за очень простому для понимания формату: запятая означает новый столбец, линия - новую строку.

Легко.

В этом примере первая строка всегда является направляющей для наших данных. Поэтому мы хотим обработать её особым образом: она предоставит ключи для наших объектов JSON.

const parseCSV = () => {
  let templateKeys = [];
  let parseHeadline = true;
  return through2.obj((data, enc, cb) => {       /* 1 */
    if (parseHeadline) {
      templateKeys = data.toString().split(',');
      parseHeadline = false;
      return cb(null, null);                     /* 2 */
    }

    const entries = data.toString().split(',');
    const obj = {};

    templateKeys.forEach((el, index) => {       /* 3 */
      obj[el] = entries[index];
    });

    return cb(null, obj);                       /* 4 */
  });
};
  1. Мы создаём трансформатор для объектного потока. Обратите внимание на метод .obj. Даже если ваши входные данные - это просто строки, вам нужен трансформатор объектного потока, если вы хотите продолжать выпускать объекты.
  2. В этом блоке мы парсим направляющую строку (разбитую на запятые). Это будет наш шаблон для ключей. Мы удаляем эту строку из потока, поэтому передаём null оба раза.
  3. Для всех остальных строк каждый объект мы создаём с помощью шаблона ключей, который мы распарсили ранее.
  4. Мы передаём этот объект на следующий этап.

Это все, что нужно для создания JavaScript объекта из CSV-файла!

Изменение и адаптация данных

Когда у нас есть все возможности объектов, мы можем намного проще преобразовать данные. Удалять свойства и добавлять новые; осуществлять filter, map и reduce. Все, что вы любите. Пример этого не хочется усложнять: выберите первые 10 записей:

const pickFirst10 = () => {
  let cnt = 0;
  return through2.obj((data, enc, cb) => {
    if (cnt++ < 10) {
      return cb(null, data);
    }
    return cb(null, null);
  });
};

Как и в предыдущем примере: Передача данных для второго аргумента обратного вызова означает, что мы сохраняем элемент в потоке. Передача null означает, что мы отбрасываем данные. Это важно для фильтров!

Промываем в JSON

Вы знаете, что означает JSON? Нотация (запись) объектов JavaScript (JavaScript object notation). Это замечательно, потому что у нас есть объекты JavaScript, и мы можем записать их в строковом представлении!

Итак, что мы хотим сделать с объектами в нашем потоке, - это собрать все проходящие объекты и сохранить их в одно строковое представление. Сразу приходит в голову JSON.stringify.

Одна важная вещь, которую вы должны знать при работе с потоками, состоит в том, что после того, как объект (или данные буфера, если это важно) проходит через ваш трансформатор к следующему этапу, в этом этапе он более не доступен.

Это означает, что вы можете передавать объекты только на один записываемый поток. Существует, однако, способ сбора данных и выполнения с ними каких-либо иных действий. Если в потоке больше нет данных, каждый трансформатор вызывает метод flush (промывать).

Подумайте о раковине, наполняющейся жидкостями.

Вы не можете выбрать каждую её каплю и проанализировать её снова. Но вы можете промыть все это до следующего этапа. Это то, что мы делаем в следующем трансформаторе - toJSON:

const toJSON = () => {
  let objs = [];
  return through2.obj(function(data, enc, cb) {
    objs.push(data);                              /* 1 */
    cb(null, null);
  }, function(cb) {                               /* 2 */
    this.push(JSON.stringify(objs));
    cb();
  });
};
  1. Мы собираем все проходящие данные в массив и удаляем объекты из нашего потока.
  2. Во второй функции обратного вызова, метод flush, мы преобразуем собранные данные в строку JSON. С помощью this.push мы помещаем этот новый объект в следующий этап нашего потока. В этом примере новый «объект» - просто строка - что-то, что совместимо с обычными записываемыми потоками!

Gulp, например, использует это поведение при работе с плагинами конкатенации. Чтение всех файлов в первой стадии, а затем очистка одного файла для следующего этапа.

Объединяем все вместе

Снова приходит в голову функциональное программирование: каждый трансформатор, который мы написали, полностью отделен от остальных. И они идеально подходят для разных сценариев, независимо от входных данных или формата вывода.

Единственные ограничения в формате CSV (первая строка - это заголовок) и в том, что pickFirst10 и toJSON нуждаются в объектах JavaScript в качестве входных данных. Давайте объединим их и выведем первые десять записей в JSON в стандартную консоль:

const stream = fs.createReadStream('sample.csv');

stream
  .pipe(split())
  .pipe(parseCSV())
  .pipe(pickFirst10())
  .pipe(toJSON())
  .pipe(process.stdout);

Отлично! Мы столько всего можем передать различным записываемым потокам. В Node.js основной интерфейс ввода-вывода (IO) совместим с потоками. Поэтому давайте использовать быстрый HTTP-сервер и пробрасывать все в Интернет:

const http = require('http');

// All from above
const stream = fs.createReadStream('sample.csv')
  .pipe(split())
  .pipe(parseCSV())
  .pipe(pickFirst10())
  .pipe(toJSON())

const server = http.createServer((req, res) => {
  stream.pipe(res);
});

server.listen(8000);

Это великая сила потоков Node.js. У вас есть асинхронный способ обработки ввода и вывода, и вы можете преобразовывать данные в независимых шагах. С объектными потоками вы можете использовать объекты JavaScript, которые вы знаете и которые любите преобразовывать.

Это основа Gulp как потоковой системы сборки, но также отличный инструмент для повседневной разработки.

Дальнейшее чтение

Если вас зацепили потоки, я могу порекомендовать несколько ресурсов:

  • FrontEnd инструментирование с Gulp, Bower и Yeoman. В поздних главах там можно найти несколько больших глав об инструментах для работы с потоками, например, для слияния потоков.
  • Fettblog. Мой блог содержит множество статей о Gulp и плагинах Gulp. Поскольку все плагины Gulp пишутся как объектные потоки, из них вы что-нибудь да почерпнёте.
  • Руководство по потокам от Substack. Также известен как основной источник информации о потоках.
  • Rod Vagg на потоках ядра. Старый, но все же хороший: объяснение Rod Vagg о том, зачем использовать npm-пакеты для потоков.

Слушайте наш подкаст в iTunes и SoundCloud, читайте нас на Medium, контрибьютьте на GitHub, общайтесь в группе Telegram, следите в Twitter и канале Telegram, рекомендуйте в VK и Facebook.

Статья на Medium