Revise API to load cert/key in SSLContext.
[folly.git] / folly / io / async / test / EventBaseTest.cpp
index 0c7b77f0b3497a3b553d481ca0d9ae1d9c82b638..25b3192c1280751b497b8138b13d9ebadb58be58 100644 (file)
@@ -1,20 +1,17 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Copyright 2004-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 #include <folly/Memory.h>
@@ -47,6 +44,8 @@ using std::chrono::milliseconds;
 using std::chrono::microseconds;
 using std::chrono::duration_cast;
 
+using namespace std::chrono_literals;
+
 using namespace folly;
 
 ///////////////////////////////////////////////////////////////////////////
@@ -1121,6 +1120,13 @@ TEST(EventBaseTest, RunInThread) {
   RunInThreadData data(numThreads, opsPerThread);
 
   deque<std::thread> threads;
+  SCOPE_EXIT {
+    // Wait on all of the threads.
+    for (auto& thread : threads) {
+      thread.join();
+    }
+  };
+
   for (uint32_t i = 0; i < numThreads; ++i) {
     threads.emplace_back([i, &data] {
         for (int n = 0; n < data.opsPerThread; ++n) {
@@ -1169,11 +1175,6 @@ TEST(EventBaseTest, RunInThread) {
   for (uint32_t n = 0; n < numThreads; ++n) {
     ASSERT_EQ(expectedValues[n], opsPerThread);
   }
-
-  // Wait on all of the threads.
-  for (auto& thread: threads) {
-    thread.join();
-  }
 }
 
 //  This test simulates some calls, and verifies that the waiting happens by
@@ -1184,26 +1185,25 @@ TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
   vector<unique_ptr<atomic<size_t>>> atoms(c);
   for (size_t i = 0; i < c; ++i) {
     auto& atom = atoms.at(i);
-    atom = make_unique<atomic<size_t>>(0);
+    atom = std::make_unique<atomic<size_t>>(0);
   }
-  vector<thread> threads(c);
+  vector<thread> threads;
   for (size_t i = 0; i < c; ++i) {
-    auto& atom = *atoms.at(i);
-    auto& th = threads.at(i);
-    th = thread([&atom] {
-        EventBase eb;
-        auto ebth = thread([&]{ eb.loopForever(); });
-        eb.waitUntilRunning();
-        eb.runInEventBaseThreadAndWait([&] {
-          size_t x = 0;
-          atom.compare_exchange_weak(
-              x, 1, std::memory_order_release, std::memory_order_relaxed);
-        });
+    threads.emplace_back([&atoms, i] {
+      EventBase eb;
+      auto& atom = *atoms.at(i);
+      auto ebth = thread([&] { eb.loopForever(); });
+      eb.waitUntilRunning();
+      eb.runInEventBaseThreadAndWait([&] {
         size_t x = 0;
         atom.compare_exchange_weak(
-            x, 2, std::memory_order_release, std::memory_order_relaxed);
-        eb.terminateLoopSoon();
-        ebth.join();
+            x, 1, std::memory_order_release, std::memory_order_relaxed);
+      });
+      size_t x = 0;
+      atom.compare_exchange_weak(
+          x, 2, std::memory_order_release, std::memory_order_relaxed);
+      eb.terminateLoopSoon();
+      ebth.join();
     });
   }
   for (size_t i = 0; i < c; ++i) {
@@ -1211,7 +1211,9 @@ TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
     th.join();
   }
   size_t sum = 0;
-  for (auto& atom : atoms) sum += *atom;
+  for (auto& atom : atoms) {
+    sum += *atom;
+  }
   EXPECT_EQ(c, sum);
 }
 
@@ -1347,6 +1349,21 @@ TEST(EventBaseTest, RunInLoopStopLoop) {
   ASSERT_LE(c1.getCount(), 11);
 }
 
+TEST(EventBaseTest, messageAvailableException) {
+  auto deadManWalking = [] {
+    EventBase eventBase;
+    std::thread t([&] {
+      // Call this from another thread to force use of NotificationQueue in
+      // runInEventBaseThread
+      eventBase.runInEventBaseThread(
+          []() { throw std::runtime_error("boom"); });
+    });
+    t.join();
+    eventBase.loopForever();
+  };
+  EXPECT_DEATH(deadManWalking(), ".*");
+}
+
 TEST(EventBaseTest, TryRunningAfterTerminate) {
   EventBase eventBase;
   CountedLoopCallback c1(&eventBase, 1,
@@ -1562,7 +1579,7 @@ class IdleTimeTimeoutSeries : public AsyncTimeout {
  */
 TEST(EventBaseTest, IdleTime) {
   EventBase eventBase;
-  eventBase.setLoadAvgMsec(1000);
+  eventBase.setLoadAvgMsec(1000ms);
   eventBase.resetLoadAvg(5900.0);
   std::deque<uint64_t> timeouts0(4, 8080);
   timeouts0.push_front(8000);
@@ -1575,30 +1592,25 @@ TEST(EventBaseTest, IdleTime) {
   bool hostOverloaded = false;
 
   int latencyCallbacks = 0;
-  eventBase.setMaxLatency(6000, [&]() {
+  eventBase.setMaxLatency(6000us, [&]() {
     ++latencyCallbacks;
-
-    switch (latencyCallbacks) {
-    case 1:
-      if (tos0.getTimeouts() < 6) {
-        // This could only happen if the host this test is running
-        // on is heavily loaded.
-        int64_t maxLatencyReached = duration_cast<microseconds>(
-            std::chrono::steady_clock::now().time_since_epoch()).count();
-        ASSERT_LE(43800, maxLatencyReached - testStart);
-        hostOverloaded = true;
-        break;
-      }
-      ASSERT_EQ(6, tos0.getTimeouts());
-      ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
-      ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
-      tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
-      break;
-
-    default:
+    if (latencyCallbacks != 1) {
       FAIL() << "Unexpected latency callback";
-      break;
     }
+
+    if (tos0.getTimeouts() < 6) {
+      // This could only happen if the host this test is running
+      // on is heavily loaded.
+      int64_t maxLatencyReached = duration_cast<microseconds>(
+          std::chrono::steady_clock::now().time_since_epoch()).count();
+      ASSERT_LE(43800, maxLatencyReached - testStart);
+      hostOverloaded = true;
+      return;
+    }
+    ASSERT_EQ(6, tos0.getTimeouts());
+    ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
+    ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
+    tos = std::make_unique<IdleTimeTimeoutSeries>(&eventBase, timeouts);
   });
 
   // Kick things off with an "immedite" timeout
@@ -1652,7 +1664,7 @@ TEST(EventBaseTest, EventBaseThreadLoop) {
   });
   base.loop();
 
-  ASSERT_EQ(true, ran);
+  ASSERT_TRUE(ran);
 }
 
 TEST(EventBaseTest, EventBaseThreadName) {
@@ -1691,7 +1703,7 @@ TEST(EventBaseTest, RunBeforeLoopWait) {
 }
 
 class PipeHandler : public EventHandler {
-public:
+ public:
   PipeHandler(EventBase* eventBase, int fd)
     : EventHandler(eventBase, fd) {}
 
@@ -1738,7 +1750,7 @@ TEST(EventBaseTest, LoopKeepAlive) {
   EventBase evb;
 
   bool done = false;
-  std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
+  std::thread t([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
     /* sleep override */ std::this_thread::sleep_for(
         std::chrono::milliseconds(100));
     evb.runInEventBaseThread(
@@ -1759,7 +1771,7 @@ TEST(EventBaseTest, LoopKeepAliveInLoop) {
   std::thread t;
 
   evb.runInEventBaseThread([&] {
-    t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
+    t = std::thread([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
       /* sleep override */ std::this_thread::sleep_for(
           std::chrono::milliseconds(100));
       evb.runInEventBaseThread(
@@ -1775,7 +1787,7 @@ TEST(EventBaseTest, LoopKeepAliveInLoop) {
 }
 
 TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
-  std::unique_ptr<EventBase> evb = folly::make_unique<EventBase>();
+  std::unique_ptr<EventBase> evb = std::make_unique<EventBase>();
 
   bool done = false;
 
@@ -1787,15 +1799,15 @@ TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
 
   {
     auto* ev = evb.get();
-    EventBase::LoopKeepAlive keepAlive;
+    Executor::KeepAlive keepAlive;
     ev->runInEventBaseThreadAndWait(
-        [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); });
+        [&ev, &keepAlive] { keepAlive = ev->getKeepAliveToken(); });
     ASSERT_FALSE(done) << "Loop finished before we asked it to";
     ev->terminateLoopSoon();
     /* sleep override */
     std::this_thread::sleep_for(std::chrono::milliseconds(30));
     ASSERT_FALSE(done) << "Loop terminated early";
-    ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{});
+    ev->runInEventBaseThread([keepAlive = std::move(keepAlive)]{});
   }
 
   evThread.join();
@@ -1803,13 +1815,13 @@ TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
 }
 
 TEST(EventBaseTest, LoopKeepAliveShutdown) {
-  auto evb = folly::make_unique<EventBase>();
+  auto evb = std::make_unique<EventBase>();
 
   bool done = false;
 
   std::thread t([
     &done,
-    loopKeepAlive = evb->loopKeepAlive(),
+    loopKeepAlive = evb->getKeepAliveToken(),
     evbPtr = evb.get()
   ]() mutable {
     /* sleep override */ std::this_thread::sleep_for(
@@ -1825,6 +1837,51 @@ TEST(EventBaseTest, LoopKeepAliveShutdown) {
   t.join();
 }
 
+TEST(EventBaseTest, LoopKeepAliveAtomic) {
+  auto evb = std::make_unique<EventBase>();
+
+  static constexpr size_t kNumThreads = 100;
+  static constexpr size_t kNumTasks = 100;
+
+  std::vector<std::thread> ts;
+  std::vector<std::unique_ptr<Baton<>>> batons;
+  size_t done{0};
+
+  for (size_t i = 0; i < kNumThreads; ++i) {
+    batons.emplace_back(std::make_unique<Baton<>>());
+  }
+
+  for (size_t i = 0; i < kNumThreads; ++i) {
+    ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
+      std::vector<Executor::KeepAlive> keepAlives;
+      for (size_t j = 0; j < kNumTasks; ++j) {
+        keepAlives.emplace_back(evbPtr->getKeepAliveToken());
+      }
+
+      batonPtr->post();
+
+      /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+      for (auto& keepAlive : keepAlives) {
+        evbPtr->runInEventBaseThread(
+            [&done, keepAlive = std::move(keepAlive) ]() { ++done; });
+      }
+    });
+  }
+
+  for (auto& baton : batons) {
+    baton->wait();
+  }
+
+  evb.reset();
+
+  EXPECT_EQ(kNumThreads * kNumTasks, done);
+
+  for (auto& t : ts) {
+    t.join();
+  }
+}
+
 TEST(EventBaseTest, DrivableExecutorTest) {
   folly::Promise<bool> p;
   auto f = p.getFuture();
@@ -1853,3 +1910,19 @@ TEST(EventBaseTest, DrivableExecutorTest) {
 
   t.join();
 }
+
+TEST(EventBaseTest, RequestContextTest) {
+  EventBase evb;
+  auto defaultCtx = RequestContext::get();
+
+  {
+    RequestContextScopeGuard rctx;
+    auto context = RequestContext::get();
+    EXPECT_NE(defaultCtx, context);
+    evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
+  }
+
+  EXPECT_EQ(defaultCtx, RequestContext::get());
+  evb.loop();
+  EXPECT_EQ(defaultCtx, RequestContext::get());
+}