Skip to content

Commit 9b684e8

Browse files
committed
Added check to prevent NPE logs during consumer.close()
1 parent fd9b551 commit 9b684e8

File tree

2 files changed

+18
-6
lines changed

2 files changed

+18
-6
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -895,12 +895,14 @@ private void close(final Duration timeout, final boolean swallowException) {
895895
closeTimer.update();
896896

897897
// Prepare shutting down the network thread
898-
swallow(log, Level.ERROR, "Failed to release assignment before closing consumer",
899-
() -> sendAcknowledgementsAndLeaveGroup(closeTimer, firstException), firstException);
900-
swallow(log, Level.ERROR, "Failed to stop finding coordinator",
901-
this::stopFindCoordinatorOnClose, firstException);
902-
swallow(log, Level.ERROR, "Failed invoking acknowledgement commit callback",
903-
() -> handleCompletedAcknowledgements(true), firstException);
898+
if (applicationEventHandler != null && backgroundEventReaper != null && backgroundEventQueue != null) {
899+
swallow(log, Level.ERROR, "Failed to release assignment before closing consumer",
900+
() -> sendAcknowledgementsAndLeaveGroup(closeTimer, firstException), firstException);
901+
swallow(log, Level.ERROR, "Failed to stop finding coordinator",
902+
this::stopFindCoordinatorOnClose, firstException);
903+
swallow(log, Level.ERROR, "Failed invoking acknowledgement commit callback",
904+
() -> handleCompletedAcknowledgements(true), firstException);
905+
}
904906
if (applicationEventHandler != null)
905907
closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
906908
closeTimer.update();

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.kafka.common.metrics.Metrics;
4444
import org.apache.kafka.common.protocol.Errors;
4545
import org.apache.kafka.common.serialization.StringDeserializer;
46+
import org.apache.kafka.common.utils.LogCaptureAppender;
4647
import org.apache.kafka.common.utils.LogContext;
4748
import org.apache.kafka.common.utils.MockTime;
4849
import org.apache.kafka.common.utils.Time;
@@ -77,6 +78,7 @@
7778
import static java.util.Collections.singletonList;
7879
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
7980
import static org.junit.jupiter.api.Assertions.assertEquals;
81+
import static org.junit.jupiter.api.Assertions.assertFalse;
8082
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
8183
import static org.junit.jupiter.api.Assertions.assertNull;
8284
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -207,11 +209,19 @@ public void testFailConstructor() {
207209
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
208210
props.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "an.invalid.class");
209211
final ConsumerConfig config = new ConsumerConfig(props);
212+
213+
LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
210214
KafkaException ce = assertThrows(
211215
KafkaException.class,
212216
() -> newConsumer(config));
213217
assertTrue(ce.getMessage().contains("Failed to construct Kafka share consumer"), "Unexpected exception message: " + ce.getMessage());
214218
assertTrue(ce.getCause().getMessage().contains("Class an.invalid.class cannot be found"), "Unexpected cause: " + ce.getCause());
219+
220+
boolean npeLogged = appender.getEvents().stream()
221+
.flatMap(event -> event.getThrowableInfo().stream())
222+
.anyMatch(str -> str.contains("NullPointerException"));
223+
224+
assertFalse(npeLogged, "Unexpected NullPointerException during consumer construction");
215225
}
216226

217227
@Test

0 commit comments

Comments
 (0)