Skip to content

Commit e62c2b5

Browse files
committed
[KIP-848] Added ConsumerGroupHeartbeat API request
and response with basic cgrp_consumer flow - Added topic id to topic_partition_t while reading from buffer - Added new methods and way to add topic_id to topic partition private - Added new configs group.protocol and group.remote.assignor - Added ConsumerGroupHeartbeat API Request Contract - Added ConsumerGroupApi Request without handling different cases - Working ConsumerGroupHeartbeat API with proper response - Properly receiving assigned partitions with topic_id from the Response - Added metadata request as well after parsing the response. Separated it out from the topic partition branch - Added metadata response flow with rko - Updated OffsetFetch to v9 - Removed unrequired fields from ConsumerGroupHeartbeat API to make it work with AK > 3.6 - OffsetFetch working fine. Able to consume. - Changed subscribed list to use correct field - Fixed a few memory leaks - Some more memory leak fixes. Added update of the subscribed topics list - Minor changes - [KIP-848] Added new configs group.protocol and group.remote.assignor (#4414) - Added new configs group.protocol and group.remote.assignor - Removed printfs and updated hardcoded single topic for metadata request - Removed some changes related to removed changes in the protocols - [KIP-848] Added topic id to topic_partition_t while reading from buffer (#4416) - Updated topic name from metadata response for all the requested topic_ids instead of just 1 - Style fixes and fixed skip tag issue in buf_*_topic_partition - Changed variable for next assignment - Added topic name while reading topic partition buffer - Changed variable name from assignments to assignment
1 parent 267367c commit e62c2b5

19 files changed

+666
-112
lines changed

CONFIGURATION.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Configuration properties
22
## Global configuration properties
33

4-
Property | C/P | Range | Default | Importance | Description
4+
Property | C/P | Range | Default | Importance | Description
55
-----------------------------------------|-----|-----------------|--------------:|------------| --------------------------
66
builtin.features | * | | gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, plugins, zstd, sasl_oauthbearer, http, oidc | low | Indicates the builtin features for this build of librdkafka. An application can either query this value or attempt to set it with its list of required features to check for library support. <br>*Type: CSV flags*
77
client.id | * | | rdkafka | low | Client identifier. <br>*Type: string*
@@ -158,7 +158,7 @@ client.dns.lookup | * | use_all_dns_ips, resolve_canoni
158158

159159
## Topic configuration properties
160160

161-
Property | C/P | Range | Default | Importance | Description
161+
Property | C/P | Range | Default | Importance | Description
162162
-----------------------------------------|-----|-----------------|--------------:|------------| --------------------------
163163
request.required.acks | P | -1 .. 1000 | -1 | high | This field indicates the number of acknowledgements the leader broker must receive from ISR brokers before responding to the request: *0*=Broker does not send any response/ack to client, *-1* or *all*=Broker will block until message is committed by all in sync replicas (ISRs). If there are less than `min.insync.replicas` (broker configuration) in the ISR set the produce request will fail. <br>*Type: integer*
164164
acks | P | -1 .. 1000 | -1 | high | Alias for `request.required.acks`: This field indicates the number of acknowledgements the leader broker must receive from ISR brokers before responding to the request: *0*=Broker does not send any response/ack to client, *-1* or *all*=Broker will block until message is committed by all in sync replicas (ISRs). If there are less than `min.insync.replicas` (broker configuration) in the ISR set the produce request will fail. <br>*Type: integer*

examples/consumer.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,20 @@ int main(int argc, char **argv) {
127127
return 1;
128128
}
129129

130+
if (rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr,
131+
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
132+
fprintf(stderr, "%s\n", errstr);
133+
rd_kafka_conf_destroy(conf);
134+
return 1;
135+
}
136+
137+
// if (rd_kafka_conf_set(conf, "debug", "all", errstr,
138+
// sizeof(errstr)) != RD_KAFKA_CONF_OK) {
139+
// fprintf(stderr, "%s\n", errstr);
140+
// rd_kafka_conf_destroy(conf);
141+
// return 1;
142+
// }
143+
130144
/* If there is no previously committed offset for a partition
131145
* the auto.offset.reset strategy will be used to decide where
132146
* in the partition to start fetching messages.

src/rdkafka_admin.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3939,7 +3939,7 @@ rd_kafka_DeleteRecordsResponse_parse(rd_kafka_op_t *rko_req,
39393939
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR,
39403940
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
39413941
offsets = rd_kafka_buf_read_topic_partitions(
3942-
reply, rd_false /* don't use topic_id */, 0, fields);
3942+
reply, rd_false /* don't use topic_id */, rd_true, 0, fields);
39433943
if (!offsets)
39443944
rd_kafka_buf_parse_fail(reply,
39453945
"Failed to parse topic partitions");
@@ -4926,7 +4926,7 @@ rd_kafka_OffsetDeleteResponse_parse(rd_kafka_op_t *rko_req,
49264926
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR,
49274927
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
49284928
partitions = rd_kafka_buf_read_topic_partitions(
4929-
reply, rd_false /* don't use topic_id */, 16, fields);
4929+
reply, rd_false /* don't use topic_id */, rd_true, 16, fields);
49304930
if (!partitions) {
49314931
rd_snprintf(errstr, errstr_size,
49324932
"Failed to parse OffsetDeleteResponse partitions");
@@ -6961,8 +6961,8 @@ static rd_kafka_resp_err_t rd_kafka_ListConsumerGroupOffsetsRequest(
69616961
require_stable_offsets =
69626962
rd_kafka_confval_get_int(&options->require_stable_offsets);
69636963
rd_kafka_OffsetFetchRequest(
6964-
rkb, grpoffsets->group_id, grpoffsets->partitions,
6965-
require_stable_offsets, op_timeout, replyq, resp_cb, opaque);
6964+
rkb, grpoffsets->group_id, grpoffsets->partitions, rd_false, -1,
6965+
NULL, require_stable_offsets, op_timeout, replyq, resp_cb, opaque);
69666966
return RD_KAFKA_RESP_ERR_NO_ERROR;
69676967
}
69686968

@@ -8114,8 +8114,8 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
81148114
{RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
81158115
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
81168116
partitions = rd_kafka_buf_read_topic_partitions(
8117-
rkbuf, rd_false /* don't use topic_id */, 0,
8118-
fields);
8117+
rkbuf, rd_false /* don't use topic_id */,
8118+
rd_true, 0, fields);
81198119
rd_kafka_buf_destroy(rkbuf);
81208120
if (!partitions)
81218121
rd_kafka_buf_parse_fail(

src/rdkafka_assignment.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,8 @@ static int rd_kafka_assignment_serve_pending(rd_kafka_t *rk) {
543543
partitions_to_query->cnt);
544544

545545
rd_kafka_OffsetFetchRequest(
546-
coord, rk->rk_group_id->str, partitions_to_query,
546+
coord, rk->rk_group_id->str, partitions_to_query, rd_false,
547+
-1, NULL,
547548
rk->rk_conf.isolation_level ==
548549
RD_KAFKA_READ_COMMITTED /*require_stable_offsets*/,
549550
0, /* Timeout */

src/rdkafka_assignor.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
159159
rd_kafka_buf_write_topic_partitions(
160160
rkbuf, owned_partitions,
161161
rd_false /*don't skip invalid offsets*/,
162-
rd_false /*any offset*/, rd_false /* don't use topic_id */,
163-
fields);
162+
rd_false /*any offset*/, rd_false /* use_topic name */,
163+
rd_true, fields);
164164
}
165165

166166
/* Following data is ignored by consumer version < 2 */

src/rdkafka_buf.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ void rd_kafka_buf_destroy_final(rd_kafka_buf_t *rkbuf) {
3838
case RD_KAFKAP_Metadata:
3939
if (rkbuf->rkbuf_u.Metadata.topics)
4040
rd_list_destroy(rkbuf->rkbuf_u.Metadata.topics);
41+
if (rkbuf->rkbuf_u.Metadata.topic_ids)
42+
rd_list_destroy(rkbuf->rkbuf_u.Metadata.topic_ids);
4143
if (rkbuf->rkbuf_u.Metadata.reason)
4244
rd_free(rkbuf->rkbuf_u.Metadata.reason);
4345
if (rkbuf->rkbuf_u.Metadata.rko)

src/rdkafka_buf.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,8 +375,10 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */
375375

376376
union {
377377
struct {
378-
rd_list_t *topics; /* Requested topics (char *) */
379-
char *reason; /* Textual reason */
378+
rd_list_t *topics; /* Requested topics (char *) */
379+
rd_list_t *
380+
topic_ids; /* Requested topic ids rd_kafka_Uuid_t */
381+
char *reason; /* Textual reason */
380382
rd_kafka_op_t *rko; /* Originating rko with replyq
381383
* (if any) */
382384
rd_bool_t all_topics; /**< Full/All topics requested */

0 commit comments

Comments
 (0)