|
14 | 14 | */ |
15 | 15 | package io.aklivity.zilla.runtime.binding.asyncapi.internal; |
16 | 16 |
|
| 17 | +import java.net.URI; |
| 18 | +import java.util.Map; |
| 19 | +import java.util.stream.Collectors; |
17 | 20 |
|
18 | 21 | import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig; |
19 | 22 | import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi; |
| 23 | +import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiMessage; |
20 | 24 | import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiOperation; |
21 | 25 | import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiChannelView; |
| 26 | +import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiMessageView; |
22 | 27 | import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView; |
23 | | -import io.aklivity.zilla.runtime.binding.http.config.HttpRequestConfig; |
24 | 28 | import io.aklivity.zilla.runtime.binding.kafka.config.KafkaOptionsConfig; |
25 | 29 | import io.aklivity.zilla.runtime.binding.kafka.config.KafkaOptionsConfigBuilder; |
26 | 30 | import io.aklivity.zilla.runtime.binding.kafka.config.KafkaSaslConfig; |
27 | 31 | import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig; |
| 32 | +import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicConfig; |
| 33 | +import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicConfigBuilder; |
28 | 34 | import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; |
| 35 | +import io.aklivity.zilla.runtime.engine.config.CatalogedConfigBuilder; |
29 | 36 | import io.aklivity.zilla.runtime.engine.config.KindConfig; |
30 | 37 | import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; |
31 | | - |
32 | | -import java.net.URI; |
33 | | -import java.util.stream.Collectors; |
| 38 | +import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig; |
34 | 39 |
|
35 | 40 | public class AyncapiKafkaProtocol extends AsyncapiProtocol |
36 | 41 | { |
@@ -82,7 +87,7 @@ public <C> BindingConfigBuilder<C> injectProtocolClientOptions( |
82 | 87 | .password(sasl.password) |
83 | 88 | .build() |
84 | 89 | .inject(this::injectKafkaServerOptions) |
85 | | - .inject(this::injectKafkaBootstrapOptions) |
| 90 | + //.inject(this::injectKafkaBootstrapOptions) |
86 | 91 | .inject(this::injectKafkaTopicOptions) |
87 | 92 | .build(); |
88 | 93 | } |
@@ -124,24 +129,56 @@ private <C> KafkaOptionsConfigBuilder<C> injectKafkaTopicOptions( |
124 | 129 | { |
125 | 130 | AsyncapiOperation operation = asyncApi.operations.get(name); |
126 | 131 | AsyncapiChannelView channel = AsyncapiChannelView.of(asyncApi.channels, operation.channel); |
127 | | - String topic = channel.address().replaceAll("\\{[^}]+\\}", "*"); |
128 | | - |
| 132 | + String topic = channel.address(); |
129 | 133 |
|
130 | | - |
131 | | - |
132 | | - HttpRequestConfig.Method method = HttpRequestConfig.Method.valueOf(operation.bindings.get("http").method); |
133 | 134 | if (channel.messages() != null && !channel.messages().isEmpty() || |
134 | 135 | channel.parameters() != null && !channel.parameters().isEmpty()) |
135 | 136 | { |
136 | 137 | options |
137 | | - .request() |
138 | | - .path(path) |
139 | | - .method(method) |
140 | | - .inject(request -> injectContent(request, channel.messages())) |
141 | | - .inject(request -> injectPathParams(request, channel.parameters())) |
| 138 | + .topic(KafkaTopicConfig::builder) |
| 139 | + .name(topic) |
| 140 | + .inject(topicConfig -> injectValue(topicConfig, channel.messages())) |
| 141 | + .build() |
142 | 142 | .build(); |
143 | 143 | } |
144 | 144 | } |
145 | 145 | return options; |
146 | 146 | } |
| 147 | + |
| 148 | + private <C> KafkaTopicConfigBuilder<C> injectValue( |
| 149 | + KafkaTopicConfigBuilder<C> topic, |
| 150 | + Map<String, AsyncapiMessage> messages) |
| 151 | + { |
| 152 | + if (messages != null) |
| 153 | + { |
| 154 | + if (hasJsonContentType()) |
| 155 | + { |
| 156 | + topic |
| 157 | + .value(JsonModelConfig::builder) |
| 158 | + .catalog() |
| 159 | + .name(INLINE_CATALOG_NAME) |
| 160 | + .inject(catalog -> injectSchemas(catalog, messages)) |
| 161 | + .build() |
| 162 | + .build(); |
| 163 | + } |
| 164 | + } |
| 165 | + return topic; |
| 166 | + } |
| 167 | + |
| 168 | + private <C> CatalogedConfigBuilder<C> injectSchemas( |
| 169 | + CatalogedConfigBuilder<C> catalog, |
| 170 | + Map<String, AsyncapiMessage> messages) |
| 171 | + { |
| 172 | + for (String name : messages.keySet()) |
| 173 | + { |
| 174 | + AsyncapiMessageView message = AsyncapiMessageView.of(asyncApi.components.messages, messages.get(name)); |
| 175 | + String subject = message.refKey() != null ? message.refKey() : name; |
| 176 | + catalog |
| 177 | + .schema() |
| 178 | + .subject(subject) |
| 179 | + .build() |
| 180 | + .build(); |
| 181 | + } |
| 182 | + return catalog; |
| 183 | + } |
147 | 184 | } |
0 commit comments