|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
| 3 | +import ast |
3 | 4 | from pathlib import Path |
4 | 5 | from typing import Any |
5 | 6 | from unittest import mock |
6 | 7 |
|
7 | 8 | import pytest |
8 | 9 | import yaml |
9 | 10 |
|
| 11 | +import theHarvester.lib.core as core_module |
10 | 12 | from theHarvester.lib.core import CONFIG_DIRS, DATA_DIR, Core |
11 | 13 |
|
12 | 14 |
|
@@ -73,3 +75,64 @@ def test_read_config_copies_default_to_home(name: str, capsys): |
73 | 75 | assert got == expected |
74 | 76 | assert f"Created default {file.name} at {file}" in capsys.readouterr().out |
75 | 77 | assert file.exists() |
| 78 | + |
| 79 | + |
| 80 | +def _extract_required_apikey_entries_from_core() -> dict[str, set[str]]: |
| 81 | + tree = ast.parse(Path(core_module.__file__).read_text(encoding="utf-8")) |
| 82 | + core_class = next((n for n in tree.body if isinstance(n, ast.ClassDef) and n.name == "Core"), None) |
| 83 | + assert core_class is not None, "Unable to locate `class Core` in theHarvester.lib.core" |
| 84 | + |
| 85 | + required: dict[str, set[str]] = {} |
| 86 | + for node in ast.walk(core_class): |
| 87 | + if not isinstance(node, ast.Subscript): |
| 88 | + continue |
| 89 | + |
| 90 | + parts: list[str] = [] |
| 91 | + current: ast.AST = node |
| 92 | + while isinstance(current, ast.Subscript): |
| 93 | + sl = current.slice |
| 94 | + if isinstance(sl, ast.Constant) and isinstance(sl.value, str): |
| 95 | + parts.append(sl.value) |
| 96 | + current = current.value |
| 97 | + continue |
| 98 | + break |
| 99 | + |
| 100 | + if not parts or not isinstance(current, ast.Call): |
| 101 | + continue |
| 102 | + |
| 103 | + func = current.func |
| 104 | + is_core_api_keys = ( |
| 105 | + isinstance(func, ast.Attribute) |
| 106 | + and func.attr == "api_keys" |
| 107 | + and isinstance(func.value, ast.Name) |
| 108 | + and func.value.id == "Core" |
| 109 | + ) |
| 110 | + if not is_core_api_keys: |
| 111 | + continue |
| 112 | + |
| 113 | + parts.reverse() |
| 114 | + provider = parts[0] |
| 115 | + required.setdefault(provider, set()) |
| 116 | + if len(parts) > 1: |
| 117 | + required[provider].add(parts[1]) |
| 118 | + |
| 119 | + return required |
| 120 | + |
| 121 | + |
| 122 | +def test_api_keys_yaml_is_in_sync_with_core_accessors(): |
| 123 | + required = _extract_required_apikey_entries_from_core() |
| 124 | + assert required, "No API-key references were detected in `Core`" |
| 125 | + |
| 126 | + config = yaml.safe_load((DATA_DIR / "api-keys.yaml").read_text(encoding="utf-8")) |
| 127 | + apikeys = config["apikeys"] |
| 128 | + |
| 129 | + missing_providers = sorted(set(required) - set(apikeys)) |
| 130 | + assert not missing_providers, f"Missing providers in api-keys.yaml: {missing_providers}" |
| 131 | + |
| 132 | + missing_fields: dict[str, list[str]] = {} |
| 133 | + for provider, fields in required.items(): |
| 134 | + for field in sorted(fields): |
| 135 | + if field not in apikeys[provider]: |
| 136 | + missing_fields.setdefault(provider, []).append(field) |
| 137 | + |
| 138 | + assert not missing_fields, f"Missing fields in api-keys.yaml: {missing_fields}" |
0 commit comments