2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 #include <folly/Memory.h>
21 #include <folly/ScopeGuard.h>
23 #include <folly/io/async/AsyncTimeout.h>
24 #include <folly/io/async/EventBase.h>
25 #include <folly/io/async/EventHandler.h>
26 #include <folly/io/async/test/SocketPair.h>
27 #include <folly/io/async/test/Util.h>
28 #include <folly/portability/Unistd.h>
30 #include <folly/futures/Promise.h>
41 using std::unique_ptr;
46 using std::chrono::milliseconds;
47 using std::chrono::microseconds;
48 using std::chrono::duration_cast;
50 using namespace std::chrono_literals;
52 using namespace folly;
54 ///////////////////////////////////////////////////////////////////////////
55 // Tests for read and write events
56 ///////////////////////////////////////////////////////////////////////////
58 enum { BUF_SIZE = 4096 };
// Test helper: issues a single write() of `length` bytes, all set to 'a',
// to `fd`. The data comes from a temporary vector sized to `length`.
60 ssize_t writeToFD(int fd, size_t length) {
61 // write an arbitrary amount of data to the fd
62 auto bufv = vector<char>(length);
63 auto buf = bufv.data();
64 memset(buf, 'a', length);
65 ssize_t rc = write(fd, buf, length);
// Test helper: writes blocks of 'a' bytes to `fd` and tracks the total in
// bytesWritten. Per the comment below, writing continues until EAGAIN.
70 size_t writeUntilFull(int fd) {
71 // Write to the fd until EAGAIN is returned
72 size_t bytesWritten = 0;
74 memset(buf, 'a', sizeof(buf));
76 ssize_t rc = write(fd, buf, sizeof(buf));
// EAGAIN is the only errno value accepted at this point.
78 CHECK_EQ(errno, EAGAIN);
// Test helper: performs a single read() of up to `length` bytes from `fd`
// into a temporary buffer and returns read()'s result. The data itself is
// discarded.
87 ssize_t readFromFD(int fd, size_t length) {
88 // read an arbitrary amount of data from the fd
89 auto buf = vector<char>(length);
90 return read(fd, buf.data(), length);
// Test helper: drains `fd`. Per the comment below, it keeps reading until
// EAGAIN is returned.
93 size_t readUntilEmpty(int fd) {
94 // Read from the fd until EAGAIN is returned
// NOTE(review): read() returns ssize_t; rc is declared as int here.
98 int rc = read(fd, buf, sizeof(buf));
// Hitting EOF is treated as a fatal test failure.
100 CHECK(false) << "unexpected EOF";
// EAGAIN is the only errno value accepted at this point.
102 CHECK_EQ(errno, EAGAIN);
// Asserts that readUntilEmpty(fd) returns exactly `expectedLength`.
// Its void signature allows it to be bound as a delayed callback.
111 void checkReadUntilEmpty(int fd, size_t expectedLength) {
112 ASSERT_EQ(readUntilEmpty(fd), expectedLength);
// One I/O action to be run against a file descriptor at a scheduled time.
// The tests initialize it with four values; presumably these are
// milliseconds, events, length and result, in that order (the field
// declarations are not visible here, confirm against the full struct).
115 struct ScheduledEvent {
// Runs the action on `fd` and stores the helper's return value in `result`.
// READ uses readUntilEmpty() or readFromFD(); WRITE uses writeUntilFull()
// or writeToFD(). The two flag checks are independent, so an entry with
// both bits set does both.
121 void perform(int fd) {
122 if (events & EventHandler::READ) {
124 result = readUntilEmpty(fd);
126 result = readFromFD(fd, length);
129 if (events & EventHandler::WRITE) {
131 result = writeUntilFull(fd);
133 result = writeToFD(fd, length);
// Schedules ScheduledEvent::perform(fd) on `eventBase` for each entry in
// `events`. Iteration stops at the first entry whose `milliseconds` field
// is not positive, so the array must end with such a sentinel entry.
// The entries are bound by pointer, so they must outlive the callbacks.
139 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
140 for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
141 eventBase->tryRunAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
// EventHandler used by the I/O tests. Each handlerReady() call drains
// and/or fills the fd and appends an EventRecord to `log`, holding the
// event mask and the byte counts.
146 class TestHandler : public EventHandler {
// Registers with `eventBase` for `fd` and keeps a copy of the fd for I/O.
148 TestHandler(EventBase* eventBase, int fd)
149 : EventHandler(eventBase, fd), fd_(fd) {}
// Callback invoked with the ready event mask. Both counters start at 0, so
// a direction that was not exercised is logged as 0 bytes.
151 void handlerReady(uint16_t events) noexcept override {
152 ssize_t bytesRead = 0;
153 ssize_t bytesWritten = 0;
155 // Read all available data, so EventBase will stop calling us
156 // until new data becomes available
157 bytesRead = readUntilEmpty(fd_);
159 if (events & WRITE) {
160 // Write until the pipe buffer is full, so EventBase will stop calling
161 // us until the other end has read some data
162 bytesWritten = writeUntilFull(fd_);
// Record this invocation for later inspection by the test body.
165 log.emplace_back(events, bytesRead, bytesWritten);
// One log entry: the event mask plus the bytes read/written in that call.
169 EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
172 , bytesRead(bytesRead)
173 , bytesWritten(bytesWritten) {}
178 ssize_t bytesWritten;
// Chronological record of every handlerReady() invocation.
181 deque<EventRecord> log;
// Non-persistent READ registration on sp[0], with two writes scheduled on
// sp[1]. Only the first write should reach the handler; the second must
// still be sitting unread in the socket afterwards.
190 TEST(EventBaseTest, ReadEvent) {
194 // Register for read events
195 TestHandler handler(&eb, sp[0]);
196 handler.registerHandler(EventHandler::READ);
198 // Register timeouts to perform two write events
199 ScheduledEvent events[] = {
200 { 10, EventHandler::WRITE, 2345, 0 },
201 { 160, EventHandler::WRITE, 99, 0 },
204 scheduleEvents(&eb, sp[1], events);
211 // Since we didn't use the EventHandler::PERSIST flag, the handler should
212 // have received the first read, then unregistered itself. Check that only
213 // the first chunk of data was received.
214 ASSERT_EQ(handler.log.size(), 1);
215 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
216 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
217 milliseconds(events[0].milliseconds), milliseconds(90));
218 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
219 ASSERT_EQ(handler.log[0].bytesWritten, 0);
// The run is expected to end at the second scheduled write.
220 T_CHECK_TIMEOUT(start, end,
221 milliseconds(events[1].milliseconds), milliseconds(30));
223 // Make sure the second chunk of data is still waiting to be read.
224 size_t bytesRemaining = readUntilEmpty(sp[0]);
225 ASSERT_EQ(bytesRemaining, events[1].length);
229 * Test (READ | PERSIST)
// READ | PERSIST registration on sp[0], with four writes scheduled on sp[1].
// The handler is unregistered at 85 ms, so it should log the first three
// writes and leave the fourth unread.
231 TEST(EventBaseTest, ReadPersist) {
235 // Register for read events
236 TestHandler handler(&eb, sp[0]);
237 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
239 // Register several timeouts to perform writes
240 ScheduledEvent events[] = {
241 { 10, EventHandler::WRITE, 1024, 0 },
242 { 20, EventHandler::WRITE, 2211, 0 },
243 { 30, EventHandler::WRITE, 4096, 0 },
244 { 100, EventHandler::WRITE, 100, 0 },
247 scheduleEvents(&eb, sp[1], events);
249 // Schedule a timeout to unregister the handler after the third write
250 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
257 // The handler should have received the first 3 events,
258 // then been unregistered after that.
259 ASSERT_EQ(handler.log.size(), 3);
// Log entry n must match scheduled write n in both timing and size.
260 for (int n = 0; n < 3; ++n) {
261 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
262 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
263 milliseconds(events[n].milliseconds));
264 ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
265 ASSERT_EQ(handler.log[n].bytesWritten, 0);
267 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
269 // Make sure the data from the last write is still waiting to be read
270 size_t bytesRemaining = readUntilEmpty(sp[0]);
271 ASSERT_EQ(bytesRemaining, events[3].length);
275 * Test registering for READ when the socket is immediately readable
// Data is written to sp[1] before the READ | PERSIST handler is registered
// on sp[0]. The handler should fire once immediately and once more for the
// write scheduled at 10 ms, before being unregistered at 20 ms.
277 TEST(EventBaseTest, ReadImmediate) {
281 // Write some data to the socket so the other end will
282 // be immediately readable
283 size_t dataLength = 1234;
284 writeToFD(sp[1], dataLength);
286 // Register for read events
287 TestHandler handler(&eb, sp[0]);
288 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
290 // Register a timeout to perform another write
291 ScheduledEvent events[] = {
292 { 10, EventHandler::WRITE, 2345, 0 },
295 scheduleEvents(&eb, sp[1], events);
297 // Schedule a timeout to unregister the handler
298 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
305 ASSERT_EQ(handler.log.size(), 2);
307 // There should have been 1 event for immediate readability
308 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
309 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
310 ASSERT_EQ(handler.log[0].bytesRead, dataLength);
311 ASSERT_EQ(handler.log[0].bytesWritten, 0);
313 // There should be another event after the timeout wrote more data
314 ASSERT_EQ(handler.log[1].events, EventHandler::READ);
315 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
316 milliseconds(events[0].milliseconds));
317 ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
318 ASSERT_EQ(handler.log[1].bytesWritten, 0);
// The run is expected to end at the 20 ms unregister timeout.
320 T_CHECK_TIMEOUT(start, end, milliseconds(20));
// Non-persistent WRITE registration on sp[0], whose write buffer has been
// filled in advance. Two reads are scheduled on sp[1]; only the first
// should produce a WRITE callback.
326 TEST(EventBaseTest, WriteEvent) {
330 // Fill up the write buffer before starting
331 size_t initialBytesWritten = writeUntilFull(sp[0]);
333 // Register for write events
334 TestHandler handler(&eb, sp[0]);
335 handler.registerHandler(EventHandler::WRITE);
337 // Register timeouts to perform two reads
338 ScheduledEvent events[] = {
339 { 10, EventHandler::READ, 0, 0 },
340 { 60, EventHandler::READ, 0, 0 },
343 scheduleEvents(&eb, sp[1], events);
350 // Since we didn't use the EventHandler::PERSIST flag, the handler should
351 // have only been able to write once, then unregistered itself.
352 ASSERT_EQ(handler.log.size(), 1);
353 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
354 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
355 milliseconds(events[0].milliseconds));
356 ASSERT_EQ(handler.log[0].bytesRead, 0);
357 ASSERT_GT(handler.log[0].bytesWritten, 0);
358 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
// The first read must drain the initial fill. The second read must drain
// exactly what the handler wrote in its single callback.
360 ASSERT_EQ(events[0].result, initialBytesWritten);
361 ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
365 * Test (WRITE | PERSIST)
// WRITE | PERSIST registration on a pre-filled sp[0], with four reads
// scheduled on sp[1]. The handler is unregistered at 85 ms, so it should
// log three WRITE callbacks, one after each of the first three reads.
367 TEST(EventBaseTest, WritePersist) {
371 // Fill up the write buffer before starting
372 size_t initialBytesWritten = writeUntilFull(sp[0]);
374 // Register for write events
375 TestHandler handler(&eb, sp[0]);
376 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
378 // Register several timeouts to read from the socket at several intervals
379 ScheduledEvent events[] = {
380 { 10, EventHandler::READ, 0, 0 },
381 { 40, EventHandler::READ, 0, 0 },
382 { 70, EventHandler::READ, 0, 0 },
383 { 100, EventHandler::READ, 0, 0 },
386 scheduleEvents(&eb, sp[1], events);
388 // Schedule a timeout to unregister the handler after the third read
389 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
396 // The handler should have received the first 3 events,
397 // then been unregistered after that.
398 ASSERT_EQ(handler.log.size(), 3);
399 ASSERT_EQ(events[0].result, initialBytesWritten);
400 for (int n = 0; n < 3; ++n) {
401 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
402 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
403 milliseconds(events[n].milliseconds));
404 ASSERT_EQ(handler.log[n].bytesRead, 0);
405 ASSERT_GT(handler.log[n].bytesWritten, 0);
// What the handler wrote after read n is what read n + 1 drains.
406 ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
408 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
412 * Test registering for WRITE when the socket is immediately writable
// WRITE | PERSIST registration on sp[0] with an empty write buffer. The
// handler should fire once immediately and once more after the read
// scheduled on sp[1] at 10 ms, then be unregistered at 40 ms.
414 TEST(EventBaseTest, WriteImmediate) {
418 // Register for write events
419 TestHandler handler(&eb, sp[0]);
420 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
422 // Register a timeout to perform a read
423 ScheduledEvent events[] = {
424 { 10, EventHandler::READ, 0, 0 },
427 scheduleEvents(&eb, sp[1], events);
429 // Schedule a timeout to unregister the handler
430 int64_t unregisterTimeout = 40;
431 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
439 ASSERT_EQ(handler.log.size(), 2);
441 // Since the socket buffer was initially empty,
442 // there should have been 1 event for immediate writability
443 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
444 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
445 ASSERT_EQ(handler.log[0].bytesRead, 0);
446 ASSERT_GT(handler.log[0].bytesWritten, 0);
448 // There should be another event after the scheduled read on the other end
449 ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
450 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
451 milliseconds(events[0].milliseconds));
452 ASSERT_EQ(handler.log[1].bytesRead, 0);
453 ASSERT_GT(handler.log[1].bytesWritten, 0);
455 T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
459 * Test (READ | WRITE) when the socket becomes readable first
// Non-persistent READ_WRITE registration on a pre-filled sp[0]. The peer
// writes first (10 ms) and reads later (40 ms), so the single callback
// should report READ only.
461 TEST(EventBaseTest, ReadWrite) {
465 // Fill up the write buffer before starting
466 size_t sock0WriteLength = writeUntilFull(sp[0]);
468 // Register for read and write events
469 TestHandler handler(&eb, sp[0]);
470 handler.registerHandler(EventHandler::READ_WRITE);
472 // Register timeouts to perform a write then a read.
473 ScheduledEvent events[] = {
474 { 10, EventHandler::WRITE, 2345, 0 },
475 { 40, EventHandler::READ, 0, 0 },
478 scheduleEvents(&eb, sp[1], events);
485 // Since we didn't use the EventHandler::PERSIST flag, the handler should
486 // have only noticed readability, then unregistered itself. Check that only
487 // one event was logged.
488 ASSERT_EQ(handler.log.size(), 1);
489 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
490 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
491 milliseconds(events[0].milliseconds));
492 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
493 ASSERT_EQ(handler.log[0].bytesWritten, 0);
// The later read on the peer must drain exactly the initial fill of sp[0].
494 ASSERT_EQ(events[1].result, sock0WriteLength);
495 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
499 * Test (READ | WRITE) when the socket becomes writable first
// Non-persistent READ_WRITE registration on a pre-filled sp[0]. The peer
// reads first (10 ms) and writes later (40 ms), so the single callback
// should report WRITE only and the peer's data should remain unread.
501 TEST(EventBaseTest, WriteRead) {
505 // Fill up the write buffer before starting
506 size_t sock0WriteLength = writeUntilFull(sp[0]);
508 // Register for read and write events
509 TestHandler handler(&eb, sp[0]);
510 handler.registerHandler(EventHandler::READ_WRITE);
512 // Register timeouts to perform a read then a write.
513 size_t sock1WriteLength = 2345;
514 ScheduledEvent events[] = {
515 { 10, EventHandler::READ, 0, 0 },
516 { 40, EventHandler::WRITE, sock1WriteLength, 0 },
519 scheduleEvents(&eb, sp[1], events);
526 // Since we didn't use the EventHandler::PERSIST flag, the handler should
527 // have only noticed writability, then unregistered itself. Check that only
528 // one event was logged.
529 ASSERT_EQ(handler.log.size(), 1);
530 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
531 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
532 milliseconds(events[0].milliseconds));
533 ASSERT_EQ(handler.log[0].bytesRead, 0);
534 ASSERT_GT(handler.log[0].bytesWritten, 0);
// The peer's read drains the initial fill; its write goes through in full.
535 ASSERT_EQ(events[0].result, sock0WriteLength);
536 ASSERT_EQ(events[1].result, sock1WriteLength);
537 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
539 // Make sure the written data is still waiting to be read.
540 size_t bytesRemaining = readUntilEmpty(sp[0]);
541 ASSERT_EQ(bytesRemaining, events[1].length);
545 * Test (READ | WRITE) when the socket becomes readable and writable
// Non-persistent READ_WRITE registration on a pre-filled sp[0]. The peer
// performs a read and a write in the same scheduled action, and the test
// expects a single callback reporting READ | WRITE together.
548 TEST(EventBaseTest, ReadWriteSimultaneous) {
552 // Fill up the write buffer before starting
553 size_t sock0WriteLength = writeUntilFull(sp[0]);
555 // Register for read and write events
556 TestHandler handler(&eb, sp[0]);
557 handler.registerHandler(EventHandler::READ_WRITE);
559 // Register a timeout to perform a read and write together
560 ScheduledEvent events[] = {
561 { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
564 scheduleEvents(&eb, sp[1], events);
571 // It's not strictly required that the EventBase notify us about both
572 // events in the same call. So, it's possible that if the EventBase
573 // implementation changes this test could start failing, and it wouldn't be
574 // considered breaking the API. However for now it's nice to exercise this
576 ASSERT_EQ(handler.log.size(), 1);
577 ASSERT_EQ(handler.log[0].events,
578 EventHandler::READ | EventHandler::WRITE);
579 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
580 milliseconds(events[0].milliseconds));
581 ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
582 ASSERT_GT(handler.log[0].bytesWritten, 0);
583 T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
587 * Test (READ | WRITE | PERSIST)
// READ | WRITE | PERSIST registration on sp[0], with a mix of reads and
// writes scheduled on sp[1]. It expects one immediate WRITE callback plus
// one callback per scheduled action that runs before the handler is
// unregistered at 80 ms. The final write (120 ms) must stay unread.
589 TEST(EventBaseTest, ReadWritePersist) {
593 // Register for read and write events
594 TestHandler handler(&eb, sp[0]);
595 handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
596 EventHandler::PERSIST);
598 // Register timeouts to perform several reads and writes
599 ScheduledEvent events[] = {
600 { 10, EventHandler::WRITE, 2345, 0 },
601 { 20, EventHandler::READ, 0, 0 },
602 { 35, EventHandler::WRITE, 200, 0 },
603 { 45, EventHandler::WRITE, 15, 0 },
604 { 55, EventHandler::READ, 0, 0 },
605 { 120, EventHandler::WRITE, 2345, 0 },
608 scheduleEvents(&eb, sp[1], events);
610 // Schedule a timeout to unregister the handler
611 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
618 ASSERT_EQ(handler.log.size(), 6);
620 // Since we didn't fill up the write buffer immediately, there should
621 // be an immediate event for writability.
622 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
623 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
624 ASSERT_EQ(handler.log[0].bytesRead, 0);
625 ASSERT_GT(handler.log[0].bytesWritten, 0);
627 // Events 1 through 5 should correspond to the scheduled events
628 for (int n = 1; n < 6; ++n) {
// Log entry n pairs with scheduled action n - 1, because entry 0 is the
// immediate writability event.
629 ScheduledEvent* event = &events[n - 1];
630 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
631 milliseconds(event->milliseconds));
// The directions are mirrored: a READ on the peer shows up as WRITE on the
// handler, and a WRITE on the peer shows up as READ.
632 if (event->events == EventHandler::READ) {
633 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
634 ASSERT_EQ(handler.log[n].bytesRead, 0);
635 ASSERT_GT(handler.log[n].bytesWritten, 0);
637 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
638 ASSERT_EQ(handler.log[n].bytesRead, event->length);
639 ASSERT_EQ(handler.log[n].bytesWritten, 0);
643 // The timeout should have unregistered the handler before the last write.
644 // Make sure that data is still waiting to be read
645 size_t bytesRemaining = readUntilEmpty(sp[0]);
646 ASSERT_EQ(bytesRemaining, events[5].length);
// TestHandler variant that reads at most `readLength` bytes per callback
// instead of draining the fd, so unread data remains after each call.
650 class PartialReadHandler : public TestHandler {
652 PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
653 : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
// Expects to be invoked for READ only. Logs one bounded read per call,
// with bytesWritten recorded as 0.
655 void handlerReady(uint16_t events) noexcept override {
656 assert(events == EventHandler::READ);
657 ssize_t bytesRead = readFromFD(fd_, readLength_);
658 log.emplace_back(events, bytesRead, 0);
667 * Test reading only part of the available data when a read event is fired.
668 * When PERSIST is used, make sure the handler gets notified again the next
669 * time around the loop.
// One write of 3.5 * readLength bytes is scheduled on sp[1]. The persistent
// PartialReadHandler on sp[0] reads readLength bytes per callback, so it
// should be invoked four times: three full reads and a final half read.
671 TEST(EventBaseTest, ReadPartial) {
675 // Register for read events
676 size_t readLength = 100;
677 PartialReadHandler handler(&eb, sp[0], readLength);
678 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
680 // Register a timeout to perform a single write,
681 // with more data than PartialReadHandler will read at once
682 ScheduledEvent events[] = {
683 { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
686 scheduleEvents(&eb, sp[1], events);
688 // Schedule a timeout to unregister the handler
689 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
696 ASSERT_EQ(handler.log.size(), 4);
698 // The first 3 invocations should read readLength bytes each
699 for (int n = 0; n < 3; ++n) {
700 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
// All callbacks are expected at the time of the single write.
701 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
702 milliseconds(events[0].milliseconds));
703 ASSERT_EQ(handler.log[n].bytesRead, readLength);
704 ASSERT_EQ(handler.log[n].bytesWritten, 0);
706 // The last read only has readLength/2 bytes
707 ASSERT_EQ(handler.log[3].events, EventHandler::READ);
708 T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
709 milliseconds(events[0].milliseconds));
710 ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
711 ASSERT_EQ(handler.log[3].bytesWritten, 0);
// TestHandler variant that writes exactly `writeLength` bytes per callback
// instead of filling the buffer, so the fd stays writable after each call.
715 class PartialWriteHandler : public TestHandler {
717 PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
718 : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
// Expects to be invoked for WRITE only. Logs one bounded write per call,
// with bytesRead recorded as 0.
720 void handlerReady(uint16_t events) noexcept override {
721 assert(events == EventHandler::WRITE);
722 ssize_t bytesWritten = writeToFD(fd_, writeLength_);
723 log.emplace_back(events, 0, bytesWritten);
732 * Test writing without completely filling up the write buffer when the fd
733 * becomes writable. When PERSIST is used, make sure the handler gets
734 * notified again the next time around the loop.
// sp[0] is pre-filled and the peer drains it at 10 ms. The persistent
// PartialWriteHandler then writes writeLength bytes per callback, so it
// should be invoked repeatedly. Only the first numChecked calls are checked.
736 TEST(EventBaseTest, WritePartial) {
740 // Fill up the write buffer before starting
741 size_t initialBytesWritten = writeUntilFull(sp[0]);
743 // Register for write events
744 size_t writeLength = 100;
745 PartialWriteHandler handler(&eb, sp[0], writeLength);
746 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
748 // Register a timeout to read, so that more data can be written
749 ScheduledEvent events[] = {
750 { 10, EventHandler::READ, 0, 0 },
753 scheduleEvents(&eb, sp[1], events);
755 // Schedule a timeout to unregister the handler
756 eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
763 // Depending on how big the socket buffer is, there will be multiple writes
764 // Only check the first 5
766 ASSERT_GE(handler.log.size(), numChecked);
767 ASSERT_EQ(events[0].result, initialBytesWritten);
769 // Each of the checked invocations should write writeLength bytes
770 for (int n = 0; n < numChecked; ++n) {
771 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
772 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
773 milliseconds(events[0].milliseconds));
774 ASSERT_EQ(handler.log[n].bytesRead, 0);
775 ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
781 * Test destroying a registered EventHandler
// Deletes a registered, persistent TestHandler from inside a timeout
// callback while the loop is running. It checks that the handler had
// written data before it was destroyed and that the run ends at the 25 ms
// mark.
783 TEST(EventBaseTest, DestroyHandler) {
// Local AsyncTimeout whose expiry deletes the EventHandler it was given.
784 class DestroyHandler : public AsyncTimeout {
786 DestroyHandler(EventBase* eb, EventHandler* h)
790 void timeoutExpired() noexcept override { delete handler_; }
793 EventHandler* handler_;
799 // Fill up the write buffer before starting
800 size_t initialBytesWritten = writeUntilFull(sp[0]);
802 // Register for write events
// Heap-allocated because ownership passes to the DestroyHandler timeout,
// which deletes it.
803 TestHandler* handler = new TestHandler(&eb, sp[0]);
804 handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
806 // After 10ms, read some data, so that the handler
807 // will be notified that it can write.
808 eb.tryRunAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
811 // Start a timer to destroy the handler after 25ms
812 // This mainly just makes sure the code doesn't break or assert
813 DestroyHandler dh(&eb, handler);
814 dh.scheduleTimeout(25);
820 // Make sure the EventHandler was uninstalled properly when it was
821 // destroyed, and the EventBase loop exited
822 T_CHECK_TIMEOUT(start, end, milliseconds(25));
824 // Make sure that the handler wrote data to the socket
825 // before it was destroyed
826 size_t bytesRemaining = readUntilEmpty(sp[1]);
827 ASSERT_GT(bytesRemaining, 0);
831 ///////////////////////////////////////////////////////////////////////////
832 // Tests for timeout events
833 ///////////////////////////////////////////////////////////////////////////
// Schedules three TimePoint::reset callbacks at 10, 20 and 40 ms via
// tryRunAfterDelay(). It checks that each TimePoint was stamped at its
// scheduled delay and that the run ends at the last delay.
835 TEST(EventBaseTest, RunAfterDelay) {
// The TimePoints are constructed with `false` and are only stamped when
// their callback calls reset().
838 TimePoint timestamp1(false);
839 TimePoint timestamp2(false);
840 TimePoint timestamp3(false);
// Each callback is bound to the address of its TimePoint.
841 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
842 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
843 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
849 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
850 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
851 T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
852 T_CHECK_TIMEOUT(start, end, milliseconds(40));
856 * Test the behavior of tryRunAfterDelay() when some timeouts are
857 * still scheduled when the EventBase is destroyed.
// Schedules callbacks at 10 and 20 ms, stops the loop at 40 ms, and leaves
// two further callbacks (80 and 160 ms) pending. The first two TimePoints
// must be stamped on time and the last two must remain unset.
859 TEST(EventBaseTest, RunAfterDelayDestruction) {
// All TimePoints are constructed with `false`. The later isUnset() checks
// rely on reset() never having been called on timestamp3 and timestamp4.
860 TimePoint timestamp1(false);
861 TimePoint timestamp2(false);
862 TimePoint timestamp3(false);
863 TimePoint timestamp4(false);
864 TimePoint start(false);
865 TimePoint end(false);
870 // Run two normal timeouts
871 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
872 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
874 // Schedule a timeout to stop the event loop after 40ms
875 eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
877 // Schedule 2 timeouts that would fire after the event loop stops
878 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
879 eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
886 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
887 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
888 T_CHECK_TIMEOUT(start, end, milliseconds(40));
// The callbacks scheduled past the 40 ms stop must never have run.
890 ASSERT_TRUE(timestamp3.isUnset());
891 ASSERT_TRUE(timestamp4.isUnset());
893 // Ideally this test should be run under valgrind to ensure that no
// AsyncTimeout that stamps `timestamp` when it expires, so tests can check
// when the timeout actually fired.
897 class TestTimeout : public AsyncTimeout {
899 explicit TestTimeout(EventBase* eventBase)
900 : AsyncTimeout(eventBase)
901 , timestamp(false) {}
// Stamps the expiry by calling reset() on the TimePoint.
903 void timeoutExpired() noexcept override { timestamp.reset(); }
// Schedules three timeouts at 10, 20 and 40 ms. It checks that each fired
// at its own delay and that the run ends at the last one.
908 TEST(EventBaseTest, BasicTimeouts) {
914 t1.scheduleTimeout(10);
915 t2.scheduleTimeout(20);
916 t3.scheduleTimeout(40);
922 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
923 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
924 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
925 T_CHECK_TIMEOUT(start, end, milliseconds(40));
// AsyncTimeout that walks a list of delays. Each expiry appends a TimePoint
// to `timestamps` and, while delays remain, schedules itself again with the
// next one.
928 class ReschedulingTimeout : public AsyncTimeout {
// Takes a private copy of `timeouts`, so iterator_ refers to the member
// copy and not to the caller's vector.
930 ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
932 , timeouts_(timeouts)
933 , iterator_(timeouts_.begin()) {}
939 void timeoutExpired() noexcept override {
// Record the time of this expiry.
940 timestamps.emplace_back();
// Re-arm with the next delay, if any are left.
945 if (iterator_ != timeouts_.end()) {
946 uint32_t timeout = *iterator_;
948 scheduleTimeout(timeout);
// One entry per expiry, in firing order.
952 vector<TimePoint> timestamps;
955 vector<uint32_t> timeouts_;
956 vector<uint32_t>::const_iterator iterator_;
960 * Test rescheduling the same timeout multiple times
// Drives a ReschedulingTimeout through three consecutive delays (10, 30 and
// 15 ms). It checks that each expiry lands at the running sum of the delays
// so far.
962 TEST(EventBaseTest, ReuseTimeout) {
965 vector<uint32_t> timeouts;
966 timeouts.push_back(10);
967 timeouts.push_back(30);
968 timeouts.push_back(15);
970 ReschedulingTimeout t(&eb, timeouts);
977 // Use a higher tolerance than usual. We're waiting on 3 timeouts
978 // consecutively. In general, each timeout may go over by a few
979 // milliseconds, and we're tripling this error by waiting on 3 timeouts.
980 milliseconds tolerance{6};
982 ASSERT_EQ(timeouts.size(), t.timestamps.size());
984 for (size_t n = 0; n < timeouts.size(); ++n) {
// `total` accumulates the delays, because each timeout starts only after
// the previous one fires.
985 total += timeouts[n];
986 T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
988 T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
992 * Test rescheduling a timeout before it has fired
// Reschedules pending timeouts from a callback at the 10 ms mark. t2 is
// moved earlier (10 + 10 = 20 ms) and t3 later (10 + 40 = 50 ms), while t1
// keeps its original 15 ms delay.
994 TEST(EventBaseTest, RescheduleTimeout) {
1001 t1.scheduleTimeout(15);
1002 t2.scheduleTimeout(30);
1003 t3.scheduleTimeout(30);
// The cast picks the scheduleTimeout(uint32_t) overload so that it can be
// passed to std::bind.
1005 auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
1006 &AsyncTimeout::scheduleTimeout);
1008 // after 10ms, reschedule t2 to run sooner than originally scheduled
1009 eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
1010 // after 10ms, reschedule t3 to run later than originally scheduled
1011 eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
1017 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1018 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1019 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1020 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1024 * Test cancelling a timeout
// Drives a ReschedulingTimeout through delays of 10, 30 and 25 ms but
// cancels it at 50 ms. Only the first two expiries (at 10 and 40 ms)
// should have been recorded.
1026 TEST(EventBaseTest, CancelTimeout) {
1029 vector<uint32_t> timeouts;
1030 timeouts.push_back(10);
1031 timeouts.push_back(30);
1032 timeouts.push_back(25);
1034 ReschedulingTimeout t(&eb, timeouts);
// The cancel at 50 ms comes before the third expiry, due at 10 + 30 + 25 ms.
1036 eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1042 ASSERT_EQ(t.timestamps.size(), 2);
1043 T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1044 T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1045 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1049 * Test destroying a scheduled timeout object
// Deletes a still-scheduled timeout (t1, due at 30 ms) from another
// timeout's callback at 10 ms. The run is expected to end at 10 ms, so the
// deleted timeout must no longer be pending.
1051 TEST(EventBaseTest, DestroyTimeout) {
// Local AsyncTimeout whose expiry deletes the AsyncTimeout it was given.
1052 class DestroyTimeout : public AsyncTimeout {
1054 DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1058 void timeoutExpired() noexcept override { delete timeout_; }
1061 AsyncTimeout* timeout_;
// Heap-allocated because ownership passes to `dt`, which deletes it.
1066 TestTimeout* t1 = new TestTimeout(&eb);
1067 t1->scheduleTimeout(30);
1069 DestroyTimeout dt(&eb, t1);
1070 dt.scheduleTimeout(10);
1076 T_CHECK_TIMEOUT(start, end, milliseconds(10));
1080 ///////////////////////////////////////////////////////////////////////////
1081 // Tests for runInEventBaseThread()
1082 ///////////////////////////////////////////////////////////////////////////
// Shared state for the RunInThread test: the per-thread operation count,
// the number of operations still outstanding, and the (thread, value)
// pairs recorded as callbacks run.
1084 struct RunInThreadData {
// opsToGo starts at the total operation count across all threads.
1085 RunInThreadData(int numThreads, int opsPerThread)
1086 : opsPerThread(opsPerThread)
1087 , opsToGo(numThreads*opsPerThread) {}
// One (thread id, value) entry per executed callback, in execution order.
1090 deque< pair<int, int> > values;
// Argument passed to runInThreadTestFunc(): a pointer to the shared
// RunInThreadData. Callers also supply a thread id and a value; those
// members are not visible here.
1096 struct RunInThreadArg {
1097 RunInThreadArg(RunInThreadData* data,
1104 RunInThreadData* data;
// Callback run on the event base for each posted operation. It records the
// (thread, value) pair and decrements the outstanding-operation counter.
// When the counter reaches zero it asks the loop to terminate.
1109 void runInThreadTestFunc(RunInThreadArg* arg) {
1110 arg->data->values.emplace_back(arg->thread, arg->value);
// Keep a pointer to the shared data for use after the recording step.
1111 RunInThreadData* data = arg->data;
1114 if(--data->opsToGo == 0) {
1115 // Break out of the event base loop if we are the last thread running
1116 data->evb.terminateLoopSoon();
// Starts numThreads threads that each post opsPerThread calls of
// runInThreadTestFunc through runInEventBaseThread(). It then verifies that
// the loop stopped in under a second and that each thread's values arrived
// complete and in order.
1120 TEST(EventBaseTest, RunInThread) {
1121 constexpr uint32_t numThreads = 50;
1122 constexpr uint32_t opsPerThread = 100;
1123 RunInThreadData data(numThreads, opsPerThread);
1125 deque<std::thread> threads;
1127 // Wait on all of the threads.
1128 for (auto& thread : threads) {
1133 for (uint32_t i = 0; i < numThreads; ++i) {
1134 threads.emplace_back([i, &data] {
1135 for (int n = 0; n < data.opsPerThread; ++n) {
// Each operation gets its own heap-allocated argument carrying the thread
// index and the sequence number.
1136 RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1137 data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1143 // Add a timeout event to run after 3 seconds.
1144 // Otherwise loop() will return immediately since there are no events to run.
1145 // Once the last thread exits, it will stop the loop(). However, this
1146 // timeout also stops the loop in case there is a bug performing the normal
1148 data.evb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1155 // Verify that the loop exited because all threads finished and requested it
1156 // to stop. This should happen much sooner than the 3 second timeout.
1157 // Assert that it happens in under a second. (This is still tons of extra
1160 auto timeTaken = std::chrono::duration_cast<milliseconds>(
1161 end.getTime() - start.getTime());
1162 ASSERT_LT(timeTaken.count(), 1000);
1163 VLOG(11) << "Time taken: " << timeTaken.count();
1165 // Verify that we have all of the events from every thread
// expectedValues[t] is the next value expected from thread t.
1166 int expectedValues[numThreads];
1167 for (uint32_t n = 0; n < numThreads; ++n) {
1168 expectedValues[n] = 0;
1170 for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1171 it != data.values.end();
1173 int threadID = it->first;
1174 int value = it->second;
// Values from one thread must arrive as 0, 1, 2, ... with no gaps or
// reordering.
1175 ASSERT_EQ(expectedValues[threadID], value);
1176 ++expectedValues[threadID];
// Every thread must have delivered all of its operations.
1178 for (uint32_t n = 0; n < numThreads; ++n) {
1179 ASSERT_EQ(expectedValues[n], opsPerThread);
1183 // This test simulates some calls, and verifies that the waiting happens by
1184 // triggering what otherwise would be race conditions, and trying to detect
1185 // whether any of the race conditions happened.
// Runs 256 worker threads, each paired with its own atomic slot. Each worker
// starts a thread running eb.loopForever(), calls
// runInEventBaseThreadAndWait(), and applies two compare_exchange_weak
// steps to its slot (to 1, then to 2). The slots are summed at the end.
1186 TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
1187 const size_t c = 256;
// One heap-allocated atomic per worker, initialized to 0.
1188 vector<unique_ptr<atomic<size_t>>> atoms(c);
1189 for (size_t i = 0; i < c; ++i) {
1190 auto& atom = atoms.at(i);
1191 atom = make_unique<atomic<size_t>>(0);
1193 vector<thread> threads;
1194 for (size_t i = 0; i < c; ++i) {
1195 threads.emplace_back([&atoms, i] {
1197 auto& atom = *atoms.at(i);
// Run the event loop on a helper thread and wait until it is up before
// posting work to it.
1198 auto ebth = thread([&] { eb.loopForever(); });
1199 eb.waitUntilRunning();
1200 eb.runInEventBaseThreadAndWait([&] {
1202 atom.compare_exchange_weak(
1203 x, 1, std::memory_order_release, std::memory_order_relaxed);
1206 atom.compare_exchange_weak(
1207 x, 2, std::memory_order_release, std::memory_order_relaxed);
1208 eb.terminateLoopSoon();
1212 for (size_t i = 0; i < c; ++i) {
1213 auto& th = threads.at(i);
// Fold all slots into one total for the final check.
1217 for (auto& atom : atoms) sum += *atom;
// Calls runImmediatelyOrRunInEventBaseThreadAndWait() from the test thread
// while a separate thread runs eb.loopForever(). It expects `mutated` to be
// true afterwards.
1221 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
// The event loop runs on its own thread.
1223 thread th(&EventBase::loopForever, &eb);
1225 eb.terminateLoopSoon();
1228 auto mutated = false;
1229 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1232 EXPECT_TRUE(mutated);
// Calls runImmediatelyOrRunInEventBaseThreadAndWait() from inside a callback
// that is itself run via runInEventBaseThreadAndWait(), while a separate
// thread runs eb.loopForever(). It expects `mutated` to be true afterwards.
1235 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
// The event loop runs on its own thread.
1237 thread th(&EventBase::loopForever, &eb);
1239 eb.terminateLoopSoon();
// The outer callback issues the nested call under test.
1242 eb.runInEventBaseThreadAndWait([&] {
1243 auto mutated = false;
1244 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1247 EXPECT_TRUE(mutated);
// Calls runImmediatelyOrRunInEventBaseThreadAndWait() without any visible
// thread driving the loop, and expects `mutated` to be true afterwards.
1251 TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
1253 auto mutated = false;
// NOTE(review): the callback presumably sets `mutated` — confirm.
1254 eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] {
1257 EXPECT_TRUE(mutated);
1260 ///////////////////////////////////////////////////////////////////////////
1261 // Tests for runInLoop()
1262 ///////////////////////////////////////////////////////////////////////////
// LoopCallback helper shared by the runInLoop() and runBeforeLoop() tests.
// It stores the EventBase it re-schedules itself on, an unsigned counter, and
// an optional action. The tests construct it with a starting count and
// observe getCount() reaching 0 after looping.
1264 class CountedLoopCallback : public EventBase::LoopCallback {
// `action` defaults to an empty std::function, i.e. no action.
1266 CountedLoopCallback(EventBase* eventBase,
1268 std::function<void()> action =
1269 std::function<void()>())
1270 : eventBase_(eventBase)
1272 , action_(action) {}
1274 void runLoopCallback() noexcept override {
// One branch re-installs this callback on the same EventBase; the other
// branch is entered only when an action was supplied.
1277 eventBase_->runInLoop(this);
1278 } else if (action_) {
// NOTE(review): presumably returns count_ — confirm.
1283 unsigned int getCount() const {
1288 EventBase* eventBase_;
1289 unsigned int count_;
1290 std::function<void()> action_;
1293 // Test that EventBase::loop() doesn't exit while there are
1294 // still LoopCallbacks remaining to be invoked.
1295 TEST(EventBaseTest, RepeatedRunInLoop) {
1296 EventBase eventBase;
// The callback starts with a count of 10 and no action.
1298 CountedLoopCallback c(&eventBase, 10);
1299 eventBase.runInLoop(&c);
1300 // The callback shouldn't have run immediately
1301 ASSERT_EQ(c.getCount(), 10);
1304 // loop() should loop until the CountedLoopCallback stops
1305 // re-installing itself.
1306 ASSERT_EQ(c.getCount(), 0);
1309 // Test that EventBase::loop() works as expected without time measurements.
1310 TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
// The EventBase is constructed with `false`; per the comment above this is
// the no-time-measurement configuration.
1311 EventBase eventBase(false);
// The callback starts with a count of 10 and no action.
1313 CountedLoopCallback c(&eventBase, 10);
1314 eventBase.runInLoop(&c);
1315 // The callback shouldn't have run immediately
1316 ASSERT_EQ(c.getCount(), 10);
1319 // loop() should loop until the CountedLoopCallback stops
1320 // re-installing itself.
1321 ASSERT_EQ(c.getCount(), 0);
1324 // Test runInLoop() calls with terminateLoopSoon()
1325 TEST(EventBaseTest, RunInLoopStopLoop) {
1326 EventBase eventBase;
// c1 starts at 20 with no action; c2 starts at 10 and its action is
// eventBase.terminateLoopSoon().
1328 CountedLoopCallback c1(&eventBase, 20);
1329 CountedLoopCallback c2(&eventBase, 10,
1330 std::bind(&EventBase::terminateLoopSoon, &eventBase));
1332 eventBase.runInLoop(&c1);
1333 eventBase.runInLoop(&c2);
// Neither callback has run before the loop is started.
1334 ASSERT_EQ(c1.getCount(), 20);
1335 ASSERT_EQ(c2.getCount(), 10);
1337 eventBase.loopForever();
1339 // c2 should have stopped the loop after 10 iterations
1340 ASSERT_EQ(c2.getCount(), 0);
1342 // We allow the EventBase to run the loop callbacks in whatever order it
1343 // chooses. We'll accept c1's count being either 10 (if the loop terminated
1344 // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1347 // (With the current code, c1 will always run 10 times, but we don't consider
1348 // this a hard API requirement.)
// Together the two assertions accept a remaining count of 10 or 11 for c1.
1349 ASSERT_GE(c1.getCount(), 10);
1350 ASSERT_LE(c1.getCount(), 11);
// Runs loopForever() with a callback whose action is terminateLoopSoon(), and
// then, after loopForever() has returned, posts another function with
// runInEventBaseThread().
1353 TEST(EventBaseTest, TryRunningAfterTerminate) {
1354 EventBase eventBase;
// c1 starts with a count of 1; its action asks the loop to terminate.
1355 CountedLoopCallback c1(&eventBase, 1,
1356 std::bind(&EventBase::terminateLoopSoon, &eventBase));
1357 eventBase.runInLoop(&c1);
1358 eventBase.loopForever();
1360 eventBase.runInEventBaseThread([&]() {
1367 // Test cancelling runInLoop() callbacks
1368 TEST(EventBaseTest, CancelRunInLoop) {
1369 EventBase eventBase;
// Three plain counting callbacks, each starting at 20.
1371 CountedLoopCallback c1(&eventBase, 20);
1372 CountedLoopCallback c2(&eventBase, 20);
1373 CountedLoopCallback c3(&eventBase, 20);
// Actions that call cancelLoopCallback() on c1 and c2 respectively.
1375 std::function<void()> cancelC1Action =
1376 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1377 std::function<void()> cancelC2Action =
1378 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
// The cancelling callbacks start at 10 and carry the actions above.
1380 CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1381 CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1383 // Install cancelC1 after c1
1384 eventBase.runInLoop(&c1);
1385 eventBase.runInLoop(&cancelC1);
1387 // Install cancelC2 before c2
1388 eventBase.runInLoop(&cancelC2);
1389 eventBase.runInLoop(&c2);
1392 eventBase.runInLoop(&c3);
// Nothing has run yet: every counter still holds its starting value.
1394 ASSERT_EQ(c1.getCount(), 20);
1395 ASSERT_EQ(c2.getCount(), 20);
1396 ASSERT_EQ(c3.getCount(), 20);
1397 ASSERT_EQ(cancelC1.getCount(), 10);
1398 ASSERT_EQ(cancelC2.getCount(), 10);
1403 // cancelC1 and cancelC2 should have both fired after 10 iterations and
1404 // stopped re-installing themselves
1405 ASSERT_EQ(cancelC1.getCount(), 0);
1406 ASSERT_EQ(cancelC2.getCount(), 0);
1407 // c3 should have continued on for the full 20 iterations
1408 ASSERT_EQ(c3.getCount(), 0);
1410 // c1 and c2 should have both been cancelled on the 10th iteration.
1412 // Callbacks are always run in the order they are installed,
1413 // so c1 should have fired 10 times, and been canceled after it ran on the
1414 // 10th iteration. c2 should have only fired 9 times, because cancelC2 will
1415 // have run before it on the 10th iteration, and cancelled it before it
// Remaining counts: 20 - 10 = 10 for c1 and 20 - 9 = 11 for c2.
1417 ASSERT_EQ(c1.getCount(), 10);
1418 ASSERT_EQ(c2.getCount(), 11);
// Helper that is both a LoopCallback and an EventHandler for one fd. Its two
// callbacks hand off to each other: handlerReady() schedules the object as a
// loop callback, and runLoopCallback() registers it for READ events. Each
// side counts its invocations and compares the count against a configurable
// maximum.
1421 class TerminateTestCallback : public EventBase::LoopCallback,
1422 public EventHandler {
// All counters and limits start at 0; call reset() before use.
1424 TerminateTestCallback(EventBase* eventBase, int fd)
1425 : EventHandler(eventBase, fd),
1426 eventBase_(eventBase),
1427 loopInvocations_(0),
1428 maxLoopInvocations_(0),
1429 eventInvocations_(0),
1430 maxEventInvocations_(0) {}
// Zeroes both invocation counters, installs new limits, and drops any
// pending loop-callback or handler registration.
1432 void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1433 loopInvocations_ = 0;
1434 maxLoopInvocations_ = maxLoopInvocations;
1435 eventInvocations_ = 0;
1436 maxEventInvocations_ = maxEventInvocations;
1438 cancelLoopCallback();
1439 unregisterHandler();
1442 void handlerReady(uint16_t /* events */) noexcept override {
1443 // We didn't register with PERSIST, so we will have been automatically
1444 // unregistered already.
1445 ASSERT_FALSE(isHandlerRegistered());
1447 ++eventInvocations_;
// NOTE(review): presumably returns here once the event limit is reached, so
// the callback is not re-scheduled — confirm.
1448 if (eventInvocations_ >= maxEventInvocations_) {
1452 eventBase_->runInLoop(this);
1454 void runLoopCallback() noexcept override {
// NOTE(review): presumably returns here once the loop limit is reached, so
// the handler is not re-registered — confirm.
1456 if (loopInvocations_ >= maxLoopInvocations_) {
1460 registerHandler(READ);
1463 uint32_t getLoopInvocations() const {
1464 return loopInvocations_;
1466 uint32_t getEventInvocations() const {
1467 return eventInvocations_;
1471 EventBase* eventBase_;
1472 uint32_t loopInvocations_;
1473 uint32_t maxLoopInvocations_;
1474 uint32_t eventInvocations_;
1475 uint32_t maxEventInvocations_;
1479 * Test that EventBase::loop() correctly detects when there are no more events
1482 * This uses a single callback, which alternates registering itself as a loop
1483 * callback versus an EventHandler callback. This exercises a regression where
1484 * EventBase::loop() incorrectly exited if there were no more fd handlers
1485 * registered, but a loop callback installed a new fd handler.
1487 TEST(EventBaseTest, LoopTermination) {
1488 EventBase eventBase;
1490 // Open a pipe and close the write end,
1491 // so the read endpoint will be readable
1493 int rc = pipe(pipeFds);
// The callback watches the read end of the pipe.
1496 TerminateTestCallback callback(&eventBase, pipeFds[0]);
1498 // Test once where the callback will exit after a loop callback
// Limits: 10 loop invocations, 100 event invocations.
1499 callback.reset(10, 100);
1500 eventBase.runInLoop(&callback);
1502 ASSERT_EQ(callback.getLoopInvocations(), 10);
1503 ASSERT_EQ(callback.getEventInvocations(), 9);
1505 // Test once where the callback will exit after an fd event callback
// Limits: 100 loop invocations, 7 event invocations.
1506 callback.reset(100, 7);
1507 eventBase.runInLoop(&callback);
1509 ASSERT_EQ(callback.getLoopInvocations(), 7);
1510 ASSERT_EQ(callback.getEventInvocations(), 7);
1515 ///////////////////////////////////////////////////////////////////////////
1516 // Tests for latency calculations
1517 ///////////////////////////////////////////////////////////////////////////
// AsyncTimeout helper driven by a caller-owned deque of values: each expiry
// consumes the front entry as `sleepTime` until the deque is empty. The deque
// is held by reference, so it must outlive this object.
1519 class IdleTimeTimeoutSeries : public AsyncTimeout {
1523 explicit IdleTimeTimeoutSeries(EventBase *base,
1524 std::deque<std::uint64_t>& timeout) :
1531 ~IdleTimeTimeoutSeries() override {}
1533 void timeoutExpired() noexcept override {
// An empty deque means there is nothing left to consume.
1536 if(timeout_.empty()){
// Take the next value off the front of the series.
1539 uint64_t sleepTime = timeout_.front();
1540 timeout_.pop_front();
// NOTE(review): presumably the number of expirations seen so far — confirm.
1548 int getTimeouts() const {
1554 std::deque<uint64_t>& timeout_;
1558 * Verify that idle time is correctly accounted for when decaying our loop
1561 * This works by creating a high loop time (via usleep), expecting a latency
1562 * callback with known value, and then scheduling a timeout for later. This
1563 * later timeout is far enough in the future that the idle time should have
1564 * caused the loop time to decay.
1566 TEST(EventBaseTest, IdleTime) {
1567 EventBase eventBase;
1568 eventBase.setLoadAvgMsec(1000ms);
1569 eventBase.resetLoadAvg(5900.0);
// timeouts0 ends up as {8000, 8080, 8080, 8080, 8080, 14000}.
1570 std::deque<uint64_t> timeouts0(4, 8080);
1571 timeouts0.push_front(8000);
1572 timeouts0.push_back(14000);
1573 IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
// Second series: twenty entries of 20; `tos` is only created inside the
// latency callback below.
1574 std::deque<uint64_t> timeouts(20, 20);
1575 std::unique_ptr<IdleTimeTimeoutSeries> tos;
// Test start time in microseconds on the steady clock.
1576 int64_t testStart = duration_cast<microseconds>(
1577 std::chrono::steady_clock::now().time_since_epoch()).count();
1578 bool hostOverloaded = false;
1580 int latencyCallbacks = 0;
1581 eventBase.setMaxLatency(6000, [&]() {
// Any invocation where the counter is not exactly 1 fails the test.
1583 if (latencyCallbacks != 1) {
1584 FAIL() << "Unexpected latency callback";
1587 if (tos0.getTimeouts() < 6) {
1588 // This could only happen if the host this test is running
1589 // on is heavily loaded.
1590 int64_t maxLatencyReached = duration_cast<microseconds>(
1591 std::chrono::steady_clock::now().time_since_epoch()).count();
// At least 43800 microseconds must have elapsed since testStart.
1592 ASSERT_LE(43800, maxLatencyReached - testStart);
1593 hostOverloaded = true;
1596 ASSERT_EQ(6, tos0.getTimeouts());
// The average loop time must lie within 6100 +/- 1200.
1597 ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1598 ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1599 tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1602 // Kick things off with an "immediate" timeout
1603 tos0.scheduleTimeout(1);
1607 if (hostOverloaded) {
1611 ASSERT_EQ(1, latencyCallbacks);
1612 ASSERT_EQ(7, tos0.getTimeouts());
// The average loop time must lie within 5900 +/- 1200.
1613 ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1614 ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1616 ASSERT_EQ(21, tos->getTimeouts());
1620 * Test that thisLoop functionality works with terminateLoopSoon
// Two flags track two runInLoop() callbacks. After terminateLoopSoon() is
// involved, the test expects `runInLoop` to stay false and `runThisLoop` to
// become true.
1622 TEST(EventBaseTest, ThisLoop) {
1624 bool runInLoop = false;
1625 bool runThisLoop = false;
1628 eb.terminateLoopSoon();
// NOTE(review): presumably this callback sets `runInLoop` and the next one
// sets `runThisLoop` with the thisLoop option — confirm.
1629 eb.runInLoop([&]() {
1632 eb.runInLoop([&]() {
1639 ASSERT_FALSE(runInLoop);
1640 // Should work with thisLoop
1641 ASSERT_TRUE(runThisLoop);
// Posts a function with runInEventBaseThread() and expects `ran` to be true
// at the end of the test.
1644 TEST(EventBaseTest, EventBaseThreadLoop) {
// NOTE(review): the callback presumably sets `ran` — confirm.
1648 base.runInEventBaseThread([&](){
1653 ASSERT_EQ(true, ran);
// Names the EventBase "foo" and, where the glibc version check passes,
// expects the calling thread's pthread name to be "foo".
1656 TEST(EventBaseTest, EventBaseThreadName) {
1658 base.setName("foo");
// NOTE(review): this condition is false for any glibc major version above 2
// whose minor version is below 12; __GLIBC_PREREQ(2, 12) would express
// "2.12 or newer" directly.
1661 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
// Reads up to 16 bytes of the current thread's name into `name`.
1663 pthread_getname_np(pthread_self(), name, 16);
1664 ASSERT_EQ(0, strcmp("foo", name));
// Installs a CountedLoopCallback (count 1, action: terminateLoopSoon()) with
// runBeforeLoop() and expects its count to be 0 afterwards.
1668 TEST(EventBaseTest, RunBeforeLoop) {
1670 CountedLoopCallback cb(&base, 1, [&](){
1671 base.terminateLoopSoon();
1673 base.runBeforeLoop(&cb);
1675 ASSERT_EQ(cb.getCount(), 0);
// Installs a CountedLoopCallback (count 1, no action) with runBeforeLoop();
// loop termination is requested from a function scheduled with
// tryRunAfterDelay().
1678 TEST(EventBaseTest, RunBeforeLoopWait) {
1680 CountedLoopCallback cb(&base, 1);
1681 base.tryRunAfterDelay([&](){
1682 base.terminateLoopSoon();
1684 base.runBeforeLoop(&cb);
1687 // Check that we only ran once, and did not loop multiple times.
1688 ASSERT_EQ(cb.getCount(), 0);
// EventHandler whose handlerReady() aborts the process, so any test using it
// dies if the handler is ever invoked.
1691 class PipeHandler : public EventHandler {
1693 PipeHandler(EventBase* eventBase, int fd)
1694 : EventHandler(eventBase, fd) {}
1696 void handlerReady(uint16_t /* events */) noexcept override { abort(); }
// Calls terminateLoopSoon() before the loop is started, then runs evb.loop()
// on a separate thread while a READ handler is registered on a pipe.
1699 TEST(EventBaseTest, StopBeforeLoop) {
1702 // Give the evb something to do.
1704 ASSERT_EQ(0, pipe(p));
// The handler watches the pipe's read end; PipeHandler aborts if it fires.
1705 PipeHandler handler(&evb, p[0]);
1706 handler.registerHandler(EventHandler::READ);
1708 // It's definitely not running yet
1709 evb.terminateLoopSoon();
1711 // let it run, it should exit quickly.
1712 std::thread t([&] { evb.loop(); });
1715 handler.unregisterHandler();
// Posts a function with runInEventBaseThread().
// NOTE(review): going by the test name, the function is presumably expected
// to run when the EventBase is destroyed — confirm.
1722 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1727 base.runInEventBaseThread([&](){
// Thread `t` captures a token from evb.loopKeepAlive(), sleeps 100 ms, and
// then posts a function to the EventBase. That function takes ownership of
// the token and sets `done`.
1735 TEST(EventBaseTest, LoopKeepAlive) {
1739 std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
1740 /* sleep override */ std::this_thread::sleep_for(
1741 std::chrono::milliseconds(100));
// The token is moved into the posted function.
1742 evb.runInEventBaseThread(
1743 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
// Thread `t` is created from inside a function posted to the EventBase. It
// captures a loopKeepAlive() token, sleeps 100 ms, and then posts a function
// that takes ownership of the token and sets `done`.
1753 TEST(EventBaseTest, LoopKeepAliveInLoop) {
1759 evb.runInEventBaseThread([&] {
1760 t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
1761 /* sleep override */ std::this_thread::sleep_for(
1762 std::chrono::milliseconds(100));
// The token is moved into the posted function.
1763 evb.runInEventBaseThread(
1764 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
// Obtains a keep-alive token on the loop thread, requests termination, and
// checks that `done` is still false both before the request and 30 ms after
// it. The token is then moved into a no-op function posted to the EventBase.
1775 TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
1776 std::unique_ptr<EventBase> evb = folly::make_unique<EventBase>();
1780 std::thread evThread([&] {
1787 auto* ev = evb.get();
1788 EventBase::LoopKeepAlive keepAlive;
// Acquire the token from a function run on the EventBase thread.
1789 ev->runInEventBaseThreadAndWait(
1790 [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); });
1791 ASSERT_FALSE(done) << "Loop finished before we asked it to";
1792 ev->terminateLoopSoon();
1793 /* sleep override */
1794 std::this_thread::sleep_for(std::chrono::milliseconds(30));
1795 ASSERT_FALSE(done) << "Loop terminated early";
// The token is moved into this function, whose body is empty.
1796 ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{});
// A heap-allocated EventBase hands out a loopKeepAlive() token; after a
// 100 ms sleep a function that owns the token is posted through `evbPtr` and
// sets `done`.
1803 TEST(EventBaseTest, LoopKeepAliveShutdown) {
1804 auto evb = folly::make_unique<EventBase>();
1810 loopKeepAlive = evb->loopKeepAlive(),
1813 /* sleep override */ std::this_thread::sleep_for(
1814 std::chrono::milliseconds(100));
// The token is moved into the posted function.
1815 evbPtr->runInEventBaseThread(
1816 [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
// kNumThreads threads each collect kNumTasks tokens from
// loopKeepAliveAtomic(), sleep for one second, and then post one function per
// token. Each function owns its token and increments `done`. The test expects
// `done` to equal kNumThreads * kNumTasks.
1826 TEST(EventBaseTest, LoopKeepAliveAtomic) {
1827 auto evb = folly::make_unique<EventBase>();
1829 static constexpr size_t kNumThreads = 100;
1830 static constexpr size_t kNumTasks = 100;
1832 std::vector<std::thread> ts;
1833 std::vector<std::unique_ptr<Baton<>>> batons;
// One Baton per worker thread.
1836 for (size_t i = 0; i < kNumThreads; ++i) {
1837 batons.emplace_back(std::make_unique<Baton<>>());
1840 for (size_t i = 0; i < kNumThreads; ++i) {
// Each worker gets the raw EventBase pointer and its own baton.
1841 ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
1842 std::vector<EventBase::LoopKeepAlive> keepAlives;
1843 for (size_t j = 0; j < kNumTasks; ++j) {
1844 keepAlives.emplace_back(evbPtr->loopKeepAliveAtomic());
1849 /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
// Each token is moved into the function that is posted for it.
1851 for (auto& keepAlive : keepAlives) {
1852 evbPtr->runInEventBaseThread(
1853 [&done, keepAlive = std::move(keepAlive) ]() { ++done; });
1858 for (auto& baton : batons) {
1864 EXPECT_EQ(kNumThreads * kNumTasks, done);
1866 for (auto& t : ts) {
// Uses the EventBase to drive futures. Promise `p` is fulfilled by a function
// posted with runInEventBaseThread(); promise `p2` is fulfilled by a function
// scheduled with runAfterDelay().
1871 TEST(EventBaseTest, DrivableExecutorTest) {
1872 folly::Promise<bool> p;
1873 auto f = p.getFuture();
1875 bool finished = false;
1878 /* sleep override */
1879 std::this_thread::sleep_for(std::chrono::microseconds(10));
1881 base.runInEventBaseThread([&]() { p.setValue(true); });
1884 // Ensure drive does not busy wait
1885 base.drive(); // TODO: fix notification queue init() extra wakeup
1887 EXPECT_TRUE(finished);
1889 folly::Promise<bool> p2;
1890 auto f2 = p2.getFuture();
1891 // Ensure waitVia gets woken up properly, even from
1892 // a separate thread.
// p2 is fulfilled by a delayed callback (delay argument: 10).
1893 base.runAfterDelay([&]() { p2.setValue(true); }, 10);
1895 EXPECT_TRUE(f2.isReady());
// Captures the RequestContext that is current at the start of the test and
// installs a different one with a RequestContextScopeGuard. A runInLoop()
// callback must see the guarded context, and the original context must be
// current again at the two final checks.
1900 TEST(EventBaseTest, RequestContextTest) {
1902 auto defaultCtx = RequestContext::get();
1905 RequestContextScopeGuard rctx;
1906 auto context = RequestContext::get();
// The guard must have switched to a context other than the default one.
1907 EXPECT_NE(defaultCtx, context);
// The callback compares the context it runs under with the captured one.
1908 evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
1911 EXPECT_EQ(defaultCtx, RequestContext::get());
1913 EXPECT_EQ(defaultCtx, RequestContext::get());