93 changes: 74 additions & 19 deletions tests/e2e_automation/features/conftest.py
@@ -4,6 +4,10 @@
import allure
import pytest
from dotenv import load_dotenv
from src.dynamoDB.dynamo_db_helper import (
cleanup_failed_audit_records_before_tests,
cleanup_failed_audit_records_for_filename,
)
from utilities.api_fhir_immunization_helper import (
empty_folder,
get_response_body_for_display,
@@ -76,6 +80,12 @@ def global_context():
s3_env = os.getenv("S3_env")
aws_account_id = os.getenv("aws_account_id")
mns_validation_required = os.getenv("mns_validation_required", "false").strip().lower() == "true"

# Pre-test cleanup: update any 'Failed' audit records from the last 48 hours
# so they don't block the batch_processor_filter from processing new files.
if s3_env and aws_profile_name:
cleanup_failed_audit_records_before_tests(aws_profile_name, s3_env)

if s3_env and aws_account_id and mns_validation_required:
purge_all_queues(s3_env, aws_account_id)

@@ -84,7 +94,12 @@
def temp_apigee_apps():
if use_temp_apigee_apps():
apigee_app_mgr = ApigeeOnDemandAppManager()
created_apps = apigee_app_mgr.setup_apps_and_product()
try:
created_apps = apigee_app_mgr.setup_apps_and_product()
except Exception as e:
print(f"[WARN] Apigee on-demand app setup failed — tests requiring dynamic apps will fail individually: {e}")
yield None
return

for test_app in created_apps:
os.environ[f"{test_app.supplier}_client_Id"] = test_app.client_id
@@ -150,40 +165,80 @@ def pytest_bdd_after_scenario(request, feature, scenario):
if "Delete_cleanUp" in tags:
if context.ImmsID is not None:
print(f"\n Delete Request is {context.url}/{context.ImmsID}")
context.response = http_requests_session.delete(f"{context.url}/{context.ImmsID}", headers=context.headers)
assert context.response.status_code == 204, (
f"Expected status code 204, but got {context.response.status_code}. Response: {get_response_body_for_display(context.response)}"
)
if context.mns_validation_required.strip().lower() == "true":
mns_event_will_be_triggered_with_correct_data_for_deleted_event(context)
else:
print("MNS validation not required, skipping MNS event verification for deleted event.")
try:
context.response = http_requests_session.delete(
f"{context.url}/{context.ImmsID}", headers=context.headers
)
if context.response.status_code in (401, 403):
# Apigee token has expired during a long test session (~13 min run).
# Refresh the token and retry the DELETE once before giving up.
print(
f"[TEARDOWN][WARN] DELETE returned {context.response.status_code} for "
f"{context.ImmsID} — Apigee token likely expired. Refreshing token and retrying..."
)
try:
get_tokens(context, context.supplier_name)
get_delete_url_header(context)
context.response = http_requests_session.delete(
f"{context.url}/{context.ImmsID}", headers=context.headers
)
except Exception as refresh_err:
print(
f"[TEARDOWN][WARN] Token refresh failed for {context.ImmsID}: {refresh_err}. Skipping teardown."
)
context.response = None

if context.response is None:
pass
elif context.response.status_code in (401, 403):
print(
f"[TEARDOWN][WARN] DELETE still returned {context.response.status_code} for "
f"{context.ImmsID} after token refresh. Skipping teardown assertion."
)
else:
assert context.response.status_code == 204, (
f"Expected status code 204, but got {context.response.status_code}. "
f"Response: {get_response_body_for_display(context.response)}"
)
if context.mns_validation_required.strip().lower() == "true":
mns_event_will_be_triggered_with_correct_data_for_deleted_event(context)
else:
print("MNS validation not required, skipping MNS event verification for deleted event.")
except AssertionError:
raise
except Exception as e:
print(f"[TEARDOWN][WARN] Delete cleanup error for {context.ImmsID}: {e}")
else:
print("Skipping delete: ImmsID is None")

if "delete_cleanup_batch" in tags:
if "IMMS_ID" in context.vaccine_df.columns and context.vaccine_df["IMMS_ID"].notna().any():
get_tokens(context, context.supplier_name)
FimranNHS marked this conversation as resolved.

df = context.vaccine_df.dropna(subset=["IMMS_ID"]).copy()
df["IMMS_ID_CLEAN"] = df["IMMS_ID"].astype(str).str.replace("Immunization#", "", regex=False)
context.vaccine_df["IMMS_ID_CLEAN"] = (
context.vaccine_df["IMMS_ID"].astype(str).str.replace("Immunization#", "", regex=False)
)

for row in df.itertuples(index=False):
imms_id = row.IMMS_ID_CLEAN
for imms_id in context.vaccine_df["IMMS_ID_CLEAN"].dropna().unique():
delete_url = f"{context.url}/{imms_id}"

print(f"Sending DELETE request to: {delete_url}")

response = http_requests_session.delete(delete_url, headers=context.headers)

if response.status_code != 204:
print(
f"Cleanup DELETE returned {response.status_code} for {imms_id} "
f"(teardown best-effort, not failing test). "
f"Response: {get_response_body_for_display(response)}"
f"Cleanup DELETE returned {response.status_code} for {imms_id} (teardown best-effort, not failing test). Response: {get_response_body_for_display(response)}"
)
else:
print(f"Deleted {imms_id} successfully.")
print("Batch cleanup finished.")

print("Batch cleanup finished.")
else:
print("No IMMS_ID values available — skipping delete cleanup.")
print("No IMMS_ID column or no values present as test failed due to an exception — skipping delete cleanup.")

# Unconditional audit table cleanup for every batch scenario.
# This handles the case where the @when archive-wait assert raised before
# the @then step containing the old inline cleanup call could execute,
# leaving a "Failed" record in the next test run's DynamoDB query.
if hasattr(context, "filename") and context.filename and hasattr(context, "S3_env") and context.S3_env:
Contributor:

I thought we agreed not to clear the audit table after the test run, since we rely on that data for debugging — especially when we are doing cleanup before each test run

avshetty1980 (Contributor, Author), Apr 1, 2026:

You can filter by status = "Not processed - Automation testing" to see all test-generated failures. I've also added a cleaned_at timestamp to the update expression to record when automation reset the record.

Once, at the start of every test run, cleanup_failed_audit_records_before_tests() does a scan with status = Failed AND timestamp >= now - 24h (reduced from 48h to limit scan cost on a large table). This clears any stale Failed records left by the previous CI run/test, e.g. if the previous run was killed before teardown completed.

We can check the cleaned_at timestamp to see when the update happened.

The Allure report will also capture the failed test, so we should still be covered for debugging.
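For illustration, here is a minimal sketch of the cleaned_at change described above. It reuses the boto3 Table resource and message_id key shape from update_audit_table_for_failed_status in this PR, but the helper name mark_audit_record_cleaned and the cleaned_at timestamp format are assumptions, not the exact PR code.

```python
# Illustrative sketch only (not the exact PR code): mark an audit record as reset by
# automation, adding a cleaned_at timestamp alongside the status change.
from datetime import UTC, datetime


def mark_audit_record_cleaned(table, message_id: str) -> None:
    table.update_item(
        Key={"message_id": message_id},
        UpdateExpression="SET #s = :new_status, cleaned_at = :cleaned_at",
        ConditionExpression="attribute_exists(message_id) AND #s = :current_status",
        ExpressionAttributeNames={"#s": "status"},
        ExpressionAttributeValues={
            ":new_status": "Not processed - Automation testing",
            ":current_status": "Failed",
            # Timestamp format assumed to mirror the cutoff format used by the
            # pre-test cleanup scan in dynamo_db_helper.py.
            ":cleaned_at": datetime.now(UTC).strftime("%Y%m%dT%H%M%S00"),
        },
    )
```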

Contributor:

Thanks for the detailed explanation

One recommendation from my side: since this issue is only occurring in the Batch File Validation tests, it would be cleaner and more efficient to scope the after‑scenario cleanup using the Batch_File_Validation_Feature tag rather than running it for every scenario.

If we continue to run the cleanup globally in pytest_bdd_after_scenario, then the pre‑test execution cleanup becomes redundant and just adds extra delay to the test run. We also need to remember that this is test code, and some failures are expected by design — so we shouldn’t over‑clean in a way that hides useful debugging information.

Using the tag‑based approach keeps the behaviour targeted, avoids unnecessary scans, and still gives us the audit visibility we rely on for debugging.
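As a rough sketch of the tag-based scoping suggested here (the tag name Batch_File_Validation_Feature comes from this comment; the context fixture lookup and attribute names mirror the existing hook but are assumptions, not the final implementation):

```python
# Sketch only: restrict the after-scenario audit-table safety net to batch scenarios
# instead of running it unconditionally for every scenario.
def pytest_bdd_after_scenario(request, feature, scenario):
    tags = scenario.tags
    context = request.getfixturevalue("context")  # assumed fixture name

    if "Batch_File_Validation_Feature" in tags:
        if getattr(context, "filename", None) and getattr(context, "S3_env", None):
            cleanup_failed_audit_records_for_filename(
                context.filename, context.aws_profile_name, context.S3_env
            )
```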

cleanup_failed_audit_records_for_filename(context.filename, context.aws_profile_name, context.S3_env)
161 changes: 151 additions & 10 deletions tests/e2e_automation/src/dynamoDB/dynamo_db_helper.py
@@ -1,11 +1,13 @@
import time
from collections import Counter
from datetime import UTC

import boto3
import pytest_check as check
from boto3.dynamodb.conditions import Key
from boto3.dynamodb.types import TypeDeserializer
from botocore.config import Config
from botocore.exceptions import ClientError
from utilities.api_fhir_immunization_helper import extract_practitioner_name
from utilities.context import ScenarioContext
from utilities.date_helper import (
@@ -1057,6 +1059,8 @@ def extract_patient_and_practitioner(contained):


def get_gender_code(input: str) -> GenderCode:
if input is None:
raise ValueError("get_gender_code received None — check PERSON_GENDER_CODE in patient data")
normalized = input.strip().lower()
try:
return GenderCode[normalized]
@@ -1070,21 +1074,158 @@ def get_gender_code(input: str) -> GenderCode:
raise ValueError(f"Invalid gender input: {input}")


def update_audit_table_for_failed_status(item: dict, aws_profile_name: str, env: str):
def update_audit_table_for_failed_status(item: dict, aws_profile_name: str, env: str) -> bool:
"""
Updates a single audit record from 'Failed' to 'Not processed - Automation testing'.
Returns True if the update was performed, False otherwise.

Called directly from @then steps for the immediate inline cleanup path, and also
delegated to by cleanup_failed_audit_records_for_filename for the teardown safety-net path.
"""
if item.get("status") != "Failed":
return
return False

message_id = item.get("message_id")
if not message_id:
print(f"⚠️ Skipping cleanup: 'message_id' missing from item: {item}")
return False

db = DynamoDBHelper(aws_profile_name, env)
table = db.get_batch_audit_table()

key = {"message_id": item["message_id"]}
try:
response = table.update_item(
Key={"message_id": message_id},
UpdateExpression="SET #s = :new_status",
ConditionExpression="attribute_exists(message_id) AND #s = :current_status",
ExpressionAttributeNames={"#s": "status"},
ExpressionAttributeValues={
":new_status": "Not processed - Automation testing",
":current_status": "Failed",
},
ReturnValues="UPDATED_NEW",
)
print(f"✅ Updated audit status for message_id={message_id}: {response.get('Attributes')}")
return True
except ClientError as e:
code = e.response["Error"]["Code"]
if code == "ConditionalCheckFailedException":
# Status already changed (race between inline call and teardown call) — safe to ignore
print(f"ℹAudit record message_id={message_id} no longer 'Failed', skipping.")
else:
print(f"DynamoDB error cleaning up message_id={message_id}: {e}")
return False


def cleanup_failed_audit_records_for_filename(filename: str, aws_profile_name: str, env: str) -> int:
"""
Queries the audit table's filename_index GSI for all records matching `filename`,
then updates every record with status='Failed' to 'Not processed - Automation testing'.

Designed to be called unconditionally in pytest_bdd_after_scenario so that 'Failed'
records are always cleaned up even when earlier test steps raised before the inline
update_audit_table_for_failed_status call could execute.

response = table.update_item(
Key=key,
UpdateExpression="SET #s = :new_status",
ExpressionAttributeNames={"#s": "status"},
ExpressionAttributeValues={":new_status": "Not processed - Automation testing"},
ReturnValues="UPDATED_NEW",
Returns the count of records that were updated.
"""
if not filename:
return 0

db = DynamoDBHelper(aws_profile_name, env)
table = db.get_batch_audit_table()

# Paginate through all items for this filename (DynamoDB returns max 1 MB per call)
items = []
exclusive_start_key = None

while True:
query_kwargs = {
"IndexName": "filename_index",
"KeyConditionExpression": Key("filename").eq(filename),
}
if exclusive_start_key:
query_kwargs["ExclusiveStartKey"] = exclusive_start_key

try:
response = table.query(**query_kwargs)
except ClientError as e:
print(f"Failed to query audit table for filename={filename}: {e}")
return 0

items.extend(response.get("Items", []))
exclusive_start_key = response.get("LastEvaluatedKey")
if not exclusive_start_key:
break

if not items:
print(f"No audit records found for filename={filename}, nothing to clean up.")
return 0

updated = sum(
update_audit_table_for_failed_status(item, aws_profile_name, env)
for item in items
if item.get("status") == "Failed"
)

print(f"✅ Updated audit status for message_id={key['message_id']}: {response.get('Attributes')}")
if updated:
print(f"✅ Cleaned up {updated} 'Failed' audit record(s) for filename={filename}.")
return updated


def cleanup_failed_audit_records_before_tests(aws_profile_name: str, env: str, hours: int = 48) -> int:
"""
Scans the audit table for all records with status='Failed' whose timestamp
falls within the last `hours` hours, and updates them to
'Not processed - Automation testing'.

Intended to be called once per session (in conftest.py) before tests run,
so that stale 'Failed' records left by previous runs do not block the
batch_processor_filter from picking up new files.

Returns the count of records that were updated.
"""
from datetime import datetime, timedelta

db = DynamoDBHelper(aws_profile_name, env)
table = db.get_batch_audit_table()

cutoff = (datetime.now(UTC) - timedelta(hours=hours)).strftime("%Y%m%dT%H%M%S00")

items: list[dict] = []
exclusive_start_key = None

while True:
scan_kwargs = {
"FilterExpression": "#s = :failed AND #ts >= :cutoff",
"ExpressionAttributeNames": {"#s": "status", "#ts": "timestamp"},
"ExpressionAttributeValues": {
":failed": "Failed",
":cutoff": cutoff,
},
}
if exclusive_start_key:
scan_kwargs["ExclusiveStartKey"] = exclusive_start_key

try:
response = table.scan(**scan_kwargs)
except ClientError as e:
print(f"[PRE-TEST CLEANUP] Failed to scan audit table: {e}")
return 0

items.extend(response.get("Items", []))
exclusive_start_key = response.get("LastEvaluatedKey")
if not exclusive_start_key:
break

if not items:
print(f"[PRE-TEST CLEANUP] No 'Failed' audit records found in the last {hours} hours.")
return 0

print(f"[PRE-TEST CLEANUP] Found {len(items)} 'Failed' audit record(s) from the last {hours} hours. Updating...")

updated = sum(update_audit_table_for_failed_status(item, aws_profile_name, env) for item in items)

print(
f"[PRE-TEST CLEANUP] Updated {updated} of {len(items)} 'Failed' audit record(s) to 'Not processed - Automation testing'."
)
return updated