Switch some assertions to std::thread rather than pthread
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
index 2f31124b6e959f06ebfe057320868f81642231b3..9876da6c98d665e9c5fdbd6e5928d83410ea3d5a 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2016 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include <folly/ExceptionWrapper.h>
+#include <folly/RWSpinLock.h>
+#include <folly/Random.h>
+#include <folly/SocketAddress.h>
 #include <folly/io/async/AsyncServerSocket.h>
 #include <folly/io/async/AsyncSocket.h>
 #include <folly/io/async/AsyncTimeout.h>
 #include <folly/io/async/EventBase.h>
-#include <folly/SocketAddress.h>
 
 #include <folly/io/IOBuf.h>
 #include <folly/io/async/test/AsyncSocketTest.h>
 #include <folly/io/async/test/Util.h>
+#include <folly/portability/Sockets.h>
+#include <folly/portability/Unistd.h>
+#include <folly/test/SocketAddressTestHelper.h>
 
-#include <gtest/gtest.h>
 #include <boost/scoped_array.hpp>
-#include <iostream>
-#include <unistd.h>
 #include <fcntl.h>
-#include <poll.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
 #include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/tcp.h>
+#include <iostream>
 #include <thread>
 
 using namespace boost;
@@ -46,6 +49,7 @@ using std::chrono::milliseconds;
 using boost::scoped_array;
 
 using namespace folly;
+using namespace testing;
 
 class DelayedWrite: public AsyncTimeout {
  public:
@@ -95,8 +99,32 @@ TEST(AsyncSocketTest, Connect) {
   evb.loop();
 
   CHECK_EQ(cb.state, STATE_SUCCEEDED);
+  EXPECT_LE(0, socket->getConnectTime().count());
+  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+}
+
+enum class TFOState {
+  DISABLED,
+  ENABLED,
+};
+
+class AsyncSocketConnectTest : public ::testing::TestWithParam<TFOState> {};
+
+std::vector<TFOState> getTestingValues() {
+  std::vector<TFOState> vals;
+  vals.emplace_back(TFOState::DISABLED);
+
+#if FOLLY_ALLOW_TFO
+  vals.emplace_back(TFOState::ENABLED);
+#endif
+  return vals;
 }
 
+INSTANTIATE_TEST_CASE_P(
+    ConnectTests,
+    AsyncSocketConnectTest,
+    ::testing::ValuesIn(getTestingValues()));
+
 /**
  * Test connecting to a server that isn't listening
  */
@@ -112,8 +140,10 @@ TEST(AsyncSocketTest, ConnectRefused) {
 
   evb.loop();
 
-  CHECK_EQ(cb.state, STATE_FAILED);
-  CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
+  EXPECT_EQ(STATE_FAILED, cb.state);
+  EXPECT_EQ(AsyncSocketException::NOT_OPEN, cb.exception.getType());
+  EXPECT_LE(0, socket->getConnectTime().count());
+  EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
 }
 
 /**
@@ -130,7 +160,13 @@ TEST(AsyncSocketTest, ConnectTimeout) {
   // Hopefully this IP will be routable but unresponsive.
   // (Alternatively, we could try listening on a local raw socket, but that
   // normally requires root privileges.)
-  folly::SocketAddress addr("8.8.8.8", 65535);
+  auto host =
+      SocketAddressTestHelper::isIPv6Enabled() ?
+      SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
+      SocketAddressTestHelper::isIPv4Enabled() ?
+      SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
+      nullptr;
+  SocketAddress addr(host, 65535);
   ConnCallback cb;
   socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
 
@@ -145,18 +181,25 @@ TEST(AsyncSocketTest, ConnectTimeout) {
   folly::SocketAddress peer;
   socket->getPeerAddress(&peer);
   CHECK_EQ(peer, addr);
+  EXPECT_LE(0, socket->getConnectTime().count());
+  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
 }
 
 /**
  * Test writing immediately after connecting, without waiting for connect
  * to finish.
  */
-TEST(AsyncSocketTest, ConnectAndWrite) {
+TEST_P(AsyncSocketConnectTest, ConnectAndWrite) {
   TestServer server;
 
   // connect()
   EventBase evb;
   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+  if (GetParam() == TFOState::ENABLED) {
+    socket->enableTFO();
+  }
+
   ConnCallback ccb;
   socket->connect(&ccb, server.getAddress(), 30);
 
@@ -176,17 +219,25 @@ TEST(AsyncSocketTest, ConnectAndWrite) {
   // Make sure the server got a connection and received the data
   socket->close();
   server.verifyConnection(buf, sizeof(buf));
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
+  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
 }
 
 /**
  * Test connecting using a nullptr connect callback.
  */
-TEST(AsyncSocketTest, ConnectNullCallback) {
+TEST_P(AsyncSocketConnectTest, ConnectNullCallback) {
   TestServer server;
 
   // connect()
   EventBase evb;
   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  if (GetParam() == TFOState::ENABLED) {
+    socket->enableTFO();
+  }
+
   socket->connect(nullptr, server.getAddress(), 30);
 
   // write some data, just so we have some way of verifing
@@ -203,6 +254,9 @@ TEST(AsyncSocketTest, ConnectNullCallback) {
   // Make sure the server got a connection and received the data
   socket->close();
   server.verifyConnection(buf, sizeof(buf));
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -211,12 +265,15 @@ TEST(AsyncSocketTest, ConnectNullCallback) {
  *
  * This exercises the STATE_CONNECTING_CLOSING code.
  */
-TEST(AsyncSocketTest, ConnectWriteAndClose) {
+TEST_P(AsyncSocketConnectTest, ConnectWriteAndClose) {
   TestServer server;
 
   // connect()
   EventBase evb;
   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  if (GetParam() == TFOState::ENABLED) {
+    socket->enableTFO();
+  }
   ConnCallback ccb;
   socket->connect(&ccb, server.getAddress(), 30);
 
@@ -238,6 +295,9 @@ TEST(AsyncSocketTest, ConnectWriteAndClose) {
 
   // Make sure the server got a connection and received the data
   server.verifyConnection(buf, sizeof(buf));
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -267,6 +327,9 @@ TEST(AsyncSocketTest, ConnectAndClose) {
 
   // Make sure the connection was aborted
   CHECK_EQ(ccb.state, STATE_FAILED);
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -298,6 +361,9 @@ TEST(AsyncSocketTest, ConnectAndCloseNow) {
 
   // Make sure the connection was aborted
   CHECK_EQ(ccb.state, STATE_FAILED);
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -337,23 +403,35 @@ TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
 
   CHECK_EQ(ccb.state, STATE_FAILED);
   CHECK_EQ(wcb.state, STATE_FAILED);
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
  * Test installing a read callback immediately, before connect() finishes.
  */
-TEST(AsyncSocketTest, ConnectAndRead) {
+TEST_P(AsyncSocketConnectTest, ConnectAndRead) {
   TestServer server;
 
   // connect()
   EventBase evb;
   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  if (GetParam() == TFOState::ENABLED) {
+    socket->enableTFO();
+  }
+
   ConnCallback ccb;
   socket->connect(&ccb, server.getAddress(), 30);
 
   ReadCallback rcb;
   socket->setReadCB(&rcb);
 
+  if (GetParam() == TFOState::ENABLED) {
+    // Trigger a connection
+    socket->writeChain(nullptr, IOBuf::copyBuffer("hey"));
+  }
+
   // Even though we haven't looped yet, we should be able to accept
   // the connection and send data to it.
   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
@@ -367,10 +445,12 @@ TEST(AsyncSocketTest, ConnectAndRead) {
   evb.loop();
 
   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
-  CHECK_EQ(rcb.state, STATE_SUCCEEDED);
   CHECK_EQ(rcb.buffers.size(), 1);
   CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
+
+  ASSERT_FALSE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -406,18 +486,24 @@ TEST(AsyncSocketTest, ConnectReadAndClose) {
   CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
   CHECK_EQ(rcb.buffers.size(), 0);
   CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
  * Test both writing and installing a read callback immediately,
  * before connect() finishes.
  */
-TEST(AsyncSocketTest, ConnectWriteAndRead) {
+TEST_P(AsyncSocketConnectTest, ConnectWriteAndRead) {
   TestServer server;
 
   // connect()
   EventBase evb;
   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  if (GetParam() == TFOState::ENABLED) {
+    socket->enableTFO();
+  }
   ConnCallback ccb;
   socket->connect(&ccb, server.getAddress(), 30);
 
@@ -464,6 +550,9 @@ TEST(AsyncSocketTest, ConnectWriteAndRead) {
   CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
   CHECK_EQ(bytesRead, 0);
+
+  ASSERT_FALSE(socket->isClosedBySelf());
+  ASSERT_TRUE(socket->isClosedByPeer());
 }
 
 /**
@@ -549,6 +638,9 @@ TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
+
+  ASSERT_FALSE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -634,6 +726,9 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
   // Fully close both sockets
   acceptedSocket->close();
   socket->close();
+
+  ASSERT_FALSE(socket->isClosedBySelf());
+  ASSERT_TRUE(socket->isClosedByPeer());
 }
 
 /**
@@ -722,6 +817,9 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
   // Fully close both sockets
   acceptedSocket->close();
   socket->close();
+
+  ASSERT_FALSE(socket->isClosedBySelf());
+  ASSERT_TRUE(socket->isClosedByPeer());
 }
 
 // Helper function for use in testConnectOptWrite()
@@ -895,6 +993,9 @@ TEST(AsyncSocketTest, WriteNullCallback) {
   // Make sure the server got a connection and received the data
   socket->close();
   server.verifyConnection(buf, sizeof(buf));
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -981,6 +1082,9 @@ TEST(AsyncSocketTest, WritePipeError) {
   CHECK_EQ(wcb.state, STATE_FAILED);
   CHECK_EQ(wcb.exception.getType(),
                     AsyncSocketException::INTERNAL_ERROR);
+
+  ASSERT_FALSE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -1001,7 +1105,7 @@ TEST(AsyncSocketTest, WriteIOBuf) {
   acceptedSocket->setReadCB(&rcb);
 
   // Write a simple buffer to the socket
-  size_t simpleBufLength = 5;
+  constexpr size_t simpleBufLength = 5;
   char simpleBuf[simpleBufLength];
   memset(simpleBuf, 'a', simpleBufLength);
   WriteCallback wcb;
@@ -1055,6 +1159,9 @@ TEST(AsyncSocketTest, WriteIOBuf) {
 
   acceptedSocket->close();
   socket->close();
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 TEST(AsyncSocketTest, WriteIOBufCorked) {
@@ -1094,7 +1201,7 @@ TEST(AsyncSocketTest, WriteIOBufCorked) {
   write2.scheduleTimeout(100);
   WriteCallback wcb3;
   DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
-  write3.scheduleTimeout(200);
+  write3.scheduleTimeout(140);
 
   evb.loop();
   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
@@ -1113,6 +1220,9 @@ TEST(AsyncSocketTest, WriteIOBufCorked) {
 
   acceptedSocket->close();
   socket->close();
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -1154,6 +1264,9 @@ TEST(AsyncSocketTest, ZeroLengthWrite) {
   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
   CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
   rcb.verifyData(buf.get(), len1 + len2);
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 TEST(AsyncSocketTest, ZeroLengthWritev) {
@@ -1176,7 +1289,7 @@ TEST(AsyncSocketTest, ZeroLengthWritev) {
   memset(buf.get(), 'b', len2);
 
   WriteCallback wcb;
-  size_t iovCount = 4;
+  constexpr size_t iovCount = 4;
   struct iovec iov[iovCount];
   iov[0].iov_base = buf.get();
   iov[0].iov_len = len1;
@@ -1193,6 +1306,9 @@ TEST(AsyncSocketTest, ZeroLengthWritev) {
 
   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
   rcb.verifyData(buf.get(), len1 + len2);
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 ///////////////////////////////////////////////////////////////////////////
@@ -1251,6 +1367,9 @@ TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
        ++it) {
     CHECK_EQ((*it)->state, STATE_FAILED);
   }
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 ///////////////////////////////////////////////////////////////////////////
@@ -1263,7 +1382,7 @@ class AsyncSocketImmediateRead : public folly::AsyncSocket {
   bool immediateReadCalled = false;
   explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
  protected:
-  virtual void checkForImmediateRead() noexcept override {
+  void checkForImmediateRead() noexcept override {
     immediateReadCalled = true;
     AsyncSocket::handleRead();
   }
@@ -1310,6 +1429,9 @@ TEST(AsyncSocket, ConnectReadImmediateRead) {
   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
   rcb.verifyData(expectedData, expectedDataSz);
   CHECK_EQ(socket.immediateReadCalled, true);
+
+  ASSERT_FALSE(socket.isClosedBySelf());
+  ASSERT_FALSE(socket.isClosedByPeer());
 }
 
 TEST(AsyncSocket, ConnectReadUninstallRead) {
@@ -1361,6 +1483,9 @@ TEST(AsyncSocket, ConnectReadUninstallRead) {
    * was reset in dataAvailableCallback */
   CHECK_EQ(rcb.dataRead(), maxBufferSz);
   CHECK_EQ(socket.immediateReadCalled, false);
+
+  ASSERT_FALSE(socket.isClosedBySelf());
+  ASSERT_FALSE(socket.isClosedByPeer());
 }
 
 // TODO:
@@ -1379,6 +1504,113 @@ TEST(AsyncSocket, ConnectReadUninstallRead) {
 ///////////////////////////////////////////////////////////////////////////
 // AsyncServerSocket tests
 ///////////////////////////////////////////////////////////////////////////
+namespace {
+/**
+ * Helper ConnectionEventCallback class for the test code.
+ * It maintains counters protected by a spin lock.
+ */
+class TestConnectionEventCallback :
+  public AsyncServerSocket::ConnectionEventCallback {
+ public:
+  virtual void onConnectionAccepted(
+      const int /* socket */,
+      const SocketAddress& /* addr */) noexcept override {
+    folly::RWSpinLock::WriteHolder holder(spinLock_);
+    connectionAccepted_++;
+  }
+
+  virtual void onConnectionAcceptError(const int /* err */) noexcept override {
+    folly::RWSpinLock::WriteHolder holder(spinLock_);
+    connectionAcceptedError_++;
+  }
+
+  virtual void onConnectionDropped(
+      const int /* socket */,
+      const SocketAddress& /* addr */) noexcept override {
+    folly::RWSpinLock::WriteHolder holder(spinLock_);
+    connectionDropped_++;
+  }
+
+  virtual void onConnectionEnqueuedForAcceptorCallback(
+      const int /* socket */,
+      const SocketAddress& /* addr */) noexcept override {
+    folly::RWSpinLock::WriteHolder holder(spinLock_);
+    connectionEnqueuedForAcceptCallback_++;
+  }
+
+  virtual void onConnectionDequeuedByAcceptorCallback(
+      const int /* socket */,
+      const SocketAddress& /* addr */) noexcept override {
+    folly::RWSpinLock::WriteHolder holder(spinLock_);
+    connectionDequeuedByAcceptCallback_++;
+  }
+
+  virtual void onBackoffStarted() noexcept override {
+    folly::RWSpinLock::WriteHolder holder(spinLock_);
+    backoffStarted_++;
+  }
+
+  virtual void onBackoffEnded() noexcept override {
+    folly::RWSpinLock::WriteHolder holder(spinLock_);
+    backoffEnded_++;
+  }
+
+  virtual void onBackoffError() noexcept override {
+    folly::RWSpinLock::WriteHolder holder(spinLock_);
+    backoffError_++;
+  }
+
+  unsigned int getConnectionAccepted() const {
+    folly::RWSpinLock::ReadHolder holder(spinLock_);
+    return connectionAccepted_;
+  }
+
+  unsigned int getConnectionAcceptedError() const {
+    folly::RWSpinLock::ReadHolder holder(spinLock_);
+    return connectionAcceptedError_;
+  }
+
+  unsigned int getConnectionDropped() const {
+    folly::RWSpinLock::ReadHolder holder(spinLock_);
+    return connectionDropped_;
+  }
+
+  unsigned int getConnectionEnqueuedForAcceptCallback() const {
+    folly::RWSpinLock::ReadHolder holder(spinLock_);
+    return connectionEnqueuedForAcceptCallback_;
+  }
+
+  unsigned int getConnectionDequeuedByAcceptCallback() const {
+    folly::RWSpinLock::ReadHolder holder(spinLock_);
+    return connectionDequeuedByAcceptCallback_;
+  }
+
+  unsigned int getBackoffStarted() const {
+    folly::RWSpinLock::ReadHolder holder(spinLock_);
+    return backoffStarted_;
+  }
+
+  unsigned int getBackoffEnded() const {
+    folly::RWSpinLock::ReadHolder holder(spinLock_);
+    return backoffEnded_;
+  }
+
+  unsigned int getBackoffError() const {
+    folly::RWSpinLock::ReadHolder holder(spinLock_);
+    return backoffError_;
+  }
+
+ private:
+  mutable folly::RWSpinLock spinLock_;
+  unsigned int connectionAccepted_{0};
+  unsigned int connectionAcceptedError_{0};
+  unsigned int connectionDropped_{0};
+  unsigned int connectionEnqueuedForAcceptCallback_{0};
+  unsigned int connectionDequeuedByAcceptCallback_{0};
+  unsigned int backoffStarted_{0};
+  unsigned int backoffEnded_{0};
+  unsigned int backoffError_{0};
+};
 
 /**
  * Helper AcceptCallback class for the test code
@@ -1441,30 +1673,30 @@ class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
     acceptStoppedFn_ = fn;
   }
 
-  void connectionAccepted(int fd, const folly::SocketAddress& clientAddr)
-      noexcept {
-    events_.push_back(EventInfo(fd, clientAddr));
+  void connectionAccepted(
+      int fd, const folly::SocketAddress& clientAddr) noexcept override {
+    events_.emplace_back(fd, clientAddr);
 
     if (connectionAcceptedFn_) {
       connectionAcceptedFn_(fd, clientAddr);
     }
   }
-  void acceptError(const std::exception& ex) noexcept {
-    events_.push_back(EventInfo(ex.what()));
+  void acceptError(const std::exception& ex) noexcept override {
+    events_.emplace_back(ex.what());
 
     if (acceptErrorFn_) {
       acceptErrorFn_(ex);
     }
   }
-  void acceptStarted() noexcept {
-    events_.push_back(EventInfo(TYPE_START));
+  void acceptStarted() noexcept override {
+    events_.emplace_back(TYPE_START);
 
     if (acceptStartedFn_) {
       acceptStartedFn_();
     }
   }
-  void acceptStopped() noexcept {
-    events_.push_back(EventInfo(TYPE_STOP));
+  void acceptStopped() noexcept override {
+    events_.emplace_back(TYPE_STOP);
 
     if (acceptStoppedFn_) {
       acceptStoppedFn_();
@@ -1479,6 +1711,7 @@ class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
 
   std::deque<EventInfo> events_;
 };
+}
 
 /**
  * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
@@ -1497,10 +1730,10 @@ TEST(AsyncSocketTest, ServerAcceptOptions) {
   // Add a callback to accept one connection then stop the loop
   TestAcceptCallback acceptCallback;
   acceptCallback.setConnectionAcceptedFn(
-    [&](int fd, const folly::SocketAddress& addr) {
-      serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
-    });
-  acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+      });
+  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
   });
   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
@@ -1564,53 +1797,56 @@ TEST(AsyncSocketTest, RemoveAcceptCallback) {
   // Have callback 2 remove callback 3 and callback 5 the first time it is
   // called.
   int cb2Count = 0;
-  cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
-      std::shared_ptr<AsyncSocket> sock2(
+  cb1.setConnectionAcceptedFn([&](int /* fd */,
+                                  const folly::SocketAddress& /* addr */) {
+    std::shared_ptr<AsyncSocket> sock2(
         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
+  });
+  cb3.setConnectionAcceptedFn(
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
+  cb4.setConnectionAcceptedFn(
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        std::shared_ptr<AsyncSocket> sock3(
+            AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
+      });
+  cb5.setConnectionAcceptedFn(
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        std::shared_ptr<AsyncSocket> sock5(
+            AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
+
       });
-  cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
-    });
-  cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
-      std::shared_ptr<AsyncSocket> sock3(
-        AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
-    });
-  cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
-  std::shared_ptr<AsyncSocket> sock5(
-      AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
-
-    });
   cb2.setConnectionAcceptedFn(
-    [&](int fd, const folly::SocketAddress& addr) {
-      if (cb2Count == 0) {
-        serverSocket->removeAcceptCallback(&cb3, nullptr);
-        serverSocket->removeAcceptCallback(&cb5, nullptr);
-      }
-      ++cb2Count;
-    });
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        if (cb2Count == 0) {
+          serverSocket->removeAcceptCallback(&cb3, nullptr);
+          serverSocket->removeAcceptCallback(&cb5, nullptr);
+        }
+        ++cb2Count;
+      });
   // Have callback 6 remove callback 4 the first time it is called,
   // and destroy the server socket the second time it is called
   int cb6Count = 0;
   cb6.setConnectionAcceptedFn(
-    [&](int fd, const folly::SocketAddress& addr) {
-      if (cb6Count == 0) {
-        serverSocket->removeAcceptCallback(&cb4, nullptr);
-        std::shared_ptr<AsyncSocket> sock6(
-          AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
-        std::shared_ptr<AsyncSocket> sock7(
-          AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
-        std::shared_ptr<AsyncSocket> sock8(
-          AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
-
-      } else {
-        serverSocket.reset();
-      }
-      ++cb6Count;
-    });
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        if (cb6Count == 0) {
+          serverSocket->removeAcceptCallback(&cb4, nullptr);
+          std::shared_ptr<AsyncSocket> sock6(
+              AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
+          std::shared_ptr<AsyncSocket> sock7(
+              AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
+          std::shared_ptr<AsyncSocket> sock8(
+              AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
+
+        } else {
+          serverSocket.reset();
+        }
+        ++cb6Count;
+      });
   // Have callback 7 remove itself
   cb7.setConnectionAcceptedFn(
-    [&](int fd, const folly::SocketAddress& addr) {
-      serverSocket->removeAcceptCallback(&cb7, nullptr);
-    });
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        serverSocket->removeAcceptCallback(&cb7, nullptr);
+      });
 
   serverSocket->addAcceptCallback(&cb1, nullptr);
   serverSocket->addAcceptCallback(&cb2, nullptr);
@@ -1713,17 +1949,18 @@ TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
 
   // Add several accept callbacks
   TestAcceptCallback cb1;
-  auto thread_id = pthread_self();
+  auto thread_id = std::this_thread::get_id();
   cb1.setAcceptStartedFn([&](){
-    CHECK_NE(thread_id, pthread_self());
-    thread_id = pthread_self();
-  });
-  cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
-    CHECK_EQ(thread_id, pthread_self());
-    serverSocket->removeAcceptCallback(&cb1, nullptr);
+    CHECK_NE(thread_id, std::this_thread::get_id());
+    thread_id = std::this_thread::get_id();
   });
+  cb1.setConnectionAcceptedFn(
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        CHECK_EQ(thread_id, std::this_thread::get_id());
+        serverSocket->removeAcceptCallback(&cb1, nullptr);
+      });
   cb1.setAcceptStoppedFn([&](){
-    CHECK_EQ(thread_id, pthread_self());
+    CHECK_EQ(thread_id, std::this_thread::get_id());
   });
 
   // Test having callbacks remove other callbacks before them on the list,
@@ -1763,10 +2000,10 @@ void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
   // Add a callback to accept one connection then stop accepting
   TestAcceptCallback acceptCallback;
   acceptCallback.setConnectionAcceptedFn(
-    [&](int fd, const folly::SocketAddress& addr) {
-      serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
-    });
-  acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+      });
+  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
   });
   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
@@ -1932,7 +2169,7 @@ TEST(AsyncSocketTest, UnixDomainSocketTest) {
   std::shared_ptr<AsyncServerSocket> serverSocket(
       AsyncServerSocket::newSocket(&eventBase));
   string path(1, 0);
-  path.append("/anonymous");
+  path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
   folly::SocketAddress serverAddress;
   serverAddress.setFromPath(path);
   serverSocket->bind(serverAddress);
@@ -1941,10 +2178,10 @@ TEST(AsyncSocketTest, UnixDomainSocketTest) {
   // Add a callback to accept one connection then stop the loop
   TestAcceptCallback acceptCallback;
   acceptCallback.setConnectionAcceptedFn(
-    [&](int fd, const folly::SocketAddress& addr) {
-      serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
-    });
-  acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+      });
+  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
   });
   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
@@ -1970,3 +2207,569 @@ TEST(AsyncSocketTest, UnixDomainSocketTest) {
   int flags = fcntl(fd, F_GETFL, 0);
   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
 }
+
+TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
+  EventBase eventBase;
+  TestConnectionEventCallback connectionEventCallback;
+
+  // Create a server socket
+  std::shared_ptr<AsyncServerSocket> serverSocket(
+      AsyncServerSocket::newSocket(&eventBase));
+  serverSocket->setConnectionEventCallback(&connectionEventCallback);
+  serverSocket->bind(0);
+  serverSocket->listen(16);
+  folly::SocketAddress serverAddress;
+  serverSocket->getAddress(&serverAddress);
+
+  // Add a callback to accept one connection then stop the loop
+  TestAcceptCallback acceptCallback;
+  acceptCallback.setConnectionAcceptedFn(
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+      });
+  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
+    serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+  });
+  serverSocket->addAcceptCallback(&acceptCallback, nullptr);
+  serverSocket->startAccepting();
+
+  // Connect to the server socket
+  std::shared_ptr<AsyncSocket> socket(
+      AsyncSocket::newSocket(&eventBase, serverAddress));
+
+  eventBase.loop();
+
+  // Validate the connection event counters
+  ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
+  ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
+  ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
+  ASSERT_EQ(
+      connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
+  ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
+  ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
+  ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
+  ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
+}
+
+/**
+ * Test AsyncServerSocket::getNumPendingMessagesInQueue()
+ */
+TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
+  EventBase eventBase;
+
+  // Counter of how many connections have been accepted
+  int count = 0;
+
+  // Create a server socket
+  auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
+  serverSocket->bind(0);
+  serverSocket->listen(16);
+  folly::SocketAddress serverAddress;
+  serverSocket->getAddress(&serverAddress);
+
+  // Add a callback to accept connections
+  TestAcceptCallback acceptCallback;
+  acceptCallback.setConnectionAcceptedFn(
+      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+        count++;
+        CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
+
+        if (count == 4) {
+          // all messages are processed, remove accept callback
+          serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
+        }
+      });
+  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
+    serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
+  });
+  serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
+  serverSocket->startAccepting();
+
+  // Connect to the server socket, 4 clients, there are 4 connections
+  auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
+  auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
+  auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
+  auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
+
+  eventBase.loop();
+}
+
+/**
+ * Test AsyncTransport::BufferCallback
+ */
+TEST(AsyncSocketTest, BufferTest) {
+  TestServer server;
+
+  EventBase evb;
+  AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30, option);
+
+  char buf[100 * 1024];
+  memset(buf, 'c', sizeof(buf));
+  WriteCallback wcb;
+  BufferCallback bcb;
+  socket->setBufferCallback(&bcb);
+  socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
+
+  evb.loop();
+  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+  CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+
+  ASSERT_TRUE(bcb.hasBuffered());
+  ASSERT_TRUE(bcb.hasBufferCleared());
+
+  socket->close();
+  server.verifyConnection(buf, sizeof(buf));
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+TEST(AsyncSocketTest, BufferCallbackKill) {
+  TestServer server;
+  EventBase evb;
+  AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30, option);
+  evb.loopOnce();
+
+  char buf[100 * 1024];
+  memset(buf, 'c', sizeof(buf));
+  BufferCallback bcb;
+  socket->setBufferCallback(&bcb);
+  WriteCallback wcb;
+  wcb.successCallback = [&] {
+    ASSERT_TRUE(socket.unique());
+    socket.reset();
+  };
+
+  // This will trigger AsyncSocket::handleWrite,
+  // which calls WriteCallback::writeSuccess,
+  // which calls wcb.successCallback above,
+  // which tries to delete socket
+  // Then, the socket will also try to use this BufferCallback
+  // And that should crash us, if there is no DestructorGuard on the stack
+  socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
+
+  evb.loop();
+  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+}
+
+#if FOLLY_ALLOW_TFO
+TEST(AsyncSocketTest, ConnectTFO) {
+  // Start listening on a local port
+  TestServer server(true);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->enableTFO();
+  ConnCallback cb;
+  socket->connect(&cb, server.getAddress(), 30);
+
+  std::array<uint8_t, 128> buf;
+  memset(buf.data(), 'a', buf.size());
+
+  std::array<uint8_t, 3> readBuf;
+  auto sendBuf = IOBuf::copyBuffer("hey");
+
+  std::thread t([&] {
+    auto acceptedSocket = server.accept();
+    acceptedSocket->write(buf.data(), buf.size());
+    acceptedSocket->flush();
+    acceptedSocket->readAll(readBuf.data(), readBuf.size());
+    acceptedSocket->close();
+  });
+
+  evb.loop();
+
+  CHECK_EQ(cb.state, STATE_SUCCEEDED);
+  EXPECT_LE(0, socket->getConnectTime().count());
+  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+  EXPECT_TRUE(socket->getTFOAttempted());
+
+  // Should trigger the connect
+  WriteCallback write;
+  ReadCallback rcb;
+  socket->writeChain(&write, sendBuf->clone());
+  socket->setReadCB(&rcb);
+  evb.loop();
+
+  t.join();
+
+  EXPECT_EQ(STATE_SUCCEEDED, write.state);
+  EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+  EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+  ASSERT_EQ(1, rcb.buffers.size());
+  ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
+  EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+  EXPECT_EQ(socket->getTFOSucceeded(), socket->getTFOFinished());
+}
+
+/**
+ * Test connecting to a server that isn't listening
+ */
+TEST(AsyncSocketTest, ConnectRefusedTFO) {
+  EventBase evb;
+
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+  socket->enableTFO();
+
+  // Hopefully nothing is actually listening on this address
+  folly::SocketAddress addr("::1", 65535);
+  ConnCallback cb;
+  socket->connect(&cb, addr, 30);
+
+  evb.loop();
+
+  WriteCallback write1;
+  // Trigger the connect if TFO attempt is supported.
+  socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
+  evb.loop();
+  WriteCallback write2;
+  socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
+  evb.loop();
+
+  if (!socket->getTFOFinished()) {
+    EXPECT_EQ(STATE_FAILED, write1.state);
+    EXPECT_FALSE(socket->getTFOFinished());
+  } else {
+    EXPECT_EQ(STATE_SUCCEEDED, write1.state);
+    EXPECT_TRUE(socket->getTFOFinished());
+  }
+
+  EXPECT_EQ(STATE_FAILED, write2.state);
+
+  EXPECT_EQ(STATE_SUCCEEDED, cb.state);
+  EXPECT_LE(0, socket->getConnectTime().count());
+  EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
+  EXPECT_TRUE(socket->getTFOAttempted());
+}
+
+/**
+ * Test calling closeNow() immediately after connecting.
+ */
+TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) {
+  TestServer server(true);
+
+  // connect()
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->enableTFO();
+
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30);
+
+  // write()
+  std::array<char, 128> buf;
+  memset(buf.data(), 'a', buf.size());
+
+  // close()
+  socket->closeNow();
+
+  // Loop, although there shouldn't be anything to do.
+  evb.loop();
+
+  EXPECT_EQ(socket->getTFOSucceeded(), socket->getTFOFinished());
+  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+/**
+ * Test calling close() immediately after connect()
+ */
+TEST(AsyncSocketTest, ConnectAndCloseTFO) {
+  TestServer server(true);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->enableTFO();
+
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30);
+
+  socket->close();
+
+  // Loop, although there shouldn't be anything to do.
+  evb.loop();
+
+  // Make sure the connection was aborted
+  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+  EXPECT_EQ(socket->getTFOSucceeded(), socket->getTFOFinished());
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+class MockAsyncTFOSocket : public AsyncSocket {
+ public:
+  using UniquePtr = std::unique_ptr<MockAsyncTFOSocket, Destructor>;
+
+  explicit MockAsyncTFOSocket(EventBase* evb) : AsyncSocket(evb) {}
+
+  MOCK_METHOD3(tfoSendMsg, ssize_t(int fd, struct msghdr* msg, int msg_flags));
+};
+
+TEST(AsyncSocketTest, TestTFOUnsupported) {
+  TestServer server(true);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+  socket->enableTFO();
+
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30);
+  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+
+  ReadCallback rcb;
+  socket->setReadCB(&rcb);
+
+  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+      .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
+  WriteCallback write;
+  auto sendBuf = IOBuf::copyBuffer("hey");
+  socket->writeChain(&write, sendBuf->clone());
+  EXPECT_EQ(STATE_WAITING, write.state);
+
+  std::array<uint8_t, 128> buf;
+  memset(buf.data(), 'a', buf.size());
+
+  std::array<uint8_t, 3> readBuf;
+
+  std::thread t([&] {
+    std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+    acceptedSocket->write(buf.data(), buf.size());
+    acceptedSocket->flush();
+    acceptedSocket->readAll(readBuf.data(), readBuf.size());
+    acceptedSocket->close();
+  });
+
+  evb.loop();
+
+  t.join();
+  EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+  EXPECT_EQ(STATE_SUCCEEDED, write.state);
+
+  EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+  EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+  ASSERT_EQ(1, rcb.buffers.size());
+  ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
+  EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+}
+
+TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) {
+  // Try connecting to server that won't respond.
+  //
+  // This depends somewhat on the network where this test is run.
+  // Hopefully this IP will be routable but unresponsive.
+  // (Alternatively, we could try listening on a local raw socket, but that
+  // normally requires root privileges.)
+  auto host = SocketAddressTestHelper::isIPv6Enabled()
+      ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
+      : SocketAddressTestHelper::isIPv4Enabled()
+          ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
+          : nullptr;
+  SocketAddress addr(host, 65535);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+  socket->enableTFO();
+
+  ConnCallback ccb;
+  // Set a very small timeout
+  socket->connect(&ccb, addr, 1);
+  EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+
+  ReadCallback rcb;
+  socket->setReadCB(&rcb);
+
+  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+      .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
+  WriteCallback write;
+  socket->writeChain(&write, IOBuf::copyBuffer("hey"));
+
+  evb.loop();
+
+  EXPECT_EQ(STATE_FAILED, write.state);
+}
+
+TEST(AsyncSocketTest, TestTFOFallbackToConnect) {
+  TestServer server(true);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+  socket->enableTFO();
+
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30);
+  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+
+  ReadCallback rcb;
+  socket->setReadCB(&rcb);
+
+  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+      .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
+        sockaddr_storage addr;
+        auto len = server.getAddress().getAddress(&addr);
+        return connect(fd, (const struct sockaddr*)&addr, len);
+      }));
+  WriteCallback write;
+  auto sendBuf = IOBuf::copyBuffer("hey");
+  socket->writeChain(&write, sendBuf->clone());
+  EXPECT_EQ(STATE_WAITING, write.state);
+
+  std::array<uint8_t, 128> buf;
+  memset(buf.data(), 'a', buf.size());
+
+  std::array<uint8_t, 3> readBuf;
+
+  std::thread t([&] {
+    std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+    acceptedSocket->write(buf.data(), buf.size());
+    acceptedSocket->flush();
+    acceptedSocket->readAll(readBuf.data(), readBuf.size());
+    acceptedSocket->close();
+  });
+
+  evb.loop();
+
+  t.join();
+  EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+
+  EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+  EXPECT_EQ(STATE_SUCCEEDED, write.state);
+
+  EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+  ASSERT_EQ(1, rcb.buffers.size());
+  ASSERT_EQ(buf.size(), rcb.buffers[0].length);
+  EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+}
+
+TEST(AsyncSocketTest, TestTFOFallbackTimeout) {
+  // Try connecting to server that won't respond.
+  //
+  // This depends somewhat on the network where this test is run.
+  // Hopefully this IP will be routable but unresponsive.
+  // (Alternatively, we could try listening on a local raw socket, but that
+  // normally requires root privileges.)
+  auto host = SocketAddressTestHelper::isIPv6Enabled()
+      ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
+      : SocketAddressTestHelper::isIPv4Enabled()
+          ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
+          : nullptr;
+  SocketAddress addr(host, 65535);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+  socket->enableTFO();
+
+  ConnCallback ccb;
+  // Set a very small timeout
+  socket->connect(&ccb, addr, 1);
+  EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+
+  ReadCallback rcb;
+  socket->setReadCB(&rcb);
+
+  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+      .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
+        sockaddr_storage addr2;
+        auto len = addr.getAddress(&addr2);
+        return connect(fd, (const struct sockaddr*)&addr2, len);
+      }));
+  WriteCallback write;
+  socket->writeChain(&write, IOBuf::copyBuffer("hey"));
+
+  evb.loop();
+
+  EXPECT_EQ(STATE_FAILED, write.state);
+}
+
+TEST(AsyncSocketTest, TestTFOEagain) {
+  TestServer server(true);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+  socket->enableTFO();
+
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30);
+
+  EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+      .WillOnce(SetErrnoAndReturn(EAGAIN, -1));
+  WriteCallback write;
+  socket->writeChain(&write, IOBuf::copyBuffer("hey"));
+
+  evb.loop();
+
+  EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+  EXPECT_EQ(STATE_FAILED, write.state);
+}
+
+// Sending a large amount of data in the first write which will
+// definitely not fit into MSS.
+TEST(AsyncSocketTest, ConnectTFOWithBigData) {
+  // Start listening on a local port
+  TestServer server(true);
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->enableTFO();
+  ConnCallback cb;
+  socket->connect(&cb, server.getAddress(), 30);
+
+  std::array<uint8_t, 128> buf;
+  memset(buf.data(), 'a', buf.size());
+
+  constexpr size_t len = 10 * 1024;
+  auto sendBuf = IOBuf::create(len);
+  sendBuf->append(len);
+  std::array<uint8_t, len> readBuf;
+
+  std::thread t([&] {
+    auto acceptedSocket = server.accept();
+    acceptedSocket->write(buf.data(), buf.size());
+    acceptedSocket->flush();
+    acceptedSocket->readAll(readBuf.data(), readBuf.size());
+    acceptedSocket->close();
+  });
+
+  evb.loop();
+
+  CHECK_EQ(cb.state, STATE_SUCCEEDED);
+  EXPECT_LE(0, socket->getConnectTime().count());
+  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+  EXPECT_TRUE(socket->getTFOAttempted());
+
+  // Should trigger the connect
+  WriteCallback write;
+  ReadCallback rcb;
+  socket->writeChain(&write, sendBuf->clone());
+  socket->setReadCB(&rcb);
+  evb.loop();
+
+  t.join();
+
+  EXPECT_EQ(socket->getTFOSucceeded(), socket->getTFOFinished());
+  EXPECT_EQ(STATE_SUCCEEDED, write.state);
+  EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+  EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+  ASSERT_EQ(1, rcb.buffers.size());
+  ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
+  EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+}
+
+#endif