move rx to folly/experimental
authorJames Sedgwick <jsedgwick@fb.com>
Tue, 23 Sep 2014 12:48:08 +0000 (05:48 -0700)
committerAnton Likhtarov <alikhtarov@fb.com>
Fri, 26 Sep 2014 22:21:29 +0000 (15:21 -0700)
Summary: As above. I want to use this for the thread pools and it probably belongs in folly long-term anywya (if we stick with it)

Test Plan: compiled the one user

Reviewed By: hans@fb.com

Subscribers: fugalh, mwa, jgehring, fuegen, njormrod

FB internal diff: D1560578

folly/experimental/wangle/rx/Observable.h [new file with mode: 0644]
folly/experimental/wangle/rx/Observer.h [new file with mode: 0644]
folly/experimental/wangle/rx/README [new file with mode: 0644]
folly/experimental/wangle/rx/Subject.h [new file with mode: 0644]
folly/experimental/wangle/rx/Subscription.h [new file with mode: 0644]
folly/experimental/wangle/rx/types.h [new file with mode: 0644]

diff --git a/folly/experimental/wangle/rx/Observable.h b/folly/experimental/wangle/rx/Observable.h
new file mode 100644 (file)
index 0000000..4769007
--- /dev/null
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "types.h"
+#include "Subject.h"
+#include "Subscription.h"
+
+#include <folly/wangle/Executor.h>
+#include <list>
+#include <memory>
+
+namespace folly { namespace wangle {
+
+template <class T>
+struct Observable {
+  virtual ~Observable() = default;
+
+  /// Subscribe the given Observer to this Observable.
+  // Eventually this will return a Subscription object of some kind, in order
+  // to support cancellation. This is kinda really important. Maybe I should
+  // just do it now, using an dummy Subscription object.
+  virtual Subscription subscribe(ObserverPtr<T> o) {
+    observers_.push_back(o);
+    return Subscription();
+  }
+
+  /// Returns a new Observable that will call back on the given Scheduler.
+  /// The returned Observable must outlive the parent Observable.
+
+  // This and subscribeOn should maybe just be a first-class feature of an
+  // Observable, rather than making new ones whose lifetimes are tied to their
+  // parents. In that case it'd return a reference to this object for
+  // chaining.
+  ObservablePtr<T> observeOn(SchedulerPtr scheduler) {
+    // you're right Hannes, if we have Observable::create we don't need this
+    // helper class.
+    struct ViaSubject : public Observable<T>
+    {
+      ViaSubject(SchedulerPtr scheduler,
+                 Observable* obs)
+        : scheduler_(scheduler), observable_(obs)
+      {}
+
+      Subscription subscribe(ObserverPtr<T> o) override {
+        return observable_->subscribe(
+          Observer<T>::create(
+            [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); },
+            [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); },
+            [=]() { scheduler_->add([o] { o->onCompleted(); }); }));
+      }
+
+     protected:
+      SchedulerPtr scheduler_;
+      Observable* observable_;
+    };
+
+    return std::make_shared<ViaSubject>(scheduler, this);
+  }
+
+  /// Returns a new Observable that will subscribe to this parent Observable
+  /// via the given Scheduler. This can be subtle and confusing at first, see
+  /// http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn
+  std::unique_ptr<Observable> subscribeOn(SchedulerPtr scheduler) {
+    struct Subject_ : public Subject<T> {
+     public:
+      Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) {
+      }
+
+      Subscription subscribe(ObserverPtr<T> o) {
+        scheduler_->add([=] {
+          observable_->subscribe(o);
+        });
+        return Subscription();
+      }
+
+     protected:
+      SchedulerPtr scheduler_;
+      Observable* observable_;
+    };
+
+    return folly::make_unique<Subject_>(scheduler, this);
+  }
+
+ protected:
+  std::list<ObserverPtr<T>> observers_;
+};
+
+}}
diff --git a/folly/experimental/wangle/rx/Observer.h b/folly/experimental/wangle/rx/Observer.h
new file mode 100644 (file)
index 0000000..8d4bbb4
--- /dev/null
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "types.h"
+#include <functional>
+#include <memory>
+#include <stdexcept>
+#include <folly/Memory.h>
+
+namespace folly { namespace wangle {
+
+template <class T> class FunctionObserver;
+
+/// Observer interface. You can subclass it, or you can just use create()
+/// to use std::functions.
+template <class T>
+struct Observer {
+  // These are what it means to be an Observer.
+  virtual void onNext(T) = 0;
+  virtual void onError(Error) = 0;
+  virtual void onCompleted() = 0;
+
+  virtual ~Observer() = default;
+
+  /// Create an Observer with std::function callbacks. Handy to make ad-hoc
+  /// Observers with lambdas.
+  ///
+  /// Templated for maximum perfect forwarding flexibility, but ultimately
+  /// whatever you pass in has to implicitly become a std::function for the
+  /// same signature as onNext(), onError(), and onCompleted() respectively.
+  /// (see the FunctionObserver typedefs)
+  template <class N, class E, class C>
+  static std::unique_ptr<Observer> create(
+    N&& onNextFn, E&& onErrorFn, C&& onCompletedFn)
+  {
+    return folly::make_unique<FunctionObserver<T>>(
+      std::forward<N>(onNextFn),
+      std::forward<E>(onErrorFn),
+      std::forward<C>(onCompletedFn));
+  }
+
+  /// Create an Observer with only onNext and onError callbacks.
+  /// onCompleted will just be a no-op.
+  template <class N, class E>
+  static std::unique_ptr<Observer> create(N&& onNextFn, E&& onErrorFn) {
+    return folly::make_unique<FunctionObserver<T>>(
+      std::forward<N>(onNextFn),
+      std::forward<E>(onErrorFn),
+      nullptr);
+  }
+
+  /// Create an Observer with only an onNext callback.
+  /// onError and onCompleted will just be no-ops.
+  template <class N>
+  static std::unique_ptr<Observer> create(N&& onNextFn) {
+    return folly::make_unique<FunctionObserver<T>>(
+      std::forward<N>(onNextFn),
+      nullptr,
+      nullptr);
+  }
+};
+
+/// An observer that uses std::function callbacks. You don't really want to
+/// make one of these directly - instead use the Observer::create() methods.
+template <class T>
+struct FunctionObserver : public Observer<T> {
+  typedef std::function<void(T)> OnNext;
+  typedef std::function<void(Error)> OnError;
+  typedef std::function<void()> OnCompleted;
+
+  /// We don't need any fancy overloads of this constructor because that's
+  /// what Observer::create() is for.
+  template <class N = OnNext, class E = OnError, class C = OnCompleted>
+  FunctionObserver(N&& n, E&& e, C&& c)
+    : onNext_(std::forward<N>(n)),
+      onError_(std::forward<E>(e)),
+      onCompleted_(std::forward<C>(c))
+  {}
+
+  void onNext(T val) override {
+    if (onNext_) onNext_(val);
+  }
+
+  void onError(Error e) override {
+    if (onError_) onError_(e);
+  }
+
+  void onCompleted() override {
+    if (onCompleted_) onCompleted_();
+  }
+
+ protected:
+  OnNext onNext_;
+  OnError onError_;
+  OnCompleted onCompleted_;
+};
+
+}}
diff --git a/folly/experimental/wangle/rx/README b/folly/experimental/wangle/rx/README
new file mode 100644 (file)
index 0000000..ee170f3
--- /dev/null
@@ -0,0 +1,36 @@
+Rx is a pattern for "functional reactive programming" that started at
+Microsoft in C#, and has been reimplemented in various languages, notably
+RxJava for JVM languages.
+
+It is basically the plural of Futures (a la Wangle).
+
+
+                    singular              |            plural
+        +---------------------------------+-----------------------------------
+  sync  |  Foo getData()                  |  std::vector<Foo> getData()
+  async |  wangle::Future<Foo> getData()  |  wangle::Observable<Foo> getData()
+
+
+For more on Rx, I recommend these resources:
+
+Netflix blog post (RxJava): http://techblog.netflix.com/2013/02/rxjava-netflix-api.html
+Introduction to Rx eBook (C#): http://www.introtorx.com/content/v1.0.10621.0/01_WhyRx.html
+The RxJava wiki: https://github.com/Netflix/RxJava/wiki
+Netflix QCon presentation: http://www.infoq.com/presentations/netflix-functional-rx
+https://rx.codeplex.com/
+
+There are open source C++ implementations, I haven't looked at them. They
+might be the best way to go rather than writing it NIH-style. I mostly did it
+as an exercise, to think through how closely we might want to integrate
+something like this with Wangle, and to get a feel for how it works in C++.
+
+I haven't even tried to support move-only data in this version. I'm on the
+fence about the usage of shared_ptr. Subject is underdeveloped. A whole rich
+set of operations is obviously missing. I haven't decided how to handle
+subscriptions (and therefore cancellation), but I'm pretty sure C#'s
+"Disposable" is thoroughly un-C++ (opposite of RAII). So for now subscribe
+returns nothing at all and you can't cancel anything ever. The whole thing is
+probably riddled with lifetime corner case bugs that will come out like a
+swarm of angry bees as soon as someone tries an infinite sequence, or tries to
+partially observe a long sequence. I'm pretty sure subscribeOn has a bug that
+I haven't tracked down yet.
diff --git a/folly/experimental/wangle/rx/Subject.h b/folly/experimental/wangle/rx/Subject.h
new file mode 100644 (file)
index 0000000..41b59c4
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include "Observable.h"
+#include "Observer.h"
+
+namespace folly { namespace wangle {
+
+/// Subject interface. A Subject is both an Observable and an Observer. There
+/// is a default implementation of the Observer methods that just forwards the
+/// observed events to the Subject's observers.
+template <class T>
+struct Subject : public Observable<T>, public Observer<T> {
+  void onNext(T val) override {
+    for (auto& o : this->observers_)
+      o->onNext(val);
+  }
+  void onError(Error e) override {
+    for (auto& o : this->observers_)
+      o->onError(e);
+  }
+  void onCompleted() override {
+    for (auto& o : this->observers_)
+      o->onCompleted();
+  }
+};
+
+}}
diff --git a/folly/experimental/wangle/rx/Subscription.h b/folly/experimental/wangle/rx/Subscription.h
new file mode 100644 (file)
index 0000000..16406a2
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace folly { namespace wangle {
+
+// TODO
+struct Subscription {
+};
+
+}}
diff --git a/folly/experimental/wangle/rx/types.h b/folly/experimental/wangle/rx/types.h
new file mode 100644 (file)
index 0000000..54dd009
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/ExceptionWrapper.h>
+
+namespace folly { namespace wangle {
+  typedef folly::exception_wrapper Error;
+  // The wangle::Executor is basically an rx Scheduler (by design). So just
+  // alias it.
+  typedef std::shared_ptr<folly::wangle::Executor> SchedulerPtr;
+
+  template <class T> struct Observable;
+  template <class T> struct Observer;
+  template <class T> struct Subject;
+
+  template <class T> using ObservablePtr = std::shared_ptr<Observable<T>>;
+  template <class T> using ObserverPtr = std::shared_ptr<Observer<T>>;
+  template <class T> using SubjectPtr = std::shared_ptr<Subject<T>>;
+}}