Skip to content

Commit d18986b

Browse files
committed
Merge branch 'master' into VED-000-switch-to-ruff
2 parents badf0ce + d9e1f60 commit d18986b

27 files changed

Lines changed: 470 additions & 309 deletions

.github/workflows/quality-checks.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ jobs:
175175
working-directory: lambdas/ack_backend
176176
id: acklambda
177177
env:
178-
PYTHONPATH: ${{ env.LAMBDA_PATH }}/ack_backend/src:${{ github.workspace }}/ack_backend/tests
178+
PYTHONPATH: ${{ env.LAMBDA_PATH }}/ack_backend/src:tests:${{ env.SHARED_PATH }}/src
179179
continue-on-error: true
180180
run: |
181181
poetry install

lambdas/ack_backend/poetry.lock

Lines changed: 25 additions & 24 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lambdas/ack_backend/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ python = "~3.11"
1414
boto3 = "~1.40.45"
1515
mypy-boto3-dynamodb = "^1.40.44"
1616
freezegun = "^1.5.2"
17-
moto = "^4"
17+
moto = "^5.1.14"
1818
coverage = "^7.10.7"
1919

2020

lambdas/ack_backend/src/ack_processor.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44

55
from convert_message_to_ack_row import convert_message_to_ack_row
66
from logging_decorators import ack_lambda_handler_logging_decorator
7-
from update_ack_file import update_ack_file
7+
from update_ack_file import complete_batch_file_process, update_ack_file
8+
from utils_for_ack_lambda import is_ack_processing_complete
89

910

1011
@ack_lambda_handler_logging_decorator
11-
def lambda_handler(event, context):
12+
def lambda_handler(event, _):
1213
"""
1314
Ack lambda handler.
1415
For each record: each message in the array of messages is converted to an ack row,
@@ -23,6 +24,7 @@ def lambda_handler(event, context):
2324
message_id = None
2425

2526
ack_data_rows = []
27+
total_ack_rows_processed = 0
2628

2729
for i, record in enumerate(event["Records"]):
2830
try:
@@ -31,10 +33,8 @@ def lambda_handler(event, context):
3133
raise ValueError("Could not load incoming message body") from body_json_error
3234

3335
if i == 0:
34-
# IMPORTANT NOTE: An assumption is made here that the file_key and created_at_formatted_string are the same
35-
# for all messages in the event. The use of FIFO SQS queues ensures that this is the case, provided that
36-
# there is only one file processing at a time for each supplier queue (combination of supplier and vaccine
37-
# type).
36+
# The SQS FIFO MessageGroupId that this lambda consumes from is based on the source filename + created at
37+
# datetime. Therefore, can safely retrieve file metadata from the first record in the list
3838
file_key = incoming_message_body[0].get("file_key")
3939
message_id = (incoming_message_body[0].get("row_id", "")).split("^")[0]
4040
vaccine_type = incoming_message_body[0].get("vaccine_type")
@@ -44,14 +44,16 @@ def lambda_handler(event, context):
4444
for message in incoming_message_body:
4545
ack_data_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))
4646

47-
update_ack_file(
48-
file_key,
49-
message_id,
50-
supplier,
51-
vaccine_type,
52-
created_at_formatted_string,
53-
ack_data_rows,
54-
)
47+
update_ack_file(file_key, created_at_formatted_string, ack_data_rows)
48+
49+
# Get the row count of the final processed record
50+
# Format of the row id is {batch_message_id}^{row_number}
51+
total_ack_rows_processed = int(incoming_message_body[-1].get("row_id", "").split("^")[1])
52+
53+
if is_ack_processing_complete(message_id, total_ack_rows_processed):
54+
complete_batch_file_process(
55+
message_id, supplier, vaccine_type, created_at_formatted_string, file_key, total_ack_rows_processed
56+
)
5557

5658
return {
5759
"statusCode": 200,

lambdas/ack_backend/src/audit_table.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Add the filename to the audit table and check for duplicates."""
22

3+
from typing import Optional
4+
35
from common.clients import dynamodb_client, logger
46
from common.models.errors import UnhandledAuditTableError
57
from constants import AUDIT_TABLE_NAME, AuditTableKeys, FileStatus
@@ -28,3 +30,17 @@ def change_audit_table_status_to_processed(file_key: str, message_id: str) -> No
2830
except Exception as error: # pylint: disable = broad-exception-caught
2931
logger.error(error)
3032
raise UnhandledAuditTableError(error) from error
33+
34+
35+
def get_record_count_by_message_id(event_message_id: str) -> Optional[int]:
    """Look up the audit item for the given batch event message ID and return its
    record count, or None when the count attribute is absent or empty."""
    # Fetch the audit item keyed by the unique batch message ID.
    response = dynamodb_client.get_item(
        TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": event_message_id}}
    )

    # DynamoDB returns numbers as strings under the "N" attribute type.
    raw_count = response.get("Item", {}).get(AuditTableKeys.RECORD_COUNT, {}).get("N")

    return int(raw_count) if raw_count else None

lambdas/ack_backend/src/constants.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@
44

55
AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
66

7+
COMPLETED_ACK_DIR = "forwardedFile"
8+
TEMP_ACK_DIR = "TempAck"
9+
BATCH_FILE_PROCESSING_DIR = "processing"
10+
BATCH_FILE_ARCHIVE_DIR = "archive"
11+
712

813
def get_source_bucket_name() -> str:
914
"""Get the SOURCE_BUCKET_NAME environment from environment variables."""
@@ -30,6 +35,7 @@ class AuditTableKeys:
3035
FILENAME = "filename"
3136
MESSAGE_ID = "message_id"
3237
QUEUE_NAME = "queue_name"
38+
RECORD_COUNT = "record_count"
3339
STATUS = "status"
3440
TIMESTAMP = "timestamp"
3541

lambdas/ack_backend/src/logging_decorators.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def wrapper(message, created_at_formatted_string):
6969
return wrapper
7070

7171

72-
def upload_ack_file_logging_decorator(func):
72+
def complete_batch_file_process_logging_decorator(func):
7373
"""This decorator logs when record processing is complete."""
7474

7575
@wraps(func)

lambdas/ack_backend/src/update_ack_file.py

Lines changed: 50 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
"""Functions for uploading the data to the ack file"""
22

33
from io import BytesIO, StringIO
4-
from typing import Optional
54

65
from audit_table import change_audit_table_status_to_processed
76
from botocore.exceptions import ClientError
87
from common.clients import get_s3_client, logger
9-
from constants import ACK_HEADERS, get_ack_bucket_name, get_source_bucket_name
10-
from logging_decorators import upload_ack_file_logging_decorator
11-
from utils_for_ack_lambda import get_row_count
8+
from constants import (
9+
ACK_HEADERS,
10+
BATCH_FILE_ARCHIVE_DIR,
11+
BATCH_FILE_PROCESSING_DIR,
12+
COMPLETED_ACK_DIR,
13+
TEMP_ACK_DIR,
14+
get_ack_bucket_name,
15+
get_source_bucket_name,
16+
)
17+
from logging_decorators import complete_batch_file_process_logging_decorator
1218

1319

1420
def create_ack_data(
@@ -46,6 +52,35 @@ def create_ack_data(
4652
}
4753

4854

55+
@complete_batch_file_process_logging_decorator
def complete_batch_file_process(
    message_id: str,
    supplier: str,
    vaccine_type: str,
    created_at_formatted_string: str,
    file_key: str,
    total_ack_rows_processed: int,
) -> dict:
    """Finalise processing of a batch source file.

    Moves the completed ack file out of the temp area, archives the original
    source file, and marks the audit-table entry as processed. Returns a
    summary dict (consumed by the logging decorator).
    """
    ack_filename = file_key.replace(".csv", f"_BusAck_{created_at_formatted_string}.csv")

    # Promote the finished ack file from the temp dir to its final destination.
    move_file(get_ack_bucket_name(), f"{TEMP_ACK_DIR}/{ack_filename}", f"{COMPLETED_ACK_DIR}/{ack_filename}")

    # Archive the original batch file now that every row has been acknowledged.
    source_bucket = get_source_bucket_name()
    move_file(source_bucket, f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}")

    change_audit_table_status_to_processed(file_key, message_id)

    return {
        "message_id": message_id,
        "file_key": file_key,
        "supplier": supplier,
        "vaccine_type": vaccine_type,
        "row_count": total_ack_rows_processed,
    }
82+
83+
4984
def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
5085
"""Returns the current ack file content if the file exists, or else initialises the content with the ack headers."""
5186
try:
@@ -66,76 +101,27 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
66101
return accumulated_csv_content
67102

68103

69-
@upload_ack_file_logging_decorator
70-
def upload_ack_file(
71-
temp_ack_file_key: str,
72-
message_id: str,
73-
supplier: str,
74-
vaccine_type: str,
75-
accumulated_csv_content: StringIO,
76-
ack_data_rows: list,
77-
archive_ack_file_key: str,
104+
def update_ack_file(
78105
file_key: str,
79-
) -> Optional[dict]:
80-
"""Adds the data row to the uploaded ack file"""
106+
created_at_formatted_string: str,
107+
ack_data_rows: list,
108+
) -> None:
109+
"""Updates the ack file with the new data row based on the given arguments"""
110+
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
111+
temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}"
112+
archive_ack_file_key = f"{COMPLETED_ACK_DIR}/{ack_filename}"
113+
accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key)
114+
81115
for row in ack_data_rows:
82116
data_row_str = [str(item) for item in row.values()]
83117
cleaned_row = "|".join(data_row_str).replace(" |", "|").replace("| ", "|").strip()
84118
accumulated_csv_content.write(cleaned_row + "\n")
85-
csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))
86119

120+
csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))
87121
ack_bucket_name = get_ack_bucket_name()
88-
source_bucket_name = get_source_bucket_name()
89122

90123
get_s3_client().upload_fileobj(csv_file_like_object, ack_bucket_name, temp_ack_file_key)
91-
92-
row_count_source = get_row_count(source_bucket_name, f"processing/{file_key}")
93-
row_count_destination = get_row_count(ack_bucket_name, temp_ack_file_key)
94-
# TODO: Should we check for > and if so what handling is required
95-
if row_count_destination == row_count_source:
96-
move_file(ack_bucket_name, temp_ack_file_key, archive_ack_file_key)
97-
move_file(source_bucket_name, f"processing/{file_key}", f"archive/{file_key}")
98-
99-
# Update the audit table
100-
change_audit_table_status_to_processed(file_key, message_id)
101-
102-
# Ingestion of this file is complete
103-
result = {
104-
"message_id": message_id,
105-
"file_key": file_key,
106-
"supplier": supplier,
107-
"vaccine_type": vaccine_type,
108-
"row_count": row_count_source - 1,
109-
}
110-
else:
111-
result = None
112124
logger.info("Ack file updated to %s: %s", ack_bucket_name, archive_ack_file_key)
113-
return result
114-
115-
116-
def update_ack_file(
117-
file_key: str,
118-
message_id: str,
119-
supplier: str,
120-
vaccine_type: str,
121-
created_at_formatted_string: str,
122-
ack_data_rows: list,
123-
) -> None:
124-
"""Updates the ack file with the new data row based on the given arguments"""
125-
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
126-
temp_ack_file_key = f"TempAck/{ack_filename}"
127-
archive_ack_file_key = f"forwardedFile/{ack_filename}"
128-
accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key)
129-
upload_ack_file(
130-
temp_ack_file_key,
131-
message_id,
132-
supplier,
133-
vaccine_type,
134-
accumulated_csv_content,
135-
ack_data_rows,
136-
archive_ack_file_key,
137-
file_key,
138-
)
139125

140126

141127
def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,21 @@
11
"""Utils for ack lambda"""
22

3-
from common.clients import get_s3_client
3+
from audit_table import get_record_count_by_message_id
44

5+
_BATCH_EVENT_ID_TO_RECORD_COUNT_MAP: dict[str, int] = {}
56

6-
def get_row_count(bucket_name: str, file_key: str) -> int:
7-
"""
8-
Looks in the given bucket and returns the count of the number of lines in the given file.
9-
NOTE: Blank lines are not included in the count.
10-
"""
11-
response = get_s3_client().get_object(Bucket=bucket_name, Key=file_key)
12-
return sum(1 for line in response["Body"].iter_lines() if line.strip())
7+
8+
def is_ack_processing_complete(batch_event_message_id: str, processed_ack_count: int) -> bool:
    """Return True once the number of processed ack rows equals the source
    file's total record count.

    The total is read from the audit table and memoised in
    _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP to avoid a DynamoDB lookup on every
    invocation. NOTE(review): the cache grows unboundedly across warm lambda
    invocations — presumably acceptable for lambda lifetimes; confirm.
    """
    expected = _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP.get(batch_event_message_id)

    if expected is None:
        fetched = get_record_count_by_message_id(batch_event_message_id)
        if not fetched:
            # The count is only written to the audit item after all rows have
            # been preprocessed and sent to Kinesis, so completion cannot yet
            # be determined.
            return False
        _BATCH_EVENT_ID_TO_RECORD_COUNT_MAP[batch_event_message_id] = fetched
        expected = fetched

    return expected == processed_ack_count

0 commit comments

Comments
 (0)