6969import org .apache .pulsar .common .naming .NamespaceName ;
7070import org .apache .pulsar .common .naming .TopicVersion ;
7171import org .apache .pulsar .common .policies .data .BrokerInfo ;
72+ import org .apache .pulsar .common .policies .data .BrokerOperation ;
7273import org .apache .pulsar .common .policies .data .NamespaceOwnershipStatus ;
7374import org .apache .pulsar .common .util .FutureUtil ;
7475import org .apache .pulsar .common .util .ThreadDumpUtil ;
@@ -107,7 +108,8 @@ public class BrokersBase extends AdminResource {
107108 @ ApiResponse (code = 404 , message = "Cluster does not exist: cluster={clustername}" ) })
108109 public void getActiveBrokers (@ Suspended final AsyncResponse asyncResponse ,
109110 @ PathParam ("cluster" ) String cluster ) {
110- validateSuperUserAccessAsync ()
111+ validateBothSuperuserAndBrokerOperation (cluster == null ? pulsar ().getConfiguration ().getClusterName ()
112+ : cluster , pulsar ().getBrokerId (), BrokerOperation .LIST_BROKERS )
111113 .thenCompose (__ -> validateClusterOwnershipAsync (cluster ))
112114 .thenCompose (__ -> pulsar ().getLoadManager ().get ().getAvailableBrokersAsync ())
113115 .thenAccept (activeBrokers -> {
@@ -148,7 +150,9 @@ public void getActiveBrokers(@Suspended final AsyncResponse asyncResponse) throw
148150 @ ApiResponse (code = 403 , message = "This operation requires super-user access" ),
149151 @ ApiResponse (code = 404 , message = "Leader broker not found" ) })
150152 public void getLeaderBroker (@ Suspended final AsyncResponse asyncResponse ) {
151- validateSuperUserAccessAsync ().thenAccept (__ -> {
153+ validateBothSuperuserAndBrokerOperation (pulsar ().getConfig ().getClusterName (),
154+ pulsar ().getBrokerId (), BrokerOperation .GET_LEADER_BROKER )
155+ .thenAccept (__ -> {
152156 LeaderBroker leaderBroker = pulsar ().getLeaderElectionService ().getCurrentLeader ()
153157 .orElseThrow (() -> new RestException (Status .NOT_FOUND , "Couldn't find leader broker" ));
154158 BrokerInfo brokerInfo = BrokerInfo .builder ()
@@ -175,7 +179,8 @@ public void getLeaderBroker(@Suspended final AsyncResponse asyncResponse) {
175179 public void getOwnedNamespaces (@ Suspended final AsyncResponse asyncResponse ,
176180 @ PathParam ("clusterName" ) String cluster ,
177181 @ PathParam ("brokerId" ) String brokerId ) {
178- validateSuperUserAccessAsync ()
182+ validateBothSuperuserAndBrokerOperation (pulsar ().getConfig ().getClusterName (),
183+ pulsar ().getBrokerId (), BrokerOperation .LIST_OWNED_NAMESPACES )
179184 .thenCompose (__ -> maybeRedirectToBroker (brokerId ))
180185 .thenCompose (__ -> validateClusterOwnershipAsync (cluster ))
181186 .thenCompose (__ -> pulsar ().getNamespaceService ().getOwnedNameSpacesStatusAsync ())
@@ -204,7 +209,8 @@ public void getOwnedNamespaces(@Suspended final AsyncResponse asyncResponse,
204209 public void updateDynamicConfiguration (@ Suspended AsyncResponse asyncResponse ,
205210 @ PathParam ("configName" ) String configName ,
206211 @ PathParam ("configValue" ) String configValue ) {
207- validateSuperUserAccessAsync ()
212+ validateBothSuperuserAndBrokerOperation (pulsar ().getConfig ().getClusterName (), pulsar ().getBrokerId (),
213+ BrokerOperation .UPDATE_DYNAMIC_CONFIGURATION )
208214 .thenCompose (__ -> persistDynamicConfigurationAsync (configName , configValue ))
209215 .thenAccept (__ -> {
210216 LOG .info ("[{}] Updated Service configuration {}/{}" , clientAppId (), configName , configValue );
@@ -228,7 +234,8 @@ public void updateDynamicConfiguration(@Suspended AsyncResponse asyncResponse,
228234 public void deleteDynamicConfiguration (
229235 @ Suspended AsyncResponse asyncResponse ,
230236 @ PathParam ("configName" ) String configName ) {
231- validateSuperUserAccessAsync ()
237+ validateBothSuperuserAndBrokerOperation (pulsar ().getConfig ().getClusterName (), pulsar ().getBrokerId (),
238+ BrokerOperation .DELETE_DYNAMIC_CONFIGURATION )
232239 .thenCompose (__ -> internalDeleteDynamicConfigurationOnMetadataAsync (configName ))
233240 .thenAccept (__ -> {
234241 LOG .info ("[{}] Successfully to delete dynamic configuration {}" , clientAppId (), configName );
@@ -249,7 +256,8 @@ public void deleteDynamicConfiguration(
249256 @ ApiResponse (code = 404 , message = "Configuration not found" ),
250257 @ ApiResponse (code = 500 , message = "Internal server error" )})
251258 public void getAllDynamicConfigurations (@ Suspended AsyncResponse asyncResponse ) {
252- validateSuperUserAccessAsync ()
259+ validateBothSuperuserAndBrokerOperation (pulsar ().getConfig ().getClusterName (), pulsar ().getBrokerId (),
260+ BrokerOperation .LIST_DYNAMIC_CONFIGURATIONS )
253261 .thenCompose (__ -> dynamicConfigurationResources ().getDynamicConfigurationAsync ())
254262 .thenAccept (configOpt -> asyncResponse .resume (configOpt .orElseGet (Collections ::emptyMap )))
255263 .exceptionally (ex -> {
@@ -266,7 +274,8 @@ public void getAllDynamicConfigurations(@Suspended AsyncResponse asyncResponse)
266274 @ ApiResponses (value = {
267275 @ ApiResponse (code = 403 , message = "You don't have admin permission to get configuration" )})
268276 public void getDynamicConfigurationName (@ Suspended AsyncResponse asyncResponse ) {
269- validateSuperUserAccessAsync ()
277+ validateBothSuperuserAndBrokerOperation (pulsar ().getConfig ().getClusterName (), pulsar ().getBrokerId (),
278+ BrokerOperation .LIST_DYNAMIC_CONFIGURATIONS )
270279 .thenAccept (__ -> asyncResponse .resume (pulsar ().getBrokerService ().getDynamicConfiguration ()))
271280 .exceptionally (ex -> {
272281 LOG .error ("[{}] Failed to get all dynamic configuration names." , clientAppId (), ex );
@@ -281,7 +290,8 @@ public void getDynamicConfigurationName(@Suspended AsyncResponse asyncResponse)
281290 response = String .class , responseContainer = "Map" )
282291 @ ApiResponses (value = { @ ApiResponse (code = 403 , message = "Don't have admin permission" ) })
283292 public void getRuntimeConfiguration (@ Suspended AsyncResponse asyncResponse ) {
284- validateSuperUserAccessAsync ()
293+ validateBothSuperuserAndBrokerOperation (pulsar ().getConfig ().getClusterName (), pulsar ().getBrokerId (),
294+ BrokerOperation .LIST_RUNTIME_CONFIGURATIONS )
285295 .thenAccept (__ -> asyncResponse .resume (pulsar ().getBrokerService ().getRuntimeConfiguration ()))
286296 .exceptionally (ex -> {
287297 LOG .error ("[{}] Failed to get runtime configuration." , clientAppId (), ex );
@@ -322,7 +332,8 @@ private synchronized CompletableFuture<Void> persistDynamicConfigurationAsync(
322332 @ ApiOperation (value = "Get the internal configuration data" , response = InternalConfigurationData .class )
323333 @ ApiResponses (value = { @ ApiResponse (code = 403 , message = "Don't have admin permission" ) })
324334 public void getInternalConfigurationData (@ Suspended AsyncResponse asyncResponse ) {
325- validateSuperUserAccessAsync ()
335+ validateBothSuperuserAndBrokerOperation (pulsar ().getConfig ().getClusterName (), pulsar ().getBrokerId (),
336+ BrokerOperation .GET_INTERNAL_CONFIGURATION_DATA )
326337 .thenAccept (__ -> asyncResponse .resume (pulsar ().getInternalConfigurationData ()))
327338 .exceptionally (ex -> {
328339 LOG .error ("[{}] Failed to get internal configuration data." , clientAppId (), ex );
@@ -339,7 +350,8 @@ public void getInternalConfigurationData(@Suspended AsyncResponse asyncResponse)
339350 @ ApiResponse (code = 403 , message = "Don't have admin permission" ),
340351 @ ApiResponse (code = 500 , message = "Internal server error" )})
341352 public void backlogQuotaCheck (@ Suspended AsyncResponse asyncResponse ) {
342- validateSuperUserAccessAsync ()
353+ validateBothSuperuserAndBrokerOperation (pulsar ().getConfig ().getClusterName (), pulsar ().getBrokerId (),
354+ BrokerOperation .CHECK_BACKLOG_QUOTA )
343355 .thenAcceptAsync (__ -> {
344356 pulsar ().getBrokerService ().monitorBacklogQuota ();
345357 asyncResponse .resume (Response .noContent ().build ());
@@ -378,7 +390,8 @@ public void healthCheck(@Suspended AsyncResponse asyncResponse,
378390 @ ApiParam (value = "Topic Version" )
379391 @ QueryParam ("topicVersion" ) TopicVersion topicVersion ,
380392 @ QueryParam ("brokerId" ) String brokerId ) {
381- validateSuperUserAccessAsync ()
393+ validateBothSuperuserAndBrokerOperation (pulsar ().getConfig ().getClusterName (), StringUtils .isBlank (brokerId )
394+ ? pulsar ().getBrokerId () : brokerId , BrokerOperation .HEALTH_CHECK )
382395 .thenAccept (__ -> checkDeadlockedThreads ())
383396 .thenCompose (__ -> maybeRedirectToBroker (
384397 StringUtils .isBlank (brokerId ) ? pulsar ().getBrokerId () : brokerId ))
@@ -596,8 +609,9 @@ public void shutDownBrokerGracefully(
596609 @ QueryParam ("forcedTerminateTopic" ) @ DefaultValue ("true" ) boolean forcedTerminateTopic ,
597610 @ Suspended final AsyncResponse asyncResponse
598611 ) {
599- validateSuperUserAccess ();
600- doShutDownBrokerGracefullyAsync (maxConcurrentUnloadPerSec , forcedTerminateTopic )
612+ validateBothSuperuserAndBrokerOperation (pulsar ().getConfig ().getClusterName (), pulsar ().getBrokerId (),
613+ BrokerOperation .SHUTDOWN )
614+ .thenCompose (__ -> doShutDownBrokerGracefullyAsync (maxConcurrentUnloadPerSec , forcedTerminateTopic ))
601615 .thenAccept (__ -> {
602616 LOG .info ("[{}] Successfully shutdown broker gracefully" , clientAppId ());
603617 asyncResponse .resume (Response .noContent ().build ());
@@ -614,5 +628,65 @@ private CompletableFuture<Void> doShutDownBrokerGracefullyAsync(int maxConcurren
614628 pulsar ().getBrokerService ().unloadNamespaceBundlesGracefully (maxConcurrentUnloadPerSec , forcedTerminateTopic );
615629 return pulsar ().closeAsync ();
616630 }
631+
632+
633+ private CompletableFuture <Void > validateBothSuperuserAndBrokerOperation (String cluster , String brokerId ,
634+ BrokerOperation operation ) {
635+ final var superUserAccessValidation = validateSuperUserAccessAsync ();
636+ final var brokerOperationValidation = validateBrokerOperationAsync (cluster , brokerId , operation );
637+ return FutureUtil .waitForAll (List .of (superUserAccessValidation , brokerOperationValidation ))
638+ .handle ((result , err ) -> {
639+ if (!superUserAccessValidation .isCompletedExceptionally ()
640+ || !brokerOperationValidation .isCompletedExceptionally ()) {
641+ return null ;
642+ }
643+ if (LOG .isDebugEnabled ()) {
644+ Throwable superUserValidationException = null ;
645+ try {
646+ superUserAccessValidation .join ();
647+ } catch (Throwable ex ) {
648+ superUserValidationException = FutureUtil .unwrapCompletionException (ex );
649+ }
650+ Throwable brokerOperationValidationException = null ;
651+ try {
652+ brokerOperationValidation .join ();
653+ } catch (Throwable ex ) {
654+ brokerOperationValidationException = FutureUtil .unwrapCompletionException (ex );
655+ }
656+ LOG .debug ("validateBothSuperuserAndBrokerOperation failed."
657+ + " originalPrincipal={} clientAppId={} operation={} broker={} "
658+ + "superuserValidationError={} brokerOperationValidationError={}" ,
659+ originalPrincipal (), clientAppId (), operation .toString (), brokerId ,
660+ superUserValidationException , brokerOperationValidationException );
661+ }
662+ throw new RestException (Status .UNAUTHORIZED ,
663+ String .format ("Unauthorized to validateBothSuperuserAndBrokerOperation for"
664+ + " originalPrincipal [%s] and clientAppId [%s] "
665+ + "about operation [%s] on broker [%s]" ,
666+ originalPrincipal (), clientAppId (), operation .toString (), brokerId ));
667+ });
668+ }
669+
670+
671+ private CompletableFuture <Void > validateBrokerOperationAsync (String cluster , String brokerId ,
672+ BrokerOperation operation ) {
673+ final var pulsar = pulsar ();
674+ if (pulsar .getBrokerService ().isAuthenticationEnabled ()
675+ && pulsar .getBrokerService ().isAuthorizationEnabled ()) {
676+ return pulsar .getBrokerService ().getAuthorizationService ()
677+ .allowBrokerOperationAsync (cluster , brokerId , operation , originalPrincipal (),
678+ clientAppId (), clientAuthData ())
679+ .thenAccept (isAuthorized -> {
680+ if (!isAuthorized ) {
681+ throw new RestException (Status .UNAUTHORIZED ,
682+ String .format ("Unauthorized to validateBrokerOperation for"
683+ + " originalPrincipal [%s] and clientAppId [%s] "
684+ + "about operation [%s] on broker [%s]" ,
685+ originalPrincipal (), clientAppId (), operation .toString (), brokerId ));
686+ }
687+ });
688+ }
689+ return CompletableFuture .completedFuture (null );
690+ }
617691}
618692
0 commit comments