Use the GTest portability headers
[folly.git] / folly / io / async / test / EventHandlerTest.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <bitset>
18 #include <future>
19 #include <thread>
20
21 #include <folly/MPMCQueue.h>
22 #include <folly/ScopeGuard.h>
23 #include <folly/io/async/EventBase.h>
24 #include <folly/io/async/EventHandler.h>
25 #include <folly/portability/GMock.h>
26 #include <folly/portability/GTest.h>
27 #include <folly/portability/Sockets.h>
28 #include <sys/eventfd.h>
29
30 using namespace std;
31 using namespace folly;
32 using namespace testing;
33
34 void runInThreadsAndWait(
35     size_t nthreads, function<void(size_t)> cb) {
36   vector<thread> threads(nthreads);
37   for (size_t i = 0; i < nthreads; ++i) {
38     threads[i] = thread(cb, i);
39   }
40   for (size_t i = 0; i < nthreads; ++i) {
41     threads[i].join();
42   }
43 }
44
45 void runInThreadsAndWait(vector<function<void()>> cbs) {
46   runInThreadsAndWait(cbs.size(), [&](size_t k) { cbs[k](); });
47 }
48
49 class EventHandlerMock : public EventHandler {
50 public:
51   EventHandlerMock(EventBase* eb, int fd) : EventHandler(eb, fd) {}
52   // gmock can't mock noexcept methods, so we need an intermediary
53   MOCK_METHOD1(_handlerReady, void(uint16_t));
54   void handlerReady(uint16_t events) noexcept override {
55     _handlerReady(events);
56   }
57 };
58
59 class EventHandlerTest : public Test {
60 public:
61   int efd = 0;
62
63   void SetUp() override {
64     efd = eventfd(0, EFD_SEMAPHORE);
65     ASSERT_THAT(efd, Gt(0));
66   }
67
68   void TearDown() override {
69     if (efd > 0) {
70       close(efd);
71     }
72     efd = 0;
73   }
74
75   void efd_write(uint64_t val) {
76     write(efd, &val, sizeof(val));
77   }
78
79   uint64_t efd_read() {
80     uint64_t val = 0;
81     read(efd, &val, sizeof(val));
82     return val;
83   }
84 };
85
86 TEST_F(EventHandlerTest, simple) {
87   const size_t writes = 4;
88   size_t readsRemaining = writes;
89
90   EventBase eb;
91   EventHandlerMock eh(&eb, efd);
92   eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
93   EXPECT_CALL(eh, _handlerReady(_))
94       .Times(writes)
95       .WillRepeatedly(Invoke([&](uint16_t /* events */) {
96         efd_read();
97         if (--readsRemaining == 0) {
98           eh.unregisterHandler();
99         }
100       }));
101   efd_write(writes);
102   eb.loop();
103
104   EXPECT_EQ(0, readsRemaining);
105 }
106
107 TEST_F(EventHandlerTest, many_concurrent_producers) {
108   const size_t writes = 200;
109   const size_t nproducers = 20;
110   size_t readsRemaining = writes;
111
112   runInThreadsAndWait({
113       [&] {
114         EventBase eb;
115         EventHandlerMock eh(&eb, efd);
116         eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
117         EXPECT_CALL(eh, _handlerReady(_))
118             .Times(writes)
119             .WillRepeatedly(Invoke([&](uint16_t /* events */) {
120               efd_read();
121               if (--readsRemaining == 0) {
122                 eh.unregisterHandler();
123               }
124             }));
125         eb.loop();
126       },
127       [&] {
128         runInThreadsAndWait(nproducers,
129                             [&](size_t /* k */) {
130                               for (size_t i = 0; i < writes / nproducers; ++i) {
131                                 this_thread::sleep_for(chrono::milliseconds(1));
132                                 efd_write(1);
133                               }
134                             });
135       },
136   });
137
138   EXPECT_EQ(0, readsRemaining);
139 }
140
141 TEST_F(EventHandlerTest, many_concurrent_consumers) {
142   const size_t writes = 200;
143   const size_t nproducers = 8;
144   const size_t nconsumers = 20;
145   atomic<size_t> writesRemaining(writes);
146   atomic<size_t> readsRemaining(writes);
147
148   MPMCQueue<nullptr_t> queue(writes / 10);
149
150   runInThreadsAndWait({
151       [&] {
152         runInThreadsAndWait(
153             nconsumers,
154             [&](size_t /* k */) {
155               size_t thReadsRemaining = writes / nconsumers;
156               EventBase eb;
157               EventHandlerMock eh(&eb, efd);
158               eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
159               EXPECT_CALL(eh, _handlerReady(_))
160                   .WillRepeatedly(Invoke([&](uint16_t /* events */) {
161                     nullptr_t val;
162                     if (!queue.readIfNotEmpty(val)) {
163                       return;
164                     }
165                     efd_read();
166                     --readsRemaining;
167                     if (--thReadsRemaining == 0) {
168                       eh.unregisterHandler();
169                     }
170                   }));
171               eb.loop();
172             });
173       },
174       [&] {
175         runInThreadsAndWait(nproducers,
176                             [&](size_t /* k */) {
177                               for (size_t i = 0; i < writes / nproducers; ++i) {
178                                 this_thread::sleep_for(chrono::milliseconds(1));
179                                 queue.blockingWrite(nullptr);
180                                 efd_write(1);
181                                 --writesRemaining;
182                               }
183                             });
184       },
185   });
186
187   EXPECT_EQ(0, writesRemaining);
188   EXPECT_EQ(0, readsRemaining);
189 }
190
191 #ifdef EV_PRI
192 TEST(EventHandlerSocketTest, EPOLLPRI) {
193   std::promise<decltype(sockaddr_in::sin_port)> serverReady;
194   std::thread t([serverReadyFuture = serverReady.get_future()]() mutable {
195     // client
196     LOG(INFO) << "Server is ready";
197     int sockfd = socket(AF_INET, SOCK_STREAM, 0);
198     SCOPE_EXIT {
199       close(sockfd);
200     };
201     struct hostent* he;
202     struct sockaddr_in server;
203
204     const char hostname[] = "localhost";
205     he = gethostbyname(hostname);
206     PCHECK(he);
207
208     memcpy(&server.sin_addr, he->h_addr_list[0], he->h_length);
209     server.sin_family = AF_INET;
210     server.sin_port = serverReadyFuture.get();
211
212     PCHECK(::connect(sockfd, (struct sockaddr*)&server, sizeof(server)) == 0);
213     LOG(INFO) << "Server connection available";
214
215     char buffer[] = "banana";
216     int n = send(sockfd, buffer, strlen(buffer) + 1, MSG_OOB);
217     PCHECK(n > 0);
218   });
219   SCOPE_EXIT {
220     t.join();
221   };
222   // make the server.
223   int sockfd = socket(AF_INET, SOCK_STREAM, 0);
224   SCOPE_EXIT {
225     close(sockfd);
226   };
227   PCHECK(sockfd != -1) << "unable to open socket";
228
229   struct sockaddr_in sin;
230   sin.sin_port = htons(0);
231   sin.sin_addr.s_addr = INADDR_ANY;
232   sin.sin_family = AF_INET;
233
234   PCHECK(bind(sockfd, (struct sockaddr*)&sin, sizeof(sin)) >= 0)
235       << "Can't bind to port";
236   listen(sockfd, 5);
237
238   struct sockaddr_in findSockName;
239   socklen_t sz = sizeof(findSockName);
240   getsockname(sockfd, (struct sockaddr*)&findSockName, &sz);
241   serverReady.set_value(findSockName.sin_port);
242
243   socklen_t clilen;
244   struct sockaddr_in cli_addr;
245   clilen = sizeof(cli_addr);
246   int newsockfd = accept(sockfd, (struct sockaddr*)&cli_addr, &clilen);
247   PCHECK(newsockfd >= 0) << "can't accept";
248   SCOPE_EXIT {
249     close(newsockfd);
250   };
251
252   EventBase eb;
253   struct SockEvent : public EventHandler {
254     SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), fd_(fd) {}
255
256     void handlerReady(uint16_t events) noexcept override {
257       EXPECT_TRUE(EventHandler::EventFlags::PRI & events);
258       char buffer[256];
259       int n = read(fd_, buffer, 255);
260       EXPECT_EQ(6, n);
261       EXPECT_EQ("banana", std::string(buffer));
262     }
263
264    private:
265     int fd_;
266   } sockHandler(&eb, newsockfd);
267   sockHandler.registerHandler(EventHandler::EventFlags::PRI);
268   LOG(INFO) << "Registered Handler";
269   eb.loop();
270 }
271 #endif