X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fio%2Fasync%2Ftest%2FEventHandlerTest.cpp;h=feb2f77414bd0c4aab1620b430033f04c5cb0691;hb=bda67fde120837b77ddab74f23abcb22ae5b3029;hp=9e0c6b9df428ae7988cbe421b4fb3780373ed4e1;hpb=4928fe780da82279573a672c10db48be6244da9f;p=folly.git diff --git a/folly/io/async/test/EventHandlerTest.cpp b/folly/io/async/test/EventHandlerTest.cpp index 9e0c6b9d..feb2f774 100644 --- a/folly/io/async/test/EventHandlerTest.cpp +++ b/folly/io/async/test/EventHandlerTest.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2014 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,16 +14,18 @@ * limitations under the License. */ -#include - -#include +#include +#include #include + #include #include #include - -#include -#include +#include +#include +#include +#include +#include using namespace std; using namespace folly; @@ -45,7 +47,7 @@ void runInThreadsAndWait(vector> cbs) { } class EventHandlerMock : public EventHandler { -public: + public: EventHandlerMock(EventBase* eb, int fd) : EventHandler(eb, fd) {} // gmock can't mock noexcept methods, so we need an intermediary MOCK_METHOD1(_handlerReady, void(uint16_t)); @@ -55,7 +57,7 @@ public: }; class EventHandlerTest : public Test { -public: + public: int efd = 0; void SetUp() override { @@ -89,13 +91,13 @@ TEST_F(EventHandlerTest, simple) { EventHandlerMock eh(&eb, efd); eh.registerHandler(EventHandler::READ | EventHandler::PERSIST); EXPECT_CALL(eh, _handlerReady(_)) - .Times(writes) - .WillRepeatedly(Invoke([&](uint16_t events) { - efd_read(); - if (--readsRemaining == 0) { - eh.unregisterHandler(); - } - })); + .Times(writes) + .WillRepeatedly(Invoke([&](uint16_t /* events */) { + efd_read(); + if (--readsRemaining == 0) { + eh.unregisterHandler(); + } + })); efd_write(writes); eb.loop(); @@ -108,28 +110,29 @@ TEST_F(EventHandlerTest, many_concurrent_producers) { size_t readsRemaining = writes; runInThreadsAndWait({ - [&] { - EventBase eb; - EventHandlerMock eh(&eb, efd); - eh.registerHandler(EventHandler::READ | EventHandler::PERSIST); - EXPECT_CALL(eh, _handlerReady(_)) - .Times(writes) - .WillRepeatedly(Invoke([&](uint16_t events) { - efd_read(); - if (--readsRemaining == 0) { - eh.unregisterHandler(); - } - })); - eb.loop(); - }, - [&] { - runInThreadsAndWait(nproducers, [&](size_t k) { - for (size_t i = 0; i < writes / nproducers; ++i) { - this_thread::sleep_for(chrono::milliseconds(1)); - efd_write(1); - } - }); - }, + [&] { + EventBase eb; + EventHandlerMock eh(&eb, efd); + eh.registerHandler(EventHandler::READ | EventHandler::PERSIST); + EXPECT_CALL(eh, _handlerReady(_)) + .Times(writes) + .WillRepeatedly(Invoke([&](uint16_t /* events */) { + efd_read(); + if (--readsRemaining == 0) { + eh.unregisterHandler(); + } + })); + eb.loop(); + }, + [&] { + runInThreadsAndWait(nproducers, + [&](size_t /* k */) { + for (size_t i = 0; i < writes / nproducers; ++i) { + this_thread::sleep_for(chrono::milliseconds(1)); + efd_write(1); + } + }); + }, }); EXPECT_EQ(0, readsRemaining); @@ -145,39 +148,267 @@ TEST_F(EventHandlerTest, many_concurrent_consumers) { MPMCQueue queue(writes / 10); runInThreadsAndWait({ - [&] { - runInThreadsAndWait(nconsumers, [&](size_t k) { - size_t thReadsRemaining = writes / nconsumers; - EventBase eb; - EventHandlerMock eh(&eb, efd); - eh.registerHandler(EventHandler::READ | EventHandler::PERSIST); - EXPECT_CALL(eh, _handlerReady(_)) - .WillRepeatedly(Invoke([&](uint16_t events) { - nullptr_t val; - if (!queue.readIfNotEmpty(val)) { - return; - } - efd_read(); - --readsRemaining; - if (--thReadsRemaining == 0) { - eh.unregisterHandler(); - } - })); - eb.loop(); - }); - }, - [&] { - runInThreadsAndWait(nproducers, [&](size_t k) { - for (size_t i = 0; i < writes / nproducers; ++i) { - this_thread::sleep_for(chrono::milliseconds(1)); - queue.blockingWrite(nullptr); - efd_write(1); - --writesRemaining; - } - }); - }, + [&] { + runInThreadsAndWait( + nconsumers, + [&](size_t /* k */) { + size_t thReadsRemaining = writes / nconsumers; + EventBase eb; + EventHandlerMock eh(&eb, efd); + eh.registerHandler(EventHandler::READ | EventHandler::PERSIST); + EXPECT_CALL(eh, _handlerReady(_)) + .WillRepeatedly(Invoke([&](uint16_t /* events */) { + nullptr_t val; + if (!queue.readIfNotEmpty(val)) { + return; + } + efd_read(); + --readsRemaining; + if (--thReadsRemaining == 0) { + eh.unregisterHandler(); + } + })); + eb.loop(); + }); + }, + [&] { + runInThreadsAndWait(nproducers, + [&](size_t /* k */) { + for (size_t i = 0; i < writes / nproducers; ++i) { + this_thread::sleep_for(chrono::milliseconds(1)); + queue.blockingWrite(nullptr); + efd_write(1); + --writesRemaining; + } + }); + }, }); EXPECT_EQ(0, writesRemaining); EXPECT_EQ(0, readsRemaining); } + +#ifdef EV_PRI +// +// See rfc6093 for extensive discussion on TCP URG semantics. Specificaly, +// it points out that URG mechanism was never intended to be used +// for out-of-band information delivery. However, pretty much every +// implementation interprets the LAST octect or urgent data as the +// OOB byte. +// +class EventHandlerOobTest : public ::testing::Test { + public: + // + // Wait for port number to connect to, then connect and invoke + // clientOps(fd) where fd is the connection file descriptor + // + void runClient(std::function clientOps) { + clientThread = std::thread( + [ serverPortFuture = serverReady.get_future(), clientOps ]() mutable { + int clientFd = socket(AF_INET, SOCK_STREAM, 0); + SCOPE_EXIT { + close(clientFd); + }; + struct hostent* he{nullptr}; + struct sockaddr_in server; + + std::array hostname = {"localhost"}; + he = gethostbyname(hostname.data()); + PCHECK(he); + + memcpy(&server.sin_addr, he->h_addr_list[0], he->h_length); + server.sin_family = AF_INET; + + // block here until port is known + server.sin_port = serverPortFuture.get(); + LOG(INFO) << "Server is ready"; + + PCHECK( + ::connect(clientFd, (struct sockaddr*)&server, sizeof(server)) == + 0); + LOG(INFO) << "Server connection available"; + + clientOps(clientFd); + }); + } + + // + // Bind, get port number, pass it to client, listen/accept and store the + // accepted fd + // + void acceptConn() { + // make the server. + int listenfd = socket(AF_INET, SOCK_STREAM, 0); + SCOPE_EXIT { + close(listenfd); + }; + PCHECK(listenfd != -1) << "unable to open socket"; + + struct sockaddr_in sin; + sin.sin_port = htons(0); + sin.sin_addr.s_addr = INADDR_ANY; + sin.sin_family = AF_INET; + + PCHECK(bind(listenfd, (struct sockaddr*)&sin, sizeof(sin)) >= 0) + << "Can't bind to port"; + listen(listenfd, 5); + + struct sockaddr_in findSockName; + socklen_t sz = sizeof(findSockName); + getsockname(listenfd, (struct sockaddr*)&findSockName, &sz); + serverReady.set_value(findSockName.sin_port); + + struct sockaddr_in cli_addr; + socklen_t clilen = sizeof(cli_addr); + serverFd = accept(listenfd, (struct sockaddr*)&cli_addr, &clilen); + PCHECK(serverFd >= 0) << "can't accept"; + } + + void SetUp() override {} + + void TearDown() override { + clientThread.join(); + close(serverFd); + } + + EventBase eb; + std::thread clientThread; + std::promise serverReady; + int serverFd{-1}; +}; + +// +// Test that sending OOB data is detected by event handler +// +TEST_F(EventHandlerOobTest, EPOLLPRI) { + auto clientOps = [](int fd) { + char buffer[] = "banana"; + int n = send(fd, buffer, strlen(buffer) + 1, MSG_OOB); + LOG(INFO) << "Client send finished"; + PCHECK(n > 0); + }; + + runClient(clientOps); + acceptConn(); + + struct SockEvent : public EventHandler { + SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), fd_(fd) {} + + void handlerReady(uint16_t events) noexcept override { + EXPECT_TRUE(EventHandler::EventFlags::PRI & events); + std::array buffer; + int n = read(fd_, buffer.data(), buffer.size()); + // + // NB: we sent 7 bytes, but only received 6. The last byte + // has been stored in the OOB buffer. + // + EXPECT_EQ(6, n); + EXPECT_EQ("banana", std::string(buffer.data(), 6)); + // now read the byte stored in OOB buffer + n = recv(fd_, buffer.data(), buffer.size(), MSG_OOB); + EXPECT_EQ(1, n); + } + + private: + int fd_; + } sockHandler(&eb, serverFd); + + sockHandler.registerHandler(EventHandler::EventFlags::PRI); + LOG(INFO) << "Registered Handler"; + eb.loop(); +} + +// +// Test if we can send an OOB byte and then normal data +// +TEST_F(EventHandlerOobTest, OOB_AND_NORMAL_DATA) { + auto clientOps = [](int sockfd) { + { + // OOB buffer can only hold one byte in most implementations + std::array buffer = {"X"}; + int n = send(sockfd, buffer.data(), 1, MSG_OOB); + PCHECK(n > 0); + } + + { + std::array buffer = {"banana"}; + int n = send(sockfd, buffer.data(), buffer.size(), 0); + PCHECK(n > 0); + } + }; + + runClient(clientOps); + acceptConn(); + + struct SockEvent : public EventHandler { + SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), eb_(eb), fd_(fd) {} + + void handlerReady(uint16_t events) noexcept override { + std::array buffer; + if (events & EventHandler::EventFlags::PRI) { + int n = recv(fd_, buffer.data(), buffer.size(), MSG_OOB); + EXPECT_EQ(1, n); + EXPECT_EQ("X", std::string(buffer.data(), 1)); + registerHandler(EventHandler::EventFlags::READ); + return; + } + + if (events & EventHandler::EventFlags::READ) { + int n = recv(fd_, buffer.data(), buffer.size(), 0); + EXPECT_EQ(7, n); + EXPECT_EQ("banana", std::string(buffer.data())); + eb_->terminateLoopSoon(); + return; + } + } + + private: + EventBase* eb_; + int fd_; + } sockHandler(&eb, serverFd); + sockHandler.registerHandler( + EventHandler::EventFlags::PRI | EventHandler::EventFlags::READ); + LOG(INFO) << "Registered Handler"; + eb.loopForever(); +} + +// +// Demonstrate that "regular" reads ignore the OOB byte sent to us +// +TEST_F(EventHandlerOobTest, SWALLOW_OOB) { + auto clientOps = [](int sockfd) { + { + std::array buffer = {"X"}; + int n = send(sockfd, buffer.data(), 1, MSG_OOB); + PCHECK(n > 0); + } + + { + std::array buffer = {"banana"}; + int n = send(sockfd, buffer.data(), buffer.size(), 0); + PCHECK(n > 0); + } + }; + + runClient(clientOps); + acceptConn(); + + struct SockEvent : public EventHandler { + SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), fd_(fd) {} + + void handlerReady(uint16_t events) noexcept override { + std::array buffer; + ASSERT_TRUE(events & EventHandler::EventFlags::READ); + int n = recv(fd_, buffer.data(), buffer.size(), 0); + EXPECT_EQ(7, n); + EXPECT_EQ("banana", std::string(buffer.data())); + } + + private: + int fd_; + } sockHandler(&eb, serverFd); + sockHandler.registerHandler(EventHandler::EventFlags::READ); + LOG(INFO) << "Registered Handler"; + eb.loop(); +} +#endif