Move some tests to folly
authorDaniel Sommermann <dcsommer@fb.com>
Thu, 13 Nov 2014 18:49:31 +0000 (10:49 -0800)
committerDave Watson <davejwatson@fb.com>
Wed, 19 Nov 2014 20:53:07 +0000 (12:53 -0800)
Summary:
These lived in fbthrift previously, but they should move with the
main code too.

Test Plan: unit tests

Reviewed By: davejwatson@fb.com

Subscribers: doug, alandau, bmatheny, njormrod, mshneer, folly-diffs@

FB internal diff: D1683229

Signature: t1:1683229:1416010730:36fb7e4c9916ae7a9b5972cd476f82014c5f4c78

folly/Makefile.am
folly/io/async/test/EventBaseTest.cpp [new file with mode: 0644]
folly/io/async/test/HHWheelTimerTest.cpp [new file with mode: 0644]
folly/io/async/test/SocketPair.cpp [new file with mode: 0644]
folly/io/async/test/SocketPair.h [new file with mode: 0644]
folly/io/async/test/TimeUtil.cpp [new file with mode: 0644]
folly/io/async/test/TimeUtil.h [new file with mode: 0644]
folly/io/async/test/UndelayedDestruction.h [new file with mode: 0644]
folly/io/async/test/Util.h [new file with mode: 0644]

index 66f62e9842050dc7bf1b7b40e82077bc5556bb59..4aca70683da89c6426d7d39493155f6aaad96ec3 100644 (file)
@@ -167,6 +167,9 @@ nobase_follyinclude_HEADERS = \
        io/async/Request.h \
        io/async/SSLContext.h \
        io/async/TimeoutManager.h \
+       io/async/test/TimeUtil.h \
+       io/async/test/UndelayedDestruction.h \
+       io/async/test/Util.h \
        json.h \
        Lazy.h \
        LifoSem.h \
@@ -296,6 +299,7 @@ libfolly_la_SOURCES = \
        io/async/Request.cpp \
        io/async/SSLContext.cpp \
        io/async/HHWheelTimer.cpp \
+       io/async/test/TimeUtil.cpp \
        json.cpp \
        detail/MemoryIdler.cpp \
        MacAddress.cpp \
diff --git a/folly/io/async/test/EventBaseTest.cpp b/folly/io/async/test/EventBaseTest.cpp
new file mode 100644 (file)
index 0000000..ddf70a0
--- /dev/null
@@ -0,0 +1,1542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <folly/io/async/AsyncTimeout.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/EventHandler.h>
+#include <folly/io/async/test/SocketPair.h>
+#include <folly/io/async/test/Util.h>
+
+#include <iostream>
+#include <unistd.h>
+#include <memory>
+
+using std::deque;
+using std::pair;
+using std::vector;
+using std::make_pair;
+using std::cerr;
+using std::endl;
+
+using namespace folly;
+
+///////////////////////////////////////////////////////////////////////////
+// Tests for read and write events
+///////////////////////////////////////////////////////////////////////////
+
+enum { BUF_SIZE = 4096 };
+
+ssize_t writeToFD(int fd, size_t length) {
+  // write an arbitrary amount of data to the fd
+  char buf[length];
+  memset(buf, 'a', sizeof(buf));
+  ssize_t rc = write(fd, buf, sizeof(buf));
+  CHECK_EQ(rc, length);
+  return rc;
+}
+
+size_t writeUntilFull(int fd) {
+  // Write to the fd until EAGAIN is returned
+  size_t bytesWritten = 0;
+  char buf[BUF_SIZE];
+  memset(buf, 'a', sizeof(buf));
+  while (true) {
+    ssize_t rc = write(fd, buf, sizeof(buf));
+    if (rc < 0) {
+      CHECK_EQ(errno, EAGAIN);
+      break;
+    } else {
+      bytesWritten += rc;
+    }
+  }
+  return bytesWritten;
+}
+
+ssize_t readFromFD(int fd, size_t length) {
+  // write an arbitrary amount of data to the fd
+  char buf[length];
+  return read(fd, buf, sizeof(buf));
+}
+
+size_t readUntilEmpty(int fd) {
+  // Read from the fd until EAGAIN is returned
+  char buf[BUF_SIZE];
+  size_t bytesRead = 0;
+  while (true) {
+    int rc = read(fd, buf, sizeof(buf));
+    if (rc == 0) {
+      CHECK(false) << "unexpected EOF";
+    } else if (rc < 0) {
+      CHECK_EQ(errno, EAGAIN);
+      break;
+    } else {
+      bytesRead += rc;
+    }
+  }
+  return bytesRead;
+}
+
+void checkReadUntilEmpty(int fd, size_t expectedLength) {
+  ASSERT_EQ(readUntilEmpty(fd), expectedLength);
+}
+
+struct ScheduledEvent {
+  int milliseconds;
+  uint16_t events;
+  size_t length;
+  ssize_t result;
+
+  void perform(int fd) {
+    if (events & EventHandler::READ) {
+      if (length == 0) {
+        result = readUntilEmpty(fd);
+      } else {
+        result = readFromFD(fd, length);
+      }
+    }
+    if (events & EventHandler::WRITE) {
+      if (length == 0) {
+        result = writeUntilFull(fd);
+      } else {
+        result = writeToFD(fd, length);
+      }
+    }
+  }
+};
+
+void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
+  for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
+    eventBase->runAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
+                             ev->milliseconds);
+  }
+}
+
+class TestHandler : public EventHandler {
+ public:
+  TestHandler(EventBase* eventBase, int fd)
+    : EventHandler(eventBase, fd), fd_(fd) {}
+
+  virtual void handlerReady(uint16_t events) noexcept {
+    ssize_t bytesRead = 0;
+    ssize_t bytesWritten = 0;
+    if (events & READ) {
+      // Read all available data, so EventBase will stop calling us
+      // until new data becomes available
+      bytesRead = readUntilEmpty(fd_);
+    }
+    if (events & WRITE) {
+      // Write until the pipe buffer is full, so EventBase will stop calling
+      // us until the other end has read some data
+      bytesWritten = writeUntilFull(fd_);
+    }
+
+    log.push_back(EventRecord(events, bytesRead, bytesWritten));
+  }
+
+  struct EventRecord {
+    EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
+      : events(events)
+      , timestamp()
+      , bytesRead(bytesRead)
+      , bytesWritten(bytesWritten) {}
+
+    uint16_t events;
+    TimePoint timestamp;
+    ssize_t bytesRead;
+    ssize_t bytesWritten;
+  };
+
+  deque<EventRecord> log;
+
+ private:
+  int fd_;
+};
+
+/**
+ * Test a READ event
+ */
+TEST(EventBaseTest, ReadEvent) {
+  EventBase eb;
+  SocketPair sp;
+
+  // Register for read events
+  TestHandler handler(&eb, sp[0]);
+  handler.registerHandler(EventHandler::READ);
+
+  // Register timeouts to perform two write events
+  ScheduledEvent events[] = {
+    { 10, EventHandler::WRITE, 2345 },
+    { 160, EventHandler::WRITE, 99 },
+    { 0, 0, 0 },
+  };
+  scheduleEvents(&eb, sp[1], events);
+
+  // Loop
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  // Since we didn't use the EventHandler::PERSIST flag, the handler should
+  // have received the first read, then unregistered itself.  Check that only
+  // the first chunk of data was received.
+  ASSERT_EQ(handler.log.size(), 1);
+  ASSERT_EQ(handler.log[0].events, EventHandler::READ);
+  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds, 90);
+  ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
+  ASSERT_EQ(handler.log[0].bytesWritten, 0);
+  T_CHECK_TIMEOUT(start, end, events[1].milliseconds, 30);
+
+  // Make sure the second chunk of data is still waiting to be read.
+  size_t bytesRemaining = readUntilEmpty(sp[0]);
+  ASSERT_EQ(bytesRemaining, events[1].length);
+}
+
+/**
+ * Test (READ | PERSIST)
+ */
+TEST(EventBaseTest, ReadPersist) {
+  EventBase eb;
+  SocketPair sp;
+
+  // Register for read events
+  TestHandler handler(&eb, sp[0]);
+  handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
+
+  // Register several timeouts to perform writes
+  ScheduledEvent events[] = {
+    { 10,  EventHandler::WRITE, 1024 },
+    { 20,  EventHandler::WRITE, 2211 },
+    { 30,  EventHandler::WRITE, 4096 },
+    { 100, EventHandler::WRITE, 100 },
+    { 0, 0 },
+  };
+  scheduleEvents(&eb, sp[1], events);
+
+  // Schedule a timeout to unregister the handler after the third write
+  eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
+
+  // Loop
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  // The handler should have received the first 3 events,
+  // then been unregistered after that.
+  ASSERT_EQ(handler.log.size(), 3);
+  for (int n = 0; n < 3; ++n) {
+    ASSERT_EQ(handler.log[n].events, EventHandler::READ);
+    T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[n].milliseconds);
+    ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
+    ASSERT_EQ(handler.log[n].bytesWritten, 0);
+  }
+  T_CHECK_TIMEOUT(start, end, events[3].milliseconds);
+
+  // Make sure the data from the last write is still waiting to be read
+  size_t bytesRemaining = readUntilEmpty(sp[0]);
+  ASSERT_EQ(bytesRemaining, events[3].length);
+}
+
+/**
+ * Test registering for READ when the socket is immediately readable
+ */
+TEST(EventBaseTest, ReadImmediate) {
+  EventBase eb;
+  SocketPair sp;
+
+  // Write some data to the socket so the other end will
+  // be immediately readable
+  size_t dataLength = 1234;
+  writeToFD(sp[1], dataLength);
+
+  // Register for read events
+  TestHandler handler(&eb, sp[0]);
+  handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
+
+  // Register a timeout to perform another write
+  ScheduledEvent events[] = {
+    { 10, EventHandler::WRITE, 2345 },
+    { 0, 0, 0 },
+  };
+  scheduleEvents(&eb, sp[1], events);
+
+  // Schedule a timeout to unregister the handler
+  eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
+
+  // Loop
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  ASSERT_EQ(handler.log.size(), 2);
+
+  // There should have been 1 event for immediate readability
+  ASSERT_EQ(handler.log[0].events, EventHandler::READ);
+  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
+  ASSERT_EQ(handler.log[0].bytesRead, dataLength);
+  ASSERT_EQ(handler.log[0].bytesWritten, 0);
+
+  // There should be another event after the timeout wrote more data
+  ASSERT_EQ(handler.log[1].events, EventHandler::READ);
+  T_CHECK_TIMEOUT(start, handler.log[1].timestamp, events[0].milliseconds);
+  ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
+  ASSERT_EQ(handler.log[1].bytesWritten, 0);
+
+  T_CHECK_TIMEOUT(start, end, 20);
+}
+
+/**
+ * Test a WRITE event
+ */
+TEST(EventBaseTest, WriteEvent) {
+  EventBase eb;
+  SocketPair sp;
+
+  // Fill up the write buffer before starting
+  size_t initialBytesWritten = writeUntilFull(sp[0]);
+
+  // Register for write events
+  TestHandler handler(&eb, sp[0]);
+  handler.registerHandler(EventHandler::WRITE);
+
+  // Register timeouts to perform two reads
+  ScheduledEvent events[] = {
+    { 10, EventHandler::READ, 0 },
+    { 60, EventHandler::READ, 0 },
+    { 0, 0, 0 },
+  };
+  scheduleEvents(&eb, sp[1], events);
+
+  // Loop
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  // Since we didn't use the EventHandler::PERSIST flag, the handler should
+  // have only been able to write once, then unregistered itself.
+  ASSERT_EQ(handler.log.size(), 1);
+  ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
+  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
+  ASSERT_EQ(handler.log[0].bytesRead, 0);
+  ASSERT_GT(handler.log[0].bytesWritten, 0);
+  T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
+
+  ASSERT_EQ(events[0].result, initialBytesWritten);
+  ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
+}
+
+/**
+ * Test (WRITE | PERSIST)
+ */
+TEST(EventBaseTest, WritePersist) {
+  EventBase eb;
+  SocketPair sp;
+
+  // Fill up the write buffer before starting
+  size_t initialBytesWritten = writeUntilFull(sp[0]);
+
+  // Register for write events
+  TestHandler handler(&eb, sp[0]);
+  handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
+
+  // Register several timeouts to read from the socket at several intervals
+  ScheduledEvent events[] = {
+    { 10,  EventHandler::READ, 0 },
+    { 40,  EventHandler::READ, 0 },
+    { 70,  EventHandler::READ, 0 },
+    { 100, EventHandler::READ, 0 },
+    { 0, 0 },
+  };
+  scheduleEvents(&eb, sp[1], events);
+
+  // Schedule a timeout to unregister the handler after the third read
+  eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
+
+  // Loop
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  // The handler should have received the first 3 events,
+  // then been unregistered after that.
+  ASSERT_EQ(handler.log.size(), 3);
+  ASSERT_EQ(events[0].result, initialBytesWritten);
+  for (int n = 0; n < 3; ++n) {
+    ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
+    T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[n].milliseconds);
+    ASSERT_EQ(handler.log[n].bytesRead, 0);
+    ASSERT_GT(handler.log[n].bytesWritten, 0);
+    ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
+  }
+  T_CHECK_TIMEOUT(start, end, events[3].milliseconds);
+}
+
+/**
+ * Test registering for WRITE when the socket is immediately writable
+ */
+TEST(EventBaseTest, WriteImmediate) {
+  EventBase eb;
+  SocketPair sp;
+
+  // Register for write events
+  TestHandler handler(&eb, sp[0]);
+  handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
+
+  // Register a timeout to perform a read
+  ScheduledEvent events[] = {
+    { 10, EventHandler::READ, 0 },
+    { 0, 0, 0 },
+  };
+  scheduleEvents(&eb, sp[1], events);
+
+  // Schedule a timeout to unregister the handler
+  int64_t unregisterTimeout = 40;
+  eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
+                   unregisterTimeout);
+
+  // Loop
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  ASSERT_EQ(handler.log.size(), 2);
+
+  // Since the socket buffer was initially empty,
+  // there should have been 1 event for immediate writability
+  ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
+  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
+  ASSERT_EQ(handler.log[0].bytesRead, 0);
+  ASSERT_GT(handler.log[0].bytesWritten, 0);
+
+  // There should be another event after the timeout wrote more data
+  ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
+  T_CHECK_TIMEOUT(start, handler.log[1].timestamp, events[0].milliseconds);
+  ASSERT_EQ(handler.log[1].bytesRead, 0);
+  ASSERT_GT(handler.log[1].bytesWritten, 0);
+
+  T_CHECK_TIMEOUT(start, end, unregisterTimeout);
+}
+
+/**
+ * Test (READ | WRITE) when the socket becomes readable first
+ */
+TEST(EventBaseTest, ReadWrite) {
+  EventBase eb;
+  SocketPair sp;
+
+  // Fill up the write buffer before starting
+  size_t sock0WriteLength = writeUntilFull(sp[0]);
+
+  // Register for read and write events
+  TestHandler handler(&eb, sp[0]);
+  handler.registerHandler(EventHandler::READ_WRITE);
+
+  // Register timeouts to perform a write then a read.
+  ScheduledEvent events[] = {
+    { 10, EventHandler::WRITE, 2345 },
+    { 40, EventHandler::READ, 0 },
+    { 0, 0, 0 },
+  };
+  scheduleEvents(&eb, sp[1], events);
+
+  // Loop
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  // Since we didn't use the EventHandler::PERSIST flag, the handler should
+  // have only noticed readability, then unregistered itself.  Check that only
+  // one event was logged.
+  ASSERT_EQ(handler.log.size(), 1);
+  ASSERT_EQ(handler.log[0].events, EventHandler::READ);
+  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
+  ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
+  ASSERT_EQ(handler.log[0].bytesWritten, 0);
+  ASSERT_EQ(events[1].result, sock0WriteLength);
+  T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
+}
+
+/**
+ * Test (READ | WRITE) when the socket becomes writable first
+ */
+TEST(EventBaseTest, WriteRead) {
+  EventBase eb;
+  SocketPair sp;
+
+  // Fill up the write buffer before starting
+  size_t sock0WriteLength = writeUntilFull(sp[0]);
+
+  // Register for read and write events
+  TestHandler handler(&eb, sp[0]);
+  handler.registerHandler(EventHandler::READ_WRITE);
+
+  // Register timeouts to perform a read then a write.
+  size_t sock1WriteLength = 2345;
+  ScheduledEvent events[] = {
+    { 10, EventHandler::READ, 0 },
+    { 40, EventHandler::WRITE, sock1WriteLength },
+    { 0, 0, 0 },
+  };
+  scheduleEvents(&eb, sp[1], events);
+
+  // Loop
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  // Since we didn't use the EventHandler::PERSIST flag, the handler should
+  // have only noticed writability, then unregistered itself.  Check that only
+  // one event was logged.
+  ASSERT_EQ(handler.log.size(), 1);
+  ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
+  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
+  ASSERT_EQ(handler.log[0].bytesRead, 0);
+  ASSERT_GT(handler.log[0].bytesWritten, 0);
+  ASSERT_EQ(events[0].result, sock0WriteLength);
+  ASSERT_EQ(events[1].result, sock1WriteLength);
+  T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
+
+  // Make sure the written data is still waiting to be read.
+  size_t bytesRemaining = readUntilEmpty(sp[0]);
+  ASSERT_EQ(bytesRemaining, events[1].length);
+}
+
+/**
+ * Test (READ | WRITE) when the socket becomes readable and writable
+ * at the same time.
+ */
+TEST(EventBaseTest, ReadWriteSimultaneous) {
+  EventBase eb;
+  SocketPair sp;
+
+  // Fill up the write buffer before starting
+  size_t sock0WriteLength = writeUntilFull(sp[0]);
+
+  // Register for read and write events
+  TestHandler handler(&eb, sp[0]);
+  handler.registerHandler(EventHandler::READ_WRITE);
+
+  // Register a timeout to perform a read and write together
+  ScheduledEvent events[] = {
+    { 10, EventHandler::READ | EventHandler::WRITE, 0 },
+    { 0, 0, 0 },
+  };
+  scheduleEvents(&eb, sp[1], events);
+
+  // Loop
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  // It's not strictly required that the EventBase register us about both
+  // events in the same call.  So, it's possible that if the EventBase
+  // implementation changes this test could start failing, and it wouldn't be
+  // considered breaking the API.  However for now it's nice to exercise this
+  // code path.
+  ASSERT_EQ(handler.log.size(), 1);
+  ASSERT_EQ(handler.log[0].events,
+                    EventHandler::READ | EventHandler::WRITE);
+  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
+  ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
+  ASSERT_GT(handler.log[0].bytesWritten, 0);
+  T_CHECK_TIMEOUT(start, end, events[0].milliseconds);
+}
+
+/**
+ * Test (READ | WRITE | PERSIST)
+ */
+TEST(EventBaseTest, ReadWritePersist) {
+  EventBase eb;
+  SocketPair sp;
+
+  // Register for read and write events
+  TestHandler handler(&eb, sp[0]);
+  handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
+                          EventHandler::PERSIST);
+
+  // Register timeouts to perform several reads and writes
+  ScheduledEvent events[] = {
+    { 10, EventHandler::WRITE, 2345 },
+    { 20, EventHandler::READ, 0 },
+    { 35, EventHandler::WRITE, 200 },
+    { 45, EventHandler::WRITE, 15 },
+    { 55, EventHandler::READ, 0 },
+    { 120, EventHandler::WRITE, 2345 },
+    { 0, 0, 0 },
+  };
+  scheduleEvents(&eb, sp[1], events);
+
+  // Schedule a timeout to unregister the handler
+  eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
+
+  // Loop
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  ASSERT_EQ(handler.log.size(), 6);
+
+  // Since we didn't fill up the write buffer immediately, there should
+  // be an immediate event for writability.
+  ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
+  T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
+  ASSERT_EQ(handler.log[0].bytesRead, 0);
+  ASSERT_GT(handler.log[0].bytesWritten, 0);
+
+  // Events 1 through 5 should correspond to the scheduled events
+  for (int n = 1; n < 6; ++n) {
+    ScheduledEvent* event = &events[n - 1];
+    T_CHECK_TIMEOUT(start, handler.log[n].timestamp, event->milliseconds);
+    if (event->events == EventHandler::READ) {
+      ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
+      ASSERT_EQ(handler.log[n].bytesRead, 0);
+      ASSERT_GT(handler.log[n].bytesWritten, 0);
+    } else {
+      ASSERT_EQ(handler.log[n].events, EventHandler::READ);
+      ASSERT_EQ(handler.log[n].bytesRead, event->length);
+      ASSERT_EQ(handler.log[n].bytesWritten, 0);
+    }
+  }
+
+  // The timeout should have unregistered the handler before the last write.
+  // Make sure that data is still waiting to be read
+  size_t bytesRemaining = readUntilEmpty(sp[0]);
+  ASSERT_EQ(bytesRemaining, events[5].length);
+}
+
+
+class PartialReadHandler : public TestHandler {
+ public:
+  PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
+    : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
+
+  virtual void handlerReady(uint16_t events) noexcept {
+    assert(events == EventHandler::READ);
+    ssize_t bytesRead = readFromFD(fd_, readLength_);
+    log.push_back(EventRecord(events, bytesRead, 0));
+  }
+
+ private:
+  int fd_;
+  size_t readLength_;
+};
+
+/**
+ * Test reading only part of the available data when a read event is fired.
+ * When PERSIST is used, make sure the handler gets notified again the next
+ * time around the loop.
+ */
+TEST(EventBaseTest, ReadPartial) {
+  EventBase eb;
+  SocketPair sp;
+
+  // Register for read events
+  size_t readLength = 100;
+  PartialReadHandler handler(&eb, sp[0], readLength);
+  handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
+
+  // Register a timeout to perform a single write,
+  // with more data than PartialReadHandler will read at once
+  ScheduledEvent events[] = {
+    { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) },
+    { 0, 0, 0 },
+  };
+  scheduleEvents(&eb, sp[1], events);
+
+  // Schedule a timeout to unregister the handler
+  eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
+
+  // Loop
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  ASSERT_EQ(handler.log.size(), 4);
+
+  // The first 3 invocations should read readLength bytes each
+  for (int n = 0; n < 3; ++n) {
+    ASSERT_EQ(handler.log[n].events, EventHandler::READ);
+    T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[0].milliseconds);
+    ASSERT_EQ(handler.log[n].bytesRead, readLength);
+    ASSERT_EQ(handler.log[n].bytesWritten, 0);
+  }
+  // The last read only has readLength/2 bytes
+  ASSERT_EQ(handler.log[3].events, EventHandler::READ);
+  T_CHECK_TIMEOUT(start, handler.log[3].timestamp, events[0].milliseconds);
+  ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
+  ASSERT_EQ(handler.log[3].bytesWritten, 0);
+}
+
+
+class PartialWriteHandler : public TestHandler {
+ public:
+  PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
+    : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
+
+  virtual void handlerReady(uint16_t events) noexcept {
+    assert(events == EventHandler::WRITE);
+    ssize_t bytesWritten = writeToFD(fd_, writeLength_);
+    log.push_back(EventRecord(events, 0, bytesWritten));
+  }
+
+ private:
+  int fd_;
+  size_t writeLength_;
+};
+
+/**
+ * Test writing without completely filling up the write buffer when the fd
+ * becomes writable.  When PERSIST is used, make sure the handler gets
+ * notified again the next time around the loop.
+ */
+TEST(EventBaseTest, WritePartial) {
+  EventBase eb;
+  SocketPair sp;
+
+  // Fill up the write buffer before starting
+  size_t initialBytesWritten = writeUntilFull(sp[0]);
+
+  // Register for write events
+  size_t writeLength = 100;
+  PartialWriteHandler handler(&eb, sp[0], writeLength);
+  handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
+
+  // Register a timeout to read, so that more data can be written
+  ScheduledEvent events[] = {
+    { 10, EventHandler::READ, 0 },
+    { 0, 0, 0 },
+  };
+  scheduleEvents(&eb, sp[1], events);
+
+  // Schedule a timeout to unregister the handler
+  eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
+
+  // Loop
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  // Depending on how big the socket buffer is, there will be multiple writes
+  // Only check the first 5
+  int numChecked = 5;
+  ASSERT_GE(handler.log.size(), numChecked);
+  ASSERT_EQ(events[0].result, initialBytesWritten);
+
+  // The first 3 invocations should read writeLength bytes each
+  for (int n = 0; n < numChecked; ++n) {
+    ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
+    T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[0].milliseconds);
+    ASSERT_EQ(handler.log[n].bytesRead, 0);
+    ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
+  }
+}
+
+
+/**
+ * Test destroying a registered EventHandler
+ */
+TEST(EventBaseTest, DestroyHandler) {
+  class DestroyHandler : public TAsyncTimeout {
+   public:
+    DestroyHandler(EventBase* eb, EventHandler* h)
+      : TAsyncTimeout(eb)
+      , handler_(h) {}
+
+    virtual void timeoutExpired() noexcept {
+      delete handler_;
+    }
+
+   private:
+    EventHandler* handler_;
+  };
+
+  EventBase eb;
+  SocketPair sp;
+
+  // Fill up the write buffer before starting
+  size_t initialBytesWritten = writeUntilFull(sp[0]);
+
+  // Register for write events
+  TestHandler* handler = new TestHandler(&eb, sp[0]);
+  handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
+
+  // After 10ms, read some data, so that the handler
+  // will be notified that it can write.
+  eb.runAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
+                   10);
+
+  // Start a timer to destroy the handler after 25ms
+  // This mainly just makes sure the code doesn't break or assert
+  DestroyHandler dh(&eb, handler);
+  dh.scheduleTimeout(25);
+
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  // Make sure the EventHandler was uninstalled properly when it was
+  // destroyed, and the EventBase loop exited
+  T_CHECK_TIMEOUT(start, end, 25);
+
+  // Make sure that the handler wrote data to the socket
+  // before it was destroyed
+  size_t bytesRemaining = readUntilEmpty(sp[1]);
+  ASSERT_GT(bytesRemaining, 0);
+}
+
+
+///////////////////////////////////////////////////////////////////////////
+// Tests for timeout events
+///////////////////////////////////////////////////////////////////////////
+
+TEST(EventBaseTest, RunAfterDelay) {
+  EventBase eb;
+
+  TimePoint timestamp1(false);
+  TimePoint timestamp2(false);
+  TimePoint timestamp3(false);
+  eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
+  eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
+  eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
+
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  T_CHECK_TIMEOUT(start, timestamp1, 10);
+  T_CHECK_TIMEOUT(start, timestamp2, 20);
+  T_CHECK_TIMEOUT(start, timestamp3, 40);
+  T_CHECK_TIMEOUT(start, end, 40);
+}
+
+/**
+ * Test the behavior of runAfterDelay() when some timeouts are
+ * still scheduled when the EventBase is destroyed.
+ */
+TEST(EventBaseTest, RunAfterDelayDestruction) {
+  TimePoint timestamp1(false);
+  TimePoint timestamp2(false);
+  TimePoint timestamp3(false);
+  TimePoint timestamp4(false);
+  TimePoint start(false);
+  TimePoint end(false);
+
+  {
+    EventBase eb;
+
+    // Run two normal timeouts
+    eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
+    eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
+
+    // Schedule a timeout to stop the event loop after 40ms
+    eb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
+
+    // Schedule 2 timeouts that would fire after the event loop stops
+    eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
+    eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
+
+    start.reset();
+    eb.loop();
+    end.reset();
+  }
+
+  T_CHECK_TIMEOUT(start, timestamp1, 10);
+  T_CHECK_TIMEOUT(start, timestamp2, 20);
+  T_CHECK_TIMEOUT(start, end, 40);
+
+  ASSERT_TRUE(timestamp3.isUnset());
+  ASSERT_TRUE(timestamp4.isUnset());
+
+  // Ideally this test should be run under valgrind to ensure that no
+  // memory is leaked.
+}
+
+class TestTimeout : public TAsyncTimeout {
+ public:
+  explicit TestTimeout(EventBase* eventBase)
+    : TAsyncTimeout(eventBase)
+    , timestamp(false) {}
+
+  virtual void timeoutExpired() noexcept {
+    timestamp.reset();
+  }
+
+  TimePoint timestamp;
+};
+
+TEST(EventBaseTest, BasicTimeouts) {
+  EventBase eb;
+
+  TestTimeout t1(&eb);
+  TestTimeout t2(&eb);
+  TestTimeout t3(&eb);
+  t1.scheduleTimeout(10);
+  t2.scheduleTimeout(20);
+  t3.scheduleTimeout(40);
+
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  T_CHECK_TIMEOUT(start, t1.timestamp, 10);
+  T_CHECK_TIMEOUT(start, t2.timestamp, 20);
+  T_CHECK_TIMEOUT(start, t3.timestamp, 40);
+  T_CHECK_TIMEOUT(start, end, 40);
+}
+
+class ReschedulingTimeout : public TAsyncTimeout {
+ public:
+  ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
+    : TAsyncTimeout(evb)
+    , timeouts_(timeouts)
+    , iterator_(timeouts_.begin()) {}
+
+  void start() {
+    reschedule();
+  }
+
+  virtual void timeoutExpired() noexcept {
+    timestamps.push_back(TimePoint());
+    reschedule();
+  }
+
+  void reschedule() {
+    if (iterator_ != timeouts_.end()) {
+      uint32_t timeout = *iterator_;
+      ++iterator_;
+      scheduleTimeout(timeout);
+    }
+  }
+
+  vector<TimePoint> timestamps;
+
+ private:
+  vector<uint32_t> timeouts_;
+  vector<uint32_t>::const_iterator iterator_;
+};
+
+/**
+ * Test rescheduling the same timeout multiple times
+ */
+TEST(EventBaseTest, ReuseTimeout) {
+  EventBase eb;
+
+  vector<uint32_t> timeouts;
+  timeouts.push_back(10);
+  timeouts.push_back(30);
+  timeouts.push_back(15);
+
+  ReschedulingTimeout t(&eb, timeouts);
+  t.start();
+
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  // Use a higher tolerance than usual.  We're waiting on 3 timeouts
+  // consecutively.  In general, each timeout may go over by a few
+  // milliseconds, and we're tripling this error by witing on 3 timeouts.
+  int64_t tolerance = 6;
+
+  ASSERT_EQ(timeouts.size(), t.timestamps.size());
+  uint32_t total = 0;
+  for (int n = 0; n < timeouts.size(); ++n) {
+    total += timeouts[n];
+    T_CHECK_TIMEOUT(start, t.timestamps[n], total, tolerance);
+  }
+  T_CHECK_TIMEOUT(start, end, total, tolerance);
+}
+
+/**
+ * Test rescheduling a timeout before it has fired
+ */
+TEST(EventBaseTest, RescheduleTimeout) {
+  EventBase eb;
+
+  TestTimeout t1(&eb);
+  TestTimeout t2(&eb);
+  TestTimeout t3(&eb);
+
+  t1.scheduleTimeout(15);
+  t2.scheduleTimeout(30);
+  t3.scheduleTimeout(30);
+
+  auto f = static_cast<bool(TAsyncTimeout::*)(uint32_t)>(
+      &TAsyncTimeout::scheduleTimeout);
+
+  // after 10ms, reschedule t2 to run sooner than originally scheduled
+  eb.runAfterDelay(std::bind(f, &t2, 10), 10);
+  // after 10ms, reschedule t3 to run later than originally scheduled
+  eb.runAfterDelay(std::bind(f, &t3, 40), 10);
+
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  T_CHECK_TIMEOUT(start, t1.timestamp, 15);
+  T_CHECK_TIMEOUT(start, t2.timestamp, 20);
+  T_CHECK_TIMEOUT(start, t3.timestamp, 50);
+  T_CHECK_TIMEOUT(start, end, 50);
+}
+
+/**
+ * Test cancelling a timeout
+ */
+TEST(EventBaseTest, CancelTimeout) {
+  EventBase eb;
+
+  vector<uint32_t> timeouts;
+  timeouts.push_back(10);
+  timeouts.push_back(30);
+  timeouts.push_back(25);
+
+  ReschedulingTimeout t(&eb, timeouts);
+  t.start();
+  eb.runAfterDelay(std::bind(&TAsyncTimeout::cancelTimeout, &t), 50);
+
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  ASSERT_EQ(t.timestamps.size(), 2);
+  T_CHECK_TIMEOUT(start, t.timestamps[0], 10);
+  T_CHECK_TIMEOUT(start, t.timestamps[1], 40);
+  T_CHECK_TIMEOUT(start, end, 50);
+}
+
+/**
+ * Test destroying a scheduled timeout object
+ */
+TEST(EventBaseTest, DestroyTimeout) {
+  class DestroyTimeout : public TAsyncTimeout {
+   public:
+    DestroyTimeout(EventBase* eb, TAsyncTimeout* t)
+      : TAsyncTimeout(eb)
+      , timeout_(t) {}
+
+    virtual void timeoutExpired() noexcept {
+      delete timeout_;
+    }
+
+   private:
+    TAsyncTimeout* timeout_;
+  };
+
+  EventBase eb;
+
+  TestTimeout* t1 = new TestTimeout(&eb);
+  t1->scheduleTimeout(30);
+
+  DestroyTimeout dt(&eb, t1);
+  dt.scheduleTimeout(10);
+
+  TimePoint start;
+  eb.loop();
+  TimePoint end;
+
+  T_CHECK_TIMEOUT(start, end, 10);
+}
+
+
+///////////////////////////////////////////////////////////////////////////
+// Test for runInThreadTestFunc()
+///////////////////////////////////////////////////////////////////////////
+
+struct RunInThreadData {
+  RunInThreadData(int numThreads, int opsPerThread)
+    : opsPerThread(opsPerThread)
+    , opsToGo(numThreads*opsPerThread) {}
+
+  EventBase evb;
+  deque< pair<int, int> > values;
+
+  int opsPerThread;
+  int opsToGo;
+};
+
+struct RunInThreadArg {
+  RunInThreadArg(RunInThreadData* data,
+                 int threadId,
+                 int value)
+    : data(data)
+    , thread(threadId)
+    , value(value) {}
+
+  RunInThreadData* data;
+  int thread;
+  int value;
+};
+
+void runInThreadTestFunc(RunInThreadArg* arg) {
+  arg->data->values.push_back(make_pair(arg->thread, arg->value));
+  RunInThreadData* data = arg->data;
+  delete arg;
+
+  if(--data->opsToGo == 0) {
+    // Break out of the event base loop if we are the last thread running
+    data->evb.terminateLoopSoon();
+  }
+}
+
+class RunInThreadTester : public concurrency::Runnable {
+ public:
+  RunInThreadTester(int id, RunInThreadData* data) : id_(id), data_(data) {}
+
+  void run() {
+    // Call evb->runInThread() a number of times
+    {
+      for (int n = 0; n < data_->opsPerThread; ++n) {
+        RunInThreadArg* arg = new RunInThreadArg(data_, id_, n);
+        data_->evb.runInEventBaseThread(runInThreadTestFunc, arg);
+        usleep(10);
+      }
+    }
+  }
+
+ private:
+  int id_;
+  RunInThreadData* data_;
+};
+
+TEST(EventBaseTest, RunInThread) {
+  uint32_t numThreads = 50;
+  uint32_t opsPerThread = 100;
+  RunInThreadData data(numThreads, opsPerThread);
+
+  PosixThreadFactory threadFactory;
+  threadFactory.setDetached(false);
+  deque< std::shared_ptr<Thread> > threads;
+  for (int n = 0; n < numThreads; ++n) {
+    std::shared_ptr<RunInThreadTester> runner(new RunInThreadTester(n, &data));
+    std::shared_ptr<Thread> thread = threadFactory.newThread(runner);
+    threads.push_back(thread);
+    thread->start();
+  }
+
+  // Add a timeout event to run after 3 seconds.
+  // Otherwise loop() will return immediately since there are no events to run.
+  // Once the last thread exits, it will stop the loop().  However, this
+  // timeout also stops the loop in case there is a bug performing the normal
+  // stop.
+  data.evb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
+                         3000);
+
+  TimePoint start;
+  data.evb.loop();
+  TimePoint end;
+
+  // Verify that the loop exited because all threads finished and requested it
+  // to stop.  This should happen much sooner than the 3 second timeout.
+  // Assert that it happens in under a second.  (This is still tons of extra
+  // padding.)
+  int64_t timeTaken = end.getTime() - start.getTime();
+  ASSERT_LT(timeTaken, 1000);
+  VLOG(11) << "Time taken: " << timeTaken;
+
+  // Verify that we have all of the events from every thread
+  int expectedValues[numThreads];
+  for (int n = 0; n < numThreads; ++n) {
+    expectedValues[n] = 0;
+  }
+  for (deque< pair<int, int> >::const_iterator it = data.values.begin();
+       it != data.values.end();
+       ++it) {
+    int threadID = it->first;
+    int value = it->second;
+    ASSERT_EQ(expectedValues[threadID], value);
+    ++expectedValues[threadID];
+  }
+  for (int n = 0; n < numThreads; ++n) {
+    ASSERT_EQ(expectedValues[n], opsPerThread);
+  }
+
+  // Wait on all of the threads.  Otherwise we can exit and clean up
+  // RunInThreadData before the last thread exits, while it is still holding
+  // the RunInThreadData's mutex.
+  for (deque< std::shared_ptr<Thread> >::const_iterator it = threads.begin();
+       it != threads.end();
+       ++it) {
+    (*it)->join();
+  }
+}
+
+///////////////////////////////////////////////////////////////////////////
+// Tests for runInLoop()
+///////////////////////////////////////////////////////////////////////////
+
+class CountedLoopCallback : public EventBase::LoopCallback {
+ public:
+  CountedLoopCallback(EventBase* eventBase,
+                      unsigned int count,
+                      std::function<void()> action =
+                        std::function<void()>())
+    : eventBase_(eventBase)
+    , count_(count)
+    , action_(action) {}
+
+  virtual void runLoopCallback() noexcept {
+    --count_;
+    if (count_ > 0) {
+      eventBase_->runInLoop(this);
+    } else if (action_) {
+      action_();
+    }
+  }
+
+  unsigned int getCount() const {
+    return count_;
+  }
+
+ private:
+  EventBase* eventBase_;
+  unsigned int count_;
+  std::function<void()> action_;
+};
+
+// Test that EventBase::loop() doesn't exit while there are
+// still LoopCallbacks remaining to be invoked.
+TEST(EventBaseTest, RepeatedRunInLoop) {
+  EventBase eventBase;
+
+  CountedLoopCallback c(&eventBase, 10);
+  eventBase.runInLoop(&c);
+  // The callback shouldn't have run immediately
+  ASSERT_EQ(c.getCount(), 10);
+  eventBase.loop();
+
+  // loop() should loop until the CountedLoopCallback stops
+  // re-installing itself.
+  ASSERT_EQ(c.getCount(), 0);
+}
+
+// Test runInLoop() calls with terminateLoopSoon()
+TEST(EventBaseTest, RunInLoopStopLoop) {
+  EventBase eventBase;
+
+  CountedLoopCallback c1(&eventBase, 20);
+  CountedLoopCallback c2(&eventBase, 10,
+                         std::bind(&EventBase::terminateLoopSoon, &eventBase));
+
+  eventBase.runInLoop(&c1);
+  eventBase.runInLoop(&c2);
+  ASSERT_EQ(c1.getCount(), 20);
+  ASSERT_EQ(c2.getCount(), 10);
+
+  eventBase.loopForever();
+
+  // c2 should have stopped the loop after 10 iterations
+  ASSERT_EQ(c2.getCount(), 0);
+
+  // We allow the EventBase to run the loop callbacks in whatever order it
+  // chooses.  We'll accept c1's count being either 10 (if the loop terminated
+  // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
+  // before c1 ran).
+  //
+  // (With the current code, c1 will always run 10 times, but we don't consider
+  // this a hard API requirement.)
+  ASSERT_GE(c1.getCount(), 10);
+  ASSERT_LE(c1.getCount(), 11);
+}
+
+// Test cancelling runInLoop() callbacks
+TEST(EventBaseTest, CancelRunInLoop) {
+  EventBase eventBase;
+
+  CountedLoopCallback c1(&eventBase, 20);
+  CountedLoopCallback c2(&eventBase, 20);
+  CountedLoopCallback c3(&eventBase, 20);
+
+  std::function<void()> cancelC1Action =
+    std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
+  std::function<void()> cancelC2Action =
+    std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
+
+  CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
+  CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
+
+  // Install cancelC1 after c1
+  eventBase.runInLoop(&c1);
+  eventBase.runInLoop(&cancelC1);
+
+  // Install cancelC2 before c2
+  eventBase.runInLoop(&cancelC2);
+  eventBase.runInLoop(&c2);
+
+  // Install c3
+  eventBase.runInLoop(&c3);
+
+  ASSERT_EQ(c1.getCount(), 20);
+  ASSERT_EQ(c2.getCount(), 20);
+  ASSERT_EQ(c3.getCount(), 20);
+  ASSERT_EQ(cancelC1.getCount(), 10);
+  ASSERT_EQ(cancelC2.getCount(), 10);
+
+  // Run the loop
+  eventBase.loop();
+
+  // cancelC1 and cancelC3 should have both fired after 10 iterations and
+  // stopped re-installing themselves
+  ASSERT_EQ(cancelC1.getCount(), 0);
+  ASSERT_EQ(cancelC2.getCount(), 0);
+  // c3 should have continued on for the full 20 iterations
+  ASSERT_EQ(c3.getCount(), 0);
+
+  // c1 and c2 should have both been cancelled on the 10th iteration.
+  //
+  // Callbacks are always run in the order they are installed,
+  // so c1 should have fired 10 times, and been canceled after it ran on the
+  // 10th iteration.  c2 should have only fired 9 times, because cancelC2 will
+  // have run before it on the 10th iteration, and cancelled it before it
+  // fired.
+  ASSERT_EQ(c1.getCount(), 10);
+  ASSERT_EQ(c2.getCount(), 11);
+}
+
+class TerminateTestCallback : public EventBase::LoopCallback,
+                              public EventHandler {
+ public:
+  TerminateTestCallback(EventBase* eventBase, int fd)
+    : EventHandler(eventBase, fd),
+      eventBase_(eventBase),
+      loopInvocations_(0),
+      maxLoopInvocations_(0),
+      eventInvocations_(0),
+      maxEventInvocations_(0) {}
+
+  void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
+    loopInvocations_ = 0;
+    maxLoopInvocations_ = maxLoopInvocations;
+    eventInvocations_ = 0;
+    maxEventInvocations_ = maxEventInvocations;
+
+    cancelLoopCallback();
+    unregisterHandler();
+  }
+
+  virtual void handlerReady(uint16_t events) noexcept {
+    // We didn't register with PERSIST, so we will have been automatically
+    // unregistered already.
+    ASSERT_FALSE(isHandlerRegistered());
+
+    ++eventInvocations_;
+    if (eventInvocations_ >= maxEventInvocations_) {
+      return;
+    }
+
+    eventBase_->runInLoop(this);
+  }
+  virtual void runLoopCallback() noexcept {
+    ++loopInvocations_;
+    if (loopInvocations_ >= maxLoopInvocations_) {
+      return;
+    }
+
+    registerHandler(READ);
+  }
+
+  uint32_t getLoopInvocations() const {
+    return loopInvocations_;
+  }
+  uint32_t getEventInvocations() const {
+    return eventInvocations_;
+  }
+
+ private:
+  EventBase* eventBase_;
+  uint32_t loopInvocations_;
+  uint32_t maxLoopInvocations_;
+  uint32_t eventInvocations_;
+  uint32_t maxEventInvocations_;
+};
+
+/**
+ * Test that EventBase::loop() correctly detects when there are no more events
+ * left to run.
+ *
+ * This uses a single callback, which alternates registering itself as a loop
+ * callback versus a EventHandler callback.  This exercises a regression where
+ * EventBase::loop() incorrectly exited if there were no more fd handlers
+ * registered, but a loop callback installed a new fd handler.
+ */
+TEST(EventBaseTest, LoopTermination) {
+  EventBase eventBase;
+
+  // Open a pipe and close the write end,
+  // so the read endpoint will be readable
+  int pipeFds[2];
+  int rc = pipe(pipeFds);
+  ASSERT_EQ(rc, 0);
+  close(pipeFds[1]);
+  TerminateTestCallback callback(&eventBase, pipeFds[0]);
+
+  // Test once where the callback will exit after a loop callback
+  callback.reset(10, 100);
+  eventBase.runInLoop(&callback);
+  eventBase.loop();
+  ASSERT_EQ(callback.getLoopInvocations(), 10);
+  ASSERT_EQ(callback.getEventInvocations(), 9);
+
+  // Test once where the callback will exit after an fd event callback
+  callback.reset(100, 7);
+  eventBase.runInLoop(&callback);
+  eventBase.loop();
+  ASSERT_EQ(callback.getLoopInvocations(), 7);
+  ASSERT_EQ(callback.getEventInvocations(), 7);
+
+  close(pipeFds[0]);
+}
+
+///////////////////////////////////////////////////////////////////////////
+// Tests for latency calculations
+///////////////////////////////////////////////////////////////////////////
+
+class IdleTimeTimeoutSeries : public TAsyncTimeout {
+
+ public:
+
+  explicit IdleTimeTimeoutSeries(EventBase *base,
+                                 std::deque<std::uint64_t>& timeout) :
+    TAsyncTimeout(base),
+    timeouts_(0),
+    timeout_(timeout) {
+      scheduleTimeout(1);
+    }
+
+  virtual ~IdleTimeTimeoutSeries() {}
+
+  void timeoutExpired() noexcept {
+    ++timeouts_;
+
+    if(timeout_.empty()){
+      cancelTimeout();
+    } else {
+      uint64_t sleepTime = timeout_.front();
+      timeout_.pop_front();
+      if (sleepTime) {
+        usleep(sleepTime);
+      }
+      scheduleTimeout(1);
+    }
+  }
+
+  int getTimeouts() const {
+    return timeouts_;
+  }
+
+ private:
+  int timeouts_;
+  std::deque<uint64_t>& timeout_;
+};
+
+/**
+ * Verify that idle time is correctly accounted for when decaying our loop
+ * time.
+ *
+ * This works by creating a high loop time (via usleep), expecting a latency
+ * callback with known value, and then scheduling a timeout for later. This
+ * later timeout is far enough in the future that the idle time should have
+ * caused the loop time to decay.
+ */
+TEST(EventBaseTest, IdleTime) {
+  EventBase eventBase;
+  eventBase.setLoadAvgMsec(1000);
+  eventBase.resetLoadAvg(5900.0);
+  std::deque<uint64_t> timeouts0(4, 8080);
+  timeouts0.push_front(8000);
+  timeouts0.push_back(14000);
+  IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
+  std::deque<uint64_t> timeouts(20, 20);
+  std::unique_ptr<IdleTimeTimeoutSeries> tos;
+
+  int latencyCallbacks = 0;
+  eventBase.setMaxLatency(6000, [&]() {
+    ++latencyCallbacks;
+
+    switch (latencyCallbacks) {
+    case 1:
+      ASSERT_EQ(6, tos0.getTimeouts());
+      ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
+      ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
+      tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
+      break;
+
+    default:
+      FAIL() << "Unexpected latency callback";
+      break;
+    }
+  });
+
+  // Kick things off with an "immedite" timeout
+  tos0.scheduleTimeout(1);
+
+  eventBase.loop();
+
+  ASSERT_EQ(1, latencyCallbacks);
+  ASSERT_EQ(7, tos0.getTimeouts());
+  ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
+  ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
+  ASSERT_EQ(21, tos->getTimeouts());
+}
+
+/**
+ * Test that thisLoop functionality works with terminateLoopSoon
+ */
+TEST(EventBaseTest, ThisLoop) {
+  EventBase eb;
+  bool runInLoop = false;
+  bool runThisLoop = false;
+
+  eb.runInLoop([&](){
+      eb.terminateLoopSoon();
+      eb.runInLoop([&]() {
+          runInLoop = true;
+        });
+      eb.runInLoop([&]() {
+          runThisLoop = true;
+        }, true);
+    }, true);
+  eb.loopForever();
+
+  // Should not work
+  ASSERT_FALSE(runInLoop);
+  // Should work with thisLoop
+  ASSERT_TRUE(runThisLoop);
+}
+
+TEST(EventBaseTest, EventBaseThreadLoop) {
+  EventBase base;
+  bool ran = false;
+
+  base.runInEventBaseThread([&](){
+    ran = true;
+  });
+  base.loop();
+
+  ASSERT_EQ(true, ran);
+}
+
+TEST(EventBaseTest, EventBaseThreadName) {
+  EventBase base;
+  base.setName("foo");
+  base.loop();
+
+#if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
+  char name[16];
+  pthread_getname_np(pthread_self(), name, 16);
+  ASSERT_EQ(0, strcmp("foo", name));
+#endif
+}
diff --git a/folly/io/async/test/HHWheelTimerTest.cpp b/folly/io/async/test/HHWheelTimerTest.cpp
new file mode 100644 (file)
index 0000000..05a84c8
--- /dev/null
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <folly/io/async/HHWheelTimer.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/test/UndelayedDestruction.h>
+#include <folly/io/async/test/Util.h>
+
+#include <gtest/gtest.h>
+#include <vector>
+
+using namespace folly;
+using std::chrono::milliseconds;
+
+typedef UndelayedDestruction<HHWheelTimer> StackWheelTimer;
+
+class TestTimeout : public HHWheelTimer::Callback {
+ public:
+  TestTimeout() {}
+  TestTimeout(HHWheelTimer* t, milliseconds timeout) {
+    t->scheduleTimeout(this, timeout);
+  }
+  virtual void timeoutExpired() noexcept {
+    timestamps.push_back(TimePoint());
+    if (fn) {
+      fn();
+    }
+  }
+
+  std::deque<TimePoint> timestamps;
+  std::function<void()> fn;
+};
+
+/*
+ * Test firing some simple timeouts that are fired once and never rescheduled
+ */
+TEST(HHWheelTimerTest, FireOnce) {
+  EventBase eventBase;
+  StackWheelTimer t(&eventBase, milliseconds(1));
+
+  const HHWheelTimer::Callback* nullCallback = nullptr;
+
+  TestTimeout t1;
+  TestTimeout t2;
+  TestTimeout t3;
+
+  ASSERT_EQ(t.count(), 0);
+
+  t.scheduleTimeout(&t1, milliseconds(5));
+  t.scheduleTimeout(&t2, milliseconds(5));
+  // Verify scheduling it twice cancels, then schedules.
+  // Should only get one callback.
+  t.scheduleTimeout(&t2, milliseconds(5));
+  t.scheduleTimeout(&t3, milliseconds(10));
+
+  ASSERT_EQ(t.count(), 3);
+
+  TimePoint start;
+  eventBase.loop();
+  TimePoint end;
+
+  ASSERT_EQ(t1.timestamps.size(), 1);
+  ASSERT_EQ(t2.timestamps.size(), 1);
+  ASSERT_EQ(t3.timestamps.size(), 1);
+
+  ASSERT_EQ(t.count(), 0);
+
+  T_CHECK_TIMEOUT(start, t1.timestamps[0], 5);
+  T_CHECK_TIMEOUT(start, t2.timestamps[0], 5);
+  T_CHECK_TIMEOUT(start, t3.timestamps[0], 10);
+  T_CHECK_TIMEOUT(start, end, 10);
+}
+
+/*
+ * Test cancelling a timeout when it is scheduled to be fired right away.
+ */
+
+TEST(HHWheelTimerTest, CancelTimeout) {
+  EventBase eventBase;
+  StackWheelTimer t(&eventBase, milliseconds(1));
+
+  // Create several timeouts that will all fire in 5ms.
+  TestTimeout t5_1(&t, milliseconds(5));
+  TestTimeout t5_2(&t, milliseconds(5));
+  TestTimeout t5_3(&t, milliseconds(5));
+  TestTimeout t5_4(&t, milliseconds(5));
+  TestTimeout t5_5(&t, milliseconds(5));
+
+  // Also create a few timeouts to fire in 10ms
+  TestTimeout t10_1(&t, milliseconds(10));
+  TestTimeout t10_2(&t, milliseconds(10));
+  TestTimeout t10_3(&t, milliseconds(10));
+
+  TestTimeout t20_1(&t, milliseconds(20));
+  TestTimeout t20_2(&t, milliseconds(20));
+
+  // Have t5_1 cancel t5_2 and t5_4.
+  //
+  // Cancelling t5_2 will test cancelling a timeout that is at the head of the
+  // list and ready to be fired.
+  //
+  // Cancelling t5_4 will test cancelling a timeout in the middle of the list
+  t5_1.fn = [&] {
+    t5_2.cancelTimeout();
+    t5_4.cancelTimeout();
+  };
+
+  // Have t5_3 cancel t5_5.
+  // This will test cancelling the last remaining timeout.
+  //
+  // Then have t5_3 reschedule itself.
+  t5_3.fn = [&] {
+    t5_5.cancelTimeout();
+    // Reset our function so we won't continually reschedule ourself
+    std::function<void()> fnDtorGuard;
+    t5_3.fn.swap(fnDtorGuard);
+    t.scheduleTimeout(&t5_3, milliseconds(5));
+
+    // Also test cancelling timeouts in another timeset that isn't ready to
+    // fire yet.
+    //
+    // Cancel the middle timeout in ts10.
+    t10_2.cancelTimeout();
+    // Cancel both the timeouts in ts20.
+    t20_1.cancelTimeout();
+    t20_2.cancelTimeout();
+  };
+
+  TimePoint start;
+  eventBase.loop();
+  TimePoint end;
+
+  ASSERT_EQ(t5_1.timestamps.size(), 1);
+  T_CHECK_TIMEOUT(start, t5_1.timestamps[0], 5);
+
+  ASSERT_EQ(t5_3.timestamps.size(), 2);
+  T_CHECK_TIMEOUT(start, t5_3.timestamps[0], 5);
+  T_CHECK_TIMEOUT(t5_3.timestamps[0], t5_3.timestamps[1], 5);
+
+  ASSERT_EQ(t10_1.timestamps.size(), 1);
+  T_CHECK_TIMEOUT(start, t10_1.timestamps[0], 10);
+  ASSERT_EQ(t10_3.timestamps.size(), 1);
+  T_CHECK_TIMEOUT(start, t10_3.timestamps[0], 10);
+
+  // Cancelled timeouts
+  ASSERT_EQ(t5_2.timestamps.size(), 0);
+  ASSERT_EQ(t5_4.timestamps.size(), 0);
+  ASSERT_EQ(t5_5.timestamps.size(), 0);
+  ASSERT_EQ(t10_2.timestamps.size(), 0);
+  ASSERT_EQ(t20_1.timestamps.size(), 0);
+  ASSERT_EQ(t20_2.timestamps.size(), 0);
+
+  T_CHECK_TIMEOUT(start, end, 10);
+}
+
+/*
+ * Test destroying a HHWheelTimer with timeouts outstanding
+ */
+
+TEST(HHWheelTimerTest, DestroyTimeoutSet) {
+  EventBase eventBase;
+
+  HHWheelTimer::UniquePtr t(
+    new HHWheelTimer(&eventBase, milliseconds(1)));
+
+  TestTimeout t5_1(t.get(), milliseconds(5));
+  TestTimeout t5_2(t.get(), milliseconds(5));
+  TestTimeout t5_3(t.get(), milliseconds(5));
+
+  TestTimeout t10_1(t.get(), milliseconds(10));
+  TestTimeout t10_2(t.get(), milliseconds(10));
+
+  // Have t5_2 destroy t
+  // Note that this will call destroy() inside t's timeoutExpired()
+  // method.
+  t5_2.fn = [&] {
+    t5_3.cancelTimeout();
+    t5_1.cancelTimeout();
+    t10_1.cancelTimeout();
+    t10_2.cancelTimeout();
+    t.reset();};
+
+  TimePoint start;
+  eventBase.loop();
+  TimePoint end;
+
+  ASSERT_EQ(t5_1.timestamps.size(), 1);
+  T_CHECK_TIMEOUT(start, t5_1.timestamps[0], 5);
+  ASSERT_EQ(t5_2.timestamps.size(), 1);
+  T_CHECK_TIMEOUT(start, t5_2.timestamps[0], 5);
+
+  ASSERT_EQ(t5_3.timestamps.size(), 0);
+  ASSERT_EQ(t10_1.timestamps.size(), 0);
+  ASSERT_EQ(t10_2.timestamps.size(), 0);
+
+  T_CHECK_TIMEOUT(start, end, 5);
+}
+
+/*
+ * Test the tick interval parameter
+ */
+TEST(HHWheelTimerTest, AtMostEveryN) {
+  EventBase eventBase;
+
+  // Create a timeout set with a 10ms interval, to fire no more than once
+  // every 3ms.
+  milliseconds interval(25);
+  milliseconds atMostEveryN(6);
+  StackWheelTimer t(&eventBase, atMostEveryN);
+  t.setCatchupEveryN(70);
+
+  // Create 60 timeouts to be added to ts10 at 1ms intervals.
+  uint32_t numTimeouts = 60;
+  std::vector<TestTimeout> timeouts(numTimeouts);
+
+  // Create a scheduler timeout to add the timeouts 1ms apart.
+  uint32_t index = 0;
+  StackWheelTimer ts1(&eventBase, milliseconds(1));
+  TestTimeout scheduler(&ts1, milliseconds(1));
+  scheduler.fn = [&] {
+    if (index >= numTimeouts) {
+      return;
+    }
+    // Call timeoutExpired() on the timeout so it will record a timestamp.
+    // This is done only so we can record when we scheduled the timeout.
+    // This way if ts1 starts to fall behind a little over time we will still
+    // be comparing the ts10 timeouts to when they were first scheduled (rather
+    // than when we intended to schedule them).  The scheduler may fall behind
+    // eventually since we don't really schedule it once every millisecond.
+    // Each time it finishes we schedule it for 1 millisecond in the future.
+    // The amount of time it takes to run, and any delays it encounters
+    // getting scheduled may eventually add up over time.
+    timeouts[index].timeoutExpired();
+
+    // Schedule the new timeout
+    t.scheduleTimeout(&timeouts[index], interval);
+    // Reschedule ourself
+    ts1.scheduleTimeout(&scheduler, milliseconds(1));
+    ++index;
+  };
+
+  // Go ahead and schedule the first timeout now.
+  //scheduler.fn();
+
+  TimePoint start;
+  eventBase.loop();
+  TimePoint end;
+
+  // We scheduled timeouts 1ms apart, when the HHWheelTimer is only allowed
+  // to wake up at most once every 3ms.  It will therefore wake up every 3ms
+  // and fire groups of approximately 3 timeouts at a time.
+  //
+  // This is "approximately 3" since it may get slightly behind and fire 4 in
+  // one interval, etc.  T_CHECK_TIMEOUT normally allows a few milliseconds of
+  // tolerance.  We have to add the same into our checking algorithm here.
+  for (uint32_t idx = 0; idx < numTimeouts; ++idx) {
+    ASSERT_EQ(timeouts[idx].timestamps.size(), 2);
+
+    TimePoint scheduledTime(timeouts[idx].timestamps[0]);
+    TimePoint firedTime(timeouts[idx].timestamps[1]);
+
+    // Assert that the timeout fired at roughly the right time.
+    // T_CHECK_TIMEOUT() normally has a tolerance of 5ms.  Allow an additional
+    // atMostEveryN.
+    milliseconds tolerance = milliseconds(5) + interval;
+    T_CHECK_TIMEOUT(scheduledTime, firedTime, atMostEveryN.count(),
+                    tolerance.count());
+
+    // Assert that the difference between the previous timeout and now was
+    // either very small (fired in the same event loop), or larger than
+    // atMostEveryN.
+    if (idx == 0) {
+      // no previous value
+      continue;
+    }
+    TimePoint prev(timeouts[idx - 1].timestamps[1]);
+
+    milliseconds delta((firedTime.getTimeStart() - prev.getTimeEnd()) -
+                       (firedTime.getTimeWaiting() - prev.getTimeWaiting()));
+    if (delta > milliseconds(1)) {
+      T_CHECK_TIMEOUT(prev, firedTime, atMostEveryN.count()); }
+  }
+}
+
+/*
+ * Test an event loop that is blocking
+ */
+
+TEST(HHWheelTimerTest, SlowLoop) {
+  EventBase eventBase;
+  StackWheelTimer t(&eventBase, milliseconds(1));
+
+  TestTimeout t1;
+  TestTimeout t2;
+
+  ASSERT_EQ(t.count(), 0);
+
+  eventBase.runInLoop([](){usleep(10000);});
+  t.scheduleTimeout(&t1, milliseconds(5));
+
+  ASSERT_EQ(t.count(), 1);
+
+  TimePoint start;
+  eventBase.loop();
+  TimePoint end;
+
+  ASSERT_EQ(t1.timestamps.size(), 1);
+  ASSERT_EQ(t.count(), 0);
+
+  // Check that the timeout was delayed by sleep
+  T_CHECK_TIMEOUT(start, t1.timestamps[0], 15, 1);
+  T_CHECK_TIMEOUT(start, end, 15, 1);
+
+  // Try it again, this time with catchup timing every loop
+  t.setCatchupEveryN(1);
+
+  eventBase.runInLoop([](){usleep(10000);});
+  t.scheduleTimeout(&t2, milliseconds(5));
+
+  ASSERT_EQ(t.count(), 1);
+
+  TimePoint start2;
+  eventBase.loop();
+  TimePoint end2;
+
+  ASSERT_EQ(t2.timestamps.size(), 1);
+  ASSERT_EQ(t.count(), 0);
+
+  // Check that the timeout was NOT delayed by sleep
+  T_CHECK_TIMEOUT(start2, t2.timestamps[0], 10, 1);
+  T_CHECK_TIMEOUT(start2, end2, 10, 1);
+}
diff --git a/folly/io/async/test/SocketPair.cpp b/folly/io/async/test/SocketPair.cpp
new file mode 100644 (file)
index 0000000..73df89b
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <folly/io/async/test/SocketPair.h>
+
+#include <folly/Conv.h>
+
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <stdexcept>
+
+namespace folly {
+
+SocketPair::SocketPair(Mode mode) {
+  if (socketpair(PF_UNIX, SOCK_STREAM, 0, fds_) != 0) {
+    throw std::runtime_error(
+      folly::to<std::string>("test::SocketPair: failed create socket pair",
+                             errno));
+  }
+
+  if (mode == NONBLOCKING) {
+    if (fcntl(fds_[0], F_SETFL, O_NONBLOCK) != 0) {
+      throw std::runtime_error(
+        folly::to<std::string>("test::SocketPair: failed to set non-blocking "
+                               "read mode", errno));
+    }
+    if (fcntl(fds_[1], F_SETFL, O_NONBLOCK) != 0) {
+      throw std::runtime_error(
+        folly::to<std::string>("test::SocketPair: failed to set non-blocking "
+                               "write mode", errno));
+    }
+  }
+}
+
+SocketPair::~SocketPair() {
+  closeFD0();
+  closeFD1();
+}
+
+void SocketPair::closeFD0() {
+  if (fds_[0] >= 0) {
+    close(fds_[0]);
+    fds_[0] = -1;
+  }
+}
+
+void SocketPair::closeFD1() {
+  if (fds_[1] >= 0) {
+    close(fds_[1]);
+    fds_[1] = -1;
+  }
+}
+
+}
diff --git a/folly/io/async/test/SocketPair.h b/folly/io/async/test/SocketPair.h
new file mode 100644 (file)
index 0000000..81cf6b5
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+namespace folly {
+
+class SocketPair {
+ public:
+  enum Mode {
+    BLOCKING,
+    NONBLOCKING
+  };
+
+  explicit SocketPair(Mode mode = NONBLOCKING);
+  ~SocketPair();
+
+  int operator[](int index) const {
+    return fds_[index];
+  }
+
+  void closeFD0();
+  void closeFD1();
+
+  int extractFD0() {
+    return extractFD(0);
+  }
+  int extractFD1() {
+    return extractFD(1);
+  }
+  int extractFD(int index) {
+    int fd = fds_[index];
+    fds_[index] = -1;
+    return fd;
+  }
+
+ private:
+  int fds_[2];
+};
+
+}
diff --git a/folly/io/async/test/TimeUtil.cpp b/folly/io/async/test/TimeUtil.cpp
new file mode 100644 (file)
index 0000000..9a60a0e
--- /dev/null
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define __STDC_FORMAT_MACROS
+
+#include <folly/io/async/test/TimeUtil.h>
+
+#include <folly/Conv.h>
+
+#include <chrono>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/syscall.h>
+#include <sys/utsname.h>
+#include <errno.h>
+#include <glog/logging.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdexcept>
+
+using std::string;
+using namespace std::chrono;
+
+namespace folly {
+
+/**
+ * glibc doesn't provide gettid(), so define it ourselves.
+ */
+static pid_t gettid() {
+  return syscall(SYS_gettid);
+}
+
+/**
+ * The /proc/<pid>/schedstat file reports time values in jiffies.
+ *
+ * Determine how many jiffies are in a second.
+ * Returns -1 if the number of jiffies/second cannot be determined.
+ */
+static int64_t determineJiffiesHZ() {
+  // It seems like the only real way to figure out the CONFIG_HZ value used by
+  // this kernel is to look it up in the config file.
+  //
+  // Look in /boot/config-<kernel_release>
+  struct utsname unameInfo;
+  if (uname(&unameInfo) != 0) {
+    LOG(ERROR) << "unable to determine jiffies/second: uname failed: %s"
+               << strerror(errno);
+    return -1;
+  }
+
+  char configPath[256];
+  snprintf(configPath, sizeof(configPath), "/boot/config-%s",
+           unameInfo.release);
+
+  FILE* f = fopen(configPath, "r");
+  if (f == nullptr) {
+    LOG(ERROR) << "unable to determine jiffies/second: "
+      "cannot open kernel config file %s" << configPath;
+    return -1;
+  }
+
+  int64_t hz = -1;
+  char buf[1024];
+  while (fgets(buf, sizeof(buf), f) != nullptr) {
+    if (strcmp(buf, "CONFIG_NO_HZ=y\n") == 0) {
+      // schedstat info seems to be reported in nanoseconds on tickless
+      // kernels.
+      //
+      // The CONFIG_HZ value doesn't matter for our purposes,
+      // so return as soon as we see CONFIG_NO_HZ.
+      fclose(f);
+      return 1000000000;
+    } else if (strcmp(buf, "CONFIG_HZ=1000\n") == 0) {
+      hz = 1000;
+    } else if (strcmp(buf, "CONFIG_HZ=300\n") == 0) {
+      hz = 300;
+    } else if (strcmp(buf, "CONFIG_HZ=250\n") == 0) {
+      hz = 250;
+    } else if (strcmp(buf, "CONFIG_HZ=100\n") == 0) {
+      hz = 100;
+    }
+  }
+  fclose(f);
+
+  if (hz == -1) {
+    LOG(ERROR) << "unable to determine jiffies/second: no CONFIG_HZ setting "
+      "found in %s" << configPath;
+    return -1;
+  }
+
+  return hz;
+}
+
+/**
+ * Determine how long this process has spent waiting to get scheduled on the
+ * CPU.
+ *
+ * Returns the number of milliseconds spent waiting, or -1 if the amount of
+ * time cannot be determined.
+ */
+static milliseconds getTimeWaitingMS(pid_t tid) {
+  static int64_t jiffiesHZ = 0;
+  if (jiffiesHZ == 0) {
+    jiffiesHZ = determineJiffiesHZ();
+  }
+
+  if (jiffiesHZ < 0) {
+    // We couldn't figure out how many jiffies there are in a second.
+    // Don't bother reading the schedstat info if we can't interpret it.
+    return milliseconds(0);
+  }
+
+  int fd = -1;
+  try {
+    char schedstatFile[256];
+    snprintf(schedstatFile, sizeof(schedstatFile),
+             "/proc/%d/schedstat", tid);
+    fd = open(schedstatFile, O_RDONLY);
+    if (fd < 0) {
+      throw std::runtime_error(
+        folly::to<string>("failed to open process schedstat file", errno));
+    }
+
+    char buf[512];
+    ssize_t bytesReadRet = read(fd, buf, sizeof(buf) - 1);
+    if (bytesReadRet <= 0) {
+      throw std::runtime_error(
+        folly::to<string>("failed to read process schedstat file", errno));
+    }
+    size_t bytesRead = size_t(bytesReadRet);
+
+    if (buf[bytesRead - 1] != '\n') {
+      throw std::runtime_error("expected newline at end of schedstat data");
+    }
+    assert(bytesRead < sizeof(buf));
+    buf[bytesRead] = '\0';
+
+    uint64_t activeJiffies = 0;
+    uint64_t waitingJiffies = 0;
+    uint64_t numTasks = 0;
+    int rc = sscanf(buf, "%" PRIu64 " %" PRIu64 " %" PRIu64 "\n",
+                    &activeJiffies, &waitingJiffies, &numTasks);
+    if (rc != 3) {
+      throw std::runtime_error("failed to parse schedstat data");
+    }
+
+    close(fd);
+    return milliseconds((waitingJiffies * 1000) / jiffiesHZ);
+  } catch (const std::runtime_error& e) {
+    if (fd >= 0) {
+      close(fd);
+    }
+    LOG(ERROR) << "error determining process wait time: %s" << e.what();
+    return milliseconds(0);
+  }
+}
+
+void TimePoint::reset() {
+  // Remember the current time
+  timeStart_ = system_clock::now();
+
+  // Remember how long this process has spent waiting to be scheduled
+  tid_ = gettid();
+  timeWaiting_ = getTimeWaitingMS(tid_);
+
+  // In case it took a while to read the schedstat info,
+  // also record the time after the schedstat check
+  timeEnd_ = system_clock::now();
+}
+
+std::ostream& operator<<(std::ostream& os, const TimePoint& timePoint) {
+  os << "TimePoint(" << timePoint.getTimeStart().time_since_epoch().count()
+     << ", " << timePoint.getTimeEnd().time_since_epoch().count() << ", "
+     << timePoint.getTimeWaiting().count() << ")";
+  return os;
+}
+
+bool
+checkTimeout(const TimePoint& start, const TimePoint& end,
+             milliseconds expectedMS, bool allowSmaller,
+             milliseconds tolerance) {
+  auto elapsedMS = end.getTimeStart() - start.getTimeEnd();
+
+  if (!allowSmaller) {
+    // Timeouts should never fire before the time was up.
+    // Allow 1ms of wiggle room for rounding errors.
+    if (elapsedMS < expectedMS - milliseconds(1)) {
+      return false;
+    }
+  }
+
+  // Check that the event fired within a reasonable time of the timout.
+  //
+  // If the system is under heavy load, our process may have had to wait for a
+  // while to be run.  The time spent waiting for the processor shouldn't
+  // count against us, so exclude this time from the check.
+  milliseconds excludedMS;
+  if (end.getTid() != start.getTid()) {
+    // We can only correctly compute the amount of time waiting to be scheduled
+    // if both TimePoints were set in the same thread.
+    excludedMS = milliseconds(0);
+  } else {
+    excludedMS = end.getTimeWaiting() - start.getTimeWaiting();
+    assert(end.getTimeWaiting() >= start.getTimeWaiting());
+    // Add a tolerance here due to precision issues on linux, see below note.
+    assert( (elapsedMS + tolerance) >= excludedMS);
+  }
+
+  milliseconds effectiveElapsedMS = milliseconds(0);
+  if (elapsedMS > excludedMS) {
+    effectiveElapsedMS =  duration_cast<milliseconds>(elapsedMS) - excludedMS;
+  }
+
+  // On x86 Linux, sleep calls generally have precision only to the nearest
+  // millisecond.  The tolerance parameter lets users allow a few ms of slop.
+  milliseconds overrun = effectiveElapsedMS - expectedMS;
+  if (overrun > tolerance) {
+    return false;
+  }
+
+  return true;
+}
+
+}
diff --git a/folly/io/async/test/TimeUtil.h b/folly/io/async/test/TimeUtil.h
new file mode 100644 (file)
index 0000000..ca9c043
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <chrono>
+#include <iostream>
+
+namespace folly {
+
+class TimePoint {
+ public:
+  explicit TimePoint(bool set = true)
+    : tid_(0) {
+    if (set) {
+      reset();
+    }
+  }
+
+  void reset();
+
+  bool isUnset() const {
+    return (timeStart_.time_since_epoch().count() == 0 &&
+            timeEnd_.time_since_epoch().count() == 0 &&
+            timeWaiting_.count() == 0);
+  }
+
+  std::chrono::system_clock::time_point getTime() const {
+    return timeStart_;
+  }
+
+  std::chrono::system_clock::time_point getTimeStart() const {
+    return timeStart_;
+  }
+
+  std::chrono::system_clock::time_point getTimeEnd() const {
+    return timeStart_;
+  }
+
+  std::chrono::milliseconds getTimeWaiting() const {
+    return timeWaiting_;
+  }
+
+  pid_t getTid() const {
+    return tid_;
+  }
+
+ private:
+  std::chrono::system_clock::time_point timeStart_;
+  std::chrono::system_clock::time_point timeEnd_;
+  std::chrono::milliseconds timeWaiting_;
+  pid_t tid_;
+};
+
+std::ostream& operator<<(std::ostream& os, const TimePoint& timePoint);
+
+bool checkTimeout(const TimePoint& start,
+                  const TimePoint& end,
+                  std::chrono::milliseconds expectedMS,
+                  bool allowSmaller,
+                  std::chrono::milliseconds tolerance =
+                  std::chrono::milliseconds(5));
+
+}
diff --git a/folly/io/async/test/UndelayedDestruction.h b/folly/io/async/test/UndelayedDestruction.h
new file mode 100644 (file)
index 0000000..952b38c
--- /dev/null
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <cstdlib>
+#include <type_traits>
+#include <utility>
+#include <cassert>
+
+namespace folly {
+
+/**
+ * A helper class to allow a DelayedDestruction object to be instantiated on
+ * the stack.
+ *
+ * This class derives from an existing DelayedDestruction type and makes the
+ * destructor public again.  This allows objects of this type to be declared on
+ * the stack or directly inside another class.  Normally DelayedDestruction
+ * objects must be dynamically allocated on the heap.
+ *
+ * However, the trade-off is that you lose some of the protections provided by
+ * DelayedDestruction::destroy().  DelayedDestruction::destroy() will
+ * automatically delay destruction of the object until it is safe to do so.
+ * If you use UndelayedDestruction, you become responsible for ensuring that
+ * you only destroy the object where it is safe to do so.  Attempting to
+ * destroy a UndelayedDestruction object while it has a non-zero destructor
+ * guard count will abort the program.
+ */
+template<typename TDD>
+class UndelayedDestruction : public TDD {
+ public:
+  // We could just use constructor inheritance, but not all compilers
+  // support that. So, just use a forwarding constructor.
+  //
+  // Ideally we would use std::enable_if<> and std::is_constructible<> to
+  // provide only constructor methods that are valid for our parent class.
+  // Unfortunately std::is_constructible<> doesn't work for types that aren't
+  // destructible.  In gcc-4.6 it results in a compiler error.  In the latest
+  // gcc code it looks like it has been fixed to return false.  (The language
+  // in the standard seems to indicate that returning false is the correct
+  // behavior for non-destructible types, which is unfortunate.)
+  template<typename ...Args>
+  explicit UndelayedDestruction(Args&& ...args)
+    : TDD(std::forward<Args>(args)...) {}
+
+  /**
+   * Public destructor.
+   *
+   * The caller is responsible for ensuring that the object is only destroyed
+   * where it is safe to do so.  (i.e., when the destructor guard count is 0).
+   *
+   * The exact conditions for meeting this may be dependant upon your class
+   * semantics.  Typically you are only guaranteed that it is safe to destroy
+   * the object directly from the event loop (e.g., directly from a
+   * TEventBase::LoopCallback), or when the event loop is stopped.
+   */
+  virtual ~UndelayedDestruction() {
+    // Crash if the caller is destroying us with outstanding destructor guards.
+    if (this->getDestructorGuardCount() != 0) {
+      abort();
+    }
+    // Invoke destroy.  This is necessary since our base class may have
+    // implemented custom behavior in destroy().
+    this->destroy();
+  }
+
+ protected:
+  /**
+   * Override our parent's destroy() method to make it protected.
+   * Callers should use the normal destructor instead of destroy
+   */
+  virtual void destroy() {
+    this->TDD::destroy();
+  }
+
+  virtual void destroyNow(bool delayed) {
+    // Do nothing.  This will always be invoked from the call to destroy inside
+    // our destructor.
+    assert(!delayed);
+    // prevent unused variable warnings when asserts are compiled out.
+    (void)delayed;
+  }
+
+ private:
+  // Forbidden copy constructor and assignment operator
+  UndelayedDestruction(UndelayedDestruction const &) = delete;
+  UndelayedDestruction& operator=(UndelayedDestruction const &) = delete;
+};
+
+}
diff --git a/folly/io/async/test/Util.h b/folly/io/async/test/Util.h
new file mode 100644 (file)
index 0000000..e0b7360
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <folly/io/async/test/TimeUtil.h>
+#include <gtest/gtest.h>
+
+/**
+ * Check how long a timeout took to fire.
+ *
+ * This method verifies:
+ * - that the timeout did not fire too early (never less than expectedMS)
+ * - that the timeout fired within a reasonable period of the expected
+ *   duration.  It must fire within the specified tolerance, excluding time
+ *   that this process spent waiting to be scheduled.
+ *
+ * @param start                 A TimePoint object set just before the timeout
+ *                              was scheduled.
+ * @param end                   A TimePoint object set when the timeout fired.
+ * @param expectedMS            The timeout duration, in milliseconds
+ * @param tolerance             The tolerance, in milliseconds.
+ */
+#define T_CHECK_TIMEOUT(start, end, expectedMS, ...) \
+  EXPECT_TRUE(::folly::checkTimeout((start), (end),  \
+                                    (expectedMS), false,  \
+                                    ##__VA_ARGS__))
+
+/**
+ * Verify that an event took less than a specified amount of time.
+ *
+ * This is similar to T_CHECK_TIMEOUT, but does not fail if the event took less
+ * than the allowed time.
+ */
+#define T_CHECK_TIME_LT(start, end, expectedMS, ...) \
+  EXPECT_TRUE(::folly::checkTimeout((start), (end),  \
+                                    (expectedMS), true, \
+                                    ##__VA_ARGS__))