From 6b4ed284123ba90da6af6525ba41064af3d65d76 Mon Sep 17 00:00:00 2001
From: Daniel Yip
Date: Tue, 16 Sep 2025 12:44:13 +0100
Subject: [PATCH] Improved blank file checking

---
 filenameprocessor/src/constants.py           |  5 ---
 filenameprocessor/src/errors.py              |  4 --
 filenameprocessor/src/file_name_processor.py | 16 ++------
 filenameprocessor/src/file_validation.py     | 10 +----
 .../tests/test_lambda_handler.py             | 41 +------------------
 recordprocessor/src/batch_processor.py       | 31 ++++++++++----
 recordprocessor/src/constants.py             |  4 ++
 recordprocessor/src/file_level_validation.py | 15 +++++--
 .../tests/test_recordprocessor_main.py       | 38 +++++++++++++++++
 .../values_for_recordprocessor_tests.py      |  1 +
 10 files changed, 83 insertions(+), 82 deletions(-)

diff --git a/filenameprocessor/src/constants.py b/filenameprocessor/src/constants.py
index d075989a01..43f6d264b0 100644
--- a/filenameprocessor/src/constants.py
+++ b/filenameprocessor/src/constants.py
@@ -7,7 +7,6 @@
     VaccineTypePermissionsError,
     InvalidFileKeyError,
     UnhandledAuditTableError,
-    EmptyFileError,
     UnhandledSqsError
 )
@@ -23,15 +22,11 @@
 ERROR_TYPE_TO_STATUS_CODE_MAP = {
     VaccineTypePermissionsError: 403,
     InvalidFileKeyError: 400,  # Includes invalid ODS code, therefore unable to identify supplier
-    EmptyFileError: 400,
     UnhandledAuditTableError: 500,
     UnhandledSqsError: 500,
     Exception: 500,
 }

-# The size in bytes of an empty batch file containing only the headers row
-EMPTY_BATCH_FILE_SIZE_IN_BYTES = 700
-

 class FileStatus(StrEnum):
     """File status constants"""

diff --git a/filenameprocessor/src/errors.py b/filenameprocessor/src/errors.py
index d9fcd069bf..f131da6ac0 100644
--- a/filenameprocessor/src/errors.py
+++ b/filenameprocessor/src/errors.py
@@ -1,10 +1,6 @@
 """Custom exceptions for the Filename Processor."""


-class EmptyFileError(Exception):
-    """A custom exception for when the batch file contains only the header row or is completely empty"""
-
-
 class UnhandledAuditTableError(Exception):
     """A custom exception for when an unexpected error occurs whilst adding the file to the audit table."""

diff --git a/filenameprocessor/src/file_name_processor.py b/filenameprocessor/src/file_name_processor.py
index a9d8797222..746a7e9c69 100644
--- a/filenameprocessor/src/file_name_processor.py
+++ b/filenameprocessor/src/file_name_processor.py
@@ -9,7 +9,7 @@
 import argparse
 from uuid import uuid4
 from utils_for_filenameprocessor import get_creation_and_expiry_times, move_file
-from file_validation import validate_file_key, is_file_in_directory_root, validate_file_not_empty
+from file_validation import validate_file_key, is_file_in_directory_root
 from send_sqs_message import make_and_send_sqs_message
 from make_and_upload_ack_file import make_and_upload_the_ack_file
 from audit_table import upsert_audit_table
@@ -20,8 +20,7 @@
     VaccineTypePermissionsError,
     InvalidFileKeyError,
     UnhandledAuditTableError,
-    UnhandledSqsError,
-    EmptyFileError
+    UnhandledSqsError
 )
 from constants import FileNotProcessedReason, FileStatus, ERROR_TYPE_TO_STATUS_CODE_MAP, SOURCE_BUCKET_NAME

@@ -68,8 +67,6 @@ def handle_record(record) -> dict:
         created_at_formatted_string, expiry_timestamp = get_creation_and_expiry_times(s3_response)

         vaccine_type, supplier = validate_file_key(file_key)
-        # VED-757: Known issue with suppliers sometimes sending empty files
-        validate_file_not_empty(s3_response)
         permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)

         queue_name = f"{supplier}_{vaccine_type}"
@@ -94,17 +91,12 @@ def handle_record(record) -> dict:
     except (  # pylint: disable=broad-exception-caught
         VaccineTypePermissionsError,
-        EmptyFileError,
         InvalidFileKeyError,
         UnhandledAuditTableError,
         UnhandledSqsError,
         Exception,
     ) as error:
-        if isinstance(error, EmptyFileError):
-            # Avoid error log noise for accepted scenario in which supplier provides a batch file with no records
-            logger.warning("Error processing file '%s': %s", file_key, str(error))
-        else:
-            logger.error("Error processing file '%s': %s", file_key, str(error))
+        logger.error("Error processing file '%s': %s", file_key, str(error))

         queue_name = f"{supplier}_{vaccine_type}"
         file_status = get_file_status_for_error(error)
@@ -137,8 +129,6 @@ def get_file_status_for_error(error: Exception) -> str:
     """Creates a file status based on the type of error that was thrown"""
     if isinstance(error, VaccineTypePermissionsError):
         return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"
-    elif isinstance(error, EmptyFileError):
-        return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.EMPTY}"

     return FileStatus.FAILED

diff --git a/filenameprocessor/src/file_validation.py b/filenameprocessor/src/file_validation.py
index 65634c87fa..efe56464ac 100644
--- a/filenameprocessor/src/file_validation.py
+++ b/filenameprocessor/src/file_validation.py
@@ -2,9 +2,9 @@
 from re import match
 from datetime import datetime

-from constants import VALID_VERSIONS, EMPTY_BATCH_FILE_SIZE_IN_BYTES
+from constants import VALID_VERSIONS
 from elasticache import get_valid_vaccine_types_from_cache, get_supplier_system_from_cache
-from errors import InvalidFileKeyError, EmptyFileError
+from errors import InvalidFileKeyError


 def is_file_in_directory_root(file_key: str) -> bool:
@@ -72,9 +72,3 @@ def validate_file_key(file_key: str) -> tuple[str, str]:
         raise InvalidFileKeyError("Initial file validation failed: invalid file key")

     return vaccine_type, supplier
-
-
-def validate_file_not_empty(s3_response: dict) -> None:
-    """Checks that the batch file from S3 is not empty or containing only the header row"""
-    if s3_response.get("ContentLength", 0) <= EMPTY_BATCH_FILE_SIZE_IN_BYTES:
-        raise EmptyFileError("Initial file validation failed: batch file was empty")

diff --git a/filenameprocessor/tests/test_lambda_handler.py b/filenameprocessor/tests/test_lambda_handler.py
index b0be302bab..a3daf8a0fc 100644
--- a/filenameprocessor/tests/test_lambda_handler.py
+++ b/filenameprocessor/tests/test_lambda_handler.py
@@ -19,7 +19,7 @@
 )
 from tests.utils_for_tests.mock_environment_variables import MOCK_ENVIRONMENT_DICT, BucketNames, Sqs
 from tests.utils_for_tests.values_for_tests import MOCK_CREATED_AT_FORMATTED_STRING, MockFileDetails, \
-    MOCK_BATCH_FILE_CONTENT, MOCK_FILE_HEADERS, MOCK_EXPIRES_AT
+    MOCK_BATCH_FILE_CONTENT, MOCK_EXPIRES_AT

 # Ensure environment variables are mocked before importing from src files
 with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT):
@@ -196,45 +196,6 @@ def test_lambda_handler_new_file_success_and_first_in_queue(self):
         self.assert_sqs_message(file_details)
         self.assert_no_ack_file(file_details)

-    def test_lambda_handler_correctly_flags_empty_file(self):
-        """
-        VED-757 Tests that for an empty batch file:
-        * The file status is updated to 'Not processed - empty file' in the audit table
-        * The message is not sent to SQS
-        * The failure inf_ack file is created
-        """
-        file_details = MockFileDetails.ravs_rsv_1
-
-        s3_client.put_object(Bucket=BucketNames.SOURCE, Key=file_details.file_key, Body=MOCK_FILE_HEADERS)
-
-        with (  # noqa: E999
-            patch("file_name_processor.uuid4", return_value=file_details.message_id),  # noqa: E999
-        ):  # noqa: E999
-            lambda_handler(
-                self.make_event([self.make_record_with_message_id(file_details.file_key, file_details.message_id)]),
-                None,
-            )
-
-        expected_table_items = [
-            {
-                "message_id": {"S": file_details.message_id},
-                "filename": {"S": file_details.file_key},
-                "queue_name": {"S": "RAVS_RSV"},
-                "status": {"S": "Not processed - Empty file"},
-                "error_details": {"S": "Initial file validation failed: batch file was empty"},
-                "timestamp": {"S": file_details.created_at_formatted_string},
-                "expires_at": {"N": str(file_details.expires_at)},
-            }
-        ]
-        self.assertEqual(self.get_audit_table_items(), expected_table_items)
-        self.assert_no_sqs_message()
-        self.assert_ack_file_contents(file_details)
-        self.mock_logger.warning.assert_called_once_with(
-            "Error processing file '%s': %s",
-            "RSV_Vaccinations_v5_X8E5B_20000101T00000001.csv",
-            "Initial file validation failed: batch file was empty"
-        )
-
     def test_lambda_handler_non_root_file(self):
         """
         Tests that when the file is not in the root of the source bucket, no action is taken:

diff --git a/recordprocessor/src/batch_processor.py b/recordprocessor/src/batch_processor.py
index c5783b5c79..129313b72e 100644
--- a/recordprocessor/src/batch_processor.py
+++ b/recordprocessor/src/batch_processor.py
@@ -3,15 +3,16 @@
 import json
 import os
 import time
+from csv import DictReader
 from json import JSONDecodeError

-from constants import FileStatus
+from constants import FileStatus, FileNotProcessedReason, SOURCE_BUCKET_NAME, ARCHIVE_DIR_NAME, PROCESSING_DIR_NAME
 from process_row import process_row
 from mappings import map_target_disease
 from audit_table import update_audit_table_status
 from send_to_kinesis import send_to_kinesis
 from clients import logger
-from file_level_validation import file_level_validation
+from file_level_validation import file_level_validation, file_is_empty, move_file
 from errors import NoOperationPermissions, InvalidHeaders
 from utils_for_recordprocessor import get_csv_content_dict_reader
 from typing import Optional
@@ -42,7 +43,6 @@ def process_csv_to_fhir(incoming_message_body: dict) -> int:

     target_disease = map_target_disease(vaccine)

-    row_count = 0
     row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
                                   created_at_formatted_string, csv_reader, target_disease)

@@ -55,7 +55,7 @@ def process_csv_to_fhir(incoming_message_body: dict) -> int:
         encoder = new_encoder

         # load alternative encoder
-        csv_reader = get_csv_content_dict_reader(f"processing/{file_key}", encoder=encoder)
+        csv_reader = get_csv_content_dict_reader(f"{PROCESSING_DIR_NAME}/{file_key}", encoder=encoder)

         # re-read the file and skip processed rows
         row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
                                       created_at_formatted_string, csv_reader, target_disease, row_count)
@@ -63,14 +63,29 @@ def process_csv_to_fhir(incoming_message_body: dict) -> int:
         logger.error(f"Row Processing error: {err}")
         raise err

-    update_audit_table_status(file_key, file_id, FileStatus.PREPROCESSED)
+    file_status = FileStatus.PREPROCESSED
+
+    if file_is_empty(row_count):
+        logger.warning("File was empty: %s. Moving file to archive directory.", file_key)
+        move_file(SOURCE_BUCKET_NAME, f"{PROCESSING_DIR_NAME}/{file_key}", f"{ARCHIVE_DIR_NAME}/{file_key}")
+        file_status = f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.EMPTY}"
+
+    update_audit_table_status(file_key, file_id, file_status)

     return row_count


 # Process the row to obtain the details needed for the message_body and ack file
-def process_rows(file_id, vaccine, supplier, file_key, allowed_operations, created_at_formatted_string,
-                 csv_reader, target_disease,
-                 total_rows_processed_count=0) -> tuple[int, Optional[Exception]]:
+def process_rows(
+    file_id: str,
+    vaccine: str,
+    supplier: str,
+    file_key: str,
+    allowed_operations: set,
+    created_at_formatted_string: str,
+    csv_reader: DictReader,
+    target_disease: list[dict],
+    total_rows_processed_count: int = 0
+) -> tuple[int, Optional[Exception]]:
     """
     Processes each row in the csv_reader starting from start_row.
     """

diff --git a/recordprocessor/src/constants.py b/recordprocessor/src/constants.py
index cab86b6c73..f6ce844194 100644
--- a/recordprocessor/src/constants.py
+++ b/recordprocessor/src/constants.py
@@ -10,6 +10,9 @@
 AUDIT_TABLE_QUEUE_NAME_GSI = "queue_name_index"
 FILE_NAME_PROC_LAMBDA_NAME = os.getenv("FILE_NAME_PROC_LAMBDA_NAME")

+ARCHIVE_DIR_NAME = "archive"
+PROCESSING_DIR_NAME = "processing"
+
 EXPECTED_CSV_HEADERS = [
     "NHS_NUMBER",
     "PERSON_FORENAME",
@@ -62,6 +65,7 @@ class FileStatus:
 class FileNotProcessedReason(StrEnum):
     """Reasons why a file was not processed"""
     UNAUTHORISED = "Unauthorised"
+    EMPTY = "Empty file"


 class AuditTableKeys:

diff --git a/recordprocessor/src/file_level_validation.py b/recordprocessor/src/file_level_validation.py
index 5009820566..6541129f97 100644
--- a/recordprocessor/src/file_level_validation.py
+++ b/recordprocessor/src/file_level_validation.py
@@ -2,6 +2,8 @@
 Functions for completing file-level validation (validating headers and ensuring that the supplier has permission
 to perform at least one of the requested operations)
 """
+from csv import DictReader
+
 from clients import logger, s3_client
 from make_and_upload_ack_file import make_and_upload_ack_file
 from utils_for_recordprocessor import get_csv_content_dict_reader
@@ -9,15 +11,20 @@
 from logging_decorator import file_level_validation_logging_decorator
 from audit_table import update_audit_table_status
 from constants import SOURCE_BUCKET_NAME, EXPECTED_CSV_HEADERS, permission_to_operation_map, FileStatus, Permission, \
-    FileNotProcessedReason
+    FileNotProcessedReason, ARCHIVE_DIR_NAME, PROCESSING_DIR_NAME


-def validate_content_headers(csv_content_reader) -> None:
+def validate_content_headers(csv_content_reader: DictReader) -> None:
     """Raises an InvalidHeaders error if the headers in the CSV file do not match the expected headers."""
     if csv_content_reader.fieldnames != EXPECTED_CSV_HEADERS:
         raise InvalidHeaders("File headers are invalid.")


+def file_is_empty(row_count: int) -> bool:
+    """Simple helper for readability to check if no rows were processed in a file i.e. empty"""
+    return row_count == 0
+
+
 def get_permitted_operations(
     supplier: str, vaccine_type: str, allowed_permissions_list: list
 ) -> set:
@@ -94,7 +101,7 @@ def file_level_validation(incoming_message_body: dict) -> dict:

         make_and_upload_ack_file(message_id, file_key, True, True, created_at_formatted_string)

-        move_file(SOURCE_BUCKET_NAME, file_key, f"processing/{file_key}")
+        move_file(SOURCE_BUCKET_NAME, file_key, f"{PROCESSING_DIR_NAME}/{file_key}")

         return {
             "message_id": message_id,
@@ -118,7 +125,7 @@ def file_level_validation(incoming_message_body: dict) -> dict:
             if isinstance(error, NoOperationPermissions) else FileStatus.FAILED

         try:
-            move_file(SOURCE_BUCKET_NAME, file_key, f"archive/{file_key}")
+            move_file(SOURCE_BUCKET_NAME, file_key, f"{ARCHIVE_DIR_NAME}/{file_key}")
         except Exception as move_file_error:
             logger.error("Failed to move file to archive: %s", move_file_error)

diff --git a/recordprocessor/tests/test_recordprocessor_main.py b/recordprocessor/tests/test_recordprocessor_main.py
index edb33aa40d..1c95aaffec 100644
--- a/recordprocessor/tests/test_recordprocessor_main.py
+++ b/recordprocessor/tests/test_recordprocessor_main.py
@@ -153,6 +153,14 @@ def make_kinesis_assertions(self, test_cases):
                 kinesis_data.pop(key_to_ignore)
             self.assertEqual(kinesis_data, expected_kinesis_data)

+    def assert_object_moved_to_archive(self, file_key: str) -> None:
+        """Checks that the S3 object was moved to the archive directory"""
+        with self.assertRaises(s3_client.exceptions.NoSuchKey):
+            s3_client.get_object(Bucket=BucketNames.SOURCE, Key=f"processing/{file_key}")
+
+        response = s3_client.get_object(Bucket=BucketNames.SOURCE, Key=f"archive/{file_key}")
+        self.assertIsNotNone(response)
+
     def test_e2e_full_permissions(self):
         """
         Tests that file containing CREATE, UPDATE and DELETE is successfully processed when the supplier has
@@ -402,6 +410,36 @@ def test_e2e_kinesis_failed(self):
                 " not found."}
             })

+    def test_e2e_empty_file_is_flagged_and_processed_correctly(self):
+        """
+        Tests files that contain only the headers and no records are marked as empty and moved to archive.
+        """
+        test_cases = [
+            ("File containing only headers", ValidMockFileContent.headers),
+            ("File containing headers and new line", ValidMockFileContent.headers + "\n"),
+            ("File containing headers and multiple new lines", ValidMockFileContent.empty_file_with_multiple_new_lines)
+        ]
+        for description, file_content in test_cases:
+
+            with self.subTest(description=description):
+                self.mock_batch_processor_logger.reset_mock()
+                test_file = mock_rsv_emis_file
+                self.upload_source_files(file_content)
+                add_entry_to_table(test_file, FileStatus.PROCESSING)
+
+                main(test_file.event_full_permissions)
+
+                kinesis_records = kinesis_client.get_records(
+                    ShardIterator=self.get_shard_iterator(), Limit=10)["Records"]
+
+                self.mock_batch_processor_logger.warning.assert_called_once_with(
+                    "File was empty: %s. Moving file to archive directory.",
+                    "RSV_Vaccinations_v5_8HK48_20210730T12000000.csv"
+                )
+                self.assertListEqual(kinesis_records, [])
+                assert_audit_table_entry(test_file, "Not processed - Empty file")
+                self.assert_object_moved_to_archive(test_file.file_key)
+
     def test_e2e_error_is_logged_if_invalid_json_provided(self):
         """This scenario should not happen. If it does, it means our batch processing system config is broken and we
         have received malformed content from SQS -> EventBridge. In this case we log the error so we will be alerted.

diff --git a/recordprocessor/tests/utils_for_recordprocessor_tests/values_for_recordprocessor_tests.py b/recordprocessor/tests/utils_for_recordprocessor_tests/values_for_recordprocessor_tests.py
index 466806d8a8..347d768c5d 100644
--- a/recordprocessor/tests/utils_for_recordprocessor_tests/values_for_recordprocessor_tests.py
+++ b/recordprocessor/tests/utils_for_recordprocessor_tests/values_for_recordprocessor_tests.py
@@ -125,6 +125,7 @@ class ValidMockFileContent:
     with_new_and_update_and_delete = (
         headers + "\n" + MockFileRows.NEW + "\n" + MockFileRows.UPDATE + "\n" + MockFileRows.DELETE
     )
+    empty_file_with_multiple_new_lines = MockFileRows.HEADERS + "\n".join(["\n" for i in range(100)])


 class FileDetails:
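For context on the approach taken above: the removed filenameprocessor check inferred emptiness from the S3 object's ContentLength against a fixed 700-byte threshold, whereas the recordprocessor now counts parsed data rows, which also catches header-only files padded with blank lines. Below is a minimal standalone sketch of why row counting covers all three fixtures exercised by the new e2e test, assuming (as the name suggests) that get_csv_content_dict_reader wraps csv.DictReader; the two-column header and sample record are illustrative only, not the project's fixtures.

    from csv import DictReader
    from io import StringIO

    # Illustrative two-column header; the real files use the full EXPECTED_CSV_HEADERS list.
    HEADERS = "NHS_NUMBER,PERSON_FORENAME"

    def count_data_rows(file_content: str) -> int:
        """Counts data rows: DictReader consumes the header row as fieldnames and skips fully blank lines."""
        reader = DictReader(StringIO(file_content))
        return sum(1 for _ in reader)

    # Header-only variants, mirroring the three test cases, all produce a row count of zero,
    # so file_is_empty(row_count) would return True for each of them.
    assert count_data_rows(HEADERS) == 0
    assert count_data_rows(HEADERS + "\n") == 0
    assert count_data_rows(HEADERS + "\n".join(["\n" for _ in range(100)])) == 0

    # A file with at least one record (values here are made up) is not flagged as empty.
    assert count_data_rows(HEADERS + "\n" + "9000000009,JANE") == 1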