d476a0e994f8e09525edbe61d5d7063e528b0cddc2a4001ee9dbd625e1c6c759bc883603ab70c66fd94e405baf88e9b34a37491d139d825ed1a5f0e2620e4e 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. 'use strict'
  2. var transport = require('../../../spdy-transport')
  3. var utils = transport.utils
  4. var assert = require('assert')
  5. var util = require('util')
  6. var debug = require('debug')('spdy:scheduler')
  7. var Readable = require('readable-stream').Readable
  8. /*
  9. * We create following structure in `pending`:
  10. * [ [ id = 0 ], [ id = 1 ], [ id = 2 ], [ id = 0 ] ]
  11. * chunks chunks chunks chunks
  12. * chunks chunks
  13. * chunks
  14. *
  15. * Then on the `.tick()` pass we pick one chunks from each item and remove the
  16. * item if it is empty:
  17. *
  18. * [ [ id = 0 ], [ id = 2 ] ]
  19. * chunks chunks
  20. * chunks
  21. *
  22. * Writing out: chunks for 0, chunks for 1, chunks for 2, chunks for 0
  23. *
  24. * This way data is interleaved between the different streams.
  25. */
  26. function Scheduler (options) {
  27. Readable.call(this)
  28. // Pretty big window by default
  29. this.window = 0.25
  30. if (options && options.window) { this.window = options.window }
  31. this.sync = []
  32. this.list = []
  33. this.count = 0
  34. this.pendingTick = false
  35. }
  36. util.inherits(Scheduler, Readable)
  37. module.exports = Scheduler
  38. // Just for testing, really
  39. Scheduler.create = function create (options) {
  40. return new Scheduler(options)
  41. }
  42. function insertCompare (a, b) {
  43. return a.priority === b.priority
  44. ? a.stream - b.stream
  45. : b.priority - a.priority
  46. }
  47. Scheduler.prototype.schedule = function schedule (data) {
  48. var priority = data.priority
  49. var stream = data.stream
  50. var chunks = data.chunks
  51. // Synchronous frames should not be interleaved
  52. if (priority === false) {
  53. debug('queue sync', chunks)
  54. this.sync.push(data)
  55. this.count += chunks.length
  56. this._read()
  57. return
  58. }
  59. debug('queue async priority=%d stream=%d', priority, stream, chunks)
  60. var item = new SchedulerItem(stream, priority)
  61. var index = utils.binaryLookup(this.list, item, insertCompare)
  62. // Push new item
  63. if (index >= this.list.length || insertCompare(this.list[index], item) !== 0) {
  64. this.list.splice(index, 0, item)
  65. } else { // Coalesce
  66. item = this.list[index]
  67. }
  68. item.push(data)
  69. this.count += chunks.length
  70. this._read()
  71. }
  72. Scheduler.prototype._read = function _read () {
  73. if (this.count === 0) {
  74. return
  75. }
  76. if (this.pendingTick) {
  77. return
  78. }
  79. this.pendingTick = true
  80. var self = this
  81. process.nextTick(function () {
  82. self.pendingTick = false
  83. self.tick()
  84. })
  85. }
  86. Scheduler.prototype.tick = function tick () {
  87. // No luck for async frames
  88. if (!this.tickSync()) { return false }
  89. return this.tickAsync()
  90. }
  91. Scheduler.prototype.tickSync = function tickSync () {
  92. // Empty sync queue first
  93. var sync = this.sync
  94. var res = true
  95. this.sync = []
  96. for (var i = 0; i < sync.length; i++) {
  97. var item = sync[i]
  98. debug('tick sync pending=%d', this.count, item.chunks)
  99. for (var j = 0; j < item.chunks.length; j++) {
  100. this.count--
  101. // TODO: handle stream backoff properly
  102. try {
  103. res = this.push(item.chunks[j])
  104. } catch (err) {
  105. this.emit('error', err)
  106. return false
  107. }
  108. }
  109. debug('after tick sync pending=%d', this.count)
  110. // TODO(indutny): figure out the way to invoke callback on actual write
  111. if (item.callback) {
  112. item.callback(null)
  113. }
  114. }
  115. return res
  116. }
  117. Scheduler.prototype.tickAsync = function tickAsync () {
  118. var res = true
  119. var list = this.list
  120. if (list.length === 0) {
  121. return res
  122. }
  123. var startPriority = list[0].priority
  124. for (var index = 0; list.length > 0; index++) {
  125. // Loop index
  126. index %= list.length
  127. if (startPriority - list[index].priority > this.window) { index = 0 }
  128. debug('tick async index=%d start=%d', index, startPriority)
  129. var current = list[index]
  130. var item = current.shift()
  131. if (current.isEmpty()) {
  132. list.splice(index, 1)
  133. if (index === 0 && list.length > 0) {
  134. startPriority = list[0].priority
  135. }
  136. index--
  137. }
  138. debug('tick async pending=%d', this.count, item.chunks)
  139. for (var i = 0; i < item.chunks.length; i++) {
  140. this.count--
  141. // TODO: handle stream backoff properly
  142. try {
  143. res = this.push(item.chunks[i])
  144. } catch (err) {
  145. this.emit('error', err)
  146. return false
  147. }
  148. }
  149. debug('after tick pending=%d', this.count)
  150. // TODO(indutny): figure out the way to invoke callback on actual write
  151. if (item.callback) {
  152. item.callback(null)
  153. }
  154. if (!res) { break }
  155. }
  156. return res
  157. }
  158. Scheduler.prototype.dump = function dump () {
  159. this.tickSync()
  160. // Write everything out
  161. while (!this.tickAsync()) {
  162. // Intentional no-op
  163. }
  164. assert.strictEqual(this.count, 0)
  165. }
  166. function SchedulerItem (stream, priority) {
  167. this.stream = stream
  168. this.priority = priority
  169. this.queue = []
  170. }
  171. SchedulerItem.prototype.push = function push (chunks) {
  172. this.queue.push(chunks)
  173. }
  174. SchedulerItem.prototype.shift = function shift () {
  175. return this.queue.shift()
  176. }
  177. SchedulerItem.prototype.isEmpty = function isEmpty () {
  178. return this.queue.length === 0
  179. }