Thread-safe version of loopKeepAlive()
[folly.git] / folly / io / async / test / EventBaseTest.cpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19
20 #include <folly/Memory.h>
21 #include <folly/ScopeGuard.h>
22
23 #include <folly/io/async/AsyncTimeout.h>
24 #include <folly/io/async/EventBase.h>
25 #include <folly/io/async/EventHandler.h>
26 #include <folly/io/async/test/SocketPair.h>
27 #include <folly/io/async/test/Util.h>
28 #include <folly/portability/Unistd.h>
29
30 #include <folly/futures/Promise.h>
31
32 #include <atomic>
33 #include <iostream>
34 #include <memory>
35 #include <thread>
36
37 using std::atomic;
38 using std::deque;
39 using std::pair;
40 using std::vector;
41 using std::unique_ptr;
42 using std::thread;
43 using std::make_pair;
44 using std::cerr;
45 using std::endl;
46 using std::chrono::milliseconds;
47 using std::chrono::microseconds;
48 using std::chrono::duration_cast;
49
50 using namespace folly;
51
52 ///////////////////////////////////////////////////////////////////////////
53 // Tests for read and write events
54 ///////////////////////////////////////////////////////////////////////////
55
56 enum { BUF_SIZE = 4096 };
57
58 ssize_t writeToFD(int fd, size_t length) {
59   // write an arbitrary amount of data to the fd
60   auto bufv = vector<char>(length);
61   auto buf = bufv.data();
62   memset(buf, 'a', length);
63   ssize_t rc = write(fd, buf, length);
64   CHECK_EQ(rc, length);
65   return rc;
66 }
67
68 size_t writeUntilFull(int fd) {
69   // Write to the fd until EAGAIN is returned
70   size_t bytesWritten = 0;
71   char buf[BUF_SIZE];
72   memset(buf, 'a', sizeof(buf));
73   while (true) {
74     ssize_t rc = write(fd, buf, sizeof(buf));
75     if (rc < 0) {
76       CHECK_EQ(errno, EAGAIN);
77       break;
78     } else {
79       bytesWritten += rc;
80     }
81   }
82   return bytesWritten;
83 }
84
85 ssize_t readFromFD(int fd, size_t length) {
86   // write an arbitrary amount of data to the fd
87   auto buf = vector<char>(length);
88   return read(fd, buf.data(), length);
89 }
90
91 size_t readUntilEmpty(int fd) {
92   // Read from the fd until EAGAIN is returned
93   char buf[BUF_SIZE];
94   size_t bytesRead = 0;
95   while (true) {
96     int rc = read(fd, buf, sizeof(buf));
97     if (rc == 0) {
98       CHECK(false) << "unexpected EOF";
99     } else if (rc < 0) {
100       CHECK_EQ(errno, EAGAIN);
101       break;
102     } else {
103       bytesRead += rc;
104     }
105   }
106   return bytesRead;
107 }
108
109 void checkReadUntilEmpty(int fd, size_t expectedLength) {
110   ASSERT_EQ(readUntilEmpty(fd), expectedLength);
111 }
112
113 struct ScheduledEvent {
114   int milliseconds;
115   uint16_t events;
116   size_t length;
117   ssize_t result;
118
119   void perform(int fd) {
120     if (events & EventHandler::READ) {
121       if (length == 0) {
122         result = readUntilEmpty(fd);
123       } else {
124         result = readFromFD(fd, length);
125       }
126     }
127     if (events & EventHandler::WRITE) {
128       if (length == 0) {
129         result = writeUntilFull(fd);
130       } else {
131         result = writeToFD(fd, length);
132       }
133     }
134   }
135 };
136
137 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
138   for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
139     eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
140                              ev->milliseconds);
141   }
142 }
143
144 class TestHandler : public EventHandler {
145  public:
146   TestHandler(EventBase* eventBase, int fd)
147     : EventHandler(eventBase, fd), fd_(fd) {}
148
149   void handlerReady(uint16_t events) noexcept override {
150     ssize_t bytesRead = 0;
151     ssize_t bytesWritten = 0;
152     if (events & READ) {
153       // Read all available data, so EventBase will stop calling us
154       // until new data becomes available
155       bytesRead = readUntilEmpty(fd_);
156     }
157     if (events & WRITE) {
158       // Write until the pipe buffer is full, so EventBase will stop calling
159       // us until the other end has read some data
160       bytesWritten = writeUntilFull(fd_);
161     }
162
163     log.emplace_back(events, bytesRead, bytesWritten);
164   }
165
166   struct EventRecord {
167     EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
168       : events(events)
169       , timestamp()
170       , bytesRead(bytesRead)
171       , bytesWritten(bytesWritten) {}
172
173     uint16_t events;
174     TimePoint timestamp;
175     ssize_t bytesRead;
176     ssize_t bytesWritten;
177   };
178
179   deque<EventRecord> log;
180
181  private:
182   int fd_;
183 };
184
185 /**
186  * Test a READ event
187  */
188 TEST(EventBaseTest, ReadEvent) {
189   EventBase eb;
190   SocketPair sp;
191
192   // Register for read events
193   TestHandler handler(&eb, sp[0]);
194   handler.registerHandler(EventHandler::READ);
195
196   // Register timeouts to perform two write events
197   ScheduledEvent events[] = {
198     { 10, EventHandler::WRITE, 2345, 0 },
199     { 160, EventHandler::WRITE, 99, 0 },
200     { 0, 0, 0, 0 },
201   };
202   scheduleEvents(&eb, sp[1], events);
203
204   // Loop
205   TimePoint start;
206   eb.loop();
207   TimePoint end;
208
209   // Since we didn't use the EventHandler::PERSIST flag, the handler should
210   // have received the first read, then unregistered itself.  Check that only
211   // the first chunk of data was received.
212   ASSERT_EQ(handler.log.size(), 1);
213   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
214   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
215                   milliseconds(events[0].milliseconds), milliseconds(90));
216   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
217   ASSERT_EQ(handler.log[0].bytesWritten, 0);
218   T_CHECK_TIMEOUT(start, end,
219                   milliseconds(events[1].milliseconds), milliseconds(30));
220
221   // Make sure the second chunk of data is still waiting to be read.
222   size_t bytesRemaining = readUntilEmpty(sp[0]);
223   ASSERT_EQ(bytesRemaining, events[1].length);
224 }
225
226 /**
227  * Test (READ | PERSIST)
228  */
229 TEST(EventBaseTest, ReadPersist) {
230   EventBase eb;
231   SocketPair sp;
232
233   // Register for read events
234   TestHandler handler(&eb, sp[0]);
235   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
236
237   // Register several timeouts to perform writes
238   ScheduledEvent events[] = {
239     { 10,  EventHandler::WRITE, 1024, 0 },
240     { 20,  EventHandler::WRITE, 2211, 0 },
241     { 30,  EventHandler::WRITE, 4096, 0 },
242     { 100, EventHandler::WRITE, 100,  0 },
243     { 0, 0, 0, 0 },
244   };
245   scheduleEvents(&eb, sp[1], events);
246
247   // Schedule a timeout to unregister the handler after the third write
248   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
249
250   // Loop
251   TimePoint start;
252   eb.loop();
253   TimePoint end;
254
255   // The handler should have received the first 3 events,
256   // then been unregistered after that.
257   ASSERT_EQ(handler.log.size(), 3);
258   for (int n = 0; n < 3; ++n) {
259     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
260     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
261                     milliseconds(events[n].milliseconds));
262     ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
263     ASSERT_EQ(handler.log[n].bytesWritten, 0);
264   }
265   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
266
267   // Make sure the data from the last write is still waiting to be read
268   size_t bytesRemaining = readUntilEmpty(sp[0]);
269   ASSERT_EQ(bytesRemaining, events[3].length);
270 }
271
272 /**
273  * Test registering for READ when the socket is immediately readable
274  */
275 TEST(EventBaseTest, ReadImmediate) {
276   EventBase eb;
277   SocketPair sp;
278
279   // Write some data to the socket so the other end will
280   // be immediately readable
281   size_t dataLength = 1234;
282   writeToFD(sp[1], dataLength);
283
284   // Register for read events
285   TestHandler handler(&eb, sp[0]);
286   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
287
288   // Register a timeout to perform another write
289   ScheduledEvent events[] = {
290     { 10, EventHandler::WRITE, 2345, 0 },
291     { 0, 0, 0, 0 },
292   };
293   scheduleEvents(&eb, sp[1], events);
294
295   // Schedule a timeout to unregister the handler
296   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
297
298   // Loop
299   TimePoint start;
300   eb.loop();
301   TimePoint end;
302
303   ASSERT_EQ(handler.log.size(), 2);
304
305   // There should have been 1 event for immediate readability
306   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
307   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
308   ASSERT_EQ(handler.log[0].bytesRead, dataLength);
309   ASSERT_EQ(handler.log[0].bytesWritten, 0);
310
311   // There should be another event after the timeout wrote more data
312   ASSERT_EQ(handler.log[1].events, EventHandler::READ);
313   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
314                   milliseconds(events[0].milliseconds));
315   ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
316   ASSERT_EQ(handler.log[1].bytesWritten, 0);
317
318   T_CHECK_TIMEOUT(start, end, milliseconds(20));
319 }
320
321 /**
322  * Test a WRITE event
323  */
324 TEST(EventBaseTest, WriteEvent) {
325   EventBase eb;
326   SocketPair sp;
327
328   // Fill up the write buffer before starting
329   size_t initialBytesWritten = writeUntilFull(sp[0]);
330
331   // Register for write events
332   TestHandler handler(&eb, sp[0]);
333   handler.registerHandler(EventHandler::WRITE);
334
335   // Register timeouts to perform two reads
336   ScheduledEvent events[] = {
337     { 10, EventHandler::READ, 0, 0 },
338     { 60, EventHandler::READ, 0, 0 },
339     { 0, 0, 0, 0 },
340   };
341   scheduleEvents(&eb, sp[1], events);
342
343   // Loop
344   TimePoint start;
345   eb.loop();
346   TimePoint end;
347
348   // Since we didn't use the EventHandler::PERSIST flag, the handler should
349   // have only been able to write once, then unregistered itself.
350   ASSERT_EQ(handler.log.size(), 1);
351   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
352   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
353                   milliseconds(events[0].milliseconds));
354   ASSERT_EQ(handler.log[0].bytesRead, 0);
355   ASSERT_GT(handler.log[0].bytesWritten, 0);
356   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
357
358   ASSERT_EQ(events[0].result, initialBytesWritten);
359   ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
360 }
361
362 /**
363  * Test (WRITE | PERSIST)
364  */
365 TEST(EventBaseTest, WritePersist) {
366   EventBase eb;
367   SocketPair sp;
368
369   // Fill up the write buffer before starting
370   size_t initialBytesWritten = writeUntilFull(sp[0]);
371
372   // Register for write events
373   TestHandler handler(&eb, sp[0]);
374   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
375
376   // Register several timeouts to read from the socket at several intervals
377   ScheduledEvent events[] = {
378     { 10,  EventHandler::READ, 0, 0 },
379     { 40,  EventHandler::READ, 0, 0 },
380     { 70,  EventHandler::READ, 0, 0 },
381     { 100, EventHandler::READ, 0, 0 },
382     { 0, 0, 0, 0 },
383   };
384   scheduleEvents(&eb, sp[1], events);
385
386   // Schedule a timeout to unregister the handler after the third read
387   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
388
389   // Loop
390   TimePoint start;
391   eb.loop();
392   TimePoint end;
393
394   // The handler should have received the first 3 events,
395   // then been unregistered after that.
396   ASSERT_EQ(handler.log.size(), 3);
397   ASSERT_EQ(events[0].result, initialBytesWritten);
398   for (int n = 0; n < 3; ++n) {
399     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
400     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
401                     milliseconds(events[n].milliseconds));
402     ASSERT_EQ(handler.log[n].bytesRead, 0);
403     ASSERT_GT(handler.log[n].bytesWritten, 0);
404     ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
405   }
406   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
407 }
408
409 /**
410  * Test registering for WRITE when the socket is immediately writable
411  */
412 TEST(EventBaseTest, WriteImmediate) {
413   EventBase eb;
414   SocketPair sp;
415
416   // Register for write events
417   TestHandler handler(&eb, sp[0]);
418   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
419
420   // Register a timeout to perform a read
421   ScheduledEvent events[] = {
422     { 10, EventHandler::READ, 0, 0 },
423     { 0, 0, 0, 0 },
424   };
425   scheduleEvents(&eb, sp[1], events);
426
427   // Schedule a timeout to unregister the handler
428   int64_t unregisterTimeout = 40;
429   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
430                    unregisterTimeout);
431
432   // Loop
433   TimePoint start;
434   eb.loop();
435   TimePoint end;
436
437   ASSERT_EQ(handler.log.size(), 2);
438
439   // Since the socket buffer was initially empty,
440   // there should have been 1 event for immediate writability
441   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
442   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
443   ASSERT_EQ(handler.log[0].bytesRead, 0);
444   ASSERT_GT(handler.log[0].bytesWritten, 0);
445
446   // There should be another event after the timeout wrote more data
447   ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
448   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
449                   milliseconds(events[0].milliseconds));
450   ASSERT_EQ(handler.log[1].bytesRead, 0);
451   ASSERT_GT(handler.log[1].bytesWritten, 0);
452
453   T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
454 }
455
456 /**
457  * Test (READ | WRITE) when the socket becomes readable first
458  */
459 TEST(EventBaseTest, ReadWrite) {
460   EventBase eb;
461   SocketPair sp;
462
463   // Fill up the write buffer before starting
464   size_t sock0WriteLength = writeUntilFull(sp[0]);
465
466   // Register for read and write events
467   TestHandler handler(&eb, sp[0]);
468   handler.registerHandler(EventHandler::READ_WRITE);
469
470   // Register timeouts to perform a write then a read.
471   ScheduledEvent events[] = {
472     { 10, EventHandler::WRITE, 2345, 0 },
473     { 40, EventHandler::READ, 0, 0 },
474     { 0, 0, 0, 0 },
475   };
476   scheduleEvents(&eb, sp[1], events);
477
478   // Loop
479   TimePoint start;
480   eb.loop();
481   TimePoint end;
482
483   // Since we didn't use the EventHandler::PERSIST flag, the handler should
484   // have only noticed readability, then unregistered itself.  Check that only
485   // one event was logged.
486   ASSERT_EQ(handler.log.size(), 1);
487   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
488   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
489                   milliseconds(events[0].milliseconds));
490   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
491   ASSERT_EQ(handler.log[0].bytesWritten, 0);
492   ASSERT_EQ(events[1].result, sock0WriteLength);
493   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
494 }
495
496 /**
497  * Test (READ | WRITE) when the socket becomes writable first
498  */
499 TEST(EventBaseTest, WriteRead) {
500   EventBase eb;
501   SocketPair sp;
502
503   // Fill up the write buffer before starting
504   size_t sock0WriteLength = writeUntilFull(sp[0]);
505
506   // Register for read and write events
507   TestHandler handler(&eb, sp[0]);
508   handler.registerHandler(EventHandler::READ_WRITE);
509
510   // Register timeouts to perform a read then a write.
511   size_t sock1WriteLength = 2345;
512   ScheduledEvent events[] = {
513     { 10, EventHandler::READ, 0, 0 },
514     { 40, EventHandler::WRITE, sock1WriteLength, 0 },
515     { 0, 0, 0, 0 },
516   };
517   scheduleEvents(&eb, sp[1], events);
518
519   // Loop
520   TimePoint start;
521   eb.loop();
522   TimePoint end;
523
524   // Since we didn't use the EventHandler::PERSIST flag, the handler should
525   // have only noticed writability, then unregistered itself.  Check that only
526   // one event was logged.
527   ASSERT_EQ(handler.log.size(), 1);
528   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
529   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
530                   milliseconds(events[0].milliseconds));
531   ASSERT_EQ(handler.log[0].bytesRead, 0);
532   ASSERT_GT(handler.log[0].bytesWritten, 0);
533   ASSERT_EQ(events[0].result, sock0WriteLength);
534   ASSERT_EQ(events[1].result, sock1WriteLength);
535   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
536
537   // Make sure the written data is still waiting to be read.
538   size_t bytesRemaining = readUntilEmpty(sp[0]);
539   ASSERT_EQ(bytesRemaining, events[1].length);
540 }
541
542 /**
543  * Test (READ | WRITE) when the socket becomes readable and writable
544  * at the same time.
545  */
546 TEST(EventBaseTest, ReadWriteSimultaneous) {
547   EventBase eb;
548   SocketPair sp;
549
550   // Fill up the write buffer before starting
551   size_t sock0WriteLength = writeUntilFull(sp[0]);
552
553   // Register for read and write events
554   TestHandler handler(&eb, sp[0]);
555   handler.registerHandler(EventHandler::READ_WRITE);
556
557   // Register a timeout to perform a read and write together
558   ScheduledEvent events[] = {
559     { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
560     { 0, 0, 0, 0 },
561   };
562   scheduleEvents(&eb, sp[1], events);
563
564   // Loop
565   TimePoint start;
566   eb.loop();
567   TimePoint end;
568
569   // It's not strictly required that the EventBase register us about both
570   // events in the same call.  So, it's possible that if the EventBase
571   // implementation changes this test could start failing, and it wouldn't be
572   // considered breaking the API.  However for now it's nice to exercise this
573   // code path.
574   ASSERT_EQ(handler.log.size(), 1);
575   ASSERT_EQ(handler.log[0].events,
576                     EventHandler::READ | EventHandler::WRITE);
577   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
578                   milliseconds(events[0].milliseconds));
579   ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
580   ASSERT_GT(handler.log[0].bytesWritten, 0);
581   T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
582 }
583
584 /**
585  * Test (READ | WRITE | PERSIST)
586  */
587 TEST(EventBaseTest, ReadWritePersist) {
588   EventBase eb;
589   SocketPair sp;
590
591   // Register for read and write events
592   TestHandler handler(&eb, sp[0]);
593   handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
594                           EventHandler::PERSIST);
595
596   // Register timeouts to perform several reads and writes
597   ScheduledEvent events[] = {
598     { 10, EventHandler::WRITE, 2345, 0 },
599     { 20, EventHandler::READ, 0, 0 },
600     { 35, EventHandler::WRITE, 200, 0 },
601     { 45, EventHandler::WRITE, 15, 0 },
602     { 55, EventHandler::READ, 0, 0 },
603     { 120, EventHandler::WRITE, 2345, 0 },
604     { 0, 0, 0, 0 },
605   };
606   scheduleEvents(&eb, sp[1], events);
607
608   // Schedule a timeout to unregister the handler
609   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
610
611   // Loop
612   TimePoint start;
613   eb.loop();
614   TimePoint end;
615
616   ASSERT_EQ(handler.log.size(), 6);
617
618   // Since we didn't fill up the write buffer immediately, there should
619   // be an immediate event for writability.
620   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
621   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
622   ASSERT_EQ(handler.log[0].bytesRead, 0);
623   ASSERT_GT(handler.log[0].bytesWritten, 0);
624
625   // Events 1 through 5 should correspond to the scheduled events
626   for (int n = 1; n < 6; ++n) {
627     ScheduledEvent* event = &events[n - 1];
628     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
629                     milliseconds(event->milliseconds));
630     if (event->events == EventHandler::READ) {
631       ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
632       ASSERT_EQ(handler.log[n].bytesRead, 0);
633       ASSERT_GT(handler.log[n].bytesWritten, 0);
634     } else {
635       ASSERT_EQ(handler.log[n].events, EventHandler::READ);
636       ASSERT_EQ(handler.log[n].bytesRead, event->length);
637       ASSERT_EQ(handler.log[n].bytesWritten, 0);
638     }
639   }
640
641   // The timeout should have unregistered the handler before the last write.
642   // Make sure that data is still waiting to be read
643   size_t bytesRemaining = readUntilEmpty(sp[0]);
644   ASSERT_EQ(bytesRemaining, events[5].length);
645 }
646
647
648 class PartialReadHandler : public TestHandler {
649  public:
650   PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
651     : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
652
653   void handlerReady(uint16_t events) noexcept override {
654     assert(events == EventHandler::READ);
655     ssize_t bytesRead = readFromFD(fd_, readLength_);
656     log.emplace_back(events, bytesRead, 0);
657   }
658
659  private:
660   int fd_;
661   size_t readLength_;
662 };
663
664 /**
665  * Test reading only part of the available data when a read event is fired.
666  * When PERSIST is used, make sure the handler gets notified again the next
667  * time around the loop.
668  */
669 TEST(EventBaseTest, ReadPartial) {
670   EventBase eb;
671   SocketPair sp;
672
673   // Register for read events
674   size_t readLength = 100;
675   PartialReadHandler handler(&eb, sp[0], readLength);
676   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
677
678   // Register a timeout to perform a single write,
679   // with more data than PartialReadHandler will read at once
680   ScheduledEvent events[] = {
681     { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
682     { 0, 0, 0, 0 },
683   };
684   scheduleEvents(&eb, sp[1], events);
685
686   // Schedule a timeout to unregister the handler
687   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
688
689   // Loop
690   TimePoint start;
691   eb.loop();
692   TimePoint end;
693
694   ASSERT_EQ(handler.log.size(), 4);
695
696   // The first 3 invocations should read readLength bytes each
697   for (int n = 0; n < 3; ++n) {
698     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
699     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
700                     milliseconds(events[0].milliseconds));
701     ASSERT_EQ(handler.log[n].bytesRead, readLength);
702     ASSERT_EQ(handler.log[n].bytesWritten, 0);
703   }
704   // The last read only has readLength/2 bytes
705   ASSERT_EQ(handler.log[3].events, EventHandler::READ);
706   T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
707                   milliseconds(events[0].milliseconds));
708   ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
709   ASSERT_EQ(handler.log[3].bytesWritten, 0);
710 }
711
712
713 class PartialWriteHandler : public TestHandler {
714  public:
715   PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
716     : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
717
718   void handlerReady(uint16_t events) noexcept override {
719     assert(events == EventHandler::WRITE);
720     ssize_t bytesWritten = writeToFD(fd_, writeLength_);
721     log.emplace_back(events, 0, bytesWritten);
722   }
723
724  private:
725   int fd_;
726   size_t writeLength_;
727 };
728
729 /**
730  * Test writing without completely filling up the write buffer when the fd
731  * becomes writable.  When PERSIST is used, make sure the handler gets
732  * notified again the next time around the loop.
733  */
734 TEST(EventBaseTest, WritePartial) {
735   EventBase eb;
736   SocketPair sp;
737
738   // Fill up the write buffer before starting
739   size_t initialBytesWritten = writeUntilFull(sp[0]);
740
741   // Register for write events
742   size_t writeLength = 100;
743   PartialWriteHandler handler(&eb, sp[0], writeLength);
744   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
745
746   // Register a timeout to read, so that more data can be written
747   ScheduledEvent events[] = {
748     { 10, EventHandler::READ, 0, 0 },
749     { 0, 0, 0, 0 },
750   };
751   scheduleEvents(&eb, sp[1], events);
752
753   // Schedule a timeout to unregister the handler
754   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
755
756   // Loop
757   TimePoint start;
758   eb.loop();
759   TimePoint end;
760
761   // Depending on how big the socket buffer is, there will be multiple writes
762   // Only check the first 5
763   int numChecked = 5;
764   ASSERT_GE(handler.log.size(), numChecked);
765   ASSERT_EQ(events[0].result, initialBytesWritten);
766
767   // The first 3 invocations should read writeLength bytes each
768   for (int n = 0; n < numChecked; ++n) {
769     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
770     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
771                     milliseconds(events[0].milliseconds));
772     ASSERT_EQ(handler.log[n].bytesRead, 0);
773     ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
774   }
775 }
776
777
778 /**
779  * Test destroying a registered EventHandler
780  */
781 TEST(EventBaseTest, DestroyHandler) {
782   class DestroyHandler : public AsyncTimeout {
783    public:
784     DestroyHandler(EventBase* eb, EventHandler* h)
785       : AsyncTimeout(eb)
786       , handler_(h) {}
787
788     void timeoutExpired() noexcept override { delete handler_; }
789
790    private:
791     EventHandler* handler_;
792   };
793
794   EventBase eb;
795   SocketPair sp;
796
797   // Fill up the write buffer before starting
798   size_t initialBytesWritten = writeUntilFull(sp[0]);
799
800   // Register for write events
801   TestHandler* handler = new TestHandler(&eb, sp[0]);
802   handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
803
804   // After 10ms, read some data, so that the handler
805   // will be notified that it can write.
806   eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
807                    10);
808
809   // Start a timer to destroy the handler after 25ms
810   // This mainly just makes sure the code doesn't break or assert
811   DestroyHandler dh(&eb, handler);
812   dh.scheduleTimeout(25);
813
814   TimePoint start;
815   eb.loop();
816   TimePoint end;
817
818   // Make sure the EventHandler was uninstalled properly when it was
819   // destroyed, and the EventBase loop exited
820   T_CHECK_TIMEOUT(start, end, milliseconds(25));
821
822   // Make sure that the handler wrote data to the socket
823   // before it was destroyed
824   size_t bytesRemaining = readUntilEmpty(sp[1]);
825   ASSERT_GT(bytesRemaining, 0);
826 }
827
828
829 ///////////////////////////////////////////////////////////////////////////
830 // Tests for timeout events
831 ///////////////////////////////////////////////////////////////////////////
832
833 TEST(EventBaseTest, RunAfterDelay) {
834   EventBase eb;
835
836   TimePoint timestamp1(false);
837   TimePoint timestamp2(false);
838   TimePoint timestamp3(false);
839   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
840   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
841   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
842
843   TimePoint start;
844   eb.loop();
845   TimePoint end;
846
847   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
848   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
849   T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
850   T_CHECK_TIMEOUT(start, end, milliseconds(40));
851 }
852
853 /**
854  * Test the behavior of tryRunAfterDelay() when some timeouts are
855  * still scheduled when the EventBase is destroyed.
856  */
857 TEST(EventBaseTest, RunAfterDelayDestruction) {
858   TimePoint timestamp1(false);
859   TimePoint timestamp2(false);
860   TimePoint timestamp3(false);
861   TimePoint timestamp4(false);
862   TimePoint start(false);
863   TimePoint end(false);
864
865   {
866     EventBase eb;
867
868     // Run two normal timeouts
869     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
870     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
871
872     // Schedule a timeout to stop the event loop after 40ms
873     eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
874
875     // Schedule 2 timeouts that would fire after the event loop stops
876     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
877     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
878
879     start.reset();
880     eb.loop();
881     end.reset();
882   }
883
884   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
885   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
886   T_CHECK_TIMEOUT(start, end, milliseconds(40));
887
888   ASSERT_TRUE(timestamp3.isUnset());
889   ASSERT_TRUE(timestamp4.isUnset());
890
891   // Ideally this test should be run under valgrind to ensure that no
892   // memory is leaked.
893 }
894
895 class TestTimeout : public AsyncTimeout {
896  public:
897   explicit TestTimeout(EventBase* eventBase)
898     : AsyncTimeout(eventBase)
899     , timestamp(false) {}
900
901   void timeoutExpired() noexcept override { timestamp.reset(); }
902
903   TimePoint timestamp;
904 };
905
906 TEST(EventBaseTest, BasicTimeouts) {
907   EventBase eb;
908
909   TestTimeout t1(&eb);
910   TestTimeout t2(&eb);
911   TestTimeout t3(&eb);
912   t1.scheduleTimeout(10);
913   t2.scheduleTimeout(20);
914   t3.scheduleTimeout(40);
915
916   TimePoint start;
917   eb.loop();
918   TimePoint end;
919
920   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
921   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
922   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
923   T_CHECK_TIMEOUT(start, end, milliseconds(40));
924 }
925
926 class ReschedulingTimeout : public AsyncTimeout {
927  public:
928   ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
929     : AsyncTimeout(evb)
930     , timeouts_(timeouts)
931     , iterator_(timeouts_.begin()) {}
932
933   void start() {
934     reschedule();
935   }
936
937   void timeoutExpired() noexcept override {
938     timestamps.emplace_back();
939     reschedule();
940   }
941
942   void reschedule() {
943     if (iterator_ != timeouts_.end()) {
944       uint32_t timeout = *iterator_;
945       ++iterator_;
946       scheduleTimeout(timeout);
947     }
948   }
949
950   vector<TimePoint> timestamps;
951
952  private:
953   vector<uint32_t> timeouts_;
954   vector<uint32_t>::const_iterator iterator_;
955 };
956
957 /**
958  * Test rescheduling the same timeout multiple times
959  */
960 TEST(EventBaseTest, ReuseTimeout) {
961   EventBase eb;
962
963   vector<uint32_t> timeouts;
964   timeouts.push_back(10);
965   timeouts.push_back(30);
966   timeouts.push_back(15);
967
968   ReschedulingTimeout t(&eb, timeouts);
969   t.start();
970
971   TimePoint start;
972   eb.loop();
973   TimePoint end;
974
975   // Use a higher tolerance than usual.  We're waiting on 3 timeouts
976   // consecutively.  In general, each timeout may go over by a few
977   // milliseconds, and we're tripling this error by witing on 3 timeouts.
978   milliseconds tolerance{6};
979
980   ASSERT_EQ(timeouts.size(), t.timestamps.size());
981   uint32_t total = 0;
982   for (size_t n = 0; n < timeouts.size(); ++n) {
983     total += timeouts[n];
984     T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
985   }
986   T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
987 }
988
989 /**
990  * Test rescheduling a timeout before it has fired
991  */
992 TEST(EventBaseTest, RescheduleTimeout) {
993   EventBase eb;
994
995   TestTimeout t1(&eb);
996   TestTimeout t2(&eb);
997   TestTimeout t3(&eb);
998
999   t1.scheduleTimeout(15);
1000   t2.scheduleTimeout(30);
1001   t3.scheduleTimeout(30);
1002
1003   auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1004       &AsyncTimeout::scheduleTimeout);
1005
1006   // after 10ms, reschedule t2 to run sooner than originally scheduled
1007   eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1008   // after 10ms, reschedule t3 to run later than originally scheduled
1009   eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1010
1011   TimePoint start;
1012   eb.loop();
1013   TimePoint end;
1014
1015   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1016   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1017   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1018   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1019 }
1020
1021 /**
1022  * Test cancelling a timeout
1023  */
1024 TEST(EventBaseTest, CancelTimeout) {
1025   EventBase eb;
1026
1027   vector<uint32_t> timeouts;
1028   timeouts.push_back(10);
1029   timeouts.push_back(30);
1030   timeouts.push_back(25);
1031
1032   ReschedulingTimeout t(&eb, timeouts);
1033   t.start();
1034   eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1035
1036   TimePoint start;
1037   eb.loop();
1038   TimePoint end;
1039
1040   ASSERT_EQ(t.timestamps.size(), 2);
1041   T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1042   T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1043   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1044 }
1045
1046 /**
1047  * Test destroying a scheduled timeout object
1048  */
1049 TEST(EventBaseTest, DestroyTimeout) {
1050   class DestroyTimeout : public AsyncTimeout {
1051    public:
1052     DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1053       : AsyncTimeout(eb)
1054       , timeout_(t) {}
1055
1056     void timeoutExpired() noexcept override { delete timeout_; }
1057
1058    private:
1059     AsyncTimeout* timeout_;
1060   };
1061
1062   EventBase eb;
1063
1064   TestTimeout* t1 = new TestTimeout(&eb);
1065   t1->scheduleTimeout(30);
1066
1067   DestroyTimeout dt(&eb, t1);
1068   dt.scheduleTimeout(10);
1069
1070   TimePoint start;
1071   eb.loop();
1072   TimePoint end;
1073
1074   T_CHECK_TIMEOUT(start, end, milliseconds(10));
1075 }
1076
1077
1078 ///////////////////////////////////////////////////////////////////////////
1079 // Test for runInThreadTestFunc()
1080 ///////////////////////////////////////////////////////////////////////////
1081
1082 struct RunInThreadData {
1083   RunInThreadData(int numThreads, int opsPerThread)
1084     : opsPerThread(opsPerThread)
1085     , opsToGo(numThreads*opsPerThread) {}
1086
1087   EventBase evb;
1088   deque< pair<int, int> > values;
1089
1090   int opsPerThread;
1091   int opsToGo;
1092 };
1093
1094 struct RunInThreadArg {
1095   RunInThreadArg(RunInThreadData* data,
1096                  int threadId,
1097                  int value)
1098     : data(data)
1099     , thread(threadId)
1100     , value(value) {}
1101
1102   RunInThreadData* data;
1103   int thread;
1104   int value;
1105 };
1106
1107 void runInThreadTestFunc(RunInThreadArg* arg) {
1108   arg->data->values.emplace_back(arg->thread, arg->value);
1109   RunInThreadData* data = arg->data;
1110   delete arg;
1111
1112   if(--data->opsToGo == 0) {
1113     // Break out of the event base loop if we are the last thread running
1114     data->evb.terminateLoopSoon();
1115   }
1116 }
1117
1118 TEST(EventBaseTest, RunInThread) {
1119   constexpr uint32_t numThreads = 50;
1120   constexpr uint32_t opsPerThread = 100;
1121   RunInThreadData data(numThreads, opsPerThread);
1122
1123   deque<std::thread> threads;
1124   SCOPE_EXIT {
1125     // Wait on all of the threads.
1126     for (auto& thread : threads) {
1127       thread.join();
1128     }
1129   };
1130
1131   for (uint32_t i = 0; i < numThreads; ++i) {
1132     threads.emplace_back([i, &data] {
1133         for (int n = 0; n < data.opsPerThread; ++n) {
1134           RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1135           data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1136           usleep(10);
1137         }
1138       });
1139   }
1140
1141   // Add a timeout event to run after 3 seconds.
1142   // Otherwise loop() will return immediately since there are no events to run.
1143   // Once the last thread exits, it will stop the loop().  However, this
1144   // timeout also stops the loop in case there is a bug performing the normal
1145   // stop.
1146   data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1147                          3000);
1148
1149   TimePoint start;
1150   data.evb.loop();
1151   TimePoint end;
1152
1153   // Verify that the loop exited because all threads finished and requested it
1154   // to stop.  This should happen much sooner than the 3 second timeout.
1155   // Assert that it happens in under a second.  (This is still tons of extra
1156   // padding.)
1157
1158   auto timeTaken = std::chrono::duration_cast<milliseconds>(
1159     end.getTime() - start.getTime());
1160   ASSERT_LT(timeTaken.count(), 1000);
1161   VLOG(11) << "Time taken: " << timeTaken.count();
1162
1163   // Verify that we have all of the events from every thread
1164   int expectedValues[numThreads];
1165   for (uint32_t n = 0; n < numThreads; ++n) {
1166     expectedValues[n] = 0;
1167   }
1168   for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1169        it != data.values.end();
1170        ++it) {
1171     int threadID = it->first;
1172     int value = it->second;
1173     ASSERT_EQ(expectedValues[threadID], value);
1174     ++expectedValues[threadID];
1175   }
1176   for (uint32_t n = 0; n < numThreads; ++n) {
1177     ASSERT_EQ(expectedValues[n], opsPerThread);
1178   }
1179 }
1180
1181 //  This test simulates some calls, and verifies that the waiting happens by
1182 //  triggering what otherwise would be race conditions, and trying to detect
1183 //  whether any of the race conditions happened.
1184 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1185   const size_t c = 256;
1186   vector<unique_ptr<atomic<size_t>>> atoms(c);
1187   for (size_t i = 0; i < c; ++i) {
1188     auto& atom = atoms.at(i);
1189     atom = make_unique<atomic<size_t>>(0);
1190   }
1191   vector<thread> threads;
1192   for (size_t i = 0; i < c; ++i) {
1193     threads.emplace_back([&atoms, i] {
1194       EventBase eb;
1195       auto& atom = *atoms.at(i);
1196       auto ebth = thread([&] { eb.loopForever(); });
1197       eb.waitUntilRunning();
1198       eb.runInEventBaseThreadAndWait([&] {
1199         size_t x = 0;
1200         atom.compare_exchange_weak(
1201             x, 1, std::memory_order_release, std::memory_order_relaxed);
1202       });
1203       size_t x = 0;
1204       atom.compare_exchange_weak(
1205           x, 2, std::memory_order_release, std::memory_order_relaxed);
1206       eb.terminateLoopSoon();
1207       ebth.join();
1208     });
1209   }
1210   for (size_t i = 0; i < c; ++i) {
1211     auto& th = threads.at(i);
1212     th.join();
1213   }
1214   size_t sum = 0;
1215   for (auto& atom : atoms) sum += *atom;
1216   EXPECT_EQ(c, sum);
1217 }
1218
1219 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1220   EventBase eb;
1221   thread th(&EventBase::loopForever, &eb);
1222   SCOPE_EXIT {
1223     eb.terminateLoopSoon();
1224     th.join();
1225   };
1226   auto mutated = false;
1227   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1228       mutated = true;
1229   });
1230   EXPECT_TRUE(mutated);
1231 }
1232
1233 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1234   EventBase eb;
1235   thread th(&EventBase::loopForever, &eb);
1236   SCOPE_EXIT {
1237     eb.terminateLoopSoon();
1238     th.join();
1239   };
1240   eb.runInEventBaseThreadAndWait([&] {
1241       auto mutated = false;
1242       eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1243           mutated = true;
1244       });
1245       EXPECT_TRUE(mutated);
1246   });
1247 }
1248
1249 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1250   EventBase eb;
1251   auto mutated = false;
1252   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1253       mutated = true;
1254     });
1255   EXPECT_TRUE(mutated);
1256 }
1257
1258 ///////////////////////////////////////////////////////////////////////////
1259 // Tests for runInLoop()
1260 ///////////////////////////////////////////////////////////////////////////
1261
1262 class CountedLoopCallback : public EventBase::LoopCallback {
1263  public:
1264   CountedLoopCallback(EventBase* eventBase,
1265                       unsigned int count,
1266                       std::function<void()> action =
1267                         std::function<void()>())
1268     : eventBase_(eventBase)
1269     , count_(count)
1270     , action_(action) {}
1271
1272   void runLoopCallback() noexcept override {
1273     --count_;
1274     if (count_ > 0) {
1275       eventBase_->runInLoop(this);
1276     } else if (action_) {
1277       action_();
1278     }
1279   }
1280
1281   unsigned int getCount() const {
1282     return count_;
1283   }
1284
1285  private:
1286   EventBase* eventBase_;
1287   unsigned int count_;
1288   std::function<void()> action_;
1289 };
1290
1291 // Test that EventBase::loop() doesn't exit while there are
1292 // still LoopCallbacks remaining to be invoked.
1293 TEST(EventBaseTest, RepeatedRunInLoop) {
1294   EventBase eventBase;
1295
1296   CountedLoopCallback c(&eventBase, 10);
1297   eventBase.runInLoop(&c);
1298   // The callback shouldn't have run immediately
1299   ASSERT_EQ(c.getCount(), 10);
1300   eventBase.loop();
1301
1302   // loop() should loop until the CountedLoopCallback stops
1303   // re-installing itself.
1304   ASSERT_EQ(c.getCount(), 0);
1305 }
1306
1307 // Test that EventBase::loop() works as expected without time measurements.
1308 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
1309   EventBase eventBase(false);
1310
1311   CountedLoopCallback c(&eventBase, 10);
1312   eventBase.runInLoop(&c);
1313   // The callback shouldn't have run immediately
1314   ASSERT_EQ(c.getCount(), 10);
1315   eventBase.loop();
1316
1317   // loop() should loop until the CountedLoopCallback stops
1318   // re-installing itself.
1319   ASSERT_EQ(c.getCount(), 0);
1320 }
1321
1322 // Test runInLoop() calls with terminateLoopSoon()
1323 TEST(EventBaseTest, RunInLoopStopLoop) {
1324   EventBase eventBase;
1325
1326   CountedLoopCallback c1(&eventBase, 20);
1327   CountedLoopCallback c2(&eventBase, 10,
1328                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1329
1330   eventBase.runInLoop(&c1);
1331   eventBase.runInLoop(&c2);
1332   ASSERT_EQ(c1.getCount(), 20);
1333   ASSERT_EQ(c2.getCount(), 10);
1334
1335   eventBase.loopForever();
1336
1337   // c2 should have stopped the loop after 10 iterations
1338   ASSERT_EQ(c2.getCount(), 0);
1339
1340   // We allow the EventBase to run the loop callbacks in whatever order it
1341   // chooses.  We'll accept c1's count being either 10 (if the loop terminated
1342   // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1343   // before c1 ran).
1344   //
1345   // (With the current code, c1 will always run 10 times, but we don't consider
1346   // this a hard API requirement.)
1347   ASSERT_GE(c1.getCount(), 10);
1348   ASSERT_LE(c1.getCount(), 11);
1349 }
1350
1351 TEST(EventBaseTest, TryRunningAfterTerminate) {
1352   EventBase eventBase;
1353   CountedLoopCallback c1(&eventBase, 1,
1354                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1355   eventBase.runInLoop(&c1);
1356   eventBase.loopForever();
1357   bool ran = false;
1358   eventBase.runInEventBaseThread([&]() {
1359     ran = true;
1360   });
1361
1362   ASSERT_FALSE(ran);
1363 }
1364
1365 // Test cancelling runInLoop() callbacks
1366 TEST(EventBaseTest, CancelRunInLoop) {
1367   EventBase eventBase;
1368
1369   CountedLoopCallback c1(&eventBase, 20);
1370   CountedLoopCallback c2(&eventBase, 20);
1371   CountedLoopCallback c3(&eventBase, 20);
1372
1373   std::function<void()> cancelC1Action =
1374     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1375   std::function<void()> cancelC2Action =
1376     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1377
1378   CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1379   CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1380
1381   // Install cancelC1 after c1
1382   eventBase.runInLoop(&c1);
1383   eventBase.runInLoop(&cancelC1);
1384
1385   // Install cancelC2 before c2
1386   eventBase.runInLoop(&cancelC2);
1387   eventBase.runInLoop(&c2);
1388
1389   // Install c3
1390   eventBase.runInLoop(&c3);
1391
1392   ASSERT_EQ(c1.getCount(), 20);
1393   ASSERT_EQ(c2.getCount(), 20);
1394   ASSERT_EQ(c3.getCount(), 20);
1395   ASSERT_EQ(cancelC1.getCount(), 10);
1396   ASSERT_EQ(cancelC2.getCount(), 10);
1397
1398   // Run the loop
1399   eventBase.loop();
1400
1401   // cancelC1 and cancelC3 should have both fired after 10 iterations and
1402   // stopped re-installing themselves
1403   ASSERT_EQ(cancelC1.getCount(), 0);
1404   ASSERT_EQ(cancelC2.getCount(), 0);
1405   // c3 should have continued on for the full 20 iterations
1406   ASSERT_EQ(c3.getCount(), 0);
1407
1408   // c1 and c2 should have both been cancelled on the 10th iteration.
1409   //
1410   // Callbacks are always run in the order they are installed,
1411   // so c1 should have fired 10 times, and been canceled after it ran on the
1412   // 10th iteration.  c2 should have only fired 9 times, because cancelC2 will
1413   // have run before it on the 10th iteration, and cancelled it before it
1414   // fired.
1415   ASSERT_EQ(c1.getCount(), 10);
1416   ASSERT_EQ(c2.getCount(), 11);
1417 }
1418
1419 class TerminateTestCallback : public EventBase::LoopCallback,
1420                               public EventHandler {
1421  public:
1422   TerminateTestCallback(EventBase* eventBase, int fd)
1423     : EventHandler(eventBase, fd),
1424       eventBase_(eventBase),
1425       loopInvocations_(0),
1426       maxLoopInvocations_(0),
1427       eventInvocations_(0),
1428       maxEventInvocations_(0) {}
1429
1430   void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1431     loopInvocations_ = 0;
1432     maxLoopInvocations_ = maxLoopInvocations;
1433     eventInvocations_ = 0;
1434     maxEventInvocations_ = maxEventInvocations;
1435
1436     cancelLoopCallback();
1437     unregisterHandler();
1438   }
1439
1440   void handlerReady(uint16_t /* events */) noexcept override {
1441     // We didn't register with PERSIST, so we will have been automatically
1442     // unregistered already.
1443     ASSERT_FALSE(isHandlerRegistered());
1444
1445     ++eventInvocations_;
1446     if (eventInvocations_ >= maxEventInvocations_) {
1447       return;
1448     }
1449
1450     eventBase_->runInLoop(this);
1451   }
1452   void runLoopCallback() noexcept override {
1453     ++loopInvocations_;
1454     if (loopInvocations_ >= maxLoopInvocations_) {
1455       return;
1456     }
1457
1458     registerHandler(READ);
1459   }
1460
1461   uint32_t getLoopInvocations() const {
1462     return loopInvocations_;
1463   }
1464   uint32_t getEventInvocations() const {
1465     return eventInvocations_;
1466   }
1467
1468  private:
1469   EventBase* eventBase_;
1470   uint32_t loopInvocations_;
1471   uint32_t maxLoopInvocations_;
1472   uint32_t eventInvocations_;
1473   uint32_t maxEventInvocations_;
1474 };
1475
1476 /**
1477  * Test that EventBase::loop() correctly detects when there are no more events
1478  * left to run.
1479  *
1480  * This uses a single callback, which alternates registering itself as a loop
1481  * callback versus a EventHandler callback.  This exercises a regression where
1482  * EventBase::loop() incorrectly exited if there were no more fd handlers
1483  * registered, but a loop callback installed a new fd handler.
1484  */
1485 TEST(EventBaseTest, LoopTermination) {
1486   EventBase eventBase;
1487
1488   // Open a pipe and close the write end,
1489   // so the read endpoint will be readable
1490   int pipeFds[2];
1491   int rc = pipe(pipeFds);
1492   ASSERT_EQ(rc, 0);
1493   close(pipeFds[1]);
1494   TerminateTestCallback callback(&eventBase, pipeFds[0]);
1495
1496   // Test once where the callback will exit after a loop callback
1497   callback.reset(10, 100);
1498   eventBase.runInLoop(&callback);
1499   eventBase.loop();
1500   ASSERT_EQ(callback.getLoopInvocations(), 10);
1501   ASSERT_EQ(callback.getEventInvocations(), 9);
1502
1503   // Test once where the callback will exit after an fd event callback
1504   callback.reset(100, 7);
1505   eventBase.runInLoop(&callback);
1506   eventBase.loop();
1507   ASSERT_EQ(callback.getLoopInvocations(), 7);
1508   ASSERT_EQ(callback.getEventInvocations(), 7);
1509
1510   close(pipeFds[0]);
1511 }
1512
1513 ///////////////////////////////////////////////////////////////////////////
1514 // Tests for latency calculations
1515 ///////////////////////////////////////////////////////////////////////////
1516
1517 class IdleTimeTimeoutSeries : public AsyncTimeout {
1518
1519  public:
1520
1521   explicit IdleTimeTimeoutSeries(EventBase *base,
1522                                  std::deque<std::uint64_t>& timeout) :
1523     AsyncTimeout(base),
1524     timeouts_(0),
1525     timeout_(timeout) {
1526       scheduleTimeout(1);
1527     }
1528
1529     ~IdleTimeTimeoutSeries() override {}
1530
1531     void timeoutExpired() noexcept override {
1532     ++timeouts_;
1533
1534     if(timeout_.empty()){
1535       cancelTimeout();
1536     } else {
1537       uint64_t sleepTime = timeout_.front();
1538       timeout_.pop_front();
1539       if (sleepTime) {
1540         usleep(sleepTime);
1541       }
1542       scheduleTimeout(1);
1543     }
1544   }
1545
1546   int getTimeouts() const {
1547     return timeouts_;
1548   }
1549
1550  private:
1551   int timeouts_;
1552   std::deque<uint64_t>& timeout_;
1553 };
1554
1555 /**
1556  * Verify that idle time is correctly accounted for when decaying our loop
1557  * time.
1558  *
1559  * This works by creating a high loop time (via usleep), expecting a latency
1560  * callback with known value, and then scheduling a timeout for later. This
1561  * later timeout is far enough in the future that the idle time should have
1562  * caused the loop time to decay.
1563  */
1564 TEST(EventBaseTest, IdleTime) {
1565   EventBase eventBase;
1566   eventBase.setLoadAvgMsec(1000);
1567   eventBase.resetLoadAvg(5900.0);
1568   std::deque<uint64_t> timeouts0(4, 8080);
1569   timeouts0.push_front(8000);
1570   timeouts0.push_back(14000);
1571   IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1572   std::deque<uint64_t> timeouts(20, 20);
1573   std::unique_ptr<IdleTimeTimeoutSeries> tos;
1574   int64_t testStart = duration_cast<microseconds>(
1575     std::chrono::steady_clock::now().time_since_epoch()).count();
1576   bool hostOverloaded = false;
1577
1578   int latencyCallbacks = 0;
1579   eventBase.setMaxLatency(6000, [&]() {
1580     ++latencyCallbacks;
1581
1582     switch (latencyCallbacks) {
1583     case 1:
1584       if (tos0.getTimeouts() < 6) {
1585         // This could only happen if the host this test is running
1586         // on is heavily loaded.
1587         int64_t maxLatencyReached = duration_cast<microseconds>(
1588             std::chrono::steady_clock::now().time_since_epoch()).count();
1589         ASSERT_LE(43800, maxLatencyReached - testStart);
1590         hostOverloaded = true;
1591         break;
1592       }
1593       ASSERT_EQ(6, tos0.getTimeouts());
1594       ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1595       ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1596       tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1597       break;
1598
1599     default:
1600       FAIL() << "Unexpected latency callback";
1601       break;
1602     }
1603   });
1604
1605   // Kick things off with an "immedite" timeout
1606   tos0.scheduleTimeout(1);
1607
1608   eventBase.loop();
1609
1610   if (hostOverloaded) {
1611     return;
1612   }
1613
1614   ASSERT_EQ(1, latencyCallbacks);
1615   ASSERT_EQ(7, tos0.getTimeouts());
1616   ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1617   ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1618   ASSERT_TRUE(!!tos);
1619   ASSERT_EQ(21, tos->getTimeouts());
1620 }
1621
1622 /**
1623  * Test that thisLoop functionality works with terminateLoopSoon
1624  */
1625 TEST(EventBaseTest, ThisLoop) {
1626   EventBase eb;
1627   bool runInLoop = false;
1628   bool runThisLoop = false;
1629
1630   eb.runInLoop([&](){
1631       eb.terminateLoopSoon();
1632       eb.runInLoop([&]() {
1633           runInLoop = true;
1634         });
1635       eb.runInLoop([&]() {
1636           runThisLoop = true;
1637         }, true);
1638     }, true);
1639   eb.loopForever();
1640
1641   // Should not work
1642   ASSERT_FALSE(runInLoop);
1643   // Should work with thisLoop
1644   ASSERT_TRUE(runThisLoop);
1645 }
1646
1647 TEST(EventBaseTest, EventBaseThreadLoop) {
1648   EventBase base;
1649   bool ran = false;
1650
1651   base.runInEventBaseThread([&](){
1652     ran = true;
1653   });
1654   base.loop();
1655
1656   ASSERT_EQ(true, ran);
1657 }
1658
1659 TEST(EventBaseTest, EventBaseThreadName) {
1660   EventBase base;
1661   base.setName("foo");
1662   base.loop();
1663
1664 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1665   char name[16];
1666   pthread_getname_np(pthread_self(), name, 16);
1667   ASSERT_EQ(0, strcmp("foo", name));
1668 #endif
1669 }
1670
1671 TEST(EventBaseTest, RunBeforeLoop) {
1672   EventBase base;
1673   CountedLoopCallback cb(&base, 1, [&](){
1674     base.terminateLoopSoon();
1675   });
1676   base.runBeforeLoop(&cb);
1677   base.loopForever();
1678   ASSERT_EQ(cb.getCount(), 0);
1679 }
1680
1681 TEST(EventBaseTest, RunBeforeLoopWait) {
1682   EventBase base;
1683   CountedLoopCallback cb(&base, 1);
1684   base.tryRunAfterDelay([&](){
1685       base.terminateLoopSoon();
1686     }, 500);
1687   base.runBeforeLoop(&cb);
1688   base.loopForever();
1689
1690   // Check that we only ran once, and did not loop multiple times.
1691   ASSERT_EQ(cb.getCount(), 0);
1692 }
1693
1694 class PipeHandler : public EventHandler {
1695 public:
1696   PipeHandler(EventBase* eventBase, int fd)
1697     : EventHandler(eventBase, fd) {}
1698
1699   void handlerReady(uint16_t /* events */) noexcept override { abort(); }
1700 };
1701
1702 TEST(EventBaseTest, StopBeforeLoop) {
1703   EventBase evb;
1704
1705   // Give the evb something to do.
1706   int p[2];
1707   ASSERT_EQ(0, pipe(p));
1708   PipeHandler handler(&evb, p[0]);
1709   handler.registerHandler(EventHandler::READ);
1710
1711   // It's definitely not running yet
1712   evb.terminateLoopSoon();
1713
1714   // let it run, it should exit quickly.
1715   std::thread t([&] { evb.loop(); });
1716   t.join();
1717
1718   handler.unregisterHandler();
1719   close(p[0]);
1720   close(p[1]);
1721
1722   SUCCEED();
1723 }
1724
1725 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1726   bool ran = false;
1727
1728   {
1729     EventBase base;
1730     base.runInEventBaseThread([&](){
1731       ran = true;
1732     });
1733   }
1734
1735   ASSERT_TRUE(ran);
1736 }
1737
1738 TEST(EventBaseTest, LoopKeepAlive) {
1739   EventBase evb;
1740
1741   bool done = false;
1742   std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
1743     /* sleep override */ std::this_thread::sleep_for(
1744         std::chrono::milliseconds(100));
1745     evb.runInEventBaseThread(
1746         [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1747   });
1748
1749   evb.loop();
1750
1751   ASSERT_TRUE(done);
1752
1753   t.join();
1754 }
1755
1756 TEST(EventBaseTest, LoopKeepAliveInLoop) {
1757   EventBase evb;
1758
1759   bool done = false;
1760   std::thread t;
1761
1762   evb.runInEventBaseThread([&] {
1763     t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
1764       /* sleep override */ std::this_thread::sleep_for(
1765           std::chrono::milliseconds(100));
1766       evb.runInEventBaseThread(
1767           [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1768     });
1769   });
1770
1771   evb.loop();
1772
1773   ASSERT_TRUE(done);
1774
1775   t.join();
1776 }
1777
1778 TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
1779   std::unique_ptr<EventBase> evb = folly::make_unique<EventBase>();
1780
1781   bool done = false;
1782
1783   std::thread evThread([&] {
1784     evb->loopForever();
1785     evb.reset();
1786     done = true;
1787   });
1788
1789   {
1790     auto* ev = evb.get();
1791     EventBase::LoopKeepAlive keepAlive;
1792     ev->runInEventBaseThreadAndWait(
1793         [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); });
1794     ASSERT_FALSE(done) << "Loop finished before we asked it to";
1795     ev->terminateLoopSoon();
1796     /* sleep override */
1797     std::this_thread::sleep_for(std::chrono::milliseconds(30));
1798     ASSERT_FALSE(done) << "Loop terminated early";
1799     ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{});
1800   }
1801
1802   evThread.join();
1803   ASSERT_TRUE(done);
1804 }
1805
1806 TEST(EventBaseTest, LoopKeepAliveShutdown) {
1807   auto evb = folly::make_unique<EventBase>();
1808
1809   bool done = false;
1810
1811   std::thread t([
1812     &done,
1813     loopKeepAlive = evb->loopKeepAlive(),
1814     evbPtr = evb.get()
1815   ]() mutable {
1816     /* sleep override */ std::this_thread::sleep_for(
1817         std::chrono::milliseconds(100));
1818     evbPtr->runInEventBaseThread(
1819         [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1820   });
1821
1822   evb.reset();
1823
1824   ASSERT_TRUE(done);
1825
1826   t.join();
1827 }
1828
1829 TEST(EventBaseTest, LoopKeepAliveAtomic) {
1830   auto evb = folly::make_unique<EventBase>();
1831
1832   constexpr size_t kNumThreads = 100;
1833   constexpr size_t kNumTasks = 100;
1834
1835   std::vector<std::thread> ts;
1836   std::vector<std::unique_ptr<Baton<>>> batons;
1837   size_t done{0};
1838
1839   for (size_t i = 0; i < kNumThreads; ++i) {
1840     batons.emplace_back(std::make_unique<Baton<>>());
1841   }
1842
1843   for (size_t i = 0; i < kNumThreads; ++i) {
1844     ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
1845       std::vector<EventBase::LoopKeepAlive> keepAlives;
1846       for (size_t i = 0; i < kNumTasks; ++i) {
1847         keepAlives.emplace_back(evbPtr->loopKeepAliveAtomic());
1848       }
1849
1850       batonPtr->post();
1851
1852       /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
1853
1854       for (auto& keepAlive : keepAlives) {
1855         evbPtr->runInEventBaseThread(
1856             [&done, keepAlive = std::move(keepAlive) ]() { ++done; });
1857       }
1858     });
1859   }
1860
1861   for (auto& baton : batons) {
1862     baton->wait();
1863   }
1864
1865   evb.reset();
1866
1867   EXPECT_EQ(kNumThreads * kNumTasks, done);
1868
1869   for (auto& t : ts) {
1870     t.join();
1871   }
1872 }
1873
1874 TEST(EventBaseTest, DrivableExecutorTest) {
1875   folly::Promise<bool> p;
1876   auto f = p.getFuture();
1877   EventBase base;
1878   bool finished = false;
1879
1880   std::thread t([&] {
1881     /* sleep override */
1882     std::this_thread::sleep_for(std::chrono::microseconds(10));
1883     finished = true;
1884     base.runInEventBaseThread([&]() { p.setValue(true); });
1885   });
1886
1887   // Ensure drive does not busy wait
1888   base.drive(); // TODO: fix notification queue init() extra wakeup
1889   base.drive();
1890   EXPECT_TRUE(finished);
1891
1892   folly::Promise<bool> p2;
1893   auto f2 = p2.getFuture();
1894   // Ensure waitVia gets woken up properly, even from
1895   // a separate thread.
1896   base.runAfterDelay([&]() { p2.setValue(true); }, 10);
1897   f2.waitVia(&base);
1898   EXPECT_TRUE(f2.isReady());
1899
1900   t.join();
1901 }
1902
1903 TEST(EventBaseTest, RequestContextTest) {
1904   EventBase evb;
1905   auto defaultCtx = RequestContext::get();
1906
1907   {
1908     RequestContextScopeGuard rctx;
1909     auto context = RequestContext::get();
1910     EXPECT_NE(defaultCtx, context);
1911     evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
1912   }
1913
1914   EXPECT_EQ(defaultCtx, RequestContext::get());
1915   evb.loop();
1916   EXPECT_EQ(defaultCtx, RequestContext::get());
1917 }