Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ def get_best_cohort(cohort_results: dict[str, CohortResult]) -> tuple[Status, li

@staticmethod
def get_exclusion_rules(
cohort: IterationCohort, rules_filter: Iterable[rules.IterationRule]
cohort: IterationCohort, filter_rules: Iterable[rules.IterationRule]
) -> Iterator[rules.IterationRule]:
return (
ir
for ir in rules_filter
for ir in filter_rules
if ir.cohort_label is None
or cohort.cohort_label == ir.cohort_label
or (isinstance(ir.cohort_label, (list, set, tuple)) and cohort.cohort_label in ir.cohort_label)
Expand All @@ -84,59 +84,30 @@ def get_exclusion_rules(
def get_rules_by_type(
active_iteration: Iteration,
) -> tuple[tuple[rules.IterationRule, ...], tuple[rules.IterationRule, ...]]:
rules_by_type = {
rule_type: tuple(rule for rule in active_iteration.iteration_rules if attrgetter("type")(rule) == rule_type)
for rule_type in (rules.RuleType.filter, rules.RuleType.suppression, rules.RuleType.redirect)
}
rules_filter = rules_by_type[rules.RuleType.filter]
rules_suppression = rules_by_type[rules.RuleType.suppression]
return rules_filter, rules_suppression
filter_rules, suppression_rules = (
tuple(rule for rule in active_iteration.iteration_rules if attrgetter("type")(rule) == rule_type)
for rule_type in (rules.RuleType.filter, rules.RuleType.suppression)
)
return filter_rules, suppression_rules

def evaluate_eligibility(self) -> eligibility.EligibilityStatus:
"""Iterates over campaign groups, evaluates eligibility, and returns a consolidated status."""
results: dict[ConditionName, IterationResult] = {}
condition_results: dict[ConditionName, IterationResult] = {}

for condition_name, campaign_group in self.campaigns_grouped_by_condition_name:
iteration_results: dict[str, IterationResult] = {}

for active_iteration in [cc.current_iteration for cc in campaign_group]:
cohort_results: dict[str, CohortResult] = {}

rules_filter, rules_suppression = self.get_rules_by_type(active_iteration)
filter_rules, suppression_rules = self.get_rules_by_type(active_iteration)
for cohort in sorted(active_iteration.iteration_cohorts, key=attrgetter("priority")):
# Check Base Eligibility
# Base Eligibility - check
if cohort.cohort_label in self.person_cohorts or cohort.cohort_label == magic_cohort:
is_eligible: bool = True
is_eligible = self.evaluate_filter_rules(
cohort,
cohort_results,
rules_filter,
is_eligible=is_eligible,
)

if is_eligible:
is_actionable: bool = True
suppression_reasons, is_actionable = self.evaluate_suppression_rules(
cohort,
rules_suppression,
is_actionable=is_actionable,
)
if cohort.cohort_label is not None:
key = cohort.cohort_label
if is_actionable:
cohort_results[key] = CohortResult(
cohort.cohort_group if cohort.cohort_group else key,
Status.actionable,
[],
str(cohort.positive_description),
)
else:
cohort_results[key] = CohortResult(
cohort.cohort_group if cohort.cohort_group else key,
Status.not_actionable,
suppression_reasons,
str(cohort.positive_description),
)
# Eligibility - check
if self.is_eligible_by_filter_rules(cohort, cohort_results, filter_rules):
# Actionability - evaluation
self.evaluate_suppression_rules(cohort, cohort_results, suppression_rules)

# Not base eligible
elif cohort.cohort_label is not None:
Expand All @@ -156,7 +127,7 @@ def evaluate_eligibility(self) -> eligibility.EligibilityStatus:
best_candidate = max(iteration_results.values(), key=lambda r: r.status.value)
else:
best_candidate = IterationResult(eligibility.Status.not_eligible, [])
results[condition_name] = best_candidate
condition_results[condition_name] = best_candidate

# Consolidate all the results and return
final_result = [
Expand All @@ -165,23 +136,24 @@ def evaluate_eligibility(self) -> eligibility.EligibilityStatus:
status=active_iteration_result.status,
cohort_results=active_iteration_result.cohort_results,
)
for condition_name, active_iteration_result in results.items()
for condition_name, active_iteration_result in condition_results.items()
]
return eligibility.EligibilityStatus(conditions=final_result)

def evaluate_filter_rules(
def is_eligible_by_filter_rules(
self,
cohort: IterationCohort,
cohort_results: dict[str, CohortResult],
rules_filter: Iterable[rules.IterationRule],
*,
is_eligible: bool,
filter_rules: Iterable[rules.IterationRule],
) -> bool:
is_eligible = True
priority_getter = attrgetter("priority")
sorted_rules_by_priority = sorted(self.get_exclusion_rules(cohort, rules_filter), key=priority_getter)
sorted_rules_by_priority = sorted(self.get_exclusion_rules(cohort, filter_rules), key=priority_getter)

for _, rule_group in groupby(sorted_rules_by_priority, key=priority_getter):
status, group_actionable, group_exclusions, rule_stop = self.evaluate_rules_priority_group(rule_group)
status, group_inclusion_reasons, group_exclusion_reasons, rule_stop = self.evaluate_rules_priority_group(
rule_group
)
if status.is_exclusion:
if cohort.cohort_label is not None:
cohort_results[str(cohort.cohort_label)] = CohortResult(
Expand All @@ -197,27 +169,47 @@ def evaluate_filter_rules(
def evaluate_suppression_rules(
self,
cohort: IterationCohort,
rules_suppression: Iterable[rules.IterationRule],
*,
is_actionable: bool,
) -> tuple[list, bool]:
cohort_results: dict[str, CohortResult],
suppression_rules: Iterable[rules.IterationRule],
) -> None:
is_actionable: bool = True
priority_getter = attrgetter("priority")
suppression_reasons = []
sorted_rules_by_priority = sorted(self.get_exclusion_rules(cohort, rules_suppression), key=priority_getter)

sorted_rules_by_priority = sorted(self.get_exclusion_rules(cohort, suppression_rules), key=priority_getter)

for _, rule_group in groupby(sorted_rules_by_priority, key=priority_getter):
status, group_actionable, group_exclusions, rule_stop = self.evaluate_rules_priority_group(rule_group)
status, group_inclusion_reasons, group_exclusion_reasons, rule_stop = self.evaluate_rules_priority_group(
rule_group
)
if status.is_exclusion:
is_actionable = False
suppression_reasons.extend(group_exclusions)
suppression_reasons.extend(group_exclusion_reasons)
if rule_stop:
break
return suppression_reasons, is_actionable

if cohort.cohort_label is not None:
key = cohort.cohort_label
if is_actionable:
cohort_results[key] = CohortResult(
cohort.cohort_group if cohort.cohort_group else key,
Status.actionable,
[],
str(cohort.positive_description),
)
else:
cohort_results[key] = CohortResult(
cohort.cohort_group if cohort.cohort_group else key,
Status.not_actionable,
suppression_reasons,
str(cohort.positive_description),
)

def evaluate_rules_priority_group(
self, rules_group: Iterator[rules.IterationRule]
) -> tuple[eligibility.Status, list[eligibility.Reason], list[eligibility.Reason], bool]:
is_rule_stop = False
actionable_reasons, exclusion_reasons = [], []
inclusion_reasons, exclusion_reasons = [], []
best_status = eligibility.Status.not_eligible

for rule in rules_group:
Expand All @@ -229,6 +221,6 @@ def evaluate_rules_priority_group(
exclusion_reasons.append(reason)
else:
best_status = eligibility.Status.actionable
actionable_reasons.append(reason)
inclusion_reasons.append(reason)

return best_status, actionable_reasons, exclusion_reasons, is_rule_stop
return best_status, inclusion_reasons, exclusion_reasons, is_rule_stop
10 changes: 8 additions & 2 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ def persisted_person(person_table: Any, faker: Faker) -> Generator[eligibility.N
nhs_number = eligibility.NHSNumber(faker.nhs_number())
date_of_birth = eligibility.DateOfBirth(faker.date_of_birth(minimum_age=18, maximum_age=65))

for row in (rows := person_rows_builder(nhs_number, date_of_birth=date_of_birth, cohorts=["cohort1"])):
for row in (
rows := person_rows_builder(nhs_number, date_of_birth=date_of_birth, postcode="hp1", cohorts=["cohort1"])
):
person_table.put_item(Item=row)

yield nhs_number
Expand All @@ -232,7 +234,11 @@ def persisted_77yo_person(person_table: Any, faker: Faker) -> Generator[eligibil
nhs_number = eligibility.NHSNumber(faker.nhs_number())
date_of_birth = eligibility.DateOfBirth(faker.date_of_birth(minimum_age=77, maximum_age=77))

for row in (rows := person_rows_builder(nhs_number, date_of_birth=date_of_birth, cohorts=["cohort1", "cohort2"])):
for row in (
rows := person_rows_builder(
nhs_number, date_of_birth=date_of_birth, postcode="hp1", cohorts=["cohort1", "cohort2"]
)
):
person_table.put_item(Item=row)

yield nhs_number
Expand Down
16 changes: 8 additions & 8 deletions tests/unit/services/calculators/test_eligibility_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,15 +811,15 @@ def test_status_on_cohort_attribute_level(


@pytest.mark.parametrize(
("person_cohorts", "cohort_label", "expected_status", "test_comment"),
("person_cohorts", "expected_status", "test_comment"),
[
(["cohort1", "cohort2"], "cohort1", Status.actionable, "cohort1 is not actionable, cohort 2 is actionable"),
(["cohort2", "cohort3"], "cohort1", Status.actionable, "doesn't match the cohort label"),
(["cohort1"], "cohort1", Status.not_actionable, "cohort1 is not actionable"),
(["cohort1", "cohort2"], Status.actionable, "cohort1 is not actionable, cohort 2 is actionable"),
(["cohort3", "cohort2"], Status.actionable, "cohort3 is not eligible, cohort 2 is actionable"),
(["cohort1"], Status.not_actionable, "cohort1 is not actionable"),
],
)
def test_status_if_iteration_rules_contains_cohort_label_field(
person_cohorts, cohort_label: str, expected_status: Status, test_comment: str, faker: Faker
person_cohorts, expected_status: Status, test_comment: str, faker: Faker
):
# Given
nhs_number = NHSNumber(faker.nhs_number())
Expand All @@ -835,7 +835,7 @@ def test_status_if_iteration_rules_contains_cohort_label_field(
rule_builder.IterationCohortFactory.build(cohort_label="cohort1"),
rule_builder.IterationCohortFactory.build(cohort_label="cohort2"),
],
iteration_rules=[rule_builder.PersonAgeSuppressionRuleFactory.build(cohort_label=cohort_label)],
iteration_rules=[rule_builder.PersonAgeSuppressionRuleFactory.build(cohort_label="cohort1")],
)
],
)
Expand Down Expand Up @@ -883,8 +883,8 @@ def test_rules_stop_behavior(
) -> None:
# Given
nhs_number = NHSNumber(faker.nhs_number())
date_obj = datetime.datetime.strptime("19980309", "%Y%m%d").replace(tzinfo=datetime.UTC).date()
person_rows = person_rows_builder(nhs_number, date_of_birth=(DateOfBirth(date_obj)), cohorts=["cohort1"])
date_of_birth = DateOfBirth(faker.date_of_birth(minimum_age=66, maximum_age=74))
person_rows = person_rows_builder(nhs_number, date_of_birth=date_of_birth, cohorts=["cohort1"])

# Build campaign configuration
campaign_config = rule_builder.CampaignConfigFactory.build(
Expand Down
15 changes: 15 additions & 0 deletions tests/unit/views/test_eligibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ def test_build_eligibility_cohorts_results_consider_only_cohorts_with_best_statu
condition: Condition = ConditionFactory.build(
status=Status.not_actionable,
cohort_results=[
CohortResultFactory.build(
cohort_code="cohort_group1",
status=Status.not_actionable,
),
CohortResultFactory.build(
cohort_code="cohort_group1",
status=Status.not_actionable,
Expand Down Expand Up @@ -165,6 +169,17 @@ def test_build_suitability_results_with_deduplication():
)
],
),
CohortResultFactory.build(
cohort_code="cohort_group3",
status=Status.not_eligible,
reasons=[
Reason(
rule_type=RuleType.filter,
rule_name=RuleName("Exclude is present in sw1"),
rule_result=RuleResult("memberof sw1"),
)
],
),
],
)

Expand Down