f85d52a252c42e39734169a0f7ad79981da6338a
[folly.git] / folly / experimental / wangle / rx / Observable.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include "types.h"
20 #include "Subject.h"
21 #include "Subscription.h"
22
23 #include <folly/RWSpinLock.h>
24 #include <folly/ThreadLocal.h>
25 #include <folly/wangle/Executor.h>
26 #include <list>
27 #include <memory>
28
29 namespace folly { namespace wangle {
30
31 template <class T>
32 class Observable {
33  public:
34   Observable() = default;
35   Observable(Observable&& other) noexcept {
36     RWSpinLock::WriteHolder{&other.observersLock_};
37     observers_ = std::move(other.observers_);
38   }
39
40   virtual ~Observable() = default;
41
42   /// Subscribe the given Observer to this Observable.
43   // Eventually this will return a Subscription object of some kind, in order
44   // to support cancellation. This is kinda really important. Maybe I should
45   // just do it now, using an dummy Subscription object.
46   //
47   // If this is called within an Observer callback, the new observer will not
48   // get the current update but will get subsequent updates.
49   virtual Subscription subscribe(ObserverPtr<T> o) {
50     if (inCallback_ && *inCallback_) {
51       if (!newObservers_) {
52         newObservers_.reset(new std::list<ObserverPtr<T>>());
53       }
54       newObservers_->push_back(o);
55     } else {
56       RWSpinLock::WriteHolder{&observersLock_};
57       observers_.push_back(o);
58     }
59     return Subscription();
60   }
61
62   /// Returns a new Observable that will call back on the given Scheduler.
63   /// The returned Observable must outlive the parent Observable.
64
65   // This and subscribeOn should maybe just be a first-class feature of an
66   // Observable, rather than making new ones whose lifetimes are tied to their
67   // parents. In that case it'd return a reference to this object for
68   // chaining.
69   ObservablePtr<T> observeOn(SchedulerPtr scheduler) {
70     // you're right Hannes, if we have Observable::create we don't need this
71     // helper class.
72     struct ViaSubject : public Observable<T>
73     {
74       ViaSubject(SchedulerPtr scheduler,
75                  Observable* obs)
76         : scheduler_(scheduler), observable_(obs)
77       {}
78
79       Subscription subscribe(ObserverPtr<T> o) override {
80         return observable_->subscribe(
81           Observer<T>::create(
82             [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); },
83             [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); },
84             [=]() { scheduler_->add([o] { o->onCompleted(); }); }));
85       }
86
87      protected:
88       SchedulerPtr scheduler_;
89       Observable* observable_;
90     };
91
92     return std::make_shared<ViaSubject>(scheduler, this);
93   }
94
95   /// Returns a new Observable that will subscribe to this parent Observable
96   /// via the given Scheduler. This can be subtle and confusing at first, see
97   /// http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn
98   std::unique_ptr<Observable> subscribeOn(SchedulerPtr scheduler) {
99     struct Subject_ : public Subject<T> {
100      public:
101       Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) {
102       }
103
104       Subscription subscribe(ObserverPtr<T> o) {
105         scheduler_->add([=] {
106           observable_->subscribe(o);
107         });
108         return Subscription();
109       }
110
111      protected:
112       SchedulerPtr scheduler_;
113       Observable* observable_;
114     };
115
116     return folly::make_unique<Subject_>(scheduler, this);
117   }
118
119  protected:
120   const std::list<ObserverPtr<T>>& getObservers() {
121     return observers_;
122   }
123
124   // This guard manages deferred modification of the observers list.
125   // Subclasses should use this guard if they want to subscribe new observers
126   // in the course of a callback. New observers won't be added until the guard
127   // goes out of scope. See Subject for an example.
128   class ObserversGuard {
129    public:
130     explicit ObserversGuard(Observable* o) : o_(o) {
131       if (UNLIKELY(!o_->inCallback_)) {
132         o_->inCallback_.reset(new bool{false});
133       }
134       CHECK(!(*o_->inCallback_));
135       *o_->inCallback_ = true;
136       o_->observersLock_.lock_shared();
137     }
138
139     ~ObserversGuard() {
140       o_->observersLock_.unlock_shared();
141       if (UNLIKELY(o_->newObservers_ && !o_->newObservers_->empty())) {
142         {
143           RWSpinLock::WriteHolder(o_->observersLock_);
144           for (auto& o : *(o_->newObservers_)) {
145             o_->observers_.push_back(o);
146           }
147         }
148         o_->newObservers_->clear();
149       }
150       *o_->inCallback_ = false;
151     }
152
153    private:
154     Observable* o_;
155   };
156
157  private:
158   std::list<ObserverPtr<T>> observers_;
159   RWSpinLock observersLock_;
160   folly::ThreadLocalPtr<bool> inCallback_;
161   folly::ThreadLocalPtr<std::list<ObserverPtr<T>>> newObservers_;
162 };
163
164 }}