std::chrono'ize EventBase::setLoadAvgMsec
[folly.git] / folly / io / async / test / EventBaseTest.cpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19
20 #include <folly/Memory.h>
21 #include <folly/ScopeGuard.h>
22
23 #include <folly/io/async/AsyncTimeout.h>
24 #include <folly/io/async/EventBase.h>
25 #include <folly/io/async/EventHandler.h>
26 #include <folly/io/async/test/SocketPair.h>
27 #include <folly/io/async/test/Util.h>
28 #include <folly/portability/Unistd.h>
29
30 #include <folly/futures/Promise.h>
31
32 #include <atomic>
33 #include <iostream>
34 #include <memory>
35 #include <thread>
36
37 using std::atomic;
38 using std::deque;
39 using std::pair;
40 using std::vector;
41 using std::unique_ptr;
42 using std::thread;
43 using std::make_pair;
44 using std::cerr;
45 using std::endl;
46 using std::chrono::milliseconds;
47 using std::chrono::microseconds;
48 using std::chrono::duration_cast;
49
50 using namespace std::chrono_literals;
51
52 using namespace folly;
53
54 ///////////////////////////////////////////////////////////////////////////
55 // Tests for read and write events
56 ///////////////////////////////////////////////////////////////////////////
57
58 enum { BUF_SIZE = 4096 };
59
60 ssize_t writeToFD(int fd, size_t length) {
61   // write an arbitrary amount of data to the fd
62   auto bufv = vector<char>(length);
63   auto buf = bufv.data();
64   memset(buf, 'a', length);
65   ssize_t rc = write(fd, buf, length);
66   CHECK_EQ(rc, length);
67   return rc;
68 }
69
70 size_t writeUntilFull(int fd) {
71   // Write to the fd until EAGAIN is returned
72   size_t bytesWritten = 0;
73   char buf[BUF_SIZE];
74   memset(buf, 'a', sizeof(buf));
75   while (true) {
76     ssize_t rc = write(fd, buf, sizeof(buf));
77     if (rc < 0) {
78       CHECK_EQ(errno, EAGAIN);
79       break;
80     } else {
81       bytesWritten += rc;
82     }
83   }
84   return bytesWritten;
85 }
86
87 ssize_t readFromFD(int fd, size_t length) {
88   // write an arbitrary amount of data to the fd
89   auto buf = vector<char>(length);
90   return read(fd, buf.data(), length);
91 }
92
93 size_t readUntilEmpty(int fd) {
94   // Read from the fd until EAGAIN is returned
95   char buf[BUF_SIZE];
96   size_t bytesRead = 0;
97   while (true) {
98     int rc = read(fd, buf, sizeof(buf));
99     if (rc == 0) {
100       CHECK(false) << "unexpected EOF";
101     } else if (rc < 0) {
102       CHECK_EQ(errno, EAGAIN);
103       break;
104     } else {
105       bytesRead += rc;
106     }
107   }
108   return bytesRead;
109 }
110
111 void checkReadUntilEmpty(int fd, size_t expectedLength) {
112   ASSERT_EQ(readUntilEmpty(fd), expectedLength);
113 }
114
115 struct ScheduledEvent {
116   int milliseconds;
117   uint16_t events;
118   size_t length;
119   ssize_t result;
120
121   void perform(int fd) {
122     if (events & EventHandler::READ) {
123       if (length == 0) {
124         result = readUntilEmpty(fd);
125       } else {
126         result = readFromFD(fd, length);
127       }
128     }
129     if (events & EventHandler::WRITE) {
130       if (length == 0) {
131         result = writeUntilFull(fd);
132       } else {
133         result = writeToFD(fd, length);
134       }
135     }
136   }
137 };
138
139 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
140   for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
141     eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
142                              ev->milliseconds);
143   }
144 }
145
146 class TestHandler : public EventHandler {
147  public:
148   TestHandler(EventBase* eventBase, int fd)
149     : EventHandler(eventBase, fd), fd_(fd) {}
150
151   void handlerReady(uint16_t events) noexcept override {
152     ssize_t bytesRead = 0;
153     ssize_t bytesWritten = 0;
154     if (events & READ) {
155       // Read all available data, so EventBase will stop calling us
156       // until new data becomes available
157       bytesRead = readUntilEmpty(fd_);
158     }
159     if (events & WRITE) {
160       // Write until the pipe buffer is full, so EventBase will stop calling
161       // us until the other end has read some data
162       bytesWritten = writeUntilFull(fd_);
163     }
164
165     log.emplace_back(events, bytesRead, bytesWritten);
166   }
167
168   struct EventRecord {
169     EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
170       : events(events)
171       , timestamp()
172       , bytesRead(bytesRead)
173       , bytesWritten(bytesWritten) {}
174
175     uint16_t events;
176     TimePoint timestamp;
177     ssize_t bytesRead;
178     ssize_t bytesWritten;
179   };
180
181   deque<EventRecord> log;
182
183  private:
184   int fd_;
185 };
186
187 /**
188  * Test a READ event
189  */
190 TEST(EventBaseTest, ReadEvent) {
191   EventBase eb;
192   SocketPair sp;
193
194   // Register for read events
195   TestHandler handler(&eb, sp[0]);
196   handler.registerHandler(EventHandler::READ);
197
198   // Register timeouts to perform two write events
199   ScheduledEvent events[] = {
200     { 10, EventHandler::WRITE, 2345, 0 },
201     { 160, EventHandler::WRITE, 99, 0 },
202     { 0, 0, 0, 0 },
203   };
204   scheduleEvents(&eb, sp[1], events);
205
206   // Loop
207   TimePoint start;
208   eb.loop();
209   TimePoint end;
210
211   // Since we didn't use the EventHandler::PERSIST flag, the handler should
212   // have received the first read, then unregistered itself.  Check that only
213   // the first chunk of data was received.
214   ASSERT_EQ(handler.log.size(), 1);
215   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
216   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
217                   milliseconds(events[0].milliseconds), milliseconds(90));
218   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
219   ASSERT_EQ(handler.log[0].bytesWritten, 0);
220   T_CHECK_TIMEOUT(start, end,
221                   milliseconds(events[1].milliseconds), milliseconds(30));
222
223   // Make sure the second chunk of data is still waiting to be read.
224   size_t bytesRemaining = readUntilEmpty(sp[0]);
225   ASSERT_EQ(bytesRemaining, events[1].length);
226 }
227
228 /**
229  * Test (READ | PERSIST)
230  */
231 TEST(EventBaseTest, ReadPersist) {
232   EventBase eb;
233   SocketPair sp;
234
235   // Register for read events
236   TestHandler handler(&eb, sp[0]);
237   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
238
239   // Register several timeouts to perform writes
240   ScheduledEvent events[] = {
241     { 10,  EventHandler::WRITE, 1024, 0 },
242     { 20,  EventHandler::WRITE, 2211, 0 },
243     { 30,  EventHandler::WRITE, 4096, 0 },
244     { 100, EventHandler::WRITE, 100,  0 },
245     { 0, 0, 0, 0 },
246   };
247   scheduleEvents(&eb, sp[1], events);
248
249   // Schedule a timeout to unregister the handler after the third write
250   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
251
252   // Loop
253   TimePoint start;
254   eb.loop();
255   TimePoint end;
256
257   // The handler should have received the first 3 events,
258   // then been unregistered after that.
259   ASSERT_EQ(handler.log.size(), 3);
260   for (int n = 0; n < 3; ++n) {
261     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
262     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
263                     milliseconds(events[n].milliseconds));
264     ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
265     ASSERT_EQ(handler.log[n].bytesWritten, 0);
266   }
267   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
268
269   // Make sure the data from the last write is still waiting to be read
270   size_t bytesRemaining = readUntilEmpty(sp[0]);
271   ASSERT_EQ(bytesRemaining, events[3].length);
272 }
273
274 /**
275  * Test registering for READ when the socket is immediately readable
276  */
277 TEST(EventBaseTest, ReadImmediate) {
278   EventBase eb;
279   SocketPair sp;
280
281   // Write some data to the socket so the other end will
282   // be immediately readable
283   size_t dataLength = 1234;
284   writeToFD(sp[1], dataLength);
285
286   // Register for read events
287   TestHandler handler(&eb, sp[0]);
288   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
289
290   // Register a timeout to perform another write
291   ScheduledEvent events[] = {
292     { 10, EventHandler::WRITE, 2345, 0 },
293     { 0, 0, 0, 0 },
294   };
295   scheduleEvents(&eb, sp[1], events);
296
297   // Schedule a timeout to unregister the handler
298   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
299
300   // Loop
301   TimePoint start;
302   eb.loop();
303   TimePoint end;
304
305   ASSERT_EQ(handler.log.size(), 2);
306
307   // There should have been 1 event for immediate readability
308   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
309   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
310   ASSERT_EQ(handler.log[0].bytesRead, dataLength);
311   ASSERT_EQ(handler.log[0].bytesWritten, 0);
312
313   // There should be another event after the timeout wrote more data
314   ASSERT_EQ(handler.log[1].events, EventHandler::READ);
315   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
316                   milliseconds(events[0].milliseconds));
317   ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
318   ASSERT_EQ(handler.log[1].bytesWritten, 0);
319
320   T_CHECK_TIMEOUT(start, end, milliseconds(20));
321 }
322
323 /**
324  * Test a WRITE event
325  */
326 TEST(EventBaseTest, WriteEvent) {
327   EventBase eb;
328   SocketPair sp;
329
330   // Fill up the write buffer before starting
331   size_t initialBytesWritten = writeUntilFull(sp[0]);
332
333   // Register for write events
334   TestHandler handler(&eb, sp[0]);
335   handler.registerHandler(EventHandler::WRITE);
336
337   // Register timeouts to perform two reads
338   ScheduledEvent events[] = {
339     { 10, EventHandler::READ, 0, 0 },
340     { 60, EventHandler::READ, 0, 0 },
341     { 0, 0, 0, 0 },
342   };
343   scheduleEvents(&eb, sp[1], events);
344
345   // Loop
346   TimePoint start;
347   eb.loop();
348   TimePoint end;
349
350   // Since we didn't use the EventHandler::PERSIST flag, the handler should
351   // have only been able to write once, then unregistered itself.
352   ASSERT_EQ(handler.log.size(), 1);
353   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
354   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
355                   milliseconds(events[0].milliseconds));
356   ASSERT_EQ(handler.log[0].bytesRead, 0);
357   ASSERT_GT(handler.log[0].bytesWritten, 0);
358   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
359
360   ASSERT_EQ(events[0].result, initialBytesWritten);
361   ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
362 }
363
364 /**
365  * Test (WRITE | PERSIST)
366  */
367 TEST(EventBaseTest, WritePersist) {
368   EventBase eb;
369   SocketPair sp;
370
371   // Fill up the write buffer before starting
372   size_t initialBytesWritten = writeUntilFull(sp[0]);
373
374   // Register for write events
375   TestHandler handler(&eb, sp[0]);
376   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
377
378   // Register several timeouts to read from the socket at several intervals
379   ScheduledEvent events[] = {
380     { 10,  EventHandler::READ, 0, 0 },
381     { 40,  EventHandler::READ, 0, 0 },
382     { 70,  EventHandler::READ, 0, 0 },
383     { 100, EventHandler::READ, 0, 0 },
384     { 0, 0, 0, 0 },
385   };
386   scheduleEvents(&eb, sp[1], events);
387
388   // Schedule a timeout to unregister the handler after the third read
389   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
390
391   // Loop
392   TimePoint start;
393   eb.loop();
394   TimePoint end;
395
396   // The handler should have received the first 3 events,
397   // then been unregistered after that.
398   ASSERT_EQ(handler.log.size(), 3);
399   ASSERT_EQ(events[0].result, initialBytesWritten);
400   for (int n = 0; n < 3; ++n) {
401     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
402     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
403                     milliseconds(events[n].milliseconds));
404     ASSERT_EQ(handler.log[n].bytesRead, 0);
405     ASSERT_GT(handler.log[n].bytesWritten, 0);
406     ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
407   }
408   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
409 }
410
411 /**
412  * Test registering for WRITE when the socket is immediately writable
413  */
414 TEST(EventBaseTest, WriteImmediate) {
415   EventBase eb;
416   SocketPair sp;
417
418   // Register for write events
419   TestHandler handler(&eb, sp[0]);
420   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
421
422   // Register a timeout to perform a read
423   ScheduledEvent events[] = {
424     { 10, EventHandler::READ, 0, 0 },
425     { 0, 0, 0, 0 },
426   };
427   scheduleEvents(&eb, sp[1], events);
428
429   // Schedule a timeout to unregister the handler
430   int64_t unregisterTimeout = 40;
431   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
432                    unregisterTimeout);
433
434   // Loop
435   TimePoint start;
436   eb.loop();
437   TimePoint end;
438
439   ASSERT_EQ(handler.log.size(), 2);
440
441   // Since the socket buffer was initially empty,
442   // there should have been 1 event for immediate writability
443   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
444   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
445   ASSERT_EQ(handler.log[0].bytesRead, 0);
446   ASSERT_GT(handler.log[0].bytesWritten, 0);
447
448   // There should be another event after the timeout wrote more data
449   ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
450   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
451                   milliseconds(events[0].milliseconds));
452   ASSERT_EQ(handler.log[1].bytesRead, 0);
453   ASSERT_GT(handler.log[1].bytesWritten, 0);
454
455   T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
456 }
457
458 /**
459  * Test (READ | WRITE) when the socket becomes readable first
460  */
461 TEST(EventBaseTest, ReadWrite) {
462   EventBase eb;
463   SocketPair sp;
464
465   // Fill up the write buffer before starting
466   size_t sock0WriteLength = writeUntilFull(sp[0]);
467
468   // Register for read and write events
469   TestHandler handler(&eb, sp[0]);
470   handler.registerHandler(EventHandler::READ_WRITE);
471
472   // Register timeouts to perform a write then a read.
473   ScheduledEvent events[] = {
474     { 10, EventHandler::WRITE, 2345, 0 },
475     { 40, EventHandler::READ, 0, 0 },
476     { 0, 0, 0, 0 },
477   };
478   scheduleEvents(&eb, sp[1], events);
479
480   // Loop
481   TimePoint start;
482   eb.loop();
483   TimePoint end;
484
485   // Since we didn't use the EventHandler::PERSIST flag, the handler should
486   // have only noticed readability, then unregistered itself.  Check that only
487   // one event was logged.
488   ASSERT_EQ(handler.log.size(), 1);
489   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
490   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
491                   milliseconds(events[0].milliseconds));
492   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
493   ASSERT_EQ(handler.log[0].bytesWritten, 0);
494   ASSERT_EQ(events[1].result, sock0WriteLength);
495   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
496 }
497
498 /**
499  * Test (READ | WRITE) when the socket becomes writable first
500  */
501 TEST(EventBaseTest, WriteRead) {
502   EventBase eb;
503   SocketPair sp;
504
505   // Fill up the write buffer before starting
506   size_t sock0WriteLength = writeUntilFull(sp[0]);
507
508   // Register for read and write events
509   TestHandler handler(&eb, sp[0]);
510   handler.registerHandler(EventHandler::READ_WRITE);
511
512   // Register timeouts to perform a read then a write.
513   size_t sock1WriteLength = 2345;
514   ScheduledEvent events[] = {
515     { 10, EventHandler::READ, 0, 0 },
516     { 40, EventHandler::WRITE, sock1WriteLength, 0 },
517     { 0, 0, 0, 0 },
518   };
519   scheduleEvents(&eb, sp[1], events);
520
521   // Loop
522   TimePoint start;
523   eb.loop();
524   TimePoint end;
525
526   // Since we didn't use the EventHandler::PERSIST flag, the handler should
527   // have only noticed writability, then unregistered itself.  Check that only
528   // one event was logged.
529   ASSERT_EQ(handler.log.size(), 1);
530   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
531   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
532                   milliseconds(events[0].milliseconds));
533   ASSERT_EQ(handler.log[0].bytesRead, 0);
534   ASSERT_GT(handler.log[0].bytesWritten, 0);
535   ASSERT_EQ(events[0].result, sock0WriteLength);
536   ASSERT_EQ(events[1].result, sock1WriteLength);
537   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
538
539   // Make sure the written data is still waiting to be read.
540   size_t bytesRemaining = readUntilEmpty(sp[0]);
541   ASSERT_EQ(bytesRemaining, events[1].length);
542 }
543
544 /**
545  * Test (READ | WRITE) when the socket becomes readable and writable
546  * at the same time.
547  */
548 TEST(EventBaseTest, ReadWriteSimultaneous) {
549   EventBase eb;
550   SocketPair sp;
551
552   // Fill up the write buffer before starting
553   size_t sock0WriteLength = writeUntilFull(sp[0]);
554
555   // Register for read and write events
556   TestHandler handler(&eb, sp[0]);
557   handler.registerHandler(EventHandler::READ_WRITE);
558
559   // Register a timeout to perform a read and write together
560   ScheduledEvent events[] = {
561     { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
562     { 0, 0, 0, 0 },
563   };
564   scheduleEvents(&eb, sp[1], events);
565
566   // Loop
567   TimePoint start;
568   eb.loop();
569   TimePoint end;
570
571   // It's not strictly required that the EventBase register us about both
572   // events in the same call.  So, it's possible that if the EventBase
573   // implementation changes this test could start failing, and it wouldn't be
574   // considered breaking the API.  However for now it's nice to exercise this
575   // code path.
576   ASSERT_EQ(handler.log.size(), 1);
577   ASSERT_EQ(handler.log[0].events,
578                     EventHandler::READ | EventHandler::WRITE);
579   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
580                   milliseconds(events[0].milliseconds));
581   ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
582   ASSERT_GT(handler.log[0].bytesWritten, 0);
583   T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
584 }
585
586 /**
587  * Test (READ | WRITE | PERSIST)
588  */
589 TEST(EventBaseTest, ReadWritePersist) {
590   EventBase eb;
591   SocketPair sp;
592
593   // Register for read and write events
594   TestHandler handler(&eb, sp[0]);
595   handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
596                           EventHandler::PERSIST);
597
598   // Register timeouts to perform several reads and writes
599   ScheduledEvent events[] = {
600     { 10, EventHandler::WRITE, 2345, 0 },
601     { 20, EventHandler::READ, 0, 0 },
602     { 35, EventHandler::WRITE, 200, 0 },
603     { 45, EventHandler::WRITE, 15, 0 },
604     { 55, EventHandler::READ, 0, 0 },
605     { 120, EventHandler::WRITE, 2345, 0 },
606     { 0, 0, 0, 0 },
607   };
608   scheduleEvents(&eb, sp[1], events);
609
610   // Schedule a timeout to unregister the handler
611   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
612
613   // Loop
614   TimePoint start;
615   eb.loop();
616   TimePoint end;
617
618   ASSERT_EQ(handler.log.size(), 6);
619
620   // Since we didn't fill up the write buffer immediately, there should
621   // be an immediate event for writability.
622   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
623   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
624   ASSERT_EQ(handler.log[0].bytesRead, 0);
625   ASSERT_GT(handler.log[0].bytesWritten, 0);
626
627   // Events 1 through 5 should correspond to the scheduled events
628   for (int n = 1; n < 6; ++n) {
629     ScheduledEvent* event = &events[n - 1];
630     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
631                     milliseconds(event->milliseconds));
632     if (event->events == EventHandler::READ) {
633       ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
634       ASSERT_EQ(handler.log[n].bytesRead, 0);
635       ASSERT_GT(handler.log[n].bytesWritten, 0);
636     } else {
637       ASSERT_EQ(handler.log[n].events, EventHandler::READ);
638       ASSERT_EQ(handler.log[n].bytesRead, event->length);
639       ASSERT_EQ(handler.log[n].bytesWritten, 0);
640     }
641   }
642
643   // The timeout should have unregistered the handler before the last write.
644   // Make sure that data is still waiting to be read
645   size_t bytesRemaining = readUntilEmpty(sp[0]);
646   ASSERT_EQ(bytesRemaining, events[5].length);
647 }
648
649
650 class PartialReadHandler : public TestHandler {
651  public:
652   PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
653     : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
654
655   void handlerReady(uint16_t events) noexcept override {
656     assert(events == EventHandler::READ);
657     ssize_t bytesRead = readFromFD(fd_, readLength_);
658     log.emplace_back(events, bytesRead, 0);
659   }
660
661  private:
662   int fd_;
663   size_t readLength_;
664 };
665
666 /**
667  * Test reading only part of the available data when a read event is fired.
668  * When PERSIST is used, make sure the handler gets notified again the next
669  * time around the loop.
670  */
671 TEST(EventBaseTest, ReadPartial) {
672   EventBase eb;
673   SocketPair sp;
674
675   // Register for read events
676   size_t readLength = 100;
677   PartialReadHandler handler(&eb, sp[0], readLength);
678   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
679
680   // Register a timeout to perform a single write,
681   // with more data than PartialReadHandler will read at once
682   ScheduledEvent events[] = {
683     { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
684     { 0, 0, 0, 0 },
685   };
686   scheduleEvents(&eb, sp[1], events);
687
688   // Schedule a timeout to unregister the handler
689   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
690
691   // Loop
692   TimePoint start;
693   eb.loop();
694   TimePoint end;
695
696   ASSERT_EQ(handler.log.size(), 4);
697
698   // The first 3 invocations should read readLength bytes each
699   for (int n = 0; n < 3; ++n) {
700     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
701     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
702                     milliseconds(events[0].milliseconds));
703     ASSERT_EQ(handler.log[n].bytesRead, readLength);
704     ASSERT_EQ(handler.log[n].bytesWritten, 0);
705   }
706   // The last read only has readLength/2 bytes
707   ASSERT_EQ(handler.log[3].events, EventHandler::READ);
708   T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
709                   milliseconds(events[0].milliseconds));
710   ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
711   ASSERT_EQ(handler.log[3].bytesWritten, 0);
712 }
713
714
715 class PartialWriteHandler : public TestHandler {
716  public:
717   PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
718     : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
719
720   void handlerReady(uint16_t events) noexcept override {
721     assert(events == EventHandler::WRITE);
722     ssize_t bytesWritten = writeToFD(fd_, writeLength_);
723     log.emplace_back(events, 0, bytesWritten);
724   }
725
726  private:
727   int fd_;
728   size_t writeLength_;
729 };
730
731 /**
732  * Test writing without completely filling up the write buffer when the fd
733  * becomes writable.  When PERSIST is used, make sure the handler gets
734  * notified again the next time around the loop.
735  */
736 TEST(EventBaseTest, WritePartial) {
737   EventBase eb;
738   SocketPair sp;
739
740   // Fill up the write buffer before starting
741   size_t initialBytesWritten = writeUntilFull(sp[0]);
742
743   // Register for write events
744   size_t writeLength = 100;
745   PartialWriteHandler handler(&eb, sp[0], writeLength);
746   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
747
748   // Register a timeout to read, so that more data can be written
749   ScheduledEvent events[] = {
750     { 10, EventHandler::READ, 0, 0 },
751     { 0, 0, 0, 0 },
752   };
753   scheduleEvents(&eb, sp[1], events);
754
755   // Schedule a timeout to unregister the handler
756   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
757
758   // Loop
759   TimePoint start;
760   eb.loop();
761   TimePoint end;
762
763   // Depending on how big the socket buffer is, there will be multiple writes
764   // Only check the first 5
765   int numChecked = 5;
766   ASSERT_GE(handler.log.size(), numChecked);
767   ASSERT_EQ(events[0].result, initialBytesWritten);
768
769   // The first 3 invocations should read writeLength bytes each
770   for (int n = 0; n < numChecked; ++n) {
771     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
772     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
773                     milliseconds(events[0].milliseconds));
774     ASSERT_EQ(handler.log[n].bytesRead, 0);
775     ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
776   }
777 }
778
779
780 /**
781  * Test destroying a registered EventHandler
782  */
783 TEST(EventBaseTest, DestroyHandler) {
784   class DestroyHandler : public AsyncTimeout {
785    public:
786     DestroyHandler(EventBase* eb, EventHandler* h)
787       : AsyncTimeout(eb)
788       , handler_(h) {}
789
790     void timeoutExpired() noexcept override { delete handler_; }
791
792    private:
793     EventHandler* handler_;
794   };
795
796   EventBase eb;
797   SocketPair sp;
798
799   // Fill up the write buffer before starting
800   size_t initialBytesWritten = writeUntilFull(sp[0]);
801
802   // Register for write events
803   TestHandler* handler = new TestHandler(&eb, sp[0]);
804   handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
805
806   // After 10ms, read some data, so that the handler
807   // will be notified that it can write.
808   eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
809                    10);
810
811   // Start a timer to destroy the handler after 25ms
812   // This mainly just makes sure the code doesn't break or assert
813   DestroyHandler dh(&eb, handler);
814   dh.scheduleTimeout(25);
815
816   TimePoint start;
817   eb.loop();
818   TimePoint end;
819
820   // Make sure the EventHandler was uninstalled properly when it was
821   // destroyed, and the EventBase loop exited
822   T_CHECK_TIMEOUT(start, end, milliseconds(25));
823
824   // Make sure that the handler wrote data to the socket
825   // before it was destroyed
826   size_t bytesRemaining = readUntilEmpty(sp[1]);
827   ASSERT_GT(bytesRemaining, 0);
828 }
829
830
831 ///////////////////////////////////////////////////////////////////////////
832 // Tests for timeout events
833 ///////////////////////////////////////////////////////////////////////////
834
835 TEST(EventBaseTest, RunAfterDelay) {
836   EventBase eb;
837
838   TimePoint timestamp1(false);
839   TimePoint timestamp2(false);
840   TimePoint timestamp3(false);
841   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
842   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
843   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
844
845   TimePoint start;
846   eb.loop();
847   TimePoint end;
848
849   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
850   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
851   T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
852   T_CHECK_TIMEOUT(start, end, milliseconds(40));
853 }
854
855 /**
856  * Test the behavior of tryRunAfterDelay() when some timeouts are
857  * still scheduled when the EventBase is destroyed.
858  */
859 TEST(EventBaseTest, RunAfterDelayDestruction) {
860   TimePoint timestamp1(false);
861   TimePoint timestamp2(false);
862   TimePoint timestamp3(false);
863   TimePoint timestamp4(false);
864   TimePoint start(false);
865   TimePoint end(false);
866
867   {
868     EventBase eb;
869
870     // Run two normal timeouts
871     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
872     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
873
874     // Schedule a timeout to stop the event loop after 40ms
875     eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
876
877     // Schedule 2 timeouts that would fire after the event loop stops
878     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
879     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
880
881     start.reset();
882     eb.loop();
883     end.reset();
884   }
885
886   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
887   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
888   T_CHECK_TIMEOUT(start, end, milliseconds(40));
889
890   ASSERT_TRUE(timestamp3.isUnset());
891   ASSERT_TRUE(timestamp4.isUnset());
892
893   // Ideally this test should be run under valgrind to ensure that no
894   // memory is leaked.
895 }
896
897 class TestTimeout : public AsyncTimeout {
898  public:
899   explicit TestTimeout(EventBase* eventBase)
900     : AsyncTimeout(eventBase)
901     , timestamp(false) {}
902
903   void timeoutExpired() noexcept override { timestamp.reset(); }
904
905   TimePoint timestamp;
906 };
907
908 TEST(EventBaseTest, BasicTimeouts) {
909   EventBase eb;
910
911   TestTimeout t1(&eb);
912   TestTimeout t2(&eb);
913   TestTimeout t3(&eb);
914   t1.scheduleTimeout(10);
915   t2.scheduleTimeout(20);
916   t3.scheduleTimeout(40);
917
918   TimePoint start;
919   eb.loop();
920   TimePoint end;
921
922   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
923   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
924   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
925   T_CHECK_TIMEOUT(start, end, milliseconds(40));
926 }
927
928 class ReschedulingTimeout : public AsyncTimeout {
929  public:
930   ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
931     : AsyncTimeout(evb)
932     , timeouts_(timeouts)
933     , iterator_(timeouts_.begin()) {}
934
935   void start() {
936     reschedule();
937   }
938
939   void timeoutExpired() noexcept override {
940     timestamps.emplace_back();
941     reschedule();
942   }
943
944   void reschedule() {
945     if (iterator_ != timeouts_.end()) {
946       uint32_t timeout = *iterator_;
947       ++iterator_;
948       scheduleTimeout(timeout);
949     }
950   }
951
952   vector<TimePoint> timestamps;
953
954  private:
955   vector<uint32_t> timeouts_;
956   vector<uint32_t>::const_iterator iterator_;
957 };
958
959 /**
960  * Test rescheduling the same timeout multiple times
961  */
962 TEST(EventBaseTest, ReuseTimeout) {
963   EventBase eb;
964
965   vector<uint32_t> timeouts;
966   timeouts.push_back(10);
967   timeouts.push_back(30);
968   timeouts.push_back(15);
969
970   ReschedulingTimeout t(&eb, timeouts);
971   t.start();
972
973   TimePoint start;
974   eb.loop();
975   TimePoint end;
976
977   // Use a higher tolerance than usual.  We're waiting on 3 timeouts
978   // consecutively.  In general, each timeout may go over by a few
979   // milliseconds, and we're tripling this error by witing on 3 timeouts.
980   milliseconds tolerance{6};
981
982   ASSERT_EQ(timeouts.size(), t.timestamps.size());
983   uint32_t total = 0;
984   for (size_t n = 0; n < timeouts.size(); ++n) {
985     total += timeouts[n];
986     T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
987   }
988   T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
989 }
990
991 /**
992  * Test rescheduling a timeout before it has fired
993  */
994 TEST(EventBaseTest, RescheduleTimeout) {
995   EventBase eb;
996
997   TestTimeout t1(&eb);
998   TestTimeout t2(&eb);
999   TestTimeout t3(&eb);
1000
1001   t1.scheduleTimeout(15);
1002   t2.scheduleTimeout(30);
1003   t3.scheduleTimeout(30);
1004
1005   auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1006       &AsyncTimeout::scheduleTimeout);
1007
1008   // after 10ms, reschedule t2 to run sooner than originally scheduled
1009   eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1010   // after 10ms, reschedule t3 to run later than originally scheduled
1011   eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1012
1013   TimePoint start;
1014   eb.loop();
1015   TimePoint end;
1016
1017   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1018   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1019   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1020   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1021 }
1022
1023 /**
1024  * Test cancelling a timeout
1025  */
1026 TEST(EventBaseTest, CancelTimeout) {
1027   EventBase eb;
1028
1029   vector<uint32_t> timeouts;
1030   timeouts.push_back(10);
1031   timeouts.push_back(30);
1032   timeouts.push_back(25);
1033
1034   ReschedulingTimeout t(&eb, timeouts);
1035   t.start();
1036   eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1037
1038   TimePoint start;
1039   eb.loop();
1040   TimePoint end;
1041
1042   ASSERT_EQ(t.timestamps.size(), 2);
1043   T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1044   T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1045   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1046 }
1047
1048 /**
1049  * Test destroying a scheduled timeout object
1050  */
1051 TEST(EventBaseTest, DestroyTimeout) {
1052   class DestroyTimeout : public AsyncTimeout {
1053    public:
1054     DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1055       : AsyncTimeout(eb)
1056       , timeout_(t) {}
1057
1058     void timeoutExpired() noexcept override { delete timeout_; }
1059
1060    private:
1061     AsyncTimeout* timeout_;
1062   };
1063
1064   EventBase eb;
1065
1066   TestTimeout* t1 = new TestTimeout(&eb);
1067   t1->scheduleTimeout(30);
1068
1069   DestroyTimeout dt(&eb, t1);
1070   dt.scheduleTimeout(10);
1071
1072   TimePoint start;
1073   eb.loop();
1074   TimePoint end;
1075
1076   T_CHECK_TIMEOUT(start, end, milliseconds(10));
1077 }
1078
1079
1080 ///////////////////////////////////////////////////////////////////////////
1081 // Test for runInThreadTestFunc()
1082 ///////////////////////////////////////////////////////////////////////////
1083
1084 struct RunInThreadData {
1085   RunInThreadData(int numThreads, int opsPerThread)
1086     : opsPerThread(opsPerThread)
1087     , opsToGo(numThreads*opsPerThread) {}
1088
1089   EventBase evb;
1090   deque< pair<int, int> > values;
1091
1092   int opsPerThread;
1093   int opsToGo;
1094 };
1095
1096 struct RunInThreadArg {
1097   RunInThreadArg(RunInThreadData* data,
1098                  int threadId,
1099                  int value)
1100     : data(data)
1101     , thread(threadId)
1102     , value(value) {}
1103
1104   RunInThreadData* data;
1105   int thread;
1106   int value;
1107 };
1108
1109 void runInThreadTestFunc(RunInThreadArg* arg) {
1110   arg->data->values.emplace_back(arg->thread, arg->value);
1111   RunInThreadData* data = arg->data;
1112   delete arg;
1113
1114   if(--data->opsToGo == 0) {
1115     // Break out of the event base loop if we are the last thread running
1116     data->evb.terminateLoopSoon();
1117   }
1118 }
1119
1120 TEST(EventBaseTest, RunInThread) {
1121   constexpr uint32_t numThreads = 50;
1122   constexpr uint32_t opsPerThread = 100;
1123   RunInThreadData data(numThreads, opsPerThread);
1124
1125   deque<std::thread> threads;
1126   SCOPE_EXIT {
1127     // Wait on all of the threads.
1128     for (auto& thread : threads) {
1129       thread.join();
1130     }
1131   };
1132
1133   for (uint32_t i = 0; i < numThreads; ++i) {
1134     threads.emplace_back([i, &data] {
1135         for (int n = 0; n < data.opsPerThread; ++n) {
1136           RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1137           data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1138           usleep(10);
1139         }
1140       });
1141   }
1142
1143   // Add a timeout event to run after 3 seconds.
1144   // Otherwise loop() will return immediately since there are no events to run.
1145   // Once the last thread exits, it will stop the loop().  However, this
1146   // timeout also stops the loop in case there is a bug performing the normal
1147   // stop.
1148   data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1149                          3000);
1150
1151   TimePoint start;
1152   data.evb.loop();
1153   TimePoint end;
1154
1155   // Verify that the loop exited because all threads finished and requested it
1156   // to stop.  This should happen much sooner than the 3 second timeout.
1157   // Assert that it happens in under a second.  (This is still tons of extra
1158   // padding.)
1159
1160   auto timeTaken = std::chrono::duration_cast<milliseconds>(
1161     end.getTime() - start.getTime());
1162   ASSERT_LT(timeTaken.count(), 1000);
1163   VLOG(11) << "Time taken: " << timeTaken.count();
1164
1165   // Verify that we have all of the events from every thread
1166   int expectedValues[numThreads];
1167   for (uint32_t n = 0; n < numThreads; ++n) {
1168     expectedValues[n] = 0;
1169   }
1170   for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1171        it != data.values.end();
1172        ++it) {
1173     int threadID = it->first;
1174     int value = it->second;
1175     ASSERT_EQ(expectedValues[threadID], value);
1176     ++expectedValues[threadID];
1177   }
1178   for (uint32_t n = 0; n < numThreads; ++n) {
1179     ASSERT_EQ(expectedValues[n], opsPerThread);
1180   }
1181 }
1182
1183 //  This test simulates some calls, and verifies that the waiting happens by
1184 //  triggering what otherwise would be race conditions, and trying to detect
1185 //  whether any of the race conditions happened.
1186 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1187   const size_t c = 256;
1188   vector<unique_ptr<atomic<size_t>>> atoms(c);
1189   for (size_t i = 0; i < c; ++i) {
1190     auto& atom = atoms.at(i);
1191     atom = make_unique<atomic<size_t>>(0);
1192   }
1193   vector<thread> threads;
1194   for (size_t i = 0; i < c; ++i) {
1195     threads.emplace_back([&atoms, i] {
1196       EventBase eb;
1197       auto& atom = *atoms.at(i);
1198       auto ebth = thread([&] { eb.loopForever(); });
1199       eb.waitUntilRunning();
1200       eb.runInEventBaseThreadAndWait([&] {
1201         size_t x = 0;
1202         atom.compare_exchange_weak(
1203             x, 1, std::memory_order_release, std::memory_order_relaxed);
1204       });
1205       size_t x = 0;
1206       atom.compare_exchange_weak(
1207           x, 2, std::memory_order_release, std::memory_order_relaxed);
1208       eb.terminateLoopSoon();
1209       ebth.join();
1210     });
1211   }
1212   for (size_t i = 0; i < c; ++i) {
1213     auto& th = threads.at(i);
1214     th.join();
1215   }
1216   size_t sum = 0;
1217   for (auto& atom : atoms) sum += *atom;
1218   EXPECT_EQ(c, sum);
1219 }
1220
1221 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1222   EventBase eb;
1223   thread th(&EventBase::loopForever, &eb);
1224   SCOPE_EXIT {
1225     eb.terminateLoopSoon();
1226     th.join();
1227   };
1228   auto mutated = false;
1229   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1230       mutated = true;
1231   });
1232   EXPECT_TRUE(mutated);
1233 }
1234
1235 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1236   EventBase eb;
1237   thread th(&EventBase::loopForever, &eb);
1238   SCOPE_EXIT {
1239     eb.terminateLoopSoon();
1240     th.join();
1241   };
1242   eb.runInEventBaseThreadAndWait([&] {
1243       auto mutated = false;
1244       eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1245           mutated = true;
1246       });
1247       EXPECT_TRUE(mutated);
1248   });
1249 }
1250
1251 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1252   EventBase eb;
1253   auto mutated = false;
1254   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1255       mutated = true;
1256     });
1257   EXPECT_TRUE(mutated);
1258 }
1259
1260 ///////////////////////////////////////////////////////////////////////////
1261 // Tests for runInLoop()
1262 ///////////////////////////////////////////////////////////////////////////
1263
1264 class CountedLoopCallback : public EventBase::LoopCallback {
1265  public:
1266   CountedLoopCallback(EventBase* eventBase,
1267                       unsigned int count,
1268                       std::function<void()> action =
1269                         std::function<void()>())
1270     : eventBase_(eventBase)
1271     , count_(count)
1272     , action_(action) {}
1273
1274   void runLoopCallback() noexcept override {
1275     --count_;
1276     if (count_ > 0) {
1277       eventBase_->runInLoop(this);
1278     } else if (action_) {
1279       action_();
1280     }
1281   }
1282
1283   unsigned int getCount() const {
1284     return count_;
1285   }
1286
1287  private:
1288   EventBase* eventBase_;
1289   unsigned int count_;
1290   std::function<void()> action_;
1291 };
1292
1293 // Test that EventBase::loop() doesn't exit while there are
1294 // still LoopCallbacks remaining to be invoked.
1295 TEST(EventBaseTest, RepeatedRunInLoop) {
1296   EventBase eventBase;
1297
1298   CountedLoopCallback c(&eventBase, 10);
1299   eventBase.runInLoop(&c);
1300   // The callback shouldn't have run immediately
1301   ASSERT_EQ(c.getCount(), 10);
1302   eventBase.loop();
1303
1304   // loop() should loop until the CountedLoopCallback stops
1305   // re-installing itself.
1306   ASSERT_EQ(c.getCount(), 0);
1307 }
1308
1309 // Test that EventBase::loop() works as expected without time measurements.
1310 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
1311   EventBase eventBase(false);
1312
1313   CountedLoopCallback c(&eventBase, 10);
1314   eventBase.runInLoop(&c);
1315   // The callback shouldn't have run immediately
1316   ASSERT_EQ(c.getCount(), 10);
1317   eventBase.loop();
1318
1319   // loop() should loop until the CountedLoopCallback stops
1320   // re-installing itself.
1321   ASSERT_EQ(c.getCount(), 0);
1322 }
1323
1324 // Test runInLoop() calls with terminateLoopSoon()
1325 TEST(EventBaseTest, RunInLoopStopLoop) {
1326   EventBase eventBase;
1327
1328   CountedLoopCallback c1(&eventBase, 20);
1329   CountedLoopCallback c2(&eventBase, 10,
1330                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1331
1332   eventBase.runInLoop(&c1);
1333   eventBase.runInLoop(&c2);
1334   ASSERT_EQ(c1.getCount(), 20);
1335   ASSERT_EQ(c2.getCount(), 10);
1336
1337   eventBase.loopForever();
1338
1339   // c2 should have stopped the loop after 10 iterations
1340   ASSERT_EQ(c2.getCount(), 0);
1341
1342   // We allow the EventBase to run the loop callbacks in whatever order it
1343   // chooses.  We'll accept c1's count being either 10 (if the loop terminated
1344   // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1345   // before c1 ran).
1346   //
1347   // (With the current code, c1 will always run 10 times, but we don't consider
1348   // this a hard API requirement.)
1349   ASSERT_GE(c1.getCount(), 10);
1350   ASSERT_LE(c1.getCount(), 11);
1351 }
1352
1353 TEST(EventBaseTest, TryRunningAfterTerminate) {
1354   EventBase eventBase;
1355   CountedLoopCallback c1(&eventBase, 1,
1356                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1357   eventBase.runInLoop(&c1);
1358   eventBase.loopForever();
1359   bool ran = false;
1360   eventBase.runInEventBaseThread([&]() {
1361     ran = true;
1362   });
1363
1364   ASSERT_FALSE(ran);
1365 }
1366
1367 // Test cancelling runInLoop() callbacks
1368 TEST(EventBaseTest, CancelRunInLoop) {
1369   EventBase eventBase;
1370
1371   CountedLoopCallback c1(&eventBase, 20);
1372   CountedLoopCallback c2(&eventBase, 20);
1373   CountedLoopCallback c3(&eventBase, 20);
1374
1375   std::function<void()> cancelC1Action =
1376     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1377   std::function<void()> cancelC2Action =
1378     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1379
1380   CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1381   CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1382
1383   // Install cancelC1 after c1
1384   eventBase.runInLoop(&c1);
1385   eventBase.runInLoop(&cancelC1);
1386
1387   // Install cancelC2 before c2
1388   eventBase.runInLoop(&cancelC2);
1389   eventBase.runInLoop(&c2);
1390
1391   // Install c3
1392   eventBase.runInLoop(&c3);
1393
1394   ASSERT_EQ(c1.getCount(), 20);
1395   ASSERT_EQ(c2.getCount(), 20);
1396   ASSERT_EQ(c3.getCount(), 20);
1397   ASSERT_EQ(cancelC1.getCount(), 10);
1398   ASSERT_EQ(cancelC2.getCount(), 10);
1399
1400   // Run the loop
1401   eventBase.loop();
1402
1403   // cancelC1 and cancelC3 should have both fired after 10 iterations and
1404   // stopped re-installing themselves
1405   ASSERT_EQ(cancelC1.getCount(), 0);
1406   ASSERT_EQ(cancelC2.getCount(), 0);
1407   // c3 should have continued on for the full 20 iterations
1408   ASSERT_EQ(c3.getCount(), 0);
1409
1410   // c1 and c2 should have both been cancelled on the 10th iteration.
1411   //
1412   // Callbacks are always run in the order they are installed,
1413   // so c1 should have fired 10 times, and been canceled after it ran on the
1414   // 10th iteration.  c2 should have only fired 9 times, because cancelC2 will
1415   // have run before it on the 10th iteration, and cancelled it before it
1416   // fired.
1417   ASSERT_EQ(c1.getCount(), 10);
1418   ASSERT_EQ(c2.getCount(), 11);
1419 }
1420
1421 class TerminateTestCallback : public EventBase::LoopCallback,
1422                               public EventHandler {
1423  public:
1424   TerminateTestCallback(EventBase* eventBase, int fd)
1425     : EventHandler(eventBase, fd),
1426       eventBase_(eventBase),
1427       loopInvocations_(0),
1428       maxLoopInvocations_(0),
1429       eventInvocations_(0),
1430       maxEventInvocations_(0) {}
1431
1432   void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1433     loopInvocations_ = 0;
1434     maxLoopInvocations_ = maxLoopInvocations;
1435     eventInvocations_ = 0;
1436     maxEventInvocations_ = maxEventInvocations;
1437
1438     cancelLoopCallback();
1439     unregisterHandler();
1440   }
1441
1442   void handlerReady(uint16_t /* events */) noexcept override {
1443     // We didn't register with PERSIST, so we will have been automatically
1444     // unregistered already.
1445     ASSERT_FALSE(isHandlerRegistered());
1446
1447     ++eventInvocations_;
1448     if (eventInvocations_ >= maxEventInvocations_) {
1449       return;
1450     }
1451
1452     eventBase_->runInLoop(this);
1453   }
1454   void runLoopCallback() noexcept override {
1455     ++loopInvocations_;
1456     if (loopInvocations_ >= maxLoopInvocations_) {
1457       return;
1458     }
1459
1460     registerHandler(READ);
1461   }
1462
1463   uint32_t getLoopInvocations() const {
1464     return loopInvocations_;
1465   }
1466   uint32_t getEventInvocations() const {
1467     return eventInvocations_;
1468   }
1469
1470  private:
1471   EventBase* eventBase_;
1472   uint32_t loopInvocations_;
1473   uint32_t maxLoopInvocations_;
1474   uint32_t eventInvocations_;
1475   uint32_t maxEventInvocations_;
1476 };
1477
1478 /**
1479  * Test that EventBase::loop() correctly detects when there are no more events
1480  * left to run.
1481  *
1482  * This uses a single callback, which alternates registering itself as a loop
1483  * callback versus a EventHandler callback.  This exercises a regression where
1484  * EventBase::loop() incorrectly exited if there were no more fd handlers
1485  * registered, but a loop callback installed a new fd handler.
1486  */
1487 TEST(EventBaseTest, LoopTermination) {
1488   EventBase eventBase;
1489
1490   // Open a pipe and close the write end,
1491   // so the read endpoint will be readable
1492   int pipeFds[2];
1493   int rc = pipe(pipeFds);
1494   ASSERT_EQ(rc, 0);
1495   close(pipeFds[1]);
1496   TerminateTestCallback callback(&eventBase, pipeFds[0]);
1497
1498   // Test once where the callback will exit after a loop callback
1499   callback.reset(10, 100);
1500   eventBase.runInLoop(&callback);
1501   eventBase.loop();
1502   ASSERT_EQ(callback.getLoopInvocations(), 10);
1503   ASSERT_EQ(callback.getEventInvocations(), 9);
1504
1505   // Test once where the callback will exit after an fd event callback
1506   callback.reset(100, 7);
1507   eventBase.runInLoop(&callback);
1508   eventBase.loop();
1509   ASSERT_EQ(callback.getLoopInvocations(), 7);
1510   ASSERT_EQ(callback.getEventInvocations(), 7);
1511
1512   close(pipeFds[0]);
1513 }
1514
1515 ///////////////////////////////////////////////////////////////////////////
1516 // Tests for latency calculations
1517 ///////////////////////////////////////////////////////////////////////////
1518
1519 class IdleTimeTimeoutSeries : public AsyncTimeout {
1520
1521  public:
1522
1523   explicit IdleTimeTimeoutSeries(EventBase *base,
1524                                  std::deque<std::uint64_t>& timeout) :
1525     AsyncTimeout(base),
1526     timeouts_(0),
1527     timeout_(timeout) {
1528       scheduleTimeout(1);
1529     }
1530
1531     ~IdleTimeTimeoutSeries() override {}
1532
1533     void timeoutExpired() noexcept override {
1534     ++timeouts_;
1535
1536     if(timeout_.empty()){
1537       cancelTimeout();
1538     } else {
1539       uint64_t sleepTime = timeout_.front();
1540       timeout_.pop_front();
1541       if (sleepTime) {
1542         usleep(sleepTime);
1543       }
1544       scheduleTimeout(1);
1545     }
1546   }
1547
1548   int getTimeouts() const {
1549     return timeouts_;
1550   }
1551
1552  private:
1553   int timeouts_;
1554   std::deque<uint64_t>& timeout_;
1555 };
1556
1557 /**
1558  * Verify that idle time is correctly accounted for when decaying our loop
1559  * time.
1560  *
1561  * This works by creating a high loop time (via usleep), expecting a latency
1562  * callback with known value, and then scheduling a timeout for later. This
1563  * later timeout is far enough in the future that the idle time should have
1564  * caused the loop time to decay.
1565  */
1566 TEST(EventBaseTest, IdleTime) {
1567   EventBase eventBase;
1568   eventBase.setLoadAvgMsec(1000ms);
1569   eventBase.resetLoadAvg(5900.0);
1570   std::deque<uint64_t> timeouts0(4, 8080);
1571   timeouts0.push_front(8000);
1572   timeouts0.push_back(14000);
1573   IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1574   std::deque<uint64_t> timeouts(20, 20);
1575   std::unique_ptr<IdleTimeTimeoutSeries> tos;
1576   int64_t testStart = duration_cast<microseconds>(
1577     std::chrono::steady_clock::now().time_since_epoch()).count();
1578   bool hostOverloaded = false;
1579
1580   int latencyCallbacks = 0;
1581   eventBase.setMaxLatency(6000, [&]() {
1582     ++latencyCallbacks;
1583     if (latencyCallbacks != 1) {
1584       FAIL() << "Unexpected latency callback";
1585     }
1586
1587     if (tos0.getTimeouts() < 6) {
1588       // This could only happen if the host this test is running
1589       // on is heavily loaded.
1590       int64_t maxLatencyReached = duration_cast<microseconds>(
1591           std::chrono::steady_clock::now().time_since_epoch()).count();
1592       ASSERT_LE(43800, maxLatencyReached - testStart);
1593       hostOverloaded = true;
1594       return;
1595     }
1596     ASSERT_EQ(6, tos0.getTimeouts());
1597     ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1598     ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1599     tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1600   });
1601
1602   // Kick things off with an "immedite" timeout
1603   tos0.scheduleTimeout(1);
1604
1605   eventBase.loop();
1606
1607   if (hostOverloaded) {
1608     return;
1609   }
1610
1611   ASSERT_EQ(1, latencyCallbacks);
1612   ASSERT_EQ(7, tos0.getTimeouts());
1613   ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1614   ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1615   ASSERT_TRUE(!!tos);
1616   ASSERT_EQ(21, tos->getTimeouts());
1617 }
1618
1619 /**
1620  * Test that thisLoop functionality works with terminateLoopSoon
1621  */
1622 TEST(EventBaseTest, ThisLoop) {
1623   EventBase eb;
1624   bool runInLoop = false;
1625   bool runThisLoop = false;
1626
1627   eb.runInLoop([&](){
1628       eb.terminateLoopSoon();
1629       eb.runInLoop([&]() {
1630           runInLoop = true;
1631         });
1632       eb.runInLoop([&]() {
1633           runThisLoop = true;
1634         }, true);
1635     }, true);
1636   eb.loopForever();
1637
1638   // Should not work
1639   ASSERT_FALSE(runInLoop);
1640   // Should work with thisLoop
1641   ASSERT_TRUE(runThisLoop);
1642 }
1643
1644 TEST(EventBaseTest, EventBaseThreadLoop) {
1645   EventBase base;
1646   bool ran = false;
1647
1648   base.runInEventBaseThread([&](){
1649     ran = true;
1650   });
1651   base.loop();
1652
1653   ASSERT_EQ(true, ran);
1654 }
1655
1656 TEST(EventBaseTest, EventBaseThreadName) {
1657   EventBase base;
1658   base.setName("foo");
1659   base.loop();
1660
1661 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1662   char name[16];
1663   pthread_getname_np(pthread_self(), name, 16);
1664   ASSERT_EQ(0, strcmp("foo", name));
1665 #endif
1666 }
1667
1668 TEST(EventBaseTest, RunBeforeLoop) {
1669   EventBase base;
1670   CountedLoopCallback cb(&base, 1, [&](){
1671     base.terminateLoopSoon();
1672   });
1673   base.runBeforeLoop(&cb);
1674   base.loopForever();
1675   ASSERT_EQ(cb.getCount(), 0);
1676 }
1677
1678 TEST(EventBaseTest, RunBeforeLoopWait) {
1679   EventBase base;
1680   CountedLoopCallback cb(&base, 1);
1681   base.tryRunAfterDelay([&](){
1682       base.terminateLoopSoon();
1683     }, 500);
1684   base.runBeforeLoop(&cb);
1685   base.loopForever();
1686
1687   // Check that we only ran once, and did not loop multiple times.
1688   ASSERT_EQ(cb.getCount(), 0);
1689 }
1690
1691 class PipeHandler : public EventHandler {
1692 public:
1693   PipeHandler(EventBase* eventBase, int fd)
1694     : EventHandler(eventBase, fd) {}
1695
1696   void handlerReady(uint16_t /* events */) noexcept override { abort(); }
1697 };
1698
1699 TEST(EventBaseTest, StopBeforeLoop) {
1700   EventBase evb;
1701
1702   // Give the evb something to do.
1703   int p[2];
1704   ASSERT_EQ(0, pipe(p));
1705   PipeHandler handler(&evb, p[0]);
1706   handler.registerHandler(EventHandler::READ);
1707
1708   // It's definitely not running yet
1709   evb.terminateLoopSoon();
1710
1711   // let it run, it should exit quickly.
1712   std::thread t([&] { evb.loop(); });
1713   t.join();
1714
1715   handler.unregisterHandler();
1716   close(p[0]);
1717   close(p[1]);
1718
1719   SUCCEED();
1720 }
1721
1722 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1723   bool ran = false;
1724
1725   {
1726     EventBase base;
1727     base.runInEventBaseThread([&](){
1728       ran = true;
1729     });
1730   }
1731
1732   ASSERT_TRUE(ran);
1733 }
1734
1735 TEST(EventBaseTest, LoopKeepAlive) {
1736   EventBase evb;
1737
1738   bool done = false;
1739   std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
1740     /* sleep override */ std::this_thread::sleep_for(
1741         std::chrono::milliseconds(100));
1742     evb.runInEventBaseThread(
1743         [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1744   });
1745
1746   evb.loop();
1747
1748   ASSERT_TRUE(done);
1749
1750   t.join();
1751 }
1752
1753 TEST(EventBaseTest, LoopKeepAliveInLoop) {
1754   EventBase evb;
1755
1756   bool done = false;
1757   std::thread t;
1758
1759   evb.runInEventBaseThread([&] {
1760     t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
1761       /* sleep override */ std::this_thread::sleep_for(
1762           std::chrono::milliseconds(100));
1763       evb.runInEventBaseThread(
1764           [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1765     });
1766   });
1767
1768   evb.loop();
1769
1770   ASSERT_TRUE(done);
1771
1772   t.join();
1773 }
1774
1775 TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
1776   std::unique_ptr<EventBase> evb = folly::make_unique<EventBase>();
1777
1778   bool done = false;
1779
1780   std::thread evThread([&] {
1781     evb->loopForever();
1782     evb.reset();
1783     done = true;
1784   });
1785
1786   {
1787     auto* ev = evb.get();
1788     EventBase::LoopKeepAlive keepAlive;
1789     ev->runInEventBaseThreadAndWait(
1790         [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); });
1791     ASSERT_FALSE(done) << "Loop finished before we asked it to";
1792     ev->terminateLoopSoon();
1793     /* sleep override */
1794     std::this_thread::sleep_for(std::chrono::milliseconds(30));
1795     ASSERT_FALSE(done) << "Loop terminated early";
1796     ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{});
1797   }
1798
1799   evThread.join();
1800   ASSERT_TRUE(done);
1801 }
1802
1803 TEST(EventBaseTest, LoopKeepAliveShutdown) {
1804   auto evb = folly::make_unique<EventBase>();
1805
1806   bool done = false;
1807
1808   std::thread t([
1809     &done,
1810     loopKeepAlive = evb->loopKeepAlive(),
1811     evbPtr = evb.get()
1812   ]() mutable {
1813     /* sleep override */ std::this_thread::sleep_for(
1814         std::chrono::milliseconds(100));
1815     evbPtr->runInEventBaseThread(
1816         [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1817   });
1818
1819   evb.reset();
1820
1821   ASSERT_TRUE(done);
1822
1823   t.join();
1824 }
1825
1826 TEST(EventBaseTest, LoopKeepAliveAtomic) {
1827   auto evb = folly::make_unique<EventBase>();
1828
1829   static constexpr size_t kNumThreads = 100;
1830   static constexpr size_t kNumTasks = 100;
1831
1832   std::vector<std::thread> ts;
1833   std::vector<std::unique_ptr<Baton<>>> batons;
1834   size_t done{0};
1835
1836   for (size_t i = 0; i < kNumThreads; ++i) {
1837     batons.emplace_back(std::make_unique<Baton<>>());
1838   }
1839
1840   for (size_t i = 0; i < kNumThreads; ++i) {
1841     ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
1842       std::vector<EventBase::LoopKeepAlive> keepAlives;
1843       for (size_t j = 0; j < kNumTasks; ++j) {
1844         keepAlives.emplace_back(evbPtr->loopKeepAliveAtomic());
1845       }
1846
1847       batonPtr->post();
1848
1849       /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
1850
1851       for (auto& keepAlive : keepAlives) {
1852         evbPtr->runInEventBaseThread(
1853             [&done, keepAlive = std::move(keepAlive) ]() { ++done; });
1854       }
1855     });
1856   }
1857
1858   for (auto& baton : batons) {
1859     baton->wait();
1860   }
1861
1862   evb.reset();
1863
1864   EXPECT_EQ(kNumThreads * kNumTasks, done);
1865
1866   for (auto& t : ts) {
1867     t.join();
1868   }
1869 }
1870
1871 TEST(EventBaseTest, DrivableExecutorTest) {
1872   folly::Promise<bool> p;
1873   auto f = p.getFuture();
1874   EventBase base;
1875   bool finished = false;
1876
1877   std::thread t([&] {
1878     /* sleep override */
1879     std::this_thread::sleep_for(std::chrono::microseconds(10));
1880     finished = true;
1881     base.runInEventBaseThread([&]() { p.setValue(true); });
1882   });
1883
1884   // Ensure drive does not busy wait
1885   base.drive(); // TODO: fix notification queue init() extra wakeup
1886   base.drive();
1887   EXPECT_TRUE(finished);
1888
1889   folly::Promise<bool> p2;
1890   auto f2 = p2.getFuture();
1891   // Ensure waitVia gets woken up properly, even from
1892   // a separate thread.
1893   base.runAfterDelay([&]() { p2.setValue(true); }, 10);
1894   f2.waitVia(&base);
1895   EXPECT_TRUE(f2.isReady());
1896
1897   t.join();
1898 }
1899
1900 TEST(EventBaseTest, RequestContextTest) {
1901   EventBase evb;
1902   auto defaultCtx = RequestContext::get();
1903
1904   {
1905     RequestContextScopeGuard rctx;
1906     auto context = RequestContext::get();
1907     EXPECT_NE(defaultCtx, context);
1908     evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
1909   }
1910
1911   EXPECT_EQ(defaultCtx, RequestContext::get());
1912   evb.loop();
1913   EXPECT_EQ(defaultCtx, RequestContext::get());
1914 }