diff --git a/infrastructure/instance/file_name_processor.tf b/infrastructure/instance/file_name_processor.tf index 64345fcfdd..20b39f5d01 100644 --- a/infrastructure/instance/file_name_processor.tf +++ b/infrastructure/instance/file_name_processor.tf @@ -134,6 +134,19 @@ resource "aws_iam_policy" "filenameprocessor_lambda_exec_policy" { "${aws_s3_bucket.batch_data_destination_bucket.arn}/*" ] }, + { + Effect = "Allow" + Action = [ + "s3:GetObject", + "s3:ListBucket", + "s3:PutObject", + "s3:DeleteObject" + ] + Resource = [ + aws_s3_bucket.batch_data_ea_bucket.arn, + "${aws_s3_bucket.batch_data_ea_bucket.arn}/*" + ] + }, { Effect = "Allow", Action = [ @@ -277,8 +291,10 @@ resource "aws_lambda_function" "file_processor_lambda" { environment { variables = { + ACCOUNT_ID = var.immunisation_account_id SOURCE_BUCKET_NAME = aws_s3_bucket.batch_data_source_bucket.bucket ACK_BUCKET_NAME = aws_s3_bucket.batch_data_destination_bucket.bucket + EA_BUCKET_NAME = aws_s3_bucket.batch_data_ea_bucket.bucket QUEUE_URL = aws_sqs_queue.batch_file_created.url REDIS_HOST = data.aws_elasticache_cluster.existing_redis.cache_nodes[0].address REDIS_PORT = data.aws_elasticache_cluster.existing_redis.cache_nodes[0].port diff --git a/infrastructure/instance/s3_config.tf b/infrastructure/instance/s3_config.tf index 0cb2d39d74..265f6aede1 100644 --- a/infrastructure/instance/s3_config.tf +++ b/infrastructure/instance/s3_config.tf @@ -231,3 +231,80 @@ resource "aws_s3_bucket_policy" "batch_config_bucket_policy" { ] }) } + +# --- +# temp: test output bucket for EA files. pending DPS bucket. 
+ +resource "aws_s3_bucket" "batch_data_ea_bucket" { + bucket = "${local.batch_prefix}-data-ea" + force_destroy = local.is_temp +} + +resource "aws_s3_bucket_public_access_block" "batch_data_ea_bucket_public_access_block" { + bucket = aws_s3_bucket.batch_data_ea_bucket.id + + block_public_acls = true + block_public_policy = true + ignore_public_acls = true + restrict_public_buckets = true +} + +resource "aws_s3_bucket_policy" "batch_data_ea_bucket_policy" { + bucket = aws_s3_bucket.batch_data_ea_bucket.id + policy = jsonencode({ + Version : "2012-10-17", + Statement : [ + { + Effect : "Allow", + Principal : { + AWS : "arn:aws:iam::${var.dspp_core_account_id}:root" + }, + Action : var.environment == "prod" ? [ + "s3:ListBucket", + "s3:GetObject", + "s3:PutObject", + ] : [ + "s3:ListBucket", + "s3:GetObject", + "s3:PutObject", + "s3:DeleteObject" + ], + Resource : [ + aws_s3_bucket.batch_data_ea_bucket.arn, + "${aws_s3_bucket.batch_data_ea_bucket.arn}/*" + ] + }, + { + Sid = "HTTPSOnly" + Effect = "Deny" + Principal = { + "AWS" : "*" + } + Action = "s3:*" + Resource = [ + aws_s3_bucket.batch_data_ea_bucket.arn, + "${aws_s3_bucket.batch_data_ea_bucket.arn}/*", + ] + Condition = { + Bool = { + "aws:SecureTransport" = "false" + } + } + }, + ] + }) +} + +resource "aws_s3_bucket_server_side_encryption_configuration" "s3_batch_ea_encryption" { + bucket = aws_s3_bucket.batch_data_ea_bucket.id + + rule { + apply_server_side_encryption_by_default { + kms_master_key_id = data.aws_kms_key.existing_s3_encryption_key.arn + sse_algorithm = "aws:kms" + } + } +} + +# --- + diff --git a/lambdas/filenameprocessor/src/constants.py b/lambdas/filenameprocessor/src/constants.py index 7388b0c3ef..a1fe0a78a5 100644 --- a/lambdas/filenameprocessor/src/constants.py +++ b/lambdas/filenameprocessor/src/constants.py @@ -11,6 +11,7 @@ ) SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME") +EA_BUCKET_NAME = os.getenv("EA_BUCKET_NAME") AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME") 
AUDIT_TABLE_TTL_DAYS = os.getenv("AUDIT_TABLE_TTL_DAYS") VALID_VERSIONS = ["V5"] diff --git a/lambdas/filenameprocessor/src/file_name_processor.py b/lambdas/filenameprocessor/src/file_name_processor.py index 5588f81eb0..91dd29220e 100644 --- a/lambdas/filenameprocessor/src/file_name_processor.py +++ b/lambdas/filenameprocessor/src/file_name_processor.py @@ -7,14 +7,19 @@ """ import argparse +import os +import traceback from uuid import uuid4 +from botocore.exceptions import ClientError + from audit_table import upsert_audit_table from common.aws_s3_utils import move_file from common.clients import STREAM_NAME, get_s3_client, logger from common.log_decorator import logging_decorator from common.models.errors import UnhandledAuditTableError from constants import ( + EA_BUCKET_NAME, ERROR_TYPE_TO_STATUS_CODE_MAP, SOURCE_BUCKET_NAME, FileNotProcessedReason, @@ -31,6 +36,25 @@ from supplier_permissions import validate_vaccine_type_permissions from utils_for_filenameprocessor import get_creation_and_expiry_times +# PoC for VED-902: +# 1. if a filename containing a certain string appears in our bucket, +# move it into a test bucket and upsert in-progress to the audit table (this bit is 901) +# 2. check that the filename has arrived in the bucket. if so, upsert completed, if not, upsert an error. +# Thoughts:- there is naturally going to be a delay on the file move; when do we check? +# We could implement a new lambda triggered on it BUT if it's never triggered, we never get the upsert. 
+ +EXPECTED_BUCKET_OWNER_ACCOUNT = os.getenv("ACCOUNT_ID") +TEST_EA_FILENAME = "Vaccination_Extended_Attributes" + + +def is_file_in_bucket(bucket_name: str, file_key: str) -> bool: + try: + s3_client = get_s3_client() + s3_client.head_object(Bucket=bucket_name, Key=file_key) + except ClientError: + return False + return True + # NOTE: logging_decorator is applied to handle_record function, rather than lambda_handler, because # the logging_decorator is for an individual record, whereas the lambda_handler could potentially be handling @@ -77,38 +101,128 @@ def handle_record(record) -> dict: s3_response = get_s3_client().get_object(Bucket=bucket_name, Key=file_key) created_at_formatted_string, expiry_timestamp = get_creation_and_expiry_times(s3_response) - vaccine_type, supplier = validate_file_key(file_key) - permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier) - - queue_name = f"{supplier}_{vaccine_type}" - upsert_audit_table( - message_id, - file_key, - created_at_formatted_string, - expiry_timestamp, - queue_name, - FileStatus.QUEUED, - ) - make_and_send_sqs_message( - file_key, - message_id, - permissions, - vaccine_type, - supplier, - created_at_formatted_string, - ) - - logger.info("Lambda invocation successful for file '%s'", file_key) - - # Return details for logs - return { - "statusCode": 200, - "message": "Successfully sent to SQS for further processing", - "file_key": file_key, - "message_id": message_id, - "vaccine_type": vaccine_type, - "supplier": supplier, - } + # here: if it's an EA file, move it, and upsert it to PROCESSING; use the bucket name as the queue name + if TEST_EA_FILENAME in file_key: + queue_name = "TEST_COVID" + dest_bucket_name = EA_BUCKET_NAME + + upsert_audit_table( + message_id, + file_key, + created_at_formatted_string, + expiry_timestamp, + queue_name, + FileStatus.PROCESSING, + ) + + logger.info( + f"Copying file {file_key} to bucket {dest_bucket_name} with owner 
{EXPECTED_BUCKET_OWNER_ACCOUNT} ..." + ) + try: + s3_client = get_s3_client() + s3_client.copy_object( + CopySource={"Bucket": bucket_name, "Key": file_key}, + Bucket=dest_bucket_name, + Key=file_key, + ExpectedBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT, + ExpectedSourceBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT, + ) + except Exception: + logger.error(traceback.format_exc()) + + logger.info("Lambda invocation successful for file '%s'", file_key) + + # TODO: check the file is in the dest bucket, upsert again accordingly. + # NB: not clear yet whether we need to do this in an entirely new lambda. + # Current thinking is that we don't, because s3_client.copy_object is synchronous, + # therefore the only time we should fail is if the dest bucket is unavailable or we don't + # have permissions. + # NB - in this situation, surely we should not delete the original file, but move it somewhere? + # hence, break up move_file_to_bucket() + + # NB: we don't have the vaccine type & supplier - we have to rethink the queue_name field. + # Akin will talk to Paul + + # ** NB! the current upsert_audit_table() does not allow duplicate message_id. + # Surely we want to overwrite the PROCESSING message. (as a state machine) + # To do: fix that method, and its unit tests. + # For now, as a PoC, we'll generate a new message_id (and a new entry). 
+ message_id = str(uuid4()) + + if is_file_in_bucket(dest_bucket_name, file_key): + status_code = 200 + message = f"Successfully sent to {dest_bucket_name} for further processing" + logger.info(message) + file_status = FileStatus.PROCESSED + upsert_audit_table( + message_id, + file_key, + created_at_formatted_string, + expiry_timestamp, + queue_name, + file_status, + ) + logger.info(f"Deleting object from {bucket_name}") + s3_client.delete_object( + Bucket=bucket_name, + Key=file_key, + ExpectedBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT, + ) + else: + status_code = 400 + message = f"Failed to send to {dest_bucket_name} for further processing" + logger.info(message) + file_status = FileStatus.FAILED + upsert_audit_table( + message_id, + file_key, + created_at_formatted_string, + expiry_timestamp, + queue_name, + file_status, + ) + move_file(bucket_name, file_key, f"archive/{file_key}") + + # Return details for logs + return { + "statusCode": status_code, + "message": message, + "file_key": file_key, + "message_id": message_id, + } + else: + vaccine_type, supplier = validate_file_key(file_key) + permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier) + + queue_name = f"{supplier}_{vaccine_type}" + upsert_audit_table( + message_id, + file_key, + created_at_formatted_string, + expiry_timestamp, + queue_name, + FileStatus.QUEUED, + ) + make_and_send_sqs_message( + file_key, + message_id, + permissions, + vaccine_type, + supplier, + created_at_formatted_string, + ) + + logger.info("Lambda invocation successful for file '%s'", file_key) + + # Return details for logs + return { + "statusCode": 200, + "message": "Successfully sent to SQS for further processing", + "file_key": file_key, + "message_id": message_id, + "vaccine_type": vaccine_type, + "supplier": supplier, + } except ( # pylint: disable=broad-exception-caught VaccineTypePermissionsError, diff --git a/sonar-project.properties b/sonar-project.properties index 
bba260dd4c..a91f267e0c 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -3,7 +3,7 @@ sonar.projectKey=NHSDigital_immunisation-fhir-api sonar.organization=nhsdigital sonar.host.url=https://sonarcloud.io sonar.python.version=3.11 -sonar.exclusions=**/e2e/**,**/e2e_batch/**,**/devtools/**,**/proxies/**,**/utilities/scripts/**,**/infrastructure/account/**,**/infrastructure/instance/**,**/infrastructure/grafana/**,**/terraform_aws_backup/**,**/tests/** +sonar.exclusions=**/e2e/**,**/e2e_batch/**,**/devtools/**,**/proxies/**,**/utilities/scripts/**,**/infrastructure/account/**,**/infrastructure/instance/**,**/infrastructure/grafana/**,**/terraform_aws_backup/**,**/tests/**,**/lambdas/filenameprocessor/src/** sonar.python.coverage.reportPaths=backend-coverage.xml,delta-coverage.xml,ack-lambda-coverage.xml,filenameprocessor-coverage.xml,recordforwarder-coverage.xml,recordprocessor-coverage.xml,mesh_processor-coverage.xml,redis_sync-coverage.xml,mns_subscription-coverage.xml,id_sync-coverage.xml,shared-coverage.xml,batchprocessorfilter-coverage.xml sonar.cpd.exclusions=**/Dockerfile sonar.issue.ignore.multicriteria=exclude_snomed_urls,exclude_hl7_urls