Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use serde::{
de::{self, Unexpected},
Deserialize, Deserializer,
};
use serde_yaml::Value as YamlValue;
use thiserror::Error;
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -855,6 +856,7 @@ pub struct CbpfTunning {
pub dispatcher_queue_enabled: bool,
pub max_capture_packet_size: u32,
pub raw_packet_buffer_block_size: usize,
#[serde(deserialize_with = "parse_usize_or_auto")]
pub raw_packet_queue_size: usize,
pub max_capture_pps: u64,
}
Expand Down Expand Up @@ -1129,6 +1131,7 @@ pub struct EbpfProfile {
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EbpfTunning {
#[serde(deserialize_with = "parse_usize_or_auto")]
pub collector_queue_size: usize,
pub userspace_worker_threads: i32,
pub perf_pages_count: u32,
Expand Down Expand Up @@ -1496,10 +1499,43 @@ where
));
}

fn parse_usize_or_auto<'de, D>(deserializer: D) -> Result<usize, D::Error>
where
D: Deserializer<'de>,
{
let val = YamlValue::deserialize(deserializer)?;
match val {
YamlValue::Number(n) => n
.as_u64()
.map(|v| v as usize)
.ok_or_else(|| de::Error::invalid_value(Unexpected::Other("number"), &"unsigned integer or 'auto'")),
YamlValue::String(s) => {
if s.eq_ignore_ascii_case("auto") {
Ok(0)
} else {
s.parse::<usize>().map_err(|_| {
de::Error::invalid_value(Unexpected::Str(&s), &"unsigned integer or 'auto'")
})
}
}
other => Err(de::Error::invalid_type(
match other {
YamlValue::Null => Unexpected::Unit,
YamlValue::Bool(b) => Unexpected::Bool(b),
YamlValue::Sequence(_) => Unexpected::Seq,
YamlValue::Mapping(_) => Unexpected::Map,
_ => Unexpected::Other("value"),
},
&"unsigned integer or 'auto'",
)),
}
}

#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct TcpHeader {
pub block_size: usize,
#[serde(deserialize_with = "parse_usize_or_auto")]
pub sender_queue_size: usize,
#[serde(deserialize_with = "parse_maybe_binary_u8")]
pub header_fields_flag: u8,
Expand All @@ -1518,6 +1554,7 @@ impl Default for TcpHeader {
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct PcapStream {
#[serde(deserialize_with = "parse_usize_or_auto")]
pub receiver_queue_size: usize,
pub buffer_size_per_flow: u32,
pub total_buffer_size: u64,
Expand All @@ -1539,6 +1576,7 @@ impl Default for PcapStream {
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct Toa {
#[serde(deserialize_with = "parse_usize_or_auto")]
pub sender_queue_size: usize,
pub cache_size: usize,
}
Expand Down Expand Up @@ -2302,8 +2340,11 @@ pub struct ProcessorsFlowLogTunning {
pub concurrent_flow_limit: u32,
pub memory_pool_size: usize,
pub max_batched_buffer_size: usize,
#[serde(deserialize_with = "parse_usize_or_auto")]
pub flow_aggregator_queue_size: usize,
#[serde(deserialize_with = "parse_usize_or_auto")]
pub flow_generator_queue_size: usize,
#[serde(deserialize_with = "parse_usize_or_auto")]
pub quadruple_generator_queue_size: usize,
}

Expand Down Expand Up @@ -2853,6 +2894,7 @@ impl Default for Throttles {
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct OutputsFlowLogTunning {
#[serde(deserialize_with = "parse_usize_or_auto")]
pub collector_queue_size: usize,
pub sender_threads: usize,
}
Expand Down Expand Up @@ -2902,6 +2944,7 @@ impl Default for FlowMetricsFilters {
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct FlowMetricsTunning {
#[serde(deserialize_with = "parse_usize_or_auto")]
pub sender_queue_size: usize,
pub sender_threads: usize,
}
Expand Down
80 changes: 78 additions & 2 deletions agent/src/config/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ use nix::{
sched::{sched_setaffinity, CpuSet},
unistd::Pid,
};
use sysinfo::SystemExt;
use sysinfo::{System, SystemExt};
#[cfg(any(target_os = "linux", target_os = "android"))]
use sysinfo::{CpuRefreshKind, RefreshKind, System};
use sysinfo::{CpuRefreshKind, RefreshKind};
use tokio::runtime::Runtime;

#[cfg(any(target_os = "linux", target_os = "android"))]
Expand Down Expand Up @@ -2265,6 +2265,8 @@ impl TryFrom<(Config, UserConfig)> for ModuleConfig {
proxy_controller_port: conf.global.communication.proxy_controller_port,
},
};
let mut config = config;
config.auto_tune();
Ok(config)
}
}
Expand Down Expand Up @@ -5507,6 +5509,80 @@ impl ModuleConfig {

min((mem_size / MB / 128 * 65536) as usize, 1 << 30)
}

pub fn auto_tune(&mut self) {
let mut sys = sysinfo::System::new();
sys.refresh_cpu();
sys.refresh_memory();
let cpus = sys.cpus().len().max(1);
let mem_gb = (sys.total_memory() / (1024 * 1024 * 1024)).max(1);

if self.user_config.outputs.flow_log.tunning.sender_threads == 0 {
self.user_config.outputs.flow_log.tunning.sender_threads =
(cpus / 2).max(1);
}
if self.user_config.outputs.flow_metrics.tunning.sender_threads == 0 {
self.user_config.outputs.flow_metrics.tunning.sender_threads =
(cpus / 4).max(1);
}

let queue_base = 65536 * mem_gb as usize;
const MAX_QUEUE: usize = 64_000_000;
const FLOW_MIN: usize = 65_536;
const AGG_MIN: usize = 65_535;
const QUAD_MIN: usize = 262_144;

if self
.user_config
.processors
.flow_log
.tunning
.flow_generator_queue_size
== 0
{
self.user_config
.processors
.flow_log
.tunning
.flow_generator_queue_size = queue_base.clamp(FLOW_MIN, MAX_QUEUE);
}
if self
.user_config
.processors
.flow_log
.tunning
.flow_aggregator_queue_size
== 0
{
self.user_config
.processors
.flow_log
.tunning
.flow_aggregator_queue_size = queue_base.clamp(AGG_MIN, MAX_QUEUE);
}
if self
.user_config
.processors
.flow_log
.tunning
.quadruple_generator_queue_size
== 0
{
self.user_config
.processors
.flow_log
.tunning
.quadruple_generator_queue_size = queue_base.clamp(QUAD_MIN, MAX_QUEUE);
}
if self.user_config.outputs.flow_log.tunning.collector_queue_size == 0 {
self.user_config.outputs.flow_log.tunning.collector_queue_size =
queue_base.clamp(FLOW_MIN, MAX_QUEUE);
}
if self.user_config.outputs.flow_metrics.tunning.sender_queue_size == 0 {
self.user_config.outputs.flow_metrics.tunning.sender_queue_size =
queue_base.clamp(FLOW_MIN, MAX_QUEUE);
}
}
}

#[cfg(test)]
Expand Down
Loading