2017
[folly.git] / folly / fibers / FiberManagerMap.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "FiberManagerMap.h"
17
18 #include <memory>
19 #include <unordered_map>
20
21 #include <folly/Synchronized.h>
22 #include <folly/ThreadLocal.h>
23
24 namespace folly {
25 namespace fibers {
26
27 namespace {
28
29 template <typename EventBaseT>
30 class EventBaseOnDestructionCallback : public EventBase::LoopCallback {
31  public:
32   explicit EventBaseOnDestructionCallback(EventBaseT& evb) : evb_(evb) {}
33   void runLoopCallback() noexcept override;
34
35  private:
36   EventBaseT& evb_;
37 };
38
39 template <typename EventBaseT>
40 class GlobalCache {
41  public:
42   static FiberManager& get(EventBaseT& evb, const FiberManager::Options& opts) {
43     return instance().getImpl(evb, opts);
44   }
45
46   static std::unique_ptr<FiberManager> erase(EventBaseT& evb) {
47     return instance().eraseImpl(evb);
48   }
49
50  private:
51   GlobalCache() {}
52
53   // Leak this intentionally. During shutdown, we may call getFiberManager,
54   // and want access to the fiber managers during that time.
55   static GlobalCache& instance() {
56     static auto ret = new GlobalCache();
57     return *ret;
58   }
59
60   FiberManager& getImpl(EventBaseT& evb, const FiberManager::Options& opts) {
61     std::lock_guard<std::mutex> lg(mutex_);
62
63     auto& fmPtrRef = map_[&evb];
64
65     if (!fmPtrRef) {
66       auto loopController = make_unique<EventBaseLoopControllerT<EventBaseT>>();
67       loopController->attachEventBase(evb);
68       evb.runOnDestruction(new EventBaseOnDestructionCallback<EventBaseT>(evb));
69
70       fmPtrRef = make_unique<FiberManager>(std::move(loopController), opts);
71     }
72
73     return *fmPtrRef;
74   }
75
76   std::unique_ptr<FiberManager> eraseImpl(EventBaseT& evb) {
77     std::lock_guard<std::mutex> lg(mutex_);
78
79     DCHECK_EQ(map_.count(&evb), 1u);
80
81     auto ret = std::move(map_[&evb]);
82     map_.erase(&evb);
83     return ret;
84   }
85
86   std::mutex mutex_;
87   std::unordered_map<EventBaseT*, std::unique_ptr<FiberManager>> map_;
88 };
89
90 constexpr size_t kEraseListMaxSize = 64;
91
92 template <typename EventBaseT>
93 class ThreadLocalCache {
94  public:
95   static FiberManager& get(EventBaseT& evb, const FiberManager::Options& opts) {
96     return instance()->getImpl(evb, opts);
97   }
98
99   static void erase(EventBaseT& evb) {
100     for (auto& localInstance : instance().accessAllThreads()) {
101       SYNCHRONIZED(info, localInstance.eraseInfo_) {
102         if (info.eraseList.size() >= kEraseListMaxSize) {
103           info.eraseAll = true;
104         } else {
105           info.eraseList.push_back(&evb);
106         }
107         localInstance.eraseRequested_ = true;
108       }
109     }
110   }
111
112  private:
113   ThreadLocalCache() {}
114
115   struct ThreadLocalCacheTag {};
116   using ThreadThreadLocalCache =
117       ThreadLocal<ThreadLocalCache, ThreadLocalCacheTag>;
118
119   // Leak this intentionally. During shutdown, we may call getFiberManager,
120   // and want access to the fiber managers during that time.
121   static ThreadThreadLocalCache& instance() {
122     static auto ret =
123         new ThreadThreadLocalCache([]() { return new ThreadLocalCache(); });
124     return *ret;
125   }
126
127   FiberManager& getImpl(EventBaseT& evb, const FiberManager::Options& opts) {
128     eraseImpl();
129
130     auto& fmPtrRef = map_[&evb];
131     if (!fmPtrRef) {
132       fmPtrRef = &GlobalCache<EventBaseT>::get(evb, opts);
133     }
134
135     DCHECK(fmPtrRef != nullptr);
136
137     return *fmPtrRef;
138   }
139
140   void eraseImpl() {
141     if (!eraseRequested_.load()) {
142       return;
143     }
144
145     SYNCHRONIZED(info, eraseInfo_) {
146       if (info.eraseAll) {
147         map_.clear();
148       } else {
149         for (auto evbPtr : info.eraseList) {
150           map_.erase(evbPtr);
151         }
152       }
153
154       info.eraseList.clear();
155       info.eraseAll = false;
156       eraseRequested_ = false;
157     }
158   }
159
160   std::unordered_map<EventBaseT*, FiberManager*> map_;
161   std::atomic<bool> eraseRequested_{false};
162
163   struct EraseInfo {
164     bool eraseAll{false};
165     std::vector<EventBaseT*> eraseList;
166   };
167
168   folly::Synchronized<EraseInfo> eraseInfo_;
169 };
170
171 template <typename EventBaseT>
172 void EventBaseOnDestructionCallback<EventBaseT>::runLoopCallback() noexcept {
173   auto fm = GlobalCache<EventBaseT>::erase(evb_);
174   DCHECK(fm.get() != nullptr);
175   ThreadLocalCache<EventBaseT>::erase(evb_);
176
177   delete this;
178 }
179
180 } // namespace
181
182 FiberManager& getFiberManager(
183     EventBase& evb,
184     const FiberManager::Options& opts) {
185   return ThreadLocalCache<EventBase>::get(evb, opts);
186 }
187
188 FiberManager& getFiberManager(
189     VirtualEventBase& evb,
190     const FiberManager::Options& opts) {
191   return ThreadLocalCache<VirtualEventBase>::get(evb, opts);
192 }
193 }
194 }