@@ -173,15 +173,9 @@ public void initialize() {
173173 }
174174 }
175175
176- public CompletableFuture <Optional <LookupResult >> getBrokerServiceUrlAsync (TopicName topic ,
177- boolean authoritative ) {
178- return getBrokerServiceUrlAsync (topic , authoritative , null );
179- }
180-
181- public CompletableFuture <Optional <LookupResult >> getBrokerServiceUrlAsync (TopicName topic , boolean authoritative ,
182- final String advertisedListenerName ) {
176+ public CompletableFuture <Optional <LookupResult >> getBrokerServiceUrlAsync (TopicName topic , LookupOptions options ) {
183177 return getBundleAsync (topic )
184- .thenCompose (bundle -> findBrokerServiceUrl (bundle , authoritative , false /* read-only */ , advertisedListenerName ));
178+ .thenCompose (bundle -> findBrokerServiceUrl (bundle , options ));
185179 }
186180
187181 public CompletableFuture <NamespaceBundle > getBundleAsync (TopicName topic ) {
@@ -211,38 +205,36 @@ private NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
211205 *
212206 * If the service unit is not owned, return an empty optional
213207 */
214- public Optional <URL > getWebServiceUrl (ServiceUnitId suName , boolean authoritative , boolean isRequestHttps ,
215- boolean readOnly ) throws Exception {
208+ public Optional <URL > getWebServiceUrl (ServiceUnitId suName , LookupOptions options ) throws Exception {
216209 if (suName instanceof TopicName ) {
217210 TopicName name = (TopicName ) suName ;
218211 if (LOG .isDebugEnabled ()) {
219- LOG .debug ("Getting web service URL of topic: {} - auth : {}" , name , authoritative );
212+ LOG .debug ("Getting web service URL of topic: {} - options : {}" , name , options );
220213 }
221- return this .internalGetWebServiceUrl (getBundle (name ), authoritative , isRequestHttps , readOnly )
214+ return this .internalGetWebServiceUrl (getBundle (name ), options )
222215 .get (pulsar .getConfiguration ().getZooKeeperOperationTimeoutSeconds (), SECONDS );
223216 }
224217
225218 if (suName instanceof NamespaceName ) {
226- return this .internalGetWebServiceUrl (getFullBundle ((NamespaceName ) suName ), authoritative , isRequestHttps ,
227- readOnly ) .get (pulsar .getConfiguration ().getZooKeeperOperationTimeoutSeconds (), SECONDS );
219+ return this .internalGetWebServiceUrl (getFullBundle ((NamespaceName ) suName ), options )
220+ .get (pulsar .getConfiguration ().getZooKeeperOperationTimeoutSeconds (), SECONDS );
228221 }
229222
230223 if (suName instanceof NamespaceBundle ) {
231- return this .internalGetWebServiceUrl ((NamespaceBundle ) suName , authoritative , isRequestHttps , readOnly )
224+ return this .internalGetWebServiceUrl ((NamespaceBundle ) suName , options )
232225 .get (pulsar .getConfiguration ().getZooKeeperOperationTimeoutSeconds (), SECONDS );
233226 }
234227
235228 throw new IllegalArgumentException ("Unrecognized class of NamespaceBundle: " + suName .getClass ().getName ());
236229 }
237230
238- private CompletableFuture <Optional <URL >> internalGetWebServiceUrl (NamespaceBundle bundle , boolean authoritative ,
239- boolean isRequestHttps , boolean readOnly ) {
231+ private CompletableFuture <Optional <URL >> internalGetWebServiceUrl (NamespaceBundle bundle , LookupOptions options ) {
240232
241- return findBrokerServiceUrl (bundle , authoritative , readOnly ).thenApply (lookupResult -> {
233+ return findBrokerServiceUrl (bundle , options ).thenApply (lookupResult -> {
242234 if (lookupResult .isPresent ()) {
243235 try {
244236 LookupData lookupData = lookupResult .get ().getLookupData ();
245- final String redirectUrl = isRequestHttps ? lookupData .getHttpUrlTls () : lookupData .getHttpUrl ();
237+ final String redirectUrl = options . isRequestHttps () ? lookupData .getHttpUrlTls () : lookupData .getHttpUrl ();
246238 return Optional .of (new URL (redirectUrl ));
247239 } catch (Exception e ) {
248240 // just log the exception, nothing else to do
@@ -329,19 +321,6 @@ public boolean registerNamespace(String namespace, boolean ensureOwned) throws P
329321 private final ConcurrentOpenHashMap <NamespaceBundle , CompletableFuture <Optional <LookupResult >>> findingBundlesNotAuthoritative
330322 = new ConcurrentOpenHashMap <>();
331323
332- /**
333- * Main internal method to lookup and setup ownership of service unit to a broker.
334- *
335- * @param bundle
336- * @param authoritative
337- * @param readOnly
338- * @return
339- */
340- private CompletableFuture <Optional <LookupResult >> findBrokerServiceUrl (NamespaceBundle bundle , boolean authoritative ,
341- boolean readOnly ) {
342- return findBrokerServiceUrl (bundle , authoritative , readOnly , null );
343- }
344-
345324 /**
346325 * Main internal method to lookup and setup ownership of service unit to a broker
347326 *
@@ -352,14 +331,13 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(Namespace
352331 * @return
353332 * @throws PulsarServerException
354333 */
355- private CompletableFuture <Optional <LookupResult >> findBrokerServiceUrl (NamespaceBundle bundle , boolean authoritative ,
356- boolean readOnly , final String advertisedListenerName ) {
334+ private CompletableFuture <Optional <LookupResult >> findBrokerServiceUrl (NamespaceBundle bundle , LookupOptions options ) {
357335 if (LOG .isDebugEnabled ()) {
358- LOG .debug ("findBrokerServiceUrl: {} - read-only : {}" , bundle , readOnly );
336+ LOG .debug ("findBrokerServiceUrl: {} - options : {}" , bundle , options );
359337 }
360338
361339 ConcurrentOpenHashMap <NamespaceBundle , CompletableFuture <Optional <LookupResult >>> targetMap ;
362- if (authoritative ) {
340+ if (options . isAuthoritative () ) {
363341 targetMap = findingBundlesAuthoritative ;
364342 } else {
365343 targetMap = findingBundlesNotAuthoritative ;
@@ -373,13 +351,13 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(Namespace
373351 if (!nsData .isPresent ()) {
374352 // No one owns this bundle
375353
376- if (readOnly ) {
354+ if (options . isReadOnly () ) {
377355 // Do not attempt to acquire ownership
378356 future .complete (Optional .empty ());
379357 } else {
380358 // Now, no one owns the namespace yet. Hence, we will try to dynamically assign it
381359 pulsar .getExecutor ().execute (() -> {
382- searchForCandidateBroker (bundle , future , authoritative , advertisedListenerName );
360+ searchForCandidateBroker (bundle , future , options );
383361 });
384362 }
385363 } else if (nsData .get ().isDisabled ()) {
@@ -390,11 +368,11 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(Namespace
390368 LOG .debug ("Namespace bundle {} already owned by {} " , bundle , nsData );
391369 }
392370 // find the target
393- if (StringUtils . isNotBlank ( advertisedListenerName )) {
394- AdvertisedListener listener = nsData .get ().getAdvertisedListeners ().get (advertisedListenerName );
371+ if (options . hasAdvertisedListenerName ( )) {
372+ AdvertisedListener listener = nsData .get ().getAdvertisedListeners ().get (options . getAdvertisedListenerName () );
395373 if (listener == null ) {
396374 future .completeExceptionally (
397- new PulsarServerException ("the broker do not have " + advertisedListenerName + " listener" ));
375+ new PulsarServerException ("the broker do not have " + options . getAdvertisedListenerName () + " listener" ));
398376 } else {
399377 URI urlTls = listener .getBrokerServiceUrlTls ();
400378 future .complete (Optional .of (new LookupResult (nsData .get (),
@@ -420,13 +398,8 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(Namespace
420398 }
421399
422400 private void searchForCandidateBroker (NamespaceBundle bundle ,
423- CompletableFuture <Optional <LookupResult >> lookupFuture , boolean authoritative ) {
424- searchForCandidateBroker (bundle , lookupFuture , authoritative , null );
425- }
426-
427- private void searchForCandidateBroker (NamespaceBundle bundle ,
428- CompletableFuture <Optional <LookupResult >> lookupFuture , boolean authoritative ,
429- final String advertisedListenerName ) {
401+ CompletableFuture <Optional <LookupResult >> lookupFuture ,
402+ LookupOptions options ) {
430403 String candidateBroker = null ;
431404 boolean authoritativeRedirect = pulsar .getLeaderElectionService ().isLeader ();
432405
@@ -442,7 +415,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
442415 }
443416
444417 if (candidateBroker == null ) {
445- if (authoritative ) {
418+ if (options . isAuthoritative () ) {
446419 // leader broker already assigned the current broker as owner
447420 candidateBroker = pulsar .getSafeWebServiceAddress ();
448421 } else if (!this .loadManager .get ().isCentralized ()
@@ -488,14 +461,16 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
488461 } else {
489462 // Found owner for the namespace bundle
490463
491- // Schedule the task to pre-load topics
492- pulsar .loadNamespaceTopics (bundle );
464+ if (options .isLoadTopicsInBundle ()) {
465+ // Schedule the task to pre-load topics
466+ pulsar .loadNamespaceTopics (bundle );
467+ }
493468 // find the target
494- if (StringUtils . isNotBlank ( advertisedListenerName )) {
495- AdvertisedListener listener = ownerInfo .getAdvertisedListeners ().get (advertisedListenerName );
469+ if (options . hasAdvertisedListenerName ( )) {
470+ AdvertisedListener listener = ownerInfo .getAdvertisedListeners ().get (options . getAdvertisedListenerName () );
496471 if (listener == null ) {
497472 lookupFuture .completeExceptionally (
498- new PulsarServerException ("the broker do not have " + advertisedListenerName + " listener" ));
473+ new PulsarServerException ("the broker do not have " + options . getAdvertisedListenerName () + " listener" ));
499474 return ;
500475 } else {
501476 URI urlTls = listener .getBrokerServiceUrlTls ();
0 commit comments