Skip to content

Commit cc997d2

Browse files
CCM-15297: Prevent MeshDownloader from publishing duplicate events (#243)
* CCM-15297: Update EventPublisher to handle retries
* CCM-15297: Add duplicate metric and update unit tests
* CCM-15297: Revert to using Exception
* CCM-15297: Add component and unit tests
* CCM-15297: Update logs
* CCM-15297: Don't use double quotes in TF when they're not necessary
* CCM-15297: Fix typecheck failing following merge

---------

Co-authored-by: simonlabarere <simon.labarere1@nhs.net>
Co-authored-by: simonlabarere <156111959+simonlabarere@users.noreply.github.com>
1 parent 12012c7 commit cc997d2

15 files changed

Lines changed: 665 additions & 147 deletions

File tree

infrastructure/terraform/components/dl/module_lambda_mesh_acknowledge.tf

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ module "mesh_acknowledge" {
4040
EVENT_PUBLISHER_DLQ_URL = module.sqs_event_publisher_errors.sqs_queue_url
4141
EVENT_PUBLISHER_EVENT_BUS_ARN = aws_cloudwatch_event_bus.main.arn
4242
MOCK_MESH_BUCKET = module.s3bucket_non_pii_data.bucket
43-
SSM_MESH_PREFIX = "${local.ssm_mesh_prefix}"
44-
SSM_SENDERS_PREFIX = "${local.ssm_senders_prefix}"
43+
SSM_MESH_PREFIX = local.ssm_mesh_prefix
44+
SSM_SENDERS_PREFIX = local.ssm_senders_prefix
4545
USE_MESH_MOCK = var.enable_mock_mesh ? "true" : "false"
4646
}
4747

infrastructure/terraform/components/dl/module_lambda_mesh_download.tf

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,16 @@ module "mesh_download" {
3737
log_subscription_role_arn = local.acct.log_subscription_role_arn
3838

3939
lambda_env_vars = {
40-
DOWNLOAD_METRIC_NAME = "mesh-download-successful-downloads"
41-
DOWNLOAD_METRIC_NAMESPACE = "dl-mesh-download"
42-
ENVIRONMENT = var.environment
43-
EVENT_PUBLISHER_DLQ_URL = module.sqs_event_publisher_errors.sqs_queue_url
44-
EVENT_PUBLISHER_EVENT_BUS_ARN = aws_cloudwatch_event_bus.main.arn
45-
PII_BUCKET = module.s3bucket_pii_data.bucket
46-
SSM_MESH_PREFIX = "${local.ssm_mesh_prefix}"
47-
SSM_SENDERS_PREFIX = "${local.ssm_senders_prefix}"
48-
USE_MESH_MOCK = var.enable_mock_mesh ? "true" : "false"
40+
DOWNLOAD_METRIC_NAME = "mesh-download-successful-downloads"
41+
DUPLICATE_DOWNLOAD_METRIC_NAME = "mesh-duplicate-downloads"
42+
DOWNLOAD_METRIC_NAMESPACE = "dl-mesh-download"
43+
ENVIRONMENT = var.environment
44+
EVENT_PUBLISHER_DLQ_URL = module.sqs_event_publisher_errors.sqs_queue_url
45+
EVENT_PUBLISHER_EVENT_BUS_ARN = aws_cloudwatch_event_bus.main.arn
46+
PII_BUCKET = module.s3bucket_pii_data.bucket
47+
SSM_MESH_PREFIX = local.ssm_mesh_prefix
48+
SSM_SENDERS_PREFIX = local.ssm_senders_prefix
49+
USE_MESH_MOCK = var.enable_mock_mesh ? "true" : "false"
4950
}
5051

5152
}

infrastructure/terraform/components/dl/module_lambda_mesh_poll.tf

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ module "mesh_poll" {
4545
MAXIMUM_RUNTIME_MILLISECONDS = "240000" # 4 minutes (Lambda has 5 min timeout)
4646
POLLING_METRIC_NAME = "mesh-poll-successful-polls"
4747
POLLING_METRIC_NAMESPACE = "dl-mesh-poll"
48-
SSM_MESH_PREFIX = "${local.ssm_mesh_prefix}"
49-
SSM_SENDERS_PREFIX = "${local.ssm_senders_prefix}"
48+
SSM_MESH_PREFIX = local.ssm_mesh_prefix
49+
SSM_SENDERS_PREFIX = local.ssm_senders_prefix
5050
USE_MESH_MOCK = var.enable_mock_mesh ? "true" : "false"
5151
}
5252

infrastructure/terraform/components/dl/module_lambda_report_sender.tf

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ module "report_sender" {
4242
EVENT_PUBLISHER_DLQ_URL = module.sqs_event_publisher_errors.sqs_queue_url
4343
EVENT_PUBLISHER_EVENT_BUS_ARN = aws_cloudwatch_event_bus.main.arn
4444
MOCK_MESH_BUCKET = module.s3bucket_non_pii_data.bucket
45-
SSM_MESH_PREFIX = "${local.ssm_mesh_prefix}"
46-
SSM_SENDERS_PREFIX = "${local.ssm_senders_prefix}"
45+
SSM_MESH_PREFIX = local.ssm_mesh_prefix
46+
SSM_SENDERS_PREFIX = local.ssm_senders_prefix
4747
USE_MESH_MOCK = var.enable_mock_mesh ? "true" : "false"
4848
}
4949

lambdas/mesh-download/mesh_download/__tests__/test_document_store.py

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,16 @@
11
"""Tests for DocumentStore"""
22
import pytest
33
from unittest.mock import Mock
4-
from mesh_download.document_store import DocumentStore, IntermediaryBodyStoreError
4+
from botocore.exceptions import ClientError
5+
from mesh_download.document_store import DocumentStore, IntermediaryBodyStoreError, DocumentAlreadyExistsError
6+
7+
8+
def make_client_error(code):
9+
"""Helper to build a botocore ClientError with a given error code"""
10+
return ClientError(
11+
{'Error': {'Code': code, 'Message': 'test'}},
12+
'PutObject'
13+
)
514

615

716
class TestDocumentStore:
@@ -21,20 +30,41 @@ def test_store_document_success(self):
2130
store = DocumentStore(config)
2231

2332
result = store.store_document(
24-
sender_id='SENDER_001',
25-
message_reference='ref_123',
33+
sender_id='SENDER-001',
34+
message_reference='ref-123',
35+
mesh_message_id='mesh-456',
2636
content=b'test content'
2737
)
2838

29-
assert result == 'document-reference/SENDER_001_ref_123'
39+
assert result == 'document-reference/SENDER-001/ref-123_mesh-456'
3040
mock_s3_client.put_object.assert_called_once_with(
3141
Bucket='test-pii-bucket',
32-
Key='document-reference/SENDER_001_ref_123',
33-
Body=b'test content'
42+
Key='document-reference/SENDER-001/ref-123_mesh-456',
43+
Body=b'test content',
44+
IfNoneMatch='*'
3445
)
3546

3647
def test_store_document_s3_failure_raises_error(self):
37-
"""Raises IntermediaryBodyStoreError when S3 put_object fails"""
48+
"""Raises IntermediaryBodyStoreError when S3 put_object fails with a non-HTTP error"""
49+
mock_s3_client = Mock()
50+
mock_s3_client.put_object.side_effect = make_client_error('InternalError')
51+
52+
config = Mock()
53+
config.s3_client = mock_s3_client
54+
config.transactional_data_bucket = 'test-pii-bucket'
55+
56+
store = DocumentStore(config)
57+
58+
with pytest.raises(IntermediaryBodyStoreError):
59+
store.store_document(
60+
sender_id='SENDER-001',
61+
message_reference='ref-123',
62+
mesh_message_id='mesh-456',
63+
content=b'test content'
64+
)
65+
66+
def test_store_document_raises_error_on_non_200_response(self):
67+
"""Raises IntermediaryBodyStoreError when S3 returns a non-200 HTTP status"""
3868
mock_s3_client = Mock()
3969
mock_s3_client.put_object.return_value = {
4070
'ResponseMetadata': {'HTTPStatusCode': 500}
@@ -48,7 +78,27 @@ def test_store_document_s3_failure_raises_error(self):
4878

4979
with pytest.raises(IntermediaryBodyStoreError):
5080
store.store_document(
51-
sender_id='SENDER_001',
52-
message_reference='ref_123',
81+
sender_id='SENDER-001',
82+
message_reference='ref-123',
83+
mesh_message_id='mesh-456',
84+
content=b'test content'
85+
)
86+
87+
def test_store_document_precondition_failed_raises_document_already_exists(self):
88+
"""Raises DocumentAlreadyExistsError when S3 returns PreconditionFailed (object already exists)"""
89+
mock_s3_client = Mock()
90+
mock_s3_client.put_object.side_effect = make_client_error('PreconditionFailed')
91+
92+
config = Mock()
93+
config.s3_client = mock_s3_client
94+
config.transactional_data_bucket = 'test-pii-bucket'
95+
96+
store = DocumentStore(config)
97+
98+
with pytest.raises(DocumentAlreadyExistsError, match='document-reference/SENDER-001/ref-123_mesh-456'):
99+
store.store_document(
100+
sender_id='SENDER-001',
101+
message_reference='ref-123',
102+
mesh_message_id='mesh-456',
53103
content=b'test content'
54104
)

lambdas/mesh-download/mesh_download/__tests__/test_handler.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def setup_mocks():
1515
mock_config.mesh_client = Mock()
1616

1717
mock_processor = Mock()
18-
mock_processor.process_sqs_message = Mock()
18+
mock_processor.process_sqs_message = Mock(return_value='downloaded')
1919

2020
return (
2121
mock_context,
@@ -149,9 +149,9 @@ def test_handler_partial_batch_failure(self, mock_processor_class, mock_config_c
149149

150150
# Make second message fail
151151
mock_processor.process_sqs_message.side_effect = [
152-
None,
152+
'downloaded',
153153
Exception("Test error"),
154-
None
154+
'downloaded'
155155
]
156156

157157
event = create_sqs_event(num_records=3)

lambdas/mesh-download/mesh_download/__tests__/test_processor.py

Lines changed: 70 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from datetime import datetime, timezone
1010
from pydantic import ValidationError
1111
from mesh_download.errors import MeshMessageNotFound
12+
from mesh_download.document_store import DocumentAlreadyExistsError
1213

1314

1415
def setup_mocks():
@@ -19,6 +20,7 @@ def setup_mocks():
1920
# Set up default config attributes
2021
config.mesh_client = Mock()
2122
config.download_metric = Mock()
23+
config.duplicate_download_metric = Mock()
2224
config.s3_client = Mock()
2325
config.environment = 'development'
2426
config.transactional_data_bucket = 'test-pii-bucket'
@@ -48,9 +50,9 @@ def create_valid_cloud_event():
4850
'traceparent': '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01',
4951
'dataschema': 'https://notify.nhs.uk/cloudevents/schemas/digital-letters/2025-10-draft/data/digital-letters-mesh-inbox-message-received-data.schema.json',
5052
'data': {
51-
'meshMessageId': 'test_message_123',
52-
'senderId': 'TEST_SENDER',
53-
'messageReference': 'ref_001'
53+
'meshMessageId': 'test-message-123',
54+
'senderId': 'TEST-SENDER',
55+
'messageReference': 'ref-001'
5456
}
5557
}
5658

@@ -99,6 +101,7 @@ def test_processor_initialization_calls_mesh_handshake(self):
99101
log=log,
100102
mesh_client=config.mesh_client,
101103
download_metric=config.download_metric,
104+
duplicate_download_metric=config.duplicate_download_metric,
102105
document_store=document_store,
103106
event_publisher=event_publisher
104107
)
@@ -115,7 +118,7 @@ def test_process_sqs_message_success(self, mock_datetime):
115118
fixed_time = datetime(2025, 11, 19, 15, 30, 45, tzinfo=timezone.utc)
116119
mock_datetime.now.return_value = fixed_time
117120

118-
document_store.store_document.return_value = 'document-reference/SENDER_001_ref_001'
121+
document_store.store_document.return_value = 'document-reference/SENDER-001/ref-001_test-message-123'
119122

120123
event_publisher.send_events.return_value = []
121124

@@ -124,6 +127,7 @@ def test_process_sqs_message_success(self, mock_datetime):
124127
log=log,
125128
mesh_client=config.mesh_client,
126129
download_metric=config.download_metric,
130+
duplicate_download_metric=config.duplicate_download_metric,
127131
document_store=document_store,
128132
event_publisher=event_publisher
129133
)
@@ -133,15 +137,17 @@ def test_process_sqs_message_success(self, mock_datetime):
133137

134138
sqs_record = create_sqs_record()
135139

136-
processor.process_sqs_message(sqs_record)
140+
outcome = processor.process_sqs_message(sqs_record)
137141

138-
config.mesh_client.retrieve_message.assert_called_once_with('test_message_123')
142+
assert outcome == 'downloaded'
143+
config.mesh_client.retrieve_message.assert_called_once_with('test-message-123')
139144

140145
mesh_message.read.assert_called_once()
141146

142147
document_store.store_document.assert_called_once_with(
143-
sender_id='TEST_SENDER',
144-
message_reference='ref_001',
148+
sender_id='TEST-SENDER',
149+
message_reference='ref-001',
150+
mesh_message_id='test-message-123',
145151
content=b'Test message content'
146152
)
147153

@@ -172,9 +178,9 @@ def test_process_sqs_message_success(self, mock_datetime):
172178

173179
# Verify CloudEvent data payload
174180
event_data = published_event['data']
175-
assert event_data['senderId'] == 'TEST_SENDER'
176-
assert event_data['messageReference'] == 'ref_001'
177-
assert event_data['messageUri'] == 's3://test-pii-bucket/document-reference/SENDER_001_ref_001'
181+
assert event_data['senderId'] == 'TEST-SENDER'
182+
assert event_data['messageReference'] == 'ref-001'
183+
assert event_data['messageUri'] == 's3://test-pii-bucket/document-reference/SENDER-001/ref-001_test-message-123'
178184
assert set(event_data.keys()) == {'senderId', 'messageReference', 'messageUri', 'meshMessageId'}
179185

180186
def test_process_sqs_message_validation_failure(self):
@@ -188,6 +194,7 @@ def test_process_sqs_message_validation_failure(self):
188194
log=log,
189195
mesh_client=config.mesh_client,
190196
download_metric=config.download_metric,
197+
duplicate_download_metric=config.duplicate_download_metric,
191198
document_store=document_store,
192199
event_publisher=event_publisher
193200
)
@@ -212,6 +219,7 @@ def test_process_sqs_message_missing_mesh_message_id(self):
212219
log=log,
213220
mesh_client=config.mesh_client,
214221
download_metric=config.download_metric,
222+
duplicate_download_metric=config.duplicate_download_metric,
215223
document_store=document_store,
216224
event_publisher=event_publisher
217225
)
@@ -239,17 +247,18 @@ def test_download_and_store_message_not_found(self):
239247
log=log,
240248
mesh_client=config.mesh_client,
241249
download_metric=config.download_metric,
250+
duplicate_download_metric=config.duplicate_download_metric,
242251
document_store=document_store,
243252
event_publisher=event_publisher
244253
)
245254

246255
config.mesh_client.retrieve_message.return_value = None
247256
sqs_record = create_sqs_record()
248257

249-
with pytest.raises(MeshMessageNotFound, match="MESH message with ID test_message_123 not found"):
258+
with pytest.raises(MeshMessageNotFound, match="MESH message with ID test-message-123 not found"):
250259
processor.process_sqs_message(sqs_record)
251260

252-
config.mesh_client.retrieve_message.assert_called_once_with('test_message_123')
261+
config.mesh_client.retrieve_message.assert_called_once_with('test-message-123')
253262
document_store.store_document.assert_not_called()
254263
event_publisher.send_events.assert_not_called()
255264
config.download_metric.record.assert_not_called()
@@ -269,6 +278,7 @@ def test_document_store_failure_prevents_ack_and_raises(self):
269278
log=log,
270279
mesh_client=config.mesh_client,
271280
download_metric=config.download_metric,
281+
duplicate_download_metric=config.duplicate_download_metric,
272282
document_store=document_store,
273283
event_publisher=event_publisher
274284
)
@@ -304,6 +314,7 @@ def test_bucket_selection_with_mesh_mock_enabled(self, mock_datetime):
304314
log=log,
305315
mesh_client=config.mesh_client,
306316
download_metric=config.download_metric,
317+
duplicate_download_metric=config.duplicate_download_metric,
307318
document_store=document_store,
308319
event_publisher=event_publisher
309320
)
@@ -312,8 +323,9 @@ def test_bucket_selection_with_mesh_mock_enabled(self, mock_datetime):
312323
config.mesh_client.retrieve_message.return_value = mesh_message
313324
sqs_record = create_sqs_record()
314325

315-
processor.process_sqs_message(sqs_record)
326+
outcome = processor.process_sqs_message(sqs_record)
316327

328+
assert outcome == 'downloaded'
317329
# Verify event was published with PII bucket in URI
318330
event_publisher.send_events.assert_called_once()
319331
published_events = event_publisher.send_events.call_args[0][0]
@@ -342,6 +354,7 @@ def test_bucket_selection_with_mesh_mock_disabled(self, mock_datetime):
342354
log=log,
343355
mesh_client=config.mesh_client,
344356
download_metric=config.download_metric,
357+
duplicate_download_metric=config.duplicate_download_metric,
345358
document_store=document_store,
346359
event_publisher=event_publisher
347360
)
@@ -350,10 +363,52 @@ def test_bucket_selection_with_mesh_mock_disabled(self, mock_datetime):
350363
config.mesh_client.retrieve_message.return_value = mesh_message
351364
sqs_record = create_sqs_record()
352365

353-
processor.process_sqs_message(sqs_record)
366+
outcome = processor.process_sqs_message(sqs_record)
354367

368+
assert outcome == 'downloaded'
355369
event_publisher.send_events.assert_called_once()
356370
published_events = event_publisher.send_events.call_args[0][0]
357371
assert len(published_events) == 1
358372
message_uri = published_events[0]['data']['messageUri']
359373
assert message_uri.startswith('s3://test-pii-bucket/')
374+
375+
def test_duplicate_delivery_skips_publish_and_acknowledge(self):
376+
"""When S3 signals the object already exists, processor logs a warning, skips publishing and metric, but still acknowledges"""
377+
from mesh_download.processor import MeshDownloadProcessor
378+
379+
config, log, event_publisher, document_store = setup_mocks()
380+
bound_logger = Mock()
381+
log.bind.return_value = bound_logger
382+
383+
document_store.store_document.side_effect = DocumentAlreadyExistsError(
384+
"Document already exists for key: document-reference/TEST-SENDER/ref-001_mesh-123"
385+
)
386+
387+
processor = MeshDownloadProcessor(
388+
config=config,
389+
log=log,
390+
mesh_client=config.mesh_client,
391+
download_metric=config.download_metric,
392+
duplicate_download_metric=config.duplicate_download_metric,
393+
document_store=document_store,
394+
event_publisher=event_publisher
395+
)
396+
397+
mesh_message = create_mesh_message()
398+
config.mesh_client.retrieve_message.return_value = mesh_message
399+
sqs_record = create_sqs_record()
400+
401+
# Should complete without raising
402+
outcome = processor.process_sqs_message(sqs_record)
403+
404+
assert outcome == 'skipped'
405+
bound_logger.warning.assert_called_once()
406+
warning_msg = bound_logger.warning.call_args[0][0]
407+
assert "already stored" in warning_msg
408+
config.duplicate_download_metric.record.assert_called_once()
409+
410+
event_publisher.send_events.assert_not_called()
411+
config.download_metric.record.assert_not_called()
412+
413+
# Acknowledge should still be called to remove message from MESH inbox
414+
mesh_message.acknowledge.assert_called_once()

0 commit comments

Comments
 (0)