Skip to content

Conversation

emasab
Copy link
Contributor

@emasab emasab commented Mar 26, 2024

  • use metadata cache by topic id
  • rename 'generic' protocol to 'classic'
  • consumer group serve timer to awake the loop earlier
  • compare and find into topic partition list by topic id only
  • fix memory leak when instance creation fails and app_conf is provided
  • fix cases where HB response is received after unsubscription
  • use topic name from current assignment if it's missing from metadata

@emasab emasab force-pushed the dev_metadata_cache_topic_id_and_fixes branch from b36ead0 to 268cb01 Compare March 26, 2024 17:44
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 6667956 to ca1eb2d Compare March 26, 2024 17:48
@emasab emasab force-pushed the dev_metadata_cache_topic_id_and_fixes branch 2 times, most recently from 115eba5 to d04ed7e Compare March 28, 2024 14:39
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from ca1eb2d to 953f668 Compare March 28, 2024 14:41
@emasab emasab force-pushed the dev_metadata_cache_topic_id_and_fixes branch from d04ed7e to f05bc17 Compare April 1, 2024 14:37
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 953f668 to e4614be Compare April 1, 2024 14:40
@emasab emasab force-pushed the dev_metadata_cache_topic_id_and_fixes branch from f05bc17 to bc427e2 Compare April 1, 2024 15:06
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from e4614be to 10e7cc1 Compare April 1, 2024 15:07
@emasab emasab force-pushed the dev_metadata_cache_topic_id_and_fixes branch from bc427e2 to 117d317 Compare April 1, 2024 16:33
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 10e7cc1 to 5789307 Compare April 1, 2024 16:48
@emasab emasab force-pushed the dev_metadata_cache_topic_id_and_fixes branch from 117d317 to ba3ee8d Compare April 3, 2024 13:21
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 5789307 to 434da75 Compare April 3, 2024 13:29
@emasab emasab force-pushed the dev_metadata_cache_topic_id_and_fixes branch from ba3ee8d to f405243 Compare April 3, 2024 13:39
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 434da75 to e5f3101 Compare April 3, 2024 13:40
@emasab emasab changed the base branch from dev_metadata_cache_topic_id_and_fixes to dev_fix_main_thread_tight_loop April 3, 2024 13:41
@emasab emasab force-pushed the dev_fix_main_thread_tight_loop branch from f5ffbf8 to 7403c10 Compare April 3, 2024 16:53
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from e5f3101 to 109d348 Compare April 3, 2024 16:54
@emasab emasab force-pushed the dev_fix_main_thread_tight_loop branch from 2b56329 to 6a8f931 Compare April 5, 2024 08:10
@emasab emasab changed the base branch from dev_fix_main_thread_tight_loop to dev_metadata_cache_topic_id_and_fixes April 5, 2024 08:12
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 109d348 to 3dd0e45 Compare April 5, 2024 08:28
Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked all files except rdkafka_cgrp.c. Have some doubts in that so we can discuss them first.

@@ -1132,19 +1132,19 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"Group session keepalive heartbeat interval.", 1, 3600 * 1000, 3 * 1000},
{_RK_GLOBAL | _RK_CGRP, "group.protocol.type", _RK_C_KSTR,
_RK(group_protocol_type),
"Group protocol type for the `generic` group protocol. NOTE: Currently, "
"Group protocol type for the `classic` group protocol. NOTE: Currently, "
"the only supported group "
"protocol type is `consumer`.",
.sdef = "consumer"},
{_RK_GLOBAL | _RK_CGRP | _RK_HIGH | _RK_HIDDEN, "group.protocol", _RK_C_S2I,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
{_RK_GLOBAL | _RK_CGRP | _RK_HIGH | _RK_HIDDEN, "group.protocol", _RK_C_S2I,
{_RK_GLOBAL | _RK_CGRP | _RK_HIGH, "group.protocol", _RK_C_S2I,

We should unhide these properties now.

.vdef = RD_KAFKA_GROUP_PROTOCOL_GENERIC,
.s2i = {{RD_KAFKA_GROUP_PROTOCOL_GENERIC, "generic"},
.vdef = RD_KAFKA_GROUP_PROTOCOL_CLASSIC,
.s2i = {{RD_KAFKA_GROUP_PROTOCOL_CLASSIC, "classic"},
{RD_KAFKA_GROUP_PROTOCOL_CONSUMER, "consumer"}}},
{_RK_GLOBAL | _RK_CGRP | _RK_MED | _RK_HIDDEN, "group.remote.assignor",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
{_RK_GLOBAL | _RK_CGRP | _RK_MED | _RK_HIDDEN, "group.remote.assignor",
{_RK_GLOBAL | _RK_CGRP | _RK_MED, "group.remote.assignor",

const rd_kafka_Uuid_t topic_id) {
int i = rd_kafka_topic_partition_list_find_by_id0(
rktparlist, topic_id, RD_KAFKA_PARTITION_UA,
rd_kafka_topic_partition_cmp_topic_id);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is another function rd_kafka_topic_partition_by_id_cmp. Lets rename that function to be rd_kafka_topic_partition_by_id_and_partition_cmp

@@ -3117,6 +3117,14 @@ int rd_kafka_topic_partition_cmp_topic(const void *_a, const void *_b) {
return strcmp(a->topic, b->topic);
}

/** @brief Compare only the topic id */
int rd_kafka_topic_partition_cmp_topic_id(const void *_a, const void *_b) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename this to rd_kafka_topic_partition_by_id_cmp to be consistent with other functions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a function named rd_kafka_topic_partition_list_find_by_id but it finds by id and partition. Let's rename that rd_kafka_topic_partition_list_find_topic_by_id_and_partition

Comment on lines 300 to 301
/** Send a complete request in next heartbeat,
* but don't send the acknowledgement if it's not required */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/** Send a complete request in next heartbeat,
* but don't send the acknowledgement if it's not required */
/** Send a complete request in next heartbeat */

I don't think statement after but is required.

/** Member is fenced, rejoining */
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE 0x40
/** Member is sending an acknowledgement for a reconciled assignment */
#define RD_KAFKA_CGRP_CONSUMER_F_SENDING_ACK 0x80
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this near RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK

.vdef = RD_KAFKA_GROUP_PROTOCOL_GENERIC,
.s2i = {{RD_KAFKA_GROUP_PROTOCOL_GENERIC, "generic"},
.vdef = RD_KAFKA_GROUP_PROTOCOL_CLASSIC,
.s2i = {{RD_KAFKA_GROUP_PROTOCOL_CLASSIC, "classic"},
{RD_KAFKA_GROUP_PROTOCOL_CONSUMER, "consumer"}}},
{_RK_GLOBAL | _RK_CGRP | _RK_MED | _RK_HIDDEN, "group.remote.assignor",
_RK_C_STR, _RK(group_remote_assignor),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should explain what we are doing in the default case (null) as well. Something related to partition assignment strategy. Also, we should explain that this default will be changed in the future so its better to use one of the value from now itself.

src/rdkafka.c Outdated
Comment on lines 2382 to 2437
if (!rk->rk_conf.group_remote_assignor) {
/* Default remote assignor to the chosen local one. */
if (rk->rk_conf.partition_assignors_cooperative) {
group_remote_assignor_override = rd_strdup("uniform");
rk->rk_conf.group_remote_assignor =
rd_strdup("uniform");
group_remote_assignor_override;
} else {
rd_kafka_assignor_t *range_assignor =
rd_kafka_assignor_find(rk, "range");
if (range_assignor && range_assignor->rkas_enabled)
rk->rk_conf.group_remote_assignor =
if (range_assignor && range_assignor->rkas_enabled) {
group_remote_assignor_override =
rd_strdup("range");
rk->rk_conf.group_remote_assignor =
group_remote_assignor_override;
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the new protocol, there is no significance of the partition assignment strategy. We should not depend on it. If we want to depend on it due to backward compatibility for now, we should tell user that this behaviour will be changed in the future and default remote assignor would be uniform as suggested in the KIP.

On other though, we shouldn't put any default based on partition assignment strategy and force users to chose one for this protocol. If they want to use the new protocol, they will anyways going to change to group.protocol property, so why not choose this as well.

Copy link
Contributor Author

@emasab emasab Apr 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be able to do a transparent upgrade until next major release we want to detect the methods it's going to use in rebalance_cb, and also give the same assignment results. I'm assuming broker assignors will give the same results in terms of stickyness or rack-awareness.
For roundrobin I think it's best to revert the configuration to the classic protocol, considering the new protocol will be enabled by default it doesn't need to give a fatal error with existing code.
Instead when group.remote.assignor is set we can assume they start directly with the incremental methods.

Comment on lines +2669 to +2709
if (group_remote_assignor_override)
rd_free(group_remote_assignor_override);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be outside of if (app_conf) condition as we are already doing a NULL check here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if not app_conf the property if freed in rd_kafka_destroy_internal so it would be a double free, here it's setting memory to zero later
memset(&rk->rk_conf, 0, sizeof(rk->rk_conf));
so they're not freed by rd_kafka_destroy_internal and the properties that belonged to app_conf are still valid pointers that are freed by the app.

@emasab emasab force-pushed the dev_metadata_cache_topic_id_and_fixes branch from d2444a7 to d25149b Compare April 5, 2024 19:15
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch 3 times, most recently from 514851a to 08b9df5 Compare April 8, 2024 19:11
@emasab emasab changed the base branch from dev_metadata_cache_topic_id_and_fixes to dev_fix_undesired_partition_migration April 8, 2024 19:12
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 08b9df5 to 4eb6f47 Compare April 9, 2024 12:50
@emasab emasab force-pushed the dev_fix_undesired_partition_migration branch from 2779b09 to 4b005f0 Compare April 11, 2024 09:19
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 65cb3a8 to dfc2fee Compare April 11, 2024 09:19
@emasab emasab force-pushed the dev_fix_undesired_partition_migration branch 3 times, most recently from cd88a71 to addd3d7 Compare April 15, 2024 10:11
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from e517019 to 9ba042b Compare April 15, 2024 10:12
@emasab emasab force-pushed the dev_fix_undesired_partition_migration branch from addd3d7 to 1411693 Compare April 15, 2024 14:51
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 9ba042b to bee04c3 Compare April 15, 2024 14:51
@emasab emasab force-pushed the dev_fix_undesired_partition_migration branch from 1411693 to b605960 Compare April 15, 2024 15:26
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from bee04c3 to d0f2a7e Compare April 15, 2024 15:26
@emasab emasab force-pushed the dev_fix_undesired_partition_migration branch from b605960 to 7e06fa5 Compare April 15, 2024 17:24
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from d0f2a7e to 270f7f7 Compare April 15, 2024 17:24
@emasab emasab force-pushed the dev_fix_undesired_partition_migration branch from 7e06fa5 to 92eb02b Compare April 15, 2024 18:02
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 270f7f7 to 228b93d Compare April 15, 2024 18:02
Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More comments.

@@ -109,7 +109,9 @@ group.instance.id | C | |
partition.assignment.strategy | C | | range,roundrobin | medium | The name of one or more partition assignment strategies. The elected group leader will use a strategy supported by all members of the group to assign partitions to group members. If there is more than one eligible strategy, preference is determined by the order of this list (strategies earlier in the list have higher priority). Cooperative and non-cooperative (eager) strategies must not be mixed. Available strategies: range, roundrobin, cooperative-sticky. <br>*Type: string*
session.timeout.ms | C | 1 .. 3600000 | 45000 | high | Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no hearts are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. Also see `max.poll.interval.ms`. <br>*Type: integer*
heartbeat.interval.ms | C | 1 .. 3600000 | 3000 | low | Group session keepalive heartbeat interval. <br>*Type: integer*
group.protocol.type | C | | consumer | low | Group protocol type for the `generic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
group.protocol.type | C | | consumer | low | Group protocol type for the `classic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
group.protocol | C | classic, consumer | classic | high | Group protocol to use. Use `classic` for the original protocol and `consumer` for the new protocol introduced in KIP-848. Available protocols: classic or consumer. Default is `classic`, but will change to `consumer` in next releases. <br>*Type: enum value*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
group.protocol | C | classic, consumer | classic | high | Group protocol to use. Use `classic` for the original protocol and `consumer` for the new protocol introduced in KIP-848. Available protocols: classic or consumer. Default is `classic`, but will change to `consumer` in next releases. <br>*Type: enum value*
group.protocol | C | classic, consumer | classic | high | Group protocol to use. Available protocols: classic or consumer. Use `classic` for the original protocol and `consumer` for the new protocol introduced in KIP-848. Default is `classic`, but will change to `consumer` in some later release. <br>*Type: enum value*

src/rdkafka.c Outdated
Comment on lines 2381 to 2432
if (!rk->rk_conf.group_remote_assignor) {
/* Default remote assignor to the chosen local one. */
if (rk->rk_conf.partition_assignors_cooperative) {
rk->rk_conf.group_remote_assignor =
rd_strdup("uniform");
} else {
rd_kafka_assignor_t *range_assignor =
rd_kafka_assignor_find(rk, "range");
if (range_assignor && range_assignor->rkas_enabled)
rd_kafka_assignor_t *cooperative_assignor;

/* Detect if chosen assignor is cooperative */
cooperative_assignor =
rd_kafka_assignor_find(rk, "cooperative-sticky");
rk->rk_conf.partition_assignors_cooperative =
!rk->rk_conf.partition_assignors.rl_cnt ||
(cooperative_assignor &&
cooperative_assignor->rkas_enabled);

if (rk->rk_conf.group_protocol ==
RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
/* Default remote assignor to the chosen local one. */
if (rk->rk_conf.partition_assignors_cooperative) {
group_remote_assignor_override =
rd_strdup("uniform");
rk->rk_conf.group_remote_assignor =
rd_strdup("range");
group_remote_assignor_override;
} else {
rd_kafka_assignor_t *range_assignor =
rd_kafka_assignor_find(rk, "range");
if (range_assignor &&
range_assignor->rkas_enabled) {
group_remote_assignor_override =
rd_strdup("range");
rk->rk_conf.group_remote_assignor =
group_remote_assignor_override;
} else {
rd_kafka_log(
rk, LOG_WARNING, "ASSIGNOR",
"roundrobin assignor isn't "
"available"
"with group protocol CONSUMER, "
"reverting group protocol "
"to CLASSIC");
rk->rk_conf.group_protocol =
RD_KAFKA_GROUP_PROTOCOL_CLASSIC;
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a FIXME here for removing these changes.

group.protocol.type | C | | consumer | low | Group protocol type for the `generic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
group.protocol.type | C | | consumer | low | Group protocol type for the `classic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
group.protocol | C | classic, consumer | classic | high | Group protocol to use. Use `classic` for the original protocol and `consumer` for the new protocol introduced in KIP-848. Available protocols: classic or consumer. Default is `classic`, but will change to `consumer` in next releases. <br>*Type: enum value*
group.remote.assignor | C | | | medium | Server side assignor to use. Keep it null to make server select a suitable assignor for the group. Available assignors: uniform or range. Default is null <br>*Type: string*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is incorrect now as we are selecting based on partition.assignment.strategy. We can update this doc to reflect that information and tell that this is not expected path and will be removed later.

@@ -109,7 +109,9 @@ group.instance.id | C | |
partition.assignment.strategy | C | | range,roundrobin | medium | The name of one or more partition assignment strategies. The elected group leader will use a strategy supported by all members of the group to assign partitions to group members. If there is more than one eligible strategy, preference is determined by the order of this list (strategies earlier in the list have higher priority). Cooperative and non-cooperative (eager) strategies must not be mixed. Available strategies: range, roundrobin, cooperative-sticky. <br>*Type: string*
session.timeout.ms | C | 1 .. 3600000 | 45000 | high | Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no hearts are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. Also see `max.poll.interval.ms`. <br>*Type: integer*
heartbeat.interval.ms | C | 1 .. 3600000 | 3000 | low | Group session keepalive heartbeat interval. <br>*Type: integer*
group.protocol.type | C | | consumer | low | Group protocol type for the `generic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
group.protocol.type | C | | consumer | low | Group protocol type for the `classic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
group.protocol | C | classic, consumer | classic | high | Group protocol to use. Use `classic` for the original protocol and `consumer` for the new protocol introduced in KIP-848. Available protocols: classic or consumer. Default is `classic`, but will change to `consumer` in next releases. <br>*Type: enum value*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add that this is the EA release and the APIs and compatibility for consumer protocol is expected to change.

src/rdkafka.c Outdated
Comment on lines 2410 to 2418
rd_kafka_log(
rk, LOG_WARNING, "ASSIGNOR",
"roundrobin assignor isn't "
"available"
"with group protocol CONSUMER, "
"reverting group protocol "
"to CLASSIC");
rk->rk_conf.group_protocol =
RD_KAFKA_GROUP_PROTOCOL_CLASSIC;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverting to classic is incorrect here. We need to set it to range or uniform.

Comment on lines 2940 to +2974
if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY &&
rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK &&
(rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK) &&
rkcg->rkcg_target_assignment) {
if (rkcg->rkcg_current_assignment)
if (rkcg->rkcg_consumer_flags &
RD_KAFKA_CGRP_CONSUMER_F_SENDING_ACK) {
if (rkcg->rkcg_current_assignment)
rd_kafka_topic_partition_list_destroy(
rkcg->rkcg_current_assignment);
rkcg->rkcg_current_assignment =
rd_kafka_topic_partition_list_copy(
rkcg->rkcg_target_assignment);
rd_kafka_topic_partition_list_destroy(
rkcg->rkcg_current_assignment);
rkcg->rkcg_current_assignment =
rd_kafka_topic_partition_list_copy(
rkcg->rkcg_target_assignment);
rd_kafka_topic_partition_list_destroy(
rkcg->rkcg_target_assignment);
rkcg->rkcg_target_assignment = NULL;
rkcg->rkcg_consumer_flags &=
~RD_KAFKA_CGRP_CONSUMER_F_WAITS_ACK;
if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) {
char rkcg_current_assignment_str[512] = "NULL";
rkcg->rkcg_target_assignment);
rkcg->rkcg_target_assignment = NULL;
rkcg->rkcg_consumer_flags &=
~RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK;

rd_kafka_topic_partition_list_str(
rkcg->rkcg_current_assignment,
rkcg_current_assignment_str,
sizeof(rkcg_current_assignment_str), 0);
if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) {
char rkcg_current_assignment_str[512] = "NULL";

rd_kafka_dbg(
rkcg->rkcg_rk, CGRP, "HEARTBEAT",
"Target assignment acked, new current assignment "
" \"%s\"",
rkcg_current_assignment_str);
rd_kafka_topic_partition_list_str(
rkcg->rkcg_current_assignment,
rkcg_current_assignment_str,
sizeof(rkcg_current_assignment_str), 0);

rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
"Target assignment acked, new "
"current assignment "
" \"%s\"",
rkcg_current_assignment_str);
}
} else if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need more understanding on this change.

rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR;
~RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION &
~RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST &
~RD_KAFKA_CGRP_CONSUMER_F_SENDING_ACK;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the state is not STEADY?

@@ -3033,10 +3089,12 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk,
/* Re-query for coordinator */
rkcg->rkcg_consumer_flags |=
RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST;
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be after next line?

}

/* Skip heartbeat if we have one in transit */
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why removing this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to rd_kafka_cgrp_consumer_heartbeat_preconditions_met

if (rd_kafka_topic_match(rkcg->rkcg_rk, pattern,
topic)) {
break;
if (likely(rkcg->rkcg_heartbeat_intvl_ms > 0)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the next timer already set?

emasab added 10 commits April 16, 2024 17:31
- rename 'generic' protocol to 'classic'
- consumer group serve timer to awake the loop earlier
- compare and find into topic partition list by topic id only
- fix memory leak when instance creation fails and app_conf is provided
- fix cases where HB response is received after unsubscription
- use topic name from current assignment if it's missing from metadata
fixes to next interval check and to the place where
number of retries is increased
- expedite HB after changing state back to INIT
- use the CONSUMER_F_WAIT_REJOIN to trigger
  the rejoin instead of calling it from max poll interval timer
reschedule it if expected earlier
expedite through scheduling cgrp serve timer
immediately, for a possible desynchronization with the coordinator,
with said partitions not being consumed anymore
@emasab emasab force-pushed the dev_kip848_use_metadata_cache_and_fixes branch from 228b93d to 215ec6e Compare April 17, 2024 06:28
@emasab emasab changed the base branch from dev_fix_undesired_partition_migration to dev_kip848 April 17, 2024 06:29
Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!.. Added couple of comments to track.

Comment on lines 2845 to +2848
rd_kafka_MetadataRequest(
rkb, NULL, topic_ids, "ConsumerGroupHeartbeat API Response",
rkb, NULL, missing_topic_ids, "ConsumerGroupHeartbeat API Response",
rd_false /*!allow_auto_create*/, rd_false, rd_false, rko);
rd_list_destroy(topic_ids);
rd_list_destroy(missing_topic_ids);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just requesting "missing_topics_ids" will not increase cache expiry time for the other topics. Can they be purged inbetween?

rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err));
}

if (actions & RD_KAFKA_ERR_ACTION_RETRY &&
rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using this flag at many place is error prone. We should improve this flow maybe with Subscription state machine.

@emasab emasab merged commit d31ed86 into dev_kip848 Apr 17, 2024
@emasab emasab deleted the dev_kip848_use_metadata_cache_and_fixes branch April 17, 2024 11:28
emasab added a commit that referenced this pull request Apr 18, 2024
- rename 'generic' protocol to 'classic'
- consumer group serve timer to awake the loop earlier
- compare and find into topic partition list by topic id only
- fix memory leak when instance creation fails and app_conf is provided
- fix cases where HB response is received after unsubscription
- use topic name from current assignment if it's missing from metadata
- expedite heartbeat simplification and
    fixes to next interval check and to the place where
    number of retries is increased
- expedite HB after changing state back to INIT
- use the CONSUMER_F_WAIT_REJOIN to trigger
  the rejoin instead of calling it from max poll interval timer
- schedule timer for next execution
  reschedule it if expected earlier
  expedite through scheduling cgrp serve timer
- treat unsupported feature error
   as fatal
- avoid removing partitions not matched by a new subscription
   immediately, for a possible desynchronization with the coordinator, 
   with said partitions not being consumed anymore
anchitj pushed a commit that referenced this pull request Jun 10, 2024
- rename 'generic' protocol to 'classic'
- consumer group serve timer to awake the loop earlier
- compare and find into topic partition list by topic id only
- fix memory leak when instance creation fails and app_conf is provided
- fix cases where HB response is received after unsubscription
- use topic name from current assignment if it's missing from metadata
- expedite heartbeat simplification and
    fixes to next interval check and to the place where
    number of retries is increased
- expedite HB after changing state back to INIT
- use the CONSUMER_F_WAIT_REJOIN to trigger
  the rejoin instead of calling it from max poll interval timer
- schedule timer for next execution
  reschedule it if expected earlier
  expedite through scheduling cgrp serve timer
- treat unsupported feature error
   as fatal
- avoid removing partitions not matched by a new subscription
   immediately, for a possible desynchronization with the coordinator, 
   with said partitions not being consumed anymore
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants