diff --git a/intel_owl/tasks.py b/intel_owl/tasks.py index 0f2b571bb1..4b59448995 100644 --- a/intel_owl/tasks.py +++ b/intel_owl/tasks.py @@ -248,6 +248,7 @@ def job_pipeline( job_id: int, ): from api_app.models import Job + from api_app.websocket import JobConsumer job = Job.objects.get(pk=job_id) try: @@ -262,6 +263,14 @@ def job_pipeline( ): report.status = report.STATUSES.FAILED.value report.save() + # mark job as failed and notify; job_set_final_status won't run since the pipeline never started + job.status = Job.STATUSES.FAILED.value + job.errors.append(str(e)) + job.finished_analysis_time = now() + job.save(update_fields=["status", "errors", "finished_analysis_time"]) + if root_investigation := job.get_root().investigation: + root_investigation.set_correct_status(save=True) + JobConsumer.serialize_and_send_job(job) @shared_task(base=FailureLoggedTask, name="run_plugin", soft_time_limit=500) diff --git a/tests/test_crons.py b/tests/test_crons.py index 0e3c90f77c..6cfadee8c2 100644 --- a/tests/test_crons.py +++ b/tests/test_crons.py @@ -19,7 +19,7 @@ ) from api_app.choices import Classification from api_app.models import Job -from intel_owl.tasks import check_stuck_analysis, remove_old_jobs +from intel_owl.tasks import check_stuck_analysis, job_pipeline, remove_old_jobs from . import CustomTestCase, get_logger from .mock_utils import MockUpResponse, if_mock_connections, patch, skip @@ -58,6 +58,44 @@ def test_check_stuck_analysis(self): _job.delete() an.delete() + def test_job_pipeline_exception_sets_job_to_failed(self): + """ + Regression test for GitHub issue #3653. + When job.execute() raises an exception inside job_pipeline, + the Job object must be marked as FAILED (not left stuck in RUNNING), + finished_analysis_time must be set, and the error must be recorded. + """ + an = Analyzable.objects.create( + name="8.8.8.8", + classification=Classification.IP, + ) + _job = Job.objects.create( + user=self.user, + status=Job.STATUSES.PENDING.value, + analyzable=an, + ) + error_message = "Simulated broker failure" + with ( + patch.object( + _job.__class__, + "execute", + side_effect=Exception(error_message), + ), + patch("api_app.websocket.JobConsumer.serialize_and_send_job"), + patch( + "api_app.models.Job.objects.get", + return_value=_job, + ), + ): + job_pipeline(_job.pk) + + _job.refresh_from_db() + self.assertEqual(_job.status, Job.STATUSES.FAILED.value) + self.assertIsNotNone(_job.finished_analysis_time) + self.assertIn(error_message, _job.errors) + _job.delete() + an.delete() + def test_remove_old_jobs(self): import datetime