Adds writer test case for RCU
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
index 81acc8265a1e74f07d96185ffe9a19b4130628e1..82ceed8220feb78562bc0ef101da66c29b77a1c8 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2010-present Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <folly/io/async/AsyncServerSocket.h>
+
+#include <folly/io/async/test/AsyncSocketTest2.h>
+
+#include <folly/ConstexprMath.h>
+#include <folly/ExceptionWrapper.h>
+#include <folly/Random.h>
+#include <folly/SocketAddress.h>
 #include <folly/io/async/AsyncSocket.h>
 #include <folly/io/async/AsyncTimeout.h>
 #include <folly/io/async/EventBase.h>
-#include <folly/RWSpinLock.h>
-#include <folly/SocketAddress.h>
 
+#include <folly/experimental/TestUtil.h>
 #include <folly/io/IOBuf.h>
 #include <folly/io/async/test/AsyncSocketTest.h>
 #include <folly/io/async/test/Util.h>
+#include <folly/portability/GMock.h>
+#include <folly/portability/GTest.h>
+#include <folly/portability/Sockets.h>
+#include <folly/portability/Unistd.h>
 #include <folly/test/SocketAddressTestHelper.h>
 
-#include <gtest/gtest.h>
 #include <boost/scoped_array.hpp>
-#include <iostream>
-#include <unistd.h>
 #include <fcntl.h>
-#include <poll.h>
 #include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/tcp.h>
+#include <iostream>
 #include <thread>
 
 using namespace boost;
@@ -48,6 +52,10 @@ using std::chrono::milliseconds;
 using boost::scoped_array;
 
 using namespace folly;
+using namespace folly::test;
+using namespace testing;
+
+namespace fsp = folly::portability::sockets;
 
 class DelayedWrite: public AsyncTimeout {
  public:
@@ -96,10 +104,33 @@ TEST(AsyncSocketTest, Connect) {
 
   evb.loop();
 
-  CHECK_EQ(cb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(cb.state, STATE_SUCCEEDED);
   EXPECT_LE(0, socket->getConnectTime().count());
+  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+}
+
+enum class TFOState {
+  DISABLED,
+  ENABLED,
+};
+
+class AsyncSocketConnectTest : public ::testing::TestWithParam<TFOState> {};
+
+std::vector<TFOState> getTestingValues() {
+  std::vector<TFOState> vals;
+  vals.emplace_back(TFOState::DISABLED);
+
+#if FOLLY_ALLOW_TFO
+  vals.emplace_back(TFOState::ENABLED);
+#endif
+  return vals;
 }
 
+INSTANTIATE_TEST_CASE_P(
+    ConnectTests,
+    AsyncSocketConnectTest,
+    ::testing::ValuesIn(getTestingValues()));
+
 /**
  * Test connecting to a server that isn't listening
  */
@@ -115,9 +146,10 @@ TEST(AsyncSocketTest, ConnectRefused) {
 
   evb.loop();
 
-  CHECK_EQ(cb.state, STATE_FAILED);
-  CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
+  EXPECT_EQ(STATE_FAILED, cb.state);
+  EXPECT_EQ(AsyncSocketException::NOT_OPEN, cb.exception.getType());
   EXPECT_LE(0, socket->getConnectTime().count());
+  EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
 }
 
 /**
@@ -146,28 +178,34 @@ TEST(AsyncSocketTest, ConnectTimeout) {
 
   evb.loop();
 
-  CHECK_EQ(cb.state, STATE_FAILED);
-  CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
+  ASSERT_EQ(cb.state, STATE_FAILED);
+  ASSERT_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
 
   // Verify that we can still get the peer address after a timeout.
   // Use case is if the client was created from a client pool, and we want
   // to log which peer failed.
   folly::SocketAddress peer;
   socket->getPeerAddress(&peer);
-  CHECK_EQ(peer, addr);
+  ASSERT_EQ(peer, addr);
   EXPECT_LE(0, socket->getConnectTime().count());
+  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
 }
 
 /**
  * Test writing immediately after connecting, without waiting for connect
  * to finish.
  */
-TEST(AsyncSocketTest, ConnectAndWrite) {
+TEST_P(AsyncSocketConnectTest, ConnectAndWrite) {
   TestServer server;
 
   // connect()
   EventBase evb;
   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+  if (GetParam() == TFOState::ENABLED) {
+    socket->enableTFO();
+  }
+
   ConnCallback ccb;
   socket->connect(&ccb, server.getAddress(), 30);
 
@@ -181,8 +219,8 @@ TEST(AsyncSocketTest, ConnectAndWrite) {
   // The kernel should be able to buffer the write request so it can succeed.
   evb.loop();
 
-  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
-  CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
 
   // Make sure the server got a connection and received the data
   socket->close();
@@ -190,17 +228,22 @@ TEST(AsyncSocketTest, ConnectAndWrite) {
 
   ASSERT_TRUE(socket->isClosedBySelf());
   ASSERT_FALSE(socket->isClosedByPeer());
+  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
 }
 
 /**
  * Test connecting using a nullptr connect callback.
  */
-TEST(AsyncSocketTest, ConnectNullCallback) {
+TEST_P(AsyncSocketConnectTest, ConnectNullCallback) {
   TestServer server;
 
   // connect()
   EventBase evb;
   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  if (GetParam() == TFOState::ENABLED) {
+    socket->enableTFO();
+  }
+
   socket->connect(nullptr, server.getAddress(), 30);
 
   // write some data, just so we have some way of verifing
@@ -212,7 +255,7 @@ TEST(AsyncSocketTest, ConnectNullCallback) {
 
   evb.loop();
 
-  CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
 
   // Make sure the server got a connection and received the data
   socket->close();
@@ -228,12 +271,15 @@ TEST(AsyncSocketTest, ConnectNullCallback) {
  *
  * This exercises the STATE_CONNECTING_CLOSING code.
  */
-TEST(AsyncSocketTest, ConnectWriteAndClose) {
+TEST_P(AsyncSocketConnectTest, ConnectWriteAndClose) {
   TestServer server;
 
   // connect()
   EventBase evb;
   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  if (GetParam() == TFOState::ENABLED) {
+    socket->enableTFO();
+  }
   ConnCallback ccb;
   socket->connect(&ccb, server.getAddress(), 30);
 
@@ -250,8 +296,8 @@ TEST(AsyncSocketTest, ConnectWriteAndClose) {
   // The kernel should be able to buffer the write request so it can succeed.
   evb.loop();
 
-  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
-  CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
 
   // Make sure the server got a connection and received the data
   server.verifyConnection(buf, sizeof(buf));
@@ -286,7 +332,7 @@ TEST(AsyncSocketTest, ConnectAndClose) {
   evb.loop();
 
   // Make sure the connection was aborted
-  CHECK_EQ(ccb.state, STATE_FAILED);
+  ASSERT_EQ(ccb.state, STATE_FAILED);
 
   ASSERT_TRUE(socket->isClosedBySelf());
   ASSERT_FALSE(socket->isClosedByPeer());
@@ -320,7 +366,7 @@ TEST(AsyncSocketTest, ConnectAndCloseNow) {
   evb.loop();
 
   // Make sure the connection was aborted
-  CHECK_EQ(ccb.state, STATE_FAILED);
+  ASSERT_EQ(ccb.state, STATE_FAILED);
 
   ASSERT_TRUE(socket->isClosedBySelf());
   ASSERT_FALSE(socket->isClosedByPeer());
@@ -361,8 +407,8 @@ TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
   // Loop, although there shouldn't be anything to do.
   evb.loop();
 
-  CHECK_EQ(ccb.state, STATE_FAILED);
-  CHECK_EQ(wcb.state, STATE_FAILED);
+  ASSERT_EQ(ccb.state, STATE_FAILED);
+  ASSERT_EQ(wcb.state, STATE_FAILED);
 
   ASSERT_TRUE(socket->isClosedBySelf());
   ASSERT_FALSE(socket->isClosedByPeer());
@@ -371,18 +417,27 @@ TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
 /**
  * Test installing a read callback immediately, before connect() finishes.
  */
-TEST(AsyncSocketTest, ConnectAndRead) {
+TEST_P(AsyncSocketConnectTest, ConnectAndRead) {
   TestServer server;
 
   // connect()
   EventBase evb;
   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  if (GetParam() == TFOState::ENABLED) {
+    socket->enableTFO();
+  }
+
   ConnCallback ccb;
   socket->connect(&ccb, server.getAddress(), 30);
 
   ReadCallback rcb;
   socket->setReadCB(&rcb);
 
+  if (GetParam() == TFOState::ENABLED) {
+    // Trigger a connection
+    socket->writeChain(nullptr, IOBuf::copyBuffer("hey"));
+  }
+
   // Even though we haven't looped yet, we should be able to accept
   // the connection and send data to it.
   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
@@ -395,11 +450,10 @@ TEST(AsyncSocketTest, ConnectAndRead) {
   // Loop, although there shouldn't be anything to do.
   evb.loop();
 
-  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
-  CHECK_EQ(rcb.state, STATE_SUCCEEDED);
-  CHECK_EQ(rcb.buffers.size(), 1);
-  CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
-  CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(rcb.buffers.size(), 1);
+  ASSERT_EQ(rcb.buffers[0].length, sizeof(buf));
+  ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
 
   ASSERT_FALSE(socket->isClosedBySelf());
   ASSERT_FALSE(socket->isClosedByPeer());
@@ -435,9 +489,9 @@ TEST(AsyncSocketTest, ConnectReadAndClose) {
   // Loop, although there shouldn't be anything to do.
   evb.loop();
 
-  CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
-  CHECK_EQ(rcb.buffers.size(), 0);
-  CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
+  ASSERT_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
+  ASSERT_EQ(rcb.buffers.size(), 0);
+  ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
 
   ASSERT_TRUE(socket->isClosedBySelf());
   ASSERT_FALSE(socket->isClosedByPeer());
@@ -447,12 +501,15 @@ TEST(AsyncSocketTest, ConnectReadAndClose) {
  * Test both writing and installing a read callback immediately,
  * before connect() finishes.
  */
-TEST(AsyncSocketTest, ConnectWriteAndRead) {
+TEST_P(AsyncSocketConnectTest, ConnectWriteAndRead) {
   TestServer server;
 
   // connect()
   EventBase evb;
   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  if (GetParam() == TFOState::ENABLED) {
+    socket->enableTFO();
+  }
   ConnCallback ccb;
   socket->connect(&ccb, server.getAddress(), 30);
 
@@ -482,13 +539,13 @@ TEST(AsyncSocketTest, ConnectWriteAndRead) {
   evb.loop();
 
   // Make sure the connect succeeded
-  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
 
   // Make sure the AsyncSocket read the data written by the accepted socket
-  CHECK_EQ(rcb.state, STATE_SUCCEEDED);
-  CHECK_EQ(rcb.buffers.size(), 1);
-  CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
-  CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
+  ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(rcb.buffers.size(), 1);
+  ASSERT_EQ(rcb.buffers[0].length, sizeof(buf2));
+  ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
 
   // Close the AsyncSocket so we'll see EOF on acceptedSocket
   socket->close();
@@ -496,9 +553,9 @@ TEST(AsyncSocketTest, ConnectWriteAndRead) {
   // Make sure the accepted socket saw the data written by the AsyncSocket
   uint8_t readbuf[sizeof(buf1)];
   acceptedSocket->readAll(readbuf, sizeof(readbuf));
-  CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
+  ASSERT_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
-  CHECK_EQ(bytesRead, 0);
+  ASSERT_EQ(bytesRead, 0);
 
   ASSERT_FALSE(socket->isClosedBySelf());
   ASSERT_TRUE(socket->isClosedByPeer());
@@ -545,7 +602,7 @@ TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
   fds[0].events = POLLIN;
   fds[0].revents = 0;
   int rc = poll(fds, 1, 0);
-  CHECK_EQ(rc, 0);
+  ASSERT_EQ(rc, 0);
 
   // Write data to the accepted socket
   uint8_t acceptedWbuf[192];
@@ -561,16 +618,16 @@ TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
   //
   // Check that the connection was completed successfully and that the write
   // callback succeeded.
-  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
-  CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
 
   // Check that we can read the data that was written to the socket, and that
   // we see an EOF, since its socket was half-shutdown.
   uint8_t readbuf[sizeof(wbuf)];
   acceptedSocket->readAll(readbuf, sizeof(readbuf));
-  CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
+  ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
-  CHECK_EQ(bytesRead, 0);
+  ASSERT_EQ(bytesRead, 0);
 
   // Close the accepted socket.  This will cause it to see EOF
   // and uninstall the read callback when we loop next.
@@ -582,10 +639,10 @@ TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
   evb.loop();
 
   // This loop should have read the data and seen the EOF
-  CHECK_EQ(rcb.state, STATE_SUCCEEDED);
-  CHECK_EQ(rcb.buffers.size(), 1);
-  CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
-  CHECK_EQ(memcmp(rcb.buffers[0].buffer,
+  ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(rcb.buffers.size(), 1);
+  ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
+  ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
 
   ASSERT_FALSE(socket->isClosedBySelf());
@@ -636,7 +693,7 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
   fds[0].events = POLLIN;
   fds[0].revents = 0;
   int rc = poll(fds, 1, 0);
-  CHECK_EQ(rc, 0);
+  ASSERT_EQ(rc, 0);
 
   // Write data to the accepted socket
   uint8_t acceptedWbuf[192];
@@ -645,7 +702,7 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
   acceptedSocket->flush();
   // Shutdown writes to the accepted socket.  This will cause it to see EOF
   // and uninstall the read callback.
-  ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
+  shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
 
   // Loop
   evb.loop();
@@ -656,21 +713,21 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
   //
   // Check that the connection was completed successfully and that the read
   // and write callbacks were invoked as expected.
-  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
-  CHECK_EQ(rcb.state, STATE_SUCCEEDED);
-  CHECK_EQ(rcb.buffers.size(), 1);
-  CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
-  CHECK_EQ(memcmp(rcb.buffers[0].buffer,
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(rcb.buffers.size(), 1);
+  ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
+  ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
-  CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
 
   // Check that we can read the data that was written to the socket, and that
   // we see an EOF, since its socket was half-shutdown.
   uint8_t readbuf[sizeof(wbuf)];
   acceptedSocket->readAll(readbuf, sizeof(readbuf));
-  CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
+  ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
-  CHECK_EQ(bytesRead, 0);
+  ASSERT_EQ(bytesRead, 0);
 
   // Fully close both sockets
   acceptedSocket->close();
@@ -715,8 +772,8 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
   socket->shutdownWriteNow();
 
   // Verify that writeError() was invoked on the write callback.
-  CHECK_EQ(wcb.state, STATE_FAILED);
-  CHECK_EQ(wcb.bytesWritten, 0);
+  ASSERT_EQ(wcb.state, STATE_FAILED);
+  ASSERT_EQ(wcb.bytesWritten, 0);
 
   // Even though we haven't looped yet, we should be able to accept
   // the connection.
@@ -729,7 +786,7 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
   fds[0].events = POLLIN;
   fds[0].revents = 0;
   int rc = poll(fds, 1, 0);
-  CHECK_EQ(rc, 0);
+  ASSERT_EQ(rc, 0);
 
   // Write data to the accepted socket
   uint8_t acceptedWbuf[192];
@@ -738,7 +795,7 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
   acceptedSocket->flush();
   // Shutdown writes to the accepted socket.  This will cause it to see EOF
   // and uninstall the read callback.
-  ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
+  shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
 
   // Loop
   evb.loop();
@@ -749,11 +806,11 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
   //
   // Check that the connection was completed successfully and that the read
   // callback was invoked as expected.
-  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
-  CHECK_EQ(rcb.state, STATE_SUCCEEDED);
-  CHECK_EQ(rcb.buffers.size(), 1);
-  CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
-  CHECK_EQ(memcmp(rcb.buffers[0].buffer,
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(rcb.buffers.size(), 1);
+  ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
+  ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
 
   // Since we used shutdownWriteNow(), it should have discarded all pending
@@ -761,7 +818,7 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
   // socket.
   uint8_t readbuf[sizeof(wbuf)];
   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
-  CHECK_EQ(bytesRead, 0);
+  ASSERT_EQ(bytesRead, 0);
 
   // Fully close both sockets
   acceptedSocket->close();
@@ -842,12 +899,12 @@ void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
   // The kernel should be able to buffer the write request so it can succeed.
   evb.loop();
 
-  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
   if (size1 > 0) {
-    CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
+    ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
   }
   if (size2 > 0) {
-    CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
+    ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
   }
 
   socket->close();
@@ -862,7 +919,7 @@ void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
     size_t end = bytesRead;
     if (start < size1) {
       size_t cmpLen = min(size1, end) - start;
-      CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
+      ASSERT_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
     }
     if (end > size1 && end <= size1 + size2) {
       size_t itOffset;
@@ -877,12 +934,12 @@ void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
         buf2Offset = 0;
         cmpLen = end - size1;
       }
-      CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
+      ASSERT_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
                                cmpLen),
                         0);
     }
   }
-  CHECK_EQ(bytesRead, size1 + size2);
+  ASSERT_EQ(bytesRead, size1 + size2);
 }
 
 TEST(AsyncSocketTest, ConnectCallbackWrite) {
@@ -890,7 +947,7 @@ TEST(AsyncSocketTest, ConnectCallbackWrite) {
   testConnectOptWrite(100, 200);
 
   // Test using a large buffer in the connect callback, that should block
-  const size_t largeSize = 8*1024*1024;
+  const size_t largeSize = 32 * 1024 * 1024;
   testConnectOptWrite(100, largeSize);
 
   // Test using a large initial write
@@ -959,8 +1016,12 @@ TEST(AsyncSocketTest, WriteTimeout) {
     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
   evb.loop(); // loop until the socket is connected
 
-  // write() a large chunk of data, with no-one on the other end reading
-  size_t writeLength = 8*1024*1024;
+  // write() a large chunk of data, with no-one on the other end reading.
+  // Tricky: the kernel caches the connection metrics for recently-used
+  // routes (see tcp_no_metrics_save) so a freshly opened connection can
+  // have a send buffer size bigger than wmem_default.  This makes the test
+  // flaky on contbuild if writeLength is < wmem_max (20M on our systems).
+  size_t writeLength = 32 * 1024 * 1024;
   uint32_t timeout = 200;
   socket->setSendTimeout(timeout);
   scoped_array<char> buf(new char[writeLength]);
@@ -973,8 +1034,8 @@ TEST(AsyncSocketTest, WriteTimeout) {
   TimePoint end;
 
   // Make sure the write attempt timed out as requested
-  CHECK_EQ(wcb.state, STATE_FAILED);
-  CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
+  ASSERT_EQ(wcb.state, STATE_FAILED);
+  ASSERT_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
 
   // Check that the write timed out within a reasonable period of time.
   // We don't check for exactly the specified timeout, since AsyncSocket only
@@ -1014,10 +1075,10 @@ TEST(AsyncSocketTest, WritePipeError) {
 
   // accept and immediately close the socket
   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
-  acceptedSocket.reset();
+  acceptedSocket->close();
 
   // write() a large chunk of data
-  size_t writeLength = 8*1024*1024;
+  size_t writeLength = 32 * 1024 * 1024;
   scoped_array<char> buf(new char[writeLength]);
   memset(buf.get(), 'a', writeLength);
   WriteCallback wcb;
@@ -1028,14 +1089,133 @@ TEST(AsyncSocketTest, WritePipeError) {
   // Make sure the write failed.
   // It would be nice if AsyncSocketException could convey the errno value,
   // so that we could check for EPIPE
-  CHECK_EQ(wcb.state, STATE_FAILED);
-  CHECK_EQ(wcb.exception.getType(),
+  ASSERT_EQ(wcb.state, STATE_FAILED);
+  ASSERT_EQ(wcb.exception.getType(),
                     AsyncSocketException::INTERNAL_ERROR);
 
   ASSERT_FALSE(socket->isClosedBySelf());
   ASSERT_FALSE(socket->isClosedByPeer());
 }
 
+/**
+ * Test writing to a socket that has its read side closed
+ */
+TEST(AsyncSocketTest, WriteAfterReadEOF) {
+  TestServer server;
+
+  // connect()
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket =
+      AsyncSocket::newSocket(&evb, server.getAddress(), 30);
+  evb.loop(); // loop until the socket is connected
+
+  // Accept the connection
+  std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
+  ReadCallback rcb;
+  acceptedSocket->setReadCB(&rcb);
+
+  // Shutdown the write side of client socket (read side of server socket)
+  socket->shutdownWrite();
+  evb.loop();
+
+  // Check that accepted socket is still writable
+  ASSERT_FALSE(acceptedSocket->good());
+  ASSERT_TRUE(acceptedSocket->writable());
+
+  // Write data to accepted socket
+  constexpr size_t simpleBufLength = 5;
+  char simpleBuf[simpleBufLength];
+  memset(simpleBuf, 'a', simpleBufLength);
+  WriteCallback wcb;
+  acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
+  evb.loop();
+
+  // Make sure we were able to write even after getting a read EOF
+  ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+}
+
+/**
+ * Test that bytes written is correctly computed in case of write failure
+ */
+TEST(AsyncSocketTest, WriteErrorCallbackBytesWritten) {
+  // Send and receive buffer sizes for the sockets.
+  constexpr size_t kSockBufSize = 8 * 1024;
+
+  TestServer server(false, kSockBufSize);
+
+  AsyncSocket::OptionMap options{
+      {{SOL_SOCKET, SO_SNDBUF}, kSockBufSize},
+      {{SOL_SOCKET, SO_RCVBUF}, kSockBufSize},
+      {{IPPROTO_TCP, TCP_NODELAY}, 1},
+  };
+
+  // The current thread will be used by the receiver - use a separate thread
+  // for the sender.
+  EventBase senderEvb;
+  std::thread senderThread([&]() { senderEvb.loopForever(); });
+
+  ConnCallback ccb;
+  std::shared_ptr<AsyncSocket> socket;
+
+  senderEvb.runInEventBaseThreadAndWait([&]() {
+    socket = AsyncSocket::newSocket(&senderEvb);
+    socket->connect(&ccb, server.getAddress(), 30, options);
+  });
+
+  // accept the socket on the server side
+  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+
+  // Send a big (45KB) write so that it is partially written. The first write
+  // is 16KB (8KB on both sides) and subsequent writes are 8KB each. Reading
+  // just under 24KB would cause 3-4 writes for the total of 32-40KB in the
+  // following sequence: 16KB + 8KB + 8KB (+ 8KB). This ensures that not all
+  // bytes are written when the socket is reset. Having at least 3 writes
+  // ensures that the total size (45KB) would be exceeed in case of overcounting
+  // based on the initial write size of 16KB.
+  constexpr size_t kSendSize = 45 * 1024;
+  auto const sendBuf = std::vector<char>(kSendSize, 'a');
+
+  WriteCallback wcb;
+
+  senderEvb.runInEventBaseThreadAndWait(
+      [&]() { socket->write(&wcb, sendBuf.data(), kSendSize); });
+
+  // Reading 20KB would cause three additional writes of 8KB, but less
+  // than 45KB total, so the socket is reset before all bytes are written.
+  constexpr size_t kRecvSize = 20 * 1024;
+  uint8_t recvBuf[kRecvSize];
+  int bytesRead = acceptedSocket->readAll(recvBuf, sizeof(recvBuf));
+  ASSERT_EQ(kRecvSize, bytesRead);
+
+  constexpr size_t kMinExpectedBytesWritten = // 20 ACK + 8 send buf
+      kRecvSize + kSockBufSize;
+  static_assert(kMinExpectedBytesWritten == 28 * 1024, "bad math");
+  static_assert(kMinExpectedBytesWritten > kRecvSize, "bad math");
+
+  constexpr size_t kMaxExpectedBytesWritten = // 24 ACK + 8 sent + 8 send buf
+      constexpr_ceil(kRecvSize, kSockBufSize) + 2 * kSockBufSize;
+  static_assert(kMaxExpectedBytesWritten == 40 * 1024, "bad math");
+  static_assert(kMaxExpectedBytesWritten < kSendSize, "bad math");
+
+  // Need to delay after receiving 20KB and before closing the receive side so
+  // that the send side has a chance to fill the send buffer past.
+  using clock = std::chrono::steady_clock;
+  auto const deadline = clock::now() + std::chrono::seconds(2);
+  while (wcb.bytesWritten < kMinExpectedBytesWritten &&
+         clock::now() < deadline) {
+    std::this_thread::yield();
+  }
+  acceptedSocket->closeWithReset();
+
+  senderEvb.terminateLoopSoon();
+  senderThread.join();
+
+  ASSERT_EQ(STATE_FAILED, wcb.state);
+  ASSERT_LE(kMinExpectedBytesWritten, wcb.bytesWritten);
+  ASSERT_GE(kMaxExpectedBytesWritten, wcb.bytesWritten);
+}
+
 /**
  * Test writing a mix of simple buffers and IOBufs
  */
@@ -1053,8 +1233,15 @@ TEST(AsyncSocketTest, WriteIOBuf) {
   ReadCallback rcb;
   acceptedSocket->setReadCB(&rcb);
 
+  // Check if EOR tracking flag can be set and reset.
+  EXPECT_FALSE(socket->isEorTrackingEnabled());
+  socket->setEorTracking(true);
+  EXPECT_TRUE(socket->isEorTrackingEnabled());
+  socket->setEorTracking(false);
+  EXPECT_FALSE(socket->isEorTrackingEnabled());
+
   // Write a simple buffer to the socket
-  size_t simpleBufLength = 5;
+  constexpr size_t simpleBufLength = 5;
   char simpleBuf[simpleBufLength];
   memset(simpleBuf, 'a', simpleBufLength);
   WriteCallback wcb;
@@ -1088,21 +1275,21 @@ TEST(AsyncSocketTest, WriteIOBuf) {
   // Let the reads and writes run to completion
   evb.loop();
 
-  CHECK_EQ(wcb.state, STATE_SUCCEEDED);
-  CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
-  CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
 
   // Make sure the reader got the right data in the right order
-  CHECK_EQ(rcb.state, STATE_SUCCEEDED);
-  CHECK_EQ(rcb.buffers.size(), 1);
-  CHECK_EQ(rcb.buffers[0].length,
+  ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(rcb.buffers.size(), 1);
+  ASSERT_EQ(rcb.buffers[0].length,
       simpleBufLength + buf1Length + buf2Length + buf3Length);
-  CHECK_EQ(
+  ASSERT_EQ(
       memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
-  CHECK_EQ(
+  ASSERT_EQ(
       memcmp(rcb.buffers[0].buffer + simpleBufLength,
           buf1Copy->data(), buf1Copy->length()), 0);
-  CHECK_EQ(
+  ASSERT_EQ(
       memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
           buf2Copy->data(), buf2Copy->length()), 0);
 
@@ -1150,22 +1337,22 @@ TEST(AsyncSocketTest, WriteIOBufCorked) {
   write2.scheduleTimeout(100);
   WriteCallback wcb3;
   DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
-  write3.scheduleTimeout(200);
+  write3.scheduleTimeout(140);
 
   evb.loop();
-  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
-  CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
-  CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
   if (wcb3.state != STATE_SUCCEEDED) {
     throw(wcb3.exception);
   }
-  CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
 
   // Make sure the reader got the data with the right grouping
-  CHECK_EQ(rcb.state, STATE_SUCCEEDED);
-  CHECK_EQ(rcb.buffers.size(), 2);
-  CHECK_EQ(rcb.buffers[0].length, buf1Length);
-  CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
+  ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(rcb.buffers.size(), 2);
+  ASSERT_EQ(rcb.buffers[0].length, buf1Length);
+  ASSERT_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
 
   acceptedSocket->close();
   socket->close();
@@ -1208,10 +1395,10 @@ TEST(AsyncSocketTest, ZeroLengthWrite) {
 
   evb.loop(); // loop until the data is sent
 
-  CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
-  CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
-  CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
-  CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb4.state, STATE_SUCCEEDED);
   rcb.verifyData(buf.get(), len1 + len2);
 
   ASSERT_TRUE(socket->isClosedBySelf());
@@ -1238,7 +1425,7 @@ TEST(AsyncSocketTest, ZeroLengthWritev) {
   memset(buf.get(), 'b', len2);
 
   WriteCallback wcb;
-  size_t iovCount = 4;
+  constexpr size_t iovCount = 4;
   struct iovec iov[iovCount];
   iov[0].iov_base = buf.get();
   iov[0].iov_len = len1;
@@ -1253,7 +1440,7 @@ TEST(AsyncSocketTest, ZeroLengthWritev) {
   socket->close();
   evb.loop(); // loop until the data is sent
 
-  CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
   rcb.verifyData(buf.get(), len1 + len2);
 
   ASSERT_TRUE(socket->isClosedBySelf());
@@ -1283,7 +1470,7 @@ TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
   evb.loop();
 
   // Make sure we are connected
-  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
 
   // Schedule pending writes, until several write attempts have blocked
   char buf[128];
@@ -1314,7 +1501,7 @@ TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
   for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
        it != writeCallbacks.end();
        ++it) {
-    CHECK_EQ((*it)->state, STATE_FAILED);
+    ASSERT_EQ((*it)->state, STATE_FAILED);
   }
 
   ASSERT_TRUE(socket->isClosedBySelf());
@@ -1375,9 +1562,9 @@ TEST(AsyncSocket, ConnectReadImmediateRead) {
   WriteCallback wcb1;
   socket.write(&wcb1, expectedData, expectedDataSz);
   evb.loop();
-  CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
   rcb.verifyData(expectedData, expectedDataSz);
-  CHECK_EQ(socket.immediateReadCalled, true);
+  ASSERT_EQ(socket.immediateReadCalled, true);
 
   ASSERT_FALSE(socket.isClosedBySelf());
   ASSERT_FALSE(socket.isClosedByPeer());
@@ -1426,12 +1613,12 @@ TEST(AsyncSocket, ConnectReadUninstallRead) {
   WriteCallback wcb;
   socket.write(&wcb, expectedData, expectedDataSz);
   evb.loop();
-  CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
 
   /* we shoud've only read maxBufferSz data since readCallback_
    * was reset in dataAvailableCallback */
-  CHECK_EQ(rcb.dataRead(), maxBufferSz);
-  CHECK_EQ(socket.immediateReadCalled, false);
+  ASSERT_EQ(rcb.dataRead(), maxBufferSz);
+  ASSERT_EQ(socket.immediateReadCalled, false);
 
   ASSERT_FALSE(socket.isClosedBySelf());
   ASSERT_FALSE(socket.isClosedByPeer());
@@ -1453,214 +1640,6 @@ TEST(AsyncSocket, ConnectReadUninstallRead) {
 ///////////////////////////////////////////////////////////////////////////
 // AsyncServerSocket tests
 ///////////////////////////////////////////////////////////////////////////
-namespace {
-/**
- * Helper ConnectionEventCallback class for the test code.
- * It maintains counters protected by a spin lock.
- */
-class TestConnectionEventCallback :
-  public AsyncServerSocket::ConnectionEventCallback {
- public:
-  virtual void onConnectionAccepted(
-      const int socket,
-      const SocketAddress& addr) noexcept override {
-    folly::RWSpinLock::WriteHolder holder(spinLock_);
-    connectionAccepted_++;
-  }
-
-  virtual void onConnectionAcceptError(const int err) noexcept override {
-    folly::RWSpinLock::WriteHolder holder(spinLock_);
-    connectionAcceptedError_++;
-  }
-
-  virtual void onConnectionDropped(
-      const int socket,
-      const SocketAddress& addr) noexcept override {
-    folly::RWSpinLock::WriteHolder holder(spinLock_);
-    connectionDropped_++;
-  }
-
-  virtual void onConnectionEnqueuedForAcceptorCallback(
-      const int socket,
-      const SocketAddress& addr) noexcept override {
-    folly::RWSpinLock::WriteHolder holder(spinLock_);
-    connectionEnqueuedForAcceptCallback_++;
-  }
-
-  virtual void onConnectionDequeuedByAcceptorCallback(
-      const int socket,
-      const SocketAddress& addr) noexcept override {
-    folly::RWSpinLock::WriteHolder holder(spinLock_);
-    connectionDequeuedByAcceptCallback_++;
-  }
-
-  virtual void onBackoffStarted() noexcept override {
-    folly::RWSpinLock::WriteHolder holder(spinLock_);
-    backoffStarted_++;
-  }
-
-  virtual void onBackoffEnded() noexcept override {
-    folly::RWSpinLock::WriteHolder holder(spinLock_);
-    backoffEnded_++;
-  }
-
-  virtual void onBackoffError() noexcept override {
-    folly::RWSpinLock::WriteHolder holder(spinLock_);
-    backoffError_++;
-  }
-
-  unsigned int getConnectionAccepted() const {
-    folly::RWSpinLock::ReadHolder holder(spinLock_);
-    return connectionAccepted_;
-  }
-
-  unsigned int getConnectionAcceptedError() const {
-    folly::RWSpinLock::ReadHolder holder(spinLock_);
-    return connectionAcceptedError_;
-  }
-
-  unsigned int getConnectionDropped() const {
-    folly::RWSpinLock::ReadHolder holder(spinLock_);
-    return connectionDropped_;
-  }
-
-  unsigned int getConnectionEnqueuedForAcceptCallback() const {
-    folly::RWSpinLock::ReadHolder holder(spinLock_);
-    return connectionEnqueuedForAcceptCallback_;
-  }
-
-  unsigned int getConnectionDequeuedByAcceptCallback() const {
-    folly::RWSpinLock::ReadHolder holder(spinLock_);
-    return connectionDequeuedByAcceptCallback_;
-  }
-
-  unsigned int getBackoffStarted() const {
-    folly::RWSpinLock::ReadHolder holder(spinLock_);
-    return backoffStarted_;
-  }
-
-  unsigned int getBackoffEnded() const {
-    folly::RWSpinLock::ReadHolder holder(spinLock_);
-    return backoffEnded_;
-  }
-
-  unsigned int getBackoffError() const {
-    folly::RWSpinLock::ReadHolder holder(spinLock_);
-    return backoffError_;
-  }
-
- private:
-  mutable folly::RWSpinLock spinLock_;
-  unsigned int connectionAccepted_{0};
-  unsigned int connectionAcceptedError_{0};
-  unsigned int connectionDropped_{0};
-  unsigned int connectionEnqueuedForAcceptCallback_{0};
-  unsigned int connectionDequeuedByAcceptCallback_{0};
-  unsigned int backoffStarted_{0};
-  unsigned int backoffEnded_{0};
-  unsigned int backoffError_{0};
-};
-
-/**
- * Helper AcceptCallback class for the test code
- * It records the callbacks that were invoked, and also supports calling
- * generic std::function objects in each callback.
- */
-class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
- public:
-  enum EventType {
-    TYPE_START,
-    TYPE_ACCEPT,
-    TYPE_ERROR,
-    TYPE_STOP
-  };
-  struct EventInfo {
-    EventInfo(int fd, const folly::SocketAddress& addr)
-      : type(TYPE_ACCEPT),
-        fd(fd),
-        address(addr),
-        errorMsg() {}
-    explicit EventInfo(const std::string& msg)
-      : type(TYPE_ERROR),
-        fd(-1),
-        address(),
-        errorMsg(msg) {}
-    explicit EventInfo(EventType et)
-      : type(et),
-        fd(-1),
-        address(),
-        errorMsg() {}
-
-    EventType type;
-    int fd;  // valid for TYPE_ACCEPT
-    folly::SocketAddress address;  // valid for TYPE_ACCEPT
-    string errorMsg;  // valid for TYPE_ERROR
-  };
-  typedef std::deque<EventInfo> EventList;
-
-  TestAcceptCallback()
-    : connectionAcceptedFn_(),
-      acceptErrorFn_(),
-      acceptStoppedFn_(),
-      events_() {}
-
-  std::deque<EventInfo>* getEvents() {
-    return &events_;
-  }
-
-  void setConnectionAcceptedFn(
-      const std::function<void(int, const folly::SocketAddress&)>& fn) {
-    connectionAcceptedFn_ = fn;
-  }
-  void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
-    acceptErrorFn_ = fn;
-  }
-  void setAcceptStartedFn(const std::function<void()>& fn) {
-    acceptStartedFn_ = fn;
-  }
-  void setAcceptStoppedFn(const std::function<void()>& fn) {
-    acceptStoppedFn_ = fn;
-  }
-
-  void connectionAccepted(
-      int fd, const folly::SocketAddress& clientAddr) noexcept override {
-    events_.emplace_back(fd, clientAddr);
-
-    if (connectionAcceptedFn_) {
-      connectionAcceptedFn_(fd, clientAddr);
-    }
-  }
-  void acceptError(const std::exception& ex) noexcept override {
-    events_.emplace_back(ex.what());
-
-    if (acceptErrorFn_) {
-      acceptErrorFn_(ex);
-    }
-  }
-  void acceptStarted() noexcept override {
-    events_.emplace_back(TYPE_START);
-
-    if (acceptStartedFn_) {
-      acceptStartedFn_();
-    }
-  }
-  void acceptStopped() noexcept override {
-    events_.emplace_back(TYPE_STOP);
-
-    if (acceptStoppedFn_) {
-      acceptStoppedFn_();
-    }
-  }
-
- private:
-  std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
-  std::function<void(const std::exception&)> acceptErrorFn_;
-  std::function<void()> acceptStartedFn_;
-  std::function<void()> acceptStoppedFn_;
-
-  std::deque<EventInfo> events_;
-};
-}
 
 /**
  * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
@@ -1679,13 +1658,13 @@ TEST(AsyncSocketTest, ServerAcceptOptions) {
   // Add a callback to accept one connection then stop the loop
   TestAcceptCallback acceptCallback;
   acceptCallback.setConnectionAcceptedFn(
-    [&](int fd, const folly::SocketAddress& addr) {
-      serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
-    });
-  acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
-    serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
+      });
+  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
+    serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
   });
-  serverSocket->addAcceptCallback(&acceptCallback, nullptr);
+  serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
   serverSocket->startAccepting();
 
   // Connect to the server socket
@@ -1695,26 +1674,26 @@ TEST(AsyncSocketTest, ServerAcceptOptions) {
   eventBase.loop();
 
   // Verify that the server accepted a connection
-  CHECK_EQ(acceptCallback.getEvents()->size(), 3);
-  CHECK_EQ(acceptCallback.getEvents()->at(0).type,
+  ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
+  ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
                     TestAcceptCallback::TYPE_START);
-  CHECK_EQ(acceptCallback.getEvents()->at(1).type,
+  ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
                     TestAcceptCallback::TYPE_ACCEPT);
-  CHECK_EQ(acceptCallback.getEvents()->at(2).type,
+  ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
                     TestAcceptCallback::TYPE_STOP);
   int fd = acceptCallback.getEvents()->at(1).fd;
 
   // The accepted connection should already be in non-blocking mode
   int flags = fcntl(fd, F_GETFL, 0);
-  CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
+  ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
 
 #ifndef TCP_NOPUSH
   // The accepted connection should already have TCP_NODELAY set
   int value;
   socklen_t valueLength = sizeof(value);
   int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
-  CHECK_EQ(rc, 0);
-  CHECK_EQ(value, 1);
+  ASSERT_EQ(rc, 0);
+  ASSERT_EQ(value, 1);
 #endif
 }
 
@@ -1746,61 +1725,64 @@ TEST(AsyncSocketTest, RemoveAcceptCallback) {
   // Have callback 2 remove callback 3 and callback 5 the first time it is
   // called.
   int cb2Count = 0;
-  cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
-      std::shared_ptr<AsyncSocket> sock2(
+  cb1.setConnectionAcceptedFn([&](int /* fd */,
+                                  const folly::SocketAddress& /* addr */) {
+    std::shared_ptr<AsyncSocket> sock2(
         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
+  });
+  cb3.setConnectionAcceptedFn(
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
+  cb4.setConnectionAcceptedFn(
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        std::shared_ptr<AsyncSocket> sock3(
+            AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
+      });
+  cb5.setConnectionAcceptedFn(
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        std::shared_ptr<AsyncSocket> sock5(
+            AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
+
       });
-  cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
-    });
-  cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
-      std::shared_ptr<AsyncSocket> sock3(
-        AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
-    });
-  cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
-  std::shared_ptr<AsyncSocket> sock5(
-      AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
-
-    });
   cb2.setConnectionAcceptedFn(
-    [&](int fd, const folly::SocketAddress& addr) {
-      if (cb2Count == 0) {
-        serverSocket->removeAcceptCallback(&cb3, nullptr);
-        serverSocket->removeAcceptCallback(&cb5, nullptr);
-      }
-      ++cb2Count;
-    });
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        if (cb2Count == 0) {
+          serverSocket->removeAcceptCallback(&cb3, nullptr);
+          serverSocket->removeAcceptCallback(&cb5, nullptr);
+        }
+        ++cb2Count;
+      });
   // Have callback 6 remove callback 4 the first time it is called,
   // and destroy the server socket the second time it is called
   int cb6Count = 0;
   cb6.setConnectionAcceptedFn(
-    [&](int fd, const folly::SocketAddress& addr) {
-      if (cb6Count == 0) {
-        serverSocket->removeAcceptCallback(&cb4, nullptr);
-        std::shared_ptr<AsyncSocket> sock6(
-          AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
-        std::shared_ptr<AsyncSocket> sock7(
-          AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
-        std::shared_ptr<AsyncSocket> sock8(
-          AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
-
-      } else {
-        serverSocket.reset();
-      }
-      ++cb6Count;
-    });
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        if (cb6Count == 0) {
+          serverSocket->removeAcceptCallback(&cb4, nullptr);
+          std::shared_ptr<AsyncSocket> sock6(
+              AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
+          std::shared_ptr<AsyncSocket> sock7(
+              AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
+          std::shared_ptr<AsyncSocket> sock8(
+              AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
+
+        } else {
+          serverSocket.reset();
+        }
+        ++cb6Count;
+      });
   // Have callback 7 remove itself
   cb7.setConnectionAcceptedFn(
-    [&](int fd, const folly::SocketAddress& addr) {
-      serverSocket->removeAcceptCallback(&cb7, nullptr);
-    });
-
-  serverSocket->addAcceptCallback(&cb1, nullptr);
-  serverSocket->addAcceptCallback(&cb2, nullptr);
-  serverSocket->addAcceptCallback(&cb3, nullptr);
-  serverSocket->addAcceptCallback(&cb4, nullptr);
-  serverSocket->addAcceptCallback(&cb5, nullptr);
-  serverSocket->addAcceptCallback(&cb6, nullptr);
-  serverSocket->addAcceptCallback(&cb7, nullptr);
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        serverSocket->removeAcceptCallback(&cb7, nullptr);
+      });
+
+  serverSocket->addAcceptCallback(&cb1, &eventBase);
+  serverSocket->addAcceptCallback(&cb2, &eventBase);
+  serverSocket->addAcceptCallback(&cb3, &eventBase);
+  serverSocket->addAcceptCallback(&cb4, &eventBase);
+  serverSocket->addAcceptCallback(&cb5, &eventBase);
+  serverSocket->addAcceptCallback(&cb6, &eventBase);
+  serverSocket->addAcceptCallback(&cb7, &eventBase);
   serverSocket->startAccepting();
 
   // Make several connections to the socket
@@ -1821,62 +1803,62 @@ TEST(AsyncSocketTest, RemoveAcceptCallback) {
   // exactly round robin in the future, we can simplify the test checks here.
   // (We'll also need to update the termination code, since we expect cb6 to
   // get called twice to terminate the loop.)
-  CHECK_EQ(cb1.getEvents()->size(), 4);
-  CHECK_EQ(cb1.getEvents()->at(0).type,
+  ASSERT_EQ(cb1.getEvents()->size(), 4);
+  ASSERT_EQ(cb1.getEvents()->at(0).type,
                     TestAcceptCallback::TYPE_START);
-  CHECK_EQ(cb1.getEvents()->at(1).type,
+  ASSERT_EQ(cb1.getEvents()->at(1).type,
                     TestAcceptCallback::TYPE_ACCEPT);
-  CHECK_EQ(cb1.getEvents()->at(2).type,
+  ASSERT_EQ(cb1.getEvents()->at(2).type,
                     TestAcceptCallback::TYPE_ACCEPT);
-  CHECK_EQ(cb1.getEvents()->at(3).type,
+  ASSERT_EQ(cb1.getEvents()->at(3).type,
                     TestAcceptCallback::TYPE_STOP);
 
-  CHECK_EQ(cb2.getEvents()->size(), 4);
-  CHECK_EQ(cb2.getEvents()->at(0).type,
+  ASSERT_EQ(cb2.getEvents()->size(), 4);
+  ASSERT_EQ(cb2.getEvents()->at(0).type,
                     TestAcceptCallback::TYPE_START);
-  CHECK_EQ(cb2.getEvents()->at(1).type,
+  ASSERT_EQ(cb2.getEvents()->at(1).type,
                     TestAcceptCallback::TYPE_ACCEPT);
-  CHECK_EQ(cb2.getEvents()->at(2).type,
+  ASSERT_EQ(cb2.getEvents()->at(2).type,
                     TestAcceptCallback::TYPE_ACCEPT);
-  CHECK_EQ(cb2.getEvents()->at(3).type,
+  ASSERT_EQ(cb2.getEvents()->at(3).type,
                     TestAcceptCallback::TYPE_STOP);
 
-  CHECK_EQ(cb3.getEvents()->size(), 2);
-  CHECK_EQ(cb3.getEvents()->at(0).type,
+  ASSERT_EQ(cb3.getEvents()->size(), 2);
+  ASSERT_EQ(cb3.getEvents()->at(0).type,
                     TestAcceptCallback::TYPE_START);
-  CHECK_EQ(cb3.getEvents()->at(1).type,
+  ASSERT_EQ(cb3.getEvents()->at(1).type,
                     TestAcceptCallback::TYPE_STOP);
 
-  CHECK_EQ(cb4.getEvents()->size(), 3);
-  CHECK_EQ(cb4.getEvents()->at(0).type,
+  ASSERT_EQ(cb4.getEvents()->size(), 3);
+  ASSERT_EQ(cb4.getEvents()->at(0).type,
                     TestAcceptCallback::TYPE_START);
-  CHECK_EQ(cb4.getEvents()->at(1).type,
+  ASSERT_EQ(cb4.getEvents()->at(1).type,
                     TestAcceptCallback::TYPE_ACCEPT);
-  CHECK_EQ(cb4.getEvents()->at(2).type,
+  ASSERT_EQ(cb4.getEvents()->at(2).type,
                     TestAcceptCallback::TYPE_STOP);
 
-  CHECK_EQ(cb5.getEvents()->size(), 2);
-  CHECK_EQ(cb5.getEvents()->at(0).type,
+  ASSERT_EQ(cb5.getEvents()->size(), 2);
+  ASSERT_EQ(cb5.getEvents()->at(0).type,
                     TestAcceptCallback::TYPE_START);
-  CHECK_EQ(cb5.getEvents()->at(1).type,
+  ASSERT_EQ(cb5.getEvents()->at(1).type,
                     TestAcceptCallback::TYPE_STOP);
 
-  CHECK_EQ(cb6.getEvents()->size(), 4);
-  CHECK_EQ(cb6.getEvents()->at(0).type,
+  ASSERT_EQ(cb6.getEvents()->size(), 4);
+  ASSERT_EQ(cb6.getEvents()->at(0).type,
                     TestAcceptCallback::TYPE_START);
-  CHECK_EQ(cb6.getEvents()->at(1).type,
+  ASSERT_EQ(cb6.getEvents()->at(1).type,
                     TestAcceptCallback::TYPE_ACCEPT);
-  CHECK_EQ(cb6.getEvents()->at(2).type,
+  ASSERT_EQ(cb6.getEvents()->at(2).type,
                     TestAcceptCallback::TYPE_ACCEPT);
-  CHECK_EQ(cb6.getEvents()->at(3).type,
+  ASSERT_EQ(cb6.getEvents()->at(3).type,
                     TestAcceptCallback::TYPE_STOP);
 
-  CHECK_EQ(cb7.getEvents()->size(), 3);
-  CHECK_EQ(cb7.getEvents()->at(0).type,
+  ASSERT_EQ(cb7.getEvents()->size(), 3);
+  ASSERT_EQ(cb7.getEvents()->at(0).type,
                     TestAcceptCallback::TYPE_START);
-  CHECK_EQ(cb7.getEvents()->at(1).type,
+  ASSERT_EQ(cb7.getEvents()->at(1).type,
                     TestAcceptCallback::TYPE_ACCEPT);
-  CHECK_EQ(cb7.getEvents()->at(2).type,
+  ASSERT_EQ(cb7.getEvents()->at(2).type,
                     TestAcceptCallback::TYPE_STOP);
 }
 
@@ -1895,21 +1877,22 @@ TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
 
   // Add several accept callbacks
   TestAcceptCallback cb1;
-  auto thread_id = pthread_self();
+  auto thread_id = std::this_thread::get_id();
   cb1.setAcceptStartedFn([&](){
-    CHECK_NE(thread_id, pthread_self());
-    thread_id = pthread_self();
-  });
-  cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
-    CHECK_EQ(thread_id, pthread_self());
-    serverSocket->removeAcceptCallback(&cb1, nullptr);
+    CHECK_NE(thread_id, std::this_thread::get_id());
+    thread_id = std::this_thread::get_id();
   });
+  cb1.setConnectionAcceptedFn(
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        ASSERT_EQ(thread_id, std::this_thread::get_id());
+        serverSocket->removeAcceptCallback(&cb1, &eventBase);
+      });
   cb1.setAcceptStoppedFn([&](){
-    CHECK_EQ(thread_id, pthread_self());
+    ASSERT_EQ(thread_id, std::this_thread::get_id());
   });
 
   // Test having callbacks remove other callbacks before them on the list,
-  serverSocket->addAcceptCallback(&cb1, nullptr);
+  serverSocket->addAcceptCallback(&cb1, &eventBase);
   serverSocket->startAccepting();
 
   // Make several connections to the socket
@@ -1931,31 +1914,33 @@ TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
   // exactly round robin in the future, we can simplify the test checks here.
   // (We'll also need to update the termination code, since we expect cb6 to
   // get called twice to terminate the loop.)
-  CHECK_EQ(cb1.getEvents()->size(), 3);
-  CHECK_EQ(cb1.getEvents()->at(0).type,
+  ASSERT_EQ(cb1.getEvents()->size(), 3);
+  ASSERT_EQ(cb1.getEvents()->at(0).type,
                     TestAcceptCallback::TYPE_START);
-  CHECK_EQ(cb1.getEvents()->at(1).type,
+  ASSERT_EQ(cb1.getEvents()->at(1).type,
                     TestAcceptCallback::TYPE_ACCEPT);
-  CHECK_EQ(cb1.getEvents()->at(2).type,
+  ASSERT_EQ(cb1.getEvents()->at(2).type,
                     TestAcceptCallback::TYPE_STOP);
 
 }
 
 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
+  EventBase* eventBase = serverSocket->getEventBase();
+  CHECK(eventBase);
+
   // Add a callback to accept one connection then stop accepting
   TestAcceptCallback acceptCallback;
   acceptCallback.setConnectionAcceptedFn(
-    [&](int fd, const folly::SocketAddress& addr) {
-      serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
-    });
-  acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
-    serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
+      });
+  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
+    serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
   });
-  serverSocket->addAcceptCallback(&acceptCallback, nullptr);
+  serverSocket->addAcceptCallback(&acceptCallback, eventBase);
   serverSocket->startAccepting();
 
   // Connect to the server socket
-  EventBase* eventBase = serverSocket->getEventBase();
   folly::SocketAddress serverAddress;
   serverSocket->getAddress(&serverAddress);
   AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
@@ -1964,12 +1949,12 @@ void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
   eventBase->loop();
 
   // Verify that the server accepted a connection
-  CHECK_EQ(acceptCallback.getEvents()->size(), 3);
-  CHECK_EQ(acceptCallback.getEvents()->at(0).type,
+  ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
+  ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
                     TestAcceptCallback::TYPE_START);
-  CHECK_EQ(acceptCallback.getEvents()->at(1).type,
+  ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
                     TestAcceptCallback::TYPE_ACCEPT);
-  CHECK_EQ(acceptCallback.getEvents()->at(2).type,
+  ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
                     TestAcceptCallback::TYPE_STOP);
 }
 
@@ -2009,9 +1994,11 @@ TEST(AsyncSocketTest, DestroyCloseTest) {
   acceptedSocket.reset();
 
   // Test that server socket was closed
-  ssize_t sz = read(fd, simpleBuf, simpleBufLength);
-  CHECK_EQ(sz, -1);
-  CHECK_EQ(errno, 9);
+  folly::test::msvcSuppressAbortOnInvalidParams([&] {
+    ssize_t sz = read(fd, simpleBuf, simpleBufLength);
+    ASSERT_EQ(sz, -1);
+    ASSERT_EQ(errno, EBADF);
+  });
   delete[] simpleBuf;
 }
 
@@ -2024,7 +2011,7 @@ TEST(AsyncSocketTest, ServerExistingSocket) {
   // Test creating a socket, and letting AsyncServerSocket bind and listen
   {
     // Manually create a socket
-    int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+    int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
     ASSERT_GE(fd, 0);
 
     // Create a server socket
@@ -2045,14 +2032,14 @@ TEST(AsyncSocketTest, ServerExistingSocket) {
   // then letting AsyncServerSocket listen
   {
     // Manually create a socket
-    int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+    int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
     ASSERT_GE(fd, 0);
     // bind
     struct sockaddr_in addr;
     addr.sin_family = AF_INET;
     addr.sin_port = 0;
     addr.sin_addr.s_addr = INADDR_ANY;
-    CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
+    ASSERT_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
                              sizeof(addr)), 0);
     // Look up the address that we bound to
     folly::SocketAddress boundAddress;
@@ -2067,7 +2054,7 @@ TEST(AsyncSocketTest, ServerExistingSocket) {
     // Make sure AsyncServerSocket reports the same address that we bound to
     folly::SocketAddress serverSocketAddress;
     serverSocket->getAddress(&serverSocketAddress);
-    CHECK_EQ(boundAddress, serverSocketAddress);
+    ASSERT_EQ(boundAddress, serverSocketAddress);
 
     // Make sure the socket works
     serverSocketSanityTest(serverSocket.get());
@@ -2077,20 +2064,20 @@ TEST(AsyncSocketTest, ServerExistingSocket) {
   // then giving it to AsyncServerSocket
   {
     // Manually create a socket
-    int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+    int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
     ASSERT_GE(fd, 0);
     // bind
     struct sockaddr_in addr;
     addr.sin_family = AF_INET;
     addr.sin_port = 0;
     addr.sin_addr.s_addr = INADDR_ANY;
-    CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
+    ASSERT_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
                              sizeof(addr)), 0);
     // Look up the address that we bound to
     folly::SocketAddress boundAddress;
     boundAddress.setFromLocalAddress(fd);
     // listen
-    CHECK_EQ(listen(fd, 16), 0);
+    ASSERT_EQ(listen(fd, 16), 0);
 
     // Create a server socket
     AsyncServerSocket::UniquePtr serverSocket(
@@ -2100,7 +2087,7 @@ TEST(AsyncSocketTest, ServerExistingSocket) {
     // Make sure AsyncServerSocket reports the same address that we bound to
     folly::SocketAddress serverSocketAddress;
     serverSocket->getAddress(&serverSocketAddress);
-    CHECK_EQ(boundAddress, serverSocketAddress);
+    ASSERT_EQ(boundAddress, serverSocketAddress);
 
     // Make sure the socket works
     serverSocketSanityTest(serverSocket.get());
@@ -2114,7 +2101,7 @@ TEST(AsyncSocketTest, UnixDomainSocketTest) {
   std::shared_ptr<AsyncServerSocket> serverSocket(
       AsyncServerSocket::newSocket(&eventBase));
   string path(1, 0);
-  path.append("/anonymous");
+  path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
   folly::SocketAddress serverAddress;
   serverAddress.setFromPath(path);
   serverSocket->bind(serverAddress);
@@ -2123,13 +2110,13 @@ TEST(AsyncSocketTest, UnixDomainSocketTest) {
   // Add a callback to accept one connection then stop the loop
   TestAcceptCallback acceptCallback;
   acceptCallback.setConnectionAcceptedFn(
-    [&](int fd, const folly::SocketAddress& addr) {
-      serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
-    });
-  acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
-    serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
+      });
+  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
+    serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
   });
-  serverSocket->addAcceptCallback(&acceptCallback, nullptr);
+  serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
   serverSocket->startAccepting();
 
   // Connect to the server socket
@@ -2139,18 +2126,18 @@ TEST(AsyncSocketTest, UnixDomainSocketTest) {
   eventBase.loop();
 
   // Verify that the server accepted a connection
-  CHECK_EQ(acceptCallback.getEvents()->size(), 3);
-  CHECK_EQ(acceptCallback.getEvents()->at(0).type,
+  ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
+  ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
                     TestAcceptCallback::TYPE_START);
-  CHECK_EQ(acceptCallback.getEvents()->at(1).type,
+  ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
                     TestAcceptCallback::TYPE_ACCEPT);
-  CHECK_EQ(acceptCallback.getEvents()->at(2).type,
+  ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
                     TestAcceptCallback::TYPE_STOP);
   int fd = acceptCallback.getEvents()->at(1).fd;
 
   // The accepted connection should already be in non-blocking mode
   int flags = fcntl(fd, F_GETFL, 0);
-  CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
+  ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
 }
 
 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
@@ -2169,13 +2156,13 @@ TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
   // Add a callback to accept one connection then stop the loop
   TestAcceptCallback acceptCallback;
   acceptCallback.setConnectionAcceptedFn(
-    [&](int fd, const folly::SocketAddress& addr) {
-      serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
-    });
-  acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+      });
+  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
   });
-  serverSocket->addAcceptCallback(&acceptCallback, nullptr);
+  serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
   serverSocket->startAccepting();
 
   // Connect to the server socket
@@ -2196,6 +2183,61 @@ TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
   ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
 }
 
+TEST(AsyncSocketTest, CallbackInPrimaryEventBase) {
+  EventBase eventBase;
+  TestConnectionEventCallback connectionEventCallback;
+
+  // Create a server socket
+  std::shared_ptr<AsyncServerSocket> serverSocket(
+      AsyncServerSocket::newSocket(&eventBase));
+  serverSocket->setConnectionEventCallback(&connectionEventCallback);
+  serverSocket->bind(0);
+  serverSocket->listen(16);
+  folly::SocketAddress serverAddress;
+  serverSocket->getAddress(&serverAddress);
+
+  // Add a callback to accept one connection then stop the loop
+  TestAcceptCallback acceptCallback;
+  acceptCallback.setConnectionAcceptedFn(
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+      });
+  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
+    serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+  });
+  bool acceptStartedFlag{false};
+  acceptCallback.setAcceptStartedFn([&acceptStartedFlag](){
+    acceptStartedFlag = true;
+  });
+  bool acceptStoppedFlag{false};
+  acceptCallback.setAcceptStoppedFn([&acceptStoppedFlag](){
+    acceptStoppedFlag = true;
+  });
+  serverSocket->addAcceptCallback(&acceptCallback, nullptr);
+  serverSocket->startAccepting();
+
+  // Connect to the server socket
+  std::shared_ptr<AsyncSocket> socket(
+      AsyncSocket::newSocket(&eventBase, serverAddress));
+
+  eventBase.loop();
+
+  ASSERT_TRUE(acceptStartedFlag);
+  ASSERT_TRUE(acceptStoppedFlag);
+  // Validate the connection event counters
+  ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
+  ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
+  ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
+  ASSERT_EQ(
+      connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 0);
+  ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 0);
+  ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
+  ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
+  ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
+}
+
+
+
 /**
  * Test AsyncServerSocket::getNumPendingMessagesInQueue()
  */
@@ -2215,16 +2257,16 @@ TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
   // Add a callback to accept connections
   TestAcceptCallback acceptCallback;
   acceptCallback.setConnectionAcceptedFn(
-      [&](int fd, const folly::SocketAddress& addr) {
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
         count++;
-        CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
+        ASSERT_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
 
         if (count == 4) {
           // all messages are processed, remove accept callback
           serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
         }
       });
-  acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
+  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
     serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
   });
   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
@@ -2239,6 +2281,9 @@ TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
   eventBase.loop();
 }
 
+/**
+ * Test AsyncTransport::BufferCallback
+ */
 TEST(AsyncSocketTest, BufferTest) {
   TestServer server;
 
@@ -2248,18 +2293,19 @@ TEST(AsyncSocketTest, BufferTest) {
   ConnCallback ccb;
   socket->connect(&ccb, server.getAddress(), 30, option);
 
-
   char buf[100 * 1024];
   memset(buf, 'c', sizeof(buf));
   WriteCallback wcb;
   BufferCallback bcb;
-  socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE, &bcb);
+  socket->setBufferCallback(&bcb);
+  socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
 
   evb.loop();
-  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
-  CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
 
   ASSERT_TRUE(bcb.hasBuffered());
+  ASSERT_TRUE(bcb.hasBufferCleared());
 
   socket->close();
   server.verifyConnection(buf, sizeof(buf));
@@ -2267,3 +2313,1074 @@ TEST(AsyncSocketTest, BufferTest) {
   ASSERT_TRUE(socket->isClosedBySelf());
   ASSERT_FALSE(socket->isClosedByPeer());
 }
+
+TEST(AsyncSocketTest, BufferCallbackKill) {
+  TestServer server;
+  EventBase evb;
+  AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30, option);
+  evb.loopOnce();
+
+  char buf[100 * 1024];
+  memset(buf, 'c', sizeof(buf));
+  BufferCallback bcb;
+  socket->setBufferCallback(&bcb);
+  WriteCallback wcb;
+  wcb.successCallback = [&] {
+    ASSERT_TRUE(socket.unique());
+    socket.reset();
+  };
+
+  // This will trigger AsyncSocket::handleWrite,
+  // which calls WriteCallback::writeSuccess,
+  // which calls wcb.successCallback above,
+  // which tries to delete socket
+  // Then, the socket will also try to use this BufferCallback
+  // And that should crash us, if there is no DestructorGuard on the stack
+  socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
+
+  evb.loop();
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+}
+
+#if FOLLY_ALLOW_TFO
+TEST(AsyncSocketTest, ConnectTFO) {
+  // Start listening on a local port
+  TestServer server(true);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->enableTFO();
+  ConnCallback cb;
+  socket->connect(&cb, server.getAddress(), 30);
+
+  std::array<uint8_t, 128> buf;
+  memset(buf.data(), 'a', buf.size());
+
+  std::array<uint8_t, 3> readBuf;
+  auto sendBuf = IOBuf::copyBuffer("hey");
+
+  std::thread t([&] {
+    auto acceptedSocket = server.accept();
+    acceptedSocket->write(buf.data(), buf.size());
+    acceptedSocket->flush();
+    acceptedSocket->readAll(readBuf.data(), readBuf.size());
+    acceptedSocket->close();
+  });
+
+  evb.loop();
+
+  ASSERT_EQ(cb.state, STATE_SUCCEEDED);
+  EXPECT_LE(0, socket->getConnectTime().count());
+  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+  EXPECT_TRUE(socket->getTFOAttempted());
+
+  // Should trigger the connect
+  WriteCallback write;
+  ReadCallback rcb;
+  socket->writeChain(&write, sendBuf->clone());
+  socket->setReadCB(&rcb);
+  evb.loop();
+
+  t.join();
+
+  EXPECT_EQ(STATE_SUCCEEDED, write.state);
+  EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+  EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+  ASSERT_EQ(1, rcb.buffers.size());
+  ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
+  EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+  EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
+}
+
+TEST(AsyncSocketTest, ConnectTFOSupplyEarlyReadCB) {
+  // Start listening on a local port
+  TestServer server(true);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->enableTFO();
+  ConnCallback cb;
+  socket->connect(&cb, server.getAddress(), 30);
+  ReadCallback rcb;
+  socket->setReadCB(&rcb);
+
+  std::array<uint8_t, 128> buf;
+  memset(buf.data(), 'a', buf.size());
+
+  std::array<uint8_t, 3> readBuf;
+  auto sendBuf = IOBuf::copyBuffer("hey");
+
+  std::thread t([&] {
+    auto acceptedSocket = server.accept();
+    acceptedSocket->write(buf.data(), buf.size());
+    acceptedSocket->flush();
+    acceptedSocket->readAll(readBuf.data(), readBuf.size());
+    acceptedSocket->close();
+  });
+
+  evb.loop();
+
+  ASSERT_EQ(cb.state, STATE_SUCCEEDED);
+  EXPECT_LE(0, socket->getConnectTime().count());
+  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+  EXPECT_TRUE(socket->getTFOAttempted());
+
+  // Should trigger the connect
+  WriteCallback write;
+  socket->writeChain(&write, sendBuf->clone());
+  evb.loop();
+
+  t.join();
+
+  EXPECT_EQ(STATE_SUCCEEDED, write.state);
+  EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+  EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+  ASSERT_EQ(1, rcb.buffers.size());
+  ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
+  EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+  EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
+}
+
+/**
+ * Test connecting to a server that isn't listening
+ */
+TEST(AsyncSocketTest, ConnectRefusedImmediatelyTFO) {
+  EventBase evb;
+
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+  socket->enableTFO();
+
+  // Hopefully nothing is actually listening on this address
+  folly::SocketAddress addr("::1", 65535);
+  ConnCallback cb;
+  socket->connect(&cb, addr, 30);
+
+  evb.loop();
+
+  WriteCallback write1;
+  // Trigger the connect if TFO attempt is supported.
+  socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
+  WriteCallback write2;
+  socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
+  evb.loop();
+
+  if (!socket->getTFOFinished()) {
+    EXPECT_EQ(STATE_FAILED, write1.state);
+  } else {
+    EXPECT_EQ(STATE_SUCCEEDED, write1.state);
+    EXPECT_FALSE(socket->getTFOSucceded());
+  }
+
+  EXPECT_EQ(STATE_FAILED, write2.state);
+
+  EXPECT_EQ(STATE_SUCCEEDED, cb.state);
+  EXPECT_LE(0, socket->getConnectTime().count());
+  EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
+  EXPECT_TRUE(socket->getTFOAttempted());
+}
+
+/**
+ * Test calling closeNow() immediately after connecting.
+ */
+TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) {
+  TestServer server(true);
+
+  // connect()
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->enableTFO();
+
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30);
+
+  // write()
+  std::array<char, 128> buf;
+  memset(buf.data(), 'a', buf.size());
+
+  // close()
+  socket->closeNow();
+
+  // Loop, although there shouldn't be anything to do.
+  evb.loop();
+
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+/**
+ * Test calling close() immediately after connect()
+ */
+TEST(AsyncSocketTest, ConnectAndCloseTFO) {
+  TestServer server(true);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->enableTFO();
+
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30);
+
+  socket->close();
+
+  // Loop, although there shouldn't be anything to do.
+  evb.loop();
+
+  // Make sure the connection was aborted
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+class MockAsyncTFOSocket : public AsyncSocket {
+ public:
+  using UniquePtr = std::unique_ptr<MockAsyncTFOSocket, Destructor>;
+
+  explicit MockAsyncTFOSocket(EventBase* evb) : AsyncSocket(evb) {}
+
+  MOCK_METHOD3(tfoSendMsg, ssize_t(int fd, struct msghdr* msg, int msg_flags));
+};
+
+TEST(AsyncSocketTest, TestTFOUnsupported) {
+  TestServer server(true);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+  socket->enableTFO();
+
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30);
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+  ReadCallback rcb;
+  socket->setReadCB(&rcb);
+
+  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+      .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
+  WriteCallback write;
+  auto sendBuf = IOBuf::copyBuffer("hey");
+  socket->writeChain(&write, sendBuf->clone());
+  EXPECT_EQ(STATE_WAITING, write.state);
+
+  std::array<uint8_t, 128> buf;
+  memset(buf.data(), 'a', buf.size());
+
+  std::array<uint8_t, 3> readBuf;
+
+  std::thread t([&] {
+    std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+    acceptedSocket->write(buf.data(), buf.size());
+    acceptedSocket->flush();
+    acceptedSocket->readAll(readBuf.data(), readBuf.size());
+    acceptedSocket->close();
+  });
+
+  evb.loop();
+
+  t.join();
+  EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+  EXPECT_EQ(STATE_SUCCEEDED, write.state);
+
+  EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+  EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+  ASSERT_EQ(1, rcb.buffers.size());
+  ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
+  EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+  EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
+}
+
+TEST(AsyncSocketTest, ConnectRefusedDelayedTFO) {
+  EventBase evb;
+
+  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+  socket->enableTFO();
+
+  // Hopefully this fails
+  folly::SocketAddress fakeAddr("127.0.0.1", 65535);
+  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+      .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
+        sockaddr_storage addr;
+        auto len = fakeAddr.getAddress(&addr);
+        int ret = connect(fd, (const struct sockaddr*)&addr, len);
+        LOG(INFO) << "connecting the socket " << fd << " : " << ret << " : "
+                  << errno;
+        return ret;
+      }));
+
+  // Hopefully nothing is actually listening on this address
+  ConnCallback cb;
+  socket->connect(&cb, fakeAddr, 30);
+
+  WriteCallback write1;
+  // Trigger the connect if TFO attempt is supported.
+  socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
+
+  if (socket->getTFOFinished()) {
+    // This test is useless now.
+    return;
+  }
+  WriteCallback write2;
+  // Trigger the connect if TFO attempt is supported.
+  socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
+  evb.loop();
+
+  EXPECT_EQ(STATE_FAILED, write1.state);
+  EXPECT_EQ(STATE_FAILED, write2.state);
+  EXPECT_FALSE(socket->getTFOSucceded());
+
+  EXPECT_EQ(STATE_SUCCEEDED, cb.state);
+  EXPECT_LE(0, socket->getConnectTime().count());
+  EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
+  EXPECT_TRUE(socket->getTFOAttempted());
+}
+
+TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) {
+  // Try connecting to server that won't respond.
+  //
+  // This depends somewhat on the network where this test is run.
+  // Hopefully this IP will be routable but unresponsive.
+  // (Alternatively, we could try listening on a local raw socket, but that
+  // normally requires root privileges.)
+  auto host = SocketAddressTestHelper::isIPv6Enabled()
+      ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
+      : SocketAddressTestHelper::isIPv4Enabled()
+          ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
+          : nullptr;
+  SocketAddress addr(host, 65535);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+  socket->enableTFO();
+
+  ConnCallback ccb;
+  // Set a very small timeout
+  socket->connect(&ccb, addr, 1);
+  EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+
+  ReadCallback rcb;
+  socket->setReadCB(&rcb);
+
+  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+      .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
+  WriteCallback write;
+  socket->writeChain(&write, IOBuf::copyBuffer("hey"));
+
+  evb.loop();
+
+  EXPECT_EQ(STATE_FAILED, write.state);
+}
+
+TEST(AsyncSocketTest, TestTFOFallbackToConnect) {
+  TestServer server(true);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+  socket->enableTFO();
+
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30);
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+  ReadCallback rcb;
+  socket->setReadCB(&rcb);
+
+  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+      .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
+        sockaddr_storage addr;
+        auto len = server.getAddress().getAddress(&addr);
+        return connect(fd, (const struct sockaddr*)&addr, len);
+      }));
+  WriteCallback write;
+  auto sendBuf = IOBuf::copyBuffer("hey");
+  socket->writeChain(&write, sendBuf->clone());
+  EXPECT_EQ(STATE_WAITING, write.state);
+
+  std::array<uint8_t, 128> buf;
+  memset(buf.data(), 'a', buf.size());
+
+  std::array<uint8_t, 3> readBuf;
+
+  std::thread t([&] {
+    std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+    acceptedSocket->write(buf.data(), buf.size());
+    acceptedSocket->flush();
+    acceptedSocket->readAll(readBuf.data(), readBuf.size());
+    acceptedSocket->close();
+  });
+
+  evb.loop();
+
+  t.join();
+  EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+
+  EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+  EXPECT_EQ(STATE_SUCCEEDED, write.state);
+
+  EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+  ASSERT_EQ(1, rcb.buffers.size());
+  ASSERT_EQ(buf.size(), rcb.buffers[0].length);
+  EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+}
+
+TEST(AsyncSocketTest, TestTFOFallbackTimeout) {
+  // Try connecting to server that won't respond.
+  //
+  // This depends somewhat on the network where this test is run.
+  // Hopefully this IP will be routable but unresponsive.
+  // (Alternatively, we could try listening on a local raw socket, but that
+  // normally requires root privileges.)
+  auto host = SocketAddressTestHelper::isIPv6Enabled()
+      ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
+      : SocketAddressTestHelper::isIPv4Enabled()
+          ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
+          : nullptr;
+  SocketAddress addr(host, 65535);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+  socket->enableTFO();
+
+  ConnCallback ccb;
+  // Set a very small timeout
+  socket->connect(&ccb, addr, 1);
+  EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+
+  ReadCallback rcb;
+  socket->setReadCB(&rcb);
+
+  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+      .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
+        sockaddr_storage addr2;
+        auto len = addr.getAddress(&addr2);
+        return connect(fd, (const struct sockaddr*)&addr2, len);
+      }));
+  WriteCallback write;
+  socket->writeChain(&write, IOBuf::copyBuffer("hey"));
+
+  evb.loop();
+
+  EXPECT_EQ(STATE_FAILED, write.state);
+}
+
+TEST(AsyncSocketTest, TestTFOEagain) {
+  TestServer server(true);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+  socket->enableTFO();
+
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30);
+
+  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+      .WillOnce(SetErrnoAndReturn(EAGAIN, -1));
+  WriteCallback write;
+  socket->writeChain(&write, IOBuf::copyBuffer("hey"));
+
+  evb.loop();
+
+  EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+  EXPECT_EQ(STATE_FAILED, write.state);
+}
+
+// Sending a large amount of data in the first write which will
+// definitely not fit into MSS.
+TEST(AsyncSocketTest, ConnectTFOWithBigData) {
+  // Start listening on a local port
+  TestServer server(true);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->enableTFO();
+  ConnCallback cb;
+  socket->connect(&cb, server.getAddress(), 30);
+
+  std::array<uint8_t, 128> buf;
+  memset(buf.data(), 'a', buf.size());
+
+  constexpr size_t len = 10 * 1024;
+  auto sendBuf = IOBuf::create(len);
+  sendBuf->append(len);
+  std::array<uint8_t, len> readBuf;
+
+  std::thread t([&] {
+    auto acceptedSocket = server.accept();
+    acceptedSocket->write(buf.data(), buf.size());
+    acceptedSocket->flush();
+    acceptedSocket->readAll(readBuf.data(), readBuf.size());
+    acceptedSocket->close();
+  });
+
+  evb.loop();
+
+  ASSERT_EQ(cb.state, STATE_SUCCEEDED);
+  EXPECT_LE(0, socket->getConnectTime().count());
+  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+  EXPECT_TRUE(socket->getTFOAttempted());
+
+  // Should trigger the connect
+  WriteCallback write;
+  ReadCallback rcb;
+  socket->writeChain(&write, sendBuf->clone());
+  socket->setReadCB(&rcb);
+  evb.loop();
+
+  t.join();
+
+  EXPECT_EQ(STATE_SUCCEEDED, write.state);
+  EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+  EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+  ASSERT_EQ(1, rcb.buffers.size());
+  ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
+  EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+  EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
+}
+
+#endif // FOLLY_ALLOW_TFO
+
+class MockEvbChangeCallback : public AsyncSocket::EvbChangeCallback {
+ public:
+  MOCK_METHOD1(evbAttached, void(AsyncSocket*));
+  MOCK_METHOD1(evbDetached, void(AsyncSocket*));
+};
+
+TEST(AsyncSocketTest, EvbCallbacks) {
+  auto cb = std::make_unique<MockEvbChangeCallback>();
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+  InSequence seq;
+  EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1);
+  EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1);
+
+  socket->setEvbChangedCallback(std::move(cb));
+  socket->detachEventBase();
+  socket->attachEventBase(&evb);
+}
+
+TEST(AsyncSocketTest, TestEvbDetachWtRegisteredIOHandlers) {
+  // Start listening on a local port
+  TestServer server;
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  ConnCallback cb;
+  socket->connect(&cb, server.getAddress(), 30);
+
+  evb.loop();
+
+  ASSERT_EQ(cb.state, STATE_SUCCEEDED);
+  EXPECT_LE(0, socket->getConnectTime().count());
+  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+
+  // After the ioHandlers are registered, still should be able to detach/attach
+  ReadCallback rcb;
+  socket->setReadCB(&rcb);
+
+  auto cbEvbChg = std::make_unique<MockEvbChangeCallback>();
+  InSequence seq;
+  EXPECT_CALL(*cbEvbChg, evbDetached(socket.get())).Times(1);
+  EXPECT_CALL(*cbEvbChg, evbAttached(socket.get())).Times(1);
+
+  socket->setEvbChangedCallback(std::move(cbEvbChg));
+  EXPECT_TRUE(socket->isDetachable());
+  socket->detachEventBase();
+  socket->attachEventBase(&evb);
+
+  socket->close();
+}
+
+#ifdef FOLLY_HAVE_MSG_ERRQUEUE
+/* copied from include/uapi/linux/net_tstamp.h */
+/* SO_TIMESTAMPING gets an integer bit field comprised of these values */
+enum SOF_TIMESTAMPING {
+  SOF_TIMESTAMPING_SOFTWARE = (1 << 4),
+  SOF_TIMESTAMPING_OPT_ID = (1 << 7),
+  SOF_TIMESTAMPING_TX_SCHED = (1 << 8),
+  SOF_TIMESTAMPING_OPT_CMSG = (1 << 10),
+  SOF_TIMESTAMPING_OPT_TSONLY = (1 << 11),
+};
+
+class TestErrMessageCallback : public folly::AsyncSocket::ErrMessageCallback {
+ public:
+  TestErrMessageCallback()
+      : exception_(folly::AsyncSocketException::UNKNOWN, "none") {}
+
+  void errMessage(const cmsghdr& cmsg) noexcept override {
+    if (cmsg.cmsg_level == SOL_SOCKET && cmsg.cmsg_type == SCM_TIMESTAMPING) {
+      gotTimestamp_++;
+      checkResetCallback();
+    } else if (
+        (cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
+        (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR)) {
+      gotByteSeq_++;
+      checkResetCallback();
+    }
+  }
+
+  void errMessageError(
+      const folly::AsyncSocketException& ex) noexcept override {
+    exception_ = ex;
+  }
+
+  void checkResetCallback() noexcept {
+    if (socket_ != nullptr && resetAfter_ != -1 &&
+        gotTimestamp_ + gotByteSeq_ == resetAfter_) {
+      socket_->setErrMessageCB(nullptr);
+    }
+  }
+
+  folly::AsyncSocket* socket_{nullptr};
+  folly::AsyncSocketException exception_;
+  int gotTimestamp_{0};
+  int gotByteSeq_{0};
+  int resetAfter_{-1};
+};
+
+TEST(AsyncSocketTest, ErrMessageCallback) {
+  TestServer server;
+
+  // connect()
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30);
+  LOG(INFO) << "Client socket fd=" << socket->getFd();
+
+  // Let the socket
+  evb.loop();
+
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+  // Set read callback to keep the socket subscribed for event
+  // notifications. Though we're no planning to read anything from
+  // this side of the connection.
+  ReadCallback rcb(1);
+  socket->setReadCB(&rcb);
+
+  // Set up timestamp callbacks
+  TestErrMessageCallback errMsgCB;
+  socket->setErrMessageCB(&errMsgCB);
+  ASSERT_EQ(socket->getErrMessageCallback(),
+            static_cast<folly::AsyncSocket::ErrMessageCallback*>(&errMsgCB));
+
+  errMsgCB.socket_ = socket.get();
+  errMsgCB.resetAfter_ = 3;
+
+  // Enable timestamp notifications
+  ASSERT_GT(socket->getFd(), 0);
+  int flags = SOF_TIMESTAMPING_OPT_ID
+              | SOF_TIMESTAMPING_OPT_TSONLY
+              | SOF_TIMESTAMPING_SOFTWARE
+              | SOF_TIMESTAMPING_OPT_CMSG
+              | SOF_TIMESTAMPING_TX_SCHED;
+  AsyncSocket::OptionKey tstampingOpt = {SOL_SOCKET, SO_TIMESTAMPING};
+  EXPECT_EQ(tstampingOpt.apply(socket->getFd(), flags), 0);
+
+  // write()
+  std::vector<uint8_t> wbuf(128, 'a');
+  WriteCallback wcb;
+  // Send two packets to get two EOM notifications
+  socket->write(&wcb, wbuf.data(), wbuf.size() / 2);
+  socket->write(&wcb, wbuf.data() + wbuf.size() / 2, wbuf.size() / 2);
+
+  // Accept the connection.
+  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+  LOG(INFO) << "Server socket fd=" << acceptedSocket->getSocketFD();
+
+  // Loop
+  evb.loopOnce();
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+
+  // Check that we can read the data that was written to the socket
+  std::vector<uint8_t> rbuf(1 + wbuf.size(), 0);
+  uint32_t bytesRead = acceptedSocket->read(rbuf.data(), rbuf.size());
+  ASSERT_TRUE(std::equal(wbuf.begin(), wbuf.end(), rbuf.begin()));
+  ASSERT_EQ(bytesRead, wbuf.size());
+
+  // Close both sockets
+  acceptedSocket->close();
+  socket->close();
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
+
+  // Check for the timestamp notifications.
+  ASSERT_EQ(errMsgCB.exception_.type_, folly::AsyncSocketException::UNKNOWN);
+  ASSERT_GT(errMsgCB.gotByteSeq_, 0);
+  ASSERT_GT(errMsgCB.gotTimestamp_, 0);
+  ASSERT_EQ(
+      errMsgCB.gotByteSeq_ + errMsgCB.gotTimestamp_, errMsgCB.resetAfter_);
+}
+#endif // FOLLY_HAVE_MSG_ERRQUEUE
+
+TEST(AsyncSocket, PreReceivedData) {
+  TestServer server;
+
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->connect(nullptr, server.getAddress(), 30);
+  evb.loop();
+
+  socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
+
+  auto acceptedSocket = server.acceptAsync(&evb);
+
+  ReadCallback peekCallback(2);
+  ReadCallback readCallback;
+  peekCallback.dataAvailableCallback = [&]() {
+    peekCallback.verifyData("he", 2);
+    acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("h"));
+    acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("e"));
+    acceptedSocket->setReadCB(nullptr);
+    acceptedSocket->setReadCB(&readCallback);
+  };
+  readCallback.dataAvailableCallback = [&]() {
+    if (readCallback.dataRead() == 5) {
+      readCallback.verifyData("hello", 5);
+      acceptedSocket->setReadCB(nullptr);
+    }
+  };
+
+  acceptedSocket->setReadCB(&peekCallback);
+
+  evb.loop();
+}
+
+TEST(AsyncSocket, PreReceivedDataOnly) {
+  TestServer server;
+
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->connect(nullptr, server.getAddress(), 30);
+  evb.loop();
+
+  socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
+
+  auto acceptedSocket = server.acceptAsync(&evb);
+
+  ReadCallback peekCallback;
+  ReadCallback readCallback;
+  peekCallback.dataAvailableCallback = [&]() {
+    peekCallback.verifyData("hello", 5);
+    acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
+    acceptedSocket->setReadCB(&readCallback);
+  };
+  readCallback.dataAvailableCallback = [&]() {
+    readCallback.verifyData("hello", 5);
+    acceptedSocket->setReadCB(nullptr);
+  };
+
+  acceptedSocket->setReadCB(&peekCallback);
+
+  evb.loop();
+}
+
+TEST(AsyncSocket, PreReceivedDataPartial) {
+  TestServer server;
+
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->connect(nullptr, server.getAddress(), 30);
+  evb.loop();
+
+  socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
+
+  auto acceptedSocket = server.acceptAsync(&evb);
+
+  ReadCallback peekCallback;
+  ReadCallback smallReadCallback(3);
+  ReadCallback normalReadCallback;
+  peekCallback.dataAvailableCallback = [&]() {
+    peekCallback.verifyData("hello", 5);
+    acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
+    acceptedSocket->setReadCB(&smallReadCallback);
+  };
+  smallReadCallback.dataAvailableCallback = [&]() {
+    smallReadCallback.verifyData("hel", 3);
+    acceptedSocket->setReadCB(&normalReadCallback);
+  };
+  normalReadCallback.dataAvailableCallback = [&]() {
+    normalReadCallback.verifyData("lo", 2);
+    acceptedSocket->setReadCB(nullptr);
+  };
+
+  acceptedSocket->setReadCB(&peekCallback);
+
+  evb.loop();
+}
+
+TEST(AsyncSocket, PreReceivedDataTakeover) {
+  TestServer server;
+
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->connect(nullptr, server.getAddress(), 30);
+  evb.loop();
+
+  socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
+
+  auto acceptedSocket =
+      AsyncSocket::UniquePtr(new AsyncSocket(&evb, server.acceptFD()));
+  AsyncSocket::UniquePtr takeoverSocket;
+
+  ReadCallback peekCallback(3);
+  ReadCallback readCallback;
+  peekCallback.dataAvailableCallback = [&]() {
+    peekCallback.verifyData("hel", 3);
+    acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
+    acceptedSocket->setReadCB(nullptr);
+    takeoverSocket =
+        AsyncSocket::UniquePtr(new AsyncSocket(std::move(acceptedSocket)));
+    takeoverSocket->setReadCB(&readCallback);
+  };
+  readCallback.dataAvailableCallback = [&]() {
+    readCallback.verifyData("hello", 5);
+    takeoverSocket->setReadCB(nullptr);
+  };
+
+  acceptedSocket->setReadCB(&peekCallback);
+
+  evb.loop();
+}
+
+#ifdef MSG_NOSIGNAL
+TEST(AsyncSocketTest, SendMessageFlags) {
+  TestServer server;
+  TestSendMsgParamsCallback sendMsgCB(
+      MSG_DONTWAIT|MSG_NOSIGNAL|MSG_MORE, 0, nullptr);
+
+  // connect()
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30);
+  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+
+  evb.loop();
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+  // Set SendMsgParamsCallback
+  socket->setSendMsgParamCB(&sendMsgCB);
+  ASSERT_EQ(socket->getSendMsgParamsCB(), &sendMsgCB);
+
+  // Write the first portion of data. This data is expected to be
+  // sent out immediately.
+  std::vector<uint8_t> buf(128, 'a');
+  WriteCallback wcb;
+  sendMsgCB.reset(MSG_DONTWAIT | MSG_NOSIGNAL);
+  socket->write(&wcb, buf.data(), buf.size());
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+  ASSERT_TRUE(sendMsgCB.queriedFlags_);
+  ASSERT_FALSE(sendMsgCB.queriedData_);
+
+  // Using different flags for the second write operation.
+  // MSG_MORE flag is expected to delay sending this
+  // data to the wire.
+  sendMsgCB.reset(MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE);
+  socket->write(&wcb, buf.data(), buf.size());
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+  ASSERT_TRUE(sendMsgCB.queriedFlags_);
+  ASSERT_FALSE(sendMsgCB.queriedData_);
+
+  // Make sure the accepted socket saw only the data from
+  // the first write request.
+  std::vector<uint8_t> readbuf(2 * buf.size());
+  uint32_t bytesRead = acceptedSocket->read(readbuf.data(), readbuf.size());
+  ASSERT_TRUE(std::equal(buf.begin(), buf.end(), readbuf.begin()));
+  ASSERT_EQ(bytesRead, buf.size());
+
+  // Make sure the server got a connection and received the data
+  acceptedSocket->close();
+  socket->close();
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+TEST(AsyncSocketTest, SendMessageAncillaryData) {
+  int fds[2];
+  EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fds), 0);
+
+  // "Client" socket
+  int cfd = fds[0];
+  ASSERT_NE(cfd, -1);
+
+  // "Server" socket
+  int sfd = fds[1];
+  ASSERT_NE(sfd, -1);
+  SCOPE_EXIT { close(sfd); };
+
+  // Instantiate AsyncSocket object for the connected socket
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, cfd);
+
+  // Open a temporary file and write a magic string to it
+  // We'll transfer the file handle to test the message parameters
+  // callback logic.
+  TemporaryFile file(StringPiece(),
+                     fs::path(),
+                     TemporaryFile::Scope::UNLINK_IMMEDIATELY);
+  int tmpfd = file.fd();
+  ASSERT_NE(tmpfd, -1) << "Failed to open a temporary file";
+  std::string magicString("Magic string");
+  ASSERT_EQ(write(tmpfd, magicString.c_str(), magicString.length()),
+            magicString.length());
+
+  // Send message
+  union {
+    // Space large enough to hold an 'int'
+    char control[CMSG_SPACE(sizeof(int))];
+    struct cmsghdr cmh;
+  } s_u;
+  s_u.cmh.cmsg_len = CMSG_LEN(sizeof(int));
+  s_u.cmh.cmsg_level = SOL_SOCKET;
+  s_u.cmh.cmsg_type = SCM_RIGHTS;
+  memcpy(CMSG_DATA(&s_u.cmh), &tmpfd, sizeof(int));
+
+  // Set up the callback providing message parameters
+  TestSendMsgParamsCallback sendMsgCB(
+      MSG_DONTWAIT | MSG_NOSIGNAL, sizeof(s_u.control), s_u.control);
+  socket->setSendMsgParamCB(&sendMsgCB);
+
+  // We must transmit at least 1 byte of real data in order
+  // to send ancillary data
+  int s_data = 12345;
+  WriteCallback wcb;
+  socket->write(&wcb, &s_data, sizeof(s_data));
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+
+  // Receive the message
+  union {
+    // Space large enough to hold an 'int'
+    char control[CMSG_SPACE(sizeof(int))];
+    struct cmsghdr cmh;
+  } r_u;
+  struct msghdr msgh;
+  struct iovec iov;
+  int r_data = 0;
+
+  msgh.msg_control = r_u.control;
+  msgh.msg_controllen = sizeof(r_u.control);
+  msgh.msg_name = nullptr;
+  msgh.msg_namelen = 0;
+  msgh.msg_iov = &iov;
+  msgh.msg_iovlen = 1;
+  iov.iov_base = &r_data;
+  iov.iov_len = sizeof(r_data);
+
+  // Receive data
+  ASSERT_NE(recvmsg(sfd, &msgh, 0), -1) << "recvmsg failed: " << errno;
+
+  // Validate the received message
+  ASSERT_EQ(r_u.cmh.cmsg_len, CMSG_LEN(sizeof(int)));
+  ASSERT_EQ(r_u.cmh.cmsg_level, SOL_SOCKET);
+  ASSERT_EQ(r_u.cmh.cmsg_type, SCM_RIGHTS);
+  ASSERT_EQ(r_data, s_data);
+  int fd = 0;
+  memcpy(&fd, CMSG_DATA(&r_u.cmh), sizeof(int));
+  ASSERT_NE(fd, 0);
+  SCOPE_EXIT { close(fd); };
+
+  std::vector<uint8_t> transferredMagicString(magicString.length() + 1, 0);
+
+  // Reposition to the beginning of the file
+  ASSERT_EQ(0, lseek(fd, 0, SEEK_SET));
+
+  // Read the magic string back, and compare it with the original
+  ASSERT_EQ(
+      magicString.length(),
+      read(fd, transferredMagicString.data(), transferredMagicString.size()));
+  ASSERT_TRUE(std::equal(
+      magicString.begin(),
+      magicString.end(),
+      transferredMagicString.begin()));
+}
+
+TEST(AsyncSocketTest, UnixDomainSocketErrMessageCB) {
+  // In the latest stable kernel 4.14.3 as of 2017-12-04, Unix Domain
+  // Socket (UDS) does not support MSG_ERRQUEUE. So
+  // recvmsg(MSG_ERRQUEUE) will read application data from UDS which
+  // breaks application message flow.  To avoid this problem,
+  // AsyncSocket currently disables setErrMessageCB for UDS.
+  //
+  // This tests two things for UDS
+  // 1. setErrMessageCB fails
+  // 2. recvmsg(MSG_ERRQUEUE) reads application data
+  //
+  // Feel free to remove this test if UDS supports MSG_ERRQUEUE in the future.
+
+  int fd[2];
+  EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fd), 0);
+  ASSERT_NE(fd[0], -1);
+  ASSERT_NE(fd[1], -1);
+  SCOPE_EXIT {
+    close(fd[1]);
+  };
+
+  EXPECT_EQ(fcntl(fd[0], F_SETFL, O_NONBLOCK), 0);
+  EXPECT_EQ(fcntl(fd[1], F_SETFL, O_NONBLOCK), 0);
+
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, fd[0]);
+
+  // setErrMessageCB should fail for unix domain socket
+  TestErrMessageCallback errMsgCB;
+  ASSERT_NE(&errMsgCB, nullptr);
+  socket->setErrMessageCB(&errMsgCB);
+  ASSERT_EQ(socket->getErrMessageCallback(), nullptr);
+
+#ifdef FOLLY_HAVE_MSG_ERRQUEUE
+  // The following verifies that MSG_ERRQUEUE does not work for UDS,
+  // and recvmsg reads application data
+  union {
+    // Space large enough to hold an 'int'
+    char control[CMSG_SPACE(sizeof(int))];
+    struct cmsghdr cmh;
+  } r_u;
+  struct msghdr msgh;
+  struct iovec iov;
+  int recv_data = 0;
+
+  msgh.msg_control = r_u.control;
+  msgh.msg_controllen = sizeof(r_u.control);
+  msgh.msg_name = nullptr;
+  msgh.msg_namelen = 0;
+  msgh.msg_iov = &iov;
+  msgh.msg_iovlen = 1;
+  iov.iov_base = &recv_data;
+  iov.iov_len = sizeof(recv_data);
+
+  // there is no data, recvmsg should fail
+  EXPECT_EQ(recvmsg(fd[1], &msgh, MSG_ERRQUEUE), -1);
+  EXPECT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK);
+
+  // provide some application data, error queue should be empty if it exists
+  // However, UDS reads application data as error message
+  int test_data = 123456;
+  WriteCallback wcb;
+  socket->write(&wcb, &test_data, sizeof(test_data));
+  recv_data = 0;
+  ASSERT_NE(recvmsg(fd[1], &msgh, MSG_ERRQUEUE), -1);
+  ASSERT_EQ(recv_data, test_data);
+#endif // FOLLY_HAVE_MSG_ERRQUEUE
+}
+#endif