1616
1717package org .springframework .boot .autoconfigure .pulsar ;
1818
19- import java .net .InetSocketAddress ;
20- import java .net .URI ;
2119import java .time .Duration ;
2220import java .util .List ;
2321import java .util .concurrent .TimeUnit ;
2422
2523import org .apache .pulsar .client .api .ClientBuilder ;
26- import org .apache .pulsar .client .api .ProxyProtocol ;
2724import org .apache .pulsar .client .api .PulsarClientException ;
28- import org .apache .pulsar .client .api .SizeUnit ;
2925
3026import org .springframework .boot .autoconfigure .pulsar .PulsarProperties .Client ;
3127import org .springframework .boot .context .properties .PropertyMapper ;
32- import org .springframework .boot .context .properties .source .MutuallyExclusiveConfigurationPropertiesException ;
3328import org .springframework .boot .util .LambdaSafe ;
3429import org .springframework .pulsar .core .PulsarClientBuilderCustomizer ;
3530import org .springframework .util .Assert ;
3631import org .springframework .util .StringUtils ;
37- import org .springframework .util .unit .DataSize ;
3832
3933/**
4034 * Configure Pulsar {@link ClientBuilder} with sensible defaults and apply a list of
@@ -76,16 +70,15 @@ protected void applyProperties(PulsarProperties pulsarProperties, ClientBuilder
7670 PropertyMapper map = PropertyMapper .get ().alwaysApplyingWhenNonNull ();
7771 Client clientProperties = pulsarProperties .getClient ();
7872 map .from (clientProperties ::getServiceUrl ).to (clientBuilder ::serviceUrl );
79- map .from (clientProperties ::getListenerName ).to (clientBuilder ::listenerName );
80- map .from (clientProperties ::getNumIoThreads ).to (clientBuilder ::ioThreads );
81- map .from (clientProperties ::getNumListenerThreads ).to (clientBuilder ::listenerThreads );
82- map .from (clientProperties ::getNumConnectionsPerBroker ).to (clientBuilder ::connectionsPerBroker );
83- map .from (clientProperties ::getMaxConcurrentLookupRequest ).to (clientBuilder ::maxConcurrentLookupRequests );
84- map .from (clientProperties ::getMaxLookupRequest ).to (clientBuilder ::maxLookupRequests );
85- map .from (clientProperties ::getMaxLookupRedirects ).to (clientBuilder ::maxLookupRedirects );
86- map .from (clientProperties ::getMaxNumberOfRejectedRequestPerConnection )
87- .to (clientBuilder ::maxNumberOfRejectedRequestPerConnection );
88- map .from (clientProperties ::getUseTcpNoDelay ).to (clientBuilder ::enableTcpNoDelay );
73+ map .from (clientProperties ::getConnectionTimeout )
74+ .asInt (Duration ::toMillis )
75+ .to (clientBuilder , (cb , val ) -> cb .connectionTimeout (val , TimeUnit .MILLISECONDS ));
76+ map .from (clientProperties ::getOperationTimeout )
77+ .asInt (Duration ::toMillis )
78+ .to (clientBuilder , (cb , val ) -> cb .operationTimeout (val , TimeUnit .MILLISECONDS ));
79+ map .from (clientProperties ::getLookupTimeout )
80+ .asInt (Duration ::toMillis )
81+ .to (clientBuilder , (cb , val ) -> cb .lookupTimeout (val , TimeUnit .MILLISECONDS ));
8982
9083 // Authentication properties
9184 applyAuthentication (clientProperties , clientBuilder );
@@ -104,52 +97,12 @@ protected void applyProperties(PulsarProperties pulsarProperties, ClientBuilder
10497 map .from (clientProperties ::getTlsTrustStorePassword ).to (clientBuilder ::tlsTrustStorePassword );
10598 map .from (clientProperties ::getTlsCiphers ).to (clientBuilder ::tlsCiphers );
10699 map .from (clientProperties ::getTlsProtocols ).to (clientBuilder ::tlsProtocols );
107-
108- map .from (clientProperties ::getStatsInterval )
109- .as (Duration ::toSeconds )
110- .to (clientBuilder , (cb , val ) -> cb .statsInterval (val , TimeUnit .SECONDS ));
111- map .from (clientProperties ::getKeepAliveInterval )
112- .asInt (Duration ::toMillis )
113- .to (clientBuilder , (cb , val ) -> cb .keepAliveInterval (val , TimeUnit .MILLISECONDS ));
114- map .from (clientProperties ::getConnectionTimeout )
115- .asInt (Duration ::toMillis )
116- .to (clientBuilder , (cb , val ) -> cb .connectionTimeout (val , TimeUnit .MILLISECONDS ));
117- map .from (clientProperties ::getOperationTimeout )
118- .asInt (Duration ::toMillis )
119- .to (clientBuilder , (cb , val ) -> cb .operationTimeout (val , TimeUnit .MILLISECONDS ));
120- map .from (clientProperties ::getLookupTimeout )
121- .asInt (Duration ::toMillis )
122- .to (clientBuilder , (cb , val ) -> cb .lookupTimeout (val , TimeUnit .MILLISECONDS ));
123- map .from (clientProperties ::getInitialBackoffInterval )
124- .as (Duration ::toMillis )
125- .to (clientBuilder , (cb , val ) -> cb .startingBackoffInterval (val , TimeUnit .MILLISECONDS ));
126- map .from (clientProperties ::getMaxBackoffInterval )
127- .as (Duration ::toMillis )
128- .to (clientBuilder , (cb , val ) -> cb .maxBackoffInterval (val , TimeUnit .MILLISECONDS ));
129- map .from (clientProperties ::getEnableBusyWait ).to (clientBuilder ::enableBusyWait );
130- map .from (clientProperties ::getMemoryLimit )
131- .as (DataSize ::toBytes )
132- .to (clientBuilder , (cb , val ) -> cb .memoryLimit (val , SizeUnit .BYTES ));
133- map .from (clientProperties ::getEnableTransaction ).to (clientBuilder ::enableTransaction );
134- map .from (clientProperties ::getProxyServiceUrl )
135- .to ((proxyUrl ) -> clientBuilder .proxyServiceUrl (proxyUrl , ProxyProtocol .SNI ));
136- map .from (clientProperties ::getDnsLookupBindAddress )
137- .to ((bindAddr ) -> clientBuilder .dnsLookupBind (bindAddr , clientProperties .getDnsLookupBindPort ()));
138- map .from (clientProperties ::getSocks5ProxyAddress )
139- .as (this ::parseSocketAddress )
140- .to (clientBuilder ::socks5ProxyAddress );
141- map .from (clientProperties ::getSocks5ProxyUsername ).to (clientBuilder ::socks5ProxyUsername );
142- map .from (clientProperties ::getSocks5ProxyPassword ).to (clientBuilder ::socks5ProxyPassword );
143100 }
144101
145102 private void applyAuthentication (Client clientProperties , ClientBuilder clientBuilder ) {
146- MutuallyExclusiveConfigurationPropertiesException .throwIfMultipleNonNullValuesIn ((entries ) -> {
147- entries .put ("spring.pulsar.client.auth-params" , clientProperties .getAuthParams ());
148- entries .put ("spring.pulsar.client.authentication.*" , clientProperties .getAuthentication ());
149- });
150103 String authPluginClass = clientProperties .getAuthPluginClassName ();
151104 if (StringUtils .hasText (authPluginClass )) {
152- String authParams = clientProperties . getAuthParams () ;
105+ String authParams = null ;
153106 if (clientProperties .getAuthentication () != null ) {
154107 authParams = AuthParameterUtils .maybeConvertToEncodedParamString (clientProperties .getAuthentication ());
155108 }
@@ -162,16 +115,6 @@ private void applyAuthentication(Client clientProperties, ClientBuilder clientBu
162115 }
163116 }
164117
165- private InetSocketAddress parseSocketAddress (String address ) {
166- try {
167- URI uri = URI .create (address );
168- return new InetSocketAddress (uri .getHost (), uri .getPort ());
169- }
170- catch (Exception ex ) {
171- throw new IllegalArgumentException ("Invalid socket address" , ex );
172- }
173- }
174-
175118 protected void applyCustomizers (List <PulsarClientBuilderCustomizer > clientBuilderCustomizers ,
176119 ClientBuilder clientBuilder ) {
177120 LambdaSafe .callbacks (PulsarClientBuilderCustomizer .class , clientBuilderCustomizers , clientBuilder )
0 commit comments