/*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#include <folly/ExceptionWrapper.h>
+#include <folly/RWSpinLock.h>
+#include <folly/Random.h>
+#include <folly/SocketAddress.h>
#include <folly/io/async/AsyncServerSocket.h>
#include <folly/io/async/AsyncSocket.h>
#include <folly/io/async/AsyncTimeout.h>
#include <folly/io/async/EventBase.h>
-#include <folly/SocketAddress.h>
#include <folly/io/IOBuf.h>
#include <folly/io/async/test/AsyncSocketTest.h>
#include <folly/io/async/test/Util.h>
+#include <folly/portability/Sockets.h>
+#include <folly/portability/Unistd.h>
+#include <folly/test/SocketAddressTestHelper.h>
-#include <gtest/gtest.h>
#include <boost/scoped_array.hpp>
-#include <iostream>
-#include <unistd.h>
#include <fcntl.h>
-#include <poll.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/tcp.h>
+#include <iostream>
#include <thread>
using namespace boost;
using boost::scoped_array;
using namespace folly;
+using namespace testing;
class DelayedWrite: public AsyncTimeout {
public:
evb.loop();
CHECK_EQ(cb.state, STATE_SUCCEEDED);
+ EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+}
+
+enum class TFOState {
+ DISABLED,
+ ENABLED,
+};
+
+class AsyncSocketConnectTest : public ::testing::TestWithParam<TFOState> {};
+
+std::vector<TFOState> getTestingValues() {
+ std::vector<TFOState> vals;
+ vals.emplace_back(TFOState::DISABLED);
+
+#if FOLLY_ALLOW_TFO
+ vals.emplace_back(TFOState::ENABLED);
+#endif
+ return vals;
}
+INSTANTIATE_TEST_CASE_P(
+ ConnectTests,
+ AsyncSocketConnectTest,
+ ::testing::ValuesIn(getTestingValues()));
+
/**
* Test connecting to a server that isn't listening
*/
evb.loop();
- CHECK_EQ(cb.state, STATE_FAILED);
- CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
+ EXPECT_EQ(STATE_FAILED, cb.state);
+ EXPECT_EQ(AsyncSocketException::NOT_OPEN, cb.exception.getType());
+ EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
}
/**
// Hopefully this IP will be routable but unresponsive.
// (Alternatively, we could try listening on a local raw socket, but that
// normally requires root privileges.)
- folly::SocketAddress addr("8.8.8.8", 65535);
+ auto host =
+ SocketAddressTestHelper::isIPv6Enabled() ?
+ SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
+ SocketAddressTestHelper::isIPv4Enabled() ?
+ SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
+ nullptr;
+ SocketAddress addr(host, 65535);
ConnCallback cb;
socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
folly::SocketAddress peer;
socket->getPeerAddress(&peer);
CHECK_EQ(peer, addr);
+ EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
}
/**
* Test writing immediately after connecting, without waiting for connect
* to finish.
*/
-TEST(AsyncSocketTest, ConnectAndWrite) {
+TEST_P(AsyncSocketConnectTest, ConnectAndWrite) {
TestServer server;
// connect()
EventBase evb;
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+ if (GetParam() == TFOState::ENABLED) {
+ socket->enableTFO();
+ }
+
ConnCallback ccb;
socket->connect(&ccb, server.getAddress(), 30);
// Make sure the server got a connection and received the data
socket->close();
server.verifyConnection(buf, sizeof(buf));
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
+ EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
}
/**
* Test connecting using a nullptr connect callback.
*/
-TEST(AsyncSocketTest, ConnectNullCallback) {
+TEST_P(AsyncSocketConnectTest, ConnectNullCallback) {
TestServer server;
// connect()
EventBase evb;
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ if (GetParam() == TFOState::ENABLED) {
+ socket->enableTFO();
+ }
+
socket->connect(nullptr, server.getAddress(), 30);
// write some data, just so we have some way of verifing
// Make sure the server got a connection and received the data
socket->close();
server.verifyConnection(buf, sizeof(buf));
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
}
/**
*
* This exercises the STATE_CONNECTING_CLOSING code.
*/
-TEST(AsyncSocketTest, ConnectWriteAndClose) {
+TEST_P(AsyncSocketConnectTest, ConnectWriteAndClose) {
TestServer server;
// connect()
EventBase evb;
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ if (GetParam() == TFOState::ENABLED) {
+ socket->enableTFO();
+ }
ConnCallback ccb;
socket->connect(&ccb, server.getAddress(), 30);
// Make sure the server got a connection and received the data
server.verifyConnection(buf, sizeof(buf));
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
}
/**
// Make sure the connection was aborted
CHECK_EQ(ccb.state, STATE_FAILED);
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
}
/**
// Make sure the connection was aborted
CHECK_EQ(ccb.state, STATE_FAILED);
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
}
/**
CHECK_EQ(ccb.state, STATE_FAILED);
CHECK_EQ(wcb.state, STATE_FAILED);
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
}
/**
* Test installing a read callback immediately, before connect() finishes.
*/
-TEST(AsyncSocketTest, ConnectAndRead) {
+TEST_P(AsyncSocketConnectTest, ConnectAndRead) {
TestServer server;
// connect()
EventBase evb;
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ if (GetParam() == TFOState::ENABLED) {
+ socket->enableTFO();
+ }
+
ConnCallback ccb;
socket->connect(&ccb, server.getAddress(), 30);
ReadCallback rcb;
socket->setReadCB(&rcb);
+ if (GetParam() == TFOState::ENABLED) {
+ // Trigger a connection
+ socket->writeChain(nullptr, IOBuf::copyBuffer("hey"));
+ }
+
// Even though we haven't looped yet, we should be able to accept
// the connection and send data to it.
std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
evb.loop();
CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.state, STATE_SUCCEEDED);
CHECK_EQ(rcb.buffers.size(), 1);
CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
+
+ ASSERT_FALSE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
}
/**
CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
CHECK_EQ(rcb.buffers.size(), 0);
CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
}
/**
* Test both writing and installing a read callback immediately,
* before connect() finishes.
*/
-TEST(AsyncSocketTest, ConnectWriteAndRead) {
+TEST_P(AsyncSocketConnectTest, ConnectWriteAndRead) {
TestServer server;
// connect()
EventBase evb;
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ if (GetParam() == TFOState::ENABLED) {
+ socket->enableTFO();
+ }
ConnCallback ccb;
socket->connect(&ccb, server.getAddress(), 30);
CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
CHECK_EQ(bytesRead, 0);
+
+ ASSERT_FALSE(socket->isClosedBySelf());
+ ASSERT_TRUE(socket->isClosedByPeer());
}
/**
CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
CHECK_EQ(memcmp(rcb.buffers[0].buffer,
acceptedWbuf, sizeof(acceptedWbuf)), 0);
+
+ ASSERT_FALSE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
}
/**
// Fully close both sockets
acceptedSocket->close();
socket->close();
+
+ ASSERT_FALSE(socket->isClosedBySelf());
+ ASSERT_TRUE(socket->isClosedByPeer());
}
/**
// Fully close both sockets
acceptedSocket->close();
socket->close();
+
+ ASSERT_FALSE(socket->isClosedBySelf());
+ ASSERT_TRUE(socket->isClosedByPeer());
}
// Helper function for use in testConnectOptWrite()
// Make sure the server got a connection and received the data
socket->close();
server.verifyConnection(buf, sizeof(buf));
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
}
/**
CHECK_EQ(wcb.state, STATE_FAILED);
CHECK_EQ(wcb.exception.getType(),
AsyncSocketException::INTERNAL_ERROR);
+
+ ASSERT_FALSE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
}
/**
acceptedSocket->setReadCB(&rcb);
// Write a simple buffer to the socket
- size_t simpleBufLength = 5;
+ constexpr size_t simpleBufLength = 5;
char simpleBuf[simpleBufLength];
memset(simpleBuf, 'a', simpleBufLength);
WriteCallback wcb;
acceptedSocket->close();
socket->close();
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
}
TEST(AsyncSocketTest, WriteIOBufCorked) {
write2.scheduleTimeout(100);
WriteCallback wcb3;
DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
- write3.scheduleTimeout(200);
+ write3.scheduleTimeout(140);
evb.loop();
CHECK_EQ(ccb.state, STATE_SUCCEEDED);
acceptedSocket->close();
socket->close();
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
}
/**
CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
rcb.verifyData(buf.get(), len1 + len2);
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
}
TEST(AsyncSocketTest, ZeroLengthWritev) {
memset(buf.get(), 'b', len2);
WriteCallback wcb;
- size_t iovCount = 4;
+ constexpr size_t iovCount = 4;
struct iovec iov[iovCount];
iov[0].iov_base = buf.get();
iov[0].iov_len = len1;
CHECK_EQ(wcb.state, STATE_SUCCEEDED);
rcb.verifyData(buf.get(), len1 + len2);
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
}
///////////////////////////////////////////////////////////////////////////
++it) {
CHECK_EQ((*it)->state, STATE_FAILED);
}
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
}
///////////////////////////////////////////////////////////////////////////
bool immediateReadCalled = false;
explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
protected:
- virtual void checkForImmediateRead() noexcept override {
+ void checkForImmediateRead() noexcept override {
immediateReadCalled = true;
AsyncSocket::handleRead();
}
CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
rcb.verifyData(expectedData, expectedDataSz);
CHECK_EQ(socket.immediateReadCalled, true);
+
+ ASSERT_FALSE(socket.isClosedBySelf());
+ ASSERT_FALSE(socket.isClosedByPeer());
}
TEST(AsyncSocket, ConnectReadUninstallRead) {
* was reset in dataAvailableCallback */
CHECK_EQ(rcb.dataRead(), maxBufferSz);
CHECK_EQ(socket.immediateReadCalled, false);
+
+ ASSERT_FALSE(socket.isClosedBySelf());
+ ASSERT_FALSE(socket.isClosedByPeer());
}
// TODO:
///////////////////////////////////////////////////////////////////////////
// AsyncServerSocket tests
///////////////////////////////////////////////////////////////////////////
+namespace {
+/**
+ * Helper ConnectionEventCallback class for the test code.
+ * It maintains counters protected by a spin lock.
+ */
+class TestConnectionEventCallback :
+ public AsyncServerSocket::ConnectionEventCallback {
+ public:
+ virtual void onConnectionAccepted(
+ const int /* socket */,
+ const SocketAddress& /* addr */) noexcept override {
+ folly::RWSpinLock::WriteHolder holder(spinLock_);
+ connectionAccepted_++;
+ }
+
+ virtual void onConnectionAcceptError(const int /* err */) noexcept override {
+ folly::RWSpinLock::WriteHolder holder(spinLock_);
+ connectionAcceptedError_++;
+ }
+
+ virtual void onConnectionDropped(
+ const int /* socket */,
+ const SocketAddress& /* addr */) noexcept override {
+ folly::RWSpinLock::WriteHolder holder(spinLock_);
+ connectionDropped_++;
+ }
+
+ virtual void onConnectionEnqueuedForAcceptorCallback(
+ const int /* socket */,
+ const SocketAddress& /* addr */) noexcept override {
+ folly::RWSpinLock::WriteHolder holder(spinLock_);
+ connectionEnqueuedForAcceptCallback_++;
+ }
+
+ virtual void onConnectionDequeuedByAcceptorCallback(
+ const int /* socket */,
+ const SocketAddress& /* addr */) noexcept override {
+ folly::RWSpinLock::WriteHolder holder(spinLock_);
+ connectionDequeuedByAcceptCallback_++;
+ }
+
+ virtual void onBackoffStarted() noexcept override {
+ folly::RWSpinLock::WriteHolder holder(spinLock_);
+ backoffStarted_++;
+ }
+
+ virtual void onBackoffEnded() noexcept override {
+ folly::RWSpinLock::WriteHolder holder(spinLock_);
+ backoffEnded_++;
+ }
+
+ virtual void onBackoffError() noexcept override {
+ folly::RWSpinLock::WriteHolder holder(spinLock_);
+ backoffError_++;
+ }
+
+ unsigned int getConnectionAccepted() const {
+ folly::RWSpinLock::ReadHolder holder(spinLock_);
+ return connectionAccepted_;
+ }
+
+ unsigned int getConnectionAcceptedError() const {
+ folly::RWSpinLock::ReadHolder holder(spinLock_);
+ return connectionAcceptedError_;
+ }
+
+ unsigned int getConnectionDropped() const {
+ folly::RWSpinLock::ReadHolder holder(spinLock_);
+ return connectionDropped_;
+ }
+
+ unsigned int getConnectionEnqueuedForAcceptCallback() const {
+ folly::RWSpinLock::ReadHolder holder(spinLock_);
+ return connectionEnqueuedForAcceptCallback_;
+ }
+
+ unsigned int getConnectionDequeuedByAcceptCallback() const {
+ folly::RWSpinLock::ReadHolder holder(spinLock_);
+ return connectionDequeuedByAcceptCallback_;
+ }
+
+ unsigned int getBackoffStarted() const {
+ folly::RWSpinLock::ReadHolder holder(spinLock_);
+ return backoffStarted_;
+ }
+
+ unsigned int getBackoffEnded() const {
+ folly::RWSpinLock::ReadHolder holder(spinLock_);
+ return backoffEnded_;
+ }
+
+ unsigned int getBackoffError() const {
+ folly::RWSpinLock::ReadHolder holder(spinLock_);
+ return backoffError_;
+ }
+
+ private:
+ mutable folly::RWSpinLock spinLock_;
+ unsigned int connectionAccepted_{0};
+ unsigned int connectionAcceptedError_{0};
+ unsigned int connectionDropped_{0};
+ unsigned int connectionEnqueuedForAcceptCallback_{0};
+ unsigned int connectionDequeuedByAcceptCallback_{0};
+ unsigned int backoffStarted_{0};
+ unsigned int backoffEnded_{0};
+ unsigned int backoffError_{0};
+};
/**
* Helper AcceptCallback class for the test code
acceptStoppedFn_ = fn;
}
- void connectionAccepted(int fd, const folly::SocketAddress& clientAddr)
- noexcept {
- events_.push_back(EventInfo(fd, clientAddr));
+ void connectionAccepted(
+ int fd, const folly::SocketAddress& clientAddr) noexcept override {
+ events_.emplace_back(fd, clientAddr);
if (connectionAcceptedFn_) {
connectionAcceptedFn_(fd, clientAddr);
}
}
- void acceptError(const std::exception& ex) noexcept {
- events_.push_back(EventInfo(ex.what()));
+ void acceptError(const std::exception& ex) noexcept override {
+ events_.emplace_back(ex.what());
if (acceptErrorFn_) {
acceptErrorFn_(ex);
}
}
- void acceptStarted() noexcept {
- events_.push_back(EventInfo(TYPE_START));
+ void acceptStarted() noexcept override {
+ events_.emplace_back(TYPE_START);
if (acceptStartedFn_) {
acceptStartedFn_();
}
}
- void acceptStopped() noexcept {
- events_.push_back(EventInfo(TYPE_STOP));
+ void acceptStopped() noexcept override {
+ events_.emplace_back(TYPE_STOP);
if (acceptStoppedFn_) {
acceptStoppedFn_();
std::deque<EventInfo> events_;
};
+}
/**
* Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
// Add a callback to accept one connection then stop the loop
TestAcceptCallback acceptCallback;
acceptCallback.setConnectionAcceptedFn(
- [&](int fd, const folly::SocketAddress& addr) {
- serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
- });
- acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+ });
+ acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
});
serverSocket->addAcceptCallback(&acceptCallback, nullptr);
// Have callback 2 remove callback 3 and callback 5 the first time it is
// called.
int cb2Count = 0;
- cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
- std::shared_ptr<AsyncSocket> sock2(
+ cb1.setConnectionAcceptedFn([&](int /* fd */,
+ const folly::SocketAddress& /* addr */) {
+ std::shared_ptr<AsyncSocket> sock2(
AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
+ });
+ cb3.setConnectionAcceptedFn(
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
+ cb4.setConnectionAcceptedFn(
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ std::shared_ptr<AsyncSocket> sock3(
+ AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
+ });
+ cb5.setConnectionAcceptedFn(
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ std::shared_ptr<AsyncSocket> sock5(
+ AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
+
});
- cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
- });
- cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
- std::shared_ptr<AsyncSocket> sock3(
- AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
- });
- cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
- std::shared_ptr<AsyncSocket> sock5(
- AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
-
- });
cb2.setConnectionAcceptedFn(
- [&](int fd, const folly::SocketAddress& addr) {
- if (cb2Count == 0) {
- serverSocket->removeAcceptCallback(&cb3, nullptr);
- serverSocket->removeAcceptCallback(&cb5, nullptr);
- }
- ++cb2Count;
- });
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ if (cb2Count == 0) {
+ serverSocket->removeAcceptCallback(&cb3, nullptr);
+ serverSocket->removeAcceptCallback(&cb5, nullptr);
+ }
+ ++cb2Count;
+ });
// Have callback 6 remove callback 4 the first time it is called,
// and destroy the server socket the second time it is called
int cb6Count = 0;
cb6.setConnectionAcceptedFn(
- [&](int fd, const folly::SocketAddress& addr) {
- if (cb6Count == 0) {
- serverSocket->removeAcceptCallback(&cb4, nullptr);
- std::shared_ptr<AsyncSocket> sock6(
- AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
- std::shared_ptr<AsyncSocket> sock7(
- AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
- std::shared_ptr<AsyncSocket> sock8(
- AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
-
- } else {
- serverSocket.reset();
- }
- ++cb6Count;
- });
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ if (cb6Count == 0) {
+ serverSocket->removeAcceptCallback(&cb4, nullptr);
+ std::shared_ptr<AsyncSocket> sock6(
+ AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
+ std::shared_ptr<AsyncSocket> sock7(
+ AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
+ std::shared_ptr<AsyncSocket> sock8(
+ AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
+
+ } else {
+ serverSocket.reset();
+ }
+ ++cb6Count;
+ });
// Have callback 7 remove itself
cb7.setConnectionAcceptedFn(
- [&](int fd, const folly::SocketAddress& addr) {
- serverSocket->removeAcceptCallback(&cb7, nullptr);
- });
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ serverSocket->removeAcceptCallback(&cb7, nullptr);
+ });
serverSocket->addAcceptCallback(&cb1, nullptr);
serverSocket->addAcceptCallback(&cb2, nullptr);
// Add several accept callbacks
TestAcceptCallback cb1;
- auto thread_id = pthread_self();
+ auto thread_id = std::this_thread::get_id();
cb1.setAcceptStartedFn([&](){
- CHECK_NE(thread_id, pthread_self());
- thread_id = pthread_self();
- });
- cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
- CHECK_EQ(thread_id, pthread_self());
- serverSocket->removeAcceptCallback(&cb1, nullptr);
+ CHECK_NE(thread_id, std::this_thread::get_id());
+ thread_id = std::this_thread::get_id();
});
+ cb1.setConnectionAcceptedFn(
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ CHECK_EQ(thread_id, std::this_thread::get_id());
+ serverSocket->removeAcceptCallback(&cb1, nullptr);
+ });
cb1.setAcceptStoppedFn([&](){
- CHECK_EQ(thread_id, pthread_self());
+ CHECK_EQ(thread_id, std::this_thread::get_id());
});
// Test having callbacks remove other callbacks before them on the list,
// Add a callback to accept one connection then stop accepting
TestAcceptCallback acceptCallback;
acceptCallback.setConnectionAcceptedFn(
- [&](int fd, const folly::SocketAddress& addr) {
- serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
- });
- acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+ });
+ acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
});
serverSocket->addAcceptCallback(&acceptCallback, nullptr);
std::shared_ptr<AsyncServerSocket> serverSocket(
AsyncServerSocket::newSocket(&eventBase));
string path(1, 0);
- path.append("/anonymous");
+ path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
folly::SocketAddress serverAddress;
serverAddress.setFromPath(path);
serverSocket->bind(serverAddress);
// Add a callback to accept one connection then stop the loop
TestAcceptCallback acceptCallback;
acceptCallback.setConnectionAcceptedFn(
- [&](int fd, const folly::SocketAddress& addr) {
- serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
- });
- acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+ });
+ acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
});
serverSocket->addAcceptCallback(&acceptCallback, nullptr);
int flags = fcntl(fd, F_GETFL, 0);
CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
}
+
+TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
+ EventBase eventBase;
+ TestConnectionEventCallback connectionEventCallback;
+
+ // Create a server socket
+ std::shared_ptr<AsyncServerSocket> serverSocket(
+ AsyncServerSocket::newSocket(&eventBase));
+ serverSocket->setConnectionEventCallback(&connectionEventCallback);
+ serverSocket->bind(0);
+ serverSocket->listen(16);
+ folly::SocketAddress serverAddress;
+ serverSocket->getAddress(&serverAddress);
+
+ // Add a callback to accept one connection then stop the loop
+ TestAcceptCallback acceptCallback;
+ acceptCallback.setConnectionAcceptedFn(
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+ });
+ acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+ });
+ serverSocket->addAcceptCallback(&acceptCallback, nullptr);
+ serverSocket->startAccepting();
+
+ // Connect to the server socket
+ std::shared_ptr<AsyncSocket> socket(
+ AsyncSocket::newSocket(&eventBase, serverAddress));
+
+ eventBase.loop();
+
+ // Validate the connection event counters
+ ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
+ ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
+ ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
+ ASSERT_EQ(
+ connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
+ ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
+ ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
+ ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
+ ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
+}
+
+/**
+ * Test AsyncServerSocket::getNumPendingMessagesInQueue()
+ */
+TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
+ EventBase eventBase;
+
+ // Counter of how many connections have been accepted
+ int count = 0;
+
+ // Create a server socket
+ auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
+ serverSocket->bind(0);
+ serverSocket->listen(16);
+ folly::SocketAddress serverAddress;
+ serverSocket->getAddress(&serverAddress);
+
+ // Add a callback to accept connections
+ TestAcceptCallback acceptCallback;
+ acceptCallback.setConnectionAcceptedFn(
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ count++;
+ CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
+
+ if (count == 4) {
+ // all messages are processed, remove accept callback
+ serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
+ }
+ });
+ acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
+ });
+ serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
+ serverSocket->startAccepting();
+
+ // Connect to the server socket, 4 clients, there are 4 connections
+ auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
+ auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
+ auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
+ auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
+
+ eventBase.loop();
+}
+
+/**
+ * Test AsyncTransport::BufferCallback
+ */
+TEST(AsyncSocketTest, BufferTest) {
+ TestServer server;
+
+ EventBase evb;
+ AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30, option);
+
+ char buf[100 * 1024];
+ memset(buf, 'c', sizeof(buf));
+ WriteCallback wcb;
+ BufferCallback bcb;
+ socket->setBufferCallback(&bcb);
+ socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
+
+ evb.loop();
+ CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+ CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+
+ ASSERT_TRUE(bcb.hasBuffered());
+ ASSERT_TRUE(bcb.hasBufferCleared());
+
+ socket->close();
+ server.verifyConnection(buf, sizeof(buf));
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+TEST(AsyncSocketTest, BufferCallbackKill) {
+ TestServer server;
+ EventBase evb;
+ AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30, option);
+ evb.loopOnce();
+
+ char buf[100 * 1024];
+ memset(buf, 'c', sizeof(buf));
+ BufferCallback bcb;
+ socket->setBufferCallback(&bcb);
+ WriteCallback wcb;
+ wcb.successCallback = [&] {
+ ASSERT_TRUE(socket.unique());
+ socket.reset();
+ };
+
+ // This will trigger AsyncSocket::handleWrite,
+ // which calls WriteCallback::writeSuccess,
+ // which calls wcb.successCallback above,
+ // which tries to delete socket
+ // Then, the socket will also try to use this BufferCallback
+ // And that should crash us, if there is no DestructorGuard on the stack
+ socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
+
+ evb.loop();
+ CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+}
+
+#if FOLLY_ALLOW_TFO
+TEST(AsyncSocketTest, ConnectTFO) {
+ // Start listening on a local port
+ TestServer server(true);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->enableTFO();
+ ConnCallback cb;
+ socket->connect(&cb, server.getAddress(), 30);
+
+ std::array<uint8_t, 128> buf;
+ memset(buf.data(), 'a', buf.size());
+
+ std::array<uint8_t, 3> readBuf;
+ auto sendBuf = IOBuf::copyBuffer("hey");
+
+ std::thread t([&] {
+ auto acceptedSocket = server.accept();
+ acceptedSocket->write(buf.data(), buf.size());
+ acceptedSocket->flush();
+ acceptedSocket->readAll(readBuf.data(), readBuf.size());
+ acceptedSocket->close();
+ });
+
+ evb.loop();
+
+ CHECK_EQ(cb.state, STATE_SUCCEEDED);
+ EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+ EXPECT_TRUE(socket->getTFOAttempted());
+
+ // Should trigger the connect
+ WriteCallback write;
+ ReadCallback rcb;
+ socket->writeChain(&write, sendBuf->clone());
+ socket->setReadCB(&rcb);
+ evb.loop();
+
+ t.join();
+
+ EXPECT_EQ(STATE_SUCCEEDED, write.state);
+ EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+ EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+ ASSERT_EQ(1, rcb.buffers.size());
+ ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
+ EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+ EXPECT_EQ(socket->getTFOSucceeded(), socket->getTFOFinished());
+}
+
+/**
+ * Test connecting to a server that isn't listening
+ */
+TEST(AsyncSocketTest, ConnectRefusedTFO) {
+ EventBase evb;
+
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+ socket->enableTFO();
+
+ // Hopefully nothing is actually listening on this address
+ folly::SocketAddress addr("::1", 65535);
+ ConnCallback cb;
+ socket->connect(&cb, addr, 30);
+
+ evb.loop();
+
+ WriteCallback write1;
+ // Trigger the connect if TFO attempt is supported.
+ socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
+ evb.loop();
+ WriteCallback write2;
+ socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
+ evb.loop();
+
+ if (!socket->getTFOFinished()) {
+ EXPECT_EQ(STATE_FAILED, write1.state);
+ EXPECT_FALSE(socket->getTFOFinished());
+ } else {
+ EXPECT_EQ(STATE_SUCCEEDED, write1.state);
+ EXPECT_TRUE(socket->getTFOFinished());
+ }
+
+ EXPECT_EQ(STATE_FAILED, write2.state);
+
+ EXPECT_EQ(STATE_SUCCEEDED, cb.state);
+ EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
+ EXPECT_TRUE(socket->getTFOAttempted());
+}
+
+/**
+ * Test calling closeNow() immediately after connecting.
+ */
+TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) {
+ TestServer server(true);
+
+ // connect()
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->enableTFO();
+
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30);
+
+ // write()
+ std::array<char, 128> buf;
+ memset(buf.data(), 'a', buf.size());
+
+ // close()
+ socket->closeNow();
+
+ // Loop, although there shouldn't be anything to do.
+ evb.loop();
+
+ EXPECT_EQ(socket->getTFOSucceeded(), socket->getTFOFinished());
+ CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+/**
+ * Test calling close() immediately after connect()
+ */
+TEST(AsyncSocketTest, ConnectAndCloseTFO) {
+ TestServer server(true);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->enableTFO();
+
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30);
+
+ socket->close();
+
+ // Loop, although there shouldn't be anything to do.
+ evb.loop();
+
+ // Make sure the connection was aborted
+ CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+ EXPECT_EQ(socket->getTFOSucceeded(), socket->getTFOFinished());
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+class MockAsyncTFOSocket : public AsyncSocket {
+ public:
+ using UniquePtr = std::unique_ptr<MockAsyncTFOSocket, Destructor>;
+
+ explicit MockAsyncTFOSocket(EventBase* evb) : AsyncSocket(evb) {}
+
+ MOCK_METHOD3(tfoSendMsg, ssize_t(int fd, struct msghdr* msg, int msg_flags));
+};
+
+TEST(AsyncSocketTest, TestTFOUnsupported) {
+ TestServer server(true);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+ socket->enableTFO();
+
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30);
+ CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+
+ ReadCallback rcb;
+ socket->setReadCB(&rcb);
+
+ EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+ .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
+ WriteCallback write;
+ auto sendBuf = IOBuf::copyBuffer("hey");
+ socket->writeChain(&write, sendBuf->clone());
+ EXPECT_EQ(STATE_WAITING, write.state);
+
+ std::array<uint8_t, 128> buf;
+ memset(buf.data(), 'a', buf.size());
+
+ std::array<uint8_t, 3> readBuf;
+
+ std::thread t([&] {
+ std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+ acceptedSocket->write(buf.data(), buf.size());
+ acceptedSocket->flush();
+ acceptedSocket->readAll(readBuf.data(), readBuf.size());
+ acceptedSocket->close();
+ });
+
+ evb.loop();
+
+ t.join();
+ EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+ EXPECT_EQ(STATE_SUCCEEDED, write.state);
+
+ EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+ EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+ ASSERT_EQ(1, rcb.buffers.size());
+ ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
+ EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+}
+
+TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) {
+ // Try connecting to server that won't respond.
+ //
+ // This depends somewhat on the network where this test is run.
+ // Hopefully this IP will be routable but unresponsive.
+ // (Alternatively, we could try listening on a local raw socket, but that
+ // normally requires root privileges.)
+ auto host = SocketAddressTestHelper::isIPv6Enabled()
+ ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
+ : SocketAddressTestHelper::isIPv4Enabled()
+ ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
+ : nullptr;
+ SocketAddress addr(host, 65535);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+ socket->enableTFO();
+
+ ConnCallback ccb;
+ // Set a very small timeout
+ socket->connect(&ccb, addr, 1);
+ EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+
+ ReadCallback rcb;
+ socket->setReadCB(&rcb);
+
+ EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+ .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
+ WriteCallback write;
+ socket->writeChain(&write, IOBuf::copyBuffer("hey"));
+
+ evb.loop();
+
+ EXPECT_EQ(STATE_FAILED, write.state);
+}
+
+TEST(AsyncSocketTest, TestTFOFallbackToConnect) {
+ TestServer server(true);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+ socket->enableTFO();
+
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30);
+ CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+
+ ReadCallback rcb;
+ socket->setReadCB(&rcb);
+
+ EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+ .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
+ sockaddr_storage addr;
+ auto len = server.getAddress().getAddress(&addr);
+ return connect(fd, (const struct sockaddr*)&addr, len);
+ }));
+ WriteCallback write;
+ auto sendBuf = IOBuf::copyBuffer("hey");
+ socket->writeChain(&write, sendBuf->clone());
+ EXPECT_EQ(STATE_WAITING, write.state);
+
+ std::array<uint8_t, 128> buf;
+ memset(buf.data(), 'a', buf.size());
+
+ std::array<uint8_t, 3> readBuf;
+
+ std::thread t([&] {
+ std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+ acceptedSocket->write(buf.data(), buf.size());
+ acceptedSocket->flush();
+ acceptedSocket->readAll(readBuf.data(), readBuf.size());
+ acceptedSocket->close();
+ });
+
+ evb.loop();
+
+ t.join();
+ EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+
+ EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+ EXPECT_EQ(STATE_SUCCEEDED, write.state);
+
+ EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+ ASSERT_EQ(1, rcb.buffers.size());
+ ASSERT_EQ(buf.size(), rcb.buffers[0].length);
+ EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+}
+
+TEST(AsyncSocketTest, TestTFOFallbackTimeout) {
+ // Try connecting to server that won't respond.
+ //
+ // This depends somewhat on the network where this test is run.
+ // Hopefully this IP will be routable but unresponsive.
+ // (Alternatively, we could try listening on a local raw socket, but that
+ // normally requires root privileges.)
+ auto host = SocketAddressTestHelper::isIPv6Enabled()
+ ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
+ : SocketAddressTestHelper::isIPv4Enabled()
+ ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
+ : nullptr;
+ SocketAddress addr(host, 65535);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+ socket->enableTFO();
+
+ ConnCallback ccb;
+ // Set a very small timeout
+ socket->connect(&ccb, addr, 1);
+ EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+
+ ReadCallback rcb;
+ socket->setReadCB(&rcb);
+
+ EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+ .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
+ sockaddr_storage addr2;
+ auto len = addr.getAddress(&addr2);
+ return connect(fd, (const struct sockaddr*)&addr2, len);
+ }));
+ WriteCallback write;
+ socket->writeChain(&write, IOBuf::copyBuffer("hey"));
+
+ evb.loop();
+
+ EXPECT_EQ(STATE_FAILED, write.state);
+}
+
+TEST(AsyncSocketTest, TestTFOEagain) {
+ TestServer server(true);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+ socket->enableTFO();
+
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30);
+
+ EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+ .WillOnce(SetErrnoAndReturn(EAGAIN, -1));
+ WriteCallback write;
+ socket->writeChain(&write, IOBuf::copyBuffer("hey"));
+
+ evb.loop();
+
+ EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+ EXPECT_EQ(STATE_FAILED, write.state);
+}
+
+// Sending a large amount of data in the first write which will
+// definitely not fit into MSS.
+TEST(AsyncSocketTest, ConnectTFOWithBigData) {
+ // Start listening on a local port
+ TestServer server(true);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->enableTFO();
+ ConnCallback cb;
+ socket->connect(&cb, server.getAddress(), 30);
+
+ std::array<uint8_t, 128> buf;
+ memset(buf.data(), 'a', buf.size());
+
+ constexpr size_t len = 10 * 1024;
+ auto sendBuf = IOBuf::create(len);
+ sendBuf->append(len);
+ std::array<uint8_t, len> readBuf;
+
+ std::thread t([&] {
+ auto acceptedSocket = server.accept();
+ acceptedSocket->write(buf.data(), buf.size());
+ acceptedSocket->flush();
+ acceptedSocket->readAll(readBuf.data(), readBuf.size());
+ acceptedSocket->close();
+ });
+
+ evb.loop();
+
+ CHECK_EQ(cb.state, STATE_SUCCEEDED);
+ EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+ EXPECT_TRUE(socket->getTFOAttempted());
+
+ // Should trigger the connect
+ WriteCallback write;
+ ReadCallback rcb;
+ socket->writeChain(&write, sendBuf->clone());
+ socket->setReadCB(&rcb);
+ evb.loop();
+
+ t.join();
+
+ EXPECT_EQ(socket->getTFOSucceeded(), socket->getTFOFinished());
+ EXPECT_EQ(STATE_SUCCEEDED, write.state);
+ EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+ EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+ ASSERT_EQ(1, rcb.buffers.size());
+ ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
+ EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+}
+
+#endif