Adds writer test case for RCU
[folly.git] / folly / io / async / test / EventHandlerTest.cpp
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <bitset>
18 #include <future>
19 #include <thread>
20
21 #include <folly/MPMCQueue.h>
22 #include <folly/ScopeGuard.h>
23 #include <folly/io/async/EventBase.h>
24 #include <folly/io/async/EventHandler.h>
25 #include <folly/portability/GMock.h>
26 #include <folly/portability/GTest.h>
27 #include <folly/portability/Sockets.h>
28 #include <sys/eventfd.h>
29
30 using namespace std;
31 using namespace folly;
32 using namespace testing;
33
34 void runInThreadsAndWait(
35     size_t nthreads, function<void(size_t)> cb) {
36   vector<thread> threads(nthreads);
37   for (size_t i = 0; i < nthreads; ++i) {
38     threads[i] = thread(cb, i);
39   }
40   for (size_t i = 0; i < nthreads; ++i) {
41     threads[i].join();
42   }
43 }
44
45 void runInThreadsAndWait(vector<function<void()>> cbs) {
46   runInThreadsAndWait(cbs.size(), [&](size_t k) { cbs[k](); });
47 }
48
49 class EventHandlerMock : public EventHandler {
50  public:
51   EventHandlerMock(EventBase* eb, int fd) : EventHandler(eb, fd) {}
52   // gmock can't mock noexcept methods, so we need an intermediary
53   MOCK_METHOD1(_handlerReady, void(uint16_t));
54   void handlerReady(uint16_t events) noexcept override {
55     _handlerReady(events);
56   }
57 };
58
59 class EventHandlerTest : public Test {
60  public:
61   int efd = 0;
62
63   void SetUp() override {
64     efd = eventfd(0, EFD_SEMAPHORE);
65     ASSERT_THAT(efd, Gt(0));
66   }
67
68   void TearDown() override {
69     if (efd > 0) {
70       close(efd);
71     }
72     efd = 0;
73   }
74
75   void efd_write(uint64_t val) {
76     write(efd, &val, sizeof(val));
77   }
78
79   uint64_t efd_read() {
80     uint64_t val = 0;
81     read(efd, &val, sizeof(val));
82     return val;
83   }
84 };
85
86 TEST_F(EventHandlerTest, simple) {
87   const size_t writes = 4;
88   size_t readsRemaining = writes;
89
90   EventBase eb;
91   EventHandlerMock eh(&eb, efd);
92   eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
93   EXPECT_CALL(eh, _handlerReady(_))
94       .Times(writes)
95       .WillRepeatedly(Invoke([&](uint16_t /* events */) {
96         efd_read();
97         if (--readsRemaining == 0) {
98           eh.unregisterHandler();
99         }
100       }));
101   efd_write(writes);
102   eb.loop();
103
104   EXPECT_EQ(0, readsRemaining);
105 }
106
107 TEST_F(EventHandlerTest, many_concurrent_producers) {
108   const size_t writes = 200;
109   const size_t nproducers = 20;
110   size_t readsRemaining = writes;
111
112   runInThreadsAndWait({
113       [&] {
114         EventBase eb;
115         EventHandlerMock eh(&eb, efd);
116         eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
117         EXPECT_CALL(eh, _handlerReady(_))
118             .Times(writes)
119             .WillRepeatedly(Invoke([&](uint16_t /* events */) {
120               efd_read();
121               if (--readsRemaining == 0) {
122                 eh.unregisterHandler();
123               }
124             }));
125         eb.loop();
126       },
127       [&] {
128         runInThreadsAndWait(nproducers,
129                             [&](size_t /* k */) {
130                               for (size_t i = 0; i < writes / nproducers; ++i) {
131                                 this_thread::sleep_for(
132                                     std::chrono::milliseconds(1));
133                                 efd_write(1);
134                               }
135                             });
136       },
137   });
138
139   EXPECT_EQ(0, readsRemaining);
140 }
141
142 TEST_F(EventHandlerTest, many_concurrent_consumers) {
143   const size_t writes = 200;
144   const size_t nproducers = 8;
145   const size_t nconsumers = 20;
146   atomic<size_t> writesRemaining(writes);
147   atomic<size_t> readsRemaining(writes);
148
149   MPMCQueue<nullptr_t> queue(writes / 10);
150
151   runInThreadsAndWait({
152       [&] {
153         runInThreadsAndWait(
154             nconsumers,
155             [&](size_t /* k */) {
156               size_t thReadsRemaining = writes / nconsumers;
157               EventBase eb;
158               EventHandlerMock eh(&eb, efd);
159               eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
160               EXPECT_CALL(eh, _handlerReady(_))
161                   .WillRepeatedly(Invoke([&](uint16_t /* events */) {
162                     nullptr_t val;
163                     if (!queue.readIfNotEmpty(val)) {
164                       return;
165                     }
166                     efd_read();
167                     --readsRemaining;
168                     if (--thReadsRemaining == 0) {
169                       eh.unregisterHandler();
170                     }
171                   }));
172               eb.loop();
173             });
174       },
175       [&] {
176         runInThreadsAndWait(nproducers,
177                             [&](size_t /* k */) {
178                               for (size_t i = 0; i < writes / nproducers; ++i) {
179                                 this_thread::sleep_for(
180                                     std::chrono::milliseconds(1));
181                                 queue.blockingWrite(nullptr);
182                                 efd_write(1);
183                                 --writesRemaining;
184                               }
185                             });
186       },
187   });
188
189   EXPECT_EQ(0, writesRemaining);
190   EXPECT_EQ(0, readsRemaining);
191 }
192
193 #ifdef EV_PRI
194 //
195 // See rfc6093 for extensive discussion on TCP URG semantics. Specificaly,
196 // it points out that URG mechanism was never intended to be used
197 // for out-of-band information delivery. However, pretty much every
198 // implementation interprets the LAST octect or urgent data as the
199 // OOB byte.
200 //
201 class EventHandlerOobTest : public ::testing::Test {
202  public:
203   //
204   // Wait for port number to connect to, then connect and invoke
205   // clientOps(fd) where fd is the connection file descriptor
206   //
207   void runClient(std::function<void(int fd)> clientOps) {
208     clientThread = std::thread(
209         [ serverPortFuture = serverReady.get_future(), clientOps ]() mutable {
210           int clientFd = socket(AF_INET, SOCK_STREAM, 0);
211           SCOPE_EXIT {
212             close(clientFd);
213           };
214           struct hostent* he{nullptr};
215           struct sockaddr_in server;
216
217           std::array<const char, 10> hostname = {"localhost"};
218           he = gethostbyname(hostname.data());
219           PCHECK(he);
220
221           memcpy(&server.sin_addr, he->h_addr_list[0], he->h_length);
222           server.sin_family = AF_INET;
223
224           // block here until port is known
225           server.sin_port = serverPortFuture.get();
226           LOG(INFO) << "Server is ready";
227
228           PCHECK(
229               ::connect(clientFd, (struct sockaddr*)&server, sizeof(server)) ==
230               0);
231           LOG(INFO) << "Server connection available";
232
233           clientOps(clientFd);
234         });
235   }
236
237   //
238   // Bind, get port number, pass it to client, listen/accept and store the
239   // accepted fd
240   //
241   void acceptConn() {
242     // make the server.
243     int listenfd = socket(AF_INET, SOCK_STREAM, 0);
244     SCOPE_EXIT {
245       close(listenfd);
246     };
247     PCHECK(listenfd != -1) << "unable to open socket";
248
249     struct sockaddr_in sin;
250     sin.sin_port = htons(0);
251     sin.sin_addr.s_addr = INADDR_ANY;
252     sin.sin_family = AF_INET;
253
254     PCHECK(bind(listenfd, (struct sockaddr*)&sin, sizeof(sin)) >= 0)
255         << "Can't bind to port";
256     listen(listenfd, 5);
257
258     struct sockaddr_in findSockName;
259     socklen_t sz = sizeof(findSockName);
260     getsockname(listenfd, (struct sockaddr*)&findSockName, &sz);
261     serverReady.set_value(findSockName.sin_port);
262
263     struct sockaddr_in cli_addr;
264     socklen_t clilen = sizeof(cli_addr);
265     serverFd = accept(listenfd, (struct sockaddr*)&cli_addr, &clilen);
266     PCHECK(serverFd >= 0) << "can't accept";
267   }
268
269   void SetUp() override {}
270
271   void TearDown() override {
272     clientThread.join();
273     close(serverFd);
274   }
275
276   EventBase eb;
277   std::thread clientThread;
278   std::promise<decltype(sockaddr_in::sin_port)> serverReady;
279   int serverFd{-1};
280 };
281
282 //
283 // Test that sending OOB data is detected by event handler
284 //
285 TEST_F(EventHandlerOobTest, EPOLLPRI) {
286   auto clientOps = [](int fd) {
287     char buffer[] = "banana";
288     int n = send(fd, buffer, strlen(buffer) + 1, MSG_OOB);
289     LOG(INFO) << "Client send finished";
290     PCHECK(n > 0);
291   };
292
293   runClient(clientOps);
294   acceptConn();
295
296   struct SockEvent : public EventHandler {
297     SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), fd_(fd) {}
298
299     void handlerReady(uint16_t events) noexcept override {
300       EXPECT_TRUE(EventHandler::EventFlags::PRI & events);
301       std::array<char, 255> buffer;
302       int n = read(fd_, buffer.data(), buffer.size());
303       //
304       // NB: we sent 7 bytes, but only received 6. The last byte
305       // has been stored in the OOB buffer.
306       //
307       EXPECT_EQ(6, n);
308       EXPECT_EQ("banana", std::string(buffer.data(), 6));
309       // now read the byte stored in OOB buffer
310       n = recv(fd_, buffer.data(), buffer.size(), MSG_OOB);
311       EXPECT_EQ(1, n);
312     }
313
314    private:
315     int fd_;
316   } sockHandler(&eb, serverFd);
317
318   sockHandler.registerHandler(EventHandler::EventFlags::PRI);
319   LOG(INFO) << "Registered Handler";
320   eb.loop();
321 }
322
323 //
324 // Test if we can send an OOB byte and then normal data
325 //
326 TEST_F(EventHandlerOobTest, OOB_AND_NORMAL_DATA) {
327   auto clientOps = [](int sockfd) {
328     {
329       // OOB buffer can only hold one byte in most implementations
330       std::array<char, 2> buffer = {"X"};
331       int n = send(sockfd, buffer.data(), 1, MSG_OOB);
332       PCHECK(n > 0);
333     }
334
335     {
336       std::array<char, 7> buffer = {"banana"};
337       int n = send(sockfd, buffer.data(), buffer.size(), 0);
338       PCHECK(n > 0);
339     }
340   };
341
342   runClient(clientOps);
343   acceptConn();
344
345   struct SockEvent : public EventHandler {
346     SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), eb_(eb), fd_(fd) {}
347
348     void handlerReady(uint16_t events) noexcept override {
349       std::array<char, 255> buffer;
350       if (events & EventHandler::EventFlags::PRI) {
351         int n = recv(fd_, buffer.data(), buffer.size(), MSG_OOB);
352         EXPECT_EQ(1, n);
353         EXPECT_EQ("X", std::string(buffer.data(), 1));
354         registerHandler(EventHandler::EventFlags::READ);
355         return;
356       }
357
358       if (events & EventHandler::EventFlags::READ) {
359         int n = recv(fd_, buffer.data(), buffer.size(), 0);
360         EXPECT_EQ(7, n);
361         EXPECT_EQ("banana", std::string(buffer.data()));
362         eb_->terminateLoopSoon();
363         return;
364       }
365     }
366
367    private:
368     EventBase* eb_;
369     int fd_;
370   } sockHandler(&eb, serverFd);
371   sockHandler.registerHandler(
372       EventHandler::EventFlags::PRI | EventHandler::EventFlags::READ);
373   LOG(INFO) << "Registered Handler";
374   eb.loopForever();
375 }
376
377 //
378 // Demonstrate that "regular" reads ignore the OOB byte sent to us
379 //
380 TEST_F(EventHandlerOobTest, SWALLOW_OOB) {
381   auto clientOps = [](int sockfd) {
382     {
383       std::array<char, 2> buffer = {"X"};
384       int n = send(sockfd, buffer.data(), 1, MSG_OOB);
385       PCHECK(n > 0);
386     }
387
388     {
389       std::array<char, 7> buffer = {"banana"};
390       int n = send(sockfd, buffer.data(), buffer.size(), 0);
391       PCHECK(n > 0);
392     }
393   };
394
395   runClient(clientOps);
396   acceptConn();
397
398   struct SockEvent : public EventHandler {
399     SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), fd_(fd) {}
400
401     void handlerReady(uint16_t events) noexcept override {
402       std::array<char, 255> buffer;
403       ASSERT_TRUE(events & EventHandler::EventFlags::READ);
404       int n = recv(fd_, buffer.data(), buffer.size(), 0);
405       EXPECT_EQ(7, n);
406       EXPECT_EQ("banana", std::string(buffer.data()));
407     }
408
409    private:
410     int fd_;
411   } sockHandler(&eb, serverFd);
412   sockHandler.registerHandler(EventHandler::EventFlags::READ);
413   LOG(INFO) << "Registered Handler";
414   eb.loop();
415 }
416 #endif