Skip to content

Conversation

@mfleming
Copy link
Contributor

Brokers that are not in the metadata should be purged from the internal client lists. This helps to avoid annoying "No route to host" and other connection failure messages.

Fixes #238

The Kafka protocol allows for brokers to have multiple host:port pairs
for a given node Id, e.g. see UpdateMetadata request which contains a
live_brokers list where each broker Id has a list of host:port pairs. It
follows from this that the thing that uniquely identifies a broker is
its Id, and not the host:port.

The behaviour right now is that if we have multiple brokers with the
same host:port but different Ids, the first broker in the list will be
updated to have the Id of whatever broker we're looking at as we iterate
through the brokers in the Metadata response in
rd_kafka_parse_Metadata0(), e.g.

 Step 1. Broker[0] = Metadata.brokers[0]
 Step 2. Broker[0] = Metadata.brokers[1]
 Step 3. Broker[0] = Metadata.brokers[2]

A typical situation where brokers have the same host:port pair but
differ in their Id is if the brokers are behind a load balancer.

The NODE_UPDATE mechanism responsible for this was originally added in
b09ff60 ("Handle broker name and nodeid updates (issue confluentinc#343)") as a way
to forcibly update a broker hostname if an Id is reused with a new host
after the original one was decommissioned. But this isn't how the Java
Kafka client works, so let's use the Metadata response as the source of
truth instead of updating brokers if we can only match by their
host:port.
Brokers that are not in the metadata should be purged from the internal
client lists. This helps to avoid annoying "No route to host" and other
connection failure messages.

Fixes confluentinc#238.
Copy link
Contributor

@emasab emasab left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @mfleming thanks a lot for this contribution and sorry for letting it wait for long. We want to include these fixes in next version. Both fixes are good, just on broker decommission we want to do some additional checks.

Here are some comments mainly for the first fix:

@emasab
Copy link
Contributor

emasab commented May 21, 2024

Hi @mfleming can I apply those changes or do you want to continue the PR? Thanks!

@mfleming
Copy link
Contributor Author

mfleming commented May 21, 2024 via email

@emasab
Copy link
Contributor

emasab commented Jun 7, 2024

@mfleming thanks, sorry for the delay too, I'm checking it again

@emasab emasab requested a review from a team as a code owner June 10, 2024 13:56
@emasab
Copy link
Contributor

emasab commented Jun 10, 2024

/sem-approve

@emasab
Copy link
Contributor

emasab commented Jun 10, 2024

/sem-approve

@emasab
Copy link
Contributor

emasab commented Jun 12, 2024

We're not going to merge this for 2.5.0 that is due in July as we need to do more checks on possible regressions but we want to merge it for a maintenance release in September

@mfleming
Copy link
Contributor Author

We're not going to merge this for 2.5.0 that is due in July as we need to do more checks on possible regressions but we want to merge it for a maintenance release in September

Thanks for fixing things :)

@emasab
Copy link
Contributor

emasab commented Jun 15, 2024

Thanks for fixing things :)

Thank you for this PR!

@benesch
Copy link
Contributor

benesch commented Jul 1, 2024

Just wanted to say a big thank you to both of you—@mfleming for writing this and @emasab for reviewing. We just ran into a slow thread leak in a Kafka consumer at @MaterializeInc that will be fixed by this patch.

@pranavrth
Copy link
Member

pranavrth commented Sep 16, 2024

While going through the PR, I found that there are a few issues in the PR.

  1. The number of threads are increased as part of this PR. Currently, all the bootstrap servers threads are updated to the real broker thread if they appear in the metadata response. This PR creates new broker threads for the new brokers which are seen in the metadata. The bootstrap servers threads are not reused for this purpose in the PR. Due to this, we generally have 'n*2' brokers threads for 'n' bootstrap servers. Earlier there were only 'n' threads.
  2. In the current librdkafka implementation, we always keep bootstrap broker threads even if they are not reported in the metadata. This ensured that we always have bootstrap broker threads if required to rebootstrap. This PR removed this functionality.
  3. To fix the issue introduced in point number 2, KIP-899 needs to be implemented.

We need to fix the above issues before releasing this PR and need of more testing. As a result, it won't be part of the upcoming 2.6 release.

@emasab
Copy link
Contributor

emasab commented Oct 22, 2024

Closes #4881

@emasab
Copy link
Contributor

emasab commented Dec 13, 2024

We agreed KIP-899 and KIP-1102 aren't strictly necessary, in Java client they were disabled by default until recently.
The reason is that when a change in broker set happens slow enough to be detected by the periodic (5 min) metadata refresh, the client cannot remain without brokers.

In any case the metadata response contains at least the majority of KRaft elegible brokers. If majority changes, it must contain at least one broker from previous set.
So it's usually not possible the client remains without brokers, unless the set of brokers changes to fast it cannot reach any of them and that's the case KIP-899 and KIP-1102 are addressing. But this last case is also a problem with current librdkafka code so we leave the improvement for a later PR.

There are a few things left to check, at least:

  • removing bootstrap brokers without id (-1) and adding brokers with real ids and advertised hostnames, to avoid duplicating the threads
  • check some fields in rd_kafka_s: no_idemp_brokers, rk_telemetry.preferred_broker, rk_broker_down_cnt, rk_broker_up_cnt, rk_broker_cnt, ... when brokers are removed. It could help changing the state to DOWN before removing them.
  • maybe there's something to change in the stats_cb, but probably nothing to do
  • brokers configured with rd_kafka_brokers_add must be removed after usage too
  • currently, decommissioned threads are still joined on destroy, is it possible to join them from time to time without accumulating them but also without blocking the main thread because of that? At specific intervals maybe, so they're already terminated when joined
  • check thoroughly for memory leaks and use-after-free cases

@emasab
Copy link
Contributor

emasab commented Jan 10, 2025

  • check some fields in rd_kafka_s: no_idemp_brokers, rk_telemetry.preferred_broker, rk_broker_down_cnt, rk_broker_up_cnt, rk_broker_cnt, ... when brokers are removed. It could help changing the state to DOWN before removing them.

no_idemp_brokers seems not being used,
rk_broker_cnt is decremented in rd_kafka_broker_thread_main.
rk_broker_down_cnt and rk_broker_up_cnt are changed when broker thread receives RD_KAFKA_OP_TERMINATE: it calls rd_kafka_broker_fail that sets the state to DOWN and changes those counters.
rk_telemetry.preferred_broker is also cleared on rd_kafka_broker_fail

@confluent-cla-assistant
Copy link

confluent-cla-assistant bot commented Feb 18, 2025

🎉 All Contributor License Agreements have been signed. Ready to merge.
✅ mfleming
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

emasab added 2 commits March 28, 2025 16:32
…sible to await for the correct list of brokers in all tests given since decommissioning brokers are excluded from that list
Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking test part.

emasab added 4 commits March 31, 2025 15:48
test log interceptor.
Used the test log interceptor for test 0151 too
Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few comments on the test part.

@emasab
Copy link
Contributor

emasab commented Apr 1, 2025

/sem-approve

Copy link
Contributor

@emasab emasab left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving as I was initially reviewing. Thanks @pranavrth and @mfleming !

Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!. Great work @emasab and @mfleming. Thanks.

removing bootstrap brokers:

In tests 0121 bootstrap brokers from different clusters
are added to the same list, that is something that should never
be done. Previously it was keeping both sets of bootstrap
brokers and giving a warning. Now it's keeping only the
`learned` brokers from the cluster that replies first.
@emasab
Copy link
Contributor

emasab commented Apr 1, 2025

/sem-approve

@emasab
Copy link
Contributor

emasab commented Apr 1, 2025

/sem-approve

Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thanks @emasab and @mfleming !!

@emasab emasab merged commit f7c4273 into confluentinc:master Apr 1, 2025
2 checks passed
airlock-confluentinc bot pushed a commit that referenced this pull request Apr 8, 2025
* Fix for brokers with different Ids but same host:port

The Kafka protocol allows for brokers to have multiple host:port pairs
for a given node Id, e.g. see UpdateMetadata request which contains a
live_brokers list where each broker Id has a list of host:port pairs. It
follows from this that the thing that uniquely identifies a broker is
its Id, and not the host:port.

The behaviour right now is that if we have multiple brokers with the
same host:port but different Ids, the first broker in the list will be
updated to have the Id of whatever broker we're looking at as we iterate
through the brokers in the Metadata response in
rd_kafka_parse_Metadata0(), e.g.

 Step 1. Broker[0] = Metadata.brokers[0]
 Step 2. Broker[0] = Metadata.brokers[1]
 Step 3. Broker[0] = Metadata.brokers[2]

A typical situation where brokers have the same host:port pair but
differ in their Id is if the brokers are behind a load balancer.

The NODE_UPDATE mechanism responsible for this was originally added in
b09ff60 ("Handle broker name and nodeid updates (issue #343)") as a way
to forcibly update a broker hostname if an Id is reused with a new host
after the original one was decommissioned. But this isn't how the Java
Kafka client works, so let's use the Metadata response as the source of
truth instead of updating brokers if we can only match by their
host:port.

* Fix for purging brokers no longer reported in metadata

Brokers that are not in the metadata should be purged from the internal
client lists. This helps to avoid annoying "No route to host" and other
connection failure messages.

Fixes #238.

* Remove the possibility to modify rkb_nodeid after rkb creation.

* Remove locking when accessing rkb_nodeid as it's now set only on creation and not modified anymore

* Add new brokers and reassign partitions in the mock cluster

* Remove bootstrap broker after receiving learned
ones. Wait decommissioned threads after they've stopped instead of on termination.

* Handle the _DESTROY_BROKER local error,
triggered when a broker is removed
without terminating the client.

* Test 0151 improved with cluster replacement
and cluster roll

* Fix for test 0105, do_test_txn_broker_down_in_txn:
remove left references when decommissioning a broker
and avoid it's selected as leader again or that partitions
are delegated to it

* Avoid selecting a configured broker as a logical or telemetry broker

* Avoid selecting terminating brokers for sending calls or new connections

* Remove addressless count and avoid counting the logical
broker for the all brokers down error, to send
the error in all the cases

* Test: verify that decommissining a broker while adding a new one with same id isn't causing problems

* Handle the case where current group coordinator
is decommissioned without leaving dangling references
until the coordinator is changed

Test 0151 fix. Given the find coordinator response adds a new broker (not a logical one, a learned one to set into `rkcg_curr_coord`)
Removed brokers can be added again even if not present in metadata. This is mock cluster only problem as in a
real cluster a broker that is set down cannot be a coordinator. This commit changes the coordinator before
setting down a broker that is current coordinator

* Remove the decommissioning broker from rk_broker_by_id when starting to avoid multiple
instances are added to the list with same id.
The decommissioned broker returned by the find can lead to multiple brokers with same id being added.

* Don't select logical brokers at all
for general purpose request like metadata ones

* Schedule an immediate connection when there are no brokers connecting nor requests for connection.
When we're in this state, if we respect the sparse connection interval, there's no event that notifies the awaiters at `rd_kafka_brokers_wait_state_change` given it's an interval and not a timer.
This is more visible when brokers are decommissioned and there's no broker down even causing the notification.
Check it with test `0113` subtest `u_multiple_subscription_changes`.

* Remove all configured brokers when there are learned
ones. This is to avoid leaving connections to the boostrap brokers that are continued to be used instead of the learned one, adding additional requests that can be later purged by the decommissioning of that last configured broker.

* Change test 0075 after removing all bootstrap brokers.
This will be reverted with KIP-899

* Remove rk_logical_broker_up_cnt

* [test 0151] Simplify the test removing `await_verification`. It's possible to await for the correct list of brokers in all tests given since decommissioning brokers are excluded from that list

* Remove broker state from labels

* Remove `nodeid` from op

* Use `rk_broker_by_id` for learned broker ids to return sorted broker ids

* Verify nodename change through a
test log interceptor.
Used the test log interceptor for test 0151 too


---------

Co-authored-by: Emanuele Sabellico <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Purge brokers no longer reported in metadata

4 participants