Enable EOR flag configuration for folly::AsyncSocket.
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
index afe23fa134eb37f37e037373df6f4e627f6fc1e9..13f23b75d81a6be5fab18944100bb2202f5d3ed4 100644 (file)
@@ -1,5 +1,5 @@
 /*
 /*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -944,7 +944,7 @@ TEST(AsyncSocketTest, ConnectCallbackWrite) {
   testConnectOptWrite(100, 200);
 
   // Test using a large buffer in the connect callback, that should block
   testConnectOptWrite(100, 200);
 
   // Test using a large buffer in the connect callback, that should block
-  const size_t largeSize = 8*1024*1024;
+  const size_t largeSize = 32 * 1024 * 1024;
   testConnectOptWrite(100, largeSize);
 
   // Test using a large initial write
   testConnectOptWrite(100, largeSize);
 
   // Test using a large initial write
@@ -1013,8 +1013,12 @@ TEST(AsyncSocketTest, WriteTimeout) {
     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
   evb.loop(); // loop until the socket is connected
 
     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
   evb.loop(); // loop until the socket is connected
 
-  // write() a large chunk of data, with no-one on the other end reading
-  size_t writeLength = 8*1024*1024;
+  // write() a large chunk of data, with no-one on the other end reading.
+  // Tricky: the kernel caches the connection metrics for recently-used
+  // routes (see tcp_no_metrics_save) so a freshly opened connection can
+  // have a send buffer size bigger than wmem_default.  This makes the test
+  // flaky on contbuild if writeLength is < wmem_max (20M on our systems).
+  size_t writeLength = 32 * 1024 * 1024;
   uint32_t timeout = 200;
   socket->setSendTimeout(timeout);
   scoped_array<char> buf(new char[writeLength]);
   uint32_t timeout = 200;
   socket->setSendTimeout(timeout);
   scoped_array<char> buf(new char[writeLength]);
@@ -1071,7 +1075,7 @@ TEST(AsyncSocketTest, WritePipeError) {
   acceptedSocket->close();
 
   // write() a large chunk of data
   acceptedSocket->close();
 
   // write() a large chunk of data
-  size_t writeLength = 8*1024*1024;
+  size_t writeLength = 32 * 1024 * 1024;
   scoped_array<char> buf(new char[writeLength]);
   memset(buf.get(), 'a', writeLength);
   WriteCallback wcb;
   scoped_array<char> buf(new char[writeLength]);
   memset(buf.get(), 'a', writeLength);
   WriteCallback wcb;
@@ -1090,6 +1094,72 @@ TEST(AsyncSocketTest, WritePipeError) {
   ASSERT_FALSE(socket->isClosedByPeer());
 }
 
   ASSERT_FALSE(socket->isClosedByPeer());
 }
 
+/**
+ * Test that bytes written is correctly computed in case of write failure
+ */
+TEST(AsyncSocketTest, WriteErrorCallbackBytesWritten) {
+  // Send and receive buffer sizes for the sockets.
+  const int sockBufSize = 8 * 1024;
+
+  TestServer server(false, sockBufSize);
+
+  AsyncSocket::OptionMap options{
+      {{SOL_SOCKET, SO_SNDBUF}, sockBufSize},
+      {{SOL_SOCKET, SO_RCVBUF}, sockBufSize},
+      {{IPPROTO_TCP, TCP_NODELAY}, 1},
+  };
+
+  // The current thread will be used by the receiver - use a separate thread
+  // for the sender.
+  EventBase senderEvb;
+  std::thread senderThread([&]() { senderEvb.loopForever(); });
+
+  ConnCallback ccb;
+  std::shared_ptr<AsyncSocket> socket;
+
+  senderEvb.runInEventBaseThreadAndWait([&]() {
+    socket = AsyncSocket::newSocket(&senderEvb);
+    socket->connect(&ccb, server.getAddress(), 30, options);
+  });
+
+  // accept the socket on the server side
+  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+
+  // Send a big (45KB) write so that it is partially written. The first write
+  // is 16KB (8KB on both sides) and subsequent writes are 8KB each. Reading
+  // just under 24KB would cause 3-4 writes for the total of 32-40KB in the
+  // following sequence: 16KB + 8KB + 8KB (+ 8KB). This ensures that not all
+  // bytes are written when the socket is reset. Having at least 3 writes
+  // ensures that the total size (45KB) would be exceeed in case of overcounting
+  // based on the initial write size of 16KB.
+  constexpr size_t sendSize = 45 * 1024;
+  auto const sendBuf = std::vector<char>(sendSize, 'a');
+
+  WriteCallback wcb;
+
+  senderEvb.runInEventBaseThreadAndWait(
+      [&]() { socket->write(&wcb, sendBuf.data(), sendSize); });
+
+  // Reading 20KB would cause three additional writes of 8KB, but less
+  // than 45KB total, so the socket is reset before all bytes are written.
+  constexpr size_t recvSize = 20 * 1024;
+  uint8_t recvBuf[recvSize];
+  int bytesRead = acceptedSocket->readAll(recvBuf, sizeof(recvBuf));
+
+  acceptedSocket->closeWithReset();
+
+  senderEvb.terminateLoopSoon();
+  senderThread.join();
+
+  LOG(INFO) << "Bytes written: " << wcb.bytesWritten;
+
+  ASSERT_EQ(STATE_FAILED, wcb.state);
+  ASSERT_GE(wcb.bytesWritten, bytesRead);
+  ASSERT_LE(wcb.bytesWritten, sendSize);
+  ASSERT_EQ(recvSize, bytesRead);
+  ASSERT(32 * 1024 == wcb.bytesWritten || 40 * 1024 == wcb.bytesWritten);
+}
+
 /**
  * Test writing a mix of simple buffers and IOBufs
  */
 /**
  * Test writing a mix of simple buffers and IOBufs
  */
@@ -1107,6 +1177,13 @@ TEST(AsyncSocketTest, WriteIOBuf) {
   ReadCallback rcb;
   acceptedSocket->setReadCB(&rcb);
 
   ReadCallback rcb;
   acceptedSocket->setReadCB(&rcb);
 
+  // Check if EOR tracking flag can be set and reset.
+  EXPECT_FALSE(socket->isEorTrackingEnabled());
+  socket->setEorTracking(true);
+  EXPECT_TRUE(socket->isEorTrackingEnabled());
+  socket->setEorTracking(false);
+  EXPECT_FALSE(socket->isEorTrackingEnabled());
+
   // Write a simple buffer to the socket
   constexpr size_t simpleBufLength = 5;
   char simpleBuf[simpleBufLength];
   // Write a simple buffer to the socket
   constexpr size_t simpleBufLength = 5;
   char simpleBuf[simpleBufLength];
@@ -2926,4 +3003,24 @@ TEST(AsyncSocketTest, ConnectTFOWithBigData) {
   EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
 }
 
   EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
 }
 
+class MockEvbChangeCallback : public AsyncSocket::EvbChangeCallback {
+ public:
+  MOCK_METHOD1(evbAttached, void(AsyncSocket*));
+  MOCK_METHOD1(evbDetached, void(AsyncSocket*));
+};
+
+TEST(AsyncSocketTest, EvbCallbacks) {
+  auto cb = folly::make_unique<MockEvbChangeCallback>();
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+  InSequence seq;
+  EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1);
+  EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1);
+
+  socket->setEvbChangedCallback(std::move(cb));
+  socket->detachEventBase();
+  socket->attachEventBase(&evb);
+}
+
 #endif
 #endif