@@ -4605,8 +4605,10 @@ static int rd_kafka_broker_thread_main(void *arg) {
4605
4605
break ;
4606
4606
}
4607
4607
4608
- if (unlikely (rd_kafka_terminating (rkb -> rkb_rk )))
4608
+ if (unlikely (rd_kafka_terminating (rkb -> rkb_rk ))) {
4609
4609
rd_kafka_broker_serve (rkb , 1000 );
4610
+ break ;
4611
+ }
4610
4612
4611
4613
if (!rd_kafka_sasl_ready (rkb -> rkb_rk )) {
4612
4614
/* SASL provider not yet ready. */
@@ -4751,28 +4753,41 @@ static int rd_kafka_broker_thread_main(void *arg) {
4751
4753
}
4752
4754
}
4753
4755
4754
- if (rkb -> rkb_source != RD_KAFKA_INTERNAL ) {
4755
- rd_kafka_wrlock (rkb -> rkb_rk );
4756
- TAILQ_REMOVE (& rkb -> rkb_rk -> rk_brokers , rkb , rkb_link );
4757
- if (rkb -> rkb_nodeid != -1 && !RD_KAFKA_BROKER_IS_LOGICAL (rkb ))
4758
- rd_list_remove (& rkb -> rkb_rk -> rk_broker_by_id , rkb );
4759
- (void )rd_atomic32_sub (& rkb -> rkb_rk -> rk_broker_cnt , 1 );
4760
- rd_kafka_wrunlock (rkb -> rkb_rk );
4761
- }
4762
-
4763
- rd_kafka_broker_fail (rkb , LOG_DEBUG , RD_KAFKA_RESP_ERR__DESTROY ,
4764
- "Broker handle is terminating" );
4765
-
4766
4756
/* Disable and drain ops queue.
4767
4757
* Simply purging the ops queue risks leaving dangling references
4768
4758
* for ops such as PARTITION_JOIN/PARTITION_LEAVE where the broker
4769
4759
* reference is not maintained in the rko (but in rktp_next_leader).
4770
- * #1596 */
4760
+ * #1596.
4761
+ * Do this before failing the broker to make sure no buffers
4762
+ * are enqueued after that. */
4771
4763
rd_kafka_q_disable (rkb -> rkb_ops );
4772
4764
while (rd_kafka_broker_ops_serve (rkb , RD_POLL_NOWAIT ))
4773
4765
;
4774
4766
4775
- rd_kafka_broker_destroy (rkb );
4767
+ rd_kafka_broker_fail (rkb , LOG_DEBUG , rd_kafka_broker_destroy_error (rk ),
4768
+ "Broker handle is terminating" );
4769
+
4770
+ rd_rkb_dbg (rkb , BROKER , "TERMINATE" ,
4771
+ "Handle terminates in state %s: "
4772
+ "%d refcnts (%p), %d toppar(s), "
4773
+ "%d active toppar(s), "
4774
+ "%d outbufs, %d waitresps, %d retrybufs" ,
4775
+ rd_kafka_broker_state_names [rkb -> rkb_state ],
4776
+ rd_refcnt_get (& rkb -> rkb_refcnt ), & rkb -> rkb_refcnt ,
4777
+ rkb -> rkb_toppar_cnt , rkb -> rkb_active_toppar_cnt ,
4778
+ (int )rd_kafka_bufq_cnt (& rkb -> rkb_outbufs ),
4779
+ (int )rd_kafka_bufq_cnt (& rkb -> rkb_waitresps ),
4780
+ (int )rd_kafka_bufq_cnt (& rkb -> rkb_retrybufs ));
4781
+
4782
+ rd_dassert (rkb -> rkb_state == RD_KAFKA_BROKER_STATE_DOWN );
4783
+ if (rkb -> rkb_source != RD_KAFKA_INTERNAL ) {
4784
+ rd_kafka_wrlock (rkb -> rkb_rk );
4785
+ TAILQ_REMOVE (& rkb -> rkb_rk -> rk_brokers , rkb , rkb_link );
4786
+ if (rkb -> rkb_nodeid != -1 && !RD_KAFKA_BROKER_IS_LOGICAL (rkb ))
4787
+ rd_list_remove (& rkb -> rkb_rk -> rk_broker_by_id , rkb );
4788
+ (void )rd_atomic32_sub (& rkb -> rkb_rk -> rk_broker_cnt , 1 );
4789
+ rd_kafka_wrunlock (rkb -> rkb_rk );
4790
+ }
4776
4791
4777
4792
#if WITH_SSL
4778
4793
/* Remove OpenSSL per-thread error state to avoid memory leaks */
0 commit comments