-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathaws_lambda.py
More file actions
84 lines (68 loc) · 2.37 KB
/
aws_lambda.py
File metadata and controls
84 lines (68 loc) · 2.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from json import dumps
from os import getenv
from random import randint
from time import sleep
from typing import Any
from boto3 import client
from botocore import config as botocore_config
from botocore.exceptions import ConnectionClosedError
from requests import Response
cfg = botocore_config.Config(
retries={"max_attempts": 0},
read_timeout=1000,
connect_timeout=900,
)
LAMBDA_CLIENT_FUNCTIONS = client("lambda", config=cfg)
def invoke_dos_db_handler_lambda(lambda_payload: dict) -> Any:
"""Invoke dos db handler lambda.
Args:
lambda_payload (dict): Lambda payload.
Returns:
Any: Lambda response.
"""
response_status = False
response = None
retries = 0
while not response_status:
try:
response: Any = LAMBDA_CLIENT_FUNCTIONS.invoke(
FunctionName=getenv("DOS_DB_HANDLER_LAMBDA"),
InvocationType="RequestResponse",
Payload=dumps(lambda_payload),
)
response_payload = response["Payload"].read().decode("utf-8")
if "errorMessage" not in response_payload:
return response_payload
if retries > 6:
msg = f"Unable to run dos db handler lambda successfully after {retries} retries, {response_payload}"
raise ValueError(msg) # noqa: TRY301, RUF100
except ConnectionClosedError:
sleep(60)
retries += 1
sleep(randint(5, 15))
return None
def invoke_quality_checker_lambda() -> Response:
"""Invoke quality checker lambda.
Returns:
Response: Lambda response.
"""
return LAMBDA_CLIENT_FUNCTIONS.invoke(
FunctionName=getenv("QUALITY_CHECKER_LAMBDA"),
InvocationType="RequestResponse",
Payload="{}",
)
def re_process_payload(odscode: str, seq_number: str) -> str:
"""Reprocesses a payload from the event replay lambda.
Args:
odscode (str): Odscode to send to lambda
seq_number (str): Sequence number to send to lambda
Returns:
str: Response from lambda
"""
lambda_payload = {"odscode": odscode, "sequence_number": seq_number}
response = LAMBDA_CLIENT_FUNCTIONS.invoke(
FunctionName=getenv("EVENT_REPLAY_LAMBDA"),
InvocationType="RequestResponse",
Payload=dumps(lambda_payload),
)
return response["Payload"].read().decode("utf-8")