folly::Observer
authorAndrii Grynenko <andrii@fb.com>
Wed, 17 Aug 2016 02:25:54 +0000 (19:25 -0700)
committerFacebook Github Bot 5 <facebook-github-bot-5-bot@fb.com>
Wed, 17 Aug 2016 02:38:28 +0000 (19:38 -0700)
Summary:
This is a basic implementation of a new Observer API. I mostly see this being useful as a replacement for various configuration subscription APIs (Configerator, SMC etc.)

The library provides an Observer primitive. At any time user can take a Snapshot of data in the Observer (which is a view with the most recent version of the data). New Observer can be created by providing a generator functor. Such Observer mays depend on other Observers and its generator functor is re-run automatically every time, when at least one of the dependencies are updated. The implementation may also ignore intermediate updates and will only use the most recent version of other Observers, when re-computing Observer data.

Reviewed By: yfeldblum

Differential Revision: D3481231

fbshipit-source-id: 96c165f8081cff0141d5814ec2bc88adeb2e1e74

13 files changed:
folly/Makefile.am
folly/experimental/observer/Observable-inl.h [new file with mode: 0644]
folly/experimental/observer/Observable.h [new file with mode: 0644]
folly/experimental/observer/Observer-inl.h [new file with mode: 0644]
folly/experimental/observer/Observer.h [new file with mode: 0644]
folly/experimental/observer/SimpleObservable-inl.h [new file with mode: 0644]
folly/experimental/observer/SimpleObservable.h [new file with mode: 0644]
folly/experimental/observer/detail/Core.cpp [new file with mode: 0644]
folly/experimental/observer/detail/Core.h [new file with mode: 0644]
folly/experimental/observer/detail/Observer-pre.h [new file with mode: 0644]
folly/experimental/observer/detail/ObserverManager.cpp [new file with mode: 0644]
folly/experimental/observer/detail/ObserverManager.h [new file with mode: 0644]
folly/experimental/observer/test/ObserverTest.cpp [new file with mode: 0644]

index 16228fb279071b845e136a3471e9efad40ddbb9f..b9d5424ad700d1a49ee58678ce4559354e754f76 100644 (file)
@@ -107,6 +107,15 @@ nobase_follyinclude_HEADERS = \
        experimental/JSONSchema.h \
        experimental/LockFreeRingBuffer.h \
        experimental/NestedCommandLineApp.h \
+       experimental/observer/detail/Core.h \
+       experimental/observer/detail/ObserverManager.h \
+       experimental/observer/detail/Observer-pre.h \
+       experimental/observer/Observable.h \
+       experimental/observer/Observable-inl.h \
+       experimental/observer/Observer.h \
+       experimental/observer/Observer-inl.h \
+       experimental/observer/SimpleObservable.h \
+       experimental/observer/SimpleObservable-inl.h \
        experimental/ProgramOptions.h \
        experimental/ReadMostlySharedPtr.h \
        experimental/symbolizer/Elf.h \
@@ -469,6 +478,8 @@ libfolly_la_SOURCES = \
        experimental/io/FsUtil.cpp \
        experimental/JSONSchema.cpp \
        experimental/NestedCommandLineApp.cpp \
+       experimental/observer/detail/Core.cpp \
+       experimental/observer/detail/ObserverManager.cpp \
        experimental/ProgramOptions.cpp \
        experimental/Select64.cpp \
        experimental/TestUtil.cpp
diff --git a/folly/experimental/observer/Observable-inl.h b/folly/experimental/observer/Observable-inl.h
new file mode 100644 (file)
index 0000000..75061ac
--- /dev/null
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+namespace folly {
+namespace observer {
+
+template <typename Observable, typename Traits>
+class ObserverCreator<Observable, Traits>::Context {
+ public:
+  template <typename... Args>
+  Context(Args&&... args) : observable_(std::forward<Args>(args)...) {}
+
+  ~Context() {
+    if (value_.copy()) {
+      Traits::unsubscribe(observable_);
+    }
+  }
+
+  void setCore(observer_detail::Core::WeakPtr coreWeak) {
+    coreWeak_ = std::move(coreWeak);
+  }
+
+  std::shared_ptr<const T> get() {
+    updateRequested_ = false;
+    return value_.copy();
+  }
+
+  void update() {
+    {
+      auto newValue = Traits::get(observable_);
+      DCHECK(newValue);
+      value_.swap(newValue);
+    }
+
+    bool expected = false;
+    if (updateRequested_.compare_exchange_strong(expected, true)) {
+      if (auto core = coreWeak_.lock()) {
+        observer_detail::ObserverManager::scheduleRefreshNewVersion(
+            std::move(core));
+      }
+    }
+  }
+
+  template <typename F>
+  void subscribe(F&& callback) {
+    Traits::subscribe(observable_, std::forward<F>(callback));
+  }
+
+ private:
+  folly::Synchronized<std::shared_ptr<const T>> value_;
+  std::atomic<bool> updateRequested_{false};
+
+  observer_detail::Core::WeakPtr coreWeak_;
+
+  Observable observable_;
+};
+
+template <typename Observable, typename Traits>
+template <typename... Args>
+ObserverCreator<Observable, Traits>::ObserverCreator(Args&&... args)
+    : context_(std::make_shared<Context>(std::forward<Args>(args)...)) {}
+
+template <typename Observable, typename Traits>
+Observer<typename ObserverCreator<Observable, Traits>::T>
+ObserverCreator<Observable, Traits>::getObserver()&& {
+  auto core = observer_detail::Core::create([context = context_]() {
+    return context->get();
+  });
+
+  context_->setCore(core);
+
+  context_->subscribe([contextWeak = std::weak_ptr<Context>(context_)] {
+    if (auto context = contextWeak.lock()) {
+      context->update();
+    }
+  });
+
+  context_->update();
+  context_.reset();
+
+  DCHECK(core->getVersion() > 0);
+
+  return Observer<T>(std::move(core));
+}
+}
+}
diff --git a/folly/experimental/observer/Observable.h b/folly/experimental/observer/Observable.h
new file mode 100644 (file)
index 0000000..f1aaf0a
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/experimental/observer/Observer.h>
+
+namespace folly {
+namespace observer {
+
+template <typename Observable>
+struct ObservableTraits {
+  using element_type =
+      typename std::remove_reference<Observable>::type::element_type;
+
+  static std::shared_ptr<const element_type> get(Observable& observable) {
+    return observable.get();
+  }
+
+  template <typename F>
+  static void subscribe(Observable& observable, F&& callback) {
+    observable.subscribe(std::forward<F>(callback));
+  }
+
+  static void unsubscribe(Observable& observable) {
+    observable.unsubscribe();
+  }
+};
+
+template <typename Observable, typename Traits = ObservableTraits<Observable>>
+class ObserverCreator {
+ public:
+  using T = typename Traits::element_type;
+
+  template <typename... Args>
+  explicit ObserverCreator(Args&&... args);
+
+  Observer<T> getObserver() &&;
+
+ private:
+  class Context;
+
+  std::shared_ptr<Context> context_;
+};
+}
+}
+
+#include <folly/experimental/observer/Observable-inl.h>
diff --git a/folly/experimental/observer/Observer-inl.h b/folly/experimental/observer/Observer-inl.h
new file mode 100644 (file)
index 0000000..eb1338d
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/experimental/observer/detail/ObserverManager.h>
+
+namespace folly {
+namespace observer {
+
+template <typename T>
+Snapshot<T> Observer<T>::getSnapshot() const {
+  auto data = core_->getData();
+  return Snapshot<T>(
+      *core_,
+      std::static_pointer_cast<const T>(std::move(data.data)),
+      data.version);
+}
+
+template <typename T>
+Observer<T>::Observer(observer_detail::Core::Ptr core)
+    : core_(std::move(core)) {}
+
+template <typename F>
+Observer<observer_detail::ResultOfUnwrapSharedPtr<F>> makeObserver(
+    F&& creator) {
+  auto core = observer_detail::Core::
+      create([creator = std::forward<F>(creator)]() mutable {
+        return std::static_pointer_cast<void>(creator());
+      });
+
+  observer_detail::ObserverManager::scheduleRefreshNewVersion(core);
+
+  return Observer<observer_detail::ResultOfUnwrapSharedPtr<F>>(core);
+}
+
+template <typename F>
+Observer<observer_detail::ResultOf<F>> makeObserver(F&& creator) {
+  return makeObserver([creator = std::forward<F>(creator)]() mutable {
+    return std::make_shared<observer_detail::ResultOf<F>>(creator());
+  });
+}
+
+template <typename T>
+TLObserver<T>::TLObserver(Observer<T> observer)
+    : observer_(observer),
+      snapshot_([&] { return new Snapshot<T>(observer_.getSnapshot()); }) {}
+
+template <typename T>
+const Snapshot<T>& TLObserver<T>::getSnapshotRef() const {
+  auto& snapshot = *snapshot_;
+  if (observer_.needRefresh(snapshot) ||
+      observer_detail::ObserverManager::inManagerThread()) {
+    snapshot = observer_.getSnapshot();
+  }
+
+  return snapshot;
+}
+}
+}
diff --git a/folly/experimental/observer/Observer.h b/folly/experimental/observer/Observer.h
new file mode 100644 (file)
index 0000000..8037b24
--- /dev/null
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/ThreadLocal.h>
+#include <folly/experimental/observer/detail/Core.h>
+#include <folly/experimental/observer/detail/Observer-pre.h>
+
+namespace folly {
+namespace observer {
+
+/**
+ * Observer - a library which lets you create objects which track updates of
+ * their dependencies and get re-computed when any of the dependencies changes.
+ *
+ *
+ * Given an Observer, you can get a snapshot of the current version of the
+ * object it holds:
+ *
+ *   Observer<int> myObserver = ...;
+ *   Snapshot<int> mySnapshot = myObserver.getSnapshot();
+ * or simply
+ *   Snapshot<int> mySnapshot = *myObserver;
+ *
+ * Snapshot will hold a view of the object, even if object in the Observer
+ * gets updated.
+ *
+ *
+ * What makes Observer powerful is its ability to track updates to other
+ * Observers. Imagine we have two separate Observers A and B which hold
+ * integers.
+ *
+ *   Observer<int> observerA = ...;
+ *   Observer<int> observerB = ...;
+ *
+ * To compute a sum of A and B we can create a new Observer which would track
+ * updates to A and B and re-compute the sum only when necessary.
+ *
+ *   Observer<int> sumObserver = makeObserver(
+ *       [observerA, observerB] {
+ *         int a = **observerA;
+ *         int b = **observerB;
+ *         return a + b;
+ *       });
+ *
+ *   int sum = **sumObserver;
+ *
+ * Notice that a + b will be only called when either a or b is changed. Getting
+ * a snapshot from sumObserver won't trigger any re-computation.
+ *
+ *
+ * TLObserver is very similar to Observer, but it also keeps a thread-local
+ * cache for the observed object.
+ *
+ *   Observer<int> observer = ...;
+ *   TLObserver<int> tlObserver(observer);
+ *   auto& snapshot = *tlObserver;
+ *
+ *
+ * See ObserverCreator class if you want to wrap any existing subscription API
+ * in an Observer object.
+ */
+template <typename T>
+class Observer;
+
+template <typename T>
+class Snapshot {
+ public:
+  const T& operator*() const {
+    return *get();
+  }
+
+  const T* operator->() const {
+    return get();
+  }
+
+  const T* get() const {
+    return data_.get();
+  }
+
+  /**
+   * Return the version of the observed object.
+   */
+  size_t getVersion() const {
+    return version_;
+  }
+
+ private:
+  friend class Observer<T>;
+
+  Snapshot(
+      const observer_detail::Core& core,
+      std::shared_ptr<const T> data,
+      size_t version)
+      : data_(std::move(data)), version_(version), core_(&core) {
+    DCHECK(data_);
+  }
+
+  std::shared_ptr<const T> data_;
+  size_t version_;
+  const observer_detail::Core* core_;
+};
+
+template <typename T>
+class Observer {
+ public:
+  explicit Observer(observer_detail::Core::Ptr core);
+
+  Snapshot<T> getSnapshot() const;
+  Snapshot<T> operator*() const {
+    return getSnapshot();
+  }
+
+  /**
+   * Check if we have a newer version of the observed object than the snapshot.
+   * Snapshot should have been originally from this Observer.
+   */
+  bool needRefresh(const Snapshot<T>& snapshot) const {
+    DCHECK_EQ(core_.get(), snapshot.core_);
+    return snapshot.getVersion() < core_->getVersionLastChange();
+  }
+
+ private:
+  observer_detail::Core::Ptr core_;
+};
+
+/**
+ * makeObserver(...) creates a new Observer<T> object given a functor to
+ * compute it. The functor can return T or std::shared_ptr<const T>.
+ *
+ * makeObserver(...) blocks until the initial version of Observer is computed.
+ * If creator functor fails (throws or returns a nullptr) during this first
+ * call, the exception is re-thrown by makeObserver(...).
+ *
+ * For all subsequent updates if creator functor fails (throws or returs a
+ * nullptr), the Observer (and all its dependents) is not updated.
+ */
+template <typename F>
+Observer<observer_detail::ResultOf<F>> makeObserver(F&& creator);
+
+template <typename F>
+Observer<observer_detail::ResultOfUnwrapSharedPtr<F>> makeObserver(F&& creator);
+
+template <typename T>
+class TLObserver {
+ public:
+  explicit TLObserver(Observer<T> observer);
+
+  const Snapshot<T>& getSnapshotRef() const;
+  const Snapshot<T>& operator*() const {
+    return getSnapshotRef();
+  }
+
+ private:
+  Observer<T> observer_;
+  folly::ThreadLocal<Snapshot<T>> snapshot_;
+};
+
+/**
+ * Same as makeObserver(...), but creates TLObserver.
+ */
+template <typename T>
+TLObserver<T> makeTLObserver(Observer<T> observer) {
+  return TLObserver<T>(std::move(observer));
+}
+
+template <typename F>
+auto makeTLObserver(F&& creator) {
+  return makeTLObserver(makeObserver(std::forward<F>(creator)));
+}
+
+template <typename T, bool CacheInThreadLocal>
+struct ObserverTraits {};
+
+template <typename T>
+struct ObserverTraits<T, false> {
+  using type = Observer<T>;
+};
+
+template <typename T>
+struct ObserverTraits<T, true> {
+  using type = TLObserver<T>;
+};
+
+template <typename T, bool CacheInThreadLocal>
+using ObserverT = typename ObserverTraits<T, CacheInThreadLocal>::type;
+}
+}
+
+#include <folly/experimental/observer/Observer-inl.h>
diff --git a/folly/experimental/observer/SimpleObservable-inl.h b/folly/experimental/observer/SimpleObservable-inl.h
new file mode 100644 (file)
index 0000000..463e85b
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/experimental/observer/Observable.h>
+
+namespace folly {
+namespace observer {
+
+template <typename T>
+SimpleObservable<T>::SimpleObservable(T value)
+    : context_(std::make_shared<Context>()) {
+  setValue(std::move(value));
+}
+
+template <typename T>
+SimpleObservable<T>::SimpleObservable(std::shared_ptr<const T> value)
+    : context_(std::make_shared<Context>()) {
+  setValue(std::move(value));
+}
+
+template <typename T>
+void SimpleObservable<T>::setValue(T value) {
+  setValue(std::make_shared<const T>(std::move(value)));
+}
+
+template <typename T>
+void SimpleObservable<T>::setValue(std::shared_ptr<const T> value) {
+  context_->value_.swap(value);
+
+  context_->callback_.withWLock([](folly::Function<void()>& callback) {
+    if (callback) {
+      callback();
+    }
+  });
+}
+
+template <typename T>
+Observer<T> SimpleObservable<T>::getObserver() {
+  struct SimpleObservableWrapper {
+    using element_type = T;
+
+    std::shared_ptr<Context> context;
+
+    std::shared_ptr<const T> get() {
+      return context->value_.copy();
+    }
+
+    void subscribe(folly::Function<void()> callback) {
+      context->callback_.swap(callback);
+    }
+
+    void unsubscribe() {
+      folly::Function<void()> empty;
+      context->callback_.swap(empty);
+    }
+  };
+
+  std::call_once(observerInit_, [&]() {
+    SimpleObservableWrapper wrapper;
+    wrapper.context = context_;
+    ObserverCreator<SimpleObservableWrapper> creator(std::move(wrapper));
+    observer_ = std::move(creator).getObserver();
+  });
+  return *observer_;
+}
+}
+}
diff --git a/folly/experimental/observer/SimpleObservable.h b/folly/experimental/observer/SimpleObservable.h
new file mode 100644 (file)
index 0000000..1feb21d
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/Function.h>
+#include <folly/Synchronized.h>
+#include <folly/experimental/observer/Observer.h>
+
+namespace folly {
+namespace observer {
+
+template <typename T>
+class SimpleObservable {
+ public:
+  explicit SimpleObservable(T value);
+  explicit SimpleObservable(std::shared_ptr<const T> value);
+
+  void setValue(T value);
+  void setValue(std::shared_ptr<const T> value);
+
+  Observer<T> getObserver();
+
+ private:
+  struct Context {
+    folly::Synchronized<std::shared_ptr<const T>> value_;
+    folly::Synchronized<folly::Function<void()>> callback_;
+  };
+  std::shared_ptr<Context> context_;
+
+  std::once_flag observerInit_;
+  folly::Optional<Observer<T>> observer_;
+};
+}
+}
+
+#include <folly/experimental/observer/SimpleObservable-inl.h>
diff --git a/folly/experimental/observer/detail/Core.cpp b/folly/experimental/observer/detail/Core.cpp
new file mode 100644 (file)
index 0000000..c4e317e
--- /dev/null
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/observer/detail/Core.h>
+#include <folly/experimental/observer/detail/ObserverManager.h>
+
+namespace folly {
+namespace observer_detail {
+
+Core::VersionedData Core::getData() {
+  if (!ObserverManager::inManagerThread()) {
+    return data_.copy();
+  }
+
+  ObserverManager::DependencyRecorder::markDependency(shared_from_this());
+
+  auto version = ObserverManager::getVersion();
+
+  if (version_ >= version) {
+    return data_.copy();
+  }
+
+  refresh(version);
+
+  DCHECK_GE(version_, version);
+  return data_.copy();
+}
+
+size_t Core::refresh(size_t version, bool force) {
+  CHECK(ObserverManager::inManagerThread());
+
+  if (version_ >= version) {
+    return versionLastChange_;
+  }
+
+  bool refreshDependents = false;
+
+  {
+    std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
+
+    // Recheck in case this code was already refreshed
+    if (version_ >= version) {
+      return versionLastChange_;
+    }
+
+    bool needRefresh = force || version_ == 0;
+
+    // This can be run in parallel, but we expect most updates to propagate
+    // bottom to top.
+    dependencies_.withRLock([&](const Dependencies& dependencies) {
+      for (const auto& dependency : dependencies) {
+        if (dependency->refresh(version) > version_) {
+          needRefresh = true;
+          break;
+        }
+      }
+    });
+
+    if (!needRefresh) {
+      version_ = version;
+      return versionLastChange_;
+    }
+
+    ObserverManager::DependencyRecorder dependencyRecorder;
+
+    try {
+      {
+        VersionedData newData{creator_(), version};
+        if (!newData.data) {
+          throw std::logic_error("Observer creator returned nullptr.");
+        }
+        data_.swap(newData);
+      }
+
+      versionLastChange_ = version;
+      refreshDependents = true;
+    } catch (...) {
+      LOG(ERROR) << "Exception while refreshing Observer: "
+                 << exceptionStr(std::current_exception());
+
+      if (version_ == 0) {
+        // Re-throw exception if this is the first time we run creator
+        throw;
+      }
+    }
+
+    version_ = version;
+
+    auto newDependencies = dependencyRecorder.release();
+    dependencies_.withWLock([&](Dependencies& dependencies) {
+      for (const auto& dependency : newDependencies) {
+        if (!dependencies.count(dependency)) {
+          dependency->addDependent(this->shared_from_this());
+        }
+      }
+
+      for (const auto& dependency : dependencies) {
+        if (!newDependencies.count(dependency)) {
+          dependency->removeStaleDependents();
+        }
+      }
+
+      dependencies = std::move(newDependencies);
+    });
+  }
+
+  if (refreshDependents) {
+    auto dependents = dependents_.copy();
+
+    for (const auto& dependentWeak : dependents) {
+      if (auto dependent = dependentWeak.lock()) {
+        ObserverManager::scheduleRefresh(std::move(dependent), version);
+      }
+    }
+  }
+
+  return versionLastChange_;
+}
+
+Core::Core(folly::Function<std::shared_ptr<const void>()> creator)
+    : creator_(std::move(creator)) {}
+
+Core::~Core() {
+  dependencies_.withWLock([](const Dependencies& dependencies) {
+    for (const auto& dependecy : dependencies) {
+      dependecy->removeStaleDependents();
+    }
+  });
+}
+
+Core::Ptr Core::create(folly::Function<std::shared_ptr<const void>()> creator) {
+  auto core = Core::Ptr(new Core(std::move(creator)));
+  return core;
+}
+
+void Core::addDependent(Core::WeakPtr dependent) {
+  dependents_.withWLock([&](Dependents& dependents) {
+    dependents.push_back(std::move(dependent));
+  });
+}
+
+void Core::removeStaleDependents() {
+  // This is inefficient, the assumption is that we won't have many dependents
+  dependents_.withWLock([](Dependents& dependents) {
+    for (size_t i = 0; i < dependents.size(); ++i) {
+      if (dependents[i].expired()) {
+        std::swap(dependents[i], dependents.back());
+        dependents.pop_back();
+        --i;
+      }
+    }
+  });
+}
+}
+}
diff --git a/folly/experimental/observer/detail/Core.h b/folly/experimental/observer/detail/Core.h
new file mode 100644 (file)
index 0000000..2d1dd7f
--- /dev/null
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/Synchronized.h>
+#include <folly/futures/Future.h>
+#include <set>
+
+namespace folly {
+namespace observer_detail {
+
+class ObserverManager;
+
+/**
+ * Core stores the current version of the object held by Observer. It also keeps
+ * all dependencies and dependents of the Observer.
+ */
+class Core : public std::enable_shared_from_this<Core> {
+ public:
+  using Ptr = std::shared_ptr<Core>;
+  using WeakPtr = std::weak_ptr<Core>;
+
+  /**
+   * Blocks until creator is successfully run by ObserverManager
+   */
+  static Ptr create(folly::Function<std::shared_ptr<const void>()> creator);
+
+  /**
+   * View of the observed object and its version
+   */
+  struct VersionedData {
+    VersionedData() {}
+
+    VersionedData(std::shared_ptr<const void> data_, size_t version_)
+        : data(std::move(data_)), version(version_) {}
+
+    std::shared_ptr<const void> data;
+    size_t version{0};
+  };
+
+  /**
+   * Gets current view of the observed object.
+   * This is safe to call from any thread. If this is called from other Observer
+   * functor then that Observer is marked as dependent on current Observer.
+   */
+  VersionedData getData();
+
+  /**
+   * Gets the version of the observed object.
+   */
+  size_t getVersion() const {
+    return version_;
+  }
+
+  /**
+   * Get the last version at which the observed object was actually changed.
+   */
+  size_t getVersionLastChange() {
+    return versionLastChange_;
+  }
+
+  /**
+   * Check if the observed object needs to be re-computed. Returns the version
+   * of last change. If force is true, re-computes the observed object, even if
+   * dependencies didn't change.
+   *
+   * This should be only called from ObserverManager thread.
+   */
+  size_t refresh(size_t version, bool force = false);
+
+  ~Core();
+
+ private:
+  explicit Core(folly::Function<std::shared_ptr<const void>()> creator);
+
+  void addDependent(Core::WeakPtr dependent);
+  void removeStaleDependents();
+
+  using Dependents = std::vector<WeakPtr>;
+  using Dependencies = std::unordered_set<Ptr>;
+
+  folly::Synchronized<Dependents> dependents_;
+  folly::Synchronized<Dependencies> dependencies_;
+
+  std::atomic<size_t> version_{0};
+  std::atomic<size_t> versionLastChange_{0};
+
+  folly::Synchronized<VersionedData> data_;
+
+  folly::Function<std::shared_ptr<const void>()> creator_;
+
+  std::mutex refreshMutex_;
+};
+}
+}
diff --git a/folly/experimental/observer/detail/Observer-pre.h b/folly/experimental/observer/detail/Observer-pre.h
new file mode 100644 (file)
index 0000000..506306a
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+namespace folly {
+namespace observer_detail {
+
+template <typename T>
+struct NonSharedPtr {
+  using type = typename std::decay<T>::type;
+};
+
+template <typename T>
+struct NonSharedPtr<std::shared_ptr<T>> {};
+
+template <typename T>
+struct UnwrapSharedPtr {};
+
+template <typename T>
+struct UnwrapSharedPtr<std::shared_ptr<T>> {
+  using type = typename std::decay<T>::type;
+};
+
+template <typename F>
+using ResultOf =
+    typename NonSharedPtr<typename std::result_of<F()>::type>::type;
+
+template <typename F>
+using ResultOfUnwrapSharedPtr =
+    typename UnwrapSharedPtr<typename std::result_of<F()>::type>::type;
+}
+}
diff --git a/folly/experimental/observer/detail/ObserverManager.cpp b/folly/experimental/observer/detail/ObserverManager.cpp
new file mode 100644 (file)
index 0000000..24acaba
--- /dev/null
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/observer/detail/ObserverManager.h>
+
+#include <folly/MPMCQueue.h>
+#include <folly/Singleton.h>
+
+namespace folly {
+namespace observer_detail {
+
+FOLLY_TLS bool ObserverManager::inManagerThread_{false};
+FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies*
+    ObserverManager::DependencyRecorder::currentDependencies_{nullptr};
+
+namespace {
+constexpr size_t kCurrentThreadPoolSize{4};
+constexpr size_t kCurrentQueueSize{10 * 1024};
+constexpr size_t kNextQueueSize{10 * 1024};
+}
+
+class ObserverManager::CurrentQueue {
+ public:
+  CurrentQueue() : queue_(kCurrentQueueSize) {
+    for (size_t i = 0; i < kCurrentThreadPoolSize; ++i) {
+      threads_.emplace_back([&]() {
+        ObserverManager::inManagerThread_ = true;
+
+        while (true) {
+          Function<void()> task;
+          queue_.blockingRead(task);
+
+          if (!task) {
+            return;
+          }
+
+          try {
+            task();
+          } catch (...) {
+            LOG(ERROR) << "Exception while running CurrentQueue task: "
+                       << exceptionStr(std::current_exception());
+          }
+        }
+      });
+    }
+  }
+
+  ~CurrentQueue() {
+    for (size_t i = 0; i < threads_.size(); ++i) {
+      queue_.blockingWrite(nullptr);
+    }
+
+    for (auto& thread : threads_) {
+      thread.join();
+    }
+
+    CHECK(queue_.isEmpty());
+  }
+
+  void add(Function<void()> task) {
+    if (ObserverManager::inManagerThread()) {
+      if (!queue_.write(std::move(task))) {
+        throw std::runtime_error("Too many Observers scheduled for update.");
+      }
+    } else {
+      queue_.blockingWrite(std::move(task));
+    }
+  }
+
+ private:
+  MPMCQueue<Function<void()>> queue_;
+  std::vector<std::thread> threads_;
+};
+
+class ObserverManager::NextQueue {
+ public:
+  explicit NextQueue(ObserverManager& manager)
+      : manager_(manager), queue_(kNextQueueSize) {
+    thread_ = std::thread([&]() {
+      Core::Ptr queueCore;
+
+      while (true) {
+        queue_.blockingRead(queueCore);
+
+        if (!queueCore) {
+          return;
+        }
+
+        std::vector<Core::Ptr> cores;
+        cores.emplace_back(std::move(queueCore));
+
+        {
+          SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
+
+          // We can't pick more tasks from the queue after we bumped the
+          // version, so we have to do this while holding the lock.
+          while (cores.size() < kNextQueueSize && queue_.read(queueCore)) {
+            if (!queueCore) {
+              return;
+            }
+            cores.emplace_back(std::move(queueCore));
+          }
+
+          ++manager_.version_;
+        }
+
+        for (auto& core : cores) {
+          manager_.scheduleRefresh(std::move(core), manager_.version_, true);
+        }
+      }
+    });
+  }
+
+  void add(Core::Ptr core) {
+    queue_.blockingWrite(std::move(core));
+  }
+
+  ~NextQueue() {
+    // Emtpy element signals thread to terminate
+    queue_.blockingWrite(nullptr);
+    thread_.join();
+  }
+
+ private:
+  ObserverManager& manager_;
+  MPMCQueue<Core::Ptr> queue_;
+  std::thread thread_;
+};
+
+ObserverManager::ObserverManager() {
+  currentQueue_ = make_unique<CurrentQueue>();
+  nextQueue_ = make_unique<NextQueue>(*this);
+}
+
+ObserverManager::~ObserverManager() {
+  // Destroy NextQueue, before the rest of this object, since it expects
+  // ObserverManager to be alive.
+  nextQueue_.release();
+  currentQueue_.release();
+}
+
+void ObserverManager::scheduleCurrent(Function<void()> task) {
+  currentQueue_->add(std::move(task));
+}
+
+void ObserverManager::scheduleNext(Core::Ptr core) {
+  nextQueue_->add(std::move(core));
+}
+
+struct ObserverManager::Singleton {
+  static folly::Singleton<ObserverManager> instance;
+};
+
+folly::Singleton<ObserverManager> ObserverManager::Singleton::instance([] {
+  return new ObserverManager();
+});
+
+std::shared_ptr<ObserverManager> ObserverManager::getInstance() {
+  return Singleton::instance.try_get();
+}
+}
+}
diff --git a/folly/experimental/observer/detail/ObserverManager.h b/folly/experimental/observer/detail/ObserverManager.h
new file mode 100644 (file)
index 0000000..6319bed
--- /dev/null
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/experimental/observer/detail/Core.h>
+#include <folly/futures/Future.h>
+
+namespace folly {
+namespace observer_detail {
+
+/**
+ * ObserverManager is a singleton which controls the re-computation of all
+ * Observers. Such re-computation always happens on the thread pool owned by
+ * ObserverManager.
+ *
+ * ObserverManager has global current version. All existing Observers
+ * may have their version be less (yet to be updated) or equal (up to date)
+ * to the global current version.
+ *
+ * ObserverManager::CurrentQueue contains all of the Observers which need to be
+ * updated to the global current version. Those updates are peformed on the
+ * ObserverManager's thread pool, until the queue is empty. If some Observer is
+ * updated, all of its dependents are added to ObserverManager::CurrentQueue
+ * to be updated.
+ *
+ * If some leaf Observer (i.e. created from Observable) is updated, then current
+ * version of the ObserverManager should be bumped. All such updated leaf
+ * Observers are added to the ObserverManager::NextQueue.
+ *
+ * *Only* when ObserverManager::CurrentQueue is empty, the global current
+ * version is bumped and all updates from the ObserverManager::NextQueue are
+ * performed. If leaf Observer gets updated more then once before being picked
+ * from the ObserverManager::NextQueue, then only the last update is processed.
+ */
+class ObserverManager {
+ public:
+  static size_t getVersion() {
+    auto instance = getInstance();
+
+    if (!instance) {
+      return 1;
+    }
+
+    return instance->version_;
+  }
+
+  static bool inManagerThread() {
+    return inManagerThread_;
+  }
+
+  static Future<Unit>
+  scheduleRefresh(Core::Ptr core, size_t minVersion, bool force = false) {
+    if (core->getVersion() >= minVersion) {
+      return makeFuture<Unit>(Unit());
+    }
+
+    auto instance = getInstance();
+
+    if (!instance) {
+      return makeFuture<Unit>(
+          std::logic_error("ObserverManager requested during shutdown"));
+    }
+
+    Promise<Unit> promise;
+    auto future = promise.getFuture();
+
+    SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_);
+
+    instance->scheduleCurrent([
+      core = std::move(core),
+      promise = std::move(promise),
+      instancePtr = instance.get(),
+      rh = std::move(rh),
+      force
+    ]() mutable {
+      promise.setWith([&]() { core->refresh(instancePtr->version_, force); });
+    });
+
+    return future;
+  }
+
+  static void scheduleRefreshNewVersion(Core::Ptr core) {
+    if (core->getVersion() == 0) {
+      scheduleRefresh(std::move(core), 1).get();
+      return;
+    }
+
+    auto instance = getInstance();
+
+    if (!instance) {
+      return;
+    }
+
+    instance->scheduleNext(std::move(core));
+  }
+
+  class DependencyRecorder {
+   public:
+    using Dependencies = std::unordered_set<Core::Ptr>;
+
+    DependencyRecorder() {
+      DCHECK(inManagerThread());
+
+      previousDepedencies_ = currentDependencies_;
+      currentDependencies_ = &dependencies_;
+    }
+
+    static void markDependency(Core::Ptr dependency) {
+      DCHECK(inManagerThread());
+      DCHECK(currentDependencies_);
+
+      currentDependencies_->insert(std::move(dependency));
+    }
+
+    Dependencies release() {
+      DCHECK(currentDependencies_ == &dependencies_);
+      std::swap(currentDependencies_, previousDepedencies_);
+      previousDepedencies_ = nullptr;
+
+      return std::move(dependencies_);
+    }
+
+    ~DependencyRecorder() {
+      if (previousDepedencies_) {
+        release();
+      }
+    }
+
+   private:
+    Dependencies dependencies_;
+    Dependencies* previousDepedencies_;
+
+    static FOLLY_TLS Dependencies* currentDependencies_;
+  };
+
+  ~ObserverManager();
+
+ private:
+  ObserverManager();
+
+  struct Singleton;
+
+  void scheduleCurrent(Function<void()>);
+  void scheduleNext(Core::Ptr);
+
+  class CurrentQueue;
+  class NextQueue;
+
+  std::unique_ptr<CurrentQueue> currentQueue_;
+  std::unique_ptr<NextQueue> nextQueue_;
+
+  static std::shared_ptr<ObserverManager> getInstance();
+  static FOLLY_TLS bool inManagerThread_;
+
+  /**
+   * Version mutex is used to make sure all updates are processed from the
+   * CurrentQueue, before bumping the version and moving to the NextQueue.
+   *
+   * To achieve this every task added to CurrentQueue holds a reader lock.
+   * NextQueue grabs a writer lock before bumping the version, so it can only
+   * happen if CurrentQueue is empty (notice that we use read-priority shared
+   * mutex).
+   */
+  SharedMutexReadPriority versionMutex_;
+  std::atomic<size_t> version_{1};
+};
+}
+}
diff --git a/folly/experimental/observer/test/ObserverTest.cpp b/folly/experimental/observer/test/ObserverTest.cpp
new file mode 100644 (file)
index 0000000..abfbc36
--- /dev/null
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gtest/gtest.h>
+
+#include <thread>
+
+#include <folly/Baton.h>
+#include <folly/experimental/observer/SimpleObservable.h>
+
+using namespace folly::observer;
+
+TEST(Observer, Observable) {
+  SimpleObservable<int> observable(42);
+  auto observer = observable.getObserver();
+
+  EXPECT_EQ(42, **observer);
+
+  folly::Baton<> baton;
+  auto waitingObserver = makeObserver([observer, &baton]() {
+    *observer;
+    baton.post();
+    return folly::Unit();
+  });
+  baton.reset();
+
+  observable.setValue(24);
+
+  EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+
+  EXPECT_EQ(24, **observer);
+}
+
+TEST(Observer, MakeObserver) {
+  SimpleObservable<int> observable(42);
+
+  auto observer = makeObserver([child = observable.getObserver()]() {
+    return **child + 1;
+  });
+
+  EXPECT_EQ(43, **observer);
+
+  folly::Baton<> baton;
+  auto waitingObserver = makeObserver([observer, &baton]() {
+    *observer;
+    baton.post();
+    return folly::Unit();
+  });
+  baton.reset();
+
+  observable.setValue(24);
+
+  EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+
+  EXPECT_EQ(25, **observer);
+}
+
+TEST(Observer, MakeObserverDiamond) {
+  SimpleObservable<int> observable(42);
+
+  auto observer1 = makeObserver([child = observable.getObserver()]() {
+    return **child + 1;
+  });
+
+  auto observer2 = makeObserver([child = observable.getObserver()]() {
+    return std::make_shared<int>(**child + 2);
+  });
+
+  auto observer = makeObserver(
+      [observer1, observer2]() { return (**observer1) * (**observer2); });
+
+  EXPECT_EQ(43 * 44, *observer.getSnapshot());
+
+  folly::Baton<> baton;
+  auto waitingObserver = makeObserver([observer, &baton]() {
+    *observer;
+    baton.post();
+    return folly::Unit();
+  });
+  baton.reset();
+
+  observable.setValue(24);
+
+  EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+
+  EXPECT_EQ(25 * 26, **observer);
+}
+
+TEST(Observer, CreateException) {
+  struct ExpectedException {};
+  EXPECT_THROW(
+      auto observer = makeObserver(
+          []() -> std::shared_ptr<int> { throw ExpectedException(); }),
+      ExpectedException);
+
+  EXPECT_THROW(
+      auto observer =
+          makeObserver([]() -> std::shared_ptr<int> { return nullptr; }),
+      std::logic_error);
+}
+
+TEST(Observer, NullValue) {
+  SimpleObservable<int> observable(41);
+  auto oddObserver = makeObserver([innerObserver = observable.getObserver()]() {
+    auto value = **innerObserver;
+
+    if (value % 2 != 0) {
+      return value * 2;
+    }
+
+    throw std::logic_error("I prefer odd numbers");
+  });
+
+  folly::Baton<> baton;
+  auto waitingObserver = makeObserver([oddObserver, &baton]() {
+    *oddObserver;
+    baton.post();
+    return folly::Unit();
+  });
+
+  baton.reset();
+  EXPECT_EQ(82, **oddObserver);
+
+  observable.setValue(2);
+
+  // Waiting observer shouldn't be updated
+  EXPECT_FALSE(baton.timed_wait(std::chrono::seconds{1}));
+  baton.reset();
+
+  EXPECT_EQ(82, **oddObserver);
+
+  observable.setValue(23);
+
+  EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+
+  EXPECT_EQ(46, **oddObserver);
+}
+
+TEST(Observer, Stress) {
+  SimpleObservable<int> observable(0);
+
+  folly::Synchronized<std::vector<int>> values;
+
+  auto observer = makeObserver([ child = observable.getObserver(), &values ]() {
+    auto value = **child * 10;
+    values.withWLock(
+        [&](std::vector<int>& values) { values.push_back(value); });
+    return value;
+  });
+
+  EXPECT_EQ(0, **observer);
+  values.withRLock([](const std::vector<int>& values) {
+    EXPECT_EQ(1, values.size());
+    EXPECT_EQ(0, values.back());
+  });
+
+  constexpr size_t numIters = 10000;
+
+  for (size_t i = 1; i <= numIters; ++i) {
+    observable.setValue(i);
+  }
+
+  while (**observer != numIters * 10) {
+    std::this_thread::yield();
+  }
+
+  values.withRLock([numIters = numIters](const std::vector<int>& values) {
+    EXPECT_EQ(numIters * 10, values.back());
+    EXPECT_LT(values.size(), numIters / 2);
+    EXPECT_GT(values.size(), 10);
+
+    for (auto value : values) {
+      EXPECT_EQ(0, value % 10);
+    }
+
+    for (size_t i = 0; i < values.size() - 1; ++i) {
+      EXPECT_LE(values[i], values[i + 1]);
+    }
+  });
+}