Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 23 additions & 27 deletions lambdas/delta_backend/src/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,19 @@


class Converter:
def __init__(self, fhir_data, action_flag=ActionFlag.UPDATE, report_unexpected_exception=True):
def __init__(self, fhir_data, action_flag=ActionFlag.UPDATE):
self.converted = {}
self.error_records = []
self.action_flag = action_flag
self.report_unexpected_exception = report_unexpected_exception

try:
if not fhir_data:
raise ValueError("FHIR data is required for initialization.")
if not fhir_data:
raise ValueError("FHIR data is required for initialization.")

self.extractor = Extractor(fhir_data, self.report_unexpected_exception)
self.conversion_layout = ConversionLayout(self.extractor)
except Exception as e:
if report_unexpected_exception:
self._log_error(f"Initialization failed: [{e.__class__.__name__}] {e}")
raise
self.extractor = Extractor(fhir_data)
self.conversion_layout = ConversionLayout(self.extractor)

def run_conversion(self):
Comment thread
Thomas-Boyle marked this conversation as resolved.
Outdated
conversions = self.conversion_layout.get_conversion_layout()

for conversion in conversions:
for conversion in self.conversion_layout.get_conversion_layout():
self._convert_data(conversion)

self.error_records.extend(self.extractor.get_error_records())
Expand All @@ -36,28 +28,32 @@ def run_conversion(self):
return self.converted

def _convert_data(self, conversion: ConversionField):
try:
flat_field = conversion.field_name_flat
flat_field = conversion.field_name_flat

try:
if flat_field == "ACTION_FLAG":
self.converted[flat_field] = self.action_flag
else:
converted = conversion.expression_rule()
if converted is not None:
self.converted[flat_field] = converted
return

except Exception as e:
if (converted := conversion.expression_rule()) is not None:
self.converted[flat_field] = converted
except Exception as error:
self._log_error(
f"Conversion error [{e.__class__.__name__}]: {e}",
flat_field,
f"Conversion error [{error.__class__.__name__}]: {error}",
code=exception_messages.PARSING_ERROR,
)
self.converted[flat_field] = ""

def _log_error(self, e, code=exception_messages.UNEXPECTED_EXCEPTION):
error_obj = {"code": code, "message": str(e)}

if self.report_unexpected_exception:
self.error_records.append(error_obj)
def _log_error(self, field_name, e, code):
self.error_records.append(
{
"code": code,
"field": field_name,
"value": None,
"message": str(e),
}
)

def get_error_records(self):
return self.error_records
34 changes: 15 additions & 19 deletions lambdas/delta_backend/src/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ class Extractor:
DATE_CONVERT_FORMAT = "%Y%m%d"
DEFAULT_POSTCODE = "ZZ99 3CZ"

def __init__(self, fhir_json_data, report_unexpected_exception=True):
def __init__(self, fhir_json_data):
self.fhir_json_data = (
json.loads(fhir_json_data, parse_float=decimal.Decimal)
if isinstance(fhir_json_data, str)
else fhir_json_data
)
self.report_unexpected_exception = report_unexpected_exception
self.error_records = []

def _get_patient(self):
Expand Down Expand Up @@ -174,23 +173,20 @@ def _get_site_information(self):
return site_code, site_code_type_uri

def _log_error(self, field_name, field_value, e, code=exception_messages.RECORD_CHECK_FAILED):
if self.report_unexpected_exception:
if isinstance(e, Exception):
message = exception_messages.MESSAGES[exception_messages.UNEXPECTED_EXCEPTION] % (
e.__class__.__name__,
str(e),
)
else:
message = str(e)

self.error_records.append(
{
"code": code,
"field": field_name,
"value": field_value,
"message": message,
}
)
message = (
exception_messages.MESSAGES[exception_messages.UNEXPECTED_EXCEPTION] % (e.__class__.__name__, str(e))
if isinstance(e, Exception)
else str(e)
)

self.error_records.append(
{
"code": code,
"field": field_name,
"value": field_value,
"message": message,
}
)

def _convert_date(self, field_name, date, format) -> str:
"""
Expand Down
86 changes: 35 additions & 51 deletions lambdas/delta_backend/tests/test_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ def get_event(event_name=EventName.CREATE, operation="operation", supplier="EMIS
"""Returns test event data."""
return ValuesForTests.get_event(event_name, operation, supplier)

def assert_structured_error_records(self, error_records):
self.assertTrue(error_records)

for error_record in error_records:
self.assertEqual(set(error_record), {"code", "field", "value", "message"})
self.assertTrue(error_record["message"])

def assert_dynamodb_record(
self,
operation_flag,
Expand Down Expand Up @@ -139,68 +146,47 @@ def test_fhir_converter_json_direct_data(self):
json_data = json.dumps(ValuesForTests.json_data)

fhir_converter = Converter(json_data)
FlatFile = fhir_converter.run_conversion()
result = fhir_converter.run_conversion()

flatJSON = json.dumps(FlatFile)
expected_imms_value = deepcopy(ValuesForTests.expected_imms2) # UPDATE is currently the default action-flag
expected_imms = json.dumps(expected_imms_value)
self.assertEqual(flatJSON, expected_imms)

errorRecords = fhir_converter.get_error_records()
self.assertEqual(result, expected_imms_value)
self.assertFalse(fhir_converter.get_error_records())

self.assertEqual(len(errorRecords), 0)

def test_fhir_converter_json_error_scenario_reporting_on(self):
def test_fhir_converter_json_error_scenario(self):
"""it should convert fhir json data to flat json - error scenarios"""
error_test_cases = [
ErrorValuesForTests.missing_json,
ErrorValuesForTests.json_dob_error,
]
error_test_cases = {
"missing_json": ErrorValuesForTests.missing_json,
"json_dob_error": ErrorValuesForTests.json_dob_error,
}

for test_case in error_test_cases:
json_data = json.dumps(test_case)
for test_name, test_case in error_test_cases.items():
with self.subTest(test_name=test_name):
fhir_converter = Converter(json.dumps(test_case))
fhir_converter.run_conversion()

fhir_converter = Converter(json_data)
fhir_converter.run_conversion()
self.assert_structured_error_records(fhir_converter.get_error_records())

errorRecords = fhir_converter.get_error_records()

# Check if bad data creates error records
self.assertTrue(len(errorRecords) > 0)

def test_fhir_converter_json_error_scenario_reporting_off(self):
def test_fhir_converter_json_incorrect_data_scenario(self):
"""it should convert fhir json data to flat json - error scenarios"""
error_test_cases = [
ErrorValuesForTests.missing_json,
ErrorValuesForTests.json_dob_error,
]

for test_case in error_test_cases:
json_data = json.dumps(test_case)

fhir_converter = Converter(json_data, report_unexpected_exception=False)
fhir_converter.run_conversion()

errorRecords = fhir_converter.get_error_records()
with self.assertRaisesRegex(ValueError, "FHIR data is required for initialization."):
Converter(None)

# Check if bad data creates error records
self.assertTrue(len(errorRecords) == 0)
def test_handler_persists_structured_conversion_errors(self):
event = self.get_event(operation=Operation.UPDATE)
event["Records"][0]["dynamodb"]["NewImage"]["Resource"]["S"] = json.dumps(ErrorValuesForTests.json_dob_error)

def test_fhir_converter_json_incorrect_data_scenario_reporting_on(self):
"""it should convert fhir json data to flat json - error scenarios"""

with self.assertRaises(ValueError):
fhir_converter = Converter(None)
errorRecords = fhir_converter.get_error_records()
self.assertTrue(len(errorRecords) > 0)
response = handler(event, None)

def test_fhir_converter_json_incorrect_data_scenario_reporting_off(self):
"""it should convert fhir json data to flat json - error scenarios"""
result = self.table.scan()
items = result.get("Items", [])
self.assertTrue(response)
self.assertEqual(len(items), 1)

with self.assertRaises(ValueError):
fhir_converter = Converter(None, report_unexpected_exception=False)
errorRecords = fhir_converter.get_error_records()
self.assertTrue(len(errorRecords) == 0)
error_records = items[0]["Imms"]["CONVERSION_ERRORS"]
self.assert_structured_error_records(error_records)
self.assertEqual(error_records[0]["field"], "PERSON_DOB")
self.assertEqual(error_records[0]["value"], "196513-28")

def test_handler_imms_convert_to_flat_json(self):
"""Test that the Imms field contains the correct flat JSON data for CREATE, UPDATE, and DELETE operations."""
Expand Down Expand Up @@ -235,8 +221,6 @@ def test_handler_imms_convert_to_flat_json(self):
response,
)

result = self.table.scan()
items = result.get("Items", [])
self.clear_table()

def test_handler_imms_convert_to_flat_json_legacy_patientsk_compatibility(self):
Expand Down
21 changes: 16 additions & 5 deletions lambdas/delta_backend/tests/test_delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,9 +511,17 @@ def test_dps_record_skipped(self, mock_logger_info):

@patch("delta.Converter")
def test_partial_success_with_errors(self, mock_converter):
expected_error_records = [
{
"code": 10,
"field": "PERSON_DOB",
"value": "196513-28",
"message": "Unexpected exception [ValueError]: Invalid isoformat string: '196513-28'",
}
]
mock_converter_instance = MagicMock()
mock_converter_instance.run_conversion.return_value = {"ABC": "DEF"}
mock_converter_instance.get_error_records.return_value = [{"error": "Invalid field"}]
mock_converter_instance.get_error_records.return_value = expected_error_records
mock_converter.return_value = mock_converter_instance

# Mock DynamoDB put_item success
Expand All @@ -533,13 +541,16 @@ def test_partial_success_with_errors(self, mock_converter):
args, kwargs = self.mock_send_log_to_firehose.call_args
sent_payload = args[1] # Second positional arg

# Navigate to the specific message
status_desc = sent_payload["operation_outcome"]["statusDesc"]
operation_outcome = sent_payload["operation_outcome"]

# Assert the expected message is present
self.assertIn(
"Partial success: successfully synced into delta, but issues found within record",
status_desc,
operation_outcome["statusDesc"],
)
self.assertEqual(operation_outcome["diagnostics"], expected_error_records)
self.mock_logger.warning.assert_called_once_with(
"Partial success: record synced with conversion errors",
extra={"conversion_errors": expected_error_records},
)

def test_send_message_multi_records_diverse(self):
Expand Down