Commit b36ead0

Metadata cache by topic id and fixes for failing metadata tests:

- cache is updated on full metadata refresh and not cleared
- fast metadata refresh stops without leader change when there are no stale leader epochs
- handling broker isn't updated on stale leader epoch

1 parent ce3ce56 commit b36ead0

File tree

7 files changed: +338 −106 lines changed

CHANGELOG.md

Lines changed: 30 additions & 0 deletions
@@ -6,6 +6,36 @@ librdkafka v2.3.1 is a maintenance release:
   check the [release notes](https://www.openssl.org/news/cl30.txt).
 * Integration tests can be started in KRaft mode and run against any
   GitHub Kafka branch other than the released versions.
+* [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
+  Continue partial implementation by adding a metadata cache by topic id
+  and updating the topic id corresponding to the partition name (#)
+* Fixes to metadata cache expiration, metadata refresh interruption and
+  to avoid usage of stale metadata (#).
+* Fix to main loop timeout calculation leading to a tight loop for a period
+  of 1 ms max (#).
+
+
+## Fixes
+
+### General fixes
+
+* Metadata cache was cleared on full metadata refresh, leading to unnecessary
+  refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating
+  the cache for existing or hinted entries instead of clearing them.
+  Happening since 2.1.0 (#).
+* Metadata refreshes without partition leader change could lead to a loop of
+  intervaled metadata calls. Solved by stopping the metadata refresh when
+  all existing metadata is non-stale. Happening since 2.3.0 (#).
+* A partition migration could happen, using stale metadata, when the partition
+  was undergoing a validation and being retried because of an error.
+  Solved by doing a partition migration only with a non-stale leader epoch.
+  Happening since 2.1.0 (#).
+* In the librdkafka main thread loop, when it was awakened less than 1 ms
+  before the expiration of a timeout, ops were served with a zero timeout,
+  leading to increased CPU usage until the timeout was reached.
+  Happening since 1.x (#).
+
 
 
 # librdkafka v2.3.0
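The stale-metadata entry above boils down to comparing the leader epoch carried by a metadata response against the epoch the client already knows, and only migrating the partition when the response is not stale. The following is a minimal sketch of that guard; the types and names are illustrative assumptions, not librdkafka's actual implementation:

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified view of a partition's cached leader state. */
struct partition_state {
        int32_t leader_id;    /* broker id of the current leader */
        int32_t leader_epoch; /* last leader epoch seen, -1 if unknown */
};

/* Return true if a metadata update with `new_leader`/`new_epoch` should
 * trigger a migration to another broker, false if it is stale or a no-op.
 * Mirrors the "partition migration only with a non-stale leader epoch"
 * rule described in the changelog entry above. */
static bool should_migrate(const struct partition_state *p,
                           int32_t new_leader,
                           int32_t new_epoch) {
        /* Leader epochs only grow; an older epoch means the response was
         * produced before our current knowledge and must be ignored. */
        if (new_epoch != -1 && p->leader_epoch != -1 &&
            new_epoch < p->leader_epoch)
                return false; /* stale: keep the current handling broker */

        /* Same leader and same epoch: nothing changed, no migration. */
        if (new_leader == p->leader_id && new_epoch == p->leader_epoch)
                return false;

        return true;
}
```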

src/rdkafka.c

Lines changed: 6 additions & 1 deletion
@@ -2127,7 +2127,12 @@ static int rd_kafka_thread_main(void *arg) {
                               RD_KAFKA_CGRP_STATE_TERM)))) {
                 rd_ts_t sleeptime = rd_kafka_timers_next(
                     &rk->rk_timers, 1000 * 1000 /*1s*/, 1 /*lock*/);
-                rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0,
+                /* Round up to avoid calling serve with timeout 0 ms
+                 * in a tight loop until 1 ms has passed. */
+                int timeout_ms = (int)(sleeptime / 1000);
+                if (sleeptime % 1000 > 0)
+                        timeout_ms++;
+                rd_kafka_q_serve(rk->rk_ops, timeout_ms, 0,
                                  RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
                 if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
                         rd_kafka_cgrp_serve(rk->rk_cgrp);
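The hunk above replaces a truncating microsecond-to-millisecond conversion with a rounded-up one: a timer due in, say, 300 µs used to produce a 0 ms serve timeout, so the loop spun until the full millisecond had elapsed. Below is a standalone sketch of the same ceiling conversion; the helper name and the `main()` harness are illustrative, only `rd_ts_t` (microseconds) is taken from the source:

```c
#include <assert.h>
#include <stdint.h>

typedef int64_t rd_ts_t; /* microseconds, as in librdkafka */

/* Convert a sleep time in microseconds to a poll timeout in milliseconds,
 * rounding up so a sub-millisecond remainder yields a 1 ms timeout
 * instead of a zero (busy-looping) timeout. */
static int sleeptime_to_timeout_ms(rd_ts_t sleeptime_us) {
        int timeout_ms = (int)(sleeptime_us / 1000);
        if (sleeptime_us % 1000 > 0)
                timeout_ms++;
        return timeout_ms;
}

int main(void) {
        assert(sleeptime_to_timeout_ms(0) == 0);    /* already due */
        assert(sleeptime_to_timeout_ms(300) == 1);  /* was 0 before the fix */
        assert(sleeptime_to_timeout_ms(1000) == 1);
        assert(sleeptime_to_timeout_ms(1500) == 2); /* was 1 before the fix */
        return 0;
}
```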

src/rdkafka_metadata.c

Lines changed: 94 additions & 40 deletions
@@ -474,12 +474,14 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
         rd_kafka_metadata_internal_t *mdi = NULL;
         rd_kafka_metadata_t *md           = NULL;
         size_t rkb_namelen;
-        const int log_decode_errors = LOG_ERR;
-        rd_list_t *missing_topics   = NULL;
-
-        const rd_list_t *requested_topics = request_topics;
-        rd_bool_t all_topics              = rd_false;
-        rd_bool_t cgrp_update             = rd_false;
+        const int log_decode_errors  = LOG_ERR;
+        rd_list_t *missing_topics    = NULL;
+        rd_list_t *missing_topic_ids = NULL;
+
+        const rd_list_t *requested_topics    = request_topics;
+        const rd_list_t *requested_topic_ids = NULL;
+        rd_bool_t all_topics                 = rd_false;
+        rd_bool_t cgrp_update                = rd_false;
         rd_bool_t has_reliable_leader_epochs =
             rd_kafka_has_reliable_leader_epochs(rkb);
         int ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion;
@@ -496,8 +498,9 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
         rd_bool_t compute_racks = has_client_rack;

         if (request) {
-                requested_topics = request->rkbuf_u.Metadata.topics;
-                all_topics       = request->rkbuf_u.Metadata.all_topics;
+                requested_topics    = request->rkbuf_u.Metadata.topics;
+                requested_topic_ids = request->rkbuf_u.Metadata.topic_ids;
+                all_topics          = request->rkbuf_u.Metadata.all_topics;
                 cgrp_update =
                     request->rkbuf_u.Metadata.cgrp_update && rk->rk_cgrp;
                 compute_racks |= request->rkbuf_u.Metadata.force_racks;
@@ -519,6 +522,9 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
         if (requested_topics)
                 missing_topics =
                     rd_list_copy(requested_topics, rd_list_string_copy, NULL);
+        if (requested_topic_ids)
+                missing_topic_ids =
+                    rd_list_copy(requested_topic_ids, rd_list_Uuid_copy, NULL);

         rd_kafka_broker_lock(rkb);
         rkb_namelen = strlen(rkb->rkb_name) + 1;
@@ -833,34 +839,37 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
                 rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i],
                                                      &mdi->topics[i]);

-                // TODO: Should be done for requested_topic_ids as well.
-                if (requested_topics) {
+                if (requested_topics)
                         rd_list_free_cb(missing_topics,
                                         rd_list_remove_cmp(missing_topics,
                                                            md->topics[i].topic,
                                                            (void *)strcmp));
-                        if (!all_topics) {
-                                /* Only update cache when not asking
-                                 * for all topics. */
-
-                                rd_kafka_wrlock(rk);
-                                rd_kafka_metadata_cache_topic_update(
-                                    rk, &md->topics[i], &mdi->topics[i],
-                                    rd_false /*propagate later*/,
-                                    /* use has_client_rack rather than
-                                       compute_racks. We need cached rack ids
-                                       only in case we need to rejoin the group
-                                       if they change and client.rack is set
-                                       (KIP-881). */
-                                    has_client_rack, mdi->brokers,
-                                    md->broker_cnt);
-                                cache_changes++;
-                                rd_kafka_wrunlock(rk);
-                        }
-                }
+                if (requested_topic_ids)
+                        rd_list_free_cb(
+                            missing_topic_ids,
+                            rd_list_remove_cmp(missing_topic_ids,
+                                               &mdi->topics[i].topic_id,
+                                               (void *)rd_kafka_Uuid_ptr_cmp));
+                /* Only update cache when not asking
+                 * for all topics or cache entry
+                 * already exists. */
+                rd_kafka_wrlock(rk);
+                cache_changes +=
+                    rd_kafka_metadata_cache_topic_update(
+                        rk, &md->topics[i], &mdi->topics[i],
+                        rd_false /*propagate later*/,
+                        /* use has_client_rack rather than
+                           compute_racks. We need cached rack ids
+                           only in case we need to rejoin the group
+                           if they change and client.rack is set
+                           (KIP-881). */
+                        has_client_rack, mdi->brokers,
+                        md->broker_cnt,
+                        all_topics /*cache entry needs to exist
+                                    *if all_topics*/);
+                rd_kafka_wrunlock(rk);
         }

-        // TODO: Should be done for missing_topic_ids as well.
         /* Requested topics not seen in metadata? Propogate to topic code. */
         if (missing_topics) {
                 char *topic;
@@ -892,6 +901,41 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
                         }
                 }
         }
+        if (missing_topic_ids) {
+                rd_kafka_Uuid_t *topic_id;
+                rd_rkb_dbg(rkb, TOPIC, "METADATA",
+                           "%d/%d requested topic(s) seen in metadata",
+                           rd_list_cnt(requested_topic_ids) -
+                               rd_list_cnt(missing_topic_ids),
+                           rd_list_cnt(requested_topic_ids));
+                for (i = 0; i < rd_list_cnt(missing_topic_ids); i++) {
+                        rd_kafka_Uuid_t *missing_topic_id =
+                            missing_topic_ids->rl_elems[i];
+                        rd_rkb_dbg(rkb, TOPIC, "METADATA", "wanted %s",
+                                   rd_kafka_Uuid_base64str(missing_topic_id));
+                }
+                RD_LIST_FOREACH(topic_id, missing_topic_ids, i) {
+                        rd_kafka_topic_t *rkt;
+
+                        rd_kafka_rdlock(rk);
+                        rkt = rd_kafka_topic_find_by_topic_id(rkb->rkb_rk,
+                                                              *topic_id);
+                        rd_kafka_rdunlock(rk);
+                        if (rkt) {
+                                /* Received metadata response contained no
+                                 * information about topic 'rkt' and thus
+                                 * indicates the topic is not available in the
+                                 * cluster.
+                                 * Mark the topic as non-existent */
+                                rd_kafka_topic_wrlock(rkt);
+                                rd_kafka_topic_set_notexists(
+                                    rkt, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);
+                                rd_kafka_topic_wrunlock(rkt);
+
+                                rd_kafka_topic_destroy0(rkt);
+                        }
+                }
+        }


         rd_kafka_wrlock(rkb->rkb_rk);
@@ -956,17 +1000,18 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
                            "Caching full metadata with "
                            "%d broker(s) and %d topic(s): %s",
                            md->broker_cnt, md->topic_cnt, reason);
-        } else {
-                if (cache_changes)
-                        rd_kafka_metadata_cache_propagate_changes(rk);
-                rd_kafka_metadata_cache_expiry_start(rk);
         }

+        if (cache_changes) {
+                rd_kafka_metadata_cache_propagate_changes(rk);
+        }
+        rd_kafka_metadata_cache_expiry_start(rk);

-        // TODO: Should be done for requested_topic_ids as well.
         /* Remove cache hints for the originally requested topics. */
         if (requested_topics)
                 rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
+        if (requested_topic_ids)
+                rd_kafka_metadata_cache_purge_hints(rk, requested_topic_ids);

         rd_kafka_wrunlock(rkb->rkb_rk);

@@ -982,7 +1027,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
          * which may contain only a sub-set of the subscribed topics (namely
          * the effective subscription of available topics) as to not
          * propagate non-included topics as non-existent. */
-        if (cgrp_update && (requested_topics || all_topics))
+        if (cgrp_update &&
+            (requested_topics || requested_topic_ids || all_topics))
                 rd_kafka_cgrp_metadata_update_check(rkb->rkb_rk->rk_cgrp,
                                                     rd_true /*do join*/);

@@ -995,10 +1041,10 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
         }

 done:
-
-        // TODO: Should be done for requested_topic_ids as well.
         if (missing_topics)
                 rd_list_destroy(missing_topics);
+        if (missing_topic_ids)
+                rd_list_destroy(missing_topic_ids);

         /* This metadata request was triggered by someone wanting
          * the metadata information back as a reply, so send that reply now.
@@ -1013,18 +1059,26 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
 err_parse:
         err = rkbuf->rkbuf_err;
 err:
-        // TODO: Should be done for requested_topic_ids as well.
         if (requested_topics) {
                 /* Failed requests shall purge cache hints for
                  * the requested topics. */
                 rd_kafka_wrlock(rkb->rkb_rk);
                 rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
                 rd_kafka_wrunlock(rkb->rkb_rk);
         }
+        if (requested_topic_ids) {
+                /* Failed requests shall purge cache hints for
+                 * the requested topics. */
+                rd_kafka_wrlock(rkb->rkb_rk);
+                rd_kafka_metadata_cache_purge_hints_by_id(rk,
+                                                          requested_topic_ids);
+                rd_kafka_wrunlock(rkb->rkb_rk);
+        }

-        // TODO: Should be done for requested_topic_ids as well.
         if (missing_topics)
                 rd_list_destroy(missing_topics);
+        if (missing_topic_ids)
+                rd_list_destroy(missing_topic_ids);
         rd_tmpabuf_destroy(&tbuf);

         return err;
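The core of the new topic-id handling in this diff is the same pattern already used for topic names: copy the list of requested ids, remove every id seen in the response, and whatever remains is missing and gets marked as non-existent. The sketch below shows that pattern with plain arrays; the `rd_list_t`/`rd_kafka_Uuid_t` helpers used in the diff are librdkafka internals, and every type and function name below is illustrative only:

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Illustrative 128-bit topic id, standing in for rd_kafka_Uuid_t. */
typedef struct {
        unsigned char bytes[16];
} topic_uuid_t;

static bool uuid_eq(const topic_uuid_t *a, const topic_uuid_t *b) {
        return memcmp(a->bytes, b->bytes, sizeof(a->bytes)) == 0;
}

/* Start from the requested ids; drop each one that shows up in the
 * metadata response; the survivors are the missing topics that would be
 * flagged with RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC in the diff above.
 * Returns the number of ids written to `missing`. */
static size_t find_missing_ids(const topic_uuid_t *requested,
                               size_t requested_cnt,
                               const topic_uuid_t *seen,
                               size_t seen_cnt,
                               topic_uuid_t *missing /* >= requested_cnt */) {
        size_t missing_cnt = 0;

        for (size_t i = 0; i < requested_cnt; i++) {
                bool found = false;
                for (size_t j = 0; j < seen_cnt && !found; j++)
                        found = uuid_eq(&requested[i], &seen[j]);
                if (!found)
                        missing[missing_cnt++] = requested[i];
        }
        return missing_cnt;
}
```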

src/rdkafka_metadata.h

Lines changed: 13 additions & 4 deletions
@@ -219,7 +219,8 @@ rd_kafka_metadata_new_topic_with_partition_replicas_mock(int replication_factor,
  */

 struct rd_kafka_metadata_cache_entry {
-        rd_avl_node_t rkmce_avlnode; /* rkmc_avl */
+        rd_avl_node_t rkmce_avlnode;       /* rkmc_avl */
+        rd_avl_node_t rkmce_avlnode_by_id; /* rkmc_avl_by_id */
         TAILQ_ENTRY(rd_kafka_metadata_cache_entry) rkmce_link; /* rkmc_expiry */
         rd_ts_t rkmce_ts_expires; /* Expire time */
         rd_ts_t rkmce_ts_insert;  /* Insert time */
@@ -243,6 +244,7 @@ struct rd_kafka_metadata_cache_entry {

 struct rd_kafka_metadata_cache {
         rd_avl_t rkmc_avl;
+        rd_avl_t rkmc_avl_by_id;
         TAILQ_HEAD(, rd_kafka_metadata_cache_entry) rkmc_expiry;
         rd_kafka_timer_t rkmc_expiry_tmr;
         int rkmc_cnt;
@@ -271,19 +273,26 @@ struct rd_kafka_metadata_cache {
 int rd_kafka_metadata_cache_delete_by_name(rd_kafka_t *rk, const char *topic);
 void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk);
 int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts);
-void rd_kafka_metadata_cache_topic_update(
+int rd_kafka_metadata_cache_topic_update(
     rd_kafka_t *rk,
-    const rd_kafka_metadata_topic_t *mdt,
+    rd_kafka_metadata_topic_t *mdt,
     const rd_kafka_metadata_topic_internal_t *mdit,
     rd_bool_t propagate,
    rd_bool_t include_metadata,
     rd_kafka_metadata_broker_internal_t *brokers,
-    size_t broker_cnt);
+    size_t broker_cnt,
+    rd_bool_t only_existing);
 void rd_kafka_metadata_cache_propagate_changes(rd_kafka_t *rk);
 struct rd_kafka_metadata_cache_entry *
 rd_kafka_metadata_cache_find(rd_kafka_t *rk, const char *topic, int valid);
+struct rd_kafka_metadata_cache_entry *
+rd_kafka_metadata_cache_find_by_id(rd_kafka_t *rk,
+                                   const rd_kafka_Uuid_t,
+                                   int valid);
 void rd_kafka_metadata_cache_purge_hints(rd_kafka_t *rk,
                                          const rd_list_t *topics);
+void rd_kafka_metadata_cache_purge_hints_by_id(rd_kafka_t *rk,
+                                               const rd_list_t *topics);
 int rd_kafka_metadata_cache_hint(rd_kafka_t *rk,
                                  const rd_list_t *topics,
                                  rd_list_t *dst,
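The header changes above add a second AVL tree (`rkmc_avl_by_id`) so a single cache entry can be found both by topic name and by topic id: the entry embeds one node per tree (`rkmce_avlnode` and `rkmce_avlnode_by_id`), and each index has its own comparator. The sketch below shows the same dual-index idea using two sorted pointer arrays instead of intrusive AVL nodes; every name in it is illustrative, not librdkafka API:

```c
#include <stdlib.h>
#include <string.h>

/* Illustrative cache entry reachable through two independent indexes,
 * mirroring the two embedded AVL nodes added in the header above. */
struct cache_entry {
        char topic_name[64];
        unsigned char topic_id[16];
        /* ... cached topic metadata would live here ... */
};

/* One comparator per index, as each tree needs its own ordering. */
static int cmp_by_name(const void *a, const void *b) {
        const struct cache_entry *const *ea = a, *const *eb = b;
        return strcmp((*ea)->topic_name, (*eb)->topic_name);
}

static int cmp_by_id(const void *a, const void *b) {
        const struct cache_entry *const *ea = a, *const *eb = b;
        return memcmp((*ea)->topic_id, (*eb)->topic_id, 16);
}

/* Two sorted arrays of pointers stand in for the two AVL trees: the same
 * entries are reachable through either key without being duplicated. */
struct cache {
        struct cache_entry **by_name; /* kept sorted with cmp_by_name */
        struct cache_entry **by_id;   /* kept sorted with cmp_by_id */
        size_t cnt;
};

static struct cache_entry *cache_find_by_id(const struct cache *c,
                                             const unsigned char id[16]) {
        struct cache_entry key, *keyp = &key;
        struct cache_entry **res;

        memcpy(key.topic_id, id, sizeof(key.topic_id));
        res = bsearch(&keyp, c->by_id, c->cnt, sizeof(*c->by_id), cmp_by_id);
        return res ? *res : NULL;
}
```

Keeping both node members inside the entry (as the real code does with intrusive AVL nodes) means insertion and eviction touch one allocation and both indexes stay consistent by construction.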
