Skip to content

Commit f9dae77

Browse files
Fixes #4976 - added optional custom scheduler to Producer and AdminClient KafkaClientMetrics
1 parent 06cff5f commit f9dae77

File tree

5 files changed

+70
-4
lines changed

5 files changed

+70
-4
lines changed

micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetrics.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@
4545
@NonNullFields
4646
public class KafkaClientMetrics extends KafkaMetrics {
4747

48+
/**
49+
* Kafka {@link Producer} metrics binder
50+
* @param kafkaProducer producer instance to be instrumented
51+
* @param tags additional tags
52+
* @param scheduler scheduler to check and bind metrics``
53+
*/
54+
public KafkaClientMetrics(Producer<?, ?> kafkaProducer, Iterable<Tag> tags, ScheduledExecutorService scheduler) {
55+
super(kafkaProducer::metrics, tags, scheduler);
56+
}
57+
4858
/**
4959
* Kafka {@link Producer} metrics binder
5060
* @param kafkaProducer producer instance to be instrumented
@@ -89,6 +99,16 @@ public KafkaClientMetrics(Consumer<?, ?> kafkaConsumer) {
8999
super(kafkaConsumer::metrics);
90100
}
91101

102+
/**
103+
* Kafka {@link AdminClient} metrics binder
104+
* @param adminClient instance to be instrumented
105+
* @param tags additional tags
106+
* @param scheduler scheduler to check and bind metrics
107+
*/
108+
public KafkaClientMetrics(AdminClient adminClient, Iterable<Tag> tags, ScheduledExecutorService scheduler) {
109+
super(adminClient::metrics, tags, scheduler);
110+
}
111+
92112
/**
93113
* Kafka {@link AdminClient} metrics binder
94114
* @param adminClient instance to be instrumented

micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsAdminTest.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.junit.jupiter.api.Test;
2424

2525
import java.util.Properties;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.ScheduledExecutorService;
2628

2729
import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX;
2830
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
@@ -32,7 +34,7 @@ class KafkaClientMetricsAdminTest {
3234

3335
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
3436

35-
private Tags tags = Tags.of("app", "myapp", "version", "1");
37+
private final Tags tags = Tags.of("app", "myapp", "version", "1");
3638

3739
KafkaClientMetrics metrics;
3840

@@ -69,6 +71,27 @@ void shouldCreateMetersWithTags() {
6971
}
7072
}
7173

74+
@Test
75+
void shouldCreateMetersWithTagsAndCustomScheduler() {
76+
try (AdminClient adminClient = createAdmin()) {
77+
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1);
78+
metrics = new KafkaClientMetrics(adminClient, tags, customScheduler);
79+
MeterRegistry registry = new SimpleMeterRegistry();
80+
81+
metrics.bindTo(registry);
82+
83+
assertThat(registry.getMeters()).hasSizeGreaterThan(0)
84+
.extracting(meter -> meter.getId().getTag("app"))
85+
.allMatch(s -> s.equals("myapp"));
86+
87+
metrics.close();
88+
assertThat(customScheduler.isShutdown()).isFalse();
89+
90+
customScheduler.shutdownNow();
91+
assertThat(customScheduler.isShutdown()).isTrue();
92+
}
93+
}
94+
7295
private AdminClient createAdmin() {
7396
Properties adminConfig = new Properties();
7497
adminConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);

micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsConsumerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ void shouldCreateMetersWithTags() {
7676
@Test
7777
void shouldCreateMetersWithTagsAndCustomScheduler() {
7878
try (Consumer<String, String> consumer = createConsumer()) {
79-
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(2);
79+
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1);
8080
metrics = new KafkaClientMetrics(consumer, tags, customScheduler);
8181
MeterRegistry registry = new SimpleMeterRegistry();
8282

micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsProducerTest.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.junit.jupiter.api.Test;
2626

2727
import java.util.Properties;
28+
import java.util.concurrent.Executors;
29+
import java.util.concurrent.ScheduledExecutorService;
2830

2931
import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX;
3032
import static org.apache.kafka.clients.producer.ProducerConfig.*;
@@ -34,7 +36,7 @@ class KafkaClientMetricsProducerTest {
3436

3537
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
3638

37-
private Tags tags = Tags.of("app", "myapp", "version", "1");
39+
private final Tags tags = Tags.of("app", "myapp", "version", "1");
3840

3941
KafkaClientMetrics metrics;
4042

@@ -71,6 +73,27 @@ void shouldCreateMetersWithTags() {
7173
}
7274
}
7375

76+
@Test
77+
void shouldCreateMetersWithTagsAndCustomScheduler() {
78+
try (Producer<String, String> producer = createProducer()) {
79+
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1);
80+
metrics = new KafkaClientMetrics(producer, tags, customScheduler);
81+
MeterRegistry registry = new SimpleMeterRegistry();
82+
83+
metrics.bindTo(registry);
84+
85+
assertThat(registry.getMeters()).hasSizeGreaterThan(0)
86+
.extracting(meter -> meter.getId().getTag("app"))
87+
.allMatch(s -> s.equals("myapp"));
88+
89+
metrics.close();
90+
assertThat(customScheduler.isShutdown()).isFalse();
91+
92+
customScheduler.shutdownNow();
93+
assertThat(customScheduler.isShutdown()).isTrue();
94+
}
95+
}
96+
7497
private Producer<String, String> createProducer() {
7598
Properties producerConfig = new Properties();
7699
producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);

micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaMetricsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ void closeShouldRemoveAllMetersAndNotShutdownCustomScheduler() {
9898
KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
9999
return Collections.singletonMap(metricName, metric);
100100
};
101-
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(2);
101+
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1);
102102
kafkaMetrics = new KafkaMetrics(supplier, Collections.emptyList(), customScheduler);
103103
MeterRegistry registry = new SimpleMeterRegistry();
104104

0 commit comments

Comments
 (0)