Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 38 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ librdkafka v2.4.0 is a feature release:
max period of 1 ms (#4671).
* Fixed a bug causing duplicate message consumption from a stale
fetch start offset in some particular cases (#4636)
* [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4676)
* Fix to metadata cache expiration on full metadata refresh (#4677).
* Fix for a wrong error returned on full metadata refresh before joining
a consumer group (#4678).
* Fix to metadata refresh interruption (#4679).
* Fix for an undesired partition migration with stale leader epoch (#4680).


## Upgrade considerations
Expand All @@ -28,25 +36,49 @@ librdkafka v2.4.0 is a feature release:

### General fixes

* In librdkafka release pipeline a static build containing libsasl2
* Issues: [confluentinc/confluent-kafka-go#981](https://github.com/confluentinc/confluent-kafka-go/issues/981).
In librdkafka release pipeline a static build containing libsasl2
could be chosen instead of the alternative one without it.
That caused the libsasl2 dependency to be required in confluent-kafka-go
v2.1.0-linux-musl-arm64 and v2.3.0-linux-musl-arm64.
Solved by correctly excluding the binary configured with that library,
when targeting a static build.
Happening since v2.0.2, with specified platforms, when using static binaries (#4666).
* When the main thread loop was awakened less than 1 ms
Happening since v2.0.2, with specified platforms,
when using static binaries (#4666).
* Issues: #4684.
When the main thread loop was awakened less than 1 ms
before the expiration of a timeout, it was serving with a zero timeout,
leading to increased CPU usage until the timeout was reached.
Happening since 1.x (#4671).
Happening since 1.x.
* Issues: #4685.
Metadata cache was cleared on full metadata refresh, leading to unnecessary
refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating
cache for existing or hinted entries instead of clearing them.
Happening since 2.1.0 (#4677).
* Issues: #4589.
A metadata call before member joins consumer group,
could lead to an `UNKNOWN_TOPIC_OR_PART` error. Solved by updating
the consumer group following a metadata refresh only in safe states.
Happening since 2.1.0 (#4678).
* Issues: #4577.
Metadata refreshes without partition leader change could lead to a loop of
metadata calls at fixed intervals. Solved by stopping metadata refresh when
all existing metadata is non-stale. Happening since 2.3.0 (#4679).
* Issues: #4687.
A partition migration could happen, using stale metadata, when the partition
was undergoing a validation and being retried because of an error.
Solved by doing a partition migration only with a non-stale leader epoch.
Happening since 2.1.0 (#4680).

### Consumer fixes

* In case of subscription change with a consumer using the cooperative assignor
* Issues: #4686.
In case of subscription change with a consumer using the cooperative assignor
it could resume fetching from a previous position.
That could also happen if resuming a partition that wasn't paused.
Fixed by ensuring that a resume operation is completely a no-op when
the partition isn't paused (#4636).
the partition isn't paused.
Happening since 1.x (#4636).



Expand Down
4 changes: 3 additions & 1 deletion CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ group.instance.id | C | |
partition.assignment.strategy | C | | range,roundrobin | medium | The name of one or more partition assignment strategies. The elected group leader will use a strategy supported by all members of the group to assign partitions to group members. If there is more than one eligible strategy, preference is determined by the order of this list (strategies earlier in the list have higher priority). Cooperative and non-cooperative (eager) strategies must not be mixed. Available strategies: range, roundrobin, cooperative-sticky. <br>*Type: string*
session.timeout.ms | C | 1 .. 3600000 | 45000 | high | Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no hearts are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. Also see `max.poll.interval.ms`. <br>*Type: integer*
heartbeat.interval.ms | C | 1 .. 3600000 | 3000 | low | Group session keepalive heartbeat interval. <br>*Type: integer*
group.protocol.type | C | | consumer | low | Group protocol type for the `generic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
group.protocol.type | C | | consumer | low | Group protocol type for the `classic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
group.protocol | C | classic, consumer | classic | high | Group protocol to use. Use `classic` for the original protocol and `consumer` for the new protocol introduced in KIP-848. Available protocols: classic or consumer. Default is `classic`, but will change to `consumer` in next releases. <br>*Type: enum value*
group.remote.assignor | C | | | medium | Server side assignor to use. Keep it null to make server select a suitable assignor for the group. Available assignors: uniform or range. Default is null <br>*Type: string*
coordinator.query.interval.ms | C | 1 .. 3600000 | 600000 | low | How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment. <br>*Type: integer*
max.poll.interval.ms | C | 1 .. 86400000 | 300000 | high | Maximum allowed time between calls to consume messages (e.g., rd_kafka_consumer_poll()) for high-level consumers. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. Warning: Offset commits may be not possible at this point. Note: It is recommended to set `enable.auto.offset.store=false` for long-time processing applications and then explicitly store offsets (using offsets_store()) *after* message processing, to make sure offsets are not auto-committed prior to processing has finished. The interval is checked two times per second. See KIP-62 for more information. <br>*Type: integer*
enable.auto.commit | C | true, false | true | high | Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign(). <br>*Type: boolean*
Expand Down
80 changes: 40 additions & 40 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1967,50 +1967,50 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf

### Supported protocol versions

"Kafka max" is the maximum ApiVersion supported in Apache Kafka 3.5.0, while
"Kafka max" is the maximum ApiVersion supported in Apache Kafka 3.7.0, while
"librdkafka max" is the maximum ApiVersion supported in the latest
release of librdkafka.


| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ------------------------------| ----------- | ----------------------- |
| 0 | Produce | 10 | 8 |
| 1 | Fetch | 15 | 11 |
| 2 | ListOffsets | 8 | 7 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 8 | 7 |
| 9 | OffsetFetch | 8 | 7 |
| 10 | FindCoordinator | 4 | 2 |
| 11 | JoinGroup | 9 | 5 |
| 12 | Heartbeat | 4 | 3 |
| 13 | LeaveGroup | 5 | 1 |
| 14 | SyncGroup | 5 | 3 |
| 15 | DescribeGroups | 5 | 4 |
| 16 | ListGroups | 4 | 4 |
| 17 | SaslHandshake | 1 | 1 |
| 18 | ApiVersions | 3 | 3 |
| 19 | CreateTopics | 7 | 4 |
| 20 | DeleteTopics | 6 | 1 |
| 21 | DeleteRecords | 2 | 1 |
| 22 | InitProducerId | 4 | 4 |
| 23 | OffsetForLeaderEpoch | 4 | 2 |
| 24 | AddPartitionsToTxn | 4 | 0 |
| 25 | AddOffsetsToTxn | 3 | 0 |
| 26 | EndTxn | 3 | 1 |
| 28 | TxnOffsetCommit | 3 | 3 |
| 29 | DescribeAcls | 3 | 1 |
| 30 | CreateAcls | 3 | 1 |
| 31 | DeleteAcls | 3 | 1 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 2 |
| 36 | SaslAuthenticate | 2 | 1 |
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 44 | IncrementalAlterConfigs | 1 | 1 |
| 47 | OffsetDelete | 0 | 0 |
| 50 | DescribeUserScramCredentials | 0 | 0 |
| 51 | AlterUserScramCredentials | 0 | 0 |

| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ----------------------------- | ---------- | -------------- |
| 0 | Produce | 10 | 8 |
| 1 | Fetch | 16 | 11 |
| 2 | ListOffsets | 8 | 7 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 9 | 9 |
| 9 | OffsetFetch | 9 | 9 |
| 10 | FindCoordinator | 4 | 2 |
| 11 | JoinGroup | 9 | 5 |
| 12 | Heartbeat | 4 | 3 |
| 13 | LeaveGroup | 5 | 1 |
| 14 | SyncGroup | 5 | 3 |
| 15 | DescribeGroups | 5 | 4 |
| 16 | ListGroups | 4 | 4 |
| 17 | SaslHandshake | 1 | 1 |
| 18 | ApiVersions | 3 | 3 |
| 19 | CreateTopics | 7 | 4 |
| 20 | DeleteTopics | 6 | 1 |
| 21 | DeleteRecords | 2 | 1 |
| 22 | InitProducerId | 4 | 4 |
| 23 | OffsetForLeaderEpoch | 4 | 2 |
| 24 | AddPartitionsToTxn | 4 | 0 |
| 25 | AddOffsetsToTxn | 3 | 0 |
| 26 | EndTxn | 3 | 1 |
| 28 | TxnOffsetCommit | 3 | 3 |
| 29 | DescribeAcls | 3 | 1 |
| 30 | CreateAcls | 3 | 1 |
| 31 | DeleteAcls | 3 | 1 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 2 |
| 36 | SaslAuthenticate | 2 | 1 |
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 44 | IncrementalAlterConfigs | 1 | 1 |
| 47 | OffsetDelete | 0 | 0 |
| 50 | DescribeUserScramCredentials | 0 | 0 |
| 51 | AlterUserScramCredentials | 0 | 0 |
| 68 | ConsumerGroupHeartbeat | 0 | 0 |

# Recommendations for language binding developers

Expand Down
2 changes: 1 addition & 1 deletion examples/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,4 @@ int main(int argc, char **argv) {
rd_kafka_destroy(rk);

return 0;
}
}
68 changes: 68 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,12 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID, "Broker: Unknown topic id"),
_ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH,
"Broker: The member epoch is fenced by the group coordinator"),
_ERR_DESC(RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID,
"Broker: The instance ID is still used by another member in the "
"consumer group"),
_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR,
"Broker: The assignor or its version range is not supported by "
"the consumer group"),
_ERR_DESC(RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH,
"Broker: The member epoch is stale"),
_ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)};
Expand Down Expand Up @@ -1610,6 +1616,7 @@ static void rd_kafka_stats_emit_broker_reqs(struct _stats_emit *st,
[RD_KAFKAP_BrokerHeartbeat] = rd_true,
[RD_KAFKAP_UnregisterBroker] = rd_true,
[RD_KAFKAP_AllocateProducerIds] = rd_true,
[RD_KAFKAP_ConsumerGroupHeartbeat] = rd_true,
},
[3 /*hide-unless-non-zero*/] = {
/* Hide Admin requests unless they've been used */
Expand Down Expand Up @@ -2179,6 +2186,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
int ret_errno = 0;
const char *conf_err;
char *group_remote_assignor_override = NULL;
#ifndef _WIN32
sigset_t newset, oldset;
#endif
Expand Down Expand Up @@ -2370,6 +2378,64 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
goto fail;
}

if (!rk->rk_conf.group_remote_assignor) {
rd_kafka_assignor_t *cooperative_assignor;

/* Detect if chosen assignor is cooperative
* FIXME: remove this compatibility altogether
* and apply the breaking changes that will be required
* in next major version. */

cooperative_assignor =
rd_kafka_assignor_find(rk, "cooperative-sticky");
rk->rk_conf.partition_assignors_cooperative =
!rk->rk_conf.partition_assignors.rl_cnt ||
(cooperative_assignor &&
cooperative_assignor->rkas_enabled);

if (rk->rk_conf.group_protocol ==
RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
/* Default remote assignor to the chosen local one. */
if (rk->rk_conf.partition_assignors_cooperative) {
group_remote_assignor_override =
rd_strdup("uniform");
rk->rk_conf.group_remote_assignor =
group_remote_assignor_override;
} else {
rd_kafka_assignor_t *range_assignor =
rd_kafka_assignor_find(rk, "range");
if (range_assignor &&
range_assignor->rkas_enabled) {
rd_kafka_log(
rk, LOG_WARNING, "ASSIGNOR",
"\"range\" assignor is sticky "
"with group protocol CONSUMER");
group_remote_assignor_override =
rd_strdup("range");
rk->rk_conf.group_remote_assignor =
group_remote_assignor_override;
} else {
rd_kafka_log(
rk, LOG_WARNING, "ASSIGNOR",
"roundrobin assignor isn't "
"available"
"with group protocol CONSUMER, "
"using the \"uniform\" one. "
"It's similar, "
"but it's also sticky");
group_remote_assignor_override =
rd_strdup("uniform");
rk->rk_conf.group_remote_assignor =
group_remote_assignor_override;
}
}
}
} else {
/* When users starts setting properties of the new protocol,
* they can only use incremental_assign/unassign. */
rk->rk_conf.partition_assignors_cooperative = rd_true;
}

/* Create Mock cluster */
rd_atomic32_init(&rk->rk_mock.cluster_cnt, 0);
if (rk->rk_conf.mock.broker_cnt > 0) {
Expand Down Expand Up @@ -2639,6 +2705,8 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
* that belong to rk_conf and thus needs to be cleaned up.
* Legacy APIs, sigh.. */
if (app_conf) {
if (group_remote_assignor_override)
rd_free(group_remote_assignor_override);
rd_kafka_assignors_term(rk);
rd_kafka_interceptors_destroy(&rk->rk_conf);
memset(&rk->rk_conf, 0, sizeof(rk->rk_conf));
Expand Down
Loading