-
Notifications
You must be signed in to change notification settings - Fork 1.7k
GH-3067: Spring Kafka support multiple headers with same key. #3874
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 18 commits
c7e5e09
42010c6
dfdfb8d
865b26a
019efb7
065bedc
6aed15b
462fe25
b3f4374
34e6860
dd24248
13267dc
c9c360e
4a762bd
b5375d4
c945dd5
07a49df
732ae6a
585f356
667f76c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -179,3 +179,81 @@ MessagingMessageConverter converter() { | |
|
||
If using Spring Boot, it will auto configure this converter bean into the auto-configured `KafkaTemplate`; otherwise you should add this converter to the template. | ||
|
||
[[multi-value-header]] | ||
== Support multi-value header | ||
|
||
Spring for Apache Kafka 4.0 supports multi-value header where same logical header key appears more than once in a Kafka record. | ||
|
||
By default, when multiple headers share the same name, the `HeaderMapper` treats them as a single value and serialises the collection to JSON. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This sentence does not reflect reality. The message headers abstraction doesn’t have multi-value headers notion. |
||
|
||
* **Producer side:** `DefaultKafkaHeaderMapper` writes the JSON bytes, while `SimpleKafkaHeaderMapper` ignore it. | ||
artembilan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* **Consumer side:** the mapper exposes the header as a single value—the **last occurrence wins**; earlier duplicates are silently discarded. | ||
artembilan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
For example, on the producer side: | ||
[source, java] | ||
---- | ||
// For Producer Side | ||
Message<String> message = MessageBuilder | ||
.withPayload("test-multi-value-header") | ||
.setHeader("test-multi-value-header", List.of("test-value1", "test-value2")) | ||
.build(); | ||
|
||
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper(); | ||
RecordHeaders mappedHeaders = new RecordHeaders(); | ||
headerMapper.fromHeaders(message.getHeaders(), mappedHeaders); | ||
|
||
ObjectMapper objectMapper = new ObjectMapper(); | ||
RecordHeader expectedHeader = new RecordHeader("test-multi-value-header", | ||
objectMapper.writeValueAsBytes(List.of("test-value1", "test-value2"))); | ||
assertThat(mappedHeaders.headers(multiValueHeader1).iterator().next()).isEqualTo(expectedHeader); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, @artembilan. I was basically thinking about a simple way to test an end-to-end scenario that is very obvious from the test itself. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need test in the docs? |
||
---- | ||
|
||
For example, on the consumer side: | ||
[source, java] | ||
---- | ||
// Consumer Side | ||
RecordHeaders recordHeaders = new RecordHeaders(); | ||
recordHeaders.add("test-multi-value1", new byte[] { 0, 0, 0, 1 }); | ||
recordHeaders.add("test-multi-value1", new byte[] { 0, 0, 0, 2 }); | ||
|
||
Map<String, Object> mappedHeaders = new HashMap<>(); | ||
headerMapper.toHeaders(recordHeaders, mappedHeaders); | ||
|
||
assertThat(headersMapped.get("test-multi-value1")).isEqualTo(new byte[] { 0, 0, 0, 2 }); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would you mind to double check this request. Thanks There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Anything that makes it more complex than it needs to be, let's not do it. I wasn't thinking about showing test assertions on the docs either. |
||
---- | ||
|
||
Preserving each individual header requires explicit registration of patterns that designate the header as multi‑valued. | ||
|
||
`DefaultKafkaHeaderMapper#setMultiValueHeaderPatterns(String... patterns)` accepts a list of patterns, which can be either wildcard expressions or exact header names. | ||
|
||
[source, java] | ||
---- | ||
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); | ||
|
||
// Explicit header names | ||
mapper.setMultiValueHeaderPatterns("test-multi-value1", "test-multi-value2"); | ||
|
||
// Wildcard patterns for test-multi-value1, test-multi-value2 | ||
mapper.setMultiValueHeaderPatterns("test-multi-*"); | ||
---- | ||
|
||
Any header whose name matches one of the supplied patterns is | ||
|
||
* **Producer side:** written as separate Kafka headers, one per element. | ||
* **Consumer side:** reconstructed as a `List<?>` and returned unchanged to the application. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I’m not sure about this “unchanged” statement. |
||
|
||
[NOTE] | ||
==== | ||
Regular expressions are *not* supported; only the `*` wildcard is allowed in simple patterns—supporting direct equality and forms such as: | ||
|
||
- `xxx*` | ||
- `*xxx` | ||
- `*xxx*` | ||
- `xxx*yyy` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see a value extracting it into a list section. Feels like continuation of the previous sentence is enough:
|
||
==== | ||
|
||
[IMPORTANT] | ||
==== | ||
All elements collected under the same multi‑value header **must be of the same Java type**. | ||
Mixing, for example, `String` and `byte[]` values under a single header key is not supported and will lead to a conversion error. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is true only if producer side is our |
||
==== |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,3 +70,9 @@ Several deprecated items have been removed: | |
|
||
Spring for Apache Kafka 4.0 supports Kafka 4.0’s new consumer rebalance protocol - https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[KIP-848]. | ||
For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-rebalalcne-protocol[New Consumer Rebalace Protocol docs]. | ||
|
||
[[x40-multi-value-header]] | ||
=== Support multi-value header | ||
|
||
The `DefaultKafkaHeaderMapper` and `SimpleKafkaHeaderMapper` supports multi-value header for Kafka Record. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since you use several objects , no need in a singular verb. Plus looks like “mapping” word is missed after “header”. |
||
More details are available in xref:kafka/headers.adoc#multi-value-header[Support multi-value header]. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Usually we say
Starting with 4.0,
. Otherwise it sounds like in5.0
there won’t be such a support 😜.Plus no need to mention our project name in every sentence since it is kinda obvious that this doc is about Spring for Apache Kafka.