Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
8dc8665
Fix for brokers with different Ids but same host:port
mfleming Nov 2, 2023
cf114ef
Fix locking
mfleming Nov 17, 2023
9754ee7
Fix for purging brokers no longer reported in metadata
mfleming Nov 16, 2023
ba5a3c4
Update tests/0145-broker-same-host-port.c
mfleming May 24, 2024
a735614
Update tests/0146-purge-brokers.c
mfleming May 24, 2024
05e8950
Remove the possibility to modify rkb_nodeid after
emasab May 14, 2024
df12c6a
Remove locking when accessing rkb_nodeid
emasab Jun 7, 2024
4365c9e
Add new brokers and reassign partitions in the
emasab Jun 10, 2024
6253618
CHANGELOG
emasab Jun 10, 2024
d16b8ce
Automatic style fix
emasab Jun 10, 2024
959bf16
Rename to rd_kafka_brokers_learned_ids
emasab Jun 10, 2024
f6c6811
Remove debug configuration in test
emasab Jun 10, 2024
90d330e
Merge branch 'master' into purge-brokers
emasab Feb 22, 2025
fd4c298
Remove bootstrap broker after receiving learned
emasab Jan 10, 2025
67d4b12
Handle the _DESTROY_BROKER local error,
emasab Jan 10, 2025
ac54d6a
Change test number
emasab Jan 10, 2025
4200fd6
Test 0151 improved with cluster replacement
emasab Jan 23, 2025
066d698
Fix for test 0105, do_test_txn_broker_down_in_txn:
emasab Jan 23, 2025
a070ba0
Avoid selecting a configured broker as a logical or telemetry broker
emasab Feb 11, 2025
27a1386
Avoid selecting terminating brokers for sending calls or new connections
emasab Feb 18, 2025
302c90c
Remove addressless count and avoid counting the logical
emasab Feb 26, 2025
eca3fd9
fixup: Handle the _DESTROY_BROKER local error,
emasab Mar 13, 2025
efc3954
fixup: rd_kafka_brokers_learned_ids
emasab Mar 13, 2025
1d04f86
Test: verify that decommissining a broker while adding a new one with…
emasab Mar 13, 2025
426bc80
fixup: Avoid selecting a configured broker as a logical or telemetry …
emasab Mar 18, 2025
0d94749
Handle the case where current group coordinator
emasab Mar 18, 2025
0d13f48
fixup: Handle the _DESTROY_BROKER local error,
emasab Mar 18, 2025
ed8bbe1
Remove the decommissioning broker from rk_broker_by_id when starting …
emasab Mar 19, 2025
c657034
Don't select logical brokers at all
emasab Mar 19, 2025
8df0710
fixup: Handle the _DESTROY_BROKER local error,
emasab Mar 25, 2025
b2782d9
Schedule an immediate connection when there are no brokers connecting…
emasab Mar 20, 2025
3f38488
Remove all configured brokers when there are learned
emasab Mar 20, 2025
077f29d
Change test 0075 after removing all bootstrap brokers.
emasab Mar 24, 2025
2a7a0aa
Remove rk_logical_broker_up_cnt
emasab Mar 27, 2025
43624e6
Address rest of comments
emasab Mar 27, 2025
018d150
[test 0149] Fix Windows builds
emasab Mar 28, 2025
372525c
fixup: Address rest of comments
emasab Mar 28, 2025
8d775ef
[test 0151] Simplify the test removing `await_verification`. It's pos…
emasab Mar 28, 2025
bc989d2
Remove broker state from labels
emasab Mar 31, 2025
5d27eca
Remove `nodeid` from op
emasab Mar 31, 2025
fce45ba
Address comments
emasab Mar 31, 2025
0dd5db0
Use `rk_broker_by_id` for learned broker ids to
emasab Mar 31, 2025
d5b2a42
Verify nodename change through a
emasab Mar 31, 2025
9c71107
Address comments
emasab Apr 1, 2025
20a3f10
Additional documentation for the log interceptor
emasab Apr 1, 2025
5889efb
Merge branch 'master' into purge-brokers
emasab Apr 1, 2025
9027489
Test changes because of
emasab Jan 20, 2025
cd6d98b
clang fix for variable definition inside switch case
emasab Apr 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 9 additions & 16 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1407,9 +1407,8 @@ rd_kafka_broker_t *rd_kafka_broker_random0(const char *func,
int fcnt = 0;

TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
if (rd_kafka_broker_or_instance_terminating(rkb))
continue;
if (RD_KAFKA_BROKER_IS_LOGICAL(rkb))
if (rd_kafka_broker_or_instance_terminating(rkb) ||
RD_KAFKA_BROKER_IS_LOGICAL(rkb))
continue;

rd_kafka_broker_lock(rkb);
Expand Down Expand Up @@ -3483,7 +3482,7 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) {
break;

case RD_KAFKA_OP_CONNECT:
rd_atomic32_add(&rkb->rkb_rk->rk_scheduled_connections_cnt, -1);
rd_atomic32_sub(&rkb->rkb_rk->rk_scheduled_connections_cnt, 1);
/* Sparse connections: connection requested, transition
* to TRY_CONNECT state to trigger new connection. */
if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_INIT) {
Expand Down Expand Up @@ -5540,7 +5539,6 @@ void rd_kafka_broker_update(rd_kafka_t *rk,
rko = rd_kafka_op_new(RD_KAFKA_OP_NODE_UPDATE);
rd_strlcpy(rko->rko_u.node.nodename, nodename,
sizeof(rko->rko_u.node.nodename));
rko->rko_u.node.nodeid = mdb->id;
/* Perform a blocking op request so that all
* broker-related state, such as the rk broker list,
* is up to date by the time this call returns.
Expand Down Expand Up @@ -5941,7 +5939,7 @@ void rd_kafka_broker_schedule_connection(rd_kafka_broker_t *rkb) {
rko = rd_kafka_op_new(RD_KAFKA_OP_CONNECT);
rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_FLASH);
if (!rd_kafka_q_enq(rkb->rkb_ops, rko))
rd_atomic32_add(&rkb->rkb_rk->rk_scheduled_connections_cnt, -1);
rd_atomic32_sub(&rkb->rkb_rk->rk_scheduled_connections_cnt, 1);
}


Expand Down Expand Up @@ -6118,19 +6116,14 @@ void rd_kafka_broker_start_reauth_cb(rd_kafka_timers_t *rkts, void *_rkb) {

int32_t *rd_kafka_brokers_learned_ids(rd_kafka_t *rk, size_t *cntp) {
rd_kafka_broker_t *rkb;

size_t all_broker_cnt = rd_atomic32_get(&rk->rk_broker_cnt);
/* This over-allocates but simplifies the code. */
int32_t *ids = malloc(sizeof(*ids) * all_broker_cnt);
int32_t *p = ids;
int32_t *ids, *p;
int32_t i;

*cntp = 0;
rd_kafka_rdlock(rk);
TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
if (rkb->rkb_source != RD_KAFKA_LEARNED ||
rd_kafka_broker_termination_in_progress(rkb))
continue;

ids = malloc(sizeof(*ids) * rd_list_cnt(&rk->rk_broker_by_id));
p = ids;
RD_LIST_FOREACH(rkb, &rk->rk_broker_by_id, i) {
*p++ = rkb->rkb_nodeid;
(*cntp)++;
}
Expand Down
3 changes: 1 addition & 2 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,7 @@ static void rd_kafka_metadata_decommission_unavailable_brokers(
TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
rd_bool_t purge_broker;

if (rkb->rkb_source != RD_KAFKA_LEARNED &&
rkb->rkb_source != RD_KAFKA_CONFIGURED)
if (rkb->rkb_source == RD_KAFKA_LOGICAL)
continue;

purge_broker = rd_true;
Expand Down
1 change: 0 additions & 1 deletion src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ struct rd_kafka_op_s {
} dr;

struct {
int32_t nodeid;
char nodename[RD_KAFKA_NODENAME_SIZE];
} node;

Expand Down
108 changes: 79 additions & 29 deletions tests/0149-broker-same-host-port.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,76 @@

#include "test.h"

static rd_bool_t broker1_changed;
static rd_bool_t broker2_changed;
static rd_bool_t broker3_changed;

/**
* @brief Keep track of which brokers have changed their nodename.
*/
static void broker_same_host_port_mock_log_cb(const rd_kafka_t *rk,
int level,
const char *fac,
const char *buf) {
const char *nodename = "to localhost:11192";
if (strstr(buf, "/1: Nodename changed") && strstr(buf, nodename))
broker1_changed = rd_true;

if (strstr(buf, "/2: Nodename changed") && strstr(buf, nodename))
broker2_changed = rd_true;

if (strstr(buf, "/3: Nodename changed") && strstr(buf, nodename))
broker3_changed = rd_true;
}

static void broker_same_host_port_mock_verify_broker_ids(rd_kafka_t *rk) {
const rd_kafka_metadata_t *md;
rd_kafka_resp_err_t err;
int32_t *ids;
size_t cnt = 0;
const size_t num_brokers = 3;
size_t i;

/* Trigger Metadata request which will get initial broker hostnames. */
err = rd_kafka_metadata(rk, 0, NULL, &md, tmout_multip(5000));
/* Metadata timeout can happen if nodename change did already
* take place and there was a disconnection followed by a retry */
if (err && err != RD_KAFKA_RESP_ERR__TIMED_OUT)
TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
else if (!err)
rd_kafka_metadata_destroy(md);

ids = rd_kafka_brokers_learned_ids(rk, &cnt);

TEST_ASSERT(cnt == num_brokers,
"expected %" PRIusz " brokers in cache, not %" PRIusz,
num_brokers, cnt);

for (i = 0; i < cnt; i++) {
int32_t expected_id = i + 1;

TEST_ASSERT(ids[i] == expected_id,
"expected broker %d in cache, not %d", expected_id,
ids[i]);
}
if (ids)
free(ids);
}

/**
* @brief It should be possible to set the same hostname to brokers with
* broker ids, when doing that, verify that the brokers are kept
* separate instances and that the hostname is propagated to all of them.
*/
int main_0149_broker_same_host_port_mock(int argc, char **argv) {
rd_kafka_mock_cluster_t *cluster;
const char *bootstraps;
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
const rd_kafka_metadata_t *md;
rd_kafka_resp_err_t err;
const size_t num_brokers = 3;
size_t cnt = 0;
int32_t *ids;
size_t i;
test_conf_log_interceptor_t *log_interceptor;
const char *debug_contexts[2] = {"broker", NULL};

if (test_needs_auth()) {
TEST_SKIP("Mock cluster does not support SSL/SASL\n");
Expand All @@ -48,41 +106,33 @@ int main_0149_broker_same_host_port_mock(int argc, char **argv) {

cluster = test_mock_cluster_new(num_brokers, &bootstraps);

for (i = 1; i <= num_brokers; i++) {
rd_kafka_mock_broker_set_host_port(cluster, i, "localhost",
9092);
}

test_conf_init(&conf, NULL, 10);

test_conf_init(&conf, NULL, tmout_multip(10));
test_conf_set(conf, "bootstrap.servers", bootstraps);
log_interceptor = test_conf_set_log_interceptor(
conf, broker_same_host_port_mock_log_cb, debug_contexts);

rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

/* Trigger Metadata request which will update learned brokers. */
err = rd_kafka_metadata(rk, 0, NULL, &md, tmout_multip(5000));
rd_kafka_metadata_destroy(md);
TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));

ids = rd_kafka_brokers_learned_ids(rk, &cnt);
TEST_SAY("Initial metadata request\n");
/* Trigger Metadata request which will get initial broker hostnames. */
broker_same_host_port_mock_verify_broker_ids(rk);

TEST_ASSERT(cnt == num_brokers,
"expected %" PRIusz " brokers in cache, not %" PRIusz,
num_brokers, cnt);
TEST_SAY("Changing nodenames\n");
for (i = 1; i <= num_brokers; i++) {
rd_kafka_mock_broker_set_host_port(cluster, i, "localhost",
11192);
}

for (i = 0; i < cnt; i++) {
/* Brokers are added at the head of the list. */
int32_t expected_id = cnt - i;
TEST_SAY("Modified nodenames metadata request\n");
/* Trigger Metadata request which will get initial changed hostnames. */
broker_same_host_port_mock_verify_broker_ids(rk);

TEST_ASSERT(ids[i] == expected_id,
"expected broker %d in cache, not %d", expected_id,
ids[i]);
}
while (!(broker1_changed && broker2_changed && broker3_changed))
rd_usleep(100000, 0);

if (ids)
free(ids);
rd_kafka_destroy(rk);
test_mock_cluster_destroy(cluster);
rd_free(log_interceptor);

return 0;
}
37 changes: 7 additions & 30 deletions tests/0151-purge-brokers.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,9 @@

#include "test.h"

static const char *test_debug;
static rd_atomic32_t do_test_remove_then_add_received_terminate;
static rd_atomic32_t verification_complete;

static int int32_cmp(const void *a, const void *b) {
int *int_a = (int *)a;
int *int_b = (int *)b;
return *int_a - *int_b;
}

/** @brief Verify that \p expected_broker_ids
* and \p actual_broker_ids correspond in
* count and value. Sorts \p actual_broker_ids .
Expand All @@ -50,8 +43,6 @@ static rd_bool_t fetch_metadata_verify_brokers(int32_t *expected_broker_ids,
if (actual_broker_id_cnt != expected_broker_id_cnt)
return rd_false;

qsort(actual_broker_ids, actual_broker_id_cnt,
sizeof(*actual_broker_ids), int32_cmp);
for (i = 0; i < actual_broker_id_cnt; i++) {
if (actual_broker_ids[i] != expected_broker_ids[i])
return rd_false;
Expand Down Expand Up @@ -325,22 +316,12 @@ static void do_test_remove_then_add_log_cb(const rd_kafka_t *rk,
int level,
const char *fac,
const char *buf) {
int secs, msecs;
struct timeval tv;
if (!rd_atomic32_get(&do_test_remove_then_add_received_terminate) &&
strstr(buf, "/1: Handle terminates in state") > 0) {
rd_atomic32_set(&do_test_remove_then_add_received_terminate, 1);
while (!rd_atomic32_get(&verification_complete))
rd_usleep(100 * 1000, 0);
}

if (test_debug) {
rd_gettimeofday(&tv, NULL);
secs = (int)tv.tv_sec;
msecs = (int)(tv.tv_usec / 1000);
fprintf(stderr, "%%%i|%u.%03u|%s|%s| %s\n", level, secs, msecs,
fac, rk ? rd_kafka_name(rk) : "", buf);
}
}

/**
Expand All @@ -363,24 +344,20 @@ static rd_bool_t do_test_remove_then_add_await_after_action_cb(int action) {
* Add a pause after receiving the TERMINATE op to allow to
* proceed with adding it again before it's decommissioned.
*/
static test_conf_log_interceptor_t *log_interceptor;
static void
do_test_remove_then_add_edit_configuration_cb(rd_kafka_conf_t *conf) {
const char *debug_contexts[2] = {"broker", NULL};

/* This timeout verifies that the correct brokers are returned
* without duplicates as soon as possible. */
test_timeout_set(6);
/* Hidden property that forces connections to all brokers,
* increasing likelyhood of wrong behaviour if the decommissioned broker
* starts re-connecting. */
test_conf_set(conf, "enable.sparse.connections", "false");
if (!test_debug ||
(!strstr(test_debug, "broker") && !strstr(test_debug, "all"))) {
char debug_with_broker[512];
rd_snprintf(debug_with_broker, sizeof(debug_with_broker),
"%s%s%s", test_debug ? test_debug : "",
test_debug ? "," : "", "broker");
test_conf_set(conf, "debug", debug_with_broker);
}
rd_kafka_conf_set_log_cb(conf, do_test_remove_then_add_log_cb);
log_interceptor = test_conf_set_log_interceptor(
conf, do_test_remove_then_add_log_cb, debug_contexts);
}

/**
Expand Down Expand Up @@ -411,6 +388,8 @@ static void do_test_remove_then_add(void) {
expected_brokers_cnt, do_test_remove_then_add_edit_configuration_cb,
do_test_remove_then_add_await_after_action_cb);

rd_free(log_interceptor);
log_interceptor = NULL;
SUB_TEST_PASS();
}

Expand All @@ -420,8 +399,6 @@ int main_0151_purge_brokers_mock(int argc, char **argv) {
TEST_SKIP("Mock cluster does not support SSL/SASL\n");
return 0;
}
test_debug = test_getenv("TEST_DEBUG", NULL);


do_test_replace_with_new_cluster();

Expand Down
Loading