/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <folly/fibers/GenericBaton.h>
#include <folly/fibers/Semaphore.h>
#include <folly/fibers/SimpleLoopController.h>
+#include <folly/fibers/TimedMutex.h>
#include <folly/fibers/WhenN.h>
+#include <folly/io/async/ScopedEventBaseThread.h>
#include <folly/portability/GTest.h>
using namespace folly::fibers;
if (!taskAdded) {
manager.addTask([&]() {
std::vector<std::function<std::unique_ptr<int>()>> funcs;
- for (size_t i = 0; i < 3; ++i) {
+ for (int i = 0; i < 3; ++i) {
funcs.push_back([i, &pendingFibers]() {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
manager.addTask([&]() {
std::vector<std::function<int()>> funcs;
for (size_t i = 0; i < 3; ++i) {
- funcs.push_back([i, &pendingFibers]() {
+ funcs.push_back([i, &pendingFibers]() -> size_t {
await([&pendingFibers](Promise<int> promise) {
pendingFibers.push_back(std::move(promise));
});
throw std::runtime_error("Runtime");
- return i * 2 + 1;
});
}
if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
if (i == problemIndex) {
- EXPECT_THROW(job.token.dispatch(job.input), std::logic_error);
+ EXPECT_THROW(job.token.dispatch(job.input), ABDUsageException);
}
}
} catch (...) {
dispatchJobs(executor, jobs, results);
throw std::runtime_error(
"Unexpected exception in user code before commit called");
- atomicBatchDispatcher.commit();
+ // atomicBatchDispatcher.commit();
} catch (...) {
/* User code handles the exception and does not exit process */
}
evb.loop();
- validateResults<std::logic_error>(results, COUNT);
+ validateResults<ABDCommitNotCalledException>(results, COUNT);
}
TEST(FiberManager, ABD_PreprocessingFailureTest) {
dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
atomicBatchDispatcher.commit();
evb.loop();
- validateResults<std::logic_error>(results, COUNT - 1);
+ validateResults<ABDTokenNotDispatchedException>(results, COUNT - 1);
}
TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
createAtomicBatchDispatcher(std::move(dispatchFunc));
createJobs(atomicBatchDispatcher, jobs, COUNT);
atomicBatchDispatcher.commit();
- EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
+ EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
dispatchJobs(executor, jobs, results);
- EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
+ EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
evb.loop();
validateResults(results, COUNT);
- EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
+ EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
}
TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
// Testing that exception is set if user provided batch dispatch throws
//
dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
- auto results = userDispatchFunc(std::move(inputs));
+ (void)userDispatchFunc(std::move(inputs));
throw std::runtime_error("Unexpected exception in user dispatch function");
- return results;
};
auto atomicBatchDispatcher =
createAtomicBatchDispatcher(std::move(dispatchFunc));
validateResults<std::runtime_error>(results, COUNT);
}
+TEST(FiberManager, VirtualEventBase) {
+ bool done1{false};
+ bool done2{false};
+ {
+ folly::ScopedEventBaseThread thread;
+
+ auto evb1 =
+ folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
+ auto& evb2 = thread.getEventBase()->getVirtualEventBase();
+
+ getFiberManager(*evb1).addTaskRemote([&] {
+ Baton baton;
+ baton.timed_wait(std::chrono::milliseconds{100});
+
+ done1 = true;
+ });
+
+ getFiberManager(evb2).addTaskRemote([&] {
+ Baton baton;
+ baton.timed_wait(std::chrono::milliseconds{200});
+
+ done2 = true;
+ });
+
+ EXPECT_FALSE(done1);
+ EXPECT_FALSE(done2);
+
+ evb1.reset();
+ EXPECT_TRUE(done1);
+ EXPECT_FALSE(done2);
+ }
+ EXPECT_TRUE(done2);
+}
+
+TEST(TimedMutex, ThreadFiberDeadlockOrder) {
+ folly::EventBase evb;
+ auto& fm = getFiberManager(evb);
+ TimedMutex mutex;
+
+ mutex.lock();
+ std::thread unlockThread([&] {
+ /* sleep override */ std::this_thread::sleep_for(
+ std::chrono::milliseconds{100});
+ mutex.unlock();
+ });
+
+ fm.addTask([&] { std::lock_guard<TimedMutex> lg(mutex); });
+ fm.addTask([&] {
+ runInMainContext([&] {
+ auto locked = mutex.timed_lock(std::chrono::seconds{1});
+ EXPECT_TRUE(locked);
+ if (locked) {
+ mutex.unlock();
+ }
+ });
+ });
+
+ evb.loopOnce();
+ EXPECT_EQ(0, fm.hasTasks());
+
+ unlockThread.join();
+}
+
+TEST(TimedMutex, ThreadFiberDeadlockRace) {
+ folly::EventBase evb;
+ auto& fm = getFiberManager(evb);
+ TimedMutex mutex;
+
+ mutex.lock();
+
+ fm.addTask([&] {
+ auto locked = mutex.timed_lock(std::chrono::seconds{1});
+ EXPECT_TRUE(locked);
+ if (locked) {
+ mutex.unlock();
+ }
+ });
+ fm.addTask([&] {
+ mutex.unlock();
+ runInMainContext([&] {
+ auto locked = mutex.timed_lock(std::chrono::seconds{1});
+ EXPECT_TRUE(locked);
+ if (locked) {
+ mutex.unlock();
+ }
+ });
+ });
+
+ evb.loopOnce();
+ EXPECT_EQ(0, fm.hasTasks());
+}
+
/**
* Test that we can properly track fiber stack usage.
*