ASSERT_FALSE(socket->isClosedByPeer());
}
+/**
+ * Test writing to a socket that has its read side closed
+ */
+TEST(AsyncSocketTest, WriteAfterReadEOF) {
+ TestServer server;
+
+ // connect()
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket =
+ AsyncSocket::newSocket(&evb, server.getAddress(), 30);
+ evb.loop(); // loop until the socket is connected
+
+ // Accept the connection
+ std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
+ ReadCallback rcb;
+ acceptedSocket->setReadCB(&rcb);
+
+ // Shutdown the write side of client socket (read side of server socket)
+ socket->shutdownWrite();
+ evb.loop();
+
+ // Check that accepted socket is still writable
+ ASSERT_FALSE(acceptedSocket->good());
+ ASSERT_TRUE(acceptedSocket->writable());
+
+ // Write data to accepted socket
+ constexpr size_t simpleBufLength = 5;
+ char simpleBuf[simpleBufLength];
+ memset(simpleBuf, 'a', simpleBufLength);
+ WriteCallback wcb;
+ acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
+ evb.loop();
+
+ // Make sure we were able to write even after getting a read EOF
+ ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+}
+
/**
* Test that bytes written is correctly computed in case of write failure
*/
};
TEST(AsyncSocketTest, EvbCallbacks) {
- auto cb = folly::make_unique<MockEvbChangeCallback>();
+ auto cb = std::make_unique<MockEvbChangeCallback>();
EventBase evb;
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
socket->attachEventBase(&evb);
}
-#ifdef MSG_ERRQUEUE
+TEST(AsyncSocketTest, TestEvbDetachWtRegisteredIOHandlers) {
+ // Start listening on a local port
+ TestServer server;
+
+ // Connect using a AsyncSocket
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ ConnCallback cb;
+ socket->connect(&cb, server.getAddress(), 30);
+
+ evb.loop();
+
+ ASSERT_EQ(cb.state, STATE_SUCCEEDED);
+ EXPECT_LE(0, socket->getConnectTime().count());
+ EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
+
+ // After the ioHandlers are registered, still should be able to detach/attach
+ ReadCallback rcb;
+ socket->setReadCB(&rcb);
+
+ auto cbEvbChg = std::make_unique<MockEvbChangeCallback>();
+ InSequence seq;
+ EXPECT_CALL(*cbEvbChg, evbDetached(socket.get())).Times(1);
+ EXPECT_CALL(*cbEvbChg, evbAttached(socket.get())).Times(1);
+
+ socket->setEvbChangedCallback(std::move(cbEvbChg));
+ EXPECT_TRUE(socket->isDetachable());
+ socket->detachEventBase();
+ socket->attachEventBase(&evb);
+
+ socket->close();
+}
+
+#ifdef FOLLY_HAVE_MSG_ERRQUEUE
/* copied from include/uapi/linux/net_tstamp.h */
/* SO_TIMESTAMPING gets an integer bit field comprised of these values */
enum SOF_TIMESTAMPING {
SOF_TIMESTAMPING_OPT_CMSG = (1 << 10),
SOF_TIMESTAMPING_OPT_TSONLY = (1 << 11),
};
+
+class TestErrMessageCallback : public folly::AsyncSocket::ErrMessageCallback {
+ public:
+ TestErrMessageCallback()
+ : exception_(folly::AsyncSocketException::UNKNOWN, "none") {}
+
+ void errMessage(const cmsghdr& cmsg) noexcept override {
+ if (cmsg.cmsg_level == SOL_SOCKET && cmsg.cmsg_type == SCM_TIMESTAMPING) {
+ gotTimestamp_++;
+ checkResetCallback();
+ } else if (
+ (cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
+ (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR)) {
+ gotByteSeq_++;
+ checkResetCallback();
+ }
+ }
+
+ void errMessageError(
+ const folly::AsyncSocketException& ex) noexcept override {
+ exception_ = ex;
+ }
+
+ void checkResetCallback() noexcept {
+ if (socket_ != nullptr && resetAfter_ != -1 &&
+ gotTimestamp_ + gotByteSeq_ == resetAfter_) {
+ socket_->setErrMessageCB(nullptr);
+ }
+ }
+
+ folly::AsyncSocket* socket_{nullptr};
+ folly::AsyncSocketException exception_;
+ int gotTimestamp_{0};
+ int gotByteSeq_{0};
+ int resetAfter_{-1};
+};
+
TEST(AsyncSocketTest, ErrMessageCallback) {
TestServer server;
ASSERT_EQ(socket->getErrMessageCallback(),
static_cast<folly::AsyncSocket::ErrMessageCallback*>(&errMsgCB));
+ errMsgCB.socket_ = socket.get();
+ errMsgCB.resetAfter_ = 3;
+
// Enable timestamp notifications
ASSERT_GT(socket->getFd(), 0);
int flags = SOF_TIMESTAMPING_OPT_ID
// write()
std::vector<uint8_t> wbuf(128, 'a');
WriteCallback wcb;
- socket->write(&wcb, wbuf.data(), wbuf.size());
+ // Send two packets to get two EOM notifications
+ socket->write(&wcb, wbuf.data(), wbuf.size() / 2);
+ socket->write(&wcb, wbuf.data() + wbuf.size() / 2, wbuf.size() / 2);
// Accept the connection.
std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
// Check for the timestamp notifications.
ASSERT_EQ(errMsgCB.exception_.type_, folly::AsyncSocketException::UNKNOWN);
- ASSERT_TRUE(errMsgCB.gotByteSeq_);
- ASSERT_TRUE(errMsgCB.gotTimestamp_);
+ ASSERT_GT(errMsgCB.gotByteSeq_, 0);
+ ASSERT_GT(errMsgCB.gotTimestamp_, 0);
+ ASSERT_EQ(
+ errMsgCB.gotByteSeq_ + errMsgCB.gotTimestamp_, errMsgCB.resetAfter_);
}
-#endif // MSG_ERRQUEUE
+#endif // FOLLY_HAVE_MSG_ERRQUEUE
TEST(AsyncSocket, PreReceivedData) {
TestServer server;
evb.loop();
}
+#ifdef MSG_NOSIGNAL
TEST(AsyncSocketTest, SendMessageFlags) {
TestServer server;
TestSendMsgParamsCallback sendMsgCB(
}
TEST(AsyncSocketTest, SendMessageAncillaryData) {
- struct sockaddr_un addr = {AF_UNIX,
- "AsyncSocketTest.SendMessageAncillaryData\0"};
-
- // Clean up the name in the name space we're going to use
- ASSERT_FALSE(remove(addr.sun_path) == -1 && errno != ENOENT);
-
- // Set up listening socket
- int lfd = fsp::socket(AF_UNIX, SOCK_STREAM, 0);
- ASSERT_NE(lfd, -1);
- ASSERT_NE(bind(lfd, (struct sockaddr*)&addr, sizeof(addr)), -1)
- << "Bind failed: " << errno;
-
- // Create the connecting socket
- int csd = fsp::socket(AF_UNIX, SOCK_STREAM, 0);
- ASSERT_NE(csd, -1);
+ int fds[2];
+ EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fds), 0);
- // Listen for incoming connect
- ASSERT_NE(listen(lfd, 5), -1);
+ // "Client" socket
+ int cfd = fds[0];
+ ASSERT_NE(cfd, -1);
- // Connect to the listening socket
- ASSERT_NE(fsp::connect(csd, (struct sockaddr*)&addr, sizeof(addr)), -1)
- << "Connect request failed: " << errno;
-
- // Accept the connection
- int sfd = accept(lfd, nullptr, nullptr);
+ // "Server" socket
+ int sfd = fds[1];
ASSERT_NE(sfd, -1);
+ SCOPE_EXIT { close(sfd); };
// Instantiate AsyncSocket object for the connected socket
EventBase evb;
- std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, csd);
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, cfd);
// Open a temporary file and write a magic string to it
// We'll transfer the file handle to test the message parameters
// callback logic.
- int tmpfd = open("/var/tmp", O_RDWR | O_TMPFILE);
+ TemporaryFile file(StringPiece(),
+ fs::path(),
+ TemporaryFile::Scope::UNLINK_IMMEDIATELY);
+ int tmpfd = file.fd();
ASSERT_NE(tmpfd, -1) << "Failed to open a temporary file";
std::string magicString("Magic string");
ASSERT_EQ(write(tmpfd, magicString.c_str(), magicString.length()),
int fd = 0;
memcpy(&fd, CMSG_DATA(&r_u.cmh), sizeof(int));
ASSERT_NE(fd, 0);
+ SCOPE_EXIT { close(fd); };
std::vector<uint8_t> transferredMagicString(magicString.length() + 1, 0);
magicString.end(),
transferredMagicString.begin()));
}
+
+TEST(AsyncSocketTest, UnixDomainSocketErrMessageCB) {
+ // In the latest stable kernel 4.14.3 as of 2017-12-04, Unix Domain
+ // Socket (UDS) does not support MSG_ERRQUEUE. So
+ // recvmsg(MSG_ERRQUEUE) will read application data from UDS which
+ // breaks application message flow. To avoid this problem,
+ // AsyncSocket currently disables setErrMessageCB for UDS.
+ //
+ // This tests two things for UDS
+ // 1. setErrMessageCB fails
+ // 2. recvmsg(MSG_ERRQUEUE) reads application data
+ //
+ // Feel free to remove this test if UDS supports MSG_ERRQUEUE in the future.
+
+ int fd[2];
+ EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fd), 0);
+ ASSERT_NE(fd[0], -1);
+ ASSERT_NE(fd[1], -1);
+ SCOPE_EXIT {
+ close(fd[1]);
+ };
+
+ EXPECT_EQ(fcntl(fd[0], F_SETFL, O_NONBLOCK), 0);
+ EXPECT_EQ(fcntl(fd[1], F_SETFL, O_NONBLOCK), 0);
+
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, fd[0]);
+
+ // setErrMessageCB should fail for unix domain socket
+ TestErrMessageCallback errMsgCB;
+ ASSERT_NE(&errMsgCB, nullptr);
+ socket->setErrMessageCB(&errMsgCB);
+ ASSERT_EQ(socket->getErrMessageCallback(), nullptr);
+
+#ifdef FOLLY_HAVE_MSG_ERRQUEUE
+ // The following verifies that MSG_ERRQUEUE does not work for UDS,
+ // and recvmsg reads application data
+ union {
+ // Space large enough to hold an 'int'
+ char control[CMSG_SPACE(sizeof(int))];
+ struct cmsghdr cmh;
+ } r_u;
+ struct msghdr msgh;
+ struct iovec iov;
+ int recv_data = 0;
+
+ msgh.msg_control = r_u.control;
+ msgh.msg_controllen = sizeof(r_u.control);
+ msgh.msg_name = nullptr;
+ msgh.msg_namelen = 0;
+ msgh.msg_iov = &iov;
+ msgh.msg_iovlen = 1;
+ iov.iov_base = &recv_data;
+ iov.iov_len = sizeof(recv_data);
+
+ // there is no data, recvmsg should fail
+ EXPECT_EQ(recvmsg(fd[1], &msgh, MSG_ERRQUEUE), -1);
+ EXPECT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK);
+
+ // provide some application data, error queue should be empty if it exists
+ // However, UDS reads application data as error message
+ int test_data = 123456;
+ WriteCallback wcb;
+ socket->write(&wcb, &test_data, sizeof(test_data));
+ recv_data = 0;
+ ASSERT_NE(recvmsg(fd[1], &msgh, MSG_ERRQUEUE), -1);
+ ASSERT_EQ(recv_data, test_data);
+#endif // FOLLY_HAVE_MSG_ERRQUEUE
+}
+#endif