Skip to content

Commit 734d0a7

Browse files
committed
VED-294: Split handler into multiple functions for readability.
1 parent 7cbebb0 commit 734d0a7

2 files changed

Lines changed: 242 additions & 131 deletions

File tree

delta_backend/src/delta.py

Lines changed: 234 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
import time
66
from datetime import datetime, timedelta, UTC
7+
from unittest import case
78

89
import boto3
910
from boto3.dynamodb.conditions import Attr
@@ -63,9 +64,18 @@ def send_message(record, queue_url=failure_queue_url):
6364
except Exception as e:
6465
logger.error(f"Error sending record to DLQ: {e}")
6566

66-
def get_vaccine_type(patientsk) -> str:
67-
parsed = [str.strip(str.lower(s)) for s in patientsk.split("#")]
68-
return parsed[0]
67+
def get_vaccine_type(patient_sort_key: str) -> str:
68+
vaccine_type = patient_sort_key.split("#")[0]
69+
return str.strip(str.lower(vaccine_type))
70+
71+
def get_imms_id(primary_key: str) -> str:
72+
return primary_key.split("#")[1]
73+
74+
def get_creation_and_expiry_times(creation_timestamp: float) -> (str, int):
75+
creation_datetime = datetime.fromtimestamp(creation_timestamp, UTC)
76+
expiry_datetime = creation_datetime + timedelta(days=30)
77+
expiry_timestamp = int(expiry_datetime.timestamp())
78+
return creation_datetime.isoformat(), expiry_timestamp
6979

7080
def send_firehose(log_data):
7181
try:
@@ -74,133 +84,238 @@ def send_firehose(log_data):
7484
except Exception as e:
7585
logger.error(f"Error sending log to Firehose: {e}")
7686

77-
def process_record(record, log_data):
78-
ret = True
79-
operation_outcome = {}
87+
def handle_dynamodb_response(response, error_records):
88+
match response:
89+
case {"ResponseMetadata": {"HTTPStatusCode": 200}} if error_records:
90+
logger.warning(f"Partial success: successfully synced into delta, but issues found within record: {json.dumps(error_records)}")
91+
return True, {"statusCode": "207", "statusDesc": f"Partial success: successfully synced into delta, but issues found within record", "diagnostics": error_records}
92+
case {"ResponseMetadata": {"HTTPStatusCode": 200}}:
93+
logger.info("Successfully synched into delta")
94+
return True, {"statusCode": "200", "statusDesc": "Successfully synched into delta"}
95+
case _:
96+
logger.exception("Failure response from DynamoDB")
97+
return False, {"statusCode": "500", "statusDesc": "Failure response from DynamoDB", "diagnostics": response}
98+
99+
def handle_exception_response(response):
100+
match response:
101+
case ClientError(response={"Error": {"Code": "ConditionalCheckFailedException"}}):
102+
logger.info("Skipped record already present in delta")
103+
return True, {"statusCode": "200", "statusDesc": "Skipped record already present in delta"}
104+
case _:
105+
logger.exception("Exception during processing")
106+
return False, {"statusCode": "500", "statusDesc": "Exception", "diagnostics": response}
107+
108+
# def process_record_test(record):
109+
# try:
110+
# match record:
111+
# case {
112+
# "eventName": EventName.DELETE_PHYSICAL,
113+
# "eventId": event_id,
114+
# "dynamodb": {
115+
# "Keys": {"PK": {"S": primary_key}},
116+
# "ApproximateCreationDateTime": approximate_creation_timestamp
117+
# }
118+
# }:
119+
# imms_id = get_imms_id(primary_key)
120+
# creation_datetime, expiry_timestamp = get_creation_and_expiry_times(approximate_creation_timestamp)
121+
# # process_removal(event_id, primary_key, approximate_creation_time)
122+
# try:
123+
# response = get_delta_table().put_item(
124+
# Item={
125+
# "PK": event_id,
126+
# "ImmsID": imms_id,
127+
# "Operation": Operation.DELETE_PHYSICAL,
128+
# "VaccineType": "default",
129+
# "SupplierSystem": "default",
130+
# "DateTimeStamp": creation_datetime,
131+
# "Source": delta_source,
132+
# "Imms": "",
133+
# "ExpiresAt": expiry_timestamp,
134+
# },
135+
# ConditionExpression=Attr("PK").not_exists(),
136+
# )
137+
# return handle_dynamodb_response(response, None)
138+
# except Exception as e:
139+
# return handle_exception_response(e)
140+
#
141+
# case {
142+
# "eventId": event_id,
143+
# "dynamodb": {
144+
# "NewImage": {
145+
# "PK": {"S": primary_key},
146+
# "SupplierSystem": {"S": supplier_system},
147+
# },
148+
# }
149+
# } if supplier_system in ("DPSFULL", "DPSREDUCED"):
150+
# imms_id = get_imms_id(primary_key)
151+
# return True, {"statusCode": "200", "statusDesc": "Record from DPS skipped"}
152+
#
153+
# case {
154+
# "eventId": event_id,
155+
# "dynamodb": {
156+
# "NewImage": {
157+
# "PK": {"S": primary_key},
158+
# "PatientSK": {"S": patient_sort_key},
159+
# "SupplierSystem": {"S": supplier_system},
160+
# "Operation": {"S": operation},
161+
# "Resource": {"S": resource_str},
162+
# },
163+
# "ApproximateCreationDateTime": approximate_creation_timestamp
164+
# }
165+
# }:
166+
# imms_id = get_imms_id(primary_key)
167+
# vaccine_type = get_vaccine_type(patient_sort_key)
168+
# creation_datetime, expiry_timestamp = get_creation_and_expiry_times(approximate_creation_timestamp)
169+
# action_flag = ActionFlag.CREATE if operation == Operation.CREATE else operation
170+
# resource = json.loads(resource_str, parse_float=decimal.Decimal)
171+
# fhir_converter = Converter(resource, action_flag=action_flag)
172+
# flat_json = fhir_converter.run_conversion()
173+
# error_records = fhir_converter.get_error_records()
174+
# # process_insert_update_delete(event_id, primary_key, approximate_creation_time)
175+
# try:
176+
# response = get_delta_table().put_item(
177+
# Item={
178+
# "PK": event_id,
179+
# "ImmsID": imms_id,
180+
# "Operation": operation,
181+
# "VaccineType": vaccine_type,
182+
# "SupplierSystem": supplier_system,
183+
# "DateTimeStamp": creation_datetime,
184+
# "Source": delta_source,
185+
# "Imms": flat_json,
186+
# "ExpiresAt": expiry_timestamp,
187+
# },
188+
# ConditionExpression=Attr("PK").not_exists(),
189+
# )
190+
# return handle_dynamodb_response(response, error_records)
191+
# except Exception as e:
192+
# return handle_exception_response(e)
193+
#
194+
# case _:
195+
# return False, {"statusCode": "500", "statusDesc": "Unhandled record format", "diagnostics": record}
196+
# except Exception as e:
197+
# return False, {"statusCode": "500", "statusDesc": "Exception", "diagnostics": e}
198+
199+
def handle_removal(record):
200+
event_id = record["eventID"]
201+
primary_key = record["dynamodb"]["Keys"]["PK"]["S"]
202+
imms_id = get_imms_id(primary_key)
203+
operation = Operation.DELETE_PHYSICAL
204+
creation_timestamp = record["dynamodb"]["ApproximateCreationDateTime"]
205+
creation_datetime_str, expiry_timestamp = get_creation_and_expiry_times(creation_timestamp)
206+
operation_outcome = {"operation_type": operation, "record": imms_id}
207+
try:
208+
response = get_delta_table().put_item(
209+
Item={
210+
"PK": event_id,
211+
"ImmsID": imms_id,
212+
"Operation": operation,
213+
"VaccineType": "default",
214+
"SupplierSystem": "default",
215+
"DateTimeStamp": creation_datetime_str,
216+
"Source": delta_source,
217+
"Imms": "",
218+
"ExpiresAt": expiry_timestamp,
219+
},
220+
ConditionExpression=Attr("PK").not_exists(),
221+
)
222+
success, extra_log_fields = handle_dynamodb_response(response, None)
223+
operation_outcome.update(extra_log_fields)
224+
return success, operation_outcome
225+
except Exception as e:
226+
success, extra_log_fields = handle_exception_response(e)
227+
operation_outcome.update(extra_log_fields)
228+
return success, operation_outcome
229+
230+
def handle_skipped(record):
231+
primary_key = record["dynamodb"]["NewImage"]["PK"]["S"]
232+
imms_id = get_imms_id(primary_key)
233+
logger.info("Record from DPS skipped")
234+
return True, {"record": imms_id, "statusCode": "200", "statusDesc": "Record from DPS skipped"}
235+
236+
def handle_create_update_delete(record):
237+
event_id = record["eventID"]
238+
new_image = record["dynamodb"]["NewImage"]
239+
primary_key = new_image["PK"]["S"]
240+
imms_id = get_imms_id(primary_key)
241+
operation = new_image["Operation"]["S"]
242+
vaccine_type = get_vaccine_type(new_image["PatientSK"]["S"])
243+
supplier_system = new_image["SupplierSystem"]["S"]
244+
creation_timestamp = record["dynamodb"]["ApproximateCreationDateTime"]
245+
creation_datetime_str, expiry_timestamp = get_creation_and_expiry_times(creation_timestamp)
246+
action_flag = ActionFlag.CREATE if operation == Operation.CREATE else operation
247+
resource_json = json.loads(new_image["Resource"]["S"], parse_float=decimal.Decimal)
248+
fhir_converter = Converter(resource_json, action_flag=action_flag)
249+
flat_json = fhir_converter.run_conversion()
250+
error_records = fhir_converter.get_error_records()
251+
operation_outcome = {"record": imms_id, "operation_type": operation}
252+
80253
try:
81-
start = time.time()
82-
error_records = []
83-
response = str()
84-
imms_id = str()
85-
operation = str()
86-
approximate_creation_time = datetime.fromtimestamp(record["dynamodb"]["ApproximateCreationDateTime"], UTC)
87-
expiry_time = approximate_creation_time + timedelta(days=30)
88-
expiry_time_epoch = int(expiry_time.timestamp())
89-
delta_table = get_delta_table()
90-
91-
event_id = record["eventID"]
92-
if record["eventName"] != EventName.DELETE_PHYSICAL:
93-
new_image = record["dynamodb"]["NewImage"]
94-
imms_id = new_image["PK"]["S"].split("#")[1]
95-
operation_outcome["record"] = imms_id
96-
vaccine_type = get_vaccine_type(new_image["PatientSK"]["S"])
97-
supplier_system = new_image["SupplierSystem"]["S"]
98-
if supplier_system not in ("DPSFULL", "DPSREDUCED"):
99-
operation = new_image["Operation"]["S"]
100-
operation_outcome["operation_type"] = operation
101-
action_flag = ActionFlag.CREATE if operation == Operation.CREATE else operation
102-
resource_json = json.loads(new_image["Resource"]["S"], parse_float=decimal.Decimal)
103-
fhir_converter = Converter(resource_json, action_flag=action_flag)
104-
flat_json = fhir_converter.run_conversion()
105-
error_records = fhir_converter.get_error_records()
106-
response = delta_table.put_item(
107-
Item={
108-
"PK": event_id,
109-
"ImmsID": imms_id,
110-
"Operation": operation,
111-
"VaccineType": vaccine_type,
112-
"SupplierSystem": supplier_system,
113-
"DateTimeStamp": approximate_creation_time.isoformat(),
114-
"Source": delta_source,
115-
"Imms": flat_json,
116-
"ExpiresAt": expiry_time_epoch,
117-
},
118-
ConditionExpression=Attr("PK").not_exists(),
119-
)
120-
else:
121-
operation_outcome["statusCode"] = "200"
122-
operation_outcome["statusDesc"] = "Record from DPS skipped"
123-
log_data["operation_outcome"] = operation_outcome
124-
logger.info(f"Record from DPS skipped for {imms_id}")
125-
return True, log_data
126-
else:
127-
operation = Operation.DELETE_PHYSICAL
128-
operation_outcome["operation_type"] = operation
129-
new_image = record["dynamodb"]["Keys"]
130-
logger.info(f"Record to delta:{new_image}")
131-
imms_id = new_image["PK"]["S"].split("#")[1]
132-
operation_outcome["record"] = imms_id
133-
response = delta_table.put_item(
134-
Item={
135-
"PK": event_id,
136-
"ImmsID": imms_id,
137-
"Operation": operation,
138-
"VaccineType": "default",
139-
"SupplierSystem": "default",
140-
"DateTimeStamp": approximate_creation_time.isoformat(),
141-
"Source": delta_source,
142-
"Imms": "",
143-
"ExpiresAt": expiry_time_epoch,
144-
},
145-
ConditionExpression=Attr("PK").not_exists(),
146-
)
147-
148-
if response["ResponseMetadata"]["HTTPStatusCode"] == 200:
149-
if error_records:
150-
log = f"Partial success: successfully synced into delta, but issues found within record {imms_id}"
151-
operation_outcome["statusCode"] = "207"
152-
operation_outcome["statusDesc"] = (
153-
f"Partial success: successfully synced into delta, but issues found within record {json.dumps(error_records)}"
154-
)
155-
else:
156-
log = f"Record Successfully created for {imms_id}"
157-
operation_outcome["statusCode"] = "200"
158-
operation_outcome["statusDesc"] = "Successfully synched into delta"
159-
logger.info(log)
160-
else:
161-
log = f"Record NOT created for {imms_id}"
162-
operation_outcome["statusCode"] = "500"
163-
operation_outcome["statusDesc"] = "Failure response from DynamoDB"
164-
logger.warning(log)
165-
ret = False
254+
response = get_delta_table().put_item(
255+
Item={
256+
"PK": event_id,
257+
"ImmsID": imms_id,
258+
"Operation": operation,
259+
"VaccineType": vaccine_type,
260+
"SupplierSystem": supplier_system,
261+
"DateTimeStamp": creation_datetime_str,
262+
"Source": delta_source,
263+
"Imms": flat_json,
264+
"ExpiresAt": expiry_timestamp,
265+
},
266+
ConditionExpression=Attr("PK").not_exists(),
267+
)
268+
success, extra_log_fields = handle_dynamodb_response(response, error_records)
269+
operation_outcome.update(extra_log_fields)
270+
return success, operation_outcome
166271
except Exception as e:
167-
if isinstance(e, ClientError) and e.response["Error"]["Code"] == "ConditionalCheckFailedException":
168-
operation_outcome["statusCode"] = "200"
169-
operation_outcome["statusDesc"] = "Skipped record already present in delta"
170-
logger.info(f"Skipped record {event_id} already present in delta for {imms_id}")
171-
else:
172-
operation_outcome["statusCode"] = "500"
173-
operation_outcome["statusDesc"] = "Exception"
174-
logger.exception(f"Error processing record: {e}")
175-
ret = False
176-
177-
end = time.time()
178-
log_data["time_taken"] = f"{round(end - start, 5)}s"
179-
log_data["operation_outcome"] = operation_outcome
180-
return ret, log_data
272+
success, extra_log_fields = handle_exception_response(e)
273+
operation_outcome.update(extra_log_fields)
274+
return success, operation_outcome
275+
276+
def process_record(record):
277+
try:
278+
if record["eventName"] == EventName.DELETE_PHYSICAL:
279+
return handle_removal(record)
280+
281+
supplier_system = record["dynamodb"]["NewImage"]["SupplierSystem"]["S"]
282+
if supplier_system in ("DPSFULL", "DPSREDUCED"):
283+
return handle_skipped(record)
284+
285+
return handle_create_update_delete(record)
286+
except Exception as e:
287+
logger.exception("Exception during processing")
288+
return False, {"statusCode": "500", "statusDesc": "Exception", "diagnostics": e}
181289

182290
def handler(event, _context):
183-
ret = True
291+
overall_success = True
184292
logger.info("Starting Delta Handler")
185-
log_data = dict()
186-
log_data["function_name"] = "delta_sync"
187293
try:
188294
for record in event["Records"]:
189-
log_data["date_time"] = str(datetime.now())
190-
result, log_data = process_record(record, log_data)
191-
send_firehose(log_data)
192-
if not result:
193-
ret = False
194-
295+
datetime_str = datetime.now().isoformat()
296+
start = time.time()
297+
success, operation_outcome = process_record(record)
298+
overall_success = overall_success and success
299+
end = time.time()
300+
send_firehose({
301+
"function_name": "delta_sync",
302+
"operation_outcome": operation_outcome,
303+
"date_time": datetime_str,
304+
"time_taken": f"{round(end - start, 5)}s"
305+
})
195306
except Exception:
196-
ret = False
307+
overall_success = False
197308
operation_outcome = {
198309
"statusCode": "500",
199310
"statusDesc": "Exception",
200-
"diagnostics": f"Delta Lambda failure: Incorrect invocation of Lambda"
311+
"diagnostics": "Delta Lambda failure: Incorrect invocation of Lambda"
201312
}
202313
logger.exception(operation_outcome["diagnostics"])
203314
send_message(event) # Send failed records to DLQ
204-
log_data["operation_outcome"] = operation_outcome
205-
send_firehose(log_data)
206-
return ret
315+
send_firehose({"function_name": "delta_sync", "operation_outcome": operation_outcome})
316+
317+
# TODO - should we be doing this too?
318+
# if not overall_success:
319+
# send_message(event)
320+
321+
return overall_success

0 commit comments

Comments
 (0)