Node.jsのStreamのpipeの練習

(他の言語での成り立ちなど)Streamの概念的な話ではなく、とりあえず Node.js の Stream を使ったコードが動くまでが分かりにくかったので練習したよ、ってだけの話。1

特に、例題として上がりやすいのはファイルを読み込みながらファイルの書き出しや HTTP レスポンスを返すとかそういうやつなんだけど、その説明は Node.js のよくある用途的には分からなくはないけど、自分の場合はそういうことよりもメモリの中身と stdout の方が分かりやすかったのでそれでやってみて、自分なりにまとめてみた。

※ 分かりやすさを重視してコードを触っていましたが、やはりダメでした。続きは StreamをEventHandlerを使ってPromiseに変換して成否を待ったり途中で止めたりする で。

Streamのメリット

大きなデータを小分けに扱うことができるので、Node.js の特徴であるシングルスレッド、イベントドリブン、ノンブロッキングI/Oの特徴を活かしやすい。

もちろんメモリ消費も抑えることができる。

Streamの種類

Stream には種類があり、以下の3つが基本。(Duplex はあえて省略。)

  • Readable
  • Writable
  • Transform

基本的な使い方と罠

誇張や嘘はあるかもしれないが、以下のように割り切っておくと分かりやすい。

  • Node.js では Stream は結局のところ EventEmitter である
  • Read Event が発生しないと次に処理が進まない

つまり、なんらかの出力を行いたい場合は最低でも Readable Stream と Writable Stream をワンセットで使う必要がある。

※ 出力を行わない場合は Readable Stream で渡ってくる chunk を淡々と処理するのでもよいが、今回はそれは扱わない。

メモリの内容をstdoutにstreamしてみる

最低限必要な準備はこんな感じ。

const {Readable} = require('stream')

const readable = new Readable()
readable.pipe(process.stdout)

これで「Readable Stream にデータが渡ってきたら STDOUT に出力するよー」という意味になる。

実は Node.js では process.stdin が Readable Stream で process.stdout が Writable Stream なので、Unix パイプを awk っぽく扱うのに向いている。ただし process.stdin を process.stdout に pipe するだけだと Node.js 要らないじゃんて感じになるので、あくまで Node.js の中のメモリの情報を stdout に出力してみることとする。

というわけで Readable Stream にデータを送ってやる。

readable.push("Hello World\n")
readable.push(null)

これでできあがり。無事に STDOUT に Hello World と表示される。

全体像は以下のようになる。

const {Readable} = require('stream')

// stream の構築
const readable = new Readable()
readable.pipe(process.stdout)

// stream にデータを流す
readable.push("Hello World\n")
readable.push(null)

もうちょっと本格的にメモリの内容をReadable Streamにする

実際には Readable Stream にわざわざ push しなきゃいけないシーンはないというか、こういう書き方は書く順番に依存しすぎてるし、おまじないじみててよろしくないので、実際にやる場合はもうちょっとなんとかした方がよい。

そこで以下を追加する。(似たような npm はいくつもあるが、これが人気っぽい)

memory-streams - npm

実際のコードはこんな感じになる。

const streams = require('memory-streams')

(new streams.ReadableStream(
  JSON.stringify({
    foo: 'bar'
  }) + "\n"))
  .pipe(WritableStream)

これで Writable Stream しか用意されていない pkgcloud/pkgcloud の Storage のようなものにもメモリから直接書き込むコードを分かりやすく書くことができるようになる。

ただし、メモリの内容を Stream で扱う際に注意しなければいけないのは、Stream は小分けに順番に処理するためのものなので、Array や String など順番が決まっているものしか扱えないということ。そのため上の例では Object を JSON に変換しているが、もし元の Object そのものが巨大な場合はその部分で小分けにするのは不可能である。

※ JSONStream という package もあるが、これはあくまで巨大なデータに対する JSON.parse を Stream ベースで行うためのものと割り切った方がよさそう。JSON.stringify を効率的に行うにはいずれにせよ Array を基本に置く必要がある。

参考

ついでにブラウザ周りも

ブラウザーは Fetch や XHR などの操作を完了前に中止させることができる AbortController および AbortSignal インターフェイス(つまり Abort API)に実験的に対応し始めています。詳しくはインターフェイスのページを参照してください。

2018-09-24 確認時点で

IE は非対応、Firefox は Stream には一部設定で対応、という状況らしい。

こういう微妙な差異をすべて開発者でフォローするのは大変つらい。

  1. 例によって3, 4年遅れの話なんだけど、まぁ避け続けてた Node.js でこの程度の遅れならだいぶマシな気がしている。 

More