Skip to content

Commit e5f3101

Browse files
committed
[KIP-848] Use metadata cache by topic id,
- rename 'generic' protocol to 'classic' - consumer group serve timer to awake the loop earlier - compare and find into topic partition list by topic id only - fix memory leak when instance creation fails and app_conf is provided - fix cases where HB response is received after unsubscription - use topic name from current assignment if it's missing from metadata
1 parent f5ffbf8 commit e5f3101

File tree

9 files changed

+416
-216
lines changed

9 files changed

+416
-216
lines changed

CONFIGURATION.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ group.instance.id | C | |
109109
partition.assignment.strategy | C | | range,roundrobin | medium | The name of one or more partition assignment strategies. The elected group leader will use a strategy supported by all members of the group to assign partitions to group members. If there is more than one eligible strategy, preference is determined by the order of this list (strategies earlier in the list have higher priority). Cooperative and non-cooperative (eager) strategies must not be mixed. Available strategies: range, roundrobin, cooperative-sticky. <br>*Type: string*
110110
session.timeout.ms | C | 1 .. 3600000 | 45000 | high | Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no hearts are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. Also see `max.poll.interval.ms`. <br>*Type: integer*
111111
heartbeat.interval.ms | C | 1 .. 3600000 | 3000 | low | Group session keepalive heartbeat interval. <br>*Type: integer*
112-
group.protocol.type | C | | consumer | low | Group protocol type for the `generic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
112+
group.protocol.type | C | | consumer | low | Group protocol type for the `classic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
113113
coordinator.query.interval.ms | C | 1 .. 3600000 | 600000 | low | How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment. <br>*Type: integer*
114114
max.poll.interval.ms | C | 1 .. 86400000 | 300000 | high | Maximum allowed time between calls to consume messages (e.g., rd_kafka_consumer_poll()) for high-level consumers. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. Warning: Offset commits may be not possible at this point. Note: It is recommended to set `enable.auto.offset.store=false` for long-time processing applications and then explicitly store offsets (using offsets_store()) *after* message processing, to make sure offsets are not auto-committed prior to processing has finished. The interval is checked two times per second. See KIP-62 for more information. <br>*Type: integer*
115115
enable.auto.commit | C | true, false | true | high | Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign(). <br>*Type: boolean*

src/rdkafka.c

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2183,6 +2183,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
21832183
rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
21842184
int ret_errno = 0;
21852185
const char *conf_err;
2186+
char *group_remote_assignor_override = NULL;
21862187
rd_kafka_assignor_t *cooperative_assignor;
21872188
#ifndef _WIN32
21882189
sigset_t newset, oldset;
@@ -2384,14 +2385,18 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
23842385
if (!rk->rk_conf.group_remote_assignor) {
23852386
/* Default remote assignor to the chosen local one. */
23862387
if (rk->rk_conf.partition_assignors_cooperative) {
2388+
group_remote_assignor_override = rd_strdup("uniform");
23872389
rk->rk_conf.group_remote_assignor =
2388-
rd_strdup("uniform");
2390+
group_remote_assignor_override;
23892391
} else {
23902392
rd_kafka_assignor_t *range_assignor =
23912393
rd_kafka_assignor_find(rk, "range");
2392-
if (range_assignor && range_assignor->rkas_enabled)
2393-
rk->rk_conf.group_remote_assignor =
2394+
if (range_assignor && range_assignor->rkas_enabled) {
2395+
group_remote_assignor_override =
23942396
rd_strdup("range");
2397+
rk->rk_conf.group_remote_assignor =
2398+
group_remote_assignor_override;
2399+
}
23952400
}
23962401
}
23972402

@@ -2664,8 +2669,11 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
26642669
* that belong to rk_conf and thus needs to be cleaned up.
26652670
* Legacy APIs, sigh.. */
26662671
if (app_conf) {
2672+
if (group_remote_assignor_override)
2673+
rd_free(group_remote_assignor_override);
26672674
rd_kafka_assignors_term(rk);
26682675
rd_kafka_interceptors_destroy(&rk->rk_conf);
2676+
26692677
memset(&rk->rk_conf, 0, sizeof(rk->rk_conf));
26702678
}
26712679

0 commit comments

Comments
 (0)