Skip to content

Commit fa1d133

Browse files
committed
Support super streams in StreamCreator
1 parent 9308d71 commit fa1d133

File tree

15 files changed

+317
-37
lines changed

15 files changed

+317
-37
lines changed

src/docs/asciidoc/super-streams.adoc

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,51 @@ When a super stream is in use, the stream Java client queries this information t
5858
From the application code point of view, using a super stream is mostly configuration-based.
5959
Some logic must also be provided to extract routing information from messages.
6060

61-
==== Super Stream Creation
61+
==== Super Stream Creation and Deletion
6262

63-
It is possible to create the topology of a super stream with any AMQP 0.9.1 library or with the https://www.rabbitmq.com/management.html[management plugin], but the `rabbitmq-streams add_super_stream` command is a handy shortcut.
64-
Here is how to create an invoices super stream with 3 partitions:
63+
It is possible to manage super streams with
64+
65+
* the stream Java client, by using `Environment#streamCreator()` and `Environment#deleteSuperStream(String)`
66+
* the `add_super_stream` and `delete_super_stream` commands in `rabbitmq-streams` (CLI)
67+
* any AMQP 0.9.1 client library
68+
* the https://www.rabbitmq.com/management.html[management plugin]
69+
70+
The stream Java client and the dedicated CLI commands are easier to use as they take care of the topology details (exchange, streams, and bindings).
71+
72+
===== With the Client Library
73+
74+
Here is how to create an `invoices` super stream with 5 partitions:
75+
76+
.Creating a super stream by specifying the number of partitions
77+
[source,java,indent=0]
78+
--------
79+
include::{test-examples}/SuperStreamUsage.java[tag=creation-partitions]
80+
--------
81+
82+
The super stream partitions will be `invoices-0`, `invoices-1`, ..., `invoices-5`.
83+
We use this kind of topology when routing keys of outbound messages are hashed to pick the partition to publish them to.
84+
This way, if the routing key is the customer ID of the invoice, all the invoices for a given customer end up in the same partition, and they can be processed in the publishing order.
85+
86+
It is also possible to specify routing keys when creating a super stream:
87+
88+
.Creating a super stream by specifying the routing keys
89+
[source,java,indent=0]
90+
--------
91+
include::{test-examples}/SuperStreamUsage.java[tag=creation-routing-keys]
92+
--------
93+
94+
The super stream partitions will be `invoices-amer`, `invoices-emea` and `invoices-apac` in this case.
95+
96+
Using one type of topology or the other depends on the use cases, especially how messages are processed.
97+
See the next sections on publishing and consuming to find out more.
98+
99+
===== With the CLI
100+
101+
Here is how to create an `invoices` super stream with 5 partitions:
65102

66103
.Creating a super stream from the CLI
67104
----
68-
rabbitmq-streams add_super_stream invoices --partitions 3
105+
rabbitmq-streams add_super_stream invoices --partitions 5
69106
----
70107

71108
Use `rabbitmq-streams add_super_stream --help` to learn more about the command.

src/main/java/com/rabbitmq/stream/Constants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").

src/main/java/com/rabbitmq/stream/Environment.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -58,9 +58,19 @@ static EnvironmentBuilder builder() {
5858
* Delete a stream
5959
*
6060
* @param stream the stream to delete
61+
* @since 0.15.0
6162
*/
6263
void deleteStream(String stream);
6364

65+
/**
66+
* Delete a super stream.
67+
*
68+
* <p>Requires RabbitMQ 3.13.0 or more.
69+
*
70+
* @param superStream the super stream to delete
71+
*/
72+
void deleteSuperStream(String superStream);
73+
6474
/**
6575
* Query statistics on a stream.
6676
*

src/main/java/com/rabbitmq/stream/StreamCreator.java

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -23,13 +23,24 @@ public interface StreamCreator {
2323
ByteCapacity MAX_SEGMENT_SIZE = ByteCapacity.from("3GB");
2424

2525
/**
26-
* The name of the stream
26+
* The name of the stream.
27+
*
28+
* <p>Alias for {@link #name(String)}.
2729
*
2830
* @param stream
2931
* @return this creator instance
3032
*/
3133
StreamCreator stream(String stream);
3234

35+
/**
36+
* The name of the (super) stream.
37+
*
38+
* @param name
39+
* @return this creator instance
40+
* @since 0.15.0
41+
*/
42+
StreamCreator name(String name);
43+
3344
/**
3445
* The maximum size of the stream before it gets truncated.
3546
*
@@ -80,6 +91,16 @@ public interface StreamCreator {
8091
*/
8192
StreamCreator filterSize(int size);
8293

94+
/**
95+
* Configure the super stream to create.
96+
*
97+
* <p>Requires RabbitMQ 3.13.0 or more.
98+
*
99+
* @return the super stream configuration
100+
* @since 0.15.0
101+
*/
102+
SuperStreamConfiguration superStream();
103+
83104
/**
84105
* Create the stream.
85106
*
@@ -142,4 +163,39 @@ public String value() {
142163
return this.value;
143164
}
144165
}
166+
167+
/**
168+
* Super stream configuration.
169+
*
170+
* @since 0.15.0
171+
*/
172+
interface SuperStreamConfiguration {
173+
174+
/**
175+
* The number of partitions of the super stream.
176+
*
177+
* <p>Mutually exclusive with {@link #routingKeys(String...)}. Default is 3.
178+
*
179+
* @param partitions
180+
* @return this super stream configuration instance
181+
*/
182+
SuperStreamConfiguration partitions(int partitions);
183+
184+
/**
185+
* The routing keys to use when declaring the super stream partitions.
186+
*
187+
* <p>Mutually exclusive with {@link #partitions(int)}. Default is null.
188+
*
189+
* @param routingKeys
190+
* @return this super stream configuration instance
191+
*/
192+
SuperStreamConfiguration routingKeys(String... routingKeys);
193+
194+
/**
195+
* Go back to the creator.
196+
*
197+
* @return the stream creator
198+
*/
199+
StreamCreator creator();
200+
}
145201
}

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,22 @@ public void deleteStream(String stream) {
460460
}
461461
}
462462

463+
@Override
464+
public void deleteSuperStream(String superStream) {
465+
checkNotClosed();
466+
this.maybeInitializeLocator();
467+
Client.Response response = this.locator().deleteSuperStream(superStream);
468+
if (!response.isOk()) {
469+
throw new StreamException(
470+
"Error while deleting super stream "
471+
+ superStream
472+
+ " ("
473+
+ formatConstant(response.getResponseCode())
474+
+ ")",
475+
response.getResponseCode());
476+
}
477+
}
478+
463479
@Override
464480
public StreamStats queryStreamStats(String stream) {
465481
checkNotClosed();

src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java

Lines changed: 102 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -15,27 +15,39 @@
1515

1616
import static com.rabbitmq.stream.impl.Utils.formatConstant;
1717
import static com.rabbitmq.stream.impl.Utils.namedFunction;
18+
import static java.util.stream.Collectors.toList;
1819

1920
import com.rabbitmq.stream.ByteCapacity;
2021
import com.rabbitmq.stream.Constants;
2122
import com.rabbitmq.stream.StreamCreator;
2223
import com.rabbitmq.stream.StreamException;
2324
import java.time.Duration;
25+
import java.util.Arrays;
26+
import java.util.List;
27+
import java.util.function.Function;
28+
import java.util.stream.IntStream;
2429

2530
class StreamStreamCreator implements StreamCreator {
2631

2732
private final StreamEnvironment environment;
2833
private final Client.StreamParametersBuilder streamParametersBuilder =
2934
new Client.StreamParametersBuilder().leaderLocator(LeaderLocator.LEAST_LEADERS);
30-
private String stream;
35+
private String name;
36+
private DefaultSuperStreamConfiguration superStreamConfiguration;
3137

3238
StreamStreamCreator(StreamEnvironment environment) {
3339
this.environment = environment;
3440
}
3541

3642
@Override
3743
public StreamCreator stream(String stream) {
38-
this.stream = stream;
44+
this.name = stream;
45+
return this;
46+
}
47+
48+
@Override
49+
public StreamCreator name(String name) {
50+
this.name = name;
3951
return this;
4052
}
4153

@@ -73,27 +85,104 @@ public StreamCreator filterSize(int size) {
7385
return this;
7486
}
7587

88+
@Override
89+
public SuperStreamConfiguration superStream() {
90+
if (this.superStreamConfiguration == null) {
91+
this.superStreamConfiguration = new DefaultSuperStreamConfiguration(this);
92+
}
93+
return this.superStreamConfiguration;
94+
}
95+
7696
@Override
7797
public void create() {
78-
if (stream == null) {
79-
throw new IllegalArgumentException("Stream cannot be null");
98+
if (name == null) {
99+
throw new IllegalArgumentException("Stream name cannot be null");
100+
}
101+
Function<Client, Client.Response> function;
102+
boolean superStream = this.superStreamConfiguration != null;
103+
if (superStream) {
104+
List<String> partitions, routingKeys;
105+
if (this.superStreamConfiguration.routingKeys == null) {
106+
partitions =
107+
IntStream.range(0, this.superStreamConfiguration.partitions)
108+
.mapToObj(i -> this.name + "-" + i)
109+
.collect(toList());
110+
routingKeys =
111+
IntStream.range(0, this.superStreamConfiguration.partitions)
112+
.mapToObj(String::valueOf)
113+
.collect(toList());
114+
} else {
115+
partitions =
116+
this.superStreamConfiguration.routingKeys.stream()
117+
.map(rk -> this.name + "-" + rk)
118+
.collect(toList());
119+
routingKeys = this.superStreamConfiguration.routingKeys;
120+
}
121+
function =
122+
namedFunction(
123+
c ->
124+
c.createSuperStream(
125+
this.name, partitions, routingKeys, streamParametersBuilder.build()),
126+
"Creation of super stream '%s'",
127+
this.name);
128+
} else {
129+
function =
130+
namedFunction(
131+
c -> c.create(name, streamParametersBuilder.build()),
132+
"Creation of stream '%s'",
133+
this.name);
80134
}
81135
this.environment.maybeInitializeLocator();
82-
Client.Response response =
83-
environment.locatorOperation(
84-
namedFunction(
85-
c -> c.create(stream, streamParametersBuilder.build()),
86-
"Creation of stream '%s'",
87-
this.stream));
136+
Client.Response response = environment.locatorOperation(function);
88137
if (!response.isOk()
89138
&& response.getResponseCode() != Constants.RESPONSE_CODE_STREAM_ALREADY_EXISTS) {
139+
String label = superStream ? "super stream" : "stream";
90140
throw new StreamException(
91-
"Error while creating stream '"
92-
+ stream
141+
"Error while creating "
142+
+ label
143+
+ " '"
144+
+ name
93145
+ "' ("
94146
+ formatConstant(response.getResponseCode())
95147
+ ")",
96148
response.getResponseCode());
97149
}
98150
}
151+
152+
private static class DefaultSuperStreamConfiguration implements SuperStreamConfiguration {
153+
154+
private final StreamCreator creator;
155+
156+
private int partitions = 3;
157+
private List<String> routingKeys = null;
158+
159+
private DefaultSuperStreamConfiguration(StreamCreator creator) {
160+
this.creator = creator;
161+
}
162+
163+
@Override
164+
public SuperStreamConfiguration partitions(int partitions) {
165+
if (partitions <= 0) {
166+
throw new IllegalArgumentException("The number of partitions must be greater than 0");
167+
}
168+
this.partitions = partitions;
169+
this.routingKeys = null;
170+
return this;
171+
}
172+
173+
@Override
174+
public SuperStreamConfiguration routingKeys(String... routingKeys) {
175+
if (routingKeys == null || routingKeys.length == 0) {
176+
throw new IllegalArgumentException("There must be at least 1 routing key");
177+
}
178+
this.routingKeys = Arrays.asList(routingKeys);
179+
this.partitions = -1;
180+
return this;
181+
}
182+
183+
@Override
184+
public StreamCreator creator() {
185+
return this.creator;
186+
}
187+
}
99188
}

src/test/java/com/rabbitmq/stream/Host.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").

src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,22 @@
2626

2727
public class SuperStreamUsage {
2828

29+
void creation() {
30+
Environment environment = Environment.builder().build();
31+
// tag::creation-partitions[]
32+
environment.streamCreator().name("invoices")
33+
.superStream()
34+
.partitions(5).creator()
35+
.create();
36+
// end::creation-partitions[]
37+
// tag::creation-routing-keys[]
38+
environment.streamCreator().name("invoices")
39+
.superStream()
40+
.routingKeys("amer", "emea", "apac").creator()
41+
.create();
42+
// end::creation-routing-keys[]
43+
}
44+
2945
void producerSimple() {
3046
Environment environment = Environment.builder().build();
3147
// tag::producer-simple[]

0 commit comments

Comments
 (0)