Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ internal static class KafkaHelper
tags.Tombstone = "true";
}

if (topicPartition is not null && !string.IsNullOrEmpty(topicPartition.Topic))
{
tags.Topic = topicPartition.Topic;
}

// Producer spans should always be measured
span.SetMetric(Trace.Tags.Measured, 1.0);

Expand Down Expand Up @@ -225,6 +230,11 @@ private static long GetMessageSize<T>(T message)
tags.Tombstone = "true";
}

if (!string.IsNullOrEmpty(topic))
{
tags.Topic = topic;
}

// Consumer spans should always be measured
span.SetTag(Tags.Measured, "1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ partial class KafkaTags
private static ReadOnlySpan<byte> InstrumentationNameBytes => new byte[] { 169, 99, 111, 109, 112, 111, 110, 101, 110, 116 };
// BootstrapServersBytes = MessagePack.Serialize("messaging.kafka.bootstrap.servers");
private static ReadOnlySpan<byte> BootstrapServersBytes => new byte[] { 217, 33, 109, 101, 115, 115, 97, 103, 105, 110, 103, 46, 107, 97, 102, 107, 97, 46, 98, 111, 111, 116, 115, 116, 114, 97, 112, 46, 115, 101, 114, 118, 101, 114, 115 };
// TopicBytes = MessagePack.Serialize("messaging.destination.name");
private static ReadOnlySpan<byte> TopicBytes => new byte[] { 186, 109, 101, 115, 115, 97, 103, 105, 110, 103, 46, 100, 101, 115, 116, 105, 110, 97, 116, 105, 111, 110, 46, 110, 97, 109, 101 };
// PartitionBytes = MessagePack.Serialize("kafka.partition");
private static ReadOnlySpan<byte> PartitionBytes => new byte[] { 175, 107, 97, 102, 107, 97, 46, 112, 97, 114, 116, 105, 116, 105, 111, 110 };
// OffsetBytes = MessagePack.Serialize("kafka.offset");
Expand All @@ -38,6 +40,7 @@ partial class KafkaTags
"span.kind" => SpanKind,
"component" => InstrumentationName,
"messaging.kafka.bootstrap.servers" => BootstrapServers,
"messaging.destination.name" => Topic,
"kafka.partition" => Partition,
"kafka.offset" => Offset,
"kafka.tombstone" => Tombstone,
Expand All @@ -53,6 +56,9 @@ public override void SetTag(string key, string value)
case "messaging.kafka.bootstrap.servers":
BootstrapServers = value;
break;
case "messaging.destination.name":
Topic = value;
break;
case "kafka.partition":
Partition = value;
break;
Expand Down Expand Up @@ -92,6 +98,11 @@ public override void EnumerateTags<TProcessor>(ref TProcessor processor)
processor.Process(new TagItem<string>("messaging.kafka.bootstrap.servers", BootstrapServers, BootstrapServersBytes));
}

if (Topic is not null)
{
processor.Process(new TagItem<string>("messaging.destination.name", Topic, TopicBytes));
}

if (Partition is not null)
{
processor.Process(new TagItem<string>("kafka.partition", Partition, PartitionBytes));
Expand Down Expand Up @@ -138,6 +149,13 @@ protected override void WriteAdditionalTags(System.Text.StringBuilder sb)
.Append(',');
}

if (Topic is not null)
{
sb.Append("messaging.destination.name (tag):")
.Append(Topic)
.Append(',');
}

if (Partition is not null)
{
sb.Append("kafka.partition (tag):")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ partial class KafkaTags
private static ReadOnlySpan<byte> InstrumentationNameBytes => new byte[] { 169, 99, 111, 109, 112, 111, 110, 101, 110, 116 };
// BootstrapServersBytes = MessagePack.Serialize("messaging.kafka.bootstrap.servers");
private static ReadOnlySpan<byte> BootstrapServersBytes => new byte[] { 217, 33, 109, 101, 115, 115, 97, 103, 105, 110, 103, 46, 107, 97, 102, 107, 97, 46, 98, 111, 111, 116, 115, 116, 114, 97, 112, 46, 115, 101, 114, 118, 101, 114, 115 };
// TopicBytes = MessagePack.Serialize("messaging.destination.name");
private static ReadOnlySpan<byte> TopicBytes => new byte[] { 186, 109, 101, 115, 115, 97, 103, 105, 110, 103, 46, 100, 101, 115, 116, 105, 110, 97, 116, 105, 111, 110, 46, 110, 97, 109, 101 };
// PartitionBytes = MessagePack.Serialize("kafka.partition");
private static ReadOnlySpan<byte> PartitionBytes => new byte[] { 175, 107, 97, 102, 107, 97, 46, 112, 97, 114, 116, 105, 116, 105, 111, 110 };
// OffsetBytes = MessagePack.Serialize("kafka.offset");
Expand All @@ -38,6 +40,7 @@ partial class KafkaTags
"span.kind" => SpanKind,
"component" => InstrumentationName,
"messaging.kafka.bootstrap.servers" => BootstrapServers,
"messaging.destination.name" => Topic,
"kafka.partition" => Partition,
"kafka.offset" => Offset,
"kafka.tombstone" => Tombstone,
Expand All @@ -53,6 +56,9 @@ public override void SetTag(string key, string value)
case "messaging.kafka.bootstrap.servers":
BootstrapServers = value;
break;
case "messaging.destination.name":
Topic = value;
break;
case "kafka.partition":
Partition = value;
break;
Expand Down Expand Up @@ -92,6 +98,11 @@ public override void EnumerateTags<TProcessor>(ref TProcessor processor)
processor.Process(new TagItem<string>("messaging.kafka.bootstrap.servers", BootstrapServers, BootstrapServersBytes));
}

if (Topic is not null)
{
processor.Process(new TagItem<string>("messaging.destination.name", Topic, TopicBytes));
}

if (Partition is not null)
{
processor.Process(new TagItem<string>("kafka.partition", Partition, PartitionBytes));
Expand Down Expand Up @@ -138,6 +149,13 @@ protected override void WriteAdditionalTags(System.Text.StringBuilder sb)
.Append(',');
}

if (Topic is not null)
{
sb.Append("messaging.destination.name (tag):")
.Append(Topic)
.Append(',');
}

if (Partition is not null)
{
sb.Append("kafka.partition (tag):")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ partial class KafkaTags
private static ReadOnlySpan<byte> InstrumentationNameBytes => new byte[] { 169, 99, 111, 109, 112, 111, 110, 101, 110, 116 };
// BootstrapServersBytes = MessagePack.Serialize("messaging.kafka.bootstrap.servers");
private static ReadOnlySpan<byte> BootstrapServersBytes => new byte[] { 217, 33, 109, 101, 115, 115, 97, 103, 105, 110, 103, 46, 107, 97, 102, 107, 97, 46, 98, 111, 111, 116, 115, 116, 114, 97, 112, 46, 115, 101, 114, 118, 101, 114, 115 };
// TopicBytes = MessagePack.Serialize("messaging.destination.name");
private static ReadOnlySpan<byte> TopicBytes => new byte[] { 186, 109, 101, 115, 115, 97, 103, 105, 110, 103, 46, 100, 101, 115, 116, 105, 110, 97, 116, 105, 111, 110, 46, 110, 97, 109, 101 };
// PartitionBytes = MessagePack.Serialize("kafka.partition");
private static ReadOnlySpan<byte> PartitionBytes => new byte[] { 175, 107, 97, 102, 107, 97, 46, 112, 97, 114, 116, 105, 116, 105, 111, 110 };
// OffsetBytes = MessagePack.Serialize("kafka.offset");
Expand All @@ -38,6 +40,7 @@ partial class KafkaTags
"span.kind" => SpanKind,
"component" => InstrumentationName,
"messaging.kafka.bootstrap.servers" => BootstrapServers,
"messaging.destination.name" => Topic,
"kafka.partition" => Partition,
"kafka.offset" => Offset,
"kafka.tombstone" => Tombstone,
Expand All @@ -53,6 +56,9 @@ public override void SetTag(string key, string value)
case "messaging.kafka.bootstrap.servers":
BootstrapServers = value;
break;
case "messaging.destination.name":
Topic = value;
break;
case "kafka.partition":
Partition = value;
break;
Expand Down Expand Up @@ -92,6 +98,11 @@ public override void EnumerateTags<TProcessor>(ref TProcessor processor)
processor.Process(new TagItem<string>("messaging.kafka.bootstrap.servers", BootstrapServers, BootstrapServersBytes));
}

if (Topic is not null)
{
processor.Process(new TagItem<string>("messaging.destination.name", Topic, TopicBytes));
}

if (Partition is not null)
{
processor.Process(new TagItem<string>("kafka.partition", Partition, PartitionBytes));
Expand Down Expand Up @@ -138,6 +149,13 @@ protected override void WriteAdditionalTags(System.Text.StringBuilder sb)
.Append(',');
}

if (Topic is not null)
{
sb.Append("messaging.destination.name (tag):")
.Append(Topic)
.Append(',');
}

if (Partition is not null)
{
sb.Append("kafka.partition (tag):")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ partial class KafkaTags
private static ReadOnlySpan<byte> InstrumentationNameBytes => new byte[] { 169, 99, 111, 109, 112, 111, 110, 101, 110, 116 };
// BootstrapServersBytes = MessagePack.Serialize("messaging.kafka.bootstrap.servers");
private static ReadOnlySpan<byte> BootstrapServersBytes => new byte[] { 217, 33, 109, 101, 115, 115, 97, 103, 105, 110, 103, 46, 107, 97, 102, 107, 97, 46, 98, 111, 111, 116, 115, 116, 114, 97, 112, 46, 115, 101, 114, 118, 101, 114, 115 };
// TopicBytes = MessagePack.Serialize("messaging.destination.name");
private static ReadOnlySpan<byte> TopicBytes => new byte[] { 186, 109, 101, 115, 115, 97, 103, 105, 110, 103, 46, 100, 101, 115, 116, 105, 110, 97, 116, 105, 111, 110, 46, 110, 97, 109, 101 };
// PartitionBytes = MessagePack.Serialize("kafka.partition");
private static ReadOnlySpan<byte> PartitionBytes => new byte[] { 175, 107, 97, 102, 107, 97, 46, 112, 97, 114, 116, 105, 116, 105, 111, 110 };
// OffsetBytes = MessagePack.Serialize("kafka.offset");
Expand All @@ -38,6 +40,7 @@ partial class KafkaTags
"span.kind" => SpanKind,
"component" => InstrumentationName,
"messaging.kafka.bootstrap.servers" => BootstrapServers,
"messaging.destination.name" => Topic,
"kafka.partition" => Partition,
"kafka.offset" => Offset,
"kafka.tombstone" => Tombstone,
Expand All @@ -53,6 +56,9 @@ public override void SetTag(string key, string value)
case "messaging.kafka.bootstrap.servers":
BootstrapServers = value;
break;
case "messaging.destination.name":
Topic = value;
break;
case "kafka.partition":
Partition = value;
break;
Expand Down Expand Up @@ -92,6 +98,11 @@ public override void EnumerateTags<TProcessor>(ref TProcessor processor)
processor.Process(new TagItem<string>("messaging.kafka.bootstrap.servers", BootstrapServers, BootstrapServersBytes));
}

if (Topic is not null)
{
processor.Process(new TagItem<string>("messaging.destination.name", Topic, TopicBytes));
}

if (Partition is not null)
{
processor.Process(new TagItem<string>("kafka.partition", Partition, PartitionBytes));
Expand Down Expand Up @@ -138,6 +149,13 @@ protected override void WriteAdditionalTags(System.Text.StringBuilder sb)
.Append(',');
}

if (Topic is not null)
{
sb.Append("messaging.destination.name (tag):")
.Append(Topic)
.Append(',');
}

if (Partition is not null)
{
sb.Append("kafka.partition (tag):")
Expand Down
3 changes: 3 additions & 0 deletions tracer/src/Datadog.Trace/Tagging/KafkaTags.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public KafkaTags(string spanKind)
[Tag(Trace.Tags.KafkaBootstrapServers)]
public string BootstrapServers { get; set; }

[Tag(Trace.Tags.MessagingDestinationName)]
public string Topic { get; set; }

[Tag(Trace.Tags.KafkaPartition)]
public string Partition { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ public async Task SubmitsTraces(string packageVersion, string metadataSchemaVers
.OnlyContain(tag => Regex.IsMatch(tag, @"^\[[0-9]+\]$"))
.And.HaveCount(ExpectedSuccessProducerWithHandlerSpans + ExpectedTombstoneProducerWithHandlerSpans);

successfulProducerSpans.Should().OnlyContain(span => span.Tags.ContainsKey(Tags.MessagingDestinationName) && span.Tags[Tags.MessagingDestinationName] == topic);

allProducerSpans
.Where(span => span.Tags.ContainsKey(Tags.KafkaTombstone))
.Select(span => span.Tags[Tags.KafkaTombstone])
Expand All @@ -129,7 +131,7 @@ public async Task SubmitsTraces(string packageVersion, string metadataSchemaVers
.OnlyHaveUniqueItems()
.And.Subject.ToImmutableHashSet();

VerifyConsumerSpanProperties(successfulConsumerSpans, serviceName: clientSpanServiceName, resourceName: GetSuccessfulResourceName("Consume", topic), ExpectedConsumerSpans);
VerifyConsumerSpanProperties(successfulConsumerSpans, serviceName: clientSpanServiceName, resourceName: GetSuccessfulResourceName("Consume", topic), topic, ExpectedConsumerSpans);

// every consumer span should be a child of a producer span.
successfulConsumerSpans
Expand Down Expand Up @@ -179,7 +181,7 @@ private void VerifyProducerSpanProperties(List<MockSpan> producerSpans, string s
.And.OnlyContain(x => x.Metrics.ContainsKey(Tags.Measured) && x.Metrics[Tags.Measured] == 1.0);
}

private void VerifyConsumerSpanProperties(List<MockSpan> consumerSpans, string serviceName, string resourceName, int expectedCount)
private void VerifyConsumerSpanProperties(List<MockSpan> consumerSpans, string serviceName, string resourceName, string topic, int expectedCount)
{
// HaveCountGreaterOrEqualTo because same message may be consumed by both
consumerSpans.Should()
Expand All @@ -188,6 +190,7 @@ private void VerifyConsumerSpanProperties(List<MockSpan> consumerSpans, string s
.And.OnlyContain(x => x.Resource == resourceName)
.And.OnlyContain(x => x.Metrics.ContainsKey(Tags.Measured) && x.Metrics[Tags.Measured] == 1.0)
.And.OnlyContain(x => x.Metrics.ContainsKey(Metrics.MessageQueueTimeMs))
.And.OnlyContain(x => x.Tags.ContainsKey(Tags.MessagingDestinationName) && x.Tags[Tags.MessagingDestinationName] == topic)
.And.OnlyContain(x => x.Tags.ContainsKey(Tags.KafkaOffset) && Regex.IsMatch(x.Tags[Tags.KafkaOffset], @"^[0-9]+$"))
.And.OnlyContain(x => x.Tags.ContainsKey(Tags.KafkaPartition) && Regex.IsMatch(x.Tags[Tags.KafkaPartition], @"^\[[0-9]+\]$"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ public static Result IsKafkaInboundV0(this MockSpan span) => Result.FromSpan(spa
.IsOptional("kafka.partition")
.IsOptional("kafka.tombstone")
.IsPresent("messaging.kafka.bootstrap.servers")
.IsPresent("messaging.destination.name")
.IsOptional("_dd.base_service")
.Matches("component", "kafka")
.Matches("span.kind", "consumer"));
Expand All @@ -494,6 +495,7 @@ public static Result IsKafkaOutboundV0(this MockSpan span) => Result.FromSpan(sp
.IsOptional("kafka.partition")
.IsOptional("kafka.tombstone")
.IsPresent("messaging.kafka.bootstrap.servers")
.IsPresent("messaging.destination.name")
.IsOptional("_dd.base_service")
.Matches("component", "kafka")
.Matches("span.kind", "producer"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ public static Result IsKafkaInboundV1(this MockSpan span) => Result.FromSpan(spa
.IsOptional("kafka.partition")
.IsOptional("kafka.tombstone")
.IsPresent("messaging.kafka.bootstrap.servers")
.IsPresent("messaging.destination.name")
.IsOptional("_dd.base_service")
.Matches("component", "kafka")
.Matches("span.kind", "consumer"));
Expand All @@ -717,6 +718,7 @@ public static Result IsKafkaOutboundV1(this MockSpan span) => Result.FromSpan(sp
.IsOptional("kafka.partition")
.IsOptional("kafka.tombstone")
.IsPresent("messaging.kafka.bootstrap.servers")
.IsPresent("messaging.destination.name")
.IsPresent("peer.service")
.IsOptional("peer.service.remapped_from")
.IsOptional("_dd.base_service")
Expand Down
Loading