-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmock_mns_service.py
More file actions
36 lines (29 loc) · 1.17 KB
/
mock_mns_service.py
File metadata and controls
36 lines (29 loc) · 1.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import json
import os
import boto3
from common.clients import logger
# AWS region used when creating clients: read from the AWS_REGION environment
# variable, defaulting to "eu-west-2" when that variable is not set.
REGION_NAME = os.environ.get("AWS_REGION", "eu-west-2")
class MockMnsService:
    """Mock MNS publisher that forwards notification payloads to an SQS queue.

    An SQS client is created once, at construction time, via
    ``_get_sqs_client`` and reused for every ``publish_notification`` call.
    """

    def __init__(self, queue_url):
        """Store the target queue URL and create the SQS client.

        Args:
            queue_url: URL of the SQS queue that notifications are sent to.
        """
        self.queue_url = queue_url
        self.sqs_client = self._get_sqs_client()
        logger.info(f"MockMnsService initialized with queue: {queue_url}")

    def _get_sqs_client(self):
        """Return a boto3 SQS client for the region held in ``REGION_NAME``."""
        return boto3.client("sqs", region_name=REGION_NAME)

    def publish_notification(self, mns_payload: dict) -> None:
        """Send an MNS notification payload to the test SQS queue as a fallback.

        The payload is serialised with ``json.dumps`` and sent as the message
        body, together with a fixed ``source`` message attribute.

        Args:
            mns_payload: MNS notification payload; must be JSON-serialisable.

        Raises:
            Exception: Any error raised while serialising the payload, sending
                the message, or reading the response is logged with its
                traceback and then re-raised unchanged.
        """
        # Fixed "source" attribute attached to every message sent by this class.
        source_attribute = {"source": {"StringValue": "mns-publisher-lambda", "DataType": "String"}}

        try:
            send_result = self.sqs_client.send_message(
                QueueUrl=self.queue_url,
                MessageBody=json.dumps(mns_payload),
                MessageAttributes=source_attribute,
            )
            # Read the id inside the try so a malformed response is logged and
            # re-raised through the same path as a failed send.
            message_id = send_result["MessageId"]
            logger.info(
                "Mock MNS: Successfully sent notification to test queue",
                extra={"message_id": message_id},
            )
        except Exception:
            logger.exception("Mock MNS: Failed to send to test SQS queue")
            raise