16 changes: 16 additions & 0 deletions infrastructure/instance/file_name_processor.tf
@@ -134,6 +134,20 @@ resource "aws_iam_policy" "filenameprocessor_lambda_exec_policy" {
"${aws_s3_bucket.batch_data_destination_bucket.arn}/*"
]
},
{
Effect = "Allow"
Action = [
"s3:GetObject",
"s3:ListBucket",
"s3:PutObject",
# NB: "s3:CopyObject" is not a real IAM action; CopyObject is authorised by
# s3:GetObject on the source object and s3:PutObject on the destination.
"s3:DeleteObject"
]
Resource = [
aws_s3_bucket.batch_data_ea_bucket.arn,
"${aws_s3_bucket.batch_data_ea_bucket.arn}/*"
]
},
{
Effect = "Allow",
Action = [
@@ -277,8 +291,10 @@ resource "aws_lambda_function" "file_processor_lambda" {

environment {
variables = {
ACCOUNT_ID = var.immunisation_account_id
SOURCE_BUCKET_NAME = aws_s3_bucket.batch_data_source_bucket.bucket
ACK_BUCKET_NAME = aws_s3_bucket.batch_data_destination_bucket.bucket
EA_BUCKET_NAME = aws_s3_bucket.batch_data_ea_bucket.bucket
QUEUE_URL = aws_sqs_queue.batch_file_created.url
REDIS_HOST = data.aws_elasticache_cluster.existing_redis.cache_nodes[0].address
REDIS_PORT = data.aws_elasticache_cluster.existing_redis.cache_nodes[0].port
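As a post-apply sanity check that the Lambda's execution role really holds these grants, IAM's policy simulator can be driven from boto3. A sketch, assuming placeholder role and bucket ARNs, and iam:SimulatePrincipalPolicy permission for the caller:

import boto3

def role_can_copy(role_arn: str, bucket_arn: str) -> bool:
    # Evaluate the role's attached policies against the actions CopyObject relies on.
    iam = boto3.client("iam")
    result = iam.simulate_principal_policy(
        PolicySourceArn=role_arn,
        ActionNames=["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
        ResourceArns=[f"{bucket_arn}/*"],
    )
    return all(r["EvalDecision"] == "allowed" for r in result["EvaluationResults"])

print(role_can_copy("arn:aws:iam::123456789012:role/filenameprocessor-lambda-role", "arn:aws:s3:::imms-batch-data-ea"))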
77 changes: 77 additions & 0 deletions infrastructure/instance/s3_config.tf
@@ -231,3 +231,80 @@ resource "aws_s3_bucket_policy" "batch_config_bucket_policy" {
]
})
}

# ---
# Temporary: test output bucket for EA files, pending the DPS-owned destination bucket.

resource "aws_s3_bucket" "batch_data_ea_bucket" {
bucket = "${local.batch_prefix}-data-ea"
force_destroy = local.is_temp
}

resource "aws_s3_bucket_public_access_block" "batch_data_ea_bucket_public_access_block" {
bucket = aws_s3_bucket.batch_data_ea_bucket.id

block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}

resource "aws_s3_bucket_policy" "batch_data_ea_bucket_policy" {
bucket = aws_s3_bucket.batch_data_ea_bucket.id
policy = jsonencode({
Version : "2012-10-17",
Statement : [
{
Effect : "Allow",
Principal : {
AWS : "arn:aws:iam::${var.dspp_core_account_id}:root"
},
Action : var.environment == "prod" ? [
"s3:ListBucket",
"s3:GetObject",
"s3:PutObject",
] : [
"s3:ListBucket",
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject"
],
Resource : [
aws_s3_bucket.batch_data_ea_bucket.arn,
"${aws_s3_bucket.batch_data_ea_bucket.arn}/*"
]
},
{
Sid = "HTTPSOnly"
Effect = "Deny"
Principal = {
"AWS" : "*"
}
Action = "s3:*"
Resource = [
aws_s3_bucket.batch_data_ea_bucket.arn,
"${aws_s3_bucket.batch_data_ea_bucket.arn}/*",
]
Condition = {
Bool = {
"aws:SecureTransport" = "false"
}
}
},
]
})
}
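The HTTPSOnly statement can be exercised directly: a plain-HTTP request should be rejected regardless of the caller's other permissions. A minimal probe, assuming a placeholder bucket name and credentials for any otherwise-allowed principal:

import boto3
from botocore.exceptions import ClientError

insecure = boto3.client("s3", use_ssl=False)  # plain HTTP, so aws:SecureTransport evaluates to "false"
try:
    insecure.list_objects_v2(Bucket="imms-batch-data-ea")  # placeholder bucket name
except ClientError as error:
    print(error.response["Error"]["Code"])  # expect "AccessDenied" from the deny statement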

resource "aws_s3_bucket_server_side_encryption_configuration" "s3_batch_ea_encryption" {
bucket = aws_s3_bucket.batch_data_ea_bucket.id

rule {
apply_server_side_encryption_by_default {
kms_master_key_id = data.aws_kms_key.existing_s3_encryption_key.arn
sse_algorithm = "aws:kms"
}
}
}
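Because default encryption is configured at the bucket level, writers never need to pass SSE headers; a quick probe can confirm that objects land KMS-encrypted (a sketch with a placeholder bucket name):

import boto3

s3 = boto3.client("s3")
bucket = "imms-batch-data-ea"  # placeholder

# The bucket should report aws:kms with the shared key as its default rule.
enc = s3.get_bucket_encryption(Bucket=bucket)
rule = enc["ServerSideEncryptionConfiguration"]["Rules"][0]["ApplyServerSideEncryptionByDefault"]
print(rule["SSEAlgorithm"], rule.get("KMSMasterKeyID"))

# A PUT with no SSE headers should still come back encrypted.
s3.put_object(Bucket=bucket, Key="encryption-probe.txt", Body=b"probe")
print(s3.head_object(Bucket=bucket, Key="encryption-probe.txt")["ServerSideEncryption"])  # expect "aws:kms"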

# ---

1 change: 1 addition & 0 deletions lambdas/filenameprocessor/src/constants.py
@@ -11,6 +11,7 @@
)

SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME")
EA_BUCKET_NAME = os.getenv("EA_BUCKET_NAME")
AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
AUDIT_TABLE_TTL_DAYS = os.getenv("AUDIT_TABLE_TTL_DAYS")
VALID_VERSIONS = ["V5"]
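
# Hedged aside (not part of this PR): os.getenv returns None when a variable is unset, so a
# missing EA_BUCKET_NAME only surfaces later as a confusing copy_object failure. A fail-fast
# sketch, using a hypothetical _require helper:
def _require(name: str) -> str:
    # Raise at import time, rather than mid-invocation, if configuration is missing.
    value = os.getenv(name)
    if value is None:
        raise OSError(f"Required environment variable {name} is not set")
    return value
# e.g. EA_BUCKET_NAME = _require("EA_BUCKET_NAME")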
178 changes: 146 additions & 32 deletions lambdas/filenameprocessor/src/file_name_processor.py
@@ -7,14 +7,19 @@
"""

import argparse
import os
import traceback
from uuid import uuid4

from botocore.exceptions import ClientError

from audit_table import upsert_audit_table
from common.aws_s3_utils import move_file
from common.clients import STREAM_NAME, get_s3_client, logger
from common.log_decorator import logging_decorator
from common.models.errors import UnhandledAuditTableError
from constants import (
EA_BUCKET_NAME,
ERROR_TYPE_TO_STATUS_CODE_MAP,
SOURCE_BUCKET_NAME,
FileNotProcessedReason,
Expand All @@ -31,6 +36,25 @@
from supplier_permissions import validate_vaccine_type_permissions
from utils_for_filenameprocessor import get_creation_and_expiry_times

# PoC for VED-902:
# 1. If a filename containing a certain string appears in our bucket, move it into a test
# bucket and upsert an in-progress status to the audit table (that part is VED-901).
# 2. Check that the file has arrived in the destination bucket: if so, upsert completed;
# if not, upsert an error.
# Thoughts: there is naturally going to be a delay on the file move, so when do we check?
# We could implement a new lambda triggered on the destination bucket, BUT if it is never
# triggered we never get the final upsert. Note that copy_object is synchronous and S3
# reads are strongly consistent, so a HeadObject immediately after a successful copy
# should see the object.

EXPECTED_BUCKET_OWNER_ACCOUNT = os.getenv("ACCOUNT_ID")
TEST_EA_FILENAME = "Vaccination_Extended_Attributes"


def is_file_in_bucket(bucket_name: str, file_key: str) -> bool:
"""Return True if a HeadObject call finds the object in the given bucket."""
try:
s3_client = get_s3_client()
s3_client.head_object(Bucket=bucket_name, Key=file_key)
except ClientError:
return False
return True
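
# A possible alternative to the single HeadObject probe above (a sketch, not wired in):
# boto3's built-in "object_exists" waiter retries HeadObject with backoff, which avoids
# declaring failure on one transiently throttled call.
def wait_for_file_in_bucket(bucket_name: str, file_key: str, max_attempts: int = 5) -> bool:
    """Poll for the object with the standard S3 waiter; False if it never appears."""
    from botocore.exceptions import WaiterError  # local import keeps this sketch self-contained

    try:
        get_s3_client().get_waiter("object_exists").wait(
            Bucket=bucket_name, Key=file_key, WaiterConfig={"Delay": 2, "MaxAttempts": max_attempts}
        )
    except WaiterError:
        return False
    return True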


# NOTE: logging_decorator is applied to handle_record function, rather than lambda_handler, because
# the logging_decorator is for an individual record, whereas the lambda_handler could potentially be handling
@@ -77,38 +101,128 @@ def handle_record(record) -> dict:
s3_response = get_s3_client().get_object(Bucket=bucket_name, Key=file_key)
created_at_formatted_string, expiry_timestamp = get_creation_and_expiry_times(s3_response)

vaccine_type, supplier = validate_file_key(file_key)
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)

queue_name = f"{supplier}_{vaccine_type}"
upsert_audit_table(
message_id,
file_key,
created_at_formatted_string,
expiry_timestamp,
queue_name,
FileStatus.QUEUED,
)
make_and_send_sqs_message(
file_key,
message_id,
permissions,
vaccine_type,
supplier,
created_at_formatted_string,
)

logger.info("Lambda invocation successful for file '%s'", file_key)

# Return details for logs
return {
"statusCode": 200,
"message": "Successfully sent to SQS for further processing",
"file_key": file_key,
"message_id": message_id,
"vaccine_type": vaccine_type,
"supplier": supplier,
}
# Here: if it's an EA file, copy it to the EA bucket and upsert PROCESSING to the audit table.
# We have no supplier/vaccine type for EA files yet, so queue_name is a hard-coded placeholder.
if TEST_EA_FILENAME in file_key:
queue_name = "TEST_COVID"
dest_bucket_name = EA_BUCKET_NAME

upsert_audit_table(
message_id,
file_key,
created_at_formatted_string,
expiry_timestamp,
queue_name,
FileStatus.PROCESSING,
)

logger.info(
f"Copying file {file_key} to bucket {dest_bucket_name} with owner {EXPECTED_BUCKET_OWNER_ACCOUNT} ..."
)
try:
s3_client = get_s3_client()
s3_client.copy_object(
CopySource={"Bucket": bucket_name, "Key": file_key},
Bucket=dest_bucket_name,
Key=file_key,
ExpectedBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT,
ExpectedSourceBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT,
)
except Exception:  # broad by design in this PoC: the post-copy check below records any failure
logger.error(traceback.format_exc())

logger.info("Lambda invocation successful for file '%s'", file_key)

# TODO: check the file is in the dest bucket and upsert again accordingly.
# NB: not yet clear whether we need an entirely new lambda for this check.
# Current thinking is that we don't, because s3_client.copy_object is synchronous,
# so the only failures should be the dest bucket being unavailable or missing permissions.
# NB: in that situation, surely we should not delete the original file, but move it
# somewhere else instead - hence, break up move_file().

# NB: we don't have the vaccine type & supplier here, so the queue_name field needs a rethink.
# Akin will talk to Paul.

# ** NB! The current upsert_audit_table() does not allow duplicate message_ids, but surely
# we want to overwrite the PROCESSING entry (treating the row as a state machine).
# To do: fix that method and its unit tests.
# For now, as a PoC, we generate a new message_id (and hence a new entry).
message_id = str(uuid4())

if is_file_in_bucket(dest_bucket_name, file_key):
status_code = 200
message = (f"Successfully sent to {dest_bucket_name} for further processing",)
logger.info(message)
file_status = FileStatus.PROCESSED
upsert_audit_table(
message_id,
file_key,
created_at_formatted_string,
expiry_timestamp,
queue_name,
file_status,
)
logger.info("Deleting object from {bucket_name}")
s3_client.delete_object(
Bucket=bucket_name,
Key=file_key,
ExpectedBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT,
)
else:
status_code = 400
message = (f"Failed to send to {dest_bucket_name} for further processing",)
logger.info(message)
file_status = FileStatus.FAILED
upsert_audit_table(
message_id,
file_key,
created_at_formatted_string,
expiry_timestamp,
queue_name,
file_status,
)
move_file(bucket_name, file_key, f"archive/{file_key}")

# Return details for logs
return {
"statusCode": status_code,
"message": message,
"file_key": file_key,
"message_id": message_id,
}
else:
vaccine_type, supplier = validate_file_key(file_key)
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)

queue_name = f"{supplier}_{vaccine_type}"
upsert_audit_table(
message_id,
file_key,
created_at_formatted_string,
expiry_timestamp,
queue_name,
FileStatus.QUEUED,
)
make_and_send_sqs_message(
file_key,
message_id,
permissions,
vaccine_type,
supplier,
created_at_formatted_string,
)

logger.info("Lambda invocation successful for file '%s'", file_key)

# Return details for logs
return {
"statusCode": 200,
"message": "Successfully sent to SQS for further processing",
"file_key": file_key,
"message_id": message_id,
"vaccine_type": vaccine_type,
"supplier": supplier,
}

except ( # pylint: disable=broad-exception-caught
VaccineTypePermissionsError,
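On the duplicate-message_id concern flagged above: a status-overwriting upsert is one way to treat the audit row as a state machine. A minimal sketch, assuming a DynamoDB audit table keyed on message_id (the table and attribute names are illustrative, not this PR's upsert_audit_table):

import boto3

def upsert_status(table_name: str, message_id: str, file_key: str, status: str) -> None:
    # update_item creates the row if absent and overwrites the status if present,
    # so PROCESSING -> PROCESSED/FAILED becomes a plain state transition.
    table = boto3.resource("dynamodb").Table(table_name)
    table.update_item(
        Key={"message_id": message_id},
        UpdateExpression="SET file_key = :f, #s = :s",
        ExpressionAttributeNames={"#s": "status"},  # "status" is a DynamoDB reserved word, so alias it
        ExpressionAttributeValues={":f": file_key, ":s": status},
    )

This shape would remove the need to mint a second message_id purely to record the final state.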
2 changes: 1 addition & 1 deletion sonar-project.properties
@@ -3,7 +3,7 @@ sonar.projectKey=NHSDigital_immunisation-fhir-api
sonar.organization=nhsdigital
sonar.host.url=https://sonarcloud.io
sonar.python.version=3.11
sonar.exclusions=**/e2e/**,**/e2e_batch/**,**/devtools/**,**/proxies/**,**/utilities/scripts/**,**/infrastructure/account/**,**/infrastructure/instance/**,**/infrastructure/grafana/**,**/terraform_aws_backup/**,**/tests/**
sonar.exclusions=**/e2e/**,**/e2e_batch/**,**/devtools/**,**/proxies/**,**/utilities/scripts/**,**/infrastructure/account/**,**/infrastructure/instance/**,**/infrastructure/grafana/**,**/terraform_aws_backup/**,**/tests/**,**/lambdas/filenameprocessor/src/**
sonar.python.coverage.reportPaths=backend-coverage.xml,delta-coverage.xml,ack-lambda-coverage.xml,filenameprocessor-coverage.xml,recordforwarder-coverage.xml,recordprocessor-coverage.xml,mesh_processor-coverage.xml,redis_sync-coverage.xml,mns_subscription-coverage.xml,id_sync-coverage.xml,shared-coverage.xml,batchprocessorfilter-coverage.xml
sonar.cpd.exclusions=**/Dockerfile
sonar.issue.ignore.multicriteria=exclude_snomed_urls,exclude_hl7_urls