Commit 52a84c2

ft-analyzer add active time and in cache time
1 parent b1c985a · commit 52a84c2

File tree: 3 files changed, +82 -21 lines changed
tools/ft-analyzer/ftanalyzer/counter/__init__.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -1,3 +1,4 @@
 from .counter import Counter
 from .continuous_counter import ContinuousCounter
 from .time_series_counter import TimeSeriesCounter
+from .discrete_counter import DiscreteCounter
```

tools/ft-analyzer/ftanalyzer/events/events.py

Lines changed: 50 additions & 19 deletions
```diff
@@ -32,6 +32,8 @@
     "PACKETS": np.uint64,
     "BYTES": np.uint64,
     "FLOWS": np.uint64,
+    "ACTIVE_TIME": np.uint64,
+    "CACHE_TIME": np.uint64,
 }

 STATS_CSV_COLUMN_TYPES = {
```
```diff
@@ -120,26 +122,43 @@ class FlowEndEvent(Event):
     time = 0
     flow_rate: float
     flows: int
-
-    def __init__(self, data_rate, packet_rate, end_time, flow_rate, flows):
+    active_time: np.uint64
+    cache_time: np.uint64
+
+    def __init__(
+        self,
+        data_rate,
+        packet_rate,
+        end_time,
+        flow_rate,
+        flows,
+        active_time,
+        cache_time,
+    ):
         self.data_rate = -data_rate
         self.packet_rate = -packet_rate
         self.time = end_time
         self.flow_rate = -flow_rate
         self.flows = -int(flows)
+        self.active_time = active_time
+        self.cache_time = cache_time


 class OnePacketFlow(Event):
     bytes: np.uint64
     packets: np.uint64
     time = 0
     flows: np.uint64
+    active_time: np.uint64
+    cache_time: np.uint64

-    def __init__(self, bytes, packets, time, flows):
+    def __init__(self, bytes, packets, time, flows, active_time, cache_time):
         self.bytes = bytes
         self.packets = packets
         self.time = time
         self.flows = flows
+        self.active_time = active_time
+        self.cache_time = cache_time


 class ExportEvent(Event):
```
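Note the sign convention carried over from the old constructor: `FlowEndEvent` stores its rates and flow count negated, presumably so that start and end events merged into one time-ordered queue can be summed into a running total of active flows. A minimal sketch of constructing the extended events — the signatures follow the diff above, but the concrete values are made up:

```python
# Sketch only (not part of the commit): exercising the new constructor args.
end_evt = FlowEndEvent(
    data_rate=1.5,        # stored negated internally
    packet_rate=100.0,
    end_time=4000,        # ms
    flow_rate=0.2,
    flows=1,
    active_time=3001,     # END_TIME - START_TIME + 1, in ms
    cache_time=34001,     # EXPORT_TIME * 1000 - START_TIME + 1, in ms
)
one_pkt = OnePacketFlow(
    bytes=64, packets=1, time=1000, flows=1, active_time=1, cache_time=30501
)
assert end_evt.flow_rate == -0.2 and end_evt.flows == -1
```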
```diff
@@ -244,6 +263,10 @@ def create_event_queue(
         temp_end = tempfile.NamedTemporaryFile(
             mode="w", prefix="end_time", suffix=".csv", dir=out_dir, delete=False
         )
+        temp_export = tempfile.NamedTemporaryFile(
+            mode="w", prefix="export_time", suffix=".csv", dir=out_dir, delete=False
+        )
+        tmp_export.append(temp_export)
         tmp_start_time.append(temp_start)
         tmp_end_time.append(temp_end)
         tmp_one_pack.append(temp_one)
```
```diff
@@ -255,10 +278,23 @@ def create_event_queue(
         else:
             end = max(end, math.ceil(chunk["END_TIME"].max() / 1000))

+        # Export Events
+        if "EXPORT_TIME" not in chunk.columns:
+            # accurate expected EXPORT_TIME
+            chunk["EXPORT_TIME"] = chunk["END_TIME"] // 1000 + inactive_timeout + 1
+            # approximate SEQ_NUMBER
+            chunk["SEQ_NUMBER"] = chunk["EXPORT_TIME"] % 32
+            # random MSG_LENGTH
+            chunk["MSG_LENGTH"] = random.randint(100, 2048)
+        chunk["ACTIVE_TIME"] = chunk["END_TIME"] - chunk["START_TIME"] + 1
+        chunk["CACHE_TIME"] = (
+            chunk["EXPORT_TIME"] * 1000 - chunk["START_TIME"] + 1
+        ).clip(lower=0)
+
         # One-packet flows
         (
             chunk[chunk["PACKETS"] == 1]
-            .groupby("START_TIME", as_index=False)
+            .groupby(["START_TIME", "ACTIVE_TIME", "CACHE_TIME"], as_index=False)
            .agg(**agg_dict)
            .sort_values("START_TIME")
            .to_csv(
```
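Both derived columns are in milliseconds: `ACTIVE_TIME` is the flow's duration between its first and last packet, and `CACHE_TIME` is how long the record occupies the exporter cache, from the first packet until `EXPORT_TIME` (in seconds) elapses, clipped at zero. A toy reproduction of the arithmetic — the data and the `inactive_timeout` value are made up for illustration:

```python
import pandas as pd

inactive_timeout = 30  # seconds; assumed value for this sketch
chunk = pd.DataFrame({
    "START_TIME": [1_000, 5_500],  # ms
    "END_TIME": [4_000, 5_500],    # ms
})

# Same derivation as in create_event_queue() above
chunk["EXPORT_TIME"] = chunk["END_TIME"] // 1000 + inactive_timeout + 1  # s
chunk["ACTIVE_TIME"] = chunk["END_TIME"] - chunk["START_TIME"] + 1       # ms
chunk["CACHE_TIME"] = (
    chunk["EXPORT_TIME"] * 1000 - chunk["START_TIME"] + 1
).clip(lower=0)                                                          # ms

print(chunk)  # ACTIVE_TIME: [3001, 1], CACHE_TIME: [34001, 30501]
```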
```diff
@@ -267,19 +303,6 @@ def create_event_queue(
             )
         )

-        # Export Events
-        if "EXPORT_TIME" not in chunk.columns:
-            # accurate expected EXPORT_TIME
-            chunk["EXPORT_TIME"] = chunk["END_TIME"] // 1000 + inactive_timeout + 1
-            # approximate SEQ_NUMBER
-            chunk["SEQ_NUMBER"] = chunk["EXPORT_TIME"] % 32
-            # random MSG_LENGTH
-            chunk["MSG_LENGTH"] = random.randint(100, 2048)
-
-        temp_export = tempfile.NamedTemporaryFile(
-            mode="w", prefix="export_time", suffix=".csv", dir=out_dir, delete=False
-        )
-        tmp_export.append(temp_export)
         (
             chunk.groupby(["EXPORT_TIME", "SEQ_NUMBER"], as_index=False)
             .agg(
```
```diff
@@ -296,7 +319,9 @@ def create_event_queue(
         # Multi-packet flows
         (
             chunk[chunk["PACKETS"] > 1]
-            .groupby(["START_TIME", "END_TIME"], as_index=False)
+            .groupby(
+                ["START_TIME", "END_TIME", "ACTIVE_TIME", "CACHE_TIME"], as_index=False
+            )
             .agg(**agg_dict)
             .sort_values("START_TIME")
             .to_csv(
```
```diff
@@ -307,7 +332,9 @@ def create_event_queue(

         (
             chunk[chunk["PACKETS"] > 1]
-            .groupby(["START_TIME", "END_TIME"], as_index=False)
+            .groupby(
+                ["START_TIME", "END_TIME", "ACTIVE_TIME", "CACHE_TIME"], as_index=False
+            )
             .agg(**agg_dict)
             .sort_values("END_TIME")
             .to_csv(
```
```diff
@@ -403,6 +430,8 @@ def read_one_packet_events(path: str) -> Iterator[OnePacketFlow]:
             packets=row["PACKETS"],
             time=row["START_TIME"],
             flows=row["FLOWS"],
+            active_time=row["ACTIVE_TIME"],
+            cache_time=row["CACHE_TIME"],
         )
```
```diff
@@ -433,4 +462,6 @@ def read_end_events(path: str) -> Iterator[FlowEndEvent]:
             end_time=row["END_TIME"],
             flow_rate=flow_rate,
             flows=row["FLOWS"],
+            active_time=row["ACTIVE_TIME"],
+            cache_time=row["CACHE_TIME"],
         )
```

tools/ft-analyzer/ftanalyzer/models/statistical_model.py

Lines changed: 31 additions & 2 deletions
```diff
@@ -23,7 +23,7 @@
 import numpy as np
 import pandas as pd
 from ftanalyzer.common.pandas_multiprocessing import PandasMultiprocessingHelper
-from ftanalyzer.events.events import ExportEvent, FlowStartEvent
+from ftanalyzer.events.events import ExportEvent, FlowStartEvent, FlowEndEvent
 from ftanalyzer.models.sm_data_types import (
     SMException,
     SMMetricType,
```
```diff
@@ -34,7 +34,7 @@
 )
 from ftanalyzer.reports import StatisticalReport
 from src.generator.interface import GeneratorStats
-from ftanalyzer.counter import ContinuousCounter, TimeSeriesCounter
+from ftanalyzer.counter import ContinuousCounter, TimeSeriesCounter, DiscreteCounter
 from ftanalyzer.statistic_object import StatisticObject, SimState
 from ftanalyzer.events import (
     Event,
```
```diff
@@ -665,6 +665,8 @@ def setup_statsitic_objects(
             measure_start_time=start_time_offset + inactive_timeout * 1000,
             measure_end_time=end_time,
         ),
+        "dt_flows_active_time": DiscreteCounter("Flow Duration Active"),
+        "dt_flows_cache_time": DiscreteCounter("Flow Duration in Cache"),
         "tsc_data_rate": TimeSeriesCounter(
             "data rate in Gb/s",
             sim,
```
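`DiscreteCounter` is new in this commit's import list, but its implementation is not shown in the diff. As a mental model only — an assumption, not the actual class from `ftanalyzer.counter.discrete_counter` — think of it as a histogram over the discrete per-flow values it is fed:

```python
from collections import Counter

class DiscreteCounterSketch:
    """Hypothetical stand-in for DiscreteCounter; the real API may differ."""

    def __init__(self, name: str):
        self.name = name
        self._hist: Counter = Counter()  # value -> occurrence count

    def update(self, value: int) -> None:
        self._hist[int(value)] += 1

    def mean(self) -> float:
        total = sum(self._hist.values())
        return sum(v * c for v, c in self._hist.items()) / total if total else 0.0
```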
```diff
@@ -730,6 +732,22 @@ def setup_statsitic_objects(
             measure_start_time=start_time_offset + inactive_timeout * 1000,
             measure_end_time=end_time,
         ),
+        "tsc_flows_active_time": TimeSeriesCounter(
+            "Flow Duration Active",
+            sim,
+            start_time,
+            end_time,
+            measure_start_time=start_time_offset,
+            measure_end_time=end_time_offset,
+        ),
+        "tsc_flows_cache_time": TimeSeriesCounter(
+            "Flow Duration in Cache",
+            sim,
+            start_time,
+            end_time,
+            measure_start_time=start_time_offset,
+            measure_end_time=end_time_offset,
+        ),
     }

     metric_mapping: dict[str, List[str]] = {
```
```diff
@@ -744,6 +762,8 @@ def setup_statsitic_objects(
             "tsc_flows_per_export_packet",
             "ct_flows_per_export_packet",
         ],
+        "active_time": ["dt_flows_active_time", "tsc_flows_active_time"],
+        "cache_time": ["dt_flows_cache_time", "tsc_flows_cache_time"],
     }

     return (statistic_objects, metric_mapping)
```
```diff
@@ -817,6 +837,15 @@ def _flush_event_data(
             statistic_objects, metric_mapping, **host_stats_event.row._asdict()
         )

+        for event in simultaneous_events:
+            if isinstance(event, (OnePacketFlow, FlowEndEvent)):
+                update_statistic_objects(
+                    statistic_objects,
+                    metric_mapping,
+                    active_time=event.active_time,
+                    cache_time=event.cache_time,
+                )
+
         return last_export
```
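`update_statistic_objects` itself is untouched by this commit; the loop above simply feeds it the two new metrics for every flow-terminating event in the batch. A plausible shape for the routing — an assumption about the helper, not the actual code in `statistical_model.py`:

```python
def update_statistic_objects_sketch(statistic_objects, metric_mapping, **metrics):
    # e.g. metric_mapping["active_time"] ==
    #   ["dt_flows_active_time", "tsc_flows_active_time"]
    for metric, value in metrics.items():
        for obj_name in metric_mapping.get(metric, []):
            statistic_objects[obj_name].update(value)  # update() is assumed
```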