intel_owl/tasks.py (9 additions, 0 deletions)

@@ -248,6 +248,7 @@ def job_pipeline(
     job_id: int,
 ):
     from api_app.models import Job
+    from api_app.websocket import JobConsumer

     job = Job.objects.get(pk=job_id)
     try:
@@ -262,6 +263,14 @@
         ):
             report.status = report.STATUSES.FAILED.value
             report.save()
+        # mark job as failed and notify; job_set_final_status won't run since the pipeline never started
+        job.status = Job.STATUSES.FAILED.value
+        job.errors.append(str(e))
+        job.finished_analysis_time = now()
+        job.save(update_fields=["status", "errors", "finished_analysis_time"])
+        if root_investigation := job.get_root().investigation:
+            root_investigation.set_correct_status(save=True)
+        JobConsumer.serialize_and_send_job(job)


 @shared_task(base=FailureLoggedTask, name="run_plugin", soft_time_limit=500)
Review comment (Contributor):

Suggestion: Wrap cleanup in its own try/except

If any of these cleanup lines raises (e.g., job.get_root() hits a treebeard MultipleObjectsReturned edge case, or the WebSocket layer has a config issue), the original exception e gets swallowed and replaced by the cleanup exception, and the root cause is lost.

Consider:

except Exception as e:
    logger.exception(e)
    # ... existing report cleanup ...
    try:
        job.status = Job.STATUSES.FAILED.value
        job.errors.append(str(e))
        job.finished_analysis_time = now()
        job.save(update_fields=["status", "errors", "finished_analysis_time"])
        if root_investigation := job.get_root().investigation:
            root_investigation.set_correct_status(save=True)
        JobConsumer.serialize_and_send_job(job)
    except Exception as cleanup_err:
        logger.exception(
            f"Failed to clean up job {job_id} after pipeline exception: {cleanup_err}"
        )

This ensures the original error is always logged, with any cleanup failure reported separately instead of masking it.

tests/test_crons.py (39 additions, 1 deletion)

@@ -19,7 +19,7 @@
 )
 from api_app.choices import Classification
 from api_app.models import Job
-from intel_owl.tasks import check_stuck_analysis, remove_old_jobs
+from intel_owl.tasks import check_stuck_analysis, job_pipeline, remove_old_jobs

 from . import CustomTestCase, get_logger
 from .mock_utils import MockUpResponse, if_mock_connections, patch, skip
@@ -58,6 +58,44 @@ def test_check_stuck_analysis(self):
         _job.delete()
         an.delete()

+    def test_job_pipeline_exception_sets_job_to_failed(self):
+        """
+        Regression test for GitHub issue #3653.
+        When job.execute() raises an exception inside job_pipeline,
+        the Job object must be marked as FAILED (not left stuck in RUNNING),
+        finished_analysis_time must be set, and the error must be recorded.
+        """

Review comment (Contributor):

Suggestion: Add a second test case for the Investigation status update path

The root_investigation.set_correct_status(save=True) branch in the fix is currently untested because this job has no parent investigation. Consider adding a subTest (or separate test) that creates a job attached to an Investigation and asserts the investigation status transitions correctly after the pipeline failure. That way both branches of the if root_investigation conditional are covered; a sketch of such a test follows below.
+        an = Analyzable.objects.create(
+            name="8.8.8.8",
+            classification=Classification.IP,
+        )
+        _job = Job.objects.create(
+            user=self.user,
+            status=Job.STATUSES.PENDING.value,
+            analyzable=an,
+        )
+        error_message = "Simulated broker failure"
+        with (
+            patch.object(
+                _job.__class__,
+                "execute",
+                side_effect=Exception(error_message),
+            ),
+            patch("api_app.websocket.JobConsumer.serialize_and_send_job"),
Review comment (Contributor):

Missing assertion: WebSocket notification was actually sent

You're patching JobConsumer.serialize_and_send_job here, but never asserting it was called. Since the whole point of this fix (from the user's perspective) is that the frontend stops showing an infinite spinner, this is worth verifying:

with (
    patch.object(
        _job.__class__,
        "execute",
        side_effect=Exception(error_message),
    ),
    patch("api_app.websocket.JobConsumer.serialize_and_send_job") as mock_ws,
):
    job_pipeline(_job.pk)

mock_ws.assert_called_once()

+            patch(
+                "api_app.models.Job.objects.get",
+                return_value=_job,
+            ),
+        ):
Review comment (Contributor):

Unnecessary mock: this can be removed

The Job.objects.get mock isn't needed here, since _job already exists in the test DB and the real Job.objects.get(pk=_job.pk) will work fine. Removing it makes the test more realistic and would catch regressions where someone accidentally switches from .save() to .filter().update() in the fix. The simplified with block is sketched below.
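For illustration, the with block with that mock dropped (also folding in the mock_ws capture suggested in the previous comment):

with (
    patch.object(
        _job.__class__,
        "execute",
        side_effect=Exception(error_message),
    ),
    patch("api_app.websocket.JobConsumer.serialize_and_send_job") as mock_ws,
):
    job_pipeline(_job.pk)

mock_ws.assert_called_once()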

+            job_pipeline(_job.pk)
+
+        _job.refresh_from_db()
+        self.assertEqual(_job.status, Job.STATUSES.FAILED.value)
+        self.assertIsNotNone(_job.finished_analysis_time)
+        self.assertIn(error_message, _job.errors)
+        _job.delete()
+        an.delete()
+
     def test_remove_old_jobs(self):
         import datetime
