+ ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
+}
+
+TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
+ EventBase eventBase;
+ TestConnectionEventCallback connectionEventCallback;
+
+ // Create a server socket
+ std::shared_ptr<AsyncServerSocket> serverSocket(
+ AsyncServerSocket::newSocket(&eventBase));
+ serverSocket->setConnectionEventCallback(&connectionEventCallback);
+ serverSocket->bind(0);
+ serverSocket->listen(16);
+ folly::SocketAddress serverAddress;
+ serverSocket->getAddress(&serverAddress);
+
+ // Add a callback to accept one connection then stop the loop
+ TestAcceptCallback acceptCallback;
+ acceptCallback.setConnectionAcceptedFn(
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+ });
+ acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+ });
+ serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
+ serverSocket->startAccepting();
+
+ // Connect to the server socket
+ std::shared_ptr<AsyncSocket> socket(
+ AsyncSocket::newSocket(&eventBase, serverAddress));
+
+ eventBase.loop();
+
+ // Validate the connection event counters
+ ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
+ ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
+ ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
+ ASSERT_EQ(
+ connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
+ ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
+ ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
+ ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
+ ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
+}
+
+TEST(AsyncSocketTest, CallbackInPrimaryEventBase) {
+ EventBase eventBase;
+ TestConnectionEventCallback connectionEventCallback;
+
+ // Create a server socket
+ std::shared_ptr<AsyncServerSocket> serverSocket(
+ AsyncServerSocket::newSocket(&eventBase));
+ serverSocket->setConnectionEventCallback(&connectionEventCallback);
+ serverSocket->bind(0);
+ serverSocket->listen(16);
+ folly::SocketAddress serverAddress;
+ serverSocket->getAddress(&serverAddress);
+
+ // Add a callback to accept one connection then stop the loop
+ TestAcceptCallback acceptCallback;
+ acceptCallback.setConnectionAcceptedFn(
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+ });
+ acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+ });
+ bool acceptStartedFlag{false};
+ acceptCallback.setAcceptStartedFn([&acceptStartedFlag](){
+ acceptStartedFlag = true;
+ });
+ bool acceptStoppedFlag{false};
+ acceptCallback.setAcceptStoppedFn([&acceptStoppedFlag](){
+ acceptStoppedFlag = true;
+ });
+ serverSocket->addAcceptCallback(&acceptCallback, nullptr);
+ serverSocket->startAccepting();
+
+ // Connect to the server socket
+ std::shared_ptr<AsyncSocket> socket(
+ AsyncSocket::newSocket(&eventBase, serverAddress));
+
+ eventBase.loop();
+
+ ASSERT_TRUE(acceptStartedFlag);
+ ASSERT_TRUE(acceptStoppedFlag);
+ // Validate the connection event counters
+ ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
+ ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
+ ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
+ ASSERT_EQ(
+ connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 0);
+ ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 0);
+ ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
+ ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
+ ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
+}
+
+
+
+/**
+ * Test AsyncServerSocket::getNumPendingMessagesInQueue()
+ */
+TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
+ EventBase eventBase;
+
+ // Counter of how many connections have been accepted
+ int count = 0;
+
+ // Create a server socket
+ auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
+ serverSocket->bind(0);
+ serverSocket->listen(16);
+ folly::SocketAddress serverAddress;
+ serverSocket->getAddress(&serverAddress);
+
+ // Add a callback to accept connections
+ TestAcceptCallback acceptCallback;
+ acceptCallback.setConnectionAcceptedFn(
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ count++;
+ ASSERT_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
+
+ if (count == 4) {
+ // all messages are processed, remove accept callback
+ serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
+ }
+ });
+ acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
+ });
+ serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
+ serverSocket->startAccepting();
+
+ // Connect to the server socket, 4 clients, there are 4 connections
+ auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
+ auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
+ auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
+ auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
+
+ eventBase.loop();
+}
+
+/**
+ * Test AsyncTransport::BufferCallback
+ */
+TEST(AsyncSocketTest, BufferTest) {
+ TestServer server;
+
+ EventBase evb;
+ AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30, option);
+
+ char buf[100 * 1024];
+ memset(buf, 'c', sizeof(buf));
+ WriteCallback wcb;
+ BufferCallback bcb;
+ socket->setBufferCallback(&bcb);
+ socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
+
+ evb.loop();
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+
+ ASSERT_TRUE(bcb.hasBuffered());
+ ASSERT_TRUE(bcb.hasBufferCleared());
+
+ socket->close();
+ server.verifyConnection(buf, sizeof(buf));
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+TEST(AsyncSocketTest, BufferCallbackKill) {
+ TestServer server;
+ EventBase evb;
+ AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30, option);
+ evb.loopOnce();
+
+ char buf[100 * 1024];
+ memset(buf, 'c', sizeof(buf));
+ BufferCallback bcb;
+ socket->setBufferCallback(&bcb);
+ WriteCallback wcb;
+ wcb.successCallback = [&] {
+ ASSERT_TRUE(socket.unique());
+ socket.reset();
+ };
+
+ // This will trigger AsyncSocket::handleWrite,
+ // which calls WriteCallback::writeSuccess,
+ // which calls wcb.successCallback above,
+ // which tries to delete socket
+ // Then, the socket will also try to use this BufferCallback
+ // And that should crash us, if there is no DestructorGuard on the stack
+ socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
+
+ evb.loop();
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+}
+
+#if FOLLY_ALLOW_TFO
+TEST(AsyncSocketTest, ConnectTFO) {
+ // Start listening on a local port
+ TestServer server(true);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->enableTFO();
+ ConnCallback cb;
+ socket->connect(&cb, server.getAddress(), 30);
+
+ std::array<uint8_t, 128> buf;
+ memset(buf.data(), 'a', buf.size());
+
+ std::array<uint8_t, 3> readBuf;
+ auto sendBuf = IOBuf::copyBuffer("hey");
+
+ std::thread t([&] {
+ auto acceptedSocket = server.accept();
+ acceptedSocket->write(buf.data(), buf.size());
+ acceptedSocket->flush();
+ acceptedSocket->readAll(readBuf.data(), readBuf.size());
+ acceptedSocket->close();
+ });
+
+ evb.loop();
+
+ ASSERT_EQ(cb.state, STATE_SUCCEEDED);
+ EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+ EXPECT_TRUE(socket->getTFOAttempted());
+
+ // Should trigger the connect
+ WriteCallback write;
+ ReadCallback rcb;
+ socket->writeChain(&write, sendBuf->clone());
+ socket->setReadCB(&rcb);
+ evb.loop();
+
+ t.join();
+
+ EXPECT_EQ(STATE_SUCCEEDED, write.state);
+ EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+ EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+ ASSERT_EQ(1, rcb.buffers.size());
+ ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
+ EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+ EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
+}
+
+TEST(AsyncSocketTest, ConnectTFOSupplyEarlyReadCB) {
+ // Start listening on a local port
+ TestServer server(true);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->enableTFO();
+ ConnCallback cb;
+ socket->connect(&cb, server.getAddress(), 30);
+ ReadCallback rcb;
+ socket->setReadCB(&rcb);
+
+ std::array<uint8_t, 128> buf;
+ memset(buf.data(), 'a', buf.size());
+
+ std::array<uint8_t, 3> readBuf;
+ auto sendBuf = IOBuf::copyBuffer("hey");
+
+ std::thread t([&] {
+ auto acceptedSocket = server.accept();
+ acceptedSocket->write(buf.data(), buf.size());
+ acceptedSocket->flush();
+ acceptedSocket->readAll(readBuf.data(), readBuf.size());
+ acceptedSocket->close();
+ });
+
+ evb.loop();
+
+ ASSERT_EQ(cb.state, STATE_SUCCEEDED);
+ EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+ EXPECT_TRUE(socket->getTFOAttempted());
+
+ // Should trigger the connect
+ WriteCallback write;
+ socket->writeChain(&write, sendBuf->clone());
+ evb.loop();
+
+ t.join();
+
+ EXPECT_EQ(STATE_SUCCEEDED, write.state);
+ EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+ EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+ ASSERT_EQ(1, rcb.buffers.size());
+ ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
+ EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+ EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
+}
+
+/**
+ * Test connecting to a server that isn't listening
+ */
+TEST(AsyncSocketTest, ConnectRefusedImmediatelyTFO) {
+ EventBase evb;
+
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+ socket->enableTFO();
+
+ // Hopefully nothing is actually listening on this address
+ folly::SocketAddress addr("::1", 65535);
+ ConnCallback cb;
+ socket->connect(&cb, addr, 30);
+
+ evb.loop();
+
+ WriteCallback write1;
+ // Trigger the connect if TFO attempt is supported.
+ socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
+ WriteCallback write2;
+ socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
+ evb.loop();
+
+ if (!socket->getTFOFinished()) {
+ EXPECT_EQ(STATE_FAILED, write1.state);
+ } else {
+ EXPECT_EQ(STATE_SUCCEEDED, write1.state);
+ EXPECT_FALSE(socket->getTFOSucceded());
+ }
+
+ EXPECT_EQ(STATE_FAILED, write2.state);
+
+ EXPECT_EQ(STATE_SUCCEEDED, cb.state);
+ EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
+ EXPECT_TRUE(socket->getTFOAttempted());
+}
+
+/**
+ * Test calling closeNow() immediately after connecting.
+ */
+TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) {
+ TestServer server(true);
+
+ // connect()
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->enableTFO();
+
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30);
+
+ // write()
+ std::array<char, 128> buf;
+ memset(buf.data(), 'a', buf.size());
+
+ // close()
+ socket->closeNow();
+
+ // Loop, although there shouldn't be anything to do.
+ evb.loop();
+
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+/**
+ * Test calling close() immediately after connect()
+ */
+TEST(AsyncSocketTest, ConnectAndCloseTFO) {
+ TestServer server(true);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->enableTFO();
+
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30);
+
+ socket->close();
+
+ // Loop, although there shouldn't be anything to do.
+ evb.loop();
+
+ // Make sure the connection was aborted
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());