-
Notifications
You must be signed in to change notification settings - Fork 4
VED-755: Handling NHS Number confusions #816
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
f7c5872
2f9d5e4
bc9f4bc
13610c5
bac82fe
516ac9a
cd02066
b36d8ed
001fda1
973c8a2
7df9358
0644072
4bb330a
af29a78
876485f
f637a93
31858cc
2ffffb7
9da9a17
d1cd6a7
9c446b5
a0381b7
1ae026a
8523b24
1941dba
c930608
5dd0764
e1461b4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,46 +1,51 @@ | ||
| from common.clients import logger | ||
| from common.clients import STREAM_NAME | ||
| from typing import Any, Dict | ||
|
|
||
| from common.clients import logger, STREAM_NAME | ||
| from common.log_decorator import logging_decorator | ||
| from common.aws_lambda_event import AwsLambdaEvent | ||
| from exceptions.id_sync_exception import IdSyncException | ||
| from record_processor import process_record | ||
| ''' | ||
| Lambda function handler for processing SQS events.Lambda for ID Sync. Fired by SQS | ||
| ''' | ||
|
|
||
| """ | ||
| - Parses the incoming AWS event into `AwsLambdaEvent` and iterate its `records`. | ||
| - Delegate each record to `process_record` and collect `nhs_number` from each result. | ||
| - If any record has status == "error" raise `IdSyncException` with aggregated nhs_numbers. | ||
| - Any unexpected error is wrapped into `IdSyncException(message="Error processing id_sync event")`. | ||
| """ | ||
|
|
||
| @logging_decorator(prefix="id_sync", stream_name=STREAM_NAME) | ||
| def handler(event_data, _): | ||
|
|
||
| @logging_decorator(prefix="id_sync", stream_name=STREAM_NAME) | ||
| def handler(event_data: Dict[str, Any], _context) -> Dict[str, Any]: | ||
| try: | ||
| logger.info("id_sync handler invoked") | ||
| event = AwsLambdaEvent(event_data) | ||
|
dlzhry2nhs marked this conversation as resolved.
|
||
| record_count = len(event.records) | ||
| if record_count > 0: | ||
| logger.info("id_sync processing event with %d records", record_count) | ||
| error_count = 0 | ||
| nhs_numbers = [] | ||
| for record in event.records: | ||
| record_result = process_record(record) | ||
| nhs_numbers.append(record_result["nhs_number"]) | ||
| if record_result["status"] == "error": | ||
| error_count += 1 | ||
| if error_count > 0: | ||
| raise IdSyncException(message=f"Processed {record_count} records with {error_count} errors", | ||
| nhs_numbers=nhs_numbers) | ||
|
|
||
| else: | ||
| response = {"status": "success", | ||
| "message": f"Successfully processed {record_count} records", | ||
| "nhs_numbers": nhs_numbers} | ||
| else: | ||
| response = {"status": "success", "message": "No records found in event"} | ||
| records = event.records | ||
|
|
||
| if not records: | ||
| return {"status": "success", "message": "No records found in event"} | ||
|
|
||
| logger.info("id_sync processing event with %d records", len(records)) | ||
|
|
||
| results = [process_record(record) for record in records] | ||
|
Akol125 marked this conversation as resolved.
|
||
| nhs_numbers = [result["nhs_number"] for result in results] | ||
| error_count = sum(1 for result in results if result.get("status") == "error") | ||
|
|
||
| if error_count: | ||
| raise IdSyncException(message=f"Processed {len(records)} records with {error_count} errors", | ||
| nhs_numbers=nhs_numbers) | ||
|
|
||
| response = { | ||
| "status": "success", | ||
| "message": f"Successfully processed {len(records)} records", | ||
| "nhs_numbers": nhs_numbers | ||
| } | ||
|
|
||
| logger.info("id_sync handler completed: %s", response) | ||
| return response | ||
|
|
||
| except IdSyncException as e: | ||
| logger.exception(f"id_sync error: {e.message}") | ||
| raise e | ||
| except Exception as e: | ||
| raise | ||
| except Exception: | ||
| msg = "Error processing id_sync event" | ||
| logger.exception(msg) | ||
| raise IdSyncException(message=msg, exception=e) | ||
| raise IdSyncException(message=msg) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,61 +1,172 @@ | ||
| ''' | ||
| record Processor | ||
| ''' | ||
| from common.clients import logger | ||
| from typing import Optional | ||
| from pds_details import pds_get_patient_id | ||
| from ieds_db_operations import ieds_check_exist, ieds_update_patient_id | ||
| from typing import Dict, Any | ||
| from pds_details import pds_get_patient_id, pds_get_patient_details, normalize_name_from_pds | ||
| from ieds_db_operations import ( | ||
| ieds_check_exist, | ||
| ieds_update_patient_id, | ||
| extract_patient_resource_from_item, | ||
| get_items_from_patient_id, | ||
| ) | ||
| import json | ||
| import ast | ||
|
|
||
|
|
||
| def process_record(event_record): | ||
| def process_record(event_record: Dict[str, Any]) -> Dict[str, Any]: | ||
|
|
||
| logger.info("process_record. Processing record: %s", event_record) | ||
| body_text = event_record.get('body', '') | ||
| # convert body to json | ||
|
|
||
| # convert body to json (try JSON first, then fall back to Python literal) | ||
| if isinstance(body_text, str): | ||
| try: | ||
| # Try JSON first | ||
| body = json.loads(body_text) | ||
| except json.JSONDecodeError: | ||
| try: | ||
| # Fall back to Python dict syntax | ||
| body = ast.literal_eval(body_text) | ||
| except (ValueError, SyntaxError): | ||
| logger.error("Failed to parse body: %s", body_text) | ||
| return {"status": "error", "message": "Invalid body format"} | ||
| else: | ||
| body = body_text | ||
|
|
||
| nhs_number = body.get("subject") | ||
| logger.info("process record NHS number: %s", nhs_number) | ||
| if nhs_number: | ||
| return process_nhs_number(nhs_number) | ||
| else: | ||
| logger.info("No NHS number found in event record") | ||
| return {"status": "error", "message": "No NHS number found in event record"} | ||
|
|
||
| logger.info("No NHS number found in event record") | ||
| return {"status": "error", "message": "No NHS number found in event record"} | ||
|
|
||
| def process_nhs_number(nhs_number: str) -> Optional[str]: | ||
|
|
||
| def process_nhs_number(nhs_number: str) -> Dict[str, Any]: | ||
|
Akol125 marked this conversation as resolved.
|
||
| # get patient details from PDS | ||
| logger.info(f"process_nhs_number. Processing NHS number: {nhs_number}") | ||
| patient_details_id = pds_get_patient_id(nhs_number) | ||
|
|
||
| base_log_data = {"nhs_number": nhs_number} | ||
| if patient_details_id: | ||
| logger.info(f"process_nhs_number. Patient details ID: {patient_details_id}") | ||
| # if patient NHS != id, update patient index of vax events to new number | ||
| if patient_details_id != nhs_number and patient_details_id: | ||
| logger.info(f"process_nhs_number. Update patient ID from {nhs_number} to {patient_details_id}") | ||
| if ieds_check_exist(nhs_number): | ||
| logger.info("process_nhs_number. IEDS record found, updating patient ID") | ||
| response = ieds_update_patient_id(nhs_number, patient_details_id) | ||
| else: | ||
| logger.info("process_nhs_number. No ieds record found for: %s", nhs_number) | ||
| response = {"status": "success", "message": f"No records returned for ID: {nhs_number}"} | ||
| new_nhs_number = pds_get_patient_id(nhs_number) | ||
|
dlzhry2nhs marked this conversation as resolved.
|
||
|
|
||
| if not new_nhs_number: | ||
| return { | ||
| "status": "success", | ||
| "message": "No patient ID found for NHS number", | ||
| "nhs_number": nhs_number, | ||
| } | ||
|
|
||
| if new_nhs_number == nhs_number: | ||
| return { | ||
| "status": "success", | ||
| "message": "No update required", | ||
| "nhs_number": nhs_number, | ||
| } | ||
| logger.info("Update patient ID from %s to %s", nhs_number, new_nhs_number) | ||
|
|
||
| if ieds_check_exist(nhs_number): | ||
| # Fetch PDS details for demographic comparison | ||
| try: | ||
| pds_details = pds_get_patient_details(nhs_number) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We already have called
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Discussed on call - optional. Can do if it does not take too much time, but is an existing issue. Would be nice as improves efficiency.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are still calling PDS twice in this flow. Both times we retrieve the full patient details. On the first call, we retrieve the details and then just obtain the ID/NHS Number. On the second call we retrieve the full patient details again so we can use it to compare the demographic details etc. It would be really nice if we could just perform the call once. Please confirm if you plan to do this or are leaving out of this PR.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have given this some more thought. Maybe it is too much for this ticket, but can we raise a ticket to remove to 2 calls to PDS and replace with 1. It is an external service that already faces a lot of traffic, so I would like to minimise this in future.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that would be nice, we could have another ticket to tighten up some other checks, including this and add backoffs and retries for the ieds_db operations. A ticket basically to make things more durable and cater for scenarios that might also come up in the test??
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good. |
||
| except Exception: | ||
| logger.exception("process_nhs_number: failed to fetch PDS details, aborting update") | ||
| return { | ||
| "status": "error", | ||
| "message": "Failed to fetch PDS details for demographic comparison", | ||
| "nhs_number": nhs_number, | ||
| } | ||
|
|
||
| # Get IEDS items for this patient id and compare demographics | ||
| try: | ||
| items = get_items_from_patient_id(nhs_number) | ||
|
dlzhry2nhs marked this conversation as resolved.
Outdated
|
||
| except Exception: | ||
| logger.exception("process_nhs_number: failed to fetch IEDS items, aborting update") | ||
| return { | ||
| "status": "error", | ||
| "message": "Failed to fetch IEDS items for demographic comparison", | ||
| "nhs_number": nhs_number, | ||
| } | ||
|
|
||
| # If at least one IEDS item matches demographics, proceed with update | ||
| match_found = False | ||
| for item in items: | ||
| try: | ||
| if demographics_match(pds_details, item): | ||
|
dlzhry2nhs marked this conversation as resolved.
Outdated
|
||
| match_found = True | ||
| break | ||
| except Exception: | ||
| logger.exception("process_nhs_number: error while comparing demographics for item: %s", item) | ||
|
|
||
| if not match_found: | ||
| logger.info("process_nhs_number: No IEDS items matched PDS demographics. Skipping update for %s", nhs_number) | ||
| response = { | ||
| "status": "success", | ||
| "message": "No IEDS items matched PDS demographics; update skipped", | ||
| } | ||
| else: | ||
| response = {"status": "success", "message": "No update required"} | ||
| response = ieds_update_patient_id(nhs_number, new_nhs_number) | ||
|
dlzhry2nhs marked this conversation as resolved.
Outdated
|
||
| else: | ||
| response = {"status": "success", "message": f"No patient ID found for NHS number: {nhs_number}"} | ||
| response.update(base_log_data) | ||
| logger.info("No IEDS record found for: %s", nhs_number) | ||
| response = {"status": "success", "message": f"No records returned for ID: {nhs_number}"} | ||
|
|
||
| response["nhs_number"] = nhs_number | ||
| return response | ||
|
|
||
| def extract_normalized_name_from_patient(patient: dict) -> str | None: | ||
|
JamesW1-NHS marked this conversation as resolved.
|
||
| """Return a normalized 'given family' name string from a Patient resource or None.""" | ||
| if not patient: | ||
| return None | ||
| name = patient.get("name") | ||
| if not name: | ||
| return None | ||
| try: | ||
| name_entry = name[0] if isinstance(name, list) else name | ||
| given = name_entry.get("given") | ||
| given_str = None | ||
| if isinstance(given, list) and given: | ||
| given_str = given[0] | ||
| elif isinstance(given, str): | ||
| given_str = given | ||
| family = name_entry.get("family") | ||
| parts = [p for p in [given_str, family] if p] | ||
| return " ".join(parts).strip().lower() if parts else None | ||
| except Exception: | ||
| return None | ||
|
|
||
|
|
||
| def demographics_match(pds_details: dict, ieds_item: dict) -> bool: | ||
| """Compare PDS patient details from PDS to an IEDS item (FHIR Patient resource). | ||
| Returns True if name, birthDate and gender match (when present in both sources). | ||
| If required fields are missing or unparsable on the IEDS side the function returns False. | ||
| """ | ||
| try: | ||
|
|
||
| def normalize_strings(item: Any) -> str | None: | ||
|
Akol125 marked this conversation as resolved.
|
||
| return str(item).strip().lower() if item else None | ||
|
|
||
| # Retrieve patient resource from PDS | ||
| pds_name = normalize_strings(normalize_name_from_pds(pds_details)) | ||
| pds_gender = normalize_strings(pds_details.get("gender")) | ||
| pds_birth = normalize_strings(pds_details.get("birthDate")) | ||
|
|
||
| # Retrieve patient resource from IEDS item | ||
| patient = extract_patient_resource_from_item(ieds_item) | ||
| if not patient: | ||
| logger.debug("demographics_match: no patient resource in IEDS table item") | ||
| return False | ||
|
|
||
| # normalize patient name | ||
| ieds_name = extract_normalized_name_from_patient(patient) | ||
|
|
||
| ieds_gender = normalize_strings(patient.get("gender")) | ||
| ieds_birth = patient.get("birthDate") | ||
|
|
||
| if pds_birth and ieds_birth and pds_birth != ieds_birth: | ||
| logger.debug("demographics_match: birthDate mismatch %s != %s", pds_birth, ieds_birth) | ||
| return False | ||
|
|
||
| if pds_gender and ieds_gender and pds_gender != ieds_gender: | ||
| logger.debug("demographics_match: gender mismatch %s != %s", pds_gender, ieds_gender) | ||
| return False | ||
|
|
||
| if pds_name and ieds_name and pds_name != ieds_name: | ||
| logger.debug("demographics_match: name mismatch %s != %s", pds_name, ieds_name) | ||
| return False | ||
|
|
||
| return True | ||
| except Exception: | ||
| logger.exception("demographics_match: comparison failed with exception") | ||
| return False | ||
Uh oh!
There was an error while loading. Please reload this page.