2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 #include <folly/Memory.h>
21 #include <folly/ScopeGuard.h>
23 #include <folly/io/async/AsyncTimeout.h>
24 #include <folly/io/async/EventBase.h>
25 #include <folly/io/async/EventHandler.h>
26 #include <folly/io/async/test/SocketPair.h>
27 #include <folly/io/async/test/Util.h>
28 #include <folly/portability/Unistd.h>
39 using std::unique_ptr;
44 using std::chrono::milliseconds;
45 using std::chrono::microseconds;
46 using std::chrono::duration_cast;
48 using namespace folly;
50 ///////////////////////////////////////////////////////////////////////////
51 // Tests for read and write events
52 ///////////////////////////////////////////////////////////////////////////
54 enum { BUF_SIZE = 4096 };
56 ssize_t writeToFD(int fd, size_t length) {
57 // write an arbitrary amount of data to the fd
58 auto bufv = vector<char>(length);
59 auto buf = bufv.data();
60 memset(buf, 'a', length);
61 ssize_t rc = write(fd, buf, length);
66 size_t writeUntilFull(int fd) {
67 // Write to the fd until EAGAIN is returned
68 size_t bytesWritten = 0;
70 memset(buf, 'a', sizeof(buf));
72 ssize_t rc = write(fd, buf, sizeof(buf));
74 CHECK_EQ(errno, EAGAIN);
83 ssize_t readFromFD(int fd, size_t length) {
84 // write an arbitrary amount of data to the fd
85 auto buf = vector<char>(length);
86 return read(fd, buf.data(), length);
89 size_t readUntilEmpty(int fd) {
90 // Read from the fd until EAGAIN is returned
94 int rc = read(fd, buf, sizeof(buf));
96 CHECK(false) << "unexpected EOF";
98 CHECK_EQ(errno, EAGAIN);
107 void checkReadUntilEmpty(int fd, size_t expectedLength) {
108 ASSERT_EQ(readUntilEmpty(fd), expectedLength);
111 struct ScheduledEvent {
117 void perform(int fd) {
118 if (events & EventHandler::READ) {
120 result = readUntilEmpty(fd);
122 result = readFromFD(fd, length);
125 if (events & EventHandler::WRITE) {
127 result = writeUntilFull(fd);
129 result = writeToFD(fd, length);
135 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
136 for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
137 eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
142 class TestHandler : public EventHandler {
144 TestHandler(EventBase* eventBase, int fd)
145 : EventHandler(eventBase, fd), fd_(fd) {}
147 void handlerReady(uint16_t events) noexcept override {
148 ssize_t bytesRead = 0;
149 ssize_t bytesWritten = 0;
151 // Read all available data, so EventBase will stop calling us
152 // until new data becomes available
153 bytesRead = readUntilEmpty(fd_);
155 if (events & WRITE) {
156 // Write until the pipe buffer is full, so EventBase will stop calling
157 // us until the other end has read some data
158 bytesWritten = writeUntilFull(fd_);
161 log.emplace_back(events, bytesRead, bytesWritten);
165 EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
168 , bytesRead(bytesRead)
169 , bytesWritten(bytesWritten) {}
174 ssize_t bytesWritten;
177 deque<EventRecord> log;
186 TEST(EventBaseTest, ReadEvent) {
190 // Register for read events
191 TestHandler handler(&eb, sp[0]);
192 handler.registerHandler(EventHandler::READ);
194 // Register timeouts to perform two write events
195 ScheduledEvent events[] = {
196 { 10, EventHandler::WRITE, 2345, 0 },
197 { 160, EventHandler::WRITE, 99, 0 },
200 scheduleEvents(&eb, sp[1], events);
207 // Since we didn't use the EventHandler::PERSIST flag, the handler should
208 // have received the first read, then unregistered itself. Check that only
209 // the first chunk of data was received.
210 ASSERT_EQ(handler.log.size(), 1);
211 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
212 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
213 milliseconds(events[0].milliseconds), milliseconds(90));
214 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
215 ASSERT_EQ(handler.log[0].bytesWritten, 0);
216 T_CHECK_TIMEOUT(start, end,
217 milliseconds(events[1].milliseconds), milliseconds(30));
219 // Make sure the second chunk of data is still waiting to be read.
220 size_t bytesRemaining = readUntilEmpty(sp[0]);
221 ASSERT_EQ(bytesRemaining, events[1].length);
225 * Test (READ | PERSIST)
227 TEST(EventBaseTest, ReadPersist) {
231 // Register for read events
232 TestHandler handler(&eb, sp[0]);
233 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
235 // Register several timeouts to perform writes
236 ScheduledEvent events[] = {
237 { 10, EventHandler::WRITE, 1024, 0 },
238 { 20, EventHandler::WRITE, 2211, 0 },
239 { 30, EventHandler::WRITE, 4096, 0 },
240 { 100, EventHandler::WRITE, 100, 0 },
243 scheduleEvents(&eb, sp[1], events);
245 // Schedule a timeout to unregister the handler after the third write
246 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
253 // The handler should have received the first 3 events,
254 // then been unregistered after that.
255 ASSERT_EQ(handler.log.size(), 3);
256 for (int n = 0; n < 3; ++n) {
257 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
258 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
259 milliseconds(events[n].milliseconds));
260 ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
261 ASSERT_EQ(handler.log[n].bytesWritten, 0);
263 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
265 // Make sure the data from the last write is still waiting to be read
266 size_t bytesRemaining = readUntilEmpty(sp[0]);
267 ASSERT_EQ(bytesRemaining, events[3].length);
271 * Test registering for READ when the socket is immediately readable
273 TEST(EventBaseTest, ReadImmediate) {
277 // Write some data to the socket so the other end will
278 // be immediately readable
279 size_t dataLength = 1234;
280 writeToFD(sp[1], dataLength);
282 // Register for read events
283 TestHandler handler(&eb, sp[0]);
284 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
286 // Register a timeout to perform another write
287 ScheduledEvent events[] = {
288 { 10, EventHandler::WRITE, 2345, 0 },
291 scheduleEvents(&eb, sp[1], events);
293 // Schedule a timeout to unregister the handler
294 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
301 ASSERT_EQ(handler.log.size(), 2);
303 // There should have been 1 event for immediate readability
304 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
305 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
306 ASSERT_EQ(handler.log[0].bytesRead, dataLength);
307 ASSERT_EQ(handler.log[0].bytesWritten, 0);
309 // There should be another event after the timeout wrote more data
310 ASSERT_EQ(handler.log[1].events, EventHandler::READ);
311 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
312 milliseconds(events[0].milliseconds));
313 ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
314 ASSERT_EQ(handler.log[1].bytesWritten, 0);
316 T_CHECK_TIMEOUT(start, end, milliseconds(20));
322 TEST(EventBaseTest, WriteEvent) {
326 // Fill up the write buffer before starting
327 size_t initialBytesWritten = writeUntilFull(sp[0]);
329 // Register for write events
330 TestHandler handler(&eb, sp[0]);
331 handler.registerHandler(EventHandler::WRITE);
333 // Register timeouts to perform two reads
334 ScheduledEvent events[] = {
335 { 10, EventHandler::READ, 0, 0 },
336 { 60, EventHandler::READ, 0, 0 },
339 scheduleEvents(&eb, sp[1], events);
346 // Since we didn't use the EventHandler::PERSIST flag, the handler should
347 // have only been able to write once, then unregistered itself.
348 ASSERT_EQ(handler.log.size(), 1);
349 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
350 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
351 milliseconds(events[0].milliseconds));
352 ASSERT_EQ(handler.log[0].bytesRead, 0);
353 ASSERT_GT(handler.log[0].bytesWritten, 0);
354 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
356 ASSERT_EQ(events[0].result, initialBytesWritten);
357 ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
361 * Test (WRITE | PERSIST)
363 TEST(EventBaseTest, WritePersist) {
367 // Fill up the write buffer before starting
368 size_t initialBytesWritten = writeUntilFull(sp[0]);
370 // Register for write events
371 TestHandler handler(&eb, sp[0]);
372 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
374 // Register several timeouts to read from the socket at several intervals
375 ScheduledEvent events[] = {
376 { 10, EventHandler::READ, 0, 0 },
377 { 40, EventHandler::READ, 0, 0 },
378 { 70, EventHandler::READ, 0, 0 },
379 { 100, EventHandler::READ, 0, 0 },
382 scheduleEvents(&eb, sp[1], events);
384 // Schedule a timeout to unregister the handler after the third read
385 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
392 // The handler should have received the first 3 events,
393 // then been unregistered after that.
394 ASSERT_EQ(handler.log.size(), 3);
395 ASSERT_EQ(events[0].result, initialBytesWritten);
396 for (int n = 0; n < 3; ++n) {
397 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
398 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
399 milliseconds(events[n].milliseconds));
400 ASSERT_EQ(handler.log[n].bytesRead, 0);
401 ASSERT_GT(handler.log[n].bytesWritten, 0);
402 ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
404 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
408 * Test registering for WRITE when the socket is immediately writable
410 TEST(EventBaseTest, WriteImmediate) {
414 // Register for write events
415 TestHandler handler(&eb, sp[0]);
416 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
418 // Register a timeout to perform a read
419 ScheduledEvent events[] = {
420 { 10, EventHandler::READ, 0, 0 },
423 scheduleEvents(&eb, sp[1], events);
425 // Schedule a timeout to unregister the handler
426 int64_t unregisterTimeout = 40;
427 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
435 ASSERT_EQ(handler.log.size(), 2);
437 // Since the socket buffer was initially empty,
438 // there should have been 1 event for immediate writability
439 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
440 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
441 ASSERT_EQ(handler.log[0].bytesRead, 0);
442 ASSERT_GT(handler.log[0].bytesWritten, 0);
444 // There should be another event after the timeout wrote more data
445 ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
446 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
447 milliseconds(events[0].milliseconds));
448 ASSERT_EQ(handler.log[1].bytesRead, 0);
449 ASSERT_GT(handler.log[1].bytesWritten, 0);
451 T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
455 * Test (READ | WRITE) when the socket becomes readable first
457 TEST(EventBaseTest, ReadWrite) {
461 // Fill up the write buffer before starting
462 size_t sock0WriteLength = writeUntilFull(sp[0]);
464 // Register for read and write events
465 TestHandler handler(&eb, sp[0]);
466 handler.registerHandler(EventHandler::READ_WRITE);
468 // Register timeouts to perform a write then a read.
469 ScheduledEvent events[] = {
470 { 10, EventHandler::WRITE, 2345, 0 },
471 { 40, EventHandler::READ, 0, 0 },
474 scheduleEvents(&eb, sp[1], events);
481 // Since we didn't use the EventHandler::PERSIST flag, the handler should
482 // have only noticed readability, then unregistered itself. Check that only
483 // one event was logged.
484 ASSERT_EQ(handler.log.size(), 1);
485 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
486 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
487 milliseconds(events[0].milliseconds));
488 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
489 ASSERT_EQ(handler.log[0].bytesWritten, 0);
490 ASSERT_EQ(events[1].result, sock0WriteLength);
491 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
495 * Test (READ | WRITE) when the socket becomes writable first
497 TEST(EventBaseTest, WriteRead) {
501 // Fill up the write buffer before starting
502 size_t sock0WriteLength = writeUntilFull(sp[0]);
504 // Register for read and write events
505 TestHandler handler(&eb, sp[0]);
506 handler.registerHandler(EventHandler::READ_WRITE);
508 // Register timeouts to perform a read then a write.
509 size_t sock1WriteLength = 2345;
510 ScheduledEvent events[] = {
511 { 10, EventHandler::READ, 0, 0 },
512 { 40, EventHandler::WRITE, sock1WriteLength, 0 },
515 scheduleEvents(&eb, sp[1], events);
522 // Since we didn't use the EventHandler::PERSIST flag, the handler should
523 // have only noticed writability, then unregistered itself. Check that only
524 // one event was logged.
525 ASSERT_EQ(handler.log.size(), 1);
526 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
527 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
528 milliseconds(events[0].milliseconds));
529 ASSERT_EQ(handler.log[0].bytesRead, 0);
530 ASSERT_GT(handler.log[0].bytesWritten, 0);
531 ASSERT_EQ(events[0].result, sock0WriteLength);
532 ASSERT_EQ(events[1].result, sock1WriteLength);
533 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
535 // Make sure the written data is still waiting to be read.
536 size_t bytesRemaining = readUntilEmpty(sp[0]);
537 ASSERT_EQ(bytesRemaining, events[1].length);
541 * Test (READ | WRITE) when the socket becomes readable and writable
544 TEST(EventBaseTest, ReadWriteSimultaneous) {
548 // Fill up the write buffer before starting
549 size_t sock0WriteLength = writeUntilFull(sp[0]);
551 // Register for read and write events
552 TestHandler handler(&eb, sp[0]);
553 handler.registerHandler(EventHandler::READ_WRITE);
555 // Register a timeout to perform a read and write together
556 ScheduledEvent events[] = {
557 { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
560 scheduleEvents(&eb, sp[1], events);
567 // It's not strictly required that the EventBase register us about both
568 // events in the same call. So, it's possible that if the EventBase
569 // implementation changes this test could start failing, and it wouldn't be
570 // considered breaking the API. However for now it's nice to exercise this
572 ASSERT_EQ(handler.log.size(), 1);
573 ASSERT_EQ(handler.log[0].events,
574 EventHandler::READ | EventHandler::WRITE);
575 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
576 milliseconds(events[0].milliseconds));
577 ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
578 ASSERT_GT(handler.log[0].bytesWritten, 0);
579 T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
583 * Test (READ | WRITE | PERSIST)
585 TEST(EventBaseTest, ReadWritePersist) {
589 // Register for read and write events
590 TestHandler handler(&eb, sp[0]);
591 handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
592 EventHandler::PERSIST);
594 // Register timeouts to perform several reads and writes
595 ScheduledEvent events[] = {
596 { 10, EventHandler::WRITE, 2345, 0 },
597 { 20, EventHandler::READ, 0, 0 },
598 { 35, EventHandler::WRITE, 200, 0 },
599 { 45, EventHandler::WRITE, 15, 0 },
600 { 55, EventHandler::READ, 0, 0 },
601 { 120, EventHandler::WRITE, 2345, 0 },
604 scheduleEvents(&eb, sp[1], events);
606 // Schedule a timeout to unregister the handler
607 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
614 ASSERT_EQ(handler.log.size(), 6);
616 // Since we didn't fill up the write buffer immediately, there should
617 // be an immediate event for writability.
618 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
619 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
620 ASSERT_EQ(handler.log[0].bytesRead, 0);
621 ASSERT_GT(handler.log[0].bytesWritten, 0);
623 // Events 1 through 5 should correspond to the scheduled events
624 for (int n = 1; n < 6; ++n) {
625 ScheduledEvent* event = &events[n - 1];
626 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
627 milliseconds(event->milliseconds));
628 if (event->events == EventHandler::READ) {
629 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
630 ASSERT_EQ(handler.log[n].bytesRead, 0);
631 ASSERT_GT(handler.log[n].bytesWritten, 0);
633 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
634 ASSERT_EQ(handler.log[n].bytesRead, event->length);
635 ASSERT_EQ(handler.log[n].bytesWritten, 0);
639 // The timeout should have unregistered the handler before the last write.
640 // Make sure that data is still waiting to be read
641 size_t bytesRemaining = readUntilEmpty(sp[0]);
642 ASSERT_EQ(bytesRemaining, events[5].length);
646 class PartialReadHandler : public TestHandler {
648 PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
649 : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
651 void handlerReady(uint16_t events) noexcept override {
652 assert(events == EventHandler::READ);
653 ssize_t bytesRead = readFromFD(fd_, readLength_);
654 log.emplace_back(events, bytesRead, 0);
663 * Test reading only part of the available data when a read event is fired.
664 * When PERSIST is used, make sure the handler gets notified again the next
665 * time around the loop.
667 TEST(EventBaseTest, ReadPartial) {
671 // Register for read events
672 size_t readLength = 100;
673 PartialReadHandler handler(&eb, sp[0], readLength);
674 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
676 // Register a timeout to perform a single write,
677 // with more data than PartialReadHandler will read at once
678 ScheduledEvent events[] = {
679 { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
682 scheduleEvents(&eb, sp[1], events);
684 // Schedule a timeout to unregister the handler
685 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
692 ASSERT_EQ(handler.log.size(), 4);
694 // The first 3 invocations should read readLength bytes each
695 for (int n = 0; n < 3; ++n) {
696 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
697 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
698 milliseconds(events[0].milliseconds));
699 ASSERT_EQ(handler.log[n].bytesRead, readLength);
700 ASSERT_EQ(handler.log[n].bytesWritten, 0);
702 // The last read only has readLength/2 bytes
703 ASSERT_EQ(handler.log[3].events, EventHandler::READ);
704 T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
705 milliseconds(events[0].milliseconds));
706 ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
707 ASSERT_EQ(handler.log[3].bytesWritten, 0);
711 class PartialWriteHandler : public TestHandler {
713 PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
714 : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
716 void handlerReady(uint16_t events) noexcept override {
717 assert(events == EventHandler::WRITE);
718 ssize_t bytesWritten = writeToFD(fd_, writeLength_);
719 log.emplace_back(events, 0, bytesWritten);
728 * Test writing without completely filling up the write buffer when the fd
729 * becomes writable. When PERSIST is used, make sure the handler gets
730 * notified again the next time around the loop.
732 TEST(EventBaseTest, WritePartial) {
736 // Fill up the write buffer before starting
737 size_t initialBytesWritten = writeUntilFull(sp[0]);
739 // Register for write events
740 size_t writeLength = 100;
741 PartialWriteHandler handler(&eb, sp[0], writeLength);
742 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
744 // Register a timeout to read, so that more data can be written
745 ScheduledEvent events[] = {
746 { 10, EventHandler::READ, 0, 0 },
749 scheduleEvents(&eb, sp[1], events);
751 // Schedule a timeout to unregister the handler
752 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
759 // Depending on how big the socket buffer is, there will be multiple writes
760 // Only check the first 5
762 ASSERT_GE(handler.log.size(), numChecked);
763 ASSERT_EQ(events[0].result, initialBytesWritten);
765 // The first 3 invocations should read writeLength bytes each
766 for (int n = 0; n < numChecked; ++n) {
767 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
768 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
769 milliseconds(events[0].milliseconds));
770 ASSERT_EQ(handler.log[n].bytesRead, 0);
771 ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
777 * Test destroying a registered EventHandler
779 TEST(EventBaseTest, DestroyHandler) {
780 class DestroyHandler : public AsyncTimeout {
782 DestroyHandler(EventBase* eb, EventHandler* h)
786 void timeoutExpired() noexcept override { delete handler_; }
789 EventHandler* handler_;
795 // Fill up the write buffer before starting
796 size_t initialBytesWritten = writeUntilFull(sp[0]);
798 // Register for write events
799 TestHandler* handler = new TestHandler(&eb, sp[0]);
800 handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
802 // After 10ms, read some data, so that the handler
803 // will be notified that it can write.
804 eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
807 // Start a timer to destroy the handler after 25ms
808 // This mainly just makes sure the code doesn't break or assert
809 DestroyHandler dh(&eb, handler);
810 dh.scheduleTimeout(25);
816 // Make sure the EventHandler was uninstalled properly when it was
817 // destroyed, and the EventBase loop exited
818 T_CHECK_TIMEOUT(start, end, milliseconds(25));
820 // Make sure that the handler wrote data to the socket
821 // before it was destroyed
822 size_t bytesRemaining = readUntilEmpty(sp[1]);
823 ASSERT_GT(bytesRemaining, 0);
827 ///////////////////////////////////////////////////////////////////////////
828 // Tests for timeout events
829 ///////////////////////////////////////////////////////////////////////////
831 TEST(EventBaseTest, RunAfterDelay) {
834 TimePoint timestamp1(false);
835 TimePoint timestamp2(false);
836 TimePoint timestamp3(false);
837 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
838 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
839 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 40);
845 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
846 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
847 T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
848 T_CHECK_TIMEOUT(start, end, milliseconds(40));
852 * Test the behavior of tryRunAfterDelay() when some timeouts are
853 * still scheduled when the EventBase is destroyed.
855 TEST(EventBaseTest, RunAfterDelayDestruction) {
856 TimePoint timestamp1(false);
857 TimePoint timestamp2(false);
858 TimePoint timestamp3(false);
859 TimePoint timestamp4(false);
860 TimePoint start(false);
861 TimePoint end(false);
866 // Run two normal timeouts
867 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
868 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
870 // Schedule a timeout to stop the event loop after 40ms
871 eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
873 // Schedule 2 timeouts that would fire after the event loop stops
874 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 80);
875 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp4), 160);
882 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
883 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
884 T_CHECK_TIMEOUT(start, end, milliseconds(40));
886 ASSERT_TRUE(timestamp3.isUnset());
887 ASSERT_TRUE(timestamp4.isUnset());
889 // Ideally this test should be run under valgrind to ensure that no
893 class TestTimeout : public AsyncTimeout {
895 explicit TestTimeout(EventBase* eventBase)
896 : AsyncTimeout(eventBase)
897 , timestamp(false) {}
899 void timeoutExpired() noexcept override { timestamp.reset(); }
904 TEST(EventBaseTest, BasicTimeouts) {
910 t1.scheduleTimeout(10);
911 t2.scheduleTimeout(20);
912 t3.scheduleTimeout(40);
918 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
919 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
920 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
921 T_CHECK_TIMEOUT(start, end, milliseconds(40));
924 class ReschedulingTimeout : public AsyncTimeout {
926 ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
928 , timeouts_(timeouts)
929 , iterator_(timeouts_.begin()) {}
935 void timeoutExpired() noexcept override {
936 timestamps.emplace_back();
941 if (iterator_ != timeouts_.end()) {
942 uint32_t timeout = *iterator_;
944 scheduleTimeout(timeout);
948 vector<TimePoint> timestamps;
951 vector<uint32_t> timeouts_;
952 vector<uint32_t>::const_iterator iterator_;
956 * Test rescheduling the same timeout multiple times
958 TEST(EventBaseTest, ReuseTimeout) {
961 vector<uint32_t> timeouts;
962 timeouts.push_back(10);
963 timeouts.push_back(30);
964 timeouts.push_back(15);
966 ReschedulingTimeout t(&eb, timeouts);
973 // Use a higher tolerance than usual. We're waiting on 3 timeouts
974 // consecutively. In general, each timeout may go over by a few
975 // milliseconds, and we're tripling this error by witing on 3 timeouts.
976 milliseconds tolerance{6};
978 ASSERT_EQ(timeouts.size(), t.timestamps.size());
980 for (size_t n = 0; n < timeouts.size(); ++n) {
981 total += timeouts[n];
982 T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
984 T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
988 * Test rescheduling a timeout before it has fired
990 TEST(EventBaseTest, RescheduleTimeout) {
997 t1.scheduleTimeout(15);
998 t2.scheduleTimeout(30);
999 t3.scheduleTimeout(30);
1001 auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1002 &AsyncTimeout::scheduleTimeout);
1004 // after 10ms, reschedule t2 to run sooner than originally scheduled
1005 eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1006 // after 10ms, reschedule t3 to run later than originally scheduled
1007 eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1013 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1014 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1015 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1016 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1020 * Test cancelling a timeout
1022 TEST(EventBaseTest, CancelTimeout) {
1025 vector<uint32_t> timeouts;
1026 timeouts.push_back(10);
1027 timeouts.push_back(30);
1028 timeouts.push_back(25);
1030 ReschedulingTimeout t(&eb, timeouts);
1032 eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1038 ASSERT_EQ(t.timestamps.size(), 2);
1039 T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1040 T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1041 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1045 * Test destroying a scheduled timeout object
1047 TEST(EventBaseTest, DestroyTimeout) {
1048 class DestroyTimeout : public AsyncTimeout {
1050 DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1054 void timeoutExpired() noexcept override { delete timeout_; }
1057 AsyncTimeout* timeout_;
1062 TestTimeout* t1 = new TestTimeout(&eb);
1063 t1->scheduleTimeout(30);
1065 DestroyTimeout dt(&eb, t1);
1066 dt.scheduleTimeout(10);
1072 T_CHECK_TIMEOUT(start, end, milliseconds(10));
1076 ///////////////////////////////////////////////////////////////////////////
1077 // Test for runInThreadTestFunc()
1078 ///////////////////////////////////////////////////////////////////////////
1080 struct RunInThreadData {
1081 RunInThreadData(int numThreads, int opsPerThread)
1082 : opsPerThread(opsPerThread)
1083 , opsToGo(numThreads*opsPerThread) {}
1086 deque< pair<int, int> > values;
1092 struct RunInThreadArg {
1093 RunInThreadArg(RunInThreadData* data,
1100 RunInThreadData* data;
1105 void runInThreadTestFunc(RunInThreadArg* arg) {
1106 arg->data->values.emplace_back(arg->thread, arg->value);
1107 RunInThreadData* data = arg->data;
1110 if(--data->opsToGo == 0) {
1111 // Break out of the event base loop if we are the last thread running
1112 data->evb.terminateLoopSoon();
1116 TEST(EventBaseTest, RunInThread) {
1117 constexpr uint32_t numThreads = 50;
1118 constexpr uint32_t opsPerThread = 100;
1119 RunInThreadData data(numThreads, opsPerThread);
1121 deque<std::thread> threads;
1122 for (uint32_t i = 0; i < numThreads; ++i) {
1123 threads.emplace_back([i, &data] {
1124 for (int n = 0; n < data.opsPerThread; ++n) {
1125 RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1126 data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1132 // Add a timeout event to run after 3 seconds.
1133 // Otherwise loop() will return immediately since there are no events to run.
1134 // Once the last thread exits, it will stop the loop(). However, this
1135 // timeout also stops the loop in case there is a bug performing the normal
1137 data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1144 // Verify that the loop exited because all threads finished and requested it
1145 // to stop. This should happen much sooner than the 3 second timeout.
1146 // Assert that it happens in under a second. (This is still tons of extra
1149 auto timeTaken = std::chrono::duration_cast<milliseconds>(
1150 end.getTime() - start.getTime());
1151 ASSERT_LT(timeTaken.count(), 1000);
1152 VLOG(11) << "Time taken: " << timeTaken.count();
1154 // Verify that we have all of the events from every thread
1155 int expectedValues[numThreads];
1156 for (uint32_t n = 0; n < numThreads; ++n) {
1157 expectedValues[n] = 0;
1159 for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1160 it != data.values.end();
1162 int threadID = it->first;
1163 int value = it->second;
1164 ASSERT_EQ(expectedValues[threadID], value);
1165 ++expectedValues[threadID];
1167 for (uint32_t n = 0; n < numThreads; ++n) {
1168 ASSERT_EQ(expectedValues[n], opsPerThread);
1171 // Wait on all of the threads.
1172 for (auto& thread: threads) {
1177 // This test simulates some calls, and verifies that the waiting happens by
1178 // triggering what otherwise would be race conditions, and trying to detect
1179 // whether any of the race conditions happened.
1180 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1181 const size_t c = 256;
1182 vector<unique_ptr<atomic<size_t>>> atoms(c);
1183 for (size_t i = 0; i < c; ++i) {
1184 auto& atom = atoms.at(i);
1185 atom = make_unique<atomic<size_t>>(0);
1187 vector<thread> threads(c);
1188 for (size_t i = 0; i < c; ++i) {
1189 auto& atom = *atoms.at(i);
1190 auto& th = threads.at(i);
1191 th = thread([&atom] {
1193 auto ebth = thread([&]{ eb.loopForever(); });
1194 eb.waitUntilRunning();
1195 eb.runInEventBaseThreadAndWait([&] {
1197 atom.compare_exchange_weak(
1198 x, 1, std::memory_order_release, std::memory_order_relaxed);
1201 atom.compare_exchange_weak(
1202 x, 2, std::memory_order_release, std::memory_order_relaxed);
1203 eb.terminateLoopSoon();
1207 for (size_t i = 0; i < c; ++i) {
1208 auto& th = threads.at(i);
1212 for (auto& atom : atoms) sum += *atom;
1216 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1218 thread th(&EventBase::loopForever, &eb);
1220 eb.terminateLoopSoon();
1223 auto mutated = false;
1224 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1227 EXPECT_TRUE(mutated);
1230 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1232 thread th(&EventBase::loopForever, &eb);
1234 eb.terminateLoopSoon();
1237 eb.runInEventBaseThreadAndWait([&] {
1238 auto mutated = false;
1239 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1242 EXPECT_TRUE(mutated);
1246 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1248 auto mutated = false;
1249 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1252 EXPECT_TRUE(mutated);
1255 ///////////////////////////////////////////////////////////////////////////
1256 // Tests for runInLoop()
1257 ///////////////////////////////////////////////////////////////////////////
1259 class CountedLoopCallback : public EventBase::LoopCallback {
1261 CountedLoopCallback(EventBase* eventBase,
1263 std::function<void()> action =
1264 std::function<void()>())
1265 : eventBase_(eventBase)
1267 , action_(action) {}
1269 void runLoopCallback() noexcept override {
1272 eventBase_->runInLoop(this);
1273 } else if (action_) {
1278 unsigned int getCount() const {
1283 EventBase* eventBase_;
1284 unsigned int count_;
1285 std::function<void()> action_;
1288 // Test that EventBase::loop() doesn't exit while there are
1289 // still LoopCallbacks remaining to be invoked.
1290 TEST(EventBaseTest, RepeatedRunInLoop) {
1291 EventBase eventBase;
1293 CountedLoopCallback c(&eventBase, 10);
1294 eventBase.runInLoop(&c);
1295 // The callback shouldn't have run immediately
1296 ASSERT_EQ(c.getCount(), 10);
1299 // loop() should loop until the CountedLoopCallback stops
1300 // re-installing itself.
1301 ASSERT_EQ(c.getCount(), 0);
1304 // Test that EventBase::loop() works as expected without time measurements.
1305 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
1306 EventBase eventBase(false);
1308 CountedLoopCallback c(&eventBase, 10);
1309 eventBase.runInLoop(&c);
1310 // The callback shouldn't have run immediately
1311 ASSERT_EQ(c.getCount(), 10);
1314 // loop() should loop until the CountedLoopCallback stops
1315 // re-installing itself.
1316 ASSERT_EQ(c.getCount(), 0);
1319 // Test runInLoop() calls with terminateLoopSoon()
1320 TEST(EventBaseTest, RunInLoopStopLoop) {
1321 EventBase eventBase;
1323 CountedLoopCallback c1(&eventBase, 20);
1324 CountedLoopCallback c2(&eventBase, 10,
1325 std::bind(&EventBase::terminateLoopSoon, &eventBase));
1327 eventBase.runInLoop(&c1);
1328 eventBase.runInLoop(&c2);
1329 ASSERT_EQ(c1.getCount(), 20);
1330 ASSERT_EQ(c2.getCount(), 10);
1332 eventBase.loopForever();
1334 // c2 should have stopped the loop after 10 iterations
1335 ASSERT_EQ(c2.getCount(), 0);
1337 // We allow the EventBase to run the loop callbacks in whatever order it
1338 // chooses. We'll accept c1's count being either 10 (if the loop terminated
1339 // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1342 // (With the current code, c1 will always run 10 times, but we don't consider
1343 // this a hard API requirement.)
1344 ASSERT_GE(c1.getCount(), 10);
1345 ASSERT_LE(c1.getCount(), 11);
1348 TEST(EventBaseTest, TryRunningAfterTerminate) {
1349 EventBase eventBase;
1350 CountedLoopCallback c1(&eventBase, 1,
1351 std::bind(&EventBase::terminateLoopSoon, &eventBase));
1352 eventBase.runInLoop(&c1);
1353 eventBase.loopForever();
1355 eventBase.runInEventBaseThread([&]() {
1362 // Test cancelling runInLoop() callbacks
1363 TEST(EventBaseTest, CancelRunInLoop) {
1364 EventBase eventBase;
1366 CountedLoopCallback c1(&eventBase, 20);
1367 CountedLoopCallback c2(&eventBase, 20);
1368 CountedLoopCallback c3(&eventBase, 20);
1370 std::function<void()> cancelC1Action =
1371 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1372 std::function<void()> cancelC2Action =
1373 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1375 CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1376 CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1378 // Install cancelC1 after c1
1379 eventBase.runInLoop(&c1);
1380 eventBase.runInLoop(&cancelC1);
1382 // Install cancelC2 before c2
1383 eventBase.runInLoop(&cancelC2);
1384 eventBase.runInLoop(&c2);
1387 eventBase.runInLoop(&c3);
1389 ASSERT_EQ(c1.getCount(), 20);
1390 ASSERT_EQ(c2.getCount(), 20);
1391 ASSERT_EQ(c3.getCount(), 20);
1392 ASSERT_EQ(cancelC1.getCount(), 10);
1393 ASSERT_EQ(cancelC2.getCount(), 10);
1398 // cancelC1 and cancelC3 should have both fired after 10 iterations and
1399 // stopped re-installing themselves
1400 ASSERT_EQ(cancelC1.getCount(), 0);
1401 ASSERT_EQ(cancelC2.getCount(), 0);
1402 // c3 should have continued on for the full 20 iterations
1403 ASSERT_EQ(c3.getCount(), 0);
1405 // c1 and c2 should have both been cancelled on the 10th iteration.
1407 // Callbacks are always run in the order they are installed,
1408 // so c1 should have fired 10 times, and been canceled after it ran on the
1409 // 10th iteration. c2 should have only fired 9 times, because cancelC2 will
1410 // have run before it on the 10th iteration, and cancelled it before it
1412 ASSERT_EQ(c1.getCount(), 10);
1413 ASSERT_EQ(c2.getCount(), 11);
1416 class TerminateTestCallback : public EventBase::LoopCallback,
1417 public EventHandler {
1419 TerminateTestCallback(EventBase* eventBase, int fd)
1420 : EventHandler(eventBase, fd),
1421 eventBase_(eventBase),
1422 loopInvocations_(0),
1423 maxLoopInvocations_(0),
1424 eventInvocations_(0),
1425 maxEventInvocations_(0) {}
1427 void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1428 loopInvocations_ = 0;
1429 maxLoopInvocations_ = maxLoopInvocations;
1430 eventInvocations_ = 0;
1431 maxEventInvocations_ = maxEventInvocations;
1433 cancelLoopCallback();
1434 unregisterHandler();
1437 void handlerReady(uint16_t /* events */) noexcept override {
1438 // We didn't register with PERSIST, so we will have been automatically
1439 // unregistered already.
1440 ASSERT_FALSE(isHandlerRegistered());
1442 ++eventInvocations_;
1443 if (eventInvocations_ >= maxEventInvocations_) {
1447 eventBase_->runInLoop(this);
1449 void runLoopCallback() noexcept override {
1451 if (loopInvocations_ >= maxLoopInvocations_) {
1455 registerHandler(READ);
1458 uint32_t getLoopInvocations() const {
1459 return loopInvocations_;
1461 uint32_t getEventInvocations() const {
1462 return eventInvocations_;
1466 EventBase* eventBase_;
1467 uint32_t loopInvocations_;
1468 uint32_t maxLoopInvocations_;
1469 uint32_t eventInvocations_;
1470 uint32_t maxEventInvocations_;
1474 * Test that EventBase::loop() correctly detects when there are no more events
1477 * This uses a single callback, which alternates registering itself as a loop
1478 * callback versus a EventHandler callback. This exercises a regression where
1479 * EventBase::loop() incorrectly exited if there were no more fd handlers
1480 * registered, but a loop callback installed a new fd handler.
1482 TEST(EventBaseTest, LoopTermination) {
1483 EventBase eventBase;
1485 // Open a pipe and close the write end,
1486 // so the read endpoint will be readable
1488 int rc = pipe(pipeFds);
1491 TerminateTestCallback callback(&eventBase, pipeFds[0]);
1493 // Test once where the callback will exit after a loop callback
1494 callback.reset(10, 100);
1495 eventBase.runInLoop(&callback);
1497 ASSERT_EQ(callback.getLoopInvocations(), 10);
1498 ASSERT_EQ(callback.getEventInvocations(), 9);
1500 // Test once where the callback will exit after an fd event callback
1501 callback.reset(100, 7);
1502 eventBase.runInLoop(&callback);
1504 ASSERT_EQ(callback.getLoopInvocations(), 7);
1505 ASSERT_EQ(callback.getEventInvocations(), 7);
1510 ///////////////////////////////////////////////////////////////////////////
1511 // Tests for latency calculations
1512 ///////////////////////////////////////////////////////////////////////////
1514 class IdleTimeTimeoutSeries : public AsyncTimeout {
1518 explicit IdleTimeTimeoutSeries(EventBase *base,
1519 std::deque<std::uint64_t>& timeout) :
1526 ~IdleTimeTimeoutSeries() override {}
1528 void timeoutExpired() noexcept override {
1531 if(timeout_.empty()){
1534 uint64_t sleepTime = timeout_.front();
1535 timeout_.pop_front();
1543 int getTimeouts() const {
1549 std::deque<uint64_t>& timeout_;
1553 * Verify that idle time is correctly accounted for when decaying our loop
1556 * This works by creating a high loop time (via usleep), expecting a latency
1557 * callback with known value, and then scheduling a timeout for later. This
1558 * later timeout is far enough in the future that the idle time should have
1559 * caused the loop time to decay.
1561 TEST(EventBaseTest, IdleTime) {
1562 EventBase eventBase;
1563 eventBase.setLoadAvgMsec(1000);
1564 eventBase.resetLoadAvg(5900.0);
1565 std::deque<uint64_t> timeouts0(4, 8080);
1566 timeouts0.push_front(8000);
1567 timeouts0.push_back(14000);
1568 IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1569 std::deque<uint64_t> timeouts(20, 20);
1570 std::unique_ptr<IdleTimeTimeoutSeries> tos;
1571 int64_t testStart = duration_cast<microseconds>(
1572 std::chrono::steady_clock::now().time_since_epoch()).count();
1573 bool hostOverloaded = false;
1575 int latencyCallbacks = 0;
1576 eventBase.setMaxLatency(6000, [&]() {
1579 switch (latencyCallbacks) {
1581 if (tos0.getTimeouts() < 6) {
1582 // This could only happen if the host this test is running
1583 // on is heavily loaded.
1584 int64_t maxLatencyReached = duration_cast<microseconds>(
1585 std::chrono::steady_clock::now().time_since_epoch()).count();
1586 ASSERT_LE(43800, maxLatencyReached - testStart);
1587 hostOverloaded = true;
1590 ASSERT_EQ(6, tos0.getTimeouts());
1591 ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1592 ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1593 tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1597 FAIL() << "Unexpected latency callback";
1602 // Kick things off with an "immedite" timeout
1603 tos0.scheduleTimeout(1);
1607 if (hostOverloaded) {
1611 ASSERT_EQ(1, latencyCallbacks);
1612 ASSERT_EQ(7, tos0.getTimeouts());
1613 ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1614 ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1616 ASSERT_EQ(21, tos->getTimeouts());
1620 * Test that thisLoop functionality works with terminateLoopSoon
1622 TEST(EventBaseTest, ThisLoop) {
1624 bool runInLoop = false;
1625 bool runThisLoop = false;
1628 eb.terminateLoopSoon();
1629 eb.runInLoop([&]() {
1632 eb.runInLoop([&]() {
1639 ASSERT_FALSE(runInLoop);
1640 // Should work with thisLoop
1641 ASSERT_TRUE(runThisLoop);
1644 TEST(EventBaseTest, EventBaseThreadLoop) {
1648 base.runInEventBaseThread([&](){
1653 ASSERT_EQ(true, ran);
1656 TEST(EventBaseTest, EventBaseThreadName) {
1658 base.setName("foo");
1661 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1663 pthread_getname_np(pthread_self(), name, 16);
1664 ASSERT_EQ(0, strcmp("foo", name));
1668 TEST(EventBaseTest, RunBeforeLoop) {
1670 CountedLoopCallback cb(&base, 1, [&](){
1671 base.terminateLoopSoon();
1673 base.runBeforeLoop(&cb);
1675 ASSERT_EQ(cb.getCount(), 0);
1678 TEST(EventBaseTest, RunBeforeLoopWait) {
1680 CountedLoopCallback cb(&base, 1);
1681 base.tryRunAfterDelay([&](){
1682 base.terminateLoopSoon();
1684 base.runBeforeLoop(&cb);
1687 // Check that we only ran once, and did not loop multiple times.
1688 ASSERT_EQ(cb.getCount(), 0);
1691 class PipeHandler : public EventHandler {
1693 PipeHandler(EventBase* eventBase, int fd)
1694 : EventHandler(eventBase, fd) {}
1696 void handlerReady(uint16_t /* events */) noexcept override { abort(); }
1699 TEST(EventBaseTest, StopBeforeLoop) {
1702 // Give the evb something to do.
1704 ASSERT_EQ(0, pipe(p));
1705 PipeHandler handler(&evb, p[0]);
1706 handler.registerHandler(EventHandler::READ);
1708 // It's definitely not running yet
1709 evb.terminateLoopSoon();
1711 // let it run, it should exit quickly.
1712 std::thread t([&] { evb.loop(); });
1715 handler.unregisterHandler();
1722 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1727 base.runInEventBaseThread([&](){
1735 TEST(EventBaseTest, LoopKeepAlive) {
1739 std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
1740 /* sleep override */ std::this_thread::sleep_for(
1741 std::chrono::milliseconds(100));
1742 evb.runInEventBaseThread(
1743 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1753 TEST(EventBaseTest, LoopKeepAliveInLoop) {
1759 evb.runInEventBaseThread([&] {
1760 t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
1761 /* sleep override */ std::this_thread::sleep_for(
1762 std::chrono::milliseconds(100));
1763 evb.runInEventBaseThread(
1764 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1775 TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
1776 std::unique_ptr<EventBase> evb = folly::make_unique<EventBase>();
1780 std::thread evThread([&] {
1787 auto* ev = evb.get();
1788 EventBase::LoopKeepAlive keepAlive;
1789 ev->runInEventBaseThreadAndWait(
1790 [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); });
1791 ASSERT_FALSE(done) << "Loop finished before we asked it to";
1792 ev->terminateLoopSoon();
1793 /* sleep override */
1794 std::this_thread::sleep_for(std::chrono::milliseconds(30));
1795 ASSERT_FALSE(done) << "Loop terminated early";
1796 ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{});
1803 TEST(EventBaseTest, LoopKeepAliveShutdown) {
1804 auto evb = folly::make_unique<EventBase>();
1810 loopKeepAlive = evb->loopKeepAlive(),
1813 /* sleep override */ std::this_thread::sleep_for(
1814 std::chrono::milliseconds(100));
1815 evbPtr->runInEventBaseThread(
1816 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });