2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
19 #include <folly/io/async/AsyncTimeout.h>
20 #include <folly/io/async/EventBase.h>
21 #include <folly/io/async/EventHandler.h>
22 #include <folly/io/async/test/SocketPair.h>
23 #include <folly/io/async/test/Util.h>
36 using std::chrono::milliseconds;
38 using namespace folly;
40 ///////////////////////////////////////////////////////////////////////////
41 // Tests for read and write events
42 ///////////////////////////////////////////////////////////////////////////
44 enum { BUF_SIZE = 4096 };
46 ssize_t writeToFD(int fd, size_t length) {
47 // write an arbitrary amount of data to the fd
49 memset(buf, 'a', sizeof(buf));
50 ssize_t rc = write(fd, buf, sizeof(buf));
55 size_t writeUntilFull(int fd) {
56 // Write to the fd until EAGAIN is returned
57 size_t bytesWritten = 0;
59 memset(buf, 'a', sizeof(buf));
61 ssize_t rc = write(fd, buf, sizeof(buf));
63 CHECK_EQ(errno, EAGAIN);
72 ssize_t readFromFD(int fd, size_t length) {
73 // write an arbitrary amount of data to the fd
75 return read(fd, buf, sizeof(buf));
78 size_t readUntilEmpty(int fd) {
79 // Read from the fd until EAGAIN is returned
83 int rc = read(fd, buf, sizeof(buf));
85 CHECK(false) << "unexpected EOF";
87 CHECK_EQ(errno, EAGAIN);
96 void checkReadUntilEmpty(int fd, size_t expectedLength) {
97 ASSERT_EQ(readUntilEmpty(fd), expectedLength);
100 struct ScheduledEvent {
106 void perform(int fd) {
107 if (events & EventHandler::READ) {
109 result = readUntilEmpty(fd);
111 result = readFromFD(fd, length);
114 if (events & EventHandler::WRITE) {
116 result = writeUntilFull(fd);
118 result = writeToFD(fd, length);
124 void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
125 for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
126 eventBase->runAfterDelay(std::bind(&ScheduledEvent::perform, ev, fd),
131 class TestHandler : public EventHandler {
133 TestHandler(EventBase* eventBase, int fd)
134 : EventHandler(eventBase, fd), fd_(fd) {}
136 virtual void handlerReady(uint16_t events) noexcept {
137 ssize_t bytesRead = 0;
138 ssize_t bytesWritten = 0;
140 // Read all available data, so EventBase will stop calling us
141 // until new data becomes available
142 bytesRead = readUntilEmpty(fd_);
144 if (events & WRITE) {
145 // Write until the pipe buffer is full, so EventBase will stop calling
146 // us until the other end has read some data
147 bytesWritten = writeUntilFull(fd_);
150 log.push_back(EventRecord(events, bytesRead, bytesWritten));
154 EventRecord(uint16_t events, size_t bytesRead, size_t bytesWritten)
157 , bytesRead(bytesRead)
158 , bytesWritten(bytesWritten) {}
163 ssize_t bytesWritten;
166 deque<EventRecord> log;
175 TEST(EventBaseTest, ReadEvent) {
179 // Register for read events
180 TestHandler handler(&eb, sp[0]);
181 handler.registerHandler(EventHandler::READ);
183 // Register timeouts to perform two write events
184 ScheduledEvent events[] = {
185 { 10, EventHandler::WRITE, 2345 },
186 { 160, EventHandler::WRITE, 99 },
189 scheduleEvents(&eb, sp[1], events);
196 // Since we didn't use the EventHandler::PERSIST flag, the handler should
197 // have received the first read, then unregistered itself. Check that only
198 // the first chunk of data was received.
199 ASSERT_EQ(handler.log.size(), 1);
200 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
201 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
202 milliseconds(events[0].milliseconds), milliseconds(90));
203 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
204 ASSERT_EQ(handler.log[0].bytesWritten, 0);
205 T_CHECK_TIMEOUT(start, end,
206 milliseconds(events[1].milliseconds), milliseconds(30));
208 // Make sure the second chunk of data is still waiting to be read.
209 size_t bytesRemaining = readUntilEmpty(sp[0]);
210 ASSERT_EQ(bytesRemaining, events[1].length);
214 * Test (READ | PERSIST)
216 TEST(EventBaseTest, ReadPersist) {
220 // Register for read events
221 TestHandler handler(&eb, sp[0]);
222 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
224 // Register several timeouts to perform writes
225 ScheduledEvent events[] = {
226 { 10, EventHandler::WRITE, 1024 },
227 { 20, EventHandler::WRITE, 2211 },
228 { 30, EventHandler::WRITE, 4096 },
229 { 100, EventHandler::WRITE, 100 },
232 scheduleEvents(&eb, sp[1], events);
234 // Schedule a timeout to unregister the handler after the third write
235 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
242 // The handler should have received the first 3 events,
243 // then been unregistered after that.
244 ASSERT_EQ(handler.log.size(), 3);
245 for (int n = 0; n < 3; ++n) {
246 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
247 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
248 milliseconds(events[n].milliseconds));
249 ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
250 ASSERT_EQ(handler.log[n].bytesWritten, 0);
252 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
254 // Make sure the data from the last write is still waiting to be read
255 size_t bytesRemaining = readUntilEmpty(sp[0]);
256 ASSERT_EQ(bytesRemaining, events[3].length);
260 * Test registering for READ when the socket is immediately readable
262 TEST(EventBaseTest, ReadImmediate) {
266 // Write some data to the socket so the other end will
267 // be immediately readable
268 size_t dataLength = 1234;
269 writeToFD(sp[1], dataLength);
271 // Register for read events
272 TestHandler handler(&eb, sp[0]);
273 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
275 // Register a timeout to perform another write
276 ScheduledEvent events[] = {
277 { 10, EventHandler::WRITE, 2345 },
280 scheduleEvents(&eb, sp[1], events);
282 // Schedule a timeout to unregister the handler
283 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
290 ASSERT_EQ(handler.log.size(), 2);
292 // There should have been 1 event for immediate readability
293 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
294 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
295 ASSERT_EQ(handler.log[0].bytesRead, dataLength);
296 ASSERT_EQ(handler.log[0].bytesWritten, 0);
298 // There should be another event after the timeout wrote more data
299 ASSERT_EQ(handler.log[1].events, EventHandler::READ);
300 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
301 milliseconds(events[0].milliseconds));
302 ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
303 ASSERT_EQ(handler.log[1].bytesWritten, 0);
305 T_CHECK_TIMEOUT(start, end, milliseconds(20));
311 TEST(EventBaseTest, WriteEvent) {
315 // Fill up the write buffer before starting
316 size_t initialBytesWritten = writeUntilFull(sp[0]);
318 // Register for write events
319 TestHandler handler(&eb, sp[0]);
320 handler.registerHandler(EventHandler::WRITE);
322 // Register timeouts to perform two reads
323 ScheduledEvent events[] = {
324 { 10, EventHandler::READ, 0 },
325 { 60, EventHandler::READ, 0 },
328 scheduleEvents(&eb, sp[1], events);
335 // Since we didn't use the EventHandler::PERSIST flag, the handler should
336 // have only been able to write once, then unregistered itself.
337 ASSERT_EQ(handler.log.size(), 1);
338 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
339 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
340 milliseconds(events[0].milliseconds));
341 ASSERT_EQ(handler.log[0].bytesRead, 0);
342 ASSERT_GT(handler.log[0].bytesWritten, 0);
343 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
345 ASSERT_EQ(events[0].result, initialBytesWritten);
346 ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
350 * Test (WRITE | PERSIST)
352 TEST(EventBaseTest, WritePersist) {
356 // Fill up the write buffer before starting
357 size_t initialBytesWritten = writeUntilFull(sp[0]);
359 // Register for write events
360 TestHandler handler(&eb, sp[0]);
361 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
363 // Register several timeouts to read from the socket at several intervals
364 ScheduledEvent events[] = {
365 { 10, EventHandler::READ, 0 },
366 { 40, EventHandler::READ, 0 },
367 { 70, EventHandler::READ, 0 },
368 { 100, EventHandler::READ, 0 },
371 scheduleEvents(&eb, sp[1], events);
373 // Schedule a timeout to unregister the handler after the third read
374 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
381 // The handler should have received the first 3 events,
382 // then been unregistered after that.
383 ASSERT_EQ(handler.log.size(), 3);
384 ASSERT_EQ(events[0].result, initialBytesWritten);
385 for (int n = 0; n < 3; ++n) {
386 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
387 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
388 milliseconds(events[n].milliseconds));
389 ASSERT_EQ(handler.log[n].bytesRead, 0);
390 ASSERT_GT(handler.log[n].bytesWritten, 0);
391 ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
393 T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
397 * Test registering for WRITE when the socket is immediately writable
399 TEST(EventBaseTest, WriteImmediate) {
403 // Register for write events
404 TestHandler handler(&eb, sp[0]);
405 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
407 // Register a timeout to perform a read
408 ScheduledEvent events[] = {
409 { 10, EventHandler::READ, 0 },
412 scheduleEvents(&eb, sp[1], events);
414 // Schedule a timeout to unregister the handler
415 int64_t unregisterTimeout = 40;
416 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler),
424 ASSERT_EQ(handler.log.size(), 2);
426 // Since the socket buffer was initially empty,
427 // there should have been 1 event for immediate writability
428 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
429 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
430 ASSERT_EQ(handler.log[0].bytesRead, 0);
431 ASSERT_GT(handler.log[0].bytesWritten, 0);
433 // There should be another event after the timeout wrote more data
434 ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
435 T_CHECK_TIMEOUT(start, handler.log[1].timestamp,
436 milliseconds(events[0].milliseconds));
437 ASSERT_EQ(handler.log[1].bytesRead, 0);
438 ASSERT_GT(handler.log[1].bytesWritten, 0);
440 T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
444 * Test (READ | WRITE) when the socket becomes readable first
446 TEST(EventBaseTest, ReadWrite) {
450 // Fill up the write buffer before starting
451 size_t sock0WriteLength = writeUntilFull(sp[0]);
453 // Register for read and write events
454 TestHandler handler(&eb, sp[0]);
455 handler.registerHandler(EventHandler::READ_WRITE);
457 // Register timeouts to perform a write then a read.
458 ScheduledEvent events[] = {
459 { 10, EventHandler::WRITE, 2345 },
460 { 40, EventHandler::READ, 0 },
463 scheduleEvents(&eb, sp[1], events);
470 // Since we didn't use the EventHandler::PERSIST flag, the handler should
471 // have only noticed readability, then unregistered itself. Check that only
472 // one event was logged.
473 ASSERT_EQ(handler.log.size(), 1);
474 ASSERT_EQ(handler.log[0].events, EventHandler::READ);
475 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
476 milliseconds(events[0].milliseconds));
477 ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
478 ASSERT_EQ(handler.log[0].bytesWritten, 0);
479 ASSERT_EQ(events[1].result, sock0WriteLength);
480 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
484 * Test (READ | WRITE) when the socket becomes writable first
486 TEST(EventBaseTest, WriteRead) {
490 // Fill up the write buffer before starting
491 size_t sock0WriteLength = writeUntilFull(sp[0]);
493 // Register for read and write events
494 TestHandler handler(&eb, sp[0]);
495 handler.registerHandler(EventHandler::READ_WRITE);
497 // Register timeouts to perform a read then a write.
498 size_t sock1WriteLength = 2345;
499 ScheduledEvent events[] = {
500 { 10, EventHandler::READ, 0 },
501 { 40, EventHandler::WRITE, sock1WriteLength },
504 scheduleEvents(&eb, sp[1], events);
511 // Since we didn't use the EventHandler::PERSIST flag, the handler should
512 // have only noticed writability, then unregistered itself. Check that only
513 // one event was logged.
514 ASSERT_EQ(handler.log.size(), 1);
515 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
516 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
517 milliseconds(events[0].milliseconds));
518 ASSERT_EQ(handler.log[0].bytesRead, 0);
519 ASSERT_GT(handler.log[0].bytesWritten, 0);
520 ASSERT_EQ(events[0].result, sock0WriteLength);
521 ASSERT_EQ(events[1].result, sock1WriteLength);
522 T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
524 // Make sure the written data is still waiting to be read.
525 size_t bytesRemaining = readUntilEmpty(sp[0]);
526 ASSERT_EQ(bytesRemaining, events[1].length);
530 * Test (READ | WRITE) when the socket becomes readable and writable
533 TEST(EventBaseTest, ReadWriteSimultaneous) {
537 // Fill up the write buffer before starting
538 size_t sock0WriteLength = writeUntilFull(sp[0]);
540 // Register for read and write events
541 TestHandler handler(&eb, sp[0]);
542 handler.registerHandler(EventHandler::READ_WRITE);
544 // Register a timeout to perform a read and write together
545 ScheduledEvent events[] = {
546 { 10, EventHandler::READ | EventHandler::WRITE, 0 },
549 scheduleEvents(&eb, sp[1], events);
556 // It's not strictly required that the EventBase register us about both
557 // events in the same call. So, it's possible that if the EventBase
558 // implementation changes this test could start failing, and it wouldn't be
559 // considered breaking the API. However for now it's nice to exercise this
561 ASSERT_EQ(handler.log.size(), 1);
562 ASSERT_EQ(handler.log[0].events,
563 EventHandler::READ | EventHandler::WRITE);
564 T_CHECK_TIMEOUT(start, handler.log[0].timestamp,
565 milliseconds(events[0].milliseconds));
566 ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
567 ASSERT_GT(handler.log[0].bytesWritten, 0);
568 T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
572 * Test (READ | WRITE | PERSIST)
574 TEST(EventBaseTest, ReadWritePersist) {
578 // Register for read and write events
579 TestHandler handler(&eb, sp[0]);
580 handler.registerHandler(EventHandler::READ | EventHandler::WRITE |
581 EventHandler::PERSIST);
583 // Register timeouts to perform several reads and writes
584 ScheduledEvent events[] = {
585 { 10, EventHandler::WRITE, 2345 },
586 { 20, EventHandler::READ, 0 },
587 { 35, EventHandler::WRITE, 200 },
588 { 45, EventHandler::WRITE, 15 },
589 { 55, EventHandler::READ, 0 },
590 { 120, EventHandler::WRITE, 2345 },
593 scheduleEvents(&eb, sp[1], events);
595 // Schedule a timeout to unregister the handler
596 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
603 ASSERT_EQ(handler.log.size(), 6);
605 // Since we didn't fill up the write buffer immediately, there should
606 // be an immediate event for writability.
607 ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
608 T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
609 ASSERT_EQ(handler.log[0].bytesRead, 0);
610 ASSERT_GT(handler.log[0].bytesWritten, 0);
612 // Events 1 through 5 should correspond to the scheduled events
613 for (int n = 1; n < 6; ++n) {
614 ScheduledEvent* event = &events[n - 1];
615 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
616 milliseconds(event->milliseconds));
617 if (event->events == EventHandler::READ) {
618 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
619 ASSERT_EQ(handler.log[n].bytesRead, 0);
620 ASSERT_GT(handler.log[n].bytesWritten, 0);
622 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
623 ASSERT_EQ(handler.log[n].bytesRead, event->length);
624 ASSERT_EQ(handler.log[n].bytesWritten, 0);
628 // The timeout should have unregistered the handler before the last write.
629 // Make sure that data is still waiting to be read
630 size_t bytesRemaining = readUntilEmpty(sp[0]);
631 ASSERT_EQ(bytesRemaining, events[5].length);
635 class PartialReadHandler : public TestHandler {
637 PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
638 : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
640 virtual void handlerReady(uint16_t events) noexcept {
641 assert(events == EventHandler::READ);
642 ssize_t bytesRead = readFromFD(fd_, readLength_);
643 log.push_back(EventRecord(events, bytesRead, 0));
652 * Test reading only part of the available data when a read event is fired.
653 * When PERSIST is used, make sure the handler gets notified again the next
654 * time around the loop.
656 TEST(EventBaseTest, ReadPartial) {
660 // Register for read events
661 size_t readLength = 100;
662 PartialReadHandler handler(&eb, sp[0], readLength);
663 handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
665 // Register a timeout to perform a single write,
666 // with more data than PartialReadHandler will read at once
667 ScheduledEvent events[] = {
668 { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) },
671 scheduleEvents(&eb, sp[1], events);
673 // Schedule a timeout to unregister the handler
674 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
681 ASSERT_EQ(handler.log.size(), 4);
683 // The first 3 invocations should read readLength bytes each
684 for (int n = 0; n < 3; ++n) {
685 ASSERT_EQ(handler.log[n].events, EventHandler::READ);
686 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
687 milliseconds(events[0].milliseconds));
688 ASSERT_EQ(handler.log[n].bytesRead, readLength);
689 ASSERT_EQ(handler.log[n].bytesWritten, 0);
691 // The last read only has readLength/2 bytes
692 ASSERT_EQ(handler.log[3].events, EventHandler::READ);
693 T_CHECK_TIMEOUT(start, handler.log[3].timestamp,
694 milliseconds(events[0].milliseconds));
695 ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
696 ASSERT_EQ(handler.log[3].bytesWritten, 0);
700 class PartialWriteHandler : public TestHandler {
702 PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
703 : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
705 virtual void handlerReady(uint16_t events) noexcept {
706 assert(events == EventHandler::WRITE);
707 ssize_t bytesWritten = writeToFD(fd_, writeLength_);
708 log.push_back(EventRecord(events, 0, bytesWritten));
717 * Test writing without completely filling up the write buffer when the fd
718 * becomes writable. When PERSIST is used, make sure the handler gets
719 * notified again the next time around the loop.
721 TEST(EventBaseTest, WritePartial) {
725 // Fill up the write buffer before starting
726 size_t initialBytesWritten = writeUntilFull(sp[0]);
728 // Register for write events
729 size_t writeLength = 100;
730 PartialWriteHandler handler(&eb, sp[0], writeLength);
731 handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
733 // Register a timeout to read, so that more data can be written
734 ScheduledEvent events[] = {
735 { 10, EventHandler::READ, 0 },
738 scheduleEvents(&eb, sp[1], events);
740 // Schedule a timeout to unregister the handler
741 eb.runAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
748 // Depending on how big the socket buffer is, there will be multiple writes
749 // Only check the first 5
751 ASSERT_GE(handler.log.size(), numChecked);
752 ASSERT_EQ(events[0].result, initialBytesWritten);
754 // The first 3 invocations should read writeLength bytes each
755 for (int n = 0; n < numChecked; ++n) {
756 ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
757 T_CHECK_TIMEOUT(start, handler.log[n].timestamp,
758 milliseconds(events[0].milliseconds));
759 ASSERT_EQ(handler.log[n].bytesRead, 0);
760 ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
766 * Test destroying a registered EventHandler
768 TEST(EventBaseTest, DestroyHandler) {
769 class DestroyHandler : public AsyncTimeout {
771 DestroyHandler(EventBase* eb, EventHandler* h)
775 virtual void timeoutExpired() noexcept {
780 EventHandler* handler_;
786 // Fill up the write buffer before starting
787 size_t initialBytesWritten = writeUntilFull(sp[0]);
789 // Register for write events
790 TestHandler* handler = new TestHandler(&eb, sp[0]);
791 handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
793 // After 10ms, read some data, so that the handler
794 // will be notified that it can write.
795 eb.runAfterDelay(std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten),
798 // Start a timer to destroy the handler after 25ms
799 // This mainly just makes sure the code doesn't break or assert
800 DestroyHandler dh(&eb, handler);
801 dh.scheduleTimeout(25);
807 // Make sure the EventHandler was uninstalled properly when it was
808 // destroyed, and the EventBase loop exited
809 T_CHECK_TIMEOUT(start, end, milliseconds(25));
811 // Make sure that the handler wrote data to the socket
812 // before it was destroyed
813 size_t bytesRemaining = readUntilEmpty(sp[1]);
814 ASSERT_GT(bytesRemaining, 0);
818 ///////////////////////////////////////////////////////////////////////////
819 // Tests for timeout events
820 ///////////////////////////////////////////////////////////////////////////
822 TEST(EventBaseTest, RunAfterDelay) {
825 TimePoint timestamp1(false);
826 TimePoint timestamp2(false);
827 TimePoint timestamp3(false);
828 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
829 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
830 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 40);
836 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
837 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
838 T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
839 T_CHECK_TIMEOUT(start, end, milliseconds(40));
843 * Test the behavior of runAfterDelay() when some timeouts are
844 * still scheduled when the EventBase is destroyed.
846 TEST(EventBaseTest, RunAfterDelayDestruction) {
847 TimePoint timestamp1(false);
848 TimePoint timestamp2(false);
849 TimePoint timestamp3(false);
850 TimePoint timestamp4(false);
851 TimePoint start(false);
852 TimePoint end(false);
857 // Run two normal timeouts
858 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
859 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
861 // Schedule a timeout to stop the event loop after 40ms
862 eb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
864 // Schedule 2 timeouts that would fire after the event loop stops
865 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 80);
866 eb.runAfterDelay(std::bind(&TimePoint::reset, ×tamp4), 160);
873 T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
874 T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
875 T_CHECK_TIMEOUT(start, end, milliseconds(40));
877 ASSERT_TRUE(timestamp3.isUnset());
878 ASSERT_TRUE(timestamp4.isUnset());
880 // Ideally this test should be run under valgrind to ensure that no
884 class TestTimeout : public AsyncTimeout {
886 explicit TestTimeout(EventBase* eventBase)
887 : AsyncTimeout(eventBase)
888 , timestamp(false) {}
890 virtual void timeoutExpired() noexcept {
897 TEST(EventBaseTest, BasicTimeouts) {
903 t1.scheduleTimeout(10);
904 t2.scheduleTimeout(20);
905 t3.scheduleTimeout(40);
911 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
912 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
913 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
914 T_CHECK_TIMEOUT(start, end, milliseconds(40));
917 class ReschedulingTimeout : public AsyncTimeout {
919 ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
921 , timeouts_(timeouts)
922 , iterator_(timeouts_.begin()) {}
928 virtual void timeoutExpired() noexcept {
929 timestamps.push_back(TimePoint());
934 if (iterator_ != timeouts_.end()) {
935 uint32_t timeout = *iterator_;
937 scheduleTimeout(timeout);
941 vector<TimePoint> timestamps;
944 vector<uint32_t> timeouts_;
945 vector<uint32_t>::const_iterator iterator_;
949 * Test rescheduling the same timeout multiple times
951 TEST(EventBaseTest, ReuseTimeout) {
954 vector<uint32_t> timeouts;
955 timeouts.push_back(10);
956 timeouts.push_back(30);
957 timeouts.push_back(15);
959 ReschedulingTimeout t(&eb, timeouts);
966 // Use a higher tolerance than usual. We're waiting on 3 timeouts
967 // consecutively. In general, each timeout may go over by a few
968 // milliseconds, and we're tripling this error by witing on 3 timeouts.
969 milliseconds tolerance{6};
971 ASSERT_EQ(timeouts.size(), t.timestamps.size());
973 for (size_t n = 0; n < timeouts.size(); ++n) {
974 total += timeouts[n];
975 T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
977 T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
981 * Test rescheduling a timeout before it has fired
983 TEST(EventBaseTest, RescheduleTimeout) {
990 t1.scheduleTimeout(15);
991 t2.scheduleTimeout(30);
992 t3.scheduleTimeout(30);
994 auto f = static_cast<bool(AsyncTimeout::*)(uint32_t)>(
995 &AsyncTimeout::scheduleTimeout);
997 // after 10ms, reschedule t2 to run sooner than originally scheduled
998 eb.runAfterDelay(std::bind(f, &t2, 10), 10);
999 // after 10ms, reschedule t3 to run later than originally scheduled
1000 eb.runAfterDelay(std::bind(f, &t3, 40), 10);
1006 T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
1007 T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
1008 T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
1009 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1013 * Test cancelling a timeout
1015 TEST(EventBaseTest, CancelTimeout) {
1018 vector<uint32_t> timeouts;
1019 timeouts.push_back(10);
1020 timeouts.push_back(30);
1021 timeouts.push_back(25);
1023 ReschedulingTimeout t(&eb, timeouts);
1025 eb.runAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
1031 ASSERT_EQ(t.timestamps.size(), 2);
1032 T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
1033 T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
1034 T_CHECK_TIMEOUT(start, end, milliseconds(50));
1038 * Test destroying a scheduled timeout object
1040 TEST(EventBaseTest, DestroyTimeout) {
1041 class DestroyTimeout : public AsyncTimeout {
1043 DestroyTimeout(EventBase* eb, AsyncTimeout* t)
1047 virtual void timeoutExpired() noexcept {
1052 AsyncTimeout* timeout_;
1057 TestTimeout* t1 = new TestTimeout(&eb);
1058 t1->scheduleTimeout(30);
1060 DestroyTimeout dt(&eb, t1);
1061 dt.scheduleTimeout(10);
1067 T_CHECK_TIMEOUT(start, end, milliseconds(10));
1071 ///////////////////////////////////////////////////////////////////////////
1072 // Test for runInThreadTestFunc()
1073 ///////////////////////////////////////////////////////////////////////////
1075 struct RunInThreadData {
1076 RunInThreadData(int numThreads, int opsPerThread)
1077 : opsPerThread(opsPerThread)
1078 , opsToGo(numThreads*opsPerThread) {}
1081 deque< pair<int, int> > values;
1087 struct RunInThreadArg {
1088 RunInThreadArg(RunInThreadData* data,
1095 RunInThreadData* data;
1100 void runInThreadTestFunc(RunInThreadArg* arg) {
1101 arg->data->values.push_back(make_pair(arg->thread, arg->value));
1102 RunInThreadData* data = arg->data;
1105 if(--data->opsToGo == 0) {
1106 // Break out of the event base loop if we are the last thread running
1107 data->evb.terminateLoopSoon();
1111 TEST(EventBaseTest, RunInThread) {
1112 uint32_t numThreads = 50;
1113 uint32_t opsPerThread = 100;
1114 RunInThreadData data(numThreads, opsPerThread);
1116 deque<std::thread> threads;
1117 for (uint32_t i = 0; i < numThreads; ++i) {
1118 threads.emplace_back([i, &data] {
1119 for (int n = 0; n < data.opsPerThread; ++n) {
1120 RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
1121 data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
1127 // Add a timeout event to run after 3 seconds.
1128 // Otherwise loop() will return immediately since there are no events to run.
1129 // Once the last thread exits, it will stop the loop(). However, this
1130 // timeout also stops the loop in case there is a bug performing the normal
1132 data.evb.runAfterDelay(std::bind(&EventBase::terminateLoopSoon, &data.evb),
1139 // Verify that the loop exited because all threads finished and requested it
1140 // to stop. This should happen much sooner than the 3 second timeout.
1141 // Assert that it happens in under a second. (This is still tons of extra
1144 auto timeTaken = std::chrono::duration_cast<milliseconds>(
1145 end.getTime() - start.getTime());
1146 ASSERT_LT(timeTaken.count(), 1000);
1147 VLOG(11) << "Time taken: " << timeTaken.count();
1149 // Verify that we have all of the events from every thread
1150 int expectedValues[numThreads];
1151 for (uint32_t n = 0; n < numThreads; ++n) {
1152 expectedValues[n] = 0;
1154 for (deque< pair<int, int> >::const_iterator it = data.values.begin();
1155 it != data.values.end();
1157 int threadID = it->first;
1158 int value = it->second;
1159 ASSERT_EQ(expectedValues[threadID], value);
1160 ++expectedValues[threadID];
1162 for (uint32_t n = 0; n < numThreads; ++n) {
1163 ASSERT_EQ(expectedValues[n], opsPerThread);
1166 // Wait on all of the threads.
1167 for (auto& thread: threads) {
1172 ///////////////////////////////////////////////////////////////////////////
1173 // Tests for runInLoop()
1174 ///////////////////////////////////////////////////////////////////////////
1176 class CountedLoopCallback : public EventBase::LoopCallback {
1178 CountedLoopCallback(EventBase* eventBase,
1180 std::function<void()> action =
1181 std::function<void()>())
1182 : eventBase_(eventBase)
1184 , action_(action) {}
1186 virtual void runLoopCallback() noexcept {
1189 eventBase_->runInLoop(this);
1190 } else if (action_) {
1195 unsigned int getCount() const {
1200 EventBase* eventBase_;
1201 unsigned int count_;
1202 std::function<void()> action_;
1205 // Test that EventBase::loop() doesn't exit while there are
1206 // still LoopCallbacks remaining to be invoked.
1207 TEST(EventBaseTest, RepeatedRunInLoop) {
1208 EventBase eventBase;
1210 CountedLoopCallback c(&eventBase, 10);
1211 eventBase.runInLoop(&c);
1212 // The callback shouldn't have run immediately
1213 ASSERT_EQ(c.getCount(), 10);
1216 // loop() should loop until the CountedLoopCallback stops
1217 // re-installing itself.
1218 ASSERT_EQ(c.getCount(), 0);
1221 // Test runInLoop() calls with terminateLoopSoon()
1222 TEST(EventBaseTest, RunInLoopStopLoop) {
1223 EventBase eventBase;
1225 CountedLoopCallback c1(&eventBase, 20);
1226 CountedLoopCallback c2(&eventBase, 10,
1227 std::bind(&EventBase::terminateLoopSoon, &eventBase));
1229 eventBase.runInLoop(&c1);
1230 eventBase.runInLoop(&c2);
1231 ASSERT_EQ(c1.getCount(), 20);
1232 ASSERT_EQ(c2.getCount(), 10);
1234 eventBase.loopForever();
1236 // c2 should have stopped the loop after 10 iterations
1237 ASSERT_EQ(c2.getCount(), 0);
1239 // We allow the EventBase to run the loop callbacks in whatever order it
1240 // chooses. We'll accept c1's count being either 10 (if the loop terminated
1241 // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
1244 // (With the current code, c1 will always run 10 times, but we don't consider
1245 // this a hard API requirement.)
1246 ASSERT_GE(c1.getCount(), 10);
1247 ASSERT_LE(c1.getCount(), 11);
1250 // Test cancelling runInLoop() callbacks
1251 TEST(EventBaseTest, CancelRunInLoop) {
1252 EventBase eventBase;
1254 CountedLoopCallback c1(&eventBase, 20);
1255 CountedLoopCallback c2(&eventBase, 20);
1256 CountedLoopCallback c3(&eventBase, 20);
1258 std::function<void()> cancelC1Action =
1259 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
1260 std::function<void()> cancelC2Action =
1261 std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
1263 CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
1264 CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
1266 // Install cancelC1 after c1
1267 eventBase.runInLoop(&c1);
1268 eventBase.runInLoop(&cancelC1);
1270 // Install cancelC2 before c2
1271 eventBase.runInLoop(&cancelC2);
1272 eventBase.runInLoop(&c2);
1275 eventBase.runInLoop(&c3);
1277 ASSERT_EQ(c1.getCount(), 20);
1278 ASSERT_EQ(c2.getCount(), 20);
1279 ASSERT_EQ(c3.getCount(), 20);
1280 ASSERT_EQ(cancelC1.getCount(), 10);
1281 ASSERT_EQ(cancelC2.getCount(), 10);
1286 // cancelC1 and cancelC3 should have both fired after 10 iterations and
1287 // stopped re-installing themselves
1288 ASSERT_EQ(cancelC1.getCount(), 0);
1289 ASSERT_EQ(cancelC2.getCount(), 0);
1290 // c3 should have continued on for the full 20 iterations
1291 ASSERT_EQ(c3.getCount(), 0);
1293 // c1 and c2 should have both been cancelled on the 10th iteration.
1295 // Callbacks are always run in the order they are installed,
1296 // so c1 should have fired 10 times, and been canceled after it ran on the
1297 // 10th iteration. c2 should have only fired 9 times, because cancelC2 will
1298 // have run before it on the 10th iteration, and cancelled it before it
1300 ASSERT_EQ(c1.getCount(), 10);
1301 ASSERT_EQ(c2.getCount(), 11);
1304 class TerminateTestCallback : public EventBase::LoopCallback,
1305 public EventHandler {
1307 TerminateTestCallback(EventBase* eventBase, int fd)
1308 : EventHandler(eventBase, fd),
1309 eventBase_(eventBase),
1310 loopInvocations_(0),
1311 maxLoopInvocations_(0),
1312 eventInvocations_(0),
1313 maxEventInvocations_(0) {}
1315 void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
1316 loopInvocations_ = 0;
1317 maxLoopInvocations_ = maxLoopInvocations;
1318 eventInvocations_ = 0;
1319 maxEventInvocations_ = maxEventInvocations;
1321 cancelLoopCallback();
1322 unregisterHandler();
1325 virtual void handlerReady(uint16_t events) noexcept {
1326 // We didn't register with PERSIST, so we will have been automatically
1327 // unregistered already.
1328 ASSERT_FALSE(isHandlerRegistered());
1330 ++eventInvocations_;
1331 if (eventInvocations_ >= maxEventInvocations_) {
1335 eventBase_->runInLoop(this);
1337 virtual void runLoopCallback() noexcept {
1339 if (loopInvocations_ >= maxLoopInvocations_) {
1343 registerHandler(READ);
1346 uint32_t getLoopInvocations() const {
1347 return loopInvocations_;
1349 uint32_t getEventInvocations() const {
1350 return eventInvocations_;
1354 EventBase* eventBase_;
1355 uint32_t loopInvocations_;
1356 uint32_t maxLoopInvocations_;
1357 uint32_t eventInvocations_;
1358 uint32_t maxEventInvocations_;
1362 * Test that EventBase::loop() correctly detects when there are no more events
1365 * This uses a single callback, which alternates registering itself as a loop
1366 * callback versus a EventHandler callback. This exercises a regression where
1367 * EventBase::loop() incorrectly exited if there were no more fd handlers
1368 * registered, but a loop callback installed a new fd handler.
1370 TEST(EventBaseTest, LoopTermination) {
1371 EventBase eventBase;
1373 // Open a pipe and close the write end,
1374 // so the read endpoint will be readable
1376 int rc = pipe(pipeFds);
1379 TerminateTestCallback callback(&eventBase, pipeFds[0]);
1381 // Test once where the callback will exit after a loop callback
1382 callback.reset(10, 100);
1383 eventBase.runInLoop(&callback);
1385 ASSERT_EQ(callback.getLoopInvocations(), 10);
1386 ASSERT_EQ(callback.getEventInvocations(), 9);
1388 // Test once where the callback will exit after an fd event callback
1389 callback.reset(100, 7);
1390 eventBase.runInLoop(&callback);
1392 ASSERT_EQ(callback.getLoopInvocations(), 7);
1393 ASSERT_EQ(callback.getEventInvocations(), 7);
1398 ///////////////////////////////////////////////////////////////////////////
1399 // Tests for latency calculations
1400 ///////////////////////////////////////////////////////////////////////////
1402 class IdleTimeTimeoutSeries : public AsyncTimeout {
1406 explicit IdleTimeTimeoutSeries(EventBase *base,
1407 std::deque<std::uint64_t>& timeout) :
1414 virtual ~IdleTimeTimeoutSeries() {}
1416 void timeoutExpired() noexcept {
1419 if(timeout_.empty()){
1422 uint64_t sleepTime = timeout_.front();
1423 timeout_.pop_front();
1431 int getTimeouts() const {
1437 std::deque<uint64_t>& timeout_;
1441 * Verify that idle time is correctly accounted for when decaying our loop
1444 * This works by creating a high loop time (via usleep), expecting a latency
1445 * callback with known value, and then scheduling a timeout for later. This
1446 * later timeout is far enough in the future that the idle time should have
1447 * caused the loop time to decay.
1449 TEST(EventBaseTest, IdleTime) {
1450 EventBase eventBase;
1451 eventBase.setLoadAvgMsec(1000);
1452 eventBase.resetLoadAvg(5900.0);
1453 std::deque<uint64_t> timeouts0(4, 8080);
1454 timeouts0.push_front(8000);
1455 timeouts0.push_back(14000);
1456 IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
1457 std::deque<uint64_t> timeouts(20, 20);
1458 std::unique_ptr<IdleTimeTimeoutSeries> tos;
1460 int latencyCallbacks = 0;
1461 eventBase.setMaxLatency(6000, [&]() {
1464 switch (latencyCallbacks) {
1466 ASSERT_EQ(6, tos0.getTimeouts());
1467 ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
1468 ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
1469 tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
1473 FAIL() << "Unexpected latency callback";
1478 // Kick things off with an "immedite" timeout
1479 tos0.scheduleTimeout(1);
1483 ASSERT_EQ(1, latencyCallbacks);
1484 ASSERT_EQ(7, tos0.getTimeouts());
1485 ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
1486 ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
1487 ASSERT_EQ(21, tos->getTimeouts());
1491 * Test that thisLoop functionality works with terminateLoopSoon
1493 TEST(EventBaseTest, ThisLoop) {
1495 bool runInLoop = false;
1496 bool runThisLoop = false;
1499 eb.terminateLoopSoon();
1500 eb.runInLoop([&]() {
1503 eb.runInLoop([&]() {
1510 ASSERT_FALSE(runInLoop);
1511 // Should work with thisLoop
1512 ASSERT_TRUE(runThisLoop);
1515 TEST(EventBaseTest, EventBaseThreadLoop) {
1519 base.runInEventBaseThread([&](){
1524 ASSERT_EQ(true, ran);
1527 TEST(EventBaseTest, EventBaseThreadName) {
1529 base.setName("foo");
1532 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
1534 pthread_getname_np(pthread_self(), name, 16);
1535 ASSERT_EQ(0, strcmp("foo", name));
1539 TEST(EventBaseTest, RunBeforeLoop) {
1541 CountedLoopCallback cb(&base, 1, [&](){
1542 base.terminateLoopSoon();
1544 base.runBeforeLoop(&cb);
1546 ASSERT_EQ(cb.getCount(), 0);
1549 TEST(EventBaseTest, RunBeforeLoopWait) {
1551 CountedLoopCallback cb(&base, 1);
1552 base.runAfterDelay([&](){
1553 base.terminateLoopSoon();
1555 base.runBeforeLoop(&cb);
1558 // Check that we only ran once, and did not loop multiple times.
1559 ASSERT_EQ(cb.getCount(), 0);
1562 class PipeHandler : public EventHandler {
1564 PipeHandler(EventBase* eventBase, int fd)
1565 : EventHandler(eventBase, fd) {}
1567 void handlerReady(uint16_t events) noexcept {
1572 TEST(EventBaseTest, StopBeforeLoop) {
1575 // Give the evb something to do.
1577 ASSERT_EQ(0, pipe(p));
1578 PipeHandler handler(&evb, p[0]);
1579 handler.registerHandler(EventHandler::READ);
1581 // It's definitely not running yet
1582 evb.terminateLoopSoon();
1584 // let it run, it should exit quickly.
1585 std::thread t([&] { evb.loop(); });
1588 handler.unregisterHandler();
1595 TEST(EventBaseTest, RunCallbacksOnDestruction) {
1600 base.runInEventBaseThread([&](){