99from datetime import datetime , timezone
1010from pydantic import ValidationError
1111from mesh_download .errors import MeshMessageNotFound
12+ from mesh_download .document_store import DocumentAlreadyExistsError
1213
1314
1415def setup_mocks ():
@@ -19,6 +20,7 @@ def setup_mocks():
1920 # Set up default config attributes
2021 config .mesh_client = Mock ()
2122 config .download_metric = Mock ()
23+ config .duplicate_download_metric = Mock ()
2224 config .s3_client = Mock ()
2325 config .environment = 'development'
2426 config .transactional_data_bucket = 'test-pii-bucket'
@@ -48,9 +50,9 @@ def create_valid_cloud_event():
4850 'traceparent' : '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01' ,
4951 'dataschema' : 'https://notify.nhs.uk/cloudevents/schemas/digital-letters/2025-10-draft/data/digital-letters-mesh-inbox-message-received-data.schema.json' ,
5052 'data' : {
51- 'meshMessageId' : 'test_message_123 ' ,
52- 'senderId' : 'TEST_SENDER ' ,
53- 'messageReference' : 'ref_001 '
53+ 'meshMessageId' : 'test-message-123 ' ,
54+ 'senderId' : 'TEST-SENDER ' ,
55+ 'messageReference' : 'ref-001 '
5456 }
5557 }
5658
@@ -152,6 +154,7 @@ def test_processor_initialization_calls_mesh_handshake(self):
152154 log = log ,
153155 mesh_client = config .mesh_client ,
154156 download_metric = config .download_metric ,
157+ duplicate_download_metric = config .duplicate_download_metric ,
155158 document_store = document_store ,
156159 event_publisher = event_publisher
157160 )
@@ -168,7 +171,7 @@ def test_process_sqs_message_success(self, mock_datetime):
168171 fixed_time = datetime (2025 , 11 , 19 , 15 , 30 , 45 , tzinfo = timezone .utc )
169172 mock_datetime .now .return_value = fixed_time
170173
171- document_store .store_document .return_value = 'document-reference/SENDER_001_ref_001 '
174+ document_store .store_document .return_value = 'document-reference/SENDER-001/ref-001_test-message-123 '
172175
173176 event_publisher .send_events .return_value = []
174177
@@ -177,6 +180,7 @@ def test_process_sqs_message_success(self, mock_datetime):
177180 log = log ,
178181 mesh_client = config .mesh_client ,
179182 download_metric = config .download_metric ,
183+ duplicate_download_metric = config .duplicate_download_metric ,
180184 document_store = document_store ,
181185 event_publisher = event_publisher
182186 )
@@ -186,15 +190,17 @@ def test_process_sqs_message_success(self, mock_datetime):
186190
187191 sqs_record = create_sqs_record ()
188192
189- processor .process_sqs_message (sqs_record )
193+ outcome = processor .process_sqs_message (sqs_record )
190194
191- config .mesh_client .retrieve_message .assert_called_once_with ('test_message_123' )
195+ assert outcome == 'downloaded'
196+ config .mesh_client .retrieve_message .assert_called_once_with ('test-message-123' )
192197
193198 mesh_message .read .assert_called_once ()
194199
195200 document_store .store_document .assert_called_once_with (
196201 sender_id = 'TEST_SENDER' ,
197202 message_reference = 'ref_001' ,
203+ mesh_message_id = 'test-message-123' ,
198204 content = create_fhir_content ()
199205 )
200206
@@ -225,9 +231,9 @@ def test_process_sqs_message_success(self, mock_datetime):
225231
226232 # Verify CloudEvent data payload
227233 event_data = published_event ['data' ]
228- assert event_data ['senderId' ] == 'TEST_SENDER '
229- assert event_data ['messageReference' ] == 'ref_001 '
230- assert event_data ['messageUri' ] == 's3://test-pii-bucket/document-reference/SENDER_001_ref_001 '
234+ assert event_data ['senderId' ] == 'TEST-SENDER '
235+ assert event_data ['messageReference' ] == 'ref-001 '
236+ assert event_data ['messageUri' ] == 's3://test-pii-bucket/document-reference/SENDER-001/ref-001_test-message-123 '
231237 assert set (event_data .keys ()) == {'senderId' , 'messageReference' , 'messageUri' , 'meshMessageId' }
232238
233239 @patch ('mesh_download.processor.datetime' )
@@ -310,6 +316,7 @@ def test_process_sqs_message_validation_failure(self):
310316 log = log ,
311317 mesh_client = config .mesh_client ,
312318 download_metric = config .download_metric ,
319+ duplicate_download_metric = config .duplicate_download_metric ,
313320 document_store = document_store ,
314321 event_publisher = event_publisher
315322 )
@@ -334,6 +341,7 @@ def test_process_sqs_message_missing_mesh_message_id(self):
334341 log = log ,
335342 mesh_client = config .mesh_client ,
336343 download_metric = config .download_metric ,
344+ duplicate_download_metric = config .duplicate_download_metric ,
337345 document_store = document_store ,
338346 event_publisher = event_publisher
339347 )
@@ -361,17 +369,18 @@ def test_download_and_store_message_not_found(self):
361369 log = log ,
362370 mesh_client = config .mesh_client ,
363371 download_metric = config .download_metric ,
372+ duplicate_download_metric = config .duplicate_download_metric ,
364373 document_store = document_store ,
365374 event_publisher = event_publisher
366375 )
367376
368377 config .mesh_client .retrieve_message .return_value = None
369378 sqs_record = create_sqs_record ()
370379
371- with pytest .raises (MeshMessageNotFound , match = "MESH message with ID test_message_123 not found" ):
380+ with pytest .raises (MeshMessageNotFound , match = "MESH message with ID test-message-123 not found" ):
372381 processor .process_sqs_message (sqs_record )
373382
374- config .mesh_client .retrieve_message .assert_called_once_with ('test_message_123 ' )
383+ config .mesh_client .retrieve_message .assert_called_once_with ('test-message-123 ' )
375384 document_store .store_document .assert_not_called ()
376385 event_publisher .send_events .assert_not_called ()
377386 config .download_metric .record .assert_not_called ()
@@ -391,6 +400,7 @@ def test_document_store_failure_prevents_ack_and_raises(self):
391400 log = log ,
392401 mesh_client = config .mesh_client ,
393402 download_metric = config .download_metric ,
403+ duplicate_download_metric = config .duplicate_download_metric ,
394404 document_store = document_store ,
395405 event_publisher = event_publisher
396406 )
@@ -426,6 +436,7 @@ def test_bucket_selection_with_mesh_mock_enabled(self, mock_datetime):
426436 log = log ,
427437 mesh_client = config .mesh_client ,
428438 download_metric = config .download_metric ,
439+ duplicate_download_metric = config .duplicate_download_metric ,
429440 document_store = document_store ,
430441 event_publisher = event_publisher
431442 )
@@ -434,8 +445,9 @@ def test_bucket_selection_with_mesh_mock_enabled(self, mock_datetime):
434445 config .mesh_client .retrieve_message .return_value = mesh_message
435446 sqs_record = create_sqs_record ()
436447
437- processor .process_sqs_message (sqs_record )
448+ outcome = processor .process_sqs_message (sqs_record )
438449
450+ assert outcome == 'downloaded'
439451 # Verify event was published with PII bucket in URI
440452 event_publisher .send_events .assert_called_once ()
441453 published_events = event_publisher .send_events .call_args [0 ][0 ]
@@ -464,6 +476,7 @@ def test_bucket_selection_with_mesh_mock_disabled(self, mock_datetime):
464476 log = log ,
465477 mesh_client = config .mesh_client ,
466478 download_metric = config .download_metric ,
479+ duplicate_download_metric = config .duplicate_download_metric ,
467480 document_store = document_store ,
468481 event_publisher = event_publisher
469482 )
@@ -472,10 +485,52 @@ def test_bucket_selection_with_mesh_mock_disabled(self, mock_datetime):
472485 config .mesh_client .retrieve_message .return_value = mesh_message
473486 sqs_record = create_sqs_record ()
474487
475- processor .process_sqs_message (sqs_record )
488+ outcome = processor .process_sqs_message (sqs_record )
476489
490+ assert outcome == 'downloaded'
477491 event_publisher .send_events .assert_called_once ()
478492 published_events = event_publisher .send_events .call_args [0 ][0 ]
479493 assert len (published_events ) == 1
480494 message_uri = published_events [0 ]['data' ]['messageUri' ]
481495 assert message_uri .startswith ('s3://test-pii-bucket/' )
496+
def test_duplicate_delivery_skips_publish_and_acknowledge(self):
    """Duplicate delivery: the document store reports the object already exists.

    Checks, in order, that ``process_sqs_message`` returns ``'skipped'``
    without raising, that exactly one warning mentioning "already stored" is
    emitted on the bound logger, that the duplicate-download metric is
    recorded once, that no event is published and the regular download
    metric is untouched, and that the MESH message is still acknowledged.
    """
    from mesh_download.processor import MeshDownloadProcessor

    config, log, event_publisher, document_store = setup_mocks()

    # Hand out a dedicated mock from log.bind() so the warning call can be inspected.
    scoped_log = Mock()
    log.bind.return_value = scoped_log

    # Make the store raise the "already exists" error on every write attempt.
    document_store.store_document.side_effect = DocumentAlreadyExistsError(
        "Document already exists for key: document-reference/TEST-SENDER/ref-001_mesh-123"
    )

    dependencies = {
        'config': config,
        'log': log,
        'mesh_client': config.mesh_client,
        'download_metric': config.download_metric,
        'duplicate_download_metric': config.duplicate_download_metric,
        'document_store': document_store,
        'event_publisher': event_publisher,
    }
    processor = MeshDownloadProcessor(**dependencies)

    inbox_message = create_mesh_message()
    config.mesh_client.retrieve_message.return_value = inbox_message
    record = create_sqs_record()

    # The duplicate must be handled internally; no exception should escape.
    result = processor.process_sqs_message(record)

    assert result == 'skipped'

    scoped_log.warning.assert_called_once()
    logged_args, _ = scoped_log.warning.call_args
    assert "already stored" in logged_args[0]

    config.duplicate_download_metric.record.assert_called_once()

    # Nothing new was stored, so nothing is published or counted as a download.
    event_publisher.send_events.assert_not_called()
    config.download_metric.record.assert_not_called()

    # Acknowledge should still be called to remove message from MESH inbox
    inbox_message.acknowledge.assert_called_once()
0 commit comments