Skip to content

Commit a91de57

Browse files
committed
Adding support for other metrics, missing async
1 parent 39c90eb commit a91de57

File tree

5 files changed

+294
-55
lines changed

5 files changed

+294
-55
lines changed

tracer/src/Datadog.Trace/OTelMetrics/MeterListener.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
#nullable enable
99

1010
using System;
11-
using System.Diagnostics.Metrics;
1211
using System.Threading;
1312
using Datadog.Trace.Logging;
1413

@@ -19,8 +18,8 @@ internal static class MeterListener
1918
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor(typeof(MeterListener));
2019

2120
private static System.Diagnostics.Metrics.MeterListener? _meterListenerInstance;
22-
private static int _initialized = 0;
23-
private static int _stopped = 0;
21+
private static int _initialized;
22+
private static int _stopped;
2423

2524
public static bool IsRunning
2625
{
@@ -43,8 +42,9 @@ public static void Initialize()
4342
var meterListener = new System.Diagnostics.Metrics.MeterListener();
4443
meterListener.InstrumentPublished = MeterListenerHandler.OnInstrumentPublished;
4544

46-
// For now, only handle long counters to start simple
45+
// Handle basic synchronous instruments (as per RFC)
4746
meterListener.SetMeasurementEventCallback<long>(MeterListenerHandler.OnMeasurementRecordedLong);
47+
meterListener.SetMeasurementEventCallback<double>(MeterListenerHandler.OnMeasurementRecordedDouble);
4848

4949
meterListener.Start();
5050
_meterListenerInstance = meterListener;

tracer/src/Datadog.Trace/OTelMetrics/MeterListenerHandler.cs

Lines changed: 86 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using System.Collections.Concurrent;
1212
using System.Collections.Generic;
1313
using System.Diagnostics.Metrics;
14+
using System.Runtime.CompilerServices;
1415
using Datadog.Trace.Logging;
1516

1617
namespace Datadog.Trace.OTelMetrics
@@ -19,7 +20,7 @@ internal static class MeterListenerHandler
1920
{
2021
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor(typeof(MeterListenerHandler));
2122

22-
private static readonly ConcurrentDictionary<string, MetricData> CapturedMetrics = new();
23+
private static readonly ConcurrentDictionary<string, MetricPoint> CapturedMetrics = new();
2324

2425
public static void OnInstrumentPublished(Instrument instrument, System.Diagnostics.Metrics.MeterListener listener)
2526
{
@@ -48,6 +49,18 @@ public static void OnInstrumentPublished(Instrument instrument, System.Diagnosti
4849
}
4950

5051
public static void OnMeasurementRecordedLong(Instrument instrument, long value, ReadOnlySpan<KeyValuePair<string, object?>> tags, object? state)
52+
{
53+
ProcessMeasurement(instrument, value, tags);
54+
}
55+
56+
public static void OnMeasurementRecordedDouble(Instrument instrument, double value, ReadOnlySpan<KeyValuePair<string, object?>> tags, object? state)
57+
{
58+
// Reuse the same logic but for double values
59+
ProcessMeasurement(instrument, value, tags);
60+
}
61+
62+
private static void ProcessMeasurement<T>(Instrument instrument, T value, ReadOnlySpan<KeyValuePair<string, object?>> tags)
63+
where T : struct
5164
{
5265
var instrumentType = instrument.GetType().FullName;
5366
if (instrumentType is null)
@@ -56,40 +69,94 @@ public static void OnMeasurementRecordedLong(Instrument instrument, long value,
5669
return;
5770
}
5871

59-
// Only handle Counter<long> for now
60-
if (!instrumentType.StartsWith("System.Diagnostics.Metrics.Counter`1"))
72+
// Handle synchronous instruments as per RFC
73+
string aggregationType;
74+
if (instrumentType.StartsWith("System.Diagnostics.Metrics.Counter`1") ||
75+
instrumentType.StartsWith("System.Diagnostics.Metrics.UpDownCounter`1"))
76+
{
77+
aggregationType = "Counter"; // Both use Sum Aggregation -> Sum Metric Point
78+
}
79+
else if (instrumentType.StartsWith("System.Diagnostics.Metrics.Gauge`1"))
6180
{
62-
Log.Debug("Skipping non-counter instrument: {InstrumentName} of type: {InstrumentType}", instrument.Name, instrumentType);
81+
aggregationType = "Gauge"; // Last Value Aggregation -> Gauge Metric Point
82+
}
83+
else if (instrumentType.StartsWith("System.Diagnostics.Metrics.Histogram`1"))
84+
{
85+
aggregationType = "Histogram"; // Histogram Aggregation -> Histogram Metric Point
86+
}
87+
else
88+
{
89+
Log.Debug("Skipping unsupported instrument: {InstrumentName} of type: {InstrumentType}", instrument.Name, instrumentType);
6390
return;
6491
}
6592

6693
var tagsDict = new Dictionary<string, object?>(tags.Length);
6794
var tagsArray = new string[tags.Length];
68-
for (int i = 0; i < tags.Length; i++)
95+
for (var i = 0; i < tags.Length; i++)
6996
{
7097
var tag = tags[i];
7198
tagsDict[tag.Key] = tag.Value;
7299
tagsArray[i] = tag.Key + "=" + tag.Value;
73100
}
74101

75-
// Create metric data
76-
var metricData = new MetricData
77-
{
78-
InstrumentName = instrument.Name,
79-
MeterName = instrument.Meter.Name ?? "unknown",
80-
InstrumentType = "Counter",
81-
Value = value,
82-
Tags = tagsDict,
83-
Timestamp = DateTimeOffset.UtcNow
84-
};
102+
// High-performance aggregation (like OTel)
103+
var doubleValue = Convert.ToDouble(value);
104+
var meterName = instrument.Meter.Name;
105+
var key = $"{meterName}.{instrument.Name}";
106+
var temporality = Tracer.Instance.Settings.OtlpMetricsTemporalityPreference.ToString();
107+
108+
var metricPoint = CapturedMetrics.AddOrUpdate(
109+
key,
110+
// Create new MetricPoint
111+
_ => new MetricPoint(instrument.Name, meterName, aggregationType, temporality, tagsDict),
112+
// Update existing MetricPoint (lock-free where possible)
113+
(_, existing) =>
114+
{
115+
UpdateMetricPoint(existing, aggregationType, doubleValue);
116+
return existing;
117+
});
85118

86-
var key = $"{metricData.MeterName}.{metricData.InstrumentName}";
87-
CapturedMetrics.AddOrUpdate(key, metricData, (k, existing) => metricData);
119+
// For new MetricPoints, record the first measurement
120+
if (metricPoint is { SnapshotCount: 0, SnapshotSum: 0, SnapshotGaugeValue: 0 })
121+
{
122+
UpdateMetricPoint(metricPoint, aggregationType, doubleValue);
123+
}
88124

89-
Log.Debug("Captured counter measurement: {InstrumentName} = {Value}", instrument.Name, value);
125+
Log.Debug("Captured {InstrumentType} measurement: {InstrumentName} = {Value}", aggregationType, instrument.Name, value);
90126

91127
var tagsString = string.Join(",", tagsArray);
92-
Console.WriteLine($"[METRICS_CAPTURE] {key}|{metricData.InstrumentType}|{value}|{tagsString}");
128+
129+
// Take snapshot for display (like OTel export)
130+
metricPoint.TakeSnapshot(outputDelta: false);
131+
132+
// Output the appropriate aggregated value for console testing
133+
var displayValue = aggregationType switch
134+
{
135+
"Counter" => metricPoint.SnapshotSum.ToString(System.Globalization.CultureInfo.InvariantCulture),
136+
"Gauge" => metricPoint.SnapshotGaugeValue.ToString(System.Globalization.CultureInfo.InvariantCulture),
137+
"Histogram" => $"count={metricPoint.SnapshotCount.ToString(System.Globalization.CultureInfo.InvariantCulture)},sum={metricPoint.SnapshotSum.ToString(System.Globalization.CultureInfo.InvariantCulture)}",
138+
_ => value.ToString()
139+
};
140+
141+
Console.WriteLine($"[METRICS_CAPTURE] {key}|{metricPoint.InstrumentType}|{displayValue}|{tagsString}");
142+
}
143+
144+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
145+
private static void UpdateMetricPoint(MetricPoint metricPoint, string aggregationType, double value)
146+
{
147+
// High-performance updates (like OTel)
148+
switch (aggregationType)
149+
{
150+
case "Counter":
151+
metricPoint.UpdateCounter(value);
152+
break;
153+
case "Gauge":
154+
metricPoint.UpdateGauge(value);
155+
break;
156+
case "Histogram":
157+
metricPoint.UpdateHistogram(value);
158+
break;
159+
}
93160
}
94161
}
95162
}

tracer/src/Datadog.Trace/OTelMetrics/MetricData.cs

Lines changed: 0 additions & 30 deletions
This file was deleted.
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
// <copyright file="MetricPoint.cs" company="Datadog">
2+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
4+
// </copyright>
5+
6+
#if NET6_0_OR_GREATER
7+
8+
#nullable enable
9+
10+
using System;
11+
using System.Collections.Generic;
12+
using System.Runtime.CompilerServices;
13+
using System.Threading;
14+
15+
namespace Datadog.Trace.OTelMetrics
16+
{
17+
internal class MetricPoint
18+
{
19+
private static readonly double[] DefaultHistogramBounds = [0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000];
20+
21+
// Histogram-specific
22+
private readonly long[] _runningBucketCounts;
23+
private readonly object _histogramLock = new();
24+
25+
// Thread-safe running values (hot path)
26+
private long _runningCountValue; // For counters and histogram count
27+
private double _runningDoubleValue; // For gauges and histogram sum
28+
private double _runningMin = double.PositiveInfinity;
29+
private double _runningMax = double.NegativeInfinity;
30+
31+
public MetricPoint(string instrumentName, string meterName, string instrumentType, string temporality, Dictionary<string, object?> tags)
32+
{
33+
// Initialize histogram buckets if needed
34+
if (instrumentType == "Histogram")
35+
{
36+
_runningBucketCounts = new long[DefaultHistogramBounds.Length + 1]; // +1 for overflow
37+
SnapshotBucketCounts = new long[DefaultHistogramBounds.Length + 1];
38+
}
39+
else
40+
{
41+
_runningBucketCounts = [];
42+
SnapshotBucketCounts = [];
43+
}
44+
45+
InstrumentName = instrumentName;
46+
MeterName = meterName;
47+
InstrumentType = instrumentType;
48+
AggregationTemporality = temporality;
49+
Tags = tags;
50+
StartTime = DateTimeOffset.UtcNow;
51+
EndTime = DateTimeOffset.UtcNow;
52+
}
53+
54+
public string InstrumentName { get; }
55+
56+
public string MeterName { get; }
57+
58+
public string InstrumentType { get; }
59+
60+
public string AggregationTemporality { get; }
61+
62+
public Dictionary<string, object?> Tags { get; }
63+
64+
public DateTimeOffset StartTime { get; private set; }
65+
66+
public DateTimeOffset EndTime { get; private set; }
67+
68+
// Snapshot values (export time)
69+
public long SnapshotCount { get; private set; }
70+
71+
public double SnapshotSum { get; private set; }
72+
73+
public double SnapshotGaugeValue { get; private set; }
74+
75+
public double SnapshotMin { get; private set; }
76+
77+
public double SnapshotMax { get; private set; }
78+
79+
public long[] SnapshotBucketCounts { get; private set; }
80+
81+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
82+
public void UpdateCounter(double value)
83+
{
84+
// Use lock to avoid floating-point precision issues with CompareExchange
85+
lock (_histogramLock)
86+
{
87+
_runningDoubleValue += value;
88+
}
89+
90+
EndTime = DateTimeOffset.UtcNow;
91+
}
92+
93+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
94+
public void UpdateGauge(double value)
95+
{
96+
// Lock-free gauge update (like OTel)
97+
Interlocked.Exchange(ref _runningDoubleValue, value);
98+
EndTime = DateTimeOffset.UtcNow;
99+
}
100+
101+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
102+
public void UpdateHistogram(double value)
103+
{
104+
// Find bucket index first (outside lock for performance)
105+
var bucketIndex = FindBucketIndex(value);
106+
107+
// Minimal lock scope (like OTel)
108+
lock (_histogramLock)
109+
{
110+
unchecked
111+
{
112+
_runningCountValue++;
113+
_runningDoubleValue += value; // Sum
114+
_runningBucketCounts[bucketIndex]++;
115+
}
116+
117+
_runningMin = Math.Min(_runningMin, value);
118+
_runningMax = Math.Max(_runningMax, value);
119+
}
120+
121+
EndTime = DateTimeOffset.UtcNow;
122+
}
123+
124+
public void TakeSnapshot(bool outputDelta)
125+
{
126+
EndTime = DateTimeOffset.UtcNow;
127+
128+
switch (InstrumentType)
129+
{
130+
case "Counter":
131+
if (outputDelta)
132+
{
133+
var currentValue = Interlocked.CompareExchange(ref _runningDoubleValue, 0, 0); // Datadog read pattern
134+
SnapshotSum = currentValue - SnapshotSum; // Delta calculation
135+
}
136+
else
137+
{
138+
SnapshotSum = Interlocked.CompareExchange(ref _runningDoubleValue, 0, 0); // Cumulative
139+
}
140+
141+
break;
142+
143+
case "Gauge":
144+
SnapshotGaugeValue = Interlocked.CompareExchange(ref _runningDoubleValue, 0, 0);
145+
break;
146+
147+
case "Histogram":
148+
lock (_histogramLock)
149+
{
150+
SnapshotCount = _runningCountValue;
151+
SnapshotSum = _runningDoubleValue;
152+
SnapshotMin = _runningMin;
153+
SnapshotMax = _runningMax;
154+
155+
// Copy bucket counts
156+
Array.Copy(_runningBucketCounts, SnapshotBucketCounts, _runningBucketCounts.Length);
157+
158+
if (outputDelta)
159+
{
160+
// Reset for delta
161+
_runningCountValue = 0;
162+
_runningDoubleValue = 0;
163+
_runningMin = double.PositiveInfinity;
164+
_runningMax = double.NegativeInfinity;
165+
Array.Clear(_runningBucketCounts);
166+
}
167+
}
168+
169+
break;
170+
}
171+
}
172+
173+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
174+
private static int FindBucketIndex(double value)
175+
{
176+
// Linear search for default 15 boundaries (fast enough)
177+
for (var i = 0; i < DefaultHistogramBounds.Length; i++)
178+
{
179+
if (value <= DefaultHistogramBounds[i])
180+
{
181+
return i;
182+
}
183+
}
184+
185+
return DefaultHistogramBounds.Length; // Overflow bucket
186+
}
187+
}
188+
}
189+
#endif

0 commit comments

Comments
 (0)