Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions infrastructure/instance/file_name_processor.tf
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ resource "aws_lambda_function" "file_processor_lambda" {

environment {
variables = {
ACCOUNT_ID = var.immunisation_account_id
SOURCE_BUCKET_NAME = aws_s3_bucket.batch_data_source_bucket.bucket
ACK_BUCKET_NAME = aws_s3_bucket.batch_data_destination_bucket.bucket
QUEUE_URL = aws_sqs_queue.batch_file_created.url
Expand Down
17 changes: 12 additions & 5 deletions lambdas/filenameprocessor/src/audit_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def upsert_audit_table(
queue_name: str,
file_status: str,
error_details: Optional[str] = None,
condition_expression: Optional[str] = None,
) -> None:
"""
Updates the audit table with the file details
Expand All @@ -33,11 +34,17 @@ def upsert_audit_table(

try:
# Add to the audit table (regardless of whether it is a duplicate)
dynamodb_client.put_item(
TableName=AUDIT_TABLE_NAME,
Item=audit_item,
ConditionExpression="attribute_not_exists(message_id)", # Prevents accidental overwrites
)
if not condition_expression:
dynamodb_client.put_item(
TableName=AUDIT_TABLE_NAME,
Item=audit_item,
)
else:
dynamodb_client.put_item(
TableName=AUDIT_TABLE_NAME,
Item=audit_item,
ConditionExpression=condition_expression,
)
logger.info(
"%s file, with message id %s, successfully added to audit table",
file_key,
Expand Down
2 changes: 2 additions & 0 deletions lambdas/filenameprocessor/src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
)

SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME")
DPS_DESTINATION_BUCKET_NAME = os.getenv("ACK_BUCKET_NAME")
AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
AUDIT_TABLE_TTL_DAYS = os.getenv("AUDIT_TABLE_TTL_DAYS")
VALID_VERSIONS = ["V5"]

VACCINE_TYPE_TO_DISEASES_HASH_KEY = "vacc_to_diseases"
ODS_CODE_TO_SUPPLIER_SYSTEM_HASH_KEY = "ods_code_to_supplier"
EXTENDED_ATTRIBUTES_PREFIXES = "Vaccination_Extended_Attributes"

ERROR_TYPE_TO_STATUS_CODE_MAP = {
VaccineTypePermissionsError: 403,
Expand Down
214 changes: 174 additions & 40 deletions lambdas/filenameprocessor/src/file_name_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,27 @@
import argparse
from uuid import uuid4

from botocore.exceptions import ClientError

from audit_table import upsert_audit_table
from common.aws_s3_utils import move_file
from common.aws_s3_utils import (
copy_file_outside_bucket,
delete_file,
is_file_in_bucket,
move_file,
)
from common.clients import STREAM_NAME, get_s3_client, logger
from common.log_decorator import logging_decorator
from common.models.errors import UnhandledAuditTableError
from constants import (
DPS_DESTINATION_BUCKET_NAME,
ERROR_TYPE_TO_STATUS_CODE_MAP,
EXTENDED_ATTRIBUTES_PREFIXES,
SOURCE_BUCKET_NAME,
FileNotProcessedReason,
FileStatus,
)
from file_validation import is_file_in_directory_root, validate_file_key
from file_validation import is_file_in_directory_root, validate_batch_file_key, validate_extended_attributes_file_key
from make_and_upload_ack_file import make_and_upload_the_ack_file
from models.errors import (
InvalidFileKeyError,
Expand Down Expand Up @@ -53,8 +62,6 @@ def handle_record(record) -> dict:
"error": str(error),
}

vaccine_type = "unknown"
supplier = "unknown"
expiry_timestamp = "unknown"

if bucket_name != SOURCE_BUCKET_NAME:
Expand All @@ -72,22 +79,109 @@ def handle_record(record) -> dict:
message_id = "Message id was not created"
created_at_formatted_string = "created_at_time not identified"

message_id = str(uuid4())
s3_response = get_s3_client().get_object(Bucket=bucket_name, Key=file_key)
created_at_formatted_string, expiry_timestamp = get_creation_and_expiry_times(s3_response)

if file_key.startswith(EXTENDED_ATTRIBUTES_PREFIXES):
return handle_extended_attributes_file(
file_key,
bucket_name,
message_id,
created_at_formatted_string,
expiry_timestamp,
)
else:
return handle_batch_file(
file_key,
bucket_name,
message_id,
created_at_formatted_string,
expiry_timestamp,
)


def get_file_status_for_error(error: Exception) -> str:
    """Map an exception to the file status string recorded in the audit table."""
    permissions_failure = isinstance(error, VaccineTypePermissionsError)
    return (
        f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"
        if permissions_failure
        else FileStatus.FAILED
    )


def handle_unexpected_bucket_name(bucket_name: str, file_key: str) -> dict:
    """Handles scenario where Lambda was not invoked by the data-sources bucket. Should not occur due to terraform
    config and overarching design.

    Returns a log-friendly dict with statusCode 500. Identifying fields are parsed
    from the file key where possible; if the key cannot be parsed (or the object
    cannot be fetched) the fallback entry reports them as "unknown".
    """
    try:
        # Fetch the object so an unreadable/missing file routes to the generic
        # fallback below instead of a misleading key-validation error. The
        # response metadata itself is not needed for the log payload.
        s3_response = get_s3_client().get_object(Bucket=bucket_name, Key=file_key)
        get_creation_and_expiry_times(s3_response)

        # Parse identity fields first so a bad key still raises before logging,
        # then build the shared log entry once for both file kinds (DRY).
        if file_key.startswith(EXTENDED_ATTRIBUTES_PREFIXES):
            identity_fields = {
                "vaccine_supplier_info": validate_extended_attributes_file_key(file_key),
            }
        else:
            vaccine_type, supplier = validate_batch_file_key(file_key)
            identity_fields = {"vaccine_type": vaccine_type, "supplier": supplier}

        logger.error(
            "Unable to process file %s due to unexpected bucket name %s",
            file_key,
            bucket_name,
        )
        message = f"Failed to process file due to unexpected bucket name {bucket_name}"

        return {
            "statusCode": 500,
            "message": message,
            "file_key": file_key,
            **identity_fields,
        }

    except Exception as error:  # pylint: disable=broad-exception-caught
        # Fallback: file key could not be parsed or the object fetch failed, so
        # no vaccine/supplier details are available for the log entry.
        logger.error(
            "Unable to process file due to unexpected bucket name %s and file key %s",
            bucket_name,
            file_key,
        )
        message = f"Failed to process file due to unexpected bucket name {bucket_name} and file key {file_key}"

        return {
            "statusCode": 500,
            "message": message,
            "file_key": file_key,
            "vaccine_type": "unknown",
            "supplier": "unknown",
            "error": str(error),
        }

vaccine_type, supplier = validate_file_key(file_key)
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)

def handle_batch_file(file_key, bucket_name, message_id, created_at_formatted_string, expiry_timestamp) -> dict:
"""
Processes a single record for batch file.
Returns a dictionary containing information to be included in the logs.
"""
vaccine_type = "unknown"
supplier = "unknown"
try:
vaccine_type, supplier = validate_batch_file_key(file_key)
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)
queue_name = f"{supplier}_{vaccine_type}"

upsert_audit_table(
message_id,
file_key,
created_at_formatted_string,
expiry_timestamp,
queue_name,
FileStatus.QUEUED,
condition_expression="attribute_not_exists(message_id)", # Prevents accidental overwrites
)
make_and_send_sqs_message(
file_key,
Expand All @@ -97,19 +191,17 @@ def handle_record(record) -> dict:
supplier,
created_at_formatted_string,
)

logger.info("Lambda invocation successful for file '%s'", file_key)

# Return details for logs
return {
"statusCode": 200,
"message": "Successfully sent to SQS for further processing",
"message": "Batch file successfully processed",
"file_key": file_key,
"message_id": message_id,
"vaccine_type": vaccine_type,
"supplier": supplier,
"queue_name": queue_name,
}

except ( # pylint: disable=broad-exception-caught
VaccineTypePermissionsError,
InvalidFileKeyError,
Expand All @@ -119,8 +211,8 @@ def handle_record(record) -> dict:
) as error:
logger.error("Error processing file '%s': %s", file_key, str(error))

queue_name = f"{supplier}_{vaccine_type}"
file_status = get_file_status_for_error(error)
queue_name = f"{supplier}_{vaccine_type}"

upsert_audit_table(
message_id,
Expand Down Expand Up @@ -151,48 +243,90 @@ def handle_record(record) -> dict:
}


def get_file_status_for_error(error: Exception) -> str:
"""Creates a file status based on the type of error that was thrown"""
if isinstance(error, VaccineTypePermissionsError):
return f"{FileStatus.NOT_PROCESSED} - {FileNotProcessedReason.UNAUTHORISED}"

return FileStatus.FAILED
def handle_extended_attributes_file(
file_key, bucket_name, message_id, created_at_formatted_string, expiry_timestamp
) -> dict:
"""
Processes a single record for extended attributes file.
Returns a dictionary containing information to be included in the logs.
"""

# here: the sequence of events should be
# 1. upsert 'processing'
# 2. move the file to the dest bucket
# 3. check the file is present in the dest bucket
# 4. if it is, delete it from the src bucket, upsert 'processed'
# 5. if it isn't, move it to the archive/ folder, upsert 'failed'
# NB for this to work we have to retool upsert so it accepts overwrites, i.e. ignore the ConditionExpression

def handle_unexpected_bucket_name(bucket_name: str, file_key: str) -> dict:
"""Handles scenario where Lambda was not invoked by the data-sources bucket. Should not occur due to terraform
config and overarching design"""
try:
vaccine_type, supplier = validate_file_key(file_key)
logger.error(
"Unable to process file %s due to unexpected bucket name %s",
extended_attribute_identifier = validate_extended_attributes_file_key(file_key)
queue_name = extended_attribute_identifier

upsert_audit_table(
message_id,
file_key,
bucket_name,
created_at_formatted_string,
expiry_timestamp,
queue_name,
FileStatus.PROCESSING,
)

copy_file_outside_bucket(bucket_name, file_key, DPS_DESTINATION_BUCKET_NAME, f"archive/{file_key}")
is_file_in_bucket(DPS_DESTINATION_BUCKET_NAME, file_key)
delete_file(DPS_DESTINATION_BUCKET_NAME, file_key)

upsert_audit_table(
message_id,
file_key,
created_at_formatted_string,
expiry_timestamp,
queue_name,
FileStatus.PROCESSED,
)
message = f"Failed to process file due to unexpected bucket name {bucket_name}"

return {
"statusCode": 500,
"message": message,
"statusCode": 200,
"message": "Extended Attributes file successfully processed",
"file_key": file_key,
"vaccine_type": vaccine_type,
"supplier": supplier,
"message_id": message_id,
"queue_name": queue_name,
}

except Exception as error:
logger.error(
"Unable to process file due to unexpected bucket name %s and file key %s",
bucket_name,
except ( # pylint: disable=broad-exception-caught
ClientError,
VaccineTypePermissionsError,
InvalidFileKeyError,
UnhandledAuditTableError,
UnhandledSqsError,
Exception,
) as error:
logger.error("Error processing file '%s': %s", file_key, str(error))

file_status = get_file_status_for_error(error)

# NB if we got InvalidFileKeyError we won't have a valid queue name
if not queue_name:
queue_name = "unknown"

# Move file to archive
move_file(bucket_name, file_key, f"archive/{file_key}")

upsert_audit_table(
message_id,
file_key,
created_at_formatted_string,
expiry_timestamp,
queue_name,
file_status,
error_details=str(error),
)
message = f"Failed to process file due to unexpected bucket name {bucket_name} and file key {file_key}"

return {
"statusCode": 500,
"message": message,
"message": f"Failed to process extended attributes file {file_key} from bucket {bucket_name}",
"file_key": file_key,
"vaccine_type": "unknown",
"supplier": "unknown",
"message_id": message_id,
"error": str(error),
}

Expand Down
Loading
Loading