Move thrift/lib/cpp/async to folly.
authorDave Watson <davejwatson@fb.com>
Wed, 12 Mar 2014 16:25:43 +0000 (09:25 -0700)
committerDave Watson <davejwatson@fb.com>
Tue, 18 Mar 2014 17:01:39 +0000 (10:01 -0700)
Summary:
Move the minimum amount of stuff and still have everything compile.  Would like to move TAsyncSocket/ServerSocket/SSL/UDP eventually, but not this round.

thrift async is used very widely now: thrift, proxygen, newer mysql async interface, even trying it out in memcache.  A common complaint is that it doesn't get wide enough notice under thrift/, so let's move it to folly/.  After moving TAsyncSocket, both HHVM and proxygen could avoid a dep on thrift as well.

Changes:
* mv files to folly/io/async
* remove 'T' prefix on classes/filenames
* change namespace to 'folly'
* remove any thrift references.

Tried this before in D470080, this time doesn't attempt to abstract libevent

@override-unit-failures

Test Plan:
fbconfig -r thrift; fbmake dev.
Will iterate on any other contbuild failures

Reviewed By: pgriess@fb.com

FB internal diff: D1195393

14 files changed:
folly/Makefile.am
folly/configure.ac
folly/io/async/AsyncTimeout.cpp [new file with mode: 0644]
folly/io/async/AsyncTimeout.h [new file with mode: 0644]
folly/io/async/EventBase.cpp [new file with mode: 0644]
folly/io/async/EventBase.h [new file with mode: 0644]
folly/io/async/EventFDWrapper.h [new file with mode: 0644]
folly/io/async/EventHandler.cpp [new file with mode: 0644]
folly/io/async/EventHandler.h [new file with mode: 0644]
folly/io/async/EventUtil.h [new file with mode: 0644]
folly/io/async/NotificationQueue.h [new file with mode: 0644]
folly/io/async/Request.cpp [new file with mode: 0644]
folly/io/async/Request.h [new file with mode: 0644]
folly/io/async/TimeoutManager.h [new file with mode: 0644]

index 6dd25ab061bc23effa25ab7ffb9e8f913aee21dc..b420da275fdf919e8b33b6ebe2ef62bb98760834 100644 (file)
@@ -79,6 +79,14 @@ nobase_follyinclude_HEADERS = \
        io/RecordIO.h \
        io/RecordIO-inl.h \
        io/TypedIOBuf.h \
+       io/async/AsyncTimeout.h \
+       io/async/EventBase.h \
+       io/async/EventFDWrapper.h \
+       io/async/EventHandler.h \
+       io/async/EventUtil.h \
+       io/async/NotificationQueue.h \
+       io/async/Request.h \
+       io/async/TimeoutManager.h \
        json.h \
        Lazy.h \
        Likely.h \
@@ -175,6 +183,10 @@ libfolly_la_SOURCES = \
        io/IOBuf.cpp \
        io/IOBufQueue.cpp \
        io/RecordIO.cpp \
+       io/async/AsyncTimeout.cpp \
+       io/async/EventBase.cpp \
+       io/async/EventHandler.cpp \
+       io/async/Request.cpp \
        json.cpp \
        detail/MemoryIdler.cpp \
        MemoryMapping.cpp \
index 3fd30fad5e4908b9d06f689dad111a0301bbcd40..4e9c804104c8c99cbcf7edbe5b098d195fa53403 100644 (file)
@@ -60,6 +60,8 @@ AC_CHECK_HEADER(double-conversion/double-conversion.h, [], [AC_MSG_ERROR(
 AC_CHECK_LIB([double-conversion],[ceil],[],[AC_MSG_ERROR(
              [Please install double-conversion library])])
 
+AC_CHECK_LIB([event], [event_set], [], [AC_MSG_ERROR([Unable to find libevent])])
+
 # Checks for typedefs, structures, and compiler characteristics.
 AC_HEADER_STDBOOL
 AC_C_CONST
diff --git a/folly/io/async/AsyncTimeout.cpp b/folly/io/async/AsyncTimeout.cpp
new file mode 100644 (file)
index 0000000..c2196dd
--- /dev/null
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "folly/io/async/AsyncTimeout.h"
+#include "folly/io/async/EventBase.h"
+#include "folly/io/async/EventUtil.h"
+#include "folly/io/async/Request.h"
+
+#include <assert.h>
+#include <glog/logging.h>
+
+namespace folly {
+
+AsyncTimeout::AsyncTimeout(TimeoutManager* timeoutManager)
+    : timeoutManager_(timeoutManager) {
+
+  event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this);
+  event_.ev_base = nullptr;
+  timeoutManager_->attachTimeoutManager(
+      this,
+      TimeoutManager::InternalEnum::NORMAL);
+  RequestContext::getStaticContext();
+}
+
+AsyncTimeout::AsyncTimeout(EventBase* eventBase)
+    : timeoutManager_(eventBase) {
+
+  event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this);
+  event_.ev_base = nullptr;
+  timeoutManager_->attachTimeoutManager(
+      this,
+      TimeoutManager::InternalEnum::NORMAL);
+  RequestContext::getStaticContext();
+}
+
+AsyncTimeout::AsyncTimeout(TimeoutManager* timeoutManager,
+                             InternalEnum internal)
+    : timeoutManager_(timeoutManager) {
+
+  event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this);
+  event_.ev_base = nullptr;
+  timeoutManager_->attachTimeoutManager(this, internal);
+  RequestContext::getStaticContext();
+}
+
+AsyncTimeout::AsyncTimeout(EventBase* eventBase, InternalEnum internal)
+    : timeoutManager_(eventBase) {
+
+  event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this);
+  event_.ev_base = nullptr;
+  timeoutManager_->attachTimeoutManager(this, internal);
+  RequestContext::getStaticContext();
+}
+
+AsyncTimeout::AsyncTimeout(): timeoutManager_(nullptr) {
+  event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this);
+  event_.ev_base = nullptr;
+  RequestContext::getStaticContext();
+}
+
+AsyncTimeout::~AsyncTimeout() {
+  cancelTimeout();
+}
+
+bool AsyncTimeout::scheduleTimeout(std::chrono::milliseconds timeout) {
+  assert(timeoutManager_ != nullptr);
+  context_ = RequestContext::saveContext();
+  return timeoutManager_->scheduleTimeout(this, timeout);
+}
+
+bool AsyncTimeout::scheduleTimeout(uint32_t milliseconds) {
+  return scheduleTimeout(std::chrono::milliseconds(milliseconds));
+}
+
+void AsyncTimeout::cancelTimeout() {
+  if (isScheduled()) {
+    timeoutManager_->cancelTimeout(this);
+  }
+}
+
+bool AsyncTimeout::isScheduled() const {
+  return EventUtil::isEventRegistered(&event_);
+}
+
+void AsyncTimeout::attachTimeoutManager(
+    TimeoutManager* timeoutManager,
+    InternalEnum internal) {
+  // This also implies no timeout is scheduled.
+  assert(timeoutManager_ == nullptr);
+  assert(timeoutManager->isInTimeoutManagerThread());
+  timeoutManager_ = timeoutManager;
+
+  timeoutManager_->attachTimeoutManager(this, internal);
+}
+
+void AsyncTimeout::attachEventBase(
+    EventBase* eventBase,
+    InternalEnum internal) {
+  attachTimeoutManager(eventBase, internal);
+}
+
+void AsyncTimeout::detachTimeoutManager() {
+  // Only allow the event base to be changed if the timeout is not
+  // currently installed.
+  if (isScheduled()) {
+    // Programmer bug.  Abort the program.
+    LOG(ERROR) << "detachEventBase() called on scheduled timeout; aborting";
+    abort();
+    return;
+  }
+
+  if (timeoutManager_) {
+    timeoutManager_->detachTimeoutManager(this);
+    timeoutManager_ = nullptr;
+  }
+}
+
+void AsyncTimeout::detachEventBase() {
+  detachTimeoutManager();
+}
+
+void AsyncTimeout::libeventCallback(int fd, short events, void* arg) {
+  AsyncTimeout* timeout = reinterpret_cast<AsyncTimeout*>(arg);
+  assert(fd == -1);
+  assert(events == EV_TIMEOUT);
+
+  // double check that ev_flags gets reset when the timeout is not running
+  assert((timeout->event_.ev_flags & ~EVLIST_INTERNAL) == EVLIST_INIT);
+
+  // this can't possibly fire if timeout->eventBase_ is nullptr
+  (void) timeout->timeoutManager_->bumpHandlingTime();
+
+  auto old_ctx =
+    RequestContext::setContext(timeout->context_);
+
+  timeout->timeoutExpired();
+
+  RequestContext::setContext(old_ctx);
+}
+
+} // folly
diff --git a/folly/io/async/AsyncTimeout.h b/folly/io/async/AsyncTimeout.h
new file mode 100644 (file)
index 0000000..d1d2ba8
--- /dev/null
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include "folly/io/async/TimeoutManager.h"
+
+#include <boost/noncopyable.hpp>
+#include <event.h>
+#include <memory>
+
+namespace folly {
+
+class EventBase;
+class RequestContext;
+class TimeoutManager;
+
+/**
+ * AsyncTimeout is used to asynchronously wait for a timeout to occur.
+ */
+class AsyncTimeout : private boost::noncopyable {
+ public:
+  typedef TimeoutManager::InternalEnum InternalEnum;
+
+  /**
+   * Create a new AsyncTimeout object, driven by the specified TimeoutManager.
+   */
+  explicit AsyncTimeout(TimeoutManager* timeoutManager);
+  explicit AsyncTimeout(EventBase* eventBase);
+
+  /**
+   * Create a new internal AsyncTimeout object.
+   *
+   * Internal timeouts are like regular timeouts, but will not stop the
+   * TimeoutManager loop from exiting if the only remaining events are internal
+   * timeouts.
+   *
+   * This is useful for implementing fallback timeouts to abort the
+   * TimeoutManager loop if the other events have not been processed within a
+   * specified time period: if the event loop takes too long the timeout will
+   * fire and can stop the event loop.  However, if all other events complete,
+   * the event loop will exit even though the internal timeout is still
+   * installed.
+   */
+  AsyncTimeout(TimeoutManager* timeoutManager, InternalEnum internal);
+  AsyncTimeout(EventBase* eventBase, InternalEnum internal);
+
+  /**
+   * Create a new AsyncTimeout object, not yet assigned to a TimeoutManager.
+   *
+   * attachEventBase() must be called prior to scheduling the timeout.
+   */
+  AsyncTimeout();
+
+  /**
+   * AsyncTimeout destructor.
+   *
+   * The timeout will be automatically cancelled if it is running.
+   */
+  virtual ~AsyncTimeout();
+
+  /**
+   * timeoutExpired() is invoked when the timeout period has expired.
+   */
+  virtual void timeoutExpired() noexcept = 0;
+
+  /**
+   * Schedule the timeout to fire in the specified number of milliseconds.
+   *
+   * After the specified number of milliseconds has elapsed, timeoutExpired()
+   * will be invoked by the TimeoutManager's main loop.
+   *
+   * If the timeout is already running, it will be rescheduled with the
+   * new timeout value.
+   *
+   * @param milliseconds  The timeout duration, in milliseconds.
+   *
+   * @return Returns true if the timeout was successfully scheduled,
+   *         and false if an error occurred.  After an error, the timeout is
+   *         always unscheduled, even if scheduleTimeout() was just
+   *         rescheduling an existing timeout.
+   */
+  bool scheduleTimeout(uint32_t milliseconds);
+  bool scheduleTimeout(std::chrono::milliseconds timeout);
+
+  /**
+   * Cancel the timeout, if it is running.
+   */
+  void cancelTimeout();
+
+  /**
+   * Returns true if the timeout is currently scheduled.
+   */
+  bool isScheduled() const;
+
+  /**
+   * Attach the timeout to a TimeoutManager.
+   *
+   * This may only be called if the timeout is not currently attached to a
+   * TimeoutManager (either by using the default constructor, or by calling
+   * detachTimeoutManager()).
+   *
+   * This method must be invoked in the TimeoutManager's thread.
+   *
+   * The internal parameter specifies if this timeout should be treated as an
+   * internal event.  TimeoutManager::loop() will return when there are no more
+   * non-internal events remaining.
+   */
+  void attachTimeoutManager(TimeoutManager* timeoutManager,
+                            InternalEnum internal = InternalEnum::NORMAL);
+  void attachEventBase(EventBase* eventBase,
+                       InternalEnum internal = InternalEnum::NORMAL);
+
+  /**
+   * Detach the timeout from its TimeoutManager.
+   *
+   * This may only be called when the timeout is not running.
+   * Once detached, the timeout may not be scheduled again until it is
+   * re-attached to a EventBase by calling attachEventBase().
+   *
+   * This method must be called from the current TimeoutManager's thread.
+   */
+  void detachTimeoutManager();
+  void detachEventBase();
+
+  /**
+   * Returns the internal handle to the event
+   */
+  struct event* getEvent() {
+    return &event_;
+  }
+
+ private:
+  static void libeventCallback(int fd, short events, void* arg);
+
+  struct event event_;
+
+  /*
+   * Store a pointer to the TimeoutManager.  We only use this
+   * for some assert() statements, to make sure that AsyncTimeout is always
+   * used from the correct thread.
+   */
+  TimeoutManager* timeoutManager_;
+
+  // Save the request context for when the timeout fires.
+  std::shared_ptr<RequestContext> context_;
+};
+
+} // folly
diff --git a/folly/io/async/EventBase.cpp b/folly/io/async/EventBase.cpp
new file mode 100644 (file)
index 0000000..8eee710
--- /dev/null
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include "folly/io/async/EventBase.h"
+
+#include "folly/io/async/NotificationQueue.h"
+
+#include <boost/static_assert.hpp>
+#include <fcntl.h>
+#include <pthread.h>
+#include <unistd.h>
+
+namespace {
+
+using folly::Cob;
+using folly::EventBase;
+
+class Tr1FunctionLoopCallback : public EventBase::LoopCallback {
+ public:
+  explicit Tr1FunctionLoopCallback(const Cob& function)
+    : function_(function) {}
+
+  virtual void runLoopCallback() noexcept {
+    function_();
+    delete this;
+  }
+
+ private:
+  Cob function_;
+};
+
+}
+
+namespace folly {
+
+const int kNoFD = -1;
+
+/*
+ * EventBase::FunctionRunner
+ */
+
+class EventBase::FunctionRunner
+    : public NotificationQueue<std::pair<void (*)(void*), void*>>::Consumer {
+ public:
+  void messageAvailable(std::pair<void (*)(void*), void*>&& msg) {
+
+    // In libevent2, internal events do not break the loop.
+    // Most users would expect loop(), followed by runInEventBaseThread(),
+    // to break the loop and check if it should exit or not.
+    // To have similar bejaviour to libevent1.4, tell the loop to break here.
+    // Note that loop() may still continue to loop, but it will also check the
+    // stop_ flag as well as runInLoop callbacks, etc.
+    event_base_loopbreak(getEventBase()->evb_);
+
+    if (msg.first == nullptr && msg.second == nullptr) {
+      // terminateLoopSoon() sends a null message just to
+      // wake up the loop.  We can ignore these messages.
+      return;
+    }
+
+    // If function is nullptr, just log and move on
+    if (!msg.first) {
+      LOG(ERROR) << "nullptr callback registered to be run in "
+                 << "event base thread";
+      return;
+    }
+
+    // The function should never throw an exception, because we have no
+    // way of knowing what sort of error handling to perform.
+    //
+    // If it does throw, log a message and abort the program.
+    try {
+      msg.first(msg.second);
+    } catch (const std::exception& ex) {
+      LOG(ERROR) << "runInEventBaseThread() function threw a "
+                 << typeid(ex).name() << " exception: " << ex.what();
+      abort();
+    } catch (...) {
+      LOG(ERROR) << "runInEventBaseThread() function threw an exception";
+      abort();
+    }
+  }
+};
+
+/*
+ * EventBase::CobTimeout methods
+ */
+
+void EventBase::CobTimeout::timeoutExpired() noexcept {
+  // For now, we just swallow any exceptions that the callback threw.
+  try {
+    cob_();
+  } catch (const std::exception& ex) {
+    LOG(ERROR) << "EventBase::runAfterDelay() callback threw "
+               << typeid(ex).name() << " exception: " << ex.what();
+  } catch (...) {
+    LOG(ERROR) << "EventBase::runAfterDelay() callback threw non-exception "
+               << "type";
+  }
+
+  // The CobTimeout object was allocated on the heap by runAfterDelay(),
+  // so delete it now that the it has fired.
+  delete this;
+}
+
+/*
+ * EventBase methods
+ */
+
+EventBase::EventBase()
+  : runOnceCallbacks_(nullptr)
+  , stop_(false)
+  , loopThread_(0)
+  , evb_(static_cast<event_base*>(event_init()))
+  , queue_(nullptr)
+  , fnRunner_(nullptr)
+  , maxLatency_(0)
+  , avgLoopTime_(2000000)
+  , maxLatencyLoopTime_(avgLoopTime_)
+  , nextLoopCnt_(-40)       // Early wrap-around so bugs will manifest soon
+  , latestLoopCnt_(nextLoopCnt_)
+  , startWork_(0)
+  , observer_(nullptr)
+  , observerSampleCount_(0) {
+  VLOG(5) << "EventBase(): Created.";
+  initNotificationQueue();
+  RequestContext::getStaticContext();
+}
+
+// takes ownership of the event_base
+EventBase::EventBase(event_base* evb)
+  : runOnceCallbacks_(nullptr)
+  , stop_(false)
+  , loopThread_(0)
+  , evb_(evb)
+  , queue_(nullptr)
+  , fnRunner_(nullptr)
+  , maxLatency_(0)
+  , avgLoopTime_(2000000)
+  , maxLatencyLoopTime_(avgLoopTime_)
+  , nextLoopCnt_(-40)       // Early wrap-around so bugs will manifest soon
+  , latestLoopCnt_(nextLoopCnt_)
+  , startWork_(0)
+  , observer_(nullptr)
+  , observerSampleCount_(0) {
+  initNotificationQueue();
+  RequestContext::getStaticContext();
+}
+
+EventBase::~EventBase() {
+  // Delete any unfired CobTimeout objects, so that we don't leak memory
+  // (Note that we don't fire them.  The caller is responsible for cleaning up
+  // its own data structures if it destroys the EventBase with unfired events
+  // remaining.)
+  while (!pendingCobTimeouts_.empty()) {
+    CobTimeout* timeout = &pendingCobTimeouts_.front();
+    delete timeout;
+  }
+
+  (void) runLoopCallbacks(false);
+
+  // Stop consumer before deleting NotificationQueue
+  fnRunner_->stopConsuming();
+  event_base_free(evb_);
+  VLOG(5) << "EventBase(): Destroyed.";
+}
+
+int EventBase::getNotificationQueueSize() const {
+  return queue_->size();
+}
+
+// Set smoothing coefficient for loop load average; input is # of milliseconds
+// for exp(-1) decay.
+void EventBase::setLoadAvgMsec(uint32_t ms) {
+  uint64_t us = 1000 * ms;
+  if (ms > 0) {
+    maxLatencyLoopTime_.setTimeInterval(us);
+    avgLoopTime_.setTimeInterval(us);
+  } else {
+    LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
+  }
+}
+
+void EventBase::resetLoadAvg(double value) {
+  avgLoopTime_.reset(value);
+  maxLatencyLoopTime_.reset(value);
+}
+
+static int64_t getTimeDelta(int64_t *prev) {
+  int64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+    std::chrono::steady_clock::now().time_since_epoch()).count();
+  int64_t delta = now - *prev;
+  *prev = now;
+  return delta;
+}
+
+void EventBase::waitUntilRunning() {
+  while (!isRunning()) {
+    sched_yield();
+  }
+}
+
+// enters the event_base loop -- will only exit when forced to
+bool EventBase::loop() {
+  VLOG(5) << "EventBase(): Starting loop.";
+  int res = 0;
+  bool ranLoopCallbacks;
+  int nonBlocking;
+
+  loopThread_.store(pthread_self(), std::memory_order_release);
+
+#if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
+  if (!name_.empty()) {
+    pthread_setname_np(pthread_self(), name_.c_str());
+  }
+#endif
+
+  int64_t prev = std::chrono::duration_cast<std::chrono::milliseconds>(
+    std::chrono::steady_clock::now().time_since_epoch()).count();
+  int64_t idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
+    std::chrono::steady_clock::now().time_since_epoch()).count();
+
+  // TODO: Read stop_ atomically with an acquire barrier.
+  while (!stop_) {
+    ++nextLoopCnt_;
+
+    // nobody can add loop callbacks from within this thread if
+    // we don't have to handle anything to start with...
+    nonBlocking = (loopCallbacks_.empty() ? 0 : EVLOOP_NONBLOCK);
+    res = event_base_loop(evb_, EVLOOP_ONCE | nonBlocking);
+    ranLoopCallbacks = runLoopCallbacks();
+
+    int64_t busy = std::chrono::duration_cast<std::chrono::microseconds>(
+      std::chrono::steady_clock::now().time_since_epoch()).count() - startWork_;
+    int64_t idle = startWork_ - idleStart;
+
+    avgLoopTime_.addSample(idle, busy);
+    maxLatencyLoopTime_.addSample(idle, busy);
+
+    if (observer_) {
+      if (observerSampleCount_++ == observer_->getSampleRate()) {
+        observerSampleCount_ = 0;
+        observer_->loopSample(busy, idle);
+      }
+    }
+
+    VLOG(11) << "EventBase " << this         << " did not timeout "
+     " loop time guess: "    << busy + idle  <<
+     " idle time: "          << idle         <<
+     " busy time: "          << busy         <<
+     " avgLoopTime: "        << avgLoopTime_.get() <<
+     " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
+     " maxLatency_: "        << maxLatency_ <<
+     " nothingHandledYet(): "<< nothingHandledYet();
+
+    // see if our average loop time has exceeded our limit
+    if ((maxLatency_ > 0) &&
+        (maxLatencyLoopTime_.get() > double(maxLatency_))) {
+      maxLatencyCob_();
+      // back off temporarily -- don't keep spamming maxLatencyCob_
+      // if we're only a bit over the limit
+      maxLatencyLoopTime_.dampen(0.9);
+    }
+
+    // Our loop run did real work; reset the idle timer
+    idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
+      std::chrono::steady_clock::now().time_since_epoch()).count();
+
+    // If the event loop indicate that there were no more events, and
+    // we also didn't have any loop callbacks to run, there is nothing left to
+    // do.
+    if (res != 0 && !ranLoopCallbacks) {
+      // Since Notification Queue is marked 'internal' some events may not have
+      // run.  Run them manually if so, and continue looping.
+      //
+      if (getNotificationQueueSize() > 0) {
+        fnRunner_->handlerReady(0);
+      } else {
+        break;
+      }
+    }
+
+    VLOG(5) << "EventBase " << this << " loop time: " << getTimeDelta(&prev);
+  }
+  // Reset stop_ so loop() can be called again
+  stop_ = false;
+
+  if (res < 0) {
+    LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
+    return false;
+  } else if (res == 1) {
+    VLOG(5) << "EventBase: ran out of events (exiting loop)!";
+  } else if (res > 1) {
+    LOG(ERROR) << "EventBase: unknown event loop result = " << res;
+    return false;
+  }
+
+  loopThread_.store(0, std::memory_order_release);
+
+  VLOG(5) << "EventBase(): Done with loop.";
+  return true;
+}
+
+void EventBase::loopForever() {
+  // Update the notification queue event to treat it as a normal (non-internal)
+  // event.  The notification queue event always remains installed, and the main
+  // loop won't exit with it installed.
+  fnRunner_->stopConsuming();
+  fnRunner_->startConsuming(this, queue_.get());
+
+  bool ret = loop();
+
+  // Restore the notification queue internal flag
+  fnRunner_->stopConsuming();
+  fnRunner_->startConsumingInternal(this, queue_.get());
+
+  if (!ret) {
+    folly::throwSystemError("error in EventBase::loopForever()");
+  }
+}
+
+bool EventBase::bumpHandlingTime() {
+  VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
+    " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
+  if(nothingHandledYet()) {
+    latestLoopCnt_ = nextLoopCnt_;
+    // set the time
+    startWork_ = std::chrono::duration_cast<std::chrono::microseconds>(
+      std::chrono::steady_clock::now().time_since_epoch()).count();
+
+    VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
+      " (loop) startWork_ " << startWork_;
+    return true;
+  }
+  return false;
+}
+
+void EventBase::terminateLoopSoon() {
+  VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
+
+  if (!isRunning()) {
+    return;
+  }
+
+  // Set stop to true, so the event loop will know to exit.
+  // TODO: We should really use an atomic operation here with a release
+  // barrier.
+  stop_ = true;
+
+  // Call event_base_loopbreak() so that libevent will exit the next time
+  // around the loop.
+  event_base_loopbreak(evb_);
+
+  // If terminateLoopSoon() is called from another thread,
+  // the EventBase thread might be stuck waiting for events.
+  // In this case, it won't wake up and notice that stop_ is set until it
+  // receives another event.  Send an empty frame to the notification queue
+  // so that the event loop will wake up even if there are no other events.
+  //
+  // We don't care about the return value of trySendFrame().  If it fails
+  // this likely means the EventBase already has lots of events waiting
+  // anyway.
+  try {
+    queue_->putMessage(std::make_pair(nullptr, nullptr));
+  } catch (...) {
+    // We don't care if putMessage() fails.  This likely means
+    // the EventBase already has lots of events waiting anyway.
+  }
+}
+
+void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
+  DCHECK(isInEventBaseThread());
+  callback->cancelLoopCallback();
+  callback->context_ = RequestContext::saveContext();
+  if (runOnceCallbacks_ != nullptr && thisIteration) {
+    runOnceCallbacks_->push_back(*callback);
+  } else {
+    loopCallbacks_.push_back(*callback);
+  }
+}
+
+void EventBase::runInLoop(const Cob& cob, bool thisIteration) {
+  DCHECK(isInEventBaseThread());
+  Tr1FunctionLoopCallback* wrapper = new Tr1FunctionLoopCallback(cob);
+  wrapper->context_ = RequestContext::saveContext();
+  if (runOnceCallbacks_ != nullptr && thisIteration) {
+    runOnceCallbacks_->push_back(*wrapper);
+  } else {
+    loopCallbacks_.push_back(*wrapper);
+  }
+}
+
+bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) {
+  // Send the message.
+  // It will be received by the FunctionRunner in the EventBase's thread.
+
+  // We try not to schedule nullptr callbacks
+  if (!fn) {
+    LOG(ERROR) << "EventBase " << this
+               << ": Scheduling nullptr callbacks is not allowed";
+    return false;
+  }
+
+  // Short-circuit if we are already in our event base
+  if (inRunningEventBaseThread()) {
+    runInLoop(new RunInLoopCallback(fn, arg));
+    return true;
+
+  }
+
+  try {
+    queue_->putMessage(std::make_pair(fn, arg));
+  } catch (const std::exception& ex) {
+    LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
+               << fn << "for EventBase thread: " << ex.what();
+    return false;
+  }
+
+  return true;
+}
+
+bool EventBase::runInEventBaseThread(const Cob& fn) {
+  // Short-circuit if we are already in our event base
+  if (inRunningEventBaseThread()) {
+    runInLoop(fn);
+    return true;
+  }
+
+  Cob* fnCopy;
+  // Allocate a copy of the function so we can pass it to the other thread
+  // The other thread will delete this copy once the function has been run
+  try {
+    fnCopy = new Cob(fn);
+  } catch (const std::bad_alloc& ex) {
+    LOG(ERROR) << "failed to allocate tr::function copy "
+               << "for runInEventBaseThread()";
+    return false;
+  }
+
+  if (!runInEventBaseThread(&EventBase::runTr1FunctionPtr, fnCopy)) {
+    delete fnCopy;
+    return false;
+  }
+
+  return true;
+}
+
+bool EventBase::runAfterDelay(const Cob& cob,
+                               int milliseconds,
+                               TimeoutManager::InternalEnum in) {
+  CobTimeout* timeout = new CobTimeout(this, cob, in);
+  if (!timeout->scheduleTimeout(milliseconds)) {
+    delete timeout;
+    return false;
+  }
+
+  pendingCobTimeouts_.push_back(*timeout);
+  return true;
+}
+
+bool EventBase::runLoopCallbacks(bool setContext) {
+  if (!loopCallbacks_.empty()) {
+    bumpHandlingTime();
+    // Swap the loopCallbacks_ list with a temporary list on our stack.
+    // This way we will only run callbacks scheduled at the time
+    // runLoopCallbacks() was invoked.
+    //
+    // If any of these callbacks in turn call runInLoop() to schedule more
+    // callbacks, those new callbacks won't be run until the next iteration
+    // around the event loop.  This prevents runInLoop() callbacks from being
+    // able to start file descriptor and timeout based events.
+    LoopCallbackList currentCallbacks;
+    currentCallbacks.swap(loopCallbacks_);
+    runOnceCallbacks_ = &currentCallbacks;
+
+    while (!currentCallbacks.empty()) {
+      LoopCallback* callback = &currentCallbacks.front();
+      currentCallbacks.pop_front();
+      if (setContext) {
+        RequestContext::setContext(callback->context_);
+      }
+      callback->runLoopCallback();
+    }
+
+    runOnceCallbacks_ = nullptr;
+    return true;
+  }
+  return false;
+}
+
+void EventBase::initNotificationQueue() {
+  // Infinite size queue
+  queue_.reset(new NotificationQueue<std::pair<void (*)(void*), void*>>());
+
+  // We allocate fnRunner_ separately, rather than declaring it directly
+  // as a member of EventBase solely so that we don't need to include
+  // NotificationQueue.h from EventBase.h
+  fnRunner_.reset(new FunctionRunner());
+
+  // Mark this as an internal event, so event_base_loop() will return if
+  // there are no other events besides this one installed.
+  //
+  // Most callers don't care about the internal notification queue used by
+  // EventBase.  The queue is always installed, so if we did count the queue as
+  // an active event, loop() would never exit with no more events to process.
+  // Users can use loopForever() if they do care about the notification queue.
+  // (This is useful for EventBase threads that do nothing but process
+  // runInEventBaseThread() notifications.)
+  fnRunner_->startConsumingInternal(this, queue_.get());
+}
+
+void EventBase::SmoothLoopTime::setTimeInterval(uint64_t timeInterval) {
+  expCoeff_ = -1.0/timeInterval;
+  VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
+}
+
+void EventBase::SmoothLoopTime::reset(double value) {
+  value_ = value;
+}
+
+void EventBase::SmoothLoopTime::addSample(int64_t idle, int64_t busy) {
+    /*
+     * Position at which the busy sample is considered to be taken.
+     * (Allows to quickly skew our average without editing much code)
+     */
+    enum BusySamplePosition {
+      RIGHT = 0,  // busy sample placed at the end of the iteration
+      CENTER = 1, // busy sample placed at the middle point of the iteration
+      LEFT = 2,   // busy sample placed at the beginning of the iteration
+    };
+
+  VLOG(11) << "idle " << idle << " oldBusyLeftover_ " << oldBusyLeftover_ <<
+              " idle + oldBusyLeftover_ " << idle + oldBusyLeftover_ <<
+              " busy " << busy << " " << __PRETTY_FUNCTION__;
+  idle += oldBusyLeftover_ + busy;
+  oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2;
+  idle -= oldBusyLeftover_;
+
+  double coeff = exp(idle * expCoeff_);
+  value_ *= coeff;
+  value_ += (1.0 - coeff) * busy;
+}
+
+bool EventBase::nothingHandledYet() {
+  VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
+  return (nextLoopCnt_ != latestLoopCnt_);
+}
+
+/* static */
+void EventBase::runTr1FunctionPtr(Cob* fn) {
+  // The function should never throw an exception, because we have no
+  // way of knowing what sort of error handling to perform.
+  //
+  // If it does throw, log a message and abort the program.
+  try {
+    (*fn)();
+  } catch (const std::exception &ex) {
+    LOG(ERROR) << "runInEventBaseThread() std::function threw a "
+               << typeid(ex).name() << " exception: " << ex.what();
+    abort();
+  } catch (...) {
+    LOG(ERROR) << "runInEventBaseThread() std::function threw an exception";
+    abort();
+  }
+
+  // The function object was allocated by runInEventBaseThread().
+  // Delete it once it has been run.
+  delete fn;
+}
+
+EventBase::RunInLoopCallback::RunInLoopCallback(void (*fn)(void*), void* arg)
+    : fn_(fn)
+    , arg_(arg) {}
+
+void EventBase::RunInLoopCallback::runLoopCallback() noexcept {
+  fn_(arg_);
+  delete this;
+}
+
+void EventBase::attachTimeoutManager(AsyncTimeout* obj,
+                                      InternalEnum internal) {
+
+  struct event* ev = obj->getEvent();
+  assert(ev->ev_base == nullptr);
+
+  event_base_set(getLibeventBase(), ev);
+  if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
+    // Set the EVLIST_INTERNAL flag
+    ev->ev_flags |= EVLIST_INTERNAL;
+  }
+}
+
+void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
+  cancelTimeout(obj);
+  struct event* ev = obj->getEvent();
+  ev->ev_base = nullptr;
+}
+
+bool EventBase::scheduleTimeout(AsyncTimeout* obj,
+                                 std::chrono::milliseconds timeout) {
+  assert(isInEventBaseThread());
+  // Set up the timeval and add the event
+  struct timeval tv;
+  tv.tv_sec = timeout.count() / 1000LL;
+  tv.tv_usec = (timeout.count() % 1000LL) * 1000LL;
+
+  struct event* ev = obj->getEvent();
+  if (event_add(ev, &tv) < 0) {
+    LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
+    return false;
+  }
+
+  return true;
+}
+
+void EventBase::cancelTimeout(AsyncTimeout* obj) {
+  assert(isInEventBaseThread());
+  struct event* ev = obj->getEvent();
+  if (EventUtil::isEventRegistered(ev)) {
+    event_del(ev);
+  }
+}
+
+void EventBase::setName(const std::string& name) {
+  assert(isInEventBaseThread());
+  name_ = name;
+#if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
+  if (isRunning()) {
+    pthread_setname_np(loopThread_.load(std::memory_order_relaxed),
+                       name_.c_str());
+  }
+#endif
+}
+
+const std::string& EventBase::getName() {
+  assert(isInEventBaseThread());
+  return name_;
+}
+
+} // folly
diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h
new file mode 100644 (file)
index 0000000..dae1be1
--- /dev/null
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <glog/logging.h>
+#include "folly/io/async/AsyncTimeout.h"
+#include "folly/io/async/TimeoutManager.h"
+#include <memory>
+#include <stack>
+#include <list>
+#include <queue>
+#include <cstdlib>
+#include <set>
+#include <utility>
+#include <boost/intrusive/list.hpp>
+#include <boost/utility.hpp>
+#include <functional>
+#include <event.h>  // libevent
+#include <errno.h>
+#include <math.h>
+#include <atomic>
+
+namespace folly {
+
+typedef std::function<void()> Cob;
+template <typename MessageT>
+class NotificationQueue;
+
+class EventBaseObserver {
+ public:
+  virtual ~EventBaseObserver() {}
+
+  virtual uint32_t getSampleRate() const = 0;
+
+  virtual void loopSample(
+    int64_t busyTime, int64_t idleTime) = 0;
+};
+
+/**
+ * This class is a wrapper for all asynchronous I/O processing functionality
+ *
+ * EventBase provides a main loop that notifies EventHandler callback objects
+ * when I/O is ready on a file descriptor, and notifies AsyncTimeout objects
+ * when a specified timeout has expired.  More complex, higher-level callback
+ * mechanisms can then be built on top of EventHandler and AsyncTimeout.
+ *
+ * A EventBase object can only drive an event loop for a single thread.  To
+ * take advantage of multiple CPU cores, most asynchronous I/O servers have one
+ * thread per CPU, and use a separate EventBase for each thread.
+ *
+ * In general, most EventBase methods may only be called from the thread
+ * running the EventBase's loop.  There are a few exceptions to this rule, for
+ * methods that are explicitly intended to allow communication with a
+ * EventBase from other threads.  When it is safe to call a method from
+ * another thread it is explicitly listed in the method comments.
+ */
+class EventBase : private boost::noncopyable, public TimeoutManager {
+ public:
+  /**
+   * A callback interface to use with runInLoop()
+   *
+   * Derive from this class if you need to delay some code execution until the
+   * next iteration of the event loop.  This allows you to schedule code to be
+   * invoked from the top-level of the loop, after your immediate callers have
+   * returned.
+   *
+   * If a LoopCallback object is destroyed while it is scheduled to be run in
+   * the next loop iteration, it will automatically be cancelled.
+   */
+  class LoopCallback {
+   public:
+    virtual ~LoopCallback() {}
+
+    virtual void runLoopCallback() noexcept = 0;
+    void cancelLoopCallback() {
+      hook_.unlink();
+    }
+
+    bool isLoopCallbackScheduled() const {
+      return hook_.is_linked();
+    }
+
+   private:
+    typedef boost::intrusive::list_member_hook<
+      boost::intrusive::link_mode<boost::intrusive::auto_unlink> > ListHook;
+
+    ListHook hook_;
+
+    typedef boost::intrusive::list<
+      LoopCallback,
+      boost::intrusive::member_hook<LoopCallback, ListHook,
+                                    &LoopCallback::hook_>,
+      boost::intrusive::constant_time_size<false> > List;
+
+    // EventBase needs access to LoopCallbackList (and therefore to hook_)
+    friend class EventBase;
+    std::shared_ptr<RequestContext> context_;
+  };
+
+  /**
+   * Create a new EventBase object.
+   */
+  EventBase();
+
+  /**
+   * Create a new EventBase object that will use the specified libevent
+   * event_base object to drive the event loop.
+   *
+   * The EventBase will take ownership of this event_base, and will call
+   * event_base_free(evb) when the EventBase is destroyed.
+   */
+  explicit EventBase(event_base* evb);
+  ~EventBase();
+
+  /**
+   * Runs the event loop.
+   *
+   * loop() will loop waiting for I/O or timeouts and invoking EventHandler
+   * and AsyncTimeout callbacks as their events become ready.  loop() will
+   * only return when there are no more events remaining to process, or after
+   * terminateLoopSoon() has been called.
+   *
+   * loop() may be called again to restart event processing after a previous
+   * call to loop() or loopForever() has returned.
+   *
+   * Returns true if the loop completed normally (if it processed all
+   * outstanding requests, or if terminateLoopSoon() was called).  If an error
+   * occurs waiting for events, false will be returned.
+   */
+  bool loop();
+
+  /**
+   * Runs the event loop.
+   *
+   * loopForever() behaves like loop(), except that it keeps running even if
+   * when there are no more user-supplied EventHandlers or AsyncTimeouts
+   * registered.  It will only return after terminateLoopSoon() has been
+   * called.
+   *
+   * This is useful for callers that want to wait for other threads to call
+   * runInEventBaseThread(), even when there are no other scheduled events.
+   *
+   * loopForever() may be called again to restart event processing after a
+   * previous call to loop() or loopForever() has returned.
+   *
+   * Throws a std::system_error if an error occurs.
+   */
+  void loopForever();
+
+  /**
+   * Causes the event loop to exit soon.
+   *
+   * This will cause an existing call to loop() or loopForever() to stop event
+   * processing and return, even if there are still events remaining to be
+   * processed.
+   *
+   * It is safe to call terminateLoopSoon() from another thread to cause loop()
+   * to wake up and return in the EventBase loop thread.  terminateLoopSoon()
+   * may also be called from the loop thread itself (for example, a
+   * EventHandler or AsyncTimeout callback may call terminateLoopSoon() to
+   * cause the loop to exit after the callback returns.)
+   *
+   * Note that the caller is responsible for ensuring that cleanup of all event
+   * callbacks occurs properly.  Since terminateLoopSoon() causes the loop to
+   * exit even when there are pending events present, there may be remaining
+   * callbacks present waiting to be invoked.  If the loop is later restarted
+   * pending events will continue to be processed normally, however if the
+   * EventBase is destroyed after calling terminateLoopSoon() it is the
+   * caller's responsibility to ensure that cleanup happens properly even if
+   * some outstanding events are never processed.
+   */
+  void terminateLoopSoon();
+
+  /**
+   * Adds the given callback to a queue of things run after the current pass
+   * through the event loop completes.  Note that if this callback calls
+   * runInLoop() the new callback won't be called until the main event loop
+   * has gone through a cycle.
+   *
+   * This method may only be called from the EventBase's thread.  This
+   * essentially allows an event handler to schedule an additional callback to
+   * be invoked after it returns.
+   *
+   * Use runInEventBaseThread() to schedule functions from another thread.
+   *
+   * The thisIteration parameter makes this callback run in this loop
+   * iteration, instead of the next one, even if called from a
+   * runInLoop callback (normal io callbacks that call runInLoop will
+   * always run in this iteration).  This was originally added to
+   * support detachEventBase, as a user callback may have called
+   * terminateLoopSoon(), but we want to make sure we detach.  Also,
+   * detachEventBase almost always must be called from the base event
+   * loop to ensure the stack is unwound, since most users of
+   * EventBase are not thread safe.
+   *
+   * Ideally we would not need thisIteration, and instead just use
+   * runInLoop with loop() (instead of terminateLoopSoon).
+   */
+  void runInLoop(LoopCallback* callback, bool thisIteration = false);
+
+  /**
+   * Convenience function to call runInLoop() with a std::function.
+   *
+   * This creates a LoopCallback object to wrap the std::function, and invoke
+   * the std::function when the loop callback fires.  This is slightly more
+   * expensive than defining your own LoopCallback, but more convenient in
+   * areas that aren't performance sensitive where you just want to use
+   * std::bind.  (std::bind is fairly slow on even by itself.)
+   *
+   * This method may only be called from the EventBase's thread.  This
+   * essentially allows an event handler to schedule an additional callback to
+   * be invoked after it returns.
+   *
+   * Use runInEventBaseThread() to schedule functions from another thread.
+   */
+  void runInLoop(const Cob& c, bool thisIteration = false);
+
+  /**
+   * Run the specified function in the EventBase's thread.
+   *
+   * This method is thread-safe, and may be called from another thread.
+   *
+   * If runInEventBaseThread() is called when the EventBase loop is not
+   * running, the function call will be delayed until the next time the loop is
+   * started.
+   *
+   * If runInEventBaseThread() returns true the function has successfully been
+   * scheduled to run in the loop thread.  However, if the loop is terminated
+   * (and never later restarted) before it has a chance to run the requested
+   * function, the function may never be run at all.  The caller is responsible
+   * for handling this situation correctly if they may terminate the loop with
+   * outstanding runInEventBaseThread() calls pending.
+   *
+   * If two calls to runInEventBaseThread() are made from the same thread, the
+   * functions will always be run in the order that they were scheduled.
+   * Ordering between functions scheduled from separate threads is not
+   * guaranteed.
+   *
+   * @param fn  The function to run.  The function must not throw any
+   *     exceptions.
+   * @param arg An argument to pass to the function.
+   *
+   * @return Returns true if the function was successfully scheduled, or false
+   *         if there was an error scheduling the function.
+   */
+  template<typename T>
+  bool runInEventBaseThread(void (*fn)(T*), T* arg) {
+    return runInEventBaseThread(reinterpret_cast<void (*)(void*)>(fn),
+                                reinterpret_cast<void*>(arg));
+  }
+
+  bool runInEventBaseThread(void (*fn)(void*), void* arg);
+
+  /**
+   * Run the specified function in the EventBase's thread
+   *
+   * This version of runInEventBaseThread() takes a std::function object.
+   * Note that this is less efficient than the version that takes a plain
+   * function pointer and void* argument, as it has to allocate memory to copy
+   * the std::function object.
+   *
+   * If the EventBase loop is terminated before it has a chance to run this
+   * function, the allocated memory will be leaked.  The caller is responsible
+   * for ensuring that the EventBase loop is not terminated before this
+   * function can run.
+   *
+   * The function must not throw any exceptions.
+   */
+  bool runInEventBaseThread(const Cob& fn);
+
+  /**
+   * Runs the given Cob at some time after the specified number of
+   * milliseconds.  (No guarantees exactly when.)
+   *
+   * @return  true iff the cob was successfully registered.
+   */
+  bool runAfterDelay(
+      const Cob& c,
+      int milliseconds,
+      TimeoutManager::InternalEnum = TimeoutManager::InternalEnum::NORMAL);
+
+  /**
+   * Set the maximum desired latency in us and provide a callback which will be
+   * called when that latency is exceeded.
+   */
+  void setMaxLatency(int64_t maxLatency, const Cob& maxLatencyCob) {
+    maxLatency_ = maxLatency;
+    maxLatencyCob_ = maxLatencyCob;
+  }
+
+  /**
+   * Set smoothing coefficient for loop load average; # of milliseconds
+   * for exp(-1) (1/2.71828...) decay.
+   */
+  void setLoadAvgMsec(uint32_t ms);
+
+  /**
+   * reset the load average to a desired value
+   */
+  void resetLoadAvg(double value = 0.0);
+
+  /**
+   * Get the average loop time in microseconds (an exponentially-smoothed ave)
+   */
+  double getAvgLoopTime() const {
+    return avgLoopTime_.get();
+  }
+
+  /**
+    * check if the event base loop is running.
+   */
+  bool isRunning() const {
+    return loopThread_.load(std::memory_order_relaxed) != 0;
+  }
+
+  /**
+   * wait until the event loop starts (after starting the event loop thread).
+   */
+  void waitUntilRunning();
+
+  int getNotificationQueueSize() const;
+
+  /**
+   * Verify that current thread is the EventBase thread, if the EventBase is
+   * running.
+   */
+  bool isInEventBaseThread() const {
+    auto tid = loopThread_.load(std::memory_order_relaxed);
+    return tid == 0 || pthread_equal(tid, pthread_self());
+  }
+
+  bool inRunningEventBaseThread() const {
+    return pthread_equal(
+      loopThread_.load(std::memory_order_relaxed), pthread_self());
+  }
+
+  // --------- interface to underlying libevent base ------------
+  // Avoid using these functions if possible.  These functions are not
+  // guaranteed to always be present if we ever provide alternative EventBase
+  // implementations that do not use libevent internally.
+  event_base* getLibeventBase() const { return evb_; }
+  static const char* getLibeventVersion() { return event_get_version(); }
+  static const char* getLibeventMethod() { return event_get_method(); }
+
+  /**
+   * only EventHandler/AsyncTimeout subclasses and ourselves should
+   * ever call this.
+   *
+   * This is used to mark the beginning of a new loop cycle by the
+   * first handler fired within that cycle.
+   *
+   */
+  bool bumpHandlingTime();
+
+  class SmoothLoopTime {
+   public:
+    explicit SmoothLoopTime(uint64_t timeInterval)
+      : expCoeff_(-1.0/timeInterval)
+      , value_(0.0)
+      , oldBusyLeftover_(0) {
+      VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
+    }
+
+    void setTimeInterval(uint64_t timeInterval);
+    void reset(double value = 0.0);
+
+    void addSample(int64_t idle, int64_t busy);
+
+    double get() const {
+      return value_;
+    }
+
+    void dampen(double factor) {
+      value_ *= factor;
+    }
+
+   private:
+    double  expCoeff_;
+    double  value_;
+    int64_t oldBusyLeftover_;
+  };
+
+  void setObserver(
+    const std::shared_ptr<EventBaseObserver>& observer) {
+    observer_ = observer;
+  }
+
+  const std::shared_ptr<EventBaseObserver>& getObserver() {
+    return observer_;
+  }
+
+  /**
+   * Set the name of the thread that runs this event base.
+   */
+  void setName(const std::string& name);
+
+  /**
+   * Returns the name of the thread that runs this event base.
+   */
+  const std::string& getName();
+
+ private:
+
+  // TimeoutManager
+  void attachTimeoutManager(AsyncTimeout* obj,
+                            TimeoutManager::InternalEnum internal);
+
+  void detachTimeoutManager(AsyncTimeout* obj);
+
+  bool scheduleTimeout(AsyncTimeout* obj, std::chrono::milliseconds timeout);
+
+  void cancelTimeout(AsyncTimeout* obj);
+
+  bool isInTimeoutManagerThread() {
+    return isInEventBaseThread();
+  }
+
+  // Helper class used to short circuit runInEventBaseThread
+  class RunInLoopCallback : public LoopCallback {
+   public:
+    RunInLoopCallback(void (*fn)(void*), void* arg);
+    void runLoopCallback() noexcept;
+
+   private:
+    void (*fn_)(void*);
+    void* arg_;
+  };
+
+  /*
+   * Helper function that tells us whether we have already handled
+   * some event/timeout/callback in this loop iteration.
+   */
+  bool nothingHandledYet();
+
+  // --------- libevent callbacks (not for client use) ------------
+
+  static void runTr1FunctionPtr(std::function<void()>* fn);
+
+  // small object used as a callback arg with enough info to execute the
+  // appropriate client-provided Cob
+  class CobTimeout : public AsyncTimeout {
+   public:
+    CobTimeout(EventBase* b, const Cob& c, TimeoutManager::InternalEnum in)
+        : AsyncTimeout(b, in), cob_(c) {}
+
+    virtual void timeoutExpired() noexcept;
+
+   private:
+    Cob cob_;
+
+   public:
+    typedef boost::intrusive::list_member_hook<
+      boost::intrusive::link_mode<boost::intrusive::auto_unlink> > ListHook;
+
+    ListHook hook;
+
+    typedef boost::intrusive::list<
+      CobTimeout,
+      boost::intrusive::member_hook<CobTimeout, ListHook, &CobTimeout::hook>,
+      boost::intrusive::constant_time_size<false> > List;
+  };
+
+  typedef LoopCallback::List LoopCallbackList;
+  class FunctionRunner;
+
+  // executes any callbacks queued by runInLoop(); returns false if none found
+  bool runLoopCallbacks(bool setContext = true);
+
+  void initNotificationQueue();
+
+  CobTimeout::List pendingCobTimeouts_;
+
+  LoopCallbackList loopCallbacks_;
+
+  // This will be null most of the time, but point to currentCallbacks
+  // if we are in the middle of running loop callbacks, such that
+  // runInLoop(..., true) will always run in the current loop
+  // iteration.
+  LoopCallbackList* runOnceCallbacks_;
+
+  // stop_ is set by terminateLoopSoon() and is used by the main loop
+  // to determine if it should exit
+  bool stop_;
+
+  // The ID of the thread running the main loop.
+  // 0 if loop is not running.
+  // Note: POSIX doesn't guarantee that 0 is an invalid pthread_t (or
+  // even that atomic<pthread_t> is valid), but that's how it is
+  // everywhere (at least on Linux, FreeBSD, and OSX).
+  std::atomic<pthread_t> loopThread_;
+
+  // pointer to underlying event_base class doing the heavy lifting
+  event_base* evb_;
+
+  // A notification queue for runInEventBaseThread() to use
+  // to send function requests to the EventBase thread.
+  std::unique_ptr<NotificationQueue<std::pair<void (*)(void*), void*>>> queue_;
+  std::unique_ptr<FunctionRunner> fnRunner_;
+
+  // limit for latency in microseconds (0 disables)
+  int64_t maxLatency_;
+
+  // exponentially-smoothed average loop time for latency-limiting
+  SmoothLoopTime avgLoopTime_;
+
+  // smoothed loop time used to invoke latency callbacks; differs from
+  // avgLoopTime_ in that it's scaled down after triggering a callback
+  // to reduce spamminess
+  SmoothLoopTime maxLatencyLoopTime_;
+
+  // callback called when latency limit is exceeded
+  Cob maxLatencyCob_;
+
+  // we'll wait this long before running deferred callbacks if the event
+  // loop is idle.
+  static const int kDEFAULT_IDLE_WAIT_USEC = 20000; // 20ms
+
+  // Wrap-around loop counter to detect beginning of each loop
+  uint64_t nextLoopCnt_;
+  uint64_t latestLoopCnt_;
+  uint64_t startWork_;
+
+  // Observer to export counters
+  std::shared_ptr<EventBaseObserver> observer_;
+  uint32_t observerSampleCount_;
+
+  // Name of the thread running this EventBase
+  std::string name_;
+};
+
+} // folly
diff --git a/folly/io/async/EventFDWrapper.h b/folly/io/async/EventFDWrapper.h
new file mode 100644 (file)
index 0000000..5df88fc
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Work around the lack of <sys/eventfd.h> on glibc 2.5.1 which we still
+ * need to support, sigh.
+ */
+
+#pragma once
+
+#include <features.h>
+
+// <sys/eventfd.h> doesn't exist on older glibc versions
+#if (defined(__GLIBC__) && __GLIBC_PREREQ(2, 9))
+#include <sys/eventfd.h>
+#else /* !(defined(__GLIBC__) && __GLIBC_PREREQ(2, 9)) */
+
+#include <sys/syscall.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+// Use existing __NR_eventfd2 if already defined
+// Values from the Linux kernel source:
+// arch/x86/include/asm/unistd_{32,64}.h
+#ifndef __NR_eventfd2
+#if defined(__x86_64__)
+#define __NR_eventfd2  290
+#elif defined(__i386__)
+#define __NR_eventfd2  328
+#else
+#error "Can't define __NR_eventfd2 for your architecture."
+#endif
+#endif
+
+enum
+  {
+    EFD_SEMAPHORE = 1,
+#define EFD_SEMAPHORE EFD_SEMAPHORE
+    EFD_CLOEXEC = 02000000,
+#define EFD_CLOEXEC EFD_CLOEXEC
+    EFD_NONBLOCK = 04000
+#define EFD_NONBLOCK EFD_NONBLOCK
+  };
+
+// http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
+// Use the eventfd2 system call, as in glibc 2.9+
+// (requires kernel 2.6.30+)
+#define eventfd(initval, flags) syscall(__NR_eventfd2,(initval),(flags))
+
+#endif /* !(defined(__GLIBC__) && __GLIBC_PREREQ(2, 9)) */
diff --git a/folly/io/async/EventHandler.cpp b/folly/io/async/EventHandler.cpp
new file mode 100644 (file)
index 0000000..4c9d970
--- /dev/null
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "folly/io/async/EventHandler.h"
+#include "folly/io/async/EventBase.h"
+
+#include <assert.h>
+
+namespace folly {
+
+EventHandler::EventHandler(EventBase* eventBase, int fd) {
+  event_set(&event_, fd, 0, &EventHandler::libeventCallback, this);
+  if (eventBase != nullptr) {
+    setEventBase(eventBase);
+  } else {
+    // Callers must set the EventBase and fd before using this timeout.
+    // Set event_->ev_base to nullptr to ensure that this happens.
+    // (otherwise libevent will initialize it to the "default" event_base)
+    event_.ev_base = nullptr;
+    eventBase_ = nullptr;
+  }
+}
+
+EventHandler::~EventHandler() {
+  unregisterHandler();
+}
+
+bool EventHandler::registerImpl(uint16_t events, bool internal) {
+  assert(event_.ev_base != nullptr);
+
+  // We have to unregister the event before we can change the event flags
+  if (isHandlerRegistered()) {
+    // If the new events are the same are the same as the already registered
+    // flags, we don't have to do anything.  Just return.
+    if (events == event_.ev_events &&
+        static_cast<bool>(event_.ev_flags & EVLIST_INTERNAL) == internal) {
+      return true;
+    }
+
+    event_del(&event_);
+  }
+
+  // Update the event flags
+  // Unfortunately, event_set() resets the event_base, so we have to remember
+  // it before hand, then pass it back into event_base_set() afterwards
+  struct event_base* evb = event_.ev_base;
+  event_set(&event_, event_.ev_fd, events,
+            &EventHandler::libeventCallback, this);
+  event_base_set(evb, &event_);
+
+  // Set EVLIST_INTERNAL if this is an internal event
+  if (internal) {
+    event_.ev_flags |= EVLIST_INTERNAL;
+  }
+
+  // Add the event.
+  //
+  // Although libevent allows events to wait on both I/O and a timeout,
+  // we intentionally don't allow an EventHandler to also use a timeout.
+  // Callers must maintain a separate AsyncTimeout object if they want a
+  // timeout.
+  //
+  // Otherwise, it is difficult to handle persistent events properly.  (The I/O
+  // event and timeout may both fire together the same time around the event
+  // loop.  Normally we would want to inform the caller of the I/O event first,
+  // then the timeout.  However, it is difficult to do this properly since the
+  // I/O callback could delete the EventHandler.)  Additionally, if a caller
+  // uses the same struct event for both I/O and timeout, and they just want to
+  // reschedule the timeout, libevent currently makes an epoll_ctl() call even
+  // if the I/O event flags haven't changed.  Using a separate event struct is
+  // therefore slightly more efficient in this case (although it does take up
+  // more space).
+  if (event_add(&event_, nullptr) < 0) {
+    LOG(ERROR) << "EventBase: failed to register event handler for fd "
+               << event_.ev_fd << ": " << strerror(errno);
+    // Call event_del() to make sure the event is completely uninstalled
+    event_del(&event_);
+    return false;
+  }
+
+  return true;
+}
+
+void EventHandler::unregisterHandler() {
+  if (isHandlerRegistered()) {
+    event_del(&event_);
+  }
+}
+
+void EventHandler::attachEventBase(EventBase* eventBase) {
+  // attachEventBase() may only be called on detached handlers
+  assert(event_.ev_base == nullptr);
+  assert(!isHandlerRegistered());
+  // This must be invoked from the EventBase's thread
+  assert(eventBase->isInEventBaseThread());
+
+  setEventBase(eventBase);
+}
+
+void EventHandler::detachEventBase() {
+  ensureNotRegistered(__func__);
+  event_.ev_base = nullptr;
+}
+
+void EventHandler::changeHandlerFD(int fd) {
+  ensureNotRegistered(__func__);
+  // event_set() resets event_base.ev_base, so manually restore it afterwards
+  struct event_base* evb = event_.ev_base;
+  event_set(&event_, fd, 0, &EventHandler::libeventCallback, this);
+  event_.ev_base = evb; // don't use event_base_set(), since evb may be nullptr
+}
+
+void EventHandler::initHandler(EventBase* eventBase, int fd) {
+  ensureNotRegistered(__func__);
+  event_set(&event_, fd, 0, &EventHandler::libeventCallback, this);
+  setEventBase(eventBase);
+}
+
+void EventHandler::ensureNotRegistered(const char* fn) {
+  // Neither the EventBase nor file descriptor may be changed while the
+  // handler is registered.  Treat it as a programmer bug and abort the program
+  // if this requirement is violated.
+  if (isHandlerRegistered()) {
+    LOG(ERROR) << fn << " called on registered handler; aborting";
+    abort();
+  }
+}
+
+void EventHandler::libeventCallback(int fd, short events, void* arg) {
+  EventHandler* handler = reinterpret_cast<EventHandler*>(arg);
+  assert(fd == handler->event_.ev_fd);
+
+  // this can't possibly fire if handler->eventBase_ is nullptr
+  (void) handler->eventBase_->bumpHandlingTime();
+
+  handler->handlerReady(events);
+}
+
+void EventHandler::setEventBase(EventBase* eventBase) {
+  event_base_set(eventBase->getLibeventBase(), &event_);
+  eventBase_ = eventBase;
+}
+
+bool EventHandler::isPending() {
+  if (event_.ev_flags & EVLIST_ACTIVE) {
+    if (event_.ev_res & EV_READ) {
+      return true;
+    }
+  }
+  return false;
+}
+
+} // folly
diff --git a/folly/io/async/EventHandler.h b/folly/io/async/EventHandler.h
new file mode 100644 (file)
index 0000000..3239c61
--- /dev/null
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <glog/logging.h>
+#include "folly/io/async/EventUtil.h"
+#include <boost/noncopyable.hpp>
+#include <stddef.h>
+
+namespace folly {
+
+class EventBase;
+
+/**
+ * The EventHandler class is used to asynchronously wait for events on a file
+ * descriptor.
+ *
+ * Users that wish to wait on I/O events should derive from EventHandler and
+ * implement the handlerReady() method.
+ */
+class EventHandler : private boost::noncopyable {
+ public:
+  enum EventFlags {
+    NONE = 0,
+    READ = EV_READ,
+    WRITE = EV_WRITE,
+    READ_WRITE = (READ | WRITE),
+    PERSIST = EV_PERSIST
+  };
+
+  /**
+   * Create a new EventHandler object.
+   *
+   * @param eventBase  The EventBase to use to drive this event handler.
+   *                   This may be nullptr, in which case the EventBase must be
+   *                   set separately using initHandler() or attachEventBase()
+   *                   before the handler can be registered.
+   * @param fd         The file descriptor that this EventHandler will
+   *                   monitor.  This may be -1, in which case the file
+   *                   descriptor must be set separately using initHandler() or
+   *                   changeHandlerFD() before the handler can be registered.
+   */
+  explicit EventHandler(EventBase* eventBase = nullptr, int fd = -1);
+
+  /**
+   * EventHandler destructor.
+   *
+   * The event will be automatically unregistered if it is still registered.
+   */
+  virtual ~EventHandler();
+
+  /**
+   * handlerReady() is invoked when the handler is ready.
+   *
+   * @param events  A bitset indicating the events that are ready.
+   */
+  virtual void handlerReady(uint16_t events) noexcept = 0;
+
+  /**
+   * Register the handler.
+   *
+   * If the handler is already registered, the registration will be updated
+   * to wait on the new set of events.
+   *
+   * @param events   A bitset specifying the events to monitor.
+   *                 If the PERSIST bit is set, the handler will remain
+   *                 registered even after handlerReady() is called.
+   *
+   * @return Returns true if the handler was successfully registered,
+   *         or false if an error occurred.  After an error, the handler is
+   *         always unregistered, even if it was already registered prior to
+   *         this call to registerHandler().
+   */
+  bool registerHandler(uint16_t events) {
+    return registerImpl(events, false);
+  }
+
+  /**
+   * Unregister the handler, if it is registered.
+   */
+  void unregisterHandler();
+
+  /**
+   * Returns true if the handler is currently registered.
+   */
+  bool isHandlerRegistered() const {
+    return EventUtil::isEventRegistered(&event_);
+  }
+
+  /**
+   * Attach the handler to a EventBase.
+   *
+   * This may only be called if the handler is not currently attached to a
+   * EventBase (either by using the default constructor, or by calling
+   * detachEventBase()).
+   *
+   * This method must be invoked in the EventBase's thread.
+   */
+  void attachEventBase(EventBase* eventBase);
+
+  /**
+   * Detach the handler from its EventBase.
+   *
+   * This may only be called when the handler is not currently registered.
+   * Once detached, the handler may not be registered again until it is
+   * re-attached to a EventBase by calling attachEventBase().
+   *
+   * This method must be called from the current EventBase's thread.
+   */
+  void detachEventBase();
+
+  /**
+   * Change the file descriptor that this handler is associated with.
+   *
+   * This may only be called when the handler is not currently registered.
+   */
+  void changeHandlerFD(int fd);
+
+  /**
+   * Attach the handler to a EventBase, and change the file descriptor.
+   *
+   * This method may only be called if the handler is not currently attached to
+   * a EventBase.  This is primarily intended to be used to initialize
+   * EventHandler objects created using the default constructor.
+   */
+  void initHandler(EventBase* eventBase, int fd);
+
+  /**
+   * Return the set of events that we're currently registered for.
+   */
+  uint16_t getRegisteredEvents() const {
+    return (isHandlerRegistered()) ?
+      event_.ev_events : 0;
+  }
+
+  /**
+   * Register the handler as an internal event.
+   *
+   * This event will not count as an active event for determining if the
+   * EventBase loop has more events to process.  The EventBase loop runs
+   * only as long as there are active EventHandlers, however "internal" event
+   * handlers are not counted.  Therefore this event handler will not prevent
+   * EventBase loop from exiting with no more work to do if there are no other
+   * non-internal event handlers registered.
+   *
+   * This is intended to be used only in very rare cases by the internal
+   * EventBase code.  This API is not guaranteed to remain stable or portable
+   * in the future.
+   */
+  bool registerInternalHandler(uint16_t events) {
+    return registerImpl(events, true);
+  }
+
+  bool isPending();
+
+ private:
+  bool registerImpl(uint16_t events, bool internal);
+  void ensureNotRegistered(const char* fn);
+
+  void setEventBase(EventBase* eventBase);
+
+  static void libeventCallback(int fd, short events, void* arg);
+
+  struct event event_;
+  EventBase* eventBase_;
+};
+
+} // folly
diff --git a/folly/io/async/EventUtil.h b/folly/io/async/EventUtil.h
new file mode 100644 (file)
index 0000000..2942f97
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <event.h>  // libevent
+
+namespace folly {
+
+/**
+ * low-level libevent utility functions
+ */
+class EventUtil {
+ public:
+  static bool isEventRegistered(const struct event* ev) {
+    // If any of these flags are set, the event is registered.
+    enum {
+      EVLIST_REGISTERED = (EVLIST_INSERTED | EVLIST_ACTIVE |
+                           EVLIST_TIMEOUT | EVLIST_SIGNAL)
+    };
+    return (ev->ev_flags & EVLIST_REGISTERED);
+  }
+};
+
+} // folly
diff --git a/folly/io/async/NotificationQueue.h b/folly/io/async/NotificationQueue.h
new file mode 100644 (file)
index 0000000..431f933
--- /dev/null
@@ -0,0 +1,676 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "folly/io/async/EventBase.h"
+#include "folly/io/async/EventFDWrapper.h"
+#include "folly/io/async/EventHandler.h"
+#include "folly/io/async/Request.h"
+#include "folly/Likely.h"
+#include "folly/SmallLocks.h"
+
+#include <glog/logging.h>
+#include <deque>
+
+namespace folly {
+
+/**
+ * A producer-consumer queue for passing messages between EventBase threads.
+ *
+ * Messages can be added to the queue from any thread.  Multiple consumers may
+ * listen to the queue from multiple EventBase threads.
+ *
+ * A NotificationQueue may not be destroyed while there are still consumers
+ * registered to receive events from the queue.  It is the user's
+ * responsibility to ensure that all consumers are unregistered before the
+ * queue is destroyed.
+ *
+ * MessageT should be MoveConstructible (i.e., must support either a move
+ * constructor or a copy constructor, or both).  Ideally it's move constructor
+ * (or copy constructor if no move constructor is provided) should never throw
+ * exceptions.  If the constructor may throw, the consumers could end up
+ * spinning trying to move a message off the queue and failing, and then
+ * retrying.
+ */
+template<typename MessageT>
+class NotificationQueue {
+ public:
+  /**
+   * A callback interface for consuming messages from the queue as they arrive.
+   */
+  class Consumer : private EventHandler {
+   public:
+    enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
+
+    Consumer()
+      : queue_(nullptr),
+        destroyedFlagPtr_(nullptr),
+        maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
+
+    virtual ~Consumer();
+
+    /**
+     * messageAvailable() will be invoked whenever a new
+     * message is available from the pipe.
+     */
+    virtual void messageAvailable(MessageT&& message) = 0;
+
+    /**
+     * Begin consuming messages from the specified queue.
+     *
+     * messageAvailable() will be called whenever a message is available.  This
+     * consumer will continue to consume messages until stopConsuming() is
+     * called.
+     *
+     * A Consumer may only consume messages from a single NotificationQueue at
+     * a time.  startConsuming() should not be called if this consumer is
+     * already consuming.
+     */
+    void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
+      init(eventBase, queue);
+      registerHandler(READ | PERSIST);
+    }
+
+    /**
+     * Same as above but registers this event handler as internal so that it
+     * doesn't count towards the pending reader count for the IOLoop.
+     */
+    void startConsumingInternal(
+        EventBase* eventBase, NotificationQueue* queue) {
+      init(eventBase, queue);
+      registerInternalHandler(READ | PERSIST);
+    }
+
+    /**
+     * Stop consuming messages.
+     *
+     * startConsuming() may be called again to resume consumption of messages
+     * at a later point in time.
+     */
+    void stopConsuming();
+
+    /**
+     * Get the NotificationQueue that this consumer is currently consuming
+     * messages from.  Returns nullptr if the consumer is not currently
+     * consuming events from any queue.
+     */
+    NotificationQueue* getCurrentQueue() const {
+      return queue_;
+    }
+
+    /**
+     * Set a limit on how many messages this consumer will read each iteration
+     * around the event loop.
+     *
+     * This helps rate-limit how much work the Consumer will do each event loop
+     * iteration, to prevent it from starving other event handlers.
+     *
+     * A limit of 0 means no limit will be enforced.  If unset, the limit
+     * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
+     */
+    void setMaxReadAtOnce(uint32_t maxAtOnce) {
+      maxReadAtOnce_ = maxAtOnce;
+    }
+    uint32_t getMaxReadAtOnce() const {
+      return maxReadAtOnce_;
+    }
+
+    EventBase* getEventBase() {
+      return base_;
+    }
+
+    virtual void handlerReady(uint16_t events) noexcept;
+
+   private:
+    void init(EventBase* eventBase, NotificationQueue* queue);
+
+    NotificationQueue* queue_;
+    bool* destroyedFlagPtr_;
+    uint32_t maxReadAtOnce_;
+    EventBase* base_;
+  };
+
+  enum class FdType {
+    EVENTFD,
+    PIPE
+  };
+
+  /**
+   * Create a new NotificationQueue.
+   *
+   * If the maxSize parameter is specified, this sets the maximum queue size
+   * that will be enforced by tryPutMessage().  (This size is advisory, and may
+   * be exceeded if producers explicitly use putMessage() instead of
+   * tryPutMessage().)
+   *
+   * The fdType parameter determines the type of file descriptor used
+   * internally to signal message availability.  The default (eventfd) is
+   * preferable for performance and because it won't fail when the queue gets
+   * too long.  It is not available on on older and non-linux kernels, however.
+   * In this case the code will fall back to using a pipe, the parameter is
+   * mostly for testing purposes.
+   */
+  explicit NotificationQueue(uint32_t maxSize = 0,
+                              FdType fdType = FdType::EVENTFD)
+    : eventfd_(-1),
+      pipeFds_{-1, -1},
+      advisoryMaxQueueSize_(maxSize),
+      pid_(getpid()),
+      queue_() {
+
+    spinlock_.init();
+
+    RequestContext::getStaticContext();
+
+    if (fdType == FdType::EVENTFD) {
+      eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
+      if (eventfd_ == -1) {
+        if (errno == ENOSYS || errno == EINVAL) {
+          // eventfd not availalble
+          LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
+                     << errno << ", falling back to pipe mode (is your kernel "
+                     << "> 2.6.30?)";
+          fdType = FdType::PIPE;
+        } else {
+          // some other error
+          folly::throwSystemError("Failed to create eventfd for "
+                                  "NotificationQueue", errno);
+        }
+      }
+    }
+    if (fdType == FdType::PIPE) {
+      if (pipe(pipeFds_)) {
+        folly::throwSystemError("Failed to create pipe for NotificationQueue",
+                                errno);
+      }
+      try {
+        // put both ends of the pipe into non-blocking mode
+        if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
+          folly::throwSystemError("failed to put NotificationQueue pipe read "
+                                  "endpoint into non-blocking mode", errno);
+        }
+        if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
+          folly::throwSystemError("failed to put NotificationQueue pipe write "
+                                  "endpoint into non-blocking mode", errno);
+        }
+      } catch (...) {
+        ::close(pipeFds_[0]);
+        ::close(pipeFds_[1]);
+        throw;
+      }
+    }
+  }
+
+  ~NotificationQueue() {
+    if (eventfd_ >= 0) {
+      ::close(eventfd_);
+      eventfd_ = -1;
+    }
+    if (pipeFds_[0] >= 0) {
+      ::close(pipeFds_[0]);
+      pipeFds_[0] = -1;
+    }
+    if (pipeFds_[1] >= 0) {
+      ::close(pipeFds_[1]);
+      pipeFds_[1] = -1;
+    }
+  }
+
+  /**
+   * Set the advisory maximum queue size.
+   *
+   * This maximum queue size affects calls to tryPutMessage().  Message
+   * producers can still use the putMessage() call to unconditionally put a
+   * message on the queue, ignoring the configured maximum queue size.  This
+   * can cause the queue size to exceed the configured maximum.
+   */
+  void setMaxQueueSize(uint32_t max) {
+    advisoryMaxQueueSize_ = max;
+  }
+
+  /**
+   * Attempt to put a message on the queue if the queue is not already full.
+   *
+   * If the queue is full, a std::overflow_error will be thrown.  The
+   * setMaxQueueSize() function controls the maximum queue size.
+   *
+   * This method may contend briefly on a spinlock if many threads are
+   * concurrently accessing the queue, but for all intents and purposes it will
+   * immediately place the message on the queue and return.
+   *
+   * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
+   * may throw any other exception thrown by the MessageT move/copy
+   * constructor.
+   */
+  void tryPutMessage(MessageT&& message) {
+    putMessageImpl(std::move(message), advisoryMaxQueueSize_);
+  }
+  void tryPutMessage(const MessageT& message) {
+    putMessageImpl(message, advisoryMaxQueueSize_);
+  }
+
+  /**
+   * No-throw versions of the above.  Instead returns true on success, false on
+   * failure.
+   *
+   * Only std::overflow_error is prevented from being thrown (since this is the
+   * common exception case), user code must still catch std::bad_alloc errors.
+   */
+  bool tryPutMessageNoThrow(MessageT&& message) {
+    return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false);
+  }
+  bool tryPutMessageNoThrow(const MessageT& message) {
+    return putMessageImpl(message, advisoryMaxQueueSize_, false);
+  }
+
+  /**
+   * Unconditionally put a message on the queue.
+   *
+   * This method is like tryPutMessage(), but ignores the maximum queue size
+   * and always puts the message on the queue, even if the maximum queue size
+   * would be exceeded.
+   *
+   * putMessage() may throw std::bad_alloc if memory allocation fails, and may
+   * throw any other exception thrown by the MessageT move/copy constructor.
+   */
+  void putMessage(MessageT&& message) {
+    putMessageImpl(std::move(message), 0);
+  }
+  void putMessage(const MessageT& message) {
+    putMessageImpl(message, 0);
+  }
+
+  /**
+   * Put several messages on the queue.
+   */
+  template<typename InputIteratorT>
+  void putMessages(InputIteratorT first, InputIteratorT last) {
+    typedef typename std::iterator_traits<InputIteratorT>::iterator_category
+      IterCategory;
+    putMessagesImpl(first, last, IterCategory());
+  }
+
+  /**
+   * Try to immediately pull a message off of the queue, without blocking.
+   *
+   * If a message is immediately available, the result parameter will be
+   * updated to contain the message contents and true will be returned.
+   *
+   * If no message is available, false will be returned and result will be left
+   * unmodified.
+   */
+  bool tryConsume(MessageT& result) {
+    checkPid();
+    if (!tryConsumeEvent()) {
+      return false;
+    }
+
+    try {
+
+      folly::MSLGuard g(spinlock_);
+
+      // This shouldn't happen normally.  See the comments in
+      // Consumer::handlerReady() for more details.
+      if (UNLIKELY(queue_.empty())) {
+        LOG(ERROR) << "found empty queue after signalled event";
+        return false;
+      }
+
+      auto data = std::move(queue_.front());
+      result = data.first;
+      RequestContext::setContext(data.second);
+
+      queue_.pop_front();
+    } catch (...) {
+      // Handle an exception if the assignment operator happens to throw.
+      // We consumed an event but weren't able to pop the message off the
+      // queue.  Signal the event again since the message is still in the
+      // queue.
+      signalEvent(1);
+      throw;
+    }
+
+    return true;
+  }
+
+  int size() {
+    folly::MSLGuard g(spinlock_);
+    return queue_.size();
+  }
+
+  /**
+   * Check that the NotificationQueue is being used from the correct process.
+   *
+   * If you create a NotificationQueue in one process, then fork, and try to
+   * send messages to the queue from the child process, you're going to have a
+   * bad time.  Unfortunately users have (accidentally) run into this.
+   *
+   * Because we use an eventfd/pipe, the child process can actually signal the
+   * parent process that an event is ready.  However, it can't put anything on
+   * the parent's queue, so the parent wakes up and finds an empty queue.  This
+   * check ensures that we catch the problem in the misbehaving child process
+   * code, and crash before signalling the parent process.
+   */
+  void checkPid() const {
+    CHECK_EQ(pid_, getpid());
+  }
+
+ private:
+  // Forbidden copy constructor and assignment operator
+  NotificationQueue(NotificationQueue const &) = delete;
+  NotificationQueue& operator=(NotificationQueue const &) = delete;
+
+  inline bool checkQueueSize(size_t maxSize, bool throws=true) const {
+    DCHECK(0 == spinlock_.try_lock());
+    if (maxSize > 0 && queue_.size() >= maxSize) {
+      if (throws) {
+        throw std::overflow_error("unable to add message to NotificationQueue: "
+                                  "queue is full");
+      }
+      return false;
+    }
+    return true;
+  }
+
+  inline void signalEvent(size_t numAdded = 1) const {
+    static const uint8_t kPipeMessage[] = {
+      1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
+    };
+
+    ssize_t bytes_written = 0;
+    ssize_t bytes_expected = 0;
+    if (eventfd_ >= 0) {
+      // eventfd(2) dictates that we must write a 64-bit integer
+      uint64_t numAdded64(numAdded);
+      bytes_expected = static_cast<ssize_t>(sizeof(numAdded64));
+      bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64));
+    } else {
+      // pipe semantics, add one message for each numAdded
+      bytes_expected = numAdded;
+      do {
+        size_t messageSize = std::min(numAdded, sizeof(kPipeMessage));
+        ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize);
+        if (rc < 0) {
+          // TODO: if the pipe is full, write will fail with EAGAIN.
+          // See task #1044651 for how this could be handled
+          break;
+        }
+        numAdded -= rc;
+        bytes_written += rc;
+      } while (numAdded > 0);
+    }
+    if (bytes_written != bytes_expected) {
+      folly::throwSystemError("failed to signal NotificationQueue after "
+                              "write", errno);
+    }
+  }
+
+  bool tryConsumeEvent() {
+    uint64_t value = 0;
+    ssize_t rc = -1;
+    if (eventfd_ >= 0) {
+      rc = ::read(eventfd_, &value, sizeof(value));
+    } else {
+      uint8_t value8;
+      rc = ::read(pipeFds_[0], &value8, sizeof(value8));
+      value = value8;
+    }
+    if (rc < 0) {
+      // EAGAIN should pretty much be the only error we can ever get.
+      // This means someone else already processed the only available message.
+      assert(errno == EAGAIN);
+      return false;
+    }
+    assert(value == 1);
+    return true;
+  }
+
+  bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) {
+    checkPid();
+    {
+      folly::MSLGuard g(spinlock_);
+      if (!checkQueueSize(maxSize, throws)) {
+        return false;
+      }
+      queue_.push_back(
+        std::make_pair(std::move(message),
+                       RequestContext::saveContext()));
+    }
+    signalEvent();
+    return true;
+  }
+
+  bool putMessageImpl(
+    const MessageT& message, size_t maxSize, bool throws=true) {
+    checkPid();
+    {
+      folly::MSLGuard g(spinlock_);
+      if (!checkQueueSize(maxSize, throws)) {
+        return false;
+      }
+      queue_.push_back(std::make_pair(message, RequestContext::saveContext()));
+    }
+    signalEvent();
+    return true;
+  }
+
+  template<typename InputIteratorT>
+  void putMessagesImpl(InputIteratorT first, InputIteratorT last,
+                       std::input_iterator_tag) {
+    checkPid();
+    size_t numAdded = 0;
+    {
+      folly::MSLGuard g(spinlock_);
+      while (first != last) {
+        queue_.push_back(std::make_pair(*first, RequestContext::saveContext()));
+        ++first;
+        ++numAdded;
+      }
+    }
+    signalEvent(numAdded);
+  }
+
+  mutable folly::MicroSpinLock spinlock_;
+  int eventfd_;
+  int pipeFds_[2]; // to fallback to on older/non-linux systems
+  uint32_t advisoryMaxQueueSize_;
+  pid_t pid_;
+  std::deque<std::pair<MessageT, std::shared_ptr<RequestContext>>> queue_;
+};
+
+template<typename MessageT>
+NotificationQueue<MessageT>::Consumer::~Consumer() {
+  // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
+  // will be non-nullptr.  Mark the value that it points to, so that
+  // handlerReady() will know the callback is destroyed, and that it cannot
+  // access any member variables anymore.
+  if (destroyedFlagPtr_) {
+    *destroyedFlagPtr_ = true;
+  }
+}
+
+template<typename MessageT>
+void NotificationQueue<MessageT>::Consumer::handlerReady(uint16_t events)
+    noexcept {
+  uint32_t numProcessed = 0;
+  while (true) {
+    // Try to decrement the eventfd.
+    //
+    // We decrement the eventfd before checking the queue, and only pop a
+    // message off the queue if we read from the eventfd.
+    //
+    // Reading the eventfd first allows us to not have to hold the spinlock
+    // while accessing the eventfd.  If we popped from the queue first, we
+    // would have to hold the lock while reading from or writing to the
+    // eventfd.  (Multiple consumers may be woken up from a single eventfd
+    // notification.  If we popped from the queue first, we could end up
+    // popping a message from the queue before the eventfd has been notified by
+    // the producer, unless the consumer and producer both held the spinlock
+    // around the entire operation.)
+    if (!queue_->tryConsumeEvent()) {
+      // no message available right now
+      return;
+    }
+
+    // Now pop the message off of the queue.
+    // We successfully consumed the eventfd notification.
+    // There should be a message available for us to consume.
+    //
+    // We have to manually acquire and release the spinlock here, rather than
+    // using SpinLockHolder since the MessageT has to be constructed while
+    // holding the spinlock and available after we release it.  SpinLockHolder
+    // unfortunately doesn't provide a release() method.  (We can't construct
+    // MessageT first since we have no guarantee that MessageT has a default
+    // constructor.
+    queue_->spinlock_.lock();
+    bool locked = true;
+
+    try {
+      // The eventfd is incremented once for every message, and only
+      // decremented when a message is popped off.  There should always be a
+      // message here to read.
+      if (UNLIKELY(queue_->queue_.empty())) {
+        // Unfortunately we have seen this happen in practice if a user forks
+        // the process, and then the child tries to send a message to a
+        // NotificationQueue being monitored by a thread in the parent.
+        // The child can signal the parent via the eventfd, but won't have been
+        // able to put anything on the parent's queue since it has a separate
+        // address space.
+        //
+        // This is a bug in the sender's code.  putMessagesImpl() should cause
+        // the sender to crash now before trying to send a message from the
+        // wrong process.  However, just in case let's handle this case in the
+        // consumer without crashing.
+        LOG(ERROR) << "found empty queue after signalled event";
+        queue_->spinlock_.unlock();
+        return;
+      }
+
+      // Pull a message off the queue.
+      auto& data = queue_->queue_.front();
+
+      MessageT msg(std::move(data.first));
+      auto old_ctx =
+        RequestContext::setContext(data.second);
+      queue_->queue_.pop_front();
+
+      // Check to see if the queue is empty now.
+      // We use this as an optimization to see if we should bother trying to
+      // loop again and read another message after invoking this callback.
+      bool wasEmpty = queue_->queue_.empty();
+
+      // Now unlock the spinlock before we invoke the callback.
+      queue_->spinlock_.unlock();
+      locked = false;
+
+      // Call the callback
+      bool callbackDestroyed = false;
+      CHECK(destroyedFlagPtr_ == nullptr);
+      destroyedFlagPtr_ = &callbackDestroyed;
+      messageAvailable(std::move(msg));
+
+      RequestContext::setContext(old_ctx);
+
+      // If the callback was destroyed before it returned, we are done
+      if (callbackDestroyed) {
+        return;
+      }
+      destroyedFlagPtr_ = nullptr;
+
+      // If the callback is no longer installed, we are done.
+      if (queue_ == nullptr) {
+        return;
+      }
+
+      // If we have hit maxReadAtOnce_, we are done.
+      ++numProcessed;
+      if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
+        return;
+      }
+
+      // If the queue was empty before we invoked the callback, it's probable
+      // that it is still empty now.  Just go ahead and return, rather than
+      // looping again and trying to re-read from the eventfd.  (If a new
+      // message had in fact arrived while we were invoking the callback, we
+      // will simply be woken up the next time around the event loop and will
+      // process the message then.)
+      if (wasEmpty) {
+        return;
+      }
+    } catch (const std::exception& ex) {
+      // This catch block is really just to handle the case where the MessageT
+      // constructor throws.  The messageAvailable() callback itself is
+      // declared as noexcept and should never throw.
+      //
+      // If the MessageT constructor does throw we try to handle it as best as
+      // we can, but we can't work miracles.  We will just ignore the error for
+      // now and return.  The next time around the event loop we will end up
+      // trying to read the message again.  If MessageT continues to throw we
+      // will never make forward progress and will keep trying each time around
+      // the event loop.
+      if (locked) {
+        // Unlock the spinlock.
+        queue_->spinlock_.unlock();
+
+        // Push a notification back on the eventfd since we didn't actually
+        // read the message off of the queue.
+        queue_->signalEvent(1);
+      }
+
+      return;
+    }
+  }
+}
+
+template<typename MessageT>
+void NotificationQueue<MessageT>::Consumer::init(
+    EventBase* eventBase,
+    NotificationQueue* queue) {
+  assert(eventBase->isInEventBaseThread());
+  assert(queue_ == nullptr);
+  assert(!isHandlerRegistered());
+  queue->checkPid();
+
+  base_ = eventBase;
+
+  queue_ = queue;
+  if (queue_->eventfd_ >= 0) {
+    initHandler(eventBase, queue_->eventfd_);
+  } else {
+    initHandler(eventBase, queue_->pipeFds_[0]);
+  }
+}
+
+template<typename MessageT>
+void NotificationQueue<MessageT>::Consumer::stopConsuming() {
+  if (queue_ == nullptr) {
+    assert(!isHandlerRegistered());
+    return;
+  }
+
+  assert(isHandlerRegistered());
+  unregisterHandler();
+  detachEventBase();
+  queue_ = nullptr;
+}
+
+} // folly
diff --git a/folly/io/async/Request.cpp b/folly/io/async/Request.cpp
new file mode 100644 (file)
index 0000000..97e2dda
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "folly/io/async/Request.h"
+
+#ifndef NO_LIB_GFLAGS
+  DEFINE_bool(enable_request_context, true,
+              "Enable collection of per-request queueing stats for thrift");
+#endif
+
+namespace folly {
+
+#ifdef NO_LIB_GFLAGS
+  bool FLAGS_enable_thrift_request_context = true;
+#endif
+
+RequestContext* defaultContext;
+
+}
diff --git a/folly/io/async/Request.h b/folly/io/async/Request.h
new file mode 100644 (file)
index 0000000..d1e853c
--- /dev/null
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <map>
+#include <memory>
+#include <glog/logging.h>
+#include "folly/ThreadLocal.h"
+#include "folly/RWSpinLock.h"
+
+/**
+ * In many cases this header is included as a
+ * dependency to libraries which do not need
+ * command line flags. GFLAGS is a large binary
+ * and thus we do this so that a library which
+ * is size sensitive doesn't have to pull in
+ * GFLAGS if it doesn't want to.
+ */
+#ifndef NO_LIB_GFLAGS
+  #include <gflags/gflags.h>
+  DECLARE_bool(enable_request_context);
+#endif
+
+namespace folly {
+
+#ifdef NO_LIB_GFLAGS
+  extern bool FLAGS_enable_request_context;
+#endif
+
+// Some request context that follows an async request through a process
+// Everything in the context must be thread safe
+
+class RequestData {
+ public:
+  virtual ~RequestData() {}
+};
+
+class RequestContext;
+
+// If you do not call create() to create a unique request context,
+// this default request context will always be returned, and is never
+// copied between threads.
+extern RequestContext* defaultContext;
+
+class RequestContext {
+ public:
+  // Create a unique requext context for this request.
+  // It will be passed between queues / threads (where implemented),
+  // so it should be valid for the lifetime of the request.
+  static bool create() {
+    if(!FLAGS_enable_request_context) {
+      return false;
+    }
+    bool prev = getStaticContext().get() != nullptr;
+    getStaticContext().reset(new std::shared_ptr<RequestContext>(
+                     std::make_shared<RequestContext>()));
+    return prev;
+  }
+
+  // Get the current context.
+  static RequestContext* get() {
+    if (!FLAGS_enable_request_context ||
+        getStaticContext().get() == nullptr) {
+      if (defaultContext == nullptr) {
+        defaultContext = new RequestContext;
+      }
+      return defaultContext;
+    }
+    return getStaticContext().get()->get();
+  }
+
+  // The following API may be used to set per-request data in a thread-safe way.
+  // This access is still performance sensitive, so please ask if you need help
+  // profiling any use of these functions.
+  void setContextData(
+    const std::string& val, std::unique_ptr<RequestData> data) {
+    if (!FLAGS_enable_request_context) {
+      return;
+    }
+
+    folly::RWSpinLock::WriteHolder guard(lock);
+    if (data_.find(val) != data_.end()) {
+      LOG_FIRST_N(WARNING, 1) <<
+        "Called RequestContext::setContextData with data already set";
+
+      data_[val] = nullptr;
+    } else {
+      data_[val] = std::move(data);
+    }
+  }
+
+  bool hasContextData(const std::string& val) {
+    folly::RWSpinLock::ReadHolder guard(lock);
+    return data_.find(val) != data_.end();
+  }
+
+  RequestData* getContextData(const std::string& val) {
+    folly::RWSpinLock::ReadHolder guard(lock);
+    auto r = data_.find(val);
+    if (r == data_.end()) {
+      return nullptr;
+    } else {
+      return r->second.get();
+    }
+  }
+
+  void clearContextData(const std::string& val) {
+    folly::RWSpinLock::WriteHolder guard(lock);
+    data_.erase(val);
+  }
+
+  // The following API is used to pass the context through queues / threads.
+  // saveContext is called to geta shared_ptr to the context, and
+  // setContext is used to reset it on the other side of the queue.
+  //
+  // A shared_ptr is used, because many request may fan out across
+  // multiple threads, or do post-send processing, etc.
+
+  static std::shared_ptr<RequestContext>
+  setContext(std::shared_ptr<RequestContext> ctx) {
+    if (FLAGS_enable_request_context) {
+      std::shared_ptr<RequestContext> old_ctx;
+      if (getStaticContext().get()) {
+        old_ctx = *getStaticContext().get();
+      }
+      if (ctx == nullptr) {
+        getStaticContext().reset(nullptr);
+      } else {
+        getStaticContext().reset(new std::shared_ptr<RequestContext>(ctx));
+      }
+      return old_ctx;
+    }
+    return std::shared_ptr<RequestContext>();
+  }
+
+  static std::shared_ptr<RequestContext> saveContext() {
+    if (!FLAGS_enable_request_context) {
+      return std::shared_ptr<RequestContext>();
+    }
+    if (getStaticContext().get() == nullptr) {
+      return std::shared_ptr<RequestContext>();
+    } else {
+      return *getStaticContext().get();
+    }
+  }
+
+  // Used to solve static destruction ordering issue.  Any static object
+  // that uses RequestContext must call this function in its constructor.
+  //
+  // See below link for more details.
+  // http://stackoverflow.com/questions/335369/
+  // finding-c-static-initialization-order-problems#335746
+  static folly::ThreadLocalPtr<std::shared_ptr<RequestContext>>&
+  getStaticContext() {
+    static folly::ThreadLocalPtr<std::shared_ptr<RequestContext> > context;
+    return context;
+  }
+
+ private:
+  folly::RWSpinLock lock;
+  std::map<std::string, std::unique_ptr<RequestData>> data_;
+};
+
+/**
+ * Set the request context for a specific scope. For example,
+ * if you ran a part of a request in another thread you could
+ * use RequestContextGuard to copy apply the request context
+ * inside the other therad.
+ */
+class RequestContextGuard {
+ public:
+  explicit RequestContextGuard(std::shared_ptr<RequestContext> ctx) {
+    oldctx_ = RequestContext::setContext(std::move(ctx));
+  }
+
+  ~RequestContextGuard() {
+    RequestContext::setContext(std::move(oldctx_));
+  }
+
+ private:
+  std::shared_ptr<RequestContext> oldctx_;
+};
+
+}
diff --git a/folly/io/async/TimeoutManager.h b/folly/io/async/TimeoutManager.h
new file mode 100644 (file)
index 0000000..e54ef20
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <chrono>
+#include <stdint.h>
+
+namespace folly {
+
+class AsyncTimeout;
+
+/**
+ * Base interface to be implemented by all classes expecting to manage
+ * timeouts. AsyncTimeout will use implementations of this interface
+ * to schedule/cancel timeouts.
+ */
+class TimeoutManager {
+ public:
+  enum class InternalEnum {
+    INTERNAL,
+    NORMAL
+  };
+
+  virtual ~TimeoutManager() {}
+
+  /**
+   * Attaches/detaches TimeoutManager to AsyncTimeout
+   */
+  virtual void attachTimeoutManager(AsyncTimeout* obj,
+                                    InternalEnum internal) = 0;
+  virtual void detachTimeoutManager(AsyncTimeout* obj) = 0;
+
+  /**
+   * Schedules AsyncTimeout to fire after `timeout` milliseconds
+   */
+  virtual bool scheduleTimeout(AsyncTimeout* obj,
+                               std::chrono::milliseconds timeout) = 0;
+
+  /**
+   * Cancels the AsyncTimeout, if scheduled
+   */
+  virtual void cancelTimeout(AsyncTimeout* obj) = 0;
+
+  /**
+   * This is used to mark the beginning of a new loop cycle by the
+   * first handler fired within that cycle.
+   */
+  virtual bool bumpHandlingTime() = 0;
+
+  /**
+   * Helper method to know whether we are running in the timeout manager
+   * thread
+   */
+  virtual bool isInTimeoutManagerThread() = 0;
+};
+
+} // folly