Skip to content

Commit 15197d1

Browse files
committed
feat grpc: refactor StreamReadFuture, add Reader type erasure
Relates:(https://nda.ya.ru/t/Fbv9fMDT7R3izG commit_hash:3e77b684dfdae7769dd1af4a5440d53cd14414cf
1 parent fd86b35 commit 15197d1

7 files changed

Lines changed: 130 additions & 262 deletions

File tree

.mapping.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2354,6 +2354,7 @@
23542354
"grpc/include/userver/ugrpc/client/impl/retry_backoff.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/retry_backoff.hpp",
23552355
"grpc/include/userver/ugrpc/client/impl/retry_policy.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/retry_policy.hpp",
23562356
"grpc/include/userver/ugrpc/client/impl/rpc.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/rpc.hpp",
2357+
"grpc/include/userver/ugrpc/client/impl/stream_read_future_impl_base.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/stream_read_future_impl_base.hpp",
23572358
"grpc/include/userver/ugrpc/client/impl/stub_any.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/stub_any.hpp",
23582359
"grpc/include/userver/ugrpc/client/impl/stub_handle.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/stub_handle.hpp",
23592360
"grpc/include/userver/ugrpc/client/impl/stub_pool.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/stub_pool.hpp",

grpc/include/userver/ugrpc/client/impl/rpc.hpp

Lines changed: 81 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
#pragma once
22

3-
/// @file userver/ugrpc/client/impl/rpc.hpp
4-
/// @brief Classes representing an outgoing RPC
5-
63
#include <utility>
74

85
#include <grpcpp/impl/codegen/proto_utils.h>
@@ -23,17 +20,29 @@ USERVER_NAMESPACE_BEGIN
2320

2421
namespace ugrpc::client::impl {
2522

26-
/// @brief Controls a single request -> response stream RPC
27-
///
28-
/// This class is not thread-safe except for `GetContext`.
29-
///
30-
/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
31-
/// returned `false`). In that case the connection is not closed (it will be
32-
/// reused for new RPCs), and the server receives `RpcInterruptedError`
33-
/// immediately. gRPC provides no way to early-close a server-streaming RPC
34-
/// gracefully.
23+
template <typename Reader, typename Response>
24+
class StreamReadFutureImpl : public StreamReadFutureImplBase<Response> {
25+
public:
26+
StreamReadFutureImpl(impl::StreamingCallState& state, Reader& reader, const Response& response) noexcept;
27+
28+
StreamReadFutureImpl(StreamReadFutureImpl&&) = delete;
29+
StreamReadFutureImpl& operator=(StreamReadFutureImpl&&) = delete;
30+
31+
~StreamReadFutureImpl() override;
32+
33+
bool IsReady() const noexcept override;
34+
35+
bool Get() override;
36+
37+
private:
38+
impl::StreamingCallState& state_;
39+
Reader& reader_;
40+
const Response& response_{};
41+
bool valid_{};
42+
};
43+
3544
template <typename Response>
36-
class [[nodiscard]] InputStream final {
45+
class InputStream final {
3746
public:
3847
template <typename Stub, typename Request>
3948
InputStream(
@@ -42,22 +51,14 @@ class [[nodiscard]] InputStream final {
4251
const Request& request
4352
);
4453

45-
~InputStream();
46-
4754
InputStream(InputStream&&) = delete;
4855
InputStream& operator=(InputStream&&) = delete;
4956

57+
~InputStream();
58+
5059
CallContext& GetContext() noexcept { return context_; }
5160
const CallContext& GetContext() const noexcept { return context_; }
5261

53-
/// @brief Await and read the next incoming message
54-
///
55-
/// On end-of-input, `Finish` is called automatically.
56-
///
57-
/// @param response where to put response on success
58-
/// @returns `true` on success, `false` on end-of-input, task cancellation,
59-
// or if the stream is already closed for reads
60-
/// @throws ugrpc::client::RpcError on an RPC error
6162
[[nodiscard]] bool Read(Response& response);
6263

6364
private:
@@ -66,66 +67,24 @@ class [[nodiscard]] InputStream final {
6667
std::unique_ptr<grpc::ClientAsyncReader<Response>> stream_;
6768
};
6869

69-
/// @brief Controls a request stream -> single response RPC
70-
///
71-
/// This class is not thread-safe except for `GetContext`.
72-
///
73-
/// The RPC is cancelled on destruction unless `Finish` has been called. In that
74-
/// case the connection is not closed (it will be reused for new RPCs), and the
75-
/// server receives `RpcInterruptedError` immediately.
7670
template <typename Request, typename Response>
77-
class [[nodiscard]] OutputStream final {
71+
class OutputStream final {
7872
public:
7973
template <typename Stub>
8074
OutputStream(CallParams&& params, PrepareClientStreamingCall<Stub, Request, Response> prepare_async_method);
8175

82-
~OutputStream();
83-
8476
OutputStream(OutputStream&&) = delete;
8577
OutputStream& operator=(OutputStream&&) = delete;
8678

79+
~OutputStream();
80+
8781
CallContext& GetContext() noexcept { return context_; }
8882
const CallContext& GetContext() const noexcept { return context_; }
8983

90-
/// @brief Write the next outgoing message
91-
///
92-
/// `Write` doesn't store any references to `request`, so it can be
93-
/// deallocated right after the call.
94-
///
95-
/// @param request the next message to write
96-
/// @return true if the data is going to the wire; false if the write
97-
/// operation failed (including due to task cancellation,
98-
// or if the stream is already closed for writes),
99-
/// in which case no more writes will be accepted,
100-
/// and the error details can be fetched from Finish
10184
[[nodiscard]] bool Write(const Request& request);
10285

103-
/// @brief Write the next outgoing message and check result
104-
///
105-
/// `WriteAndCheck` doesn't store any references to `request`, so it can be
106-
/// deallocated right after the call.
107-
///
108-
/// `WriteAndCheck` verifies result of the write and generates exception
109-
/// in case of issues.
110-
///
111-
/// @param request the next message to write
112-
/// @throws ugrpc::client::RpcError on an RPC error
113-
/// @throws ugrpc::client::RpcCancelledError on task cancellation
114-
/// @throws ugrpc::client::RpcError if the stream is already closed for writes
11586
void WriteAndCheck(const Request& request);
11687

117-
/// @brief Complete the RPC successfully
118-
///
119-
/// Should be called once all the data is written. The server will then
120-
/// send a single `Response`.
121-
///
122-
/// `Finish` should not be called multiple times.
123-
///
124-
/// The connection is not closed, it will be reused for new RPCs.
125-
///
126-
/// @returns the single `Response` received after finishing the writes
127-
/// @throws ugrpc::client::RpcError on an RPC error
128-
/// @throws ugrpc::client::RpcCancelledError on task cancellation
12988
Response Finish();
13089

13190
private:
@@ -135,110 +94,28 @@ class [[nodiscard]] OutputStream final {
13594
std::unique_ptr<grpc::ClientAsyncWriter<Request>> stream_;
13695
};
13796

138-
/// @brief Controls a request stream -> response stream RPC
139-
///
140-
/// It is safe to call the following methods from different coroutines:
141-
///
142-
/// - `GetContext`;
143-
/// - one of (`Read`, `ReadAsync`);
144-
/// - one of (`Write`, `WritesDone`).
145-
///
146-
/// `WriteAndCheck` is NOT thread-safe.
147-
///
148-
/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
149-
/// returned `false`). In that case the connection is not closed (it will be
150-
/// reused for new RPCs), and the server receives `RpcInterruptedError`
151-
/// immediately. gRPC provides no way to early-close a server-streaming RPC
152-
/// gracefully.
153-
///
154-
/// `Read` and `AsyncRead` can throw if error status is received from server.
155-
/// User MUST NOT call `Read` or `AsyncRead` again after failure of any of these
156-
/// operations.
157-
///
158-
/// `Write` and `WritesDone` methods do not throw, but indicate issues with
159-
/// the RPC by returning `false`.
160-
///
161-
/// `WriteAndCheck` is intended for ping-pong scenarios, when after write
162-
/// operation the user calls `Read` and vice versa.
163-
///
164-
/// If `Write` or `WritesDone` returns negative result, the user MUST NOT call
165-
/// any of these methods anymore.
166-
/// Instead the user SHOULD call `Read` method until the end of input. If
167-
/// `Write` or `WritesDone` finishes with negative result, finally `Read`
168-
/// will throw an exception.
169-
/// ## Usage example:
170-
///
171-
/// @snippet grpc/tests/stream_test.cpp concurrent bidirectional stream
172-
///
17397
template <typename Request, typename Response>
174-
class [[nodiscard]] BidirectionalStream final {
98+
class BidirectionalStream final {
17599
public:
176-
using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
177-
using StreamReadFuture = ugrpc::client::StreamReadFuture<RawStream>;
178-
179100
template <typename Stub>
180101
BidirectionalStream(CallParams&& params, PrepareBidiStreamingCall<Stub, Request, Response> prepare_async_method);
181102

182-
~BidirectionalStream();
183-
184103
BidirectionalStream(BidirectionalStream&&) = delete;
185104
BidirectionalStream& operator=(BidirectionalStream&&) = delete;
186105

106+
~BidirectionalStream();
107+
187108
CallContext& GetContext() noexcept { return context_; }
188109
const CallContext& GetContext() const noexcept { return context_; }
189110

190-
/// @brief Await and read the next incoming message
191-
///
192-
/// On end-of-input, `Finish` is called automatically.
193-
///
194-
/// @param response where to put response on success
195-
/// @returns `true` on success, `false` on end-of-input, task cancellation,
196-
/// or if the stream is already closed for reads
197-
/// @throws ugrpc::client::RpcError on an RPC error
198111
[[nodiscard]] bool Read(Response& response);
199112

200-
/// @brief Return future to read next incoming result
201-
///
202-
/// @param response where to put response on success
203-
/// @return StreamReadFuture future
204-
/// @throws ugrpc::client::RpcError on an RPC error
205-
/// @throws ugrpc::client::RpcError if the stream is already closed for reads
206-
StreamReadFuture ReadAsync(Response& response);
207-
208-
/// @brief Write the next outgoing message
209-
///
210-
/// RPC will be performed immediately. No references to `request` are
211-
/// saved, so it can be deallocated right after the call.
212-
///
213-
/// @param request the next message to write
214-
/// @return true if the data is going to the wire; false if the write
215-
/// operation failed (including due to task cancellation,
216-
// or if the stream is already closed for writes),
217-
/// in which case no more writes will be accepted,
218-
/// but Read may still have some data and status code available
113+
StreamReadFuture<Response> ReadAsync(Response& response);
114+
219115
[[nodiscard]] bool Write(const Request& request);
220116

221-
/// @brief Write the next outgoing message and check result
222-
///
223-
/// `WriteAndCheck` doesn't store any references to `request`, so it can be
224-
/// deallocated right after the call.
225-
///
226-
/// `WriteAndCheck` verifies result of the write and generates exception
227-
/// in case of issues.
228-
///
229-
/// @param request the next message to write
230-
/// @throws ugrpc::client::RpcError on an RPC error
231-
/// @throws ugrpc::client::RpcCancelledError on task cancellation
232-
/// @throws ugrpc::client::RpcError if the stream is already closed for writes
233117
void WriteAndCheck(const Request& request);
234118

235-
/// @brief Announce end-of-output to the server
236-
///
237-
/// Should be called to notify the server and receive the final response(s).
238-
///
239-
/// @return true if the data is going to the wire; false if the operation
240-
/// failed (including if the stream is already closed for writes),
241-
/// but Read may still have some data and status code available
242119
[[nodiscard]] bool WritesDone();
243120

244121
private:
@@ -247,6 +124,50 @@ class [[nodiscard]] BidirectionalStream final {
247124
std::unique_ptr<grpc::ClientAsyncReaderWriter<Request, Response>> stream_;
248125
};
249126

127+
template <typename Reader, typename Response>
128+
StreamReadFutureImpl<Reader, Response>::StreamReadFutureImpl(
129+
impl::StreamingCallState& state,
130+
Reader& reader,
131+
const Response& response
132+
) noexcept
133+
: state_{state}, reader_{reader}, response_{response}, valid_{true} {}
134+
135+
template <typename Reader, typename Response>
136+
StreamReadFutureImpl<Reader, Response>::~StreamReadFutureImpl() {
137+
if (valid_) {
138+
// StreamReadFuture::Get wasn't called => finish RPC.
139+
impl::FinishAbandoned(reader_, state_);
140+
}
141+
}
142+
143+
template <typename Reader, typename Response>
144+
bool StreamReadFutureImpl<Reader, Response>::IsReady() const noexcept {
145+
UINVARIANT(valid_, "IsReady should be called only before 'Get'");
146+
return state_.GetAsyncMethodInvocation().IsReady();
147+
}
148+
149+
template <typename Reader, typename Response>
150+
bool StreamReadFutureImpl<Reader, Response>::Get() {
151+
UINVARIANT(valid_, "'Get' must be called only once");
152+
valid_ = false;
153+
const impl::StreamingCallState::AsyncMethodInvocationGuard guard(state_);
154+
const auto
155+
wait_status = impl::WaitAndTryCancelIfNeeded(state_.GetAsyncMethodInvocation(), state_.GetClientContext());
156+
if (wait_status == ugrpc::impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
157+
state_.GetStatsScope().OnCancelled();
158+
state_.GetStatsScope().Flush();
159+
} else if (wait_status == ugrpc::impl::AsyncMethodInvocation::WaitStatus::kError) {
160+
// Finish can only be called once all the data is read, otherwise the
161+
// underlying gRPC driver hangs.
162+
impl::Finish(reader_, state_, /*final_response=*/nullptr, /*throw_on_error=*/true);
163+
} else {
164+
if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
165+
RunMiddlewarePipeline(state_, impl::MiddlewareHooks::RecvMessageHooks(response_));
166+
}
167+
}
168+
return wait_status == ugrpc::impl::AsyncMethodInvocation::WaitStatus::kOk;
169+
}
170+
250171
template <typename Response>
251172
template <typename Stub, typename Request>
252173
InputStream<Response>::InputStream(
@@ -394,17 +315,18 @@ BidirectionalStream<Request, Response>::~BidirectionalStream() {
394315
}
395316

396317
template <typename Request, typename Response>
397-
typename BidirectionalStream<Request, Response>::StreamReadFuture BidirectionalStream<
398-
Request,
399-
Response>::ReadAsync(Response& response) {
318+
StreamReadFuture<Response> BidirectionalStream<Request, Response>::ReadAsync(Response& response) {
400319
if (!IsReadAvailable(state_)) {
401320
// If the stream is already finished, we must exit immediately.
402321
// If not, even the middlewares may access something that is already dead.
403322
throw RpcError(state_.GetCallName(), "'ReadAsync' called on a finished call");
404323
}
405324

406325
impl::ReadAsync(*stream_, response, state_);
407-
return StreamReadFuture{state_, *stream_, ToBaseMessage(&response)};
326+
using BidirectionalStreamReadFutureImpl = StreamReadFutureImpl<grpc::ClientAsyncReaderWriter<Request, Response>, Response>;
327+
return StreamReadFuture<Response>{
328+
std::make_unique<BidirectionalStreamReadFutureImpl>(state_, *stream_, response)
329+
};
408330
}
409331

410332
template <typename Request, typename Response>
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#pragma once
2+
3+
USERVER_NAMESPACE_BEGIN
4+
5+
namespace ugrpc::client::impl {
6+
7+
template <typename Response>
8+
class StreamReadFutureImplBase {
9+
public:
10+
virtual ~StreamReadFutureImplBase() = default;
11+
12+
virtual bool IsReady() const = 0;
13+
14+
virtual bool Get() = 0;
15+
};
16+
17+
} // namespace ugrpc::client::impl
18+
19+
USERVER_NAMESPACE_END

grpc/include/userver/ugrpc/client/stream.hpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
/// @brief Client streaming interfaces
55

66
#include <memory>
7+
#include <utility>
78

89
#include <userver/ugrpc/client/impl/rpc.hpp>
10+
#include <userver/ugrpc/client/stream_read_future.hpp>
911

1012
USERVER_NAMESPACE_BEGIN
1113

@@ -168,9 +170,6 @@ class [[nodiscard]] Writer final {
168170
template <typename Request, typename Response>
169171
class [[nodiscard]] ReaderWriter final {
170172
public:
171-
using StreamReadFuture = ugrpc::client::StreamReadFuture<
172-
typename impl::BidirectionalStream<Request, Response>::RawStream>;
173-
174173
/// @cond
175174
// For internal use only
176175
template <typename Stub>
@@ -202,7 +201,7 @@ class [[nodiscard]] ReaderWriter final {
202201
/// @return StreamReadFuture future
203202
/// @throws ugrpc::client::RpcError on an RPC error
204203
/// @throws ugrpc::client::RpcError if the stream is already closed for reads
205-
StreamReadFuture ReadAsync(Response& response) { return stream_->ReadAsync(response); }
204+
StreamReadFuture<Response> ReadAsync(Response& response) { return stream_->ReadAsync(response); }
206205

207206
/// @brief Write the next outgoing message
208207
///

0 commit comments

Comments
 (0)