Skip to content

Commit c18de0c

Browse files
authored
Ordered Push don't cancel sub if trying to recover (#1045)
1 parent a4b50a9 commit c18de0c

File tree

2 files changed

+15
-7
lines changed

2 files changed

+15
-7
lines changed

src/main/java/io/nats/client/impl/MessageManager.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,17 @@ public void run() {
149149
}
150150
}
151151
}
152+
153+
@Override
154+
public String toString() {
155+
long sinceLast = System.currentTimeMillis() - lastMsgReceived.get();
156+
return "MmTimerTask{" +
157+
"id='" + id + '\'' +
158+
", alarmPeriod=" + alarmPeriod +
159+
", alive=" + alive.get() +
160+
", sinceLast=" + sinceLast +
161+
'}';
162+
}
152163
}
153164

154165
protected void initOrResetHeartbeatTimer() {

src/main/java/io/nats/client/impl/OrderedMessageManager.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.nats.client.Message;
1818
import io.nats.client.SubscribeOptions;
1919
import io.nats.client.api.ConsumerConfiguration;
20+
import io.nats.client.api.ConsumerInfo;
2021

2122
import java.util.concurrent.atomic.AtomicReference;
2223

@@ -97,19 +98,15 @@ private void handleErrorCondition() {
9798
// 3. make a new consumer using the same deliver subject but
9899
// with a new starting point
99100
ConsumerConfiguration userCC = js.consumerConfigurationForOrdered(originalCc, lastStreamSeq, newDeliverSubject, actualConsumerName);
100-
js._createConsumerUnsubscribeOnException(stream, userCC, sub);
101+
ConsumerInfo ci = js._createConsumer(stream, userCC); // this can fail when a server is down.
102+
sub.setConsumerName(ci.getName());
101103

102104
// 4. restart the manager.
103105
startup(sub);
104106
}
105107
catch (Exception e) {
106108
js.conn.processException(e);
107-
setupHbAlarmToTrigger();
109+
initOrResetHeartbeatTimer();
108110
}
109111
}
110-
111-
private void setupHbAlarmToTrigger() {
112-
updateLastMessageReceived();
113-
initOrResetHeartbeatTimer();
114-
}
115112
}

0 commit comments

Comments
 (0)