1
1
package io .smallrye .reactive .messaging .kafka .tracing ;
2
2
3
- import static io .opentelemetry .semconv . SemanticAttributes . MESSAGING_CLIENT_ID ;
4
- import static io .opentelemetry .semconv .SemanticAttributes .MESSAGING_DESTINATION_NAME ;
5
- import static io .opentelemetry .semconv .SemanticAttributes . MESSAGING_KAFKA_MESSAGE_OFFSET ;
6
- import static io .opentelemetry .semconv .SemanticAttributes .MESSAGING_OPERATION ;
7
- import static io .opentelemetry .semconv .SemanticAttributes .MESSAGING_SYSTEM ;
3
+ import static io .opentelemetry .api . common . AttributeKey . stringKey ;
4
+ import static io .opentelemetry .semconv .incubating . MessagingIncubatingAttributes .MESSAGING_DESTINATION_NAME ;
5
+ import static io .opentelemetry .semconv .incubating . MessagingIncubatingAttributes . MESSAGING_KAFKA_OFFSET ;
6
+ import static io .opentelemetry .semconv .incubating . MessagingIncubatingAttributes .MESSAGING_OPERATION ;
7
+ import static io .opentelemetry .semconv .incubating . MessagingIncubatingAttributes .MESSAGING_SYSTEM ;
8
8
import static io .smallrye .reactive .messaging .kafka .companion .RecordQualifiers .until ;
9
9
import static java .util .stream .Collectors .toList ;
10
10
import static org .assertj .core .api .Assertions .assertThat ;
11
11
import static org .awaitility .Awaitility .await ;
12
12
import static org .junit .jupiter .api .Assertions .assertEquals ;
13
+ import static org .junit .jupiter .api .Assertions .assertTrue ;
13
14
14
15
import java .net .URI ;
15
16
import java .nio .charset .StandardCharsets ;
16
17
import java .time .Duration ;
17
18
import java .util .ArrayList ;
19
+ import java .util .Arrays ;
18
20
import java .util .List ;
21
+ import java .util .Objects ;
19
22
import java .util .concurrent .CompletionStage ;
20
23
import java .util .concurrent .CopyOnWriteArrayList ;
21
24
import java .util .concurrent .Flow ;
37
40
import org .junit .jupiter .api .Test ;
38
41
39
42
import io .opentelemetry .api .GlobalOpenTelemetry ;
43
+ import io .opentelemetry .api .common .AttributeKey ;
44
+ import io .opentelemetry .api .common .Attributes ;
40
45
import io .opentelemetry .api .trace .Span ;
41
46
import io .opentelemetry .api .trace .SpanId ;
42
47
import io .opentelemetry .api .trace .SpanKind ;
53
58
import io .opentelemetry .sdk .trace .data .SpanData ;
54
59
import io .opentelemetry .sdk .trace .export .SimpleSpanProcessor ;
55
60
import io .opentelemetry .sdk .trace .samplers .Sampler ;
61
+ import io .opentelemetry .semconv .incubating .MessagingIncubatingAttributes ;
56
62
import io .smallrye .mutiny .Multi ;
57
63
import io .smallrye .reactive .messaging .ce .OutgoingCloudEventMetadata ;
58
64
import io .smallrye .reactive .messaging .kafka .base .KafkaCompanionTestBase ;
@@ -117,8 +123,9 @@ public void testFromAppToKafka() {
117
123
assertEquals ("kafka" , span .getAttributes ().get (MESSAGING_SYSTEM ));
118
124
assertEquals ("publish" , span .getAttributes ().get (MESSAGING_OPERATION ));
119
125
assertEquals (topic , span .getAttributes ().get (MESSAGING_DESTINATION_NAME ));
120
- assertEquals ("kafka-producer-kafka" , span .getAttributes ().get (MESSAGING_CLIENT_ID ));
121
- assertEquals (0 , span .getAttributes ().get (MESSAGING_KAFKA_MESSAGE_OFFSET ));
126
+ checkAttribute (span .getAttributes (), "kafka-producer-kafka" ,
127
+ stringKey ("messaging.client_id" ), MessagingIncubatingAttributes .MESSAGING_CLIENT_ID );
128
+ assertEquals (0 , span .getAttributes ().get (MESSAGING_KAFKA_OFFSET ));
122
129
123
130
assertEquals (topic + " publish" , span .getName ());
124
131
});
@@ -155,9 +162,9 @@ public void testFromAppToKafkaWithStructuredCloudEvents() {
155
162
assertEquals ("kafka" , span .getAttributes ().get (MESSAGING_SYSTEM ));
156
163
assertEquals ("publish" , span .getAttributes ().get (MESSAGING_OPERATION ));
157
164
assertEquals (topic , span .getAttributes ().get (MESSAGING_DESTINATION_NAME ));
158
- assertEquals ( "publish" , span .getAttributes (). get ( MESSAGING_OPERATION ));
159
- assertEquals ( "kafka-producer-kafka" , span . getAttributes (). get ( MESSAGING_CLIENT_ID ) );
160
- assertEquals (0 , span .getAttributes ().get (MESSAGING_KAFKA_MESSAGE_OFFSET ));
165
+ checkAttribute ( span .getAttributes (), "kafka-producer-kafka" ,
166
+ stringKey ( "messaging.client_id" ), MessagingIncubatingAttributes . MESSAGING_CLIENT_ID );
167
+ assertEquals (0 , span .getAttributes ().get (MESSAGING_KAFKA_OFFSET ));
161
168
assertEquals (topic + " publish" , span .getName ());
162
169
});
163
170
}
@@ -193,9 +200,9 @@ public void testFromAppToKafkaWithBinaryCloudEvents() {
193
200
assertEquals ("kafka" , span .getAttributes ().get (MESSAGING_SYSTEM ));
194
201
assertEquals ("publish" , span .getAttributes ().get (MESSAGING_OPERATION ));
195
202
assertEquals (topic , span .getAttributes ().get (MESSAGING_DESTINATION_NAME ));
196
- assertEquals ( "publish" , span .getAttributes (). get ( MESSAGING_OPERATION ));
197
- assertEquals ( "kafka-producer-kafka" , span . getAttributes (). get ( MESSAGING_CLIENT_ID ) );
198
- assertEquals (0 , span .getAttributes ().get (MESSAGING_KAFKA_MESSAGE_OFFSET ));
203
+ checkAttribute ( span .getAttributes (), "kafka-producer-kafka" ,
204
+ stringKey ( "messaging.client_id" ), MessagingIncubatingAttributes . MESSAGING_CLIENT_ID );
205
+ assertEquals (0 , span .getAttributes ().get (MESSAGING_KAFKA_OFFSET ));
199
206
assertEquals (topic + " publish" , span .getName ());
200
207
});
201
208
}
@@ -240,8 +247,9 @@ public void testFromKafkaToAppToKafka() {
240
247
assertEquals ("kafka" , consumer .getAttributes ().get (MESSAGING_SYSTEM ));
241
248
assertEquals ("receive" , consumer .getAttributes ().get (MESSAGING_OPERATION ));
242
249
assertEquals (parentTopic , consumer .getAttributes ().get (MESSAGING_DESTINATION_NAME ));
243
- assertEquals ("kafka-consumer-source" , consumer .getAttributes ().get (MESSAGING_CLIENT_ID ));
244
- assertEquals (0 , consumer .getAttributes ().get (MESSAGING_KAFKA_MESSAGE_OFFSET ));
250
+ checkAttribute (consumer .getAttributes (), "kafka-consumer-source" ,
251
+ stringKey ("messaging.client_id" ), MessagingIncubatingAttributes .MESSAGING_CLIENT_ID );
252
+ assertEquals (0 , consumer .getAttributes ().get (MESSAGING_KAFKA_OFFSET ));
245
253
assertEquals (parentTopic + " receive" , consumer .getName ());
246
254
247
255
SpanData producer = spans .stream ().filter (spanData -> spanData .getParentSpanId ().equals (consumer .getSpanId ()))
@@ -252,8 +260,9 @@ public void testFromKafkaToAppToKafka() {
252
260
assertEquals ("publish" , producer .getAttributes ().get (MESSAGING_OPERATION ));
253
261
assertEquals (resultTopic , producer .getAttributes ().get (MESSAGING_DESTINATION_NAME ));
254
262
assertEquals ("publish" , producer .getAttributes ().get (MESSAGING_OPERATION ));
255
- assertEquals ("kafka-producer-kafka" , producer .getAttributes ().get (MESSAGING_CLIENT_ID ));
256
- assertEquals (0 , producer .getAttributes ().get (MESSAGING_KAFKA_MESSAGE_OFFSET ));
263
+ checkAttribute (producer .getAttributes (), "kafka-producer-kafka" ,
264
+ stringKey ("messaging.client_id" ), MessagingIncubatingAttributes .MESSAGING_CLIENT_ID );
265
+ assertEquals (0 , producer .getAttributes ().get (MESSAGING_KAFKA_OFFSET ));
257
266
assertEquals (resultTopic + " publish" , producer .getName ());
258
267
});
259
268
}
@@ -306,12 +315,27 @@ public void testFromKafkaToAppWithParentSpan() {
306
315
assertEquals ("kafka" , consumer .getAttributes ().get (MESSAGING_SYSTEM ));
307
316
assertEquals ("receive" , consumer .getAttributes ().get (MESSAGING_OPERATION ));
308
317
assertEquals (parentTopic , consumer .getAttributes ().get (MESSAGING_DESTINATION_NAME ));
309
- assertEquals ("kafka-consumer-stuff" , consumer .getAttributes ().get (MESSAGING_CLIENT_ID ));
310
- assertEquals (0 , consumer .getAttributes ().get (MESSAGING_KAFKA_MESSAGE_OFFSET ));
318
+ checkAttribute (consumer .getAttributes (), "kafka-consumer-stuff" ,
319
+ stringKey ("messaging.client_id" ), MessagingIncubatingAttributes .MESSAGING_CLIENT_ID );
320
+ assertEquals (0 , consumer .getAttributes ().get (MESSAGING_KAFKA_OFFSET ));
311
321
assertEquals (parentTopic + " receive" , consumer .getName ());
312
322
});
313
323
}
314
324
325
+ /**
326
+ * Currently, Kafka creates spans using an older version of OpenTelemetry's semantic conventions. This method
327
+ * allows us to check old and new keys. It loops through the keys, making sure at least one returns the expected
328
+ * value.
329
+ *
330
+ * @param attributes The span attributes to check
331
+ * @param expected The expected value to be found
332
+ * @param keys The keys under which <code>expected</code> might be found
333
+ */
334
+ private void checkAttribute (Attributes attributes , String expected , AttributeKey <?>... keys ) {
335
+ assertTrue (
336
+ Arrays .stream (keys ).anyMatch (key -> Objects .equals (expected , attributes .get (key ))));
337
+ }
338
+
315
339
@ Test
316
340
public void testFromKafkaToAppWithNoParent () {
317
341
MyAppReceivingData bean = runApplication (
0 commit comments