Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;

import java.util.concurrent.ScheduledExecutorService;

/**
* Kafka Client metrics binder. This should be closed on application shutdown to clean up
* resources.
Expand All @@ -43,6 +45,21 @@
@NonNullFields
public class KafkaClientMetrics extends KafkaMetrics {

/**
* Kafka {@link Producer} metrics binder. The lifecycle of the custom scheduler passed
* is the responsibility of the caller. It will not be shut down when this instance is
* {@link #close() closed}. A scheduler can be shared among multiple instances of
* {@link KafkaClientMetrics} to reduce resource usage by reducing the number of
* threads if there will be many instances.
* @param kafkaProducer producer instance to be instrumented
* @param tags additional tags
* @param scheduler custom scheduler to check and bind metrics
* @since 1.14.0
*/
public KafkaClientMetrics(Producer<?, ?> kafkaProducer, Iterable<Tag> tags, ScheduledExecutorService scheduler) {
super(kafkaProducer::metrics, tags, scheduler);
}

/**
* Kafka {@link Producer} metrics binder
* @param kafkaProducer producer instance to be instrumented
Expand All @@ -60,6 +77,21 @@ public KafkaClientMetrics(Producer<?, ?> kafkaProducer) {
super(kafkaProducer::metrics);
}

/**
* Kafka {@link Consumer} metrics binder. The lifecycle of the custom scheduler passed
* is the responsibility of the caller. It will not be shut down when this instance is
* {@link #close() closed}. A scheduler can be shared among multiple instances of
* {@link KafkaClientMetrics} to reduce resource usage by reducing the number of
* threads if there will be many instances.
* @param kafkaConsumer consumer instance to be instrumented
* @param tags additional tags
* @param scheduler custom scheduler to check and bind metrics
* @since 1.14.0
*/
public KafkaClientMetrics(Consumer<?, ?> kafkaConsumer, Iterable<Tag> tags, ScheduledExecutorService scheduler) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine with me since it does not bring any breaking changes.
In Spring we have a ThreadPoolTaskScheduler (which is auto-configured by Spring Boot) and that one comes with a property:

	/**
	 * Return the underlying ScheduledExecutorService for native access.
	 * @return the underlying ScheduledExecutorService (never {@code null})
	 * @throws IllegalStateException if the ThreadPoolTaskScheduler hasn't been initialized yet
	 */
	public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {

I'll be totally fine exposing that property on the MicrometerConsumerListener and MicrometerProducerListener.
However you are not done with the fixed here yet.
Pay attention to those ctors of this class which relies on the Producer and AdminClient, respectively.
Thanks

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review @artembilan.
@vasiliy-sarzhynskyi any reason to not add similar constructors for producers and admin clients?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shakuzen I need to check how producers and admin clients are used by Spring-Kafka, and how many threads we might have with multiple produced topics (whether the number of threads increased or not). We could add there as well, but do we have use cases when producers and admin clients might need that? In my understanding, it's especially relevant for consumers as we might specify concurrency config value, and a single app instance will have hundreds of threads.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m not sure how is this relevant to the use of those clients in the app, the scheduler we provide here is used for publishing metrics from those clients. See bindTo() impl.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This scheduler isn't for publishing; MeterRegistry implementations can be configured with a scheduler to publish metrics. This scheduler is for syncing the metrics from the Kafka client to Micrometer's registry periodically. The background is in #4976, but with many clients and the existing code, each client will have a thread for syncing between the Kafka client's metrics and Micrometer. With the changes here, a thread pool could be shared among the clients for this syncing.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, @shakuzen , for clarification!
So, sounds like I was right anyway: this scheduler has nothing to do with Spring Kafka and has same effect for all the clients.
Therefore similar injection approach has to be applied for Producer and Admin.
Otherwise it is strange that we can customize a scheduler for Consumer client metrics synchronization, but that is not the case for Producer and Admin.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. In that case, I will update the current PR for Producer and Admin as well

Copy link
Contributor Author

@vasiliy-sarzhynskyi vasiliy-sarzhynskyi Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shakuzen I pushed my changes for Producer and AdminClient constructors as well. After I rebased on branch main, recently added test class VirtualThreadMetricsTests is failing, looks like it's flaky.
I could try to increase the upper bound threshold inside VirtualThreadMetricsTests pinnedEventsShouldBeRecorded(..), but would be better to contact owner of these changes to clarify the intention of asserted values (changes by Virtual threads metrics)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pinging @jonatan-ivanov about the flakiness of VirtualThreadMetricsTests. I'll re-run the CI manually in the meantime.

super(kafkaConsumer::metrics, tags, scheduler);
}

/**
* Kafka {@link Consumer} metrics binder
* @param kafkaConsumer consumer instance to be instrumented
Expand All @@ -77,6 +109,21 @@ public KafkaClientMetrics(Consumer<?, ?> kafkaConsumer) {
super(kafkaConsumer::metrics);
}

/**
* Kafka {@link AdminClient} metrics binder. The lifecycle of the custom scheduler
* passed is the responsibility of the caller. It will not be shut down when this
* instance is {@link #close() closed}. A scheduler can be shared among multiple
* instances of {@link KafkaClientMetrics} to reduce resource usage by reducing the
* number of threads if there will be many instances.
* @param adminClient instance to be instrumented
* @param tags additional tags
* @param scheduler custom scheduler to check and bind metrics
* @since 1.14.0
*/
public KafkaClientMetrics(AdminClient adminClient, Iterable<Tag> tags, ScheduledExecutorService scheduler) {
super(adminClient::metrics, tags, scheduler);
}

/**
* Kafka {@link AdminClient} metrics binder
* @param adminClient instance to be instrumented
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class KafkaMetrics implements MeterBinder, AutoCloseable {
static final String KAFKA_VERSION_TAG_NAME = "kafka.version";
static final String DEFAULT_VALUE = "unknown";

private static final String DEFAULT_SCHEDULER_THREAD_NAME_PREFIX = "micrometer-kafka-metrics";

private static final Set<Class<?>> counterMeasurableClasses = new HashSet<>();

static {
Expand All @@ -96,8 +98,9 @@ class KafkaMetrics implements MeterBinder, AutoCloseable {

private final Duration refreshInterval;

private final ScheduledExecutorService scheduler = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("micrometer-kafka-metrics"));
private final ScheduledExecutorService scheduler;

private final boolean schedulerExternallyManaged;

@Nullable
private Iterable<Tag> commonTags;
Expand All @@ -122,11 +125,23 @@ class KafkaMetrics implements MeterBinder, AutoCloseable {
this(metricsSupplier, extraTags, DEFAULT_REFRESH_INTERVAL);
}

KafkaMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> extraTags,
ScheduledExecutorService scheduler) {
this(metricsSupplier, extraTags, DEFAULT_REFRESH_INTERVAL, scheduler, true);
}

KafkaMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> extraTags,
Duration refreshInterval) {
this(metricsSupplier, extraTags, refreshInterval, createDefaultScheduler(), false);
}

KafkaMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> extraTags,
Duration refreshInterval, ScheduledExecutorService scheduler, boolean schedulerExternallyManaged) {
this.metricsSupplier = metricsSupplier;
this.extraTags = extraTags;
this.refreshInterval = refreshInterval;
this.scheduler = scheduler;
this.schedulerExternallyManaged = schedulerExternallyManaged;
}

@Override
Expand Down Expand Up @@ -295,6 +310,10 @@ private static Class<? extends Measurable> getMeasurableClass(Metric metric) {
}
}

private static ScheduledExecutorService createDefaultScheduler() {
return Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(DEFAULT_SCHEDULER_THREAD_NAME_PREFIX));
}

private Gauge registerGauge(MeterRegistry registry, MetricName metricName, String meterName, Iterable<Tag> tags) {
return Gauge.builder(meterName, this.metrics, toMetricValue(metricName))
.tags(tags)
Expand Down Expand Up @@ -344,7 +363,9 @@ private Meter.Id meterIdForComparison(MetricName metricName) {

@Override
public void close() {
this.scheduler.shutdownNow();
if (!schedulerExternallyManaged) {
this.scheduler.shutdownNow();
}

for (Meter.Id id : registeredMeterIds) {
registry.remove(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.kafka.common.Metric;
import org.apache.kafka.streams.KafkaStreams;

import java.util.concurrent.ScheduledExecutorService;

/**
* Kafka Streams metrics binder. This should be closed on application shutdown to clean up
* resources.
Expand Down Expand Up @@ -58,4 +60,19 @@ public KafkaStreamsMetrics(KafkaStreams kafkaStreams) {
super(kafkaStreams::metrics);
}

/**
* {@link KafkaStreams} metrics binder. The lifecycle of the custom scheduler passed
* is the responsibility of the caller. It will not be shut down when this instance is
* {@link #close() closed}. A scheduler can be shared among multiple instances of
* {@link KafkaStreamsMetrics} to reduce resource usage by reducing the number of
* threads if there will be many instances.
* @param kafkaStreams instance to be instrumented
* @param tags additional tags
* @param scheduler customer scheduler to run the task that checks and binds metrics
* @since 1.14.0
*/
public KafkaStreamsMetrics(KafkaStreams kafkaStreams, Iterable<Tag> tags, ScheduledExecutorService scheduler) {
super(kafkaStreams::metrics, tags, scheduler);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.junit.jupiter.api.Test;

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX;
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
Expand All @@ -32,7 +34,7 @@ class KafkaClientMetricsAdminTest {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";

private Tags tags = Tags.of("app", "myapp", "version", "1");
private final Tags tags = Tags.of("app", "myapp", "version", "1");

KafkaClientMetrics metrics;

Expand Down Expand Up @@ -69,6 +71,27 @@ void shouldCreateMetersWithTags() {
}
}

@Test
void shouldCreateMetersWithTagsAndCustomScheduler() {
try (AdminClient adminClient = createAdmin()) {
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1);
metrics = new KafkaClientMetrics(adminClient, tags, customScheduler);
MeterRegistry registry = new SimpleMeterRegistry();

metrics.bindTo(registry);

assertThat(registry.getMeters()).hasSizeGreaterThan(0)
.extracting(meter -> meter.getId().getTag("app"))
.allMatch(s -> s.equals("myapp"));

metrics.close();
assertThat(customScheduler.isShutdown()).isFalse();

customScheduler.shutdownNow();
assertThat(customScheduler.isShutdown()).isTrue();
}
}

private AdminClient createAdmin() {
Properties adminConfig = new Properties();
adminConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.junit.jupiter.api.Test;

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX;
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
Expand All @@ -34,7 +36,7 @@ class KafkaClientMetricsConsumerTest {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";

private Tags tags = Tags.of("app", "myapp", "version", "1");
private final Tags tags = Tags.of("app", "myapp", "version", "1");

KafkaClientMetrics metrics;

Expand Down Expand Up @@ -71,6 +73,27 @@ void shouldCreateMetersWithTags() {
}
}

@Test
void shouldCreateMetersWithTagsAndCustomScheduler() {
try (Consumer<String, String> consumer = createConsumer()) {
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1);
metrics = new KafkaClientMetrics(consumer, tags, customScheduler);
MeterRegistry registry = new SimpleMeterRegistry();

metrics.bindTo(registry);

assertThat(registry.getMeters()).hasSizeGreaterThan(0)
.extracting(meter -> meter.getId().getTag("app"))
.allMatch(s -> s.equals("myapp"));

metrics.close();
assertThat(customScheduler.isShutdown()).isFalse();

customScheduler.shutdownNow();
assertThat(customScheduler.isShutdown()).isTrue();
}
}

private Consumer<String, String> createConsumer() {
Properties consumerConfig = new Properties();
consumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.junit.jupiter.api.Test;

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX;
import static org.apache.kafka.clients.producer.ProducerConfig.*;
Expand All @@ -34,7 +36,7 @@ class KafkaClientMetricsProducerTest {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";

private Tags tags = Tags.of("app", "myapp", "version", "1");
private final Tags tags = Tags.of("app", "myapp", "version", "1");

KafkaClientMetrics metrics;

Expand Down Expand Up @@ -71,6 +73,27 @@ void shouldCreateMetersWithTags() {
}
}

@Test
void shouldCreateMetersWithTagsAndCustomScheduler() {
try (Producer<String, String> producer = createProducer()) {
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1);
metrics = new KafkaClientMetrics(producer, tags, customScheduler);
MeterRegistry registry = new SimpleMeterRegistry();

metrics.bindTo(registry);

assertThat(registry.getMeters()).hasSizeGreaterThan(0)
.extracting(meter -> meter.getId().getTag("app"))
.allMatch(s -> s.equals("myapp"));

metrics.close();
assertThat(customScheduler.isShutdown()).isFalse();

customScheduler.shutdownNow();
assertThat(customScheduler.isShutdown()).isTrue();
}
}

private Producer<String, String> createProducer() {
Properties producerConfig = new Properties();
producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class KafkaMetricsTest {

Expand Down Expand Up @@ -68,7 +71,7 @@ void shouldKeepMetersWhenMetricsDoNotChange() {
}

@Test
void closeShouldRemoveAllMeters() {
void closeShouldRemoveAllMetersAndShutdownDefaultScheduler() {
// Given
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
MetricName metricName = new MetricName("a", "b", "c", new LinkedHashMap<>());
Expand All @@ -80,9 +83,35 @@ void closeShouldRemoveAllMeters() {

kafkaMetrics.bindTo(registry);
assertThat(registry.getMeters()).hasSize(1);
assertThat(isDefaultMetricsSchedulerThreadAlive()).isTrue();

kafkaMetrics.close();
assertThat(registry.getMeters()).isEmpty();
await().until(() -> !isDefaultMetricsSchedulerThreadAlive());
}

@Test
void closeShouldRemoveAllMetersAndNotShutdownCustomScheduler() {
// Given
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
MetricName metricName = new MetricName("a", "b", "c", new LinkedHashMap<>());
KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
return Collections.singletonMap(metricName, metric);
};
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1);
kafkaMetrics = new KafkaMetrics(supplier, Collections.emptyList(), customScheduler);
MeterRegistry registry = new SimpleMeterRegistry();

kafkaMetrics.bindTo(registry);
assertThat(registry.getMeters()).hasSize(1);
await().until(() -> !isDefaultMetricsSchedulerThreadAlive());

kafkaMetrics.close();
assertThat(registry.getMeters()).isEmpty();
assertThat(customScheduler.isShutdown()).isFalse();

customScheduler.shutdownNow();
assertThat(customScheduler.isShutdown()).isTrue();
}

@Test
Expand Down Expand Up @@ -552,4 +581,13 @@ private KafkaMetric createKafkaMetric(MetricName metricName) {
return new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
}

private static boolean isDefaultMetricsSchedulerThreadAlive() {
return Thread.getAllStackTraces()
.keySet()
.stream()
.filter(Thread::isAlive)
.map(Thread::getName)
.anyMatch(name -> name.startsWith("micrometer-kafka-metrics"));
}

}
Loading