move executor task queues and thread factories into subdirectories
authorJames Sedgwick <jsedgwick@fb.com>
Fri, 20 Oct 2017 20:30:03 +0000 (13:30 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Fri, 20 Oct 2017 20:36:07 +0000 (13:36 -0700)
Summary:
as title, see moves

(Note: this ignores all push blocking failures!)

Reviewed By: mzlee

Differential Revision: D6112001

fbshipit-source-id: 1eb10b44ae8ee1f90a10e05c29e48c99d824afa5

22 files changed:
folly/Makefile.am
folly/executors/BlockingQueue.h [deleted file]
folly/executors/CPUThreadPoolExecutor.cpp
folly/executors/LifoSemMPMCQueue.h [deleted file]
folly/executors/NamedThreadFactory.h [deleted file]
folly/executors/PriorityLifoSemMPMCQueue.h [deleted file]
folly/executors/PriorityThreadFactory.h [deleted file]
folly/executors/ThreadFactory.h [deleted file]
folly/executors/ThreadPoolExecutor.h
folly/executors/ThreadedExecutor.cpp
folly/executors/ThreadedExecutor.h
folly/executors/UnboundedBlockingQueue.h [deleted file]
folly/executors/task_queue/BlockingQueue.h [new file with mode: 0644]
folly/executors/task_queue/LifoSemMPMCQueue.h [new file with mode: 0644]
folly/executors/task_queue/PriorityLifoSemMPMCQueue.h [new file with mode: 0644]
folly/executors/task_queue/UnboundedBlockingQueue.h [new file with mode: 0644]
folly/executors/task_queue/test/UnboundedBlockingQueueTest.cpp [new file with mode: 0644]
folly/executors/test/ThreadPoolExecutorTest.cpp
folly/executors/test/UnboundedBlockingQueueTest.cpp [deleted file]
folly/executors/thread_factory/NamedThreadFactory.h [new file with mode: 0644]
folly/executors/thread_factory/PriorityThreadFactory.h [new file with mode: 0644]
folly/executors/thread_factory/ThreadFactory.h [new file with mode: 0644]

index 344dc3aedb0a94c336cc0ed9487b1ade837c71fa..3fc4644bee6f18f4e4a6add6a43582c8093efc48 100644 (file)
@@ -83,7 +83,6 @@ nobase_follyinclude_HEADERS = \
        detail/TurnSequencer.h \
        detail/UncaughtExceptionCounter.h \
        executors/Async.h \
-       executors/BlockingQueue.h \
        executors/CPUThreadPoolExecutor.h \
        executors/Codel.h \
        executors/DrivableExecutor.h \
@@ -93,17 +92,18 @@ nobase_follyinclude_HEADERS = \
        executors/IOExecutor.h \
        executors/IOObjectCache.h \
        executors/IOThreadPoolExecutor.h \
-       executors/LifoSemMPMCQueue.h \
-       executors/NamedThreadFactory.h \
        executors/NotificationQueueExecutor.h \
-       executors/PriorityLifoSemMPMCQueue.h \
-       executors/PriorityThreadFactory.h \
        executors/ScheduledExecutor.h \
        executors/SerialExecutor.h \
-       executors/ThreadFactory.h \
        executors/ThreadPoolExecutor.h \
        executors/ThreadedExecutor.h \
-       executors/UnboundedBlockingQueue.h \
+       executors/task_queue/BlockingQueue.h \
+       executors/task_queue/LifoSemMPMCQueue.h \
+       executors/task_queue/PriorityLifoSemMPMCQueue.h \
+       executors/task_queue/UnboundedBlockingQueue.h \
+       executors/thread_factory/NamedThreadFactory.h \
+       executors/thread_factory/PriorityThreadFactory.h \
+       executors/thread_factory/ThreadFactory.h \
        functional/ApplyTuple.h \
        Demangle.h \
        DiscriminatedPtr.h \
diff --git a/folly/executors/BlockingQueue.h b/folly/executors/BlockingQueue.h
deleted file mode 100644 (file)
index 4928095..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <exception>
-#include <stdexcept>
-
-#include <glog/logging.h>
-
-namespace folly {
-
-// Some queue implementations (for example, LifoSemMPMCQueue or
-// PriorityLifoSemMPMCQueue) support both blocking (BLOCK) and
-// non-blocking (THROW) behaviors.
-enum class QueueBehaviorIfFull { THROW, BLOCK };
-
-class QueueFullException : public std::runtime_error {
-  using std::runtime_error::runtime_error; // Inherit constructors.
-};
-
-template <class T>
-class BlockingQueue {
- public:
-  virtual ~BlockingQueue() = default;
-  virtual void add(T item) = 0;
-  virtual void addWithPriority(T item, int8_t /* priority */) {
-    add(std::move(item));
-  }
-  virtual uint8_t getNumPriorities() {
-    return 1;
-  }
-  virtual T take() = 0;
-  virtual size_t size() = 0;
-};
-
-} // namespace folly
index 37fd4441c5fcb49e28880be7fdf15c4c2960e234..3c50a29985bd41aa219229dc0d1854af731e07b8 100644 (file)
@@ -15,7 +15,7 @@
  */
 
 #include <folly/executors/CPUThreadPoolExecutor.h>
-#include <folly/executors/PriorityLifoSemMPMCQueue.h>
+#include <folly/executors/task_queue/PriorityLifoSemMPMCQueue.h>
 
 namespace folly {
 
diff --git a/folly/executors/LifoSemMPMCQueue.h b/folly/executors/LifoSemMPMCQueue.h
deleted file mode 100644 (file)
index 3a16da2..0000000
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/LifoSem.h>
-#include <folly/MPMCQueue.h>
-#include <folly/executors/BlockingQueue.h>
-
-namespace folly {
-
-template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW>
-class LifoSemMPMCQueue : public BlockingQueue<T> {
- public:
-  // Note: The queue pre-allocates all memory for max_capacity
-  explicit LifoSemMPMCQueue(size_t max_capacity) : queue_(max_capacity) {}
-
-  void add(T item) override {
-    switch (kBehavior) { // static
-      case QueueBehaviorIfFull::THROW:
-        if (!queue_.write(std::move(item))) {
-          throw QueueFullException("LifoSemMPMCQueue full, can't add item");
-        }
-        break;
-      case QueueBehaviorIfFull::BLOCK:
-        queue_.blockingWrite(std::move(item));
-        break;
-    }
-    sem_.post();
-  }
-
-  T take() override {
-    T item;
-    while (!queue_.readIfNotEmpty(item)) {
-      sem_.wait();
-    }
-    return item;
-  }
-
-  size_t capacity() {
-    return queue_.capacity();
-  }
-
-  size_t size() override {
-    return queue_.size();
-  }
-
- private:
-  folly::LifoSem sem_;
-  folly::MPMCQueue<T> queue_;
-};
-
-} // namespace folly
diff --git a/folly/executors/NamedThreadFactory.h b/folly/executors/NamedThreadFactory.h
deleted file mode 100644 (file)
index 856a529..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <atomic>
-#include <string>
-#include <thread>
-
-#include <folly/Conv.h>
-#include <folly/Range.h>
-#include <folly/ThreadName.h>
-#include <folly/executors/ThreadFactory.h>
-
-namespace folly {
-
-class NamedThreadFactory : public ThreadFactory {
- public:
-  explicit NamedThreadFactory(folly::StringPiece prefix)
-      : prefix_(prefix.str()), suffix_(0) {}
-
-  std::thread newThread(Func&& func) override {
-    auto name = folly::to<std::string>(prefix_, suffix_++);
-    return std::thread(
-        [ func = std::move(func), name = std::move(name) ]() mutable {
-          folly::setThreadName(name);
-          func();
-        });
-  }
-
-  void setNamePrefix(folly::StringPiece prefix) {
-    prefix_ = prefix.str();
-  }
-
-  std::string getNamePrefix() {
-    return prefix_;
-  }
-
- private:
-  std::string prefix_;
-  std::atomic<uint64_t> suffix_;
-};
-
-} // namespace folly
diff --git a/folly/executors/PriorityLifoSemMPMCQueue.h b/folly/executors/PriorityLifoSemMPMCQueue.h
deleted file mode 100644 (file)
index 797287c..0000000
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/Executor.h>
-#include <folly/LifoSem.h>
-#include <folly/MPMCQueue.h>
-#include <folly/executors/BlockingQueue.h>
-
-namespace folly {
-
-template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW>
-class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
- public:
-  // Note A: The queue pre-allocates all memory for max_capacity
-  // Note B: To use folly::Executor::*_PRI, for numPriorities == 2
-  //         MID_PRI and HI_PRI are treated at the same priority level.
-  PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t max_capacity) {
-    queues_.reserve(numPriorities);
-    for (int8_t i = 0; i < numPriorities; i++) {
-      queues_.emplace_back(max_capacity);
-    }
-  }
-
-  uint8_t getNumPriorities() override {
-    return queues_.size();
-  }
-
-  // Add at medium priority by default
-  void add(T item) override {
-    addWithPriority(std::move(item), folly::Executor::MID_PRI);
-  }
-
-  void addWithPriority(T item, int8_t priority) override {
-    int mid = getNumPriorities() / 2;
-    size_t queue = priority < 0
-        ? std::max(0, mid + priority)
-        : std::min(getNumPriorities() - 1, mid + priority);
-    CHECK_LT(queue, queues_.size());
-    switch (kBehavior) { // static
-      case QueueBehaviorIfFull::THROW:
-        if (!queues_[queue].write(std::move(item))) {
-          throw QueueFullException("LifoSemMPMCQueue full, can't add item");
-        }
-        break;
-      case QueueBehaviorIfFull::BLOCK:
-        queues_[queue].blockingWrite(std::move(item));
-        break;
-    }
-    sem_.post();
-  }
-
-  T take() override {
-    T item;
-    while (true) {
-      if (nonBlockingTake(item)) {
-        return item;
-      }
-      sem_.wait();
-    }
-  }
-
-  bool nonBlockingTake(T& item) {
-    for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
-      if (it->readIfNotEmpty(item)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  size_t size() override {
-    size_t size = 0;
-    for (auto& q : queues_) {
-      size += q.size();
-    }
-    return size;
-  }
-
-  size_t sizeGuess() const {
-    size_t size = 0;
-    for (auto& q : queues_) {
-      size += q.sizeGuess();
-    }
-    return size;
-  }
-
- private:
-  folly::LifoSem sem_;
-  std::vector<folly::MPMCQueue<T>> queues_;
-};
-
-} // namespace folly
diff --git a/folly/executors/PriorityThreadFactory.h b/folly/executors/PriorityThreadFactory.h
deleted file mode 100644 (file)
index ed46dd3..0000000
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/executors/ThreadFactory.h>
-
-#include <folly/portability/SysResource.h>
-#include <folly/portability/SysTime.h>
-
-namespace folly {
-
-/**
- * A ThreadFactory that sets nice values for each thread.  The main
- * use case for this class is if there are multiple
- * CPUThreadPoolExecutors in a single process, or between multiple
- * processes, where some should have a higher priority than the others.
- *
- * Note that per-thread nice values are not POSIX standard, but both
- * pthreads and linux support per-thread nice.  The default linux
- * scheduler uses these values to do smart thread prioritization.
- * sched_priority function calls only affect real-time schedulers.
- */
-class PriorityThreadFactory : public ThreadFactory {
- public:
-  explicit PriorityThreadFactory(
-      std::shared_ptr<ThreadFactory> factory,
-      int priority)
-      : factory_(std::move(factory)), priority_(priority) {}
-
-  std::thread newThread(Func&& func) override {
-    int priority = priority_;
-    return factory_->newThread([ priority, func = std::move(func) ]() mutable {
-      if (setpriority(PRIO_PROCESS, 0, priority) != 0) {
-        LOG(ERROR) << "setpriority failed (are you root?) with error " << errno,
-            strerror(errno);
-      }
-      func();
-    });
-  }
-
- private:
-  std::shared_ptr<ThreadFactory> factory_;
-  int priority_;
-};
-
-} // folly
diff --git a/folly/executors/ThreadFactory.h b/folly/executors/ThreadFactory.h
deleted file mode 100644 (file)
index 0af8632..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-#include <folly/Executor.h>
-
-#include <thread>
-
-namespace folly {
-
-class ThreadFactory {
- public:
-  virtual ~ThreadFactory() = default;
-  virtual std::thread newThread(Func&& func) = 0;
-};
-
-} // namespace folly
index 3de9ec591c3d0effebb2b5beab0758b46a559ff3..09ff14fd02feb3c57130f7d0e83f750415eba9f2 100644 (file)
@@ -20,8 +20,8 @@
 #include <folly/Memory.h>
 #include <folly/RWSpinLock.h>
 #include <folly/concurrency/GlobalThreadPoolList.h>
-#include <folly/executors/LifoSemMPMCQueue.h>
-#include <folly/executors/NamedThreadFactory.h>
+#include <folly/executors/task_queue/LifoSemMPMCQueue.h>
+#include <folly/executors/thread_factory/NamedThreadFactory.h>
 #include <folly/io/async/Request.h>
 
 #include <algorithm>
index d75e82b6b719838de91b22c28740338cf3da1eab..e19ecf4f8c82441089a3e9073e230b42e987b3de 100644 (file)
@@ -21,7 +21,7 @@
 #include <glog/logging.h>
 
 #include <folly/ThreadName.h>
-#include <folly/executors/NamedThreadFactory.h>
+#include <folly/executors/thread_factory/NamedThreadFactory.h>
 
 namespace folly {
 
index 81c5fca80e80c270ce96cdaeea13381cc5d4219d..9417898778287f17049f83273ff53178a3661beb 100644 (file)
@@ -25,7 +25,7 @@
 #include <thread>
 
 #include <folly/Executor.h>
-#include <folly/executors/ThreadFactory.h>
+#include <folly/executors/thread_factory/ThreadFactory.h>
 
 namespace folly {
 
diff --git a/folly/executors/UnboundedBlockingQueue.h b/folly/executors/UnboundedBlockingQueue.h
deleted file mode 100644 (file)
index 3fb09b3..0000000
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2017 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <folly/LifoSem.h>
-#include <folly/Synchronized.h>
-#include <folly/executors/BlockingQueue.h>
-#include <queue>
-
-namespace folly {
-
-// Warning: this is effectively just a std::deque wrapped in a single mutex
-// We are aiming to add a more performant concurrent unbounded queue in the
-// future, but this class is available if you must have an unbounded queue
-// and can tolerate any contention.
-template <class T>
-class UnboundedBlockingQueue : public BlockingQueue<T> {
- public:
-  virtual ~UnboundedBlockingQueue() {}
-
-  void add(T item) override {
-    queue_.wlock()->push(std::move(item));
-    sem_.post();
-  }
-
-  T take() override {
-    while (true) {
-      {
-        auto ulockedQueue = queue_.ulock();
-        if (!ulockedQueue->empty()) {
-          auto wlockedQueue = ulockedQueue.moveFromUpgradeToWrite();
-          T item = std::move(wlockedQueue->front());
-          wlockedQueue->pop();
-          return item;
-        }
-      }
-      sem_.wait();
-    }
-  }
-
-  size_t size() override {
-    return queue_.rlock()->size();
-  }
-
- private:
-  LifoSem sem_;
-  Synchronized<std::queue<T>> queue_;
-};
-
-} // namespace folly
diff --git a/folly/executors/task_queue/BlockingQueue.h b/folly/executors/task_queue/BlockingQueue.h
new file mode 100644 (file)
index 0000000..4928095
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <exception>
+#include <stdexcept>
+
+#include <glog/logging.h>
+
+namespace folly {
+
+// Some queue implementations (for example, LifoSemMPMCQueue or
+// PriorityLifoSemMPMCQueue) support both blocking (BLOCK) and
+// non-blocking (THROW) behaviors.
+enum class QueueBehaviorIfFull { THROW, BLOCK };
+
+class QueueFullException : public std::runtime_error {
+  using std::runtime_error::runtime_error; // Inherit constructors.
+};
+
+template <class T>
+class BlockingQueue {
+ public:
+  virtual ~BlockingQueue() = default;
+  virtual void add(T item) = 0;
+  virtual void addWithPriority(T item, int8_t /* priority */) {
+    add(std::move(item));
+  }
+  virtual uint8_t getNumPriorities() {
+    return 1;
+  }
+  virtual T take() = 0;
+  virtual size_t size() = 0;
+};
+
+} // namespace folly
diff --git a/folly/executors/task_queue/LifoSemMPMCQueue.h b/folly/executors/task_queue/LifoSemMPMCQueue.h
new file mode 100644 (file)
index 0000000..68a8606
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/LifoSem.h>
+#include <folly/MPMCQueue.h>
+#include <folly/executors/task_queue/BlockingQueue.h>
+
+namespace folly {
+
+template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW>
+class LifoSemMPMCQueue : public BlockingQueue<T> {
+ public:
+  // Note: The queue pre-allocates all memory for max_capacity
+  explicit LifoSemMPMCQueue(size_t max_capacity) : queue_(max_capacity) {}
+
+  void add(T item) override {
+    switch (kBehavior) { // static
+      case QueueBehaviorIfFull::THROW:
+        if (!queue_.write(std::move(item))) {
+          throw QueueFullException("LifoSemMPMCQueue full, can't add item");
+        }
+        break;
+      case QueueBehaviorIfFull::BLOCK:
+        queue_.blockingWrite(std::move(item));
+        break;
+    }
+    sem_.post();
+  }
+
+  T take() override {
+    T item;
+    while (!queue_.readIfNotEmpty(item)) {
+      sem_.wait();
+    }
+    return item;
+  }
+
+  size_t capacity() {
+    return queue_.capacity();
+  }
+
+  size_t size() override {
+    return queue_.size();
+  }
+
+ private:
+  folly::LifoSem sem_;
+  folly::MPMCQueue<T> queue_;
+};
+
+} // namespace folly
diff --git a/folly/executors/task_queue/PriorityLifoSemMPMCQueue.h b/folly/executors/task_queue/PriorityLifoSemMPMCQueue.h
new file mode 100644 (file)
index 0000000..3242628
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/Executor.h>
+#include <folly/LifoSem.h>
+#include <folly/MPMCQueue.h>
+#include <folly/executors/task_queue/BlockingQueue.h>
+
+namespace folly {
+
+template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW>
+class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
+ public:
+  // Note A: The queue pre-allocates all memory for max_capacity
+  // Note B: To use folly::Executor::*_PRI, for numPriorities == 2
+  //         MID_PRI and HI_PRI are treated at the same priority level.
+  PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t max_capacity) {
+    queues_.reserve(numPriorities);
+    for (int8_t i = 0; i < numPriorities; i++) {
+      queues_.emplace_back(max_capacity);
+    }
+  }
+
+  uint8_t getNumPriorities() override {
+    return queues_.size();
+  }
+
+  // Add at medium priority by default
+  void add(T item) override {
+    addWithPriority(std::move(item), folly::Executor::MID_PRI);
+  }
+
+  void addWithPriority(T item, int8_t priority) override {
+    int mid = getNumPriorities() / 2;
+    size_t queue = priority < 0
+        ? std::max(0, mid + priority)
+        : std::min(getNumPriorities() - 1, mid + priority);
+    CHECK_LT(queue, queues_.size());
+    switch (kBehavior) { // static
+      case QueueBehaviorIfFull::THROW:
+        if (!queues_[queue].write(std::move(item))) {
+          throw QueueFullException("LifoSemMPMCQueue full, can't add item");
+        }
+        break;
+      case QueueBehaviorIfFull::BLOCK:
+        queues_[queue].blockingWrite(std::move(item));
+        break;
+    }
+    sem_.post();
+  }
+
+  T take() override {
+    T item;
+    while (true) {
+      if (nonBlockingTake(item)) {
+        return item;
+      }
+      sem_.wait();
+    }
+  }
+
+  bool nonBlockingTake(T& item) {
+    for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
+      if (it->readIfNotEmpty(item)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  size_t size() override {
+    size_t size = 0;
+    for (auto& q : queues_) {
+      size += q.size();
+    }
+    return size;
+  }
+
+  size_t sizeGuess() const {
+    size_t size = 0;
+    for (auto& q : queues_) {
+      size += q.sizeGuess();
+    }
+    return size;
+  }
+
+ private:
+  folly::LifoSem sem_;
+  std::vector<folly::MPMCQueue<T>> queues_;
+};
+
+} // namespace folly
diff --git a/folly/executors/task_queue/UnboundedBlockingQueue.h b/folly/executors/task_queue/UnboundedBlockingQueue.h
new file mode 100644 (file)
index 0000000..2f29fbf
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/LifoSem.h>
+#include <folly/Synchronized.h>
+#include <folly/executors/task_queue/BlockingQueue.h>
+#include <queue>
+
+namespace folly {
+
+// Warning: this is effectively just a std::deque wrapped in a single mutex
+// We are aiming to add a more performant concurrent unbounded queue in the
+// future, but this class is available if you must have an unbounded queue
+// and can tolerate any contention.
+template <class T>
+class UnboundedBlockingQueue : public BlockingQueue<T> {
+ public:
+  virtual ~UnboundedBlockingQueue() {}
+
+  void add(T item) override {
+    queue_.wlock()->push(std::move(item));
+    sem_.post();
+  }
+
+  T take() override {
+    while (true) {
+      {
+        auto ulockedQueue = queue_.ulock();
+        if (!ulockedQueue->empty()) {
+          auto wlockedQueue = ulockedQueue.moveFromUpgradeToWrite();
+          T item = std::move(wlockedQueue->front());
+          wlockedQueue->pop();
+          return item;
+        }
+      }
+      sem_.wait();
+    }
+  }
+
+  size_t size() override {
+    return queue_.rlock()->size();
+  }
+
+ private:
+  LifoSem sem_;
+  Synchronized<std::queue<T>> queue_;
+};
+
+} // namespace folly
diff --git a/folly/executors/task_queue/test/UnboundedBlockingQueueTest.cpp b/folly/executors/task_queue/test/UnboundedBlockingQueueTest.cpp
new file mode 100644 (file)
index 0000000..06de12a
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/executors/task_queue/UnboundedBlockingQueue.h>
+#include <folly/Baton.h>
+#include <gtest/gtest.h>
+#include <thread>
+
+using namespace folly;
+
+TEST(UnboundedQueuee, push_pop) {
+  UnboundedBlockingQueue<int> q;
+  q.add(42);
+  EXPECT_EQ(42, q.take());
+}
+TEST(UnboundedBlockingQueue, size) {
+  UnboundedBlockingQueue<int> q;
+  EXPECT_EQ(0, q.size());
+  q.add(42);
+  EXPECT_EQ(1, q.size());
+  q.take();
+  EXPECT_EQ(0, q.size());
+}
+
+TEST(UnboundedBlockingQueue, concurrent_push_pop) {
+  UnboundedBlockingQueue<int> q;
+  Baton<> b1, b2;
+  std::thread t([&] {
+    b1.post();
+    EXPECT_EQ(42, q.take());
+    EXPECT_EQ(0, q.size());
+    b2.post();
+  });
+  b1.wait();
+  q.add(42);
+  b2.wait();
+  EXPECT_EQ(0, q.size());
+  t.join();
+}
index 40b4af15c4f760115813fbc7d24d5fcea9e0627c..fd96b9f19952e3e7e0d068b1917d7ef41e04a045 100644 (file)
@@ -19,9 +19,9 @@
 #include <folly/executors/CPUThreadPoolExecutor.h>
 #include <folly/executors/FutureExecutor.h>
 #include <folly/executors/IOThreadPoolExecutor.h>
-#include <folly/executors/LifoSemMPMCQueue.h>
-#include <folly/executors/PriorityThreadFactory.h>
 #include <folly/executors/ThreadPoolExecutor.h>
+#include <folly/executors/task_queue/LifoSemMPMCQueue.h>
+#include <folly/executors/thread_factory/PriorityThreadFactory.h>
 #include <gtest/gtest.h>
 
 using namespace folly;
diff --git a/folly/executors/test/UnboundedBlockingQueueTest.cpp b/folly/executors/test/UnboundedBlockingQueueTest.cpp
deleted file mode 100644 (file)
index 203c1eb..0000000
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright 2017-present Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <folly/Baton.h>
-#include <folly/executors/UnboundedBlockingQueue.h>
-#include <gtest/gtest.h>
-#include <thread>
-
-using namespace folly;
-
-TEST(UnboundedQueuee, push_pop) {
-  UnboundedBlockingQueue<int> q;
-  q.add(42);
-  EXPECT_EQ(42, q.take());
-}
-TEST(UnboundedBlockingQueue, size) {
-  UnboundedBlockingQueue<int> q;
-  EXPECT_EQ(0, q.size());
-  q.add(42);
-  EXPECT_EQ(1, q.size());
-  q.take();
-  EXPECT_EQ(0, q.size());
-}
-
-TEST(UnboundedBlockingQueue, concurrent_push_pop) {
-  UnboundedBlockingQueue<int> q;
-  Baton<> b1, b2;
-  std::thread t([&] {
-    b1.post();
-    EXPECT_EQ(42, q.take());
-    EXPECT_EQ(0, q.size());
-    b2.post();
-  });
-  b1.wait();
-  q.add(42);
-  b2.wait();
-  EXPECT_EQ(0, q.size());
-  t.join();
-}
diff --git a/folly/executors/thread_factory/NamedThreadFactory.h b/folly/executors/thread_factory/NamedThreadFactory.h
new file mode 100644 (file)
index 0000000..2b34be9
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <string>
+#include <thread>
+
+#include <folly/Conv.h>
+#include <folly/Range.h>
+#include <folly/ThreadName.h>
+#include <folly/executors/thread_factory/ThreadFactory.h>
+
+namespace folly {
+
+class NamedThreadFactory : public ThreadFactory {
+ public:
+  explicit NamedThreadFactory(folly::StringPiece prefix)
+      : prefix_(prefix.str()), suffix_(0) {}
+
+  std::thread newThread(Func&& func) override {
+    auto name = folly::to<std::string>(prefix_, suffix_++);
+    return std::thread(
+        [ func = std::move(func), name = std::move(name) ]() mutable {
+          folly::setThreadName(name);
+          func();
+        });
+  }
+
+  void setNamePrefix(folly::StringPiece prefix) {
+    prefix_ = prefix.str();
+  }
+
+  std::string getNamePrefix() {
+    return prefix_;
+  }
+
+ private:
+  std::string prefix_;
+  std::atomic<uint64_t> suffix_;
+};
+
+} // namespace folly
diff --git a/folly/executors/thread_factory/PriorityThreadFactory.h b/folly/executors/thread_factory/PriorityThreadFactory.h
new file mode 100644 (file)
index 0000000..0c75fef
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/executors/thread_factory/ThreadFactory.h>
+
+#include <folly/portability/SysResource.h>
+#include <folly/portability/SysTime.h>
+
+namespace folly {
+
+/**
+ * A ThreadFactory that sets nice values for each thread.  The main
+ * use case for this class is if there are multiple
+ * CPUThreadPoolExecutors in a single process, or between multiple
+ * processes, where some should have a higher priority than the others.
+ *
+ * Note that per-thread nice values are not POSIX standard, but both
+ * pthreads and linux support per-thread nice.  The default linux
+ * scheduler uses these values to do smart thread prioritization.
+ * sched_priority function calls only affect real-time schedulers.
+ */
+class PriorityThreadFactory : public ThreadFactory {
+ public:
+  explicit PriorityThreadFactory(
+      std::shared_ptr<ThreadFactory> factory,
+      int priority)
+      : factory_(std::move(factory)), priority_(priority) {}
+
+  std::thread newThread(Func&& func) override {
+    int priority = priority_;
+    return factory_->newThread([ priority, func = std::move(func) ]() mutable {
+      if (setpriority(PRIO_PROCESS, 0, priority) != 0) {
+        LOG(ERROR) << "setpriority failed (are you root?) with error " << errno,
+            strerror(errno);
+      }
+      func();
+    });
+  }
+
+ private:
+  std::shared_ptr<ThreadFactory> factory_;
+  int priority_;
+};
+
+} // folly
diff --git a/folly/executors/thread_factory/ThreadFactory.h b/folly/executors/thread_factory/ThreadFactory.h
new file mode 100644 (file)
index 0000000..0af8632
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/Executor.h>
+
+#include <thread>
+
+namespace folly {
+
+class ThreadFactory {
+ public:
+  virtual ~ThreadFactory() = default;
+  virtual std::thread newThread(Func&& func) = 0;
+};
+
+} // namespace folly