Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@ public final class KafkaOptionsConfig extends OptionsConfig
{
public final List<String> bootstrap;
public final List<KafkaTopicConfig> topics;
public final List<KafkaServerConfig> servers;
public final KafkaSaslConfig sasl;

public KafkaOptionsConfig(
List<String> bootstrap,
List<KafkaTopicConfig> topics,
List<KafkaServerConfig> servers,
KafkaSaslConfig sasl)
{
this.bootstrap = bootstrap;
this.topics = topics;
this.servers = servers;
this.sasl = sasl;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2021-2023 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.kafka.config;

/**
 * Immutable host:port endpoint for a configured Kafka server.
 */
public class KafkaServerConfig
{
    public final String host;
    public final int port;

    /**
     * @param host server hostname or address
     * @param port server port
     */
    public KafkaServerConfig(
        String host,
        int port)
    {
        this.host = host;
        this.port = port;
    }

    @Override
    public String toString()
    {
        // Fixed: format string previously contained a stray ']' ("%s:%d]"),
        // producing e.g. "localhost:9092]" instead of "localhost:9092".
        return String.format("%s:%d", host, port);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import io.aklivity.zilla.runtime.binding.kafka.config.KafkaOptionsConfig;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaSaslConfig;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaDeltaType;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetType;
Expand Down Expand Up @@ -97,6 +98,11 @@ public KafkaSaslConfig sasl()
return options != null ? options.sasl : null;
}

public List<KafkaServerConfig> servers()
{
return options != null ? options.servers : null;
}

public KafkaDeltaType supplyDeltaType(
String topic,
KafkaDeltaType deltaType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import jakarta.json.Json;
import jakarta.json.JsonArray;
Expand All @@ -28,14 +30,17 @@

import io.aklivity.zilla.runtime.binding.kafka.config.KafkaOptionsConfig;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaSaslConfig;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;
import io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi;

public final class KafkaOptionsConfigAdapter implements OptionsConfigAdapterSpi, JsonbAdapter<OptionsConfig, JsonObject>
{
private static final Pattern SERVER_PATTERN = Pattern.compile("([\\w-]+(?:\\.[\\w-]+)*):(\\d+)");
private static final String BOOTSTRAP_NAME = "bootstrap";
private static final String SERVERS_NAME = "servers";
private static final String TOPICS_NAME = "topics";
private static final String SASL_NAME = "sasl";
private static final String SASL_MECHANISM_NAME = "mechanism";
Expand Down Expand Up @@ -82,6 +87,15 @@ public JsonObject adaptToJson(
object.add(TOPICS_NAME, entries);
}

if (kafkaOptions.servers != null &&
!kafkaOptions.servers.isEmpty())
{
JsonArrayBuilder entries = Json.createArrayBuilder();
kafkaOptions.servers.forEach(s -> entries.add(String.format("%s:%d", s.host, s.port)));

object.add(SERVERS_NAME, entries);
}

if (kafkaOptions.sasl != null)
{
JsonObjectBuilder sasl = Json.createObjectBuilder();
Expand Down Expand Up @@ -109,6 +123,10 @@ public OptionsConfig adaptFromJson(
? object.getJsonArray(TOPICS_NAME)
: null;

JsonArray serversArray = object.containsKey(SERVERS_NAME)
? object.getJsonArray(SERVERS_NAME)
: null;

JsonObject saslObject = object.containsKey(SASL_NAME)
? object.getJsonObject(SASL_NAME)
: null;
Expand All @@ -131,6 +149,26 @@ public OptionsConfig adaptFromJson(
topics = topics0;
}

List<KafkaServerConfig> servers = null;

if (serversArray != null)
{
List<KafkaServerConfig> servers0 = new ArrayList<>();
serversArray.forEach(v ->
{
final String server = JsonString.class.cast(v).getString();
final Matcher matcher = SERVER_PATTERN.matcher(server);
if (matcher.matches())
{
final String host = matcher.group(1);
final int port = Integer.parseInt(matcher.group(2));

servers0.add(new KafkaServerConfig(host, port));
}
});
servers = servers0;
}

KafkaSaslConfig sasl = null;

if (saslObject != null)
Expand All @@ -142,6 +180,6 @@ public OptionsConfig adaptFromJson(
sasl = new KafkaSaslConfig(mechanism, username, password);
}

return new KafkaOptionsConfig(bootstrap, topics, sasl);
return new KafkaOptionsConfig(bootstrap, topics, servers, sasl);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.aklivity.zilla.runtime.binding.kafka.internal.stream;

import static io.aklivity.zilla.runtime.binding.kafka.internal.types.ProxyAddressProtocol.STREAM;
import static io.aklivity.zilla.runtime.engine.budget.BudgetCreditor.NO_BUDGET_ID;
import static io.aklivity.zilla.runtime.engine.budget.BudgetDebitor.NO_DEBITOR_INDEX;
import static io.aklivity.zilla.runtime.engine.buffer.BufferPool.NO_SLOT;
Expand All @@ -28,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.LongFunction;
Expand All @@ -39,6 +41,7 @@
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.kafka.config.KafkaSaslConfig;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig;
Expand All @@ -62,6 +65,7 @@
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaDataExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaDescribeBeginExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaResetExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.ProxyBeginExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.ResetFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.SignalFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.WindowFW;
Expand All @@ -78,6 +82,7 @@ public final class KafkaClientDescribeFactory extends KafkaClientSaslHandshaker

private static final int SIGNAL_NEXT_REQUEST = 1;

private static final Random RANDOM_SERVER_ID_GENERATOR = new Random();
private static final DirectBuffer EMPTY_BUFFER = new UnsafeBuffer();
private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(EMPTY_BUFFER, 0, 0);
private static final Consumer<OctetsFW.Builder> EMPTY_EXTENSION = ex -> {};
Expand Down Expand Up @@ -105,6 +110,7 @@ public final class KafkaClientDescribeFactory extends KafkaClientSaslHandshaker
private final KafkaBeginExFW.Builder kafkaBeginExRW = new KafkaBeginExFW.Builder();
private final KafkaDataExFW.Builder kafkaDataExRW = new KafkaDataExFW.Builder();
private final KafkaResetExFW.Builder kafkaResetExRW = new KafkaResetExFW.Builder();
private final ProxyBeginExFW.Builder proxyBeginExRW = new ProxyBeginExFW.Builder();

private final RequestHeaderFW.Builder requestHeaderRW = new RequestHeaderFW.Builder();
private final DescribeConfigsRequestFW.Builder describeConfigsRequestRW = new DescribeConfigsRequestFW.Builder();
Expand All @@ -131,6 +137,7 @@ public final class KafkaClientDescribeFactory extends KafkaClientSaslHandshaker

private final long maxAgeMillis;
private final int kafkaTypeId;
private final int proxyTypeId;
private final MutableDirectBuffer writeBuffer;
private final MutableDirectBuffer extBuffer;
private final BufferPool decodePool;
Expand All @@ -153,6 +160,7 @@ public KafkaClientDescribeFactory(
super(config, context);
this.maxAgeMillis = Math.min(config.clientDescribeMaxAgeMillis(), config.clientMaxIdleMillis() >> 1);
this.kafkaTypeId = context.supplyTypeId(KafkaBinding.NAME);
this.proxyTypeId = context.supplyTypeId("proxy");
this.signaler = signaler;
this.streamFactory = streamFactory;
this.resolveSasl = resolveSasl;
Expand Down Expand Up @@ -210,6 +218,7 @@ public MessageConsumer newStream(
resolvedId,
topicName,
configs,
binding.servers(),
sasl)::onApplication;
}

Expand Down Expand Up @@ -627,6 +636,7 @@ private final class KafkaDescribeStream
long resolvedId,
String topic,
List<String> configs,
List<KafkaServerConfig> servers,
KafkaSaslConfig sasl)
{
this.application = application;
Expand All @@ -635,7 +645,7 @@ private final class KafkaDescribeStream
this.initialId = initialId;
this.replyId = supplyReplyId.applyAsLong(initialId);
this.affinity = affinity;
this.client = new KafkaDescribeClient(routedId, resolvedId, topic, configs, sasl);
this.client = new KafkaDescribeClient(routedId, resolvedId, topic, configs, servers, sasl);
}

private void onApplication(
Expand Down Expand Up @@ -898,6 +908,7 @@ private final class KafkaDescribeClient extends KafkaSaslClient
private MessageConsumer network;
private final String topic;
private final Map<String, String> configs;
private final List<KafkaServerConfig> servers;

private int state;
private long authorization;
Expand Down Expand Up @@ -933,11 +944,13 @@ private final class KafkaDescribeClient extends KafkaSaslClient
long routedId,
String topic,
List<String> configs,
List<KafkaServerConfig> servers,
KafkaSaslConfig sasl)
{
super(sasl, originId, routedId);
this.topic = requireNonNull(topic);
this.configs = new LinkedHashMap<>(configs.size());
this.servers = servers;
configs.forEach(c -> this.configs.put(c, null));

this.encoder = sasl != null ? encodeSaslHandshakeRequest : encodeDescribeRequest;
Expand Down Expand Up @@ -1180,8 +1193,27 @@ private void doNetworkBegin(
{
state = KafkaState.openingInitial(state);

Consumer<OctetsFW.Builder> extension = EMPTY_EXTENSION;

final int randomServerId = RANDOM_SERVER_ID_GENERATOR.nextInt(servers.size() + 1);
final KafkaServerConfig kafkaServerConfig = servers.get(randomServerId);

if (kafkaServerConfig != null)
{
extension = e -> e.set((b, o, l) -> proxyBeginExRW.wrap(b, o, l)
.typeId(proxyTypeId)
.address(a -> a.inet(i -> i.protocol(p -> p.set(STREAM))
.source("0.0.0.0")
.destination(kafkaServerConfig.host)
.sourcePort(0)
.destinationPort(kafkaServerConfig.port)))
.infos(i -> i.item(ii -> ii.authority(kafkaServerConfig.host)))
.build()
.sizeof());
}

network = newStream(this::onNetwork, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization, affinity, EMPTY_EXTENSION);
traceId, authorization, affinity, extension);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import java.util.function.Supplier;
Expand All @@ -45,6 +46,7 @@
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.kafka.config.KafkaSaslConfig;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig;
Expand Down Expand Up @@ -147,6 +149,7 @@ public final class KafkaClientGroupFactory extends KafkaClientSaslHandshaker imp
private static final String GROUP_MIN_SESSION_TIMEOUT = "group.min.session.timeout.ms";
private static final String GROUP_MAX_SESSION_TIMEOUT = "group.max.session.timeout.ms";
private static final byte GROUP_KEY_TYPE = 0x00;
private static final Random RANDOM_SERVER_ID_GENERATOR = new Random();
private static final DirectBuffer EMPTY_BUFFER = new UnsafeBuffer();
private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(EMPTY_BUFFER, 0, 0);
private static final Consumer<OctetsFW.Builder> EMPTY_EXTENSION = ex -> {};
Expand Down Expand Up @@ -380,6 +383,7 @@ public MessageConsumer newStream(
protocol,
timeout,
groupMembership,
binding.servers(),
sasl);
newStream = newGroup::onStream;

Expand Down Expand Up @@ -1223,6 +1227,7 @@ private final class KafkaGroupStream
private final DescribeClient describeClient;
private final CoordinatorClient coordinatorClient;
private final GroupMembership groupMembership;
private final List<KafkaServerConfig> servers;
private final String groupId;
private final String protocol;
private final long resolvedId;
Expand Down Expand Up @@ -1266,6 +1271,7 @@ private final class KafkaGroupStream
String protocol,
int timeout,
GroupMembership groupMembership,
List<KafkaServerConfig> servers,
KafkaSaslConfig sasl)
{
this.sender = sender;
Expand All @@ -1279,6 +1285,7 @@ private final class KafkaGroupStream
this.timeout = timeout;
this.resolvedId = resolvedId;
this.groupMembership = groupMembership;
this.servers = servers;
this.clusterClient = new ClusterClient(routedId, resolvedId, sasl, this);
this.describeClient = new DescribeClient(routedId, resolvedId, sasl, this);
this.coordinatorClient = new CoordinatorClient(routedId, resolvedId, sasl, this);
Expand Down Expand Up @@ -1996,8 +2003,27 @@ private void doNetworkBegin(

state = KafkaState.openingInitial(state);

Consumer<OctetsFW.Builder> extension = EMPTY_EXTENSION;

final int randomServerId = RANDOM_SERVER_ID_GENERATOR.nextInt(delegate.servers.size() + 1);
final KafkaServerConfig kafkaServerConfig = delegate.servers.get(randomServerId);

if (kafkaServerConfig != null)
{
extension = e -> e.set((b, o, l) -> proxyBeginExRW.wrap(b, o, l)
.typeId(proxyTypeId)
.address(a -> a.inet(i -> i.protocol(p -> p.set(STREAM))
.source("0.0.0.0")
.destination(kafkaServerConfig.host)
.sourcePort(0)
.destinationPort(kafkaServerConfig.port)))
.infos(i -> i.item(ii -> ii.authority(kafkaServerConfig.host)))
.build()
.sizeof());
}

network = newStream(this::onNetwork, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization, affinity, EMPTY_EXTENSION);
traceId, authorization, affinity, extension);
}

@Override
Expand Down
Loading