2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
// Context: nested helper of ObserverCreator. It holds the wrapped Observable
// (observable_), the most recent value fetched from it (value_), a weak handle
// to the observer core (coreWeak_) and the state used to coordinate updates
// (updateRequested_, updateMutex_).
// NOTE(review): this listing appears to be missing lines (some function
// headers, return statements and closing braces are not visible). Comments
// below are hedged wherever the relevant code cannot be seen.
21 template <typename Observable, typename Traits>
22 class ObserverCreator<Observable, Traits>::Context {
// Constructs the wrapped observable in place by perfectly forwarding every
// constructor argument to observable_.
24 template <typename... Args>
25 Context(Args&&... args) : observable_(std::forward<Args>(args)...) {}
// Detaches from the observable through the Traits policy.
// NOTE(review): the enclosing function header and any guarding condition are
// not visible here -- presumably this is the destructor; confirm.
29 Traits::unsubscribe(observable_);
// Stores the weak handle to the observer core. The handle is taken by value
// and moved into coreWeak_.
33 void setCore(observer_detail::Core::WeakPtr coreWeak) {
34 coreWeak_ = std::move(coreWeak);
// Clears the pending-update flag so that a later compare_exchange on
// updateRequested_ can succeed again.
// NOTE(review): the return statement is not visible here -- presumably it
// returns the pointer currently held in value_; confirm.
37 std::shared_ptr<const T> get() {
38 updateRequested_ = false;
// NOTE(review): the header of the function containing the code below is not
// visible here; the existing comment refers to it as update().
43 // This mutex ensures there's no race condition between initial update()
44 // call and update() calls from the subscription callback.
46 // Additionally it helps avoid races between two different subscription
47 // callbacks (getting new value from observable and storing it into value_
49 std::lock_guard<std::mutex> lg(updateMutex_);
// Fetch the current value from the observable through the Traits policy.
52 auto newValue = Traits::get(observable_);
// NOTE(review): the condition guarding this throw is not visible here; going
// by the message it is a null check on newValue -- confirm.
54 throw std::logic_error("Observable returned nullptr.");
// Exchange the freshly fetched pointer with the one stored in value_.
56 value_.swap(newValue);
// Only the caller that flips updateRequested_ from false to true goes on to
// schedule a refresh; callers that find the flag already set skip this branch.
59 bool expected = false;
60 if (updateRequested_.compare_exchange_strong(expected, true)) {
// Scheduling happens only if the core can still be locked from the weak handle.
61 if (auto core = coreWeak_.lock()) {
// NOTE(review): the arguments of this call are not visible in this listing.
62 observer_detail::ObserverManager::scheduleRefreshNewVersion(
// Registers callback with the observable through the Traits policy, forwarding
// it as F.
// NOTE(review): the template header that declares F is not visible here.
69 void subscribe(F&& callback) {
70 Traits::subscribe(observable_, std::forward<F>(callback));
// Latest value obtained from the observable, wrapped in folly::Synchronized.
74 folly::Synchronized<std::shared_ptr<const T>> value_;
// Set when a refresh has been requested; reset to false in get().
75 std::atomic<bool> updateRequested_{false};
// Weak handle to the observer core, assigned in setCore().
77 observer_detail::Core::WeakPtr coreWeak_;
// The wrapped observable; every Traits call in this class operates on it.
79 Observable observable_;
// Held while a new value is fetched from the observable and stored into value_.
81 std::mutex updateMutex_;
// Constructs an ObserverCreator. All arguments are perfectly forwarded to the
// Context constructor; the Context is allocated with std::make_shared and the
// resulting shared_ptr is stored in context_.
84 template <typename Observable, typename Traits>
85 template <typename... Args>
86 ObserverCreator<Observable, Traits>::ObserverCreator(Args&&... args)
87 : context_(std::make_shared<Context>(std::forward<Args>(args)...)) {}
// Builds the Observer<T> backed by this creator's Context. The function is
// rvalue-ref-qualified (&&), so it can only be invoked on an rvalue
// ObserverCreator.
// NOTE(review): this listing appears to be missing lines (lambda terminators,
// the body of the subscription callback and the statements before the DCHECK
// are not visible); comments are hedged where code cannot be seen.
89 template <typename Observable, typename Traits>
90 Observer<typename ObserverCreator<Observable, Traits>::T>
91 ObserverCreator<Observable, Traits>::getObserver()&& {
// Create the core with a callable that captures its own shared_ptr copy of the
// Context and returns context->get().
92 auto core = observer_detail::Core::create([context = context_]() {
93 return context->get();
// Hand the core to the Context, which keeps it as a weak handle (see setCore).
96 context_->setCore(core);
// The subscription callback captures only a weak_ptr to the Context and locks
// it before use, so the callback itself holds no owning reference.
// NOTE(review): what the callback does with the locked context is not visible
// here.
98 context_->subscribe([contextWeak = std::weak_ptr<Context>(context_)] {
99 if (auto context = contextWeak.lock()) {
// Debug-only check that the core reports a version greater than zero by now.
// NOTE(review): the statements that establish this are not visible here.
107 DCHECK(core->getVersion() > 0);
// Ownership of the core is moved into the returned Observer<T>.
109 return Observer<T>(std::move(core));