ce1c6c770d3f279dd351fc477f72a3c90ff95cf3
[folly.git] / folly / io / async / test / EventBaseTest.cpp
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements. See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership. The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License. You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied. See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21 #include <folly/Memory.h>
22 #include <folly/ScopeGuard.h>
23
24 #include <folly/io/async/AsyncTimeout.h>
25 #include <folly/io/async/EventBase.h>
26 #include <folly/io/async/EventHandler.h>
27 #include <folly/io/async/test/SocketPair.h>
28 #include <folly/io/async/test/Util.h>
29 #include <folly/portability/Unistd.h>
30
31 #include <folly/futures/Promise.h>
32
33 #include <atomic>
34 #include <iostream>
35 #include <memory>
36 #include <thread>
37
38 using std::atomic;
39 using std::deque;
40 using std::pair;
41 using std::vector;
42 using std::unique_ptr;
43 using std::thread;
44 using std::make_pair;
45 using std::cerr;
46 using std::endl;
47 using std::chrono::milliseconds;
48 using std::chrono::microseconds;
49 using std::chrono::duration_cast;
50
51 using namespace std::chrono_literals;
52
53 using namespace folly;
54
55 ///////////////////////////////////////////////////////////////////////////
56 // Tests for read and write events
57 ///////////////////////////////////////////////////////////////////////////
58
59 enum { BUF_SIZE = 4096 };
60
61 ssize_t writeToFD(int fd, size_t length) {
62   // write an arbitrary amount of data to the fd
63   auto bufv = vector<char>(length);
64   auto buf = bufv.data();
65   memset(buf, 'a', length);
66   ssize_t rc = write(fd, buf, length);
67   CHECK_EQ(rc, length);
68   return rc;
69 }
70
71 size_t writeUntilFull(int fd) {
72   // Write to the fd until EAGAIN is returned
73   size_t bytesWritten = 0;
74   char buf[BUF_SIZE];
75   memset(buf, 'a', sizeof(buf));
76   while (true) {
77     ssize_t rc = write(fd, buf, sizeof(buf));
78     if (rc < 0) {
79       CHECK_EQ(errno, EAGAIN);
80       break;
81     } else {
82       bytesWritten += rc;
83     }
84   }
85   return bytesWritten;
86 }
87
88 ssize_t readFromFD(int fd, size_t length) {
89   // write an arbitrary amount of data to the fd
90   auto buf = vector<char>(length);
91   return read(fd, buf.data(), length);
92 }
93
94 size_t readUntilEmpty(int fd) {
95   // Read from the fd until EAGAIN is returned
96   char buf[BUF_SIZE];
97   size_t bytesRead = 0;
98   while (true) {
99     int rc = read(fd, buf, sizeof(buf));
100     if (rc == 0) {
101       CHECK(false) << "unexpected EOF";
102     } else if (rc < 0) {
103       CHECK_EQ(errno, EAGAIN);
104       break;
105     } else {
106       bytesRead += rc;
107     }
108   }
109   return bytesRead;
110 }
111
112 void checkReadUntilEmpty(int fd, size_t expectedLength) {
113   ASSERT_EQ(readUntilEmpty(fd), expectedLength);
114 }
115
116 struct ScheduledEvent {
117   int milliseconds;
118   uint16_t events;
119   size_t length;
120   ssize_t result;
121
122   void perform(int fd) {
123     if (events & EventHandler::READ) {
124       if (length == 0) {
125         result = readUntilEmpty(fd);
126       } else {
127         result = readFromFD(fd, length);
128       }
129     }
130     if (events & EventHandler::WRITE) {
131       if (length == 0) {
132         result = writeUntilFull(fd);
133       } else {
134         result = writeToFD(fd, length);
135       }
136     }
137   }
138 };
139
140 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
141   for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
142     eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
143                              ev->milliseconds);
144   }
145 }
146
147 class TestHandler : public EventHandler {
148  public:
149   TestHandler(EventBase* eventBase, int fd)
150     : EventHandler(eventBase, fd), fd_(fd) {}
151
152   void handlerReady(uint16_t events) noexcept override {
153     ssize_t bytesRead = 0;
154     ssize_t bytesWritten = 0;
155     if (events & READ) {
156       // Read all available data, so EventBase will stop calling us
157       // until new data becomes available
158       bytesRead = readUntilEmpty(fd_);
159     }
160     if (events & WRITE) {
161       // Write until the pipe buffer is full, so EventBase will stop calling
162       // us until the other end has read some data
163       bytesWritten = writeUntilFull(fd_);
164     }
165
166     log.emplace_back(events, bytesRead, bytesWritten);
167   }
168
169   struct EventRecord {
170     EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
171       : events(events)
172       , timestamp()
173       , bytesRead(bytesRead)
174       , bytesWritten(bytesWritten) {}
175
176     uint16_t events;
177     TimePoint timestamp;
178     ssize_t bytesRead;
179     ssize_t bytesWritten;
180   };
181
182   deque<EventRecord> log;
183
184  private:
185   int fd_;
186 };
187
188 /**
189  * Test a READ event
190  */
191 TEST(EventBaseTest, ReadEvent) {
192   EventBase eb;
193   SocketPair sp;
194
195   // Register for read events
196   TestHandler handler(&eb, sp[0]);
197   handler.registerHandler(EventHandler::READ);
198
199   // Register timeouts to perform two write events
200   ScheduledEvent events[] = {
201     { 10, EventHandler::WRITE, 2345, 0 },
202     { 160, EventHandler::WRITE, 99, 0 },
203     { 0, 0, 0, 0 },
204   };
205   scheduleEvents(&eb, sp[1], events);
206
207   // Loop
208   TimePoint start;
209   eb.loop();
210   TimePoint end;
211
212   // Since we didn't use the EventHandler::PERSIST flag, the handler should
213   // have received the first read, then unregistered itself.  Check that only
214   // the first chunk of data was received.
215   ASSERT_EQ(handler.log.size(), 1);
216   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
217   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
218                   milliseconds(events[0].milliseconds), milliseconds(90));
219   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
220   ASSERT_EQ(handler.log[0].bytesWritten, 0);
221   T_CHECK_TIMEOUT(start, end,
222                   milliseconds(events[1].milliseconds), milliseconds(30));
223
224   // Make sure the second chunk of data is still waiting to be read.
225   size_t bytesRemaining = readUntilEmpty(sp[0]);
226   ASSERT_EQ(bytesRemaining, events[1].length);
227 }
228
229 /**
230  * Test (READ | PERSIST)
231  */
232 TEST(EventBaseTest, ReadPersist) {
233   EventBase eb;
234   SocketPair sp;
235
236   // Register for read events
237   TestHandler handler(&eb, sp[0]);
238   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
239
240   // Register several timeouts to perform writes
241   ScheduledEvent events[] = {
242     { 10,  EventHandler::WRITE, 1024, 0 },
243     { 20,  EventHandler::WRITE, 2211, 0 },
244     { 30,  EventHandler::WRITE, 4096, 0 },
245     { 100, EventHandler::WRITE, 100,  0 },
246     { 0, 0, 0, 0 },
247   };
248   scheduleEvents(&eb, sp[1], events);
249
250   // Schedule a timeout to unregister the handler after the third write
251   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
252
253   // Loop
254   TimePoint start;
255   eb.loop();
256   TimePoint end;
257
258   // The handler should have received the first 3 events,
259   // then been unregistered after that.
260   ASSERT_EQ(handler.log.size(), 3);
261   for (int n = 0; n < 3; ++n) {
262     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
263     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
264                     milliseconds(events[n].milliseconds));
265     ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
266     ASSERT_EQ(handler.log[n].bytesWritten, 0);
267   }
268   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
269
270   // Make sure the data from the last write is still waiting to be read
271   size_t bytesRemaining = readUntilEmpty(sp[0]);
272   ASSERT_EQ(bytesRemaining, events[3].length);
273 }
274
275 /**
276  * Test registering for READ when the socket is immediately readable
277  */
278 TEST(EventBaseTest, ReadImmediate) {
279   EventBase eb;
280   SocketPair sp;
281
282   // Write some data to the socket so the other end will
283   // be immediately readable
284   size_t dataLength = 1234;
285   writeToFD(sp[1], dataLength);
286
287   // Register for read events
288   TestHandler handler(&eb, sp[0]);
289   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
290
291   // Register a timeout to perform another write
292   ScheduledEvent events[] = {
293     { 10, EventHandler::WRITE, 2345, 0 },
294     { 0, 0, 0, 0 },
295   };
296   scheduleEvents(&eb, sp[1], events);
297
298   // Schedule a timeout to unregister the handler
299   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
300
301   // Loop
302   TimePoint start;
303   eb.loop();
304   TimePoint end;
305
306   ASSERT_EQ(handler.log.size(), 2);
307
308   // There should have been 1 event for immediate readability
309   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
310   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
311   ASSERT_EQ(handler.log[0].bytesRead, dataLength);
312   ASSERT_EQ(handler.log[0].bytesWritten, 0);
313
314   // There should be another event after the timeout wrote more data
315   ASSERT_EQ(handler.log[1].events, EventHandler::READ);
316   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
317                   milliseconds(events[0].milliseconds));
318   ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
319   ASSERT_EQ(handler.log[1].bytesWritten, 0);
320
321   T_CHECK_TIMEOUT(start, end, milliseconds(20));
322 }
323
324 /**
325  * Test a WRITE event
326  */
327 TEST(EventBaseTest, WriteEvent) {
328   EventBase eb;
329   SocketPair sp;
330
331   // Fill up the write buffer before starting
332   size_t initialBytesWritten = writeUntilFull(sp[0]);
333
334   // Register for write events
335   TestHandler handler(&eb, sp[0]);
336   handler.registerHandler(EventHandler::WRITE);
337
338   // Register timeouts to perform two reads
339   ScheduledEvent events[] = {
340     { 10, EventHandler::READ, 0, 0 },
341     { 60, EventHandler::READ, 0, 0 },
342     { 0, 0, 0, 0 },
343   };
344   scheduleEvents(&eb, sp[1], events);
345
346   // Loop
347   TimePoint start;
348   eb.loop();
349   TimePoint end;
350
351   // Since we didn't use the EventHandler::PERSIST flag, the handler should
352   // have only been able to write once, then unregistered itself.
353   ASSERT_EQ(handler.log.size(), 1);
354   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
355   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
356                   milliseconds(events[0].milliseconds));
357   ASSERT_EQ(handler.log[0].bytesRead, 0);
358   ASSERT_GT(handler.log[0].bytesWritten, 0);
359   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
360
361   ASSERT_EQ(events[0].result, initialBytesWritten);
362   ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
363 }
364
365 /**
366  * Test (WRITE | PERSIST)
367  */
368 TEST(EventBaseTest, WritePersist) {
369   EventBase eb;
370   SocketPair sp;
371
372   // Fill up the write buffer before starting
373   size_t initialBytesWritten = writeUntilFull(sp[0]);
374
375   // Register for write events
376   TestHandler handler(&eb, sp[0]);
377   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
378
379   // Register several timeouts to read from the socket at several intervals
380   ScheduledEvent events[] = {
381     { 10,  EventHandler::READ, 0, 0 },
382     { 40,  EventHandler::READ, 0, 0 },
383     { 70,  EventHandler::READ, 0, 0 },
384     { 100, EventHandler::READ, 0, 0 },
385     { 0, 0, 0, 0 },
386   };
387   scheduleEvents(&eb, sp[1], events);
388
389   // Schedule a timeout to unregister the handler after the third read
390   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
391
392   // Loop
393   TimePoint start;
394   eb.loop();
395   TimePoint end;
396
397   // The handler should have received the first 3 events,
398   // then been unregistered after that.
399   ASSERT_EQ(handler.log.size(), 3);
400   ASSERT_EQ(events[0].result, initialBytesWritten);
401   for (int n = 0; n < 3; ++n) {
402     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
403     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
404                     milliseconds(events[n].milliseconds));
405     ASSERT_EQ(handler.log[n].bytesRead, 0);
406     ASSERT_GT(handler.log[n].bytesWritten, 0);
407     ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
408   }
409   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
410 }
411
412 /**
413  * Test registering for WRITE when the socket is immediately writable
414  */
415 TEST(EventBaseTest, WriteImmediate) {
416   EventBase eb;
417   SocketPair sp;
418
419   // Register for write events
420   TestHandler handler(&eb, sp[0]);
421   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
422
423   // Register a timeout to perform a read
424   ScheduledEvent events[] = {
425     { 10, EventHandler::READ, 0, 0 },
426     { 0, 0, 0, 0 },
427   };
428   scheduleEvents(&eb, sp[1], events);
429
430   // Schedule a timeout to unregister the handler
431   int64_t unregisterTimeout = 40;
432   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
433                    unregisterTimeout);
434
435   // Loop
436   TimePoint start;
437   eb.loop();
438   TimePoint end;
439
440   ASSERT_EQ(handler.log.size(), 2);
441
442   // Since the socket buffer was initially empty,
443   // there should have been 1 event for immediate writability
444   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
445   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
446   ASSERT_EQ(handler.log[0].bytesRead, 0);
447   ASSERT_GT(handler.log[0].bytesWritten, 0);
448
449   // There should be another event after the timeout wrote more data
450   ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
451   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
452                   milliseconds(events[0].milliseconds));
453   ASSERT_EQ(handler.log[1].bytesRead, 0);
454   ASSERT_GT(handler.log[1].bytesWritten, 0);
455
456   T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
457 }
458
459 /**
460  * Test (READ | WRITE) when the socket becomes readable first
461  */
462 TEST(EventBaseTest, ReadWrite) {
463   EventBase eb;
464   SocketPair sp;
465
466   // Fill up the write buffer before starting
467   size_t sock0WriteLength = writeUntilFull(sp[0]);
468
469   // Register for read and write events
470   TestHandler handler(&eb, sp[0]);
471   handler.registerHandler(EventHandler::READ_WRITE);
472
473   // Register timeouts to perform a write then a read.
474   ScheduledEvent events[] = {
475     { 10, EventHandler::WRITE, 2345, 0 },
476     { 40, EventHandler::READ, 0, 0 },
477     { 0, 0, 0, 0 },
478   };
479   scheduleEvents(&eb, sp[1], events);
480
481   // Loop
482   TimePoint start;
483   eb.loop();
484   TimePoint end;
485
486   // Since we didn't use the EventHandler::PERSIST flag, the handler should
487   // have only noticed readability, then unregistered itself.  Check that only
488   // one event was logged.
489   ASSERT_EQ(handler.log.size(), 1);
490   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
491   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
492                   milliseconds(events[0].milliseconds));
493   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
494   ASSERT_EQ(handler.log[0].bytesWritten, 0);
495   ASSERT_EQ(events[1].result, sock0WriteLength);
496   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
497 }
498
499 /**
500  * Test (READ | WRITE) when the socket becomes writable first
501  */
502 TEST(EventBaseTest, WriteRead) {
503   EventBase eb;
504   SocketPair sp;
505
506   // Fill up the write buffer before starting
507   size_t sock0WriteLength = writeUntilFull(sp[0]);
508
509   // Register for read and write events
510   TestHandler handler(&eb, sp[0]);
511   handler.registerHandler(EventHandler::READ_WRITE);
512
513   // Register timeouts to perform a read then a write.
514   size_t sock1WriteLength = 2345;
515   ScheduledEvent events[] = {
516     { 10, EventHandler::READ, 0, 0 },
517     { 40, EventHandler::WRITE, sock1WriteLength, 0 },
518     { 0, 0, 0, 0 },
519   };
520   scheduleEvents(&eb, sp[1], events);
521
522   // Loop
523   TimePoint start;
524   eb.loop();
525   TimePoint end;
526
527   // Since we didn't use the EventHandler::PERSIST flag, the handler should
528   // have only noticed writability, then unregistered itself.  Check that only
529   // one event was logged.
530   ASSERT_EQ(handler.log.size(), 1);
531   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
532   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
533                   milliseconds(events[0].milliseconds));
534   ASSERT_EQ(handler.log[0].bytesRead, 0);
535   ASSERT_GT(handler.log[0].bytesWritten, 0);
536   ASSERT_EQ(events[0].result, sock0WriteLength);
537   ASSERT_EQ(events[1].result, sock1WriteLength);
538   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
539
540   // Make sure the written data is still waiting to be read.
541   size_t bytesRemaining = readUntilEmpty(sp[0]);
542   ASSERT_EQ(bytesRemaining, events[1].length);
543 }
544
545 /**
546  * Test (READ | WRITE) when the socket becomes readable and writable
547  * at the same time.
548  */
549 TEST(EventBaseTest, ReadWriteSimultaneous) {
550   EventBase eb;
551   SocketPair sp;
552
553   // Fill up the write buffer before starting
554   size_t sock0WriteLength = writeUntilFull(sp[0]);
555
556   // Register for read and write events
557   TestHandler handler(&eb, sp[0]);
558   handler.registerHandler(EventHandler::READ_WRITE);
559
560   // Register a timeout to perform a read and write together
561   ScheduledEvent events[] = {
562     { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
563     { 0, 0, 0, 0 },
564   };
565   scheduleEvents(&eb, sp[1], events);
566
567   // Loop
568   TimePoint start;
569   eb.loop();
570   TimePoint end;
571
572   // It's not strictly required that the EventBase register us about both
573   // events in the same call.  So, it's possible that if the EventBase
574   // implementation changes this test could start failing, and it wouldn't be
575   // considered breaking the API.  However for now it's nice to exercise this
576   // code path.
577   ASSERT_EQ(handler.log.size(), 1);
578   ASSERT_EQ(handler.log[0].events,
579                     EventHandler::READ | EventHandler::WRITE);
580   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
581                   milliseconds(events[0].milliseconds));
582   ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
583   ASSERT_GT(handler.log[0].bytesWritten, 0);
584   T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
585 }
586
587 /**
588  * Test (READ | WRITE | PERSIST)
589  */
590 TEST(EventBaseTest, ReadWritePersist) {
591   EventBase eb;
592   SocketPair sp;
593
594   // Register for read and write events
595   TestHandler handler(&eb, sp[0]);
596   handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
597                           EventHandler::PERSIST);
598
599   // Register timeouts to perform several reads and writes
600   ScheduledEvent events[] = {
601     { 10, EventHandler::WRITE, 2345, 0 },
602     { 20, EventHandler::READ, 0, 0 },
603     { 35, EventHandler::WRITE, 200, 0 },
604     { 45, EventHandler::WRITE, 15, 0 },
605     { 55, EventHandler::READ, 0, 0 },
606     { 120, EventHandler::WRITE, 2345, 0 },
607     { 0, 0, 0, 0 },
608   };
609   scheduleEvents(&eb, sp[1], events);
610
611   // Schedule a timeout to unregister the handler
612   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
613
614   // Loop
615   TimePoint start;
616   eb.loop();
617   TimePoint end;
618
619   ASSERT_EQ(handler.log.size(), 6);
620
621   // Since we didn't fill up the write buffer immediately, there should
622   // be an immediate event for writability.
623   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
624   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
625   ASSERT_EQ(handler.log[0].bytesRead, 0);
626   ASSERT_GT(handler.log[0].bytesWritten, 0);
627
628   // Events 1 through 5 should correspond to the scheduled events
629   for (int n = 1; n < 6; ++n) {
630     ScheduledEvent* event = &events[n - 1];
631     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
632                     milliseconds(event->milliseconds));
633     if (event->events == EventHandler::READ) {
634       ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
635       ASSERT_EQ(handler.log[n].bytesRead, 0);
636       ASSERT_GT(handler.log[n].bytesWritten, 0);
637     } else {
638       ASSERT_EQ(handler.log[n].events, EventHandler::READ);
639       ASSERT_EQ(handler.log[n].bytesRead, event->length);
640       ASSERT_EQ(handler.log[n].bytesWritten, 0);
641     }
642   }
643
644   // The timeout should have unregistered the handler before the last write.
645   // Make sure that data is still waiting to be read
646   size_t bytesRemaining = readUntilEmpty(sp[0]);
647   ASSERT_EQ(bytesRemaining, events[5].length);
648 }
649
650
651 class PartialReadHandler : public TestHandler {
652  public:
653   PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
654     : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
655
656   void handlerReady(uint16_t events) noexcept override {
657     assert(events == EventHandler::READ);
658     ssize_t bytesRead = readFromFD(fd_, readLength_);
659     log.emplace_back(events, bytesRead, 0);
660   }
661
662  private:
663   int fd_;
664   size_t readLength_;
665 };
666
667 /**
668  * Test reading only part of the available data when a read event is fired.
669  * When PERSIST is used, make sure the handler gets notified again the next
670  * time around the loop.
671  */
672 TEST(EventBaseTest, ReadPartial) {
673   EventBase eb;
674   SocketPair sp;
675
676   // Register for read events
677   size_t readLength = 100;
678   PartialReadHandler handler(&eb, sp[0], readLength);
679   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
680
681   // Register a timeout to perform a single write,
682   // with more data than PartialReadHandler will read at once
683   ScheduledEvent events[] = {
684     { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
685     { 0, 0, 0, 0 },
686   };
687   scheduleEvents(&eb, sp[1], events);
688
689   // Schedule a timeout to unregister the handler
690   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
691
692   // Loop
693   TimePoint start;
694   eb.loop();
695   TimePoint end;
696
697   ASSERT_EQ(handler.log.size(), 4);
698
699   // The first 3 invocations should read readLength bytes each
700   for (int n = 0; n < 3; ++n) {
701     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
702     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
703                     milliseconds(events[0].milliseconds));
704     ASSERT_EQ(handler.log[n].bytesRead, readLength);
705     ASSERT_EQ(handler.log[n].bytesWritten, 0);
706   }
707   // The last read only has readLength/2 bytes
708   ASSERT_EQ(handler.log[3].events, EventHandler::READ);
709   T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
710                   milliseconds(events[0].milliseconds));
711   ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
712   ASSERT_EQ(handler.log[3].bytesWritten, 0);
713 }
714
715
716 class PartialWriteHandler : public TestHandler {
717  public:
718   PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
719     : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
720
721   void handlerReady(uint16_t events) noexcept override {
722     assert(events == EventHandler::WRITE);
723     ssize_t bytesWritten = writeToFD(fd_, writeLength_);
724     log.emplace_back(events, 0, bytesWritten);
725   }
726
727  private:
728   int fd_;
729   size_t writeLength_;
730 };
731
732 /**
733  * Test writing without completely filling up the write buffer when the fd
734  * becomes writable.  When PERSIST is used, make sure the handler gets
735  * notified again the next time around the loop.
736  */
737 TEST(EventBaseTest, WritePartial) {
738   EventBase eb;
739   SocketPair sp;
740
741   // Fill up the write buffer before starting
742   size_t initialBytesWritten = writeUntilFull(sp[0]);
743
744   // Register for write events
745   size_t writeLength = 100;
746   PartialWriteHandler handler(&eb, sp[0], writeLength);
747   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
748
749   // Register a timeout to read, so that more data can be written
750   ScheduledEvent events[] = {
751     { 10, EventHandler::READ, 0, 0 },
752     { 0, 0, 0, 0 },
753   };
754   scheduleEvents(&eb, sp[1], events);
755
756   // Schedule a timeout to unregister the handler
757   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
758
759   // Loop
760   TimePoint start;
761   eb.loop();
762   TimePoint end;
763
764   // Depending on how big the socket buffer is, there will be multiple writes
765   // Only check the first 5
766   int numChecked = 5;
767   ASSERT_GE(handler.log.size(), numChecked);
768   ASSERT_EQ(events[0].result, initialBytesWritten);
769
770   // The first 3 invocations should read writeLength bytes each
771   for (int n = 0; n < numChecked; ++n) {
772     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
773     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
774                     milliseconds(events[0].milliseconds));
775     ASSERT_EQ(handler.log[n].bytesRead, 0);
776     ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
777   }
778 }
779
780
781 /**
782  * Test destroying a registered EventHandler
783  */
784 TEST(EventBaseTest, DestroyHandler) {
785   class DestroyHandler : public AsyncTimeout {
786    public:
787     DestroyHandler(EventBase* eb, EventHandler* h)
788       : AsyncTimeout(eb)
789       , handler_(h) {}
790
791     void timeoutExpired() noexcept override { delete handler_; }
792
793    private:
794     EventHandler* handler_;
795   };
796
797   EventBase eb;
798   SocketPair sp;
799
800   // Fill up the write buffer before starting
801   size_t initialBytesWritten = writeUntilFull(sp[0]);
802
803   // Register for write events
804   TestHandler* handler = new TestHandler(&eb, sp[0]);
805   handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
806
807   // After 10ms, read some data, so that the handler
808   // will be notified that it can write.
809   eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
810                    10);
811
812   // Start a timer to destroy the handler after 25ms
813   // This mainly just makes sure the code doesn't break or assert
814   DestroyHandler dh(&eb, handler);
815   dh.scheduleTimeout(25);
816
817   TimePoint start;
818   eb.loop();
819   TimePoint end;
820
821   // Make sure the EventHandler was uninstalled properly when it was
822   // destroyed, and the EventBase loop exited
823   T_CHECK_TIMEOUT(start, end, milliseconds(25));
824
825   // Make sure that the handler wrote data to the socket
826   // before it was destroyed
827   size_t bytesRemaining = readUntilEmpty(sp[1]);
828   ASSERT_GT(bytesRemaining, 0);
829 }
830
831
832 ///////////////////////////////////////////////////////////////////////////
833 // Tests for timeout events
834 ///////////////////////////////////////////////////////////////////////////
835
836 TEST(EventBaseTest, RunAfterDelay) {
837   EventBase eb;
838
839   TimePoint timestamp1(false);
840   TimePoint timestamp2(false);
841   TimePoint timestamp3(false);
842   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
843   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
844   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
845
846   TimePoint start;
847   eb.loop();
848   TimePoint end;
849
850   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
851   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
852   T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
853   T_CHECK_TIMEOUT(start, end, milliseconds(40));
854 }
855
856 /**
857  * Test the behavior of tryRunAfterDelay() when some timeouts are
858  * still scheduled when the EventBase is destroyed.
859  */
860 TEST(EventBaseTest, RunAfterDelayDestruction) {
861   TimePoint timestamp1(false);
862   TimePoint timestamp2(false);
863   TimePoint timestamp3(false);
864   TimePoint timestamp4(false);
865   TimePoint start(false);
866   TimePoint end(false);
867
868   {
869     EventBase eb;
870
871     // Run two normal timeouts
872     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
873     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
874
875     // Schedule a timeout to stop the event loop after 40ms
876     eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
877
878     // Schedule 2 timeouts that would fire after the event loop stops
879     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
880     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
881
882     start.reset();
883     eb.loop();
884     end.reset();
885   }
886
887   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
888   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
889   T_CHECK_TIMEOUT(start, end, milliseconds(40));
890
891   ASSERT_TRUE(timestamp3.isUnset());
892   ASSERT_TRUE(timestamp4.isUnset());
893
894   // Ideally this test should be run under valgrind to ensure that no
895   // memory is leaked.
896 }
897
898 class TestTimeout : public AsyncTimeout {
899  public:
900   explicit TestTimeout(EventBase* eventBase)
901     : AsyncTimeout(eventBase)
902     , timestamp(false) {}
903
904   void timeoutExpired() noexcept override { timestamp.reset(); }
905
906   TimePoint timestamp;
907 };
908
909 TEST(EventBaseTest, BasicTimeouts) {
910   EventBase eb;
911
912   TestTimeout t1(&eb);
913   TestTimeout t2(&eb);
914   TestTimeout t3(&eb);
915   t1.scheduleTimeout(10);
916   t2.scheduleTimeout(20);
917   t3.scheduleTimeout(40);
918
919   TimePoint start;
920   eb.loop();
921   TimePoint end;
922
923   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
924   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
925   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
926   T_CHECK_TIMEOUT(start, end, milliseconds(40));
927 }
928
929 class ReschedulingTimeout : public AsyncTimeout {
930  public:
931   ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
932     : AsyncTimeout(evb)
933     , timeouts_(timeouts)
934     , iterator_(timeouts_.begin()) {}
935
936   void start() {
937     reschedule();
938   }
939
940   void timeoutExpired() noexcept override {
941     timestamps.emplace_back();
942     reschedule();
943   }
944
945   void reschedule() {
946     if (iterator_ != timeouts_.end()) {
947       uint32_t timeout = *iterator_;
948       ++iterator_;
949       scheduleTimeout(timeout);
950     }
951   }
952
953   vector<TimePoint> timestamps;
954
955  private:
956   vector<uint32_t> timeouts_;
957   vector<uint32_t>::const_iterator iterator_;
958 };
959
960 /**
961  * Test rescheduling the same timeout multiple times
962  */
963 TEST(EventBaseTest, ReuseTimeout) {
964   EventBase eb;
965
966   vector<uint32_t> timeouts;
967   timeouts.push_back(10);
968   timeouts.push_back(30);
969   timeouts.push_back(15);
970
971   ReschedulingTimeout t(&eb, timeouts);
972   t.start();
973
974   TimePoint start;
975   eb.loop();
976   TimePoint end;
977
978   // Use a higher tolerance than usual.  We're waiting on 3 timeouts
979   // consecutively.  In general, each timeout may go over by a few
980   // milliseconds, and we're tripling this error by witing on 3 timeouts.
981   milliseconds tolerance{6};
982
983   ASSERT_EQ(timeouts.size(), t.timestamps.size());
984   uint32_t total = 0;
985   for (size_t n = 0; n < timeouts.size(); ++n) {
986     total += timeouts[n];
987     T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
988   }
989   T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
990 }
991
992 /**
993  * Test rescheduling a timeout before it has fired
994  */
995 TEST(EventBaseTest, RescheduleTimeout) {
996   EventBase eb;
997
998   TestTimeout t1(&eb);
999   TestTimeout t2(&eb);
1000   TestTimeout t3(&eb);
1001
1002   t1.scheduleTimeout(15);
1003   t2.scheduleTimeout(30);
1004   t3.scheduleTimeout(30);
1005
1006   auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1007       &AsyncTimeout::scheduleTimeout);
1008
1009   // after 10ms, reschedule t2 to run sooner than originally scheduled
1010   eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1011   // after 10ms, reschedule t3 to run later than originally scheduled
1012   eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1013
1014   TimePoint start;
1015   eb.loop();
1016   TimePoint end;
1017
1018   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1019   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1020   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1021   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1022 }
1023
1024 /**
1025  * Test cancelling a timeout
1026  */
1027 TEST(EventBaseTest, CancelTimeout) {
1028   EventBase eb;
1029
1030   vector<uint32_t> timeouts;
1031   timeouts.push_back(10);
1032   timeouts.push_back(30);
1033   timeouts.push_back(25);
1034
1035   ReschedulingTimeout t(&eb, timeouts);
1036   t.start();
1037   eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1038
1039   TimePoint start;
1040   eb.loop();
1041   TimePoint end;
1042
1043   ASSERT_EQ(t.timestamps.size(), 2);
1044   T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1045   T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1046   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1047 }
1048
1049 /**
1050  * Test destroying a scheduled timeout object
1051  */
1052 TEST(EventBaseTest, DestroyTimeout) {
1053   class DestroyTimeout : public AsyncTimeout {
1054    public:
1055     DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1056       : AsyncTimeout(eb)
1057       , timeout_(t) {}
1058
1059     void timeoutExpired() noexcept override { delete timeout_; }
1060
1061    private:
1062     AsyncTimeout* timeout_;
1063   };
1064
1065   EventBase eb;
1066
1067   TestTimeout* t1 = new TestTimeout(&eb);
1068   t1->scheduleTimeout(30);
1069
1070   DestroyTimeout dt(&eb, t1);
1071   dt.scheduleTimeout(10);
1072
1073   TimePoint start;
1074   eb.loop();
1075   TimePoint end;
1076
1077   T_CHECK_TIMEOUT(start, end, milliseconds(10));
1078 }
1079
1080
1081 ///////////////////////////////////////////////////////////////////////////
1082 // Test for runInThreadTestFunc()
1083 ///////////////////////////////////////////////////////////////////////////
1084
1085 struct RunInThreadData {
1086   RunInThreadData(int numThreads, int opsPerThread)
1087     : opsPerThread(opsPerThread)
1088     , opsToGo(numThreads*opsPerThread) {}
1089
1090   EventBase evb;
1091   deque< pair<int, int> > values;
1092
1093   int opsPerThread;
1094   int opsToGo;
1095 };
1096
1097 struct RunInThreadArg {
1098   RunInThreadArg(RunInThreadData* data,
1099                  int threadId,
1100                  int value)
1101     : data(data)
1102     , thread(threadId)
1103     , value(value) {}
1104
1105   RunInThreadData* data;
1106   int thread;
1107   int value;
1108 };
1109
1110 void runInThreadTestFunc(RunInThreadArg* arg) {
1111   arg->data->values.emplace_back(arg->thread, arg->value);
1112   RunInThreadData* data = arg->data;
1113   delete arg;
1114
1115   if(--data->opsToGo == 0) {
1116     // Break out of the event base loop if we are the last thread running
1117     data->evb.terminateLoopSoon();
1118   }
1119 }
1120
1121 TEST(EventBaseTest, RunInThread) {
1122   constexpr uint32_t numThreads = 50;
1123   constexpr uint32_t opsPerThread = 100;
1124   RunInThreadData data(numThreads, opsPerThread);
1125
1126   deque<std::thread> threads;
1127   SCOPE_EXIT {
1128     // Wait on all of the threads.
1129     for (auto& thread : threads) {
1130       thread.join();
1131     }
1132   };
1133
1134   for (uint32_t i = 0; i < numThreads; ++i) {
1135     threads.emplace_back([i, &data] {
1136         for (int n = 0; n < data.opsPerThread; ++n) {
1137           RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1138           data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1139           usleep(10);
1140         }
1141       });
1142   }
1143
1144   // Add a timeout event to run after 3 seconds.
1145   // Otherwise loop() will return immediately since there are no events to run.
1146   // Once the last thread exits, it will stop the loop().  However, this
1147   // timeout also stops the loop in case there is a bug performing the normal
1148   // stop.
1149   data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1150                          3000);
1151
1152   TimePoint start;
1153   data.evb.loop();
1154   TimePoint end;
1155
1156   // Verify that the loop exited because all threads finished and requested it
1157   // to stop.  This should happen much sooner than the 3 second timeout.
1158   // Assert that it happens in under a second.  (This is still tons of extra
1159   // padding.)
1160
1161   auto timeTaken = std::chrono::duration_cast<milliseconds>(
1162     end.getTime() - start.getTime());
1163   ASSERT_LT(timeTaken.count(), 1000);
1164   VLOG(11) << "Time taken: " << timeTaken.count();
1165
1166   // Verify that we have all of the events from every thread
1167   int expectedValues[numThreads];
1168   for (uint32_t n = 0; n < numThreads; ++n) {
1169     expectedValues[n] = 0;
1170   }
1171   for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1172        it != data.values.end();
1173        ++it) {
1174     int threadID = it->first;
1175     int value = it->second;
1176     ASSERT_EQ(expectedValues[threadID], value);
1177     ++expectedValues[threadID];
1178   }
1179   for (uint32_t n = 0; n < numThreads; ++n) {
1180     ASSERT_EQ(expectedValues[n], opsPerThread);
1181   }
1182 }
1183
1184 //  This test simulates some calls, and verifies that the waiting happens by
1185 //  triggering what otherwise would be race conditions, and trying to detect
1186 //  whether any of the race conditions happened.
1187 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1188   const size_t c = 256;
1189   vector<unique_ptr<atomic<size_t>>> atoms(c);
1190   for (size_t i = 0; i < c; ++i) {
1191     auto& atom = atoms.at(i);
1192     atom = make_unique<atomic<size_t>>(0);
1193   }
1194   vector<thread> threads;
1195   for (size_t i = 0; i < c; ++i) {
1196     threads.emplace_back([&atoms, i] {
1197       EventBase eb;
1198       auto& atom = *atoms.at(i);
1199       auto ebth = thread([&] { eb.loopForever(); });
1200       eb.waitUntilRunning();
1201       eb.runInEventBaseThreadAndWait([&] {
1202         size_t x = 0;
1203         atom.compare_exchange_weak(
1204             x, 1, std::memory_order_release, std::memory_order_relaxed);
1205       });
1206       size_t x = 0;
1207       atom.compare_exchange_weak(
1208           x, 2, std::memory_order_release, std::memory_order_relaxed);
1209       eb.terminateLoopSoon();
1210       ebth.join();
1211     });
1212   }
1213   for (size_t i = 0; i < c; ++i) {
1214     auto& th = threads.at(i);
1215     th.join();
1216   }
1217   size_t sum = 0;
1218   for (auto& atom : atoms) sum += *atom;
1219   EXPECT_EQ(c, sum);
1220 }
1221
1222 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1223   EventBase eb;
1224   thread th(&EventBase::loopForever, &eb);
1225   SCOPE_EXIT {
1226     eb.terminateLoopSoon();
1227     th.join();
1228   };
1229   auto mutated = false;
1230   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1231       mutated = true;
1232   });
1233   EXPECT_TRUE(mutated);
1234 }
1235
1236 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1237   EventBase eb;
1238   thread th(&EventBase::loopForever, &eb);
1239   SCOPE_EXIT {
1240     eb.terminateLoopSoon();
1241     th.join();
1242   };
1243   eb.runInEventBaseThreadAndWait([&] {
1244       auto mutated = false;
1245       eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1246           mutated = true;
1247       });
1248       EXPECT_TRUE(mutated);
1249   });
1250 }
1251
1252 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1253   EventBase eb;
1254   auto mutated = false;
1255   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1256       mutated = true;
1257     });
1258   EXPECT_TRUE(mutated);
1259 }
1260
1261 ///////////////////////////////////////////////////////////////////////////
1262 // Tests for runInLoop()
1263 ///////////////////////////////////////////////////////////////////////////
1264
1265 class CountedLoopCallback : public EventBase::LoopCallback {
1266  public:
1267   CountedLoopCallback(EventBase* eventBase,
1268                       unsigned int count,
1269                       std::function<void()> action =
1270                         std::function<void()>())
1271     : eventBase_(eventBase)
1272     , count_(count)
1273     , action_(action) {}
1274
1275   void runLoopCallback() noexcept override {
1276     --count_;
1277     if (count_ > 0) {
1278       eventBase_->runInLoop(this);
1279     } else if (action_) {
1280       action_();
1281     }
1282   }
1283
1284   unsigned int getCount() const {
1285     return count_;
1286   }
1287
1288  private:
1289   EventBase* eventBase_;
1290   unsigned int count_;
1291   std::function<void()> action_;
1292 };
1293
1294 // Test that EventBase::loop() doesn't exit while there are
1295 // still LoopCallbacks remaining to be invoked.
1296 TEST(EventBaseTest, RepeatedRunInLoop) {
1297   EventBase eventBase;
1298
1299   CountedLoopCallback c(&eventBase, 10);
1300   eventBase.runInLoop(&c);
1301   // The callback shouldn't have run immediately
1302   ASSERT_EQ(c.getCount(), 10);
1303   eventBase.loop();
1304
1305   // loop() should loop until the CountedLoopCallback stops
1306   // re-installing itself.
1307   ASSERT_EQ(c.getCount(), 0);
1308 }
1309
1310 // Test that EventBase::loop() works as expected without time measurements.
1311 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
1312   EventBase eventBase(false);
1313
1314   CountedLoopCallback c(&eventBase, 10);
1315   eventBase.runInLoop(&c);
1316   // The callback shouldn't have run immediately
1317   ASSERT_EQ(c.getCount(), 10);
1318   eventBase.loop();
1319
1320   // loop() should loop until the CountedLoopCallback stops
1321   // re-installing itself.
1322   ASSERT_EQ(c.getCount(), 0);
1323 }
1324
1325 // Test runInLoop() calls with terminateLoopSoon()
1326 TEST(EventBaseTest, RunInLoopStopLoop) {
1327   EventBase eventBase;
1328
1329   CountedLoopCallback c1(&eventBase, 20);
1330   CountedLoopCallback c2(&eventBase, 10,
1331                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1332
1333   eventBase.runInLoop(&c1);
1334   eventBase.runInLoop(&c2);
1335   ASSERT_EQ(c1.getCount(), 20);
1336   ASSERT_EQ(c2.getCount(), 10);
1337
1338   eventBase.loopForever();
1339
1340   // c2 should have stopped the loop after 10 iterations
1341   ASSERT_EQ(c2.getCount(), 0);
1342
1343   // We allow the EventBase to run the loop callbacks in whatever order it
1344   // chooses.  We'll accept c1's count being either 10 (if the loop terminated
1345   // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1346   // before c1 ran).
1347   //
1348   // (With the current code, c1 will always run 10 times, but we don't consider
1349   // this a hard API requirement.)
1350   ASSERT_GE(c1.getCount(), 10);
1351   ASSERT_LE(c1.getCount(), 11);
1352 }
1353
1354 TEST(EventBaseTest, messageAvailableException) {
1355   auto deadManWalking = [] {
1356     EventBase eventBase;
1357     std::thread t([&] {
1358       // Call this from another thread to force use of NotificationQueue in
1359       // runInEventBaseThread
1360       eventBase.runInEventBaseThread(
1361           []() { throw std::runtime_error("boom"); });
1362     });
1363     t.join();
1364     eventBase.loopForever();
1365   };
1366   EXPECT_DEATH(deadManWalking(), ".*");
1367 }
1368
1369 TEST(EventBaseTest, TryRunningAfterTerminate) {
1370   EventBase eventBase;
1371   CountedLoopCallback c1(&eventBase, 1,
1372                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1373   eventBase.runInLoop(&c1);
1374   eventBase.loopForever();
1375   bool ran = false;
1376   eventBase.runInEventBaseThread([&]() {
1377     ran = true;
1378   });
1379
1380   ASSERT_FALSE(ran);
1381 }
1382
1383 // Test cancelling runInLoop() callbacks
1384 TEST(EventBaseTest, CancelRunInLoop) {
1385   EventBase eventBase;
1386
1387   CountedLoopCallback c1(&eventBase, 20);
1388   CountedLoopCallback c2(&eventBase, 20);
1389   CountedLoopCallback c3(&eventBase, 20);
1390
1391   std::function<void()> cancelC1Action =
1392     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1393   std::function<void()> cancelC2Action =
1394     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1395
1396   CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1397   CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1398
1399   // Install cancelC1 after c1
1400   eventBase.runInLoop(&c1);
1401   eventBase.runInLoop(&cancelC1);
1402
1403   // Install cancelC2 before c2
1404   eventBase.runInLoop(&cancelC2);
1405   eventBase.runInLoop(&c2);
1406
1407   // Install c3
1408   eventBase.runInLoop(&c3);
1409
1410   ASSERT_EQ(c1.getCount(), 20);
1411   ASSERT_EQ(c2.getCount(), 20);
1412   ASSERT_EQ(c3.getCount(), 20);
1413   ASSERT_EQ(cancelC1.getCount(), 10);
1414   ASSERT_EQ(cancelC2.getCount(), 10);
1415
1416   // Run the loop
1417   eventBase.loop();
1418
1419   // cancelC1 and cancelC3 should have both fired after 10 iterations and
1420   // stopped re-installing themselves
1421   ASSERT_EQ(cancelC1.getCount(), 0);
1422   ASSERT_EQ(cancelC2.getCount(), 0);
1423   // c3 should have continued on for the full 20 iterations
1424   ASSERT_EQ(c3.getCount(), 0);
1425
1426   // c1 and c2 should have both been cancelled on the 10th iteration.
1427   //
1428   // Callbacks are always run in the order they are installed,
1429   // so c1 should have fired 10 times, and been canceled after it ran on the
1430   // 10th iteration.  c2 should have only fired 9 times, because cancelC2 will
1431   // have run before it on the 10th iteration, and cancelled it before it
1432   // fired.
1433   ASSERT_EQ(c1.getCount(), 10);
1434   ASSERT_EQ(c2.getCount(), 11);
1435 }
1436
1437 class TerminateTestCallback : public EventBase::LoopCallback,
1438                               public EventHandler {
1439  public:
1440   TerminateTestCallback(EventBase* eventBase, int fd)
1441     : EventHandler(eventBase, fd),
1442       eventBase_(eventBase),
1443       loopInvocations_(0),
1444       maxLoopInvocations_(0),
1445       eventInvocations_(0),
1446       maxEventInvocations_(0) {}
1447
1448   void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1449     loopInvocations_ = 0;
1450     maxLoopInvocations_ = maxLoopInvocations;
1451     eventInvocations_ = 0;
1452     maxEventInvocations_ = maxEventInvocations;
1453
1454     cancelLoopCallback();
1455     unregisterHandler();
1456   }
1457
1458   void handlerReady(uint16_t /* events */) noexcept override {
1459     // We didn't register with PERSIST, so we will have been automatically
1460     // unregistered already.
1461     ASSERT_FALSE(isHandlerRegistered());
1462
1463     ++eventInvocations_;
1464     if (eventInvocations_ >= maxEventInvocations_) {
1465       return;
1466     }
1467
1468     eventBase_->runInLoop(this);
1469   }
1470   void runLoopCallback() noexcept override {
1471     ++loopInvocations_;
1472     if (loopInvocations_ >= maxLoopInvocations_) {
1473       return;
1474     }
1475
1476     registerHandler(READ);
1477   }
1478
1479   uint32_t getLoopInvocations() const {
1480     return loopInvocations_;
1481   }
1482   uint32_t getEventInvocations() const {
1483     return eventInvocations_;
1484   }
1485
1486  private:
1487   EventBase* eventBase_;
1488   uint32_t loopInvocations_;
1489   uint32_t maxLoopInvocations_;
1490   uint32_t eventInvocations_;
1491   uint32_t maxEventInvocations_;
1492 };
1493
1494 /**
1495  * Test that EventBase::loop() correctly detects when there are no more events
1496  * left to run.
1497  *
1498  * This uses a single callback, which alternates registering itself as a loop
1499  * callback versus a EventHandler callback.  This exercises a regression where
1500  * EventBase::loop() incorrectly exited if there were no more fd handlers
1501  * registered, but a loop callback installed a new fd handler.
1502  */
1503 TEST(EventBaseTest, LoopTermination) {
1504   EventBase eventBase;
1505
1506   // Open a pipe and close the write end,
1507   // so the read endpoint will be readable
1508   int pipeFds[2];
1509   int rc = pipe(pipeFds);
1510   ASSERT_EQ(rc, 0);
1511   close(pipeFds[1]);
1512   TerminateTestCallback callback(&eventBase, pipeFds[0]);
1513
1514   // Test once where the callback will exit after a loop callback
1515   callback.reset(10, 100);
1516   eventBase.runInLoop(&callback);
1517   eventBase.loop();
1518   ASSERT_EQ(callback.getLoopInvocations(), 10);
1519   ASSERT_EQ(callback.getEventInvocations(), 9);
1520
1521   // Test once where the callback will exit after an fd event callback
1522   callback.reset(100, 7);
1523   eventBase.runInLoop(&callback);
1524   eventBase.loop();
1525   ASSERT_EQ(callback.getLoopInvocations(), 7);
1526   ASSERT_EQ(callback.getEventInvocations(), 7);
1527
1528   close(pipeFds[0]);
1529 }
1530
1531 ///////////////////////////////////////////////////////////////////////////
1532 // Tests for latency calculations
1533 ///////////////////////////////////////////////////////////////////////////
1534
1535 class IdleTimeTimeoutSeries : public AsyncTimeout {
1536
1537  public:
1538
1539   explicit IdleTimeTimeoutSeries(EventBase *base,
1540                                  std::deque<std::uint64_t>& timeout) :
1541     AsyncTimeout(base),
1542     timeouts_(0),
1543     timeout_(timeout) {
1544       scheduleTimeout(1);
1545     }
1546
1547     ~IdleTimeTimeoutSeries() override {}
1548
1549     void timeoutExpired() noexcept override {
1550     ++timeouts_;
1551
1552     if(timeout_.empty()){
1553       cancelTimeout();
1554     } else {
1555       uint64_t sleepTime = timeout_.front();
1556       timeout_.pop_front();
1557       if (sleepTime) {
1558         usleep(sleepTime);
1559       }
1560       scheduleTimeout(1);
1561     }
1562   }
1563
1564   int getTimeouts() const {
1565     return timeouts_;
1566   }
1567
1568  private:
1569   int timeouts_;
1570   std::deque<uint64_t>& timeout_;
1571 };
1572
1573 /**
1574  * Verify that idle time is correctly accounted for when decaying our loop
1575  * time.
1576  *
1577  * This works by creating a high loop time (via usleep), expecting a latency
1578  * callback with known value, and then scheduling a timeout for later. This
1579  * later timeout is far enough in the future that the idle time should have
1580  * caused the loop time to decay.
1581  */
1582 TEST(EventBaseTest, IdleTime) {
1583   EventBase eventBase;
1584   eventBase.setLoadAvgMsec(1000ms);
1585   eventBase.resetLoadAvg(5900.0);
1586   std::deque<uint64_t> timeouts0(4, 8080);
1587   timeouts0.push_front(8000);
1588   timeouts0.push_back(14000);
1589   IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1590   std::deque<uint64_t> timeouts(20, 20);
1591   std::unique_ptr<IdleTimeTimeoutSeries> tos;
1592   int64_t testStart = duration_cast<microseconds>(
1593     std::chrono::steady_clock::now().time_since_epoch()).count();
1594   bool hostOverloaded = false;
1595
1596   int latencyCallbacks = 0;
1597   eventBase.setMaxLatency(6000us, [&]() {
1598     ++latencyCallbacks;
1599     if (latencyCallbacks != 1) {
1600       FAIL() << "Unexpected latency callback";
1601     }
1602
1603     if (tos0.getTimeouts() < 6) {
1604       // This could only happen if the host this test is running
1605       // on is heavily loaded.
1606       int64_t maxLatencyReached = duration_cast<microseconds>(
1607           std::chrono::steady_clock::now().time_since_epoch()).count();
1608       ASSERT_LE(43800, maxLatencyReached - testStart);
1609       hostOverloaded = true;
1610       return;
1611     }
1612     ASSERT_EQ(6, tos0.getTimeouts());
1613     ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1614     ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1615     tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1616   });
1617
1618   // Kick things off with an "immedite" timeout
1619   tos0.scheduleTimeout(1);
1620
1621   eventBase.loop();
1622
1623   if (hostOverloaded) {
1624     return;
1625   }
1626
1627   ASSERT_EQ(1, latencyCallbacks);
1628   ASSERT_EQ(7, tos0.getTimeouts());
1629   ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1630   ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1631   ASSERT_TRUE(!!tos);
1632   ASSERT_EQ(21, tos->getTimeouts());
1633 }
1634
1635 /**
1636  * Test that thisLoop functionality works with terminateLoopSoon
1637  */
1638 TEST(EventBaseTest, ThisLoop) {
1639   EventBase eb;
1640   bool runInLoop = false;
1641   bool runThisLoop = false;
1642
1643   eb.runInLoop([&](){
1644       eb.terminateLoopSoon();
1645       eb.runInLoop([&]() {
1646           runInLoop = true;
1647         });
1648       eb.runInLoop([&]() {
1649           runThisLoop = true;
1650         }, true);
1651     }, true);
1652   eb.loopForever();
1653
1654   // Should not work
1655   ASSERT_FALSE(runInLoop);
1656   // Should work with thisLoop
1657   ASSERT_TRUE(runThisLoop);
1658 }
1659
1660 TEST(EventBaseTest, EventBaseThreadLoop) {
1661   EventBase base;
1662   bool ran = false;
1663
1664   base.runInEventBaseThread([&](){
1665     ran = true;
1666   });
1667   base.loop();
1668
1669   ASSERT_TRUE(ran);
1670 }
1671
1672 TEST(EventBaseTest, EventBaseThreadName) {
1673   EventBase base;
1674   base.setName("foo");
1675   base.loop();
1676
1677 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1678   char name[16];
1679   pthread_getname_np(pthread_self(), name, 16);
1680   ASSERT_EQ(0, strcmp("foo", name));
1681 #endif
1682 }
1683
1684 TEST(EventBaseTest, RunBeforeLoop) {
1685   EventBase base;
1686   CountedLoopCallback cb(&base, 1, [&](){
1687     base.terminateLoopSoon();
1688   });
1689   base.runBeforeLoop(&cb);
1690   base.loopForever();
1691   ASSERT_EQ(cb.getCount(), 0);
1692 }
1693
1694 TEST(EventBaseTest, RunBeforeLoopWait) {
1695   EventBase base;
1696   CountedLoopCallback cb(&base, 1);
1697   base.tryRunAfterDelay([&](){
1698       base.terminateLoopSoon();
1699     }, 500);
1700   base.runBeforeLoop(&cb);
1701   base.loopForever();
1702
1703   // Check that we only ran once, and did not loop multiple times.
1704   ASSERT_EQ(cb.getCount(), 0);
1705 }
1706
1707 class PipeHandler : public EventHandler {
1708 public:
1709   PipeHandler(EventBase* eventBase, int fd)
1710     : EventHandler(eventBase, fd) {}
1711
1712   void handlerReady(uint16_t /* events */) noexcept override { abort(); }
1713 };
1714
1715 TEST(EventBaseTest, StopBeforeLoop) {
1716   EventBase evb;
1717
1718   // Give the evb something to do.
1719   int p[2];
1720   ASSERT_EQ(0, pipe(p));
1721   PipeHandler handler(&evb, p[0]);
1722   handler.registerHandler(EventHandler::READ);
1723
1724   // It's definitely not running yet
1725   evb.terminateLoopSoon();
1726
1727   // let it run, it should exit quickly.
1728   std::thread t([&] { evb.loop(); });
1729   t.join();
1730
1731   handler.unregisterHandler();
1732   close(p[0]);
1733   close(p[1]);
1734
1735   SUCCEED();
1736 }
1737
1738 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1739   bool ran = false;
1740
1741   {
1742     EventBase base;
1743     base.runInEventBaseThread([&](){
1744       ran = true;
1745     });
1746   }
1747
1748   ASSERT_TRUE(ran);
1749 }
1750
1751 TEST(EventBaseTest, LoopKeepAlive) {
1752   EventBase evb;
1753
1754   bool done = false;
1755   std::thread t([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
1756     /* sleep override */ std::this_thread::sleep_for(
1757         std::chrono::milliseconds(100));
1758     evb.runInEventBaseThread(
1759         [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1760   });
1761
1762   evb.loop();
1763
1764   ASSERT_TRUE(done);
1765
1766   t.join();
1767 }
1768
1769 TEST(EventBaseTest, LoopKeepAliveInLoop) {
1770   EventBase evb;
1771
1772   bool done = false;
1773   std::thread t;
1774
1775   evb.runInEventBaseThread([&] {
1776     t = std::thread([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
1777       /* sleep override */ std::this_thread::sleep_for(
1778           std::chrono::milliseconds(100));
1779       evb.runInEventBaseThread(
1780           [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1781     });
1782   });
1783
1784   evb.loop();
1785
1786   ASSERT_TRUE(done);
1787
1788   t.join();
1789 }
1790
1791 TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
1792   std::unique_ptr<EventBase> evb = std::make_unique<EventBase>();
1793
1794   bool done = false;
1795
1796   std::thread evThread([&] {
1797     evb->loopForever();
1798     evb.reset();
1799     done = true;
1800   });
1801
1802   {
1803     auto* ev = evb.get();
1804     Executor::KeepAlive keepAlive;
1805     ev->runInEventBaseThreadAndWait(
1806         [&ev, &keepAlive] { keepAlive = ev->getKeepAliveToken(); });
1807     ASSERT_FALSE(done) << "Loop finished before we asked it to";
1808     ev->terminateLoopSoon();
1809     /* sleep override */
1810     std::this_thread::sleep_for(std::chrono::milliseconds(30));
1811     ASSERT_FALSE(done) << "Loop terminated early";
1812     ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{});
1813   }
1814
1815   evThread.join();
1816   ASSERT_TRUE(done);
1817 }
1818
1819 TEST(EventBaseTest, LoopKeepAliveShutdown) {
1820   auto evb = std::make_unique<EventBase>();
1821
1822   bool done = false;
1823
1824   std::thread t([
1825     &done,
1826     loopKeepAlive = evb->getKeepAliveToken(),
1827     evbPtr = evb.get()
1828   ]() mutable {
1829     /* sleep override */ std::this_thread::sleep_for(
1830         std::chrono::milliseconds(100));
1831     evbPtr->runInEventBaseThread(
1832         [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1833   });
1834
1835   evb.reset();
1836
1837   ASSERT_TRUE(done);
1838
1839   t.join();
1840 }
1841
1842 TEST(EventBaseTest, LoopKeepAliveAtomic) {
1843   auto evb = std::make_unique<EventBase>();
1844
1845   static constexpr size_t kNumThreads = 100;
1846   static constexpr size_t kNumTasks = 100;
1847
1848   std::vector<std::thread> ts;
1849   std::vector<std::unique_ptr<Baton<>>> batons;
1850   size_t done{0};
1851
1852   for (size_t i = 0; i < kNumThreads; ++i) {
1853     batons.emplace_back(std::make_unique<Baton<>>());
1854   }
1855
1856   for (size_t i = 0; i < kNumThreads; ++i) {
1857     ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
1858       std::vector<Executor::KeepAlive> keepAlives;
1859       for (size_t j = 0; j < kNumTasks; ++j) {
1860         keepAlives.emplace_back(evbPtr->getKeepAliveToken());
1861       }
1862
1863       batonPtr->post();
1864
1865       /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
1866
1867       for (auto& keepAlive : keepAlives) {
1868         evbPtr->runInEventBaseThread(
1869             [&done, keepAlive = std::move(keepAlive) ]() { ++done; });
1870       }
1871     });
1872   }
1873
1874   for (auto& baton : batons) {
1875     baton->wait();
1876   }
1877
1878   evb.reset();
1879
1880   EXPECT_EQ(kNumThreads * kNumTasks, done);
1881
1882   for (auto& t : ts) {
1883     t.join();
1884   }
1885 }
1886
1887 TEST(EventBaseTest, DrivableExecutorTest) {
1888   folly::Promise<bool> p;
1889   auto f = p.getFuture();
1890   EventBase base;
1891   bool finished = false;
1892
1893   std::thread t([&] {
1894     /* sleep override */
1895     std::this_thread::sleep_for(std::chrono::microseconds(10));
1896     finished = true;
1897     base.runInEventBaseThread([&]() { p.setValue(true); });
1898   });
1899
1900   // Ensure drive does not busy wait
1901   base.drive(); // TODO: fix notification queue init() extra wakeup
1902   base.drive();
1903   EXPECT_TRUE(finished);
1904
1905   folly::Promise<bool> p2;
1906   auto f2 = p2.getFuture();
1907   // Ensure waitVia gets woken up properly, even from
1908   // a separate thread.
1909   base.runAfterDelay([&]() { p2.setValue(true); }, 10);
1910   f2.waitVia(&base);
1911   EXPECT_TRUE(f2.isReady());
1912
1913   t.join();
1914 }
1915
1916 TEST(EventBaseTest, RequestContextTest) {
1917   EventBase evb;
1918   auto defaultCtx = RequestContext::get();
1919
1920   {
1921     RequestContextScopeGuard rctx;
1922     auto context = RequestContext::get();
1923     EXPECT_NE(defaultCtx, context);
1924     evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
1925   }
1926
1927   EXPECT_EQ(defaultCtx, RequestContext::get());
1928   evb.loop();
1929   EXPECT_EQ(defaultCtx, RequestContext::get());
1930 }