b59944804019fce94907363f3a4d92d1fa3b69c95c85b2b59eff1ab5f3876b3fafcc1535cf162a9cd449068240fc293735e5b8ff5f670c7cc8a5cd937fab1b 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728
  1. # minipass
  2. A _very_ minimal implementation of a [PassThrough
  3. stream](https://nodejs.org/api/stream.html#stream_class_stream_passthrough)
  4. [It's very
  5. fast](https://docs.google.com/spreadsheets/d/1oObKSrVwLX_7Ut4Z6g3fZW-AX1j1-k6w-cDsrkaSbHM/edit#gid=0)
  6. for objects, strings, and buffers.
  7. Supports `pipe()`ing (including multi-`pipe()` and backpressure transmission),
  8. buffering data until either a `data` event handler or `pipe()` is added (so
  9. you don't lose the first chunk), and most other cases where PassThrough is
  10. a good idea.
  11. There is a `read()` method, but it's much more efficient to consume data
  12. from this stream via `'data'` events or by calling `pipe()` into some other
  13. stream. Calling `read()` requires the buffer to be flattened in some
  14. cases, which requires copying memory.
  15. If you set `objectMode: true` in the options, then whatever is written will
  16. be emitted. Otherwise, it'll do a minimal amount of Buffer copying to
  17. ensure proper Streams semantics when `read(n)` is called.
  18. `objectMode` can also be set by doing `stream.objectMode = true`, or by
  19. writing any non-string/non-buffer data. `objectMode` cannot be set to
  20. false once it is set.
  21. This is not a `through` or `through2` stream. It doesn't transform the
  22. data, it just passes it right through. If you want to transform the data,
  23. extend the class, and override the `write()` method. Once you're done
  24. transforming the data however you want, call `super.write()` with the
  25. transform output.
  26. For some examples of streams that extend Minipass in various ways, check
  27. out:
  28. - [minizlib](http://npm.im/minizlib)
  29. - [fs-minipass](http://npm.im/fs-minipass)
  30. - [tar](http://npm.im/tar)
  31. - [minipass-collect](http://npm.im/minipass-collect)
  32. - [minipass-flush](http://npm.im/minipass-flush)
  33. - [minipass-pipeline](http://npm.im/minipass-pipeline)
  34. - [tap](http://npm.im/tap)
  35. - [tap-parser](http://npm.im/tap-parser)
  36. - [treport](http://npm.im/treport)
  37. - [minipass-fetch](http://npm.im/minipass-fetch)
  38. - [pacote](http://npm.im/pacote)
  39. - [make-fetch-happen](http://npm.im/make-fetch-happen)
  40. - [cacache](http://npm.im/cacache)
  41. - [ssri](http://npm.im/ssri)
  42. - [npm-registry-fetch](http://npm.im/npm-registry-fetch)
  43. - [minipass-json-stream](http://npm.im/minipass-json-stream)
  44. - [minipass-sized](http://npm.im/minipass-sized)
  45. ## Differences from Node.js Streams
  46. There are several things that make Minipass streams different from (and in
  47. some ways superior to) Node.js core streams.
  48. Please read these caveats if you are familiar with node-core streams and
  49. intend to use Minipass streams in your programs.
  50. You can avoid most of these differences entirely (for a very
  51. small performance penalty) by setting `{async: true}` in the
  52. constructor options.
  53. ### Timing
  54. Minipass streams are designed to support synchronous use-cases. Thus, data
  55. is emitted as soon as it is available, always. It is buffered until read,
  56. but no longer. Another way to look at it is that Minipass streams are
  57. exactly as synchronous as the logic that writes into them.
  58. This can be surprising if your code relies on `PassThrough.write()` always
  59. providing data on the next tick rather than the current one, or being able
  60. to call `resume()` and not have the entire buffer disappear immediately.
  61. However, without this synchronicity guarantee, there would be no way for
  62. Minipass to achieve the speeds it does, or support the synchronous use
  63. cases that it does. Simply put, waiting takes time.
  64. This non-deferring approach makes Minipass streams much easier to reason
  65. about, especially in the context of Promises and other flow-control
  66. mechanisms.
  67. Example:
  68. ```js
  69. const Minipass = require('minipass')
  70. const stream = new Minipass()
  71. stream.on('data', () => console.log('data event'))
  72. console.log('before write')
  73. stream.write('hello')
  74. console.log('after write')
  75. // output:
  76. // before write
  77. // data event
  78. // after write
  79. ```
  80. ### Exception: Async Opt-In
  81. If you wish to have a Minipass stream with behavior that more
  82. closely mimics Node.js core streams, you can set the stream in
  83. async mode either by setting `async: true` in the constructor
  84. options, or by setting `stream.async = true` later on.
  85. ```js
  86. const Minipass = require('minipass')
  87. const asyncStream = new Minipass({ async: true })
  88. asyncStream.on('data', () => console.log('data event'))
  89. console.log('before write')
  90. asyncStream.write('hello')
  91. console.log('after write')
  92. // output:
  93. // before write
  94. // after write
  95. // data event <-- this is deferred until the next tick
  96. ```
  97. Switching _out_ of async mode is unsafe, as it could cause data
  98. corruption, and so is not enabled. Example:
  99. ```js
  100. const Minipass = require('minipass')
  101. const stream = new Minipass({ encoding: 'utf8' })
  102. stream.on('data', chunk => console.log(chunk))
  103. stream.async = true
  104. console.log('before writes')
  105. stream.write('hello')
  106. setStreamSyncAgainSomehow(stream) // <-- this doesn't actually exist!
  107. stream.write('world')
  108. console.log('after writes')
  109. // hypothetical output would be:
  110. // before writes
  111. // world
  112. // after writes
  113. // hello
  114. // NOT GOOD!
  115. ```
  116. To avoid this problem, once set into async mode, any attempt to
  117. make the stream sync again will be ignored.
  118. ```js
  119. const Minipass = require('minipass')
  120. const stream = new Minipass({ encoding: 'utf8' })
  121. stream.on('data', chunk => console.log(chunk))
  122. stream.async = true
  123. console.log('before writes')
  124. stream.write('hello')
  125. stream.async = false // <-- no-op, stream already async
  126. stream.write('world')
  127. console.log('after writes')
  128. // actual output:
  129. // before writes
  130. // after writes
  131. // hello
  132. // world
  133. ```
  134. ### No High/Low Water Marks
  135. Node.js core streams will optimistically fill up a buffer, returning `true`
  136. on all writes until the limit is hit, even if the data has nowhere to go.
  137. Then, they will not attempt to draw more data in until the buffer size dips
  138. below a minimum value.
  139. Minipass streams are much simpler. The `write()` method will return `true`
  140. if the data has somewhere to go (which is to say, given the timing
  141. guarantees, that the data is already there by the time `write()` returns).
  142. If the data has nowhere to go, then `write()` returns false, and the data
  143. sits in a buffer, to be drained out immediately as soon as anyone consumes
  144. it.
  145. Since nothing is ever buffered unnecessarily, there is much less
  146. copying data, and less bookkeeping about buffer capacity levels.
  147. ### Hazards of Buffering (or: Why Minipass Is So Fast)
  148. Since data written to a Minipass stream is immediately written all the way
  149. through the pipeline, and `write()` always returns true/false based on
  150. whether the data was fully flushed, backpressure is communicated
  151. immediately to the upstream caller. This minimizes buffering.
  152. Consider this case:
  153. ```js
  154. const {PassThrough} = require('stream')
  155. const p1 = new PassThrough({ highWaterMark: 1024 })
  156. const p2 = new PassThrough({ highWaterMark: 1024 })
  157. const p3 = new PassThrough({ highWaterMark: 1024 })
  158. const p4 = new PassThrough({ highWaterMark: 1024 })
  159. p1.pipe(p2).pipe(p3).pipe(p4)
  160. p4.on('data', () => console.log('made it through'))
  161. // this returns false and buffers, then writes to p2 on next tick (1)
  162. // p2 returns false and buffers, pausing p1, then writes to p3 on next tick (2)
  163. // p3 returns false and buffers, pausing p2, then writes to p4 on next tick (3)
  164. // p4 returns false and buffers, pausing p3, then emits 'data' and 'drain'
  165. // on next tick (4)
  166. // p3 sees p4's 'drain' event, and calls resume(), emitting 'resume' and
  167. // 'drain' on next tick (5)
  168. // p2 sees p3's 'drain', calls resume(), emits 'resume' and 'drain' on next tick (6)
  169. // p1 sees p2's 'drain', calls resume(), emits 'resume' and 'drain' on next
  170. // tick (7)
  171. p1.write(Buffer.alloc(2048)) // returns false
  172. ```
  173. Along the way, the data was buffered and deferred at each stage, and
  174. multiple event deferrals happened, for an unblocked pipeline where it was
  175. perfectly safe to write all the way through!
  176. Furthermore, setting a `highWaterMark` of `1024` might lead someone reading
  177. the code to think an advisory maximum of 1KiB is being set for the
  178. pipeline. However, the actual advisory buffering level is the _sum_ of
  179. `highWaterMark` values, since each one has its own bucket.
  180. Consider the Minipass case:
  181. ```js
  182. const m1 = new Minipass()
  183. const m2 = new Minipass()
  184. const m3 = new Minipass()
  185. const m4 = new Minipass()
  186. m1.pipe(m2).pipe(m3).pipe(m4)
  187. m4.on('data', () => console.log('made it through'))
  188. // m1 is flowing, so it writes the data to m2 immediately
  189. // m2 is flowing, so it writes the data to m3 immediately
  190. // m3 is flowing, so it writes the data to m4 immediately
  191. // m4 is flowing, so it fires the 'data' event immediately, returns true
  192. // m4's write returned true, so m3 is still flowing, returns true
  193. // m3's write returned true, so m2 is still flowing, returns true
  194. // m2's write returned true, so m1 is still flowing, returns true
  195. // No event deferrals or buffering along the way!
  196. m1.write(Buffer.alloc(2048)) // returns true
  197. ```
  198. It is extremely unlikely that you would _not_ want to buffer data that has
  199. been written, or that you would _ever_ want to buffer data that can be flushed all the way through. Neither
  200. node-core streams nor Minipass ever fail to buffer written data, but
  201. node-core streams do a lot of unnecessary buffering and pausing.
  202. As always, the faster implementation is the one that does less stuff and
  203. waits less time to do it.
  204. ### Immediately emit `end` for empty streams (when not paused)
  205. If a stream is not paused, and `end()` is called before writing any data
  206. into it, then it will emit `end` immediately.
  207. If you have logic that occurs on the `end` event which you don't want to
  208. potentially happen immediately (for example, closing file descriptors,
  209. moving on to the next entry in an archive parse stream, etc.) then be sure
  210. to call `stream.pause()` on creation, and then `stream.resume()` once you
  211. are ready to respond to the `end` event.
  212. However, this is _usually_ not a problem because:
  213. ### Emit `end` When Asked
  214. One hazard of immediately emitting `'end'` is that you may not yet have had
  215. a chance to add a listener. In order to avoid this hazard, Minipass
  216. streams safely re-emit the `'end'` event if a new listener is added after
  217. `'end'` has been emitted.
  218. Ie, if you do `stream.on('end', someFunction)`, and the stream has already
  219. emitted `end`, then it will call the handler right away. (You can think of
  220. this somewhat like attaching a new `.then(fn)` to a previously-resolved
  221. Promise.)
  222. To avoid calling handlers multiple times when they would not expect multiple
  223. ends to occur, all listeners are removed from the `'end'` event whenever it
  224. is emitted.
  225. ### Emit `error` When Asked
  226. The most recent error object passed to the `'error'` event is
  227. stored on the stream. If a new `'error'` event handler is added,
  228. and an error was previously emitted, then the event handler will
  229. be called immediately (or on `process.nextTick` in the case of
  230. async streams).
  231. This makes it much more difficult to end up trying to interact
  232. with a broken stream, if the error handler is added after an
  233. error was previously emitted.
  234. ### Impact of "immediate flow" on Tee-streams
  235. A "tee stream" is a stream piping to multiple destinations:
  236. ```js
  237. const tee = new Minipass()
  238. tee.pipe(dest1)
  239. tee.pipe(dest2)
  240. tee.write('foo') // goes to both destinations
  241. ```
  242. Since Minipass streams _immediately_ process any pending data through the
  243. pipeline when a new pipe destination is added, this can have surprising
  244. effects, especially when a stream comes in from some other function and may
  245. or may not have data in its buffer.
  246. ```js
  247. // WARNING! WILL LOSE DATA!
  248. const src = new Minipass()
  249. src.write('foo')
  250. src.pipe(dest1) // 'foo' chunk flows to dest1 immediately, and is gone
  251. src.pipe(dest2) // gets nothing!
  252. ```
  253. One solution is to create a dedicated tee-stream junction that pipes to
  254. both locations, and then pipe to _that_ instead.
  255. ```js
  256. // Safe example: tee to both places
  257. const src = new Minipass()
  258. src.write('foo')
  259. const tee = new Minipass()
  260. tee.pipe(dest1)
  261. tee.pipe(dest2)
  262. src.pipe(tee) // tee gets 'foo', pipes to both locations
  263. ```
  264. The same caveat applies to `on('data')` event listeners. The first one
  265. added will _immediately_ receive all of the data, leaving nothing for the
  266. second:
  267. ```js
  268. // WARNING! WILL LOSE DATA!
  269. const src = new Minipass()
  270. src.write('foo')
  271. src.on('data', handler1) // receives 'foo' right away
  272. src.on('data', handler2) // nothing to see here!
  273. ```
  274. A dedicated tee-stream can be used in this case as well:
  275. ```js
  276. // Safe example: tee to both data handlers
  277. const src = new Minipass()
  278. src.write('foo')
  279. const tee = new Minipass()
  280. tee.on('data', handler1)
  281. tee.on('data', handler2)
  282. src.pipe(tee)
  283. ```
  284. All of the hazards in this section are avoided by setting `{
  285. async: true }` in the Minipass constructor, or by setting
  286. `stream.async = true` afterwards. Note that this does add some
  287. overhead, so should only be done in cases where you are willing
  288. to lose a bit of performance in order to avoid having to refactor
  289. program logic.
  290. ## USAGE
  291. It's a stream! Use it like a stream and it'll most likely do what you
  292. want.
  293. ```js
  294. const Minipass = require('minipass')
  295. const mp = new Minipass(options) // optional: { encoding, objectMode, async }
  296. mp.write('foo')
  297. mp.pipe(someOtherStream)
  298. mp.end('bar')
  299. ```
  300. ### OPTIONS
  301. * `encoding` How would you like the data coming _out_ of the stream to be
  302. encoded? Accepts any values that can be passed to `Buffer.toString()`.
  303. * `objectMode` Emit data exactly as it comes in. This will be flipped on
  304. by default if you write() something other than a string or Buffer at any
  305. point. Setting `objectMode: true` will prevent setting any encoding
  306. value.
  307. * `async` Defaults to `false`. Set to `true` to defer data
  308. emission until next tick. This reduces performance slightly,
  309. but makes Minipass streams use timing behavior closer to Node
  310. core streams. See [Timing](#timing) for more details.
  311. ### API
  312. Implements the user-facing portions of Node.js's `Readable` and `Writable`
  313. streams.
  314. ### Methods
  315. * `write(chunk, [encoding], [callback])` - Put data in. (Note that, in the
  316. base Minipass class, the same data will come out.) Returns `false` if
  317. the stream will buffer the next write, or true if it's still in "flowing"
  318. mode.
  319. * `end([chunk, [encoding]], [callback])` - Signal that you have no more
  320. data to write. This will queue an `end` event to be fired when all the
  321. data has been consumed.
  322. * `setEncoding(encoding)` - Set the encoding for data coming out of the stream.
  323. This can only be done once.
  324. * `pause()` - No more data for a while, please. This also prevents `end`
  325. from being emitted for empty streams until the stream is resumed.
  326. * `resume()` - Resume the stream. If there's data in the buffer, it is all
  327. discarded. Any buffered events are immediately emitted.
  328. * `pipe(dest)` - Send all output to the stream provided. When
  329. data is emitted, it is immediately written to any and all pipe
  330. destinations. (Or written on next tick in `async` mode.)
  331. * `unpipe(dest)` - Stop piping to the destination stream. This
  332. is immediate, meaning that any asynchronously queued data will
  333. _not_ make it to the destination when running in `async` mode.
  334. * `options.end` - (Option for `pipe()`.) Boolean, end the destination stream when
  335. the source stream ends. Default `true`.
  336. * `options.proxyErrors` - (Option for `pipe()`.) Boolean, proxy `error` events from
  337. the source stream to the destination stream. Note that
  338. errors are _not_ proxied after the pipeline terminates,
  339. either due to the source emitting `'end'` or manually
  340. unpiping with `src.unpipe(dest)`. Default `false`.
  341. * `on(ev, fn)`, `emit(ev, ...data)` - Minipass streams are EventEmitters. Some
  342. events are given special treatment, however. (See below under "events".)
  343. * `promise()` - Returns a Promise that resolves when the stream emits
  344. `end`, or rejects if the stream emits `error`.
  345. * `collect()` - Return a Promise that resolves on `end` with an array
  346. containing each chunk of data that was emitted, or rejects if the stream
  347. emits `error`. Note that this consumes the stream data.
  348. * `concat()` - Same as `collect()`, but concatenates the data into a single
  349. Buffer object. Will reject the returned promise if the stream is in
  350. objectMode, or if it goes into objectMode by the end of the data.
  351. * `read(n)` - Consume `n` bytes of data out of the buffer. If `n` is not
  352. provided, then consume all of it. If `n` bytes are not available, then
  353. it returns null. **Note** consuming streams in this way is less
  354. efficient, and can lead to unnecessary Buffer copying.
  355. * `destroy([er])` - Destroy the stream. If an error is provided, then an
  356. `'error'` event is emitted. If the stream has a `close()` method, and
  357. has not emitted a `'close'` event yet, then `stream.close()` will be
  358. called. Any Promises returned by `.promise()`, `.collect()` or
  359. `.concat()` will be rejected. After being destroyed, writing to the
  360. stream will emit an error. No more data will be emitted if the stream is
  361. destroyed, even if it was previously buffered.
  362. ### Properties
  363. * `bufferLength` Read-only. Total number of bytes buffered, or in the case
  364. of objectMode, the total number of objects.
  365. * `encoding` The encoding that has been set. (Setting this is equivalent
  366. to calling `setEncoding(enc)` and has the same prohibition against
  367. setting multiple times.)
  368. * `flowing` Read-only. Boolean indicating whether a chunk written to the
  369. stream will be immediately emitted.
  370. * `emittedEnd` Read-only. Boolean indicating whether the end-ish events
  371. (ie, `end`, `prefinish`, `finish`) have been emitted. Note that
  372. listening on any end-ish event will immediately re-emit it if it has
  373. already been emitted.
  374. * `writable` Whether the stream is writable. Default `true`. Set to
  375. `false` when `end()` is called.
  376. * `readable` Whether the stream is readable. Default `true`.
  377. * `buffer` A [yallist](http://npm.im/yallist) linked list of chunks written
  378. to the stream that have not yet been emitted. (It's probably a bad idea
  379. to mess with this.)
  380. * `pipes` A [yallist](http://npm.im/yallist) linked list of streams that
  381. this stream is piping into. (It's probably a bad idea to mess with
  382. this.)
  383. * `destroyed` A getter that indicates whether the stream was destroyed.
  384. * `paused` True if the stream has been explicitly paused, otherwise false.
  385. * `objectMode` Indicates whether the stream is in `objectMode`. Once set
  386. to `true`, it cannot be set to `false`.
  387. ### Events
  388. * `data` Emitted when there's data to read. Argument is the data to read.
  389. This is never emitted while not flowing. If a listener is attached, that
  390. will resume the stream.
  391. * `end` Emitted when there's no more data to read. This will be emitted
  392. immediately for empty streams when `end()` is called. If a listener is
  393. attached, and `end` was already emitted, then it will be emitted again.
  394. All listeners are removed when `end` is emitted.
  395. * `prefinish` An end-ish event that follows the same logic as `end` and is
  396. emitted in the same conditions where `end` is emitted. Emitted after
  397. `'end'`.
  398. * `finish` An end-ish event that follows the same logic as `end` and is
  399. emitted in the same conditions where `end` is emitted. Emitted after
  400. `'prefinish'`.
  401. * `close` An indication that an underlying resource has been released.
  402. Minipass does not emit this event, but will defer it until after `end`
  403. has been emitted, since it throws off some stream libraries otherwise.
  404. * `drain` Emitted when the internal buffer empties, and it is again
  405. suitable to `write()` into the stream.
  406. * `readable` Emitted when data is buffered and ready to be read by a
  407. consumer.
  408. * `resume` Emitted when stream changes state from buffering to flowing
  409. mode. (Ie, when `resume` is called, `pipe` is called, or a `data` event
  410. listener is added.)
  411. ### Static Methods
  412. * `Minipass.isStream(stream)` Returns `true` if the argument is a stream,
  413. and false otherwise. To be considered a stream, the object must be
  414. either an instance of Minipass, or an EventEmitter that has either a
  415. `pipe()` method, or both `write()` and `end()` methods. (Pretty much any
  416. stream in node-land will return `true` for this.)
  417. ## EXAMPLES
  418. Here are some examples of things you can do with Minipass streams.
  419. ### simple "are you done yet" promise
  420. ```js
  421. mp.promise().then(() => {
  422. // stream is finished
  423. }, er => {
  424. // stream emitted an error
  425. })
  426. ```
  427. ### collecting
  428. ```js
  429. mp.collect().then(all => {
  430. // all is an array of all the data emitted
  431. // encoding is supported in this case, so
  432. // the result will be a collection of strings if
  433. // an encoding is specified, or buffers/objects if not.
  434. //
  435. // In an async function, you may do
  436. // const data = await stream.collect()
  437. })
  438. ```
  439. ### collecting into a single blob
  440. This is a bit slower because it concatenates the data into one chunk for
  441. you, but if you're going to do it yourself anyway, it's convenient this
  442. way:
  443. ```js
  444. mp.concat().then(onebigchunk => {
  445. // onebigchunk is a string if the stream
  446. // had an encoding set, or a buffer otherwise.
  447. })
  448. ```
  449. ### iteration
  450. You can iterate over streams synchronously or asynchronously in platforms
  451. that support it.
  452. Synchronous iteration will end when the currently available data is
  453. consumed, even if the `end` event has not been reached. In string and
  454. buffer mode, the data is concatenated, so unless multiple writes are
  455. occurring in the same tick as the `read()`, sync iteration loops will
  456. generally only have a single iteration.
  457. To consume chunks in this way exactly as they have been written, with no
  458. flattening, create the stream with the `{ objectMode: true }` option.
  459. ```js
  460. const mp = new Minipass({ objectMode: true })
  461. mp.write('a')
  462. mp.write('b')
  463. for (let letter of mp) {
  464. console.log(letter) // a, b
  465. }
  466. mp.write('c')
  467. mp.write('d')
  468. for (let letter of mp) {
  469. console.log(letter) // c, d
  470. }
  471. mp.write('e')
  472. mp.end()
  473. for (let letter of mp) {
  474. console.log(letter) // e
  475. }
  476. for (let letter of mp) {
  477. console.log(letter) // nothing
  478. }
  479. ```
  480. Asynchronous iteration will continue until the end event is reached,
  481. consuming all of the data.
  482. ```js
  483. const mp = new Minipass({ encoding: 'utf8' })
  484. // some source of some data
  485. let i = 5
  486. const inter = setInterval(() => {
  487. if (i-- > 0)
  488. mp.write(Buffer.from('foo\n', 'utf8'))
  489. else {
  490. mp.end()
  491. clearInterval(inter)
  492. }
  493. }, 100)
  494. // consume the data with asynchronous iteration
  495. async function consume () {
  496. for await (let chunk of mp) {
  497. console.log(chunk)
  498. }
  499. return 'ok'
  500. }
  501. consume().then(res => console.log(res))
  502. // logs `foo\n` 5 times, and then `ok`
  503. ```
  504. ### subclass that `console.log()`s everything written into it
  505. ```js
  506. class Logger extends Minipass {
  507. write (chunk, encoding, callback) {
  508. console.log('WRITE', chunk, encoding)
  509. return super.write(chunk, encoding, callback)
  510. }
  511. end (chunk, encoding, callback) {
  512. console.log('END', chunk, encoding)
  513. return super.end(chunk, encoding, callback)
  514. }
  515. }
  516. someSource.pipe(new Logger()).pipe(someDest)
  517. ```
  518. ### same thing, but using an inline anonymous class
  519. ```js
  520. // js classes are fun
  521. someSource
  522. .pipe(new (class extends Minipass {
  523. emit (ev, ...data) {
  524. // let's also log events, because debugging some weird thing
  525. console.log('EMIT', ev)
  526. return super.emit(ev, ...data)
  527. }
  528. write (chunk, encoding, callback) {
  529. console.log('WRITE', chunk, encoding)
  530. return super.write(chunk, encoding, callback)
  531. }
  532. end (chunk, encoding, callback) {
  533. console.log('END', chunk, encoding)
  534. return super.end(chunk, encoding, callback)
  535. }
  536. }))
  537. .pipe(someDest)
  538. ```
  539. ### subclass that defers 'end' for some reason
  540. ```js
  541. class SlowEnd extends Minipass {
  542. emit (ev, ...args) {
  543. if (ev === 'end') {
  544. console.log('going to end, hold on a sec')
  545. setTimeout(() => {
  546. console.log('ok, ready to end now')
  547. super.emit('end', ...args)
  548. }, 100)
  549. } else {
  550. return super.emit(ev, ...args)
  551. }
  552. }
  553. }
  554. ```
  555. ### transform that creates newline-delimited JSON
  556. ```js
  557. class NDJSONEncode extends Minipass {
  558. write (obj, cb) {
  559. try {
  560. // JSON.stringify can throw, emit an error on that
  561. return super.write(JSON.stringify(obj) + '\n', 'utf8', cb)
  562. } catch (er) {
  563. this.emit('error', er)
  564. }
  565. }
  566. end (obj, cb) {
  567. if (typeof obj === 'function') {
  568. cb = obj
  569. obj = undefined
  570. }
  571. if (obj !== undefined) {
  572. this.write(obj)
  573. }
  574. return super.end(cb)
  575. }
  576. }
  577. ```
  578. ### transform that parses newline-delimited JSON
  579. ```js
  580. class NDJSONDecode extends Minipass {
  581. constructor (options) {
  582. // always be in object mode, as far as Minipass is concerned
  583. super({ objectMode: true })
  584. this._jsonBuffer = ''
  585. }
  586. write (chunk, encoding, cb) {
  587. if (typeof chunk === 'string' &&
  588. typeof encoding === 'string' &&
  589. encoding !== 'utf8') {
  590. chunk = Buffer.from(chunk, encoding).toString()
  591. } else if (Buffer.isBuffer(chunk)) {
  592. chunk = chunk.toString()
  593. }
  594. if (typeof encoding === 'function') {
  595. cb = encoding
  596. }
  597. const jsonData = (this._jsonBuffer + chunk).split('\n')
  598. this._jsonBuffer = jsonData.pop()
  599. for (let i = 0; i < jsonData.length; i++) {
  600. try {
  601. // JSON.parse can throw, emit an error on that
  602. super.write(JSON.parse(jsonData[i]))
  603. } catch (er) {
  604. this.emit('error', er)
  605. continue
  606. }
  607. }
  608. if (cb)
  609. cb()
  610. }
  611. }
  612. ```