Skip to content

Commit 61257e4

Browse files
authored
Direct Message Subject Header May Contain Multiple Subjects (#1016)
1 parent df1a607 commit 61257e4

File tree

4 files changed

+86
-26
lines changed

4 files changed

+86
-26
lines changed

src/main/java/io/nats/client/api/MessageInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public MessageInfo(Message msg, String streamName, boolean direct) {
6565

6666
if (direct) {
6767
this.headers = msg.getHeaders();
68-
this.subject = headers.getFirst(NATS_SUBJECT);
68+
this.subject = headers.getLast(NATS_SUBJECT);
6969
this.data = msg.getData();
7070
seq = Long.parseLong(headers.getFirst(NATS_SEQUENCE));
7171
time = DateTimeUtils.parseDateTime(headers.getFirst(NATS_TIMESTAMP));

src/main/java/io/nats/client/impl/Headers.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,17 @@ public String getFirst(String key) {
299299
return values == null ? null : values.get(0);
300300
}
301301

302+
/**
303+
* Returns the last value for the specific (case sensitive) key.
304+
* Will be {@code null} if the key is not found.
305+
*
306+
* @return the last value for the case sensitive key.
307+
*/
308+
public String getLast(String key) {
309+
List<String> values = valuesMap.get(key);
310+
return values == null ? null : values.get(values.size() - 1);
311+
}
312+
302313
/**
303314
* Returns a {@link List} view of the values for the specific (case insensitive) key.
304315
* Will be {@code null} if the key is not found.

src/test/java/io/nats/client/impl/HeadersTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,15 +357,19 @@ public void remove_collection_work() {
357357
}
358358

359359
@Test
360-
public void getFirsts() {
360+
public void testGetFirstGetLast() {
361361
Headers headers = new Headers();
362362
assertNull(headers.getFirst(KEY1));
363+
assertNull(headers.getLast(KEY1));
363364
headers.add(KEY1, VAL1);
364365
assertEquals(VAL1, headers.getFirst(KEY1));
366+
assertEquals(VAL1, headers.getLast(KEY1));
365367
headers.add(KEY1, VAL2);
366368
assertEquals(VAL1, headers.getFirst(KEY1));
369+
assertEquals(VAL2, headers.getLast(KEY1));
367370
headers.put(KEY1, VAL3);
368371
assertEquals(VAL3, headers.getFirst(KEY1));
372+
assertEquals(VAL3, headers.getLast(KEY1));
369373
}
370374

371375
private void remove(

src/test/java/io/nats/client/impl/JetStreamManagementTests.java

Lines changed: 69 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,9 @@ public void testStreamCreateWithNoSubject() throws Exception {
133133
JetStreamManagement jsm = nc.jetStreamManagement();
134134

135135
StreamConfiguration sc = StreamConfiguration.builder()
136-
.name(STREAM)
137-
.storageType(StorageType.Memory)
138-
.build();
136+
.name(STREAM)
137+
.storageType(StorageType.Memory)
138+
.build();
139139

140140
StreamInfo si = jsm.addStream(sc);
141141
assertTrue(now <= si.getCreateTime().toEpochSecond());
@@ -272,8 +272,8 @@ public void testAddUpdateStreamInvalids() throws Exception {
272272

273273
// cannot change MaxConsumers
274274
StreamConfiguration scMaxCon = getTestStreamConfigurationBuilder()
275-
.maxConsumers(2)
276-
.build();
275+
.maxConsumers(2)
276+
.build();
277277
assertThrows(JetStreamApiException.class, () -> jsm.updateStream(scMaxCon));
278278

279279
StreamConfiguration scReten = getTestStreamConfigurationBuilder()
@@ -306,9 +306,9 @@ private static StreamConfiguration getTestStreamConfiguration() {
306306

307307
private static StreamConfiguration.Builder getTestStreamConfigurationBuilder() {
308308
return StreamConfiguration.builder()
309-
.name(STREAM)
310-
.storageType(StorageType.Memory)
311-
.subjects(subject(0), subject(1));
309+
.name(STREAM)
310+
.storageType(StorageType.Memory)
311+
.subjects(subject(0), subject(1));
312312
}
313313

314314
@Test
@@ -596,7 +596,7 @@ public void testDeleteStream() throws Exception {
596596
runInJsServer(nc -> {
597597
JetStreamManagement jsm = nc.jetStreamManagement();
598598
JetStreamApiException jsapiEx =
599-
assertThrows(JetStreamApiException.class, () -> jsm.deleteStream(STREAM));
599+
assertThrows(JetStreamApiException.class, () -> jsm.deleteStream(STREAM));
600600
assertEquals(10059, jsapiEx.getApiErrorCode());
601601

602602
createDefaultTestStream(jsm);
@@ -681,7 +681,7 @@ public void testAddDeleteConsumer() throws Exception {
681681

682682
final ConsumerConfiguration cc = ConsumerConfiguration.builder().build();
683683
IllegalArgumentException iae =
684-
assertThrows(IllegalArgumentException.class, () -> jsm.addOrUpdateConsumer(null, cc));
684+
assertThrows(IllegalArgumentException.class, () -> jsm.addOrUpdateConsumer(null, cc));
685685
assertTrue(iae.getMessage().contains("Stream cannot be null or empty"));
686686
iae = assertThrows(IllegalArgumentException.class, () -> jsm.addOrUpdateConsumer(STREAM, null));
687687
assertTrue(iae.getMessage().contains("Config cannot be null"));
@@ -826,12 +826,12 @@ private ConsumerConfiguration prepForUpdateTest(JetStreamManagement jsm) throws
826826
catch (Exception e) { /* ignore */ }
827827

828828
ConsumerConfiguration cc = ConsumerConfiguration.builder()
829-
.durable(durable(1))
830-
.ackPolicy(AckPolicy.Explicit)
831-
.deliverSubject(deliver(1))
832-
.maxDeliver(3)
833-
.filterSubject(SUBJECT_GT)
834-
.build();
829+
.durable(durable(1))
830+
.ackPolicy(AckPolicy.Explicit)
831+
.deliverSubject(deliver(1))
832+
.maxDeliver(3)
833+
.filterSubject(SUBJECT_GT)
834+
.build();
835835
assertValidAddOrUpdate(jsm, cc);
836836
return cc;
837837
}
@@ -966,9 +966,9 @@ private void addConsumers(JetStreamManagement jsm, String stream, int count, Str
966966
for (int x = 1; x <= count; x++) {
967967
String dur = durable(durableVary, x);
968968
ConsumerConfiguration cc = ConsumerConfiguration.builder()
969-
.durable(dur)
970-
.filterSubject(filterSubject)
971-
.build();
969+
.durable(dur)
970+
.filterSubject(filterSubject)
971+
.build();
972972
ConsumerInfo ci = jsm.addOrUpdateConsumer(stream, cc);
973973
assertEquals(dur, ci.getName());
974974
assertEquals(dur, ci.getConsumerConfiguration().getDurable());
@@ -1091,16 +1091,16 @@ public void testConsumerReplica() throws Exception {
10911091
createMemoryStream(jsm, STREAM, subject(0), subject(1));
10921092

10931093
final ConsumerConfiguration cc0 = ConsumerConfiguration.builder()
1094-
.durable(durable(0))
1095-
.build();
1094+
.durable(durable(0))
1095+
.build();
10961096
ConsumerInfo ci = jsm.addOrUpdateConsumer(STREAM, cc0);
10971097
// server returns 0 when value is not set
10981098
assertEquals(0, ci.getConsumerConfiguration().getNumReplicas());
10991099

11001100
final ConsumerConfiguration cc1 = ConsumerConfiguration.builder()
1101-
.durable(durable(0))
1102-
.numReplicas(1)
1103-
.build();
1101+
.durable(durable(0))
1102+
.numReplicas(1)
1103+
.build();
11041104
ci = jsm.addOrUpdateConsumer(STREAM, cc1);
11051105
assertEquals(1, ci.getConsumerConfiguration().getNumReplicas());
11061106
});
@@ -1240,4 +1240,49 @@ private void validateMessageGetRequest(
12401240
assertEquals(lastBySubject != null, mgr.isLastBySubject());
12411241
assertEquals(nextBySubject != null, mgr.isNextBySubject());
12421242
}
1243+
1244+
@Test
1245+
public void testDirectMessageRepublishedSubject() throws Exception {
1246+
jsServer.run(nc -> {
1247+
JetStreamManagement jsm = nc.jetStreamManagement();
1248+
String streamBucketName = "sb-" + variant(null);
1249+
String subject = subject();
1250+
String streamSubject = subject + ".>";
1251+
String publishSubject1 = subject + ".one";
1252+
String publishSubject2 = subject + ".two";
1253+
String publishSubject3 = subject + ".three";
1254+
String republishDest = "$KV." + streamBucketName + ".>";
1255+
1256+
StreamConfiguration sc = StreamConfiguration.builder()
1257+
.name(streamBucketName)
1258+
.storageType(StorageType.Memory)
1259+
.subjects(streamSubject)
1260+
.republish(Republish.builder().source(">").destination(republishDest).build())
1261+
.build();
1262+
jsm.addStream(sc);
1263+
1264+
KeyValueConfiguration kvc = KeyValueConfiguration.builder().name(streamBucketName).build();
1265+
nc.keyValueManagement().create(kvc);
1266+
KeyValue kv = nc.keyValue(streamBucketName);
1267+
1268+
nc.publish(publishSubject1, "uno".getBytes());
1269+
nc.jetStream().publish(publishSubject2, "dos".getBytes());
1270+
kv.put(publishSubject3, "tres");
1271+
1272+
KeyValueEntry kve1 = kv.get(publishSubject1);
1273+
assertEquals(streamBucketName, kve1.getBucket());
1274+
assertEquals(publishSubject1, kve1.getKey());
1275+
assertEquals("uno", kve1.getValueAsString());
1276+
1277+
KeyValueEntry kve2 = kv.get(publishSubject2);
1278+
assertEquals(streamBucketName, kve2.getBucket());
1279+
assertEquals(publishSubject2, kve2.getKey());
1280+
assertEquals("dos", kve2.getValueAsString());
1281+
1282+
KeyValueEntry kve3 = kv.get(publishSubject3);
1283+
assertEquals(streamBucketName, kve3.getBucket());
1284+
assertEquals(publishSubject3, kve3.getKey());
1285+
assertEquals("tres", kve3.getValueAsString());
1286+
});
1287+
}
12431288
}

0 commit comments

Comments
 (0)