Skip to content

Commit 2d0b397

Browse files
authored
Change logic around closing connections and writers API-1283 (#1417)
The issue was writing some data to the connection after it being closed. I changed and refactored some logic around PipelinedWriter and Connection to avoid that. - Moved socket closing responsibility to Writers - Fixed a test in PipelinedWriterTest - Ongoing and future socket.write calls will end with an error after the writer is closed. This is because close() destroys the socket and any ongoing and future socket.write calls will end with an error as described in node.js [documentation](https://nodejs.org/api/stream.html#writabledestroyerror). So we will be able to reject all deferred promises in the write queue upon close due to connection.close() or socket write error close(). - We were not closing the socket on write error before, now we do it. We were sending end packet and don't write anymore. This is the same as java. [See](https://hazelcast.slack.com/archives/C01JU7ZJYGP/p1668695419424179) fixes #1256
1 parent 2255a52 commit 2d0b397

File tree

3 files changed

+107
-35
lines changed

3 files changed

+107
-35
lines changed

src/network/Connection.ts

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ abstract class Writer extends EventEmitter {
4141

4242
abstract write(message: ClientMessage, resolver: DeferredPromise<void>): void;
4343

44-
abstract close(): void;
44+
abstract close(cause: Error): void;
4545

4646
}
4747

@@ -81,15 +81,22 @@ export class PipelinedWriter extends Writer {
8181

8282
write(message: ClientMessage, resolver: DeferredPromise<void>): void {
8383
if (this.error) {
84-
// if there was a write error, it's useless to keep writing to the socket
84+
// if the socket is closed, it's useless to keep writing to the socket
8585
return process.nextTick(() => resolver.reject(this.error));
8686
}
8787
this.queue.push({ message, resolver });
8888
this.schedule();
8989
}
9090

91-
close(): void {
91+
close(error: Error): void {
92+
if (this.error) {
93+
return;
94+
}
95+
this.error = this.makeIOError(error);
9296
this.canWrite = false;
97+
// If we pass an error to destroy, an unhandled error will be thrown because we don't handle the error event
98+
// So we don't pass anything to the socket. It is internal anyway.
99+
this.socket.destroy();
93100
// no more items can be added now
94101
this.queue = FROZEN_ARRAY;
95102
}
@@ -164,15 +171,22 @@ export class PipelinedWriter extends Writer {
164171
});
165172
}
166173

167-
private handleError(err: any, sentResolvers: OutputQueueItem[]): void {
168-
this.error = new IOError(err);
174+
private handleError(err: Error, sentResolvers: OutputQueueItem[]): void {
175+
const error = this.makeIOError(err);
169176
for (const item of sentResolvers) {
170-
item.resolver.reject(this.error);
177+
item.resolver.reject(error);
171178
}
172179
for (const item of this.queue) {
173-
item.resolver.reject(this.error);
180+
item.resolver.reject(error);
181+
}
182+
this.close(error);
183+
}
184+
185+
private makeIOError(err: Error): IOError {
186+
if (err instanceof IOError) {
187+
return err;
174188
}
175-
this.close();
189+
return new IOError(err.message, err);
176190
}
177191
}
178192

@@ -199,8 +213,8 @@ export class DirectWriter extends Writer {
199213
});
200214
}
201215

202-
close(): void {
203-
// no-op
216+
close(cause: Error): void {
217+
this.socket.destroy();
204218
}
205219
}
206220

@@ -411,7 +425,7 @@ export class Connection {
411425
/**
412426
* Closes this connection.
413427
*/
414-
close(reason: string, cause: Error): void {
428+
close(reason: string | null, cause: Error | null): void {
415429
if (this.closedTime !== 0) {
416430
return;
417431
}
@@ -422,8 +436,7 @@ export class Connection {
422436

423437
this.logClose();
424438

425-
this.writer.close();
426-
this.socket.end();
439+
this.writer.close(this.closedCause ? this.closedCause : new Error(reason ? reason : 'Connection closed'));
427440

428441
this.connectionManager.onConnectionClose(this);
429442
}

test/unit/connection/DirectWriterTest.js

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ const {
2626
Frame
2727
} = require('../../../lib/protocol/ClientMessage');
2828
const { deferredPromise } = require('../../../lib/util/Util');
29+
const sandbox = sinon.createSandbox();
2930

3031
describe('DirectWriterTest', function () {
31-
let queue;
32+
let writer;
3233
let mockSocket;
3334
let writtenBytes;
3435

@@ -40,25 +41,29 @@ describe('DirectWriterTest', function () {
4041

4142
const setUpWriteSuccess = () => {
4243
mockSocket = new Socket({});
43-
sinon.stub(mockSocket, 'write').callsFake((data, cb) => {
44+
sandbox.stub(mockSocket, 'write').callsFake((data, cb) => {
4445
cb();
4546
mockSocket.emit('data', data);
4647
});
47-
queue = new DirectWriter(mockSocket, numberOfBytes => {
48+
writer = new DirectWriter(mockSocket, numberOfBytes => {
4849
writtenBytes += numberOfBytes;
4950
});
5051
};
5152

5253
const setUpWriteFailure = (err) => {
5354
mockSocket = new Socket({});
54-
sinon.stub(mockSocket, 'write').callsFake((_, cb) => {
55+
sandbox.stub(mockSocket, 'write').callsFake((_, cb) => {
5556
cb(err);
5657
});
57-
queue = new DirectWriter(mockSocket, numberOfBytes => {
58+
writer = new DirectWriter(mockSocket, numberOfBytes => {
5859
writtenBytes += numberOfBytes;
5960
});
6061
};
6162

63+
afterEach(function() {
64+
sandbox.restore();
65+
});
66+
6267
it('increment written bytes correctly', function(done) {
6368
setUpWriteSuccess();
6469

@@ -70,7 +75,7 @@ describe('DirectWriterTest', function () {
7075
done();
7176
});
7277

73-
queue.write(msg, deferredPromise());
78+
writer.write(msg, deferredPromise());
7479
});
7580

7681
it('writes single message into socket', function(done) {
@@ -82,7 +87,7 @@ describe('DirectWriterTest', function () {
8287
done();
8388
});
8489

85-
queue.write(msg, deferredPromise());
90+
writer.write(msg, deferredPromise());
8691
});
8792

8893
it('writes multiple messages separately into socket', function(done) {
@@ -97,16 +102,16 @@ describe('DirectWriterTest', function () {
97102
}
98103
});
99104

100-
queue.write(msg, deferredPromise());
101-
queue.write(msg, deferredPromise());
102-
queue.write(msg, deferredPromise());
105+
writer.write(msg, deferredPromise());
106+
writer.write(msg, deferredPromise());
107+
writer.write(msg, deferredPromise());
103108
});
104109

105110
it('resolves promise on write success', function(done) {
106111
setUpWriteSuccess();
107112

108113
const resolver = deferredPromise();
109-
queue.write(createMessage('test'), resolver);
114+
writer.write(createMessage('test'), resolver);
110115
resolver.promise.then(done);
111116
});
112117

@@ -115,7 +120,7 @@ describe('DirectWriterTest', function () {
115120
setUpWriteFailure(err);
116121

117122
const resolver = deferredPromise();
118-
queue.write(createMessage('test'), resolver);
123+
writer.write(createMessage('test'), resolver);
119124
resolver.promise.catch((err) => {
120125
expect(err).to.be.equal(err);
121126
done();
@@ -125,18 +130,29 @@ describe('DirectWriterTest', function () {
125130
it('emits write event on write success', function(done) {
126131
setUpWriteSuccess();
127132

128-
queue.on('write', done);
129-
queue.write(createMessage('test'), deferredPromise());
133+
writer.on('write', done);
134+
writer.write(createMessage('test'), deferredPromise());
130135
});
131136

132137
it('does not emit write event on write failure', function(done) {
133138
setUpWriteFailure(new Error());
134139

135-
queue.on('write', () => done(new Error()));
140+
writer.on('write', () => done(new Error()));
136141
const resolver = deferredPromise();
137-
queue.write(createMessage('test'), resolver);
142+
writer.write(createMessage('test'), resolver);
138143
resolver.promise.catch(() => {
139144
done();
140145
});
141146
});
147+
148+
it('should close the socket upon being closed', function() {
149+
setUpWriteSuccess();
150+
151+
// This is equivalent to a sinon spy
152+
const spy = sandbox.fake(mockSocket.destroy);
153+
sandbox.replace(mockSocket, 'destroy', spy);
154+
writer.close();
155+
156+
expect(spy.calledOnce).to.be.true;
157+
});
142158
});

test/unit/connection/PipelinedWriterTest.js

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ const sinon = require('sinon');
2020
const { expect } = require('chai');
2121

2222
const { PipelinedWriter } = require('../../../lib/network/Connection');
23+
const { IOError } = require('../../../lib/core/HazelcastError');
24+
const TestUtil = require('../../TestUtil');
2325
const {
2426
ClientMessage,
2527
Frame
2628
} = require('../../../lib/protocol/ClientMessage');
2729
const { deferredPromise } = require('../../../lib/util/Util');
30+
const sandbox = sinon.createSandbox();
2831

2932
describe('PipelinedWriterTest', function () {
3033
const THRESHOLD = 8192;
@@ -35,7 +38,7 @@ describe('PipelinedWriterTest', function () {
3538

3639
function setUpWriteSuccess(canWrite) {
3740
mockSocket = new Socket({});
38-
sinon.stub(mockSocket, 'write').callsFake((data, cb) => {
41+
sandbox.stub(mockSocket, 'write').callsFake((data, cb) => {
3942
process.nextTick(cb);
4043
process.nextTick(() => mockSocket.emit('data', data));
4144
return canWrite;
@@ -47,7 +50,7 @@ describe('PipelinedWriterTest', function () {
4750

4851
function setUpWriteFailure(err) {
4952
mockSocket = new Socket({});
50-
sinon.stub(mockSocket, 'write').callsFake((_, cb) => {
53+
sandbox.stub(mockSocket, 'write').callsFake((_, cb) => {
5154
process.nextTick(() => cb(err));
5255
return false;
5356
});
@@ -68,6 +71,10 @@ describe('PipelinedWriterTest', function () {
6871
return clientMessage;
6972
}
7073

74+
afterEach(function () {
75+
sandbox.restore();
76+
});
77+
7178
it('increment written bytes correctly', function(done) {
7279
setUpWriteSuccess(true);
7380

@@ -199,16 +206,17 @@ describe('PipelinedWriterTest', function () {
199206
Promise.all([resolver1.promise, resolver2.promise]).then(() => done());
200207
});
201208

202-
it('rejects single promise on write failure', function(done) {
209+
it('rejects single promise on write failure', async function() {
203210
const err = new Error();
204211
setUpWriteFailure(err);
205212

206213
const resolver = deferredPromise();
207214
writer.write(createMessageFromString('test'), resolver);
208-
resolver.promise.catch((err) => {
209-
expect(err).to.be.equal(err);
210-
done();
215+
const rejReason = await TestUtil.getRejectionReasonOrThrow(async () => {
216+
await resolver.promise;
211217
});
218+
expect(rejReason.cause).to.be.equal(err);
219+
expect(rejReason).to.be.instanceOf(IOError);
212220
});
213221

214222
it('rejects multiple promises on write failure', function(done) {
@@ -276,4 +284,39 @@ describe('PipelinedWriterTest', function () {
276284
writer.write(msg, deferredPromise());
277285
});
278286
});
287+
288+
it('should not schedule a write if a write() is called when the writer is already closed', async function() {
289+
setUpWriteSuccess(true);
290+
291+
const msg = createMessageFromString('test');
292+
// Pass a IOError so that the same error is used to reject the write() deferred promise
293+
const closeReason = new IOError();
294+
writer.close(closeReason);
295+
const deferred = deferredPromise();
296+
297+
// This is equivalent to a sinon spy
298+
const spy = sandbox.fake(writer.schedule);
299+
sandbox.replace(writer, 'schedule', spy);
300+
writer.write(msg, deferred);
301+
302+
const rejectionReason = await TestUtil.getRejectionReasonOrThrow(async () => await deferred.promise);
303+
expect(rejectionReason).to.be.equal(closeReason);
304+
expect(spy.callCount).to.be.equal(0);
305+
});
306+
307+
it('should not destroy the socket twice upon closing again', async function() {
308+
setUpWriteSuccess(true);
309+
310+
// Pass a IOError so that the same error is used to reject the write() deferred promise
311+
const closeReason = new IOError();
312+
313+
// This is equivalent to a sinon spy
314+
const spy = sandbox.fake(mockSocket.destroy);
315+
sandbox.replace(mockSocket, 'destroy', spy);
316+
317+
writer.close(closeReason);
318+
writer.close(closeReason);
319+
320+
expect(spy.callCount).to.be.equal(1);
321+
});
279322
});

0 commit comments

Comments
 (0)