Added fiber-compatible semaphore.
authorLee Howes <lwh@fb.com>
Fri, 26 Aug 2016 22:58:45 +0000 (15:58 -0700)
committerFacebook Github Bot 1 <facebook-github-bot-1-bot@fb.com>
Fri, 26 Aug 2016 23:08:43 +0000 (16:08 -0700)
Summary: Adds a standard semaphore type with signal and wait methods that is safe to use in both multi-threaded contexts and from fibers.

Reviewed By: andriigrynenko

Differential Revision: D3778943

fbshipit-source-id: 6997f1fb870739e07f982399dbebfd8b3e45daa2

folly/Makefile.am
folly/fibers/Semaphore.cpp [new file with mode: 0644]
folly/fibers/Semaphore.h [new file with mode: 0644]
folly/fibers/test/FibersTest.cpp

index 7caea18ba1ff7f6ec958822b94bc2e8247b8d130..67491803b8bb227b118de15ef16308ed6c519bd4 100644 (file)
@@ -521,6 +521,7 @@ nobase_follyinclude_HEADERS += \
        fibers/LoopController.h \
        fibers/Promise.h \
        fibers/Promise-inl.h \
+       fibers/Semaphore.h \
        fibers/SimpleLoopController.h \
        fibers/TimedMutex.h \
        fibers/TimedMutex-inl.h \
@@ -535,6 +536,7 @@ libfolly_la_SOURCES += \
        fibers/FiberManager.cpp \
        fibers/FiberManagerMap.cpp \
        fibers/GuardPageAllocator.cpp \
+       fibers/Semaphore.cpp \
        fibers/TimeoutController.cpp
 endif
 
diff --git a/folly/fibers/Semaphore.cpp b/folly/fibers/Semaphore.cpp
new file mode 100644 (file)
index 0000000..40efae3
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Semaphore.h"
+
+namespace folly {
+namespace fibers {
+
+bool Semaphore::signalSlow() {
+  // If we signalled a release, notify the waitlist
+  SYNCHRONIZED(waitList_) {
+    auto testVal = tokens_.load(std::memory_order_acquire);
+    if (testVal != 0) {
+      return false;
+    }
+
+    if (waitList_.empty()) {
+      // If the waitlist is now empty, ensure the token count increments
+      // No need for CAS here as we will always be under the mutex
+      CHECK(tokens_.compare_exchange_strong(
+          testVal, testVal + 1, std::memory_order_relaxed));
+    } else {
+      // trigger waiter if there is one
+      waitList_.front()->post();
+      waitList_.pop();
+    }
+  } // SYNCHRONIZED(waitList_)
+  return true;
+}
+
+void Semaphore::signal() {
+  auto oldVal = tokens_.load(std::memory_order_acquire);
+  do {
+    if (oldVal == 0) {
+      if (signalSlow()) {
+        break;
+      }
+    }
+  } while (!tokens_.compare_exchange_weak(
+      oldVal,
+      oldVal + 1,
+      std::memory_order_release,
+      std::memory_order_acquire));
+}
+
+bool Semaphore::waitSlow() {
+  // Slow path, create a baton and acquire a mutex to update the wait list
+  folly::fibers::Baton waitBaton;
+
+  SYNCHRONIZED(waitList_) {
+    auto testVal = tokens_.load(std::memory_order_acquire);
+    if (testVal != 0) {
+      return false;
+    }
+    // prepare baton and add to queue
+    waitList_.push(&waitBaton);
+  }
+  // If we managed to create a baton, wait on it
+  // This has to be done here so the mutex has been released
+  waitBaton.wait();
+  return true;
+}
+
+void Semaphore::wait() {
+  auto oldVal = tokens_.load(std::memory_order_acquire);
+  do {
+    if (oldVal == 0) {
+      // If waitSlow fails it is because the token is non-zero by the time
+      // the lock is taken, so we can just continue round the loop
+      if (waitSlow()) {
+        break;
+      }
+    }
+  } while (!tokens_.compare_exchange_weak(
+      oldVal,
+      oldVal - 1,
+      std::memory_order_release,
+      std::memory_order_acquire));
+}
+
+} // namespace fibers
+} // namespace folly
diff --git a/folly/fibers/Semaphore.h b/folly/fibers/Semaphore.h
new file mode 100644 (file)
index 0000000..24e5707
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/Synchronized.h>
+#include <folly/fibers/Baton.h>
+
+namespace folly {
+namespace fibers {
+
+/*
+ * Fiber-compatible semaphore. Will safely block fibers that wait when no
+ * tokens are available and wake fibers when signalled.
+ */
+class Semaphore {
+ public:
+  explicit Semaphore(size_t tokenCount) : tokens_(tokenCount) {}
+
+  Semaphore(const Semaphore&) = delete;
+  Semaphore(Semaphore&&) = delete;
+  Semaphore& operator=(const Semaphore&) = delete;
+  Semaphore& operator=(Semaphore&&) = delete;
+
+  /*
+   * Release a token in the semaphore. Signal the waiter if necessary.
+   */
+  void signal();
+
+  /*
+   * Wait for capacity in the semaphore.
+   */
+  void wait();
+
+ private:
+  bool waitSlow();
+  bool signalSlow();
+
+  // Atomic counter
+  std::atomic<int64_t> tokens_;
+  folly::Synchronized<std::queue<folly::fibers::Baton*>> waitList_;
+};
+
+} // namespace fibers
+} // namespace folly
index dcdd5d790f6cd2ee08680bdde2cf9e6cfd2adcaa..bbb5ddcfdaec149426ec1658dd4370f72faa25c3 100644 (file)
@@ -27,6 +27,7 @@
 #include <folly/fibers/FiberManager.h>
 #include <folly/fibers/FiberManagerMap.h>
 #include <folly/fibers/GenericBaton.h>
+#include <folly/fibers/Semaphore.h>
 #include <folly/fibers/SimpleLoopController.h>
 #include <folly/fibers/WhenN.h>
 
@@ -1539,3 +1540,58 @@ TEST(FiberManager, nestedFiberManagers) {
 
   outerEvb.loopForever();
 }
+
+TEST(FiberManager, semaphore) {
+  constexpr size_t kTasks = 10;
+  constexpr size_t kIterations = 10000;
+  constexpr size_t kNumTokens = 10;
+
+  Semaphore sem(kNumTokens);
+  int counterA = 0;
+  int counterB = 0;
+
+  auto task = [&sem, kTasks, kIterations, kNumTokens](
+      int& counter, folly::fibers::Baton& baton) {
+    FiberManager manager(folly::make_unique<EventBaseLoopController>());
+    folly::EventBase evb;
+    dynamic_cast<EventBaseLoopController&>(manager.loopController())
+        .attachEventBase(evb);
+
+    {
+      std::shared_ptr<folly::EventBase> completionCounter(
+          &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
+
+      for (size_t i = 0; i < kTasks; ++i) {
+        manager.addTask([&, completionCounter]() {
+          for (size_t i = 0; i < kIterations; ++i) {
+            sem.wait();
+            ++counter;
+            sem.signal();
+            --counter;
+
+            EXPECT_LT(counter, kNumTokens);
+            EXPECT_GE(counter, 0);
+          }
+        });
+      }
+
+      baton.wait();
+    }
+    evb.loopForever();
+  };
+
+  folly::fibers::Baton batonA;
+  folly::fibers::Baton batonB;
+  std::thread threadA([&] { task(counterA, batonA); });
+  std::thread threadB([&] { task(counterB, batonB); });
+
+  batonA.post();
+  batonB.post();
+  threadA.join();
+  threadB.join();
+
+  EXPECT_LT(counterA, kNumTokens);
+  EXPECT_LT(counterB, kNumTokens);
+  EXPECT_GE(counterA, 0);
+  EXPECT_GE(counterB, 0);
+}