Skip to content

Commit e95c619

Browse files
committed
VED-294: Fix potential duplication in the delta during blue / green switchover.
1 parent 2836c5b commit e95c619

3 files changed

Lines changed: 64 additions & 29 deletions

File tree

delta_backend/src/delta.py

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
import decimal
2-
3-
import boto3
42
import json
3+
import logging
54
import os
65
import time
7-
from datetime import datetime, timedelta
8-
import uuid
9-
import logging
6+
from datetime import datetime, timedelta, UTC
7+
8+
import boto3
9+
from boto3.dynamodb.conditions import Attr
1010
from botocore.exceptions import ClientError
11-
from log_firehose import FirehoseLogger
12-
from converter import Converter
11+
1312
from common.mappings import ActionFlag, Operation, EventName
13+
from converter import Converter
14+
from log_firehose import FirehoseLogger
1415

1516
failure_queue_url = os.environ["AWS_SQS_QUEUE_URL"]
1617
delta_table_name = os.environ["DELTA_TABLE_NAME"]
@@ -75,33 +76,36 @@ def send_firehose(log_data):
7576

7677
def process_record(record, log_data):
7778
ret = True
79+
operation_outcome = {}
7880
try:
7981
start = time.time()
80-
operation_outcome = {}
8182
error_records = []
8283
response = str()
8384
imms_id = str()
8485
operation = str()
85-
approximate_creation_time = datetime.utcfromtimestamp(record["dynamodb"]["ApproximateCreationDateTime"])
86+
approximate_creation_time = datetime.fromtimestamp(record["dynamodb"]["ApproximateCreationDateTime"], UTC)
8687
expiry_time = approximate_creation_time + timedelta(days=30)
8788
expiry_time_epoch = int(expiry_time.timestamp())
8889
delta_table = get_delta_table()
8990

91+
event_id = record["eventID"]
9092
if record["eventName"] != EventName.DELETE_PHYSICAL:
9193
new_image = record["dynamodb"]["NewImage"]
9294
imms_id = new_image["PK"]["S"].split("#")[1]
95+
operation_outcome["record"] = imms_id
9396
vaccine_type = get_vaccine_type(new_image["PatientSK"]["S"])
9497
supplier_system = new_image["SupplierSystem"]["S"]
9598
if supplier_system not in ("DPSFULL", "DPSREDUCED"):
9699
operation = new_image["Operation"]["S"]
100+
operation_outcome["operation_type"] = operation
97101
action_flag = ActionFlag.CREATE if operation == Operation.CREATE else operation
98102
resource_json = json.loads(new_image["Resource"]["S"], parse_float=decimal.Decimal)
99-
FHIRConverter = Converter(resource_json, action_flag=action_flag)
100-
flat_json = FHIRConverter.run_conversion()
101-
error_records = FHIRConverter.get_error_records()
103+
fhir_converter = Converter(resource_json, action_flag=action_flag)
104+
flat_json = fhir_converter.run_conversion()
105+
error_records = fhir_converter.get_error_records()
102106
response = delta_table.put_item(
103107
Item={
104-
"PK": str(uuid.uuid4()),
108+
"PK": event_id,
105109
"ImmsID": imms_id,
106110
"Operation": operation,
107111
"VaccineType": vaccine_type,
@@ -110,7 +114,8 @@ def process_record(record, log_data):
110114
"Source": delta_source,
111115
"Imms": flat_json,
112116
"ExpiresAt": expiry_time_epoch,
113-
}
117+
},
118+
ConditionExpression=Attr("PK").not_exists(),
114119
)
115120
else:
116121
operation_outcome["statusCode"] = "200"
@@ -120,12 +125,14 @@ def process_record(record, log_data):
120125
return True, log_data
121126
else:
122127
operation = Operation.DELETE_PHYSICAL
128+
operation_outcome["operation_type"] = operation
123129
new_image = record["dynamodb"]["Keys"]
124130
logger.info(f"Record to delta:{new_image}")
125131
imms_id = new_image["PK"]["S"].split("#")[1]
132+
operation_outcome["record"] = imms_id
126133
response = delta_table.put_item(
127134
Item={
128-
"PK": str(uuid.uuid4()),
135+
"PK": event_id,
129136
"ImmsID": imms_id,
130137
"Operation": operation,
131138
"VaccineType": "default",
@@ -134,11 +141,10 @@ def process_record(record, log_data):
134141
"Source": delta_source,
135142
"Imms": "",
136143
"ExpiresAt": expiry_time_epoch,
137-
}
144+
},
145+
ConditionExpression=Attr("PK").not_exists(),
138146
)
139-
end = time.time()
140-
log_data["time_taken"] = f"{round(end - start, 5)}s"
141-
operation_outcome = {"record": imms_id, "operation_type": operation}
147+
142148
if response["ResponseMetadata"]["HTTPStatusCode"] == 200:
143149
if error_records:
144150
log = f"Partial success: successfully synced into delta, but issues found within record {imms_id}"
@@ -154,23 +160,29 @@ def process_record(record, log_data):
154160
else:
155161
log = f"Record NOT created for {imms_id}"
156162
operation_outcome["statusCode"] = "500"
157-
operation_outcome["statusDesc"] = "Exception"
163+
operation_outcome["statusDesc"] = "Failure response from DynamoDB"
158164
logger.warning(log)
159165
ret = False
160166
except Exception as e:
161-
operation_outcome["statusCode"] = "500"
162-
operation_outcome["statusDesc"] = "Exception"
163-
logger.exception(f"Error processing record: {e}")
164-
ret = False
167+
if isinstance(e, ClientError) and e.response["Error"]["Code"] == "ConditionalCheckFailedException":
168+
operation_outcome["statusCode"] = "200"
169+
operation_outcome["statusDesc"] = "Skipped record already present in delta"
170+
logger.info(f"Skipped record {event_id} already present in delta for {imms_id}")
171+
else:
172+
operation_outcome["statusCode"] = "500"
173+
operation_outcome["statusDesc"] = "Exception"
174+
logger.exception(f"Error processing record: {e}")
175+
ret = False
165176

177+
end = time.time()
178+
log_data["time_taken"] = f"{round(end - start, 5)}s"
166179
log_data["operation_outcome"] = operation_outcome
167180
return ret, log_data
168181

169-
def handler(event, context):
182+
def handler(event, _context):
170183
ret = True
171184
logger.info("Starting Delta Handler")
172185
log_data = dict()
173-
operation_outcome = dict()
174186
log_data["function_name"] = "delta_sync"
175187
try:
176188
for record in event["Records"]:
@@ -180,7 +192,7 @@ def handler(event, context):
180192
if not result:
181193
ret = False
182194

183-
except Exception as e:
195+
except Exception:
184196
ret = False
185197
operation_outcome = {
186198
"statusCode": "500",

delta_backend/tests/test_delta.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
from delta import send_message, handler, process_record # Import after setting environment variables
1717

1818
SUCCESS_RESPONSE = {"ResponseMetadata": {"HTTPStatusCode": 200}}
19-
EXCEPTION_RESPONSE = ClientError({"Error": {"Code": "ConditionalCheckFailedException"}}, "PutItem")
19+
DUPLICATE_RESPONSE = ClientError({"Error": {"Code": "ConditionalCheckFailedException"}}, "PutItem")
20+
EXCEPTION_RESPONSE = ClientError({"Error": {"Code": "InternalServerError"}}, "PutItem")
2021
FAIL_RESPONSE = {"ResponseMetadata": {"HTTPStatusCode": 500}}
2122

2223
class DeltaHandlerTestCase(unittest.TestCase):
@@ -374,6 +375,25 @@ def test_single_exception_in_multi(self):
374375
self.assertEqual(self.mock_delta_table.put_item.call_count, len(records_config))
375376
self.assertEqual(self.mock_firehose_logger.send_log.call_count, len(records_config))
376377

378+
def test_single_duplicate_in_multi(self):
379+
# Arrange
380+
self.mock_delta_table.put_item.side_effect = [SUCCESS_RESPONSE, DUPLICATE_RESPONSE, SUCCESS_RESPONSE]
381+
382+
records_config = [
383+
RecordConfig(EventName.CREATE, Operation.CREATE, "ok-id2.1", ActionFlag.CREATE),
384+
RecordConfig(EventName.UPDATE, Operation.UPDATE, "duplicate-id2.2", ActionFlag.UPDATE),
385+
RecordConfig(EventName.DELETE_PHYSICAL, Operation.DELETE_PHYSICAL, "ok-id2.3"),
386+
]
387+
event = ValuesForTests.get_multi_record_event(records_config)
388+
389+
# Act
390+
result = handler(event, None)
391+
392+
# Assert
393+
self.assertTrue(result)
394+
self.assertEqual(self.mock_delta_table.put_item.call_count, len(records_config))
395+
self.assertEqual(self.mock_firehose_logger.send_log.call_count, len(records_config))
396+
377397
@patch("delta.process_record")
378398
@patch("delta.send_firehose")
379399
def test_handler_calls_process_record_for_each_event(self, mock_send_firehose, mock_process_record):

delta_backend/tests/utils_for_converter_tests.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import uuid
12
from decimal import Decimal
23
import json
34
from common.mappings import EventName, Operation
@@ -171,7 +172,8 @@ def get_multi_record_event(records_config: List[RecordConfig]):
171172
def get_event_record(imms_id, event_name, operation, supplier="EMIS"):
172173
pk = f"covid#{imms_id}"
173174
if operation != Operation.DELETE_PHYSICAL:
174-
return{
175+
return {
176+
"eventID": str(uuid.uuid4()),
175177
"eventName": event_name,
176178
"dynamodb": {
177179
"ApproximateCreationDateTime": 1690896000,
@@ -187,6 +189,7 @@ def get_event_record(imms_id, event_name, operation, supplier="EMIS"):
187189
}
188190
else:
189191
return {
192+
"eventID": str(uuid.uuid4()),
190193
"eventName": event_name,
191194
"dynamodb": {
192195
"ApproximateCreationDateTime": 1690896000,

0 commit comments

Comments
 (0)