/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <thread>
#include <vector>
-#include <gtest/gtest.h>
-
-#include <folly/Benchmark.h>
#include <folly/Memory.h>
+#include <folly/Random.h>
#include <folly/futures/Future.h>
+#include <folly/Conv.h>
#include <folly/fibers/AddTasks.h>
+#include <folly/fibers/AtomicBatchDispatcher.h>
+#include <folly/fibers/BatchDispatcher.h>
#include <folly/fibers/EventBaseLoopController.h>
#include <folly/fibers/FiberManager.h>
#include <folly/fibers/FiberManagerMap.h>
#include <folly/fibers/GenericBaton.h>
+#include <folly/fibers/Semaphore.h>
#include <folly/fibers/SimpleLoopController.h>
+#include <folly/fibers/TimedMutex.h>
#include <folly/fibers/WhenN.h>
+#include <folly/io/async/ScopedEventBaseThread.h>
+#include <folly/portability/GTest.h>
using namespace folly::fibers;
if (!taskAdded) {
manager.addTask([&]() {
std::vector<std::function<std::unique_ptr<int>()>> funcs;
- for (size_t i = 0; i < 3; ++i) {
+ for (int i = 0; i < 3; ++i) {
funcs.push_back([i, &pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
manager.addTask([&]() {
std::vector<std::function<int()>> funcs;
for (size_t i = 0; i < 3; ++i) {
- funcs.push_back([i, &pendingFibers]() {
+ funcs.push_back([i, &pendingFibers]() -> size_t {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
throw std::runtime_error("Runtime");
- return i * 2 + 1;
});
}
{
folly::RequestContextScopeGuard rctx;
auto rcontext1 = folly::RequestContext::get();
- fm.addTask([&]() {
+ fm.addTask([&, rcontext1]() {
EXPECT_EQ(rcontext1, folly::RequestContext::get());
baton1.wait(
[&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
{
folly::RequestContextScopeGuard rctx;
auto rcontext2 = folly::RequestContext::get();
- fm.addTaskRemote([&]() {
+ fm.addTaskRemote([&, rcontext2]() {
EXPECT_EQ(rcontext2, folly::RequestContext::get());
baton2.wait();
EXPECT_EQ(rcontext2, folly::RequestContext::get());
folly::RequestContextScopeGuard rctx;
auto rcontext3 = folly::RequestContext::get();
fm.addTaskFinally(
- [&]() {
+ [&, rcontext3]() {
EXPECT_EQ(rcontext3, folly::RequestContext::get());
baton3.wait();
EXPECT_EQ(rcontext3, folly::RequestContext::get());
return folly::Unit();
},
- [&](Try<folly::Unit>&& /* t */) {
+ [&, rcontext3](Try<folly::Unit>&& /* t */) {
EXPECT_EQ(rcontext3, folly::RequestContext::get());
checkRun3 = true;
});
outerEvb.loopForever();
}
-static size_t sNumAwaits;
+TEST(FiberManager, semaphore) {
+ constexpr size_t kTasks = 10;
+ constexpr size_t kIterations = 10000;
+ constexpr size_t kNumTokens = 10;
-void runBenchmark(size_t numAwaits, size_t toSend) {
- sNumAwaits = numAwaits;
+ Semaphore sem(kNumTokens);
+ int counterA = 0;
+ int counterB = 0;
- FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
- auto& loopController =
- dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
+ auto task = [&sem, kTasks, kIterations, kNumTokens](
+ int& counter, folly::fibers::Baton& baton) {
+ FiberManager manager(folly::make_unique<EventBaseLoopController>());
+ folly::EventBase evb;
+ dynamic_cast<EventBaseLoopController&>(manager.loopController())
+ .attachEventBase(evb);
+
+ {
+ std::shared_ptr<folly::EventBase> completionCounter(
+ &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
+
+ for (size_t i = 0; i < kTasks; ++i) {
+ manager.addTask([&, completionCounter]() {
+ for (size_t j = 0; j < kIterations; ++j) {
+ sem.wait();
+ ++counter;
+ sem.signal();
+ --counter;
+
+ EXPECT_LT(counter, kNumTokens);
+ EXPECT_GE(counter, 0);
+ }
+ });
+ }
- std::queue<Promise<int>> pendingRequests;
- static const size_t maxOutstanding = 5;
+ baton.wait();
+ }
+ evb.loopForever();
+ };
- auto loop = [&fiberManager, &loopController, &pendingRequests, &toSend]() {
- if (pendingRequests.size() == maxOutstanding || toSend == 0) {
- if (pendingRequests.empty()) {
- return;
- }
- pendingRequests.front().setValue(0);
- pendingRequests.pop();
- } else {
- fiberManager.addTask([&pendingRequests]() {
- for (size_t i = 0; i < sNumAwaits; ++i) {
- auto result = await([&pendingRequests](Promise<int> promise) {
- pendingRequests.push(std::move(promise));
- });
- DCHECK_EQ(result, 0);
+ folly::fibers::Baton batonA;
+ folly::fibers::Baton batonB;
+ std::thread threadA([&] { task(counterA, batonA); });
+ std::thread threadB([&] { task(counterB, batonB); });
+
+ batonA.post();
+ batonB.post();
+ threadA.join();
+ threadB.join();
+
+ EXPECT_LT(counterA, kNumTokens);
+ EXPECT_LT(counterB, kNumTokens);
+ EXPECT_GE(counterA, 0);
+ EXPECT_GE(counterB, 0);
+}
+
+template <typename ExecutorT>
+void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
+ thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
+ executor, [=](std::vector<int>&& batch) {
+ EXPECT_EQ(batchSize, batch.size());
+ std::vector<std::string> results;
+ for (auto& it : batch) {
+ results.push_back(folly::to<std::string>(it));
}
+ return results;
});
- if (--toSend == 0) {
- loopController.stop();
+ auto indexCopy = index;
+ auto result = batchDispatcher.add(std::move(indexCopy));
+ EXPECT_EQ(folly::to<std::string>(index), result.get());
+}
+
+TEST(FiberManager, batchDispatchTest) {
+ folly::EventBase evb;
+ auto& executor = getFiberManager(evb);
+
+ // Launch multiple fibers with a single id.
+ executor.add([&]() {
+ int batchSize = 10;
+ for (int i = 0; i < batchSize; i++) {
+ executor.add(
+ [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
+ }
+ });
+ evb.loop();
+
+ // Reuse the same BatchDispatcher to batch once again.
+ executor.add([&]() {
+ int batchSize = 10;
+ for (int i = 0; i < batchSize; i++) {
+ executor.add(
+ [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
+ }
+ });
+ evb.loop();
+}
+
+template <typename ExecutorT>
+folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
+ ExecutorT& executor,
+ int totalNumberOfElements,
+ std::vector<int> input) {
+ thread_local BatchDispatcher<
+ std::vector<int>,
+ std::vector<std::string>,
+ ExecutorT>
+ batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
+ std::vector<std::vector<std::string>> results;
+ int numberOfElements = 0;
+ for (auto& unit : batch) {
+ numberOfElements += unit.size();
+ std::vector<std::string> result;
+ for (auto& element : unit) {
+ result.push_back(folly::to<std::string>(element));
}
+ results.push_back(std::move(result));
}
- };
+ EXPECT_EQ(totalNumberOfElements, numberOfElements);
+ return results;
+ });
- loopController.loop(std::move(loop));
+ return batchDispatcher.add(std::move(input));
}
-BENCHMARK(FiberManagerBasicOneAwait, iters) {
- runBenchmark(1, iters);
+/**
+ * Batch values in groups of 5, and then call inner dispatch.
+ */
+template <typename ExecutorT>
+void doubleBatchOuterDispatch(
+ ExecutorT& executor,
+ int totalNumberOfElements,
+ int index) {
+ thread_local BatchDispatcher<int, std::string, ExecutorT>
+ batchDispatcher(executor, [=, &executor](std::vector<int>&& batch) {
+ EXPECT_EQ(totalNumberOfElements, batch.size());
+ std::vector<std::string> results;
+ std::vector<folly::Future<std::vector<std::string>>>
+ innerDispatchResultFutures;
+
+ std::vector<int> group;
+ for (auto unit : batch) {
+ group.push_back(unit);
+ if (group.size() == 5) {
+ auto localGroup = group;
+ group.clear();
+
+ innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
+ executor, totalNumberOfElements, localGroup));
+ }
+ }
+
+ folly::collectAll(
+ innerDispatchResultFutures.begin(), innerDispatchResultFutures.end())
+ .then([&](
+ std::vector<Try<std::vector<std::string>>> innerDispatchResults) {
+ for (auto& unit : innerDispatchResults) {
+ for (auto& element : unit.value()) {
+ results.push_back(element);
+ }
+ }
+ })
+ .get();
+ return results;
+ });
+
+ auto indexCopy = index;
+ auto result = batchDispatcher.add(std::move(indexCopy));
+ EXPECT_EQ(folly::to<std::string>(index), result.get());
+}
+
+TEST(FiberManager, doubleBatchDispatchTest) {
+ folly::EventBase evb;
+ auto& executor = getFiberManager(evb);
+
+ // Launch multiple fibers with a single id.
+ executor.add([&]() {
+ int totalNumberOfElements = 20;
+ for (int i = 0; i < totalNumberOfElements; i++) {
+ executor.add([=, &executor]() {
+ doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
+ });
+ }
+ });
+ evb.loop();
}
-BENCHMARK(FiberManagerBasicFiveAwaits, iters) {
- runBenchmark(5, iters);
+template <typename ExecutorT>
+void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
+ thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
+ executor, [=, &executor](std::vector<int> &&) -> std::vector<int> {
+ throw std::runtime_error("Surprise!!");
+ });
+
+ EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
}
-BENCHMARK(FiberManagerCreateDestroy, iters) {
- for (size_t i = 0; i < iters; ++i) {
- folly::EventBase evb;
- auto& fm = folly::fibers::getFiberManager(evb);
- fm.addTask([]() {});
- evb.loop();
+TEST(FiberManager, batchDispatchExceptionHandlingTest) {
+ folly::EventBase evb;
+ auto& executor = getFiberManager(evb);
+
+ // Launch multiple fibers with a single id.
+ executor.add([&]() {
+ int totalNumberOfElements = 5;
+ for (int i = 0; i < totalNumberOfElements; i++) {
+ executor.add(
+ [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
+ }
+ });
+ evb.loop();
+}
+
+namespace AtomicBatchDispatcherTesting {
+
+using ValueT = size_t;
+using ResultT = std::string;
+using DispatchFunctionT =
+ folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
+
+#define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
+#if ENABLE_TRACE_IN_TEST
+#define OUTPUT_TRACE std::cerr
+#else // ENABLE_TRACE_IN_TEST
+struct DevNullPiper {
+ template <typename T>
+ DevNullPiper& operator<<(const T&) {
+ return *this;
}
+
+ DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
+ return *this;
+ }
+} devNullPiper;
+#define OUTPUT_TRACE devNullPiper
+#endif // ENABLE_TRACE_IN_TEST
+
+struct Job {
+ AtomicBatchDispatcher<ValueT, ResultT>::Token token;
+ ValueT input;
+
+ void preprocess(FiberManager& executor, bool die) {
+ // Yield for a random duration [0, 10] ms to simulate I/O in preprocessing
+ clock_t msecToDoIO = folly::Random::rand32() % 10;
+ double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
+ double endAfter = start + msecToDoIO;
+ while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
+ executor.yield();
+ }
+ if (die) {
+ throw std::logic_error("Simulating preprocessing failure");
+ }
+ }
+
+ Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
+ : token(std::move(t)), input(i) {}
+
+ Job(Job&&) = default;
+ Job& operator=(Job&&) = default;
+};
+
+ResultT processSingleInput(ValueT&& input) {
+ return folly::to<ResultT>(std::move(input));
}
-BENCHMARK(FiberManagerAllocateDeallocatePattern, iters) {
- static const size_t kNumAllocations = 10000;
+std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
+ size_t expectedCount = inputs.size();
+ std::vector<ResultT> results;
+ results.reserve(expectedCount);
+ for (size_t i = 0; i < expectedCount; ++i) {
+ results.emplace_back(processSingleInput(std::move(inputs[i])));
+ }
+ return results;
+}
- FiberManager::Options opts;
- opts.maxFibersPoolSize = 0;
+void createJobs(
+ AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
+ std::vector<Job>& jobs,
+ size_t count) {
+ jobs.clear();
+ for (size_t i = 0; i < count; ++i) {
+ jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
+ }
+}
- FiberManager fiberManager(folly::make_unique<SimpleLoopController>(), opts);
+enum class DispatchProblem {
+ None,
+ PreprocessThrows,
+ DuplicateDispatch,
+};
+
+void dispatchJobs(
+ FiberManager& executor,
+ std::vector<Job>& jobs,
+ std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+ DispatchProblem dispatchProblem = DispatchProblem::None,
+ size_t problemIndex = size_t(-1)) {
+ EXPECT_TRUE(
+ dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
+ results.clear();
+ results.resize(jobs.size());
+ for (size_t i = 0; i < jobs.size(); ++i) {
+ executor.add(
+ [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
+ try {
+ Job job(std::move(jobs[i]));
- for (size_t iter = 0; iter < iters; ++iter) {
- EXPECT_EQ(0, fiberManager.fibersPoolSize());
+ if (dispatchProblem == DispatchProblem::PreprocessThrows) {
+ if (i == problemIndex) {
+ EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
+ return;
+ }
+ }
- size_t fibersRun = 0;
+ job.preprocess(executor, false);
+ OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
+ results[i] = job.token.dispatch(job.input);
+ OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
- for (size_t i = 0; i < kNumAllocations; ++i) {
- fiberManager.addTask([&fibersRun] { ++fibersRun; });
- fiberManager.loopUntilNoReady();
+ if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
+ if (i == problemIndex) {
+ EXPECT_THROW(job.token.dispatch(job.input), ABDUsageException);
+ }
+ }
+ } catch (...) {
+ OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
+ }
+ });
+ }
+}
+
+void validateResult(
+ std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+ size_t i) {
+ try {
+ OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
+ << std::endl;
+ } catch (std::exception& e) {
+ OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
+ throw;
+ }
+}
+
+template <typename TException>
+void validateResults(
+ std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+ size_t expectedNumResults) {
+ size_t numResultsFilled = 0;
+ for (size_t i = 0; i < results.size(); ++i) {
+ if (!results[i]) {
+ continue;
}
+ ++numResultsFilled;
+ EXPECT_THROW(validateResult(results, i), TException);
+ }
+ EXPECT_EQ(numResultsFilled, expectedNumResults);
+}
- EXPECT_EQ(10000, fibersRun);
- EXPECT_EQ(0, fiberManager.fibersPoolSize());
+void validateResults(
+ std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+ size_t expectedNumResults) {
+ size_t numResultsFilled = 0;
+ for (size_t i = 0; i < results.size(); ++i) {
+ if (!results[i]) {
+ continue;
+ }
+ ++numResultsFilled;
+ EXPECT_NO_THROW(validateResult(results, i));
+ ValueT expectedInput = i;
+ EXPECT_EQ(
+ results[i]->value(), processSingleInput(std::move(expectedInput)));
}
+ EXPECT_EQ(numResultsFilled, expectedNumResults);
}
-BENCHMARK(FiberManagerAllocateLargeChunk, iters) {
- static const size_t kNumAllocations = 10000;
+} // AtomicBatchDispatcherTesting
+
+#define SET_UP_TEST_FUNC \
+ using namespace AtomicBatchDispatcherTesting; \
+ folly::EventBase evb; \
+ auto& executor = getFiberManager(evb); \
+ const size_t COUNT = 11; \
+ std::vector<Job> jobs; \
+ jobs.reserve(COUNT); \
+ std::vector<folly::Optional<folly::Future<ResultT>>> results; \
+ results.reserve(COUNT); \
+ DispatchFunctionT dispatchFunc
+
+TEST(FiberManager, ABD_Test) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing AtomicBatchDispatcher with explicit call to commit()
+ //
+ dispatchFunc = userDispatchFunc;
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ dispatchJobs(executor, jobs, results);
+ atomicBatchDispatcher.commit();
+ evb.loop();
+ validateResults(results, COUNT);
+}
- FiberManager::Options opts;
- opts.maxFibersPoolSize = 0;
+TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing AtomicBatchDispatcher destroyed before calling commit.
+ // Handles error cases for:
+ // - User might have forgotten to add the call to commit() in the code
+ // - An unexpected exception got thrown in user code before commit() is called
+ //
+ try {
+ dispatchFunc = userDispatchFunc;
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ dispatchJobs(executor, jobs, results);
+ throw std::runtime_error(
+ "Unexpected exception in user code before commit called");
+ // atomicBatchDispatcher.commit();
+ } catch (...) {
+ /* User code handles the exception and does not exit process */
+ }
+ evb.loop();
+ validateResults<ABDCommitNotCalledException>(results, COUNT);
+}
- FiberManager fiberManager(folly::make_unique<SimpleLoopController>(), opts);
+TEST(FiberManager, ABD_PreprocessingFailureTest) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing preprocessing failure on a job throws
+ //
+ dispatchFunc = userDispatchFunc;
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
+ atomicBatchDispatcher.commit();
+ evb.loop();
+ validateResults<ABDTokenNotDispatchedException>(results, COUNT - 1);
+}
- for (size_t iter = 0; iter < iters; ++iter) {
- EXPECT_EQ(0, fiberManager.fibersPoolSize());
+TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing that calling dispatch more than once on the same token throws
+ //
+ dispatchFunc = userDispatchFunc;
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
+ atomicBatchDispatcher.commit();
+ evb.loop();
+}
- size_t fibersRun = 0;
+TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing that exception set on attempt to call getToken after commit called
+ //
+ dispatchFunc = userDispatchFunc;
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ atomicBatchDispatcher.commit();
+ EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
+ dispatchJobs(executor, jobs, results);
+ EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
+ evb.loop();
+ validateResults(results, COUNT);
+ EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
+}
- for (size_t i = 0; i < kNumAllocations; ++i) {
- fiberManager.addTask([&fibersRun] { ++fibersRun; });
- }
+TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing that exception is set if user provided batch dispatch throws
+ //
+ dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
+ (void)userDispatchFunc(std::move(inputs));
+ throw std::runtime_error("Unexpected exception in user dispatch function");
+ };
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ dispatchJobs(executor, jobs, results);
+ atomicBatchDispatcher.commit();
+ evb.loop();
+ validateResults<std::runtime_error>(results, COUNT);
+}
+
+TEST(FiberManager, VirtualEventBase) {
+ bool done1{false};
+ bool done2{false};
+ {
+ folly::ScopedEventBaseThread thread;
+
+ auto evb1 =
+ folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
+ auto& evb2 = thread.getEventBase()->getVirtualEventBase();
+
+ getFiberManager(*evb1).addTaskRemote([&] {
+ Baton baton;
+ baton.timed_wait(std::chrono::milliseconds{100});
+
+ done1 = true;
+ });
+
+ getFiberManager(evb2).addTaskRemote([&] {
+ Baton baton;
+ baton.timed_wait(std::chrono::milliseconds{200});
+
+ done2 = true;
+ });
- fiberManager.loopUntilNoReady();
+ EXPECT_FALSE(done1);
+ EXPECT_FALSE(done2);
- EXPECT_EQ(10000, fibersRun);
- EXPECT_EQ(0, fiberManager.fibersPoolSize());
+ evb1.reset();
+ EXPECT_TRUE(done1);
+ EXPECT_FALSE(done2);
}
+ EXPECT_TRUE(done2);
+}
+
+TEST(TimedMutex, ThreadFiberDeadlockOrder) {
+ folly::EventBase evb;
+ auto& fm = getFiberManager(evb);
+ TimedMutex mutex;
+
+ mutex.lock();
+ std::thread unlockThread([&] {
+ /* sleep override */ std::this_thread::sleep_for(
+ std::chrono::milliseconds{100});
+ mutex.unlock();
+ });
+
+ fm.addTask([&] { std::lock_guard<TimedMutex> lg(mutex); });
+ fm.addTask([&] {
+ runInMainContext([&] {
+ auto locked = mutex.timed_lock(std::chrono::seconds{1});
+ EXPECT_TRUE(locked);
+ if (locked) {
+ mutex.unlock();
+ }
+ });
+ });
+
+ evb.loopOnce();
+ EXPECT_EQ(0, fm.hasTasks());
+
+ unlockThread.join();
+}
+
+TEST(TimedMutex, ThreadFiberDeadlockRace) {
+ folly::EventBase evb;
+ auto& fm = getFiberManager(evb);
+ TimedMutex mutex;
+
+ mutex.lock();
+
+ fm.addTask([&] {
+ auto locked = mutex.timed_lock(std::chrono::seconds{1});
+ EXPECT_TRUE(locked);
+ if (locked) {
+ mutex.unlock();
+ }
+ });
+ fm.addTask([&] {
+ mutex.unlock();
+ runInMainContext([&] {
+ auto locked = mutex.timed_lock(std::chrono::seconds{1});
+ EXPECT_TRUE(locked);
+ if (locked) {
+ mutex.unlock();
+ }
+ });
+ });
+
+ evb.loopOnce();
+ EXPECT_EQ(0, fm.hasTasks());
+}
+
+/**
+ * Test that we can properly track fiber stack usage.
+ *
+ * This functionality can only be enabled when ASAN is disabled, so avoid
+ * running this test with ASAN.
+ */
+#ifndef FOLLY_SANITIZE_ADDRESS
+TEST(FiberManager, recordStack) {
+ std::thread([] {
+ folly::fibers::FiberManager::Options opts;
+ opts.recordStackEvery = 1;
+
+ FiberManager fm(folly::make_unique<SimpleLoopController>(), opts);
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(fm.loopController());
+
+ static constexpr size_t n = 1000;
+ int s = 0;
+ fm.addTask([&]() {
+ int b[n] = {0};
+ for (size_t i = 0; i < n; ++i) {
+ b[i] = i;
+ }
+ for (size_t i = 0; i + 1 < n; ++i) {
+ s += b[i] * b[i + 1];
+ }
+ });
+
+ (void)s;
+
+ loopController.loop([&]() { loopController.stop(); });
+
+ // Check that we properly accounted fiber stack usage.
+ EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());
+ }).join();
}
+#endif