2017
[folly.git] / folly / experimental / observer / Observable-inl.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 namespace folly {
19 namespace observer {
20
21 template <typename Observable, typename Traits>
22 class ObserverCreator<Observable, Traits>::Context {
23  public:
24   template <typename... Args>
25   Context(Args&&... args) : observable_(std::forward<Args>(args)...) {}
26
27   ~Context() {
28     if (value_.copy()) {
29       Traits::unsubscribe(observable_);
30     }
31   }
32
33   void setCore(observer_detail::Core::WeakPtr coreWeak) {
34     coreWeak_ = std::move(coreWeak);
35   }
36
37   std::shared_ptr<const T> get() {
38     updateRequested_ = false;
39     return value_.copy();
40   }
41
42   void update() {
43     // This mutex ensures there's no race condition between initial update()
44     // call and update() calls from the subsciption callback.
45     //
46     // Additionally it helps avoid races between two different subscription
47     // callbacks (getting new value from observable and storing it into value_
48     // is not atomic).
49     std::lock_guard<std::mutex> lg(updateMutex_);
50
51     {
52       auto newValue = Traits::get(observable_);
53       if (!newValue) {
54         throw std::logic_error("Observable returned nullptr.");
55       }
56       value_.swap(newValue);
57     }
58
59     bool expected = false;
60     if (updateRequested_.compare_exchange_strong(expected, true)) {
61       if (auto core = coreWeak_.lock()) {
62         observer_detail::ObserverManager::scheduleRefreshNewVersion(
63             std::move(core));
64       }
65     }
66   }
67
68   template <typename F>
69   void subscribe(F&& callback) {
70     Traits::subscribe(observable_, std::forward<F>(callback));
71   }
72
73  private:
74   folly::Synchronized<std::shared_ptr<const T>> value_;
75   std::atomic<bool> updateRequested_{false};
76
77   observer_detail::Core::WeakPtr coreWeak_;
78
79   Observable observable_;
80
81   std::mutex updateMutex_;
82 };
83
84 template <typename Observable, typename Traits>
85 template <typename... Args>
86 ObserverCreator<Observable, Traits>::ObserverCreator(Args&&... args)
87     : context_(std::make_shared<Context>(std::forward<Args>(args)...)) {}
88
89 template <typename Observable, typename Traits>
90 Observer<typename ObserverCreator<Observable, Traits>::T>
91 ObserverCreator<Observable, Traits>::getObserver()&& {
92   auto core = observer_detail::Core::create([context = context_]() {
93     return context->get();
94   });
95
96   context_->setCore(core);
97
98   context_->subscribe([contextWeak = std::weak_ptr<Context>(context_)] {
99     if (auto context = contextWeak.lock()) {
100       context->update();
101     }
102   });
103
104   context_->update();
105   context_.reset();
106
107   DCHECK(core->getVersion() > 0);
108
109   return Observer<T>(std::move(core));
110 }
111 }
112 }