Skip to content

Commit 60db202

Browse files
authored
Send client state to cluster in re-connections[API-1644] (#1415)
* Make ReadResultSet iterable [API-1315] * Make ReadResultSet iterable [API-1315] * Changes made according to comments on PR.[API-1315] * Send client state to cluster in re-connections [API-1644] * New test added related to check InitializeClientOnCluster * New test added * lint problem fixed
1 parent 53fe97b commit 60db202

File tree

3 files changed

+70
-26
lines changed

3 files changed

+70
-26
lines changed

src/network/ConnectionManager.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ export class ConnectionManager extends EventEmitter {
155155
* counter, but may not show the latest total.
156156
*/
157157
private totalBytesWritten : number;
158+
private establishedInitialClusterConnection: boolean;
158159

159160
constructor(
160161
private readonly client: ClientForConnectionManager,
@@ -833,10 +834,23 @@ export class ConnectionManager extends EventEmitter {
833834
this.connectionRegistry.setConnection(response.memberUuid, connection);
834835
if (connectionsEmpty) {
835836
this.clusterId = newClusterId;
836-
if (clusterIdChanged) {
837+
if (this.establishedInitialClusterConnection) {
838+
// In split brain, the client might connect to the one half
839+
// of the cluster, and then later might reconnect to the
840+
// other half, after the half it was connected to is
841+
// completely dead. Since the cluster id is preserved in
842+
// split brain scenarios, it is impossible to distinguish
843+
// reconnection to the same cluster vs reconnection to the
844+
// other half of the split brain. However, in the latter,
845+
// we might need to send some state to the other half of
846+
// the split brain (like Compact schemas or user code
847+
// deployment classes). That forces us to send the client
848+
// state to the cluster after the first cluster connection,
849+
// regardless the cluster id is changed or not.
837850
this.connectionRegistry.setClientState(ClientState.CONNECTED_TO_CLUSTER);
838851
this.initializeClientOnCluster(newClusterId);
839852
} else {
853+
this.establishedInitialClusterConnection = true;
840854
this.connectionRegistry.setClientState(ClientState.INITIALIZED_ON_CLUSTER);
841855
this.emitLifecycleEvent(LifecycleState.CONNECTED);
842856
}

test/TestUtil.js

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,3 +639,21 @@ exports.calculateServerVersionFromString = (versionString) => {
639639
return isNaN(version) ? BuildInfo.UNKNOWN_VERSION_ID : version;
640640
};
641641

642+
/**
643+
* This function will wait for the connections count to be equal to given parameter (connectionCount).
644+
*/
645+
exports.waitForConnectionCount = async (client, connectionCount) => {
646+
let getConnectionsFn;
647+
if (this.isClientVersionAtLeast('4.2')) {
648+
const clientRegistry = client.connectionRegistry;
649+
getConnectionsFn = clientRegistry.getConnections.bind(clientRegistry);
650+
} else {
651+
const connManager = client.getConnectionManager();
652+
getConnectionsFn = connManager.getActiveConnections.bind(connManager);
653+
}
654+
655+
await this.assertTrueEventually(async () => {
656+
expect(getConnectionsFn().length).to.be.equal(connectionCount);
657+
});
658+
};
659+

test/integration/backward_compatible/serial/ClientReconnectTest.js

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
const { expect } = require('chai');
1919
const RC = require('../../RC');
2020
const TestUtil = require('../../../TestUtil');
21+
const sinon = require('sinon');
22+
const sandbox = sinon.createSandbox();
23+
const { ConnectionManager } = require('../../../../lib/network/ConnectionManager');
2124

2225
/**
2326
* Basic tests for reconnection to cluster scenarios.
@@ -26,28 +29,6 @@ describe('ClientReconnectTest', function () {
2629
let cluster;
2730
let client;
2831

29-
/**
30-
* Waits for disconnection. getMap(), map.put() messages are not retryable. If terminateMember does not
31-
* close the client connection immediately it is possible for the client to realize that later when map.put
32-
* or getMap invocation started. In that case, the connection will be closed with TargetDisconnectedError.
33-
* Because these client messages are not retryable, the invocation will be rejected with an error, leading
34-
* to flaky tests. To avoid that, this function will wait for the connections count to be zero.
35-
*/
36-
const waitForDisconnection = async (client) => {
37-
let getConnectionsFn;
38-
if (TestUtil.isClientVersionAtLeast('4.2')) {
39-
const clientRegistry = client.connectionRegistry;
40-
getConnectionsFn = clientRegistry.getConnections.bind(clientRegistry);
41-
} else {
42-
const connManager = client.getConnectionManager();
43-
getConnectionsFn = connManager.getActiveConnections.bind(connManager);
44-
}
45-
46-
await TestUtil.assertTrueEventually(async () => {
47-
expect(getConnectionsFn()).to.be.empty;
48-
});
49-
};
50-
5132
const testFactory = new TestUtil.TestFactory();
5233

5334
beforeEach(function () {
@@ -59,6 +40,37 @@ describe('ClientReconnectTest', function () {
5940
await testFactory.shutdownAll();
6041
});
6142

43+
it('should send the client state to the cluster after reconnections, ' +
44+
+'regardless it is connected back to possibly the same cluster with the same id or not.', async function () {
45+
const fakeInitializeClientOnCluster = sandbox.replace(
46+
ConnectionManager.prototype,
47+
'initializeClientOnCluster',
48+
sandbox.fake(ConnectionManager.prototype.initializeClientOnCluster)
49+
);
50+
cluster = await testFactory.createClusterForSerialTests();
51+
const member = await RC.startMember(cluster.id);
52+
client = await testFactory.newHazelcastClientForSerialTests({
53+
clusterName: cluster.id,
54+
properties: {
55+
'hazelcast.client.heartbeat.interval': 1000,
56+
'hazelcast.client.heartbeat.timeout': 3000
57+
}
58+
});
59+
await RC.terminateMember(cluster.id, member.uuid);
60+
await TestUtil.waitForConnectionCount(client, 0);
61+
await RC.startMember(cluster.id);
62+
await TestUtil.waitForConnectionCount(client, 1);
63+
fakeInitializeClientOnCluster.callCount.should.be.eq(1);
64+
});
65+
66+
/**
67+
* getMap(), map.put() messages are not retryable. If terminateMember does not
68+
* close the client connection immediately it is possible for the client to realize that later when map.put
69+
* or getMap invocation started. In that case, the connection will be closed with TargetDisconnectedError.
70+
* Because these client messages are not retryable, the invocation will be rejected with an error, leading
71+
* to flaky tests. To avoid that, we use the "TestUtil.waitForConnectionCount" function
72+
* to wait for disconnection in the tests below.
73+
*/
6274
it('member restarts, while map.put in progress', async function () {
6375
cluster = await testFactory.createClusterForSerialTests();
6476
const member = await RC.startMember(cluster.id);
@@ -72,7 +84,7 @@ describe('ClientReconnectTest', function () {
7284
const map = await client.getMap('test');
7385

7486
await RC.terminateMember(cluster.id, member.uuid);
75-
await waitForDisconnection(client);
87+
await TestUtil.waitForConnectionCount(client, 0);
7688
await RC.startMember(cluster.id);
7789

7890
await map.put('testkey', 'testvalue');
@@ -95,7 +107,7 @@ describe('ClientReconnectTest', function () {
95107
});
96108
const map = await client.getMap('test');
97109
await RC.terminateMember(cluster.id, member.uuid);
98-
await waitForDisconnection(client);
110+
await TestUtil.waitForConnectionCount(client, 0);
99111

100112
const promise = map.put('testkey', 'testvalue').then(() => {
101113
return map.get('testkey');
@@ -119,7 +131,7 @@ describe('ClientReconnectTest', function () {
119131
}
120132
});
121133
await RC.terminateMember(cluster.id, member.uuid);
122-
await waitForDisconnection(client);
134+
await TestUtil.waitForConnectionCount(client, 0);
123135

124136
let map;
125137

0 commit comments

Comments
 (0)