folly copyright 2015 -> copyright 2016
[folly.git] / folly / io / async / test / EventHandlerTest.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/EventHandler.h>
18
19 #include <sys/eventfd.h>
20 #include <thread>
21 #include <folly/MPMCQueue.h>
22 #include <folly/ScopeGuard.h>
23 #include <folly/io/async/EventBase.h>
24
25 #include <gtest/gtest.h>
26 #include <gmock/gmock.h>
27
28 using namespace std;
29 using namespace folly;
30 using namespace testing;
31
// Starts `nthreads` threads, calling `cb(i)` on the i-th thread with its
// zero-based index, and returns only once every thread has been joined.
void runInThreadsAndWait(
    size_t nthreads, std::function<void(size_t)> cb) {
  std::vector<std::thread> workers(nthreads);

  // Launch phase: each slot receives a running thread bound to its index.
  size_t index = 0;
  for (auto& worker : workers) {
    worker = std::thread(cb, index);
    ++index;
  }

  // Wait phase: block until all of the callbacks have returned.
  for (auto& worker : workers) {
    worker.join();
  }
}
42
43 void runInThreadsAndWait(vector<function<void()>> cbs) {
44   runInThreadsAndWait(cbs.size(), [&](size_t k) { cbs[k](); });
45 }
46
47 class EventHandlerMock : public EventHandler {
48 public:
49   EventHandlerMock(EventBase* eb, int fd) : EventHandler(eb, fd) {}
50   // gmock can't mock noexcept methods, so we need an intermediary
51   MOCK_METHOD1(_handlerReady, void(uint16_t));
52   void handlerReady(uint16_t events) noexcept override {
53     _handlerReady(events);
54   }
55 };
56
57 class EventHandlerTest : public Test {
58 public:
59   int efd = 0;
60
61   void SetUp() override {
62     efd = eventfd(0, EFD_SEMAPHORE);
63     ASSERT_THAT(efd, Gt(0));
64   }
65
66   void TearDown() override {
67     if (efd > 0) {
68       close(efd);
69     }
70     efd = 0;
71   }
72
73   void efd_write(uint64_t val) {
74     write(efd, &val, sizeof(val));
75   }
76
77   uint64_t efd_read() {
78     uint64_t val = 0;
79     read(efd, &val, sizeof(val));
80     return val;
81   }
82 };
83
84 TEST_F(EventHandlerTest, simple) {
85   const size_t writes = 4;
86   size_t readsRemaining = writes;
87
88   EventBase eb;
89   EventHandlerMock eh(&eb, efd);
90   eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
91   EXPECT_CALL(eh, _handlerReady(_))
92       .Times(writes)
93       .WillRepeatedly(Invoke([&](uint16_t /* events */) {
94         efd_read();
95         if (--readsRemaining == 0) {
96           eh.unregisterHandler();
97         }
98       }));
99   efd_write(writes);
100   eb.loop();
101
102   EXPECT_EQ(0, readsRemaining);
103 }
104
105 TEST_F(EventHandlerTest, many_concurrent_producers) {
106   const size_t writes = 200;
107   const size_t nproducers = 20;
108   size_t readsRemaining = writes;
109
110   runInThreadsAndWait({
111       [&] {
112         EventBase eb;
113         EventHandlerMock eh(&eb, efd);
114         eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
115         EXPECT_CALL(eh, _handlerReady(_))
116             .Times(writes)
117             .WillRepeatedly(Invoke([&](uint16_t /* events */) {
118               efd_read();
119               if (--readsRemaining == 0) {
120                 eh.unregisterHandler();
121               }
122             }));
123         eb.loop();
124       },
125       [&] {
126         runInThreadsAndWait(nproducers,
127                             [&](size_t /* k */) {
128                               for (size_t i = 0; i < writes / nproducers; ++i) {
129                                 this_thread::sleep_for(chrono::milliseconds(1));
130                                 efd_write(1);
131                               }
132                             });
133       },
134   });
135
136   EXPECT_EQ(0, readsRemaining);
137 }
138
139 TEST_F(EventHandlerTest, many_concurrent_consumers) {
140   const size_t writes = 200;
141   const size_t nproducers = 8;
142   const size_t nconsumers = 20;
143   atomic<size_t> writesRemaining(writes);
144   atomic<size_t> readsRemaining(writes);
145
146   MPMCQueue<nullptr_t> queue(writes / 10);
147
148   runInThreadsAndWait({
149       [&] {
150         runInThreadsAndWait(
151             nconsumers,
152             [&](size_t /* k */) {
153               size_t thReadsRemaining = writes / nconsumers;
154               EventBase eb;
155               EventHandlerMock eh(&eb, efd);
156               eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
157               EXPECT_CALL(eh, _handlerReady(_))
158                   .WillRepeatedly(Invoke([&](uint16_t /* events */) {
159                     nullptr_t val;
160                     if (!queue.readIfNotEmpty(val)) {
161                       return;
162                     }
163                     efd_read();
164                     --readsRemaining;
165                     if (--thReadsRemaining == 0) {
166                       eh.unregisterHandler();
167                     }
168                   }));
169               eb.loop();
170             });
171       },
172       [&] {
173         runInThreadsAndWait(nproducers,
174                             [&](size_t /* k */) {
175                               for (size_t i = 0; i < writes / nproducers; ++i) {
176                                 this_thread::sleep_for(chrono::milliseconds(1));
177                                 queue.blockingWrite(nullptr);
178                                 efd_write(1);
179                                 --writesRemaining;
180                               }
181                             });
182       },
183   });
184
185   EXPECT_EQ(0, writesRemaining);
186   EXPECT_EQ(0, readsRemaining);
187 }