Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 1 addition & 119 deletions lib/WebSocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const util = require('util');
const http = require('http');
const https = require('https');
const crypto = require('crypto');
const stream = require('stream');
const Ultron = require('ultron');
const Sender = require('./Sender');
const Receiver = require('./Receiver');
Expand Down Expand Up @@ -222,11 +221,6 @@ WebSocket.prototype.send = function send (data, options, cb) {

if (!data) data = '';

if (this._queue) {
this._queue.push(() => this.send(data, options, cb));
return;
}

options = options || {};
if (options.fin !== false) options.fin = true;

Expand All @@ -241,72 +235,7 @@ WebSocket.prototype.send = function send (data, options, cb) {
options.compress = false;
}

if (data instanceof stream.Readable) {
startQueue(this);

sendStream(this, data, options, (error) => {
process.nextTick(() => executeQueueSends(this));
if (cb) cb(error);
});
} else {
this._sender.send(data, options, cb);
}
};

/**
* Streams data through calls to a user supplied function
*
* @param {Object} Members - mask: boolean, binary: boolean, compress: boolean
* @param {function} 'function (error, send)' which is executed on successive
* ticks of which send is 'function (data, final)'.
* @api public
*/
WebSocket.prototype.stream = function stream (options, cb) {
if (typeof options === 'function') {
cb = options;
options = {};
}

if (!cb) throw new Error('callback must be provided');

if (this.readyState !== WebSocket.OPEN) {
if (cb) cb(new Error('not opened'));
else throw new Error('not opened');
return;
}

if (this._queue) {
this._queue.push(() => this.stream(options, cb));
return;
}

options = options || {};

if (options.mask === undefined) options.mask = !this._isServer;
if (options.compress === undefined) options.compress = true;
if (!this.extensions[PerMessageDeflate.extensionName]) {
options.compress = false;
}

startQueue(this);

const send = (data, final) => {
try {
if (this.readyState !== WebSocket.OPEN) throw new Error('not opened');
options.fin = final === true;
this._sender.send(data, options);
if (!final) process.nextTick(cb, null, send);
else executeQueueSends(this);
} catch (e) {
if (typeof cb === 'function') cb(e);
else {
delete this._queue;
this.emit('error', e);
}
}
};

process.nextTick(cb, null, send);
this._sender.send(data, options, cb);
};

/**
Expand Down Expand Up @@ -845,52 +774,6 @@ function establishConnection (socket, upgradeHead) {
this.emit('open');
}

function startQueue (instance) {
instance._queue = instance._queue || [];
}

function executeQueueSends (instance) {
var queue = instance._queue;
if (queue === undefined) return;

delete instance._queue;
for (var i = 0, l = queue.length; i < l; ++i) {
queue[i]();
}
}

function sendStream (instance, stream, options, cb) {
stream.on('data', function incoming (data) {
if (instance.readyState !== WebSocket.OPEN) {
if (cb) cb(new Error('not opened'));
else {
delete instance._queue;
instance.emit('error', new Error('not opened'));
}
return;
}

options.fin = false;
instance._sender.send(data, options);
});

stream.on('end', function end () {
if (instance.readyState !== WebSocket.OPEN) {
if (cb) cb(new Error('not opened'));
else {
delete instance._queue;
instance.emit('error', new Error('not opened'));
}
return;
}

options.fin = true;
instance._sender.send(null, options);

if (cb) cb(null);
});
}

function cleanupWebsocketResources (error) {
if (this.readyState === WebSocket.CLOSED) return;

Expand Down Expand Up @@ -941,5 +824,4 @@ function cleanupWebsocketResources (error) {

this.removeAllListeners();
this.on('error', function onerror () {}); // catch all errors after this
delete this._queue;
}
Loading