2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/async/NotificationQueue.h>
18 #include <folly/io/async/ScopedEventBaseThread.h>
20 #include <folly/Baton.h>
24 #include <sys/types.h>
27 #include <gtest/gtest.h>
30 using namespace folly;
32 typedef NotificationQueue<int> IntQueue;
34 class QueueConsumer : public IntQueue::Consumer {
38 void messageAvailable(int&& value) override {
39 messages.push_back(value);
45 std::function<void(int)> fn;
46 std::deque<int> messages;
51 explicit QueueTest(uint32_t maxSize = 0,
52 IntQueue::FdType type = IntQueue::FdType::EVENTFD) :
54 terminationQueue(maxSize, type)
62 void destroyCallback();
66 IntQueue terminationQueue;
70 void QueueTest::sendOne() {
71 // Create a notification queue and a callback in this thread
74 QueueConsumer consumer;
75 consumer.fn = [&](int) {
76 // Stop consuming after we receive 1 message
77 consumer.stopConsuming();
79 consumer.startConsuming(&eventBase, &queue);
81 // Start a new EventBase thread to put a message on our queue
82 ScopedEventBaseThread t1;
83 t1.getEventBase()->runInEventBaseThread([&] {
87 // Loop until we receive the message
90 const auto& messages = consumer.messages;
91 EXPECT_EQ(1, messages.size());
92 EXPECT_EQ(5, messages.at(0));
95 void QueueTest::putMessages() {
98 QueueConsumer consumer;
99 QueueConsumer consumer2;
100 consumer.fn = [&](int msg) {
101 // Stop consuming after we receive a message with value 0, and start
104 consumer.stopConsuming();
105 consumer2.startConsuming(&eventBase, &queue);
108 consumer2.fn = [&](int msg) {
109 // Stop consuming after we receive a message with value 0
111 consumer2.stopConsuming();
114 consumer.startConsuming(&eventBase, &queue);
116 list<int> msgList = { 1, 2, 3, 4 };
117 vector<int> msgVector = { 5, 0, 9, 8, 7, 6, 7, 7,
118 8, 8, 2, 9, 6, 6, 10, 2, 0 };
119 // Call putMessages() several times to add messages to the queue
120 queue.putMessages(msgList.begin(), msgList.end());
121 queue.putMessages(msgVector.begin() + 2, msgVector.begin() + 4);
122 // Test sending 17 messages, the pipe-based queue calls write in 16 byte
124 queue.putMessages(msgVector.begin(), msgVector.end());
126 // Loop until the consumer has stopped
129 vector<int> expectedMessages = { 1, 2, 3, 4, 9, 8, 7, 5, 0 };
130 vector<int> expectedMessages2 = { 9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 10, 2, 0 };
131 EXPECT_EQ(expectedMessages.size(), consumer.messages.size());
132 for (unsigned int idx = 0; idx < expectedMessages.size(); ++idx) {
133 EXPECT_EQ(expectedMessages[idx], consumer.messages.at(idx));
135 EXPECT_EQ(expectedMessages2.size(), consumer2.messages.size());
136 for (unsigned int idx = 0; idx < expectedMessages2.size(); ++idx) {
137 EXPECT_EQ(expectedMessages2[idx], consumer2.messages.at(idx));
141 void QueueTest::multiConsumer() {
142 uint32_t numConsumers = 8;
143 uint32_t numMessages = 10000;
145 // Create several consumers each running in their own EventBase thread
146 vector<QueueConsumer> consumers(numConsumers);
147 vector<ScopedEventBaseThread> threads(numConsumers);
149 for (uint32_t consumerIdx = 0; consumerIdx < numConsumers; ++consumerIdx) {
150 QueueConsumer* consumer = &consumers[consumerIdx];
152 consumer->fn = [consumer, consumerIdx, this](int value) {
153 // Treat 0 as a signal to stop.
155 consumer->stopConsuming();
156 // Put a message on the terminationQueue to indicate we have stopped
157 terminationQueue.putMessage(consumerIdx);
161 EventBase* eventBase = threads[consumerIdx].getEventBase();
162 eventBase->runInEventBaseThread([eventBase, consumer, this] {
163 consumer->startConsuming(eventBase, &queue);
167 // Now add a number of messages from this thread
168 // Start at 1 rather than 0, since 0 is the signal to stop.
169 for (uint32_t n = 1; n < numMessages; ++n) {
172 // Now add a 0 for each consumer, to signal them to stop
173 for (uint32_t n = 0; n < numConsumers; ++n) {
177 // Wait until we get notified that all of the consumers have stopped
178 // We use a separate notification queue for this.
179 QueueConsumer terminationConsumer;
180 vector<uint32_t> consumersStopped(numConsumers, 0);
181 uint32_t consumersRemaining = numConsumers;
182 terminationConsumer.fn = [&](int consumerIdx) {
183 --consumersRemaining;
184 if (consumersRemaining == 0) {
185 terminationConsumer.stopConsuming();
188 EXPECT_GE(consumerIdx, 0);
189 EXPECT_LT(consumerIdx, numConsumers);
190 ++consumersStopped[consumerIdx];
193 terminationConsumer.startConsuming(&eventBase, &terminationQueue);
196 // Verify that we saw exactly 1 stop message for each consumer
197 for (uint32_t n = 0; n < numConsumers; ++n) {
198 EXPECT_EQ(1, consumersStopped[n]);
201 // Validate that every message sent to the main queue was received exactly
203 vector<int> messageCount(numMessages, 0);
204 for (uint32_t n = 0; n < numConsumers; ++n) {
205 for (int msg : consumers[n].messages) {
207 EXPECT_LT(msg, numMessages);
212 // 0 is the signal to stop, and should have been received once by each
214 EXPECT_EQ(numConsumers, messageCount[0]);
215 // All other messages should have been received exactly once
216 for (uint32_t n = 1; n < numMessages; ++n) {
217 EXPECT_EQ(1, messageCount[n]);
221 void QueueTest::maxQueueSize() {
222 // Create a queue with a maximum size of 5, and fill it up
224 for (int n = 0; n < 5; ++n) {
225 queue.tryPutMessage(n);
228 // Calling tryPutMessage() now should fail
229 EXPECT_THROW(queue.tryPutMessage(5), std::overflow_error);
231 EXPECT_FALSE(queue.tryPutMessageNoThrow(5));
233 EXPECT_FALSE(queue.tryPutMessageNoThrow(std::move(val)));
235 // Pop a message from the queue
237 EXPECT_TRUE(queue.tryConsume(result));
238 EXPECT_EQ(0, result);
240 // We should be able to write another message now that we popped one off.
241 queue.tryPutMessage(5);
242 // But now we are full again.
243 EXPECT_THROW(queue.tryPutMessage(6), std::overflow_error);
244 // putMessage() should let us exceed the maximum
247 // Pull another mesage off
248 EXPECT_TRUE(queue.tryConsume(result));
249 EXPECT_EQ(1, result);
251 // tryPutMessage() should still fail since putMessage() actually put us over
253 EXPECT_THROW(queue.tryPutMessage(7), std::overflow_error);
255 // Pull another message off and try again
256 EXPECT_TRUE(queue.tryConsume(result));
257 EXPECT_EQ(2, result);
258 queue.tryPutMessage(7);
260 // Now pull all the remaining messages off
261 EXPECT_TRUE(queue.tryConsume(result));
262 EXPECT_EQ(3, result);
263 EXPECT_TRUE(queue.tryConsume(result));
264 EXPECT_EQ(4, result);
265 EXPECT_TRUE(queue.tryConsume(result));
266 EXPECT_EQ(5, result);
267 EXPECT_TRUE(queue.tryConsume(result));
268 EXPECT_EQ(6, result);
269 EXPECT_TRUE(queue.tryConsume(result));
270 EXPECT_EQ(7, result);
272 // There should be no messages left
274 EXPECT_TRUE(!queue.tryConsume(result));
275 EXPECT_EQ(-1, result);
279 void QueueTest::maxReadAtOnce() {
280 // Add 100 messages to the queue
281 for (int n = 0; n < 100; ++n) {
287 // Record how many messages were processed each loop iteration.
288 uint32_t messagesThisLoop = 0;
289 std::vector<uint32_t> messagesPerLoop;
290 std::function<void()> loopFinished = [&] {
291 // Record the current number of messages read this loop
292 messagesPerLoop.push_back(messagesThisLoop);
293 // Reset messagesThisLoop to 0 for the next loop
294 messagesThisLoop = 0;
296 // To prevent use-after-free bugs when eventBase destructs,
297 // prevent calling runInLoop any more after the test is finished.
298 // 55 == number of times loop should run.
299 if (messagesPerLoop.size() != 55) {
300 // Reschedule ourself to run at the end of the next loop
301 eventBase.runInLoop(loopFinished);
304 // Schedule the first call to loopFinished
305 eventBase.runInLoop(loopFinished);
307 QueueConsumer consumer;
308 // Read the first 50 messages 10 at a time.
309 consumer.setMaxReadAtOnce(10);
310 consumer.fn = [&](int value) {
312 // After 50 messages, drop to reading only 1 message at a time.
314 consumer.setMaxReadAtOnce(1);
316 // Terminate the loop when we reach the end of the messages.
318 eventBase.terminateLoopSoon();
321 consumer.startConsuming(&eventBase, &queue);
323 // Run the event loop until the consumer terminates it
326 // The consumer should have read all 100 messages in order
327 EXPECT_EQ(100, consumer.messages.size());
328 for (int n = 0; n < 100; ++n) {
329 EXPECT_EQ(n, consumer.messages.at(n));
332 // Currently EventBase happens to still run the loop callbacks even after
333 // terminateLoopSoon() is called. However, we don't really want to depend on
334 // this behavior. In case this ever changes in the future, add
335 // messagesThisLoop to messagesPerLoop in loop callback isn't invoked for the
336 // last loop iteration.
337 if (messagesThisLoop > 0) {
338 messagesPerLoop.push_back(messagesThisLoop);
339 messagesThisLoop = 0;
342 // For the first 5 loops it should have read 10 messages each time.
343 // After that it should have read 1 messages per loop for the next 50 loops.
344 EXPECT_EQ(55, messagesPerLoop.size());
345 for (int n = 0; n < 5; ++n) {
346 EXPECT_EQ(10, messagesPerLoop.at(n));
348 for (int n = 5; n < 55; ++n) {
349 EXPECT_EQ(1, messagesPerLoop.at(n));
354 void QueueTest::destroyCallback() {
355 // Rather than using QueueConsumer, define a separate class for the destroy
356 // test. The DestroyTestConsumer will delete itself inside the
357 // messageAvailable() callback. With a regular QueueConsumer this would
358 // destroy the std::function object while the function is running, which we
359 // should probably avoid doing. This uses a pointer to a std::function to
360 // avoid destroying the function object.
361 class DestroyTestConsumer : public IntQueue::Consumer {
363 DestroyTestConsumer() {}
365 void messageAvailable(int&& value) override {
371 std::function<void(int)> *fn;
375 // Create a queue and add 2 messages to it
379 // Create two QueueConsumers allocated on the heap.
380 // Have whichever one gets called first destroy both of the QueueConsumers.
381 // This way one consumer will be destroyed from inside its messageAvailable()
382 // callback, and one consume will be destroyed when it isn't inside
383 // messageAvailable().
384 std::unique_ptr<DestroyTestConsumer> consumer1(new DestroyTestConsumer);
385 std::unique_ptr<DestroyTestConsumer> consumer2(new DestroyTestConsumer);
386 std::function<void(int)> fn = [&](int) {
393 consumer1->startConsuming(&eventBase, &queue);
394 consumer2->startConsuming(&eventBase, &queue);
396 // Run the event loop.
399 // One of the consumers should have fired, received the message,
400 // then destroyed both consumers.
401 EXPECT_TRUE(!consumer1);
402 EXPECT_TRUE(!consumer2);
403 // One message should be left in the queue
405 EXPECT_TRUE(queue.tryConsume(result));
406 EXPECT_EQ(2, result);
409 TEST(NotificationQueueTest, ConsumeUntilDrained) {
410 // Basic tests: make sure we
411 // - drain all the messages
412 // - ignore any maxReadAtOnce
413 // - can't add messages during draining
416 QueueConsumer consumer;
417 consumer.fn = [&](int i) {
418 EXPECT_THROW(queue.tryPutMessage(i), std::runtime_error);
419 EXPECT_FALSE(queue.tryPutMessageNoThrow(i));
420 EXPECT_THROW(queue.putMessage(i), std::runtime_error);
421 std::vector<int> ints{1, 2, 3};
423 queue.putMessages(ints.begin(), ints.end()),
426 consumer.setMaxReadAtOnce(10); // We should ignore this
427 consumer.startConsuming(&eventBase, &queue);
428 for (int i = 0; i < 20; i++) {
431 EXPECT_TRUE(consumer.consumeUntilDrained());
432 EXPECT_EQ(20, consumer.messages.size());
434 // Make sure there can only be one drainer at once
435 folly::Baton<> callbackBaton, threadStartBaton;
436 consumer.fn = [&](int i) {
437 callbackBaton.wait();
439 QueueConsumer competingConsumer;
440 competingConsumer.startConsuming(&eventBase, &queue);
442 atomic<bool> raceA {false};
443 atomic<bool> raceB {false};
446 auto thread = std::thread([&]{
447 threadStartBaton.post();
448 raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
450 threadStartBaton.wait();
451 raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
452 callbackBaton.post();
454 EXPECT_FALSE(raceA && raceB);
455 EXPECT_TRUE(raceA || raceB);
456 EXPECT_TRUE(raceA ^ raceB);
459 TEST(NotificationQueueTest, ConsumeUntilDrainedStress) {
460 for (size_t i = 0; i < 1 << 8; ++i) {
461 // Basic tests: make sure we
462 // - drain all the messages
463 // - ignore any maxReadAtOnce
464 // - can't add messages during draining
467 QueueConsumer consumer;
468 consumer.fn = [&](int i) {
469 EXPECT_THROW(queue.tryPutMessage(i), std::runtime_error);
470 EXPECT_FALSE(queue.tryPutMessageNoThrow(i));
471 EXPECT_THROW(queue.putMessage(i), std::runtime_error);
472 std::vector<int> ints{1, 2, 3};
474 queue.putMessages(ints.begin(), ints.end()),
477 consumer.setMaxReadAtOnce(10); // We should ignore this
478 consumer.startConsuming(&eventBase, &queue);
479 for (int i = 0; i < 20; i++) {
482 EXPECT_TRUE(consumer.consumeUntilDrained());
483 EXPECT_EQ(20, consumer.messages.size());
485 // Make sure there can only be one drainer at once
486 folly::Baton<> callbackBaton, threadStartBaton;
487 consumer.fn = [&](int i) {
488 callbackBaton.wait();
490 QueueConsumer competingConsumer;
491 competingConsumer.startConsuming(&eventBase, &queue);
493 atomic<bool> raceA {false};
494 atomic<bool> raceB {false};
497 auto thread = std::thread([&]{
498 threadStartBaton.post();
499 raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
501 threadStartBaton.wait();
502 raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
503 callbackBaton.post();
505 EXPECT_FALSE(raceA && raceB);
506 EXPECT_TRUE(raceA || raceB);
507 EXPECT_TRUE(raceA ^ raceB);
511 TEST(NotificationQueueTest, SendOne) {
516 TEST(NotificationQueueTest, PutMessages) {
521 TEST(NotificationQueueTest, MultiConsumer) {
526 TEST(NotificationQueueTest, MaxQueueSize) {
531 TEST(NotificationQueueTest, MaxReadAtOnce) {
536 TEST(NotificationQueueTest, DestroyCallback) {
538 qt.destroyCallback();
541 TEST(NotificationQueueTest, SendOnePipe) {
542 QueueTest qt(0, IntQueue::FdType::PIPE);
546 TEST(NotificationQueueTest, PutMessagesPipe) {
547 QueueTest qt(0, IntQueue::FdType::PIPE);
551 TEST(NotificationQueueTest, MultiConsumerPipe) {
552 QueueTest qt(0, IntQueue::FdType::PIPE);
556 TEST(NotificationQueueTest, MaxQueueSizePipe) {
557 QueueTest qt(5, IntQueue::FdType::PIPE);
561 TEST(NotificationQueueTest, MaxReadAtOncePipe) {
562 QueueTest qt(0, IntQueue::FdType::PIPE);
566 TEST(NotificationQueueTest, DestroyCallbackPipe) {
567 QueueTest qt(0, IntQueue::FdType::PIPE);
568 qt.destroyCallback();
572 * Test code that creates a TNotificationQueue, then forks, and incorrectly
573 * tries to send a message to the queue from the child process.
575 * The child process should crash in this scenario, since the child code has a
576 * bug. (Older versions of TNotificationQueue didn't catch this in the child,
577 * resulting in a crash in the parent process.)
579 TEST(NotificationQueueTest, UseAfterFork) {
582 QueueConsumer consumer;
584 // Boost sets a custom SIGCHLD handler, which fails the test if a child
585 // process exits abnormally. We don't want this.
586 signal(SIGCHLD, SIG_DFL);
588 // Log some info so users reading the test output aren't confused
589 // by the child process' crash log messages.
590 LOG(INFO) << "This test makes sure the child process crashes. "
591 << "Error log messagges and a backtrace are expected.";
594 // Start a separate thread consuming from the queue
595 ScopedEventBaseThread t1;
596 t1.getEventBase()->runInEventBaseThread([&] {
597 consumer.startConsuming(t1.getEventBase(), &queue);
600 // Send a message to it, just for sanity checking
601 queue.putMessage(1234);
606 // The boost test framework installs signal handlers to catch errors.
607 // We only want to catch in the parent. In the child let SIGABRT crash
609 signal(SIGABRT, SIG_DFL);
612 // We're horrible people, so we try to send a message to the queue
613 // that is being consumed in the parent process.
615 // The putMessage() call should catch this error, and crash our process.
616 queue.putMessage(9876);
617 // We shouldn't reach here.
621 // Parent. Wait for the child to exit.
622 auto waited = waitpid(pid, &childStatus, 0);
623 EXPECT_EQ(pid, waited);
625 // Send another message to the queue before we terminate the thread.
626 queue.putMessage(5678);
629 // The child process should have crashed when it tried to call putMessage()
630 // on our TNotificationQueue.
631 EXPECT_TRUE(WIFSIGNALED(childStatus));
632 EXPECT_EQ(SIGABRT, WTERMSIG(childStatus));
634 // Make sure the parent saw the expected messages.
635 // It should have gotten 1234 and 5678 from the parent process, but not
636 // 9876 from the child.
637 EXPECT_EQ(2, consumer.messages.size());
638 EXPECT_EQ(1234, consumer.messages.front());
639 consumer.messages.pop_front();
640 EXPECT_EQ(5678, consumer.messages.front());
641 consumer.messages.pop_front();