-
Notifications
You must be signed in to change notification settings - Fork 68
Support extract-key kafka message transform #1183
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| .schema() | ||
| .version("latest") | ||
| .subject(subject) | ||
| .subject(message.payload.name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be consistent with the others, either subject everywhere or message.payload.name everywhere.
The value of subject is passed as part of the URL to access schema registry.
/subjects/{0}/versions/{1}
So this should probably be based on the channel address topic with -value suffix, rather than relying on any naming convention alignment between message payload name and channel address (kafka topic).
Note: given that channels (kafka topics) can have more than one message type in the general case, seems like we actually want to potentially use topic-record name strategy instead of topic name strategy to differentiate the schemas for different channel (topic) messages.
In that case, it would seem to make sense to base the record part of the topic-record name strategy on the message name within the channel messages.
| KafkaTopicTransformsConfigBuilder<KafkaTopicTransformsConfig> topicBuilder = KafkaTopicTransformsConfig.builder(); | ||
|
|
||
| String extractKey = object.containsKey(EXTRACT_KEY_NAME) | ||
| ? object.getString(EXTRACT_KEY_NAME) | ||
| : null; | ||
| topicBuilder.extractKey(extractKey); | ||
|
|
||
| JsonObject headers = object.containsKey(EXTRACT_HEADERS_NAME) ? object.getJsonObject(EXTRACT_HEADERS_NAME) : null; | ||
|
|
||
| if (headers != null) | ||
| { | ||
| for (Map.Entry<String, JsonValue> entry : headers.entrySet()) | ||
| { | ||
| JsonString jsonString = (JsonString) entry.getValue(); | ||
| topicBuilder.extractHeader(entry.getKey(), jsonString.getString()); | ||
| } | ||
| } | ||
|
|
||
| return topicBuilder.build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| KafkaTopicTransformsConfigBuilder<KafkaTopicTransformsConfig> topicBuilder = KafkaTopicTransformsConfig.builder(); | |
| String extractKey = object.containsKey(EXTRACT_KEY_NAME) | |
| ? object.getString(EXTRACT_KEY_NAME) | |
| : null; | |
| topicBuilder.extractKey(extractKey); | |
| JsonObject headers = object.containsKey(EXTRACT_HEADERS_NAME) ? object.getJsonObject(EXTRACT_HEADERS_NAME) : null; | |
| if (headers != null) | |
| { | |
| for (Map.Entry<String, JsonValue> entry : headers.entrySet()) | |
| { | |
| JsonString jsonString = (JsonString) entry.getValue(); | |
| topicBuilder.extractHeader(entry.getKey(), jsonString.getString()); | |
| } | |
| } | |
| return topicBuilder.build(); | |
| KafkaTopicTransformsConfigBuilder<KafkaTopicTransformsConfig> transforms = KafkaTopicTransformsConfig.builder(); | |
| if (object.containsKey(EXTRACT_KEY_NAME)) | |
| { | |
| String extractKey = object.getString(EXTRACT_KEY_NAME); | |
| transforms.extractKey(extractKey); | |
| } | |
| if (object.containsKey(EXTRACT_HEADERS_NAME)) | |
| { | |
| JsonObject headers = object.getJsonObject(EXTRACT_HEADERS_NAME); | |
| for (Map.Entry<String, JsonValue> entry : headers.entrySet()) | |
| { | |
| String headerName = entry.getKey(); | |
| JsonString headerValue = (JsonString) entry.getValue(); | |
| transforms.extractHeader(headerName, headerValue.getString()); | |
| } | |
| } | |
| return transforms.build(); |
| "{" + | ||
| "\"correlation-id\": \"${message.value.correlationId}\"" + | ||
| "}" + | ||
| "]" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Include extract-key here please.
| .header("correlation-id", "${message.value.correlationId}") | ||
| .transforms() | ||
| .extractHeader("correlation-id", "${message.value.correlationId}") | ||
| .build() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Include extract-key here please.
| "deprecated": true | ||
| }, | ||
| "headers": | ||
| "transforms": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sure you have caught up to latest develop, I think this change is already present there.
| .partition(0, 2, 8) | ||
| .header("header1", "value1") | ||
| .header("correlation-id", "12345") | ||
| .header("correlation-id", "1234") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious, what prompted this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The script we are using has 4 bytes key1 and the TestConverter was returning 12345 which has 5 bytes and it was causing testware issue. I just changed TestConverter to return 1234 to match number of bytes in server script.
| private final ModelConfigAdapter converter = new ModelConfigAdapter(); | ||
| private final KafkaTopicTransformsConfigAdapter transformsConverter = new KafkaTopicTransformsConfigAdapter(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| private final ModelConfigAdapter converter = new ModelConfigAdapter(); | |
| private final KafkaTopicTransformsConfigAdapter transformsConverter = new KafkaTopicTransformsConfigAdapter(); | |
| private final ModelConfigAdapter model = new ModelConfigAdapter(); | |
| private final KafkaTopicTransformsConfigAdapter transforms = new KafkaTopicTransformsConfigAdapter(); |
These are not converters (means something else for us too) plus probably no need to include the type in the name.
| ConverterHandler convertValue, | ||
| boolean verbose, | ||
| List<KafkaTopicHeaderType> headerTypes) | ||
| KafkaTopicTransformsConfig transforms) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is assuming we can specify either extract-key or extract-headers only once in the transforms array, right?
|
|
||
| convertKey.extracted(transforms.extractKey, writeKey); | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is assuming that extract-key is always processed after extract-key, which might be fine for now if we cannot extract the key from a header yet, but probably not sufficient for the general case later where these transforms are expected to execute in the order defined, the output of the previous feeding into the input of the next.
| private AsyncapiChannelResolver channels; | ||
| private AsyncapiMessageResolver messages; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| private AsyncapiChannelResolver channels; | |
| private AsyncapiMessageResolver messages; | |
| private final AsyncapiChannelResolver channels; | |
| private final AsyncapiMessageResolver messages; |
Description
Support extract-key kafka message transform.
Fixes #1176