Skip to content

Commit a46a587

Browse files
committed
chore: Added a data collection agent
1 parent 3e6e912 commit a46a587

File tree

8 files changed

+128
-15
lines changed

8 files changed

+128
-15
lines changed

control-plane/config/samples/loads.yaml

Lines changed: 4 additions & 6 deletions
Large diffs are not rendered by default.

gazer/deployment.yaml

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -227,6 +227,13 @@ rules:
227227
- get
228228
- list
229229
- watch
230+
- apiGroups: ["apps"]
231+
resources:
232+
- deployments
233+
verbs:
234+
- get
235+
- list
236+
- watch
230237
- apiGroups:
231238
- extensions
232239
resources:

inspector/deployment.yaml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ spec:
1919
command: ["inspector"]
2020
# args: ["-c", "while true; do echo hello; sleep 10;done"]
2121
imagePullPolicy: Always
22-
image: ghcr.io/mrsupiri/lazy-koala/inspector:commit-342ac6a8
22+
image: ghcr.io/mrsupiri/lazy-koala/inspector:latest
2323
ports:
2424
- containerPort: 8090
2525
name: http

scripts/data-exporter.py

Lines changed: 74 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,74 @@
1+
from flask import Flask, request, jsonify
2+
from datetime import datetime
3+
import pymongo
4+
import os
5+
6+
app = Flask(__name__)
7+
8+
9+
myclient = pymongo.MongoClient(os.getenv("MONGODB_URI"))
10+
mydb = myclient["metrics"]
11+
12+
13+
@app.route('/save', methods=['GET', 'POST'])
def add_message():
    """Store one telemetry sample sent as a JSON request body.

    The body must be a JSON object with:
      * ``service`` - non-empty string, used as the collection name in ``mydb``.
      * ``data``    - the payload, stored verbatim next to the insertion time.

    Returns ``{"success": true}`` once the document is inserted, or a 400
    response with an ``error`` message when the body is not a JSON object or
    a required field is missing/invalid (previously these cases surfaced as
    unhandled exceptions, i.e. a 500).
    """
    # silent=True yields None instead of raising on a missing or malformed body.
    content = request.get_json(silent=True)
    if not isinstance(content, dict):
        return jsonify({"success": False, "error": "expected a JSON object"}), 400

    missing = [key for key in ("service", "data") if key not in content]
    if missing:
        message = "missing field(s): " + ", ".join(missing)
        return jsonify({"success": False, "error": message}), 400

    service = content["service"]
    # The value is used directly as a collection name, so it has to be a usable string.
    if not isinstance(service, str) or not service:
        return jsonify({"success": False, "error": "'service' must be a non-empty string"}), 400

    mydb[service].insert_one({"data": content["data"], "time": datetime.now()})
    return jsonify({"success": True})
19+
20+
if __name__ == '__main__':
    # Debug mode enables the Werkzeug interactive debugger, which lets anyone who
    # can reach the port execute arbitrary code. Because the server listens on
    # all interfaces, keep it off unless FLASK_DEBUG=1 is set explicitly.
    debug_enabled = os.getenv("FLASK_DEBUG", "0") == "1"
    app.run(host='0.0.0.0', debug=debug_enabled)
22+
23+
24+
25+
26+
27+
28+
29+
# import grequests
30+
# import urllib.parse
31+
# import numpy as np
32+
33+
# services = [
34+
# "service-1-18f76028",
35+
# "service-10-18f76028",
36+
# "service-2-18f76028",
37+
# "service-3-18f76028",
38+
# "service-4-18f76028",
39+
# "service-5-18f76028",
40+
# "service-6-18f76028",
41+
# "service-7-18f76028",
42+
# "service-8-18f76028",
43+
# "service-9-18f76028",
44+
# ]
45+
46+
# samples = ["1m", "5m", "15m"]
47+
48+
# metrics = [
49+
# 'rate(requests_sent_total{serviceName="SERVICE_NAME"}[SAMPLE])',
50+
# 'sum by (serviceName) (rate(requests_received_total{serviceName="SERVICE_NAME"}[SAMPLE]))',
51+
# 'rate(request_duration_seconds_sum{serviceName="SERVICE_NAME"}[SAMPLE])',
52+
# 'avg_over_time(cpu_seconds{serviceName="SERVICE_NAME"}[SAMPLE])',
53+
# 'avg_over_time(memory_usage_bytes{serviceName="SERVICE_NAME"}[SAMPLE])',
54+
# 'rate(acknowledged_bytes_sum{serviceName="SERVICE_NAME"}[SAMPLE])',
55+
# 'rate(transmitted_bytes_sum{serviceName="SERVICE_NAME"}[SAMPLE])',
56+
# 'avg_over_time(backlog{level="1",serviceName="SERVICE_NAME"}[SAMPLE])',
57+
# 'sum by (serviceName) (avg_over_time(backlog{level!="1",serviceName="SERVICE_NAME"}[SAMPLE]))',
58+
# ]
59+
60+
# requests = []
61+
62+
# def chunks(l, n):
63+
# n = max(1, n)
64+
# return (l[i:i+n] for i in range(0, len(l), n))
65+
66+
# for service in services:
67+
# for sample in samples:
68+
# for metric in metrics:
69+
# query = metric.replace("SERVICE_NAME", service).replace("SAMPLE", sample)
70+
# url = "http://127.0.0.1:9090/api/v1/query?query="+urllib.parse.quote_plus(query)
71+
# requests.append(grequests.get(url))
72+
# respon = grequests.map(requests)
73+
# f
74+
# break;

scripts/loadTemplete.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
import json
22
import random
33

4-
services = list(range(1,10))
4+
services = list(range(1,11))
55

66
random.shuffle(services)
77

sherlock/deployment.yaml

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,15 @@ spec:
3232
volumeMounts:
3333
- name: sherlock-config
3434
mountPath: /app/config
35+
- name: data-exporter
36+
imagePullPolicy: Always
37+
image: asia.gcr.io/iconicto/data-exporter:latest
38+
ports:
39+
- containerPort: 5000
40+
name: http
41+
env:
42+
- name: MONGODB_URI
43+
value: "http://localhost:8501/v1/models"
3544
- image: google/cloud-sdk
3645
name: model-poller
3746
imagePullPolicy: IfNotPresent

sherlock/src/inference.rs

Lines changed: 18 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -64,10 +64,27 @@ async fn query_model(service: &str, input: [[[f64; 1]; 9]; 10]) -> Result<f64, B
6464
Ok(mse)
6565
}
6666

67+
/// Sends one telemetry matrix for `service` to the `/save` HTTP endpoint on
/// `localhost:5000`.
///
/// The request body is a JSON object with a `service` key (the service name)
/// and a `data` key (the matrix, serialized as nested arrays).
///
/// # Errors
///
/// Returns an error if the request cannot be sent, or if the endpoint answers
/// with a 4xx/5xx status code.
async fn save(service: &str, input: [[[f64; 3]; 9]; 10]) -> Result<(), Box<dyn std::error::Error>> {
    let payload = json!({
        "service": service,
        "data": input,
    });

    reqwest::Client::new()
        .post("http://localhost:5000/save")
        .json(&payload)
        .send()
        .await?
        // `send` only fails on transport errors; without this check a rejected
        // write (4xx/5xx) would be silently reported as success.
        .error_for_status()?;

    Ok(())
}
81+
6782
async fn calculate_anomaly_score(service: &str, args: &InferenceData) -> Result<(), Box<dyn std::error::Error>> {
6883
println!("Calculate anomaly score for {} using {}", service, &args.model_name);
6984
let input = build_telemetry_matrix(&service).await?;
70-
let score = query_model(&args.model_name, input).await?;
85+
save(&service, input).await?;
86+
// let score = query_model(&args.model_name, input).await?;
87+
let score = 0.4;
7188
ANOMLAY_GAUGE.with_label_values(&[service, &args.namespace]).set(score);
7289
println!("Anomaly score for {}: {}", service, score);
7390
Ok(())

sherlock/src/query.rs

Lines changed: 14 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,12 @@ lazy_static! {
1818
r#"avg_over_time(backlog{level="1",serviceName="SERVICE_NAME"}[SAMPLE])"#,
1919
r#"sum by (serviceName) (avg_over_time(backlog{level!="1",serviceName="SERVICE_NAME"}[SAMPLE]))"#
2020
];
21+
22+
static ref SAMPLES: [&'static str; 3] = [
23+
"1m",
24+
"5m",
25+
"15m"
26+
];
2127
}
2228

2329

@@ -31,8 +37,8 @@ async fn query_prometheus(query: &str, time: DateTime<Local>) -> Result<f64, Bo
3137
Ok(value)
3238
}
3339

34-
pub async fn build_telemetry_matrix(service: &str) -> Result<[[[f64; 1]; 9]; 10], Box<dyn std::error::Error>> {
35-
let mut data: [[[f64; 1]; 9]; 10] = [[[0.0; 1]; 9]; 10];
40+
pub async fn build_telemetry_matrix(service: &str) -> Result<[[[f64; 3]; 9]; 10], Box<dyn std::error::Error>> {
41+
let mut data: [[[f64; 3]; 9]; 10] = [[[0.0; 3]; 9]; 10];
3642

3743
let time_steps: [DateTime<Local>; 10] = [
3844
Local::now(),
@@ -49,10 +55,12 @@ pub async fn build_telemetry_matrix(service: &str) -> Result<[[[f64; 1]; 9]; 10]
4955

5056
for (x, time_step) in time_steps.iter().enumerate() {
5157
for (y, metric) in METRICS.iter().enumerate() {
52-
let query = &metric.replace("SERVICE_NAME", service).replace("SAMPLE", "1m");
53-
match query_prometheus(query, *time_step).await {
54-
Ok(value) => data[x][y][0] = value,
55-
Err(e) => return Err(e),
58+
for (z, sample) in SAMPLES.iter().enumerate(){
59+
let query = &metric.replace("SERVICE_NAME", service).replace("SAMPLE", sample);
60+
match query_prometheus(query, *time_step).await {
61+
Ok(value) => data[x][y][z] = value,
62+
Err(e) => return Err(e),
63+
}
5664
}
5765
}
5866
}

0 commit comments

Comments
 (0)