-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathcollector.py
More file actions
78 lines (58 loc) · 2.79 KB
/
collector.py
File metadata and controls
78 lines (58 loc) · 2.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
"""class to centralized the the collection of time series regarding metrics"""
from collections.abc import Generator
import simpy
from asyncflow.config.constants import SampledMetricName
from asyncflow.runtime.actors.edge import EdgeRuntime
from asyncflow.runtime.actors.server import ServerRuntime
from asyncflow.schemas.settings.simulation import SimulationSettings
# The idea for this class is to gather list of runtime objects that
# are defined in the central class to build the simulation, in this
# way we optimize the initialization of various objects reducing
# the global overhead
class SampledMetricCollector:
"""class to define a centralized object to collect sampled metrics"""
def __init__(
self,
*,
edges: list[EdgeRuntime],
servers: list[ServerRuntime],
env: simpy.Environment,
sim_settings: SimulationSettings,
) -> None:
"""
Args:
edges (list[EdgeRuntime]): list of the class EdgeRuntime
servers (list[ServerRuntime]): list of server of the class ServerRuntime
env (simpy.Environment): environment for the simulation
sim_settings (SimulationSettings): general settings for the simulation
"""
self.edges = edges
self.servers = servers
self.sim_settings = sim_settings
self.env = env
self._sample_period = sim_settings.sample_period_s
# enum keys instance-level for mandatory sampled metrics to collect
self._conn_key = SampledMetricName.EDGE_CONCURRENT_CONNECTION
self._ram_key = SampledMetricName.RAM_IN_USE
self._io_key = SampledMetricName.EVENT_LOOP_IO_SLEEP
self._ready_key = SampledMetricName.READY_QUEUE_LEN
def _build_time_series(self) -> Generator[simpy.Event, None, None]:
"""Function to build time series for enabled metrics"""
while True:
yield self.env.timeout(self._sample_period)
for edge in self.edges:
if self._conn_key in edge.enabled_metrics:
edge.enabled_metrics[self._conn_key].append(
edge.concurrent_connections,
)
for server in self.servers:
if all(
k in server.enabled_metrics
for k in (self._ram_key, self._io_key, self._ready_key)
):
server.enabled_metrics[self._ram_key].append(server.ram_in_use)
server.enabled_metrics[self._io_key].append(server.io_queue_len)
server.enabled_metrics[self._ready_key].append(server.ready_queue_len)
def start(self) -> simpy.Process:
"""Definition of the process to collect sampled metrics"""
return self.env.process(self._build_time_series())