Fix Observable to not trigger unneccessary refresh if the value didn't change
[folly.git] / folly / experimental / observer / Observable-inl.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 namespace folly {
19 namespace observer {
20
21 template <typename Observable, typename Traits>
22 class ObserverCreator<Observable, Traits>::Context {
23  public:
24   template <typename... Args>
25   Context(Args&&... args) : observable_(std::forward<Args>(args)...) {
26     updateValue();
27   }
28
29   ~Context() {
30     if (value_.copy()) {
31       Traits::unsubscribe(observable_);
32     }
33   }
34
35   void setCore(observer_detail::Core::WeakPtr coreWeak) {
36     coreWeak_ = std::move(coreWeak);
37   }
38
39   std::shared_ptr<const T> get() {
40     updateRequested_ = false;
41     return value_.copy();
42   }
43
44   void update() {
45     // This mutex ensures there's no race condition between initial update()
46     // call and update() calls from the subsciption callback.
47     //
48     // Additionally it helps avoid races between two different subscription
49     // callbacks (getting new value from observable and storing it into value_
50     // is not atomic).
51     std::lock_guard<std::mutex> lg(updateMutex_);
52     if (!updateValue()) {
53       // Value didn't change, so we can skip the version update.
54       return;
55     }
56
57     bool expected = false;
58     if (updateRequested_.compare_exchange_strong(expected, true)) {
59       observer_detail::ObserverManager::scheduleRefreshNewVersion(coreWeak_);
60     }
61   }
62
63   template <typename F>
64   void subscribe(F&& callback) {
65     Traits::subscribe(observable_, std::forward<F>(callback));
66   }
67
68  private:
69   bool updateValue() {
70     auto newValue = Traits::get(observable_);
71     auto newValuePtr = newValue.get();
72     if (!newValue) {
73       throw std::logic_error("Observable returned nullptr.");
74     }
75     value_.swap(newValue);
76     return newValuePtr != newValue.get();
77   }
78
79   folly::Synchronized<std::shared_ptr<const T>> value_;
80   std::atomic<bool> updateRequested_{false};
81
82   observer_detail::Core::WeakPtr coreWeak_;
83
84   Observable observable_;
85
86   std::mutex updateMutex_;
87 };
88
89 template <typename Observable, typename Traits>
90 template <typename... Args>
91 ObserverCreator<Observable, Traits>::ObserverCreator(Args&&... args)
92     : context_(std::make_shared<Context>(std::forward<Args>(args)...)) {}
93
94 template <typename Observable, typename Traits>
95 Observer<typename ObserverCreator<Observable, Traits>::T>
96 ObserverCreator<Observable, Traits>::getObserver()&& {
97   // This master shared_ptr allows grabbing derived weak_ptrs, pointing to the
98   // the same Context object, but using a separate reference count. Master
99   // shared_ptr destructor then blocks until all shared_ptrs obtained from
100   // derived weak_ptrs are released.
101   class ContextMasterPointer {
102    public:
103     explicit ContextMasterPointer(std::shared_ptr<Context> context)
104         : contextMaster_(std::move(context)),
105           context_(
106               contextMaster_.get(),
107               [destroyBaton = destroyBaton_](Context*) {
108                 destroyBaton->post();
109               }) {}
110     ~ContextMasterPointer() {
111       if (context_) {
112         context_.reset();
113         destroyBaton_->wait();
114       }
115     }
116     ContextMasterPointer(const ContextMasterPointer&) = delete;
117     ContextMasterPointer(ContextMasterPointer&&) = default;
118     ContextMasterPointer& operator=(const ContextMasterPointer&) = delete;
119     ContextMasterPointer& operator=(ContextMasterPointer&&) = default;
120
121     Context* operator->() const {
122       return contextMaster_.get();
123     }
124
125     std::weak_ptr<Context> get_weak() {
126       return context_;
127     }
128
129    private:
130     std::shared_ptr<folly::Baton<>> destroyBaton_{
131         std::make_shared<folly::Baton<>>()};
132     std::shared_ptr<Context> contextMaster_;
133     std::shared_ptr<Context> context_;
134   };
135   // We want to make sure that Context can only be destroyed when Core is
136   // destroyed. So we have to avoid the situation when subscribe callback is
137   // locking Context shared_ptr and remains the last to release it.
138   // We solve this by having Core hold the master shared_ptr and subscription
139   // callback gets derived weak_ptr.
140   ContextMasterPointer contextMaster(context_);
141   auto contextWeak = contextMaster.get_weak();
142   auto observer = makeObserver([context = std::move(contextMaster)]() {
143     return context->get();
144   });
145
146   context_->setCore(observer.core_);
147   context_->subscribe([contextWeak = std::move(contextWeak)] {
148     if (auto context = contextWeak.lock()) {
149       context->update();
150     }
151   });
152
153   // Do an extra update in case observable was updated between observer creation
154   // and setting updates callback.
155   context_->update();
156   context_.reset();
157
158   return observer;
159 }
160 }
161 }