Implement LoopKeepAlive for EventBase
[folly.git] / folly / io / async / test / EventBaseTest.cpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 #include <folly/Memory.h>
20
21 #include <folly/io/async/AsyncTimeout.h>
22 #include <folly/io/async/EventBase.h>
23 #include <folly/io/async/EventHandler.h>
24 #include <folly/io/async/test/SocketPair.h>
25 #include <folly/io/async/test/Util.h>
26
27 #include <atomic>
28 #include <iostream>
29 #include <unistd.h>
30 #include <memory>
31 #include <thread>
32
33 using std::atomic;
34 using std::deque;
35 using std::pair;
36 using std::vector;
37 using std::unique_ptr;
38 using std::thread;
39 using std::make_pair;
40 using std::cerr;
41 using std::endl;
42 using std::chrono::milliseconds;
43 using std::chrono::microseconds;
44 using std::chrono::duration_cast;
45
46 using namespace folly;
47
48 ///////////////////////////////////////////////////////////////////////////
49 // Tests for read and write events
50 ///////////////////////////////////////////////////////////////////////////
51
52 enum { BUF_SIZE = 4096 };
53
54 ssize_t writeToFD(int fd, size_t length) {
55   // write an arbitrary amount of data to the fd
56   char buf[length];
57   memset(buf, 'a', sizeof(buf));
58   ssize_t rc = write(fd, buf, sizeof(buf));
59   CHECK_EQ(rc, length);
60   return rc;
61 }
62
63 size_t writeUntilFull(int fd) {
64   // Write to the fd until EAGAIN is returned
65   size_t bytesWritten = 0;
66   char buf[BUF_SIZE];
67   memset(buf, 'a', sizeof(buf));
68   while (true) {
69     ssize_t rc = write(fd, buf, sizeof(buf));
70     if (rc < 0) {
71       CHECK_EQ(errno, EAGAIN);
72       break;
73     } else {
74       bytesWritten += rc;
75     }
76   }
77   return bytesWritten;
78 }
79
80 ssize_t readFromFD(int fd, size_t length) {
81   // write an arbitrary amount of data to the fd
82   char buf[length];
83   return read(fd, buf, sizeof(buf));
84 }
85
86 size_t readUntilEmpty(int fd) {
87   // Read from the fd until EAGAIN is returned
88   char buf[BUF_SIZE];
89   size_t bytesRead = 0;
90   while (true) {
91     int rc = read(fd, buf, sizeof(buf));
92     if (rc == 0) {
93       CHECK(false) << "unexpected EOF";
94     } else if (rc < 0) {
95       CHECK_EQ(errno, EAGAIN);
96       break;
97     } else {
98       bytesRead += rc;
99     }
100   }
101   return bytesRead;
102 }
103
104 void checkReadUntilEmpty(int fd, size_t expectedLength) {
105   ASSERT_EQ(readUntilEmpty(fd), expectedLength);
106 }
107
108 struct ScheduledEvent {
109   int milliseconds;
110   uint16_t events;
111   size_t length;
112   ssize_t result;
113
114   void perform(int fd) {
115     if (events & EventHandler::READ) {
116       if (length == 0) {
117         result = readUntilEmpty(fd);
118       } else {
119         result = readFromFD(fd, length);
120       }
121     }
122     if (events & EventHandler::WRITE) {
123       if (length == 0) {
124         result = writeUntilFull(fd);
125       } else {
126         result = writeToFD(fd, length);
127       }
128     }
129   }
130 };
131
132 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
133   for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
134     eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
135                              ev->milliseconds);
136   }
137 }
138
139 class TestHandler : public EventHandler {
140  public:
141   TestHandler(EventBase* eventBase, int fd)
142     : EventHandler(eventBase, fd), fd_(fd) {}
143
144   void handlerReady(uint16_t events) noexcept override {
145     ssize_t bytesRead = 0;
146     ssize_t bytesWritten = 0;
147     if (events & READ) {
148       // Read all available data, so EventBase will stop calling us
149       // until new data becomes available
150       bytesRead = readUntilEmpty(fd_);
151     }
152     if (events & WRITE) {
153       // Write until the pipe buffer is full, so EventBase will stop calling
154       // us until the other end has read some data
155       bytesWritten = writeUntilFull(fd_);
156     }
157
158     log.emplace_back(events, bytesRead, bytesWritten);
159   }
160
161   struct EventRecord {
162     EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
163       : events(events)
164       , timestamp()
165       , bytesRead(bytesRead)
166       , bytesWritten(bytesWritten) {}
167
168     uint16_t events;
169     TimePoint timestamp;
170     ssize_t bytesRead;
171     ssize_t bytesWritten;
172   };
173
174   deque<EventRecord> log;
175
176  private:
177   int fd_;
178 };
179
180 /**
181  * Test a READ event
182  */
183 TEST(EventBaseTest, ReadEvent) {
184   EventBase eb;
185   SocketPair sp;
186
187   // Register for read events
188   TestHandler handler(&eb, sp[0]);
189   handler.registerHandler(EventHandler::READ);
190
191   // Register timeouts to perform two write events
192   ScheduledEvent events[] = {
193     { 10, EventHandler::WRITE, 2345, 0 },
194     { 160, EventHandler::WRITE, 99, 0 },
195     { 0, 0, 0, 0 },
196   };
197   scheduleEvents(&eb, sp[1], events);
198
199   // Loop
200   TimePoint start;
201   eb.loop();
202   TimePoint end;
203
204   // Since we didn't use the EventHandler::PERSIST flag, the handler should
205   // have received the first read, then unregistered itself.  Check that only
206   // the first chunk of data was received.
207   ASSERT_EQ(handler.log.size(), 1);
208   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
209   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
210                   milliseconds(events[0].milliseconds), milliseconds(90));
211   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
212   ASSERT_EQ(handler.log[0].bytesWritten, 0);
213   T_CHECK_TIMEOUT(start, end,
214                   milliseconds(events[1].milliseconds), milliseconds(30));
215
216   // Make sure the second chunk of data is still waiting to be read.
217   size_t bytesRemaining = readUntilEmpty(sp[0]);
218   ASSERT_EQ(bytesRemaining, events[1].length);
219 }
220
221 /**
222  * Test (READ | PERSIST)
223  */
224 TEST(EventBaseTest, ReadPersist) {
225   EventBase eb;
226   SocketPair sp;
227
228   // Register for read events
229   TestHandler handler(&eb, sp[0]);
230   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
231
232   // Register several timeouts to perform writes
233   ScheduledEvent events[] = {
234     { 10,  EventHandler::WRITE, 1024, 0 },
235     { 20,  EventHandler::WRITE, 2211, 0 },
236     { 30,  EventHandler::WRITE, 4096, 0 },
237     { 100, EventHandler::WRITE, 100,  0 },
238     { 0, 0, 0, 0 },
239   };
240   scheduleEvents(&eb, sp[1], events);
241
242   // Schedule a timeout to unregister the handler after the third write
243   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
244
245   // Loop
246   TimePoint start;
247   eb.loop();
248   TimePoint end;
249
250   // The handler should have received the first 3 events,
251   // then been unregistered after that.
252   ASSERT_EQ(handler.log.size(), 3);
253   for (int n = 0; n < 3; ++n) {
254     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
255     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
256                     milliseconds(events[n].milliseconds));
257     ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
258     ASSERT_EQ(handler.log[n].bytesWritten, 0);
259   }
260   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
261
262   // Make sure the data from the last write is still waiting to be read
263   size_t bytesRemaining = readUntilEmpty(sp[0]);
264   ASSERT_EQ(bytesRemaining, events[3].length);
265 }
266
267 /**
268  * Test registering for READ when the socket is immediately readable
269  */
270 TEST(EventBaseTest, ReadImmediate) {
271   EventBase eb;
272   SocketPair sp;
273
274   // Write some data to the socket so the other end will
275   // be immediately readable
276   size_t dataLength = 1234;
277   writeToFD(sp[1], dataLength);
278
279   // Register for read events
280   TestHandler handler(&eb, sp[0]);
281   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
282
283   // Register a timeout to perform another write
284   ScheduledEvent events[] = {
285     { 10, EventHandler::WRITE, 2345, 0 },
286     { 0, 0, 0, 0 },
287   };
288   scheduleEvents(&eb, sp[1], events);
289
290   // Schedule a timeout to unregister the handler
291   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
292
293   // Loop
294   TimePoint start;
295   eb.loop();
296   TimePoint end;
297
298   ASSERT_EQ(handler.log.size(), 2);
299
300   // There should have been 1 event for immediate readability
301   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
302   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
303   ASSERT_EQ(handler.log[0].bytesRead, dataLength);
304   ASSERT_EQ(handler.log[0].bytesWritten, 0);
305
306   // There should be another event after the timeout wrote more data
307   ASSERT_EQ(handler.log[1].events, EventHandler::READ);
308   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
309                   milliseconds(events[0].milliseconds));
310   ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
311   ASSERT_EQ(handler.log[1].bytesWritten, 0);
312
313   T_CHECK_TIMEOUT(start, end, milliseconds(20));
314 }
315
316 /**
317  * Test a WRITE event
318  */
319 TEST(EventBaseTest, WriteEvent) {
320   EventBase eb;
321   SocketPair sp;
322
323   // Fill up the write buffer before starting
324   size_t initialBytesWritten = writeUntilFull(sp[0]);
325
326   // Register for write events
327   TestHandler handler(&eb, sp[0]);
328   handler.registerHandler(EventHandler::WRITE);
329
330   // Register timeouts to perform two reads
331   ScheduledEvent events[] = {
332     { 10, EventHandler::READ, 0, 0 },
333     { 60, EventHandler::READ, 0, 0 },
334     { 0, 0, 0, 0 },
335   };
336   scheduleEvents(&eb, sp[1], events);
337
338   // Loop
339   TimePoint start;
340   eb.loop();
341   TimePoint end;
342
343   // Since we didn't use the EventHandler::PERSIST flag, the handler should
344   // have only been able to write once, then unregistered itself.
345   ASSERT_EQ(handler.log.size(), 1);
346   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
347   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
348                   milliseconds(events[0].milliseconds));
349   ASSERT_EQ(handler.log[0].bytesRead, 0);
350   ASSERT_GT(handler.log[0].bytesWritten, 0);
351   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
352
353   ASSERT_EQ(events[0].result, initialBytesWritten);
354   ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
355 }
356
357 /**
358  * Test (WRITE | PERSIST)
359  */
360 TEST(EventBaseTest, WritePersist) {
361   EventBase eb;
362   SocketPair sp;
363
364   // Fill up the write buffer before starting
365   size_t initialBytesWritten = writeUntilFull(sp[0]);
366
367   // Register for write events
368   TestHandler handler(&eb, sp[0]);
369   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
370
371   // Register several timeouts to read from the socket at several intervals
372   ScheduledEvent events[] = {
373     { 10,  EventHandler::READ, 0, 0 },
374     { 40,  EventHandler::READ, 0, 0 },
375     { 70,  EventHandler::READ, 0, 0 },
376     { 100, EventHandler::READ, 0, 0 },
377     { 0, 0, 0, 0 },
378   };
379   scheduleEvents(&eb, sp[1], events);
380
381   // Schedule a timeout to unregister the handler after the third read
382   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
383
384   // Loop
385   TimePoint start;
386   eb.loop();
387   TimePoint end;
388
389   // The handler should have received the first 3 events,
390   // then been unregistered after that.
391   ASSERT_EQ(handler.log.size(), 3);
392   ASSERT_EQ(events[0].result, initialBytesWritten);
393   for (int n = 0; n < 3; ++n) {
394     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
395     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
396                     milliseconds(events[n].milliseconds));
397     ASSERT_EQ(handler.log[n].bytesRead, 0);
398     ASSERT_GT(handler.log[n].bytesWritten, 0);
399     ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
400   }
401   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
402 }
403
404 /**
405  * Test registering for WRITE when the socket is immediately writable
406  */
407 TEST(EventBaseTest, WriteImmediate) {
408   EventBase eb;
409   SocketPair sp;
410
411   // Register for write events
412   TestHandler handler(&eb, sp[0]);
413   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
414
415   // Register a timeout to perform a read
416   ScheduledEvent events[] = {
417     { 10, EventHandler::READ, 0, 0 },
418     { 0, 0, 0, 0 },
419   };
420   scheduleEvents(&eb, sp[1], events);
421
422   // Schedule a timeout to unregister the handler
423   int64_t unregisterTimeout = 40;
424   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
425                    unregisterTimeout);
426
427   // Loop
428   TimePoint start;
429   eb.loop();
430   TimePoint end;
431
432   ASSERT_EQ(handler.log.size(), 2);
433
434   // Since the socket buffer was initially empty,
435   // there should have been 1 event for immediate writability
436   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
437   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
438   ASSERT_EQ(handler.log[0].bytesRead, 0);
439   ASSERT_GT(handler.log[0].bytesWritten, 0);
440
441   // There should be another event after the timeout wrote more data
442   ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
443   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
444                   milliseconds(events[0].milliseconds));
445   ASSERT_EQ(handler.log[1].bytesRead, 0);
446   ASSERT_GT(handler.log[1].bytesWritten, 0);
447
448   T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
449 }
450
451 /**
452  * Test (READ | WRITE) when the socket becomes readable first
453  */
454 TEST(EventBaseTest, ReadWrite) {
455   EventBase eb;
456   SocketPair sp;
457
458   // Fill up the write buffer before starting
459   size_t sock0WriteLength = writeUntilFull(sp[0]);
460
461   // Register for read and write events
462   TestHandler handler(&eb, sp[0]);
463   handler.registerHandler(EventHandler::READ_WRITE);
464
465   // Register timeouts to perform a write then a read.
466   ScheduledEvent events[] = {
467     { 10, EventHandler::WRITE, 2345, 0 },
468     { 40, EventHandler::READ, 0, 0 },
469     { 0, 0, 0, 0 },
470   };
471   scheduleEvents(&eb, sp[1], events);
472
473   // Loop
474   TimePoint start;
475   eb.loop();
476   TimePoint end;
477
478   // Since we didn't use the EventHandler::PERSIST flag, the handler should
479   // have only noticed readability, then unregistered itself.  Check that only
480   // one event was logged.
481   ASSERT_EQ(handler.log.size(), 1);
482   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
483   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
484                   milliseconds(events[0].milliseconds));
485   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
486   ASSERT_EQ(handler.log[0].bytesWritten, 0);
487   ASSERT_EQ(events[1].result, sock0WriteLength);
488   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
489 }
490
491 /**
492  * Test (READ | WRITE) when the socket becomes writable first
493  */
494 TEST(EventBaseTest, WriteRead) {
495   EventBase eb;
496   SocketPair sp;
497
498   // Fill up the write buffer before starting
499   size_t sock0WriteLength = writeUntilFull(sp[0]);
500
501   // Register for read and write events
502   TestHandler handler(&eb, sp[0]);
503   handler.registerHandler(EventHandler::READ_WRITE);
504
505   // Register timeouts to perform a read then a write.
506   size_t sock1WriteLength = 2345;
507   ScheduledEvent events[] = {
508     { 10, EventHandler::READ, 0, 0 },
509     { 40, EventHandler::WRITE, sock1WriteLength, 0 },
510     { 0, 0, 0, 0 },
511   };
512   scheduleEvents(&eb, sp[1], events);
513
514   // Loop
515   TimePoint start;
516   eb.loop();
517   TimePoint end;
518
519   // Since we didn't use the EventHandler::PERSIST flag, the handler should
520   // have only noticed writability, then unregistered itself.  Check that only
521   // one event was logged.
522   ASSERT_EQ(handler.log.size(), 1);
523   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
524   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
525                   milliseconds(events[0].milliseconds));
526   ASSERT_EQ(handler.log[0].bytesRead, 0);
527   ASSERT_GT(handler.log[0].bytesWritten, 0);
528   ASSERT_EQ(events[0].result, sock0WriteLength);
529   ASSERT_EQ(events[1].result, sock1WriteLength);
530   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
531
532   // Make sure the written data is still waiting to be read.
533   size_t bytesRemaining = readUntilEmpty(sp[0]);
534   ASSERT_EQ(bytesRemaining, events[1].length);
535 }
536
537 /**
538  * Test (READ | WRITE) when the socket becomes readable and writable
539  * at the same time.
540  */
541 TEST(EventBaseTest, ReadWriteSimultaneous) {
542   EventBase eb;
543   SocketPair sp;
544
545   // Fill up the write buffer before starting
546   size_t sock0WriteLength = writeUntilFull(sp[0]);
547
548   // Register for read and write events
549   TestHandler handler(&eb, sp[0]);
550   handler.registerHandler(EventHandler::READ_WRITE);
551
552   // Register a timeout to perform a read and write together
553   ScheduledEvent events[] = {
554     { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
555     { 0, 0, 0, 0 },
556   };
557   scheduleEvents(&eb, sp[1], events);
558
559   // Loop
560   TimePoint start;
561   eb.loop();
562   TimePoint end;
563
564   // It's not strictly required that the EventBase register us about both
565   // events in the same call.  So, it's possible that if the EventBase
566   // implementation changes this test could start failing, and it wouldn't be
567   // considered breaking the API.  However for now it's nice to exercise this
568   // code path.
569   ASSERT_EQ(handler.log.size(), 1);
570   ASSERT_EQ(handler.log[0].events,
571                     EventHandler::READ | EventHandler::WRITE);
572   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
573                   milliseconds(events[0].milliseconds));
574   ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
575   ASSERT_GT(handler.log[0].bytesWritten, 0);
576   T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
577 }
578
579 /**
580  * Test (READ | WRITE | PERSIST)
581  */
582 TEST(EventBaseTest, ReadWritePersist) {
583   EventBase eb;
584   SocketPair sp;
585
586   // Register for read and write events
587   TestHandler handler(&eb, sp[0]);
588   handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
589                           EventHandler::PERSIST);
590
591   // Register timeouts to perform several reads and writes
592   ScheduledEvent events[] = {
593     { 10, EventHandler::WRITE, 2345, 0 },
594     { 20, EventHandler::READ, 0, 0 },
595     { 35, EventHandler::WRITE, 200, 0 },
596     { 45, EventHandler::WRITE, 15, 0 },
597     { 55, EventHandler::READ, 0, 0 },
598     { 120, EventHandler::WRITE, 2345, 0 },
599     { 0, 0, 0, 0 },
600   };
601   scheduleEvents(&eb, sp[1], events);
602
603   // Schedule a timeout to unregister the handler
604   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
605
606   // Loop
607   TimePoint start;
608   eb.loop();
609   TimePoint end;
610
611   ASSERT_EQ(handler.log.size(), 6);
612
613   // Since we didn't fill up the write buffer immediately, there should
614   // be an immediate event for writability.
615   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
616   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
617   ASSERT_EQ(handler.log[0].bytesRead, 0);
618   ASSERT_GT(handler.log[0].bytesWritten, 0);
619
620   // Events 1 through 5 should correspond to the scheduled events
621   for (int n = 1; n < 6; ++n) {
622     ScheduledEvent* event = &events[n - 1];
623     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
624                     milliseconds(event->milliseconds));
625     if (event->events == EventHandler::READ) {
626       ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
627       ASSERT_EQ(handler.log[n].bytesRead, 0);
628       ASSERT_GT(handler.log[n].bytesWritten, 0);
629     } else {
630       ASSERT_EQ(handler.log[n].events, EventHandler::READ);
631       ASSERT_EQ(handler.log[n].bytesRead, event->length);
632       ASSERT_EQ(handler.log[n].bytesWritten, 0);
633     }
634   }
635
636   // The timeout should have unregistered the handler before the last write.
637   // Make sure that data is still waiting to be read
638   size_t bytesRemaining = readUntilEmpty(sp[0]);
639   ASSERT_EQ(bytesRemaining, events[5].length);
640 }
641
642
643 class PartialReadHandler : public TestHandler {
644  public:
645   PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
646     : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
647
648   void handlerReady(uint16_t events) noexcept override {
649     assert(events == EventHandler::READ);
650     ssize_t bytesRead = readFromFD(fd_, readLength_);
651     log.emplace_back(events, bytesRead, 0);
652   }
653
654  private:
655   int fd_;
656   size_t readLength_;
657 };
658
659 /**
660  * Test reading only part of the available data when a read event is fired.
661  * When PERSIST is used, make sure the handler gets notified again the next
662  * time around the loop.
663  */
664 TEST(EventBaseTest, ReadPartial) {
665   EventBase eb;
666   SocketPair sp;
667
668   // Register for read events
669   size_t readLength = 100;
670   PartialReadHandler handler(&eb, sp[0], readLength);
671   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
672
673   // Register a timeout to perform a single write,
674   // with more data than PartialReadHandler will read at once
675   ScheduledEvent events[] = {
676     { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
677     { 0, 0, 0, 0 },
678   };
679   scheduleEvents(&eb, sp[1], events);
680
681   // Schedule a timeout to unregister the handler
682   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
683
684   // Loop
685   TimePoint start;
686   eb.loop();
687   TimePoint end;
688
689   ASSERT_EQ(handler.log.size(), 4);
690
691   // The first 3 invocations should read readLength bytes each
692   for (int n = 0; n < 3; ++n) {
693     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
694     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
695                     milliseconds(events[0].milliseconds));
696     ASSERT_EQ(handler.log[n].bytesRead, readLength);
697     ASSERT_EQ(handler.log[n].bytesWritten, 0);
698   }
699   // The last read only has readLength/2 bytes
700   ASSERT_EQ(handler.log[3].events, EventHandler::READ);
701   T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
702                   milliseconds(events[0].milliseconds));
703   ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
704   ASSERT_EQ(handler.log[3].bytesWritten, 0);
705 }
706
707
708 class PartialWriteHandler : public TestHandler {
709  public:
710   PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
711     : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
712
713   void handlerReady(uint16_t events) noexcept override {
714     assert(events == EventHandler::WRITE);
715     ssize_t bytesWritten = writeToFD(fd_, writeLength_);
716     log.emplace_back(events, 0, bytesWritten);
717   }
718
719  private:
720   int fd_;
721   size_t writeLength_;
722 };
723
724 /**
725  * Test writing without completely filling up the write buffer when the fd
726  * becomes writable.  When PERSIST is used, make sure the handler gets
727  * notified again the next time around the loop.
728  */
729 TEST(EventBaseTest, WritePartial) {
730   EventBase eb;
731   SocketPair sp;
732
733   // Fill up the write buffer before starting
734   size_t initialBytesWritten = writeUntilFull(sp[0]);
735
736   // Register for write events
737   size_t writeLength = 100;
738   PartialWriteHandler handler(&eb, sp[0], writeLength);
739   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
740
741   // Register a timeout to read, so that more data can be written
742   ScheduledEvent events[] = {
743     { 10, EventHandler::READ, 0, 0 },
744     { 0, 0, 0, 0 },
745   };
746   scheduleEvents(&eb, sp[1], events);
747
748   // Schedule a timeout to unregister the handler
749   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
750
751   // Loop
752   TimePoint start;
753   eb.loop();
754   TimePoint end;
755
756   // Depending on how big the socket buffer is, there will be multiple writes
757   // Only check the first 5
758   int numChecked = 5;
759   ASSERT_GE(handler.log.size(), numChecked);
760   ASSERT_EQ(events[0].result, initialBytesWritten);
761
762   // The first 3 invocations should read writeLength bytes each
763   for (int n = 0; n < numChecked; ++n) {
764     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
765     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
766                     milliseconds(events[0].milliseconds));
767     ASSERT_EQ(handler.log[n].bytesRead, 0);
768     ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
769   }
770 }
771
772
773 /**
774  * Test destroying a registered EventHandler
775  */
776 TEST(EventBaseTest, DestroyHandler) {
777   class DestroyHandler : public AsyncTimeout {
778    public:
779     DestroyHandler(EventBase* eb, EventHandler* h)
780       : AsyncTimeout(eb)
781       , handler_(h) {}
782
783     void timeoutExpired() noexcept override { delete handler_; }
784
785    private:
786     EventHandler* handler_;
787   };
788
789   EventBase eb;
790   SocketPair sp;
791
792   // Fill up the write buffer before starting
793   size_t initialBytesWritten = writeUntilFull(sp[0]);
794
795   // Register for write events
796   TestHandler* handler = new TestHandler(&eb, sp[0]);
797   handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
798
799   // After 10ms, read some data, so that the handler
800   // will be notified that it can write.
801   eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
802                    10);
803
804   // Start a timer to destroy the handler after 25ms
805   // This mainly just makes sure the code doesn't break or assert
806   DestroyHandler dh(&eb, handler);
807   dh.scheduleTimeout(25);
808
809   TimePoint start;
810   eb.loop();
811   TimePoint end;
812
813   // Make sure the EventHandler was uninstalled properly when it was
814   // destroyed, and the EventBase loop exited
815   T_CHECK_TIMEOUT(start, end, milliseconds(25));
816
817   // Make sure that the handler wrote data to the socket
818   // before it was destroyed
819   size_t bytesRemaining = readUntilEmpty(sp[1]);
820   ASSERT_GT(bytesRemaining, 0);
821 }
822
823
824 ///////////////////////////////////////////////////////////////////////////
825 // Tests for timeout events
826 ///////////////////////////////////////////////////////////////////////////
827
828 TEST(EventBaseTest, RunAfterDelay) {
829   EventBase eb;
830
831   TimePoint timestamp1(false);
832   TimePoint timestamp2(false);
833   TimePoint timestamp3(false);
834   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
835   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
836   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
837
838   TimePoint start;
839   eb.loop();
840   TimePoint end;
841
842   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
843   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
844   T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
845   T_CHECK_TIMEOUT(start, end, milliseconds(40));
846 }
847
848 /**
849  * Test the behavior of tryRunAfterDelay() when some timeouts are
850  * still scheduled when the EventBase is destroyed.
851  */
852 TEST(EventBaseTest, RunAfterDelayDestruction) {
853   TimePoint timestamp1(false);
854   TimePoint timestamp2(false);
855   TimePoint timestamp3(false);
856   TimePoint timestamp4(false);
857   TimePoint start(false);
858   TimePoint end(false);
859
860   {
861     EventBase eb;
862
863     // Run two normal timeouts
864     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
865     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
866
867     // Schedule a timeout to stop the event loop after 40ms
868     eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
869
870     // Schedule 2 timeouts that would fire after the event loop stops
871     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
872     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
873
874     start.reset();
875     eb.loop();
876     end.reset();
877   }
878
879   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
880   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
881   T_CHECK_TIMEOUT(start, end, milliseconds(40));
882
883   ASSERT_TRUE(timestamp3.isUnset());
884   ASSERT_TRUE(timestamp4.isUnset());
885
886   // Ideally this test should be run under valgrind to ensure that no
887   // memory is leaked.
888 }
889
890 class TestTimeout : public AsyncTimeout {
891  public:
892   explicit TestTimeout(EventBase* eventBase)
893     : AsyncTimeout(eventBase)
894     , timestamp(false) {}
895
896   void timeoutExpired() noexcept override { timestamp.reset(); }
897
898   TimePoint timestamp;
899 };
900
901 TEST(EventBaseTest, BasicTimeouts) {
902   EventBase eb;
903
904   TestTimeout t1(&eb);
905   TestTimeout t2(&eb);
906   TestTimeout t3(&eb);
907   t1.scheduleTimeout(10);
908   t2.scheduleTimeout(20);
909   t3.scheduleTimeout(40);
910
911   TimePoint start;
912   eb.loop();
913   TimePoint end;
914
915   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
916   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
917   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
918   T_CHECK_TIMEOUT(start, end, milliseconds(40));
919 }
920
921 class ReschedulingTimeout : public AsyncTimeout {
922  public:
923   ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
924     : AsyncTimeout(evb)
925     , timeouts_(timeouts)
926     , iterator_(timeouts_.begin()) {}
927
928   void start() {
929     reschedule();
930   }
931
932   void timeoutExpired() noexcept override {
933     timestamps.emplace_back();
934     reschedule();
935   }
936
937   void reschedule() {
938     if (iterator_ != timeouts_.end()) {
939       uint32_t timeout = *iterator_;
940       ++iterator_;
941       scheduleTimeout(timeout);
942     }
943   }
944
945   vector<TimePoint> timestamps;
946
947  private:
948   vector<uint32_t> timeouts_;
949   vector<uint32_t>::const_iterator iterator_;
950 };
951
952 /**
953  * Test rescheduling the same timeout multiple times
954  */
955 TEST(EventBaseTest, ReuseTimeout) {
956   EventBase eb;
957
958   vector<uint32_t> timeouts;
959   timeouts.push_back(10);
960   timeouts.push_back(30);
961   timeouts.push_back(15);
962
963   ReschedulingTimeout t(&eb, timeouts);
964   t.start();
965
966   TimePoint start;
967   eb.loop();
968   TimePoint end;
969
970   // Use a higher tolerance than usual.  We're waiting on 3 timeouts
971   // consecutively.  In general, each timeout may go over by a few
972   // milliseconds, and we're tripling this error by witing on 3 timeouts.
973   milliseconds tolerance{6};
974
975   ASSERT_EQ(timeouts.size(), t.timestamps.size());
976   uint32_t total = 0;
977   for (size_t n = 0; n < timeouts.size(); ++n) {
978     total += timeouts[n];
979     T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
980   }
981   T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
982 }
983
984 /**
985  * Test rescheduling a timeout before it has fired
986  */
987 TEST(EventBaseTest, RescheduleTimeout) {
988   EventBase eb;
989
990   TestTimeout t1(&eb);
991   TestTimeout t2(&eb);
992   TestTimeout t3(&eb);
993
994   t1.scheduleTimeout(15);
995   t2.scheduleTimeout(30);
996   t3.scheduleTimeout(30);
997
998   auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
999       &AsyncTimeout::scheduleTimeout);
1000
1001   // after 10ms, reschedule t2 to run sooner than originally scheduled
1002   eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1003   // after 10ms, reschedule t3 to run later than originally scheduled
1004   eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1005
1006   TimePoint start;
1007   eb.loop();
1008   TimePoint end;
1009
1010   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1011   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1012   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1013   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1014 }
1015
1016 /**
1017  * Test cancelling a timeout
1018  */
1019 TEST(EventBaseTest, CancelTimeout) {
1020   EventBase eb;
1021
1022   vector<uint32_t> timeouts;
1023   timeouts.push_back(10);
1024   timeouts.push_back(30);
1025   timeouts.push_back(25);
1026
1027   ReschedulingTimeout t(&eb, timeouts);
1028   t.start();
1029   eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1030
1031   TimePoint start;
1032   eb.loop();
1033   TimePoint end;
1034
1035   ASSERT_EQ(t.timestamps.size(), 2);
1036   T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1037   T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1038   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1039 }
1040
1041 /**
1042  * Test destroying a scheduled timeout object
1043  */
1044 TEST(EventBaseTest, DestroyTimeout) {
1045   class DestroyTimeout : public AsyncTimeout {
1046    public:
1047     DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1048       : AsyncTimeout(eb)
1049       , timeout_(t) {}
1050
1051     void timeoutExpired() noexcept override { delete timeout_; }
1052
1053    private:
1054     AsyncTimeout* timeout_;
1055   };
1056
1057   EventBase eb;
1058
1059   TestTimeout* t1 = new TestTimeout(&eb);
1060   t1->scheduleTimeout(30);
1061
1062   DestroyTimeout dt(&eb, t1);
1063   dt.scheduleTimeout(10);
1064
1065   TimePoint start;
1066   eb.loop();
1067   TimePoint end;
1068
1069   T_CHECK_TIMEOUT(start, end, milliseconds(10));
1070 }
1071
1072
1073 ///////////////////////////////////////////////////////////////////////////
1074 // Test for runInThreadTestFunc()
1075 ///////////////////////////////////////////////////////////////////////////
1076
1077 struct RunInThreadData {
1078   RunInThreadData(int numThreads, int opsPerThread)
1079     : opsPerThread(opsPerThread)
1080     , opsToGo(numThreads*opsPerThread) {}
1081
1082   EventBase evb;
1083   deque< pair<int, int> > values;
1084
1085   int opsPerThread;
1086   int opsToGo;
1087 };
1088
1089 struct RunInThreadArg {
1090   RunInThreadArg(RunInThreadData* data,
1091                  int threadId,
1092                  int value)
1093     : data(data)
1094     , thread(threadId)
1095     , value(value) {}
1096
1097   RunInThreadData* data;
1098   int thread;
1099   int value;
1100 };
1101
1102 void runInThreadTestFunc(RunInThreadArg* arg) {
1103   arg->data->values.emplace_back(arg->thread, arg->value);
1104   RunInThreadData* data = arg->data;
1105   delete arg;
1106
1107   if(--data->opsToGo == 0) {
1108     // Break out of the event base loop if we are the last thread running
1109     data->evb.terminateLoopSoon();
1110   }
1111 }
1112
1113 TEST(EventBaseTest, RunInThread) {
1114   uint32_t numThreads = 50;
1115   uint32_t opsPerThread = 100;
1116   RunInThreadData data(numThreads, opsPerThread);
1117
1118   deque<std::thread> threads;
1119   for (uint32_t i = 0; i < numThreads; ++i) {
1120     threads.emplace_back([i, &data] {
1121         for (int n = 0; n < data.opsPerThread; ++n) {
1122           RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1123           data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1124           usleep(10);
1125         }
1126       });
1127   }
1128
1129   // Add a timeout event to run after 3 seconds.
1130   // Otherwise loop() will return immediately since there are no events to run.
1131   // Once the last thread exits, it will stop the loop().  However, this
1132   // timeout also stops the loop in case there is a bug performing the normal
1133   // stop.
1134   data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1135                          3000);
1136
1137   TimePoint start;
1138   data.evb.loop();
1139   TimePoint end;
1140
1141   // Verify that the loop exited because all threads finished and requested it
1142   // to stop.  This should happen much sooner than the 3 second timeout.
1143   // Assert that it happens in under a second.  (This is still tons of extra
1144   // padding.)
1145
1146   auto timeTaken = std::chrono::duration_cast<milliseconds>(
1147     end.getTime() - start.getTime());
1148   ASSERT_LT(timeTaken.count(), 1000);
1149   VLOG(11) << "Time taken: " << timeTaken.count();
1150
1151   // Verify that we have all of the events from every thread
1152   int expectedValues[numThreads];
1153   for (uint32_t n = 0; n < numThreads; ++n) {
1154     expectedValues[n] = 0;
1155   }
1156   for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1157        it != data.values.end();
1158        ++it) {
1159     int threadID = it->first;
1160     int value = it->second;
1161     ASSERT_EQ(expectedValues[threadID], value);
1162     ++expectedValues[threadID];
1163   }
1164   for (uint32_t n = 0; n < numThreads; ++n) {
1165     ASSERT_EQ(expectedValues[n], opsPerThread);
1166   }
1167
1168   // Wait on all of the threads.
1169   for (auto& thread: threads) {
1170     thread.join();
1171   }
1172 }
1173
1174 //  This test simulates some calls, and verifies that the waiting happens by
1175 //  triggering what otherwise would be race conditions, and trying to detect
1176 //  whether any of the race conditions happened.
1177 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1178   const size_t c = 256;
1179   vector<unique_ptr<atomic<size_t>>> atoms(c);
1180   for (size_t i = 0; i < c; ++i) {
1181     auto& atom = atoms.at(i);
1182     atom = make_unique<atomic<size_t>>(0);
1183   }
1184   vector<thread> threads(c);
1185   for (size_t i = 0; i < c; ++i) {
1186     auto& atom = *atoms.at(i);
1187     auto& th = threads.at(i);
1188     th = thread([&atom] {
1189         EventBase eb;
1190         auto ebth = thread([&]{ eb.loopForever(); });
1191         eb.waitUntilRunning();
1192         eb.runInEventBaseThreadAndWait([&] {
1193           size_t x = 0;
1194           atom.compare_exchange_weak(
1195               x, 1, std::memory_order_release, std::memory_order_relaxed);
1196         });
1197         size_t x = 0;
1198         atom.compare_exchange_weak(
1199             x, 2, std::memory_order_release, std::memory_order_relaxed);
1200         eb.terminateLoopSoon();
1201         ebth.join();
1202     });
1203   }
1204   for (size_t i = 0; i < c; ++i) {
1205     auto& th = threads.at(i);
1206     th.join();
1207   }
1208   size_t sum = 0;
1209   for (auto& atom : atoms) sum += *atom;
1210   EXPECT_EQ(c, sum);
1211 }
1212
1213 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1214   EventBase eb;
1215   thread th(&EventBase::loopForever, &eb);
1216   SCOPE_EXIT {
1217     eb.terminateLoopSoon();
1218     th.join();
1219   };
1220   auto mutated = false;
1221   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1222       mutated = true;
1223   });
1224   EXPECT_TRUE(mutated);
1225 }
1226
1227 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1228   EventBase eb;
1229   thread th(&EventBase::loopForever, &eb);
1230   SCOPE_EXIT {
1231     eb.terminateLoopSoon();
1232     th.join();
1233   };
1234   eb.runInEventBaseThreadAndWait([&] {
1235       auto mutated = false;
1236       eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1237           mutated = true;
1238       });
1239       EXPECT_TRUE(mutated);
1240   });
1241 }
1242
1243 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1244   EventBase eb;
1245   auto mutated = false;
1246   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1247       mutated = true;
1248     });
1249   EXPECT_TRUE(mutated);
1250 }
1251
1252 ///////////////////////////////////////////////////////////////////////////
1253 // Tests for runInLoop()
1254 ///////////////////////////////////////////////////////////////////////////
1255
1256 class CountedLoopCallback : public EventBase::LoopCallback {
1257  public:
1258   CountedLoopCallback(EventBase* eventBase,
1259                       unsigned int count,
1260                       std::function<void()> action =
1261                         std::function<void()>())
1262     : eventBase_(eventBase)
1263     , count_(count)
1264     , action_(action) {}
1265
1266   void runLoopCallback() noexcept override {
1267     --count_;
1268     if (count_ > 0) {
1269       eventBase_->runInLoop(this);
1270     } else if (action_) {
1271       action_();
1272     }
1273   }
1274
1275   unsigned int getCount() const {
1276     return count_;
1277   }
1278
1279  private:
1280   EventBase* eventBase_;
1281   unsigned int count_;
1282   std::function<void()> action_;
1283 };
1284
1285 // Test that EventBase::loop() doesn't exit while there are
1286 // still LoopCallbacks remaining to be invoked.
1287 TEST(EventBaseTest, RepeatedRunInLoop) {
1288   EventBase eventBase;
1289
1290   CountedLoopCallback c(&eventBase, 10);
1291   eventBase.runInLoop(&c);
1292   // The callback shouldn't have run immediately
1293   ASSERT_EQ(c.getCount(), 10);
1294   eventBase.loop();
1295
1296   // loop() should loop until the CountedLoopCallback stops
1297   // re-installing itself.
1298   ASSERT_EQ(c.getCount(), 0);
1299 }
1300
1301 // Test that EventBase::loop() works as expected without time measurements.
1302 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
1303   EventBase eventBase(false);
1304
1305   CountedLoopCallback c(&eventBase, 10);
1306   eventBase.runInLoop(&c);
1307   // The callback shouldn't have run immediately
1308   ASSERT_EQ(c.getCount(), 10);
1309   eventBase.loop();
1310
1311   // loop() should loop until the CountedLoopCallback stops
1312   // re-installing itself.
1313   ASSERT_EQ(c.getCount(), 0);
1314 }
1315
1316 // Test runInLoop() calls with terminateLoopSoon()
1317 TEST(EventBaseTest, RunInLoopStopLoop) {
1318   EventBase eventBase;
1319
1320   CountedLoopCallback c1(&eventBase, 20);
1321   CountedLoopCallback c2(&eventBase, 10,
1322                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1323
1324   eventBase.runInLoop(&c1);
1325   eventBase.runInLoop(&c2);
1326   ASSERT_EQ(c1.getCount(), 20);
1327   ASSERT_EQ(c2.getCount(), 10);
1328
1329   eventBase.loopForever();
1330
1331   // c2 should have stopped the loop after 10 iterations
1332   ASSERT_EQ(c2.getCount(), 0);
1333
1334   // We allow the EventBase to run the loop callbacks in whatever order it
1335   // chooses.  We'll accept c1's count being either 10 (if the loop terminated
1336   // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1337   // before c1 ran).
1338   //
1339   // (With the current code, c1 will always run 10 times, but we don't consider
1340   // this a hard API requirement.)
1341   ASSERT_GE(c1.getCount(), 10);
1342   ASSERT_LE(c1.getCount(), 11);
1343 }
1344
1345 TEST(EventBaseTest, TryRunningAfterTerminate) {
1346   EventBase eventBase;
1347   CountedLoopCallback c1(&eventBase, 1,
1348                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1349   eventBase.runInLoop(&c1);
1350   eventBase.loopForever();
1351   bool ran = false;
1352   eventBase.runInEventBaseThread([&]() {
1353     ran = true;
1354   });
1355
1356   ASSERT_FALSE(ran);
1357 }
1358
1359 // Test cancelling runInLoop() callbacks
1360 TEST(EventBaseTest, CancelRunInLoop) {
1361   EventBase eventBase;
1362
1363   CountedLoopCallback c1(&eventBase, 20);
1364   CountedLoopCallback c2(&eventBase, 20);
1365   CountedLoopCallback c3(&eventBase, 20);
1366
1367   std::function<void()> cancelC1Action =
1368     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1369   std::function<void()> cancelC2Action =
1370     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1371
1372   CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1373   CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1374
1375   // Install cancelC1 after c1
1376   eventBase.runInLoop(&c1);
1377   eventBase.runInLoop(&cancelC1);
1378
1379   // Install cancelC2 before c2
1380   eventBase.runInLoop(&cancelC2);
1381   eventBase.runInLoop(&c2);
1382
1383   // Install c3
1384   eventBase.runInLoop(&c3);
1385
1386   ASSERT_EQ(c1.getCount(), 20);
1387   ASSERT_EQ(c2.getCount(), 20);
1388   ASSERT_EQ(c3.getCount(), 20);
1389   ASSERT_EQ(cancelC1.getCount(), 10);
1390   ASSERT_EQ(cancelC2.getCount(), 10);
1391
1392   // Run the loop
1393   eventBase.loop();
1394
1395   // cancelC1 and cancelC3 should have both fired after 10 iterations and
1396   // stopped re-installing themselves
1397   ASSERT_EQ(cancelC1.getCount(), 0);
1398   ASSERT_EQ(cancelC2.getCount(), 0);
1399   // c3 should have continued on for the full 20 iterations
1400   ASSERT_EQ(c3.getCount(), 0);
1401
1402   // c1 and c2 should have both been cancelled on the 10th iteration.
1403   //
1404   // Callbacks are always run in the order they are installed,
1405   // so c1 should have fired 10 times, and been canceled after it ran on the
1406   // 10th iteration.  c2 should have only fired 9 times, because cancelC2 will
1407   // have run before it on the 10th iteration, and cancelled it before it
1408   // fired.
1409   ASSERT_EQ(c1.getCount(), 10);
1410   ASSERT_EQ(c2.getCount(), 11);
1411 }
1412
1413 class TerminateTestCallback : public EventBase::LoopCallback,
1414                               public EventHandler {
1415  public:
1416   TerminateTestCallback(EventBase* eventBase, int fd)
1417     : EventHandler(eventBase, fd),
1418       eventBase_(eventBase),
1419       loopInvocations_(0),
1420       maxLoopInvocations_(0),
1421       eventInvocations_(0),
1422       maxEventInvocations_(0) {}
1423
1424   void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1425     loopInvocations_ = 0;
1426     maxLoopInvocations_ = maxLoopInvocations;
1427     eventInvocations_ = 0;
1428     maxEventInvocations_ = maxEventInvocations;
1429
1430     cancelLoopCallback();
1431     unregisterHandler();
1432   }
1433
1434   void handlerReady(uint16_t /* events */) noexcept override {
1435     // We didn't register with PERSIST, so we will have been automatically
1436     // unregistered already.
1437     ASSERT_FALSE(isHandlerRegistered());
1438
1439     ++eventInvocations_;
1440     if (eventInvocations_ >= maxEventInvocations_) {
1441       return;
1442     }
1443
1444     eventBase_->runInLoop(this);
1445   }
1446   void runLoopCallback() noexcept override {
1447     ++loopInvocations_;
1448     if (loopInvocations_ >= maxLoopInvocations_) {
1449       return;
1450     }
1451
1452     registerHandler(READ);
1453   }
1454
1455   uint32_t getLoopInvocations() const {
1456     return loopInvocations_;
1457   }
1458   uint32_t getEventInvocations() const {
1459     return eventInvocations_;
1460   }
1461
1462  private:
1463   EventBase* eventBase_;
1464   uint32_t loopInvocations_;
1465   uint32_t maxLoopInvocations_;
1466   uint32_t eventInvocations_;
1467   uint32_t maxEventInvocations_;
1468 };
1469
1470 /**
1471  * Test that EventBase::loop() correctly detects when there are no more events
1472  * left to run.
1473  *
1474  * This uses a single callback, which alternates registering itself as a loop
1475  * callback versus a EventHandler callback.  This exercises a regression where
1476  * EventBase::loop() incorrectly exited if there were no more fd handlers
1477  * registered, but a loop callback installed a new fd handler.
1478  */
1479 TEST(EventBaseTest, LoopTermination) {
1480   EventBase eventBase;
1481
1482   // Open a pipe and close the write end,
1483   // so the read endpoint will be readable
1484   int pipeFds[2];
1485   int rc = pipe(pipeFds);
1486   ASSERT_EQ(rc, 0);
1487   close(pipeFds[1]);
1488   TerminateTestCallback callback(&eventBase, pipeFds[0]);
1489
1490   // Test once where the callback will exit after a loop callback
1491   callback.reset(10, 100);
1492   eventBase.runInLoop(&callback);
1493   eventBase.loop();
1494   ASSERT_EQ(callback.getLoopInvocations(), 10);
1495   ASSERT_EQ(callback.getEventInvocations(), 9);
1496
1497   // Test once where the callback will exit after an fd event callback
1498   callback.reset(100, 7);
1499   eventBase.runInLoop(&callback);
1500   eventBase.loop();
1501   ASSERT_EQ(callback.getLoopInvocations(), 7);
1502   ASSERT_EQ(callback.getEventInvocations(), 7);
1503
1504   close(pipeFds[0]);
1505 }
1506
1507 ///////////////////////////////////////////////////////////////////////////
1508 // Tests for latency calculations
1509 ///////////////////////////////////////////////////////////////////////////
1510
1511 class IdleTimeTimeoutSeries : public AsyncTimeout {
1512
1513  public:
1514
1515   explicit IdleTimeTimeoutSeries(EventBase *base,
1516                                  std::deque<std::uint64_t>& timeout) :
1517     AsyncTimeout(base),
1518     timeouts_(0),
1519     timeout_(timeout) {
1520       scheduleTimeout(1);
1521     }
1522
1523     ~IdleTimeTimeoutSeries() override {}
1524
1525     void timeoutExpired() noexcept override {
1526     ++timeouts_;
1527
1528     if(timeout_.empty()){
1529       cancelTimeout();
1530     } else {
1531       uint64_t sleepTime = timeout_.front();
1532       timeout_.pop_front();
1533       if (sleepTime) {
1534         usleep(sleepTime);
1535       }
1536       scheduleTimeout(1);
1537     }
1538   }
1539
1540   int getTimeouts() const {
1541     return timeouts_;
1542   }
1543
1544  private:
1545   int timeouts_;
1546   std::deque<uint64_t>& timeout_;
1547 };
1548
1549 /**
1550  * Verify that idle time is correctly accounted for when decaying our loop
1551  * time.
1552  *
1553  * This works by creating a high loop time (via usleep), expecting a latency
1554  * callback with known value, and then scheduling a timeout for later. This
1555  * later timeout is far enough in the future that the idle time should have
1556  * caused the loop time to decay.
1557  */
1558 TEST(EventBaseTest, IdleTime) {
1559   EventBase eventBase;
1560   eventBase.setLoadAvgMsec(1000);
1561   eventBase.resetLoadAvg(5900.0);
1562   std::deque<uint64_t> timeouts0(4, 8080);
1563   timeouts0.push_front(8000);
1564   timeouts0.push_back(14000);
1565   IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1566   std::deque<uint64_t> timeouts(20, 20);
1567   std::unique_ptr<IdleTimeTimeoutSeries> tos;
1568   int64_t testStart = duration_cast<microseconds>(
1569     std::chrono::steady_clock::now().time_since_epoch()).count();
1570   bool hostOverloaded = false;
1571
1572   int latencyCallbacks = 0;
1573   eventBase.setMaxLatency(6000, [&]() {
1574     ++latencyCallbacks;
1575
1576     switch (latencyCallbacks) {
1577     case 1:
1578       if (tos0.getTimeouts() < 6) {
1579         // This could only happen if the host this test is running
1580         // on is heavily loaded.
1581         int64_t maxLatencyReached = duration_cast<microseconds>(
1582             std::chrono::steady_clock::now().time_since_epoch()).count();
1583         ASSERT_LE(43800, maxLatencyReached - testStart);
1584         hostOverloaded = true;
1585         break;
1586       }
1587       ASSERT_EQ(6, tos0.getTimeouts());
1588       ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1589       ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1590       tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1591       break;
1592
1593     default:
1594       FAIL() << "Unexpected latency callback";
1595       break;
1596     }
1597   });
1598
1599   // Kick things off with an "immedite" timeout
1600   tos0.scheduleTimeout(1);
1601
1602   eventBase.loop();
1603
1604   if (hostOverloaded) {
1605     return;
1606   }
1607
1608   ASSERT_EQ(1, latencyCallbacks);
1609   ASSERT_EQ(7, tos0.getTimeouts());
1610   ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1611   ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1612   ASSERT_TRUE(!!tos);
1613   ASSERT_EQ(21, tos->getTimeouts());
1614 }
1615
1616 /**
1617  * Test that thisLoop functionality works with terminateLoopSoon
1618  */
1619 TEST(EventBaseTest, ThisLoop) {
1620   EventBase eb;
1621   bool runInLoop = false;
1622   bool runThisLoop = false;
1623
1624   eb.runInLoop([&](){
1625       eb.terminateLoopSoon();
1626       eb.runInLoop([&]() {
1627           runInLoop = true;
1628         });
1629       eb.runInLoop([&]() {
1630           runThisLoop = true;
1631         }, true);
1632     }, true);
1633   eb.loopForever();
1634
1635   // Should not work
1636   ASSERT_FALSE(runInLoop);
1637   // Should work with thisLoop
1638   ASSERT_TRUE(runThisLoop);
1639 }
1640
1641 TEST(EventBaseTest, EventBaseThreadLoop) {
1642   EventBase base;
1643   bool ran = false;
1644
1645   base.runInEventBaseThread([&](){
1646     ran = true;
1647   });
1648   base.loop();
1649
1650   ASSERT_EQ(true, ran);
1651 }
1652
1653 TEST(EventBaseTest, EventBaseThreadName) {
1654   EventBase base;
1655   base.setName("foo");
1656   base.loop();
1657
1658 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1659   char name[16];
1660   pthread_getname_np(pthread_self(), name, 16);
1661   ASSERT_EQ(0, strcmp("foo", name));
1662 #endif
1663 }
1664
1665 TEST(EventBaseTest, RunBeforeLoop) {
1666   EventBase base;
1667   CountedLoopCallback cb(&base, 1, [&](){
1668     base.terminateLoopSoon();
1669   });
1670   base.runBeforeLoop(&cb);
1671   base.loopForever();
1672   ASSERT_EQ(cb.getCount(), 0);
1673 }
1674
1675 TEST(EventBaseTest, RunBeforeLoopWait) {
1676   EventBase base;
1677   CountedLoopCallback cb(&base, 1);
1678   base.tryRunAfterDelay([&](){
1679       base.terminateLoopSoon();
1680     }, 500);
1681   base.runBeforeLoop(&cb);
1682   base.loopForever();
1683
1684   // Check that we only ran once, and did not loop multiple times.
1685   ASSERT_EQ(cb.getCount(), 0);
1686 }
1687
1688 class PipeHandler : public EventHandler {
1689 public:
1690   PipeHandler(EventBase* eventBase, int fd)
1691     : EventHandler(eventBase, fd) {}
1692
1693   void handlerReady(uint16_t /* events */) noexcept override { abort(); }
1694 };
1695
1696 TEST(EventBaseTest, StopBeforeLoop) {
1697   EventBase evb;
1698
1699   // Give the evb something to do.
1700   int p[2];
1701   ASSERT_EQ(0, pipe(p));
1702   PipeHandler handler(&evb, p[0]);
1703   handler.registerHandler(EventHandler::READ);
1704
1705   // It's definitely not running yet
1706   evb.terminateLoopSoon();
1707
1708   // let it run, it should exit quickly.
1709   std::thread t([&] { evb.loop(); });
1710   t.join();
1711
1712   handler.unregisterHandler();
1713   close(p[0]);
1714   close(p[1]);
1715
1716   SUCCEED();
1717 }
1718
1719 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1720   bool ran = false;
1721
1722   {
1723     EventBase base;
1724     base.runInEventBaseThread([&](){
1725       ran = true;
1726     });
1727   }
1728
1729   ASSERT_TRUE(ran);
1730 }
1731
1732 TEST(EventBaseTest, LoopKeepAlive) {
1733   EventBase evb;
1734
1735   bool done = false;
1736   std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ] {
1737     /* sleep override */ std::this_thread::sleep_for(
1738         std::chrono::milliseconds(100));
1739     evb.runInEventBaseThread([&] { done = true; });
1740   });
1741
1742   evb.loop();
1743
1744   ASSERT_TRUE(done);
1745
1746   t.join();
1747 }
1748
1749 TEST(EventBaseTest, LoopKeepAliveInLoop) {
1750   EventBase evb;
1751
1752   bool done = false;
1753   std::thread t;
1754
1755   evb.runInEventBaseThread([&] {
1756     t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ] {
1757       /* sleep override */ std::this_thread::sleep_for(
1758           std::chrono::milliseconds(100));
1759       evb.runInEventBaseThread([&] { done = true; });
1760     });
1761   });
1762
1763   evb.loop();
1764
1765   ASSERT_TRUE(done);
1766
1767   t.join();
1768 }