2018-09-29

StreamをEvent Handlerを使ってPromiseに変換して成否を待ったり途中で止めたりする

先日 Stream の練習をして分かった気になっていましたが、ダメでした。何がダメだったか。(そもそも普段 Node.js を書いていないのがダメなんだけど。)

  1. 「結局 EventEmitter」って自分で書いといて結果を同期的に「待って」いた1
  2. Stream が Node.js 標準 Stream の Event Emitter を実装しているとは限らない

1 については今回は例によって Promise によって解決します。

2 は具体例を挙げておしまいです。ちゃんと調べろ、に尽きます。

StreamのEvent Listenerでエラーを取得する

Node.js Streams: Everything you need to know – freeCodeCamp.org

ここによくまとまっていて、

Stream#on('error', fn)

で書けそうなことが分かる。

(new Reable())
.pipe(new Writable())
.on('error', () => {
  ..
})

で書ける。ただし、取得して中に処理を書けるだけで返せない。ということは外から処理を与える callback 地獄だ。これは困る。

StreamをPromiseにしてエラー処理のクチを外に出す

イベントと callback が困るなら解決策は Promise になる。たぶん雰囲気はこんな感じだ。

stream2promise(data) {
  return new Promise((resolve, reject) => {
    (new Reable())
    .pipe(new Writable())
    .on('error', (err) => {
      return reject(err)
    })
    .on('finish', () => {
      resolve()
    })
  })
}

うん、なんかイケそう。

ところが実際にはすべての Stream でエラーは起きるし、'error' event そのものは stream 上を伝播したりしないし、'error' event が emit されても処理が止まるわけではないので、これでは不十分。

Stream | Node.js v10.12.0 Documentation

The stream is not closed when the 'error' event is emitted.

具体的には Reable Stream で 'error' が起きた場合でも stream 自体を cancel する人がいなければ Writable Stream の 'finish' event は emit されてしまう。ということは 'error' が起きようが何しようが resolve() が呼ばれる、つまり正常終了してしまうということだ。「'error' event は起きているが正常終了する。」これは期待する動作ではない。

Streamをちゃんと止める

そのためには各 Stream に対してきちんとエラー処理が必要ということになる。

ということでできあがりは以下のようになる。

stream2promise(data) {
  return new Promise((resolve, reject) => {
    const reable   = new Reable()
    const writable = new Writable()

    reable.pipe(writable)

    reable.on('error', (err) {
      reable.unpipe()     // 以降の Stream で余計な event が emit されないように
      return reject(err)  // 起きたエラーを reject に渡す
    })

    writable.on('error', (err) {
      return reject(err)
    })

    writable.on('finish', () => {
      resolve()  // ここに何を渡すかは思案のしどころ
    })

    reable.push(data)
    reable.push(null) // これは標準の Stream の API に従うとこうなる
  })
}

こんな感じ。

うーん、Promise は呼ぶ側はまだ比較的マシだけど、中身はアレな感じになりやすいねぇ。まー何はともあれ、これで以下のように Stream 処理を Promise で受け取れるようになる。

stream2promise(data)
.catch((err) => {
  ..
}).then(() => {
  ..
})

StreamのEventEmitter実装状況には注意が必要

例えば今回書いたコードは Node.js 標準の Stream を基本にしている。前回、「readable#push(null) とか普通書かないだろ」みたいなことを言っていたが、あれは間違いだったことが分かった。実は前回書いたコードでは今回のようなことはできないのだ。

前回は

を利用したが、

  • memory-streams.ReadableStream.on('end') は emit されない
    • ということは WritableStream.on('finish') も emit されない2
  • process.stdout.on('finish') は emit されない

つまり、意図通りに正常終了を判定するのは難しいです。

また memory-streams と似たような機能を持つ

を利用すると stream 処理できない object を String に無理やり変換してしまうので [object Object] が Writable Stream に渡ってしまう。

「メモリの中身を Stream っぽく扱えるコードはすぐできそうじゃね?」と思っていたけど、自分がそんな発想をするくらいなのでやはりカジュアルなコードが多いみたいで、ちゃんと標準の API を使った方がいいなと思い直しましたとさ。

  1. 以前 Promise を使って解決したのと同じなのに 

  2. readが終わらないとwriteも終わらないようだ。実際の処理が終わっていてもこれを検知することができないし、もしかしたらモノによってはwritable streamの動作に支障が出るかもしれない。 

About

例によって個人のなんちゃらです

Recent Posts

Categories

Tool 日々 Web Biz Net Apple MS ことば News Unix howto Food PHP Movie Edu Community Book Security Text TV Perl Ruby Music Pdoc 生き方 RDoc ViewCVS CVS Rsync Disk Mail FreeBSD Cygwin PDF Photo Zebedee Debian OSX Comic Cron Sysadmin Font Analog iCal Sunbird DNS Linux Wiki Emacs Thunderbird Sitecopy Terminal Drawing tDiary AppleScript Life Money Omni PukiWiki Xen XREA Zsh Screen CASL Firefox Fink zsh haXe Ecmascript PATH_INFO SQLite PEAR Lighttpd FastCGI Subversion au prototype.js jsUnit Apache Trac Template Java Rhino Mochikit Feed Bloglines CSS del.icio.us SBS qwikWeb gettext Ajax JSDoc Rails HTML CHM EPWING NDTP EB IE CLI ck ThinkPad Toy WSH RFC readline rlwrap ImageMagick epeg Frenzy sysprep Ubuntu MeCab DTP ERD DBMS eclipse Eclipse Awk RD Diigo XAMPP RubyGems PHPDoc iCab DOM YAML Camino Geekmonkey w3m Scheme Gauche Lisp JSAN Google VMware DSL SLAX Safari Markdown Textile IRC Jabber Fastladder MacPorts LLSpirit CPAN Mozilla Twitter OpenFL Rswatch ITS NTP GUI Pragger Yapra XML Mobile Git Study JSON VirtualBox Samba Pear Growl Mercurial Rack Capistrano Rake Win RSS Mechanize Sitemaps Android JavaScript Python RTM OOo iPod Yahoo Unicode Github iTunes God SBM friendfeed Friendfeed HokuUn Sinatra TDD Test Project Evernote iPad Geohash Location Map Search Simplenote Image WebKit RSpec Phone CSV WiMAX USB Chrome RubyKaigi RubyKaigi2011 Space CoffeeScript Nokogiri Hpricot Rubygems jQuery Node GTD CI UX Design VCS Kanazawa.rb Kindle Amazon Agile Vagrant Chef Windows Composer Dotenv PaaS Itamae SaaS Docker Swagger Grape WebAPI Microservices OmniAuth HTTP 分析基盤 CDN Terraform IaaS HCL Webpack Vue.js BigQuery Middleman CMS AWS PNG Laravel Selenium OAuth OpenAPI GitHub UML GCP TypeScript SQL Hanami Document SVG AsciiDoc Pandoc DocBook Develop Jekyll macOS Node.js Vite Heroku Transformer AI Data Cloud Wasm