/*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include <folly/io/async/AsyncServerSocket.h>
+
+#include <folly/io/async/test/AsyncSocketTest2.h>
+
+#include <folly/ExceptionWrapper.h>
+#include <folly/Random.h>
+#include <folly/SocketAddress.h>
#include <folly/io/async/AsyncSocket.h>
#include <folly/io/async/AsyncTimeout.h>
#include <folly/io/async/EventBase.h>
-#include <folly/RWSpinLock.h>
-#include <folly/SocketAddress.h>
+#include <folly/experimental/TestUtil.h>
#include <folly/io/IOBuf.h>
#include <folly/io/async/test/AsyncSocketTest.h>
#include <folly/io/async/test/Util.h>
+#include <folly/portability/GMock.h>
+#include <folly/portability/GTest.h>
+#include <folly/portability/Sockets.h>
+#include <folly/portability/Unistd.h>
#include <folly/test/SocketAddressTestHelper.h>
-#include <gtest/gtest.h>
#include <boost/scoped_array.hpp>
-#include <iostream>
-#include <unistd.h>
#include <fcntl.h>
-#include <poll.h>
#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/tcp.h>
+#include <iostream>
#include <thread>
using namespace boost;
using boost::scoped_array;
using namespace folly;
+using namespace folly::test;
+using namespace testing;
+
+namespace fsp = folly::portability::sockets;
class DelayedWrite: public AsyncTimeout {
public:
evb.loop();
- CHECK_EQ(cb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(cb.state, STATE_SUCCEEDED);
EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+}
+
+enum class TFOState {
+ DISABLED,
+ ENABLED,
+};
+
+class AsyncSocketConnectTest : public ::testing::TestWithParam<TFOState> {};
+
+std::vector<TFOState> getTestingValues() {
+ std::vector<TFOState> vals;
+ vals.emplace_back(TFOState::DISABLED);
+
+#if FOLLY_ALLOW_TFO
+ vals.emplace_back(TFOState::ENABLED);
+#endif
+ return vals;
}
+INSTANTIATE_TEST_CASE_P(
+ ConnectTests,
+ AsyncSocketConnectTest,
+ ::testing::ValuesIn(getTestingValues()));
+
/**
* Test connecting to a server that isn't listening
*/
evb.loop();
- CHECK_EQ(cb.state, STATE_FAILED);
- CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
+ EXPECT_EQ(STATE_FAILED, cb.state);
+ EXPECT_EQ(AsyncSocketException::NOT_OPEN, cb.exception.getType());
EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
}
/**
evb.loop();
- CHECK_EQ(cb.state, STATE_FAILED);
- CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
+ ASSERT_EQ(cb.state, STATE_FAILED);
+ ASSERT_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
// Verify that we can still get the peer address after a timeout.
// Use case is if the client was created from a client pool, and we want
// to log which peer failed.
folly::SocketAddress peer;
socket->getPeerAddress(&peer);
- CHECK_EQ(peer, addr);
+ ASSERT_EQ(peer, addr);
EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
}
/**
* Test writing immediately after connecting, without waiting for connect
* to finish.
*/
-TEST(AsyncSocketTest, ConnectAndWrite) {
+TEST_P(AsyncSocketConnectTest, ConnectAndWrite) {
TestServer server;
// connect()
EventBase evb;
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+ if (GetParam() == TFOState::ENABLED) {
+ socket->enableTFO();
+ }
+
ConnCallback ccb;
socket->connect(&ccb, server.getAddress(), 30);
// The kernel should be able to buffer the write request so it can succeed.
evb.loop();
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
// Make sure the server got a connection and received the data
socket->close();
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
+ EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
}
/**
* Test connecting using a nullptr connect callback.
*/
-TEST(AsyncSocketTest, ConnectNullCallback) {
+TEST_P(AsyncSocketConnectTest, ConnectNullCallback) {
TestServer server;
// connect()
EventBase evb;
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ if (GetParam() == TFOState::ENABLED) {
+ socket->enableTFO();
+ }
+
socket->connect(nullptr, server.getAddress(), 30);
// write some data, just so we have some way of verifing
evb.loop();
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
// Make sure the server got a connection and received the data
socket->close();
*
* This exercises the STATE_CONNECTING_CLOSING code.
*/
-TEST(AsyncSocketTest, ConnectWriteAndClose) {
+TEST_P(AsyncSocketConnectTest, ConnectWriteAndClose) {
TestServer server;
// connect()
EventBase evb;
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ if (GetParam() == TFOState::ENABLED) {
+ socket->enableTFO();
+ }
ConnCallback ccb;
socket->connect(&ccb, server.getAddress(), 30);
// The kernel should be able to buffer the write request so it can succeed.
evb.loop();
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
// Make sure the server got a connection and received the data
server.verifyConnection(buf, sizeof(buf));
evb.loop();
// Make sure the connection was aborted
- CHECK_EQ(ccb.state, STATE_FAILED);
+ ASSERT_EQ(ccb.state, STATE_FAILED);
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
evb.loop();
// Make sure the connection was aborted
- CHECK_EQ(ccb.state, STATE_FAILED);
+ ASSERT_EQ(ccb.state, STATE_FAILED);
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
// Loop, although there shouldn't be anything to do.
evb.loop();
- CHECK_EQ(ccb.state, STATE_FAILED);
- CHECK_EQ(wcb.state, STATE_FAILED);
+ ASSERT_EQ(ccb.state, STATE_FAILED);
+ ASSERT_EQ(wcb.state, STATE_FAILED);
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
/**
* Test installing a read callback immediately, before connect() finishes.
*/
-TEST(AsyncSocketTest, ConnectAndRead) {
+TEST_P(AsyncSocketConnectTest, ConnectAndRead) {
TestServer server;
// connect()
EventBase evb;
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ if (GetParam() == TFOState::ENABLED) {
+ socket->enableTFO();
+ }
+
ConnCallback ccb;
socket->connect(&ccb, server.getAddress(), 30);
ReadCallback rcb;
socket->setReadCB(&rcb);
+ if (GetParam() == TFOState::ENABLED) {
+ // Trigger a connection
+ socket->writeChain(nullptr, IOBuf::copyBuffer("hey"));
+ }
+
// Even though we haven't looped yet, we should be able to accept
// the connection and send data to it.
std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
// Loop, although there shouldn't be anything to do.
evb.loop();
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.buffers.size(), 1);
- CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
- CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.buffers.size(), 1);
+ ASSERT_EQ(rcb.buffers[0].length, sizeof(buf));
+ ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
ASSERT_FALSE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
// Loop, although there shouldn't be anything to do.
evb.loop();
- CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
- CHECK_EQ(rcb.buffers.size(), 0);
- CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
+ ASSERT_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
+ ASSERT_EQ(rcb.buffers.size(), 0);
+ ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
* Test both writing and installing a read callback immediately,
* before connect() finishes.
*/
-TEST(AsyncSocketTest, ConnectWriteAndRead) {
+TEST_P(AsyncSocketConnectTest, ConnectWriteAndRead) {
TestServer server;
// connect()
EventBase evb;
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ if (GetParam() == TFOState::ENABLED) {
+ socket->enableTFO();
+ }
ConnCallback ccb;
socket->connect(&ccb, server.getAddress(), 30);
evb.loop();
// Make sure the connect succeeded
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
// Make sure the AsyncSocket read the data written by the accepted socket
- CHECK_EQ(rcb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.buffers.size(), 1);
- CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
- CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
+ ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.buffers.size(), 1);
+ ASSERT_EQ(rcb.buffers[0].length, sizeof(buf2));
+ ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
// Close the AsyncSocket so we'll see EOF on acceptedSocket
socket->close();
// Make sure the accepted socket saw the data written by the AsyncSocket
uint8_t readbuf[sizeof(buf1)];
acceptedSocket->readAll(readbuf, sizeof(readbuf));
- CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
+ ASSERT_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
- CHECK_EQ(bytesRead, 0);
+ ASSERT_EQ(bytesRead, 0);
ASSERT_FALSE(socket->isClosedBySelf());
ASSERT_TRUE(socket->isClosedByPeer());
fds[0].events = POLLIN;
fds[0].revents = 0;
int rc = poll(fds, 1, 0);
- CHECK_EQ(rc, 0);
+ ASSERT_EQ(rc, 0);
// Write data to the accepted socket
uint8_t acceptedWbuf[192];
//
// Check that the connection was completed successfully and that the write
// callback succeeded.
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
// Check that we can read the data that was written to the socket, and that
// we see an EOF, since its socket was half-shutdown.
uint8_t readbuf[sizeof(wbuf)];
acceptedSocket->readAll(readbuf, sizeof(readbuf));
- CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
+ ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
- CHECK_EQ(bytesRead, 0);
+ ASSERT_EQ(bytesRead, 0);
// Close the accepted socket. This will cause it to see EOF
// and uninstall the read callback when we loop next.
evb.loop();
// This loop should have read the data and seen the EOF
- CHECK_EQ(rcb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.buffers.size(), 1);
- CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
- CHECK_EQ(memcmp(rcb.buffers[0].buffer,
+ ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.buffers.size(), 1);
+ ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
+ ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
acceptedWbuf, sizeof(acceptedWbuf)), 0);
ASSERT_FALSE(socket->isClosedBySelf());
fds[0].events = POLLIN;
fds[0].revents = 0;
int rc = poll(fds, 1, 0);
- CHECK_EQ(rc, 0);
+ ASSERT_EQ(rc, 0);
// Write data to the accepted socket
uint8_t acceptedWbuf[192];
acceptedSocket->flush();
// Shutdown writes to the accepted socket. This will cause it to see EOF
// and uninstall the read callback.
- ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
+ shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
// Loop
evb.loop();
//
// Check that the connection was completed successfully and that the read
// and write callbacks were invoked as expected.
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.buffers.size(), 1);
- CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
- CHECK_EQ(memcmp(rcb.buffers[0].buffer,
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.buffers.size(), 1);
+ ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
+ ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
acceptedWbuf, sizeof(acceptedWbuf)), 0);
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
// Check that we can read the data that was written to the socket, and that
// we see an EOF, since its socket was half-shutdown.
uint8_t readbuf[sizeof(wbuf)];
acceptedSocket->readAll(readbuf, sizeof(readbuf));
- CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
+ ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
- CHECK_EQ(bytesRead, 0);
+ ASSERT_EQ(bytesRead, 0);
// Fully close both sockets
acceptedSocket->close();
socket->shutdownWriteNow();
// Verify that writeError() was invoked on the write callback.
- CHECK_EQ(wcb.state, STATE_FAILED);
- CHECK_EQ(wcb.bytesWritten, 0);
+ ASSERT_EQ(wcb.state, STATE_FAILED);
+ ASSERT_EQ(wcb.bytesWritten, 0);
// Even though we haven't looped yet, we should be able to accept
// the connection.
fds[0].events = POLLIN;
fds[0].revents = 0;
int rc = poll(fds, 1, 0);
- CHECK_EQ(rc, 0);
+ ASSERT_EQ(rc, 0);
// Write data to the accepted socket
uint8_t acceptedWbuf[192];
acceptedSocket->flush();
// Shutdown writes to the accepted socket. This will cause it to see EOF
// and uninstall the read callback.
- ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
+ shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
// Loop
evb.loop();
//
// Check that the connection was completed successfully and that the read
// callback was invoked as expected.
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.buffers.size(), 1);
- CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
- CHECK_EQ(memcmp(rcb.buffers[0].buffer,
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.buffers.size(), 1);
+ ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
+ ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
acceptedWbuf, sizeof(acceptedWbuf)), 0);
// Since we used shutdownWriteNow(), it should have discarded all pending
// socket.
uint8_t readbuf[sizeof(wbuf)];
uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
- CHECK_EQ(bytesRead, 0);
+ ASSERT_EQ(bytesRead, 0);
// Fully close both sockets
acceptedSocket->close();
// The kernel should be able to buffer the write request so it can succeed.
evb.loop();
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
if (size1 > 0) {
- CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
}
if (size2 > 0) {
- CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
}
socket->close();
size_t end = bytesRead;
if (start < size1) {
size_t cmpLen = min(size1, end) - start;
- CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
+ ASSERT_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
}
if (end > size1 && end <= size1 + size2) {
size_t itOffset;
buf2Offset = 0;
cmpLen = end - size1;
}
- CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
+ ASSERT_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
cmpLen),
0);
}
}
- CHECK_EQ(bytesRead, size1 + size2);
+ ASSERT_EQ(bytesRead, size1 + size2);
}
TEST(AsyncSocketTest, ConnectCallbackWrite) {
testConnectOptWrite(100, 200);
// Test using a large buffer in the connect callback, that should block
- const size_t largeSize = 8*1024*1024;
+ const size_t largeSize = 32 * 1024 * 1024;
testConnectOptWrite(100, largeSize);
// Test using a large initial write
AsyncSocket::newSocket(&evb, server.getAddress(), 30);
evb.loop(); // loop until the socket is connected
- // write() a large chunk of data, with no-one on the other end reading
- size_t writeLength = 8*1024*1024;
+ // write() a large chunk of data, with no-one on the other end reading.
+ // Tricky: the kernel caches the connection metrics for recently-used
+ // routes (see tcp_no_metrics_save) so a freshly opened connection can
+ // have a send buffer size bigger than wmem_default. This makes the test
+ // flaky on contbuild if writeLength is < wmem_max (20M on our systems).
+ size_t writeLength = 32 * 1024 * 1024;
uint32_t timeout = 200;
socket->setSendTimeout(timeout);
scoped_array<char> buf(new char[writeLength]);
TimePoint end;
// Make sure the write attempt timed out as requested
- CHECK_EQ(wcb.state, STATE_FAILED);
- CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
+ ASSERT_EQ(wcb.state, STATE_FAILED);
+ ASSERT_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
// Check that the write timed out within a reasonable period of time.
// We don't check for exactly the specified timeout, since AsyncSocket only
// accept and immediately close the socket
std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
- acceptedSocket.reset();
+ acceptedSocket->close();
// write() a large chunk of data
- size_t writeLength = 8*1024*1024;
+ size_t writeLength = 32 * 1024 * 1024;
scoped_array<char> buf(new char[writeLength]);
memset(buf.get(), 'a', writeLength);
WriteCallback wcb;
// Make sure the write failed.
// It would be nice if AsyncSocketException could convey the errno value,
// so that we could check for EPIPE
- CHECK_EQ(wcb.state, STATE_FAILED);
- CHECK_EQ(wcb.exception.getType(),
+ ASSERT_EQ(wcb.state, STATE_FAILED);
+ ASSERT_EQ(wcb.exception.getType(),
AsyncSocketException::INTERNAL_ERROR);
ASSERT_FALSE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
}
+/**
+ * Test that bytes written is correctly computed in case of write failure
+ */
+TEST(AsyncSocketTest, WriteErrorCallbackBytesWritten) {
+ // Send and receive buffer sizes for the sockets.
+ const int sockBufSize = 8 * 1024;
+
+ TestServer server(false, sockBufSize);
+
+ AsyncSocket::OptionMap options{
+ {{SOL_SOCKET, SO_SNDBUF}, sockBufSize},
+ {{SOL_SOCKET, SO_RCVBUF}, sockBufSize},
+ {{IPPROTO_TCP, TCP_NODELAY}, 1},
+ };
+
+ // The current thread will be used by the receiver - use a separate thread
+ // for the sender.
+ EventBase senderEvb;
+ std::thread senderThread([&]() { senderEvb.loopForever(); });
+
+ ConnCallback ccb;
+ std::shared_ptr<AsyncSocket> socket;
+
+ senderEvb.runInEventBaseThreadAndWait([&]() {
+ socket = AsyncSocket::newSocket(&senderEvb);
+ socket->connect(&ccb, server.getAddress(), 30, options);
+ });
+
+ // accept the socket on the server side
+ std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+
+ // Send a big (45KB) write so that it is partially written. The first write
+ // is 16KB (8KB on both sides) and subsequent writes are 8KB each. Reading
+ // just under 24KB would cause 3-4 writes for the total of 32-40KB in the
+ // following sequence: 16KB + 8KB + 8KB (+ 8KB). This ensures that not all
+ // bytes are written when the socket is reset. Having at least 3 writes
+ // ensures that the total size (45KB) would be exceeed in case of overcounting
+ // based on the initial write size of 16KB.
+ constexpr size_t sendSize = 45 * 1024;
+ auto const sendBuf = std::vector<char>(sendSize, 'a');
+
+ WriteCallback wcb;
+
+ senderEvb.runInEventBaseThreadAndWait(
+ [&]() { socket->write(&wcb, sendBuf.data(), sendSize); });
+
+ // Reading 20KB would cause three additional writes of 8KB, but less
+ // than 45KB total, so the socket is reset before all bytes are written.
+ constexpr size_t recvSize = 20 * 1024;
+ uint8_t recvBuf[recvSize];
+ int bytesRead = acceptedSocket->readAll(recvBuf, sizeof(recvBuf));
+
+ acceptedSocket->closeWithReset();
+
+ senderEvb.terminateLoopSoon();
+ senderThread.join();
+
+ LOG(INFO) << "Bytes written: " << wcb.bytesWritten;
+
+ ASSERT_EQ(STATE_FAILED, wcb.state);
+ ASSERT_GE(wcb.bytesWritten, bytesRead);
+ ASSERT_LE(wcb.bytesWritten, sendSize);
+ ASSERT_EQ(recvSize, bytesRead);
+ ASSERT(32 * 1024 == wcb.bytesWritten || 40 * 1024 == wcb.bytesWritten);
+}
+
/**
* Test writing a mix of simple buffers and IOBufs
*/
ReadCallback rcb;
acceptedSocket->setReadCB(&rcb);
+ // Check if EOR tracking flag can be set and reset.
+ EXPECT_FALSE(socket->isEorTrackingEnabled());
+ socket->setEorTracking(true);
+ EXPECT_TRUE(socket->isEorTrackingEnabled());
+ socket->setEorTracking(false);
+ EXPECT_FALSE(socket->isEorTrackingEnabled());
+
// Write a simple buffer to the socket
- size_t simpleBufLength = 5;
+ constexpr size_t simpleBufLength = 5;
char simpleBuf[simpleBufLength];
memset(simpleBuf, 'a', simpleBufLength);
WriteCallback wcb;
// Let the reads and writes run to completion
evb.loop();
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
// Make sure the reader got the right data in the right order
- CHECK_EQ(rcb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.buffers.size(), 1);
- CHECK_EQ(rcb.buffers[0].length,
+ ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.buffers.size(), 1);
+ ASSERT_EQ(rcb.buffers[0].length,
simpleBufLength + buf1Length + buf2Length + buf3Length);
- CHECK_EQ(
+ ASSERT_EQ(
memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
- CHECK_EQ(
+ ASSERT_EQ(
memcmp(rcb.buffers[0].buffer + simpleBufLength,
buf1Copy->data(), buf1Copy->length()), 0);
- CHECK_EQ(
+ ASSERT_EQ(
memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
buf2Copy->data(), buf2Copy->length()), 0);
write2.scheduleTimeout(100);
WriteCallback wcb3;
DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
- write3.scheduleTimeout(200);
+ write3.scheduleTimeout(140);
evb.loop();
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
if (wcb3.state != STATE_SUCCEEDED) {
throw(wcb3.exception);
}
- CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
// Make sure the reader got the data with the right grouping
- CHECK_EQ(rcb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.buffers.size(), 2);
- CHECK_EQ(rcb.buffers[0].length, buf1Length);
- CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
+ ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.buffers.size(), 2);
+ ASSERT_EQ(rcb.buffers[0].length, buf1Length);
+ ASSERT_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
acceptedSocket->close();
socket->close();
evb.loop(); // loop until the data is sent
- CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb4.state, STATE_SUCCEEDED);
rcb.verifyData(buf.get(), len1 + len2);
ASSERT_TRUE(socket->isClosedBySelf());
memset(buf.get(), 'b', len2);
WriteCallback wcb;
- size_t iovCount = 4;
+ constexpr size_t iovCount = 4;
struct iovec iov[iovCount];
iov[0].iov_base = buf.get();
iov[0].iov_len = len1;
socket->close();
evb.loop(); // loop until the data is sent
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
rcb.verifyData(buf.get(), len1 + len2);
ASSERT_TRUE(socket->isClosedBySelf());
evb.loop();
// Make sure we are connected
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
// Schedule pending writes, until several write attempts have blocked
char buf[128];
for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
it != writeCallbacks.end();
++it) {
- CHECK_EQ((*it)->state, STATE_FAILED);
+ ASSERT_EQ((*it)->state, STATE_FAILED);
}
ASSERT_TRUE(socket->isClosedBySelf());
WriteCallback wcb1;
socket.write(&wcb1, expectedData, expectedDataSz);
evb.loop();
- CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
rcb.verifyData(expectedData, expectedDataSz);
- CHECK_EQ(socket.immediateReadCalled, true);
+ ASSERT_EQ(socket.immediateReadCalled, true);
ASSERT_FALSE(socket.isClosedBySelf());
ASSERT_FALSE(socket.isClosedByPeer());
WriteCallback wcb;
socket.write(&wcb, expectedData, expectedDataSz);
evb.loop();
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
/* we shoud've only read maxBufferSz data since readCallback_
* was reset in dataAvailableCallback */
- CHECK_EQ(rcb.dataRead(), maxBufferSz);
- CHECK_EQ(socket.immediateReadCalled, false);
+ ASSERT_EQ(rcb.dataRead(), maxBufferSz);
+ ASSERT_EQ(socket.immediateReadCalled, false);
ASSERT_FALSE(socket.isClosedBySelf());
ASSERT_FALSE(socket.isClosedByPeer());
///////////////////////////////////////////////////////////////////////////
// AsyncServerSocket tests
///////////////////////////////////////////////////////////////////////////
-namespace {
-/**
- * Helper ConnectionEventCallback class for the test code.
- * It maintains counters protected by a spin lock.
- */
-class TestConnectionEventCallback :
- public AsyncServerSocket::ConnectionEventCallback {
- public:
- virtual void onConnectionAccepted(
- const int socket,
- const SocketAddress& addr) noexcept override {
- folly::RWSpinLock::WriteHolder holder(spinLock_);
- connectionAccepted_++;
- }
-
- virtual void onConnectionAcceptError(const int err) noexcept override {
- folly::RWSpinLock::WriteHolder holder(spinLock_);
- connectionAcceptedError_++;
- }
-
- virtual void onConnectionDropped(
- const int socket,
- const SocketAddress& addr) noexcept override {
- folly::RWSpinLock::WriteHolder holder(spinLock_);
- connectionDropped_++;
- }
-
- virtual void onConnectionEnqueuedForAcceptorCallback(
- const int socket,
- const SocketAddress& addr) noexcept override {
- folly::RWSpinLock::WriteHolder holder(spinLock_);
- connectionEnqueuedForAcceptCallback_++;
- }
-
- virtual void onConnectionDequeuedByAcceptorCallback(
- const int socket,
- const SocketAddress& addr) noexcept override {
- folly::RWSpinLock::WriteHolder holder(spinLock_);
- connectionDequeuedByAcceptCallback_++;
- }
-
- virtual void onBackoffStarted() noexcept override {
- folly::RWSpinLock::WriteHolder holder(spinLock_);
- backoffStarted_++;
- }
-
- virtual void onBackoffEnded() noexcept override {
- folly::RWSpinLock::WriteHolder holder(spinLock_);
- backoffEnded_++;
- }
-
- virtual void onBackoffError() noexcept override {
- folly::RWSpinLock::WriteHolder holder(spinLock_);
- backoffError_++;
- }
-
- unsigned int getConnectionAccepted() const {
- folly::RWSpinLock::ReadHolder holder(spinLock_);
- return connectionAccepted_;
- }
-
- unsigned int getConnectionAcceptedError() const {
- folly::RWSpinLock::ReadHolder holder(spinLock_);
- return connectionAcceptedError_;
- }
-
- unsigned int getConnectionDropped() const {
- folly::RWSpinLock::ReadHolder holder(spinLock_);
- return connectionDropped_;
- }
-
- unsigned int getConnectionEnqueuedForAcceptCallback() const {
- folly::RWSpinLock::ReadHolder holder(spinLock_);
- return connectionEnqueuedForAcceptCallback_;
- }
-
- unsigned int getConnectionDequeuedByAcceptCallback() const {
- folly::RWSpinLock::ReadHolder holder(spinLock_);
- return connectionDequeuedByAcceptCallback_;
- }
-
- unsigned int getBackoffStarted() const {
- folly::RWSpinLock::ReadHolder holder(spinLock_);
- return backoffStarted_;
- }
-
- unsigned int getBackoffEnded() const {
- folly::RWSpinLock::ReadHolder holder(spinLock_);
- return backoffEnded_;
- }
-
- unsigned int getBackoffError() const {
- folly::RWSpinLock::ReadHolder holder(spinLock_);
- return backoffError_;
- }
-
- private:
- mutable folly::RWSpinLock spinLock_;
- unsigned int connectionAccepted_{0};
- unsigned int connectionAcceptedError_{0};
- unsigned int connectionDropped_{0};
- unsigned int connectionEnqueuedForAcceptCallback_{0};
- unsigned int connectionDequeuedByAcceptCallback_{0};
- unsigned int backoffStarted_{0};
- unsigned int backoffEnded_{0};
- unsigned int backoffError_{0};
-};
-
-/**
- * Helper AcceptCallback class for the test code
- * It records the callbacks that were invoked, and also supports calling
- * generic std::function objects in each callback.
- */
-class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
- public:
- enum EventType {
- TYPE_START,
- TYPE_ACCEPT,
- TYPE_ERROR,
- TYPE_STOP
- };
- struct EventInfo {
- EventInfo(int fd, const folly::SocketAddress& addr)
- : type(TYPE_ACCEPT),
- fd(fd),
- address(addr),
- errorMsg() {}
- explicit EventInfo(const std::string& msg)
- : type(TYPE_ERROR),
- fd(-1),
- address(),
- errorMsg(msg) {}
- explicit EventInfo(EventType et)
- : type(et),
- fd(-1),
- address(),
- errorMsg() {}
-
- EventType type;
- int fd; // valid for TYPE_ACCEPT
- folly::SocketAddress address; // valid for TYPE_ACCEPT
- string errorMsg; // valid for TYPE_ERROR
- };
- typedef std::deque<EventInfo> EventList;
-
- TestAcceptCallback()
- : connectionAcceptedFn_(),
- acceptErrorFn_(),
- acceptStoppedFn_(),
- events_() {}
-
- std::deque<EventInfo>* getEvents() {
- return &events_;
- }
-
- void setConnectionAcceptedFn(
- const std::function<void(int, const folly::SocketAddress&)>& fn) {
- connectionAcceptedFn_ = fn;
- }
- void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
- acceptErrorFn_ = fn;
- }
- void setAcceptStartedFn(const std::function<void()>& fn) {
- acceptStartedFn_ = fn;
- }
- void setAcceptStoppedFn(const std::function<void()>& fn) {
- acceptStoppedFn_ = fn;
- }
-
- void connectionAccepted(
- int fd, const folly::SocketAddress& clientAddr) noexcept override {
- events_.emplace_back(fd, clientAddr);
-
- if (connectionAcceptedFn_) {
- connectionAcceptedFn_(fd, clientAddr);
- }
- }
- void acceptError(const std::exception& ex) noexcept override {
- events_.emplace_back(ex.what());
-
- if (acceptErrorFn_) {
- acceptErrorFn_(ex);
- }
- }
- void acceptStarted() noexcept override {
- events_.emplace_back(TYPE_START);
-
- if (acceptStartedFn_) {
- acceptStartedFn_();
- }
- }
- void acceptStopped() noexcept override {
- events_.emplace_back(TYPE_STOP);
-
- if (acceptStoppedFn_) {
- acceptStoppedFn_();
- }
- }
-
- private:
- std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
- std::function<void(const std::exception&)> acceptErrorFn_;
- std::function<void()> acceptStartedFn_;
- std::function<void()> acceptStoppedFn_;
-
- std::deque<EventInfo> events_;
-};
-}
/**
* Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
// Add a callback to accept one connection then stop the loop
TestAcceptCallback acceptCallback;
acceptCallback.setConnectionAcceptedFn(
- [&](int fd, const folly::SocketAddress& addr) {
- serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
- });
- acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
- serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
+ });
+ acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
});
- serverSocket->addAcceptCallback(&acceptCallback, nullptr);
+ serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
serverSocket->startAccepting();
// Connect to the server socket
eventBase.loop();
// Verify that the server accepted a connection
- CHECK_EQ(acceptCallback.getEvents()->size(), 3);
- CHECK_EQ(acceptCallback.getEvents()->at(0).type,
+ ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
+ ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(acceptCallback.getEvents()->at(1).type,
+ ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(acceptCallback.getEvents()->at(2).type,
+ ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
TestAcceptCallback::TYPE_STOP);
int fd = acceptCallback.getEvents()->at(1).fd;
// The accepted connection should already be in non-blocking mode
int flags = fcntl(fd, F_GETFL, 0);
- CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
+ ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
#ifndef TCP_NOPUSH
// The accepted connection should already have TCP_NODELAY set
int value;
socklen_t valueLength = sizeof(value);
int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
- CHECK_EQ(rc, 0);
- CHECK_EQ(value, 1);
+ ASSERT_EQ(rc, 0);
+ ASSERT_EQ(value, 1);
#endif
}
// Have callback 2 remove callback 3 and callback 5 the first time it is
// called.
int cb2Count = 0;
- cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
- std::shared_ptr<AsyncSocket> sock2(
+ cb1.setConnectionAcceptedFn([&](int /* fd */,
+ const folly::SocketAddress& /* addr */) {
+ std::shared_ptr<AsyncSocket> sock2(
AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
+ });
+ cb3.setConnectionAcceptedFn(
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
+ cb4.setConnectionAcceptedFn(
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ std::shared_ptr<AsyncSocket> sock3(
+ AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
+ });
+ cb5.setConnectionAcceptedFn(
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ std::shared_ptr<AsyncSocket> sock5(
+ AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
+
});
- cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
- });
- cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
- std::shared_ptr<AsyncSocket> sock3(
- AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
- });
- cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
- std::shared_ptr<AsyncSocket> sock5(
- AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
-
- });
cb2.setConnectionAcceptedFn(
- [&](int fd, const folly::SocketAddress& addr) {
- if (cb2Count == 0) {
- serverSocket->removeAcceptCallback(&cb3, nullptr);
- serverSocket->removeAcceptCallback(&cb5, nullptr);
- }
- ++cb2Count;
- });
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ if (cb2Count == 0) {
+ serverSocket->removeAcceptCallback(&cb3, nullptr);
+ serverSocket->removeAcceptCallback(&cb5, nullptr);
+ }
+ ++cb2Count;
+ });
// Have callback 6 remove callback 4 the first time it is called,
// and destroy the server socket the second time it is called
int cb6Count = 0;
cb6.setConnectionAcceptedFn(
- [&](int fd, const folly::SocketAddress& addr) {
- if (cb6Count == 0) {
- serverSocket->removeAcceptCallback(&cb4, nullptr);
- std::shared_ptr<AsyncSocket> sock6(
- AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
- std::shared_ptr<AsyncSocket> sock7(
- AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
- std::shared_ptr<AsyncSocket> sock8(
- AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
-
- } else {
- serverSocket.reset();
- }
- ++cb6Count;
- });
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ if (cb6Count == 0) {
+ serverSocket->removeAcceptCallback(&cb4, nullptr);
+ std::shared_ptr<AsyncSocket> sock6(
+ AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
+ std::shared_ptr<AsyncSocket> sock7(
+ AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
+ std::shared_ptr<AsyncSocket> sock8(
+ AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
+
+ } else {
+ serverSocket.reset();
+ }
+ ++cb6Count;
+ });
// Have callback 7 remove itself
cb7.setConnectionAcceptedFn(
- [&](int fd, const folly::SocketAddress& addr) {
- serverSocket->removeAcceptCallback(&cb7, nullptr);
- });
-
- serverSocket->addAcceptCallback(&cb1, nullptr);
- serverSocket->addAcceptCallback(&cb2, nullptr);
- serverSocket->addAcceptCallback(&cb3, nullptr);
- serverSocket->addAcceptCallback(&cb4, nullptr);
- serverSocket->addAcceptCallback(&cb5, nullptr);
- serverSocket->addAcceptCallback(&cb6, nullptr);
- serverSocket->addAcceptCallback(&cb7, nullptr);
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ serverSocket->removeAcceptCallback(&cb7, nullptr);
+ });
+
+ serverSocket->addAcceptCallback(&cb1, &eventBase);
+ serverSocket->addAcceptCallback(&cb2, &eventBase);
+ serverSocket->addAcceptCallback(&cb3, &eventBase);
+ serverSocket->addAcceptCallback(&cb4, &eventBase);
+ serverSocket->addAcceptCallback(&cb5, &eventBase);
+ serverSocket->addAcceptCallback(&cb6, &eventBase);
+ serverSocket->addAcceptCallback(&cb7, &eventBase);
serverSocket->startAccepting();
// Make several connections to the socket
// exactly round robin in the future, we can simplify the test checks here.
// (We'll also need to update the termination code, since we expect cb6 to
// get called twice to terminate the loop.)
- CHECK_EQ(cb1.getEvents()->size(), 4);
- CHECK_EQ(cb1.getEvents()->at(0).type,
+ ASSERT_EQ(cb1.getEvents()->size(), 4);
+ ASSERT_EQ(cb1.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(cb1.getEvents()->at(1).type,
+ ASSERT_EQ(cb1.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb1.getEvents()->at(2).type,
+ ASSERT_EQ(cb1.getEvents()->at(2).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb1.getEvents()->at(3).type,
+ ASSERT_EQ(cb1.getEvents()->at(3).type,
TestAcceptCallback::TYPE_STOP);
- CHECK_EQ(cb2.getEvents()->size(), 4);
- CHECK_EQ(cb2.getEvents()->at(0).type,
+ ASSERT_EQ(cb2.getEvents()->size(), 4);
+ ASSERT_EQ(cb2.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(cb2.getEvents()->at(1).type,
+ ASSERT_EQ(cb2.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb2.getEvents()->at(2).type,
+ ASSERT_EQ(cb2.getEvents()->at(2).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb2.getEvents()->at(3).type,
+ ASSERT_EQ(cb2.getEvents()->at(3).type,
TestAcceptCallback::TYPE_STOP);
- CHECK_EQ(cb3.getEvents()->size(), 2);
- CHECK_EQ(cb3.getEvents()->at(0).type,
+ ASSERT_EQ(cb3.getEvents()->size(), 2);
+ ASSERT_EQ(cb3.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(cb3.getEvents()->at(1).type,
+ ASSERT_EQ(cb3.getEvents()->at(1).type,
TestAcceptCallback::TYPE_STOP);
- CHECK_EQ(cb4.getEvents()->size(), 3);
- CHECK_EQ(cb4.getEvents()->at(0).type,
+ ASSERT_EQ(cb4.getEvents()->size(), 3);
+ ASSERT_EQ(cb4.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(cb4.getEvents()->at(1).type,
+ ASSERT_EQ(cb4.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb4.getEvents()->at(2).type,
+ ASSERT_EQ(cb4.getEvents()->at(2).type,
TestAcceptCallback::TYPE_STOP);
- CHECK_EQ(cb5.getEvents()->size(), 2);
- CHECK_EQ(cb5.getEvents()->at(0).type,
+ ASSERT_EQ(cb5.getEvents()->size(), 2);
+ ASSERT_EQ(cb5.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(cb5.getEvents()->at(1).type,
+ ASSERT_EQ(cb5.getEvents()->at(1).type,
TestAcceptCallback::TYPE_STOP);
- CHECK_EQ(cb6.getEvents()->size(), 4);
- CHECK_EQ(cb6.getEvents()->at(0).type,
+ ASSERT_EQ(cb6.getEvents()->size(), 4);
+ ASSERT_EQ(cb6.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(cb6.getEvents()->at(1).type,
+ ASSERT_EQ(cb6.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb6.getEvents()->at(2).type,
+ ASSERT_EQ(cb6.getEvents()->at(2).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb6.getEvents()->at(3).type,
+ ASSERT_EQ(cb6.getEvents()->at(3).type,
TestAcceptCallback::TYPE_STOP);
- CHECK_EQ(cb7.getEvents()->size(), 3);
- CHECK_EQ(cb7.getEvents()->at(0).type,
+ ASSERT_EQ(cb7.getEvents()->size(), 3);
+ ASSERT_EQ(cb7.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(cb7.getEvents()->at(1).type,
+ ASSERT_EQ(cb7.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb7.getEvents()->at(2).type,
+ ASSERT_EQ(cb7.getEvents()->at(2).type,
TestAcceptCallback::TYPE_STOP);
}
// Add several accept callbacks
TestAcceptCallback cb1;
- auto thread_id = pthread_self();
+ auto thread_id = std::this_thread::get_id();
cb1.setAcceptStartedFn([&](){
- CHECK_NE(thread_id, pthread_self());
- thread_id = pthread_self();
- });
- cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
- CHECK_EQ(thread_id, pthread_self());
- serverSocket->removeAcceptCallback(&cb1, nullptr);
+ CHECK_NE(thread_id, std::this_thread::get_id());
+ thread_id = std::this_thread::get_id();
});
+ cb1.setConnectionAcceptedFn(
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ ASSERT_EQ(thread_id, std::this_thread::get_id());
+ serverSocket->removeAcceptCallback(&cb1, &eventBase);
+ });
cb1.setAcceptStoppedFn([&](){
- CHECK_EQ(thread_id, pthread_self());
+ ASSERT_EQ(thread_id, std::this_thread::get_id());
});
// Test having callbacks remove other callbacks before them on the list,
- serverSocket->addAcceptCallback(&cb1, nullptr);
+ serverSocket->addAcceptCallback(&cb1, &eventBase);
serverSocket->startAccepting();
// Make several connections to the socket
// exactly round robin in the future, we can simplify the test checks here.
// (We'll also need to update the termination code, since we expect cb6 to
// get called twice to terminate the loop.)
- CHECK_EQ(cb1.getEvents()->size(), 3);
- CHECK_EQ(cb1.getEvents()->at(0).type,
+ ASSERT_EQ(cb1.getEvents()->size(), 3);
+ ASSERT_EQ(cb1.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(cb1.getEvents()->at(1).type,
+ ASSERT_EQ(cb1.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb1.getEvents()->at(2).type,
+ ASSERT_EQ(cb1.getEvents()->at(2).type,
TestAcceptCallback::TYPE_STOP);
}
void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
+ EventBase* eventBase = serverSocket->getEventBase();
+ CHECK(eventBase);
+
// Add a callback to accept one connection then stop accepting
TestAcceptCallback acceptCallback;
acceptCallback.setConnectionAcceptedFn(
- [&](int fd, const folly::SocketAddress& addr) {
- serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
- });
- acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
- serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
+ });
+ acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
});
- serverSocket->addAcceptCallback(&acceptCallback, nullptr);
+ serverSocket->addAcceptCallback(&acceptCallback, eventBase);
serverSocket->startAccepting();
// Connect to the server socket
- EventBase* eventBase = serverSocket->getEventBase();
folly::SocketAddress serverAddress;
serverSocket->getAddress(&serverAddress);
AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
eventBase->loop();
// Verify that the server accepted a connection
- CHECK_EQ(acceptCallback.getEvents()->size(), 3);
- CHECK_EQ(acceptCallback.getEvents()->at(0).type,
+ ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
+ ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(acceptCallback.getEvents()->at(1).type,
+ ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(acceptCallback.getEvents()->at(2).type,
+ ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
TestAcceptCallback::TYPE_STOP);
}
acceptedSocket.reset();
// Test that server socket was closed
- ssize_t sz = read(fd, simpleBuf, simpleBufLength);
- CHECK_EQ(sz, -1);
- CHECK_EQ(errno, 9);
+ folly::test::msvcSuppressAbortOnInvalidParams([&] {
+ ssize_t sz = read(fd, simpleBuf, simpleBufLength);
+ ASSERT_EQ(sz, -1);
+ ASSERT_EQ(errno, EBADF);
+ });
delete[] simpleBuf;
}
// Test creating a socket, and letting AsyncServerSocket bind and listen
{
// Manually create a socket
- int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
ASSERT_GE(fd, 0);
// Create a server socket
// then letting AsyncServerSocket listen
{
// Manually create a socket
- int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
ASSERT_GE(fd, 0);
// bind
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = 0;
addr.sin_addr.s_addr = INADDR_ANY;
- CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
+ ASSERT_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
sizeof(addr)), 0);
// Look up the address that we bound to
folly::SocketAddress boundAddress;
// Make sure AsyncServerSocket reports the same address that we bound to
folly::SocketAddress serverSocketAddress;
serverSocket->getAddress(&serverSocketAddress);
- CHECK_EQ(boundAddress, serverSocketAddress);
+ ASSERT_EQ(boundAddress, serverSocketAddress);
// Make sure the socket works
serverSocketSanityTest(serverSocket.get());
// then giving it to AsyncServerSocket
{
// Manually create a socket
- int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
ASSERT_GE(fd, 0);
// bind
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = 0;
addr.sin_addr.s_addr = INADDR_ANY;
- CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
+ ASSERT_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
sizeof(addr)), 0);
// Look up the address that we bound to
folly::SocketAddress boundAddress;
boundAddress.setFromLocalAddress(fd);
// listen
- CHECK_EQ(listen(fd, 16), 0);
+ ASSERT_EQ(listen(fd, 16), 0);
// Create a server socket
AsyncServerSocket::UniquePtr serverSocket(
// Make sure AsyncServerSocket reports the same address that we bound to
folly::SocketAddress serverSocketAddress;
serverSocket->getAddress(&serverSocketAddress);
- CHECK_EQ(boundAddress, serverSocketAddress);
+ ASSERT_EQ(boundAddress, serverSocketAddress);
// Make sure the socket works
serverSocketSanityTest(serverSocket.get());
std::shared_ptr<AsyncServerSocket> serverSocket(
AsyncServerSocket::newSocket(&eventBase));
string path(1, 0);
- path.append("/anonymous");
+ path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
folly::SocketAddress serverAddress;
serverAddress.setFromPath(path);
serverSocket->bind(serverAddress);
// Add a callback to accept one connection then stop the loop
TestAcceptCallback acceptCallback;
acceptCallback.setConnectionAcceptedFn(
- [&](int fd, const folly::SocketAddress& addr) {
- serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
- });
- acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
- serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
+ });
+ acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
});
- serverSocket->addAcceptCallback(&acceptCallback, nullptr);
+ serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
serverSocket->startAccepting();
// Connect to the server socket
eventBase.loop();
// Verify that the server accepted a connection
- CHECK_EQ(acceptCallback.getEvents()->size(), 3);
- CHECK_EQ(acceptCallback.getEvents()->at(0).type,
+ ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
+ ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(acceptCallback.getEvents()->at(1).type,
+ ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(acceptCallback.getEvents()->at(2).type,
+ ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
TestAcceptCallback::TYPE_STOP);
int fd = acceptCallback.getEvents()->at(1).fd;
// The accepted connection should already be in non-blocking mode
int flags = fcntl(fd, F_GETFL, 0);
- CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
+ ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
}
TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
// Add a callback to accept one connection then stop the loop
TestAcceptCallback acceptCallback;
acceptCallback.setConnectionAcceptedFn(
- [&](int fd, const folly::SocketAddress& addr) {
- serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
- });
- acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+ });
+ acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
});
- serverSocket->addAcceptCallback(&acceptCallback, nullptr);
+ serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
serverSocket->startAccepting();
// Connect to the server socket
ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
}
+
+TEST(AsyncSocketTest, CallbackInPrimaryEventBase) {
+ EventBase eventBase;
+ TestConnectionEventCallback connectionEventCallback;
+
+ // Create a server socket
+ std::shared_ptr<AsyncServerSocket> serverSocket(
+ AsyncServerSocket::newSocket(&eventBase));
+ serverSocket->setConnectionEventCallback(&connectionEventCallback);
+ serverSocket->bind(0);
+ serverSocket->listen(16);
+ folly::SocketAddress serverAddress;
+ serverSocket->getAddress(&serverAddress);
+
+ // Add a callback to accept one connection then stop the loop
+ TestAcceptCallback acceptCallback;
+ acceptCallback.setConnectionAcceptedFn(
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+ });
+ acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
+ });
+ bool acceptStartedFlag{false};
+ acceptCallback.setAcceptStartedFn([&acceptStartedFlag](){
+ acceptStartedFlag = true;
+ });
+ bool acceptStoppedFlag{false};
+ acceptCallback.setAcceptStoppedFn([&acceptStoppedFlag](){
+ acceptStoppedFlag = true;
+ });
+ serverSocket->addAcceptCallback(&acceptCallback, nullptr);
+ serverSocket->startAccepting();
+
+ // Connect to the server socket
+ std::shared_ptr<AsyncSocket> socket(
+ AsyncSocket::newSocket(&eventBase, serverAddress));
+
+ eventBase.loop();
+
+ ASSERT_TRUE(acceptStartedFlag);
+ ASSERT_TRUE(acceptStoppedFlag);
+ // Validate the connection event counters
+ ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
+ ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
+ ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
+ ASSERT_EQ(
+ connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 0);
+ ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 0);
+ ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
+ ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
+ ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
+}
+
+
+
+/**
+ * Test AsyncServerSocket::getNumPendingMessagesInQueue()
+ */
+TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
+ EventBase eventBase;
+
+ // Counter of how many connections have been accepted
+ int count = 0;
+
+ // Create a server socket
+ auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
+ serverSocket->bind(0);
+ serverSocket->listen(16);
+ folly::SocketAddress serverAddress;
+ serverSocket->getAddress(&serverAddress);
+
+ // Add a callback to accept connections
+ TestAcceptCallback acceptCallback;
+ acceptCallback.setConnectionAcceptedFn(
+ [&](int /* fd */, const folly::SocketAddress& /* addr */) {
+ count++;
+ ASSERT_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
+
+ if (count == 4) {
+ // all messages are processed, remove accept callback
+ serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
+ }
+ });
+ acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
+ serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
+ });
+ serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
+ serverSocket->startAccepting();
+
+ // Connect to the server socket, 4 clients, there are 4 connections
+ auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
+ auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
+ auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
+ auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
+
+ eventBase.loop();
+}
+
+/**
+ * Test AsyncTransport::BufferCallback
+ */
+TEST(AsyncSocketTest, BufferTest) {
+ TestServer server;
+
+ EventBase evb;
+ AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30, option);
+
+ char buf[100 * 1024];
+ memset(buf, 'c', sizeof(buf));
+ WriteCallback wcb;
+ BufferCallback bcb;
+ socket->setBufferCallback(&bcb);
+ socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
+
+ evb.loop();
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+
+ ASSERT_TRUE(bcb.hasBuffered());
+ ASSERT_TRUE(bcb.hasBufferCleared());
+
+ socket->close();
+ server.verifyConnection(buf, sizeof(buf));
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+TEST(AsyncSocketTest, BufferCallbackKill) {
+ TestServer server;
+ EventBase evb;
+ AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30, option);
+ evb.loopOnce();
+
+ char buf[100 * 1024];
+ memset(buf, 'c', sizeof(buf));
+ BufferCallback bcb;
+ socket->setBufferCallback(&bcb);
+ WriteCallback wcb;
+ wcb.successCallback = [&] {
+ ASSERT_TRUE(socket.unique());
+ socket.reset();
+ };
+
+ // This will trigger AsyncSocket::handleWrite,
+ // which calls WriteCallback::writeSuccess,
+ // which calls wcb.successCallback above,
+ // which tries to delete socket
+ // Then, the socket will also try to use this BufferCallback
+ // And that should crash us, if there is no DestructorGuard on the stack
+ socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
+
+ evb.loop();
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+}
+
+#if FOLLY_ALLOW_TFO
+TEST(AsyncSocketTest, ConnectTFO) {
+ // Start listening on a local port
+ TestServer server(true);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->enableTFO();
+ ConnCallback cb;
+ socket->connect(&cb, server.getAddress(), 30);
+
+ std::array<uint8_t, 128> buf;
+ memset(buf.data(), 'a', buf.size());
+
+ std::array<uint8_t, 3> readBuf;
+ auto sendBuf = IOBuf::copyBuffer("hey");
+
+ std::thread t([&] {
+ auto acceptedSocket = server.accept();
+ acceptedSocket->write(buf.data(), buf.size());
+ acceptedSocket->flush();
+ acceptedSocket->readAll(readBuf.data(), readBuf.size());
+ acceptedSocket->close();
+ });
+
+ evb.loop();
+
+ ASSERT_EQ(cb.state, STATE_SUCCEEDED);
+ EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+ EXPECT_TRUE(socket->getTFOAttempted());
+
+ // Should trigger the connect
+ WriteCallback write;
+ ReadCallback rcb;
+ socket->writeChain(&write, sendBuf->clone());
+ socket->setReadCB(&rcb);
+ evb.loop();
+
+ t.join();
+
+ EXPECT_EQ(STATE_SUCCEEDED, write.state);
+ EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+ EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+ ASSERT_EQ(1, rcb.buffers.size());
+ ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
+ EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+ EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
+}
+
+TEST(AsyncSocketTest, ConnectTFOSupplyEarlyReadCB) {
+ // Start listening on a local port
+ TestServer server(true);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->enableTFO();
+ ConnCallback cb;
+ socket->connect(&cb, server.getAddress(), 30);
+ ReadCallback rcb;
+ socket->setReadCB(&rcb);
+
+ std::array<uint8_t, 128> buf;
+ memset(buf.data(), 'a', buf.size());
+
+ std::array<uint8_t, 3> readBuf;
+ auto sendBuf = IOBuf::copyBuffer("hey");
+
+ std::thread t([&] {
+ auto acceptedSocket = server.accept();
+ acceptedSocket->write(buf.data(), buf.size());
+ acceptedSocket->flush();
+ acceptedSocket->readAll(readBuf.data(), readBuf.size());
+ acceptedSocket->close();
+ });
+
+ evb.loop();
+
+ ASSERT_EQ(cb.state, STATE_SUCCEEDED);
+ EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+ EXPECT_TRUE(socket->getTFOAttempted());
+
+ // Should trigger the connect
+ WriteCallback write;
+ socket->writeChain(&write, sendBuf->clone());
+ evb.loop();
+
+ t.join();
+
+ EXPECT_EQ(STATE_SUCCEEDED, write.state);
+ EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+ EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+ ASSERT_EQ(1, rcb.buffers.size());
+ ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
+ EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+ EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
+}
+
+/**
+ * Test connecting to a server that isn't listening
+ */
+TEST(AsyncSocketTest, ConnectRefusedImmediatelyTFO) {
+ EventBase evb;
+
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+ socket->enableTFO();
+
+ // Hopefully nothing is actually listening on this address
+ folly::SocketAddress addr("::1", 65535);
+ ConnCallback cb;
+ socket->connect(&cb, addr, 30);
+
+ evb.loop();
+
+ WriteCallback write1;
+ // Trigger the connect if TFO attempt is supported.
+ socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
+ WriteCallback write2;
+ socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
+ evb.loop();
+
+ if (!socket->getTFOFinished()) {
+ EXPECT_EQ(STATE_FAILED, write1.state);
+ } else {
+ EXPECT_EQ(STATE_SUCCEEDED, write1.state);
+ EXPECT_FALSE(socket->getTFOSucceded());
+ }
+
+ EXPECT_EQ(STATE_FAILED, write2.state);
+
+ EXPECT_EQ(STATE_SUCCEEDED, cb.state);
+ EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
+ EXPECT_TRUE(socket->getTFOAttempted());
+}
+
+/**
+ * Test calling closeNow() immediately after connecting.
+ */
+TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) {
+ TestServer server(true);
+
+ // connect()
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->enableTFO();
+
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30);
+
+ // write()
+ std::array<char, 128> buf;
+ memset(buf.data(), 'a', buf.size());
+
+ // close()
+ socket->closeNow();
+
+ // Loop, although there shouldn't be anything to do.
+ evb.loop();
+
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+/**
+ * Test calling close() immediately after connect()
+ */
+TEST(AsyncSocketTest, ConnectAndCloseTFO) {
+ TestServer server(true);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->enableTFO();
+
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30);
+
+ socket->close();
+
+ // Loop, although there shouldn't be anything to do.
+ evb.loop();
+
+ // Make sure the connection was aborted
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+class MockAsyncTFOSocket : public AsyncSocket {
+ public:
+ using UniquePtr = std::unique_ptr<MockAsyncTFOSocket, Destructor>;
+
+ explicit MockAsyncTFOSocket(EventBase* evb) : AsyncSocket(evb) {}
+
+ MOCK_METHOD3(tfoSendMsg, ssize_t(int fd, struct msghdr* msg, int msg_flags));
+};
+
+TEST(AsyncSocketTest, TestTFOUnsupported) {
+ TestServer server(true);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+ socket->enableTFO();
+
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+ ReadCallback rcb;
+ socket->setReadCB(&rcb);
+
+ EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+ .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
+ WriteCallback write;
+ auto sendBuf = IOBuf::copyBuffer("hey");
+ socket->writeChain(&write, sendBuf->clone());
+ EXPECT_EQ(STATE_WAITING, write.state);
+
+ std::array<uint8_t, 128> buf;
+ memset(buf.data(), 'a', buf.size());
+
+ std::array<uint8_t, 3> readBuf;
+
+ std::thread t([&] {
+ std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+ acceptedSocket->write(buf.data(), buf.size());
+ acceptedSocket->flush();
+ acceptedSocket->readAll(readBuf.data(), readBuf.size());
+ acceptedSocket->close();
+ });
+
+ evb.loop();
+
+ t.join();
+ EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+ EXPECT_EQ(STATE_SUCCEEDED, write.state);
+
+ EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+ EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+ ASSERT_EQ(1, rcb.buffers.size());
+ ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
+ EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+ EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
+}
+
+TEST(AsyncSocketTest, ConnectRefusedDelayedTFO) {
+ EventBase evb;
+
+ auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+ socket->enableTFO();
+
+ // Hopefully this fails
+ folly::SocketAddress fakeAddr("127.0.0.1", 65535);
+ EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+ .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
+ sockaddr_storage addr;
+ auto len = fakeAddr.getAddress(&addr);
+ int ret = connect(fd, (const struct sockaddr*)&addr, len);
+ LOG(INFO) << "connecting the socket " << fd << " : " << ret << " : "
+ << errno;
+ return ret;
+ }));
+
+ // Hopefully nothing is actually listening on this address
+ ConnCallback cb;
+ socket->connect(&cb, fakeAddr, 30);
+
+ WriteCallback write1;
+ // Trigger the connect if TFO attempt is supported.
+ socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
+
+ if (socket->getTFOFinished()) {
+ // This test is useless now.
+ return;
+ }
+ WriteCallback write2;
+ // Trigger the connect if TFO attempt is supported.
+ socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
+ evb.loop();
+
+ EXPECT_EQ(STATE_FAILED, write1.state);
+ EXPECT_EQ(STATE_FAILED, write2.state);
+ EXPECT_FALSE(socket->getTFOSucceded());
+
+ EXPECT_EQ(STATE_SUCCEEDED, cb.state);
+ EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
+ EXPECT_TRUE(socket->getTFOAttempted());
+}
+
+TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) {
+ // Try connecting to server that won't respond.
+ //
+ // This depends somewhat on the network where this test is run.
+ // Hopefully this IP will be routable but unresponsive.
+ // (Alternatively, we could try listening on a local raw socket, but that
+ // normally requires root privileges.)
+ auto host = SocketAddressTestHelper::isIPv6Enabled()
+ ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
+ : SocketAddressTestHelper::isIPv4Enabled()
+ ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
+ : nullptr;
+ SocketAddress addr(host, 65535);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+ socket->enableTFO();
+
+ ConnCallback ccb;
+ // Set a very small timeout
+ socket->connect(&ccb, addr, 1);
+ EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+
+ ReadCallback rcb;
+ socket->setReadCB(&rcb);
+
+ EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+ .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
+ WriteCallback write;
+ socket->writeChain(&write, IOBuf::copyBuffer("hey"));
+
+ evb.loop();
+
+ EXPECT_EQ(STATE_FAILED, write.state);
+}
+
+TEST(AsyncSocketTest, TestTFOFallbackToConnect) {
+ TestServer server(true);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+ socket->enableTFO();
+
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+ ReadCallback rcb;
+ socket->setReadCB(&rcb);
+
+ EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+ .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
+ sockaddr_storage addr;
+ auto len = server.getAddress().getAddress(&addr);
+ return connect(fd, (const struct sockaddr*)&addr, len);
+ }));
+ WriteCallback write;
+ auto sendBuf = IOBuf::copyBuffer("hey");
+ socket->writeChain(&write, sendBuf->clone());
+ EXPECT_EQ(STATE_WAITING, write.state);
+
+ std::array<uint8_t, 128> buf;
+ memset(buf.data(), 'a', buf.size());
+
+ std::array<uint8_t, 3> readBuf;
+
+ std::thread t([&] {
+ std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+ acceptedSocket->write(buf.data(), buf.size());
+ acceptedSocket->flush();
+ acceptedSocket->readAll(readBuf.data(), readBuf.size());
+ acceptedSocket->close();
+ });
+
+ evb.loop();
+
+ t.join();
+ EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+
+ EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+ EXPECT_EQ(STATE_SUCCEEDED, write.state);
+
+ EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+ ASSERT_EQ(1, rcb.buffers.size());
+ ASSERT_EQ(buf.size(), rcb.buffers[0].length);
+ EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+}
+
+TEST(AsyncSocketTest, TestTFOFallbackTimeout) {
+ // Try connecting to server that won't respond.
+ //
+ // This depends somewhat on the network where this test is run.
+ // Hopefully this IP will be routable but unresponsive.
+ // (Alternatively, we could try listening on a local raw socket, but that
+ // normally requires root privileges.)
+ auto host = SocketAddressTestHelper::isIPv6Enabled()
+ ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
+ : SocketAddressTestHelper::isIPv4Enabled()
+ ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
+ : nullptr;
+ SocketAddress addr(host, 65535);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+ socket->enableTFO();
+
+ ConnCallback ccb;
+ // Set a very small timeout
+ socket->connect(&ccb, addr, 1);
+ EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+
+ ReadCallback rcb;
+ socket->setReadCB(&rcb);
+
+ EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+ .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
+ sockaddr_storage addr2;
+ auto len = addr.getAddress(&addr2);
+ return connect(fd, (const struct sockaddr*)&addr2, len);
+ }));
+ WriteCallback write;
+ socket->writeChain(&write, IOBuf::copyBuffer("hey"));
+
+ evb.loop();
+
+ EXPECT_EQ(STATE_FAILED, write.state);
+}
+
+TEST(AsyncSocketTest, TestTFOEagain) {
+ TestServer server(true);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+ socket->enableTFO();
+
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30);
+
+ EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+ .WillOnce(SetErrnoAndReturn(EAGAIN, -1));
+ WriteCallback write;
+ socket->writeChain(&write, IOBuf::copyBuffer("hey"));
+
+ evb.loop();
+
+ EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
+ EXPECT_EQ(STATE_FAILED, write.state);
+}
+
+// Sending a large amount of data in the first write which will
+// definitely not fit into MSS.
+TEST(AsyncSocketTest, ConnectTFOWithBigData) {
+ // Start listening on a local port
+ TestServer server(true);
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->enableTFO();
+ ConnCallback cb;
+ socket->connect(&cb, server.getAddress(), 30);
+
+ std::array<uint8_t, 128> buf;
+ memset(buf.data(), 'a', buf.size());
+
+ constexpr size_t len = 10 * 1024;
+ auto sendBuf = IOBuf::create(len);
+ sendBuf->append(len);
+ std::array<uint8_t, len> readBuf;
+
+ std::thread t([&] {
+ auto acceptedSocket = server.accept();
+ acceptedSocket->write(buf.data(), buf.size());
+ acceptedSocket->flush();
+ acceptedSocket->readAll(readBuf.data(), readBuf.size());
+ acceptedSocket->close();
+ });
+
+ evb.loop();
+
+ ASSERT_EQ(cb.state, STATE_SUCCEEDED);
+ EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+ EXPECT_TRUE(socket->getTFOAttempted());
+
+ // Should trigger the connect
+ WriteCallback write;
+ ReadCallback rcb;
+ socket->writeChain(&write, sendBuf->clone());
+ socket->setReadCB(&rcb);
+ evb.loop();
+
+ t.join();
+
+ EXPECT_EQ(STATE_SUCCEEDED, write.state);
+ EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
+ EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
+ ASSERT_EQ(1, rcb.buffers.size());
+ ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
+ EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
+ EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
+}
+
+#endif // FOLLY_ALLOW_TFO
+
+class MockEvbChangeCallback : public AsyncSocket::EvbChangeCallback {
+ public:
+ MOCK_METHOD1(evbAttached, void(AsyncSocket*));
+ MOCK_METHOD1(evbDetached, void(AsyncSocket*));
+};
+
+TEST(AsyncSocketTest, EvbCallbacks) {
+ auto cb = folly::make_unique<MockEvbChangeCallback>();
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+ InSequence seq;
+ EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1);
+ EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1);
+
+ socket->setEvbChangedCallback(std::move(cb));
+ socket->detachEventBase();
+ socket->attachEventBase(&evb);
+}
+
+#ifdef MSG_ERRQUEUE
+/* copied from include/uapi/linux/net_tstamp.h */
+/* SO_TIMESTAMPING gets an integer bit field comprised of these values */
+enum SOF_TIMESTAMPING {
+ SOF_TIMESTAMPING_SOFTWARE = (1 << 4),
+ SOF_TIMESTAMPING_OPT_ID = (1 << 7),
+ SOF_TIMESTAMPING_TX_SCHED = (1 << 8),
+ SOF_TIMESTAMPING_OPT_CMSG = (1 << 10),
+ SOF_TIMESTAMPING_OPT_TSONLY = (1 << 11),
+};
+TEST(AsyncSocketTest, ErrMessageCallback) {
+ TestServer server;
+
+ // connect()
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30);
+ LOG(INFO) << "Client socket fd=" << socket->getFd();
+
+ // Let the socket
+ evb.loop();
+
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+ // Set read callback to keep the socket subscribed for event
+ // notifications. Though we're no planning to read anything from
+ // this side of the connection.
+ ReadCallback rcb(1);
+ socket->setReadCB(&rcb);
+
+ // Set up timestamp callbacks
+ TestErrMessageCallback errMsgCB;
+ socket->setErrMessageCB(&errMsgCB);
+ ASSERT_EQ(socket->getErrMessageCallback(),
+ static_cast<folly::AsyncSocket::ErrMessageCallback*>(&errMsgCB));
+
+ // Enable timestamp notifications
+ ASSERT_GT(socket->getFd(), 0);
+ int flags = SOF_TIMESTAMPING_OPT_ID
+ | SOF_TIMESTAMPING_OPT_TSONLY
+ | SOF_TIMESTAMPING_SOFTWARE
+ | SOF_TIMESTAMPING_OPT_CMSG
+ | SOF_TIMESTAMPING_TX_SCHED;
+ AsyncSocket::OptionKey tstampingOpt = {SOL_SOCKET, SO_TIMESTAMPING};
+ EXPECT_EQ(tstampingOpt.apply(socket->getFd(), flags), 0);
+
+ // write()
+ std::vector<uint8_t> wbuf(128, 'a');
+ WriteCallback wcb;
+ socket->write(&wcb, wbuf.data(), wbuf.size());
+
+ // Accept the connection.
+ std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+ LOG(INFO) << "Server socket fd=" << acceptedSocket->getSocketFD();
+
+ // Loop
+ evb.loopOnce();
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+
+ // Check that we can read the data that was written to the socket
+ std::vector<uint8_t> rbuf(1 + wbuf.size(), 0);
+ uint32_t bytesRead = acceptedSocket->read(rbuf.data(), rbuf.size());
+ ASSERT_TRUE(std::equal(wbuf.begin(), wbuf.end(), rbuf.begin()));
+ ASSERT_EQ(bytesRead, wbuf.size());
+
+ // Close both sockets
+ acceptedSocket->close();
+ socket->close();
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
+
+ // Check for the timestamp notifications.
+ ASSERT_EQ(errMsgCB.exception_.type_, folly::AsyncSocketException::UNKNOWN);
+ ASSERT_TRUE(errMsgCB.gotByteSeq_);
+ ASSERT_TRUE(errMsgCB.gotTimestamp_);
+}
+#endif // MSG_ERRQUEUE
+
+TEST(AsyncSocket, PreReceivedData) {
+ TestServer server;
+
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->connect(nullptr, server.getAddress(), 30);
+ evb.loop();
+
+ socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
+
+ auto acceptedSocket = server.acceptAsync(&evb);
+
+ ReadCallback peekCallback(2);
+ ReadCallback readCallback;
+ peekCallback.dataAvailableCallback = [&]() {
+ peekCallback.verifyData("he", 2);
+ acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("h"));
+ acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("e"));
+ acceptedSocket->setReadCB(nullptr);
+ acceptedSocket->setReadCB(&readCallback);
+ };
+ readCallback.dataAvailableCallback = [&]() {
+ if (readCallback.dataRead() == 5) {
+ readCallback.verifyData("hello", 5);
+ acceptedSocket->setReadCB(nullptr);
+ }
+ };
+
+ acceptedSocket->setReadCB(&peekCallback);
+
+ evb.loop();
+}
+
+TEST(AsyncSocket, PreReceivedDataOnly) {
+ TestServer server;
+
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->connect(nullptr, server.getAddress(), 30);
+ evb.loop();
+
+ socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
+
+ auto acceptedSocket = server.acceptAsync(&evb);
+
+ ReadCallback peekCallback;
+ ReadCallback readCallback;
+ peekCallback.dataAvailableCallback = [&]() {
+ peekCallback.verifyData("hello", 5);
+ acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
+ acceptedSocket->setReadCB(&readCallback);
+ };
+ readCallback.dataAvailableCallback = [&]() {
+ readCallback.verifyData("hello", 5);
+ acceptedSocket->setReadCB(nullptr);
+ };
+
+ acceptedSocket->setReadCB(&peekCallback);
+
+ evb.loop();
+}
+
+TEST(AsyncSocket, PreReceivedDataPartial) {
+ TestServer server;
+
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->connect(nullptr, server.getAddress(), 30);
+ evb.loop();
+
+ socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
+
+ auto acceptedSocket = server.acceptAsync(&evb);
+
+ ReadCallback peekCallback;
+ ReadCallback smallReadCallback(3);
+ ReadCallback normalReadCallback;
+ peekCallback.dataAvailableCallback = [&]() {
+ peekCallback.verifyData("hello", 5);
+ acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
+ acceptedSocket->setReadCB(&smallReadCallback);
+ };
+ smallReadCallback.dataAvailableCallback = [&]() {
+ smallReadCallback.verifyData("hel", 3);
+ acceptedSocket->setReadCB(&normalReadCallback);
+ };
+ normalReadCallback.dataAvailableCallback = [&]() {
+ normalReadCallback.verifyData("lo", 2);
+ acceptedSocket->setReadCB(nullptr);
+ };
+
+ acceptedSocket->setReadCB(&peekCallback);
+
+ evb.loop();
+}
+
+TEST(AsyncSocket, PreReceivedDataTakeover) {
+ TestServer server;
+
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->connect(nullptr, server.getAddress(), 30);
+ evb.loop();
+
+ socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
+
+ auto acceptedSocket =
+ AsyncSocket::UniquePtr(new AsyncSocket(&evb, server.acceptFD()));
+ AsyncSocket::UniquePtr takeoverSocket;
+
+ ReadCallback peekCallback(3);
+ ReadCallback readCallback;
+ peekCallback.dataAvailableCallback = [&]() {
+ peekCallback.verifyData("hel", 3);
+ acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
+ acceptedSocket->setReadCB(nullptr);
+ takeoverSocket =
+ AsyncSocket::UniquePtr(new AsyncSocket(std::move(acceptedSocket)));
+ takeoverSocket->setReadCB(&readCallback);
+ };
+ readCallback.dataAvailableCallback = [&]() {
+ readCallback.verifyData("hello", 5);
+ takeoverSocket->setReadCB(nullptr);
+ };
+
+ acceptedSocket->setReadCB(&peekCallback);
+
+ evb.loop();
+}
+
+TEST(AsyncSocketTest, SendMessageFlags) {
+ TestServer server;
+ TestSendMsgParamsCallback sendMsgCB(
+ MSG_DONTWAIT|MSG_NOSIGNAL|MSG_MORE, 0, nullptr);
+
+ // connect()
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30);
+ std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+
+ evb.loop();
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+ // Set SendMsgParamsCallback
+ socket->setSendMsgParamCB(&sendMsgCB);
+ ASSERT_EQ(socket->getSendMsgParamsCB(), &sendMsgCB);
+
+ // Write the first portion of data. This data is expected to be
+ // sent out immediately.
+ std::vector<uint8_t> buf(128, 'a');
+ WriteCallback wcb;
+ sendMsgCB.reset(MSG_DONTWAIT | MSG_NOSIGNAL);
+ socket->write(&wcb, buf.data(), buf.size());
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_TRUE(sendMsgCB.queriedFlags_);
+ ASSERT_FALSE(sendMsgCB.queriedData_);
+
+ // Using different flags for the second write operation.
+ // MSG_MORE flag is expected to delay sending this
+ // data to the wire.
+ sendMsgCB.reset(MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE);
+ socket->write(&wcb, buf.data(), buf.size());
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_TRUE(sendMsgCB.queriedFlags_);
+ ASSERT_FALSE(sendMsgCB.queriedData_);
+
+ // Make sure the accepted socket saw only the data from
+ // the first write request.
+ std::vector<uint8_t> readbuf(2 * buf.size());
+ uint32_t bytesRead = acceptedSocket->read(readbuf.data(), readbuf.size());
+ ASSERT_TRUE(std::equal(buf.begin(), buf.end(), readbuf.begin()));
+ ASSERT_EQ(bytesRead, buf.size());
+
+ // Make sure the server got a connection and received the data
+ acceptedSocket->close();
+ socket->close();
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+TEST(AsyncSocketTest, SendMessageAncillaryData) {
+ struct sockaddr_un addr = {AF_UNIX,
+ "AsyncSocketTest.SendMessageAncillaryData\0"};
+
+ // Clean up the name in the name space we're going to use
+ ASSERT_FALSE(remove(addr.sun_path) == -1 && errno != ENOENT);
+
+ // Set up listening socket
+ int lfd = fsp::socket(AF_UNIX, SOCK_STREAM, 0);
+ ASSERT_NE(lfd, -1);
+ ASSERT_NE(bind(lfd, (struct sockaddr*)&addr, sizeof(addr)), -1)
+ << "Bind failed: " << errno;
+
+ // Create the connecting socket
+ int csd = fsp::socket(AF_UNIX, SOCK_STREAM, 0);
+ ASSERT_NE(csd, -1);
+
+ // Listen for incoming connect
+ ASSERT_NE(listen(lfd, 5), -1);
+
+ // Connect to the listening socket
+ ASSERT_NE(fsp::connect(csd, (struct sockaddr*)&addr, sizeof(addr)), -1)
+ << "Connect request failed: " << errno;
+
+ // Accept the connection
+ int sfd = accept(lfd, nullptr, nullptr);
+ ASSERT_NE(sfd, -1);
+
+ // Instantiate AsyncSocket object for the connected socket
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, csd);
+
+ // Open a temporary file and write a magic string to it
+ // We'll transfer the file handle to test the message parameters
+ // callback logic.
+ int tmpfd = open("/var/tmp", O_RDWR | O_TMPFILE);
+ ASSERT_NE(tmpfd, -1) << "Failed to open a temporary file";
+ std::string magicString("Magic string");
+ ASSERT_EQ(write(tmpfd, magicString.c_str(), magicString.length()),
+ magicString.length());
+
+ // Send message
+ union {
+ // Space large enough to hold an 'int'
+ char control[CMSG_SPACE(sizeof(int))];
+ struct cmsghdr cmh;
+ } s_u;
+ s_u.cmh.cmsg_len = CMSG_LEN(sizeof(int));
+ s_u.cmh.cmsg_level = SOL_SOCKET;
+ s_u.cmh.cmsg_type = SCM_RIGHTS;
+ memcpy(CMSG_DATA(&s_u.cmh), &tmpfd, sizeof(int));
+
+ // Set up the callback providing message parameters
+ TestSendMsgParamsCallback sendMsgCB(
+ MSG_DONTWAIT | MSG_NOSIGNAL, sizeof(s_u.control), s_u.control);
+ socket->setSendMsgParamCB(&sendMsgCB);
+
+ // We must transmit at least 1 byte of real data in order
+ // to send ancillary data
+ int s_data = 12345;
+ WriteCallback wcb;
+ socket->write(&wcb, &s_data, sizeof(s_data));
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+
+ // Receive the message
+ union {
+ // Space large enough to hold an 'int'
+ char control[CMSG_SPACE(sizeof(int))];
+ struct cmsghdr cmh;
+ } r_u;
+ struct msghdr msgh;
+ struct iovec iov;
+ int r_data = 0;
+
+ msgh.msg_control = r_u.control;
+ msgh.msg_controllen = sizeof(r_u.control);
+ msgh.msg_name = nullptr;
+ msgh.msg_namelen = 0;
+ msgh.msg_iov = &iov;
+ msgh.msg_iovlen = 1;
+ iov.iov_base = &r_data;
+ iov.iov_len = sizeof(r_data);
+
+ // Receive data
+ ASSERT_NE(recvmsg(sfd, &msgh, 0), -1) << "recvmsg failed: " << errno;
+
+ // Validate the received message
+ ASSERT_EQ(r_u.cmh.cmsg_len, CMSG_LEN(sizeof(int)));
+ ASSERT_EQ(r_u.cmh.cmsg_level, SOL_SOCKET);
+ ASSERT_EQ(r_u.cmh.cmsg_type, SCM_RIGHTS);
+ ASSERT_EQ(r_data, s_data);
+ int fd = 0;
+ memcpy(&fd, CMSG_DATA(&r_u.cmh), sizeof(int));
+ ASSERT_NE(fd, 0);
+
+ std::vector<uint8_t> transferredMagicString(magicString.length() + 1, 0);
+
+ // Reposition to the beginning of the file
+ ASSERT_EQ(0, lseek(fd, 0, SEEK_SET));
+
+ // Read the magic string back, and compare it with the original
+ ASSERT_EQ(
+ magicString.length(),
+ read(fd, transferredMagicString.data(), transferredMagicString.size()));
+ ASSERT_TRUE(std::equal(
+ magicString.begin(),
+ magicString.end(),
+ transferredMagicString.begin()));
+}