1010from uuid import uuid4
1111
1212from audit_table import upsert_audit_table
13- from common .aws_s3_utils import move_file , move_file_to_external_bucket
13+ from common .aws_s3_utils import (
14+ copy_file_to_external_bucket ,
15+ delete_file ,
16+ move_file ,
17+ )
1418from common .clients import STREAM_NAME , get_s3_client , logger
1519from common .log_decorator import logging_decorator
1620from common .models .errors import UnhandledAuditTableError
1721from constants import (
1822 DPS_DESTINATION_BUCKET_NAME ,
19- DPS_DESTINATION_PREFIX ,
2023 ERROR_TYPE_TO_STATUS_CODE_MAP ,
24+ EXPECTED_BUCKET_OWNER_ACCOUNT ,
2125 EXTENDED_ATTRIBUTES_FILE_PREFIX ,
26+ EXTENDED_ATTRIBUTES_VACC_TYPE ,
2227 SOURCE_BUCKET_NAME ,
2328 FileNotProcessedReason ,
2429 FileStatus ,
@@ -56,8 +61,6 @@ def handle_record(record) -> dict:
5661 "error" : str (error ),
5762 }
5863
59- expiry_timestamp = "unknown"
60-
6164 if bucket_name != SOURCE_BUCKET_NAME :
6265 return handle_unexpected_bucket_name (bucket_name , file_key )
6366
@@ -69,10 +72,6 @@ def handle_record(record) -> dict:
6972 message = "Processing not required. Event was for a file moved to /archive or /processing"
7073 return {"statusCode" : 200 , "message" : message , "file_key" : file_key }
7174
72- # Set default values for file-specific variables
73- message_id = "Message id was not created"
74- created_at_formatted_string = "created_at_time not identified"
75-
7675 message_id = str (uuid4 ())
7776 s3_response = get_s3_client ().get_object (Bucket = bucket_name , Key = file_key )
7877 created_at_formatted_string , expiry_timestamp = get_creation_and_expiry_times (s3_response )
@@ -108,7 +107,8 @@ def handle_unexpected_bucket_name(bucket_name: str, file_key: str) -> dict:
108107 config and overarching design"""
109108 try :
110109 if file_key .startswith (EXTENDED_ATTRIBUTES_FILE_PREFIX ):
111- extended_attribute_identifier = validate_extended_attributes_file_key (file_key )
110+ organization_code = validate_extended_attributes_file_key (file_key )
111+ extended_attribute_identifier = f"{ organization_code } _{ EXTENDED_ATTRIBUTES_VACC_TYPE } "
112112 logger .error (
113113 "Unable to process file %s due to unexpected bucket name %s" ,
114114 file_key ,
@@ -157,7 +157,7 @@ def handle_unexpected_bucket_name(bucket_name: str, file_key: str) -> dict:
157157
158158
159159def handle_batch_file (
160- file_key : str , bucket_name : str , message_id : str , created_at_formatted_string : str , expiry_timestamp : str
160+ file_key : str , bucket_name : str , message_id : str , created_at_formatted_string : str , expiry_timestamp : int
161161) -> dict :
162162 """
163163 Processes a single record for batch file.
@@ -177,6 +177,7 @@ def handle_batch_file(
177177 expiry_timestamp ,
178178 queue_name ,
179179 FileStatus .QUEUED ,
180+ condition_expression = "attribute_not_exists(message_id)" , # Prevents accidental overwrites
180181 )
181182 make_and_send_sqs_message (
182183 file_key ,
@@ -239,17 +240,17 @@ def handle_batch_file(
239240
240241
241242def handle_extended_attributes_file (
242- file_key : str , bucket_name : str , message_id : str , created_at_formatted_string : str , expiry_timestamp : str
243+ file_key : str , bucket_name : str , message_id : str , created_at_formatted_string : str , expiry_timestamp : int
243244) -> dict :
244245 """
245246 Processes a single record for extended attributes file.
246247 Returns a dictionary containing information to be included in the logs.
247248 """
249+
250+ extended_attribute_identifier = None
248251 try :
249- extended_attribute_identifier = validate_extended_attributes_file_key (file_key )
250- move_file_to_external_bucket (
251- bucket_name , file_key , DPS_DESTINATION_BUCKET_NAME , f"{ DPS_DESTINATION_PREFIX } { file_key } "
252- )
252+ organization_code = validate_extended_attributes_file_key (file_key )
253+ extended_attribute_identifier = f"{ organization_code } _{ EXTENDED_ATTRIBUTES_VACC_TYPE } "
253254
254255 upsert_audit_table (
255256 message_id ,
@@ -259,6 +260,28 @@ def handle_extended_attributes_file(
259260 extended_attribute_identifier ,
260261 FileStatus .PROCESSING ,
261262 )
263+
264+ # TODO: agree the prefix with DPS
265+ dest_file_key = f"dps_destination/{ file_key } "
266+ copy_file_to_external_bucket (
267+ bucket_name ,
268+ file_key ,
269+ DPS_DESTINATION_BUCKET_NAME ,
270+ dest_file_key ,
271+ EXPECTED_BUCKET_OWNER_ACCOUNT ,
272+ EXPECTED_BUCKET_OWNER_ACCOUNT ,
273+ )
274+ delete_file (bucket_name , file_key , EXPECTED_BUCKET_OWNER_ACCOUNT )
275+
276+ upsert_audit_table (
277+ message_id ,
278+ file_key ,
279+ created_at_formatted_string ,
280+ expiry_timestamp ,
281+ extended_attribute_identifier ,
282+ FileStatus .PROCESSED ,
283+ )
284+
262285 return {
263286 "statusCode" : 200 ,
264287 "message" : "Extended Attributes file successfully processed" ,
@@ -276,7 +299,13 @@ def handle_extended_attributes_file(
276299 logger .error ("Error processing file '%s': %s" , file_key , str (error ))
277300
278301 file_status = get_file_status_for_error (error )
279- extended_attribute_identifier = validate_extended_attributes_file_key (file_key )
302+
303+ # NB: if we got InvalidFileKeyError we won't have a valid identifier to record as the queue name
304+ if not extended_attribute_identifier :
305+ extended_attribute_identifier = "unknown"
306+
307+ # Move file to archive
308+ move_file (bucket_name , file_key , f"archive/{ file_key } " )
280309
281310 upsert_audit_table (
282311 message_id ,
0 commit comments