Move RequestContext definitions to source files
[folly.git] / folly / io / async / test / EventBaseTest.cpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19
20 #include <folly/Memory.h>
21 #include <folly/ScopeGuard.h>
22
23 #include <folly/io/async/AsyncTimeout.h>
24 #include <folly/io/async/EventBase.h>
25 #include <folly/io/async/EventHandler.h>
26 #include <folly/io/async/test/SocketPair.h>
27 #include <folly/io/async/test/Util.h>
28 #include <folly/portability/Unistd.h>
29
30 #include <atomic>
31 #include <iostream>
32 #include <memory>
33 #include <thread>
34
35 using std::atomic;
36 using std::deque;
37 using std::pair;
38 using std::vector;
39 using std::unique_ptr;
40 using std::thread;
41 using std::make_pair;
42 using std::cerr;
43 using std::endl;
44 using std::chrono::milliseconds;
45 using std::chrono::microseconds;
46 using std::chrono::duration_cast;
47
48 using namespace folly;
49
50 ///////////////////////////////////////////////////////////////////////////
51 // Tests for read and write events
52 ///////////////////////////////////////////////////////////////////////////
53
54 enum { BUF_SIZE = 4096 };
55
56 ssize_t writeToFD(int fd, size_t length) {
57   // write an arbitrary amount of data to the fd
58   auto bufv = vector<char>(length);
59   auto buf = bufv.data();
60   memset(buf, 'a', length);
61   ssize_t rc = write(fd, buf, length);
62   CHECK_EQ(rc, length);
63   return rc;
64 }
65
66 size_t writeUntilFull(int fd) {
67   // Write to the fd until EAGAIN is returned
68   size_t bytesWritten = 0;
69   char buf[BUF_SIZE];
70   memset(buf, 'a', sizeof(buf));
71   while (true) {
72     ssize_t rc = write(fd, buf, sizeof(buf));
73     if (rc < 0) {
74       CHECK_EQ(errno, EAGAIN);
75       break;
76     } else {
77       bytesWritten += rc;
78     }
79   }
80   return bytesWritten;
81 }
82
83 ssize_t readFromFD(int fd, size_t length) {
84   // write an arbitrary amount of data to the fd
85   auto buf = vector<char>(length);
86   return read(fd, buf.data(), length);
87 }
88
89 size_t readUntilEmpty(int fd) {
90   // Read from the fd until EAGAIN is returned
91   char buf[BUF_SIZE];
92   size_t bytesRead = 0;
93   while (true) {
94     int rc = read(fd, buf, sizeof(buf));
95     if (rc == 0) {
96       CHECK(false) << "unexpected EOF";
97     } else if (rc < 0) {
98       CHECK_EQ(errno, EAGAIN);
99       break;
100     } else {
101       bytesRead += rc;
102     }
103   }
104   return bytesRead;
105 }
106
107 void checkReadUntilEmpty(int fd, size_t expectedLength) {
108   ASSERT_EQ(readUntilEmpty(fd), expectedLength);
109 }
110
111 struct ScheduledEvent {
112   int milliseconds;
113   uint16_t events;
114   size_t length;
115   ssize_t result;
116
117   void perform(int fd) {
118     if (events & EventHandler::READ) {
119       if (length == 0) {
120         result = readUntilEmpty(fd);
121       } else {
122         result = readFromFD(fd, length);
123       }
124     }
125     if (events & EventHandler::WRITE) {
126       if (length == 0) {
127         result = writeUntilFull(fd);
128       } else {
129         result = writeToFD(fd, length);
130       }
131     }
132   }
133 };
134
135 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
136   for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
137     eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
138                              ev->milliseconds);
139   }
140 }
141
142 class TestHandler : public EventHandler {
143  public:
144   TestHandler(EventBase* eventBase, int fd)
145     : EventHandler(eventBase, fd), fd_(fd) {}
146
147   void handlerReady(uint16_t events) noexcept override {
148     ssize_t bytesRead = 0;
149     ssize_t bytesWritten = 0;
150     if (events & READ) {
151       // Read all available data, so EventBase will stop calling us
152       // until new data becomes available
153       bytesRead = readUntilEmpty(fd_);
154     }
155     if (events & WRITE) {
156       // Write until the pipe buffer is full, so EventBase will stop calling
157       // us until the other end has read some data
158       bytesWritten = writeUntilFull(fd_);
159     }
160
161     log.emplace_back(events, bytesRead, bytesWritten);
162   }
163
164   struct EventRecord {
165     EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
166       : events(events)
167       , timestamp()
168       , bytesRead(bytesRead)
169       , bytesWritten(bytesWritten) {}
170
171     uint16_t events;
172     TimePoint timestamp;
173     ssize_t bytesRead;
174     ssize_t bytesWritten;
175   };
176
177   deque<EventRecord> log;
178
179  private:
180   int fd_;
181 };
182
183 /**
184  * Test a READ event
185  */
186 TEST(EventBaseTest, ReadEvent) {
187   EventBase eb;
188   SocketPair sp;
189
190   // Register for read events
191   TestHandler handler(&eb, sp[0]);
192   handler.registerHandler(EventHandler::READ);
193
194   // Register timeouts to perform two write events
195   ScheduledEvent events[] = {
196     { 10, EventHandler::WRITE, 2345, 0 },
197     { 160, EventHandler::WRITE, 99, 0 },
198     { 0, 0, 0, 0 },
199   };
200   scheduleEvents(&eb, sp[1], events);
201
202   // Loop
203   TimePoint start;
204   eb.loop();
205   TimePoint end;
206
207   // Since we didn't use the EventHandler::PERSIST flag, the handler should
208   // have received the first read, then unregistered itself.  Check that only
209   // the first chunk of data was received.
210   ASSERT_EQ(handler.log.size(), 1);
211   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
212   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
213                   milliseconds(events[0].milliseconds), milliseconds(90));
214   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
215   ASSERT_EQ(handler.log[0].bytesWritten, 0);
216   T_CHECK_TIMEOUT(start, end,
217                   milliseconds(events[1].milliseconds), milliseconds(30));
218
219   // Make sure the second chunk of data is still waiting to be read.
220   size_t bytesRemaining = readUntilEmpty(sp[0]);
221   ASSERT_EQ(bytesRemaining, events[1].length);
222 }
223
224 /**
225  * Test (READ | PERSIST)
226  */
227 TEST(EventBaseTest, ReadPersist) {
228   EventBase eb;
229   SocketPair sp;
230
231   // Register for read events
232   TestHandler handler(&eb, sp[0]);
233   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
234
235   // Register several timeouts to perform writes
236   ScheduledEvent events[] = {
237     { 10,  EventHandler::WRITE, 1024, 0 },
238     { 20,  EventHandler::WRITE, 2211, 0 },
239     { 30,  EventHandler::WRITE, 4096, 0 },
240     { 100, EventHandler::WRITE, 100,  0 },
241     { 0, 0, 0, 0 },
242   };
243   scheduleEvents(&eb, sp[1], events);
244
245   // Schedule a timeout to unregister the handler after the third write
246   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
247
248   // Loop
249   TimePoint start;
250   eb.loop();
251   TimePoint end;
252
253   // The handler should have received the first 3 events,
254   // then been unregistered after that.
255   ASSERT_EQ(handler.log.size(), 3);
256   for (int n = 0; n < 3; ++n) {
257     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
258     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
259                     milliseconds(events[n].milliseconds));
260     ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
261     ASSERT_EQ(handler.log[n].bytesWritten, 0);
262   }
263   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
264
265   // Make sure the data from the last write is still waiting to be read
266   size_t bytesRemaining = readUntilEmpty(sp[0]);
267   ASSERT_EQ(bytesRemaining, events[3].length);
268 }
269
270 /**
271  * Test registering for READ when the socket is immediately readable
272  */
273 TEST(EventBaseTest, ReadImmediate) {
274   EventBase eb;
275   SocketPair sp;
276
277   // Write some data to the socket so the other end will
278   // be immediately readable
279   size_t dataLength = 1234;
280   writeToFD(sp[1], dataLength);
281
282   // Register for read events
283   TestHandler handler(&eb, sp[0]);
284   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
285
286   // Register a timeout to perform another write
287   ScheduledEvent events[] = {
288     { 10, EventHandler::WRITE, 2345, 0 },
289     { 0, 0, 0, 0 },
290   };
291   scheduleEvents(&eb, sp[1], events);
292
293   // Schedule a timeout to unregister the handler
294   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
295
296   // Loop
297   TimePoint start;
298   eb.loop();
299   TimePoint end;
300
301   ASSERT_EQ(handler.log.size(), 2);
302
303   // There should have been 1 event for immediate readability
304   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
305   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
306   ASSERT_EQ(handler.log[0].bytesRead, dataLength);
307   ASSERT_EQ(handler.log[0].bytesWritten, 0);
308
309   // There should be another event after the timeout wrote more data
310   ASSERT_EQ(handler.log[1].events, EventHandler::READ);
311   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
312                   milliseconds(events[0].milliseconds));
313   ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
314   ASSERT_EQ(handler.log[1].bytesWritten, 0);
315
316   T_CHECK_TIMEOUT(start, end, milliseconds(20));
317 }
318
319 /**
320  * Test a WRITE event
321  */
322 TEST(EventBaseTest, WriteEvent) {
323   EventBase eb;
324   SocketPair sp;
325
326   // Fill up the write buffer before starting
327   size_t initialBytesWritten = writeUntilFull(sp[0]);
328
329   // Register for write events
330   TestHandler handler(&eb, sp[0]);
331   handler.registerHandler(EventHandler::WRITE);
332
333   // Register timeouts to perform two reads
334   ScheduledEvent events[] = {
335     { 10, EventHandler::READ, 0, 0 },
336     { 60, EventHandler::READ, 0, 0 },
337     { 0, 0, 0, 0 },
338   };
339   scheduleEvents(&eb, sp[1], events);
340
341   // Loop
342   TimePoint start;
343   eb.loop();
344   TimePoint end;
345
346   // Since we didn't use the EventHandler::PERSIST flag, the handler should
347   // have only been able to write once, then unregistered itself.
348   ASSERT_EQ(handler.log.size(), 1);
349   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
350   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
351                   milliseconds(events[0].milliseconds));
352   ASSERT_EQ(handler.log[0].bytesRead, 0);
353   ASSERT_GT(handler.log[0].bytesWritten, 0);
354   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
355
356   ASSERT_EQ(events[0].result, initialBytesWritten);
357   ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
358 }
359
360 /**
361  * Test (WRITE | PERSIST)
362  */
363 TEST(EventBaseTest, WritePersist) {
364   EventBase eb;
365   SocketPair sp;
366
367   // Fill up the write buffer before starting
368   size_t initialBytesWritten = writeUntilFull(sp[0]);
369
370   // Register for write events
371   TestHandler handler(&eb, sp[0]);
372   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
373
374   // Register several timeouts to read from the socket at several intervals
375   ScheduledEvent events[] = {
376     { 10,  EventHandler::READ, 0, 0 },
377     { 40,  EventHandler::READ, 0, 0 },
378     { 70,  EventHandler::READ, 0, 0 },
379     { 100, EventHandler::READ, 0, 0 },
380     { 0, 0, 0, 0 },
381   };
382   scheduleEvents(&eb, sp[1], events);
383
384   // Schedule a timeout to unregister the handler after the third read
385   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
386
387   // Loop
388   TimePoint start;
389   eb.loop();
390   TimePoint end;
391
392   // The handler should have received the first 3 events,
393   // then been unregistered after that.
394   ASSERT_EQ(handler.log.size(), 3);
395   ASSERT_EQ(events[0].result, initialBytesWritten);
396   for (int n = 0; n < 3; ++n) {
397     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
398     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
399                     milliseconds(events[n].milliseconds));
400     ASSERT_EQ(handler.log[n].bytesRead, 0);
401     ASSERT_GT(handler.log[n].bytesWritten, 0);
402     ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
403   }
404   T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
405 }
406
407 /**
408  * Test registering for WRITE when the socket is immediately writable
409  */
410 TEST(EventBaseTest, WriteImmediate) {
411   EventBase eb;
412   SocketPair sp;
413
414   // Register for write events
415   TestHandler handler(&eb, sp[0]);
416   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
417
418   // Register a timeout to perform a read
419   ScheduledEvent events[] = {
420     { 10, EventHandler::READ, 0, 0 },
421     { 0, 0, 0, 0 },
422   };
423   scheduleEvents(&eb, sp[1], events);
424
425   // Schedule a timeout to unregister the handler
426   int64_t unregisterTimeout = 40;
427   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
428                    unregisterTimeout);
429
430   // Loop
431   TimePoint start;
432   eb.loop();
433   TimePoint end;
434
435   ASSERT_EQ(handler.log.size(), 2);
436
437   // Since the socket buffer was initially empty,
438   // there should have been 1 event for immediate writability
439   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
440   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
441   ASSERT_EQ(handler.log[0].bytesRead, 0);
442   ASSERT_GT(handler.log[0].bytesWritten, 0);
443
444   // There should be another event after the timeout wrote more data
445   ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
446   T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
447                   milliseconds(events[0].milliseconds));
448   ASSERT_EQ(handler.log[1].bytesRead, 0);
449   ASSERT_GT(handler.log[1].bytesWritten, 0);
450
451   T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
452 }
453
454 /**
455  * Test (READ | WRITE) when the socket becomes readable first
456  */
457 TEST(EventBaseTest, ReadWrite) {
458   EventBase eb;
459   SocketPair sp;
460
461   // Fill up the write buffer before starting
462   size_t sock0WriteLength = writeUntilFull(sp[0]);
463
464   // Register for read and write events
465   TestHandler handler(&eb, sp[0]);
466   handler.registerHandler(EventHandler::READ_WRITE);
467
468   // Register timeouts to perform a write then a read.
469   ScheduledEvent events[] = {
470     { 10, EventHandler::WRITE, 2345, 0 },
471     { 40, EventHandler::READ, 0, 0 },
472     { 0, 0, 0, 0 },
473   };
474   scheduleEvents(&eb, sp[1], events);
475
476   // Loop
477   TimePoint start;
478   eb.loop();
479   TimePoint end;
480
481   // Since we didn't use the EventHandler::PERSIST flag, the handler should
482   // have only noticed readability, then unregistered itself.  Check that only
483   // one event was logged.
484   ASSERT_EQ(handler.log.size(), 1);
485   ASSERT_EQ(handler.log[0].events, EventHandler::READ);
486   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
487                   milliseconds(events[0].milliseconds));
488   ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
489   ASSERT_EQ(handler.log[0].bytesWritten, 0);
490   ASSERT_EQ(events[1].result, sock0WriteLength);
491   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
492 }
493
494 /**
495  * Test (READ | WRITE) when the socket becomes writable first
496  */
497 TEST(EventBaseTest, WriteRead) {
498   EventBase eb;
499   SocketPair sp;
500
501   // Fill up the write buffer before starting
502   size_t sock0WriteLength = writeUntilFull(sp[0]);
503
504   // Register for read and write events
505   TestHandler handler(&eb, sp[0]);
506   handler.registerHandler(EventHandler::READ_WRITE);
507
508   // Register timeouts to perform a read then a write.
509   size_t sock1WriteLength = 2345;
510   ScheduledEvent events[] = {
511     { 10, EventHandler::READ, 0, 0 },
512     { 40, EventHandler::WRITE, sock1WriteLength, 0 },
513     { 0, 0, 0, 0 },
514   };
515   scheduleEvents(&eb, sp[1], events);
516
517   // Loop
518   TimePoint start;
519   eb.loop();
520   TimePoint end;
521
522   // Since we didn't use the EventHandler::PERSIST flag, the handler should
523   // have only noticed writability, then unregistered itself.  Check that only
524   // one event was logged.
525   ASSERT_EQ(handler.log.size(), 1);
526   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
527   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
528                   milliseconds(events[0].milliseconds));
529   ASSERT_EQ(handler.log[0].bytesRead, 0);
530   ASSERT_GT(handler.log[0].bytesWritten, 0);
531   ASSERT_EQ(events[0].result, sock0WriteLength);
532   ASSERT_EQ(events[1].result, sock1WriteLength);
533   T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
534
535   // Make sure the written data is still waiting to be read.
536   size_t bytesRemaining = readUntilEmpty(sp[0]);
537   ASSERT_EQ(bytesRemaining, events[1].length);
538 }
539
540 /**
541  * Test (READ | WRITE) when the socket becomes readable and writable
542  * at the same time.
543  */
544 TEST(EventBaseTest, ReadWriteSimultaneous) {
545   EventBase eb;
546   SocketPair sp;
547
548   // Fill up the write buffer before starting
549   size_t sock0WriteLength = writeUntilFull(sp[0]);
550
551   // Register for read and write events
552   TestHandler handler(&eb, sp[0]);
553   handler.registerHandler(EventHandler::READ_WRITE);
554
555   // Register a timeout to perform a read and write together
556   ScheduledEvent events[] = {
557     { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
558     { 0, 0, 0, 0 },
559   };
560   scheduleEvents(&eb, sp[1], events);
561
562   // Loop
563   TimePoint start;
564   eb.loop();
565   TimePoint end;
566
567   // It's not strictly required that the EventBase register us about both
568   // events in the same call.  So, it's possible that if the EventBase
569   // implementation changes this test could start failing, and it wouldn't be
570   // considered breaking the API.  However for now it's nice to exercise this
571   // code path.
572   ASSERT_EQ(handler.log.size(), 1);
573   ASSERT_EQ(handler.log[0].events,
574                     EventHandler::READ | EventHandler::WRITE);
575   T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
576                   milliseconds(events[0].milliseconds));
577   ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
578   ASSERT_GT(handler.log[0].bytesWritten, 0);
579   T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
580 }
581
582 /**
583  * Test (READ | WRITE | PERSIST)
584  */
585 TEST(EventBaseTest, ReadWritePersist) {
586   EventBase eb;
587   SocketPair sp;
588
589   // Register for read and write events
590   TestHandler handler(&eb, sp[0]);
591   handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
592                           EventHandler::PERSIST);
593
594   // Register timeouts to perform several reads and writes
595   ScheduledEvent events[] = {
596     { 10, EventHandler::WRITE, 2345, 0 },
597     { 20, EventHandler::READ, 0, 0 },
598     { 35, EventHandler::WRITE, 200, 0 },
599     { 45, EventHandler::WRITE, 15, 0 },
600     { 55, EventHandler::READ, 0, 0 },
601     { 120, EventHandler::WRITE, 2345, 0 },
602     { 0, 0, 0, 0 },
603   };
604   scheduleEvents(&eb, sp[1], events);
605
606   // Schedule a timeout to unregister the handler
607   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
608
609   // Loop
610   TimePoint start;
611   eb.loop();
612   TimePoint end;
613
614   ASSERT_EQ(handler.log.size(), 6);
615
616   // Since we didn't fill up the write buffer immediately, there should
617   // be an immediate event for writability.
618   ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
619   T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
620   ASSERT_EQ(handler.log[0].bytesRead, 0);
621   ASSERT_GT(handler.log[0].bytesWritten, 0);
622
623   // Events 1 through 5 should correspond to the scheduled events
624   for (int n = 1; n < 6; ++n) {
625     ScheduledEvent* event = &events[n - 1];
626     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
627                     milliseconds(event->milliseconds));
628     if (event->events == EventHandler::READ) {
629       ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
630       ASSERT_EQ(handler.log[n].bytesRead, 0);
631       ASSERT_GT(handler.log[n].bytesWritten, 0);
632     } else {
633       ASSERT_EQ(handler.log[n].events, EventHandler::READ);
634       ASSERT_EQ(handler.log[n].bytesRead, event->length);
635       ASSERT_EQ(handler.log[n].bytesWritten, 0);
636     }
637   }
638
639   // The timeout should have unregistered the handler before the last write.
640   // Make sure that data is still waiting to be read
641   size_t bytesRemaining = readUntilEmpty(sp[0]);
642   ASSERT_EQ(bytesRemaining, events[5].length);
643 }
644
645
646 class PartialReadHandler : public TestHandler {
647  public:
648   PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
649     : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
650
651   void handlerReady(uint16_t events) noexcept override {
652     assert(events == EventHandler::READ);
653     ssize_t bytesRead = readFromFD(fd_, readLength_);
654     log.emplace_back(events, bytesRead, 0);
655   }
656
657  private:
658   int fd_;
659   size_t readLength_;
660 };
661
662 /**
663  * Test reading only part of the available data when a read event is fired.
664  * When PERSIST is used, make sure the handler gets notified again the next
665  * time around the loop.
666  */
667 TEST(EventBaseTest, ReadPartial) {
668   EventBase eb;
669   SocketPair sp;
670
671   // Register for read events
672   size_t readLength = 100;
673   PartialReadHandler handler(&eb, sp[0], readLength);
674   handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
675
676   // Register a timeout to perform a single write,
677   // with more data than PartialReadHandler will read at once
678   ScheduledEvent events[] = {
679     { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
680     { 0, 0, 0, 0 },
681   };
682   scheduleEvents(&eb, sp[1], events);
683
684   // Schedule a timeout to unregister the handler
685   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
686
687   // Loop
688   TimePoint start;
689   eb.loop();
690   TimePoint end;
691
692   ASSERT_EQ(handler.log.size(), 4);
693
694   // The first 3 invocations should read readLength bytes each
695   for (int n = 0; n < 3; ++n) {
696     ASSERT_EQ(handler.log[n].events, EventHandler::READ);
697     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
698                     milliseconds(events[0].milliseconds));
699     ASSERT_EQ(handler.log[n].bytesRead, readLength);
700     ASSERT_EQ(handler.log[n].bytesWritten, 0);
701   }
702   // The last read only has readLength/2 bytes
703   ASSERT_EQ(handler.log[3].events, EventHandler::READ);
704   T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
705                   milliseconds(events[0].milliseconds));
706   ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
707   ASSERT_EQ(handler.log[3].bytesWritten, 0);
708 }
709
710
711 class PartialWriteHandler : public TestHandler {
712  public:
713   PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
714     : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
715
716   void handlerReady(uint16_t events) noexcept override {
717     assert(events == EventHandler::WRITE);
718     ssize_t bytesWritten = writeToFD(fd_, writeLength_);
719     log.emplace_back(events, 0, bytesWritten);
720   }
721
722  private:
723   int fd_;
724   size_t writeLength_;
725 };
726
727 /**
728  * Test writing without completely filling up the write buffer when the fd
729  * becomes writable.  When PERSIST is used, make sure the handler gets
730  * notified again the next time around the loop.
731  */
732 TEST(EventBaseTest, WritePartial) {
733   EventBase eb;
734   SocketPair sp;
735
736   // Fill up the write buffer before starting
737   size_t initialBytesWritten = writeUntilFull(sp[0]);
738
739   // Register for write events
740   size_t writeLength = 100;
741   PartialWriteHandler handler(&eb, sp[0], writeLength);
742   handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
743
744   // Register a timeout to read, so that more data can be written
745   ScheduledEvent events[] = {
746     { 10, EventHandler::READ, 0, 0 },
747     { 0, 0, 0, 0 },
748   };
749   scheduleEvents(&eb, sp[1], events);
750
751   // Schedule a timeout to unregister the handler
752   eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
753
754   // Loop
755   TimePoint start;
756   eb.loop();
757   TimePoint end;
758
759   // Depending on how big the socket buffer is, there will be multiple writes
760   // Only check the first 5
761   int numChecked = 5;
762   ASSERT_GE(handler.log.size(), numChecked);
763   ASSERT_EQ(events[0].result, initialBytesWritten);
764
765   // The first 3 invocations should read writeLength bytes each
766   for (int n = 0; n < numChecked; ++n) {
767     ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
768     T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
769                     milliseconds(events[0].milliseconds));
770     ASSERT_EQ(handler.log[n].bytesRead, 0);
771     ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
772   }
773 }
774
775
776 /**
777  * Test destroying a registered EventHandler
778  */
779 TEST(EventBaseTest, DestroyHandler) {
780   class DestroyHandler : public AsyncTimeout {
781    public:
782     DestroyHandler(EventBase* eb, EventHandler* h)
783       : AsyncTimeout(eb)
784       , handler_(h) {}
785
786     void timeoutExpired() noexcept override { delete handler_; }
787
788    private:
789     EventHandler* handler_;
790   };
791
792   EventBase eb;
793   SocketPair sp;
794
795   // Fill up the write buffer before starting
796   size_t initialBytesWritten = writeUntilFull(sp[0]);
797
798   // Register for write events
799   TestHandler* handler = new TestHandler(&eb, sp[0]);
800   handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
801
802   // After 10ms, read some data, so that the handler
803   // will be notified that it can write.
804   eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
805                    10);
806
807   // Start a timer to destroy the handler after 25ms
808   // This mainly just makes sure the code doesn't break or assert
809   DestroyHandler dh(&eb, handler);
810   dh.scheduleTimeout(25);
811
812   TimePoint start;
813   eb.loop();
814   TimePoint end;
815
816   // Make sure the EventHandler was uninstalled properly when it was
817   // destroyed, and the EventBase loop exited
818   T_CHECK_TIMEOUT(start, end, milliseconds(25));
819
820   // Make sure that the handler wrote data to the socket
821   // before it was destroyed
822   size_t bytesRemaining = readUntilEmpty(sp[1]);
823   ASSERT_GT(bytesRemaining, 0);
824 }
825
826
827 ///////////////////////////////////////////////////////////////////////////
828 // Tests for timeout events
829 ///////////////////////////////////////////////////////////////////////////
830
831 TEST(EventBaseTest, RunAfterDelay) {
832   EventBase eb;
833
834   TimePoint timestamp1(false);
835   TimePoint timestamp2(false);
836   TimePoint timestamp3(false);
837   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
838   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
839   eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
840
841   TimePoint start;
842   eb.loop();
843   TimePoint end;
844
845   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
846   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
847   T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
848   T_CHECK_TIMEOUT(start, end, milliseconds(40));
849 }
850
851 /**
852  * Test the behavior of tryRunAfterDelay() when some timeouts are
853  * still scheduled when the EventBase is destroyed.
854  */
855 TEST(EventBaseTest, RunAfterDelayDestruction) {
856   TimePoint timestamp1(false);
857   TimePoint timestamp2(false);
858   TimePoint timestamp3(false);
859   TimePoint timestamp4(false);
860   TimePoint start(false);
861   TimePoint end(false);
862
863   {
864     EventBase eb;
865
866     // Run two normal timeouts
867     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
868     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
869
870     // Schedule a timeout to stop the event loop after 40ms
871     eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
872
873     // Schedule 2 timeouts that would fire after the event loop stops
874     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
875     eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
876
877     start.reset();
878     eb.loop();
879     end.reset();
880   }
881
882   T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
883   T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
884   T_CHECK_TIMEOUT(start, end, milliseconds(40));
885
886   ASSERT_TRUE(timestamp3.isUnset());
887   ASSERT_TRUE(timestamp4.isUnset());
888
889   // Ideally this test should be run under valgrind to ensure that no
890   // memory is leaked.
891 }
892
893 class TestTimeout : public AsyncTimeout {
894  public:
895   explicit TestTimeout(EventBase* eventBase)
896     : AsyncTimeout(eventBase)
897     , timestamp(false) {}
898
899   void timeoutExpired() noexcept override { timestamp.reset(); }
900
901   TimePoint timestamp;
902 };
903
904 TEST(EventBaseTest, BasicTimeouts) {
905   EventBase eb;
906
907   TestTimeout t1(&eb);
908   TestTimeout t2(&eb);
909   TestTimeout t3(&eb);
910   t1.scheduleTimeout(10);
911   t2.scheduleTimeout(20);
912   t3.scheduleTimeout(40);
913
914   TimePoint start;
915   eb.loop();
916   TimePoint end;
917
918   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
919   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
920   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
921   T_CHECK_TIMEOUT(start, end, milliseconds(40));
922 }
923
924 class ReschedulingTimeout : public AsyncTimeout {
925  public:
926   ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
927     : AsyncTimeout(evb)
928     , timeouts_(timeouts)
929     , iterator_(timeouts_.begin()) {}
930
931   void start() {
932     reschedule();
933   }
934
935   void timeoutExpired() noexcept override {
936     timestamps.emplace_back();
937     reschedule();
938   }
939
940   void reschedule() {
941     if (iterator_ != timeouts_.end()) {
942       uint32_t timeout = *iterator_;
943       ++iterator_;
944       scheduleTimeout(timeout);
945     }
946   }
947
948   vector<TimePoint> timestamps;
949
950  private:
951   vector<uint32_t> timeouts_;
952   vector<uint32_t>::const_iterator iterator_;
953 };
954
955 /**
956  * Test rescheduling the same timeout multiple times
957  */
958 TEST(EventBaseTest, ReuseTimeout) {
959   EventBase eb;
960
961   vector<uint32_t> timeouts;
962   timeouts.push_back(10);
963   timeouts.push_back(30);
964   timeouts.push_back(15);
965
966   ReschedulingTimeout t(&eb, timeouts);
967   t.start();
968
969   TimePoint start;
970   eb.loop();
971   TimePoint end;
972
973   // Use a higher tolerance than usual.  We're waiting on 3 timeouts
974   // consecutively.  In general, each timeout may go over by a few
975   // milliseconds, and we're tripling this error by witing on 3 timeouts.
976   milliseconds tolerance{6};
977
978   ASSERT_EQ(timeouts.size(), t.timestamps.size());
979   uint32_t total = 0;
980   for (size_t n = 0; n < timeouts.size(); ++n) {
981     total += timeouts[n];
982     T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
983   }
984   T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
985 }
986
987 /**
988  * Test rescheduling a timeout before it has fired
989  */
990 TEST(EventBaseTest, RescheduleTimeout) {
991   EventBase eb;
992
993   TestTimeout t1(&eb);
994   TestTimeout t2(&eb);
995   TestTimeout t3(&eb);
996
997   t1.scheduleTimeout(15);
998   t2.scheduleTimeout(30);
999   t3.scheduleTimeout(30);
1000
1001   auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1002       &AsyncTimeout::scheduleTimeout);
1003
1004   // after 10ms, reschedule t2 to run sooner than originally scheduled
1005   eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1006   // after 10ms, reschedule t3 to run later than originally scheduled
1007   eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1008
1009   TimePoint start;
1010   eb.loop();
1011   TimePoint end;
1012
1013   T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1014   T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1015   T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1016   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1017 }
1018
1019 /**
1020  * Test cancelling a timeout
1021  */
1022 TEST(EventBaseTest, CancelTimeout) {
1023   EventBase eb;
1024
1025   vector<uint32_t> timeouts;
1026   timeouts.push_back(10);
1027   timeouts.push_back(30);
1028   timeouts.push_back(25);
1029
1030   ReschedulingTimeout t(&eb, timeouts);
1031   t.start();
1032   eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1033
1034   TimePoint start;
1035   eb.loop();
1036   TimePoint end;
1037
1038   ASSERT_EQ(t.timestamps.size(), 2);
1039   T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1040   T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1041   T_CHECK_TIMEOUT(start, end, milliseconds(50));
1042 }
1043
1044 /**
1045  * Test destroying a scheduled timeout object
1046  */
1047 TEST(EventBaseTest, DestroyTimeout) {
1048   class DestroyTimeout : public AsyncTimeout {
1049    public:
1050     DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1051       : AsyncTimeout(eb)
1052       , timeout_(t) {}
1053
1054     void timeoutExpired() noexcept override { delete timeout_; }
1055
1056    private:
1057     AsyncTimeout* timeout_;
1058   };
1059
1060   EventBase eb;
1061
1062   TestTimeout* t1 = new TestTimeout(&eb);
1063   t1->scheduleTimeout(30);
1064
1065   DestroyTimeout dt(&eb, t1);
1066   dt.scheduleTimeout(10);
1067
1068   TimePoint start;
1069   eb.loop();
1070   TimePoint end;
1071
1072   T_CHECK_TIMEOUT(start, end, milliseconds(10));
1073 }
1074
1075
1076 ///////////////////////////////////////////////////////////////////////////
1077 // Test for runInThreadTestFunc()
1078 ///////////////////////////////////////////////////////////////////////////
1079
1080 struct RunInThreadData {
1081   RunInThreadData(int numThreads, int opsPerThread)
1082     : opsPerThread(opsPerThread)
1083     , opsToGo(numThreads*opsPerThread) {}
1084
1085   EventBase evb;
1086   deque< pair<int, int> > values;
1087
1088   int opsPerThread;
1089   int opsToGo;
1090 };
1091
1092 struct RunInThreadArg {
1093   RunInThreadArg(RunInThreadData* data,
1094                  int threadId,
1095                  int value)
1096     : data(data)
1097     , thread(threadId)
1098     , value(value) {}
1099
1100   RunInThreadData* data;
1101   int thread;
1102   int value;
1103 };
1104
1105 void runInThreadTestFunc(RunInThreadArg* arg) {
1106   arg->data->values.emplace_back(arg->thread, arg->value);
1107   RunInThreadData* data = arg->data;
1108   delete arg;
1109
1110   if(--data->opsToGo == 0) {
1111     // Break out of the event base loop if we are the last thread running
1112     data->evb.terminateLoopSoon();
1113   }
1114 }
1115
1116 TEST(EventBaseTest, RunInThread) {
1117   constexpr uint32_t numThreads = 50;
1118   constexpr uint32_t opsPerThread = 100;
1119   RunInThreadData data(numThreads, opsPerThread);
1120
1121   deque<std::thread> threads;
1122   for (uint32_t i = 0; i < numThreads; ++i) {
1123     threads.emplace_back([i, &data] {
1124         for (int n = 0; n < data.opsPerThread; ++n) {
1125           RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1126           data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1127           usleep(10);
1128         }
1129       });
1130   }
1131
1132   // Add a timeout event to run after 3 seconds.
1133   // Otherwise loop() will return immediately since there are no events to run.
1134   // Once the last thread exits, it will stop the loop().  However, this
1135   // timeout also stops the loop in case there is a bug performing the normal
1136   // stop.
1137   data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1138                          3000);
1139
1140   TimePoint start;
1141   data.evb.loop();
1142   TimePoint end;
1143
1144   // Verify that the loop exited because all threads finished and requested it
1145   // to stop.  This should happen much sooner than the 3 second timeout.
1146   // Assert that it happens in under a second.  (This is still tons of extra
1147   // padding.)
1148
1149   auto timeTaken = std::chrono::duration_cast<milliseconds>(
1150     end.getTime() - start.getTime());
1151   ASSERT_LT(timeTaken.count(), 1000);
1152   VLOG(11) << "Time taken: " << timeTaken.count();
1153
1154   // Verify that we have all of the events from every thread
1155   int expectedValues[numThreads];
1156   for (uint32_t n = 0; n < numThreads; ++n) {
1157     expectedValues[n] = 0;
1158   }
1159   for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1160        it != data.values.end();
1161        ++it) {
1162     int threadID = it->first;
1163     int value = it->second;
1164     ASSERT_EQ(expectedValues[threadID], value);
1165     ++expectedValues[threadID];
1166   }
1167   for (uint32_t n = 0; n < numThreads; ++n) {
1168     ASSERT_EQ(expectedValues[n], opsPerThread);
1169   }
1170
1171   // Wait on all of the threads.
1172   for (auto& thread: threads) {
1173     thread.join();
1174   }
1175 }
1176
1177 //  This test simulates some calls, and verifies that the waiting happens by
1178 //  triggering what otherwise would be race conditions, and trying to detect
1179 //  whether any of the race conditions happened.
1180 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1181   const size_t c = 256;
1182   vector<unique_ptr<atomic<size_t>>> atoms(c);
1183   for (size_t i = 0; i < c; ++i) {
1184     auto& atom = atoms.at(i);
1185     atom = make_unique<atomic<size_t>>(0);
1186   }
1187   vector<thread> threads(c);
1188   for (size_t i = 0; i < c; ++i) {
1189     auto& atom = *atoms.at(i);
1190     auto& th = threads.at(i);
1191     th = thread([&atom] {
1192         EventBase eb;
1193         auto ebth = thread([&]{ eb.loopForever(); });
1194         eb.waitUntilRunning();
1195         eb.runInEventBaseThreadAndWait([&] {
1196           size_t x = 0;
1197           atom.compare_exchange_weak(
1198               x, 1, std::memory_order_release, std::memory_order_relaxed);
1199         });
1200         size_t x = 0;
1201         atom.compare_exchange_weak(
1202             x, 2, std::memory_order_release, std::memory_order_relaxed);
1203         eb.terminateLoopSoon();
1204         ebth.join();
1205     });
1206   }
1207   for (size_t i = 0; i < c; ++i) {
1208     auto& th = threads.at(i);
1209     th.join();
1210   }
1211   size_t sum = 0;
1212   for (auto& atom : atoms) sum += *atom;
1213   EXPECT_EQ(c, sum);
1214 }
1215
1216 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1217   EventBase eb;
1218   thread th(&EventBase::loopForever, &eb);
1219   SCOPE_EXIT {
1220     eb.terminateLoopSoon();
1221     th.join();
1222   };
1223   auto mutated = false;
1224   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1225       mutated = true;
1226   });
1227   EXPECT_TRUE(mutated);
1228 }
1229
1230 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1231   EventBase eb;
1232   thread th(&EventBase::loopForever, &eb);
1233   SCOPE_EXIT {
1234     eb.terminateLoopSoon();
1235     th.join();
1236   };
1237   eb.runInEventBaseThreadAndWait([&] {
1238       auto mutated = false;
1239       eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1240           mutated = true;
1241       });
1242       EXPECT_TRUE(mutated);
1243   });
1244 }
1245
1246 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1247   EventBase eb;
1248   auto mutated = false;
1249   eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1250       mutated = true;
1251     });
1252   EXPECT_TRUE(mutated);
1253 }
1254
1255 ///////////////////////////////////////////////////////////////////////////
1256 // Tests for runInLoop()
1257 ///////////////////////////////////////////////////////////////////////////
1258
1259 class CountedLoopCallback : public EventBase::LoopCallback {
1260  public:
1261   CountedLoopCallback(EventBase* eventBase,
1262                       unsigned int count,
1263                       std::function<void()> action =
1264                         std::function<void()>())
1265     : eventBase_(eventBase)
1266     , count_(count)
1267     , action_(action) {}
1268
1269   void runLoopCallback() noexcept override {
1270     --count_;
1271     if (count_ > 0) {
1272       eventBase_->runInLoop(this);
1273     } else if (action_) {
1274       action_();
1275     }
1276   }
1277
1278   unsigned int getCount() const {
1279     return count_;
1280   }
1281
1282  private:
1283   EventBase* eventBase_;
1284   unsigned int count_;
1285   std::function<void()> action_;
1286 };
1287
1288 // Test that EventBase::loop() doesn't exit while there are
1289 // still LoopCallbacks remaining to be invoked.
1290 TEST(EventBaseTest, RepeatedRunInLoop) {
1291   EventBase eventBase;
1292
1293   CountedLoopCallback c(&eventBase, 10);
1294   eventBase.runInLoop(&c);
1295   // The callback shouldn't have run immediately
1296   ASSERT_EQ(c.getCount(), 10);
1297   eventBase.loop();
1298
1299   // loop() should loop until the CountedLoopCallback stops
1300   // re-installing itself.
1301   ASSERT_EQ(c.getCount(), 0);
1302 }
1303
1304 // Test that EventBase::loop() works as expected without time measurements.
1305 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
1306   EventBase eventBase(false);
1307
1308   CountedLoopCallback c(&eventBase, 10);
1309   eventBase.runInLoop(&c);
1310   // The callback shouldn't have run immediately
1311   ASSERT_EQ(c.getCount(), 10);
1312   eventBase.loop();
1313
1314   // loop() should loop until the CountedLoopCallback stops
1315   // re-installing itself.
1316   ASSERT_EQ(c.getCount(), 0);
1317 }
1318
1319 // Test runInLoop() calls with terminateLoopSoon()
1320 TEST(EventBaseTest, RunInLoopStopLoop) {
1321   EventBase eventBase;
1322
1323   CountedLoopCallback c1(&eventBase, 20);
1324   CountedLoopCallback c2(&eventBase, 10,
1325                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1326
1327   eventBase.runInLoop(&c1);
1328   eventBase.runInLoop(&c2);
1329   ASSERT_EQ(c1.getCount(), 20);
1330   ASSERT_EQ(c2.getCount(), 10);
1331
1332   eventBase.loopForever();
1333
1334   // c2 should have stopped the loop after 10 iterations
1335   ASSERT_EQ(c2.getCount(), 0);
1336
1337   // We allow the EventBase to run the loop callbacks in whatever order it
1338   // chooses.  We'll accept c1's count being either 10 (if the loop terminated
1339   // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1340   // before c1 ran).
1341   //
1342   // (With the current code, c1 will always run 10 times, but we don't consider
1343   // this a hard API requirement.)
1344   ASSERT_GE(c1.getCount(), 10);
1345   ASSERT_LE(c1.getCount(), 11);
1346 }
1347
1348 TEST(EventBaseTest, TryRunningAfterTerminate) {
1349   EventBase eventBase;
1350   CountedLoopCallback c1(&eventBase, 1,
1351                          std::bind(&EventBase::terminateLoopSoon, &eventBase));
1352   eventBase.runInLoop(&c1);
1353   eventBase.loopForever();
1354   bool ran = false;
1355   eventBase.runInEventBaseThread([&]() {
1356     ran = true;
1357   });
1358
1359   ASSERT_FALSE(ran);
1360 }
1361
1362 // Test cancelling runInLoop() callbacks
1363 TEST(EventBaseTest, CancelRunInLoop) {
1364   EventBase eventBase;
1365
1366   CountedLoopCallback c1(&eventBase, 20);
1367   CountedLoopCallback c2(&eventBase, 20);
1368   CountedLoopCallback c3(&eventBase, 20);
1369
1370   std::function<void()> cancelC1Action =
1371     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1372   std::function<void()> cancelC2Action =
1373     std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1374
1375   CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1376   CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1377
1378   // Install cancelC1 after c1
1379   eventBase.runInLoop(&c1);
1380   eventBase.runInLoop(&cancelC1);
1381
1382   // Install cancelC2 before c2
1383   eventBase.runInLoop(&cancelC2);
1384   eventBase.runInLoop(&c2);
1385
1386   // Install c3
1387   eventBase.runInLoop(&c3);
1388
1389   ASSERT_EQ(c1.getCount(), 20);
1390   ASSERT_EQ(c2.getCount(), 20);
1391   ASSERT_EQ(c3.getCount(), 20);
1392   ASSERT_EQ(cancelC1.getCount(), 10);
1393   ASSERT_EQ(cancelC2.getCount(), 10);
1394
1395   // Run the loop
1396   eventBase.loop();
1397
1398   // cancelC1 and cancelC3 should have both fired after 10 iterations and
1399   // stopped re-installing themselves
1400   ASSERT_EQ(cancelC1.getCount(), 0);
1401   ASSERT_EQ(cancelC2.getCount(), 0);
1402   // c3 should have continued on for the full 20 iterations
1403   ASSERT_EQ(c3.getCount(), 0);
1404
1405   // c1 and c2 should have both been cancelled on the 10th iteration.
1406   //
1407   // Callbacks are always run in the order they are installed,
1408   // so c1 should have fired 10 times, and been canceled after it ran on the
1409   // 10th iteration.  c2 should have only fired 9 times, because cancelC2 will
1410   // have run before it on the 10th iteration, and cancelled it before it
1411   // fired.
1412   ASSERT_EQ(c1.getCount(), 10);
1413   ASSERT_EQ(c2.getCount(), 11);
1414 }
1415
1416 class TerminateTestCallback : public EventBase::LoopCallback,
1417                               public EventHandler {
1418  public:
1419   TerminateTestCallback(EventBase* eventBase, int fd)
1420     : EventHandler(eventBase, fd),
1421       eventBase_(eventBase),
1422       loopInvocations_(0),
1423       maxLoopInvocations_(0),
1424       eventInvocations_(0),
1425       maxEventInvocations_(0) {}
1426
1427   void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1428     loopInvocations_ = 0;
1429     maxLoopInvocations_ = maxLoopInvocations;
1430     eventInvocations_ = 0;
1431     maxEventInvocations_ = maxEventInvocations;
1432
1433     cancelLoopCallback();
1434     unregisterHandler();
1435   }
1436
1437   void handlerReady(uint16_t /* events */) noexcept override {
1438     // We didn't register with PERSIST, so we will have been automatically
1439     // unregistered already.
1440     ASSERT_FALSE(isHandlerRegistered());
1441
1442     ++eventInvocations_;
1443     if (eventInvocations_ >= maxEventInvocations_) {
1444       return;
1445     }
1446
1447     eventBase_->runInLoop(this);
1448   }
1449   void runLoopCallback() noexcept override {
1450     ++loopInvocations_;
1451     if (loopInvocations_ >= maxLoopInvocations_) {
1452       return;
1453     }
1454
1455     registerHandler(READ);
1456   }
1457
1458   uint32_t getLoopInvocations() const {
1459     return loopInvocations_;
1460   }
1461   uint32_t getEventInvocations() const {
1462     return eventInvocations_;
1463   }
1464
1465  private:
1466   EventBase* eventBase_;
1467   uint32_t loopInvocations_;
1468   uint32_t maxLoopInvocations_;
1469   uint32_t eventInvocations_;
1470   uint32_t maxEventInvocations_;
1471 };
1472
1473 /**
1474  * Test that EventBase::loop() correctly detects when there are no more events
1475  * left to run.
1476  *
1477  * This uses a single callback, which alternates registering itself as a loop
1478  * callback versus a EventHandler callback.  This exercises a regression where
1479  * EventBase::loop() incorrectly exited if there were no more fd handlers
1480  * registered, but a loop callback installed a new fd handler.
1481  */
1482 TEST(EventBaseTest, LoopTermination) {
1483   EventBase eventBase;
1484
1485   // Open a pipe and close the write end,
1486   // so the read endpoint will be readable
1487   int pipeFds[2];
1488   int rc = pipe(pipeFds);
1489   ASSERT_EQ(rc, 0);
1490   close(pipeFds[1]);
1491   TerminateTestCallback callback(&eventBase, pipeFds[0]);
1492
1493   // Test once where the callback will exit after a loop callback
1494   callback.reset(10, 100);
1495   eventBase.runInLoop(&callback);
1496   eventBase.loop();
1497   ASSERT_EQ(callback.getLoopInvocations(), 10);
1498   ASSERT_EQ(callback.getEventInvocations(), 9);
1499
1500   // Test once where the callback will exit after an fd event callback
1501   callback.reset(100, 7);
1502   eventBase.runInLoop(&callback);
1503   eventBase.loop();
1504   ASSERT_EQ(callback.getLoopInvocations(), 7);
1505   ASSERT_EQ(callback.getEventInvocations(), 7);
1506
1507   close(pipeFds[0]);
1508 }
1509
1510 ///////////////////////////////////////////////////////////////////////////
1511 // Tests for latency calculations
1512 ///////////////////////////////////////////////////////////////////////////
1513
1514 class IdleTimeTimeoutSeries : public AsyncTimeout {
1515
1516  public:
1517
1518   explicit IdleTimeTimeoutSeries(EventBase *base,
1519                                  std::deque<std::uint64_t>& timeout) :
1520     AsyncTimeout(base),
1521     timeouts_(0),
1522     timeout_(timeout) {
1523       scheduleTimeout(1);
1524     }
1525
1526     ~IdleTimeTimeoutSeries() override {}
1527
1528     void timeoutExpired() noexcept override {
1529     ++timeouts_;
1530
1531     if(timeout_.empty()){
1532       cancelTimeout();
1533     } else {
1534       uint64_t sleepTime = timeout_.front();
1535       timeout_.pop_front();
1536       if (sleepTime) {
1537         usleep(sleepTime);
1538       }
1539       scheduleTimeout(1);
1540     }
1541   }
1542
1543   int getTimeouts() const {
1544     return timeouts_;
1545   }
1546
1547  private:
1548   int timeouts_;
1549   std::deque<uint64_t>& timeout_;
1550 };
1551
1552 /**
1553  * Verify that idle time is correctly accounted for when decaying our loop
1554  * time.
1555  *
1556  * This works by creating a high loop time (via usleep), expecting a latency
1557  * callback with known value, and then scheduling a timeout for later. This
1558  * later timeout is far enough in the future that the idle time should have
1559  * caused the loop time to decay.
1560  */
1561 TEST(EventBaseTest, IdleTime) {
1562   EventBase eventBase;
1563   eventBase.setLoadAvgMsec(1000);
1564   eventBase.resetLoadAvg(5900.0);
1565   std::deque<uint64_t> timeouts0(4, 8080);
1566   timeouts0.push_front(8000);
1567   timeouts0.push_back(14000);
1568   IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1569   std::deque<uint64_t> timeouts(20, 20);
1570   std::unique_ptr<IdleTimeTimeoutSeries> tos;
1571   int64_t testStart = duration_cast<microseconds>(
1572     std::chrono::steady_clock::now().time_since_epoch()).count();
1573   bool hostOverloaded = false;
1574
1575   int latencyCallbacks = 0;
1576   eventBase.setMaxLatency(6000, [&]() {
1577     ++latencyCallbacks;
1578
1579     switch (latencyCallbacks) {
1580     case 1:
1581       if (tos0.getTimeouts() < 6) {
1582         // This could only happen if the host this test is running
1583         // on is heavily loaded.
1584         int64_t maxLatencyReached = duration_cast<microseconds>(
1585             std::chrono::steady_clock::now().time_since_epoch()).count();
1586         ASSERT_LE(43800, maxLatencyReached - testStart);
1587         hostOverloaded = true;
1588         break;
1589       }
1590       ASSERT_EQ(6, tos0.getTimeouts());
1591       ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1592       ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1593       tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1594       break;
1595
1596     default:
1597       FAIL() << "Unexpected latency callback";
1598       break;
1599     }
1600   });
1601
1602   // Kick things off with an "immedite" timeout
1603   tos0.scheduleTimeout(1);
1604
1605   eventBase.loop();
1606
1607   if (hostOverloaded) {
1608     return;
1609   }
1610
1611   ASSERT_EQ(1, latencyCallbacks);
1612   ASSERT_EQ(7, tos0.getTimeouts());
1613   ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1614   ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1615   ASSERT_TRUE(!!tos);
1616   ASSERT_EQ(21, tos->getTimeouts());
1617 }
1618
1619 /**
1620  * Test that thisLoop functionality works with terminateLoopSoon
1621  */
1622 TEST(EventBaseTest, ThisLoop) {
1623   EventBase eb;
1624   bool runInLoop = false;
1625   bool runThisLoop = false;
1626
1627   eb.runInLoop([&](){
1628       eb.terminateLoopSoon();
1629       eb.runInLoop([&]() {
1630           runInLoop = true;
1631         });
1632       eb.runInLoop([&]() {
1633           runThisLoop = true;
1634         }, true);
1635     }, true);
1636   eb.loopForever();
1637
1638   // Should not work
1639   ASSERT_FALSE(runInLoop);
1640   // Should work with thisLoop
1641   ASSERT_TRUE(runThisLoop);
1642 }
1643
1644 TEST(EventBaseTest, EventBaseThreadLoop) {
1645   EventBase base;
1646   bool ran = false;
1647
1648   base.runInEventBaseThread([&](){
1649     ran = true;
1650   });
1651   base.loop();
1652
1653   ASSERT_EQ(true, ran);
1654 }
1655
1656 TEST(EventBaseTest, EventBaseThreadName) {
1657   EventBase base;
1658   base.setName("foo");
1659   base.loop();
1660
1661 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1662   char name[16];
1663   pthread_getname_np(pthread_self(), name, 16);
1664   ASSERT_EQ(0, strcmp("foo", name));
1665 #endif
1666 }
1667
1668 TEST(EventBaseTest, RunBeforeLoop) {
1669   EventBase base;
1670   CountedLoopCallback cb(&base, 1, [&](){
1671     base.terminateLoopSoon();
1672   });
1673   base.runBeforeLoop(&cb);
1674   base.loopForever();
1675   ASSERT_EQ(cb.getCount(), 0);
1676 }
1677
1678 TEST(EventBaseTest, RunBeforeLoopWait) {
1679   EventBase base;
1680   CountedLoopCallback cb(&base, 1);
1681   base.tryRunAfterDelay([&](){
1682       base.terminateLoopSoon();
1683     }, 500);
1684   base.runBeforeLoop(&cb);
1685   base.loopForever();
1686
1687   // Check that we only ran once, and did not loop multiple times.
1688   ASSERT_EQ(cb.getCount(), 0);
1689 }
1690
1691 class PipeHandler : public EventHandler {
1692 public:
1693   PipeHandler(EventBase* eventBase, int fd)
1694     : EventHandler(eventBase, fd) {}
1695
1696   void handlerReady(uint16_t /* events */) noexcept override { abort(); }
1697 };
1698
1699 TEST(EventBaseTest, StopBeforeLoop) {
1700   EventBase evb;
1701
1702   // Give the evb something to do.
1703   int p[2];
1704   ASSERT_EQ(0, pipe(p));
1705   PipeHandler handler(&evb, p[0]);
1706   handler.registerHandler(EventHandler::READ);
1707
1708   // It's definitely not running yet
1709   evb.terminateLoopSoon();
1710
1711   // let it run, it should exit quickly.
1712   std::thread t([&] { evb.loop(); });
1713   t.join();
1714
1715   handler.unregisterHandler();
1716   close(p[0]);
1717   close(p[1]);
1718
1719   SUCCEED();
1720 }
1721
1722 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1723   bool ran = false;
1724
1725   {
1726     EventBase base;
1727     base.runInEventBaseThread([&](){
1728       ran = true;
1729     });
1730   }
1731
1732   ASSERT_TRUE(ran);
1733 }
1734
1735 TEST(EventBaseTest, LoopKeepAlive) {
1736   EventBase evb;
1737
1738   bool done = false;
1739   std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
1740     /* sleep override */ std::this_thread::sleep_for(
1741         std::chrono::milliseconds(100));
1742     evb.runInEventBaseThread(
1743         [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1744   });
1745
1746   evb.loop();
1747
1748   ASSERT_TRUE(done);
1749
1750   t.join();
1751 }
1752
1753 TEST(EventBaseTest, LoopKeepAliveInLoop) {
1754   EventBase evb;
1755
1756   bool done = false;
1757   std::thread t;
1758
1759   evb.runInEventBaseThread([&] {
1760     t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
1761       /* sleep override */ std::this_thread::sleep_for(
1762           std::chrono::milliseconds(100));
1763       evb.runInEventBaseThread(
1764           [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1765     });
1766   });
1767
1768   evb.loop();
1769
1770   ASSERT_TRUE(done);
1771
1772   t.join();
1773 }
1774
1775 TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
1776   std::unique_ptr<EventBase> evb = folly::make_unique<EventBase>();
1777
1778   bool done = false;
1779
1780   std::thread evThread([&] {
1781     evb->loopForever();
1782     evb.reset();
1783     done = true;
1784   });
1785
1786   {
1787     auto* ev = evb.get();
1788     EventBase::LoopKeepAlive keepAlive;
1789     ev->runInEventBaseThreadAndWait(
1790         [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); });
1791     ASSERT_FALSE(done) << "Loop finished before we asked it to";
1792     ev->terminateLoopSoon();
1793     /* sleep override */
1794     std::this_thread::sleep_for(std::chrono::milliseconds(30));
1795     ASSERT_FALSE(done) << "Loop terminated early";
1796     ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{});
1797   }
1798
1799   evThread.join();
1800   ASSERT_TRUE(done);
1801 }
1802
1803 TEST(EventBaseTest, LoopKeepAliveShutdown) {
1804   auto evb = folly::make_unique<EventBase>();
1805
1806   bool done = false;
1807
1808   std::thread t([
1809     &done,
1810     loopKeepAlive = evb->loopKeepAlive(),
1811     evbPtr = evb.get()
1812   ]() mutable {
1813     /* sleep override */ std::this_thread::sleep_for(
1814         std::chrono::milliseconds(100));
1815     evbPtr->runInEventBaseThread(
1816         [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1817   });
1818
1819   evb.reset();
1820
1821   ASSERT_TRUE(done);
1822
1823   t.join();
1824 }