/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
testConnectOptWrite(100, 200);
// Test using a large buffer in the connect callback, that should block
- const size_t largeSize = 8*1024*1024;
+ const size_t largeSize = 32 * 1024 * 1024;
testConnectOptWrite(100, largeSize);
// Test using a large initial write
AsyncSocket::newSocket(&evb, server.getAddress(), 30);
evb.loop(); // loop until the socket is connected
- // write() a large chunk of data, with no-one on the other end reading
- size_t writeLength = 8*1024*1024;
+ // write() a large chunk of data, with no-one on the other end reading.
+ // Tricky: the kernel caches the connection metrics for recently-used
+ // routes (see tcp_no_metrics_save) so a freshly opened connection can
+ // have a send buffer size bigger than wmem_default. This makes the test
+ // flaky on contbuild if writeLength is < wmem_max (20M on our systems).
+ size_t writeLength = 32 * 1024 * 1024;
uint32_t timeout = 200;
socket->setSendTimeout(timeout);
scoped_array<char> buf(new char[writeLength]);
acceptedSocket->close();
// write() a large chunk of data
- size_t writeLength = 8*1024*1024;
+ size_t writeLength = 32 * 1024 * 1024;
scoped_array<char> buf(new char[writeLength]);
memset(buf.get(), 'a', writeLength);
WriteCallback wcb;
ASSERT_FALSE(socket->isClosedByPeer());
}
+/**
+ * Test that bytes written is correctly computed in case of write failure
+ */
+TEST(AsyncSocketTest, WriteErrorCallbackBytesWritten) {
+ // Send and receive buffer sizes for the sockets.
+ const int sockBufSize = 8 * 1024;
+
+ TestServer server(false, sockBufSize);
+
+ AsyncSocket::OptionMap options{
+ {{SOL_SOCKET, SO_SNDBUF}, sockBufSize},
+ {{SOL_SOCKET, SO_RCVBUF}, sockBufSize},
+ {{IPPROTO_TCP, TCP_NODELAY}, 1},
+ };
+
+ // The current thread will be used by the receiver - use a separate thread
+ // for the sender.
+ EventBase senderEvb;
+ std::thread senderThread([&]() { senderEvb.loopForever(); });
+
+ ConnCallback ccb;
+ std::shared_ptr<AsyncSocket> socket;
+
+ senderEvb.runInEventBaseThreadAndWait([&]() {
+ socket = AsyncSocket::newSocket(&senderEvb);
+ socket->connect(&ccb, server.getAddress(), 30, options);
+ });
+
+ // accept the socket on the server side
+ std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+
+ // Send a big (45KB) write so that it is partially written. The first write
+ // is 16KB (8KB on both sides) and subsequent writes are 8KB each. Reading
+ // just under 24KB would cause 3-4 writes for the total of 32-40KB in the
+ // following sequence: 16KB + 8KB + 8KB (+ 8KB). This ensures that not all
+ // bytes are written when the socket is reset. Having at least 3 writes
+ // ensures that the total size (45KB) would be exceeed in case of overcounting
+ // based on the initial write size of 16KB.
+ constexpr size_t sendSize = 45 * 1024;
+ auto const sendBuf = std::vector<char>(sendSize, 'a');
+
+ WriteCallback wcb;
+
+ senderEvb.runInEventBaseThreadAndWait(
+ [&]() { socket->write(&wcb, sendBuf.data(), sendSize); });
+
+ // Reading 20KB would cause three additional writes of 8KB, but less
+ // than 45KB total, so the socket is reset before all bytes are written.
+ constexpr size_t recvSize = 20 * 1024;
+ uint8_t recvBuf[recvSize];
+ int bytesRead = acceptedSocket->readAll(recvBuf, sizeof(recvBuf));
+
+ acceptedSocket->closeWithReset();
+
+ senderEvb.terminateLoopSoon();
+ senderThread.join();
+
+ LOG(INFO) << "Bytes written: " << wcb.bytesWritten;
+
+ ASSERT_EQ(STATE_FAILED, wcb.state);
+ ASSERT_GE(wcb.bytesWritten, bytesRead);
+ ASSERT_LE(wcb.bytesWritten, sendSize);
+ ASSERT_EQ(recvSize, bytesRead);
+ ASSERT(32 * 1024 == wcb.bytesWritten || 40 * 1024 == wcb.bytesWritten);
+}
+
/**
* Test writing a mix of simple buffers and IOBufs
*/
ReadCallback rcb;
acceptedSocket->setReadCB(&rcb);
+ // Check if EOR tracking flag can be set and reset.
+ EXPECT_FALSE(socket->isEorTrackingEnabled());
+ socket->setEorTracking(true);
+ EXPECT_TRUE(socket->isEorTrackingEnabled());
+ socket->setEorTracking(false);
+ EXPECT_FALSE(socket->isEorTrackingEnabled());
+
// Write a simple buffer to the socket
constexpr size_t simpleBufLength = 5;
char simpleBuf[simpleBufLength];
EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
}
+class MockEvbChangeCallback : public AsyncSocket::EvbChangeCallback {
+ public:
+ MOCK_METHOD1(evbAttached, void(AsyncSocket*));
+ MOCK_METHOD1(evbDetached, void(AsyncSocket*));
+};
+
+TEST(AsyncSocketTest, EvbCallbacks) {
+ auto cb = folly::make_unique<MockEvbChangeCallback>();
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+ InSequence seq;
+ EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1);
+ EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1);
+
+ socket->setEvbChangedCallback(std::move(cb));
+ socket->detachEventBase();
+ socket->attachEventBase(&evb);
+}
+
#endif