Fix copyright lines
[folly.git] / folly / experimental / observer / test / ObserverTest.cpp
index abfbc368491b53205713219e54c21b2ddbe67c6f..2a3b38bc2a79c7c8be8144bbf4032991e9b97c9d 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2016-present Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <gtest/gtest.h>
 
 #include <thread>
 
-#include <folly/Baton.h>
 #include <folly/experimental/observer/SimpleObservable.h>
+#include <folly/portability/GTest.h>
+#include <folly/synchronization/Baton.h>
 
 using namespace folly::observer;
 
@@ -38,7 +38,7 @@ TEST(Observer, Observable) {
 
   observable.setValue(24);
 
-  EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+  EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
 
   EXPECT_EQ(24, **observer);
 }
@@ -62,7 +62,7 @@ TEST(Observer, MakeObserver) {
 
   observable.setValue(24);
 
-  EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+  EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
 
   EXPECT_EQ(25, **observer);
 }
@@ -93,7 +93,7 @@ TEST(Observer, MakeObserverDiamond) {
 
   observable.setValue(24);
 
-  EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+  EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
 
   EXPECT_EQ(25 * 26, **observer);
 }
@@ -136,32 +136,90 @@ TEST(Observer, NullValue) {
   observable.setValue(2);
 
   // Waiting observer shouldn't be updated
-  EXPECT_FALSE(baton.timed_wait(std::chrono::seconds{1}));
+  EXPECT_FALSE(baton.try_wait_for(std::chrono::seconds{1}));
   baton.reset();
 
   EXPECT_EQ(82, **oddObserver);
 
   observable.setValue(23);
 
-  EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+  EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
 
   EXPECT_EQ(46, **oddObserver);
 }
 
+TEST(Observer, Cycle) {
+  SimpleObservable<int> observable(0);
+  auto observer = observable.getObserver();
+  folly::Optional<Observer<int>> observerB;
+
+  auto observerA = makeObserver([observer, &observerB]() {
+    auto value = **observer;
+    if (value == 1) {
+      **observerB;
+    }
+    return value;
+  });
+
+  observerB = makeObserver([observerA]() { return **observerA; });
+
+  auto collectObserver = makeObserver([observer, observerA, &observerB]() {
+    auto value = **observer;
+    auto valueA = **observerA;
+    auto valueB = ***observerB;
+
+    if (value == 1) {
+      if (valueA == 0) {
+        EXPECT_EQ(0, valueB);
+      } else {
+        EXPECT_EQ(1, valueA);
+        EXPECT_EQ(0, valueB);
+      }
+    } else if (value == 2) {
+      EXPECT_EQ(value, valueA);
+      EXPECT_TRUE(valueB == 0 || valueB == 2);
+    } else {
+      EXPECT_EQ(value, valueA);
+      EXPECT_EQ(value, valueB);
+    }
+
+    return value;
+  });
+
+  folly::Baton<> baton;
+  auto waitingObserver = makeObserver([collectObserver, &baton]() {
+    *collectObserver;
+    baton.post();
+    return folly::Unit();
+  });
+
+  baton.reset();
+  EXPECT_EQ(0, **collectObserver);
+
+  for (size_t i = 1; i <= 3; ++i) {
+    observable.setValue(i);
+
+    EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
+    baton.reset();
+
+    EXPECT_EQ(i, **collectObserver);
+  }
+}
+
 TEST(Observer, Stress) {
   SimpleObservable<int> observable(0);
 
-  folly::Synchronized<std::vector<int>> values;
+  auto values = std::make_shared<folly::Synchronized<std::vector<int>>>();
 
-  auto observer = makeObserver([ child = observable.getObserver(), &values ]() {
+  auto observer = makeObserver([ child = observable.getObserver(), values ]() {
     auto value = **child * 10;
-    values.withWLock(
+    values->withWLock(
         [&](std::vector<int>& values) { values.push_back(value); });
     return value;
   });
 
   EXPECT_EQ(0, **observer);
-  values.withRLock([](const std::vector<int>& values) {
+  values->withRLock([](const std::vector<int>& values) {
     EXPECT_EQ(1, values.size());
     EXPECT_EQ(0, values.back());
   });
@@ -176,10 +234,12 @@ TEST(Observer, Stress) {
     std::this_thread::yield();
   }
 
-  values.withRLock([numIters = numIters](const std::vector<int>& values) {
+  values->withRLock([numIters = numIters](const std::vector<int>& values) {
     EXPECT_EQ(numIters * 10, values.back());
     EXPECT_LT(values.size(), numIters / 2);
-    EXPECT_GT(values.size(), 10);
+
+    EXPECT_EQ(0, values[0]);
+    EXPECT_EQ(numIters * 10, values.back());
 
     for (auto value : values) {
       EXPECT_EQ(0, value % 10);
@@ -190,3 +250,74 @@ TEST(Observer, Stress) {
     }
   });
 }
+
+TEST(Observer, TLObserver) {
+  auto createTLObserver = [](int value) {
+    return folly::observer::makeTLObserver([=] { return value; });
+  };
+
+  auto k =
+      std::make_unique<folly::observer::TLObserver<int>>(createTLObserver(42));
+  EXPECT_EQ(42, ***k);
+  k = std::make_unique<folly::observer::TLObserver<int>>(createTLObserver(41));
+  EXPECT_EQ(41, ***k);
+}
+
+TEST(Observer, SubscribeCallback) {
+  static auto mainThreadId = std::this_thread::get_id();
+  static std::function<void()> updatesCob;
+  static bool slowGet = false;
+  static std::atomic<size_t> getCallsStart{0};
+  static std::atomic<size_t> getCallsFinish{0};
+
+  struct Observable {
+    ~Observable() {
+      EXPECT_EQ(mainThreadId, std::this_thread::get_id());
+    }
+  };
+  struct Traits {
+    using element_type = int;
+    static std::shared_ptr<const int> get(Observable&) {
+      ++getCallsStart;
+      if (slowGet) {
+        /* sleep override */ std::this_thread::sleep_for(
+            std::chrono::seconds{2});
+      }
+      ++getCallsFinish;
+      return std::make_shared<const int>(42);
+    }
+
+    static void subscribe(Observable&, std::function<void()> cob) {
+      updatesCob = std::move(cob);
+    }
+
+    static void unsubscribe(Observable&) {}
+  };
+
+  std::thread cobThread;
+  {
+    auto observer =
+        folly::observer::ObserverCreator<Observable, Traits>().getObserver();
+
+    EXPECT_TRUE(updatesCob);
+    EXPECT_EQ(2, getCallsStart);
+    EXPECT_EQ(2, getCallsFinish);
+
+    updatesCob();
+    EXPECT_EQ(3, getCallsStart);
+    EXPECT_EQ(3, getCallsFinish);
+
+    slowGet = true;
+    cobThread = std::thread([] { updatesCob(); });
+    /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds{1});
+    EXPECT_EQ(4, getCallsStart);
+    EXPECT_EQ(3, getCallsFinish);
+
+    // Observer is destroyed here
+  }
+
+  // Make sure that destroying the observer actually joined the updates callback
+  EXPECT_EQ(4, getCallsStart);
+  EXPECT_EQ(4, getCallsFinish);
+  cobThread.join();
+}