11import json
22import os
3- from typing import Tuple
43
4+ from aws_lambda_powertools .utilities .data_classes .sqs_event import SQSRecord
55from aws_lambda_typing .events .sqs import SQSMessage
66
77from common .api_clients .mns_service import MnsService
88from common .api_clients .mns_setup import get_mns_service
99from common .api_clients .mock_mns_service import MockMnsService
10- from common .clients import logger
1110from create_notification import create_mns_notification
11+ from observability import logger
1212
1313mns_env = os .getenv ("MNS_ENV" , "int" )
14- MNS_TEST_QUEUE_URL = os .getenv ("MNS_TEST_QUEUE_URL" )
14+ _mns_service : MnsService | MockMnsService | None = None
15+ SqsRecord = SQSRecord | SQSMessage
1516
1617
17- def process_records (records : list [SQSMessage ]) -> dict [str , list ]:
18+ def _get_message_id (record : SqsRecord ) -> str :
19+ if isinstance (record , SQSRecord ):
20+ return record .message_id
21+
22+ return record .get ("messageId" , "unknown" )
23+
24+
25+ def _get_body (record : SqsRecord ) -> dict | str :
26+ if isinstance (record , SQSRecord ):
27+ return record .body
28+
29+ return record .get ("body" , {})
30+
31+
32+ def _as_sqs_message (record : SqsRecord ) -> SQSMessage :
33+ if isinstance (record , SQSRecord ):
34+ return record .raw_event
35+
36+ return record
37+
38+
39+ def _get_runtime_mns_service () -> MnsService | MockMnsService :
40+ global _mns_service
41+ if _mns_service is None :
42+ _mns_service = get_mns_service (mns_env = mns_env )
43+
44+ return _mns_service
45+
46+
47+ def process_records (records : list [SqsRecord ]) -> dict [str , list ]:
1848 """
1949 Process multiple SQS records.
2050 Args: records: List of SQS records to process
2151 Returns: List of failed item identifiers for partial batch failure
2252 """
2353 batch_item_failures = []
24- mns_service = get_mns_service ( mns_env = mns_env )
54+ mns_service = _get_runtime_mns_service ( )
2555
2656 for record in records :
2757 try :
2858 process_record (record , mns_service )
2959 except Exception :
30- message_id = record . get ( "messageId" , "unknown" )
60+ message_id = _get_message_id ( record )
3161 batch_item_failures .append ({"itemIdentifier" : message_id })
3262 logger .exception ("Failed to process record" , extra = {"message_id" : message_id })
3363
@@ -39,7 +69,7 @@ def process_records(records: list[SQSMessage]) -> dict[str, list]:
3969 return {"batchItemFailures" : batch_item_failures }
4070
4171
42- def process_record (record : SQSMessage , mns_service : MnsService | MockMnsService ) -> None :
72+ def process_record (record : SqsRecord , mns_service : MnsService | MockMnsService ) -> None :
4373 """
4474 Process a single SQS record.
4575 Args:
@@ -50,34 +80,36 @@ def process_record(record: SQSMessage, mns_service: MnsService | MockMnsService)
5080 message_id , immunisation_id = extract_trace_ids (record )
5181 notification_id = None
5282
53- mns_notification_payload = create_mns_notification (record )
83+ mns_notification_payload = create_mns_notification (_as_sqs_message ( record ) )
5484 notification_id = mns_notification_payload .get ("id" )
5585
5686 action_flag = mns_notification_payload .get ("filtering" , {}).get ("action" )
5787 logger .info (
5888 "Processing message" ,
59- extra = {
60- "notification_id" : notification_id ,
61- "message_id" : message_id ,
62- "immunisation_id" : immunisation_id ,
63- "action_flag" : action_flag ,
64- },
89+ notification_id = notification_id ,
90+ message_id = message_id ,
91+ immunisation_id = immunisation_id ,
92+ action_flag = action_flag ,
6593 )
6694
6795 mns_service .publish_notification (mns_notification_payload )
68- logger .info ("Successfully created MNS notification" , extra = {"mns_notification_id" : notification_id })
96+
97+ logger .info (
98+ "Successfully created MNS notification" ,
99+ mns_notification_id = notification_id ,
100+ )
69101
70102
71- def extract_trace_ids (record : SQSMessage ) -> Tuple [str , str | None ]:
103+ def extract_trace_ids (record : SqsRecord ) -> tuple [str , str | None ]:
72104 """
73105 Extract identifiers for tracing from SQS record.
74106 Returns: Tuple of (message_id, immunisation_id)
75107 """
76- sqs_message_id = record . get ( "messageId" , "unknown" )
108+ sqs_message_id = _get_message_id ( record )
77109 immunisation_id = None
78110
79111 try :
80- sqs_event_body = record . get ( "body" , {} )
112+ sqs_event_body = _get_body ( record )
81113 if isinstance (sqs_event_body , str ):
82114 sqs_event_body = json .loads (sqs_event_body )
83115
0 commit comments