Skip to content

Commit fc347c5

Browse files
committed
Add error capturing and logging for job/task process failures
1 parent 95c3a37 commit fc347c5

3 files changed

Lines changed: 20 additions & 15 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
## [v3.4.1.dev0]
44

55
### Added
6-
-
6+
- [Localhost] Added error capturing and logging for job/task process failures
77

88
### Changed
99
-
@@ -18,13 +18,9 @@
1818
- [Ceph] Added extra region parameter to Ceph backend
1919

2020
### Changed
21-
- [Setup] Moved IBM dependencies to lithops[ibm] extra
22-
- [Setup] Moved AWS dependencies to lithops[aws] extra
23-
- [Setup] Moved kubernetes dependencies to lithops[kubernetes] extra
24-
- [Setup] Moved knative dependencies to lithops[knative] extra
25-
- [Setup] Moved minio dependencies to lithops[minio] extra
26-
- [Setup] Moved ceph dependencies to lithops[ceph] extra
27-
- [Setup] Moved redis dependencies to lithops[redis] extra
21+
- [Setup] Moved IBM and AWS dependencies to the lithops[ibm] and lithops[aws] extras, respectively
22+
- [Setup] Moved kubernetes and knative dependencies to the lithops[kubernetes] and lithops[knative] extras, respectively
23+
- [Setup] Moved minio, ceph and redis dependencies to the lithops[minio], lithops[ceph] and lithops[redis] extras, respectively
2824
- [Setup] Moved matplotlib, seaborn, numpy and pandas dependencies to lithops[plotting] extra
2925
- [Setup] Removed unused 'lxml', 'docker' and 'python-dateutil' packages from the setup.py
3026
- [Core] Detached progress bar from INFO logs

lithops/localhost/v1/localhost.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,10 @@ def job_manager():
114114
logger.debug(f'ExecutorID {executor_id} | JobID {job_id} - Running '
115115
f'{total_calls} activations in the localhost worker')
116116
process = self.env.run_job(job_key, job_filename)
117-
process.communicate() # blocks until the process finishes
117+
stdout, stderr = process.communicate() # blocks until the process finishes
118+
if process.returncode != 0:
119+
logger.error(f"ExecutorID {executor_id} | JobID {job_id} - Job failed with return code {process.returncode}")
120+
logger.error(f"ExecutorID {executor_id} | JobID {job_id} - Error output from job process: {stderr}")
118121
logger.debug(f'ExecutorID {executor_id} | JobID {job_id} - Execution finished')
119122

120123
if self.job_queue.empty():
@@ -301,7 +304,7 @@ def run_job(self, job_key, job_filename):
301304
self.setup()
302305

303306
cmd = [self.runtime_name, RUNNER_FILE, 'run_job', job_filename]
304-
process = sp.Popen(cmd, start_new_session=True)
307+
process = sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.PIPE, start_new_session=True)
305308
self.jobs[job_key] = process
306309

307310
return process
@@ -369,7 +372,7 @@ def run_job(self, job_key, job_filename):
369372
cmd += f'--rm -v {tmp_path}:/tmp --entrypoint "python3" '
370373
cmd += f'{self.runtime_name} /tmp/{USER_TEMP_DIR}/localhost-runner.py run_job {job_filename}'
371374

372-
process = sp.Popen(shlex.split(cmd), start_new_session=True)
375+
process = sp.Popen(shlex.split(cmd), stdout=sp.PIPE, stderr=sp.PIPE, start_new_session=True)
373376
self.jobs[job_key] = process
374377

375378
return process

lithops/localhost/v2/localhost.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -321,9 +321,12 @@ def run_task(self, job_key, call_id):
321321

322322
logger.debug(f"Going to execute task process {job_key_call_id}")
323323
cmd = [self.runtime_name, RUNNER_FILE, 'run_job', task_filename]
324-
process = sp.Popen(cmd, start_new_session=True)
324+
process = sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.PIPE, start_new_session=True)
325325
self.task_processes[job_key_call_id] = process
326-
process.communicate() # blocks until the process finishes
326+
stdout, stderr = process.communicate() # blocks until the process finishes
327+
if process.returncode != 0:
328+
logger.error(f"Task process {job_key_call_id} failed with return code {process.returncode}")
329+
logger.error(f"Error output from task process {job_key_call_id}: {stderr}")
327330
del self.task_processes[job_key_call_id]
328331
logger.debug(f"Task process {job_key_call_id} finished")
329332

@@ -434,9 +437,12 @@ def run_task(self, job_key, call_id):
434437
cmd += f'"python3 /tmp/{USER_TEMP_DIR}/localhost-runner.py '
435438
cmd += f'run_job {docker_task_filename}"'
436439

437-
process = sp.Popen(shlex.split(cmd), start_new_session=True)
440+
process = sp.Popen(shlex.split(cmd), stdout=sp.PIPE, stderr=sp.PIPE, start_new_session=True)
438441
self.task_processes[job_key_call_id] = process
439-
process.communicate() # blocks until the process finishes
442+
stdout, stderr = process.communicate() # blocks until the process finishes
443+
if process.returncode != 0:
444+
logger.error(f"Task process {job_key_call_id} failed with return code {process.returncode}")
445+
logger.error(f"Error output from task process {job_key_call_id}: {stderr}")
440446
del self.task_processes[job_key_call_id]
441447
logger.debug(f"Task process {job_key_call_id} finished")
442448

0 commit comments

Comments
 (0)