StreamをEvent Handlerを使ってPromiseに変換して成否を待ったり途中で止めたりする

先日 Stream の練習をして分かった気になっていましたが、ダメでした。何がダメだったか。(そもそも普段 Node.js を書いていないのがダメなんだけど。)

  1. 「結局 EventEmitter」って自分で書いといて結果を同期的に「待って」いた1
  2. Stream が Node.js 標準 Stream の Event Emitter を実装しているとは限らない

1 については今回は例によって Promise によって解決します。

2 は具体例を挙げておしまいです。ちゃんと調べろ、に尽きます。

StreamのEvent Listenerでエラーを取得する

Node.js Streams: Everything you need to know – freeCodeCamp.org

ここによくまとまっていて、

Stream#on('error', fn)

で書けそうなことが分かる。

(new Reable())
.pipe(new Writable())
.on('error', () => {
  ..
})

で書ける。ただし、取得して中に処理を書けるだけで返せない。ということは外から処理を与える callback 地獄だ。これは困る。

StreamをPromiseにしてエラー処理のクチを外に出す

イベントと callback が困るなら解決策は Promise になる。たぶん雰囲気はこんな感じだ。

stream2promise(data) {
  return new Promise((resolve, reject) => {
    (new Reable())
    .pipe(new Writable())
    .on('error', (err) => {
      return reject(err)
    })
    .on('finish', () => {
      resolve()
    })
  })
}

うん、なんかイケそう。

ところが実際にはすべての Stream でエラーは起きるし、'error' event そのものは stream 上を伝播したりしないし、'error' event が emit されても処理が止まるわけではないので、これでは不十分。

Stream | Node.js v10.12.0 Documentation

The stream is not closed when the 'error' event is emitted.

具体的には Reable Stream で 'error' が起きた場合でも stream 自体を cancel する人がいなければ Writable Stream の 'finish' event は emit されてしまう。ということは 'error' が起きようが何しようが resolve() が呼ばれる、つまり正常終了してしまうということだ。「'error' event は起きているが正常終了する。」これは期待する動作ではない。

Streamをちゃんと止める

そのためには各 Stream に対してきちんとエラー処理が必要ということになる。

ということでできあがりは以下のようになる。

stream2promise(data) {
  return new Promise((resolve, reject) => {
    const reable   = new Reable()
    const writable = new Writable()

    reable.pipe(writable)

    reable.on('error', (err) {
      reable.unpipe()     // 以降の Stream で余計な event が emit されないように
      return reject(err)  // 起きたエラーを reject に渡す
    })

    writable.on('error', (err) {
      return reject(err)
    })

    writable.on('finish', () => {
      resolve()  // ここに何を渡すかは思案のしどころ
    })

    reable.push(data)
    reable.push(null) // これは標準の Stream の API に従うとこうなる
  })
}

こんな感じ。

うーん、Promise は呼ぶ側はまだ比較的マシだけど、中身はアレな感じになりやすいねぇ。まー何はともあれ、これで以下のように Stream 処理を Promise で受け取れるようになる。

stream2promise(data)
.catch((err) => {
  ..
}).then(() => {
  ..
})

StreamのEventEmitter実装状況には注意が必要

例えば今回書いたコードは Node.js 標準の Stream を基本にしている。前回、「readable#push(null) とか普通書かないだろ」みたいなことを言っていたが、あれは間違いだったことが分かった。実は前回書いたコードでは今回のようなことはできないのだ。

前回は

を利用したが、

  • memory-streams.ReadableStream.on('end') は emit されない
    • ということは WritableStream.on('finish') も emit されない2
  • process.stdout.on('finish') は emit されない

つまり、意図通りに正常終了を判定するのは難しいです。

また memory-streams と似たような機能を持つ

を利用すると stream 処理できない object を String に無理やり変換してしまうので [object Object] が Writable Stream に渡ってしまう。

「メモリの中身を Stream っぽく扱えるコードはすぐできそうじゃね?」と思っていたけど、自分がそんな発想をするくらいなのでやはりカジュアルなコードが多いみたいで、ちゃんと標準の API を使った方がいいなと思い直しましたとさ。

  1. 以前 Promise を使って解決したのと同じなのに 

  2. readが終わらないとwriteも終わらないようだ。実際の処理が終わっていてもこれを検知することができないし、もしかしたらモノによってはwritable streamの動作に支障が出るかもしれない。 

More