Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f7c5872
VED-755: refactor handler
motola Sep 12, 2025
2f9d5e4
VED-755: refactor handler2
motola Sep 12, 2025
bc9f4bc
VED-755: Refactor existing id_sync codebase
motola Sep 12, 2025
13610c5
VED-755: refactoring record_processor
motola Sep 12, 2025
bac82fe
resolve lint
motola Sep 15, 2025
516ac9a
resolve lint2
motola Sep 15, 2025
cd02066
add logic for demographics match and integrate with nhs number proces…
motola Sep 15, 2025
b36d8ed
add unit test for demographics test
motola Sep 15, 2025
001fda1
refactor all functions
motola Sep 15, 2025
973c8a2
VED-755: Adding demographics logic
motola Sep 16, 2025
7df9358
resolve failing test
motola Sep 16, 2025
0644072
VED-755: Resolve lint issues
motola Sep 16, 2025
4bb330a
VED-755: remove duplication
motola Sep 16, 2025
af29a78
VED-755: remove duplication2
motola Sep 16, 2025
876485f
VED-755: improve coverage
motola Sep 16, 2025
f637a93
VED-755: refactor process_nhs_number
motola Sep 16, 2025
31858cc
VED-755: added pagination, remove ieds_check_exist, modify test
motola Sep 17, 2025
2ffffb7
refactor pagination
motola Sep 17, 2025
9da9a17
VED-755: update records whose vaccination match
motola Sep 17, 2025
d1cd6a7
Merge branch 'master' into VED-755-Handling-NHS-Confusions
motola Sep 17, 2025
9c446b5
refactoring record processor.py
motola Sep 18, 2025
a0381b7
Merge remote branch into VED-755-Handling-NHS-Confusions
motola Sep 18, 2025
1ae026a
reduce cognitive complexity and use utils and constants
motola Sep 18, 2025
8523b24
fix lint errors
motola Sep 18, 2025
1941dba
VED-765: review based on changes
motola Sep 18, 2025
c930608
VED-755: code review changes
motola Sep 18, 2025
5dd0764
VED-755: code review
motola Sep 18, 2025
e1461b4
Merge branch 'master' into VED-755-Handling-NHS-Confusions
motola Sep 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions lambdas/id_sync/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,4 @@

- Code is located in the `lambdas/id_sync/src/` directory.
- Unit tests are in the `lambdas/id_sync/tests/` directory.
- Use the provided Makefile and Dockerfile for building, testing, and packaging.

## License

This project is maintained by NHS. See [LICENSE](../LICENSE) for details.
- Use the provided Makefile and Dockerfile for building, testing, and packaging.
67 changes: 36 additions & 31 deletions lambdas/id_sync/src/id_sync.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,51 @@
from common.clients import logger
from common.clients import STREAM_NAME
from typing import Any, Dict

from common.clients import logger, STREAM_NAME
from common.log_decorator import logging_decorator
from common.aws_lambda_event import AwsLambdaEvent
from exceptions.id_sync_exception import IdSyncException
from record_processor import process_record
'''
Lambda function handler for processing SQS events.Lambda for ID Sync. Fired by SQS
'''

"""
- Parses the incoming AWS event into `AwsLambdaEvent` and iterates over its `records`.
Comment thread
Akol125 marked this conversation as resolved.
- Delegates each record to `process_record` and collects the `nhs_number` from each result.
- If any record has status == "error", raises `IdSyncException` with the aggregated nhs_numbers.
- Any unexpected error is wrapped in `IdSyncException(message="Error processing id_sync event")`.
"""

@logging_decorator(prefix="id_sync", stream_name=STREAM_NAME)
def handler(event_data: Dict[str, Any], _context) -> Dict[str, Any]:
    """Process an id_sync event and report the outcome for its records.

    Parses ``event_data`` into an ``AwsLambdaEvent`` and passes every record
    to ``process_record``.

    Args:
        event_data: Raw Lambda event payload.
        _context: Lambda context object; unused.

    Returns:
        A dict with ``status`` and ``message``. When records were processed
        it also carries the collected ``nhs_numbers``.

    Raises:
        IdSyncException: If any record result has status "error" (the
            exception carries the collected NHS numbers), or if any
            unexpected error occurs while processing the event.
    """
    try:
        logger.info("id_sync handler invoked")
        event = AwsLambdaEvent(event_data)
        records = event.records

        if not records:
            return {"status": "success", "message": "No records found in event"}

        logger.info("id_sync processing event with %d records", len(records))

        results = [process_record(record) for record in records]
        # process_record omits "nhs_number" from some error results (invalid
        # body, missing subject). Indexing those would raise KeyError and hide
        # the per-record error count behind the generic failure below.
        nhs_numbers = [
            result["nhs_number"] for result in results if result.get("nhs_number")
        ]
        error_count = sum(1 for result in results if result.get("status") == "error")

        if error_count:
            raise IdSyncException(message=f"Processed {len(records)} records with {error_count} errors",
                                  nhs_numbers=nhs_numbers)

        response = {
            "status": "success",
            "message": f"Successfully processed {len(records)} records",
            "nhs_numbers": nhs_numbers
        }

        logger.info("id_sync handler completed: %s", response)
        return response

    except IdSyncException as e:
        logger.exception(f"id_sync error: {e.message}")
        raise
    except Exception as e:
        msg = "Error processing id_sync event"
        logger.exception(msg)
        # Chain the cause so the original traceback is not lost.
        raise IdSyncException(message=msg) from e
16 changes: 16 additions & 0 deletions lambdas/id_sync/src/ieds_db_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,19 @@ def get_items_from_patient_id(id: str, limit=BATCH_SIZE) -> list:
nhs_numbers=[patient_pk],
exception=e
)

Comment thread
dlzhry2nhs marked this conversation as resolved.

def extract_patient_resource_from_item(item: dict) -> dict | None:
"""
Extract a Patient resource dict from an IEDS database.
"""
patient_resource = item.get("Resource", None)
if not isinstance(patient_resource, dict):
return None

# Otherwise, check contained
for response in patient_resource.get("contained", []):
if isinstance(response, dict) and response.get("resourceType") == "Patient":
return response

return None
3 changes: 2 additions & 1 deletion lambdas/id_sync/src/pds_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
safe_tmp_dir = tempfile.mkdtemp(dir="/tmp") # NOSONAR


# Get Patient details from external service PDS using NHS number from MNS notification
def pds_get_patient_details(nhs_number: str) -> dict:
try:
logger.info(f"get patient details. nhs_number: {nhs_number}")
Expand All @@ -34,6 +35,7 @@ def pds_get_patient_details(nhs_number: str) -> dict:
raise IdSyncException(message=msg, exception=e)


# Extract Patient identifier value from PDS patient details
Comment thread
JamesW1-NHS marked this conversation as resolved.
def pds_get_patient_id(nhs_number: str) -> str:
"""
Get PDS patient ID from NHS number.
Expand All @@ -48,7 +50,6 @@ def pds_get_patient_id(nhs_number: str) -> str:

return patient_details["identifier"][0]["value"]

# ✅ Remove the IdSyncException catch since you're just re-raising
except Exception as e:
msg = f"Error getting PDS patient ID for {nhs_number}"
logger.exception(msg)
Expand Down
179 changes: 147 additions & 32 deletions lambdas/id_sync/src/record_processor.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,176 @@
'''
record Processor
'''
from common.clients import logger
from typing import Optional
from pds_details import pds_get_patient_id
from ieds_db_operations import ieds_check_exist, ieds_update_patient_id
from typing import Dict, Any
from pds_details import pds_get_patient_id, pds_get_patient_details
from ieds_db_operations import (
ieds_check_exist,
ieds_update_patient_id,
extract_patient_resource_from_item,
get_items_from_patient_id,
)
import json
import ast


def process_record(event_record: Dict[str, Any]) -> Dict[str, Any]:
    """Parse one event record and process the NHS number it carries.

    The record's "body" may be a string (JSON, or a Python-literal dict as a
    fallback) or an already-decoded object. The NHS number is read from the
    body's "subject" field and handed to ``process_nhs_number``.

    Args:
        event_record: A single record from the incoming event.

    Returns:
        The result of ``process_nhs_number``, or an error dict
        (``status == "error"``) when the body cannot be parsed into a dict or
        has no "subject".
    """
    logger.info("process_record. Processing record: %s", event_record)
    body_text = event_record.get('body', '')

    # convert body to json (try JSON first, then fall back to Python literal)
    if isinstance(body_text, str):
        try:
            body = json.loads(body_text)
        except json.JSONDecodeError:
            try:
                body = ast.literal_eval(body_text)
            except (ValueError, TypeError, SyntaxError):
                # literal_eval can raise TypeError as well on malformed input
                logger.error("Failed to parse body: %s", body_text)
                return {"status": "error", "message": "Invalid body format"}
    else:
        body = body_text

    # A body that parses to a scalar or list (e.g. "123", "[]") has no
    # "subject" to read; report it as invalid instead of crashing on .get().
    if not isinstance(body, dict):
        logger.error("Failed to parse body: %s", body_text)
        return {"status": "error", "message": "Invalid body format"}

    nhs_number = body.get("subject")
    logger.info("process record NHS number: %s", nhs_number)
    if nhs_number:
        return process_nhs_number(nhs_number)

    logger.info("No NHS number found in event record")
    return {"status": "error", "message": "No NHS number found in event record"}


def process_nhs_number(nhs_number: str) -> Optional[str]:
def process_nhs_number(nhs_number: str) -> Dict[str, Any]:
Comment thread
Akol125 marked this conversation as resolved.
# get patient details from PDS
logger.info(f"process_nhs_number. Processing NHS number: {nhs_number}")
patient_details_id = pds_get_patient_id(nhs_number)

base_log_data = {"nhs_number": nhs_number}
if patient_details_id:
logger.info(f"process_nhs_number. Patient details ID: {patient_details_id}")
# if patient NHS != id, update patient index of vax events to new number
if patient_details_id != nhs_number and patient_details_id:
logger.info(f"process_nhs_number. Update patient ID from {nhs_number} to {patient_details_id}")
if ieds_check_exist(nhs_number):
logger.info("process_nhs_number. IEDS record found, updating patient ID")
response = ieds_update_patient_id(nhs_number, patient_details_id)
else:
logger.info("process_nhs_number. No ieds record found for: %s", nhs_number)
response = {"status": "success", "message": f"No records returned for ID: {nhs_number}"}
new_nhs_number = pds_get_patient_id(nhs_number)
Comment thread
dlzhry2nhs marked this conversation as resolved.

if not new_nhs_number:
return {
"status": "success",
"message": "No patient ID found for NHS number",
"nhs_number": nhs_number,
}

if new_nhs_number == nhs_number:
return {
"status": "success",
"message": "No update required",
"nhs_number": nhs_number,
}
logger.info("Update patient ID from %s to %s", nhs_number, new_nhs_number)

if ieds_check_exist(nhs_number):
# Fetch PDS details for demographic comparison
try:
pds_details = pds_get_patient_details(nhs_number)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have already called `pds_get_patient_id`, which itself calls this `pds_get_patient_details` function, so this is a bit inefficient. We could make one API call rather than two and take the relevant data from that single response.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed on call - optional. Can do if it does not take too much time, but is an existing issue. Would be nice as improves efficiency.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are still calling PDS twice in this flow. Both times we retrieve the full patient details. On the first call, we retrieve the details and then just obtain the ID/NHS Number.

On the second call we retrieve the full patient details again so we can use it to compare the demographic details etc.

It would be really nice if we could just perform the call once. Please confirm if you plan to do this or are leaving out of this PR.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have given this some more thought. Maybe it is too much for this ticket, but can we raise a ticket to replace the two calls to PDS with one? It is an external service that already faces a lot of traffic, so I would like to minimise this in future.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that would be nice, we could have another ticket to tighten up some other checks, including this and add backoffs and retries for the ieds_db operations. A ticket basically to make things more durable and cater for scenarios that might also come up in the test??

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

except Exception:
logger.exception("process_nhs_number: failed to fetch PDS details, aborting update")
return {
"status": "error",
"message": "Failed to fetch PDS details for demographic comparison",
"nhs_number": nhs_number,
}

# Get IEDS items for this patient id and compare demographics
try:
items = get_items_from_patient_id(nhs_number)
Comment thread
dlzhry2nhs marked this conversation as resolved.
Outdated
except Exception:
logger.exception("process_nhs_number: failed to fetch IEDS items, aborting update")
return {
"status": "error",
"message": "Failed to fetch IEDS items for demographic comparison",
"nhs_number": nhs_number,
}

# If at least one IEDS item matches demographics, proceed with update
match_found = False
for item in items:
try:
if demographics_match(pds_details, item):
Comment thread
dlzhry2nhs marked this conversation as resolved.
Outdated
match_found = True
break
except Exception:
logger.exception("process_nhs_number: error while comparing demographics for item: %s", item)

if not match_found:
logger.info("process_nhs_number: No IEDS items matched PDS demographics. Skipping update for %s", nhs_number)
response = {
"status": "success",
"message": "No IEDS items matched PDS demographics; update skipped",
}
else:
response = {"status": "success", "message": "No update required"}
response = ieds_update_patient_id(nhs_number, new_nhs_number)
Comment thread
dlzhry2nhs marked this conversation as resolved.
Outdated
else:
response = {"status": "success", "message": f"No patient ID found for NHS number: {nhs_number}"}
response.update(base_log_data)
logger.info("No IEDS record found for: %s", nhs_number)
response = {"status": "success", "message": f"No records returned for ID: {nhs_number}"}

response["nhs_number"] = nhs_number
return response


def extract_normalized_name_from_patient(patient: dict) -> str | None:
Comment thread
JamesW1-NHS marked this conversation as resolved.
"""Return a normalized 'given family' name string from a Patient resource or None."""
if not patient:
return None
name = patient.get("name")
if not name:
return None
try:
name_entry = name[0] if isinstance(name, list) else name
given = name_entry.get("given")
given_str = None
if isinstance(given, list) and given:
given_str = given[0]
elif isinstance(given, str):
given_str = given
family = name_entry.get("family")
parts = [p for p in [given_str, family] if p]
return " ".join(parts).strip().lower() if parts else None
except Exception:
return None


def demographics_match(pds_details: dict, ieds_item: dict) -> bool:
    """Report whether PDS patient details and an IEDS item describe the same person.

    The Patient resource is extracted from ``ieds_item`` and compared with
    ``pds_details`` on name, gender and birthDate. Every value is stringified,
    trimmed and lower-cased before comparison.

    Args:
        pds_details: Patient details as returned from PDS.
        ieds_item: An IEDS table item expected to contain a Patient resource.

    Returns:
        True only when name, gender and birthDate are all present on both
        sides and equal after normalisation. False when any of them is
        missing, when the IEDS item has no Patient resource, on any mismatch,
        or when the comparison itself raises.
    """
    def normalize_strings(item: Any) -> str | None:
        return str(item).strip().lower() if item else None

    try:
        # Retrieve patient resource from PDS
        pds_name = normalize_strings(extract_normalized_name_from_patient(pds_details))
        pds_gender = normalize_strings(pds_details.get("gender"))
        pds_birth = normalize_strings(pds_details.get("birthDate"))

        # Retrieve patient resource from IEDS item
        patient = extract_patient_resource_from_item(ieds_item)
        if not patient:
            logger.debug("demographics_match: no patient resource in IEDS table item")
            return False

        ieds_name = normalize_strings(extract_normalized_name_from_patient(patient))
        ieds_gender = normalize_strings(patient.get("gender"))
        # Normalise the IEDS birth date the same way as the PDS one; comparing
        # a raw value against a normalised string would report a false
        # mismatch for padded or non-string dates.
        ieds_birth = normalize_strings(patient.get("birthDate"))

        if not all([pds_name, pds_gender, pds_birth, ieds_name, ieds_gender, ieds_birth]):
            logger.debug("demographics_match: missing required demographics")
            return False

        if pds_birth != ieds_birth:
            logger.debug("demographics_match: birthDate mismatch %s != %s", pds_birth, ieds_birth)
            return False

        if pds_gender != ieds_gender:
            logger.debug("demographics_match: gender mismatch %s != %s", pds_gender, ieds_gender)
            return False

        if pds_name != ieds_name:
            logger.debug("demographics_match: name mismatch %s != %s", pds_name, ieds_name)
            return False

        return True
    except Exception:
        logger.exception("demographics_match: comparison failed with exception")
        return False
4 changes: 0 additions & 4 deletions lambdas/id_sync/tests/test_ieds_db_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,3 @@ def test_ieds_check_exist_limit_parameter(self):

# Assert - Verify the limit parameter is correctly passed
self.mock_get_items_from_patient_id.assert_called_once_with(patient_id, 1)

# ✅ Remove tests that are no longer relevant:
# - test_ieds_check_exist_missing_count_field (no longer uses Count)
# - test_ieds_check_exist_count_greater_than_one (no longer uses Count)
Loading