| 'use strict'; |
| |
| /*! |
| * ws: a node.js websocket client |
| * Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com> |
| * MIT Licensed |
| */ |
| |
| var url = require('url') |
| , util = require('util') |
| , http = require('http') |
| , https = require('https') |
| , crypto = require('crypto') |
| , stream = require('stream') |
| , Ultron = require('ultron') |
| , Options = require('options') |
| , Sender = require('./Sender') |
| , Receiver = require('./Receiver') |
| , SenderHixie = require('./Sender.hixie') |
| , ReceiverHixie = require('./Receiver.hixie') |
| , Extensions = require('./Extensions') |
| , PerMessageDeflate = require('./PerMessageDeflate') |
| , EventEmitter = require('events').EventEmitter; |
| |
| /** |
| * Constants |
| */ |
| |
| // Default protocol version |
| |
| var protocolVersion = 13; |
| |
| // Close timeout |
| |
| var closeTimeout = 30 * 1000; // Allow 30 seconds to terminate the connection cleanly |
| |
| /** |
| * WebSocket implementation |
| * |
| * @constructor |
| * @param {String} address Connection address. |
| * @param {String|Array} protocols WebSocket protocols. |
| * @param {Object} options Additional connection options. |
| * @api public |
| */ |
| function WebSocket(address, protocols, options) { |
| if (this instanceof WebSocket === false) { |
| return new WebSocket(address, protocols, options); |
| } |
| |
| EventEmitter.call(this); |
| |
| if (protocols && !Array.isArray(protocols) && 'object' === typeof protocols) { |
| // accept the "options" Object as the 2nd argument |
| options = protocols; |
| protocols = null; |
| } |
| |
| if ('string' === typeof protocols) { |
| protocols = [ protocols ]; |
| } |
| |
| if (!Array.isArray(protocols)) { |
| protocols = []; |
| } |
| |
| this._socket = null; |
| this._ultron = null; |
| this._closeReceived = false; |
| this.bytesReceived = 0; |
| this.readyState = null; |
| this.supports = {}; |
| this.extensions = {}; |
| |
| if (Array.isArray(address)) { |
| initAsServerClient.apply(this, address.concat(options)); |
| } else { |
| initAsClient.apply(this, [address, protocols, options]); |
| } |
| } |
| |
| /** |
| * Inherits from EventEmitter. |
| */ |
| util.inherits(WebSocket, EventEmitter); |
| |
| /** |
| * Ready States |
| */ |
| ["CONNECTING", "OPEN", "CLOSING", "CLOSED"].forEach(function each(state, index) { |
| WebSocket.prototype[state] = WebSocket[state] = index; |
| }); |
| |
| /** |
| * Gracefully closes the connection, after sending a description message to the server |
| * |
| * @param {Object} data to be sent to the server |
| * @api public |
| */ |
| WebSocket.prototype.close = function close(code, data) { |
| if (this.readyState === WebSocket.CLOSED) return; |
| |
| if (this.readyState === WebSocket.CONNECTING) { |
| this.readyState = WebSocket.CLOSED; |
| return; |
| } |
| |
| if (this.readyState === WebSocket.CLOSING) { |
| if (this._closeReceived && this._isServer) { |
| this.terminate(); |
| } |
| return; |
| } |
| |
| var self = this; |
| try { |
| this.readyState = WebSocket.CLOSING; |
| this._closeCode = code; |
| this._closeMessage = data; |
| var mask = !this._isServer; |
| this._sender.close(code, data, mask, function(err) { |
| if (err) self.emit('error', err); |
| |
| if (self._closeReceived && self._isServer) { |
| self.terminate(); |
| } else { |
| // ensure that the connection is cleaned up even when no response of closing handshake. |
| clearTimeout(self._closeTimer); |
| self._closeTimer = setTimeout(cleanupWebsocketResources.bind(self, true), closeTimeout); |
| } |
| }); |
| } catch (e) { |
| this.emit('error', e); |
| } |
| }; |
| |
| /** |
| * Pause the client stream |
| * |
| * @api public |
| */ |
| WebSocket.prototype.pause = function pauser() { |
| if (this.readyState !== WebSocket.OPEN) throw new Error('not opened'); |
| |
| return this._socket.pause(); |
| }; |
| |
| /** |
| * Sends a ping |
| * |
| * @param {Object} data to be sent to the server |
| * @param {Object} Members - mask: boolean, binary: boolean |
| * @param {boolean} dontFailWhenClosed indicates whether or not to throw if the connection isnt open |
| * @api public |
| */ |
| WebSocket.prototype.ping = function ping(data, options, dontFailWhenClosed) { |
| if (this.readyState !== WebSocket.OPEN) { |
| if (dontFailWhenClosed === true) return; |
| throw new Error('not opened'); |
| } |
| |
| options = options || {}; |
| |
| if (typeof options.mask === 'undefined') options.mask = !this._isServer; |
| |
| this._sender.ping(data, options); |
| }; |
| |
| /** |
| * Sends a pong |
| * |
| * @param {Object} data to be sent to the server |
| * @param {Object} Members - mask: boolean, binary: boolean |
| * @param {boolean} dontFailWhenClosed indicates whether or not to throw if the connection isnt open |
| * @api public |
| */ |
| WebSocket.prototype.pong = function(data, options, dontFailWhenClosed) { |
| if (this.readyState !== WebSocket.OPEN) { |
| if (dontFailWhenClosed === true) return; |
| throw new Error('not opened'); |
| } |
| |
| options = options || {}; |
| |
| if (typeof options.mask === 'undefined') options.mask = !this._isServer; |
| |
| this._sender.pong(data, options); |
| }; |
| |
| /** |
| * Resume the client stream |
| * |
| * @api public |
| */ |
| WebSocket.prototype.resume = function resume() { |
| if (this.readyState !== WebSocket.OPEN) throw new Error('not opened'); |
| |
| return this._socket.resume(); |
| }; |
| |
| /** |
| * Sends a piece of data |
| * |
| * @param {Object} data to be sent to the server |
| * @param {Object} Members - mask: boolean, binary: boolean, compress: boolean |
| * @param {function} Optional callback which is executed after the send completes |
| * @api public |
| */ |
| |
| WebSocket.prototype.send = function send(data, options, cb) { |
| if (typeof options === 'function') { |
| cb = options; |
| options = {}; |
| } |
| |
| if (this.readyState !== WebSocket.OPEN) { |
| if (typeof cb === 'function') cb(new Error('not opened')); |
| else throw new Error('not opened'); |
| return; |
| } |
| |
| if (!data) data = ''; |
| if (this._queue) { |
| var self = this; |
| this._queue.push(function() { self.send(data, options, cb); }); |
| return; |
| } |
| |
| options = options || {}; |
| options.fin = true; |
| |
| if (typeof options.binary === 'undefined') { |
| options.binary = (data instanceof ArrayBuffer || data instanceof Buffer || |
| data instanceof Uint8Array || |
| data instanceof Uint16Array || |
| data instanceof Uint32Array || |
| data instanceof Int8Array || |
| data instanceof Int16Array || |
| data instanceof Int32Array || |
| data instanceof Float32Array || |
| data instanceof Float64Array); |
| } |
| |
| if (typeof options.mask === 'undefined') options.mask = !this._isServer; |
| if (typeof options.compress === 'undefined') options.compress = true; |
| if (!this.extensions[PerMessageDeflate.extensionName]) { |
| options.compress = false; |
| } |
| |
| var readable = typeof stream.Readable === 'function' |
| ? stream.Readable |
| : stream.Stream; |
| |
| if (data instanceof readable) { |
| startQueue(this); |
| var self = this; |
| |
| sendStream(this, data, options, function send(error) { |
| process.nextTick(function tock() { |
| executeQueueSends(self); |
| }); |
| |
| if (typeof cb === 'function') cb(error); |
| }); |
| } else { |
| this._sender.send(data, options, cb); |
| } |
| }; |
| |
| /** |
| * Streams data through calls to a user supplied function |
| * |
| * @param {Object} Members - mask: boolean, binary: boolean, compress: boolean |
| * @param {function} 'function (error, send)' which is executed on successive ticks of which send is 'function (data, final)'. |
| * @api public |
| */ |
| WebSocket.prototype.stream = function stream(options, cb) { |
| if (typeof options === 'function') { |
| cb = options; |
| options = {}; |
| } |
| |
| var self = this; |
| |
| if (typeof cb !== 'function') throw new Error('callback must be provided'); |
| |
| if (this.readyState !== WebSocket.OPEN) { |
| if (typeof cb === 'function') cb(new Error('not opened')); |
| else throw new Error('not opened'); |
| return; |
| } |
| |
| if (this._queue) { |
| this._queue.push(function () { self.stream(options, cb); }); |
| return; |
| } |
| |
| options = options || {}; |
| |
| if (typeof options.mask === 'undefined') options.mask = !this._isServer; |
| if (typeof options.compress === 'undefined') options.compress = true; |
| if (!this.extensions[PerMessageDeflate.extensionName]) { |
| options.compress = false; |
| } |
| |
| startQueue(this); |
| |
| function send(data, final) { |
| try { |
| if (self.readyState !== WebSocket.OPEN) throw new Error('not opened'); |
| options.fin = final === true; |
| self._sender.send(data, options); |
| if (!final) process.nextTick(cb.bind(null, null, send)); |
| else executeQueueSends(self); |
| } catch (e) { |
| if (typeof cb === 'function') cb(e); |
| else { |
| delete self._queue; |
| self.emit('error', e); |
| } |
| } |
| } |
| |
| process.nextTick(cb.bind(null, null, send)); |
| }; |
| |
| /** |
| * Immediately shuts down the connection |
| * |
| * @api public |
| */ |
| WebSocket.prototype.terminate = function terminate() { |
| if (this.readyState === WebSocket.CLOSED) return; |
| |
| if (this._socket) { |
| this.readyState = WebSocket.CLOSING; |
| |
| // End the connection |
| try { this._socket.end(); } |
| catch (e) { |
| // Socket error during end() call, so just destroy it right now |
| cleanupWebsocketResources.call(this, true); |
| return; |
| } |
| |
| // Add a timeout to ensure that the connection is completely |
| // cleaned up within 30 seconds, even if the clean close procedure |
| // fails for whatever reason |
| // First cleanup any pre-existing timeout from an earlier "terminate" call, |
| // if one exists. Otherwise terminate calls in quick succession will leak timeouts |
| // and hold the program open for `closeTimout` time. |
| if (this._closeTimer) { clearTimeout(this._closeTimer); } |
| this._closeTimer = setTimeout(cleanupWebsocketResources.bind(this, true), closeTimeout); |
| } else if (this.readyState === WebSocket.CONNECTING) { |
| cleanupWebsocketResources.call(this, true); |
| } |
| }; |
| |
| /** |
| * Expose bufferedAmount |
| * |
| * @api public |
| */ |
| Object.defineProperty(WebSocket.prototype, 'bufferedAmount', { |
| get: function get() { |
| var amount = 0; |
| if (this._socket) { |
| amount = this._socket.bufferSize || 0; |
| } |
| return amount; |
| } |
| }); |
| |
| /** |
| * Emulates the W3C Browser based WebSocket interface using function members. |
| * |
| * @see http://dev.w3.org/html5/websockets/#the-websocket-interface |
| * @api public |
| */ |
| ['open', 'error', 'close', 'message'].forEach(function(method) { |
| Object.defineProperty(WebSocket.prototype, 'on' + method, { |
| /** |
| * Returns the current listener |
| * |
| * @returns {Mixed} the set function or undefined |
| * @api public |
| */ |
| get: function get() { |
| var listener = this.listeners(method)[0]; |
| return listener ? (listener._listener ? listener._listener : listener) : undefined; |
| }, |
| |
| /** |
| * Start listening for events |
| * |
| * @param {Function} listener the listener |
| * @returns {Mixed} the set function or undefined |
| * @api public |
| */ |
| set: function set(listener) { |
| this.removeAllListeners(method); |
| this.addEventListener(method, listener); |
| } |
| }); |
| }); |
| |
| /** |
| * Emulates the W3C Browser based WebSocket interface using addEventListener. |
| * |
| * @see https://developer.mozilla.org/en/DOM/element.addEventListener |
| * @see http://dev.w3.org/html5/websockets/#the-websocket-interface |
| * @api public |
| */ |
| WebSocket.prototype.addEventListener = function(method, listener) { |
| var target = this; |
| |
| function onMessage (data, flags) { |
| listener.call(target, new MessageEvent(data, !!flags.binary, target)); |
| } |
| |
| function onClose (code, message) { |
| listener.call(target, new CloseEvent(code, message, target)); |
| } |
| |
| function onError (event) { |
| event.type = 'error'; |
| event.target = target; |
| listener.call(target, event); |
| } |
| |
| function onOpen () { |
| listener.call(target, new OpenEvent(target)); |
| } |
| |
| if (typeof listener === 'function') { |
| if (method === 'message') { |
| // store a reference so we can return the original function from the |
| // addEventListener hook |
| onMessage._listener = listener; |
| this.on(method, onMessage); |
| } else if (method === 'close') { |
| // store a reference so we can return the original function from the |
| // addEventListener hook |
| onClose._listener = listener; |
| this.on(method, onClose); |
| } else if (method === 'error') { |
| // store a reference so we can return the original function from the |
| // addEventListener hook |
| onError._listener = listener; |
| this.on(method, onError); |
| } else if (method === 'open') { |
| // store a reference so we can return the original function from the |
| // addEventListener hook |
| onOpen._listener = listener; |
| this.on(method, onOpen); |
| } else { |
| this.on(method, listener); |
| } |
| } |
| }; |
| |
| module.exports = WebSocket; |
| module.exports.buildHostHeader = buildHostHeader |
| |
| /** |
| * W3C MessageEvent |
| * |
| * @see http://www.w3.org/TR/html5/comms.html |
| * @constructor |
| * @api private |
| */ |
| function MessageEvent(dataArg, isBinary, target) { |
| this.type = 'message'; |
| this.data = dataArg; |
| this.target = target; |
| this.binary = isBinary; // non-standard. |
| } |
| |
| /** |
| * W3C CloseEvent |
| * |
| * @see http://www.w3.org/TR/html5/comms.html |
| * @constructor |
| * @api private |
| */ |
| function CloseEvent(code, reason, target) { |
| this.type = 'close'; |
| this.wasClean = (typeof code === 'undefined' || code === 1000); |
| this.code = code; |
| this.reason = reason; |
| this.target = target; |
| } |
| |
| /** |
| * W3C OpenEvent |
| * |
| * @see http://www.w3.org/TR/html5/comms.html |
| * @constructor |
| * @api private |
| */ |
| function OpenEvent(target) { |
| this.type = 'open'; |
| this.target = target; |
| } |
| |
| // Append port number to Host header, only if specified in the url |
| // and non-default |
| function buildHostHeader(isSecure, hostname, port) { |
| var headerHost = hostname; |
| if (hostname) { |
| if ((isSecure && (port != 443)) || (!isSecure && (port != 80))){ |
| headerHost = headerHost + ':' + port; |
| } |
| } |
| return headerHost; |
| } |
| |
| /** |
| * Entirely private apis, |
| * which may or may not be bound to a sepcific WebSocket instance. |
| */ |
| function initAsServerClient(req, socket, upgradeHead, options) { |
| options = new Options({ |
| protocolVersion: protocolVersion, |
| protocol: null, |
| extensions: {} |
| }).merge(options); |
| |
| // expose state properties |
| this.protocol = options.value.protocol; |
| this.protocolVersion = options.value.protocolVersion; |
| this.extensions = options.value.extensions; |
| this.supports.binary = (this.protocolVersion !== 'hixie-76'); |
| this.upgradeReq = req; |
| this.readyState = WebSocket.CONNECTING; |
| this._isServer = true; |
| |
| // establish connection |
| if (options.value.protocolVersion === 'hixie-76') { |
| establishConnection.call(this, ReceiverHixie, SenderHixie, socket, upgradeHead); |
| } else { |
| establishConnection.call(this, Receiver, Sender, socket, upgradeHead); |
| } |
| } |
| |
| function initAsClient(address, protocols, options) { |
| options = new Options({ |
| origin: null, |
| protocolVersion: protocolVersion, |
| host: null, |
| headers: null, |
| protocol: protocols.join(','), |
| agent: null, |
| |
| // ssl-related options |
| pfx: null, |
| key: null, |
| passphrase: null, |
| cert: null, |
| ca: null, |
| ciphers: null, |
| rejectUnauthorized: null, |
| perMessageDeflate: true, |
| localAddress: null |
| }).merge(options); |
| |
| if (options.value.protocolVersion !== 8 && options.value.protocolVersion !== 13) { |
| throw new Error('unsupported protocol version'); |
| } |
| |
| // verify URL and establish http class |
| var serverUrl = url.parse(address); |
| var isUnixSocket = serverUrl.protocol === 'ws+unix:'; |
| if (!serverUrl.host && !isUnixSocket) throw new Error('invalid url'); |
| var isSecure = serverUrl.protocol === 'wss:' || serverUrl.protocol === 'https:'; |
| var httpObj = isSecure ? https : http; |
| var port = serverUrl.port || (isSecure ? 443 : 80); |
| var auth = serverUrl.auth; |
| |
| // prepare extensions |
| var extensionsOffer = {}; |
| var perMessageDeflate; |
| if (options.value.perMessageDeflate) { |
| perMessageDeflate = new PerMessageDeflate(typeof options.value.perMessageDeflate !== true ? options.value.perMessageDeflate : {}, false); |
| extensionsOffer[PerMessageDeflate.extensionName] = perMessageDeflate.offer(); |
| } |
| |
| // expose state properties |
| this._isServer = false; |
| this.url = address; |
| this.protocolVersion = options.value.protocolVersion; |
| this.supports.binary = (this.protocolVersion !== 'hixie-76'); |
| |
| // begin handshake |
| var key = new Buffer(options.value.protocolVersion + '-' + Date.now()).toString('base64'); |
| var shasum = crypto.createHash('sha1'); |
| shasum.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'); |
| var expectedServerKey = shasum.digest('base64'); |
| |
| var agent = options.value.agent; |
| |
| var headerHost = buildHostHeader(isSecure, serverUrl.hostname, port) |
| |
| var requestOptions = { |
| port: port, |
| host: serverUrl.hostname, |
| headers: { |
| 'Connection': 'Upgrade', |
| 'Upgrade': 'websocket', |
| 'Host': headerHost, |
| 'Sec-WebSocket-Version': options.value.protocolVersion, |
| 'Sec-WebSocket-Key': key |
| } |
| }; |
| |
| // If we have basic auth. |
| if (auth) { |
| requestOptions.headers.Authorization = 'Basic ' + new Buffer(auth).toString('base64'); |
| } |
| |
| if (options.value.protocol) { |
| requestOptions.headers['Sec-WebSocket-Protocol'] = options.value.protocol; |
| } |
| |
| if (options.value.host) { |
| requestOptions.headers.Host = options.value.host; |
| } |
| |
| if (options.value.headers) { |
| for (var header in options.value.headers) { |
| if (options.value.headers.hasOwnProperty(header)) { |
| requestOptions.headers[header] = options.value.headers[header]; |
| } |
| } |
| } |
| |
| if (Object.keys(extensionsOffer).length) { |
| requestOptions.headers['Sec-WebSocket-Extensions'] = Extensions.format(extensionsOffer); |
| } |
| |
| if (options.isDefinedAndNonNull('pfx') |
| || options.isDefinedAndNonNull('key') |
| || options.isDefinedAndNonNull('passphrase') |
| || options.isDefinedAndNonNull('cert') |
| || options.isDefinedAndNonNull('ca') |
| || options.isDefinedAndNonNull('ciphers') |
| || options.isDefinedAndNonNull('rejectUnauthorized')) { |
| |
| if (options.isDefinedAndNonNull('pfx')) requestOptions.pfx = options.value.pfx; |
| if (options.isDefinedAndNonNull('key')) requestOptions.key = options.value.key; |
| if (options.isDefinedAndNonNull('passphrase')) requestOptions.passphrase = options.value.passphrase; |
| if (options.isDefinedAndNonNull('cert')) requestOptions.cert = options.value.cert; |
| if (options.isDefinedAndNonNull('ca')) requestOptions.ca = options.value.ca; |
| if (options.isDefinedAndNonNull('ciphers')) requestOptions.ciphers = options.value.ciphers; |
| if (options.isDefinedAndNonNull('rejectUnauthorized')) requestOptions.rejectUnauthorized = options.value.rejectUnauthorized; |
| |
| if (!agent) { |
| // global agent ignores client side certificates |
| agent = new httpObj.Agent(requestOptions); |
| } |
| } |
| |
| requestOptions.path = serverUrl.path || '/'; |
| |
| if (agent) { |
| requestOptions.agent = agent; |
| } |
| |
| if (isUnixSocket) { |
| requestOptions.socketPath = serverUrl.pathname; |
| } |
| |
| if (options.value.localAddress) { |
| requestOptions.localAddress = options.value.localAddress; |
| } |
| |
| if (options.value.origin) { |
| if (options.value.protocolVersion < 13) requestOptions.headers['Sec-WebSocket-Origin'] = options.value.origin; |
| else requestOptions.headers.Origin = options.value.origin; |
| } |
| |
| var self = this; |
| var req = httpObj.request(requestOptions); |
| |
| req.on('error', function onerror(error) { |
| self.emit('error', error); |
| cleanupWebsocketResources.call(self, error); |
| }); |
| |
| req.once('response', function response(res) { |
| var error; |
| |
| if (!self.emit('unexpected-response', req, res)) { |
| error = new Error('unexpected server response (' + res.statusCode + ')'); |
| req.abort(); |
| self.emit('error', error); |
| } |
| |
| cleanupWebsocketResources.call(self, error); |
| }); |
| |
| req.once('upgrade', function upgrade(res, socket, upgradeHead) { |
| if (self.readyState === WebSocket.CLOSED) { |
| // client closed before server accepted connection |
| self.emit('close'); |
| self.removeAllListeners(); |
| socket.end(); |
| return; |
| } |
| |
| var serverKey = res.headers['sec-websocket-accept']; |
| if (typeof serverKey === 'undefined' || serverKey !== expectedServerKey) { |
| self.emit('error', 'invalid server key'); |
| self.removeAllListeners(); |
| socket.end(); |
| return; |
| } |
| |
| var serverProt = res.headers['sec-websocket-protocol']; |
| var protList = (options.value.protocol || "").split(/, */); |
| var protError = null; |
| |
| if (!options.value.protocol && serverProt) { |
| protError = 'server sent a subprotocol even though none requested'; |
| } else if (options.value.protocol && !serverProt) { |
| protError = 'server sent no subprotocol even though requested'; |
| } else if (serverProt && protList.indexOf(serverProt) === -1) { |
| protError = 'server responded with an invalid protocol'; |
| } |
| |
| if (protError) { |
| self.emit('error', protError); |
| self.removeAllListeners(); |
| socket.end(); |
| return; |
| } else if (serverProt) { |
| self.protocol = serverProt; |
| } |
| |
| var serverExtensions = Extensions.parse(res.headers['sec-websocket-extensions']); |
| if (perMessageDeflate && serverExtensions[PerMessageDeflate.extensionName]) { |
| try { |
| perMessageDeflate.accept(serverExtensions[PerMessageDeflate.extensionName]); |
| } catch (err) { |
| self.emit('error', 'invalid extension parameter'); |
| self.removeAllListeners(); |
| socket.end(); |
| return; |
| } |
| self.extensions[PerMessageDeflate.extensionName] = perMessageDeflate; |
| } |
| |
| establishConnection.call(self, Receiver, Sender, socket, upgradeHead); |
| |
| // perform cleanup on http resources |
| req.removeAllListeners(); |
| req = null; |
| agent = null; |
| }); |
| |
| req.end(); |
| this.readyState = WebSocket.CONNECTING; |
| } |
| |
| function establishConnection(ReceiverClass, SenderClass, socket, upgradeHead) { |
| var ultron = this._ultron = new Ultron(socket) |
| , called = false |
| , self = this; |
| |
| socket.setTimeout(0); |
| socket.setNoDelay(true); |
| |
| this._receiver = new ReceiverClass(this.extensions); |
| this._socket = socket; |
| |
| // socket cleanup handlers |
| ultron.on('end', cleanupWebsocketResources.bind(this)); |
| ultron.on('close', cleanupWebsocketResources.bind(this)); |
| ultron.on('error', cleanupWebsocketResources.bind(this)); |
| |
| // ensure that the upgradeHead is added to the receiver |
| function firstHandler(data) { |
| if (called || self.readyState === WebSocket.CLOSED) return; |
| |
| called = true; |
| socket.removeListener('data', firstHandler); |
| ultron.on('data', realHandler); |
| |
| if (upgradeHead && upgradeHead.length > 0) { |
| realHandler(upgradeHead); |
| upgradeHead = null; |
| } |
| |
| if (data) realHandler(data); |
| } |
| |
| // subsequent packets are pushed straight to the receiver |
| function realHandler(data) { |
| self.bytesReceived += data.length; |
| self._receiver.add(data); |
| } |
| |
| ultron.on('data', firstHandler); |
| |
| // if data was passed along with the http upgrade, |
| // this will schedule a push of that on to the receiver. |
| // this has to be done on next tick, since the caller |
| // hasn't had a chance to set event handlers on this client |
| // object yet. |
| process.nextTick(firstHandler); |
| |
| // receiver event handlers |
| self._receiver.ontext = function ontext(data, flags) { |
| flags = flags || {}; |
| |
| self.emit('message', data, flags); |
| }; |
| |
| self._receiver.onbinary = function onbinary(data, flags) { |
| flags = flags || {}; |
| |
| flags.binary = true; |
| self.emit('message', data, flags); |
| }; |
| |
| self._receiver.onping = function onping(data, flags) { |
| flags = flags || {}; |
| |
| self.pong(data, { |
| mask: !self._isServer, |
| binary: flags.binary === true |
| }, true); |
| |
| self.emit('ping', data, flags); |
| }; |
| |
| self._receiver.onpong = function onpong(data, flags) { |
| self.emit('pong', data, flags || {}); |
| }; |
| |
| self._receiver.onclose = function onclose(code, data, flags) { |
| flags = flags || {}; |
| |
| self._closeReceived = true; |
| self.close(code, data); |
| }; |
| |
| self._receiver.onerror = function onerror(reason, errorCode) { |
| // close the connection when the receiver reports a HyBi error code |
| self.close(typeof errorCode !== 'undefined' ? errorCode : 1002, ''); |
| self.emit('error', reason, errorCode); |
| }; |
| |
| // finalize the client |
| this._sender = new SenderClass(socket, this.extensions); |
| this._sender.on('error', function onerror(error) { |
| self.close(1002, ''); |
| self.emit('error', error); |
| }); |
| |
| this.readyState = WebSocket.OPEN; |
| this.emit('open'); |
| } |
| |
| function startQueue(instance) { |
| instance._queue = instance._queue || []; |
| } |
| |
| function executeQueueSends(instance) { |
| var queue = instance._queue; |
| if (typeof queue === 'undefined') return; |
| |
| delete instance._queue; |
| for (var i = 0, l = queue.length; i < l; ++i) { |
| queue[i](); |
| } |
| } |
| |
| function sendStream(instance, stream, options, cb) { |
| stream.on('data', function incoming(data) { |
| if (instance.readyState !== WebSocket.OPEN) { |
| if (typeof cb === 'function') cb(new Error('not opened')); |
| else { |
| delete instance._queue; |
| instance.emit('error', new Error('not opened')); |
| } |
| return; |
| } |
| |
| options.fin = false; |
| instance._sender.send(data, options); |
| }); |
| |
| stream.on('end', function end() { |
| if (instance.readyState !== WebSocket.OPEN) { |
| if (typeof cb === 'function') cb(new Error('not opened')); |
| else { |
| delete instance._queue; |
| instance.emit('error', new Error('not opened')); |
| } |
| return; |
| } |
| |
| options.fin = true; |
| instance._sender.send(null, options); |
| |
| if (typeof cb === 'function') cb(null); |
| }); |
| } |
| |
| function cleanupWebsocketResources(error) { |
| if (this.readyState === WebSocket.CLOSED) return; |
| |
| var emitClose = this.readyState !== WebSocket.CONNECTING; |
| this.readyState = WebSocket.CLOSED; |
| |
| clearTimeout(this._closeTimer); |
| this._closeTimer = null; |
| |
| if (emitClose) { |
| // If the connection was closed abnormally (with an error), |
| // then the close code must default to 1006. |
| if (error) { |
| this._closeCode = 1006; |
| } |
| this.emit('close', this._closeCode || 1000, this._closeMessage || ''); |
| } |
| |
| if (this._socket) { |
| if (this._ultron) this._ultron.destroy(); |
| this._socket.on('error', function onerror() { |
| try { this.destroy(); } |
| catch (e) {} |
| }); |
| |
| try { |
| if (!error) this._socket.end(); |
| else this._socket.destroy(); |
| } catch (e) { /* Ignore termination errors */ } |
| |
| this._socket = null; |
| this._ultron = null; |
| } |
| |
| if (this._sender) { |
| this._sender.removeAllListeners(); |
| this._sender = null; |
| } |
| |
| if (this._receiver) { |
| this._receiver.cleanup(); |
| this._receiver = null; |
| } |
| |
| this.removeAllListeners(); |
| this.on('error', function onerror() {}); // catch all errors after this |
| delete this._queue; |
| } |