Skip to content

Commit 215ec6e

Browse files
committed
Address comments
1 parent cc862cc commit 215ec6e

File tree

2 files changed

+40
-26
lines changed

2 files changed

+40
-26
lines changed

src/rdkafka.c

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2381,7 +2381,11 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
23812381
if (!rk->rk_conf.group_remote_assignor) {
23822382
rd_kafka_assignor_t *cooperative_assignor;
23832383

2384-
/* Detect if chosen assignor is cooperative */
2384+
/* Detect if chosen assignor is cooperative
2385+
* FIXME: remove this compatibility altogether
2386+
* and apply the breaking changes that will be required
2387+
* in next major version. */
2388+
23852389
cooperative_assignor =
23862390
rd_kafka_assignor_find(rk, "cooperative-sticky");
23872391
rk->rk_conf.partition_assignors_cooperative =
@@ -2402,6 +2406,10 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
24022406
rd_kafka_assignor_find(rk, "range");
24032407
if (range_assignor &&
24042408
range_assignor->rkas_enabled) {
2409+
rd_kafka_log(
2410+
rk, LOG_WARNING, "ASSIGNOR",
2411+
"\"range\" assignor is sticky "
2412+
"with group protocol CONSUMER");
24052413
group_remote_assignor_override =
24062414
rd_strdup("range");
24072415
rk->rk_conf.group_remote_assignor =
@@ -2412,10 +2420,13 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
24122420
"roundrobin assignor isn't "
24132421
"available"
24142422
"with group protocol CONSUMER, "
2415-
"reverting group protocol "
2416-
"to CLASSIC");
2417-
rk->rk_conf.group_protocol =
2418-
RD_KAFKA_GROUP_PROTOCOL_CLASSIC;
2423+
"using the \"uniform\" one. "
2424+
"It's similar, "
2425+
"but it's also sticky");
2426+
group_remote_assignor_override =
2427+
rd_strdup("uniform");
2428+
rk->rk_conf.group_remote_assignor =
2429+
group_remote_assignor_override;
24192430
}
24202431
}
24212432
}

src/rdkafka_cgrp.c

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -785,9 +785,8 @@ void rd_kafka_cgrp_coord_query(rd_kafka_cgrp_t *rkcg, const char *reason) {
785785
return;
786786
}
787787

788-
if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_QUERY_COORD) {
788+
if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_QUERY_COORD)
789789
rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_COORD);
790-
}
791790

792791
rd_kafka_broker_destroy(rkb);
793792

@@ -901,8 +900,13 @@ static void rd_kafka_cgrp_consumer_reset(rd_kafka_cgrp_t *rkcg) {
901900
rd_kafka_topic_partition_list_destroy);
902901
rkcg->rkcg_next_target_assignment = NULL;
903902
rkcg->rkcg_current_assignment = rd_kafka_topic_partition_list_new(0);
904-
rkcg->rkcg_consumer_flags &= ~RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK &
905-
~RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN;
903+
904+
/* Leave only specified flags, reset the rest */
905+
rkcg->rkcg_consumer_flags =
906+
(rkcg->rkcg_consumer_flags &
907+
RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE) |
908+
(rkcg->rkcg_consumer_flags &
909+
RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE);
906910
}
907911

908912
/**
@@ -2704,45 +2708,44 @@ rd_kafka_cgrp_consumer_assignment_with_metadata(
27042708
for (i = 0; i < assignment->cnt; i++) {
27052709
struct rd_kafka_metadata_cache_entry *rkmce;
27062710
rd_kafka_topic_partition_t *rktpar;
2711+
char *topic_name = NULL;
27072712
rd_kafka_Uuid_t request_topic_id =
27082713
rd_kafka_topic_partition_get_topic_id(
27092714
&assignment->elems[i]);
2715+
27102716
rd_kafka_rdlock(rk);
27112717
rkmce =
27122718
rd_kafka_metadata_cache_find_by_id(rk, request_topic_id, 1);
27132719

2714-
if (rkmce) {
2715-
rd_kafka_topic_partition_list_add_with_topic_name_and_id(
2716-
assignment_with_metadata, request_topic_id,
2717-
rkmce->rkmce_mtopic.topic,
2718-
assignment->elems[i].partition);
2719-
rd_kafka_rdunlock(rk);
2720-
continue;
2721-
}
2720+
if (rkmce)
2721+
topic_name = rd_strdup(rkmce->rkmce_mtopic.topic);
27222722
rd_kafka_rdunlock(rk);
27232723

2724-
rktpar = rd_kafka_topic_partition_list_find_topic_by_id(
2725-
rkcg->rkcg_current_assignment, request_topic_id);
2726-
if (rktpar) {
2724+
if (unlikely(!topic_name)) {
2725+
rktpar = rd_kafka_topic_partition_list_find_topic_by_id(
2726+
rkcg->rkcg_current_assignment, request_topic_id);
2727+
if (rktpar)
2728+
topic_name = rd_strdup(rktpar->topic);
2729+
}
2730+
2731+
if (likely(topic_name != NULL)) {
27272732
rd_kafka_topic_partition_list_add_with_topic_name_and_id(
27282733
assignment_with_metadata, request_topic_id,
2729-
rktpar->topic, assignment->elems[i].partition);
2734+
topic_name, assignment->elems[i].partition);
2735+
rd_free(topic_name);
27302736
continue;
27312737
}
27322738

27332739
if (missing_topic_ids) {
2734-
rd_kafka_Uuid_t topic_id;
27352740
if (unlikely(!*missing_topic_ids))
27362741
*missing_topic_ids =
27372742
rd_list_new(1, rd_list_Uuid_destroy);
2738-
topic_id = rd_kafka_topic_partition_get_topic_id(
2739-
&assignment->elems[i]);
27402743
rd_list_add(*missing_topic_ids,
2741-
rd_kafka_Uuid_copy(&topic_id));
2744+
rd_kafka_Uuid_copy(&request_topic_id));
27422745
}
27432746
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
27442747
"Metadata not found for the "
2745-
"assigned topic id - %s."
2748+
"assigned topic id: %s."
27462749
" Continuing without it",
27472750
rd_kafka_Uuid_base64str(&request_topic_id));
27482751
}

0 commit comments

Comments
 (0)