@@ -26,7 +26,10 @@ type accumulatedValue struct {
2626 // updated indicates when metric was last changed.
2727 updated time.Time
2828
29- scope pcommon.InstrumentationScope
29+ scopeName string
30+ scopeVersion string
31+ scopeSchemaUrl string
32+ scopeAttributes pcommon.Map
3033}
3134
3235// accumulator stores aggregated values of incoming metrics
@@ -35,7 +38,7 @@ type accumulator interface {
3538 Accumulate (resourceMetrics pmetric.ResourceMetrics ) (processed int )
3639 // Collect returns a slice with relevant aggregated metrics and their resource attributes.
3740 // The number or metrics and attributes returned will be the same.
38- Collect () (metrics []pmetric.Metric , resourceAttrs []pcommon.Map )
41+ Collect () (metrics []pmetric.Metric , resourceAttrs []pcommon.Map , scopeNames [] string , scopeVersions [] string , scopeSchemaUrls [] string , scopeAttributes []pcommon. Map )
3942}
4043
4144// LastValueAccumulator keeps last value for accumulated metrics
@@ -68,25 +71,25 @@ func (a *lastValueAccumulator) Accumulate(rm pmetric.ResourceMetrics) (n int) {
6871
6972 metrics := ilm .Metrics ()
7073 for j := 0 ; j < metrics .Len (); j ++ {
71- n += a .addMetric (metrics .At (j ), ilm .Scope (), resourceAttrs , now )
74+ n += a .addMetric (metrics .At (j ), ilm .Scope (). Name (), ilm . Scope (). Version (), ilm . SchemaUrl (), ilm . Scope (). Attributes () , resourceAttrs , now )
7275 }
7376 }
7477
7578 return
7679}
7780
78- func (a * lastValueAccumulator ) addMetric (metric pmetric.Metric , il pcommon.InstrumentationScope , resourceAttrs pcommon.Map , now time.Time ) int {
81+ func (a * lastValueAccumulator ) addMetric (metric pmetric.Metric , scopeName string , scopeVersion string , scopeSchemaUrl string , scopeAttributes pcommon.Map , resourceAttrs pcommon.Map , now time.Time ) int {
7982 a .logger .Debug (fmt .Sprintf ("accumulating metric: %s" , metric .Name ()))
8083
8184 switch metric .Type () {
8285 case pmetric .MetricTypeGauge :
83- return a .accumulateGauge (metric , il , resourceAttrs , now )
86+ return a .accumulateGauge (metric , scopeName , scopeVersion , scopeSchemaUrl , scopeAttributes , resourceAttrs , now )
8487 case pmetric .MetricTypeSum :
85- return a .accumulateSum (metric , il , resourceAttrs , now )
88+ return a .accumulateSum (metric , scopeName , scopeVersion , scopeSchemaUrl , scopeAttributes , resourceAttrs , now )
8689 case pmetric .MetricTypeHistogram :
87- return a .accumulateHistogram (metric , il , resourceAttrs , now )
90+ return a .accumulateHistogram (metric , scopeName , scopeVersion , scopeSchemaUrl , scopeAttributes , resourceAttrs , now )
8891 case pmetric .MetricTypeSummary :
89- return a .accumulateSummary (metric , il , resourceAttrs , now )
92+ return a .accumulateSummary (metric , scopeName , scopeVersion , scopeSchemaUrl , scopeAttributes , resourceAttrs , now )
9093 default :
9194 a .logger .With (
9295 zap .String ("data_type" , string (metric .Type ())),
@@ -97,12 +100,12 @@ func (a *lastValueAccumulator) addMetric(metric pmetric.Metric, il pcommon.Instr
97100 return 0
98101}
99102
100- func (a * lastValueAccumulator ) accumulateSummary (metric pmetric.Metric , il pcommon.InstrumentationScope , resourceAttrs pcommon.Map , now time.Time ) (n int ) {
103+ func (a * lastValueAccumulator ) accumulateSummary (metric pmetric.Metric , scopeName string , scopeVersion string , scopeSchemaUrl string , scopeAttributes pcommon.Map , resourceAttrs pcommon.Map , now time.Time ) (n int ) {
101104 dps := metric .Summary ().DataPoints ()
102105 for i := 0 ; i < dps .Len (); i ++ {
103106 ip := dps .At (i )
104107
105- signature := timeseriesSignature (il . Name () , metric , ip .Attributes (), resourceAttrs )
108+ signature := timeseriesSignature (scopeName , scopeVersion , scopeSchemaUrl , scopeAttributes , metric , ip .Attributes (), resourceAttrs )
106109 if ip .Flags ().NoRecordedValue () {
107110 a .registeredMetrics .Delete (signature )
108111 return 0
@@ -119,19 +122,19 @@ func (a *lastValueAccumulator) accumulateSummary(metric pmetric.Metric, il pcomm
119122
120123 m := copyMetricMetadata (metric )
121124 ip .CopyTo (m .SetEmptySummary ().DataPoints ().AppendEmpty ())
122- a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scope : il , updated : now })
125+ a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scopeName : scopeName , scopeVersion : scopeVersion , scopeSchemaUrl : scopeSchemaUrl , scopeAttributes : scopeAttributes , updated : now })
123126 n ++
124127 }
125128
126129 return n
127130}
128131
129- func (a * lastValueAccumulator ) accumulateGauge (metric pmetric.Metric , il pcommon.InstrumentationScope , resourceAttrs pcommon.Map , now time.Time ) (n int ) {
132+ func (a * lastValueAccumulator ) accumulateGauge (metric pmetric.Metric , scopeName string , scopeVersion string , scopeSchemaUrl string , scopeAttributes pcommon.Map , resourceAttrs pcommon.Map , now time.Time ) (n int ) {
130133 dps := metric .Gauge ().DataPoints ()
131134 for i := 0 ; i < dps .Len (); i ++ {
132135 ip := dps .At (i )
133136
134- signature := timeseriesSignature (il . Name () , metric , ip .Attributes (), resourceAttrs )
137+ signature := timeseriesSignature (scopeName , scopeVersion , scopeSchemaUrl , scopeAttributes , metric , ip .Attributes (), resourceAttrs )
135138 if ip .Flags ().NoRecordedValue () {
136139 a .registeredMetrics .Delete (signature )
137140 return 0
@@ -141,7 +144,7 @@ func (a *lastValueAccumulator) accumulateGauge(metric pmetric.Metric, il pcommon
141144 if ! ok {
142145 m := copyMetricMetadata (metric )
143146 ip .CopyTo (m .SetEmptyGauge ().DataPoints ().AppendEmpty ())
144- a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scope : il , updated : now })
147+ a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scopeName : scopeName , scopeVersion : scopeVersion , scopeSchemaUrl : scopeSchemaUrl , scopeAttributes : scopeAttributes , updated : now })
145148 n ++
146149 continue
147150 }
@@ -154,13 +157,13 @@ func (a *lastValueAccumulator) accumulateGauge(metric pmetric.Metric, il pcommon
154157
155158 m := copyMetricMetadata (metric )
156159 ip .CopyTo (m .SetEmptyGauge ().DataPoints ().AppendEmpty ())
157- a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scope : il , updated : now })
160+ a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scopeName : scopeName , scopeVersion : scopeVersion , scopeSchemaUrl : scopeSchemaUrl , scopeAttributes : scopeAttributes , updated : now })
158161 n ++
159162 }
160163 return
161164}
162165
163- func (a * lastValueAccumulator ) accumulateSum (metric pmetric.Metric , il pcommon.InstrumentationScope , resourceAttrs pcommon.Map , now time.Time ) (n int ) {
166+ func (a * lastValueAccumulator ) accumulateSum (metric pmetric.Metric , scopeName string , scopeVersion string , scopeSchemaUrl string , scopeAttributes pcommon.Map , resourceAttrs pcommon.Map , now time.Time ) (n int ) {
164167 doubleSum := metric .Sum ()
165168
166169 // Drop metrics with unspecified aggregations
@@ -177,7 +180,7 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I
177180 for i := 0 ; i < dps .Len (); i ++ {
178181 ip := dps .At (i )
179182
180- signature := timeseriesSignature (il . Name () , metric , ip .Attributes (), resourceAttrs )
183+ signature := timeseriesSignature (scopeName , scopeVersion , scopeSchemaUrl , scopeAttributes , metric , ip .Attributes (), resourceAttrs )
181184 if ip .Flags ().NoRecordedValue () {
182185 a .registeredMetrics .Delete (signature )
183186 return 0
@@ -189,7 +192,7 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I
189192 m .SetEmptySum ().SetIsMonotonic (metric .Sum ().IsMonotonic ())
190193 m .Sum ().SetAggregationTemporality (pmetric .AggregationTemporalityCumulative )
191194 ip .CopyTo (m .Sum ().DataPoints ().AppendEmpty ())
192- a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scope : il , updated : now })
195+ a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scopeName : scopeName , scopeVersion : scopeVersion , scopeSchemaUrl : scopeSchemaUrl , scopeAttributes : scopeAttributes , updated : now })
193196 n ++
194197 continue
195198 }
@@ -215,21 +218,21 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I
215218 m .SetEmptySum ().SetIsMonotonic (metric .Sum ().IsMonotonic ())
216219 m .Sum ().SetAggregationTemporality (pmetric .AggregationTemporalityCumulative )
217220 ip .CopyTo (m .Sum ().DataPoints ().AppendEmpty ())
218- a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scope : il , updated : now })
221+ a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scopeName : scopeName , scopeVersion : scopeVersion , scopeSchemaUrl : scopeSchemaUrl , scopeAttributes : scopeAttributes , updated : now })
219222 n ++
220223 }
221224 return
222225}
223226
224- func (a * lastValueAccumulator ) accumulateHistogram (metric pmetric.Metric , il pcommon.InstrumentationScope , resourceAttrs pcommon.Map , now time.Time ) (n int ) {
227+ func (a * lastValueAccumulator ) accumulateHistogram (metric pmetric.Metric , scopeName string , scopeVersion string , scopeSchemaUrl string , scopeAttributes pcommon.Map , resourceAttrs pcommon.Map , now time.Time ) (n int ) {
225228 histogram := metric .Histogram ()
226229 a .logger .Debug ("Accumulate histogram....." )
227230 dps := histogram .DataPoints ()
228231
229232 for i := 0 ; i < dps .Len (); i ++ {
230233 ip := dps .At (i )
231234
232- signature := timeseriesSignature (il . Name () , metric , ip .Attributes (), resourceAttrs ) // uniquely identify this time series you are accumulating for
235+ signature := timeseriesSignature (scopeName , scopeVersion , scopeSchemaUrl , scopeAttributes , metric , ip .Attributes (), resourceAttrs ) // uniquely identify this time series you are accumulating for
233236 if ip .Flags ().NoRecordedValue () {
234237 a .registeredMetrics .Delete (signature )
235238 return 0
@@ -241,7 +244,7 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco
241244 m := copyMetricMetadata (metric )
242245 ip .CopyTo (m .SetEmptyHistogram ().DataPoints ().AppendEmpty ())
243246 m .Histogram ().SetAggregationTemporality (pmetric .AggregationTemporalityCumulative )
244- a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scope : il , updated : now })
247+ a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scopeName : scopeName , scopeVersion : scopeVersion , scopeSchemaUrl : scopeSchemaUrl , scopeAttributes : scopeAttributes , updated : now })
245248 n ++
246249 continue
247250 }
@@ -284,18 +287,22 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco
284287 // unsupported temporality
285288 continue
286289 }
287- a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scope : il , updated : now })
290+ a .registeredMetrics .Store (signature , & accumulatedValue {value : m , resourceAttrs : resourceAttrs , scopeName : scopeName , scopeVersion : scopeVersion , scopeSchemaUrl : scopeSchemaUrl , scopeAttributes : scopeAttributes , updated : now })
288291 n ++
289292 }
290293 return
291294}
292295
293296// Collect returns a slice with relevant aggregated metrics and their resource attributes.
294- func (a * lastValueAccumulator ) Collect () ([]pmetric.Metric , []pcommon.Map ) {
297+ func (a * lastValueAccumulator ) Collect () ([]pmetric.Metric , []pcommon.Map , [] string , [] string , [] string , []pcommon. Map ) {
295298 a .logger .Debug ("Accumulator collect called" )
296299
297300 var metrics []pmetric.Metric
298301 var resourceAttrs []pcommon.Map
302+ var scopeNames []string
303+ var scopeVersions []string
304+ var scopeSchemaUrls []string
305+ var scopeAttributes []pcommon.Map
299306 expirationTime := time .Now ().Add (- a .metricExpiration )
300307
301308 a .registeredMetrics .Range (func (key , value any ) bool {
@@ -308,23 +315,38 @@ func (a *lastValueAccumulator) Collect() ([]pmetric.Metric, []pcommon.Map) {
308315
309316 metrics = append (metrics , v .value )
310317 resourceAttrs = append (resourceAttrs , v .resourceAttrs )
318+ scopeNames = append (scopeNames , v .scopeName )
319+ scopeVersions = append (scopeVersions , v .scopeVersion )
320+ scopeSchemaUrls = append (scopeSchemaUrls , v .scopeSchemaUrl )
321+ scopeAttributes = append (scopeAttributes , v .scopeAttributes )
311322 return true
312323 })
313324
314- return metrics , resourceAttrs
325+ return metrics , resourceAttrs , scopeNames , scopeVersions , scopeSchemaUrls , scopeAttributes
315326}
316327
317- func timeseriesSignature (ilmName string , metric pmetric.Metric , attributes pcommon.Map , resourceAttrs pcommon.Map ) string {
328+ func timeseriesSignature (scopeName string , scopeVersion string , scopeSchemaUrl string , scopeAttributes pcommon. Map , metric pmetric.Metric , attributes pcommon.Map , resourceAttrs pcommon.Map ) string {
318329 var b strings.Builder
319- b .WriteString (metric .Type ().String ())
320- b .WriteString ("*" + ilmName )
321330 b .WriteString ("*" + metric .Name ())
322- attrs := make ([]string , 0 , attributes .Len ())
331+ b .WriteString (metric .Type ().String ())
332+ b .WriteString ("*" + scopeName )
333+ b .WriteString ("*" + scopeVersion )
334+ b .WriteString ("*" + scopeSchemaUrl )
335+
336+ attrs := make ([]string , 0 , scopeAttributes .Len ())
337+ for k , v := range scopeAttributes .All () {
338+ attrs = append (attrs , k + "*" + v .AsString ())
339+ }
340+ sort .Strings (attrs )
341+ b .WriteString ("*" + strings .Join (attrs , "*" ))
342+
343+ attrs = make ([]string , 0 , attributes .Len ())
323344 for k , v := range attributes .All () {
324345 attrs = append (attrs , k + "*" + v .AsString ())
325346 }
326347 sort .Strings (attrs )
327348 b .WriteString ("*" + strings .Join (attrs , "*" ))
349+
328350 if job , ok := extractJob (resourceAttrs ); ok {
329351 b .WriteString ("*" + model .JobLabel + "*" + job )
330352 }
0 commit comments