-
Notifications
You must be signed in to change notification settings - Fork 887
Expand file tree
/
Copy pathcommand_handle.py
More file actions
197 lines (169 loc) · 5.65 KB
/
command_handle.py
File metadata and controls
197 lines (169 loc) · 5.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import asyncio
import inspect
from typing import (
Optional,
Callable,
Any,
AsyncGenerator,
Union,
Tuple,
Coroutine,
)
from e2b.envd.rpc import handle_rpc_exception
from e2b.envd.process import process_pb2
from e2b.sandbox.commands.command_handle import (
CommandExitException,
CommandResult,
Stderr,
Stdout,
PtyOutput,
)
from e2b.sandbox_async.utils import OutputHandler
class AsyncCommandHandle:
"""
Command execution handle.
It provides methods for waiting for the command to finish, retrieving stdout/stderr, and killing the command.
"""
@property
def pid(self):
"""
Command process ID.
"""
return self._pid
@property
def stdout(self):
"""
Command stdout output.
"""
return self._stdout
@property
def stderr(self):
"""
Command stderr output.
"""
return self._stderr
@property
def error(self):
"""
Command execution error message.
"""
if self._result is None:
return None
return self._result.error
@property
def exit_code(self):
"""
Command execution exit code.
`0` if the command finished successfully.
It is `None` if the command is still running.
"""
if self._result is None:
return None
return self._result.exit_code
def __init__(
self,
pid: int,
handle_kill: Callable[[], Coroutine[Any, Any, bool]],
events: AsyncGenerator[
Union[process_pb2.StartResponse, process_pb2.ConnectResponse], Any
],
on_stdout: Optional[OutputHandler[Stdout]] = None,
on_stderr: Optional[OutputHandler[Stderr]] = None,
on_pty: Optional[OutputHandler[PtyOutput]] = None,
):
self._pid = pid
self._handle_kill = handle_kill
self._events = events
self._stdout: str = ""
self._stderr: str = ""
self._on_stdout = on_stdout
self._on_stderr = on_stderr
self._on_pty = on_pty
self._result: Optional[CommandResult] = None
self._iteration_exception: Optional[Exception] = None
self._wait = asyncio.create_task(self._handle_events())
async def _iterate_events(
self,
) -> AsyncGenerator[
Union[
Tuple[Stdout, None, None],
Tuple[None, Stderr, None],
Tuple[None, None, PtyOutput],
],
None,
]:
async for event in self._events:
if event.event.HasField("data"):
if event.event.data.stdout:
out = event.event.data.stdout.decode("utf-8", "replace")
self._stdout += out
yield out, None, None
if event.event.data.stderr:
out = event.event.data.stderr.decode("utf-8", "replace")
self._stderr += out
yield None, out, None
if event.event.data.pty:
yield None, None, event.event.data.pty
if event.event.HasField("end"):
self._result = CommandResult(
stdout=self._stdout,
stderr=self._stderr,
exit_code=event.event.end.exit_code,
error=event.event.end.error,
)
return
async def disconnect(self) -> None:
"""
Disconnects from the command.
The command is not killed, but SDK stops receiving events from the command.
You can reconnect to the command using `sandbox.commands.connect` method.
"""
self._wait.cancel()
# BUG: In Python 3.8 closing async generator can throw RuntimeError.
# await self._events.aclose()
async def _handle_events(self):
try:
async for stdout, stderr, pty in self._iterate_events():
if stdout is not None and self._on_stdout:
cb = self._on_stdout(stdout)
if inspect.isawaitable(cb):
await cb
elif stderr is not None and self._on_stderr:
cb = self._on_stderr(stderr)
if inspect.isawaitable(cb):
await cb
elif pty is not None and self._on_pty:
cb = self._on_pty(pty)
if inspect.isawaitable(cb):
await cb
except StopAsyncIteration:
pass
except Exception as e:
self._iteration_exception = handle_rpc_exception(e)
async def wait(self) -> CommandResult:
"""
Wait for the command to finish and return the result.
If the command exits with a non-zero exit code, it throws a `CommandExitException`.
:return: `CommandResult` result of command execution
"""
await self._wait
if self._iteration_exception:
raise self._iteration_exception
if self._result is None:
raise Exception("Command ended without an end event")
if self._result.exit_code != 0:
raise CommandExitException(
stdout=self._stdout,
stderr=self._stderr,
exit_code=self._result.exit_code,
error=self._result.error,
)
return self._result
async def kill(self) -> bool:
"""
Kills the command.
It uses `SIGKILL` signal to kill the command
:return: `True` if the command was killed successfully, `False` if the command was not found
"""
result = await self._handle_kill()
return result