/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2016-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <thread>
-#include <folly/Baton.h>
#include <folly/experimental/observer/SimpleObservable.h>
#include <folly/portability/GTest.h>
+#include <folly/synchronization/Baton.h>
using namespace folly::observer;
observable.setValue(24);
- EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+ EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
EXPECT_EQ(24, **observer);
}
observable.setValue(24);
- EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+ EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
EXPECT_EQ(25, **observer);
}
observable.setValue(24);
- EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+ EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
EXPECT_EQ(25 * 26, **observer);
}
observable.setValue(2);
// Waiting observer shouldn't be updated
- EXPECT_FALSE(baton.timed_wait(std::chrono::seconds{1}));
+ EXPECT_FALSE(baton.try_wait_for(std::chrono::seconds{1}));
baton.reset();
EXPECT_EQ(82, **oddObserver);
observable.setValue(23);
- EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+ EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
EXPECT_EQ(46, **oddObserver);
}
for (size_t i = 1; i <= 3; ++i) {
observable.setValue(i);
- EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+ EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
baton.reset();
EXPECT_EQ(i, **collectObserver);
TEST(Observer, Stress) {
SimpleObservable<int> observable(0);
- folly::Synchronized<std::vector<int>> values;
+ auto values = std::make_shared<folly::Synchronized<std::vector<int>>>();
- auto observer = makeObserver([ child = observable.getObserver(), &values ]() {
+ auto observer = makeObserver([ child = observable.getObserver(), values ]() {
auto value = **child * 10;
- values.withWLock(
+ values->withWLock(
[&](std::vector<int>& values) { values.push_back(value); });
return value;
});
EXPECT_EQ(0, **observer);
- values.withRLock([](const std::vector<int>& values) {
+ values->withRLock([](const std::vector<int>& values) {
EXPECT_EQ(1, values.size());
EXPECT_EQ(0, values.back());
});
std::this_thread::yield();
}
- values.withRLock([numIters = numIters](const std::vector<int>& values) {
+ values->withRLock([numIters = numIters](const std::vector<int>& values) {
EXPECT_EQ(numIters * 10, values.back());
EXPECT_LT(values.size(), numIters / 2);
- EXPECT_GT(values.size(), 10);
+
+ EXPECT_EQ(0, values[0]);
+ EXPECT_EQ(numIters * 10, values.back());
for (auto value : values) {
EXPECT_EQ(0, value % 10);
}
});
}
+
+TEST(Observer, TLObserver) {
+ auto createTLObserver = [](int value) {
+ return folly::observer::makeTLObserver([=] { return value; });
+ };
+
+ auto k =
+ std::make_unique<folly::observer::TLObserver<int>>(createTLObserver(42));
+ EXPECT_EQ(42, ***k);
+ k = std::make_unique<folly::observer::TLObserver<int>>(createTLObserver(41));
+ EXPECT_EQ(41, ***k);
+}
+
+TEST(Observer, SubscribeCallback) {
+ static auto mainThreadId = std::this_thread::get_id();
+ static std::function<void()> updatesCob;
+ static bool slowGet = false;
+ static std::atomic<size_t> getCallsStart{0};
+ static std::atomic<size_t> getCallsFinish{0};
+
+ struct Observable {
+ ~Observable() {
+ EXPECT_EQ(mainThreadId, std::this_thread::get_id());
+ }
+ };
+ struct Traits {
+ using element_type = int;
+ static std::shared_ptr<const int> get(Observable&) {
+ ++getCallsStart;
+ if (slowGet) {
+ /* sleep override */ std::this_thread::sleep_for(
+ std::chrono::seconds{2});
+ }
+ ++getCallsFinish;
+ return std::make_shared<const int>(42);
+ }
+
+ static void subscribe(Observable&, std::function<void()> cob) {
+ updatesCob = std::move(cob);
+ }
+
+ static void unsubscribe(Observable&) {}
+ };
+
+ std::thread cobThread;
+ {
+ auto observer =
+ folly::observer::ObserverCreator<Observable, Traits>().getObserver();
+
+ EXPECT_TRUE(updatesCob);
+ EXPECT_EQ(2, getCallsStart);
+ EXPECT_EQ(2, getCallsFinish);
+
+ updatesCob();
+ EXPECT_EQ(3, getCallsStart);
+ EXPECT_EQ(3, getCallsFinish);
+
+ slowGet = true;
+ cobThread = std::thread([] { updatesCob(); });
+ /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds{1});
+ EXPECT_EQ(4, getCallsStart);
+ EXPECT_EQ(3, getCallsFinish);
+
+ // Observer is destroyed here
+ }
+
+ // Make sure that destroying the observer actually joined the updates callback
+ EXPECT_EQ(4, getCallsStart);
+ EXPECT_EQ(4, getCallsFinish);
+ cobThread.join();
+}