Skip to content

Commit 65b7079

Browse files
committed
feat: add biz_code/biz_scenario
1 parent 9dbcea2 commit 65b7079

File tree

17 files changed

+163
-4
lines changed

17 files changed

+163
-4
lines changed

agent/crates/enterprise-utils/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ pub mod l7 {
5555
pub const SPAN_ID: &'static str = "";
5656
pub const X_REQUEST_ID: &'static str = "";
5757
pub const HTTP_PROXY_CLIENT: &'static str = "";
58+
pub const BIZ_TYPE: &'static str = "";
59+
pub const BIZ_CODE: &'static str = "";
60+
pub const BIZ_SCENARIO: &'static str = "";
5861

5962
pub fn match_key(&self, _: &str) -> bool {
6063
unimplemented!()
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
DubboInfo { msg_type: Request, event: 0, serial_id: 2, data_type: 128, request_id: 22872, dubbo_version: "2.0.2", service_name: "my.demo.service.UserService", service_version: "0.0.0", method_name: "login", trace_ids: PrioFields([PrioField { prio: 128, field: "" }]), span_id: PrioField { prio: 128, field: "" }, status_code: None, custom_result: None, custom_exception: None, endpoint: Some("my.demo.service.UserService/login"), rrt: 0, req_msg_size: Some(248), resp_msg_size: None, captured_request_byte: 264, captured_response_byte: 0 } is_dubbo: true
1+
DubboInfo { msg_type: Request, event: 0, serial_id: 2, data_type: 128, request_id: 22872, dubbo_version: "2.0.2", service_name: "my.demo.service.UserService", service_version: "0.0.0", method_name: "login", trace_ids: PrioFields([]), span_id: PrioField { prio: 128, field: "" }, status_code: None, custom_result: None, custom_exception: None, endpoint: Some("my.demo.service.UserService/login"), rrt: 0, req_msg_size: Some(248), resp_msg_size: None, captured_request_byte: 264, captured_response_byte: 0 } is_dubbo: true
22
DubboInfo { msg_type: Response, event: 0, serial_id: 2, data_type: 0, request_id: 22872, dubbo_version: "", service_name: "", service_version: "", method_name: "", trace_ids: PrioFields([]), span_id: PrioField { prio: 255, field: "" }, status_code: Some(20), custom_result: None, custom_exception: None, endpoint: Some("my.demo.service.UserService/login"), rrt: 4332, req_msg_size: None, resp_msg_size: Some(191), captured_request_byte: 0, captured_response_byte: 207 } is_dubbo: false

agent/src/flow_generator/protocol_logs.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,10 @@ impl AppProtoLogsBaseInfo {
418418
(self.end_time.as_micros() - self.start_time.as_micros()) as u64
419419
} else {
420420
0
421+
};
422+
423+
if self.biz_type == 0 {
424+
self.biz_type = log.biz_type;
421425
}
422426
}
423427
}

agent/src/flow_generator/protocol_logs/consts.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,5 @@ pub const SERVER_THREADPOOL_EXHAUSTED_ERROR: u8 = 100;
8686

8787
pub const APM_TRACE_ID_ATTR: &str = "apm_trace_id";
8888
pub const APM_SPAN_ID_ATTR: &str = "apm_span_id";
89+
90+
pub const SYS_RESPONSE_CODE_ATTR: &str = "sys_response_code";

agent/src/flow_generator/protocol_logs/http.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,10 @@ pub struct HttpInfo {
301301

302302
#[serde(skip_serializing_if = "value_is_default")]
303303
biz_type: u8,
304+
#[serde(skip_serializing_if = "value_is_default")]
305+
biz_code: String,
306+
#[serde(skip_serializing_if = "value_is_default")]
307+
biz_scenario: String,
304308

305309
#[serde(skip)]
306310
attributes: Vec<KeyVal>,
@@ -329,6 +333,14 @@ impl HttpInfo {
329333
|| self.msg_type == LogMessageType::Other
330334
}
331335

336+
// when response_code is overwritten, put it into the attributes.
337+
fn response_code_to_attribute(&mut self) {
338+
self.attributes.push(KeyVal {
339+
key: SYS_RESPONSE_CODE_ATTR.to_string(),
340+
val: self.status_code.to_string(),
341+
});
342+
}
343+
332344
pub fn merge_custom_to_http(&mut self, custom: CustomInfo, dir: PacketDirection) {
333345
if dir == PacketDirection::ClientToServer {
334346
if let Ok(v) = Version::try_from(custom.req.version.as_str()) {
@@ -358,6 +370,7 @@ impl HttpInfo {
358370

359371
if dir == PacketDirection::ServerToClient {
360372
if let Some(code) = custom.resp.code {
373+
self.response_code_to_attribute();
361374
self.status_code = code as u16;
362375
}
363376

@@ -407,6 +420,12 @@ impl HttpInfo {
407420
if custom.biz_type > 0 {
408421
self.biz_type = custom.biz_type;
409422
}
423+
if let Some(biz_code) = custom.biz_code {
424+
self.biz_code = biz_code;
425+
}
426+
if let Some(biz_scenario) = custom.biz_scenario {
427+
self.biz_scenario = biz_scenario;
428+
}
410429

411430
if let Some(is_async) = custom.is_async {
412431
self.is_async = is_async;
@@ -436,6 +455,7 @@ impl HttpInfo {
436455
self.stream_id = req_id.parse::<u32>().map_or(None, Some);
437456
}
438457
if let Some(resp_code) = tags.remove(ExtraField::RESPONSE_CODE) {
458+
self.response_code_to_attribute();
439459
self.status_code = resp_code.parse::<u16>().unwrap_or_default();
440460
}
441461

@@ -487,6 +507,16 @@ impl HttpInfo {
487507
*x_req_id = PrioField::new(CUSTOM_FIELD_POLICY_PRIORITY, x_request_id)
488508
}
489509
}
510+
511+
if let Some(biz_type) = tags.remove(ExtraField::BIZ_TYPE) {
512+
self.biz_type = biz_type.parse::<u8>().unwrap_or_default();
513+
}
514+
if let Some(biz_code) = tags.remove(ExtraField::BIZ_CODE) {
515+
self.biz_code = biz_code;
516+
}
517+
if let Some(biz_scenario) = tags.remove(ExtraField::BIZ_SCENARIO) {
518+
self.biz_scenario = biz_scenario;
519+
}
490520
}
491521
}
492522

@@ -639,6 +669,8 @@ impl HttpInfo {
639669
if other.biz_type > 0 {
640670
self.biz_type = other.biz_type;
641671
}
672+
super::swap_if!(self, biz_code, is_empty, other);
673+
super::swap_if!(self, biz_scenario, is_empty, other);
642674

643675
let other_trace_ids = std::mem::take(&mut other.trace_ids);
644676
self.trace_ids.merge(other_trace_ids);
@@ -844,6 +876,8 @@ impl From<HttpInfo> for L7ProtocolSendLog {
844876
..Default::default()
845877
}),
846878
flags: flags.bits(),
879+
biz_code: f.biz_code,
880+
biz_scenario: f.biz_scenario,
847881
..Default::default()
848882
}
849883
}

agent/src/flow_generator/protocol_logs/mq/webspheremq.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::{
3030
flow_generator::{
3131
error::{Error, Result},
3232
protocol_logs::{
33+
consts::*,
3334
pb_adapter::{
3435
ExtendedInfo, KeyVal, L7ProtocolSendLog, L7Request, L7Response, TraceInfo,
3536
},
@@ -83,6 +84,11 @@ pub struct WebSphereMqInfo {
8384

8485
#[serde(skip)]
8586
is_on_blacklist: bool,
87+
88+
#[serde(skip_serializing_if = "value_is_default")]
89+
biz_code: String,
90+
#[serde(skip_serializing_if = "value_is_default")]
91+
biz_scenario: String,
8692
}
8793

8894
impl L7ProtocolInfoInterface for WebSphereMqInfo {
@@ -142,6 +148,22 @@ impl WebSphereMqInfo {
142148
}
143149
}
144150

151+
// when response_status is overwritten, put it into the attributes.
152+
fn response_status_to_attribute(&mut self) {
153+
self.attributes.push(KeyVal {
154+
key: SYS_RESPONSE_STATUS_ATTR.to_string(),
155+
val: (self.status as u8).to_string(),
156+
});
157+
}
158+
159+
// when response_code is overwritten, put it into the attributes.
160+
fn response_code_to_attribute(&mut self) {
161+
self.attributes.push(KeyVal {
162+
key: SYS_RESPONSE_CODE_ATTR.to_string(),
163+
val: self.response_code.to_string(),
164+
});
165+
}
166+
145167
fn set_is_on_blacklist(&mut self, config: &LogParserConfig) {
146168
if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::WebSphereMq) {
147169
self.is_on_blacklist = t.request_type.is_on_blacklist(&self.request_type)
@@ -160,10 +182,12 @@ impl WebSphereMqInfo {
160182

161183
//resp rewrite
162184
if let Some(code) = custom.resp.code {
185+
self.response_code_to_attribute();
163186
self.response_code = code;
164187
}
165188

166189
if custom.resp.status != L7ResponseStatus::default() {
190+
self.response_status_to_attribute();
167191
self.status = custom.resp.status;
168192
}
169193

@@ -190,6 +214,13 @@ impl WebSphereMqInfo {
190214
if let Some(is_async) = custom.is_async {
191215
self.is_async = is_async;
192216
}
217+
218+
if let Some(biz_code) = custom.biz_code {
219+
self.biz_code = biz_code;
220+
}
221+
if let Some(biz_scenario) = custom.biz_scenario {
222+
self.biz_scenario = biz_scenario;
223+
}
193224
}
194225
}
195226

@@ -286,13 +317,13 @@ impl L7ProtocolParserInterface for WebSphereMqLog {
286317
}
287318
}
288319

320+
info.is_async = true;
289321
let has_wasm_result = self.wasm_hook(param, payload, &mut info);
290322
if has_wasm && !has_wasm_result {
291323
return Err(Error::L7ProtocolUnknown);
292324
}
293325

294326
info.is_tls = param.is_tls();
295-
info.is_async = true;
296327
set_captured_byte!(info, param);
297328
if let Some(perf_stats) = self.perf_stats.as_mut() {
298329
if let Some(stats) = info.perf_stats(param) {

agent/src/flow_generator/protocol_logs/pb_adapter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ pub struct L7ProtocolSendLog {
9090
pub flags: u32,
9191
pub captured_request_byte: u32,
9292
pub captured_response_byte: u32,
93+
pub biz_code: String,
94+
pub biz_scenario: String,
9395
}
9496

9597
impl L7ProtocolSendLog {
@@ -189,5 +191,7 @@ impl L7ProtocolSendLog {
189191
log.ext_info = Some(ext_info);
190192
}
191193
log.flags = self.flags;
194+
log.biz_code = self.biz_code;
195+
log.biz_scenario = self.biz_scenario;
192196
}
193197
}

agent/src/flow_generator/protocol_logs/rpc/dubbo.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,13 @@ pub struct DubboInfo {
142142
is_on_blacklist: bool,
143143
#[serde(skip)]
144144
endpoint: Option<String>,
145+
146+
#[serde(skip_serializing_if = "value_is_default")]
147+
biz_type: u8,
148+
#[serde(skip_serializing_if = "value_is_default")]
149+
biz_code: String,
150+
#[serde(skip_serializing_if = "value_is_default")]
151+
biz_scenario: String,
145152
}
146153

147154
impl DubboInfo {
@@ -191,6 +198,11 @@ impl DubboInfo {
191198
if other.is_on_blacklist {
192199
self.is_on_blacklist = other.is_on_blacklist;
193200
}
201+
if other.biz_type > 0 {
202+
self.biz_type = other.biz_type;
203+
}
204+
swap_if!(self, biz_code, is_empty, other);
205+
swap_if!(self, biz_scenario, is_empty, other);
194206
}
195207

196208
fn parse_trace_id(trace_info: String, trace_type: &TraceType) -> String {
@@ -251,6 +263,18 @@ impl DubboInfo {
251263
}
252264
}
253265

266+
// when response_code is overwritten, put it into the attributes.
267+
fn response_code_to_attribute(&mut self) {
268+
self.attributes.push(KeyVal {
269+
key: SYS_RESPONSE_CODE_ATTR.to_string(),
270+
val: self
271+
.status_code
272+
.as_ref()
273+
.map(ToString::to_string)
274+
.unwrap_or_default(),
275+
});
276+
}
277+
254278
pub fn merge_custom_info(&mut self, custom: CustomInfo) {
255279
// req rewrite
256280
if !custom.req.domain.is_empty() {
@@ -263,6 +287,7 @@ impl DubboInfo {
263287

264288
//resp rewrite
265289
if let Some(code) = custom.resp.code {
290+
self.response_code_to_attribute();
266291
self.status_code = Some(code);
267292
}
268293

@@ -338,6 +363,7 @@ impl DubboInfo {
338363
self.request_id = req_id.parse::<i64>().unwrap_or_default();
339364
}
340365
if let Some(resp_code) = tags.remove(ExtraField::RESPONSE_CODE) {
366+
self.response_code_to_attribute();
341367
self.status_code = Some(resp_code.parse::<i32>().unwrap_or_default());
342368
}
343369

@@ -384,6 +410,16 @@ impl DubboInfo {
384410
*x_req_id = Some(PrioField::new(CUSTOM_FIELD_POLICY_PRIORITY, x_request_id));
385411
}
386412
}
413+
414+
if let Some(biz_type) = tags.remove(ExtraField::BIZ_TYPE) {
415+
self.biz_type = biz_type.parse::<u8>().unwrap_or_default();
416+
}
417+
if let Some(biz_code) = tags.remove(ExtraField::BIZ_CODE) {
418+
self.biz_code = biz_code;
419+
}
420+
if let Some(biz_scenario) = tags.remove(ExtraField::BIZ_SCENARIO) {
421+
self.biz_scenario = biz_scenario;
422+
}
387423
}
388424
}
389425

agent/src/plugin/WasmPluginApi.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ message AppInfo {
6060
repeated KeyVal attributes = 31;
6161

6262
optional uint32 biz_type = 32;
63+
optional string biz_code = 33;
64+
optional string biz_scenario = 34;
6365
}
6466

6567
message NatsMessage {

agent/src/plugin/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ pub struct CustomInfo {
108108
pub metrics: Vec<MetricKeyVal>,
109109

110110
pub biz_type: u8,
111+
pub biz_code: Option<String>,
112+
pub biz_scenario: Option<String>,
111113

112114
#[serde(skip)]
113115
pub is_on_blacklist: bool,
@@ -444,6 +446,8 @@ impl CustomInfo {
444446
})
445447
.collect(),
446448
biz_type: pb_info.biz_type.unwrap_or_default() as u8,
449+
biz_code: pb_info.biz_code,
450+
biz_scenario: pb_info.biz_scenario,
447451
is_async: pb_info.is_async,
448452
..Default::default()
449453
};
@@ -594,6 +598,9 @@ impl L7ProtocolInfoInterface for CustomInfo {
594598
swap_if!(self.trace, http_proxy_client, is_none, w.trace);
595599

596600
self.attributes.append(&mut w.attributes);
601+
602+
swap_if!(self, biz_code, is_none, w);
603+
swap_if!(self, biz_scenario, is_none, w);
597604
}
598605
Ok(())
599606
}

0 commit comments

Comments
 (0)