2222import static com .rabbitmq .stream .impl .Utils .noOpConsumer ;
2323import static java .lang .String .format ;
2424import static java .lang .String .join ;
25+ import static java .util .Arrays .asList ;
2526import static java .util .concurrent .TimeUnit .SECONDS ;
27+ import static java .util .stream .StreamSupport .stream ;
2628
2729import com .rabbitmq .stream .AuthenticationFailureException ;
2830import com .rabbitmq .stream .ByteCapacity ;
8385import java .net .ConnectException ;
8486import java .net .InetSocketAddress ;
8587import java .net .SocketAddress ;
88+ import java .nio .charset .Charset ;
8689import java .nio .charset .StandardCharsets ;
8790import java .time .Duration ;
88- import java .util .ArrayList ;
89- import java .util .Collections ;
90- import java .util .HashMap ;
91- import java .util .HashSet ;
92- import java .util .List ;
93- import java .util .Map ;
94- import java .util .Objects ;
95- import java .util .Set ;
91+ import java .util .*;
9692import java .util .concurrent .ConcurrentHashMap ;
9793import java .util .concurrent .ConcurrentMap ;
9894import java .util .concurrent .CopyOnWriteArrayList ;
127123 */
128124public class Client implements AutoCloseable {
129125
126+ private static final Charset CHARSET = StandardCharsets .UTF_8 ;
130127 public static final int DEFAULT_PORT = 5552 ;
131128 public static final int DEFAULT_TLS_PORT = 5551 ;
132129 static final OutboundEntityWriteCallback OUTBOUND_MESSAGE_WRITE_CALLBACK =
@@ -446,12 +443,7 @@ int maxFrameSize() {
446443 }
447444
448445 private Map <String , String > peerProperties () {
449- int clientPropertiesSize = 4 ; // size of the map, always there
450- if (!clientProperties .isEmpty ()) {
451- for (Map .Entry <String , String > entry : clientProperties .entrySet ()) {
452- clientPropertiesSize += 2 + entry .getKey ().length () + 2 + entry .getValue ().length ();
453- }
454- }
446+ int clientPropertiesSize = mapSize (this .clientProperties );
455447 int length = 2 + 2 + 4 + clientPropertiesSize ;
456448 int correlationId = correlationSequence .incrementAndGet ();
457449 try {
@@ -460,13 +452,7 @@ private Map<String, String> peerProperties() {
460452 bb .writeShort (encodeRequestCode (COMMAND_PEER_PROPERTIES ));
461453 bb .writeShort (VERSION_1 );
462454 bb .writeInt (correlationId );
463- bb .writeInt (clientProperties .size ());
464- for (Map .Entry <String , String > entry : clientProperties .entrySet ()) {
465- bb .writeShort (entry .getKey ().length ())
466- .writeBytes (entry .getKey ().getBytes (StandardCharsets .UTF_8 ))
467- .writeShort (entry .getValue ().length ())
468- .writeBytes (entry .getValue ().getBytes (StandardCharsets .UTF_8 ));
469- }
455+ writeMap (bb , this .clientProperties );
470456 OutstandingRequest <Map <String , String >> request = outstandingRequest ();
471457 outstandingRequests .put (correlationId , request );
472458 channel .writeAndFlush (bb );
@@ -540,7 +526,7 @@ private SaslAuthenticateResponse sendSaslAuthenticate(
540526 bb .writeShort (VERSION_1 );
541527 bb .writeInt (correlationId );
542528 bb .writeShort (saslMechanism .getName ().length ());
543- bb .writeBytes (saslMechanism .getName ().getBytes (StandardCharsets . UTF_8 ));
529+ bb .writeBytes (saslMechanism .getName ().getBytes (CHARSET ));
544530 if (challengeResponse == null ) {
545531 bb .writeInt (-1 );
546532 } else {
@@ -570,7 +556,7 @@ private Map<String, String> open(String virtualHost) {
570556 bb .writeShort (VERSION_1 );
571557 bb .writeInt (correlationId );
572558 bb .writeShort (virtualHost .length ());
573- bb .writeBytes (virtualHost .getBytes (StandardCharsets . UTF_8 ));
559+ bb .writeBytes (virtualHost .getBytes (CHARSET ));
574560 OutstandingRequest <OpenResponse > request = outstandingRequest ();
575561 outstandingRequests .put (correlationId , request );
576562 channel .writeAndFlush (bb );
@@ -612,7 +598,7 @@ private void sendClose(short code, String reason) {
612598 bb .writeInt (correlationId );
613599 bb .writeShort (code );
614600 bb .writeShort (reason .length ());
615- bb .writeBytes (reason .getBytes (StandardCharsets . UTF_8 ));
601+ bb .writeBytes (reason .getBytes (CHARSET ));
616602 OutstandingRequest <Response > request = outstandingRequest ();
617603 outstandingRequests .put (correlationId , request );
618604 channel .writeAndFlush (bb );
@@ -662,10 +648,7 @@ public Response create(String stream) {
662648 }
663649
664650 public Response create (String stream , Map <String , String > arguments ) {
665- int length = 2 + 2 + 4 + 2 + stream .length () + 4 ;
666- for (Map .Entry <String , String > argument : arguments .entrySet ()) {
667- length = length + 2 + argument .getKey ().length () + 2 + argument .getValue ().length ();
668- }
651+ int length = 2 + 2 + 4 + 2 + stream .length () + mapSize (arguments );
669652 int correlationId = correlationSequence .incrementAndGet ();
670653 try {
671654 ByteBuf bb = allocate (length + 4 );
@@ -674,14 +657,8 @@ public Response create(String stream, Map<String, String> arguments) {
674657 bb .writeShort (VERSION_1 );
675658 bb .writeInt (correlationId );
676659 bb .writeShort (stream .length ());
677- bb .writeBytes (stream .getBytes (StandardCharsets .UTF_8 ));
678- bb .writeInt (arguments .size ());
679- for (Map .Entry <String , String > argument : arguments .entrySet ()) {
680- bb .writeShort (argument .getKey ().length ());
681- bb .writeBytes (argument .getKey ().getBytes (StandardCharsets .UTF_8 ));
682- bb .writeShort (argument .getValue ().length ());
683- bb .writeBytes (argument .getValue ().getBytes (StandardCharsets .UTF_8 ));
684- }
660+ bb .writeBytes (stream .getBytes (CHARSET ));
661+ writeMap (bb , arguments );
685662 OutstandingRequest <Response > request = outstandingRequest ();
686663 outstandingRequests .put (correlationId , request );
687664 channel .writeAndFlush (bb );
@@ -696,6 +673,116 @@ public Response create(String stream, Map<String, String> arguments) {
696673 }
697674 }
698675
676+ Response createSuperStream (
677+ String superStream ,
678+ List <String > partitions ,
679+ List <String > routingKeys ,
680+ Map <String , String > arguments ) {
681+ if (partitions .isEmpty () || routingKeys .isEmpty ()) {
682+ throw new IllegalArgumentException (
683+ "Partitions and routing keys of a super stream cannot be empty" );
684+ }
685+ if (partitions .size () != routingKeys .size ()) {
686+ throw new IllegalArgumentException (
687+ "Partitions and routing keys of a super stream must have "
688+ + "the same number of elements" );
689+ }
690+ int length =
691+ 2
692+ + 2
693+ + 4
694+ + 2
695+ + superStream .length ()
696+ + collectionSize (partitions )
697+ + collectionSize (routingKeys )
698+ + mapSize (arguments );
699+ int correlationId = correlationSequence .incrementAndGet ();
700+ try {
701+ ByteBuf bb = allocate (length + 4 );
702+ bb .writeInt (length );
703+ bb .writeShort (encodeRequestCode (COMMAND_CREATE_SUPER_STREAM ));
704+ bb .writeShort (VERSION_1 );
705+ bb .writeInt (correlationId );
706+ bb .writeShort (superStream .length ());
707+ bb .writeBytes (superStream .getBytes (CHARSET ));
708+ writeCollection (bb , partitions );
709+ writeCollection (bb , routingKeys );
710+ writeMap (bb , arguments );
711+ OutstandingRequest <Response > request = outstandingRequest ();
712+ outstandingRequests .put (correlationId , request );
713+ channel .writeAndFlush (bb );
714+ request .block ();
715+ return request .response .get ();
716+ } catch (StreamException e ) {
717+ outstandingRequests .remove (correlationId );
718+ throw e ;
719+ } catch (RuntimeException e ) {
720+ outstandingRequests .remove (correlationId );
721+ throw new StreamException (format ("Error while creating super stream '%s'" , superStream ), e );
722+ }
723+ }
724+
725+ Response deleteSuperStream (String superStream ) {
726+ int length = 2 + 2 + 4 + 2 + superStream .length ();
727+ int correlationId = correlationSequence .incrementAndGet ();
728+ try {
729+ ByteBuf bb = allocate (length + 4 );
730+ bb .writeInt (length );
731+ bb .writeShort (encodeRequestCode (COMMAND_DELETE_SUPER_STREAM ));
732+ bb .writeShort (VERSION_1 );
733+ bb .writeInt (correlationId );
734+ bb .writeShort (superStream .length ());
735+ bb .writeBytes (superStream .getBytes (CHARSET ));
736+ OutstandingRequest <Response > request = outstandingRequest ();
737+ outstandingRequests .put (correlationId , request );
738+ channel .writeAndFlush (bb );
739+ request .block ();
740+ return request .response .get ();
741+ } catch (StreamException e ) {
742+ outstandingRequests .remove (correlationId );
743+ throw e ;
744+ } catch (RuntimeException e ) {
745+ outstandingRequests .remove (correlationId );
746+ throw new StreamException (format ("Error while deleting stream '%s'" , superStream ), e );
747+ }
748+ }
749+
750+ private static int collectionSize (Collection <String > elements ) {
751+ return 4 + elements .stream ().mapToInt (v -> 2 + v .length ()).sum ();
752+ }
753+
754+ private static int arraySize (String ... elements ) {
755+ return 4 + collectionSize (asList (elements ));
756+ }
757+
758+ private static int mapSize (Map <String , String > elements ) {
759+ return 4
760+ + elements .entrySet ().stream ()
761+ .mapToInt (e -> 2 + e .getKey ().length () + 2 + e .getValue ().length ())
762+ .sum ();
763+ }
764+
765+ private static ByteBuf writeCollection (ByteBuf bb , Collection <String > elements ) {
766+ bb .writeInt (elements .size ());
767+ elements .forEach (e -> bb .writeShort (e .length ()).writeBytes (e .getBytes (CHARSET )));
768+ return bb ;
769+ }
770+
771+ private static ByteBuf writeArray (ByteBuf bb , String ... elements ) {
772+ return writeCollection (bb , asList (elements ));
773+ }
774+
775+ private static ByteBuf writeMap (ByteBuf bb , Map <String , String > elements ) {
776+ bb .writeInt (elements .size ());
777+ elements .forEach (
778+ (key , value ) ->
779+ bb .writeShort (key .length ())
780+ .writeBytes (key .getBytes (CHARSET ))
781+ .writeShort (value .length ())
782+ .writeBytes (value .getBytes (CHARSET )));
783+ return bb ;
784+ }
785+
699786 ByteBuf allocate (ByteBufAllocator allocator , int capacity ) {
700787 if (frameSizeCopped && capacity > this .maxFrameSize ()) {
701788 throw new IllegalArgumentException (
@@ -729,7 +816,7 @@ public Response delete(String stream) {
729816 bb .writeShort (VERSION_1 );
730817 bb .writeInt (correlationId );
731818 bb .writeShort (stream .length ());
732- bb .writeBytes (stream .getBytes (StandardCharsets . UTF_8 ));
819+ bb .writeBytes (stream .getBytes (CHARSET ));
733820 OutstandingRequest <Response > request = outstandingRequest ();
734821 outstandingRequests .put (correlationId , request );
735822 channel .writeAndFlush (bb );
@@ -748,23 +835,15 @@ public Map<String, StreamMetadata> metadata(String... streams) {
748835 if (streams == null || streams .length == 0 ) {
749836 throw new IllegalArgumentException ("At least one stream must be specified" );
750837 }
751- int length = 2 + 2 + 4 + 4 ; // API code, version, correlation ID, size of array
752- for (String stream : streams ) {
753- length += 2 ;
754- length += stream .length ();
755- }
838+ int length = 2 + 2 + 4 + arraySize (streams ); // API code, version, correlation ID, array size
756839 int correlationId = correlationSequence .incrementAndGet ();
757840 try {
758841 ByteBuf bb = allocate (length + 4 );
759842 bb .writeInt (length );
760843 bb .writeShort (encodeRequestCode (COMMAND_METADATA ));
761844 bb .writeShort (VERSION_1 );
762845 bb .writeInt (correlationId );
763- bb .writeInt (streams .length );
764- for (String stream : streams ) {
765- bb .writeShort (stream .length ());
766- bb .writeBytes (stream .getBytes (StandardCharsets .UTF_8 ));
767- }
846+ writeArray (bb , streams );
768847 OutstandingRequest <Map <String , StreamMetadata >> request = outstandingRequest ();
769848 outstandingRequests .put (correlationId , request );
770849 channel .writeAndFlush (bb );
@@ -800,10 +879,10 @@ public Response declarePublisher(byte publisherId, String publisherReference, St
800879 bb .writeByte (publisherId );
801880 bb .writeShort (publisherReferenceSize );
802881 if (publisherReferenceSize > 0 ) {
803- bb .writeBytes (publisherReference .getBytes (StandardCharsets . UTF_8 ));
882+ bb .writeBytes (publisherReference .getBytes (CHARSET ));
804883 }
805884 bb .writeShort (stream .length ());
806- bb .writeBytes (stream .getBytes (StandardCharsets . UTF_8 ));
885+ bb .writeBytes (stream .getBytes (CHARSET ));
807886 OutstandingRequest <Response > request = outstandingRequest ();
808887 outstandingRequests .put (correlationId , request );
809888 channel .writeAndFlush (bb );
@@ -1142,10 +1221,7 @@ public Response subscribe(
11421221 }
11431222 int propertiesSize = 0 ;
11441223 if (properties != null && !properties .isEmpty ()) {
1145- propertiesSize = 4 ; // size of the map
1146- for (Map .Entry <String , String > entry : properties .entrySet ()) {
1147- propertiesSize += 2 + entry .getKey ().length () + 2 + entry .getValue ().length ();
1148- }
1224+ propertiesSize = mapSize (properties );
11491225 }
11501226 length += propertiesSize ;
11511227 int correlationId = correlationSequence .getAndIncrement ();
@@ -1157,20 +1233,14 @@ public Response subscribe(
11571233 bb .writeInt (correlationId );
11581234 bb .writeByte (subscriptionId );
11591235 bb .writeShort (stream .length ());
1160- bb .writeBytes (stream .getBytes (StandardCharsets . UTF_8 ));
1236+ bb .writeBytes (stream .getBytes (CHARSET ));
11611237 bb .writeShort (offsetSpecification .getType ());
11621238 if (offsetSpecification .isOffset () || offsetSpecification .isTimestamp ()) {
11631239 bb .writeLong (offsetSpecification .getOffset ());
11641240 }
11651241 bb .writeShort (initialCredits );
11661242 if (properties != null && !properties .isEmpty ()) {
1167- bb .writeInt (properties .size ());
1168- for (Map .Entry <String , String > entry : properties .entrySet ()) {
1169- bb .writeShort (entry .getKey ().length ())
1170- .writeBytes (entry .getKey ().getBytes (StandardCharsets .UTF_8 ))
1171- .writeShort (entry .getValue ().length ())
1172- .writeBytes (entry .getValue ().getBytes (StandardCharsets .UTF_8 ));
1173- }
1243+ writeMap (bb , properties );
11741244 }
11751245 OutstandingRequest <Response > request = outstandingRequest ();
11761246 outstandingRequests .put (correlationId , request );
@@ -1205,9 +1275,9 @@ public void storeOffset(String reference, String stream, long offset) {
12051275 bb .writeShort (encodeRequestCode (COMMAND_STORE_OFFSET ));
12061276 bb .writeShort (VERSION_1 );
12071277 bb .writeShort (reference .length ());
1208- bb .writeBytes (reference .getBytes (StandardCharsets . UTF_8 ));
1278+ bb .writeBytes (reference .getBytes (CHARSET ));
12091279 bb .writeShort (stream .length ());
1210- bb .writeBytes (stream .getBytes (StandardCharsets . UTF_8 ));
1280+ bb .writeBytes (stream .getBytes (CHARSET ));
12111281 bb .writeLong (offset );
12121282 channel .writeAndFlush (bb );
12131283 }
@@ -1230,9 +1300,9 @@ public QueryOffsetResponse queryOffset(String reference, String stream) {
12301300 bb .writeShort (VERSION_1 );
12311301 bb .writeInt (correlationId );
12321302 bb .writeShort (reference .length ());
1233- bb .writeBytes (reference .getBytes (StandardCharsets . UTF_8 ));
1303+ bb .writeBytes (reference .getBytes (CHARSET ));
12341304 bb .writeShort (stream .length ());
1235- bb .writeBytes (stream .getBytes (StandardCharsets . UTF_8 ));
1305+ bb .writeBytes (stream .getBytes (CHARSET ));
12361306 OutstandingRequest <QueryOffsetResponse > request = outstandingRequest ();
12371307 outstandingRequests .put (correlationId , request );
12381308 channel .writeAndFlush (bb );
@@ -1271,9 +1341,9 @@ public long queryPublisherSequence(String publisherReference, String stream) {
12711341 bb .writeShort (VERSION_1 );
12721342 bb .writeInt (correlationId );
12731343 bb .writeShort (publisherReference .length ());
1274- bb .writeBytes (publisherReference .getBytes (StandardCharsets . UTF_8 ));
1344+ bb .writeBytes (publisherReference .getBytes (CHARSET ));
12751345 bb .writeShort (stream .length ());
1276- bb .writeBytes (stream .getBytes (StandardCharsets . UTF_8 ));
1346+ bb .writeBytes (stream .getBytes (CHARSET ));
12771347 OutstandingRequest <QueryPublisherSequenceResponse > request = outstandingRequest ();
12781348 outstandingRequests .put (correlationId , request );
12791349 channel .writeAndFlush (bb );
@@ -1436,9 +1506,9 @@ public List<String> route(String routingKey, String superStream) {
14361506 bb .writeShort (VERSION_1 );
14371507 bb .writeInt (correlationId );
14381508 bb .writeShort (routingKey .length ());
1439- bb .writeBytes (routingKey .getBytes (StandardCharsets . UTF_8 ));
1509+ bb .writeBytes (routingKey .getBytes (CHARSET ));
14401510 bb .writeShort (superStream .length ());
1441- bb .writeBytes (superStream .getBytes (StandardCharsets . UTF_8 ));
1511+ bb .writeBytes (superStream .getBytes (CHARSET ));
14421512 OutstandingRequest <List <String >> request = outstandingRequest ();
14431513 outstandingRequests .put (correlationId , request );
14441514 channel .writeAndFlush (bb );
@@ -1471,7 +1541,7 @@ public List<String> partitions(String superStream) {
14711541 bb .writeShort (VERSION_1 );
14721542 bb .writeInt (correlationId );
14731543 bb .writeShort (superStream .length ());
1474- bb .writeBytes (superStream .getBytes (StandardCharsets . UTF_8 ));
1544+ bb .writeBytes (superStream .getBytes (CHARSET ));
14751545 OutstandingRequest <List <String >> request = outstandingRequest ();
14761546 outstandingRequests .put (correlationId , request );
14771547 channel .writeAndFlush (bb );
@@ -1532,7 +1602,7 @@ StreamStatsResponse streamStats(String stream) {
15321602 bb .writeShort (VERSION_1 );
15331603 bb .writeInt (correlationId );
15341604 bb .writeShort (stream .length ());
1535- bb .writeBytes (stream .getBytes (StandardCharsets . UTF_8 ));
1605+ bb .writeBytes (stream .getBytes (CHARSET ));
15361606 OutstandingRequest <StreamStatsResponse > request = outstandingRequest ();
15371607 outstandingRequests .put (correlationId , request );
15381608 channel .writeAndFlush (bb );
0 commit comments