| 'use strict' |
| |
| var reusify = require('reusify') |
| |
| function fastqueue (context, worker, concurrency) { |
| if (typeof context === 'function') { |
| concurrency = worker |
| worker = context |
| context = null |
| } |
| |
| var cache = reusify(Task) |
| var queueHead = null |
| var queueTail = null |
| var _running = 0 |
| var errorHandler = null |
| |
| var self = { |
| push: push, |
| drain: noop, |
| saturated: noop, |
| pause: pause, |
| paused: false, |
| concurrency: concurrency, |
| running: running, |
| resume: resume, |
| idle: idle, |
| length: length, |
| getQueue: getQueue, |
| unshift: unshift, |
| empty: noop, |
| kill: kill, |
| killAndDrain: killAndDrain, |
| error: error |
| } |
| |
| return self |
| |
| function running () { |
| return _running |
| } |
| |
| function pause () { |
| self.paused = true |
| } |
| |
| function length () { |
| var current = queueHead |
| var counter = 0 |
| |
| while (current) { |
| current = current.next |
| counter++ |
| } |
| |
| return counter |
| } |
| |
| function getQueue () { |
| var current = queueHead |
| var tasks = [] |
| |
| while (current) { |
| tasks.push(current.value) |
| current = current.next |
| } |
| |
| return tasks |
| } |
| |
| function resume () { |
| if (!self.paused) return |
| self.paused = false |
| for (var i = 0; i < self.concurrency; i++) { |
| _running++ |
| release() |
| } |
| } |
| |
| function idle () { |
| return _running === 0 && self.length() === 0 |
| } |
| |
| function push (value, done) { |
| var current = cache.get() |
| |
| current.context = context |
| current.release = release |
| current.value = value |
| current.callback = done || noop |
| current.errorHandler = errorHandler |
| |
| if (_running === self.concurrency || self.paused) { |
| if (queueTail) { |
| queueTail.next = current |
| queueTail = current |
| } else { |
| queueHead = current |
| queueTail = current |
| self.saturated() |
| } |
| } else { |
| _running++ |
| worker.call(context, current.value, current.worked) |
| } |
| } |
| |
| function unshift (value, done) { |
| var current = cache.get() |
| |
| current.context = context |
| current.release = release |
| current.value = value |
| current.callback = done || noop |
| |
| if (_running === self.concurrency || self.paused) { |
| if (queueHead) { |
| current.next = queueHead |
| queueHead = current |
| } else { |
| queueHead = current |
| queueTail = current |
| self.saturated() |
| } |
| } else { |
| _running++ |
| worker.call(context, current.value, current.worked) |
| } |
| } |
| |
| function release (holder) { |
| if (holder) { |
| cache.release(holder) |
| } |
| var next = queueHead |
| if (next) { |
| if (!self.paused) { |
| if (queueTail === queueHead) { |
| queueTail = null |
| } |
| queueHead = next.next |
| next.next = null |
| worker.call(context, next.value, next.worked) |
| if (queueTail === null) { |
| self.empty() |
| } |
| } else { |
| _running-- |
| } |
| } else if (--_running === 0) { |
| self.drain() |
| } |
| } |
| |
| function kill () { |
| queueHead = null |
| queueTail = null |
| self.drain = noop |
| } |
| |
| function killAndDrain () { |
| queueHead = null |
| queueTail = null |
| self.drain() |
| self.drain = noop |
| } |
| |
| function error (handler) { |
| errorHandler = handler |
| } |
| } |
| |
| function noop () {} |
| |
| function Task () { |
| this.value = null |
| this.callback = noop |
| this.next = null |
| this.release = noop |
| this.context = null |
| this.errorHandler = null |
| |
| var self = this |
| |
| this.worked = function worked (err, result) { |
| var callback = self.callback |
| var errorHandler = self.errorHandler |
| var val = self.value |
| self.value = null |
| self.callback = noop |
| if (self.errorHandler) { |
| errorHandler(err, val) |
| } |
| callback.call(self.context, err, result) |
| self.release(self) |
| } |
| } |
| |
| module.exports = fastqueue |