|
20 | 20 | import jakarta.json.bind.adapter.JsonbAdapter; |
21 | 21 |
|
22 | 22 | import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaCorrelationConfig; |
| 23 | +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaCorrelationConfigBuilder; |
23 | 24 | import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaIdempotencyConfig; |
24 | 25 | import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaOptionsConfig; |
25 | 26 | import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaOptionsConfigBuilder; |
@@ -127,19 +128,24 @@ public OptionsConfig adaptFromJson( |
127 | 128 |
|
128 | 129 | if (object.containsKey(CORRELATION_NAME)) |
129 | 130 | { |
130 | | - JsonObject correlation = object.getJsonObject(CORRELATION_NAME); |
131 | | - if (correlation.containsKey(CORRELATION_HEADERS_NAME)) |
| 131 | + JsonObject correlationJson = object.getJsonObject(CORRELATION_NAME); |
| 132 | + if (correlationJson.containsKey(CORRELATION_HEADERS_NAME)) |
132 | 133 | { |
133 | | - JsonObject headers = correlation.getJsonObject(CORRELATION_HEADERS_NAME); |
134 | | - |
135 | | - builder.correlation() |
136 | | - .replyTo(headers.containsKey(CORRELATION_HEADERS_REPLY_TO_NAME) |
137 | | - ? headers.getString(CORRELATION_HEADERS_REPLY_TO_NAME) |
138 | | - : null) |
139 | | - .correlationId(headers.containsKey(CORRELATION_HEADERS_CORRELATION_ID_NAME) |
140 | | - ? headers.getString(CORRELATION_HEADERS_CORRELATION_ID_NAME) |
141 | | - : null) |
142 | | - .build(); |
| 134 | + HttpKafkaCorrelationConfigBuilder<HttpKafkaOptionsConfigBuilder<HttpKafkaOptionsConfig>> |
| 135 | + correlation = builder.correlation(); |
| 136 | + JsonObject headers = correlationJson.getJsonObject(CORRELATION_HEADERS_NAME); |
| 137 | + |
| 138 | + if (headers.containsKey(CORRELATION_HEADERS_REPLY_TO_NAME)) |
| 139 | + { |
| 140 | + correlation.replyTo(headers.getString(CORRELATION_HEADERS_REPLY_TO_NAME)); |
| 141 | + } |
| 142 | + |
| 143 | + if (headers.containsKey(CORRELATION_HEADERS_CORRELATION_ID_NAME)) |
| 144 | + { |
| 145 | + correlation.correlationId(headers.getString(CORRELATION_HEADERS_CORRELATION_ID_NAME)); |
| 146 | + } |
| 147 | + |
| 148 | + correlation.build(); |
143 | 149 | } |
144 | 150 | } |
145 | 151 |
|
|
0 commit comments