432aba75c8d64244bf6f49f4a9ffd20aefd74a69822bf409686f405dfbdc2a36ae980390fd229ab02ecb8ca683786e62f5251e8e81b318a3e736992de7dc02 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. 'use strict'
  2. const check = require('check-types')
  3. const eventify = require('./eventify')
  4. const events = require('./events')
  5. const JsonStream = require('./jsonstream')
  6. const Hoopy = require('hoopy')
  7. const promise = require('./promise')
  8. const tryer = require('tryer')
  9. const DEFAULT_BUFFER_LENGTH = 1024
  10. module.exports = streamify
  11. /**
  12. * Public function `streamify`.
  13. *
  14. * Asynchronously serialises a data structure to a stream of JSON
  15. * data. Sanely handles promises, buffers, maps and other iterables.
  16. *
  17. * @param data: The data to transform.
  18. *
  19. * @option space: Indentation string, or the number of spaces
  20. * to indent each nested level by.
  21. *
  22. * @option promises: 'resolve' or 'ignore', default is 'resolve'.
  23. *
  24. * @option buffers: 'toString' or 'ignore', default is 'toString'.
  25. *
  26. * @option maps: 'object' or 'ignore', default is 'object'.
  27. *
  28. * @option iterables: 'array' or 'ignore', default is 'array'.
  29. *
  30. * @option circular: 'error' or 'ignore', default is 'error'.
  31. *
  32. * @option yieldRate: The number of data items to process per timeslice,
  33. * default is 16384.
  34. *
  35. * @option bufferLength: The length of the buffer, default is 1024.
  36. *
  37. * @option highWaterMark: If set, will be passed to the readable stream constructor
  38. * as the value for the highWaterMark option.
  39. *
  40. * @option Promise: The promise constructor to use, defaults to bluebird.
  41. **/
  42. function streamify (data, options = {}) {
  43. const emitter = eventify(data, options)
  44. const json = new Hoopy(options.bufferLength || DEFAULT_BUFFER_LENGTH)
  45. const Promise = promise(options)
  46. const space = normaliseSpace(options)
  47. let streamOptions
  48. const { highWaterMark } = options
  49. if (highWaterMark) {
  50. streamOptions = { highWaterMark }
  51. }
  52. const stream = new JsonStream(read, streamOptions)
  53. let awaitPush = true
  54. let index = 0
  55. let indentation = ''
  56. let isEnded
  57. let isPaused = false
  58. let isProperty
  59. let length = 0
  60. let mutex = Promise.resolve()
  61. let needsComma
  62. emitter.on(events.array, noRacing(array))
  63. emitter.on(events.object, noRacing(object))
  64. emitter.on(events.property, noRacing(property))
  65. emitter.on(events.string, noRacing(string))
  66. emitter.on(events.number, noRacing(value))
  67. emitter.on(events.literal, noRacing(value))
  68. emitter.on(events.endArray, noRacing(endArray))
  69. emitter.on(events.endObject, noRacing(endObject))
  70. emitter.on(events.end, noRacing(end))
  71. emitter.on(events.error, noRacing(error))
  72. emitter.on(events.dataError, noRacing(dataError))
  73. return stream
  74. function read () {
  75. if (awaitPush) {
  76. awaitPush = false
  77. if (isEnded) {
  78. if (length > 0) {
  79. after()
  80. }
  81. return endStream()
  82. }
  83. }
  84. if (isPaused) {
  85. after()
  86. }
  87. }
  88. function after () {
  89. if (awaitPush) {
  90. return
  91. }
  92. let i
  93. for (i = 0; i < length && ! awaitPush; ++i) {
  94. if (! stream.push(json[i + index], 'utf8')) {
  95. awaitPush = true
  96. }
  97. }
  98. if (i === length) {
  99. index = length = 0
  100. } else {
  101. length -= i
  102. index += i
  103. }
  104. }
  105. function endStream () {
  106. if (! awaitPush) {
  107. stream.push(null)
  108. }
  109. }
  110. function noRacing (handler) {
  111. return eventData => mutex = mutex.then(() => handler(eventData))
  112. }
  113. function array () {
  114. return beforeScope()
  115. .then(() => addJson('['))
  116. .then(() => afterScope())
  117. }
  118. function beforeScope () {
  119. return before(true)
  120. }
  121. function before (isScope) {
  122. if (isProperty) {
  123. isProperty = false
  124. if (space) {
  125. return addJson(' ')
  126. }
  127. return Promise.resolve()
  128. }
  129. return Promise.resolve()
  130. .then(() => {
  131. if (needsComma) {
  132. if (isScope) {
  133. needsComma = false
  134. }
  135. return addJson(',')
  136. }
  137. if (! isScope) {
  138. needsComma = true
  139. }
  140. })
  141. .then(() => {
  142. if (space && indentation) {
  143. return indent()
  144. }
  145. })
  146. }
  147. function addJson (chunk) {
  148. if (length + 1 <= json.length) {
  149. json[index + length++] = chunk
  150. after()
  151. return Promise.resolve()
  152. }
  153. isPaused = true
  154. return new Promise(resolve => {
  155. const unpause = emitter.pause()
  156. tryer({
  157. interval: -10,
  158. until () {
  159. return length + 1 <= json.length
  160. },
  161. pass () {
  162. isPaused = false
  163. json[index + length++] = chunk
  164. resolve()
  165. setImmediate(unpause)
  166. }
  167. })
  168. })
  169. }
  170. function indent () {
  171. return addJson(`\n${indentation}`)
  172. }
  173. function afterScope () {
  174. needsComma = false
  175. if (space) {
  176. indentation += space
  177. }
  178. }
  179. function object () {
  180. return beforeScope()
  181. .then(() => addJson('{'))
  182. .then(() => afterScope())
  183. }
  184. function property (name) {
  185. return before()
  186. .then(() => addJson(`"${name}":`))
  187. .then(() => {
  188. isProperty = true
  189. })
  190. }
  191. function string (s) {
  192. return value(`"${s}"`)
  193. }
  194. function value (v) {
  195. return before()
  196. .then(() => addJson(`${v}`))
  197. }
  198. function endArray () {
  199. return beforeScopeEnd()
  200. .then(() => addJson(']'))
  201. .then(() => afterScopeEnd())
  202. }
  203. function beforeScopeEnd () {
  204. if (space) {
  205. indentation = indentation.substr(space.length)
  206. return indent()
  207. }
  208. return Promise.resolve()
  209. }
  210. function afterScopeEnd () {
  211. needsComma = true
  212. }
  213. function endObject () {
  214. return beforeScopeEnd()
  215. .then(() => addJson('}'))
  216. .then(() => afterScopeEnd())
  217. }
  218. function end () {
  219. after()
  220. isEnded = true
  221. endStream()
  222. }
  223. function error (err) {
  224. stream.emit('error', err)
  225. }
  226. function dataError (err) {
  227. stream.emit('dataError', err)
  228. }
  229. }
  230. function normaliseSpace (options) {
  231. if (check.positive(options.space)) {
  232. return new Array(options.space + 1).join(' ')
  233. }
  234. if (check.nonEmptyString(options.space)) {
  235. return options.space
  236. }
  237. }