Fix RequestContext held too long issue in EventBase
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
index a1988722ef2e41d66231c9483684abcc72fe9008..ec67d2f21c17cb0e0f55e4801b57f42c5e7be90a 100644 (file)
@@ -1096,6 +1096,44 @@ TEST(AsyncSocketTest, WritePipeError) {
   ASSERT_FALSE(socket->isClosedByPeer());
 }
 
+/**
+ * Test writing to a socket that has its read side closed
+ */
+TEST(AsyncSocketTest, WriteAfterReadEOF) {
+  TestServer server;
+
+  // connect()
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket =
+      AsyncSocket::newSocket(&evb, server.getAddress(), 30);
+  evb.loop(); // loop until the socket is connected
+
+  // Accept the connection
+  std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
+  ReadCallback rcb;
+  acceptedSocket->setReadCB(&rcb);
+
+  // Shutdown the write side of client socket (read side of server socket)
+  socket->shutdownWrite();
+  evb.loop();
+
+  // Check that accepted socket is still writable
+  ASSERT_FALSE(acceptedSocket->good());
+  ASSERT_TRUE(acceptedSocket->writable());
+
+  // Write data to accepted socket
+  constexpr size_t simpleBufLength = 5;
+  char simpleBuf[simpleBufLength];
+  memset(simpleBuf, 'a', simpleBufLength);
+  WriteCallback wcb;
+  acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
+  evb.loop();
+
+  // Make sure we were able to write even after getting a read EOF
+  ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+}
+
 /**
  * Test that bytes written is correctly computed in case of write failure
  */
@@ -2806,7 +2844,7 @@ class MockEvbChangeCallback : public AsyncSocket::EvbChangeCallback {
 };
 
 TEST(AsyncSocketTest, EvbCallbacks) {
-  auto cb = folly::make_unique<MockEvbChangeCallback>();
+  auto cb = std::make_unique<MockEvbChangeCallback>();
   EventBase evb;
   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
 
@@ -2819,7 +2857,40 @@ TEST(AsyncSocketTest, EvbCallbacks) {
   socket->attachEventBase(&evb);
 }
 
-#ifdef MSG_ERRQUEUE
+TEST(AsyncSocketTest, TestEvbDetachWtRegisteredIOHandlers) {
+  // Start listening on a local port
+  TestServer server;
+
+  // Connect using a AsyncSocket
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  ConnCallback cb;
+  socket->connect(&cb, server.getAddress(), 30);
+
+  evb.loop();
+
+  ASSERT_EQ(cb.state, STATE_SUCCEEDED);
+  EXPECT_LE(0, socket->getConnectTime().count());
+  EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+
+  // After the ioHandlers are registered, still should be able to detach/attach
+  ReadCallback rcb;
+  socket->setReadCB(&rcb);
+
+  auto cbEvbChg = std::make_unique<MockEvbChangeCallback>();
+  InSequence seq;
+  EXPECT_CALL(*cbEvbChg, evbDetached(socket.get())).Times(1);
+  EXPECT_CALL(*cbEvbChg, evbAttached(socket.get())).Times(1);
+
+  socket->setEvbChangedCallback(std::move(cbEvbChg));
+  EXPECT_TRUE(socket->isDetachable());
+  socket->detachEventBase();
+  socket->attachEventBase(&evb);
+
+  socket->close();
+}
+
+#ifdef FOLLY_HAVE_MSG_ERRQUEUE
 /* copied from include/uapi/linux/net_tstamp.h */
 /* SO_TIMESTAMPING gets an integer bit field comprised of these values */
 enum SOF_TIMESTAMPING {
@@ -2829,6 +2900,43 @@ enum SOF_TIMESTAMPING {
   SOF_TIMESTAMPING_OPT_CMSG = (1 << 10),
   SOF_TIMESTAMPING_OPT_TSONLY = (1 << 11),
 };
+
+class TestErrMessageCallback : public folly::AsyncSocket::ErrMessageCallback {
+ public:
+  TestErrMessageCallback()
+      : exception_(folly::AsyncSocketException::UNKNOWN, "none") {}
+
+  void errMessage(const cmsghdr& cmsg) noexcept override {
+    if (cmsg.cmsg_level == SOL_SOCKET && cmsg.cmsg_type == SCM_TIMESTAMPING) {
+      gotTimestamp_++;
+      checkResetCallback();
+    } else if (
+        (cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
+        (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR)) {
+      gotByteSeq_++;
+      checkResetCallback();
+    }
+  }
+
+  void errMessageError(
+      const folly::AsyncSocketException& ex) noexcept override {
+    exception_ = ex;
+  }
+
+  void checkResetCallback() noexcept {
+    if (socket_ != nullptr && resetAfter_ != -1 &&
+        gotTimestamp_ + gotByteSeq_ == resetAfter_) {
+      socket_->setErrMessageCB(nullptr);
+    }
+  }
+
+  folly::AsyncSocket* socket_{nullptr};
+  folly::AsyncSocketException exception_;
+  int gotTimestamp_{0};
+  int gotByteSeq_{0};
+  int resetAfter_{-1};
+};
+
 TEST(AsyncSocketTest, ErrMessageCallback) {
   TestServer server;
 
@@ -2857,6 +2965,9 @@ TEST(AsyncSocketTest, ErrMessageCallback) {
   ASSERT_EQ(socket->getErrMessageCallback(),
             static_cast<folly::AsyncSocket::ErrMessageCallback*>(&errMsgCB));
 
+  errMsgCB.socket_ = socket.get();
+  errMsgCB.resetAfter_ = 3;
+
   // Enable timestamp notifications
   ASSERT_GT(socket->getFd(), 0);
   int flags = SOF_TIMESTAMPING_OPT_ID
@@ -2870,7 +2981,9 @@ TEST(AsyncSocketTest, ErrMessageCallback) {
   // write()
   std::vector<uint8_t> wbuf(128, 'a');
   WriteCallback wcb;
-  socket->write(&wcb, wbuf.data(), wbuf.size());
+  // Send two packets to get two EOM notifications
+  socket->write(&wcb, wbuf.data(), wbuf.size() / 2);
+  socket->write(&wcb, wbuf.data() + wbuf.size() / 2, wbuf.size() / 2);
 
   // Accept the connection.
   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
@@ -2895,10 +3008,12 @@ TEST(AsyncSocketTest, ErrMessageCallback) {
 
   // Check for the timestamp notifications.
   ASSERT_EQ(errMsgCB.exception_.type_, folly::AsyncSocketException::UNKNOWN);
-  ASSERT_TRUE(errMsgCB.gotByteSeq_);
-  ASSERT_TRUE(errMsgCB.gotTimestamp_);
+  ASSERT_GT(errMsgCB.gotByteSeq_, 0);
+  ASSERT_GT(errMsgCB.gotTimestamp_, 0);
+  ASSERT_EQ(
+      errMsgCB.gotByteSeq_ + errMsgCB.gotTimestamp_, errMsgCB.resetAfter_);
 }
-#endif // MSG_ERRQUEUE
+#endif // FOLLY_HAVE_MSG_ERRQUEUE
 
 TEST(AsyncSocket, PreReceivedData) {
   TestServer server;
@@ -3030,6 +3145,7 @@ TEST(AsyncSocket, PreReceivedDataTakeover) {
   evb.loop();
 }
 
+#ifdef MSG_NOSIGNAL
 TEST(AsyncSocketTest, SendMessageFlags) {
   TestServer server;
   TestSendMsgParamsCallback sendMsgCB(
@@ -3085,41 +3201,29 @@ TEST(AsyncSocketTest, SendMessageFlags) {
 }
 
 TEST(AsyncSocketTest, SendMessageAncillaryData) {
-  struct sockaddr_un addr = {AF_UNIX,
-                             "AsyncSocketTest.SendMessageAncillaryData\0"};
-
-  // Clean up the name in the name space we're going to use
-  ASSERT_FALSE(remove(addr.sun_path) == -1 && errno != ENOENT);
-
-  // Set up listening socket
-  int lfd = fsp::socket(AF_UNIX, SOCK_STREAM, 0);
-  ASSERT_NE(lfd, -1);
-  ASSERT_NE(bind(lfd, (struct sockaddr*)&addr, sizeof(addr)), -1)
-      << "Bind failed: " << errno;
-
-  // Create the connecting socket
-  int csd = fsp::socket(AF_UNIX, SOCK_STREAM, 0);
-  ASSERT_NE(csd, -1);
+  int fds[2];
+  EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fds), 0);
 
-  // Listen for incoming connect
-  ASSERT_NE(listen(lfd, 5), -1);
+  // "Client" socket
+  int cfd = fds[0];
+  ASSERT_NE(cfd, -1);
 
-  // Connect to the listening socket
-  ASSERT_NE(fsp::connect(csd, (struct sockaddr*)&addr, sizeof(addr)), -1)
-      << "Connect request failed: " << errno;
-
-  // Accept the connection
-  int sfd = accept(lfd, nullptr, nullptr);
+  // "Server" socket
+  int sfd = fds[1];
   ASSERT_NE(sfd, -1);
+  SCOPE_EXIT { close(sfd); };
 
   // Instantiate AsyncSocket object for the connected socket
   EventBase evb;
-  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, csd);
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, cfd);
 
   // Open a temporary file and write a magic string to it
   // We'll transfer the file handle to test the message parameters
   // callback logic.
-  int tmpfd = open("/var/tmp", O_RDWR | O_TMPFILE);
+  TemporaryFile file(StringPiece(),
+                     fs::path(),
+                     TemporaryFile::Scope::UNLINK_IMMEDIATELY);
+  int tmpfd = file.fd();
   ASSERT_NE(tmpfd, -1) << "Failed to open a temporary file";
   std::string magicString("Magic string");
   ASSERT_EQ(write(tmpfd, magicString.c_str(), magicString.length()),
@@ -3178,6 +3282,7 @@ TEST(AsyncSocketTest, SendMessageAncillaryData) {
   int fd = 0;
   memcpy(&fd, CMSG_DATA(&r_u.cmh), sizeof(int));
   ASSERT_NE(fd, 0);
+  SCOPE_EXIT { close(fd); };
 
   std::vector<uint8_t> transferredMagicString(magicString.length() + 1, 0);
 
@@ -3193,3 +3298,73 @@ TEST(AsyncSocketTest, SendMessageAncillaryData) {
       magicString.end(),
       transferredMagicString.begin()));
 }
+
+TEST(AsyncSocketTest, UnixDomainSocketErrMessageCB) {
+  // In the latest stable kernel 4.14.3 as of 2017-12-04, Unix Domain
+  // Socket (UDS) does not support MSG_ERRQUEUE. So
+  // recvmsg(MSG_ERRQUEUE) will read application data from UDS which
+  // breaks application message flow.  To avoid this problem,
+  // AsyncSocket currently disables setErrMessageCB for UDS.
+  //
+  // This tests two things for UDS
+  // 1. setErrMessageCB fails
+  // 2. recvmsg(MSG_ERRQUEUE) reads application data
+  //
+  // Feel free to remove this test if UDS supports MSG_ERRQUEUE in the future.
+
+  int fd[2];
+  EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fd), 0);
+  ASSERT_NE(fd[0], -1);
+  ASSERT_NE(fd[1], -1);
+  SCOPE_EXIT {
+    close(fd[1]);
+  };
+
+  EXPECT_EQ(fcntl(fd[0], F_SETFL, O_NONBLOCK), 0);
+  EXPECT_EQ(fcntl(fd[1], F_SETFL, O_NONBLOCK), 0);
+
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, fd[0]);
+
+  // setErrMessageCB should fail for unix domain socket
+  TestErrMessageCallback errMsgCB;
+  ASSERT_NE(&errMsgCB, nullptr);
+  socket->setErrMessageCB(&errMsgCB);
+  ASSERT_EQ(socket->getErrMessageCallback(), nullptr);
+
+#ifdef FOLLY_HAVE_MSG_ERRQUEUE
+  // The following verifies that MSG_ERRQUEUE does not work for UDS,
+  // and recvmsg reads application data
+  union {
+    // Space large enough to hold an 'int'
+    char control[CMSG_SPACE(sizeof(int))];
+    struct cmsghdr cmh;
+  } r_u;
+  struct msghdr msgh;
+  struct iovec iov;
+  int recv_data = 0;
+
+  msgh.msg_control = r_u.control;
+  msgh.msg_controllen = sizeof(r_u.control);
+  msgh.msg_name = nullptr;
+  msgh.msg_namelen = 0;
+  msgh.msg_iov = &iov;
+  msgh.msg_iovlen = 1;
+  iov.iov_base = &recv_data;
+  iov.iov_len = sizeof(recv_data);
+
+  // there is no data, recvmsg should fail
+  EXPECT_EQ(recvmsg(fd[1], &msgh, MSG_ERRQUEUE), -1);
+  EXPECT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK);
+
+  // provide some application data, error queue should be empty if it exists
+  // However, UDS reads application data as error message
+  int test_data = 123456;
+  WriteCallback wcb;
+  socket->write(&wcb, &test_data, sizeof(test_data));
+  recv_data = 0;
+  ASSERT_NE(recvmsg(fd[1], &msgh, MSG_ERRQUEUE), -1);
+  ASSERT_EQ(recv_data, test_data);
+#endif // FOLLY_HAVE_MSG_ERRQUEUE
+}
+#endif