Skip to content
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
2c3cc82
Change cnd_timedwait_abs to take a monotonic clock value
emasab Feb 12, 2025
c67e1dd
Fix for a minimum latency of 500ms in case of leader change,
emasab Jan 20, 2025
8b15804
Fix flakiness in test 0068
emasab Jan 20, 2025
ea238ad
Fix flakiness in test 0086
emasab Jan 20, 2025
8833f85
Fix for test 0119, remove ACLs in the test that created them to preve…
emasab Jan 22, 2025
2cde9cd
Fix test 0126 memory leak
emasab Jan 23, 2025
fc9c30b
Remove brokers that aren't up from mock metadata response.
emasab Jan 22, 2025
938a73a
More tests on fast metadata refresh,
emasab Feb 12, 2025
e965b0c
Don't remove a topic from cache before its expiration in case of a te…
emasab Feb 3, 2025
9681671
Don't mark the topic as unknown even when it's already known and we'r…
emasab Feb 3, 2025
fd772dc
Only update partition leaders if the topic has no errors.
emasab Feb 3, 2025
f98fed0
Don't set errors other than TOPIC_AUTHORIZATION_FAILED as permanent t…
emasab Feb 3, 2025
47b9ae0
Fix for issuing a metadata refresh after offsets_for_times call faile…
emasab Jan 31, 2025
7338afc
Connection close debug logs
emasab Feb 3, 2025
bc758d7
Filter jmx output for improved test speed
emasab Feb 4, 2025
8156e81
Deprecate `api.version.request`, `api.version.fallback.ms` and
emasab Feb 4, 2025
1546c79
Fix for test 0084, avoid purging the rk_ops queue when terminating th…
emasab Feb 11, 2025
4b939c2
Avoid triggering rk_telemetry.termination_cnd when
emasab Feb 11, 2025
7364325
Fix flaky test 0059. Given the timestamp is so old it's possible that…
emasab Feb 12, 2025
39bdc0e
Test 0044: use name of the created topic
emasab Feb 13, 2025
ca2fbe3
Additional test fixes
emasab Feb 4, 2025
6d82eef
Fix flakiness in test 0061
emasab Feb 14, 2025
2d943d2
Fix flakiness with metadata propagation in test 0085
emasab Feb 14, 2025
d44e6d0
Fix flakiness in test 0102:
emasab Feb 18, 2025
debbc7d
Fix flakiness in test 0137: don't consider error count during read me…
emasab Feb 18, 2025
7e3310a
Subscription version to avoid stale metadata
emasab Feb 5, 2025
fb063f3
Use same strategy for updating cache partition metadata
emasab Feb 20, 2025
2115155
Allow unittests to complete before the timeout when using Valgrind
emasab Feb 21, 2025
68a9456
Fix for the case where a metadata refresh enqueued
emasab Feb 22, 2025
36b6312
Test exclusions when running against Apache Kafka >= 4.0
emasab Feb 26, 2025
671169f
Test 848 consumer group protocol in 4.0-rc0 with librdkafka
emasab Feb 28, 2025
52d531c
Fix flakiness in test 0001.
emasab Mar 10, 2025
c5fc667
Fix refcnt preventing final destruction of a broker.
emasab Mar 15, 2025
b7af99f
Fix flakiness in compaction test
emasab Mar 15, 2025
2625445
Fix for the case where an assert was failing
emasab Mar 19, 2025
2eea02e
Fix flakiness in test 0105: first the buffer times out and
emasab Mar 19, 2025
f52dc54
Avoid hinting partitions without requesting metadata
emasab Mar 24, 2025
6a6c6ba
Last AK RC, waiting for a trivup fix on scala
emasab Mar 25, 2025
f54e80f
Copyright updates
emasab Mar 25, 2025
e07ae46
Disabling test requiring KIP-848 TOPIC_AUTHORIZATION_FAILED handling …
emasab Mar 25, 2025
21d1596
[test 0105] Increase likelihood of TxnOffsetCommit
emasab Mar 27, 2025
7d562d5
Speed up fetch restart after a fetch error and an offset validation.
emasab Mar 26, 2025
c110479
[test 0146] Move start request tracking before first produce in
emasab Mar 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ blocks:
prologue:
commands:
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
- sem-version java 17
jobs:
- name: 'Build configuration checks'
commands:
Expand Down Expand Up @@ -141,7 +142,7 @@ blocks:
- make -j -C tests build
- make -C tests run_local_quick
- DESTDIR="$PWD/dest" make install
- (cd tests && python3 -m trivup.clusters.KafkaCluster --version 3.8.0 --cpversion 7.7.0 --cmd 'make quick')
- (cd tests && python3 -m trivup.clusters.KafkaCluster --version 3.9.0 --cpversion 7.9.0 --cmd 'make quick')
- name: 'Build and integration tests with "consumer" protocol'
commands:
- wget -O rapidjson-dev.deb https://launchpad.net/ubuntu/+archive/primary/+files/rapidjson-dev_1.1.0+dfsg2-3_all.deb
Expand All @@ -159,18 +160,13 @@ blocks:
- examples/rdkafka_example -X builtin.features
- ldd src/librdkafka.so.1
- ldd src-cpp/librdkafka++.so.1
- export TEST_CONSUMER_GROUP_PROTOCOL=consumer
- make -j -C tests build
- make -C tests run_local_quick
- DESTDIR="$PWD/dest" make install
- export TEST_KAFKA_VERSION=4.0.0
- export TEST_CONSUMER_GROUP_PROTOCOL=consumer
# This commit corresponds to the DescribeConfigs API change for KIP-848
# KAFKA-14510; Extend DescribeConfigs API to support group configs (#16859)
# https://github.com/apache/kafka/commit/3a0efa2845e6a0d237772adfe6364579af50ce18
# TODO: upgrade to 4.0 when released.
- (cd tests && python3 -m trivup.clusters.KafkaCluster --kraft --version 'trunk@3a0efa2845e6a0d237772adfe6364579af50ce18'
--cpversion 7.7.0 --conf '["group.coordinator.rebalance.protocols=classic,consumer"]'
- (cd tests && python3 -m trivup.clusters.KafkaCluster --kraft --version '4.0.0-rc4'
--cpversion 7.9.0
--cmd 'make quick')


Expand Down
127 changes: 114 additions & 13 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,123 @@
# librdkafka v2.8.3
# librdkafka v2.9.0

librdkafka v2.9.0 is a feature release:

* Commits during a cooperative incremental rebalance no longer cause a lost
assignment if the generation id was bumped in between (#4908).
* Fix for librdkafka yielding before timeouts had been reached (#4970)
* Removed a 500ms latency when a consumer partition switches to a different
leader (#4970)
* The mock cluster implementation removes brokers from Metadata response
when they're not available, this simulates better the actual behavior of
a cluster that is using KRaft (#4970).
* Doesn't remove topics from cache on temporary Metadata errors but only
on metadata cache expiry (#4970).
* Doesn't mark the topic as unknown if it had been marked as existent earlier
and `topic.metadata.propagation.max.ms` still hasn't passed (@marcin-krystianc, #4970).
* Doesn't update partition leaders if the topic in metadata
response has errors (#4970).
* Only topic authorization errors in a metadata response are considered
permanent and are returned to the user (#4970).
* The function `rd_kafka_offsets_for_times` refreshes leader information
if the error requires it, allowing it to succeed on
subsequent manual retries (#4970).
* Deprecated `api.version.request`, `api.version.fallback.ms` and
`broker.version.fallback` configuration properties (#4970).
* When the consumer is closed before destroying the client, the operations queue
isn't purged anymore, as it contains operations
unrelated to the consumer group (#4970).
* When making multiple changes to the consumer subscription in a short time,
no unknown topic error is returned for topics that are in the new subscription but weren't in the previous one (#4970).
* Prevent metadata cache corruption when topic id changes
(@kwdubuc, @marcin-krystianc, @GerKr, #4970).
* Fix for the case where a metadata refresh enqueued on an unreachable broker
prevents refreshing the controller or the coordinator until that broker
becomes reachable again (#4970).

librdkafka v2.8.3 is a maintenance release:

* Commits during a cooperative incremental rebalance aren't causing
an assignment lost if the generation id was bumped in between (#4908).
## Fixes

### General fixes

## Fixes
* Issues: #4970
librdkafka code using `cnd_timedwait` was yielding before a timeout occurred
without the condition being fulfilled because of spurious wake-ups.
Solved by verifying with a monotonic clock that the expected point in time
was reached and calling the function again if needed.
Happens since 1.x (#4970).
* Issues: #4970
Doesn't remove topics from cache on temporary Metadata errors but only
on metadata cache expiry. It allows the client to continue working
in case of temporary problems to the Kafka metadata plane.
Happens since 1.x (#4970).
* Issues: #4970
Doesn't mark the topic as unknown if it had been marked as existent earlier
and `topic.metadata.propagation.max.ms` still hasn't passed. This achieves
the property's expected effect even if a different broker had
previously reported the topic as existent.
Happens since 1.x (@marcin-krystianc, #4970).
* Issues: #4907
Doesn't update partition leaders if the topic in metadata
response has errors. It's in line with what Java client does and allows
to avoid segmentation faults for unknown partitions.
Happens since 1.x (#4970).
* Issues: #4970
Only topic authorization errors in a metadata response are considered
permanent and are returned to the user. It's in line with what Java client
does and avoids returning to the user an error that wasn't meant to be
permanent.
Happens since 1.x (#4970).
* Issues: #4964, #4778
Prevent metadata cache corruption when topic id for the same topic name
changes. Solved by correctly removing the entry with the old topic id from metadata cache
to prevent subsequent use-after-free.
Happens since 2.4.0 (@kwdubuc, @marcin-krystianc, @GerKr, #4970).
* Issues: #4970
Fix for the case where a metadata refresh enqueued on an unreachable broker
prevents refreshing the controller or the coordinator until that broker
becomes reachable again. Given the request continues to be retried on that
broker, the counter for refreshing complete broker metadata doesn't reach
zero and prevents the client from obtaining the new controller or group or transactional coordinator.
It causes a series of debug messages like:
"Skipping metadata request: ... full request already in-transit", until
the broker the request is enqueued on is up again.
Solved by not retrying these kinds of metadata requests.
Happens since 1.x (#4970).

### Consumer fixes

* Issues: #4059
Commits during a cooperative incremental rebalance could cause an
assignment lost if the generation id was bumped by a second join
group request.
Solved by not rejoining the group in case an illegal generation error happens
during a rebalance.
Happening since v1.6.0 (#4908)
* Issues: #4059
Commits during a cooperative incremental rebalance could cause an
assignment lost if the generation id was bumped by a second join
group request.
Solved by not rejoining the group in case an illegal generation error happens
during a rebalance.
Happening since v1.6.0 (#4908)
* Issues: #4970
When switching to a different leader a consumer could wait 500ms
(`fetch.error.backoff.ms`) before starting to fetch again. The fetch backoff wasn't reset when joining the new broker.
Solved by resetting it, given it's not needed to backoff
the first fetch on a different node. This way faster leader switches are
possible.
Happens since 1.x (#4970).
* Issues: #4970
The function `rd_kafka_offsets_for_times` refreshes leader information
if the error requires it, allowing it to succeed on
subsequent manual retries. Similar to the fix done in 2.3.0 in
`rd_kafka_query_watermark_offsets`. Additionally, the partition
current leader epoch is taken from metadata cache instead of
from passed partitions.
Happens since 1.x (#4970).
* Issues: #4970
When the consumer is closed before destroying the client, the operations queue
isn't purged anymore, as it contains operations
unrelated to the consumer group.
Happens since 1.x (#4970).
* Issues: #4970
When making multiple changes to the consumer subscription in a short time,
no unknown topic error is returned for topics that are in the new subscription
but weren't in the previous one. This was due to the metadata request
corresponding to the previous subscription.
Happens since 1.x (#4970).



Expand Down
6 changes: 3 additions & 3 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ resolve_cb | * | |
opaque | * | | | low | Application opaque (set with rd_kafka_conf_set_opaque()) <br>*Type: see dedicated API*
default_topic_conf | * | | | low | Default topic configuration for automatically subscribed topics <br>*Type: see dedicated API*
internal.termination.signal | * | 0 .. 128 | 0 | low | Signal that librdkafka will use to quickly terminate on rd_kafka_destroy(). If this signal is not set then there will be a delay before rd_kafka_wait_destroyed() returns true as internal threads are timing out their system calls. If this signal is set however the delay will be minimal. The application should mask this signal as an internal signal handler is installed. <br>*Type: integer*
api.version.request | * | true, false | true | high | Request broker's supported API versions to adjust functionality to available protocol features. If set to false, or the ApiVersionRequest fails, the fallback version `broker.version.fallback` will be used. **NOTE**: Depends on broker version >=0.10.0. If the request is not supported by (an older) broker the `broker.version.fallback` fallback is used. <br>*Type: boolean*
api.version.request | * | true, false | true | high | **DEPRECATED** **Post-deprecation actions: remove this configuration property, brokers < 0.10.0 won't be supported anymore in librdkafka 3.x.** Request broker's supported API versions to adjust functionality to available protocol features. If set to false, or the ApiVersionRequest fails, the fallback version `broker.version.fallback` will be used. **NOTE**: Depends on broker version >=0.10.0. If the request is not supported by (an older) broker the `broker.version.fallback` fallback is used. <br>*Type: boolean*
api.version.request.timeout.ms | * | 1 .. 300000 | 10000 | low | Timeout for broker API version requests. <br>*Type: integer*
api.version.fallback.ms | * | 0 .. 604800000 | 0 | medium | Dictates how long the `broker.version.fallback` fallback is used in the case the ApiVersionRequest fails. **NOTE**: The ApiVersionRequest is only issued when a new connection to the broker is made (such as after an upgrade). <br>*Type: integer*
broker.version.fallback | * | | 0.10.0 | medium | Older broker versions (before 0.10.0) provide no way for a client to query for supported protocol features (ApiVersionRequest, see `api.version.request`) making it impossible for the client to know what features it may use. As a workaround a user may set this property to the expected broker version and the client will automatically adjust its feature set accordingly if the ApiVersionRequest fails (or is disabled). The fallback broker version will be used for `api.version.fallback.ms`. Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. Any other value >= 0.10, such as 0.10.2.1, enables ApiVersionRequests. <br>*Type: string*
api.version.fallback.ms | * | 0 .. 604800000 | 0 | medium | **DEPRECATED** **Post-deprecation actions: remove this configuration property, brokers < 0.10.0 won't be supported anymore in librdkafka 3.x.** Dictates how long the `broker.version.fallback` fallback is used in the case the ApiVersionRequest fails. **NOTE**: The ApiVersionRequest is only issued when a new connection to the broker is made (such as after an upgrade). <br>*Type: integer*
broker.version.fallback | * | | 0.10.0 | medium | **DEPRECATED** **Post-deprecation actions: remove this configuration property, brokers < 0.10.0 won't be supported anymore in librdkafka 3.x.** Older broker versions (before 0.10.0) provide no way for a client to query for supported protocol features (ApiVersionRequest, see `api.version.request`) making it impossible for the client to know what features it may use. As a workaround a user may set this property to the expected broker version and the client will automatically adjust its feature set accordingly if the ApiVersionRequest fails (or is disabled). The fallback broker version will be used for `api.version.fallback.ms`. Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. Any other value >= 0.10, such as 0.10.2.1, enables ApiVersionRequests. <br>*Type: string*
allow.auto.create.topics | * | true, false | false | low | Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics. The broker must also be configured with `auto.create.topics.enable=true` for this configuration to take effect. Note: the default value (true) for the producer is different from the default value (false) for the consumer. Further, the consumer default value is different from the Java consumer (true), and this property is not supported by the Java producer. Requires broker version >= 0.11.0.0, for older broker versions only the broker configuration applies. <br>*Type: boolean*
security.protocol | * | plaintext, ssl, sasl_plaintext, sasl_ssl | plaintext | high | Protocol used to communicate with brokers. <br>*Type: enum value*
ssl.cipher.suites | * | | | low | A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. See manual page for `ciphers(1)` and `SSL_CTX_set_cipher_list(3). <br>*Type: string*
Expand Down
26 changes: 19 additions & 7 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -1017,8 +1017,7 @@ void rd_kafka_destroy_final(rd_kafka_t *rk) {
cnd_destroy(&rk->rk_init_cnd);
mtx_destroy(&rk->rk_init_lock);

if (rk->rk_full_metadata)
rd_kafka_metadata_destroy(&rk->rk_full_metadata->metadata);

rd_kafkap_str_destroy(rk->rk_client_id);
rd_kafkap_str_destroy(rk->rk_group_id);
rd_kafkap_str_destroy(rk->rk_eos.transactional_id);
Expand Down Expand Up @@ -2082,15 +2081,15 @@ static void rd_kafka_metadata_refresh_cb(rd_kafka_timers_t *rkts, void *arg) {
* @locks none
*/
static int rd_kafka_init_wait(rd_kafka_t *rk, int timeout_ms) {
struct timespec tspec;
int ret;
rd_ts_t abs_timeout;

rd_timeout_init_timespec(&tspec, timeout_ms);
abs_timeout = rd_timeout_init(timeout_ms);

mtx_lock(&rk->rk_init_lock);
while (rk->rk_init_wait_cnt > 0 &&
cnd_timedwait_abs(&rk->rk_init_cnd, &rk->rk_init_lock, &tspec) ==
thrd_success)
cnd_timedwait_abs(&rk->rk_init_cnd, &rk->rk_init_lock,
abs_timeout) == thrd_success)
;
ret = rk->rk_init_wait_cnt;
mtx_unlock(&rk->rk_init_lock);
Expand Down Expand Up @@ -3799,6 +3798,7 @@ static void rd_kafka_get_offsets_for_times_resp_cb(rd_kafka_t *rk,
rd_kafka_buf_t *request,
void *opaque) {
struct _get_offsets_for_times *state;
int actions = 0;

if (err == RD_KAFKA_RESP_ERR__DESTROY) {
/* 'state' has gone out of scope when offsets_for_times()
Expand All @@ -3809,10 +3809,22 @@ static void rd_kafka_get_offsets_for_times_resp_cb(rd_kafka_t *rk,
state = opaque;

err = rd_kafka_handle_ListOffsets(rk, rkb, err, rkbuf, request,
state->results, NULL);
state->results, &actions);
if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
return; /* Retrying */

if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
rd_kafka_topic_partition_t *rktpar;
/* Remove its cache in case the topic isn't a known topic. */
rd_kafka_wrlock(rk);
RD_KAFKA_TPLIST_FOREACH(rktpar, state->results) {
if (rktpar->err)
rd_kafka_metadata_cache_delete_by_name(
rk, rktpar->topic);
}
rd_kafka_wrunlock(rk);
}

/* Retry if no broker connection is available yet. */
if (err == RD_KAFKA_RESP_ERR__TRANSPORT && rkb &&
rd_kafka_brokers_wait_state_change(
Expand Down
5 changes: 3 additions & 2 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -1511,8 +1511,9 @@ rd_kafka_admin_MetadataRequest(rd_kafka_broker_t *rkb,
rd_false /* No admin operation requires topic creation. */,
include_cluster_authorized_operations,
include_topic_authorized_operations,
rd_false /* No admin operation should update cgrp. */, force_racks,
resp_cb, replyq,
rd_false /* No admin operation should update cgrp. */,
-1 /* No subscription version is used */, force_racks, resp_cb,
replyq,
rd_true /* Admin operation metadata requests are always forced. */,
opaque);
}
Expand Down
Loading