Skip to content

Commit 93b53d8

Browse files
author
ivan-skryabin
committed
feat grpc: client stream call DP handling
commit_hash:cd3d22cb016629780658d63c1f50da10860d8d9e
1 parent e73a3eb commit 93b53d8

7 files changed

Lines changed: 328 additions & 68 deletions

File tree

.mapping.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2494,6 +2494,7 @@
24942494
"grpc/include/userver/ugrpc/client/impl/codegen_definitions.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/codegen_definitions.hpp",
24952495
"grpc/include/userver/ugrpc/client/impl/compat/channel_arguments_builder.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/compat/channel_arguments_builder.hpp",
24962496
"grpc/include/userver/ugrpc/client/impl/completion_queue_pool.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/completion_queue_pool.hpp",
2497+
"grpc/include/userver/ugrpc/client/impl/deadline_propagation_detect.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/deadline_propagation_detect.hpp",
24972498
"grpc/include/userver/ugrpc/client/impl/fwd.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/fwd.hpp",
24982499
"grpc/include/userver/ugrpc/client/impl/graceful_stream_finish.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/graceful_stream_finish.hpp",
24992500
"grpc/include/userver/ugrpc/client/impl/method_stubs.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/method_stubs.hpp",
@@ -2765,6 +2766,7 @@
27652766
"grpc/tests/channels_test.cpp":"taxi/uservices/userver/grpc/tests/channels_test.cpp",
27662767
"grpc/tests/client_cancel_test.cpp":"taxi/uservices/userver/grpc/tests/client_cancel_test.cpp",
27672768
"grpc/tests/client_factory_test.cpp":"taxi/uservices/userver/grpc/tests/client_factory_test.cpp",
2769+
"grpc/tests/client_middleware_hooks_dp_test.cpp":"taxi/uservices/userver/grpc/tests/client_middleware_hooks_dp_test.cpp",
27682770
"grpc/tests/client_middleware_hooks_test.cpp":"taxi/uservices/userver/grpc/tests/client_middleware_hooks_test.cpp",
27692771
"grpc/tests/client_qos_test.cpp":"taxi/uservices/userver/grpc/tests/client_qos_test.cpp",
27702772
"grpc/tests/client_qos_validation_test.cpp":"taxi/uservices/userver/grpc/tests/client_qos_validation_test.cpp",

grpc/include/userver/ugrpc/client/impl/async_stream_methods.hpp

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <userver/ugrpc/client/impl/async_method_invocation.hpp>
1111
#include <userver/ugrpc/client/impl/async_methods.hpp>
1212
#include <userver/ugrpc/client/impl/call_state.hpp>
13+
#include <userver/ugrpc/client/impl/deadline_propagation_detect.hpp>
1314
#include <userver/ugrpc/impl/status_utils.hpp>
1415
#include <userver/ugrpc/time_utils.hpp>
1516

@@ -30,15 +31,18 @@ void CheckOk(
3031

3132
void ProcessFinish(
3233
StreamingCallState& state,
33-
const CompletionStatus& completion_status,
34+
bool throw_on_error,
35+
CompletionStatus&& completion_status,
3436
const google::protobuf::Message* final_response
3537
);
3638

37-
void ProcessFinishAbandoned(StreamingCallState& state, const grpc::Status& status) noexcept;
39+
void ProcessTimeoutDeadlinePropagated(StreamingCallState& state, bool throw_on_error, std::string_view stage);
40+
41+
void ProcessCancelled(StreamingCallState& state, bool throw_on_error, std::string_view stage);
3842

39-
void ProcessCancelled(StreamingCallState& state, std::string_view stage) noexcept;
43+
void ProcessNetworkError(StreamingCallState& state, bool throw_on_error, std::string_view stage);
4044

41-
void ProcessNetworkError(StreamingCallState& state, std::string_view stage) noexcept;
45+
void ProcessFinishAbandoned(StreamingCallState& state, const grpc::Status& status) noexcept;
4246

4347
void ThrowIfDeadlineIsExceeded(grpc::ClientContext& client_context, std::string_view call_name);
4448

@@ -71,39 +75,28 @@ void Finish(
7175
case ugrpc::impl::AsyncMethodInvocation::WaitStatus::kOk: {
7276
auto& status = completion_status.value();
7377
state.GetStatsScope().SetFinishTime(invocation.GetNotifyTime());
74-
try {
75-
ugrpc::impl::ClampStatusCodeToValidRange(status);
76-
ProcessFinish(state, completion_status, final_response);
77-
} catch (const std::exception& ex) {
78-
if (throw_on_error) {
79-
throw;
80-
} else {
81-
LOG_WARNING() << "There is a caught exception in 'impl::Finish': " << ex;
82-
}
83-
}
84-
if (throw_on_error) {
85-
if (!status.ok()) {
86-
ThrowErrorWithStatus(state.GetCallName(), std::move(status));
87-
}
78+
79+
ugrpc::impl::ClampStatusCodeToValidRange(status);
80+
if (impl::IsRequestCancelledByDeadlinePropagation(status, state)) {
81+
ProcessTimeoutDeadlinePropagated(state, throw_on_error, "Finish");
82+
} else {
83+
ProcessFinish(state, throw_on_error, std::move(completion_status), final_response);
8884
}
8985
return;
9086
}
9187
case ugrpc::impl::AsyncMethodInvocation::WaitStatus::kError:
9288
state.GetStatsScope().SetFinishTime(invocation.GetNotifyTime());
93-
ProcessNetworkError(state, "Finish");
94-
if (throw_on_error) {
95-
ThrowIfDeadlineIsExceeded(state.GetClientContext(), state.GetCallName());
96-
throw RpcInterruptedError(state.GetCallName(), "Finish");
97-
}
89+
ProcessNetworkError(state, throw_on_error, "Finish");
9890
return;
9991

10092
case ugrpc::impl::AsyncMethodInvocation::WaitStatus::kCancelled:
10193
// NOTE: `completion_status` couldn't be safely used here
102-
ProcessCancelled(state, "Finish");
103-
// Finish AsyncMethodInvocation will be awaited in its destructor.
104-
if (throw_on_error) {
105-
throw RpcCancelledError(state.GetCallName(), "Finish");
94+
if (impl::IsTaskCancelledByDeadlinePropagation()) {
95+
ProcessTimeoutDeadlinePropagated(state, true, "Finish");
96+
} else {
97+
ProcessCancelled(state, true, "Finish");
10698
}
99+
// Finish AsyncMethodInvocation will be awaited in its destructor.
107100
return;
108101

109102
case ugrpc::impl::AsyncMethodInvocation::WaitStatus::kDeadline:
@@ -133,7 +126,7 @@ void FinishAbandoned(GrpcStream& stream, StreamingCallState& state) noexcept try
133126
ugrpc::impl::ClampStatusCodeToValidRange(status);
134127
ProcessFinishAbandoned(state, status);
135128
} else {
136-
ProcessNetworkError(state, "Finish");
129+
ProcessNetworkError(state, false, "Finish");
137130
}
138131
} catch (const std::exception& ex) {
139132
LOG_WARNING() << "There is a caught exception in 'FinishAbandoned': " << ex;
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#pragma once
2+
3+
#include <userver/server/request/task_inherited_data.hpp>
4+
#include <userver/ugrpc/client/impl/call_state.hpp>
5+
6+
USERVER_NAMESPACE_BEGIN
7+
8+
namespace ugrpc::client::impl {
9+
10+
inline bool IsTaskCancelledByDeadlinePropagation() noexcept {
11+
UASSERT(engine::current_task::ShouldCancel());
12+
return USERVER_NAMESPACE::server::request::GetTaskInheritedDeadline().IsReached();
13+
}
14+
15+
inline bool IsRequestCancelledByDeadlinePropagation(const grpc::Status& status, const CallState& state) noexcept {
16+
return grpc::StatusCode::DEADLINE_EXCEEDED == status.error_code() && state.IsDeadlinePropagated();
17+
}
18+
19+
} // namespace ugrpc::client::impl
20+
21+
USERVER_NAMESPACE_END

grpc/include/userver/ugrpc/client/impl/unary_call.hpp

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <userver/ugrpc/client/impl/async_methods.hpp>
1818
#include <userver/ugrpc/client/impl/call_params.hpp>
1919
#include <userver/ugrpc/client/impl/call_state.hpp>
20+
#include <userver/ugrpc/client/impl/deadline_propagation_detect.hpp>
2021
#include <userver/ugrpc/client/impl/middleware_hooks.hpp>
2122
#include <userver/ugrpc/client/impl/prepare_async_call.hpp>
2223
#include <userver/ugrpc/client/impl/retry_backoff.hpp>
@@ -39,15 +40,6 @@ inline bool AccountCompletion(RetryLimiter& retry_limiter, const CompletionStatu
3940
return retry_limiter.CanRetry();
4041
}
4142

42-
inline bool IsTaskCancelledByDeadlinePropagation() noexcept {
43-
UASSERT(engine::current_task::ShouldCancel());
44-
return USERVER_NAMESPACE::server::request::GetTaskInheritedDeadline().IsReached();
45-
}
46-
47-
inline bool IsRequestCancelledByDeadlinePropagation(const grpc::Status& status, const CallState& state) noexcept {
48-
return grpc::StatusCode::DEADLINE_EXCEEDED == status.error_code() && state.IsDeadlinePropagated();
49-
}
50-
5143
void AccountStatistics(ugrpc::impl::RpcStatisticsScope& stats, CompletionStatus completion_status) noexcept;
5244

5345
void SetStatusForSpan(tracing::Span& span, CompletionStatus completion_status) noexcept;

grpc/src/ugrpc/client/impl/async_stream_methods.cpp

Lines changed: 63 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,15 @@ void SetErrorAndResetSpan(CallState& state, std::string_view error_message) noex
2727
state.ResetSpan();
2828
}
2929

30-
void HandleCallStatistics(CallState& state, const grpc::Status& status) noexcept {
31-
auto& stats = state.GetStatsScope();
32-
if (grpc::StatusCode::DEADLINE_EXCEEDED == status.error_code() && state.IsDeadlinePropagated()) {
33-
stats.OnCancelledByDeadlinePropagation();
34-
} else {
35-
stats.OnExplicitFinish(status.error_code());
30+
void TryRunMiddlewarePipeline(CallState& state, bool throw_on_error, const MiddlewareHooks& hooks) {
31+
try {
32+
RunMiddlewarePipeline(state, hooks);
33+
} catch (std::exception& ex) {
34+
LOG_WARNING() << "There is a caught exception in 'Finish': " << ex;
35+
if (throw_on_error) {
36+
throw;
37+
}
3638
}
37-
stats.Flush();
3839
}
3940

4041
} // namespace
@@ -66,55 +67,92 @@ void CheckOk(
6667
) {
6768
if (wait_status == ugrpc::impl::AsyncMethodInvocation::WaitStatus::kError) {
6869
state.SetFinished();
69-
ThrowIfDeadlineIsExceeded(state.GetClientContext(), state.GetCallName());
70-
ProcessNetworkError(state, stage);
71-
throw RpcInterruptedError(state.GetCallName(), stage);
70+
ProcessNetworkError(state, true, stage);
7271
} else if (wait_status == ugrpc::impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
7372
state.SetFinished();
74-
ProcessCancelled(state, stage);
75-
throw RpcCancelledError(state.GetCallName(), stage);
73+
if (impl::IsTaskCancelledByDeadlinePropagation()) {
74+
ProcessTimeoutDeadlinePropagated(state, true, stage);
75+
} else {
76+
ProcessCancelled(state, true, stage);
77+
}
7678
}
7779
}
7880

7981
void ProcessFinish(
8082
StreamingCallState& state,
81-
const CompletionStatus& completion_status,
83+
bool throw_on_error,
84+
CompletionStatus&& completion_status,
8285
const google::protobuf::Message* response
8386
) {
8487
UINVARIANT(completion_status.has_value(), "ProcessFinish must be called only with grpc::Status completions");
85-
const auto& status = completion_status.value();
88+
auto& status = completion_status.value();
89+
90+
TryRunMiddlewarePipeline(
91+
state,
92+
throw_on_error,
93+
MiddlewareHooks::FinishHooks(status, status.ok() ? response : nullptr)
94+
);
8695

87-
RunMiddlewarePipeline(state, MiddlewareHooks::FinishHooks(status, status.ok() ? response : nullptr));
88-
impl::HandleCallStatistics(state, status);
96+
state.GetStatsScope().OnExplicitFinish(status.error_code());
97+
state.GetStatsScope().Flush();
8998
SetStatusAndResetSpan(state, status);
99+
100+
if (throw_on_error && !status.ok()) {
101+
ThrowErrorWithStatus(state.GetCallName(), std::move(status));
102+
}
90103
}
91104

92-
void ProcessFinishAbandoned(StreamingCallState& state, const grpc::Status& status) noexcept {
93-
RunMiddlewarePipeline(
105+
void ProcessTimeoutDeadlinePropagated(StreamingCallState& state, bool throw_on_error, std::string_view stage) {
106+
TryRunMiddlewarePipeline(
94107
state,
95-
MiddlewareHooks::FinishHooks(utils::unexpected{SpecialCaseCompletionType::kAbandoned}, nullptr)
108+
throw_on_error,
109+
MiddlewareHooks::FinishHooks(utils::unexpected{SpecialCaseCompletionType::kTimeoutDeadlinePropagated}, nullptr)
96110
);
97111

98-
// Nothing to do with statistics, `RpcStatisticsScope` automatically accounts "abandoned-error"
99-
SetStatusAndResetSpan(state, status);
112+
state.GetStatsScope().OnCancelledByDeadlinePropagation();
113+
state.GetStatsScope().Flush();
114+
SetErrorAndResetSpan(state, fmt::format("Deadline propagated at '{}'", stage));
115+
116+
if (throw_on_error) {
117+
throw RpcCancelledError(state.GetCallName(), fmt::format("{} (Deadline Propagation)", stage));
118+
}
100119
}
101120

102-
void ProcessCancelled(StreamingCallState& state, std::string_view stage) noexcept {
121+
void ProcessCancelled(StreamingCallState& state, bool throw_on_error, std::string_view stage) {
103122
CompletionStatus result{utils::unexpected{SpecialCaseCompletionType::kCancelled}};
104-
RunMiddlewarePipeline(state, MiddlewareHooks::FinishHooks(result, nullptr));
123+
TryRunMiddlewarePipeline(state, throw_on_error, MiddlewareHooks::FinishHooks(result, nullptr));
105124

106125
state.GetStatsScope().OnCancelled();
107126
state.GetStatsScope().Flush();
108127
SetErrorAndResetSpan(state, fmt::format("Task cancellation at '{}'", stage));
128+
129+
if (throw_on_error) {
130+
throw RpcCancelledError(state.GetCallName(), stage);
131+
}
109132
}
110133

111-
void ProcessNetworkError(StreamingCallState& state, std::string_view stage) noexcept {
134+
void ProcessNetworkError(StreamingCallState& state, bool throw_on_error, std::string_view stage) {
112135
CompletionStatus result{utils::unexpected{SpecialCaseCompletionType::kNetworkError}};
113-
RunMiddlewarePipeline(state, MiddlewareHooks::FinishHooks(result, nullptr));
136+
TryRunMiddlewarePipeline(state, throw_on_error, MiddlewareHooks::FinishHooks(result, nullptr));
114137

115138
state.GetStatsScope().OnNetworkError();
116139
state.GetStatsScope().Flush();
117140
SetErrorAndResetSpan(state, fmt::format("Network error at '{}'", stage));
141+
142+
if (throw_on_error) {
143+
throw RpcInterruptedError(state.GetCallName(), stage);
144+
}
145+
}
146+
147+
void ProcessFinishAbandoned(StreamingCallState& state, const grpc::Status& status) noexcept {
148+
TryRunMiddlewarePipeline(
149+
state,
150+
false,
151+
MiddlewareHooks::FinishHooks(utils::unexpected{SpecialCaseCompletionType::kAbandoned}, nullptr)
152+
);
153+
154+
// Nothing to do with statistics, `RpcStatisticsScope` automatically accounts "abandoned-error"
155+
SetStatusAndResetSpan(state, status);
118156
}
119157

120158
} // namespace ugrpc::client::impl

0 commit comments

Comments
 (0)