Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion infrastructure/instance/sqs_id_sync.tf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
resource "aws_sqs_queue" "id_sync_queue" {
name = "imms-${local.resource_scope}-id-sync-queue"
kms_master_key_id = data.aws_kms_key.existing_id_sync_sqs_encryption_key.arn
visibility_timeout_seconds = 360
visibility_timeout_seconds = 2160 # per AWS guidance, set to 6x the Lambda function timeout (6 * 360s = 2160s)
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.id_sync_dlq.arn
maxReceiveCount = 4
Expand Down
47 changes: 25 additions & 22 deletions lambdas/id_sync/src/id_sync.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
"""
- Parses the incoming AWS event into `AwsLambdaEvent` and iterate its `records`.
- Delegate each record to `process_record` and collect `nhs_number` from each result.
- If any record has status == "error" raise `IdSyncException` with aggregated nhs_numbers.
- Any unexpected error is wrapped into `IdSyncException(message="Error processing id_sync event")`.
- Parses the incoming AWS event into `AwsLambdaEvent` and iterates its `records`.
- Delegates each record to `process_record` with per-record exception isolation.
- Returns {"batchItemFailures": [...]} for any failed records so SQS retries only the failing messages (partial batch response).
- A handler-level exception (e.g. a malformed event schema) is re-raised so that SQS retries the entire batch.
"""

from typing import Any

from common.aws_lambda_event import AwsLambdaEvent
from common.clients import STREAM_NAME, logger
from common.log_decorator import logging_decorator
from exceptions.id_sync_exception import IdSyncException
from record_processor import process_record


Expand All @@ -25,28 +24,32 @@ def handler(event_data: dict[str, Any], _context) -> dict[str, Any]:

logger.info("id_sync processing event with %d records", len(records))

error_count = 0
batch_item_failures = []

for record in records:
result = process_record(record)

if result.get("status") == "error":
error_count += 1

if error_count > 0:
raise IdSyncException(
message=f"Processed {len(records)} records with {error_count} errors",
)
try:
result = process_record(record)
if result.get("status") == "error":
message_id = record.get("messageId")
logger.error(
"id_sync record processing failed for messageId: %s — %s",
message_id,
result.get("message"),
)
batch_item_failures.append({"itemIdentifier": message_id})
Comment thread
Thomas-Boyle marked this conversation as resolved.
except Exception:
message_id = record.get("messageId")
logger.exception("Unexpected error processing messageId: %s", message_id)
batch_item_failures.append({"itemIdentifier": message_id})

if batch_item_failures:
logger.error("id_sync completed with %d/%d failures", len(batch_item_failures), len(records))
return {"batchItemFailures": batch_item_failures}

response = {"status": "success", "message": f"Successfully processed {len(records)} records"}

logger.info("id_sync handler completed: %s", response)
return response

except IdSyncException as e:
logger.exception(f"id_sync error: {e.message}")
raise
except Exception:
msg = "Error processing id_sync event"
logger.exception(msg)
raise IdSyncException(message=msg)
logger.exception("Unexpected error processing id_sync event")
raise
Loading
Loading