Skip to content

Commit 6cc44ed

Browse files
committed
Fall back to AMQP to create super stream topology
If RabbitMQ < 3.13. For tests.
1 parent 8bcbd60 commit 6cc44ed

File tree

2 files changed

+67
-7
lines changed

2 files changed

+67
-7
lines changed

src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ void init(TestInfo info) {
5252
}
5353

5454
@Test
55+
@TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0)
5556
void createDelete() {
5657
Client c = cf.get();
5758
Client.Response response = c.createSuperStream(s, partitions, routingKeys, null);
@@ -81,6 +82,7 @@ void createDelete() {
8182
}
8283

8384
@Test
85+
@TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0)
8486
void clientWithSubscriptionShouldReceiveNotificationOnDeletion() throws Exception {
8587
Client c = cf.get();
8688
Client.Response response = c.createSuperStream(s, partitions, routingKeys, null);
@@ -109,6 +111,7 @@ void clientWithSubscriptionShouldReceiveNotificationOnDeletion() throws Exceptio
109111
}
110112

111113
@Test
114+
@TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0)
112115
void authorisation() throws Exception {
113116
String user = "stream";
114117
// routing keys do not matter for authorisation

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,19 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static io.vavr.Tuple.*;
1617
import static java.lang.String.format;
18+
import static java.util.Arrays.asList;
1719
import static java.util.concurrent.TimeUnit.SECONDS;
1820
import static java.util.stream.Collectors.toList;
1921
import static org.assertj.core.api.Assertions.assertThat;
2022
import static org.junit.jupiter.api.Assertions.fail;
2123

2224
import ch.qos.logback.classic.Level;
25+
import com.rabbitmq.client.BuiltinExchangeType;
26+
import com.rabbitmq.client.Channel;
27+
import com.rabbitmq.client.Connection;
28+
import com.rabbitmq.client.ConnectionFactory;
2329
import com.rabbitmq.stream.Address;
2430
import com.rabbitmq.stream.Constants;
2531
import com.rabbitmq.stream.Host;
@@ -32,6 +38,7 @@
3238
import com.rabbitmq.stream.impl.Client.StreamMetadata;
3339
import io.netty.channel.EventLoopGroup;
3440
import io.netty.channel.nio.NioEventLoopGroup;
41+
import io.vavr.Tuple2;
3542
import java.io.BufferedReader;
3643
import java.io.IOException;
3744
import java.io.InputStream;
@@ -46,11 +53,7 @@
4653
import java.nio.charset.StandardCharsets;
4754
import java.time.Duration;
4855
import java.util.*;
49-
import java.util.concurrent.ConcurrentHashMap;
50-
import java.util.concurrent.CountDownLatch;
51-
import java.util.concurrent.ExecutorService;
52-
import java.util.concurrent.Executors;
53-
import java.util.concurrent.TimeUnit;
56+
import java.util.concurrent.*;
5457
import java.util.concurrent.atomic.AtomicLong;
5558
import java.util.concurrent.atomic.AtomicReference;
5659
import java.util.function.*;
@@ -81,6 +84,8 @@ public final class TestUtils {
8184

8285
private static final Duration DEFAULT_CONDITION_TIMEOUT = Duration.ofSeconds(10);
8386

87+
private static final ConnectionFactory AMQP_CF = new ConnectionFactory();
88+
8489
private TestUtils() {}
8590

8691
public static Duration waitAtMost(CallableBooleanSupplier condition) throws Exception {
@@ -281,11 +286,59 @@ static void declareSuperStreamTopology(Client client, String superStream, int pa
281286
static void declareSuperStreamTopology(Client client, String superStream, String... rks) {
282287
List<String> partitions =
283288
Arrays.stream(rks).map(rk -> superStream + "-" + rk).collect(toList());
284-
client.createSuperStream(superStream, partitions, Arrays.asList(rks), null);
289+
if (atLeastVersion("3.13.0", client.brokerVersion())) {
290+
client.createSuperStream(superStream, partitions, asList(rks), null);
291+
} else {
292+
try (Connection connection = connection();
293+
Channel ch = connection.createChannel()) {
294+
ch.exchangeDeclare(
295+
superStream,
296+
BuiltinExchangeType.DIRECT,
297+
true,
298+
false,
299+
Collections.singletonMap("x-super-stream", true));
300+
List<Tuple2<String, Integer>> bindings = new ArrayList<>(rks.length);
301+
for (int i = 0; i < rks.length; i++) {
302+
bindings.add(of(rks[i], i));
303+
}
304+
// shuffle the order to make sure we get in the correct order from the server
305+
Collections.shuffle(bindings);
306+
307+
for (Tuple2<String, Integer> binding : bindings) {
308+
String routingKey = binding._1();
309+
String partitionName = superStream + "-" + routingKey;
310+
ch.queueDeclare(
311+
partitionName,
312+
true,
313+
false,
314+
false,
315+
Collections.singletonMap("x-queue-type", "stream"));
316+
ch.queueBind(
317+
partitionName,
318+
superStream,
319+
routingKey,
320+
Collections.singletonMap("x-stream-partition-order", binding._2()));
321+
}
322+
} catch (Exception e) {
323+
throw new RuntimeException(e);
324+
}
325+
}
285326
}
286327

287328
static void deleteSuperStreamTopology(Client client, String superStream) {
288-
client.deleteSuperStream(superStream);
329+
if (atLeastVersion("3.13.0", client.brokerVersion())) {
330+
client.deleteSuperStream(superStream);
331+
} else {
332+
try (Connection connection = connection();
333+
Channel ch = connection.createChannel()) {
334+
ch.exchangeDelete(superStream);
335+
for (String partition : client.partitions(superStream)) {
336+
ch.queueDelete(partition);
337+
}
338+
} catch (Exception e) {
339+
throw new RuntimeException(e);
340+
}
341+
}
289342
}
290343

291344
public static String streamName(TestInfo info) {
@@ -1034,4 +1087,8 @@ static void repeatIfFailure(RunnableWithException test) throws Exception {
10341087
throw (Exception) lastException;
10351088
}
10361089
}
1090+
1091+
private static Connection connection() throws IOException, TimeoutException {
1092+
return AMQP_CF.newConnection();
1093+
}
10371094
}

0 commit comments

Comments
 (0)