Adds writer test case for RCU
[folly.git] / folly / io / async / test / EventBaseTest.cpp
index 3dab11d7e148b7d53e41f39227c6b5a980ad4e2b..ac2ff0c156321a8531251ea73ae1bb62220a7b3c 100644 (file)
@@ -1,37 +1,50 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Copyright 2014-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
+
+#include <folly/Memory.h>
+#include <folly/ScopeGuard.h>
+
 #include <folly/io/async/AsyncTimeout.h>
 #include <folly/io/async/EventBase.h>
 #include <folly/io/async/EventHandler.h>
 #include <folly/io/async/test/SocketPair.h>
 #include <folly/io/async/test/Util.h>
+#include <folly/portability/Unistd.h>
 
+#include <folly/futures/Promise.h>
+
+#include <atomic>
 #include <iostream>
-#include <unistd.h>
 #include <memory>
+#include <thread>
 
+using std::atomic;
 using std::deque;
 using std::pair;
 using std::vector;
+using std::unique_ptr;
+using std::thread;
 using std::make_pair;
 using std::cerr;
 using std::endl;
+using std::chrono::milliseconds;
+using std::chrono::microseconds;
+using std::chrono::duration_cast;
+
+using namespace std::chrono_literals;
 
 using namespace folly;
 
@@ -43,9 +56,10 @@ enum { BUF_SIZE = 4096 };
 
 ssize_t writeToFD(int fd, size_t length) {
   // write an arbitrary amount of data to the fd
-  char buf[length];
-  memset(buf, 'a', sizeof(buf));
-  ssize_t rc = write(fd, buf, sizeof(buf));
+  auto bufv = vector<char>(length);
+  auto buf = bufv.data();
+  memset(buf, 'a', length);
+  ssize_t rc = write(fd, buf, length);
   CHECK_EQ(rc, length);
   return rc;
 }
@@ -69,8 +83,8 @@ size_t writeUntilFull(int fd) {
 
 ssize_t readFromFD(int fd, size_t length) {
   // write an arbitrary amount of data to the fd
-  char buf[length];
-  return read(fd, buf, sizeof(buf));
+  auto buf = vector<char>(length);
+  return read(fd, buf.data(), length);
 }
 
 size_t readUntilEmpty(int fd) {
@@ -121,7 +135,7 @@ struct ScheduledEvent {
 
 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
   for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
-    eventBase->runAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
+    eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
                              ev->milliseconds);
   }
 }
@@ -131,7 +145,7 @@ class TestHandler : public EventHandler {
   TestHandler(EventBase* eventBase, int fd)
     : EventHandler(eventBase, fd), fd_(fd) {}
 
-  virtual void handlerReady(uint16_t events) noexcept {
+  void handlerReady(uint16_t events) noexcept override {
     ssize_t bytesRead = 0;
     ssize_t bytesWritten = 0;
     if (events & READ) {
@@ -145,7 +159,7 @@ class TestHandler : public EventHandler {
       bytesWritten = writeUntilFull(fd_);
     }
 
-    log.push_back(EventRecord(events, bytesRead, bytesWritten));
+    log.emplace_back(events, bytesRead, bytesWritten);
   }
 
   struct EventRecord {
@@ -180,9 +194,9 @@ TEST(EventBaseTest, ReadEvent) {
 
   // Register timeouts to perform two write events
   ScheduledEvent events[] = {
-    { 10, EventHandler::WRITE, 2345 },
-    { 160, EventHandler::WRITE, 99 },
-    { 0, 0, 0 },
+    { 10, EventHandler::WRITE, 2345, 0 },
+    { 160, EventHandler::WRITE, 99, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -196,10 +210,12 @@ TEST(EventBaseTest, ReadEvent) {
   // the first chunk of data was received.
   ASSERT_EQ(handler.log.size(), 1);
   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
-  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds, 90);
+  T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
+                  milliseconds(events[0].milliseconds), milliseconds(90));
   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
   ASSERT_EQ(handler.log[0].bytesWritten, 0);
-  T_CHECK_TIMEOUT(start, end, events[1].milliseconds, 30);
+  T_CHECK_TIMEOUT(start, end,
+                  milliseconds(events[1].milliseconds), milliseconds(30));
 
   // Make sure the second chunk of data is still waiting to be read.
   size_t bytesRemaining = readUntilEmpty(sp[0]);
@@ -219,16 +235,16 @@ TEST(EventBaseTest, ReadPersist) {
 
   // Register several timeouts to perform writes
   ScheduledEvent events[] = {
-    { 10,  EventHandler::WRITE, 1024 },
-    { 20,  EventHandler::WRITE, 2211 },
-    { 30,  EventHandler::WRITE, 4096 },
-    { 100, EventHandler::WRITE, 100 },
-    { 0, 0 },
+    { 10,  EventHandler::WRITE, 1024, 0 },
+    { 20,  EventHandler::WRITE, 2211, 0 },
+    { 30,  EventHandler::WRITE, 4096, 0 },
+    { 100, EventHandler::WRITE, 100,  0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
   // Schedule a timeout to unregister the handler after the third write
-  eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
+  eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
 
   // Loop
   TimePoint start;
@@ -240,11 +256,12 @@ TEST(EventBaseTest, ReadPersist) {
   ASSERT_EQ(handler.log.size(), 3);
   for (int n = 0; n < 3; ++n) {
     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
-    T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[n].milliseconds);
+    T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
+                    milliseconds(events[n].milliseconds));
     ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
     ASSERT_EQ(handler.log[n].bytesWritten, 0);
   }
-  T_CHECK_TIMEOUT(start, end, events[3].milliseconds);
+  T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
 
   // Make sure the data from the last write is still waiting to be read
   size_t bytesRemaining = readUntilEmpty(sp[0]);
@@ -269,13 +286,13 @@ TEST(EventBaseTest, ReadImmediate) {
 
   // Register a timeout to perform another write
   ScheduledEvent events[] = {
-    { 10, EventHandler::WRITE, 2345 },
-    { 0, 0, 0 },
+    { 10, EventHandler::WRITE, 2345, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
   // Schedule a timeout to unregister the handler
-  eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
+  eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
 
   // Loop
   TimePoint start;
@@ -286,17 +303,18 @@ TEST(EventBaseTest, ReadImmediate) {
 
   // There should have been 1 event for immediate readability
   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
-  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
+  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
   ASSERT_EQ(handler.log[0].bytesRead, dataLength);
   ASSERT_EQ(handler.log[0].bytesWritten, 0);
 
   // There should be another event after the timeout wrote more data
   ASSERT_EQ(handler.log[1].events, EventHandler::READ);
-  T_CHECK_TIMEOUT(start, handler.log[1].timestamp, events[0].milliseconds);
+  T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
+                  milliseconds(events[0].milliseconds));
   ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
   ASSERT_EQ(handler.log[1].bytesWritten, 0);
 
-  T_CHECK_TIMEOUT(start, end, 20);
+  T_CHECK_TIMEOUT(start, end, milliseconds(20));
 }
 
 /**
@@ -315,9 +333,9 @@ TEST(EventBaseTest, WriteEvent) {
 
   // Register timeouts to perform two reads
   ScheduledEvent events[] = {
-    { 10, EventHandler::READ, 0 },
-    { 60, EventHandler::READ, 0 },
-    { 0, 0, 0 },
+    { 10, EventHandler::READ, 0, 0 },
+    { 60, EventHandler::READ, 0, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -330,10 +348,11 @@ TEST(EventBaseTest, WriteEvent) {
   // have only been able to write once, then unregistered itself.
   ASSERT_EQ(handler.log.size(), 1);
   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
-  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
+  T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
+                  milliseconds(events[0].milliseconds));
   ASSERT_EQ(handler.log[0].bytesRead, 0);
   ASSERT_GT(handler.log[0].bytesWritten, 0);
-  T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
+  T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
 
   ASSERT_EQ(events[0].result, initialBytesWritten);
   ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
@@ -355,16 +374,16 @@ TEST(EventBaseTest, WritePersist) {
 
   // Register several timeouts to read from the socket at several intervals
   ScheduledEvent events[] = {
-    { 10,  EventHandler::READ, 0 },
-    { 40,  EventHandler::READ, 0 },
-    { 70,  EventHandler::READ, 0 },
-    { 100, EventHandler::READ, 0 },
-    { 0, 0 },
+    { 10,  EventHandler::READ, 0, 0 },
+    { 40,  EventHandler::READ, 0, 0 },
+    { 70,  EventHandler::READ, 0, 0 },
+    { 100, EventHandler::READ, 0, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
   // Schedule a timeout to unregister the handler after the third read
-  eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
+  eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
 
   // Loop
   TimePoint start;
@@ -377,12 +396,13 @@ TEST(EventBaseTest, WritePersist) {
   ASSERT_EQ(events[0].result, initialBytesWritten);
   for (int n = 0; n < 3; ++n) {
     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
-    T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[n].milliseconds);
+    T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
+                    milliseconds(events[n].milliseconds));
     ASSERT_EQ(handler.log[n].bytesRead, 0);
     ASSERT_GT(handler.log[n].bytesWritten, 0);
     ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
   }
-  T_CHECK_TIMEOUT(start, end, events[3].milliseconds);
+  T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
 }
 
 /**
@@ -398,14 +418,14 @@ TEST(EventBaseTest, WriteImmediate) {
 
   // Register a timeout to perform a read
   ScheduledEvent events[] = {
-    { 10, EventHandler::READ, 0 },
-    { 0, 0, 0 },
+    { 10, EventHandler::READ, 0, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
   // Schedule a timeout to unregister the handler
   int64_t unregisterTimeout = 40;
-  eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
+  eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
                    unregisterTimeout);
 
   // Loop
@@ -418,17 +438,18 @@ TEST(EventBaseTest, WriteImmediate) {
   // Since the socket buffer was initially empty,
   // there should have been 1 event for immediate writability
   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
-  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
+  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
   ASSERT_EQ(handler.log[0].bytesRead, 0);
   ASSERT_GT(handler.log[0].bytesWritten, 0);
 
   // There should be another event after the timeout wrote more data
   ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
-  T_CHECK_TIMEOUT(start, handler.log[1].timestamp, events[0].milliseconds);
+  T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
+                  milliseconds(events[0].milliseconds));
   ASSERT_EQ(handler.log[1].bytesRead, 0);
   ASSERT_GT(handler.log[1].bytesWritten, 0);
 
-  T_CHECK_TIMEOUT(start, end, unregisterTimeout);
+  T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
 }
 
 /**
@@ -447,9 +468,9 @@ TEST(EventBaseTest, ReadWrite) {
 
   // Register timeouts to perform a write then a read.
   ScheduledEvent events[] = {
-    { 10, EventHandler::WRITE, 2345 },
-    { 40, EventHandler::READ, 0 },
-    { 0, 0, 0 },
+    { 10, EventHandler::WRITE, 2345, 0 },
+    { 40, EventHandler::READ, 0, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -463,11 +484,12 @@ TEST(EventBaseTest, ReadWrite) {
   // one event was logged.
   ASSERT_EQ(handler.log.size(), 1);
   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
-  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
+  T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
+                  milliseconds(events[0].milliseconds));
   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
   ASSERT_EQ(handler.log[0].bytesWritten, 0);
   ASSERT_EQ(events[1].result, sock0WriteLength);
-  T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
+  T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
 }
 
 /**
@@ -487,9 +509,9 @@ TEST(EventBaseTest, WriteRead) {
   // Register timeouts to perform a read then a write.
   size_t sock1WriteLength = 2345;
   ScheduledEvent events[] = {
-    { 10, EventHandler::READ, 0 },
-    { 40, EventHandler::WRITE, sock1WriteLength },
-    { 0, 0, 0 },
+    { 10, EventHandler::READ, 0, 0 },
+    { 40, EventHandler::WRITE, sock1WriteLength, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -503,12 +525,13 @@ TEST(EventBaseTest, WriteRead) {
   // one event was logged.
   ASSERT_EQ(handler.log.size(), 1);
   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
-  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
+  T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
+                  milliseconds(events[0].milliseconds));
   ASSERT_EQ(handler.log[0].bytesRead, 0);
   ASSERT_GT(handler.log[0].bytesWritten, 0);
   ASSERT_EQ(events[0].result, sock0WriteLength);
   ASSERT_EQ(events[1].result, sock1WriteLength);
-  T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
+  T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
 
   // Make sure the written data is still waiting to be read.
   size_t bytesRemaining = readUntilEmpty(sp[0]);
@@ -532,8 +555,8 @@ TEST(EventBaseTest, ReadWriteSimultaneous) {
 
   // Register a timeout to perform a read and write together
   ScheduledEvent events[] = {
-    { 10, EventHandler::READ | EventHandler::WRITE, 0 },
-    { 0, 0, 0 },
+    { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -550,10 +573,11 @@ TEST(EventBaseTest, ReadWriteSimultaneous) {
   ASSERT_EQ(handler.log.size(), 1);
   ASSERT_EQ(handler.log[0].events,
                     EventHandler::READ | EventHandler::WRITE);
-  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
+  T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
+                  milliseconds(events[0].milliseconds));
   ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
   ASSERT_GT(handler.log[0].bytesWritten, 0);
-  T_CHECK_TIMEOUT(start, end, events[0].milliseconds);
+  T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
 }
 
 /**
@@ -570,18 +594,18 @@ TEST(EventBaseTest, ReadWritePersist) {
 
   // Register timeouts to perform several reads and writes
   ScheduledEvent events[] = {
-    { 10, EventHandler::WRITE, 2345 },
-    { 20, EventHandler::READ, 0 },
-    { 35, EventHandler::WRITE, 200 },
-    { 45, EventHandler::WRITE, 15 },
-    { 55, EventHandler::READ, 0 },
-    { 120, EventHandler::WRITE, 2345 },
-    { 0, 0, 0 },
+    { 10, EventHandler::WRITE, 2345, 0 },
+    { 20, EventHandler::READ, 0, 0 },
+    { 35, EventHandler::WRITE, 200, 0 },
+    { 45, EventHandler::WRITE, 15, 0 },
+    { 55, EventHandler::READ, 0, 0 },
+    { 120, EventHandler::WRITE, 2345, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
   // Schedule a timeout to unregister the handler
-  eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
+  eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
 
   // Loop
   TimePoint start;
@@ -593,14 +617,15 @@ TEST(EventBaseTest, ReadWritePersist) {
   // Since we didn't fill up the write buffer immediately, there should
   // be an immediate event for writability.
   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
-  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
+  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
   ASSERT_EQ(handler.log[0].bytesRead, 0);
   ASSERT_GT(handler.log[0].bytesWritten, 0);
 
   // Events 1 through 5 should correspond to the scheduled events
   for (int n = 1; n < 6; ++n) {
     ScheduledEvent* event = &events[n - 1];
-    T_CHECK_TIMEOUT(start, handler.log[n].timestamp, event->milliseconds);
+    T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
+                    milliseconds(event->milliseconds));
     if (event->events == EventHandler::READ) {
       ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
       ASSERT_EQ(handler.log[n].bytesRead, 0);
@@ -624,10 +649,10 @@ class PartialReadHandler : public TestHandler {
   PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
     : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
 
-  virtual void handlerReady(uint16_t events) noexcept {
+  void handlerReady(uint16_t events) noexcept override {
     assert(events == EventHandler::READ);
     ssize_t bytesRead = readFromFD(fd_, readLength_);
-    log.push_back(EventRecord(events, bytesRead, 0));
+    log.emplace_back(events, bytesRead, 0);
   }
 
  private:
@@ -652,13 +677,13 @@ TEST(EventBaseTest, ReadPartial) {
   // Register a timeout to perform a single write,
   // with more data than PartialReadHandler will read at once
   ScheduledEvent events[] = {
-    { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) },
-    { 0, 0, 0 },
+    { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
   // Schedule a timeout to unregister the handler
-  eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
+  eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
 
   // Loop
   TimePoint start;
@@ -670,13 +695,15 @@ TEST(EventBaseTest, ReadPartial) {
   // The first 3 invocations should read readLength bytes each
   for (int n = 0; n < 3; ++n) {
     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
-    T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[0].milliseconds);
+    T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
+                    milliseconds(events[0].milliseconds));
     ASSERT_EQ(handler.log[n].bytesRead, readLength);
     ASSERT_EQ(handler.log[n].bytesWritten, 0);
   }
   // The last read only has readLength/2 bytes
   ASSERT_EQ(handler.log[3].events, EventHandler::READ);
-  T_CHECK_TIMEOUT(start, handler.log[3].timestamp, events[0].milliseconds);
+  T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
+                  milliseconds(events[0].milliseconds));
   ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
   ASSERT_EQ(handler.log[3].bytesWritten, 0);
 }
@@ -687,10 +714,10 @@ class PartialWriteHandler : public TestHandler {
   PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
     : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
 
-  virtual void handlerReady(uint16_t events) noexcept {
+  void handlerReady(uint16_t events) noexcept override {
     assert(events == EventHandler::WRITE);
     ssize_t bytesWritten = writeToFD(fd_, writeLength_);
-    log.push_back(EventRecord(events, 0, bytesWritten));
+    log.emplace_back(events, 0, bytesWritten);
   }
 
  private:
@@ -717,13 +744,13 @@ TEST(EventBaseTest, WritePartial) {
 
   // Register a timeout to read, so that more data can be written
   ScheduledEvent events[] = {
-    { 10, EventHandler::READ, 0 },
-    { 0, 0, 0 },
+    { 10, EventHandler::READ, 0, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
   // Schedule a timeout to unregister the handler
-  eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
+  eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
 
   // Loop
   TimePoint start;
@@ -739,7 +766,8 @@ TEST(EventBaseTest, WritePartial) {
   // The first 3 invocations should read writeLength bytes each
   for (int n = 0; n < numChecked; ++n) {
     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
-    T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[0].milliseconds);
+    T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
+                    milliseconds(events[0].milliseconds));
     ASSERT_EQ(handler.log[n].bytesRead, 0);
     ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
   }
@@ -750,15 +778,13 @@ TEST(EventBaseTest, WritePartial) {
  * Test destroying a registered EventHandler
  */
 TEST(EventBaseTest, DestroyHandler) {
-  class DestroyHandler : public TAsyncTimeout {
+  class DestroyHandler : public AsyncTimeout {
    public:
     DestroyHandler(EventBase* eb, EventHandler* h)
-      : TAsyncTimeout(eb)
+      : AsyncTimeout(eb)
       , handler_(h) {}
 
-    virtual void timeoutExpired() noexcept {
-      delete handler_;
-    }
+    void timeoutExpired() noexcept override { delete handler_; }
 
    private:
     EventHandler* handler_;
@@ -776,7 +802,7 @@ TEST(EventBaseTest, DestroyHandler) {
 
   // After 10ms, read some data, so that the handler
   // will be notified that it can write.
-  eb.runAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
+  eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
                    10);
 
   // Start a timer to destroy the handler after 25ms
@@ -790,7 +816,7 @@ TEST(EventBaseTest, DestroyHandler) {
 
   // Make sure the EventHandler was uninstalled properly when it was
   // destroyed, and the EventBase loop exited
-  T_CHECK_TIMEOUT(start, end, 25);
+  T_CHECK_TIMEOUT(start, end, milliseconds(25));
 
   // Make sure that the handler wrote data to the socket
   // before it was destroyed
@@ -809,22 +835,22 @@ TEST(EventBaseTest, RunAfterDelay) {
   TimePoint timestamp1(false);
   TimePoint timestamp2(false);
   TimePoint timestamp3(false);
-  eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
-  eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
-  eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
+  eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
+  eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
+  eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
 
   TimePoint start;
   eb.loop();
   TimePoint end;
 
-  T_CHECK_TIMEOUT(start, timestamp1, 10);
-  T_CHECK_TIMEOUT(start, timestamp2, 20);
-  T_CHECK_TIMEOUT(start, timestamp3, 40);
-  T_CHECK_TIMEOUT(start, end, 40);
+  T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
+  T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
+  T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
+  T_CHECK_TIMEOUT(start, end, milliseconds(40));
 }
 
 /**
- * Test the behavior of runAfterDelay() when some timeouts are
+ * Test the behavior of tryRunAfterDelay() when some timeouts are
  * still scheduled when the EventBase is destroyed.
  */
 TEST(EventBaseTest, RunAfterDelayDestruction) {
@@ -839,24 +865,24 @@ TEST(EventBaseTest, RunAfterDelayDestruction) {
     EventBase eb;
 
     // Run two normal timeouts
-    eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
-    eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
+    eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
+    eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
 
     // Schedule a timeout to stop the event loop after 40ms
-    eb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
+    eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
 
     // Schedule 2 timeouts that would fire after the event loop stops
-    eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
-    eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
+    eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
+    eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
 
     start.reset();
     eb.loop();
     end.reset();
   }
 
-  T_CHECK_TIMEOUT(start, timestamp1, 10);
-  T_CHECK_TIMEOUT(start, timestamp2, 20);
-  T_CHECK_TIMEOUT(start, end, 40);
+  T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
+  T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
+  T_CHECK_TIMEOUT(start, end, milliseconds(40));
 
   ASSERT_TRUE(timestamp3.isUnset());
   ASSERT_TRUE(timestamp4.isUnset());
@@ -865,15 +891,13 @@ TEST(EventBaseTest, RunAfterDelayDestruction) {
   // memory is leaked.
 }
 
-class TestTimeout : public TAsyncTimeout {
+class TestTimeout : public AsyncTimeout {
  public:
   explicit TestTimeout(EventBase* eventBase)
-    : TAsyncTimeout(eventBase)
+    : AsyncTimeout(eventBase)
     , timestamp(false) {}
 
-  virtual void timeoutExpired() noexcept {
-    timestamp.reset();
-  }
+  void timeoutExpired() noexcept override { timestamp.reset(); }
 
   TimePoint timestamp;
 };
@@ -892,16 +916,16 @@ TEST(EventBaseTest, BasicTimeouts) {
   eb.loop();
   TimePoint end;
 
-  T_CHECK_TIMEOUT(start, t1.timestamp, 10);
-  T_CHECK_TIMEOUT(start, t2.timestamp, 20);
-  T_CHECK_TIMEOUT(start, t3.timestamp, 40);
-  T_CHECK_TIMEOUT(start, end, 40);
+  T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
+  T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
+  T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
+  T_CHECK_TIMEOUT(start, end, milliseconds(40));
 }
 
-class ReschedulingTimeout : public TAsyncTimeout {
+class ReschedulingTimeout : public AsyncTimeout {
  public:
   ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
-    : TAsyncTimeout(evb)
+    : AsyncTimeout(evb)
     , timeouts_(timeouts)
     , iterator_(timeouts_.begin()) {}
 
@@ -909,8 +933,8 @@ class ReschedulingTimeout : public TAsyncTimeout {
     reschedule();
   }
 
-  virtual void timeoutExpired() noexcept {
-    timestamps.push_back(TimePoint());
+  void timeoutExpired() noexcept override {
+    timestamps.emplace_back();
     reschedule();
   }
 
@@ -950,15 +974,15 @@ TEST(EventBaseTest, ReuseTimeout) {
   // Use a higher tolerance than usual.  We're waiting on 3 timeouts
   // consecutively.  In general, each timeout may go over by a few
   // milliseconds, and we're tripling this error by witing on 3 timeouts.
-  int64_t tolerance = 6;
+  milliseconds tolerance{6};
 
   ASSERT_EQ(timeouts.size(), t.timestamps.size());
   uint32_t total = 0;
-  for (int n = 0; n < timeouts.size(); ++n) {
+  for (size_t n = 0; n < timeouts.size(); ++n) {
     total += timeouts[n];
-    T_CHECK_TIMEOUT(start, t.timestamps[n], total, tolerance);
+    T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
   }
-  T_CHECK_TIMEOUT(start, end, total, tolerance);
+  T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
 }
 
 /**
@@ -975,22 +999,22 @@ TEST(EventBaseTest, RescheduleTimeout) {
   t2.scheduleTimeout(30);
   t3.scheduleTimeout(30);
 
-  auto f = static_cast<bool(TAsyncTimeout::*)(uint32_t)>(
-      &TAsyncTimeout::scheduleTimeout);
+  auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
+      &AsyncTimeout::scheduleTimeout);
 
   // after 10ms, reschedule t2 to run sooner than originally scheduled
-  eb.runAfterDelay(std::bind(f, &t2, 10), 10);
+  eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
   // after 10ms, reschedule t3 to run later than originally scheduled
-  eb.runAfterDelay(std::bind(f, &t3, 40), 10);
+  eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
 
   TimePoint start;
   eb.loop();
   TimePoint end;
 
-  T_CHECK_TIMEOUT(start, t1.timestamp, 15);
-  T_CHECK_TIMEOUT(start, t2.timestamp, 20);
-  T_CHECK_TIMEOUT(start, t3.timestamp, 50);
-  T_CHECK_TIMEOUT(start, end, 50);
+  T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
+  T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
+  T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
+  T_CHECK_TIMEOUT(start, end, milliseconds(50));
 }
 
 /**
@@ -1006,34 +1030,32 @@ TEST(EventBaseTest, CancelTimeout) {
 
   ReschedulingTimeout t(&eb, timeouts);
   t.start();
-  eb.runAfterDelay(std::bind(&TAsyncTimeout::cancelTimeout, &t), 50);
+  eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
 
   TimePoint start;
   eb.loop();
   TimePoint end;
 
   ASSERT_EQ(t.timestamps.size(), 2);
-  T_CHECK_TIMEOUT(start, t.timestamps[0], 10);
-  T_CHECK_TIMEOUT(start, t.timestamps[1], 40);
-  T_CHECK_TIMEOUT(start, end, 50);
+  T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
+  T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
+  T_CHECK_TIMEOUT(start, end, milliseconds(50));
 }
 
 /**
  * Test destroying a scheduled timeout object
  */
 TEST(EventBaseTest, DestroyTimeout) {
-  class DestroyTimeout : public TAsyncTimeout {
+  class DestroyTimeout : public AsyncTimeout {
    public:
-    DestroyTimeout(EventBase* eb, TAsyncTimeout* t)
-      : TAsyncTimeout(eb)
+    DestroyTimeout(EventBase* eb, AsyncTimeout* t)
+      : AsyncTimeout(eb)
       , timeout_(t) {}
 
-    virtual void timeoutExpired() noexcept {
-      delete timeout_;
-    }
+    void timeoutExpired() noexcept override { delete timeout_; }
 
    private:
-    TAsyncTimeout* timeout_;
+    AsyncTimeout* timeout_;
   };
 
   EventBase eb;
@@ -1048,7 +1070,7 @@ TEST(EventBaseTest, DestroyTimeout) {
   eb.loop();
   TimePoint end;
 
-  T_CHECK_TIMEOUT(start, end, 10);
+  T_CHECK_TIMEOUT(start, end, milliseconds(10));
 }
 
 
@@ -1082,7 +1104,7 @@ struct RunInThreadArg {
 };
 
 void runInThreadTestFunc(RunInThreadArg* arg) {
-  arg->data->values.push_back(make_pair(arg->thread, arg->value));
+  arg->data->values.emplace_back(arg->thread, arg->value);
   RunInThreadData* data = arg->data;
   delete arg;
 
@@ -1092,39 +1114,27 @@ void runInThreadTestFunc(RunInThreadArg* arg) {
   }
 }
 
-class RunInThreadTester : public concurrency::Runnable {
- public:
-  RunInThreadTester(int id, RunInThreadData* data) : id_(id), data_(data) {}
-
-  void run() {
-    // Call evb->runInThread() a number of times
-    {
-      for (int n = 0; n < data_->opsPerThread; ++n) {
-        RunInThreadArg* arg = new RunInThreadArg(data_, id_, n);
-        data_->evb.runInEventBaseThread(runInThreadTestFunc, arg);
-        usleep(10);
-      }
-    }
-  }
-
- private:
-  int id_;
-  RunInThreadData* data_;
-};
-
 TEST(EventBaseTest, RunInThread) {
-  uint32_t numThreads = 50;
-  uint32_t opsPerThread = 100;
+  constexpr uint32_t numThreads = 50;
+  constexpr uint32_t opsPerThread = 100;
   RunInThreadData data(numThreads, opsPerThread);
 
-  PosixThreadFactory threadFactory;
-  threadFactory.setDetached(false);
-  deque< std::shared_ptr<Thread> > threads;
-  for (int n = 0; n < numThreads; ++n) {
-    std::shared_ptr<RunInThreadTester> runner(new RunInThreadTester(n, &data));
-    std::shared_ptr<Thread> thread = threadFactory.newThread(runner);
-    threads.push_back(thread);
-    thread->start();
+  deque<std::thread> threads;
+  SCOPE_EXIT {
+    // Wait on all of the threads.
+    for (auto& thread : threads) {
+      thread.join();
+    }
+  };
+
+  for (uint32_t i = 0; i < numThreads; ++i) {
+    threads.emplace_back([i, &data] {
+        for (int n = 0; n < data.opsPerThread; ++n) {
+          RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
+          data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
+          usleep(10);
+        }
+      });
   }
 
   // Add a timeout event to run after 3 seconds.
@@ -1132,7 +1142,7 @@ TEST(EventBaseTest, RunInThread) {
   // Once the last thread exits, it will stop the loop().  However, this
   // timeout also stops the loop in case there is a bug performing the normal
   // stop.
-  data.evb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
+  data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
                          3000);
 
   TimePoint start;
@@ -1143,13 +1153,15 @@ TEST(EventBaseTest, RunInThread) {
   // to stop.  This should happen much sooner than the 3 second timeout.
   // Assert that it happens in under a second.  (This is still tons of extra
   // padding.)
-  int64_t timeTaken = end.getTime() - start.getTime();
-  ASSERT_LT(timeTaken, 1000);
-  VLOG(11) << "Time taken: " << timeTaken;
+
+  auto timeTaken = std::chrono::duration_cast<milliseconds>(
+    end.getTime() - start.getTime());
+  ASSERT_LT(timeTaken.count(), 1000);
+  VLOG(11) << "Time taken: " << timeTaken.count();
 
   // Verify that we have all of the events from every thread
   int expectedValues[numThreads];
-  for (int n = 0; n < numThreads; ++n) {
+  for (uint32_t n = 0; n < numThreads; ++n) {
     expectedValues[n] = 0;
   }
   for (deque< pair<int, int> >::const_iterator it = data.values.begin();
@@ -1160,18 +1172,88 @@ TEST(EventBaseTest, RunInThread) {
     ASSERT_EQ(expectedValues[threadID], value);
     ++expectedValues[threadID];
   }
-  for (int n = 0; n < numThreads; ++n) {
+  for (uint32_t n = 0; n < numThreads; ++n) {
     ASSERT_EQ(expectedValues[n], opsPerThread);
   }
+}
 
-  // Wait on all of the threads.  Otherwise we can exit and clean up
-  // RunInThreadData before the last thread exits, while it is still holding
-  // the RunInThreadData's mutex.
-  for (deque< std::shared_ptr<Thread> >::const_iterator it = threads.begin();
-       it != threads.end();
-       ++it) {
-    (*it)->join();
+//  This test simulates some calls, and verifies that the waiting happens by
+//  triggering what otherwise would be race conditions, and trying to detect
+//  whether any of the race conditions happened.
+TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
+  const size_t c = 256;
+  vector<unique_ptr<atomic<size_t>>> atoms(c);
+  for (size_t i = 0; i < c; ++i) {
+    auto& atom = atoms.at(i);
+    atom = std::make_unique<atomic<size_t>>(0);
+  }
+  vector<thread> threads;
+  for (size_t i = 0; i < c; ++i) {
+    threads.emplace_back([&atoms, i] {
+      EventBase eb;
+      auto& atom = *atoms.at(i);
+      auto ebth = thread([&] { eb.loopForever(); });
+      eb.waitUntilRunning();
+      eb.runInEventBaseThreadAndWait([&] {
+        size_t x = 0;
+        atom.compare_exchange_weak(
+            x, 1, std::memory_order_release, std::memory_order_relaxed);
+      });
+      size_t x = 0;
+      atom.compare_exchange_weak(
+          x, 2, std::memory_order_release, std::memory_order_relaxed);
+      eb.terminateLoopSoon();
+      ebth.join();
+    });
   }
+  for (size_t i = 0; i < c; ++i) {
+    auto& th = threads.at(i);
+    th.join();
+  }
+  size_t sum = 0;
+  for (auto& atom : atoms) {
+    sum += *atom;
+  }
+  EXPECT_EQ(c, sum);
+}
+
+TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
+  EventBase eb;
+  thread th(&EventBase::loopForever, &eb);
+  SCOPE_EXIT {
+    eb.terminateLoopSoon();
+    th.join();
+  };
+  auto mutated = false;
+  eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
+      mutated = true;
+  });
+  EXPECT_TRUE(mutated);
+}
+
+TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
+  EventBase eb;
+  thread th(&EventBase::loopForever, &eb);
+  SCOPE_EXIT {
+    eb.terminateLoopSoon();
+    th.join();
+  };
+  eb.runInEventBaseThreadAndWait([&] {
+      auto mutated = false;
+      eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
+          mutated = true;
+      });
+      EXPECT_TRUE(mutated);
+  });
+}
+
+TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
+  EventBase eb;
+  auto mutated = false;
+  eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
+      mutated = true;
+    });
+  EXPECT_TRUE(mutated);
 }
 
 ///////////////////////////////////////////////////////////////////////////
@@ -1188,7 +1270,7 @@ class CountedLoopCallback : public EventBase::LoopCallback {
     , count_(count)
     , action_(action) {}
 
-  virtual void runLoopCallback() noexcept {
+  void runLoopCallback() noexcept override {
     --count_;
     if (count_ > 0) {
       eventBase_->runInLoop(this);
@@ -1223,6 +1305,21 @@ TEST(EventBaseTest, RepeatedRunInLoop) {
   ASSERT_EQ(c.getCount(), 0);
 }
 
+// Test that EventBase::loop() works as expected without time measurements.
+TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
+  EventBase eventBase(false);
+
+  CountedLoopCallback c(&eventBase, 10);
+  eventBase.runInLoop(&c);
+  // The callback shouldn't have run immediately
+  ASSERT_EQ(c.getCount(), 10);
+  eventBase.loop();
+
+  // loop() should loop until the CountedLoopCallback stops
+  // re-installing itself.
+  ASSERT_EQ(c.getCount(), 0);
+}
+
 // Test runInLoop() calls with terminateLoopSoon()
 TEST(EventBaseTest, RunInLoopStopLoop) {
   EventBase eventBase;
@@ -1252,6 +1349,35 @@ TEST(EventBaseTest, RunInLoopStopLoop) {
   ASSERT_LE(c1.getCount(), 11);
 }
 
+TEST(EventBaseTest, messageAvailableException) {
+  auto deadManWalking = [] {
+    EventBase eventBase;
+    std::thread t([&] {
+      // Call this from another thread to force use of NotificationQueue in
+      // runInEventBaseThread
+      eventBase.runInEventBaseThread(
+          []() { throw std::runtime_error("boom"); });
+    });
+    t.join();
+    eventBase.loopForever();
+  };
+  EXPECT_DEATH(deadManWalking(), ".*");
+}
+
+TEST(EventBaseTest, TryRunningAfterTerminate) {
+  EventBase eventBase;
+  CountedLoopCallback c1(&eventBase, 1,
+                         std::bind(&EventBase::terminateLoopSoon, &eventBase));
+  eventBase.runInLoop(&c1);
+  eventBase.loopForever();
+  bool ran = false;
+  eventBase.runInEventBaseThread([&]() {
+    ran = true;
+  });
+
+  ASSERT_FALSE(ran);
+}
+
 // Test cancelling runInLoop() callbacks
 TEST(EventBaseTest, CancelRunInLoop) {
   EventBase eventBase;
@@ -1288,7 +1414,7 @@ TEST(EventBaseTest, CancelRunInLoop) {
   // Run the loop
   eventBase.loop();
 
-  // cancelC1 and cancelC3 should have both fired after 10 iterations and
+  // cancelC1 and cancelC2 should have both fired after 10 iterations and
   // stopped re-installing themselves
   ASSERT_EQ(cancelC1.getCount(), 0);
   ASSERT_EQ(cancelC2.getCount(), 0);
@@ -1327,7 +1453,7 @@ class TerminateTestCallback : public EventBase::LoopCallback,
     unregisterHandler();
   }
 
-  virtual void handlerReady(uint16_t events) noexcept {
+  void handlerReady(uint16_t /* events */) noexcept override {
     // We didn't register with PERSIST, so we will have been automatically
     // unregistered already.
     ASSERT_FALSE(isHandlerRegistered());
@@ -1339,7 +1465,7 @@ class TerminateTestCallback : public EventBase::LoopCallback,
 
     eventBase_->runInLoop(this);
   }
-  virtual void runLoopCallback() noexcept {
+  void runLoopCallback() noexcept override {
     ++loopInvocations_;
     if (loopInvocations_ >= maxLoopInvocations_) {
       return;
@@ -1404,21 +1530,21 @@ TEST(EventBaseTest, LoopTermination) {
 // Tests for latency calculations
 ///////////////////////////////////////////////////////////////////////////
 
-class IdleTimeTimeoutSeries : public TAsyncTimeout {
+class IdleTimeTimeoutSeries : public AsyncTimeout {
 
  public:
 
   explicit IdleTimeTimeoutSeries(EventBase *base,
                                  std::deque<std::uint64_t>& timeout) :
-    TAsyncTimeout(base),
+    AsyncTimeout(base),
     timeouts_(0),
     timeout_(timeout) {
       scheduleTimeout(1);
     }
 
-  virtual ~IdleTimeTimeoutSeries() {}
+    ~IdleTimeTimeoutSeries() override {}
 
-  void timeoutExpired() noexcept {
+    void timeoutExpired() noexcept override {
     ++timeouts_;
 
     if(timeout_.empty()){
@@ -1453,7 +1579,7 @@ class IdleTimeTimeoutSeries : public TAsyncTimeout {
  */
 TEST(EventBaseTest, IdleTime) {
   EventBase eventBase;
-  eventBase.setLoadAvgMsec(1000);
+  eventBase.setLoadAvgMsec(1000ms);
   eventBase.resetLoadAvg(5900.0);
   std::deque<uint64_t> timeouts0(4, 8080);
   timeouts0.push_front(8000);
@@ -1461,23 +1587,30 @@ TEST(EventBaseTest, IdleTime) {
   IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
   std::deque<uint64_t> timeouts(20, 20);
   std::unique_ptr<IdleTimeTimeoutSeries> tos;
+  int64_t testStart = duration_cast<microseconds>(
+    std::chrono::steady_clock::now().time_since_epoch()).count();
+  bool hostOverloaded = false;
 
   int latencyCallbacks = 0;
-  eventBase.setMaxLatency(6000, [&]() {
+  eventBase.setMaxLatency(6000us, [&]() {
     ++latencyCallbacks;
-
-    switch (latencyCallbacks) {
-    case 1:
-      ASSERT_EQ(6, tos0.getTimeouts());
-      ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
-      ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
-      tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
-      break;
-
-    default:
+    if (latencyCallbacks != 1) {
       FAIL() << "Unexpected latency callback";
-      break;
     }
+
+    if (tos0.getTimeouts() < 6) {
+      // This could only happen if the host this test is running
+      // on is heavily loaded.
+      int64_t maxLatencyReached = duration_cast<microseconds>(
+          std::chrono::steady_clock::now().time_since_epoch()).count();
+      ASSERT_LE(43800, maxLatencyReached - testStart);
+      hostOverloaded = true;
+      return;
+    }
+    ASSERT_EQ(6, tos0.getTimeouts());
+    ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
+    ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
+    tos = std::make_unique<IdleTimeTimeoutSeries>(&eventBase, timeouts);
   });
 
   // Kick things off with an "immedite" timeout
@@ -1485,10 +1618,15 @@ TEST(EventBaseTest, IdleTime) {
 
   eventBase.loop();
 
+  if (hostOverloaded) {
+    return;
+  }
+
   ASSERT_EQ(1, latencyCallbacks);
   ASSERT_EQ(7, tos0.getTimeouts());
   ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
   ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
+  ASSERT_TRUE(!!tos);
   ASSERT_EQ(21, tos->getTimeouts());
 }
 
@@ -1526,7 +1664,7 @@ TEST(EventBaseTest, EventBaseThreadLoop) {
   });
   base.loop();
 
-  ASSERT_EQ(true, ran);
+  ASSERT_TRUE(ran);
 }
 
 TEST(EventBaseTest, EventBaseThreadName) {
@@ -1541,25 +1679,279 @@ TEST(EventBaseTest, EventBaseThreadName) {
 #endif
 }
 
-TEST(TEventBaseTest, RunBeforeLoop) {
-  TEventBase base;
+TEST(EventBaseTest, RunBeforeLoop) {
+  EventBase base;
   CountedLoopCallback cb(&base, 1, [&](){
     base.terminateLoopSoon();
   });
   base.runBeforeLoop(&cb);
   base.loopForever();
-  ASSERT_EQUAL(cb.getCount(), 0);
+  ASSERT_EQ(cb.getCount(), 0);
 }
 
-TEST(TEventBaseTest, RunBeforeLoopWait) {
-  TEventBase base;
+TEST(EventBaseTest, RunBeforeLoopWait) {
+  EventBase base;
   CountedLoopCallback cb(&base, 1);
-  base.runAfterDelay([&](){
+  base.tryRunAfterDelay([&](){
       base.terminateLoopSoon();
     }, 500);
   base.runBeforeLoop(&cb);
   base.loopForever();
 
   // Check that we only ran once, and did not loop multiple times.
-  ASSERT_EQUAL(cb.getCount(), 0);
+  ASSERT_EQ(cb.getCount(), 0);
+}
+
+class PipeHandler : public EventHandler {
+ public:
+  PipeHandler(EventBase* eventBase, int fd)
+    : EventHandler(eventBase, fd) {}
+
+  void handlerReady(uint16_t /* events */) noexcept override { abort(); }
+};
+
+TEST(EventBaseTest, StopBeforeLoop) {
+  EventBase evb;
+
+  // Give the evb something to do.
+  int p[2];
+  ASSERT_EQ(0, pipe(p));
+  PipeHandler handler(&evb, p[0]);
+  handler.registerHandler(EventHandler::READ);
+
+  // It's definitely not running yet
+  evb.terminateLoopSoon();
+
+  // let it run, it should exit quickly.
+  std::thread t([&] { evb.loop(); });
+  t.join();
+
+  handler.unregisterHandler();
+  close(p[0]);
+  close(p[1]);
+
+  SUCCEED();
+}
+
+TEST(EventBaseTest, RunCallbacksOnDestruction) {
+  bool ran = false;
+
+  {
+    EventBase base;
+    base.runInEventBaseThread([&](){
+      ran = true;
+    });
+  }
+
+  ASSERT_TRUE(ran);
+}
+
+TEST(EventBaseTest, LoopKeepAlive) {
+  EventBase evb;
+
+  bool done = false;
+  std::thread t([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
+    /* sleep override */ std::this_thread::sleep_for(
+        std::chrono::milliseconds(100));
+    evb.runInEventBaseThread(
+        [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
+  });
+
+  evb.loop();
+
+  ASSERT_TRUE(done);
+
+  t.join();
+}
+
+TEST(EventBaseTest, LoopKeepAliveInLoop) {
+  EventBase evb;
+
+  bool done = false;
+  std::thread t;
+
+  evb.runInEventBaseThread([&] {
+    t = std::thread([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
+      /* sleep override */ std::this_thread::sleep_for(
+          std::chrono::milliseconds(100));
+      evb.runInEventBaseThread(
+          [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
+    });
+  });
+
+  evb.loop();
+
+  ASSERT_TRUE(done);
+
+  t.join();
+}
+
+TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
+  std::unique_ptr<EventBase> evb = std::make_unique<EventBase>();
+
+  bool done = false;
+
+  std::thread evThread([&] {
+    evb->loopForever();
+    evb.reset();
+    done = true;
+  });
+
+  {
+    auto* ev = evb.get();
+    Executor::KeepAlive keepAlive;
+    ev->runInEventBaseThreadAndWait(
+        [&ev, &keepAlive] { keepAlive = ev->getKeepAliveToken(); });
+    ASSERT_FALSE(done) << "Loop finished before we asked it to";
+    ev->terminateLoopSoon();
+    /* sleep override */
+    std::this_thread::sleep_for(std::chrono::milliseconds(30));
+    ASSERT_FALSE(done) << "Loop terminated early";
+    ev->runInEventBaseThread([keepAlive = std::move(keepAlive)]{});
+  }
+
+  evThread.join();
+  ASSERT_TRUE(done);
+}
+
+TEST(EventBaseTest, LoopKeepAliveShutdown) {
+  auto evb = std::make_unique<EventBase>();
+
+  bool done = false;
+
+  std::thread t([
+    &done,
+    loopKeepAlive = evb->getKeepAliveToken(),
+    evbPtr = evb.get()
+  ]() mutable {
+    /* sleep override */ std::this_thread::sleep_for(
+        std::chrono::milliseconds(100));
+    evbPtr->runInEventBaseThread(
+        [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
+  });
+
+  evb.reset();
+
+  ASSERT_TRUE(done);
+
+  t.join();
+}
+
+TEST(EventBaseTest, LoopKeepAliveAtomic) {
+  auto evb = std::make_unique<EventBase>();
+
+  static constexpr size_t kNumThreads = 100;
+  static constexpr size_t kNumTasks = 100;
+
+  std::vector<std::thread> ts;
+  std::vector<std::unique_ptr<Baton<>>> batons;
+  size_t done{0};
+
+  for (size_t i = 0; i < kNumThreads; ++i) {
+    batons.emplace_back(std::make_unique<Baton<>>());
+  }
+
+  for (size_t i = 0; i < kNumThreads; ++i) {
+    ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
+      std::vector<Executor::KeepAlive> keepAlives;
+      for (size_t j = 0; j < kNumTasks; ++j) {
+        keepAlives.emplace_back(evbPtr->getKeepAliveToken());
+      }
+
+      batonPtr->post();
+
+      /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+      for (auto& keepAlive : keepAlives) {
+        evbPtr->runInEventBaseThread(
+            [&done, keepAlive = std::move(keepAlive) ]() { ++done; });
+      }
+    });
+  }
+
+  for (auto& baton : batons) {
+    baton->wait();
+  }
+
+  evb.reset();
+
+  EXPECT_EQ(kNumThreads * kNumTasks, done);
+
+  for (auto& t : ts) {
+    t.join();
+  }
+}
+
+TEST(EventBaseTest, DrivableExecutorTest) {
+  folly::Promise<bool> p;
+  auto f = p.getFuture();
+  EventBase base;
+  bool finished = false;
+
+  std::thread t([&] {
+    /* sleep override */
+    std::this_thread::sleep_for(std::chrono::microseconds(10));
+    finished = true;
+    base.runInEventBaseThread([&]() { p.setValue(true); });
+  });
+
+  // Ensure drive does not busy wait
+  base.drive(); // TODO: fix notification queue init() extra wakeup
+  base.drive();
+  EXPECT_TRUE(finished);
+
+  folly::Promise<bool> p2;
+  auto f2 = p2.getFuture();
+  // Ensure waitVia gets woken up properly, even from
+  // a separate thread.
+  base.runAfterDelay([&]() { p2.setValue(true); }, 10);
+  f2.waitVia(&base);
+  EXPECT_TRUE(f2.isReady());
+
+  t.join();
+}
+
+TEST(EventBaseTest, RequestContextTest) {
+  EventBase evb;
+  auto defaultCtx = RequestContext::get();
+  std::weak_ptr<RequestContext> rctx_weak_ptr;
+
+  {
+    RequestContextScopeGuard rctx;
+    rctx_weak_ptr = RequestContext::saveContext();
+    auto context = RequestContext::get();
+    EXPECT_NE(defaultCtx, context);
+    evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
+    evb.loop();
+  }
+
+  // Ensure that RequestContext created for the scope has been released and
+  // deleted.
+  EXPECT_EQ(rctx_weak_ptr.expired(), true);
+
+  EXPECT_EQ(defaultCtx, RequestContext::get());
+}
+
+TEST(EventBaseTest, CancelLoopCallbackRequestContextTest) {
+  EventBase evb;
+  CountedLoopCallback c(&evb, 1);
+
+  auto defaultCtx = RequestContext::get();
+  EXPECT_EQ(defaultCtx, RequestContext::get());
+  std::weak_ptr<RequestContext> rctx_weak_ptr;
+
+  {
+    RequestContextScopeGuard rctx;
+    rctx_weak_ptr = RequestContext::saveContext();
+    auto context = RequestContext::get();
+    EXPECT_NE(defaultCtx, context);
+    evb.runInLoop(&c);
+    c.cancelLoopCallback();
+  }
+
+  // Ensure that RequestContext created for the scope has been released and
+  // deleted.
+  EXPECT_EQ(rctx_weak_ptr.expired(), true);
+
+  EXPECT_EQ(defaultCtx, RequestContext::get());
 }