Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions infrastructure/instance/file_name_processor.tf
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ resource "aws_lambda_function" "file_processor_lambda" {

environment {
variables = {
ACCOUNT_ID = var.immunisation_account_id
SOURCE_BUCKET_NAME = aws_s3_bucket.batch_data_source_bucket.bucket
ACK_BUCKET_NAME = aws_s3_bucket.batch_data_destination_bucket.bucket
QUEUE_URL = aws_sqs_queue.batch_file_created.url
Expand Down
2 changes: 2 additions & 0 deletions lambdas/filenameprocessor/src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
)

SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME")
DPS_DESTINATION_BUCKET_NAME = os.getenv("ACK_BUCKET_NAME")
AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
AUDIT_TABLE_TTL_DAYS = os.getenv("AUDIT_TABLE_TTL_DAYS")
VALID_VERSIONS = ["V5"]

VACCINE_TYPE_TO_DISEASES_HASH_KEY = "vacc_to_diseases"
ODS_CODE_TO_SUPPLIER_SYSTEM_HASH_KEY = "ods_code_to_supplier"
EXTENDED_ATTRIBUTES_PREFIXES = "Vaccination_Extended_Attributes"

ERROR_TYPE_TO_STATUS_CODE_MAP = {
VaccineTypePermissionsError: 403,
Expand Down
145 changes: 96 additions & 49 deletions lambdas/filenameprocessor/src/file_name_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@
from uuid import uuid4

from audit_table import upsert_audit_table
from common.aws_s3_utils import move_file
from common.aws_s3_utils import move_file, move_file_outside_bucket
from common.clients import STREAM_NAME, get_s3_client, logger
from common.log_decorator import logging_decorator
from common.models.errors import UnhandledAuditTableError
from constants import (
DPS_DESTINATION_BUCKET_NAME,
ERROR_TYPE_TO_STATUS_CODE_MAP,
EXTENDED_ATTRIBUTES_PREFIXES,
SOURCE_BUCKET_NAME,
FileNotProcessedReason,
FileStatus,
)
from file_validation import is_file_in_directory_root, validate_file_key
from file_validation import is_file_in_directory_root, validate_batch_file_key, validate_extended_attributes_file_key
from make_and_upload_ack_file import make_and_upload_the_ack_file
from models.errors import (
InvalidFileKeyError,
Expand Down Expand Up @@ -77,38 +79,58 @@ def handle_record(record) -> dict:
s3_response = get_s3_client().get_object(Bucket=bucket_name, Key=file_key)
created_at_formatted_string, expiry_timestamp = get_creation_and_expiry_times(s3_response)

vaccine_type, supplier = validate_file_key(file_key)
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)

queue_name = f"{supplier}_{vaccine_type}"
upsert_audit_table(
message_id,
file_key,
created_at_formatted_string,
expiry_timestamp,
queue_name,
FileStatus.QUEUED,
)
make_and_send_sqs_message(
file_key,
message_id,
permissions,
vaccine_type,
supplier,
created_at_formatted_string,
)

logger.info("Lambda invocation successful for file '%s'", file_key)

# Return details for logs
return {
"statusCode": 200,
"message": "Successfully sent to SQS for further processing",
"file_key": file_key,
"message_id": message_id,
"vaccine_type": vaccine_type,
"supplier": supplier,
}
if file_key.startswith(EXTENDED_ATTRIBUTES_PREFIXES):
extended_attribute_identifier = validate_extended_attributes_file_key(file_key)
move_file_outside_bucket(bucket_name, file_key, DPS_DESTINATION_BUCKET_NAME, f"archive/{file_key}")
queue_name = extended_attribute_identifier

upsert_audit_table(
message_id,
file_key,
created_at_formatted_string,
expiry_timestamp,
queue_name,
FileStatus.PROCESSING,
)
return {
"statusCode": 200,
"message": "Extended Attributes file successfully processed",
"file_key": file_key,
"message_id": message_id,
"queue_name": queue_name,
}
else:
vaccine_type, supplier = validate_batch_file_key(file_key)
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)
queue_name = f"{supplier}_{vaccine_type}"
upsert_audit_table(
message_id,
file_key,
created_at_formatted_string,
expiry_timestamp,
queue_name,
FileStatus.QUEUED,
)
make_and_send_sqs_message(
file_key,
message_id,
permissions,
vaccine_type,
supplier,
created_at_formatted_string,
)

logger.info("Lambda invocation successful for file '%s'", file_key)

# Return details for logs
return {
"statusCode": 200,
"message": "Successfully sent to SQS for further processing",
"file_key": file_key,
"message_id": message_id,
"vaccine_type": vaccine_type,
"supplier": supplier,
}

except ( # pylint: disable=broad-exception-caught
VaccineTypePermissionsError,
Expand Down Expand Up @@ -140,6 +162,16 @@ def handle_record(record) -> dict:
move_file(bucket_name, file_key, f"archive/{file_key}")

# Return details for logs
if file_key.startswith(EXTENDED_ATTRIBUTES_PREFIXES):
extended_attribute_identifier = validate_extended_attributes_file_key(file_key)
return {
"statusCode": ERROR_TYPE_TO_STATUS_CODE_MAP.get(type(error), 500),
"message": "Infrastructure Level Response Value - Processing Error",
"file_key": file_key,
"message_id": message_id,
"vaccine_supplier_info": extended_attribute_identifier,
"error": str(error),
}
return {
"statusCode": ERROR_TYPE_TO_STATUS_CODE_MAP.get(type(error), 500),
"message": "Infrastructure Level Response Value - Processing Error",
Expand All @@ -163,21 +195,36 @@ def handle_unexpected_bucket_name(bucket_name: str, file_key: str) -> dict:
"""Handles scenario where Lambda was not invoked by the data-sources bucket. Should not occur due to terraform
config and overarching design"""
try:
vaccine_type, supplier = validate_file_key(file_key)
logger.error(
"Unable to process file %s due to unexpected bucket name %s",
file_key,
bucket_name,
)
message = f"Failed to process file due to unexpected bucket name {bucket_name}"

return {
"statusCode": 500,
"message": message,
"file_key": file_key,
"vaccine_type": vaccine_type,
"supplier": supplier,
}
if file_key.startswith(EXTENDED_ATTRIBUTES_PREFIXES):
extended_attribute_identifier = validate_extended_attributes_file_key(file_key)
logger.error(
"Unable to process file %s due to unexpected bucket name %s",
file_key,
bucket_name,
)
message = f"Failed to process file due to unexpected bucket name {bucket_name}"
return {
"statusCode": 500,
"message": message,
"file_key": file_key,
"vaccine_supplier_info": extended_attribute_identifier,
}
else:
vaccine_type, supplier = validate_batch_file_key(file_key)
logger.error(
"Unable to process file %s due to unexpected bucket name %s",
file_key,
bucket_name,
)
message = f"Failed to process file due to unexpected bucket name {bucket_name}"

return {
"statusCode": 500,
"message": message,
"file_key": file_key,
"vaccine_type": vaccine_type,
"supplier": supplier,
}

except Exception as error:
logger.error(
Expand Down
35 changes: 26 additions & 9 deletions lambdas/filenameprocessor/src/file_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,20 @@ def is_valid_datetime(timestamp: str) -> bool:
return True


def validate_file_key(file_key: str) -> tuple[str, str]:
def validate_extended_attributes_file_key(file_key: str) -> str:
"""
Checks that all elements of the file key are valid, raises an exception otherwise.
Returns a string containing the organization code and COVID vaccine type needed in the audit table.
"""
if not match(r"^[^_.]*_[^_.]*_[^_.]*_[^_.]*_[^_.]*_[^_.]*_[^_.]*", file_key):
raise InvalidFileKeyError("Initial file validation failed: invalid extended attributes file key format")

file_key_parts_without_extension, _ = split_file_key(file_key)
organization_code = file_key_parts_without_extension[5]
return f"{organization_code}_COVID"


def validate_batch_file_key(file_key: str) -> tuple[str, str]:
"""
Checks that all elements of the file key are valid, raises an exception otherwise.
Returns a tuple containing the vaccine_type and supplier (both converted to upper case).
Expand All @@ -46,20 +59,14 @@ def validate_file_key(file_key: str) -> tuple[str, str]:
if not match(r"^[^_.]*_[^_.]*_[^_.]*_[^_.]*_[^_.]*", file_key):
raise InvalidFileKeyError("Initial file validation failed: invalid file key format")

file_key = file_key.upper()
file_name_and_extension = file_key.rsplit(".", 1)

if len(file_name_and_extension) != 2:
raise InvalidFileKeyError("Initial file validation failed: missing file extension")

file_key_parts_without_extension = file_name_and_extension[0].split("_")
file_key_parts_without_extension, file_name_and_extension = split_file_key(file_key)

vaccine_type = file_key_parts_without_extension[0]
vaccination = file_key_parts_without_extension[1]
version = file_key_parts_without_extension[2]
ods_code = file_key_parts_without_extension[3]
timestamp = file_key_parts_without_extension[4]
extension = file_name_and_extension[1]
extension = file_name_and_extension
supplier = get_supplier_system_from_cache(ods_code)

valid_vaccine_types = get_valid_vaccine_types_from_cache()
Expand All @@ -76,3 +83,13 @@ def validate_file_key(file_key: str) -> tuple[str, str]:
raise InvalidFileKeyError("Initial file validation failed: invalid file key")

return vaccine_type, supplier


def split_file_key(file_key: str) -> tuple[list[str], str]:
file_key = file_key.upper()
file_name_and_extension = file_key.rsplit(".", 1)

if len(file_name_and_extension) != 2:
raise InvalidFileKeyError("Initial file validation failed: missing file extension")

return file_name_and_extension[0].split("_"), file_name_and_extension[1]
52 changes: 47 additions & 5 deletions lambdas/filenameprocessor/tests/test_file_key_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
from file_validation import (
is_file_in_directory_root,
is_valid_datetime,
validate_file_key,
split_file_key,
validate_batch_file_key,
validate_extended_attributes_file_key,
)
from models.errors import InvalidFileKeyError

Expand Down Expand Up @@ -63,7 +65,7 @@ def test_is_valid_datetime(self, _):
with self.subTest():
self.assertEqual(is_valid_datetime(date_time_string), expected_result)

def test_validate_file_key(self, mock_get_redis_client):
def test_validate_batch_file_key(self, mock_get_redis_client):
"""Tests that file_key_validation returns True if all elements pass validation, and False otherwise"""
# Test case tuples are structured as (file_key, expected_result)
test_cases_for_success_scenarios = [
Expand All @@ -90,10 +92,50 @@ def test_validate_file_key(self, mock_get_redis_client):
mock_redis.hkeys.return_value = ["FLU", "RSV"]
mock_get_redis_client.return_value = mock_redis

self.assertEqual(validate_file_key(file_key), expected_result)
self.assertEqual(validate_batch_file_key(file_key), expected_result)
mock_redis.hkeys.assert_called_with("vacc_to_diseases")
mock_redis.hget.assert_called_with("ods_code_to_supplier", ods_code)

def test_split_file_key(self, _):
"""Tests that split_file_key splits the file key into parts correctly"""
test_cases = [
(
"FLU_Vaccinations_V5_YGM41_20000101T00000001.csv",
(["FLU", "VACCINATIONS", "V5", "YGM41", "20000101T00000001"], "CSV"),
),
(
"Vaccination_Extended_Attributes_V1_5_X8E5B_20000101T00000001.csv",
(["VACCINATION", "EXTENDED", "ATTRIBUTES", "V1", "5", "X8E5B", "20000101T00000001"], "CSV"),
),
]

for file_key, expected in test_cases:
with self.subTest(f"SubTest for file key: {file_key}"):
self.assertEqual(split_file_key(file_key), expected)

def test_validate_extended_attributes_file_key(self, _):
"""Tests that validate_extended_attributes_file_key returns organization code and COVID vaccine type if all
elements pass validation, and raises an exception otherwise"""
test_cases_for_success_scenarios = [
# Valid extended attributes file key
(
"Vaccination_Extended_Attributes_v1_5_X8E5B_20000101T00000001.csv",
"X8E5B_COVID",
),
# Valid extended attributes file key with different organization code
(
"Vaccination_Extended_Attributes_v1_5_YGM41_20221231T23595999.csv",
"YGM41_COVID",
),
]

for file_key, expected_result in test_cases_for_success_scenarios:
with self.subTest(f"SubTest for file key: {file_key}"):
self.assertEqual(
validate_extended_attributes_file_key(file_key),
expected_result,
)

def test_validate_file_key_false(self, mock_get_redis_client):
"""Tests that file_key_validation returns False if elements do not pass validation"""
invalid_file_key_error_message = "Initial file validation failed: invalid file key"
Expand Down Expand Up @@ -169,7 +211,7 @@ def test_validate_file_key_false(self, mock_get_redis_client):
mock_get_redis_client.return_value = mock_redis

with self.assertRaises(InvalidFileKeyError) as context:
validate_file_key(file_key)
validate_batch_file_key(file_key)
self.assertEqual(str(context.exception), expected_result)
mock_redis.hkeys.assert_called_with("vacc_to_diseases")

Expand Down Expand Up @@ -207,6 +249,6 @@ def test_validate_file_key_invalid(self, mock_get_redis_client):
mock_get_redis_client.return_value = mock_redis

with self.assertRaises(InvalidFileKeyError) as context:
validate_file_key(file_key)
validate_batch_file_key(file_key)
self.assertEqual(str(context.exception), expected_result)
mock_redis.hkeys.assert_not_called()
Loading