/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <folly/io/async/AsyncTimeout.h>
#include <folly/io/async/EventBase.h>
+#include <folly/experimental/TestUtil.h>
#include <folly/io/IOBuf.h>
#include <folly/io/async/test/AsyncSocketTest.h>
#include <folly/io/async/test/Util.h>
evb.loop();
- CHECK_EQ(cb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(cb.state, STATE_SUCCEEDED);
EXPECT_LE(0, socket->getConnectTime().count());
EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
}
evb.loop();
- CHECK_EQ(cb.state, STATE_FAILED);
- CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
+ ASSERT_EQ(cb.state, STATE_FAILED);
+ ASSERT_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
// Verify that we can still get the peer address after a timeout.
// Use case is if the client was created from a client pool, and we want
// to log which peer failed.
folly::SocketAddress peer;
socket->getPeerAddress(&peer);
- CHECK_EQ(peer, addr);
+ ASSERT_EQ(peer, addr);
EXPECT_LE(0, socket->getConnectTime().count());
EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
}
// The kernel should be able to buffer the write request so it can succeed.
evb.loop();
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
// Make sure the server got a connection and received the data
socket->close();
evb.loop();
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
// Make sure the server got a connection and received the data
socket->close();
// The kernel should be able to buffer the write request so it can succeed.
evb.loop();
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
// Make sure the server got a connection and received the data
server.verifyConnection(buf, sizeof(buf));
evb.loop();
// Make sure the connection was aborted
- CHECK_EQ(ccb.state, STATE_FAILED);
+ ASSERT_EQ(ccb.state, STATE_FAILED);
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
evb.loop();
// Make sure the connection was aborted
- CHECK_EQ(ccb.state, STATE_FAILED);
+ ASSERT_EQ(ccb.state, STATE_FAILED);
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
// Loop, although there shouldn't be anything to do.
evb.loop();
- CHECK_EQ(ccb.state, STATE_FAILED);
- CHECK_EQ(wcb.state, STATE_FAILED);
+ ASSERT_EQ(ccb.state, STATE_FAILED);
+ ASSERT_EQ(wcb.state, STATE_FAILED);
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
// Loop, although there shouldn't be anything to do.
evb.loop();
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.buffers.size(), 1);
- CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
- CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.buffers.size(), 1);
+ ASSERT_EQ(rcb.buffers[0].length, sizeof(buf));
+ ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
ASSERT_FALSE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
// Loop, although there shouldn't be anything to do.
evb.loop();
- CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
- CHECK_EQ(rcb.buffers.size(), 0);
- CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
+ ASSERT_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
+ ASSERT_EQ(rcb.buffers.size(), 0);
+ ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
evb.loop();
// Make sure the connect succeeded
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
// Make sure the AsyncSocket read the data written by the accepted socket
- CHECK_EQ(rcb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.buffers.size(), 1);
- CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
- CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
+ ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.buffers.size(), 1);
+ ASSERT_EQ(rcb.buffers[0].length, sizeof(buf2));
+ ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
// Close the AsyncSocket so we'll see EOF on acceptedSocket
socket->close();
// Make sure the accepted socket saw the data written by the AsyncSocket
uint8_t readbuf[sizeof(buf1)];
acceptedSocket->readAll(readbuf, sizeof(readbuf));
- CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
+ ASSERT_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
- CHECK_EQ(bytesRead, 0);
+ ASSERT_EQ(bytesRead, 0);
ASSERT_FALSE(socket->isClosedBySelf());
ASSERT_TRUE(socket->isClosedByPeer());
fds[0].events = POLLIN;
fds[0].revents = 0;
int rc = poll(fds, 1, 0);
- CHECK_EQ(rc, 0);
+ ASSERT_EQ(rc, 0);
// Write data to the accepted socket
uint8_t acceptedWbuf[192];
//
// Check that the connection was completed successfully and that the write
// callback succeeded.
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
// Check that we can read the data that was written to the socket, and that
// we see an EOF, since its socket was half-shutdown.
uint8_t readbuf[sizeof(wbuf)];
acceptedSocket->readAll(readbuf, sizeof(readbuf));
- CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
+ ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
- CHECK_EQ(bytesRead, 0);
+ ASSERT_EQ(bytesRead, 0);
// Close the accepted socket. This will cause it to see EOF
// and uninstall the read callback when we loop next.
evb.loop();
// This loop should have read the data and seen the EOF
- CHECK_EQ(rcb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.buffers.size(), 1);
- CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
- CHECK_EQ(memcmp(rcb.buffers[0].buffer,
+ ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.buffers.size(), 1);
+ ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
+ ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
acceptedWbuf, sizeof(acceptedWbuf)), 0);
ASSERT_FALSE(socket->isClosedBySelf());
fds[0].events = POLLIN;
fds[0].revents = 0;
int rc = poll(fds, 1, 0);
- CHECK_EQ(rc, 0);
+ ASSERT_EQ(rc, 0);
// Write data to the accepted socket
uint8_t acceptedWbuf[192];
//
// Check that the connection was completed successfully and that the read
// and write callbacks were invoked as expected.
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.buffers.size(), 1);
- CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
- CHECK_EQ(memcmp(rcb.buffers[0].buffer,
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.buffers.size(), 1);
+ ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
+ ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
acceptedWbuf, sizeof(acceptedWbuf)), 0);
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
// Check that we can read the data that was written to the socket, and that
// we see an EOF, since its socket was half-shutdown.
uint8_t readbuf[sizeof(wbuf)];
acceptedSocket->readAll(readbuf, sizeof(readbuf));
- CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
+ ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
- CHECK_EQ(bytesRead, 0);
+ ASSERT_EQ(bytesRead, 0);
// Fully close both sockets
acceptedSocket->close();
socket->shutdownWriteNow();
// Verify that writeError() was invoked on the write callback.
- CHECK_EQ(wcb.state, STATE_FAILED);
- CHECK_EQ(wcb.bytesWritten, 0);
+ ASSERT_EQ(wcb.state, STATE_FAILED);
+ ASSERT_EQ(wcb.bytesWritten, 0);
// Even though we haven't looped yet, we should be able to accept
// the connection.
fds[0].events = POLLIN;
fds[0].revents = 0;
int rc = poll(fds, 1, 0);
- CHECK_EQ(rc, 0);
+ ASSERT_EQ(rc, 0);
// Write data to the accepted socket
uint8_t acceptedWbuf[192];
//
// Check that the connection was completed successfully and that the read
// callback was invoked as expected.
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.buffers.size(), 1);
- CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
- CHECK_EQ(memcmp(rcb.buffers[0].buffer,
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.buffers.size(), 1);
+ ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
+ ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
acceptedWbuf, sizeof(acceptedWbuf)), 0);
// Since we used shutdownWriteNow(), it should have discarded all pending
// socket.
uint8_t readbuf[sizeof(wbuf)];
uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
- CHECK_EQ(bytesRead, 0);
+ ASSERT_EQ(bytesRead, 0);
// Fully close both sockets
acceptedSocket->close();
// The kernel should be able to buffer the write request so it can succeed.
evb.loop();
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
if (size1 > 0) {
- CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
}
if (size2 > 0) {
- CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
}
socket->close();
size_t end = bytesRead;
if (start < size1) {
size_t cmpLen = min(size1, end) - start;
- CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
+ ASSERT_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
}
if (end > size1 && end <= size1 + size2) {
size_t itOffset;
buf2Offset = 0;
cmpLen = end - size1;
}
- CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
+ ASSERT_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
cmpLen),
0);
}
}
- CHECK_EQ(bytesRead, size1 + size2);
+ ASSERT_EQ(bytesRead, size1 + size2);
}
TEST(AsyncSocketTest, ConnectCallbackWrite) {
testConnectOptWrite(100, 200);
// Test using a large buffer in the connect callback, that should block
- const size_t largeSize = 8*1024*1024;
+ const size_t largeSize = 32 * 1024 * 1024;
testConnectOptWrite(100, largeSize);
// Test using a large initial write
AsyncSocket::newSocket(&evb, server.getAddress(), 30);
evb.loop(); // loop until the socket is connected
- // write() a large chunk of data, with no-one on the other end reading
- size_t writeLength = 8*1024*1024;
+ // write() a large chunk of data, with no-one on the other end reading.
+ // Tricky: the kernel caches the connection metrics for recently-used
+ // routes (see tcp_no_metrics_save) so a freshly opened connection can
+ // have a send buffer size bigger than wmem_default. This makes the test
+ // flaky on contbuild if writeLength is < wmem_max (20M on our systems).
+ size_t writeLength = 32 * 1024 * 1024;
uint32_t timeout = 200;
socket->setSendTimeout(timeout);
scoped_array<char> buf(new char[writeLength]);
TimePoint end;
// Make sure the write attempt timed out as requested
- CHECK_EQ(wcb.state, STATE_FAILED);
- CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
+ ASSERT_EQ(wcb.state, STATE_FAILED);
+ ASSERT_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
// Check that the write timed out within a reasonable period of time.
// We don't check for exactly the specified timeout, since AsyncSocket only
// accept and immediately close the socket
std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
- acceptedSocket.reset();
+ acceptedSocket->close();
// write() a large chunk of data
- size_t writeLength = 8*1024*1024;
+ size_t writeLength = 32 * 1024 * 1024;
scoped_array<char> buf(new char[writeLength]);
memset(buf.get(), 'a', writeLength);
WriteCallback wcb;
// Make sure the write failed.
// It would be nice if AsyncSocketException could convey the errno value,
// so that we could check for EPIPE
- CHECK_EQ(wcb.state, STATE_FAILED);
- CHECK_EQ(wcb.exception.getType(),
+ ASSERT_EQ(wcb.state, STATE_FAILED);
+ ASSERT_EQ(wcb.exception.getType(),
AsyncSocketException::INTERNAL_ERROR);
ASSERT_FALSE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
}
+/**
+ * Test that bytes written is correctly computed in case of write failure
+ */
+TEST(AsyncSocketTest, WriteErrorCallbackBytesWritten) {
+ // Send and receive buffer sizes for the sockets.
+ const int sockBufSize = 8 * 1024;
+
+ TestServer server(false, sockBufSize);
+
+ AsyncSocket::OptionMap options{
+ {{SOL_SOCKET, SO_SNDBUF}, sockBufSize},
+ {{SOL_SOCKET, SO_RCVBUF}, sockBufSize},
+ {{IPPROTO_TCP, TCP_NODELAY}, 1},
+ };
+
+ // The current thread will be used by the receiver - use a separate thread
+ // for the sender.
+ EventBase senderEvb;
+ std::thread senderThread([&]() { senderEvb.loopForever(); });
+
+ ConnCallback ccb;
+ std::shared_ptr<AsyncSocket> socket;
+
+ senderEvb.runInEventBaseThreadAndWait([&]() {
+ socket = AsyncSocket::newSocket(&senderEvb);
+ socket->connect(&ccb, server.getAddress(), 30, options);
+ });
+
+ // accept the socket on the server side
+ std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+
+ // Send a big (45KB) write so that it is partially written. The first write
+ // is 16KB (8KB on both sides) and subsequent writes are 8KB each. Reading
+ // just under 24KB would cause 3-4 writes for the total of 32-40KB in the
+ // following sequence: 16KB + 8KB + 8KB (+ 8KB). This ensures that not all
+ // bytes are written when the socket is reset. Having at least 3 writes
+ // ensures that the total size (45KB) would be exceeed in case of overcounting
+ // based on the initial write size of 16KB.
+ constexpr size_t sendSize = 45 * 1024;
+ auto const sendBuf = std::vector<char>(sendSize, 'a');
+
+ WriteCallback wcb;
+
+ senderEvb.runInEventBaseThreadAndWait(
+ [&]() { socket->write(&wcb, sendBuf.data(), sendSize); });
+
+ // Reading 20KB would cause three additional writes of 8KB, but less
+ // than 45KB total, so the socket is reset before all bytes are written.
+ constexpr size_t recvSize = 20 * 1024;
+ uint8_t recvBuf[recvSize];
+ int bytesRead = acceptedSocket->readAll(recvBuf, sizeof(recvBuf));
+
+ acceptedSocket->closeWithReset();
+
+ senderEvb.terminateLoopSoon();
+ senderThread.join();
+
+ LOG(INFO) << "Bytes written: " << wcb.bytesWritten;
+
+ ASSERT_EQ(STATE_FAILED, wcb.state);
+ ASSERT_GE(wcb.bytesWritten, bytesRead);
+ ASSERT_LE(wcb.bytesWritten, sendSize);
+ ASSERT_EQ(recvSize, bytesRead);
+ ASSERT(32 * 1024 == wcb.bytesWritten || 40 * 1024 == wcb.bytesWritten);
+}
+
/**
* Test writing a mix of simple buffers and IOBufs
*/
ReadCallback rcb;
acceptedSocket->setReadCB(&rcb);
+ // Check if EOR tracking flag can be set and reset.
+ EXPECT_FALSE(socket->isEorTrackingEnabled());
+ socket->setEorTracking(true);
+ EXPECT_TRUE(socket->isEorTrackingEnabled());
+ socket->setEorTracking(false);
+ EXPECT_FALSE(socket->isEorTrackingEnabled());
+
// Write a simple buffer to the socket
constexpr size_t simpleBufLength = 5;
char simpleBuf[simpleBufLength];
// Let the reads and writes run to completion
evb.loop();
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
// Make sure the reader got the right data in the right order
- CHECK_EQ(rcb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.buffers.size(), 1);
- CHECK_EQ(rcb.buffers[0].length,
+ ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.buffers.size(), 1);
+ ASSERT_EQ(rcb.buffers[0].length,
simpleBufLength + buf1Length + buf2Length + buf3Length);
- CHECK_EQ(
+ ASSERT_EQ(
memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
- CHECK_EQ(
+ ASSERT_EQ(
memcmp(rcb.buffers[0].buffer + simpleBufLength,
buf1Copy->data(), buf1Copy->length()), 0);
- CHECK_EQ(
+ ASSERT_EQ(
memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
buf2Copy->data(), buf2Copy->length()), 0);
write3.scheduleTimeout(140);
evb.loop();
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
if (wcb3.state != STATE_SUCCEEDED) {
throw(wcb3.exception);
}
- CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
// Make sure the reader got the data with the right grouping
- CHECK_EQ(rcb.state, STATE_SUCCEEDED);
- CHECK_EQ(rcb.buffers.size(), 2);
- CHECK_EQ(rcb.buffers[0].length, buf1Length);
- CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
+ ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(rcb.buffers.size(), 2);
+ ASSERT_EQ(rcb.buffers[0].length, buf1Length);
+ ASSERT_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
acceptedSocket->close();
socket->close();
evb.loop(); // loop until the data is sent
- CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb4.state, STATE_SUCCEEDED);
rcb.verifyData(buf.get(), len1 + len2);
ASSERT_TRUE(socket->isClosedBySelf());
socket->close();
evb.loop(); // loop until the data is sent
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
rcb.verifyData(buf.get(), len1 + len2);
ASSERT_TRUE(socket->isClosedBySelf());
evb.loop();
// Make sure we are connected
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
// Schedule pending writes, until several write attempts have blocked
char buf[128];
for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
it != writeCallbacks.end();
++it) {
- CHECK_EQ((*it)->state, STATE_FAILED);
+ ASSERT_EQ((*it)->state, STATE_FAILED);
}
ASSERT_TRUE(socket->isClosedBySelf());
WriteCallback wcb1;
socket.write(&wcb1, expectedData, expectedDataSz);
evb.loop();
- CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
rcb.verifyData(expectedData, expectedDataSz);
- CHECK_EQ(socket.immediateReadCalled, true);
+ ASSERT_EQ(socket.immediateReadCalled, true);
ASSERT_FALSE(socket.isClosedBySelf());
ASSERT_FALSE(socket.isClosedByPeer());
WriteCallback wcb;
socket.write(&wcb, expectedData, expectedDataSz);
evb.loop();
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
/* we shoud've only read maxBufferSz data since readCallback_
* was reset in dataAvailableCallback */
- CHECK_EQ(rcb.dataRead(), maxBufferSz);
- CHECK_EQ(socket.immediateReadCalled, false);
+ ASSERT_EQ(rcb.dataRead(), maxBufferSz);
+ ASSERT_EQ(socket.immediateReadCalled, false);
ASSERT_FALSE(socket.isClosedBySelf());
ASSERT_FALSE(socket.isClosedByPeer());
eventBase.loop();
// Verify that the server accepted a connection
- CHECK_EQ(acceptCallback.getEvents()->size(), 3);
- CHECK_EQ(acceptCallback.getEvents()->at(0).type,
+ ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
+ ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(acceptCallback.getEvents()->at(1).type,
+ ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(acceptCallback.getEvents()->at(2).type,
+ ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
TestAcceptCallback::TYPE_STOP);
int fd = acceptCallback.getEvents()->at(1).fd;
// The accepted connection should already be in non-blocking mode
int flags = fcntl(fd, F_GETFL, 0);
- CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
+ ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
#ifndef TCP_NOPUSH
// The accepted connection should already have TCP_NODELAY set
int value;
socklen_t valueLength = sizeof(value);
int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
- CHECK_EQ(rc, 0);
- CHECK_EQ(value, 1);
+ ASSERT_EQ(rc, 0);
+ ASSERT_EQ(value, 1);
#endif
}
// exactly round robin in the future, we can simplify the test checks here.
// (We'll also need to update the termination code, since we expect cb6 to
// get called twice to terminate the loop.)
- CHECK_EQ(cb1.getEvents()->size(), 4);
- CHECK_EQ(cb1.getEvents()->at(0).type,
+ ASSERT_EQ(cb1.getEvents()->size(), 4);
+ ASSERT_EQ(cb1.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(cb1.getEvents()->at(1).type,
+ ASSERT_EQ(cb1.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb1.getEvents()->at(2).type,
+ ASSERT_EQ(cb1.getEvents()->at(2).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb1.getEvents()->at(3).type,
+ ASSERT_EQ(cb1.getEvents()->at(3).type,
TestAcceptCallback::TYPE_STOP);
- CHECK_EQ(cb2.getEvents()->size(), 4);
- CHECK_EQ(cb2.getEvents()->at(0).type,
+ ASSERT_EQ(cb2.getEvents()->size(), 4);
+ ASSERT_EQ(cb2.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(cb2.getEvents()->at(1).type,
+ ASSERT_EQ(cb2.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb2.getEvents()->at(2).type,
+ ASSERT_EQ(cb2.getEvents()->at(2).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb2.getEvents()->at(3).type,
+ ASSERT_EQ(cb2.getEvents()->at(3).type,
TestAcceptCallback::TYPE_STOP);
- CHECK_EQ(cb3.getEvents()->size(), 2);
- CHECK_EQ(cb3.getEvents()->at(0).type,
+ ASSERT_EQ(cb3.getEvents()->size(), 2);
+ ASSERT_EQ(cb3.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(cb3.getEvents()->at(1).type,
+ ASSERT_EQ(cb3.getEvents()->at(1).type,
TestAcceptCallback::TYPE_STOP);
- CHECK_EQ(cb4.getEvents()->size(), 3);
- CHECK_EQ(cb4.getEvents()->at(0).type,
+ ASSERT_EQ(cb4.getEvents()->size(), 3);
+ ASSERT_EQ(cb4.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(cb4.getEvents()->at(1).type,
+ ASSERT_EQ(cb4.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb4.getEvents()->at(2).type,
+ ASSERT_EQ(cb4.getEvents()->at(2).type,
TestAcceptCallback::TYPE_STOP);
- CHECK_EQ(cb5.getEvents()->size(), 2);
- CHECK_EQ(cb5.getEvents()->at(0).type,
+ ASSERT_EQ(cb5.getEvents()->size(), 2);
+ ASSERT_EQ(cb5.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(cb5.getEvents()->at(1).type,
+ ASSERT_EQ(cb5.getEvents()->at(1).type,
TestAcceptCallback::TYPE_STOP);
- CHECK_EQ(cb6.getEvents()->size(), 4);
- CHECK_EQ(cb6.getEvents()->at(0).type,
+ ASSERT_EQ(cb6.getEvents()->size(), 4);
+ ASSERT_EQ(cb6.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(cb6.getEvents()->at(1).type,
+ ASSERT_EQ(cb6.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb6.getEvents()->at(2).type,
+ ASSERT_EQ(cb6.getEvents()->at(2).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb6.getEvents()->at(3).type,
+ ASSERT_EQ(cb6.getEvents()->at(3).type,
TestAcceptCallback::TYPE_STOP);
- CHECK_EQ(cb7.getEvents()->size(), 3);
- CHECK_EQ(cb7.getEvents()->at(0).type,
+ ASSERT_EQ(cb7.getEvents()->size(), 3);
+ ASSERT_EQ(cb7.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(cb7.getEvents()->at(1).type,
+ ASSERT_EQ(cb7.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb7.getEvents()->at(2).type,
+ ASSERT_EQ(cb7.getEvents()->at(2).type,
TestAcceptCallback::TYPE_STOP);
}
});
cb1.setConnectionAcceptedFn(
[&](int /* fd */, const folly::SocketAddress& /* addr */) {
- CHECK_EQ(thread_id, std::this_thread::get_id());
+ ASSERT_EQ(thread_id, std::this_thread::get_id());
serverSocket->removeAcceptCallback(&cb1, &eventBase);
});
cb1.setAcceptStoppedFn([&](){
- CHECK_EQ(thread_id, std::this_thread::get_id());
+ ASSERT_EQ(thread_id, std::this_thread::get_id());
});
// Test having callbacks remove other callbacks before them on the list,
// exactly round robin in the future, we can simplify the test checks here.
// (We'll also need to update the termination code, since we expect cb6 to
// get called twice to terminate the loop.)
- CHECK_EQ(cb1.getEvents()->size(), 3);
- CHECK_EQ(cb1.getEvents()->at(0).type,
+ ASSERT_EQ(cb1.getEvents()->size(), 3);
+ ASSERT_EQ(cb1.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(cb1.getEvents()->at(1).type,
+ ASSERT_EQ(cb1.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(cb1.getEvents()->at(2).type,
+ ASSERT_EQ(cb1.getEvents()->at(2).type,
TestAcceptCallback::TYPE_STOP);
}
eventBase->loop();
// Verify that the server accepted a connection
- CHECK_EQ(acceptCallback.getEvents()->size(), 3);
- CHECK_EQ(acceptCallback.getEvents()->at(0).type,
+ ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
+ ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(acceptCallback.getEvents()->at(1).type,
+ ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(acceptCallback.getEvents()->at(2).type,
+ ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
TestAcceptCallback::TYPE_STOP);
}
acceptedSocket.reset();
// Test that server socket was closed
- ssize_t sz = read(fd, simpleBuf, simpleBufLength);
- CHECK_EQ(sz, -1);
- CHECK_EQ(errno, 9);
+ folly::test::msvcSuppressAbortOnInvalidParams([&] {
+ ssize_t sz = read(fd, simpleBuf, simpleBufLength);
+ ASSERT_EQ(sz, -1);
+ ASSERT_EQ(errno, EBADF);
+ });
delete[] simpleBuf;
}
addr.sin_family = AF_INET;
addr.sin_port = 0;
addr.sin_addr.s_addr = INADDR_ANY;
- CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
+ ASSERT_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
sizeof(addr)), 0);
// Look up the address that we bound to
folly::SocketAddress boundAddress;
// Make sure AsyncServerSocket reports the same address that we bound to
folly::SocketAddress serverSocketAddress;
serverSocket->getAddress(&serverSocketAddress);
- CHECK_EQ(boundAddress, serverSocketAddress);
+ ASSERT_EQ(boundAddress, serverSocketAddress);
// Make sure the socket works
serverSocketSanityTest(serverSocket.get());
addr.sin_family = AF_INET;
addr.sin_port = 0;
addr.sin_addr.s_addr = INADDR_ANY;
- CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
+ ASSERT_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
sizeof(addr)), 0);
// Look up the address that we bound to
folly::SocketAddress boundAddress;
boundAddress.setFromLocalAddress(fd);
// listen
- CHECK_EQ(listen(fd, 16), 0);
+ ASSERT_EQ(listen(fd, 16), 0);
// Create a server socket
AsyncServerSocket::UniquePtr serverSocket(
// Make sure AsyncServerSocket reports the same address that we bound to
folly::SocketAddress serverSocketAddress;
serverSocket->getAddress(&serverSocketAddress);
- CHECK_EQ(boundAddress, serverSocketAddress);
+ ASSERT_EQ(boundAddress, serverSocketAddress);
// Make sure the socket works
serverSocketSanityTest(serverSocket.get());
eventBase.loop();
// Verify that the server accepted a connection
- CHECK_EQ(acceptCallback.getEvents()->size(), 3);
- CHECK_EQ(acceptCallback.getEvents()->at(0).type,
+ ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
+ ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
TestAcceptCallback::TYPE_START);
- CHECK_EQ(acceptCallback.getEvents()->at(1).type,
+ ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
TestAcceptCallback::TYPE_ACCEPT);
- CHECK_EQ(acceptCallback.getEvents()->at(2).type,
+ ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
TestAcceptCallback::TYPE_STOP);
int fd = acceptCallback.getEvents()->at(1).fd;
// The accepted connection should already be in non-blocking mode
int flags = fcntl(fd, F_GETFL, 0);
- CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
+ ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
}
TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
acceptCallback.setConnectionAcceptedFn(
[&](int /* fd */, const folly::SocketAddress& /* addr */) {
count++;
- CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
+ ASSERT_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
if (count == 4) {
// all messages are processed, remove accept callback
socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
evb.loop();
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
- CHECK_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
ASSERT_TRUE(bcb.hasBuffered());
ASSERT_TRUE(bcb.hasBufferCleared());
socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
evb.loop();
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
}
#if FOLLY_ALLOW_TFO
evb.loop();
- CHECK_EQ(cb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(cb.state, STATE_SUCCEEDED);
EXPECT_LE(0, socket->getConnectTime().count());
EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
EXPECT_TRUE(socket->getTFOAttempted());
evb.loop();
- CHECK_EQ(cb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(cb.state, STATE_SUCCEEDED);
EXPECT_LE(0, socket->getConnectTime().count());
EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
EXPECT_TRUE(socket->getTFOAttempted());
/**
* Test connecting to a server that isn't listening
*/
-TEST(AsyncSocketTest, ConnectRefusedTFO) {
+TEST(AsyncSocketTest, ConnectRefusedImmediatelyTFO) {
EventBase evb;
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
WriteCallback write1;
// Trigger the connect if TFO attempt is supported.
socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
- evb.loop();
WriteCallback write2;
socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
evb.loop();
// Loop, although there shouldn't be anything to do.
evb.loop();
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
evb.loop();
// Make sure the connection was aborted
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
ConnCallback ccb;
socket->connect(&ccb, server.getAddress(), 30);
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
ReadCallback rcb;
socket->setReadCB(&rcb);
EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
}
+TEST(AsyncSocketTest, ConnectRefusedDelayedTFO) {
+ EventBase evb;
+
+ auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
+ socket->enableTFO();
+
+ // Hopefully this fails
+ folly::SocketAddress fakeAddr("127.0.0.1", 65535);
+ EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
+ .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
+ sockaddr_storage addr;
+ auto len = fakeAddr.getAddress(&addr);
+ int ret = connect(fd, (const struct sockaddr*)&addr, len);
+ LOG(INFO) << "connecting the socket " << fd << " : " << ret << " : "
+ << errno;
+ return ret;
+ }));
+
+ // Hopefully nothing is actually listening on this address
+ ConnCallback cb;
+ socket->connect(&cb, fakeAddr, 30);
+
+ WriteCallback write1;
+ // Trigger the connect if TFO attempt is supported.
+ socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
+
+ if (socket->getTFOFinished()) {
+ // This test is useless now.
+ return;
+ }
+ WriteCallback write2;
+ // Trigger the connect if TFO attempt is supported.
+ socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
+ evb.loop();
+
+ EXPECT_EQ(STATE_FAILED, write1.state);
+ EXPECT_EQ(STATE_FAILED, write2.state);
+ EXPECT_FALSE(socket->getTFOSucceded());
+
+ EXPECT_EQ(STATE_SUCCEEDED, cb.state);
+ EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
+ EXPECT_TRUE(socket->getTFOAttempted());
+}
+
TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) {
// Try connecting to server that won't respond.
//
ConnCallback ccb;
socket->connect(&ccb, server.getAddress(), 30);
- CHECK_EQ(ccb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
ReadCallback rcb;
socket->setReadCB(&rcb);
evb.loop();
- CHECK_EQ(cb.state, STATE_SUCCEEDED);
+ ASSERT_EQ(cb.state, STATE_SUCCEEDED);
EXPECT_LE(0, socket->getConnectTime().count());
EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
EXPECT_TRUE(socket->getTFOAttempted());
EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
}
+class MockEvbChangeCallback : public AsyncSocket::EvbChangeCallback {
+ public:
+ MOCK_METHOD1(evbAttached, void(AsyncSocket*));
+ MOCK_METHOD1(evbDetached, void(AsyncSocket*));
+};
+
+TEST(AsyncSocketTest, EvbCallbacks) {
+ auto cb = folly::make_unique<MockEvbChangeCallback>();
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+ InSequence seq;
+ EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1);
+ EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1);
+
+ socket->setEvbChangedCallback(std::move(cb));
+ socket->detachEventBase();
+ socket->attachEventBase(&evb);
+}
+
#endif