/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Copyright 2014-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
+
+#include <folly/Memory.h>
+#include <folly/ScopeGuard.h>
+
#include <folly/io/async/AsyncTimeout.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventHandler.h>
#include <folly/io/async/test/SocketPair.h>
#include <folly/io/async/test/Util.h>
+#include <folly/portability/Unistd.h>
+#include <folly/futures/Promise.h>
+
+#include <atomic>
#include <iostream>
-#include <unistd.h>
#include <memory>
+#include <thread>
+using std::atomic;
using std::deque;
using std::pair;
using std::vector;
+using std::unique_ptr;
+using std::thread;
using std::make_pair;
using std::cerr;
using std::endl;
+using std::chrono::milliseconds;
+using std::chrono::microseconds;
+using std::chrono::duration_cast;
+
+using namespace std::chrono_literals;
using namespace folly;
ssize_t writeToFD(int fd, size_t length) {
// write an arbitrary amount of data to the fd
- char buf[length];
- memset(buf, 'a', sizeof(buf));
- ssize_t rc = write(fd, buf, sizeof(buf));
+ auto bufv = vector<char>(length);
+ auto buf = bufv.data();
+ memset(buf, 'a', length);
+ ssize_t rc = write(fd, buf, length);
CHECK_EQ(rc, length);
return rc;
}
ssize_t readFromFD(int fd, size_t length) {
// write an arbitrary amount of data to the fd
- char buf[length];
- return read(fd, buf, sizeof(buf));
+ auto buf = vector<char>(length);
+ return read(fd, buf.data(), length);
}
size_t readUntilEmpty(int fd) {
void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
- eventBase->runAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
+ eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
ev->milliseconds);
}
}
TestHandler(EventBase* eventBase, int fd)
: EventHandler(eventBase, fd), fd_(fd) {}
- virtual void handlerReady(uint16_t events) noexcept {
+ void handlerReady(uint16_t events) noexcept override {
ssize_t bytesRead = 0;
ssize_t bytesWritten = 0;
if (events & READ) {
bytesWritten = writeUntilFull(fd_);
}
- log.push_back(EventRecord(events, bytesRead, bytesWritten));
+ log.emplace_back(events, bytesRead, bytesWritten);
}
struct EventRecord {
// Register timeouts to perform two write events
ScheduledEvent events[] = {
- { 10, EventHandler::WRITE, 2345 },
- { 160, EventHandler::WRITE, 99 },
- { 0, 0, 0 },
+ { 10, EventHandler::WRITE, 2345, 0 },
+ { 160, EventHandler::WRITE, 99, 0 },
+ { 0, 0, 0, 0 },
};
scheduleEvents(&eb, sp[1], events);
// the first chunk of data was received.
ASSERT_EQ(handler.log.size(), 1);
ASSERT_EQ(handler.log[0].events, EventHandler::READ);
- T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds, 90);
+ T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
+ milliseconds(events[0].milliseconds), milliseconds(90));
ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
ASSERT_EQ(handler.log[0].bytesWritten, 0);
- T_CHECK_TIMEOUT(start, end, events[1].milliseconds, 30);
+ T_CHECK_TIMEOUT(start, end,
+ milliseconds(events[1].milliseconds), milliseconds(30));
// Make sure the second chunk of data is still waiting to be read.
size_t bytesRemaining = readUntilEmpty(sp[0]);
// Register several timeouts to perform writes
ScheduledEvent events[] = {
- { 10, EventHandler::WRITE, 1024 },
- { 20, EventHandler::WRITE, 2211 },
- { 30, EventHandler::WRITE, 4096 },
- { 100, EventHandler::WRITE, 100 },
- { 0, 0 },
+ { 10, EventHandler::WRITE, 1024, 0 },
+ { 20, EventHandler::WRITE, 2211, 0 },
+ { 30, EventHandler::WRITE, 4096, 0 },
+ { 100, EventHandler::WRITE, 100, 0 },
+ { 0, 0, 0, 0 },
};
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler after the third write
- eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
+ eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
// Loop
TimePoint start;
ASSERT_EQ(handler.log.size(), 3);
for (int n = 0; n < 3; ++n) {
ASSERT_EQ(handler.log[n].events, EventHandler::READ);
- T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[n].milliseconds);
+ T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
+ milliseconds(events[n].milliseconds));
ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
ASSERT_EQ(handler.log[n].bytesWritten, 0);
}
- T_CHECK_TIMEOUT(start, end, events[3].milliseconds);
+ T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
// Make sure the data from the last write is still waiting to be read
size_t bytesRemaining = readUntilEmpty(sp[0]);
// Register a timeout to perform another write
ScheduledEvent events[] = {
- { 10, EventHandler::WRITE, 2345 },
- { 0, 0, 0 },
+ { 10, EventHandler::WRITE, 2345, 0 },
+ { 0, 0, 0, 0 },
};
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler
- eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
+ eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
// Loop
TimePoint start;
// There should have been 1 event for immediate readability
ASSERT_EQ(handler.log[0].events, EventHandler::READ);
- T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
+ T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
ASSERT_EQ(handler.log[0].bytesRead, dataLength);
ASSERT_EQ(handler.log[0].bytesWritten, 0);
// There should be another event after the timeout wrote more data
ASSERT_EQ(handler.log[1].events, EventHandler::READ);
- T_CHECK_TIMEOUT(start, handler.log[1].timestamp, events[0].milliseconds);
+ T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
+ milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
ASSERT_EQ(handler.log[1].bytesWritten, 0);
- T_CHECK_TIMEOUT(start, end, 20);
+ T_CHECK_TIMEOUT(start, end, milliseconds(20));
}
/**
// Register timeouts to perform two reads
ScheduledEvent events[] = {
- { 10, EventHandler::READ, 0 },
- { 60, EventHandler::READ, 0 },
- { 0, 0, 0 },
+ { 10, EventHandler::READ, 0, 0 },
+ { 60, EventHandler::READ, 0, 0 },
+ { 0, 0, 0, 0 },
};
scheduleEvents(&eb, sp[1], events);
// have only been able to write once, then unregistered itself.
ASSERT_EQ(handler.log.size(), 1);
ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
- T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
+ T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
+ milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[0].bytesRead, 0);
ASSERT_GT(handler.log[0].bytesWritten, 0);
- T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
+ T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
ASSERT_EQ(events[0].result, initialBytesWritten);
ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
// Register several timeouts to read from the socket at several intervals
ScheduledEvent events[] = {
- { 10, EventHandler::READ, 0 },
- { 40, EventHandler::READ, 0 },
- { 70, EventHandler::READ, 0 },
- { 100, EventHandler::READ, 0 },
- { 0, 0 },
+ { 10, EventHandler::READ, 0, 0 },
+ { 40, EventHandler::READ, 0, 0 },
+ { 70, EventHandler::READ, 0, 0 },
+ { 100, EventHandler::READ, 0, 0 },
+ { 0, 0, 0, 0 },
};
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler after the third read
- eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
+ eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
// Loop
TimePoint start;
ASSERT_EQ(events[0].result, initialBytesWritten);
for (int n = 0; n < 3; ++n) {
ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
- T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[n].milliseconds);
+ T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
+ milliseconds(events[n].milliseconds));
ASSERT_EQ(handler.log[n].bytesRead, 0);
ASSERT_GT(handler.log[n].bytesWritten, 0);
ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
}
- T_CHECK_TIMEOUT(start, end, events[3].milliseconds);
+ T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
}
/**
// Register a timeout to perform a read
ScheduledEvent events[] = {
- { 10, EventHandler::READ, 0 },
- { 0, 0, 0 },
+ { 10, EventHandler::READ, 0, 0 },
+ { 0, 0, 0, 0 },
};
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler
int64_t unregisterTimeout = 40;
- eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
+ eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
unregisterTimeout);
// Loop
// Since the socket buffer was initially empty,
// there should have been 1 event for immediate writability
ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
- T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
+ T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
ASSERT_EQ(handler.log[0].bytesRead, 0);
ASSERT_GT(handler.log[0].bytesWritten, 0);
// There should be another event after the timeout wrote more data
ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
- T_CHECK_TIMEOUT(start, handler.log[1].timestamp, events[0].milliseconds);
+ T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
+ milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[1].bytesRead, 0);
ASSERT_GT(handler.log[1].bytesWritten, 0);
- T_CHECK_TIMEOUT(start, end, unregisterTimeout);
+ T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
}
/**
// Register timeouts to perform a write then a read.
ScheduledEvent events[] = {
- { 10, EventHandler::WRITE, 2345 },
- { 40, EventHandler::READ, 0 },
- { 0, 0, 0 },
+ { 10, EventHandler::WRITE, 2345, 0 },
+ { 40, EventHandler::READ, 0, 0 },
+ { 0, 0, 0, 0 },
};
scheduleEvents(&eb, sp[1], events);
// one event was logged.
ASSERT_EQ(handler.log.size(), 1);
ASSERT_EQ(handler.log[0].events, EventHandler::READ);
- T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
+ T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
+ milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
ASSERT_EQ(handler.log[0].bytesWritten, 0);
ASSERT_EQ(events[1].result, sock0WriteLength);
- T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
+ T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
}
/**
// Register timeouts to perform a read then a write.
size_t sock1WriteLength = 2345;
ScheduledEvent events[] = {
- { 10, EventHandler::READ, 0 },
- { 40, EventHandler::WRITE, sock1WriteLength },
- { 0, 0, 0 },
+ { 10, EventHandler::READ, 0, 0 },
+ { 40, EventHandler::WRITE, sock1WriteLength, 0 },
+ { 0, 0, 0, 0 },
};
scheduleEvents(&eb, sp[1], events);
// one event was logged.
ASSERT_EQ(handler.log.size(), 1);
ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
- T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
+ T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
+ milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[0].bytesRead, 0);
ASSERT_GT(handler.log[0].bytesWritten, 0);
ASSERT_EQ(events[0].result, sock0WriteLength);
ASSERT_EQ(events[1].result, sock1WriteLength);
- T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
+ T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
// Make sure the written data is still waiting to be read.
size_t bytesRemaining = readUntilEmpty(sp[0]);
// Register a timeout to perform a read and write together
ScheduledEvent events[] = {
- { 10, EventHandler::READ | EventHandler::WRITE, 0 },
- { 0, 0, 0 },
+ { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
+ { 0, 0, 0, 0 },
};
scheduleEvents(&eb, sp[1], events);
ASSERT_EQ(handler.log.size(), 1);
ASSERT_EQ(handler.log[0].events,
EventHandler::READ | EventHandler::WRITE);
- T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
+ T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
+ milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
ASSERT_GT(handler.log[0].bytesWritten, 0);
- T_CHECK_TIMEOUT(start, end, events[0].milliseconds);
+ T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
}
/**
// Register timeouts to perform several reads and writes
ScheduledEvent events[] = {
- { 10, EventHandler::WRITE, 2345 },
- { 20, EventHandler::READ, 0 },
- { 35, EventHandler::WRITE, 200 },
- { 45, EventHandler::WRITE, 15 },
- { 55, EventHandler::READ, 0 },
- { 120, EventHandler::WRITE, 2345 },
- { 0, 0, 0 },
+ { 10, EventHandler::WRITE, 2345, 0 },
+ { 20, EventHandler::READ, 0, 0 },
+ { 35, EventHandler::WRITE, 200, 0 },
+ { 45, EventHandler::WRITE, 15, 0 },
+ { 55, EventHandler::READ, 0, 0 },
+ { 120, EventHandler::WRITE, 2345, 0 },
+ { 0, 0, 0, 0 },
};
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler
- eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
+ eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
// Loop
TimePoint start;
// Since we didn't fill up the write buffer immediately, there should
// be an immediate event for writability.
ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
- T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
+ T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
ASSERT_EQ(handler.log[0].bytesRead, 0);
ASSERT_GT(handler.log[0].bytesWritten, 0);
// Events 1 through 5 should correspond to the scheduled events
for (int n = 1; n < 6; ++n) {
ScheduledEvent* event = &events[n - 1];
- T_CHECK_TIMEOUT(start, handler.log[n].timestamp, event->milliseconds);
+ T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
+ milliseconds(event->milliseconds));
if (event->events == EventHandler::READ) {
ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
ASSERT_EQ(handler.log[n].bytesRead, 0);
PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
: TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
- virtual void handlerReady(uint16_t events) noexcept {
+ void handlerReady(uint16_t events) noexcept override {
assert(events == EventHandler::READ);
ssize_t bytesRead = readFromFD(fd_, readLength_);
- log.push_back(EventRecord(events, bytesRead, 0));
+ log.emplace_back(events, bytesRead, 0);
}
private:
// Register a timeout to perform a single write,
// with more data than PartialReadHandler will read at once
ScheduledEvent events[] = {
- { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) },
- { 0, 0, 0 },
+ { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
+ { 0, 0, 0, 0 },
};
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler
- eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
+ eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
// Loop
TimePoint start;
// The first 3 invocations should read readLength bytes each
for (int n = 0; n < 3; ++n) {
ASSERT_EQ(handler.log[n].events, EventHandler::READ);
- T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[0].milliseconds);
+ T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
+ milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[n].bytesRead, readLength);
ASSERT_EQ(handler.log[n].bytesWritten, 0);
}
// The last read only has readLength/2 bytes
ASSERT_EQ(handler.log[3].events, EventHandler::READ);
- T_CHECK_TIMEOUT(start, handler.log[3].timestamp, events[0].milliseconds);
+ T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
+ milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
ASSERT_EQ(handler.log[3].bytesWritten, 0);
}
PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
: TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
- virtual void handlerReady(uint16_t events) noexcept {
+ void handlerReady(uint16_t events) noexcept override {
assert(events == EventHandler::WRITE);
ssize_t bytesWritten = writeToFD(fd_, writeLength_);
- log.push_back(EventRecord(events, 0, bytesWritten));
+ log.emplace_back(events, 0, bytesWritten);
}
private:
// Register a timeout to read, so that more data can be written
ScheduledEvent events[] = {
- { 10, EventHandler::READ, 0 },
- { 0, 0, 0 },
+ { 10, EventHandler::READ, 0, 0 },
+ { 0, 0, 0, 0 },
};
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler
- eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
+ eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
// Loop
TimePoint start;
// The first 3 invocations should read writeLength bytes each
for (int n = 0; n < numChecked; ++n) {
ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
- T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[0].milliseconds);
+ T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
+ milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[n].bytesRead, 0);
ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
}
* Test destroying a registered EventHandler
*/
TEST(EventBaseTest, DestroyHandler) {
- class DestroyHandler : public TAsyncTimeout {
+ class DestroyHandler : public AsyncTimeout {
public:
DestroyHandler(EventBase* eb, EventHandler* h)
- : TAsyncTimeout(eb)
+ : AsyncTimeout(eb)
, handler_(h) {}
- virtual void timeoutExpired() noexcept {
- delete handler_;
- }
+ void timeoutExpired() noexcept override { delete handler_; }
private:
EventHandler* handler_;
// After 10ms, read some data, so that the handler
// will be notified that it can write.
- eb.runAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
+ eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
10);
// Start a timer to destroy the handler after 25ms
// Make sure the EventHandler was uninstalled properly when it was
// destroyed, and the EventBase loop exited
- T_CHECK_TIMEOUT(start, end, 25);
+ T_CHECK_TIMEOUT(start, end, milliseconds(25));
// Make sure that the handler wrote data to the socket
// before it was destroyed
TimePoint timestamp1(false);
TimePoint timestamp2(false);
TimePoint timestamp3(false);
- eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
- eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
- eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 40);
+ eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
+ eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
+ eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 40);
TimePoint start;
eb.loop();
TimePoint end;
- T_CHECK_TIMEOUT(start, timestamp1, 10);
- T_CHECK_TIMEOUT(start, timestamp2, 20);
- T_CHECK_TIMEOUT(start, timestamp3, 40);
- T_CHECK_TIMEOUT(start, end, 40);
+ T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
+ T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
+ T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
+ T_CHECK_TIMEOUT(start, end, milliseconds(40));
}
/**
- * Test the behavior of runAfterDelay() when some timeouts are
+ * Test the behavior of tryRunAfterDelay() when some timeouts are
* still scheduled when the EventBase is destroyed.
*/
TEST(EventBaseTest, RunAfterDelayDestruction) {
EventBase eb;
// Run two normal timeouts
- eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
- eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
+ eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
+ eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
// Schedule a timeout to stop the event loop after 40ms
- eb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
+ eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
// Schedule 2 timeouts that would fire after the event loop stops
- eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 80);
- eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp4), 160);
+ eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 80);
+ eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp4), 160);
start.reset();
eb.loop();
end.reset();
}
- T_CHECK_TIMEOUT(start, timestamp1, 10);
- T_CHECK_TIMEOUT(start, timestamp2, 20);
- T_CHECK_TIMEOUT(start, end, 40);
+ T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
+ T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
+ T_CHECK_TIMEOUT(start, end, milliseconds(40));
ASSERT_TRUE(timestamp3.isUnset());
ASSERT_TRUE(timestamp4.isUnset());
// memory is leaked.
}
-class TestTimeout : public TAsyncTimeout {
+class TestTimeout : public AsyncTimeout {
public:
explicit TestTimeout(EventBase* eventBase)
- : TAsyncTimeout(eventBase)
+ : AsyncTimeout(eventBase)
, timestamp(false) {}
- virtual void timeoutExpired() noexcept {
- timestamp.reset();
- }
+ void timeoutExpired() noexcept override { timestamp.reset(); }
TimePoint timestamp;
};
eb.loop();
TimePoint end;
- T_CHECK_TIMEOUT(start, t1.timestamp, 10);
- T_CHECK_TIMEOUT(start, t2.timestamp, 20);
- T_CHECK_TIMEOUT(start, t3.timestamp, 40);
- T_CHECK_TIMEOUT(start, end, 40);
+ T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
+ T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
+ T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
+ T_CHECK_TIMEOUT(start, end, milliseconds(40));
}
-class ReschedulingTimeout : public TAsyncTimeout {
+class ReschedulingTimeout : public AsyncTimeout {
public:
ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
- : TAsyncTimeout(evb)
+ : AsyncTimeout(evb)
, timeouts_(timeouts)
, iterator_(timeouts_.begin()) {}
reschedule();
}
- virtual void timeoutExpired() noexcept {
- timestamps.push_back(TimePoint());
+ void timeoutExpired() noexcept override {
+ timestamps.emplace_back();
reschedule();
}
// Use a higher tolerance than usual. We're waiting on 3 timeouts
// consecutively. In general, each timeout may go over by a few
// milliseconds, and we're tripling this error by witing on 3 timeouts.
- int64_t tolerance = 6;
+ milliseconds tolerance{6};
ASSERT_EQ(timeouts.size(), t.timestamps.size());
uint32_t total = 0;
- for (int n = 0; n < timeouts.size(); ++n) {
+ for (size_t n = 0; n < timeouts.size(); ++n) {
total += timeouts[n];
- T_CHECK_TIMEOUT(start, t.timestamps[n], total, tolerance);
+ T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
}
- T_CHECK_TIMEOUT(start, end, total, tolerance);
+ T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
}
/**
t2.scheduleTimeout(30);
t3.scheduleTimeout(30);
- auto f = static_cast<bool(TAsyncTimeout::*)(uint32_t)>(
- &TAsyncTimeout::scheduleTimeout);
+ auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
+ &AsyncTimeout::scheduleTimeout);
// after 10ms, reschedule t2 to run sooner than originally scheduled
- eb.runAfterDelay(std::bind(f, &t2, 10), 10);
+ eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
// after 10ms, reschedule t3 to run later than originally scheduled
- eb.runAfterDelay(std::bind(f, &t3, 40), 10);
+ eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
TimePoint start;
eb.loop();
TimePoint end;
- T_CHECK_TIMEOUT(start, t1.timestamp, 15);
- T_CHECK_TIMEOUT(start, t2.timestamp, 20);
- T_CHECK_TIMEOUT(start, t3.timestamp, 50);
- T_CHECK_TIMEOUT(start, end, 50);
+ T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
+ T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
+ T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
+ T_CHECK_TIMEOUT(start, end, milliseconds(50));
}
/**
ReschedulingTimeout t(&eb, timeouts);
t.start();
- eb.runAfterDelay(std::bind(&TAsyncTimeout::cancelTimeout, &t), 50);
+ eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
TimePoint start;
eb.loop();
TimePoint end;
ASSERT_EQ(t.timestamps.size(), 2);
- T_CHECK_TIMEOUT(start, t.timestamps[0], 10);
- T_CHECK_TIMEOUT(start, t.timestamps[1], 40);
- T_CHECK_TIMEOUT(start, end, 50);
+ T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
+ T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
+ T_CHECK_TIMEOUT(start, end, milliseconds(50));
}
/**
* Test destroying a scheduled timeout object
*/
TEST(EventBaseTest, DestroyTimeout) {
- class DestroyTimeout : public TAsyncTimeout {
+ class DestroyTimeout : public AsyncTimeout {
public:
- DestroyTimeout(EventBase* eb, TAsyncTimeout* t)
- : TAsyncTimeout(eb)
+ DestroyTimeout(EventBase* eb, AsyncTimeout* t)
+ : AsyncTimeout(eb)
, timeout_(t) {}
- virtual void timeoutExpired() noexcept {
- delete timeout_;
- }
+ void timeoutExpired() noexcept override { delete timeout_; }
private:
- TAsyncTimeout* timeout_;
+ AsyncTimeout* timeout_;
};
EventBase eb;
eb.loop();
TimePoint end;
- T_CHECK_TIMEOUT(start, end, 10);
+ T_CHECK_TIMEOUT(start, end, milliseconds(10));
}
};
void runInThreadTestFunc(RunInThreadArg* arg) {
- arg->data->values.push_back(make_pair(arg->thread, arg->value));
+ arg->data->values.emplace_back(arg->thread, arg->value);
RunInThreadData* data = arg->data;
delete arg;
}
}
-class RunInThreadTester : public concurrency::Runnable {
- public:
- RunInThreadTester(int id, RunInThreadData* data) : id_(id), data_(data) {}
-
- void run() {
- // Call evb->runInThread() a number of times
- {
- for (int n = 0; n < data_->opsPerThread; ++n) {
- RunInThreadArg* arg = new RunInThreadArg(data_, id_, n);
- data_->evb.runInEventBaseThread(runInThreadTestFunc, arg);
- usleep(10);
- }
- }
- }
-
- private:
- int id_;
- RunInThreadData* data_;
-};
-
TEST(EventBaseTest, RunInThread) {
- uint32_t numThreads = 50;
- uint32_t opsPerThread = 100;
+ constexpr uint32_t numThreads = 50;
+ constexpr uint32_t opsPerThread = 100;
RunInThreadData data(numThreads, opsPerThread);
- PosixThreadFactory threadFactory;
- threadFactory.setDetached(false);
- deque< std::shared_ptr<Thread> > threads;
- for (int n = 0; n < numThreads; ++n) {
- std::shared_ptr<RunInThreadTester> runner(new RunInThreadTester(n, &data));
- std::shared_ptr<Thread> thread = threadFactory.newThread(runner);
- threads.push_back(thread);
- thread->start();
+ deque<std::thread> threads;
+ SCOPE_EXIT {
+ // Wait on all of the threads.
+ for (auto& thread : threads) {
+ thread.join();
+ }
+ };
+
+ for (uint32_t i = 0; i < numThreads; ++i) {
+ threads.emplace_back([i, &data] {
+ for (int n = 0; n < data.opsPerThread; ++n) {
+ RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
+ data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
+ usleep(10);
+ }
+ });
}
// Add a timeout event to run after 3 seconds.
// Once the last thread exits, it will stop the loop(). However, this
// timeout also stops the loop in case there is a bug performing the normal
// stop.
- data.evb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
+ data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
3000);
TimePoint start;
// to stop. This should happen much sooner than the 3 second timeout.
// Assert that it happens in under a second. (This is still tons of extra
// padding.)
- int64_t timeTaken = end.getTime() - start.getTime();
- ASSERT_LT(timeTaken, 1000);
- VLOG(11) << "Time taken: " << timeTaken;
+
+ auto timeTaken = std::chrono::duration_cast<milliseconds>(
+ end.getTime() - start.getTime());
+ ASSERT_LT(timeTaken.count(), 1000);
+ VLOG(11) << "Time taken: " << timeTaken.count();
// Verify that we have all of the events from every thread
int expectedValues[numThreads];
- for (int n = 0; n < numThreads; ++n) {
+ for (uint32_t n = 0; n < numThreads; ++n) {
expectedValues[n] = 0;
}
for (deque< pair<int, int> >::const_iterator it = data.values.begin();
ASSERT_EQ(expectedValues[threadID], value);
++expectedValues[threadID];
}
- for (int n = 0; n < numThreads; ++n) {
+ for (uint32_t n = 0; n < numThreads; ++n) {
ASSERT_EQ(expectedValues[n], opsPerThread);
}
+}
- // Wait on all of the threads. Otherwise we can exit and clean up
- // RunInThreadData before the last thread exits, while it is still holding
- // the RunInThreadData's mutex.
- for (deque< std::shared_ptr<Thread> >::const_iterator it = threads.begin();
- it != threads.end();
- ++it) {
- (*it)->join();
+// This test simulates some calls, and verifies that the waiting happens by
+// triggering what otherwise would be race conditions, and trying to detect
+// whether any of the race conditions happened.
+TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
+ const size_t c = 256;
+ vector<unique_ptr<atomic<size_t>>> atoms(c);
+ for (size_t i = 0; i < c; ++i) {
+ auto& atom = atoms.at(i);
+ atom = std::make_unique<atomic<size_t>>(0);
+ }
+ vector<thread> threads;
+ for (size_t i = 0; i < c; ++i) {
+ threads.emplace_back([&atoms, i] {
+ EventBase eb;
+ auto& atom = *atoms.at(i);
+ auto ebth = thread([&] { eb.loopForever(); });
+ eb.waitUntilRunning();
+ eb.runInEventBaseThreadAndWait([&] {
+ size_t x = 0;
+ atom.compare_exchange_weak(
+ x, 1, std::memory_order_release, std::memory_order_relaxed);
+ });
+ size_t x = 0;
+ atom.compare_exchange_weak(
+ x, 2, std::memory_order_release, std::memory_order_relaxed);
+ eb.terminateLoopSoon();
+ ebth.join();
+ });
}
+ for (size_t i = 0; i < c; ++i) {
+ auto& th = threads.at(i);
+ th.join();
+ }
+ size_t sum = 0;
+ for (auto& atom : atoms) {
+ sum += *atom;
+ }
+ EXPECT_EQ(c, sum);
+}
+
+TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
+ EventBase eb;
+ thread th(&EventBase::loopForever, &eb);
+ SCOPE_EXIT {
+ eb.terminateLoopSoon();
+ th.join();
+ };
+ auto mutated = false;
+ eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
+ mutated = true;
+ });
+ EXPECT_TRUE(mutated);
+}
+
+TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
+ EventBase eb;
+ thread th(&EventBase::loopForever, &eb);
+ SCOPE_EXIT {
+ eb.terminateLoopSoon();
+ th.join();
+ };
+ eb.runInEventBaseThreadAndWait([&] {
+ auto mutated = false;
+ eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
+ mutated = true;
+ });
+ EXPECT_TRUE(mutated);
+ });
+}
+
+TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
+ EventBase eb;
+ auto mutated = false;
+ eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
+ mutated = true;
+ });
+ EXPECT_TRUE(mutated);
}
///////////////////////////////////////////////////////////////////////////
, count_(count)
, action_(action) {}
- virtual void runLoopCallback() noexcept {
+ void runLoopCallback() noexcept override {
--count_;
if (count_ > 0) {
eventBase_->runInLoop(this);
ASSERT_EQ(c.getCount(), 0);
}
+// Test that EventBase::loop() works as expected without time measurements.
+TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
+ EventBase eventBase(false);
+
+ CountedLoopCallback c(&eventBase, 10);
+ eventBase.runInLoop(&c);
+ // The callback shouldn't have run immediately
+ ASSERT_EQ(c.getCount(), 10);
+ eventBase.loop();
+
+ // loop() should loop until the CountedLoopCallback stops
+ // re-installing itself.
+ ASSERT_EQ(c.getCount(), 0);
+}
+
// Test runInLoop() calls with terminateLoopSoon()
TEST(EventBaseTest, RunInLoopStopLoop) {
EventBase eventBase;
ASSERT_LE(c1.getCount(), 11);
}
+TEST(EventBaseTest, messageAvailableException) {
+ auto deadManWalking = [] {
+ EventBase eventBase;
+ std::thread t([&] {
+ // Call this from another thread to force use of NotificationQueue in
+ // runInEventBaseThread
+ eventBase.runInEventBaseThread(
+ []() { throw std::runtime_error("boom"); });
+ });
+ t.join();
+ eventBase.loopForever();
+ };
+ EXPECT_DEATH(deadManWalking(), ".*");
+}
+
+TEST(EventBaseTest, TryRunningAfterTerminate) {
+ EventBase eventBase;
+ CountedLoopCallback c1(&eventBase, 1,
+ std::bind(&EventBase::terminateLoopSoon, &eventBase));
+ eventBase.runInLoop(&c1);
+ eventBase.loopForever();
+ bool ran = false;
+ eventBase.runInEventBaseThread([&]() {
+ ran = true;
+ });
+
+ ASSERT_FALSE(ran);
+}
+
// Test cancelling runInLoop() callbacks
TEST(EventBaseTest, CancelRunInLoop) {
EventBase eventBase;
// Run the loop
eventBase.loop();
- // cancelC1 and cancelC3 should have both fired after 10 iterations and
+ // cancelC1 and cancelC2 should have both fired after 10 iterations and
// stopped re-installing themselves
ASSERT_EQ(cancelC1.getCount(), 0);
ASSERT_EQ(cancelC2.getCount(), 0);
unregisterHandler();
}
- virtual void handlerReady(uint16_t events) noexcept {
+ void handlerReady(uint16_t /* events */) noexcept override {
// We didn't register with PERSIST, so we will have been automatically
// unregistered already.
ASSERT_FALSE(isHandlerRegistered());
eventBase_->runInLoop(this);
}
- virtual void runLoopCallback() noexcept {
+ void runLoopCallback() noexcept override {
++loopInvocations_;
if (loopInvocations_ >= maxLoopInvocations_) {
return;
// Tests for latency calculations
///////////////////////////////////////////////////////////////////////////
-class IdleTimeTimeoutSeries : public TAsyncTimeout {
+class IdleTimeTimeoutSeries : public AsyncTimeout {
public:
explicit IdleTimeTimeoutSeries(EventBase *base,
std::deque<std::uint64_t>& timeout) :
- TAsyncTimeout(base),
+ AsyncTimeout(base),
timeouts_(0),
timeout_(timeout) {
scheduleTimeout(1);
}
- virtual ~IdleTimeTimeoutSeries() {}
+ ~IdleTimeTimeoutSeries() override {}
- void timeoutExpired() noexcept {
+ void timeoutExpired() noexcept override {
++timeouts_;
if(timeout_.empty()){
*/
TEST(EventBaseTest, IdleTime) {
EventBase eventBase;
- eventBase.setLoadAvgMsec(1000);
+ eventBase.setLoadAvgMsec(1000ms);
eventBase.resetLoadAvg(5900.0);
std::deque<uint64_t> timeouts0(4, 8080);
timeouts0.push_front(8000);
IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
std::deque<uint64_t> timeouts(20, 20);
std::unique_ptr<IdleTimeTimeoutSeries> tos;
+ int64_t testStart = duration_cast<microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch()).count();
+ bool hostOverloaded = false;
int latencyCallbacks = 0;
- eventBase.setMaxLatency(6000, [&]() {
+ eventBase.setMaxLatency(6000us, [&]() {
++latencyCallbacks;
-
- switch (latencyCallbacks) {
- case 1:
- ASSERT_EQ(6, tos0.getTimeouts());
- ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
- ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
- tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
- break;
-
- default:
+ if (latencyCallbacks != 1) {
FAIL() << "Unexpected latency callback";
- break;
}
+
+ if (tos0.getTimeouts() < 6) {
+ // This could only happen if the host this test is running
+ // on is heavily loaded.
+ int64_t maxLatencyReached = duration_cast<microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch()).count();
+ ASSERT_LE(43800, maxLatencyReached - testStart);
+ hostOverloaded = true;
+ return;
+ }
+ ASSERT_EQ(6, tos0.getTimeouts());
+ ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
+ ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
+ tos = std::make_unique<IdleTimeTimeoutSeries>(&eventBase, timeouts);
});
// Kick things off with an "immedite" timeout
eventBase.loop();
+ if (hostOverloaded) {
+ return;
+ }
+
ASSERT_EQ(1, latencyCallbacks);
ASSERT_EQ(7, tos0.getTimeouts());
ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
+ ASSERT_TRUE(!!tos);
ASSERT_EQ(21, tos->getTimeouts());
}
});
base.loop();
- ASSERT_EQ(true, ran);
+ ASSERT_TRUE(ran);
}
TEST(EventBaseTest, EventBaseThreadName) {
#endif
}
-TEST(TEventBaseTest, RunBeforeLoop) {
- TEventBase base;
+TEST(EventBaseTest, RunBeforeLoop) {
+ EventBase base;
CountedLoopCallback cb(&base, 1, [&](){
base.terminateLoopSoon();
});
base.runBeforeLoop(&cb);
base.loopForever();
- ASSERT_EQUAL(cb.getCount(), 0);
+ ASSERT_EQ(cb.getCount(), 0);
}
-TEST(TEventBaseTest, RunBeforeLoopWait) {
- TEventBase base;
+TEST(EventBaseTest, RunBeforeLoopWait) {
+ EventBase base;
CountedLoopCallback cb(&base, 1);
- base.runAfterDelay([&](){
+ base.tryRunAfterDelay([&](){
base.terminateLoopSoon();
}, 500);
base.runBeforeLoop(&cb);
base.loopForever();
// Check that we only ran once, and did not loop multiple times.
- ASSERT_EQUAL(cb.getCount(), 0);
+ ASSERT_EQ(cb.getCount(), 0);
+}
+
+class PipeHandler : public EventHandler {
+ public:
+ PipeHandler(EventBase* eventBase, int fd)
+ : EventHandler(eventBase, fd) {}
+
+ void handlerReady(uint16_t /* events */) noexcept override { abort(); }
+};
+
+TEST(EventBaseTest, StopBeforeLoop) {
+ EventBase evb;
+
+ // Give the evb something to do.
+ int p[2];
+ ASSERT_EQ(0, pipe(p));
+ PipeHandler handler(&evb, p[0]);
+ handler.registerHandler(EventHandler::READ);
+
+ // It's definitely not running yet
+ evb.terminateLoopSoon();
+
+ // let it run, it should exit quickly.
+ std::thread t([&] { evb.loop(); });
+ t.join();
+
+ handler.unregisterHandler();
+ close(p[0]);
+ close(p[1]);
+
+ SUCCEED();
+}
+
+TEST(EventBaseTest, RunCallbacksOnDestruction) {
+ bool ran = false;
+
+ {
+ EventBase base;
+ base.runInEventBaseThread([&](){
+ ran = true;
+ });
+ }
+
+ ASSERT_TRUE(ran);
+}
+
+TEST(EventBaseTest, LoopKeepAlive) {
+ EventBase evb;
+
+ bool done = false;
+ std::thread t([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
+ /* sleep override */ std::this_thread::sleep_for(
+ std::chrono::milliseconds(100));
+ evb.runInEventBaseThread(
+ [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
+ });
+
+ evb.loop();
+
+ ASSERT_TRUE(done);
+
+ t.join();
+}
+
+TEST(EventBaseTest, LoopKeepAliveInLoop) {
+ EventBase evb;
+
+ bool done = false;
+ std::thread t;
+
+ evb.runInEventBaseThread([&] {
+ t = std::thread([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
+ /* sleep override */ std::this_thread::sleep_for(
+ std::chrono::milliseconds(100));
+ evb.runInEventBaseThread(
+ [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
+ });
+ });
+
+ evb.loop();
+
+ ASSERT_TRUE(done);
+
+ t.join();
+}
+
+TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
+ std::unique_ptr<EventBase> evb = std::make_unique<EventBase>();
+
+ bool done = false;
+
+ std::thread evThread([&] {
+ evb->loopForever();
+ evb.reset();
+ done = true;
+ });
+
+ {
+ auto* ev = evb.get();
+ Executor::KeepAlive keepAlive;
+ ev->runInEventBaseThreadAndWait(
+ [&ev, &keepAlive] { keepAlive = ev->getKeepAliveToken(); });
+ ASSERT_FALSE(done) << "Loop finished before we asked it to";
+ ev->terminateLoopSoon();
+ /* sleep override */
+ std::this_thread::sleep_for(std::chrono::milliseconds(30));
+ ASSERT_FALSE(done) << "Loop terminated early";
+ ev->runInEventBaseThread([keepAlive = std::move(keepAlive)]{});
+ }
+
+ evThread.join();
+ ASSERT_TRUE(done);
+}
+
+TEST(EventBaseTest, LoopKeepAliveShutdown) {
+ auto evb = std::make_unique<EventBase>();
+
+ bool done = false;
+
+ std::thread t([
+ &done,
+ loopKeepAlive = evb->getKeepAliveToken(),
+ evbPtr = evb.get()
+ ]() mutable {
+ /* sleep override */ std::this_thread::sleep_for(
+ std::chrono::milliseconds(100));
+ evbPtr->runInEventBaseThread(
+ [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
+ });
+
+ evb.reset();
+
+ ASSERT_TRUE(done);
+
+ t.join();
+}
+
+TEST(EventBaseTest, LoopKeepAliveAtomic) {
+ auto evb = std::make_unique<EventBase>();
+
+ static constexpr size_t kNumThreads = 100;
+ static constexpr size_t kNumTasks = 100;
+
+ std::vector<std::thread> ts;
+ std::vector<std::unique_ptr<Baton<>>> batons;
+ size_t done{0};
+
+ for (size_t i = 0; i < kNumThreads; ++i) {
+ batons.emplace_back(std::make_unique<Baton<>>());
+ }
+
+ for (size_t i = 0; i < kNumThreads; ++i) {
+ ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
+ std::vector<Executor::KeepAlive> keepAlives;
+ for (size_t j = 0; j < kNumTasks; ++j) {
+ keepAlives.emplace_back(evbPtr->getKeepAliveToken());
+ }
+
+ batonPtr->post();
+
+ /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ for (auto& keepAlive : keepAlives) {
+ evbPtr->runInEventBaseThread(
+ [&done, keepAlive = std::move(keepAlive) ]() { ++done; });
+ }
+ });
+ }
+
+ for (auto& baton : batons) {
+ baton->wait();
+ }
+
+ evb.reset();
+
+ EXPECT_EQ(kNumThreads * kNumTasks, done);
+
+ for (auto& t : ts) {
+ t.join();
+ }
+}
+
+TEST(EventBaseTest, DrivableExecutorTest) {
+ folly::Promise<bool> p;
+ auto f = p.getFuture();
+ EventBase base;
+ bool finished = false;
+
+ std::thread t([&] {
+ /* sleep override */
+ std::this_thread::sleep_for(std::chrono::microseconds(10));
+ finished = true;
+ base.runInEventBaseThread([&]() { p.setValue(true); });
+ });
+
+ // Ensure drive does not busy wait
+ base.drive(); // TODO: fix notification queue init() extra wakeup
+ base.drive();
+ EXPECT_TRUE(finished);
+
+ folly::Promise<bool> p2;
+ auto f2 = p2.getFuture();
+ // Ensure waitVia gets woken up properly, even from
+ // a separate thread.
+ base.runAfterDelay([&]() { p2.setValue(true); }, 10);
+ f2.waitVia(&base);
+ EXPECT_TRUE(f2.isReady());
+
+ t.join();
+}
+
+TEST(EventBaseTest, RequestContextTest) {
+ EventBase evb;
+ auto defaultCtx = RequestContext::get();
+ std::weak_ptr<RequestContext> rctx_weak_ptr;
+
+ {
+ RequestContextScopeGuard rctx;
+ rctx_weak_ptr = RequestContext::saveContext();
+ auto context = RequestContext::get();
+ EXPECT_NE(defaultCtx, context);
+ evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
+ evb.loop();
+ }
+
+ // Ensure that RequestContext created for the scope has been released and
+ // deleted.
+ EXPECT_EQ(rctx_weak_ptr.expired(), true);
+
+ EXPECT_EQ(defaultCtx, RequestContext::get());
+}
+
+TEST(EventBaseTest, CancelLoopCallbackRequestContextTest) {
+ EventBase evb;
+ CountedLoopCallback c(&evb, 1);
+
+ auto defaultCtx = RequestContext::get();
+ EXPECT_EQ(defaultCtx, RequestContext::get());
+ std::weak_ptr<RequestContext> rctx_weak_ptr;
+
+ {
+ RequestContextScopeGuard rctx;
+ rctx_weak_ptr = RequestContext::saveContext();
+ auto context = RequestContext::get();
+ EXPECT_NE(defaultCtx, context);
+ evb.runInLoop(&c);
+ c.cancelLoopCallback();
+ }
+
+ // Ensure that RequestContext created for the scope has been released and
+ // deleted.
+ EXPECT_EQ(rctx_weak_ptr.expired(), true);
+
+ EXPECT_EQ(defaultCtx, RequestContext::get());
}