Skip to content

Commit 06cff5f

Browse files
Fixes #4976 - added optional custom scheduler to Consumer KafkaClientMetrics
1 parent c53b4b1 commit 06cff5f

File tree

4 files changed

+99
-5
lines changed

4 files changed

+99
-5
lines changed

micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetrics.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.kafka.clients.producer.Producer;
2525
import org.apache.kafka.common.Metric;
2626

27+
import java.util.concurrent.ScheduledExecutorService;
28+
2729
/**
2830
* Kafka Client metrics binder. This should be closed on application shutdown to clean up
2931
* resources.
@@ -60,6 +62,16 @@ public KafkaClientMetrics(Producer<?, ?> kafkaProducer) {
6062
super(kafkaProducer::metrics);
6163
}
6264

65+
/**
66+
* Kafka {@link Consumer} metrics binder
67+
* @param kafkaConsumer consumer instance to be instrumented
68+
* @param tags additional tags
69+
* @param scheduler scheduler to check and bind metrics
70+
*/
71+
public KafkaClientMetrics(Consumer<?, ?> kafkaConsumer, Iterable<Tag> tags, ScheduledExecutorService scheduler) {
72+
super(kafkaConsumer::metrics, tags, scheduler);
73+
}
74+
6375
/**
6476
* Kafka {@link Consumer} metrics binder
6577
* @param kafkaConsumer consumer instance to be instrumented

micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaMetrics.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ class KafkaMetrics implements MeterBinder, AutoCloseable {
7171
static final String KAFKA_VERSION_TAG_NAME = "kafka.version";
7272
static final String DEFAULT_VALUE = "unknown";
7373

74+
private static final String DEFAULT_SCHEDULER_THREAD_NAME_PREFIX = "micrometer-kafka-metrics";
75+
7476
private static final Set<Class<?>> counterMeasurableClasses = new HashSet<>();
7577

7678
static {
@@ -96,8 +98,9 @@ class KafkaMetrics implements MeterBinder, AutoCloseable {
9698

9799
private final Duration refreshInterval;
98100

99-
private final ScheduledExecutorService scheduler = Executors
100-
.newSingleThreadScheduledExecutor(new NamedThreadFactory("micrometer-kafka-metrics"));
101+
private final ScheduledExecutorService scheduler;
102+
103+
private final boolean schedulerExternallyManaged;
101104

102105
@Nullable
103106
private Iterable<Tag> commonTags;
@@ -122,11 +125,23 @@ class KafkaMetrics implements MeterBinder, AutoCloseable {
122125
this(metricsSupplier, extraTags, DEFAULT_REFRESH_INTERVAL);
123126
}
124127

128+
KafkaMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> extraTags,
129+
ScheduledExecutorService scheduler) {
130+
this(metricsSupplier, extraTags, DEFAULT_REFRESH_INTERVAL, scheduler, true);
131+
}
132+
125133
KafkaMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> extraTags,
126134
Duration refreshInterval) {
135+
this(metricsSupplier, extraTags, refreshInterval, createDefaultScheduler(), false);
136+
}
137+
138+
KafkaMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> extraTags,
139+
Duration refreshInterval, ScheduledExecutorService scheduler, boolean schedulerExternallyManaged) {
127140
this.metricsSupplier = metricsSupplier;
128141
this.extraTags = extraTags;
129142
this.refreshInterval = refreshInterval;
143+
this.scheduler = scheduler;
144+
this.schedulerExternallyManaged = schedulerExternallyManaged;
130145
}
131146

132147
@Override
@@ -295,6 +310,10 @@ private static Class<? extends Measurable> getMeasurableClass(Metric metric) {
295310
}
296311
}
297312

313+
private static ScheduledExecutorService createDefaultScheduler() {
314+
return Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(DEFAULT_SCHEDULER_THREAD_NAME_PREFIX));
315+
}
316+
298317
private Gauge registerGauge(MeterRegistry registry, MetricName metricName, String meterName, Iterable<Tag> tags) {
299318
return Gauge.builder(meterName, this.metrics, toMetricValue(metricName))
300319
.tags(tags)
@@ -344,7 +363,9 @@ private Meter.Id meterIdForComparison(MetricName metricName) {
344363

345364
@Override
346365
public void close() {
347-
this.scheduler.shutdownNow();
366+
if (!schedulerExternallyManaged) {
367+
this.scheduler.shutdownNow();
368+
}
348369

349370
for (Meter.Id id : registeredMeterIds) {
350371
registry.remove(id);

micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsConsumerTest.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.junit.jupiter.api.Test;
2626

2727
import java.util.Properties;
28+
import java.util.concurrent.Executors;
29+
import java.util.concurrent.ScheduledExecutorService;
2830

2931
import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX;
3032
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
@@ -34,7 +36,7 @@ class KafkaClientMetricsConsumerTest {
3436

3537
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
3638

37-
private Tags tags = Tags.of("app", "myapp", "version", "1");
39+
private final Tags tags = Tags.of("app", "myapp", "version", "1");
3840

3941
KafkaClientMetrics metrics;
4042

@@ -71,6 +73,27 @@ void shouldCreateMetersWithTags() {
7173
}
7274
}
7375

76+
@Test
77+
void shouldCreateMetersWithTagsAndCustomScheduler() {
78+
try (Consumer<String, String> consumer = createConsumer()) {
79+
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(2);
80+
metrics = new KafkaClientMetrics(consumer, tags, customScheduler);
81+
MeterRegistry registry = new SimpleMeterRegistry();
82+
83+
metrics.bindTo(registry);
84+
85+
assertThat(registry.getMeters()).hasSizeGreaterThan(0)
86+
.extracting(meter -> meter.getId().getTag("app"))
87+
.allMatch(s -> s.equals("myapp"));
88+
89+
metrics.close();
90+
assertThat(customScheduler.isShutdown()).isFalse();
91+
92+
customScheduler.shutdownNow();
93+
assertThat(customScheduler.isShutdown()).isTrue();
94+
}
95+
}
96+
7497
private Consumer<String, String> createConsumer() {
7598
Properties consumerConfig = new Properties();
7699
consumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);

micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaMetricsTest.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,13 @@
3434
import java.util.HashMap;
3535
import java.util.LinkedHashMap;
3636
import java.util.Map;
37+
import java.util.concurrent.Executors;
38+
import java.util.concurrent.ScheduledExecutorService;
3739
import java.util.concurrent.atomic.AtomicReference;
3840
import java.util.function.Supplier;
3941

4042
import static org.assertj.core.api.Assertions.assertThat;
43+
import static org.awaitility.Awaitility.await;
4144

4245
class KafkaMetricsTest {
4346

@@ -68,7 +71,7 @@ void shouldKeepMetersWhenMetricsDoNotChange() {
6871
}
6972

7073
@Test
71-
void closeShouldRemoveAllMeters() {
74+
void closeShouldRemoveAllMetersAndShutdownDefaultScheduler() {
7275
// Given
7376
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
7477
MetricName metricName = new MetricName("a", "b", "c", new LinkedHashMap<>());
@@ -80,9 +83,35 @@ void closeShouldRemoveAllMeters() {
8083

8184
kafkaMetrics.bindTo(registry);
8285
assertThat(registry.getMeters()).hasSize(1);
86+
assertThat(isDefaultMetricsSchedulerThreadAlive()).isTrue();
8387

8488
kafkaMetrics.close();
8589
assertThat(registry.getMeters()).isEmpty();
90+
await().until(() -> !isDefaultMetricsSchedulerThreadAlive());
91+
}
92+
93+
@Test
94+
void closeShouldRemoveAllMetersAndNotShutdownCustomScheduler() {
95+
// Given
96+
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
97+
MetricName metricName = new MetricName("a", "b", "c", new LinkedHashMap<>());
98+
KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
99+
return Collections.singletonMap(metricName, metric);
100+
};
101+
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(2);
102+
kafkaMetrics = new KafkaMetrics(supplier, Collections.emptyList(), customScheduler);
103+
MeterRegistry registry = new SimpleMeterRegistry();
104+
105+
kafkaMetrics.bindTo(registry);
106+
assertThat(registry.getMeters()).hasSize(1);
107+
await().until(() -> !isDefaultMetricsSchedulerThreadAlive());
108+
109+
kafkaMetrics.close();
110+
assertThat(registry.getMeters()).isEmpty();
111+
assertThat(customScheduler.isShutdown()).isFalse();
112+
113+
customScheduler.shutdownNow();
114+
assertThat(customScheduler.isShutdown()).isTrue();
86115
}
87116

88117
@Test
@@ -552,4 +581,13 @@ private KafkaMetric createKafkaMetric(MetricName metricName) {
552581
return new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
553582
}
554583

584+
private static boolean isDefaultMetricsSchedulerThreadAlive() {
585+
return Thread.getAllStackTraces()
586+
.keySet()
587+
.stream()
588+
.filter(Thread::isAlive)
589+
.map(Thread::getName)
590+
.anyMatch(name -> name.startsWith("micrometer-kafka-metrics"));
591+
}
592+
555593
}

0 commit comments

Comments
 (0)