/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2014-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
#pragma once
#include <atomic>
+#include <cerrno>
+#include <cmath>
#include <cstdlib>
-#include <errno.h>
#include <functional>
#include <list>
-#include <math.h>
#include <memory>
#include <mutex>
#include <queue>
#include <boost/intrusive/list.hpp>
#include <boost/utility.hpp>
+#include <glog/logging.h>
#include <folly/Executor.h>
#include <folly/Function.h>
#include <folly/Portability.h>
#include <folly/ScopeGuard.h>
+#include <folly/executors/DrivableExecutor.h>
#include <folly/experimental/ExecutionObserver.h>
-#include <folly/futures/DrivableExecutor.h>
#include <folly/io/async/AsyncTimeout.h>
#include <folly/io/async/HHWheelTimer.h>
#include <folly/io/async/Request.h>
#include <folly/io/async/TimeoutManager.h>
-#include <folly/portability/PThread.h>
-#include <glog/logging.h>
-
-#include <event.h> // libevent
+#include <folly/portability/Event.h>
+#include <folly/synchronization/CallOnce.h>
namespace folly {
virtual void onEventBaseDestruction(EventBase& evb) = 0;
virtual ~EventBaseLocalBaseBase() = default;
};
-}
+} // namespace detail
template <typename T>
class EventBaseLocal;
std::unique_ptr<RequestEventBase>(new RequestEventBase(eb)));
}
+ bool hasCallback() override {
+ return false;
+ }
+
private:
explicit RequestEventBase(EventBase* eb) : eb_(eb) {}
EventBase* eb_;
virtual void runLoopCallback() noexcept = 0;
void cancelLoopCallback() {
+ context_.reset();
unlink();
}
Func function_;
};
+ /**
+ * Create a new EventBase object.
+ *
+ * Same as EventBase(true), which constructs an EventBase that measures time.
+ */
+ EventBase() : EventBase(true) {}
+
/**
* Create a new EventBase object.
*
* that relies on time-measurement, including:
* observer, max latency and avg loop time.
*/
- explicit EventBase(bool enableTimeMeasurement = true);
+ explicit EventBase(bool enableTimeMeasurement);
/**
* Create a new EventBase object that will use the specified libevent
* observer, max latency and avg loop time.
*/
explicit EventBase(event_base* evb, bool enableTimeMeasurement = true);
- ~EventBase();
+ ~EventBase() override;
/**
* Runs the event loop.
* called when that latency is exceeded.
* OBS: This functionality depends on time-measurement.
*/
- void setMaxLatency(int64_t maxLatency, Func maxLatencyCob) {
+ void setMaxLatency(std::chrono::microseconds maxLatency, Func maxLatencyCob) {
assert(enableTimeMeasurement_);
maxLatency_ = maxLatency;
maxLatencyCob_ = std::move(maxLatencyCob);
* Set smoothing coefficient for loop load average; # of milliseconds
* for exp(-1) (1/2.71828...) decay.
*/
- void setLoadAvgMsec(uint32_t ms);
+ void setLoadAvgMsec(std::chrono::milliseconds ms);
/**
* reset the load average to a desired value
* check if the event base loop is running.
*/
bool isRunning() const {
- return loopThread_.load(std::memory_order_relaxed) != 0;
+ return loopThread_.load(std::memory_order_relaxed) != std::thread::id();
}
/**
*/
bool isInEventBaseThread() const {
auto tid = loopThread_.load(std::memory_order_relaxed);
- return tid == 0 || pthread_equal(tid, pthread_self());
+ return tid == std::thread::id() || tid == std::this_thread::get_id();
}
bool inRunningEventBaseThread() const {
- return pthread_equal(
- loopThread_.load(std::memory_order_relaxed), pthread_self());
+ return loopThread_.load(std::memory_order_relaxed) ==
+ std::this_thread::get_id();
+ }
+
+ /**
+ * Equivalent to CHECK(isInEventBaseThread()) (and assert/DCHECK for
+ * dcheckIsInEventBaseThread), but it prints more information on
+ * failure.
+ */
+ void checkIsInEventBaseThread() const;
+ void dcheckIsInEventBaseThread() const {
+ if (kIsDebug) {
+ checkIsInEventBaseThread();
+ }
}
HHWheelTimer& timer() {
* first handler fired within that cycle.
*
*/
- void bumpHandlingTime() override final;
+ void bumpHandlingTime() final;
class SmoothLoopTime {
public:
- explicit SmoothLoopTime(uint64_t timeInterval)
- : expCoeff_(-1.0/timeInterval)
- , value_(0.0)
- , oldBusyLeftover_(0) {
+ explicit SmoothLoopTime(std::chrono::microseconds timeInterval)
+ : expCoeff_(-1.0 / timeInterval.count()),
+ value_(0.0),
+ oldBusyLeftover_(0) {
VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
}
- void setTimeInterval(uint64_t timeInterval);
+ void setTimeInterval(std::chrono::microseconds timeInterval);
void reset(double value = 0.0);
- void addSample(int64_t idle, int64_t busy);
+ void addSample(
+ std::chrono::microseconds idle,
+ std::chrono::microseconds busy);
double get() const {
return value_;
}
private:
- double expCoeff_;
- double value_;
- int64_t oldBusyLeftover_;
+ double expCoeff_;
+ double value_;
+ std::chrono::microseconds oldBusyLeftover_;
};
void setObserver(const std::shared_ptr<EventBaseObserver>& observer) {
/// Implements the DrivableExecutor interface
void drive() override {
- // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
- // released inside a loop.
++loopKeepAliveCount_;
SCOPE_EXIT {
--loopKeepAliveCount_;
loopOnce();
}
- struct LoopKeepAliveDeleter {
- void operator()(EventBase* evb) {
- DCHECK(evb->isInEventBaseThread());
- evb->loopKeepAliveCount_--;
- }
- };
- using LoopKeepAlive = std::unique_ptr<EventBase, LoopKeepAliveDeleter>;
-
/// Returns you a handle which make loop() behave like loopForever() until
/// destroyed. loop() will return to its original behavior only when all
- /// loop keep-alives are released. Loop holder is safe to release only from
- /// EventBase thread.
- ///
- /// May return no op LoopKeepAlive if loopForever() is already running.
- LoopKeepAlive loopKeepAlive() {
- DCHECK(isInEventBaseThread());
- loopKeepAliveCount_++;
- return LoopKeepAlive(this);
- }
-
- // Thread-safe version of loopKeepAlive()
- LoopKeepAlive loopKeepAliveAtomic() {
- if (inRunningEventBaseThread()) {
- return loopKeepAlive();
- }
- loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
- return LoopKeepAlive(this);
+ /// loop keep-alives are released.
+ KeepAlive getKeepAliveToken() override {
+ keepAliveAcquire();
+ return makeKeepAlive();
}
// TimeoutManager
void attachTimeoutManager(
AsyncTimeout* obj,
- TimeoutManager::InternalEnum internal) override final;
+ TimeoutManager::InternalEnum internal) final;
- void detachTimeoutManager(AsyncTimeout* obj) override final;
+ void detachTimeoutManager(AsyncTimeout* obj) final;
bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout)
- override final;
+ final;
- void cancelTimeout(AsyncTimeout* obj) override final;
+ void cancelTimeout(AsyncTimeout* obj) final;
- bool isInTimeoutManagerThread() override final {
+ bool isInTimeoutManagerThread() final {
return isInEventBaseThread();
}
+ // Returns a VirtualEventBase attached to this EventBase. Can be used to
+ // pass to APIs which expect VirtualEventBase. This VirtualEventBase will be
+ // destroyed together with the EventBase.
+ //
+ // Any number of VirtualEventBases instances may be independently constructed,
+ // which are backed by this EventBase. This method should be only used if you
+ // don't need to manage the life time of the VirtualEventBase used.
+ folly::VirtualEventBase& getVirtualEventBase();
+
+ protected:
+ void keepAliveAcquire() override {
+ if (inRunningEventBaseThread()) {
+ loopKeepAliveCount_++;
+ } else {
+ loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
+ }
+ }
+
+ void keepAliveRelease() override {
+ if (!inRunningEventBaseThread()) {
+ return add([=] { keepAliveRelease(); });
+ }
+ loopKeepAliveCount_--;
+ }
+
private:
void applyLoopKeepAlive();
std::atomic<bool> stop_;
// The ID of the thread running the main loop.
- // 0 if loop is not running.
- // Note: POSIX doesn't guarantee that 0 is an invalid pthread_t (or
- // even that atomic<pthread_t> is valid), but that's how it is
- // everywhere (at least on Linux, FreeBSD, and OSX).
- std::atomic<pthread_t> loopThread_;
+ // std::thread::id{} if loop is not running.
+ std::atomic<std::thread::id> loopThread_;
// pointer to underlying event_base class doing the heavy lifting
event_base* evb_;
bool loopKeepAliveActive_{false};
// limit for latency in microseconds (0 disables)
- int64_t maxLatency_;
+ std::chrono::microseconds maxLatency_;
// exponentially-smoothed average loop time for latency-limiting
SmoothLoopTime avgLoopTime_;
// be supported: avg loop time, observer and max latency.
const bool enableTimeMeasurement_;
- // we'll wait this long before running deferred callbacks if the event
- // loop is idle.
- static const int kDEFAULT_IDLE_WAIT_USEC = 20000; // 20ms
-
// Wrap-around loop counter to detect beginning of each loop
uint64_t nextLoopCnt_;
uint64_t latestLoopCnt_;
- uint64_t startWork_;
+ std::chrono::steady_clock::time_point startWork_;
// Prevent undefined behavior from invoking event_base_loop() reentrantly.
// This is needed since many projects use libevent-1.4, which lacks commit
// b557b175c00dc462c1fce25f6e7dd67121d2c001 from
// see EventBaseLocal
friend class detail::EventBaseLocalBase;
template <typename T> friend class EventBaseLocal;
- std::mutex localStorageMutex_;
std::unordered_map<uint64_t, std::shared_ptr<void>> localStorage_;
std::unordered_set<detail::EventBaseLocalBaseBase*> localStorageToDtor_;
+
+ folly::once_flag virtualEventBaseInitFlag_;
+ std::unique_ptr<VirtualEventBase> virtualEventBase_;
};
template <typename T>
return runImmediatelyOrRunInEventBaseThreadAndWait([=] { fn(arg); });
}
-} // folly
+} // namespace folly