Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions filenameprocessor/src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
VaccineTypePermissionsError,
InvalidFileKeyError,
UnhandledAuditTableError,
EmptyFileError,
UnhandledSqsError
)

Expand All @@ -23,15 +22,11 @@
ERROR_TYPE_TO_STATUS_CODE_MAP = {
VaccineTypePermissionsError: 403,
InvalidFileKeyError: 400, # Includes invalid ODS code, therefore unable to identify supplier
EmptyFileError: 400,
UnhandledAuditTableError: 500,
UnhandledSqsError: 500,
Exception: 500,
}

# The size in bytes of an empty batch file containing only the headers row
EMPTY_BATCH_FILE_SIZE_IN_BYTES = 700


class FileStatus(StrEnum):
"""File status constants"""
Expand Down
4 changes: 0 additions & 4 deletions filenameprocessor/src/errors.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
"""Custom exceptions for the Filename Processor."""


class EmptyFileError(Exception):
"""A custom exception for when the batch file contains only the header row or is completely empty"""


class UnhandledAuditTableError(Exception):
"""A custom exception for when an unexpected error occurs whilst adding the file to the audit table."""

Expand Down
16 changes: 3 additions & 13 deletions filenameprocessor/src/file_name_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import argparse
from uuid import uuid4
from utils_for_filenameprocessor import get_creation_and_expiry_times, move_file
from file_validation import validate_file_key, is_file_in_directory_root, validate_file_not_empty
from file_validation import validate_file_key, is_file_in_directory_root
from send_sqs_message import make_and_send_sqs_message
from make_and_upload_ack_file import make_and_upload_the_ack_file
from audit_table import upsert_audit_table
Expand All @@ -20,8 +20,7 @@
VaccineTypePermissionsError,
InvalidFileKeyError,
UnhandledAuditTableError,
UnhandledSqsError,
EmptyFileError
UnhandledSqsError
)
from constants import FileNotProcessedReason, FileStatus, ERROR_TYPE_TO_STATUS_CODE_MAP, SOURCE_BUCKET_NAME

Expand Down Expand Up @@ -68,8 +67,6 @@ def handle_record(record) -> dict:
created_at_formatted_string, expiry_timestamp = get_creation_and_expiry_times(s3_response)

vaccine_type, supplier = validate_file_key(file_key)
# VED-757: Known issue with suppliers sometimes sending empty files
validate_file_not_empty(s3_response)
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)

queue_name = f"{supplier}_{vaccine_type}"
Expand All @@ -94,17 +91,12 @@ def handle_record(record) -> dict:

except ( # pylint: disable=broad-exception-caught
VaccineTypePermissionsError,
EmptyFileError,
InvalidFileKeyError,
UnhandledAuditTableError,
UnhandledSqsError,
Exception,
) as error:
if isinstance(error, EmptyFileError):
# Avoid error log noise for accepted scenario in which supplier provides a batch file with no records
logger.warning("Error processing file '%s': %s", file_key, str(error))
else:
logger.error("Error processing file '%s': %s", file_key, str(error))
logger.error("Error processing file '%s': %s", file_key, str(error))

queue_name = f"{supplier}_{vaccine_type}"
file_status = get_file_status_for_error(error)
Expand Down Expand Up @@ -137,8 +129,6 @@ def get_file_status_for_error(error: Exception) -> str:
"""Creates a file status based on the type of error that was thrown"""
if isinstance(error, VaccineTypePermissionsError):
return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"
elif isinstance(error, EmptyFileError):
return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.EMPTY}"

return FileStatus.FAILED

Expand Down
10 changes: 2 additions & 8 deletions filenameprocessor/src/file_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

from re import match
from datetime import datetime
from constants import VALID_VERSIONS, EMPTY_BATCH_FILE_SIZE_IN_BYTES
from constants import VALID_VERSIONS
from elasticache import get_valid_vaccine_types_from_cache, get_supplier_system_from_cache
from errors import InvalidFileKeyError, EmptyFileError
from errors import InvalidFileKeyError


def is_file_in_directory_root(file_key: str) -> bool:
Expand Down Expand Up @@ -72,9 +72,3 @@ def validate_file_key(file_key: str) -> tuple[str, str]:
raise InvalidFileKeyError("Initial file validation failed: invalid file key")

return vaccine_type, supplier


def validate_file_not_empty(s3_response: dict) -> None:
    """Raise EmptyFileError when the S3 object is no bigger than a headers-only batch file.

    The check is size-based: anything at or below EMPTY_BATCH_FILE_SIZE_IN_BYTES is
    treated as containing no data rows.
    """
    content_length = s3_response.get("ContentLength", 0)
    if content_length > EMPTY_BATCH_FILE_SIZE_IN_BYTES:
        return
    raise EmptyFileError("Initial file validation failed: batch file was empty")
41 changes: 1 addition & 40 deletions filenameprocessor/tests/test_lambda_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)
from tests.utils_for_tests.mock_environment_variables import MOCK_ENVIRONMENT_DICT, BucketNames, Sqs
from tests.utils_for_tests.values_for_tests import MOCK_CREATED_AT_FORMATTED_STRING, MockFileDetails, \
MOCK_BATCH_FILE_CONTENT, MOCK_FILE_HEADERS, MOCK_EXPIRES_AT
MOCK_BATCH_FILE_CONTENT, MOCK_EXPIRES_AT

# Ensure environment variables are mocked before importing from src files
with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT):
Expand Down Expand Up @@ -196,45 +196,6 @@ def test_lambda_handler_new_file_success_and_first_in_queue(self):
self.assert_sqs_message(file_details)
self.assert_no_ack_file(file_details)

def test_lambda_handler_correctly_flags_empty_file(self):
"""
VED-757 Tests that for an empty batch file:
* The file status is updated to 'Not processed - empty file' in the audit table
* The message is not sent to SQS
* The failure inf_ack file is created
"""
file_details = MockFileDetails.ravs_rsv_1

s3_client.put_object(Bucket=BucketNames.SOURCE, Key=file_details.file_key, Body=MOCK_FILE_HEADERS)

with ( # noqa: E999
patch("file_name_processor.uuid4", return_value=file_details.message_id), # noqa: E999
): # noqa: E999
lambda_handler(
self.make_event([self.make_record_with_message_id(file_details.file_key, file_details.message_id)]),
None,
)

expected_table_items = [
{
"message_id": {"S": file_details.message_id},
"filename": {"S": file_details.file_key},
"queue_name": {"S": "RAVS_RSV"},
"status": {"S": "Not processed - Empty file"},
"error_details": {"S": "Initial file validation failed: batch file was empty"},
"timestamp": {"S": file_details.created_at_formatted_string},
"expires_at": {"N": str(file_details.expires_at)},
}
]
self.assertEqual(self.get_audit_table_items(), expected_table_items)
self.assert_no_sqs_message()
self.assert_ack_file_contents(file_details)
self.mock_logger.warning.assert_called_once_with(
"Error processing file '%s': %s",
"RSV_Vaccinations_v5_X8E5B_20000101T00000001.csv",
"Initial file validation failed: batch file was empty"
)

def test_lambda_handler_non_root_file(self):
"""
Tests that when the file is not in the root of the source bucket, no action is taken:
Expand Down
31 changes: 23 additions & 8 deletions recordprocessor/src/batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
import json
import os
import time
from csv import DictReader
from json import JSONDecodeError

from constants import FileStatus
from constants import FileStatus, FileNotProcessedReason, SOURCE_BUCKET_NAME, ARCHIVE_DIR_NAME, PROCESSING_DIR_NAME
from process_row import process_row
from mappings import map_target_disease
from audit_table import update_audit_table_status
from send_to_kinesis import send_to_kinesis
from clients import logger
from file_level_validation import file_level_validation
from file_level_validation import file_level_validation, file_is_empty, move_file
from errors import NoOperationPermissions, InvalidHeaders
from utils_for_recordprocessor import get_csv_content_dict_reader
from typing import Optional
Expand Down Expand Up @@ -42,7 +43,6 @@ def process_csv_to_fhir(incoming_message_body: dict) -> int:

target_disease = map_target_disease(vaccine)

row_count = 0
row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
created_at_formatted_string, csv_reader, target_disease)

Expand All @@ -55,22 +55,37 @@ def process_csv_to_fhir(incoming_message_body: dict) -> int:
encoder = new_encoder

# load alternative encoder
csv_reader = get_csv_content_dict_reader(f"processing/{file_key}", encoder=encoder)
csv_reader = get_csv_content_dict_reader(f"{PROCESSING_DIR_NAME}/{file_key}", encoder=encoder)
# re-read the file and skip processed rows
row_count, err = process_rows(file_id, vaccine, supplier, file_key, allowed_operations,
created_at_formatted_string, csv_reader, target_disease, row_count)
else:
logger.error(f"Row Processing error: {err}")
raise err

update_audit_table_status(file_key, file_id, FileStatus.PREPROCESSED)
file_status = FileStatus.PREPROCESSED

if file_is_empty(row_count):
logger.warning("File was empty: %s. Moving file to archive directory.", file_key)
move_file(SOURCE_BUCKET_NAME, f"{PROCESSING_DIR_NAME}/{file_key}", f"{ARCHIVE_DIR_NAME}/{file_key}")
file_status = f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.EMPTY}"

update_audit_table_status(file_key, file_id, file_status)
return row_count


# Process the row to obtain the details needed for the message_body and ack file
def process_rows(file_id, vaccine, supplier, file_key, allowed_operations, created_at_formatted_string,
csv_reader, target_disease,
total_rows_processed_count=0) -> tuple[int, Optional[Exception]]:
def process_rows(
file_id: str,
vaccine: str,
supplier: str,
file_key: str,
allowed_operations: set,
created_at_formatted_string: str,
csv_reader: DictReader,
target_disease: list[dict],
total_rows_processed_count: int = 0
) -> tuple[int, Optional[Exception]]:
"""
Processes each row in the csv_reader starting from start_row.
"""
Expand Down
4 changes: 4 additions & 0 deletions recordprocessor/src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
AUDIT_TABLE_QUEUE_NAME_GSI = "queue_name_index"
FILE_NAME_PROC_LAMBDA_NAME = os.getenv("FILE_NAME_PROC_LAMBDA_NAME")

ARCHIVE_DIR_NAME = "archive"
PROCESSING_DIR_NAME = "processing"

EXPECTED_CSV_HEADERS = [
"NHS_NUMBER",
"PERSON_FORENAME",
Expand Down Expand Up @@ -62,6 +65,7 @@ class FileStatus:
class FileNotProcessedReason(StrEnum):
"""Reasons why a file was not processed"""
UNAUTHORISED = "Unauthorised"
EMPTY = "Empty file"


class AuditTableKeys:
Expand Down
15 changes: 11 additions & 4 deletions recordprocessor/src/file_level_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,29 @@
Functions for completing file-level validation
(validating headers and ensuring that the supplier has permission to perform at least one of the requested operations)
"""
from csv import DictReader

from clients import logger, s3_client
from make_and_upload_ack_file import make_and_upload_ack_file
from utils_for_recordprocessor import get_csv_content_dict_reader
from errors import InvalidHeaders, NoOperationPermissions
from logging_decorator import file_level_validation_logging_decorator
from audit_table import update_audit_table_status
from constants import SOURCE_BUCKET_NAME, EXPECTED_CSV_HEADERS, permission_to_operation_map, FileStatus, Permission, \
FileNotProcessedReason
FileNotProcessedReason, ARCHIVE_DIR_NAME, PROCESSING_DIR_NAME


def validate_content_headers(csv_content_reader) -> None:
def validate_content_headers(csv_content_reader: DictReader) -> None:
    """Compare the CSV header row against the expected schema; raise InvalidHeaders on any mismatch."""
    headers_are_valid = csv_content_reader.fieldnames == EXPECTED_CSV_HEADERS
    if not headers_are_valid:
        raise InvalidHeaders("File headers are invalid.")


def file_is_empty(row_count: int) -> bool:
    """Return True when no data rows were processed, i.e. the batch file contained no records."""
    return not row_count


def get_permitted_operations(
supplier: str, vaccine_type: str, allowed_permissions_list: list
) -> set:
Expand Down Expand Up @@ -94,7 +101,7 @@ def file_level_validation(incoming_message_body: dict) -> dict:

make_and_upload_ack_file(message_id, file_key, True, True, created_at_formatted_string)

move_file(SOURCE_BUCKET_NAME, file_key, f"processing/{file_key}")
move_file(SOURCE_BUCKET_NAME, file_key, f"{PROCESSING_DIR_NAME}/{file_key}")

return {
"message_id": message_id,
Expand All @@ -118,7 +125,7 @@ def file_level_validation(incoming_message_body: dict) -> dict:
if isinstance(error, NoOperationPermissions) else FileStatus.FAILED

try:
move_file(SOURCE_BUCKET_NAME, file_key, f"archive/{file_key}")
move_file(SOURCE_BUCKET_NAME, file_key, f"{ARCHIVE_DIR_NAME}/{file_key}")
except Exception as move_file_error:
logger.error("Failed to move file to archive: %s", move_file_error)

Expand Down
38 changes: 38 additions & 0 deletions recordprocessor/tests/test_recordprocessor_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ def make_kinesis_assertions(self, test_cases):
kinesis_data.pop(key_to_ignore)
self.assertEqual(kinesis_data, expected_kinesis_data)

def assert_object_moved_to_archive(self, file_key: str) -> None:
    """Asserts the file is gone from the 'processing/' prefix and present under 'archive/'."""
    processing_key = f"processing/{file_key}"
    archive_key = f"archive/{file_key}"

    # The processed copy must no longer exist under processing/.
    with self.assertRaises(s3_client.exceptions.NoSuchKey):
        s3_client.get_object(Bucket=BucketNames.SOURCE, Key=processing_key)

    # And a copy must now exist under archive/.
    archived_object = s3_client.get_object(Bucket=BucketNames.SOURCE, Key=archive_key)
    self.assertIsNotNone(archived_object)

def test_e2e_full_permissions(self):
"""
Tests that file containing CREATE, UPDATE and DELETE is successfully processed when the supplier has
Expand Down Expand Up @@ -402,6 +410,36 @@ def test_e2e_kinesis_failed(self):
" not found."}
})

def test_e2e_empty_file_is_flagged_and_processed_correctly(self):
    """
    Tests files that contain only the headers and no records are marked as empty and moved to archive.
    """
    # Each case is an "empty" batch file: a header row but no data rows.
    test_cases = [
        ("File containing only headers", ValidMockFileContent.headers),
        ("File containing headers and new line", ValidMockFileContent.headers + "\n"),
        ("File containing headers and multiple new lines", ValidMockFileContent.empty_file_with_multiple_new_lines)
    ]
    for description, file_content in test_cases:

        with self.subTest(description=description):
            # Reset between sub-tests so assert_called_once_with below sees only this case's call.
            self.mock_batch_processor_logger.reset_mock()
            test_file = mock_rsv_emis_file
            self.upload_source_files(file_content)
            add_entry_to_table(test_file, FileStatus.PROCESSING)

            main(test_file.event_full_permissions)

            kinesis_records = kinesis_client.get_records(
                ShardIterator=self.get_shard_iterator(), Limit=10)["Records"]

            # Empty files are logged as a warning (not an error) with the file key.
            self.mock_batch_processor_logger.warning.assert_called_once_with(
                "File was empty: %s. Moving file to archive directory.",
                "RSV_Vaccinations_v5_8HK48_20210730T12000000.csv"
            )
            # No records should have been forwarded to Kinesis for an empty file.
            self.assertListEqual(kinesis_records, [])
            assert_audit_table_entry(test_file, "Not processed - Empty file")
            self.assert_object_moved_to_archive(test_file.file_key)

def test_e2e_error_is_logged_if_invalid_json_provided(self):
"""This scenario should not happen. If it does, it means our batch processing system config is broken and we
have received malformed content from SQS -> EventBridge. In this case we log the error so we will be alerted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class ValidMockFileContent:
with_new_and_update_and_delete = (
headers + "\n" + MockFileRows.NEW + "\n" + MockFileRows.UPDATE + "\n" + MockFileRows.DELETE
)
# Headers followed by 199 newline characters. Value-preserving simplification of
# MockFileRows.HEADERS + "\n".join(["\n" for i in range(100)]): joining 100 "\n"
# items with "\n" separators yields 100 + 99 = 199 newlines, with an unused loop
# variable — the explicit multiplication says the same thing directly.
empty_file_with_multiple_new_lines = MockFileRows.HEADERS + "\n" * 199


class FileDetails:
Expand Down
Loading