Skip to content

Commit 66311ec

Browse files
authored
Allow simplification fetch to have noWait with or without expires. (#1089)
1 parent 39d651b commit 66311ec

File tree

4 files changed

+79
-18
lines changed

4 files changed

+79
-18
lines changed

src/main/java/io/nats/client/BaseConsumeOptions.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
package io.nats.client;
1515

16+
import io.nats.client.api.ConsumerConfiguration;
17+
1618
/**
1719
* Base Consume Options are provided to customize the way the consume and
1820
* fetch operate. It is the base class for ConsumeOptions and FetchConsumeOptions.
@@ -23,6 +25,7 @@ public class BaseConsumeOptions {
2325
public static final int DEFAULT_THRESHOLD_PERCENT = 25;
2426
public static final long DEFAULT_EXPIRES_IN_MILLIS = 30000;
2527
public static final long MIN_EXPIRES_MILLS = 1000;
28+
public static final long MIN_NOWAIT_EXPIRES_MILLS = 100;
2629
public static final long MAX_HEARTBEAT_MILLIS = 30000;
2730
public static final int MAX_IDLE_HEARTBEAT_PERCENT = 50;
2831

@@ -31,6 +34,7 @@ public class BaseConsumeOptions {
3134
protected final long expiresIn;
3235
protected final long idleHeartbeat;
3336
protected final int thresholdPercent;
37+
protected final boolean noWait;
3438

3539
@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
3640
protected BaseConsumeOptions(Builder b) {
@@ -45,6 +49,7 @@ protected BaseConsumeOptions(Builder b) {
4549
// validation handled in builder
4650
thresholdPercent = b.thresholdPercent;
4751
expiresIn = b.expiresIn;
52+
noWait = b.noWait;
4853

4954
// calculated
5055
idleHeartbeat = Math.min(MAX_HEARTBEAT_MILLIS, expiresIn * MAX_IDLE_HEARTBEAT_PERCENT / 100);
@@ -67,6 +72,7 @@ protected static abstract class Builder<B, CO> {
6772
protected long bytes = 0;
6873
protected int thresholdPercent = DEFAULT_THRESHOLD_PERCENT;
6974
protected long expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
75+
protected boolean noWait = false;
7076

7177
protected abstract B getThis();
7278

@@ -90,11 +96,23 @@ protected B bytes(long bytes) {
9096
* @return the builder
9197
*/
9298
public B expiresIn(long expiresInMillis) {
93-
if (expiresInMillis < 1) {
94-
expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
99+
if (expiresInMillis < 1) { // this is way to clear or reset, just a code guard really
100+
if (noWait) {
101+
expiresIn = ConsumerConfiguration.LONG_UNSET;
102+
}
103+
else {
104+
expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
105+
}
95106
}
96107
else if (expiresInMillis < MIN_EXPIRES_MILLS) {
97-
throw new IllegalArgumentException("Expires must be greater than or equal to " + MIN_EXPIRES_MILLS);
108+
if (noWait) {
109+
if (expiresInMillis < MIN_NOWAIT_EXPIRES_MILLS) {
110+
throw new IllegalArgumentException("Expires when No Wait must be greater than or equal to " + MIN_NOWAIT_EXPIRES_MILLS);
111+
}
112+
}
113+
else {
114+
throw new IllegalArgumentException("Expires must be greater than or equal to " + MIN_EXPIRES_MILLS);
115+
}
98116
}
99117
else {
100118
expiresIn = expiresInMillis;

src/main/java/io/nats/client/FetchConsumeOptions.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
package io.nats.client;
1515

16+
import io.nats.client.api.ConsumerConfiguration;
17+
1618
/**
1719
* Fetch Consume Options are provided to customize the fetch operation.
1820
*/
@@ -39,6 +41,8 @@ public long getMaxBytes() {
3941
return bytes;
4042
}
4143

44+
public boolean isNoWait() { return noWait; }
45+
4246
public static Builder builder() {
4347
return new Builder();
4448
}
@@ -92,6 +96,27 @@ public Builder max(int maxBytes, int maxMessages) {
9296
return bytes(maxBytes);
9397
}
9498

99+
/**
100+
* Set no wait to true
101+
* @return the builder
102+
*/
103+
public Builder noWait() {
104+
this.noWait = true;
105+
expiresIn = ConsumerConfiguration.LONG_UNSET;
106+
return this;
107+
}
108+
109+
/**
110+
* Set no wait to true with an expiration, special behavior.
111+
* @param expiresInMillis the expiration time in milliseconds
112+
* @return the builder
113+
*/
114+
public Builder noWaitExpiresIn(long expiresInMillis) {
115+
this.noWait = true;
116+
expiresIn(expiresInMillis);
117+
return this;
118+
}
119+
95120
/**
96121
* Build the FetchConsumeOptions.
97122
* @return a FetchConsumeOptions instance

src/main/java/io/nats/client/impl/NatsFetchConsumer.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818

1919
import java.io.IOException;
2020

21+
import static io.nats.client.BaseConsumeOptions.MIN_EXPIRES_MILLS;
22+
import static io.nats.client.BaseConsumeOptions.MIN_NOWAIT_EXPIRES_MILLS;
23+
2124
class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer, PullManagerObserver {
25+
private final boolean isNoWait;
2226
private final long maxWaitNanos;
2327
private final String pullSubject;
2428
private long startNanos;
@@ -29,13 +33,22 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer
2933
{
3034
super(cachedConsumerInfo);
3135

36+
isNoWait = fetchConsumeOptions.isNoWait();
3237
long expiresInMillis = fetchConsumeOptions.getExpiresInMillis();
33-
maxWaitNanos = expiresInMillis * 1_000_000;
34-
long inactiveThreshold = expiresInMillis * 110 / 100; // ten % longer than the wait
38+
long inactiveThreshold;
39+
if (expiresInMillis <= MIN_NOWAIT_EXPIRES_MILLS ) { // can be for noWait
40+
maxWaitNanos = MIN_NOWAIT_EXPIRES_MILLS * 1_000_000;
41+
inactiveThreshold = MIN_EXPIRES_MILLS; // no need to do the 10% longer
42+
}
43+
else {
44+
maxWaitNanos = expiresInMillis * 1_000_000;
45+
inactiveThreshold = expiresInMillis * 110 / 100; // 10% longer than the wait
46+
}
3547
PullRequestOptions pro = PullRequestOptions.builder(fetchConsumeOptions.getMaxMessages())
3648
.maxBytes(fetchConsumeOptions.getMaxBytes())
37-
.expiresIn(fetchConsumeOptions.getExpiresInMillis())
49+
.expiresIn(expiresInMillis)
3850
.idleHeartbeat(fetchConsumeOptions.getIdleHeartbeat())
51+
.noWait(isNoWait)
3952
.build();
4053
initSub(subscriptionMaker.subscribe(null, null, null, inactiveThreshold));
4154
pullSubject = sub._pull(pro, false, this);
@@ -82,7 +95,12 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked
8295
// if the timer has run out, don't allow waiting
8396
// this might happen once, but it should already be noMorePending
8497
if (timeLeftMillis < 1) {
85-
return sub._nextUnmanagedNoWait(pullSubject); // null means don't wait
98+
Message m = sub._nextUnmanagedNoWait(pullSubject); // null means don't wait
99+
if (m == null && isNoWait) {
100+
finished.set(true);
101+
lenientClose();
102+
}
103+
return m;
86104
}
87105

88106
return sub._nextUnmanaged(timeLeftMillis, pullSubject);

src/main/java/io/nats/client/impl/NatsJetStreamMetaData.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,22 @@ public class NatsJetStreamMetaData {
3535
private final long consumerSeq;
3636
private final ZonedDateTime timestamp;
3737
private final long pending;
38-
3938
@Override
4039
public String toString() {
4140
return "NatsJetStreamMetaData{" +
42-
"prefix='" + prefix + '\'' +
43-
", domain='" + domain + '\'' +
44-
", stream='" + stream + '\'' +
45-
", consumer='" + consumer + '\'' +
46-
", delivered=" + delivered +
47-
", streamSeq=" + streamSeq +
48-
", consumerSeq=" + consumerSeq +
49-
", timestamp=" + timestamp +
50-
", pending=" + pending +
51-
'}';
41+
"prefix='" + prefix + '\'' +
42+
", domain='" + domain + '\'' +
43+
", stream='" + stream + '\'' +
44+
", consumer='" + consumer + '\'' +
45+
", delivered=" + delivered +
46+
", streamSeq=" + streamSeq +
47+
", consumerSeq=" + consumerSeq +
48+
", timestamp=" + timestamp +
49+
", pending=" + pending +
50+
'}';
5251
}
5352

53+
5454
/*
5555
v0 <prefix>.ACK.<stream name>.<consumer name>.<num delivered>.<stream sequence>.<consumer sequence>.<timestamp>
5656
v1 <prefix>.ACK.<stream name>.<consumer name>.<num delivered>.<stream sequence>.<consumer sequence>.<timestamp>.<num pending>

0 commit comments

Comments
 (0)