user-defined expirations
[folly.git] / folly / experimental / wangle / concurrent / test / ThreadPoolExecutorTest.cpp
index eb8527ca0ab5637758b88cb1493fac88830e3374..8b972773c07d04c1f5a07f2e23b73220d7aefd6a 100644 (file)
 #include <gtest/gtest.h>
 
 using namespace folly::wangle;
+using namespace std::chrono;
 
 static Func burnMs(uint64_t ms) {
-  return [ms]() { std::this_thread::sleep_for(std::chrono::milliseconds(ms)); };
+  return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
 }
 
 template <class TPE>
@@ -176,11 +177,11 @@ static void taskStats() {
       [&] (ThreadPoolExecutor::TaskStats stats) {
         int i = c++;
         if (i < 10) {
-          EXPECT_GE(10000, stats.waitTime.count());
-          EXPECT_LE(20000, stats.runTime.count());
+          EXPECT_GE(milliseconds(10), stats.waitTime);
+          EXPECT_LE(milliseconds(20), stats.runTime);
         } else {
-          EXPECT_LE(10000, stats.waitTime.count());
-          EXPECT_LE(10000, stats.runTime.count());
+          EXPECT_LE(milliseconds(10), stats.waitTime);
+          EXPECT_LE(milliseconds(10), stats.runTime);
         }
       }));
   for (int i = 0; i < 10; i++) {
@@ -200,3 +201,35 @@ TEST(ThreadPoolExecutorTest, CPUTaskStats) {
 TEST(ThreadPoolExecutorTest, IOTaskStats) {
   taskStats<IOThreadPoolExecutor>();
 }
+
+template <class TPE>
+static void expiration() {
+  TPE tpe(1);
+  std::atomic<int> statCbCount(0);
+  tpe.subscribeToTaskStats(Observer<ThreadPoolExecutor::TaskStats>::create(
+      [&] (ThreadPoolExecutor::TaskStats stats) {
+        int i = statCbCount++;
+        if (i == 0) {
+          EXPECT_FALSE(stats.expired);
+        } else if (i == 1) {
+          EXPECT_TRUE(stats.expired);
+        } else {
+          FAIL();
+        }
+      }));
+  std::atomic<int> expireCbCount(0);
+  auto expireCb = [&] () { expireCbCount++; };
+  tpe.add(burnMs(10), milliseconds(10), expireCb);
+  tpe.add(burnMs(10), milliseconds(10), expireCb);
+  tpe.join();
+  EXPECT_EQ(2, statCbCount);
+  EXPECT_EQ(1, expireCbCount);
+}
+
+TEST(ThreadPoolExecutorTest, CPUExpiration) {
+  expiration<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOExpiration) {
+  expiration<IOThreadPoolExecutor>();
+}