Skip to content

Commit fa8bfb2

Browse files
authored
Simplify kafka client bootstrap server names and ports config (#710)
1 parent 69b1670 commit fa8bfb2

File tree

16 files changed

+265
-18
lines changed

16 files changed

+265
-18
lines changed

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/config/KafkaOptionsConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,18 @@ public final class KafkaOptionsConfig extends OptionsConfig
2323
{
2424
public final List<String> bootstrap;
2525
public final List<KafkaTopicConfig> topics;
26+
public final List<KafkaServerConfig> servers;
2627
public final KafkaSaslConfig sasl;
2728

2829
public KafkaOptionsConfig(
2930
List<String> bootstrap,
3031
List<KafkaTopicConfig> topics,
32+
List<KafkaServerConfig> servers,
3133
KafkaSaslConfig sasl)
3234
{
3335
this.bootstrap = bootstrap;
3436
this.topics = topics;
37+
this.servers = servers;
3538
this.sasl = sasl;
3639
}
3740
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2021-2023 Aklivity Inc.
3+
*
4+
* Aklivity licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.aklivity.zilla.runtime.binding.kafka.config;
17+
18+
public class KafkaServerConfig
19+
{
20+
public final String host;
21+
public final int port;
22+
23+
public KafkaServerConfig(
24+
String host,
25+
int port)
26+
{
27+
this.host = host;
28+
this.port = port;
29+
}
30+
31+
@Override
32+
public String toString()
33+
{
34+
return String.format("%s:%d", host, port);
35+
}
36+
}

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/config/KafkaBindingConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaOptionsConfig;
2727
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaSaslConfig;
28+
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig;
2829
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicConfig;
2930
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaDeltaType;
3031
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetType;
@@ -97,6 +98,11 @@ public KafkaSaslConfig sasl()
9798
return options != null ? options.sasl : null;
9899
}
99100

101+
public List<KafkaServerConfig> servers()
102+
{
103+
return options != null ? options.servers : null;
104+
}
105+
100106
public KafkaDeltaType supplyDeltaType(
101107
String topic,
102108
KafkaDeltaType deltaType)

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/config/KafkaOptionsConfigAdapter.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import java.util.ArrayList;
1919
import java.util.List;
20+
import java.util.regex.Matcher;
21+
import java.util.regex.Pattern;
2022

2123
import jakarta.json.Json;
2224
import jakarta.json.JsonArray;
@@ -28,14 +30,17 @@
2830

2931
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaOptionsConfig;
3032
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaSaslConfig;
33+
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig;
3134
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicConfig;
3235
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding;
3336
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;
3437
import io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi;
3538

3639
public final class KafkaOptionsConfigAdapter implements OptionsConfigAdapterSpi, JsonbAdapter<OptionsConfig, JsonObject>
3740
{
41+
private static final Pattern SERVER_PATTERN = Pattern.compile("([^\\:]+):(\\d+)");
3842
private static final String BOOTSTRAP_NAME = "bootstrap";
43+
private static final String SERVERS_NAME = "servers";
3944
private static final String TOPICS_NAME = "topics";
4045
private static final String SASL_NAME = "sasl";
4146
private static final String SASL_MECHANISM_NAME = "mechanism";
@@ -82,6 +87,15 @@ public JsonObject adaptToJson(
8287
object.add(TOPICS_NAME, entries);
8388
}
8489

90+
if (kafkaOptions.servers != null &&
91+
!kafkaOptions.servers.isEmpty())
92+
{
93+
JsonArrayBuilder entries = Json.createArrayBuilder();
94+
kafkaOptions.servers.forEach(s -> entries.add(String.format("%s:%d", s.host, s.port)));
95+
96+
object.add(SERVERS_NAME, entries);
97+
}
98+
8599
if (kafkaOptions.sasl != null)
86100
{
87101
JsonObjectBuilder sasl = Json.createObjectBuilder();
@@ -109,6 +123,10 @@ public OptionsConfig adaptFromJson(
109123
? object.getJsonArray(TOPICS_NAME)
110124
: null;
111125

126+
JsonArray serversArray = object.containsKey(SERVERS_NAME)
127+
? object.getJsonArray(SERVERS_NAME)
128+
: null;
129+
112130
JsonObject saslObject = object.containsKey(SASL_NAME)
113131
? object.getJsonObject(SASL_NAME)
114132
: null;
@@ -131,6 +149,26 @@ public OptionsConfig adaptFromJson(
131149
topics = topics0;
132150
}
133151

152+
List<KafkaServerConfig> servers = null;
153+
154+
if (serversArray != null)
155+
{
156+
List<KafkaServerConfig> servers0 = new ArrayList<>();
157+
serversArray.forEach(v ->
158+
{
159+
final String server = JsonString.class.cast(v).getString();
160+
final Matcher matcher = SERVER_PATTERN.matcher(server);
161+
if (matcher.matches())
162+
{
163+
final String host = matcher.group(1);
164+
final int port = Integer.parseInt(matcher.group(2));
165+
166+
servers0.add(new KafkaServerConfig(host, port));
167+
}
168+
});
169+
servers = servers0;
170+
}
171+
134172
KafkaSaslConfig sasl = null;
135173

136174
if (saslObject != null)
@@ -142,6 +180,6 @@ public OptionsConfig adaptFromJson(
142180
sasl = new KafkaSaslConfig(mechanism, username, password);
143181
}
144182

145-
return new KafkaOptionsConfig(bootstrap, topics, sasl);
183+
return new KafkaOptionsConfig(bootstrap, topics, servers, sasl);
146184
}
147185
}

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.aklivity.zilla.runtime.binding.kafka.internal.stream;
1717

18+
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.ProxyAddressProtocol.STREAM;
1819
import static io.aklivity.zilla.runtime.engine.budget.BudgetCreditor.NO_BUDGET_ID;
1920
import static io.aklivity.zilla.runtime.engine.budget.BudgetDebitor.NO_DEBITOR_INDEX;
2021
import static io.aklivity.zilla.runtime.engine.buffer.BufferPool.NO_SLOT;
@@ -23,6 +24,7 @@
2324
import static java.util.Objects.requireNonNull;
2425

2526
import java.nio.ByteOrder;
27+
import java.security.SecureRandom;
2628
import java.util.ArrayList;
2729
import java.util.LinkedHashMap;
2830
import java.util.List;
@@ -39,6 +41,7 @@
3941
import org.agrona.concurrent.UnsafeBuffer;
4042

4143
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaSaslConfig;
44+
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig;
4245
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding;
4346
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration;
4447
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig;
@@ -62,6 +65,7 @@
6265
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaDataExFW;
6366
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaDescribeBeginExFW;
6467
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaResetExFW;
68+
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.ProxyBeginExFW;
6569
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.ResetFW;
6670
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.SignalFW;
6771
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.WindowFW;
@@ -105,6 +109,7 @@ public final class KafkaClientDescribeFactory extends KafkaClientSaslHandshaker
105109
private final KafkaBeginExFW.Builder kafkaBeginExRW = new KafkaBeginExFW.Builder();
106110
private final KafkaDataExFW.Builder kafkaDataExRW = new KafkaDataExFW.Builder();
107111
private final KafkaResetExFW.Builder kafkaResetExRW = new KafkaResetExFW.Builder();
112+
private final ProxyBeginExFW.Builder proxyBeginExRW = new ProxyBeginExFW.Builder();
108113

109114
private final RequestHeaderFW.Builder requestHeaderRW = new RequestHeaderFW.Builder();
110115
private final DescribeConfigsRequestFW.Builder describeConfigsRequestRW = new DescribeConfigsRequestFW.Builder();
@@ -129,8 +134,11 @@ public final class KafkaClientDescribeFactory extends KafkaClientSaslHandshaker
129134
private final KafkaDescribeClientDecoder decodeIgnoreAll = this::decodeIgnoreAll;
130135
private final KafkaDescribeClientDecoder decodeReject = this::decodeReject;
131136

137+
private final SecureRandom randomServerIdGenerator = new SecureRandom();
138+
132139
private final long maxAgeMillis;
133140
private final int kafkaTypeId;
141+
private final int proxyTypeId;
134142
private final MutableDirectBuffer writeBuffer;
135143
private final MutableDirectBuffer extBuffer;
136144
private final BufferPool decodePool;
@@ -153,6 +161,7 @@ public KafkaClientDescribeFactory(
153161
super(config, context);
154162
this.maxAgeMillis = Math.min(config.clientDescribeMaxAgeMillis(), config.clientMaxIdleMillis() >> 1);
155163
this.kafkaTypeId = context.supplyTypeId(KafkaBinding.NAME);
164+
this.proxyTypeId = context.supplyTypeId("proxy");
156165
this.signaler = signaler;
157166
this.streamFactory = streamFactory;
158167
this.resolveSasl = resolveSasl;
@@ -210,6 +219,7 @@ public MessageConsumer newStream(
210219
resolvedId,
211220
topicName,
212221
configs,
222+
binding.servers(),
213223
sasl)::onApplication;
214224
}
215225

@@ -627,6 +637,7 @@ private final class KafkaDescribeStream
627637
long resolvedId,
628638
String topic,
629639
List<String> configs,
640+
List<KafkaServerConfig> servers,
630641
KafkaSaslConfig sasl)
631642
{
632643
this.application = application;
@@ -635,7 +646,7 @@ private final class KafkaDescribeStream
635646
this.initialId = initialId;
636647
this.replyId = supplyReplyId.applyAsLong(initialId);
637648
this.affinity = affinity;
638-
this.client = new KafkaDescribeClient(routedId, resolvedId, topic, configs, sasl);
649+
this.client = new KafkaDescribeClient(routedId, resolvedId, topic, configs, servers, sasl);
639650
}
640651

641652
private void onApplication(
@@ -898,6 +909,7 @@ private final class KafkaDescribeClient extends KafkaSaslClient
898909
private MessageConsumer network;
899910
private final String topic;
900911
private final Map<String, String> configs;
912+
private final List<KafkaServerConfig> servers;
901913

902914
private int state;
903915
private long authorization;
@@ -933,11 +945,13 @@ private final class KafkaDescribeClient extends KafkaSaslClient
933945
long routedId,
934946
String topic,
935947
List<String> configs,
948+
List<KafkaServerConfig> servers,
936949
KafkaSaslConfig sasl)
937950
{
938951
super(sasl, originId, routedId);
939952
this.topic = requireNonNull(topic);
940953
this.configs = new LinkedHashMap<>(configs.size());
954+
this.servers = servers;
941955
configs.forEach(c -> this.configs.put(c, null));
942956

943957
this.encoder = sasl != null ? encodeSaslHandshakeRequest : encodeDescribeRequest;
@@ -1180,8 +1194,27 @@ private void doNetworkBegin(
11801194
{
11811195
state = KafkaState.openingInitial(state);
11821196

1197+
Consumer<OctetsFW.Builder> extension = EMPTY_EXTENSION;
1198+
1199+
final KafkaServerConfig kafkaServerConfig =
1200+
servers != null ? servers.get(randomServerIdGenerator.nextInt(servers.size())) : null;
1201+
1202+
if (kafkaServerConfig != null)
1203+
{
1204+
extension = e -> e.set((b, o, l) -> proxyBeginExRW.wrap(b, o, l)
1205+
.typeId(proxyTypeId)
1206+
.address(a -> a.inet(i -> i.protocol(p -> p.set(STREAM))
1207+
.source("0.0.0.0")
1208+
.destination(kafkaServerConfig.host)
1209+
.sourcePort(0)
1210+
.destinationPort(kafkaServerConfig.port)))
1211+
.infos(i -> i.item(ii -> ii.authority(kafkaServerConfig.host)))
1212+
.build()
1213+
.sizeof());
1214+
}
1215+
11831216
network = newStream(this::onNetwork, originId, routedId, initialId, initialSeq, initialAck, initialMax,
1184-
traceId, authorization, affinity, EMPTY_EXTENSION);
1217+
traceId, authorization, affinity, extension);
11851218
}
11861219

11871220
@Override

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientGroupFactory.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static java.nio.charset.StandardCharsets.UTF_8;
2626

2727
import java.nio.ByteOrder;
28+
import java.security.SecureRandom;
2829
import java.time.Duration;
2930
import java.util.ArrayDeque;
3031
import java.util.ArrayList;
@@ -45,6 +46,7 @@
4546
import org.agrona.concurrent.UnsafeBuffer;
4647

4748
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaSaslConfig;
49+
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig;
4850
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding;
4951
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration;
5052
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig;
@@ -269,6 +271,8 @@ public final class KafkaClientGroupFactory extends KafkaClientSaslHandshaker imp
269271
private final KafkaGroupCoordinatorClientDecoder decodeCoordinatorIgnoreAll = this::decodeIgnoreAll;
270272
private final KafkaGroupCoordinatorClientDecoder decodeCoordinatorReject = this::decodeCoordinatorReject;
271273

274+
private final SecureRandom randomServerIdGenerator = new SecureRandom();
275+
272276
private final int kafkaTypeId;
273277
private final int proxyTypeId;
274278
private final MutableDirectBuffer writeBuffer;
@@ -380,6 +384,7 @@ public MessageConsumer newStream(
380384
protocol,
381385
timeout,
382386
groupMembership,
387+
binding.servers(),
383388
sasl);
384389
newStream = newGroup::onStream;
385390

@@ -1223,6 +1228,7 @@ private final class KafkaGroupStream
12231228
private final DescribeClient describeClient;
12241229
private final CoordinatorClient coordinatorClient;
12251230
private final GroupMembership groupMembership;
1231+
private final List<KafkaServerConfig> servers;
12261232
private final String groupId;
12271233
private final String protocol;
12281234
private final long resolvedId;
@@ -1266,6 +1272,7 @@ private final class KafkaGroupStream
12661272
String protocol,
12671273
int timeout,
12681274
GroupMembership groupMembership,
1275+
List<KafkaServerConfig> servers,
12691276
KafkaSaslConfig sasl)
12701277
{
12711278
this.sender = sender;
@@ -1279,6 +1286,7 @@ private final class KafkaGroupStream
12791286
this.timeout = timeout;
12801287
this.resolvedId = resolvedId;
12811288
this.groupMembership = groupMembership;
1289+
this.servers = servers;
12821290
this.clusterClient = new ClusterClient(routedId, resolvedId, sasl, this);
12831291
this.describeClient = new DescribeClient(routedId, resolvedId, sasl, this);
12841292
this.coordinatorClient = new CoordinatorClient(routedId, resolvedId, sasl, this);
@@ -1996,8 +2004,28 @@ private void doNetworkBegin(
19962004

19972005
state = KafkaState.openingInitial(state);
19982006

2007+
Consumer<OctetsFW.Builder> extension = EMPTY_EXTENSION;
2008+
2009+
final KafkaServerConfig kafkaServerConfig =
2010+
delegate.servers != null ?
2011+
delegate.servers.get(randomServerIdGenerator.nextInt(delegate.servers.size())) : null;
2012+
2013+
if (kafkaServerConfig != null)
2014+
{
2015+
extension = e -> e.set((b, o, l) -> proxyBeginExRW.wrap(b, o, l)
2016+
.typeId(proxyTypeId)
2017+
.address(a -> a.inet(i -> i.protocol(p -> p.set(STREAM))
2018+
.source("0.0.0.0")
2019+
.destination(kafkaServerConfig.host)
2020+
.sourcePort(0)
2021+
.destinationPort(kafkaServerConfig.port)))
2022+
.infos(i -> i.item(ii -> ii.authority(kafkaServerConfig.host)))
2023+
.build()
2024+
.sizeof());
2025+
}
2026+
19992027
network = newStream(this::onNetwork, originId, routedId, initialId, initialSeq, initialAck, initialMax,
2000-
traceId, authorization, affinity, EMPTY_EXTENSION);
2028+
traceId, authorization, affinity, extension);
20012029
}
20022030

20032031
@Override

0 commit comments

Comments
 (0)