Skip to content

Commit 6c5ae3f

Browse files
Implement Consumers Pause (#1093)
1 parent 1cbd2e8 commit 6c5ae3f

19 files changed

+364
-5
lines changed

src/examples/java/io/nats/examples/jetstream/NatsJsManageConsumers.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@
1919
import io.nats.client.Nats;
2020
import io.nats.client.api.ConsumerConfiguration;
2121
import io.nats.client.api.ConsumerInfo;
22+
import io.nats.client.api.ConsumerPauseResponse;
2223
import io.nats.examples.ExampleArgs;
2324
import io.nats.examples.ExampleUtils;
2425

26+
import java.time.ZonedDateTime;
2527
import java.util.List;
2628

29+
import static io.nats.client.support.DateTimeUtils.ZONE_ID_GMT;
2730
import static io.nats.examples.jetstream.NatsJsUtils.*;
2831

2932
/**
@@ -82,16 +85,30 @@ public static void main(String[] args) {
8285
List<String> consumerNames = jsm.getConsumerNames(exArgs.stream);
8386
printObject(consumerNames);
8487

85-
// 4. Delete a consumer, then list them again
88+
// 4. Pause a consumer
89+
System.out.println("\n----------\n4. pauseConsumer");
90+
ZonedDateTime pauseUntil = ZonedDateTime.now(ZONE_ID_GMT).plusSeconds(30);
91+
ConsumerPauseResponse pauseResponse = jsm.pauseConsumer(exArgs.stream, durable1, pauseUntil);
92+
printObject(pauseResponse);
93+
ci = jsm.getConsumerInfo(exArgs.stream, durable1);
94+
printObject(ci);
95+
96+
// 5. Resume a (paused) consumer
97+
System.out.println("\n----------\n5. resumeConsumer");
98+
jsm.resumeConsumer(exArgs.stream, durable1);
99+
ci = jsm.getConsumerInfo(exArgs.stream, durable1);
100+
printObject(ci);
101+
102+
// 6. Delete a consumer, then list them again
86103
// Subsequent calls to deleteStream will throw a
87104
// JetStreamApiException [10014]
88-
System.out.println("\n----------\n3. Delete consumers");
105+
System.out.println("\n----------\n6. Delete consumers");
89106
jsm.deleteConsumer(exArgs.stream, durable1);
90107
consumerNames = jsm.getConsumerNames(exArgs.stream);
91108
printObject(consumerNames);
92109

93-
// 5. Try to delete the consumer again and get the exception
94-
System.out.println("\n----------\n5. Delete consumer again");
110+
// 7. Try to delete the consumer again and get the exception
111+
System.out.println("\n----------\n7. Delete consumer again");
95112
try
96113
{
97114
jsm.deleteConsumer(exArgs.stream, durable1);
@@ -101,6 +118,17 @@ public static void main(String[] args) {
101118
System.out.println("Exception was: '" + e.getMessage() + "'");
102119
}
103120

121+
// 8. Try to pause a consumer that does not exist, and you will get an exception
122+
System.out.println("\n----------\n8. Pause non-existent consumer .");
123+
try
124+
{
125+
jsm.pauseConsumer(exArgs.stream, durable1, ZonedDateTime.now());
126+
}
127+
catch (JetStreamApiException e)
128+
{
129+
System.out.println("Exception was: '" + e.getMessage() + "'");
130+
}
131+
104132
System.out.println("\n----------");
105133

106134
// delete the stream since we are done with it.

src/main/java/io/nats/client/JetStreamManagement.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.nats.client.api.*;
1616

1717
import java.io.IOException;
18+
import java.time.ZonedDateTime;
1819
import java.util.List;
1920

2021
/**
@@ -131,6 +132,29 @@ public interface JetStreamManagement {
131132
*/
132133
boolean deleteConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException;
133134

135+
/**
136+
* Pauses a consumer.
137+
* @param streamName name of the stream
138+
* @param consumerName the name of the consumer.
139+
* @param pauseUntil consumer is paused until this time.
140+
* @return ConsumerPauseResponse the pause response
141+
* @throws IOException covers various communication issues with the NATS
142+
* server such as timeout or interruption
143+
* @throws JetStreamApiException the request had an error related to the data, for instance the consumer does not exist.
144+
*/
145+
ConsumerPauseResponse pauseConsumer(String streamName, String consumerName, ZonedDateTime pauseUntil) throws IOException, JetStreamApiException;
146+
147+
/**
148+
* Resumes a paused consumer.
149+
* @param streamName name of the stream
150+
* @param consumerName the name of the consumer.
151+
* @return true if the resume succeeded
152+
* @throws IOException covers various communication issues with the NATS
153+
* server such as timeout or interruption
154+
* @throws JetStreamApiException the request had an error related to the data, for instance the consumer does not exist.
155+
*/
156+
boolean resumeConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException;
157+
134158
/**
135159
* Gets the info for an existing consumer.
136160
* @param streamName name of the stream

src/main/java/io/nats/client/api/ConsumerConfiguration.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public class ConsumerConfiguration implements JsonSerializable {
8080
protected final Integer maxBatch;
8181
protected final Integer maxBytes;
8282
protected final Integer numReplicas;
83+
protected final ZonedDateTime pauseUntil;
8384
protected final Boolean flowControl;
8485
protected final Boolean headersOnly;
8586
protected final Boolean memStorage;
@@ -110,6 +111,7 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
110111
this.maxBatch = cc.maxBatch;
111112
this.maxBytes = cc.maxBytes;
112113
this.numReplicas = cc.numReplicas;
114+
this.pauseUntil = cc.pauseUntil;
113115
this.flowControl = cc.flowControl;
114116
this.headersOnly = cc.headersOnly;
115117
this.memStorage = cc.memStorage;
@@ -143,6 +145,7 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
143145
maxBatch = readInteger(v, MAX_BATCH);
144146
maxBytes = readInteger(v, MAX_BYTES);
145147
numReplicas = readInteger(v, NUM_REPLICAS);
148+
pauseUntil = readDate(v, PAUSE_UNTIL);
146149

147150
flowControl = readBoolean(v, FLOW_CONTROL, null);
148151
headersOnly = readBoolean(v, HEADERS_ONLY, null);
@@ -187,6 +190,7 @@ protected ConsumerConfiguration(Builder b)
187190
this.maxBatch = b.maxBatch;
188191
this.maxBytes = b.maxBytes;
189192
this.numReplicas = b.numReplicas;
193+
this.pauseUntil = b.pauseUntil;
190194

191195
this.flowControl = b.flowControl;
192196
this.headersOnly = b.headersOnly;
@@ -229,6 +233,7 @@ public String toJson() {
229233
JsonUtils.addFieldAsNanos(sb, INACTIVE_THRESHOLD, inactiveThreshold);
230234
JsonUtils.addDurations(sb, BACKOFF, backoff);
231235
JsonUtils.addField(sb, NUM_REPLICAS, numReplicas);
236+
JsonUtils.addField(sb, PAUSE_UNTIL, pauseUntil);
232237
JsonUtils.addField(sb, MEM_STORAGE, memStorage);
233238
JsonUtils.addField(sb, METADATA, metadata);
234239
if (filterSubjects != null) {
@@ -485,6 +490,14 @@ public Map<String, String> getMetadata() {
485490
*/
486491
public int getNumReplicas() { return getOrUnset(numReplicas); }
487492

493+
/**
494+
* Get the time until the consumer is paused.
495+
* @return paused until time
496+
*/
497+
public ZonedDateTime getPauseUntil() {
498+
return pauseUntil;
499+
}
500+
488501
/**
489502
* Gets whether deliver policy of this consumer configuration was set or left unset
490503
* @return true if the policy was set, false if the policy was not set
@@ -663,6 +676,7 @@ public static class Builder {
663676
private Integer maxBatch;
664677
private Integer maxBytes;
665678
private Integer numReplicas;
679+
private ZonedDateTime pauseUntil;
666680

667681
private Boolean flowControl;
668682
private Boolean headersOnly;
@@ -1142,6 +1156,16 @@ public Builder numReplicas(Integer numReplicas) {
11421156
return this;
11431157
}
11441158

1159+
/**
1160+
* Sets the time to pause the consumer until.
1161+
* @param pauseUntil the time to pause
1162+
* @return Builder
1163+
*/
1164+
public Builder pauseUntil(ZonedDateTime pauseUntil) {
1165+
this.pauseUntil = pauseUntil;
1166+
return this;
1167+
}
1168+
11451169
/**
11461170
* set the headers only flag saying to deliver only the headers of
11471171
* messages in the stream and not the bodies

src/main/java/io/nats/client/api/ConsumerInfo.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.nats.client.Message;
1717
import io.nats.client.support.JsonValue;
1818

19+
import java.time.Duration;
1920
import java.time.ZonedDateTime;
2021

2122
import static io.nats.client.support.ApiConstants.*;
@@ -36,6 +37,8 @@ public class ConsumerInfo extends ApiResponse<ConsumerInfo> {
3637
private final long numWaiting;
3738
private final long numAckPending;
3839
private final long numRedelivered;
40+
private final boolean paused;
41+
private final Duration pauseRemaining;
3942
private final ClusterInfo clusterInfo;
4043
private final boolean pushBound;
4144
private final ZonedDateTime timestamp;
@@ -59,6 +62,8 @@ public ConsumerInfo(JsonValue vConsumerInfo) {
5962
numRedelivered = readLong(jv, NUM_REDELIVERED, 0);
6063
numPending = readLong(jv, NUM_PENDING, 0);
6164
numWaiting = readLong(jv, NUM_WAITING, 0);
65+
paused = readBoolean(jv, PAUSED, false);
66+
pauseRemaining = readNanos(jv, PAUSE_REMAINING);
6267

6368
clusterInfo = ClusterInfo.optionalInstance(readValue(jv, CLUSTER));
6469
pushBound = readBoolean(jv, PUSH_BOUND);
@@ -110,6 +115,14 @@ public long getRedelivered() {
110115
return numRedelivered;
111116
}
112117

118+
public boolean getPaused() {
119+
return paused;
120+
}
121+
122+
public Duration getPauseRemaining() {
123+
return pauseRemaining;
124+
}
125+
113126
public ClusterInfo getClusterInfo() {
114127
return clusterInfo;
115128
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2024 The NATS Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at:
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package io.nats.client.api;
15+
16+
import io.nats.client.support.JsonSerializable;
17+
import io.nats.client.support.JsonUtils;
18+
import java.time.ZonedDateTime;
19+
20+
import static io.nats.client.support.ApiConstants.CONFIG;
21+
import static io.nats.client.support.ApiConstants.PAUSE_UNTIL;
22+
import static io.nats.client.support.ApiConstants.STREAM_NAME;
23+
import static io.nats.client.support.JsonUtils.addField;
24+
import static io.nats.client.support.JsonUtils.beginJson;
25+
import static io.nats.client.support.JsonUtils.endJson;
26+
27+
/**
28+
* Object used to make a request to pause a consumer. Used Internally
29+
*/
30+
public class ConsumerPauseRequest implements JsonSerializable {
31+
private final ZonedDateTime pauseUntil;
32+
33+
public ConsumerPauseRequest(ZonedDateTime pauseUntil) {
34+
this.pauseUntil = pauseUntil;
35+
}
36+
37+
@Override
38+
public String toJson() {
39+
StringBuilder sb = beginJson();
40+
41+
addField(sb, PAUSE_UNTIL, pauseUntil);
42+
43+
return endJson(sb).toString();
44+
}
45+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright 2024 The NATS Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at:
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package io.nats.client.api;
15+
16+
import io.nats.client.Message;
17+
import java.time.Duration;
18+
import java.time.ZonedDateTime;
19+
20+
import static io.nats.client.support.ApiConstants.PAUSED;
21+
import static io.nats.client.support.ApiConstants.PAUSE_REMAINING;
22+
import static io.nats.client.support.ApiConstants.PAUSE_UNTIL;
23+
import static io.nats.client.support.JsonValueUtils.readBoolean;
24+
import static io.nats.client.support.JsonValueUtils.readDate;
25+
import static io.nats.client.support.JsonValueUtils.readLong;
26+
import static io.nats.client.support.JsonValueUtils.readNanos;
27+
28+
public class ConsumerPauseResponse extends ApiResponse<ConsumerPauseResponse> {
29+
30+
private final boolean paused;
31+
private final ZonedDateTime pauseUntil;
32+
private final Duration pauseRemaining;
33+
34+
public ConsumerPauseResponse(Message msg) {
35+
super(msg);
36+
paused = readBoolean(jv, PAUSED);
37+
pauseUntil = readDate(jv, PAUSE_UNTIL);
38+
pauseRemaining = readNanos(jv, PAUSE_REMAINING);
39+
}
40+
41+
/**
42+
* Returns true if the consumer was paused
43+
* @return whether the consumer is paused
44+
*/
45+
public boolean isPaused() {
46+
return paused;
47+
}
48+
49+
/**
50+
* Returns the time until the consumer is paused
51+
* @return pause until time
52+
*/
53+
public ZonedDateTime getPauseUntil() {
54+
return pauseUntil;
55+
}
56+
57+
/**
58+
* Returns how much time is remaining for this consumer to be paused
59+
* @return remaining paused time
60+
*/
61+
public Duration getPauseRemaining() {
62+
return pauseRemaining;
63+
}
64+
}

src/main/java/io/nats/client/impl/NatsJetStream.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,7 @@ public List<String> getChanges(ConsumerConfiguration serverCc) {
516516
if (maxBatch != null && maxBatch != serverCcc.getMaxBatch()) { changes.add("maxBatch"); }
517517
if (maxBytes != null && maxBytes != serverCcc.getMaxBytes()) { changes.add("maxBytes"); }
518518
if (numReplicas != null && !numReplicas.equals(serverCcc.numReplicas)) { changes.add("numReplicas"); }
519+
if (pauseUntil != null && !pauseUntil.equals(serverCcc.pauseUntil)) { changes.add("pauseUntil"); }
519520

520521
if (ackWait != null && !ackWait.equals(getOrUnset(serverCcc.ackWait))) { changes.add("ackWait"); }
521522
if (idleHeartbeat != null && !idleHeartbeat.equals(getOrUnset(serverCcc.idleHeartbeat))) { changes.add("idleHeartbeat"); }

src/main/java/io/nats/client/impl/NatsJetStreamManagement.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.IOException;
2121
import java.nio.charset.StandardCharsets;
22+
import java.time.ZonedDateTime;
2223
import java.util.List;
2324

2425
import static io.nats.client.support.Validator.*;
@@ -142,6 +143,32 @@ public boolean deleteConsumer(String streamName, String consumerName) throws IOE
142143
return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
143144
}
144145

146+
/**
147+
* {@inheritDoc}
148+
*/
149+
@Override
150+
public ConsumerPauseResponse pauseConsumer(String streamName, String consumerName, ZonedDateTime pauseUntil) throws IOException, JetStreamApiException {
151+
validateNotNull(streamName, "Stream Name");
152+
validateNotNull(consumerName, "Consumer Name");
153+
String subj = String.format(JSAPI_CONSUMER_PAUSE, streamName, consumerName);
154+
ConsumerPauseRequest pauseRequest = new ConsumerPauseRequest(pauseUntil);
155+
Message resp = makeRequestResponseRequired(subj, pauseRequest.serialize(), jso.getRequestTimeout());
156+
return new ConsumerPauseResponse(resp).throwOnHasError();
157+
}
158+
159+
/**
160+
* {@inheritDoc}
161+
*/
162+
@Override
163+
public boolean resumeConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException {
164+
validateNotNull(streamName, "Stream Name");
165+
validateNotNull(consumerName, "Consumer Name");
166+
String subj = String.format(JSAPI_CONSUMER_PAUSE, streamName, consumerName);
167+
Message resp = makeRequestResponseRequired(subj, null, jso.getRequestTimeout());
168+
ConsumerPauseResponse response = new ConsumerPauseResponse(resp).throwOnHasError();
169+
return !response.isPaused();
170+
}
171+
145172
/**
146173
* {@inheritDoc}
147174
*/

src/main/java/io/nats/client/support/ApiConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ public interface ApiConstants {
145145
String OPT_START_SEQ = "opt_start_seq";
146146
String OPT_START_TIME = "opt_start_time";
147147
String OPTIONS = "options";
148+
String PAUSED = "paused";
149+
String PAUSE_REMAINING = "pause_remaining";
150+
String PAUSE_UNTIL = "pause_until";
148151
String PLACEMENT = "placement";
149152
String PORT = "port";
150153
String PROCESSING_TIME = "processing_time";

0 commit comments

Comments
 (0)