Fix a race in Observable context destruction
[folly.git] / folly / experimental / observer / Observable-inl.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 namespace folly {
19 namespace observer {
20
21 template <typename Observable, typename Traits>
22 class ObserverCreator<Observable, Traits>::Context {
23  public:
24   template <typename... Args>
25   Context(Args&&... args) : observable_(std::forward<Args>(args)...) {
26     updateValue();
27   }
28
29   ~Context() {
30     if (value_.copy()) {
31       Traits::unsubscribe(observable_);
32     }
33   }
34
35   void setCore(observer_detail::Core::WeakPtr coreWeak) {
36     coreWeak_ = std::move(coreWeak);
37   }
38
39   std::shared_ptr<const T> get() {
40     updateRequested_ = false;
41     return value_.copy();
42   }
43
44   void update() {
45     // This mutex ensures there's no race condition between initial update()
46     // call and update() calls from the subsciption callback.
47     //
48     // Additionally it helps avoid races between two different subscription
49     // callbacks (getting new value from observable and storing it into value_
50     // is not atomic).
51     std::lock_guard<std::mutex> lg(updateMutex_);
52     updateValue();
53
54     bool expected = false;
55     if (updateRequested_.compare_exchange_strong(expected, true)) {
56       observer_detail::ObserverManager::scheduleRefreshNewVersion(coreWeak_);
57     }
58   }
59
60   template <typename F>
61   void subscribe(F&& callback) {
62     Traits::subscribe(observable_, std::forward<F>(callback));
63   }
64
65  private:
66   void updateValue() {
67     auto newValue = Traits::get(observable_);
68     if (!newValue) {
69       throw std::logic_error("Observable returned nullptr.");
70     }
71     value_.swap(newValue);
72   }
73
74   folly::Synchronized<std::shared_ptr<const T>> value_;
75   std::atomic<bool> updateRequested_{false};
76
77   observer_detail::Core::WeakPtr coreWeak_;
78
79   Observable observable_;
80
81   std::mutex updateMutex_;
82 };
83
84 template <typename Observable, typename Traits>
85 template <typename... Args>
86 ObserverCreator<Observable, Traits>::ObserverCreator(Args&&... args)
87     : context_(std::make_shared<Context>(std::forward<Args>(args)...)) {}
88
89 template <typename Observable, typename Traits>
90 Observer<typename ObserverCreator<Observable, Traits>::T>
91 ObserverCreator<Observable, Traits>::getObserver()&& {
92   // This master shared_ptr allows grabbing derived weak_ptrs, pointing to the
93   // the same Context object, but using a separate reference count. Master
94   // shared_ptr destructor then blocks until all shared_ptrs obtained from
95   // derived weak_ptrs are released.
96   class ContextMasterPointer {
97    public:
98     explicit ContextMasterPointer(std::shared_ptr<Context> context)
99         : contextMaster_(std::move(context)),
100           context_(
101               contextMaster_.get(),
102               [destroyBaton = destroyBaton_](Context*) {
103                 destroyBaton->post();
104               }) {}
105     ~ContextMasterPointer() {
106       if (context_) {
107         context_.reset();
108         destroyBaton_->wait();
109       }
110     }
111     ContextMasterPointer(const ContextMasterPointer&) = delete;
112     ContextMasterPointer(ContextMasterPointer&&) = default;
113     ContextMasterPointer& operator=(const ContextMasterPointer&) = delete;
114     ContextMasterPointer& operator=(ContextMasterPointer&&) = default;
115
116     Context* operator->() const {
117       return contextMaster_.get();
118     }
119
120     std::weak_ptr<Context> get_weak() {
121       return context_;
122     }
123
124    private:
125     std::shared_ptr<folly::Baton<>> destroyBaton_{
126         std::make_shared<folly::Baton<>>()};
127     std::shared_ptr<Context> contextMaster_;
128     std::shared_ptr<Context> context_;
129   };
130   // We want to make sure that Context can only be destroyed when Core is
131   // destroyed. So we have to avoid the situation when subscribe callback is
132   // locking Context shared_ptr and remains the last to release it.
133   // We solve this by having Core hold the master shared_ptr and subscription
134   // callback gets derived weak_ptr.
135   ContextMasterPointer contextMaster(context_);
136   auto contextWeak = contextMaster.get_weak();
137   auto observer = makeObserver([context = std::move(contextMaster)]() {
138     return context->get();
139   });
140
141   context_->setCore(observer.core_);
142   context_->subscribe([contextWeak = std::move(contextWeak)] {
143     if (auto context = contextWeak.lock()) {
144       context->update();
145     }
146   });
147
148   // Do an extra update in case observable was updated between observer creation
149   // and setting updates callback.
150   context_->update();
151   context_.reset();
152
153   return observer;
154 }
155 }
156 }