2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 #include <folly/Memory.h>
21 #include <folly/ScopeGuard.h>
23 #include <folly/io/async/AsyncTimeout.h>
24 #include <folly/io/async/EventBase.h>
25 #include <folly/io/async/EventHandler.h>
26 #include <folly/io/async/test/SocketPair.h>
27 #include <folly/io/async/test/Util.h>
28 #include <folly/portability/Unistd.h>
30 #include <folly/futures/Promise.h>
41 using std::unique_ptr;
46 using std::chrono::milliseconds;
47 using std::chrono::microseconds;
48 using std::chrono::duration_cast;
50 using namespace folly;
52 ///////////////////////////////////////////////////////////////////////////
53 // Tests for read and write events
54 ///////////////////////////////////////////////////////////////////////////
56 enum { BUF_SIZE = 4096 };
58 ssize_t writeToFD(int fd, size_t length) {
59 // write an arbitrary amount of data to the fd
60 auto bufv = vector<char>(length);
61 auto buf = bufv.data();
62 memset(buf, 'a', length);
63 ssize_t rc = write(fd, buf, length);
68 size_t writeUntilFull(int fd) {
69 // Write to the fd until EAGAIN is returned
70 size_t bytesWritten = 0;
72 memset(buf, 'a', sizeof(buf));
74 ssize_t rc = write(fd, buf, sizeof(buf));
76 CHECK_EQ(errno, EAGAIN);
85 ssize_t readFromFD(int fd, size_t length) {
86 // write an arbitrary amount of data to the fd
87 auto buf = vector<char>(length);
88 return read(fd, buf.data(), length);
91 size_t readUntilEmpty(int fd) {
92 // Read from the fd until EAGAIN is returned
96 int rc = read(fd, buf, sizeof(buf));
98 CHECK(false) << "unexpected EOF";
100 CHECK_EQ(errno, EAGAIN);
109 void checkReadUntilEmpty(int fd, size_t expectedLength) {
110 ASSERT_EQ(readUntilEmpty(fd), expectedLength);
113 struct ScheduledEvent {
119 void perform(int fd) {
120 if (events & EventHandler::READ) {
122 result = readUntilEmpty(fd);
124 result = readFromFD(fd, length);
127 if (events & EventHandler::WRITE) {
129 result = writeUntilFull(fd);
131 result = writeToFD(fd, length);
137 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
138 for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
139 eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
144 class TestHandler : public EventHandler {
146 TestHandler(EventBase* eventBase, int fd)
147 : EventHandler(eventBase, fd), fd_(fd) {}
149 void handlerReady(uint16_t events) noexcept override {
150 ssize_t bytesRead = 0;
151 ssize_t bytesWritten = 0;
153 // Read all available data, so EventBase will stop calling us
154 // until new data becomes available
155 bytesRead = readUntilEmpty(fd_);
157 if (events & WRITE) {
158 // Write until the pipe buffer is full, so EventBase will stop calling
159 // us until the other end has read some data
160 bytesWritten = writeUntilFull(fd_);
163 log.emplace_back(events, bytesRead, bytesWritten);
167 EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
170 , bytesRead(bytesRead)
171 , bytesWritten(bytesWritten) {}
176 ssize_t bytesWritten;
179 deque<EventRecord> log;
188 TEST(EventBaseTest, ReadEvent) {
192 // Register for read events
193 TestHandler handler(&eb, sp[0]);
194 handler.registerHandler(EventHandler::READ);
196 // Register timeouts to perform two write events
197 ScheduledEvent events[] = {
198 { 10, EventHandler::WRITE, 2345, 0 },
199 { 160, EventHandler::WRITE, 99, 0 },
202 scheduleEvents(&eb, sp[1], events);
209 // Since we didn't use the EventHandler::PERSIST flag, the handler should
210 // have received the first read, then unregistered itself. Check that only
211 // the first chunk of data was received.
212 ASSERT_EQ(handler.log.size(), 1);
213 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
214 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
215 milliseconds(events[0].milliseconds), milliseconds(90));
216 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
217 ASSERT_EQ(handler.log[0].bytesWritten, 0);
218 T_CHECK_TIMEOUT(start, end,
219 milliseconds(events[1].milliseconds), milliseconds(30));
221 // Make sure the second chunk of data is still waiting to be read.
222 size_t bytesRemaining = readUntilEmpty(sp[0]);
223 ASSERT_EQ(bytesRemaining, events[1].length);
227 * Test (READ | PERSIST)
229 TEST(EventBaseTest, ReadPersist) {
233 // Register for read events
234 TestHandler handler(&eb, sp[0]);
235 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
237 // Register several timeouts to perform writes
238 ScheduledEvent events[] = {
239 { 10, EventHandler::WRITE, 1024, 0 },
240 { 20, EventHandler::WRITE, 2211, 0 },
241 { 30, EventHandler::WRITE, 4096, 0 },
242 { 100, EventHandler::WRITE, 100, 0 },
245 scheduleEvents(&eb, sp[1], events);
247 // Schedule a timeout to unregister the handler after the third write
248 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
255 // The handler should have received the first 3 events,
256 // then been unregistered after that.
257 ASSERT_EQ(handler.log.size(), 3);
258 for (int n = 0; n < 3; ++n) {
259 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
260 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
261 milliseconds(events[n].milliseconds));
262 ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
263 ASSERT_EQ(handler.log[n].bytesWritten, 0);
265 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
267 // Make sure the data from the last write is still waiting to be read
268 size_t bytesRemaining = readUntilEmpty(sp[0]);
269 ASSERT_EQ(bytesRemaining, events[3].length);
273 * Test registering for READ when the socket is immediately readable
275 TEST(EventBaseTest, ReadImmediate) {
279 // Write some data to the socket so the other end will
280 // be immediately readable
281 size_t dataLength = 1234;
282 writeToFD(sp[1], dataLength);
284 // Register for read events
285 TestHandler handler(&eb, sp[0]);
286 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
288 // Register a timeout to perform another write
289 ScheduledEvent events[] = {
290 { 10, EventHandler::WRITE, 2345, 0 },
293 scheduleEvents(&eb, sp[1], events);
295 // Schedule a timeout to unregister the handler
296 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
303 ASSERT_EQ(handler.log.size(), 2);
305 // There should have been 1 event for immediate readability
306 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
307 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
308 ASSERT_EQ(handler.log[0].bytesRead, dataLength);
309 ASSERT_EQ(handler.log[0].bytesWritten, 0);
311 // There should be another event after the timeout wrote more data
312 ASSERT_EQ(handler.log[1].events, EventHandler::READ);
313 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
314 milliseconds(events[0].milliseconds));
315 ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
316 ASSERT_EQ(handler.log[1].bytesWritten, 0);
318 T_CHECK_TIMEOUT(start, end, milliseconds(20));
324 TEST(EventBaseTest, WriteEvent) {
328 // Fill up the write buffer before starting
329 size_t initialBytesWritten = writeUntilFull(sp[0]);
331 // Register for write events
332 TestHandler handler(&eb, sp[0]);
333 handler.registerHandler(EventHandler::WRITE);
335 // Register timeouts to perform two reads
336 ScheduledEvent events[] = {
337 { 10, EventHandler::READ, 0, 0 },
338 { 60, EventHandler::READ, 0, 0 },
341 scheduleEvents(&eb, sp[1], events);
348 // Since we didn't use the EventHandler::PERSIST flag, the handler should
349 // have only been able to write once, then unregistered itself.
350 ASSERT_EQ(handler.log.size(), 1);
351 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
352 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
353 milliseconds(events[0].milliseconds));
354 ASSERT_EQ(handler.log[0].bytesRead, 0);
355 ASSERT_GT(handler.log[0].bytesWritten, 0);
356 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
358 ASSERT_EQ(events[0].result, initialBytesWritten);
359 ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
363 * Test (WRITE | PERSIST)
365 TEST(EventBaseTest, WritePersist) {
369 // Fill up the write buffer before starting
370 size_t initialBytesWritten = writeUntilFull(sp[0]);
372 // Register for write events
373 TestHandler handler(&eb, sp[0]);
374 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
376 // Register several timeouts to read from the socket at several intervals
377 ScheduledEvent events[] = {
378 { 10, EventHandler::READ, 0, 0 },
379 { 40, EventHandler::READ, 0, 0 },
380 { 70, EventHandler::READ, 0, 0 },
381 { 100, EventHandler::READ, 0, 0 },
384 scheduleEvents(&eb, sp[1], events);
386 // Schedule a timeout to unregister the handler after the third read
387 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
394 // The handler should have received the first 3 events,
395 // then been unregistered after that.
396 ASSERT_EQ(handler.log.size(), 3);
397 ASSERT_EQ(events[0].result, initialBytesWritten);
398 for (int n = 0; n < 3; ++n) {
399 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
400 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
401 milliseconds(events[n].milliseconds));
402 ASSERT_EQ(handler.log[n].bytesRead, 0);
403 ASSERT_GT(handler.log[n].bytesWritten, 0);
404 ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
406 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
410 * Test registering for WRITE when the socket is immediately writable
412 TEST(EventBaseTest, WriteImmediate) {
416 // Register for write events
417 TestHandler handler(&eb, sp[0]);
418 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
420 // Register a timeout to perform a read
421 ScheduledEvent events[] = {
422 { 10, EventHandler::READ, 0, 0 },
425 scheduleEvents(&eb, sp[1], events);
427 // Schedule a timeout to unregister the handler
428 int64_t unregisterTimeout = 40;
429 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
437 ASSERT_EQ(handler.log.size(), 2);
439 // Since the socket buffer was initially empty,
440 // there should have been 1 event for immediate writability
441 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
442 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
443 ASSERT_EQ(handler.log[0].bytesRead, 0);
444 ASSERT_GT(handler.log[0].bytesWritten, 0);
446 // There should be another event after the timeout wrote more data
447 ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
448 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
449 milliseconds(events[0].milliseconds));
450 ASSERT_EQ(handler.log[1].bytesRead, 0);
451 ASSERT_GT(handler.log[1].bytesWritten, 0);
453 T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
457 * Test (READ | WRITE) when the socket becomes readable first
459 TEST(EventBaseTest, ReadWrite) {
463 // Fill up the write buffer before starting
464 size_t sock0WriteLength = writeUntilFull(sp[0]);
466 // Register for read and write events
467 TestHandler handler(&eb, sp[0]);
468 handler.registerHandler(EventHandler::READ_WRITE);
470 // Register timeouts to perform a write then a read.
471 ScheduledEvent events[] = {
472 { 10, EventHandler::WRITE, 2345, 0 },
473 { 40, EventHandler::READ, 0, 0 },
476 scheduleEvents(&eb, sp[1], events);
483 // Since we didn't use the EventHandler::PERSIST flag, the handler should
484 // have only noticed readability, then unregistered itself. Check that only
485 // one event was logged.
486 ASSERT_EQ(handler.log.size(), 1);
487 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
488 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
489 milliseconds(events[0].milliseconds));
490 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
491 ASSERT_EQ(handler.log[0].bytesWritten, 0);
492 ASSERT_EQ(events[1].result, sock0WriteLength);
493 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
497 * Test (READ | WRITE) when the socket becomes writable first
499 TEST(EventBaseTest, WriteRead) {
503 // Fill up the write buffer before starting
504 size_t sock0WriteLength = writeUntilFull(sp[0]);
506 // Register for read and write events
507 TestHandler handler(&eb, sp[0]);
508 handler.registerHandler(EventHandler::READ_WRITE);
510 // Register timeouts to perform a read then a write.
511 size_t sock1WriteLength = 2345;
512 ScheduledEvent events[] = {
513 { 10, EventHandler::READ, 0, 0 },
514 { 40, EventHandler::WRITE, sock1WriteLength, 0 },
517 scheduleEvents(&eb, sp[1], events);
524 // Since we didn't use the EventHandler::PERSIST flag, the handler should
525 // have only noticed writability, then unregistered itself. Check that only
526 // one event was logged.
527 ASSERT_EQ(handler.log.size(), 1);
528 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
529 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
530 milliseconds(events[0].milliseconds));
531 ASSERT_EQ(handler.log[0].bytesRead, 0);
532 ASSERT_GT(handler.log[0].bytesWritten, 0);
533 ASSERT_EQ(events[0].result, sock0WriteLength);
534 ASSERT_EQ(events[1].result, sock1WriteLength);
535 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
537 // Make sure the written data is still waiting to be read.
538 size_t bytesRemaining = readUntilEmpty(sp[0]);
539 ASSERT_EQ(bytesRemaining, events[1].length);
543 * Test (READ | WRITE) when the socket becomes readable and writable
546 TEST(EventBaseTest, ReadWriteSimultaneous) {
550 // Fill up the write buffer before starting
551 size_t sock0WriteLength = writeUntilFull(sp[0]);
553 // Register for read and write events
554 TestHandler handler(&eb, sp[0]);
555 handler.registerHandler(EventHandler::READ_WRITE);
557 // Register a timeout to perform a read and write together
558 ScheduledEvent events[] = {
559 { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
562 scheduleEvents(&eb, sp[1], events);
569 // It's not strictly required that the EventBase register us about both
570 // events in the same call. So, it's possible that if the EventBase
571 // implementation changes this test could start failing, and it wouldn't be
572 // considered breaking the API. However for now it's nice to exercise this
574 ASSERT_EQ(handler.log.size(), 1);
575 ASSERT_EQ(handler.log[0].events,
576 EventHandler::READ | EventHandler::WRITE);
577 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
578 milliseconds(events[0].milliseconds));
579 ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
580 ASSERT_GT(handler.log[0].bytesWritten, 0);
581 T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
585 * Test (READ | WRITE | PERSIST)
587 TEST(EventBaseTest, ReadWritePersist) {
591 // Register for read and write events
592 TestHandler handler(&eb, sp[0]);
593 handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
594 EventHandler::PERSIST);
596 // Register timeouts to perform several reads and writes
597 ScheduledEvent events[] = {
598 { 10, EventHandler::WRITE, 2345, 0 },
599 { 20, EventHandler::READ, 0, 0 },
600 { 35, EventHandler::WRITE, 200, 0 },
601 { 45, EventHandler::WRITE, 15, 0 },
602 { 55, EventHandler::READ, 0, 0 },
603 { 120, EventHandler::WRITE, 2345, 0 },
606 scheduleEvents(&eb, sp[1], events);
608 // Schedule a timeout to unregister the handler
609 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
616 ASSERT_EQ(handler.log.size(), 6);
618 // Since we didn't fill up the write buffer immediately, there should
619 // be an immediate event for writability.
620 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
621 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
622 ASSERT_EQ(handler.log[0].bytesRead, 0);
623 ASSERT_GT(handler.log[0].bytesWritten, 0);
625 // Events 1 through 5 should correspond to the scheduled events
626 for (int n = 1; n < 6; ++n) {
627 ScheduledEvent* event = &events[n - 1];
628 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
629 milliseconds(event->milliseconds));
630 if (event->events == EventHandler::READ) {
631 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
632 ASSERT_EQ(handler.log[n].bytesRead, 0);
633 ASSERT_GT(handler.log[n].bytesWritten, 0);
635 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
636 ASSERT_EQ(handler.log[n].bytesRead, event->length);
637 ASSERT_EQ(handler.log[n].bytesWritten, 0);
641 // The timeout should have unregistered the handler before the last write.
642 // Make sure that data is still waiting to be read
643 size_t bytesRemaining = readUntilEmpty(sp[0]);
644 ASSERT_EQ(bytesRemaining, events[5].length);
648 class PartialReadHandler : public TestHandler {
650 PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
651 : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
653 void handlerReady(uint16_t events) noexcept override {
654 assert(events == EventHandler::READ);
655 ssize_t bytesRead = readFromFD(fd_, readLength_);
656 log.emplace_back(events, bytesRead, 0);
665 * Test reading only part of the available data when a read event is fired.
666 * When PERSIST is used, make sure the handler gets notified again the next
667 * time around the loop.
669 TEST(EventBaseTest, ReadPartial) {
673 // Register for read events
674 size_t readLength = 100;
675 PartialReadHandler handler(&eb, sp[0], readLength);
676 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
678 // Register a timeout to perform a single write,
679 // with more data than PartialReadHandler will read at once
680 ScheduledEvent events[] = {
681 { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
684 scheduleEvents(&eb, sp[1], events);
686 // Schedule a timeout to unregister the handler
687 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
694 ASSERT_EQ(handler.log.size(), 4);
696 // The first 3 invocations should read readLength bytes each
697 for (int n = 0; n < 3; ++n) {
698 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
699 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
700 milliseconds(events[0].milliseconds));
701 ASSERT_EQ(handler.log[n].bytesRead, readLength);
702 ASSERT_EQ(handler.log[n].bytesWritten, 0);
704 // The last read only has readLength/2 bytes
705 ASSERT_EQ(handler.log[3].events, EventHandler::READ);
706 T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
707 milliseconds(events[0].milliseconds));
708 ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
709 ASSERT_EQ(handler.log[3].bytesWritten, 0);
713 class PartialWriteHandler : public TestHandler {
715 PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
716 : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
718 void handlerReady(uint16_t events) noexcept override {
719 assert(events == EventHandler::WRITE);
720 ssize_t bytesWritten = writeToFD(fd_, writeLength_);
721 log.emplace_back(events, 0, bytesWritten);
730 * Test writing without completely filling up the write buffer when the fd
731 * becomes writable. When PERSIST is used, make sure the handler gets
732 * notified again the next time around the loop.
734 TEST(EventBaseTest, WritePartial) {
738 // Fill up the write buffer before starting
739 size_t initialBytesWritten = writeUntilFull(sp[0]);
741 // Register for write events
742 size_t writeLength = 100;
743 PartialWriteHandler handler(&eb, sp[0], writeLength);
744 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
746 // Register a timeout to read, so that more data can be written
747 ScheduledEvent events[] = {
748 { 10, EventHandler::READ, 0, 0 },
751 scheduleEvents(&eb, sp[1], events);
753 // Schedule a timeout to unregister the handler
754 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
761 // Depending on how big the socket buffer is, there will be multiple writes
762 // Only check the first 5
764 ASSERT_GE(handler.log.size(), numChecked);
765 ASSERT_EQ(events[0].result, initialBytesWritten);
767 // The first 3 invocations should read writeLength bytes each
768 for (int n = 0; n < numChecked; ++n) {
769 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
770 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
771 milliseconds(events[0].milliseconds));
772 ASSERT_EQ(handler.log[n].bytesRead, 0);
773 ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
779 * Test destroying a registered EventHandler
781 TEST(EventBaseTest, DestroyHandler) {
782 class DestroyHandler : public AsyncTimeout {
784 DestroyHandler(EventBase* eb, EventHandler* h)
788 void timeoutExpired() noexcept override { delete handler_; }
791 EventHandler* handler_;
797 // Fill up the write buffer before starting
798 size_t initialBytesWritten = writeUntilFull(sp[0]);
800 // Register for write events
801 TestHandler* handler = new TestHandler(&eb, sp[0]);
802 handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
804 // After 10ms, read some data, so that the handler
805 // will be notified that it can write.
806 eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
809 // Start a timer to destroy the handler after 25ms
810 // This mainly just makes sure the code doesn't break or assert
811 DestroyHandler dh(&eb, handler);
812 dh.scheduleTimeout(25);
818 // Make sure the EventHandler was uninstalled properly when it was
819 // destroyed, and the EventBase loop exited
820 T_CHECK_TIMEOUT(start, end, milliseconds(25));
822 // Make sure that the handler wrote data to the socket
823 // before it was destroyed
824 size_t bytesRemaining = readUntilEmpty(sp[1]);
825 ASSERT_GT(bytesRemaining, 0);
829 ///////////////////////////////////////////////////////////////////////////
830 // Tests for timeout events
831 ///////////////////////////////////////////////////////////////////////////
833 TEST(EventBaseTest, RunAfterDelay) {
836 TimePoint timestamp1(false);
837 TimePoint timestamp2(false);
838 TimePoint timestamp3(false);
839 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
840 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
841 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 40);
847 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
848 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
849 T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
850 T_CHECK_TIMEOUT(start, end, milliseconds(40));
854 * Test the behavior of tryRunAfterDelay() when some timeouts are
855 * still scheduled when the EventBase is destroyed.
857 TEST(EventBaseTest, RunAfterDelayDestruction) {
858 TimePoint timestamp1(false);
859 TimePoint timestamp2(false);
860 TimePoint timestamp3(false);
861 TimePoint timestamp4(false);
862 TimePoint start(false);
863 TimePoint end(false);
868 // Run two normal timeouts
869 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
870 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
872 // Schedule a timeout to stop the event loop after 40ms
873 eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
875 // Schedule 2 timeouts that would fire after the event loop stops
876 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 80);
877 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp4), 160);
884 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
885 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
886 T_CHECK_TIMEOUT(start, end, milliseconds(40));
888 ASSERT_TRUE(timestamp3.isUnset());
889 ASSERT_TRUE(timestamp4.isUnset());
891 // Ideally this test should be run under valgrind to ensure that no
895 class TestTimeout : public AsyncTimeout {
897 explicit TestTimeout(EventBase* eventBase)
898 : AsyncTimeout(eventBase)
899 , timestamp(false) {}
901 void timeoutExpired() noexcept override { timestamp.reset(); }
906 TEST(EventBaseTest, BasicTimeouts) {
912 t1.scheduleTimeout(10);
913 t2.scheduleTimeout(20);
914 t3.scheduleTimeout(40);
920 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
921 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
922 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
923 T_CHECK_TIMEOUT(start, end, milliseconds(40));
926 class ReschedulingTimeout : public AsyncTimeout {
928 ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
930 , timeouts_(timeouts)
931 , iterator_(timeouts_.begin()) {}
937 void timeoutExpired() noexcept override {
938 timestamps.emplace_back();
943 if (iterator_ != timeouts_.end()) {
944 uint32_t timeout = *iterator_;
946 scheduleTimeout(timeout);
950 vector<TimePoint> timestamps;
953 vector<uint32_t> timeouts_;
954 vector<uint32_t>::const_iterator iterator_;
958 * Test rescheduling the same timeout multiple times
960 TEST(EventBaseTest, ReuseTimeout) {
963 vector<uint32_t> timeouts;
964 timeouts.push_back(10);
965 timeouts.push_back(30);
966 timeouts.push_back(15);
968 ReschedulingTimeout t(&eb, timeouts);
975 // Use a higher tolerance than usual. We're waiting on 3 timeouts
976 // consecutively. In general, each timeout may go over by a few
977 // milliseconds, and we're tripling this error by witing on 3 timeouts.
978 milliseconds tolerance{6};
980 ASSERT_EQ(timeouts.size(), t.timestamps.size());
982 for (size_t n = 0; n < timeouts.size(); ++n) {
983 total += timeouts[n];
984 T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
986 T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
990 * Test rescheduling a timeout before it has fired
992 TEST(EventBaseTest, RescheduleTimeout) {
999 t1.scheduleTimeout(15);
1000 t2.scheduleTimeout(30);
1001 t3.scheduleTimeout(30);
1003 auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1004 &AsyncTimeout::scheduleTimeout);
1006 // after 10ms, reschedule t2 to run sooner than originally scheduled
1007 eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1008 // after 10ms, reschedule t3 to run later than originally scheduled
1009 eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1015 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1016 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1017 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1018 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1022 * Test cancelling a timeout
1024 TEST(EventBaseTest, CancelTimeout) {
1027 vector<uint32_t> timeouts;
1028 timeouts.push_back(10);
1029 timeouts.push_back(30);
1030 timeouts.push_back(25);
1032 ReschedulingTimeout t(&eb, timeouts);
1034 eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1040 ASSERT_EQ(t.timestamps.size(), 2);
1041 T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1042 T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1043 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1047 * Test destroying a scheduled timeout object
1049 TEST(EventBaseTest, DestroyTimeout) {
1050 class DestroyTimeout : public AsyncTimeout {
1052 DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1056 void timeoutExpired() noexcept override { delete timeout_; }
1059 AsyncTimeout* timeout_;
1064 TestTimeout* t1 = new TestTimeout(&eb);
1065 t1->scheduleTimeout(30);
1067 DestroyTimeout dt(&eb, t1);
1068 dt.scheduleTimeout(10);
1074 T_CHECK_TIMEOUT(start, end, milliseconds(10));
1078 ///////////////////////////////////////////////////////////////////////////
1079 // Test for runInThreadTestFunc()
1080 ///////////////////////////////////////////////////////////////////////////
1082 struct RunInThreadData {
1083 RunInThreadData(int numThreads, int opsPerThread)
1084 : opsPerThread(opsPerThread)
1085 , opsToGo(numThreads*opsPerThread) {}
1088 deque< pair<int, int> > values;
1094 struct RunInThreadArg {
1095 RunInThreadArg(RunInThreadData* data,
1102 RunInThreadData* data;
1107 void runInThreadTestFunc(RunInThreadArg* arg) {
1108 arg->data->values.emplace_back(arg->thread, arg->value);
1109 RunInThreadData* data = arg->data;
1112 if(--data->opsToGo == 0) {
1113 // Break out of the event base loop if we are the last thread running
1114 data->evb.terminateLoopSoon();
1118 TEST(EventBaseTest, RunInThread) {
1119 constexpr uint32_t numThreads = 50;
1120 constexpr uint32_t opsPerThread = 100;
1121 RunInThreadData data(numThreads, opsPerThread);
1123 deque<std::thread> threads;
1125 // Wait on all of the threads.
1126 for (auto& thread : threads) {
1131 for (uint32_t i = 0; i < numThreads; ++i) {
1132 threads.emplace_back([i, &data] {
1133 for (int n = 0; n < data.opsPerThread; ++n) {
1134 RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1135 data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1141 // Add a timeout event to run after 3 seconds.
1142 // Otherwise loop() will return immediately since there are no events to run.
1143 // Once the last thread exits, it will stop the loop(). However, this
1144 // timeout also stops the loop in case there is a bug performing the normal
1146 data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1153 // Verify that the loop exited because all threads finished and requested it
1154 // to stop. This should happen much sooner than the 3 second timeout.
1155 // Assert that it happens in under a second. (This is still tons of extra
1158 auto timeTaken = std::chrono::duration_cast<milliseconds>(
1159 end.getTime() - start.getTime());
1160 ASSERT_LT(timeTaken.count(), 1000);
1161 VLOG(11) << "Time taken: " << timeTaken.count();
1163 // Verify that we have all of the events from every thread
1164 int expectedValues[numThreads];
1165 for (uint32_t n = 0; n < numThreads; ++n) {
1166 expectedValues[n] = 0;
1168 for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1169 it != data.values.end();
1171 int threadID = it->first;
1172 int value = it->second;
1173 ASSERT_EQ(expectedValues[threadID], value);
1174 ++expectedValues[threadID];
1176 for (uint32_t n = 0; n < numThreads; ++n) {
1177 ASSERT_EQ(expectedValues[n], opsPerThread);
1181 // This test simulates some calls, and verifies that the waiting happens by
1182 // triggering what otherwise would be race conditions, and trying to detect
1183 // whether any of the race conditions happened.
1184 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1185 const size_t c = 256;
1186 vector<unique_ptr<atomic<size_t>>> atoms(c);
1187 for (size_t i = 0; i < c; ++i) {
1188 auto& atom = atoms.at(i);
1189 atom = make_unique<atomic<size_t>>(0);
1191 vector<thread> threads;
1192 for (size_t i = 0; i < c; ++i) {
1193 threads.emplace_back([&atoms, i] {
1195 auto& atom = *atoms.at(i);
1196 auto ebth = thread([&] { eb.loopForever(); });
1197 eb.waitUntilRunning();
1198 eb.runInEventBaseThreadAndWait([&] {
1200 atom.compare_exchange_weak(
1201 x, 1, std::memory_order_release, std::memory_order_relaxed);
1204 atom.compare_exchange_weak(
1205 x, 2, std::memory_order_release, std::memory_order_relaxed);
1206 eb.terminateLoopSoon();
1210 for (size_t i = 0; i < c; ++i) {
1211 auto& th = threads.at(i);
1215 for (auto& atom : atoms) sum += *atom;
1219 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
1221 thread th(&EventBase::loopForever, &eb);
1223 eb.terminateLoopSoon();
1226 auto mutated = false;
1227 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1230 EXPECT_TRUE(mutated);
1233 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
1235 thread th(&EventBase::loopForever, &eb);
1237 eb.terminateLoopSoon();
1240 eb.runInEventBaseThreadAndWait([&] {
1241 auto mutated = false;
1242 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1245 EXPECT_TRUE(mutated);
1249 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1251 auto mutated = false;
1252 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1255 EXPECT_TRUE(mutated);
1258 ///////////////////////////////////////////////////////////////////////////
1259 // Tests for runInLoop()
1260 ///////////////////////////////////////////////////////////////////////////
1262 class CountedLoopCallback : public EventBase::LoopCallback {
1264 CountedLoopCallback(EventBase* eventBase,
1266 std::function<void()> action =
1267 std::function<void()>())
1268 : eventBase_(eventBase)
1270 , action_(action) {}
1272 void runLoopCallback() noexcept override {
1275 eventBase_->runInLoop(this);
1276 } else if (action_) {
1281 unsigned int getCount() const {
1286 EventBase* eventBase_;
1287 unsigned int count_;
1288 std::function<void()> action_;
1291 // Test that EventBase::loop() doesn't exit while there are
1292 // still LoopCallbacks remaining to be invoked.
1293 TEST(EventBaseTest, RepeatedRunInLoop) {
1294 EventBase eventBase;
1296 CountedLoopCallback c(&eventBase, 10);
1297 eventBase.runInLoop(&c);
1298 // The callback shouldn't have run immediately
1299 ASSERT_EQ(c.getCount(), 10);
1302 // loop() should loop until the CountedLoopCallback stops
1303 // re-installing itself.
1304 ASSERT_EQ(c.getCount(), 0);
1307 // Test that EventBase::loop() works as expected without time measurements.
1308 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
1309 EventBase eventBase(false);
1311 CountedLoopCallback c(&eventBase, 10);
1312 eventBase.runInLoop(&c);
1313 // The callback shouldn't have run immediately
1314 ASSERT_EQ(c.getCount(), 10);
1317 // loop() should loop until the CountedLoopCallback stops
1318 // re-installing itself.
1319 ASSERT_EQ(c.getCount(), 0);
1322 // Test runInLoop() calls with terminateLoopSoon()
1323 TEST(EventBaseTest, RunInLoopStopLoop) {
1324 EventBase eventBase;
1326 CountedLoopCallback c1(&eventBase, 20);
1327 CountedLoopCallback c2(&eventBase, 10,
1328 std::bind(&EventBase::terminateLoopSoon, &eventBase));
1330 eventBase.runInLoop(&c1);
1331 eventBase.runInLoop(&c2);
1332 ASSERT_EQ(c1.getCount(), 20);
1333 ASSERT_EQ(c2.getCount(), 10);
1335 eventBase.loopForever();
1337 // c2 should have stopped the loop after 10 iterations
1338 ASSERT_EQ(c2.getCount(), 0);
1340 // We allow the EventBase to run the loop callbacks in whatever order it
1341 // chooses. We'll accept c1's count being either 10 (if the loop terminated
1342 // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1345 // (With the current code, c1 will always run 10 times, but we don't consider
1346 // this a hard API requirement.)
1347 ASSERT_GE(c1.getCount(), 10);
1348 ASSERT_LE(c1.getCount(), 11);
1351 TEST(EventBaseTest, TryRunningAfterTerminate) {
1352 EventBase eventBase;
1353 CountedLoopCallback c1(&eventBase, 1,
1354 std::bind(&EventBase::terminateLoopSoon, &eventBase));
1355 eventBase.runInLoop(&c1);
1356 eventBase.loopForever();
1358 eventBase.runInEventBaseThread([&]() {
1365 // Test cancelling runInLoop() callbacks
1366 TEST(EventBaseTest, CancelRunInLoop) {
1367 EventBase eventBase;
1369 CountedLoopCallback c1(&eventBase, 20);
1370 CountedLoopCallback c2(&eventBase, 20);
1371 CountedLoopCallback c3(&eventBase, 20);
1373 std::function<void()> cancelC1Action =
1374 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1375 std::function<void()> cancelC2Action =
1376 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1378 CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1379 CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1381 // Install cancelC1 after c1
1382 eventBase.runInLoop(&c1);
1383 eventBase.runInLoop(&cancelC1);
1385 // Install cancelC2 before c2
1386 eventBase.runInLoop(&cancelC2);
1387 eventBase.runInLoop(&c2);
1390 eventBase.runInLoop(&c3);
1392 ASSERT_EQ(c1.getCount(), 20);
1393 ASSERT_EQ(c2.getCount(), 20);
1394 ASSERT_EQ(c3.getCount(), 20);
1395 ASSERT_EQ(cancelC1.getCount(), 10);
1396 ASSERT_EQ(cancelC2.getCount(), 10);
1401 // cancelC1 and cancelC3 should have both fired after 10 iterations and
1402 // stopped re-installing themselves
1403 ASSERT_EQ(cancelC1.getCount(), 0);
1404 ASSERT_EQ(cancelC2.getCount(), 0);
1405 // c3 should have continued on for the full 20 iterations
1406 ASSERT_EQ(c3.getCount(), 0);
1408 // c1 and c2 should have both been cancelled on the 10th iteration.
1410 // Callbacks are always run in the order they are installed,
1411 // so c1 should have fired 10 times, and been canceled after it ran on the
1412 // 10th iteration. c2 should have only fired 9 times, because cancelC2 will
1413 // have run before it on the 10th iteration, and cancelled it before it
1415 ASSERT_EQ(c1.getCount(), 10);
1416 ASSERT_EQ(c2.getCount(), 11);
1419 class TerminateTestCallback : public EventBase::LoopCallback,
1420 public EventHandler {
1422 TerminateTestCallback(EventBase* eventBase, int fd)
1423 : EventHandler(eventBase, fd),
1424 eventBase_(eventBase),
1425 loopInvocations_(0),
1426 maxLoopInvocations_(0),
1427 eventInvocations_(0),
1428 maxEventInvocations_(0) {}
1430 void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1431 loopInvocations_ = 0;
1432 maxLoopInvocations_ = maxLoopInvocations;
1433 eventInvocations_ = 0;
1434 maxEventInvocations_ = maxEventInvocations;
1436 cancelLoopCallback();
1437 unregisterHandler();
1440 void handlerReady(uint16_t /* events */) noexcept override {
1441 // We didn't register with PERSIST, so we will have been automatically
1442 // unregistered already.
1443 ASSERT_FALSE(isHandlerRegistered());
1445 ++eventInvocations_;
1446 if (eventInvocations_ >= maxEventInvocations_) {
1450 eventBase_->runInLoop(this);
1452 void runLoopCallback() noexcept override {
1454 if (loopInvocations_ >= maxLoopInvocations_) {
1458 registerHandler(READ);
1461 uint32_t getLoopInvocations() const {
1462 return loopInvocations_;
1464 uint32_t getEventInvocations() const {
1465 return eventInvocations_;
1469 EventBase* eventBase_;
1470 uint32_t loopInvocations_;
1471 uint32_t maxLoopInvocations_;
1472 uint32_t eventInvocations_;
1473 uint32_t maxEventInvocations_;
1477 * Test that EventBase::loop() correctly detects when there are no more events
1480 * This uses a single callback, which alternates registering itself as a loop
1481 * callback versus a EventHandler callback. This exercises a regression where
1482 * EventBase::loop() incorrectly exited if there were no more fd handlers
1483 * registered, but a loop callback installed a new fd handler.
1485 TEST(EventBaseTest, LoopTermination) {
1486 EventBase eventBase;
1488 // Open a pipe and close the write end,
1489 // so the read endpoint will be readable
1491 int rc = pipe(pipeFds);
1494 TerminateTestCallback callback(&eventBase, pipeFds[0]);
1496 // Test once where the callback will exit after a loop callback
1497 callback.reset(10, 100);
1498 eventBase.runInLoop(&callback);
1500 ASSERT_EQ(callback.getLoopInvocations(), 10);
1501 ASSERT_EQ(callback.getEventInvocations(), 9);
1503 // Test once where the callback will exit after an fd event callback
1504 callback.reset(100, 7);
1505 eventBase.runInLoop(&callback);
1507 ASSERT_EQ(callback.getLoopInvocations(), 7);
1508 ASSERT_EQ(callback.getEventInvocations(), 7);
1513 ///////////////////////////////////////////////////////////////////////////
1514 // Tests for latency calculations
1515 ///////////////////////////////////////////////////////////////////////////
1517 class IdleTimeTimeoutSeries : public AsyncTimeout {
1521 explicit IdleTimeTimeoutSeries(EventBase *base,
1522 std::deque<std::uint64_t>& timeout) :
1529 ~IdleTimeTimeoutSeries() override {}
1531 void timeoutExpired() noexcept override {
1534 if(timeout_.empty()){
1537 uint64_t sleepTime = timeout_.front();
1538 timeout_.pop_front();
1546 int getTimeouts() const {
1552 std::deque<uint64_t>& timeout_;
1556 * Verify that idle time is correctly accounted for when decaying our loop
1559 * This works by creating a high loop time (via usleep), expecting a latency
1560 * callback with known value, and then scheduling a timeout for later. This
1561 * later timeout is far enough in the future that the idle time should have
1562 * caused the loop time to decay.
1564 TEST(EventBaseTest, IdleTime) {
1565 EventBase eventBase;
1566 eventBase.setLoadAvgMsec(1000);
1567 eventBase.resetLoadAvg(5900.0);
1568 std::deque<uint64_t> timeouts0(4, 8080);
1569 timeouts0.push_front(8000);
1570 timeouts0.push_back(14000);
1571 IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1572 std::deque<uint64_t> timeouts(20, 20);
1573 std::unique_ptr<IdleTimeTimeoutSeries> tos;
1574 int64_t testStart = duration_cast<microseconds>(
1575 std::chrono::steady_clock::now().time_since_epoch()).count();
1576 bool hostOverloaded = false;
1578 int latencyCallbacks = 0;
1579 eventBase.setMaxLatency(6000, [&]() {
1582 switch (latencyCallbacks) {
1584 if (tos0.getTimeouts() < 6) {
1585 // This could only happen if the host this test is running
1586 // on is heavily loaded.
1587 int64_t maxLatencyReached = duration_cast<microseconds>(
1588 std::chrono::steady_clock::now().time_since_epoch()).count();
1589 ASSERT_LE(43800, maxLatencyReached - testStart);
1590 hostOverloaded = true;
1593 ASSERT_EQ(6, tos0.getTimeouts());
1594 ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1595 ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1596 tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1600 FAIL() << "Unexpected latency callback";
1605 // Kick things off with an "immedite" timeout
1606 tos0.scheduleTimeout(1);
1610 if (hostOverloaded) {
1614 ASSERT_EQ(1, latencyCallbacks);
1615 ASSERT_EQ(7, tos0.getTimeouts());
1616 ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1617 ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1619 ASSERT_EQ(21, tos->getTimeouts());
1623 * Test that thisLoop functionality works with terminateLoopSoon
1625 TEST(EventBaseTest, ThisLoop) {
1627 bool runInLoop = false;
1628 bool runThisLoop = false;
1631 eb.terminateLoopSoon();
1632 eb.runInLoop([&]() {
1635 eb.runInLoop([&]() {
1642 ASSERT_FALSE(runInLoop);
1643 // Should work with thisLoop
1644 ASSERT_TRUE(runThisLoop);
1647 TEST(EventBaseTest, EventBaseThreadLoop) {
1651 base.runInEventBaseThread([&](){
1656 ASSERT_EQ(true, ran);
1659 TEST(EventBaseTest, EventBaseThreadName) {
1661 base.setName("foo");
1664 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1666 pthread_getname_np(pthread_self(), name, 16);
1667 ASSERT_EQ(0, strcmp("foo", name));
1671 TEST(EventBaseTest, RunBeforeLoop) {
1673 CountedLoopCallback cb(&base, 1, [&](){
1674 base.terminateLoopSoon();
1676 base.runBeforeLoop(&cb);
1678 ASSERT_EQ(cb.getCount(), 0);
1681 TEST(EventBaseTest, RunBeforeLoopWait) {
1683 CountedLoopCallback cb(&base, 1);
1684 base.tryRunAfterDelay([&](){
1685 base.terminateLoopSoon();
1687 base.runBeforeLoop(&cb);
1690 // Check that we only ran once, and did not loop multiple times.
1691 ASSERT_EQ(cb.getCount(), 0);
1694 class PipeHandler : public EventHandler {
1696 PipeHandler(EventBase* eventBase, int fd)
1697 : EventHandler(eventBase, fd) {}
1699 void handlerReady(uint16_t /* events */) noexcept override { abort(); }
1702 TEST(EventBaseTest, StopBeforeLoop) {
1705 // Give the evb something to do.
1707 ASSERT_EQ(0, pipe(p));
1708 PipeHandler handler(&evb, p[0]);
1709 handler.registerHandler(EventHandler::READ);
1711 // It's definitely not running yet
1712 evb.terminateLoopSoon();
1714 // let it run, it should exit quickly.
1715 std::thread t([&] { evb.loop(); });
1718 handler.unregisterHandler();
1725 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1730 base.runInEventBaseThread([&](){
1738 TEST(EventBaseTest, LoopKeepAlive) {
1742 std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
1743 /* sleep override */ std::this_thread::sleep_for(
1744 std::chrono::milliseconds(100));
1745 evb.runInEventBaseThread(
1746 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1756 TEST(EventBaseTest, LoopKeepAliveInLoop) {
1762 evb.runInEventBaseThread([&] {
1763 t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
1764 /* sleep override */ std::this_thread::sleep_for(
1765 std::chrono::milliseconds(100));
1766 evb.runInEventBaseThread(
1767 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1778 TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
1779 std::unique_ptr<EventBase> evb = folly::make_unique<EventBase>();
1783 std::thread evThread([&] {
1790 auto* ev = evb.get();
1791 EventBase::LoopKeepAlive keepAlive;
1792 ev->runInEventBaseThreadAndWait(
1793 [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); });
1794 ASSERT_FALSE(done) << "Loop finished before we asked it to";
1795 ev->terminateLoopSoon();
1796 /* sleep override */
1797 std::this_thread::sleep_for(std::chrono::milliseconds(30));
1798 ASSERT_FALSE(done) << "Loop terminated early";
1799 ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{});
1806 TEST(EventBaseTest, LoopKeepAliveShutdown) {
1807 auto evb = folly::make_unique<EventBase>();
1813 loopKeepAlive = evb->loopKeepAlive(),
1816 /* sleep override */ std::this_thread::sleep_for(
1817 std::chrono::milliseconds(100));
1818 evbPtr->runInEventBaseThread(
1819 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
1829 TEST(EventBaseTest, DrivableExecutorTest) {
1830 folly::Promise<bool> p;
1831 auto f = p.getFuture();
1833 bool finished = false;
1836 /* sleep override */
1837 std::this_thread::sleep_for(std::chrono::microseconds(10));
1839 base.runInEventBaseThread([&]() { p.setValue(true); });
1842 // Ensure drive does not busy wait
1843 base.drive(); // TODO: fix notification queue init() extra wakeup
1845 EXPECT_TRUE(finished);
1847 folly::Promise<bool> p2;
1848 auto f2 = p2.getFuture();
1849 // Ensure waitVia gets woken up properly, even from
1850 // a separate thread.
1851 base.runAfterDelay([&]() { p2.setValue(true); }, 10);
1853 EXPECT_TRUE(f2.isReady());
1858 TEST(EventBaseTest, RequestContextTest) {
1860 auto defaultCtx = RequestContext::get();
1863 RequestContextScopeGuard rctx;
1864 auto context = RequestContext::get();
1865 EXPECT_NE(defaultCtx, context);
1866 evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
1869 EXPECT_EQ(defaultCtx, RequestContext::get());
1871 EXPECT_EQ(defaultCtx, RequestContext::get());