Merge commit '64f2f2734ad853784bdd260bcf31e065c47c0741' into fix-configure-pthread...
[folly.git] / folly / io / async / test / EventBaseTest.cpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 #include <folly/io/async/AsyncTimeout.h>
20 #include <folly/io/async/EventBase.h>
21 #include <folly/io/async/EventHandler.h>
22 #include <folly/io/async/test/SocketPair.h>
23 #include <folly/io/async/test/Util.h>
24
25 #include <atomic>
26 #include <iostream>
27 #include <unistd.h>
28 #include <memory>
29 #include <thread>
30
31 using std::atomic;
32 using std::deque;
33 using std::pair;
34 using std::vector;
35 using std::thread;
36 using std::make_pair;
37 using std::cerr;
38 using std::endl;
39 using std::chrono::milliseconds;
40 using std::chrono::microseconds;
41 using std::chrono::duration_cast;
42
43 using namespace folly;
44
45 ///////////////////////////////////////////////////////////////////////////
46 // Tests for read and write events
47 ///////////////////////////////////////////////////////////////////////////
48
49 enum { BUF_SIZE = 4096 };
50
51 ssize_t writeToFD(int fd, size_t length) {
52   // write an arbitrary amount of data to the fd
53   char buf[length];
54   memset(buf, 'a', sizeof(buf));
55   ssize_t rc = write(fd, buf, sizeof(buf));
56   CHECK_EQ(rc, length);
57   return rc;
58 }
59
60 size_t writeUntilFull(int fd) {
61   // Write to the fd until EAGAIN is returned
62   size_t bytesWritten = 0;
63   char buf[BUF_SIZE];
64   memset(buf, 'a', sizeof(buf));
65   while (true) {
66     ssize_t rc = write(fd, buf, sizeof(buf));
67     if (rc < 0) {
68       CHECK_EQ(errno, EAGAIN);
69       break;
70     } else {
71       bytesWritten += rc;
72     }
73   }
74   return bytesWritten;
75 }
76
77 ssize_t readFromFD(int fd, size_t length) {
78   // write an arbitrary amount of data to the fd
79   char buf[length];
80   return read(fd, buf, sizeof(buf));
81 }
82
83 size_t readUntilEmpty(int fd) {
84   // Read from the fd until EAGAIN is returned
85   char buf[BUF_SIZE];
86   size_t bytesRead = 0;
87   while (true) {
88     int rc = read(fd, buf, sizeof(buf));
89     if (rc == 0) {
90       CHECK(false) << "unexpected EOF";
91     } else if (rc < 0) {
92       CHECK_EQ(errno, EAGAIN);
93       break;
94     } else {
95       bytesRead += rc;
96     }
97   }
98   return bytesRead;
99 }
100
101 void checkReadUntilEmpty(int fd, size_t expectedLength) {
102   ASSERT_EQ(readUntilEmpty(fd), expectedLength);
103 }
104
105 struct ScheduledEvent {
106   int milliseconds;
107   uint16_t events;
108   size_t length;
109   ssize_t result;
110
111   void perform(int fd) {
112     if (events & EventHandler::READ) {
113       if (length == 0) {
114         result = readUntilEmpty(fd);
115       } else {
116         result = readFromFD(fd, length);
117       }
118     }
119     if (events & EventHandler::WRITE) {
120       if (length == 0) {
121         result = writeUntilFull(fd);
122       } else {
123         result = writeToFD(fd, length);
124       }
125     }
126   }
127 };
128
129 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
130   for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
131     eventBase->runAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
132                              ev->milliseconds);
133   }
134 }
135
136 class TestHandler : public EventHandler {
137  public:
138   TestHandler(EventBase* eventBase, int fd)
139     : EventHandler(eventBase, fd), fd_(fd) {}
140
141   virtual void handlerReady(uint16_t events) noexcept {
142     ssize_t bytesRead = 0;
143     ssize_t bytesWritten = 0;
144     if (events & READ) {
145       // Read all available data, so EventBase will stop calling us
146       // until new data becomes available
147       bytesRead = readUntilEmpty(fd_);
148     }
149     if (events & WRITE) {
150       // Write until the pipe buffer is full, so EventBase will stop calling
151       // us until the other end has read some data
152       bytesWritten = writeUntilFull(fd_);
153     }
154
155     log.push_back(EventRecord(events, bytesRead, bytesWritten));
156   }
157
158   struct EventRecord {
159     EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
160       : events(events)
161       , timestamp()
162       , bytesRead(bytesRead)
163       , bytesWritten(bytesWritten) {}
164
165     uint16_t events;
166     TimePoint timestamp;
167     ssize_t bytesRead;
168     ssize_t bytesWritten;
169   };
170
171   deque<EventRecord> log;
172
173  private:
174   int fd_;
175 };
176
177 /**
178  * Test a READ event
179  */
180 TEST(EventBaseTest, ReadEvent) {
181   EventBase eb;
182   SocketPair sp;
183
184   // Register for read events
185   TestHandler handler(&eb, sp[0]);
186   handler.registerHandler(EventHandler::READ);
187
188   // Register timeouts to perform two write events
189   ScheduledEvent events[] = {
190     { 10, EventHandler::WRITE, 2345 },
191     { 160, EventHandler::WRITE, 99 },
192     { 0, 0, 0 },
193   };
194   scheduleEvents(&eb, sp[1], events);
195
196   // Loop
197   TimePoint start;
198   eb.loop();
199   TimePoint end;
200
201   // Since we didn't use the EventHandler::PERSIST flag, the handler should
202   // have received the first read, then unregistered itself.  Check that only
203   // the first chunk of data was received.
204   ASSERT_EQ(handler.log.size(), 1);
205   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
206   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
207                   milliseconds(events[0].milliseconds), milliseconds(90));
208   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
209   ASSERT_EQ(handler.log[0].bytesWritten, 0);
210   T_CHECK_TIMEOUT(start, end,
211                   milliseconds(events[1].milliseconds), milliseconds(30));
212
213   // Make sure the second chunk of data is still waiting to be read.
214   size_t bytesRemaining = readUntilEmpty(sp[0]);
215   ASSERT_EQ(bytesRemaining, events[1].length);
216 }
217
218 /**
219  * Test (READ | PERSIST)
220  */
221 TEST(EventBaseTest, ReadPersist) {
222   EventBase eb;
223   SocketPair sp;
224
225   // Register for read events
226   TestHandler handler(&eb, sp[0]);
227   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
228
229   // Register several timeouts to perform writes
230   ScheduledEvent events[] = {
231     { 10,  EventHandler::WRITE, 1024 },
232     { 20,  EventHandler::WRITE, 2211 },
233     { 30,  EventHandler::WRITE, 4096 },
234     { 100, EventHandler::WRITE, 100 },
235     { 0, 0 },
236   };
237   scheduleEvents(&eb, sp[1], events);
238
239   // Schedule a timeout to unregister the handler after the third write
240   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
241
242   // Loop
243   TimePoint start;
244   eb.loop();
245   TimePoint end;
246
247   // The handler should have received the first 3 events,
248   // then been unregistered after that.
249   ASSERT_EQ(handler.log.size(), 3);
250   for (int n = 0; n < 3; ++n) {
251     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
252     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
253                     milliseconds(events[n].milliseconds));
254     ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
255     ASSERT_EQ(handler.log[n].bytesWritten, 0);
256   }
257   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
258
259   // Make sure the data from the last write is still waiting to be read
260   size_t bytesRemaining = readUntilEmpty(sp[0]);
261   ASSERT_EQ(bytesRemaining, events[3].length);
262 }
263
264 /**
265  * Test registering for READ when the socket is immediately readable
266  */
267 TEST(EventBaseTest, ReadImmediate) {
268   EventBase eb;
269   SocketPair sp;
270
271   // Write some data to the socket so the other end will
272   // be immediately readable
273   size_t dataLength = 1234;
274   writeToFD(sp[1], dataLength);
275
276   // Register for read events
277   TestHandler handler(&eb, sp[0]);
278   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
279
280   // Register a timeout to perform another write
281   ScheduledEvent events[] = {
282     { 10, EventHandler::WRITE, 2345 },
283     { 0, 0, 0 },
284   };
285   scheduleEvents(&eb, sp[1], events);
286
287   // Schedule a timeout to unregister the handler
288   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
289
290   // Loop
291   TimePoint start;
292   eb.loop();
293   TimePoint end;
294
295   ASSERT_EQ(handler.log.size(), 2);
296
297   // There should have been 1 event for immediate readability
298   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
299   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
300   ASSERT_EQ(handler.log[0].bytesRead, dataLength);
301   ASSERT_EQ(handler.log[0].bytesWritten, 0);
302
303   // There should be another event after the timeout wrote more data
304   ASSERT_EQ(handler.log[1].events, EventHandler::READ);
305   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
306                   milliseconds(events[0].milliseconds));
307   ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
308   ASSERT_EQ(handler.log[1].bytesWritten, 0);
309
310   T_CHECK_TIMEOUT(start, end, milliseconds(20));
311 }
312
313 /**
314  * Test a WRITE event
315  */
316 TEST(EventBaseTest, WriteEvent) {
317   EventBase eb;
318   SocketPair sp;
319
320   // Fill up the write buffer before starting
321   size_t initialBytesWritten = writeUntilFull(sp[0]);
322
323   // Register for write events
324   TestHandler handler(&eb, sp[0]);
325   handler.registerHandler(EventHandler::WRITE);
326
327   // Register timeouts to perform two reads
328   ScheduledEvent events[] = {
329     { 10, EventHandler::READ, 0 },
330     { 60, EventHandler::READ, 0 },
331     { 0, 0, 0 },
332   };
333   scheduleEvents(&eb, sp[1], events);
334
335   // Loop
336   TimePoint start;
337   eb.loop();
338   TimePoint end;
339
340   // Since we didn't use the EventHandler::PERSIST flag, the handler should
341   // have only been able to write once, then unregistered itself.
342   ASSERT_EQ(handler.log.size(), 1);
343   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
344   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
345                   milliseconds(events[0].milliseconds));
346   ASSERT_EQ(handler.log[0].bytesRead, 0);
347   ASSERT_GT(handler.log[0].bytesWritten, 0);
348   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
349
350   ASSERT_EQ(events[0].result, initialBytesWritten);
351   ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
352 }
353
354 /**
355  * Test (WRITE | PERSIST)
356  */
357 TEST(EventBaseTest, WritePersist) {
358   EventBase eb;
359   SocketPair sp;
360
361   // Fill up the write buffer before starting
362   size_t initialBytesWritten = writeUntilFull(sp[0]);
363
364   // Register for write events
365   TestHandler handler(&eb, sp[0]);
366   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
367
368   // Register several timeouts to read from the socket at several intervals
369   ScheduledEvent events[] = {
370     { 10,  EventHandler::READ, 0 },
371     { 40,  EventHandler::READ, 0 },
372     { 70,  EventHandler::READ, 0 },
373     { 100, EventHandler::READ, 0 },
374     { 0, 0 },
375   };
376   scheduleEvents(&eb, sp[1], events);
377
378   // Schedule a timeout to unregister the handler after the third read
379   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
380
381   // Loop
382   TimePoint start;
383   eb.loop();
384   TimePoint end;
385
386   // The handler should have received the first 3 events,
387   // then been unregistered after that.
388   ASSERT_EQ(handler.log.size(), 3);
389   ASSERT_EQ(events[0].result, initialBytesWritten);
390   for (int n = 0; n < 3; ++n) {
391     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
392     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
393                     milliseconds(events[n].milliseconds));
394     ASSERT_EQ(handler.log[n].bytesRead, 0);
395     ASSERT_GT(handler.log[n].bytesWritten, 0);
396     ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
397   }
398   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
399 }
400
401 /**
402  * Test registering for WRITE when the socket is immediately writable
403  */
404 TEST(EventBaseTest, WriteImmediate) {
405   EventBase eb;
406   SocketPair sp;
407
408   // Register for write events
409   TestHandler handler(&eb, sp[0]);
410   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
411
412   // Register a timeout to perform a read
413   ScheduledEvent events[] = {
414     { 10, EventHandler::READ, 0 },
415     { 0, 0, 0 },
416   };
417   scheduleEvents(&eb, sp[1], events);
418
419   // Schedule a timeout to unregister the handler
420   int64_t unregisterTimeout = 40;
421   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
422                    unregisterTimeout);
423
424   // Loop
425   TimePoint start;
426   eb.loop();
427   TimePoint end;
428
429   ASSERT_EQ(handler.log.size(), 2);
430
431   // Since the socket buffer was initially empty,
432   // there should have been 1 event for immediate writability
433   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
434   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
435   ASSERT_EQ(handler.log[0].bytesRead, 0);
436   ASSERT_GT(handler.log[0].bytesWritten, 0);
437
438   // There should be another event after the timeout wrote more data
439   ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
440   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
441                   milliseconds(events[0].milliseconds));
442   ASSERT_EQ(handler.log[1].bytesRead, 0);
443   ASSERT_GT(handler.log[1].bytesWritten, 0);
444
445   T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
446 }
447
448 /**
449  * Test (READ | WRITE) when the socket becomes readable first
450  */
451 TEST(EventBaseTest, ReadWrite) {
452   EventBase eb;
453   SocketPair sp;
454
455   // Fill up the write buffer before starting
456   size_t sock0WriteLength = writeUntilFull(sp[0]);
457
458   // Register for read and write events
459   TestHandler handler(&eb, sp[0]);
460   handler.registerHandler(EventHandler::READ_WRITE);
461
462   // Register timeouts to perform a write then a read.
463   ScheduledEvent events[] = {
464     { 10, EventHandler::WRITE, 2345 },
465     { 40, EventHandler::READ, 0 },
466     { 0, 0, 0 },
467   };
468   scheduleEvents(&eb, sp[1], events);
469
470   // Loop
471   TimePoint start;
472   eb.loop();
473   TimePoint end;
474
475   // Since we didn't use the EventHandler::PERSIST flag, the handler should
476   // have only noticed readability, then unregistered itself.  Check that only
477   // one event was logged.
478   ASSERT_EQ(handler.log.size(), 1);
479   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
480   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
481                   milliseconds(events[0].milliseconds));
482   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
483   ASSERT_EQ(handler.log[0].bytesWritten, 0);
484   ASSERT_EQ(events[1].result, sock0WriteLength);
485   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
486 }
487
488 /**
489  * Test (READ | WRITE) when the socket becomes writable first
490  */
491 TEST(EventBaseTest, WriteRead) {
492   EventBase eb;
493   SocketPair sp;
494
495   // Fill up the write buffer before starting
496   size_t sock0WriteLength = writeUntilFull(sp[0]);
497
498   // Register for read and write events
499   TestHandler handler(&eb, sp[0]);
500   handler.registerHandler(EventHandler::READ_WRITE);
501
502   // Register timeouts to perform a read then a write.
503   size_t sock1WriteLength = 2345;
504   ScheduledEvent events[] = {
505     { 10, EventHandler::READ, 0 },
506     { 40, EventHandler::WRITE, sock1WriteLength },
507     { 0, 0, 0 },
508   };
509   scheduleEvents(&eb, sp[1], events);
510
511   // Loop
512   TimePoint start;
513   eb.loop();
514   TimePoint end;
515
516   // Since we didn't use the EventHandler::PERSIST flag, the handler should
517   // have only noticed writability, then unregistered itself.  Check that only
518   // one event was logged.
519   ASSERT_EQ(handler.log.size(), 1);
520   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
521   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
522                   milliseconds(events[0].milliseconds));
523   ASSERT_EQ(handler.log[0].bytesRead, 0);
524   ASSERT_GT(handler.log[0].bytesWritten, 0);
525   ASSERT_EQ(events[0].result, sock0WriteLength);
526   ASSERT_EQ(events[1].result, sock1WriteLength);
527   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
528
529   // Make sure the written data is still waiting to be read.
530   size_t bytesRemaining = readUntilEmpty(sp[0]);
531   ASSERT_EQ(bytesRemaining, events[1].length);
532 }
533
534 /**
535  * Test (READ | WRITE) when the socket becomes readable and writable
536  * at the same time.
537  */
538 TEST(EventBaseTest, ReadWriteSimultaneous) {
539   EventBase eb;
540   SocketPair sp;
541
542   // Fill up the write buffer before starting
543   size_t sock0WriteLength = writeUntilFull(sp[0]);
544
545   // Register for read and write events
546   TestHandler handler(&eb, sp[0]);
547   handler.registerHandler(EventHandler::READ_WRITE);
548
549   // Register a timeout to perform a read and write together
550   ScheduledEvent events[] = {
551     { 10, EventHandler::READ | EventHandler::WRITE, 0 },
552     { 0, 0, 0 },
553   };
554   scheduleEvents(&eb, sp[1], events);
555
556   // Loop
557   TimePoint start;
558   eb.loop();
559   TimePoint end;
560
561   // It's not strictly required that the EventBase register us about both
562   // events in the same call.  So, it's possible that if the EventBase
563   // implementation changes this test could start failing, and it wouldn't be
564   // considered breaking the API.  However for now it's nice to exercise this
565   // code path.
566   ASSERT_EQ(handler.log.size(), 1);
567   ASSERT_EQ(handler.log[0].events,
568                     EventHandler::READ | EventHandler::WRITE);
569   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
570                   milliseconds(events[0].milliseconds));
571   ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
572   ASSERT_GT(handler.log[0].bytesWritten, 0);
573   T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
574 }
575
576 /**
577  * Test (READ | WRITE | PERSIST)
578  */
579 TEST(EventBaseTest, ReadWritePersist) {
580   EventBase eb;
581   SocketPair sp;
582
583   // Register for read and write events
584   TestHandler handler(&eb, sp[0]);
585   handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
586                           EventHandler::PERSIST);
587
588   // Register timeouts to perform several reads and writes
589   ScheduledEvent events[] = {
590     { 10, EventHandler::WRITE, 2345 },
591     { 20, EventHandler::READ, 0 },
592     { 35, EventHandler::WRITE, 200 },
593     { 45, EventHandler::WRITE, 15 },
594     { 55, EventHandler::READ, 0 },
595     { 120, EventHandler::WRITE, 2345 },
596     { 0, 0, 0 },
597   };
598   scheduleEvents(&eb, sp[1], events);
599
600   // Schedule a timeout to unregister the handler
601   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
602
603   // Loop
604   TimePoint start;
605   eb.loop();
606   TimePoint end;
607
608   ASSERT_EQ(handler.log.size(), 6);
609
610   // Since we didn't fill up the write buffer immediately, there should
611   // be an immediate event for writability.
612   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
613   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
614   ASSERT_EQ(handler.log[0].bytesRead, 0);
615   ASSERT_GT(handler.log[0].bytesWritten, 0);
616
617   // Events 1 through 5 should correspond to the scheduled events
618   for (int n = 1; n < 6; ++n) {
619     ScheduledEvent* event = &events[n - 1];
620     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
621                     milliseconds(event->milliseconds));
622     if (event->events == EventHandler::READ) {
623       ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
624       ASSERT_EQ(handler.log[n].bytesRead, 0);
625       ASSERT_GT(handler.log[n].bytesWritten, 0);
626     } else {
627       ASSERT_EQ(handler.log[n].events, EventHandler::READ);
628       ASSERT_EQ(handler.log[n].bytesRead, event->length);
629       ASSERT_EQ(handler.log[n].bytesWritten, 0);
630     }
631   }
632
633   // The timeout should have unregistered the handler before the last write.
634   // Make sure that data is still waiting to be read
635   size_t bytesRemaining = readUntilEmpty(sp[0]);
636   ASSERT_EQ(bytesRemaining, events[5].length);
637 }
638
639
640 class PartialReadHandler : public TestHandler {
641  public:
642   PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
643     : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
644
645   virtual void handlerReady(uint16_t events) noexcept {
646     assert(events == EventHandler::READ);
647     ssize_t bytesRead = readFromFD(fd_, readLength_);
648     log.push_back(EventRecord(events, bytesRead, 0));
649   }
650
651  private:
652   int fd_;
653   size_t readLength_;
654 };
655
656 /**
657  * Test reading only part of the available data when a read event is fired.
658  * When PERSIST is used, make sure the handler gets notified again the next
659  * time around the loop.
660  */
661 TEST(EventBaseTest, ReadPartial) {
662   EventBase eb;
663   SocketPair sp;
664
665   // Register for read events
666   size_t readLength = 100;
667   PartialReadHandler handler(&eb, sp[0], readLength);
668   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
669
670   // Register a timeout to perform a single write,
671   // with more data than PartialReadHandler will read at once
672   ScheduledEvent events[] = {
673     { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) },
674     { 0, 0, 0 },
675   };
676   scheduleEvents(&eb, sp[1], events);
677
678   // Schedule a timeout to unregister the handler
679   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
680
681   // Loop
682   TimePoint start;
683   eb.loop();
684   TimePoint end;
685
686   ASSERT_EQ(handler.log.size(), 4);
687
688   // The first 3 invocations should read readLength bytes each
689   for (int n = 0; n < 3; ++n) {
690     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
691     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
692                     milliseconds(events[0].milliseconds));
693     ASSERT_EQ(handler.log[n].bytesRead, readLength);
694     ASSERT_EQ(handler.log[n].bytesWritten, 0);
695   }
696   // The last read only has readLength/2 bytes
697   ASSERT_EQ(handler.log[3].events, EventHandler::READ);
698   T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
699                   milliseconds(events[0].milliseconds));
700   ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
701   ASSERT_EQ(handler.log[3].bytesWritten, 0);
702 }
703
704
705 class PartialWriteHandler : public TestHandler {
706  public:
707   PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
708     : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
709
710   virtual void handlerReady(uint16_t events) noexcept {
711     assert(events == EventHandler::WRITE);
712     ssize_t bytesWritten = writeToFD(fd_, writeLength_);
713     log.push_back(EventRecord(events, 0, bytesWritten));
714   }
715
716  private:
717   int fd_;
718   size_t writeLength_;
719 };
720
721 /**
722  * Test writing without completely filling up the write buffer when the fd
723  * becomes writable.  When PERSIST is used, make sure the handler gets
724  * notified again the next time around the loop.
725  */
726 TEST(EventBaseTest, WritePartial) {
727   EventBase eb;
728   SocketPair sp;
729
730   // Fill up the write buffer before starting
731   size_t initialBytesWritten = writeUntilFull(sp[0]);
732
733   // Register for write events
734   size_t writeLength = 100;
735   PartialWriteHandler handler(&eb, sp[0], writeLength);
736   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
737
738   // Register a timeout to read, so that more data can be written
739   ScheduledEvent events[] = {
740     { 10, EventHandler::READ, 0 },
741     { 0, 0, 0 },
742   };
743   scheduleEvents(&eb, sp[1], events);
744
745   // Schedule a timeout to unregister the handler
746   eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
747
748   // Loop
749   TimePoint start;
750   eb.loop();
751   TimePoint end;
752
753   // Depending on how big the socket buffer is, there will be multiple writes
754   // Only check the first 5
755   int numChecked = 5;
756   ASSERT_GE(handler.log.size(), numChecked);
757   ASSERT_EQ(events[0].result, initialBytesWritten);
758
759   // The first 3 invocations should read writeLength bytes each
760   for (int n = 0; n < numChecked; ++n) {
761     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
762     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
763                     milliseconds(events[0].milliseconds));
764     ASSERT_EQ(handler.log[n].bytesRead, 0);
765     ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
766   }
767 }
768
769
770 /**
771  * Test destroying a registered EventHandler
772  */
773 TEST(EventBaseTest, DestroyHandler) {
774   class DestroyHandler : public AsyncTimeout {
775    public:
776     DestroyHandler(EventBase* eb, EventHandler* h)
777       : AsyncTimeout(eb)
778       , handler_(h) {}
779
780     virtual void timeoutExpired() noexcept {
781       delete handler_;
782     }
783
784    private:
785     EventHandler* handler_;
786   };
787
788   EventBase eb;
789   SocketPair sp;
790
791   // Fill up the write buffer before starting
792   size_t initialBytesWritten = writeUntilFull(sp[0]);
793
794   // Register for write events
795   TestHandler* handler = new TestHandler(&eb, sp[0]);
796   handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
797
798   // After 10ms, read some data, so that the handler
799   // will be notified that it can write.
800   eb.runAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
801                    10);
802
803   // Start a timer to destroy the handler after 25ms
804   // This mainly just makes sure the code doesn't break or assert
805   DestroyHandler dh(&eb, handler);
806   dh.scheduleTimeout(25);
807
808   TimePoint start;
809   eb.loop();
810   TimePoint end;
811
812   // Make sure the EventHandler was uninstalled properly when it was
813   // destroyed, and the EventBase loop exited
814   T_CHECK_TIMEOUT(start, end, milliseconds(25));
815
816   // Make sure that the handler wrote data to the socket
817   // before it was destroyed
818   size_t bytesRemaining = readUntilEmpty(sp[1]);
819   ASSERT_GT(bytesRemaining, 0);
820 }
821
822
823 ///////////////////////////////////////////////////////////////////////////
824 // Tests for timeout events
825 ///////////////////////////////////////////////////////////////////////////
826
827 TEST(EventBaseTest, RunAfterDelay) {
828   EventBase eb;
829
830   TimePoint timestamp1(false);
831   TimePoint timestamp2(false);
832   TimePoint timestamp3(false);
833   eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
834   eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
835   eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
836
837   TimePoint start;
838   eb.loop();
839   TimePoint end;
840
841   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
842   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
843   T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
844   T_CHECK_TIMEOUT(start, end, milliseconds(40));
845 }
846
847 /**
848  * Test the behavior of runAfterDelay() when some timeouts are
849  * still scheduled when the EventBase is destroyed.
850  */
851 TEST(EventBaseTest, RunAfterDelayDestruction) {
852   TimePoint timestamp1(false);
853   TimePoint timestamp2(false);
854   TimePoint timestamp3(false);
855   TimePoint timestamp4(false);
856   TimePoint start(false);
857   TimePoint end(false);
858
859   {
860     EventBase eb;
861
862     // Run two normal timeouts
863     eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
864     eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
865
866     // Schedule a timeout to stop the event loop after 40ms
867     eb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
868
869     // Schedule 2 timeouts that would fire after the event loop stops
870     eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
871     eb.runAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
872
873     start.reset();
874     eb.loop();
875     end.reset();
876   }
877
878   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
879   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
880   T_CHECK_TIMEOUT(start, end, milliseconds(40));
881
882   ASSERT_TRUE(timestamp3.isUnset());
883   ASSERT_TRUE(timestamp4.isUnset());
884
885   // Ideally this test should be run under valgrind to ensure that no
886   // memory is leaked.
887 }
888
889 class TestTimeout : public AsyncTimeout {
890  public:
891   explicit TestTimeout(EventBase* eventBase)
892     : AsyncTimeout(eventBase)
893     , timestamp(false) {}
894
895   virtual void timeoutExpired() noexcept {
896     timestamp.reset();
897   }
898
899   TimePoint timestamp;
900 };
901
902 TEST(EventBaseTest, BasicTimeouts) {
903   EventBase eb;
904
905   TestTimeout t1(&eb);
906   TestTimeout t2(&eb);
907   TestTimeout t3(&eb);
908   t1.scheduleTimeout(10);
909   t2.scheduleTimeout(20);
910   t3.scheduleTimeout(40);
911
912   TimePoint start;
913   eb.loop();
914   TimePoint end;
915
916   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
917   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
918   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
919   T_CHECK_TIMEOUT(start, end, milliseconds(40));
920 }
921
922 class ReschedulingTimeout : public AsyncTimeout {
923  public:
924   ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
925     : AsyncTimeout(evb)
926     , timeouts_(timeouts)
927     , iterator_(timeouts_.begin()) {}
928
929   void start() {
930     reschedule();
931   }
932
933   virtual void timeoutExpired() noexcept {
934     timestamps.push_back(TimePoint());
935     reschedule();
936   }
937
938   void reschedule() {
939     if (iterator_ != timeouts_.end()) {
940       uint32_t timeout = *iterator_;
941       ++iterator_;
942       scheduleTimeout(timeout);
943     }
944   }
945
946   vector<TimePoint> timestamps;
947
948  private:
949   vector<uint32_t> timeouts_;
950   vector<uint32_t>::const_iterator iterator_;
951 };
952
953 /**
954  * Test rescheduling the same timeout multiple times
955  */
956 TEST(EventBaseTest, ReuseTimeout) {
957   EventBase eb;
958
959   vector<uint32_t> timeouts;
960   timeouts.push_back(10);
961   timeouts.push_back(30);
962   timeouts.push_back(15);
963
964   ReschedulingTimeout t(&eb, timeouts);
965   t.start();
966
967   TimePoint start;
968   eb.loop();
969   TimePoint end;
970
971   // Use a higher tolerance than usual.  We're waiting on 3 timeouts
972   // consecutively.  In general, each timeout may go over by a few
973   // milliseconds, and we're tripling this error by witing on 3 timeouts.
974   milliseconds tolerance{6};
975
976   ASSERT_EQ(timeouts.size(), t.timestamps.size());
977   uint32_t total = 0;
978   for (size_t n = 0; n < timeouts.size(); ++n) {
979     total += timeouts[n];
980     T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
981   }
982   T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
983 }
984
985 /**
986  * Test rescheduling a timeout before it has fired
987  */
988 TEST(EventBaseTest, RescheduleTimeout) {
989   EventBase eb;
990
991   TestTimeout t1(&eb);
992   TestTimeout t2(&eb);
993   TestTimeout t3(&eb);
994
995   t1.scheduleTimeout(15);
996   t2.scheduleTimeout(30);
997   t3.scheduleTimeout(30);
998
999   auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1000       &AsyncTimeout::scheduleTimeout);
1001
1002   // after 10ms, reschedule t2 to run sooner than originally scheduled
1003   eb.runAfterDelay(std::bind(f, &t2, 10), 10);
1004   // after 10ms, reschedule t3 to run later than originally scheduled
1005   eb.runAfterDelay(std::bind(f, &t3, 40), 10);
1006
1007   TimePoint start;
1008   eb.loop();
1009   TimePoint end;
1010
1011   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1012   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1013   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1014   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1015 }
1016
1017 /**
1018  * Test cancelling a timeout
1019  */
1020 TEST(EventBaseTest, CancelTimeout) {
1021   EventBase eb;
1022
1023   vector<uint32_t> timeouts;
1024   timeouts.push_back(10);
1025   timeouts.push_back(30);
1026   timeouts.push_back(25);
1027
1028   ReschedulingTimeout t(&eb, timeouts);
1029   t.start();
1030   eb.runAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1031
1032   TimePoint start;
1033   eb.loop();
1034   TimePoint end;
1035
1036   ASSERT_EQ(t.timestamps.size(), 2);
1037   T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1038   T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1039   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1040 }
1041
1042 /**
1043  * Test destroying a scheduled timeout object
1044  */
1045 TEST(EventBaseTest, DestroyTimeout) {
1046   class DestroyTimeout : public AsyncTimeout {
1047    public:
1048     DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1049       : AsyncTimeout(eb)
1050       , timeout_(t) {}
1051
1052     virtual void timeoutExpired() noexcept {
1053       delete timeout_;
1054     }
1055
1056    private:
1057     AsyncTimeout* timeout_;
1058   };
1059
1060   EventBase eb;
1061
1062   TestTimeout* t1 = new TestTimeout(&eb);
1063   t1->scheduleTimeout(30);
1064
1065   DestroyTimeout dt(&eb, t1);
1066   dt.scheduleTimeout(10);
1067
1068   TimePoint start;
1069   eb.loop();
1070   TimePoint end;
1071
1072   T_CHECK_TIMEOUT(start, end, milliseconds(10));
1073 }
1074
1075
1076 ///////////////////////////////////////////////////////////////////////////
1077 // Test for runInThreadTestFunc()
1078 ///////////////////////////////////////////////////////////////////////////
1079
1080 struct RunInThreadData {
1081   RunInThreadData(int numThreads, int opsPerThread)
1082     : opsPerThread(opsPerThread)
1083     , opsToGo(numThreads*opsPerThread) {}
1084
1085   EventBase evb;
1086   deque< pair<int, int> > values;
1087
1088   int opsPerThread;
1089   int opsToGo;
1090 };
1091
1092 struct RunInThreadArg {
1093   RunInThreadArg(RunInThreadData* data,
1094                  int threadId,
1095                  int value)
1096     : data(data)
1097     , thread(threadId)
1098     , value(value) {}
1099
1100   RunInThreadData* data;
1101   int thread;
1102   int value;
1103 };
1104
1105 void runInThreadTestFunc(RunInThreadArg* arg) {
1106   arg->data->values.push_back(make_pair(arg->thread, arg->value));
1107   RunInThreadData* data = arg->data;
1108   delete arg;
1109
1110   if(--data->opsToGo == 0) {
1111     // Break out of the event base loop if we are the last thread running
1112     data->evb.terminateLoopSoon();
1113   }
1114 }
1115
1116 TEST(EventBaseTest, RunInThread) {
1117   uint32_t numThreads = 50;
1118   uint32_t opsPerThread = 100;
1119   RunInThreadData data(numThreads, opsPerThread);
1120
1121   deque<std::thread> threads;
1122   for (uint32_t i = 0; i < numThreads; ++i) {
1123     threads.emplace_back([i, &data] {
1124         for (int n = 0; n < data.opsPerThread; ++n) {
1125           RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1126           data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1127           usleep(10);
1128         }
1129       });
1130   }
1131
1132   // Add a timeout event to run after 3 seconds.
1133   // Otherwise loop() will return immediately since there are no events to run.
1134   // Once the last thread exits, it will stop the loop().  However, this
1135   // timeout also stops the loop in case there is a bug performing the normal
1136   // stop.
1137   data.evb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1138                          3000);
1139
1140   TimePoint start;
1141   data.evb.loop();
1142   TimePoint end;
1143
1144   // Verify that the loop exited because all threads finished and requested it
1145   // to stop.  This should happen much sooner than the 3 second timeout.
1146   // Assert that it happens in under a second.  (This is still tons of extra
1147   // padding.)
1148
1149   auto timeTaken = std::chrono::duration_cast<milliseconds>(
1150     end.getTime() - start.getTime());
1151   ASSERT_LT(timeTaken.count(), 1000);
1152   VLOG(11) << "Time taken: " << timeTaken.count();
1153
1154   // Verify that we have all of the events from every thread
1155   int expectedValues[numThreads];
1156   for (uint32_t n = 0; n < numThreads; ++n) {
1157     expectedValues[n] = 0;
1158   }
1159   for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1160        it != data.values.end();
1161        ++it) {
1162     int threadID = it->first;
1163     int value = it->second;
1164     ASSERT_EQ(expectedValues[threadID], value);
1165     ++expectedValues[threadID];
1166   }
1167   for (uint32_t n = 0; n < numThreads; ++n) {
1168     ASSERT_EQ(expectedValues[n], opsPerThread);
1169   }
1170
1171   // Wait on all of the threads.
1172   for (auto& thread: threads) {
1173     thread.join();
1174   }
1175 }
1176
1177 //  This test simulates some calls, and verifies that the waiting happens by
1178 //  triggering what otherwise would be race conditions, and trying to detect
1179 //  whether any of the race conditions happened.
1180 TEST(EventBaseTest, RunInEventLoopThreadAndWait) {
1181   const size_t c = 256;
1182   vector<atomic<size_t>> atoms(c);
1183   for (size_t i = 0; i < c; ++i) {
1184     auto& atom = atoms.at(i);
1185     atom = 0;
1186   }
1187   vector<thread> threads(c);
1188   for (size_t i = 0; i < c; ++i) {
1189     auto& atom = atoms.at(i);
1190     auto& th = threads.at(i);
1191     th = thread([&atom] {
1192         EventBase eb;
1193         auto ebth = thread([&]{ eb.loopForever(); });
1194         eb.waitUntilRunning();
1195         eb.runInEventBaseThreadAndWait([&] {
1196           size_t x = 0;
1197           atom.compare_exchange_weak(
1198               x, 1, std::memory_order_release, std::memory_order_relaxed);
1199         });
1200         size_t x = 0;
1201         atom.compare_exchange_weak(
1202             x, 2, std::memory_order_release, std::memory_order_relaxed);
1203         eb.terminateLoopSoon();
1204         ebth.join();
1205     });
1206   }
1207   for (size_t i = 0; i < c; ++i) {
1208     auto& th = threads.at(i);
1209     th.join();
1210   }
1211   size_t sum = 0;
1212   for (auto& atom : atoms) sum += atom;
1213   EXPECT_EQ(c, sum);
1214 }
1215
1216 ///////////////////////////////////////////////////////////////////////////
1217 // Tests for runInLoop()
1218 ///////////////////////////////////////////////////////////////////////////
1219
1220 class CountedLoopCallback : public EventBase::LoopCallback {
1221  public:
1222   CountedLoopCallback(EventBase* eventBase,
1223                       unsigned int count,
1224                       std::function<void()> action =
1225                         std::function<void()>())
1226     : eventBase_(eventBase)
1227     , count_(count)
1228     , action_(action) {}
1229
1230   virtual void runLoopCallback() noexcept {
1231     --count_;
1232     if (count_ > 0) {
1233       eventBase_->runInLoop(this);
1234     } else if (action_) {
1235       action_();
1236     }
1237   }
1238
1239   unsigned int getCount() const {
1240     return count_;
1241   }
1242
1243  private:
1244   EventBase* eventBase_;
1245   unsigned int count_;
1246   std::function<void()> action_;
1247 };
1248
1249 // Test that EventBase::loop() doesn't exit while there are
1250 // still LoopCallbacks remaining to be invoked.
1251 TEST(EventBaseTest, RepeatedRunInLoop) {
1252   EventBase eventBase;
1253
1254   CountedLoopCallback c(&eventBase, 10);
1255   eventBase.runInLoop(&c);
1256   // The callback shouldn't have run immediately
1257   ASSERT_EQ(c.getCount(), 10);
1258   eventBase.loop();
1259
1260   // loop() should loop until the CountedLoopCallback stops
1261   // re-installing itself.
1262   ASSERT_EQ(c.getCount(), 0);
1263 }
1264
1265 // Test runInLoop() calls with terminateLoopSoon()
1266 TEST(EventBaseTest, RunInLoopStopLoop) {
1267   EventBase eventBase;
1268
1269   CountedLoopCallback c1(&eventBase, 20);
1270   CountedLoopCallback c2(&eventBase, 10,
1271                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1272
1273   eventBase.runInLoop(&c1);
1274   eventBase.runInLoop(&c2);
1275   ASSERT_EQ(c1.getCount(), 20);
1276   ASSERT_EQ(c2.getCount(), 10);
1277
1278   eventBase.loopForever();
1279
1280   // c2 should have stopped the loop after 10 iterations
1281   ASSERT_EQ(c2.getCount(), 0);
1282
1283   // We allow the EventBase to run the loop callbacks in whatever order it
1284   // chooses.  We'll accept c1's count being either 10 (if the loop terminated
1285   // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1286   // before c1 ran).
1287   //
1288   // (With the current code, c1 will always run 10 times, but we don't consider
1289   // this a hard API requirement.)
1290   ASSERT_GE(c1.getCount(), 10);
1291   ASSERT_LE(c1.getCount(), 11);
1292 }
1293
1294 // Test cancelling runInLoop() callbacks
1295 TEST(EventBaseTest, CancelRunInLoop) {
1296   EventBase eventBase;
1297
1298   CountedLoopCallback c1(&eventBase, 20);
1299   CountedLoopCallback c2(&eventBase, 20);
1300   CountedLoopCallback c3(&eventBase, 20);
1301
1302   std::function<void()> cancelC1Action =
1303     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1304   std::function<void()> cancelC2Action =
1305     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1306
1307   CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1308   CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1309
1310   // Install cancelC1 after c1
1311   eventBase.runInLoop(&c1);
1312   eventBase.runInLoop(&cancelC1);
1313
1314   // Install cancelC2 before c2
1315   eventBase.runInLoop(&cancelC2);
1316   eventBase.runInLoop(&c2);
1317
1318   // Install c3
1319   eventBase.runInLoop(&c3);
1320
1321   ASSERT_EQ(c1.getCount(), 20);
1322   ASSERT_EQ(c2.getCount(), 20);
1323   ASSERT_EQ(c3.getCount(), 20);
1324   ASSERT_EQ(cancelC1.getCount(), 10);
1325   ASSERT_EQ(cancelC2.getCount(), 10);
1326
1327   // Run the loop
1328   eventBase.loop();
1329
1330   // cancelC1 and cancelC3 should have both fired after 10 iterations and
1331   // stopped re-installing themselves
1332   ASSERT_EQ(cancelC1.getCount(), 0);
1333   ASSERT_EQ(cancelC2.getCount(), 0);
1334   // c3 should have continued on for the full 20 iterations
1335   ASSERT_EQ(c3.getCount(), 0);
1336
1337   // c1 and c2 should have both been cancelled on the 10th iteration.
1338   //
1339   // Callbacks are always run in the order they are installed,
1340   // so c1 should have fired 10 times, and been canceled after it ran on the
1341   // 10th iteration.  c2 should have only fired 9 times, because cancelC2 will
1342   // have run before it on the 10th iteration, and cancelled it before it
1343   // fired.
1344   ASSERT_EQ(c1.getCount(), 10);
1345   ASSERT_EQ(c2.getCount(), 11);
1346 }
1347
1348 class TerminateTestCallback : public EventBase::LoopCallback,
1349                               public EventHandler {
1350  public:
1351   TerminateTestCallback(EventBase* eventBase, int fd)
1352     : EventHandler(eventBase, fd),
1353       eventBase_(eventBase),
1354       loopInvocations_(0),
1355       maxLoopInvocations_(0),
1356       eventInvocations_(0),
1357       maxEventInvocations_(0) {}
1358
1359   void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1360     loopInvocations_ = 0;
1361     maxLoopInvocations_ = maxLoopInvocations;
1362     eventInvocations_ = 0;
1363     maxEventInvocations_ = maxEventInvocations;
1364
1365     cancelLoopCallback();
1366     unregisterHandler();
1367   }
1368
1369   virtual void handlerReady(uint16_t events) noexcept {
1370     // We didn't register with PERSIST, so we will have been automatically
1371     // unregistered already.
1372     ASSERT_FALSE(isHandlerRegistered());
1373
1374     ++eventInvocations_;
1375     if (eventInvocations_ >= maxEventInvocations_) {
1376       return;
1377     }
1378
1379     eventBase_->runInLoop(this);
1380   }
1381   virtual void runLoopCallback() noexcept {
1382     ++loopInvocations_;
1383     if (loopInvocations_ >= maxLoopInvocations_) {
1384       return;
1385     }
1386
1387     registerHandler(READ);
1388   }
1389
1390   uint32_t getLoopInvocations() const {
1391     return loopInvocations_;
1392   }
1393   uint32_t getEventInvocations() const {
1394     return eventInvocations_;
1395   }
1396
1397  private:
1398   EventBase* eventBase_;
1399   uint32_t loopInvocations_;
1400   uint32_t maxLoopInvocations_;
1401   uint32_t eventInvocations_;
1402   uint32_t maxEventInvocations_;
1403 };
1404
1405 /**
1406  * Test that EventBase::loop() correctly detects when there are no more events
1407  * left to run.
1408  *
1409  * This uses a single callback, which alternates registering itself as a loop
1410  * callback versus a EventHandler callback.  This exercises a regression where
1411  * EventBase::loop() incorrectly exited if there were no more fd handlers
1412  * registered, but a loop callback installed a new fd handler.
1413  */
1414 TEST(EventBaseTest, LoopTermination) {
1415   EventBase eventBase;
1416
1417   // Open a pipe and close the write end,
1418   // so the read endpoint will be readable
1419   int pipeFds[2];
1420   int rc = pipe(pipeFds);
1421   ASSERT_EQ(rc, 0);
1422   close(pipeFds[1]);
1423   TerminateTestCallback callback(&eventBase, pipeFds[0]);
1424
1425   // Test once where the callback will exit after a loop callback
1426   callback.reset(10, 100);
1427   eventBase.runInLoop(&callback);
1428   eventBase.loop();
1429   ASSERT_EQ(callback.getLoopInvocations(), 10);
1430   ASSERT_EQ(callback.getEventInvocations(), 9);
1431
1432   // Test once where the callback will exit after an fd event callback
1433   callback.reset(100, 7);
1434   eventBase.runInLoop(&callback);
1435   eventBase.loop();
1436   ASSERT_EQ(callback.getLoopInvocations(), 7);
1437   ASSERT_EQ(callback.getEventInvocations(), 7);
1438
1439   close(pipeFds[0]);
1440 }
1441
1442 ///////////////////////////////////////////////////////////////////////////
1443 // Tests for latency calculations
1444 ///////////////////////////////////////////////////////////////////////////
1445
1446 class IdleTimeTimeoutSeries : public AsyncTimeout {
1447
1448  public:
1449
1450   explicit IdleTimeTimeoutSeries(EventBase *base,
1451                                  std::deque<std::uint64_t>& timeout) :
1452     AsyncTimeout(base),
1453     timeouts_(0),
1454     timeout_(timeout) {
1455       scheduleTimeout(1);
1456     }
1457
1458   virtual ~IdleTimeTimeoutSeries() {}
1459
1460   void timeoutExpired() noexcept {
1461     ++timeouts_;
1462
1463     if(timeout_.empty()){
1464       cancelTimeout();
1465     } else {
1466       uint64_t sleepTime = timeout_.front();
1467       timeout_.pop_front();
1468       if (sleepTime) {
1469         usleep(sleepTime);
1470       }
1471       scheduleTimeout(1);
1472     }
1473   }
1474
1475   int getTimeouts() const {
1476     return timeouts_;
1477   }
1478
1479  private:
1480   int timeouts_;
1481   std::deque<uint64_t>& timeout_;
1482 };
1483
1484 /**
1485  * Verify that idle time is correctly accounted for when decaying our loop
1486  * time.
1487  *
1488  * This works by creating a high loop time (via usleep), expecting a latency
1489  * callback with known value, and then scheduling a timeout for later. This
1490  * later timeout is far enough in the future that the idle time should have
1491  * caused the loop time to decay.
1492  */
1493 TEST(EventBaseTest, IdleTime) {
1494   EventBase eventBase;
1495   eventBase.setLoadAvgMsec(1000);
1496   eventBase.resetLoadAvg(5900.0);
1497   std::deque<uint64_t> timeouts0(4, 8080);
1498   timeouts0.push_front(8000);
1499   timeouts0.push_back(14000);
1500   IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1501   std::deque<uint64_t> timeouts(20, 20);
1502   std::unique_ptr<IdleTimeTimeoutSeries> tos;
1503   int64_t testStart = duration_cast<microseconds>(
1504     std::chrono::steady_clock::now().time_since_epoch()).count();
1505   bool hostOverloaded = false;
1506
1507   int latencyCallbacks = 0;
1508   eventBase.setMaxLatency(6000, [&]() {
1509     ++latencyCallbacks;
1510
1511     switch (latencyCallbacks) {
1512     case 1:
1513       if (tos0.getTimeouts() < 6) {
1514         // This could only happen if the host this test is running
1515         // on is heavily loaded.
1516         int64_t maxLatencyReached = duration_cast<microseconds>(
1517             std::chrono::steady_clock::now().time_since_epoch()).count();
1518         ASSERT_LE(43800, maxLatencyReached - testStart);
1519         hostOverloaded = true;
1520         break;
1521       }
1522       ASSERT_EQ(6, tos0.getTimeouts());
1523       ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1524       ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1525       tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1526       break;
1527
1528     default:
1529       FAIL() << "Unexpected latency callback";
1530       break;
1531     }
1532   });
1533
1534   // Kick things off with an "immedite" timeout
1535   tos0.scheduleTimeout(1);
1536
1537   eventBase.loop();
1538
1539   if (hostOverloaded) {
1540     return;
1541   }
1542
1543   ASSERT_EQ(1, latencyCallbacks);
1544   ASSERT_EQ(7, tos0.getTimeouts());
1545   ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1546   ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1547   ASSERT_TRUE(!!tos);
1548   ASSERT_EQ(21, tos->getTimeouts());
1549 }
1550
1551 /**
1552  * Test that thisLoop functionality works with terminateLoopSoon
1553  */
1554 TEST(EventBaseTest, ThisLoop) {
1555   EventBase eb;
1556   bool runInLoop = false;
1557   bool runThisLoop = false;
1558
1559   eb.runInLoop([&](){
1560       eb.terminateLoopSoon();
1561       eb.runInLoop([&]() {
1562           runInLoop = true;
1563         });
1564       eb.runInLoop([&]() {
1565           runThisLoop = true;
1566         }, true);
1567     }, true);
1568   eb.loopForever();
1569
1570   // Should not work
1571   ASSERT_FALSE(runInLoop);
1572   // Should work with thisLoop
1573   ASSERT_TRUE(runThisLoop);
1574 }
1575
1576 TEST(EventBaseTest, EventBaseThreadLoop) {
1577   EventBase base;
1578   bool ran = false;
1579
1580   base.runInEventBaseThread([&](){
1581     ran = true;
1582   });
1583   base.loop();
1584
1585   ASSERT_EQ(true, ran);
1586 }
1587
1588 TEST(EventBaseTest, EventBaseThreadName) {
1589   EventBase base;
1590   base.setName("foo");
1591   base.loop();
1592
1593 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1594   char name[16];
1595   pthread_getname_np(pthread_self(), name, 16);
1596   ASSERT_EQ(0, strcmp("foo", name));
1597 #endif
1598 }
1599
1600 TEST(EventBaseTest, RunBeforeLoop) {
1601   EventBase base;
1602   CountedLoopCallback cb(&base, 1, [&](){
1603     base.terminateLoopSoon();
1604   });
1605   base.runBeforeLoop(&cb);
1606   base.loopForever();
1607   ASSERT_EQ(cb.getCount(), 0);
1608 }
1609
1610 TEST(EventBaseTest, RunBeforeLoopWait) {
1611   EventBase base;
1612   CountedLoopCallback cb(&base, 1);
1613   base.runAfterDelay([&](){
1614       base.terminateLoopSoon();
1615     }, 500);
1616   base.runBeforeLoop(&cb);
1617   base.loopForever();
1618
1619   // Check that we only ran once, and did not loop multiple times.
1620   ASSERT_EQ(cb.getCount(), 0);
1621 }
1622
1623 class PipeHandler : public EventHandler {
1624 public:
1625   PipeHandler(EventBase* eventBase, int fd)
1626     : EventHandler(eventBase, fd) {}
1627
1628   void handlerReady(uint16_t events) noexcept {
1629     abort();
1630   }
1631 };
1632
1633 TEST(EventBaseTest, StopBeforeLoop) {
1634   EventBase evb;
1635
1636   // Give the evb something to do.
1637   int p[2];
1638   ASSERT_EQ(0, pipe(p));
1639   PipeHandler handler(&evb, p[0]);
1640   handler.registerHandler(EventHandler::READ);
1641
1642   // It's definitely not running yet
1643   evb.terminateLoopSoon();
1644
1645   // let it run, it should exit quickly.
1646   std::thread t([&] { evb.loop(); });
1647   t.join();
1648
1649   handler.unregisterHandler();
1650   close(p[0]);
1651   close(p[1]);
1652
1653   SUCCEED();
1654 }
1655
1656 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1657   bool ran = false;
1658
1659   {
1660     EventBase base;
1661     base.runInEventBaseThread([&](){
1662       ran = true;
1663     });
1664   }
1665
1666   ASSERT_TRUE(ran);
1667 }