Skip to content

Commit 487c355

Browse files
authored
Refactor Idle Connection Resiliency timeout to use existing SharedTimer (#1794)
1 parent 0042429 commit 487c355

File tree

6 files changed

+100
-217
lines changed

6 files changed

+100
-217
lines changed

src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -7402,43 +7402,6 @@ final void trySetSensitivityClassification(SensitivityClassification sensitivity
74027402
}
74037403

74047404

7405-
/**
7406-
* The tds default implementation of a timeout command
7407-
*/
7408-
class TdsTimeoutCommand extends TimeoutCommand<TDSCommand> {
7409-
protected TdsTimeoutCommand(int timeout, TDSCommand command, SQLServerConnection sqlServerConnection) {
7410-
super(timeout, command, sqlServerConnection);
7411-
}
7412-
7413-
protected void interrupt() throws Exception {
7414-
TDSCommand command = getCommand();
7415-
SQLServerConnection sqlServerConnection = getSqlServerConnection();
7416-
try {
7417-
// If TCP Connection to server is silently dropped, exceeding the query timeout
7418-
// on the same connection does
7419-
// not throw SQLTimeoutException
7420-
// The application stops responding instead until SocketTimeoutException is
7421-
// thrown. In this case, we must
7422-
// manually terminate the connection.
7423-
if (null == command && null != sqlServerConnection) {
7424-
sqlServerConnection.terminate(SQLServerException.DRIVER_ERROR_IO_FAILED,
7425-
SQLServerException.getErrString("R_connectionIsClosed"));
7426-
} else {
7427-
// If the timer wasn't canceled before it ran out of
7428-
// time then interrupt the registered command.
7429-
if (null != command)
7430-
command.interrupt(SQLServerException.getErrString("R_queryTimedOut"));
7431-
}
7432-
} catch (SQLServerException e) {
7433-
// Request failed to time out and SQLServerConnection does not exist
7434-
if (null != command)
7435-
command.log(Level.FINE, "Command could not be timed out. Reason: " + e.getMessage());
7436-
throw new SQLServerException(SQLServerException.getErrString("R_crCommandCannotTimeOut"), e);
7437-
}
7438-
}
7439-
}
7440-
7441-
74427405
/**
74437406
* TDSCommand encapsulates an interruptable TDS conversation.
74447407
*
@@ -7566,13 +7529,7 @@ protected void setProcessedResponse(boolean processedResponse) {
75667529
private int queryTimeoutSeconds;
75677530
private int cancelQueryTimeoutSeconds;
75687531
private ScheduledFuture<?> timeout;
7569-
private TdsTimeoutCommand timeoutCommand;
75707532

7571-
/*
7572-
* Some flags for Connection Resiliency. We need to know if a command has already been registered in the poller, or
7573-
* if it was actually executed.
7574-
*/
7575-
private boolean isRegisteredInPoller = false;
75767533
private boolean isExecuted = false;
75777534

75787535
protected int getQueryTimeoutSeconds() {
@@ -7606,18 +7563,6 @@ void createCounter(ICounter previousCounter, Properties activeConnectionProperti
76067563
}
76077564
}
76087565

7609-
synchronized void addToPoller() {
7610-
if (!isRegisteredInPoller) {
7611-
// If command execution is subject to timeout then start timing until
7612-
// the server returns the first response packet.
7613-
if (queryTimeoutSeconds > 0) {
7614-
this.timeoutCommand = new TdsTimeoutCommand(queryTimeoutSeconds, this, null);
7615-
TimeoutPoller.getTimeoutPoller().addTimeoutCommand(this.timeoutCommand);
7616-
isRegisteredInPoller = true;
7617-
}
7618-
}
7619-
}
7620-
76217566
boolean wasExecuted() {
76227567
return isExecuted;
76237568
}
@@ -8033,7 +7978,6 @@ final TDSReader startResponse(boolean isAdaptive) throws SQLServerException {
80337978
SQLServerConnection conn = tdsReader != null ? tdsReader.getConnection() : null;
80347979
this.timeout = tdsWriter.getSharedTimer().schedule(new TDSTimeoutTask(this, conn), queryTimeoutSeconds);
80357980
}
8036-
addToPoller();
80377981

80387982
if (logger.isLoggable(Level.FINEST))
80397983
logger.finest(this.toString() + ": Reading response...");

src/main/java/com/microsoft/sqlserver/jdbc/IdleConnectionResiliency.java

Lines changed: 80 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,23 @@
55

66
package com.microsoft.sqlserver.jdbc;
77

8+
import java.lang.Thread.State;
9+
import java.util.concurrent.ScheduledFuture;
810
import java.util.concurrent.atomic.AtomicInteger;
11+
import java.util.logging.Level;
912

1013

1114
class IdleConnectionResiliency {
15+
private static final java.util.logging.Logger loggerExternal = java.util.logging.Logger
16+
.getLogger("com.microsoft.sqlserver.jdbc.IdleConnectionResiliency");
1217
private boolean connectionRecoveryNegotiated;
1318
private int connectRetryCount;
1419
private SQLServerConnection connection;
1520
private SessionStateTable sessionStateTable;
1621
private ReconnectThread reconnectThread;
1722
private AtomicInteger unprocessedResponseCount = new AtomicInteger();
1823
private boolean connectionRecoveryPossible;
24+
private SQLServerException reconnectErrorReceived = null;
1925

2026
/*
2127
* Variables needed to perform a reconnect, these are not necessarily determined from just the connection string
@@ -27,7 +33,6 @@ class IdleConnectionResiliency {
2733

2834
IdleConnectionResiliency(SQLServerConnection connection) {
2935
this.connection = connection;
30-
reconnectThread = new ReconnectThread(connection);
3136
}
3237

3338
boolean isConnectionRecoveryNegotiated() {
@@ -54,8 +59,8 @@ void setConnection(SQLServerConnection connection) {
5459
this.connection = connection;
5560
}
5661

57-
ReconnectThread getReconnectThread() {
58-
return reconnectThread;
62+
boolean isReconnectRunning() {
63+
return reconnectThread != null && (reconnectThread.getState() != State.TERMINATED);
5964
}
6065

6166
SessionStateTable getSessionStateTable() {
@@ -105,24 +110,34 @@ void parseInitialSessionStateData(byte[] data, byte[][] sessionStateInitial) thr
105110
}
106111

107112
void incrementUnprocessedResponseCount() {
108-
if (connection.getRetryCount() > 0 && !getReconnectThread().isAlive()) {
109-
if (unprocessedResponseCount.incrementAndGet() < 0)
113+
if (connection.getRetryCount() > 0 && !isReconnectRunning()) {
114+
if (unprocessedResponseCount.incrementAndGet() < 0) {
110115
/*
111116
* When this number rolls over, connection recovery is disabled for the rest of the life of the
112117
* connection.
113118
*/
119+
if (loggerExternal.isLoggable(Level.FINER)) {
120+
loggerExternal.finer("unprocessedResponseCount < 0 on increment. Disabling connection resiliency.");
121+
}
122+
114123
setConnectionRecoveryPossible(false);
124+
}
115125
}
116126
}
117127

118128
void decrementUnprocessedResponseCount() {
119-
if (connection.getRetryCount() > 0 && !getReconnectThread().isAlive()) {
120-
if (unprocessedResponseCount.decrementAndGet() < 0)
129+
if (connection.getRetryCount() > 0 && !isReconnectRunning()) {
130+
if (unprocessedResponseCount.decrementAndGet() < 0) {
121131
/*
122132
* When this number rolls over, connection recovery is disabled for the rest of the life of the
123133
* connection.
124134
*/
135+
if (loggerExternal.isLoggable(Level.FINER)) {
136+
loggerExternal.finer("unprocessedResponseCount < 0 on decrement. Disabling connection resiliency.");
137+
}
138+
125139
setConnectionRecoveryPossible(false);
140+
}
126141
}
127142
}
128143

@@ -148,6 +163,20 @@ FailoverInfo getFailoverInfo() {
148163
int getLoginTimeoutSeconds() {
149164
return loginLoginTimeoutSeconds;
150165
}
166+
167+
void reconnect(TDSCommand cmd) throws InterruptedException {
168+
reconnectErrorReceived = null;
169+
reconnectThread = new ReconnectThread(this.connection, cmd);
170+
reconnectThread.start();
171+
reconnectThread.join();
172+
reconnectErrorReceived = reconnectThread.getException();
173+
// Remove reference so GC can clean it up
174+
reconnectThread = null;
175+
}
176+
177+
SQLServerException getReconnectException() {
178+
return reconnectErrorReceived;
179+
}
151180
}
152181

153182

@@ -370,6 +399,8 @@ public void reset() {
370399

371400

372401
final class ReconnectThread extends Thread {
402+
static final java.util.logging.Logger loggerExternal = java.util.logging.Logger
403+
.getLogger("com.microsoft.sqlserver.jdbc.ReconnectThread");
373404
private SQLServerConnection con = null;
374405
private SQLServerException eReceived = null;
375406
private TDSCommand command = null;
@@ -384,19 +415,23 @@ final class ReconnectThread extends Thread {
384415
@SuppressWarnings("unused")
385416
private ReconnectThread() {};
386417

387-
ReconnectThread(SQLServerConnection sqlC) {
418+
ReconnectThread(SQLServerConnection sqlC, TDSCommand cmd) {
388419
this.con = sqlC;
389-
}
390-
391-
// Resets the thread
392-
void init(TDSCommand cmd) {
393420
this.command = cmd;
394421
connectRetryCount = con.getRetryCount();
395422
eReceived = null;
396423
stopRequested = false;
424+
if (loggerExternal.isLoggable(Level.FINER)) {
425+
loggerExternal.finer("ReconnectThread initialized. Connection retry count = " + connectRetryCount
426+
+ "; Command = " + cmd.toString());
427+
}
428+
397429
}
398430

399431
public void run() {
432+
if (loggerExternal.isLoggable(Level.FINER)) {
433+
loggerExternal.finer("Starting ReconnectThread for command: " + command.toString());
434+
}
400435
boolean interruptsEnabled = command.getInterruptsEnabled();
401436
/*
402437
* All TDSCommands are not interruptible before execution, and all the commands passed to here won't have been
@@ -405,10 +440,24 @@ public void run() {
405440
*/
406441
command.setInterruptsEnabled(true);
407442
command.attachThread(this);
408-
command.addToPoller();
443+
444+
// We need a reference to the SharedTimer outside of the context of the connection
445+
SharedTimer timer = null;
446+
ScheduledFuture<?> timeout = null;
447+
448+
if (command.getQueryTimeoutSeconds() > 0) {
449+
timer = SharedTimer.getTimer();
450+
timeout = timer.schedule(new TDSTimeoutTask(command, null),
451+
command.getQueryTimeoutSeconds());
452+
}
453+
409454
boolean keepRetrying = true;
410455

411-
while ((connectRetryCount != 0) && (!stopRequested) && keepRetrying) {
456+
while ((connectRetryCount > 0) && (!stopRequested) && keepRetrying) {
457+
if (loggerExternal.isLoggable(Level.FINER)) {
458+
loggerExternal.finer("Running reconnect for command: " + command.toString() + " ; ConnectRetryCount = "
459+
+ connectRetryCount);
460+
}
412461
try {
413462
eReceived = null;
414463
con.connect(null, con.getPooledConnectionParent());
@@ -451,10 +500,26 @@ public void run() {
451500
}
452501

453502
command.setInterruptsEnabled(interruptsEnabled);
454-
return;
503+
504+
if (loggerExternal.isLoggable(Level.FINER)) {
505+
loggerExternal.finer("ReconnectThread exiting for command: " + command.toString());
506+
}
507+
508+
if (timeout != null) {
509+
timeout.cancel(false);
510+
timeout = null;
511+
}
512+
513+
if (timer != null) {
514+
timer.removeRef();
515+
timer = null;
516+
}
455517
}
456518

457519
void stop(boolean blocking) {
520+
if (loggerExternal.isLoggable(Level.FINER)) {
521+
loggerExternal.finer("ReconnectThread stop requested for command: " + command.toString());
522+
}
458523
stopRequested = true;
459524
if (blocking && this.isAlive()) {
460525
while (this.getState() != State.TERMINATED) {

src/main/java/com/microsoft/sqlserver/jdbc/SQLServerConnection.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3306,7 +3306,7 @@ private InetSocketAddress connectHelper(ServerPortPlaceHolder serverInfo, int ti
33063306

33073307
activeConnectionProperties.remove(SQLServerDriverStringProperty.CLIENT_KEY_PASSWORD.toString());
33083308

3309-
if (sessionRecovery.getReconnectThread().isAlive()) {
3309+
if (sessionRecovery.isReconnectRunning()) {
33103310
if (negotiatedEncryptionLevel != sessionRecovery.getSessionStateTable()
33113311
.getOriginalNegotiatedEncryptionLevel()) {
33123312
connectionlogger.warning(toString()
@@ -3838,7 +3838,7 @@ boolean executeCommand(TDSCommand newCommand) throws SQLServerException {
38383838
newCommand.createCounter(previousCounter, activeConnectionProperties);
38393839
if (!(newCommand instanceof LogonCommand)) {
38403840
// isAlive() doesn't guarantee the thread is actually running, just that it's been requested to start
3841-
if (!sessionRecovery.getReconnectThread().isAlive()) {
3841+
if (!sessionRecovery.isReconnectRunning()) {
38423842
if (this.connectRetryCount > 0 && sessionRecovery.isConnectionRecoveryNegotiated()) {
38433843
if (isConnectionDead()) {
38443844
if (connectionlogger.isLoggable(Level.FINER)) {
@@ -3854,29 +3854,22 @@ boolean executeCommand(TDSCommand newCommand) throws SQLServerException {
38543854
SQLServerException.getErrString("R_crServerSessionStateNotRecoverable"), null,
38553855
false);
38563856
}
3857-
sessionRecovery.getReconnectThread().init(newCommand);
3858-
sessionRecovery.getReconnectThread().start();
3859-
/*
3860-
* Join only blocks the thread that started the reconnect. Currently can't think of a good
3861-
* reason to leave the original thread running, no work can be done while we're not
3862-
* connected anyways. Can be easily changed to non-blocking if necessary.
3863-
*/
38643857
try {
3865-
sessionRecovery.getReconnectThread().join();
3858+
sessionRecovery.reconnect(newCommand);
38663859
} catch (InterruptedException e) {
38673860
// re-interrupt thread
38683861
Thread.currentThread().interrupt();
38693862

38703863
// Keep compiler happy, something's probably seriously wrong if this line is run
3871-
SQLServerException.makeFromDriverError(this, sessionRecovery.getReconnectThread(),
3872-
e.getMessage(), null, false);
3864+
SQLServerException.makeFromDriverError(this, sessionRecovery, e.getMessage(), null,
3865+
false);
38733866
}
3874-
if (sessionRecovery.getReconnectThread().getException() != null) {
3867+
if (sessionRecovery.getReconnectException() != null) {
38753868
if (connectionlogger.isLoggable(Level.FINER)) {
38763869
connectionlogger.finer(
38773870
this.toString() + "Connection is broken and recovery is not possible.");
38783871
}
3879-
throw sessionRecovery.getReconnectThread().getException();
3872+
throw sessionRecovery.getReconnectException();
38803873
}
38813874
}
38823875
}
@@ -4005,7 +3998,7 @@ final boolean doExecute() throws SQLServerException {
40053998
}
40063999
}
40074000

4008-
if (sessionRecovery.getReconnectThread().isAlive()) {
4001+
if (sessionRecovery.isReconnectRunning()) {
40094002
executeReconnectCommand(new ConnectionCommand(sql, logContext));
40104003
} else {
40114004
executeCommand(new ConnectionCommand(sql, logContext));
@@ -4709,7 +4702,7 @@ int writeIdleConnectionResiliencyRequest(boolean write, TDSWriter tdsWriter) thr
47094702
if (write) {
47104703
tdsWriter.writeByte(TDS.TDS_FEATURE_EXT_SESSIONRECOVERY);
47114704
}
4712-
if (!sessionRecovery.getReconnectThread().isAlive()) {
4705+
if (!sessionRecovery.isReconnectRunning()) {
47134706
if (write) {
47144707
tdsWriter.writeInt(0);
47154708
}
@@ -6266,15 +6259,15 @@ final boolean complete(LogonCommand logonCommand, TDSReader tdsReader) throws SQ
62666259
TDSParser.parse(tdsReader, logonProcessor);
62676260
} while (!logonProcessor.complete(logonCommand, tdsReader));
62686261

6269-
if (sessionRecovery.getReconnectThread().isAlive() && !sessionRecovery.isConnectionRecoveryPossible()) {
6262+
if (sessionRecovery.isReconnectRunning() && !sessionRecovery.isConnectionRecoveryPossible()) {
62706263
if (connectionlogger.isLoggable(Level.WARNING)) {
62716264
connectionlogger.warning(this.toString()
62726265
+ "SessionRecovery feature extension ack was not sent by the server during reconnection.");
62736266
}
62746267
terminate(SQLServerException.DRIVER_ERROR_INVALID_TDS,
62756268
SQLServerException.getErrString("R_crClientNoRecoveryAckFromLogin"));
62766269
}
6277-
if (connectRetryCount > 0 && !sessionRecovery.getReconnectThread().isAlive()) {
6270+
if (connectRetryCount > 0 && !sessionRecovery.isReconnectRunning()) {
62786271
sessionRecovery.getSessionStateTable().setOriginalCatalog(sCatalog);
62796272
sessionRecovery.getSessionStateTable().setOriginalCollation(databaseCollation);
62806273
sessionRecovery.getSessionStateTable().setOriginalLanguage(sLanguage);

0 commit comments

Comments
 (0)