Skip to content

Commit 071ceb6

Browse files
committed
add support for OpenapiAsyncapiProxy
1 parent f2e883c commit 071ceb6

File tree

1 file changed

+13
-0
lines changed

1 file changed

+13
-0
lines changed

runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/composite/OpenapiAsyncapiProxyGenerator.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.stream.Collectors;
2828

2929
import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiSchemaConfig;
30+
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiMessageView;
3031
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiOperationView;
3132
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiReplyView;
3233
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaConditionConfig;
@@ -213,10 +214,12 @@ private final class HttpKafkaBindingsHelper extends BindingsHelper
213214
private static final Pattern JSON_CONTENT_TYPE_PATTERN = Pattern.compile("^application/(?:.+\\+)?json$");
214215
private static final Pattern PARAMETER_PATTERN = Pattern.compile("\\{([^}]+)\\}");
215216
private static final Pattern CORRELATION_PATTERN = Pattern.compile(CORRELATION_ID);
217+
private static final Pattern CORRELATION_HEADERS_NAME = Pattern.compile("\\$message\\.header#\\/(.+)");
216218

217219
private final Matcher parameters = PARAMETER_PATTERN.matcher("");
218220
private final Matcher correlation = CORRELATION_PATTERN.matcher("");
219221
private final Matcher jsonContentType = JSON_CONTENT_TYPE_PATTERN.matcher("");
222+
private final Matcher correlationHeader = CORRELATION_HEADERS_NAME.matcher("");
220223

221224

222225
private final List<ProxyRouteHelper> httpKafkaRoutes;
@@ -483,6 +486,16 @@ private <C> HttpKafkaWithProduceConfigBuilder<C> injectHttpKafkaRouteProduceWith
483486
produce.replyTo(reply.channel.address);
484487
}
485488

489+
AsyncapiMessageView messageView = kafkaOperation.messages.get(0);
490+
if (messageView.correlationId != null && messageView.correlationId.location != null)
491+
{
492+
String correlationId = messageView.correlationId.location;
493+
if (correlationHeader.reset(correlationId).matches())
494+
{
495+
produce.correlationId(correlation.group(1));
496+
}
497+
}
498+
486499
return produce;
487500
}
488501

0 commit comments

Comments
 (0)