StreamをEvent Handlerを使ってPromiseに変換して成否を待ったり途中で止めたりする
先日 Stream の練習をして分かった気になっていましたが、ダメでした。何がダメだったか。(そもそも普段 Node.js を書いていないのがダメなんだけど。)
- 「結局 EventEmitter」って自分で書いといて結果を同期的に「待って」いた1
- Stream が Node.js 標準 Stream の Event Emitter を実装しているとは限らない
1 については今回は例によって Promise によって解決します。
2 は具体例を挙げておしまいです。ちゃんと調べろ、に尽きます。
StreamのEvent Listenerでエラーを取得する
Node.js Streams: Everything you need to know – freeCodeCamp.org
ここによくまとまっていて、
Stream#on('error', fn)
で書けそうなことが分かる。
(new Reable())
.pipe(new Writable())
.on('error', () => {
..
})
で書ける。ただし、取得して中に処理を書けるだけで返せない。ということは外から処理を与える callback 地獄だ。これは困る。
StreamをPromiseにしてエラー処理のクチを外に出す
イベントと callback が困るなら解決策は Promise になる。たぶん雰囲気はこんな感じだ。
stream2promise(data) {
return new Promise((resolve, reject) => {
(new Reable())
.pipe(new Writable())
.on('error', (err) => {
return reject(err)
})
.on('finish', () => {
resolve()
})
})
}
うん、なんかイケそう。
ところが実際にはすべての Stream でエラーは起きるし、'error' event そのものは stream 上を伝播したりしないし、'error' event が emit されても処理が止まるわけではないので、これでは不十分。
Stream | Node.js v10.12.0 Documentation
The stream is not closed when the 'error' event is emitted.
具体的には Reable Stream で 'error' が起きた場合でも stream 自体を cancel する人がいなければ Writable Stream の 'finish' event は emit されてしまう。ということは 'error' が起きようが何しようが resolve() が呼ばれる、つまり正常終了してしまうということだ。「'error' event は起きているが正常終了する。」これは期待する動作ではない。
Streamをちゃんと止める
そのためには各 Stream に対してきちんとエラー処理が必要ということになる。
ということでできあがりは以下のようになる。
stream2promise(data) {
return new Promise((resolve, reject) => {
const reable = new Reable()
const writable = new Writable()
reable.pipe(writable)
reable.on('error', (err) {
reable.unpipe() // 以降の Stream で余計な event が emit されないように
return reject(err) // 起きたエラーを reject に渡す
})
writable.on('error', (err) {
return reject(err)
})
writable.on('finish', () => {
resolve() // ここに何を渡すかは思案のしどころ
})
reable.push(data)
reable.push(null) // これは標準の Stream の API に従うとこうなる
})
}
こんな感じ。
うーん、Promise は呼ぶ側はまだ比較的マシだけど、中身はアレな感じになりやすいねぇ。まー何はともあれ、これで以下のように Stream 処理を Promise で受け取れるようになる。
stream2promise(data)
.catch((err) => {
..
}).then(() => {
..
})
StreamのEventEmitter実装状況には注意が必要
例えば今回書いたコードは Node.js 標準の Stream を基本にしている。前回、「readable#push(null) とか普通書かないだろ」みたいなことを言っていたが、あれは間違いだったことが分かった。実は前回書いたコードでは今回のようなことはできないのだ。
前回は
- memory-streams - npm
- process.stdout
を利用したが、
- memory-streams.ReadableStream.on('end') は emit されない
- ということは WritableStream.on('finish') も emit されない2
- process.stdout.on('finish') は emit されない
つまり、意図通りに正常終了を判定するのは難しいです。
また memory-streams と似たような機能を持つ
を利用すると stream 処理できない object を String に無理やり変換してしまうので [object Object] が Writable Stream に渡ってしまう。
「メモリの中身を Stream っぽく扱えるコードはすぐできそうじゃね?」と思っていたけど、自分がそんな発想をするくらいなのでやはりカジュアルなコードが多いみたいで、ちゃんと標準の API を使った方がいいなと思い直しましたとさ。