@@ -62,7 +62,6 @@ static rd_bool_t fetch_metadata_verify_brokers(int32_t *expected_broker_ids,
6262static void fetch_metadata (rd_kafka_t * rk ,
6363 int32_t * expected_broker_ids ,
6464 size_t expected_broker_id_cnt ,
65- rd_bool_t await_verification ,
6665 rd_bool_t (* await_after_action_cb )(int action ),
6766 int action ) {
6867 const rd_kafka_metadata_t * md = NULL ;
@@ -87,41 +86,33 @@ static void fetch_metadata(rd_kafka_t *rk,
8786
8887 rd_usleep (100 * 1000 , 0 );
8988
90- if (await_verification ) {
91- RD_IF_FREE (actual_broker_ids , rd_free );
92- actual_broker_ids = rd_kafka_brokers_learned_ids (
93- rk , & actual_broker_id_cnt );
94- }
89+ RD_IF_FREE (actual_broker_ids , rd_free );
90+ actual_broker_ids =
91+ rd_kafka_brokers_learned_ids (rk , & actual_broker_id_cnt );
9592 continue_requesting_metadata = test_clock () <= abs_timeout_us ;
9693
97- if (await_verification )
98- continue_requesting_metadata &=
99- !fetch_metadata_verify_brokers (
100- expected_broker_ids , expected_broker_id_cnt ,
101- actual_broker_ids , actual_broker_id_cnt );
94+ continue_requesting_metadata &= !fetch_metadata_verify_brokers (
95+ expected_broker_ids , expected_broker_id_cnt ,
96+ actual_broker_ids , actual_broker_id_cnt );
97+
10298 if (await_after_action_cb )
103- continue_requesting_metadata & =
99+ continue_requesting_metadata | =
104100 await_after_action_cb (action );
105- else if (!await_verification )
106- continue_requesting_metadata = rd_false ;
101+
107102 } while (continue_requesting_metadata );
108103
109- if (await_verification ) {
110- TEST_ASSERT (actual_broker_id_cnt == expected_broker_id_cnt ,
111- "expected %" PRIusz
112- " brokers in cache, got %" PRIusz ,
113- expected_broker_id_cnt , actual_broker_id_cnt );
104+ TEST_ASSERT (actual_broker_id_cnt == expected_broker_id_cnt ,
105+ "expected %" PRIusz " brokers in cache, got %" PRIusz ,
106+ expected_broker_id_cnt , actual_broker_id_cnt );
114107
115- for (i = 0 ; i < actual_broker_id_cnt ; i ++ ) {
116- TEST_ASSERT (
117- actual_broker_ids [i ] == expected_broker_ids [i ],
108+ for (i = 0 ; i < actual_broker_id_cnt ; i ++ ) {
109+ TEST_ASSERT (actual_broker_ids [i ] == expected_broker_ids [i ],
118110 "expected broker id[%" PRIusz
119111 "] to be "
120112 "%" PRId32 ", got %" PRId32 ,
121113 i , expected_broker_ids [i ], actual_broker_ids [i ]);
122- }
123- RD_IF_FREE (actual_broker_ids , rd_free );
124114 }
115+ RD_IF_FREE (actual_broker_ids , rd_free );
125116}
126117
127118/**
@@ -160,7 +151,6 @@ do_test_add_remove_brokers(int32_t initial_cluster_size,
160151 int32_t expected_broker_ids [][5 ],
161152 size_t expected_broker_ids_cnt ,
162153 int32_t expected_brokers_cnt [],
163- rd_bool_t await_verification ,
164154 void (* edit_configuration_cb )(rd_kafka_conf_t * conf ),
165155 rd_bool_t (* await_after_action_cb )(int action )) {
166156 rd_kafka_mock_cluster_t * cluster ;
@@ -244,10 +234,6 @@ do_test_add_remove_brokers(int32_t initial_cluster_size,
244234
245235 fetch_metadata (rk , expected_broker_ids [action ],
246236 expected_brokers_cnt [action ],
247- /* Await verification at each action or only
248- * after last action. */
249- await_verification ||
250- action == expected_broker_ids_cnt - 1 ,
251237 await_after_action_cb , action );
252238 }
253239 TEST_SAY ("Test verification complete\n" );
@@ -282,12 +268,9 @@ static void do_test_replace_with_new_cluster(void) {
282268 {TEST_ACTION_ADD_BROKER , 6 },
283269 };
284270
285- do_test_add_remove_brokers (
286- 3 , actions , expected_broker_ids , RD_ARRAY_SIZE (expected_broker_ids ),
287- expected_brokers_cnt , rd_true /* await verification after
288- each action */
289- ,
290- NULL , NULL );
271+ do_test_add_remove_brokers (3 , actions , expected_broker_ids ,
272+ RD_ARRAY_SIZE (expected_broker_ids ),
273+ expected_brokers_cnt , NULL , NULL );
291274
292275 SUB_TEST_PASS ();
293276}
@@ -330,10 +313,7 @@ static void do_test_cluster_roll(void) {
330313
331314 do_test_add_remove_brokers (5 , actions , expected_broker_ids ,
332315 RD_ARRAY_SIZE (expected_broker_ids ),
333- expected_brokers_cnt ,
334- rd_true
335- /* await verification after each action */ ,
336- NULL , NULL );
316+ expected_brokers_cnt , NULL , NULL );
337317
338318 SUB_TEST_PASS ();
339319}
@@ -364,18 +344,16 @@ static void do_test_remove_then_add_log_cb(const rd_kafka_t *rk,
364344}
365345
366346/**
367- * @brief Await for the TERMINATE op to be received after the first action
368- * that removes the broker then proceed with action 2 to
347+ * @brief Await for the TERMINATE op to be received after the action
348+ * that removes the broker then proceed to
369349 * add the broker again.
370350 */
371351static rd_bool_t do_test_remove_then_add_await_after_action_cb (int action ) {
372352 if (action == 2 ) {
373353 /* Wait until TERMINATE is received */
374354 return !rd_atomic32_get (
375355 & do_test_remove_then_add_received_terminate );
376- } else if (action == 3 )
377- /* Wait until final verification */
378- return rd_true ;
356+ }
379357 return rd_false ;
380358}
381359
@@ -390,7 +368,7 @@ do_test_remove_then_add_edit_configuration_cb(rd_kafka_conf_t *conf) {
390368 /* This timeout verifies that the correct brokers are returned
391369 * without duplicates as soon as possible. */
392370 test_timeout_set (6 );
393- /* Hidden property the forces connections to all brokers,
371+ /* Hidden property that forces connections to all brokers,
394372 * increasing likelyhood of wrong behaviour if the decommissioned broker
395373 * starts re-connecting. */
396374 test_conf_set (conf , "enable.sparse.connections" , "false" );
@@ -430,10 +408,7 @@ static void do_test_remove_then_add(void) {
430408
431409 do_test_add_remove_brokers (
432410 3 , actions , expected_broker_ids , RD_ARRAY_SIZE (expected_broker_ids ),
433- expected_brokers_cnt , rd_false /* await verification
434- * only after last action. */
435- ,
436- do_test_remove_then_add_edit_configuration_cb ,
411+ expected_brokers_cnt , do_test_remove_then_add_edit_configuration_cb ,
437412 do_test_remove_then_add_await_after_action_cb );
438413
439414 SUB_TEST_PASS ();
0 commit comments