Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions ydb/include/userver/ydb/topic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,50 @@ class TopicReadSession final {
std::shared_ptr<NYdb::NTopic::IReadSession> read_session_;
};

/// @brief Write session used to connect to a topic for writting
///
/// @see https://ydb.tech/docs/en/reference/ydb-sdk/topic#write
class TopicWriteSession final {
public:
/// @cond
/// For internal use only.
explicit TopicWriteSession(std::shared_ptr<NYdb::NTopic::IWriteSession> write_session);
/// @endcond

/// @brief Wait for the next write session event
///
/// Suspends the current coroutine until an event is available,
/// the returns it without blocking the thread.
std::optional<NYdb::NTopic::TWriteSessionEvent::TEvent> GetEvent();

/// @brief Poll for a write session event without waiting
///
/// Returns the next buffered event immediately if one is available,
/// or `std::nullopt` if the event queue is empty. Does not suspend
/// the coroutine.
std::optional<NYdb::NTopic::TWriteSessionEvent::TEvent> TryGetEvent();

/// @brief Write a messsage using a continuation token from TReadyToAcceptEvent
///
/// Must be called only after receiving TReadyToAcceptEvent from GetEvent().
void Write(NYdb::NTopic::TContinuationToken&& token, NYdb::NTopic::TWriteMessage&& message);

/// @brief Close write session
///
/// Waits for all in-flights messages to be acknowledged.
/// Force closes after timeout
bool Close(std::chrono::milliseconds timeout);

/// Get native write session
/// @warning Use with care! Facilities from
/// `<core/include/userver/drivers/subscribable_futures.hpp>` can help with
/// non-blocking wait operations.
std::shared_ptr<NYdb::NTopic::IWriteSession> GetNativeTopicWriteSession();

private:
std::shared_ptr<NYdb::NTopic::IWriteSession> write_session_;
};

/// @ingroup userver_clients
///
/// @brief YDB Topic Client
Expand All @@ -94,6 +138,9 @@ class TopicClient final {
/// Create read session
TopicReadSession CreateReadSession(const NYdb::NTopic::TReadSessionSettings& settings);

/// Create write session
TopicWriteSession CreateWriteSession(const NYdb::NTopic::TWriteSessionSettings& settings);

/// Get native topic client
/// @warning Use with care! Facilities from
/// `<core/include/userver/drivers/subscribable_futures.hpp>` can help with
Expand Down
33 changes: 28 additions & 5 deletions ydb/src/ydb/topic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ USERVER_NAMESPACE_BEGIN
namespace ydb {

TopicReadSession::TopicReadSession(std::shared_ptr<NYdb::NTopic::IReadSession> read_session)
: read_session_(std::move(read_session))
{
: read_session_(std::move(read_session)) {
UASSERT(read_session_);
}

Expand All @@ -36,10 +35,30 @@ bool TopicReadSession::Close(std::chrono::milliseconds timeout) { return read_se

std::shared_ptr<NYdb::NTopic::IReadSession> TopicReadSession::GetNativeTopicReadSession() { return read_session_; }

TopicWriteSession::TopicWriteSession(std::shared_ptr<NYdb::NTopic::IWriteSession> write_session)
: write_session_(std::move(write_session)) {
UASSERT(write_session_);
}

std::optional<NYdb::NTopic::TWriteSessionEvent::TEvent> TopicWriteSession::GetEvent() {
impl::GetFutureValue(write_session_->WaitEvent());
return write_session_->GetEvent(/*block=*/false);
}

std::optional<NYdb::NTopic::TWriteSessionEvent::TEvent> TopicWriteSession::TryGetEvent() {
return write_session_->GetEvent(/*block=*/false);
}

void TopicWriteSession::Write(NYdb::NTopic::TContinuationToken&& token, NYdb::NTopic::TWriteMessage&& message) {
write_session_->Write(std::move(token), std::move(message));
}

bool TopicWriteSession::Close(std::chrono::milliseconds timeout) { return write_session_->Close(timeout); }

std::shared_ptr<NYdb::NTopic::IWriteSession> TopicWriteSession::GetNativeTopicWriteSession() { return write_session_; }

TopicClient::TopicClient(std::shared_ptr<impl::Driver> driver, [[maybe_unused]] impl::TopicSettings settings)
: driver_{std::move(driver)},
topic_client_{driver_->GetNativeDriver()}
{}
: driver_{std::move(driver)}, topic_client_{driver_->GetNativeDriver()} {}

TopicClient::~TopicClient() = default;

Expand All @@ -55,6 +74,10 @@ TopicReadSession TopicClient::CreateReadSession(const NYdb::NTopic::TReadSession
return TopicReadSession{topic_client_.CreateReadSession(settings)};
}

TopicWriteSession TopicClient::CreateWriteSession(const NYdb::NTopic::TWriteSessionSettings& settings) {
return TopicWriteSession{topic_client_.CreateWriteSession(settings)};
}

NYdb::NTopic::TTopicClient& TopicClient::GetNativeTopicClient() { return topic_client_; }

} // namespace ydb
Expand Down
123 changes: 123 additions & 0 deletions ydb/tests/topic_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ constexpr std::string_view kChangefeed = "test_changefeed";
const std::string kTopicPath = fmt::format("{}/{}", kTable, kChangefeed);
constexpr std::string_view kConsumerName = "test_consumer";

constexpr std::string_view kWriteTopic = "write_test_topic";
constexpr std::string_view kWriteProducerId = "test-producer";
constexpr std::string_view kWriteConsumerName = "write_test_consumer";

class YdbTopicFixture : public ydb::ClientFixtureBase {
protected:
YdbTopicFixture() {
Expand Down Expand Up @@ -73,6 +77,69 @@ class YdbTopicFixture : public ydb::ClientFixtureBase {
read_session_settings.ConsumerName(ydb::impl::ToString(consumer_name));
return GetTopicClient().CreateReadSession(read_session_settings);
}

ydb::TopicWriteSession CreateWriteSession(std::string_view topic_path, std::string_view producer_id) {
const auto producer = ydb::impl::ToString(producer_id);
NYdb::NTopic::TWriteSessionSettings write_session_settings;
write_session_settings.Path(ydb::impl::ToString(topic_path)).ProducerId(producer).MessageGroupId(producer);
return GetTopicClient().CreateWriteSession(write_session_settings);
}

void WriteAndAck(ydb::TopicWriteSession& session, std::string_view payload) {
bool written = false;
bool acked = false;

while (!written || !acked) {
auto event = session.GetEvent();
ASSERT_TRUE(event.has_value());

std::visit(
utils::Overloaded{
[&](NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent& e) {
session
.Write(std::move(e.ContinuationToken), NYdb::NTopic::TWriteMessage{std::string{payload}});
written = true;
},
[&](NYdb::NTopic::TWriteSessionEvent::TAcksEvent& e) {
for (const auto& ack : e.Acks) {
EXPECT_EQ(ack.State, NYdb::NTopic::TWriteSessionEvent::TWriteAck::EEventState::EES_WRITTEN);
}
acked = true;
},
[](const NYdb::NTopic::TSessionClosedEvent& e) {
FAIL() << "Session closed unexpectedly: " << e.GetIssues().ToString();
},
[]([[maybe_unused]] auto& e) {},
},
*event
);
}
}
};

class YdbTopicWriteSessionFixture : public YdbTopicFixture {
protected:
YdbTopicWriteSessionFixture() {
NYdb::NTopic::TCreateTopicSettings topic_settings;
topic_settings
.AppendConsumers(NYdb::NTopic::TConsumerSettings(topic_settings, ydb::impl::ToString(kWriteConsumerName)));
const auto status =
GetNativeTopicClient().CreateTopic(ydb::impl::ToString(kWriteTopic), topic_settings).GetValueSync();
EXPECT_TRUE(status.IsSuccess()) << status.GetIssues().ToString();
}

~YdbTopicWriteSessionFixture() override {
const auto status = GetNativeTopicClient().DropTopic(ydb::impl::ToString(kWriteTopic)).GetValueSync();
EXPECT_TRUE(status.IsSuccess()) << status.GetIssues().ToString();
}

ydb::TopicWriteSession CreateWriteSession() {
return YdbTopicFixture::CreateWriteSession(kWriteTopic, kWriteProducerId);
}

ydb::TopicReadSession CreateReadSession() {
return YdbTopicFixture::CreateReadSession(kWriteTopic, kWriteConsumerName);
}
};

template <class T>
Expand Down Expand Up @@ -264,4 +331,60 @@ UTEST_F(YdbTopicFixture, DescribeTopic) {
DropConsumer(kTopicPath, kConsumerName);
}

UTEST_F(YdbTopicWriteSessionFixture, TopicWriteSessionCreateClose) {
auto session = CreateWriteSession();
UASSERT_NO_THROW(session.Close(std::chrono::milliseconds{1000}));
}

UTEST_F(YdbTopicWriteSessionFixture, TopicWriteSessionGetNative) {
auto session = CreateWriteSession();
EXPECT_NE(session.GetNativeTopicWriteSession(), nullptr);
session.Close(std::chrono::milliseconds{1000});
}

UTEST_F(YdbTopicWriteSessionFixture, TopicWriteSessionWriteSingle) {
auto session = CreateWriteSession();

auto task = engine::AsyncNoSpan([&] { UASSERT_NO_THROW(WriteAndAck(session, "hello")); });
task.WaitFor(utest::kMaxTestWaitTime);
ASSERT_TRUE(task.IsFinished());

session.Close(std::chrono::milliseconds{1000});
}

UTEST_F(YdbTopicWriteSessionFixture, TopicWriteSessionWriteMultiple) {
auto session = CreateWriteSession();

auto task = engine::AsyncNoSpan([&] {
for (const std::string_view msg : {"msg-1", "msg-2", "msg-3"}) {
UASSERT_NO_THROW(WriteAndAck(session, msg));
}
});
task.WaitFor(utest::kMaxTestWaitTime);
ASSERT_TRUE(task.IsFinished());

session.Close(std::chrono::milliseconds{1000});
}

UTEST_F(YdbTopicWriteSessionFixture, TopicWriteSessionTryGetEventEmpty) {
auto session = CreateWriteSession();

auto task = engine::AsyncNoSpan([&] {
// Drain TReadyToAcceptEvent so the session is established
// and the event queue is empty.
auto event = session.GetEvent();

const bool is_ready = std::holds_alternative<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(*event);
ASSERT_TRUE(event.has_value());
ASSERT_TRUE(is_ready);

// Queue is now drained — TryGetEvent must return nullopt immediately.
EXPECT_FALSE(session.TryGetEvent().has_value());
});
task.WaitFor(utest::kMaxTestWaitTime);
ASSERT_TRUE(task.IsFinished());

session.Close(std::chrono::milliseconds{1000});
}

USERVER_NAMESPACE_END
Loading