runImmediatly fix
[folly.git] / folly / io / async / test / EventBaseTest.cpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 #include <folly/Memory.h>
20
21 #include <folly/io/async/AsyncTimeout.h>
22 #include <folly/io/async/EventBase.h>
23 #include <folly/io/async/EventHandler.h>
24 #include <folly/io/async/test/SocketPair.h>
25 #include <folly/io/async/test/Util.h>
26
27 #include <atomic>
28 #include <iostream>
29 #include <unistd.h>
30 #include <memory>
31 #include <thread>
32
33 using std::atomic;
34 using std::deque;
35 using std::pair;
36 using std::vector;
37 using std::unique_ptr;
38 using std::thread;
39 using std::make_pair;
40 using std::cerr;
41 using std::endl;
42 using std::chrono::milliseconds;
43 using std::chrono::microseconds;
44 using std::chrono::duration_cast;
45
46 using namespace folly;
47
48 ///////////////////////////////////////////////////////////////////////////
49 // Tests for read and write events
50 ///////////////////////////////////////////////////////////////////////////
51
52 enum { BUF_SIZE = 4096 };
53
54 ssize_t writeToFD(int fd, size_t length) {
55   // write an arbitrary amount of data to the fd
56   char buf[length];
57   memset(buf, 'a', sizeof(buf));
58   ssize_t rc = write(fd, buf, sizeof(buf));
59   CHECK_EQ(rc, length);
60   return rc;
61 }
62
63 size_t writeUntilFull(int fd) {
64   // Write to the fd until EAGAIN is returned
65   size_t bytesWritten = 0;
66   char buf[BUF_SIZE];
67   memset(buf, 'a', sizeof(buf));
68   while (true) {
69     ssize_t rc = write(fd, buf, sizeof(buf));
70     if (rc < 0) {
71       CHECK_EQ(errno, EAGAIN);
72       break;
73     } else {
74       bytesWritten += rc;
75     }
76   }
77   return bytesWritten;
78 }
79
80 ssize_t readFromFD(int fd, size_t length) {
81   // write an arbitrary amount of data to the fd
82   char buf[length];
83   return read(fd, buf, sizeof(buf));
84 }
85
86 size_t readUntilEmpty(int fd) {
87   // Read from the fd until EAGAIN is returned
88   char buf[BUF_SIZE];
89   size_t bytesRead = 0;
90   while (true) {
91     int rc = read(fd, buf, sizeof(buf));
92     if (rc == 0) {
93       CHECK(false) << "unexpected EOF";
94     } else if (rc < 0) {
95       CHECK_EQ(errno, EAGAIN);
96       break;
97     } else {
98       bytesRead += rc;
99     }
100   }
101   return bytesRead;
102 }
103
104 void checkReadUntilEmpty(int fd, size_t expectedLength) {
105   ASSERT_EQ(readUntilEmpty(fd), expectedLength);
106 }
107
108 struct ScheduledEvent {
109   int milliseconds;
110   uint16_t events;
111   size_t length;
112   ssize_t result;
113
114   void perform(int fd) {
115     if (events & EventHandler::READ) {
116       if (length == 0) {
117         result = readUntilEmpty(fd);
118       } else {
119         result = readFromFD(fd, length);
120       }
121     }
122     if (events & EventHandler::WRITE) {
123       if (length == 0) {
124         result = writeUntilFull(fd);
125       } else {
126         result = writeToFD(fd, length);
127       }
128     }
129   }
130 };
131
132 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
133   for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
134     eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
135                              ev->milliseconds);
136   }
137 }
138
139 class TestHandler : public EventHandler {
140  public:
141   TestHandler(EventBase* eventBase, int fd)
142     : EventHandler(eventBase, fd), fd_(fd) {}
143
144   virtual void handlerReady(uint16_t events) noexcept {
145     ssize_t bytesRead = 0;
146     ssize_t bytesWritten = 0;
147     if (events & READ) {
148       // Read all available data, so EventBase will stop calling us
149       // until new data becomes available
150       bytesRead = readUntilEmpty(fd_);
151     }
152     if (events & WRITE) {
153       // Write until the pipe buffer is full, so EventBase will stop calling
154       // us until the other end has read some data
155       bytesWritten = writeUntilFull(fd_);
156     }
157
158     log.push_back(EventRecord(events, bytesRead, bytesWritten));
159   }
160
161   struct EventRecord {
162     EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
163       : events(events)
164       , timestamp()
165       , bytesRead(bytesRead)
166       , bytesWritten(bytesWritten) {}
167
168     uint16_t events;
169     TimePoint timestamp;
170     ssize_t bytesRead;
171     ssize_t bytesWritten;
172   };
173
174   deque<EventRecord> log;
175
176  private:
177   int fd_;
178 };
179
180 /**
181  * Test a READ event
182  */
183 TEST(EventBaseTest, ReadEvent) {
184   EventBase eb;
185   SocketPair sp;
186
187   // Register for read events
188   TestHandler handler(&eb, sp[0]);
189   handler.registerHandler(EventHandler::READ);
190
191   // Register timeouts to perform two write events
192   ScheduledEvent events[] = {
193     { 10, EventHandler::WRITE, 2345 },
194     { 160, EventHandler::WRITE, 99 },
195     { 0, 0, 0 },
196   };
197   scheduleEvents(&eb, sp[1], events);
198
199   // Loop
200   TimePoint start;
201   eb.loop();
202   TimePoint end;
203
204   // Since we didn't use the EventHandler::PERSIST flag, the handler should
205   // have received the first read, then unregistered itself.  Check that only
206   // the first chunk of data was received.
207   ASSERT_EQ(handler.log.size(), 1);
208   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
209   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
210                   milliseconds(events[0].milliseconds), milliseconds(90));
211   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
212   ASSERT_EQ(handler.log[0].bytesWritten, 0);
213   T_CHECK_TIMEOUT(start, end,
214                   milliseconds(events[1].milliseconds), milliseconds(30));
215
216   // Make sure the second chunk of data is still waiting to be read.
217   size_t bytesRemaining = readUntilEmpty(sp[0]);
218   ASSERT_EQ(bytesRemaining, events[1].length);
219 }
220
221 /**
222  * Test (READ | PERSIST)
223  */
224 TEST(EventBaseTest, ReadPersist) {
225   EventBase eb;
226   SocketPair sp;
227
228   // Register for read events
229   TestHandler handler(&eb, sp[0]);
230   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
231
232   // Register several timeouts to perform writes
233   ScheduledEvent events[] = {
234     { 10,  EventHandler::WRITE, 1024 },
235     { 20,  EventHandler::WRITE, 2211 },
236     { 30,  EventHandler::WRITE, 4096 },
237     { 100, EventHandler::WRITE, 100 },
238     { 0, 0 },
239   };
240   scheduleEvents(&eb, sp[1], events);
241
242   // Schedule a timeout to unregister the handler after the third write
243   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
244
245   // Loop
246   TimePoint start;
247   eb.loop();
248   TimePoint end;
249
250   // The handler should have received the first 3 events,
251   // then been unregistered after that.
252   ASSERT_EQ(handler.log.size(), 3);
253   for (int n = 0; n < 3; ++n) {
254     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
255     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
256                     milliseconds(events[n].milliseconds));
257     ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
258     ASSERT_EQ(handler.log[n].bytesWritten, 0);
259   }
260   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
261
262   // Make sure the data from the last write is still waiting to be read
263   size_t bytesRemaining = readUntilEmpty(sp[0]);
264   ASSERT_EQ(bytesRemaining, events[3].length);
265 }
266
267 /**
268  * Test registering for READ when the socket is immediately readable
269  */
270 TEST(EventBaseTest, ReadImmediate) {
271   EventBase eb;
272   SocketPair sp;
273
274   // Write some data to the socket so the other end will
275   // be immediately readable
276   size_t dataLength = 1234;
277   writeToFD(sp[1], dataLength);
278
279   // Register for read events
280   TestHandler handler(&eb, sp[0]);
281   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
282
283   // Register a timeout to perform another write
284   ScheduledEvent events[] = {
285     { 10, EventHandler::WRITE, 2345 },
286     { 0, 0, 0 },
287   };
288   scheduleEvents(&eb, sp[1], events);
289
290   // Schedule a timeout to unregister the handler
291   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
292
293   // Loop
294   TimePoint start;
295   eb.loop();
296   TimePoint end;
297
298   ASSERT_EQ(handler.log.size(), 2);
299
300   // There should have been 1 event for immediate readability
301   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
302   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
303   ASSERT_EQ(handler.log[0].bytesRead, dataLength);
304   ASSERT_EQ(handler.log[0].bytesWritten, 0);
305
306   // There should be another event after the timeout wrote more data
307   ASSERT_EQ(handler.log[1].events, EventHandler::READ);
308   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
309                   milliseconds(events[0].milliseconds));
310   ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
311   ASSERT_EQ(handler.log[1].bytesWritten, 0);
312
313   T_CHECK_TIMEOUT(start, end, milliseconds(20));
314 }
315
316 /**
317  * Test a WRITE event
318  */
319 TEST(EventBaseTest, WriteEvent) {
320   EventBase eb;
321   SocketPair sp;
322
323   // Fill up the write buffer before starting
324   size_t initialBytesWritten = writeUntilFull(sp[0]);
325
326   // Register for write events
327   TestHandler handler(&eb, sp[0]);
328   handler.registerHandler(EventHandler::WRITE);
329
330   // Register timeouts to perform two reads
331   ScheduledEvent events[] = {
332     { 10, EventHandler::READ, 0 },
333     { 60, EventHandler::READ, 0 },
334     { 0, 0, 0 },
335   };
336   scheduleEvents(&eb, sp[1], events);
337
338   // Loop
339   TimePoint start;
340   eb.loop();
341   TimePoint end;
342
343   // Since we didn't use the EventHandler::PERSIST flag, the handler should
344   // have only been able to write once, then unregistered itself.
345   ASSERT_EQ(handler.log.size(), 1);
346   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
347   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
348                   milliseconds(events[0].milliseconds));
349   ASSERT_EQ(handler.log[0].bytesRead, 0);
350   ASSERT_GT(handler.log[0].bytesWritten, 0);
351   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
352
353   ASSERT_EQ(events[0].result, initialBytesWritten);
354   ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
355 }
356
357 /**
358  * Test (WRITE | PERSIST)
359  */
360 TEST(EventBaseTest, WritePersist) {
361   EventBase eb;
362   SocketPair sp;
363
364   // Fill up the write buffer before starting
365   size_t initialBytesWritten = writeUntilFull(sp[0]);
366
367   // Register for write events
368   TestHandler handler(&eb, sp[0]);
369   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
370
371   // Register several timeouts to read from the socket at several intervals
372   ScheduledEvent events[] = {
373     { 10,  EventHandler::READ, 0 },
374     { 40,  EventHandler::READ, 0 },
375     { 70,  EventHandler::READ, 0 },
376     { 100, EventHandler::READ, 0 },
377     { 0, 0 },
378   };
379   scheduleEvents(&eb, sp[1], events);
380
381   // Schedule a timeout to unregister the handler after the third read
382   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
383
384   // Loop
385   TimePoint start;
386   eb.loop();
387   TimePoint end;
388
389   // The handler should have received the first 3 events,
390   // then been unregistered after that.
391   ASSERT_EQ(handler.log.size(), 3);
392   ASSERT_EQ(events[0].result, initialBytesWritten);
393   for (int n = 0; n < 3; ++n) {
394     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
395     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
396                     milliseconds(events[n].milliseconds));
397     ASSERT_EQ(handler.log[n].bytesRead, 0);
398     ASSERT_GT(handler.log[n].bytesWritten, 0);
399     ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
400   }
401   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
402 }
403
404 /**
405  * Test registering for WRITE when the socket is immediately writable
406  */
407 TEST(EventBaseTest, WriteImmediate) {
408   EventBase eb;
409   SocketPair sp;
410
411   // Register for write events
412   TestHandler handler(&eb, sp[0]);
413   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
414
415   // Register a timeout to perform a read
416   ScheduledEvent events[] = {
417     { 10, EventHandler::READ, 0 },
418     { 0, 0, 0 },
419   };
420   scheduleEvents(&eb, sp[1], events);
421
422   // Schedule a timeout to unregister the handler
423   int64_t unregisterTimeout = 40;
424   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
425                    unregisterTimeout);
426
427   // Loop
428   TimePoint start;
429   eb.loop();
430   TimePoint end;
431
432   ASSERT_EQ(handler.log.size(), 2);
433
434   // Since the socket buffer was initially empty,
435   // there should have been 1 event for immediate writability
436   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
437   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
438   ASSERT_EQ(handler.log[0].bytesRead, 0);
439   ASSERT_GT(handler.log[0].bytesWritten, 0);
440
441   // There should be another event after the timeout wrote more data
442   ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
443   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
444                   milliseconds(events[0].milliseconds));
445   ASSERT_EQ(handler.log[1].bytesRead, 0);
446   ASSERT_GT(handler.log[1].bytesWritten, 0);
447
448   T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
449 }
450
451 /**
452  * Test (READ | WRITE) when the socket becomes readable first
453  */
454 TEST(EventBaseTest, ReadWrite) {
455   EventBase eb;
456   SocketPair sp;
457
458   // Fill up the write buffer before starting
459   size_t sock0WriteLength = writeUntilFull(sp[0]);
460
461   // Register for read and write events
462   TestHandler handler(&eb, sp[0]);
463   handler.registerHandler(EventHandler::READ_WRITE);
464
465   // Register timeouts to perform a write then a read.
466   ScheduledEvent events[] = {
467     { 10, EventHandler::WRITE, 2345 },
468     { 40, EventHandler::READ, 0 },
469     { 0, 0, 0 },
470   };
471   scheduleEvents(&eb, sp[1], events);
472
473   // Loop
474   TimePoint start;
475   eb.loop();
476   TimePoint end;
477
478   // Since we didn't use the EventHandler::PERSIST flag, the handler should
479   // have only noticed readability, then unregistered itself.  Check that only
480   // one event was logged.
481   ASSERT_EQ(handler.log.size(), 1);
482   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
483   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
484                   milliseconds(events[0].milliseconds));
485   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
486   ASSERT_EQ(handler.log[0].bytesWritten, 0);
487   ASSERT_EQ(events[1].result, sock0WriteLength);
488   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
489 }
490
491 /**
492  * Test (READ | WRITE) when the socket becomes writable first
493  */
494 TEST(EventBaseTest, WriteRead) {
495   EventBase eb;
496   SocketPair sp;
497
498   // Fill up the write buffer before starting
499   size_t sock0WriteLength = writeUntilFull(sp[0]);
500
501   // Register for read and write events
502   TestHandler handler(&eb, sp[0]);
503   handler.registerHandler(EventHandler::READ_WRITE);
504
505   // Register timeouts to perform a read then a write.
506   size_t sock1WriteLength = 2345;
507   ScheduledEvent events[] = {
508     { 10, EventHandler::READ, 0 },
509     { 40, EventHandler::WRITE, sock1WriteLength },
510     { 0, 0, 0 },
511   };
512   scheduleEvents(&eb, sp[1], events);
513
514   // Loop
515   TimePoint start;
516   eb.loop();
517   TimePoint end;
518
519   // Since we didn't use the EventHandler::PERSIST flag, the handler should
520   // have only noticed writability, then unregistered itself.  Check that only
521   // one event was logged.
522   ASSERT_EQ(handler.log.size(), 1);
523   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
524   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
525                   milliseconds(events[0].milliseconds));
526   ASSERT_EQ(handler.log[0].bytesRead, 0);
527   ASSERT_GT(handler.log[0].bytesWritten, 0);
528   ASSERT_EQ(events[0].result, sock0WriteLength);
529   ASSERT_EQ(events[1].result, sock1WriteLength);
530   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
531
532   // Make sure the written data is still waiting to be read.
533   size_t bytesRemaining = readUntilEmpty(sp[0]);
534   ASSERT_EQ(bytesRemaining, events[1].length);
535 }
536
537 /**
538  * Test (READ | WRITE) when the socket becomes readable and writable
539  * at the same time.
540  */
541 TEST(EventBaseTest, ReadWriteSimultaneous) {
542   EventBase eb;
543   SocketPair sp;
544
545   // Fill up the write buffer before starting
546   size_t sock0WriteLength = writeUntilFull(sp[0]);
547
548   // Register for read and write events
549   TestHandler handler(&eb, sp[0]);
550   handler.registerHandler(EventHandler::READ_WRITE);
551
552   // Register a timeout to perform a read and write together
553   ScheduledEvent events[] = {
554     { 10, EventHandler::READ | EventHandler::WRITE, 0 },
555     { 0, 0, 0 },
556   };
557   scheduleEvents(&eb, sp[1], events);
558
559   // Loop
560   TimePoint start;
561   eb.loop();
562   TimePoint end;
563
564   // It's not strictly required that the EventBase register us about both
565   // events in the same call.  So, it's possible that if the EventBase
566   // implementation changes this test could start failing, and it wouldn't be
567   // considered breaking the API.  However for now it's nice to exercise this
568   // code path.
569   ASSERT_EQ(handler.log.size(), 1);
570   ASSERT_EQ(handler.log[0].events,
571                     EventHandler::READ | EventHandler::WRITE);
572   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
573                   milliseconds(events[0].milliseconds));
574   ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
575   ASSERT_GT(handler.log[0].bytesWritten, 0);
576   T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
577 }
578
579 /**
580  * Test (READ | WRITE | PERSIST)
581  */
582 TEST(EventBaseTest, ReadWritePersist) {
583   EventBase eb;
584   SocketPair sp;
585
586   // Register for read and write events
587   TestHandler handler(&eb, sp[0]);
588   handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
589                           EventHandler::PERSIST);
590
591   // Register timeouts to perform several reads and writes
592   ScheduledEvent events[] = {
593     { 10, EventHandler::WRITE, 2345 },
594     { 20, EventHandler::READ, 0 },
595     { 35, EventHandler::WRITE, 200 },
596     { 45, EventHandler::WRITE, 15 },
597     { 55, EventHandler::READ, 0 },
598     { 120, EventHandler::WRITE, 2345 },
599     { 0, 0, 0 },
600   };
601   scheduleEvents(&eb, sp[1], events);
602
603   // Schedule a timeout to unregister the handler
604   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
605
606   // Loop
607   TimePoint start;
608   eb.loop();
609   TimePoint end;
610
611   ASSERT_EQ(handler.log.size(), 6);
612
613   // Since we didn't fill up the write buffer immediately, there should
614   // be an immediate event for writability.
615   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
616   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
617   ASSERT_EQ(handler.log[0].bytesRead, 0);
618   ASSERT_GT(handler.log[0].bytesWritten, 0);
619
620   // Events 1 through 5 should correspond to the scheduled events
621   for (int n = 1; n < 6; ++n) {
622     ScheduledEvent* event = &events[n - 1];
623     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
624                     milliseconds(event->milliseconds));
625     if (event->events == EventHandler::READ) {
626       ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
627       ASSERT_EQ(handler.log[n].bytesRead, 0);
628       ASSERT_GT(handler.log[n].bytesWritten, 0);
629     } else {
630       ASSERT_EQ(handler.log[n].events, EventHandler::READ);
631       ASSERT_EQ(handler.log[n].bytesRead, event->length);
632       ASSERT_EQ(handler.log[n].bytesWritten, 0);
633     }
634   }
635
636   // The timeout should have unregistered the handler before the last write.
637   // Make sure that data is still waiting to be read
638   size_t bytesRemaining = readUntilEmpty(sp[0]);
639   ASSERT_EQ(bytesRemaining, events[5].length);
640 }
641
642
643 class PartialReadHandler : public TestHandler {
644  public:
645   PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
646     : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
647
648   virtual void handlerReady(uint16_t events) noexcept {
649     assert(events == EventHandler::READ);
650     ssize_t bytesRead = readFromFD(fd_, readLength_);
651     log.push_back(EventRecord(events, bytesRead, 0));
652   }
653
654  private:
655   int fd_;
656   size_t readLength_;
657 };
658
659 /**
660  * Test reading only part of the available data when a read event is fired.
661  * When PERSIST is used, make sure the handler gets notified again the next
662  * time around the loop.
663  */
664 TEST(EventBaseTest, ReadPartial) {
665   EventBase eb;
666   SocketPair sp;
667
668   // Register for read events
669   size_t readLength = 100;
670   PartialReadHandler handler(&eb, sp[0], readLength);
671   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
672
673   // Register a timeout to perform a single write,
674   // with more data than PartialReadHandler will read at once
675   ScheduledEvent events[] = {
676     { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) },
677     { 0, 0, 0 },
678   };
679   scheduleEvents(&eb, sp[1], events);
680
681   // Schedule a timeout to unregister the handler
682   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
683
684   // Loop
685   TimePoint start;
686   eb.loop();
687   TimePoint end;
688
689   ASSERT_EQ(handler.log.size(), 4);
690
691   // The first 3 invocations should read readLength bytes each
692   for (int n = 0; n < 3; ++n) {
693     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
694     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
695                     milliseconds(events[0].milliseconds));
696     ASSERT_EQ(handler.log[n].bytesRead, readLength);
697     ASSERT_EQ(handler.log[n].bytesWritten, 0);
698   }
699   // The last read only has readLength/2 bytes
700   ASSERT_EQ(handler.log[3].events, EventHandler::READ);
701   T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
702                   milliseconds(events[0].milliseconds));
703   ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
704   ASSERT_EQ(handler.log[3].bytesWritten, 0);
705 }
706
707
708 class PartialWriteHandler : public TestHandler {
709  public:
710   PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
711     : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
712
713   virtual void handlerReady(uint16_t events) noexcept {
714     assert(events == EventHandler::WRITE);
715     ssize_t bytesWritten = writeToFD(fd_, writeLength_);
716     log.push_back(EventRecord(events, 0, bytesWritten));
717   }
718
719  private:
720   int fd_;
721   size_t writeLength_;
722 };
723
724 /**
725  * Test writing without completely filling up the write buffer when the fd
726  * becomes writable.  When PERSIST is used, make sure the handler gets
727  * notified again the next time around the loop.
728  */
729 TEST(EventBaseTest, WritePartial) {
730   EventBase eb;
731   SocketPair sp;
732
733   // Fill up the write buffer before starting
734   size_t initialBytesWritten = writeUntilFull(sp[0]);
735
736   // Register for write events
737   size_t writeLength = 100;
738   PartialWriteHandler handler(&eb, sp[0], writeLength);
739   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
740
741   // Register a timeout to read, so that more data can be written
742   ScheduledEvent events[] = {
743     { 10, EventHandler::READ, 0 },
744     { 0, 0, 0 },
745   };
746   scheduleEvents(&eb, sp[1], events);
747
748   // Schedule a timeout to unregister the handler
749   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
750
751   // Loop
752   TimePoint start;
753   eb.loop();
754   TimePoint end;
755
756   // Depending on how big the socket buffer is, there will be multiple writes
757   // Only check the first 5
758   int numChecked = 5;
759   ASSERT_GE(handler.log.size(), numChecked);
760   ASSERT_EQ(events[0].result, initialBytesWritten);
761
762   // The first 3 invocations should read writeLength bytes each
763   for (int n = 0; n < numChecked; ++n) {
764     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
765     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
766                     milliseconds(events[0].milliseconds));
767     ASSERT_EQ(handler.log[n].bytesRead, 0);
768     ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
769   }
770 }
771
772
773 /**
774  * Test destroying a registered EventHandler
775  */
776 TEST(EventBaseTest, DestroyHandler) {
777   class DestroyHandler : public AsyncTimeout {
778    public:
779     DestroyHandler(EventBase* eb, EventHandler* h)
780       : AsyncTimeout(eb)
781       , handler_(h) {}
782
783     virtual void timeoutExpired() noexcept {
784       delete handler_;
785     }
786
787    private:
788     EventHandler* handler_;
789   };
790
791   EventBase eb;
792   SocketPair sp;
793
794   // Fill up the write buffer before starting
795   size_t initialBytesWritten = writeUntilFull(sp[0]);
796
797   // Register for write events
798   TestHandler* handler = new TestHandler(&eb, sp[0]);
799   handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
800
801   // After 10ms, read some data, so that the handler
802   // will be notified that it can write.
803   eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
804                    10);
805
806   // Start a timer to destroy the handler after 25ms
807   // This mainly just makes sure the code doesn't break or assert
808   DestroyHandler dh(&eb, handler);
809   dh.scheduleTimeout(25);
810
811   TimePoint start;
812   eb.loop();
813   TimePoint end;
814
815   // Make sure the EventHandler was uninstalled properly when it was
816   // destroyed, and the EventBase loop exited
817   T_CHECK_TIMEOUT(start, end, milliseconds(25));
818
819   // Make sure that the handler wrote data to the socket
820   // before it was destroyed
821   size_t bytesRemaining = readUntilEmpty(sp[1]);
822   ASSERT_GT(bytesRemaining, 0);
823 }
824
825
826 ///////////////////////////////////////////////////////////////////////////
827 // Tests for timeout events
828 ///////////////////////////////////////////////////////////////////////////
829
830 TEST(EventBaseTest, RunAfterDelay) {
831   EventBase eb;
832
833   TimePoint timestamp1(false);
834   TimePoint timestamp2(false);
835   TimePoint timestamp3(false);
836   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
837   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
838   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
839
840   TimePoint start;
841   eb.loop();
842   TimePoint end;
843
844   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
845   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
846   T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
847   T_CHECK_TIMEOUT(start, end, milliseconds(40));
848 }
849
850 /**
851  * Test the behavior of tryRunAfterDelay() when some timeouts are
852  * still scheduled when the EventBase is destroyed.
853  */
854 TEST(EventBaseTest, RunAfterDelayDestruction) {
855   TimePoint timestamp1(false);
856   TimePoint timestamp2(false);
857   TimePoint timestamp3(false);
858   TimePoint timestamp4(false);
859   TimePoint start(false);
860   TimePoint end(false);
861
862   {
863     EventBase eb;
864
865     // Run two normal timeouts
866     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
867     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
868
869     // Schedule a timeout to stop the event loop after 40ms
870     eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
871
872     // Schedule 2 timeouts that would fire after the event loop stops
873     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
874     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
875
876     start.reset();
877     eb.loop();
878     end.reset();
879   }
880
881   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
882   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
883   T_CHECK_TIMEOUT(start, end, milliseconds(40));
884
885   ASSERT_TRUE(timestamp3.isUnset());
886   ASSERT_TRUE(timestamp4.isUnset());
887
888   // Ideally this test should be run under valgrind to ensure that no
889   // memory is leaked.
890 }
891
892 class TestTimeout : public AsyncTimeout {
893  public:
894   explicit TestTimeout(EventBase* eventBase)
895     : AsyncTimeout(eventBase)
896     , timestamp(false) {}
897
898   virtual void timeoutExpired() noexcept {
899     timestamp.reset();
900   }
901
902   TimePoint timestamp;
903 };
904
905 TEST(EventBaseTest, BasicTimeouts) {
906   EventBase eb;
907
908   TestTimeout t1(&eb);
909   TestTimeout t2(&eb);
910   TestTimeout t3(&eb);
911   t1.scheduleTimeout(10);
912   t2.scheduleTimeout(20);
913   t3.scheduleTimeout(40);
914
915   TimePoint start;
916   eb.loop();
917   TimePoint end;
918
919   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
920   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
921   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
922   T_CHECK_TIMEOUT(start, end, milliseconds(40));
923 }
924
925 class ReschedulingTimeout : public AsyncTimeout {
926  public:
927   ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
928     : AsyncTimeout(evb)
929     , timeouts_(timeouts)
930     , iterator_(timeouts_.begin()) {}
931
932   void start() {
933     reschedule();
934   }
935
936   virtual void timeoutExpired() noexcept {
937     timestamps.push_back(TimePoint());
938     reschedule();
939   }
940
941   void reschedule() {
942     if (iterator_ != timeouts_.end()) {
943       uint32_t timeout = *iterator_;
944       ++iterator_;
945       scheduleTimeout(timeout);
946     }
947   }
948
949   vector<TimePoint> timestamps;
950
951  private:
952   vector<uint32_t> timeouts_;
953   vector<uint32_t>::const_iterator iterator_;
954 };
955
956 /**
957  * Test rescheduling the same timeout multiple times
958  */
959 TEST(EventBaseTest, ReuseTimeout) {
960   EventBase eb;
961
962   vector<uint32_t> timeouts;
963   timeouts.push_back(10);
964   timeouts.push_back(30);
965   timeouts.push_back(15);
966
967   ReschedulingTimeout t(&eb, timeouts);
968   t.start();
969
970   TimePoint start;
971   eb.loop();
972   TimePoint end;
973
974   // Use a higher tolerance than usual.  We're waiting on 3 timeouts
975   // consecutively.  In general, each timeout may go over by a few
976   // milliseconds, and we're tripling this error by witing on 3 timeouts.
977   milliseconds tolerance{6};
978
979   ASSERT_EQ(timeouts.size(), t.timestamps.size());
980   uint32_t total = 0;
981   for (size_t n = 0; n < timeouts.size(); ++n) {
982     total += timeouts[n];
983     T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
984   }
985   T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
986 }
987
988 /**
989  * Test rescheduling a timeout before it has fired
990  */
991 TEST(EventBaseTest, RescheduleTimeout) {
992   EventBase eb;
993
994   TestTimeout t1(&eb);
995   TestTimeout t2(&eb);
996   TestTimeout t3(&eb);
997
998   t1.scheduleTimeout(15);
999   t2.scheduleTimeout(30);
1000   t3.scheduleTimeout(30);
1001
1002   auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1003       &AsyncTimeout::scheduleTimeout);
1004
1005   // after 10ms, reschedule t2 to run sooner than originally scheduled
1006   eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1007   // after 10ms, reschedule t3 to run later than originally scheduled
1008   eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1009
1010   TimePoint start;
1011   eb.loop();
1012   TimePoint end;
1013
1014   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1015   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1016   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1017   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1018 }
1019
1020 /**
1021  * Test cancelling a timeout
1022  */
1023 TEST(EventBaseTest, CancelTimeout) {
1024   EventBase eb;
1025
1026   vector<uint32_t> timeouts;
1027   timeouts.push_back(10);
1028   timeouts.push_back(30);
1029   timeouts.push_back(25);
1030
1031   ReschedulingTimeout t(&eb, timeouts);
1032   t.start();
1033   eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1034
1035   TimePoint start;
1036   eb.loop();
1037   TimePoint end;
1038
1039   ASSERT_EQ(t.timestamps.size(), 2);
1040   T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1041   T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1042   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1043 }
1044
1045 /**
1046  * Test destroying a scheduled timeout object
1047  */
1048 TEST(EventBaseTest, DestroyTimeout) {
1049   class DestroyTimeout : public AsyncTimeout {
1050    public:
1051     DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1052       : AsyncTimeout(eb)
1053       , timeout_(t) {}
1054
1055     virtual void timeoutExpired() noexcept {
1056       delete timeout_;
1057     }
1058
1059    private:
1060     AsyncTimeout* timeout_;
1061   };
1062
1063   EventBase eb;
1064
1065   TestTimeout* t1 = new TestTimeout(&eb);
1066   t1->scheduleTimeout(30);
1067
1068   DestroyTimeout dt(&eb, t1);
1069   dt.scheduleTimeout(10);
1070
1071   TimePoint start;
1072   eb.loop();
1073   TimePoint end;
1074
1075   T_CHECK_TIMEOUT(start, end, milliseconds(10));
1076 }
1077
1078
1079 ///////////////////////////////////////////////////////////////////////////
1080 // Test for runInThreadTestFunc()
1081 ///////////////////////////////////////////////////////////////////////////
1082
1083 struct RunInThreadData {
1084   RunInThreadData(int numThreads, int opsPerThread)
1085     : opsPerThread(opsPerThread)
1086     , opsToGo(numThreads*opsPerThread) {}
1087
1088   EventBase evb;
1089   deque< pair<int, int> > values;
1090
1091   int opsPerThread;
1092   int opsToGo;
1093 };
1094
1095 struct RunInThreadArg {
1096   RunInThreadArg(RunInThreadData* data,
1097                  int threadId,
1098                  int value)
1099     : data(data)
1100     , thread(threadId)
1101     , value(value) {}
1102
1103   RunInThreadData* data;
1104   int thread;
1105   int value;
1106 };
1107
1108 void runInThreadTestFunc(RunInThreadArg* arg) {
1109   arg->data->values.push_back(make_pair(arg->thread, arg->value));
1110   RunInThreadData* data = arg->data;
1111   delete arg;
1112
1113   if(--data->opsToGo == 0) {
1114     // Break out of the event base loop if we are the last thread running
1115     data->evb.terminateLoopSoon();
1116   }
1117 }
1118
1119 TEST(EventBaseTest, RunInThread) {
1120   uint32_t numThreads = 50;
1121   uint32_t opsPerThread = 100;
1122   RunInThreadData data(numThreads, opsPerThread);
1123
1124   deque<std::thread> threads;
1125   for (uint32_t i = 0; i < numThreads; ++i) {
1126     threads.emplace_back([i, &data] {
1127         for (int n = 0; n < data.opsPerThread; ++n) {
1128           RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1129           data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1130           usleep(10);
1131         }
1132       });
1133   }
1134
1135   // Add a timeout event to run after 3 seconds.
1136   // Otherwise loop() will return immediately since there are no events to run.
1137   // Once the last thread exits, it will stop the loop().  However, this
1138   // timeout also stops the loop in case there is a bug performing the normal
1139   // stop.
1140   data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1141                          3000);
1142
1143   TimePoint start;
1144   data.evb.loop();
1145   TimePoint end;
1146
1147   // Verify that the loop exited because all threads finished and requested it
1148   // to stop.  This should happen much sooner than the 3 second timeout.
1149   // Assert that it happens in under a second.  (This is still tons of extra
1150   // padding.)
1151
1152   auto timeTaken = std::chrono::duration_cast<milliseconds>(
1153     end.getTime() - start.getTime());
1154   ASSERT_LT(timeTaken.count(), 1000);
1155   VLOG(11) << "Time taken: " << timeTaken.count();
1156
1157   // Verify that we have all of the events from every thread
1158   int expectedValues[numThreads];
1159   for (uint32_t n = 0; n < numThreads; ++n) {
1160     expectedValues[n] = 0;
1161   }
1162   for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1163        it != data.values.end();
1164        ++it) {
1165     int threadID = it->first;
1166     int value = it->second;
1167     ASSERT_EQ(expectedValues[threadID], value);
1168     ++expectedValues[threadID];
1169   }
1170   for (uint32_t n = 0; n < numThreads; ++n) {
1171     ASSERT_EQ(expectedValues[n], opsPerThread);
1172   }
1173
1174   // Wait on all of the threads.
1175   for (auto& thread: threads) {
1176     thread.join();
1177   }
1178 }
1179
1180 //  This test simulates some calls, and verifies that the waiting happens by
1181 //  triggering what otherwise would be race conditions, and trying to detect
1182 //  whether any of the race conditions happened.
1183 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1184   const size_t c = 256;
1185   vector<unique_ptr<atomic<size_t>>> atoms(c);
1186   for (size_t i = 0; i < c; ++i) {
1187     auto& atom = atoms.at(i);
1188     atom = make_unique<atomic<size_t>>(0);
1189   }
1190   vector<thread> threads(c);
1191   for (size_t i = 0; i < c; ++i) {
1192     auto& atom = *atoms.at(i);
1193     auto& th = threads.at(i);
1194     th = thread([&atom] {
1195         EventBase eb;
1196         auto ebth = thread([&]{ eb.loopForever(); });
1197         eb.waitUntilRunning();
1198         eb.runInEventBaseThreadAndWait([&] {
1199           size_t x = 0;
1200           atom.compare_exchange_weak(
1201               x, 1, std::memory_order_release, std::memory_order_relaxed);
1202         });
1203         size_t x = 0;
1204         atom.compare_exchange_weak(
1205             x, 2, std::memory_order_release, std::memory_order_relaxed);
1206         eb.terminateLoopSoon();
1207         ebth.join();
1208     });
1209   }
1210   for (size_t i = 0; i < c; ++i) {
1211     auto& th = threads.at(i);
1212     th.join();
1213   }
1214   size_t sum = 0;
1215   for (auto& atom : atoms) sum += *atom;
1216   EXPECT_EQ(c, sum);
1217 }
1218
1219 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1220   EventBase eb;
1221   thread th(&EventBase::loopForever, &eb);
1222   SCOPE_EXIT {
1223     eb.terminateLoopSoon();
1224     th.join();
1225   };
1226   auto mutated = false;
1227   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1228       mutated = true;
1229   });
1230   EXPECT_TRUE(mutated);
1231 }
1232
1233 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1234   EventBase eb;
1235   thread th(&EventBase::loopForever, &eb);
1236   SCOPE_EXIT {
1237     eb.terminateLoopSoon();
1238     th.join();
1239   };
1240   eb.runInEventBaseThreadAndWait([&] {
1241       auto mutated = false;
1242       eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1243           mutated = true;
1244       });
1245       EXPECT_TRUE(mutated);
1246   });
1247 }
1248
1249 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1250   EventBase eb;
1251   auto mutated = false;
1252   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1253       mutated = true;
1254     });
1255   EXPECT_TRUE(mutated);
1256 }
1257
1258 ///////////////////////////////////////////////////////////////////////////
1259 // Tests for runInLoop()
1260 ///////////////////////////////////////////////////////////////////////////
1261
1262 class CountedLoopCallback : public EventBase::LoopCallback {
1263  public:
1264   CountedLoopCallback(EventBase* eventBase,
1265                       unsigned int count,
1266                       std::function<void()> action =
1267                         std::function<void()>())
1268     : eventBase_(eventBase)
1269     , count_(count)
1270     , action_(action) {}
1271
1272   virtual void runLoopCallback() noexcept {
1273     --count_;
1274     if (count_ > 0) {
1275       eventBase_->runInLoop(this);
1276     } else if (action_) {
1277       action_();
1278     }
1279   }
1280
1281   unsigned int getCount() const {
1282     return count_;
1283   }
1284
1285  private:
1286   EventBase* eventBase_;
1287   unsigned int count_;
1288   std::function<void()> action_;
1289 };
1290
1291 // Test that EventBase::loop() doesn't exit while there are
1292 // still LoopCallbacks remaining to be invoked.
1293 TEST(EventBaseTest, RepeatedRunInLoop) {
1294   EventBase eventBase;
1295
1296   CountedLoopCallback c(&eventBase, 10);
1297   eventBase.runInLoop(&c);
1298   // The callback shouldn't have run immediately
1299   ASSERT_EQ(c.getCount(), 10);
1300   eventBase.loop();
1301
1302   // loop() should loop until the CountedLoopCallback stops
1303   // re-installing itself.
1304   ASSERT_EQ(c.getCount(), 0);
1305 }
1306
1307 // Test that EventBase::loop() works as expected without time measurements.
1308 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
1309   EventBase eventBase(false);
1310
1311   CountedLoopCallback c(&eventBase, 10);
1312   eventBase.runInLoop(&c);
1313   // The callback shouldn't have run immediately
1314   ASSERT_EQ(c.getCount(), 10);
1315   eventBase.loop();
1316
1317   // loop() should loop until the CountedLoopCallback stops
1318   // re-installing itself.
1319   ASSERT_EQ(c.getCount(), 0);
1320 }
1321
1322 // Test runInLoop() calls with terminateLoopSoon()
1323 TEST(EventBaseTest, RunInLoopStopLoop) {
1324   EventBase eventBase;
1325
1326   CountedLoopCallback c1(&eventBase, 20);
1327   CountedLoopCallback c2(&eventBase, 10,
1328                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1329
1330   eventBase.runInLoop(&c1);
1331   eventBase.runInLoop(&c2);
1332   ASSERT_EQ(c1.getCount(), 20);
1333   ASSERT_EQ(c2.getCount(), 10);
1334
1335   eventBase.loopForever();
1336
1337   // c2 should have stopped the loop after 10 iterations
1338   ASSERT_EQ(c2.getCount(), 0);
1339
1340   // We allow the EventBase to run the loop callbacks in whatever order it
1341   // chooses.  We'll accept c1's count being either 10 (if the loop terminated
1342   // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1343   // before c1 ran).
1344   //
1345   // (With the current code, c1 will always run 10 times, but we don't consider
1346   // this a hard API requirement.)
1347   ASSERT_GE(c1.getCount(), 10);
1348   ASSERT_LE(c1.getCount(), 11);
1349 }
1350
1351 TEST(EventBaseTest, TryRunningAfterTerminate) {
1352   EventBase eventBase;
1353   CountedLoopCallback c1(&eventBase, 1,
1354                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1355   eventBase.runInLoop(&c1);
1356   eventBase.loopForever();
1357   bool ran = false;
1358   eventBase.runInEventBaseThread([&]() {
1359     ran = true;
1360   });
1361
1362   ASSERT_FALSE(ran);
1363 }
1364
1365 // Test cancelling runInLoop() callbacks
1366 TEST(EventBaseTest, CancelRunInLoop) {
1367   EventBase eventBase;
1368
1369   CountedLoopCallback c1(&eventBase, 20);
1370   CountedLoopCallback c2(&eventBase, 20);
1371   CountedLoopCallback c3(&eventBase, 20);
1372
1373   std::function<void()> cancelC1Action =
1374     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1375   std::function<void()> cancelC2Action =
1376     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1377
1378   CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1379   CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1380
1381   // Install cancelC1 after c1
1382   eventBase.runInLoop(&c1);
1383   eventBase.runInLoop(&cancelC1);
1384
1385   // Install cancelC2 before c2
1386   eventBase.runInLoop(&cancelC2);
1387   eventBase.runInLoop(&c2);
1388
1389   // Install c3
1390   eventBase.runInLoop(&c3);
1391
1392   ASSERT_EQ(c1.getCount(), 20);
1393   ASSERT_EQ(c2.getCount(), 20);
1394   ASSERT_EQ(c3.getCount(), 20);
1395   ASSERT_EQ(cancelC1.getCount(), 10);
1396   ASSERT_EQ(cancelC2.getCount(), 10);
1397
1398   // Run the loop
1399   eventBase.loop();
1400
1401   // cancelC1 and cancelC3 should have both fired after 10 iterations and
1402   // stopped re-installing themselves
1403   ASSERT_EQ(cancelC1.getCount(), 0);
1404   ASSERT_EQ(cancelC2.getCount(), 0);
1405   // c3 should have continued on for the full 20 iterations
1406   ASSERT_EQ(c3.getCount(), 0);
1407
1408   // c1 and c2 should have both been cancelled on the 10th iteration.
1409   //
1410   // Callbacks are always run in the order they are installed,
1411   // so c1 should have fired 10 times, and been canceled after it ran on the
1412   // 10th iteration.  c2 should have only fired 9 times, because cancelC2 will
1413   // have run before it on the 10th iteration, and cancelled it before it
1414   // fired.
1415   ASSERT_EQ(c1.getCount(), 10);
1416   ASSERT_EQ(c2.getCount(), 11);
1417 }
1418
1419 class TerminateTestCallback : public EventBase::LoopCallback,
1420                               public EventHandler {
1421  public:
1422   TerminateTestCallback(EventBase* eventBase, int fd)
1423     : EventHandler(eventBase, fd),
1424       eventBase_(eventBase),
1425       loopInvocations_(0),
1426       maxLoopInvocations_(0),
1427       eventInvocations_(0),
1428       maxEventInvocations_(0) {}
1429
1430   void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1431     loopInvocations_ = 0;
1432     maxLoopInvocations_ = maxLoopInvocations;
1433     eventInvocations_ = 0;
1434     maxEventInvocations_ = maxEventInvocations;
1435
1436     cancelLoopCallback();
1437     unregisterHandler();
1438   }
1439
1440   virtual void handlerReady(uint16_t events) noexcept {
1441     // We didn't register with PERSIST, so we will have been automatically
1442     // unregistered already.
1443     ASSERT_FALSE(isHandlerRegistered());
1444
1445     ++eventInvocations_;
1446     if (eventInvocations_ >= maxEventInvocations_) {
1447       return;
1448     }
1449
1450     eventBase_->runInLoop(this);
1451   }
1452   virtual void runLoopCallback() noexcept {
1453     ++loopInvocations_;
1454     if (loopInvocations_ >= maxLoopInvocations_) {
1455       return;
1456     }
1457
1458     registerHandler(READ);
1459   }
1460
1461   uint32_t getLoopInvocations() const {
1462     return loopInvocations_;
1463   }
1464   uint32_t getEventInvocations() const {
1465     return eventInvocations_;
1466   }
1467
1468  private:
1469   EventBase* eventBase_;
1470   uint32_t loopInvocations_;
1471   uint32_t maxLoopInvocations_;
1472   uint32_t eventInvocations_;
1473   uint32_t maxEventInvocations_;
1474 };
1475
1476 /**
1477  * Test that EventBase::loop() correctly detects when there are no more events
1478  * left to run.
1479  *
1480  * This uses a single callback, which alternates registering itself as a loop
1481  * callback versus a EventHandler callback.  This exercises a regression where
1482  * EventBase::loop() incorrectly exited if there were no more fd handlers
1483  * registered, but a loop callback installed a new fd handler.
1484  */
1485 TEST(EventBaseTest, LoopTermination) {
1486   EventBase eventBase;
1487
1488   // Open a pipe and close the write end,
1489   // so the read endpoint will be readable
1490   int pipeFds[2];
1491   int rc = pipe(pipeFds);
1492   ASSERT_EQ(rc, 0);
1493   close(pipeFds[1]);
1494   TerminateTestCallback callback(&eventBase, pipeFds[0]);
1495
1496   // Test once where the callback will exit after a loop callback
1497   callback.reset(10, 100);
1498   eventBase.runInLoop(&callback);
1499   eventBase.loop();
1500   ASSERT_EQ(callback.getLoopInvocations(), 10);
1501   ASSERT_EQ(callback.getEventInvocations(), 9);
1502
1503   // Test once where the callback will exit after an fd event callback
1504   callback.reset(100, 7);
1505   eventBase.runInLoop(&callback);
1506   eventBase.loop();
1507   ASSERT_EQ(callback.getLoopInvocations(), 7);
1508   ASSERT_EQ(callback.getEventInvocations(), 7);
1509
1510   close(pipeFds[0]);
1511 }
1512
1513 ///////////////////////////////////////////////////////////////////////////
1514 // Tests for latency calculations
1515 ///////////////////////////////////////////////////////////////////////////
1516
1517 class IdleTimeTimeoutSeries : public AsyncTimeout {
1518
1519  public:
1520
1521   explicit IdleTimeTimeoutSeries(EventBase *base,
1522                                  std::deque<std::uint64_t>& timeout) :
1523     AsyncTimeout(base),
1524     timeouts_(0),
1525     timeout_(timeout) {
1526       scheduleTimeout(1);
1527     }
1528
1529   virtual ~IdleTimeTimeoutSeries() {}
1530
1531   void timeoutExpired() noexcept {
1532     ++timeouts_;
1533
1534     if(timeout_.empty()){
1535       cancelTimeout();
1536     } else {
1537       uint64_t sleepTime = timeout_.front();
1538       timeout_.pop_front();
1539       if (sleepTime) {
1540         usleep(sleepTime);
1541       }
1542       scheduleTimeout(1);
1543     }
1544   }
1545
1546   int getTimeouts() const {
1547     return timeouts_;
1548   }
1549
1550  private:
1551   int timeouts_;
1552   std::deque<uint64_t>& timeout_;
1553 };
1554
1555 /**
1556  * Verify that idle time is correctly accounted for when decaying our loop
1557  * time.
1558  *
1559  * This works by creating a high loop time (via usleep), expecting a latency
1560  * callback with known value, and then scheduling a timeout for later. This
1561  * later timeout is far enough in the future that the idle time should have
1562  * caused the loop time to decay.
1563  */
1564 TEST(EventBaseTest, IdleTime) {
1565   EventBase eventBase;
1566   eventBase.setLoadAvgMsec(1000);
1567   eventBase.resetLoadAvg(5900.0);
1568   std::deque<uint64_t> timeouts0(4, 8080);
1569   timeouts0.push_front(8000);
1570   timeouts0.push_back(14000);
1571   IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1572   std::deque<uint64_t> timeouts(20, 20);
1573   std::unique_ptr<IdleTimeTimeoutSeries> tos;
1574   int64_t testStart = duration_cast<microseconds>(
1575     std::chrono::steady_clock::now().time_since_epoch()).count();
1576   bool hostOverloaded = false;
1577
1578   int latencyCallbacks = 0;
1579   eventBase.setMaxLatency(6000, [&]() {
1580     ++latencyCallbacks;
1581
1582     switch (latencyCallbacks) {
1583     case 1:
1584       if (tos0.getTimeouts() < 6) {
1585         // This could only happen if the host this test is running
1586         // on is heavily loaded.
1587         int64_t maxLatencyReached = duration_cast<microseconds>(
1588             std::chrono::steady_clock::now().time_since_epoch()).count();
1589         ASSERT_LE(43800, maxLatencyReached - testStart);
1590         hostOverloaded = true;
1591         break;
1592       }
1593       ASSERT_EQ(6, tos0.getTimeouts());
1594       ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1595       ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1596       tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1597       break;
1598
1599     default:
1600       FAIL() << "Unexpected latency callback";
1601       break;
1602     }
1603   });
1604
1605   // Kick things off with an "immedite" timeout
1606   tos0.scheduleTimeout(1);
1607
1608   eventBase.loop();
1609
1610   if (hostOverloaded) {
1611     return;
1612   }
1613
1614   ASSERT_EQ(1, latencyCallbacks);
1615   ASSERT_EQ(7, tos0.getTimeouts());
1616   ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1617   ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1618   ASSERT_TRUE(!!tos);
1619   ASSERT_EQ(21, tos->getTimeouts());
1620 }
1621
1622 /**
1623  * Test that thisLoop functionality works with terminateLoopSoon
1624  */
1625 TEST(EventBaseTest, ThisLoop) {
1626   EventBase eb;
1627   bool runInLoop = false;
1628   bool runThisLoop = false;
1629
1630   eb.runInLoop([&](){
1631       eb.terminateLoopSoon();
1632       eb.runInLoop([&]() {
1633           runInLoop = true;
1634         });
1635       eb.runInLoop([&]() {
1636           runThisLoop = true;
1637         }, true);
1638     }, true);
1639   eb.loopForever();
1640
1641   // Should not work
1642   ASSERT_FALSE(runInLoop);
1643   // Should work with thisLoop
1644   ASSERT_TRUE(runThisLoop);
1645 }
1646
1647 TEST(EventBaseTest, EventBaseThreadLoop) {
1648   EventBase base;
1649   bool ran = false;
1650
1651   base.runInEventBaseThread([&](){
1652     ran = true;
1653   });
1654   base.loop();
1655
1656   ASSERT_EQ(true, ran);
1657 }
1658
1659 TEST(EventBaseTest, EventBaseThreadName) {
1660   EventBase base;
1661   base.setName("foo");
1662   base.loop();
1663
1664 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1665   char name[16];
1666   pthread_getname_np(pthread_self(), name, 16);
1667   ASSERT_EQ(0, strcmp("foo", name));
1668 #endif
1669 }
1670
1671 TEST(EventBaseTest, RunBeforeLoop) {
1672   EventBase base;
1673   CountedLoopCallback cb(&base, 1, [&](){
1674     base.terminateLoopSoon();
1675   });
1676   base.runBeforeLoop(&cb);
1677   base.loopForever();
1678   ASSERT_EQ(cb.getCount(), 0);
1679 }
1680
1681 TEST(EventBaseTest, RunBeforeLoopWait) {
1682   EventBase base;
1683   CountedLoopCallback cb(&base, 1);
1684   base.tryRunAfterDelay([&](){
1685       base.terminateLoopSoon();
1686     }, 500);
1687   base.runBeforeLoop(&cb);
1688   base.loopForever();
1689
1690   // Check that we only ran once, and did not loop multiple times.
1691   ASSERT_EQ(cb.getCount(), 0);
1692 }
1693
1694 class PipeHandler : public EventHandler {
1695 public:
1696   PipeHandler(EventBase* eventBase, int fd)
1697     : EventHandler(eventBase, fd) {}
1698
1699   void handlerReady(uint16_t events) noexcept {
1700     abort();
1701   }
1702 };
1703
1704 TEST(EventBaseTest, StopBeforeLoop) {
1705   EventBase evb;
1706
1707   // Give the evb something to do.
1708   int p[2];
1709   ASSERT_EQ(0, pipe(p));
1710   PipeHandler handler(&evb, p[0]);
1711   handler.registerHandler(EventHandler::READ);
1712
1713   // It's definitely not running yet
1714   evb.terminateLoopSoon();
1715
1716   // let it run, it should exit quickly.
1717   std::thread t([&] { evb.loop(); });
1718   t.join();
1719
1720   handler.unregisterHandler();
1721   close(p[0]);
1722   close(p[1]);
1723
1724   SUCCEED();
1725 }
1726
1727 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1728   bool ran = false;
1729
1730   {
1731     EventBase base;
1732     base.runInEventBaseThread([&](){
1733       ran = true;
1734     });
1735   }
1736
1737   ASSERT_TRUE(ran);
1738 }