Add missing include for flock()
[folly.git] / folly / wangle / rx / Observable.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/wangle/rx/types.h> // must come first
20 #include <folly/wangle/rx/Subject.h>
21 #include <folly/wangle/rx/Subscription.h>
22
23 #include <folly/RWSpinLock.h>
24 #include <folly/SmallLocks.h>
25 #include <folly/ThreadLocal.h>
26 #include <folly/small_vector.h>
27 #include <folly/Executor.h>
28 #include <folly/Memory.h>
29 #include <map>
30 #include <memory>
31
32 namespace folly { namespace wangle {
33
34 template <class T, size_t InlineObservers>
35 class Observable {
36  public:
37   Observable() : nextSubscriptionId_{1} {}
38
39   // TODO perhaps we want to provide this #5283229
40   Observable(Observable&& other) = delete;
41
42   virtual ~Observable() {
43     if (unsubscriber_) {
44       unsubscriber_->disable();
45     }
46   }
47
48   // The next three methods subscribe the given Observer to this Observable.
49   //
50   // If these are called within an Observer callback, the new observer will not
51   // get the current update but will get subsequent updates.
52   //
53   // subscribe() returns a Subscription object. The observer will continue to
54   // get updates until the Subscription is destroyed.
55   //
56   // observe(ObserverPtr<T>) creates an indefinite subscription
57   //
58   // observe(Observer<T>*) also creates an indefinite subscription, but the
59   // caller is responsible for ensuring that the given Observer outlives this
60   // Observable. This might be useful in high performance environments where
61   // allocations must be kept to a minimum. Template parameter InlineObservers
62   // specifies how many observers can been subscribed inline without any
63   // allocations (it's just the size of a folly::small_vector).
64   virtual Subscription<T> subscribe(ObserverPtr<T> observer) {
65     return subscribeImpl(observer, false);
66   }
67
68   virtual void observe(ObserverPtr<T> observer) {
69     subscribeImpl(observer, true);
70   }
71
72   virtual void observe(Observer<T>* observer) {
73     if (inCallback_ && *inCallback_) {
74       if (!newObservers_) {
75         newObservers_.reset(new ObserverList());
76       }
77       newObservers_->push_back(observer);
78     } else {
79       RWSpinLock::WriteHolder{&observersLock_};
80       observers_.push_back(observer);
81     }
82   }
83
84   // TODO unobserve(ObserverPtr<T>), unobserve(Observer<T>*)
85
86   /// Returns a new Observable that will call back on the given Scheduler.
87   /// The returned Observable must outlive the parent Observable.
88
89   // This and subscribeOn should maybe just be a first-class feature of an
90   // Observable, rather than making new ones whose lifetimes are tied to their
91   // parents. In that case it'd return a reference to this object for
92   // chaining.
93   ObservablePtr<T> observeOn(SchedulerPtr scheduler) {
94     // you're right Hannes, if we have Observable::create we don't need this
95     // helper class.
96     struct ViaSubject : public Observable<T>
97     {
98       ViaSubject(SchedulerPtr sched,
99                  Observable* obs)
100         : scheduler_(sched), observable_(obs)
101       {}
102
103       Subscription<T> subscribe(ObserverPtr<T> o) override {
104         return observable_->subscribe(
105           Observer<T>::create(
106             [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); },
107             [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); },
108             [=]() { scheduler_->add([o] { o->onCompleted(); }); }));
109       }
110
111      protected:
112       SchedulerPtr scheduler_;
113       Observable* observable_;
114     };
115
116     return std::make_shared<ViaSubject>(scheduler, this);
117   }
118
119   /// Returns a new Observable that will subscribe to this parent Observable
120   /// via the given Scheduler. This can be subtle and confusing at first, see
121   /// http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn
122   std::unique_ptr<Observable> subscribeOn(SchedulerPtr scheduler) {
123     struct Subject_ : public Subject<T> {
124      public:
125       Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) {
126       }
127
128       Subscription<T> subscribe(ObserverPtr<T> o) {
129         scheduler_->add([=] {
130           observable_->subscribe(o);
131         });
132         return Subscription<T>(nullptr, 0); // TODO
133       }
134
135      protected:
136       SchedulerPtr scheduler_;
137       Observable* observable_;
138     };
139
140     return folly::make_unique<Subject_>(scheduler, this);
141   }
142
143  protected:
144   // Safely execute an operation on each observer. F must take a single
145   // Observer<T>* as its argument.
146   template <class F>
147   void forEachObserver(F f) {
148     if (UNLIKELY(!inCallback_)) {
149       inCallback_.reset(new bool{false});
150     }
151     CHECK(!(*inCallback_));
152     *inCallback_ = true;
153
154     {
155       RWSpinLock::ReadHolder rh(observersLock_);
156       for (auto o : observers_) {
157         f(o);
158       }
159
160       for (auto& kv : subscribers_) {
161         f(kv.second.get());
162       }
163     }
164
165     if (UNLIKELY((newObservers_ && !newObservers_->empty()) ||
166                  (newSubscribers_ && !newSubscribers_->empty()) ||
167                  (oldSubscribers_ && !oldSubscribers_->empty()))) {
168       {
169         RWSpinLock::WriteHolder wh(observersLock_);
170         if (newObservers_) {
171           for (auto observer : *(newObservers_)) {
172             observers_.push_back(observer);
173           }
174           newObservers_->clear();
175         }
176         if (newSubscribers_) {
177           for (auto& kv : *(newSubscribers_)) {
178             subscribers_.insert(std::move(kv));
179           }
180           newSubscribers_->clear();
181         }
182         if (oldSubscribers_) {
183           for (auto id : *(oldSubscribers_)) {
184             subscribers_.erase(id);
185           }
186           oldSubscribers_->clear();
187         }
188       }
189     }
190     *inCallback_ = false;
191   }
192
193  private:
194   Subscription<T> subscribeImpl(ObserverPtr<T> observer, bool indefinite) {
195     auto subscription = makeSubscription(indefinite);
196     typename SubscriberMap::value_type kv{subscription.id_, std::move(observer)};
197     if (inCallback_ && *inCallback_) {
198       if (!newSubscribers_) {
199         newSubscribers_.reset(new SubscriberMap());
200       }
201       newSubscribers_->insert(std::move(kv));
202     } else {
203       RWSpinLock::WriteHolder{&observersLock_};
204       subscribers_.insert(std::move(kv));
205     }
206     return subscription;
207   }
208
209   class Unsubscriber {
210    public:
211     explicit Unsubscriber(Observable* observable) : observable_(observable) {
212       CHECK(observable_);
213     }
214
215     void unsubscribe(uint64_t id) {
216       CHECK(id > 0);
217       RWSpinLock::ReadHolder guard(lock_);
218       if (observable_) {
219         observable_->unsubscribe(id);
220       }
221     }
222
223     void disable() {
224       RWSpinLock::WriteHolder guard(lock_);
225       observable_ = nullptr;
226     }
227
228    private:
229     RWSpinLock lock_;
230     Observable* observable_;
231   };
232
233   std::shared_ptr<Unsubscriber> unsubscriber_{nullptr};
234   MicroSpinLock unsubscriberLock_{0};
235
236   friend class Subscription<T>;
237
238   void unsubscribe(uint64_t id) {
239     if (inCallback_ && *inCallback_) {
240       if (!oldSubscribers_) {
241         oldSubscribers_.reset(new std::vector<uint64_t>());
242       }
243       if (newSubscribers_) {
244         auto it = newSubscribers_->find(id);
245         if (it != newSubscribers_->end()) {
246           newSubscribers_->erase(it);
247           return;
248         }
249       }
250       oldSubscribers_->push_back(id);
251     } else {
252       RWSpinLock::WriteHolder{&observersLock_};
253       subscribers_.erase(id);
254     }
255   }
256
257   Subscription<T> makeSubscription(bool indefinite) {
258     if (indefinite) {
259       return Subscription<T>(nullptr, nextSubscriptionId_++);
260     } else {
261       if (!unsubscriber_) {
262         std::lock_guard<MicroSpinLock> guard(unsubscriberLock_);
263         if (!unsubscriber_) {
264           unsubscriber_ = std::make_shared<Unsubscriber>(this);
265         }
266       }
267       return Subscription<T>(unsubscriber_, nextSubscriptionId_++);
268     }
269   }
270
271   std::atomic<uint64_t> nextSubscriptionId_;
272   RWSpinLock observersLock_;
273   folly::ThreadLocalPtr<bool> inCallback_;
274
275   typedef folly::small_vector<Observer<T>*, InlineObservers> ObserverList;
276   ObserverList observers_;
277   folly::ThreadLocalPtr<ObserverList> newObservers_;
278
279   typedef std::map<uint64_t, ObserverPtr<T>> SubscriberMap;
280   SubscriberMap subscribers_;
281   folly::ThreadLocalPtr<SubscriberMap> newSubscribers_;
282   folly::ThreadLocalPtr<std::vector<uint64_t>> oldSubscribers_;
283 };
284
285 }}