@@ -189,8 +189,9 @@ public long applyAsLong(Object value) {
189189 private final Duration rpcTimeout ;
190190 private final List <String > saslMechanisms ;
191191 private volatile ShutdownReason shutdownReason = null ;
192- private final Runnable exchangeCommandVersionsCheck ;
192+ private final Runnable streamStatsCommandVersionsCheck ;
193193 private final boolean filteringSupported ;
194+ private final Runnable superStreamManagementCommandVersionsCheck ;
194195
195196 public Client () {
196197 this (new ClientParameters ());
@@ -376,25 +377,36 @@ public void initChannel(SocketChannel ch) {
376377 tuneState .getHeartbeat ());
377378 this .connectionProperties = open (parameters .virtualHost );
378379 Set <FrameHandlerInfo > supportedCommands = maybeExchangeCommandVersions ();
379- AtomicReference < Runnable > exchangeCommandVersionsCheckReference = new AtomicReference <>( );
380+ AtomicBoolean streamStatsSupported = new AtomicBoolean ( false );
380381 AtomicBoolean filteringSupportedReference = new AtomicBoolean (false );
382+ AtomicBoolean superStreamManagementSupported = new AtomicBoolean (false );
381383 supportedCommands .forEach (
382384 c -> {
383385 if (c .getKey () == COMMAND_STREAM_STATS ) {
384- exchangeCommandVersionsCheckReference .set (() -> {} );
386+ streamStatsSupported .set (true );
385387 }
386388 if (c .getKey () == COMMAND_PUBLISH && c .getMaxVersion () >= VERSION_2 ) {
387389 filteringSupportedReference .set (true );
388390 }
391+ if (c .getKey () == COMMAND_CREATE_SUPER_STREAM ) {
392+ superStreamManagementSupported .set (true );
393+ }
389394 });
390- this .exchangeCommandVersionsCheck =
391- exchangeCommandVersionsCheckReference .get () == null
392- ? () -> {
395+ this .streamStatsCommandVersionsCheck =
396+ streamStatsSupported .get ()
397+ ? () -> {}
398+ : () -> {
393399 throw new UnsupportedOperationException (
394400 "QueryStreamInfo is available only on RabbitMQ 3.11 or more." );
395- }
396- : exchangeCommandVersionsCheckReference .get ();
401+ };
397402 this .filteringSupported = filteringSupportedReference .get ();
403+ this .superStreamManagementCommandVersionsCheck =
404+ superStreamManagementSupported .get ()
405+ ? () -> {}
406+ : () -> {
407+ throw new UnsupportedOperationException (
408+ "Super stream management is available only on RabbitMQ 3.13 or more." );
409+ };
398410 started .set (true );
399411 this .metricsCollector .openConnection ();
400412 } catch (RuntimeException e ) {
@@ -678,6 +690,7 @@ Response createSuperStream(
678690 List <String > partitions ,
679691 List <String > routingKeys ,
680692 Map <String , String > arguments ) {
693+ this .superStreamManagementCommandVersionsCheck .run ();
681694 if (partitions .isEmpty () || routingKeys .isEmpty ()) {
682695 throw new IllegalArgumentException (
683696 "Partitions and routing keys of a super stream cannot be empty" );
@@ -724,6 +737,7 @@ Response createSuperStream(
724737 }
725738
726739 Response deleteSuperStream (String superStream ) {
740+ this .superStreamManagementCommandVersionsCheck .run ();
727741 int length = 2 + 2 + 4 + 2 + superStream .length ();
728742 int correlationId = correlationSequence .incrementAndGet ();
729743 try {
@@ -1594,7 +1608,7 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
15941608 }
15951609
15961610 StreamStatsResponse streamStats (String stream ) {
1597- this .exchangeCommandVersionsCheck .run ();
1611+ this .streamStatsCommandVersionsCheck .run ();
15981612 if (stream == null ) {
15991613 throw new IllegalArgumentException ("stream must not be null" );
16001614 }
0 commit comments