2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
19 #include <folly/io/async/AsyncTimeout.h>
20 #include <folly/io/async/EventBase.h>
21 #include <folly/io/async/EventHandler.h>
22 #include <folly/io/async/test/SocketPair.h>
23 #include <folly/io/async/test/Util.h>
36 using namespace folly;
38 ///////////////////////////////////////////////////////////////////////////
39 // Tests for read and write events
40 ///////////////////////////////////////////////////////////////////////////
42 enum { BUF_SIZE = 4096 };
44 ssize_t writeToFD(int fd, size_t length) {
45 // write an arbitrary amount of data to the fd
47 memset(buf, 'a', sizeof(buf));
48 ssize_t rc = write(fd, buf, sizeof(buf));
53 size_t writeUntilFull(int fd) {
54 // Write to the fd until EAGAIN is returned
55 size_t bytesWritten = 0;
57 memset(buf, 'a', sizeof(buf));
59 ssize_t rc = write(fd, buf, sizeof(buf));
61 CHECK_EQ(errno, EAGAIN);
70 ssize_t readFromFD(int fd, size_t length) {
71 // write an arbitrary amount of data to the fd
73 return read(fd, buf, sizeof(buf));
76 size_t readUntilEmpty(int fd) {
77 // Read from the fd until EAGAIN is returned
81 int rc = read(fd, buf, sizeof(buf));
83 CHECK(false) << "unexpected EOF";
85 CHECK_EQ(errno, EAGAIN);
94 void checkReadUntilEmpty(int fd, size_t expectedLength) {
95 ASSERT_EQ(readUntilEmpty(fd), expectedLength);
98 struct ScheduledEvent {
104 void perform(int fd) {
105 if (events & EventHandler::READ) {
107 result = readUntilEmpty(fd);
109 result = readFromFD(fd, length);
112 if (events & EventHandler::WRITE) {
114 result = writeUntilFull(fd);
116 result = writeToFD(fd, length);
122 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
123 for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
124 eventBase->runAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
129 class TestHandler : public EventHandler {
131 TestHandler(EventBase* eventBase, int fd)
132 : EventHandler(eventBase, fd), fd_(fd) {}
134 virtual void handlerReady(uint16_t events) noexcept {
135 ssize_t bytesRead = 0;
136 ssize_t bytesWritten = 0;
138 // Read all available data, so EventBase will stop calling us
139 // until new data becomes available
140 bytesRead = readUntilEmpty(fd_);
142 if (events & WRITE) {
143 // Write until the pipe buffer is full, so EventBase will stop calling
144 // us until the other end has read some data
145 bytesWritten = writeUntilFull(fd_);
148 log.push_back(EventRecord(events, bytesRead, bytesWritten));
152 EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
155 , bytesRead(bytesRead)
156 , bytesWritten(bytesWritten) {}
161 ssize_t bytesWritten;
164 deque<EventRecord> log;
173 TEST(EventBaseTest, ReadEvent) {
177 // Register for read events
178 TestHandler handler(&eb, sp[0]);
179 handler.registerHandler(EventHandler::READ);
181 // Register timeouts to perform two write events
182 ScheduledEvent events[] = {
183 { 10, EventHandler::WRITE, 2345 },
184 { 160, EventHandler::WRITE, 99 },
187 scheduleEvents(&eb, sp[1], events);
194 // Since we didn't use the EventHandler::PERSIST flag, the handler should
195 // have received the first read, then unregistered itself. Check that only
196 // the first chunk of data was received.
197 ASSERT_EQ(handler.log.size(), 1);
198 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
199 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds, 90);
200 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
201 ASSERT_EQ(handler.log[0].bytesWritten, 0);
202 T_CHECK_TIMEOUT(start, end, events[1].milliseconds, 30);
204 // Make sure the second chunk of data is still waiting to be read.
205 size_t bytesRemaining = readUntilEmpty(sp[0]);
206 ASSERT_EQ(bytesRemaining, events[1].length);
210 * Test (READ | PERSIST)
212 TEST(EventBaseTest, ReadPersist) {
216 // Register for read events
217 TestHandler handler(&eb, sp[0]);
218 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
220 // Register several timeouts to perform writes
221 ScheduledEvent events[] = {
222 { 10, EventHandler::WRITE, 1024 },
223 { 20, EventHandler::WRITE, 2211 },
224 { 30, EventHandler::WRITE, 4096 },
225 { 100, EventHandler::WRITE, 100 },
228 scheduleEvents(&eb, sp[1], events);
230 // Schedule a timeout to unregister the handler after the third write
231 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
238 // The handler should have received the first 3 events,
239 // then been unregistered after that.
240 ASSERT_EQ(handler.log.size(), 3);
241 for (int n = 0; n < 3; ++n) {
242 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
243 T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[n].milliseconds);
244 ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
245 ASSERT_EQ(handler.log[n].bytesWritten, 0);
247 T_CHECK_TIMEOUT(start, end, events[3].milliseconds);
249 // Make sure the data from the last write is still waiting to be read
250 size_t bytesRemaining = readUntilEmpty(sp[0]);
251 ASSERT_EQ(bytesRemaining, events[3].length);
255 * Test registering for READ when the socket is immediately readable
257 TEST(EventBaseTest, ReadImmediate) {
261 // Write some data to the socket so the other end will
262 // be immediately readable
263 size_t dataLength = 1234;
264 writeToFD(sp[1], dataLength);
266 // Register for read events
267 TestHandler handler(&eb, sp[0]);
268 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
270 // Register a timeout to perform another write
271 ScheduledEvent events[] = {
272 { 10, EventHandler::WRITE, 2345 },
275 scheduleEvents(&eb, sp[1], events);
277 // Schedule a timeout to unregister the handler
278 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
285 ASSERT_EQ(handler.log.size(), 2);
287 // There should have been 1 event for immediate readability
288 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
289 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
290 ASSERT_EQ(handler.log[0].bytesRead, dataLength);
291 ASSERT_EQ(handler.log[0].bytesWritten, 0);
293 // There should be another event after the timeout wrote more data
294 ASSERT_EQ(handler.log[1].events, EventHandler::READ);
295 T_CHECK_TIMEOUT(start, handler.log[1].timestamp, events[0].milliseconds);
296 ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
297 ASSERT_EQ(handler.log[1].bytesWritten, 0);
299 T_CHECK_TIMEOUT(start, end, 20);
305 TEST(EventBaseTest, WriteEvent) {
309 // Fill up the write buffer before starting
310 size_t initialBytesWritten = writeUntilFull(sp[0]);
312 // Register for write events
313 TestHandler handler(&eb, sp[0]);
314 handler.registerHandler(EventHandler::WRITE);
316 // Register timeouts to perform two reads
317 ScheduledEvent events[] = {
318 { 10, EventHandler::READ, 0 },
319 { 60, EventHandler::READ, 0 },
322 scheduleEvents(&eb, sp[1], events);
329 // Since we didn't use the EventHandler::PERSIST flag, the handler should
330 // have only been able to write once, then unregistered itself.
331 ASSERT_EQ(handler.log.size(), 1);
332 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
333 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
334 ASSERT_EQ(handler.log[0].bytesRead, 0);
335 ASSERT_GT(handler.log[0].bytesWritten, 0);
336 T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
338 ASSERT_EQ(events[0].result, initialBytesWritten);
339 ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
343 * Test (WRITE | PERSIST)
345 TEST(EventBaseTest, WritePersist) {
349 // Fill up the write buffer before starting
350 size_t initialBytesWritten = writeUntilFull(sp[0]);
352 // Register for write events
353 TestHandler handler(&eb, sp[0]);
354 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
356 // Register several timeouts to read from the socket at several intervals
357 ScheduledEvent events[] = {
358 { 10, EventHandler::READ, 0 },
359 { 40, EventHandler::READ, 0 },
360 { 70, EventHandler::READ, 0 },
361 { 100, EventHandler::READ, 0 },
364 scheduleEvents(&eb, sp[1], events);
366 // Schedule a timeout to unregister the handler after the third read
367 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
374 // The handler should have received the first 3 events,
375 // then been unregistered after that.
376 ASSERT_EQ(handler.log.size(), 3);
377 ASSERT_EQ(events[0].result, initialBytesWritten);
378 for (int n = 0; n < 3; ++n) {
379 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
380 T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[n].milliseconds);
381 ASSERT_EQ(handler.log[n].bytesRead, 0);
382 ASSERT_GT(handler.log[n].bytesWritten, 0);
383 ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
385 T_CHECK_TIMEOUT(start, end, events[3].milliseconds);
389 * Test registering for WRITE when the socket is immediately writable
391 TEST(EventBaseTest, WriteImmediate) {
395 // Register for write events
396 TestHandler handler(&eb, sp[0]);
397 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
399 // Register a timeout to perform a read
400 ScheduledEvent events[] = {
401 { 10, EventHandler::READ, 0 },
404 scheduleEvents(&eb, sp[1], events);
406 // Schedule a timeout to unregister the handler
407 int64_t unregisterTimeout = 40;
408 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
416 ASSERT_EQ(handler.log.size(), 2);
418 // Since the socket buffer was initially empty,
419 // there should have been 1 event for immediate writability
420 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
421 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
422 ASSERT_EQ(handler.log[0].bytesRead, 0);
423 ASSERT_GT(handler.log[0].bytesWritten, 0);
425 // There should be another event after the timeout wrote more data
426 ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
427 T_CHECK_TIMEOUT(start, handler.log[1].timestamp, events[0].milliseconds);
428 ASSERT_EQ(handler.log[1].bytesRead, 0);
429 ASSERT_GT(handler.log[1].bytesWritten, 0);
431 T_CHECK_TIMEOUT(start, end, unregisterTimeout);
435 * Test (READ | WRITE) when the socket becomes readable first
437 TEST(EventBaseTest, ReadWrite) {
441 // Fill up the write buffer before starting
442 size_t sock0WriteLength = writeUntilFull(sp[0]);
444 // Register for read and write events
445 TestHandler handler(&eb, sp[0]);
446 handler.registerHandler(EventHandler::READ_WRITE);
448 // Register timeouts to perform a write then a read.
449 ScheduledEvent events[] = {
450 { 10, EventHandler::WRITE, 2345 },
451 { 40, EventHandler::READ, 0 },
454 scheduleEvents(&eb, sp[1], events);
461 // Since we didn't use the EventHandler::PERSIST flag, the handler should
462 // have only noticed readability, then unregistered itself. Check that only
463 // one event was logged.
464 ASSERT_EQ(handler.log.size(), 1);
465 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
466 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
467 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
468 ASSERT_EQ(handler.log[0].bytesWritten, 0);
469 ASSERT_EQ(events[1].result, sock0WriteLength);
470 T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
474 * Test (READ | WRITE) when the socket becomes writable first
476 TEST(EventBaseTest, WriteRead) {
480 // Fill up the write buffer before starting
481 size_t sock0WriteLength = writeUntilFull(sp[0]);
483 // Register for read and write events
484 TestHandler handler(&eb, sp[0]);
485 handler.registerHandler(EventHandler::READ_WRITE);
487 // Register timeouts to perform a read then a write.
488 size_t sock1WriteLength = 2345;
489 ScheduledEvent events[] = {
490 { 10, EventHandler::READ, 0 },
491 { 40, EventHandler::WRITE, sock1WriteLength },
494 scheduleEvents(&eb, sp[1], events);
501 // Since we didn't use the EventHandler::PERSIST flag, the handler should
502 // have only noticed writability, then unregistered itself. Check that only
503 // one event was logged.
504 ASSERT_EQ(handler.log.size(), 1);
505 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
506 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
507 ASSERT_EQ(handler.log[0].bytesRead, 0);
508 ASSERT_GT(handler.log[0].bytesWritten, 0);
509 ASSERT_EQ(events[0].result, sock0WriteLength);
510 ASSERT_EQ(events[1].result, sock1WriteLength);
511 T_CHECK_TIMEOUT(start, end, events[1].milliseconds);
513 // Make sure the written data is still waiting to be read.
514 size_t bytesRemaining = readUntilEmpty(sp[0]);
515 ASSERT_EQ(bytesRemaining, events[1].length);
519 * Test (READ | WRITE) when the socket becomes readable and writable
522 TEST(EventBaseTest, ReadWriteSimultaneous) {
526 // Fill up the write buffer before starting
527 size_t sock0WriteLength = writeUntilFull(sp[0]);
529 // Register for read and write events
530 TestHandler handler(&eb, sp[0]);
531 handler.registerHandler(EventHandler::READ_WRITE);
533 // Register a timeout to perform a read and write together
534 ScheduledEvent events[] = {
535 { 10, EventHandler::READ | EventHandler::WRITE, 0 },
538 scheduleEvents(&eb, sp[1], events);
545 // It's not strictly required that the EventBase register us about both
546 // events in the same call. So, it's possible that if the EventBase
547 // implementation changes this test could start failing, and it wouldn't be
548 // considered breaking the API. However for now it's nice to exercise this
550 ASSERT_EQ(handler.log.size(), 1);
551 ASSERT_EQ(handler.log[0].events,
552 EventHandler::READ | EventHandler::WRITE);
553 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, events[0].milliseconds);
554 ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
555 ASSERT_GT(handler.log[0].bytesWritten, 0);
556 T_CHECK_TIMEOUT(start, end, events[0].milliseconds);
560 * Test (READ | WRITE | PERSIST)
562 TEST(EventBaseTest, ReadWritePersist) {
566 // Register for read and write events
567 TestHandler handler(&eb, sp[0]);
568 handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
569 EventHandler::PERSIST);
571 // Register timeouts to perform several reads and writes
572 ScheduledEvent events[] = {
573 { 10, EventHandler::WRITE, 2345 },
574 { 20, EventHandler::READ, 0 },
575 { 35, EventHandler::WRITE, 200 },
576 { 45, EventHandler::WRITE, 15 },
577 { 55, EventHandler::READ, 0 },
578 { 120, EventHandler::WRITE, 2345 },
581 scheduleEvents(&eb, sp[1], events);
583 // Schedule a timeout to unregister the handler
584 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
591 ASSERT_EQ(handler.log.size(), 6);
593 // Since we didn't fill up the write buffer immediately, there should
594 // be an immediate event for writability.
595 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
596 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, 0);
597 ASSERT_EQ(handler.log[0].bytesRead, 0);
598 ASSERT_GT(handler.log[0].bytesWritten, 0);
600 // Events 1 through 5 should correspond to the scheduled events
601 for (int n = 1; n < 6; ++n) {
602 ScheduledEvent* event = &events[n - 1];
603 T_CHECK_TIMEOUT(start, handler.log[n].timestamp, event->milliseconds);
604 if (event->events == EventHandler::READ) {
605 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
606 ASSERT_EQ(handler.log[n].bytesRead, 0);
607 ASSERT_GT(handler.log[n].bytesWritten, 0);
609 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
610 ASSERT_EQ(handler.log[n].bytesRead, event->length);
611 ASSERT_EQ(handler.log[n].bytesWritten, 0);
615 // The timeout should have unregistered the handler before the last write.
616 // Make sure that data is still waiting to be read
617 size_t bytesRemaining = readUntilEmpty(sp[0]);
618 ASSERT_EQ(bytesRemaining, events[5].length);
622 class PartialReadHandler : public TestHandler {
624 PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
625 : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
627 virtual void handlerReady(uint16_t events) noexcept {
628 assert(events == EventHandler::READ);
629 ssize_t bytesRead = readFromFD(fd_, readLength_);
630 log.push_back(EventRecord(events, bytesRead, 0));
639 * Test reading only part of the available data when a read event is fired.
640 * When PERSIST is used, make sure the handler gets notified again the next
641 * time around the loop.
643 TEST(EventBaseTest, ReadPartial) {
647 // Register for read events
648 size_t readLength = 100;
649 PartialReadHandler handler(&eb, sp[0], readLength);
650 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
652 // Register a timeout to perform a single write,
653 // with more data than PartialReadHandler will read at once
654 ScheduledEvent events[] = {
655 { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) },
658 scheduleEvents(&eb, sp[1], events);
660 // Schedule a timeout to unregister the handler
661 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
668 ASSERT_EQ(handler.log.size(), 4);
670 // The first 3 invocations should read readLength bytes each
671 for (int n = 0; n < 3; ++n) {
672 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
673 T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[0].milliseconds);
674 ASSERT_EQ(handler.log[n].bytesRead, readLength);
675 ASSERT_EQ(handler.log[n].bytesWritten, 0);
677 // The last read only has readLength/2 bytes
678 ASSERT_EQ(handler.log[3].events, EventHandler::READ);
679 T_CHECK_TIMEOUT(start, handler.log[3].timestamp, events[0].milliseconds);
680 ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
681 ASSERT_EQ(handler.log[3].bytesWritten, 0);
685 class PartialWriteHandler : public TestHandler {
687 PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
688 : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
690 virtual void handlerReady(uint16_t events) noexcept {
691 assert(events == EventHandler::WRITE);
692 ssize_t bytesWritten = writeToFD(fd_, writeLength_);
693 log.push_back(EventRecord(events, 0, bytesWritten));
702 * Test writing without completely filling up the write buffer when the fd
703 * becomes writable. When PERSIST is used, make sure the handler gets
704 * notified again the next time around the loop.
706 TEST(EventBaseTest, WritePartial) {
710 // Fill up the write buffer before starting
711 size_t initialBytesWritten = writeUntilFull(sp[0]);
713 // Register for write events
714 size_t writeLength = 100;
715 PartialWriteHandler handler(&eb, sp[0], writeLength);
716 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
718 // Register a timeout to read, so that more data can be written
719 ScheduledEvent events[] = {
720 { 10, EventHandler::READ, 0 },
723 scheduleEvents(&eb, sp[1], events);
725 // Schedule a timeout to unregister the handler
726 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
733 // Depending on how big the socket buffer is, there will be multiple writes
734 // Only check the first 5
736 ASSERT_GE(handler.log.size(), numChecked);
737 ASSERT_EQ(events[0].result, initialBytesWritten);
739 // The first 3 invocations should read writeLength bytes each
740 for (int n = 0; n < numChecked; ++n) {
741 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
742 T_CHECK_TIMEOUT(start, handler.log[n].timestamp, events[0].milliseconds);
743 ASSERT_EQ(handler.log[n].bytesRead, 0);
744 ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
750 * Test destroying a registered EventHandler
752 TEST(EventBaseTest, DestroyHandler) {
753 class DestroyHandler : public TAsyncTimeout {
755 DestroyHandler(EventBase* eb, EventHandler* h)
759 virtual void timeoutExpired() noexcept {
764 EventHandler* handler_;
770 // Fill up the write buffer before starting
771 size_t initialBytesWritten = writeUntilFull(sp[0]);
773 // Register for write events
774 TestHandler* handler = new TestHandler(&eb, sp[0]);
775 handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
777 // After 10ms, read some data, so that the handler
778 // will be notified that it can write.
779 eb.runAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
782 // Start a timer to destroy the handler after 25ms
783 // This mainly just makes sure the code doesn't break or assert
784 DestroyHandler dh(&eb, handler);
785 dh.scheduleTimeout(25);
791 // Make sure the EventHandler was uninstalled properly when it was
792 // destroyed, and the EventBase loop exited
793 T_CHECK_TIMEOUT(start, end, 25);
795 // Make sure that the handler wrote data to the socket
796 // before it was destroyed
797 size_t bytesRemaining = readUntilEmpty(sp[1]);
798 ASSERT_GT(bytesRemaining, 0);
802 ///////////////////////////////////////////////////////////////////////////
803 // Tests for timeout events
804 ///////////////////////////////////////////////////////////////////////////
806 TEST(EventBaseTest, RunAfterDelay) {
809 TimePoint timestamp1(false);
810 TimePoint timestamp2(false);
811 TimePoint timestamp3(false);
812 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
813 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
814 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 40);
820 T_CHECK_TIMEOUT(start, timestamp1, 10);
821 T_CHECK_TIMEOUT(start, timestamp2, 20);
822 T_CHECK_TIMEOUT(start, timestamp3, 40);
823 T_CHECK_TIMEOUT(start, end, 40);
827 * Test the behavior of runAfterDelay() when some timeouts are
828 * still scheduled when the EventBase is destroyed.
830 TEST(EventBaseTest, RunAfterDelayDestruction) {
831 TimePoint timestamp1(false);
832 TimePoint timestamp2(false);
833 TimePoint timestamp3(false);
834 TimePoint timestamp4(false);
835 TimePoint start(false);
836 TimePoint end(false);
841 // Run two normal timeouts
842 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
843 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
845 // Schedule a timeout to stop the event loop after 40ms
846 eb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
848 // Schedule 2 timeouts that would fire after the event loop stops
849 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 80);
850 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp4), 160);
857 T_CHECK_TIMEOUT(start, timestamp1, 10);
858 T_CHECK_TIMEOUT(start, timestamp2, 20);
859 T_CHECK_TIMEOUT(start, end, 40);
861 ASSERT_TRUE(timestamp3.isUnset());
862 ASSERT_TRUE(timestamp4.isUnset());
864 // Ideally this test should be run under valgrind to ensure that no
868 class TestTimeout : public TAsyncTimeout {
870 explicit TestTimeout(EventBase* eventBase)
871 : TAsyncTimeout(eventBase)
872 , timestamp(false) {}
874 virtual void timeoutExpired() noexcept {
881 TEST(EventBaseTest, BasicTimeouts) {
887 t1.scheduleTimeout(10);
888 t2.scheduleTimeout(20);
889 t3.scheduleTimeout(40);
895 T_CHECK_TIMEOUT(start, t1.timestamp, 10);
896 T_CHECK_TIMEOUT(start, t2.timestamp, 20);
897 T_CHECK_TIMEOUT(start, t3.timestamp, 40);
898 T_CHECK_TIMEOUT(start, end, 40);
901 class ReschedulingTimeout : public TAsyncTimeout {
903 ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
905 , timeouts_(timeouts)
906 , iterator_(timeouts_.begin()) {}
912 virtual void timeoutExpired() noexcept {
913 timestamps.push_back(TimePoint());
918 if (iterator_ != timeouts_.end()) {
919 uint32_t timeout = *iterator_;
921 scheduleTimeout(timeout);
925 vector<TimePoint> timestamps;
928 vector<uint32_t> timeouts_;
929 vector<uint32_t>::const_iterator iterator_;
933 * Test rescheduling the same timeout multiple times
935 TEST(EventBaseTest, ReuseTimeout) {
938 vector<uint32_t> timeouts;
939 timeouts.push_back(10);
940 timeouts.push_back(30);
941 timeouts.push_back(15);
943 ReschedulingTimeout t(&eb, timeouts);
950 // Use a higher tolerance than usual. We're waiting on 3 timeouts
951 // consecutively. In general, each timeout may go over by a few
952 // milliseconds, and we're tripling this error by witing on 3 timeouts.
953 int64_t tolerance = 6;
955 ASSERT_EQ(timeouts.size(), t.timestamps.size());
957 for (int n = 0; n < timeouts.size(); ++n) {
958 total += timeouts[n];
959 T_CHECK_TIMEOUT(start, t.timestamps[n], total, tolerance);
961 T_CHECK_TIMEOUT(start, end, total, tolerance);
965 * Test rescheduling a timeout before it has fired
967 TEST(EventBaseTest, RescheduleTimeout) {
974 t1.scheduleTimeout(15);
975 t2.scheduleTimeout(30);
976 t3.scheduleTimeout(30);
978 auto f = static_cast<bool(TAsyncTimeout::*)(uint32_t)>(
979 &TAsyncTimeout::scheduleTimeout);
981 // after 10ms, reschedule t2 to run sooner than originally scheduled
982 eb.runAfterDelay(std::bind(f, &t2, 10), 10);
983 // after 10ms, reschedule t3 to run later than originally scheduled
984 eb.runAfterDelay(std::bind(f, &t3, 40), 10);
990 T_CHECK_TIMEOUT(start, t1.timestamp, 15);
991 T_CHECK_TIMEOUT(start, t2.timestamp, 20);
992 T_CHECK_TIMEOUT(start, t3.timestamp, 50);
993 T_CHECK_TIMEOUT(start, end, 50);
997 * Test cancelling a timeout
999 TEST(EventBaseTest, CancelTimeout) {
1002 vector<uint32_t> timeouts;
1003 timeouts.push_back(10);
1004 timeouts.push_back(30);
1005 timeouts.push_back(25);
1007 ReschedulingTimeout t(&eb, timeouts);
1009 eb.runAfterDelay(std::bind(&TAsyncTimeout::cancelTimeout, &t), 50);
1015 ASSERT_EQ(t.timestamps.size(), 2);
1016 T_CHECK_TIMEOUT(start, t.timestamps[0], 10);
1017 T_CHECK_TIMEOUT(start, t.timestamps[1], 40);
1018 T_CHECK_TIMEOUT(start, end, 50);
1022 * Test destroying a scheduled timeout object
1024 TEST(EventBaseTest, DestroyTimeout) {
1025 class DestroyTimeout : public TAsyncTimeout {
1027 DestroyTimeout(EventBase* eb, TAsyncTimeout* t)
1031 virtual void timeoutExpired() noexcept {
1036 TAsyncTimeout* timeout_;
1041 TestTimeout* t1 = new TestTimeout(&eb);
1042 t1->scheduleTimeout(30);
1044 DestroyTimeout dt(&eb, t1);
1045 dt.scheduleTimeout(10);
1051 T_CHECK_TIMEOUT(start, end, 10);
1055 ///////////////////////////////////////////////////////////////////////////
1056 // Test for runInThreadTestFunc()
1057 ///////////////////////////////////////////////////////////////////////////
1059 struct RunInThreadData {
1060 RunInThreadData(int numThreads, int opsPerThread)
1061 : opsPerThread(opsPerThread)
1062 , opsToGo(numThreads*opsPerThread) {}
1065 deque< pair<int, int> > values;
1071 struct RunInThreadArg {
1072 RunInThreadArg(RunInThreadData* data,
1079 RunInThreadData* data;
1084 void runInThreadTestFunc(RunInThreadArg* arg) {
1085 arg->data->values.push_back(make_pair(arg->thread, arg->value));
1086 RunInThreadData* data = arg->data;
1089 if(--data->opsToGo == 0) {
1090 // Break out of the event base loop if we are the last thread running
1091 data->evb.terminateLoopSoon();
1095 class RunInThreadTester : public concurrency::Runnable {
1097 RunInThreadTester(int id, RunInThreadData* data) : id_(id), data_(data) {}
1100 // Call evb->runInThread() a number of times
1102 for (int n = 0; n < data_->opsPerThread; ++n) {
1103 RunInThreadArg* arg = new RunInThreadArg(data_, id_, n);
1104 data_->evb.runInEventBaseThread(runInThreadTestFunc, arg);
1112 RunInThreadData* data_;
1115 TEST(EventBaseTest, RunInThread) {
1116 uint32_t numThreads = 50;
1117 uint32_t opsPerThread = 100;
1118 RunInThreadData data(numThreads, opsPerThread);
1120 PosixThreadFactory threadFactory;
1121 threadFactory.setDetached(false);
1122 deque< std::shared_ptr<Thread> > threads;
1123 for (int n = 0; n < numThreads; ++n) {
1124 std::shared_ptr<RunInThreadTester> runner(new RunInThreadTester(n, &data));
1125 std::shared_ptr<Thread> thread = threadFactory.newThread(runner);
1126 threads.push_back(thread);
1130 // Add a timeout event to run after 3 seconds.
1131 // Otherwise loop() will return immediately since there are no events to run.
1132 // Once the last thread exits, it will stop the loop(). However, this
1133 // timeout also stops the loop in case there is a bug performing the normal
1135 data.evb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1142 // Verify that the loop exited because all threads finished and requested it
1143 // to stop. This should happen much sooner than the 3 second timeout.
1144 // Assert that it happens in under a second. (This is still tons of extra
1146 int64_t timeTaken = end.getTime() - start.getTime();
1147 ASSERT_LT(timeTaken, 1000);
1148 VLOG(11) << "Time taken: " << timeTaken;
1150 // Verify that we have all of the events from every thread
1151 int expectedValues[numThreads];
1152 for (int n = 0; n < numThreads; ++n) {
1153 expectedValues[n] = 0;
1155 for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1156 it != data.values.end();
1158 int threadID = it->first;
1159 int value = it->second;
1160 ASSERT_EQ(expectedValues[threadID], value);
1161 ++expectedValues[threadID];
1163 for (int n = 0; n < numThreads; ++n) {
1164 ASSERT_EQ(expectedValues[n], opsPerThread);
1167 // Wait on all of the threads. Otherwise we can exit and clean up
1168 // RunInThreadData before the last thread exits, while it is still holding
1169 // the RunInThreadData's mutex.
1170 for (deque< std::shared_ptr<Thread> >::const_iterator it = threads.begin();
1171 it != threads.end();
1177 ///////////////////////////////////////////////////////////////////////////
1178 // Tests for runInLoop()
1179 ///////////////////////////////////////////////////////////////////////////
1181 class CountedLoopCallback : public EventBase::LoopCallback {
1183 CountedLoopCallback(EventBase* eventBase,
1185 std::function<void()> action =
1186 std::function<void()>())
1187 : eventBase_(eventBase)
1189 , action_(action) {}
1191 virtual void runLoopCallback() noexcept {
1194 eventBase_->runInLoop(this);
1195 } else if (action_) {
1200 unsigned int getCount() const {
1205 EventBase* eventBase_;
1206 unsigned int count_;
1207 std::function<void()> action_;
1210 // Test that EventBase::loop() doesn't exit while there are
1211 // still LoopCallbacks remaining to be invoked.
1212 TEST(EventBaseTest, RepeatedRunInLoop) {
1213 EventBase eventBase;
1215 CountedLoopCallback c(&eventBase, 10);
1216 eventBase.runInLoop(&c);
1217 // The callback shouldn't have run immediately
1218 ASSERT_EQ(c.getCount(), 10);
1221 // loop() should loop until the CountedLoopCallback stops
1222 // re-installing itself.
1223 ASSERT_EQ(c.getCount(), 0);
1226 // Test runInLoop() calls with terminateLoopSoon()
1227 TEST(EventBaseTest, RunInLoopStopLoop) {
1228 EventBase eventBase;
1230 CountedLoopCallback c1(&eventBase, 20);
1231 CountedLoopCallback c2(&eventBase, 10,
1232 std::bind(&EventBase::terminateLoopSoon, &eventBase));
1234 eventBase.runInLoop(&c1);
1235 eventBase.runInLoop(&c2);
1236 ASSERT_EQ(c1.getCount(), 20);
1237 ASSERT_EQ(c2.getCount(), 10);
1239 eventBase.loopForever();
1241 // c2 should have stopped the loop after 10 iterations
1242 ASSERT_EQ(c2.getCount(), 0);
1244 // We allow the EventBase to run the loop callbacks in whatever order it
1245 // chooses. We'll accept c1's count being either 10 (if the loop terminated
1246 // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1249 // (With the current code, c1 will always run 10 times, but we don't consider
1250 // this a hard API requirement.)
1251 ASSERT_GE(c1.getCount(), 10);
1252 ASSERT_LE(c1.getCount(), 11);
1255 // Test cancelling runInLoop() callbacks
1256 TEST(EventBaseTest, CancelRunInLoop) {
1257 EventBase eventBase;
1259 CountedLoopCallback c1(&eventBase, 20);
1260 CountedLoopCallback c2(&eventBase, 20);
1261 CountedLoopCallback c3(&eventBase, 20);
1263 std::function<void()> cancelC1Action =
1264 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1265 std::function<void()> cancelC2Action =
1266 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1268 CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1269 CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1271 // Install cancelC1 after c1
1272 eventBase.runInLoop(&c1);
1273 eventBase.runInLoop(&cancelC1);
1275 // Install cancelC2 before c2
1276 eventBase.runInLoop(&cancelC2);
1277 eventBase.runInLoop(&c2);
1280 eventBase.runInLoop(&c3);
1282 ASSERT_EQ(c1.getCount(), 20);
1283 ASSERT_EQ(c2.getCount(), 20);
1284 ASSERT_EQ(c3.getCount(), 20);
1285 ASSERT_EQ(cancelC1.getCount(), 10);
1286 ASSERT_EQ(cancelC2.getCount(), 10);
1291 // cancelC1 and cancelC3 should have both fired after 10 iterations and
1292 // stopped re-installing themselves
1293 ASSERT_EQ(cancelC1.getCount(), 0);
1294 ASSERT_EQ(cancelC2.getCount(), 0);
1295 // c3 should have continued on for the full 20 iterations
1296 ASSERT_EQ(c3.getCount(), 0);
1298 // c1 and c2 should have both been cancelled on the 10th iteration.
1300 // Callbacks are always run in the order they are installed,
1301 // so c1 should have fired 10 times, and been canceled after it ran on the
1302 // 10th iteration. c2 should have only fired 9 times, because cancelC2 will
1303 // have run before it on the 10th iteration, and cancelled it before it
1305 ASSERT_EQ(c1.getCount(), 10);
1306 ASSERT_EQ(c2.getCount(), 11);
1309 class TerminateTestCallback : public EventBase::LoopCallback,
1310 public EventHandler {
1312 TerminateTestCallback(EventBase* eventBase, int fd)
1313 : EventHandler(eventBase, fd),
1314 eventBase_(eventBase),
1315 loopInvocations_(0),
1316 maxLoopInvocations_(0),
1317 eventInvocations_(0),
1318 maxEventInvocations_(0) {}
1320 void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1321 loopInvocations_ = 0;
1322 maxLoopInvocations_ = maxLoopInvocations;
1323 eventInvocations_ = 0;
1324 maxEventInvocations_ = maxEventInvocations;
1326 cancelLoopCallback();
1327 unregisterHandler();
1330 virtual void handlerReady(uint16_t events) noexcept {
1331 // We didn't register with PERSIST, so we will have been automatically
1332 // unregistered already.
1333 ASSERT_FALSE(isHandlerRegistered());
1335 ++eventInvocations_;
1336 if (eventInvocations_ >= maxEventInvocations_) {
1340 eventBase_->runInLoop(this);
1342 virtual void runLoopCallback() noexcept {
1344 if (loopInvocations_ >= maxLoopInvocations_) {
1348 registerHandler(READ);
1351 uint32_t getLoopInvocations() const {
1352 return loopInvocations_;
1354 uint32_t getEventInvocations() const {
1355 return eventInvocations_;
1359 EventBase* eventBase_;
1360 uint32_t loopInvocations_;
1361 uint32_t maxLoopInvocations_;
1362 uint32_t eventInvocations_;
1363 uint32_t maxEventInvocations_;
1367 * Test that EventBase::loop() correctly detects when there are no more events
1370 * This uses a single callback, which alternates registering itself as a loop
1371 * callback versus a EventHandler callback. This exercises a regression where
1372 * EventBase::loop() incorrectly exited if there were no more fd handlers
1373 * registered, but a loop callback installed a new fd handler.
1375 TEST(EventBaseTest, LoopTermination) {
1376 EventBase eventBase;
1378 // Open a pipe and close the write end,
1379 // so the read endpoint will be readable
1381 int rc = pipe(pipeFds);
1384 TerminateTestCallback callback(&eventBase, pipeFds[0]);
1386 // Test once where the callback will exit after a loop callback
1387 callback.reset(10, 100);
1388 eventBase.runInLoop(&callback);
1390 ASSERT_EQ(callback.getLoopInvocations(), 10);
1391 ASSERT_EQ(callback.getEventInvocations(), 9);
1393 // Test once where the callback will exit after an fd event callback
1394 callback.reset(100, 7);
1395 eventBase.runInLoop(&callback);
1397 ASSERT_EQ(callback.getLoopInvocations(), 7);
1398 ASSERT_EQ(callback.getEventInvocations(), 7);
1403 ///////////////////////////////////////////////////////////////////////////
1404 // Tests for latency calculations
1405 ///////////////////////////////////////////////////////////////////////////
1407 class IdleTimeTimeoutSeries : public TAsyncTimeout {
1411 explicit IdleTimeTimeoutSeries(EventBase *base,
1412 std::deque<std::uint64_t>& timeout) :
1413 TAsyncTimeout(base),
1419 virtual ~IdleTimeTimeoutSeries() {}
1421 void timeoutExpired() noexcept {
1424 if(timeout_.empty()){
1427 uint64_t sleepTime = timeout_.front();
1428 timeout_.pop_front();
1436 int getTimeouts() const {
1442 std::deque<uint64_t>& timeout_;
1446 * Verify that idle time is correctly accounted for when decaying our loop
1449 * This works by creating a high loop time (via usleep), expecting a latency
1450 * callback with known value, and then scheduling a timeout for later. This
1451 * later timeout is far enough in the future that the idle time should have
1452 * caused the loop time to decay.
1454 TEST(EventBaseTest, IdleTime) {
1455 EventBase eventBase;
1456 eventBase.setLoadAvgMsec(1000);
1457 eventBase.resetLoadAvg(5900.0);
1458 std::deque<uint64_t> timeouts0(4, 8080);
1459 timeouts0.push_front(8000);
1460 timeouts0.push_back(14000);
1461 IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1462 std::deque<uint64_t> timeouts(20, 20);
1463 std::unique_ptr<IdleTimeTimeoutSeries> tos;
1465 int latencyCallbacks = 0;
1466 eventBase.setMaxLatency(6000, [&]() {
1469 switch (latencyCallbacks) {
1471 ASSERT_EQ(6, tos0.getTimeouts());
1472 ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1473 ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1474 tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1478 FAIL() << "Unexpected latency callback";
1483 // Kick things off with an "immedite" timeout
1484 tos0.scheduleTimeout(1);
1488 ASSERT_EQ(1, latencyCallbacks);
1489 ASSERT_EQ(7, tos0.getTimeouts());
1490 ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1491 ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1492 ASSERT_EQ(21, tos->getTimeouts());
1496 * Test that thisLoop functionality works with terminateLoopSoon
1498 TEST(EventBaseTest, ThisLoop) {
1500 bool runInLoop = false;
1501 bool runThisLoop = false;
1504 eb.terminateLoopSoon();
1505 eb.runInLoop([&]() {
1508 eb.runInLoop([&]() {
1515 ASSERT_FALSE(runInLoop);
1516 // Should work with thisLoop
1517 ASSERT_TRUE(runThisLoop);
1520 TEST(EventBaseTest, EventBaseThreadLoop) {
1524 base.runInEventBaseThread([&](){
1529 ASSERT_EQ(true, ran);
1532 TEST(EventBaseTest, EventBaseThreadName) {
1534 base.setName("foo");
1537 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1539 pthread_getname_np(pthread_self(), name, 16);
1540 ASSERT_EQ(0, strcmp("foo", name));
1544 TEST(TEventBaseTest, RunBeforeLoop) {
1546 CountedLoopCallback cb(&base, 1, [&](){
1547 base.terminateLoopSoon();
1549 base.runBeforeLoop(&cb);
1551 ASSERT_EQUAL(cb.getCount(), 0);
1554 TEST(TEventBaseTest, RunBeforeLoopWait) {
1556 CountedLoopCallback cb(&base, 1);
1557 base.runAfterDelay([&](){
1558 base.terminateLoopSoon();
1560 base.runBeforeLoop(&cb);
1563 // Check that we only ran once, and did not loop multiple times.
1564 ASSERT_EQUAL(cb.getCount(), 0);