|
25 | 25 | import java.nio.charset.StandardCharsets; |
26 | 26 | import java.time.ZonedDateTime; |
27 | 27 | import java.util.ArrayList; |
| 28 | +import java.util.Arrays; |
| 29 | +import java.util.Collections; |
28 | 30 | import java.util.List; |
29 | 31 |
|
30 | 32 | import static io.nats.client.support.NatsConstants.DOT; |
@@ -237,28 +239,41 @@ private PublishAck _write(String key, byte[] data, Headers h) throws IOException |
237 | 239 |
|
238 | 240 | @Override |
239 | 241 | public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException { |
240 | | - validateKvKeyWildcardAllowedRequired(key); |
241 | | - validateNotNull(watcher, "Watcher is required"); |
242 | | - return new NatsKeyValueWatchSubscription(this, key, watcher, -1, watchOptions); |
| 242 | + if (key.contains(",")) { |
| 243 | + return watch(Arrays.asList(key.split(",")), watcher, -1, watchOptions); |
| 244 | + } |
| 245 | + return watch(Collections.singletonList(key), watcher, -1, watchOptions); |
243 | 246 | } |
244 | 247 |
|
245 | 248 | @Override |
246 | 249 | public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException { |
247 | | - validateKvKeyWildcardAllowedRequired(key); |
| 250 | + if (key.contains(",")) { |
| 251 | + return watch(Arrays.asList(key.split(",")), watcher, fromRevision, watchOptions); |
| 252 | + } |
| 253 | + return watch(Collections.singletonList(key), watcher, fromRevision, watchOptions); |
| 254 | + } |
| 255 | + |
| 256 | + @Override |
| 257 | + public NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException { |
| 258 | + return watch(keys, watcher, -1, watchOptions); |
| 259 | + } |
| 260 | + |
| 261 | + @Override |
| 262 | + public NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException { |
| 263 | + // all watch methods (watch, watch all) delegate to here |
| 264 | + validateKvKeyWildcardAllowedRequired(keys); |
248 | 265 | validateNotNull(watcher, "Watcher is required"); |
249 | | - return new NatsKeyValueWatchSubscription(this, key, watcher, fromRevision, watchOptions); |
| 266 | + return new NatsKeyValueWatchSubscription(this, keys, watcher, fromRevision, watchOptions); |
250 | 267 | } |
251 | 268 |
|
252 | 269 | @Override |
253 | 270 | public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException { |
254 | | - validateNotNull(watcher, "Watcher is required"); |
255 | | - return new NatsKeyValueWatchSubscription(this, ">", watcher, -1, watchOptions); |
| 271 | + return new NatsKeyValueWatchSubscription(this, Collections.singletonList(">"), watcher, -1, watchOptions); |
256 | 272 | } |
257 | 273 |
|
258 | 274 | @Override |
259 | 275 | public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException { |
260 | | - validateNotNull(watcher, "Watcher is required"); |
261 | | - return new NatsKeyValueWatchSubscription(this, ">", watcher, fromRevision, watchOptions); |
| 276 | + return new NatsKeyValueWatchSubscription(this, Collections.singletonList(">"), watcher, fromRevision, watchOptions); |
262 | 277 | } |
263 | 278 |
|
264 | 279 | /** |
|
0 commit comments