Skip to content

Commit 760e698

Browse files
authored
fix: kafka header extraction expression in composite zilla.yaml (#1362)
1 parent 811415b commit 760e698

File tree

10 files changed

+118
-45
lines changed

10 files changed

+118
-45
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2021-2024 Aklivity Inc.
3+
*
4+
* Aklivity licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.aklivity.zilla.runtime.binding.kafka.config;
17+
18+
public class KafkaTopicHeaderConfig
19+
{
20+
public final String name;
21+
public final String path;
22+
23+
public KafkaTopicHeaderConfig(
24+
String name,
25+
String path)
26+
{
27+
this.name = name;
28+
this.path = path;
29+
}
30+
}

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/config/KafkaTopicTransformsConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class KafkaTopicTransformsConfig
2222
{
2323
public final String extractKey;
2424

25-
public final List<KafkaTopicHeaderType> extractHeaders;
25+
public final List<KafkaTopicHeaderConfig> extractHeaders;
2626

2727
public static KafkaTopicTransformsConfigBuilder<KafkaTopicTransformsConfig> builder()
2828
{
@@ -37,7 +37,7 @@ public static <T> KafkaTopicTransformsConfigBuilder<T> builder(
3737

3838
KafkaTopicTransformsConfig(
3939
String extractKey,
40-
List<KafkaTopicHeaderType> extractHeaders)
40+
List<KafkaTopicHeaderConfig> extractHeaders)
4141
{
4242
this.extractKey = extractKey;
4343
this.extractHeaders = extractHeaders;

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/config/KafkaTopicTransformsConfigBuilder.java

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,20 @@
1818
import java.util.ArrayList;
1919
import java.util.List;
2020
import java.util.function.Function;
21-
import java.util.regex.Matcher;
22-
import java.util.regex.Pattern;
2321

2422
import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;
2523

2624
public final class KafkaTopicTransformsConfigBuilder<T> extends ConfigBuilder<T, KafkaTopicTransformsConfigBuilder<T>>
2725
{
28-
private static final String PATH = "^\\$\\{message\\.(key|value)\\.([A-Za-z_][A-Za-z0-9_]*)\\}$";
29-
private static final Pattern PATH_PATTERN = Pattern.compile(PATH);
30-
private static final String INTERNAL_VALUE = "$.%s";
31-
private static final String INTERNAL_PATH = "^\\$\\..*$";
32-
private static final Pattern INTERNAL_PATH_PATTERN = Pattern.compile(INTERNAL_PATH);
33-
34-
private final Matcher matcher;
35-
private final Matcher internalMatcher;
3626
private final Function<KafkaTopicTransformsConfig, T> mapper;
3727
private String extractKey;
38-
private List<KafkaTopicHeaderType> extractHeaders;
28+
private List<KafkaTopicHeaderConfig> extractHeaders;
3929

4030
KafkaTopicTransformsConfigBuilder(
4131
Function<KafkaTopicTransformsConfig, T> mapper)
4232
{
4333
this.mapper = mapper;
4434
this.extractHeaders = new ArrayList<>();
45-
this.matcher = PATH_PATTERN.matcher("");
46-
this.internalMatcher = INTERNAL_PATH_PATTERN.matcher("");
4735
}
4836

4937
@Override
@@ -56,14 +44,13 @@ protected Class<KafkaTopicTransformsConfigBuilder<T>> thisType()
5644
public KafkaTopicTransformsConfigBuilder<T> extractKey(
5745
String extractKey)
5846
{
59-
this.extractKey = extractKey != null && matcher.reset(extractKey).matches()
60-
? String.format(INTERNAL_VALUE, matcher.group(2))
61-
: extractKey;
47+
this.extractKey = extractKey;
48+
6249
return this;
6350
}
6451

6552
public KafkaTopicTransformsConfigBuilder<T> extractHeaders(
66-
List<KafkaTopicHeaderType> extractHeaders)
53+
List<KafkaTopicHeaderConfig> extractHeaders)
6754
{
6855
if (extractHeaders != null)
6956
{
@@ -73,7 +60,7 @@ public KafkaTopicTransformsConfigBuilder<T> extractHeaders(
7360
}
7461

7562
public KafkaTopicTransformsConfigBuilder<T> extractHeader(
76-
KafkaTopicHeaderType header)
63+
KafkaTopicHeaderConfig header)
7764
{
7865
return extractHeader(header.name, header.path);
7966
}
@@ -86,15 +73,9 @@ public KafkaTopicTransformsConfigBuilder<T> extractHeader(
8673
{
8774
this.extractHeaders = new ArrayList<>();
8875
}
89-
if (matcher.reset(path).matches())
90-
{
91-
this.extractHeaders.add(new KafkaTopicHeaderType(name,
92-
String.format(INTERNAL_VALUE, matcher.group(2))));
93-
}
94-
else if (internalMatcher.reset(path).matches())
95-
{
96-
this.extractHeaders.add(new KafkaTopicHeaderType(name, path));
97-
}
76+
77+
this.extractHeaders.add(new KafkaTopicHeaderConfig(name, path));
78+
9879
return this;
9980
}
10081

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCachePartition.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@
6565
import org.agrona.io.DirectBufferInputStream;
6666
import org.agrona.io.ExpandableDirectBufferOutputStream;
6767

68-
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicHeaderType;
69-
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicTransformsConfig;
68+
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaTopicHeaderType;
69+
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaTopicTransformsType;
7070
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Array32FW;
7171
import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW;
7272
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight;
@@ -126,7 +126,7 @@ public final class KafkaCachePartition
126126

127127
private final Varint32FW varintRO = new Varint32FW();
128128
private final KafkaCachePaddedKeyFW.Builder paddedKeyRW = new KafkaCachePaddedKeyFW.Builder()
129-
.wrap(new UnsafeBuffer(new byte[8192]), 0, 8192);;
129+
.wrap(new UnsafeBuffer(new byte[8192]), 0, 8192);
130130
private final String32FW.Builder stringRW = new String32FW.Builder()
131131
.wrap(new UnsafeBuffer(new byte[256]), 0, 256);
132132
private final Varint32FW.Builder varintRW = new Varint32FW.Builder().wrap(new UnsafeBuffer(new byte[5]), 0, 5);
@@ -364,7 +364,7 @@ public void writeEntry(
364364
ConverterHandler convertKey,
365365
ConverterHandler convertValue,
366366
boolean verbose,
367-
KafkaTopicTransformsConfig transforms)
367+
KafkaTopicTransformsType transforms)
368368
{
369369
final int valueLength = value != null ? value.sizeof() : -1;
370370
writeEntryStart(context, traceId, bindingId, offset, entryMark, valueMark, timestamp, producerId, key,
@@ -391,7 +391,7 @@ public void writeEntryStart(
391391
OctetsFW payload,
392392
ConverterHandler convertKey,
393393
ConverterHandler convertValue,
394-
KafkaTopicTransformsConfig transforms,
394+
KafkaTopicTransformsType transforms,
395395
boolean verbose)
396396
{
397397
assert offset > this.progress : String.format("%d > %d", offset, this.progress);
@@ -573,7 +573,7 @@ public void writeEntryFinish(
573573
ConverterHandler convertKey,
574574
ConverterHandler convertValue,
575575
boolean verbose,
576-
KafkaTopicTransformsConfig transforms)
576+
KafkaTopicTransformsType transforms)
577577
{
578578
final Node head = sentinel.previous;
579579
assert head != sentinel;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* License for the specific language governing permissions and limitations
1414
* under the License.
1515
*/
16-
package io.aklivity.zilla.runtime.binding.kafka.config;
16+
package io.aklivity.zilla.runtime.binding.kafka.internal.config;
1717

1818
public class KafkaTopicHeaderType
1919
{

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/config/KafkaTopicTransformsConfigAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import jakarta.json.JsonValue;
2525
import jakarta.json.bind.adapter.JsonbAdapter;
2626

27-
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicHeaderType;
27+
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicHeaderConfig;
2828
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicTransformsConfig;
2929
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicTransformsConfigBuilder;
3030

@@ -47,7 +47,7 @@ public JsonObject adaptToJson(
4747
if (transforms.extractHeaders != null && !transforms.extractHeaders.isEmpty())
4848
{
4949
JsonObjectBuilder headers = Json.createObjectBuilder();
50-
for (KafkaTopicHeaderType header : transforms.extractHeaders)
50+
for (KafkaTopicHeaderConfig header : transforms.extractHeaders)
5151
{
5252
headers.add(header.name, header.path);
5353
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2021-2024 Aklivity Inc.
3+
*
4+
* Aklivity licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.aklivity.zilla.runtime.binding.kafka.internal.config;
17+
18+
import java.util.List;
19+
20+
public class KafkaTopicTransformsType
21+
{
22+
public final String extractKey;
23+
24+
public final List<KafkaTopicHeaderType> extractHeaders;
25+
26+
KafkaTopicTransformsType(
27+
String extractKey,
28+
List<KafkaTopicHeaderType> extractHeaders)
29+
{
30+
this.extractKey = extractKey;
31+
this.extractHeaders = extractHeaders;
32+
}
33+
}

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/config/KafkaTopicType.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,34 @@
1515
*/
1616
package io.aklivity.zilla.runtime.binding.kafka.internal.config;
1717

18+
import static java.util.Collections.emptyList;
19+
20+
import java.util.List;
1821
import java.util.Optional;
1922
import java.util.regex.Matcher;
2023
import java.util.regex.Pattern;
2124

2225
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicConfig;
23-
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicHeaderType;
2426
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicTransformsConfig;
2527
import io.aklivity.zilla.runtime.engine.EngineContext;
2628
import io.aklivity.zilla.runtime.engine.model.ConverterHandler;
2729

2830
public class KafkaTopicType
2931
{
32+
private static final String TRANSFORM_PATH = "^\\$\\{message\\.(key|value)\\.([A-Za-z_][A-Za-z0-9_]*)\\}$";
33+
private static final Pattern TRANSFORM_PATH_PATTERN = Pattern.compile(TRANSFORM_PATH);
34+
private static final String TRANSFORM_INTERNAL_PATH = "$.%s";
35+
3036
public static final KafkaTopicType DEFAULT_TOPIC_TYPE = new KafkaTopicType();
3137

3238
public final ConverterHandler keyReader;
3339
public final ConverterHandler keyWriter;
3440
public final ConverterHandler valueReader;
3541
public final ConverterHandler valueWriter;
36-
public final KafkaTopicTransformsConfig transforms;
42+
public final KafkaTopicTransformsType transforms;
3743

3844
private final Matcher topicMatch;
45+
private final Matcher matcher;
3946

4047
private KafkaTopicType()
4148
{
@@ -45,14 +52,16 @@ private KafkaTopicType()
4552
this.valueReader = ConverterHandler.NONE;
4653
this.valueWriter = ConverterHandler.NONE;
4754
this.transforms = null;
55+
this.matcher = TRANSFORM_PATH_PATTERN.matcher("");
4856
}
4957

5058
public KafkaTopicType(
5159
EngineContext context,
5260
KafkaTopicConfig topicConfig)
5361
{
62+
this.matcher = TRANSFORM_PATH_PATTERN.matcher("");
5463
this.topicMatch = topicConfig.name != null ? asMatcher(topicConfig.name) : null;
55-
this.transforms = topicConfig.transforms;
64+
this.transforms = topicConfig.transforms != null ? transforms(topicConfig.transforms) : null;
5665
this.keyReader = Optional.ofNullable(topicConfig.key)
5766
.map(context::supplyReadConverter)
5867
.map(this::key)
@@ -100,6 +109,26 @@ private ConverterHandler headers(
100109
return handler;
101110
}
102111

112+
113+
private KafkaTopicTransformsType transforms(
114+
KafkaTopicTransformsConfig transforms)
115+
{
116+
String transformKey = Optional.ofNullable(transforms.extractKey)
117+
.filter(key -> matcher.reset(key).matches())
118+
.map(key -> String.format(TRANSFORM_INTERNAL_PATH, matcher.group(2)))
119+
.orElse(transforms.extractKey);
120+
121+
List<KafkaTopicHeaderType> transformHeaders = Optional.ofNullable(transforms.extractHeaders)
122+
.orElse(emptyList())
123+
.stream()
124+
.filter(header -> matcher.reset(header.path).matches())
125+
.map(header -> new KafkaTopicHeaderType(header.name,
126+
String.format(TRANSFORM_INTERNAL_PATH, matcher.group(2))))
127+
.toList();
128+
129+
return new KafkaTopicTransformsType(transformKey, transformHeaders);
130+
}
131+
103132
private static Matcher asMatcher(
104133
String topic)
105134
{

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerFetchFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.agrona.collections.MutableInteger;
4444
import org.agrona.concurrent.UnsafeBuffer;
4545

46-
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicTransformsConfig;
4746
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding;
4847
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration;
4948
import io.aklivity.zilla.runtime.binding.kafka.internal.cache.KafkaCache;
@@ -54,6 +53,7 @@
5453
import io.aklivity.zilla.runtime.binding.kafka.internal.cache.KafkaCacheTopic;
5554
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig;
5655
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig;
56+
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaTopicTransformsType;
5757
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaTopicType;
5858
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Array32FW;
5959
import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW;
@@ -486,7 +486,7 @@ final class KafkaCacheServerFetchFanout
486486
private final ConverterHandler convertValue;
487487
private final MutableInteger entryMark;
488488
private final MutableInteger valueMark;
489-
private final KafkaTopicTransformsConfig transforms;
489+
private final KafkaTopicTransformsType transforms;
490490

491491
private long leaderId;
492492
private long initialId;

runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/config/KafkaOptionsConfigAdapterTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ public void shouldReadHeadersOptions()
243243
assertThat(options, not(nullValue()));
244244
assertThat(options.bootstrap, equalTo(singletonList("test")));
245245
assertEquals(options.topics.get(0).transforms.extractHeaders.get(0).name, "correlation-id");
246-
assertEquals(options.topics.get(0).transforms.extractHeaders.get(0).path, "$.correlationId");
246+
assertEquals(options.topics.get(0).transforms.extractHeaders.get(0).path, "${message.value.correlationId}");
247247
}
248248

249249
@Test
@@ -264,6 +264,6 @@ public void shouldWriteHeadersOptions()
264264

265265
assertThat(text, not(nullValue()));
266266
assertThat(text, equalTo("{\"bootstrap\":[\"test\"],\"topics\":[{\"name\":\"test\",\"value\":\"test\"," +
267-
"\"transforms\":{\"extract-headers\":{\"correlation-id\":\"$.correlationId\"}}}]}"));
267+
"\"transforms\":{\"extract-headers\":{\"correlation-id\":\"${message.value.correlationId}\"}}}]}"));
268268
}
269269
}

0 commit comments

Comments
 (0)