From 143b92ee0d22b6dd70e07f4115687ff6a04e0f50 Mon Sep 17 00:00:00 2001 From: Misha Shneerson Date: Fri, 12 Jun 2015 13:55:05 -0700 Subject: [PATCH] Added fiber supported mutex to folly Summary: To be able to use fiber lock in thrift Reviewed By: @brianwatling, @yfeldblum Differential Revision: D2149936 --- folly/Makefile.am | 3 + folly/experimental/fibers/TimedMutex-inl.h | 295 +++++++++++++++++++++ folly/experimental/fibers/TimedMutex.h | 240 +++++++++++++++++ 3 files changed, 538 insertions(+) create mode 100644 folly/experimental/fibers/TimedMutex-inl.h create mode 100644 folly/experimental/fibers/TimedMutex.h diff --git a/folly/Makefile.am b/folly/Makefile.am index c7519d18..d007fc68 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -95,6 +95,9 @@ nobase_follyinclude_HEADERS = \ experimental/fibers/Promise.h \ experimental/fibers/Promise-inl.h \ experimental/fibers/SimpleLoopController.h \ + experimental/fibers/TimedMutex.h \ + experimental/fibers/TimedMutex-inl.h \ + experimental/fibers/TimeoutController.h \ experimental/fibers/TimeoutController.h \ experimental/fibers/traits.h \ experimental/fibers/WhenN.h \ diff --git a/folly/experimental/fibers/TimedMutex-inl.h b/folly/experimental/fibers/TimedMutex-inl.h new file mode 100644 index 00000000..bdc3afcc --- /dev/null +++ b/folly/experimental/fibers/TimedMutex-inl.h @@ -0,0 +1,295 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + + +namespace folly { namespace fibers { + +// +// TimedMutex implementation +// + +template +void TimedMutex::lock() { + pthread_spin_lock(&lock_); + if (!locked_) { + locked_ = true; + pthread_spin_unlock(&lock_); + return; + } + + // Delay constructing the waiter until it is actually required. + // This makes a huge difference, at least in the benchmarks, + // when the mutex isn't locked. + MutexWaiter waiter; + waiters_.push_back(waiter); + pthread_spin_unlock(&lock_); + waiter.baton.wait(); +} + +template +template +bool TimedMutex::timed_lock( + const std::chrono::duration& duration) { + pthread_spin_lock(&lock_); + if (!locked_) { + locked_ = true; + pthread_spin_unlock(&lock_); + return true; + } + + MutexWaiter waiter; + waiters_.push_back(waiter); + pthread_spin_unlock(&lock_); + + if (!waiter.baton.timed_wait(duration)) { + // We timed out. Two cases: + // 1. We're still in the waiter list and we truly timed out + // 2. We're not in the waiter list anymore. This could happen if the baton + // times out but the mutex is unlocked before we reach this code. In this + // case we'll pretend we got the lock on time. + pthread_spin_lock(&lock_); + if (waiter.hook.is_linked()) { + waiters_.erase(waiters_.iterator_to(waiter)); + pthread_spin_unlock(&lock_); + return false; + } + pthread_spin_unlock(&lock_); + } + return true; +} + +template +bool TimedMutex::try_lock() { + pthread_spin_lock(&lock_); + if (locked_) { + pthread_spin_unlock(&lock_); + return false; + } + locked_ = true; + pthread_spin_unlock(&lock_); + return true; +} + +template +void TimedMutex::unlock() { + pthread_spin_lock(&lock_); + if (waiters_.empty()) { + locked_ = false; + pthread_spin_unlock(&lock_); + return; + } + MutexWaiter& to_wake = waiters_.front(); + waiters_.pop_front(); + to_wake.baton.post(); + pthread_spin_unlock(&lock_); +} + +// +// TimedRWMutex implementation +// + +template +void TimedRWMutex::read_lock() { + pthread_spin_lock(&lock_); + if (state_ == State::WRITE_LOCKED) { + MutexWaiter waiter; + read_waiters_.push_back(waiter); + pthread_spin_unlock(&lock_); + waiter.baton.wait(); + assert(state_ == State::READ_LOCKED); + return; + } + assert((state_ == State::UNLOCKED && readers_ == 0) || + (state_ == State::READ_LOCKED && readers_ > 0)); + assert(read_waiters_.empty()); + state_ = State::READ_LOCKED; + readers_ += 1; + pthread_spin_unlock(&lock_); +} + +template +template +bool TimedRWMutex::timed_read_lock( + const std::chrono::duration& duration) { + pthread_spin_lock(&lock_); + if (state_ == State::WRITE_LOCKED) { + MutexWaiter waiter; + read_waiters_.push_back(waiter); + pthread_spin_unlock(&lock_); + + if (!waiter.baton.timed_wait(duration)) { + // We timed out. Two cases: + // 1. We're still in the waiter list and we truly timed out + // 2. We're not in the waiter list anymore. This could happen if the baton + // times out but the mutex is unlocked before we reach this code. In + // this case we'll pretend we got the lock on time. + pthread_spin_lock(&lock_); + if (waiter.hook.is_linked()) { + read_waiters_.erase(read_waiters_.iterator_to(waiter)); + pthread_spin_unlock(&lock_); + return false; + } + pthread_spin_unlock(&lock_); + } + return true; + } + assert((state_ == State::UNLOCKED && readers_ == 0) || + (state_ == State::READ_LOCKED && readers_ > 0)); + assert(read_waiters_.empty()); + state_ = State::READ_LOCKED; + readers_ += 1; + pthread_spin_unlock(&lock_); + return true; +} + +template +bool TimedRWMutex::try_read_lock() { + pthread_spin_lock(&lock_); + if (state_ != State::WRITE_LOCKED) { + assert((state_ == State::UNLOCKED && readers_ == 0) || + (state_ == State::READ_LOCKED && readers_ > 0)); + assert(read_waiters_.empty()); + state_ = State::READ_LOCKED; + readers_ += 1; + pthread_spin_unlock(&lock_); + return true; + } + pthread_spin_unlock(&lock_); + return false; +} + +template +void TimedRWMutex::write_lock() { + pthread_spin_lock(&lock_); + if (state_ == State::UNLOCKED) { + verify_unlocked_properties(); + state_ = State::WRITE_LOCKED; + pthread_spin_unlock(&lock_); + return; + } + MutexWaiter waiter; + write_waiters_.push_back(waiter); + pthread_spin_unlock(&lock_); + waiter.baton.wait(); +} + +template +template +bool TimedRWMutex::timed_write_lock( + const std::chrono::duration& duration) { + pthread_spin_lock(&lock_); + if (state_ == State::UNLOCKED) { + verify_unlocked_properties(); + state_ = State::WRITE_LOCKED; + pthread_spin_unlock(&lock_); + return true; + } + MutexWaiter waiter; + write_waiters_.push_back(waiter); + pthread_spin_unlock(&lock_); + + if (!waiter.baton.timed_wait(duration)) { + // We timed out. Two cases: + // 1. We're still in the waiter list and we truly timed out + // 2. We're not in the waiter list anymore. This could happen if the baton + // times out but the mutex is unlocked before we reach this code. In + // this case we'll pretend we got the lock on time. + pthread_spin_lock(&lock_); + if (waiter.hook.is_linked()) { + write_waiters_.erase(write_waiters_.iterator_to(waiter)); + pthread_spin_unlock(&lock_); + return false; + } + pthread_spin_unlock(&lock_); + } + assert(state_ == State::WRITE_LOCKED); + return true; +} + +template +bool TimedRWMutex::try_write_lock() { + pthread_spin_lock(&lock_); + if (state_ == State::UNLOCKED) { + verify_unlocked_properties(); + state_ = State::WRITE_LOCKED; + pthread_spin_unlock(&lock_); + return true; + } + pthread_spin_unlock(&lock_); + return false; +} + +template +void TimedRWMutex::unlock() { + pthread_spin_lock(&lock_); + assert(state_ != State::UNLOCKED); + assert((state_ == State::READ_LOCKED && readers_ > 0) || + (state_ == State::WRITE_LOCKED && readers_ == 0)); + if (state_ == State::READ_LOCKED) { + readers_ -= 1; + } + + if (!read_waiters_.empty()) { + assert(state_ == State::WRITE_LOCKED && readers_ == 0 && + "read waiters can only accumulate while write locked"); + state_ = State::READ_LOCKED; + readers_ = read_waiters_.size(); + + while (!read_waiters_.empty()) { + MutexWaiter& to_wake = read_waiters_.front(); + read_waiters_.pop_front(); + to_wake.baton.post(); + } + } else if (readers_ == 0) { + if (!write_waiters_.empty()) { + assert(read_waiters_.empty()); + state_ = State::WRITE_LOCKED; + + // Wake a single writer (after releasing the spin lock) + MutexWaiter& to_wake = write_waiters_.front(); + write_waiters_.pop_front(); + to_wake.baton.post(); + } else { + verify_unlocked_properties(); + state_ = State::UNLOCKED; + } + } else { + assert(state_ == State::READ_LOCKED); + } + pthread_spin_unlock(&lock_); +} + +template +void TimedRWMutex::downgrade() { + pthread_spin_lock(&lock_); + assert(state_ == State::WRITE_LOCKED && readers_ == 0); + state_ = State::READ_LOCKED; + readers_ += 1; + + if (!read_waiters_.empty()) { + readers_ += read_waiters_.size(); + + while (!read_waiters_.empty()) { + MutexWaiter& to_wake = read_waiters_.front(); + read_waiters_.pop_front(); + to_wake.baton.post(); + } + } + pthread_spin_unlock(&lock_); +} + +}} diff --git a/folly/experimental/fibers/TimedMutex.h b/folly/experimental/fibers/TimedMutex.h new file mode 100644 index 00000000..04f6c6db --- /dev/null +++ b/folly/experimental/fibers/TimedMutex.h @@ -0,0 +1,240 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include + +namespace folly { namespace fibers { + +/** + * @class TimedMutex + * + * Like mutex but allows timed_lock in addition to lock and try_lock. + **/ +template +class TimedMutex { + public: + TimedMutex() { + pthread_spin_init(&lock_, PTHREAD_PROCESS_PRIVATE); + } + + ~TimedMutex() { + pthread_spin_destroy(&lock_); + } + + TimedMutex(const TimedMutex& rhs) = delete; + TimedMutex& operator=(const TimedMutex& rhs) = delete; + TimedMutex(TimedMutex&& rhs) = delete; + TimedMutex& operator=(TimedMutex&& rhs) = delete; + + // Lock the mutex. The thread / fiber is blocked until the mutex is free + void lock(); + + // Lock the mutex. The thread / fiber will be blocked for a time duration. + // + // @return true if the mutex was locked, false otherwise + template + bool timed_lock( + const std::chrono::duration& duration); + + // Try to obtain lock without blocking the thread or fiber + bool try_lock(); + + // Unlock the mutex and wake up a waiter if there is one + void unlock(); + + private: + typedef boost::intrusive::list_member_hook<> MutexWaiterHookType; + + // represents a waiter waiting for the lock. The waiter waits on the + // baton until it is woken up by a post or timeout expires. + struct MutexWaiter { + BatonType baton; + MutexWaiterHookType hook; + }; + + typedef boost::intrusive::member_hook MutexWaiterHook; + + typedef boost::intrusive::list> + MutexWaiterList; + + pthread_spinlock_t lock_; //< lock to protect waiter list + bool locked_ = false; //< is this locked by some thread? + MutexWaiterList waiters_; //< list of waiters +}; + +/** + * @class TimedRWMutex + * + * A readers-writer lock which allows multiple readers to hold the + * lock simultaneously or only one writer. + * + * NOTE: This is a reader-preferred RWLock i.e. readers are give priority + * when there are both readers and writers waiting to get the lock. + **/ +template +class TimedRWMutex { + public: + TimedRWMutex() { + pthread_spin_init(&lock_, PTHREAD_PROCESS_PRIVATE); + } + + ~TimedRWMutex() { + pthread_spin_destroy(&lock_); + } + + TimedRWMutex(const TimedRWMutex& rhs) = delete; + TimedRWMutex& operator=(const TimedRWMutex& rhs) = delete; + TimedRWMutex(TimedRWMutex&& rhs) = delete; + TimedRWMutex& operator=(TimedRWMutex&& rhs) = delete; + + // Lock for shared access. The thread / fiber is blocked until the lock + // can be acquired. + void read_lock(); + + // Like read_lock except the thread /fiber is blocked for a time duration + // @return true if locked successfully, false otherwise. + template + bool timed_read_lock(const std::chrono::duration& duration); + + // Like read_lock but doesn't block the thread / fiber if the lock can't + // be acquired. + // @return true if lock was acquired, false otherwise. + bool try_read_lock(); + + // Obtain an exclusive lock. The thread / fiber is blocked until the lock + // is available. + void write_lock(); + + // Like write_lock except the thread / fiber is blocked for a time duration + // @return true if locked successfully, false otherwise. + template + bool timed_write_lock(const std::chrono::duration& duration); + + // Like write_lock but doesn't block the thread / fiber if the lock cant be + // obtained. + // @return true if lock was acquired, false otherwise. + bool try_write_lock(); + + // Wrapper for write_lock() for compatibility with Mutex + void lock() { write_lock(); } + + // Realease the lock. The thread / fiber will wake up all readers if there are + // any. If there are waiting writers then only one of them will be woken up. + // NOTE: readers are given priority over writers (see above comment) + void unlock(); + + // Downgrade the lock. The thread / fiber will wake up all readers if there + // are any. + void downgrade(); + + class ReadHolder { + public: + explicit ReadHolder(TimedRWMutex& lock) + : lock_(&lock) { + lock_->read_lock(); + } + + ~ReadHolder() { + if (lock_) { + lock_->unlock(); + } + } + + ReadHolder(const ReadHolder& rhs) = delete; + ReadHolder& operator=(const ReadHolder& rhs) = delete; + ReadHolder(ReadHolder&& rhs) = delete; + ReadHolder& operator=(ReadHolder&& rhs) = delete; + + private: + TimedRWMutex* lock_; + }; + + class WriteHolder { + public: + explicit WriteHolder(TimedRWMutex& lock) : lock_(&lock) { + lock_->write_lock(); + } + + ~WriteHolder() { + if (lock_) { + lock_->unlock(); + } + } + + WriteHolder(const WriteHolder& rhs) = delete; + WriteHolder& operator=(const WriteHolder& rhs) = delete; + WriteHolder(WriteHolder&& rhs) = delete; + WriteHolder& operator=(WriteHolder&& rhs) = delete; + + private: + TimedRWMutex* lock_; + }; + + private: + // invariants that must hold when the lock is not held by anyone + void verify_unlocked_properties() { + assert(readers_ == 0); + assert(read_waiters_.empty()); + assert(write_waiters_.empty()); + } + + // Different states the lock can be in + enum class State { + UNLOCKED, + READ_LOCKED, + WRITE_LOCKED, + }; + + typedef boost::intrusive::list_member_hook<> MutexWaiterHookType; + + // represents a waiter waiting for the lock. + struct MutexWaiter { + BatonType baton; + MutexWaiterHookType hook; + }; + + typedef boost::intrusive::member_hook MutexWaiterHook; + + typedef boost::intrusive::list> + MutexWaiterList; + + pthread_spinlock_t lock_; //< lock protecting the internal state + // (state_, read_waiters_, etc.) + State state_ = State::UNLOCKED; + + uint32_t readers_ = 0; //< Number of readers who have the lock + + MutexWaiterList write_waiters_; //< List of thread / fibers waiting for + // exclusive access + + MutexWaiterList read_waiters_; //< List of thread / fibers waiting for + // shared access +}; + +}} + +#include "TimedMutex-inl.h" -- 2.34.1