-
-
Notifications
You must be signed in to change notification settings - Fork 637
Expand file tree
/
Copy pathtasks.py
More file actions
499 lines (422 loc) · 19.1 KB
/
tasks.py
File metadata and controls
499 lines (422 loc) · 19.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.
from __future__ import absolute_import, unicode_literals
import datetime
import json
import logging
import typing
import uuid
from typing import Dict, List
import inflection
from celery import Task, shared_task, signals
from celery.worker.consumer import Consumer
from celery.worker.control import control_command
from celery.worker.request import Request
from django.conf import settings
from django.utils.timezone import now
from django_celery_beat.models import PeriodicTask
from elasticsearch.helpers import bulk
from api_app.choices import ReportStatus, Status
from api_app.helpers import mask_recursive
from intel_owl import secrets
from intel_owl.celery import app, get_queue_name
from intel_owl.settings._util import get_environment
logger = logging.getLogger(__name__)
class FailureLoggedRequest(Request):
    """Celery request that logs hard timeouts and failures before delegating upstream."""

    def on_timeout(self, soft, timeout):
        # Let celery do its bookkeeping first, then decide whether to log.
        outcome = super().on_timeout(soft, timeout)
        if soft:
            return outcome
        logger.warning(f"A hard timeout was enforced for task {self.task.name}")
        return outcome

    def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
        # mask_recursive scrubs sensitive values from the request payload before logging
        logger.critical(
            f"Failure detected for task {self.task.name}"
            f" with exception {exc_info} and request {mask_recursive(self._request_dict)}"
        )
        return super().on_failure(exc_info, send_failed_event=send_failed_event, return_ok=return_ok)
class FailureLoggedTask(Task):
    """Task base class whose requests log timeouts/failures via FailureLoggedRequest."""

    # Celery instantiates this Request class for every incoming task message.
    Request = FailureLoggedRequest
@control_command(
    args=[("python_module_pk", int)],
)
def update_plugin(state, python_module_pk: int):
    """Worker control command: run the update hook of a single PythonModule."""
    from api_app.models import PythonModule

    module: PythonModule = PythonModule.objects.get(pk=python_module_pk)
    module.python_class.update()
@shared_task(base=FailureLoggedTask, soft_time_limit=300)
def execute_ingestor(config_name: str):
    """Instantiate and run the ingestor named ``config_name``; skip it when disabled."""
    from api_app.ingestors_manager.classes import Ingestor
    from api_app.ingestors_manager.models import IngestorConfig

    config: IngestorConfig = IngestorConfig.objects.get(name=config_name)
    if config.disabled:
        logger.info(f"Not executing ingestor {config.name} because disabled")
        return
    ingestor_class: typing.Type[Ingestor] = config.python_module.python_class
    ingestor: Ingestor = ingestor_class(config=config)
    # ingestor runs have no runtime configuration, job id or task id
    ingestor.start({}, None, None)
    logger.info(f"Executing ingestor {config.name}")
@shared_task(base=FailureLoggedTask, soft_time_limit=10000)
def remove_old_jobs():
    """
    this is to remove old jobs to avoid to fill the database.
    Retention can be modified.

    Retention defaults to 14 days and is read from the
    ``OLD_JOBS_RETENTION_DAYS`` secret. Returns how many jobs were found.
    """
    from api_app.models import Job

    logger.info("started remove_old_jobs")
    retention = int(secrets.get_secret("OLD_JOBS_RETENTION_DAYS", 14))
    cutoff = now() - datetime.timedelta(days=retention)
    stale_jobs = Job.objects.filter(finished_analysis_time__lt=cutoff)
    num_jobs_to_delete = stale_jobs.count()
    logger.info(f"found {num_jobs_to_delete} old jobs to delete")
    for job in stale_jobs.iterator():
        analyzable = job.analyzable
        # if the job that we are going to delete is the last one, and it has a file
        if analyzable.jobs.count() == 1 and analyzable.file:
            analyzable.file.delete()
        try:
            job.delete()
        except Job.DoesNotExist as e:
            logger.warning(f"job {job.id} does not exist. Err: {e}", stack_info=True)
        # clean up orphaned analyzable if no jobs reference it anymore
        if not analyzable.jobs.exists():
            analyzable.delete()
    logger.info("finished remove_old_jobs")
    return num_jobs_to_delete
@shared_task(base=FailureLoggedTask)
def refresh_cache(python_class_str: str):
    """Invalidate cache keys for the dotted class path; refresh per-config keys for PythonConfig subclasses."""
    from django.utils.module_loading import import_string

    from api_app.models import PythonConfig

    logger.info(f"Refreshing cache for {python_class_str}")
    target_class = import_string(python_class_str)
    target_class.delete_class_cache_keys()
    if issubclass(target_class, PythonConfig):
        for config in target_class.objects.all():
            config.refresh_cache_keys()
@shared_task(base=FailureLoggedTask, soft_time_limit=120)
def check_stuck_analysis(minutes_ago: int = 25, check_pending: bool = False):
    """
    In case the analysis is stuck for whatever reason,
    we should force the status "failed"
    to avoid special exceptions,
    we can just put this function as a cron to cleanup.

    :param minutes_ago: jobs older than this many minutes are considered stuck.
    :param check_pending: also inspect jobs still in the PENDING state.
    :return: list of ids of the jobs that were found stuck.
    """
    from api_app.models import Job

    def fail_job(job):
        # fix: log message had a missing space after the job id and a stray trailing quote
        logger.error(
            f"found stuck analysis, job_id: {job.id}. Setting the job to status {Job.STATUSES.FAILED.value}"
        )
        job.status = Job.STATUSES.FAILED.value
        job.finished_analysis_time = now()
        job.save(update_fields=["status", "finished_analysis_time"])

    logger.info("started check_stuck_analysis")
    running_jobs = Job.objects.running(check_pending=check_pending, minutes_ago=minutes_ago)
    logger.info(f"checking if {running_jobs.count()} jobs are stuck")
    jobs_id_stuck = []
    for running_job in running_jobs:
        jobs_id_stuck.append(running_job.id)
        if running_job.status == Job.STATUSES.RUNNING.value:
            fail_job(running_job)
        elif running_job.status == Job.STATUSES.PENDING.value:
            # the job can be pending for 2 cycles of this function
            if running_job.received_request_time < (
                now() - datetime.timedelta(minutes=(minutes_ago * 2) + 1)
            ):
                # if it's still pending, we are killing
                fail_job(running_job)
            # the job is pending for 1 cycle
            elif running_job.received_request_time < (now() - datetime.timedelta(minutes=minutes_ago)):
                logger.info(f"Running again job {running_job}")
                # we are trying to execute again all pending
                # (and technically, but it is not the case here) all failed reports
                running_job.retry()
    logger.info("finished check_stuck_analysis")
    return jobs_id_stuck
@shared_task(base=FailureLoggedTask, soft_time_limit=150)
def update(python_module_pk: int):
    """Propagate a plugin update: run it locally on NFS setups, otherwise broadcast to each queue."""
    from api_app.models import PythonModule
    from intel_owl.celery import broadcast

    python_module: PythonModule = PythonModule.objects.get(pk=python_module_pk)
    if settings.NFS:
        # with NFS a single local update is enough
        update_plugin(None, python_module_pk)
        return
    # otherwise every queue hosting a config of this module gets the broadcast once
    for queue in {config.queue for config in python_module.configs}:
        broadcast(
            update_plugin,
            queue=queue,
            arguments={"python_module_pk": python_module_pk},
        )
@shared_task(base=FailureLoggedTask, soft_time_limit=30)
def health_check(python_module_pk: int, plugin_config_pk: str):
    """Run a plugin's health check and persist the result on its configuration."""
    from api_app.classes import Plugin
    from api_app.models import PythonConfig, PythonModule

    plugin_class: typing.Type[Plugin] = PythonModule.objects.get(pk=python_module_pk).python_class
    config: PythonConfig = plugin_class.config_model.objects.get(pk=plugin_config_pk)
    plugin = plugin_class(config=config)
    if config.disabled:
        logger.info(f"Skipping health_check for configuration {config.name} because disabled")
        return
    try:
        enabled = plugin.health_check(user=None)
    except NotImplementedError:
        logger.error(f"Unable to check healthcheck for {config.name}")
    else:
        config.health_check_status = enabled
        config.save()
@shared_task(base=FailureLoggedTask, soft_time_limit=100)
def update_notifications_with_releases():
    """Create notifications from the latest release entry of the changelog."""
    from django.core import management

    command_args = (
        "changelog_notification",
        ".github/CHANGELOG.md",
        "INTELOWL",
        "--number-of-releases",
        "1",
    )
    management.call_command(*command_args)
@app.task(name="job_set_final_status", soft_time_limit=30)
def job_set_final_status(job_id: int):
    """Finalize the job's status and push the updated job over the websocket."""
    from api_app.models import Job
    from api_app.websocket import JobConsumer

    job = Job.objects.get(pk=job_id)
    # set_final_status also runs the job's completion callbacks
    job.set_final_status()
    JobConsumer.serialize_and_send_job(job)
@shared_task(base=FailureLoggedTask, name="job_set_pipeline_status", soft_time_limit=30)
def job_set_pipeline_status(job_id: int, status: str):
    """Move a job into one of the running/partial pipeline statuses; reject anything else."""
    from api_app.models import Job

    job = Job.objects.get(pk=job_id)
    allowed_statuses = Status.running_statuses() + Status.partial_statuses()
    if status in allowed_statuses:
        job.status = status
        job.save(update_fields=["status"])
    else:
        logger.error(f"Unable to set job status to {status}")
@shared_task(base=FailureLoggedTask, name="job_pipeline", soft_time_limit=100)
def job_pipeline(
    job_id: int,
):
    """Start the whole plugin pipeline for a job; on failure, fail every report and the job itself."""
    from api_app.models import Job
    from api_app.websocket import JobConsumer

    job = Job.objects.get(pk=job_id)
    try:
        job.execute()
    except Exception as e:
        logger.exception(e)
        # fail every report of every plugin type attached to this job
        report_managers = (
            job.analyzerreports,
            job.connectorreports,
            job.pivotreports,
            job.visualizerreports,
        )
        for manager in report_managers:
            for report in manager.all():
                report.status = report.STATUSES.FAILED.value
                report.save()
        # mark job as failed and notify; job_set_final_status won't run since the pipeline never started
        job.status = Job.STATUSES.FAILED.value
        job.errors.append(str(e))
        job.finished_analysis_time = now()
        job.save(update_fields=["status", "errors", "finished_analysis_time"])
        if root_investigation := job.get_root().investigation:
            root_investigation.set_correct_status(save=True)
        JobConsumer.serialize_and_send_job(job)
@shared_task(base=FailureLoggedTask, name="run_plugin", soft_time_limit=500)
def run_plugin(
    job_id: int,
    python_module_pk: int,
    plugin_config_pk: str,
    runtime_configuration: dict,
    task_id: int,
):
    """Instantiate the configured plugin and start it for the given job.

    If the plugin raises, every report it produced for this job is marked
    FAILED and the refreshed job is pushed over the websocket.
    """
    from api_app.classes import Plugin
    from api_app.models import Job, PythonModule
    from api_app.websocket import JobConsumer

    logger.info(f"Configuring plugin {plugin_config_pk} for job {job_id} with task {task_id}")
    module = PythonModule.objects.get(pk=python_module_pk)
    klass: typing.Type[Plugin] = module.python_class
    config = klass.config_model.objects.get(pk=plugin_config_pk)
    plugin = klass(config=config)
    logger.info(f"Starting plugin {plugin_config_pk} for job {job_id} with task {task_id}")
    try:
        plugin.start(
            job_id=job_id,
            runtime_configuration=runtime_configuration,
            task_id=task_id,
        )
    except Exception as e:
        logger.exception(e)
        config.reports.filter(job__pk=job_id).update(status=plugin.report_model.STATUSES.FAILED.value)
        JobConsumer.serialize_and_send_job(Job.objects.get(pk=job_id))
@shared_task(base=FailureLoggedTask, name="create_caches", soft_time_limit=200)
def create_caches(user_pk: int):
    """Warm the per-user serializer cache for every plugin configuration."""
    from certego_saas.apps.user.models import User

    from api_app.analyzers_manager.models import AnalyzerConfig
    from api_app.analyzers_manager.serializers import AnalyzerConfigSerializer
    from api_app.connectors_manager.models import ConnectorConfig
    from api_app.connectors_manager.serializers import ConnectorConfigSerializer
    from api_app.ingestors_manager.models import IngestorConfig
    from api_app.ingestors_manager.serializers import IngestorConfigSerializer
    from api_app.pivots_manager.models import PivotConfig
    from api_app.pivots_manager.serializers import PivotConfigSerializer
    from api_app.serializers.plugin import PythonConfigListSerializer
    from api_app.visualizers_manager.models import VisualizerConfig
    from api_app.visualizers_manager.serializers import VisualizerConfigSerializer

    user = User.objects.get(pk=user_pk)
    plugin_types = (
        (AnalyzerConfig, AnalyzerConfigSerializer),
        (ConnectorConfig, ConnectorConfigSerializer),
        (PivotConfig, PivotConfigSerializer),
        (VisualizerConfig, VisualizerConfigSerializer),
        (IngestorConfig, IngestorConfigSerializer),
    )
    for config_model, serializer_cls in plugin_types:
        for plugin in config_model.objects.all():
            # serializing each plugin for this user populates the cache entry
            PythonConfigListSerializer(child=serializer_cls()).to_representation_single_plugin(plugin, user)
@signals.beat_init.connect
def beat_init_connect(*args, sender: Consumer = None, **kwargs):
    """On beat startup: queue plugin updates and warm serializer caches for real users."""
    from certego_saas.models import User

    logger.info("Starting beat_init signal")
    # update of plugins that needs it
    update_task_path = f"{update.__module__}.{update.__name__}"
    for task in PeriodicTask.objects.filter(enabled=True, task=update_task_path):
        python_module_pk = json.loads(task.kwargs)["python_module_pk"]
        logger.info(f"Updating {python_module_pk}")
        update.apply_async(
            queue=get_queue_name(settings.DEFAULT_QUEUE),
            MessageGroupId=str(uuid.uuid4()),
            args=[python_module_pk],
        )
    # creating cache excluding system users
    for user in User.objects.exclude(email=""):
        logger.info(f"Creating cache for user {user.username}")
        create_caches.apply_async(
            queue=get_queue_name(settings.DEFAULT_QUEUE),
            MessageGroupId=str(uuid.uuid4()),
            args=[user.pk],
        )
@shared_task(base=FailureLoggedTask, name="send_bi_to_elastic", soft_time_limit=300)
def send_bi_to_elastic(max_timeout: int = 60, max_objects: int = 10000):
    """Ship not-yet-sent completed reports and jobs to the BI Elasticsearch index."""
    from api_app.analyzers_manager.models import AnalyzerReport
    from api_app.connectors_manager.models import ConnectorReport
    from api_app.ingestors_manager.models import IngestorReport
    from api_app.models import AbstractReport, Job
    from api_app.pivots_manager.models import PivotReport
    from api_app.visualizers_manager.models import VisualizerReport

    if not settings.ELASTICSEARCH_BI_ENABLED:
        return
    report_classes: typing.List[typing.Type[AbstractReport]] = [
        AnalyzerReport,
        ConnectorReport,
        PivotReport,
        IngestorReport,
        VisualizerReport,
    ]
    for report_class in report_classes:
        # newest first, capped at max_objects; "report" deferred since BI does not need it
        report_class.objects.filter(sent_to_bi=False).filter_completed().defer("report").order_by(
            "-start_time"
        )[:max_objects].send_to_elastic_as_bi(max_timeout=max_timeout)
    Job.objects.filter(sent_to_bi=False).filter_completed().order_by("-received_request_time")[
        :max_objects
    ].send_to_elastic_as_bi(max_timeout=max_timeout)
@shared_task(base=FailureLoggedTask, name="send_plugin_report_to_elastic", soft_time_limit=300)
def send_plugin_report_to_elastic(max_timeout: int = 60, max_objects: int = 10000):
    """Index the previous minute's finished plugin reports into Elasticsearch.

    Only analyzer, connector and pivot reports are considered (see the inline
    note about ingestors/visualizers). Runs only when both
    ELASTICSEARCH_DSL_ENABLED and ELASTICSEARCH_DSL_HOST are configured.

    NOTE(review): ``max_timeout`` and ``max_objects`` are never used in this
    body — confirm whether they were meant to bound the ``bulk`` call.
    """
    from api_app.analyzers_manager.models import AnalyzerReport
    from api_app.connectors_manager.models import ConnectorReport
    from api_app.models import AbstractReport
    from api_app.pivots_manager.models import PivotReport

    if settings.ELASTICSEARCH_DSL_ENABLED and settings.ELASTICSEARCH_DSL_HOST:
        # one-minute window ending at the current minute boundary
        upper_threshold = now().replace(second=0, microsecond=0)
        lower_threshold = upper_threshold - datetime.timedelta(minutes=1)
        logger.info(f"add to elastic reports from: {lower_threshold} to {upper_threshold}")

        def _convert_report_to_elastic_document(
            _class: typing.Type[AbstractReport],
            start_time: datetime.datetime,
            end_time: datetime.datetime,
        ) -> List[Dict]:
            """Build bulk 'index' actions for _class reports that ended in [start_time, end_time)."""
            # NOTE: despite the annotation this is a lazy queryset,
            # evaluated once by the comprehension below
            report_list: List[AbstractReport] = _class.objects.filter(
                status__in=ReportStatus.final_statuses(),
                end_time__gte=start_time,
                end_time__lt=end_time,
            )
            report_document_list = [
                {
                    "_op_type": "index",
                    # daily index per plugin type, e.g. plugin-report-<env>-analyzer-report-<date>
                    "_index": (
                        "plugin-report-"
                        f"{get_environment()}-"
                        f"{inflection.underscore(_class.__name__).replace('_', '-')}-"
                        f"{now().date()}"
                    ),
                    "_source": {
                        "user": {"username": report.user.username},
                        # membership data is only available for organization members
                        "membership": (
                            {
                                "is_owner": report.user.membership.is_owner,
                                "is_admin": report.user.membership.is_admin,
                                "organization": {
                                    "name": report.user.membership.organization.name,
                                },
                            }
                            if report.user.has_membership()
                            else {}
                        ),
                        "config": {
                            "name": report.config.name,
                            "plugin_name": report.config.plugin_name.lower(),
                        },
                        "job": {"id": report.job.id},
                        "start_time": report.start_time,
                        "end_time": report.end_time,
                        "status": report.status,
                        "report": report.report,
                        "errors": report.errors,
                    },
                }
                for report in report_list
            ]
            logger.info(f"{_class.__name__} has {len(report_document_list)} new documents to upload")
            return report_document_list

        # Add document. Remove ingestors and visualizers because they contain data useless in term of search functionality:
        # ingestors contain samples and visualizers data about organizing the info inside the page.
        all_report_document_list = (
            _convert_report_to_elastic_document(AnalyzerReport, lower_threshold, upper_threshold)
            + _convert_report_to_elastic_document(ConnectorReport, lower_threshold, upper_threshold)
            + _convert_report_to_elastic_document(PivotReport, lower_threshold, upper_threshold)
        )
        logger.info(f"Documents to add to elastic: {len(all_report_document_list)}")
        if all_report_document_list:
            # log "jobid-configname" for each document for traceability
            logger.info(
                ", ".join(
                    [
                        f"{document['_source']['job']['id']}-{document['_source']['config']['name']}"
                        for document in all_report_document_list
                    ]
                )
            )
            _, errors = bulk(  # noqa
                settings.ELASTICSEARCH_DSL_CLIENT,
                all_report_document_list,
                raise_on_error=False,
                stats_only=False,
            )
            if not errors:
                logger.info("Documents correctly inserted!")
            else:
                logger.error(f"Errors on document indexing: {errors}")
        else:
            logger.info("No documents to add")
@shared_task(
    base=FailureLoggedTask,
    name="enable_configuration_for_org_for_rate_limit",
    soft_time_limit=30,
)
def enable_configuration_for_org_for_rate_limit(org_configuration_pk: int):
    """Enable the OrganizationPluginConfiguration with the given pk (rate-limit flow per the task name)."""
    from api_app.models import OrganizationPluginConfiguration

    org_config: OrganizationPluginConfiguration = OrganizationPluginConfiguration.objects.get(
        pk=org_configuration_pk
    )
    org_config.enable()
# set logger
@signals.setup_logging.connect
def config_loggers(*args, **kwargs):
    """Apply the project's LOGGING dict config when celery sets up logging."""
    from logging.config import dictConfig

    dictConfig(settings.LOGGING)