X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fio%2Fasync%2Ftest%2FAsyncSocketTest2.cpp;h=ec67d2f21c17cb0e0f55e4801b57f42c5e7be90a;hp=a1988722ef2e41d66231c9483684abcc72fe9008;hb=bda67fde120837b77ddab74f23abcb22ae5b3029;hpb=8ecc23d91b78319b9d65747c4a689b6473c52ce1 diff --git a/folly/io/async/test/AsyncSocketTest2.cpp b/folly/io/async/test/AsyncSocketTest2.cpp index a1988722..ec67d2f2 100644 --- a/folly/io/async/test/AsyncSocketTest2.cpp +++ b/folly/io/async/test/AsyncSocketTest2.cpp @@ -1096,6 +1096,44 @@ TEST(AsyncSocketTest, WritePipeError) { ASSERT_FALSE(socket->isClosedByPeer()); } +/** + * Test writing to a socket that has its read side closed + */ +TEST(AsyncSocketTest, WriteAfterReadEOF) { + TestServer server; + + // connect() + EventBase evb; + std::shared_ptr socket = + AsyncSocket::newSocket(&evb, server.getAddress(), 30); + evb.loop(); // loop until the socket is connected + + // Accept the connection + std::shared_ptr acceptedSocket = server.acceptAsync(&evb); + ReadCallback rcb; + acceptedSocket->setReadCB(&rcb); + + // Shutdown the write side of client socket (read side of server socket) + socket->shutdownWrite(); + evb.loop(); + + // Check that accepted socket is still writable + ASSERT_FALSE(acceptedSocket->good()); + ASSERT_TRUE(acceptedSocket->writable()); + + // Write data to accepted socket + constexpr size_t simpleBufLength = 5; + char simpleBuf[simpleBufLength]; + memset(simpleBuf, 'a', simpleBufLength); + WriteCallback wcb; + acceptedSocket->write(&wcb, simpleBuf, simpleBufLength); + evb.loop(); + + // Make sure we were able to write even after getting a read EOF + ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF + ASSERT_EQ(wcb.state, STATE_SUCCEEDED); +} + /** * Test that bytes written is correctly computed in case of write failure */ @@ -2806,7 +2844,7 @@ class MockEvbChangeCallback : public AsyncSocket::EvbChangeCallback { }; TEST(AsyncSocketTest, EvbCallbacks) { - auto cb = folly::make_unique(); + auto cb = std::make_unique(); EventBase evb; std::shared_ptr socket = AsyncSocket::newSocket(&evb); @@ -2819,7 +2857,40 @@ TEST(AsyncSocketTest, EvbCallbacks) { socket->attachEventBase(&evb); } -#ifdef MSG_ERRQUEUE +TEST(AsyncSocketTest, TestEvbDetachWtRegisteredIOHandlers) { + // Start listening on a local port + TestServer server; + + // Connect using a AsyncSocket + EventBase evb; + std::shared_ptr socket = AsyncSocket::newSocket(&evb); + ConnCallback cb; + socket->connect(&cb, server.getAddress(), 30); + + evb.loop(); + + ASSERT_EQ(cb.state, STATE_SUCCEEDED); + EXPECT_LE(0, socket->getConnectTime().count()); + EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30)); + + // After the ioHandlers are registered, still should be able to detach/attach + ReadCallback rcb; + socket->setReadCB(&rcb); + + auto cbEvbChg = std::make_unique(); + InSequence seq; + EXPECT_CALL(*cbEvbChg, evbDetached(socket.get())).Times(1); + EXPECT_CALL(*cbEvbChg, evbAttached(socket.get())).Times(1); + + socket->setEvbChangedCallback(std::move(cbEvbChg)); + EXPECT_TRUE(socket->isDetachable()); + socket->detachEventBase(); + socket->attachEventBase(&evb); + + socket->close(); +} + +#ifdef FOLLY_HAVE_MSG_ERRQUEUE /* copied from include/uapi/linux/net_tstamp.h */ /* SO_TIMESTAMPING gets an integer bit field comprised of these values */ enum SOF_TIMESTAMPING { @@ -2829,6 +2900,43 @@ enum SOF_TIMESTAMPING { SOF_TIMESTAMPING_OPT_CMSG = (1 << 10), SOF_TIMESTAMPING_OPT_TSONLY = (1 << 11), }; + +class TestErrMessageCallback : public folly::AsyncSocket::ErrMessageCallback { + public: + TestErrMessageCallback() + : exception_(folly::AsyncSocketException::UNKNOWN, "none") {} + + void errMessage(const cmsghdr& cmsg) noexcept override { + if (cmsg.cmsg_level == SOL_SOCKET && cmsg.cmsg_type == SCM_TIMESTAMPING) { + gotTimestamp_++; + checkResetCallback(); + } else if ( + (cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) || + (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR)) { + gotByteSeq_++; + checkResetCallback(); + } + } + + void errMessageError( + const folly::AsyncSocketException& ex) noexcept override { + exception_ = ex; + } + + void checkResetCallback() noexcept { + if (socket_ != nullptr && resetAfter_ != -1 && + gotTimestamp_ + gotByteSeq_ == resetAfter_) { + socket_->setErrMessageCB(nullptr); + } + } + + folly::AsyncSocket* socket_{nullptr}; + folly::AsyncSocketException exception_; + int gotTimestamp_{0}; + int gotByteSeq_{0}; + int resetAfter_{-1}; +}; + TEST(AsyncSocketTest, ErrMessageCallback) { TestServer server; @@ -2857,6 +2965,9 @@ TEST(AsyncSocketTest, ErrMessageCallback) { ASSERT_EQ(socket->getErrMessageCallback(), static_cast(&errMsgCB)); + errMsgCB.socket_ = socket.get(); + errMsgCB.resetAfter_ = 3; + // Enable timestamp notifications ASSERT_GT(socket->getFd(), 0); int flags = SOF_TIMESTAMPING_OPT_ID @@ -2870,7 +2981,9 @@ TEST(AsyncSocketTest, ErrMessageCallback) { // write() std::vector wbuf(128, 'a'); WriteCallback wcb; - socket->write(&wcb, wbuf.data(), wbuf.size()); + // Send two packets to get two EOM notifications + socket->write(&wcb, wbuf.data(), wbuf.size() / 2); + socket->write(&wcb, wbuf.data() + wbuf.size() / 2, wbuf.size() / 2); // Accept the connection. std::shared_ptr acceptedSocket = server.accept(); @@ -2895,10 +3008,12 @@ TEST(AsyncSocketTest, ErrMessageCallback) { // Check for the timestamp notifications. ASSERT_EQ(errMsgCB.exception_.type_, folly::AsyncSocketException::UNKNOWN); - ASSERT_TRUE(errMsgCB.gotByteSeq_); - ASSERT_TRUE(errMsgCB.gotTimestamp_); + ASSERT_GT(errMsgCB.gotByteSeq_, 0); + ASSERT_GT(errMsgCB.gotTimestamp_, 0); + ASSERT_EQ( + errMsgCB.gotByteSeq_ + errMsgCB.gotTimestamp_, errMsgCB.resetAfter_); } -#endif // MSG_ERRQUEUE +#endif // FOLLY_HAVE_MSG_ERRQUEUE TEST(AsyncSocket, PreReceivedData) { TestServer server; @@ -3030,6 +3145,7 @@ TEST(AsyncSocket, PreReceivedDataTakeover) { evb.loop(); } +#ifdef MSG_NOSIGNAL TEST(AsyncSocketTest, SendMessageFlags) { TestServer server; TestSendMsgParamsCallback sendMsgCB( @@ -3085,41 +3201,29 @@ TEST(AsyncSocketTest, SendMessageFlags) { } TEST(AsyncSocketTest, SendMessageAncillaryData) { - struct sockaddr_un addr = {AF_UNIX, - "AsyncSocketTest.SendMessageAncillaryData\0"}; - - // Clean up the name in the name space we're going to use - ASSERT_FALSE(remove(addr.sun_path) == -1 && errno != ENOENT); - - // Set up listening socket - int lfd = fsp::socket(AF_UNIX, SOCK_STREAM, 0); - ASSERT_NE(lfd, -1); - ASSERT_NE(bind(lfd, (struct sockaddr*)&addr, sizeof(addr)), -1) - << "Bind failed: " << errno; - - // Create the connecting socket - int csd = fsp::socket(AF_UNIX, SOCK_STREAM, 0); - ASSERT_NE(csd, -1); + int fds[2]; + EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fds), 0); - // Listen for incoming connect - ASSERT_NE(listen(lfd, 5), -1); + // "Client" socket + int cfd = fds[0]; + ASSERT_NE(cfd, -1); - // Connect to the listening socket - ASSERT_NE(fsp::connect(csd, (struct sockaddr*)&addr, sizeof(addr)), -1) - << "Connect request failed: " << errno; - - // Accept the connection - int sfd = accept(lfd, nullptr, nullptr); + // "Server" socket + int sfd = fds[1]; ASSERT_NE(sfd, -1); + SCOPE_EXIT { close(sfd); }; // Instantiate AsyncSocket object for the connected socket EventBase evb; - std::shared_ptr socket = AsyncSocket::newSocket(&evb, csd); + std::shared_ptr socket = AsyncSocket::newSocket(&evb, cfd); // Open a temporary file and write a magic string to it // We'll transfer the file handle to test the message parameters // callback logic. - int tmpfd = open("/var/tmp", O_RDWR | O_TMPFILE); + TemporaryFile file(StringPiece(), + fs::path(), + TemporaryFile::Scope::UNLINK_IMMEDIATELY); + int tmpfd = file.fd(); ASSERT_NE(tmpfd, -1) << "Failed to open a temporary file"; std::string magicString("Magic string"); ASSERT_EQ(write(tmpfd, magicString.c_str(), magicString.length()), @@ -3178,6 +3282,7 @@ TEST(AsyncSocketTest, SendMessageAncillaryData) { int fd = 0; memcpy(&fd, CMSG_DATA(&r_u.cmh), sizeof(int)); ASSERT_NE(fd, 0); + SCOPE_EXIT { close(fd); }; std::vector transferredMagicString(magicString.length() + 1, 0); @@ -3193,3 +3298,73 @@ TEST(AsyncSocketTest, SendMessageAncillaryData) { magicString.end(), transferredMagicString.begin())); } + +TEST(AsyncSocketTest, UnixDomainSocketErrMessageCB) { + // In the latest stable kernel 4.14.3 as of 2017-12-04, Unix Domain + // Socket (UDS) does not support MSG_ERRQUEUE. So + // recvmsg(MSG_ERRQUEUE) will read application data from UDS which + // breaks application message flow. To avoid this problem, + // AsyncSocket currently disables setErrMessageCB for UDS. + // + // This tests two things for UDS + // 1. setErrMessageCB fails + // 2. recvmsg(MSG_ERRQUEUE) reads application data + // + // Feel free to remove this test if UDS supports MSG_ERRQUEUE in the future. + + int fd[2]; + EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fd), 0); + ASSERT_NE(fd[0], -1); + ASSERT_NE(fd[1], -1); + SCOPE_EXIT { + close(fd[1]); + }; + + EXPECT_EQ(fcntl(fd[0], F_SETFL, O_NONBLOCK), 0); + EXPECT_EQ(fcntl(fd[1], F_SETFL, O_NONBLOCK), 0); + + EventBase evb; + std::shared_ptr socket = AsyncSocket::newSocket(&evb, fd[0]); + + // setErrMessageCB should fail for unix domain socket + TestErrMessageCallback errMsgCB; + ASSERT_NE(&errMsgCB, nullptr); + socket->setErrMessageCB(&errMsgCB); + ASSERT_EQ(socket->getErrMessageCallback(), nullptr); + +#ifdef FOLLY_HAVE_MSG_ERRQUEUE + // The following verifies that MSG_ERRQUEUE does not work for UDS, + // and recvmsg reads application data + union { + // Space large enough to hold an 'int' + char control[CMSG_SPACE(sizeof(int))]; + struct cmsghdr cmh; + } r_u; + struct msghdr msgh; + struct iovec iov; + int recv_data = 0; + + msgh.msg_control = r_u.control; + msgh.msg_controllen = sizeof(r_u.control); + msgh.msg_name = nullptr; + msgh.msg_namelen = 0; + msgh.msg_iov = &iov; + msgh.msg_iovlen = 1; + iov.iov_base = &recv_data; + iov.iov_len = sizeof(recv_data); + + // there is no data, recvmsg should fail + EXPECT_EQ(recvmsg(fd[1], &msgh, MSG_ERRQUEUE), -1); + EXPECT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK); + + // provide some application data, error queue should be empty if it exists + // However, UDS reads application data as error message + int test_data = 123456; + WriteCallback wcb; + socket->write(&wcb, &test_data, sizeof(test_data)); + recv_data = 0; + ASSERT_NE(recvmsg(fd[1], &msgh, MSG_ERRQUEUE), -1); + ASSERT_EQ(recv_data, test_data); +#endif // FOLLY_HAVE_MSG_ERRQUEUE +} +#endif