Run the HHWheelTimer and EventBase tests
[folly.git] / folly / io / async / test / EventBaseTest.cpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 #include <folly/io/async/AsyncTimeout.h>
20 #include <folly/io/async/EventBase.h>
21 #include <folly/io/async/EventHandler.h>
22 #include <folly/io/async/test/SocketPair.h>
23 #include <folly/io/async/test/Util.h>
24
25 #include <iostream>
26 #include <unistd.h>
27 #include <memory>
28 #include <thread>
29
30 using std::deque;
31 using std::pair;
32 using std::vector;
33 using std::make_pair;
34 using std::cerr;
35 using std::endl;
36 using std::chrono::milliseconds;
37
38 using namespace folly;
39
40 ///////////////////////////////////////////////////////////////////////////
41 // Tests for read and write events
42 ///////////////////////////////////////////////////////////////////////////
43
44 enum { BUF_SIZE = 4096 };
45
46 ssize_t writeToFD(int fd, size_t length) {
47   // write an arbitrary amount of data to the fd
48   char buf[length];
49   memset(buf, 'a', sizeof(buf));
50   ssize_t rc = write(fd, buf, sizeof(buf));
51   CHECK_EQ(rc, length);
52   return rc;
53 }
54
55 size_t writeUntilFull(int fd) {
56   // Write to the fd until EAGAIN is returned
57   size_t bytesWritten = 0;
58   char buf[BUF_SIZE];
59   memset(buf, 'a', sizeof(buf));
60   while (true) {
61     ssize_t rc = write(fd, buf, sizeof(buf));
62     if (rc < 0) {
63       CHECK_EQ(errno, EAGAIN);
64       break;
65     } else {
66       bytesWritten += rc;
67     }
68   }
69   return bytesWritten;
70 }
71
72 ssize_t readFromFD(int fd, size_t length) {
73   // write an arbitrary amount of data to the fd
74   char buf[length];
75   return read(fd, buf, sizeof(buf));
76 }
77
78 size_t readUntilEmpty(int fd) {
79   // Read from the fd until EAGAIN is returned
80   char buf[BUF_SIZE];
81   size_t bytesRead = 0;
82   while (true) {
83     int rc = read(fd, buf, sizeof(buf));
84     if (rc == 0) {
85       CHECK(false) << "unexpected EOF";
86     } else if (rc < 0) {
87       CHECK_EQ(errno, EAGAIN);
88       break;
89     } else {
90       bytesRead += rc;
91     }
92   }
93   return bytesRead;
94 }
95
96 void checkReadUntilEmpty(int fd, size_t expectedLength) {
97   ASSERT_EQ(readUntilEmpty(fd), expectedLength);
98 }
99
100 struct ScheduledEvent {
101   int milliseconds;
102   uint16_t events;
103   size_t length;
104   ssize_t result;
105
106   void perform(int fd) {
107     if (events & EventHandler::READ) {
108       if (length == 0) {
109         result = readUntilEmpty(fd);
110       } else {
111         result = readFromFD(fd, length);
112       }
113     }
114     if (events & EventHandler::WRITE) {
115       if (length == 0) {
116         result = writeUntilFull(fd);
117       } else {
118         result = writeToFD(fd, length);
119       }
120     }
121   }
122 };
123
124 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
125   for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
126     eventBase->runAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
127                              ev->milliseconds);
128   }
129 }
130
131 class TestHandler : public EventHandler {
132  public:
133   TestHandler(EventBase* eventBase, int fd)
134     : EventHandler(eventBase, fd), fd_(fd) {}
135
136   virtual void handlerReady(uint16_t events) noexcept {
137     ssize_t bytesRead = 0;
138     ssize_t bytesWritten = 0;
139     if (events & READ) {
140       // Read all available data, so EventBase will stop calling us
141       // until new data becomes available
142       bytesRead = readUntilEmpty(fd_);
143     }
144     if (events & WRITE) {
145       // Write until the pipe buffer is full, so EventBase will stop calling
146       // us until the other end has read some data
147       bytesWritten = writeUntilFull(fd_);
148     }
149
150     log.push_back(EventRecord(events, bytesRead, bytesWritten));
151   }
152
153   struct EventRecord {
154     EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
155       : events(events)
156       , timestamp()
157       , bytesRead(bytesRead)
158       , bytesWritten(bytesWritten) {}
159
160     uint16_t events;
161     TimePoint timestamp;
162     ssize_t bytesRead;
163     ssize_t bytesWritten;
164   };
165
166   deque<EventRecord> log;
167
168  private:
169   int fd_;
170 };
171
172 /**
173  * Test a READ event
174  */
175 TEST(EventBaseTest, ReadEvent) {
176   EventBase eb;
177   SocketPair sp;
178
179   // Register for read events
180   TestHandler handler(&eb, sp[0]);
181   handler.registerHandler(EventHandler::READ);
182
183   // Register timeouts to perform two write events
184   ScheduledEvent events[] = {
185     { 10, EventHandler::WRITE, 2345 },
186     { 160, EventHandler::WRITE, 99 },
187     { 0, 0, 0 },
188   };
189   scheduleEvents(&eb, sp[1], events);
190
191   // Loop
192   TimePoint start;
193   eb.loop();
194   TimePoint end;
195
196   // Since we didn't use the EventHandler::PERSIST flag, the handler should
197   // have received the first read, then unregistered itself.  Check that only
198   // the first chunk of data was received.
199   ASSERT_EQ(handler.log.size(), 1);
200   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
201   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
202                   milliseconds(events[0].milliseconds), milliseconds(90));
203   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
204   ASSERT_EQ(handler.log[0].bytesWritten, 0);
205   T_CHECK_TIMEOUT(start, end,
206                   milliseconds(events[1].milliseconds), milliseconds(30));
207
208   // Make sure the second chunk of data is still waiting to be read.
209   size_t bytesRemaining = readUntilEmpty(sp[0]);
210   ASSERT_EQ(bytesRemaining, events[1].length);
211 }
212
213 /**
214  * Test (READ | PERSIST)
215  */
216 TEST(EventBaseTest, ReadPersist) {
217   EventBase eb;
218   SocketPair sp;
219
220   // Register for read events
221   TestHandler handler(&eb, sp[0]);
222   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
223
224   // Register several timeouts to perform writes
225   ScheduledEvent events[] = {
226     { 10,  EventHandler::WRITE, 1024 },
227     { 20,  EventHandler::WRITE, 2211 },
228     { 30,  EventHandler::WRITE, 4096 },
229     { 100, EventHandler::WRITE, 100 },
230     { 0, 0 },
231   };
232   scheduleEvents(&eb, sp[1], events);
233
234   // Schedule a timeout to unregister the handler after the third write
235   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
236
237   // Loop
238   TimePoint start;
239   eb.loop();
240   TimePoint end;
241
242   // The handler should have received the first 3 events,
243   // then been unregistered after that.
244   ASSERT_EQ(handler.log.size(), 3);
245   for (int n = 0; n < 3; ++n) {
246     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
247     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
248                     milliseconds(events[n].milliseconds));
249     ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
250     ASSERT_EQ(handler.log[n].bytesWritten, 0);
251   }
252   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
253
254   // Make sure the data from the last write is still waiting to be read
255   size_t bytesRemaining = readUntilEmpty(sp[0]);
256   ASSERT_EQ(bytesRemaining, events[3].length);
257 }
258
259 /**
260  * Test registering for READ when the socket is immediately readable
261  */
262 TEST(EventBaseTest, ReadImmediate) {
263   EventBase eb;
264   SocketPair sp;
265
266   // Write some data to the socket so the other end will
267   // be immediately readable
268   size_t dataLength = 1234;
269   writeToFD(sp[1], dataLength);
270
271   // Register for read events
272   TestHandler handler(&eb, sp[0]);
273   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
274
275   // Register a timeout to perform another write
276   ScheduledEvent events[] = {
277     { 10, EventHandler::WRITE, 2345 },
278     { 0, 0, 0 },
279   };
280   scheduleEvents(&eb, sp[1], events);
281
282   // Schedule a timeout to unregister the handler
283   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
284
285   // Loop
286   TimePoint start;
287   eb.loop();
288   TimePoint end;
289
290   ASSERT_EQ(handler.log.size(), 2);
291
292   // There should have been 1 event for immediate readability
293   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
294   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
295   ASSERT_EQ(handler.log[0].bytesRead, dataLength);
296   ASSERT_EQ(handler.log[0].bytesWritten, 0);
297
298   // There should be another event after the timeout wrote more data
299   ASSERT_EQ(handler.log[1].events, EventHandler::READ);
300   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
301                   milliseconds(events[0].milliseconds));
302   ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
303   ASSERT_EQ(handler.log[1].bytesWritten, 0);
304
305   T_CHECK_TIMEOUT(start, end, milliseconds(20));
306 }
307
308 /**
309  * Test a WRITE event
310  */
311 TEST(EventBaseTest, WriteEvent) {
312   EventBase eb;
313   SocketPair sp;
314
315   // Fill up the write buffer before starting
316   size_t initialBytesWritten = writeUntilFull(sp[0]);
317
318   // Register for write events
319   TestHandler handler(&eb, sp[0]);
320   handler.registerHandler(EventHandler::WRITE);
321
322   // Register timeouts to perform two reads
323   ScheduledEvent events[] = {
324     { 10, EventHandler::READ, 0 },
325     { 60, EventHandler::READ, 0 },
326     { 0, 0, 0 },
327   };
328   scheduleEvents(&eb, sp[1], events);
329
330   // Loop
331   TimePoint start;
332   eb.loop();
333   TimePoint end;
334
335   // Since we didn't use the EventHandler::PERSIST flag, the handler should
336   // have only been able to write once, then unregistered itself.
337   ASSERT_EQ(handler.log.size(), 1);
338   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
339   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
340                   milliseconds(events[0].milliseconds));
341   ASSERT_EQ(handler.log[0].bytesRead, 0);
342   ASSERT_GT(handler.log[0].bytesWritten, 0);
343   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
344
345   ASSERT_EQ(events[0].result, initialBytesWritten);
346   ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
347 }
348
349 /**
350  * Test (WRITE | PERSIST)
351  */
352 TEST(EventBaseTest, WritePersist) {
353   EventBase eb;
354   SocketPair sp;
355
356   // Fill up the write buffer before starting
357   size_t initialBytesWritten = writeUntilFull(sp[0]);
358
359   // Register for write events
360   TestHandler handler(&eb, sp[0]);
361   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
362
363   // Register several timeouts to read from the socket at several intervals
364   ScheduledEvent events[] = {
365     { 10,  EventHandler::READ, 0 },
366     { 40,  EventHandler::READ, 0 },
367     { 70,  EventHandler::READ, 0 },
368     { 100, EventHandler::READ, 0 },
369     { 0, 0 },
370   };
371   scheduleEvents(&eb, sp[1], events);
372
373   // Schedule a timeout to unregister the handler after the third read
374   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
375
376   // Loop
377   TimePoint start;
378   eb.loop();
379   TimePoint end;
380
381   // The handler should have received the first 3 events,
382   // then been unregistered after that.
383   ASSERT_EQ(handler.log.size(), 3);
384   ASSERT_EQ(events[0].result, initialBytesWritten);
385   for (int n = 0; n < 3; ++n) {
386     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
387     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
388                     milliseconds(events[n].milliseconds));
389     ASSERT_EQ(handler.log[n].bytesRead, 0);
390     ASSERT_GT(handler.log[n].bytesWritten, 0);
391     ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
392   }
393   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
394 }
395
396 /**
397  * Test registering for WRITE when the socket is immediately writable
398  */
399 TEST(EventBaseTest, WriteImmediate) {
400   EventBase eb;
401   SocketPair sp;
402
403   // Register for write events
404   TestHandler handler(&eb, sp[0]);
405   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
406
407   // Register a timeout to perform a read
408   ScheduledEvent events[] = {
409     { 10, EventHandler::READ, 0 },
410     { 0, 0, 0 },
411   };
412   scheduleEvents(&eb, sp[1], events);
413
414   // Schedule a timeout to unregister the handler
415   int64_t unregisterTimeout = 40;
416   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
417                    unregisterTimeout);
418
419   // Loop
420   TimePoint start;
421   eb.loop();
422   TimePoint end;
423
424   ASSERT_EQ(handler.log.size(), 2);
425
426   // Since the socket buffer was initially empty,
427   // there should have been 1 event for immediate writability
428   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
429   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
430   ASSERT_EQ(handler.log[0].bytesRead, 0);
431   ASSERT_GT(handler.log[0].bytesWritten, 0);
432
433   // There should be another event after the timeout wrote more data
434   ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
435   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
436                   milliseconds(events[0].milliseconds));
437   ASSERT_EQ(handler.log[1].bytesRead, 0);
438   ASSERT_GT(handler.log[1].bytesWritten, 0);
439
440   T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
441 }
442
443 /**
444  * Test (READ | WRITE) when the socket becomes readable first
445  */
446 TEST(EventBaseTest, ReadWrite) {
447   EventBase eb;
448   SocketPair sp;
449
450   // Fill up the write buffer before starting
451   size_t sock0WriteLength = writeUntilFull(sp[0]);
452
453   // Register for read and write events
454   TestHandler handler(&eb, sp[0]);
455   handler.registerHandler(EventHandler::READ_WRITE);
456
457   // Register timeouts to perform a write then a read.
458   ScheduledEvent events[] = {
459     { 10, EventHandler::WRITE, 2345 },
460     { 40, EventHandler::READ, 0 },
461     { 0, 0, 0 },
462   };
463   scheduleEvents(&eb, sp[1], events);
464
465   // Loop
466   TimePoint start;
467   eb.loop();
468   TimePoint end;
469
470   // Since we didn't use the EventHandler::PERSIST flag, the handler should
471   // have only noticed readability, then unregistered itself.  Check that only
472   // one event was logged.
473   ASSERT_EQ(handler.log.size(), 1);
474   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
475   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
476                   milliseconds(events[0].milliseconds));
477   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
478   ASSERT_EQ(handler.log[0].bytesWritten, 0);
479   ASSERT_EQ(events[1].result, sock0WriteLength);
480   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
481 }
482
483 /**
484  * Test (READ | WRITE) when the socket becomes writable first
485  */
486 TEST(EventBaseTest, WriteRead) {
487   EventBase eb;
488   SocketPair sp;
489
490   // Fill up the write buffer before starting
491   size_t sock0WriteLength = writeUntilFull(sp[0]);
492
493   // Register for read and write events
494   TestHandler handler(&eb, sp[0]);
495   handler.registerHandler(EventHandler::READ_WRITE);
496
497   // Register timeouts to perform a read then a write.
498   size_t sock1WriteLength = 2345;
499   ScheduledEvent events[] = {
500     { 10, EventHandler::READ, 0 },
501     { 40, EventHandler::WRITE, sock1WriteLength },
502     { 0, 0, 0 },
503   };
504   scheduleEvents(&eb, sp[1], events);
505
506   // Loop
507   TimePoint start;
508   eb.loop();
509   TimePoint end;
510
511   // Since we didn't use the EventHandler::PERSIST flag, the handler should
512   // have only noticed writability, then unregistered itself.  Check that only
513   // one event was logged.
514   ASSERT_EQ(handler.log.size(), 1);
515   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
516   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
517                   milliseconds(events[0].milliseconds));
518   ASSERT_EQ(handler.log[0].bytesRead, 0);
519   ASSERT_GT(handler.log[0].bytesWritten, 0);
520   ASSERT_EQ(events[0].result, sock0WriteLength);
521   ASSERT_EQ(events[1].result, sock1WriteLength);
522   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
523
524   // Make sure the written data is still waiting to be read.
525   size_t bytesRemaining = readUntilEmpty(sp[0]);
526   ASSERT_EQ(bytesRemaining, events[1].length);
527 }
528
529 /**
530  * Test (READ | WRITE) when the socket becomes readable and writable
531  * at the same time.
532  */
533 TEST(EventBaseTest, ReadWriteSimultaneous) {
534   EventBase eb;
535   SocketPair sp;
536
537   // Fill up the write buffer before starting
538   size_t sock0WriteLength = writeUntilFull(sp[0]);
539
540   // Register for read and write events
541   TestHandler handler(&eb, sp[0]);
542   handler.registerHandler(EventHandler::READ_WRITE);
543
544   // Register a timeout to perform a read and write together
545   ScheduledEvent events[] = {
546     { 10, EventHandler::READ | EventHandler::WRITE, 0 },
547     { 0, 0, 0 },
548   };
549   scheduleEvents(&eb, sp[1], events);
550
551   // Loop
552   TimePoint start;
553   eb.loop();
554   TimePoint end;
555
556   // It's not strictly required that the EventBase register us about both
557   // events in the same call.  So, it's possible that if the EventBase
558   // implementation changes this test could start failing, and it wouldn't be
559   // considered breaking the API.  However for now it's nice to exercise this
560   // code path.
561   ASSERT_EQ(handler.log.size(), 1);
562   ASSERT_EQ(handler.log[0].events,
563                     EventHandler::READ | EventHandler::WRITE);
564   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
565                   milliseconds(events[0].milliseconds));
566   ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
567   ASSERT_GT(handler.log[0].bytesWritten, 0);
568   T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
569 }
570
571 /**
572  * Test (READ | WRITE | PERSIST)
573  */
574 TEST(EventBaseTest, ReadWritePersist) {
575   EventBase eb;
576   SocketPair sp;
577
578   // Register for read and write events
579   TestHandler handler(&eb, sp[0]);
580   handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
581                           EventHandler::PERSIST);
582
583   // Register timeouts to perform several reads and writes
584   ScheduledEvent events[] = {
585     { 10, EventHandler::WRITE, 2345 },
586     { 20, EventHandler::READ, 0 },
587     { 35, EventHandler::WRITE, 200 },
588     { 45, EventHandler::WRITE, 15 },
589     { 55, EventHandler::READ, 0 },
590     { 120, EventHandler::WRITE, 2345 },
591     { 0, 0, 0 },
592   };
593   scheduleEvents(&eb, sp[1], events);
594
595   // Schedule a timeout to unregister the handler
596   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
597
598   // Loop
599   TimePoint start;
600   eb.loop();
601   TimePoint end;
602
603   ASSERT_EQ(handler.log.size(), 6);
604
605   // Since we didn't fill up the write buffer immediately, there should
606   // be an immediate event for writability.
607   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
608   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
609   ASSERT_EQ(handler.log[0].bytesRead, 0);
610   ASSERT_GT(handler.log[0].bytesWritten, 0);
611
612   // Events 1 through 5 should correspond to the scheduled events
613   for (int n = 1; n < 6; ++n) {
614     ScheduledEvent* event = &events[n - 1];
615     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
616                     milliseconds(event->milliseconds));
617     if (event->events == EventHandler::READ) {
618       ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
619       ASSERT_EQ(handler.log[n].bytesRead, 0);
620       ASSERT_GT(handler.log[n].bytesWritten, 0);
621     } else {
622       ASSERT_EQ(handler.log[n].events, EventHandler::READ);
623       ASSERT_EQ(handler.log[n].bytesRead, event->length);
624       ASSERT_EQ(handler.log[n].bytesWritten, 0);
625     }
626   }
627
628   // The timeout should have unregistered the handler before the last write.
629   // Make sure that data is still waiting to be read
630   size_t bytesRemaining = readUntilEmpty(sp[0]);
631   ASSERT_EQ(bytesRemaining, events[5].length);
632 }
633
634
635 class PartialReadHandler : public TestHandler {
636  public:
637   PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
638     : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
639
640   virtual void handlerReady(uint16_t events) noexcept {
641     assert(events == EventHandler::READ);
642     ssize_t bytesRead = readFromFD(fd_, readLength_);
643     log.push_back(EventRecord(events, bytesRead, 0));
644   }
645
646  private:
647   int fd_;
648   size_t readLength_;
649 };
650
651 /**
652  * Test reading only part of the available data when a read event is fired.
653  * When PERSIST is used, make sure the handler gets notified again the next
654  * time around the loop.
655  */
656 TEST(EventBaseTest, ReadPartial) {
657   EventBase eb;
658   SocketPair sp;
659
660   // Register for read events
661   size_t readLength = 100;
662   PartialReadHandler handler(&eb, sp[0], readLength);
663   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
664
665   // Register a timeout to perform a single write,
666   // with more data than PartialReadHandler will read at once
667   ScheduledEvent events[] = {
668     { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) },
669     { 0, 0, 0 },
670   };
671   scheduleEvents(&eb, sp[1], events);
672
673   // Schedule a timeout to unregister the handler
674   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
675
676   // Loop
677   TimePoint start;
678   eb.loop();
679   TimePoint end;
680
681   ASSERT_EQ(handler.log.size(), 4);
682
683   // The first 3 invocations should read readLength bytes each
684   for (int n = 0; n < 3; ++n) {
685     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
686     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
687                     milliseconds(events[0].milliseconds));
688     ASSERT_EQ(handler.log[n].bytesRead, readLength);
689     ASSERT_EQ(handler.log[n].bytesWritten, 0);
690   }
691   // The last read only has readLength/2 bytes
692   ASSERT_EQ(handler.log[3].events, EventHandler::READ);
693   T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
694                   milliseconds(events[0].milliseconds));
695   ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
696   ASSERT_EQ(handler.log[3].bytesWritten, 0);
697 }
698
699
700 class PartialWriteHandler : public TestHandler {
701  public:
702   PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
703     : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
704
705   virtual void handlerReady(uint16_t events) noexcept {
706     assert(events == EventHandler::WRITE);
707     ssize_t bytesWritten = writeToFD(fd_, writeLength_);
708     log.push_back(EventRecord(events, 0, bytesWritten));
709   }
710
711  private:
712   int fd_;
713   size_t writeLength_;
714 };
715
716 /**
717  * Test writing without completely filling up the write buffer when the fd
718  * becomes writable.  When PERSIST is used, make sure the handler gets
719  * notified again the next time around the loop.
720  */
721 TEST(EventBaseTest, WritePartial) {
722   EventBase eb;
723   SocketPair sp;
724
725   // Fill up the write buffer before starting
726   size_t initialBytesWritten = writeUntilFull(sp[0]);
727
728   // Register for write events
729   size_t writeLength = 100;
730   PartialWriteHandler handler(&eb, sp[0], writeLength);
731   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
732
733   // Register a timeout to read, so that more data can be written
734   ScheduledEvent events[] = {
735     { 10, EventHandler::READ, 0 },
736     { 0, 0, 0 },
737   };
738   scheduleEvents(&eb, sp[1], events);
739
740   // Schedule a timeout to unregister the handler
741   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
742
743   // Loop
744   TimePoint start;
745   eb.loop();
746   TimePoint end;
747
748   // Depending on how big the socket buffer is, there will be multiple writes
749   // Only check the first 5
750   int numChecked = 5;
751   ASSERT_GE(handler.log.size(), numChecked);
752   ASSERT_EQ(events[0].result, initialBytesWritten);
753
754   // The first 3 invocations should read writeLength bytes each
755   for (int n = 0; n < numChecked; ++n) {
756     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
757     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
758                     milliseconds(events[0].milliseconds));
759     ASSERT_EQ(handler.log[n].bytesRead, 0);
760     ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
761   }
762 }
763
764
765 /**
766  * Test destroying a registered EventHandler
767  */
768 TEST(EventBaseTest, DestroyHandler) {
769   class DestroyHandler : public AsyncTimeout {
770    public:
771     DestroyHandler(EventBase* eb, EventHandler* h)
772       : AsyncTimeout(eb)
773       , handler_(h) {}
774
775     virtual void timeoutExpired() noexcept {
776       delete handler_;
777     }
778
779    private:
780     EventHandler* handler_;
781   };
782
783   EventBase eb;
784   SocketPair sp;
785
786   // Fill up the write buffer before starting
787   size_t initialBytesWritten = writeUntilFull(sp[0]);
788
789   // Register for write events
790   TestHandler* handler = new TestHandler(&eb, sp[0]);
791   handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
792
793   // After 10ms, read some data, so that the handler
794   // will be notified that it can write.
795   eb.runAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
796                    10);
797
798   // Start a timer to destroy the handler after 25ms
799   // This mainly just makes sure the code doesn't break or assert
800   DestroyHandler dh(&eb, handler);
801   dh.scheduleTimeout(25);
802
803   TimePoint start;
804   eb.loop();
805   TimePoint end;
806
807   // Make sure the EventHandler was uninstalled properly when it was
808   // destroyed, and the EventBase loop exited
809   T_CHECK_TIMEOUT(start, end, milliseconds(25));
810
811   // Make sure that the handler wrote data to the socket
812   // before it was destroyed
813   size_t bytesRemaining = readUntilEmpty(sp[1]);
814   ASSERT_GT(bytesRemaining, 0);
815 }
816
817
818 ///////////////////////////////////////////////////////////////////////////
819 // Tests for timeout events
820 ///////////////////////////////////////////////////////////////////////////
821
822 TEST(EventBaseTest, RunAfterDelay) {
823   EventBase eb;
824
825   TimePoint timestamp1(false);
826   TimePoint timestamp2(false);
827   TimePoint timestamp3(false);
828   eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
829   eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
830   eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
831
832   TimePoint start;
833   eb.loop();
834   TimePoint end;
835
836   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
837   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
838   T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
839   T_CHECK_TIMEOUT(start, end, milliseconds(40));
840 }
841
842 /**
843  * Test the behavior of runAfterDelay() when some timeouts are
844  * still scheduled when the EventBase is destroyed.
845  */
846 TEST(EventBaseTest, RunAfterDelayDestruction) {
847   TimePoint timestamp1(false);
848   TimePoint timestamp2(false);
849   TimePoint timestamp3(false);
850   TimePoint timestamp4(false);
851   TimePoint start(false);
852   TimePoint end(false);
853
854   {
855     EventBase eb;
856
857     // Run two normal timeouts
858     eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
859     eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
860
861     // Schedule a timeout to stop the event loop after 40ms
862     eb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
863
864     // Schedule 2 timeouts that would fire after the event loop stops
865     eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
866     eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
867
868     start.reset();
869     eb.loop();
870     end.reset();
871   }
872
873   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
874   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
875   T_CHECK_TIMEOUT(start, end, milliseconds(40));
876
877   ASSERT_TRUE(timestamp3.isUnset());
878   ASSERT_TRUE(timestamp4.isUnset());
879
880   // Ideally this test should be run under valgrind to ensure that no
881   // memory is leaked.
882 }
883
884 class TestTimeout : public AsyncTimeout {
885  public:
886   explicit TestTimeout(EventBase* eventBase)
887     : AsyncTimeout(eventBase)
888     , timestamp(false) {}
889
890   virtual void timeoutExpired() noexcept {
891     timestamp.reset();
892   }
893
894   TimePoint timestamp;
895 };
896
897 TEST(EventBaseTest, BasicTimeouts) {
898   EventBase eb;
899
900   TestTimeout t1(&eb);
901   TestTimeout t2(&eb);
902   TestTimeout t3(&eb);
903   t1.scheduleTimeout(10);
904   t2.scheduleTimeout(20);
905   t3.scheduleTimeout(40);
906
907   TimePoint start;
908   eb.loop();
909   TimePoint end;
910
911   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
912   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
913   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
914   T_CHECK_TIMEOUT(start, end, milliseconds(40));
915 }
916
917 class ReschedulingTimeout : public AsyncTimeout {
918  public:
919   ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
920     : AsyncTimeout(evb)
921     , timeouts_(timeouts)
922     , iterator_(timeouts_.begin()) {}
923
924   void start() {
925     reschedule();
926   }
927
928   virtual void timeoutExpired() noexcept {
929     timestamps.push_back(TimePoint());
930     reschedule();
931   }
932
933   void reschedule() {
934     if (iterator_ != timeouts_.end()) {
935       uint32_t timeout = *iterator_;
936       ++iterator_;
937       scheduleTimeout(timeout);
938     }
939   }
940
941   vector<TimePoint> timestamps;
942
943  private:
944   vector<uint32_t> timeouts_;
945   vector<uint32_t>::const_iterator iterator_;
946 };
947
948 /**
949  * Test rescheduling the same timeout multiple times
950  */
951 TEST(EventBaseTest, ReuseTimeout) {
952   EventBase eb;
953
954   vector<uint32_t> timeouts;
955   timeouts.push_back(10);
956   timeouts.push_back(30);
957   timeouts.push_back(15);
958
959   ReschedulingTimeout t(&eb, timeouts);
960   t.start();
961
962   TimePoint start;
963   eb.loop();
964   TimePoint end;
965
966   // Use a higher tolerance than usual.  We're waiting on 3 timeouts
967   // consecutively.  In general, each timeout may go over by a few
968   // milliseconds, and we're tripling this error by witing on 3 timeouts.
969   milliseconds tolerance{6};
970
971   ASSERT_EQ(timeouts.size(), t.timestamps.size());
972   uint32_t total = 0;
973   for (size_t n = 0; n < timeouts.size(); ++n) {
974     total += timeouts[n];
975     T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
976   }
977   T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
978 }
979
980 /**
981  * Test rescheduling a timeout before it has fired
982  */
983 TEST(EventBaseTest, RescheduleTimeout) {
984   EventBase eb;
985
986   TestTimeout t1(&eb);
987   TestTimeout t2(&eb);
988   TestTimeout t3(&eb);
989
990   t1.scheduleTimeout(15);
991   t2.scheduleTimeout(30);
992   t3.scheduleTimeout(30);
993
994   auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
995       &AsyncTimeout::scheduleTimeout);
996
997   // after 10ms, reschedule t2 to run sooner than originally scheduled
998   eb.runAfterDelay(std::bind(f, &t2, 10), 10);
999   // after 10ms, reschedule t3 to run later than originally scheduled
1000   eb.runAfterDelay(std::bind(f, &t3, 40), 10);
1001
1002   TimePoint start;
1003   eb.loop();
1004   TimePoint end;
1005
1006   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1007   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1008   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1009   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1010 }
1011
1012 /**
1013  * Test cancelling a timeout
1014  */
1015 TEST(EventBaseTest, CancelTimeout) {
1016   EventBase eb;
1017
1018   vector<uint32_t> timeouts;
1019   timeouts.push_back(10);
1020   timeouts.push_back(30);
1021   timeouts.push_back(25);
1022
1023   ReschedulingTimeout t(&eb, timeouts);
1024   t.start();
1025   eb.runAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1026
1027   TimePoint start;
1028   eb.loop();
1029   TimePoint end;
1030
1031   ASSERT_EQ(t.timestamps.size(), 2);
1032   T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1033   T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1034   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1035 }
1036
1037 /**
1038  * Test destroying a scheduled timeout object
1039  */
1040 TEST(EventBaseTest, DestroyTimeout) {
1041   class DestroyTimeout : public AsyncTimeout {
1042    public:
1043     DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1044       : AsyncTimeout(eb)
1045       , timeout_(t) {}
1046
1047     virtual void timeoutExpired() noexcept {
1048       delete timeout_;
1049     }
1050
1051    private:
1052     AsyncTimeout* timeout_;
1053   };
1054
1055   EventBase eb;
1056
1057   TestTimeout* t1 = new TestTimeout(&eb);
1058   t1->scheduleTimeout(30);
1059
1060   DestroyTimeout dt(&eb, t1);
1061   dt.scheduleTimeout(10);
1062
1063   TimePoint start;
1064   eb.loop();
1065   TimePoint end;
1066
1067   T_CHECK_TIMEOUT(start, end, milliseconds(10));
1068 }
1069
1070
1071 ///////////////////////////////////////////////////////////////////////////
1072 // Test for runInThreadTestFunc()
1073 ///////////////////////////////////////////////////////////////////////////
1074
1075 struct RunInThreadData {
1076   RunInThreadData(int numThreads, int opsPerThread)
1077     : opsPerThread(opsPerThread)
1078     , opsToGo(numThreads*opsPerThread) {}
1079
1080   EventBase evb;
1081   deque< pair<int, int> > values;
1082
1083   int opsPerThread;
1084   int opsToGo;
1085 };
1086
1087 struct RunInThreadArg {
1088   RunInThreadArg(RunInThreadData* data,
1089                  int threadId,
1090                  int value)
1091     : data(data)
1092     , thread(threadId)
1093     , value(value) {}
1094
1095   RunInThreadData* data;
1096   int thread;
1097   int value;
1098 };
1099
1100 void runInThreadTestFunc(RunInThreadArg* arg) {
1101   arg->data->values.push_back(make_pair(arg->thread, arg->value));
1102   RunInThreadData* data = arg->data;
1103   delete arg;
1104
1105   if(--data->opsToGo == 0) {
1106     // Break out of the event base loop if we are the last thread running
1107     data->evb.terminateLoopSoon();
1108   }
1109 }
1110
1111 TEST(EventBaseTest, RunInThread) {
1112   uint32_t numThreads = 50;
1113   uint32_t opsPerThread = 100;
1114   RunInThreadData data(numThreads, opsPerThread);
1115
1116   deque<std::thread> threads;
1117   for (uint32_t i = 0; i < numThreads; ++i) {
1118     threads.emplace_back([i, &data] {
1119         for (int n = 0; n < data.opsPerThread; ++n) {
1120           RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1121           data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1122           usleep(10);
1123         }
1124       });
1125   }
1126
1127   // Add a timeout event to run after 3 seconds.
1128   // Otherwise loop() will return immediately since there are no events to run.
1129   // Once the last thread exits, it will stop the loop().  However, this
1130   // timeout also stops the loop in case there is a bug performing the normal
1131   // stop.
1132   data.evb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1133                          3000);
1134
1135   TimePoint start;
1136   data.evb.loop();
1137   TimePoint end;
1138
1139   // Verify that the loop exited because all threads finished and requested it
1140   // to stop.  This should happen much sooner than the 3 second timeout.
1141   // Assert that it happens in under a second.  (This is still tons of extra
1142   // padding.)
1143
1144   auto timeTaken = std::chrono::duration_cast<milliseconds>(
1145     end.getTime() - start.getTime());
1146   ASSERT_LT(timeTaken.count(), 1000);
1147   VLOG(11) << "Time taken: " << timeTaken.count();
1148
1149   // Verify that we have all of the events from every thread
1150   int expectedValues[numThreads];
1151   for (uint32_t n = 0; n < numThreads; ++n) {
1152     expectedValues[n] = 0;
1153   }
1154   for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1155        it != data.values.end();
1156        ++it) {
1157     int threadID = it->first;
1158     int value = it->second;
1159     ASSERT_EQ(expectedValues[threadID], value);
1160     ++expectedValues[threadID];
1161   }
1162   for (uint32_t n = 0; n < numThreads; ++n) {
1163     ASSERT_EQ(expectedValues[n], opsPerThread);
1164   }
1165
1166   // Wait on all of the threads.
1167   for (auto& thread: threads) {
1168     thread.join();
1169   }
1170 }
1171
1172 ///////////////////////////////////////////////////////////////////////////
1173 // Tests for runInLoop()
1174 ///////////////////////////////////////////////////////////////////////////
1175
1176 class CountedLoopCallback : public EventBase::LoopCallback {
1177  public:
1178   CountedLoopCallback(EventBase* eventBase,
1179                       unsigned int count,
1180                       std::function<void()> action =
1181                         std::function<void()>())
1182     : eventBase_(eventBase)
1183     , count_(count)
1184     , action_(action) {}
1185
1186   virtual void runLoopCallback() noexcept {
1187     --count_;
1188     if (count_ > 0) {
1189       eventBase_->runInLoop(this);
1190     } else if (action_) {
1191       action_();
1192     }
1193   }
1194
1195   unsigned int getCount() const {
1196     return count_;
1197   }
1198
1199  private:
1200   EventBase* eventBase_;
1201   unsigned int count_;
1202   std::function<void()> action_;
1203 };
1204
1205 // Test that EventBase::loop() doesn't exit while there are
1206 // still LoopCallbacks remaining to be invoked.
1207 TEST(EventBaseTest, RepeatedRunInLoop) {
1208   EventBase eventBase;
1209
1210   CountedLoopCallback c(&eventBase, 10);
1211   eventBase.runInLoop(&c);
1212   // The callback shouldn't have run immediately
1213   ASSERT_EQ(c.getCount(), 10);
1214   eventBase.loop();
1215
1216   // loop() should loop until the CountedLoopCallback stops
1217   // re-installing itself.
1218   ASSERT_EQ(c.getCount(), 0);
1219 }
1220
1221 // Test runInLoop() calls with terminateLoopSoon()
1222 TEST(EventBaseTest, RunInLoopStopLoop) {
1223   EventBase eventBase;
1224
1225   CountedLoopCallback c1(&eventBase, 20);
1226   CountedLoopCallback c2(&eventBase, 10,
1227                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1228
1229   eventBase.runInLoop(&c1);
1230   eventBase.runInLoop(&c2);
1231   ASSERT_EQ(c1.getCount(), 20);
1232   ASSERT_EQ(c2.getCount(), 10);
1233
1234   eventBase.loopForever();
1235
1236   // c2 should have stopped the loop after 10 iterations
1237   ASSERT_EQ(c2.getCount(), 0);
1238
1239   // We allow the EventBase to run the loop callbacks in whatever order it
1240   // chooses.  We'll accept c1's count being either 10 (if the loop terminated
1241   // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1242   // before c1 ran).
1243   //
1244   // (With the current code, c1 will always run 10 times, but we don't consider
1245   // this a hard API requirement.)
1246   ASSERT_GE(c1.getCount(), 10);
1247   ASSERT_LE(c1.getCount(), 11);
1248 }
1249
1250 // Test cancelling runInLoop() callbacks
1251 TEST(EventBaseTest, CancelRunInLoop) {
1252   EventBase eventBase;
1253
1254   CountedLoopCallback c1(&eventBase, 20);
1255   CountedLoopCallback c2(&eventBase, 20);
1256   CountedLoopCallback c3(&eventBase, 20);
1257
1258   std::function<void()> cancelC1Action =
1259     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1260   std::function<void()> cancelC2Action =
1261     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1262
1263   CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1264   CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1265
1266   // Install cancelC1 after c1
1267   eventBase.runInLoop(&c1);
1268   eventBase.runInLoop(&cancelC1);
1269
1270   // Install cancelC2 before c2
1271   eventBase.runInLoop(&cancelC2);
1272   eventBase.runInLoop(&c2);
1273
1274   // Install c3
1275   eventBase.runInLoop(&c3);
1276
1277   ASSERT_EQ(c1.getCount(), 20);
1278   ASSERT_EQ(c2.getCount(), 20);
1279   ASSERT_EQ(c3.getCount(), 20);
1280   ASSERT_EQ(cancelC1.getCount(), 10);
1281   ASSERT_EQ(cancelC2.getCount(), 10);
1282
1283   // Run the loop
1284   eventBase.loop();
1285
1286   // cancelC1 and cancelC3 should have both fired after 10 iterations and
1287   // stopped re-installing themselves
1288   ASSERT_EQ(cancelC1.getCount(), 0);
1289   ASSERT_EQ(cancelC2.getCount(), 0);
1290   // c3 should have continued on for the full 20 iterations
1291   ASSERT_EQ(c3.getCount(), 0);
1292
1293   // c1 and c2 should have both been cancelled on the 10th iteration.
1294   //
1295   // Callbacks are always run in the order they are installed,
1296   // so c1 should have fired 10 times, and been canceled after it ran on the
1297   // 10th iteration.  c2 should have only fired 9 times, because cancelC2 will
1298   // have run before it on the 10th iteration, and cancelled it before it
1299   // fired.
1300   ASSERT_EQ(c1.getCount(), 10);
1301   ASSERT_EQ(c2.getCount(), 11);
1302 }
1303
1304 class TerminateTestCallback : public EventBase::LoopCallback,
1305                               public EventHandler {
1306  public:
1307   TerminateTestCallback(EventBase* eventBase, int fd)
1308     : EventHandler(eventBase, fd),
1309       eventBase_(eventBase),
1310       loopInvocations_(0),
1311       maxLoopInvocations_(0),
1312       eventInvocations_(0),
1313       maxEventInvocations_(0) {}
1314
1315   void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1316     loopInvocations_ = 0;
1317     maxLoopInvocations_ = maxLoopInvocations;
1318     eventInvocations_ = 0;
1319     maxEventInvocations_ = maxEventInvocations;
1320
1321     cancelLoopCallback();
1322     unregisterHandler();
1323   }
1324
1325   virtual void handlerReady(uint16_t events) noexcept {
1326     // We didn't register with PERSIST, so we will have been automatically
1327     // unregistered already.
1328     ASSERT_FALSE(isHandlerRegistered());
1329
1330     ++eventInvocations_;
1331     if (eventInvocations_ >= maxEventInvocations_) {
1332       return;
1333     }
1334
1335     eventBase_->runInLoop(this);
1336   }
1337   virtual void runLoopCallback() noexcept {
1338     ++loopInvocations_;
1339     if (loopInvocations_ >= maxLoopInvocations_) {
1340       return;
1341     }
1342
1343     registerHandler(READ);
1344   }
1345
1346   uint32_t getLoopInvocations() const {
1347     return loopInvocations_;
1348   }
1349   uint32_t getEventInvocations() const {
1350     return eventInvocations_;
1351   }
1352
1353  private:
1354   EventBase* eventBase_;
1355   uint32_t loopInvocations_;
1356   uint32_t maxLoopInvocations_;
1357   uint32_t eventInvocations_;
1358   uint32_t maxEventInvocations_;
1359 };
1360
1361 /**
1362  * Test that EventBase::loop() correctly detects when there are no more events
1363  * left to run.
1364  *
1365  * This uses a single callback, which alternates registering itself as a loop
1366  * callback versus a EventHandler callback.  This exercises a regression where
1367  * EventBase::loop() incorrectly exited if there were no more fd handlers
1368  * registered, but a loop callback installed a new fd handler.
1369  */
1370 TEST(EventBaseTest, LoopTermination) {
1371   EventBase eventBase;
1372
1373   // Open a pipe and close the write end,
1374   // so the read endpoint will be readable
1375   int pipeFds[2];
1376   int rc = pipe(pipeFds);
1377   ASSERT_EQ(rc, 0);
1378   close(pipeFds[1]);
1379   TerminateTestCallback callback(&eventBase, pipeFds[0]);
1380
1381   // Test once where the callback will exit after a loop callback
1382   callback.reset(10, 100);
1383   eventBase.runInLoop(&callback);
1384   eventBase.loop();
1385   ASSERT_EQ(callback.getLoopInvocations(), 10);
1386   ASSERT_EQ(callback.getEventInvocations(), 9);
1387
1388   // Test once where the callback will exit after an fd event callback
1389   callback.reset(100, 7);
1390   eventBase.runInLoop(&callback);
1391   eventBase.loop();
1392   ASSERT_EQ(callback.getLoopInvocations(), 7);
1393   ASSERT_EQ(callback.getEventInvocations(), 7);
1394
1395   close(pipeFds[0]);
1396 }
1397
1398 ///////////////////////////////////////////////////////////////////////////
1399 // Tests for latency calculations
1400 ///////////////////////////////////////////////////////////////////////////
1401
1402 class IdleTimeTimeoutSeries : public AsyncTimeout {
1403
1404  public:
1405
1406   explicit IdleTimeTimeoutSeries(EventBase *base,
1407                                  std::deque<std::uint64_t>& timeout) :
1408     AsyncTimeout(base),
1409     timeouts_(0),
1410     timeout_(timeout) {
1411       scheduleTimeout(1);
1412     }
1413
1414   virtual ~IdleTimeTimeoutSeries() {}
1415
1416   void timeoutExpired() noexcept {
1417     ++timeouts_;
1418
1419     if(timeout_.empty()){
1420       cancelTimeout();
1421     } else {
1422       uint64_t sleepTime = timeout_.front();
1423       timeout_.pop_front();
1424       if (sleepTime) {
1425         usleep(sleepTime);
1426       }
1427       scheduleTimeout(1);
1428     }
1429   }
1430
1431   int getTimeouts() const {
1432     return timeouts_;
1433   }
1434
1435  private:
1436   int timeouts_;
1437   std::deque<uint64_t>& timeout_;
1438 };
1439
1440 /**
1441  * Verify that idle time is correctly accounted for when decaying our loop
1442  * time.
1443  *
1444  * This works by creating a high loop time (via usleep), expecting a latency
1445  * callback with known value, and then scheduling a timeout for later. This
1446  * later timeout is far enough in the future that the idle time should have
1447  * caused the loop time to decay.
1448  */
1449 TEST(EventBaseTest, IdleTime) {
1450   EventBase eventBase;
1451   eventBase.setLoadAvgMsec(1000);
1452   eventBase.resetLoadAvg(5900.0);
1453   std::deque<uint64_t> timeouts0(4, 8080);
1454   timeouts0.push_front(8000);
1455   timeouts0.push_back(14000);
1456   IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1457   std::deque<uint64_t> timeouts(20, 20);
1458   std::unique_ptr<IdleTimeTimeoutSeries> tos;
1459
1460   int latencyCallbacks = 0;
1461   eventBase.setMaxLatency(6000, [&]() {
1462     ++latencyCallbacks;
1463
1464     switch (latencyCallbacks) {
1465     case 1:
1466       ASSERT_EQ(6, tos0.getTimeouts());
1467       ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1468       ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1469       tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1470       break;
1471
1472     default:
1473       FAIL() << "Unexpected latency callback";
1474       break;
1475     }
1476   });
1477
1478   // Kick things off with an "immedite" timeout
1479   tos0.scheduleTimeout(1);
1480
1481   eventBase.loop();
1482
1483   ASSERT_EQ(1, latencyCallbacks);
1484   ASSERT_EQ(7, tos0.getTimeouts());
1485   ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1486   ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1487   ASSERT_EQ(21, tos->getTimeouts());
1488 }
1489
1490 /**
1491  * Test that thisLoop functionality works with terminateLoopSoon
1492  */
1493 TEST(EventBaseTest, ThisLoop) {
1494   EventBase eb;
1495   bool runInLoop = false;
1496   bool runThisLoop = false;
1497
1498   eb.runInLoop([&](){
1499       eb.terminateLoopSoon();
1500       eb.runInLoop([&]() {
1501           runInLoop = true;
1502         });
1503       eb.runInLoop([&]() {
1504           runThisLoop = true;
1505         }, true);
1506     }, true);
1507   eb.loopForever();
1508
1509   // Should not work
1510   ASSERT_FALSE(runInLoop);
1511   // Should work with thisLoop
1512   ASSERT_TRUE(runThisLoop);
1513 }
1514
1515 TEST(EventBaseTest, EventBaseThreadLoop) {
1516   EventBase base;
1517   bool ran = false;
1518
1519   base.runInEventBaseThread([&](){
1520     ran = true;
1521   });
1522   base.loop();
1523
1524   ASSERT_EQ(true, ran);
1525 }
1526
1527 TEST(EventBaseTest, EventBaseThreadName) {
1528   EventBase base;
1529   base.setName("foo");
1530   base.loop();
1531
1532 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1533   char name[16];
1534   pthread_getname_np(pthread_self(), name, 16);
1535   ASSERT_EQ(0, strcmp("foo", name));
1536 #endif
1537 }
1538
1539 TEST(EventBaseTest, RunBeforeLoop) {
1540   EventBase base;
1541   CountedLoopCallback cb(&base, 1, [&](){
1542     base.terminateLoopSoon();
1543   });
1544   base.runBeforeLoop(&cb);
1545   base.loopForever();
1546   ASSERT_EQ(cb.getCount(), 0);
1547 }
1548
1549 TEST(EventBaseTest, RunBeforeLoopWait) {
1550   EventBase base;
1551   CountedLoopCallback cb(&base, 1);
1552   base.runAfterDelay([&](){
1553       base.terminateLoopSoon();
1554     }, 500);
1555   base.runBeforeLoop(&cb);
1556   base.loopForever();
1557
1558   // Check that we only ran once, and did not loop multiple times.
1559   ASSERT_EQ(cb.getCount(), 0);
1560 }