|
1 | 1 | #include <userver/utils/async.hpp> |
2 | 2 |
|
| 3 | +#include <concepts> |
3 | 4 | #include <vector> |
4 | 5 |
|
5 | 6 | #include <boost/range/adaptor/transformed.hpp> |
6 | 7 | #include <boost/range/numeric.hpp> |
7 | 8 |
|
8 | 9 | #include <userver/concurrent/variable.hpp> |
| 10 | +#include <userver/engine/sleep.hpp> |
| 11 | +#include <userver/engine/task/current_task.hpp> |
| 12 | +#include <userver/engine/task/inherited_variable.hpp> |
9 | 13 | #include <userver/engine/task/task_with_result.hpp> |
| 14 | +#include <userver/tracing/span.hpp> |
10 | 15 | #include <userver/utest/utest.hpp> |
11 | 16 |
|
12 | 17 | #include <engine/ev/thread_control.hpp> |
| 18 | +#include <engine/task/task_context.hpp> |
| 19 | +#include <engine/tests/task_processor_utils.hpp> |
13 | 20 |
|
14 | 21 | USERVER_NAMESPACE_BEGIN |
15 | 22 |
|
@@ -122,4 +129,201 @@ Response AsyncRequestProcessor::Foo(Request&& request) { return request * 2; } |
122 | 129 |
|
123 | 130 | } // namespace |
124 | 131 |
|
| 132 | +namespace { |
| 133 | + |
| 134 | +engine::TaskInheritedVariable<int> kInheritedVariable; |
| 135 | + |
| 136 | +} // namespace |
| 137 | + |
| 138 | +UTEST(UtilsAsync, AsyncCapturesExpectedContext) { |
| 139 | + kInheritedVariable.Set(42); |
| 140 | + const auto parent_trace_id = tracing::Span::CurrentSpan().GetTraceId(); |
| 141 | + |
| 142 | + auto task = utils::Async("async", [&, inherited = kInheritedVariable.Get()] { |
| 143 | + EXPECT_TRUE(tracing::Span::CurrentSpanUnchecked()); |
| 144 | + EXPECT_EQ(tracing::Span::CurrentSpan().GetTraceId(), parent_trace_id); |
| 145 | + EXPECT_EQ(inherited, 42); |
| 146 | + EXPECT_FALSE(engine::current_task::impl::IsCritical()); |
| 147 | + EXPECT_FALSE(engine::current_task::IsCancelRequested()); |
| 148 | + return true; |
| 149 | + }); |
| 150 | + static_assert(std::same_as<decltype(task), engine::TaskWithResult<bool>>); |
| 151 | + |
| 152 | + EXPECT_TRUE(task.Get()); |
| 153 | +} |
| 154 | + |
| 155 | +TEST(UtilsAsync, AsyncWithTaskProcessorCapturesExpectedContext) { |
| 156 | + engine::tests::TwoStandaloneTaskProcessors tp; |
| 157 | + tp.RunBlocking([&] { |
| 158 | + kInheritedVariable.Set(42); |
| 159 | + const auto parent_trace_id = tracing::Span::CurrentSpan().GetTraceId(); |
| 160 | + |
| 161 | + auto task = utils::Async(tp.GetSecondary(), "async", [&, inherited = kInheritedVariable.Get()] { |
| 162 | + EXPECT_TRUE(tracing::Span::CurrentSpanUnchecked()); |
| 163 | + EXPECT_EQ(tracing::Span::CurrentSpan().GetTraceId(), parent_trace_id); |
| 164 | + EXPECT_EQ(inherited, 42); |
| 165 | + EXPECT_FALSE(engine::current_task::impl::IsCritical()); |
| 166 | + EXPECT_FALSE(engine::current_task::IsCancelRequested()); |
| 167 | + EXPECT_EQ(&engine::current_task::GetTaskProcessor(), &tp.GetSecondary()); |
| 168 | + return true; |
| 169 | + }); |
| 170 | + static_assert(std::same_as<decltype(task), engine::TaskWithResult<bool>>); |
| 171 | + |
| 172 | + EXPECT_TRUE(task.Get()); |
| 173 | + }); |
| 174 | +} |
| 175 | + |
| 176 | +TEST(UtilsAsync, CriticalAsyncWithTaskProcessorCapturesExpectedContext) { |
| 177 | + engine::tests::TwoStandaloneTaskProcessors tp; |
| 178 | + tp.RunBlocking([&] { |
| 179 | + kInheritedVariable.Set(42); |
| 180 | + const auto parent_trace_id = tracing::Span::CurrentSpan().GetTraceId(); |
| 181 | + |
| 182 | + auto task = utils::CriticalAsync(tp.GetSecondary(), "async", [&, inherited = kInheritedVariable.Get()] { |
| 183 | + EXPECT_TRUE(tracing::Span::CurrentSpanUnchecked()); |
| 184 | + EXPECT_EQ(tracing::Span::CurrentSpan().GetTraceId(), parent_trace_id); |
| 185 | + EXPECT_EQ(inherited, 42); |
| 186 | + EXPECT_TRUE(engine::current_task::impl::IsCritical()); |
| 187 | + EXPECT_FALSE(engine::current_task::IsCancelRequested()); |
| 188 | + EXPECT_EQ(&engine::current_task::GetTaskProcessor(), &tp.GetSecondary()); |
| 189 | + return true; |
| 190 | + }); |
| 191 | + static_assert(std::same_as<decltype(task), engine::TaskWithResult<bool>>); |
| 192 | + |
| 193 | + EXPECT_TRUE(task.Get()); |
| 194 | + }); |
| 195 | +} |
| 196 | + |
| 197 | +TEST(UtilsAsync, SharedAsyncWithTaskProcessorCapturesExpectedContext) { |
| 198 | + engine::tests::TwoStandaloneTaskProcessors tp; |
| 199 | + tp.RunBlocking([&] { |
| 200 | + kInheritedVariable.Set(42); |
| 201 | + const auto parent_trace_id = tracing::Span::CurrentSpan().GetTraceId(); |
| 202 | + |
| 203 | + auto task = utils::SharedAsync(tp.GetSecondary(), "async", [&, inherited = kInheritedVariable.Get()] { |
| 204 | + EXPECT_TRUE(tracing::Span::CurrentSpanUnchecked()); |
| 205 | + EXPECT_EQ(tracing::Span::CurrentSpan().GetTraceId(), parent_trace_id); |
| 206 | + EXPECT_EQ(inherited, 42); |
| 207 | + EXPECT_FALSE(engine::current_task::impl::IsCritical()); |
| 208 | + EXPECT_FALSE(engine::current_task::IsCancelRequested()); |
| 209 | + EXPECT_EQ(&engine::current_task::GetTaskProcessor(), &tp.GetSecondary()); |
| 210 | + return true; |
| 211 | + }); |
| 212 | + static_assert(std::same_as<decltype(task), engine::SharedTaskWithResult<bool>>); |
| 213 | + |
| 214 | + EXPECT_TRUE(task.Get()); |
| 215 | + }); |
| 216 | +} |
| 217 | + |
| 218 | +UTEST(UtilsAsync, CriticalAsyncCapturesExpectedContext) { |
| 219 | + kInheritedVariable.Set(42); |
| 220 | + const auto parent_trace_id = tracing::Span::CurrentSpan().GetTraceId(); |
| 221 | + |
| 222 | + auto task = utils::CriticalAsync("async", [&, inherited = kInheritedVariable.Get()] { |
| 223 | + EXPECT_TRUE(tracing::Span::CurrentSpanUnchecked()); |
| 224 | + EXPECT_EQ(tracing::Span::CurrentSpan().GetTraceId(), parent_trace_id); |
| 225 | + EXPECT_EQ(inherited, 42); |
| 226 | + EXPECT_TRUE(engine::current_task::impl::IsCritical()); |
| 227 | + EXPECT_FALSE(engine::current_task::IsCancelRequested()); |
| 228 | + return true; |
| 229 | + }); |
| 230 | + static_assert(std::same_as<decltype(task), engine::TaskWithResult<bool>>); |
| 231 | + |
| 232 | + EXPECT_TRUE(task.Get()); |
| 233 | +} |
| 234 | + |
| 235 | +UTEST(UtilsAsync, SharedCriticalAsyncCapturesExpectedContext) { |
| 236 | + kInheritedVariable.Set(42); |
| 237 | + const auto parent_trace_id = tracing::Span::CurrentSpan().GetTraceId(); |
| 238 | + |
| 239 | + auto task = utils::SharedCriticalAsync("async", [&, inherited = kInheritedVariable.Get()] { |
| 240 | + EXPECT_TRUE(tracing::Span::CurrentSpanUnchecked()); |
| 241 | + EXPECT_EQ(tracing::Span::CurrentSpan().GetTraceId(), parent_trace_id); |
| 242 | + EXPECT_EQ(inherited, 42); |
| 243 | + EXPECT_TRUE(engine::current_task::impl::IsCritical()); |
| 244 | + EXPECT_FALSE(engine::current_task::IsCancelRequested()); |
| 245 | + return true; |
| 246 | + }); |
| 247 | + static_assert(std::same_as<decltype(task), engine::SharedTaskWithResult<bool>>); |
| 248 | + |
| 249 | + EXPECT_TRUE(task.Get()); |
| 250 | +} |
| 251 | + |
| 252 | +UTEST(UtilsAsync, SharedAsyncCapturesExpectedContext) { |
| 253 | + kInheritedVariable.Set(42); |
| 254 | + const auto parent_trace_id = tracing::Span::CurrentSpan().GetTraceId(); |
| 255 | + |
| 256 | + auto task = utils::SharedAsync("async", [&, inherited = kInheritedVariable.Get()] { |
| 257 | + EXPECT_TRUE(tracing::Span::CurrentSpanUnchecked()); |
| 258 | + EXPECT_EQ(tracing::Span::CurrentSpan().GetTraceId(), parent_trace_id); |
| 259 | + EXPECT_EQ(inherited, 42); |
| 260 | + EXPECT_FALSE(engine::current_task::impl::IsCritical()); |
| 261 | + EXPECT_FALSE(engine::current_task::IsCancelRequested()); |
| 262 | + return true; |
| 263 | + }); |
| 264 | + static_assert(std::same_as<decltype(task), engine::SharedTaskWithResult<bool>>); |
| 265 | + |
| 266 | + EXPECT_TRUE(task.Get()); |
| 267 | +} |
| 268 | + |
| 269 | +UTEST(UtilsAsync, AsyncWithDeadlineCapturesExpectedContext) { |
| 270 | + kInheritedVariable.Set(42); |
| 271 | + const auto parent_trace_id = tracing::Span::CurrentSpan().GetTraceId(); |
| 272 | + |
| 273 | + const auto deadline = engine::Deadline::FromDuration(utest::kMaxTestWaitTime); |
| 274 | + auto task = utils::Async("async", deadline, [&, inherited = kInheritedVariable.Get()] { |
| 275 | + EXPECT_TRUE(tracing::Span::CurrentSpanUnchecked()); |
| 276 | + EXPECT_EQ(tracing::Span::CurrentSpan().GetTraceId(), parent_trace_id); |
| 277 | + EXPECT_EQ(inherited, 42); |
| 278 | + EXPECT_FALSE(engine::current_task::impl::IsCritical()); |
| 279 | + EXPECT_EQ(engine::current_task::impl::GetDeadline(), deadline); |
| 280 | + return true; |
| 281 | + }); |
| 282 | + static_assert(std::same_as<decltype(task), engine::TaskWithResult<bool>>); |
| 283 | + |
| 284 | + EXPECT_TRUE(task.Get()); |
| 285 | +} |
| 286 | + |
| 287 | +TEST(UtilsAsync, AsyncBackgroundCapturesExpectedContext) { |
| 288 | + engine::tests::TwoStandaloneTaskProcessors tp; |
| 289 | + tp.RunBlocking([&] { |
| 290 | + kInheritedVariable.Set(42); |
| 291 | + const auto parent_trace_id = tracing::Span::CurrentSpan().GetTraceId(); |
| 292 | + |
| 293 | + auto task = utils::AsyncBackground("async", tp.GetSecondary(), [&, parent_trace_id] { |
| 294 | + EXPECT_TRUE(tracing::Span::CurrentSpanUnchecked()); |
| 295 | + EXPECT_EQ(tracing::Span::CurrentSpan().GetTraceId(), parent_trace_id); |
| 296 | + EXPECT_FALSE(kInheritedVariable.GetOptional()); |
| 297 | + EXPECT_FALSE(engine::current_task::impl::IsCritical()); |
| 298 | + EXPECT_FALSE(engine::current_task::IsCancelRequested()); |
| 299 | + EXPECT_EQ(&engine::current_task::GetTaskProcessor(), &tp.GetSecondary()); |
| 300 | + return true; |
| 301 | + }); |
| 302 | + static_assert(std::same_as<decltype(task), engine::TaskWithResult<bool>>); |
| 303 | + |
| 304 | + EXPECT_TRUE(task.Get()); |
| 305 | + }); |
| 306 | +} |
| 307 | + |
| 308 | +TEST(UtilsAsync, CriticalAsyncBackgroundCapturesExpectedContext) { |
| 309 | + engine::tests::TwoStandaloneTaskProcessors tp; |
| 310 | + tp.RunBlocking([&] { |
| 311 | + kInheritedVariable.Set(42); |
| 312 | + const auto parent_trace_id = tracing::Span::CurrentSpan().GetTraceId(); |
| 313 | + |
| 314 | + auto task = utils::CriticalAsyncBackground("async", tp.GetSecondary(), [&, parent_trace_id] { |
| 315 | + EXPECT_TRUE(tracing::Span::CurrentSpanUnchecked()); |
| 316 | + EXPECT_EQ(tracing::Span::CurrentSpan().GetTraceId(), parent_trace_id); |
| 317 | + EXPECT_FALSE(kInheritedVariable.GetOptional()); |
| 318 | + EXPECT_TRUE(engine::current_task::impl::IsCritical()); |
| 319 | + EXPECT_FALSE(engine::current_task::IsCancelRequested()); |
| 320 | + EXPECT_EQ(&engine::current_task::GetTaskProcessor(), &tp.GetSecondary()); |
| 321 | + return true; |
| 322 | + }); |
| 323 | + static_assert(std::same_as<decltype(task), engine::TaskWithResult<bool>>); |
| 324 | + |
| 325 | + EXPECT_TRUE(task.Get()); |
| 326 | + }); |
| 327 | +} |
| 328 | + |
125 | 329 | USERVER_NAMESPACE_END |
0 commit comments