1414package io .nats .client .impl ;
1515
1616import io .nats .client .*;
17+ import io .nats .client .api .ConsumerConfiguration ;
1718import io .nats .client .api .ConsumerInfo ;
1819
1920import java .io .IOException ;
2021
2122import static io .nats .client .BaseConsumeOptions .MIN_EXPIRES_MILLS ;
22- import static io .nats .client .BaseConsumeOptions .MIN_NOWAIT_EXPIRES_MILLS ;
2323
2424class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer , PullManagerObserver {
2525 private final boolean isNoWait ;
26+ private final boolean isNoWaitNoExpires ;
2627 private final long maxWaitNanos ;
2728 private final String pullSubject ;
2829 private long startNanos ;
@@ -35,15 +36,18 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer
3536
3637 isNoWait = fetchConsumeOptions .isNoWait ();
3738 long expiresInMillis = fetchConsumeOptions .getExpiresInMillis ();
39+ isNoWaitNoExpires = isNoWait && expiresInMillis == ConsumerConfiguration .LONG_UNSET ;
40+
3841 long inactiveThreshold ;
39- if (expiresInMillis <= MIN_NOWAIT_EXPIRES_MILLS ) { // can be for noWait
40- maxWaitNanos = MIN_NOWAIT_EXPIRES_MILLS * 1_000_000 ;
42+ if (expiresInMillis == ConsumerConfiguration . LONG_UNSET ) { // can be for noWait
43+ maxWaitNanos = MIN_EXPIRES_MILLS * 1_000_000 ;
4144 inactiveThreshold = MIN_EXPIRES_MILLS ; // no need to do the 10% longer
4245 }
4346 else {
4447 maxWaitNanos = expiresInMillis * 1_000_000 ;
4548 inactiveThreshold = expiresInMillis * 110 / 100 ; // 10% longer than the wait
4649 }
50+
4751 PullRequestOptions pro = PullRequestOptions .builder (fetchConsumeOptions .getMaxMessages ())
4852 .maxBytes (fetchConsumeOptions .getMaxBytes ())
4953 .expiresIn (expiresInMillis )
@@ -78,7 +82,7 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked
7882 Message m = sub ._nextUnmanagedNoWait (pullSubject );
7983 if (m == null ) {
8084 // if there are no messages in the internal cache AND there are no more pending,
81- // they all have been read and we can go ahead and close the subscription.
85+ // they all have been read and we can go ahead and finish
8286 finished .set (true );
8387 lenientClose ();
8488 }
@@ -96,14 +100,21 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked
96100 // this might happen once, but it should already be noMorePending
97101 if (timeLeftMillis < 1 ) {
98102 Message m = sub ._nextUnmanagedNoWait (pullSubject ); // null means don't wait
99- if (m == null && isNoWait ) {
103+ if (m == null ) {
104+ // no message and no time left, go ahead and finish
100105 finished .set (true );
101106 lenientClose ();
102107 }
103108 return m ;
104109 }
105110
106- return sub ._nextUnmanaged (timeLeftMillis , pullSubject );
111+ Message m = sub ._nextUnmanaged (timeLeftMillis , pullSubject );
112+ if (m == null && isNoWaitNoExpires ) {
113+ // no message and no wait, go ahead and finish
114+ finished .set (true );
115+ lenientClose ();
116+ }
117+ return m ;
107118 }
108119 catch (JetStreamStatusException e ) {
109120 throw new JetStreamStatusCheckedException (e );
0 commit comments