From 1a1f17204e4bcc5137db13217ee69b33745ec430 Mon Sep 17 00:00:00 2001 From: Vasily Sviridov Date: Tue, 21 Apr 2026 06:47:10 +0300 Subject: [PATCH] feat ydb: add TopicWriteSession support --- ydb/include/userver/ydb/topic.hpp | 47 ++++++++++++ ydb/src/ydb/topic.cpp | 33 ++++++-- ydb/tests/topic_test.cpp | 123 ++++++++++++++++++++++++++++++ 3 files changed, 198 insertions(+), 5 deletions(-) diff --git a/ydb/include/userver/ydb/topic.hpp b/ydb/include/userver/ydb/topic.hpp index 237e290e2c4a..76b607745861 100644 --- a/ydb/include/userver/ydb/topic.hpp +++ b/ydb/include/userver/ydb/topic.hpp @@ -71,6 +71,50 @@ class TopicReadSession final { std::shared_ptr read_session_; }; +/// @brief Write session used to connect to a topic for writting +/// +/// @see https://ydb.tech/docs/en/reference/ydb-sdk/topic#write +class TopicWriteSession final { +public: + /// @cond + /// For internal use only. + explicit TopicWriteSession(std::shared_ptr write_session); + /// @endcond + + /// @brief Wait for the next write session event + /// + /// Suspends the current coroutine until an event is available, + /// the returns it without blocking the thread. + std::optional GetEvent(); + + /// @brief Poll for a write session event without waiting + /// + /// Returns the next buffered event immediately if one is available, + /// or `std::nullopt` if the event queue is empty. Does not suspend + /// the coroutine. + std::optional TryGetEvent(); + + /// @brief Write a messsage using a continuation token from TReadyToAcceptEvent + /// + /// Must be called only after receiving TReadyToAcceptEvent from GetEvent(). + void Write(NYdb::NTopic::TContinuationToken&& token, NYdb::NTopic::TWriteMessage&& message); + + /// @brief Close write session + /// + /// Waits for all in-flights messages to be acknowledged. + /// Force closes after timeout + bool Close(std::chrono::milliseconds timeout); + + /// Get native write session + /// @warning Use with care! Facilities from + /// `` can help with + /// non-blocking wait operations. + std::shared_ptr GetNativeTopicWriteSession(); + +private: + std::shared_ptr write_session_; +}; + /// @ingroup userver_clients /// /// @brief YDB Topic Client @@ -94,6 +138,9 @@ class TopicClient final { /// Create read session TopicReadSession CreateReadSession(const NYdb::NTopic::TReadSessionSettings& settings); + /// Create write session + TopicWriteSession CreateWriteSession(const NYdb::NTopic::TWriteSessionSettings& settings); + /// Get native topic client /// @warning Use with care! Facilities from /// `` can help with diff --git a/ydb/src/ydb/topic.cpp b/ydb/src/ydb/topic.cpp index b3bb169e27d5..4875b9f98103 100644 --- a/ydb/src/ydb/topic.cpp +++ b/ydb/src/ydb/topic.cpp @@ -12,8 +12,7 @@ USERVER_NAMESPACE_BEGIN namespace ydb { TopicReadSession::TopicReadSession(std::shared_ptr read_session) - : read_session_(std::move(read_session)) -{ + : read_session_(std::move(read_session)) { UASSERT(read_session_); } @@ -36,10 +35,30 @@ bool TopicReadSession::Close(std::chrono::milliseconds timeout) { return read_se std::shared_ptr TopicReadSession::GetNativeTopicReadSession() { return read_session_; } +TopicWriteSession::TopicWriteSession(std::shared_ptr write_session) + : write_session_(std::move(write_session)) { + UASSERT(write_session_); +} + +std::optional TopicWriteSession::GetEvent() { + impl::GetFutureValue(write_session_->WaitEvent()); + return write_session_->GetEvent(/*block=*/false); +} + +std::optional TopicWriteSession::TryGetEvent() { + return write_session_->GetEvent(/*block=*/false); +} + +void TopicWriteSession::Write(NYdb::NTopic::TContinuationToken&& token, NYdb::NTopic::TWriteMessage&& message) { + write_session_->Write(std::move(token), std::move(message)); +} + +bool TopicWriteSession::Close(std::chrono::milliseconds timeout) { return write_session_->Close(timeout); } + +std::shared_ptr TopicWriteSession::GetNativeTopicWriteSession() { return write_session_; } + TopicClient::TopicClient(std::shared_ptr driver, [[maybe_unused]] impl::TopicSettings settings) - : driver_{std::move(driver)}, - topic_client_{driver_->GetNativeDriver()} -{} + : driver_{std::move(driver)}, topic_client_{driver_->GetNativeDriver()} {} TopicClient::~TopicClient() = default; @@ -55,6 +74,10 @@ TopicReadSession TopicClient::CreateReadSession(const NYdb::NTopic::TReadSession return TopicReadSession{topic_client_.CreateReadSession(settings)}; } +TopicWriteSession TopicClient::CreateWriteSession(const NYdb::NTopic::TWriteSessionSettings& settings) { + return TopicWriteSession{topic_client_.CreateWriteSession(settings)}; +} + NYdb::NTopic::TTopicClient& TopicClient::GetNativeTopicClient() { return topic_client_; } } // namespace ydb diff --git a/ydb/tests/topic_test.cpp b/ydb/tests/topic_test.cpp index ab4679e1db8f..949be237df63 100644 --- a/ydb/tests/topic_test.cpp +++ b/ydb/tests/topic_test.cpp @@ -15,6 +15,10 @@ constexpr std::string_view kChangefeed = "test_changefeed"; const std::string kTopicPath = fmt::format("{}/{}", kTable, kChangefeed); constexpr std::string_view kConsumerName = "test_consumer"; +constexpr std::string_view kWriteTopic = "write_test_topic"; +constexpr std::string_view kWriteProducerId = "test-producer"; +constexpr std::string_view kWriteConsumerName = "write_test_consumer"; + class YdbTopicFixture : public ydb::ClientFixtureBase { protected: YdbTopicFixture() { @@ -73,6 +77,69 @@ class YdbTopicFixture : public ydb::ClientFixtureBase { read_session_settings.ConsumerName(ydb::impl::ToString(consumer_name)); return GetTopicClient().CreateReadSession(read_session_settings); } + + ydb::TopicWriteSession CreateWriteSession(std::string_view topic_path, std::string_view producer_id) { + const auto producer = ydb::impl::ToString(producer_id); + NYdb::NTopic::TWriteSessionSettings write_session_settings; + write_session_settings.Path(ydb::impl::ToString(topic_path)).ProducerId(producer).MessageGroupId(producer); + return GetTopicClient().CreateWriteSession(write_session_settings); + } + + void WriteAndAck(ydb::TopicWriteSession& session, std::string_view payload) { + bool written = false; + bool acked = false; + + while (!written || !acked) { + auto event = session.GetEvent(); + ASSERT_TRUE(event.has_value()); + + std::visit( + utils::Overloaded{ + [&](NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent& e) { + session + .Write(std::move(e.ContinuationToken), NYdb::NTopic::TWriteMessage{std::string{payload}}); + written = true; + }, + [&](NYdb::NTopic::TWriteSessionEvent::TAcksEvent& e) { + for (const auto& ack : e.Acks) { + EXPECT_EQ(ack.State, NYdb::NTopic::TWriteSessionEvent::TWriteAck::EEventState::EES_WRITTEN); + } + acked = true; + }, + [](const NYdb::NTopic::TSessionClosedEvent& e) { + FAIL() << "Session closed unexpectedly: " << e.GetIssues().ToString(); + }, + []([[maybe_unused]] auto& e) {}, + }, + *event + ); + } + } +}; + +class YdbTopicWriteSessionFixture : public YdbTopicFixture { +protected: + YdbTopicWriteSessionFixture() { + NYdb::NTopic::TCreateTopicSettings topic_settings; + topic_settings + .AppendConsumers(NYdb::NTopic::TConsumerSettings(topic_settings, ydb::impl::ToString(kWriteConsumerName))); + const auto status = + GetNativeTopicClient().CreateTopic(ydb::impl::ToString(kWriteTopic), topic_settings).GetValueSync(); + EXPECT_TRUE(status.IsSuccess()) << status.GetIssues().ToString(); + } + + ~YdbTopicWriteSessionFixture() override { + const auto status = GetNativeTopicClient().DropTopic(ydb::impl::ToString(kWriteTopic)).GetValueSync(); + EXPECT_TRUE(status.IsSuccess()) << status.GetIssues().ToString(); + } + + ydb::TopicWriteSession CreateWriteSession() { + return YdbTopicFixture::CreateWriteSession(kWriteTopic, kWriteProducerId); + } + + ydb::TopicReadSession CreateReadSession() { + return YdbTopicFixture::CreateReadSession(kWriteTopic, kWriteConsumerName); + } }; template @@ -264,4 +331,60 @@ UTEST_F(YdbTopicFixture, DescribeTopic) { DropConsumer(kTopicPath, kConsumerName); } +UTEST_F(YdbTopicWriteSessionFixture, TopicWriteSessionCreateClose) { + auto session = CreateWriteSession(); + UASSERT_NO_THROW(session.Close(std::chrono::milliseconds{1000})); +} + +UTEST_F(YdbTopicWriteSessionFixture, TopicWriteSessionGetNative) { + auto session = CreateWriteSession(); + EXPECT_NE(session.GetNativeTopicWriteSession(), nullptr); + session.Close(std::chrono::milliseconds{1000}); +} + +UTEST_F(YdbTopicWriteSessionFixture, TopicWriteSessionWriteSingle) { + auto session = CreateWriteSession(); + + auto task = engine::AsyncNoSpan([&] { UASSERT_NO_THROW(WriteAndAck(session, "hello")); }); + task.WaitFor(utest::kMaxTestWaitTime); + ASSERT_TRUE(task.IsFinished()); + + session.Close(std::chrono::milliseconds{1000}); +} + +UTEST_F(YdbTopicWriteSessionFixture, TopicWriteSessionWriteMultiple) { + auto session = CreateWriteSession(); + + auto task = engine::AsyncNoSpan([&] { + for (const std::string_view msg : {"msg-1", "msg-2", "msg-3"}) { + UASSERT_NO_THROW(WriteAndAck(session, msg)); + } + }); + task.WaitFor(utest::kMaxTestWaitTime); + ASSERT_TRUE(task.IsFinished()); + + session.Close(std::chrono::milliseconds{1000}); +} + +UTEST_F(YdbTopicWriteSessionFixture, TopicWriteSessionTryGetEventEmpty) { + auto session = CreateWriteSession(); + + auto task = engine::AsyncNoSpan([&] { + // Drain TReadyToAcceptEvent so the session is established + // and the event queue is empty. + auto event = session.GetEvent(); + + const bool is_ready = std::holds_alternative(*event); + ASSERT_TRUE(event.has_value()); + ASSERT_TRUE(is_ready); + + // Queue is now drained — TryGetEvent must return nullopt immediately. + EXPECT_FALSE(session.TryGetEvent().has_value()); + }); + task.WaitFor(utest::kMaxTestWaitTime); + ASSERT_TRUE(task.IsFinished()); + + session.Close(std::chrono::milliseconds{1000}); +} + USERVER_NAMESPACE_END