-
Notifications
You must be signed in to change notification settings - Fork 887
Expand file tree
/
Copy pathcommand_handle.py
More file actions
150 lines (127 loc) · 4.62 KB
/
command_handle.py
File metadata and controls
150 lines (127 loc) · 4.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from typing import Optional, Callable, Any, Generator, Union, Tuple
from e2b.envd.rpc import handle_rpc_exception
from e2b.envd.process import process_pb2
from e2b.sandbox.commands.command_handle import (
CommandExitException,
CommandResult,
Stderr,
Stdout,
PtyOutput,
)
class CommandHandle:
"""
Command execution handle.
It provides methods for waiting for the command to finish, retrieving stdout/stderr, and killing the command.
"""
@property
def pid(self):
"""
Command process ID.
"""
return self._pid
def __init__(
self,
pid: int,
handle_kill: Callable[[], bool],
events: Generator[
Union[process_pb2.StartResponse, process_pb2.ConnectResponse], Any, None
],
):
self._pid = pid
self._handle_kill = handle_kill
self._events = events
self._stdout_chunks: list[str] = []
self._stderr_chunks: list[str] = []
self._result: Optional[CommandResult] = None
self._iteration_exception: Optional[Exception] = None
def __iter__(self):
"""
Iterate over the command output.
:return: Generator of command outputs
"""
return self._handle_events()
def _handle_events(
self,
) -> Generator[
Union[
Tuple[Stdout, None, None],
Tuple[None, Stderr, None],
Tuple[None, None, PtyOutput],
],
None,
None,
]:
try:
for event in self._events:
if event.event.HasField("data"):
if event.event.data.stdout:
out = event.event.data.stdout.decode("utf-8", "replace")
self._stdout_chunks.append(out)
yield out, None, None
if event.event.data.stderr:
out = event.event.data.stderr.decode("utf-8", "replace")
self._stderr_chunks.append(out)
yield None, out, None
if event.event.data.pty:
yield None, None, event.event.data.pty
if event.event.HasField("end"):
self._result = CommandResult(
stdout="".join(self._stdout_chunks),
stderr="".join(self._stderr_chunks),
exit_code=event.event.end.exit_code,
error=event.event.end.error,
)
except Exception as e:
raise handle_rpc_exception(e)
def disconnect(self) -> None:
"""
Disconnect from the command.
The command is not killed, but SDK stops receiving events from the command.
You can reconnect to the command using `sandbox.commands.connect` method.
"""
self._events.close()
def wait(
self,
on_pty: Optional[Callable[[PtyOutput], None]] = None,
on_stdout: Optional[Callable[[str], None]] = None,
on_stderr: Optional[Callable[[str], None]] = None,
) -> CommandResult:
"""
Wait for the command to finish and returns the result.
If the command exits with a non-zero exit code, it throws a `CommandExitException`.
:param on_pty: Callback for pty output
:param on_stdout: Callback for stdout output
:param on_stderr: Callback for stderr output
:return: `CommandResult` result of command execution
"""
try:
for stdout, stderr, pty in self:
if stdout is not None and on_stdout:
on_stdout(stdout)
elif stderr is not None and on_stderr:
on_stderr(stderr)
elif pty is not None and on_pty:
on_pty(pty)
except StopIteration:
pass
except Exception as e:
self._iteration_exception = handle_rpc_exception(e)
if self._iteration_exception:
raise self._iteration_exception
if self._result is None:
raise Exception("Command ended without an end event")
if self._result.exit_code != 0:
raise CommandExitException(
stdout=self._result.stdout,
stderr=self._result.stderr,
exit_code=self._result.exit_code,
error=self._result.error,
)
return self._result
def kill(self) -> bool:
"""
Kills the command.
It uses `SIGKILL` signal to kill the command.
:return: Whether the command was killed successfully
"""
return self._handle_kill()