Mark a few constants as constexpr so VLAs aren't required
[folly.git] / folly / io / async / test / EventBaseTest.cpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 #include <folly/Memory.h>
20
21 #include <folly/io/async/AsyncTimeout.h>
22 #include <folly/io/async/EventBase.h>
23 #include <folly/io/async/EventHandler.h>
24 #include <folly/io/async/test/SocketPair.h>
25 #include <folly/io/async/test/Util.h>
26 #include <folly/portability/Unistd.h>
27
28 #include <atomic>
29 #include <iostream>
30 #include <memory>
31 #include <thread>
32
33 using std::atomic;
34 using std::deque;
35 using std::pair;
36 using std::vector;
37 using std::unique_ptr;
38 using std::thread;
39 using std::make_pair;
40 using std::cerr;
41 using std::endl;
42 using std::chrono::milliseconds;
43 using std::chrono::microseconds;
44 using std::chrono::duration_cast;
45
46 using namespace folly;
47
48 ///////////////////////////////////////////////////////////////////////////
49 // Tests for read and write events
50 ///////////////////////////////////////////////////////////////////////////
51
52 enum { BUF_SIZE = 4096 };
53
54 ssize_t writeToFD(int fd, size_t length) {
55   // write an arbitrary amount of data to the fd
56   auto bufv = vector<char>(length);
57   auto buf = bufv.data();
58   memset(buf, 'a', length);
59   ssize_t rc = write(fd, buf, length);
60   CHECK_EQ(rc, length);
61   return rc;
62 }
63
64 size_t writeUntilFull(int fd) {
65   // Write to the fd until EAGAIN is returned
66   size_t bytesWritten = 0;
67   char buf[BUF_SIZE];
68   memset(buf, 'a', sizeof(buf));
69   while (true) {
70     ssize_t rc = write(fd, buf, sizeof(buf));
71     if (rc < 0) {
72       CHECK_EQ(errno, EAGAIN);
73       break;
74     } else {
75       bytesWritten += rc;
76     }
77   }
78   return bytesWritten;
79 }
80
81 ssize_t readFromFD(int fd, size_t length) {
82   // write an arbitrary amount of data to the fd
83   auto buf = vector<char>(length);
84   return read(fd, buf.data(), length);
85 }
86
87 size_t readUntilEmpty(int fd) {
88   // Read from the fd until EAGAIN is returned
89   char buf[BUF_SIZE];
90   size_t bytesRead = 0;
91   while (true) {
92     int rc = read(fd, buf, sizeof(buf));
93     if (rc == 0) {
94       CHECK(false) << "unexpected EOF";
95     } else if (rc < 0) {
96       CHECK_EQ(errno, EAGAIN);
97       break;
98     } else {
99       bytesRead += rc;
100     }
101   }
102   return bytesRead;
103 }
104
105 void checkReadUntilEmpty(int fd, size_t expectedLength) {
106   ASSERT_EQ(readUntilEmpty(fd), expectedLength);
107 }
108
109 struct ScheduledEvent {
110   int milliseconds;
111   uint16_t events;
112   size_t length;
113   ssize_t result;
114
115   void perform(int fd) {
116     if (events & EventHandler::READ) {
117       if (length == 0) {
118         result = readUntilEmpty(fd);
119       } else {
120         result = readFromFD(fd, length);
121       }
122     }
123     if (events & EventHandler::WRITE) {
124       if (length == 0) {
125         result = writeUntilFull(fd);
126       } else {
127         result = writeToFD(fd, length);
128       }
129     }
130   }
131 };
132
133 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
134   for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
135     eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
136                              ev->milliseconds);
137   }
138 }
139
140 class TestHandler : public EventHandler {
141  public:
142   TestHandler(EventBase* eventBase, int fd)
143     : EventHandler(eventBase, fd), fd_(fd) {}
144
145   void handlerReady(uint16_t events) noexcept override {
146     ssize_t bytesRead = 0;
147     ssize_t bytesWritten = 0;
148     if (events & READ) {
149       // Read all available data, so EventBase will stop calling us
150       // until new data becomes available
151       bytesRead = readUntilEmpty(fd_);
152     }
153     if (events & WRITE) {
154       // Write until the pipe buffer is full, so EventBase will stop calling
155       // us until the other end has read some data
156       bytesWritten = writeUntilFull(fd_);
157     }
158
159     log.emplace_back(events, bytesRead, bytesWritten);
160   }
161
162   struct EventRecord {
163     EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
164       : events(events)
165       , timestamp()
166       , bytesRead(bytesRead)
167       , bytesWritten(bytesWritten) {}
168
169     uint16_t events;
170     TimePoint timestamp;
171     ssize_t bytesRead;
172     ssize_t bytesWritten;
173   };
174
175   deque<EventRecord> log;
176
177  private:
178   int fd_;
179 };
180
181 /**
182  * Test a READ event
183  */
184 TEST(EventBaseTest, ReadEvent) {
185   EventBase eb;
186   SocketPair sp;
187
188   // Register for read events
189   TestHandler handler(&eb, sp[0]);
190   handler.registerHandler(EventHandler::READ);
191
192   // Register timeouts to perform two write events
193   ScheduledEvent events[] = {
194     { 10, EventHandler::WRITE, 2345, 0 },
195     { 160, EventHandler::WRITE, 99, 0 },
196     { 0, 0, 0, 0 },
197   };
198   scheduleEvents(&eb, sp[1], events);
199
200   // Loop
201   TimePoint start;
202   eb.loop();
203   TimePoint end;
204
205   // Since we didn't use the EventHandler::PERSIST flag, the handler should
206   // have received the first read, then unregistered itself.  Check that only
207   // the first chunk of data was received.
208   ASSERT_EQ(handler.log.size(), 1);
209   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
210   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
211                   milliseconds(events[0].milliseconds), milliseconds(90));
212   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
213   ASSERT_EQ(handler.log[0].bytesWritten, 0);
214   T_CHECK_TIMEOUT(start, end,
215                   milliseconds(events[1].milliseconds), milliseconds(30));
216
217   // Make sure the second chunk of data is still waiting to be read.
218   size_t bytesRemaining = readUntilEmpty(sp[0]);
219   ASSERT_EQ(bytesRemaining, events[1].length);
220 }
221
222 /**
223  * Test (READ | PERSIST)
224  */
225 TEST(EventBaseTest, ReadPersist) {
226   EventBase eb;
227   SocketPair sp;
228
229   // Register for read events
230   TestHandler handler(&eb, sp[0]);
231   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
232
233   // Register several timeouts to perform writes
234   ScheduledEvent events[] = {
235     { 10,  EventHandler::WRITE, 1024, 0 },
236     { 20,  EventHandler::WRITE, 2211, 0 },
237     { 30,  EventHandler::WRITE, 4096, 0 },
238     { 100, EventHandler::WRITE, 100,  0 },
239     { 0, 0, 0, 0 },
240   };
241   scheduleEvents(&eb, sp[1], events);
242
243   // Schedule a timeout to unregister the handler after the third write
244   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
245
246   // Loop
247   TimePoint start;
248   eb.loop();
249   TimePoint end;
250
251   // The handler should have received the first 3 events,
252   // then been unregistered after that.
253   ASSERT_EQ(handler.log.size(), 3);
254   for (int n = 0; n < 3; ++n) {
255     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
256     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
257                     milliseconds(events[n].milliseconds));
258     ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
259     ASSERT_EQ(handler.log[n].bytesWritten, 0);
260   }
261   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
262
263   // Make sure the data from the last write is still waiting to be read
264   size_t bytesRemaining = readUntilEmpty(sp[0]);
265   ASSERT_EQ(bytesRemaining, events[3].length);
266 }
267
268 /**
269  * Test registering for READ when the socket is immediately readable
270  */
271 TEST(EventBaseTest, ReadImmediate) {
272   EventBase eb;
273   SocketPair sp;
274
275   // Write some data to the socket so the other end will
276   // be immediately readable
277   size_t dataLength = 1234;
278   writeToFD(sp[1], dataLength);
279
280   // Register for read events
281   TestHandler handler(&eb, sp[0]);
282   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
283
284   // Register a timeout to perform another write
285   ScheduledEvent events[] = {
286     { 10, EventHandler::WRITE, 2345, 0 },
287     { 0, 0, 0, 0 },
288   };
289   scheduleEvents(&eb, sp[1], events);
290
291   // Schedule a timeout to unregister the handler
292   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
293
294   // Loop
295   TimePoint start;
296   eb.loop();
297   TimePoint end;
298
299   ASSERT_EQ(handler.log.size(), 2);
300
301   // There should have been 1 event for immediate readability
302   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
303   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
304   ASSERT_EQ(handler.log[0].bytesRead, dataLength);
305   ASSERT_EQ(handler.log[0].bytesWritten, 0);
306
307   // There should be another event after the timeout wrote more data
308   ASSERT_EQ(handler.log[1].events, EventHandler::READ);
309   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
310                   milliseconds(events[0].milliseconds));
311   ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
312   ASSERT_EQ(handler.log[1].bytesWritten, 0);
313
314   T_CHECK_TIMEOUT(start, end, milliseconds(20));
315 }
316
317 /**
318  * Test a WRITE event
319  */
320 TEST(EventBaseTest, WriteEvent) {
321   EventBase eb;
322   SocketPair sp;
323
324   // Fill up the write buffer before starting
325   size_t initialBytesWritten = writeUntilFull(sp[0]);
326
327   // Register for write events
328   TestHandler handler(&eb, sp[0]);
329   handler.registerHandler(EventHandler::WRITE);
330
331   // Register timeouts to perform two reads
332   ScheduledEvent events[] = {
333     { 10, EventHandler::READ, 0, 0 },
334     { 60, EventHandler::READ, 0, 0 },
335     { 0, 0, 0, 0 },
336   };
337   scheduleEvents(&eb, sp[1], events);
338
339   // Loop
340   TimePoint start;
341   eb.loop();
342   TimePoint end;
343
344   // Since we didn't use the EventHandler::PERSIST flag, the handler should
345   // have only been able to write once, then unregistered itself.
346   ASSERT_EQ(handler.log.size(), 1);
347   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
348   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
349                   milliseconds(events[0].milliseconds));
350   ASSERT_EQ(handler.log[0].bytesRead, 0);
351   ASSERT_GT(handler.log[0].bytesWritten, 0);
352   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
353
354   ASSERT_EQ(events[0].result, initialBytesWritten);
355   ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
356 }
357
358 /**
359  * Test (WRITE | PERSIST)
360  */
361 TEST(EventBaseTest, WritePersist) {
362   EventBase eb;
363   SocketPair sp;
364
365   // Fill up the write buffer before starting
366   size_t initialBytesWritten = writeUntilFull(sp[0]);
367
368   // Register for write events
369   TestHandler handler(&eb, sp[0]);
370   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
371
372   // Register several timeouts to read from the socket at several intervals
373   ScheduledEvent events[] = {
374     { 10,  EventHandler::READ, 0, 0 },
375     { 40,  EventHandler::READ, 0, 0 },
376     { 70,  EventHandler::READ, 0, 0 },
377     { 100, EventHandler::READ, 0, 0 },
378     { 0, 0, 0, 0 },
379   };
380   scheduleEvents(&eb, sp[1], events);
381
382   // Schedule a timeout to unregister the handler after the third read
383   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
384
385   // Loop
386   TimePoint start;
387   eb.loop();
388   TimePoint end;
389
390   // The handler should have received the first 3 events,
391   // then been unregistered after that.
392   ASSERT_EQ(handler.log.size(), 3);
393   ASSERT_EQ(events[0].result, initialBytesWritten);
394   for (int n = 0; n < 3; ++n) {
395     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
396     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
397                     milliseconds(events[n].milliseconds));
398     ASSERT_EQ(handler.log[n].bytesRead, 0);
399     ASSERT_GT(handler.log[n].bytesWritten, 0);
400     ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
401   }
402   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
403 }
404
405 /**
406  * Test registering for WRITE when the socket is immediately writable
407  */
408 TEST(EventBaseTest, WriteImmediate) {
409   EventBase eb;
410   SocketPair sp;
411
412   // Register for write events
413   TestHandler handler(&eb, sp[0]);
414   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
415
416   // Register a timeout to perform a read
417   ScheduledEvent events[] = {
418     { 10, EventHandler::READ, 0, 0 },
419     { 0, 0, 0, 0 },
420   };
421   scheduleEvents(&eb, sp[1], events);
422
423   // Schedule a timeout to unregister the handler
424   int64_t unregisterTimeout = 40;
425   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
426                    unregisterTimeout);
427
428   // Loop
429   TimePoint start;
430   eb.loop();
431   TimePoint end;
432
433   ASSERT_EQ(handler.log.size(), 2);
434
435   // Since the socket buffer was initially empty,
436   // there should have been 1 event for immediate writability
437   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
438   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
439   ASSERT_EQ(handler.log[0].bytesRead, 0);
440   ASSERT_GT(handler.log[0].bytesWritten, 0);
441
442   // There should be another event after the timeout wrote more data
443   ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
444   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
445                   milliseconds(events[0].milliseconds));
446   ASSERT_EQ(handler.log[1].bytesRead, 0);
447   ASSERT_GT(handler.log[1].bytesWritten, 0);
448
449   T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
450 }
451
452 /**
453  * Test (READ | WRITE) when the socket becomes readable first
454  */
455 TEST(EventBaseTest, ReadWrite) {
456   EventBase eb;
457   SocketPair sp;
458
459   // Fill up the write buffer before starting
460   size_t sock0WriteLength = writeUntilFull(sp[0]);
461
462   // Register for read and write events
463   TestHandler handler(&eb, sp[0]);
464   handler.registerHandler(EventHandler::READ_WRITE);
465
466   // Register timeouts to perform a write then a read.
467   ScheduledEvent events[] = {
468     { 10, EventHandler::WRITE, 2345, 0 },
469     { 40, EventHandler::READ, 0, 0 },
470     { 0, 0, 0, 0 },
471   };
472   scheduleEvents(&eb, sp[1], events);
473
474   // Loop
475   TimePoint start;
476   eb.loop();
477   TimePoint end;
478
479   // Since we didn't use the EventHandler::PERSIST flag, the handler should
480   // have only noticed readability, then unregistered itself.  Check that only
481   // one event was logged.
482   ASSERT_EQ(handler.log.size(), 1);
483   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
484   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
485                   milliseconds(events[0].milliseconds));
486   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
487   ASSERT_EQ(handler.log[0].bytesWritten, 0);
488   ASSERT_EQ(events[1].result, sock0WriteLength);
489   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
490 }
491
492 /**
493  * Test (READ | WRITE) when the socket becomes writable first
494  */
495 TEST(EventBaseTest, WriteRead) {
496   EventBase eb;
497   SocketPair sp;
498
499   // Fill up the write buffer before starting
500   size_t sock0WriteLength = writeUntilFull(sp[0]);
501
502   // Register for read and write events
503   TestHandler handler(&eb, sp[0]);
504   handler.registerHandler(EventHandler::READ_WRITE);
505
506   // Register timeouts to perform a read then a write.
507   size_t sock1WriteLength = 2345;
508   ScheduledEvent events[] = {
509     { 10, EventHandler::READ, 0, 0 },
510     { 40, EventHandler::WRITE, sock1WriteLength, 0 },
511     { 0, 0, 0, 0 },
512   };
513   scheduleEvents(&eb, sp[1], events);
514
515   // Loop
516   TimePoint start;
517   eb.loop();
518   TimePoint end;
519
520   // Since we didn't use the EventHandler::PERSIST flag, the handler should
521   // have only noticed writability, then unregistered itself.  Check that only
522   // one event was logged.
523   ASSERT_EQ(handler.log.size(), 1);
524   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
525   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
526                   milliseconds(events[0].milliseconds));
527   ASSERT_EQ(handler.log[0].bytesRead, 0);
528   ASSERT_GT(handler.log[0].bytesWritten, 0);
529   ASSERT_EQ(events[0].result, sock0WriteLength);
530   ASSERT_EQ(events[1].result, sock1WriteLength);
531   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
532
533   // Make sure the written data is still waiting to be read.
534   size_t bytesRemaining = readUntilEmpty(sp[0]);
535   ASSERT_EQ(bytesRemaining, events[1].length);
536 }
537
538 /**
539  * Test (READ | WRITE) when the socket becomes readable and writable
540  * at the same time.
541  */
542 TEST(EventBaseTest, ReadWriteSimultaneous) {
543   EventBase eb;
544   SocketPair sp;
545
546   // Fill up the write buffer before starting
547   size_t sock0WriteLength = writeUntilFull(sp[0]);
548
549   // Register for read and write events
550   TestHandler handler(&eb, sp[0]);
551   handler.registerHandler(EventHandler::READ_WRITE);
552
553   // Register a timeout to perform a read and write together
554   ScheduledEvent events[] = {
555     { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
556     { 0, 0, 0, 0 },
557   };
558   scheduleEvents(&eb, sp[1], events);
559
560   // Loop
561   TimePoint start;
562   eb.loop();
563   TimePoint end;
564
565   // It's not strictly required that the EventBase register us about both
566   // events in the same call.  So, it's possible that if the EventBase
567   // implementation changes this test could start failing, and it wouldn't be
568   // considered breaking the API.  However for now it's nice to exercise this
569   // code path.
570   ASSERT_EQ(handler.log.size(), 1);
571   ASSERT_EQ(handler.log[0].events,
572                     EventHandler::READ | EventHandler::WRITE);
573   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
574                   milliseconds(events[0].milliseconds));
575   ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
576   ASSERT_GT(handler.log[0].bytesWritten, 0);
577   T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
578 }
579
580 /**
581  * Test (READ | WRITE | PERSIST)
582  */
583 TEST(EventBaseTest, ReadWritePersist) {
584   EventBase eb;
585   SocketPair sp;
586
587   // Register for read and write events
588   TestHandler handler(&eb, sp[0]);
589   handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
590                           EventHandler::PERSIST);
591
592   // Register timeouts to perform several reads and writes
593   ScheduledEvent events[] = {
594     { 10, EventHandler::WRITE, 2345, 0 },
595     { 20, EventHandler::READ, 0, 0 },
596     { 35, EventHandler::WRITE, 200, 0 },
597     { 45, EventHandler::WRITE, 15, 0 },
598     { 55, EventHandler::READ, 0, 0 },
599     { 120, EventHandler::WRITE, 2345, 0 },
600     { 0, 0, 0, 0 },
601   };
602   scheduleEvents(&eb, sp[1], events);
603
604   // Schedule a timeout to unregister the handler
605   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
606
607   // Loop
608   TimePoint start;
609   eb.loop();
610   TimePoint end;
611
612   ASSERT_EQ(handler.log.size(), 6);
613
614   // Since we didn't fill up the write buffer immediately, there should
615   // be an immediate event for writability.
616   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
617   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
618   ASSERT_EQ(handler.log[0].bytesRead, 0);
619   ASSERT_GT(handler.log[0].bytesWritten, 0);
620
621   // Events 1 through 5 should correspond to the scheduled events
622   for (int n = 1; n < 6; ++n) {
623     ScheduledEvent* event = &events[n - 1];
624     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
625                     milliseconds(event->milliseconds));
626     if (event->events == EventHandler::READ) {
627       ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
628       ASSERT_EQ(handler.log[n].bytesRead, 0);
629       ASSERT_GT(handler.log[n].bytesWritten, 0);
630     } else {
631       ASSERT_EQ(handler.log[n].events, EventHandler::READ);
632       ASSERT_EQ(handler.log[n].bytesRead, event->length);
633       ASSERT_EQ(handler.log[n].bytesWritten, 0);
634     }
635   }
636
637   // The timeout should have unregistered the handler before the last write.
638   // Make sure that data is still waiting to be read
639   size_t bytesRemaining = readUntilEmpty(sp[0]);
640   ASSERT_EQ(bytesRemaining, events[5].length);
641 }
642
643
644 class PartialReadHandler : public TestHandler {
645  public:
646   PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
647     : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
648
649   void handlerReady(uint16_t events) noexcept override {
650     assert(events == EventHandler::READ);
651     ssize_t bytesRead = readFromFD(fd_, readLength_);
652     log.emplace_back(events, bytesRead, 0);
653   }
654
655  private:
656   int fd_;
657   size_t readLength_;
658 };
659
660 /**
661  * Test reading only part of the available data when a read event is fired.
662  * When PERSIST is used, make sure the handler gets notified again the next
663  * time around the loop.
664  */
665 TEST(EventBaseTest, ReadPartial) {
666   EventBase eb;
667   SocketPair sp;
668
669   // Register for read events
670   size_t readLength = 100;
671   PartialReadHandler handler(&eb, sp[0], readLength);
672   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
673
674   // Register a timeout to perform a single write,
675   // with more data than PartialReadHandler will read at once
676   ScheduledEvent events[] = {
677     { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
678     { 0, 0, 0, 0 },
679   };
680   scheduleEvents(&eb, sp[1], events);
681
682   // Schedule a timeout to unregister the handler
683   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
684
685   // Loop
686   TimePoint start;
687   eb.loop();
688   TimePoint end;
689
690   ASSERT_EQ(handler.log.size(), 4);
691
692   // The first 3 invocations should read readLength bytes each
693   for (int n = 0; n < 3; ++n) {
694     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
695     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
696                     milliseconds(events[0].milliseconds));
697     ASSERT_EQ(handler.log[n].bytesRead, readLength);
698     ASSERT_EQ(handler.log[n].bytesWritten, 0);
699   }
700   // The last read only has readLength/2 bytes
701   ASSERT_EQ(handler.log[3].events, EventHandler::READ);
702   T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
703                   milliseconds(events[0].milliseconds));
704   ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
705   ASSERT_EQ(handler.log[3].bytesWritten, 0);
706 }
707
708
709 class PartialWriteHandler : public TestHandler {
710  public:
711   PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
712     : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
713
714   void handlerReady(uint16_t events) noexcept override {
715     assert(events == EventHandler::WRITE);
716     ssize_t bytesWritten = writeToFD(fd_, writeLength_);
717     log.emplace_back(events, 0, bytesWritten);
718   }
719
720  private:
721   int fd_;
722   size_t writeLength_;
723 };
724
725 /**
726  * Test writing without completely filling up the write buffer when the fd
727  * becomes writable.  When PERSIST is used, make sure the handler gets
728  * notified again the next time around the loop.
729  */
730 TEST(EventBaseTest, WritePartial) {
731   EventBase eb;
732   SocketPair sp;
733
734   // Fill up the write buffer before starting
735   size_t initialBytesWritten = writeUntilFull(sp[0]);
736
737   // Register for write events
738   size_t writeLength = 100;
739   PartialWriteHandler handler(&eb, sp[0], writeLength);
740   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
741
742   // Register a timeout to read, so that more data can be written
743   ScheduledEvent events[] = {
744     { 10, EventHandler::READ, 0, 0 },
745     { 0, 0, 0, 0 },
746   };
747   scheduleEvents(&eb, sp[1], events);
748
749   // Schedule a timeout to unregister the handler
750   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
751
752   // Loop
753   TimePoint start;
754   eb.loop();
755   TimePoint end;
756
757   // Depending on how big the socket buffer is, there will be multiple writes
758   // Only check the first 5
759   int numChecked = 5;
760   ASSERT_GE(handler.log.size(), numChecked);
761   ASSERT_EQ(events[0].result, initialBytesWritten);
762
763   // The first 3 invocations should read writeLength bytes each
764   for (int n = 0; n < numChecked; ++n) {
765     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
766     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
767                     milliseconds(events[0].milliseconds));
768     ASSERT_EQ(handler.log[n].bytesRead, 0);
769     ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
770   }
771 }
772
773
774 /**
775  * Test destroying a registered EventHandler
776  */
777 TEST(EventBaseTest, DestroyHandler) {
778   class DestroyHandler : public AsyncTimeout {
779    public:
780     DestroyHandler(EventBase* eb, EventHandler* h)
781       : AsyncTimeout(eb)
782       , handler_(h) {}
783
784     void timeoutExpired() noexcept override { delete handler_; }
785
786    private:
787     EventHandler* handler_;
788   };
789
790   EventBase eb;
791   SocketPair sp;
792
793   // Fill up the write buffer before starting
794   size_t initialBytesWritten = writeUntilFull(sp[0]);
795
796   // Register for write events
797   TestHandler* handler = new TestHandler(&eb, sp[0]);
798   handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
799
800   // After 10ms, read some data, so that the handler
801   // will be notified that it can write.
802   eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
803                    10);
804
805   // Start a timer to destroy the handler after 25ms
806   // This mainly just makes sure the code doesn't break or assert
807   DestroyHandler dh(&eb, handler);
808   dh.scheduleTimeout(25);
809
810   TimePoint start;
811   eb.loop();
812   TimePoint end;
813
814   // Make sure the EventHandler was uninstalled properly when it was
815   // destroyed, and the EventBase loop exited
816   T_CHECK_TIMEOUT(start, end, milliseconds(25));
817
818   // Make sure that the handler wrote data to the socket
819   // before it was destroyed
820   size_t bytesRemaining = readUntilEmpty(sp[1]);
821   ASSERT_GT(bytesRemaining, 0);
822 }
823
824
825 ///////////////////////////////////////////////////////////////////////////
826 // Tests for timeout events
827 ///////////////////////////////////////////////////////////////////////////
828
829 TEST(EventBaseTest, RunAfterDelay) {
830   EventBase eb;
831
832   TimePoint timestamp1(false);
833   TimePoint timestamp2(false);
834   TimePoint timestamp3(false);
835   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
836   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
837   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
838
839   TimePoint start;
840   eb.loop();
841   TimePoint end;
842
843   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
844   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
845   T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
846   T_CHECK_TIMEOUT(start, end, milliseconds(40));
847 }
848
849 /**
850  * Test the behavior of tryRunAfterDelay() when some timeouts are
851  * still scheduled when the EventBase is destroyed.
852  */
853 TEST(EventBaseTest, RunAfterDelayDestruction) {
854   TimePoint timestamp1(false);
855   TimePoint timestamp2(false);
856   TimePoint timestamp3(false);
857   TimePoint timestamp4(false);
858   TimePoint start(false);
859   TimePoint end(false);
860
861   {
862     EventBase eb;
863
864     // Run two normal timeouts
865     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
866     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
867
868     // Schedule a timeout to stop the event loop after 40ms
869     eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
870
871     // Schedule 2 timeouts that would fire after the event loop stops
872     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
873     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
874
875     start.reset();
876     eb.loop();
877     end.reset();
878   }
879
880   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
881   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
882   T_CHECK_TIMEOUT(start, end, milliseconds(40));
883
884   ASSERT_TRUE(timestamp3.isUnset());
885   ASSERT_TRUE(timestamp4.isUnset());
886
887   // Ideally this test should be run under valgrind to ensure that no
888   // memory is leaked.
889 }
890
891 class TestTimeout : public AsyncTimeout {
892  public:
893   explicit TestTimeout(EventBase* eventBase)
894     : AsyncTimeout(eventBase)
895     , timestamp(false) {}
896
897   void timeoutExpired() noexcept override { timestamp.reset(); }
898
899   TimePoint timestamp;
900 };
901
902 TEST(EventBaseTest, BasicTimeouts) {
903   EventBase eb;
904
905   TestTimeout t1(&eb);
906   TestTimeout t2(&eb);
907   TestTimeout t3(&eb);
908   t1.scheduleTimeout(10);
909   t2.scheduleTimeout(20);
910   t3.scheduleTimeout(40);
911
912   TimePoint start;
913   eb.loop();
914   TimePoint end;
915
916   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
917   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
918   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
919   T_CHECK_TIMEOUT(start, end, milliseconds(40));
920 }
921
922 class ReschedulingTimeout : public AsyncTimeout {
923  public:
924   ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
925     : AsyncTimeout(evb)
926     , timeouts_(timeouts)
927     , iterator_(timeouts_.begin()) {}
928
929   void start() {
930     reschedule();
931   }
932
933   void timeoutExpired() noexcept override {
934     timestamps.emplace_back();
935     reschedule();
936   }
937
938   void reschedule() {
939     if (iterator_ != timeouts_.end()) {
940       uint32_t timeout = *iterator_;
941       ++iterator_;
942       scheduleTimeout(timeout);
943     }
944   }
945
946   vector<TimePoint> timestamps;
947
948  private:
949   vector<uint32_t> timeouts_;
950   vector<uint32_t>::const_iterator iterator_;
951 };
952
953 /**
954  * Test rescheduling the same timeout multiple times
955  */
956 TEST(EventBaseTest, ReuseTimeout) {
957   EventBase eb;
958
959   vector<uint32_t> timeouts;
960   timeouts.push_back(10);
961   timeouts.push_back(30);
962   timeouts.push_back(15);
963
964   ReschedulingTimeout t(&eb, timeouts);
965   t.start();
966
967   TimePoint start;
968   eb.loop();
969   TimePoint end;
970
971   // Use a higher tolerance than usual.  We're waiting on 3 timeouts
972   // consecutively.  In general, each timeout may go over by a few
973   // milliseconds, and we're tripling this error by witing on 3 timeouts.
974   milliseconds tolerance{6};
975
976   ASSERT_EQ(timeouts.size(), t.timestamps.size());
977   uint32_t total = 0;
978   for (size_t n = 0; n < timeouts.size(); ++n) {
979     total += timeouts[n];
980     T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
981   }
982   T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
983 }
984
985 /**
986  * Test rescheduling a timeout before it has fired
987  */
988 TEST(EventBaseTest, RescheduleTimeout) {
989   EventBase eb;
990
991   TestTimeout t1(&eb);
992   TestTimeout t2(&eb);
993   TestTimeout t3(&eb);
994
995   t1.scheduleTimeout(15);
996   t2.scheduleTimeout(30);
997   t3.scheduleTimeout(30);
998
999   auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1000       &AsyncTimeout::scheduleTimeout);
1001
1002   // after 10ms, reschedule t2 to run sooner than originally scheduled
1003   eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1004   // after 10ms, reschedule t3 to run later than originally scheduled
1005   eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1006
1007   TimePoint start;
1008   eb.loop();
1009   TimePoint end;
1010
1011   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1012   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1013   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1014   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1015 }
1016
1017 /**
1018  * Test cancelling a timeout
1019  */
1020 TEST(EventBaseTest, CancelTimeout) {
1021   EventBase eb;
1022
1023   vector<uint32_t> timeouts;
1024   timeouts.push_back(10);
1025   timeouts.push_back(30);
1026   timeouts.push_back(25);
1027
1028   ReschedulingTimeout t(&eb, timeouts);
1029   t.start();
1030   eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1031
1032   TimePoint start;
1033   eb.loop();
1034   TimePoint end;
1035
1036   ASSERT_EQ(t.timestamps.size(), 2);
1037   T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1038   T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1039   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1040 }
1041
1042 /**
1043  * Test destroying a scheduled timeout object
1044  */
1045 TEST(EventBaseTest, DestroyTimeout) {
1046   class DestroyTimeout : public AsyncTimeout {
1047    public:
1048     DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1049       : AsyncTimeout(eb)
1050       , timeout_(t) {}
1051
1052     void timeoutExpired() noexcept override { delete timeout_; }
1053
1054    private:
1055     AsyncTimeout* timeout_;
1056   };
1057
1058   EventBase eb;
1059
1060   TestTimeout* t1 = new TestTimeout(&eb);
1061   t1->scheduleTimeout(30);
1062
1063   DestroyTimeout dt(&eb, t1);
1064   dt.scheduleTimeout(10);
1065
1066   TimePoint start;
1067   eb.loop();
1068   TimePoint end;
1069
1070   T_CHECK_TIMEOUT(start, end, milliseconds(10));
1071 }
1072
1073
1074 ///////////////////////////////////////////////////////////////////////////
1075 // Test for runInThreadTestFunc()
1076 ///////////////////////////////////////////////////////////////////////////
1077
1078 struct RunInThreadData {
1079   RunInThreadData(int numThreads, int opsPerThread)
1080     : opsPerThread(opsPerThread)
1081     , opsToGo(numThreads*opsPerThread) {}
1082
1083   EventBase evb;
1084   deque< pair<int, int> > values;
1085
1086   int opsPerThread;
1087   int opsToGo;
1088 };
1089
1090 struct RunInThreadArg {
1091   RunInThreadArg(RunInThreadData* data,
1092                  int threadId,
1093                  int value)
1094     : data(data)
1095     , thread(threadId)
1096     , value(value) {}
1097
1098   RunInThreadData* data;
1099   int thread;
1100   int value;
1101 };
1102
1103 void runInThreadTestFunc(RunInThreadArg* arg) {
1104   arg->data->values.emplace_back(arg->thread, arg->value);
1105   RunInThreadData* data = arg->data;
1106   delete arg;
1107
1108   if(--data->opsToGo == 0) {
1109     // Break out of the event base loop if we are the last thread running
1110     data->evb.terminateLoopSoon();
1111   }
1112 }
1113
1114 TEST(EventBaseTest, RunInThread) {
1115   constexpr uint32_t numThreads = 50;
1116   constexpr uint32_t opsPerThread = 100;
1117   RunInThreadData data(numThreads, opsPerThread);
1118
1119   deque<std::thread> threads;
1120   for (uint32_t i = 0; i < numThreads; ++i) {
1121     threads.emplace_back([i, &data] {
1122         for (int n = 0; n < data.opsPerThread; ++n) {
1123           RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1124           data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1125           usleep(10);
1126         }
1127       });
1128   }
1129
1130   // Add a timeout event to run after 3 seconds.
1131   // Otherwise loop() will return immediately since there are no events to run.
1132   // Once the last thread exits, it will stop the loop().  However, this
1133   // timeout also stops the loop in case there is a bug performing the normal
1134   // stop.
1135   data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1136                          3000);
1137
1138   TimePoint start;
1139   data.evb.loop();
1140   TimePoint end;
1141
1142   // Verify that the loop exited because all threads finished and requested it
1143   // to stop.  This should happen much sooner than the 3 second timeout.
1144   // Assert that it happens in under a second.  (This is still tons of extra
1145   // padding.)
1146
1147   auto timeTaken = std::chrono::duration_cast<milliseconds>(
1148     end.getTime() - start.getTime());
1149   ASSERT_LT(timeTaken.count(), 1000);
1150   VLOG(11) << "Time taken: " << timeTaken.count();
1151
1152   // Verify that we have all of the events from every thread
1153   int expectedValues[numThreads];
1154   for (uint32_t n = 0; n < numThreads; ++n) {
1155     expectedValues[n] = 0;
1156   }
1157   for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1158        it != data.values.end();
1159        ++it) {
1160     int threadID = it->first;
1161     int value = it->second;
1162     ASSERT_EQ(expectedValues[threadID], value);
1163     ++expectedValues[threadID];
1164   }
1165   for (uint32_t n = 0; n < numThreads; ++n) {
1166     ASSERT_EQ(expectedValues[n], opsPerThread);
1167   }
1168
1169   // Wait on all of the threads.
1170   for (auto& thread: threads) {
1171     thread.join();
1172   }
1173 }
1174
1175 //  This test simulates some calls, and verifies that the waiting happens by
1176 //  triggering what otherwise would be race conditions, and trying to detect
1177 //  whether any of the race conditions happened.
1178 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1179   const size_t c = 256;
1180   vector<unique_ptr<atomic<size_t>>> atoms(c);
1181   for (size_t i = 0; i < c; ++i) {
1182     auto& atom = atoms.at(i);
1183     atom = make_unique<atomic<size_t>>(0);
1184   }
1185   vector<thread> threads(c);
1186   for (size_t i = 0; i < c; ++i) {
1187     auto& atom = *atoms.at(i);
1188     auto& th = threads.at(i);
1189     th = thread([&atom] {
1190         EventBase eb;
1191         auto ebth = thread([&]{ eb.loopForever(); });
1192         eb.waitUntilRunning();
1193         eb.runInEventBaseThreadAndWait([&] {
1194           size_t x = 0;
1195           atom.compare_exchange_weak(
1196               x, 1, std::memory_order_release, std::memory_order_relaxed);
1197         });
1198         size_t x = 0;
1199         atom.compare_exchange_weak(
1200             x, 2, std::memory_order_release, std::memory_order_relaxed);
1201         eb.terminateLoopSoon();
1202         ebth.join();
1203     });
1204   }
1205   for (size_t i = 0; i < c; ++i) {
1206     auto& th = threads.at(i);
1207     th.join();
1208   }
1209   size_t sum = 0;
1210   for (auto& atom : atoms) sum += *atom;
1211   EXPECT_EQ(c, sum);
1212 }
1213
1214 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1215   EventBase eb;
1216   thread th(&EventBase::loopForever, &eb);
1217   SCOPE_EXIT {
1218     eb.terminateLoopSoon();
1219     th.join();
1220   };
1221   auto mutated = false;
1222   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1223       mutated = true;
1224   });
1225   EXPECT_TRUE(mutated);
1226 }
1227
1228 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1229   EventBase eb;
1230   thread th(&EventBase::loopForever, &eb);
1231   SCOPE_EXIT {
1232     eb.terminateLoopSoon();
1233     th.join();
1234   };
1235   eb.runInEventBaseThreadAndWait([&] {
1236       auto mutated = false;
1237       eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1238           mutated = true;
1239       });
1240       EXPECT_TRUE(mutated);
1241   });
1242 }
1243
1244 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1245   EventBase eb;
1246   auto mutated = false;
1247   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1248       mutated = true;
1249     });
1250   EXPECT_TRUE(mutated);
1251 }
1252
1253 ///////////////////////////////////////////////////////////////////////////
1254 // Tests for runInLoop()
1255 ///////////////////////////////////////////////////////////////////////////
1256
1257 class CountedLoopCallback : public EventBase::LoopCallback {
1258  public:
1259   CountedLoopCallback(EventBase* eventBase,
1260                       unsigned int count,
1261                       std::function<void()> action =
1262                         std::function<void()>())
1263     : eventBase_(eventBase)
1264     , count_(count)
1265     , action_(action) {}
1266
1267   void runLoopCallback() noexcept override {
1268     --count_;
1269     if (count_ > 0) {
1270       eventBase_->runInLoop(this);
1271     } else if (action_) {
1272       action_();
1273     }
1274   }
1275
1276   unsigned int getCount() const {
1277     return count_;
1278   }
1279
1280  private:
1281   EventBase* eventBase_;
1282   unsigned int count_;
1283   std::function<void()> action_;
1284 };
1285
1286 // Test that EventBase::loop() doesn't exit while there are
1287 // still LoopCallbacks remaining to be invoked.
1288 TEST(EventBaseTest, RepeatedRunInLoop) {
1289   EventBase eventBase;
1290
1291   CountedLoopCallback c(&eventBase, 10);
1292   eventBase.runInLoop(&c);
1293   // The callback shouldn't have run immediately
1294   ASSERT_EQ(c.getCount(), 10);
1295   eventBase.loop();
1296
1297   // loop() should loop until the CountedLoopCallback stops
1298   // re-installing itself.
1299   ASSERT_EQ(c.getCount(), 0);
1300 }
1301
1302 // Test that EventBase::loop() works as expected without time measurements.
1303 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
1304   EventBase eventBase(false);
1305
1306   CountedLoopCallback c(&eventBase, 10);
1307   eventBase.runInLoop(&c);
1308   // The callback shouldn't have run immediately
1309   ASSERT_EQ(c.getCount(), 10);
1310   eventBase.loop();
1311
1312   // loop() should loop until the CountedLoopCallback stops
1313   // re-installing itself.
1314   ASSERT_EQ(c.getCount(), 0);
1315 }
1316
1317 // Test runInLoop() calls with terminateLoopSoon()
1318 TEST(EventBaseTest, RunInLoopStopLoop) {
1319   EventBase eventBase;
1320
1321   CountedLoopCallback c1(&eventBase, 20);
1322   CountedLoopCallback c2(&eventBase, 10,
1323                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1324
1325   eventBase.runInLoop(&c1);
1326   eventBase.runInLoop(&c2);
1327   ASSERT_EQ(c1.getCount(), 20);
1328   ASSERT_EQ(c2.getCount(), 10);
1329
1330   eventBase.loopForever();
1331
1332   // c2 should have stopped the loop after 10 iterations
1333   ASSERT_EQ(c2.getCount(), 0);
1334
1335   // We allow the EventBase to run the loop callbacks in whatever order it
1336   // chooses.  We'll accept c1's count being either 10 (if the loop terminated
1337   // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1338   // before c1 ran).
1339   //
1340   // (With the current code, c1 will always run 10 times, but we don't consider
1341   // this a hard API requirement.)
1342   ASSERT_GE(c1.getCount(), 10);
1343   ASSERT_LE(c1.getCount(), 11);
1344 }
1345
1346 TEST(EventBaseTest, TryRunningAfterTerminate) {
1347   EventBase eventBase;
1348   CountedLoopCallback c1(&eventBase, 1,
1349                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1350   eventBase.runInLoop(&c1);
1351   eventBase.loopForever();
1352   bool ran = false;
1353   eventBase.runInEventBaseThread([&]() {
1354     ran = true;
1355   });
1356
1357   ASSERT_FALSE(ran);
1358 }
1359
1360 // Test cancelling runInLoop() callbacks
1361 TEST(EventBaseTest, CancelRunInLoop) {
1362   EventBase eventBase;
1363
1364   CountedLoopCallback c1(&eventBase, 20);
1365   CountedLoopCallback c2(&eventBase, 20);
1366   CountedLoopCallback c3(&eventBase, 20);
1367
1368   std::function<void()> cancelC1Action =
1369     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1370   std::function<void()> cancelC2Action =
1371     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1372
1373   CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1374   CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1375
1376   // Install cancelC1 after c1
1377   eventBase.runInLoop(&c1);
1378   eventBase.runInLoop(&cancelC1);
1379
1380   // Install cancelC2 before c2
1381   eventBase.runInLoop(&cancelC2);
1382   eventBase.runInLoop(&c2);
1383
1384   // Install c3
1385   eventBase.runInLoop(&c3);
1386
1387   ASSERT_EQ(c1.getCount(), 20);
1388   ASSERT_EQ(c2.getCount(), 20);
1389   ASSERT_EQ(c3.getCount(), 20);
1390   ASSERT_EQ(cancelC1.getCount(), 10);
1391   ASSERT_EQ(cancelC2.getCount(), 10);
1392
1393   // Run the loop
1394   eventBase.loop();
1395
1396   // cancelC1 and cancelC3 should have both fired after 10 iterations and
1397   // stopped re-installing themselves
1398   ASSERT_EQ(cancelC1.getCount(), 0);
1399   ASSERT_EQ(cancelC2.getCount(), 0);
1400   // c3 should have continued on for the full 20 iterations
1401   ASSERT_EQ(c3.getCount(), 0);
1402
1403   // c1 and c2 should have both been cancelled on the 10th iteration.
1404   //
1405   // Callbacks are always run in the order they are installed,
1406   // so c1 should have fired 10 times, and been canceled after it ran on the
1407   // 10th iteration.  c2 should have only fired 9 times, because cancelC2 will
1408   // have run before it on the 10th iteration, and cancelled it before it
1409   // fired.
1410   ASSERT_EQ(c1.getCount(), 10);
1411   ASSERT_EQ(c2.getCount(), 11);
1412 }
1413
1414 class TerminateTestCallback : public EventBase::LoopCallback,
1415                               public EventHandler {
1416  public:
1417   TerminateTestCallback(EventBase* eventBase, int fd)
1418     : EventHandler(eventBase, fd),
1419       eventBase_(eventBase),
1420       loopInvocations_(0),
1421       maxLoopInvocations_(0),
1422       eventInvocations_(0),
1423       maxEventInvocations_(0) {}
1424
1425   void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1426     loopInvocations_ = 0;
1427     maxLoopInvocations_ = maxLoopInvocations;
1428     eventInvocations_ = 0;
1429     maxEventInvocations_ = maxEventInvocations;
1430
1431     cancelLoopCallback();
1432     unregisterHandler();
1433   }
1434
1435   void handlerReady(uint16_t /* events */) noexcept override {
1436     // We didn't register with PERSIST, so we will have been automatically
1437     // unregistered already.
1438     ASSERT_FALSE(isHandlerRegistered());
1439
1440     ++eventInvocations_;
1441     if (eventInvocations_ >= maxEventInvocations_) {
1442       return;
1443     }
1444
1445     eventBase_->runInLoop(this);
1446   }
1447   void runLoopCallback() noexcept override {
1448     ++loopInvocations_;
1449     if (loopInvocations_ >= maxLoopInvocations_) {
1450       return;
1451     }
1452
1453     registerHandler(READ);
1454   }
1455
1456   uint32_t getLoopInvocations() const {
1457     return loopInvocations_;
1458   }
1459   uint32_t getEventInvocations() const {
1460     return eventInvocations_;
1461   }
1462
1463  private:
1464   EventBase* eventBase_;
1465   uint32_t loopInvocations_;
1466   uint32_t maxLoopInvocations_;
1467   uint32_t eventInvocations_;
1468   uint32_t maxEventInvocations_;
1469 };
1470
1471 /**
1472  * Test that EventBase::loop() correctly detects when there are no more events
1473  * left to run.
1474  *
1475  * This uses a single callback, which alternates registering itself as a loop
1476  * callback versus a EventHandler callback.  This exercises a regression where
1477  * EventBase::loop() incorrectly exited if there were no more fd handlers
1478  * registered, but a loop callback installed a new fd handler.
1479  */
1480 TEST(EventBaseTest, LoopTermination) {
1481   EventBase eventBase;
1482
1483   // Open a pipe and close the write end,
1484   // so the read endpoint will be readable
1485   int pipeFds[2];
1486   int rc = pipe(pipeFds);
1487   ASSERT_EQ(rc, 0);
1488   close(pipeFds[1]);
1489   TerminateTestCallback callback(&eventBase, pipeFds[0]);
1490
1491   // Test once where the callback will exit after a loop callback
1492   callback.reset(10, 100);
1493   eventBase.runInLoop(&callback);
1494   eventBase.loop();
1495   ASSERT_EQ(callback.getLoopInvocations(), 10);
1496   ASSERT_EQ(callback.getEventInvocations(), 9);
1497
1498   // Test once where the callback will exit after an fd event callback
1499   callback.reset(100, 7);
1500   eventBase.runInLoop(&callback);
1501   eventBase.loop();
1502   ASSERT_EQ(callback.getLoopInvocations(), 7);
1503   ASSERT_EQ(callback.getEventInvocations(), 7);
1504
1505   close(pipeFds[0]);
1506 }
1507
1508 ///////////////////////////////////////////////////////////////////////////
1509 // Tests for latency calculations
1510 ///////////////////////////////////////////////////////////////////////////
1511
1512 class IdleTimeTimeoutSeries : public AsyncTimeout {
1513
1514  public:
1515
1516   explicit IdleTimeTimeoutSeries(EventBase *base,
1517                                  std::deque<std::uint64_t>& timeout) :
1518     AsyncTimeout(base),
1519     timeouts_(0),
1520     timeout_(timeout) {
1521       scheduleTimeout(1);
1522     }
1523
1524     ~IdleTimeTimeoutSeries() override {}
1525
1526     void timeoutExpired() noexcept override {
1527     ++timeouts_;
1528
1529     if(timeout_.empty()){
1530       cancelTimeout();
1531     } else {
1532       uint64_t sleepTime = timeout_.front();
1533       timeout_.pop_front();
1534       if (sleepTime) {
1535         usleep(sleepTime);
1536       }
1537       scheduleTimeout(1);
1538     }
1539   }
1540
1541   int getTimeouts() const {
1542     return timeouts_;
1543   }
1544
1545  private:
1546   int timeouts_;
1547   std::deque<uint64_t>& timeout_;
1548 };
1549
1550 /**
1551  * Verify that idle time is correctly accounted for when decaying our loop
1552  * time.
1553  *
1554  * This works by creating a high loop time (via usleep), expecting a latency
1555  * callback with known value, and then scheduling a timeout for later. This
1556  * later timeout is far enough in the future that the idle time should have
1557  * caused the loop time to decay.
1558  */
1559 TEST(EventBaseTest, IdleTime) {
1560   EventBase eventBase;
1561   eventBase.setLoadAvgMsec(1000);
1562   eventBase.resetLoadAvg(5900.0);
1563   std::deque<uint64_t> timeouts0(4, 8080);
1564   timeouts0.push_front(8000);
1565   timeouts0.push_back(14000);
1566   IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1567   std::deque<uint64_t> timeouts(20, 20);
1568   std::unique_ptr<IdleTimeTimeoutSeries> tos;
1569   int64_t testStart = duration_cast<microseconds>(
1570     std::chrono::steady_clock::now().time_since_epoch()).count();
1571   bool hostOverloaded = false;
1572
1573   int latencyCallbacks = 0;
1574   eventBase.setMaxLatency(6000, [&]() {
1575     ++latencyCallbacks;
1576
1577     switch (latencyCallbacks) {
1578     case 1:
1579       if (tos0.getTimeouts() < 6) {
1580         // This could only happen if the host this test is running
1581         // on is heavily loaded.
1582         int64_t maxLatencyReached = duration_cast<microseconds>(
1583             std::chrono::steady_clock::now().time_since_epoch()).count();
1584         ASSERT_LE(43800, maxLatencyReached - testStart);
1585         hostOverloaded = true;
1586         break;
1587       }
1588       ASSERT_EQ(6, tos0.getTimeouts());
1589       ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1590       ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1591       tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1592       break;
1593
1594     default:
1595       FAIL() << "Unexpected latency callback";
1596       break;
1597     }
1598   });
1599
1600   // Kick things off with an "immedite" timeout
1601   tos0.scheduleTimeout(1);
1602
1603   eventBase.loop();
1604
1605   if (hostOverloaded) {
1606     return;
1607   }
1608
1609   ASSERT_EQ(1, latencyCallbacks);
1610   ASSERT_EQ(7, tos0.getTimeouts());
1611   ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1612   ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1613   ASSERT_TRUE(!!tos);
1614   ASSERT_EQ(21, tos->getTimeouts());
1615 }
1616
1617 /**
1618  * Test that thisLoop functionality works with terminateLoopSoon
1619  */
1620 TEST(EventBaseTest, ThisLoop) {
1621   EventBase eb;
1622   bool runInLoop = false;
1623   bool runThisLoop = false;
1624
1625   eb.runInLoop([&](){
1626       eb.terminateLoopSoon();
1627       eb.runInLoop([&]() {
1628           runInLoop = true;
1629         });
1630       eb.runInLoop([&]() {
1631           runThisLoop = true;
1632         }, true);
1633     }, true);
1634   eb.loopForever();
1635
1636   // Should not work
1637   ASSERT_FALSE(runInLoop);
1638   // Should work with thisLoop
1639   ASSERT_TRUE(runThisLoop);
1640 }
1641
1642 TEST(EventBaseTest, EventBaseThreadLoop) {
1643   EventBase base;
1644   bool ran = false;
1645
1646   base.runInEventBaseThread([&](){
1647     ran = true;
1648   });
1649   base.loop();
1650
1651   ASSERT_EQ(true, ran);
1652 }
1653
1654 TEST(EventBaseTest, EventBaseThreadName) {
1655   EventBase base;
1656   base.setName("foo");
1657   base.loop();
1658
1659 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1660   char name[16];
1661   pthread_getname_np(pthread_self(), name, 16);
1662   ASSERT_EQ(0, strcmp("foo", name));
1663 #endif
1664 }
1665
1666 TEST(EventBaseTest, RunBeforeLoop) {
1667   EventBase base;
1668   CountedLoopCallback cb(&base, 1, [&](){
1669     base.terminateLoopSoon();
1670   });
1671   base.runBeforeLoop(&cb);
1672   base.loopForever();
1673   ASSERT_EQ(cb.getCount(), 0);
1674 }
1675
1676 TEST(EventBaseTest, RunBeforeLoopWait) {
1677   EventBase base;
1678   CountedLoopCallback cb(&base, 1);
1679   base.tryRunAfterDelay([&](){
1680       base.terminateLoopSoon();
1681     }, 500);
1682   base.runBeforeLoop(&cb);
1683   base.loopForever();
1684
1685   // Check that we only ran once, and did not loop multiple times.
1686   ASSERT_EQ(cb.getCount(), 0);
1687 }
1688
1689 class PipeHandler : public EventHandler {
1690 public:
1691   PipeHandler(EventBase* eventBase, int fd)
1692     : EventHandler(eventBase, fd) {}
1693
1694   void handlerReady(uint16_t /* events */) noexcept override { abort(); }
1695 };
1696
1697 TEST(EventBaseTest, StopBeforeLoop) {
1698   EventBase evb;
1699
1700   // Give the evb something to do.
1701   int p[2];
1702   ASSERT_EQ(0, pipe(p));
1703   PipeHandler handler(&evb, p[0]);
1704   handler.registerHandler(EventHandler::READ);
1705
1706   // It's definitely not running yet
1707   evb.terminateLoopSoon();
1708
1709   // let it run, it should exit quickly.
1710   std::thread t([&] { evb.loop(); });
1711   t.join();
1712
1713   handler.unregisterHandler();
1714   close(p[0]);
1715   close(p[1]);
1716
1717   SUCCEED();
1718 }
1719
1720 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1721   bool ran = false;
1722
1723   {
1724     EventBase base;
1725     base.runInEventBaseThread([&](){
1726       ran = true;
1727     });
1728   }
1729
1730   ASSERT_TRUE(ran);
1731 }
1732
1733 TEST(EventBaseTest, LoopKeepAlive) {
1734   EventBase evb;
1735
1736   bool done = false;
1737   std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
1738     /* sleep override */ std::this_thread::sleep_for(
1739         std::chrono::milliseconds(100));
1740     evb.runInEventBaseThread(
1741         [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1742   });
1743
1744   evb.loop();
1745
1746   ASSERT_TRUE(done);
1747
1748   t.join();
1749 }
1750
1751 TEST(EventBaseTest, LoopKeepAliveInLoop) {
1752   EventBase evb;
1753
1754   bool done = false;
1755   std::thread t;
1756
1757   evb.runInEventBaseThread([&] {
1758     t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
1759       /* sleep override */ std::this_thread::sleep_for(
1760           std::chrono::milliseconds(100));
1761       evb.runInEventBaseThread(
1762           [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1763     });
1764   });
1765
1766   evb.loop();
1767
1768   ASSERT_TRUE(done);
1769
1770   t.join();
1771 }
1772
1773 TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
1774   std::unique_ptr<EventBase> evb = folly::make_unique<EventBase>();
1775
1776   bool done = false;
1777
1778   std::thread evThread([&] {
1779     evb->loopForever();
1780     evb.reset();
1781     done = true;
1782   });
1783
1784   {
1785     auto* ev = evb.get();
1786     EventBase::LoopKeepAlive keepAlive;
1787     ev->runInEventBaseThreadAndWait(
1788         [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); });
1789     ASSERT_FALSE(done) << "Loop finished before we asked it to";
1790     ev->terminateLoopSoon();
1791     /* sleep override */
1792     std::this_thread::sleep_for(std::chrono::milliseconds(30));
1793     ASSERT_FALSE(done) << "Loop terminated early";
1794     ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{});
1795   }
1796
1797   evThread.join();
1798   ASSERT_TRUE(done);
1799 }
1800
1801 TEST(EventBaseTest, LoopKeepAliveShutdown) {
1802   auto evb = folly::make_unique<EventBase>();
1803
1804   bool done = false;
1805
1806   std::thread t([
1807     &done,
1808     loopKeepAlive = evb->loopKeepAlive(),
1809     evbPtr = evb.get()
1810   ]() mutable {
1811     /* sleep override */ std::this_thread::sleep_for(
1812         std::chrono::milliseconds(100));
1813     evbPtr->runInEventBaseThread(
1814         [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1815   });
1816
1817   evb.reset();
1818
1819   ASSERT_TRUE(done);
1820
1821   t.join();
1822 }