X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fio%2Fasync%2Ftest%2FEventBaseTest.cpp;h=454ece3c55dd7d697a79c8d9d8c48a829bec854a;hb=3cdd3857fbfacd9312a23214f54a47f156726927;hp=ddf70a06652f1a206146346f0b85160b183b26b3;hpb=5e80856a1c8a52fc32a9d2eb776aaace0ca61182;p=folly.git diff --git a/folly/io/async/test/EventBaseTest.cpp b/folly/io/async/test/EventBaseTest.cpp index ddf70a06..454ece3c 100644 --- a/folly/io/async/test/EventBaseTest.cpp +++ b/folly/io/async/test/EventBaseTest.cpp @@ -16,22 +16,38 @@ * specific language governing permissions and limitations * under the License. */ + +#include +#include + #include #include #include #include #include +#include + +#include +#include #include -#include #include +#include +using std::atomic; using std::deque; using std::pair; using std::vector; +using std::unique_ptr; +using std::thread; using std::make_pair; using std::cerr; using std::endl; +using std::chrono::milliseconds; +using std::chrono::microseconds; +using std::chrono::duration_cast; + +using namespace std::chrono_literals; using namespace folly; @@ -43,9 +59,10 @@ enum { BUF_SIZE = 4096 }; ssize_t writeToFD(int fd, size_t length) { // write an arbitrary amount of data to the fd - char buf[length]; - memset(buf, 'a', sizeof(buf)); - ssize_t rc = write(fd, buf, sizeof(buf)); + auto bufv = vector(length); + auto buf = bufv.data(); + memset(buf, 'a', length); + ssize_t rc = write(fd, buf, length); CHECK_EQ(rc, length); return rc; } @@ -69,8 +86,8 @@ size_t writeUntilFull(int fd) { ssize_t readFromFD(int fd, size_t length) { // write an arbitrary amount of data to the fd - char buf[length]; - return read(fd, buf, sizeof(buf)); + auto buf = vector(length); + return read(fd, buf.data(), length); } size_t readUntilEmpty(int fd) { @@ -121,7 +138,7 @@ struct ScheduledEvent { void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) { for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) { - eventBase->runAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd), + eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd), ev->milliseconds); } } @@ -131,7 +148,7 @@ class TestHandler : public EventHandler { TestHandler(EventBase* eventBase, int fd) : EventHandler(eventBase, fd), fd_(fd) {} - virtual void handlerReady(uint16_t events) noexcept { + void handlerReady(uint16_t events) noexcept override { ssize_t bytesRead = 0; ssize_t bytesWritten = 0; if (events & READ) { @@ -145,7 +162,7 @@ class TestHandler : public EventHandler { bytesWritten = writeUntilFull(fd_); } - log.push_back(EventRecord(events, bytesRead, bytesWritten)); + log.emplace_back(events, bytesRead, bytesWritten); } struct EventRecord { @@ -180,9 +197,9 @@ TEST(EventBaseTest, ReadEvent) { // Register timeouts to perform two write events ScheduledEvent events[] = { - { 10, EventHandler::WRITE, 2345 }, - { 160, EventHandler::WRITE, 99 }, - { 0, 0, 0 }, + { 10, EventHandler::WRITE, 2345, 0 }, + { 160, EventHandler::WRITE, 99, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); @@ -196,10 +213,12 @@ TEST(EventBaseTest, ReadEvent) { // the first chunk of data was received. ASSERT_EQ(handler.log.size(), 1); ASSERT_EQ(handler.log[0].events, EventHandler::READ); - T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds, 90); + T_CHECK_TIMEOUT(start, handler.log[0].timestamp, + milliseconds(events[0].milliseconds), milliseconds(90)); ASSERT_EQ(handler.log[0].bytesRead, events[0].length); ASSERT_EQ(handler.log[0].bytesWritten, 0); - T_CHECK_TIMEOUT(start, end, events[1].milliseconds, 30); + T_CHECK_TIMEOUT(start, end, + milliseconds(events[1].milliseconds), milliseconds(30)); // Make sure the second chunk of data is still waiting to be read. size_t bytesRemaining = readUntilEmpty(sp[0]); @@ -219,16 +238,16 @@ TEST(EventBaseTest, ReadPersist) { // Register several timeouts to perform writes ScheduledEvent events[] = { - { 10, EventHandler::WRITE, 1024 }, - { 20, EventHandler::WRITE, 2211 }, - { 30, EventHandler::WRITE, 4096 }, - { 100, EventHandler::WRITE, 100 }, - { 0, 0 }, + { 10, EventHandler::WRITE, 1024, 0 }, + { 20, EventHandler::WRITE, 2211, 0 }, + { 30, EventHandler::WRITE, 4096, 0 }, + { 100, EventHandler::WRITE, 100, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); // Schedule a timeout to unregister the handler after the third write - eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85); + eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85); // Loop TimePoint start; @@ -240,11 +259,12 @@ TEST(EventBaseTest, ReadPersist) { ASSERT_EQ(handler.log.size(), 3); for (int n = 0; n < 3; ++n) { ASSERT_EQ(handler.log[n].events, EventHandler::READ); - T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[n].milliseconds); + T_CHECK_TIMEOUT(start, handler.log[n].timestamp, + milliseconds(events[n].milliseconds)); ASSERT_EQ(handler.log[n].bytesRead, events[n].length); ASSERT_EQ(handler.log[n].bytesWritten, 0); } - T_CHECK_TIMEOUT(start, end, events[3].milliseconds); + T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds)); // Make sure the data from the last write is still waiting to be read size_t bytesRemaining = readUntilEmpty(sp[0]); @@ -269,13 +289,13 @@ TEST(EventBaseTest, ReadImmediate) { // Register a timeout to perform another write ScheduledEvent events[] = { - { 10, EventHandler::WRITE, 2345 }, - { 0, 0, 0 }, + { 10, EventHandler::WRITE, 2345, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); // Schedule a timeout to unregister the handler - eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20); + eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20); // Loop TimePoint start; @@ -286,17 +306,18 @@ TEST(EventBaseTest, ReadImmediate) { // There should have been 1 event for immediate readability ASSERT_EQ(handler.log[0].events, EventHandler::READ); - T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0); + T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0)); ASSERT_EQ(handler.log[0].bytesRead, dataLength); ASSERT_EQ(handler.log[0].bytesWritten, 0); // There should be another event after the timeout wrote more data ASSERT_EQ(handler.log[1].events, EventHandler::READ); - T_CHECK_TIMEOUT(start, handler.log[1].timestamp, events[0].milliseconds); + T_CHECK_TIMEOUT(start, handler.log[1].timestamp, + milliseconds(events[0].milliseconds)); ASSERT_EQ(handler.log[1].bytesRead, events[0].length); ASSERT_EQ(handler.log[1].bytesWritten, 0); - T_CHECK_TIMEOUT(start, end, 20); + T_CHECK_TIMEOUT(start, end, milliseconds(20)); } /** @@ -315,9 +336,9 @@ TEST(EventBaseTest, WriteEvent) { // Register timeouts to perform two reads ScheduledEvent events[] = { - { 10, EventHandler::READ, 0 }, - { 60, EventHandler::READ, 0 }, - { 0, 0, 0 }, + { 10, EventHandler::READ, 0, 0 }, + { 60, EventHandler::READ, 0, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); @@ -330,10 +351,11 @@ TEST(EventBaseTest, WriteEvent) { // have only been able to write once, then unregistered itself. ASSERT_EQ(handler.log.size(), 1); ASSERT_EQ(handler.log[0].events, EventHandler::WRITE); - T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds); + T_CHECK_TIMEOUT(start, handler.log[0].timestamp, + milliseconds(events[0].milliseconds)); ASSERT_EQ(handler.log[0].bytesRead, 0); ASSERT_GT(handler.log[0].bytesWritten, 0); - T_CHECK_TIMEOUT(start, end, events[1].milliseconds); + T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds)); ASSERT_EQ(events[0].result, initialBytesWritten); ASSERT_EQ(events[1].result, handler.log[0].bytesWritten); @@ -355,16 +377,16 @@ TEST(EventBaseTest, WritePersist) { // Register several timeouts to read from the socket at several intervals ScheduledEvent events[] = { - { 10, EventHandler::READ, 0 }, - { 40, EventHandler::READ, 0 }, - { 70, EventHandler::READ, 0 }, - { 100, EventHandler::READ, 0 }, - { 0, 0 }, + { 10, EventHandler::READ, 0, 0 }, + { 40, EventHandler::READ, 0, 0 }, + { 70, EventHandler::READ, 0, 0 }, + { 100, EventHandler::READ, 0, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); // Schedule a timeout to unregister the handler after the third read - eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85); + eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85); // Loop TimePoint start; @@ -377,12 +399,13 @@ TEST(EventBaseTest, WritePersist) { ASSERT_EQ(events[0].result, initialBytesWritten); for (int n = 0; n < 3; ++n) { ASSERT_EQ(handler.log[n].events, EventHandler::WRITE); - T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[n].milliseconds); + T_CHECK_TIMEOUT(start, handler.log[n].timestamp, + milliseconds(events[n].milliseconds)); ASSERT_EQ(handler.log[n].bytesRead, 0); ASSERT_GT(handler.log[n].bytesWritten, 0); ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result); } - T_CHECK_TIMEOUT(start, end, events[3].milliseconds); + T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds)); } /** @@ -398,14 +421,14 @@ TEST(EventBaseTest, WriteImmediate) { // Register a timeout to perform a read ScheduledEvent events[] = { - { 10, EventHandler::READ, 0 }, - { 0, 0, 0 }, + { 10, EventHandler::READ, 0, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); // Schedule a timeout to unregister the handler int64_t unregisterTimeout = 40; - eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), + eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), unregisterTimeout); // Loop @@ -418,17 +441,18 @@ TEST(EventBaseTest, WriteImmediate) { // Since the socket buffer was initially empty, // there should have been 1 event for immediate writability ASSERT_EQ(handler.log[0].events, EventHandler::WRITE); - T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0); + T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0)); ASSERT_EQ(handler.log[0].bytesRead, 0); ASSERT_GT(handler.log[0].bytesWritten, 0); // There should be another event after the timeout wrote more data ASSERT_EQ(handler.log[1].events, EventHandler::WRITE); - T_CHECK_TIMEOUT(start, handler.log[1].timestamp, events[0].milliseconds); + T_CHECK_TIMEOUT(start, handler.log[1].timestamp, + milliseconds(events[0].milliseconds)); ASSERT_EQ(handler.log[1].bytesRead, 0); ASSERT_GT(handler.log[1].bytesWritten, 0); - T_CHECK_TIMEOUT(start, end, unregisterTimeout); + T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout)); } /** @@ -447,9 +471,9 @@ TEST(EventBaseTest, ReadWrite) { // Register timeouts to perform a write then a read. ScheduledEvent events[] = { - { 10, EventHandler::WRITE, 2345 }, - { 40, EventHandler::READ, 0 }, - { 0, 0, 0 }, + { 10, EventHandler::WRITE, 2345, 0 }, + { 40, EventHandler::READ, 0, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); @@ -463,11 +487,12 @@ TEST(EventBaseTest, ReadWrite) { // one event was logged. ASSERT_EQ(handler.log.size(), 1); ASSERT_EQ(handler.log[0].events, EventHandler::READ); - T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds); + T_CHECK_TIMEOUT(start, handler.log[0].timestamp, + milliseconds(events[0].milliseconds)); ASSERT_EQ(handler.log[0].bytesRead, events[0].length); ASSERT_EQ(handler.log[0].bytesWritten, 0); ASSERT_EQ(events[1].result, sock0WriteLength); - T_CHECK_TIMEOUT(start, end, events[1].milliseconds); + T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds)); } /** @@ -487,9 +512,9 @@ TEST(EventBaseTest, WriteRead) { // Register timeouts to perform a read then a write. size_t sock1WriteLength = 2345; ScheduledEvent events[] = { - { 10, EventHandler::READ, 0 }, - { 40, EventHandler::WRITE, sock1WriteLength }, - { 0, 0, 0 }, + { 10, EventHandler::READ, 0, 0 }, + { 40, EventHandler::WRITE, sock1WriteLength, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); @@ -503,12 +528,13 @@ TEST(EventBaseTest, WriteRead) { // one event was logged. ASSERT_EQ(handler.log.size(), 1); ASSERT_EQ(handler.log[0].events, EventHandler::WRITE); - T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds); + T_CHECK_TIMEOUT(start, handler.log[0].timestamp, + milliseconds(events[0].milliseconds)); ASSERT_EQ(handler.log[0].bytesRead, 0); ASSERT_GT(handler.log[0].bytesWritten, 0); ASSERT_EQ(events[0].result, sock0WriteLength); ASSERT_EQ(events[1].result, sock1WriteLength); - T_CHECK_TIMEOUT(start, end, events[1].milliseconds); + T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds)); // Make sure the written data is still waiting to be read. size_t bytesRemaining = readUntilEmpty(sp[0]); @@ -532,8 +558,8 @@ TEST(EventBaseTest, ReadWriteSimultaneous) { // Register a timeout to perform a read and write together ScheduledEvent events[] = { - { 10, EventHandler::READ | EventHandler::WRITE, 0 }, - { 0, 0, 0 }, + { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); @@ -550,10 +576,11 @@ TEST(EventBaseTest, ReadWriteSimultaneous) { ASSERT_EQ(handler.log.size(), 1); ASSERT_EQ(handler.log[0].events, EventHandler::READ | EventHandler::WRITE); - T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds); + T_CHECK_TIMEOUT(start, handler.log[0].timestamp, + milliseconds(events[0].milliseconds)); ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength); ASSERT_GT(handler.log[0].bytesWritten, 0); - T_CHECK_TIMEOUT(start, end, events[0].milliseconds); + T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds)); } /** @@ -570,18 +597,18 @@ TEST(EventBaseTest, ReadWritePersist) { // Register timeouts to perform several reads and writes ScheduledEvent events[] = { - { 10, EventHandler::WRITE, 2345 }, - { 20, EventHandler::READ, 0 }, - { 35, EventHandler::WRITE, 200 }, - { 45, EventHandler::WRITE, 15 }, - { 55, EventHandler::READ, 0 }, - { 120, EventHandler::WRITE, 2345 }, - { 0, 0, 0 }, + { 10, EventHandler::WRITE, 2345, 0 }, + { 20, EventHandler::READ, 0, 0 }, + { 35, EventHandler::WRITE, 200, 0 }, + { 45, EventHandler::WRITE, 15, 0 }, + { 55, EventHandler::READ, 0, 0 }, + { 120, EventHandler::WRITE, 2345, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); // Schedule a timeout to unregister the handler - eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80); + eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80); // Loop TimePoint start; @@ -593,14 +620,15 @@ TEST(EventBaseTest, ReadWritePersist) { // Since we didn't fill up the write buffer immediately, there should // be an immediate event for writability. ASSERT_EQ(handler.log[0].events, EventHandler::WRITE); - T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0); + T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0)); ASSERT_EQ(handler.log[0].bytesRead, 0); ASSERT_GT(handler.log[0].bytesWritten, 0); // Events 1 through 5 should correspond to the scheduled events for (int n = 1; n < 6; ++n) { ScheduledEvent* event = &events[n - 1]; - T_CHECK_TIMEOUT(start, handler.log[n].timestamp, event->milliseconds); + T_CHECK_TIMEOUT(start, handler.log[n].timestamp, + milliseconds(event->milliseconds)); if (event->events == EventHandler::READ) { ASSERT_EQ(handler.log[n].events, EventHandler::WRITE); ASSERT_EQ(handler.log[n].bytesRead, 0); @@ -624,10 +652,10 @@ class PartialReadHandler : public TestHandler { PartialReadHandler(EventBase* eventBase, int fd, size_t readLength) : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {} - virtual void handlerReady(uint16_t events) noexcept { + void handlerReady(uint16_t events) noexcept override { assert(events == EventHandler::READ); ssize_t bytesRead = readFromFD(fd_, readLength_); - log.push_back(EventRecord(events, bytesRead, 0)); + log.emplace_back(events, bytesRead, 0); } private: @@ -652,13 +680,13 @@ TEST(EventBaseTest, ReadPartial) { // Register a timeout to perform a single write, // with more data than PartialReadHandler will read at once ScheduledEvent events[] = { - { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) }, - { 0, 0, 0 }, + { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); // Schedule a timeout to unregister the handler - eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30); + eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30); // Loop TimePoint start; @@ -670,13 +698,15 @@ TEST(EventBaseTest, ReadPartial) { // The first 3 invocations should read readLength bytes each for (int n = 0; n < 3; ++n) { ASSERT_EQ(handler.log[n].events, EventHandler::READ); - T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[0].milliseconds); + T_CHECK_TIMEOUT(start, handler.log[n].timestamp, + milliseconds(events[0].milliseconds)); ASSERT_EQ(handler.log[n].bytesRead, readLength); ASSERT_EQ(handler.log[n].bytesWritten, 0); } // The last read only has readLength/2 bytes ASSERT_EQ(handler.log[3].events, EventHandler::READ); - T_CHECK_TIMEOUT(start, handler.log[3].timestamp, events[0].milliseconds); + T_CHECK_TIMEOUT(start, handler.log[3].timestamp, + milliseconds(events[0].milliseconds)); ASSERT_EQ(handler.log[3].bytesRead, readLength / 2); ASSERT_EQ(handler.log[3].bytesWritten, 0); } @@ -687,10 +717,10 @@ class PartialWriteHandler : public TestHandler { PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength) : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {} - virtual void handlerReady(uint16_t events) noexcept { + void handlerReady(uint16_t events) noexcept override { assert(events == EventHandler::WRITE); ssize_t bytesWritten = writeToFD(fd_, writeLength_); - log.push_back(EventRecord(events, 0, bytesWritten)); + log.emplace_back(events, 0, bytesWritten); } private: @@ -717,13 +747,13 @@ TEST(EventBaseTest, WritePartial) { // Register a timeout to read, so that more data can be written ScheduledEvent events[] = { - { 10, EventHandler::READ, 0 }, - { 0, 0, 0 }, + { 10, EventHandler::READ, 0, 0 }, + { 0, 0, 0, 0 }, }; scheduleEvents(&eb, sp[1], events); // Schedule a timeout to unregister the handler - eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30); + eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30); // Loop TimePoint start; @@ -739,7 +769,8 @@ TEST(EventBaseTest, WritePartial) { // The first 3 invocations should read writeLength bytes each for (int n = 0; n < numChecked; ++n) { ASSERT_EQ(handler.log[n].events, EventHandler::WRITE); - T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[0].milliseconds); + T_CHECK_TIMEOUT(start, handler.log[n].timestamp, + milliseconds(events[0].milliseconds)); ASSERT_EQ(handler.log[n].bytesRead, 0); ASSERT_EQ(handler.log[n].bytesWritten, writeLength); } @@ -750,15 +781,13 @@ TEST(EventBaseTest, WritePartial) { * Test destroying a registered EventHandler */ TEST(EventBaseTest, DestroyHandler) { - class DestroyHandler : public TAsyncTimeout { + class DestroyHandler : public AsyncTimeout { public: DestroyHandler(EventBase* eb, EventHandler* h) - : TAsyncTimeout(eb) + : AsyncTimeout(eb) , handler_(h) {} - virtual void timeoutExpired() noexcept { - delete handler_; - } + void timeoutExpired() noexcept override { delete handler_; } private: EventHandler* handler_; @@ -776,7 +805,7 @@ TEST(EventBaseTest, DestroyHandler) { // After 10ms, read some data, so that the handler // will be notified that it can write. - eb.runAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten), + eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten), 10); // Start a timer to destroy the handler after 25ms @@ -790,7 +819,7 @@ TEST(EventBaseTest, DestroyHandler) { // Make sure the EventHandler was uninstalled properly when it was // destroyed, and the EventBase loop exited - T_CHECK_TIMEOUT(start, end, 25); + T_CHECK_TIMEOUT(start, end, milliseconds(25)); // Make sure that the handler wrote data to the socket // before it was destroyed @@ -809,22 +838,22 @@ TEST(EventBaseTest, RunAfterDelay) { TimePoint timestamp1(false); TimePoint timestamp2(false); TimePoint timestamp3(false); - eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10); - eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20); - eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 40); + eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10); + eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20); + eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 40); TimePoint start; eb.loop(); TimePoint end; - T_CHECK_TIMEOUT(start, timestamp1, 10); - T_CHECK_TIMEOUT(start, timestamp2, 20); - T_CHECK_TIMEOUT(start, timestamp3, 40); - T_CHECK_TIMEOUT(start, end, 40); + T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10)); + T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20)); + T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40)); + T_CHECK_TIMEOUT(start, end, milliseconds(40)); } /** - * Test the behavior of runAfterDelay() when some timeouts are + * Test the behavior of tryRunAfterDelay() when some timeouts are * still scheduled when the EventBase is destroyed. */ TEST(EventBaseTest, RunAfterDelayDestruction) { @@ -839,24 +868,24 @@ TEST(EventBaseTest, RunAfterDelayDestruction) { EventBase eb; // Run two normal timeouts - eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10); - eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20); + eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10); + eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20); // Schedule a timeout to stop the event loop after 40ms - eb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40); + eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40); // Schedule 2 timeouts that would fire after the event loop stops - eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 80); - eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp4), 160); + eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 80); + eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp4), 160); start.reset(); eb.loop(); end.reset(); } - T_CHECK_TIMEOUT(start, timestamp1, 10); - T_CHECK_TIMEOUT(start, timestamp2, 20); - T_CHECK_TIMEOUT(start, end, 40); + T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10)); + T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20)); + T_CHECK_TIMEOUT(start, end, milliseconds(40)); ASSERT_TRUE(timestamp3.isUnset()); ASSERT_TRUE(timestamp4.isUnset()); @@ -865,15 +894,13 @@ TEST(EventBaseTest, RunAfterDelayDestruction) { // memory is leaked. } -class TestTimeout : public TAsyncTimeout { +class TestTimeout : public AsyncTimeout { public: explicit TestTimeout(EventBase* eventBase) - : TAsyncTimeout(eventBase) + : AsyncTimeout(eventBase) , timestamp(false) {} - virtual void timeoutExpired() noexcept { - timestamp.reset(); - } + void timeoutExpired() noexcept override { timestamp.reset(); } TimePoint timestamp; }; @@ -892,16 +919,16 @@ TEST(EventBaseTest, BasicTimeouts) { eb.loop(); TimePoint end; - T_CHECK_TIMEOUT(start, t1.timestamp, 10); - T_CHECK_TIMEOUT(start, t2.timestamp, 20); - T_CHECK_TIMEOUT(start, t3.timestamp, 40); - T_CHECK_TIMEOUT(start, end, 40); + T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10)); + T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20)); + T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40)); + T_CHECK_TIMEOUT(start, end, milliseconds(40)); } -class ReschedulingTimeout : public TAsyncTimeout { +class ReschedulingTimeout : public AsyncTimeout { public: ReschedulingTimeout(EventBase* evb, const vector& timeouts) - : TAsyncTimeout(evb) + : AsyncTimeout(evb) , timeouts_(timeouts) , iterator_(timeouts_.begin()) {} @@ -909,8 +936,8 @@ class ReschedulingTimeout : public TAsyncTimeout { reschedule(); } - virtual void timeoutExpired() noexcept { - timestamps.push_back(TimePoint()); + void timeoutExpired() noexcept override { + timestamps.emplace_back(); reschedule(); } @@ -950,15 +977,15 @@ TEST(EventBaseTest, ReuseTimeout) { // Use a higher tolerance than usual. We're waiting on 3 timeouts // consecutively. In general, each timeout may go over by a few // milliseconds, and we're tripling this error by witing on 3 timeouts. - int64_t tolerance = 6; + milliseconds tolerance{6}; ASSERT_EQ(timeouts.size(), t.timestamps.size()); uint32_t total = 0; - for (int n = 0; n < timeouts.size(); ++n) { + for (size_t n = 0; n < timeouts.size(); ++n) { total += timeouts[n]; - T_CHECK_TIMEOUT(start, t.timestamps[n], total, tolerance); + T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance); } - T_CHECK_TIMEOUT(start, end, total, tolerance); + T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance); } /** @@ -975,22 +1002,22 @@ TEST(EventBaseTest, RescheduleTimeout) { t2.scheduleTimeout(30); t3.scheduleTimeout(30); - auto f = static_cast( - &TAsyncTimeout::scheduleTimeout); + auto f = static_cast( + &AsyncTimeout::scheduleTimeout); // after 10ms, reschedule t2 to run sooner than originally scheduled - eb.runAfterDelay(std::bind(f, &t2, 10), 10); + eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10); // after 10ms, reschedule t3 to run later than originally scheduled - eb.runAfterDelay(std::bind(f, &t3, 40), 10); + eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10); TimePoint start; eb.loop(); TimePoint end; - T_CHECK_TIMEOUT(start, t1.timestamp, 15); - T_CHECK_TIMEOUT(start, t2.timestamp, 20); - T_CHECK_TIMEOUT(start, t3.timestamp, 50); - T_CHECK_TIMEOUT(start, end, 50); + T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15)); + T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20)); + T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50)); + T_CHECK_TIMEOUT(start, end, milliseconds(50)); } /** @@ -1006,34 +1033,32 @@ TEST(EventBaseTest, CancelTimeout) { ReschedulingTimeout t(&eb, timeouts); t.start(); - eb.runAfterDelay(std::bind(&TAsyncTimeout::cancelTimeout, &t), 50); + eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50); TimePoint start; eb.loop(); TimePoint end; ASSERT_EQ(t.timestamps.size(), 2); - T_CHECK_TIMEOUT(start, t.timestamps[0], 10); - T_CHECK_TIMEOUT(start, t.timestamps[1], 40); - T_CHECK_TIMEOUT(start, end, 50); + T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10)); + T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40)); + T_CHECK_TIMEOUT(start, end, milliseconds(50)); } /** * Test destroying a scheduled timeout object */ TEST(EventBaseTest, DestroyTimeout) { - class DestroyTimeout : public TAsyncTimeout { + class DestroyTimeout : public AsyncTimeout { public: - DestroyTimeout(EventBase* eb, TAsyncTimeout* t) - : TAsyncTimeout(eb) + DestroyTimeout(EventBase* eb, AsyncTimeout* t) + : AsyncTimeout(eb) , timeout_(t) {} - virtual void timeoutExpired() noexcept { - delete timeout_; - } + void timeoutExpired() noexcept override { delete timeout_; } private: - TAsyncTimeout* timeout_; + AsyncTimeout* timeout_; }; EventBase eb; @@ -1048,7 +1073,7 @@ TEST(EventBaseTest, DestroyTimeout) { eb.loop(); TimePoint end; - T_CHECK_TIMEOUT(start, end, 10); + T_CHECK_TIMEOUT(start, end, milliseconds(10)); } @@ -1082,7 +1107,7 @@ struct RunInThreadArg { }; void runInThreadTestFunc(RunInThreadArg* arg) { - arg->data->values.push_back(make_pair(arg->thread, arg->value)); + arg->data->values.emplace_back(arg->thread, arg->value); RunInThreadData* data = arg->data; delete arg; @@ -1092,39 +1117,27 @@ void runInThreadTestFunc(RunInThreadArg* arg) { } } -class RunInThreadTester : public concurrency::Runnable { - public: - RunInThreadTester(int id, RunInThreadData* data) : id_(id), data_(data) {} - - void run() { - // Call evb->runInThread() a number of times - { - for (int n = 0; n < data_->opsPerThread; ++n) { - RunInThreadArg* arg = new RunInThreadArg(data_, id_, n); - data_->evb.runInEventBaseThread(runInThreadTestFunc, arg); - usleep(10); - } - } - } - - private: - int id_; - RunInThreadData* data_; -}; - TEST(EventBaseTest, RunInThread) { - uint32_t numThreads = 50; - uint32_t opsPerThread = 100; + constexpr uint32_t numThreads = 50; + constexpr uint32_t opsPerThread = 100; RunInThreadData data(numThreads, opsPerThread); - PosixThreadFactory threadFactory; - threadFactory.setDetached(false); - deque< std::shared_ptr > threads; - for (int n = 0; n < numThreads; ++n) { - std::shared_ptr runner(new RunInThreadTester(n, &data)); - std::shared_ptr thread = threadFactory.newThread(runner); - threads.push_back(thread); - thread->start(); + deque threads; + SCOPE_EXIT { + // Wait on all of the threads. + for (auto& thread : threads) { + thread.join(); + } + }; + + for (uint32_t i = 0; i < numThreads; ++i) { + threads.emplace_back([i, &data] { + for (int n = 0; n < data.opsPerThread; ++n) { + RunInThreadArg* arg = new RunInThreadArg(&data, i, n); + data.evb.runInEventBaseThread(runInThreadTestFunc, arg); + usleep(10); + } + }); } // Add a timeout event to run after 3 seconds. @@ -1132,7 +1145,7 @@ TEST(EventBaseTest, RunInThread) { // Once the last thread exits, it will stop the loop(). However, this // timeout also stops the loop in case there is a bug performing the normal // stop. - data.evb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb), + data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb), 3000); TimePoint start; @@ -1143,13 +1156,15 @@ TEST(EventBaseTest, RunInThread) { // to stop. This should happen much sooner than the 3 second timeout. // Assert that it happens in under a second. (This is still tons of extra // padding.) - int64_t timeTaken = end.getTime() - start.getTime(); - ASSERT_LT(timeTaken, 1000); - VLOG(11) << "Time taken: " << timeTaken; + + auto timeTaken = std::chrono::duration_cast( + end.getTime() - start.getTime()); + ASSERT_LT(timeTaken.count(), 1000); + VLOG(11) << "Time taken: " << timeTaken.count(); // Verify that we have all of the events from every thread int expectedValues[numThreads]; - for (int n = 0; n < numThreads; ++n) { + for (uint32_t n = 0; n < numThreads; ++n) { expectedValues[n] = 0; } for (deque< pair >::const_iterator it = data.values.begin(); @@ -1160,18 +1175,86 @@ TEST(EventBaseTest, RunInThread) { ASSERT_EQ(expectedValues[threadID], value); ++expectedValues[threadID]; } - for (int n = 0; n < numThreads; ++n) { + for (uint32_t n = 0; n < numThreads; ++n) { ASSERT_EQ(expectedValues[n], opsPerThread); } +} - // Wait on all of the threads. Otherwise we can exit and clean up - // RunInThreadData before the last thread exits, while it is still holding - // the RunInThreadData's mutex. - for (deque< std::shared_ptr >::const_iterator it = threads.begin(); - it != threads.end(); - ++it) { - (*it)->join(); +// This test simulates some calls, and verifies that the waiting happens by +// triggering what otherwise would be race conditions, and trying to detect +// whether any of the race conditions happened. +TEST(EventBaseTest, RunInEventBaseThreadAndWait) { + const size_t c = 256; + vector>> atoms(c); + for (size_t i = 0; i < c; ++i) { + auto& atom = atoms.at(i); + atom = make_unique>(0); + } + vector threads; + for (size_t i = 0; i < c; ++i) { + threads.emplace_back([&atoms, i] { + EventBase eb; + auto& atom = *atoms.at(i); + auto ebth = thread([&] { eb.loopForever(); }); + eb.waitUntilRunning(); + eb.runInEventBaseThreadAndWait([&] { + size_t x = 0; + atom.compare_exchange_weak( + x, 1, std::memory_order_release, std::memory_order_relaxed); + }); + size_t x = 0; + atom.compare_exchange_weak( + x, 2, std::memory_order_release, std::memory_order_relaxed); + eb.terminateLoopSoon(); + ebth.join(); + }); } + for (size_t i = 0; i < c; ++i) { + auto& th = threads.at(i); + th.join(); + } + size_t sum = 0; + for (auto& atom : atoms) sum += *atom; + EXPECT_EQ(c, sum); +} + +TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) { + EventBase eb; + thread th(&EventBase::loopForever, &eb); + SCOPE_EXIT { + eb.terminateLoopSoon(); + th.join(); + }; + auto mutated = false; + eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { + mutated = true; + }); + EXPECT_TRUE(mutated); +} + +TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) { + EventBase eb; + thread th(&EventBase::loopForever, &eb); + SCOPE_EXIT { + eb.terminateLoopSoon(); + th.join(); + }; + eb.runInEventBaseThreadAndWait([&] { + auto mutated = false; + eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { + mutated = true; + }); + EXPECT_TRUE(mutated); + }); +} + +TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) { + EventBase eb; + auto mutated = false; + eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { + mutated = true; + }); + EXPECT_TRUE(mutated); } /////////////////////////////////////////////////////////////////////////// @@ -1188,7 +1271,7 @@ class CountedLoopCallback : public EventBase::LoopCallback { , count_(count) , action_(action) {} - virtual void runLoopCallback() noexcept { + void runLoopCallback() noexcept override { --count_; if (count_ > 0) { eventBase_->runInLoop(this); @@ -1223,6 +1306,21 @@ TEST(EventBaseTest, RepeatedRunInLoop) { ASSERT_EQ(c.getCount(), 0); } +// Test that EventBase::loop() works as expected without time measurements. +TEST(EventBaseTest, RunInLoopNoTimeMeasurement) { + EventBase eventBase(false); + + CountedLoopCallback c(&eventBase, 10); + eventBase.runInLoop(&c); + // The callback shouldn't have run immediately + ASSERT_EQ(c.getCount(), 10); + eventBase.loop(); + + // loop() should loop until the CountedLoopCallback stops + // re-installing itself. + ASSERT_EQ(c.getCount(), 0); +} + // Test runInLoop() calls with terminateLoopSoon() TEST(EventBaseTest, RunInLoopStopLoop) { EventBase eventBase; @@ -1252,6 +1350,20 @@ TEST(EventBaseTest, RunInLoopStopLoop) { ASSERT_LE(c1.getCount(), 11); } +TEST(EventBaseTest, TryRunningAfterTerminate) { + EventBase eventBase; + CountedLoopCallback c1(&eventBase, 1, + std::bind(&EventBase::terminateLoopSoon, &eventBase)); + eventBase.runInLoop(&c1); + eventBase.loopForever(); + bool ran = false; + eventBase.runInEventBaseThread([&]() { + ran = true; + }); + + ASSERT_FALSE(ran); +} + // Test cancelling runInLoop() callbacks TEST(EventBaseTest, CancelRunInLoop) { EventBase eventBase; @@ -1327,7 +1439,7 @@ class TerminateTestCallback : public EventBase::LoopCallback, unregisterHandler(); } - virtual void handlerReady(uint16_t events) noexcept { + void handlerReady(uint16_t /* events */) noexcept override { // We didn't register with PERSIST, so we will have been automatically // unregistered already. ASSERT_FALSE(isHandlerRegistered()); @@ -1339,7 +1451,7 @@ class TerminateTestCallback : public EventBase::LoopCallback, eventBase_->runInLoop(this); } - virtual void runLoopCallback() noexcept { + void runLoopCallback() noexcept override { ++loopInvocations_; if (loopInvocations_ >= maxLoopInvocations_) { return; @@ -1404,21 +1516,21 @@ TEST(EventBaseTest, LoopTermination) { // Tests for latency calculations /////////////////////////////////////////////////////////////////////////// -class IdleTimeTimeoutSeries : public TAsyncTimeout { +class IdleTimeTimeoutSeries : public AsyncTimeout { public: explicit IdleTimeTimeoutSeries(EventBase *base, std::deque& timeout) : - TAsyncTimeout(base), + AsyncTimeout(base), timeouts_(0), timeout_(timeout) { scheduleTimeout(1); } - virtual ~IdleTimeTimeoutSeries() {} + ~IdleTimeTimeoutSeries() override {} - void timeoutExpired() noexcept { + void timeoutExpired() noexcept override { ++timeouts_; if(timeout_.empty()){ @@ -1453,7 +1565,7 @@ class IdleTimeTimeoutSeries : public TAsyncTimeout { */ TEST(EventBaseTest, IdleTime) { EventBase eventBase; - eventBase.setLoadAvgMsec(1000); + eventBase.setLoadAvgMsec(1000ms); eventBase.resetLoadAvg(5900.0); std::deque timeouts0(4, 8080); timeouts0.push_front(8000); @@ -1461,23 +1573,30 @@ TEST(EventBaseTest, IdleTime) { IdleTimeTimeoutSeries tos0(&eventBase, timeouts0); std::deque timeouts(20, 20); std::unique_ptr tos; + int64_t testStart = duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + bool hostOverloaded = false; int latencyCallbacks = 0; - eventBase.setMaxLatency(6000, [&]() { + eventBase.setMaxLatency(6000us, [&]() { ++latencyCallbacks; - - switch (latencyCallbacks) { - case 1: - ASSERT_EQ(6, tos0.getTimeouts()); - ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200); - ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200); - tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts)); - break; - - default: + if (latencyCallbacks != 1) { FAIL() << "Unexpected latency callback"; - break; } + + if (tos0.getTimeouts() < 6) { + // This could only happen if the host this test is running + // on is heavily loaded. + int64_t maxLatencyReached = duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + ASSERT_LE(43800, maxLatencyReached - testStart); + hostOverloaded = true; + return; + } + ASSERT_EQ(6, tos0.getTimeouts()); + ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200); + ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200); + tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts)); }); // Kick things off with an "immedite" timeout @@ -1485,10 +1604,15 @@ TEST(EventBaseTest, IdleTime) { eventBase.loop(); + if (hostOverloaded) { + return; + } + ASSERT_EQ(1, latencyCallbacks); ASSERT_EQ(7, tos0.getTimeouts()); ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200); ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200); + ASSERT_TRUE(!!tos); ASSERT_EQ(21, tos->getTimeouts()); } @@ -1540,3 +1664,251 @@ TEST(EventBaseTest, EventBaseThreadName) { ASSERT_EQ(0, strcmp("foo", name)); #endif } + +TEST(EventBaseTest, RunBeforeLoop) { + EventBase base; + CountedLoopCallback cb(&base, 1, [&](){ + base.terminateLoopSoon(); + }); + base.runBeforeLoop(&cb); + base.loopForever(); + ASSERT_EQ(cb.getCount(), 0); +} + +TEST(EventBaseTest, RunBeforeLoopWait) { + EventBase base; + CountedLoopCallback cb(&base, 1); + base.tryRunAfterDelay([&](){ + base.terminateLoopSoon(); + }, 500); + base.runBeforeLoop(&cb); + base.loopForever(); + + // Check that we only ran once, and did not loop multiple times. + ASSERT_EQ(cb.getCount(), 0); +} + +class PipeHandler : public EventHandler { +public: + PipeHandler(EventBase* eventBase, int fd) + : EventHandler(eventBase, fd) {} + + void handlerReady(uint16_t /* events */) noexcept override { abort(); } +}; + +TEST(EventBaseTest, StopBeforeLoop) { + EventBase evb; + + // Give the evb something to do. + int p[2]; + ASSERT_EQ(0, pipe(p)); + PipeHandler handler(&evb, p[0]); + handler.registerHandler(EventHandler::READ); + + // It's definitely not running yet + evb.terminateLoopSoon(); + + // let it run, it should exit quickly. + std::thread t([&] { evb.loop(); }); + t.join(); + + handler.unregisterHandler(); + close(p[0]); + close(p[1]); + + SUCCEED(); +} + +TEST(EventBaseTest, RunCallbacksOnDestruction) { + bool ran = false; + + { + EventBase base; + base.runInEventBaseThread([&](){ + ran = true; + }); + } + + ASSERT_TRUE(ran); +} + +TEST(EventBaseTest, LoopKeepAlive) { + EventBase evb; + + bool done = false; + std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable { + /* sleep override */ std::this_thread::sleep_for( + std::chrono::milliseconds(100)); + evb.runInEventBaseThread( + [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; }); + }); + + evb.loop(); + + ASSERT_TRUE(done); + + t.join(); +} + +TEST(EventBaseTest, LoopKeepAliveInLoop) { + EventBase evb; + + bool done = false; + std::thread t; + + evb.runInEventBaseThread([&] { + t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable { + /* sleep override */ std::this_thread::sleep_for( + std::chrono::milliseconds(100)); + evb.runInEventBaseThread( + [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; }); + }); + }); + + evb.loop(); + + ASSERT_TRUE(done); + + t.join(); +} + +TEST(EventBaseTest, LoopKeepAliveWithLoopForever) { + std::unique_ptr evb = folly::make_unique(); + + bool done = false; + + std::thread evThread([&] { + evb->loopForever(); + evb.reset(); + done = true; + }); + + { + auto* ev = evb.get(); + EventBase::LoopKeepAlive keepAlive; + ev->runInEventBaseThreadAndWait( + [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); }); + ASSERT_FALSE(done) << "Loop finished before we asked it to"; + ev->terminateLoopSoon(); + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(30)); + ASSERT_FALSE(done) << "Loop terminated early"; + ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{}); + } + + evThread.join(); + ASSERT_TRUE(done); +} + +TEST(EventBaseTest, LoopKeepAliveShutdown) { + auto evb = folly::make_unique(); + + bool done = false; + + std::thread t([ + &done, + loopKeepAlive = evb->loopKeepAlive(), + evbPtr = evb.get() + ]() mutable { + /* sleep override */ std::this_thread::sleep_for( + std::chrono::milliseconds(100)); + evbPtr->runInEventBaseThread( + [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; }); + }); + + evb.reset(); + + ASSERT_TRUE(done); + + t.join(); +} + +TEST(EventBaseTest, LoopKeepAliveAtomic) { + auto evb = folly::make_unique(); + + static constexpr size_t kNumThreads = 100; + static constexpr size_t kNumTasks = 100; + + std::vector ts; + std::vector>> batons; + size_t done{0}; + + for (size_t i = 0; i < kNumThreads; ++i) { + batons.emplace_back(std::make_unique>()); + } + + for (size_t i = 0; i < kNumThreads; ++i) { + ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] { + std::vector keepAlives; + for (size_t j = 0; j < kNumTasks; ++j) { + keepAlives.emplace_back(evbPtr->loopKeepAliveAtomic()); + } + + batonPtr->post(); + + /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1)); + + for (auto& keepAlive : keepAlives) { + evbPtr->runInEventBaseThread( + [&done, keepAlive = std::move(keepAlive) ]() { ++done; }); + } + }); + } + + for (auto& baton : batons) { + baton->wait(); + } + + evb.reset(); + + EXPECT_EQ(kNumThreads * kNumTasks, done); + + for (auto& t : ts) { + t.join(); + } +} + +TEST(EventBaseTest, DrivableExecutorTest) { + folly::Promise p; + auto f = p.getFuture(); + EventBase base; + bool finished = false; + + std::thread t([&] { + /* sleep override */ + std::this_thread::sleep_for(std::chrono::microseconds(10)); + finished = true; + base.runInEventBaseThread([&]() { p.setValue(true); }); + }); + + // Ensure drive does not busy wait + base.drive(); // TODO: fix notification queue init() extra wakeup + base.drive(); + EXPECT_TRUE(finished); + + folly::Promise p2; + auto f2 = p2.getFuture(); + // Ensure waitVia gets woken up properly, even from + // a separate thread. + base.runAfterDelay([&]() { p2.setValue(true); }, 10); + f2.waitVia(&base); + EXPECT_TRUE(f2.isReady()); + + t.join(); +} + +TEST(EventBaseTest, RequestContextTest) { + EventBase evb; + auto defaultCtx = RequestContext::get(); + + { + RequestContextScopeGuard rctx; + auto context = RequestContext::get(); + EXPECT_NE(defaultCtx, context); + evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); }); + } + + EXPECT_EQ(defaultCtx, RequestContext::get()); + evb.loop(); + EXPECT_EQ(defaultCtx, RequestContext::get()); +}