fix potential race/memory corruption in IOThreadPoolExecutor
author James Sedgwick <jsedgwick@fb.com>
Mon, 17 Nov 2014 23:04:35 +0000 (15:04 -0800)
committer Dave Watson <davejwatson@fb.com>
Wed, 19 Nov 2014 20:52:32 +0000 (12:52 -0800)
Summary:
In unusual but possible circumstances, the EventBase and thus pending tasks will outlive the pool, so we shouldn't keep references of any kind to the pool in the task.
The only reference we were keeping was used to access the task stats rx subject. Store the subject as a shared ptr and give a copy of the ptr to the Thread object, which is itself
owned by a shared ptr and captured by every task. I thought this had to do with the thread local leak mentioned in the test plan of D1682860, but this patch doesn't actually fix that :(
Thankfully, while task surfing I saw @phillip's awesome D1682698. Patching that in fixes the leak! Woo. Either way, this is more correct.

Test Plan: unit under clang/asan

Reviewed By: davejwatson@fb.com

Subscribers: trunkagent, fugalh, njormrod, folly-diffs@, philipp

FB internal diff: D1683221

Tasks: 5336655

Signature: t1:1683221:1416264933:946d29b5a3eb22ed08812f2adefb7284b1899e4e

folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp
folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h
folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp
folly/experimental/wangle/concurrent/ThreadPoolExecutor.h

index dfca621658e937e3eb36e9e50cf0e8f526df3426..d60472fca183ded126e8be3ee00b7736f0f2cac7 100644 (file)
@@ -93,7 +93,7 @@ void IOThreadPoolExecutor::add(
 
   auto moveTask = folly::makeMoveWrapper(
       Task(std::move(func), expiration, std::move(expireCallback)));
-  auto wrappedFunc = [this, ioThread, moveTask] () mutable {
+  auto wrappedFunc = [ioThread, moveTask] () mutable {
     runTask(ioThread, std::move(*moveTask));
     ioThread->pendingTasks--;
   };
@@ -107,7 +107,7 @@ void IOThreadPoolExecutor::add(
 
 std::shared_ptr<ThreadPoolExecutor::Thread>
 IOThreadPoolExecutor::makeThread() {
-  return std::make_shared<IOThread>();
+  return std::make_shared<IOThread>(this);
 }
 
 void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
index 7acaafe80e87956ec9941c5e19cc878e5b8a3728..35cce4c008692d7405209ecf1885259a7724e22c 100644 (file)
@@ -42,7 +42,10 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor {
   uint64_t getPendingTaskCount() override;
 
   struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread {
-    IOThread() : shouldRun(true), pendingTasks(0) {};
+    IOThread(IOThreadPoolExecutor* pool)
+      : Thread(pool),
+        shouldRun(true),
+        pendingTasks(0) {};
     std::atomic<bool> shouldRun;
     std::atomic<size_t> pendingTasks;
     EventBase* eventBase;
index 18d8c2756610a6e710e8646ae89756213971d885..74890d7b37f2d20dc52bd4c5d271ac389afd8280 100644 (file)
@@ -21,7 +21,8 @@ namespace folly { namespace wangle {
 ThreadPoolExecutor::ThreadPoolExecutor(
     size_t numThreads,
     std::shared_ptr<ThreadFactory> threadFactory)
-    : threadFactory_(std::move(threadFactory)) {}
+    : threadFactory_(std::move(threadFactory)),
+      taskStatsSubject_(std::make_shared<Subject<TaskStats>>()) {}
 
 ThreadPoolExecutor::~ThreadPoolExecutor() {
   CHECK(threadList_.get().size() == 0);
@@ -63,7 +64,7 @@ void ThreadPoolExecutor::runTask(
     task.stats_.runTime = std::chrono::steady_clock::now() - startTime;
   }
   thread->idle = true;
-  taskStatsSubject_.onNext(std::move(task.stats_));
+  thread->taskStatsSubject->onNext(std::move(task.stats_));
 }
 
 size_t ThreadPoolExecutor::numThreads() {
index 68da850ab5aa215858430e8cc1cd8c6e7b02556b..b1a3dd3854af6658aa3e74e8a758b7afa76fc6f6 100644 (file)
@@ -73,7 +73,7 @@ class ThreadPoolExecutor : public Executor {
 
   Subscription<TaskStats> subscribeToTaskStats(
       const ObserverPtr<TaskStats>& observer) {
-    return taskStatsSubject_.subscribe(observer);
+    return taskStatsSubject_->subscribe(observer);
   }
 
  protected:
@@ -83,13 +83,20 @@ class ThreadPoolExecutor : public Executor {
   void removeThreads(size_t n, bool isJoin);
 
   struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread {
+    explicit Thread(ThreadPoolExecutor* pool)
+      : id(nextId++),
+        handle(),
+        idle(true),
+        taskStatsSubject(pool->taskStatsSubject_) {}
+
     virtual ~Thread() {}
-    Thread() : id(nextId++), handle(), idle(true) {};
+
     static std::atomic<uint64_t> nextId;
     uint64_t id;
     std::thread handle;
     bool idle;
     Baton<> startupBaton;
+    std::shared_ptr<Subject<TaskStats>> taskStatsSubject;
   };
 
   typedef std::shared_ptr<Thread> ThreadPtr;
@@ -106,7 +113,7 @@ class ThreadPoolExecutor : public Executor {
     Func expireCallback_;
   };
 
-  void runTask(const ThreadPtr& thread, Task&& task);
+  static void runTask(const ThreadPtr& thread, Task&& task);
 
   // The function that will be bound to pool threads. It must call
   // thread->startupBaton.post() when it's ready to consume work.
@@ -118,7 +125,7 @@ class ThreadPoolExecutor : public Executor {
 
   // Create a suitable Thread struct
   virtual ThreadPtr makeThread() {
-    return std::make_shared<Thread>();
+    return std::make_shared<Thread>(this);
   }
 
   // Prerequisite: threadListLock_ readlocked
@@ -168,7 +175,7 @@ class ThreadPoolExecutor : public Executor {
   StoppedThreadQueue stoppedThreads_;
   std::atomic<bool> isJoin_; // whether the current downsizing is a join
 
-  Subject<TaskStats> taskStatsSubject_;
+  std::shared_ptr<Subject<TaskStats>> taskStatsSubject_;
 };
 
 }} // folly::wangle