Skip to content

Commit 8a69711

Browse files
authored
Force Reconnect API (#1100)
1 parent b1045c3 commit 8a69711

22 files changed

+659
-126
lines changed

src/examples/java/io/nats/examples/chaosTestApp/OutputErrorListener.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ public OutputErrorListener(String id, java.util.function.Consumer<String> watche
3030
this.watcher = watcher;
3131
}
3232

33-
private String supplyMessage(String label, Connection conn, Consumer consumer, Subscription sub, Object... pairs) {
33+
@Override
34+
public String supplyMessage(String label, Connection conn, Consumer consumer, Subscription sub, Object... pairs) {
3435
StringBuilder sb = new StringBuilder(label);
3536
if (conn != null) {
3637
ServerInfo si = conn.getServerInfo();
@@ -129,4 +130,9 @@ public void pullStatusError(Connection conn, JetStreamSubscription sub, Status s
129130
public void flowControlProcessed(Connection conn, JetStreamSubscription sub, String id, FlowControlSource source) {
130131
Output.controlMessage(this.id, supplyMessage("INFO flowControlProcessed", conn, null, sub, "FlowControlSource:", source));
131132
}
133+
134+
@Override
135+
public void socketWriteTimeout(Connection conn) {
136+
Output.controlMessage(this.id, supplyMessage("SEVERE socketWriteTimeout", conn, null, null));
137+
}
132138
}

src/examples/java/io/nats/examples/jetstream/ResilientPublisher.java

Lines changed: 139 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515

1616
import io.nats.client.*;
1717
import io.nats.client.api.MessageInfo;
18+
import io.nats.client.api.PublishAck;
1819
import io.nats.client.api.StorageType;
20+
import io.nats.client.impl.ErrorListenerConsoleImpl;
1921

2022
import java.util.concurrent.ThreadLocalRandom;
2123
import java.util.concurrent.atomic.AtomicBoolean;
2224
import java.util.concurrent.atomic.AtomicLong;
25+
import java.util.function.BiConsumer;
2326
import java.util.function.Function;
2427

2528
import static io.nats.examples.jetstream.NatsJsUtils.createOrReplaceStream;
@@ -36,10 +39,29 @@
3639
*/
3740
public class ResilientPublisher implements Runnable {
3841
public static void main(String[] args) {
39-
try (Connection nc = Nats.connect()) {
42+
Options options = Options.builder()
43+
.socketWriteTimeout(20_000)
44+
.connectionListener((conn, type) -> System.out.println(type))
45+
.errorListener(new ErrorListenerConsoleImpl())
46+
.build();
47+
try (Connection nc = Nats.connect(options)) {
48+
49+
// JetStream PUBLISHER EXAMPLE
4050
JetStreamManagement jsm = nc.jetStreamManagement();
41-
createOrReplaceStream(jsm, "stream", StorageType.Memory, "subject");
42-
ResilientPublisher rp = newInstanceReportingAndDelay(jsm, "stream", "subject", "prefix", 100, 1000);
51+
createOrReplaceStream(jsm, "js-stream", StorageType.Memory, "js-subject");
52+
ResilientPublisher rp = new ResilientPublisher(nc, jsm, "js-stream", "js-subject")
53+
.basicDataPrefix("data")
54+
.delay(1)
55+
.reportFrequency(1000);
56+
// END JetStream PUBLISHER EXAMPLE
57+
58+
// CORE PUBLISHER EXAMPLE
59+
// ResilientPublisher rp = new ResilientPublisher(nc, "core-subject")
60+
// .basicDataPrefix("data")
61+
// .delay(1)
62+
// .reportFrequency(1000);
63+
// END CORE PUBLISHER EXAMPLE
64+
4365
Thread t = new Thread(rp);
4466
t.start();
4567
t.join();
@@ -49,48 +71,104 @@ public static void main(String[] args) {
4971
}
5072
}
5173

52-
public static ResilientPublisher newInstanceReportingAndDelay(JetStreamManagement jsm, String stream, String subject, String prefix, long delay, long reportFrequency) {
53-
return new ResilientPublisher(jsm, stream, subject, prefix, -1, delay, reportFrequency);
54-
}
55-
56-
public static ResilientPublisher newInstanceReportingAndJitter(JetStreamManagement jsm, String stream, String subject, String prefix, long jitter, long reportFrequency) {
57-
return new ResilientPublisher(jsm, stream, subject, prefix, jitter, -1, reportFrequency);
58-
}
59-
60-
public static ResilientPublisher newInstanceQuietAndDelay(JetStreamManagement jsm, String stream, String subject, String prefix, long delay) {
61-
return new ResilientPublisher(jsm, stream, subject, prefix, -1, delay, null);
62-
}
63-
64-
public static ResilientPublisher newInstanceQuietAndJitter(JetStreamManagement jsm, String stream, String subject, String prefix, long jitter) {
65-
return new ResilientPublisher(jsm, stream, subject, prefix, jitter, -1, null);
66-
}
67-
74+
private final Connection nc;
6875
private final JetStreamManagement jsm;
6976
private final JetStream js;
7077
private final String stream;
7178
private final String subject;
72-
private final String prefix;
73-
private final long jitter;
74-
private final long delay;
75-
private final boolean reporting;
76-
private final Long reportFrequency;
77-
private final AtomicBoolean keepGoing = new AtomicBoolean(true);
7879
private final AtomicLong lastPub;
79-
private final Function<Long, byte[]> dataProvider;
80+
private final AtomicBoolean keepGoing;
81+
82+
private boolean expectationCheck;
83+
private long jitter;
84+
private long delay;
85+
private boolean reporting;
86+
private long reportFrequency;
87+
private Function<Long, byte[]> dataProvider;
88+
private java.util.function.BiConsumer<Connection, Long> beforePublish;
89+
private java.util.function.BiConsumer<Connection, PublishAck> afterPublish;
90+
private java.util.function.BiConsumer<Connection, Long> publishReporter;
91+
private java.util.function.BiConsumer<Connection, Exception> exceptionReporter;
92+
93+
public ResilientPublisher(Connection nc, String subject) {
94+
this(nc, null, null, subject);
95+
}
8096

81-
public ResilientPublisher(JetStreamManagement jsm, String stream, String subject, String prefix, long jitter, long delay, Long reportFrequency) {
82-
this.jsm = jsm;
83-
js = jsm.jetStream();
84-
this.stream = stream;
97+
public ResilientPublisher(Connection nc, JetStreamManagement jsm, String stream, String subject) {
98+
this.nc = nc;
99+
if (jsm == null) {
100+
this.jsm = null;
101+
js = null;
102+
this.stream = null;
103+
}
104+
else {
105+
this.jsm = jsm;
106+
js = jsm.jetStream();
107+
this.stream = stream;
108+
}
85109
this.subject = subject;
86-
this.prefix = prefix;
110+
lastPub = new AtomicLong();
111+
keepGoing = new AtomicBoolean(true);
112+
basicDataPrefix(null);
113+
beforePublish(null);
114+
afterPublish(null);
115+
publishReporter(null);
116+
exceptionReporter(null);
117+
}
118+
119+
public ResilientPublisher expectationCheck(boolean expectationCheck) {
120+
this.expectationCheck = expectationCheck;
121+
return this;
122+
}
123+
124+
public ResilientPublisher jitter(long jitter) {
87125
this.jitter = jitter;
126+
return this;
127+
}
128+
129+
public ResilientPublisher delay(long delay) {
88130
this.delay = delay;
89-
this.reporting = reportFrequency != null;
131+
return this;
132+
}
133+
134+
public ResilientPublisher reportFrequency(long reportFrequency) {
90135
this.reportFrequency = reportFrequency;
91-
lastPub = new AtomicLong();
136+
reporting = reportFrequency > 0;
137+
return this;
138+
}
92139

140+
public ResilientPublisher basicDataPrefix(String prefix) {
93141
dataProvider = prefix == null ? l -> null : l -> (prefix + "-" + l).getBytes();
142+
return this;
143+
}
144+
145+
public ResilientPublisher dataProvider(Function<Long, byte[]> dataProvider) {
146+
this.dataProvider = dataProvider == null ? l -> null : dataProvider;
147+
return this;
148+
}
149+
150+
public ResilientPublisher beforePublish(BiConsumer<Connection, Long> beforePublish) {
151+
this.beforePublish = beforePublish == null ? (c, l) -> {} : beforePublish;
152+
return this;
153+
}
154+
155+
public ResilientPublisher afterPublish(BiConsumer<Connection, PublishAck> afterPublish) {
156+
this.afterPublish = afterPublish == null ? (c, l) -> {} : afterPublish;
157+
return this;
158+
}
159+
160+
public ResilientPublisher publishReporter(BiConsumer<Connection, Long> publishReporter) {
161+
this.publishReporter = publishReporter == null
162+
? (c, l) -> report("Published Id: " + l)
163+
: publishReporter;
164+
return this;
165+
}
166+
167+
public ResilientPublisher exceptionReporter(BiConsumer<Connection, Exception> exceptionReporter) {
168+
this.exceptionReporter = exceptionReporter == null
169+
? (c, e) -> report("Publish Exception: " + e)
170+
: exceptionReporter;
171+
return this;
94172
}
95173

96174
public void stop() {
@@ -103,7 +181,7 @@ public long getLastPub() {
103181

104182
@Override
105183
public void run() {
106-
boolean lastWasOk = false;
184+
Exception lastEx = null;
107185
long reportAt = 0;
108186
while (keepGoing.get()) {
109187
try {
@@ -119,21 +197,32 @@ public void run() {
119197
// it's possible that the publish was not recorded
120198
// but we won't find out until the next round
121199
// at which time we will get the 10071
122-
PublishOptions po = PublishOptions.builder()
123-
.expectedLastSequence(lastPub.get())
124-
.build();
125-
js.publish(subject, dataProvider.apply(lastPub.incrementAndGet()), po);
200+
long lastPubId = lastPub.get();
201+
long pubId = lastPub.incrementAndGet();
202+
203+
beforePublish.accept(nc, pubId);
204+
if (js == null) {
205+
nc.publish(subject, dataProvider.apply(pubId));
206+
}
207+
else {
208+
PublishOptions po = expectationCheck
209+
? PublishOptions.builder().expectedLastSequence(lastPubId).build()
210+
: null;
211+
PublishAck pa = js.publish(subject, dataProvider.apply(pubId), po);
212+
afterPublish.accept(nc, pa);
213+
}
126214

127215
if (reporting) {
128-
if (!lastWasOk || System.currentTimeMillis() > reportAt) {
129-
report("Published Sequence: " + lastPub.get());
216+
if (lastEx != null || System.currentTimeMillis() > reportAt) {
217+
publishReporter.accept(nc, pubId);
130218
reportAt = System.currentTimeMillis() + reportFrequency;
131219
}
132220
}
133221

134-
lastWasOk = true;
222+
lastEx = null;
135223
}
136224
catch (Exception e) {
225+
boolean diff = lastEx == null;
137226
if (e instanceof JetStreamApiException) {
138227
JetStreamApiException j = (JetStreamApiException)e;
139228
if (j.getApiErrorCode() == 10071) {
@@ -145,12 +234,18 @@ public void run() {
145234
// ignore, it will happen again!
146235
}
147236
}
237+
if (!diff && lastEx instanceof JetStreamApiException) {
238+
diff = j.getApiErrorCode() != ((JetStreamApiException)lastEx).getApiErrorCode();
239+
}
240+
}
241+
if (!diff && lastEx.getClass().getSimpleName().equals(e.getClass().getSimpleName())){
242+
diff = true;
148243
}
149-
if (lastWasOk || System.currentTimeMillis() > reportAt) {
150-
report("Publish Exception: " + e);
244+
if (diff || System.currentTimeMillis() > reportAt) {
245+
exceptionReporter.accept(nc, e);
151246
reportAt = System.currentTimeMillis() + reportFrequency;
152247
}
153-
lastWasOk = false;
248+
lastEx = e;
154249
}
155250
}
156251
}

src/examples/java/io/nats/examples/jetstream/simple/FetchResilientExample.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,10 @@ public static void main(String[] args) {
6161
streamReportingThread.start();
6262

6363
// simulating a publisher somewhere else.
64-
ResilientPublisher rp = ResilientPublisher.
65-
newInstanceReportingAndDelay(jsm, STREAM, SUBJECT, MESSAGE_PREFIX, PUBLISH_DELAY, PUB_REPORT_FREQUENCY);
64+
ResilientPublisher rp = new ResilientPublisher(nc, jsm, STREAM, SUBJECT)
65+
.basicDataPrefix(MESSAGE_PREFIX)
66+
.delay(PUBLISH_DELAY)
67+
.reportFrequency(PUB_REPORT_FREQUENCY);
6668
Thread pubThread = new Thread(rp);
6769
pubThread.start();
6870

src/examples/java/io/nats/examples/jetstream/simple/IterableConsumerExample.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public static void main(String[] args) {
5757
}
5858

5959
System.out.println("Starting publish...");
60-
ResilientPublisher publisher = ResilientPublisher.newInstanceQuietAndJitter(jsm, STREAM, SUBJECT, MESSAGE_PREFIX, 10);
60+
ResilientPublisher publisher = new ResilientPublisher(nc, jsm, STREAM, SUBJECT).basicDataPrefix(MESSAGE_PREFIX).jitter(10);
6161
Thread pubThread = new Thread(publisher);
6262
pubThread.start();
6363

src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public static void main(String[] args) {
4343
createOrReplaceStream(jsm, STREAM, SUBJECT);
4444

4545
System.out.println("Starting publish...");
46-
ResilientPublisher publisher = ResilientPublisher.newInstanceQuietAndJitter(jsm, STREAM, SUBJECT, MESSAGE_PREFIX, 10);
46+
ResilientPublisher publisher = new ResilientPublisher(nc, jsm, STREAM, SUBJECT).basicDataPrefix(MESSAGE_PREFIX).jitter(10);
4747
Thread pubThread = new Thread(publisher);
4848
pubThread.start();
4949

src/main/java/io/nats/client/Connection.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,14 @@ enum Status {
545545
*/
546546
void flushBuffer() throws IOException;
547547

548+
/**
549+
* Forces reconnect behavior. Stops the current connection including the reading and writing,
550+
* copies already queued outgoing messages, and then begins the reconnect logic.
551+
* @throws IOException
552+
* @throws InterruptedException
553+
*/
554+
void forceReconnect() throws IOException, InterruptedException;
555+
548556
/**
549557
* Calculates the round trip time between this client and the server.
550558
* @return the RTT as a duration

src/main/java/io/nats/client/ConnectionListener.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,31 +20,52 @@
2020
public interface ConnectionListener {
2121
public enum Events {
2222
/** The connection has successfully completed the handshake with the nats-server. */
23-
CONNECTED("nats: connection opened"),
23+
CONNECTED(true, "opened"),
2424
/** The connection is permanently closed, either by manual action or failed reconnects. */
25-
CLOSED("nats: connection closed"),
25+
CLOSED(true, "closed"),
2626
/** The connection lost its connection, but may try to reconnect if configured to. */
27-
DISCONNECTED("nats: connection disconnected"),
27+
DISCONNECTED(true, "disconnected"),
2828
/** The connection was connected, lost its connection and successfully reconnected. */
29-
RECONNECTED("nats: connection reconnected"),
29+
RECONNECTED(true, "reconnected"),
3030
/** The connection was reconnected and the server has been notified of all subscriptions. */
31-
RESUBSCRIBED("nats: subscriptions re-established"),
31+
RESUBSCRIBED(false, "subscriptions re-established"),
3232
/** The connection was told about new servers from, from the current server. */
33-
DISCOVERED_SERVERS("nats: discovered servers"),
33+
DISCOVERED_SERVERS(false, "discovered servers"),
3434
/** Server Sent a lame duck mode. */
35-
LAME_DUCK("nats: lame duck mode");
35+
LAME_DUCK(false, "lame duck mode");
3636

37-
private String event;
37+
private final boolean connectionEvent;
38+
private final String event;
39+
private final String natsEvent;
3840

39-
Events(String err) {
40-
this.event = err;
41+
Events(boolean connectionEvent, String event) {
42+
this.connectionEvent = connectionEvent;
43+
this.event = event;
44+
if (connectionEvent) {
45+
this.natsEvent = "nats: connection " + event;
46+
}
47+
else {
48+
this.natsEvent = "nats: " + event;
49+
}
50+
}
51+
52+
public boolean isConnectionEvent() {
53+
return connectionEvent;
54+
}
55+
56+
public String getEvent() {
57+
return event;
58+
}
59+
60+
public String getNatsEvent() {
61+
return natsEvent;
4162
}
4263

4364
/**
4465
* @return the string value for this event
4566
*/
4667
public String toString() {
47-
return this.event;
68+
return this.natsEvent;
4869
}
4970
}
5071

0 commit comments

Comments
 (0)