-
Notifications
You must be signed in to change notification settings - Fork 8
enhancement: add APM Stats transform #1095
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: rayz/apm-stats-event
Are you sure you want to change the base?
Conversation
|
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements an APM Stats transform that aggregates trace data into time-bucketed statistics and emits TraceStats events. The implementation includes span weight calculation, raw bucket aggregation, and a span concentrator that manages time-bucketed stats with configurable peer tag aggregation and span kind filtering.
Changes:
- Added APM Stats transform with time-bucketed aggregation of trace spans
- Implemented span weight calculation based on sampling rate
- Added support for peer tags aggregation and span kind-based stats computation
- Integrated DDSketch protocol buffers for histogram distributions
Reviewed changes
Copilot reviewed 18 out of 19 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| lib/saluki-core/src/data_model/event/trace_stats/mod.rs | Added mutable accessor for grouped stats in ClientStatsBucket |
| lib/saluki-core/src/data_model/event/mod.rs | Added TraceStats size logging to tests |
| lib/saluki-components/src/transforms/mod.rs | Registered ApmStatsConfiguration transform |
| lib/saluki-components/src/transforms/apm_stats/weight.rs | Implemented span weight calculation based on sampling rate |
| lib/saluki-components/src/transforms/apm_stats/statsraw.rs | Implemented raw bucket aggregation and stats export |
| lib/saluki-components/src/transforms/apm_stats/span_concentrator.rs | Implemented span concentrator with time-bucketed aggregation |
| lib/saluki-components/src/transforms/apm_stats/mod.rs | Implemented main APM Stats transform with async flush logic |
| lib/saluki-components/src/transforms/apm_stats/aggregation.rs | Defined aggregation keys and helper functions for stats computation |
| lib/saluki-components/src/common/otlp/config.rs | Changed default for enable_otlp_compute_top_level_by_span_kind to true |
| lib/saluki-components/src/common/datadog/apm.rs | Extended ApmConfig with stats computation and peer tags configuration |
| lib/saluki-components/Cargo.toml | Added fnv dependency for hashing |
| lib/protos/datadog/src/serde.rs | Added serialization helpers for protocol buffer bytes |
| lib/protos/datadog/src/lib.rs | Added sketches module for DDSketch definitions |
| lib/protos/datadog/proto/sketches-go/ddsketch/pb/ddsketch.proto | Added DDSketch protocol buffer definition |
| lib/protos/datadog/build.rs | Added TYPE_BYTES handling and sketch proto compilation |
| lib/protos/datadog/Cargo.toml | Added serde_bytes dependency |
| lib/ddsketch-agent/src/lib.rs | Implemented to_proto conversion for DDSketch |
| Cargo.toml | Added serde_bytes workspace dependency |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| fn round(f: f64) -> u64 { | ||
| let i = f as u64; | ||
| let frac = f - (i as f64); | ||
| if rand::rng().random::<f64>() < frac { |
Copilot
AI
Jan 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The rand::rng() function uses thread-local RNG which is not Send. This could cause issues in async contexts where the transform may be moved across threads. Consider using a thread-safe RNG or storing an RNG instance in the struct.
Binary Size Analysis (Agent Data Plane)Target: 02eac16 (baseline) vs f6ec3e0 (comparison) diff
|
| Module | File Size | Symbols |
|---|---|---|
| [debug sections] | +1.45 MiB | 7 |
| figment | +252.97 KiB | 584 |
| otlp_protos::otlp_include::opentelemetry | +32.96 KiB | 278 |
| prost | -31.12 KiB | 498 |
| anyhow | +26.34 KiB | 1394 |
| hyper_util | -22.89 KiB | 200 |
| saluki_env::workload::providers | +20.68 KiB | 21 |
| saluki_env::workload::collectors | -18.93 KiB | 235 |
| alloc | +15.06 KiB | 1736 |
| [sections] | +14.50 KiB | 8 |
| hyper | +13.89 KiB | 603 |
| serde | +13.54 KiB | 40 |
| saluki_core::data_model::event | +12.26 KiB | 73 |
| serde_json | +11.79 KiB | 177 |
| saluki_components::common::datadog | -11.42 KiB | 322 |
| anon.c1ad624bdfe951ca673c256fb61fa178.119.llvm.13580561373593502968 | +9.79 KiB | 1 |
| anon.f30def023657add84aa3a6d7d4197fa6.35.llvm.13640408623153174339 | -9.79 KiB | 1 |
| tower_layer | +9.15 KiB | 10 |
| saluki_common::cache::Cache<K,V,W,H> | -8.80 KiB | 37 |
| tokio | -8.61 KiB | 4137 |
Detailed Symbol Changes
FILE SIZE VM SIZE
-------------- --------------
+0.9% +885Ki [ = ] 0 [section .debug_loc]
+0.3% +340Ki [ = ] 0 [section .debug_info]
+1.6% +322Ki +1.7% +235Ki [37861 Others]
[NEW] +163Ki [NEW] +162Ki agent_data_plane::cli::run::handle_run_command::_{{closure}}::hda70ed1a89fd2d1a
[NEW] +104Ki [NEW] +104Ki agent_data_plane::main::_{{closure}}::hdbdeb3f696b83878
+0.2% +103Ki [ = ] 0 [section .debug_str]
+0.4% +89.1Ki [ = ] 0 [section .debug_line]
[NEW] +80.1Ki [NEW] +80.0Ki agent_data_plane::cli::debug::handle_debug_command::_{{closure}}::h46cb9932ae2c93e5
[NEW] +61.5Ki [NEW] +61.4Ki saluki_core::topology::built::BuiltTopology::spawn::_{{closure}}::h3eea7ae28e7b9fba
+0.2% +58.8Ki [ = ] 0 [section .debug_ranges]
[NEW] +56.0Ki [NEW] +55.8Ki saluki_core::topology::blueprint::TopologyBlueprint::build::_{{closure}}::hd9ac07e3f1b31a34
[NEW] +46.7Ki [NEW] +46.5Ki _<saluki_components::forwarders::otlp::OtlpForwarder as saluki_core::components::forwarders::Forwarder>::run::_{{closure}}::hda328cfd5d7c0653
[NEW] +46.0Ki [NEW] +45.8Ki saluki_config::ConfigurationLoader::with_default_secrets_resolution::_{{closure}}::h001afafded53abf5
[NEW] +44.2Ki [NEW] +44.1Ki saluki_components::sources::otlp::metrics::translator::OtlpMetricsTranslator::map_metrics::h4d518cfa8c148b47
[DEL] -42.7Ki [DEL] -42.6Ki saluki_components::sources::otlp::metrics::translator::OtlpMetricsTranslator::map_metrics::hd0be7a53ec163d20
[DEL] -45.7Ki [DEL] -45.5Ki saluki_config::ConfigurationLoader::with_default_secrets_resolution::_{{closure}}::hf71d34d965f0471b
[DEL] -56.1Ki [DEL] -56.0Ki saluki_core::topology::blueprint::TopologyBlueprint::build::_{{closure}}::h7cd9eeae8f1c3216
[DEL] -61.7Ki [DEL] -61.5Ki saluki_core::topology::built::BuiltTopology::spawn::_{{closure}}::h867a99b8656ff505
[DEL] -80.1Ki [DEL] -79.9Ki agent_data_plane::cli::debug::handle_debug_command::_{{closure}}::hc359c1c67927acca
[DEL] -105Ki [DEL] -104Ki agent_data_plane::main::_{{closure}}::h94428d325095caca
[DEL] -166Ki [DEL] -166Ki agent_data_plane::cli::run::handle_run_command::_{{closure}}::h3ba317381d86c2d5
+0.5% +1.80Mi +0.9% +279Ki TOTAL
Regression Detector (Agent Data Plane)Regression Detector ResultsRun ID: dcd599bc-3ec2-4317-a831-02029e32bdd6 Baseline: 02eac16 ❌ Experiments with retried target crashesThis is a critical error. One or more replicates failed with a non-zero exit code. These replicates may have been retried. See Replicate Execution Details for more information.
Optimization Goals: ✅ Improvement(s) detected
|
| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| ➖ | otlp_ingest_metrics_adp | memory utilization | +0.45 | [+0.28, +0.61] | 1 | |
| ➖ | quality_gates_rss_dsd_heavy | memory utilization | +0.36 | [+0.22, +0.50] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_idle | memory utilization | +0.19 | [+0.16, +0.23] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_10mb_3k_contexts_throughput | ingress throughput | +0.02 | [-0.18, +0.22] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_512kb_3k_contexts_throughput | ingress throughput | +0.01 | [-0.04, +0.05] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_1mb_3k_contexts_throughput | ingress throughput | +0.00 | [-0.05, +0.06] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_100mb_3k_contexts_throughput | ingress throughput | +0.00 | [-0.02, +0.02] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_ultraheavy | ingress throughput | -0.01 | [-0.08, +0.06] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_low | memory utilization | -0.24 | [-0.37, -0.11] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_medium | memory utilization | -0.52 | [-0.68, -0.35] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_500mb_3k_contexts_throughput | ingress throughput | -2.22 | [-2.35, -2.09] | 1 | (metrics) (profiles) (logs) |
| ✅ | otlp_ingest_logs_adp | memory utilization | -6.06 | [-6.49, -5.62] | 1 | (metrics) (profiles) (logs) |
Bounds Checks: ✅ Passed
| perf | experiment | bounds_check_name | replicates_passed | links |
|---|---|---|---|---|
| ✅ | quality_gates_rss_dsd_heavy | memory_usage | 10/10 | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_low | memory_usage | 10/10 | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_medium | memory_usage | 10/10 | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_ultraheavy | memory_usage | 10/10 | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_idle | memory_usage | 10/10 | (metrics) (profiles) (logs) |
Explanation
Confidence level: 90.00%
Effect size tolerance: |Δ mean %| ≥ 5.00%
Performance changes are noted in the perf column of each table:
- ✅ = significantly better comparison variant performance
- ❌ = significantly worse comparison variant performance
- ➖ = no significant change in performance
A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".
For each experiment, we decide whether a change in performance is a "regression" -- a change worth investigating further -- if all of the following criteria are true:
-
Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.
-
Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.
-
Its configuration does not mark it "erratic".
Replicate Execution Details
We run multiple replicates for each experiment/variant. However, we allow replicates to be automatically retried if there are any failures, up to 8 times, at which point the replicate is marked dead and we are unable to run analysis for the entire experiment. We call each of these attempts at running replicates a replicate execution. This section lists all replicate executions that failed due to the target crashing or being oom killed.
Note: In the below tables we bucket failures by experiment, variant, and failure type. For each of these buckets we list out the replicate indexes that failed with an annotation signifying how many times said replicate failed with the given failure mode. In the below example the baseline variant of the experiment named experiment_with_failures had two replicates that failed by oom kills. Replicate 0, which failed 8 executions, and replicate 1 which failed 6 executions, all with the same failure mode.
| Experiment | Variant | Replicates | Failure | Logs | Debug Dashboard |
|---|---|---|---|---|---|
| experiment_with_failures | baseline | 0 (x8) 1 (x6) | Oom killed | Debug Dashboard |
The debug dashboard links will take you to a debugging dashboard specifically designed to investigate replicate execution failures.
❌ Retried Normal Replicate Execution Failures (non-profiling)
| Experiment | Variant | Replicates | Failure | Debug Dashboard |
|---|---|---|---|---|
| otlp_ingest_metrics_adp | baseline | 6, 3 | Failed to shutdown when requested | Debug Dashboard |
53b3e3c to
e9accbf
Compare
473b8c1 to
b94e04f
Compare
| @@ -0,0 +1,54 @@ | |||
| // Copyright 2021 Datadog, Inc. | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just saying this out loud so I (hopefully) don't forget: we should really pull this in via update-protos.sh so we have a known way to update. This is non-blocking, though, and we shouldn't do it here.
(I realize the proto definition probably hasn't changed in years, but I prefer that we have clear provenance / ways to bootstrap things if necessary.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, i forgot we had that 😅
| #[allow(dead_code)] | ||
| compute_stats_by_span_kind: bool, | ||
|
|
||
| /// Enables aggregation of peer related tags (e.g., `peer.service`, `db.instance`, etc.) in the Agent. | ||
| /// | ||
| /// Defaults to `true`. | ||
| #[serde(default = "default_peer_tags_aggregation")] | ||
| #[allow(dead_code)] | ||
| peer_tags_aggregation: bool, | ||
|
|
||
| /// Optional list of supplementary peer tags that go beyond the defaults. The Datadog backend validates all tags | ||
| /// and will drop ones that are unapproved. | ||
| /// | ||
| /// Defaults to an empty list. | ||
| #[serde(default)] | ||
| #[allow(dead_code)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are these all marked as potentially being dead code? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had them to pass clippy. Removed
|
|
||
| // Get hostname from global config if not set in apm_config | ||
| if apm_config.hostname.is_empty() { | ||
| apm_config.hostname = config.get_typed_or_default::<String>("hostname").into(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be pulling this from the environment provider like, say, the Host Enrichment transform does it. However, given that this is just a common configuration type... not sure how you want to structure that, like maybe just having the transform grab the detected hostname and then set it on the APM config with a dedicated method, like set_hostname_if_empty or something.
| let upper = normalized.to_uppercase(); | ||
|
|
||
| if let Some(code) = grpc_status_name_to_code(&upper) { | ||
| return MetaString::from(code); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use MetaString::from_static here to avoid allocating.
| return MetaString::from(code); | ||
| } | ||
|
|
||
| return MetaString::default(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the intent to actually give up here if we do find a value for one of these four attribute keys but it has a non-integer/non-gRPC-status-code value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, agent code
| } | ||
|
|
||
| if let Ok(code) = str_value.parse::<u64>() { | ||
| return MetaString::from(code.to_string()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could just clone str_value here: the parse will fail unless the value is an integer without any leading/trailing/interstitial spaces... so we know if it parses, then the original string is in the same format that it will get rendered as by calling to_string.
| bsize: i64, | ||
|
|
||
| /// Timestamp of oldest allowed bucket (prevents stats for already-flushed buckets) | ||
| oldest_ts: i64, | ||
|
|
||
| /// Number of buckets to buffer before flushing | ||
| buffer_len: i64, | ||
|
|
||
| /// Time-bucketed raw stats: bucket_timestamp -> RawBucket | ||
| buckets: FastHashMap<i64, RawBucket>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know why the original code does this with i64, but let's make these u64 and convert from i64 to u64 at the boundary when the span is added to the concentrator.
A negative timestamp would be a big red flag anyways, so we should surface those if they're actually making it through.
| fn ns_timestamp_to_float(ns: i64) -> f64 { | ||
| let f = ns as f64; | ||
| let bits = f.to_bits(); | ||
| let truncated_bits = bits & 0xffff_f800_0000_0000; | ||
| f64::from_bits(truncated_bits) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's going on here? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taken from here 😅 . I'll copy the comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 21 out of 22 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| builder.minimum().with_single_value::<ApmStats>("component struct"); | ||
| // TODO: Think about everything we need to account for here. |
Copilot
AI
Jan 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This TODO indicates incomplete memory accounting for the ApmStats component. Memory bounds should be properly specified to ensure accurate resource tracking, especially for production workloads where the concentrator maintains time-bucketed data.
| builder.minimum().with_single_value::<ApmStats>("component struct"); | |
| // TODO: Think about everything we need to account for here. | |
| builder | |
| .minimum() | |
| .with_single_value::<ApmStats>("component struct") | |
| .with_single_value::<SpanConcentrator>("span concentrator") | |
| .with_single_value::<MetaString>("agent env") | |
| .with_single_value::<MetaString>("agent hostname"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 24 out of 25 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| fn round(f: f64) -> u64 { | ||
| let i = f as u64; | ||
| let frac = f - (i as f64); | ||
| if rand::rng().random::<f64>() < frac { | ||
| i + 1 | ||
| } else { | ||
| i | ||
| } | ||
| } |
Copilot
AI
Jan 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function name round is misleading as it implements stochastic/probabilistic rounding rather than standard mathematical rounding. Consider renaming to stochastic_round or probabilistic_round to clearly indicate its behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 25 out of 26 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Summary
This PR adds a new transform
apm_statsthat transformsTraceevents intoTraceStatevents.The proto file for
DDSketchfromsketch-gohas also been added.Change Type
How did you test this PR?
Claude to copy the unit tests in the datadog-agent.
References