Skip to content

Commit e8a0178

Browse files
committed
[KIP-848] Assign, revoke, leave group flows
- Rebased with master - WIP: assignment, revocation, leave group flow - Remove print statements - Remove print statement left - Separate rd_kafka_cgrp_consumer_assignment_done - Allow changing subscription to empty - Expedite next heartbeat - Static group membership and max poll interval checks - Expedite next heartbeat - Fix existing protocol - Partial implementation of reconciliation and next assignment handling - Uniform tests handling across scripts and KRaft mode - Run tests with group.protocol=consumer and reusable condition to skip mock cluster - Test 0113 partial - Test 0018 - Test 0113 stickyness - Test 0113 complete except regex subscription and u_multiple_subscription_changes(true) - Skip some tests, fix subscription change - Test 0029 exclusion clarified - Debug statements - Introduce current assignment rename rkcg_current_target_assignments to rkcg_target_assignment rename rkcg_next_target_assignments to rkcg_next_target_assignment - change to ConsumerGroupHeartbeat in logs - Add remote assignor to debug log - Fix rd_kafka_buf_write_topic_partitions not using topic ids for comparison
1 parent e62c2b5 commit e8a0178

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1617
-821
lines changed

CONFIGURATION.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Configuration properties
22
## Global configuration properties
33

4-
Property | C/P | Range | Default | Importance | Description
4+
Property | C/P | Range | Default | Importance | Description
55
-----------------------------------------|-----|-----------------|--------------:|------------| --------------------------
66
builtin.features | * | | gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, plugins, zstd, sasl_oauthbearer, http, oidc | low | Indicates the builtin features for this build of librdkafka. An application can either query this value or attempt to set it with its list of required features to check for library support. <br>*Type: CSV flags*
77
client.id | * | | rdkafka | low | Client identifier. <br>*Type: string*
@@ -158,7 +158,7 @@ client.dns.lookup | * | use_all_dns_ips, resolve_canoni
158158

159159
## Topic configuration properties
160160

161-
Property | C/P | Range | Default | Importance | Description
161+
Property | C/P | Range | Default | Importance | Description
162162
-----------------------------------------|-----|-----------------|--------------:|------------| --------------------------
163163
request.required.acks | P | -1 .. 1000 | -1 | high | This field indicates the number of acknowledgements the leader broker must receive from ISR brokers before responding to the request: *0*=Broker does not send any response/ack to client, *-1* or *all*=Broker will block until message is committed by all in sync replicas (ISRs). If there are less than `min.insync.replicas` (broker configuration) in the ISR set the produce request will fail. <br>*Type: integer*
164164
acks | P | -1 .. 1000 | -1 | high | Alias for `request.required.acks`: This field indicates the number of acknowledgements the leader broker must receive from ISR brokers before responding to the request: *0*=Broker does not send any response/ack to client, *-1* or *all*=Broker will block until message is committed by all in sync replicas (ISRs). If there are less than `min.insync.replicas` (broker configuration) in the ISR set the produce request will fail. <br>*Type: integer*

examples/consumer.c

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -127,20 +127,6 @@ int main(int argc, char **argv) {
127127
return 1;
128128
}
129129

130-
if (rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr,
131-
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
132-
fprintf(stderr, "%s\n", errstr);
133-
rd_kafka_conf_destroy(conf);
134-
return 1;
135-
}
136-
137-
// if (rd_kafka_conf_set(conf, "debug", "all", errstr,
138-
// sizeof(errstr)) != RD_KAFKA_CONF_OK) {
139-
// fprintf(stderr, "%s\n", errstr);
140-
// rd_kafka_conf_destroy(conf);
141-
// return 1;
142-
// }
143-
144130
/* If there is no previously committed offset for a partition
145131
* the auto.offset.reset strategy will be used to decide where
146132
* in the partition to start fetching messages.

src/rdkafka.c

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1607,6 +1607,7 @@ static void rd_kafka_stats_emit_broker_reqs(struct _stats_emit *st,
16071607
[RD_KAFKAP_BrokerHeartbeat] = rd_true,
16081608
[RD_KAFKAP_UnregisterBroker] = rd_true,
16091609
[RD_KAFKAP_AllocateProducerIds] = rd_true,
1610+
[RD_KAFKAP_ConsumerGroupHeartbeat] = rd_true,
16101611
},
16111612
[3 /*hide-unless-non-zero*/] = {
16121613
/* Hide Admin requests unless they've been used */
@@ -2173,6 +2174,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
21732174
rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
21742175
int ret_errno = 0;
21752176
const char *conf_err;
2177+
rd_kafka_assignor_t *cooperative_assignor;
21762178
#ifndef _WIN32
21772179
sigset_t newset, oldset;
21782180
#endif
@@ -2364,6 +2366,26 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
23642366
goto fail;
23652367
}
23662368

2369+
/* Detect if chosen assignor is cooperative */
2370+
cooperative_assignor = rd_kafka_assignor_find(rk, "cooperative-sticky");
2371+
rk->rk_conf.partition_assignors_cooperative =
2372+
!rk->rk_conf.partition_assignors.rl_cnt ||
2373+
(cooperative_assignor && cooperative_assignor->rkas_enabled);
2374+
2375+
if (!rk->rk_conf.group_remote_assignor) {
2376+
/* Default remote assignor to the chosen local one. */
2377+
if (rk->rk_conf.partition_assignors_cooperative) {
2378+
rk->rk_conf.group_remote_assignor =
2379+
rd_strdup("uniform");
2380+
} else {
2381+
rd_kafka_assignor_t *range_assignor =
2382+
rd_kafka_assignor_find(rk, "range");
2383+
if (range_assignor && range_assignor->rkas_enabled)
2384+
rk->rk_conf.group_remote_assignor =
2385+
rd_strdup("range");
2386+
}
2387+
}
2388+
23672389
/* Create Mock cluster */
23682390
rd_atomic32_init(&rk->rk_mock.cluster_cnt, 0);
23692391
if (rk->rk_conf.mock.broker_cnt > 0) {

src/rdkafka_admin.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3939,7 +3939,7 @@ rd_kafka_DeleteRecordsResponse_parse(rd_kafka_op_t *rko_req,
39393939
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR,
39403940
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
39413941
offsets = rd_kafka_buf_read_topic_partitions(
3942-
reply, rd_false /* don't use topic_id */, rd_true, 0, fields);
3942+
reply, rd_false /*don't use topic_id*/, rd_true, 0, fields);
39433943
if (!offsets)
39443944
rd_kafka_buf_parse_fail(reply,
39453945
"Failed to parse topic partitions");
@@ -4926,7 +4926,7 @@ rd_kafka_OffsetDeleteResponse_parse(rd_kafka_op_t *rko_req,
49264926
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR,
49274927
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
49284928
partitions = rd_kafka_buf_read_topic_partitions(
4929-
reply, rd_false /* don't use topic_id */, rd_true, 16, fields);
4929+
reply, rd_false /*don't use topic_id*/, rd_true, 16, fields);
49304930
if (!partitions) {
49314931
rd_snprintf(errstr, errstr_size,
49324932
"Failed to parse OffsetDeleteResponse partitions");
@@ -8114,7 +8114,7 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
81148114
{RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
81158115
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
81168116
partitions = rd_kafka_buf_read_topic_partitions(
8117-
rkbuf, rd_false /* don't use topic_id */,
8117+
rkbuf, rd_false /*don't use topic_id*/,
81188118
rd_true, 0, fields);
81198119
rd_kafka_buf_destroy(rkbuf);
81208120
if (!partitions)

src/rdkafka_assignor.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
159159
rd_kafka_buf_write_topic_partitions(
160160
rkbuf, owned_partitions,
161161
rd_false /*don't skip invalid offsets*/,
162-
rd_false /*any offset*/, rd_false /* use_topic name */,
163-
rd_true, fields);
162+
rd_false /*any offset*/, rd_false /*don't use topic id*/,
163+
rd_true /*use topic name*/, fields);
164164
}
165165

166166
/* Following data is ignored by consumer version < 2 */

0 commit comments

Comments
 (0)