2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
19 #include <folly/Memory.h>
21 #include <folly/io/async/AsyncTimeout.h>
22 #include <folly/io/async/EventBase.h>
23 #include <folly/io/async/EventHandler.h>
24 #include <folly/io/async/test/SocketPair.h>
25 #include <folly/io/async/test/Util.h>
26 #include <folly/portability/Unistd.h>
37 using std::unique_ptr;
42 using std::chrono::milliseconds;
43 using std::chrono::microseconds;
44 using std::chrono::duration_cast;
46 using namespace folly;
48 ///////////////////////////////////////////////////////////////////////////
49 // Tests for read and write events
50 ///////////////////////////////////////////////////////////////////////////
52 enum { BUF_SIZE = 4096 };
// Writes `length` filler bytes ('a') to `fd` using a single write() call.
// The result of write() is captured in `rc`; the statements that consume or
// return it are outside the visible portion of this listing.
54 ssize_t writeToFD(int fd, size_t length) {
55 // write an arbitrary amount of data to the fd
56 auto bufv = vector<char>(length);
57 auto buf = bufv.data();
58 memset(buf, 'a', length);
59 ssize_t rc = write(fd, buf, length);
// Writes 'a'-filled chunks to `fd` and requires (CHECK_EQ) that the failing
// write reported EAGAIN, so `fd` is presumably non-blocking.
// NOTE(review): the declaration of `buf`, the loop structure and the return
// of `bytesWritten` are not visible in this listing -- confirm against the
// full file.
64 size_t writeUntilFull(int fd) {
65 // Write to the fd until EAGAIN is returned
66 size_t bytesWritten = 0;
68 memset(buf, 'a', sizeof(buf));
70 ssize_t rc = write(fd, buf, sizeof(buf));
72 CHECK_EQ(errno, EAGAIN);
// Performs one read() of at most `length` bytes from `fd` into a temporary
// buffer and returns read()'s result. The bytes themselves are discarded.
81 ssize_t readFromFD(int fd, size_t length) {
82 // read an arbitrary amount of data from the fd
83 auto buf = vector<char>(length);
84 return read(fd, buf.data(), length);
// Drains `fd` with read(). Hitting EOF is treated as fatal (CHECK(false)),
// and the error path requires errno == EAGAIN.
// NOTE(review): the buffer declaration, the loop and the returned byte count
// are not visible in this listing -- confirm against the full file.
87 size_t readUntilEmpty(int fd) {
88 // Read from the fd until EAGAIN is returned
// NOTE(review): read() returns ssize_t; writeUntilFull() stores the
// equivalent result in an ssize_t, whereas this uses int.
92 int rc = read(fd, buf, sizeof(buf));
94 CHECK(false) << "unexpected EOF";
96 CHECK_EQ(errno, EAGAIN);
// Drains `fd` with readUntilEmpty() and asserts (ASSERT_EQ) that exactly
// `expectedLength` bytes were read.
105 void checkReadUntilEmpty(int fd, size_t expectedLength) {
106 ASSERT_EQ(readUntilEmpty(fd), expectedLength);
109 struct ScheduledEvent {
115 void perform(int fd) {
116 if (events & EventHandler::READ) {
118 result = readUntilEmpty(fd);
120 result = readFromFD(fd, length);
123 if (events & EventHandler::WRITE) {
125 result = writeUntilFull(fd);
127 result = writeToFD(fd, length);
// Schedules ScheduledEvent::perform(fd) on `eventBase` for each entry of
// `events` via tryRunAfterDelay(). `events` must end with a sentinel entry
// whose `milliseconds` field is not positive; iteration stops there.
// The bound callback keeps a raw pointer to each entry, so the array has to
// outlive the scheduled callbacks.
// NOTE(review): the delay argument is on a line not shown in this listing.
133 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
134 for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
135 eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
140 class TestHandler : public EventHandler {
142 TestHandler(EventBase* eventBase, int fd)
143 : EventHandler(eventBase, fd), fd_(fd) {}
145 void handlerReady(uint16_t events) noexcept override {
146 ssize_t bytesRead = 0;
147 ssize_t bytesWritten = 0;
149 // Read all available data, so EventBase will stop calling us
150 // until new data becomes available
151 bytesRead = readUntilEmpty(fd_);
153 if (events & WRITE) {
154 // Write until the pipe buffer is full, so EventBase will stop calling
155 // us until the other end has read some data
156 bytesWritten = writeUntilFull(fd_);
159 log.emplace_back(events, bytesRead, bytesWritten);
163 EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
166 , bytesRead(bytesRead)
167 , bytesWritten(bytesWritten) {}
172 ssize_t bytesWritten;
175 deque<EventRecord> log;
184 TEST(EventBaseTest, ReadEvent) {
188 // Register for read events
189 TestHandler handler(&eb, sp[0]);
190 handler.registerHandler(EventHandler::READ);
192 // Register timeouts to perform two write events
193 ScheduledEvent events[] = {
194 { 10, EventHandler::WRITE, 2345, 0 },
195 { 160, EventHandler::WRITE, 99, 0 },
198 scheduleEvents(&eb, sp[1], events);
205 // Since we didn't use the EventHandler::PERSIST flag, the handler should
206 // have received the first read, then unregistered itself. Check that only
207 // the first chunk of data was received.
208 ASSERT_EQ(handler.log.size(), 1);
209 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
210 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
211 milliseconds(events[0].milliseconds), milliseconds(90));
212 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
213 ASSERT_EQ(handler.log[0].bytesWritten, 0);
214 T_CHECK_TIMEOUT(start, end,
215 milliseconds(events[1].milliseconds), milliseconds(30));
217 // Make sure the second chunk of data is still waiting to be read.
218 size_t bytesRemaining = readUntilEmpty(sp[0]);
219 ASSERT_EQ(bytesRemaining, events[1].length);
223 * Test (READ | PERSIST)
225 TEST(EventBaseTest, ReadPersist) {
229 // Register for read events
230 TestHandler handler(&eb, sp[0]);
231 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
233 // Register several timeouts to perform writes
234 ScheduledEvent events[] = {
235 { 10, EventHandler::WRITE, 1024, 0 },
236 { 20, EventHandler::WRITE, 2211, 0 },
237 { 30, EventHandler::WRITE, 4096, 0 },
238 { 100, EventHandler::WRITE, 100, 0 },
241 scheduleEvents(&eb, sp[1], events);
243 // Schedule a timeout to unregister the handler after the third write
244 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
251 // The handler should have received the first 3 events,
252 // then been unregistered after that.
253 ASSERT_EQ(handler.log.size(), 3);
254 for (int n = 0; n < 3; ++n) {
255 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
256 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
257 milliseconds(events[n].milliseconds));
258 ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
259 ASSERT_EQ(handler.log[n].bytesWritten, 0);
261 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
263 // Make sure the data from the last write is still waiting to be read
264 size_t bytesRemaining = readUntilEmpty(sp[0]);
265 ASSERT_EQ(bytesRemaining, events[3].length);
269 * Test registering for READ when the socket is immediately readable
271 TEST(EventBaseTest, ReadImmediate) {
275 // Write some data to the socket so the other end will
276 // be immediately readable
277 size_t dataLength = 1234;
278 writeToFD(sp[1], dataLength);
280 // Register for read events
281 TestHandler handler(&eb, sp[0]);
282 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
284 // Register a timeout to perform another write
285 ScheduledEvent events[] = {
286 { 10, EventHandler::WRITE, 2345, 0 },
289 scheduleEvents(&eb, sp[1], events);
291 // Schedule a timeout to unregister the handler
292 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
299 ASSERT_EQ(handler.log.size(), 2);
301 // There should have been 1 event for immediate readability
302 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
303 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
304 ASSERT_EQ(handler.log[0].bytesRead, dataLength);
305 ASSERT_EQ(handler.log[0].bytesWritten, 0);
307 // There should be another event after the timeout wrote more data
308 ASSERT_EQ(handler.log[1].events, EventHandler::READ);
309 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
310 milliseconds(events[0].milliseconds));
311 ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
312 ASSERT_EQ(handler.log[1].bytesWritten, 0);
314 T_CHECK_TIMEOUT(start, end, milliseconds(20));
320 TEST(EventBaseTest, WriteEvent) {
324 // Fill up the write buffer before starting
325 size_t initialBytesWritten = writeUntilFull(sp[0]);
327 // Register for write events
328 TestHandler handler(&eb, sp[0]);
329 handler.registerHandler(EventHandler::WRITE);
331 // Register timeouts to perform two reads
332 ScheduledEvent events[] = {
333 { 10, EventHandler::READ, 0, 0 },
334 { 60, EventHandler::READ, 0, 0 },
337 scheduleEvents(&eb, sp[1], events);
344 // Since we didn't use the EventHandler::PERSIST flag, the handler should
345 // have only been able to write once, then unregistered itself.
346 ASSERT_EQ(handler.log.size(), 1);
347 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
348 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
349 milliseconds(events[0].milliseconds));
350 ASSERT_EQ(handler.log[0].bytesRead, 0);
351 ASSERT_GT(handler.log[0].bytesWritten, 0);
352 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
354 ASSERT_EQ(events[0].result, initialBytesWritten);
355 ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
359 * Test (WRITE | PERSIST)
361 TEST(EventBaseTest, WritePersist) {
365 // Fill up the write buffer before starting
366 size_t initialBytesWritten = writeUntilFull(sp[0]);
368 // Register for write events
369 TestHandler handler(&eb, sp[0]);
370 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
372 // Register several timeouts to read from the socket at several intervals
373 ScheduledEvent events[] = {
374 { 10, EventHandler::READ, 0, 0 },
375 { 40, EventHandler::READ, 0, 0 },
376 { 70, EventHandler::READ, 0, 0 },
377 { 100, EventHandler::READ, 0, 0 },
380 scheduleEvents(&eb, sp[1], events);
382 // Schedule a timeout to unregister the handler after the third read
383 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
390 // The handler should have received the first 3 events,
391 // then been unregistered after that.
392 ASSERT_EQ(handler.log.size(), 3);
393 ASSERT_EQ(events[0].result, initialBytesWritten);
394 for (int n = 0; n < 3; ++n) {
395 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
396 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
397 milliseconds(events[n].milliseconds));
398 ASSERT_EQ(handler.log[n].bytesRead, 0);
399 ASSERT_GT(handler.log[n].bytesWritten, 0);
400 ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
402 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
406 * Test registering for WRITE when the socket is immediately writable
408 TEST(EventBaseTest, WriteImmediate) {
412 // Register for write events
413 TestHandler handler(&eb, sp[0]);
414 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
416 // Register a timeout to perform a read
417 ScheduledEvent events[] = {
418 { 10, EventHandler::READ, 0, 0 },
421 scheduleEvents(&eb, sp[1], events);
423 // Schedule a timeout to unregister the handler
424 int64_t unregisterTimeout = 40;
425 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
433 ASSERT_EQ(handler.log.size(), 2);
435 // Since the socket buffer was initially empty,
436 // there should have been 1 event for immediate writability
437 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
438 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
439 ASSERT_EQ(handler.log[0].bytesRead, 0);
440 ASSERT_GT(handler.log[0].bytesWritten, 0);
442 // There should be another event after the timeout read from the other end
443 ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
444 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
445 milliseconds(events[0].milliseconds));
446 ASSERT_EQ(handler.log[1].bytesRead, 0);
447 ASSERT_GT(handler.log[1].bytesWritten, 0);
449 T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
453 * Test (READ | WRITE) when the socket becomes readable first
455 TEST(EventBaseTest, ReadWrite) {
459 // Fill up the write buffer before starting
460 size_t sock0WriteLength = writeUntilFull(sp[0]);
462 // Register for read and write events
463 TestHandler handler(&eb, sp[0]);
464 handler.registerHandler(EventHandler::READ_WRITE);
466 // Register timeouts to perform a write then a read.
467 ScheduledEvent events[] = {
468 { 10, EventHandler::WRITE, 2345, 0 },
469 { 40, EventHandler::READ, 0, 0 },
472 scheduleEvents(&eb, sp[1], events);
479 // Since we didn't use the EventHandler::PERSIST flag, the handler should
480 // have only noticed readability, then unregistered itself. Check that only
481 // one event was logged.
482 ASSERT_EQ(handler.log.size(), 1);
483 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
484 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
485 milliseconds(events[0].milliseconds));
486 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
487 ASSERT_EQ(handler.log[0].bytesWritten, 0);
488 ASSERT_EQ(events[1].result, sock0WriteLength);
489 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
493 * Test (READ | WRITE) when the socket becomes writable first
495 TEST(EventBaseTest, WriteRead) {
499 // Fill up the write buffer before starting
500 size_t sock0WriteLength = writeUntilFull(sp[0]);
502 // Register for read and write events
503 TestHandler handler(&eb, sp[0]);
504 handler.registerHandler(EventHandler::READ_WRITE);
506 // Register timeouts to perform a read then a write.
507 size_t sock1WriteLength = 2345;
508 ScheduledEvent events[] = {
509 { 10, EventHandler::READ, 0, 0 },
510 { 40, EventHandler::WRITE, sock1WriteLength, 0 },
513 scheduleEvents(&eb, sp[1], events);
520 // Since we didn't use the EventHandler::PERSIST flag, the handler should
521 // have only noticed writability, then unregistered itself. Check that only
522 // one event was logged.
523 ASSERT_EQ(handler.log.size(), 1);
524 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
525 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
526 milliseconds(events[0].milliseconds));
527 ASSERT_EQ(handler.log[0].bytesRead, 0);
528 ASSERT_GT(handler.log[0].bytesWritten, 0);
529 ASSERT_EQ(events[0].result, sock0WriteLength);
530 ASSERT_EQ(events[1].result, sock1WriteLength);
531 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
533 // Make sure the written data is still waiting to be read.
534 size_t bytesRemaining = readUntilEmpty(sp[0]);
535 ASSERT_EQ(bytesRemaining, events[1].length);
539 * Test (READ | WRITE) when the socket becomes readable and writable
542 TEST(EventBaseTest, ReadWriteSimultaneous) {
546 // Fill up the write buffer before starting
547 size_t sock0WriteLength = writeUntilFull(sp[0]);
549 // Register for read and write events
550 TestHandler handler(&eb, sp[0]);
551 handler.registerHandler(EventHandler::READ_WRITE);
553 // Register a timeout to perform a read and write together
554 ScheduledEvent events[] = {
555 { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
558 scheduleEvents(&eb, sp[1], events);
565 // It's not strictly required that the EventBase notify us about both
566 // events in the same call. So, it's possible that if the EventBase
567 // implementation changes this test could start failing, and it wouldn't be
568 // considered breaking the API. However for now it's nice to exercise this
570 ASSERT_EQ(handler.log.size(), 1);
571 ASSERT_EQ(handler.log[0].events,
572 EventHandler::READ | EventHandler::WRITE);
573 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
574 milliseconds(events[0].milliseconds));
575 ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
576 ASSERT_GT(handler.log[0].bytesWritten, 0);
577 T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
581 * Test (READ | WRITE | PERSIST)
583 TEST(EventBaseTest, ReadWritePersist) {
587 // Register for read and write events
588 TestHandler handler(&eb, sp[0]);
589 handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
590 EventHandler::PERSIST);
592 // Register timeouts to perform several reads and writes
593 ScheduledEvent events[] = {
594 { 10, EventHandler::WRITE, 2345, 0 },
595 { 20, EventHandler::READ, 0, 0 },
596 { 35, EventHandler::WRITE, 200, 0 },
597 { 45, EventHandler::WRITE, 15, 0 },
598 { 55, EventHandler::READ, 0, 0 },
599 { 120, EventHandler::WRITE, 2345, 0 },
602 scheduleEvents(&eb, sp[1], events);
604 // Schedule a timeout to unregister the handler
605 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
612 ASSERT_EQ(handler.log.size(), 6);
614 // Since we didn't fill up the write buffer immediately, there should
615 // be an immediate event for writability.
616 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
617 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
618 ASSERT_EQ(handler.log[0].bytesRead, 0);
619 ASSERT_GT(handler.log[0].bytesWritten, 0);
621 // Events 1 through 5 should correspond to the scheduled events
622 for (int n = 1; n < 6; ++n) {
623 ScheduledEvent* event = &events[n - 1];
624 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
625 milliseconds(event->milliseconds));
626 if (event->events == EventHandler::READ) {
627 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
628 ASSERT_EQ(handler.log[n].bytesRead, 0);
629 ASSERT_GT(handler.log[n].bytesWritten, 0);
631 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
632 ASSERT_EQ(handler.log[n].bytesRead, event->length);
633 ASSERT_EQ(handler.log[n].bytesWritten, 0);
637 // The timeout should have unregistered the handler before the last write.
638 // Make sure that data is still waiting to be read
639 size_t bytesRemaining = readUntilEmpty(sp[0]);
640 ASSERT_EQ(bytesRemaining, events[5].length);
644 class PartialReadHandler : public TestHandler {
646 PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
647 : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
649 void handlerReady(uint16_t events) noexcept override {
650 assert(events == EventHandler::READ);
651 ssize_t bytesRead = readFromFD(fd_, readLength_);
652 log.emplace_back(events, bytesRead, 0);
661 * Test reading only part of the available data when a read event is fired.
662 * When PERSIST is used, make sure the handler gets notified again the next
663 * time around the loop.
665 TEST(EventBaseTest, ReadPartial) {
669 // Register for read events
670 size_t readLength = 100;
671 PartialReadHandler handler(&eb, sp[0], readLength);
672 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
674 // Register a timeout to perform a single write,
675 // with more data than PartialReadHandler will read at once
676 ScheduledEvent events[] = {
677 { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
680 scheduleEvents(&eb, sp[1], events);
682 // Schedule a timeout to unregister the handler
683 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
690 ASSERT_EQ(handler.log.size(), 4);
692 // The first 3 invocations should read readLength bytes each
693 for (int n = 0; n < 3; ++n) {
694 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
695 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
696 milliseconds(events[0].milliseconds));
697 ASSERT_EQ(handler.log[n].bytesRead, readLength);
698 ASSERT_EQ(handler.log[n].bytesWritten, 0);
700 // The last read only has readLength/2 bytes
701 ASSERT_EQ(handler.log[3].events, EventHandler::READ);
702 T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
703 milliseconds(events[0].milliseconds));
704 ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
705 ASSERT_EQ(handler.log[3].bytesWritten, 0);
709 class PartialWriteHandler : public TestHandler {
711 PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
712 : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
714 void handlerReady(uint16_t events) noexcept override {
715 assert(events == EventHandler::WRITE);
716 ssize_t bytesWritten = writeToFD(fd_, writeLength_);
717 log.emplace_back(events, 0, bytesWritten);
726 * Test writing without completely filling up the write buffer when the fd
727 * becomes writable. When PERSIST is used, make sure the handler gets
728 * notified again the next time around the loop.
730 TEST(EventBaseTest, WritePartial) {
734 // Fill up the write buffer before starting
735 size_t initialBytesWritten = writeUntilFull(sp[0]);
737 // Register for write events
738 size_t writeLength = 100;
739 PartialWriteHandler handler(&eb, sp[0], writeLength);
740 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
742 // Register a timeout to read, so that more data can be written
743 ScheduledEvent events[] = {
744 { 10, EventHandler::READ, 0, 0 },
747 scheduleEvents(&eb, sp[1], events);
749 // Schedule a timeout to unregister the handler
750 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
757 // Depending on how big the socket buffer is, there will be multiple writes
758 // Only check the first 5
760 ASSERT_GE(handler.log.size(), numChecked);
761 ASSERT_EQ(events[0].result, initialBytesWritten);
763 // Each of the checked invocations should write writeLength bytes
764 for (int n = 0; n < numChecked; ++n) {
765 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
766 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
767 milliseconds(events[0].milliseconds));
768 ASSERT_EQ(handler.log[n].bytesRead, 0);
769 ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
775 * Test destroying a registered EventHandler
777 TEST(EventBaseTest, DestroyHandler) {
778 class DestroyHandler : public AsyncTimeout {
780 DestroyHandler(EventBase* eb, EventHandler* h)
784 void timeoutExpired() noexcept override { delete handler_; }
787 EventHandler* handler_;
793 // Fill up the write buffer before starting
794 size_t initialBytesWritten = writeUntilFull(sp[0]);
796 // Register for write events
797 TestHandler* handler = new TestHandler(&eb, sp[0]);
798 handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
800 // After 10ms, read some data, so that the handler
801 // will be notified that it can write.
802 eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
805 // Start a timer to destroy the handler after 25ms
806 // This mainly just makes sure the code doesn't break or assert
807 DestroyHandler dh(&eb, handler);
808 dh.scheduleTimeout(25);
814 // Make sure the EventHandler was uninstalled properly when it was
815 // destroyed, and the EventBase loop exited
816 T_CHECK_TIMEOUT(start, end, milliseconds(25));
818 // Make sure that the handler wrote data to the socket
819 // before it was destroyed
820 size_t bytesRemaining = readUntilEmpty(sp[1]);
821 ASSERT_GT(bytesRemaining, 0);
825 ///////////////////////////////////////////////////////////////////////////
826 // Tests for timeout events
827 ///////////////////////////////////////////////////////////////////////////
// Checks that callbacks queued with tryRunAfterDelay() at 10, 20 and 40 ms
// each record their timestamp at the expected offset from `start`, and that
// `end` lands around the 40 ms mark.
829 TEST(EventBaseTest, RunAfterDelay) {
832 TimePoint timestamp1(false);
833 TimePoint timestamp2(false);
834 TimePoint timestamp3(false);
835 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
836 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
837 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
843 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
844 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
845 T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
846 T_CHECK_TIMEOUT(start, end, milliseconds(40));
850 * Test the behavior of tryRunAfterDelay() when some timeouts are
851 * still scheduled when the EventBase is destroyed.
// The 10 ms and 20 ms callbacks must fire on schedule; the 80 ms and 160 ms
// callbacks, which are due after the loop is asked to stop at 40 ms, must
// never run (timestamp3 and timestamp4 stay unset).
853 TEST(EventBaseTest, RunAfterDelayDestruction) {
854 TimePoint timestamp1(false);
855 TimePoint timestamp2(false);
856 TimePoint timestamp3(false);
857 TimePoint timestamp4(false);
858 TimePoint start(false);
859 TimePoint end(false);
864 // Run two normal timeouts
865 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
866 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
868 // Schedule a timeout to stop the event loop after 40ms
869 eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
871 // Schedule 2 timeouts that would fire after the event loop stops
872 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
873 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
880 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
881 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
882 T_CHECK_TIMEOUT(start, end, milliseconds(40));
884 ASSERT_TRUE(timestamp3.isUnset());
885 ASSERT_TRUE(timestamp4.isUnset());
887 // Ideally this test should be run under valgrind to ensure that no
891 class TestTimeout : public AsyncTimeout {
893 explicit TestTimeout(EventBase* eventBase)
894 : AsyncTimeout(eventBase)
895 , timestamp(false) {}
897 void timeoutExpired() noexcept override { timestamp.reset(); }
902 TEST(EventBaseTest, BasicTimeouts) {
908 t1.scheduleTimeout(10);
909 t2.scheduleTimeout(20);
910 t3.scheduleTimeout(40);
916 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
917 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
918 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
919 T_CHECK_TIMEOUT(start, end, milliseconds(40));
922 class ReschedulingTimeout : public AsyncTimeout {
924 ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
926 , timeouts_(timeouts)
927 , iterator_(timeouts_.begin()) {}
933 void timeoutExpired() noexcept override {
934 timestamps.emplace_back();
939 if (iterator_ != timeouts_.end()) {
940 uint32_t timeout = *iterator_;
942 scheduleTimeout(timeout);
946 vector<TimePoint> timestamps;
949 vector<uint32_t> timeouts_;
950 vector<uint32_t>::const_iterator iterator_;
954 * Test rescheduling the same timeout multiple times
956 TEST(EventBaseTest, ReuseTimeout) {
959 vector<uint32_t> timeouts;
960 timeouts.push_back(10);
961 timeouts.push_back(30);
962 timeouts.push_back(15);
964 ReschedulingTimeout t(&eb, timeouts);
971 // Use a higher tolerance than usual. We're waiting on 3 timeouts
972 // consecutively. In general, each timeout may go over by a few
973 // milliseconds, and we're tripling this error by waiting on 3 timeouts.
974 milliseconds tolerance{6};
976 ASSERT_EQ(timeouts.size(), t.timestamps.size());
978 for (size_t n = 0; n < timeouts.size(); ++n) {
979 total += timeouts[n];
980 T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
982 T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
986 * Test rescheduling a timeout before it has fired
988 TEST(EventBaseTest, RescheduleTimeout) {
995 t1.scheduleTimeout(15);
996 t2.scheduleTimeout(30);
997 t3.scheduleTimeout(30);
999 auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1000 &AsyncTimeout::scheduleTimeout);
1002 // after 10ms, reschedule t2 to run sooner than originally scheduled
1003 eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1004 // after 10ms, reschedule t3 to run later than originally scheduled
1005 eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1011 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1012 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1013 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1014 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1018 * Test cancelling a timeout
1020 TEST(EventBaseTest, CancelTimeout) {
1023 vector<uint32_t> timeouts;
1024 timeouts.push_back(10);
1025 timeouts.push_back(30);
1026 timeouts.push_back(25);
1028 ReschedulingTimeout t(&eb, timeouts);
1030 eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1036 ASSERT_EQ(t.timestamps.size(), 2);
1037 T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1038 T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1039 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1043 * Test destroying a scheduled timeout object
1045 TEST(EventBaseTest, DestroyTimeout) {
1046 class DestroyTimeout : public AsyncTimeout {
1048 DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1052 void timeoutExpired() noexcept override { delete timeout_; }
1055 AsyncTimeout* timeout_;
1060 TestTimeout* t1 = new TestTimeout(&eb);
1061 t1->scheduleTimeout(30);
1063 DestroyTimeout dt(&eb, t1);
1064 dt.scheduleTimeout(10);
1070 T_CHECK_TIMEOUT(start, end, milliseconds(10));
1074 ///////////////////////////////////////////////////////////////////////////
1075 // Test for runInThreadTestFunc()
1076 ///////////////////////////////////////////////////////////////////////////
1078 struct RunInThreadData {
1079 RunInThreadData(int numThreads, int opsPerThread)
1080 : opsPerThread(opsPerThread)
1081 , opsToGo(numThreads*opsPerThread) {}
1084 deque< pair<int, int> > values;
1090 struct RunInThreadArg {
1091 RunInThreadArg(RunInThreadData* data,
1098 RunInThreadData* data;
1103 void runInThreadTestFunc(RunInThreadArg* arg) {
1104 arg->data->values.emplace_back(arg->thread, arg->value);
1105 RunInThreadData* data = arg->data;
1108 if(--data->opsToGo == 0) {
1109 // Break out of the event base loop if we are the last thread running
1110 data->evb.terminateLoopSoon();
1114 TEST(EventBaseTest, RunInThread) {
1115 constexpr uint32_t numThreads = 50;
1116 constexpr uint32_t opsPerThread = 100;
1117 RunInThreadData data(numThreads, opsPerThread);
1119 deque<std::thread> threads;
1120 for (uint32_t i = 0; i < numThreads; ++i) {
1121 threads.emplace_back([i, &data] {
1122 for (int n = 0; n < data.opsPerThread; ++n) {
1123 RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1124 data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1130 // Add a timeout event to run after 3 seconds.
1131 // Otherwise loop() will return immediately since there are no events to run.
1132 // Once the last thread exits, it will stop the loop(). However, this
1133 // timeout also stops the loop in case there is a bug performing the normal
1135 data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1142 // Verify that the loop exited because all threads finished and requested it
1143 // to stop. This should happen much sooner than the 3 second timeout.
1144 // Assert that it happens in under a second. (This is still tons of extra
1147 auto timeTaken = std::chrono::duration_cast<milliseconds>(
1148 end.getTime() - start.getTime());
1149 ASSERT_LT(timeTaken.count(), 1000);
1150 VLOG(11) << "Time taken: " << timeTaken.count();
1152 // Verify that we have all of the events from every thread
1153 int expectedValues[numThreads];
1154 for (uint32_t n = 0; n < numThreads; ++n) {
1155 expectedValues[n] = 0;
1157 for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1158 it != data.values.end();
1160 int threadID = it->first;
1161 int value = it->second;
1162 ASSERT_EQ(expectedValues[threadID], value);
1163 ++expectedValues[threadID];
1165 for (uint32_t n = 0; n < numThreads; ++n) {
1166 ASSERT_EQ(expectedValues[n], opsPerThread);
1169 // Wait on all of the threads.
1170 for (auto& thread: threads) {
1175 // This test simulates some calls, and verifies that the waiting happens by
1176 // triggering what otherwise would be race conditions, and trying to detect
1177 // whether any of the race conditions happened.
1178 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1179 const size_t c = 256;
1180 vector<unique_ptr<atomic<size_t>>> atoms(c);
1181 for (size_t i = 0; i < c; ++i) {
1182 auto& atom = atoms.at(i);
1183 atom = make_unique<atomic<size_t>>(0);
1185 vector<thread> threads(c);
1186 for (size_t i = 0; i < c; ++i) {
1187 auto& atom = *atoms.at(i);
1188 auto& th = threads.at(i);
1189 th = thread([&atom] {
1191 auto ebth = thread([&]{ eb.loopForever(); });
1192 eb.waitUntilRunning();
1193 eb.runInEventBaseThreadAndWait([&] {
1195 atom.compare_exchange_weak(
1196 x, 1, std::memory_order_release, std::memory_order_relaxed);
1199 atom.compare_exchange_weak(
1200 x, 2, std::memory_order_release, std::memory_order_relaxed);
1201 eb.terminateLoopSoon();
1205 for (size_t i = 0; i < c; ++i) {
1206 auto& th = threads.at(i);
1210 for (auto& atom : atoms) sum += *atom;
1214 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1216 thread th(&EventBase::loopForever, &eb);
1218 eb.terminateLoopSoon();
1221 auto mutated = false;
1222 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1225 EXPECT_TRUE(mutated);
// Same-thread case: runImmediatelyOrRunInEventBaseThreadAndWait() is invoked
// from inside a callback that is itself executed through
// runInEventBaseThreadAndWait(), while `th` runs eb.loopForever().
1228 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1230 thread th(&EventBase::loopForever, &eb);
1232 eb.terminateLoopSoon();
1235 eb.runInEventBaseThreadAndWait([&] {
     // Local to the outer callback; the inner callback captures it by reference.
1236 auto mutated = false;
1237 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1240 EXPECT_TRUE(mutated);
// Calls runImmediatelyOrRunInEventBaseThreadAndWait() on an EventBase that, as
// the test name indicates, is not looping, and expects `mutated` to be true
// afterwards.
1244 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1246 auto mutated = false;
1247 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1250 EXPECT_TRUE(mutated);
1253 ///////////////////////////////////////////////////////////////////////////
1254 // Tests for runInLoop()
1255 ///////////////////////////////////////////////////////////////////////////
// LoopCallback helper shared by the runInLoop()/runBeforeLoop() tests.
//
// Callers pass the EventBase to re-queue on, a starting count, and an optional
// action (defaults to an empty std::function). runLoopCallback() either
// re-queues this object with runInLoop(this) or, when an action is set, takes
// the `else if (action_)` branch. Tests observe progress through getCount()
// and expect it to go from the starting count down to 0.
1257 class CountedLoopCallback : public EventBase::LoopCallback {
1259 CountedLoopCallback(EventBase* eventBase,
1261 std::function<void()> action =
1262 std::function<void()>())
1263 : eventBase_(eventBase)
1265 , action_(action) {}
     // Invoked by the EventBase; declared noexcept.
1267 void runLoopCallback() noexcept override {
1270 eventBase_->runInLoop(this);
1271 } else if (action_) {
1276 unsigned int getCount() const {
     // EventBase used for re-queuing (raw pointer supplied by the caller).
1281 EventBase* eventBase_;
     // Counter reported by getCount().
1282 unsigned int count_;
     // Optional action; may be empty, hence the `if (action_)` test above.
1283 std::function<void()> action_;
1286 // Test that EventBase::loop() doesn't exit while there are
1287 // still LoopCallbacks remaining to be invoked.
1288 TEST(EventBaseTest, RepeatedRunInLoop) {
1289 EventBase eventBase;
     // Callback with a starting count of 10 and no action.
1291 CountedLoopCallback c(&eventBase, 10);
1292 eventBase.runInLoop(&c);
1293 // The callback shouldn't have run immediately
1294 ASSERT_EQ(c.getCount(), 10);
1297 // loop() should loop until the CountedLoopCallback stops
1298 // re-installing itself.
     // The count is expected to have gone from 10 to 0 by now.
1299 ASSERT_EQ(c.getCount(), 0);
1302 // Test that EventBase::loop() works as expected without time measurements.
1303 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
     // NOTE(review): the `false` argument is presumably the switch that turns
     // time measurement off -- confirm against the EventBase constructor.
1304 EventBase eventBase(false);
     // Otherwise identical to RepeatedRunInLoop: starting count 10, no action.
1306 CountedLoopCallback c(&eventBase, 10);
1307 eventBase.runInLoop(&c);
1308 // The callback shouldn't have run immediately
1309 ASSERT_EQ(c.getCount(), 10);
1312 // loop() should loop until the CountedLoopCallback stops
1313 // re-installing itself.
1314 ASSERT_EQ(c.getCount(), 0);
1317 // Test runInLoop() calls with terminateLoopSoon()
1318 TEST(EventBaseTest, RunInLoopStopLoop) {
1319 EventBase eventBase;
     // c1 has no action and a starting count of 20; c2 starts at 10 and has
     // terminateLoopSoon() bound as its action.
1321 CountedLoopCallback c1(&eventBase, 20);
1322 CountedLoopCallback c2(&eventBase, 10,
1323 std::bind(&EventBase::terminateLoopSoon, &eventBase));
1325 eventBase.runInLoop(&c1);
1326 eventBase.runInLoop(&c2);
     // Nothing has run yet: both counts are still at their starting values.
1327 ASSERT_EQ(c1.getCount(), 20);
1328 ASSERT_EQ(c2.getCount(), 10);
     // loopForever() returns only because c2's action asks the loop to stop.
1330 eventBase.loopForever();
1332 // c2 should have stopped the loop after 10 iterations
1333 ASSERT_EQ(c2.getCount(), 0);
1335 // We allow the EventBase to run the loop callbacks in whatever order it
1336 // chooses. We'll accept c1's count being either 10 (if the loop terminated
1337 // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
     // before c1 got to run on that iteration).
1340 // (With the current code, c1 will always run 10 times, but we don't consider
1341 // this a hard API requirement.)
1342 ASSERT_GE(c1.getCount(), 10);
1343 ASSERT_LE(c1.getCount(), 11);
// Runs loopForever() with a callback whose action is terminateLoopSoon(), and
// then, after loopForever() has returned, submits a function with
// runInEventBaseThread().
1346 TEST(EventBaseTest, TryRunningAfterTerminate) {
1347 EventBase eventBase;
     // Starting count 1, with terminateLoopSoon() bound as the action.
1348 CountedLoopCallback c1(&eventBase, 1,
1349 std::bind(&EventBase::terminateLoopSoon, &eventBase));
1350 eventBase.runInLoop(&c1);
1351 eventBase.loopForever();
     // Submitted after the loop has already been terminated.
1353 eventBase.runInEventBaseThread([&]() {
1360 // Test cancelling runInLoop() callbacks
1361 TEST(EventBaseTest, CancelRunInLoop) {
1362 EventBase eventBase;
     // Three plain callbacks, each with a starting count of 20.
1364 CountedLoopCallback c1(&eventBase, 20);
1365 CountedLoopCallback c2(&eventBase, 20);
1366 CountedLoopCallback c3(&eventBase, 20);
     // Actions that cancel c1 and c2 respectively.
1368 std::function<void()> cancelC1Action =
1369 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1370 std::function<void()> cancelC2Action =
1371 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
     // Cancelling callbacks: starting count 10, carrying the actions above.
1373 CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1374 CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1376 // Install cancelC1 after c1
1377 eventBase.runInLoop(&c1);
1378 eventBase.runInLoop(&cancelC1);
1380 // Install cancelC2 before c2
1381 eventBase.runInLoop(&cancelC2);
1382 eventBase.runInLoop(&c2);
     // c3 is never cancelled.
1385 eventBase.runInLoop(&c3);
     // Nothing has run yet: every count is still at its starting value.
1387 ASSERT_EQ(c1.getCount(), 20);
1388 ASSERT_EQ(c2.getCount(), 20);
1389 ASSERT_EQ(c3.getCount(), 20);
1390 ASSERT_EQ(cancelC1.getCount(), 10);
1391 ASSERT_EQ(cancelC2.getCount(), 10);
1396 // cancelC1 and cancelC2 should have both fired after 10 iterations and
1397 // stopped re-installing themselves
1398 ASSERT_EQ(cancelC1.getCount(), 0);
1399 ASSERT_EQ(cancelC2.getCount(), 0);
1400 // c3 should have continued on for the full 20 iterations
1401 ASSERT_EQ(c3.getCount(), 0);
1403 // c1 and c2 should have both been cancelled on the 10th iteration.
1405 // Callbacks are always run in the order they are installed,
1406 // so c1 should have fired 10 times, and been canceled after it ran on the
1407 // 10th iteration. c2 should have only fired 9 times, because cancelC2 will
1408 // have run before it on the 10th iteration, and cancelled it before it
     // could fire.
     // Remaining counts are therefore 20 - 10 = 10 for c1 and 20 - 9 = 11 for c2.
1410 ASSERT_EQ(c1.getCount(), 10);
1411 ASSERT_EQ(c2.getCount(), 11);
// Helper that is both a LoopCallback and an EventHandler, bouncing between the
// two roles: handlerReady() queues the object as a loop callback, and
// runLoopCallback() registers it as a READ handler again. Each side checks its
// invocation counter against a configurable maximum first; reaching the
// maximum presumably ends the ping-pong (see the LoopTermination test).
1414 class TerminateTestCallback : public EventBase::LoopCallback,
1415 public EventHandler {
1417 TerminateTestCallback(EventBase* eventBase, int fd)
1418 : EventHandler(eventBase, fd),
1419 eventBase_(eventBase),
1420 loopInvocations_(0),
1421 maxLoopInvocations_(0),
1422 eventInvocations_(0),
1423 maxEventInvocations_(0) {}
     // Zeroes both counters, installs new limits, and removes this object from
     // both the loop-callback queue and the fd handler registration.
1425 void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1426 loopInvocations_ = 0;
1427 maxLoopInvocations_ = maxLoopInvocations;
1428 eventInvocations_ = 0;
1429 maxEventInvocations_ = maxEventInvocations;
1431 cancelLoopCallback();
1432 unregisterHandler();
     // fd-event side: count the event, then hand over to the loop-callback side.
1435 void handlerReady(uint16_t /* events */) noexcept override {
1436 // We didn't register with PERSIST, so we will have been automatically
1437 // unregistered already.
1438 ASSERT_FALSE(isHandlerRegistered());
1440 ++eventInvocations_;
1441 if (eventInvocations_ >= maxEventInvocations_) {
1445 eventBase_->runInLoop(this);
     // Loop-callback side: re-arm the fd handler for a single READ event.
1447 void runLoopCallback() noexcept override {
1449 if (loopInvocations_ >= maxLoopInvocations_) {
1453 registerHandler(READ);
1456 uint32_t getLoopInvocations() const {
1457 return loopInvocations_;
1459 uint32_t getEventInvocations() const {
1460 return eventInvocations_;
     // EventBase used by handlerReady() to queue the loop callback.
1464 EventBase* eventBase_;
     // Invocation counters and their limits; all (re)set by reset().
1465 uint32_t loopInvocations_;
1466 uint32_t maxLoopInvocations_;
1467 uint32_t eventInvocations_;
1468 uint32_t maxEventInvocations_;
1472 * Test that EventBase::loop() correctly detects when there are no more events
1475 * This uses a single callback, which alternates registering itself as a loop
1476 * callback versus a EventHandler callback. This exercises a regression where
1477 * EventBase::loop() incorrectly exited if there were no more fd handlers
1478 * registered, but a loop callback installed a new fd handler.
1480 TEST(EventBaseTest, LoopTermination) {
1481 EventBase eventBase;
1483 // Open a pipe and close the write end,
1484 // so the read endpoint will be readable
1486 int rc = pipe(pipeFds);
     // The callback watches pipeFds[0], the read end of the pipe.
1489 TerminateTestCallback callback(&eventBase, pipeFds[0]);
1491 // Test once where the callback will exit after a loop callback
     // Limits: 10 loop callbacks, 100 fd events -- the loop limit is hit first.
1492 callback.reset(10, 100);
1493 eventBase.runInLoop(&callback);
     // Ten loop callbacks, with one fd event between each consecutive pair.
1495 ASSERT_EQ(callback.getLoopInvocations(), 10);
1496 ASSERT_EQ(callback.getEventInvocations(), 9);
1498 // Test once where the callback will exit after an fd event callback
     // Limits: 100 loop callbacks, 7 fd events -- the event limit is hit first.
1499 callback.reset(100, 7);
1500 eventBase.runInLoop(&callback);
1502 ASSERT_EQ(callback.getLoopInvocations(), 7);
1503 ASSERT_EQ(callback.getEventInvocations(), 7);
1508 ///////////////////////////////////////////////////////////////////////////
1509 // Tests for latency calculations
1510 ///////////////////////////////////////////////////////////////////////////
// AsyncTimeout subclass driven by a caller-supplied deque of values. On each
// expiry, unless the deque is empty, the front value is popped into
// `sleepTime`. The deque is held by reference, so it must outlive this object.
// getTimeouts() is what the IdleTime test uses to check progress (presumably
// the number of expirations so far -- confirm).
1512 class IdleTimeTimeoutSeries : public AsyncTimeout {
1516 explicit IdleTimeTimeoutSeries(EventBase *base,
1517 std::deque<std::uint64_t>& timeout) :
1524 ~IdleTimeTimeoutSeries() override {}
1526 void timeoutExpired() noexcept override {
     // Nothing left to consume once the deque is empty.
1529 if(timeout_.empty()){
     // Consume the next value from the front of the series.
1532 uint64_t sleepTime = timeout_.front();
1533 timeout_.pop_front();
1541 int getTimeouts() const {
     // Reference to the caller's deque; mutated by timeoutExpired().
1547 std::deque<uint64_t>& timeout_;
1551 * Verify that idle time is correctly accounted for when decaying our loop
1554 * This works by creating a high loop time (via usleep), expecting a latency
1555 * callback with known value, and then scheduling a timeout for later. This
1556 * later timeout is far enough in the future that the idle time should have
1557 * caused the loop time to decay.
1559 TEST(EventBaseTest, IdleTime) {
1560 EventBase eventBase;
1561 eventBase.setLoadAvgMsec(1000);
1562 eventBase.resetLoadAvg(5900.0);
     // timeouts0 ends up as {8000, 8080, 8080, 8080, 8080, 14000}.
1563 std::deque<uint64_t> timeouts0(4, 8080);
1564 timeouts0.push_front(8000);
1565 timeouts0.push_back(14000);
1566 IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
     // Second series: twenty entries of 20. `tos` stays empty until the
     // tos.reset(...) call further down.
1567 std::deque<uint64_t> timeouts(20, 20);
1568 std::unique_ptr<IdleTimeTimeoutSeries> tos;
     // Start time in microseconds on the steady clock, for elapsed-time checks.
1569 int64_t testStart = duration_cast<microseconds>(
1570 std::chrono::steady_clock::now().time_since_epoch()).count();
     // Set when the overload path below is taken; checked after the run.
1571 bool hostOverloaded = false;
1573 int latencyCallbacks = 0;
     // Latency callback registered with a max-latency value of 6000.
1574 eventBase.setMaxLatency(6000, [&]() {
1577 switch (latencyCallbacks) {
1579 if (tos0.getTimeouts() < 6) {
1580 // This could only happen if the host this test is running
1581 // on is heavily loaded.
1582 int64_t maxLatencyReached = duration_cast<microseconds>(
1583 std::chrono::steady_clock::now().time_since_epoch()).count();
     // In that case at least 43800us must have elapsed since testStart.
1584 ASSERT_LE(43800, maxLatencyReached - testStart);
1585 hostOverloaded = true;
     // Normal path: six timeouts so far, and an average loop time within
     // +/-1200 of 6100.
1588 ASSERT_EQ(6, tos0.getTimeouts());
1589 ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1590 ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
     // Start the second series.
1591 tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1595 FAIL() << "Unexpected latency callback";
1600 // Kick things off with an "immediate" timeout
1601 tos0.scheduleTimeout(1);
1605 if (hostOverloaded) {
     // Final expectations: one latency callback, seven timeouts on the first
     // series, average loop time within +/-1200 of 5900, and 21 timeouts on the
     // second series.
1609 ASSERT_EQ(1, latencyCallbacks);
1610 ASSERT_EQ(7, tos0.getTimeouts());
1611 ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1612 ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1614 ASSERT_EQ(21, tos->getTimeouts());
1618 * Test that thisLoop functionality works with terminateLoopSoon
1620 TEST(EventBaseTest, ThisLoop) {
     // One flag per queued callback; both are checked at the end of the test.
1622 bool runInLoop = false;
1623 bool runThisLoop = false;
     // Termination is requested before the callbacks below are queued.
1626 eb.terminateLoopSoon();
1627 eb.runInLoop([&]() {
1630 eb.runInLoop([&]() {
     // Expected outcome: the `runInLoop` flag is still false, while the
     // `runThisLoop` flag has been set.
1637 ASSERT_FALSE(runInLoop);
1638 // Should work with thisLoop
1639 ASSERT_TRUE(runThisLoop);
// Submits a function with runInEventBaseThread() and asserts at the end that
// `ran` is true.
1642 TEST(EventBaseTest, EventBaseThreadLoop) {
1646 base.runInEventBaseThread([&](){
1651 ASSERT_EQ(true, ran);
// Sets the EventBase name to "foo" and, on sufficiently recent glibc, reads the
// calling thread's name back and expects it to equal "foo".
1654 TEST(EventBaseTest, EventBaseThreadName) {
1656 base.setName("foo");
     // NOTE(review): this condition is false for any glibc major version above 2
     // whose minor version is below 12; __GLIBC_PREREQ(2, 12) states the intent
     // ("2.12 or newer") directly.
1659 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
     // 16 is the buffer length handed to pthread_getname_np.
1661 pthread_getname_np(pthread_self(), name, 16);
1662 ASSERT_EQ(0, strcmp("foo", name));
// Registers a CountedLoopCallback (starting count 1, action:
// terminateLoopSoon()) with runBeforeLoop() and expects its count to be 0 at
// the end.
1666 TEST(EventBaseTest, RunBeforeLoop) {
1668 CountedLoopCallback cb(&base, 1, [&](){
1669 base.terminateLoopSoon();
1671 base.runBeforeLoop(&cb);
1673 ASSERT_EQ(cb.getCount(), 0);
// Variant of RunBeforeLoop in which the callback has no action; instead,
// termination is requested from a function scheduled with tryRunAfterDelay().
1676 TEST(EventBaseTest, RunBeforeLoopWait) {
1678 CountedLoopCallback cb(&base, 1);
     // Delayed function that asks the loop to stop.
1679 base.tryRunAfterDelay([&](){
1680 base.terminateLoopSoon();
1682 base.runBeforeLoop(&cb);
1685 // Check that we only ran once, and did not loop multiple times.
1686 ASSERT_EQ(cb.getCount(), 0);
// EventHandler whose handlerReady() calls abort(): if an event is ever
// delivered to it, the process is killed. Used by StopBeforeLoop, where no
// event is expected to fire.
1689 class PipeHandler : public EventHandler {
1691 PipeHandler(EventBase* eventBase, int fd)
1692 : EventHandler(eventBase, fd) {}
1694 void handlerReady(uint16_t /* events */) noexcept override { abort(); }
1697 TEST(EventBaseTest, StopBeforeLoop) {
1700 // Give the evb something to do.
1702 ASSERT_EQ(0, pipe(p));
     // Watch the pipe's read end for READ events; PipeHandler aborts if one is
     // ever delivered.
1703 PipeHandler handler(&evb, p[0]);
1704 handler.registerHandler(EventHandler::READ);
1706 // It's definitely not running yet
     // Termination is requested before loop() is called at all.
1707 evb.terminateLoopSoon();
1709 // let it run, it should exit quickly.
1710 std::thread t([&] { evb.loop(); });
1713 handler.unregisterHandler();
// Submits a function with runInEventBaseThread().
// NOTE(review): going by the test name, the function is expected to run when
// the EventBase is destroyed -- confirm against the assertions of this test.
1720 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1725 base.runInEventBaseThread([&](){
// A helper thread obtains a keep-alive token from the EventBase when its
// lambda is created, sleeps for 100ms, and then submits a function that sets
// `done`, moving the token into that function's capture.
1733 TEST(EventBaseTest, LoopKeepAlive) {
     // The token is captured by value (init-capture); `mutable` allows it to be
     // moved out of the closure below.
1737 std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
1738 /* sleep override */ std::this_thread::sleep_for(
1739 std::chrono::milliseconds(100));
1740 evb.runInEventBaseThread(
1741 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
// Like LoopKeepAlive, except that the helper thread (and with it the
// keep-alive token) is created from inside a function submitted with
// runInEventBaseThread().
1751 TEST(EventBaseTest, LoopKeepAliveInLoop) {
1757 evb.runInEventBaseThread([&] {
     // `t` is assigned here, so it is declared outside this function.
1758 t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
1759 /* sleep override */ std::this_thread::sleep_for(
1760 std::chrono::milliseconds(100));
1761 evb.runInEventBaseThread(
1762 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
// Checks the interaction of a keep-alive token with terminateLoopSoon(): while
// the token is held, `done` must still be false 30ms after termination was
// requested.
1773 TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
1774 std::unique_ptr<EventBase> evb = folly::make_unique<EventBase>();
1778 std::thread evThread([&] {
1785 auto* ev = evb.get();
     // Obtain the token from within the EventBase thread and wait for that to
     // complete.
1786 EventBase::LoopKeepAlive keepAlive;
1787 ev->runInEventBaseThreadAndWait(
1788 [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); });
1789 ASSERT_FALSE(done) << "Loop finished before we asked it to";
     // Request termination, then give the loop 30ms in which it could
     // (wrongly) finish.
1790 ev->terminateLoopSoon();
1791 /* sleep override */
1792 std::this_thread::sleep_for(std::chrono::milliseconds(30));
1793 ASSERT_FALSE(done) << "Loop terminated early";
     // Move the token into an empty function; presumably it is released once
     // that function has been run and destroyed.
1794 ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{});
1801 TEST(EventBaseTest, LoopKeepAliveShutdown) {
1802 auto evb = folly::make_unique<EventBase>();
1808 loopKeepAlive = evb->loopKeepAlive(),
1811 /* sleep override */ std::this_thread::sleep_for(
1812 std::chrono::milliseconds(100));
1813 evbPtr->runInEventBaseThread(
1814 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });