25b3192c1280751b497b8138b13d9ebadb58be58
[folly.git] / folly / io / async / test / EventBaseTest.cpp
1 /*
2  * Copyright 2004-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/Memory.h>
18 #include <folly/ScopeGuard.h>
19
20 #include <folly/io/async/AsyncTimeout.h>
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/EventHandler.h>
23 #include <folly/io/async/test/SocketPair.h>
24 #include <folly/io/async/test/Util.h>
25 #include <folly/portability/Unistd.h>
26
27 #include <folly/futures/Promise.h>
28
29 #include <atomic>
30 #include <iostream>
31 #include <memory>
32 #include <thread>
33
34 using std::atomic;
35 using std::deque;
36 using std::pair;
37 using std::vector;
38 using std::unique_ptr;
39 using std::thread;
40 using std::make_pair;
41 using std::cerr;
42 using std::endl;
43 using std::chrono::milliseconds;
44 using std::chrono::microseconds;
45 using std::chrono::duration_cast;
46
47 using namespace std::chrono_literals;
48
49 using namespace folly;
50
51 ///////////////////////////////////////////////////////////////////////////
52 // Tests for read and write events
53 ///////////////////////////////////////////////////////////////////////////
54
55 enum { BUF_SIZE = 4096 };
56
57 ssize_t writeToFD(int fd, size_t length) {
58   // write an arbitrary amount of data to the fd
59   auto bufv = vector<char>(length);
60   auto buf = bufv.data();
61   memset(buf, 'a', length);
62   ssize_t rc = write(fd, buf, length);
63   CHECK_EQ(rc, length);
64   return rc;
65 }
66
67 size_t writeUntilFull(int fd) {
68   // Write to the fd until EAGAIN is returned
69   size_t bytesWritten = 0;
70   char buf[BUF_SIZE];
71   memset(buf, 'a', sizeof(buf));
72   while (true) {
73     ssize_t rc = write(fd, buf, sizeof(buf));
74     if (rc < 0) {
75       CHECK_EQ(errno, EAGAIN);
76       break;
77     } else {
78       bytesWritten += rc;
79     }
80   }
81   return bytesWritten;
82 }
83
84 ssize_t readFromFD(int fd, size_t length) {
85   // write an arbitrary amount of data to the fd
86   auto buf = vector<char>(length);
87   return read(fd, buf.data(), length);
88 }
89
90 size_t readUntilEmpty(int fd) {
91   // Read from the fd until EAGAIN is returned
92   char buf[BUF_SIZE];
93   size_t bytesRead = 0;
94   while (true) {
95     int rc = read(fd, buf, sizeof(buf));
96     if (rc == 0) {
97       CHECK(false) << "unexpected EOF";
98     } else if (rc < 0) {
99       CHECK_EQ(errno, EAGAIN);
100       break;
101     } else {
102       bytesRead += rc;
103     }
104   }
105   return bytesRead;
106 }
107
108 void checkReadUntilEmpty(int fd, size_t expectedLength) {
109   ASSERT_EQ(readUntilEmpty(fd), expectedLength);
110 }
111
112 struct ScheduledEvent {
113   int milliseconds;
114   uint16_t events;
115   size_t length;
116   ssize_t result;
117
118   void perform(int fd) {
119     if (events & EventHandler::READ) {
120       if (length == 0) {
121         result = readUntilEmpty(fd);
122       } else {
123         result = readFromFD(fd, length);
124       }
125     }
126     if (events & EventHandler::WRITE) {
127       if (length == 0) {
128         result = writeUntilFull(fd);
129       } else {
130         result = writeToFD(fd, length);
131       }
132     }
133   }
134 };
135
136 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
137   for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
138     eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
139                              ev->milliseconds);
140   }
141 }
142
143 class TestHandler : public EventHandler {
144  public:
145   TestHandler(EventBase* eventBase, int fd)
146     : EventHandler(eventBase, fd), fd_(fd) {}
147
148   void handlerReady(uint16_t events) noexcept override {
149     ssize_t bytesRead = 0;
150     ssize_t bytesWritten = 0;
151     if (events & READ) {
152       // Read all available data, so EventBase will stop calling us
153       // until new data becomes available
154       bytesRead = readUntilEmpty(fd_);
155     }
156     if (events & WRITE) {
157       // Write until the pipe buffer is full, so EventBase will stop calling
158       // us until the other end has read some data
159       bytesWritten = writeUntilFull(fd_);
160     }
161
162     log.emplace_back(events, bytesRead, bytesWritten);
163   }
164
165   struct EventRecord {
166     EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
167       : events(events)
168       , timestamp()
169       , bytesRead(bytesRead)
170       , bytesWritten(bytesWritten) {}
171
172     uint16_t events;
173     TimePoint timestamp;
174     ssize_t bytesRead;
175     ssize_t bytesWritten;
176   };
177
178   deque<EventRecord> log;
179
180  private:
181   int fd_;
182 };
183
184 /**
185  * Test a READ event
186  */
187 TEST(EventBaseTest, ReadEvent) {
188   EventBase eb;
189   SocketPair sp;
190
191   // Register for read events
192   TestHandler handler(&eb, sp[0]);
193   handler.registerHandler(EventHandler::READ);
194
195   // Register timeouts to perform two write events
196   ScheduledEvent events[] = {
197     { 10, EventHandler::WRITE, 2345, 0 },
198     { 160, EventHandler::WRITE, 99, 0 },
199     { 0, 0, 0, 0 },
200   };
201   scheduleEvents(&eb, sp[1], events);
202
203   // Loop
204   TimePoint start;
205   eb.loop();
206   TimePoint end;
207
208   // Since we didn't use the EventHandler::PERSIST flag, the handler should
209   // have received the first read, then unregistered itself.  Check that only
210   // the first chunk of data was received.
211   ASSERT_EQ(handler.log.size(), 1);
212   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
213   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
214                   milliseconds(events[0].milliseconds), milliseconds(90));
215   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
216   ASSERT_EQ(handler.log[0].bytesWritten, 0);
217   T_CHECK_TIMEOUT(start, end,
218                   milliseconds(events[1].milliseconds), milliseconds(30));
219
220   // Make sure the second chunk of data is still waiting to be read.
221   size_t bytesRemaining = readUntilEmpty(sp[0]);
222   ASSERT_EQ(bytesRemaining, events[1].length);
223 }
224
225 /**
226  * Test (READ | PERSIST)
227  */
228 TEST(EventBaseTest, ReadPersist) {
229   EventBase eb;
230   SocketPair sp;
231
232   // Register for read events
233   TestHandler handler(&eb, sp[0]);
234   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
235
236   // Register several timeouts to perform writes
237   ScheduledEvent events[] = {
238     { 10,  EventHandler::WRITE, 1024, 0 },
239     { 20,  EventHandler::WRITE, 2211, 0 },
240     { 30,  EventHandler::WRITE, 4096, 0 },
241     { 100, EventHandler::WRITE, 100,  0 },
242     { 0, 0, 0, 0 },
243   };
244   scheduleEvents(&eb, sp[1], events);
245
246   // Schedule a timeout to unregister the handler after the third write
247   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
248
249   // Loop
250   TimePoint start;
251   eb.loop();
252   TimePoint end;
253
254   // The handler should have received the first 3 events,
255   // then been unregistered after that.
256   ASSERT_EQ(handler.log.size(), 3);
257   for (int n = 0; n < 3; ++n) {
258     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
259     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
260                     milliseconds(events[n].milliseconds));
261     ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
262     ASSERT_EQ(handler.log[n].bytesWritten, 0);
263   }
264   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
265
266   // Make sure the data from the last write is still waiting to be read
267   size_t bytesRemaining = readUntilEmpty(sp[0]);
268   ASSERT_EQ(bytesRemaining, events[3].length);
269 }
270
271 /**
272  * Test registering for READ when the socket is immediately readable
273  */
274 TEST(EventBaseTest, ReadImmediate) {
275   EventBase eb;
276   SocketPair sp;
277
278   // Write some data to the socket so the other end will
279   // be immediately readable
280   size_t dataLength = 1234;
281   writeToFD(sp[1], dataLength);
282
283   // Register for read events
284   TestHandler handler(&eb, sp[0]);
285   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
286
287   // Register a timeout to perform another write
288   ScheduledEvent events[] = {
289     { 10, EventHandler::WRITE, 2345, 0 },
290     { 0, 0, 0, 0 },
291   };
292   scheduleEvents(&eb, sp[1], events);
293
294   // Schedule a timeout to unregister the handler
295   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
296
297   // Loop
298   TimePoint start;
299   eb.loop();
300   TimePoint end;
301
302   ASSERT_EQ(handler.log.size(), 2);
303
304   // There should have been 1 event for immediate readability
305   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
306   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
307   ASSERT_EQ(handler.log[0].bytesRead, dataLength);
308   ASSERT_EQ(handler.log[0].bytesWritten, 0);
309
310   // There should be another event after the timeout wrote more data
311   ASSERT_EQ(handler.log[1].events, EventHandler::READ);
312   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
313                   milliseconds(events[0].milliseconds));
314   ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
315   ASSERT_EQ(handler.log[1].bytesWritten, 0);
316
317   T_CHECK_TIMEOUT(start, end, milliseconds(20));
318 }
319
320 /**
321  * Test a WRITE event
322  */
323 TEST(EventBaseTest, WriteEvent) {
324   EventBase eb;
325   SocketPair sp;
326
327   // Fill up the write buffer before starting
328   size_t initialBytesWritten = writeUntilFull(sp[0]);
329
330   // Register for write events
331   TestHandler handler(&eb, sp[0]);
332   handler.registerHandler(EventHandler::WRITE);
333
334   // Register timeouts to perform two reads
335   ScheduledEvent events[] = {
336     { 10, EventHandler::READ, 0, 0 },
337     { 60, EventHandler::READ, 0, 0 },
338     { 0, 0, 0, 0 },
339   };
340   scheduleEvents(&eb, sp[1], events);
341
342   // Loop
343   TimePoint start;
344   eb.loop();
345   TimePoint end;
346
347   // Since we didn't use the EventHandler::PERSIST flag, the handler should
348   // have only been able to write once, then unregistered itself.
349   ASSERT_EQ(handler.log.size(), 1);
350   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
351   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
352                   milliseconds(events[0].milliseconds));
353   ASSERT_EQ(handler.log[0].bytesRead, 0);
354   ASSERT_GT(handler.log[0].bytesWritten, 0);
355   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
356
357   ASSERT_EQ(events[0].result, initialBytesWritten);
358   ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
359 }
360
361 /**
362  * Test (WRITE | PERSIST)
363  */
364 TEST(EventBaseTest, WritePersist) {
365   EventBase eb;
366   SocketPair sp;
367
368   // Fill up the write buffer before starting
369   size_t initialBytesWritten = writeUntilFull(sp[0]);
370
371   // Register for write events
372   TestHandler handler(&eb, sp[0]);
373   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
374
375   // Register several timeouts to read from the socket at several intervals
376   ScheduledEvent events[] = {
377     { 10,  EventHandler::READ, 0, 0 },
378     { 40,  EventHandler::READ, 0, 0 },
379     { 70,  EventHandler::READ, 0, 0 },
380     { 100, EventHandler::READ, 0, 0 },
381     { 0, 0, 0, 0 },
382   };
383   scheduleEvents(&eb, sp[1], events);
384
385   // Schedule a timeout to unregister the handler after the third read
386   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
387
388   // Loop
389   TimePoint start;
390   eb.loop();
391   TimePoint end;
392
393   // The handler should have received the first 3 events,
394   // then been unregistered after that.
395   ASSERT_EQ(handler.log.size(), 3);
396   ASSERT_EQ(events[0].result, initialBytesWritten);
397   for (int n = 0; n < 3; ++n) {
398     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
399     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
400                     milliseconds(events[n].milliseconds));
401     ASSERT_EQ(handler.log[n].bytesRead, 0);
402     ASSERT_GT(handler.log[n].bytesWritten, 0);
403     ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
404   }
405   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
406 }
407
408 /**
409  * Test registering for WRITE when the socket is immediately writable
410  */
411 TEST(EventBaseTest, WriteImmediate) {
412   EventBase eb;
413   SocketPair sp;
414
415   // Register for write events
416   TestHandler handler(&eb, sp[0]);
417   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
418
419   // Register a timeout to perform a read
420   ScheduledEvent events[] = {
421     { 10, EventHandler::READ, 0, 0 },
422     { 0, 0, 0, 0 },
423   };
424   scheduleEvents(&eb, sp[1], events);
425
426   // Schedule a timeout to unregister the handler
427   int64_t unregisterTimeout = 40;
428   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
429                    unregisterTimeout);
430
431   // Loop
432   TimePoint start;
433   eb.loop();
434   TimePoint end;
435
436   ASSERT_EQ(handler.log.size(), 2);
437
438   // Since the socket buffer was initially empty,
439   // there should have been 1 event for immediate writability
440   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
441   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
442   ASSERT_EQ(handler.log[0].bytesRead, 0);
443   ASSERT_GT(handler.log[0].bytesWritten, 0);
444
445   // There should be another event after the timeout wrote more data
446   ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
447   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
448                   milliseconds(events[0].milliseconds));
449   ASSERT_EQ(handler.log[1].bytesRead, 0);
450   ASSERT_GT(handler.log[1].bytesWritten, 0);
451
452   T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
453 }
454
455 /**
456  * Test (READ | WRITE) when the socket becomes readable first
457  */
458 TEST(EventBaseTest, ReadWrite) {
459   EventBase eb;
460   SocketPair sp;
461
462   // Fill up the write buffer before starting
463   size_t sock0WriteLength = writeUntilFull(sp[0]);
464
465   // Register for read and write events
466   TestHandler handler(&eb, sp[0]);
467   handler.registerHandler(EventHandler::READ_WRITE);
468
469   // Register timeouts to perform a write then a read.
470   ScheduledEvent events[] = {
471     { 10, EventHandler::WRITE, 2345, 0 },
472     { 40, EventHandler::READ, 0, 0 },
473     { 0, 0, 0, 0 },
474   };
475   scheduleEvents(&eb, sp[1], events);
476
477   // Loop
478   TimePoint start;
479   eb.loop();
480   TimePoint end;
481
482   // Since we didn't use the EventHandler::PERSIST flag, the handler should
483   // have only noticed readability, then unregistered itself.  Check that only
484   // one event was logged.
485   ASSERT_EQ(handler.log.size(), 1);
486   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
487   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
488                   milliseconds(events[0].milliseconds));
489   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
490   ASSERT_EQ(handler.log[0].bytesWritten, 0);
491   ASSERT_EQ(events[1].result, sock0WriteLength);
492   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
493 }
494
495 /**
496  * Test (READ | WRITE) when the socket becomes writable first
497  */
498 TEST(EventBaseTest, WriteRead) {
499   EventBase eb;
500   SocketPair sp;
501
502   // Fill up the write buffer before starting
503   size_t sock0WriteLength = writeUntilFull(sp[0]);
504
505   // Register for read and write events
506   TestHandler handler(&eb, sp[0]);
507   handler.registerHandler(EventHandler::READ_WRITE);
508
509   // Register timeouts to perform a read then a write.
510   size_t sock1WriteLength = 2345;
511   ScheduledEvent events[] = {
512     { 10, EventHandler::READ, 0, 0 },
513     { 40, EventHandler::WRITE, sock1WriteLength, 0 },
514     { 0, 0, 0, 0 },
515   };
516   scheduleEvents(&eb, sp[1], events);
517
518   // Loop
519   TimePoint start;
520   eb.loop();
521   TimePoint end;
522
523   // Since we didn't use the EventHandler::PERSIST flag, the handler should
524   // have only noticed writability, then unregistered itself.  Check that only
525   // one event was logged.
526   ASSERT_EQ(handler.log.size(), 1);
527   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
528   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
529                   milliseconds(events[0].milliseconds));
530   ASSERT_EQ(handler.log[0].bytesRead, 0);
531   ASSERT_GT(handler.log[0].bytesWritten, 0);
532   ASSERT_EQ(events[0].result, sock0WriteLength);
533   ASSERT_EQ(events[1].result, sock1WriteLength);
534   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
535
536   // Make sure the written data is still waiting to be read.
537   size_t bytesRemaining = readUntilEmpty(sp[0]);
538   ASSERT_EQ(bytesRemaining, events[1].length);
539 }
540
541 /**
542  * Test (READ | WRITE) when the socket becomes readable and writable
543  * at the same time.
544  */
545 TEST(EventBaseTest, ReadWriteSimultaneous) {
546   EventBase eb;
547   SocketPair sp;
548
549   // Fill up the write buffer before starting
550   size_t sock0WriteLength = writeUntilFull(sp[0]);
551
552   // Register for read and write events
553   TestHandler handler(&eb, sp[0]);
554   handler.registerHandler(EventHandler::READ_WRITE);
555
556   // Register a timeout to perform a read and write together
557   ScheduledEvent events[] = {
558     { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
559     { 0, 0, 0, 0 },
560   };
561   scheduleEvents(&eb, sp[1], events);
562
563   // Loop
564   TimePoint start;
565   eb.loop();
566   TimePoint end;
567
568   // It's not strictly required that the EventBase register us about both
569   // events in the same call.  So, it's possible that if the EventBase
570   // implementation changes this test could start failing, and it wouldn't be
571   // considered breaking the API.  However for now it's nice to exercise this
572   // code path.
573   ASSERT_EQ(handler.log.size(), 1);
574   ASSERT_EQ(handler.log[0].events,
575                     EventHandler::READ | EventHandler::WRITE);
576   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
577                   milliseconds(events[0].milliseconds));
578   ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
579   ASSERT_GT(handler.log[0].bytesWritten, 0);
580   T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
581 }
582
583 /**
584  * Test (READ | WRITE | PERSIST)
585  */
586 TEST(EventBaseTest, ReadWritePersist) {
587   EventBase eb;
588   SocketPair sp;
589
590   // Register for read and write events
591   TestHandler handler(&eb, sp[0]);
592   handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
593                           EventHandler::PERSIST);
594
595   // Register timeouts to perform several reads and writes
596   ScheduledEvent events[] = {
597     { 10, EventHandler::WRITE, 2345, 0 },
598     { 20, EventHandler::READ, 0, 0 },
599     { 35, EventHandler::WRITE, 200, 0 },
600     { 45, EventHandler::WRITE, 15, 0 },
601     { 55, EventHandler::READ, 0, 0 },
602     { 120, EventHandler::WRITE, 2345, 0 },
603     { 0, 0, 0, 0 },
604   };
605   scheduleEvents(&eb, sp[1], events);
606
607   // Schedule a timeout to unregister the handler
608   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
609
610   // Loop
611   TimePoint start;
612   eb.loop();
613   TimePoint end;
614
615   ASSERT_EQ(handler.log.size(), 6);
616
617   // Since we didn't fill up the write buffer immediately, there should
618   // be an immediate event for writability.
619   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
620   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
621   ASSERT_EQ(handler.log[0].bytesRead, 0);
622   ASSERT_GT(handler.log[0].bytesWritten, 0);
623
624   // Events 1 through 5 should correspond to the scheduled events
625   for (int n = 1; n < 6; ++n) {
626     ScheduledEvent* event = &events[n - 1];
627     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
628                     milliseconds(event->milliseconds));
629     if (event->events == EventHandler::READ) {
630       ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
631       ASSERT_EQ(handler.log[n].bytesRead, 0);
632       ASSERT_GT(handler.log[n].bytesWritten, 0);
633     } else {
634       ASSERT_EQ(handler.log[n].events, EventHandler::READ);
635       ASSERT_EQ(handler.log[n].bytesRead, event->length);
636       ASSERT_EQ(handler.log[n].bytesWritten, 0);
637     }
638   }
639
640   // The timeout should have unregistered the handler before the last write.
641   // Make sure that data is still waiting to be read
642   size_t bytesRemaining = readUntilEmpty(sp[0]);
643   ASSERT_EQ(bytesRemaining, events[5].length);
644 }
645
646
647 class PartialReadHandler : public TestHandler {
648  public:
649   PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
650     : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
651
652   void handlerReady(uint16_t events) noexcept override {
653     assert(events == EventHandler::READ);
654     ssize_t bytesRead = readFromFD(fd_, readLength_);
655     log.emplace_back(events, bytesRead, 0);
656   }
657
658  private:
659   int fd_;
660   size_t readLength_;
661 };
662
663 /**
664  * Test reading only part of the available data when a read event is fired.
665  * When PERSIST is used, make sure the handler gets notified again the next
666  * time around the loop.
667  */
668 TEST(EventBaseTest, ReadPartial) {
669   EventBase eb;
670   SocketPair sp;
671
672   // Register for read events
673   size_t readLength = 100;
674   PartialReadHandler handler(&eb, sp[0], readLength);
675   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
676
677   // Register a timeout to perform a single write,
678   // with more data than PartialReadHandler will read at once
679   ScheduledEvent events[] = {
680     { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
681     { 0, 0, 0, 0 },
682   };
683   scheduleEvents(&eb, sp[1], events);
684
685   // Schedule a timeout to unregister the handler
686   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
687
688   // Loop
689   TimePoint start;
690   eb.loop();
691   TimePoint end;
692
693   ASSERT_EQ(handler.log.size(), 4);
694
695   // The first 3 invocations should read readLength bytes each
696   for (int n = 0; n < 3; ++n) {
697     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
698     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
699                     milliseconds(events[0].milliseconds));
700     ASSERT_EQ(handler.log[n].bytesRead, readLength);
701     ASSERT_EQ(handler.log[n].bytesWritten, 0);
702   }
703   // The last read only has readLength/2 bytes
704   ASSERT_EQ(handler.log[3].events, EventHandler::READ);
705   T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
706                   milliseconds(events[0].milliseconds));
707   ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
708   ASSERT_EQ(handler.log[3].bytesWritten, 0);
709 }
710
711
712 class PartialWriteHandler : public TestHandler {
713  public:
714   PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
715     : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
716
717   void handlerReady(uint16_t events) noexcept override {
718     assert(events == EventHandler::WRITE);
719     ssize_t bytesWritten = writeToFD(fd_, writeLength_);
720     log.emplace_back(events, 0, bytesWritten);
721   }
722
723  private:
724   int fd_;
725   size_t writeLength_;
726 };
727
728 /**
729  * Test writing without completely filling up the write buffer when the fd
730  * becomes writable.  When PERSIST is used, make sure the handler gets
731  * notified again the next time around the loop.
732  */
733 TEST(EventBaseTest, WritePartial) {
734   EventBase eb;
735   SocketPair sp;
736
737   // Fill up the write buffer before starting
738   size_t initialBytesWritten = writeUntilFull(sp[0]);
739
740   // Register for write events
741   size_t writeLength = 100;
742   PartialWriteHandler handler(&eb, sp[0], writeLength);
743   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
744
745   // Register a timeout to read, so that more data can be written
746   ScheduledEvent events[] = {
747     { 10, EventHandler::READ, 0, 0 },
748     { 0, 0, 0, 0 },
749   };
750   scheduleEvents(&eb, sp[1], events);
751
752   // Schedule a timeout to unregister the handler
753   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
754
755   // Loop
756   TimePoint start;
757   eb.loop();
758   TimePoint end;
759
760   // Depending on how big the socket buffer is, there will be multiple writes
761   // Only check the first 5
762   int numChecked = 5;
763   ASSERT_GE(handler.log.size(), numChecked);
764   ASSERT_EQ(events[0].result, initialBytesWritten);
765
766   // The first 3 invocations should read writeLength bytes each
767   for (int n = 0; n < numChecked; ++n) {
768     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
769     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
770                     milliseconds(events[0].milliseconds));
771     ASSERT_EQ(handler.log[n].bytesRead, 0);
772     ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
773   }
774 }
775
776
777 /**
778  * Test destroying a registered EventHandler
779  */
780 TEST(EventBaseTest, DestroyHandler) {
781   class DestroyHandler : public AsyncTimeout {
782    public:
783     DestroyHandler(EventBase* eb, EventHandler* h)
784       : AsyncTimeout(eb)
785       , handler_(h) {}
786
787     void timeoutExpired() noexcept override { delete handler_; }
788
789    private:
790     EventHandler* handler_;
791   };
792
793   EventBase eb;
794   SocketPair sp;
795
796   // Fill up the write buffer before starting
797   size_t initialBytesWritten = writeUntilFull(sp[0]);
798
799   // Register for write events
800   TestHandler* handler = new TestHandler(&eb, sp[0]);
801   handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
802
803   // After 10ms, read some data, so that the handler
804   // will be notified that it can write.
805   eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
806                    10);
807
808   // Start a timer to destroy the handler after 25ms
809   // This mainly just makes sure the code doesn't break or assert
810   DestroyHandler dh(&eb, handler);
811   dh.scheduleTimeout(25);
812
813   TimePoint start;
814   eb.loop();
815   TimePoint end;
816
817   // Make sure the EventHandler was uninstalled properly when it was
818   // destroyed, and the EventBase loop exited
819   T_CHECK_TIMEOUT(start, end, milliseconds(25));
820
821   // Make sure that the handler wrote data to the socket
822   // before it was destroyed
823   size_t bytesRemaining = readUntilEmpty(sp[1]);
824   ASSERT_GT(bytesRemaining, 0);
825 }
826
827
828 ///////////////////////////////////////////////////////////////////////////
829 // Tests for timeout events
830 ///////////////////////////////////////////////////////////////////////////
831
832 TEST(EventBaseTest, RunAfterDelay) {
833   EventBase eb;
834
835   TimePoint timestamp1(false);
836   TimePoint timestamp2(false);
837   TimePoint timestamp3(false);
838   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
839   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
840   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
841
842   TimePoint start;
843   eb.loop();
844   TimePoint end;
845
846   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
847   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
848   T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
849   T_CHECK_TIMEOUT(start, end, milliseconds(40));
850 }
851
852 /**
853  * Test the behavior of tryRunAfterDelay() when some timeouts are
854  * still scheduled when the EventBase is destroyed.
855  */
856 TEST(EventBaseTest, RunAfterDelayDestruction) {
857   TimePoint timestamp1(false);
858   TimePoint timestamp2(false);
859   TimePoint timestamp3(false);
860   TimePoint timestamp4(false);
861   TimePoint start(false);
862   TimePoint end(false);
863
864   {
865     EventBase eb;
866
867     // Run two normal timeouts
868     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
869     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
870
871     // Schedule a timeout to stop the event loop after 40ms
872     eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
873
874     // Schedule 2 timeouts that would fire after the event loop stops
875     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
876     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
877
878     start.reset();
879     eb.loop();
880     end.reset();
881   }
882
883   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
884   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
885   T_CHECK_TIMEOUT(start, end, milliseconds(40));
886
887   ASSERT_TRUE(timestamp3.isUnset());
888   ASSERT_TRUE(timestamp4.isUnset());
889
890   // Ideally this test should be run under valgrind to ensure that no
891   // memory is leaked.
892 }
893
894 class TestTimeout : public AsyncTimeout {
895  public:
896   explicit TestTimeout(EventBase* eventBase)
897     : AsyncTimeout(eventBase)
898     , timestamp(false) {}
899
900   void timeoutExpired() noexcept override { timestamp.reset(); }
901
902   TimePoint timestamp;
903 };
904
905 TEST(EventBaseTest, BasicTimeouts) {
906   EventBase eb;
907
908   TestTimeout t1(&eb);
909   TestTimeout t2(&eb);
910   TestTimeout t3(&eb);
911   t1.scheduleTimeout(10);
912   t2.scheduleTimeout(20);
913   t3.scheduleTimeout(40);
914
915   TimePoint start;
916   eb.loop();
917   TimePoint end;
918
919   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
920   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
921   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
922   T_CHECK_TIMEOUT(start, end, milliseconds(40));
923 }
924
925 class ReschedulingTimeout : public AsyncTimeout {
926  public:
927   ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
928     : AsyncTimeout(evb)
929     , timeouts_(timeouts)
930     , iterator_(timeouts_.begin()) {}
931
932   void start() {
933     reschedule();
934   }
935
936   void timeoutExpired() noexcept override {
937     timestamps.emplace_back();
938     reschedule();
939   }
940
941   void reschedule() {
942     if (iterator_ != timeouts_.end()) {
943       uint32_t timeout = *iterator_;
944       ++iterator_;
945       scheduleTimeout(timeout);
946     }
947   }
948
949   vector<TimePoint> timestamps;
950
951  private:
952   vector<uint32_t> timeouts_;
953   vector<uint32_t>::const_iterator iterator_;
954 };
955
956 /**
957  * Test rescheduling the same timeout multiple times
958  */
959 TEST(EventBaseTest, ReuseTimeout) {
960   EventBase eb;
961
962   vector<uint32_t> timeouts;
963   timeouts.push_back(10);
964   timeouts.push_back(30);
965   timeouts.push_back(15);
966
967   ReschedulingTimeout t(&eb, timeouts);
968   t.start();
969
970   TimePoint start;
971   eb.loop();
972   TimePoint end;
973
974   // Use a higher tolerance than usual.  We're waiting on 3 timeouts
975   // consecutively.  In general, each timeout may go over by a few
976   // milliseconds, and we're tripling this error by witing on 3 timeouts.
977   milliseconds tolerance{6};
978
979   ASSERT_EQ(timeouts.size(), t.timestamps.size());
980   uint32_t total = 0;
981   for (size_t n = 0; n < timeouts.size(); ++n) {
982     total += timeouts[n];
983     T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
984   }
985   T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
986 }
987
988 /**
989  * Test rescheduling a timeout before it has fired
990  */
991 TEST(EventBaseTest, RescheduleTimeout) {
992   EventBase eb;
993
994   TestTimeout t1(&eb);
995   TestTimeout t2(&eb);
996   TestTimeout t3(&eb);
997
998   t1.scheduleTimeout(15);
999   t2.scheduleTimeout(30);
1000   t3.scheduleTimeout(30);
1001
1002   auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1003       &AsyncTimeout::scheduleTimeout);
1004
1005   // after 10ms, reschedule t2 to run sooner than originally scheduled
1006   eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1007   // after 10ms, reschedule t3 to run later than originally scheduled
1008   eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1009
1010   TimePoint start;
1011   eb.loop();
1012   TimePoint end;
1013
1014   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1015   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1016   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1017   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1018 }
1019
1020 /**
1021  * Test cancelling a timeout
1022  */
1023 TEST(EventBaseTest, CancelTimeout) {
1024   EventBase eb;
1025
1026   vector<uint32_t> timeouts;
1027   timeouts.push_back(10);
1028   timeouts.push_back(30);
1029   timeouts.push_back(25);
1030
1031   ReschedulingTimeout t(&eb, timeouts);
1032   t.start();
1033   eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1034
1035   TimePoint start;
1036   eb.loop();
1037   TimePoint end;
1038
1039   ASSERT_EQ(t.timestamps.size(), 2);
1040   T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1041   T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1042   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1043 }
1044
1045 /**
1046  * Test destroying a scheduled timeout object
1047  */
1048 TEST(EventBaseTest, DestroyTimeout) {
1049   class DestroyTimeout : public AsyncTimeout {
1050    public:
1051     DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1052       : AsyncTimeout(eb)
1053       , timeout_(t) {}
1054
1055     void timeoutExpired() noexcept override { delete timeout_; }
1056
1057    private:
1058     AsyncTimeout* timeout_;
1059   };
1060
1061   EventBase eb;
1062
1063   TestTimeout* t1 = new TestTimeout(&eb);
1064   t1->scheduleTimeout(30);
1065
1066   DestroyTimeout dt(&eb, t1);
1067   dt.scheduleTimeout(10);
1068
1069   TimePoint start;
1070   eb.loop();
1071   TimePoint end;
1072
1073   T_CHECK_TIMEOUT(start, end, milliseconds(10));
1074 }
1075
1076
1077 ///////////////////////////////////////////////////////////////////////////
1078 // Test for runInThreadTestFunc()
1079 ///////////////////////////////////////////////////////////////////////////
1080
1081 struct RunInThreadData {
1082   RunInThreadData(int numThreads, int opsPerThread)
1083     : opsPerThread(opsPerThread)
1084     , opsToGo(numThreads*opsPerThread) {}
1085
1086   EventBase evb;
1087   deque< pair<int, int> > values;
1088
1089   int opsPerThread;
1090   int opsToGo;
1091 };
1092
1093 struct RunInThreadArg {
1094   RunInThreadArg(RunInThreadData* data,
1095                  int threadId,
1096                  int value)
1097     : data(data)
1098     , thread(threadId)
1099     , value(value) {}
1100
1101   RunInThreadData* data;
1102   int thread;
1103   int value;
1104 };
1105
1106 void runInThreadTestFunc(RunInThreadArg* arg) {
1107   arg->data->values.emplace_back(arg->thread, arg->value);
1108   RunInThreadData* data = arg->data;
1109   delete arg;
1110
1111   if(--data->opsToGo == 0) {
1112     // Break out of the event base loop if we are the last thread running
1113     data->evb.terminateLoopSoon();
1114   }
1115 }
1116
1117 TEST(EventBaseTest, RunInThread) {
1118   constexpr uint32_t numThreads = 50;
1119   constexpr uint32_t opsPerThread = 100;
1120   RunInThreadData data(numThreads, opsPerThread);
1121
1122   deque<std::thread> threads;
1123   SCOPE_EXIT {
1124     // Wait on all of the threads.
1125     for (auto& thread : threads) {
1126       thread.join();
1127     }
1128   };
1129
1130   for (uint32_t i = 0; i < numThreads; ++i) {
1131     threads.emplace_back([i, &data] {
1132         for (int n = 0; n < data.opsPerThread; ++n) {
1133           RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1134           data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1135           usleep(10);
1136         }
1137       });
1138   }
1139
1140   // Add a timeout event to run after 3 seconds.
1141   // Otherwise loop() will return immediately since there are no events to run.
1142   // Once the last thread exits, it will stop the loop().  However, this
1143   // timeout also stops the loop in case there is a bug performing the normal
1144   // stop.
1145   data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1146                          3000);
1147
1148   TimePoint start;
1149   data.evb.loop();
1150   TimePoint end;
1151
1152   // Verify that the loop exited because all threads finished and requested it
1153   // to stop.  This should happen much sooner than the 3 second timeout.
1154   // Assert that it happens in under a second.  (This is still tons of extra
1155   // padding.)
1156
1157   auto timeTaken = std::chrono::duration_cast<milliseconds>(
1158     end.getTime() - start.getTime());
1159   ASSERT_LT(timeTaken.count(), 1000);
1160   VLOG(11) << "Time taken: " << timeTaken.count();
1161
1162   // Verify that we have all of the events from every thread
1163   int expectedValues[numThreads];
1164   for (uint32_t n = 0; n < numThreads; ++n) {
1165     expectedValues[n] = 0;
1166   }
1167   for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1168        it != data.values.end();
1169        ++it) {
1170     int threadID = it->first;
1171     int value = it->second;
1172     ASSERT_EQ(expectedValues[threadID], value);
1173     ++expectedValues[threadID];
1174   }
1175   for (uint32_t n = 0; n < numThreads; ++n) {
1176     ASSERT_EQ(expectedValues[n], opsPerThread);
1177   }
1178 }
1179
1180 //  This test simulates some calls, and verifies that the waiting happens by
1181 //  triggering what otherwise would be race conditions, and trying to detect
1182 //  whether any of the race conditions happened.
1183 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1184   const size_t c = 256;
1185   vector<unique_ptr<atomic<size_t>>> atoms(c);
1186   for (size_t i = 0; i < c; ++i) {
1187     auto& atom = atoms.at(i);
1188     atom = std::make_unique<atomic<size_t>>(0);
1189   }
1190   vector<thread> threads;
1191   for (size_t i = 0; i < c; ++i) {
1192     threads.emplace_back([&atoms, i] {
1193       EventBase eb;
1194       auto& atom = *atoms.at(i);
1195       auto ebth = thread([&] { eb.loopForever(); });
1196       eb.waitUntilRunning();
1197       eb.runInEventBaseThreadAndWait([&] {
1198         size_t x = 0;
1199         atom.compare_exchange_weak(
1200             x, 1, std::memory_order_release, std::memory_order_relaxed);
1201       });
1202       size_t x = 0;
1203       atom.compare_exchange_weak(
1204           x, 2, std::memory_order_release, std::memory_order_relaxed);
1205       eb.terminateLoopSoon();
1206       ebth.join();
1207     });
1208   }
1209   for (size_t i = 0; i < c; ++i) {
1210     auto& th = threads.at(i);
1211     th.join();
1212   }
1213   size_t sum = 0;
1214   for (auto& atom : atoms) {
1215     sum += *atom;
1216   }
1217   EXPECT_EQ(c, sum);
1218 }
1219
1220 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1221   EventBase eb;
1222   thread th(&EventBase::loopForever, &eb);
1223   SCOPE_EXIT {
1224     eb.terminateLoopSoon();
1225     th.join();
1226   };
1227   auto mutated = false;
1228   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1229       mutated = true;
1230   });
1231   EXPECT_TRUE(mutated);
1232 }
1233
1234 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1235   EventBase eb;
1236   thread th(&EventBase::loopForever, &eb);
1237   SCOPE_EXIT {
1238     eb.terminateLoopSoon();
1239     th.join();
1240   };
1241   eb.runInEventBaseThreadAndWait([&] {
1242       auto mutated = false;
1243       eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1244           mutated = true;
1245       });
1246       EXPECT_TRUE(mutated);
1247   });
1248 }
1249
1250 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1251   EventBase eb;
1252   auto mutated = false;
1253   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1254       mutated = true;
1255     });
1256   EXPECT_TRUE(mutated);
1257 }
1258
1259 ///////////////////////////////////////////////////////////////////////////
1260 // Tests for runInLoop()
1261 ///////////////////////////////////////////////////////////////////////////
1262
1263 class CountedLoopCallback : public EventBase::LoopCallback {
1264  public:
1265   CountedLoopCallback(EventBase* eventBase,
1266                       unsigned int count,
1267                       std::function<void()> action =
1268                         std::function<void()>())
1269     : eventBase_(eventBase)
1270     , count_(count)
1271     , action_(action) {}
1272
1273   void runLoopCallback() noexcept override {
1274     --count_;
1275     if (count_ > 0) {
1276       eventBase_->runInLoop(this);
1277     } else if (action_) {
1278       action_();
1279     }
1280   }
1281
1282   unsigned int getCount() const {
1283     return count_;
1284   }
1285
1286  private:
1287   EventBase* eventBase_;
1288   unsigned int count_;
1289   std::function<void()> action_;
1290 };
1291
1292 // Test that EventBase::loop() doesn't exit while there are
1293 // still LoopCallbacks remaining to be invoked.
1294 TEST(EventBaseTest, RepeatedRunInLoop) {
1295   EventBase eventBase;
1296
1297   CountedLoopCallback c(&eventBase, 10);
1298   eventBase.runInLoop(&c);
1299   // The callback shouldn't have run immediately
1300   ASSERT_EQ(c.getCount(), 10);
1301   eventBase.loop();
1302
1303   // loop() should loop until the CountedLoopCallback stops
1304   // re-installing itself.
1305   ASSERT_EQ(c.getCount(), 0);
1306 }
1307
1308 // Test that EventBase::loop() works as expected without time measurements.
1309 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
1310   EventBase eventBase(false);
1311
1312   CountedLoopCallback c(&eventBase, 10);
1313   eventBase.runInLoop(&c);
1314   // The callback shouldn't have run immediately
1315   ASSERT_EQ(c.getCount(), 10);
1316   eventBase.loop();
1317
1318   // loop() should loop until the CountedLoopCallback stops
1319   // re-installing itself.
1320   ASSERT_EQ(c.getCount(), 0);
1321 }
1322
1323 // Test runInLoop() calls with terminateLoopSoon()
1324 TEST(EventBaseTest, RunInLoopStopLoop) {
1325   EventBase eventBase;
1326
1327   CountedLoopCallback c1(&eventBase, 20);
1328   CountedLoopCallback c2(&eventBase, 10,
1329                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1330
1331   eventBase.runInLoop(&c1);
1332   eventBase.runInLoop(&c2);
1333   ASSERT_EQ(c1.getCount(), 20);
1334   ASSERT_EQ(c2.getCount(), 10);
1335
1336   eventBase.loopForever();
1337
1338   // c2 should have stopped the loop after 10 iterations
1339   ASSERT_EQ(c2.getCount(), 0);
1340
1341   // We allow the EventBase to run the loop callbacks in whatever order it
1342   // chooses.  We'll accept c1's count being either 10 (if the loop terminated
1343   // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1344   // before c1 ran).
1345   //
1346   // (With the current code, c1 will always run 10 times, but we don't consider
1347   // this a hard API requirement.)
1348   ASSERT_GE(c1.getCount(), 10);
1349   ASSERT_LE(c1.getCount(), 11);
1350 }
1351
1352 TEST(EventBaseTest, messageAvailableException) {
1353   auto deadManWalking = [] {
1354     EventBase eventBase;
1355     std::thread t([&] {
1356       // Call this from another thread to force use of NotificationQueue in
1357       // runInEventBaseThread
1358       eventBase.runInEventBaseThread(
1359           []() { throw std::runtime_error("boom"); });
1360     });
1361     t.join();
1362     eventBase.loopForever();
1363   };
1364   EXPECT_DEATH(deadManWalking(), ".*");
1365 }
1366
1367 TEST(EventBaseTest, TryRunningAfterTerminate) {
1368   EventBase eventBase;
1369   CountedLoopCallback c1(&eventBase, 1,
1370                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1371   eventBase.runInLoop(&c1);
1372   eventBase.loopForever();
1373   bool ran = false;
1374   eventBase.runInEventBaseThread([&]() {
1375     ran = true;
1376   });
1377
1378   ASSERT_FALSE(ran);
1379 }
1380
1381 // Test cancelling runInLoop() callbacks
1382 TEST(EventBaseTest, CancelRunInLoop) {
1383   EventBase eventBase;
1384
1385   CountedLoopCallback c1(&eventBase, 20);
1386   CountedLoopCallback c2(&eventBase, 20);
1387   CountedLoopCallback c3(&eventBase, 20);
1388
1389   std::function<void()> cancelC1Action =
1390     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1391   std::function<void()> cancelC2Action =
1392     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1393
1394   CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1395   CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1396
1397   // Install cancelC1 after c1
1398   eventBase.runInLoop(&c1);
1399   eventBase.runInLoop(&cancelC1);
1400
1401   // Install cancelC2 before c2
1402   eventBase.runInLoop(&cancelC2);
1403   eventBase.runInLoop(&c2);
1404
1405   // Install c3
1406   eventBase.runInLoop(&c3);
1407
1408   ASSERT_EQ(c1.getCount(), 20);
1409   ASSERT_EQ(c2.getCount(), 20);
1410   ASSERT_EQ(c3.getCount(), 20);
1411   ASSERT_EQ(cancelC1.getCount(), 10);
1412   ASSERT_EQ(cancelC2.getCount(), 10);
1413
1414   // Run the loop
1415   eventBase.loop();
1416
1417   // cancelC1 and cancelC3 should have both fired after 10 iterations and
1418   // stopped re-installing themselves
1419   ASSERT_EQ(cancelC1.getCount(), 0);
1420   ASSERT_EQ(cancelC2.getCount(), 0);
1421   // c3 should have continued on for the full 20 iterations
1422   ASSERT_EQ(c3.getCount(), 0);
1423
1424   // c1 and c2 should have both been cancelled on the 10th iteration.
1425   //
1426   // Callbacks are always run in the order they are installed,
1427   // so c1 should have fired 10 times, and been canceled after it ran on the
1428   // 10th iteration.  c2 should have only fired 9 times, because cancelC2 will
1429   // have run before it on the 10th iteration, and cancelled it before it
1430   // fired.
1431   ASSERT_EQ(c1.getCount(), 10);
1432   ASSERT_EQ(c2.getCount(), 11);
1433 }
1434
1435 class TerminateTestCallback : public EventBase::LoopCallback,
1436                               public EventHandler {
1437  public:
1438   TerminateTestCallback(EventBase* eventBase, int fd)
1439     : EventHandler(eventBase, fd),
1440       eventBase_(eventBase),
1441       loopInvocations_(0),
1442       maxLoopInvocations_(0),
1443       eventInvocations_(0),
1444       maxEventInvocations_(0) {}
1445
1446   void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1447     loopInvocations_ = 0;
1448     maxLoopInvocations_ = maxLoopInvocations;
1449     eventInvocations_ = 0;
1450     maxEventInvocations_ = maxEventInvocations;
1451
1452     cancelLoopCallback();
1453     unregisterHandler();
1454   }
1455
1456   void handlerReady(uint16_t /* events */) noexcept override {
1457     // We didn't register with PERSIST, so we will have been automatically
1458     // unregistered already.
1459     ASSERT_FALSE(isHandlerRegistered());
1460
1461     ++eventInvocations_;
1462     if (eventInvocations_ >= maxEventInvocations_) {
1463       return;
1464     }
1465
1466     eventBase_->runInLoop(this);
1467   }
1468   void runLoopCallback() noexcept override {
1469     ++loopInvocations_;
1470     if (loopInvocations_ >= maxLoopInvocations_) {
1471       return;
1472     }
1473
1474     registerHandler(READ);
1475   }
1476
1477   uint32_t getLoopInvocations() const {
1478     return loopInvocations_;
1479   }
1480   uint32_t getEventInvocations() const {
1481     return eventInvocations_;
1482   }
1483
1484  private:
1485   EventBase* eventBase_;
1486   uint32_t loopInvocations_;
1487   uint32_t maxLoopInvocations_;
1488   uint32_t eventInvocations_;
1489   uint32_t maxEventInvocations_;
1490 };
1491
1492 /**
1493  * Test that EventBase::loop() correctly detects when there are no more events
1494  * left to run.
1495  *
1496  * This uses a single callback, which alternates registering itself as a loop
1497  * callback versus a EventHandler callback.  This exercises a regression where
1498  * EventBase::loop() incorrectly exited if there were no more fd handlers
1499  * registered, but a loop callback installed a new fd handler.
1500  */
1501 TEST(EventBaseTest, LoopTermination) {
1502   EventBase eventBase;
1503
1504   // Open a pipe and close the write end,
1505   // so the read endpoint will be readable
1506   int pipeFds[2];
1507   int rc = pipe(pipeFds);
1508   ASSERT_EQ(rc, 0);
1509   close(pipeFds[1]);
1510   TerminateTestCallback callback(&eventBase, pipeFds[0]);
1511
1512   // Test once where the callback will exit after a loop callback
1513   callback.reset(10, 100);
1514   eventBase.runInLoop(&callback);
1515   eventBase.loop();
1516   ASSERT_EQ(callback.getLoopInvocations(), 10);
1517   ASSERT_EQ(callback.getEventInvocations(), 9);
1518
1519   // Test once where the callback will exit after an fd event callback
1520   callback.reset(100, 7);
1521   eventBase.runInLoop(&callback);
1522   eventBase.loop();
1523   ASSERT_EQ(callback.getLoopInvocations(), 7);
1524   ASSERT_EQ(callback.getEventInvocations(), 7);
1525
1526   close(pipeFds[0]);
1527 }
1528
1529 ///////////////////////////////////////////////////////////////////////////
1530 // Tests for latency calculations
1531 ///////////////////////////////////////////////////////////////////////////
1532
1533 class IdleTimeTimeoutSeries : public AsyncTimeout {
1534
1535  public:
1536
1537   explicit IdleTimeTimeoutSeries(EventBase *base,
1538                                  std::deque<std::uint64_t>& timeout) :
1539     AsyncTimeout(base),
1540     timeouts_(0),
1541     timeout_(timeout) {
1542       scheduleTimeout(1);
1543     }
1544
1545     ~IdleTimeTimeoutSeries() override {}
1546
1547     void timeoutExpired() noexcept override {
1548     ++timeouts_;
1549
1550     if(timeout_.empty()){
1551       cancelTimeout();
1552     } else {
1553       uint64_t sleepTime = timeout_.front();
1554       timeout_.pop_front();
1555       if (sleepTime) {
1556         usleep(sleepTime);
1557       }
1558       scheduleTimeout(1);
1559     }
1560   }
1561
1562   int getTimeouts() const {
1563     return timeouts_;
1564   }
1565
1566  private:
1567   int timeouts_;
1568   std::deque<uint64_t>& timeout_;
1569 };
1570
1571 /**
1572  * Verify that idle time is correctly accounted for when decaying our loop
1573  * time.
1574  *
1575  * This works by creating a high loop time (via usleep), expecting a latency
1576  * callback with known value, and then scheduling a timeout for later. This
1577  * later timeout is far enough in the future that the idle time should have
1578  * caused the loop time to decay.
1579  */
1580 TEST(EventBaseTest, IdleTime) {
1581   EventBase eventBase;
1582   eventBase.setLoadAvgMsec(1000ms);
1583   eventBase.resetLoadAvg(5900.0);
1584   std::deque<uint64_t> timeouts0(4, 8080);
1585   timeouts0.push_front(8000);
1586   timeouts0.push_back(14000);
1587   IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1588   std::deque<uint64_t> timeouts(20, 20);
1589   std::unique_ptr<IdleTimeTimeoutSeries> tos;
1590   int64_t testStart = duration_cast<microseconds>(
1591     std::chrono::steady_clock::now().time_since_epoch()).count();
1592   bool hostOverloaded = false;
1593
1594   int latencyCallbacks = 0;
1595   eventBase.setMaxLatency(6000us, [&]() {
1596     ++latencyCallbacks;
1597     if (latencyCallbacks != 1) {
1598       FAIL() << "Unexpected latency callback";
1599     }
1600
1601     if (tos0.getTimeouts() < 6) {
1602       // This could only happen if the host this test is running
1603       // on is heavily loaded.
1604       int64_t maxLatencyReached = duration_cast<microseconds>(
1605           std::chrono::steady_clock::now().time_since_epoch()).count();
1606       ASSERT_LE(43800, maxLatencyReached - testStart);
1607       hostOverloaded = true;
1608       return;
1609     }
1610     ASSERT_EQ(6, tos0.getTimeouts());
1611     ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1612     ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1613     tos = std::make_unique<IdleTimeTimeoutSeries>(&eventBase, timeouts);
1614   });
1615
1616   // Kick things off with an "immedite" timeout
1617   tos0.scheduleTimeout(1);
1618
1619   eventBase.loop();
1620
1621   if (hostOverloaded) {
1622     return;
1623   }
1624
1625   ASSERT_EQ(1, latencyCallbacks);
1626   ASSERT_EQ(7, tos0.getTimeouts());
1627   ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1628   ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1629   ASSERT_TRUE(!!tos);
1630   ASSERT_EQ(21, tos->getTimeouts());
1631 }
1632
1633 /**
1634  * Test that thisLoop functionality works with terminateLoopSoon
1635  */
1636 TEST(EventBaseTest, ThisLoop) {
1637   EventBase eb;
1638   bool runInLoop = false;
1639   bool runThisLoop = false;
1640
1641   eb.runInLoop([&](){
1642       eb.terminateLoopSoon();
1643       eb.runInLoop([&]() {
1644           runInLoop = true;
1645         });
1646       eb.runInLoop([&]() {
1647           runThisLoop = true;
1648         }, true);
1649     }, true);
1650   eb.loopForever();
1651
1652   // Should not work
1653   ASSERT_FALSE(runInLoop);
1654   // Should work with thisLoop
1655   ASSERT_TRUE(runThisLoop);
1656 }
1657
1658 TEST(EventBaseTest, EventBaseThreadLoop) {
1659   EventBase base;
1660   bool ran = false;
1661
1662   base.runInEventBaseThread([&](){
1663     ran = true;
1664   });
1665   base.loop();
1666
1667   ASSERT_TRUE(ran);
1668 }
1669
1670 TEST(EventBaseTest, EventBaseThreadName) {
1671   EventBase base;
1672   base.setName("foo");
1673   base.loop();
1674
1675 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1676   char name[16];
1677   pthread_getname_np(pthread_self(), name, 16);
1678   ASSERT_EQ(0, strcmp("foo", name));
1679 #endif
1680 }
1681
1682 TEST(EventBaseTest, RunBeforeLoop) {
1683   EventBase base;
1684   CountedLoopCallback cb(&base, 1, [&](){
1685     base.terminateLoopSoon();
1686   });
1687   base.runBeforeLoop(&cb);
1688   base.loopForever();
1689   ASSERT_EQ(cb.getCount(), 0);
1690 }
1691
1692 TEST(EventBaseTest, RunBeforeLoopWait) {
1693   EventBase base;
1694   CountedLoopCallback cb(&base, 1);
1695   base.tryRunAfterDelay([&](){
1696       base.terminateLoopSoon();
1697     }, 500);
1698   base.runBeforeLoop(&cb);
1699   base.loopForever();
1700
1701   // Check that we only ran once, and did not loop multiple times.
1702   ASSERT_EQ(cb.getCount(), 0);
1703 }
1704
1705 class PipeHandler : public EventHandler {
1706  public:
1707   PipeHandler(EventBase* eventBase, int fd)
1708     : EventHandler(eventBase, fd) {}
1709
1710   void handlerReady(uint16_t /* events */) noexcept override { abort(); }
1711 };
1712
1713 TEST(EventBaseTest, StopBeforeLoop) {
1714   EventBase evb;
1715
1716   // Give the evb something to do.
1717   int p[2];
1718   ASSERT_EQ(0, pipe(p));
1719   PipeHandler handler(&evb, p[0]);
1720   handler.registerHandler(EventHandler::READ);
1721
1722   // It's definitely not running yet
1723   evb.terminateLoopSoon();
1724
1725   // let it run, it should exit quickly.
1726   std::thread t([&] { evb.loop(); });
1727   t.join();
1728
1729   handler.unregisterHandler();
1730   close(p[0]);
1731   close(p[1]);
1732
1733   SUCCEED();
1734 }
1735
1736 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1737   bool ran = false;
1738
1739   {
1740     EventBase base;
1741     base.runInEventBaseThread([&](){
1742       ran = true;
1743     });
1744   }
1745
1746   ASSERT_TRUE(ran);
1747 }
1748
1749 TEST(EventBaseTest, LoopKeepAlive) {
1750   EventBase evb;
1751
1752   bool done = false;
1753   std::thread t([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
1754     /* sleep override */ std::this_thread::sleep_for(
1755         std::chrono::milliseconds(100));
1756     evb.runInEventBaseThread(
1757         [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1758   });
1759
1760   evb.loop();
1761
1762   ASSERT_TRUE(done);
1763
1764   t.join();
1765 }
1766
1767 TEST(EventBaseTest, LoopKeepAliveInLoop) {
1768   EventBase evb;
1769
1770   bool done = false;
1771   std::thread t;
1772
1773   evb.runInEventBaseThread([&] {
1774     t = std::thread([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
1775       /* sleep override */ std::this_thread::sleep_for(
1776           std::chrono::milliseconds(100));
1777       evb.runInEventBaseThread(
1778           [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1779     });
1780   });
1781
1782   evb.loop();
1783
1784   ASSERT_TRUE(done);
1785
1786   t.join();
1787 }
1788
1789 TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
1790   std::unique_ptr<EventBase> evb = std::make_unique<EventBase>();
1791
1792   bool done = false;
1793
1794   std::thread evThread([&] {
1795     evb->loopForever();
1796     evb.reset();
1797     done = true;
1798   });
1799
1800   {
1801     auto* ev = evb.get();
1802     Executor::KeepAlive keepAlive;
1803     ev->runInEventBaseThreadAndWait(
1804         [&ev, &keepAlive] { keepAlive = ev->getKeepAliveToken(); });
1805     ASSERT_FALSE(done) << "Loop finished before we asked it to";
1806     ev->terminateLoopSoon();
1807     /* sleep override */
1808     std::this_thread::sleep_for(std::chrono::milliseconds(30));
1809     ASSERT_FALSE(done) << "Loop terminated early";
1810     ev->runInEventBaseThread([keepAlive = std::move(keepAlive)]{});
1811   }
1812
1813   evThread.join();
1814   ASSERT_TRUE(done);
1815 }
1816
1817 TEST(EventBaseTest, LoopKeepAliveShutdown) {
1818   auto evb = std::make_unique<EventBase>();
1819
1820   bool done = false;
1821
1822   std::thread t([
1823     &done,
1824     loopKeepAlive = evb->getKeepAliveToken(),
1825     evbPtr = evb.get()
1826   ]() mutable {
1827     /* sleep override */ std::this_thread::sleep_for(
1828         std::chrono::milliseconds(100));
1829     evbPtr->runInEventBaseThread(
1830         [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1831   });
1832
1833   evb.reset();
1834
1835   ASSERT_TRUE(done);
1836
1837   t.join();
1838 }
1839
1840 TEST(EventBaseTest, LoopKeepAliveAtomic) {
1841   auto evb = std::make_unique<EventBase>();
1842
1843   static constexpr size_t kNumThreads = 100;
1844   static constexpr size_t kNumTasks = 100;
1845
1846   std::vector<std::thread> ts;
1847   std::vector<std::unique_ptr<Baton<>>> batons;
1848   size_t done{0};
1849
1850   for (size_t i = 0; i < kNumThreads; ++i) {
1851     batons.emplace_back(std::make_unique<Baton<>>());
1852   }
1853
1854   for (size_t i = 0; i < kNumThreads; ++i) {
1855     ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
1856       std::vector<Executor::KeepAlive> keepAlives;
1857       for (size_t j = 0; j < kNumTasks; ++j) {
1858         keepAlives.emplace_back(evbPtr->getKeepAliveToken());
1859       }
1860
1861       batonPtr->post();
1862
1863       /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
1864
1865       for (auto& keepAlive : keepAlives) {
1866         evbPtr->runInEventBaseThread(
1867             [&done, keepAlive = std::move(keepAlive) ]() { ++done; });
1868       }
1869     });
1870   }
1871
1872   for (auto& baton : batons) {
1873     baton->wait();
1874   }
1875
1876   evb.reset();
1877
1878   EXPECT_EQ(kNumThreads * kNumTasks, done);
1879
1880   for (auto& t : ts) {
1881     t.join();
1882   }
1883 }
1884
1885 TEST(EventBaseTest, DrivableExecutorTest) {
1886   folly::Promise<bool> p;
1887   auto f = p.getFuture();
1888   EventBase base;
1889   bool finished = false;
1890
1891   std::thread t([&] {
1892     /* sleep override */
1893     std::this_thread::sleep_for(std::chrono::microseconds(10));
1894     finished = true;
1895     base.runInEventBaseThread([&]() { p.setValue(true); });
1896   });
1897
1898   // Ensure drive does not busy wait
1899   base.drive(); // TODO: fix notification queue init() extra wakeup
1900   base.drive();
1901   EXPECT_TRUE(finished);
1902
1903   folly::Promise<bool> p2;
1904   auto f2 = p2.getFuture();
1905   // Ensure waitVia gets woken up properly, even from
1906   // a separate thread.
1907   base.runAfterDelay([&]() { p2.setValue(true); }, 10);
1908   f2.waitVia(&base);
1909   EXPECT_TRUE(f2.isReady());
1910
1911   t.join();
1912 }
1913
1914 TEST(EventBaseTest, RequestContextTest) {
1915   EventBase evb;
1916   auto defaultCtx = RequestContext::get();
1917
1918   {
1919     RequestContextScopeGuard rctx;
1920     auto context = RequestContext::get();
1921     EXPECT_NE(defaultCtx, context);
1922     evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
1923   }
1924
1925   EXPECT_EQ(defaultCtx, RequestContext::get());
1926   evb.loop();
1927   EXPECT_EQ(defaultCtx, RequestContext::get());
1928 }