X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fio%2Fasync%2Ftest%2FAsyncSocketTest2.cpp;h=13f23b75d81a6be5fab18944100bb2202f5d3ed4;hp=f44d4fd52779806a9f47de97541f32e066529f8a;hb=5c74326fdc75ccdfc2152c15203625d8588096b6;hpb=16b7f86299f935fc16d38d72a8b497e2ab4a1d4f diff --git a/folly/io/async/test/AsyncSocketTest2.cpp b/folly/io/async/test/AsyncSocketTest2.cpp index f44d4fd5..13f23b75 100644 --- a/folly/io/async/test/AsyncSocketTest2.cpp +++ b/folly/io/async/test/AsyncSocketTest2.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2015 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,25 +13,29 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include +#include +#include +#include #include #include #include #include -#include +#include #include #include #include +#include +#include +#include +#include +#include -#include #include -#include -#include #include -#include #include -#include -#include +#include #include using namespace boost; @@ -46,6 +50,9 @@ using std::chrono::milliseconds; using boost::scoped_array; using namespace folly; +using namespace testing; + +namespace fsp = folly::portability::sockets; class DelayedWrite: public AsyncTimeout { public: @@ -94,9 +101,33 @@ TEST(AsyncSocketTest, Connect) { evb.loop(); - CHECK_EQ(cb.state, STATE_SUCCEEDED); + ASSERT_EQ(cb.state, STATE_SUCCEEDED); + EXPECT_LE(0, socket->getConnectTime().count()); + EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30)); +} + +enum class TFOState { + DISABLED, + ENABLED, +}; + +class AsyncSocketConnectTest : public ::testing::TestWithParam {}; + +std::vector getTestingValues() { + std::vector vals; + vals.emplace_back(TFOState::DISABLED); + +#if FOLLY_ALLOW_TFO + vals.emplace_back(TFOState::ENABLED); +#endif + return vals; } +INSTANTIATE_TEST_CASE_P( + ConnectTests, + AsyncSocketConnectTest, + ::testing::ValuesIn(getTestingValues())); + /** * Test connecting to a server that isn't listening */ @@ -112,8 +143,10 @@ TEST(AsyncSocketTest, ConnectRefused) { evb.loop(); - CHECK_EQ(cb.state, STATE_FAILED); - CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN); + EXPECT_EQ(STATE_FAILED, cb.state); + EXPECT_EQ(AsyncSocketException::NOT_OPEN, cb.exception.getType()); + EXPECT_LE(0, socket->getConnectTime().count()); + EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout()); } /** @@ -130,33 +163,46 @@ TEST(AsyncSocketTest, ConnectTimeout) { // Hopefully this IP will be routable but unresponsive. // (Alternatively, we could try listening on a local raw socket, but that // normally requires root privileges.) - folly::SocketAddress addr("8.8.8.8", 65535); + auto host = + SocketAddressTestHelper::isIPv6Enabled() ? + SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 : + SocketAddressTestHelper::isIPv4Enabled() ? + SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 : + nullptr; + SocketAddress addr(host, 65535); ConnCallback cb; socket->connect(&cb, addr, 1); // also set a ridiculously small timeout evb.loop(); - CHECK_EQ(cb.state, STATE_FAILED); - CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT); + ASSERT_EQ(cb.state, STATE_FAILED); + ASSERT_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT); // Verify that we can still get the peer address after a timeout. // Use case is if the client was created from a client pool, and we want // to log which peer failed. folly::SocketAddress peer; socket->getPeerAddress(&peer); - CHECK_EQ(peer, addr); + ASSERT_EQ(peer, addr); + EXPECT_LE(0, socket->getConnectTime().count()); + EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1)); } /** * Test writing immediately after connecting, without waiting for connect * to finish. */ -TEST(AsyncSocketTest, ConnectAndWrite) { +TEST_P(AsyncSocketConnectTest, ConnectAndWrite) { TestServer server; // connect() EventBase evb; std::shared_ptr socket = AsyncSocket::newSocket(&evb); + + if (GetParam() == TFOState::ENABLED) { + socket->enableTFO(); + } + ConnCallback ccb; socket->connect(&ccb, server.getAddress(), 30); @@ -170,23 +216,31 @@ TEST(AsyncSocketTest, ConnectAndWrite) { // The kernel should be able to buffer the write request so it can succeed. evb.loop(); - CHECK_EQ(ccb.state, STATE_SUCCEEDED); - CHECK_EQ(wcb.state, STATE_SUCCEEDED); + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb.state, STATE_SUCCEEDED); // Make sure the server got a connection and received the data socket->close(); server.verifyConnection(buf, sizeof(buf)); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); + EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30)); } /** * Test connecting using a nullptr connect callback. */ -TEST(AsyncSocketTest, ConnectNullCallback) { +TEST_P(AsyncSocketConnectTest, ConnectNullCallback) { TestServer server; // connect() EventBase evb; std::shared_ptr socket = AsyncSocket::newSocket(&evb); + if (GetParam() == TFOState::ENABLED) { + socket->enableTFO(); + } + socket->connect(nullptr, server.getAddress(), 30); // write some data, just so we have some way of verifing @@ -198,11 +252,14 @@ TEST(AsyncSocketTest, ConnectNullCallback) { evb.loop(); - CHECK_EQ(wcb.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb.state, STATE_SUCCEEDED); // Make sure the server got a connection and received the data socket->close(); server.verifyConnection(buf, sizeof(buf)); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -211,12 +268,15 @@ TEST(AsyncSocketTest, ConnectNullCallback) { * * This exercises the STATE_CONNECTING_CLOSING code. */ -TEST(AsyncSocketTest, ConnectWriteAndClose) { +TEST_P(AsyncSocketConnectTest, ConnectWriteAndClose) { TestServer server; // connect() EventBase evb; std::shared_ptr socket = AsyncSocket::newSocket(&evb); + if (GetParam() == TFOState::ENABLED) { + socket->enableTFO(); + } ConnCallback ccb; socket->connect(&ccb, server.getAddress(), 30); @@ -233,11 +293,14 @@ TEST(AsyncSocketTest, ConnectWriteAndClose) { // The kernel should be able to buffer the write request so it can succeed. evb.loop(); - CHECK_EQ(ccb.state, STATE_SUCCEEDED); - CHECK_EQ(wcb.state, STATE_SUCCEEDED); + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb.state, STATE_SUCCEEDED); // Make sure the server got a connection and received the data server.verifyConnection(buf, sizeof(buf)); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -266,7 +329,10 @@ TEST(AsyncSocketTest, ConnectAndClose) { evb.loop(); // Make sure the connection was aborted - CHECK_EQ(ccb.state, STATE_FAILED); + ASSERT_EQ(ccb.state, STATE_FAILED); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -297,7 +363,10 @@ TEST(AsyncSocketTest, ConnectAndCloseNow) { evb.loop(); // Make sure the connection was aborted - CHECK_EQ(ccb.state, STATE_FAILED); + ASSERT_EQ(ccb.state, STATE_FAILED); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -335,25 +404,37 @@ TEST(AsyncSocketTest, ConnectWriteAndCloseNow) { // Loop, although there shouldn't be anything to do. evb.loop(); - CHECK_EQ(ccb.state, STATE_FAILED); - CHECK_EQ(wcb.state, STATE_FAILED); + ASSERT_EQ(ccb.state, STATE_FAILED); + ASSERT_EQ(wcb.state, STATE_FAILED); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** * Test installing a read callback immediately, before connect() finishes. */ -TEST(AsyncSocketTest, ConnectAndRead) { +TEST_P(AsyncSocketConnectTest, ConnectAndRead) { TestServer server; // connect() EventBase evb; std::shared_ptr socket = AsyncSocket::newSocket(&evb); + if (GetParam() == TFOState::ENABLED) { + socket->enableTFO(); + } + ConnCallback ccb; socket->connect(&ccb, server.getAddress(), 30); ReadCallback rcb; socket->setReadCB(&rcb); + if (GetParam() == TFOState::ENABLED) { + // Trigger a connection + socket->writeChain(nullptr, IOBuf::copyBuffer("hey")); + } + // Even though we haven't looped yet, we should be able to accept // the connection and send data to it. std::shared_ptr acceptedSocket = server.accept(); @@ -366,11 +447,13 @@ TEST(AsyncSocketTest, ConnectAndRead) { // Loop, although there shouldn't be anything to do. evb.loop(); - CHECK_EQ(ccb.state, STATE_SUCCEEDED); - CHECK_EQ(rcb.state, STATE_SUCCEEDED); - CHECK_EQ(rcb.buffers.size(), 1); - CHECK_EQ(rcb.buffers[0].length, sizeof(buf)); - CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0); + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); + ASSERT_EQ(rcb.buffers.size(), 1); + ASSERT_EQ(rcb.buffers[0].length, sizeof(buf)); + ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0); + + ASSERT_FALSE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -403,21 +486,27 @@ TEST(AsyncSocketTest, ConnectReadAndClose) { // Loop, although there shouldn't be anything to do. evb.loop(); - CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt - CHECK_EQ(rcb.buffers.size(), 0); - CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF + ASSERT_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt + ASSERT_EQ(rcb.buffers.size(), 0); + ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** * Test both writing and installing a read callback immediately, * before connect() finishes. */ -TEST(AsyncSocketTest, ConnectWriteAndRead) { +TEST_P(AsyncSocketConnectTest, ConnectWriteAndRead) { TestServer server; // connect() EventBase evb; std::shared_ptr socket = AsyncSocket::newSocket(&evb); + if (GetParam() == TFOState::ENABLED) { + socket->enableTFO(); + } ConnCallback ccb; socket->connect(&ccb, server.getAddress(), 30); @@ -447,13 +536,13 @@ TEST(AsyncSocketTest, ConnectWriteAndRead) { evb.loop(); // Make sure the connect succeeded - CHECK_EQ(ccb.state, STATE_SUCCEEDED); + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); // Make sure the AsyncSocket read the data written by the accepted socket - CHECK_EQ(rcb.state, STATE_SUCCEEDED); - CHECK_EQ(rcb.buffers.size(), 1); - CHECK_EQ(rcb.buffers[0].length, sizeof(buf2)); - CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0); + ASSERT_EQ(rcb.state, STATE_SUCCEEDED); + ASSERT_EQ(rcb.buffers.size(), 1); + ASSERT_EQ(rcb.buffers[0].length, sizeof(buf2)); + ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0); // Close the AsyncSocket so we'll see EOF on acceptedSocket socket->close(); @@ -461,9 +550,12 @@ TEST(AsyncSocketTest, ConnectWriteAndRead) { // Make sure the accepted socket saw the data written by the AsyncSocket uint8_t readbuf[sizeof(buf1)]; acceptedSocket->readAll(readbuf, sizeof(readbuf)); - CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0); + ASSERT_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0); uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf)); - CHECK_EQ(bytesRead, 0); + ASSERT_EQ(bytesRead, 0); + + ASSERT_FALSE(socket->isClosedBySelf()); + ASSERT_TRUE(socket->isClosedByPeer()); } /** @@ -507,7 +599,7 @@ TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) { fds[0].events = POLLIN; fds[0].revents = 0; int rc = poll(fds, 1, 0); - CHECK_EQ(rc, 0); + ASSERT_EQ(rc, 0); // Write data to the accepted socket uint8_t acceptedWbuf[192]; @@ -523,16 +615,16 @@ TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) { // // Check that the connection was completed successfully and that the write // callback succeeded. - CHECK_EQ(ccb.state, STATE_SUCCEEDED); - CHECK_EQ(wcb.state, STATE_SUCCEEDED); + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb.state, STATE_SUCCEEDED); // Check that we can read the data that was written to the socket, and that // we see an EOF, since its socket was half-shutdown. uint8_t readbuf[sizeof(wbuf)]; acceptedSocket->readAll(readbuf, sizeof(readbuf)); - CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0); + ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0); uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf)); - CHECK_EQ(bytesRead, 0); + ASSERT_EQ(bytesRead, 0); // Close the accepted socket. This will cause it to see EOF // and uninstall the read callback when we loop next. @@ -544,11 +636,14 @@ TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) { evb.loop(); // This loop should have read the data and seen the EOF - CHECK_EQ(rcb.state, STATE_SUCCEEDED); - CHECK_EQ(rcb.buffers.size(), 1); - CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf)); - CHECK_EQ(memcmp(rcb.buffers[0].buffer, + ASSERT_EQ(rcb.state, STATE_SUCCEEDED); + ASSERT_EQ(rcb.buffers.size(), 1); + ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf)); + ASSERT_EQ(memcmp(rcb.buffers[0].buffer, acceptedWbuf, sizeof(acceptedWbuf)), 0); + + ASSERT_FALSE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -595,7 +690,7 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) { fds[0].events = POLLIN; fds[0].revents = 0; int rc = poll(fds, 1, 0); - CHECK_EQ(rc, 0); + ASSERT_EQ(rc, 0); // Write data to the accepted socket uint8_t acceptedWbuf[192]; @@ -604,7 +699,7 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) { acceptedSocket->flush(); // Shutdown writes to the accepted socket. This will cause it to see EOF // and uninstall the read callback. - ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR); + shutdown(acceptedSocket->getSocketFD(), SHUT_WR); // Loop evb.loop(); @@ -615,25 +710,28 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) { // // Check that the connection was completed successfully and that the read // and write callbacks were invoked as expected. - CHECK_EQ(ccb.state, STATE_SUCCEEDED); - CHECK_EQ(rcb.state, STATE_SUCCEEDED); - CHECK_EQ(rcb.buffers.size(), 1); - CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf)); - CHECK_EQ(memcmp(rcb.buffers[0].buffer, + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); + ASSERT_EQ(rcb.state, STATE_SUCCEEDED); + ASSERT_EQ(rcb.buffers.size(), 1); + ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf)); + ASSERT_EQ(memcmp(rcb.buffers[0].buffer, acceptedWbuf, sizeof(acceptedWbuf)), 0); - CHECK_EQ(wcb.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb.state, STATE_SUCCEEDED); // Check that we can read the data that was written to the socket, and that // we see an EOF, since its socket was half-shutdown. uint8_t readbuf[sizeof(wbuf)]; acceptedSocket->readAll(readbuf, sizeof(readbuf)); - CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0); + ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0); uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf)); - CHECK_EQ(bytesRead, 0); + ASSERT_EQ(bytesRead, 0); // Fully close both sockets acceptedSocket->close(); socket->close(); + + ASSERT_FALSE(socket->isClosedBySelf()); + ASSERT_TRUE(socket->isClosedByPeer()); } /** @@ -671,8 +769,8 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) { socket->shutdownWriteNow(); // Verify that writeError() was invoked on the write callback. - CHECK_EQ(wcb.state, STATE_FAILED); - CHECK_EQ(wcb.bytesWritten, 0); + ASSERT_EQ(wcb.state, STATE_FAILED); + ASSERT_EQ(wcb.bytesWritten, 0); // Even though we haven't looped yet, we should be able to accept // the connection. @@ -685,7 +783,7 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) { fds[0].events = POLLIN; fds[0].revents = 0; int rc = poll(fds, 1, 0); - CHECK_EQ(rc, 0); + ASSERT_EQ(rc, 0); // Write data to the accepted socket uint8_t acceptedWbuf[192]; @@ -694,7 +792,7 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) { acceptedSocket->flush(); // Shutdown writes to the accepted socket. This will cause it to see EOF // and uninstall the read callback. - ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR); + shutdown(acceptedSocket->getSocketFD(), SHUT_WR); // Loop evb.loop(); @@ -705,11 +803,11 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) { // // Check that the connection was completed successfully and that the read // callback was invoked as expected. - CHECK_EQ(ccb.state, STATE_SUCCEEDED); - CHECK_EQ(rcb.state, STATE_SUCCEEDED); - CHECK_EQ(rcb.buffers.size(), 1); - CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf)); - CHECK_EQ(memcmp(rcb.buffers[0].buffer, + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); + ASSERT_EQ(rcb.state, STATE_SUCCEEDED); + ASSERT_EQ(rcb.buffers.size(), 1); + ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf)); + ASSERT_EQ(memcmp(rcb.buffers[0].buffer, acceptedWbuf, sizeof(acceptedWbuf)), 0); // Since we used shutdownWriteNow(), it should have discarded all pending @@ -717,11 +815,14 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) { // socket. uint8_t readbuf[sizeof(wbuf)]; uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf)); - CHECK_EQ(bytesRead, 0); + ASSERT_EQ(bytesRead, 0); // Fully close both sockets acceptedSocket->close(); socket->close(); + + ASSERT_FALSE(socket->isClosedBySelf()); + ASSERT_TRUE(socket->isClosedByPeer()); } // Helper function for use in testConnectOptWrite() @@ -795,12 +896,12 @@ void testConnectOptWrite(size_t size1, size_t size2, bool close = false) { // The kernel should be able to buffer the write request so it can succeed. evb.loop(); - CHECK_EQ(ccb.state, STATE_SUCCEEDED); + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); if (size1 > 0) { - CHECK_EQ(wcb1.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb1.state, STATE_SUCCEEDED); } if (size2 > 0) { - CHECK_EQ(wcb2.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb2.state, STATE_SUCCEEDED); } socket->close(); @@ -815,7 +916,7 @@ void testConnectOptWrite(size_t size1, size_t size2, bool close = false) { size_t end = bytesRead; if (start < size1) { size_t cmpLen = min(size1, end) - start; - CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0); + ASSERT_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0); } if (end > size1 && end <= size1 + size2) { size_t itOffset; @@ -830,12 +931,12 @@ void testConnectOptWrite(size_t size1, size_t size2, bool close = false) { buf2Offset = 0; cmpLen = end - size1; } - CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset, + ASSERT_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset, cmpLen), 0); } } - CHECK_EQ(bytesRead, size1 + size2); + ASSERT_EQ(bytesRead, size1 + size2); } TEST(AsyncSocketTest, ConnectCallbackWrite) { @@ -843,7 +944,7 @@ TEST(AsyncSocketTest, ConnectCallbackWrite) { testConnectOptWrite(100, 200); // Test using a large buffer in the connect callback, that should block - const size_t largeSize = 8*1024*1024; + const size_t largeSize = 32 * 1024 * 1024; testConnectOptWrite(100, largeSize); // Test using a large initial write @@ -895,6 +996,9 @@ TEST(AsyncSocketTest, WriteNullCallback) { // Make sure the server got a connection and received the data socket->close(); server.verifyConnection(buf, sizeof(buf)); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -909,8 +1013,12 @@ TEST(AsyncSocketTest, WriteTimeout) { AsyncSocket::newSocket(&evb, server.getAddress(), 30); evb.loop(); // loop until the socket is connected - // write() a large chunk of data, with no-one on the other end reading - size_t writeLength = 8*1024*1024; + // write() a large chunk of data, with no-one on the other end reading. + // Tricky: the kernel caches the connection metrics for recently-used + // routes (see tcp_no_metrics_save) so a freshly opened connection can + // have a send buffer size bigger than wmem_default. This makes the test + // flaky on contbuild if writeLength is < wmem_max (20M on our systems). + size_t writeLength = 32 * 1024 * 1024; uint32_t timeout = 200; socket->setSendTimeout(timeout); scoped_array buf(new char[writeLength]); @@ -923,8 +1031,8 @@ TEST(AsyncSocketTest, WriteTimeout) { TimePoint end; // Make sure the write attempt timed out as requested - CHECK_EQ(wcb.state, STATE_FAILED); - CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT); + ASSERT_EQ(wcb.state, STATE_FAILED); + ASSERT_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT); // Check that the write timed out within a reasonable period of time. // We don't check for exactly the specified timeout, since AsyncSocket only @@ -964,10 +1072,10 @@ TEST(AsyncSocketTest, WritePipeError) { // accept and immediately close the socket std::shared_ptr acceptedSocket = server.accept(); - acceptedSocket.reset(); + acceptedSocket->close(); // write() a large chunk of data - size_t writeLength = 8*1024*1024; + size_t writeLength = 32 * 1024 * 1024; scoped_array buf(new char[writeLength]); memset(buf.get(), 'a', writeLength); WriteCallback wcb; @@ -978,9 +1086,78 @@ TEST(AsyncSocketTest, WritePipeError) { // Make sure the write failed. // It would be nice if AsyncSocketException could convey the errno value, // so that we could check for EPIPE - CHECK_EQ(wcb.state, STATE_FAILED); - CHECK_EQ(wcb.exception.getType(), + ASSERT_EQ(wcb.state, STATE_FAILED); + ASSERT_EQ(wcb.exception.getType(), AsyncSocketException::INTERNAL_ERROR); + + ASSERT_FALSE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); +} + +/** + * Test that bytes written is correctly computed in case of write failure + */ +TEST(AsyncSocketTest, WriteErrorCallbackBytesWritten) { + // Send and receive buffer sizes for the sockets. + const int sockBufSize = 8 * 1024; + + TestServer server(false, sockBufSize); + + AsyncSocket::OptionMap options{ + {{SOL_SOCKET, SO_SNDBUF}, sockBufSize}, + {{SOL_SOCKET, SO_RCVBUF}, sockBufSize}, + {{IPPROTO_TCP, TCP_NODELAY}, 1}, + }; + + // The current thread will be used by the receiver - use a separate thread + // for the sender. + EventBase senderEvb; + std::thread senderThread([&]() { senderEvb.loopForever(); }); + + ConnCallback ccb; + std::shared_ptr socket; + + senderEvb.runInEventBaseThreadAndWait([&]() { + socket = AsyncSocket::newSocket(&senderEvb); + socket->connect(&ccb, server.getAddress(), 30, options); + }); + + // accept the socket on the server side + std::shared_ptr acceptedSocket = server.accept(); + + // Send a big (45KB) write so that it is partially written. The first write + // is 16KB (8KB on both sides) and subsequent writes are 8KB each. Reading + // just under 24KB would cause 3-4 writes for the total of 32-40KB in the + // following sequence: 16KB + 8KB + 8KB (+ 8KB). This ensures that not all + // bytes are written when the socket is reset. Having at least 3 writes + // ensures that the total size (45KB) would be exceeed in case of overcounting + // based on the initial write size of 16KB. + constexpr size_t sendSize = 45 * 1024; + auto const sendBuf = std::vector(sendSize, 'a'); + + WriteCallback wcb; + + senderEvb.runInEventBaseThreadAndWait( + [&]() { socket->write(&wcb, sendBuf.data(), sendSize); }); + + // Reading 20KB would cause three additional writes of 8KB, but less + // than 45KB total, so the socket is reset before all bytes are written. + constexpr size_t recvSize = 20 * 1024; + uint8_t recvBuf[recvSize]; + int bytesRead = acceptedSocket->readAll(recvBuf, sizeof(recvBuf)); + + acceptedSocket->closeWithReset(); + + senderEvb.terminateLoopSoon(); + senderThread.join(); + + LOG(INFO) << "Bytes written: " << wcb.bytesWritten; + + ASSERT_EQ(STATE_FAILED, wcb.state); + ASSERT_GE(wcb.bytesWritten, bytesRead); + ASSERT_LE(wcb.bytesWritten, sendSize); + ASSERT_EQ(recvSize, bytesRead); + ASSERT(32 * 1024 == wcb.bytesWritten || 40 * 1024 == wcb.bytesWritten); } /** @@ -1000,8 +1177,15 @@ TEST(AsyncSocketTest, WriteIOBuf) { ReadCallback rcb; acceptedSocket->setReadCB(&rcb); + // Check if EOR tracking flag can be set and reset. + EXPECT_FALSE(socket->isEorTrackingEnabled()); + socket->setEorTracking(true); + EXPECT_TRUE(socket->isEorTrackingEnabled()); + socket->setEorTracking(false); + EXPECT_FALSE(socket->isEorTrackingEnabled()); + // Write a simple buffer to the socket - size_t simpleBufLength = 5; + constexpr size_t simpleBufLength = 5; char simpleBuf[simpleBufLength]; memset(simpleBuf, 'a', simpleBufLength); WriteCallback wcb; @@ -1035,26 +1219,29 @@ TEST(AsyncSocketTest, WriteIOBuf) { // Let the reads and writes run to completion evb.loop(); - CHECK_EQ(wcb.state, STATE_SUCCEEDED); - CHECK_EQ(wcb2.state, STATE_SUCCEEDED); - CHECK_EQ(wcb3.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb2.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb3.state, STATE_SUCCEEDED); // Make sure the reader got the right data in the right order - CHECK_EQ(rcb.state, STATE_SUCCEEDED); - CHECK_EQ(rcb.buffers.size(), 1); - CHECK_EQ(rcb.buffers[0].length, + ASSERT_EQ(rcb.state, STATE_SUCCEEDED); + ASSERT_EQ(rcb.buffers.size(), 1); + ASSERT_EQ(rcb.buffers[0].length, simpleBufLength + buf1Length + buf2Length + buf3Length); - CHECK_EQ( + ASSERT_EQ( memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0); - CHECK_EQ( + ASSERT_EQ( memcmp(rcb.buffers[0].buffer + simpleBufLength, buf1Copy->data(), buf1Copy->length()), 0); - CHECK_EQ( + ASSERT_EQ( memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length, buf2Copy->data(), buf2Copy->length()), 0); acceptedSocket->close(); socket->close(); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } TEST(AsyncSocketTest, WriteIOBufCorked) { @@ -1094,25 +1281,28 @@ TEST(AsyncSocketTest, WriteIOBufCorked) { write2.scheduleTimeout(100); WriteCallback wcb3; DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true); - write3.scheduleTimeout(200); + write3.scheduleTimeout(140); evb.loop(); - CHECK_EQ(ccb.state, STATE_SUCCEEDED); - CHECK_EQ(wcb1.state, STATE_SUCCEEDED); - CHECK_EQ(wcb2.state, STATE_SUCCEEDED); + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb1.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb2.state, STATE_SUCCEEDED); if (wcb3.state != STATE_SUCCEEDED) { throw(wcb3.exception); } - CHECK_EQ(wcb3.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb3.state, STATE_SUCCEEDED); // Make sure the reader got the data with the right grouping - CHECK_EQ(rcb.state, STATE_SUCCEEDED); - CHECK_EQ(rcb.buffers.size(), 2); - CHECK_EQ(rcb.buffers[0].length, buf1Length); - CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length); + ASSERT_EQ(rcb.state, STATE_SUCCEEDED); + ASSERT_EQ(rcb.buffers.size(), 2); + ASSERT_EQ(rcb.buffers[0].length, buf1Length); + ASSERT_EQ(rcb.buffers[1].length, buf2Length + buf3Length); acceptedSocket->close(); socket->close(); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -1149,11 +1339,14 @@ TEST(AsyncSocketTest, ZeroLengthWrite) { evb.loop(); // loop until the data is sent - CHECK_EQ(wcb1.state, STATE_SUCCEEDED); - CHECK_EQ(wcb2.state, STATE_SUCCEEDED); - CHECK_EQ(wcb3.state, STATE_SUCCEEDED); - CHECK_EQ(wcb4.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb1.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb2.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb3.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb4.state, STATE_SUCCEEDED); rcb.verifyData(buf.get(), len1 + len2); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } TEST(AsyncSocketTest, ZeroLengthWritev) { @@ -1176,7 +1369,7 @@ TEST(AsyncSocketTest, ZeroLengthWritev) { memset(buf.get(), 'b', len2); WriteCallback wcb; - size_t iovCount = 4; + constexpr size_t iovCount = 4; struct iovec iov[iovCount]; iov[0].iov_base = buf.get(); iov[0].iov_len = len1; @@ -1191,8 +1384,11 @@ TEST(AsyncSocketTest, ZeroLengthWritev) { socket->close(); evb.loop(); // loop until the data is sent - CHECK_EQ(wcb.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb.state, STATE_SUCCEEDED); rcb.verifyData(buf.get(), len1 + len2); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /////////////////////////////////////////////////////////////////////////// @@ -1218,7 +1414,7 @@ TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) { evb.loop(); // Make sure we are connected - CHECK_EQ(ccb.state, STATE_SUCCEEDED); + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); // Schedule pending writes, until several write attempts have blocked char buf[128]; @@ -1249,8 +1445,11 @@ TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) { for (WriteCallbackVector::const_iterator it = writeCallbacks.begin(); it != writeCallbacks.end(); ++it) { - CHECK_EQ((*it)->state, STATE_FAILED); + ASSERT_EQ((*it)->state, STATE_FAILED); } + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /////////////////////////////////////////////////////////////////////////// @@ -1263,7 +1462,7 @@ class AsyncSocketImmediateRead : public folly::AsyncSocket { bool immediateReadCalled = false; explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {} protected: - virtual void checkForImmediateRead() noexcept override { + void checkForImmediateRead() noexcept override { immediateReadCalled = true; AsyncSocket::handleRead(); } @@ -1307,9 +1506,12 @@ TEST(AsyncSocket, ConnectReadImmediateRead) { WriteCallback wcb1; socket.write(&wcb1, expectedData, expectedDataSz); evb.loop(); - CHECK_EQ(wcb1.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb1.state, STATE_SUCCEEDED); rcb.verifyData(expectedData, expectedDataSz); - CHECK_EQ(socket.immediateReadCalled, true); + ASSERT_EQ(socket.immediateReadCalled, true); + + ASSERT_FALSE(socket.isClosedBySelf()); + ASSERT_FALSE(socket.isClosedByPeer()); } TEST(AsyncSocket, ConnectReadUninstallRead) { @@ -1355,12 +1557,15 @@ TEST(AsyncSocket, ConnectReadUninstallRead) { WriteCallback wcb; socket.write(&wcb, expectedData, expectedDataSz); evb.loop(); - CHECK_EQ(wcb.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb.state, STATE_SUCCEEDED); /* we shoud've only read maxBufferSz data since readCallback_ * was reset in dataAvailableCallback */ - CHECK_EQ(rcb.dataRead(), maxBufferSz); - CHECK_EQ(socket.immediateReadCalled, false); + ASSERT_EQ(rcb.dataRead(), maxBufferSz); + ASSERT_EQ(socket.immediateReadCalled, false); + + ASSERT_FALSE(socket.isClosedBySelf()); + ASSERT_FALSE(socket.isClosedByPeer()); } // TODO: @@ -1379,6 +1584,113 @@ TEST(AsyncSocket, ConnectReadUninstallRead) { /////////////////////////////////////////////////////////////////////////// // AsyncServerSocket tests /////////////////////////////////////////////////////////////////////////// +namespace { +/** + * Helper ConnectionEventCallback class for the test code. + * It maintains counters protected by a spin lock. + */ +class TestConnectionEventCallback : + public AsyncServerSocket::ConnectionEventCallback { + public: + virtual void onConnectionAccepted( + const int /* socket */, + const SocketAddress& /* addr */) noexcept override { + folly::RWSpinLock::WriteHolder holder(spinLock_); + connectionAccepted_++; + } + + virtual void onConnectionAcceptError(const int /* err */) noexcept override { + folly::RWSpinLock::WriteHolder holder(spinLock_); + connectionAcceptedError_++; + } + + virtual void onConnectionDropped( + const int /* socket */, + const SocketAddress& /* addr */) noexcept override { + folly::RWSpinLock::WriteHolder holder(spinLock_); + connectionDropped_++; + } + + virtual void onConnectionEnqueuedForAcceptorCallback( + const int /* socket */, + const SocketAddress& /* addr */) noexcept override { + folly::RWSpinLock::WriteHolder holder(spinLock_); + connectionEnqueuedForAcceptCallback_++; + } + + virtual void onConnectionDequeuedByAcceptorCallback( + const int /* socket */, + const SocketAddress& /* addr */) noexcept override { + folly::RWSpinLock::WriteHolder holder(spinLock_); + connectionDequeuedByAcceptCallback_++; + } + + virtual void onBackoffStarted() noexcept override { + folly::RWSpinLock::WriteHolder holder(spinLock_); + backoffStarted_++; + } + + virtual void onBackoffEnded() noexcept override { + folly::RWSpinLock::WriteHolder holder(spinLock_); + backoffEnded_++; + } + + virtual void onBackoffError() noexcept override { + folly::RWSpinLock::WriteHolder holder(spinLock_); + backoffError_++; + } + + unsigned int getConnectionAccepted() const { + folly::RWSpinLock::ReadHolder holder(spinLock_); + return connectionAccepted_; + } + + unsigned int getConnectionAcceptedError() const { + folly::RWSpinLock::ReadHolder holder(spinLock_); + return connectionAcceptedError_; + } + + unsigned int getConnectionDropped() const { + folly::RWSpinLock::ReadHolder holder(spinLock_); + return connectionDropped_; + } + + unsigned int getConnectionEnqueuedForAcceptCallback() const { + folly::RWSpinLock::ReadHolder holder(spinLock_); + return connectionEnqueuedForAcceptCallback_; + } + + unsigned int getConnectionDequeuedByAcceptCallback() const { + folly::RWSpinLock::ReadHolder holder(spinLock_); + return connectionDequeuedByAcceptCallback_; + } + + unsigned int getBackoffStarted() const { + folly::RWSpinLock::ReadHolder holder(spinLock_); + return backoffStarted_; + } + + unsigned int getBackoffEnded() const { + folly::RWSpinLock::ReadHolder holder(spinLock_); + return backoffEnded_; + } + + unsigned int getBackoffError() const { + folly::RWSpinLock::ReadHolder holder(spinLock_); + return backoffError_; + } + + private: + mutable folly::RWSpinLock spinLock_; + unsigned int connectionAccepted_{0}; + unsigned int connectionAcceptedError_{0}; + unsigned int connectionDropped_{0}; + unsigned int connectionEnqueuedForAcceptCallback_{0}; + unsigned int connectionDequeuedByAcceptCallback_{0}; + unsigned int backoffStarted_{0}; + unsigned int backoffEnded_{0}; + unsigned int backoffError_{0}; +}; /** * Helper AcceptCallback class for the test code @@ -1441,29 +1753,29 @@ class TestAcceptCallback : public AsyncServerSocket::AcceptCallback { acceptStoppedFn_ = fn; } - void connectionAccepted(int fd, const folly::SocketAddress& clientAddr) - noexcept { + void connectionAccepted( + int fd, const folly::SocketAddress& clientAddr) noexcept override { events_.emplace_back(fd, clientAddr); if (connectionAcceptedFn_) { connectionAcceptedFn_(fd, clientAddr); } } - void acceptError(const std::exception& ex) noexcept { + void acceptError(const std::exception& ex) noexcept override { events_.emplace_back(ex.what()); if (acceptErrorFn_) { acceptErrorFn_(ex); } } - void acceptStarted() noexcept { + void acceptStarted() noexcept override { events_.emplace_back(TYPE_START); if (acceptStartedFn_) { acceptStartedFn_(); } } - void acceptStopped() noexcept { + void acceptStopped() noexcept override { events_.emplace_back(TYPE_STOP); if (acceptStoppedFn_) { @@ -1479,6 +1791,7 @@ class TestAcceptCallback : public AsyncServerSocket::AcceptCallback { std::deque events_; }; +} /** * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set @@ -1497,13 +1810,13 @@ TEST(AsyncSocketTest, ServerAcceptOptions) { // Add a callback to accept one connection then stop the loop TestAcceptCallback acceptCallback; acceptCallback.setConnectionAcceptedFn( - [&](int fd, const folly::SocketAddress& addr) { - serverSocket->removeAcceptCallback(&acceptCallback, nullptr); - }); - acceptCallback.setAcceptErrorFn([&](const std::exception& ex) { - serverSocket->removeAcceptCallback(&acceptCallback, nullptr); + [&](int /* fd */, const folly::SocketAddress& /* addr */) { + serverSocket->removeAcceptCallback(&acceptCallback, &eventBase); + }); + acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) { + serverSocket->removeAcceptCallback(&acceptCallback, &eventBase); }); - serverSocket->addAcceptCallback(&acceptCallback, nullptr); + serverSocket->addAcceptCallback(&acceptCallback, &eventBase); serverSocket->startAccepting(); // Connect to the server socket @@ -1513,26 +1826,26 @@ TEST(AsyncSocketTest, ServerAcceptOptions) { eventBase.loop(); // Verify that the server accepted a connection - CHECK_EQ(acceptCallback.getEvents()->size(), 3); - CHECK_EQ(acceptCallback.getEvents()->at(0).type, + ASSERT_EQ(acceptCallback.getEvents()->size(), 3); + ASSERT_EQ(acceptCallback.getEvents()->at(0).type, TestAcceptCallback::TYPE_START); - CHECK_EQ(acceptCallback.getEvents()->at(1).type, + ASSERT_EQ(acceptCallback.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT); - CHECK_EQ(acceptCallback.getEvents()->at(2).type, + ASSERT_EQ(acceptCallback.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP); int fd = acceptCallback.getEvents()->at(1).fd; // The accepted connection should already be in non-blocking mode int flags = fcntl(fd, F_GETFL, 0); - CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK); + ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK); #ifndef TCP_NOPUSH // The accepted connection should already have TCP_NODELAY set int value; socklen_t valueLength = sizeof(value); int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength); - CHECK_EQ(rc, 0); - CHECK_EQ(value, 1); + ASSERT_EQ(rc, 0); + ASSERT_EQ(value, 1); #endif } @@ -1564,61 +1877,64 @@ TEST(AsyncSocketTest, RemoveAcceptCallback) { // Have callback 2 remove callback 3 and callback 5 the first time it is // called. int cb2Count = 0; - cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){ - std::shared_ptr sock2( + cb1.setConnectionAcceptedFn([&](int /* fd */, + const folly::SocketAddress& /* addr */) { + std::shared_ptr sock2( AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5 + }); + cb3.setConnectionAcceptedFn( + [&](int /* fd */, const folly::SocketAddress& /* addr */) {}); + cb4.setConnectionAcceptedFn( + [&](int /* fd */, const folly::SocketAddress& /* addr */) { + std::shared_ptr sock3( + AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4 + }); + cb5.setConnectionAcceptedFn( + [&](int /* fd */, const folly::SocketAddress& /* addr */) { + std::shared_ptr sock5( + AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7 + }); - cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){ - }); - cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){ - std::shared_ptr sock3( - AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4 - }); - cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){ - std::shared_ptr sock5( - AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7 - - }); cb2.setConnectionAcceptedFn( - [&](int fd, const folly::SocketAddress& addr) { - if (cb2Count == 0) { - serverSocket->removeAcceptCallback(&cb3, nullptr); - serverSocket->removeAcceptCallback(&cb5, nullptr); - } - ++cb2Count; - }); + [&](int /* fd */, const folly::SocketAddress& /* addr */) { + if (cb2Count == 0) { + serverSocket->removeAcceptCallback(&cb3, nullptr); + serverSocket->removeAcceptCallback(&cb5, nullptr); + } + ++cb2Count; + }); // Have callback 6 remove callback 4 the first time it is called, // and destroy the server socket the second time it is called int cb6Count = 0; cb6.setConnectionAcceptedFn( - [&](int fd, const folly::SocketAddress& addr) { - if (cb6Count == 0) { - serverSocket->removeAcceptCallback(&cb4, nullptr); - std::shared_ptr sock6( - AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1 - std::shared_ptr sock7( - AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2 - std::shared_ptr sock8( - AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop - - } else { - serverSocket.reset(); - } - ++cb6Count; - }); + [&](int /* fd */, const folly::SocketAddress& /* addr */) { + if (cb6Count == 0) { + serverSocket->removeAcceptCallback(&cb4, nullptr); + std::shared_ptr sock6( + AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1 + std::shared_ptr sock7( + AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2 + std::shared_ptr sock8( + AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop + + } else { + serverSocket.reset(); + } + ++cb6Count; + }); // Have callback 7 remove itself cb7.setConnectionAcceptedFn( - [&](int fd, const folly::SocketAddress& addr) { - serverSocket->removeAcceptCallback(&cb7, nullptr); - }); - - serverSocket->addAcceptCallback(&cb1, nullptr); - serverSocket->addAcceptCallback(&cb2, nullptr); - serverSocket->addAcceptCallback(&cb3, nullptr); - serverSocket->addAcceptCallback(&cb4, nullptr); - serverSocket->addAcceptCallback(&cb5, nullptr); - serverSocket->addAcceptCallback(&cb6, nullptr); - serverSocket->addAcceptCallback(&cb7, nullptr); + [&](int /* fd */, const folly::SocketAddress& /* addr */) { + serverSocket->removeAcceptCallback(&cb7, nullptr); + }); + + serverSocket->addAcceptCallback(&cb1, &eventBase); + serverSocket->addAcceptCallback(&cb2, &eventBase); + serverSocket->addAcceptCallback(&cb3, &eventBase); + serverSocket->addAcceptCallback(&cb4, &eventBase); + serverSocket->addAcceptCallback(&cb5, &eventBase); + serverSocket->addAcceptCallback(&cb6, &eventBase); + serverSocket->addAcceptCallback(&cb7, &eventBase); serverSocket->startAccepting(); // Make several connections to the socket @@ -1639,62 +1955,62 @@ TEST(AsyncSocketTest, RemoveAcceptCallback) { // exactly round robin in the future, we can simplify the test checks here. // (We'll also need to update the termination code, since we expect cb6 to // get called twice to terminate the loop.) - CHECK_EQ(cb1.getEvents()->size(), 4); - CHECK_EQ(cb1.getEvents()->at(0).type, + ASSERT_EQ(cb1.getEvents()->size(), 4); + ASSERT_EQ(cb1.getEvents()->at(0).type, TestAcceptCallback::TYPE_START); - CHECK_EQ(cb1.getEvents()->at(1).type, + ASSERT_EQ(cb1.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT); - CHECK_EQ(cb1.getEvents()->at(2).type, + ASSERT_EQ(cb1.getEvents()->at(2).type, TestAcceptCallback::TYPE_ACCEPT); - CHECK_EQ(cb1.getEvents()->at(3).type, + ASSERT_EQ(cb1.getEvents()->at(3).type, TestAcceptCallback::TYPE_STOP); - CHECK_EQ(cb2.getEvents()->size(), 4); - CHECK_EQ(cb2.getEvents()->at(0).type, + ASSERT_EQ(cb2.getEvents()->size(), 4); + ASSERT_EQ(cb2.getEvents()->at(0).type, TestAcceptCallback::TYPE_START); - CHECK_EQ(cb2.getEvents()->at(1).type, + ASSERT_EQ(cb2.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT); - CHECK_EQ(cb2.getEvents()->at(2).type, + ASSERT_EQ(cb2.getEvents()->at(2).type, TestAcceptCallback::TYPE_ACCEPT); - CHECK_EQ(cb2.getEvents()->at(3).type, + ASSERT_EQ(cb2.getEvents()->at(3).type, TestAcceptCallback::TYPE_STOP); - CHECK_EQ(cb3.getEvents()->size(), 2); - CHECK_EQ(cb3.getEvents()->at(0).type, + ASSERT_EQ(cb3.getEvents()->size(), 2); + ASSERT_EQ(cb3.getEvents()->at(0).type, TestAcceptCallback::TYPE_START); - CHECK_EQ(cb3.getEvents()->at(1).type, + ASSERT_EQ(cb3.getEvents()->at(1).type, TestAcceptCallback::TYPE_STOP); - CHECK_EQ(cb4.getEvents()->size(), 3); - CHECK_EQ(cb4.getEvents()->at(0).type, + ASSERT_EQ(cb4.getEvents()->size(), 3); + ASSERT_EQ(cb4.getEvents()->at(0).type, TestAcceptCallback::TYPE_START); - CHECK_EQ(cb4.getEvents()->at(1).type, + ASSERT_EQ(cb4.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT); - CHECK_EQ(cb4.getEvents()->at(2).type, + ASSERT_EQ(cb4.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP); - CHECK_EQ(cb5.getEvents()->size(), 2); - CHECK_EQ(cb5.getEvents()->at(0).type, + ASSERT_EQ(cb5.getEvents()->size(), 2); + ASSERT_EQ(cb5.getEvents()->at(0).type, TestAcceptCallback::TYPE_START); - CHECK_EQ(cb5.getEvents()->at(1).type, + ASSERT_EQ(cb5.getEvents()->at(1).type, TestAcceptCallback::TYPE_STOP); - CHECK_EQ(cb6.getEvents()->size(), 4); - CHECK_EQ(cb6.getEvents()->at(0).type, + ASSERT_EQ(cb6.getEvents()->size(), 4); + ASSERT_EQ(cb6.getEvents()->at(0).type, TestAcceptCallback::TYPE_START); - CHECK_EQ(cb6.getEvents()->at(1).type, + ASSERT_EQ(cb6.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT); - CHECK_EQ(cb6.getEvents()->at(2).type, + ASSERT_EQ(cb6.getEvents()->at(2).type, TestAcceptCallback::TYPE_ACCEPT); - CHECK_EQ(cb6.getEvents()->at(3).type, + ASSERT_EQ(cb6.getEvents()->at(3).type, TestAcceptCallback::TYPE_STOP); - CHECK_EQ(cb7.getEvents()->size(), 3); - CHECK_EQ(cb7.getEvents()->at(0).type, + ASSERT_EQ(cb7.getEvents()->size(), 3); + ASSERT_EQ(cb7.getEvents()->at(0).type, TestAcceptCallback::TYPE_START); - CHECK_EQ(cb7.getEvents()->at(1).type, + ASSERT_EQ(cb7.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT); - CHECK_EQ(cb7.getEvents()->at(2).type, + ASSERT_EQ(cb7.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP); } @@ -1713,21 +2029,22 @@ TEST(AsyncSocketTest, OtherThreadAcceptCallback) { // Add several accept callbacks TestAcceptCallback cb1; - auto thread_id = pthread_self(); + auto thread_id = std::this_thread::get_id(); cb1.setAcceptStartedFn([&](){ - CHECK_NE(thread_id, pthread_self()); - thread_id = pthread_self(); - }); - cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){ - CHECK_EQ(thread_id, pthread_self()); - serverSocket->removeAcceptCallback(&cb1, nullptr); + CHECK_NE(thread_id, std::this_thread::get_id()); + thread_id = std::this_thread::get_id(); }); + cb1.setConnectionAcceptedFn( + [&](int /* fd */, const folly::SocketAddress& /* addr */) { + ASSERT_EQ(thread_id, std::this_thread::get_id()); + serverSocket->removeAcceptCallback(&cb1, &eventBase); + }); cb1.setAcceptStoppedFn([&](){ - CHECK_EQ(thread_id, pthread_self()); + ASSERT_EQ(thread_id, std::this_thread::get_id()); }); // Test having callbacks remove other callbacks before them on the list, - serverSocket->addAcceptCallback(&cb1, nullptr); + serverSocket->addAcceptCallback(&cb1, &eventBase); serverSocket->startAccepting(); // Make several connections to the socket @@ -1749,31 +2066,33 @@ TEST(AsyncSocketTest, OtherThreadAcceptCallback) { // exactly round robin in the future, we can simplify the test checks here. // (We'll also need to update the termination code, since we expect cb6 to // get called twice to terminate the loop.) - CHECK_EQ(cb1.getEvents()->size(), 3); - CHECK_EQ(cb1.getEvents()->at(0).type, + ASSERT_EQ(cb1.getEvents()->size(), 3); + ASSERT_EQ(cb1.getEvents()->at(0).type, TestAcceptCallback::TYPE_START); - CHECK_EQ(cb1.getEvents()->at(1).type, + ASSERT_EQ(cb1.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT); - CHECK_EQ(cb1.getEvents()->at(2).type, + ASSERT_EQ(cb1.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP); } void serverSocketSanityTest(AsyncServerSocket* serverSocket) { + EventBase* eventBase = serverSocket->getEventBase(); + CHECK(eventBase); + // Add a callback to accept one connection then stop accepting TestAcceptCallback acceptCallback; acceptCallback.setConnectionAcceptedFn( - [&](int fd, const folly::SocketAddress& addr) { - serverSocket->removeAcceptCallback(&acceptCallback, nullptr); - }); - acceptCallback.setAcceptErrorFn([&](const std::exception& ex) { - serverSocket->removeAcceptCallback(&acceptCallback, nullptr); + [&](int /* fd */, const folly::SocketAddress& /* addr */) { + serverSocket->removeAcceptCallback(&acceptCallback, eventBase); + }); + acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) { + serverSocket->removeAcceptCallback(&acceptCallback, eventBase); }); - serverSocket->addAcceptCallback(&acceptCallback, nullptr); + serverSocket->addAcceptCallback(&acceptCallback, eventBase); serverSocket->startAccepting(); // Connect to the server socket - EventBase* eventBase = serverSocket->getEventBase(); folly::SocketAddress serverAddress; serverSocket->getAddress(&serverAddress); AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress)); @@ -1782,12 +2101,12 @@ void serverSocketSanityTest(AsyncServerSocket* serverSocket) { eventBase->loop(); // Verify that the server accepted a connection - CHECK_EQ(acceptCallback.getEvents()->size(), 3); - CHECK_EQ(acceptCallback.getEvents()->at(0).type, + ASSERT_EQ(acceptCallback.getEvents()->size(), 3); + ASSERT_EQ(acceptCallback.getEvents()->at(0).type, TestAcceptCallback::TYPE_START); - CHECK_EQ(acceptCallback.getEvents()->at(1).type, + ASSERT_EQ(acceptCallback.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT); - CHECK_EQ(acceptCallback.getEvents()->at(2).type, + ASSERT_EQ(acceptCallback.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP); } @@ -1827,9 +2146,11 @@ TEST(AsyncSocketTest, DestroyCloseTest) { acceptedSocket.reset(); // Test that server socket was closed - ssize_t sz = read(fd, simpleBuf, simpleBufLength); - CHECK_EQ(sz, -1); - CHECK_EQ(errno, 9); + folly::test::msvcSuppressAbortOnInvalidParams([&] { + ssize_t sz = read(fd, simpleBuf, simpleBufLength); + ASSERT_EQ(sz, -1); + ASSERT_EQ(errno, EBADF); + }); delete[] simpleBuf; } @@ -1842,7 +2163,7 @@ TEST(AsyncSocketTest, ServerExistingSocket) { // Test creating a socket, and letting AsyncServerSocket bind and listen { // Manually create a socket - int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); ASSERT_GE(fd, 0); // Create a server socket @@ -1863,14 +2184,14 @@ TEST(AsyncSocketTest, ServerExistingSocket) { // then letting AsyncServerSocket listen { // Manually create a socket - int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); ASSERT_GE(fd, 0); // bind struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = 0; addr.sin_addr.s_addr = INADDR_ANY; - CHECK_EQ(bind(fd, reinterpret_cast(&addr), + ASSERT_EQ(bind(fd, reinterpret_cast(&addr), sizeof(addr)), 0); // Look up the address that we bound to folly::SocketAddress boundAddress; @@ -1885,7 +2206,7 @@ TEST(AsyncSocketTest, ServerExistingSocket) { // Make sure AsyncServerSocket reports the same address that we bound to folly::SocketAddress serverSocketAddress; serverSocket->getAddress(&serverSocketAddress); - CHECK_EQ(boundAddress, serverSocketAddress); + ASSERT_EQ(boundAddress, serverSocketAddress); // Make sure the socket works serverSocketSanityTest(serverSocket.get()); @@ -1895,20 +2216,20 @@ TEST(AsyncSocketTest, ServerExistingSocket) { // then giving it to AsyncServerSocket { // Manually create a socket - int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); ASSERT_GE(fd, 0); // bind struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = 0; addr.sin_addr.s_addr = INADDR_ANY; - CHECK_EQ(bind(fd, reinterpret_cast(&addr), + ASSERT_EQ(bind(fd, reinterpret_cast(&addr), sizeof(addr)), 0); // Look up the address that we bound to folly::SocketAddress boundAddress; boundAddress.setFromLocalAddress(fd); // listen - CHECK_EQ(listen(fd, 16), 0); + ASSERT_EQ(listen(fd, 16), 0); // Create a server socket AsyncServerSocket::UniquePtr serverSocket( @@ -1918,7 +2239,7 @@ TEST(AsyncSocketTest, ServerExistingSocket) { // Make sure AsyncServerSocket reports the same address that we bound to folly::SocketAddress serverSocketAddress; serverSocket->getAddress(&serverSocketAddress); - CHECK_EQ(boundAddress, serverSocketAddress); + ASSERT_EQ(boundAddress, serverSocketAddress); // Make sure the socket works serverSocketSanityTest(serverSocket.get()); @@ -1932,7 +2253,7 @@ TEST(AsyncSocketTest, UnixDomainSocketTest) { std::shared_ptr serverSocket( AsyncServerSocket::newSocket(&eventBase)); string path(1, 0); - path.append("/anonymous"); + path.append(folly::to("/anonymous", folly::Random::rand64())); folly::SocketAddress serverAddress; serverAddress.setFromPath(path); serverSocket->bind(serverAddress); @@ -1941,13 +2262,13 @@ TEST(AsyncSocketTest, UnixDomainSocketTest) { // Add a callback to accept one connection then stop the loop TestAcceptCallback acceptCallback; acceptCallback.setConnectionAcceptedFn( - [&](int fd, const folly::SocketAddress& addr) { - serverSocket->removeAcceptCallback(&acceptCallback, nullptr); - }); - acceptCallback.setAcceptErrorFn([&](const std::exception& ex) { - serverSocket->removeAcceptCallback(&acceptCallback, nullptr); + [&](int /* fd */, const folly::SocketAddress& /* addr */) { + serverSocket->removeAcceptCallback(&acceptCallback, &eventBase); + }); + acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) { + serverSocket->removeAcceptCallback(&acceptCallback, &eventBase); }); - serverSocket->addAcceptCallback(&acceptCallback, nullptr); + serverSocket->addAcceptCallback(&acceptCallback, &eventBase); serverSocket->startAccepting(); // Connect to the server socket @@ -1957,16 +2278,749 @@ TEST(AsyncSocketTest, UnixDomainSocketTest) { eventBase.loop(); // Verify that the server accepted a connection - CHECK_EQ(acceptCallback.getEvents()->size(), 3); - CHECK_EQ(acceptCallback.getEvents()->at(0).type, + ASSERT_EQ(acceptCallback.getEvents()->size(), 3); + ASSERT_EQ(acceptCallback.getEvents()->at(0).type, TestAcceptCallback::TYPE_START); - CHECK_EQ(acceptCallback.getEvents()->at(1).type, + ASSERT_EQ(acceptCallback.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT); - CHECK_EQ(acceptCallback.getEvents()->at(2).type, + ASSERT_EQ(acceptCallback.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP); int fd = acceptCallback.getEvents()->at(1).fd; // The accepted connection should already be in non-blocking mode int flags = fcntl(fd, F_GETFL, 0); - CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK); + ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK); +} + +TEST(AsyncSocketTest, ConnectionEventCallbackDefault) { + EventBase eventBase; + TestConnectionEventCallback connectionEventCallback; + + // Create a server socket + std::shared_ptr serverSocket( + AsyncServerSocket::newSocket(&eventBase)); + serverSocket->setConnectionEventCallback(&connectionEventCallback); + serverSocket->bind(0); + serverSocket->listen(16); + folly::SocketAddress serverAddress; + serverSocket->getAddress(&serverAddress); + + // Add a callback to accept one connection then stop the loop + TestAcceptCallback acceptCallback; + acceptCallback.setConnectionAcceptedFn( + [&](int /* fd */, const folly::SocketAddress& /* addr */) { + serverSocket->removeAcceptCallback(&acceptCallback, nullptr); + }); + acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) { + serverSocket->removeAcceptCallback(&acceptCallback, nullptr); + }); + serverSocket->addAcceptCallback(&acceptCallback, &eventBase); + serverSocket->startAccepting(); + + // Connect to the server socket + std::shared_ptr socket( + AsyncSocket::newSocket(&eventBase, serverAddress)); + + eventBase.loop(); + + // Validate the connection event counters + ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1); + ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0); + ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0); + ASSERT_EQ( + connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1); + ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1); + ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0); + ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0); + ASSERT_EQ(connectionEventCallback.getBackoffError(), 0); +} + +TEST(AsyncSocketTest, CallbackInPrimaryEventBase) { + EventBase eventBase; + TestConnectionEventCallback connectionEventCallback; + + // Create a server socket + std::shared_ptr serverSocket( + AsyncServerSocket::newSocket(&eventBase)); + serverSocket->setConnectionEventCallback(&connectionEventCallback); + serverSocket->bind(0); + serverSocket->listen(16); + folly::SocketAddress serverAddress; + serverSocket->getAddress(&serverAddress); + + // Add a callback to accept one connection then stop the loop + TestAcceptCallback acceptCallback; + acceptCallback.setConnectionAcceptedFn( + [&](int /* fd */, const folly::SocketAddress& /* addr */) { + serverSocket->removeAcceptCallback(&acceptCallback, nullptr); + }); + acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) { + serverSocket->removeAcceptCallback(&acceptCallback, nullptr); + }); + bool acceptStartedFlag{false}; + acceptCallback.setAcceptStartedFn([&acceptStartedFlag](){ + acceptStartedFlag = true; + }); + bool acceptStoppedFlag{false}; + acceptCallback.setAcceptStoppedFn([&acceptStoppedFlag](){ + acceptStoppedFlag = true; + }); + serverSocket->addAcceptCallback(&acceptCallback, nullptr); + serverSocket->startAccepting(); + + // Connect to the server socket + std::shared_ptr socket( + AsyncSocket::newSocket(&eventBase, serverAddress)); + + eventBase.loop(); + + ASSERT_TRUE(acceptStartedFlag); + ASSERT_TRUE(acceptStoppedFlag); + // Validate the connection event counters + ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1); + ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0); + ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0); + ASSERT_EQ( + connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 0); + ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 0); + ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0); + ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0); + ASSERT_EQ(connectionEventCallback.getBackoffError(), 0); +} + + + +/** + * Test AsyncServerSocket::getNumPendingMessagesInQueue() + */ +TEST(AsyncSocketTest, NumPendingMessagesInQueue) { + EventBase eventBase; + + // Counter of how many connections have been accepted + int count = 0; + + // Create a server socket + auto serverSocket(AsyncServerSocket::newSocket(&eventBase)); + serverSocket->bind(0); + serverSocket->listen(16); + folly::SocketAddress serverAddress; + serverSocket->getAddress(&serverAddress); + + // Add a callback to accept connections + TestAcceptCallback acceptCallback; + acceptCallback.setConnectionAcceptedFn( + [&](int /* fd */, const folly::SocketAddress& /* addr */) { + count++; + ASSERT_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue()); + + if (count == 4) { + // all messages are processed, remove accept callback + serverSocket->removeAcceptCallback(&acceptCallback, &eventBase); + } + }); + acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) { + serverSocket->removeAcceptCallback(&acceptCallback, &eventBase); + }); + serverSocket->addAcceptCallback(&acceptCallback, &eventBase); + serverSocket->startAccepting(); + + // Connect to the server socket, 4 clients, there are 4 connections + auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress)); + auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress)); + auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress)); + auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress)); + + eventBase.loop(); +} + +/** + * Test AsyncTransport::BufferCallback + */ +TEST(AsyncSocketTest, BufferTest) { + TestServer server; + + EventBase evb; + AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}}; + std::shared_ptr socket = AsyncSocket::newSocket(&evb); + ConnCallback ccb; + socket->connect(&ccb, server.getAddress(), 30, option); + + char buf[100 * 1024]; + memset(buf, 'c', sizeof(buf)); + WriteCallback wcb; + BufferCallback bcb; + socket->setBufferCallback(&bcb); + socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE); + + evb.loop(); + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); + ASSERT_EQ(wcb.state, STATE_SUCCEEDED); + + ASSERT_TRUE(bcb.hasBuffered()); + ASSERT_TRUE(bcb.hasBufferCleared()); + + socket->close(); + server.verifyConnection(buf, sizeof(buf)); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); +} + +TEST(AsyncSocketTest, BufferCallbackKill) { + TestServer server; + EventBase evb; + AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}}; + std::shared_ptr socket = AsyncSocket::newSocket(&evb); + ConnCallback ccb; + socket->connect(&ccb, server.getAddress(), 30, option); + evb.loopOnce(); + + char buf[100 * 1024]; + memset(buf, 'c', sizeof(buf)); + BufferCallback bcb; + socket->setBufferCallback(&bcb); + WriteCallback wcb; + wcb.successCallback = [&] { + ASSERT_TRUE(socket.unique()); + socket.reset(); + }; + + // This will trigger AsyncSocket::handleWrite, + // which calls WriteCallback::writeSuccess, + // which calls wcb.successCallback above, + // which tries to delete socket + // Then, the socket will also try to use this BufferCallback + // And that should crash us, if there is no DestructorGuard on the stack + socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE); + + evb.loop(); + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); +} + +#if FOLLY_ALLOW_TFO +TEST(AsyncSocketTest, ConnectTFO) { + // Start listening on a local port + TestServer server(true); + + // Connect using a AsyncSocket + EventBase evb; + std::shared_ptr socket = AsyncSocket::newSocket(&evb); + socket->enableTFO(); + ConnCallback cb; + socket->connect(&cb, server.getAddress(), 30); + + std::array buf; + memset(buf.data(), 'a', buf.size()); + + std::array readBuf; + auto sendBuf = IOBuf::copyBuffer("hey"); + + std::thread t([&] { + auto acceptedSocket = server.accept(); + acceptedSocket->write(buf.data(), buf.size()); + acceptedSocket->flush(); + acceptedSocket->readAll(readBuf.data(), readBuf.size()); + acceptedSocket->close(); + }); + + evb.loop(); + + ASSERT_EQ(cb.state, STATE_SUCCEEDED); + EXPECT_LE(0, socket->getConnectTime().count()); + EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30)); + EXPECT_TRUE(socket->getTFOAttempted()); + + // Should trigger the connect + WriteCallback write; + ReadCallback rcb; + socket->writeChain(&write, sendBuf->clone()); + socket->setReadCB(&rcb); + evb.loop(); + + t.join(); + + EXPECT_EQ(STATE_SUCCEEDED, write.state); + EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size())); + EXPECT_EQ(STATE_SUCCEEDED, rcb.state); + ASSERT_EQ(1, rcb.buffers.size()); + ASSERT_EQ(sizeof(buf), rcb.buffers[0].length); + EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size())); + EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded()); +} + +TEST(AsyncSocketTest, ConnectTFOSupplyEarlyReadCB) { + // Start listening on a local port + TestServer server(true); + + // Connect using a AsyncSocket + EventBase evb; + std::shared_ptr socket = AsyncSocket::newSocket(&evb); + socket->enableTFO(); + ConnCallback cb; + socket->connect(&cb, server.getAddress(), 30); + ReadCallback rcb; + socket->setReadCB(&rcb); + + std::array buf; + memset(buf.data(), 'a', buf.size()); + + std::array readBuf; + auto sendBuf = IOBuf::copyBuffer("hey"); + + std::thread t([&] { + auto acceptedSocket = server.accept(); + acceptedSocket->write(buf.data(), buf.size()); + acceptedSocket->flush(); + acceptedSocket->readAll(readBuf.data(), readBuf.size()); + acceptedSocket->close(); + }); + + evb.loop(); + + ASSERT_EQ(cb.state, STATE_SUCCEEDED); + EXPECT_LE(0, socket->getConnectTime().count()); + EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30)); + EXPECT_TRUE(socket->getTFOAttempted()); + + // Should trigger the connect + WriteCallback write; + socket->writeChain(&write, sendBuf->clone()); + evb.loop(); + + t.join(); + + EXPECT_EQ(STATE_SUCCEEDED, write.state); + EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size())); + EXPECT_EQ(STATE_SUCCEEDED, rcb.state); + ASSERT_EQ(1, rcb.buffers.size()); + ASSERT_EQ(sizeof(buf), rcb.buffers[0].length); + EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size())); + EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded()); +} + +/** + * Test connecting to a server that isn't listening + */ +TEST(AsyncSocketTest, ConnectRefusedImmediatelyTFO) { + EventBase evb; + + std::shared_ptr socket = AsyncSocket::newSocket(&evb); + + socket->enableTFO(); + + // Hopefully nothing is actually listening on this address + folly::SocketAddress addr("::1", 65535); + ConnCallback cb; + socket->connect(&cb, addr, 30); + + evb.loop(); + + WriteCallback write1; + // Trigger the connect if TFO attempt is supported. + socket->writeChain(&write1, IOBuf::copyBuffer("hey")); + WriteCallback write2; + socket->writeChain(&write2, IOBuf::copyBuffer("hey")); + evb.loop(); + + if (!socket->getTFOFinished()) { + EXPECT_EQ(STATE_FAILED, write1.state); + } else { + EXPECT_EQ(STATE_SUCCEEDED, write1.state); + EXPECT_FALSE(socket->getTFOSucceded()); + } + + EXPECT_EQ(STATE_FAILED, write2.state); + + EXPECT_EQ(STATE_SUCCEEDED, cb.state); + EXPECT_LE(0, socket->getConnectTime().count()); + EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout()); + EXPECT_TRUE(socket->getTFOAttempted()); +} + +/** + * Test calling closeNow() immediately after connecting. + */ +TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) { + TestServer server(true); + + // connect() + EventBase evb; + std::shared_ptr socket = AsyncSocket::newSocket(&evb); + socket->enableTFO(); + + ConnCallback ccb; + socket->connect(&ccb, server.getAddress(), 30); + + // write() + std::array buf; + memset(buf.data(), 'a', buf.size()); + + // close() + socket->closeNow(); + + // Loop, although there shouldn't be anything to do. + evb.loop(); + + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); +} + +/** + * Test calling close() immediately after connect() + */ +TEST(AsyncSocketTest, ConnectAndCloseTFO) { + TestServer server(true); + + // Connect using a AsyncSocket + EventBase evb; + std::shared_ptr socket = AsyncSocket::newSocket(&evb); + socket->enableTFO(); + + ConnCallback ccb; + socket->connect(&ccb, server.getAddress(), 30); + + socket->close(); + + // Loop, although there shouldn't be anything to do. + evb.loop(); + + // Make sure the connection was aborted + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } + +class MockAsyncTFOSocket : public AsyncSocket { + public: + using UniquePtr = std::unique_ptr; + + explicit MockAsyncTFOSocket(EventBase* evb) : AsyncSocket(evb) {} + + MOCK_METHOD3(tfoSendMsg, ssize_t(int fd, struct msghdr* msg, int msg_flags)); +}; + +TEST(AsyncSocketTest, TestTFOUnsupported) { + TestServer server(true); + + // Connect using a AsyncSocket + EventBase evb; + auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb)); + socket->enableTFO(); + + ConnCallback ccb; + socket->connect(&ccb, server.getAddress(), 30); + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); + + ReadCallback rcb; + socket->setReadCB(&rcb); + + EXPECT_CALL(*socket, tfoSendMsg(_, _, _)) + .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1)); + WriteCallback write; + auto sendBuf = IOBuf::copyBuffer("hey"); + socket->writeChain(&write, sendBuf->clone()); + EXPECT_EQ(STATE_WAITING, write.state); + + std::array buf; + memset(buf.data(), 'a', buf.size()); + + std::array readBuf; + + std::thread t([&] { + std::shared_ptr acceptedSocket = server.accept(); + acceptedSocket->write(buf.data(), buf.size()); + acceptedSocket->flush(); + acceptedSocket->readAll(readBuf.data(), readBuf.size()); + acceptedSocket->close(); + }); + + evb.loop(); + + t.join(); + EXPECT_EQ(STATE_SUCCEEDED, ccb.state); + EXPECT_EQ(STATE_SUCCEEDED, write.state); + + EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size())); + EXPECT_EQ(STATE_SUCCEEDED, rcb.state); + ASSERT_EQ(1, rcb.buffers.size()); + ASSERT_EQ(sizeof(buf), rcb.buffers[0].length); + EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size())); + EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded()); +} + +TEST(AsyncSocketTest, ConnectRefusedDelayedTFO) { + EventBase evb; + + auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb)); + socket->enableTFO(); + + // Hopefully this fails + folly::SocketAddress fakeAddr("127.0.0.1", 65535); + EXPECT_CALL(*socket, tfoSendMsg(_, _, _)) + .WillOnce(Invoke([&](int fd, struct msghdr*, int) { + sockaddr_storage addr; + auto len = fakeAddr.getAddress(&addr); + int ret = connect(fd, (const struct sockaddr*)&addr, len); + LOG(INFO) << "connecting the socket " << fd << " : " << ret << " : " + << errno; + return ret; + })); + + // Hopefully nothing is actually listening on this address + ConnCallback cb; + socket->connect(&cb, fakeAddr, 30); + + WriteCallback write1; + // Trigger the connect if TFO attempt is supported. + socket->writeChain(&write1, IOBuf::copyBuffer("hey")); + + if (socket->getTFOFinished()) { + // This test is useless now. + return; + } + WriteCallback write2; + // Trigger the connect if TFO attempt is supported. + socket->writeChain(&write2, IOBuf::copyBuffer("hey")); + evb.loop(); + + EXPECT_EQ(STATE_FAILED, write1.state); + EXPECT_EQ(STATE_FAILED, write2.state); + EXPECT_FALSE(socket->getTFOSucceded()); + + EXPECT_EQ(STATE_SUCCEEDED, cb.state); + EXPECT_LE(0, socket->getConnectTime().count()); + EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout()); + EXPECT_TRUE(socket->getTFOAttempted()); +} + +TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) { + // Try connecting to server that won't respond. + // + // This depends somewhat on the network where this test is run. + // Hopefully this IP will be routable but unresponsive. + // (Alternatively, we could try listening on a local raw socket, but that + // normally requires root privileges.) + auto host = SocketAddressTestHelper::isIPv6Enabled() + ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 + : SocketAddressTestHelper::isIPv4Enabled() + ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 + : nullptr; + SocketAddress addr(host, 65535); + + // Connect using a AsyncSocket + EventBase evb; + auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb)); + socket->enableTFO(); + + ConnCallback ccb; + // Set a very small timeout + socket->connect(&ccb, addr, 1); + EXPECT_EQ(STATE_SUCCEEDED, ccb.state); + + ReadCallback rcb; + socket->setReadCB(&rcb); + + EXPECT_CALL(*socket, tfoSendMsg(_, _, _)) + .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1)); + WriteCallback write; + socket->writeChain(&write, IOBuf::copyBuffer("hey")); + + evb.loop(); + + EXPECT_EQ(STATE_FAILED, write.state); +} + +TEST(AsyncSocketTest, TestTFOFallbackToConnect) { + TestServer server(true); + + // Connect using a AsyncSocket + EventBase evb; + auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb)); + socket->enableTFO(); + + ConnCallback ccb; + socket->connect(&ccb, server.getAddress(), 30); + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); + + ReadCallback rcb; + socket->setReadCB(&rcb); + + EXPECT_CALL(*socket, tfoSendMsg(_, _, _)) + .WillOnce(Invoke([&](int fd, struct msghdr*, int) { + sockaddr_storage addr; + auto len = server.getAddress().getAddress(&addr); + return connect(fd, (const struct sockaddr*)&addr, len); + })); + WriteCallback write; + auto sendBuf = IOBuf::copyBuffer("hey"); + socket->writeChain(&write, sendBuf->clone()); + EXPECT_EQ(STATE_WAITING, write.state); + + std::array buf; + memset(buf.data(), 'a', buf.size()); + + std::array readBuf; + + std::thread t([&] { + std::shared_ptr acceptedSocket = server.accept(); + acceptedSocket->write(buf.data(), buf.size()); + acceptedSocket->flush(); + acceptedSocket->readAll(readBuf.data(), readBuf.size()); + acceptedSocket->close(); + }); + + evb.loop(); + + t.join(); + EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size())); + + EXPECT_EQ(STATE_SUCCEEDED, ccb.state); + EXPECT_EQ(STATE_SUCCEEDED, write.state); + + EXPECT_EQ(STATE_SUCCEEDED, rcb.state); + ASSERT_EQ(1, rcb.buffers.size()); + ASSERT_EQ(buf.size(), rcb.buffers[0].length); + EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size())); +} + +TEST(AsyncSocketTest, TestTFOFallbackTimeout) { + // Try connecting to server that won't respond. + // + // This depends somewhat on the network where this test is run. + // Hopefully this IP will be routable but unresponsive. + // (Alternatively, we could try listening on a local raw socket, but that + // normally requires root privileges.) + auto host = SocketAddressTestHelper::isIPv6Enabled() + ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 + : SocketAddressTestHelper::isIPv4Enabled() + ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 + : nullptr; + SocketAddress addr(host, 65535); + + // Connect using a AsyncSocket + EventBase evb; + auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb)); + socket->enableTFO(); + + ConnCallback ccb; + // Set a very small timeout + socket->connect(&ccb, addr, 1); + EXPECT_EQ(STATE_SUCCEEDED, ccb.state); + + ReadCallback rcb; + socket->setReadCB(&rcb); + + EXPECT_CALL(*socket, tfoSendMsg(_, _, _)) + .WillOnce(Invoke([&](int fd, struct msghdr*, int) { + sockaddr_storage addr2; + auto len = addr.getAddress(&addr2); + return connect(fd, (const struct sockaddr*)&addr2, len); + })); + WriteCallback write; + socket->writeChain(&write, IOBuf::copyBuffer("hey")); + + evb.loop(); + + EXPECT_EQ(STATE_FAILED, write.state); +} + +TEST(AsyncSocketTest, TestTFOEagain) { + TestServer server(true); + + // Connect using a AsyncSocket + EventBase evb; + auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb)); + socket->enableTFO(); + + ConnCallback ccb; + socket->connect(&ccb, server.getAddress(), 30); + + EXPECT_CALL(*socket, tfoSendMsg(_, _, _)) + .WillOnce(SetErrnoAndReturn(EAGAIN, -1)); + WriteCallback write; + socket->writeChain(&write, IOBuf::copyBuffer("hey")); + + evb.loop(); + + EXPECT_EQ(STATE_SUCCEEDED, ccb.state); + EXPECT_EQ(STATE_FAILED, write.state); +} + +// Sending a large amount of data in the first write which will +// definitely not fit into MSS. +TEST(AsyncSocketTest, ConnectTFOWithBigData) { + // Start listening on a local port + TestServer server(true); + + // Connect using a AsyncSocket + EventBase evb; + std::shared_ptr socket = AsyncSocket::newSocket(&evb); + socket->enableTFO(); + ConnCallback cb; + socket->connect(&cb, server.getAddress(), 30); + + std::array buf; + memset(buf.data(), 'a', buf.size()); + + constexpr size_t len = 10 * 1024; + auto sendBuf = IOBuf::create(len); + sendBuf->append(len); + std::array readBuf; + + std::thread t([&] { + auto acceptedSocket = server.accept(); + acceptedSocket->write(buf.data(), buf.size()); + acceptedSocket->flush(); + acceptedSocket->readAll(readBuf.data(), readBuf.size()); + acceptedSocket->close(); + }); + + evb.loop(); + + ASSERT_EQ(cb.state, STATE_SUCCEEDED); + EXPECT_LE(0, socket->getConnectTime().count()); + EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30)); + EXPECT_TRUE(socket->getTFOAttempted()); + + // Should trigger the connect + WriteCallback write; + ReadCallback rcb; + socket->writeChain(&write, sendBuf->clone()); + socket->setReadCB(&rcb); + evb.loop(); + + t.join(); + + EXPECT_EQ(STATE_SUCCEEDED, write.state); + EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size())); + EXPECT_EQ(STATE_SUCCEEDED, rcb.state); + ASSERT_EQ(1, rcb.buffers.size()); + ASSERT_EQ(sizeof(buf), rcb.buffers[0].length); + EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size())); + EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded()); +} + +class MockEvbChangeCallback : public AsyncSocket::EvbChangeCallback { + public: + MOCK_METHOD1(evbAttached, void(AsyncSocket*)); + MOCK_METHOD1(evbDetached, void(AsyncSocket*)); +}; + +TEST(AsyncSocketTest, EvbCallbacks) { + auto cb = folly::make_unique(); + EventBase evb; + std::shared_ptr socket = AsyncSocket::newSocket(&evb); + + InSequence seq; + EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1); + EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1); + + socket->setEvbChangedCallback(std::move(cb)); + socket->detachEventBase(); + socket->attachEventBase(&evb); +} + +#endif