-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathclient.py
More file actions
83 lines (67 loc) · 2.67 KB
/
client.py
File metadata and controls
83 lines (67 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""defining the object client for the simulation"""
from collections.abc import Generator
from typing import TYPE_CHECKING
import simpy
from asyncflow.config.constants import SystemNodes
from asyncflow.metrics.client import RqsClock
from asyncflow.runtime.actors.edge import EdgeRuntime
from asyncflow.schemas.topology.nodes import Client
if TYPE_CHECKING:
from asyncflow.runtime.rqs_state import RequestState
class ClientRuntime:
"""class to define the client runtime"""
def __init__(
self,
*,
env: simpy.Environment,
out_edge: EdgeRuntime | None,
client_box: simpy.Store,
completed_box: simpy.Store,
client_config: Client,
) -> None:
"""Definition of attributes for the client"""
self.env = env
self.out_edge = out_edge
self.client_config = client_config
self.client_box = client_box
self.completed_box = completed_box
# This list will be enough to calculate at the end
# of the simulation both the throughput and the
# latency distribution
self._rqs_clock: list[RqsClock] = []
def _forwarder(self) -> Generator[simpy.Event, None, None]:
"""Updtate the state before passing it to another node"""
assert self.out_edge is not None
while True:
state: RequestState = yield self.client_box.get() # type: ignore[assignment]
state.record_hop(
SystemNodes.CLIENT,
self.client_config.id,
self.env.now,
)
# if the length of the list is bigger than two
# it means that the state is coming back to the
# client after being elaborated, since if the value
# would be equal to two would mean that the state
# went through the mandatory path to be generated
# rqs generator and client registration
if len(state.history) > 3:
state.finish_time = self.env.now
clock_data = RqsClock(
start=state.initial_time,
finish=state.finish_time,
)
self._rqs_clock.append(clock_data)
yield self.completed_box.put(state)
else:
self.out_edge.transport(state)
def start(self) -> simpy.Process:
"""Initialization of the process"""
return self.env.process(self._forwarder())
@property
def rqs_clock(self) -> list[RqsClock]:
"""
Expose the value of the private list of the starting
and arrival time for each rqs just for reading purpose
"""
return self._rqs_clock