123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- 'use strict'
- const check = require('check-types')
- const eventify = require('./eventify')
- const events = require('./events')
- const JsonStream = require('./jsonstream')
- const Hoopy = require('hoopy')
- const promise = require('./promise')
- const tryer = require('tryer')
- const DEFAULT_BUFFER_LENGTH = 1024
- module.exports = streamify
- /**
- * Public function `streamify`.
- *
- * Asynchronously serialises a data structure to a stream of JSON
- * data. Sanely handles promises, buffers, maps and other iterables.
- *
- * @param data: The data to transform.
- *
- * @option space: Indentation string, or the number of spaces
- * to indent each nested level by.
- *
- * @option promises: 'resolve' or 'ignore', default is 'resolve'.
- *
- * @option buffers: 'toString' or 'ignore', default is 'toString'.
- *
- * @option maps: 'object' or 'ignore', default is 'object'.
- *
- * @option iterables: 'array' or 'ignore', default is 'array'.
- *
- * @option circular: 'error' or 'ignore', default is 'error'.
- *
- * @option yieldRate: The number of data items to process per timeslice,
- * default is 16384.
- *
- * @option bufferLength: The length of the buffer, default is 1024.
- *
- * @option highWaterMark: If set, will be passed to the readable stream constructor
- * as the value for the highWaterMark option.
- *
- * @option Promise: The promise constructor to use, defaults to bluebird.
- **/
- function streamify (data, options = {}) {
- const emitter = eventify(data, options)
- const json = new Hoopy(options.bufferLength || DEFAULT_BUFFER_LENGTH)
- const Promise = promise(options)
- const space = normaliseSpace(options)
- let streamOptions
- const { highWaterMark } = options
- if (highWaterMark) {
- streamOptions = { highWaterMark }
- }
- const stream = new JsonStream(read, streamOptions)
- let awaitPush = true
- let index = 0
- let indentation = ''
- let isEnded
- let isPaused = false
- let isProperty
- let length = 0
- let mutex = Promise.resolve()
- let needsComma
- emitter.on(events.array, noRacing(array))
- emitter.on(events.object, noRacing(object))
- emitter.on(events.property, noRacing(property))
- emitter.on(events.string, noRacing(string))
- emitter.on(events.number, noRacing(value))
- emitter.on(events.literal, noRacing(value))
- emitter.on(events.endArray, noRacing(endArray))
- emitter.on(events.endObject, noRacing(endObject))
- emitter.on(events.end, noRacing(end))
- emitter.on(events.error, noRacing(error))
- emitter.on(events.dataError, noRacing(dataError))
- return stream
- function read () {
- if (awaitPush) {
- awaitPush = false
- if (isEnded) {
- if (length > 0) {
- after()
- }
- return endStream()
- }
- }
- if (isPaused) {
- after()
- }
- }
- function after () {
- if (awaitPush) {
- return
- }
- let i
- for (i = 0; i < length && ! awaitPush; ++i) {
- if (! stream.push(json[i + index], 'utf8')) {
- awaitPush = true
- }
- }
- if (i === length) {
- index = length = 0
- } else {
- length -= i
- index += i
- }
- }
- function endStream () {
- if (! awaitPush) {
- stream.push(null)
- }
- }
- function noRacing (handler) {
- return eventData => mutex = mutex.then(() => handler(eventData))
- }
- function array () {
- return beforeScope()
- .then(() => addJson('['))
- .then(() => afterScope())
- }
- function beforeScope () {
- return before(true)
- }
- function before (isScope) {
- if (isProperty) {
- isProperty = false
- if (space) {
- return addJson(' ')
- }
- return Promise.resolve()
- }
- return Promise.resolve()
- .then(() => {
- if (needsComma) {
- if (isScope) {
- needsComma = false
- }
- return addJson(',')
- }
- if (! isScope) {
- needsComma = true
- }
- })
- .then(() => {
- if (space && indentation) {
- return indent()
- }
- })
- }
- function addJson (chunk) {
- if (length + 1 <= json.length) {
- json[index + length++] = chunk
- after()
- return Promise.resolve()
- }
- isPaused = true
- return new Promise(resolve => {
- const unpause = emitter.pause()
- tryer({
- interval: -10,
- until () {
- return length + 1 <= json.length
- },
- pass () {
- isPaused = false
- json[index + length++] = chunk
- resolve()
- setImmediate(unpause)
- }
- })
- })
- }
- function indent () {
- return addJson(`\n${indentation}`)
- }
- function afterScope () {
- needsComma = false
- if (space) {
- indentation += space
- }
- }
- function object () {
- return beforeScope()
- .then(() => addJson('{'))
- .then(() => afterScope())
- }
- function property (name) {
- return before()
- .then(() => addJson(`"${name}":`))
- .then(() => {
- isProperty = true
- })
- }
- function string (s) {
- return value(`"${s}"`)
- }
- function value (v) {
- return before()
- .then(() => addJson(`${v}`))
- }
- function endArray () {
- return beforeScopeEnd()
- .then(() => addJson(']'))
- .then(() => afterScopeEnd())
- }
- function beforeScopeEnd () {
- if (space) {
- indentation = indentation.substr(space.length)
- return indent()
- }
- return Promise.resolve()
- }
- function afterScopeEnd () {
- needsComma = true
- }
- function endObject () {
- return beforeScopeEnd()
- .then(() => addJson('}'))
- .then(() => afterScopeEnd())
- }
- function end () {
- after()
- isEnded = true
- endStream()
- }
- function error (err) {
- stream.emit('error', err)
- }
- function dataError (err) {
- stream.emit('dataError', err)
- }
- }
- function normaliseSpace (options) {
- if (check.positive(options.space)) {
- return new Array(options.space + 1).join(' ')
- }
- if (check.nonEmptyString(options.space)) {
- return options.space
- }
- }
|