2 * Copyright 2014-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
21 #include <folly/MPMCQueue.h>
22 #include <folly/ScopeGuard.h>
23 #include <folly/io/async/EventBase.h>
24 #include <folly/io/async/EventHandler.h>
25 #include <folly/portability/GMock.h>
26 #include <folly/portability/GTest.h>
27 #include <folly/portability/Sockets.h>
28 #include <sys/eventfd.h>
31 using namespace folly;
32 using namespace testing;
// Starts `nthreads` threads; thread i is constructed as thread(cb, i), so it
// runs cb(i). A second loop over the same index range follows the launch loop.
// NOTE(review): the body of that second loop is not visible in this listing;
// presumably it joins each thread (per the function name) — confirm.
34 void runInThreadsAndWait(
35 size_t nthreads, function<void(size_t)> cb) {
36 vector<thread> threads(nthreads);
37 for (size_t i = 0; i < nthreads; ++i) {
38 threads[i] = thread(cb, i);
40 for (size_t i = 0; i < nthreads; ++i) {
// Convenience overload: delegates to the (count, callback) overload with
// cbs.size() threads, where thread k invokes cbs[k](). The lambda captures
// `cbs` by reference, so the by-value parameter backs every invocation.
45 void runInThreadsAndWait(vector<function<void()>> cbs) {
46 runInThreadsAndWait(cbs.size(), [&](size_t k) { cbs[k](); });
// EventHandler subclass used by the tests to observe readiness callbacks.
// The constructor forwards the EventBase and descriptor to EventHandler;
// handlerReady() forwards its event mask to the gmock method _handlerReady()
// so that tests can attach expectations and actions to it.
49 class EventHandlerMock : public EventHandler {
51 EventHandlerMock(EventBase* eb, int fd) : EventHandler(eb, fd) {}
52 // gmock can't mock noexcept methods, so we need an intermediary
53 MOCK_METHOD1(_handlerReady, void(uint16_t));
54 void handlerReady(uint16_t events) noexcept override {
55 _handlerReady(events);
// Fixture for the eventfd-driven tests. SetUp() opens an eventfd in
// EFD_SEMAPHORE mode with an initial count of 0 and stores it in `efd`;
// efd_write() pushes a 64-bit value into it and the read helper pulls one
// 64-bit value back out.
59 class EventHandlerTest : public Test {
63 void SetUp() override {
64 efd = eventfd(0, EFD_SEMAPHORE);
// eventfd() reports failure with -1; descriptor 0 is legal, so only negative
// values are rejected.
65 ASSERT_THAT(efd, Ge(0));
68 void TearDown() override {
75 void efd_write(uint64_t val) {
// An eventfd transfer is always exactly 8 bytes; abort with errno context if
// the kernel reports anything else instead of silently dropping the event.
76 PCHECK(write(efd, &val, sizeof(val)) == static_cast<ssize_t>(sizeof(val)));
// Same check on the consuming side: a failed read would leave `val` unset.
81 PCHECK(read(efd, &val, sizeof(val)) == static_cast<ssize_t>(sizeof(val)));
// Single-threaded smoke test: a mock handler is registered on the fixture's
// eventfd for persistent READ events. Its action decrements `readsRemaining`
// (initialised to `writes`) and unregisters the handler once the counter hits
// zero; the test finally expects the counter to be zero.
86 TEST_F(EventHandlerTest, simple) {
87 const size_t writes = 4;
88 size_t readsRemaining = writes;
91 EventHandlerMock eh(&eb, efd);
// PERSIST keeps the registration active across callbacks, so the handler must
// unregister itself explicitly (below).
92 eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
93 EXPECT_CALL(eh, _handlerReady(_))
95 .WillRepeatedly(Invoke([&](uint16_t /* events */) {
97 if (--readsRemaining == 0) {
98 eh.unregisterHandler();
104 EXPECT_EQ(0, readsRemaining);
// One consuming handler, many producer threads. The handler side mirrors the
// `simple` test (persistent READ registration, count-down of `readsRemaining`,
// self-unregistration at zero). The producer side starts `nproducers` threads
// that each iterate writes / nproducers times, sleeping 1 ms per iteration.
// NOTE(review): the statement that actually signals the eventfd inside the
// producer loop is not visible in this listing.
107 TEST_F(EventHandlerTest, many_concurrent_producers) {
108 const size_t writes = 200;
109 const size_t nproducers = 20;
110 size_t readsRemaining = writes;
112 runInThreadsAndWait({
115 EventHandlerMock eh(&eb, efd);
116 eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
117 EXPECT_CALL(eh, _handlerReady(_))
119 .WillRepeatedly(Invoke([&](uint16_t /* events */) {
121 if (--readsRemaining == 0) {
122 eh.unregisterHandler();
128 runInThreadsAndWait(nproducers,
129 [&](size_t /* k */) {
// 200 writes split evenly: 10 iterations per producer thread.
130 for (size_t i = 0; i < writes / nproducers; ++i) {
131 this_thread::sleep_for(
132 std::chrono::milliseconds(1));
139 EXPECT_EQ(0, readsRemaining);
// Many consumers and many producers sharing the fixture's eventfd. Global
// progress is tracked in two atomics, both starting at `writes`. Each consumer
// callback builds its own EventHandlerMock on the shared descriptor and keeps
// a private quota `thReadsRemaining` of writes / nconsumers; it unregisters
// once that quota is used up. Producers sleep 1 ms per iteration and push a
// token into a bounded MPMCQueue, which the consumer handler polls with
// readIfNotEmpty().
// NOTE(review): the branch taken when the queue is empty, and the statements
// that touch the eventfd and the atomics, are not visible in this listing.
142 TEST_F(EventHandlerTest, many_concurrent_consumers) {
143 const size_t writes = 200;
144 const size_t nproducers = 8;
145 const size_t nconsumers = 20;
146 atomic<size_t> writesRemaining(writes);
147 atomic<size_t> readsRemaining(writes);
// Queue capacity is writes / 10, so blockingWrite() can stall producers until
// consumers drain it.
149 MPMCQueue<nullptr_t> queue(writes / 10);
151 runInThreadsAndWait({
155 [&](size_t /* k */) {
156 size_t thReadsRemaining = writes / nconsumers;
158 EventHandlerMock eh(&eb, efd);
159 eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
160 EXPECT_CALL(eh, _handlerReady(_))
161 .WillRepeatedly(Invoke([&](uint16_t /* events */) {
163 if (!queue.readIfNotEmpty(val)) {
168 if (--thReadsRemaining == 0) {
169 eh.unregisterHandler();
176 runInThreadsAndWait(nproducers,
177 [&](size_t /* k */) {
178 for (size_t i = 0; i < writes / nproducers; ++i) {
179 this_thread::sleep_for(
180 std::chrono::milliseconds(1));
181 queue.blockingWrite(nullptr);
// Both counters must have been driven to zero by the end of the test.
189 EXPECT_EQ(0, writesRemaining);
190 EXPECT_EQ(0, readsRemaining);
195 // See rfc6093 for extensive discussion on TCP URG semantics. Specifically,
196 // it points out that URG mechanism was never intended to be used
197 // for out-of-band information delivery. However, pretty much every
198 // implementation interprets the LAST octet of urgent data as the
// Fixture for the TCP out-of-band tests. The server side creates a listening
// socket bound to an ephemeral port, publishes that port through the
// `serverReady` promise, and accepts one connection into `serverFd`.
// runClient() starts `clientThread`, which resolves "localhost", waits on the
// promise's future for the port, and connects to it.
201 class EventHandlerOobTest : public ::testing::Test {
204 // Wait for port number to connect to, then connect and invoke
205 // clientOps(fd) where fd is the connection file descriptor
207 void runClient(std::function<void(int fd)> clientOps) {
208 clientThread = std::thread(
209 [ serverPortFuture = serverReady.get_future(), clientOps ]() mutable {
210 int clientFd = socket(AF_INET, SOCK_STREAM, 0);
214 struct hostent* he{nullptr};
// Zero the whole address (including padding) before filling selected fields.
215 struct sockaddr_in server{};
217 std::array<const char, 10> hostname = {"localhost"};
218 he = gethostbyname(hostname.data());
221 memcpy(&server.sin_addr, he->h_addr_list[0], he->h_length);
222 server.sin_family = AF_INET;
224 // block here until port is known
// The promised value is copied straight from getsockname()'s sin_port, so it
// is already in network byte order and is assigned without conversion.
225 server.sin_port = serverPortFuture.get();
226 LOG(INFO) << "Server is ready";
229 ::connect(clientFd, (struct sockaddr*)&server, sizeof(server)) ==
231 LOG(INFO) << "Server connection available";
238 // Bind, get port number, pass it to client, listen/accept and store the
243 int listenfd = socket(AF_INET, SOCK_STREAM, 0);
247 PCHECK(listenfd != -1) << "unable to open socket";
// Value-initialise so no indeterminate bytes are handed to bind().
249 struct sockaddr_in sin{};
// Port 0 asks the kernel to pick a free port.
250 sin.sin_port = htons(0);
251 sin.sin_addr.s_addr = INADDR_ANY;
252 sin.sin_family = AF_INET;
254 PCHECK(bind(listenfd, (struct sockaddr*)&sin, sizeof(sin)) >= 0)
255 << "Can't bind to port";
258 struct sockaddr_in findSockName{};
259 socklen_t sz = sizeof(findSockName);
// Refuse to publish a port if the kernel could not report the bound address.
260 PCHECK(getsockname(listenfd, (struct sockaddr*)&findSockName, &sz) == 0)
<< "Can't read back bound port";
261 serverReady.set_value(findSockName.sin_port);
263 struct sockaddr_in cli_addr;
264 socklen_t clilen = sizeof(cli_addr);
265 serverFd = accept(listenfd, (struct sockaddr*)&cli_addr, &clilen);
266 PCHECK(serverFd >= 0) << "can't accept";
269 void SetUp() override {}
271 void TearDown() override {
277 std::thread clientThread;
// Carries the bound port (typed exactly like sockaddr_in::sin_port) from the
// server side to the client thread.
278 std::promise<decltype(sockaddr_in::sin_port)> serverReady;
283 // Test that sending OOB data is detected by event handler
// The client sends "banana" plus its terminating NUL (strlen + 1 = 7 bytes)
// in a single send() flagged MSG_OOB. The server-side handler is registered
// for PRI only; when it fires it expects the PRI bit to be set, performs a
// plain read() and checks that the first six bytes are "banana", then issues
// a recv() with MSG_OOB for the remaining byte.
285 TEST_F(EventHandlerOobTest, EPOLLPRI) {
286 auto clientOps = [](int fd) {
287 char buffer[] = "banana";
288 int n = send(fd, buffer, strlen(buffer) + 1, MSG_OOB);
289 LOG(INFO) << "Client send finished";
293 runClient(clientOps);
296 struct SockEvent : public EventHandler {
297 SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), fd_(fd) {}
299 void handlerReady(uint16_t events) noexcept override {
300 EXPECT_TRUE(EventHandler::EventFlags::PRI & events);
301 std::array<char, 255> buffer;
302 int n = read(fd_, buffer.data(), buffer.size());
304 // NB: we sent 7 bytes, but only received 6. The last byte
305 // has been stored in the OOB buffer.
// Compare exactly six characters; the buffer is otherwise uninitialised.
308 EXPECT_EQ("banana", std::string(buffer.data(), 6));
309 // now read the byte stored in OOB buffer
310 n = recv(fd_, buffer.data(), buffer.size(), MSG_OOB);
316 } sockHandler(&eb, serverFd);
318 sockHandler.registerHandler(EventHandler::EventFlags::PRI);
319 LOG(INFO) << "Registered Handler";
324 // Test if we can send an OOB byte and then normal data
// The client first sends the single byte 'X' with MSG_OOB and then the seven
// bytes of "banana" (including the NUL) as ordinary data. The handler is
// registered for PRI | READ. On PRI it receives the urgent byte with MSG_OOB,
// checks it is "X" and re-registers for READ; on READ it receives the normal
// data, checks it is "banana" and asks the event base to stop looping.
326 TEST_F(EventHandlerOobTest, OOB_AND_NORMAL_DATA) {
327 auto clientOps = [](int sockfd) {
329 // OOB buffer can only hold one byte in most implementations
330 std::array<char, 2> buffer = {"X"};
331 int n = send(sockfd, buffer.data(), 1, MSG_OOB);
// buffer.size() is 7, so the terminating NUL travels with the payload.
336 std::array<char, 7> buffer = {"banana"};
337 int n = send(sockfd, buffer.data(), buffer.size(), 0);
342 runClient(clientOps);
345 struct SockEvent : public EventHandler {
346 SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), eb_(eb), fd_(fd) {}
348 void handlerReady(uint16_t events) noexcept override {
349 std::array<char, 255> buffer;
350 if (events & EventHandler::EventFlags::PRI) {
351 int n = recv(fd_, buffer.data(), buffer.size(), MSG_OOB);
353 EXPECT_EQ("X", std::string(buffer.data(), 1));
354 registerHandler(EventHandler::EventFlags::READ);
358 if (events & EventHandler::EventFlags::READ) {
359 int n = recv(fd_, buffer.data(), buffer.size(), 0);
// Relies on the received NUL terminator to end the std::string.
361 EXPECT_EQ("banana", std::string(buffer.data()));
362 eb_->terminateLoopSoon();
370 } sockHandler(&eb, serverFd);
371 sockHandler.registerHandler(
372 EventHandler::EventFlags::PRI | EventHandler::EventFlags::READ);
373 LOG(INFO) << "Registered Handler";
378 // Demonstrate that "regular" reads ignore the OOB byte sent to us
// Same client traffic as OOB_AND_NORMAL_DATA (one MSG_OOB byte 'X', then
// "banana" with its NUL as ordinary data), but the handler registers for READ
// only and uses a plain recv(); it expects to see just "banana".
380 TEST_F(EventHandlerOobTest, SWALLOW_OOB) {
381 auto clientOps = [](int sockfd) {
383 std::array<char, 2> buffer = {"X"};
384 int n = send(sockfd, buffer.data(), 1, MSG_OOB);
389 std::array<char, 7> buffer = {"banana"};
390 int n = send(sockfd, buffer.data(), buffer.size(), 0);
395 runClient(clientOps);
398 struct SockEvent : public EventHandler {
399 SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), fd_(fd) {}
401 void handlerReady(uint16_t events) noexcept override {
402 std::array<char, 255> buffer;
// ASSERT_* returns from this void callback on failure, skipping the recv().
403 ASSERT_TRUE(events & EventHandler::EventFlags::READ);
404 int n = recv(fd_, buffer.data(), buffer.size(), 0);
// Relies on the received NUL terminator to end the std::string.
406 EXPECT_EQ("banana", std::string(buffer.data()));
411 } sockHandler(&eb, serverFd);
412 sockHandler.registerHandler(EventHandler::EventFlags::READ);
413 LOG(INFO) << "Registered Handler";