Fix some old license headers
[folly.git] / folly / io / async / test / EventBaseTest.cpp
1 /*
2  * Copyright 2004-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/Memory.h>
18 #include <folly/ScopeGuard.h>
19
20 #include <folly/io/async/AsyncTimeout.h>
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/EventHandler.h>
23 #include <folly/io/async/test/SocketPair.h>
24 #include <folly/io/async/test/Util.h>
25 #include <folly/portability/Unistd.h>
26
27 #include <folly/futures/Promise.h>
28
29 #include <atomic>
30 #include <iostream>
31 #include <memory>
32 #include <thread>
33
34 using std::atomic;
35 using std::deque;
36 using std::pair;
37 using std::vector;
38 using std::unique_ptr;
39 using std::thread;
40 using std::make_pair;
41 using std::cerr;
42 using std::endl;
43 using std::chrono::milliseconds;
44 using std::chrono::microseconds;
45 using std::chrono::duration_cast;
46
47 using namespace std::chrono_literals;
48
49 using namespace folly;
50
51 ///////////////////////////////////////////////////////////////////////////
52 // Tests for read and write events
53 ///////////////////////////////////////////////////////////////////////////
54
55 enum { BUF_SIZE = 4096 };
56
57 ssize_t writeToFD(int fd, size_t length) {
58   // write an arbitrary amount of data to the fd
59   auto bufv = vector<char>(length);
60   auto buf = bufv.data();
61   memset(buf, 'a', length);
62   ssize_t rc = write(fd, buf, length);
63   CHECK_EQ(rc, length);
64   return rc;
65 }
66
67 size_t writeUntilFull(int fd) {
68   // Write to the fd until EAGAIN is returned
69   size_t bytesWritten = 0;
70   char buf[BUF_SIZE];
71   memset(buf, 'a', sizeof(buf));
72   while (true) {
73     ssize_t rc = write(fd, buf, sizeof(buf));
74     if (rc < 0) {
75       CHECK_EQ(errno, EAGAIN);
76       break;
77     } else {
78       bytesWritten += rc;
79     }
80   }
81   return bytesWritten;
82 }
83
84 ssize_t readFromFD(int fd, size_t length) {
85   // write an arbitrary amount of data to the fd
86   auto buf = vector<char>(length);
87   return read(fd, buf.data(), length);
88 }
89
90 size_t readUntilEmpty(int fd) {
91   // Read from the fd until EAGAIN is returned
92   char buf[BUF_SIZE];
93   size_t bytesRead = 0;
94   while (true) {
95     int rc = read(fd, buf, sizeof(buf));
96     if (rc == 0) {
97       CHECK(false) << "unexpected EOF";
98     } else if (rc < 0) {
99       CHECK_EQ(errno, EAGAIN);
100       break;
101     } else {
102       bytesRead += rc;
103     }
104   }
105   return bytesRead;
106 }
107
108 void checkReadUntilEmpty(int fd, size_t expectedLength) {
109   ASSERT_EQ(readUntilEmpty(fd), expectedLength);
110 }
111
112 struct ScheduledEvent {
113   int milliseconds;
114   uint16_t events;
115   size_t length;
116   ssize_t result;
117
118   void perform(int fd) {
119     if (events & EventHandler::READ) {
120       if (length == 0) {
121         result = readUntilEmpty(fd);
122       } else {
123         result = readFromFD(fd, length);
124       }
125     }
126     if (events & EventHandler::WRITE) {
127       if (length == 0) {
128         result = writeUntilFull(fd);
129       } else {
130         result = writeToFD(fd, length);
131       }
132     }
133   }
134 };
135
136 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
137   for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
138     eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
139                              ev->milliseconds);
140   }
141 }
142
143 class TestHandler : public EventHandler {
144  public:
145   TestHandler(EventBase* eventBase, int fd)
146     : EventHandler(eventBase, fd), fd_(fd) {}
147
148   void handlerReady(uint16_t events) noexcept override {
149     ssize_t bytesRead = 0;
150     ssize_t bytesWritten = 0;
151     if (events & READ) {
152       // Read all available data, so EventBase will stop calling us
153       // until new data becomes available
154       bytesRead = readUntilEmpty(fd_);
155     }
156     if (events & WRITE) {
157       // Write until the pipe buffer is full, so EventBase will stop calling
158       // us until the other end has read some data
159       bytesWritten = writeUntilFull(fd_);
160     }
161
162     log.emplace_back(events, bytesRead, bytesWritten);
163   }
164
165   struct EventRecord {
166     EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
167       : events(events)
168       , timestamp()
169       , bytesRead(bytesRead)
170       , bytesWritten(bytesWritten) {}
171
172     uint16_t events;
173     TimePoint timestamp;
174     ssize_t bytesRead;
175     ssize_t bytesWritten;
176   };
177
178   deque<EventRecord> log;
179
180  private:
181   int fd_;
182 };
183
184 /**
185  * Test a READ event
186  */
187 TEST(EventBaseTest, ReadEvent) {
188   EventBase eb;
189   SocketPair sp;
190
191   // Register for read events
192   TestHandler handler(&eb, sp[0]);
193   handler.registerHandler(EventHandler::READ);
194
195   // Register timeouts to perform two write events
196   ScheduledEvent events[] = {
197     { 10, EventHandler::WRITE, 2345, 0 },
198     { 160, EventHandler::WRITE, 99, 0 },
199     { 0, 0, 0, 0 },
200   };
201   scheduleEvents(&eb, sp[1], events);
202
203   // Loop
204   TimePoint start;
205   eb.loop();
206   TimePoint end;
207
208   // Since we didn't use the EventHandler::PERSIST flag, the handler should
209   // have received the first read, then unregistered itself.  Check that only
210   // the first chunk of data was received.
211   ASSERT_EQ(handler.log.size(), 1);
212   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
213   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
214                   milliseconds(events[0].milliseconds), milliseconds(90));
215   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
216   ASSERT_EQ(handler.log[0].bytesWritten, 0);
217   T_CHECK_TIMEOUT(start, end,
218                   milliseconds(events[1].milliseconds), milliseconds(30));
219
220   // Make sure the second chunk of data is still waiting to be read.
221   size_t bytesRemaining = readUntilEmpty(sp[0]);
222   ASSERT_EQ(bytesRemaining, events[1].length);
223 }
224
225 /**
226  * Test (READ | PERSIST)
227  */
228 TEST(EventBaseTest, ReadPersist) {
229   EventBase eb;
230   SocketPair sp;
231
232   // Register for read events
233   TestHandler handler(&eb, sp[0]);
234   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
235
236   // Register several timeouts to perform writes
237   ScheduledEvent events[] = {
238     { 10,  EventHandler::WRITE, 1024, 0 },
239     { 20,  EventHandler::WRITE, 2211, 0 },
240     { 30,  EventHandler::WRITE, 4096, 0 },
241     { 100, EventHandler::WRITE, 100,  0 },
242     { 0, 0, 0, 0 },
243   };
244   scheduleEvents(&eb, sp[1], events);
245
246   // Schedule a timeout to unregister the handler after the third write
247   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
248
249   // Loop
250   TimePoint start;
251   eb.loop();
252   TimePoint end;
253
254   // The handler should have received the first 3 events,
255   // then been unregistered after that.
256   ASSERT_EQ(handler.log.size(), 3);
257   for (int n = 0; n < 3; ++n) {
258     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
259     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
260                     milliseconds(events[n].milliseconds));
261     ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
262     ASSERT_EQ(handler.log[n].bytesWritten, 0);
263   }
264   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
265
266   // Make sure the data from the last write is still waiting to be read
267   size_t bytesRemaining = readUntilEmpty(sp[0]);
268   ASSERT_EQ(bytesRemaining, events[3].length);
269 }
270
271 /**
272  * Test registering for READ when the socket is immediately readable
273  */
274 TEST(EventBaseTest, ReadImmediate) {
275   EventBase eb;
276   SocketPair sp;
277
278   // Write some data to the socket so the other end will
279   // be immediately readable
280   size_t dataLength = 1234;
281   writeToFD(sp[1], dataLength);
282
283   // Register for read events
284   TestHandler handler(&eb, sp[0]);
285   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
286
287   // Register a timeout to perform another write
288   ScheduledEvent events[] = {
289     { 10, EventHandler::WRITE, 2345, 0 },
290     { 0, 0, 0, 0 },
291   };
292   scheduleEvents(&eb, sp[1], events);
293
294   // Schedule a timeout to unregister the handler
295   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
296
297   // Loop
298   TimePoint start;
299   eb.loop();
300   TimePoint end;
301
302   ASSERT_EQ(handler.log.size(), 2);
303
304   // There should have been 1 event for immediate readability
305   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
306   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
307   ASSERT_EQ(handler.log[0].bytesRead, dataLength);
308   ASSERT_EQ(handler.log[0].bytesWritten, 0);
309
310   // There should be another event after the timeout wrote more data
311   ASSERT_EQ(handler.log[1].events, EventHandler::READ);
312   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
313                   milliseconds(events[0].milliseconds));
314   ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
315   ASSERT_EQ(handler.log[1].bytesWritten, 0);
316
317   T_CHECK_TIMEOUT(start, end, milliseconds(20));
318 }
319
320 /**
321  * Test a WRITE event
322  */
323 TEST(EventBaseTest, WriteEvent) {
324   EventBase eb;
325   SocketPair sp;
326
327   // Fill up the write buffer before starting
328   size_t initialBytesWritten = writeUntilFull(sp[0]);
329
330   // Register for write events
331   TestHandler handler(&eb, sp[0]);
332   handler.registerHandler(EventHandler::WRITE);
333
334   // Register timeouts to perform two reads
335   ScheduledEvent events[] = {
336     { 10, EventHandler::READ, 0, 0 },
337     { 60, EventHandler::READ, 0, 0 },
338     { 0, 0, 0, 0 },
339   };
340   scheduleEvents(&eb, sp[1], events);
341
342   // Loop
343   TimePoint start;
344   eb.loop();
345   TimePoint end;
346
347   // Since we didn't use the EventHandler::PERSIST flag, the handler should
348   // have only been able to write once, then unregistered itself.
349   ASSERT_EQ(handler.log.size(), 1);
350   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
351   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
352                   milliseconds(events[0].milliseconds));
353   ASSERT_EQ(handler.log[0].bytesRead, 0);
354   ASSERT_GT(handler.log[0].bytesWritten, 0);
355   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
356
357   ASSERT_EQ(events[0].result, initialBytesWritten);
358   ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
359 }
360
361 /**
362  * Test (WRITE | PERSIST)
363  */
364 TEST(EventBaseTest, WritePersist) {
365   EventBase eb;
366   SocketPair sp;
367
368   // Fill up the write buffer before starting
369   size_t initialBytesWritten = writeUntilFull(sp[0]);
370
371   // Register for write events
372   TestHandler handler(&eb, sp[0]);
373   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
374
375   // Register several timeouts to read from the socket at several intervals
376   ScheduledEvent events[] = {
377     { 10,  EventHandler::READ, 0, 0 },
378     { 40,  EventHandler::READ, 0, 0 },
379     { 70,  EventHandler::READ, 0, 0 },
380     { 100, EventHandler::READ, 0, 0 },
381     { 0, 0, 0, 0 },
382   };
383   scheduleEvents(&eb, sp[1], events);
384
385   // Schedule a timeout to unregister the handler after the third read
386   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
387
388   // Loop
389   TimePoint start;
390   eb.loop();
391   TimePoint end;
392
393   // The handler should have received the first 3 events,
394   // then been unregistered after that.
395   ASSERT_EQ(handler.log.size(), 3);
396   ASSERT_EQ(events[0].result, initialBytesWritten);
397   for (int n = 0; n < 3; ++n) {
398     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
399     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
400                     milliseconds(events[n].milliseconds));
401     ASSERT_EQ(handler.log[n].bytesRead, 0);
402     ASSERT_GT(handler.log[n].bytesWritten, 0);
403     ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
404   }
405   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
406 }
407
408 /**
409  * Test registering for WRITE when the socket is immediately writable
410  */
411 TEST(EventBaseTest, WriteImmediate) {
412   EventBase eb;
413   SocketPair sp;
414
415   // Register for write events
416   TestHandler handler(&eb, sp[0]);
417   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
418
419   // Register a timeout to perform a read
420   ScheduledEvent events[] = {
421     { 10, EventHandler::READ, 0, 0 },
422     { 0, 0, 0, 0 },
423   };
424   scheduleEvents(&eb, sp[1], events);
425
426   // Schedule a timeout to unregister the handler
427   int64_t unregisterTimeout = 40;
428   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
429                    unregisterTimeout);
430
431   // Loop
432   TimePoint start;
433   eb.loop();
434   TimePoint end;
435
436   ASSERT_EQ(handler.log.size(), 2);
437
438   // Since the socket buffer was initially empty,
439   // there should have been 1 event for immediate writability
440   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
441   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
442   ASSERT_EQ(handler.log[0].bytesRead, 0);
443   ASSERT_GT(handler.log[0].bytesWritten, 0);
444
445   // There should be another event after the timeout wrote more data
446   ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
447   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
448                   milliseconds(events[0].milliseconds));
449   ASSERT_EQ(handler.log[1].bytesRead, 0);
450   ASSERT_GT(handler.log[1].bytesWritten, 0);
451
452   T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
453 }
454
455 /**
456  * Test (READ | WRITE) when the socket becomes readable first
457  */
458 TEST(EventBaseTest, ReadWrite) {
459   EventBase eb;
460   SocketPair sp;
461
462   // Fill up the write buffer before starting
463   size_t sock0WriteLength = writeUntilFull(sp[0]);
464
465   // Register for read and write events
466   TestHandler handler(&eb, sp[0]);
467   handler.registerHandler(EventHandler::READ_WRITE);
468
469   // Register timeouts to perform a write then a read.
470   ScheduledEvent events[] = {
471     { 10, EventHandler::WRITE, 2345, 0 },
472     { 40, EventHandler::READ, 0, 0 },
473     { 0, 0, 0, 0 },
474   };
475   scheduleEvents(&eb, sp[1], events);
476
477   // Loop
478   TimePoint start;
479   eb.loop();
480   TimePoint end;
481
482   // Since we didn't use the EventHandler::PERSIST flag, the handler should
483   // have only noticed readability, then unregistered itself.  Check that only
484   // one event was logged.
485   ASSERT_EQ(handler.log.size(), 1);
486   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
487   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
488                   milliseconds(events[0].milliseconds));
489   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
490   ASSERT_EQ(handler.log[0].bytesWritten, 0);
491   ASSERT_EQ(events[1].result, sock0WriteLength);
492   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
493 }
494
495 /**
496  * Test (READ | WRITE) when the socket becomes writable first
497  */
498 TEST(EventBaseTest, WriteRead) {
499   EventBase eb;
500   SocketPair sp;
501
502   // Fill up the write buffer before starting
503   size_t sock0WriteLength = writeUntilFull(sp[0]);
504
505   // Register for read and write events
506   TestHandler handler(&eb, sp[0]);
507   handler.registerHandler(EventHandler::READ_WRITE);
508
509   // Register timeouts to perform a read then a write.
510   size_t sock1WriteLength = 2345;
511   ScheduledEvent events[] = {
512     { 10, EventHandler::READ, 0, 0 },
513     { 40, EventHandler::WRITE, sock1WriteLength, 0 },
514     { 0, 0, 0, 0 },
515   };
516   scheduleEvents(&eb, sp[1], events);
517
518   // Loop
519   TimePoint start;
520   eb.loop();
521   TimePoint end;
522
523   // Since we didn't use the EventHandler::PERSIST flag, the handler should
524   // have only noticed writability, then unregistered itself.  Check that only
525   // one event was logged.
526   ASSERT_EQ(handler.log.size(), 1);
527   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
528   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
529                   milliseconds(events[0].milliseconds));
530   ASSERT_EQ(handler.log[0].bytesRead, 0);
531   ASSERT_GT(handler.log[0].bytesWritten, 0);
532   ASSERT_EQ(events[0].result, sock0WriteLength);
533   ASSERT_EQ(events[1].result, sock1WriteLength);
534   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
535
536   // Make sure the written data is still waiting to be read.
537   size_t bytesRemaining = readUntilEmpty(sp[0]);
538   ASSERT_EQ(bytesRemaining, events[1].length);
539 }
540
541 /**
542  * Test (READ | WRITE) when the socket becomes readable and writable
543  * at the same time.
544  */
545 TEST(EventBaseTest, ReadWriteSimultaneous) {
546   EventBase eb;
547   SocketPair sp;
548
549   // Fill up the write buffer before starting
550   size_t sock0WriteLength = writeUntilFull(sp[0]);
551
552   // Register for read and write events
553   TestHandler handler(&eb, sp[0]);
554   handler.registerHandler(EventHandler::READ_WRITE);
555
556   // Register a timeout to perform a read and write together
557   ScheduledEvent events[] = {
558     { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
559     { 0, 0, 0, 0 },
560   };
561   scheduleEvents(&eb, sp[1], events);
562
563   // Loop
564   TimePoint start;
565   eb.loop();
566   TimePoint end;
567
568   // It's not strictly required that the EventBase register us about both
569   // events in the same call.  So, it's possible that if the EventBase
570   // implementation changes this test could start failing, and it wouldn't be
571   // considered breaking the API.  However for now it's nice to exercise this
572   // code path.
573   ASSERT_EQ(handler.log.size(), 1);
574   ASSERT_EQ(handler.log[0].events,
575                     EventHandler::READ | EventHandler::WRITE);
576   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
577                   milliseconds(events[0].milliseconds));
578   ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
579   ASSERT_GT(handler.log[0].bytesWritten, 0);
580   T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
581 }
582
583 /**
584  * Test (READ | WRITE | PERSIST)
585  */
586 TEST(EventBaseTest, ReadWritePersist) {
587   EventBase eb;
588   SocketPair sp;
589
590   // Register for read and write events
591   TestHandler handler(&eb, sp[0]);
592   handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
593                           EventHandler::PERSIST);
594
595   // Register timeouts to perform several reads and writes
596   ScheduledEvent events[] = {
597     { 10, EventHandler::WRITE, 2345, 0 },
598     { 20, EventHandler::READ, 0, 0 },
599     { 35, EventHandler::WRITE, 200, 0 },
600     { 45, EventHandler::WRITE, 15, 0 },
601     { 55, EventHandler::READ, 0, 0 },
602     { 120, EventHandler::WRITE, 2345, 0 },
603     { 0, 0, 0, 0 },
604   };
605   scheduleEvents(&eb, sp[1], events);
606
607   // Schedule a timeout to unregister the handler
608   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
609
610   // Loop
611   TimePoint start;
612   eb.loop();
613   TimePoint end;
614
615   ASSERT_EQ(handler.log.size(), 6);
616
617   // Since we didn't fill up the write buffer immediately, there should
618   // be an immediate event for writability.
619   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
620   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
621   ASSERT_EQ(handler.log[0].bytesRead, 0);
622   ASSERT_GT(handler.log[0].bytesWritten, 0);
623
624   // Events 1 through 5 should correspond to the scheduled events
625   for (int n = 1; n < 6; ++n) {
626     ScheduledEvent* event = &events[n - 1];
627     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
628                     milliseconds(event->milliseconds));
629     if (event->events == EventHandler::READ) {
630       ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
631       ASSERT_EQ(handler.log[n].bytesRead, 0);
632       ASSERT_GT(handler.log[n].bytesWritten, 0);
633     } else {
634       ASSERT_EQ(handler.log[n].events, EventHandler::READ);
635       ASSERT_EQ(handler.log[n].bytesRead, event->length);
636       ASSERT_EQ(handler.log[n].bytesWritten, 0);
637     }
638   }
639
640   // The timeout should have unregistered the handler before the last write.
641   // Make sure that data is still waiting to be read
642   size_t bytesRemaining = readUntilEmpty(sp[0]);
643   ASSERT_EQ(bytesRemaining, events[5].length);
644 }
645
646
647 class PartialReadHandler : public TestHandler {
648  public:
649   PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
650     : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
651
652   void handlerReady(uint16_t events) noexcept override {
653     assert(events == EventHandler::READ);
654     ssize_t bytesRead = readFromFD(fd_, readLength_);
655     log.emplace_back(events, bytesRead, 0);
656   }
657
658  private:
659   int fd_;
660   size_t readLength_;
661 };
662
663 /**
664  * Test reading only part of the available data when a read event is fired.
665  * When PERSIST is used, make sure the handler gets notified again the next
666  * time around the loop.
667  */
668 TEST(EventBaseTest, ReadPartial) {
669   EventBase eb;
670   SocketPair sp;
671
672   // Register for read events
673   size_t readLength = 100;
674   PartialReadHandler handler(&eb, sp[0], readLength);
675   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
676
677   // Register a timeout to perform a single write,
678   // with more data than PartialReadHandler will read at once
679   ScheduledEvent events[] = {
680     { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
681     { 0, 0, 0, 0 },
682   };
683   scheduleEvents(&eb, sp[1], events);
684
685   // Schedule a timeout to unregister the handler
686   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
687
688   // Loop
689   TimePoint start;
690   eb.loop();
691   TimePoint end;
692
693   ASSERT_EQ(handler.log.size(), 4);
694
695   // The first 3 invocations should read readLength bytes each
696   for (int n = 0; n < 3; ++n) {
697     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
698     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
699                     milliseconds(events[0].milliseconds));
700     ASSERT_EQ(handler.log[n].bytesRead, readLength);
701     ASSERT_EQ(handler.log[n].bytesWritten, 0);
702   }
703   // The last read only has readLength/2 bytes
704   ASSERT_EQ(handler.log[3].events, EventHandler::READ);
705   T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
706                   milliseconds(events[0].milliseconds));
707   ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
708   ASSERT_EQ(handler.log[3].bytesWritten, 0);
709 }
710
711
712 class PartialWriteHandler : public TestHandler {
713  public:
714   PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
715     : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
716
717   void handlerReady(uint16_t events) noexcept override {
718     assert(events == EventHandler::WRITE);
719     ssize_t bytesWritten = writeToFD(fd_, writeLength_);
720     log.emplace_back(events, 0, bytesWritten);
721   }
722
723  private:
724   int fd_;
725   size_t writeLength_;
726 };
727
728 /**
729  * Test writing without completely filling up the write buffer when the fd
730  * becomes writable.  When PERSIST is used, make sure the handler gets
731  * notified again the next time around the loop.
732  */
733 TEST(EventBaseTest, WritePartial) {
734   EventBase eb;
735   SocketPair sp;
736
737   // Fill up the write buffer before starting
738   size_t initialBytesWritten = writeUntilFull(sp[0]);
739
740   // Register for write events
741   size_t writeLength = 100;
742   PartialWriteHandler handler(&eb, sp[0], writeLength);
743   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
744
745   // Register a timeout to read, so that more data can be written
746   ScheduledEvent events[] = {
747     { 10, EventHandler::READ, 0, 0 },
748     { 0, 0, 0, 0 },
749   };
750   scheduleEvents(&eb, sp[1], events);
751
752   // Schedule a timeout to unregister the handler
753   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
754
755   // Loop
756   TimePoint start;
757   eb.loop();
758   TimePoint end;
759
760   // Depending on how big the socket buffer is, there will be multiple writes
761   // Only check the first 5
762   int numChecked = 5;
763   ASSERT_GE(handler.log.size(), numChecked);
764   ASSERT_EQ(events[0].result, initialBytesWritten);
765
766   // The first 3 invocations should read writeLength bytes each
767   for (int n = 0; n < numChecked; ++n) {
768     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
769     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
770                     milliseconds(events[0].milliseconds));
771     ASSERT_EQ(handler.log[n].bytesRead, 0);
772     ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
773   }
774 }
775
776
777 /**
778  * Test destroying a registered EventHandler
779  */
780 TEST(EventBaseTest, DestroyHandler) {
781   class DestroyHandler : public AsyncTimeout {
782    public:
783     DestroyHandler(EventBase* eb, EventHandler* h)
784       : AsyncTimeout(eb)
785       , handler_(h) {}
786
787     void timeoutExpired() noexcept override { delete handler_; }
788
789    private:
790     EventHandler* handler_;
791   };
792
793   EventBase eb;
794   SocketPair sp;
795
796   // Fill up the write buffer before starting
797   size_t initialBytesWritten = writeUntilFull(sp[0]);
798
799   // Register for write events
800   TestHandler* handler = new TestHandler(&eb, sp[0]);
801   handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
802
803   // After 10ms, read some data, so that the handler
804   // will be notified that it can write.
805   eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
806                    10);
807
808   // Start a timer to destroy the handler after 25ms
809   // This mainly just makes sure the code doesn't break or assert
810   DestroyHandler dh(&eb, handler);
811   dh.scheduleTimeout(25);
812
813   TimePoint start;
814   eb.loop();
815   TimePoint end;
816
817   // Make sure the EventHandler was uninstalled properly when it was
818   // destroyed, and the EventBase loop exited
819   T_CHECK_TIMEOUT(start, end, milliseconds(25));
820
821   // Make sure that the handler wrote data to the socket
822   // before it was destroyed
823   size_t bytesRemaining = readUntilEmpty(sp[1]);
824   ASSERT_GT(bytesRemaining, 0);
825 }
826
827
828 ///////////////////////////////////////////////////////////////////////////
829 // Tests for timeout events
830 ///////////////////////////////////////////////////////////////////////////
831
832 TEST(EventBaseTest, RunAfterDelay) {
833   EventBase eb;
834
835   TimePoint timestamp1(false);
836   TimePoint timestamp2(false);
837   TimePoint timestamp3(false);
838   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
839   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
840   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
841
842   TimePoint start;
843   eb.loop();
844   TimePoint end;
845
846   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
847   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
848   T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
849   T_CHECK_TIMEOUT(start, end, milliseconds(40));
850 }
851
852 /**
853  * Test the behavior of tryRunAfterDelay() when some timeouts are
854  * still scheduled when the EventBase is destroyed.
855  */
856 TEST(EventBaseTest, RunAfterDelayDestruction) {
857   TimePoint timestamp1(false);
858   TimePoint timestamp2(false);
859   TimePoint timestamp3(false);
860   TimePoint timestamp4(false);
861   TimePoint start(false);
862   TimePoint end(false);
863
864   {
865     EventBase eb;
866
867     // Run two normal timeouts
868     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
869     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
870
871     // Schedule a timeout to stop the event loop after 40ms
872     eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
873
874     // Schedule 2 timeouts that would fire after the event loop stops
875     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
876     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
877
878     start.reset();
879     eb.loop();
880     end.reset();
881   }
882
883   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
884   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
885   T_CHECK_TIMEOUT(start, end, milliseconds(40));
886
887   ASSERT_TRUE(timestamp3.isUnset());
888   ASSERT_TRUE(timestamp4.isUnset());
889
890   // Ideally this test should be run under valgrind to ensure that no
891   // memory is leaked.
892 }
893
894 class TestTimeout : public AsyncTimeout {
895  public:
896   explicit TestTimeout(EventBase* eventBase)
897     : AsyncTimeout(eventBase)
898     , timestamp(false) {}
899
900   void timeoutExpired() noexcept override { timestamp.reset(); }
901
902   TimePoint timestamp;
903 };
904
905 TEST(EventBaseTest, BasicTimeouts) {
906   EventBase eb;
907
908   TestTimeout t1(&eb);
909   TestTimeout t2(&eb);
910   TestTimeout t3(&eb);
911   t1.scheduleTimeout(10);
912   t2.scheduleTimeout(20);
913   t3.scheduleTimeout(40);
914
915   TimePoint start;
916   eb.loop();
917   TimePoint end;
918
919   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
920   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
921   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
922   T_CHECK_TIMEOUT(start, end, milliseconds(40));
923 }
924
925 class ReschedulingTimeout : public AsyncTimeout {
926  public:
927   ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
928     : AsyncTimeout(evb)
929     , timeouts_(timeouts)
930     , iterator_(timeouts_.begin()) {}
931
932   void start() {
933     reschedule();
934   }
935
936   void timeoutExpired() noexcept override {
937     timestamps.emplace_back();
938     reschedule();
939   }
940
941   void reschedule() {
942     if (iterator_ != timeouts_.end()) {
943       uint32_t timeout = *iterator_;
944       ++iterator_;
945       scheduleTimeout(timeout);
946     }
947   }
948
949   vector<TimePoint> timestamps;
950
951  private:
952   vector<uint32_t> timeouts_;
953   vector<uint32_t>::const_iterator iterator_;
954 };
955
956 /**
957  * Test rescheduling the same timeout multiple times
958  */
959 TEST(EventBaseTest, ReuseTimeout) {
960   EventBase eb;
961
962   vector<uint32_t> timeouts;
963   timeouts.push_back(10);
964   timeouts.push_back(30);
965   timeouts.push_back(15);
966
967   ReschedulingTimeout t(&eb, timeouts);
968   t.start();
969
970   TimePoint start;
971   eb.loop();
972   TimePoint end;
973
974   // Use a higher tolerance than usual.  We're waiting on 3 timeouts
975   // consecutively.  In general, each timeout may go over by a few
976   // milliseconds, and we're tripling this error by witing on 3 timeouts.
977   milliseconds tolerance{6};
978
979   ASSERT_EQ(timeouts.size(), t.timestamps.size());
980   uint32_t total = 0;
981   for (size_t n = 0; n < timeouts.size(); ++n) {
982     total += timeouts[n];
983     T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
984   }
985   T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
986 }
987
988 /**
989  * Test rescheduling a timeout before it has fired
990  */
991 TEST(EventBaseTest, RescheduleTimeout) {
992   EventBase eb;
993
994   TestTimeout t1(&eb);
995   TestTimeout t2(&eb);
996   TestTimeout t3(&eb);
997
998   t1.scheduleTimeout(15);
999   t2.scheduleTimeout(30);
1000   t3.scheduleTimeout(30);
1001
1002   auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1003       &AsyncTimeout::scheduleTimeout);
1004
1005   // after 10ms, reschedule t2 to run sooner than originally scheduled
1006   eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1007   // after 10ms, reschedule t3 to run later than originally scheduled
1008   eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1009
1010   TimePoint start;
1011   eb.loop();
1012   TimePoint end;
1013
1014   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1015   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1016   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1017   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1018 }
1019
1020 /**
1021  * Test cancelling a timeout
1022  */
1023 TEST(EventBaseTest, CancelTimeout) {
1024   EventBase eb;
1025
1026   vector<uint32_t> timeouts;
1027   timeouts.push_back(10);
1028   timeouts.push_back(30);
1029   timeouts.push_back(25);
1030
1031   ReschedulingTimeout t(&eb, timeouts);
1032   t.start();
1033   eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1034
1035   TimePoint start;
1036   eb.loop();
1037   TimePoint end;
1038
1039   ASSERT_EQ(t.timestamps.size(), 2);
1040   T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1041   T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1042   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1043 }
1044
1045 /**
1046  * Test destroying a scheduled timeout object
1047  */
1048 TEST(EventBaseTest, DestroyTimeout) {
1049   class DestroyTimeout : public AsyncTimeout {
1050    public:
1051     DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1052       : AsyncTimeout(eb)
1053       , timeout_(t) {}
1054
1055     void timeoutExpired() noexcept override { delete timeout_; }
1056
1057    private:
1058     AsyncTimeout* timeout_;
1059   };
1060
1061   EventBase eb;
1062
1063   TestTimeout* t1 = new TestTimeout(&eb);
1064   t1->scheduleTimeout(30);
1065
1066   DestroyTimeout dt(&eb, t1);
1067   dt.scheduleTimeout(10);
1068
1069   TimePoint start;
1070   eb.loop();
1071   TimePoint end;
1072
1073   T_CHECK_TIMEOUT(start, end, milliseconds(10));
1074 }
1075
1076
1077 ///////////////////////////////////////////////////////////////////////////
1078 // Test for runInThreadTestFunc()
1079 ///////////////////////////////////////////////////////////////////////////
1080
1081 struct RunInThreadData {
1082   RunInThreadData(int numThreads, int opsPerThread)
1083     : opsPerThread(opsPerThread)
1084     , opsToGo(numThreads*opsPerThread) {}
1085
1086   EventBase evb;
1087   deque< pair<int, int> > values;
1088
1089   int opsPerThread;
1090   int opsToGo;
1091 };
1092
1093 struct RunInThreadArg {
1094   RunInThreadArg(RunInThreadData* data,
1095                  int threadId,
1096                  int value)
1097     : data(data)
1098     , thread(threadId)
1099     , value(value) {}
1100
1101   RunInThreadData* data;
1102   int thread;
1103   int value;
1104 };
1105
1106 void runInThreadTestFunc(RunInThreadArg* arg) {
1107   arg->data->values.emplace_back(arg->thread, arg->value);
1108   RunInThreadData* data = arg->data;
1109   delete arg;
1110
1111   if(--data->opsToGo == 0) {
1112     // Break out of the event base loop if we are the last thread running
1113     data->evb.terminateLoopSoon();
1114   }
1115 }
1116
1117 TEST(EventBaseTest, RunInThread) {
1118   constexpr uint32_t numThreads = 50;
1119   constexpr uint32_t opsPerThread = 100;
1120   RunInThreadData data(numThreads, opsPerThread);
1121
1122   deque<std::thread> threads;
1123   SCOPE_EXIT {
1124     // Wait on all of the threads.
1125     for (auto& thread : threads) {
1126       thread.join();
1127     }
1128   };
1129
1130   for (uint32_t i = 0; i < numThreads; ++i) {
1131     threads.emplace_back([i, &data] {
1132         for (int n = 0; n < data.opsPerThread; ++n) {
1133           RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1134           data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1135           usleep(10);
1136         }
1137       });
1138   }
1139
1140   // Add a timeout event to run after 3 seconds.
1141   // Otherwise loop() will return immediately since there are no events to run.
1142   // Once the last thread exits, it will stop the loop().  However, this
1143   // timeout also stops the loop in case there is a bug performing the normal
1144   // stop.
1145   data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1146                          3000);
1147
1148   TimePoint start;
1149   data.evb.loop();
1150   TimePoint end;
1151
1152   // Verify that the loop exited because all threads finished and requested it
1153   // to stop.  This should happen much sooner than the 3 second timeout.
1154   // Assert that it happens in under a second.  (This is still tons of extra
1155   // padding.)
1156
1157   auto timeTaken = std::chrono::duration_cast<milliseconds>(
1158     end.getTime() - start.getTime());
1159   ASSERT_LT(timeTaken.count(), 1000);
1160   VLOG(11) << "Time taken: " << timeTaken.count();
1161
1162   // Verify that we have all of the events from every thread
1163   int expectedValues[numThreads];
1164   for (uint32_t n = 0; n < numThreads; ++n) {
1165     expectedValues[n] = 0;
1166   }
1167   for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1168        it != data.values.end();
1169        ++it) {
1170     int threadID = it->first;
1171     int value = it->second;
1172     ASSERT_EQ(expectedValues[threadID], value);
1173     ++expectedValues[threadID];
1174   }
1175   for (uint32_t n = 0; n < numThreads; ++n) {
1176     ASSERT_EQ(expectedValues[n], opsPerThread);
1177   }
1178 }
1179
1180 //  This test simulates some calls, and verifies that the waiting happens by
1181 //  triggering what otherwise would be race conditions, and trying to detect
1182 //  whether any of the race conditions happened.
1183 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1184   const size_t c = 256;
1185   vector<unique_ptr<atomic<size_t>>> atoms(c);
1186   for (size_t i = 0; i < c; ++i) {
1187     auto& atom = atoms.at(i);
1188     atom = std::make_unique<atomic<size_t>>(0);
1189   }
1190   vector<thread> threads;
1191   for (size_t i = 0; i < c; ++i) {
1192     threads.emplace_back([&atoms, i] {
1193       EventBase eb;
1194       auto& atom = *atoms.at(i);
1195       auto ebth = thread([&] { eb.loopForever(); });
1196       eb.waitUntilRunning();
1197       eb.runInEventBaseThreadAndWait([&] {
1198         size_t x = 0;
1199         atom.compare_exchange_weak(
1200             x, 1, std::memory_order_release, std::memory_order_relaxed);
1201       });
1202       size_t x = 0;
1203       atom.compare_exchange_weak(
1204           x, 2, std::memory_order_release, std::memory_order_relaxed);
1205       eb.terminateLoopSoon();
1206       ebth.join();
1207     });
1208   }
1209   for (size_t i = 0; i < c; ++i) {
1210     auto& th = threads.at(i);
1211     th.join();
1212   }
1213   size_t sum = 0;
1214   for (auto& atom : atoms) sum += *atom;
1215   EXPECT_EQ(c, sum);
1216 }
1217
1218 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1219   EventBase eb;
1220   thread th(&EventBase::loopForever, &eb);
1221   SCOPE_EXIT {
1222     eb.terminateLoopSoon();
1223     th.join();
1224   };
1225   auto mutated = false;
1226   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1227       mutated = true;
1228   });
1229   EXPECT_TRUE(mutated);
1230 }
1231
1232 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1233   EventBase eb;
1234   thread th(&EventBase::loopForever, &eb);
1235   SCOPE_EXIT {
1236     eb.terminateLoopSoon();
1237     th.join();
1238   };
1239   eb.runInEventBaseThreadAndWait([&] {
1240       auto mutated = false;
1241       eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1242           mutated = true;
1243       });
1244       EXPECT_TRUE(mutated);
1245   });
1246 }
1247
1248 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1249   EventBase eb;
1250   auto mutated = false;
1251   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1252       mutated = true;
1253     });
1254   EXPECT_TRUE(mutated);
1255 }
1256
1257 ///////////////////////////////////////////////////////////////////////////
1258 // Tests for runInLoop()
1259 ///////////////////////////////////////////////////////////////////////////
1260
1261 class CountedLoopCallback : public EventBase::LoopCallback {
1262  public:
1263   CountedLoopCallback(EventBase* eventBase,
1264                       unsigned int count,
1265                       std::function<void()> action =
1266                         std::function<void()>())
1267     : eventBase_(eventBase)
1268     , count_(count)
1269     , action_(action) {}
1270
1271   void runLoopCallback() noexcept override {
1272     --count_;
1273     if (count_ > 0) {
1274       eventBase_->runInLoop(this);
1275     } else if (action_) {
1276       action_();
1277     }
1278   }
1279
1280   unsigned int getCount() const {
1281     return count_;
1282   }
1283
1284  private:
1285   EventBase* eventBase_;
1286   unsigned int count_;
1287   std::function<void()> action_;
1288 };
1289
1290 // Test that EventBase::loop() doesn't exit while there are
1291 // still LoopCallbacks remaining to be invoked.
1292 TEST(EventBaseTest, RepeatedRunInLoop) {
1293   EventBase eventBase;
1294
1295   CountedLoopCallback c(&eventBase, 10);
1296   eventBase.runInLoop(&c);
1297   // The callback shouldn't have run immediately
1298   ASSERT_EQ(c.getCount(), 10);
1299   eventBase.loop();
1300
1301   // loop() should loop until the CountedLoopCallback stops
1302   // re-installing itself.
1303   ASSERT_EQ(c.getCount(), 0);
1304 }
1305
1306 // Test that EventBase::loop() works as expected without time measurements.
1307 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
1308   EventBase eventBase(false);
1309
1310   CountedLoopCallback c(&eventBase, 10);
1311   eventBase.runInLoop(&c);
1312   // The callback shouldn't have run immediately
1313   ASSERT_EQ(c.getCount(), 10);
1314   eventBase.loop();
1315
1316   // loop() should loop until the CountedLoopCallback stops
1317   // re-installing itself.
1318   ASSERT_EQ(c.getCount(), 0);
1319 }
1320
1321 // Test runInLoop() calls with terminateLoopSoon()
1322 TEST(EventBaseTest, RunInLoopStopLoop) {
1323   EventBase eventBase;
1324
1325   CountedLoopCallback c1(&eventBase, 20);
1326   CountedLoopCallback c2(&eventBase, 10,
1327                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1328
1329   eventBase.runInLoop(&c1);
1330   eventBase.runInLoop(&c2);
1331   ASSERT_EQ(c1.getCount(), 20);
1332   ASSERT_EQ(c2.getCount(), 10);
1333
1334   eventBase.loopForever();
1335
1336   // c2 should have stopped the loop after 10 iterations
1337   ASSERT_EQ(c2.getCount(), 0);
1338
1339   // We allow the EventBase to run the loop callbacks in whatever order it
1340   // chooses.  We'll accept c1's count being either 10 (if the loop terminated
1341   // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1342   // before c1 ran).
1343   //
1344   // (With the current code, c1 will always run 10 times, but we don't consider
1345   // this a hard API requirement.)
1346   ASSERT_GE(c1.getCount(), 10);
1347   ASSERT_LE(c1.getCount(), 11);
1348 }
1349
1350 TEST(EventBaseTest, messageAvailableException) {
1351   auto deadManWalking = [] {
1352     EventBase eventBase;
1353     std::thread t([&] {
1354       // Call this from another thread to force use of NotificationQueue in
1355       // runInEventBaseThread
1356       eventBase.runInEventBaseThread(
1357           []() { throw std::runtime_error("boom"); });
1358     });
1359     t.join();
1360     eventBase.loopForever();
1361   };
1362   EXPECT_DEATH(deadManWalking(), ".*");
1363 }
1364
1365 TEST(EventBaseTest, TryRunningAfterTerminate) {
1366   EventBase eventBase;
1367   CountedLoopCallback c1(&eventBase, 1,
1368                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1369   eventBase.runInLoop(&c1);
1370   eventBase.loopForever();
1371   bool ran = false;
1372   eventBase.runInEventBaseThread([&]() {
1373     ran = true;
1374   });
1375
1376   ASSERT_FALSE(ran);
1377 }
1378
1379 // Test cancelling runInLoop() callbacks
1380 TEST(EventBaseTest, CancelRunInLoop) {
1381   EventBase eventBase;
1382
1383   CountedLoopCallback c1(&eventBase, 20);
1384   CountedLoopCallback c2(&eventBase, 20);
1385   CountedLoopCallback c3(&eventBase, 20);
1386
1387   std::function<void()> cancelC1Action =
1388     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1389   std::function<void()> cancelC2Action =
1390     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1391
1392   CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1393   CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1394
1395   // Install cancelC1 after c1
1396   eventBase.runInLoop(&c1);
1397   eventBase.runInLoop(&cancelC1);
1398
1399   // Install cancelC2 before c2
1400   eventBase.runInLoop(&cancelC2);
1401   eventBase.runInLoop(&c2);
1402
1403   // Install c3
1404   eventBase.runInLoop(&c3);
1405
1406   ASSERT_EQ(c1.getCount(), 20);
1407   ASSERT_EQ(c2.getCount(), 20);
1408   ASSERT_EQ(c3.getCount(), 20);
1409   ASSERT_EQ(cancelC1.getCount(), 10);
1410   ASSERT_EQ(cancelC2.getCount(), 10);
1411
1412   // Run the loop
1413   eventBase.loop();
1414
1415   // cancelC1 and cancelC3 should have both fired after 10 iterations and
1416   // stopped re-installing themselves
1417   ASSERT_EQ(cancelC1.getCount(), 0);
1418   ASSERT_EQ(cancelC2.getCount(), 0);
1419   // c3 should have continued on for the full 20 iterations
1420   ASSERT_EQ(c3.getCount(), 0);
1421
1422   // c1 and c2 should have both been cancelled on the 10th iteration.
1423   //
1424   // Callbacks are always run in the order they are installed,
1425   // so c1 should have fired 10 times, and been canceled after it ran on the
1426   // 10th iteration.  c2 should have only fired 9 times, because cancelC2 will
1427   // have run before it on the 10th iteration, and cancelled it before it
1428   // fired.
1429   ASSERT_EQ(c1.getCount(), 10);
1430   ASSERT_EQ(c2.getCount(), 11);
1431 }
1432
1433 class TerminateTestCallback : public EventBase::LoopCallback,
1434                               public EventHandler {
1435  public:
1436   TerminateTestCallback(EventBase* eventBase, int fd)
1437     : EventHandler(eventBase, fd),
1438       eventBase_(eventBase),
1439       loopInvocations_(0),
1440       maxLoopInvocations_(0),
1441       eventInvocations_(0),
1442       maxEventInvocations_(0) {}
1443
1444   void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1445     loopInvocations_ = 0;
1446     maxLoopInvocations_ = maxLoopInvocations;
1447     eventInvocations_ = 0;
1448     maxEventInvocations_ = maxEventInvocations;
1449
1450     cancelLoopCallback();
1451     unregisterHandler();
1452   }
1453
1454   void handlerReady(uint16_t /* events */) noexcept override {
1455     // We didn't register with PERSIST, so we will have been automatically
1456     // unregistered already.
1457     ASSERT_FALSE(isHandlerRegistered());
1458
1459     ++eventInvocations_;
1460     if (eventInvocations_ >= maxEventInvocations_) {
1461       return;
1462     }
1463
1464     eventBase_->runInLoop(this);
1465   }
1466   void runLoopCallback() noexcept override {
1467     ++loopInvocations_;
1468     if (loopInvocations_ >= maxLoopInvocations_) {
1469       return;
1470     }
1471
1472     registerHandler(READ);
1473   }
1474
1475   uint32_t getLoopInvocations() const {
1476     return loopInvocations_;
1477   }
1478   uint32_t getEventInvocations() const {
1479     return eventInvocations_;
1480   }
1481
1482  private:
1483   EventBase* eventBase_;
1484   uint32_t loopInvocations_;
1485   uint32_t maxLoopInvocations_;
1486   uint32_t eventInvocations_;
1487   uint32_t maxEventInvocations_;
1488 };
1489
1490 /**
1491  * Test that EventBase::loop() correctly detects when there are no more events
1492  * left to run.
1493  *
1494  * This uses a single callback, which alternates registering itself as a loop
1495  * callback versus a EventHandler callback.  This exercises a regression where
1496  * EventBase::loop() incorrectly exited if there were no more fd handlers
1497  * registered, but a loop callback installed a new fd handler.
1498  */
1499 TEST(EventBaseTest, LoopTermination) {
1500   EventBase eventBase;
1501
1502   // Open a pipe and close the write end,
1503   // so the read endpoint will be readable
1504   int pipeFds[2];
1505   int rc = pipe(pipeFds);
1506   ASSERT_EQ(rc, 0);
1507   close(pipeFds[1]);
1508   TerminateTestCallback callback(&eventBase, pipeFds[0]);
1509
1510   // Test once where the callback will exit after a loop callback
1511   callback.reset(10, 100);
1512   eventBase.runInLoop(&callback);
1513   eventBase.loop();
1514   ASSERT_EQ(callback.getLoopInvocations(), 10);
1515   ASSERT_EQ(callback.getEventInvocations(), 9);
1516
1517   // Test once where the callback will exit after an fd event callback
1518   callback.reset(100, 7);
1519   eventBase.runInLoop(&callback);
1520   eventBase.loop();
1521   ASSERT_EQ(callback.getLoopInvocations(), 7);
1522   ASSERT_EQ(callback.getEventInvocations(), 7);
1523
1524   close(pipeFds[0]);
1525 }
1526
1527 ///////////////////////////////////////////////////////////////////////////
1528 // Tests for latency calculations
1529 ///////////////////////////////////////////////////////////////////////////
1530
1531 class IdleTimeTimeoutSeries : public AsyncTimeout {
1532
1533  public:
1534
1535   explicit IdleTimeTimeoutSeries(EventBase *base,
1536                                  std::deque<std::uint64_t>& timeout) :
1537     AsyncTimeout(base),
1538     timeouts_(0),
1539     timeout_(timeout) {
1540       scheduleTimeout(1);
1541     }
1542
1543     ~IdleTimeTimeoutSeries() override {}
1544
1545     void timeoutExpired() noexcept override {
1546     ++timeouts_;
1547
1548     if(timeout_.empty()){
1549       cancelTimeout();
1550     } else {
1551       uint64_t sleepTime = timeout_.front();
1552       timeout_.pop_front();
1553       if (sleepTime) {
1554         usleep(sleepTime);
1555       }
1556       scheduleTimeout(1);
1557     }
1558   }
1559
1560   int getTimeouts() const {
1561     return timeouts_;
1562   }
1563
1564  private:
1565   int timeouts_;
1566   std::deque<uint64_t>& timeout_;
1567 };
1568
1569 /**
1570  * Verify that idle time is correctly accounted for when decaying our loop
1571  * time.
1572  *
1573  * This works by creating a high loop time (via usleep), expecting a latency
1574  * callback with known value, and then scheduling a timeout for later. This
1575  * later timeout is far enough in the future that the idle time should have
1576  * caused the loop time to decay.
1577  */
1578 TEST(EventBaseTest, IdleTime) {
1579   EventBase eventBase;
1580   eventBase.setLoadAvgMsec(1000ms);
1581   eventBase.resetLoadAvg(5900.0);
1582   std::deque<uint64_t> timeouts0(4, 8080);
1583   timeouts0.push_front(8000);
1584   timeouts0.push_back(14000);
1585   IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1586   std::deque<uint64_t> timeouts(20, 20);
1587   std::unique_ptr<IdleTimeTimeoutSeries> tos;
1588   int64_t testStart = duration_cast<microseconds>(
1589     std::chrono::steady_clock::now().time_since_epoch()).count();
1590   bool hostOverloaded = false;
1591
1592   int latencyCallbacks = 0;
1593   eventBase.setMaxLatency(6000us, [&]() {
1594     ++latencyCallbacks;
1595     if (latencyCallbacks != 1) {
1596       FAIL() << "Unexpected latency callback";
1597     }
1598
1599     if (tos0.getTimeouts() < 6) {
1600       // This could only happen if the host this test is running
1601       // on is heavily loaded.
1602       int64_t maxLatencyReached = duration_cast<microseconds>(
1603           std::chrono::steady_clock::now().time_since_epoch()).count();
1604       ASSERT_LE(43800, maxLatencyReached - testStart);
1605       hostOverloaded = true;
1606       return;
1607     }
1608     ASSERT_EQ(6, tos0.getTimeouts());
1609     ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1610     ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1611     tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1612   });
1613
1614   // Kick things off with an "immedite" timeout
1615   tos0.scheduleTimeout(1);
1616
1617   eventBase.loop();
1618
1619   if (hostOverloaded) {
1620     return;
1621   }
1622
1623   ASSERT_EQ(1, latencyCallbacks);
1624   ASSERT_EQ(7, tos0.getTimeouts());
1625   ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1626   ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1627   ASSERT_TRUE(!!tos);
1628   ASSERT_EQ(21, tos->getTimeouts());
1629 }
1630
1631 /**
1632  * Test that thisLoop functionality works with terminateLoopSoon
1633  */
1634 TEST(EventBaseTest, ThisLoop) {
1635   EventBase eb;
1636   bool runInLoop = false;
1637   bool runThisLoop = false;
1638
1639   eb.runInLoop([&](){
1640       eb.terminateLoopSoon();
1641       eb.runInLoop([&]() {
1642           runInLoop = true;
1643         });
1644       eb.runInLoop([&]() {
1645           runThisLoop = true;
1646         }, true);
1647     }, true);
1648   eb.loopForever();
1649
1650   // Should not work
1651   ASSERT_FALSE(runInLoop);
1652   // Should work with thisLoop
1653   ASSERT_TRUE(runThisLoop);
1654 }
1655
1656 TEST(EventBaseTest, EventBaseThreadLoop) {
1657   EventBase base;
1658   bool ran = false;
1659
1660   base.runInEventBaseThread([&](){
1661     ran = true;
1662   });
1663   base.loop();
1664
1665   ASSERT_TRUE(ran);
1666 }
1667
1668 TEST(EventBaseTest, EventBaseThreadName) {
1669   EventBase base;
1670   base.setName("foo");
1671   base.loop();
1672
1673 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1674   char name[16];
1675   pthread_getname_np(pthread_self(), name, 16);
1676   ASSERT_EQ(0, strcmp("foo", name));
1677 #endif
1678 }
1679
1680 TEST(EventBaseTest, RunBeforeLoop) {
1681   EventBase base;
1682   CountedLoopCallback cb(&base, 1, [&](){
1683     base.terminateLoopSoon();
1684   });
1685   base.runBeforeLoop(&cb);
1686   base.loopForever();
1687   ASSERT_EQ(cb.getCount(), 0);
1688 }
1689
1690 TEST(EventBaseTest, RunBeforeLoopWait) {
1691   EventBase base;
1692   CountedLoopCallback cb(&base, 1);
1693   base.tryRunAfterDelay([&](){
1694       base.terminateLoopSoon();
1695     }, 500);
1696   base.runBeforeLoop(&cb);
1697   base.loopForever();
1698
1699   // Check that we only ran once, and did not loop multiple times.
1700   ASSERT_EQ(cb.getCount(), 0);
1701 }
1702
1703 class PipeHandler : public EventHandler {
1704 public:
1705   PipeHandler(EventBase* eventBase, int fd)
1706     : EventHandler(eventBase, fd) {}
1707
1708   void handlerReady(uint16_t /* events */) noexcept override { abort(); }
1709 };
1710
1711 TEST(EventBaseTest, StopBeforeLoop) {
1712   EventBase evb;
1713
1714   // Give the evb something to do.
1715   int p[2];
1716   ASSERT_EQ(0, pipe(p));
1717   PipeHandler handler(&evb, p[0]);
1718   handler.registerHandler(EventHandler::READ);
1719
1720   // It's definitely not running yet
1721   evb.terminateLoopSoon();
1722
1723   // let it run, it should exit quickly.
1724   std::thread t([&] { evb.loop(); });
1725   t.join();
1726
1727   handler.unregisterHandler();
1728   close(p[0]);
1729   close(p[1]);
1730
1731   SUCCEED();
1732 }
1733
1734 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1735   bool ran = false;
1736
1737   {
1738     EventBase base;
1739     base.runInEventBaseThread([&](){
1740       ran = true;
1741     });
1742   }
1743
1744   ASSERT_TRUE(ran);
1745 }
1746
1747 TEST(EventBaseTest, LoopKeepAlive) {
1748   EventBase evb;
1749
1750   bool done = false;
1751   std::thread t([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
1752     /* sleep override */ std::this_thread::sleep_for(
1753         std::chrono::milliseconds(100));
1754     evb.runInEventBaseThread(
1755         [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1756   });
1757
1758   evb.loop();
1759
1760   ASSERT_TRUE(done);
1761
1762   t.join();
1763 }
1764
1765 TEST(EventBaseTest, LoopKeepAliveInLoop) {
1766   EventBase evb;
1767
1768   bool done = false;
1769   std::thread t;
1770
1771   evb.runInEventBaseThread([&] {
1772     t = std::thread([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
1773       /* sleep override */ std::this_thread::sleep_for(
1774           std::chrono::milliseconds(100));
1775       evb.runInEventBaseThread(
1776           [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1777     });
1778   });
1779
1780   evb.loop();
1781
1782   ASSERT_TRUE(done);
1783
1784   t.join();
1785 }
1786
1787 TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
1788   std::unique_ptr<EventBase> evb = std::make_unique<EventBase>();
1789
1790   bool done = false;
1791
1792   std::thread evThread([&] {
1793     evb->loopForever();
1794     evb.reset();
1795     done = true;
1796   });
1797
1798   {
1799     auto* ev = evb.get();
1800     Executor::KeepAlive keepAlive;
1801     ev->runInEventBaseThreadAndWait(
1802         [&ev, &keepAlive] { keepAlive = ev->getKeepAliveToken(); });
1803     ASSERT_FALSE(done) << "Loop finished before we asked it to";
1804     ev->terminateLoopSoon();
1805     /* sleep override */
1806     std::this_thread::sleep_for(std::chrono::milliseconds(30));
1807     ASSERT_FALSE(done) << "Loop terminated early";
1808     ev->runInEventBaseThread([keepAlive = std::move(keepAlive)]{});
1809   }
1810
1811   evThread.join();
1812   ASSERT_TRUE(done);
1813 }
1814
1815 TEST(EventBaseTest, LoopKeepAliveShutdown) {
1816   auto evb = std::make_unique<EventBase>();
1817
1818   bool done = false;
1819
1820   std::thread t([
1821     &done,
1822     loopKeepAlive = evb->getKeepAliveToken(),
1823     evbPtr = evb.get()
1824   ]() mutable {
1825     /* sleep override */ std::this_thread::sleep_for(
1826         std::chrono::milliseconds(100));
1827     evbPtr->runInEventBaseThread(
1828         [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1829   });
1830
1831   evb.reset();
1832
1833   ASSERT_TRUE(done);
1834
1835   t.join();
1836 }
1837
1838 TEST(EventBaseTest, LoopKeepAliveAtomic) {
1839   auto evb = std::make_unique<EventBase>();
1840
1841   static constexpr size_t kNumThreads = 100;
1842   static constexpr size_t kNumTasks = 100;
1843
1844   std::vector<std::thread> ts;
1845   std::vector<std::unique_ptr<Baton<>>> batons;
1846   size_t done{0};
1847
1848   for (size_t i = 0; i < kNumThreads; ++i) {
1849     batons.emplace_back(std::make_unique<Baton<>>());
1850   }
1851
1852   for (size_t i = 0; i < kNumThreads; ++i) {
1853     ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
1854       std::vector<Executor::KeepAlive> keepAlives;
1855       for (size_t j = 0; j < kNumTasks; ++j) {
1856         keepAlives.emplace_back(evbPtr->getKeepAliveToken());
1857       }
1858
1859       batonPtr->post();
1860
1861       /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
1862
1863       for (auto& keepAlive : keepAlives) {
1864         evbPtr->runInEventBaseThread(
1865             [&done, keepAlive = std::move(keepAlive) ]() { ++done; });
1866       }
1867     });
1868   }
1869
1870   for (auto& baton : batons) {
1871     baton->wait();
1872   }
1873
1874   evb.reset();
1875
1876   EXPECT_EQ(kNumThreads * kNumTasks, done);
1877
1878   for (auto& t : ts) {
1879     t.join();
1880   }
1881 }
1882
1883 TEST(EventBaseTest, DrivableExecutorTest) {
1884   folly::Promise<bool> p;
1885   auto f = p.getFuture();
1886   EventBase base;
1887   bool finished = false;
1888
1889   std::thread t([&] {
1890     /* sleep override */
1891     std::this_thread::sleep_for(std::chrono::microseconds(10));
1892     finished = true;
1893     base.runInEventBaseThread([&]() { p.setValue(true); });
1894   });
1895
1896   // Ensure drive does not busy wait
1897   base.drive(); // TODO: fix notification queue init() extra wakeup
1898   base.drive();
1899   EXPECT_TRUE(finished);
1900
1901   folly::Promise<bool> p2;
1902   auto f2 = p2.getFuture();
1903   // Ensure waitVia gets woken up properly, even from
1904   // a separate thread.
1905   base.runAfterDelay([&]() { p2.setValue(true); }, 10);
1906   f2.waitVia(&base);
1907   EXPECT_TRUE(f2.isReady());
1908
1909   t.join();
1910 }
1911
1912 TEST(EventBaseTest, RequestContextTest) {
1913   EventBase evb;
1914   auto defaultCtx = RequestContext::get();
1915
1916   {
1917     RequestContextScopeGuard rctx;
1918     auto context = RequestContext::get();
1919     EXPECT_NE(defaultCtx, context);
1920     evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
1921   }
1922
1923   EXPECT_EQ(defaultCtx, RequestContext::get());
1924   evb.loop();
1925   EXPECT_EQ(defaultCtx, RequestContext::get());
1926 }