X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fio%2Fasync%2Ftest%2FEventBaseTest.cpp;h=454ece3c55dd7d697a79c8d9d8c48a829bec854a;hb=3cdd3857fbfacd9312a23214f54a47f156726927;hp=a8ff23c7b7e77d39805f85accf44c06b8a10540f;hpb=35ed5364f1ba809d6d103c98c03ae951aab3bc05;p=folly.git diff --git a/folly/io/async/test/EventBaseTest.cpp b/folly/io/async/test/EventBaseTest.cpp index a8ff23c7..454ece3c 100644 --- a/folly/io/async/test/EventBaseTest.cpp +++ b/folly/io/async/test/EventBaseTest.cpp @@ -16,24 +16,38 @@ * specific language governing permissions and limitations * under the License. */ + +#include +#include + #include #include #include #include #include +#include + +#include +#include #include -#include #include #include +using std::atomic; using std::deque; using std::pair; using std::vector; +using std::unique_ptr; +using std::thread; using std::make_pair; using std::cerr; using std::endl; using std::chrono::milliseconds; +using std::chrono::microseconds; +using std::chrono::duration_cast; + +using namespace std::chrono_literals; using namespace folly; @@ -45,9 +59,10 @@ enum { BUF_SIZE = 4096 }; ssize_t writeToFD(int fd, size_t length) { // write an arbitrary amount of data to the fd - char buf[length]; - memset(buf, 'a', sizeof(buf)); - ssize_t rc = write(fd, buf, sizeof(buf)); + auto bufv = vector(length); + auto buf = bufv.data(); + memset(buf, 'a', length); + ssize_t rc = write(fd, buf, length); CHECK_EQ(rc, length); return rc; } @@ -71,8 +86,8 @@ size_t writeUntilFull(int fd) { ssize_t readFromFD(int fd, size_t length) { // write an arbitrary amount of data to the fd - char buf[length]; - return read(fd, buf, sizeof(buf)); + auto buf = vector(length); + return read(fd, buf.data(), length); } size_t readUntilEmpty(int fd) { @@ -123,7 +138,7 @@ struct ScheduledEvent { void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) { for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) { - eventBase->runAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd), + eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd), ev->milliseconds); } } @@ -133,7 +148,7 @@ class TestHandler : public EventHandler { TestHandler(EventBase* eventBase, int fd) : EventHandler(eventBase, fd), fd_(fd) {} - virtual void handlerReady(uint16_t events) noexcept { + void handlerReady(uint16_t events) noexcept override { ssize_t bytesRead = 0; ssize_t bytesWritten = 0; if (events & READ) { @@ -147,7 +162,7 @@ class TestHandler : public EventHandler { bytesWritten = writeUntilFull(fd_); } - log.push_back(EventRecord(events, bytesRead, bytesWritten)); + log.emplace_back(events, bytesRead, bytesWritten); } struct EventRecord { @@ -182,9 +197,9 @@ TEST(EventBaseTest, ReadEvent) { // Register timeouts to perform two write events ScheduledEvent events[] = { - { 10, EventHandler::WRITE, 2345 }, - { 160, EventHandler::WRITE, 99 }, - { 0, 0, 0 }, + { 10, EventHandler::WRITE, 2345, 0 }, + { 160, EventHandler::WRITE, 99, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); @@ -223,16 +238,16 @@ TEST(EventBaseTest, ReadPersist) { // Register several timeouts to perform writes ScheduledEvent events[] = { - { 10, EventHandler::WRITE, 1024 }, - { 20, EventHandler::WRITE, 2211 }, - { 30, EventHandler::WRITE, 4096 }, - { 100, EventHandler::WRITE, 100 }, - { 0, 0 }, + { 10, EventHandler::WRITE, 1024, 0 }, + { 20, EventHandler::WRITE, 2211, 0 }, + { 30, EventHandler::WRITE, 4096, 0 }, + { 100, EventHandler::WRITE, 100, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); // Schedule a timeout to unregister the handler after the third write - eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85); + eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85); // Loop TimePoint start; @@ -274,13 +289,13 @@ TEST(EventBaseTest, ReadImmediate) { // Register a timeout to perform another write ScheduledEvent events[] = { - { 10, EventHandler::WRITE, 2345 }, - { 0, 0, 0 }, + { 10, EventHandler::WRITE, 2345, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); // Schedule a timeout to unregister the handler - eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20); + eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20); // Loop TimePoint start; @@ -321,9 +336,9 @@ TEST(EventBaseTest, WriteEvent) { // Register timeouts to perform two reads ScheduledEvent events[] = { - { 10, EventHandler::READ, 0 }, - { 60, EventHandler::READ, 0 }, - { 0, 0, 0 }, + { 10, EventHandler::READ, 0, 0 }, + { 60, EventHandler::READ, 0, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); @@ -362,16 +377,16 @@ TEST(EventBaseTest, WritePersist) { // Register several timeouts to read from the socket at several intervals ScheduledEvent events[] = { - { 10, EventHandler::READ, 0 }, - { 40, EventHandler::READ, 0 }, - { 70, EventHandler::READ, 0 }, - { 100, EventHandler::READ, 0 }, - { 0, 0 }, + { 10, EventHandler::READ, 0, 0 }, + { 40, EventHandler::READ, 0, 0 }, + { 70, EventHandler::READ, 0, 0 }, + { 100, EventHandler::READ, 0, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); // Schedule a timeout to unregister the handler after the third read - eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85); + eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85); // Loop TimePoint start; @@ -406,14 +421,14 @@ TEST(EventBaseTest, WriteImmediate) { // Register a timeout to perform a read ScheduledEvent events[] = { - { 10, EventHandler::READ, 0 }, - { 0, 0, 0 }, + { 10, EventHandler::READ, 0, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); // Schedule a timeout to unregister the handler int64_t unregisterTimeout = 40; - eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), + eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), unregisterTimeout); // Loop @@ -456,9 +471,9 @@ TEST(EventBaseTest, ReadWrite) { // Register timeouts to perform a write then a read. ScheduledEvent events[] = { - { 10, EventHandler::WRITE, 2345 }, - { 40, EventHandler::READ, 0 }, - { 0, 0, 0 }, + { 10, EventHandler::WRITE, 2345, 0 }, + { 40, EventHandler::READ, 0, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); @@ -497,9 +512,9 @@ TEST(EventBaseTest, WriteRead) { // Register timeouts to perform a read then a write. size_t sock1WriteLength = 2345; ScheduledEvent events[] = { - { 10, EventHandler::READ, 0 }, - { 40, EventHandler::WRITE, sock1WriteLength }, - { 0, 0, 0 }, + { 10, EventHandler::READ, 0, 0 }, + { 40, EventHandler::WRITE, sock1WriteLength, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); @@ -543,8 +558,8 @@ TEST(EventBaseTest, ReadWriteSimultaneous) { // Register a timeout to perform a read and write together ScheduledEvent events[] = { - { 10, EventHandler::READ | EventHandler::WRITE, 0 }, - { 0, 0, 0 }, + { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); @@ -582,18 +597,18 @@ TEST(EventBaseTest, ReadWritePersist) { // Register timeouts to perform several reads and writes ScheduledEvent events[] = { - { 10, EventHandler::WRITE, 2345 }, - { 20, EventHandler::READ, 0 }, - { 35, EventHandler::WRITE, 200 }, - { 45, EventHandler::WRITE, 15 }, - { 55, EventHandler::READ, 0 }, - { 120, EventHandler::WRITE, 2345 }, - { 0, 0, 0 }, + { 10, EventHandler::WRITE, 2345, 0 }, + { 20, EventHandler::READ, 0, 0 }, + { 35, EventHandler::WRITE, 200, 0 }, + { 45, EventHandler::WRITE, 15, 0 }, + { 55, EventHandler::READ, 0, 0 }, + { 120, EventHandler::WRITE, 2345, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); // Schedule a timeout to unregister the handler - eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80); + eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80); // Loop TimePoint start; @@ -637,10 +652,10 @@ class PartialReadHandler : public TestHandler { PartialReadHandler(EventBase* eventBase, int fd, size_t readLength) : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {} - virtual void handlerReady(uint16_t events) noexcept { + void handlerReady(uint16_t events) noexcept override { assert(events == EventHandler::READ); ssize_t bytesRead = readFromFD(fd_, readLength_); - log.push_back(EventRecord(events, bytesRead, 0)); + log.emplace_back(events, bytesRead, 0); } private: @@ -665,13 +680,13 @@ TEST(EventBaseTest, ReadPartial) { // Register a timeout to perform a single write, // with more data than PartialReadHandler will read at once ScheduledEvent events[] = { - { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) }, - { 0, 0, 0 }, + { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); // Schedule a timeout to unregister the handler - eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30); + eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30); // Loop TimePoint start; @@ -702,10 +717,10 @@ class PartialWriteHandler : public TestHandler { PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength) : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {} - virtual void handlerReady(uint16_t events) noexcept { + void handlerReady(uint16_t events) noexcept override { assert(events == EventHandler::WRITE); ssize_t bytesWritten = writeToFD(fd_, writeLength_); - log.push_back(EventRecord(events, 0, bytesWritten)); + log.emplace_back(events, 0, bytesWritten); } private: @@ -732,13 +747,13 @@ TEST(EventBaseTest, WritePartial) { // Register a timeout to read, so that more data can be written ScheduledEvent events[] = { - { 10, EventHandler::READ, 0 }, - { 0, 0, 0 }, + { 10, EventHandler::READ, 0, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); // Schedule a timeout to unregister the handler - eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30); + eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30); // Loop TimePoint start; @@ -772,9 +787,7 @@ TEST(EventBaseTest, DestroyHandler) { : AsyncTimeout(eb) , handler_(h) {} - virtual void timeoutExpired() noexcept { - delete handler_; - } + void timeoutExpired() noexcept override { delete handler_; } private: EventHandler* handler_; @@ -792,7 +805,7 @@ TEST(EventBaseTest, DestroyHandler) { // After 10ms, read some data, so that the handler // will be notified that it can write. - eb.runAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten), + eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten), 10); // Start a timer to destroy the handler after 25ms @@ -825,9 +838,9 @@ TEST(EventBaseTest, RunAfterDelay) { TimePoint timestamp1(false); TimePoint timestamp2(false); TimePoint timestamp3(false); - eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10); - eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20); - eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 40); + eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10); + eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20); + eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 40); TimePoint start; eb.loop(); @@ -840,7 +853,7 @@ TEST(EventBaseTest, RunAfterDelay) { } /** - * Test the behavior of runAfterDelay() when some timeouts are + * Test the behavior of tryRunAfterDelay() when some timeouts are * still scheduled when the EventBase is destroyed. */ TEST(EventBaseTest, RunAfterDelayDestruction) { @@ -855,15 +868,15 @@ TEST(EventBaseTest, RunAfterDelayDestruction) { EventBase eb; // Run two normal timeouts - eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10); - eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20); + eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10); + eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20); // Schedule a timeout to stop the event loop after 40ms - eb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40); + eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40); // Schedule 2 timeouts that would fire after the event loop stops - eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 80); - eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp4), 160); + eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 80); + eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp4), 160); start.reset(); eb.loop(); @@ -887,9 +900,7 @@ class TestTimeout : public AsyncTimeout { : AsyncTimeout(eventBase) , timestamp(false) {} - virtual void timeoutExpired() noexcept { - timestamp.reset(); - } + void timeoutExpired() noexcept override { timestamp.reset(); } TimePoint timestamp; }; @@ -925,8 +936,8 @@ class ReschedulingTimeout : public AsyncTimeout { reschedule(); } - virtual void timeoutExpired() noexcept { - timestamps.push_back(TimePoint()); + void timeoutExpired() noexcept override { + timestamps.emplace_back(); reschedule(); } @@ -995,9 +1006,9 @@ TEST(EventBaseTest, RescheduleTimeout) { &AsyncTimeout::scheduleTimeout); // after 10ms, reschedule t2 to run sooner than originally scheduled - eb.runAfterDelay(std::bind(f, &t2, 10), 10); + eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10); // after 10ms, reschedule t3 to run later than originally scheduled - eb.runAfterDelay(std::bind(f, &t3, 40), 10); + eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10); TimePoint start; eb.loop(); @@ -1022,7 +1033,7 @@ TEST(EventBaseTest, CancelTimeout) { ReschedulingTimeout t(&eb, timeouts); t.start(); - eb.runAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50); + eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50); TimePoint start; eb.loop(); @@ -1044,9 +1055,7 @@ TEST(EventBaseTest, DestroyTimeout) { : AsyncTimeout(eb) , timeout_(t) {} - virtual void timeoutExpired() noexcept { - delete timeout_; - } + void timeoutExpired() noexcept override { delete timeout_; } private: AsyncTimeout* timeout_; @@ -1098,7 +1107,7 @@ struct RunInThreadArg { }; void runInThreadTestFunc(RunInThreadArg* arg) { - arg->data->values.push_back(make_pair(arg->thread, arg->value)); + arg->data->values.emplace_back(arg->thread, arg->value); RunInThreadData* data = arg->data; delete arg; @@ -1109,11 +1118,18 @@ void runInThreadTestFunc(RunInThreadArg* arg) { } TEST(EventBaseTest, RunInThread) { - uint32_t numThreads = 50; - uint32_t opsPerThread = 100; + constexpr uint32_t numThreads = 50; + constexpr uint32_t opsPerThread = 100; RunInThreadData data(numThreads, opsPerThread); deque threads; + SCOPE_EXIT { + // Wait on all of the threads. + for (auto& thread : threads) { + thread.join(); + } + }; + for (uint32_t i = 0; i < numThreads; ++i) { threads.emplace_back([i, &data] { for (int n = 0; n < data.opsPerThread; ++n) { @@ -1129,7 +1145,7 @@ TEST(EventBaseTest, RunInThread) { // Once the last thread exits, it will stop the loop(). However, this // timeout also stops the loop in case there is a bug performing the normal // stop. - data.evb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb), + data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb), 3000); TimePoint start; @@ -1162,11 +1178,83 @@ TEST(EventBaseTest, RunInThread) { for (uint32_t n = 0; n < numThreads; ++n) { ASSERT_EQ(expectedValues[n], opsPerThread); } +} - // Wait on all of the threads. - for (auto& thread: threads) { - thread.join(); +// This test simulates some calls, and verifies that the waiting happens by +// triggering what otherwise would be race conditions, and trying to detect +// whether any of the race conditions happened. +TEST(EventBaseTest, RunInEventBaseThreadAndWait) { + const size_t c = 256; + vector>> atoms(c); + for (size_t i = 0; i < c; ++i) { + auto& atom = atoms.at(i); + atom = make_unique>(0); } + vector threads; + for (size_t i = 0; i < c; ++i) { + threads.emplace_back([&atoms, i] { + EventBase eb; + auto& atom = *atoms.at(i); + auto ebth = thread([&] { eb.loopForever(); }); + eb.waitUntilRunning(); + eb.runInEventBaseThreadAndWait([&] { + size_t x = 0; + atom.compare_exchange_weak( + x, 1, std::memory_order_release, std::memory_order_relaxed); + }); + size_t x = 0; + atom.compare_exchange_weak( + x, 2, std::memory_order_release, std::memory_order_relaxed); + eb.terminateLoopSoon(); + ebth.join(); + }); + } + for (size_t i = 0; i < c; ++i) { + auto& th = threads.at(i); + th.join(); + } + size_t sum = 0; + for (auto& atom : atoms) sum += *atom; + EXPECT_EQ(c, sum); +} + +TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) { + EventBase eb; + thread th(&EventBase::loopForever, &eb); + SCOPE_EXIT { + eb.terminateLoopSoon(); + th.join(); + }; + auto mutated = false; + eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { + mutated = true; + }); + EXPECT_TRUE(mutated); +} + +TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) { + EventBase eb; + thread th(&EventBase::loopForever, &eb); + SCOPE_EXIT { + eb.terminateLoopSoon(); + th.join(); + }; + eb.runInEventBaseThreadAndWait([&] { + auto mutated = false; + eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { + mutated = true; + }); + EXPECT_TRUE(mutated); + }); +} + +TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) { + EventBase eb; + auto mutated = false; + eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { + mutated = true; + }); + EXPECT_TRUE(mutated); } /////////////////////////////////////////////////////////////////////////// @@ -1183,7 +1271,7 @@ class CountedLoopCallback : public EventBase::LoopCallback { , count_(count) , action_(action) {} - virtual void runLoopCallback() noexcept { + void runLoopCallback() noexcept override { --count_; if (count_ > 0) { eventBase_->runInLoop(this); @@ -1218,6 +1306,21 @@ TEST(EventBaseTest, RepeatedRunInLoop) { ASSERT_EQ(c.getCount(), 0); } +// Test that EventBase::loop() works as expected without time measurements. +TEST(EventBaseTest, RunInLoopNoTimeMeasurement) { + EventBase eventBase(false); + + CountedLoopCallback c(&eventBase, 10); + eventBase.runInLoop(&c); + // The callback shouldn't have run immediately + ASSERT_EQ(c.getCount(), 10); + eventBase.loop(); + + // loop() should loop until the CountedLoopCallback stops + // re-installing itself. + ASSERT_EQ(c.getCount(), 0); +} + // Test runInLoop() calls with terminateLoopSoon() TEST(EventBaseTest, RunInLoopStopLoop) { EventBase eventBase; @@ -1247,6 +1350,20 @@ TEST(EventBaseTest, RunInLoopStopLoop) { ASSERT_LE(c1.getCount(), 11); } +TEST(EventBaseTest, TryRunningAfterTerminate) { + EventBase eventBase; + CountedLoopCallback c1(&eventBase, 1, + std::bind(&EventBase::terminateLoopSoon, &eventBase)); + eventBase.runInLoop(&c1); + eventBase.loopForever(); + bool ran = false; + eventBase.runInEventBaseThread([&]() { + ran = true; + }); + + ASSERT_FALSE(ran); +} + // Test cancelling runInLoop() callbacks TEST(EventBaseTest, CancelRunInLoop) { EventBase eventBase; @@ -1322,7 +1439,7 @@ class TerminateTestCallback : public EventBase::LoopCallback, unregisterHandler(); } - virtual void handlerReady(uint16_t events) noexcept { + void handlerReady(uint16_t /* events */) noexcept override { // We didn't register with PERSIST, so we will have been automatically // unregistered already. ASSERT_FALSE(isHandlerRegistered()); @@ -1334,7 +1451,7 @@ class TerminateTestCallback : public EventBase::LoopCallback, eventBase_->runInLoop(this); } - virtual void runLoopCallback() noexcept { + void runLoopCallback() noexcept override { ++loopInvocations_; if (loopInvocations_ >= maxLoopInvocations_) { return; @@ -1411,9 +1528,9 @@ class IdleTimeTimeoutSeries : public AsyncTimeout { scheduleTimeout(1); } - virtual ~IdleTimeTimeoutSeries() {} + ~IdleTimeTimeoutSeries() override {} - void timeoutExpired() noexcept { + void timeoutExpired() noexcept override { ++timeouts_; if(timeout_.empty()){ @@ -1448,7 +1565,7 @@ class IdleTimeTimeoutSeries : public AsyncTimeout { */ TEST(EventBaseTest, IdleTime) { EventBase eventBase; - eventBase.setLoadAvgMsec(1000); + eventBase.setLoadAvgMsec(1000ms); eventBase.resetLoadAvg(5900.0); std::deque timeouts0(4, 8080); timeouts0.push_front(8000); @@ -1456,23 +1573,30 @@ TEST(EventBaseTest, IdleTime) { IdleTimeTimeoutSeries tos0(&eventBase, timeouts0); std::deque timeouts(20, 20); std::unique_ptr tos; + int64_t testStart = duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + bool hostOverloaded = false; int latencyCallbacks = 0; - eventBase.setMaxLatency(6000, [&]() { + eventBase.setMaxLatency(6000us, [&]() { ++latencyCallbacks; - - switch (latencyCallbacks) { - case 1: - ASSERT_EQ(6, tos0.getTimeouts()); - ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200); - ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200); - tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts)); - break; - - default: + if (latencyCallbacks != 1) { FAIL() << "Unexpected latency callback"; - break; } + + if (tos0.getTimeouts() < 6) { + // This could only happen if the host this test is running + // on is heavily loaded. + int64_t maxLatencyReached = duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + ASSERT_LE(43800, maxLatencyReached - testStart); + hostOverloaded = true; + return; + } + ASSERT_EQ(6, tos0.getTimeouts()); + ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200); + ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200); + tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts)); }); // Kick things off with an "immedite" timeout @@ -1480,10 +1604,15 @@ TEST(EventBaseTest, IdleTime) { eventBase.loop(); + if (hostOverloaded) { + return; + } + ASSERT_EQ(1, latencyCallbacks); ASSERT_EQ(7, tos0.getTimeouts()); ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200); ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200); + ASSERT_TRUE(!!tos); ASSERT_EQ(21, tos->getTimeouts()); } @@ -1549,7 +1678,7 @@ TEST(EventBaseTest, RunBeforeLoop) { TEST(EventBaseTest, RunBeforeLoopWait) { EventBase base; CountedLoopCallback cb(&base, 1); - base.runAfterDelay([&](){ + base.tryRunAfterDelay([&](){ base.terminateLoopSoon(); }, 500); base.runBeforeLoop(&cb); @@ -1564,9 +1693,7 @@ public: PipeHandler(EventBase* eventBase, int fd) : EventHandler(eventBase, fd) {} - void handlerReady(uint16_t events) noexcept { - abort(); - } + void handlerReady(uint16_t /* events */) noexcept override { abort(); } }; TEST(EventBaseTest, StopBeforeLoop) { @@ -1604,3 +1731,184 @@ TEST(EventBaseTest, RunCallbacksOnDestruction) { ASSERT_TRUE(ran); } + +TEST(EventBaseTest, LoopKeepAlive) { + EventBase evb; + + bool done = false; + std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable { + /* sleep override */ std::this_thread::sleep_for( + std::chrono::milliseconds(100)); + evb.runInEventBaseThread( + [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; }); + }); + + evb.loop(); + + ASSERT_TRUE(done); + + t.join(); +} + +TEST(EventBaseTest, LoopKeepAliveInLoop) { + EventBase evb; + + bool done = false; + std::thread t; + + evb.runInEventBaseThread([&] { + t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable { + /* sleep override */ std::this_thread::sleep_for( + std::chrono::milliseconds(100)); + evb.runInEventBaseThread( + [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; }); + }); + }); + + evb.loop(); + + ASSERT_TRUE(done); + + t.join(); +} + +TEST(EventBaseTest, LoopKeepAliveWithLoopForever) { + std::unique_ptr evb = folly::make_unique(); + + bool done = false; + + std::thread evThread([&] { + evb->loopForever(); + evb.reset(); + done = true; + }); + + { + auto* ev = evb.get(); + EventBase::LoopKeepAlive keepAlive; + ev->runInEventBaseThreadAndWait( + [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); }); + ASSERT_FALSE(done) << "Loop finished before we asked it to"; + ev->terminateLoopSoon(); + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(30)); + ASSERT_FALSE(done) << "Loop terminated early"; + ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{}); + } + + evThread.join(); + ASSERT_TRUE(done); +} + +TEST(EventBaseTest, LoopKeepAliveShutdown) { + auto evb = folly::make_unique(); + + bool done = false; + + std::thread t([ + &done, + loopKeepAlive = evb->loopKeepAlive(), + evbPtr = evb.get() + ]() mutable { + /* sleep override */ std::this_thread::sleep_for( + std::chrono::milliseconds(100)); + evbPtr->runInEventBaseThread( + [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; }); + }); + + evb.reset(); + + ASSERT_TRUE(done); + + t.join(); +} + +TEST(EventBaseTest, LoopKeepAliveAtomic) { + auto evb = folly::make_unique(); + + static constexpr size_t kNumThreads = 100; + static constexpr size_t kNumTasks = 100; + + std::vector ts; + std::vector>> batons; + size_t done{0}; + + for (size_t i = 0; i < kNumThreads; ++i) { + batons.emplace_back(std::make_unique>()); + } + + for (size_t i = 0; i < kNumThreads; ++i) { + ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] { + std::vector keepAlives; + for (size_t j = 0; j < kNumTasks; ++j) { + keepAlives.emplace_back(evbPtr->loopKeepAliveAtomic()); + } + + batonPtr->post(); + + /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1)); + + for (auto& keepAlive : keepAlives) { + evbPtr->runInEventBaseThread( + [&done, keepAlive = std::move(keepAlive) ]() { ++done; }); + } + }); + } + + for (auto& baton : batons) { + baton->wait(); + } + + evb.reset(); + + EXPECT_EQ(kNumThreads * kNumTasks, done); + + for (auto& t : ts) { + t.join(); + } +} + +TEST(EventBaseTest, DrivableExecutorTest) { + folly::Promise p; + auto f = p.getFuture(); + EventBase base; + bool finished = false; + + std::thread t([&] { + /* sleep override */ + std::this_thread::sleep_for(std::chrono::microseconds(10)); + finished = true; + base.runInEventBaseThread([&]() { p.setValue(true); }); + }); + + // Ensure drive does not busy wait + base.drive(); // TODO: fix notification queue init() extra wakeup + base.drive(); + EXPECT_TRUE(finished); + + folly::Promise p2; + auto f2 = p2.getFuture(); + // Ensure waitVia gets woken up properly, even from + // a separate thread. + base.runAfterDelay([&]() { p2.setValue(true); }, 10); + f2.waitVia(&base); + EXPECT_TRUE(f2.isReady()); + + t.join(); +} + +TEST(EventBaseTest, RequestContextTest) { + EventBase evb; + auto defaultCtx = RequestContext::get(); + + { + RequestContextScopeGuard rctx; + auto context = RequestContext::get(); + EXPECT_NE(defaultCtx, context); + evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); }); + } + + EXPECT_EQ(defaultCtx, RequestContext::get()); + evb.loop(); + EXPECT_EQ(defaultCtx, RequestContext::get()); +}