Skip to content

Commit 08266ca

Browse files
authored
VED-845: refactor record-processor (#1126)
* refactoring file level validation
1 parent d3c89fe commit 08266ca

7 files changed

Lines changed: 93 additions & 66 deletions

File tree

lambdas/ack_backend/src/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
TEMP_ACK_DIR = "TempAck"
55
BATCH_FILE_PROCESSING_DIR = "processing"
66
BATCH_FILE_ARCHIVE_DIR = "archive"
7+
LAMBDA_FUNCTION_NAME_PREFIX = "ack_processor"
8+
DEFAULT_STREAM_NAME = "immunisation-fhir-api-internal-dev-splunk-firehose"
79

810

911
ACK_HEADERS = [

lambdas/ack_backend/src/logging_decorators.py

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
from functools import wraps
77

88
from common.log_decorator import generate_and_send_logs
9+
from constants import DEFAULT_STREAM_NAME, LAMBDA_FUNCTION_NAME_PREFIX
910

10-
PREFIX = "ack_processor"
11-
STREAM_NAME = os.getenv("SPLUNK_FIREHOSE_NAME", "immunisation-fhir-api-internal-dev-splunk-firehose")
11+
STREAM_NAME = os.getenv("SPLUNK_FIREHOSE_NAME", DEFAULT_STREAM_NAME)
1212

1313

1414
def convert_message_to_ack_row_logging_decorator(func):
@@ -17,7 +17,7 @@ def convert_message_to_ack_row_logging_decorator(func):
1717
@wraps(func)
1818
def wrapper(message, created_at_formatted_string):
1919
base_log_data = {
20-
"function_name": f"{PREFIX}_{func.__name__}",
20+
"function_name": f"{LAMBDA_FUNCTION_NAME_PREFIX}_{func.__name__}",
2121
"date_time": str(datetime.now()),
2222
}
2323
start_time = time.time()
@@ -69,41 +69,13 @@ def wrapper(message, created_at_formatted_string):
6969
return wrapper
7070

7171

72-
def complete_batch_file_process_logging_decorator(func):
73-
"""This decorator logs when record processing is complete."""
74-
75-
@wraps(func)
76-
def wrapper(*args, **kwargs):
77-
base_log_data = {
78-
"function_name": f"{PREFIX}_{func.__name__}",
79-
"date_time": str(datetime.now()),
80-
}
81-
start_time = time.time()
82-
83-
# NB this doesn't require a try-catch block as the wrapped function never throws an exception
84-
result = func(*args, **kwargs)
85-
if result is not None:
86-
message_for_logs = "Record processing complete"
87-
base_log_data.update(result)
88-
additional_log_data = {
89-
"status": "success",
90-
"statusCode": 200,
91-
"message": message_for_logs,
92-
}
93-
generate_and_send_logs(STREAM_NAME, start_time, base_log_data, additional_log_data)
94-
95-
return result
96-
97-
return wrapper
98-
99-
10072
def ack_lambda_handler_logging_decorator(func):
10173
"""This decorator logs the execution info for the ack lambda handler."""
10274

10375
@wraps(func)
10476
def wrapper(event, context, *args, **kwargs):
10577
base_log_data = {
106-
"function_name": f"{PREFIX}_{func.__name__}",
78+
"function_name": f"{LAMBDA_FUNCTION_NAME_PREFIX}_{func.__name__}",
10779
"date_time": str(datetime.now()),
10880
}
10981
start_time = time.time()

lambdas/ack_backend/src/update_ack_file.py

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
"""Functions for uploading the data to the ack file"""
22

3+
import os
34
import time
5+
from datetime import datetime
46
from io import BytesIO, StringIO
57

68
from botocore.exceptions import ClientError
@@ -12,15 +14,19 @@
1214
)
1315
from common.aws_s3_utils import move_file
1416
from common.clients import get_s3_client, logger
17+
from common.log_decorator import generate_and_send_logs
1518
from common.models.batch_constants import ACK_BUCKET_NAME, SOURCE_BUCKET_NAME
1619
from constants import (
1720
ACK_HEADERS,
1821
BATCH_FILE_ARCHIVE_DIR,
1922
BATCH_FILE_PROCESSING_DIR,
2023
COMPLETED_ACK_DIR,
24+
DEFAULT_STREAM_NAME,
25+
LAMBDA_FUNCTION_NAME_PREFIX,
2126
TEMP_ACK_DIR,
2227
)
23-
from logging_decorators import complete_batch_file_process_logging_decorator
28+
29+
STREAM_NAME = os.getenv("SPLUNK_FIREHOSE_NAME", DEFAULT_STREAM_NAME)
2430

2531

2632
def create_ack_data(
@@ -58,7 +64,6 @@ def create_ack_data(
5864
}
5965

6066

61-
@complete_batch_file_process_logging_decorator
6267
def complete_batch_file_process(
6368
message_id: str,
6469
supplier: str,
@@ -68,6 +73,8 @@ def complete_batch_file_process(
6873
) -> dict:
6974
"""Mark the batch file as processed. This involves moving the ack and original file to destinations and updating
7075
the audit table status"""
76+
start_time = time.time()
77+
7178
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
7279

7380
move_file(ACK_BUCKET_NAME, f"{TEMP_ACK_DIR}/{ack_filename}", f"{COMPLETED_ACK_DIR}/{ack_filename}")
@@ -81,7 +88,7 @@ def complete_batch_file_process(
8188
successful_record_count = total_ack_rows_processed - total_failures
8289
set_audit_record_success_count_and_end_time(file_key, message_id, successful_record_count, ingestion_end_time)
8390

84-
return {
91+
result = {
8592
"message_id": message_id,
8693
"file_key": file_key,
8794
"supplier": supplier,
@@ -91,6 +98,29 @@ def complete_batch_file_process(
9198
"failure_count": total_failures,
9299
}
93100

101+
log_batch_file_process(
102+
start_time=start_time,
103+
result=result,
104+
function_name=f"{LAMBDA_FUNCTION_NAME_PREFIX}_complete_batch_file_process",
105+
)
106+
107+
return result
108+
109+
110+
def log_batch_file_process(start_time: float, result: dict, function_name: str) -> None:
111+
"""Logs the batch file processing completion to Splunk"""
112+
base_log_data = {
113+
"function_name": function_name,
114+
"date_time": str(datetime.now()),
115+
**result,
116+
}
117+
additional_log_data = {
118+
"status": "success",
119+
"statusCode": 200,
120+
"message": "Record processing complete",
121+
}
122+
generate_and_send_logs(STREAM_NAME, start_time, base_log_data, additional_log_data)
123+
94124

95125
def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
96126
"""Returns the current ack file content if the file exists, or else initialises the content with the ack headers."""

lambdas/ack_backend/tests/test_splunk_logging.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,7 @@ def test_splunk_update_ack_file_logged(self):
425425

426426
with ( # noqa: E999
427427
patch("common.log_decorator.send_log_to_firehose") as mock_send_log_to_firehose, # noqa: E999
428+
patch("update_ack_file.datetime") as mock_datetime,
428429
patch("common.log_decorator.logger") as mock_logger, # noqa: E999
429430
patch("update_ack_file.get_record_count_and_failures_by_message_id", return_value=(99, 2)),
430431
patch(
@@ -435,6 +436,7 @@ def test_splunk_update_ack_file_logged(self):
435436
) as mock_set_records_succeeded_count_and_end_time, # noqa: E999
436437
patch("ack_processor.increment_records_failed_count"), # noqa: E999
437438
): # noqa: E999
439+
mock_datetime.now.return_value = ValidValues.fixed_datetime
438440
result = lambda_handler(generate_event(messages, include_eof_message=True), context={})
439441

440442
self.assertEqual(result, EXPECTED_ACK_LAMBDA_RESPONSE_FOR_SUCCESS)

lambdas/recordprocessor/src/batch_processor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ def process_csv_to_fhir(incoming_message_body: dict) -> int:
6262
logger.warning(f"Encoding Error: {err}.")
6363
new_encoder = "cp1252"
6464
logger.info(f"Encode error at row {row_count} with {encoder}. Switch to {new_encoder}")
65-
encoder = new_encoder
65+
encoding = new_encoder
6666

6767
# load alternative encoder
68-
csv_reader = get_csv_content_dict_reader(f"{PROCESSING_DIR_NAME}/{file_key}", encoder=encoder)
68+
csv_reader = get_csv_content_dict_reader(f"{PROCESSING_DIR_NAME}/{file_key}", encoding=encoding)
6969
# re-read the file and skip processed rows
7070
row_count, err = process_rows(
7171
file_id,

lambdas/recordprocessor/src/file_level_validation.py

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -70,24 +70,21 @@ def file_level_validation(incoming_message_body: dict) -> dict:
7070
NOTE: If file level validation fails the source file is moved to the archive folder, the audit table is updated
7171
to reflect the file has been processed and the filename lambda is invoked with the next file in the queue.
7272
"""
73+
message_id = None
74+
file_key = None
75+
created_at_formatted_string = None
76+
7377
try:
7478
message_id = incoming_message_body.get("message_id")
7579
vaccine = incoming_message_body.get("vaccine_type").upper()
7680
supplier = incoming_message_body.get("supplier").upper()
7781
file_key = incoming_message_body.get("filename")
7882
permission = incoming_message_body.get("permission")
7983
created_at_formatted_string = incoming_message_body.get("created_at_formatted_string")
80-
encoder = incoming_message_body.get("encoder", "utf-8")
84+
encoding = incoming_message_body.get("encoding", "utf-8")
8185

8286
# Fetch the data
83-
try:
84-
csv_reader = get_csv_content_dict_reader(file_key, encoder=encoder)
85-
validate_content_headers(csv_reader)
86-
except UnicodeDecodeError as e:
87-
logger.warning("Invalid Encoding detected: %s", e)
88-
# retry with cp1252 encoding
89-
csv_reader = get_csv_content_dict_reader(file_key, encoder="cp1252")
90-
validate_content_headers(csv_reader)
87+
csv_reader = get_validated_csv_reader(file_key, encoding=encoding)
9188

9289
# Validate has permission to perform at least one of the requested actions
9390
allowed_operations_set = get_permitted_operations(supplier, vaccine, permission)
@@ -110,24 +107,48 @@ def file_level_validation(incoming_message_body: dict) -> dict:
110107
}
111108

112109
except Exception as error:
113-
logger.error("Error in file_level_validation: %s", error)
114-
115-
# NOTE: The Exception may occur before the file_id, file_key and created_at_formatted_string are assigned
116-
message_id = message_id or "Unable to ascertain message_id"
117-
file_key = file_key or "Unable to ascertain file_key"
118-
created_at_formatted_string = created_at_formatted_string or "Unable to ascertain created_at_formatted_string"
119-
make_and_upload_ack_file(message_id, file_key, False, False, created_at_formatted_string)
120-
file_status = (
121-
f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"
122-
if isinstance(error, NoOperationPermissions)
123-
else FileStatus.FAILED
110+
handle_file_level_validation_exception(
111+
error, message_id=message_id, file_key=file_key, created_at_formatted_string=created_at_formatted_string
124112
)
113+
raise
125114

126-
try:
127-
move_file(SOURCE_BUCKET_NAME, file_key, f"{ARCHIVE_DIR_NAME}/{file_key}")
128-
except Exception as move_file_error:
129-
logger.error("Failed to move file to archive: %s", move_file_error)
130115

131-
# Update the audit table
132-
update_audit_table_status(file_key, message_id, file_status, error_details=str(error))
133-
raise
116+
def get_validated_csv_reader(file_key: str, encoding: str = "utf-8") -> DictReader:
117+
"""Helper function to get a validated CSV DictReader object."""
118+
try:
119+
csv_reader = get_csv_content_dict_reader(file_key, encoding=encoding)
120+
validate_content_headers(csv_reader)
121+
return csv_reader
122+
except UnicodeDecodeError as e:
123+
logger.warning("Invalid Encoding detected: %s", e)
124+
# Retry using cp1252 encoding if the expected utf-8 fails
125+
# This is a known issue with a supplier - see VED-754 for details
126+
csv_reader = get_csv_content_dict_reader(file_key, encoding="cp1252")
127+
128+
validate_content_headers(csv_reader)
129+
return csv_reader
130+
131+
132+
def handle_file_level_validation_exception(
133+
error: Exception, message_id: str | None, file_key: str | None, created_at_formatted_string: str | None
134+
) -> None:
135+
logger.error("Error in file_level_validation: %s", error)
136+
137+
# NOTE: The Exception may occur before the file_id, file_key and created_at_formatted_string are assigned
138+
message_id = message_id or "Unable to ascertain message_id"
139+
file_key = file_key or "Unable to ascertain file_key"
140+
created_at_formatted_string = created_at_formatted_string or "Unable to ascertain created_at_formatted_string"
141+
make_and_upload_ack_file(message_id, file_key, False, False, created_at_formatted_string)
142+
file_status = (
143+
f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"
144+
if isinstance(error, NoOperationPermissions)
145+
else FileStatus.FAILED
146+
)
147+
148+
try:
149+
move_file(SOURCE_BUCKET_NAME, file_key, f"{ARCHIVE_DIR_NAME}/{file_key}")
150+
except Exception as move_file_error:
151+
logger.error("Failed to move file to archive: %s", move_file_error)
152+
153+
# Update the audit table
154+
update_audit_table_status(file_key, message_id, file_status, error_details=str(error))

lambdas/recordprocessor/src/utils_for_recordprocessor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ def get_environment() -> str:
1414
return _env if _env in ["internal-dev", "int", "ref", "sandbox", "prod"] else "internal-dev"
1515

1616

17-
def get_csv_content_dict_reader(file_key: str, encoder="utf-8") -> DictReader:
17+
def get_csv_content_dict_reader(file_key: str, encoding="utf-8") -> DictReader:
1818
"""Returns the requested file contents from the source bucket in the form of a DictReader"""
1919
response = get_s3_client().get_object(Bucket=os.getenv("SOURCE_BUCKET_NAME"), Key=file_key)
2020
binary_io = response["Body"]
21-
text_io = TextIOWrapper(binary_io, encoding=encoder, newline="")
21+
text_io = TextIOWrapper(binary_io, encoding=encoding, newline="")
2222
return DictReader(text_io, delimiter="|")
2323

2424

0 commit comments

Comments
 (0)