-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathbatch_common_steps.py
More file actions
723 lines (560 loc) · 28.4 KB
/
batch_common_steps.py
File metadata and controls
723 lines (560 loc) · 28.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
import functools
import json
import os
import re
from collections import Counter
from datetime import UTC, datetime
import pandas as pd
import pytest_check as check
from pytest_bdd import given, parsers, then, when
from src.dynamoDB.dynamo_db_helper import (
fetch_batch_audit_table_detail,
fetch_immunization_events_detail_by_IdentifierPK,
fetch_immunization_int_delta_detail_by_immsID,
parse_imms_int_imms_event_response,
validate_audit_table_record,
validate_imms_delta_record_with_batch_record,
validate_to_compare_batch_record_with_event_table_record,
)
from src.objectModels.batch.batch_file_builder import (
build_batch_file,
generate_file_name,
save_record_to_batch_files_directory,
)
from src.objectModels.patient_loader import get_gp_code_by_nhs_number
from utilities.batch_file_helper import (
read_and_validate_csv_bus_ack_file_content,
validate_bus_ack_file_for_error,
validate_bus_ack_file_for_successful_records,
validate_inf_ack_file,
validate_json_bus_ack_file_failure_records,
validate_json_bus_ack_file_structure_and_metadata,
)
from utilities.batch_S3_buckets import (
upload_file_to_S3,
wait_and_read_ack_file,
wait_for_file_to_move_archive,
)
from utilities.date_helper import iso_to_compact, normalize_utc_suffix
from utilities.enums import ActionFlag, ActionMap, Operation
from utilities.sqs_message_halder import read_messages_for_batch
from features.APITests.steps.common_steps import (
calculate_age,
is_valid_uuid,
mns_event_will_be_triggered_with_correct_data,
)
def ignore_if_local_run(func):
    """Decorator that skips a BDD step when the run is in local mode.

    The step's ``context`` object is located among the call arguments; if its
    ``LOCAL_RUN_WITHOUT_S3_UPLOAD`` flag is truthy the wrapped step is not
    executed and ``None`` is returned instead.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # The BDD runner may pass context by keyword or as the last positional arg.
        ctx = kwargs["context"] if "context" in kwargs else (args[-1] if args else None)
        if ctx and getattr(ctx, "LOCAL_RUN_WITHOUT_S3_UPLOAD", False):
            print(f"Skipping step '{func.__name__}' due to local execution mode.")
            return None
        return func(*args, **kwargs)
    return wrapper
def ignore_local_run_set_test_data(func):
    """Decorator for local runs: bypass the step and seed test data instead.

    When ``context.LOCAL_RUN_WITHOUT_S3_UPLOAD`` is truthy the wrapped step is
    skipped; the file named by the ``LOCAL_RUN_FILE_NAME`` environment variable
    is read from ``context.working_directory`` as a pipe-delimited CSV (all
    columns as strings) into ``context.vaccine_df``. If the read fails, an
    empty DataFrame is used as the fallback.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # The BDD runner may pass context by keyword or as the last positional arg.
        ctx = kwargs["context"] if "context" in kwargs else (args[-1] if args else None)
        if not (ctx and getattr(ctx, "LOCAL_RUN_WITHOUT_S3_UPLOAD", False)):
            return func(*args, **kwargs)
        print(f"Skipping step '{func.__name__}' due to local execution mode.")
        file_name = os.getenv("LOCAL_RUN_FILE_NAME")
        ctx.filename = file_name
        file_path = os.path.join(ctx.working_directory, file_name)
        try:
            ctx.vaccine_df = pd.read_csv(
                file_path,
                delimiter="|",
                quotechar='"',
                dtype=str,  # keep every column as text, matching batch-file content
            )
            print(f"Loaded fallback vaccine_df from {file_name}")
        except Exception as e:
            print(f"Failed to load fallback file {file_name}: {e}")
            ctx.vaccine_df = pd.DataFrame()  # fall back to an empty frame
        return None
    return wrapper
@given("batch file is created for below data as full dataset")
@ignore_if_local_run
def valid_batch_file_is_created_with_details(datatable, context):
    """Build ``context.vaccine_df`` from the scenario datatable, then write it as a batch file."""
    build_dataFrame_using_datatable(datatable, context)
    create_batch_file(context)
@given("batch file is received for below data form DPS with all three action flag for each record")
@ignore_if_local_run
def valid_batch_file_is_created_for_DPSFULL(datatable, context):
    """Create a DPSFULL batch file with NEW, UPDATE and DELETE rows per record.

    The datatable seeds the NEW rows; UPDATE and DELETE copies are derived from
    them with a fixed expiry date of 20281231. ``context.expected_version`` is
    set to 2 for the downstream version checks.
    """
    build_dataFrame_using_datatable(datatable, context)
    base_df = context.vaccine_df.copy()
    derived = []
    for flag in ("UPDATE", "DELETE"):
        variant = base_df.copy()
        variant[["ACTION_FLAG", "EXPIRY_DATE"]] = [flag, "20281231"]
        derived.append(variant)
    context.vaccine_df = pd.concat([base_df, *derived], ignore_index=True)
    create_batch_file(context, fileName=f"{context.vaccine_type}_Vaccinations_v5_DPSFULL")
    context.expected_version = 2
@given("batch file is created for below data as full dataset with file extension dat")
@ignore_if_local_run
def valid_batch_file_is_created_with_details_and_dat_extension(datatable, context):
    """Build ``context.vaccine_df`` from the datatable and write the batch file with a .dat extension."""
    build_dataFrame_using_datatable(datatable, context)
    create_batch_file(context, file_ext="dat")
@when("same batch file is uploaded again in s3 bucket")
@when("batch file is uploaded in s3 bucket")
@ignore_local_run_set_test_data
def batch_file_upload_in_s3_bucket(context):
    """Upload the generated batch file to S3 and wait until it is moved to the archive."""
    upload_file_to_S3(context)
    print(f"Batch file uploaded to S3: {context.filename}")
    fileIsMoved = wait_for_file_to_move_archive(context)
    assert fileIsMoved, "File not found in archive after timeout"
@then("file will be moved to destination bucket and inf ack file will be created")
def ack_file_will_be_moved_to_destination_bucket(context):
    """Wait for the infrastructure ACK file and validate its object key.

    Asserts the ACK is a ``.csv`` whose key starts with
    ``context.forwarded_prefix`` and contains the ``_InfAck_`` marker, then
    stashes the CSV content on ``context.fileContent`` for later steps.
    """
    result = wait_and_read_ack_file(context, "ack")
    actual_file_name = result["csv"]["key"]
    check.is_true(
        actual_file_name.endswith(".csv"),
        f"Expected ACK file extension is .csv but got {actual_file_name}",
    )
    check.is_true(
        actual_file_name.startswith(context.forwarded_prefix),
        f"Expected ACK file name to start with {context.forwarded_prefix} but got {actual_file_name}",
    )
    check.is_true(
        "_InfAck_" in actual_file_name,
        # Fixed message: previously it embedded the literal placeholder
        # "_InfAck_actual_file_name" instead of the marker being checked.
        f"Expected ACK file name to contain '_InfAck_' but got {actual_file_name}",
    )
    context.fileContent = result["csv"]["content"]
@then("inf ack file has success status for processed batch file")
def all_records_are_processed_successfully_in_the_inf_ack_file(context):
    """Assert the INF ACK file content held on the context validates as all-success."""
    all_valid = validate_inf_ack_file(context)
    assert all_valid, "One or more records failed validation checks"
@then("bus ack files will be created")
def file_will_be_moved_to_destination_bucket(context):
    """Wait for the BUS ACK output and validate both the CSV and JSON files.

    Each object key must carry the expected extension, start with
    ``context.forwarded_prefix`` and contain the ``_BusAck_`` marker. The file
    contents are stashed on ``context.fileContent`` (CSV) and
    ``context.fileContentJson`` (JSON) for later steps.
    """
    result = wait_and_read_ack_file(context, "forwardedFile")
    assert isinstance(result, dict), f"Expected both CSV and JSON ACK files but got: {type(result)}"
    _validate_bus_ack_file_name(context, result["csv"]["key"], ".csv")
    context.fileContent = result["csv"]["content"]
    _validate_bus_ack_file_name(context, result["json"]["key"], ".json")
    context.fileContentJson = result["json"]["content"]
    assert context.fileContent, (
        f"BUS Ack csv File not found in destination bucket after timeout: {context.forwarded_prefix}"
    )
    assert context.fileContentJson, (
        f"BUS Ack JSON file not found in destination bucket after timeout: {context.forwarded_prefix}"
    )


def _validate_bus_ack_file_name(context, actual_file_name, extension):
    """Check one BUS ACK object key: expected extension, forwarded prefix and '_BusAck_' marker."""
    check.is_true(
        actual_file_name.endswith(extension),
        f"Expected ACK file extension is {extension} but got {actual_file_name}",
    )
    check.is_true(
        actual_file_name.startswith(context.forwarded_prefix),
        f"Expected ACK file name to start with {context.forwarded_prefix} but got {actual_file_name}",
    )
    check.is_true(
        "_BusAck_" in actual_file_name,
        # Fixed message: previously it embedded the literal placeholder
        # "_BusAck_actual_file_name" instead of the marker being checked.
        f"Expected ACK file name to contain '_BusAck_' but got {actual_file_name}",
    )
@then("CSV bus ack will not have any entry of successfully processed records")
def all_records_are_processed_successfully_in_the_batch_file(context):
    """Read the CSV BUS ACK and assert successfully processed records left no entries in it."""
    file_rows = read_and_validate_csv_bus_ack_file_content(context)
    all_valid = validate_bus_ack_file_for_successful_records(context, file_rows)
    assert all_valid, "One or more records failed validation checks"
@then("Json bus ack will only contain file metadata and no failure record entry")
def json_bus_ack_will_only_contain_file_metadata_and_no_record_entries(context):
    """Validate the JSON BUS ACK structure/metadata and assert it lists no failure records."""
    json_content = context.fileContentJson
    assert json_content is not None, "BUS Ack JSON content is None"
    validate_json_bus_ack_file_structure_and_metadata(context)
    success = validate_json_bus_ack_file_failure_records(context, expected_failure=False)
    assert success, "Failed to validate JSON bus ack file failure records"
@then("Json bus ack will only contain file metadata and correct failure record entries")
def json_bus_ack_will_only_contain_file_metadata_and_correct_failure_record_entries(
    context,
):
    """Validate the JSON BUS ACK structure/metadata and assert its failure entries are correct."""
    json_content = context.fileContentJson
    assert json_content is not None, "BUS Ack JSON content is None"
    validate_json_bus_ack_file_structure_and_metadata(context)
    success = validate_json_bus_ack_file_failure_records(context, expected_failure=True)
    assert success, "Failed to validate JSON bus ack file failure records"
@then("Audit table will have correct status, queue name and record count for the processed batch file")
def validate_imms_audit_table(context):
    """Fetch the batch audit record for the uploaded file and validate it as 'Processed'."""
    table_query_response = fetch_batch_audit_table_detail(context.aws_profile_name, context.filename, context.S3_env)
    assert isinstance(table_query_response, list) and table_query_response, (
        f"Item not found in response for filename: {context.filename}"
    )
    # Only the first returned audit item is validated.
    item = table_query_response[0]
    validate_audit_table_record(context, item, "Processed")
@then("The delta table will be populated with the correct data for all created records in batch file")
def validate_imms_delta_table_for_created_records_in_batch_file(context):
    """Load delta-table records into the context cache, then validate the CREATE entries."""
    preload_delta_data(context)
    validate_imms_delta_table_for_newly_created_records_in_batch_file(context)
@then("The delta table will not be populated with DPSFULL records in batch file")
def validate_imms_delta_table_for_dpsfull_records(context):
    """DPSFULL files must not generate delta-table entries: assert none exist per IMMS_ID.

    Uses a short fetch (2 attempts, 5s delay) because no records are expected.
    """
    df = context.vaccine_df
    check.is_true("IMMS_ID" in df.columns, "Column 'IMMS_ID' not found in vaccine_df")
    valid_rows = df[df["IMMS_ID"].notnull()]
    check.is_true(not valid_rows.empty, "No rows with non-null IMMS_ID found in vaccine_df")
    # Unpack the (key, group) pairs explicitly; previously the tuple was
    # iterated whole and indexed as imms_id[0], which obscured the intent and
    # was inconsistent with preload_delta_data.
    for imms_id, _group in valid_rows.groupby("IMMS_ID"):
        clean_id = imms_id.replace("Immunization#", "")
        delta_items = fetch_immunization_int_delta_detail_by_immsID(
            context.aws_profile_name,
            clean_id,
            context.S3_env,
            max_attempts=2,
            delay=5,
        )
        check.is_true(
            not delta_items,
            f"Delta records were unexpectedly found for IMMS_ID: {clean_id}",
        )
@then("The delta table will be populated with the correct data for all updated records in batch file")
def validate_imms_delta_table_for_updated_records(context):
    """Validate the UPDATE delta records, loading the delta cache on first use.

    Uses getattr so the step also works when no earlier step initialised
    ``context.delta_cache`` (previously this raised AttributeError).
    """
    if getattr(context, "delta_cache", None) is None:
        preload_delta_data(context)
    validate_imms_delta_table_for_updated_records_in_batch_file(context)
@then("The delta table will be populated with the correct data for all deleted records in batch file")
def validate_imms_delta_table_for_deleted_records(context):
    """Validate the DELETE delta records, loading the delta cache on first use.

    Uses getattr so the step also works when no earlier step initialised
    ``context.delta_cache`` (previously this raised AttributeError).
    """
    if getattr(context, "delta_cache", None) is None:
        preload_delta_data(context)
    validate_imms_delta_table_for_deleted_records_in_batch_file(context)
@then(
    parsers.parse(
        "The imms event table will be populated with the correct data for '{operation}' event for records in batch file"
    )
)
def validate_imms_event_table_for_all_records_in_batch_file(context, operation: str):
    """Validate imms event-table items against the batch rows for one operation.

    For every batch row whose ACTION_FLAG matches *operation*, the event table
    is queried by the combined identifier (``UNIQUE_ID_URI#UNIQUE_ID``) and the
    returned item is compared field-by-field with the batch record. The matched
    item's PK is written back as the row's IMMS_ID for later steps.

    NOTE(review): the step pattern supplies *operation* as a plain string
    (it is used with ``ActionMap[operation.lower()]`` and ``Operation[operation]``),
    so the parameter is annotated as ``str``.
    """
    mapping = ActionMap[operation.lower()]
    # Keep only the rows for this operation, matched case-insensitively on ACTION_FLAG.
    df = context.vaccine_df[context.vaccine_df["ACTION_FLAG"].str.lower() == mapping.action_flag.value.lower()]
    df["UNIQUE_ID_COMBINED"] = df["UNIQUE_ID_URI"].astype(str) + "#" + df["UNIQUE_ID"].astype(str)
    # "nan#nan" arises when both identifier columns were NaN before stringification.
    valid_rows = df[df["UNIQUE_ID_COMBINED"].notnull() & (df["UNIQUE_ID_COMBINED"] != "nan#nan")]
    for idx, row in valid_rows.iterrows():
        unique_id_combined = row["UNIQUE_ID_COMBINED"]
        # Normalise NaN/empty cells to "" so field comparisons are stable.
        batch_record = {k: normalize(v) for k, v in row.to_dict().items()}
        table_query_response = fetch_immunization_events_detail_by_IdentifierPK(
            context.aws_profile_name, unique_id_combined, context.S3_env
        )
        assert "Items" in table_query_response and table_query_response["Count"] > 0, (
            f"Item not found in response for unique_id_combined: {unique_id_combined}"
        )
        item = table_query_response["Items"][0]
        # NOTE(review): df here is a filtered frame, so this .at write updates the
        # local frame only; the shared context.vaccine_df is updated by the helper below.
        df.at[idx, "IMMS_ID"] = item.get("PK")
        context.ImmsID = item.get("PK").replace("Immunization#", "")
        update_imms_id_for_all_related_rows(context.vaccine_df, unique_id_combined, context.ImmsID)
        resource_json_str = item.get("Resource")
        assert resource_json_str, "Resource field missing in item."
        try:
            resource = json.loads(resource_json_str)
        except (TypeError, json.JSONDecodeError) as e:
            print(f"Failed to parse Resource from item: {e}")
            raise AssertionError("Failed to parse Resource from response item.")
        assert resource is not None, "Resource is None in the response"
        created_event = parse_imms_int_imms_event_response(resource)
        # Fall back to "TBC" when the batch row has no NHS number.
        nhs_number = batch_record.get("NHS_NUMBER") or "TBC"
        fields_to_compare = [
            ("Operation", Operation[operation].value, item.get("Operation")),
            ("SupplierSystem", context.supplier_name, item.get("SupplierSystem")),
            ("PatientPK", f"Patient#{nhs_number}", item.get("PatientPK")),
            (
                "PatientSK",
                f"{context.vaccine_type.upper()}#{context.ImmsID}",
                item.get("PatientSK"),
            ),
            ("Version", int(context.expected_version), int(item.get("Version"))),
        ]
        for name, expected, actual in fields_to_compare:
            check.is_true(expected == actual, f"Expected {name}: {expected}, Actual {actual}")
        validate_to_compare_batch_record_with_event_table_record(context, batch_record, created_event)
@then("all rejected records are listed in the csv bus ack file and no imms id is generated")
def all_record_are_rejected_for_given_field_name(context):
    """Assert every rejected record appears in the CSV BUS ACK with the expected error details."""
    file_rows = read_and_validate_csv_bus_ack_file_content(context)
    all_valid = validate_bus_ack_file_for_error(context, file_rows)
    assert all_valid, "One or more records failed validation checks"
@then(parsers.parse("MNS event will be triggered with correct data for all '{event_type}' events where NHS is not null"))
def mns_event_will_be_triggered_with_correct_data_for_created_events_in_batch_file(context, event_type):
    """Validate MNS events for all batch rows that received an IMMS_ID.

    Skipped entirely unless ``context.mns_validation_required`` is "true".
    Any event type other than CREATE/UPDATE is treated as CREATE.
    """
    if context.mns_validation_required.strip().lower() != "true":
        print(
            f"MNS event validation is skipped since mns_validation_required is set to {context.mns_validation_required}"
        )
        return
    normalized_type = event_type.upper()
    action = normalized_type if normalized_type in ["CREATE", "UPDATE"] else "CREATE"
    df = context.vaccine_df.dropna(subset=["IMMS_ID"]).copy()
    df["IMMS_ID_CLEAN"] = df["IMMS_ID"].astype(str).str.replace("Immunization#", "", regex=False)
    valid_rows = list(df.itertuples(index=False))
    if not valid_rows:
        print("No valid NHS rows found — skipping MNS validation.")
        return
    mns_event_will_be_triggered_for_batch_record(context=context, action=action, valid_rows=valid_rows)
@then("Api updated event will trigger MNS event with correct data")
def mns_event_will_be_triggered_with_correct_data_for_api_updated_events(context):
    """Delegate to the shared API-side MNS validation for an UPDATE event."""
    mns_event_will_be_triggered_with_correct_data(context=context, action="UPDATE")
def normalize(value):
    """Map NaN/None and the empty string to "", otherwise return the value unchanged."""
    if pd.isna(value) or value == "":
        return ""
    return value
def create_batch_file(context, file_ext: str = "csv", fileName: str = None, delimiter: str = "|"):
    """Write context.vaccine_df to the batch-files directory and record the file name.

    Stamps ``context.FileTimestamp`` with a compact local timestamp plus the
    last two digits of the UTC offset. When *fileName* is supplied without an
    embedded timestamp, the timestamp is appended before the extension;
    otherwise the project's name generator is used.
    """
    now = datetime.now().astimezone()
    offset_tail = now.strftime("%z")[-2:]
    context.FileTimestamp = now.strftime("%Y%m%dT%H%M%S") + offset_tail
    context.file_extension = file_ext
    if fileName:
        # Append a timestamp only when the caller's name does not already embed one.
        already_stamped = re.search(r"\d{8}T\d{8}", fileName)
        suffix = "" if already_stamped else f"_{context.FileTimestamp}"
        context.filename = f"{fileName}{suffix}.{context.file_extension}"
    else:
        context.filename = generate_file_name(context)
    save_record_to_batch_files_directory(context, delimiter)
    print(f"Batch file created: {context.filename}")
def build_dataFrame_using_datatable(datatable, context):
    """Turn a pytest-bdd datatable into ``context.vaccine_df`` of batch records.

    Each row's unique_id is suffixed with a single UTC timestamp so reruns of
    the scenario generate fresh identifiers.
    """
    timestamp = datetime.now(UTC).strftime("%Y%m%dT%H%M%S")
    headers = datatable[0]
    patient_col = headers.index("patient_id")
    unique_col = headers.index("unique_id")
    records = []
    for row in datatable[1:]:
        context.patient_id = row[patient_col]
        record = build_batch_file(context, unique_id=f"{row[unique_col]}-{timestamp}")
        flat_record = record.dict()
        # Unwrap the payload when the builder nests it under a "data" key.
        if "data" in flat_record:
            flat_record = flat_record["data"]
        records.append(flat_record)
    context.vaccine_df = pd.DataFrame(records)
def update_imms_id_for_all_related_rows(df, unique_id_combined, imms_id):
    """Set IMMS_ID in place on every row whose URI#ID key equals *unique_id_combined*."""
    combined_keys = df["UNIQUE_ID_URI"].astype(str) + "#" + df["UNIQUE_ID"].astype(str)
    df.loc[combined_keys == unique_id_combined, "IMMS_ID"] = imms_id
def preload_delta_data(context):
    """Fetch delta-table items for every IMMS_ID in vaccine_df and cache them.

    Populates ``context.delta_cache`` as
    ``{clean_imms_id: {"rows": group_df, "delta_items": items}}`` for the
    per-operation delta validation steps.
    """
    df = context.vaccine_df
    check.is_true("IMMS_ID" in df.columns, "Column 'IMMS_ID' not found in vaccine_df")
    valid_rows = df[df["IMMS_ID"].notnull()]
    check.is_true(not valid_rows.empty, "No rows with non-null IMMS_ID found in vaccine_df")
    context.delta_cache = {}
    for imms_id, group in valid_rows.groupby("IMMS_ID"):
        clean_id = imms_id.replace("Immunization#", "")
        delta_items = fetch_immunization_int_delta_detail_by_immsID(context.aws_profile_name, clean_id, context.S3_env)
        check.is_true(delta_items, f"No delta records returned for IMMS_ID: {clean_id}")
        context.delta_cache[clean_id] = {"rows": group, "delta_items": delta_items}
def validate_imms_delta_table_for_newly_created_records_in_batch_file(context):
    """Assert each cached IMMS_ID has exactly one CREATE delta item matching its NEW batch rows."""
    for clean_id, data in context.delta_cache.items():
        delta_items = data["delta_items"]
        rows = data["rows"]
        create_items = [item for item in delta_items if item.get("Operation") == "CREATE"]
        check.is_true(
            len(create_items) == 1,
            f"Expected exactly 1 CREATE record for IMMS_ID {clean_id}, found {len(create_items)}",
        )
        create_item = create_items[0]
        new_rows = rows[rows["ACTION_FLAG"] == "NEW"]
        for _, row in new_rows.iterrows():
            batch_record = {key: normalize(value) for key, value in row.to_dict().items()}
            validate_imms_delta_record_with_batch_record(
                context,
                batch_record,
                create_item,
                Operation.created.value,
                ActionFlag.created.value,
            )
def validate_imms_delta_table_for_updated_records_in_batch_file(context):
    """Compare UPDATE delta-table items with the UPDATE rows of the batch file.

    Reads the cache built by ``preload_delta_data``. For each cached IMMS_ID the
    UPDATE delta items are consumed one per matching batch row.
    """
    for clean_id, data in context.delta_cache.items():
        rows = data["rows"]
        delta_items = data["delta_items"]
        update_items = [i for i in delta_items if i.get("Operation") == "UPDATE"]
        check.is_true(update_items, f"No UPDATE records for IMMS_ID {clean_id}")
        # Offset derived from the expected version (version 2 -> index 0, etc.).
        # NOTE(review): popping at this fixed index on every iteration assumes the
        # delta items are ordered so they line up with the batch rows — confirm.
        updated_index = context.expected_version - 2
        for _, row in rows[rows["ACTION_FLAG"] == "UPDATE"].iterrows():
            batch_record = {k: normalize(v) for k, v in row.to_dict().items()}
            item = update_items.pop(updated_index)
            validate_imms_delta_record_with_batch_record(
                context,
                batch_record,
                item,
                Operation.updated.value,
                ActionFlag.updated.value,
            )
def validate_imms_delta_table_for_deleted_records_in_batch_file(context):
    """Assert each cached IMMS_ID has a DELETE delta item matching its single DELETE batch row."""
    for clean_id, data in context.delta_cache.items():
        rows = data["rows"]
        delete_item = next((item for item in data["delta_items"] if item.get("Operation") == "DELETE"), None)
        check.is_true(delete_item, f"No DELETE record for IMMS_ID {clean_id}")
        delete_rows = rows[rows["ACTION_FLAG"] == "DELETE"]
        check.is_true(
            len(delete_rows) == 1,
            f"Expected exactly 1 DELETE row in batch file for IMMS_ID {clean_id}, found {len(delete_rows)}",
        )
        batch_record = {key: normalize(value) for key, value in delete_rows.iloc[0].to_dict().items()}
        validate_imms_delta_record_with_batch_record(
            context,
            batch_record,
            delete_item,
            Operation.deleted.value,
            ActionFlag.deleted.value,
        )
def _is_null_nhs_row(row) -> bool:
return str(row.UNIQUE_ID).startswith("NullNHS") or str(row.NHS_NUMBER).strip() in (
"",
"None",
"nan",
)
def _assert_no_mns_events_for_null_nhs_rows(context, null_nhs_rows, wait_seconds=20):
if not null_nhs_rows:
print("No NullNHS rows — skipping negative MNS check.")
return
unexpected = read_messages_for_batch(
context,
queue_type="notification",
valid_rows=null_nhs_rows,
expected_count=len(null_nhs_rows) + 1,
max_total_wait_seconds=wait_seconds,
)
assert not unexpected, f"Unexpected MNS events received for NullNHS records: {[msg.dataref for msg in unexpected]}"
def mns_event_will_be_triggered_for_batch_record(context, action, valid_rows):
    """Read MNS notification messages for the batch rows and validate each one.

    Rows without an NHS number are excluded from the positive check and are
    verified afterwards to have produced no events at all. Each message is
    matched back to its batch row via (NHS number, cleaned IMMS_ID).
    """
    null_nhs_rows, positive_rows = [], []
    for row in valid_rows:
        (null_nhs_rows if _is_null_nhs_row(row) else positive_rows).append(row)
    row_lookup = {(str(row.NHS_NUMBER), row.IMMS_ID_CLEAN): row for row in positive_rows}
    messages = read_messages_for_batch(
        context,
        queue_type="notification",
        valid_rows=positive_rows,
        expected_count=len(positive_rows),
    )
    print(f"Read {len(messages)} {action} message(s) from SQS")
    assert messages, f"Expected at least one {action} message but queue returned empty"
    for msg in messages:
        nhs = msg.subject
        imms_id = msg.dataref.split("/")[-1]
        key = (nhs, imms_id)
        assert key in row_lookup, f"Message NHS {nhs} with IMMS_ID {imms_id} does not match any row"
        row = row_lookup[key]
        # Stash per-row details that the message validator reads from the context.
        context.nhs_number = row.NHS_NUMBER
        context.gp_code = get_gp_code_by_nhs_number(row.NHS_NUMBER)
        context.patient_age = calculate_age(row.PERSON_DOB, row.DATE_AND_TIME)
        context.ImmsID = row.IMMS_ID_CLEAN
        print(f"Validating message for NHS {nhs}, IMMS ID {context.ImmsID}")
        validate_sqs_message_for_batch_record(context, msg, row)
    _assert_no_mns_events_for_null_nhs_rows(context, null_nhs_rows)
def validate_sqs_message_for_batch_record(context, message_body, row):
    """Validate one MNS SQS message against its originating batch row.

    Checks the event envelope (specversion, source, type, id, time, subject,
    dataref) and — outside the int/preprod environments — the ``filtering``
    attributes (GP code, source organisation/application, subject age,
    immunisation type, action). Relies on context fields set by the caller:
    gp_code, patient_age, url, supplier_name, vaccine_type, S3_env.
    """
    check.is_true(message_body.specversion == "1.0")
    check.is_true(message_body.source == "uk.nhs.vaccinations-data-flow-management")
    check.is_true(message_body.type == "imms-vaccination-record-change-1")
    check.is_true(is_valid_uuid(message_body.id), f"Invalid UUID: {message_body.id}")
    imms_date_time = normalize_utc_suffix(row.DATE_AND_TIME)
    check.is_true(
        message_body.time == f"{imms_date_time}Z",
        f"msn event for {row.NHS_NUMBER} Time missing or mismatch: message_body.time = {message_body.time}, imms_date_time = {imms_date_time}",
    )
    # A missing NHS number is expected to surface as an empty subject.
    expected_nhs_number = row.NHS_NUMBER
    if expected_nhs_number is None:
        expected_nhs_number = ""
    check.is_true(
        message_body.subject == expected_nhs_number,
        f"msn event for {row.NHS_NUMBER}Subject mismatch: expected {expected_nhs_number}, got {message_body.subject}",
    )
    check.is_true(
        message_body.dataref == f"{context.url}/{row.IMMS_ID_CLEAN}",
        f"msn event for {row.NHS_NUMBER} DataRef mismatch: expected {context.url}/{row.IMMS_ID_CLEAN}, got {message_body.dataref}",
    )
    # Filtering attributes are only expected outside the int/preprod environments.
    if context.S3_env not in ["int", "preprod"]:
        check.is_true(
            message_body.filtering is not None,
            f"msn event for {row.NHS_NUMBER} Filtering is missing in the message body",
        )
        check.is_true(
            normalize(message_body.filtering.generalpractitioner) == normalize(context.gp_code),
            f"msn event for {row.NHS_NUMBER} GP code mismatch: expected {context.gp_code}, got {message_body.filtering.generalpractitioner}",
        )
        expected_org = row.SITE_CODE
        check.is_true(
            normalize(message_body.filtering.sourceorganisation) == normalize(expected_org),
            f"msn event for {row.NHS_NUMBER} Source org mismatch: expected {expected_org}, got {message_body.filtering.sourceorganisation}",
        )
        check.is_true(
            message_body.filtering.sourceapplication.upper() == context.supplier_name.upper(),
            f"msn event for {row.NHS_NUMBER} Source application mismatch: expected {context.supplier_name}, got {message_body.filtering.sourceapplication}",
        )
        check.is_true(
            message_body.filtering.subjectage == context.patient_age,
            f"msn event for {row.NHS_NUMBER} Age mismatch: expected {context.patient_age}, got {message_body.filtering.subjectage}",
        )
        check.is_true(
            message_body.filtering.immunisationtype == context.vaccine_type.upper(),
            f"msn event for {row.NHS_NUMBER} Immunisation type mismatch: expected {context.vaccine_type.upper()}, got {message_body.filtering.immunisationtype}",
        )
        # Any flag other than UPDATE/DELETE (e.g. NEW) maps to a CREATE action.
        action = row.ACTION_FLAG.upper() if row.ACTION_FLAG.upper() in ["UPDATE", "DELETE"] else "CREATE"
        check.is_true(
            message_body.filtering.action == action.upper(),
            f"msn event for {row.NHS_NUMBER} Action mismatch: expected {action.upper()}, got {message_body.filtering.action}",
        )
    else:
        check.is_true(
            message_body.filtering is None,
            f"msn event for {row.NHS_NUMBER} Filtering is present in the message body when it shouldn't be for int environment",
        )
@then("MNS event will be triggered with correct data for both events where NHS is not null")
def mns_event_will_be_triggered_with_correct_data_for_both_events_in_batch_file(
    context,
):
    """Assert each NHS number in the batch produced exactly two MNS events.

    Intended for the create-then-update flow: per NHS number two events are
    expected. NullNHS rows are checked negatively afterwards. Skipped entirely
    unless ``context.mns_validation_required`` is "true".
    """
    if context.mns_validation_required.strip().lower() != "true":
        print(
            f"MNS event validation is skipped since mns_validation_required is set to {context.mns_validation_required}"
        )
        return
    df = context.vaccine_df.dropna(subset=["IMMS_ID"]).copy()
    df["IMMS_ID_CLEAN"] = df["IMMS_ID"].astype(str).str.replace("Immunization#", "", regex=False)
    all_rows = list(df.itertuples(index=False))
    if not all_rows:
        print("No rows found — skipping MNS validation.")
        return
    null_nhs_rows = [row for row in all_rows if _is_null_nhs_row(row)]
    expected_rows = [row for row in all_rows if not _is_null_nhs_row(row)]
    messages = read_messages_for_batch(
        context,
        queue_type="notification",
        valid_rows=expected_rows,
        expected_count=len(expected_rows),
    )
    print(f"Read {len(messages)} message(s) from SQS")
    assert len(messages) == len(expected_rows), f"Expected {len(expected_rows)} MNS events, but received {len(messages)}"
    # Tally events per NHS number to verify the distribution, not just the total.
    nhs_counts = Counter(msg.subject for msg in messages)
    expected_nhs_numbers = {row.NHS_NUMBER for row in expected_rows}
    assert len(nhs_counts) == len(expected_nhs_numbers), (
        f"Expected {len(expected_nhs_numbers)} NHS numbers, but got {len(nhs_counts)}: {list(nhs_counts.keys())}"
    )
    # Check each NHS number has exactly 2 events (one CREATE, one UPDATE)
    for nhs, count in nhs_counts.items():
        assert count == 2, f"NHS {nhs} expected 2 events (CREATE + UPDATE) but received {count}"
    _assert_no_mns_events_for_null_nhs_rows(context, null_nhs_rows)
def build_batch_row_from_api_object(context, action):
    """Flatten the Immunization object held on ``context.create_object`` into a batch-file row dict.

    The patient details come from the second contained resource and the site
    code from the second performer; dates are compacted to the batch format.
    """
    imms = context.create_object
    patient = imms.contained[1]
    site_code = imms.performer[1].actor.identifier.value
    # Strip the ISO separators before compacting to the batch timestamp format.
    occurrence = iso_to_compact(imms.occurrenceDateTime.replace("-", "").replace(":", ""))
    return {
        "NHS_NUMBER": patient.identifier[0].value,
        "PERSON_FORENAME": patient.name[0].given[0],
        "PERSON_SURNAME": patient.name[0].family,
        "PERSON_GENDER_CODE": patient.gender,
        "PERSON_DOB": patient.birthDate.replace("-", ""),
        "PERSON_POSTCODE": patient.address[0].postalCode,
        "ACTION_FLAG": action.upper(),
        "UNIQUE_ID": imms.identifier[0].value,
        "UNIQUE_ID_URI": imms.identifier[0].system,
        "SITE_CODE": site_code,
        "DATE_AND_TIME": occurrence,
    }