99from datetime import datetime , timezone
1010from pydantic import ValidationError
1111from mesh_download .errors import MeshMessageNotFound
12- from mesh_download .document_store import DocumentAlreadyExistsError
12+ from mesh_download .document_store import DocumentAlreadyExistsError , DocumentAlreadyExistsInternalRetryError
1313
1414
1515def setup_mocks ():
@@ -20,7 +20,8 @@ def setup_mocks():
2020 # Set up default config attributes
2121 config .mesh_client = Mock ()
2222 config .download_metric = Mock ()
23- config .duplicate_download_metric = Mock ()
23+ config .internal_duplicate_download_metric = Mock ()
24+ config .trust_duplicate_download_metric = Mock ()
2425 config .s3_client = Mock ()
2526 config .environment = 'development'
2627 config .transactional_data_bucket = 'test-pii-bucket'
@@ -157,7 +158,8 @@ def test_processor_initialization_calls_mesh_handshake(self):
157158 log = log ,
158159 mesh_client = config .mesh_client ,
159160 download_metric = config .download_metric ,
160- duplicate_download_metric = config .duplicate_download_metric ,
161+ internal_duplicate_download_metric = config .internal_duplicate_download_metric ,
162+ trust_duplicate_download_metric = config .trust_duplicate_download_metric ,
161163 document_store = document_store ,
162164 event_publisher = event_publisher
163165 )
@@ -183,7 +185,8 @@ def test_process_sqs_message_success(self, mock_datetime):
183185 log = log ,
184186 mesh_client = config .mesh_client ,
185187 download_metric = config .download_metric ,
186- duplicate_download_metric = config .duplicate_download_metric ,
188+ internal_duplicate_download_metric = config .internal_duplicate_download_metric ,
189+ trust_duplicate_download_metric = config .trust_duplicate_download_metric ,
187190 document_store = document_store ,
188191 event_publisher = event_publisher
189192 )
@@ -260,7 +263,8 @@ def test_process_sqs_message_invalid_fhir_content(self, mock_datetime):
260263 log = log ,
261264 mesh_client = config .mesh_client ,
262265 download_metric = config .download_metric ,
263- duplicate_download_metric = config .duplicate_download_metric ,
266+ internal_duplicate_download_metric = config .internal_duplicate_download_metric ,
267+ trust_duplicate_download_metric = config .trust_duplicate_download_metric ,
264268 document_store = document_store ,
265269 event_publisher = event_publisher
266270 )
@@ -326,7 +330,8 @@ def test_process_sqs_message_validation_failure(self):
326330 log = log ,
327331 mesh_client = config .mesh_client ,
328332 download_metric = config .download_metric ,
329- duplicate_download_metric = config .duplicate_download_metric ,
333+ internal_duplicate_download_metric = config .internal_duplicate_download_metric ,
334+ trust_duplicate_download_metric = config .trust_duplicate_download_metric ,
330335 document_store = document_store ,
331336 event_publisher = event_publisher
332337 )
@@ -351,7 +356,8 @@ def test_process_sqs_message_missing_mesh_message_id(self):
351356 log = log ,
352357 mesh_client = config .mesh_client ,
353358 download_metric = config .download_metric ,
354- duplicate_download_metric = config .duplicate_download_metric ,
359+ internal_duplicate_download_metric = config .internal_duplicate_download_metric ,
360+ trust_duplicate_download_metric = config .trust_duplicate_download_metric ,
355361 document_store = document_store ,
356362 event_publisher = event_publisher
357363 )
@@ -379,7 +385,8 @@ def test_download_and_store_message_not_found(self):
379385 log = log ,
380386 mesh_client = config .mesh_client ,
381387 download_metric = config .download_metric ,
382- duplicate_download_metric = config .duplicate_download_metric ,
388+ internal_duplicate_download_metric = config .internal_duplicate_download_metric ,
389+ trust_duplicate_download_metric = config .trust_duplicate_download_metric ,
383390 document_store = document_store ,
384391 event_publisher = event_publisher
385392 )
@@ -410,7 +417,8 @@ def test_document_store_failure_prevents_ack_and_raises(self):
410417 log = log ,
411418 mesh_client = config .mesh_client ,
412419 download_metric = config .download_metric ,
413- duplicate_download_metric = config .duplicate_download_metric ,
420+ internal_duplicate_download_metric = config .internal_duplicate_download_metric ,
421+ trust_duplicate_download_metric = config .trust_duplicate_download_metric ,
414422 document_store = document_store ,
415423 event_publisher = event_publisher
416424 )
@@ -446,7 +454,8 @@ def test_bucket_selection_with_mesh_mock_enabled(self, mock_datetime):
446454 log = log ,
447455 mesh_client = config .mesh_client ,
448456 download_metric = config .download_metric ,
449- duplicate_download_metric = config .duplicate_download_metric ,
457+ internal_duplicate_download_metric = config .internal_duplicate_download_metric ,
458+ trust_duplicate_download_metric = config .trust_duplicate_download_metric ,
450459 document_store = document_store ,
451460 event_publisher = event_publisher
452461 )
@@ -486,7 +495,8 @@ def test_bucket_selection_with_mesh_mock_disabled(self, mock_datetime):
486495 log = log ,
487496 mesh_client = config .mesh_client ,
488497 download_metric = config .download_metric ,
489- duplicate_download_metric = config .duplicate_download_metric ,
498+ internal_duplicate_download_metric = config .internal_duplicate_download_metric ,
499+ trust_duplicate_download_metric = config .trust_duplicate_download_metric ,
490500 document_store = document_store ,
491501 event_publisher = event_publisher
492502 )
@@ -505,23 +515,27 @@ def test_bucket_selection_with_mesh_mock_disabled(self, mock_datetime):
505515 assert message_uri .startswith ('s3://test-pii-bucket/' )
506516
507517 def test_duplicate_delivery_skips_publish_and_acknowledge (self ):
508- """When S3 signals the object already exists, processor logs a warning, skips publishing and metric, but still acknowledges"""
518+ """
519+ Internal retry: the object already exists with the same meshMessageId.
520+ Skip publish, record internal metric, acknowledge
521+ """
509522 from mesh_download .processor import MeshDownloadProcessor
510523
511524 config , log , event_publisher , document_store = setup_mocks ()
512525 bound_logger = Mock ()
513526 log .bind .return_value = bound_logger
514527
515- document_store .store_document .side_effect = DocumentAlreadyExistsError (
516- "Document already exists for key: document-reference/TEST-SENDER/ref-001_mesh-123 "
528+ document_store .store_document .side_effect = DocumentAlreadyExistsInternalRetryError (
529+ "Internal retry for key: document-reference/TEST-SENDER/ref-001 "
517530 )
518531
519532 processor = MeshDownloadProcessor (
520533 config = config ,
521534 log = log ,
522535 mesh_client = config .mesh_client ,
523536 download_metric = config .download_metric ,
524- duplicate_download_metric = config .duplicate_download_metric ,
537+ internal_duplicate_download_metric = config .internal_duplicate_download_metric ,
538+ trust_duplicate_download_metric = config .trust_duplicate_download_metric ,
525539 document_store = document_store ,
526540 event_publisher = event_publisher
527541 )
@@ -530,17 +544,71 @@ def test_duplicate_delivery_skips_publish_and_acknowledge(self):
530544 config .mesh_client .retrieve_message .return_value = mesh_message
531545 sqs_record = create_sqs_record ()
532546
533- # Should complete without raising
534547 outcome = processor .process_sqs_message (sqs_record )
535548
536549 assert outcome == 'skipped'
537550 bound_logger .warning .assert_called_once ()
538- warning_msg = bound_logger .warning .call_args [0 ][0 ]
539- assert "already stored" in warning_msg
540- config .duplicate_download_metric .record .assert_called_once ()
541-
551+ assert "Internal retry" in bound_logger .warning .call_args [0 ][0 ]
552+ config .internal_duplicate_download_metric .record .assert_called_once_with (1 )
553+ config .trust_duplicate_download_metric .record .assert_not_called ()
542554 event_publisher .send_events .assert_not_called ()
543555 config .download_metric .record .assert_not_called ()
544-
545556 # Acknowledge should still be called to remove message from MESH inbox
546557 mesh_message .acknowledge .assert_called_once ()
558+
559+ @patch ('mesh_download.processor.datetime' )
560+ def test_trust_duplicate_publishes_invalid_event_and_acknowledges (self , mock_datetime ):
561+ """
562+ Trust duplicate: the object already exists with a different meshMessageId.
563+ Publish invalid event with DL_CLIV_004, record trust metric, acknowledge
564+ """
565+ from mesh_download .processor import MeshDownloadProcessor
566+
567+ config , log , event_publisher , document_store = setup_mocks ()
568+ bound_logger = Mock ()
569+ log .bind .return_value = bound_logger
570+
571+ fixed_time = datetime (2025 , 11 , 19 , 15 , 30 , 45 , tzinfo = timezone .utc )
572+ mock_datetime .now .return_value = fixed_time
573+
574+ document_store .store_document .side_effect = DocumentAlreadyExistsError (
575+ "Trust duplicate for key: document-reference/TEST-SENDER/ref-001"
576+ )
577+ event_publisher .send_events .return_value = []
578+
579+ processor = MeshDownloadProcessor (
580+ config = config ,
581+ log = log ,
582+ mesh_client = config .mesh_client ,
583+ download_metric = config .download_metric ,
584+ internal_duplicate_download_metric = config .internal_duplicate_download_metric ,
585+ trust_duplicate_download_metric = config .trust_duplicate_download_metric ,
586+ document_store = document_store ,
587+ event_publisher = event_publisher
588+ )
589+
590+ mesh_message = create_mesh_message ()
591+ config .mesh_client .retrieve_message .return_value = mesh_message
592+ sqs_record = create_sqs_record ()
593+
594+ outcome = processor .process_sqs_message (sqs_record )
595+
596+ assert outcome == 'duplicate'
597+ bound_logger .warning .assert_called_once ()
598+ assert "Trust duplicate" in bound_logger .warning .call_args [0 ][0 ]
599+ config .trust_duplicate_download_metric .record .assert_called_once_with (1 )
600+ config .internal_duplicate_download_metric .record .assert_not_called ()
601+ config .download_metric .record .assert_not_called ()
602+
603+ event_publisher .send_events .assert_called_once ()
604+ published_event = event_publisher .send_events .call_args [0 ][0 ][0 ]
605+ assert published_event ['type' ] == 'uk.nhs.notify.digital.letters.mesh.inbox.message.invalid.v1'
606+ assert published_event ['time' ] == '2025-11-19T15:30:45+00:00'
607+
608+ event_data = published_event ['data' ]
609+ assert event_data ['senderId' ] == 'TEST-SENDER'
610+ assert event_data ['meshMessageId' ] == 'test-message-123'
611+ assert event_data ['messageReference' ] == 'ref-001'
612+ assert event_data ['failureCode' ] == 'DL_CLIV_004'
613+
614+ mesh_message .acknowledge .assert_called_once ()
0 commit comments