Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions core/websock.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ export default class Websock {
this._rQi = 0; // Receive queue index
this._rQlen = 0; // Next write position in the receive queue
this._rQbufferSize = 1024 * 1024 * 4; // Receive queue buffer size (4 MiB)
this._rQmax = this._rQbufferSize / 8;
// called in init: this._rQ = new Uint8Array(this._rQbufferSize);
this._rQ = null; // Receive queue

Expand Down Expand Up @@ -226,15 +225,15 @@ export default class Websock {
}

_expand_compact_rQ(min_fit) {
const resizeNeeded = min_fit || this.rQlen > this._rQbufferSize / 2;
// if we're using less than 1/8th of the buffer even with the incoming bytes, compact in place
// instead of resizing
const required_buffer_size = (this._rQlen - this._rQi + min_fit) * 8;
const resizeNeeded = this._rQbufferSize < required_buffer_size;

if (resizeNeeded) {
if (!min_fit) {
// just double the size if we need to do compaction
this._rQbufferSize *= 2;
} else {
// otherwise, make sure we satisy rQlen - rQi + min_fit < rQbufferSize / 8
this._rQbufferSize = (this.rQlen + min_fit) * 8;
}
// Make sure we always *at least* double the buffer size, and have at least space for 8x
// the current amount of data
this._rQbufferSize = Math.max(this._rQbufferSize * 2, required_buffer_size);
}

// we don't want to grow unboundedly
Expand All @@ -247,14 +246,13 @@ export default class Websock {

if (resizeNeeded) {
const old_rQbuffer = this._rQ.buffer;
this._rQmax = this._rQbufferSize / 8;
this._rQ = new Uint8Array(this._rQbufferSize);
this._rQ.set(new Uint8Array(old_rQbuffer, this._rQi));
this._rQ.set(new Uint8Array(old_rQbuffer, this._rQi, this._rQlen - this._rQi));
} else {
if (ENABLE_COPYWITHIN) {
this._rQ.copyWithin(0, this._rQi);
this._rQ.copyWithin(0, this._rQi, this._rQlen);
} else {
this._rQ.set(new Uint8Array(this._rQ.buffer, this._rQi));
this._rQ.set(new Uint8Array(this._rQ.buffer, this._rQi, this._rQlen - this._rQi));
}
}

Expand All @@ -280,8 +278,6 @@ export default class Websock {
if (this._rQlen == this._rQi) {
this._rQlen = 0;
this._rQi = 0;
} else if (this._rQlen > this._rQmax) {
this._expand_compact_rQ();
}
} else {
Log.Debug("Ignoring empty message");
Expand Down
36 changes: 29 additions & 7 deletions tests/test.websock.js
Original file line number Diff line number Diff line change
Expand Up @@ -384,33 +384,55 @@ describe('Websock', function () {
expect(sock._eventHandlers.message).not.to.have.been.called;
});

it('should compact the receive queue', function () {
// NB(sross): while this is an internal implementation detail, it's important to
// test, otherwise the receive queue could become very large very quickly
it('should compact the receive queue when a message handler empties it', function () {
sock._eventHandlers.message = () => { sock.rQi = sock._rQlen; };
sock._rQ = new Uint8Array([0, 1, 2, 3, 4, 5, 0, 0, 0, 0]);
sock._rQlen = 6;
sock.rQi = 6;
sock._rQmax = 3;
const msg = { data: new Uint8Array([1, 2, 3]).buffer };
sock._mode = 'binary';
sock._recv_message(msg);
expect(sock._rQlen).to.equal(3);
expect(sock._rQlen).to.equal(0);
expect(sock.rQi).to.equal(0);
});

it('should automatically resize the receive queue if the incoming message is too large', function () {
it('should compact the receive queue when we reach the end of the buffer', function () {
sock._rQ = new Uint8Array(20);
sock._rQbufferSize = 20;
sock._rQlen = 20;
sock.rQi = 10;
const msg = { data: new Uint8Array([1, 2]).buffer };
sock._mode = 'binary';
sock._recv_message(msg);
expect(sock._rQlen).to.equal(12);
expect(sock.rQi).to.equal(0);
});

it('should automatically resize the receive queue if the incoming message is larger than the buffer', function () {
sock._rQ = new Uint8Array(20);
sock._rQlen = 0;
sock.rQi = 0;
sock._rQbufferSize = 20;
sock._rQmax = 2;
const msg = { data: new Uint8Array(30).buffer };
sock._mode = 'binary';
sock._recv_message(msg);
expect(sock._rQlen).to.equal(30);
expect(sock.rQi).to.equal(0);
expect(sock._rQ.length).to.equal(240); // keep the invariant that rQbufferSize / 8 >= rQlen
});

it('should automatically resize the receive queue if the incoming message is larger than 1/8th of the buffer and we reach the end of the buffer', function () {
sock._rQ = new Uint8Array(20);
sock._rQlen = 16;
sock.rQi = 16;
sock._rQbufferSize = 20;
const msg = { data: new Uint8Array(6).buffer };
sock._mode = 'binary';
sock._recv_message(msg);
expect(sock._rQlen).to.equal(6);
expect(sock.rQi).to.equal(0);
expect(sock._rQ.length).to.equal(48);
});
});

describe('Data encoding', function () {
Expand Down