c3a8878549b7d3cbec23a2fadb15f1bb2378e5f2f7f4107262d854cb0708474d08c6678f6f8164bbf37ecd43a1afaa5daa5e20c290e1a9bc9a8ace588f5e6d 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. 'use strict'
  2. const check = require('check-types')
  3. const EventEmitter = require('events').EventEmitter
  4. const events = require('./events')
  5. const promise = require('./promise')
  6. const invalidTypes = {
  7. undefined: true, // eslint-disable-line no-undefined
  8. function: true,
  9. symbol: true
  10. }
  11. module.exports = eventify
  12. /**
  13. * Public function `eventify`.
  14. *
  15. * Returns an event emitter and asynchronously traverses a data structure
  16. * (depth-first), emitting events as it encounters items. Sanely handles
  17. * promises, buffers, maps and other iterables. The event emitter is
  18. * decorated with a `pause` method that can be called to pause processing.
  19. *
  20. * @param data: The data structure to traverse.
  21. *
  22. * @option promises: 'resolve' or 'ignore', default is 'resolve'.
  23. *
  24. * @option buffers: 'toString' or 'ignore', default is 'toString'.
  25. *
  26. * @option maps: 'object' or 'ignore', default is 'object'.
  27. *
  28. * @option iterables: 'array' or 'ignore', default is 'array'.
  29. *
  30. * @option circular: 'error' or 'ignore', default is 'error'.
  31. *
  32. * @option yieldRate: The number of data items to process per timeslice,
  33. * default is 16384.
  34. *
  35. * @option Promise: The promise constructor to use, defaults to bluebird.
  36. **/
  37. function eventify (data, options = {}) {
  38. const coercions = {}
  39. const emitter = new EventEmitter()
  40. const Promise = promise(options)
  41. const references = new Map()
  42. let count = 0
  43. let disableCoercions = false
  44. let ignoreCircularReferences
  45. let ignoreItems
  46. let pause
  47. let yieldRate
  48. emitter.pause = () => {
  49. let resolve
  50. pause = new Promise(res => resolve = res)
  51. return () => {
  52. pause = null
  53. count = 0
  54. resolve()
  55. }
  56. }
  57. parseOptions()
  58. setImmediate(begin)
  59. return emitter
  60. function parseOptions () {
  61. parseCoercionOption('promises')
  62. parseCoercionOption('buffers')
  63. parseCoercionOption('maps')
  64. parseCoercionOption('iterables')
  65. if (Object.keys(coercions).length === 0) {
  66. disableCoercions = true
  67. }
  68. if (options.circular === 'ignore') {
  69. ignoreCircularReferences = true
  70. }
  71. check.assert.maybe.positive(options.yieldRate)
  72. yieldRate = options.yieldRate || 16384
  73. }
  74. function parseCoercionOption (key) {
  75. if (options[key] !== 'ignore') {
  76. coercions[key] = true
  77. }
  78. }
  79. function begin () {
  80. return proceed(data)
  81. .catch(error => emit(events.error, error))
  82. .then(() => emit(events.end))
  83. }
  84. function proceed (datum) {
  85. if (++count % yieldRate !== 0) {
  86. return coerce(datum).then(after)
  87. }
  88. return new Promise((resolve, reject) => {
  89. setImmediate(() => {
  90. coerce(datum)
  91. .then(after)
  92. .then(resolve)
  93. .catch(reject)
  94. })
  95. })
  96. function after (coerced) {
  97. if (isInvalid(coerced)) {
  98. return
  99. }
  100. if (coerced === false || coerced === true || coerced === null) {
  101. return literal(coerced)
  102. }
  103. if (Array.isArray(coerced)) {
  104. return array(coerced)
  105. }
  106. const type = typeof coerced
  107. switch (type) {
  108. case 'number':
  109. return value(coerced, type)
  110. case 'string':
  111. return value(escapeString(coerced), type)
  112. default:
  113. return object(coerced)
  114. }
  115. }
  116. }
  117. function coerce (datum) {
  118. if (disableCoercions || check.primitive(datum)) {
  119. return Promise.resolve(datum)
  120. }
  121. if (check.instanceStrict(datum, Promise)) {
  122. return coerceThing(datum, 'promises', coercePromise).then(coerce)
  123. }
  124. if (check.instanceStrict(datum, Buffer)) {
  125. return coerceThing(datum, 'buffers', coerceBuffer)
  126. }
  127. if (check.instanceStrict(datum, Map)) {
  128. return coerceThing(datum, 'maps', coerceMap)
  129. }
  130. if (
  131. check.iterable(datum) &&
  132. check.not.string(datum) &&
  133. check.not.array(datum)
  134. ) {
  135. return coerceThing(datum, 'iterables', coerceIterable)
  136. }
  137. if (check.function(datum.toJSON)) {
  138. return Promise.resolve(datum.toJSON())
  139. }
  140. return Promise.resolve(datum)
  141. }
  142. function coerceThing (datum, thing, fn) {
  143. if (coercions[thing]) {
  144. return fn(datum)
  145. }
  146. return Promise.resolve()
  147. }
  148. function coercePromise (p) {
  149. return p
  150. }
  151. function coerceBuffer (buffer) {
  152. return Promise.resolve(buffer.toString())
  153. }
  154. function coerceMap (map) {
  155. const result = {}
  156. return coerceCollection(map, result, (item, key) => {
  157. result[key] = item
  158. })
  159. }
  160. function coerceCollection (coll, target, push) {
  161. coll.forEach(push)
  162. return Promise.resolve(target)
  163. }
  164. function coerceIterable (iterable) {
  165. const result = []
  166. return coerceCollection(iterable, result, item => {
  167. result.push(item)
  168. })
  169. }
  170. function isInvalid (datum) {
  171. const type = typeof datum
  172. return !! invalidTypes[type] || (
  173. type === 'number' && ! isValidNumber(datum)
  174. )
  175. }
  176. function isValidNumber (datum) {
  177. return datum > Number.NEGATIVE_INFINITY && datum < Number.POSITIVE_INFINITY
  178. }
  179. function literal (datum) {
  180. return value(datum, 'literal')
  181. }
  182. function value (datum, type) {
  183. return emit(events[type], datum)
  184. }
  185. function emit (event, eventData) {
  186. return (pause || Promise.resolve())
  187. .then(() => emitter.emit(event, eventData))
  188. .catch(err => {
  189. try {
  190. emitter.emit(events.error, err)
  191. } catch (_) {
  192. // When calling user code, anything is possible
  193. }
  194. })
  195. }
  196. function array (datum) {
  197. // For an array, collection:object and collection:array are the same.
  198. return collection(datum, datum, 'array', item => {
  199. if (isInvalid(item)) {
  200. return proceed(null)
  201. }
  202. return proceed(item)
  203. })
  204. }
  205. function collection (obj, arr, type, action) {
  206. let ignoreThisItem
  207. return Promise.resolve()
  208. .then(() => {
  209. if (references.has(obj)) {
  210. ignoreThisItem = ignoreItems = true
  211. if (! ignoreCircularReferences) {
  212. return emit(events.dataError, new Error('Circular reference.'))
  213. }
  214. } else {
  215. references.set(obj, true)
  216. }
  217. })
  218. .then(() => emit(events[type]))
  219. .then(() => item(0))
  220. function item (index) {
  221. if (index >= arr.length) {
  222. if (ignoreThisItem) {
  223. ignoreItems = false
  224. }
  225. if (ignoreItems) {
  226. return Promise.resolve()
  227. }
  228. return emit(events.endPrefix + events[type])
  229. .then(() => references.delete(obj))
  230. }
  231. if (ignoreItems) {
  232. return item(index + 1)
  233. }
  234. return action(arr[index])
  235. .then(() => item(index + 1))
  236. }
  237. }
  238. function object (datum) {
  239. // For an object, collection:object and collection:array are different.
  240. return collection(datum, Object.keys(datum), 'object', key => {
  241. const item = datum[key]
  242. if (isInvalid(item)) {
  243. return Promise.resolve()
  244. }
  245. return emit(events.property, escapeString(key))
  246. .then(() => proceed(item))
  247. })
  248. }
  249. function escapeString (string) {
  250. string = JSON.stringify(string)
  251. return string.substring(1, string.length - 1)
  252. }
  253. }