Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.consumerReceiveInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
Expand All @@ -16,16 +17,20 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

Expand All @@ -38,6 +43,14 @@ public ElementMatcher<TypeDescription> typeMatcher() {

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Map.class)),
this.getClass().getName() + "$ConstructorAdvice");

transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Properties.class)),
this.getClass().getName() + "$ConstructorAdvice");

transformer.applyAdviceToMethod(
named("poll")
.and(isPublic())
Expand All @@ -47,6 +60,34 @@ public void transform(TypeTransformer transformer) {
this.getClass().getName() + "$PollAdvice");
}

@SuppressWarnings("unused")
public static class ConstructorAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.This Consumer<?, ?> consumer, @Advice.Argument(0) Object configs) {

String bootstrapServers = null;
if (configs instanceof Map) {
Object servers = ((Map<?, ?>) configs).get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
if (servers != null) {
bootstrapServers = servers.toString();
}
} else if (configs instanceof Properties) {
bootstrapServers =
((Properties) configs).getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
}

if (bootstrapServers != null) {
VirtualField<Consumer<?, ?>, String> consumerStringVirtualField =
VirtualField.find(Consumer.class, String.class);
if (consumerStringVirtualField.get(consumer) == null) {
consumerStringVirtualField.set(consumer, bootstrapServers);
}
}
}
}

@SuppressWarnings("unused")
public static class PollAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,29 @@
package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11;

import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.producerInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaPropagation;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerInstrumentation implements TypeInstrumentation {
Expand All @@ -34,6 +40,14 @@ public ElementMatcher<TypeDescription> typeMatcher() {

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Map.class)),
this.getClass().getName() + "$ConstructorAdvice");

transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Properties.class)),
this.getClass().getName() + "$ConstructorAdvice");

transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
Expand All @@ -43,19 +57,50 @@ public void transform(TypeTransformer transformer) {
KafkaProducerInstrumentation.class.getName() + "$SendAdvice");
}

@SuppressWarnings("unused")
public static class ConstructorAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.This Producer<?, ?> producer, @Advice.Argument(0) Object configs) {

String bootstrapServers = null;
if (configs instanceof Map) {
Object servers = ((Map<?, ?>) configs).get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
if (servers != null) {
bootstrapServers = servers.toString();
}
} else if (configs instanceof Properties) {
bootstrapServers =
((Properties) configs).getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
}

if (bootstrapServers != null) {
VirtualField<Producer<?, ?>, String> producerStringVirtualField =
VirtualField.find(Producer.class, String.class);
if (producerStringVirtualField.get(producer) == null) {
producerStringVirtualField.set(producer, bootstrapServers);
}
}
}
}

@SuppressWarnings("unused")
public static class SendAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static KafkaProducerRequest onEnter(
@Advice.This Producer<?, ?> producer,
@Advice.FieldValue("apiVersions") ApiVersions apiVersions,
@Advice.FieldValue("clientId") String clientId,
@Advice.Argument(value = 0, readOnly = false) ProducerRecord<?, ?> record,
@Advice.Argument(value = 1, readOnly = false) Callback callback,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {

KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId);
String bootstrapServers = VirtualField.find(Producer.class, String.class).get(producer);
KafkaProducerRequest request =
KafkaProducerRequest.create(record, clientId, bootstrapServers);
Context parentContext = Java8BytecodeBridge.currentContext();
if (!producerInstrumenter().shouldStart(parentContext, request)) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public abstract class KafkaClientBaseTest {
protected static final String SHARED_TOPIC = "shared.topic";
protected static final AttributeKey<String> MESSAGING_CLIENT_ID =
AttributeKey.stringKey("messaging.client_id");
protected static final AttributeKey<String> MESSAGING_KAFKA_BOOTSTRAP_SERVERS =
AttributeKey.stringKey("messaging.kafka.bootstrap.servers");

private KafkaContainer kafka;
protected Producer<Integer, String> producer;
Expand Down Expand Up @@ -177,6 +179,7 @@ protected static List<AttributeAssertion> sendAttributes(
equalTo(MESSAGING_SYSTEM, "kafka"),
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
equalTo(MESSAGING_OPERATION, "publish"),
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")),
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative)));
Expand All @@ -203,6 +206,7 @@ protected static List<AttributeAssertion> receiveAttributes(boolean testHeaders)
equalTo(MESSAGING_SYSTEM, "kafka"),
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
equalTo(MESSAGING_OPERATION, "receive"),
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
satisfies(MESSAGING_BATCH_MESSAGE_COUNT, AbstractLongAssert::isPositive)));
// consumer group is not available in version 0.11
Expand All @@ -227,6 +231,7 @@ protected static List<AttributeAssertion> processAttributes(
equalTo(MESSAGING_SYSTEM, "kafka"),
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
equalTo(MESSAGING_OPERATION, "process"),
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,11 @@ <K, V> ConsumerRecords<K, V> addTracing(
*
* @param record the producer record to inject span info.
*/
<K, V> void buildAndInjectSpan(ProducerRecord<K, V> record, String clientId) {
<K, V> void buildAndInjectSpan(
ProducerRecord<K, V> record, String clientId, String bootstrapServers) {
Context parentContext = Context.current();

KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId);
KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId, bootstrapServers);
if (!producerInstrumenter.shouldStart(parentContext, request)) {
return;
}
Expand Down Expand Up @@ -262,16 +263,25 @@ <K, V> Future<RecordMetadata> buildAndInjectSpan(
private <K, V> Context buildAndFinishSpan(
ConsumerRecords<K, V> records, Consumer<K, V> consumer, Timer timer) {
return buildAndFinishSpan(
records, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer), timer);
records,
KafkaUtil.getConsumerGroup(consumer),
KafkaUtil.getClientId(consumer),
KafkaUtil.getBootstrapServers(consumer),
timer);
}

<K, V> Context buildAndFinishSpan(
ConsumerRecords<K, V> records, String consumerGroup, String clientId, Timer timer) {
ConsumerRecords<K, V> records,
String consumerGroup,
String clientId,
String bootstrapServers,
Timer timer) {
if (records.isEmpty()) {
return null;
}
Context parentContext = Context.current();
KafkaReceiveRequest request = KafkaReceiveRequest.create(records, consumerGroup, clientId);
KafkaReceiveRequest request =
KafkaReceiveRequest.create(records, consumerGroup, clientId, bootstrapServers);
Context context = null;
if (consumerReceiveInstrumenter.shouldStart(parentContext, request)) {
context =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K,
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
.build();

private String bootstrapServers;
private String consumerGroup;
private String clientId;

Expand All @@ -42,12 +43,13 @@ public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K,
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
// timer should be started before fetching ConsumerRecords, but there is no callback for that
Timer timer = Timer.start();
Context receiveContext = telemetry.buildAndFinishSpan(records, consumerGroup, clientId, timer);
Context receiveContext =
telemetry.buildAndFinishSpan(records, consumerGroup, clientId, bootstrapServers, timer);
if (receiveContext == null) {
receiveContext = Context.current();
}
KafkaConsumerContext consumerContext =
KafkaConsumerContextUtil.create(receiveContext, consumerGroup, clientId);
KafkaConsumerContextUtil.create(receiveContext, consumerGroup, clientId, bootstrapServers);
return telemetry.addTracing(records, consumerContext);
}

Expand All @@ -59,6 +61,7 @@ public void close() {}

@Override
public void configure(Map<String, ?> configs) {
bootstrapServers = Objects.toString(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), null);
consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null);
clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K,

private static final KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());

@Nullable private String bootstrapServers;

@Nullable private String clientId;

@Override
@CanIgnoreReturnValue
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
telemetry.buildAndInjectSpan(producerRecord, clientId);
telemetry.buildAndInjectSpan(producerRecord, clientId, bootstrapServers);
return producerRecord;
}

Expand All @@ -42,6 +44,7 @@ public void close() {}
@Override
public void configure(Map<String, ?> map) {
clientId = Objects.toString(map.get(ProducerConfig.CLIENT_ID_CONFIG), null);
bootstrapServers = Objects.toString(map.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), null);

// TODO: support experimental attributes config
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ void assertTraces() {
equalTo(MESSAGING_SYSTEM, "kafka"),
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
equalTo(MESSAGING_OPERATION, "publish"),
satisfies(
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
AbstractStringAssert::isNotEmpty),
satisfies(
MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer"))),
Expand All @@ -52,6 +55,9 @@ void assertTraces() {
equalTo(
MESSAGING_MESSAGE_BODY_SIZE,
greeting.getBytes(StandardCharsets.UTF_8).length),
satisfies(
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
AbstractStringAssert::isNotEmpty),
satisfies(
MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ void assertTraces() {
equalTo(MESSAGING_SYSTEM, "kafka"),
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
equalTo(MESSAGING_OPERATION, "publish"),
satisfies(
MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
satisfies(
MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer"))));
Expand All @@ -68,6 +70,9 @@ void assertTraces() {
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
equalTo(MESSAGING_OPERATION, "receive"),
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
satisfies(
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
AbstractStringAssert::isNotEmpty),
satisfies(
MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("consumer")),
Expand All @@ -84,6 +89,9 @@ void assertTraces() {
equalTo(
MESSAGING_MESSAGE_BODY_SIZE,
greeting.getBytes(StandardCharsets.UTF_8).length),
satisfies(
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
AbstractStringAssert::isNotEmpty),
satisfies(
MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
equalTo(MESSAGING_SYSTEM, "kafka"),
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
equalTo(MESSAGING_OPERATION, "publish"),
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")),
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative)));
Expand All @@ -89,6 +90,7 @@ private static List<AttributeAssertion> processAttributes(String greeting, boole
equalTo(MESSAGING_OPERATION, "process"),
equalTo(
MESSAGING_MESSAGE_BODY_SIZE, greeting.getBytes(StandardCharsets.UTF_8).length),
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
satisfies(
Expand Down
Loading
Loading