#endif
#include <folly/io/async/EventBase.h>
-#include <folly/io/async/VirtualEventBase.h>
-
-#include <folly/Memory.h>
-#include <folly/ThreadName.h>
-#include <folly/io/async/NotificationQueue.h>
-#include <folly/portability/Unistd.h>
-#include <condition_variable>
#include <fcntl.h>
+
+#include <memory>
#include <mutex>
#include <thread>
+#include <folly/Baton.h>
+#include <folly/Memory.h>
+#include <folly/io/async/NotificationQueue.h>
+#include <folly/io/async/VirtualEventBase.h>
+#include <folly/portability/Unistd.h>
+#include <folly/system/ThreadName.h>
+
namespace folly {
/*
// The value 'current_base' (libevent 1) or
// 'event_global_current_base_' (libevent 2) is filled in by event_set(),
// allowing examination of its value without an explicit reference here.
- // If ev.ev_base is NULL, then event_init() must be called, otherwise
+ // If ev.ev_base is nullptr, then event_init() must be called, otherwise
// call event_base_new().
event_set(&ev, 0, 0, nullptr, nullptr);
if (!ev.ev_base) {
}
VLOG(5) << "EventBase(): Created.";
initNotificationQueue();
- RequestContext::saveContext();
}
// takes ownership of the event_base
throw std::invalid_argument("EventBase(): event base cannot be nullptr");
}
initNotificationQueue();
- RequestContext::saveContext();
}
EventBase::~EventBase() {
event_base_free(evb_);
}
- {
- std::lock_guard<std::mutex> lock(localStorageMutex_);
- for (auto storage : localStorageToDtor_) {
- storage->onEventBaseDestruction(*this);
- }
+ for (auto storage : localStorageToDtor_) {
+ storage->onEventBaseDestruction(*this);
}
+
VLOG(5) << "EventBase(): Destroyed.";
}
fnRunner_->setMaxReadAtOnce(maxAtOnce);
}
+void EventBase::checkIsInEventBaseThread() const {
+ auto evbTid = loopThread_.load(std::memory_order_relaxed);
+ if (evbTid == std::thread::id()) {
+ return;
+ }
+
+ // Using getThreadName(evbTid) instead of name_ will work also if
+ // the thread name is set outside of EventBase (and name_ is empty).
+ auto curTid = std::this_thread::get_id();
+ CHECK(evbTid == curTid)
+ << "This logic must be executed in the event base thread. "
+ << "Event base thread name: \""
+ << folly::getThreadName(evbTid).value_or("")
+ << "\", current thread name: \""
+ << folly::getThreadName(curTid).value_or("") << "\"";
+}
+
// Set smoothing coefficient for loop load average; input is # of milliseconds
// for exp(-1) decay.
void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) {
}
void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
- DCHECK(isInEventBaseThread());
+ dcheckIsInEventBaseThread();
callback->cancelLoopCallback();
callback->context_ = RequestContext::saveContext();
if (runOnceCallbacks_ != nullptr && thisIteration) {
}
void EventBase::runInLoop(Func cob, bool thisIteration) {
- DCHECK(isInEventBaseThread());
+ dcheckIsInEventBaseThread();
auto wrapper = new FunctionLoopCallback(std::move(cob));
wrapper->context_ = RequestContext::saveContext();
if (runOnceCallbacks_ != nullptr && thisIteration) {
}
void EventBase::runBeforeLoop(LoopCallback* callback) {
- DCHECK(isInEventBaseThread());
+ dcheckIsInEventBaseThread();
callback->cancelLoopCallback();
runBeforeLoopCallbacks_.push_back(*callback);
}
if (inRunningEventBaseThread()) {
runInLoop(std::move(fn));
return true;
-
}
try {
return true;
}
-bool EventBase::runInEventBaseThreadAndWait(FuncRef fn) {
+bool EventBase::runInEventBaseThreadAndWait(Func fn) {
if (inRunningEventBaseThread()) {
LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
<< "allowed";
return false;
}
- bool ready = false;
- std::mutex m;
- std::condition_variable cv;
- runInEventBaseThread([&] {
- SCOPE_EXIT {
- std::unique_lock<std::mutex> l(m);
- ready = true;
- cv.notify_one();
- // We cannot release the lock before notify_one, because a spurious
- // wakeup in the waiting thread may lead to cv and m going out of scope
- // prematurely.
- };
- fn();
+ Baton<> ready;
+ runInEventBaseThread([&ready, fn = std::move(fn)]() mutable {
+ SCOPE_EXIT {
+ ready.post();
+ };
+ // A trick to force the stored functor to be executed and then destructed
+ // before posting the baton and waking the waiting thread.
+ copy(std::move(fn))();
});
- std::unique_lock<std::mutex> l(m);
- cv.wait(l, [&] { return ready; });
+ ready.wait();
return true;
}
-bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(FuncRef fn) {
+bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) {
if (isInEventBaseThread()) {
fn();
return true;
void EventBase::initNotificationQueue() {
// Infinite size queue
- queue_.reset(new NotificationQueue<Func>());
+ queue_ = std::make_unique<NotificationQueue<Func>>();
// We allocate fnRunner_ separately, rather than declaring it directly
// as a member of EventBase solely so that we don't need to include
// NotificationQueue.h from EventBase.h
- fnRunner_.reset(new FunctionRunner());
+ fnRunner_ = std::make_unique<FunctionRunner>();
// Mark this as an internal event, so event_base_loop() will return if
// there are no other events besides this one installed.
bool EventBase::scheduleTimeout(AsyncTimeout* obj,
TimeoutManager::timeout_type timeout) {
- assert(isInEventBaseThread());
+ dcheckIsInEventBaseThread();
// Set up the timeval and add the event
struct timeval tv;
tv.tv_sec = long(timeout.count() / 1000LL);
}
void EventBase::cancelTimeout(AsyncTimeout* obj) {
- assert(isInEventBaseThread());
+ dcheckIsInEventBaseThread();
struct event* ev = obj->getEvent();
if (EventUtil::isEventRegistered(ev)) {
event_del(ev);
}
void EventBase::setName(const std::string& name) {
- assert(isInEventBaseThread());
+ dcheckIsInEventBaseThread();
name_ = name;
if (isRunning()) {
}
const std::string& EventBase::getName() {
- assert(isInEventBaseThread());
+ dcheckIsInEventBaseThread();
return name_;
}
VirtualEventBase& EventBase::getVirtualEventBase() {
folly::call_once(virtualEventBaseInitFlag_, [&] {
- virtualEventBase_ = folly::make_unique<VirtualEventBase>(*this);
+ virtualEventBase_ = std::make_unique<VirtualEventBase>(*this);
});
return *virtualEventBase_;
}
-} // folly
+} // namespace folly