Skip to content

Commit 4661929

Browse files
author
vaninvv
committed
feat redis: add CLUSTER SHARDS option
Adding support for CLUSTER SHARDS to determine topology instead of CLUSTER SLOTS. Unlike cluster slots, it includes unavailable hosts (marked with "health": "failed") in the list of shards. This is important for determining the source of the problem: whether we cannot connect to the host, or the host is unavailable and the cluster itself is aware of it. commit_hash:2fccb9fbebd76ec9093937b77b4f89c1635ead20
1 parent 22dbc4b commit 4661929

30 files changed

Lines changed: 2734 additions & 664 deletions

.mapping.json

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4016,6 +4016,11 @@
40164016
"redis/functional_tests/cluster_auto_topology_pubsub/static_config.yaml":"taxi/uservices/userver/redis/functional_tests/cluster_auto_topology_pubsub/static_config.yaml",
40174017
"redis/functional_tests/cluster_auto_topology_pubsub/tests/conftest.py":"taxi/uservices/userver/redis/functional_tests/cluster_auto_topology_pubsub/tests/conftest.py",
40184018
"redis/functional_tests/cluster_auto_topology_pubsub/tests/test_redis_pubsub.py":"taxi/uservices/userver/redis/functional_tests/cluster_auto_topology_pubsub/tests/test_redis_pubsub.py",
4019+
"redis/functional_tests/cluster_shards_auto_topology/CMakeLists.txt":"taxi/uservices/userver/redis/functional_tests/cluster_shards_auto_topology/CMakeLists.txt",
4020+
"redis/functional_tests/cluster_shards_auto_topology/redis_service.cpp":"taxi/uservices/userver/redis/functional_tests/cluster_shards_auto_topology/redis_service.cpp",
4021+
"redis/functional_tests/cluster_shards_auto_topology/static_config.yaml":"taxi/uservices/userver/redis/functional_tests/cluster_shards_auto_topology/static_config.yaml",
4022+
"redis/functional_tests/cluster_shards_auto_topology/tests/conftest.py":"taxi/uservices/userver/redis/functional_tests/cluster_shards_auto_topology/tests/conftest.py",
4023+
"redis/functional_tests/cluster_shards_auto_topology/tests/test_redis_cluster.py":"taxi/uservices/userver/redis/functional_tests/cluster_shards_auto_topology/tests/test_redis_cluster.py",
40194024
"redis/functional_tests/database_index/CMakeLists.txt":"taxi/uservices/userver/redis/functional_tests/database_index/CMakeLists.txt",
40204025
"redis/functional_tests/database_index/redis_service.cpp":"taxi/uservices/userver/redis/functional_tests/database_index/redis_service.cpp",
40214026
"redis/functional_tests/database_index/static_config.yaml":"taxi/uservices/userver/redis/functional_tests/database_index/static_config.yaml",
@@ -4078,6 +4083,7 @@
40784083
"redis/include/userver/storages/redis/sharding_strategies.hpp":"taxi/uservices/userver/redis/include/userver/storages/redis/sharding_strategies.hpp",
40794084
"redis/include/userver/storages/redis/subscribe_client.hpp":"taxi/uservices/userver/redis/include/userver/storages/redis/subscribe_client.hpp",
40804085
"redis/include/userver/storages/redis/subscription_token.hpp":"taxi/uservices/userver/redis/include/userver/storages/redis/subscription_token.hpp",
4086+
"redis/include/userver/storages/redis/topology_update_method.hpp":"taxi/uservices/userver/redis/include/userver/storages/redis/topology_update_method.hpp",
40814087
"redis/include/userver/storages/redis/transaction.hpp":"taxi/uservices/userver/redis/include/userver/storages/redis/transaction.hpp",
40824088
"redis/include/userver/storages/redis/ttl_reply.hpp":"taxi/uservices/userver/redis/include/userver/storages/redis/ttl_reply.hpp",
40834089
"redis/include/userver/storages/redis/utest/redis_fixture.hpp":"taxi/uservices/userver/redis/include/userver/storages/redis/utest/redis_fixture.hpp",
@@ -4107,6 +4113,12 @@
41074113
"redis/src/storages/redis/impl/cluster_shard.cpp":"taxi/uservices/userver/redis/src/storages/redis/impl/cluster_shard.cpp",
41084114
"redis/src/storages/redis/impl/cluster_shard.hpp":"taxi/uservices/userver/redis/src/storages/redis/impl/cluster_shard.hpp",
41094115
"redis/src/storages/redis/impl/cluster_shard_test.cpp":"taxi/uservices/userver/redis/src/storages/redis/impl/cluster_shard_test.cpp",
4116+
"redis/src/storages/redis/impl/cluster_shards_query.cpp":"taxi/uservices/userver/redis/src/storages/redis/impl/cluster_shards_query.cpp",
4117+
"redis/src/storages/redis/impl/cluster_shards_query.hpp":"taxi/uservices/userver/redis/src/storages/redis/impl/cluster_shards_query.hpp",
4118+
"redis/src/storages/redis/impl/cluster_shards_query_test.cpp":"taxi/uservices/userver/redis/src/storages/redis/impl/cluster_shards_query_test.cpp",
4119+
"redis/src/storages/redis/impl/cluster_slots_query.cpp":"taxi/uservices/userver/redis/src/storages/redis/impl/cluster_slots_query.cpp",
4120+
"redis/src/storages/redis/impl/cluster_slots_query.hpp":"taxi/uservices/userver/redis/src/storages/redis/impl/cluster_slots_query.hpp",
4121+
"redis/src/storages/redis/impl/cluster_slots_query_test.cpp":"taxi/uservices/userver/redis/src/storages/redis/impl/cluster_slots_query_test.cpp",
41104122
"redis/src/storages/redis/impl/cluster_subscription_storage.cpp":"taxi/uservices/userver/redis/src/storages/redis/impl/cluster_subscription_storage.cpp",
41114123
"redis/src/storages/redis/impl/cluster_subscription_storage.hpp":"taxi/uservices/userver/redis/src/storages/redis/impl/cluster_subscription_storage.hpp",
41124124
"redis/src/storages/redis/impl/cluster_subscription_storage_test.cpp":"taxi/uservices/userver/redis/src/storages/redis/impl/cluster_subscription_storage_test.cpp",
@@ -4212,6 +4224,7 @@
42124224
"redis/src/storages/redis/test/mock_client_google_test.cpp":"taxi/uservices/userver/redis/src/storages/redis/test/mock_client_google_test.cpp",
42134225
"redis/src/storages/redis/test/mock_publish_waiter_test.cpp":"taxi/uservices/userver/redis/src/storages/redis/test/mock_publish_waiter_test.cpp",
42144226
"redis/src/storages/redis/test/subscribe_client_mock_test.cpp":"taxi/uservices/userver/redis/src/storages/redis/test/subscribe_client_mock_test.cpp",
4227+
"redis/src/storages/redis/topology_update_method.cpp":"taxi/uservices/userver/redis/src/storages/redis/topology_update_method.cpp",
42154228
"redis/src/storages/redis/transaction_impl.cpp":"taxi/uservices/userver/redis/src/storages/redis/transaction_impl.cpp",
42164229
"redis/src/storages/redis/transaction_impl.hpp":"taxi/uservices/userver/redis/src/storages/redis/transaction_impl.hpp",
42174230
"redis/src/storages/redis/transaction_redistest.cpp":"taxi/uservices/userver/redis/src/storages/redis/transaction_redistest.cpp",
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
project(userver-redis-tests-cluster-auto-topology CXX)
2+
3+
add_executable(${PROJECT_NAME} "redis_service.cpp")
4+
target_link_libraries(${PROJECT_NAME} userver::redis)
5+
6+
userver_chaos_testsuite_add_redis()
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
#include <userver/testsuite/testsuite_support.hpp>
2+
#include <userver/utest/using_namespace_userver.hpp> // IWYU pragma: keep
3+
4+
#include <iostream>
5+
#include <string>
6+
#include <string_view>
7+
8+
#include <fmt/format.h>
9+
10+
#include <userver/clients/dns/component.hpp>
11+
#include <userver/clients/http/component_list.hpp>
12+
#include <userver/components/component.hpp>
13+
#include <userver/components/minimal_server_component_list.hpp>
14+
#include <userver/dynamic_config/updater/component_list.hpp>
15+
#include <userver/server/handlers/http_handler_base.hpp>
16+
#include <userver/server/handlers/tests_control.hpp>
17+
#include <userver/storages/redis/client.hpp>
18+
#include <userver/storages/redis/component.hpp>
19+
#include <userver/storages/secdist/component.hpp>
20+
#include <userver/storages/secdist/provider_component.hpp>
21+
#include <userver/utils/daemon_run.hpp>
22+
#include <userver/yaml_config/merge_schemas.hpp>
23+
#include "userver/storages/redis/exception.hpp"
24+
25+
namespace chaos {
26+
27+
class KeyValue final : public server::handlers::HttpHandlerBase {
28+
public:
29+
static constexpr std::string_view kName = "handler-redis";
30+
31+
KeyValue(const components::ComponentConfig& config, const components::ComponentContext& context);
32+
33+
std::string HandleRequestThrow(const server::http::HttpRequest& request, server::request::RequestContext&)
34+
const override;
35+
36+
static yaml_config::Schema GetStaticConfigSchema();
37+
38+
private:
39+
std::string GetValue(std::string_view key, const server::http::HttpRequest& request) const;
40+
std::string PostValue(std::string_view key, const server::http::HttpRequest& request) const;
41+
std::string DeleteValue(std::string_view key) const;
42+
43+
storages::redis::ClientPtr redis_client_;
44+
storages::redis::CommandControl redis_cc_;
45+
};
46+
47+
KeyValue::KeyValue(const components::ComponentConfig& config, const components::ComponentContext& context)
48+
: server::handlers::HttpHandlerBase(config, context),
49+
redis_client_{
50+
context.FindComponent<components::Redis>("key-value-database").GetClient(config["db"].As<std::string>())
51+
}
52+
{}
53+
54+
std::string KeyValue::HandleRequestThrow(
55+
const server::http::HttpRequest& request,
56+
server::request::RequestContext& /*context*/
57+
) const {
58+
const auto& key = request.GetArg("key");
59+
if (key.empty()) {
60+
throw server::handlers::ClientError(server::handlers::ExternalBody{"No 'key' query argument"});
61+
}
62+
63+
switch (request.GetMethod()) {
64+
case server::http::HttpMethod::kGet:
65+
return GetValue(key, request);
66+
case server::http::HttpMethod::kPost:
67+
return PostValue(key, request);
68+
case server::http::HttpMethod::kDelete:
69+
return DeleteValue(key);
70+
default:
71+
throw server::handlers::ClientError(server::handlers::ExternalBody{
72+
fmt::format("Unsupported method {}", request.GetMethod())
73+
});
74+
}
75+
}
76+
77+
yaml_config::Schema KeyValue::GetStaticConfigSchema() {
78+
return yaml_config::MergeSchemas<HandlerBase>(R"(
79+
type: object
80+
description: KeyValue handler schema
81+
additionalProperties: false
82+
properties:
83+
db:
84+
type: string
85+
description: redis database name
86+
)");
87+
}
88+
89+
std::string KeyValue::GetValue(std::string_view key, const server::http::HttpRequest& request) const {
90+
try {
91+
const auto result = redis_client_->Get(std::string{key}, redis_cc_).Get();
92+
if (!result) {
93+
request.SetResponseStatus(server::http::HttpStatus::kNotFound);
94+
return {};
95+
}
96+
return *result;
97+
} catch (const storages::redis::RequestFailedException& e) {
98+
LOG_WARNING() << "EXCEPTION GET: " << e.what() << ", " << e.GetStatusString() << "\n";
99+
throw;
100+
}
101+
}
102+
103+
std::string KeyValue::PostValue(std::string_view key, const server::http::HttpRequest& request) const {
104+
try {
105+
const auto& value = request.GetArg("value");
106+
const auto result = redis_client_->SetIfNotExist(std::string{key}, value, redis_cc_).Get();
107+
if (!result) {
108+
request.SetResponseStatus(server::http::HttpStatus::kConflict);
109+
return {};
110+
}
111+
112+
request.SetResponseStatus(server::http::HttpStatus::kCreated);
113+
return std::string{value};
114+
} catch (const storages::redis::RequestFailedException& e) {
115+
LOG_WARNING() << "EXCEPTION POST: " << e.what() << ", " << e.GetStatusString() << "\n";
116+
throw;
117+
}
118+
}
119+
120+
std::string KeyValue::DeleteValue(std::string_view key) const {
121+
const auto result = redis_client_->Del(std::string{key}, redis_cc_).Get();
122+
return std::to_string(result);
123+
}
124+
125+
} // namespace chaos
126+
127+
int main(int argc, char* argv[]) {
128+
const auto component_list =
129+
components::MinimalServerComponentList()
130+
.AppendComponentList(USERVER_NAMESPACE::dynamic_config::updater::ComponentList())
131+
.Append<chaos::KeyValue>("handler-cluster")
132+
.AppendComponentList(clients::http::ComponentList())
133+
.Append<components::Secdist>()
134+
.Append<components::DefaultSecdistProvider>()
135+
.Append<components::Redis>("key-value-database")
136+
.Append<components::TestsuiteSupport>()
137+
.Append<server::handlers::TestsControl>()
138+
.Append<clients::dns::Component>();
139+
return utils::DaemonMain(argc, argv, component_list);
140+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# yaml
2+
components_manager:
3+
components:
4+
handler-cluster:
5+
db: redis-cluster
6+
path: /redis-cluster
7+
task_processor: main-task-processor
8+
method: GET,DELETE,POST
9+
10+
key-value-database:
11+
groups:
12+
- config_name: redis-cluster
13+
db: redis-cluster
14+
sharding_strategy: RedisCluster
15+
topology_update_method: cluster_shards
16+
- config_name: redis-cluster2
17+
db: redis-cluster2
18+
sharding_strategy: RedisCluster
19+
topology_update_method: cluster_shards
20+
subscribe_groups: []
21+
thread_pools:
22+
redis_thread_pool_size: 2
23+
sentinel_thread_pool_size: 1
24+
25+
testsuite-support:
26+
27+
http-client:
28+
http-client-core:
29+
fs-task-processor: main-task-processor
30+
31+
tests-control:
32+
method: POST
33+
path: /tests/{action}
34+
skip-unregistered-testpoints: true
35+
task_processor: main-task-processor
36+
testpoint-timeout: 10s
37+
testpoint-url: $mockserver/testpoint
38+
throttling_enabled: false
39+
40+
secdist: {}
41+
default-secdist-provider:
42+
config: /etc/redis_service/secure_data.json
43+
missing-ok: true
44+
environment-secrets-key: SECDIST_CONFIG
45+
46+
server:
47+
listener:
48+
port: 8187
49+
task_processor: main-task-processor
50+
logging:
51+
fs-task-processor: fs-task-processor
52+
loggers:
53+
default:
54+
file_path: '@stderr'
55+
level: debug
56+
overflow_behavior: discard
57+
58+
dynamic-config:
59+
updates-enabled: true
60+
dynamic-config-client:
61+
config-url: $config-server-url
62+
http-retries: 5
63+
http-timeout: 20s
64+
service-name: testsuite-support
65+
dynamic-config-client-updater:
66+
config-settings: false
67+
first-update-fail-ok: true
68+
full-update-interval: 1m
69+
update-interval: 5s
70+
dns-client:
71+
fs-task-processor: fs-task-processor
72+
73+
task_processors:
74+
main-task-processor:
75+
worker_threads: 4
76+
fs-task-processor:
77+
worker_threads: 4
78+
79+
default_task_processor: main-task-processor
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import json
2+
3+
import pytest
4+
5+
pytest_plugins = [
6+
'pytest_userver.plugins.redis',
7+
'pytest_userver.plugins.dynamic_config',
8+
'taxi.uservices.userver.redis.functional_tests.pytest_redis_cluster_topology_plugin.pytest_plugin',
9+
]
10+
11+
12+
@pytest.fixture(scope='session')
13+
def service_env(redis_cluster_ports, redis_cluster_topology_session):
14+
cluster_hosts = []
15+
cluster_shards = []
16+
for index, port in enumerate(redis_cluster_ports):
17+
cluster_hosts.append({'host': '127.0.0.1', 'port': port})
18+
for index in range(3):
19+
cluster_shards.append({'name': f'shard{index}'})
20+
21+
secdist_config = {
22+
'redis_settings': {
23+
'redis-cluster': {
24+
'password': '',
25+
'sentinels': cluster_hosts,
26+
'shards': cluster_shards,
27+
},
28+
'redis-cluster2': {
29+
'password': '',
30+
'sentinels': cluster_hosts,
31+
'shards': cluster_shards,
32+
},
33+
},
34+
}
35+
36+
return {'SECDIST_CONFIG': json.dumps(secdist_config)}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import asyncio
2+
3+
import pytest
4+
import pytest_userver.utils.sync as sync
5+
6+
KEYS_SEQ_LEN = 20 # enough sequential keys to test all slots
7+
REDIS_PORT = 6379
8+
FAILOVER_DEADLINE_SEC = 30 # maximum time allowed to finish failover
9+
10+
11+
async def test_happy_path(service_client, redis_cluster_topology):
12+
post_reqs = [
13+
service_client.post(
14+
'/redis-cluster',
15+
params={'key': f'key{i}', 'value': 'abc'},
16+
)
17+
for i in range(KEYS_SEQ_LEN)
18+
]
19+
assert all(res.status == 201 for res in await asyncio.gather(*post_reqs))
20+
21+
get_reqs = [service_client.get('/redis-cluster', params={'key': f'key{i}'}) for i in range(KEYS_SEQ_LEN)]
22+
assert all(res.status == 200 and res.text == 'abc' for res in await asyncio.gather(*get_reqs))
23+
24+
25+
async def _check_write_all_slots(service_client, key_prefix, value):
26+
post_reqs = [
27+
service_client.post(
28+
'/redis-cluster',
29+
params={'key': f'{key_prefix}{i}', 'value': value},
30+
)
31+
for i in range(KEYS_SEQ_LEN)
32+
]
33+
return all(res.status != 500 for res in await asyncio.gather(*post_reqs))
34+
35+
36+
async def _assert_read_all_slots(service_client, key_prefix, value):
37+
get_reqs = [
38+
service_client.get(
39+
'/redis-cluster',
40+
params={'key': f'{key_prefix}{i}'},
41+
)
42+
for i in range(KEYS_SEQ_LEN)
43+
]
44+
assert all(res.status == 200 and res.text == value for res in await asyncio.gather(*get_reqs))
45+
46+
47+
async def test_hard_failover(service_client, redis_cluster_topology):
48+
# Write enough different keys to have something in every slot
49+
assert await _check_write_all_slots(service_client, 'hf_key1', 'abc')
50+
51+
# Start the failover
52+
redis_cluster_topology.get_masters()[0].stop()
53+
54+
# wait until service detect that shard 0 is broken
55+
# Failover starts in ~10 seconds
56+
async def is_ready():
57+
return await _check_write_all_slots(
58+
service_client,
59+
'hf_key2',
60+
'cde',
61+
)
62+
63+
await sync.wait(is_ready)
64+
65+
# Now that one of the replicas has become the master,
66+
# check reading from the remaining replica
67+
await _assert_read_all_slots(service_client, 'hf_key1', 'abc')
68+
await _assert_read_all_slots(service_client, 'hf_key2', 'cde')
69+
70+
71+
@pytest.mark.skip(reason='Flaky TAXICOMMON-11677')
72+
async def test_add_shard(service_client, redis_cluster_topology):
73+
# Write enough different keys to have something in every slot
74+
assert await _check_write_all_slots(service_client, 'hf_key1', 'abc')
75+
76+
await redis_cluster_topology.add_shard()
77+
78+
# Failover starts in ~10 seconds
79+
async def is_ready():
80+
return await _check_write_all_slots(
81+
service_client,
82+
'hf_key2',
83+
'cde',
84+
)
85+
86+
await sync.wait(is_ready)
87+
88+
# Now that one of the replicas has become the master,
89+
# check reading from the remaining replica
90+
await _assert_read_all_slots(service_client, 'hf_key1', 'abc')
91+
await _assert_read_all_slots(service_client, 'hf_key2', 'cde')

0 commit comments

Comments
 (0)