Copyright 2014->2015
[folly.git] / folly / io / async / test / EventHandlerTest.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/EventHandler.h>
18
19 #include <sys/eventfd.h>
20 #include <thread>
21 #include <folly/MPMCQueue.h>
22 #include <folly/ScopeGuard.h>
23 #include <folly/io/async/EventBase.h>
24
25 #include <gtest/gtest.h>
26 #include <gmock/gmock.h>
27
28 using namespace std;
29 using namespace folly;
30 using namespace testing;
31
32 void runInThreadsAndWait(
33     size_t nthreads, function<void(size_t)> cb) {
34   vector<thread> threads(nthreads);
35   for (size_t i = 0; i < nthreads; ++i) {
36     threads[i] = thread(cb, i);
37   }
38   for (size_t i = 0; i < nthreads; ++i) {
39     threads[i].join();
40   }
41 }
42
43 void runInThreadsAndWait(vector<function<void()>> cbs) {
44   runInThreadsAndWait(cbs.size(), [&](size_t k) { cbs[k](); });
45 }
46
47 class EventHandlerMock : public EventHandler {
48 public:
49   EventHandlerMock(EventBase* eb, int fd) : EventHandler(eb, fd) {}
50   // gmock can't mock noexcept methods, so we need an intermediary
51   MOCK_METHOD1(_handlerReady, void(uint16_t));
52   void handlerReady(uint16_t events) noexcept override {
53     _handlerReady(events);
54   }
55 };
56
57 class EventHandlerTest : public Test {
58 public:
59   int efd = 0;
60
61   void SetUp() override {
62     efd = eventfd(0, EFD_SEMAPHORE);
63     ASSERT_THAT(efd, Gt(0));
64   }
65
66   void TearDown() override {
67     if (efd > 0) {
68       close(efd);
69     }
70     efd = 0;
71   }
72
73   void efd_write(uint64_t val) {
74     write(efd, &val, sizeof(val));
75   }
76
77   uint64_t efd_read() {
78     uint64_t val = 0;
79     read(efd, &val, sizeof(val));
80     return val;
81   }
82 };
83
84 TEST_F(EventHandlerTest, simple) {
85   const size_t writes = 4;
86   size_t readsRemaining = writes;
87
88   EventBase eb;
89   EventHandlerMock eh(&eb, efd);
90   eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
91   EXPECT_CALL(eh, _handlerReady(_))
92     .Times(writes)
93     .WillRepeatedly(Invoke([&](uint16_t events) {
94       efd_read();
95       if (--readsRemaining == 0) {
96         eh.unregisterHandler();
97       }
98     }));
99   efd_write(writes);
100   eb.loop();
101
102   EXPECT_EQ(0, readsRemaining);
103 }
104
105 TEST_F(EventHandlerTest, many_concurrent_producers) {
106   const size_t writes = 200;
107   const size_t nproducers = 20;
108   size_t readsRemaining = writes;
109
110   runInThreadsAndWait({
111     [&] {
112       EventBase eb;
113       EventHandlerMock eh(&eb, efd);
114       eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
115       EXPECT_CALL(eh, _handlerReady(_))
116         .Times(writes)
117         .WillRepeatedly(Invoke([&](uint16_t events) {
118           efd_read();
119           if (--readsRemaining == 0) {
120             eh.unregisterHandler();
121           }
122         }));
123       eb.loop();
124     },
125     [&] {
126       runInThreadsAndWait(nproducers, [&](size_t k) {
127         for (size_t i = 0; i < writes / nproducers; ++i) {
128           this_thread::sleep_for(chrono::milliseconds(1));
129           efd_write(1);
130         }
131       });
132     },
133   });
134
135   EXPECT_EQ(0, readsRemaining);
136 }
137
138 TEST_F(EventHandlerTest, many_concurrent_consumers) {
139   const size_t writes = 200;
140   const size_t nproducers = 8;
141   const size_t nconsumers = 20;
142   atomic<size_t> writesRemaining(writes);
143   atomic<size_t> readsRemaining(writes);
144
145   MPMCQueue<nullptr_t> queue(writes / 10);
146
147   runInThreadsAndWait({
148     [&] {
149       runInThreadsAndWait(nconsumers, [&](size_t k) {
150         size_t thReadsRemaining = writes / nconsumers;
151         EventBase eb;
152         EventHandlerMock eh(&eb, efd);
153         eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
154         EXPECT_CALL(eh, _handlerReady(_))
155           .WillRepeatedly(Invoke([&](uint16_t events) {
156             nullptr_t val;
157             if (!queue.readIfNotEmpty(val)) {
158               return;
159             }
160             efd_read();
161             --readsRemaining;
162             if (--thReadsRemaining == 0) {
163               eh.unregisterHandler();
164             }
165           }));
166         eb.loop();
167       });
168     },
169     [&] {
170       runInThreadsAndWait(nproducers, [&](size_t k) {
171         for (size_t i = 0; i < writes / nproducers; ++i) {
172           this_thread::sleep_for(chrono::milliseconds(1));
173           queue.blockingWrite(nullptr);
174           efd_write(1);
175           --writesRemaining;
176         }
177       });
178     },
179   });
180
181   EXPECT_EQ(0, writesRemaining);
182   EXPECT_EQ(0, readsRemaining);
183 }