Added fiber supported mutex to folly
authorMisha Shneerson <mshneer@fb.com>
Fri, 12 Jun 2015 20:55:05 +0000 (13:55 -0700)
committerSara Golemon <sgolemon@fb.com>
Fri, 12 Jun 2015 21:54:34 +0000 (14:54 -0700)
Summary: To be able to use fiber lock in thrift

Reviewed By: @brianwatling, @yfeldblum

Differential Revision: D2149936

folly/Makefile.am
folly/experimental/fibers/TimedMutex-inl.h [new file with mode: 0644]
folly/experimental/fibers/TimedMutex.h [new file with mode: 0644]

index c7519d181dc6120f65715cc9f39bef6eb2d00bd3..d007fc686df887644eed507d0f1861939fe767ee 100644 (file)
@@ -95,6 +95,9 @@ nobase_follyinclude_HEADERS = \
        experimental/fibers/Promise.h \
        experimental/fibers/Promise-inl.h \
        experimental/fibers/SimpleLoopController.h \
+       experimental/fibers/TimedMutex.h \
+       experimental/fibers/TimedMutex-inl.h \
+       experimental/fibers/TimeoutController.h \
        experimental/fibers/TimeoutController.h \
        experimental/fibers/traits.h \
        experimental/fibers/WhenN.h \
diff --git a/folly/experimental/fibers/TimedMutex-inl.h b/folly/experimental/fibers/TimedMutex-inl.h
new file mode 100644 (file)
index 0000000..bdc3afc
--- /dev/null
@@ -0,0 +1,295 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+
+namespace folly { namespace fibers {
+
+//
+// TimedMutex implementation
+//
+
+template <typename BatonType>
+void TimedMutex<BatonType>::lock() {
+  pthread_spin_lock(&lock_);
+  if (!locked_) {
+    locked_ = true;
+    pthread_spin_unlock(&lock_);
+    return;
+  }
+
+  // Delay constructing the waiter until it is actually required.
+  // This makes a huge difference, at least in the benchmarks,
+  // when the mutex isn't locked.
+  MutexWaiter waiter;
+  waiters_.push_back(waiter);
+  pthread_spin_unlock(&lock_);
+  waiter.baton.wait();
+}
+
+template <typename BatonType>
+template <typename Rep, typename Period>
+bool TimedMutex<BatonType>::timed_lock(
+    const std::chrono::duration<Rep, Period>& duration) {
+  pthread_spin_lock(&lock_);
+  if (!locked_) {
+    locked_ = true;
+    pthread_spin_unlock(&lock_);
+    return true;
+  }
+
+  MutexWaiter waiter;
+  waiters_.push_back(waiter);
+  pthread_spin_unlock(&lock_);
+
+  if (!waiter.baton.timed_wait(duration)) {
+    // We timed out. Two cases:
+    // 1. We're still in the waiter list and we truly timed out
+    // 2. We're not in the waiter list anymore. This could happen if the baton
+    //    times out but the mutex is unlocked before we reach this code. In this
+    //    case we'll pretend we got the lock on time.
+    pthread_spin_lock(&lock_);
+    if (waiter.hook.is_linked()) {
+      waiters_.erase(waiters_.iterator_to(waiter));
+      pthread_spin_unlock(&lock_);
+      return false;
+    }
+    pthread_spin_unlock(&lock_);
+  }
+  return true;
+}
+
+template <typename BatonType>
+bool TimedMutex<BatonType>::try_lock() {
+  pthread_spin_lock(&lock_);
+  if (locked_) {
+    pthread_spin_unlock(&lock_);
+    return false;
+  }
+  locked_ = true;
+  pthread_spin_unlock(&lock_);
+  return true;
+}
+
+template <typename BatonType>
+void TimedMutex<BatonType>::unlock() {
+  pthread_spin_lock(&lock_);
+  if (waiters_.empty()) {
+    locked_ = false;
+    pthread_spin_unlock(&lock_);
+    return;
+  }
+  MutexWaiter& to_wake = waiters_.front();
+  waiters_.pop_front();
+  to_wake.baton.post();
+  pthread_spin_unlock(&lock_);
+}
+
+//
+// TimedRWMutex implementation
+//
+
+template <typename BatonType>
+void TimedRWMutex<BatonType>::read_lock() {
+  pthread_spin_lock(&lock_);
+  if (state_ == State::WRITE_LOCKED) {
+    MutexWaiter waiter;
+    read_waiters_.push_back(waiter);
+    pthread_spin_unlock(&lock_);
+    waiter.baton.wait();
+    assert(state_ == State::READ_LOCKED);
+    return;
+  }
+  assert((state_ == State::UNLOCKED && readers_ == 0) ||
+         (state_ == State::READ_LOCKED && readers_ > 0));
+  assert(read_waiters_.empty());
+  state_ = State::READ_LOCKED;
+  readers_ += 1;
+  pthread_spin_unlock(&lock_);
+}
+
+template <typename BatonType>
+template <typename Rep, typename Period>
+bool TimedRWMutex<BatonType>::timed_read_lock(
+    const std::chrono::duration<Rep, Period>& duration) {
+  pthread_spin_lock(&lock_);
+  if (state_ == State::WRITE_LOCKED) {
+    MutexWaiter waiter;
+    read_waiters_.push_back(waiter);
+    pthread_spin_unlock(&lock_);
+
+    if (!waiter.baton.timed_wait(duration)) {
+      // We timed out. Two cases:
+      // 1. We're still in the waiter list and we truly timed out
+      // 2. We're not in the waiter list anymore. This could happen if the baton
+      //    times out but the mutex is unlocked before we reach this code. In
+      //    this case we'll pretend we got the lock on time.
+      pthread_spin_lock(&lock_);
+      if (waiter.hook.is_linked()) {
+        read_waiters_.erase(read_waiters_.iterator_to(waiter));
+        pthread_spin_unlock(&lock_);
+        return false;
+      }
+      pthread_spin_unlock(&lock_);
+    }
+    return true;
+  }
+  assert((state_ == State::UNLOCKED && readers_ == 0) ||
+         (state_ == State::READ_LOCKED && readers_ > 0));
+  assert(read_waiters_.empty());
+  state_ = State::READ_LOCKED;
+  readers_ += 1;
+  pthread_spin_unlock(&lock_);
+  return true;
+}
+
+template <typename BatonType>
+bool TimedRWMutex<BatonType>::try_read_lock() {
+  pthread_spin_lock(&lock_);
+  if (state_ != State::WRITE_LOCKED) {
+    assert((state_ == State::UNLOCKED && readers_ == 0) ||
+           (state_ == State::READ_LOCKED && readers_ > 0));
+    assert(read_waiters_.empty());
+    state_ = State::READ_LOCKED;
+    readers_ += 1;
+    pthread_spin_unlock(&lock_);
+    return true;
+  }
+  pthread_spin_unlock(&lock_);
+  return false;
+}
+
+template <typename BatonType>
+void TimedRWMutex<BatonType>::write_lock() {
+  pthread_spin_lock(&lock_);
+  if (state_ == State::UNLOCKED) {
+    verify_unlocked_properties();
+    state_ = State::WRITE_LOCKED;
+    pthread_spin_unlock(&lock_);
+    return;
+  }
+  MutexWaiter waiter;
+  write_waiters_.push_back(waiter);
+  pthread_spin_unlock(&lock_);
+  waiter.baton.wait();
+}
+
+template <typename BatonType>
+template <typename Rep, typename Period>
+bool TimedRWMutex<BatonType>::timed_write_lock(
+    const std::chrono::duration<Rep, Period>& duration) {
+  pthread_spin_lock(&lock_);
+  if (state_ == State::UNLOCKED) {
+    verify_unlocked_properties();
+    state_ = State::WRITE_LOCKED;
+    pthread_spin_unlock(&lock_);
+    return true;
+  }
+  MutexWaiter waiter;
+  write_waiters_.push_back(waiter);
+  pthread_spin_unlock(&lock_);
+
+  if (!waiter.baton.timed_wait(duration)) {
+    // We timed out. Two cases:
+    // 1. We're still in the waiter list and we truly timed out
+    // 2. We're not in the waiter list anymore. This could happen if the baton
+    //    times out but the mutex is unlocked before we reach this code. In
+    //    this case we'll pretend we got the lock on time.
+    pthread_spin_lock(&lock_);
+    if (waiter.hook.is_linked()) {
+      write_waiters_.erase(write_waiters_.iterator_to(waiter));
+      pthread_spin_unlock(&lock_);
+      return false;
+    }
+    pthread_spin_unlock(&lock_);
+  }
+  assert(state_ == State::WRITE_LOCKED);
+  return true;
+}
+
+template <typename BatonType>
+bool TimedRWMutex<BatonType>::try_write_lock() {
+  pthread_spin_lock(&lock_);
+  if (state_ == State::UNLOCKED) {
+    verify_unlocked_properties();
+    state_ = State::WRITE_LOCKED;
+    pthread_spin_unlock(&lock_);
+    return true;
+  }
+  pthread_spin_unlock(&lock_);
+  return false;
+}
+
+template <typename BatonType>
+void TimedRWMutex<BatonType>::unlock() {
+  pthread_spin_lock(&lock_);
+  assert(state_ != State::UNLOCKED);
+  assert((state_ == State::READ_LOCKED && readers_ > 0) ||
+         (state_ == State::WRITE_LOCKED && readers_ == 0));
+  if (state_ == State::READ_LOCKED) {
+    readers_ -= 1;
+  }
+
+  if (!read_waiters_.empty()) {
+    assert(state_ == State::WRITE_LOCKED && readers_ == 0 &&
+           "read waiters can only accumulate while write locked");
+    state_ = State::READ_LOCKED;
+    readers_ = read_waiters_.size();
+
+    while (!read_waiters_.empty()) {
+      MutexWaiter& to_wake = read_waiters_.front();
+      read_waiters_.pop_front();
+      to_wake.baton.post();
+    }
+  } else if (readers_ == 0) {
+    if (!write_waiters_.empty()) {
+      assert(read_waiters_.empty());
+      state_ = State::WRITE_LOCKED;
+
+      // Wake a single writer (after releasing the spin lock)
+      MutexWaiter& to_wake = write_waiters_.front();
+      write_waiters_.pop_front();
+      to_wake.baton.post();
+    } else {
+      verify_unlocked_properties();
+      state_ = State::UNLOCKED;
+    }
+  } else {
+    assert(state_ == State::READ_LOCKED);
+  }
+  pthread_spin_unlock(&lock_);
+}
+
+template <typename BatonType>
+void TimedRWMutex<BatonType>::downgrade() {
+  pthread_spin_lock(&lock_);
+  assert(state_ == State::WRITE_LOCKED && readers_ == 0);
+  state_ = State::READ_LOCKED;
+  readers_ += 1;
+
+  if (!read_waiters_.empty()) {
+    readers_ += read_waiters_.size();
+
+    while (!read_waiters_.empty()) {
+      MutexWaiter& to_wake = read_waiters_.front();
+      read_waiters_.pop_front();
+      to_wake.baton.post();
+    }
+  }
+  pthread_spin_unlock(&lock_);
+}
+
+}}
diff --git a/folly/experimental/fibers/TimedMutex.h b/folly/experimental/fibers/TimedMutex.h
new file mode 100644 (file)
index 0000000..04f6c6d
--- /dev/null
@@ -0,0 +1,240 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <pthread.h>
+
+#include <folly/experimental/fibers/GenericBaton.h>
+
+namespace folly { namespace fibers {
+
+/**
+ * @class TimedMutex
+ *
+ * Like mutex but allows timed_lock in addition to lock and try_lock.
+ **/
+template <typename BatonType>
+class TimedMutex {
+ public:
+  TimedMutex() {
+    pthread_spin_init(&lock_, PTHREAD_PROCESS_PRIVATE);
+  }
+
+  ~TimedMutex() {
+    pthread_spin_destroy(&lock_);
+  }
+
+  TimedMutex(const TimedMutex& rhs) = delete;
+  TimedMutex& operator=(const TimedMutex& rhs) = delete;
+  TimedMutex(TimedMutex&& rhs) = delete;
+  TimedMutex& operator=(TimedMutex&& rhs) = delete;
+
+  // Lock the mutex. The thread / fiber is blocked until the mutex is free
+  void lock();
+
+  // Lock the mutex. The thread / fiber will be blocked for a time duration.
+  //
+  // @return        true if the mutex was locked, false otherwise
+  template <typename Rep, typename Period>
+  bool timed_lock(
+      const std::chrono::duration<Rep, Period>& duration);
+
+  // Try to obtain lock without blocking the thread or fiber
+  bool try_lock();
+
+  // Unlock the mutex and wake up a waiter if there is one
+  void unlock();
+
+ private:
+  typedef boost::intrusive::list_member_hook<> MutexWaiterHookType;
+
+  // represents a waiter waiting for the lock. The waiter waits on the
+  // baton until it is woken up by a post or timeout expires.
+  struct MutexWaiter {
+    BatonType baton;
+    MutexWaiterHookType hook;
+  };
+
+  typedef boost::intrusive::member_hook<MutexWaiter,
+                                        MutexWaiterHookType,
+                                        &MutexWaiter::hook> MutexWaiterHook;
+
+  typedef boost::intrusive::list<MutexWaiter,
+                                 MutexWaiterHook,
+                                 boost::intrusive::constant_time_size<true>>
+  MutexWaiterList;
+
+  pthread_spinlock_t lock_;         //< lock to protect waiter list
+  bool locked_ = false;             //< is this locked by some thread?
+  MutexWaiterList waiters_;         //< list of waiters
+};
+
+/**
+ * @class TimedRWMutex
+ *
+ * A readers-writer lock which allows multiple readers to hold the
+ * lock simultaneously or only one writer.
+ *
+ * NOTE: This is a reader-preferred RWLock i.e. readers are give priority
+ * when there are both readers and writers waiting to get the lock.
+ **/
+template <typename BatonType>
+class TimedRWMutex {
+ public:
+  TimedRWMutex() {
+    pthread_spin_init(&lock_, PTHREAD_PROCESS_PRIVATE);
+  }
+
+  ~TimedRWMutex() {
+    pthread_spin_destroy(&lock_);
+  }
+
+  TimedRWMutex(const TimedRWMutex& rhs) = delete;
+  TimedRWMutex& operator=(const TimedRWMutex& rhs) = delete;
+  TimedRWMutex(TimedRWMutex&& rhs) = delete;
+  TimedRWMutex& operator=(TimedRWMutex&& rhs) = delete;
+
+  // Lock for shared access. The thread / fiber is blocked until the lock
+  // can be acquired.
+  void read_lock();
+
+  // Like read_lock except the thread /fiber is blocked for a time duration
+  // @return        true if locked successfully, false otherwise.
+  template <typename Rep, typename Period>
+  bool timed_read_lock(const std::chrono::duration<Rep, Period>& duration);
+
+  // Like read_lock but doesn't block the thread / fiber if the lock can't
+  // be acquired.
+  // @return        true if lock was acquired, false otherwise.
+  bool try_read_lock();
+
+  // Obtain an exclusive lock. The thread / fiber is blocked until the lock
+  // is available.
+  void write_lock();
+
+  // Like write_lock except the thread / fiber is blocked for a time duration
+  // @return        true if locked successfully, false otherwise.
+  template <typename Rep, typename Period>
+  bool timed_write_lock(const std::chrono::duration<Rep, Period>& duration);
+
+  // Like write_lock but doesn't block the thread / fiber if the lock cant be
+  // obtained.
+  // @return        true if lock was acquired, false otherwise.
+  bool try_write_lock();
+
+  // Wrapper for write_lock() for compatibility with Mutex
+  void lock() { write_lock(); }
+
+  // Realease the lock. The thread / fiber will wake up all readers if there are
+  // any. If there are waiting writers then only one of them will be woken up.
+  // NOTE: readers are given priority over writers (see above comment)
+  void unlock();
+
+  // Downgrade the lock. The thread / fiber will wake up all readers if there
+  // are any.
+  void downgrade();
+
+  class ReadHolder {
+   public:
+    explicit ReadHolder(TimedRWMutex& lock)
+        : lock_(&lock) {
+      lock_->read_lock();
+    }
+
+    ~ReadHolder() {
+      if (lock_) {
+        lock_->unlock();
+      }
+    }
+
+    ReadHolder(const ReadHolder& rhs) = delete;
+    ReadHolder& operator=(const ReadHolder& rhs) = delete;
+    ReadHolder(ReadHolder&& rhs) = delete;
+    ReadHolder& operator=(ReadHolder&& rhs) = delete;
+
+   private:
+    TimedRWMutex* lock_;
+  };
+
+  class WriteHolder {
+   public:
+    explicit WriteHolder(TimedRWMutex& lock) : lock_(&lock) {
+      lock_->write_lock();
+    }
+
+    ~WriteHolder() {
+      if (lock_) {
+        lock_->unlock();
+      }
+    }
+
+    WriteHolder(const WriteHolder& rhs) = delete;
+    WriteHolder& operator=(const WriteHolder& rhs) = delete;
+    WriteHolder(WriteHolder&& rhs) = delete;
+    WriteHolder& operator=(WriteHolder&& rhs) = delete;
+
+   private:
+    TimedRWMutex* lock_;
+  };
+
+ private:
+  // invariants that must hold when the lock is not held by anyone
+  void verify_unlocked_properties() {
+    assert(readers_ == 0);
+    assert(read_waiters_.empty());
+    assert(write_waiters_.empty());
+  }
+
+  // Different states the lock can be in
+  enum class State {
+    UNLOCKED,
+    READ_LOCKED,
+    WRITE_LOCKED,
+  };
+
+  typedef boost::intrusive::list_member_hook<> MutexWaiterHookType;
+
+  // represents a waiter waiting for the lock.
+  struct MutexWaiter {
+    BatonType baton;
+    MutexWaiterHookType hook;
+  };
+
+  typedef boost::intrusive::member_hook<MutexWaiter,
+                                        MutexWaiterHookType,
+                                        &MutexWaiter::hook> MutexWaiterHook;
+
+  typedef boost::intrusive::list<MutexWaiter,
+                                 MutexWaiterHook,
+                                 boost::intrusive::constant_time_size<true>>
+  MutexWaiterList;
+
+  pthread_spinlock_t  lock_;            //< lock protecting the internal state
+                                        // (state_, read_waiters_, etc.)
+  State state_ = State::UNLOCKED;
+
+  uint32_t readers_ = 0;                //< Number of readers who have the lock
+
+  MutexWaiterList write_waiters_;       //< List of thread / fibers waiting for
+                                        //  exclusive access
+
+  MutexWaiterList read_waiters_;        //< List of thread / fibers waiting for
+                                        //  shared access
+};
+
+}}
+
+#include "TimedMutex-inl.h"