Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.internal;

public final class AttributeValueConstants {

private AttributeValueConstants() {}

public static final String PROCESS_STATUS_DROPPED = "dropped";
public static final String PROCESS_STATUS_PROCESSED = "processed";
public static final String PROCESS_STATUS_EXPORTED = "exported";
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

package io.opentelemetry.sdk.logs.export;

import static io.opentelemetry.sdk.internal.AttributeValueConstants.PROCESS_STATUS_DROPPED;
import static io.opentelemetry.sdk.internal.AttributeValueConstants.PROCESS_STATUS_EXPORTED;
import static io.opentelemetry.sdk.internal.AttributeValueConstants.PROCESS_STATUS_PROCESSED;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
Expand Down Expand Up @@ -45,8 +49,8 @@ public final class BatchLogRecordProcessor implements LogRecordProcessor {
BatchLogRecordProcessor.class.getSimpleName() + "_WorkerThread";
private static final AttributeKey<String> LOG_RECORD_PROCESSOR_TYPE_LABEL =
AttributeKey.stringKey("logRecordProcessorType");
private static final AttributeKey<Boolean> LOG_RECORD_PROCESSOR_DROPPED_LABEL =
AttributeKey.booleanKey("dropped");
private static final AttributeKey<String> LOG_RECORD_PROCESS_STATUS_LABEL =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strongly recommend avoiding using the term "label" to refer to OpenTelemetry attributes. Let's try and be consistent in our naming. 🙏🏻

AttributeKey.stringKey("status");
private static final String LOG_RECORD_PROCESSOR_TYPE_VALUE =
BatchLogRecordProcessor.class.getSimpleName();

Expand Down Expand Up @@ -133,6 +137,7 @@ private static final class Worker implements Runnable {
private final LongCounter processedLogsCounter;
private final Attributes droppedAttrs;
private final Attributes exportedAttrs;
private final Attributes processedAttrs;

private final LogRecordExporter logRecordExporter;
private final long scheduleDelayNanos;
Expand Down Expand Up @@ -191,19 +196,26 @@ private Worker(
Attributes.of(
LOG_RECORD_PROCESSOR_TYPE_LABEL,
LOG_RECORD_PROCESSOR_TYPE_VALUE,
LOG_RECORD_PROCESSOR_DROPPED_LABEL,
true);
LOG_RECORD_PROCESS_STATUS_LABEL,
PROCESS_STATUS_DROPPED);
exportedAttrs =
Attributes.of(
LOG_RECORD_PROCESSOR_TYPE_LABEL,
LOG_RECORD_PROCESSOR_TYPE_VALUE,
LOG_RECORD_PROCESSOR_DROPPED_LABEL,
false);
LOG_RECORD_PROCESS_STATUS_LABEL,
PROCESS_STATUS_EXPORTED);
processedAttrs =
Attributes.of(
LOG_RECORD_PROCESSOR_TYPE_LABEL,
LOG_RECORD_PROCESSOR_TYPE_VALUE,
LOG_RECORD_PROCESS_STATUS_LABEL,
PROCESS_STATUS_PROCESSED);

this.batch = new ArrayList<>(this.maxExportBatchSize);
}

private void addLog(ReadWriteLogRecord logData) {
processedLogsCounter.add(1, processedAttrs);
if (!queue.offer(logData)) {
processedLogsCounter.add(1, droppedAttrs);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
package io.opentelemetry.sdk.trace.export;

import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.internal.AttributeValueConstants.PROCESS_STATUS_DROPPED;
import static io.opentelemetry.sdk.internal.AttributeValueConstants.PROCESS_STATUS_EXPORTED;
import static io.opentelemetry.sdk.internal.AttributeValueConstants.PROCESS_STATUS_PROCESSED;

import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
Expand All @@ -22,31 +25,30 @@ public BatchSpanProcessorMetrics(Collection<MetricData> allMetrics, int numThrea
}

public double dropRatio() {
long exported = getMetric(false);
long dropped = getMetric(true);
long total = exported + dropped;
long dropped = getMetric(PROCESS_STATUS_DROPPED);
long total = getMetric(PROCESS_STATUS_PROCESSED);
// Due to peculiarities of JMH reporting we have to divide this by the number of the
// concurrent threads running the actual benchmark.
return total == 0 ? 0 : (double) dropped / total / numThreads;
}

public long exportedSpans() {
return getMetric(false) / numThreads;
return getMetric(PROCESS_STATUS_EXPORTED) / numThreads;
}

public long droppedSpans() {
return getMetric(true) / numThreads;
return getMetric(PROCESS_STATUS_DROPPED) / numThreads;
}

private long getMetric(boolean dropped) {
String labelValue = String.valueOf(dropped);
private long getMetric(String status) {
String labelValue = status;
OptionalLong value =
allMetrics.stream()
.filter(metricData -> metricData.getName().equals("processedSpans"))
.filter(metricData -> !metricData.isEmpty())
.map(metricData -> metricData.getLongSumData().getPoints())
.flatMap(Collection::stream)
.filter(point -> labelValue.equals(point.getAttributes().get(stringKey("dropped"))))
.filter(point -> labelValue.equals(point.getAttributes().get(stringKey("status"))))
.mapToLong(LongPointData::getValue)
.findFirst();
return value.isPresent() ? value.getAsLong() : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

package io.opentelemetry.sdk.trace.export;

import static io.opentelemetry.sdk.internal.AttributeValueConstants.PROCESS_STATUS_DROPPED;
import static io.opentelemetry.sdk.internal.AttributeValueConstants.PROCESS_STATUS_EXPORTED;
import static io.opentelemetry.sdk.internal.AttributeValueConstants.PROCESS_STATUS_PROCESSED;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
Expand Down Expand Up @@ -48,8 +52,8 @@ public final class BatchSpanProcessor implements SpanProcessor {
BatchSpanProcessor.class.getSimpleName() + "_WorkerThread";
private static final AttributeKey<String> SPAN_PROCESSOR_TYPE_LABEL =
AttributeKey.stringKey("spanProcessorType");
private static final AttributeKey<Boolean> SPAN_PROCESSOR_DROPPED_LABEL =
AttributeKey.booleanKey("dropped");
private static final AttributeKey<String> SPAN_PROCESS_STATUS_LABEL =
AttributeKey.stringKey("status");
private static final String SPAN_PROCESSOR_TYPE_VALUE = BatchSpanProcessor.class.getSimpleName();

private final Worker worker;
Expand Down Expand Up @@ -150,6 +154,7 @@ private static final class Worker implements Runnable {
private final LongCounter processedSpansCounter;
private final Attributes droppedAttrs;
private final Attributes exportedAttrs;
private final Attributes processedAttrs;

private final SpanExporter spanExporter;
private final long scheduleDelayNanos;
Expand Down Expand Up @@ -207,19 +212,26 @@ private Worker(
Attributes.of(
SPAN_PROCESSOR_TYPE_LABEL,
SPAN_PROCESSOR_TYPE_VALUE,
SPAN_PROCESSOR_DROPPED_LABEL,
true);
SPAN_PROCESS_STATUS_LABEL,
PROCESS_STATUS_DROPPED);
exportedAttrs =
Attributes.of(
SPAN_PROCESSOR_TYPE_LABEL,
SPAN_PROCESSOR_TYPE_VALUE,
SPAN_PROCESSOR_DROPPED_LABEL,
false);
SPAN_PROCESS_STATUS_LABEL,
PROCESS_STATUS_EXPORTED);
processedAttrs =
Attributes.of(
SPAN_PROCESSOR_TYPE_LABEL,
SPAN_PROCESSOR_TYPE_VALUE,
SPAN_PROCESS_STATUS_LABEL,
PROCESS_STATUS_PROCESSED);

this.batch = new ArrayList<>(this.maxExportBatchSize);
}

private void addSpan(ReadableSpan span) {
processedSpansCounter.add(1, processedAttrs);
if (!queue.offer(span)) {
processedSpansCounter.add(1, droppedAttrs);
} else {
Expand Down