2222import io .aklivity .zilla .runtime .binding .http .kafka .config .HttpKafkaCorrelationConfig ;
2323import io .aklivity .zilla .runtime .binding .http .kafka .config .HttpKafkaIdempotencyConfig ;
2424import io .aklivity .zilla .runtime .binding .http .kafka .config .HttpKafkaOptionsConfig ;
25+ import io .aklivity .zilla .runtime .binding .http .kafka .config .HttpKafkaOptionsConfigBuilder ;
2526import io .aklivity .zilla .runtime .binding .http .kafka .internal .HttpKafkaBinding ;
2627import io .aklivity .zilla .runtime .binding .http .kafka .internal .types .String16FW ;
2728import io .aklivity .zilla .runtime .binding .http .kafka .internal .types .String8FW ;
@@ -111,16 +112,16 @@ public JsonObject adaptToJson(
111112 public OptionsConfig adaptFromJson (
112113 JsonObject object )
113114 {
114- String header = null ;
115- String correlationId = null ;
116- String replyTo = null ;
115+ HttpKafkaOptionsConfigBuilder <HttpKafkaOptionsConfig > builder = HttpKafkaOptionsConfig .builder ();
117116
118117 if (object .containsKey (IDEMPOTENCY_NAME ))
119118 {
120119 JsonObject idempotency = object .getJsonObject (IDEMPOTENCY_NAME );
121120 if (idempotency .containsKey (IDEMPOTENCY_HEADER_NAME ))
122121 {
123- header = idempotency .getString (IDEMPOTENCY_HEADER_NAME );
122+ builder .idempotency ()
123+ .header (idempotency .getString (IDEMPOTENCY_HEADER_NAME ))
124+ .build ();
124125 }
125126 }
126127
@@ -131,26 +132,17 @@ public OptionsConfig adaptFromJson(
131132 {
132133 JsonObject headers = correlation .getJsonObject (CORRELATION_HEADERS_NAME );
133134
134- if (headers .containsKey (CORRELATION_HEADERS_REPLY_TO_NAME ))
135- {
136- replyTo = headers .getString (CORRELATION_HEADERS_REPLY_TO_NAME );
137- }
138-
139- if (headers .containsKey (CORRELATION_HEADERS_CORRELATION_ID_NAME ))
140- {
141- correlationId = headers .getString (CORRELATION_HEADERS_CORRELATION_ID_NAME );
142- }
135+ builder .correlation ()
136+ .replyTo (headers .containsKey (CORRELATION_HEADERS_REPLY_TO_NAME )
137+ ? headers .getString (CORRELATION_HEADERS_REPLY_TO_NAME )
138+ : null )
139+ .correlationId (headers .containsKey (CORRELATION_HEADERS_CORRELATION_ID_NAME )
140+ ? headers .getString (CORRELATION_HEADERS_CORRELATION_ID_NAME )
141+ : null )
142+ .build ();
143143 }
144144 }
145145
146- return HttpKafkaOptionsConfig .builder ()
147- .idempotency ()
148- .header (header )
149- .build ()
150- .correlation ()
151- .replyTo (replyTo )
152- .correlationId (correlationId )
153- .build ()
154- .build ();
146+ return builder .build ();
155147 }
156148}
0 commit comments