Skip to content

Commit 0377dd9

Browse files
committed
ft-analyzer aggregate flows before creating events
1 parent 3f71f70 commit 0377dd9

File tree

4 files changed

+65
-25
lines changed

4 files changed

+65
-25
lines changed

tools/ft-analyzer/ftanalyzer/events/events.py

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,14 @@
2424
"MSG_LENGTH": np.uint64,
2525
}
2626

27+
CSV_AGGREGATE_TYPES = {
28+
"START_TIME": np.uint64,
29+
"END_TIME": np.uint64,
30+
"PACKETS": np.uint64,
31+
"BYTES": np.uint64,
32+
"FLOWS": np.uint64,
33+
}
34+
2735
STATS_CSV_COLUMN_TYPES = {
2836
"Time": np.uint64,
2937
"UID": np.uint64,
@@ -94,36 +102,42 @@ class FlowStartEvent(Event):
94102
packet_rate: float
95103
time = 0
96104
flow_rate: float
105+
flows: int
97106

98-
def __init__(self, data_rate, packet_rate, start_time, flow_rate):
107+
def __init__(self, data_rate, packet_rate, start_time, flow_rate, flows):
99108
self.data_rate = data_rate
100109
self.packet_rate = packet_rate
101110
self.time = start_time
102111
self.flow_rate = flow_rate
112+
self.flows = flows
103113

104114

105115
class FlowEndEvent(Event):
106116
data_rate: float
107117
packet_rate: float
108118
time = 0
109119
flow_rate: float
120+
flows: int
110121

111-
def __init__(self, data_rate, packet_rate, end_time, flow_rate):
122+
def __init__(self, data_rate, packet_rate, end_time, flow_rate, flows):
112123
self.data_rate = -data_rate
113124
self.packet_rate = -packet_rate
114125
self.time = end_time
115126
self.flow_rate = -flow_rate
127+
self.flows = -flows
116128

117129

118130
class OnePacketFlow(Event):
119131
bytes: np.uint64
120132
packets: np.uint64
121133
time = 0
134+
flows: np.uint64
122135

123-
def __init__(self, bytes, packets, time):
136+
def __init__(self, bytes, packets, time, flows):
124137
self.bytes = bytes
125138
self.packets = packets
126139
self.time = time
140+
self.flows = flows
127141

128142

129143
class ExportEvent(Event):
@@ -188,12 +202,26 @@ def create_event_queue(
188202

189203
stats_df.to_csv(hosts_stats_file, sep=";", index=False)
190204

205+
agg_dict = {
206+
"PACKETS": ("PACKETS", "sum"),
207+
"BYTES": ("BYTES", "sum"),
208+
"FLOWS": ("PACKETS", "count"),
209+
}
191210
# One-packet flows
192-
one_packet_df = df[df["PACKETS"] == 1].sort_values("START_TIME")
193-
one_packet_df.to_csv(one_packet_path, index=False)
211+
(
212+
df[df["PACKETS"] == 1]
213+
.groupby("START_TIME", as_index=False)
214+
.agg(**agg_dict)
215+
.sort_values("START_TIME")
216+
.to_csv(one_packet_path, index=False)
217+
)
194218

195219
# Multi-packet flows
196-
multi_df = df[df["PACKETS"] > 1]
220+
multi_df = (
221+
df[df["PACKETS"] > 1]
222+
.groupby(["START_TIME", "END_TIME"], as_index=False)
223+
.agg(**agg_dict)
224+
)
197225
multi_df.sort_values("START_TIME").to_csv(sorted_by_start_path, index=False)
198226
multi_df.sort_values("END_TIME").to_csv(sorted_by_end_path, index=False)
199227

@@ -245,21 +273,25 @@ def read_host_stats_events(path: os.PathLike):
245273

246274

247275
def read_one_packet_events(path: str) -> Iterator[OnePacketFlow]:
248-
for chunk in pd.read_csv(path, dtype=CSV_COLUMN_TYPES, chunksize=100_000):
276+
CSV_AGGREGATE_TYPES_NO_END = {
277+
k: v for k, v in CSV_AGGREGATE_TYPES.items() if k != "END_TIME"
278+
}
279+
for chunk in pd.read_csv(path, dtype=CSV_AGGREGATE_TYPES_NO_END, chunksize=100_000):
249280
for row in chunk.itertuples(index=False):
250281
yield OnePacketFlow(
251282
bytes=np.uint64(row.BYTES),
252283
packets=np.uint64(row.PACKETS),
253284
time=np.uint64(row.START_TIME),
285+
flows=row.FLOWS,
254286
)
255287

256288

257289
def read_start_events(path: str) -> Iterator[FlowStartEvent]:
258-
for chunk in pd.read_csv(path, dtype=CSV_COLUMN_TYPES, chunksize=100_000):
290+
for chunk in pd.read_csv(path, dtype=CSV_AGGREGATE_TYPES, chunksize=100_000):
259291
durations = (chunk.END_TIME - chunk.START_TIME + 1) / 1_000
260292
data_rates = (chunk.BYTES * 8) / durations
261293
packet_rates = chunk.PACKETS / durations
262-
flow_rates = 1 / durations
294+
flow_rates = chunk.FLOWS / durations
263295
for row, dr, pr, fr in zip(
264296
chunk.itertuples(index=False), data_rates, packet_rates, flow_rates
265297
):
@@ -268,15 +300,16 @@ def read_start_events(path: str) -> Iterator[FlowStartEvent]:
268300
packet_rate=pr,
269301
start_time=np.uint64(row.START_TIME),
270302
flow_rate=fr,
303+
flows=row.FLOWS,
271304
)
272305

273306

274307
def read_end_events(path: str) -> Iterator[FlowEndEvent]:
275-
for chunk in pd.read_csv(path, dtype=CSV_COLUMN_TYPES, chunksize=100_000):
308+
for chunk in pd.read_csv(path, dtype=CSV_AGGREGATE_TYPES, chunksize=100_000):
276309
durations = (chunk.END_TIME - chunk.START_TIME + 1) / 1_000
277310
data_rates = (chunk.BYTES * 8) / durations
278311
packet_rates = chunk.PACKETS / durations
279-
flow_rates = 1 / durations
312+
flow_rates = chunk.FLOWS / durations
280313
for row, dr, pr, fr in zip(
281314
chunk.itertuples(index=False), data_rates, packet_rates, flow_rates
282315
):
@@ -285,4 +318,5 @@ def read_end_events(path: str) -> Iterator[FlowEndEvent]:
285318
packet_rate=pr,
286319
end_time=np.uint64(row.END_TIME),
287320
flow_rate=fr,
321+
flows=row.FLOWS,
288322
)

tools/ft-analyzer/ftanalyzer/models/statistical_model.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
"""
88

99
import atexit
10-
from concurrent.futures import ProcessPoolExecutor
10+
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
1111
import ipaddress
1212
import logging
1313
import operator
@@ -40,10 +40,14 @@
4040
Event,
4141
HostStatsEvent,
4242
OnePacketFlow,
43-
FlowEndEvent,
44-
FlowStartEvent,
4543
create_event_queue,
4644
)
45+
import sys
46+
47+
48+
def is_debugger_active():
49+
return sys.gettrace() is not None
50+
4751

4852
_TEMP_FILES = []
4953

@@ -221,7 +225,9 @@ def __init__(
221225

222226
if use_statistical_counter:
223227
# statistic objects
224-
self._executor = ProcessPoolExecutor()
228+
self._executor = (
229+
ThreadPoolExecutor() if is_debugger_active() else ProcessPoolExecutor()
230+
)
225231
self._future_sim = self._executor.submit(
226232
self._run_sim,
227233
host_stats,
@@ -842,7 +848,7 @@ def process_events(
842848
# aggregate OnePacketFlow events within this window
843849
total_bytes = sum(e.bytes for e in one_packet_events)
844850
total_packets = sum(e.packets for e in one_packet_events)
845-
total_flows = np.uint64(len(one_packet_events))
851+
total_flows = np.uint64(sum(e.flows for e in one_packet_events))
846852

847853
singleton_data_rate = (
848854
(total_bytes * 8) / duration_s if duration_s > 0 else 0.0
@@ -921,10 +927,7 @@ def process_events(
921927
current_data_rate += e.data_rate
922928
current_packet_rate += e.packet_rate
923929
current_flow_rate += e.flow_rate
924-
if isinstance(e, FlowStartEvent):
925-
current_flow_count += np.uint64(1)
926-
elif isinstance(e, FlowEndEvent):
927-
current_flow_count -= np.uint64(1)
930+
current_flow_count += e.flows
928931

929932
sim.set_time(event.time)
930933
simultaneous_events = [event]
@@ -937,7 +940,7 @@ def process_events(
937940
# aggregate OnePacketFlow events within this window
938941
total_bytes = sum(e.bytes for e in one_packet_events)
939942
total_packets = sum(e.packets for e in one_packet_events)
940-
total_flows = np.uint64(len(one_packet_events))
943+
total_flows = np.uint64(sum(e.flows for e in one_packet_events))
941944

942945
singleton_data_rate = (total_bytes * 8) / duration_s if duration_s > 0 else 0.0
943946
singleton_packet_rate = total_packets / duration_s if duration_s > 0 else 0.0

tools/ft-orchestration/src/collector/jq.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,12 @@ def __init__(self, executor: Executor, file: str):
7171
self._file = stdout.strip()
7272
tmp_file = path.join(self._rsync.get_data_directory(), "flows.json")
7373
self._cmd_json = f"ipfixcol2 -c {Path(self._conf_dir, self.CONFIG_FILE)}"
74-
fields = ", ".join([f'.\\"{field}\\"' for field in CSV_HEADER_TO_ANALYZER_HEADER.keys()])
75-
header = ",".join([f'"{field}"' for field in CSV_HEADER_TO_ANALYZER_HEADER.keys()])
74+
fields = ", ".join(
75+
[f'.\\"{field}\\"' for field in CSV_HEADER_TO_ANALYZER_HEADER.keys()]
76+
)
77+
header = ",".join(
78+
[f'"{field}"' for field in CSV_HEADER_TO_ANALYZER_HEADER.keys()]
79+
)
7680
self._cmd_csv = f"""set -e
7781
ipfixcol2 -c {Path(self._conf_dir, self.CONFIG_FILE)} > {tmp_file}
7882
echo {header}

tools/ft-orchestration/tests/simulation/test_simulation_threshold.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -294,8 +294,7 @@ def run_single_test(loops: int, speed: MbpsSpeed) -> tuple[bool, StatisticalRepo
294294
# round up to required accuracy
295295
if speed_current % scenario.test.mbps_accuracy > 0:
296296
speed_current = speed_current + (
297-
scenario.test.mbps_accuracy
298-
- speed_current % scenario.test.mbps_accuracy
297+
scenario.test.mbps_accuracy - speed_current % scenario.test.mbps_accuracy
299298
)
300299
while True:
301300
# setup log path

0 commit comments

Comments
 (0)