Skip to content

Commit a5bfc21

Browse files
KAFKA-16791 Add thread detection to ClusterTestExtensions (#16499)
Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 9db5c24 commit a5bfc21

File tree

3 files changed

+164
-1
lines changed

3 files changed

+164
-1
lines changed

core/src/test/java/kafka/test/junit/ClusterTestExtensions.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,27 @@
2828
import kafka.test.annotation.Type;
2929

3030
import org.apache.kafka.server.common.Features;
31+
import org.apache.kafka.test.TestUtils;
3132

33+
import org.junit.jupiter.api.extension.AfterEachCallback;
34+
import org.junit.jupiter.api.extension.BeforeEachCallback;
3235
import org.junit.jupiter.api.extension.ExtensionContext;
36+
import org.junit.jupiter.api.extension.ExtensionContext.Namespace;
37+
import org.junit.jupiter.api.extension.ExtensionContext.Store;
3338
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
3439
import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
3540
import org.junit.platform.commons.util.ReflectionUtils;
3641

3742
import java.lang.reflect.Method;
3843
import java.util.ArrayList;
3944
import java.util.Arrays;
45+
import java.util.Collections;
4046
import java.util.HashSet;
4147
import java.util.List;
4248
import java.util.Map;
4349
import java.util.Optional;
50+
import java.util.Set;
51+
import java.util.concurrent.atomic.AtomicReference;
4452
import java.util.function.Function;
4553
import java.util.stream.Collectors;
4654
import java.util.stream.Stream;
@@ -82,7 +90,20 @@
8290
* SomeIntegrationTest will be instantiated, lifecycle methods (before/after) will be run, and "someTest" will be invoked.
8391
*
8492
*/
85-
public class ClusterTestExtensions implements TestTemplateInvocationContextProvider {
93+
public class ClusterTestExtensions implements TestTemplateInvocationContextProvider, BeforeEachCallback, AfterEachCallback {
94+
private static final String METRICS_METER_TICK_THREAD_PREFIX = "metrics-meter-tick-thread";
95+
private static final String SCALA_THREAD_PREFIX = "scala-";
96+
private static final String FORK_JOIN_POOL_THREAD_PREFIX = "ForkJoinPool";
97+
private static final String JUNIT_THREAD_PREFIX = "junit-";
98+
private static final String ATTACH_LISTENER_THREAD_PREFIX = "Attach Listener";
99+
private static final String PROCESS_REAPER_THREAD_PREFIX = "process reaper";
100+
private static final String RMI_THREAD_PREFIX = "RMI";
101+
private static final String DETECT_THREAD_LEAK_KEY = "detectThreadLeak";
102+
private static final Set<String> SKIPPED_THREAD_PREFIX = Collections.unmodifiableSet(Stream.of(
103+
METRICS_METER_TICK_THREAD_PREFIX, SCALA_THREAD_PREFIX, FORK_JOIN_POOL_THREAD_PREFIX, JUNIT_THREAD_PREFIX,
104+
ATTACH_LISTENER_THREAD_PREFIX, PROCESS_REAPER_THREAD_PREFIX, RMI_THREAD_PREFIX)
105+
.collect(Collectors.toSet()));
106+
86107
@Override
87108
public boolean supportsTestTemplate(ExtensionContext context) {
88109
return true;
@@ -119,7 +140,31 @@ public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContex
119140
return generatedContexts.stream();
120141
}
121142

143+
@Override
144+
public void beforeEach(ExtensionContext context) {
145+
DetectThreadLeak detectThreadLeak = DetectThreadLeak.of(thread ->
146+
SKIPPED_THREAD_PREFIX.stream().noneMatch(prefix -> thread.getName().startsWith(prefix)));
147+
getStore(context).put(DETECT_THREAD_LEAK_KEY, detectThreadLeak);
148+
}
149+
150+
@Override
151+
public void afterEach(ExtensionContext context) throws InterruptedException {
152+
DetectThreadLeak detectThreadLeak = getStore(context).remove(DETECT_THREAD_LEAK_KEY, DetectThreadLeak.class);
153+
if (detectThreadLeak == null) {
154+
return;
155+
}
156+
AtomicReference<List<Thread>> lastThread = new AtomicReference<>(Collections.emptyList());
157+
TestUtils.waitForCondition(() -> {
158+
List<Thread> threads = detectThreadLeak.newThreads();
159+
lastThread.set(threads);
160+
return threads.isEmpty();
161+
}, () -> "Thread leak detected: " +
162+
lastThread.get().stream().map(Thread::getName).collect(Collectors.joining(", ")));
163+
}
122164

165+
private Store getStore(ExtensionContext context) {
166+
return context.getStore(Namespace.create(context.getUniqueId()));
167+
}
123168

124169
List<TestTemplateInvocationContext> processClusterTemplate(ExtensionContext context, ClusterTemplate annot) {
125170
if (annot.value().trim().isEmpty()) {
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kafka.test.junit;
19+
20+
import java.util.List;
21+
import java.util.Set;
22+
import java.util.function.Predicate;
23+
import java.util.stream.Collectors;
24+
25+
public interface DetectThreadLeak {
26+
/**
27+
* @return the new threads after `DetectThreadLeak` is created
28+
*/
29+
List<Thread> newThreads();
30+
31+
/**
32+
* Creates an instance of {@link DetectThreadLeak} that filters threads based on a given predicate.
33+
* This method captures the current state of threads that match the predicate at the time of invocation
34+
* and provides a way to detect new threads that match the predicate but were not present at the initial capture.
35+
*
36+
* @param predicate A {@link Predicate<Thread>} used to filter threads. Only threads that satisfy
37+
* the predicate are considered for detection.
38+
* @return An instance of {@link DetectThreadLeak} that can be used to detect new threads matching
39+
* the predicate that were not present at the time of this method's invocation.
40+
* The {@link DetectThreadLeak#newThreads()} method of the returned instance will return a list
41+
* of new threads that match the predicate and have been started after this method was called.
42+
*/
43+
static DetectThreadLeak of(Predicate<Thread> predicate) {
44+
Set<Long> before = Thread.getAllStackTraces().keySet()
45+
.stream().filter(predicate).map(Thread::getId).collect(Collectors.toSet());
46+
return () -> Thread.getAllStackTraces().keySet()
47+
.stream().filter(predicate)
48+
.filter(t -> !before.contains(t.getId()))
49+
.collect(Collectors.toList());
50+
}
51+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kafka.test.junit;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import static org.junit.jupiter.api.Assertions.assertFalse;
23+
import static org.junit.jupiter.api.Assertions.assertTrue;
24+
25+
public class DetectThreadLeakTest {
26+
27+
private static class LeakThread implements Runnable {
28+
@Override
29+
public void run() {
30+
try {
31+
Thread.sleep(20000);
32+
} catch (InterruptedException e) {
33+
// this can be neglected
34+
}
35+
}
36+
}
37+
38+
@Test
39+
public void testThreadLeak() throws InterruptedException {
40+
DetectThreadLeak detectThreadLeak = DetectThreadLeak.of(thread -> true);
41+
Thread leakThread = new Thread(new LeakThread());
42+
try {
43+
leakThread.start();
44+
assertTrue(detectThreadLeak.newThreads().contains(leakThread));
45+
leakThread.interrupt();
46+
} finally {
47+
leakThread.join();
48+
}
49+
assertFalse(leakThread.isAlive(), "Can't interrupt the thread");
50+
assertFalse(detectThreadLeak.newThreads().contains(leakThread));
51+
}
52+
53+
@Test
54+
public void testDetectThreadLeakWithOverrideExpectedThreadNames() throws InterruptedException {
55+
String threadName = "test-thread";
56+
DetectThreadLeak detectThreadLeak = DetectThreadLeak.of(thread -> !thread.getName().equals(threadName));
57+
Thread leakThread = new Thread(new LeakThread(), threadName);
58+
try {
59+
leakThread.start();
60+
assertFalse(detectThreadLeak.newThreads().contains(leakThread));
61+
leakThread.interrupt();
62+
} finally {
63+
leakThread.join();
64+
}
65+
assertFalse(leakThread.isAlive(), "Can't interrupt the thread");
66+
}
67+
}

0 commit comments

Comments
 (0)