Skip to content

Commit 0a4e0af

Browse files
vladvildanovCopilotpetyaslavova
committed
Refactored health check logic for MultiDBClient (#3994)
* Refactored sync health check * Refactored async multidb health check * Removed double exception handling * Update docs/geographic_failover.rst Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update tests/test_asyncio/test_multidb/test_healthcheck.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update tests/test_multidb/test_healthcheck.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Refactored health checks to be completion-based * Codestlye fixes * Fixed argument name * Fixed redundant timeout * Refactored health check to be async-only * Refactored event loop execution in sync client * Fixed proper handling of failures and updated docs * Fixed correct initial health check execution * Fixed event loop issue * Fixed flacky test * Fixed coroutine execution * Fixed flacky test * Fixed correct connection cleanup * Fixed 3.12 compatibility issue * Fixed race condition * Updated exception handling * Fix avoiding indefinite hangs * Refactor health checks to use clients instead of pools * Removed unused if statement * Updated to broaded exception * Fixed passing not supported arguments * Updated docs, added new properties to health check cluster client --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: petyaslavova <petya.slavova@redis.com>
1 parent 15492c9 commit 0a4e0af

21 files changed

Lines changed: 2285 additions & 1417 deletions

docs/geographic_failover.rst

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ MultiDbConfig
193193
health_check_interval: float = DEFAULT_HEALTH_CHECK_INTERVAL # seconds
194194
health_check_probes: int = DEFAULT_HEALTH_CHECK_PROBES
195195
health_check_delay: float = DEFAULT_HEALTH_CHECK_DELAY # seconds
196+
health_check_timeout: float = DEFAULT_HEALTH_CHECK_TIMEOUT # seconds
196197
health_check_policy: HealthCheckPolicies = DEFAULT_HEALTH_CHECK_POLICY,
197198
198199
# Failure detector
@@ -336,26 +337,22 @@ reverse proxy behind an actual REST API endpoint.
336337
337338
**Custom Health Checks**
338339
~~~~~~~~~~~~~~~~~~~~~
339-
You can add custom health checks for specific requirements:
340+
You can add custom health checks for specific requirements. Please notice that all health checks are executed within
341+
asyncio event loop, so please ensure that `check_health` method is async:
340342

341343
.. code-block:: python
342344
343-
from redis.multidb.healthcheck import AbstractHealthCheck
344-
from redis.retry import Retry
345-
from redis.utils import dummy_fail
346-
class PingHealthCheck(AbstractHealthCheck):
347-
def __init__(self, retry: Retry):
348-
super().__init__(retry=retry)
349-
def check_health(self, database) -> bool:
350-
return self._retry.call_with_retry(
351-
lambda: self._returns_pong(database),
352-
lambda _: dummy_fail()
353-
)
354-
def _returns_pong(self, database) -> bool:
355-
expected_message = ["PONG", b"PONG"]
356-
actual_message = database.client.execute_command("PING")
357-
return actual_message in expected_message
345+
from redis.asyncio.multidb.healthcheck import AbstractHealthCheck, AsyncRedisClientT
346+
347+
class EchoHealthCheck(AbstractHealthCheck):
348+
"""
349+
Health check based on ECHO command.
350+
"""
358351
352+
async def check_health(self, database, hc_client: AsyncRedisClientT) -> bool:
353+
await connection.send_command("ECHO", "healthcheck")
354+
response = await connection.read_response()
355+
return response in (b"healthcheck", "healthcheck")
359356
360357
Failure Detection (Reactive Monitoring)
361358
-----------------

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ markers = [
8686
"replica: replica tests",
8787
"experimental: run only experimental tests",
8888
"cp_integration: credential provider integration tests",
89+
"no_mock_connections: skip the autouse mock_health_check_connections fixture",
8990
]
9091
asyncio_default_fixture_loop_scope = "function"
9192
asyncio_mode = "auto"

redis/asyncio/multidb/client.py

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,9 @@ def __init__(self, config: MultiDbConfig):
4545
if not config.health_checks
4646
else config.health_checks
4747
)
48-
4948
self._health_check_interval = config.health_check_interval
50-
self._health_check_policy: HealthCheckPolicy = config.health_check_policy.value(
51-
config.health_check_probes, config.health_check_delay
49+
self._health_check_policy: HealthCheckPolicy = (
50+
config.health_check_policy.value()
5251
)
5352
self._failure_detectors = (
5453
config.default_failure_detectors()
@@ -89,14 +88,25 @@ async def __aenter__(self: "MultiDBClient") -> "MultiDBClient":
8988
await self.initialize()
9089
return self
9190

92-
async def __aexit__(self, exc_type, exc_value, traceback):
91+
async def aclose(self):
92+
# Cancel background tasks
9393
if self._recurring_hc_task:
9494
self._recurring_hc_task.cancel()
9595
if self._half_open_state_task:
9696
self._half_open_state_task.cancel()
9797
for hc_task in self._hc_tasks:
9898
hc_task.cancel()
9999

100+
# Close health check connection pools
101+
await self._health_check_policy.close()
102+
103+
# Close database client
104+
if self.command_executor.active_database:
105+
await self.command_executor.active_database.client.aclose()
106+
107+
async def __aexit__(self, exc_type, exc_value, traceback):
108+
await self.aclose()
109+
100110
async def initialize(self):
101111
"""
102112
Perform initialization of databases to define their initial state.
@@ -326,23 +336,15 @@ async def _check_databases_health(self) -> dict[Database, bool]:
326336
Runs health checks as a recurring task.
327337
Runs health checks against all databases.
328338
"""
329-
try:
330-
task_to_db: dict[asyncio.Task, Database] = {}
339+
task_to_db: dict[asyncio.Task, Database] = {}
331340

332-
self._hc_tasks = []
333-
for database, _ in self._databases:
334-
task = asyncio.create_task(self._check_db_health(database))
335-
task_to_db[task] = database
336-
self._hc_tasks.append(task)
341+
self._hc_tasks = []
342+
for database, _ in self._databases:
343+
task = asyncio.create_task(self._check_db_health(database))
344+
task_to_db[task] = database
345+
self._hc_tasks.append(task)
337346

338-
results = await asyncio.wait_for(
339-
asyncio.gather(*self._hc_tasks, return_exceptions=True),
340-
timeout=self._health_check_interval,
341-
)
342-
except asyncio.TimeoutError:
343-
raise asyncio.TimeoutError(
344-
"Health check execution exceeds health_check_interval"
345-
)
347+
results = await asyncio.gather(*self._hc_tasks, return_exceptions=True)
346348

347349
# Map end results to databases
348350
db_results = {
@@ -360,8 +362,6 @@ async def _check_databases_health(self) -> dict[Database, bool]:
360362
)
361363

362364
db_results[unhealthy_db] = False
363-
elif isinstance(result, Exception):
364-
db_results[database] = False
365365

366366
return db_results
367367

@@ -427,10 +427,6 @@ def _on_circuit_state_change_callback(
427427
if old_state != CBState.CLOSED and new_state == CBState.CLOSED:
428428
logger.info(f"Database {circuit.database} is reachable again.")
429429

430-
async def aclose(self):
431-
if self.command_executor.active_database:
432-
await self.command_executor.active_database.client.aclose()
433-
434430

435431
def _half_open_circuit(circuit: CircuitBreaker):
436432
circuit.state = CBState.HALF_OPEN

redis/asyncio/multidb/config.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
DEFAULT_HEALTH_CHECK_INTERVAL,
2222
DEFAULT_HEALTH_CHECK_POLICY,
2323
DEFAULT_HEALTH_CHECK_PROBES,
24+
DEFAULT_HEALTH_CHECK_TIMEOUT,
2425
HealthCheck,
2526
HealthCheckPolicies,
2627
PingHealthCheck,
@@ -110,6 +111,8 @@ class MultiDbConfig:
110111
health_check_interval: Time interval for executing health checks.
111112
health_check_probes: Number of attempts to evaluate the health of a database.
112113
health_check_delay: Delay between health check attempts.
114+
health_check_timeout: Timeout for the full health check operation (including all probes).
115+
health_check_policy: Policy for determining database health based on health checks.
113116
failover_strategy: Optional strategy for handling database failover scenarios.
114117
failover_attempts: Number of retries allowed for failover operations.
115118
failover_delay: Delay between failover attempts.
@@ -149,6 +152,7 @@ class MultiDbConfig:
149152
health_check_interval: float = DEFAULT_HEALTH_CHECK_INTERVAL
150153
health_check_probes: int = DEFAULT_HEALTH_CHECK_PROBES
151154
health_check_delay: float = DEFAULT_HEALTH_CHECK_DELAY
155+
health_check_timeout: float = DEFAULT_HEALTH_CHECK_TIMEOUT
152156
health_check_policy: HealthCheckPolicies = DEFAULT_HEALTH_CHECK_POLICY
153157
failover_strategy: Optional[AsyncFailoverStrategy] = None
154158
failover_attempts: int = DEFAULT_FAILOVER_ATTEMPTS
@@ -213,7 +217,11 @@ def default_failure_detectors(self) -> List[AsyncFailureDetector]:
213217

214218
def default_health_checks(self) -> List[HealthCheck]:
215219
return [
216-
PingHealthCheck(),
220+
PingHealthCheck(
221+
health_check_probes=self.health_check_probes,
222+
health_check_delay=self.health_check_delay,
223+
health_check_timeout=self.health_check_timeout,
224+
),
217225
]
218226

219227
def default_failover_strategy(self) -> AsyncFailoverStrategy:

0 commit comments

Comments
 (0)