-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathtest_send_sqs_message.py
More file actions
119 lines (99 loc) · 5.58 KB
/
test_send_sqs_message.py
File metadata and controls
119 lines (99 loc) · 5.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""Tests for send_sqs_message functions"""
from copy import deepcopy
from json import loads as json_loads
from unittest import TestCase
from unittest.mock import patch
from moto import mock_aws
from utils_for_tests.mock_environment_variables import MOCK_ENVIRONMENT_DICT, Sqs
from utils_for_tests.utils_for_filenameprocessor_tests import create_boto3_clients, reset_common_clients
from utils_for_tests.values_for_tests import MockFileDetails
# The src modules are imported while os.environ is patched with the mock values,
# so the imports below happen with the mocked environment variables in place.
with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT):
    from models.errors import UnhandledSqsError
    from send_sqs_message import make_and_send_sqs_message, send_to_supplier_queue

# SQS client handle; unset at import time.
sqs_client = None

# File-details fixtures used throughout the tests in this module.
FLU_EMIS_FILE_DETAILS = MockFileDetails.emis_flu
RSV_RAVS_FILE_DETAILS = MockFileDetails.ravs_rsv_1

# Substring expected in the UnhandledSqsError message when the target queue does not exist.
NON_EXISTENT_QUEUE_ERROR_MESSAGE = (
    "An unexpected error occurred whilst sending to SQS: "
    "An error occurred (AWS.SimpleQueueService.NonExistentQueue) "
    "when calling the SendMessage operation"
)
@mock_aws
@patch.dict("os.environ", MOCK_ENVIRONMENT_DICT)
class TestSendSQSMessage(TestCase):
    """Tests for send_sqs_message functions.

    The class is wrapped in moto's ``mock_aws`` and runs with ``os.environ`` patched to
    ``MOCK_ENVIRONMENT_DICT``. Each test gets its own SQS client on ``self.sqs_client``.
    """

    def setUp(self):
        """Call reset_common_clients() and create a fresh SQS client for the test."""
        reset_common_clients()
        # Held on the instance rather than in a module global, so no state is shared between tests.
        (self.sqs_client,) = create_boto3_clients("sqs")

    def _create_queue(self):
        """Create the queue described by Sqs.QUEUE_NAME / Sqs.ATTRIBUTES and return its URL."""
        response = self.sqs_client.create_queue(QueueName=Sqs.QUEUE_NAME, Attributes=Sqs.ATTRIBUTES)
        return response["QueueUrl"]

    @staticmethod
    def _make_and_send_kwargs():
        """Return the make_and_send_sqs_message keyword arguments built from FLU_EMIS_FILE_DETAILS."""
        return {
            "file_key": FLU_EMIS_FILE_DETAILS.file_key,
            "message_id": FLU_EMIS_FILE_DETAILS.message_id,
            # Copied so the function under test cannot mutate the shared fixture.
            "permission": deepcopy(FLU_EMIS_FILE_DETAILS.permissions_list),
            "vaccine_type": FLU_EMIS_FILE_DETAILS.vaccine_type,
            "supplier": FLU_EMIS_FILE_DETAILS.supplier,
            "created_at_formatted_string": FLU_EMIS_FILE_DETAILS.created_at_formatted_string,
        }

    def test_send_to_supplier_queue_success(self):
        """Test send_to_supplier_queue function for a successful message send"""
        # Set up the sqs_queue
        queue_url = self._create_queue()

        # Send three separate messages to the queue to test that they are all received and appropriately
        # partitioned by supplier and vaccine_type
        flu_emis_1 = deepcopy(FLU_EMIS_FILE_DETAILS)
        flu_emis_2 = deepcopy(FLU_EMIS_FILE_DETAILS)
        flu_emis_2.sqs_message_body["message_id"] = "flu_emis_test_id_2"
        rsv_ravs_1 = deepcopy(RSV_RAVS_FILE_DETAILS)
        sent_in_order = [flu_emis_1, rsv_ravs_1, flu_emis_2]

        for file_details in sent_in_order:
            self.assertIsNone(
                send_to_supplier_queue(
                    message_body=deepcopy(file_details.sqs_message_body),
                    vaccine_type=file_details.vaccine_type,
                    supplier=file_details.supplier,
                )
            )

        # Check that the FIFO queue contains the expected messages, in correct order, and with correct MessageGroupId
        # "Messages" is absent from the response when nothing is received, so default to an empty
        # list and let the length assertion report the problem instead of a KeyError.
        received_messages = self.sqs_client.receive_message(
            QueueUrl=queue_url, MaxNumberOfMessages=10, AttributeNames=["All"]
        ).get("Messages", [])
        self.assertEqual(len(received_messages), 3)
        for received, file_details in zip(received_messages, sent_in_order):
            self.assertEqual(json_loads(received["Body"]), file_details.sqs_message_body)
            self.assertEqual(received["Attributes"]["MessageGroupId"], file_details.queue_name)

    def test_send_to_supplier_queue_failure_due_to_queue_does_not_exist(self):
        """Test send_to_supplier_queue function for a failed message send due to queue not existing"""
        # No queue is created in this test, so the send is expected to fail.
        with self.assertRaises(UnhandledSqsError) as context:
            send_to_supplier_queue(
                message_body=deepcopy(FLU_EMIS_FILE_DETAILS.sqs_message_body),
                vaccine_type=FLU_EMIS_FILE_DETAILS.vaccine_type,
                supplier=FLU_EMIS_FILE_DETAILS.supplier,
            )
        self.assertIn(NON_EXISTENT_QUEUE_ERROR_MESSAGE, str(context.exception))

    def test_make_and_send_sqs_message_success(self):
        """Test make_and_send_sqs_message function for a successful message send"""
        # Create a mock SQS queue
        queue_url = self._create_queue()

        # Call the make_and_send_sqs_message function
        self.assertIsNone(make_and_send_sqs_message(**self._make_and_send_kwargs()))

        # Assert that correct message has reached the queue
        received_messages = self.sqs_client.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1).get(
            "Messages", []
        )
        self.assertEqual(len(received_messages), 1)
        self.assertEqual(
            json_loads(received_messages[0]["Body"]),
            deepcopy(FLU_EMIS_FILE_DETAILS.sqs_message_body),
        )

    def test_make_and_send_sqs_message_failure(self):
        """Test make_and_send_sqs_message function for a failure due to queue not existing"""
        # No queue is created in this test, so the send is expected to fail.
        with self.assertRaises(UnhandledSqsError) as context:
            make_and_send_sqs_message(**self._make_and_send_kwargs())
        self.assertIn(NON_EXISTENT_QUEUE_ERROR_MESSAGE, str(context.exception))