From: Andrii Grynenko Date: Fri, 5 Feb 2016 20:01:26 +0000 (-0800) Subject: Revert D2853921 X-Git-Tag: deprecate-dynamic-initializer~93 X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=commitdiff_plain;h=e0f9b5989a0cea3ae1aef97f00a3d5ca205c5e59 Revert D2853921 Summary: This was causing memory leaks. I assume that happens if FiberManager was once requested from a thread (so that thread local was initialized), but the thread was never used since then, accumulating EventBase* in the queue. Reviewed By: simonmar Differential Revision: D2906752 fb-gh-sync-id: 71ab14cb051a9cee3684a30eaf6729ef65888a52 --- diff --git a/folly/AtomicIntrusiveLinkedList.h b/folly/AtomicIntrusiveLinkedList.h deleted file mode 100644 index 87e7d5b5..00000000 --- a/folly/AtomicIntrusiveLinkedList.h +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright 2016 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include - -#include - -namespace folly { - -/** - * A very simple atomic single-linked list primitive. - * - * Usage: - * - * class MyClass { - * AtomicIntrusiveLinkedListHook hook_; - * } - * - * AtomicIntrusiveLinkedList list; - * list.insert(&a); - * list.sweep([] (MyClass* c) { doSomething(c); } - */ -template -struct AtomicIntrusiveLinkedListHook { - T* next{nullptr}; -}; - -template T::*HookMember> -class AtomicIntrusiveLinkedList { - public: - AtomicIntrusiveLinkedList() {} - AtomicIntrusiveLinkedList(const AtomicIntrusiveLinkedList&) = delete; - AtomicIntrusiveLinkedList& operator=(const AtomicIntrusiveLinkedList&) = - delete; - AtomicIntrusiveLinkedList(AtomicIntrusiveLinkedList&& other) noexcept { - *this = std::move(other); - } - AtomicIntrusiveLinkedList& operator=( - AtomicIntrusiveLinkedList&& other) noexcept { - auto tmp = other.head_.load(); - other.head_ = head_.load(); - head_ = tmp; - - return *this; - } - - /** - * Note: list must be empty on destruction. - */ - ~AtomicIntrusiveLinkedList() { DCHECK(empty()); } - - bool empty() const { return head_ == nullptr; } - - /** - * Atomically insert t at the head of the list. - * @return True if the inserted element is the only one in the list - * after the call. - */ - bool insertHead(T* t) { - DCHECK(next(t) == nullptr); - - auto oldHead = head_.load(std::memory_order_relaxed); - do { - next(t) = oldHead; - /* oldHead is updated by the call below. - - NOTE: we don't use next(t) instead of oldHead directly due to - compiler bugs (GCC prior to 4.8.3 (bug 60272), clang (bug 18899), - MSVC (bug 819819); source: - http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange */ - } while (!head_.compare_exchange_weak( - oldHead, t, std::memory_order_release, std::memory_order_relaxed)); - - return oldHead == nullptr; - } - - /** - * Repeatedly replaces the head with nullptr, - * and calls func() on the removed elements in the order from tail to head. - * Stops when the list is empty. - */ - template - void sweep(F&& func) { - while (auto head = head_.exchange(nullptr)) { - auto rhead = reverse(head); - while (rhead != nullptr) { - auto t = rhead; - rhead = next(t); - next(t) = nullptr; - func(t); - } - } - } - - private: - std::atomic head_{nullptr}; - - static T*& next(T* t) { return (t->*HookMember).next; } - - /* Reverses a linked list, returning the pointer to the new head - (old tail) */ - static T* reverse(T* head) { - T* rhead = nullptr; - while (head != nullptr) { - auto t = head; - head = next(t); - next(t) = rhead; - rhead = t; - } - return rhead; - } -}; - -} // namespace folly diff --git a/folly/AtomicLinkedList.h b/folly/AtomicLinkedList.h index ee826a7b..f24746a5 100644 --- a/folly/AtomicLinkedList.h +++ b/folly/AtomicLinkedList.h @@ -13,14 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#pragma once + +#ifndef FOLLY_ATOMIC_LINKED_LIST_H_ +#define FOLLY_ATOMIC_LINKED_LIST_H_ #include #include -#include -#include - namespace folly { /** @@ -28,59 +27,112 @@ namespace folly { * * Usage: * - * AtomicLinkedList list; - * list.insert(a); - * list.sweep([] (MyClass& c) { doSomething(c); } + * class MyClass { + * AtomicLinkedListHook hook_; + * } + * + * AtomicLinkedList list; + * list.insert(&a); + * list.sweep([] (MyClass* c) { doSomething(c); } */ - template +struct AtomicLinkedListHook { + T* next{nullptr}; +}; + +template T::* HookMember> class AtomicLinkedList { public: AtomicLinkedList() {} AtomicLinkedList(const AtomicLinkedList&) = delete; AtomicLinkedList& operator=(const AtomicLinkedList&) = delete; - AtomicLinkedList(AtomicLinkedList&& other) noexcept = default; - AtomicLinkedList& operator=(AtomicLinkedList&& other) = default; + AtomicLinkedList(AtomicLinkedList&& other) noexcept { + auto tmp = other.head_.load(); + other.head_ = head_.load(); + head_ = tmp; + } + AtomicLinkedList& operator=(AtomicLinkedList&& other) noexcept { + auto tmp = other.head_.load(); + other.head_ = head_.load(); + head_ = tmp; + return *this; + } + + /** + * Note: list must be empty on destruction. + */ ~AtomicLinkedList() { - sweep([](T&&) {}); + assert(empty()); } - bool empty() const { return list_.empty(); } + bool empty() const { + return head_ == nullptr; + } /** * Atomically insert t at the head of the list. * @return True if the inserted element is the only one in the list * after the call. */ - bool insertHead(T t) { - auto wrapper = folly::make_unique(std::move(t)); + bool insertHead(T* t) { + assert(next(t) == nullptr); + + auto oldHead = head_.load(std::memory_order_relaxed); + do { + next(t) = oldHead; + /* oldHead is updated by the call below. - return list_.insertHead(wrapper.release()); + NOTE: we don't use next(t) instead of oldHead directly due to + compiler bugs (GCC prior to 4.8.3 (bug 60272), clang (bug 18899), + MSVC (bug 819819); source: + http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange */ + } while (!head_.compare_exchange_weak(oldHead, t, + std::memory_order_release, + std::memory_order_relaxed)); + + return oldHead == nullptr; } /** - * Repeatedly pops element from head, + * Repeatedly replaces the head with nullptr, * and calls func() on the removed elements in the order from tail to head. * Stops when the list is empty. */ template void sweep(F&& func) { - list_.sweep([&](Wrapper* wrapperPtr) mutable { - std::unique_ptr wrapper(wrapperPtr); - - func(std::move(wrapper->data)); - }); + while (auto head = head_.exchange(nullptr)) { + auto rhead = reverse(head); + while (rhead != nullptr) { + auto t = rhead; + rhead = next(t); + next(t) = nullptr; + func(t); + } + } } private: - struct Wrapper { - explicit Wrapper(T&& t) : data(std::move(t)) {} + std::atomic head_{nullptr}; - AtomicIntrusiveLinkedListHook hook; - T data; - }; - AtomicIntrusiveLinkedList list_; + static T*& next(T* t) { + return (t->*HookMember).next; + } + + /* Reverses a linked list, returning the pointer to the new head + (old tail) */ + static T* reverse(T* head) { + T* rhead = nullptr; + while (head != nullptr) { + auto t = head; + head = next(t); + next(t) = rhead; + rhead = t; + } + return rhead; + } }; } // namespace folly + +#endif diff --git a/folly/Makefile.am b/folly/Makefile.am index a88acae8..1cf59337 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -31,7 +31,6 @@ nobase_follyinclude_HEADERS = \ AtomicHashArray-inl.h \ AtomicHashMap.h \ AtomicHashMap-inl.h \ - AtomicIntrusiveLinkedList.h \ AtomicLinkedList.h \ AtomicStruct.h \ AtomicUnorderedMap.h \ diff --git a/folly/experimental/fibers/Fiber.h b/folly/experimental/fibers/Fiber.h index 3c31e957..0732dd71 100644 --- a/folly/experimental/fibers/Fiber.h +++ b/folly/experimental/fibers/Fiber.h @@ -20,7 +20,7 @@ #include #include -#include +#include #include #include #include @@ -126,7 +126,7 @@ class Fiber { /** * Points to next fiber in remote ready list */ - folly::AtomicIntrusiveLinkedListHook nextRemoteReady_; + folly::AtomicLinkedListHook nextRemoteReady_; static constexpr size_t kUserBufferSize = 256; std::aligned_storage::type userBuffer_; diff --git a/folly/experimental/fibers/FiberManager.h b/folly/experimental/fibers/FiberManager.h index bce93a53..34aab306 100644 --- a/folly/experimental/fibers/FiberManager.h +++ b/folly/experimental/fibers/FiberManager.h @@ -24,7 +24,7 @@ #include #include -#include +#include #include #include #include @@ -334,7 +334,7 @@ class FiberManager : public ::folly::Executor { std::function func; std::unique_ptr localData; std::shared_ptr rcontext; - AtomicIntrusiveLinkedListHook nextRemoteTask; + AtomicLinkedListHook nextRemoteTask; }; typedef folly::IntrusiveList FiberTailQueue; @@ -441,10 +441,9 @@ class FiberManager : public ::folly::Executor { ExceptionCallback exceptionCallback_; /**< task exception callback */ - folly::AtomicIntrusiveLinkedList - remoteReadyQueue_; + folly::AtomicLinkedList remoteReadyQueue_; - folly::AtomicIntrusiveLinkedList + folly::AtomicLinkedList remoteTaskQueue_; std::shared_ptr timeoutManager_; diff --git a/folly/experimental/fibers/FiberManagerMap.cpp b/folly/experimental/fibers/FiberManagerMap.cpp index df1cce36..9e63bd51 100644 --- a/folly/experimental/fibers/FiberManagerMap.cpp +++ b/folly/experimental/fibers/FiberManagerMap.cpp @@ -15,140 +15,96 @@ */ #include "FiberManagerMap.h" +#include #include #include -#include #include namespace folly { namespace fibers { namespace { -class OnEventBaseDestructionCallback : public EventBase::LoopCallback { - public: - explicit OnEventBaseDestructionCallback(EventBase& evb) : evb_(evb) {} - void runLoopCallback() noexcept override; - - private: - EventBase& evb_; -}; - -class GlobalCache { - public: - static FiberManager& get(EventBase& evb, const FiberManager::Options& opts) { - return instance().getImpl(evb, opts); - } - - static std::unique_ptr erase(EventBase& evb) { - return instance().eraseImpl(evb); - } - - private: - GlobalCache() {} - - // Leak this intentionally. During shutdown, we may call getFiberManager, - // and want access to the fiber managers during that time. - static GlobalCache& instance() { - static auto ret = new GlobalCache(); - return *ret; - } - - FiberManager& getImpl(EventBase& evb, const FiberManager::Options& opts) { - std::lock_guard lg(mutex_); - - auto& fmPtrRef = map_[&evb]; - - if (!fmPtrRef) { - auto loopController = make_unique(); - loopController->attachEventBase(evb); - evb.runOnDestruction(new OnEventBaseDestructionCallback(evb)); - - fmPtrRef = make_unique(std::move(loopController), opts); - } - - return *fmPtrRef; - } - - std::unique_ptr eraseImpl(EventBase& evb) { - std::lock_guard lg(mutex_); +// Leak these intentionally. During shutdown, we may call getFiberManager, and +// want access to the fiber managers during that time. +class LocalFiberManagerMapTag; +typedef folly::ThreadLocal< + std::unordered_map, + LocalFiberManagerMapTag> + LocalMapType; +LocalMapType* localFiberManagerMap() { + static auto ret = new LocalMapType(); + return ret; +} - DCHECK(map_.find(&evb) != map_.end()); +typedef + std::unordered_map> + MapType; +MapType* fiberManagerMap() { + static auto ret = new MapType(); + return ret; +} - auto ret = std::move(map_[&evb]); - map_.erase(&evb); - return ret; - } +std::mutex* fiberManagerMapMutex() { + static auto ret = new std::mutex(); + return ret; +} - std::mutex mutex_; - std::unordered_map> map_; -}; -class LocalCache { +class OnEventBaseDestructionCallback : public folly::EventBase::LoopCallback { public: - static FiberManager& get(EventBase& evb, const FiberManager::Options& opts) { - return instance()->getImpl(evb, opts); - } - - static void erase(EventBase& evb) { - for (auto& localInstance : instance().accessAllThreads()) { - localInstance.removedEvbs_.insertHead(&evb); + explicit OnEventBaseDestructionCallback(folly::EventBase& evb) + : evb_(&evb) {} + void runLoopCallback() noexcept override { + for (auto& localMap : localFiberManagerMap()->accessAllThreads()) { + localMap.erase(evb_); } - } - - private: - LocalCache() {} - - struct LocalCacheTag {}; - using ThreadLocalCache = ThreadLocal; - - // Leak this intentionally. During shutdown, we may call getFiberManager, - // and want access to the fiber managers during that time. - static ThreadLocalCache& instance() { - static auto ret = new ThreadLocalCache([]() { return new LocalCache(); }); - return *ret; - } - - FiberManager& getImpl(EventBase& evb, const FiberManager::Options& opts) { - eraseImpl(); - - auto& fmPtrRef = map_[&evb]; - if (!fmPtrRef) { - fmPtrRef = &GlobalCache::get(evb, opts); + std::unique_ptr fm; + { + std::lock_guard lg(*fiberManagerMapMutex()); + auto it = fiberManagerMap()->find(evb_); + assert(it != fiberManagerMap()->end()); + fm = std::move(it->second); + fiberManagerMap()->erase(it); } - - DCHECK(fmPtrRef != nullptr); - - return *fmPtrRef; - } - - void eraseImpl() { - if (removedEvbs_.empty()) { - return; - } - - removedEvbs_.sweep([&](EventBase* evb) { map_.erase(evb); }); + assert(fm.get() != nullptr); + fm->loopUntilNoReady(); + delete this; } - - std::unordered_map map_; - AtomicLinkedList removedEvbs_; + private: + folly::EventBase* evb_; }; -void OnEventBaseDestructionCallback::runLoopCallback() noexcept { - auto fm = GlobalCache::erase(evb_); - DCHECK(fm.get() != nullptr); - LocalCache::erase(evb_); +FiberManager* getFiberManagerThreadSafe(folly::EventBase& evb, + const FiberManager::Options& opts) { + std::lock_guard lg(*fiberManagerMapMutex()); - fm->loopUntilNoReady(); + auto it = fiberManagerMap()->find(&evb); + if (LIKELY(it != fiberManagerMap()->end())) { + return it->second.get(); + } - delete this; + auto loopController = folly::make_unique(); + loopController->attachEventBase(evb); + auto fiberManager = + folly::make_unique(std::move(loopController), opts); + auto result = fiberManagerMap()->emplace(&evb, std::move(fiberManager)); + evb.runOnDestruction(new OnEventBaseDestructionCallback(evb)); + return result.first->second.get(); } } // namespace -FiberManager& getFiberManager(EventBase& evb, +FiberManager& getFiberManager(folly::EventBase& evb, const FiberManager::Options& opts) { - return LocalCache::get(evb, opts); + auto it = (*localFiberManagerMap())->find(&evb); + if (LIKELY(it != (*localFiberManagerMap())->end())) { + return *(it->second); + } + + auto fm = getFiberManagerThreadSafe(evb, opts); + (*localFiberManagerMap())->emplace(&evb, fm); + return *fm; } }} diff --git a/folly/test/AtomicLinkedListTest.cpp b/folly/test/AtomicLinkedListTest.cpp deleted file mode 100644 index b9435f75..00000000 --- a/folly/test/AtomicLinkedListTest.cpp +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Copyright 2016 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include - -#include - -class TestIntrusiveObject { - public: - explicit TestIntrusiveObject(size_t id__) : id_(id__) {} - size_t id() { return id_; } - - private: - folly::AtomicIntrusiveLinkedListHook hook_; - size_t id_; - - public: - using List = folly::AtomicIntrusiveLinkedList; -}; - -TEST(AtomicIntrusiveLinkedList, Basic) { - TestIntrusiveObject a(1), b(2), c(3); - - TestIntrusiveObject::List list; - - EXPECT_TRUE(list.empty()); - - { - EXPECT_TRUE(list.insertHead(&a)); - EXPECT_FALSE(list.insertHead(&b)); - - EXPECT_FALSE(list.empty()); - - size_t id = 0; - list.sweep( - [&](TestIntrusiveObject* obj) mutable { EXPECT_EQ(++id, obj->id()); }); - - EXPECT_TRUE(list.empty()); - } - - // Try re-inserting the same item (b) and a new item (c) - { - EXPECT_TRUE(list.insertHead(&b)); - EXPECT_FALSE(list.insertHead(&c)); - - EXPECT_FALSE(list.empty()); - - size_t id = 1; - list.sweep( - [&](TestIntrusiveObject* obj) mutable { EXPECT_EQ(++id, obj->id()); }); - - EXPECT_TRUE(list.empty()); - } - - TestIntrusiveObject::List movedList = std::move(list); -} - -TEST(AtomicIntrusiveLinkedList, Move) { - TestIntrusiveObject a(1), b(2); - - TestIntrusiveObject::List list1; - - EXPECT_TRUE(list1.insertHead(&a)); - EXPECT_FALSE(list1.insertHead(&b)); - - EXPECT_FALSE(list1.empty()); - - TestIntrusiveObject::List list2(std::move(list1)); - - EXPECT_TRUE(list1.empty()); - EXPECT_FALSE(list2.empty()); - - TestIntrusiveObject::List list3; - - EXPECT_TRUE(list3.empty()); - - list3 = std::move(list2); - - EXPECT_TRUE(list2.empty()); - EXPECT_FALSE(list3.empty()); - - size_t id = 0; - list3.sweep( - [&](TestIntrusiveObject* obj) mutable { EXPECT_EQ(++id, obj->id()); }); -} - -TEST(AtomicIntrusiveLinkedList, Stress) { - constexpr size_t kNumThreads = 32; - constexpr size_t kNumElements = 100000; - - std::vector elements; - for (size_t i = 0; i < kNumThreads * kNumElements; ++i) { - elements.emplace_back(i); - } - - TestIntrusiveObject::List list; - - std::vector threads; - for (size_t threadId = 0; threadId < kNumThreads; ++threadId) { - threads.emplace_back( - [threadId, kNumThreads, kNumElements, &list, &elements]() { - for (size_t id = 0; id < kNumElements; ++id) { - list.insertHead(&elements[threadId + kNumThreads * id]); - } - }); - } - - std::vector ids; - TestIntrusiveObject* prev{nullptr}; - - while (ids.size() < kNumThreads * kNumThreads) { - list.sweep([&](TestIntrusiveObject* current) { - ids.push_back(current->id()); - - if (prev && prev->id() % kNumThreads == current->id() % kNumThreads) { - EXPECT_EQ(prev->id() + kNumThreads, current->id()); - } - - prev = current; - }); - } - - std::sort(ids.begin(), ids.end()); - - for (size_t i = 0; i < kNumThreads * kNumElements; ++i) { - EXPECT_EQ(i, ids[i]); - } - - for (auto& thread : threads) { - thread.join(); - } -} - -class TestObject { - public: - TestObject(size_t id__, std::shared_ptr ptr) : id_(id__), ptr_(ptr) {} - - size_t id() { return id_; } - - private: - size_t id_; - std::shared_ptr ptr_; -}; - -TEST(AtomicLinkedList, Basic) { - constexpr size_t kNumElements = 10; - - using List = folly::AtomicLinkedList; - List list; - - std::shared_ptr ptr = std::make_shared(42); - - for (size_t id = 0; id < kNumElements; ++id) { - list.insertHead({id, ptr}); - } - - size_t counter = 0; - - list.sweep([&](TestObject object) { - EXPECT_EQ(counter, object.id()); - - EXPECT_EQ(1 + kNumElements - counter, ptr.use_count()); - - ++counter; - }); - - EXPECT_TRUE(ptr.unique()); -}