123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- 'use strict'
- var transport = require('../../../spdy-transport')
- var utils = transport.utils
- var assert = require('assert')
- var util = require('util')
- var debug = require('debug')('spdy:scheduler')
- var Readable = require('readable-stream').Readable
- /*
- * We create the following structure in `this.list`:
- * [ [ id = 0 ], [ id = 1 ], [ id = 2 ], [ id = 0 ] ]
- * chunks chunks chunks chunks
- * chunks chunks
- * chunks
- *
- * Then on the `.tick()` pass we pick one set of chunks from each item and
- * remove the item once it is empty:
- *
- * [ [ id = 0 ], [ id = 2 ] ]
- * chunks chunks
- * chunks
- *
- * Writing out: chunks for 0, chunks for 1, chunks for 2, chunks for 0
- *
- * This way data is interleaved between the different streams.
- */
/**
 * Readable stream that queues frames and writes them out on the next tick,
 * interleaving the frames that belong to different streams.
 *
 * @param {Object} [options]
 * @param {number} [options.window=0.25] - Largest allowed difference between
 *   the priority at the head of the list and the priority of an entry for
 *   that entry to still be visited by `tickAsync()`. An explicit `0` is
 *   honored; any other falsy value falls back to the default.
 */
function Scheduler (options) {
  Readable.call(this)

  // Pretty big window by default
  this.window = 0.25

  // A bare truthiness test would silently replace a deliberate `window: 0`
  // with the default, so zero is accepted explicitly.
  if (options && (options.window || options.window === 0)) {
    this.window = options.window
  }

  // Frames scheduled with `priority === false`; flushed before anything else
  this.sync = []
  // Per-stream queue entries, kept ordered by `schedule()`
  this.list = []
  // Number of chunks queued but not yet pushed
  this.count = 0
  // True while a `process.nextTick()` flush is already scheduled
  this.pendingTick = false
}
- util.inherits(Scheduler, Readable)
- module.exports = Scheduler
/**
 * Factory wrapper around the constructor. Just for testing, really.
 *
 * @param {Object} [options] - Passed through unchanged to `Scheduler`.
 * @returns {Scheduler} A freshly constructed scheduler.
 */
Scheduler.create = function create (options) {
  var scheduler = new Scheduler(options)
  return scheduler
}
/**
 * Comparator for scheduler entries: higher `priority` sorts first, and
 * entries with equal priority are ordered by ascending `stream`.
 *
 * @param {{priority: number, stream: number}} a
 * @param {{priority: number, stream: number}} b
 * @returns {number} Negative when `a` sorts before `b`, positive when it
 *   sorts after, zero when both fields match.
 */
function insertCompare (a, b) {
  if (a.priority !== b.priority) {
    return b.priority - a.priority
  }

  return a.stream - b.stream
}
/**
 * Queue a frame for writing and make sure a flush is scheduled.
 *
 * @param {Object} data - Frame description.
 * @param {number|false} data.priority - `false` marks a synchronous frame
 *   that goes to the `sync` queue; anything else selects the per-stream
 *   entry in `list`.
 * @param {number} data.stream - Stream id, used together with the priority
 *   to find or create the entry.
 * @param {Array} data.chunks - Chunks to push; their number is added to
 *   `count`.
 */
Scheduler.prototype.schedule = function schedule (data) {
  var priority = data.priority
  var stream = data.stream
  var chunks = data.chunks

  if (priority === false) {
    // Synchronous frames should not be interleaved
    debug('queue sync', chunks)
    this.sync.push(data)
  } else {
    debug('queue async priority=%d stream=%d', priority, stream, chunks)

    var candidate = new SchedulerItem(stream, priority)
    var position = utils.binaryLookup(this.list, candidate, insertCompare)
    var isKnown = position < this.list.length &&
      insertCompare(this.list[position], candidate) === 0

    var target
    if (isKnown) {
      // Coalesce with the entry that already exists for this stream/priority
      target = this.list[position]
    } else {
      // Insert a new entry at the position reported by the lookup
      this.list.splice(position, 0, candidate)
      target = candidate
    }

    target.push(data)
  }

  this.count += chunks.length
  this._read()
}
/**
 * Schedule a single `tick()` on the next turn of the event loop.
 *
 * Does nothing when no chunks are queued or when a flush is already pending,
 * so repeated calls within one turn result in one `tick()`.
 */
Scheduler.prototype._read = function _read () {
  if (this.count === 0 || this.pendingTick) {
    return
  }

  this.pendingTick = true

  var flush = function flush () {
    // Clear the flag first so that `tick()` may schedule the next flush
    this.pendingTick = false
    this.tick()
  }.bind(this)

  process.nextTick(flush)
}
/**
 * Flush the synchronous queue and, if that succeeded, the interleaved one.
 *
 * @returns {boolean} `false` when `tickSync()` reported a falsy result,
 *   otherwise whatever `tickAsync()` returns.
 */
Scheduler.prototype.tick = function tick () {
  var syncFlushed = this.tickSync()

  if (syncFlushed) {
    return this.tickAsync()
  }

  // No luck for async frames
  return false
}
/**
 * Push every chunk of every frame in the synchronous queue, in order.
 *
 * The queue is replaced with an empty array before anything is pushed. Each
 * frame's `callback`, when present, is invoked with `null` after its chunks
 * have been pushed.
 *
 * @returns {boolean} The result of the last `push()` call, `true` when the
 *   queue was empty, or `false` when `push()` threw (the exception is then
 *   emitted as an `'error'` event and the remaining frames are not pushed).
 */
Scheduler.prototype.tickSync = function tickSync () {
  // Empty sync queue first
  var frames = this.sync
  this.sync = []

  var pushed = true

  for (var f = 0; f < frames.length; f++) {
    var frame = frames[f]

    debug('tick sync pending=%d', this.count, frame.chunks)

    for (var c = 0; c < frame.chunks.length; c++) {
      this.count--

      // TODO: handle stream backoff properly
      try {
        pushed = this.push(frame.chunks[c])
      } catch (err) {
        this.emit('error', err)
        return false
      }
    }

    debug('after tick sync pending=%d', this.count)

    // TODO(indutny): figure out the way to invoke callback on actual write
    if (frame.callback) {
      frame.callback(null)
    }
  }

  return pushed
}
/**
 * Write out queued frames, taking one frame from each entry in turn.
 *
 * The cursor walks `list` cyclically. When the entry under the cursor has a
 * priority more than `window` below the priority at the head of the list,
 * the cursor jumps back to the head instead. Entries are removed as soon as
 * their queue is empty. Each frame's `callback`, when present, is invoked
 * with `null` after its chunks have been pushed.
 *
 * @returns {boolean} `true` when the list was empty or was drained with the
 *   last `push()` returning truthy; the falsy `push()` result when the loop
 *   stopped early; `false` when `push()` threw (the exception is emitted as
 *   an `'error'` event).
 */
Scheduler.prototype.tickAsync = function tickAsync () {
  var entries = this.list
  var pushed = true

  if (entries.length === 0) {
    return pushed
  }

  var headPriority = entries[0].priority
  var cursor = 0

  while (entries.length > 0) {
    // Wrap around the end of the list
    cursor %= entries.length

    // Entries too far below the head priority are not served yet
    if (headPriority - entries[cursor].priority > this.window) {
      cursor = 0
    }

    debug('tick async index=%d start=%d', cursor, headPriority)

    var entry = entries[cursor]
    var frame = entry.shift()

    if (entry.isEmpty()) {
      entries.splice(cursor, 1)
      if (cursor === 0 && entries.length > 0) {
        headPriority = entries[0].priority
      }
      // The next entry has moved into this slot, so the cursor stays put
    } else {
      cursor++
    }

    debug('tick async pending=%d', this.count, frame.chunks)

    for (var c = 0; c < frame.chunks.length; c++) {
      this.count--

      // TODO: handle stream backoff properly
      try {
        pushed = this.push(frame.chunks[c])
      } catch (err) {
        this.emit('error', err)
        return false
      }
    }

    debug('after tick pending=%d', this.count)

    // TODO(indutny): figure out the way to invoke callback on actual write
    if (frame.callback) {
      frame.callback(null)
    }

    if (!pushed) {
      break
    }
  }

  return pushed
}
/**
 * Synchronously flush everything that is queued.
 *
 * Runs `tickSync()` once, then repeats `tickAsync()` until it reports a
 * truthy result, and finally asserts that no chunks remain counted.
 *
 * @throws {AssertionError} When `count` is not `0` after flushing.
 */
Scheduler.prototype.dump = function dump () {
  this.tickSync()

  // Write everything out
  var drained
  do {
    drained = this.tickAsync()
  } while (!drained)

  assert.strictEqual(this.count, 0)
}
/**
 * FIFO queue of frames that share one stream id and one priority.
 *
 * @param {number} stream - Stream id stored on the item.
 * @param {number} priority - Priority stored on the item.
 */
function SchedulerItem (stream, priority) {
  this.queue = []
  this.stream = stream
  this.priority = priority
}

/**
 * Append an entry to the end of the queue.
 *
 * @param {*} chunks - Entry to enqueue; stored as given.
 */
SchedulerItem.prototype.push = function push (chunks) {
  var queue = this.queue
  queue.push(chunks)
}

/**
 * Remove and return the oldest entry.
 *
 * @returns {*} The entry at the front of the queue, or `undefined` when the
 *   queue is empty.
 */
SchedulerItem.prototype.shift = function shift () {
  var oldest = this.queue.shift()
  return oldest
}

/**
 * @returns {boolean} `true` when no entries are queued.
 */
SchedulerItem.prototype.isEmpty = function isEmpty () {
  var remaining = this.queue.length
  return remaining === 0
}
|