Skip to content

Commit 53fb83d

Browse files
committed
Subscription version to prevent stale metadata
calls from causing an unknown topic or partition error
1 parent bf7248c commit 53fb83d

12 files changed

+115
-42
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ librdkafka v2.9.0 is a feature release:
2424
* When consumer is closed before destroying the client, the operations queue
2525
isn't purged anymore as it contains operations
2626
unrelated to the consumer group (#).
27+
* When making multiple changes to the consumer subscription in a short time,
28+
no unknown topic error is returned for topics that are in the new subscription but weren't in the previous one (#).
2729

2830

2931
## Fixes
@@ -81,6 +83,12 @@ librdkafka v2.9.0 is a feature release:
8183
isn't purged anymore as it contains operations
8284
unrelated to the consumer group.
8385
Happens since 1.x (#).
86+
* Issues: #
87+
When making multiple changes to the consumer subscription in a short time,
88+
no unknown topic error is returned for topics that are in the new subscription
89+
but weren't in the previous one. The error was caused by a stale response
90+
to a metadata request made for the previous subscription.
91+
Happens since 1.x (#).
8492

8593

8694

src/rdkafka_admin.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1511,8 +1511,9 @@ rd_kafka_admin_MetadataRequest(rd_kafka_broker_t *rkb,
15111511
rd_false /* No admin operation requires topic creation. */,
15121512
include_cluster_authorized_operations,
15131513
include_topic_authorized_operations,
1514-
rd_false /* No admin operation should update cgrp. */, force_racks,
1515-
resp_cb, replyq,
1514+
rd_false /* No admin operation should update cgrp. */,
1515+
-1 /* No subscription version is used */, force_racks, resp_cb,
1516+
replyq,
15161517
rd_true /* Admin operation metadata requests are always forced. */,
15171518
opaque);
15181519
}

src/rdkafka_buf.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,9 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */
384384
rd_bool_t all_topics; /**< Full/All topics requested */
385385
rd_bool_t cgrp_update; /**< Update cgrp with topic
386386
* status from response. */
387+
int32_t cgrp_subscription_version;
388+
/**< Consumer group subscription version, to
389+
* check before updating cgrp state. */
387390
rd_bool_t force_racks; /**< Force the returned metadata
388391
* to contain partition to
389392
* rack mapping. */

src/rdkafka_cgrp.c

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,7 @@ rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk,
476476
rd_interval_init(&rkcg->rkcg_timeout_scan_intvl);
477477
rd_atomic32_init(&rkcg->rkcg_assignment_lost, rd_false);
478478
rd_atomic32_init(&rkcg->rkcg_terminated, rd_false);
479+
rd_atomic32_init(&rkcg->rkcg_subscription_version, 0);
479480
rkcg->rkcg_current_assignment = rd_kafka_topic_partition_list_new(0);
480481
rkcg->rkcg_target_assignment = NULL;
481482
rkcg->rkcg_next_target_assignment = NULL;
@@ -2232,6 +2233,8 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk,
22322233
* on receiving the response since some topics
22332234
* may be missing. */
22342235
rd_false,
2236+
/* cgrp_update=false: no subscription version is used */
2237+
-1,
22352238
/* force_racks is true if any member has a client rack set,
22362239
since we will require partition to rack mapping in that
22372240
case for rack-aware assignors. */
@@ -2336,7 +2339,10 @@ static rd_kafka_op_res_t rd_kafka_cgrp_handle_Metadata_op(rd_kafka_t *rk,
23362339
if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
23372340
return RD_KAFKA_OP_RES_HANDLED; /* Terminating */
23382341

2339-
rd_kafka_cgrp_metadata_update_check(rkcg, rd_false /*dont rejoin*/);
2342+
if (rd_atomic32_get(&rkcg->rkcg_subscription_version) ==
2343+
rko->rko_u.metadata.subscription_version)
2344+
rd_kafka_cgrp_metadata_update_check(rkcg,
2345+
rd_false /*dont rejoin*/);
23402346

23412347
return RD_KAFKA_OP_RES_HANDLED;
23422348
}
@@ -2353,6 +2359,7 @@ static rd_kafka_op_res_t rd_kafka_cgrp_handle_Metadata_op(rd_kafka_t *rk,
23532359
*/
23542360
static int rd_kafka_cgrp_metadata_refresh(rd_kafka_cgrp_t *rkcg,
23552361
int *metadata_agep,
2362+
int32_t cgrp_subscription_version,
23562363
const char *reason) {
23572364
rd_kafka_t *rk = rkcg->rkcg_rk;
23582365
rd_kafka_op_t *rko;
@@ -2421,9 +2428,9 @@ static int rd_kafka_cgrp_metadata_refresh(rd_kafka_cgrp_t *rkcg,
24212428
rd_kafka_cgrp_handle_Metadata_op);
24222429
rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, 0);
24232430

2424-
err = rd_kafka_metadata_request(rkcg->rkcg_rk, NULL, &topics,
2425-
rd_false /*!allow auto create */,
2426-
rd_true /*cgrp_update*/, reason, rko);
2431+
err = rd_kafka_metadata_request(
2432+
rkcg->rkcg_rk, NULL, &topics, rd_false /*!allow auto create */,
2433+
rd_true /*cgrp_update*/, cgrp_subscription_version, reason, rko);
24272434
if (err) {
24282435
rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_METADATA, "CGRPMETADATA",
24292436
"%s: need to refresh metadata (%dms old) "
@@ -2439,7 +2446,8 @@ static int rd_kafka_cgrp_metadata_refresh(rd_kafka_cgrp_t *rkcg,
24392446

24402447

24412448

2442-
static void rd_kafka_cgrp_join(rd_kafka_cgrp_t *rkcg) {
2449+
static void rd_kafka_cgrp_join(rd_kafka_cgrp_t *rkcg,
2450+
int32_t cgrp_subscription_version) {
24432451
int metadata_age;
24442452

24452453
if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP ||
@@ -2475,6 +2483,7 @@ static void rd_kafka_cgrp_join(rd_kafka_cgrp_t *rkcg) {
24752483
/* We need up-to-date full metadata to continue,
24762484
* refresh metadata if necessary. */
24772485
if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age,
2486+
cgrp_subscription_version,
24782487
"consumer join") == 1) {
24792488
rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER,
24802489
"JOIN",
@@ -2879,7 +2888,8 @@ void rd_kafka_cgrp_consumer_next_target_assignment_request_metadata(
28792888
rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, NULL);
28802889
rd_kafka_MetadataRequest(
28812890
rkb, NULL, missing_topic_ids, "ConsumerGroupHeartbeat API Response",
2882-
rd_false /*!allow_auto_create*/, rd_false, rd_false, rko);
2891+
rd_false /*!allow_auto_create*/, rd_false,
2892+
-1 /* no subscription version is used */, rd_false, rko);
28832893
rd_list_destroy(missing_topic_ids);
28842894
}
28852895

@@ -5136,9 +5146,10 @@ rd_kafka_cgrp_calculate_subscribe_revoking_partitions(
51365146
return revoking;
51375147
}
51385148

5139-
static void
5149+
static int32_t
51405150
rd_kafka_cgrp_subscription_set(rd_kafka_cgrp_t *rkcg,
51415151
rd_kafka_topic_partition_list_t *rktparlist) {
5152+
int32_t ret = rd_atomic32_add(&rkcg->rkcg_subscription_version, 1);
51425153
rkcg->rkcg_subscription = rktparlist;
51435154
if (rkcg->rkcg_subscription) {
51445155
/* Insert all non-wildcard topics in cache immediately.
@@ -5149,6 +5160,7 @@ rd_kafka_cgrp_subscription_set(rd_kafka_cgrp_t *rkcg,
51495160
rkcg->rkcg_rk, rkcg->rkcg_subscription, NULL,
51505161
0 /*dont replace*/);
51515162
}
5163+
return ret;
51525164
}
51535165

51545166
/**
@@ -5166,6 +5178,7 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg,
51665178
rd_kafka_topic_partition_list_t *errored;
51675179
int metadata_age;
51685180
int old_cnt = rkcg->rkcg_subscription->cnt;
5181+
int32_t cgrp_subscription_version;
51695182

51705183
rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;
51715184

@@ -5182,9 +5195,11 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg,
51825195
rkcg, unsubscribing_topics);
51835196

51845197
rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
5185-
rd_kafka_cgrp_subscription_set(rkcg, rktparlist);
5198+
cgrp_subscription_version =
5199+
rd_kafka_cgrp_subscription_set(rkcg, rktparlist);
51865200

51875201
if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age,
5202+
cgrp_subscription_version,
51885203
"modify subscription") == 1) {
51895204
rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER,
51905205
"MODSUB",
@@ -5323,7 +5338,7 @@ static rd_kafka_resp_err_t rd_kafka_cgrp_unsubscribe(rd_kafka_cgrp_t *rkcg,
53235338
static rd_kafka_resp_err_t
53245339
rd_kafka_cgrp_subscribe(rd_kafka_cgrp_t *rkcg,
53255340
rd_kafka_topic_partition_list_t *rktparlist) {
5326-
5341+
int32_t subscription_version;
53275342
rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "SUBSCRIBE",
53285343
"Group \"%.*s\": subscribe to new %ssubscription "
53295344
"of %d topics (join-state %s)",
@@ -5389,9 +5404,9 @@ rd_kafka_cgrp_subscribe(rd_kafka_cgrp_t *rkcg,
53895404
if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0)
53905405
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;
53915406

5392-
rd_kafka_cgrp_subscription_set(rkcg, rktparlist);
5407+
subscription_version = rd_kafka_cgrp_subscription_set(rkcg, rktparlist);
53935408

5394-
rd_kafka_cgrp_join(rkcg);
5409+
rd_kafka_cgrp_join(rkcg, subscription_version);
53955410

53965411
return RD_KAFKA_RESP_ERR_NO_ERROR;
53975412
}
@@ -5749,7 +5764,8 @@ static void rd_kafka_cgrp_join_state_serve(rd_kafka_cgrp_t *rkcg) {
57495764

57505765
if (rd_interval_immediate(&rkcg->rkcg_join_intvl, 1000 * 1000,
57515766
now) > 0)
5752-
rd_kafka_cgrp_join(rkcg);
5767+
rd_kafka_cgrp_join(
5768+
rkcg, -1 /* current subscription version */);
57535769
break;
57545770

57555771
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN:

src/rdkafka_cgrp.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,8 @@ typedef struct rd_kafka_cgrp_s {
333333

334334
rd_atomic32_t rkcg_terminated; /**< Consumer has been closed */
335335

336+
rd_atomic32_t rkcg_subscription_version; /**< Subscription version */
337+
336338
/* Protected by rd_kafka_*lock() */
337339
struct {
338340
rd_ts_t ts_rebalance; /* Timestamp of
@@ -403,6 +405,12 @@ void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg,
403405
rd_bool_t do_join);
404406
#define rd_kafka_cgrp_get(rk) ((rk)->rk_cgrp)
405407

408+
#define rd_kafka_cgrp_same_subscription_version(rk_cgrp, \
409+
cgrp_subscription_version) \
410+
((rk_cgrp) && \
411+
(cgrp_subscription_version == -1 || \
412+
rd_atomic32_get(&(rk_cgrp)->rkcg_subscription_version) == \
413+
cgrp_subscription_version))
406414

407415
void rd_kafka_cgrp_assigned_offsets_commit(
408416
rd_kafka_cgrp_t *rkcg,

src/rdkafka_metadata.c

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ rd_kafka_metadata(rd_kafka_t *rk,
135135
* partial request may make it seem
136136
* like some subscribed topics are missing. */
137137
all_topics ? rd_true : rd_false,
138+
-1 /* same subscription version */,
138139
rd_false /* force_racks */, rko);
139140

140141
rd_list_destroy(&topics);
@@ -484,12 +485,13 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
484485
rd_bool_t cgrp_update = rd_false;
485486
rd_bool_t has_reliable_leader_epochs =
486487
rd_kafka_has_reliable_leader_epochs(rkb);
487-
int ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion;
488-
rd_kafkap_str_t cluster_id = RD_ZERO_INIT;
489-
int32_t controller_id = -1;
490-
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
491-
int broker_changes = 0;
492-
int cache_changes = 0;
488+
int ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion;
489+
rd_kafkap_str_t cluster_id = RD_ZERO_INIT;
490+
int32_t controller_id = -1;
491+
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
492+
int broker_changes = 0;
493+
int cache_changes = 0;
494+
int cgrp_subscription_version = -1;
493495

494496
/* If client rack is present, the metadata cache (topic or full) needs
495497
* to contain the partition to rack map. */
@@ -504,6 +506,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
504506
cgrp_update =
505507
request->rkbuf_u.Metadata.cgrp_update && rk->rk_cgrp;
506508
compute_racks |= request->rkbuf_u.Metadata.force_racks;
509+
cgrp_subscription_version =
510+
request->rkbuf_u.Metadata.cgrp_subscription_version;
507511
}
508512

509513
/* If reason is NULL, set it to a human-readable string. */
@@ -1031,9 +1035,13 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
10311035
* the effective subscription of available topics) as to not
10321036
* propagate non-included topics as non-existent. */
10331037
if (cgrp_update &&
1034-
(requested_topics || requested_topic_ids || all_topics))
1038+
(all_topics ||
1039+
((requested_topics || requested_topic_ids) &&
1040+
rd_kafka_cgrp_same_subscription_version(
1041+
rkb->rkb_rk->rk_cgrp, cgrp_subscription_version)))) {
10351042
rd_kafka_cgrp_metadata_update_check(rkb->rkb_rk->rk_cgrp,
10361043
rd_true /*do join*/);
1044+
}
10371045

10381046
/* Try to acquire a Producer ID from this broker if we
10391047
* don't have one. */
@@ -1347,6 +1355,7 @@ rd_kafka_metadata_refresh_topics(rd_kafka_t *rk,
13471355
rd_bool_t force,
13481356
rd_bool_t allow_auto_create,
13491357
rd_bool_t cgrp_update,
1358+
int32_t cgrp_subscription_version,
13501359
const char *reason) {
13511360
rd_list_t q_topics;
13521361
int destroy_rkb = 0;
@@ -1413,9 +1422,9 @@ rd_kafka_metadata_refresh_topics(rd_kafka_t *rk,
14131422
"Requesting metadata for %d/%d topics: %s",
14141423
rd_list_cnt(&q_topics), rd_list_cnt(topics), reason);
14151424

1416-
rd_kafka_MetadataRequest(rkb, &q_topics, NULL, reason,
1417-
allow_auto_create, cgrp_update,
1418-
rd_false /* force_racks */, NULL);
1425+
rd_kafka_MetadataRequest(
1426+
rkb, &q_topics, NULL, reason, allow_auto_create, cgrp_update,
1427+
cgrp_subscription_version, rd_false /* force_racks */, NULL);
14191428

14201429
rd_list_destroy(&q_topics);
14211430

@@ -1464,7 +1473,7 @@ rd_kafka_metadata_refresh_known_topics(rd_kafka_t *rk,
14641473
else
14651474
err = rd_kafka_metadata_refresh_topics(
14661475
rk, rkb, &topics, force, allow_auto_create_topics,
1467-
rd_false /*!cgrp_update*/, reason);
1476+
rd_false /*!cgrp_update*/, -1, reason);
14681477

14691478
rd_list_destroy(&topics);
14701479

@@ -1530,7 +1539,8 @@ rd_kafka_metadata_refresh_consumer_topics(rd_kafka_t *rk,
15301539
else
15311540
err = rd_kafka_metadata_refresh_topics(
15321541
rk, rkb, &topics, rd_true /*force*/,
1533-
allow_auto_create_topics, rd_true /*cgrp_update*/, reason);
1542+
allow_auto_create_topics, rd_true /*cgrp_update*/,
1543+
rd_atomic32_get(&rkcg->rkcg_subscription_version), reason);
15341544

15351545
rd_list_destroy(&topics);
15361546

@@ -1557,8 +1567,9 @@ rd_kafka_resp_err_t rd_kafka_metadata_refresh_brokers(rd_kafka_t *rk,
15571567
const char *reason) {
15581568
return rd_kafka_metadata_request(rk, rkb, NULL /*brokers only*/,
15591569
rd_false /*!allow auto create topics*/,
1560-
rd_false /*no cgrp update */, reason,
1561-
NULL);
1570+
rd_false /*no cgrp update */,
1571+
-1 /* same subscription version */,
1572+
reason, NULL);
15621573
}
15631574

15641575

@@ -1592,7 +1603,8 @@ rd_kafka_resp_err_t rd_kafka_metadata_refresh_all(rd_kafka_t *rk,
15921603
rd_list_init(&topics, 0, NULL); /* empty list = all topics */
15931604
rd_kafka_MetadataRequest(
15941605
rkb, &topics, NULL, reason, rd_false /*no auto create*/,
1595-
rd_true /*cgrp update*/, rd_false /* force_rack */, NULL);
1606+
rd_true /*cgrp update*/, -1 /* same subscription version */,
1607+
rd_false /* force_rack */, NULL);
15961608
rd_list_destroy(&topics);
15971609

15981610
if (destroy_rkb)
@@ -1618,6 +1630,7 @@ rd_kafka_metadata_request(rd_kafka_t *rk,
16181630
const rd_list_t *topics,
16191631
rd_bool_t allow_auto_create_topics,
16201632
rd_bool_t cgrp_update,
1633+
int32_t cgrp_subscription_version,
16211634
const char *reason,
16221635
rd_kafka_op_t *rko) {
16231636
int destroy_rkb = 0;
@@ -1629,9 +1642,9 @@ rd_kafka_metadata_request(rd_kafka_t *rk,
16291642
destroy_rkb = 1;
16301643
}
16311644

1632-
rd_kafka_MetadataRequest(rkb, topics, NULL, reason,
1633-
allow_auto_create_topics, cgrp_update,
1634-
rd_false /* force racks */, rko);
1645+
rd_kafka_MetadataRequest(
1646+
rkb, topics, NULL, reason, allow_auto_create_topics, cgrp_update,
1647+
cgrp_subscription_version, rd_false /* force racks */, rko);
16351648

16361649
if (destroy_rkb)
16371650
rd_kafka_broker_destroy(rkb);
@@ -1695,7 +1708,7 @@ static void rd_kafka_metadata_leader_query_tmr_cb(rd_kafka_timers_t *rkts,
16951708
rd_kafka_metadata_refresh_topics(
16961709
rk, NULL, &topics, rd_true /*force*/,
16971710
rk->rk_conf.allow_auto_create_topics,
1698-
rd_false /*!cgrp_update*/, "partition leader query");
1711+
rd_false /*!cgrp_update*/, -1, "partition leader query");
16991712

17001713
/* Back off next query exponentially till we reach
17011714
* the retry backoff max ms */

src/rdkafka_metadata.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ rd_kafka_metadata_refresh_topics(rd_kafka_t *rk,
147147
rd_bool_t force,
148148
rd_bool_t allow_auto_create,
149149
rd_bool_t cgrp_update,
150+
int32_t cgrp_subscription_version,
150151
const char *reason);
151152
rd_kafka_resp_err_t
152153
rd_kafka_metadata_refresh_known_topics(rd_kafka_t *rk,
@@ -170,6 +171,7 @@ rd_kafka_metadata_request(rd_kafka_t *rk,
170171
const rd_list_t *topics,
171172
rd_bool_t allow_auto_create_topics,
172173
rd_bool_t cgrp_update,
174+
int32_t cgrp_subscription_version,
173175
const char *reason,
174176
rd_kafka_op_t *rko);
175177

src/rdkafka_op.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,8 @@ struct rd_kafka_op_s {
400400
struct {
401401
rd_kafka_metadata_t *md;
402402
rd_kafka_metadata_internal_t *mdi;
403+
/* subscription version for this call */
404+
int32_t subscription_version;
403405
int force; /* force request regardless of outstanding
404406
* metadata requests. */
405407
} metadata;

0 commit comments

Comments
 (0)