diff --git a/cortexutils/analyzer.py b/cortexutils/analyzer.py index bad3669..265292d 100644 --- a/cortexutils/analyzer.py +++ b/cortexutils/analyzer.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# encoding: utf-8 +# -*- coding: utf-8 -*- import os import tempfile @@ -11,7 +11,7 @@ class Analyzer(Worker): def __init__(self, job_directory=None, secret_phrases=None): - Worker.__init__(self, job_directory, secret_phrases) + super().__init__(job_directory, secret_phrases) # Not breaking compatibility self.artifact = self._input @@ -31,15 +31,15 @@ def get_data(self): return self.get_param("data", None, "Missing data field") def get_param(self, name, default=None, message=None): - data = super(Analyzer, self).get_param(name, default, message) + data = super().get_param(name, default, message) if ( name == "file" and self.data_type == "file" and self.job_directory is not None ): - path = "%s/input/%s" % (self.job_directory, data) - if os.path.isfile(path): - return path + input_path = os.path.join(self.job_directory, "input", data) + if os.path.isfile(input_path): + return input_path else: return data @@ -117,7 +117,7 @@ def report(self, full_report, ensure_ascii=False): operation_list = self.operations(full_report) except Exception: pass # nosec B110 - super(Analyzer, self).report( + super().report( { "success": True, "summary": summary, diff --git a/cortexutils/extractor.py b/cortexutils/extractor.py index 73440e6..4873ecf 100644 --- a/cortexutils/extractor.py +++ b/cortexutils/extractor.py @@ -1,6 +1,7 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- + import re -from builtins import str as unicode class ExtractionError(Exception): @@ -67,7 +68,7 @@ def __init_regex(): + "(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])" + ")" ) - regex.append({"type": "ip", "regex": re.compile(r"{}".format(r))}) + regex.append({"type": "ip", "regex": re.compile(r)}) # URL regex.append({"type": "url", "regex": re.compile(r"^(http://|https://)")}) @@ -95,7 +96,7 @@ def __init_regex(): { "type": "user-agent", "regex": re.compile( - r"^(Mozilla/[45]\.0 |AppleWebKit/[0-9]{3}\.[0-9]{2} |Chrome/[0-9]{2}\.[0-9]\." # noqa + r"^(Mozilla/[45]\.0 |AppleWebKit/[0-9]{3}\.[0-9]{2} |Chrome/[0-9]{2}\.[0-9]\." # noqa: E501 r"[0-9]{4}\.[0-9]{3} |Safari/[0-9]{3}\.[0-9]{2} ).*?$" ), } @@ -115,7 +116,7 @@ def __init_regex(): "type": "registry", "regex": re.compile( r"^(HKEY|HKLM|HKCU|HKCR|HKCC)" - r"(_LOCAL_MACHINE|_CURRENT_USER|_CURRENT_CONFIG|_CLASSES_ROOT|)[\\a-zA-Z0-9]+$" # noqa + r"(_LOCAL_MACHINE|_CURRENT_USER|_CURRENT_CONFIG|_CLASSES_ROOT|)[\\a-zA-Z0-9]+$" # noqa: E501 ), } ) @@ -149,7 +150,7 @@ def __checktype(self, value): if self.ignore == value: return "" - if isinstance(value, (str, unicode)): + if isinstance(value, str): for r in self.regex: if r.get("regex").match(value): return r.get("type") @@ -179,7 +180,7 @@ def check_iterable(self, iterable): """ results = [] # Only the string left - if isinstance(iterable, (str, unicode)): + if isinstance(iterable, str): dt = self.__checktype(iterable) if len(dt) > 0: results.append({"dataType": dt, "data": iterable}) diff --git a/cortexutils/responder.py b/cortexutils/responder.py index 69ebdf0..55e119a 100644 --- a/cortexutils/responder.py +++ b/cortexutils/responder.py @@ -1,12 +1,12 @@ #!/usr/bin/env python -# encoding: utf-8 +# -*- coding: utf-8 -*- from cortexutils.worker import Worker class Responder(Worker): def __init__(self, job_directory=None, secret_phrases=None): - Worker.__init__(self, job_directory, secret_phrases) + super().__init__(job_directory, secret_phrases) # Not breaking compatibility self.artifact = self._input @@ -28,7 +28,7 @@ def report(self, full_report, ensure_ascii=False): operation_list = self.operations(full_report) except Exception: pass # nosec B110 - super(Responder, self).report( + super().report( {"success": True, "full": full_report, "operations": operation_list}, ensure_ascii, ) diff --git a/cortexutils/worker.py b/cortexutils/worker.py index b4001e3..acfa0ea 100644 --- a/cortexutils/worker.py +++ b/cortexutils/worker.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# encoding: utf-8 +# -*- coding: utf-8 -*- import codecs import json @@ -9,7 +9,7 @@ DEFAULT_SECRET_PHRASES = ("key", "password", "secret") -class Worker(object): +class Worker: READ_TIMEOUT = 3 # seconds def __init__(self, job_directory, secret_phrases): @@ -25,8 +25,9 @@ def __init__(self, job_directory, secret_phrases): self.secret_phrases = secret_phrases # Load input self._input = {} - if os.path.isfile("%s/input/input.json" % self.job_directory): - with open("%s/input/input.json" % self.job_directory) as f_input: + input_path = os.path.join(self.job_directory, "input", "input.json") + if os.path.isfile(input_path): + with open(input_path) as f_input: self._input = json.load(f_input) else: # If input file doesn't exist, @@ -72,15 +73,9 @@ def __set_proxies(self): def __set_encoding(): try: if sys.stdout.encoding != "UTF-8": - if sys.version_info[0] == 3: - sys.stdout = codecs.getwriter("utf-8")(sys.stdout.buffer, "strict") - else: - sys.stdout = codecs.getwriter("utf-8")(sys.stdout, "strict") + sys.stdout = codecs.getwriter("utf-8")(sys.stdout.buffer, "strict") if sys.stderr.encoding != "UTF-8": - if sys.version_info[0] == 3: - sys.stderr = codecs.getwriter("utf-8")(sys.stderr.buffer, "strict") - else: - sys.stderr = codecs.getwriter("utf-8")(sys.stderr, "strict") + sys.stderr = codecs.getwriter("utf-8")(sys.stderr.buffer, "strict") except Exception: pass # nosec B110 @@ -123,13 +118,10 @@ def __write_output(self, data, ensure_ascii=False): if self.job_directory is None: json.dump(data, sys.stdout, ensure_ascii=ensure_ascii) else: - try: - os.makedirs("%s/output" % self.job_directory) - except Exception: - pass # nosec B110 - with open( - "%s/output/output.json" % self.job_directory, mode="w" - ) as f_output: + output_dir = os.path.join(self.job_directory, "output") + os.makedirs(output_dir, exist_ok=True) + output_path = os.path.join(output_dir, "output.json") + with open(output_path, mode="w") as f_output: json.dump(data, f_output, ensure_ascii=ensure_ascii) def get_data(self): diff --git a/tests/test_suite_analyzer.py b/tests/test_suite_analyzer.py index 826a3af..06e9f9f 100644 --- a/tests/test_suite_analyzer.py +++ b/tests/test_suite_analyzer.py @@ -1,33 +1,29 @@ #!/usr/bin/env python -# coding: utf-8 +# -*- coding: utf-8 -*- import os import sys import json import unittest -from io import open from cortexutils.analyzer import Analyzer -# Different lib when using python3 or 2 -if sys.version_info >= (3, 0): - from io import StringIO -else: - from StringIO import StringIO +from io import StringIO def load_test_fixture(fixture_path): - path = os.path.dirname(os.path.abspath(__file__)) - fixture_file = open(path + "/" + fixture_path) - input = fixture_file.read() - fixture_file.close() + tests_dir = os.path.dirname(os.path.abspath(__file__)) + file_path = os.path.join(tests_dir, fixture_path) + with open(file_path) as fixture_file: + input = fixture_file.read() sys.stdin = StringIO(input) sys.stdout = StringIO() class TestMinimalConfig(unittest.TestCase): def setUp(self): - load_test_fixture("fixtures/test-minimal-config.json") + fixture_path = os.path.join("fixtures", "test-minimal-config.json") + load_test_fixture(fixture_path) self.analyzer = Analyzer() def test_default_config(self): @@ -49,7 +45,8 @@ def test_params_data(self): class TestProxyConfig(unittest.TestCase): def setUp(self): - load_test_fixture("fixtures/test-proxy-config.json") + fixture_path = os.path.join("fixtures", "test-proxy-config.json") + load_test_fixture(fixture_path) self.analyzer = Analyzer() def test_proxy_config(self): @@ -64,7 +61,8 @@ def test_proxy_config(self): class TestTlpConfig(unittest.TestCase): def setUp(self): - load_test_fixture("fixtures/test-tlp-config.json") + fixture_path = os.path.join("fixtures", "test-tlp-config.json") + load_test_fixture(fixture_path) self.analyzer = Analyzer() def test_check_tlp_disabled(self): @@ -95,7 +93,8 @@ def test_check_tlp_ok(self): class TestErrorResponse(unittest.TestCase): def setUp(self): - load_test_fixture("fixtures/test-error-response.json") + fixture_path = os.path.join("fixtures", "test-error-response.json") + load_test_fixture(fixture_path) self.analyzer = Analyzer() def test_error_response(self): @@ -130,7 +129,8 @@ def test_error_response(self): class TestReportResponse(unittest.TestCase): def setUp(self): - load_test_fixture("fixtures/test-report-response.json") + fixture_path = os.path.join("fixtures", "test-report-response.json") + load_test_fixture(fixture_path) self.analyzer = Analyzer() def test_report_response(self): diff --git a/tests/test_suite_extractor.py b/tests/test_suite_extractor.py index e85eaba..2437654 100644 --- a/tests/test_suite_extractor.py +++ b/tests/test_suite_extractor.py @@ -1,4 +1,6 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- + """ This contains the unit tests for the extractor. """ @@ -21,13 +23,6 @@ def test_single_fqdn(self): "FQDN single string: wrong data type.", ) - def test_single_fqdn_as_unicode(self): - self.assertEqual( - self.extractor.check_string(value="www.google.de"), - "fqdn", - "FQDN single string: wrong data type.", - ) - def test_single_domain(self): self.assertEqual( self.extractor.check_string(value="google.de"), diff --git a/tests/test_suite_integration.py b/tests/test_suite_integration.py index 585af2b..034389f 100644 --- a/tests/test_suite_integration.py +++ b/tests/test_suite_integration.py @@ -1,16 +1,13 @@ #!/usr/bin/env python -# coding: utf-8 +# -*- coding: utf-8 -*- + import json import unittest import sys from cortexutils.analyzer import Analyzer -# Different lib when using python3 or 2 -if sys.version_info >= (3, 0): - from io import StringIO -else: - from StringIO import StringIO +from io import StringIO class AnalyzerExtractorOutputTest(unittest.TestCase):