Clean up runBeforeLoop API
[folly.git] / folly / io / async / test / EventBaseTest.cpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 #include <folly/io/async/AsyncTimeout.h>
20 #include <folly/io/async/EventBase.h>
21 #include <folly/io/async/EventHandler.h>
22 #include <folly/io/async/test/SocketPair.h>
23 #include <folly/io/async/test/Util.h>
24
25 #include <iostream>
26 #include <unistd.h>
27 #include <memory>
28
29 using std::deque;
30 using std::pair;
31 using std::vector;
32 using std::make_pair;
33 using std::cerr;
34 using std::endl;
35
36 using namespace folly;
37
38 ///////////////////////////////////////////////////////////////////////////
39 // Tests for read and write events
40 ///////////////////////////////////////////////////////////////////////////
41
42 enum { BUF_SIZE = 4096 };
43
44 ssize_t writeToFD(int fd, size_t length) {
45   // write an arbitrary amount of data to the fd
46   char buf[length];
47   memset(buf, 'a', sizeof(buf));
48   ssize_t rc = write(fd, buf, sizeof(buf));
49   CHECK_EQ(rc, length);
50   return rc;
51 }
52
53 size_t writeUntilFull(int fd) {
54   // Write to the fd until EAGAIN is returned
55   size_t bytesWritten = 0;
56   char buf[BUF_SIZE];
57   memset(buf, 'a', sizeof(buf));
58   while (true) {
59     ssize_t rc = write(fd, buf, sizeof(buf));
60     if (rc < 0) {
61       CHECK_EQ(errno, EAGAIN);
62       break;
63     } else {
64       bytesWritten += rc;
65     }
66   }
67   return bytesWritten;
68 }
69
70 ssize_t readFromFD(int fd, size_t length) {
71   // write an arbitrary amount of data to the fd
72   char buf[length];
73   return read(fd, buf, sizeof(buf));
74 }
75
76 size_t readUntilEmpty(int fd) {
77   // Read from the fd until EAGAIN is returned
78   char buf[BUF_SIZE];
79   size_t bytesRead = 0;
80   while (true) {
81     int rc = read(fd, buf, sizeof(buf));
82     if (rc == 0) {
83       CHECK(false) << "unexpected EOF";
84     } else if (rc < 0) {
85       CHECK_EQ(errno, EAGAIN);
86       break;
87     } else {
88       bytesRead += rc;
89     }
90   }
91   return bytesRead;
92 }
93
94 void checkReadUntilEmpty(int fd, size_t expectedLength) {
95   ASSERT_EQ(readUntilEmpty(fd), expectedLength);
96 }
97
98 struct ScheduledEvent {
99   int milliseconds;
100   uint16_t events;
101   size_t length;
102   ssize_t result;
103
104   void perform(int fd) {
105     if (events & EventHandler::READ) {
106       if (length == 0) {
107         result = readUntilEmpty(fd);
108       } else {
109         result = readFromFD(fd, length);
110       }
111     }
112     if (events & EventHandler::WRITE) {
113       if (length == 0) {
114         result = writeUntilFull(fd);
115       } else {
116         result = writeToFD(fd, length);
117       }
118     }
119   }
120 };
121
122 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
123   for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
124     eventBase->runAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
125                              ev->milliseconds);
126   }
127 }
128
129 class TestHandler : public EventHandler {
130  public:
131   TestHandler(EventBase* eventBase, int fd)
132     : EventHandler(eventBase, fd), fd_(fd) {}
133
134   virtual void handlerReady(uint16_t events) noexcept {
135     ssize_t bytesRead = 0;
136     ssize_t bytesWritten = 0;
137     if (events & READ) {
138       // Read all available data, so EventBase will stop calling us
139       // until new data becomes available
140       bytesRead = readUntilEmpty(fd_);
141     }
142     if (events & WRITE) {
143       // Write until the pipe buffer is full, so EventBase will stop calling
144       // us until the other end has read some data
145       bytesWritten = writeUntilFull(fd_);
146     }
147
148     log.push_back(EventRecord(events, bytesRead, bytesWritten));
149   }
150
151   struct EventRecord {
152     EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
153       : events(events)
154       , timestamp()
155       , bytesRead(bytesRead)
156       , bytesWritten(bytesWritten) {}
157
158     uint16_t events;
159     TimePoint timestamp;
160     ssize_t bytesRead;
161     ssize_t bytesWritten;
162   };
163
164   deque<EventRecord> log;
165
166  private:
167   int fd_;
168 };
169
170 /**
171  * Test a READ event
172  */
173 TEST(EventBaseTest, ReadEvent) {
174   EventBase eb;
175   SocketPair sp;
176
177   // Register for read events
178   TestHandler handler(&eb, sp[0]);
179   handler.registerHandler(EventHandler::READ);
180
181   // Register timeouts to perform two write events
182   ScheduledEvent events[] = {
183     { 10, EventHandler::WRITE, 2345 },
184     { 160, EventHandler::WRITE, 99 },
185     { 0, 0, 0 },
186   };
187   scheduleEvents(&eb, sp[1], events);
188
189   // Loop
190   TimePoint start;
191   eb.loop();
192   TimePoint end;
193
194   // Since we didn't use the EventHandler::PERSIST flag, the handler should
195   // have received the first read, then unregistered itself.  Check that only
196   // the first chunk of data was received.
197   ASSERT_EQ(handler.log.size(), 1);
198   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
199   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds, 90);
200   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
201   ASSERT_EQ(handler.log[0].bytesWritten, 0);
202   T_CHECK_TIMEOUT(start, end, events[1].milliseconds, 30);
203
204   // Make sure the second chunk of data is still waiting to be read.
205   size_t bytesRemaining = readUntilEmpty(sp[0]);
206   ASSERT_EQ(bytesRemaining, events[1].length);
207 }
208
209 /**
210  * Test (READ | PERSIST)
211  */
212 TEST(EventBaseTest, ReadPersist) {
213   EventBase eb;
214   SocketPair sp;
215
216   // Register for read events
217   TestHandler handler(&eb, sp[0]);
218   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
219
220   // Register several timeouts to perform writes
221   ScheduledEvent events[] = {
222     { 10,  EventHandler::WRITE, 1024 },
223     { 20,  EventHandler::WRITE, 2211 },
224     { 30,  EventHandler::WRITE, 4096 },
225     { 100, EventHandler::WRITE, 100 },
226     { 0, 0 },
227   };
228   scheduleEvents(&eb, sp[1], events);
229
230   // Schedule a timeout to unregister the handler after the third write
231   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
232
233   // Loop
234   TimePoint start;
235   eb.loop();
236   TimePoint end;
237
238   // The handler should have received the first 3 events,
239   // then been unregistered after that.
240   ASSERT_EQ(handler.log.size(), 3);
241   for (int n = 0; n < 3; ++n) {
242     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
243     T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[n].milliseconds);
244     ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
245     ASSERT_EQ(handler.log[n].bytesWritten, 0);
246   }
247   T_CHECK_TIMEOUT(start, end, events[3].milliseconds);
248
249   // Make sure the data from the last write is still waiting to be read
250   size_t bytesRemaining = readUntilEmpty(sp[0]);
251   ASSERT_EQ(bytesRemaining, events[3].length);
252 }
253
254 /**
255  * Test registering for READ when the socket is immediately readable
256  */
257 TEST(EventBaseTest, ReadImmediate) {
258   EventBase eb;
259   SocketPair sp;
260
261   // Write some data to the socket so the other end will
262   // be immediately readable
263   size_t dataLength = 1234;
264   writeToFD(sp[1], dataLength);
265
266   // Register for read events
267   TestHandler handler(&eb, sp[0]);
268   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
269
270   // Register a timeout to perform another write
271   ScheduledEvent events[] = {
272     { 10, EventHandler::WRITE, 2345 },
273     { 0, 0, 0 },
274   };
275   scheduleEvents(&eb, sp[1], events);
276
277   // Schedule a timeout to unregister the handler
278   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
279
280   // Loop
281   TimePoint start;
282   eb.loop();
283   TimePoint end;
284
285   ASSERT_EQ(handler.log.size(), 2);
286
287   // There should have been 1 event for immediate readability
288   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
289   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
290   ASSERT_EQ(handler.log[0].bytesRead, dataLength);
291   ASSERT_EQ(handler.log[0].bytesWritten, 0);
292
293   // There should be another event after the timeout wrote more data
294   ASSERT_EQ(handler.log[1].events, EventHandler::READ);
295   T_CHECK_TIMEOUT(start, handler.log[1].timestamp, events[0].milliseconds);
296   ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
297   ASSERT_EQ(handler.log[1].bytesWritten, 0);
298
299   T_CHECK_TIMEOUT(start, end, 20);
300 }
301
302 /**
303  * Test a WRITE event
304  */
305 TEST(EventBaseTest, WriteEvent) {
306   EventBase eb;
307   SocketPair sp;
308
309   // Fill up the write buffer before starting
310   size_t initialBytesWritten = writeUntilFull(sp[0]);
311
312   // Register for write events
313   TestHandler handler(&eb, sp[0]);
314   handler.registerHandler(EventHandler::WRITE);
315
316   // Register timeouts to perform two reads
317   ScheduledEvent events[] = {
318     { 10, EventHandler::READ, 0 },
319     { 60, EventHandler::READ, 0 },
320     { 0, 0, 0 },
321   };
322   scheduleEvents(&eb, sp[1], events);
323
324   // Loop
325   TimePoint start;
326   eb.loop();
327   TimePoint end;
328
329   // Since we didn't use the EventHandler::PERSIST flag, the handler should
330   // have only been able to write once, then unregistered itself.
331   ASSERT_EQ(handler.log.size(), 1);
332   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
333   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
334   ASSERT_EQ(handler.log[0].bytesRead, 0);
335   ASSERT_GT(handler.log[0].bytesWritten, 0);
336   T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
337
338   ASSERT_EQ(events[0].result, initialBytesWritten);
339   ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
340 }
341
342 /**
343  * Test (WRITE | PERSIST)
344  */
345 TEST(EventBaseTest, WritePersist) {
346   EventBase eb;
347   SocketPair sp;
348
349   // Fill up the write buffer before starting
350   size_t initialBytesWritten = writeUntilFull(sp[0]);
351
352   // Register for write events
353   TestHandler handler(&eb, sp[0]);
354   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
355
356   // Register several timeouts to read from the socket at several intervals
357   ScheduledEvent events[] = {
358     { 10,  EventHandler::READ, 0 },
359     { 40,  EventHandler::READ, 0 },
360     { 70,  EventHandler::READ, 0 },
361     { 100, EventHandler::READ, 0 },
362     { 0, 0 },
363   };
364   scheduleEvents(&eb, sp[1], events);
365
366   // Schedule a timeout to unregister the handler after the third read
367   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
368
369   // Loop
370   TimePoint start;
371   eb.loop();
372   TimePoint end;
373
374   // The handler should have received the first 3 events,
375   // then been unregistered after that.
376   ASSERT_EQ(handler.log.size(), 3);
377   ASSERT_EQ(events[0].result, initialBytesWritten);
378   for (int n = 0; n < 3; ++n) {
379     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
380     T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[n].milliseconds);
381     ASSERT_EQ(handler.log[n].bytesRead, 0);
382     ASSERT_GT(handler.log[n].bytesWritten, 0);
383     ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
384   }
385   T_CHECK_TIMEOUT(start, end, events[3].milliseconds);
386 }
387
388 /**
389  * Test registering for WRITE when the socket is immediately writable
390  */
391 TEST(EventBaseTest, WriteImmediate) {
392   EventBase eb;
393   SocketPair sp;
394
395   // Register for write events
396   TestHandler handler(&eb, sp[0]);
397   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
398
399   // Register a timeout to perform a read
400   ScheduledEvent events[] = {
401     { 10, EventHandler::READ, 0 },
402     { 0, 0, 0 },
403   };
404   scheduleEvents(&eb, sp[1], events);
405
406   // Schedule a timeout to unregister the handler
407   int64_t unregisterTimeout = 40;
408   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
409                    unregisterTimeout);
410
411   // Loop
412   TimePoint start;
413   eb.loop();
414   TimePoint end;
415
416   ASSERT_EQ(handler.log.size(), 2);
417
418   // Since the socket buffer was initially empty,
419   // there should have been 1 event for immediate writability
420   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
421   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
422   ASSERT_EQ(handler.log[0].bytesRead, 0);
423   ASSERT_GT(handler.log[0].bytesWritten, 0);
424
425   // There should be another event after the timeout wrote more data
426   ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
427   T_CHECK_TIMEOUT(start, handler.log[1].timestamp, events[0].milliseconds);
428   ASSERT_EQ(handler.log[1].bytesRead, 0);
429   ASSERT_GT(handler.log[1].bytesWritten, 0);
430
431   T_CHECK_TIMEOUT(start, end, unregisterTimeout);
432 }
433
434 /**
435  * Test (READ | WRITE) when the socket becomes readable first
436  */
437 TEST(EventBaseTest, ReadWrite) {
438   EventBase eb;
439   SocketPair sp;
440
441   // Fill up the write buffer before starting
442   size_t sock0WriteLength = writeUntilFull(sp[0]);
443
444   // Register for read and write events
445   TestHandler handler(&eb, sp[0]);
446   handler.registerHandler(EventHandler::READ_WRITE);
447
448   // Register timeouts to perform a write then a read.
449   ScheduledEvent events[] = {
450     { 10, EventHandler::WRITE, 2345 },
451     { 40, EventHandler::READ, 0 },
452     { 0, 0, 0 },
453   };
454   scheduleEvents(&eb, sp[1], events);
455
456   // Loop
457   TimePoint start;
458   eb.loop();
459   TimePoint end;
460
461   // Since we didn't use the EventHandler::PERSIST flag, the handler should
462   // have only noticed readability, then unregistered itself.  Check that only
463   // one event was logged.
464   ASSERT_EQ(handler.log.size(), 1);
465   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
466   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
467   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
468   ASSERT_EQ(handler.log[0].bytesWritten, 0);
469   ASSERT_EQ(events[1].result, sock0WriteLength);
470   T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
471 }
472
473 /**
474  * Test (READ | WRITE) when the socket becomes writable first
475  */
476 TEST(EventBaseTest, WriteRead) {
477   EventBase eb;
478   SocketPair sp;
479
480   // Fill up the write buffer before starting
481   size_t sock0WriteLength = writeUntilFull(sp[0]);
482
483   // Register for read and write events
484   TestHandler handler(&eb, sp[0]);
485   handler.registerHandler(EventHandler::READ_WRITE);
486
487   // Register timeouts to perform a read then a write.
488   size_t sock1WriteLength = 2345;
489   ScheduledEvent events[] = {
490     { 10, EventHandler::READ, 0 },
491     { 40, EventHandler::WRITE, sock1WriteLength },
492     { 0, 0, 0 },
493   };
494   scheduleEvents(&eb, sp[1], events);
495
496   // Loop
497   TimePoint start;
498   eb.loop();
499   TimePoint end;
500
501   // Since we didn't use the EventHandler::PERSIST flag, the handler should
502   // have only noticed writability, then unregistered itself.  Check that only
503   // one event was logged.
504   ASSERT_EQ(handler.log.size(), 1);
505   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
506   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
507   ASSERT_EQ(handler.log[0].bytesRead, 0);
508   ASSERT_GT(handler.log[0].bytesWritten, 0);
509   ASSERT_EQ(events[0].result, sock0WriteLength);
510   ASSERT_EQ(events[1].result, sock1WriteLength);
511   T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
512
513   // Make sure the written data is still waiting to be read.
514   size_t bytesRemaining = readUntilEmpty(sp[0]);
515   ASSERT_EQ(bytesRemaining, events[1].length);
516 }
517
518 /**
519  * Test (READ | WRITE) when the socket becomes readable and writable
520  * at the same time.
521  */
522 TEST(EventBaseTest, ReadWriteSimultaneous) {
523   EventBase eb;
524   SocketPair sp;
525
526   // Fill up the write buffer before starting
527   size_t sock0WriteLength = writeUntilFull(sp[0]);
528
529   // Register for read and write events
530   TestHandler handler(&eb, sp[0]);
531   handler.registerHandler(EventHandler::READ_WRITE);
532
533   // Register a timeout to perform a read and write together
534   ScheduledEvent events[] = {
535     { 10, EventHandler::READ | EventHandler::WRITE, 0 },
536     { 0, 0, 0 },
537   };
538   scheduleEvents(&eb, sp[1], events);
539
540   // Loop
541   TimePoint start;
542   eb.loop();
543   TimePoint end;
544
545   // It's not strictly required that the EventBase register us about both
546   // events in the same call.  So, it's possible that if the EventBase
547   // implementation changes this test could start failing, and it wouldn't be
548   // considered breaking the API.  However for now it's nice to exercise this
549   // code path.
550   ASSERT_EQ(handler.log.size(), 1);
551   ASSERT_EQ(handler.log[0].events,
552                     EventHandler::READ | EventHandler::WRITE);
553   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
554   ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
555   ASSERT_GT(handler.log[0].bytesWritten, 0);
556   T_CHECK_TIMEOUT(start, end, events[0].milliseconds);
557 }
558
559 /**
560  * Test (READ | WRITE | PERSIST)
561  */
562 TEST(EventBaseTest, ReadWritePersist) {
563   EventBase eb;
564   SocketPair sp;
565
566   // Register for read and write events
567   TestHandler handler(&eb, sp[0]);
568   handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
569                           EventHandler::PERSIST);
570
571   // Register timeouts to perform several reads and writes
572   ScheduledEvent events[] = {
573     { 10, EventHandler::WRITE, 2345 },
574     { 20, EventHandler::READ, 0 },
575     { 35, EventHandler::WRITE, 200 },
576     { 45, EventHandler::WRITE, 15 },
577     { 55, EventHandler::READ, 0 },
578     { 120, EventHandler::WRITE, 2345 },
579     { 0, 0, 0 },
580   };
581   scheduleEvents(&eb, sp[1], events);
582
583   // Schedule a timeout to unregister the handler
584   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
585
586   // Loop
587   TimePoint start;
588   eb.loop();
589   TimePoint end;
590
591   ASSERT_EQ(handler.log.size(), 6);
592
593   // Since we didn't fill up the write buffer immediately, there should
594   // be an immediate event for writability.
595   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
596   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
597   ASSERT_EQ(handler.log[0].bytesRead, 0);
598   ASSERT_GT(handler.log[0].bytesWritten, 0);
599
600   // Events 1 through 5 should correspond to the scheduled events
601   for (int n = 1; n < 6; ++n) {
602     ScheduledEvent* event = &events[n - 1];
603     T_CHECK_TIMEOUT(start, handler.log[n].timestamp, event->milliseconds);
604     if (event->events == EventHandler::READ) {
605       ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
606       ASSERT_EQ(handler.log[n].bytesRead, 0);
607       ASSERT_GT(handler.log[n].bytesWritten, 0);
608     } else {
609       ASSERT_EQ(handler.log[n].events, EventHandler::READ);
610       ASSERT_EQ(handler.log[n].bytesRead, event->length);
611       ASSERT_EQ(handler.log[n].bytesWritten, 0);
612     }
613   }
614
615   // The timeout should have unregistered the handler before the last write.
616   // Make sure that data is still waiting to be read
617   size_t bytesRemaining = readUntilEmpty(sp[0]);
618   ASSERT_EQ(bytesRemaining, events[5].length);
619 }
620
621
622 class PartialReadHandler : public TestHandler {
623  public:
624   PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
625     : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
626
627   virtual void handlerReady(uint16_t events) noexcept {
628     assert(events == EventHandler::READ);
629     ssize_t bytesRead = readFromFD(fd_, readLength_);
630     log.push_back(EventRecord(events, bytesRead, 0));
631   }
632
633  private:
634   int fd_;
635   size_t readLength_;
636 };
637
638 /**
639  * Test reading only part of the available data when a read event is fired.
640  * When PERSIST is used, make sure the handler gets notified again the next
641  * time around the loop.
642  */
643 TEST(EventBaseTest, ReadPartial) {
644   EventBase eb;
645   SocketPair sp;
646
647   // Register for read events
648   size_t readLength = 100;
649   PartialReadHandler handler(&eb, sp[0], readLength);
650   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
651
652   // Register a timeout to perform a single write,
653   // with more data than PartialReadHandler will read at once
654   ScheduledEvent events[] = {
655     { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) },
656     { 0, 0, 0 },
657   };
658   scheduleEvents(&eb, sp[1], events);
659
660   // Schedule a timeout to unregister the handler
661   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
662
663   // Loop
664   TimePoint start;
665   eb.loop();
666   TimePoint end;
667
668   ASSERT_EQ(handler.log.size(), 4);
669
670   // The first 3 invocations should read readLength bytes each
671   for (int n = 0; n < 3; ++n) {
672     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
673     T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[0].milliseconds);
674     ASSERT_EQ(handler.log[n].bytesRead, readLength);
675     ASSERT_EQ(handler.log[n].bytesWritten, 0);
676   }
677   // The last read only has readLength/2 bytes
678   ASSERT_EQ(handler.log[3].events, EventHandler::READ);
679   T_CHECK_TIMEOUT(start, handler.log[3].timestamp, events[0].milliseconds);
680   ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
681   ASSERT_EQ(handler.log[3].bytesWritten, 0);
682 }
683
684
685 class PartialWriteHandler : public TestHandler {
686  public:
687   PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
688     : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
689
690   virtual void handlerReady(uint16_t events) noexcept {
691     assert(events == EventHandler::WRITE);
692     ssize_t bytesWritten = writeToFD(fd_, writeLength_);
693     log.push_back(EventRecord(events, 0, bytesWritten));
694   }
695
696  private:
697   int fd_;
698   size_t writeLength_;
699 };
700
701 /**
702  * Test writing without completely filling up the write buffer when the fd
703  * becomes writable.  When PERSIST is used, make sure the handler gets
704  * notified again the next time around the loop.
705  */
706 TEST(EventBaseTest, WritePartial) {
707   EventBase eb;
708   SocketPair sp;
709
710   // Fill up the write buffer before starting
711   size_t initialBytesWritten = writeUntilFull(sp[0]);
712
713   // Register for write events
714   size_t writeLength = 100;
715   PartialWriteHandler handler(&eb, sp[0], writeLength);
716   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
717
718   // Register a timeout to read, so that more data can be written
719   ScheduledEvent events[] = {
720     { 10, EventHandler::READ, 0 },
721     { 0, 0, 0 },
722   };
723   scheduleEvents(&eb, sp[1], events);
724
725   // Schedule a timeout to unregister the handler
726   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
727
728   // Loop
729   TimePoint start;
730   eb.loop();
731   TimePoint end;
732
733   // Depending on how big the socket buffer is, there will be multiple writes
734   // Only check the first 5
735   int numChecked = 5;
736   ASSERT_GE(handler.log.size(), numChecked);
737   ASSERT_EQ(events[0].result, initialBytesWritten);
738
739   // The first 3 invocations should read writeLength bytes each
740   for (int n = 0; n < numChecked; ++n) {
741     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
742     T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[0].milliseconds);
743     ASSERT_EQ(handler.log[n].bytesRead, 0);
744     ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
745   }
746 }
747
748
749 /**
750  * Test destroying a registered EventHandler
751  */
752 TEST(EventBaseTest, DestroyHandler) {
753   class DestroyHandler : public TAsyncTimeout {
754    public:
755     DestroyHandler(EventBase* eb, EventHandler* h)
756       : TAsyncTimeout(eb)
757       , handler_(h) {}
758
759     virtual void timeoutExpired() noexcept {
760       delete handler_;
761     }
762
763    private:
764     EventHandler* handler_;
765   };
766
767   EventBase eb;
768   SocketPair sp;
769
770   // Fill up the write buffer before starting
771   size_t initialBytesWritten = writeUntilFull(sp[0]);
772
773   // Register for write events
774   TestHandler* handler = new TestHandler(&eb, sp[0]);
775   handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
776
777   // After 10ms, read some data, so that the handler
778   // will be notified that it can write.
779   eb.runAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
780                    10);
781
782   // Start a timer to destroy the handler after 25ms
783   // This mainly just makes sure the code doesn't break or assert
784   DestroyHandler dh(&eb, handler);
785   dh.scheduleTimeout(25);
786
787   TimePoint start;
788   eb.loop();
789   TimePoint end;
790
791   // Make sure the EventHandler was uninstalled properly when it was
792   // destroyed, and the EventBase loop exited
793   T_CHECK_TIMEOUT(start, end, 25);
794
795   // Make sure that the handler wrote data to the socket
796   // before it was destroyed
797   size_t bytesRemaining = readUntilEmpty(sp[1]);
798   ASSERT_GT(bytesRemaining, 0);
799 }
800
801
802 ///////////////////////////////////////////////////////////////////////////
803 // Tests for timeout events
804 ///////////////////////////////////////////////////////////////////////////
805
806 TEST(EventBaseTest, RunAfterDelay) {
807   EventBase eb;
808
809   TimePoint timestamp1(false);
810   TimePoint timestamp2(false);
811   TimePoint timestamp3(false);
812   eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
813   eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
814   eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
815
816   TimePoint start;
817   eb.loop();
818   TimePoint end;
819
820   T_CHECK_TIMEOUT(start, timestamp1, 10);
821   T_CHECK_TIMEOUT(start, timestamp2, 20);
822   T_CHECK_TIMEOUT(start, timestamp3, 40);
823   T_CHECK_TIMEOUT(start, end, 40);
824 }
825
826 /**
827  * Test the behavior of runAfterDelay() when some timeouts are
828  * still scheduled when the EventBase is destroyed.
829  */
830 TEST(EventBaseTest, RunAfterDelayDestruction) {
831   TimePoint timestamp1(false);
832   TimePoint timestamp2(false);
833   TimePoint timestamp3(false);
834   TimePoint timestamp4(false);
835   TimePoint start(false);
836   TimePoint end(false);
837
838   {
839     EventBase eb;
840
841     // Run two normal timeouts
842     eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
843     eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
844
845     // Schedule a timeout to stop the event loop after 40ms
846     eb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
847
848     // Schedule 2 timeouts that would fire after the event loop stops
849     eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
850     eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
851
852     start.reset();
853     eb.loop();
854     end.reset();
855   }
856
857   T_CHECK_TIMEOUT(start, timestamp1, 10);
858   T_CHECK_TIMEOUT(start, timestamp2, 20);
859   T_CHECK_TIMEOUT(start, end, 40);
860
861   ASSERT_TRUE(timestamp3.isUnset());
862   ASSERT_TRUE(timestamp4.isUnset());
863
864   // Ideally this test should be run under valgrind to ensure that no
865   // memory is leaked.
866 }
867
868 class TestTimeout : public TAsyncTimeout {
869  public:
870   explicit TestTimeout(EventBase* eventBase)
871     : TAsyncTimeout(eventBase)
872     , timestamp(false) {}
873
874   virtual void timeoutExpired() noexcept {
875     timestamp.reset();
876   }
877
878   TimePoint timestamp;
879 };
880
881 TEST(EventBaseTest, BasicTimeouts) {
882   EventBase eb;
883
884   TestTimeout t1(&eb);
885   TestTimeout t2(&eb);
886   TestTimeout t3(&eb);
887   t1.scheduleTimeout(10);
888   t2.scheduleTimeout(20);
889   t3.scheduleTimeout(40);
890
891   TimePoint start;
892   eb.loop();
893   TimePoint end;
894
895   T_CHECK_TIMEOUT(start, t1.timestamp, 10);
896   T_CHECK_TIMEOUT(start, t2.timestamp, 20);
897   T_CHECK_TIMEOUT(start, t3.timestamp, 40);
898   T_CHECK_TIMEOUT(start, end, 40);
899 }
900
901 class ReschedulingTimeout : public TAsyncTimeout {
902  public:
903   ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
904     : TAsyncTimeout(evb)
905     , timeouts_(timeouts)
906     , iterator_(timeouts_.begin()) {}
907
908   void start() {
909     reschedule();
910   }
911
912   virtual void timeoutExpired() noexcept {
913     timestamps.push_back(TimePoint());
914     reschedule();
915   }
916
917   void reschedule() {
918     if (iterator_ != timeouts_.end()) {
919       uint32_t timeout = *iterator_;
920       ++iterator_;
921       scheduleTimeout(timeout);
922     }
923   }
924
925   vector<TimePoint> timestamps;
926
927  private:
928   vector<uint32_t> timeouts_;
929   vector<uint32_t>::const_iterator iterator_;
930 };
931
932 /**
933  * Test rescheduling the same timeout multiple times
934  */
935 TEST(EventBaseTest, ReuseTimeout) {
936   EventBase eb;
937
938   vector<uint32_t> timeouts;
939   timeouts.push_back(10);
940   timeouts.push_back(30);
941   timeouts.push_back(15);
942
943   ReschedulingTimeout t(&eb, timeouts);
944   t.start();
945
946   TimePoint start;
947   eb.loop();
948   TimePoint end;
949
950   // Use a higher tolerance than usual.  We're waiting on 3 timeouts
951   // consecutively.  In general, each timeout may go over by a few
952   // milliseconds, and we're tripling this error by witing on 3 timeouts.
953   int64_t tolerance = 6;
954
955   ASSERT_EQ(timeouts.size(), t.timestamps.size());
956   uint32_t total = 0;
957   for (int n = 0; n < timeouts.size(); ++n) {
958     total += timeouts[n];
959     T_CHECK_TIMEOUT(start, t.timestamps[n], total, tolerance);
960   }
961   T_CHECK_TIMEOUT(start, end, total, tolerance);
962 }
963
964 /**
965  * Test rescheduling a timeout before it has fired
966  */
967 TEST(EventBaseTest, RescheduleTimeout) {
968   EventBase eb;
969
970   TestTimeout t1(&eb);
971   TestTimeout t2(&eb);
972   TestTimeout t3(&eb);
973
974   t1.scheduleTimeout(15);
975   t2.scheduleTimeout(30);
976   t3.scheduleTimeout(30);
977
978   auto f = static_cast<bool(TAsyncTimeout::*)(uint32_t)>(
979       &TAsyncTimeout::scheduleTimeout);
980
981   // after 10ms, reschedule t2 to run sooner than originally scheduled
982   eb.runAfterDelay(std::bind(f, &t2, 10), 10);
983   // after 10ms, reschedule t3 to run later than originally scheduled
984   eb.runAfterDelay(std::bind(f, &t3, 40), 10);
985
986   TimePoint start;
987   eb.loop();
988   TimePoint end;
989
990   T_CHECK_TIMEOUT(start, t1.timestamp, 15);
991   T_CHECK_TIMEOUT(start, t2.timestamp, 20);
992   T_CHECK_TIMEOUT(start, t3.timestamp, 50);
993   T_CHECK_TIMEOUT(start, end, 50);
994 }
995
996 /**
997  * Test cancelling a timeout
998  */
999 TEST(EventBaseTest, CancelTimeout) {
1000   EventBase eb;
1001
1002   vector<uint32_t> timeouts;
1003   timeouts.push_back(10);
1004   timeouts.push_back(30);
1005   timeouts.push_back(25);
1006
1007   ReschedulingTimeout t(&eb, timeouts);
1008   t.start();
1009   eb.runAfterDelay(std::bind(&TAsyncTimeout::cancelTimeout, &t), 50);
1010
1011   TimePoint start;
1012   eb.loop();
1013   TimePoint end;
1014
1015   ASSERT_EQ(t.timestamps.size(), 2);
1016   T_CHECK_TIMEOUT(start, t.timestamps[0], 10);
1017   T_CHECK_TIMEOUT(start, t.timestamps[1], 40);
1018   T_CHECK_TIMEOUT(start, end, 50);
1019 }
1020
1021 /**
1022  * Test destroying a scheduled timeout object
1023  */
1024 TEST(EventBaseTest, DestroyTimeout) {
1025   class DestroyTimeout : public TAsyncTimeout {
1026    public:
1027     DestroyTimeout(EventBase* eb, TAsyncTimeout* t)
1028       : TAsyncTimeout(eb)
1029       , timeout_(t) {}
1030
1031     virtual void timeoutExpired() noexcept {
1032       delete timeout_;
1033     }
1034
1035    private:
1036     TAsyncTimeout* timeout_;
1037   };
1038
1039   EventBase eb;
1040
1041   TestTimeout* t1 = new TestTimeout(&eb);
1042   t1->scheduleTimeout(30);
1043
1044   DestroyTimeout dt(&eb, t1);
1045   dt.scheduleTimeout(10);
1046
1047   TimePoint start;
1048   eb.loop();
1049   TimePoint end;
1050
1051   T_CHECK_TIMEOUT(start, end, 10);
1052 }
1053
1054
1055 ///////////////////////////////////////////////////////////////////////////
1056 // Test for runInThreadTestFunc()
1057 ///////////////////////////////////////////////////////////////////////////
1058
1059 struct RunInThreadData {
1060   RunInThreadData(int numThreads, int opsPerThread)
1061     : opsPerThread(opsPerThread)
1062     , opsToGo(numThreads*opsPerThread) {}
1063
1064   EventBase evb;
1065   deque< pair<int, int> > values;
1066
1067   int opsPerThread;
1068   int opsToGo;
1069 };
1070
1071 struct RunInThreadArg {
1072   RunInThreadArg(RunInThreadData* data,
1073                  int threadId,
1074                  int value)
1075     : data(data)
1076     , thread(threadId)
1077     , value(value) {}
1078
1079   RunInThreadData* data;
1080   int thread;
1081   int value;
1082 };
1083
1084 void runInThreadTestFunc(RunInThreadArg* arg) {
1085   arg->data->values.push_back(make_pair(arg->thread, arg->value));
1086   RunInThreadData* data = arg->data;
1087   delete arg;
1088
1089   if(--data->opsToGo == 0) {
1090     // Break out of the event base loop if we are the last thread running
1091     data->evb.terminateLoopSoon();
1092   }
1093 }
1094
1095 class RunInThreadTester : public concurrency::Runnable {
1096  public:
1097   RunInThreadTester(int id, RunInThreadData* data) : id_(id), data_(data) {}
1098
1099   void run() {
1100     // Call evb->runInThread() a number of times
1101     {
1102       for (int n = 0; n < data_->opsPerThread; ++n) {
1103         RunInThreadArg* arg = new RunInThreadArg(data_, id_, n);
1104         data_->evb.runInEventBaseThread(runInThreadTestFunc, arg);
1105         usleep(10);
1106       }
1107     }
1108   }
1109
1110  private:
1111   int id_;
1112   RunInThreadData* data_;
1113 };
1114
1115 TEST(EventBaseTest, RunInThread) {
1116   uint32_t numThreads = 50;
1117   uint32_t opsPerThread = 100;
1118   RunInThreadData data(numThreads, opsPerThread);
1119
1120   PosixThreadFactory threadFactory;
1121   threadFactory.setDetached(false);
1122   deque< std::shared_ptr<Thread> > threads;
1123   for (int n = 0; n < numThreads; ++n) {
1124     std::shared_ptr<RunInThreadTester> runner(new RunInThreadTester(n, &data));
1125     std::shared_ptr<Thread> thread = threadFactory.newThread(runner);
1126     threads.push_back(thread);
1127     thread->start();
1128   }
1129
1130   // Add a timeout event to run after 3 seconds.
1131   // Otherwise loop() will return immediately since there are no events to run.
1132   // Once the last thread exits, it will stop the loop().  However, this
1133   // timeout also stops the loop in case there is a bug performing the normal
1134   // stop.
1135   data.evb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1136                          3000);
1137
1138   TimePoint start;
1139   data.evb.loop();
1140   TimePoint end;
1141
1142   // Verify that the loop exited because all threads finished and requested it
1143   // to stop.  This should happen much sooner than the 3 second timeout.
1144   // Assert that it happens in under a second.  (This is still tons of extra
1145   // padding.)
1146   int64_t timeTaken = end.getTime() - start.getTime();
1147   ASSERT_LT(timeTaken, 1000);
1148   VLOG(11) << "Time taken: " << timeTaken;
1149
1150   // Verify that we have all of the events from every thread
1151   int expectedValues[numThreads];
1152   for (int n = 0; n < numThreads; ++n) {
1153     expectedValues[n] = 0;
1154   }
1155   for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1156        it != data.values.end();
1157        ++it) {
1158     int threadID = it->first;
1159     int value = it->second;
1160     ASSERT_EQ(expectedValues[threadID], value);
1161     ++expectedValues[threadID];
1162   }
1163   for (int n = 0; n < numThreads; ++n) {
1164     ASSERT_EQ(expectedValues[n], opsPerThread);
1165   }
1166
1167   // Wait on all of the threads.  Otherwise we can exit and clean up
1168   // RunInThreadData before the last thread exits, while it is still holding
1169   // the RunInThreadData's mutex.
1170   for (deque< std::shared_ptr<Thread> >::const_iterator it = threads.begin();
1171        it != threads.end();
1172        ++it) {
1173     (*it)->join();
1174   }
1175 }
1176
1177 ///////////////////////////////////////////////////////////////////////////
1178 // Tests for runInLoop()
1179 ///////////////////////////////////////////////////////////////////////////
1180
1181 class CountedLoopCallback : public EventBase::LoopCallback {
1182  public:
1183   CountedLoopCallback(EventBase* eventBase,
1184                       unsigned int count,
1185                       std::function<void()> action =
1186                         std::function<void()>())
1187     : eventBase_(eventBase)
1188     , count_(count)
1189     , action_(action) {}
1190
1191   virtual void runLoopCallback() noexcept {
1192     --count_;
1193     if (count_ > 0) {
1194       eventBase_->runInLoop(this);
1195     } else if (action_) {
1196       action_();
1197     }
1198   }
1199
1200   unsigned int getCount() const {
1201     return count_;
1202   }
1203
1204  private:
1205   EventBase* eventBase_;
1206   unsigned int count_;
1207   std::function<void()> action_;
1208 };
1209
1210 // Test that EventBase::loop() doesn't exit while there are
1211 // still LoopCallbacks remaining to be invoked.
1212 TEST(EventBaseTest, RepeatedRunInLoop) {
1213   EventBase eventBase;
1214
1215   CountedLoopCallback c(&eventBase, 10);
1216   eventBase.runInLoop(&c);
1217   // The callback shouldn't have run immediately
1218   ASSERT_EQ(c.getCount(), 10);
1219   eventBase.loop();
1220
1221   // loop() should loop until the CountedLoopCallback stops
1222   // re-installing itself.
1223   ASSERT_EQ(c.getCount(), 0);
1224 }
1225
1226 // Test runInLoop() calls with terminateLoopSoon()
1227 TEST(EventBaseTest, RunInLoopStopLoop) {
1228   EventBase eventBase;
1229
1230   CountedLoopCallback c1(&eventBase, 20);
1231   CountedLoopCallback c2(&eventBase, 10,
1232                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1233
1234   eventBase.runInLoop(&c1);
1235   eventBase.runInLoop(&c2);
1236   ASSERT_EQ(c1.getCount(), 20);
1237   ASSERT_EQ(c2.getCount(), 10);
1238
1239   eventBase.loopForever();
1240
1241   // c2 should have stopped the loop after 10 iterations
1242   ASSERT_EQ(c2.getCount(), 0);
1243
1244   // We allow the EventBase to run the loop callbacks in whatever order it
1245   // chooses.  We'll accept c1's count being either 10 (if the loop terminated
1246   // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1247   // before c1 ran).
1248   //
1249   // (With the current code, c1 will always run 10 times, but we don't consider
1250   // this a hard API requirement.)
1251   ASSERT_GE(c1.getCount(), 10);
1252   ASSERT_LE(c1.getCount(), 11);
1253 }
1254
1255 // Test cancelling runInLoop() callbacks
1256 TEST(EventBaseTest, CancelRunInLoop) {
1257   EventBase eventBase;
1258
1259   CountedLoopCallback c1(&eventBase, 20);
1260   CountedLoopCallback c2(&eventBase, 20);
1261   CountedLoopCallback c3(&eventBase, 20);
1262
1263   std::function<void()> cancelC1Action =
1264     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1265   std::function<void()> cancelC2Action =
1266     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1267
1268   CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1269   CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1270
1271   // Install cancelC1 after c1
1272   eventBase.runInLoop(&c1);
1273   eventBase.runInLoop(&cancelC1);
1274
1275   // Install cancelC2 before c2
1276   eventBase.runInLoop(&cancelC2);
1277   eventBase.runInLoop(&c2);
1278
1279   // Install c3
1280   eventBase.runInLoop(&c3);
1281
1282   ASSERT_EQ(c1.getCount(), 20);
1283   ASSERT_EQ(c2.getCount(), 20);
1284   ASSERT_EQ(c3.getCount(), 20);
1285   ASSERT_EQ(cancelC1.getCount(), 10);
1286   ASSERT_EQ(cancelC2.getCount(), 10);
1287
1288   // Run the loop
1289   eventBase.loop();
1290
1291   // cancelC1 and cancelC3 should have both fired after 10 iterations and
1292   // stopped re-installing themselves
1293   ASSERT_EQ(cancelC1.getCount(), 0);
1294   ASSERT_EQ(cancelC2.getCount(), 0);
1295   // c3 should have continued on for the full 20 iterations
1296   ASSERT_EQ(c3.getCount(), 0);
1297
1298   // c1 and c2 should have both been cancelled on the 10th iteration.
1299   //
1300   // Callbacks are always run in the order they are installed,
1301   // so c1 should have fired 10 times, and been canceled after it ran on the
1302   // 10th iteration.  c2 should have only fired 9 times, because cancelC2 will
1303   // have run before it on the 10th iteration, and cancelled it before it
1304   // fired.
1305   ASSERT_EQ(c1.getCount(), 10);
1306   ASSERT_EQ(c2.getCount(), 11);
1307 }
1308
1309 class TerminateTestCallback : public EventBase::LoopCallback,
1310                               public EventHandler {
1311  public:
1312   TerminateTestCallback(EventBase* eventBase, int fd)
1313     : EventHandler(eventBase, fd),
1314       eventBase_(eventBase),
1315       loopInvocations_(0),
1316       maxLoopInvocations_(0),
1317       eventInvocations_(0),
1318       maxEventInvocations_(0) {}
1319
1320   void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1321     loopInvocations_ = 0;
1322     maxLoopInvocations_ = maxLoopInvocations;
1323     eventInvocations_ = 0;
1324     maxEventInvocations_ = maxEventInvocations;
1325
1326     cancelLoopCallback();
1327     unregisterHandler();
1328   }
1329
1330   virtual void handlerReady(uint16_t events) noexcept {
1331     // We didn't register with PERSIST, so we will have been automatically
1332     // unregistered already.
1333     ASSERT_FALSE(isHandlerRegistered());
1334
1335     ++eventInvocations_;
1336     if (eventInvocations_ >= maxEventInvocations_) {
1337       return;
1338     }
1339
1340     eventBase_->runInLoop(this);
1341   }
1342   virtual void runLoopCallback() noexcept {
1343     ++loopInvocations_;
1344     if (loopInvocations_ >= maxLoopInvocations_) {
1345       return;
1346     }
1347
1348     registerHandler(READ);
1349   }
1350
1351   uint32_t getLoopInvocations() const {
1352     return loopInvocations_;
1353   }
1354   uint32_t getEventInvocations() const {
1355     return eventInvocations_;
1356   }
1357
1358  private:
1359   EventBase* eventBase_;
1360   uint32_t loopInvocations_;
1361   uint32_t maxLoopInvocations_;
1362   uint32_t eventInvocations_;
1363   uint32_t maxEventInvocations_;
1364 };
1365
1366 /**
1367  * Test that EventBase::loop() correctly detects when there are no more events
1368  * left to run.
1369  *
1370  * This uses a single callback, which alternates registering itself as a loop
1371  * callback versus a EventHandler callback.  This exercises a regression where
1372  * EventBase::loop() incorrectly exited if there were no more fd handlers
1373  * registered, but a loop callback installed a new fd handler.
1374  */
1375 TEST(EventBaseTest, LoopTermination) {
1376   EventBase eventBase;
1377
1378   // Open a pipe and close the write end,
1379   // so the read endpoint will be readable
1380   int pipeFds[2];
1381   int rc = pipe(pipeFds);
1382   ASSERT_EQ(rc, 0);
1383   close(pipeFds[1]);
1384   TerminateTestCallback callback(&eventBase, pipeFds[0]);
1385
1386   // Test once where the callback will exit after a loop callback
1387   callback.reset(10, 100);
1388   eventBase.runInLoop(&callback);
1389   eventBase.loop();
1390   ASSERT_EQ(callback.getLoopInvocations(), 10);
1391   ASSERT_EQ(callback.getEventInvocations(), 9);
1392
1393   // Test once where the callback will exit after an fd event callback
1394   callback.reset(100, 7);
1395   eventBase.runInLoop(&callback);
1396   eventBase.loop();
1397   ASSERT_EQ(callback.getLoopInvocations(), 7);
1398   ASSERT_EQ(callback.getEventInvocations(), 7);
1399
1400   close(pipeFds[0]);
1401 }
1402
1403 ///////////////////////////////////////////////////////////////////////////
1404 // Tests for latency calculations
1405 ///////////////////////////////////////////////////////////////////////////
1406
1407 class IdleTimeTimeoutSeries : public TAsyncTimeout {
1408
1409  public:
1410
1411   explicit IdleTimeTimeoutSeries(EventBase *base,
1412                                  std::deque<std::uint64_t>& timeout) :
1413     TAsyncTimeout(base),
1414     timeouts_(0),
1415     timeout_(timeout) {
1416       scheduleTimeout(1);
1417     }
1418
1419   virtual ~IdleTimeTimeoutSeries() {}
1420
1421   void timeoutExpired() noexcept {
1422     ++timeouts_;
1423
1424     if(timeout_.empty()){
1425       cancelTimeout();
1426     } else {
1427       uint64_t sleepTime = timeout_.front();
1428       timeout_.pop_front();
1429       if (sleepTime) {
1430         usleep(sleepTime);
1431       }
1432       scheduleTimeout(1);
1433     }
1434   }
1435
1436   int getTimeouts() const {
1437     return timeouts_;
1438   }
1439
1440  private:
1441   int timeouts_;
1442   std::deque<uint64_t>& timeout_;
1443 };
1444
1445 /**
1446  * Verify that idle time is correctly accounted for when decaying our loop
1447  * time.
1448  *
1449  * This works by creating a high loop time (via usleep), expecting a latency
1450  * callback with known value, and then scheduling a timeout for later. This
1451  * later timeout is far enough in the future that the idle time should have
1452  * caused the loop time to decay.
1453  */
1454 TEST(EventBaseTest, IdleTime) {
1455   EventBase eventBase;
1456   eventBase.setLoadAvgMsec(1000);
1457   eventBase.resetLoadAvg(5900.0);
1458   std::deque<uint64_t> timeouts0(4, 8080);
1459   timeouts0.push_front(8000);
1460   timeouts0.push_back(14000);
1461   IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1462   std::deque<uint64_t> timeouts(20, 20);
1463   std::unique_ptr<IdleTimeTimeoutSeries> tos;
1464
1465   int latencyCallbacks = 0;
1466   eventBase.setMaxLatency(6000, [&]() {
1467     ++latencyCallbacks;
1468
1469     switch (latencyCallbacks) {
1470     case 1:
1471       ASSERT_EQ(6, tos0.getTimeouts());
1472       ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1473       ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1474       tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1475       break;
1476
1477     default:
1478       FAIL() << "Unexpected latency callback";
1479       break;
1480     }
1481   });
1482
1483   // Kick things off with an "immedite" timeout
1484   tos0.scheduleTimeout(1);
1485
1486   eventBase.loop();
1487
1488   ASSERT_EQ(1, latencyCallbacks);
1489   ASSERT_EQ(7, tos0.getTimeouts());
1490   ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1491   ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1492   ASSERT_EQ(21, tos->getTimeouts());
1493 }
1494
1495 /**
1496  * Test that thisLoop functionality works with terminateLoopSoon
1497  */
1498 TEST(EventBaseTest, ThisLoop) {
1499   EventBase eb;
1500   bool runInLoop = false;
1501   bool runThisLoop = false;
1502
1503   eb.runInLoop([&](){
1504       eb.terminateLoopSoon();
1505       eb.runInLoop([&]() {
1506           runInLoop = true;
1507         });
1508       eb.runInLoop([&]() {
1509           runThisLoop = true;
1510         }, true);
1511     }, true);
1512   eb.loopForever();
1513
1514   // Should not work
1515   ASSERT_FALSE(runInLoop);
1516   // Should work with thisLoop
1517   ASSERT_TRUE(runThisLoop);
1518 }
1519
1520 TEST(EventBaseTest, EventBaseThreadLoop) {
1521   EventBase base;
1522   bool ran = false;
1523
1524   base.runInEventBaseThread([&](){
1525     ran = true;
1526   });
1527   base.loop();
1528
1529   ASSERT_EQ(true, ran);
1530 }
1531
1532 TEST(EventBaseTest, EventBaseThreadName) {
1533   EventBase base;
1534   base.setName("foo");
1535   base.loop();
1536
1537 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1538   char name[16];
1539   pthread_getname_np(pthread_self(), name, 16);
1540   ASSERT_EQ(0, strcmp("foo", name));
1541 #endif
1542 }
1543
1544 TEST(TEventBaseTest, RunBeforeLoop) {
1545   TEventBase base;
1546   CountedLoopCallback cb(&base, 1, [&](){
1547     base.terminateLoopSoon();
1548   });
1549   base.runBeforeLoop(&cb);
1550   base.loopForever();
1551   ASSERT_EQUAL(cb.getCount(), 0);
1552 }
1553
1554 TEST(TEventBaseTest, RunBeforeLoopWait) {
1555   TEventBase base;
1556   CountedLoopCallback cb(&base, 1);
1557   base.runAfterDelay([&](){
1558       base.terminateLoopSoon();
1559     }, 500);
1560   base.runBeforeLoop(&cb);
1561   base.loopForever();
1562
1563   // Check that we only ran once, and did not loop multiple times.
1564   ASSERT_EQUAL(cb.getCount(), 0);
1565 }