Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -545,9 +545,11 @@ private final class HttpKafkaBindingsHelper extends BindingsHelper

private static final Pattern HEADER_LOCATION_PATTERN = Pattern.compile("([^/]+)$");
private static final Pattern PARAMETER_PATTERN = Pattern.compile("\\{([^}]+)\\}");
private static final Pattern CORRELATION_HEADERS_NAME = Pattern.compile("\\$message\\.header#\\/(.+)");

private final Matcher headerLocation = HEADER_LOCATION_PATTERN.matcher("");
private final Matcher parameters = PARAMETER_PATTERN.matcher("");
private final Matcher correlation = CORRELATION_HEADERS_NAME.matcher("");

private final List<ProxyRouteHelper> httpKafkaRoutes;

Expand Down Expand Up @@ -823,6 +825,16 @@ private <C> HttpKafkaWithProduceConfigBuilder<C> injectHttpKafkaRouteProduceWith
produce.replyTo(kafkaOperation.reply.channel.address);
}

AsyncapiMessageView messageView = kafkaOperation.messages.get(0);
if (messageView.correlationId != null && messageView.correlationId.location != null)
{
String correlationId = messageView.correlationId.location;
if (correlation.reset(correlationId).matches())
{
produce.correlationId(correlation.group(1));
}
}

AsyncapiHttpKafkaOperationBinding httpKafkaBinding = httpOperation.bindings.httpKafka;
if (httpKafkaBinding != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package io.aklivity.zilla.runtime.binding.http.kafka.config;

import java.util.function.Function;

import io.aklivity.zilla.runtime.binding.http.kafka.internal.types.String16FW;

public final class HttpKafkaCorrelationConfig
Expand All @@ -28,4 +30,15 @@ public HttpKafkaCorrelationConfig(
this.replyTo = replyTo;
this.correlationId = correlationId;
}

public static HttpKafkaCorrelationConfigBuilder<HttpKafkaCorrelationConfig> builder()
{
return new HttpKafkaCorrelationConfigBuilder<>(HttpKafkaCorrelationConfig.class::cast);
}

public static <T> HttpKafkaCorrelationConfigBuilder<T> builder(
Function<HttpKafkaCorrelationConfig, T> mapper)
{
return new HttpKafkaCorrelationConfigBuilder<>(mapper);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2021-2024 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.http.kafka.config;

import java.util.function.Function;

import io.aklivity.zilla.runtime.binding.http.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;

public final class HttpKafkaCorrelationConfigBuilder<T> extends ConfigBuilder<T, HttpKafkaCorrelationConfigBuilder<T>>
{
private static final String16FW CORRELATION_HEADERS_REPLY_TO_DEFAULT = new String16FW("zilla:reply-to");
private static final String16FW CORRELATION_HEADERS_CORRELATION_ID_DEFAULT = new String16FW("zilla:correlation-id");

private final Function<HttpKafkaCorrelationConfig, T> mapper;

private String replyTo;
private String correlationId;

HttpKafkaCorrelationConfigBuilder(
Function<HttpKafkaCorrelationConfig, T> mapper)
{
this.mapper = mapper;
}

public HttpKafkaCorrelationConfigBuilder<T> replyTo(
String replyTo)
{
this.replyTo = replyTo;
return this;
}

public HttpKafkaCorrelationConfigBuilder<T> correlationId(
String correlationId)
{
this.correlationId = correlationId;
return this;
}

@Override
@SuppressWarnings("unchecked")
protected Class<HttpKafkaCorrelationConfigBuilder<T>> thisType()
{
return (Class<HttpKafkaCorrelationConfigBuilder<T>>) getClass();
}

@Override
public T build()
{
String16FW replyTo = this.replyTo != null
? new String16FW(this.replyTo)
: CORRELATION_HEADERS_REPLY_TO_DEFAULT;
String16FW correlationId = this.correlationId != null
? new String16FW(this.correlationId)
: CORRELATION_HEADERS_CORRELATION_ID_DEFAULT;
return mapper.apply(new HttpKafkaCorrelationConfig(replyTo, correlationId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package io.aklivity.zilla.runtime.binding.http.kafka.config;

import java.util.function.Function;

import io.aklivity.zilla.runtime.binding.http.kafka.internal.types.String8FW;

public final class HttpKafkaIdempotencyConfig
Expand All @@ -25,4 +27,15 @@ public HttpKafkaIdempotencyConfig(
{
this.header = header;
}

public static HttpKafkaIdempotencyConfigBuilder<HttpKafkaIdempotencyConfig> builder()
{
return new HttpKafkaIdempotencyConfigBuilder<>(HttpKafkaIdempotencyConfig.class::cast);
}

public static <T> HttpKafkaIdempotencyConfigBuilder<T> builder(
Function<HttpKafkaIdempotencyConfig, T> mapper)
{
return new HttpKafkaIdempotencyConfigBuilder<>(mapper);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2021-2024 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.http.kafka.config;

import java.util.function.Function;

import io.aklivity.zilla.runtime.binding.http.kafka.internal.types.String8FW;
import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;

public final class HttpKafkaIdempotencyConfigBuilder<T> extends ConfigBuilder<T, HttpKafkaIdempotencyConfigBuilder<T>>
{
private static final String8FW IDEMPOTENCY_HEADER_DEFAULT = new String8FW("idempotency-key");

private final Function<HttpKafkaIdempotencyConfig, T> mapper;

private String header;

HttpKafkaIdempotencyConfigBuilder(
Function<HttpKafkaIdempotencyConfig, T> mapper)
{
this.mapper = mapper;
}

public HttpKafkaIdempotencyConfigBuilder<T> header(
String header)
{
this.header = header;
return this;
}

@Override
@SuppressWarnings("unchecked")
protected Class<HttpKafkaIdempotencyConfigBuilder<T>> thisType()
{
return (Class<HttpKafkaIdempotencyConfigBuilder<T>>) getClass();
}

@Override
public T build()
{
String8FW header = this.header != null ? new String8FW(this.header) : IDEMPOTENCY_HEADER_DEFAULT;
return mapper.apply(new HttpKafkaIdempotencyConfig(header));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package io.aklivity.zilla.runtime.binding.http.kafka.config;

import java.util.function.Function;

import io.aklivity.zilla.runtime.engine.config.OptionsConfig;

public final class HttpKafkaOptionsConfig extends OptionsConfig
Expand All @@ -28,4 +30,15 @@ public HttpKafkaOptionsConfig(
this.idempotency = idempotency;
this.correlation = correlation;
}

public static HttpKafkaOptionsConfigBuilder<HttpKafkaOptionsConfig> builder()
{
return new HttpKafkaOptionsConfigBuilder<>(HttpKafkaOptionsConfig.class::cast);
}

public static <T> HttpKafkaOptionsConfigBuilder<T> builder(
Function<OptionsConfig, T> mapper)
{
return new HttpKafkaOptionsConfigBuilder<>(mapper);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2021-2024 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.http.kafka.config;

import java.util.function.Function;

import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;

public final class HttpKafkaOptionsConfigBuilder<T> extends ConfigBuilder<T, HttpKafkaOptionsConfigBuilder<T>>
{
private final Function<OptionsConfig, T> mapper;

private HttpKafkaCorrelationConfig correlation;
private HttpKafkaIdempotencyConfig idempotency;

HttpKafkaOptionsConfigBuilder(
Function<OptionsConfig, T> mapper)
{
this.mapper = mapper;
}

public HttpKafkaCorrelationConfigBuilder<HttpKafkaOptionsConfigBuilder<T>> correlation()
{
return new HttpKafkaCorrelationConfigBuilder<>(this::correlation);
}

public HttpKafkaIdempotencyConfigBuilder<HttpKafkaOptionsConfigBuilder<T>> idempotency()
{
return new HttpKafkaIdempotencyConfigBuilder<>(this::idempotency);
}

private HttpKafkaOptionsConfigBuilder<T> correlation(
HttpKafkaCorrelationConfig correlation)
{
this.correlation = correlation;
return this;
}

private HttpKafkaOptionsConfigBuilder<T> idempotency(
HttpKafkaIdempotencyConfig idempotency)
{
this.idempotency = idempotency;
return this;
}

@Override
@SuppressWarnings("unchecked")
protected Class<HttpKafkaOptionsConfigBuilder<T>> thisType()
{
return (Class<HttpKafkaOptionsConfigBuilder<T>>) getClass();
}

@Override
public T build()
{
return mapper.apply(new HttpKafkaOptionsConfig(idempotency, correlation));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public final class HttpKafkaWithProduceConfig
public final Optional<List<HttpKafkaWithProduceOverrideConfig>> overrides;
public final Optional<String> replyTo;
public final Optional<List<HttpKafkaWithProduceAsyncHeaderConfig>> async;
public final String16FW correlationId;

private final List<Matcher> asyncMatchers;

Expand All @@ -43,13 +44,15 @@ public final class HttpKafkaWithProduceConfig
String key,
List<HttpKafkaWithProduceOverrideConfig> overrides,
String replyTo,
String16FW correlationId,
List<HttpKafkaWithProduceAsyncHeaderConfig> async)
{
this.topic = topic;
this.acks = acks;
this.key = Optional.ofNullable(key);
this.overrides = Optional.ofNullable(overrides);
this.replyTo = Optional.ofNullable(replyTo);
this.correlationId = correlationId;
this.async = Optional.ofNullable(async);

this.asyncMatchers = this.async.isPresent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.function.Function;

import io.aklivity.zilla.runtime.binding.http.kafka.internal.types.KafkaAckMode;
import io.aklivity.zilla.runtime.binding.http.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;

public final class HttpKafkaWithProduceConfigBuilder<T> extends ConfigBuilder<T, HttpKafkaWithProduceConfigBuilder<T>>
Expand All @@ -31,6 +32,7 @@ public final class HttpKafkaWithProduceConfigBuilder<T> extends ConfigBuilder<T,
private List<HttpKafkaWithProduceOverrideConfig> overrides;
private String replyTo;
private List<HttpKafkaWithProduceAsyncHeaderConfig> async;
private String16FW correlationId;


HttpKafkaWithProduceConfigBuilder(
Expand Down Expand Up @@ -79,6 +81,13 @@ public HttpKafkaWithProduceConfigBuilder<T> replyTo(
return this;
}

public HttpKafkaWithProduceConfigBuilder<T> correlationId(
String correlationId)
{
this.correlationId = correlationId != null ? new String16FW(correlationId) : null;
return this;
}

public HttpKafkaWithProduceAsyncHeaderConfigBuilder<HttpKafkaWithProduceConfigBuilder<T>> async()
{
return HttpKafkaWithProduceAsyncHeaderConfig.builder(this::async);
Expand Down Expand Up @@ -115,7 +124,7 @@ protected Class<HttpKafkaWithProduceConfigBuilder<T>> thisType()
@Override
public T build()
{
return mapper.apply(new HttpKafkaWithProduceConfig(topic, acks, key, overrides, replyTo, async));
return mapper.apply(new HttpKafkaWithProduceConfig(topic, acks, key, overrides, replyTo, correlationId, async));
}

private HttpKafkaWithProduceConfigBuilder<T> override(
Expand Down
Loading
Loading