
Commit b42d0d9

[KIP-848] Use metadata cache by topic id, fixes and improvements (#4661)

- rename 'generic' protocol to 'classic'
- consumer group serve timer to wake the loop earlier
- compare and find entries in a topic partition list by topic id only
- fix memory leak when instance creation fails and app_conf is provided
- fix cases where a HB response is received after unsubscription
- use the topic name from the current assignment if it's missing from metadata
- expedite heartbeat: simplification and fixes to the next-interval check and to the place where the number of retries is increased
- expedite HB after changing state back to INIT
- use the CONSUMER_F_WAIT_REJOIN flag to trigger the rejoin instead of calling it from the max poll interval timer
- schedule the timer for its next execution, reschedule it if expected earlier, and expedite it by scheduling the cgrp serve timer
- treat the unsupported feature error as fatal
- avoid immediately removing partitions not matched by a new subscription, because of a possible desynchronization with the coordinator that would leave said partitions not being consumed anymore
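One of the items above, comparing and finding entries in a topic partition list by topic id only, can be pictured with a small standalone sketch. The struct and helper names below are illustrative placeholders, not the actual librdkafka internals:

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Illustrative 128-bit topic id, mirroring Kafka's Uuid type. */
typedef struct topic_id_s {
        int64_t most_significant_bits;
        int64_t least_significant_bits;
} topic_id_t;

/* Illustrative list entry: a partition identified both by name and by id. */
typedef struct partition_s {
        topic_id_t topic_id;
        const char *topic_name;
        int32_t partition;
} partition_t;

static bool topic_id_eq(topic_id_t a, topic_id_t b) {
        return a.most_significant_bits == b.most_significant_bits &&
               a.least_significant_bits == b.least_significant_bits;
}

/* Find by topic id only: after a topic is deleted and recreated its name
 * is unchanged but its id differs, so id-based matching distinguishes the
 * two incarnations, which is what the KIP-848 code paths rely on. */
static const partition_t *find_by_topic_id(const partition_t *list,
                                           size_t cnt,
                                           topic_id_t id,
                                           int32_t partition) {
        size_t i;
        for (i = 0; i < cnt; i++)
                if (topic_id_eq(list[i].topic_id, id) &&
                    list[i].partition == partition)
                        return &list[i];
        return NULL; /* not found */
}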
1 parent 194fb5d commit b42d0d9

14 files changed: +467 -350 lines

CONFIGURATION.md

Lines changed: 3 additions & 1 deletion

@@ -109,7 +109,9 @@ group.instance.id | C | |
 partition.assignment.strategy | C | | range,roundrobin | medium | The name of one or more partition assignment strategies. The elected group leader will use a strategy supported by all members of the group to assign partitions to group members. If there is more than one eligible strategy, preference is determined by the order of this list (strategies earlier in the list have higher priority). Cooperative and non-cooperative (eager) strategies must not be mixed. Available strategies: range, roundrobin, cooperative-sticky. <br>*Type: string*
 session.timeout.ms | C | 1 .. 3600000 | 45000 | high | Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no hearts are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. Also see `max.poll.interval.ms`. <br>*Type: integer*
 heartbeat.interval.ms | C | 1 .. 3600000 | 3000 | low | Group session keepalive heartbeat interval. <br>*Type: integer*
-group.protocol.type | C | | consumer | low | Group protocol type for the `generic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
+group.protocol.type | C | | consumer | low | Group protocol type for the `classic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
+group.protocol | C | classic, consumer | classic | high | Group protocol to use. Use `classic` for the original protocol and `consumer` for the new protocol introduced in KIP-848. Available protocols: classic or consumer. Default is `classic`, but will change to `consumer` in next releases. <br>*Type: enum value*
+group.remote.assignor | C | | | medium | Server side assignor to use. Keep it null to make server select a suitable assignor for the group. Available assignors: uniform or range. Default is null <br>*Type: string*
 coordinator.query.interval.ms | C | 1 .. 3600000 | 600000 | low | How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment. <br>*Type: integer*
 max.poll.interval.ms | C | 1 .. 86400000 | 300000 | high | Maximum allowed time between calls to consume messages (e.g., rd_kafka_consumer_poll()) for high-level consumers. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. Warning: Offset commits may be not possible at this point. Note: It is recommended to set `enable.auto.offset.store=false` for long-time processing applications and then explicitly store offsets (using offsets_store()) *after* message processing, to make sure offsets are not auto-committed prior to processing has finished. The interval is checked two times per second. See KIP-62 for more information. <br>*Type: integer*
 enable.auto.commit | C | true, false | true | high | Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign(). <br>*Type: boolean*
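The two properties added above are ordinary client configuration entries, so opting a consumer into the KIP-848 protocol only requires setting them before creating the handle. A minimal sketch, assuming a broker with the new group coordinator enabled; the broker address and group name are placeholders and error handling is shortened:

#include <librdkafka/rdkafka.h>
#include <stdio.h>

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr,
                          sizeof(errstr));
        rd_kafka_conf_set(conf, "group.id", "my-group", errstr,
                          sizeof(errstr));
        /* Select the KIP-848 consumer protocol (see the
         * CONFIGURATION.md entry above). */
        rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr,
                          sizeof(errstr));
        /* Optional: pin the server-side assignor; leave unset to let the
         * broker choose one for the group. */
        rd_kafka_conf_set(conf, "group.remote.assignor", "uniform", errstr,
                          sizeof(errstr));

        rd_kafka_t *rk =
            rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "Failed to create consumer: %s\n", errstr);
                return 1;
        }

        /* ... rd_kafka_subscribe() and the poll loop go here ... */

        rd_kafka_destroy(rk);
        return 0;
}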

src/rdkafka.c

Lines changed: 56 additions & 16 deletions

@@ -2186,7 +2186,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
         rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
         int ret_errno = 0;
         const char *conf_err;
-        rd_kafka_assignor_t *cooperative_assignor;
+        char *group_remote_assignor_override = NULL;
 #ifndef _WIN32
         sigset_t newset, oldset;
 #endif
@@ -2378,24 +2378,62 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
                 goto fail;
         }
 
-        /* Detect if chosen assignor is cooperative */
-        cooperative_assignor = rd_kafka_assignor_find(rk, "cooperative-sticky");
-        rk->rk_conf.partition_assignors_cooperative =
-            !rk->rk_conf.partition_assignors.rl_cnt ||
-            (cooperative_assignor && cooperative_assignor->rkas_enabled);
-
         if (!rk->rk_conf.group_remote_assignor) {
-                /* Default remote assignor to the chosen local one. */
-                if (rk->rk_conf.partition_assignors_cooperative) {
-                        rk->rk_conf.group_remote_assignor =
-                            rd_strdup("uniform");
-                } else {
-                        rd_kafka_assignor_t *range_assignor =
-                            rd_kafka_assignor_find(rk, "range");
-                        if (range_assignor && range_assignor->rkas_enabled)
+                rd_kafka_assignor_t *cooperative_assignor;
+
+                /* Detect if chosen assignor is cooperative
+                 * FIXME: remove this compatibility altogether
+                 * and apply the breaking changes that will be required
+                 * in next major version. */
+
+                cooperative_assignor =
+                    rd_kafka_assignor_find(rk, "cooperative-sticky");
+                rk->rk_conf.partition_assignors_cooperative =
+                    !rk->rk_conf.partition_assignors.rl_cnt ||
+                    (cooperative_assignor &&
+                     cooperative_assignor->rkas_enabled);
+
+                if (rk->rk_conf.group_protocol ==
+                    RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
+                        /* Default remote assignor to the chosen local one. */
+                        if (rk->rk_conf.partition_assignors_cooperative) {
+                                group_remote_assignor_override =
+                                    rd_strdup("uniform");
                                 rk->rk_conf.group_remote_assignor =
-                                    rd_strdup("range");
+                                    group_remote_assignor_override;
+                        } else {
+                                rd_kafka_assignor_t *range_assignor =
+                                    rd_kafka_assignor_find(rk, "range");
+                                if (range_assignor &&
+                                    range_assignor->rkas_enabled) {
+                                        rd_kafka_log(
+                                            rk, LOG_WARNING, "ASSIGNOR",
+                                            "\"range\" assignor is sticky "
+                                            "with group protocol CONSUMER");
+                                        group_remote_assignor_override =
+                                            rd_strdup("range");
+                                        rk->rk_conf.group_remote_assignor =
+                                            group_remote_assignor_override;
+                                } else {
+                                        rd_kafka_log(
+                                            rk, LOG_WARNING, "ASSIGNOR",
+                                            "roundrobin assignor isn't "
+                                            "available"
+                                            "with group protocol CONSUMER, "
+                                            "using the \"uniform\" one. "
+                                            "It's similar, "
+                                            "but it's also sticky");
+                                        group_remote_assignor_override =
+                                            rd_strdup("uniform");
+                                        rk->rk_conf.group_remote_assignor =
+                                            group_remote_assignor_override;
+                                }
+                        }
                 }
+        } else {
+                /* When users starts setting properties of the new protocol,
+                 * they can only use incremental_assign/unassign. */
+                rk->rk_conf.partition_assignors_cooperative = rd_true;
         }
 
         /* Create Mock cluster */
@@ -2667,6 +2705,8 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
          * that belong to rk_conf and thus needs to be cleaned up.
          * Legacy APIs, sigh.. */
         if (app_conf) {
+                if (group_remote_assignor_override)
+                        rd_free(group_remote_assignor_override);
                 rd_kafka_assignors_term(rk);
                 rd_kafka_interceptors_destroy(&rk->rk_conf);
                 memset(&rk->rk_conf, 0, sizeof(rk->rk_conf));
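The "they can only use incremental_assign/unassign" comment in the hunk above has a practical consequence for applications that install a rebalance callback: with group.protocol=consumer the callback should use the incremental assignment APIs rather than rd_kafka_assign(). A hedged sketch of that standard cooperative pattern (not code from this commit):

#include <librdkafka/rdkafka.h>
#include <stdio.h>

/* Rebalance callback suitable for the consumer (KIP-848) protocol:
 * partitions are added and removed incrementally, never replaced
 * wholesale with rd_kafka_assign(). */
static void rebalance_cb(rd_kafka_t *rk,
                         rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *partitions,
                         void *opaque) {
        rd_kafka_error_t *error = NULL;

        if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
                /* Add the newly assigned partitions; the existing
                 * assignment is kept. */
                error = rd_kafka_incremental_assign(rk, partitions);
        } else if (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) {
                /* Drop only the revoked partitions. */
                error = rd_kafka_incremental_unassign(rk, partitions);
        }

        if (error) {
                fprintf(stderr, "Incremental rebalance failed: %s\n",
                        rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);
        }
}

The callback would be registered with rd_kafka_conf_set_rebalance_cb() on the configuration object before calling rd_kafka_new().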

src/rdkafka_admin.c

Lines changed: 1 addition & 1 deletion

@@ -6490,7 +6490,7 @@ rd_kafka_DeleteAclsResponse_parse(rd_kafka_op_t *rko_req,
         result_response =
             rd_kafka_DeleteAcls_result_response_new(error_code, errstr);
 
-        /* #maching_acls */
+        /* #matching_acls */
         rd_kafka_buf_read_arraycnt(reply, &matching_acls_cnt, 100000);
         for (j = 0; j < (int)matching_acls_cnt; j++) {
                 int16_t acl_error_code;

src/rdkafka_assignor.c

Lines changed: 1 addition & 1 deletion

@@ -1279,7 +1279,7 @@ int verifyValidityAndBalance0(const char *func,
                          * it means the assignment strategy failed to
                          * properly balance the partitions. */
                         if (!balanced &&
-                            rd_kafka_topic_partition_list_find_topic(
+                            rd_kafka_topic_partition_list_find_topic_by_name(
                                 otherPartitions, partition->topic)) {
                                 RD_UT_WARN(
                                     "Some %s partition(s) can be "
