Skip to content

Commit 6de613b

Browse files
committed
refactor engine: avoid AfterWait() in FdPoller
* Add `WaitListLightWithEpoch` that ignores the notification if the epoch passed with the notification does not match the epoch set by the `Awaiter` * An epoch separate from `Awaiter`'s `context` is required, because: 1. A single epoch is set per multiple wait operations, so the previous notification may be OK for a while (e.g. for multiple waits of `GetAll`), but it can be explicitly cut off by the awaiter side 2. `context` is not an epoch, it can have arbitrary meaning, e.g. it can always have the same value for each iteration * Add an optional epoch to `ev::Watcher`. The callback can get the epoch value with which the `Watcher` was started * Use them in `FdPoller`. Only wake up the task if the epoch of the `Awaitable` is the same as the epoch with which the `Watcher` was started. This avoids spurious wakeups without the need for synchronous `Stop` commit_hash:235bf1e252ab99e5808576fb85da1d126f1512a0
1 parent 2e45bee commit 6de613b

9 files changed

Lines changed: 291 additions & 39 deletions

File tree

.mapping.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1584,6 +1584,8 @@
15841584
"core/src/engine/impl/wait_list_benchmark.cpp":"taxi/uservices/userver/core/src/engine/impl/wait_list_benchmark.cpp",
15851585
"core/src/engine/impl/wait_list_light.cpp":"taxi/uservices/userver/core/src/engine/impl/wait_list_light.cpp",
15861586
"core/src/engine/impl/wait_list_light.hpp":"taxi/uservices/userver/core/src/engine/impl/wait_list_light.hpp",
1587+
"core/src/engine/impl/wait_list_light_with_epoch.cpp":"taxi/uservices/userver/core/src/engine/impl/wait_list_light_with_epoch.cpp",
1588+
"core/src/engine/impl/wait_list_light_with_epoch.hpp":"taxi/uservices/userver/core/src/engine/impl/wait_list_light_with_epoch.hpp",
15871589
"core/src/engine/io/buffered.cpp":"taxi/uservices/userver/core/src/engine/io/buffered.cpp",
15881590
"core/src/engine/io/buffered_test.cpp":"taxi/uservices/userver/core/src/engine/io/buffered_test.cpp",
15891591
"core/src/engine/io/common.cpp":"taxi/uservices/userver/core/src/engine/io/common.cpp",

core/include/userver/engine/impl/context_accessor.hpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,12 @@ class ContextAccessor {
3030
// @overload
3131
void RemoveAwaiter(TaskContext& task_context) noexcept;
3232

33-
// Wait for some cleanup (e.g. wait for `awaiter` to actually remove itself).
34-
// You may sleep in `AfterWait`.
35-
virtual void AfterWait() noexcept = 0;
33+
/// Wait for some cleanup (e.g. wait for `awaiter` to actually remove itself).
34+
/// You may sleep in `AfterWait`.
35+
///
36+
/// @deprecated `AfterWait` will be dropped soon. Please write the synchronization primitives in a way
37+
/// that does not require mixing synchronous waiting and async notifications.
38+
virtual void AfterWait() noexcept {}
3639

3740
// Precondition: IsReady
3841
// This method is required for WaitAllChecked to properly function.

core/include/userver/engine/io/fd_poller.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class FdPoller final {
100100
void SwitchStateToReadyToUse();
101101

102102
class Impl;
103-
utils::FastPimpl<Impl, 128, 16> pimpl_;
103+
utils::FastPimpl<Impl, 144, 16> pimpl_;
104104
};
105105

106106
} // namespace engine::io

core/src/engine/ev/watcher.hpp

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ using LibEvDuration = std::chrono::duration<double>;
2020
template <typename EvType>
2121
using RawCallback = void (*)(struct ev_loop*, EvType*, int) noexcept;
2222

23+
using WatcherEpoch = std::uint32_t;
24+
2325
/// An ev watcher wrapper. Usable from coroutines and from the bound ev thread.
2426
template <typename EvType>
2527
class Watcher final : public MultiShotAsyncPayload<Watcher<EvType>> {
@@ -71,12 +73,15 @@ class Watcher final : public MultiShotAsyncPayload<Watcher<EvType>> {
7173
// }
7274
[[nodiscard]] auto StopWithinEvCallback() noexcept;
7375

74-
// Asynchronously start ev_xxx.
75-
void StartAsync() noexcept;
76+
// Asynchronously start ev_xxx. epoch will be made available from GetEpochWithinEvCallback.
77+
void StartAsync(WatcherEpoch epoch = 0) noexcept;
7678

7779
// Asynchronously stop ev_xxx. Beware of dangling references!
7880
void StopAsync() noexcept;
7981

82+
// Returns the epoch stored by the StartAsync call that started the watcher with the current callback.
83+
WatcherEpoch GetEpochWithinEvCallback() const noexcept;
84+
8085
template <typename T = EvType>
8186
std::enable_if_t<std::is_same_v<T, ev_timer>> Again();
8287

@@ -98,9 +103,14 @@ class Watcher final : public MultiShotAsyncPayload<Watcher<EvType>> {
98103

99104
enum class AsyncOpType : std::uint8_t { kNone, kStart, kStop };
100105

106+
struct alignas(8) AsyncOp {
107+
AsyncOpType type{AsyncOpType::kNone};
108+
WatcherEpoch epoch{0}; // Used only for kStart.
109+
};
110+
101111
auto EvLoopOpsCountingGuard() noexcept;
102112
void DoPerformAndRelease() noexcept;
103-
void PushAsyncOp(AsyncOpType payload) noexcept;
113+
void PushAsyncOp(AsyncOp payload) noexcept;
104114
bool IsActive() const noexcept;
105115

106116
void StartImpl() noexcept;
@@ -112,8 +122,9 @@ class Watcher final : public MultiShotAsyncPayload<Watcher<EvType>> {
112122
EvType w_;
113123
ThreadControl thread_control_;
114124
std::atomic<bool> is_running_{false};
115-
std::atomic<AsyncOpType> pending_async_op_{{}};
125+
std::atomic<AsyncOp> pending_async_op_{{}};
116126
std::atomic<std::uint32_t> pending_op_count_{0};
127+
WatcherEpoch delivered_epoch_{0}; // Last epoch delivered to ev thread by kStart.
117128
};
118129

119130
template <typename EvType>
@@ -159,16 +170,22 @@ auto Watcher<EvType>::StopWithinEvCallback() noexcept {
159170
}
160171

161172
template <typename EvType>
162-
void Watcher<EvType>::StartAsync() noexcept {
163-
PushAsyncOp(AsyncOpType::kStart);
173+
void Watcher<EvType>::StartAsync(WatcherEpoch epoch) noexcept {
174+
PushAsyncOp({AsyncOpType::kStart, epoch});
164175
}
165176

166177
template <typename EvType>
167178
void Watcher<EvType>::StopAsync() noexcept {
168179
if (!IsActive()) {
169180
return;
170181
}
171-
PushAsyncOp(AsyncOpType::kStop);
182+
PushAsyncOp({AsyncOpType::kStop, /*epoch=*/0});
183+
}
184+
185+
template <typename EvType>
186+
WatcherEpoch Watcher<EvType>::GetEpochWithinEvCallback() const noexcept {
187+
UASSERT(thread_control_.IsInEvThread());
188+
return delivered_epoch_;
172189
}
173190

174191
template <typename EvType>
@@ -218,10 +235,11 @@ void Watcher<EvType>::DoPerformAndRelease() noexcept {
218235

219236
const auto op = pending_async_op_.exchange({}, std::memory_order_relaxed);
220237

221-
switch (op) {
238+
switch (op.type) {
222239
case AsyncOpType::kNone:
223240
break;
224241
case AsyncOpType::kStart:
242+
delivered_epoch_ = op.epoch;
225243
StartImpl();
226244
break;
227245
case AsyncOpType::kStop:
@@ -233,7 +251,7 @@ void Watcher<EvType>::DoPerformAndRelease() noexcept {
233251
}
234252

235253
template <typename EvType>
236-
void Watcher<EvType>::PushAsyncOp(AsyncOpType payload) noexcept {
254+
void Watcher<EvType>::PushAsyncOp(AsyncOp payload) noexcept {
237255
pending_async_op_.store(payload, std::memory_order_relaxed);
238256
if (this->PrepareEnqueue()) {
239257
++pending_op_count_;
@@ -249,7 +267,8 @@ bool Watcher<EvType>::IsActive() const noexcept {
249267
template <typename EvType>
250268
void Watcher<EvType>::StartImpl() noexcept {
251269
if (is_running_) {
252-
return;
270+
// This may happen due to stop + start operations being merged.
271+
StopImpl();
253272
}
254273
is_running_ = true;
255274
thread_control_.Start(w_);

core/src/engine/impl/wait_list_light.cpp

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,6 @@
1414

1515
USERVER_NAMESPACE_BEGIN
1616

17-
namespace engine::impl {
18-
namespace {
19-
20-
struct alignas(sizeof(std::uintptr_t) * 2) AwaiterWithContext final {
21-
Awaiter* awaiter{nullptr};
22-
std::uintptr_t context{0};
23-
};
24-
25-
} // namespace
26-
} // namespace engine::impl
27-
2817
USERVER_NAMESPACE_END
2918

3019
template <>

core/src/engine/impl/wait_list_light.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ namespace engine::impl {
1212

1313
class Awaiter;
1414

15+
struct alignas(sizeof(std::uintptr_t) * 2) AwaiterWithContext final {
16+
Awaiter* awaiter{nullptr};
17+
std::uintptr_t context{0};
18+
};
19+
1520
/// Wait list for a single entry. All functions are thread-safe.
1621
class WaitListLight final {
1722
public:
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
#include "wait_list_light_with_epoch.hpp"
2+
3+
#include <memory>
4+
5+
#include <fmt/format.h>
6+
7+
#include <engine/impl/wait_list_light.hpp>
8+
#include <userver/engine/impl/awaiter.hpp>
9+
#include <userver/logging/log.hpp>
10+
#include <userver/utils/assert.hpp>
11+
12+
USERVER_NAMESPACE_BEGIN
13+
14+
namespace engine::impl {
15+
16+
namespace {
17+
18+
AwaiterWithContext* const kSignaled = reinterpret_cast<AwaiterWithContext*>(1);
19+
20+
} // namespace
21+
22+
WaitListLightWithEpoch::WaitListLightWithEpoch(std::uint32_t epoch) noexcept : state_({nullptr, epoch}) {}
23+
24+
WaitListLightWithEpoch::~WaitListLightWithEpoch() {
25+
const auto state = state_.load<std::memory_order_relaxed>();
26+
UASSERT_MSG(
27+
state.awaiter_with_context == nullptr || state.awaiter_with_context == kSignaled,
28+
"Someone is awaiting on WaitListLightWithEpoch while it's being destroyed"
29+
);
30+
}
31+
32+
bool WaitListLightWithEpoch::SetEpochThenGetAndResetSignal(std::uint32_t epoch) noexcept {
33+
const AwaiterWithContextPtrAndEpoch new_value{nullptr, epoch};
34+
const auto old_state = state_.exchange<std::memory_order_seq_cst>(new_value);
35+
UASSERT_MSG(
36+
old_state.awaiter_with_context == nullptr || old_state.awaiter_with_context == kSignaled,
37+
"Someone is awaiting on WaitListLightWithEpoch during SetEpoch"
38+
);
39+
return old_state.awaiter_with_context == kSignaled;
40+
}
41+
42+
std::uint32_t WaitListLightWithEpoch::GetEpoch() const noexcept { return state_.LoadWithTearing().epoch; }
43+
44+
bool WaitListLightWithEpoch::GetSignalOrAppend(boost::intrusive_ptr<Awaiter> awaiter, std::uintptr_t context) {
45+
UASSERT(awaiter);
46+
47+
const auto epoch = state_.LoadWithTearing().epoch;
48+
49+
auto new_awaiter_with_context = std::make_unique<AwaiterWithContext>(AwaiterWithContext{awaiter.get(), context});
50+
51+
const AwaiterWithContextPtrAndEpoch new_state{new_awaiter_with_context.get(), epoch};
52+
53+
// Expect either {nullptr, epoch} or {kSignaled, epoch}.
54+
AwaiterWithContextPtrAndEpoch expected{nullptr, epoch};
55+
56+
// seq_cst is important for the "Append-Check-Wakeup" sequence.
57+
bool success =
58+
state_.compare_exchange_strong<std::memory_order_seq_cst, std::memory_order_relaxed>(expected, new_state);
59+
60+
if (!success) {
61+
// CAS failed - new_awaiter_with_context will clean up.
62+
63+
UASSERT_MSG(
64+
expected.awaiter_with_context == kSignaled && expected.epoch == epoch,
65+
"An unexpected awaiter is occupying the WaitListLightWithEpoch or epoch does not match"
66+
);
67+
68+
// Already signaled with matching epoch.
69+
return true;
70+
}
71+
72+
// CAS succeeded - release ownership to the atomic.
73+
[[maybe_unused]] auto* const released_ptr = new_awaiter_with_context.release();
74+
75+
// Keep a reference logically stored in the WaitListLightWithEpoch.
76+
awaiter.detach();
77+
return false;
78+
}
79+
80+
void WaitListLightWithEpoch::Remove(Awaiter& awaiter, std::uintptr_t context) noexcept {
81+
const auto current = state_.LoadWithTearing();
82+
AwaiterWithContextPtrAndEpoch expected = current;
83+
84+
UASSERT(expected.awaiter_with_context != nullptr);
85+
if (expected.awaiter_with_context == kSignaled) {
86+
// Already removed/signaled.
87+
return;
88+
}
89+
90+
UASSERT_MSG(
91+
expected.awaiter_with_context->awaiter == &awaiter && expected.awaiter_with_context->context == context,
92+
"An unexpected awaiter is occupying the WaitListLightWithEpoch"
93+
);
94+
95+
// Preserve the epoch when removing the awaiter.
96+
const AwaiterWithContextPtrAndEpoch new_value{nullptr, current.epoch};
97+
98+
const bool success =
99+
state_.compare_exchange_strong<std::memory_order_release, std::memory_order_relaxed>(expected, new_value);
100+
101+
if (!success) {
102+
UASSERT_MSG(
103+
expected.awaiter_with_context == nullptr || expected.awaiter_with_context == kSignaled,
104+
"An unexpected awaiter is occupying the WaitListLightWithEpoch or epoch changed mid-await"
105+
);
106+
return;
107+
}
108+
109+
if (current.awaiter_with_context != nullptr && current.awaiter_with_context != kSignaled) {
110+
std::default_delete<AwaiterWithContext>{}(current.awaiter_with_context);
111+
}
112+
intrusive_ptr_release(&awaiter);
113+
}
114+
115+
bool WaitListLightWithEpoch::GetAndResetSignal() noexcept {
116+
const auto current = state_.load<std::memory_order_acquire>();
117+
118+
// Reset signal from {kSignaled, epoch} to {nullptr, epoch}.
119+
AwaiterWithContextPtrAndEpoch expected{kSignaled, current.epoch};
120+
const AwaiterWithContextPtrAndEpoch new_value{nullptr, current.epoch};
121+
122+
const bool success =
123+
state_.compare_exchange_strong<std::memory_order_relaxed, std::memory_order_relaxed>(expected, new_value);
124+
125+
if (!success) {
126+
// Only two states are allowed: {nullptr, epoch} or {kSignaled, epoch}.
127+
UASSERT_MSG(
128+
expected.epoch == current.epoch && expected.awaiter_with_context == nullptr,
129+
"An unexpected awaiter is occupying the WaitListLightWithEpoch or epoch does not match"
130+
);
131+
}
132+
133+
return success;
134+
}
135+
136+
void WaitListLightWithEpoch::SetSignalAndNotifyOneIfEpochMatches(std::uint32_t epoch) noexcept {
137+
auto current = state_.LoadWithTearing();
138+
139+
for (;;) {
140+
if (current.epoch != epoch) {
141+
return;
142+
}
143+
144+
const AwaiterWithContextPtrAndEpoch new_value{kSignaled, epoch};
145+
146+
const bool success =
147+
state_.compare_exchange_strong<std::memory_order_seq_cst, std::memory_order_relaxed>(current, new_value);
148+
149+
if (success) {
150+
if (current.awaiter_with_context != nullptr && current.awaiter_with_context != kSignaled) {
151+
const boost::intrusive_ptr<Awaiter> awaiter{current.awaiter_with_context->awaiter, /*add_ref=*/false};
152+
awaiter->Notify(current.awaiter_with_context->context);
153+
std::default_delete<AwaiterWithContext>{}(current.awaiter_with_context);
154+
}
155+
return;
156+
}
157+
// CAS failed, current is updated with actual value, retry.
158+
}
159+
}
160+
161+
bool WaitListLightWithEpoch::IsSignaled() const noexcept {
162+
const auto torn_state = state_.LoadWithTearing();
163+
std::atomic_thread_fence(std::memory_order_acquire);
164+
return torn_state.awaiter_with_context == kSignaled;
165+
}
166+
167+
} // namespace engine::impl
168+
169+
USERVER_NAMESPACE_END

0 commit comments

Comments
 (0)