Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
fa489e7
WIP
akrambek Aug 1, 2024
7713778
WIP
akrambek Aug 1, 2024
7ed2ec6
Updated config
akrambek Aug 1, 2024
ee671f7
Add parsing support
akrambek Aug 1, 2024
e3242cf
refactor fields
akrambek Aug 1, 2024
0698e57
Refactor field names
akrambek Aug 1, 2024
3ad9ba8
WIP
akrambek Aug 5, 2024
e52451d
WIP
akrambek Aug 6, 2024
2197302
WIP
akrambek Aug 6, 2024
ba14be5
WIP
akrambek Aug 6, 2024
d8575fe
WIP
akrambek Aug 7, 2024
c2cb7e1
WIP
akrambek Aug 7, 2024
34d2f1e
WIP
akrambek Aug 8, 2024
cf72d1c
WIP
akrambek Aug 8, 2024
86437bd
Revert back the change
akrambek Aug 8, 2024
bab4f44
WIP
akrambek Aug 8, 2024
eba79ab
Merge branch 'develop' into feature/transforms-key
akrambek Aug 8, 2024
5076c1f
WIP
akrambek Aug 8, 2024
1f20d82
WIP
akrambek Aug 8, 2024
b8b88ac
Clean up
akrambek Aug 8, 2024
09bff21
Refactor
akrambek Aug 8, 2024
0dff53c
Remove extra new line
akrambek Aug 8, 2024
2255013
WIP
akrambek Aug 8, 2024
a72421c
WIP
akrambek Aug 9, 2024
0d3e271
WIP
akrambek Aug 9, 2024
9c9afa7
WIP
akrambek Aug 9, 2024
f307f68
Remove unused import
jfallows Aug 9, 2024
84ca4b3
Resolve operation messages against channel
jfallows Aug 9, 2024
8842125
WIP
akrambek Aug 9, 2024
4573db0
WIP
akrambek Aug 9, 2024
babde6c
Merge branch 'develop' into feature/transforms-key
akrambek Aug 9, 2024
bd39f46
WIP
akrambek Aug 9, 2024
07a3f87
WIP
akrambek Aug 9, 2024
6d2189f
WIP
akrambek Aug 9, 2024
4726491
WIP
akrambek Aug 10, 2024
f582654
WIP
akrambek Aug 10, 2024
e8a892e
Make fields final
akrambek Aug 12, 2024
8ba9e96
Move transforms into writeEntry to get key hash
akrambek Aug 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import jakarta.json.bind.Jsonb;

import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiSchemaConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.config.AsyncapiBindingConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.config.AsyncapiCompositeConditionConfig;
Expand All @@ -37,6 +41,7 @@
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicConfig;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicConfigBuilder;
import io.aklivity.zilla.runtime.binding.tcp.config.TcpOptionsConfig;
import io.aklivity.zilla.runtime.catalog.inline.config.InlineOptionsConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.KindConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder;
Expand Down Expand Up @@ -89,7 +94,7 @@ private ClientNamespaceHelper(
AsyncapiSchemaConfig schema)
{
super(config, schema.apiLabel);
this.catalogs = new CatalogsHelper(schema);
this.catalogs = new ClientCatalogsHelper(schema);
this.bindings = new ClientBindingsHelper(schema);
}

Expand All @@ -101,6 +106,37 @@ protected <C> NamespaceConfigBuilder<C> injectComponents(
.inject(bindings::injectAll);
}

private final class ClientCatalogsHelper extends CatalogsHelper
{
private ClientCatalogsHelper(
AsyncapiSchemaConfig schema)
{
super(schema);
}

@Override
protected <C> void injectInlineSubject(
Jsonb jsonb,
InlineOptionsConfigBuilder<C> options,
AsyncapiMessageView message)
{
super.injectInlineSubject(jsonb, options, message);

if (message.bindings != null &&
message.bindings.kafka != null &&
message.bindings.kafka.key != null)
{
final String subject = "%s-key".formatted(message.channel.address);

options.schema()
.subject(subject)
.version("latest")
.schema(toSchemaJson(jsonb, message.bindings.kafka.key.model))
.build();
}
}
}

private final class ClientBindingsHelper extends BindingsHelper
{
private static final Pattern PARAMETERIZED_TOPIC_PATTERN = Pattern.compile(REGEX_ADDRESS_PARAMETER);
Expand Down Expand Up @@ -142,7 +178,7 @@ private <C> NamespaceConfigBuilder<C> injectProtocols(
.map(s -> s.protocol)
.distinct()
.map(protocols::get)
.filter(p -> p != null)
.filter(Objects::nonNull)
.forEach(p -> p.inject(namespace));

return namespace;
Expand All @@ -152,10 +188,8 @@ private <C> NamespaceConfigBuilder<C> injectTlsClient(
NamespaceConfigBuilder<C> namespace)
{
if (Stream.of(schema)
.map(s -> s.asyncapi)
.flatMap(v -> v.servers.stream())
.filter(s -> secure.contains(s.protocol))
.count() != 0L)
.map(s -> s.asyncapi)
.flatMap(v -> v.servers.stream()).anyMatch(s -> secure.contains(s.protocol)))
{
namespace
.binding()
Expand Down Expand Up @@ -204,6 +238,7 @@ private <C> NamespaceConfigBuilder<C> injectKafka(
NamespaceConfigBuilder<C> namespace)
{
return namespace
.inject(this::injectKafkaCache)
.binding()
.name("kafka_client0")
.type("kafka")
Expand Down Expand Up @@ -266,37 +301,41 @@ private <C> KafkaOptionsConfigBuilder<C> injectKafkaTopicOptions(
KafkaOptionsConfigBuilder<C> options)
{
List<KafkaTopicConfig> topics = config.options.kafka != null
? config.options.kafka.topics
: null;
? config.options.kafka.topics
: null;

if (topics != null)
{
Stream.of(schema)
.map(s -> s.asyncapi)
.flatMap(v -> v.operations.values().stream())
.filter(o -> o.channel.hasMessages() || o.channel.hasParameters())
.flatMap(o -> Stream.of(o.channel, o.reply != null ? o.reply.channel : null))
.filter(c -> c != null)
.forEach(channel ->
topics.stream()
.filter(t -> t.name.equals(channel.address))
.findFirst()
.ifPresent(topic ->
options
.topic()
.name(channel.address)
.transforms()
.extractHeaders(topic.transforms.extractHeaders)
.build()
.inject(t -> injectKafkaTopicKey(t, channel))
.inject(t -> injectKafkaTopicValue(t, channel))
.build()
.build()));
}
Stream.of(schema)
.map(s -> s.asyncapi)
.flatMap(v -> v.channels.values().stream())
.filter(c -> !PARAMETERIZED_TOPIC_PATTERN.matcher(c.address).find())
.distinct()
.forEach(channel ->
options.topic()
.name(channel.address)
.inject(t -> injectKafkaTopicTransforms(t, channel, topics))
.inject(t -> injectKafkaTopicKey(t, channel))
.inject(t -> injectKafkaTopicValue(t, channel))
.build());

return options;
}

private <C> KafkaTopicConfigBuilder<C> injectKafkaTopicTransforms(
KafkaTopicConfigBuilder<C> topic,
AsyncapiChannelView channel,
List<KafkaTopicConfig> topics)
{
Optional<KafkaTopicConfig> topicConfig = topics.stream()
.filter(t -> t.name.equals(channel.address))
.findFirst();
topicConfig.ifPresent(kafkaTopicConfig -> topic
.transforms()
.extractKey(kafkaTopicConfig.transforms.extractKey)
.extractHeaders(kafkaTopicConfig.transforms.extractHeaders)
.build());
return topic;
}

private <C> KafkaTopicConfigBuilder<C> injectKafkaTopicKey(
KafkaTopicConfigBuilder<C> topic,
AsyncapiChannelView channel)
Expand All @@ -311,9 +350,10 @@ private <C> KafkaTopicConfigBuilder<C> injectKafkaTopicKey(
.name("catalog0")
.schema()
.version("latest")
.subject(message.name)
.subject("%s-key".formatted(message.channel.address))
.build()
.build());
.build()
.build());
}
return topic;
}
Expand Down Expand Up @@ -414,8 +454,7 @@ private <C> NamespaceConfigBuilder<C> injectSseClient(
if (Stream.of(schema)
.map(s -> s.asyncapi)
.flatMap(v -> v.operations.values().stream())
.filter(AsyncapiOperationView::hasBindingsSse)
.count() != 0L)
.anyMatch(AsyncapiOperationView::hasBindingsSse))
{
namespace
.binding()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ protected final String resolveIdentity(
return value;
}

protected final class CatalogsHelper
protected class CatalogsHelper
{
private final AsyncapiSchemaConfig schema;

Expand Down Expand Up @@ -353,39 +353,9 @@ private <C> InlineOptionsConfigBuilder<C> injectInlineSubjects(
Stream.of(schema)
.map(s -> s.asyncapi)
.flatMap(v -> v.operations.values().stream())
.map(o -> o.channel)
.filter(c -> c.messages != null)
.flatMap(c -> c.messages.stream())
.forEach(m ->
{
if (m.payload != null)
{
final String subject = "%s-%s-payload".formatted(m.channel.name, m.name);

options.schema()
.subject(subject)
.version("latest")
.schema(toSchemaJson(jsonb, m.payload.model))
.build();
}

if (m.headers != null && m.headers.properties != null)
{
for (Map.Entry<String, AsyncapiSchemaView> header : m.headers.properties.entrySet())
{
final String name = header.getKey();
final AsyncapiSchemaItemView schema = header.getValue();

final String subject = "%s-%s-header-%s".formatted(m.channel.name, m.name, name);

options.schema()
.subject(subject)
.version("latest")
.schema(toSchemaJson(jsonb, schema.model))
.build();
}
}
});
.filter(o -> o.messages != null)
.flatMap(o -> o.messages.stream())
.forEach(m -> injectInlineSubject(jsonb, options, m));

Stream.of(schema)
.map(s -> s.asyncapi)
Expand Down Expand Up @@ -413,7 +383,39 @@ private <C> InlineOptionsConfigBuilder<C> injectInlineSubjects(
return options;
}

private static String toSchemaJson(
protected <C> void injectInlineSubject(
Jsonb jsonb,
InlineOptionsConfigBuilder<C> options,
AsyncapiMessageView message)
{
if (message.payload != null)
{
options.schema()
.subject("%s-%s-value".formatted(message.channel.name, message.name))
.version("latest")
.schema(toSchemaJson(jsonb, message.payload.model))
.build();
}

if (message.headers != null && message.headers.properties != null)
{
for (Map.Entry<String, AsyncapiSchemaView> header : message.headers.properties.entrySet())
{
final String name = header.getKey();
final AsyncapiSchemaItemView schema = header.getValue();

final String subject = "%s-header-%s".formatted(message.channel.address, name);

options.schema()
.subject(subject)
.version("latest")
.schema(toSchemaJson(jsonb, schema.model))
.build();
}
}
}

protected static String toSchemaJson(
Jsonb jsonb,
AsyncapiSchemaItem schema)
{
Expand Down Expand Up @@ -473,7 +475,7 @@ protected final void injectPayloadModel(
message.contentType != null &&
modelContentType.reset(message.contentType).matches())
{
final String subject = "%s-%s-payload".formatted(message.channel.name, message.name);
final String subject = "%s-value".formatted(message.channel.address);

switch (modelContentType.group(1))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ private <C> BindingConfigBuilder<C> injectHttpKafkaRoute(
final String httpMethod = httpOperation.bindings.http.method;
final String httpPath = httpChannel.address;

boolean async = httpChannel.messages.stream()
boolean async = httpOperation.messages.stream()
.anyMatch(m -> m.correlationId != null);

if (async)
Expand Down Expand Up @@ -796,9 +796,7 @@ private <C> HttpKafkaWithProduceConfigBuilder<C> injectHttpKafkaRouteProduceWith

produce.acks("in_sync_replicas").key(key);

AsyncapiChannelView httpChannel = httpOperation.channel;

httpChannel.messages.forEach(message ->
httpOperation.messages.forEach(message ->
{
if (message.correlationId != null)
{
Expand Down
Loading