Update providedCiphersStr_ in one place.
[folly.git] / folly / io / async / test / NotificationQueueTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/NotificationQueue.h>
18 #include <folly/io/async/ScopedEventBaseThread.h>
19
20 #include <folly/Baton.h>
21 #include <folly/portability/GTest.h>
22
23 #include <list>
24 #include <iostream>
25 #include <thread>
26 #include <sys/types.h>
27
28 #ifndef _WIN32
29 #include <sys/wait.h>
30 #endif
31
32 using namespace std;
33 using namespace folly;
34
35 typedef NotificationQueue<int> IntQueue;
36
37 class QueueConsumer : public IntQueue::Consumer {
38  public:
39   QueueConsumer() {}
40
41   void messageAvailable(int&& value) noexcept override {
42     messages.push_back(value);
43     if (fn) {
44       fn(value);
45     }
46   }
47
48   std::function<void(int)> fn;
49   std::deque<int> messages;
50 };
51
52 class QueueTest {
53  public:
54   explicit QueueTest(uint32_t maxSize, IntQueue::FdType type)
55       : queue(maxSize, type), terminationQueue(maxSize, type) {}
56
57   void sendOne();
58   void putMessages();
59   void multiConsumer();
60   void maxQueueSize();
61   void maxReadAtOnce();
62   void destroyCallback();
63   void useAfterFork();
64
65   IntQueue queue;
66   IntQueue terminationQueue;
67
68 };
69
70 void QueueTest::sendOne() {
71   // Create a notification queue and a callback in this thread
72   EventBase eventBase;
73
74   QueueConsumer consumer;
75   consumer.fn = [&](int) {
76     // Stop consuming after we receive 1 message
77     consumer.stopConsuming();
78   };
79   consumer.startConsuming(&eventBase, &queue);
80
81   // Start a new EventBase thread to put a message on our queue
82   ScopedEventBaseThread t1;
83   t1.getEventBase()->runInEventBaseThread([&] {
84     this->queue.putMessage(5);
85   });
86
87   // Loop until we receive the message
88   eventBase.loop();
89
90   const auto& messages = consumer.messages;
91   EXPECT_EQ(1, messages.size());
92   EXPECT_EQ(5, messages.at(0));
93 }
94
95 void QueueTest::putMessages() {
96   EventBase eventBase;
97
98   QueueConsumer consumer;
99   QueueConsumer consumer2;
100   consumer.fn = [&](int msg) {
101     // Stop consuming after we receive a message with value 0, and start
102     // consumer2
103     if (msg == 0) {
104       consumer.stopConsuming();
105       consumer2.startConsuming(&eventBase, &queue);
106     }
107   };
108   consumer2.fn = [&](int msg) {
109     // Stop consuming after we receive a message with value 0
110     if (msg == 0) {
111       consumer2.stopConsuming();
112     }
113   };
114   consumer.startConsuming(&eventBase, &queue);
115
116   list<int> msgList = { 1, 2, 3, 4 };
117   vector<int> msgVector = { 5, 0, 9, 8, 7, 6, 7, 7,
118                             8, 8, 2, 9, 6, 6, 10, 2, 0 };
119   // Call putMessages() several times to add messages to the queue
120   queue.putMessages(msgList.begin(), msgList.end());
121   queue.putMessages(msgVector.begin() + 2, msgVector.begin() + 4);
122   // Test sending 17 messages, the pipe-based queue calls write in 16 byte
123   // chunks
124   queue.putMessages(msgVector.begin(), msgVector.end());
125
126   // Loop until the consumer has stopped
127   eventBase.loop();
128
129   vector<int> expectedMessages = { 1, 2, 3, 4, 9, 8, 7, 5, 0 };
130   vector<int> expectedMessages2 = { 9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 10, 2, 0 };
131   EXPECT_EQ(expectedMessages.size(), consumer.messages.size());
132   for (unsigned int idx = 0; idx < expectedMessages.size(); ++idx) {
133     EXPECT_EQ(expectedMessages[idx], consumer.messages.at(idx));
134   }
135   EXPECT_EQ(expectedMessages2.size(), consumer2.messages.size());
136   for (unsigned int idx = 0; idx < expectedMessages2.size(); ++idx) {
137     EXPECT_EQ(expectedMessages2[idx], consumer2.messages.at(idx));
138   }
139 }
140
141 void QueueTest::multiConsumer() {
142   uint32_t numConsumers = 8;
143   uint32_t numMessages = 10000;
144
145   // Create several consumers each running in their own EventBase thread
146   vector<QueueConsumer> consumers(numConsumers);
147   vector<ScopedEventBaseThread> threads(numConsumers);
148
149   for (uint32_t consumerIdx = 0; consumerIdx < numConsumers; ++consumerIdx) {
150     QueueConsumer* consumer = &consumers[consumerIdx];
151
152     consumer->fn = [consumer, consumerIdx, this](int value) {
153       // Treat 0 as a signal to stop.
154       if (value == 0) {
155         consumer->stopConsuming();
156         // Put a message on the terminationQueue to indicate we have stopped
157         terminationQueue.putMessage(consumerIdx);
158       }
159     };
160
161     EventBase* eventBase = threads[consumerIdx].getEventBase();
162     eventBase->runInEventBaseThread([eventBase, consumer, this] {
163       consumer->startConsuming(eventBase, &queue);
164     });
165   }
166
167   // Now add a number of messages from this thread
168   // Start at 1 rather than 0, since 0 is the signal to stop.
169   for (uint32_t n = 1; n < numMessages; ++n) {
170     queue.putMessage(n);
171   }
172   // Now add a 0 for each consumer, to signal them to stop
173   for (uint32_t n = 0; n < numConsumers; ++n) {
174     queue.putMessage(0);
175   }
176
177   // Wait until we get notified that all of the consumers have stopped
178   // We use a separate notification queue for this.
179   QueueConsumer terminationConsumer;
180   vector<uint32_t> consumersStopped(numConsumers, 0);
181   uint32_t consumersRemaining = numConsumers;
182   terminationConsumer.fn = [&](int consumerIdx) {
183     --consumersRemaining;
184     if (consumersRemaining == 0) {
185       terminationConsumer.stopConsuming();
186     }
187
188     EXPECT_GE(consumerIdx, 0);
189     EXPECT_LT(consumerIdx, numConsumers);
190     ++consumersStopped[consumerIdx];
191   };
192   EventBase eventBase;
193   terminationConsumer.startConsuming(&eventBase, &terminationQueue);
194   eventBase.loop();
195
196   // Verify that we saw exactly 1 stop message for each consumer
197   for (uint32_t n = 0; n < numConsumers; ++n) {
198     EXPECT_EQ(1, consumersStopped[n]);
199   }
200
201   // Validate that every message sent to the main queue was received exactly
202   // once.
203   vector<int> messageCount(numMessages, 0);
204   for (uint32_t n = 0; n < numConsumers; ++n) {
205     for (int msg : consumers[n].messages) {
206       EXPECT_GE(msg, 0);
207       EXPECT_LT(msg, numMessages);
208       ++messageCount[msg];
209     }
210   }
211
212   // 0 is the signal to stop, and should have been received once by each
213   // consumer
214   EXPECT_EQ(numConsumers, messageCount[0]);
215   // All other messages should have been received exactly once
216   for (uint32_t n = 1; n < numMessages; ++n) {
217     EXPECT_EQ(1, messageCount[n]);
218   }
219 }
220
221 void QueueTest::maxQueueSize() {
222   // Create a queue with a maximum size of 5, and fill it up
223
224   for (int n = 0; n < 5; ++n) {
225     queue.tryPutMessage(n);
226   }
227
228   // Calling tryPutMessage() now should fail
229   EXPECT_THROW(queue.tryPutMessage(5), std::overflow_error);
230
231   EXPECT_FALSE(queue.tryPutMessageNoThrow(5));
232   int val = 5;
233   EXPECT_FALSE(queue.tryPutMessageNoThrow(std::move(val)));
234
235   // Pop a message from the queue
236   int result = -1;
237   EXPECT_TRUE(queue.tryConsume(result));
238   EXPECT_EQ(0, result);
239
240   // We should be able to write another message now that we popped one off.
241   queue.tryPutMessage(5);
242   // But now we are full again.
243   EXPECT_THROW(queue.tryPutMessage(6), std::overflow_error);
244   // putMessage() should let us exceed the maximum
245   queue.putMessage(6);
246
247   // Pull another mesage off
248   EXPECT_TRUE(queue.tryConsume(result));
249   EXPECT_EQ(1, result);
250
251   // tryPutMessage() should still fail since putMessage() actually put us over
252   // the max.
253   EXPECT_THROW(queue.tryPutMessage(7), std::overflow_error);
254
255   // Pull another message off and try again
256   EXPECT_TRUE(queue.tryConsume(result));
257   EXPECT_EQ(2, result);
258   queue.tryPutMessage(7);
259
260   // Now pull all the remaining messages off
261   EXPECT_TRUE(queue.tryConsume(result));
262   EXPECT_EQ(3, result);
263   EXPECT_TRUE(queue.tryConsume(result));
264   EXPECT_EQ(4, result);
265   EXPECT_TRUE(queue.tryConsume(result));
266   EXPECT_EQ(5, result);
267   EXPECT_TRUE(queue.tryConsume(result));
268   EXPECT_EQ(6, result);
269   EXPECT_TRUE(queue.tryConsume(result));
270   EXPECT_EQ(7, result);
271
272   // There should be no messages left
273   result = -1;
274   EXPECT_TRUE(!queue.tryConsume(result));
275   EXPECT_EQ(-1, result);
276 }
277
278
279 void QueueTest::maxReadAtOnce() {
280   // Add 100 messages to the queue
281   for (int n = 0; n < 100; ++n) {
282     queue.putMessage(n);
283   }
284
285   EventBase eventBase;
286
287   // Record how many messages were processed each loop iteration.
288   uint32_t messagesThisLoop = 0;
289   std::vector<uint32_t> messagesPerLoop;
290   std::function<void()> loopFinished = [&] {
291     // Record the current number of messages read this loop
292     messagesPerLoop.push_back(messagesThisLoop);
293     // Reset messagesThisLoop to 0 for the next loop
294     messagesThisLoop = 0;
295
296     // To prevent use-after-free bugs when eventBase destructs,
297     // prevent calling runInLoop any more after the test is finished.
298     // 55 == number of times loop should run.
299     if (messagesPerLoop.size() != 55) {
300       // Reschedule ourself to run at the end of the next loop
301       eventBase.runInLoop(loopFinished);
302     }
303   };
304   // Schedule the first call to loopFinished
305   eventBase.runInLoop(loopFinished);
306
307   QueueConsumer consumer;
308   // Read the first 50 messages 10 at a time.
309   consumer.setMaxReadAtOnce(10);
310   consumer.fn = [&](int value) {
311     ++messagesThisLoop;
312     // After 50 messages, drop to reading only 1 message at a time.
313     if (value == 50) {
314       consumer.setMaxReadAtOnce(1);
315     }
316     // Terminate the loop when we reach the end of the messages.
317     if (value == 99) {
318       eventBase.terminateLoopSoon();
319     }
320   };
321   consumer.startConsuming(&eventBase, &queue);
322
323   // Run the event loop until the consumer terminates it
324   eventBase.loop();
325
326   // The consumer should have read all 100 messages in order
327   EXPECT_EQ(100, consumer.messages.size());
328   for (int n = 0; n < 100; ++n) {
329     EXPECT_EQ(n, consumer.messages.at(n));
330   }
331
332   // Currently EventBase happens to still run the loop callbacks even after
333   // terminateLoopSoon() is called.  However, we don't really want to depend on
334   // this behavior.  In case this ever changes in the future, add
335   // messagesThisLoop to messagesPerLoop in loop callback isn't invoked for the
336   // last loop iteration.
337   if (messagesThisLoop > 0) {
338     messagesPerLoop.push_back(messagesThisLoop);
339     messagesThisLoop = 0;
340   }
341
342   // For the first 5 loops it should have read 10 messages each time.
343   // After that it should have read 1 messages per loop for the next 50 loops.
344   EXPECT_EQ(55, messagesPerLoop.size());
345   for (int n = 0; n < 5; ++n) {
346     EXPECT_EQ(10, messagesPerLoop.at(n));
347   }
348   for (int n = 5; n < 55; ++n) {
349     EXPECT_EQ(1, messagesPerLoop.at(n));
350   }
351 }
352
353
354 void QueueTest::destroyCallback() {
355   // Rather than using QueueConsumer, define a separate class for the destroy
356   // test.  The DestroyTestConsumer will delete itself inside the
357   // messageAvailable() callback.  With a regular QueueConsumer this would
358   // destroy the std::function object while the function is running, which we
359   // should probably avoid doing.  This uses a pointer to a std::function to
360   // avoid destroying the function object.
361   class DestroyTestConsumer : public IntQueue::Consumer {
362    public:
363     void messageAvailable(int&& value) noexcept override {
364       DestructorGuard g(this);
365       if (fn && *fn) {
366         (*fn)(value);
367       }
368     }
369
370     std::function<void(int)> *fn;
371    protected:
372     ~DestroyTestConsumer() override = default;
373   };
374
375   EventBase eventBase;
376   // Create a queue and add 2 messages to it
377   queue.putMessage(1);
378   queue.putMessage(2);
379
380   // Create two QueueConsumers allocated on the heap.
381   // Have whichever one gets called first destroy both of the QueueConsumers.
382   // This way one consumer will be destroyed from inside its messageAvailable()
383   // callback, and one consume will be destroyed when it isn't inside
384   // messageAvailable().
385   std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
386     consumer1(new DestroyTestConsumer);
387   std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
388     consumer2(new DestroyTestConsumer);
389   std::function<void(int)> fn = [&](int) {
390     consumer1 = nullptr;
391     consumer2 = nullptr;
392   };
393   consumer1->fn = &fn;
394   consumer2->fn = &fn;
395
396   consumer1->startConsuming(&eventBase, &queue);
397   consumer2->startConsuming(&eventBase, &queue);
398
399   // Run the event loop.
400   eventBase.loop();
401
402   // One of the consumers should have fired, received the message,
403   // then destroyed both consumers.
404   EXPECT_TRUE(!consumer1);
405   EXPECT_TRUE(!consumer2);
406   // One message should be left in the queue
407   int result = 1;
408   EXPECT_TRUE(queue.tryConsume(result));
409   EXPECT_EQ(2, result);
410 }
411
412 TEST(NotificationQueueTest, ConsumeUntilDrained) {
413   // Basic tests: make sure we
414   // - drain all the messages
415   // - ignore any maxReadAtOnce
416   // - can't add messages during draining
417   EventBase eventBase;
418   IntQueue queue;
419   QueueConsumer consumer;
420   consumer.fn = [&](int i) {
421     EXPECT_THROW(queue.tryPutMessage(i), std::runtime_error);
422     EXPECT_FALSE(queue.tryPutMessageNoThrow(i));
423     EXPECT_THROW(queue.putMessage(i), std::runtime_error);
424     std::vector<int> ints{1, 2, 3};
425     EXPECT_THROW(
426         queue.putMessages(ints.begin(), ints.end()),
427         std::runtime_error);
428   };
429   consumer.setMaxReadAtOnce(10); // We should ignore this
430   consumer.startConsuming(&eventBase, &queue);
431   for (int i = 0; i < 20; i++) {
432     queue.putMessage(i);
433   }
434   EXPECT_TRUE(consumer.consumeUntilDrained());
435   EXPECT_EQ(20, consumer.messages.size());
436
437   // Make sure there can only be one drainer at once
438   folly::Baton<> callbackBaton, threadStartBaton;
439   consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
440   QueueConsumer competingConsumer;
441   competingConsumer.startConsuming(&eventBase, &queue);
442   queue.putMessage(1);
443   atomic<bool> raceA {false};
444   atomic<bool> raceB {false};
445   size_t numConsA = 0;
446   size_t numConsB = 0;
447   auto thread = std::thread([&]{
448     threadStartBaton.post();
449     raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
450   });
451   threadStartBaton.wait();
452   raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
453   callbackBaton.post();
454   thread.join();
455   EXPECT_FALSE(raceA && raceB);
456   EXPECT_TRUE(raceA || raceB);
457   EXPECT_TRUE(raceA ^ raceB);
458 }
459
460 TEST(NotificationQueueTest, ConsumeUntilDrainedStress) {
461   for (size_t i = 0; i < 1 << 8; ++i) {
462     // Basic tests: make sure we
463     // - drain all the messages
464     // - ignore any maxReadAtOnce
465     // - can't add messages during draining
466     EventBase eventBase;
467     IntQueue queue;
468     QueueConsumer consumer;
469     consumer.fn = [&](int j) {
470       EXPECT_THROW(queue.tryPutMessage(j), std::runtime_error);
471       EXPECT_FALSE(queue.tryPutMessageNoThrow(j));
472       EXPECT_THROW(queue.putMessage(j), std::runtime_error);
473       std::vector<int> ints{1, 2, 3};
474       EXPECT_THROW(
475           queue.putMessages(ints.begin(), ints.end()),
476           std::runtime_error);
477     };
478     consumer.setMaxReadAtOnce(10); // We should ignore this
479     consumer.startConsuming(&eventBase, &queue);
480     for (int j = 0; j < 20; j++) {
481       queue.putMessage(j);
482     }
483     EXPECT_TRUE(consumer.consumeUntilDrained());
484     EXPECT_EQ(20, consumer.messages.size());
485
486     // Make sure there can only be one drainer at once
487     folly::Baton<> callbackBaton, threadStartBaton;
488     consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
489     QueueConsumer competingConsumer;
490     competingConsumer.startConsuming(&eventBase, &queue);
491     queue.putMessage(1);
492     atomic<bool> raceA {false};
493     atomic<bool> raceB {false};
494     size_t numConsA = 0;
495     size_t numConsB = 0;
496     auto thread = std::thread([&]{
497       threadStartBaton.post();
498       raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
499     });
500     threadStartBaton.wait();
501     raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
502     callbackBaton.post();
503     thread.join();
504     EXPECT_FALSE(raceA && raceB);
505     EXPECT_TRUE(raceA || raceB);
506     EXPECT_TRUE(raceA ^ raceB);
507   }
508 }
509
510 #ifdef FOLLY_HAVE_EVENTFD
511 TEST(NotificationQueueTest, SendOneEventFD) {
512   QueueTest qt(0, IntQueue::FdType::EVENTFD);
513   qt.sendOne();
514 }
515
516 TEST(NotificationQueueTest, PutMessagesEventFD) {
517   QueueTest qt(0, IntQueue::FdType::EVENTFD);
518   qt.sendOne();
519 }
520
521 TEST(NotificationQueueTest, MultiConsumerEventFD) {
522   QueueTest qt(0, IntQueue::FdType::EVENTFD);
523   qt.multiConsumer();
524 }
525
526 TEST(NotificationQueueTest, MaxQueueSizeEventFD) {
527   QueueTest qt(5, IntQueue::FdType::EVENTFD);
528   qt.maxQueueSize();
529 }
530
531 TEST(NotificationQueueTest, MaxReadAtOnceEventFD) {
532   QueueTest qt(0, IntQueue::FdType::EVENTFD);
533   qt.maxReadAtOnce();
534 }
535
536 TEST(NotificationQueueTest, DestroyCallbackEventFD) {
537   QueueTest qt(0, IntQueue::FdType::EVENTFD);
538   qt.destroyCallback();
539 }
540 #endif
541
542 TEST(NotificationQueueTest, SendOnePipe) {
543   QueueTest qt(0, IntQueue::FdType::PIPE);
544   qt.sendOne();
545 }
546
547 TEST(NotificationQueueTest, PutMessagesPipe) {
548   QueueTest qt(0, IntQueue::FdType::PIPE);
549   qt.sendOne();
550 }
551
552 TEST(NotificationQueueTest, MultiConsumerPipe) {
553   QueueTest qt(0, IntQueue::FdType::PIPE);
554   qt.multiConsumer();
555 }
556
557 TEST(NotificationQueueTest, MaxQueueSizePipe) {
558   QueueTest qt(5, IntQueue::FdType::PIPE);
559   qt.maxQueueSize();
560 }
561
562 TEST(NotificationQueueTest, MaxReadAtOncePipe) {
563   QueueTest qt(0, IntQueue::FdType::PIPE);
564   qt.maxReadAtOnce();
565 }
566
567 TEST(NotificationQueueTest, DestroyCallbackPipe) {
568   QueueTest qt(0, IntQueue::FdType::PIPE);
569   qt.destroyCallback();
570 }
571
572 #ifndef _WIN32
573 /*
574  * Test code that creates a NotificationQueue, then forks, and incorrectly
575  * tries to send a message to the queue from the child process.
576  *
577  * The child process should crash in this scenario, since the child code has a
578  * bug.  (Older versions of NotificationQueue didn't catch this in the child,
579  * resulting in a crash in the parent process.)
580  */
581 TEST(NotificationQueueTest, UseAfterFork) {
582   IntQueue queue;
583   int childStatus = 0;
584   QueueConsumer consumer;
585
586   // Boost sets a custom SIGCHLD handler, which fails the test if a child
587   // process exits abnormally.  We don't want this.
588   signal(SIGCHLD, SIG_DFL);
589
590   // Log some info so users reading the test output aren't confused
591   // by the child process' crash log messages.
592   LOG(INFO) << "This test makes sure the child process crashes.  "
593     << "Error log messagges and a backtrace are expected.";
594
595   {
596     // Start a separate thread consuming from the queue
597     ScopedEventBaseThread t1;
598     t1.getEventBase()->runInEventBaseThread([&] {
599       consumer.startConsuming(t1.getEventBase(), &queue);
600     });
601
602     // Send a message to it, just for sanity checking
603     queue.putMessage(1234);
604
605     // Fork
606     pid_t pid = fork();
607     if (pid == 0) {
608       // The boost test framework installs signal handlers to catch errors.
609       // We only want to catch in the parent.  In the child let SIGABRT crash
610       // us normally.
611       signal(SIGABRT, SIG_DFL);
612
613       // Child.
614       // We're horrible people, so we try to send a message to the queue
615       // that is being consumed in the parent process.
616       //
617       // The putMessage() call should catch this error, and crash our process.
618       queue.putMessage(9876);
619       // We shouldn't reach here.
620       _exit(0);
621     }
622     PCHECK(pid > 0);
623
624     // Parent.  Wait for the child to exit.
625     auto waited = waitpid(pid, &childStatus, 0);
626     EXPECT_EQ(pid, waited);
627
628     // Send another message to the queue before we terminate the thread.
629     queue.putMessage(5678);
630   }
631
632   // The child process should have crashed when it tried to call putMessage()
633   // on our NotificationQueue.
634   EXPECT_TRUE(WIFSIGNALED(childStatus));
635   EXPECT_EQ(SIGABRT, WTERMSIG(childStatus));
636
637   // Make sure the parent saw the expected messages.
638   // It should have gotten 1234 and 5678 from the parent process, but not
639   // 9876 from the child.
640   EXPECT_EQ(2, consumer.messages.size());
641   EXPECT_EQ(1234, consumer.messages.front());
642   consumer.messages.pop_front();
643   EXPECT_EQ(5678, consumer.messages.front());
644   consumer.messages.pop_front();
645 }
646 #endif
647
648 TEST(NotificationQueueConsumer, make) {
649   int value = 0;
650   EventBase evb;
651   NotificationQueue<int> queue(32);
652
653   auto consumer = decltype(queue)::Consumer::make([&](
654       int&& msg) noexcept { value = msg; });
655
656   consumer->startConsuming(&evb, &queue);
657
658   int const newValue = 10;
659   queue.tryPutMessage(newValue);
660
661   evb.loopOnce();
662
663   EXPECT_EQ(newValue, value);
664 }