EventBase::runAfterDelay to throw an exception
[folly.git] / folly / io / async / test / EventBaseTest.cpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 #include <folly/Memory.h>
20
21 #include <folly/io/async/AsyncTimeout.h>
22 #include <folly/io/async/EventBase.h>
23 #include <folly/io/async/EventHandler.h>
24 #include <folly/io/async/test/SocketPair.h>
25 #include <folly/io/async/test/Util.h>
26
27 #include <atomic>
28 #include <iostream>
29 #include <unistd.h>
30 #include <memory>
31 #include <thread>
32
33 using std::atomic;
34 using std::deque;
35 using std::pair;
36 using std::vector;
37 using std::unique_ptr;
38 using std::thread;
39 using std::make_pair;
40 using std::cerr;
41 using std::endl;
42 using std::chrono::milliseconds;
43 using std::chrono::microseconds;
44 using std::chrono::duration_cast;
45
46 using namespace folly;
47
48 ///////////////////////////////////////////////////////////////////////////
49 // Tests for read and write events
50 ///////////////////////////////////////////////////////////////////////////
51
52 enum { BUF_SIZE = 4096 };
53
54 ssize_t writeToFD(int fd, size_t length) {
55   // write an arbitrary amount of data to the fd
56   char buf[length];
57   memset(buf, 'a', sizeof(buf));
58   ssize_t rc = write(fd, buf, sizeof(buf));
59   CHECK_EQ(rc, length);
60   return rc;
61 }
62
63 size_t writeUntilFull(int fd) {
64   // Write to the fd until EAGAIN is returned
65   size_t bytesWritten = 0;
66   char buf[BUF_SIZE];
67   memset(buf, 'a', sizeof(buf));
68   while (true) {
69     ssize_t rc = write(fd, buf, sizeof(buf));
70     if (rc < 0) {
71       CHECK_EQ(errno, EAGAIN);
72       break;
73     } else {
74       bytesWritten += rc;
75     }
76   }
77   return bytesWritten;
78 }
79
80 ssize_t readFromFD(int fd, size_t length) {
81   // write an arbitrary amount of data to the fd
82   char buf[length];
83   return read(fd, buf, sizeof(buf));
84 }
85
86 size_t readUntilEmpty(int fd) {
87   // Read from the fd until EAGAIN is returned
88   char buf[BUF_SIZE];
89   size_t bytesRead = 0;
90   while (true) {
91     int rc = read(fd, buf, sizeof(buf));
92     if (rc == 0) {
93       CHECK(false) << "unexpected EOF";
94     } else if (rc < 0) {
95       CHECK_EQ(errno, EAGAIN);
96       break;
97     } else {
98       bytesRead += rc;
99     }
100   }
101   return bytesRead;
102 }
103
104 void checkReadUntilEmpty(int fd, size_t expectedLength) {
105   ASSERT_EQ(readUntilEmpty(fd), expectedLength);
106 }
107
108 struct ScheduledEvent {
109   int milliseconds;
110   uint16_t events;
111   size_t length;
112   ssize_t result;
113
114   void perform(int fd) {
115     if (events & EventHandler::READ) {
116       if (length == 0) {
117         result = readUntilEmpty(fd);
118       } else {
119         result = readFromFD(fd, length);
120       }
121     }
122     if (events & EventHandler::WRITE) {
123       if (length == 0) {
124         result = writeUntilFull(fd);
125       } else {
126         result = writeToFD(fd, length);
127       }
128     }
129   }
130 };
131
132 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
133   for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
134     eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
135                              ev->milliseconds);
136   }
137 }
138
139 class TestHandler : public EventHandler {
140  public:
141   TestHandler(EventBase* eventBase, int fd)
142     : EventHandler(eventBase, fd), fd_(fd) {}
143
144   virtual void handlerReady(uint16_t events) noexcept {
145     ssize_t bytesRead = 0;
146     ssize_t bytesWritten = 0;
147     if (events & READ) {
148       // Read all available data, so EventBase will stop calling us
149       // until new data becomes available
150       bytesRead = readUntilEmpty(fd_);
151     }
152     if (events & WRITE) {
153       // Write until the pipe buffer is full, so EventBase will stop calling
154       // us until the other end has read some data
155       bytesWritten = writeUntilFull(fd_);
156     }
157
158     log.push_back(EventRecord(events, bytesRead, bytesWritten));
159   }
160
161   struct EventRecord {
162     EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
163       : events(events)
164       , timestamp()
165       , bytesRead(bytesRead)
166       , bytesWritten(bytesWritten) {}
167
168     uint16_t events;
169     TimePoint timestamp;
170     ssize_t bytesRead;
171     ssize_t bytesWritten;
172   };
173
174   deque<EventRecord> log;
175
176  private:
177   int fd_;
178 };
179
180 /**
181  * Test a READ event
182  */
183 TEST(EventBaseTest, ReadEvent) {
184   EventBase eb;
185   SocketPair sp;
186
187   // Register for read events
188   TestHandler handler(&eb, sp[0]);
189   handler.registerHandler(EventHandler::READ);
190
191   // Register timeouts to perform two write events
192   ScheduledEvent events[] = {
193     { 10, EventHandler::WRITE, 2345 },
194     { 160, EventHandler::WRITE, 99 },
195     { 0, 0, 0 },
196   };
197   scheduleEvents(&eb, sp[1], events);
198
199   // Loop
200   TimePoint start;
201   eb.loop();
202   TimePoint end;
203
204   // Since we didn't use the EventHandler::PERSIST flag, the handler should
205   // have received the first read, then unregistered itself.  Check that only
206   // the first chunk of data was received.
207   ASSERT_EQ(handler.log.size(), 1);
208   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
209   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
210                   milliseconds(events[0].milliseconds), milliseconds(90));
211   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
212   ASSERT_EQ(handler.log[0].bytesWritten, 0);
213   T_CHECK_TIMEOUT(start, end,
214                   milliseconds(events[1].milliseconds), milliseconds(30));
215
216   // Make sure the second chunk of data is still waiting to be read.
217   size_t bytesRemaining = readUntilEmpty(sp[0]);
218   ASSERT_EQ(bytesRemaining, events[1].length);
219 }
220
221 /**
222  * Test (READ | PERSIST)
223  */
224 TEST(EventBaseTest, ReadPersist) {
225   EventBase eb;
226   SocketPair sp;
227
228   // Register for read events
229   TestHandler handler(&eb, sp[0]);
230   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
231
232   // Register several timeouts to perform writes
233   ScheduledEvent events[] = {
234     { 10,  EventHandler::WRITE, 1024 },
235     { 20,  EventHandler::WRITE, 2211 },
236     { 30,  EventHandler::WRITE, 4096 },
237     { 100, EventHandler::WRITE, 100 },
238     { 0, 0 },
239   };
240   scheduleEvents(&eb, sp[1], events);
241
242   // Schedule a timeout to unregister the handler after the third write
243   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
244
245   // Loop
246   TimePoint start;
247   eb.loop();
248   TimePoint end;
249
250   // The handler should have received the first 3 events,
251   // then been unregistered after that.
252   ASSERT_EQ(handler.log.size(), 3);
253   for (int n = 0; n < 3; ++n) {
254     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
255     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
256                     milliseconds(events[n].milliseconds));
257     ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
258     ASSERT_EQ(handler.log[n].bytesWritten, 0);
259   }
260   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
261
262   // Make sure the data from the last write is still waiting to be read
263   size_t bytesRemaining = readUntilEmpty(sp[0]);
264   ASSERT_EQ(bytesRemaining, events[3].length);
265 }
266
267 /**
268  * Test registering for READ when the socket is immediately readable
269  */
270 TEST(EventBaseTest, ReadImmediate) {
271   EventBase eb;
272   SocketPair sp;
273
274   // Write some data to the socket so the other end will
275   // be immediately readable
276   size_t dataLength = 1234;
277   writeToFD(sp[1], dataLength);
278
279   // Register for read events
280   TestHandler handler(&eb, sp[0]);
281   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
282
283   // Register a timeout to perform another write
284   ScheduledEvent events[] = {
285     { 10, EventHandler::WRITE, 2345 },
286     { 0, 0, 0 },
287   };
288   scheduleEvents(&eb, sp[1], events);
289
290   // Schedule a timeout to unregister the handler
291   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
292
293   // Loop
294   TimePoint start;
295   eb.loop();
296   TimePoint end;
297
298   ASSERT_EQ(handler.log.size(), 2);
299
300   // There should have been 1 event for immediate readability
301   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
302   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
303   ASSERT_EQ(handler.log[0].bytesRead, dataLength);
304   ASSERT_EQ(handler.log[0].bytesWritten, 0);
305
306   // There should be another event after the timeout wrote more data
307   ASSERT_EQ(handler.log[1].events, EventHandler::READ);
308   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
309                   milliseconds(events[0].milliseconds));
310   ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
311   ASSERT_EQ(handler.log[1].bytesWritten, 0);
312
313   T_CHECK_TIMEOUT(start, end, milliseconds(20));
314 }
315
316 /**
317  * Test a WRITE event
318  */
319 TEST(EventBaseTest, WriteEvent) {
320   EventBase eb;
321   SocketPair sp;
322
323   // Fill up the write buffer before starting
324   size_t initialBytesWritten = writeUntilFull(sp[0]);
325
326   // Register for write events
327   TestHandler handler(&eb, sp[0]);
328   handler.registerHandler(EventHandler::WRITE);
329
330   // Register timeouts to perform two reads
331   ScheduledEvent events[] = {
332     { 10, EventHandler::READ, 0 },
333     { 60, EventHandler::READ, 0 },
334     { 0, 0, 0 },
335   };
336   scheduleEvents(&eb, sp[1], events);
337
338   // Loop
339   TimePoint start;
340   eb.loop();
341   TimePoint end;
342
343   // Since we didn't use the EventHandler::PERSIST flag, the handler should
344   // have only been able to write once, then unregistered itself.
345   ASSERT_EQ(handler.log.size(), 1);
346   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
347   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
348                   milliseconds(events[0].milliseconds));
349   ASSERT_EQ(handler.log[0].bytesRead, 0);
350   ASSERT_GT(handler.log[0].bytesWritten, 0);
351   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
352
353   ASSERT_EQ(events[0].result, initialBytesWritten);
354   ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
355 }
356
357 /**
358  * Test (WRITE | PERSIST)
359  */
360 TEST(EventBaseTest, WritePersist) {
361   EventBase eb;
362   SocketPair sp;
363
364   // Fill up the write buffer before starting
365   size_t initialBytesWritten = writeUntilFull(sp[0]);
366
367   // Register for write events
368   TestHandler handler(&eb, sp[0]);
369   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
370
371   // Register several timeouts to read from the socket at several intervals
372   ScheduledEvent events[] = {
373     { 10,  EventHandler::READ, 0 },
374     { 40,  EventHandler::READ, 0 },
375     { 70,  EventHandler::READ, 0 },
376     { 100, EventHandler::READ, 0 },
377     { 0, 0 },
378   };
379   scheduleEvents(&eb, sp[1], events);
380
381   // Schedule a timeout to unregister the handler after the third read
382   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
383
384   // Loop
385   TimePoint start;
386   eb.loop();
387   TimePoint end;
388
389   // The handler should have received the first 3 events,
390   // then been unregistered after that.
391   ASSERT_EQ(handler.log.size(), 3);
392   ASSERT_EQ(events[0].result, initialBytesWritten);
393   for (int n = 0; n < 3; ++n) {
394     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
395     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
396                     milliseconds(events[n].milliseconds));
397     ASSERT_EQ(handler.log[n].bytesRead, 0);
398     ASSERT_GT(handler.log[n].bytesWritten, 0);
399     ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
400   }
401   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
402 }
403
404 /**
405  * Test registering for WRITE when the socket is immediately writable
406  */
407 TEST(EventBaseTest, WriteImmediate) {
408   EventBase eb;
409   SocketPair sp;
410
411   // Register for write events
412   TestHandler handler(&eb, sp[0]);
413   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
414
415   // Register a timeout to perform a read
416   ScheduledEvent events[] = {
417     { 10, EventHandler::READ, 0 },
418     { 0, 0, 0 },
419   };
420   scheduleEvents(&eb, sp[1], events);
421
422   // Schedule a timeout to unregister the handler
423   int64_t unregisterTimeout = 40;
424   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
425                    unregisterTimeout);
426
427   // Loop
428   TimePoint start;
429   eb.loop();
430   TimePoint end;
431
432   ASSERT_EQ(handler.log.size(), 2);
433
434   // Since the socket buffer was initially empty,
435   // there should have been 1 event for immediate writability
436   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
437   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
438   ASSERT_EQ(handler.log[0].bytesRead, 0);
439   ASSERT_GT(handler.log[0].bytesWritten, 0);
440
441   // There should be another event after the timeout wrote more data
442   ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
443   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
444                   milliseconds(events[0].milliseconds));
445   ASSERT_EQ(handler.log[1].bytesRead, 0);
446   ASSERT_GT(handler.log[1].bytesWritten, 0);
447
448   T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
449 }
450
451 /**
452  * Test (READ | WRITE) when the socket becomes readable first
453  */
454 TEST(EventBaseTest, ReadWrite) {
455   EventBase eb;
456   SocketPair sp;
457
458   // Fill up the write buffer before starting
459   size_t sock0WriteLength = writeUntilFull(sp[0]);
460
461   // Register for read and write events
462   TestHandler handler(&eb, sp[0]);
463   handler.registerHandler(EventHandler::READ_WRITE);
464
465   // Register timeouts to perform a write then a read.
466   ScheduledEvent events[] = {
467     { 10, EventHandler::WRITE, 2345 },
468     { 40, EventHandler::READ, 0 },
469     { 0, 0, 0 },
470   };
471   scheduleEvents(&eb, sp[1], events);
472
473   // Loop
474   TimePoint start;
475   eb.loop();
476   TimePoint end;
477
478   // Since we didn't use the EventHandler::PERSIST flag, the handler should
479   // have only noticed readability, then unregistered itself.  Check that only
480   // one event was logged.
481   ASSERT_EQ(handler.log.size(), 1);
482   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
483   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
484                   milliseconds(events[0].milliseconds));
485   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
486   ASSERT_EQ(handler.log[0].bytesWritten, 0);
487   ASSERT_EQ(events[1].result, sock0WriteLength);
488   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
489 }
490
491 /**
492  * Test (READ | WRITE) when the socket becomes writable first
493  */
494 TEST(EventBaseTest, WriteRead) {
495   EventBase eb;
496   SocketPair sp;
497
498   // Fill up the write buffer before starting
499   size_t sock0WriteLength = writeUntilFull(sp[0]);
500
501   // Register for read and write events
502   TestHandler handler(&eb, sp[0]);
503   handler.registerHandler(EventHandler::READ_WRITE);
504
505   // Register timeouts to perform a read then a write.
506   size_t sock1WriteLength = 2345;
507   ScheduledEvent events[] = {
508     { 10, EventHandler::READ, 0 },
509     { 40, EventHandler::WRITE, sock1WriteLength },
510     { 0, 0, 0 },
511   };
512   scheduleEvents(&eb, sp[1], events);
513
514   // Loop
515   TimePoint start;
516   eb.loop();
517   TimePoint end;
518
519   // Since we didn't use the EventHandler::PERSIST flag, the handler should
520   // have only noticed writability, then unregistered itself.  Check that only
521   // one event was logged.
522   ASSERT_EQ(handler.log.size(), 1);
523   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
524   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
525                   milliseconds(events[0].milliseconds));
526   ASSERT_EQ(handler.log[0].bytesRead, 0);
527   ASSERT_GT(handler.log[0].bytesWritten, 0);
528   ASSERT_EQ(events[0].result, sock0WriteLength);
529   ASSERT_EQ(events[1].result, sock1WriteLength);
530   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
531
532   // Make sure the written data is still waiting to be read.
533   size_t bytesRemaining = readUntilEmpty(sp[0]);
534   ASSERT_EQ(bytesRemaining, events[1].length);
535 }
536
537 /**
538  * Test (READ | WRITE) when the socket becomes readable and writable
539  * at the same time.
540  */
541 TEST(EventBaseTest, ReadWriteSimultaneous) {
542   EventBase eb;
543   SocketPair sp;
544
545   // Fill up the write buffer before starting
546   size_t sock0WriteLength = writeUntilFull(sp[0]);
547
548   // Register for read and write events
549   TestHandler handler(&eb, sp[0]);
550   handler.registerHandler(EventHandler::READ_WRITE);
551
552   // Register a timeout to perform a read and write together
553   ScheduledEvent events[] = {
554     { 10, EventHandler::READ | EventHandler::WRITE, 0 },
555     { 0, 0, 0 },
556   };
557   scheduleEvents(&eb, sp[1], events);
558
559   // Loop
560   TimePoint start;
561   eb.loop();
562   TimePoint end;
563
564   // It's not strictly required that the EventBase register us about both
565   // events in the same call.  So, it's possible that if the EventBase
566   // implementation changes this test could start failing, and it wouldn't be
567   // considered breaking the API.  However for now it's nice to exercise this
568   // code path.
569   ASSERT_EQ(handler.log.size(), 1);
570   ASSERT_EQ(handler.log[0].events,
571                     EventHandler::READ | EventHandler::WRITE);
572   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
573                   milliseconds(events[0].milliseconds));
574   ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
575   ASSERT_GT(handler.log[0].bytesWritten, 0);
576   T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
577 }
578
579 /**
580  * Test (READ | WRITE | PERSIST)
581  */
582 TEST(EventBaseTest, ReadWritePersist) {
583   EventBase eb;
584   SocketPair sp;
585
586   // Register for read and write events
587   TestHandler handler(&eb, sp[0]);
588   handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
589                           EventHandler::PERSIST);
590
591   // Register timeouts to perform several reads and writes
592   ScheduledEvent events[] = {
593     { 10, EventHandler::WRITE, 2345 },
594     { 20, EventHandler::READ, 0 },
595     { 35, EventHandler::WRITE, 200 },
596     { 45, EventHandler::WRITE, 15 },
597     { 55, EventHandler::READ, 0 },
598     { 120, EventHandler::WRITE, 2345 },
599     { 0, 0, 0 },
600   };
601   scheduleEvents(&eb, sp[1], events);
602
603   // Schedule a timeout to unregister the handler
604   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
605
606   // Loop
607   TimePoint start;
608   eb.loop();
609   TimePoint end;
610
611   ASSERT_EQ(handler.log.size(), 6);
612
613   // Since we didn't fill up the write buffer immediately, there should
614   // be an immediate event for writability.
615   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
616   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
617   ASSERT_EQ(handler.log[0].bytesRead, 0);
618   ASSERT_GT(handler.log[0].bytesWritten, 0);
619
620   // Events 1 through 5 should correspond to the scheduled events
621   for (int n = 1; n < 6; ++n) {
622     ScheduledEvent* event = &events[n - 1];
623     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
624                     milliseconds(event->milliseconds));
625     if (event->events == EventHandler::READ) {
626       ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
627       ASSERT_EQ(handler.log[n].bytesRead, 0);
628       ASSERT_GT(handler.log[n].bytesWritten, 0);
629     } else {
630       ASSERT_EQ(handler.log[n].events, EventHandler::READ);
631       ASSERT_EQ(handler.log[n].bytesRead, event->length);
632       ASSERT_EQ(handler.log[n].bytesWritten, 0);
633     }
634   }
635
636   // The timeout should have unregistered the handler before the last write.
637   // Make sure that data is still waiting to be read
638   size_t bytesRemaining = readUntilEmpty(sp[0]);
639   ASSERT_EQ(bytesRemaining, events[5].length);
640 }
641
642
643 class PartialReadHandler : public TestHandler {
644  public:
645   PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
646     : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
647
648   virtual void handlerReady(uint16_t events) noexcept {
649     assert(events == EventHandler::READ);
650     ssize_t bytesRead = readFromFD(fd_, readLength_);
651     log.push_back(EventRecord(events, bytesRead, 0));
652   }
653
654  private:
655   int fd_;
656   size_t readLength_;
657 };
658
659 /**
660  * Test reading only part of the available data when a read event is fired.
661  * When PERSIST is used, make sure the handler gets notified again the next
662  * time around the loop.
663  */
664 TEST(EventBaseTest, ReadPartial) {
665   EventBase eb;
666   SocketPair sp;
667
668   // Register for read events
669   size_t readLength = 100;
670   PartialReadHandler handler(&eb, sp[0], readLength);
671   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
672
673   // Register a timeout to perform a single write,
674   // with more data than PartialReadHandler will read at once
675   ScheduledEvent events[] = {
676     { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) },
677     { 0, 0, 0 },
678   };
679   scheduleEvents(&eb, sp[1], events);
680
681   // Schedule a timeout to unregister the handler
682   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
683
684   // Loop
685   TimePoint start;
686   eb.loop();
687   TimePoint end;
688
689   ASSERT_EQ(handler.log.size(), 4);
690
691   // The first 3 invocations should read readLength bytes each
692   for (int n = 0; n < 3; ++n) {
693     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
694     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
695                     milliseconds(events[0].milliseconds));
696     ASSERT_EQ(handler.log[n].bytesRead, readLength);
697     ASSERT_EQ(handler.log[n].bytesWritten, 0);
698   }
699   // The last read only has readLength/2 bytes
700   ASSERT_EQ(handler.log[3].events, EventHandler::READ);
701   T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
702                   milliseconds(events[0].milliseconds));
703   ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
704   ASSERT_EQ(handler.log[3].bytesWritten, 0);
705 }
706
707
708 class PartialWriteHandler : public TestHandler {
709  public:
710   PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
711     : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
712
713   virtual void handlerReady(uint16_t events) noexcept {
714     assert(events == EventHandler::WRITE);
715     ssize_t bytesWritten = writeToFD(fd_, writeLength_);
716     log.push_back(EventRecord(events, 0, bytesWritten));
717   }
718
719  private:
720   int fd_;
721   size_t writeLength_;
722 };
723
724 /**
725  * Test writing without completely filling up the write buffer when the fd
726  * becomes writable.  When PERSIST is used, make sure the handler gets
727  * notified again the next time around the loop.
728  */
729 TEST(EventBaseTest, WritePartial) {
730   EventBase eb;
731   SocketPair sp;
732
733   // Fill up the write buffer before starting
734   size_t initialBytesWritten = writeUntilFull(sp[0]);
735
736   // Register for write events
737   size_t writeLength = 100;
738   PartialWriteHandler handler(&eb, sp[0], writeLength);
739   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
740
741   // Register a timeout to read, so that more data can be written
742   ScheduledEvent events[] = {
743     { 10, EventHandler::READ, 0 },
744     { 0, 0, 0 },
745   };
746   scheduleEvents(&eb, sp[1], events);
747
748   // Schedule a timeout to unregister the handler
749   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
750
751   // Loop
752   TimePoint start;
753   eb.loop();
754   TimePoint end;
755
756   // Depending on how big the socket buffer is, there will be multiple writes
757   // Only check the first 5
758   int numChecked = 5;
759   ASSERT_GE(handler.log.size(), numChecked);
760   ASSERT_EQ(events[0].result, initialBytesWritten);
761
762   // The first 3 invocations should read writeLength bytes each
763   for (int n = 0; n < numChecked; ++n) {
764     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
765     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
766                     milliseconds(events[0].milliseconds));
767     ASSERT_EQ(handler.log[n].bytesRead, 0);
768     ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
769   }
770 }
771
772
773 /**
774  * Test destroying a registered EventHandler
775  */
776 TEST(EventBaseTest, DestroyHandler) {
777   class DestroyHandler : public AsyncTimeout {
778    public:
779     DestroyHandler(EventBase* eb, EventHandler* h)
780       : AsyncTimeout(eb)
781       , handler_(h) {}
782
783     virtual void timeoutExpired() noexcept {
784       delete handler_;
785     }
786
787    private:
788     EventHandler* handler_;
789   };
790
791   EventBase eb;
792   SocketPair sp;
793
794   // Fill up the write buffer before starting
795   size_t initialBytesWritten = writeUntilFull(sp[0]);
796
797   // Register for write events
798   TestHandler* handler = new TestHandler(&eb, sp[0]);
799   handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
800
801   // After 10ms, read some data, so that the handler
802   // will be notified that it can write.
803   eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
804                    10);
805
806   // Start a timer to destroy the handler after 25ms
807   // This mainly just makes sure the code doesn't break or assert
808   DestroyHandler dh(&eb, handler);
809   dh.scheduleTimeout(25);
810
811   TimePoint start;
812   eb.loop();
813   TimePoint end;
814
815   // Make sure the EventHandler was uninstalled properly when it was
816   // destroyed, and the EventBase loop exited
817   T_CHECK_TIMEOUT(start, end, milliseconds(25));
818
819   // Make sure that the handler wrote data to the socket
820   // before it was destroyed
821   size_t bytesRemaining = readUntilEmpty(sp[1]);
822   ASSERT_GT(bytesRemaining, 0);
823 }
824
825
826 ///////////////////////////////////////////////////////////////////////////
827 // Tests for timeout events
828 ///////////////////////////////////////////////////////////////////////////
829
830 TEST(EventBaseTest, RunAfterDelay) {
831   EventBase eb;
832
833   TimePoint timestamp1(false);
834   TimePoint timestamp2(false);
835   TimePoint timestamp3(false);
836   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
837   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
838   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
839
840   TimePoint start;
841   eb.loop();
842   TimePoint end;
843
844   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
845   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
846   T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
847   T_CHECK_TIMEOUT(start, end, milliseconds(40));
848 }
849
850 /**
851  * Test the behavior of tryRunAfterDelay() when some timeouts are
852  * still scheduled when the EventBase is destroyed.
853  */
854 TEST(EventBaseTest, RunAfterDelayDestruction) {
855   TimePoint timestamp1(false);
856   TimePoint timestamp2(false);
857   TimePoint timestamp3(false);
858   TimePoint timestamp4(false);
859   TimePoint start(false);
860   TimePoint end(false);
861
862   {
863     EventBase eb;
864
865     // Run two normal timeouts
866     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
867     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
868
869     // Schedule a timeout to stop the event loop after 40ms
870     eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
871
872     // Schedule 2 timeouts that would fire after the event loop stops
873     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
874     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
875
876     start.reset();
877     eb.loop();
878     end.reset();
879   }
880
881   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
882   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
883   T_CHECK_TIMEOUT(start, end, milliseconds(40));
884
885   ASSERT_TRUE(timestamp3.isUnset());
886   ASSERT_TRUE(timestamp4.isUnset());
887
888   // Ideally this test should be run under valgrind to ensure that no
889   // memory is leaked.
890 }
891
892 class TestTimeout : public AsyncTimeout {
893  public:
894   explicit TestTimeout(EventBase* eventBase)
895     : AsyncTimeout(eventBase)
896     , timestamp(false) {}
897
898   virtual void timeoutExpired() noexcept {
899     timestamp.reset();
900   }
901
902   TimePoint timestamp;
903 };
904
905 TEST(EventBaseTest, BasicTimeouts) {
906   EventBase eb;
907
908   TestTimeout t1(&eb);
909   TestTimeout t2(&eb);
910   TestTimeout t3(&eb);
911   t1.scheduleTimeout(10);
912   t2.scheduleTimeout(20);
913   t3.scheduleTimeout(40);
914
915   TimePoint start;
916   eb.loop();
917   TimePoint end;
918
919   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
920   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
921   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
922   T_CHECK_TIMEOUT(start, end, milliseconds(40));
923 }
924
925 class ReschedulingTimeout : public AsyncTimeout {
926  public:
927   ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
928     : AsyncTimeout(evb)
929     , timeouts_(timeouts)
930     , iterator_(timeouts_.begin()) {}
931
932   void start() {
933     reschedule();
934   }
935
936   virtual void timeoutExpired() noexcept {
937     timestamps.push_back(TimePoint());
938     reschedule();
939   }
940
941   void reschedule() {
942     if (iterator_ != timeouts_.end()) {
943       uint32_t timeout = *iterator_;
944       ++iterator_;
945       scheduleTimeout(timeout);
946     }
947   }
948
949   vector<TimePoint> timestamps;
950
951  private:
952   vector<uint32_t> timeouts_;
953   vector<uint32_t>::const_iterator iterator_;
954 };
955
956 /**
957  * Test rescheduling the same timeout multiple times
958  */
959 TEST(EventBaseTest, ReuseTimeout) {
960   EventBase eb;
961
962   vector<uint32_t> timeouts;
963   timeouts.push_back(10);
964   timeouts.push_back(30);
965   timeouts.push_back(15);
966
967   ReschedulingTimeout t(&eb, timeouts);
968   t.start();
969
970   TimePoint start;
971   eb.loop();
972   TimePoint end;
973
974   // Use a higher tolerance than usual.  We're waiting on 3 timeouts
975   // consecutively.  In general, each timeout may go over by a few
976   // milliseconds, and we're tripling this error by witing on 3 timeouts.
977   milliseconds tolerance{6};
978
979   ASSERT_EQ(timeouts.size(), t.timestamps.size());
980   uint32_t total = 0;
981   for (size_t n = 0; n < timeouts.size(); ++n) {
982     total += timeouts[n];
983     T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
984   }
985   T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
986 }
987
988 /**
989  * Test rescheduling a timeout before it has fired
990  */
991 TEST(EventBaseTest, RescheduleTimeout) {
992   EventBase eb;
993
994   TestTimeout t1(&eb);
995   TestTimeout t2(&eb);
996   TestTimeout t3(&eb);
997
998   t1.scheduleTimeout(15);
999   t2.scheduleTimeout(30);
1000   t3.scheduleTimeout(30);
1001
1002   auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1003       &AsyncTimeout::scheduleTimeout);
1004
1005   // after 10ms, reschedule t2 to run sooner than originally scheduled
1006   eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1007   // after 10ms, reschedule t3 to run later than originally scheduled
1008   eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1009
1010   TimePoint start;
1011   eb.loop();
1012   TimePoint end;
1013
1014   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1015   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1016   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1017   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1018 }
1019
1020 /**
1021  * Test cancelling a timeout
1022  */
1023 TEST(EventBaseTest, CancelTimeout) {
1024   EventBase eb;
1025
1026   vector<uint32_t> timeouts;
1027   timeouts.push_back(10);
1028   timeouts.push_back(30);
1029   timeouts.push_back(25);
1030
1031   ReschedulingTimeout t(&eb, timeouts);
1032   t.start();
1033   eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1034
1035   TimePoint start;
1036   eb.loop();
1037   TimePoint end;
1038
1039   ASSERT_EQ(t.timestamps.size(), 2);
1040   T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1041   T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1042   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1043 }
1044
1045 /**
1046  * Test destroying a scheduled timeout object
1047  */
1048 TEST(EventBaseTest, DestroyTimeout) {
1049   class DestroyTimeout : public AsyncTimeout {
1050    public:
1051     DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1052       : AsyncTimeout(eb)
1053       , timeout_(t) {}
1054
1055     virtual void timeoutExpired() noexcept {
1056       delete timeout_;
1057     }
1058
1059    private:
1060     AsyncTimeout* timeout_;
1061   };
1062
1063   EventBase eb;
1064
1065   TestTimeout* t1 = new TestTimeout(&eb);
1066   t1->scheduleTimeout(30);
1067
1068   DestroyTimeout dt(&eb, t1);
1069   dt.scheduleTimeout(10);
1070
1071   TimePoint start;
1072   eb.loop();
1073   TimePoint end;
1074
1075   T_CHECK_TIMEOUT(start, end, milliseconds(10));
1076 }
1077
1078
1079 ///////////////////////////////////////////////////////////////////////////
1080 // Test for runInThreadTestFunc()
1081 ///////////////////////////////////////////////////////////////////////////
1082
1083 struct RunInThreadData {
1084   RunInThreadData(int numThreads, int opsPerThread)
1085     : opsPerThread(opsPerThread)
1086     , opsToGo(numThreads*opsPerThread) {}
1087
1088   EventBase evb;
1089   deque< pair<int, int> > values;
1090
1091   int opsPerThread;
1092   int opsToGo;
1093 };
1094
1095 struct RunInThreadArg {
1096   RunInThreadArg(RunInThreadData* data,
1097                  int threadId,
1098                  int value)
1099     : data(data)
1100     , thread(threadId)
1101     , value(value) {}
1102
1103   RunInThreadData* data;
1104   int thread;
1105   int value;
1106 };
1107
1108 void runInThreadTestFunc(RunInThreadArg* arg) {
1109   arg->data->values.push_back(make_pair(arg->thread, arg->value));
1110   RunInThreadData* data = arg->data;
1111   delete arg;
1112
1113   if(--data->opsToGo == 0) {
1114     // Break out of the event base loop if we are the last thread running
1115     data->evb.terminateLoopSoon();
1116   }
1117 }
1118
1119 TEST(EventBaseTest, RunInThread) {
1120   uint32_t numThreads = 50;
1121   uint32_t opsPerThread = 100;
1122   RunInThreadData data(numThreads, opsPerThread);
1123
1124   deque<std::thread> threads;
1125   for (uint32_t i = 0; i < numThreads; ++i) {
1126     threads.emplace_back([i, &data] {
1127         for (int n = 0; n < data.opsPerThread; ++n) {
1128           RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1129           data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1130           usleep(10);
1131         }
1132       });
1133   }
1134
1135   // Add a timeout event to run after 3 seconds.
1136   // Otherwise loop() will return immediately since there are no events to run.
1137   // Once the last thread exits, it will stop the loop().  However, this
1138   // timeout also stops the loop in case there is a bug performing the normal
1139   // stop.
1140   data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1141                          3000);
1142
1143   TimePoint start;
1144   data.evb.loop();
1145   TimePoint end;
1146
1147   // Verify that the loop exited because all threads finished and requested it
1148   // to stop.  This should happen much sooner than the 3 second timeout.
1149   // Assert that it happens in under a second.  (This is still tons of extra
1150   // padding.)
1151
1152   auto timeTaken = std::chrono::duration_cast<milliseconds>(
1153     end.getTime() - start.getTime());
1154   ASSERT_LT(timeTaken.count(), 1000);
1155   VLOG(11) << "Time taken: " << timeTaken.count();
1156
1157   // Verify that we have all of the events from every thread
1158   int expectedValues[numThreads];
1159   for (uint32_t n = 0; n < numThreads; ++n) {
1160     expectedValues[n] = 0;
1161   }
1162   for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1163        it != data.values.end();
1164        ++it) {
1165     int threadID = it->first;
1166     int value = it->second;
1167     ASSERT_EQ(expectedValues[threadID], value);
1168     ++expectedValues[threadID];
1169   }
1170   for (uint32_t n = 0; n < numThreads; ++n) {
1171     ASSERT_EQ(expectedValues[n], opsPerThread);
1172   }
1173
1174   // Wait on all of the threads.
1175   for (auto& thread: threads) {
1176     thread.join();
1177   }
1178 }
1179
1180 //  This test simulates some calls, and verifies that the waiting happens by
1181 //  triggering what otherwise would be race conditions, and trying to detect
1182 //  whether any of the race conditions happened.
1183 TEST(EventBaseTest, RunInEventLoopThreadAndWait) {
1184   const size_t c = 256;
1185   vector<unique_ptr<atomic<size_t>>> atoms(c);
1186   for (size_t i = 0; i < c; ++i) {
1187     auto& atom = atoms.at(i);
1188     atom = make_unique<atomic<size_t>>(0);
1189   }
1190   vector<thread> threads(c);
1191   for (size_t i = 0; i < c; ++i) {
1192     auto& atom = *atoms.at(i);
1193     auto& th = threads.at(i);
1194     th = thread([&atom] {
1195         EventBase eb;
1196         auto ebth = thread([&]{ eb.loopForever(); });
1197         eb.waitUntilRunning();
1198         eb.runInEventBaseThreadAndWait([&] {
1199           size_t x = 0;
1200           atom.compare_exchange_weak(
1201               x, 1, std::memory_order_release, std::memory_order_relaxed);
1202         });
1203         size_t x = 0;
1204         atom.compare_exchange_weak(
1205             x, 2, std::memory_order_release, std::memory_order_relaxed);
1206         eb.terminateLoopSoon();
1207         ebth.join();
1208     });
1209   }
1210   for (size_t i = 0; i < c; ++i) {
1211     auto& th = threads.at(i);
1212     th.join();
1213   }
1214   size_t sum = 0;
1215   for (auto& atom : atoms) sum += *atom;
1216   EXPECT_EQ(c, sum);
1217 }
1218
1219 ///////////////////////////////////////////////////////////////////////////
1220 // Tests for runInLoop()
1221 ///////////////////////////////////////////////////////////////////////////
1222
1223 class CountedLoopCallback : public EventBase::LoopCallback {
1224  public:
1225   CountedLoopCallback(EventBase* eventBase,
1226                       unsigned int count,
1227                       std::function<void()> action =
1228                         std::function<void()>())
1229     : eventBase_(eventBase)
1230     , count_(count)
1231     , action_(action) {}
1232
1233   virtual void runLoopCallback() noexcept {
1234     --count_;
1235     if (count_ > 0) {
1236       eventBase_->runInLoop(this);
1237     } else if (action_) {
1238       action_();
1239     }
1240   }
1241
1242   unsigned int getCount() const {
1243     return count_;
1244   }
1245
1246  private:
1247   EventBase* eventBase_;
1248   unsigned int count_;
1249   std::function<void()> action_;
1250 };
1251
1252 // Test that EventBase::loop() doesn't exit while there are
1253 // still LoopCallbacks remaining to be invoked.
1254 TEST(EventBaseTest, RepeatedRunInLoop) {
1255   EventBase eventBase;
1256
1257   CountedLoopCallback c(&eventBase, 10);
1258   eventBase.runInLoop(&c);
1259   // The callback shouldn't have run immediately
1260   ASSERT_EQ(c.getCount(), 10);
1261   eventBase.loop();
1262
1263   // loop() should loop until the CountedLoopCallback stops
1264   // re-installing itself.
1265   ASSERT_EQ(c.getCount(), 0);
1266 }
1267
1268 // Test that EventBase::loop() works as expected without time measurements.
1269 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
1270   EventBase eventBase(false);
1271
1272   CountedLoopCallback c(&eventBase, 10);
1273   eventBase.runInLoop(&c);
1274   // The callback shouldn't have run immediately
1275   ASSERT_EQ(c.getCount(), 10);
1276   eventBase.loop();
1277
1278   // loop() should loop until the CountedLoopCallback stops
1279   // re-installing itself.
1280   ASSERT_EQ(c.getCount(), 0);
1281 }
1282
1283 // Test runInLoop() calls with terminateLoopSoon()
1284 TEST(EventBaseTest, RunInLoopStopLoop) {
1285   EventBase eventBase;
1286
1287   CountedLoopCallback c1(&eventBase, 20);
1288   CountedLoopCallback c2(&eventBase, 10,
1289                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1290
1291   eventBase.runInLoop(&c1);
1292   eventBase.runInLoop(&c2);
1293   ASSERT_EQ(c1.getCount(), 20);
1294   ASSERT_EQ(c2.getCount(), 10);
1295
1296   eventBase.loopForever();
1297
1298   // c2 should have stopped the loop after 10 iterations
1299   ASSERT_EQ(c2.getCount(), 0);
1300
1301   // We allow the EventBase to run the loop callbacks in whatever order it
1302   // chooses.  We'll accept c1's count being either 10 (if the loop terminated
1303   // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1304   // before c1 ran).
1305   //
1306   // (With the current code, c1 will always run 10 times, but we don't consider
1307   // this a hard API requirement.)
1308   ASSERT_GE(c1.getCount(), 10);
1309   ASSERT_LE(c1.getCount(), 11);
1310 }
1311
1312 // Test cancelling runInLoop() callbacks
1313 TEST(EventBaseTest, CancelRunInLoop) {
1314   EventBase eventBase;
1315
1316   CountedLoopCallback c1(&eventBase, 20);
1317   CountedLoopCallback c2(&eventBase, 20);
1318   CountedLoopCallback c3(&eventBase, 20);
1319
1320   std::function<void()> cancelC1Action =
1321     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1322   std::function<void()> cancelC2Action =
1323     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1324
1325   CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1326   CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1327
1328   // Install cancelC1 after c1
1329   eventBase.runInLoop(&c1);
1330   eventBase.runInLoop(&cancelC1);
1331
1332   // Install cancelC2 before c2
1333   eventBase.runInLoop(&cancelC2);
1334   eventBase.runInLoop(&c2);
1335
1336   // Install c3
1337   eventBase.runInLoop(&c3);
1338
1339   ASSERT_EQ(c1.getCount(), 20);
1340   ASSERT_EQ(c2.getCount(), 20);
1341   ASSERT_EQ(c3.getCount(), 20);
1342   ASSERT_EQ(cancelC1.getCount(), 10);
1343   ASSERT_EQ(cancelC2.getCount(), 10);
1344
1345   // Run the loop
1346   eventBase.loop();
1347
1348   // cancelC1 and cancelC3 should have both fired after 10 iterations and
1349   // stopped re-installing themselves
1350   ASSERT_EQ(cancelC1.getCount(), 0);
1351   ASSERT_EQ(cancelC2.getCount(), 0);
1352   // c3 should have continued on for the full 20 iterations
1353   ASSERT_EQ(c3.getCount(), 0);
1354
1355   // c1 and c2 should have both been cancelled on the 10th iteration.
1356   //
1357   // Callbacks are always run in the order they are installed,
1358   // so c1 should have fired 10 times, and been canceled after it ran on the
1359   // 10th iteration.  c2 should have only fired 9 times, because cancelC2 will
1360   // have run before it on the 10th iteration, and cancelled it before it
1361   // fired.
1362   ASSERT_EQ(c1.getCount(), 10);
1363   ASSERT_EQ(c2.getCount(), 11);
1364 }
1365
1366 class TerminateTestCallback : public EventBase::LoopCallback,
1367                               public EventHandler {
1368  public:
1369   TerminateTestCallback(EventBase* eventBase, int fd)
1370     : EventHandler(eventBase, fd),
1371       eventBase_(eventBase),
1372       loopInvocations_(0),
1373       maxLoopInvocations_(0),
1374       eventInvocations_(0),
1375       maxEventInvocations_(0) {}
1376
1377   void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1378     loopInvocations_ = 0;
1379     maxLoopInvocations_ = maxLoopInvocations;
1380     eventInvocations_ = 0;
1381     maxEventInvocations_ = maxEventInvocations;
1382
1383     cancelLoopCallback();
1384     unregisterHandler();
1385   }
1386
1387   virtual void handlerReady(uint16_t events) noexcept {
1388     // We didn't register with PERSIST, so we will have been automatically
1389     // unregistered already.
1390     ASSERT_FALSE(isHandlerRegistered());
1391
1392     ++eventInvocations_;
1393     if (eventInvocations_ >= maxEventInvocations_) {
1394       return;
1395     }
1396
1397     eventBase_->runInLoop(this);
1398   }
1399   virtual void runLoopCallback() noexcept {
1400     ++loopInvocations_;
1401     if (loopInvocations_ >= maxLoopInvocations_) {
1402       return;
1403     }
1404
1405     registerHandler(READ);
1406   }
1407
1408   uint32_t getLoopInvocations() const {
1409     return loopInvocations_;
1410   }
1411   uint32_t getEventInvocations() const {
1412     return eventInvocations_;
1413   }
1414
1415  private:
1416   EventBase* eventBase_;
1417   uint32_t loopInvocations_;
1418   uint32_t maxLoopInvocations_;
1419   uint32_t eventInvocations_;
1420   uint32_t maxEventInvocations_;
1421 };
1422
1423 /**
1424  * Test that EventBase::loop() correctly detects when there are no more events
1425  * left to run.
1426  *
1427  * This uses a single callback, which alternates registering itself as a loop
1428  * callback versus a EventHandler callback.  This exercises a regression where
1429  * EventBase::loop() incorrectly exited if there were no more fd handlers
1430  * registered, but a loop callback installed a new fd handler.
1431  */
1432 TEST(EventBaseTest, LoopTermination) {
1433   EventBase eventBase;
1434
1435   // Open a pipe and close the write end,
1436   // so the read endpoint will be readable
1437   int pipeFds[2];
1438   int rc = pipe(pipeFds);
1439   ASSERT_EQ(rc, 0);
1440   close(pipeFds[1]);
1441   TerminateTestCallback callback(&eventBase, pipeFds[0]);
1442
1443   // Test once where the callback will exit after a loop callback
1444   callback.reset(10, 100);
1445   eventBase.runInLoop(&callback);
1446   eventBase.loop();
1447   ASSERT_EQ(callback.getLoopInvocations(), 10);
1448   ASSERT_EQ(callback.getEventInvocations(), 9);
1449
1450   // Test once where the callback will exit after an fd event callback
1451   callback.reset(100, 7);
1452   eventBase.runInLoop(&callback);
1453   eventBase.loop();
1454   ASSERT_EQ(callback.getLoopInvocations(), 7);
1455   ASSERT_EQ(callback.getEventInvocations(), 7);
1456
1457   close(pipeFds[0]);
1458 }
1459
1460 ///////////////////////////////////////////////////////////////////////////
1461 // Tests for latency calculations
1462 ///////////////////////////////////////////////////////////////////////////
1463
1464 class IdleTimeTimeoutSeries : public AsyncTimeout {
1465
1466  public:
1467
1468   explicit IdleTimeTimeoutSeries(EventBase *base,
1469                                  std::deque<std::uint64_t>& timeout) :
1470     AsyncTimeout(base),
1471     timeouts_(0),
1472     timeout_(timeout) {
1473       scheduleTimeout(1);
1474     }
1475
1476   virtual ~IdleTimeTimeoutSeries() {}
1477
1478   void timeoutExpired() noexcept {
1479     ++timeouts_;
1480
1481     if(timeout_.empty()){
1482       cancelTimeout();
1483     } else {
1484       uint64_t sleepTime = timeout_.front();
1485       timeout_.pop_front();
1486       if (sleepTime) {
1487         usleep(sleepTime);
1488       }
1489       scheduleTimeout(1);
1490     }
1491   }
1492
1493   int getTimeouts() const {
1494     return timeouts_;
1495   }
1496
1497  private:
1498   int timeouts_;
1499   std::deque<uint64_t>& timeout_;
1500 };
1501
1502 /**
1503  * Verify that idle time is correctly accounted for when decaying our loop
1504  * time.
1505  *
1506  * This works by creating a high loop time (via usleep), expecting a latency
1507  * callback with known value, and then scheduling a timeout for later. This
1508  * later timeout is far enough in the future that the idle time should have
1509  * caused the loop time to decay.
1510  */
1511 TEST(EventBaseTest, IdleTime) {
1512   EventBase eventBase;
1513   eventBase.setLoadAvgMsec(1000);
1514   eventBase.resetLoadAvg(5900.0);
1515   std::deque<uint64_t> timeouts0(4, 8080);
1516   timeouts0.push_front(8000);
1517   timeouts0.push_back(14000);
1518   IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1519   std::deque<uint64_t> timeouts(20, 20);
1520   std::unique_ptr<IdleTimeTimeoutSeries> tos;
1521   int64_t testStart = duration_cast<microseconds>(
1522     std::chrono::steady_clock::now().time_since_epoch()).count();
1523   bool hostOverloaded = false;
1524
1525   int latencyCallbacks = 0;
1526   eventBase.setMaxLatency(6000, [&]() {
1527     ++latencyCallbacks;
1528
1529     switch (latencyCallbacks) {
1530     case 1:
1531       if (tos0.getTimeouts() < 6) {
1532         // This could only happen if the host this test is running
1533         // on is heavily loaded.
1534         int64_t maxLatencyReached = duration_cast<microseconds>(
1535             std::chrono::steady_clock::now().time_since_epoch()).count();
1536         ASSERT_LE(43800, maxLatencyReached - testStart);
1537         hostOverloaded = true;
1538         break;
1539       }
1540       ASSERT_EQ(6, tos0.getTimeouts());
1541       ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1542       ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1543       tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1544       break;
1545
1546     default:
1547       FAIL() << "Unexpected latency callback";
1548       break;
1549     }
1550   });
1551
1552   // Kick things off with an "immedite" timeout
1553   tos0.scheduleTimeout(1);
1554
1555   eventBase.loop();
1556
1557   if (hostOverloaded) {
1558     return;
1559   }
1560
1561   ASSERT_EQ(1, latencyCallbacks);
1562   ASSERT_EQ(7, tos0.getTimeouts());
1563   ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1564   ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1565   ASSERT_TRUE(!!tos);
1566   ASSERT_EQ(21, tos->getTimeouts());
1567 }
1568
1569 /**
1570  * Test that thisLoop functionality works with terminateLoopSoon
1571  */
1572 TEST(EventBaseTest, ThisLoop) {
1573   EventBase eb;
1574   bool runInLoop = false;
1575   bool runThisLoop = false;
1576
1577   eb.runInLoop([&](){
1578       eb.terminateLoopSoon();
1579       eb.runInLoop([&]() {
1580           runInLoop = true;
1581         });
1582       eb.runInLoop([&]() {
1583           runThisLoop = true;
1584         }, true);
1585     }, true);
1586   eb.loopForever();
1587
1588   // Should not work
1589   ASSERT_FALSE(runInLoop);
1590   // Should work with thisLoop
1591   ASSERT_TRUE(runThisLoop);
1592 }
1593
1594 TEST(EventBaseTest, EventBaseThreadLoop) {
1595   EventBase base;
1596   bool ran = false;
1597
1598   base.runInEventBaseThread([&](){
1599     ran = true;
1600   });
1601   base.loop();
1602
1603   ASSERT_EQ(true, ran);
1604 }
1605
1606 TEST(EventBaseTest, EventBaseThreadName) {
1607   EventBase base;
1608   base.setName("foo");
1609   base.loop();
1610
1611 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1612   char name[16];
1613   pthread_getname_np(pthread_self(), name, 16);
1614   ASSERT_EQ(0, strcmp("foo", name));
1615 #endif
1616 }
1617
1618 TEST(EventBaseTest, RunBeforeLoop) {
1619   EventBase base;
1620   CountedLoopCallback cb(&base, 1, [&](){
1621     base.terminateLoopSoon();
1622   });
1623   base.runBeforeLoop(&cb);
1624   base.loopForever();
1625   ASSERT_EQ(cb.getCount(), 0);
1626 }
1627
1628 TEST(EventBaseTest, RunBeforeLoopWait) {
1629   EventBase base;
1630   CountedLoopCallback cb(&base, 1);
1631   base.tryRunAfterDelay([&](){
1632       base.terminateLoopSoon();
1633     }, 500);
1634   base.runBeforeLoop(&cb);
1635   base.loopForever();
1636
1637   // Check that we only ran once, and did not loop multiple times.
1638   ASSERT_EQ(cb.getCount(), 0);
1639 }
1640
1641 class PipeHandler : public EventHandler {
1642 public:
1643   PipeHandler(EventBase* eventBase, int fd)
1644     : EventHandler(eventBase, fd) {}
1645
1646   void handlerReady(uint16_t events) noexcept {
1647     abort();
1648   }
1649 };
1650
1651 TEST(EventBaseTest, StopBeforeLoop) {
1652   EventBase evb;
1653
1654   // Give the evb something to do.
1655   int p[2];
1656   ASSERT_EQ(0, pipe(p));
1657   PipeHandler handler(&evb, p[0]);
1658   handler.registerHandler(EventHandler::READ);
1659
1660   // It's definitely not running yet
1661   evb.terminateLoopSoon();
1662
1663   // let it run, it should exit quickly.
1664   std::thread t([&] { evb.loop(); });
1665   t.join();
1666
1667   handler.unregisterHandler();
1668   close(p[0]);
1669   close(p[1]);
1670
1671   SUCCEED();
1672 }
1673
1674 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1675   bool ran = false;
1676
1677   {
1678     EventBase base;
1679     base.runInEventBaseThread([&](){
1680       ran = true;
1681     });
1682   }
1683
1684   ASSERT_TRUE(ran);
1685 }