From 843b72c05efafc75f2545bc84b856ae6fad5c53b Mon Sep 17 00:00:00 2001 From: Dave Watson Date: Wed, 12 Mar 2014 09:25:43 -0700 Subject: [PATCH] Move thrift/lib/cpp/async to folly. Summary: Move the minimum amount of stuff and still have everything compile. Would like to move TAsyncSocket/ServerSocket/SSL/UDP eventually, but not this round. thrift async is used very widely now: thrift, proxygen, newer mysql async interface, even trying it out in memcache. A common complaint is that it doesn't get wide enough notice under thrift/, so let's move it to folly/. After moving TAsyncSocket, both HHVM and proxygen could avoid a dep on thrift as well. Changes: * mv files to folly/io/async * remove 'T' prefix on classes/filenames * change namespace to 'folly' * remove any thrift references. Tried this before in D470080, this time doesn't attempt to abstract libevent @override-unit-failures Test Plan: fbconfig -r thrift; fbmake dev. Will iterate on any other contbuild failures Reviewed By: pgriess@fb.com FB internal diff: D1195393 --- folly/Makefile.am | 12 + folly/configure.ac | 2 + folly/io/async/AsyncTimeout.cpp | 156 +++++++ folly/io/async/AsyncTimeout.h | 164 +++++++ folly/io/async/EventBase.cpp | 660 ++++++++++++++++++++++++++++ folly/io/async/EventBase.h | 548 +++++++++++++++++++++++ folly/io/async/EventFDWrapper.h | 65 +++ folly/io/async/EventHandler.cpp | 168 +++++++ folly/io/async/EventHandler.h | 184 ++++++++ folly/io/async/EventUtil.h | 40 ++ folly/io/async/NotificationQueue.h | 676 +++++++++++++++++++++++++++++ folly/io/async/Request.cpp | 34 ++ folly/io/async/Request.h | 200 +++++++++ folly/io/async/TimeoutManager.h | 73 ++++ 14 files changed, 2982 insertions(+) create mode 100644 folly/io/async/AsyncTimeout.cpp create mode 100644 folly/io/async/AsyncTimeout.h create mode 100644 folly/io/async/EventBase.cpp create mode 100644 folly/io/async/EventBase.h create mode 100644 folly/io/async/EventFDWrapper.h create mode 100644 folly/io/async/EventHandler.cpp create mode 100644 folly/io/async/EventHandler.h create mode 100644 folly/io/async/EventUtil.h create mode 100644 folly/io/async/NotificationQueue.h create mode 100644 folly/io/async/Request.cpp create mode 100644 folly/io/async/Request.h create mode 100644 folly/io/async/TimeoutManager.h diff --git a/folly/Makefile.am b/folly/Makefile.am index 6dd25ab0..b420da27 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -79,6 +79,14 @@ nobase_follyinclude_HEADERS = \ io/RecordIO.h \ io/RecordIO-inl.h \ io/TypedIOBuf.h \ + io/async/AsyncTimeout.h \ + io/async/EventBase.h \ + io/async/EventFDWrapper.h \ + io/async/EventHandler.h \ + io/async/EventUtil.h \ + io/async/NotificationQueue.h \ + io/async/Request.h \ + io/async/TimeoutManager.h \ json.h \ Lazy.h \ Likely.h \ @@ -175,6 +183,10 @@ libfolly_la_SOURCES = \ io/IOBuf.cpp \ io/IOBufQueue.cpp \ io/RecordIO.cpp \ + io/async/AsyncTimeout.cpp \ + io/async/EventBase.cpp \ + io/async/EventHandler.cpp \ + io/async/Request.cpp \ json.cpp \ detail/MemoryIdler.cpp \ MemoryMapping.cpp \ diff --git a/folly/configure.ac b/folly/configure.ac index 3fd30fad..4e9c8041 100644 --- a/folly/configure.ac +++ b/folly/configure.ac @@ -60,6 +60,8 @@ AC_CHECK_HEADER(double-conversion/double-conversion.h, [], [AC_MSG_ERROR( AC_CHECK_LIB([double-conversion],[ceil],[],[AC_MSG_ERROR( [Please install double-conversion library])]) +AC_CHECK_LIB([event], [event_set], [], [AC_MSG_ERROR([Unable to find libevent])]) + # Checks for typedefs, structures, and compiler characteristics. AC_HEADER_STDBOOL AC_C_CONST diff --git a/folly/io/async/AsyncTimeout.cpp b/folly/io/async/AsyncTimeout.cpp new file mode 100644 index 00000000..c2196dd1 --- /dev/null +++ b/folly/io/async/AsyncTimeout.cpp @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "folly/io/async/AsyncTimeout.h" +#include "folly/io/async/EventBase.h" +#include "folly/io/async/EventUtil.h" +#include "folly/io/async/Request.h" + +#include +#include + +namespace folly { + +AsyncTimeout::AsyncTimeout(TimeoutManager* timeoutManager) + : timeoutManager_(timeoutManager) { + + event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this); + event_.ev_base = nullptr; + timeoutManager_->attachTimeoutManager( + this, + TimeoutManager::InternalEnum::NORMAL); + RequestContext::getStaticContext(); +} + +AsyncTimeout::AsyncTimeout(EventBase* eventBase) + : timeoutManager_(eventBase) { + + event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this); + event_.ev_base = nullptr; + timeoutManager_->attachTimeoutManager( + this, + TimeoutManager::InternalEnum::NORMAL); + RequestContext::getStaticContext(); +} + +AsyncTimeout::AsyncTimeout(TimeoutManager* timeoutManager, + InternalEnum internal) + : timeoutManager_(timeoutManager) { + + event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this); + event_.ev_base = nullptr; + timeoutManager_->attachTimeoutManager(this, internal); + RequestContext::getStaticContext(); +} + +AsyncTimeout::AsyncTimeout(EventBase* eventBase, InternalEnum internal) + : timeoutManager_(eventBase) { + + event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this); + event_.ev_base = nullptr; + timeoutManager_->attachTimeoutManager(this, internal); + RequestContext::getStaticContext(); +} + +AsyncTimeout::AsyncTimeout(): timeoutManager_(nullptr) { + event_set(&event_, -1, EV_TIMEOUT, &AsyncTimeout::libeventCallback, this); + event_.ev_base = nullptr; + RequestContext::getStaticContext(); +} + +AsyncTimeout::~AsyncTimeout() { + cancelTimeout(); +} + +bool AsyncTimeout::scheduleTimeout(std::chrono::milliseconds timeout) { + assert(timeoutManager_ != nullptr); + context_ = RequestContext::saveContext(); + return timeoutManager_->scheduleTimeout(this, timeout); +} + +bool AsyncTimeout::scheduleTimeout(uint32_t milliseconds) { + return scheduleTimeout(std::chrono::milliseconds(milliseconds)); +} + +void AsyncTimeout::cancelTimeout() { + if (isScheduled()) { + timeoutManager_->cancelTimeout(this); + } +} + +bool AsyncTimeout::isScheduled() const { + return EventUtil::isEventRegistered(&event_); +} + +void AsyncTimeout::attachTimeoutManager( + TimeoutManager* timeoutManager, + InternalEnum internal) { + // This also implies no timeout is scheduled. + assert(timeoutManager_ == nullptr); + assert(timeoutManager->isInTimeoutManagerThread()); + timeoutManager_ = timeoutManager; + + timeoutManager_->attachTimeoutManager(this, internal); +} + +void AsyncTimeout::attachEventBase( + EventBase* eventBase, + InternalEnum internal) { + attachTimeoutManager(eventBase, internal); +} + +void AsyncTimeout::detachTimeoutManager() { + // Only allow the event base to be changed if the timeout is not + // currently installed. + if (isScheduled()) { + // Programmer bug. Abort the program. + LOG(ERROR) << "detachEventBase() called on scheduled timeout; aborting"; + abort(); + return; + } + + if (timeoutManager_) { + timeoutManager_->detachTimeoutManager(this); + timeoutManager_ = nullptr; + } +} + +void AsyncTimeout::detachEventBase() { + detachTimeoutManager(); +} + +void AsyncTimeout::libeventCallback(int fd, short events, void* arg) { + AsyncTimeout* timeout = reinterpret_cast(arg); + assert(fd == -1); + assert(events == EV_TIMEOUT); + + // double check that ev_flags gets reset when the timeout is not running + assert((timeout->event_.ev_flags & ~EVLIST_INTERNAL) == EVLIST_INIT); + + // this can't possibly fire if timeout->eventBase_ is nullptr + (void) timeout->timeoutManager_->bumpHandlingTime(); + + auto old_ctx = + RequestContext::setContext(timeout->context_); + + timeout->timeoutExpired(); + + RequestContext::setContext(old_ctx); +} + +} // folly diff --git a/folly/io/async/AsyncTimeout.h b/folly/io/async/AsyncTimeout.h new file mode 100644 index 00000000..d1d2ba87 --- /dev/null +++ b/folly/io/async/AsyncTimeout.h @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include "folly/io/async/TimeoutManager.h" + +#include +#include +#include + +namespace folly { + +class EventBase; +class RequestContext; +class TimeoutManager; + +/** + * AsyncTimeout is used to asynchronously wait for a timeout to occur. + */ +class AsyncTimeout : private boost::noncopyable { + public: + typedef TimeoutManager::InternalEnum InternalEnum; + + /** + * Create a new AsyncTimeout object, driven by the specified TimeoutManager. + */ + explicit AsyncTimeout(TimeoutManager* timeoutManager); + explicit AsyncTimeout(EventBase* eventBase); + + /** + * Create a new internal AsyncTimeout object. + * + * Internal timeouts are like regular timeouts, but will not stop the + * TimeoutManager loop from exiting if the only remaining events are internal + * timeouts. + * + * This is useful for implementing fallback timeouts to abort the + * TimeoutManager loop if the other events have not been processed within a + * specified time period: if the event loop takes too long the timeout will + * fire and can stop the event loop. However, if all other events complete, + * the event loop will exit even though the internal timeout is still + * installed. + */ + AsyncTimeout(TimeoutManager* timeoutManager, InternalEnum internal); + AsyncTimeout(EventBase* eventBase, InternalEnum internal); + + /** + * Create a new AsyncTimeout object, not yet assigned to a TimeoutManager. + * + * attachEventBase() must be called prior to scheduling the timeout. + */ + AsyncTimeout(); + + /** + * AsyncTimeout destructor. + * + * The timeout will be automatically cancelled if it is running. + */ + virtual ~AsyncTimeout(); + + /** + * timeoutExpired() is invoked when the timeout period has expired. + */ + virtual void timeoutExpired() noexcept = 0; + + /** + * Schedule the timeout to fire in the specified number of milliseconds. + * + * After the specified number of milliseconds has elapsed, timeoutExpired() + * will be invoked by the TimeoutManager's main loop. + * + * If the timeout is already running, it will be rescheduled with the + * new timeout value. + * + * @param milliseconds The timeout duration, in milliseconds. + * + * @return Returns true if the timeout was successfully scheduled, + * and false if an error occurred. After an error, the timeout is + * always unscheduled, even if scheduleTimeout() was just + * rescheduling an existing timeout. + */ + bool scheduleTimeout(uint32_t milliseconds); + bool scheduleTimeout(std::chrono::milliseconds timeout); + + /** + * Cancel the timeout, if it is running. + */ + void cancelTimeout(); + + /** + * Returns true if the timeout is currently scheduled. + */ + bool isScheduled() const; + + /** + * Attach the timeout to a TimeoutManager. + * + * This may only be called if the timeout is not currently attached to a + * TimeoutManager (either by using the default constructor, or by calling + * detachTimeoutManager()). + * + * This method must be invoked in the TimeoutManager's thread. + * + * The internal parameter specifies if this timeout should be treated as an + * internal event. TimeoutManager::loop() will return when there are no more + * non-internal events remaining. + */ + void attachTimeoutManager(TimeoutManager* timeoutManager, + InternalEnum internal = InternalEnum::NORMAL); + void attachEventBase(EventBase* eventBase, + InternalEnum internal = InternalEnum::NORMAL); + + /** + * Detach the timeout from its TimeoutManager. + * + * This may only be called when the timeout is not running. + * Once detached, the timeout may not be scheduled again until it is + * re-attached to a EventBase by calling attachEventBase(). + * + * This method must be called from the current TimeoutManager's thread. + */ + void detachTimeoutManager(); + void detachEventBase(); + + /** + * Returns the internal handle to the event + */ + struct event* getEvent() { + return &event_; + } + + private: + static void libeventCallback(int fd, short events, void* arg); + + struct event event_; + + /* + * Store a pointer to the TimeoutManager. We only use this + * for some assert() statements, to make sure that AsyncTimeout is always + * used from the correct thread. + */ + TimeoutManager* timeoutManager_; + + // Save the request context for when the timeout fires. + std::shared_ptr context_; +}; + +} // folly diff --git a/folly/io/async/EventBase.cpp b/folly/io/async/EventBase.cpp new file mode 100644 index 00000000..8eee7107 --- /dev/null +++ b/folly/io/async/EventBase.cpp @@ -0,0 +1,660 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include "folly/io/async/EventBase.h" + +#include "folly/io/async/NotificationQueue.h" + +#include +#include +#include +#include + +namespace { + +using folly::Cob; +using folly::EventBase; + +class Tr1FunctionLoopCallback : public EventBase::LoopCallback { + public: + explicit Tr1FunctionLoopCallback(const Cob& function) + : function_(function) {} + + virtual void runLoopCallback() noexcept { + function_(); + delete this; + } + + private: + Cob function_; +}; + +} + +namespace folly { + +const int kNoFD = -1; + +/* + * EventBase::FunctionRunner + */ + +class EventBase::FunctionRunner + : public NotificationQueue>::Consumer { + public: + void messageAvailable(std::pair&& msg) { + + // In libevent2, internal events do not break the loop. + // Most users would expect loop(), followed by runInEventBaseThread(), + // to break the loop and check if it should exit or not. + // To have similar bejaviour to libevent1.4, tell the loop to break here. + // Note that loop() may still continue to loop, but it will also check the + // stop_ flag as well as runInLoop callbacks, etc. + event_base_loopbreak(getEventBase()->evb_); + + if (msg.first == nullptr && msg.second == nullptr) { + // terminateLoopSoon() sends a null message just to + // wake up the loop. We can ignore these messages. + return; + } + + // If function is nullptr, just log and move on + if (!msg.first) { + LOG(ERROR) << "nullptr callback registered to be run in " + << "event base thread"; + return; + } + + // The function should never throw an exception, because we have no + // way of knowing what sort of error handling to perform. + // + // If it does throw, log a message and abort the program. + try { + msg.first(msg.second); + } catch (const std::exception& ex) { + LOG(ERROR) << "runInEventBaseThread() function threw a " + << typeid(ex).name() << " exception: " << ex.what(); + abort(); + } catch (...) { + LOG(ERROR) << "runInEventBaseThread() function threw an exception"; + abort(); + } + } +}; + +/* + * EventBase::CobTimeout methods + */ + +void EventBase::CobTimeout::timeoutExpired() noexcept { + // For now, we just swallow any exceptions that the callback threw. + try { + cob_(); + } catch (const std::exception& ex) { + LOG(ERROR) << "EventBase::runAfterDelay() callback threw " + << typeid(ex).name() << " exception: " << ex.what(); + } catch (...) { + LOG(ERROR) << "EventBase::runAfterDelay() callback threw non-exception " + << "type"; + } + + // The CobTimeout object was allocated on the heap by runAfterDelay(), + // so delete it now that the it has fired. + delete this; +} + +/* + * EventBase methods + */ + +EventBase::EventBase() + : runOnceCallbacks_(nullptr) + , stop_(false) + , loopThread_(0) + , evb_(static_cast(event_init())) + , queue_(nullptr) + , fnRunner_(nullptr) + , maxLatency_(0) + , avgLoopTime_(2000000) + , maxLatencyLoopTime_(avgLoopTime_) + , nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon + , latestLoopCnt_(nextLoopCnt_) + , startWork_(0) + , observer_(nullptr) + , observerSampleCount_(0) { + VLOG(5) << "EventBase(): Created."; + initNotificationQueue(); + RequestContext::getStaticContext(); +} + +// takes ownership of the event_base +EventBase::EventBase(event_base* evb) + : runOnceCallbacks_(nullptr) + , stop_(false) + , loopThread_(0) + , evb_(evb) + , queue_(nullptr) + , fnRunner_(nullptr) + , maxLatency_(0) + , avgLoopTime_(2000000) + , maxLatencyLoopTime_(avgLoopTime_) + , nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon + , latestLoopCnt_(nextLoopCnt_) + , startWork_(0) + , observer_(nullptr) + , observerSampleCount_(0) { + initNotificationQueue(); + RequestContext::getStaticContext(); +} + +EventBase::~EventBase() { + // Delete any unfired CobTimeout objects, so that we don't leak memory + // (Note that we don't fire them. The caller is responsible for cleaning up + // its own data structures if it destroys the EventBase with unfired events + // remaining.) + while (!pendingCobTimeouts_.empty()) { + CobTimeout* timeout = &pendingCobTimeouts_.front(); + delete timeout; + } + + (void) runLoopCallbacks(false); + + // Stop consumer before deleting NotificationQueue + fnRunner_->stopConsuming(); + event_base_free(evb_); + VLOG(5) << "EventBase(): Destroyed."; +} + +int EventBase::getNotificationQueueSize() const { + return queue_->size(); +} + +// Set smoothing coefficient for loop load average; input is # of milliseconds +// for exp(-1) decay. +void EventBase::setLoadAvgMsec(uint32_t ms) { + uint64_t us = 1000 * ms; + if (ms > 0) { + maxLatencyLoopTime_.setTimeInterval(us); + avgLoopTime_.setTimeInterval(us); + } else { + LOG(ERROR) << "non-positive arg to setLoadAvgMsec()"; + } +} + +void EventBase::resetLoadAvg(double value) { + avgLoopTime_.reset(value); + maxLatencyLoopTime_.reset(value); +} + +static int64_t getTimeDelta(int64_t *prev) { + int64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + int64_t delta = now - *prev; + *prev = now; + return delta; +} + +void EventBase::waitUntilRunning() { + while (!isRunning()) { + sched_yield(); + } +} + +// enters the event_base loop -- will only exit when forced to +bool EventBase::loop() { + VLOG(5) << "EventBase(): Starting loop."; + int res = 0; + bool ranLoopCallbacks; + int nonBlocking; + + loopThread_.store(pthread_self(), std::memory_order_release); + +#if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12) + if (!name_.empty()) { + pthread_setname_np(pthread_self(), name_.c_str()); + } +#endif + + int64_t prev = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + int64_t idleStart = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + + // TODO: Read stop_ atomically with an acquire barrier. + while (!stop_) { + ++nextLoopCnt_; + + // nobody can add loop callbacks from within this thread if + // we don't have to handle anything to start with... + nonBlocking = (loopCallbacks_.empty() ? 0 : EVLOOP_NONBLOCK); + res = event_base_loop(evb_, EVLOOP_ONCE | nonBlocking); + ranLoopCallbacks = runLoopCallbacks(); + + int64_t busy = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count() - startWork_; + int64_t idle = startWork_ - idleStart; + + avgLoopTime_.addSample(idle, busy); + maxLatencyLoopTime_.addSample(idle, busy); + + if (observer_) { + if (observerSampleCount_++ == observer_->getSampleRate()) { + observerSampleCount_ = 0; + observer_->loopSample(busy, idle); + } + } + + VLOG(11) << "EventBase " << this << " did not timeout " + " loop time guess: " << busy + idle << + " idle time: " << idle << + " busy time: " << busy << + " avgLoopTime: " << avgLoopTime_.get() << + " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() << + " maxLatency_: " << maxLatency_ << + " nothingHandledYet(): "<< nothingHandledYet(); + + // see if our average loop time has exceeded our limit + if ((maxLatency_ > 0) && + (maxLatencyLoopTime_.get() > double(maxLatency_))) { + maxLatencyCob_(); + // back off temporarily -- don't keep spamming maxLatencyCob_ + // if we're only a bit over the limit + maxLatencyLoopTime_.dampen(0.9); + } + + // Our loop run did real work; reset the idle timer + idleStart = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + + // If the event loop indicate that there were no more events, and + // we also didn't have any loop callbacks to run, there is nothing left to + // do. + if (res != 0 && !ranLoopCallbacks) { + // Since Notification Queue is marked 'internal' some events may not have + // run. Run them manually if so, and continue looping. + // + if (getNotificationQueueSize() > 0) { + fnRunner_->handlerReady(0); + } else { + break; + } + } + + VLOG(5) << "EventBase " << this << " loop time: " << getTimeDelta(&prev); + } + // Reset stop_ so loop() can be called again + stop_ = false; + + if (res < 0) { + LOG(ERROR) << "EventBase: -- error in event loop, res = " << res; + return false; + } else if (res == 1) { + VLOG(5) << "EventBase: ran out of events (exiting loop)!"; + } else if (res > 1) { + LOG(ERROR) << "EventBase: unknown event loop result = " << res; + return false; + } + + loopThread_.store(0, std::memory_order_release); + + VLOG(5) << "EventBase(): Done with loop."; + return true; +} + +void EventBase::loopForever() { + // Update the notification queue event to treat it as a normal (non-internal) + // event. The notification queue event always remains installed, and the main + // loop won't exit with it installed. + fnRunner_->stopConsuming(); + fnRunner_->startConsuming(this, queue_.get()); + + bool ret = loop(); + + // Restore the notification queue internal flag + fnRunner_->stopConsuming(); + fnRunner_->startConsumingInternal(this, queue_.get()); + + if (!ret) { + folly::throwSystemError("error in EventBase::loopForever()"); + } +} + +bool EventBase::bumpHandlingTime() { + VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ << + " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_; + if(nothingHandledYet()) { + latestLoopCnt_ = nextLoopCnt_; + // set the time + startWork_ = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + + VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ << + " (loop) startWork_ " << startWork_; + return true; + } + return false; +} + +void EventBase::terminateLoopSoon() { + VLOG(5) << "EventBase(): Received terminateLoopSoon() command."; + + if (!isRunning()) { + return; + } + + // Set stop to true, so the event loop will know to exit. + // TODO: We should really use an atomic operation here with a release + // barrier. + stop_ = true; + + // Call event_base_loopbreak() so that libevent will exit the next time + // around the loop. + event_base_loopbreak(evb_); + + // If terminateLoopSoon() is called from another thread, + // the EventBase thread might be stuck waiting for events. + // In this case, it won't wake up and notice that stop_ is set until it + // receives another event. Send an empty frame to the notification queue + // so that the event loop will wake up even if there are no other events. + // + // We don't care about the return value of trySendFrame(). If it fails + // this likely means the EventBase already has lots of events waiting + // anyway. + try { + queue_->putMessage(std::make_pair(nullptr, nullptr)); + } catch (...) { + // We don't care if putMessage() fails. This likely means + // the EventBase already has lots of events waiting anyway. + } +} + +void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) { + DCHECK(isInEventBaseThread()); + callback->cancelLoopCallback(); + callback->context_ = RequestContext::saveContext(); + if (runOnceCallbacks_ != nullptr && thisIteration) { + runOnceCallbacks_->push_back(*callback); + } else { + loopCallbacks_.push_back(*callback); + } +} + +void EventBase::runInLoop(const Cob& cob, bool thisIteration) { + DCHECK(isInEventBaseThread()); + Tr1FunctionLoopCallback* wrapper = new Tr1FunctionLoopCallback(cob); + wrapper->context_ = RequestContext::saveContext(); + if (runOnceCallbacks_ != nullptr && thisIteration) { + runOnceCallbacks_->push_back(*wrapper); + } else { + loopCallbacks_.push_back(*wrapper); + } +} + +bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) { + // Send the message. + // It will be received by the FunctionRunner in the EventBase's thread. + + // We try not to schedule nullptr callbacks + if (!fn) { + LOG(ERROR) << "EventBase " << this + << ": Scheduling nullptr callbacks is not allowed"; + return false; + } + + // Short-circuit if we are already in our event base + if (inRunningEventBaseThread()) { + runInLoop(new RunInLoopCallback(fn, arg)); + return true; + + } + + try { + queue_->putMessage(std::make_pair(fn, arg)); + } catch (const std::exception& ex) { + LOG(ERROR) << "EventBase " << this << ": failed to schedule function " + << fn << "for EventBase thread: " << ex.what(); + return false; + } + + return true; +} + +bool EventBase::runInEventBaseThread(const Cob& fn) { + // Short-circuit if we are already in our event base + if (inRunningEventBaseThread()) { + runInLoop(fn); + return true; + } + + Cob* fnCopy; + // Allocate a copy of the function so we can pass it to the other thread + // The other thread will delete this copy once the function has been run + try { + fnCopy = new Cob(fn); + } catch (const std::bad_alloc& ex) { + LOG(ERROR) << "failed to allocate tr::function copy " + << "for runInEventBaseThread()"; + return false; + } + + if (!runInEventBaseThread(&EventBase::runTr1FunctionPtr, fnCopy)) { + delete fnCopy; + return false; + } + + return true; +} + +bool EventBase::runAfterDelay(const Cob& cob, + int milliseconds, + TimeoutManager::InternalEnum in) { + CobTimeout* timeout = new CobTimeout(this, cob, in); + if (!timeout->scheduleTimeout(milliseconds)) { + delete timeout; + return false; + } + + pendingCobTimeouts_.push_back(*timeout); + return true; +} + +bool EventBase::runLoopCallbacks(bool setContext) { + if (!loopCallbacks_.empty()) { + bumpHandlingTime(); + // Swap the loopCallbacks_ list with a temporary list on our stack. + // This way we will only run callbacks scheduled at the time + // runLoopCallbacks() was invoked. + // + // If any of these callbacks in turn call runInLoop() to schedule more + // callbacks, those new callbacks won't be run until the next iteration + // around the event loop. This prevents runInLoop() callbacks from being + // able to start file descriptor and timeout based events. + LoopCallbackList currentCallbacks; + currentCallbacks.swap(loopCallbacks_); + runOnceCallbacks_ = ¤tCallbacks; + + while (!currentCallbacks.empty()) { + LoopCallback* callback = ¤tCallbacks.front(); + currentCallbacks.pop_front(); + if (setContext) { + RequestContext::setContext(callback->context_); + } + callback->runLoopCallback(); + } + + runOnceCallbacks_ = nullptr; + return true; + } + return false; +} + +void EventBase::initNotificationQueue() { + // Infinite size queue + queue_.reset(new NotificationQueue>()); + + // We allocate fnRunner_ separately, rather than declaring it directly + // as a member of EventBase solely so that we don't need to include + // NotificationQueue.h from EventBase.h + fnRunner_.reset(new FunctionRunner()); + + // Mark this as an internal event, so event_base_loop() will return if + // there are no other events besides this one installed. + // + // Most callers don't care about the internal notification queue used by + // EventBase. The queue is always installed, so if we did count the queue as + // an active event, loop() would never exit with no more events to process. + // Users can use loopForever() if they do care about the notification queue. + // (This is useful for EventBase threads that do nothing but process + // runInEventBaseThread() notifications.) + fnRunner_->startConsumingInternal(this, queue_.get()); +} + +void EventBase::SmoothLoopTime::setTimeInterval(uint64_t timeInterval) { + expCoeff_ = -1.0/timeInterval; + VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__; +} + +void EventBase::SmoothLoopTime::reset(double value) { + value_ = value; +} + +void EventBase::SmoothLoopTime::addSample(int64_t idle, int64_t busy) { + /* + * Position at which the busy sample is considered to be taken. + * (Allows to quickly skew our average without editing much code) + */ + enum BusySamplePosition { + RIGHT = 0, // busy sample placed at the end of the iteration + CENTER = 1, // busy sample placed at the middle point of the iteration + LEFT = 2, // busy sample placed at the beginning of the iteration + }; + + VLOG(11) << "idle " << idle << " oldBusyLeftover_ " << oldBusyLeftover_ << + " idle + oldBusyLeftover_ " << idle + oldBusyLeftover_ << + " busy " << busy << " " << __PRETTY_FUNCTION__; + idle += oldBusyLeftover_ + busy; + oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2; + idle -= oldBusyLeftover_; + + double coeff = exp(idle * expCoeff_); + value_ *= coeff; + value_ += (1.0 - coeff) * busy; +} + +bool EventBase::nothingHandledYet() { + VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_; + return (nextLoopCnt_ != latestLoopCnt_); +} + +/* static */ +void EventBase::runTr1FunctionPtr(Cob* fn) { + // The function should never throw an exception, because we have no + // way of knowing what sort of error handling to perform. + // + // If it does throw, log a message and abort the program. + try { + (*fn)(); + } catch (const std::exception &ex) { + LOG(ERROR) << "runInEventBaseThread() std::function threw a " + << typeid(ex).name() << " exception: " << ex.what(); + abort(); + } catch (...) { + LOG(ERROR) << "runInEventBaseThread() std::function threw an exception"; + abort(); + } + + // The function object was allocated by runInEventBaseThread(). + // Delete it once it has been run. + delete fn; +} + +EventBase::RunInLoopCallback::RunInLoopCallback(void (*fn)(void*), void* arg) + : fn_(fn) + , arg_(arg) {} + +void EventBase::RunInLoopCallback::runLoopCallback() noexcept { + fn_(arg_); + delete this; +} + +void EventBase::attachTimeoutManager(AsyncTimeout* obj, + InternalEnum internal) { + + struct event* ev = obj->getEvent(); + assert(ev->ev_base == nullptr); + + event_base_set(getLibeventBase(), ev); + if (internal == AsyncTimeout::InternalEnum::INTERNAL) { + // Set the EVLIST_INTERNAL flag + ev->ev_flags |= EVLIST_INTERNAL; + } +} + +void EventBase::detachTimeoutManager(AsyncTimeout* obj) { + cancelTimeout(obj); + struct event* ev = obj->getEvent(); + ev->ev_base = nullptr; +} + +bool EventBase::scheduleTimeout(AsyncTimeout* obj, + std::chrono::milliseconds timeout) { + assert(isInEventBaseThread()); + // Set up the timeval and add the event + struct timeval tv; + tv.tv_sec = timeout.count() / 1000LL; + tv.tv_usec = (timeout.count() % 1000LL) * 1000LL; + + struct event* ev = obj->getEvent(); + if (event_add(ev, &tv) < 0) { + LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno); + return false; + } + + return true; +} + +void EventBase::cancelTimeout(AsyncTimeout* obj) { + assert(isInEventBaseThread()); + struct event* ev = obj->getEvent(); + if (EventUtil::isEventRegistered(ev)) { + event_del(ev); + } +} + +void EventBase::setName(const std::string& name) { + assert(isInEventBaseThread()); + name_ = name; +#if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12) + if (isRunning()) { + pthread_setname_np(loopThread_.load(std::memory_order_relaxed), + name_.c_str()); + } +#endif +} + +const std::string& EventBase::getName() { + assert(isInEventBaseThread()); + return name_; +} + +} // folly diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h new file mode 100644 index 00000000..dae1be14 --- /dev/null +++ b/folly/io/async/EventBase.h @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include "folly/io/async/AsyncTimeout.h" +#include "folly/io/async/TimeoutManager.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include // libevent +#include +#include +#include + +namespace folly { + +typedef std::function Cob; +template +class NotificationQueue; + +class EventBaseObserver { + public: + virtual ~EventBaseObserver() {} + + virtual uint32_t getSampleRate() const = 0; + + virtual void loopSample( + int64_t busyTime, int64_t idleTime) = 0; +}; + +/** + * This class is a wrapper for all asynchronous I/O processing functionality + * + * EventBase provides a main loop that notifies EventHandler callback objects + * when I/O is ready on a file descriptor, and notifies AsyncTimeout objects + * when a specified timeout has expired. More complex, higher-level callback + * mechanisms can then be built on top of EventHandler and AsyncTimeout. + * + * A EventBase object can only drive an event loop for a single thread. To + * take advantage of multiple CPU cores, most asynchronous I/O servers have one + * thread per CPU, and use a separate EventBase for each thread. + * + * In general, most EventBase methods may only be called from the thread + * running the EventBase's loop. There are a few exceptions to this rule, for + * methods that are explicitly intended to allow communication with a + * EventBase from other threads. When it is safe to call a method from + * another thread it is explicitly listed in the method comments. + */ +class EventBase : private boost::noncopyable, public TimeoutManager { + public: + /** + * A callback interface to use with runInLoop() + * + * Derive from this class if you need to delay some code execution until the + * next iteration of the event loop. This allows you to schedule code to be + * invoked from the top-level of the loop, after your immediate callers have + * returned. + * + * If a LoopCallback object is destroyed while it is scheduled to be run in + * the next loop iteration, it will automatically be cancelled. + */ + class LoopCallback { + public: + virtual ~LoopCallback() {} + + virtual void runLoopCallback() noexcept = 0; + void cancelLoopCallback() { + hook_.unlink(); + } + + bool isLoopCallbackScheduled() const { + return hook_.is_linked(); + } + + private: + typedef boost::intrusive::list_member_hook< + boost::intrusive::link_mode > ListHook; + + ListHook hook_; + + typedef boost::intrusive::list< + LoopCallback, + boost::intrusive::member_hook, + boost::intrusive::constant_time_size > List; + + // EventBase needs access to LoopCallbackList (and therefore to hook_) + friend class EventBase; + std::shared_ptr context_; + }; + + /** + * Create a new EventBase object. + */ + EventBase(); + + /** + * Create a new EventBase object that will use the specified libevent + * event_base object to drive the event loop. + * + * The EventBase will take ownership of this event_base, and will call + * event_base_free(evb) when the EventBase is destroyed. + */ + explicit EventBase(event_base* evb); + ~EventBase(); + + /** + * Runs the event loop. + * + * loop() will loop waiting for I/O or timeouts and invoking EventHandler + * and AsyncTimeout callbacks as their events become ready. loop() will + * only return when there are no more events remaining to process, or after + * terminateLoopSoon() has been called. + * + * loop() may be called again to restart event processing after a previous + * call to loop() or loopForever() has returned. + * + * Returns true if the loop completed normally (if it processed all + * outstanding requests, or if terminateLoopSoon() was called). If an error + * occurs waiting for events, false will be returned. + */ + bool loop(); + + /** + * Runs the event loop. + * + * loopForever() behaves like loop(), except that it keeps running even if + * when there are no more user-supplied EventHandlers or AsyncTimeouts + * registered. It will only return after terminateLoopSoon() has been + * called. + * + * This is useful for callers that want to wait for other threads to call + * runInEventBaseThread(), even when there are no other scheduled events. + * + * loopForever() may be called again to restart event processing after a + * previous call to loop() or loopForever() has returned. + * + * Throws a std::system_error if an error occurs. + */ + void loopForever(); + + /** + * Causes the event loop to exit soon. + * + * This will cause an existing call to loop() or loopForever() to stop event + * processing and return, even if there are still events remaining to be + * processed. + * + * It is safe to call terminateLoopSoon() from another thread to cause loop() + * to wake up and return in the EventBase loop thread. terminateLoopSoon() + * may also be called from the loop thread itself (for example, a + * EventHandler or AsyncTimeout callback may call terminateLoopSoon() to + * cause the loop to exit after the callback returns.) + * + * Note that the caller is responsible for ensuring that cleanup of all event + * callbacks occurs properly. Since terminateLoopSoon() causes the loop to + * exit even when there are pending events present, there may be remaining + * callbacks present waiting to be invoked. If the loop is later restarted + * pending events will continue to be processed normally, however if the + * EventBase is destroyed after calling terminateLoopSoon() it is the + * caller's responsibility to ensure that cleanup happens properly even if + * some outstanding events are never processed. + */ + void terminateLoopSoon(); + + /** + * Adds the given callback to a queue of things run after the current pass + * through the event loop completes. Note that if this callback calls + * runInLoop() the new callback won't be called until the main event loop + * has gone through a cycle. + * + * This method may only be called from the EventBase's thread. This + * essentially allows an event handler to schedule an additional callback to + * be invoked after it returns. + * + * Use runInEventBaseThread() to schedule functions from another thread. + * + * The thisIteration parameter makes this callback run in this loop + * iteration, instead of the next one, even if called from a + * runInLoop callback (normal io callbacks that call runInLoop will + * always run in this iteration). This was originally added to + * support detachEventBase, as a user callback may have called + * terminateLoopSoon(), but we want to make sure we detach. Also, + * detachEventBase almost always must be called from the base event + * loop to ensure the stack is unwound, since most users of + * EventBase are not thread safe. + * + * Ideally we would not need thisIteration, and instead just use + * runInLoop with loop() (instead of terminateLoopSoon). + */ + void runInLoop(LoopCallback* callback, bool thisIteration = false); + + /** + * Convenience function to call runInLoop() with a std::function. + * + * This creates a LoopCallback object to wrap the std::function, and invoke + * the std::function when the loop callback fires. This is slightly more + * expensive than defining your own LoopCallback, but more convenient in + * areas that aren't performance sensitive where you just want to use + * std::bind. (std::bind is fairly slow on even by itself.) + * + * This method may only be called from the EventBase's thread. This + * essentially allows an event handler to schedule an additional callback to + * be invoked after it returns. + * + * Use runInEventBaseThread() to schedule functions from another thread. + */ + void runInLoop(const Cob& c, bool thisIteration = false); + + /** + * Run the specified function in the EventBase's thread. + * + * This method is thread-safe, and may be called from another thread. + * + * If runInEventBaseThread() is called when the EventBase loop is not + * running, the function call will be delayed until the next time the loop is + * started. + * + * If runInEventBaseThread() returns true the function has successfully been + * scheduled to run in the loop thread. However, if the loop is terminated + * (and never later restarted) before it has a chance to run the requested + * function, the function may never be run at all. The caller is responsible + * for handling this situation correctly if they may terminate the loop with + * outstanding runInEventBaseThread() calls pending. + * + * If two calls to runInEventBaseThread() are made from the same thread, the + * functions will always be run in the order that they were scheduled. + * Ordering between functions scheduled from separate threads is not + * guaranteed. + * + * @param fn The function to run. The function must not throw any + * exceptions. + * @param arg An argument to pass to the function. + * + * @return Returns true if the function was successfully scheduled, or false + * if there was an error scheduling the function. + */ + template + bool runInEventBaseThread(void (*fn)(T*), T* arg) { + return runInEventBaseThread(reinterpret_cast(fn), + reinterpret_cast(arg)); + } + + bool runInEventBaseThread(void (*fn)(void*), void* arg); + + /** + * Run the specified function in the EventBase's thread + * + * This version of runInEventBaseThread() takes a std::function object. + * Note that this is less efficient than the version that takes a plain + * function pointer and void* argument, as it has to allocate memory to copy + * the std::function object. + * + * If the EventBase loop is terminated before it has a chance to run this + * function, the allocated memory will be leaked. The caller is responsible + * for ensuring that the EventBase loop is not terminated before this + * function can run. + * + * The function must not throw any exceptions. + */ + bool runInEventBaseThread(const Cob& fn); + + /** + * Runs the given Cob at some time after the specified number of + * milliseconds. (No guarantees exactly when.) + * + * @return true iff the cob was successfully registered. + */ + bool runAfterDelay( + const Cob& c, + int milliseconds, + TimeoutManager::InternalEnum = TimeoutManager::InternalEnum::NORMAL); + + /** + * Set the maximum desired latency in us and provide a callback which will be + * called when that latency is exceeded. + */ + void setMaxLatency(int64_t maxLatency, const Cob& maxLatencyCob) { + maxLatency_ = maxLatency; + maxLatencyCob_ = maxLatencyCob; + } + + /** + * Set smoothing coefficient for loop load average; # of milliseconds + * for exp(-1) (1/2.71828...) decay. + */ + void setLoadAvgMsec(uint32_t ms); + + /** + * reset the load average to a desired value + */ + void resetLoadAvg(double value = 0.0); + + /** + * Get the average loop time in microseconds (an exponentially-smoothed ave) + */ + double getAvgLoopTime() const { + return avgLoopTime_.get(); + } + + /** + * check if the event base loop is running. + */ + bool isRunning() const { + return loopThread_.load(std::memory_order_relaxed) != 0; + } + + /** + * wait until the event loop starts (after starting the event loop thread). + */ + void waitUntilRunning(); + + int getNotificationQueueSize() const; + + /** + * Verify that current thread is the EventBase thread, if the EventBase is + * running. + */ + bool isInEventBaseThread() const { + auto tid = loopThread_.load(std::memory_order_relaxed); + return tid == 0 || pthread_equal(tid, pthread_self()); + } + + bool inRunningEventBaseThread() const { + return pthread_equal( + loopThread_.load(std::memory_order_relaxed), pthread_self()); + } + + // --------- interface to underlying libevent base ------------ + // Avoid using these functions if possible. These functions are not + // guaranteed to always be present if we ever provide alternative EventBase + // implementations that do not use libevent internally. + event_base* getLibeventBase() const { return evb_; } + static const char* getLibeventVersion() { return event_get_version(); } + static const char* getLibeventMethod() { return event_get_method(); } + + /** + * only EventHandler/AsyncTimeout subclasses and ourselves should + * ever call this. + * + * This is used to mark the beginning of a new loop cycle by the + * first handler fired within that cycle. + * + */ + bool bumpHandlingTime(); + + class SmoothLoopTime { + public: + explicit SmoothLoopTime(uint64_t timeInterval) + : expCoeff_(-1.0/timeInterval) + , value_(0.0) + , oldBusyLeftover_(0) { + VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__; + } + + void setTimeInterval(uint64_t timeInterval); + void reset(double value = 0.0); + + void addSample(int64_t idle, int64_t busy); + + double get() const { + return value_; + } + + void dampen(double factor) { + value_ *= factor; + } + + private: + double expCoeff_; + double value_; + int64_t oldBusyLeftover_; + }; + + void setObserver( + const std::shared_ptr& observer) { + observer_ = observer; + } + + const std::shared_ptr& getObserver() { + return observer_; + } + + /** + * Set the name of the thread that runs this event base. + */ + void setName(const std::string& name); + + /** + * Returns the name of the thread that runs this event base. + */ + const std::string& getName(); + + private: + + // TimeoutManager + void attachTimeoutManager(AsyncTimeout* obj, + TimeoutManager::InternalEnum internal); + + void detachTimeoutManager(AsyncTimeout* obj); + + bool scheduleTimeout(AsyncTimeout* obj, std::chrono::milliseconds timeout); + + void cancelTimeout(AsyncTimeout* obj); + + bool isInTimeoutManagerThread() { + return isInEventBaseThread(); + } + + // Helper class used to short circuit runInEventBaseThread + class RunInLoopCallback : public LoopCallback { + public: + RunInLoopCallback(void (*fn)(void*), void* arg); + void runLoopCallback() noexcept; + + private: + void (*fn_)(void*); + void* arg_; + }; + + /* + * Helper function that tells us whether we have already handled + * some event/timeout/callback in this loop iteration. + */ + bool nothingHandledYet(); + + // --------- libevent callbacks (not for client use) ------------ + + static void runTr1FunctionPtr(std::function* fn); + + // small object used as a callback arg with enough info to execute the + // appropriate client-provided Cob + class CobTimeout : public AsyncTimeout { + public: + CobTimeout(EventBase* b, const Cob& c, TimeoutManager::InternalEnum in) + : AsyncTimeout(b, in), cob_(c) {} + + virtual void timeoutExpired() noexcept; + + private: + Cob cob_; + + public: + typedef boost::intrusive::list_member_hook< + boost::intrusive::link_mode > ListHook; + + ListHook hook; + + typedef boost::intrusive::list< + CobTimeout, + boost::intrusive::member_hook, + boost::intrusive::constant_time_size > List; + }; + + typedef LoopCallback::List LoopCallbackList; + class FunctionRunner; + + // executes any callbacks queued by runInLoop(); returns false if none found + bool runLoopCallbacks(bool setContext = true); + + void initNotificationQueue(); + + CobTimeout::List pendingCobTimeouts_; + + LoopCallbackList loopCallbacks_; + + // This will be null most of the time, but point to currentCallbacks + // if we are in the middle of running loop callbacks, such that + // runInLoop(..., true) will always run in the current loop + // iteration. + LoopCallbackList* runOnceCallbacks_; + + // stop_ is set by terminateLoopSoon() and is used by the main loop + // to determine if it should exit + bool stop_; + + // The ID of the thread running the main loop. + // 0 if loop is not running. + // Note: POSIX doesn't guarantee that 0 is an invalid pthread_t (or + // even that atomic is valid), but that's how it is + // everywhere (at least on Linux, FreeBSD, and OSX). + std::atomic loopThread_; + + // pointer to underlying event_base class doing the heavy lifting + event_base* evb_; + + // A notification queue for runInEventBaseThread() to use + // to send function requests to the EventBase thread. + std::unique_ptr>> queue_; + std::unique_ptr fnRunner_; + + // limit for latency in microseconds (0 disables) + int64_t maxLatency_; + + // exponentially-smoothed average loop time for latency-limiting + SmoothLoopTime avgLoopTime_; + + // smoothed loop time used to invoke latency callbacks; differs from + // avgLoopTime_ in that it's scaled down after triggering a callback + // to reduce spamminess + SmoothLoopTime maxLatencyLoopTime_; + + // callback called when latency limit is exceeded + Cob maxLatencyCob_; + + // we'll wait this long before running deferred callbacks if the event + // loop is idle. + static const int kDEFAULT_IDLE_WAIT_USEC = 20000; // 20ms + + // Wrap-around loop counter to detect beginning of each loop + uint64_t nextLoopCnt_; + uint64_t latestLoopCnt_; + uint64_t startWork_; + + // Observer to export counters + std::shared_ptr observer_; + uint32_t observerSampleCount_; + + // Name of the thread running this EventBase + std::string name_; +}; + +} // folly diff --git a/folly/io/async/EventFDWrapper.h b/folly/io/async/EventFDWrapper.h new file mode 100644 index 00000000..5df88fcb --- /dev/null +++ b/folly/io/async/EventFDWrapper.h @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Work around the lack of on glibc 2.5.1 which we still + * need to support, sigh. + */ + +#pragma once + +#include + +// doesn't exist on older glibc versions +#if (defined(__GLIBC__) && __GLIBC_PREREQ(2, 9)) +#include +#else /* !(defined(__GLIBC__) && __GLIBC_PREREQ(2, 9)) */ + +#include +#include +#include + +// Use existing __NR_eventfd2 if already defined +// Values from the Linux kernel source: +// arch/x86/include/asm/unistd_{32,64}.h +#ifndef __NR_eventfd2 +#if defined(__x86_64__) +#define __NR_eventfd2 290 +#elif defined(__i386__) +#define __NR_eventfd2 328 +#else +#error "Can't define __NR_eventfd2 for your architecture." +#endif +#endif + +enum + { + EFD_SEMAPHORE = 1, +#define EFD_SEMAPHORE EFD_SEMAPHORE + EFD_CLOEXEC = 02000000, +#define EFD_CLOEXEC EFD_CLOEXEC + EFD_NONBLOCK = 04000 +#define EFD_NONBLOCK EFD_NONBLOCK + }; + +// http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html +// Use the eventfd2 system call, as in glibc 2.9+ +// (requires kernel 2.6.30+) +#define eventfd(initval, flags) syscall(__NR_eventfd2,(initval),(flags)) + +#endif /* !(defined(__GLIBC__) && __GLIBC_PREREQ(2, 9)) */ diff --git a/folly/io/async/EventHandler.cpp b/folly/io/async/EventHandler.cpp new file mode 100644 index 00000000..4c9d9701 --- /dev/null +++ b/folly/io/async/EventHandler.cpp @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "folly/io/async/EventHandler.h" +#include "folly/io/async/EventBase.h" + +#include + +namespace folly { + +EventHandler::EventHandler(EventBase* eventBase, int fd) { + event_set(&event_, fd, 0, &EventHandler::libeventCallback, this); + if (eventBase != nullptr) { + setEventBase(eventBase); + } else { + // Callers must set the EventBase and fd before using this timeout. + // Set event_->ev_base to nullptr to ensure that this happens. + // (otherwise libevent will initialize it to the "default" event_base) + event_.ev_base = nullptr; + eventBase_ = nullptr; + } +} + +EventHandler::~EventHandler() { + unregisterHandler(); +} + +bool EventHandler::registerImpl(uint16_t events, bool internal) { + assert(event_.ev_base != nullptr); + + // We have to unregister the event before we can change the event flags + if (isHandlerRegistered()) { + // If the new events are the same are the same as the already registered + // flags, we don't have to do anything. Just return. + if (events == event_.ev_events && + static_cast(event_.ev_flags & EVLIST_INTERNAL) == internal) { + return true; + } + + event_del(&event_); + } + + // Update the event flags + // Unfortunately, event_set() resets the event_base, so we have to remember + // it before hand, then pass it back into event_base_set() afterwards + struct event_base* evb = event_.ev_base; + event_set(&event_, event_.ev_fd, events, + &EventHandler::libeventCallback, this); + event_base_set(evb, &event_); + + // Set EVLIST_INTERNAL if this is an internal event + if (internal) { + event_.ev_flags |= EVLIST_INTERNAL; + } + + // Add the event. + // + // Although libevent allows events to wait on both I/O and a timeout, + // we intentionally don't allow an EventHandler to also use a timeout. + // Callers must maintain a separate AsyncTimeout object if they want a + // timeout. + // + // Otherwise, it is difficult to handle persistent events properly. (The I/O + // event and timeout may both fire together the same time around the event + // loop. Normally we would want to inform the caller of the I/O event first, + // then the timeout. However, it is difficult to do this properly since the + // I/O callback could delete the EventHandler.) Additionally, if a caller + // uses the same struct event for both I/O and timeout, and they just want to + // reschedule the timeout, libevent currently makes an epoll_ctl() call even + // if the I/O event flags haven't changed. Using a separate event struct is + // therefore slightly more efficient in this case (although it does take up + // more space). + if (event_add(&event_, nullptr) < 0) { + LOG(ERROR) << "EventBase: failed to register event handler for fd " + << event_.ev_fd << ": " << strerror(errno); + // Call event_del() to make sure the event is completely uninstalled + event_del(&event_); + return false; + } + + return true; +} + +void EventHandler::unregisterHandler() { + if (isHandlerRegistered()) { + event_del(&event_); + } +} + +void EventHandler::attachEventBase(EventBase* eventBase) { + // attachEventBase() may only be called on detached handlers + assert(event_.ev_base == nullptr); + assert(!isHandlerRegistered()); + // This must be invoked from the EventBase's thread + assert(eventBase->isInEventBaseThread()); + + setEventBase(eventBase); +} + +void EventHandler::detachEventBase() { + ensureNotRegistered(__func__); + event_.ev_base = nullptr; +} + +void EventHandler::changeHandlerFD(int fd) { + ensureNotRegistered(__func__); + // event_set() resets event_base.ev_base, so manually restore it afterwards + struct event_base* evb = event_.ev_base; + event_set(&event_, fd, 0, &EventHandler::libeventCallback, this); + event_.ev_base = evb; // don't use event_base_set(), since evb may be nullptr +} + +void EventHandler::initHandler(EventBase* eventBase, int fd) { + ensureNotRegistered(__func__); + event_set(&event_, fd, 0, &EventHandler::libeventCallback, this); + setEventBase(eventBase); +} + +void EventHandler::ensureNotRegistered(const char* fn) { + // Neither the EventBase nor file descriptor may be changed while the + // handler is registered. Treat it as a programmer bug and abort the program + // if this requirement is violated. + if (isHandlerRegistered()) { + LOG(ERROR) << fn << " called on registered handler; aborting"; + abort(); + } +} + +void EventHandler::libeventCallback(int fd, short events, void* arg) { + EventHandler* handler = reinterpret_cast(arg); + assert(fd == handler->event_.ev_fd); + + // this can't possibly fire if handler->eventBase_ is nullptr + (void) handler->eventBase_->bumpHandlingTime(); + + handler->handlerReady(events); +} + +void EventHandler::setEventBase(EventBase* eventBase) { + event_base_set(eventBase->getLibeventBase(), &event_); + eventBase_ = eventBase; +} + +bool EventHandler::isPending() { + if (event_.ev_flags & EVLIST_ACTIVE) { + if (event_.ev_res & EV_READ) { + return true; + } + } + return false; +} + +} // folly diff --git a/folly/io/async/EventHandler.h b/folly/io/async/EventHandler.h new file mode 100644 index 00000000..3239c614 --- /dev/null +++ b/folly/io/async/EventHandler.h @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include "folly/io/async/EventUtil.h" +#include +#include + +namespace folly { + +class EventBase; + +/** + * The EventHandler class is used to asynchronously wait for events on a file + * descriptor. + * + * Users that wish to wait on I/O events should derive from EventHandler and + * implement the handlerReady() method. + */ +class EventHandler : private boost::noncopyable { + public: + enum EventFlags { + NONE = 0, + READ = EV_READ, + WRITE = EV_WRITE, + READ_WRITE = (READ | WRITE), + PERSIST = EV_PERSIST + }; + + /** + * Create a new EventHandler object. + * + * @param eventBase The EventBase to use to drive this event handler. + * This may be nullptr, in which case the EventBase must be + * set separately using initHandler() or attachEventBase() + * before the handler can be registered. + * @param fd The file descriptor that this EventHandler will + * monitor. This may be -1, in which case the file + * descriptor must be set separately using initHandler() or + * changeHandlerFD() before the handler can be registered. + */ + explicit EventHandler(EventBase* eventBase = nullptr, int fd = -1); + + /** + * EventHandler destructor. + * + * The event will be automatically unregistered if it is still registered. + */ + virtual ~EventHandler(); + + /** + * handlerReady() is invoked when the handler is ready. + * + * @param events A bitset indicating the events that are ready. + */ + virtual void handlerReady(uint16_t events) noexcept = 0; + + /** + * Register the handler. + * + * If the handler is already registered, the registration will be updated + * to wait on the new set of events. + * + * @param events A bitset specifying the events to monitor. + * If the PERSIST bit is set, the handler will remain + * registered even after handlerReady() is called. + * + * @return Returns true if the handler was successfully registered, + * or false if an error occurred. After an error, the handler is + * always unregistered, even if it was already registered prior to + * this call to registerHandler(). + */ + bool registerHandler(uint16_t events) { + return registerImpl(events, false); + } + + /** + * Unregister the handler, if it is registered. + */ + void unregisterHandler(); + + /** + * Returns true if the handler is currently registered. + */ + bool isHandlerRegistered() const { + return EventUtil::isEventRegistered(&event_); + } + + /** + * Attach the handler to a EventBase. + * + * This may only be called if the handler is not currently attached to a + * EventBase (either by using the default constructor, or by calling + * detachEventBase()). + * + * This method must be invoked in the EventBase's thread. + */ + void attachEventBase(EventBase* eventBase); + + /** + * Detach the handler from its EventBase. + * + * This may only be called when the handler is not currently registered. + * Once detached, the handler may not be registered again until it is + * re-attached to a EventBase by calling attachEventBase(). + * + * This method must be called from the current EventBase's thread. + */ + void detachEventBase(); + + /** + * Change the file descriptor that this handler is associated with. + * + * This may only be called when the handler is not currently registered. + */ + void changeHandlerFD(int fd); + + /** + * Attach the handler to a EventBase, and change the file descriptor. + * + * This method may only be called if the handler is not currently attached to + * a EventBase. This is primarily intended to be used to initialize + * EventHandler objects created using the default constructor. + */ + void initHandler(EventBase* eventBase, int fd); + + /** + * Return the set of events that we're currently registered for. + */ + uint16_t getRegisteredEvents() const { + return (isHandlerRegistered()) ? + event_.ev_events : 0; + } + + /** + * Register the handler as an internal event. + * + * This event will not count as an active event for determining if the + * EventBase loop has more events to process. The EventBase loop runs + * only as long as there are active EventHandlers, however "internal" event + * handlers are not counted. Therefore this event handler will not prevent + * EventBase loop from exiting with no more work to do if there are no other + * non-internal event handlers registered. + * + * This is intended to be used only in very rare cases by the internal + * EventBase code. This API is not guaranteed to remain stable or portable + * in the future. + */ + bool registerInternalHandler(uint16_t events) { + return registerImpl(events, true); + } + + bool isPending(); + + private: + bool registerImpl(uint16_t events, bool internal); + void ensureNotRegistered(const char* fn); + + void setEventBase(EventBase* eventBase); + + static void libeventCallback(int fd, short events, void* arg); + + struct event event_; + EventBase* eventBase_; +}; + +} // folly diff --git a/folly/io/async/EventUtil.h b/folly/io/async/EventUtil.h new file mode 100644 index 00000000..2942f97a --- /dev/null +++ b/folly/io/async/EventUtil.h @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include // libevent + +namespace folly { + +/** + * low-level libevent utility functions + */ +class EventUtil { + public: + static bool isEventRegistered(const struct event* ev) { + // If any of these flags are set, the event is registered. + enum { + EVLIST_REGISTERED = (EVLIST_INSERTED | EVLIST_ACTIVE | + EVLIST_TIMEOUT | EVLIST_SIGNAL) + }; + return (ev->ev_flags & EVLIST_REGISTERED); + } +}; + +} // folly diff --git a/folly/io/async/NotificationQueue.h b/folly/io/async/NotificationQueue.h new file mode 100644 index 00000000..431f933b --- /dev/null +++ b/folly/io/async/NotificationQueue.h @@ -0,0 +1,676 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include + +#include "folly/io/async/EventBase.h" +#include "folly/io/async/EventFDWrapper.h" +#include "folly/io/async/EventHandler.h" +#include "folly/io/async/Request.h" +#include "folly/Likely.h" +#include "folly/SmallLocks.h" + +#include +#include + +namespace folly { + +/** + * A producer-consumer queue for passing messages between EventBase threads. + * + * Messages can be added to the queue from any thread. Multiple consumers may + * listen to the queue from multiple EventBase threads. + * + * A NotificationQueue may not be destroyed while there are still consumers + * registered to receive events from the queue. It is the user's + * responsibility to ensure that all consumers are unregistered before the + * queue is destroyed. + * + * MessageT should be MoveConstructible (i.e., must support either a move + * constructor or a copy constructor, or both). Ideally it's move constructor + * (or copy constructor if no move constructor is provided) should never throw + * exceptions. If the constructor may throw, the consumers could end up + * spinning trying to move a message off the queue and failing, and then + * retrying. + */ +template +class NotificationQueue { + public: + /** + * A callback interface for consuming messages from the queue as they arrive. + */ + class Consumer : private EventHandler { + public: + enum : uint16_t { kDefaultMaxReadAtOnce = 10 }; + + Consumer() + : queue_(nullptr), + destroyedFlagPtr_(nullptr), + maxReadAtOnce_(kDefaultMaxReadAtOnce) {} + + virtual ~Consumer(); + + /** + * messageAvailable() will be invoked whenever a new + * message is available from the pipe. + */ + virtual void messageAvailable(MessageT&& message) = 0; + + /** + * Begin consuming messages from the specified queue. + * + * messageAvailable() will be called whenever a message is available. This + * consumer will continue to consume messages until stopConsuming() is + * called. + * + * A Consumer may only consume messages from a single NotificationQueue at + * a time. startConsuming() should not be called if this consumer is + * already consuming. + */ + void startConsuming(EventBase* eventBase, NotificationQueue* queue) { + init(eventBase, queue); + registerHandler(READ | PERSIST); + } + + /** + * Same as above but registers this event handler as internal so that it + * doesn't count towards the pending reader count for the IOLoop. + */ + void startConsumingInternal( + EventBase* eventBase, NotificationQueue* queue) { + init(eventBase, queue); + registerInternalHandler(READ | PERSIST); + } + + /** + * Stop consuming messages. + * + * startConsuming() may be called again to resume consumption of messages + * at a later point in time. + */ + void stopConsuming(); + + /** + * Get the NotificationQueue that this consumer is currently consuming + * messages from. Returns nullptr if the consumer is not currently + * consuming events from any queue. + */ + NotificationQueue* getCurrentQueue() const { + return queue_; + } + + /** + * Set a limit on how many messages this consumer will read each iteration + * around the event loop. + * + * This helps rate-limit how much work the Consumer will do each event loop + * iteration, to prevent it from starving other event handlers. + * + * A limit of 0 means no limit will be enforced. If unset, the limit + * defaults to kDefaultMaxReadAtOnce (defined to 10 above). + */ + void setMaxReadAtOnce(uint32_t maxAtOnce) { + maxReadAtOnce_ = maxAtOnce; + } + uint32_t getMaxReadAtOnce() const { + return maxReadAtOnce_; + } + + EventBase* getEventBase() { + return base_; + } + + virtual void handlerReady(uint16_t events) noexcept; + + private: + void init(EventBase* eventBase, NotificationQueue* queue); + + NotificationQueue* queue_; + bool* destroyedFlagPtr_; + uint32_t maxReadAtOnce_; + EventBase* base_; + }; + + enum class FdType { + EVENTFD, + PIPE + }; + + /** + * Create a new NotificationQueue. + * + * If the maxSize parameter is specified, this sets the maximum queue size + * that will be enforced by tryPutMessage(). (This size is advisory, and may + * be exceeded if producers explicitly use putMessage() instead of + * tryPutMessage().) + * + * The fdType parameter determines the type of file descriptor used + * internally to signal message availability. The default (eventfd) is + * preferable for performance and because it won't fail when the queue gets + * too long. It is not available on on older and non-linux kernels, however. + * In this case the code will fall back to using a pipe, the parameter is + * mostly for testing purposes. + */ + explicit NotificationQueue(uint32_t maxSize = 0, + FdType fdType = FdType::EVENTFD) + : eventfd_(-1), + pipeFds_{-1, -1}, + advisoryMaxQueueSize_(maxSize), + pid_(getpid()), + queue_() { + + spinlock_.init(); + + RequestContext::getStaticContext(); + + if (fdType == FdType::EVENTFD) { + eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE); + if (eventfd_ == -1) { + if (errno == ENOSYS || errno == EINVAL) { + // eventfd not availalble + LOG(ERROR) << "failed to create eventfd for NotificationQueue: " + << errno << ", falling back to pipe mode (is your kernel " + << "> 2.6.30?)"; + fdType = FdType::PIPE; + } else { + // some other error + folly::throwSystemError("Failed to create eventfd for " + "NotificationQueue", errno); + } + } + } + if (fdType == FdType::PIPE) { + if (pipe(pipeFds_)) { + folly::throwSystemError("Failed to create pipe for NotificationQueue", + errno); + } + try { + // put both ends of the pipe into non-blocking mode + if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) { + folly::throwSystemError("failed to put NotificationQueue pipe read " + "endpoint into non-blocking mode", errno); + } + if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) { + folly::throwSystemError("failed to put NotificationQueue pipe write " + "endpoint into non-blocking mode", errno); + } + } catch (...) { + ::close(pipeFds_[0]); + ::close(pipeFds_[1]); + throw; + } + } + } + + ~NotificationQueue() { + if (eventfd_ >= 0) { + ::close(eventfd_); + eventfd_ = -1; + } + if (pipeFds_[0] >= 0) { + ::close(pipeFds_[0]); + pipeFds_[0] = -1; + } + if (pipeFds_[1] >= 0) { + ::close(pipeFds_[1]); + pipeFds_[1] = -1; + } + } + + /** + * Set the advisory maximum queue size. + * + * This maximum queue size affects calls to tryPutMessage(). Message + * producers can still use the putMessage() call to unconditionally put a + * message on the queue, ignoring the configured maximum queue size. This + * can cause the queue size to exceed the configured maximum. + */ + void setMaxQueueSize(uint32_t max) { + advisoryMaxQueueSize_ = max; + } + + /** + * Attempt to put a message on the queue if the queue is not already full. + * + * If the queue is full, a std::overflow_error will be thrown. The + * setMaxQueueSize() function controls the maximum queue size. + * + * This method may contend briefly on a spinlock if many threads are + * concurrently accessing the queue, but for all intents and purposes it will + * immediately place the message on the queue and return. + * + * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and + * may throw any other exception thrown by the MessageT move/copy + * constructor. + */ + void tryPutMessage(MessageT&& message) { + putMessageImpl(std::move(message), advisoryMaxQueueSize_); + } + void tryPutMessage(const MessageT& message) { + putMessageImpl(message, advisoryMaxQueueSize_); + } + + /** + * No-throw versions of the above. Instead returns true on success, false on + * failure. + * + * Only std::overflow_error is prevented from being thrown (since this is the + * common exception case), user code must still catch std::bad_alloc errors. + */ + bool tryPutMessageNoThrow(MessageT&& message) { + return putMessageImpl(std::move(message), advisoryMaxQueueSize_, false); + } + bool tryPutMessageNoThrow(const MessageT& message) { + return putMessageImpl(message, advisoryMaxQueueSize_, false); + } + + /** + * Unconditionally put a message on the queue. + * + * This method is like tryPutMessage(), but ignores the maximum queue size + * and always puts the message on the queue, even if the maximum queue size + * would be exceeded. + * + * putMessage() may throw std::bad_alloc if memory allocation fails, and may + * throw any other exception thrown by the MessageT move/copy constructor. + */ + void putMessage(MessageT&& message) { + putMessageImpl(std::move(message), 0); + } + void putMessage(const MessageT& message) { + putMessageImpl(message, 0); + } + + /** + * Put several messages on the queue. + */ + template + void putMessages(InputIteratorT first, InputIteratorT last) { + typedef typename std::iterator_traits::iterator_category + IterCategory; + putMessagesImpl(first, last, IterCategory()); + } + + /** + * Try to immediately pull a message off of the queue, without blocking. + * + * If a message is immediately available, the result parameter will be + * updated to contain the message contents and true will be returned. + * + * If no message is available, false will be returned and result will be left + * unmodified. + */ + bool tryConsume(MessageT& result) { + checkPid(); + if (!tryConsumeEvent()) { + return false; + } + + try { + + folly::MSLGuard g(spinlock_); + + // This shouldn't happen normally. See the comments in + // Consumer::handlerReady() for more details. + if (UNLIKELY(queue_.empty())) { + LOG(ERROR) << "found empty queue after signalled event"; + return false; + } + + auto data = std::move(queue_.front()); + result = data.first; + RequestContext::setContext(data.second); + + queue_.pop_front(); + } catch (...) { + // Handle an exception if the assignment operator happens to throw. + // We consumed an event but weren't able to pop the message off the + // queue. Signal the event again since the message is still in the + // queue. + signalEvent(1); + throw; + } + + return true; + } + + int size() { + folly::MSLGuard g(spinlock_); + return queue_.size(); + } + + /** + * Check that the NotificationQueue is being used from the correct process. + * + * If you create a NotificationQueue in one process, then fork, and try to + * send messages to the queue from the child process, you're going to have a + * bad time. Unfortunately users have (accidentally) run into this. + * + * Because we use an eventfd/pipe, the child process can actually signal the + * parent process that an event is ready. However, it can't put anything on + * the parent's queue, so the parent wakes up and finds an empty queue. This + * check ensures that we catch the problem in the misbehaving child process + * code, and crash before signalling the parent process. + */ + void checkPid() const { + CHECK_EQ(pid_, getpid()); + } + + private: + // Forbidden copy constructor and assignment operator + NotificationQueue(NotificationQueue const &) = delete; + NotificationQueue& operator=(NotificationQueue const &) = delete; + + inline bool checkQueueSize(size_t maxSize, bool throws=true) const { + DCHECK(0 == spinlock_.try_lock()); + if (maxSize > 0 && queue_.size() >= maxSize) { + if (throws) { + throw std::overflow_error("unable to add message to NotificationQueue: " + "queue is full"); + } + return false; + } + return true; + } + + inline void signalEvent(size_t numAdded = 1) const { + static const uint8_t kPipeMessage[] = { + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 + }; + + ssize_t bytes_written = 0; + ssize_t bytes_expected = 0; + if (eventfd_ >= 0) { + // eventfd(2) dictates that we must write a 64-bit integer + uint64_t numAdded64(numAdded); + bytes_expected = static_cast(sizeof(numAdded64)); + bytes_written = ::write(eventfd_, &numAdded64, sizeof(numAdded64)); + } else { + // pipe semantics, add one message for each numAdded + bytes_expected = numAdded; + do { + size_t messageSize = std::min(numAdded, sizeof(kPipeMessage)); + ssize_t rc = ::write(pipeFds_[1], kPipeMessage, messageSize); + if (rc < 0) { + // TODO: if the pipe is full, write will fail with EAGAIN. + // See task #1044651 for how this could be handled + break; + } + numAdded -= rc; + bytes_written += rc; + } while (numAdded > 0); + } + if (bytes_written != bytes_expected) { + folly::throwSystemError("failed to signal NotificationQueue after " + "write", errno); + } + } + + bool tryConsumeEvent() { + uint64_t value = 0; + ssize_t rc = -1; + if (eventfd_ >= 0) { + rc = ::read(eventfd_, &value, sizeof(value)); + } else { + uint8_t value8; + rc = ::read(pipeFds_[0], &value8, sizeof(value8)); + value = value8; + } + if (rc < 0) { + // EAGAIN should pretty much be the only error we can ever get. + // This means someone else already processed the only available message. + assert(errno == EAGAIN); + return false; + } + assert(value == 1); + return true; + } + + bool putMessageImpl(MessageT&& message, size_t maxSize, bool throws=true) { + checkPid(); + { + folly::MSLGuard g(spinlock_); + if (!checkQueueSize(maxSize, throws)) { + return false; + } + queue_.push_back( + std::make_pair(std::move(message), + RequestContext::saveContext())); + } + signalEvent(); + return true; + } + + bool putMessageImpl( + const MessageT& message, size_t maxSize, bool throws=true) { + checkPid(); + { + folly::MSLGuard g(spinlock_); + if (!checkQueueSize(maxSize, throws)) { + return false; + } + queue_.push_back(std::make_pair(message, RequestContext::saveContext())); + } + signalEvent(); + return true; + } + + template + void putMessagesImpl(InputIteratorT first, InputIteratorT last, + std::input_iterator_tag) { + checkPid(); + size_t numAdded = 0; + { + folly::MSLGuard g(spinlock_); + while (first != last) { + queue_.push_back(std::make_pair(*first, RequestContext::saveContext())); + ++first; + ++numAdded; + } + } + signalEvent(numAdded); + } + + mutable folly::MicroSpinLock spinlock_; + int eventfd_; + int pipeFds_[2]; // to fallback to on older/non-linux systems + uint32_t advisoryMaxQueueSize_; + pid_t pid_; + std::deque>> queue_; +}; + +template +NotificationQueue::Consumer::~Consumer() { + // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_ + // will be non-nullptr. Mark the value that it points to, so that + // handlerReady() will know the callback is destroyed, and that it cannot + // access any member variables anymore. + if (destroyedFlagPtr_) { + *destroyedFlagPtr_ = true; + } +} + +template +void NotificationQueue::Consumer::handlerReady(uint16_t events) + noexcept { + uint32_t numProcessed = 0; + while (true) { + // Try to decrement the eventfd. + // + // We decrement the eventfd before checking the queue, and only pop a + // message off the queue if we read from the eventfd. + // + // Reading the eventfd first allows us to not have to hold the spinlock + // while accessing the eventfd. If we popped from the queue first, we + // would have to hold the lock while reading from or writing to the + // eventfd. (Multiple consumers may be woken up from a single eventfd + // notification. If we popped from the queue first, we could end up + // popping a message from the queue before the eventfd has been notified by + // the producer, unless the consumer and producer both held the spinlock + // around the entire operation.) + if (!queue_->tryConsumeEvent()) { + // no message available right now + return; + } + + // Now pop the message off of the queue. + // We successfully consumed the eventfd notification. + // There should be a message available for us to consume. + // + // We have to manually acquire and release the spinlock here, rather than + // using SpinLockHolder since the MessageT has to be constructed while + // holding the spinlock and available after we release it. SpinLockHolder + // unfortunately doesn't provide a release() method. (We can't construct + // MessageT first since we have no guarantee that MessageT has a default + // constructor. + queue_->spinlock_.lock(); + bool locked = true; + + try { + // The eventfd is incremented once for every message, and only + // decremented when a message is popped off. There should always be a + // message here to read. + if (UNLIKELY(queue_->queue_.empty())) { + // Unfortunately we have seen this happen in practice if a user forks + // the process, and then the child tries to send a message to a + // NotificationQueue being monitored by a thread in the parent. + // The child can signal the parent via the eventfd, but won't have been + // able to put anything on the parent's queue since it has a separate + // address space. + // + // This is a bug in the sender's code. putMessagesImpl() should cause + // the sender to crash now before trying to send a message from the + // wrong process. However, just in case let's handle this case in the + // consumer without crashing. + LOG(ERROR) << "found empty queue after signalled event"; + queue_->spinlock_.unlock(); + return; + } + + // Pull a message off the queue. + auto& data = queue_->queue_.front(); + + MessageT msg(std::move(data.first)); + auto old_ctx = + RequestContext::setContext(data.second); + queue_->queue_.pop_front(); + + // Check to see if the queue is empty now. + // We use this as an optimization to see if we should bother trying to + // loop again and read another message after invoking this callback. + bool wasEmpty = queue_->queue_.empty(); + + // Now unlock the spinlock before we invoke the callback. + queue_->spinlock_.unlock(); + locked = false; + + // Call the callback + bool callbackDestroyed = false; + CHECK(destroyedFlagPtr_ == nullptr); + destroyedFlagPtr_ = &callbackDestroyed; + messageAvailable(std::move(msg)); + + RequestContext::setContext(old_ctx); + + // If the callback was destroyed before it returned, we are done + if (callbackDestroyed) { + return; + } + destroyedFlagPtr_ = nullptr; + + // If the callback is no longer installed, we are done. + if (queue_ == nullptr) { + return; + } + + // If we have hit maxReadAtOnce_, we are done. + ++numProcessed; + if (maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) { + return; + } + + // If the queue was empty before we invoked the callback, it's probable + // that it is still empty now. Just go ahead and return, rather than + // looping again and trying to re-read from the eventfd. (If a new + // message had in fact arrived while we were invoking the callback, we + // will simply be woken up the next time around the event loop and will + // process the message then.) + if (wasEmpty) { + return; + } + } catch (const std::exception& ex) { + // This catch block is really just to handle the case where the MessageT + // constructor throws. The messageAvailable() callback itself is + // declared as noexcept and should never throw. + // + // If the MessageT constructor does throw we try to handle it as best as + // we can, but we can't work miracles. We will just ignore the error for + // now and return. The next time around the event loop we will end up + // trying to read the message again. If MessageT continues to throw we + // will never make forward progress and will keep trying each time around + // the event loop. + if (locked) { + // Unlock the spinlock. + queue_->spinlock_.unlock(); + + // Push a notification back on the eventfd since we didn't actually + // read the message off of the queue. + queue_->signalEvent(1); + } + + return; + } + } +} + +template +void NotificationQueue::Consumer::init( + EventBase* eventBase, + NotificationQueue* queue) { + assert(eventBase->isInEventBaseThread()); + assert(queue_ == nullptr); + assert(!isHandlerRegistered()); + queue->checkPid(); + + base_ = eventBase; + + queue_ = queue; + if (queue_->eventfd_ >= 0) { + initHandler(eventBase, queue_->eventfd_); + } else { + initHandler(eventBase, queue_->pipeFds_[0]); + } +} + +template +void NotificationQueue::Consumer::stopConsuming() { + if (queue_ == nullptr) { + assert(!isHandlerRegistered()); + return; + } + + assert(isHandlerRegistered()); + unregisterHandler(); + detachEventBase(); + queue_ = nullptr; +} + +} // folly diff --git a/folly/io/async/Request.cpp b/folly/io/async/Request.cpp new file mode 100644 index 00000000..97e2dda4 --- /dev/null +++ b/folly/io/async/Request.cpp @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "folly/io/async/Request.h" + +#ifndef NO_LIB_GFLAGS + DEFINE_bool(enable_request_context, true, + "Enable collection of per-request queueing stats for thrift"); +#endif + +namespace folly { + +#ifdef NO_LIB_GFLAGS + bool FLAGS_enable_thrift_request_context = true; +#endif + +RequestContext* defaultContext; + +} diff --git a/folly/io/async/Request.h b/folly/io/async/Request.h new file mode 100644 index 00000000..d1e853c2 --- /dev/null +++ b/folly/io/async/Request.h @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include +#include +#include "folly/ThreadLocal.h" +#include "folly/RWSpinLock.h" + +/** + * In many cases this header is included as a + * dependency to libraries which do not need + * command line flags. GFLAGS is a large binary + * and thus we do this so that a library which + * is size sensitive doesn't have to pull in + * GFLAGS if it doesn't want to. + */ +#ifndef NO_LIB_GFLAGS + #include + DECLARE_bool(enable_request_context); +#endif + +namespace folly { + +#ifdef NO_LIB_GFLAGS + extern bool FLAGS_enable_request_context; +#endif + +// Some request context that follows an async request through a process +// Everything in the context must be thread safe + +class RequestData { + public: + virtual ~RequestData() {} +}; + +class RequestContext; + +// If you do not call create() to create a unique request context, +// this default request context will always be returned, and is never +// copied between threads. +extern RequestContext* defaultContext; + +class RequestContext { + public: + // Create a unique requext context for this request. + // It will be passed between queues / threads (where implemented), + // so it should be valid for the lifetime of the request. + static bool create() { + if(!FLAGS_enable_request_context) { + return false; + } + bool prev = getStaticContext().get() != nullptr; + getStaticContext().reset(new std::shared_ptr( + std::make_shared())); + return prev; + } + + // Get the current context. + static RequestContext* get() { + if (!FLAGS_enable_request_context || + getStaticContext().get() == nullptr) { + if (defaultContext == nullptr) { + defaultContext = new RequestContext; + } + return defaultContext; + } + return getStaticContext().get()->get(); + } + + // The following API may be used to set per-request data in a thread-safe way. + // This access is still performance sensitive, so please ask if you need help + // profiling any use of these functions. + void setContextData( + const std::string& val, std::unique_ptr data) { + if (!FLAGS_enable_request_context) { + return; + } + + folly::RWSpinLock::WriteHolder guard(lock); + if (data_.find(val) != data_.end()) { + LOG_FIRST_N(WARNING, 1) << + "Called RequestContext::setContextData with data already set"; + + data_[val] = nullptr; + } else { + data_[val] = std::move(data); + } + } + + bool hasContextData(const std::string& val) { + folly::RWSpinLock::ReadHolder guard(lock); + return data_.find(val) != data_.end(); + } + + RequestData* getContextData(const std::string& val) { + folly::RWSpinLock::ReadHolder guard(lock); + auto r = data_.find(val); + if (r == data_.end()) { + return nullptr; + } else { + return r->second.get(); + } + } + + void clearContextData(const std::string& val) { + folly::RWSpinLock::WriteHolder guard(lock); + data_.erase(val); + } + + // The following API is used to pass the context through queues / threads. + // saveContext is called to geta shared_ptr to the context, and + // setContext is used to reset it on the other side of the queue. + // + // A shared_ptr is used, because many request may fan out across + // multiple threads, or do post-send processing, etc. + + static std::shared_ptr + setContext(std::shared_ptr ctx) { + if (FLAGS_enable_request_context) { + std::shared_ptr old_ctx; + if (getStaticContext().get()) { + old_ctx = *getStaticContext().get(); + } + if (ctx == nullptr) { + getStaticContext().reset(nullptr); + } else { + getStaticContext().reset(new std::shared_ptr(ctx)); + } + return old_ctx; + } + return std::shared_ptr(); + } + + static std::shared_ptr saveContext() { + if (!FLAGS_enable_request_context) { + return std::shared_ptr(); + } + if (getStaticContext().get() == nullptr) { + return std::shared_ptr(); + } else { + return *getStaticContext().get(); + } + } + + // Used to solve static destruction ordering issue. Any static object + // that uses RequestContext must call this function in its constructor. + // + // See below link for more details. + // http://stackoverflow.com/questions/335369/ + // finding-c-static-initialization-order-problems#335746 + static folly::ThreadLocalPtr>& + getStaticContext() { + static folly::ThreadLocalPtr > context; + return context; + } + + private: + folly::RWSpinLock lock; + std::map> data_; +}; + +/** + * Set the request context for a specific scope. For example, + * if you ran a part of a request in another thread you could + * use RequestContextGuard to copy apply the request context + * inside the other therad. + */ +class RequestContextGuard { + public: + explicit RequestContextGuard(std::shared_ptr ctx) { + oldctx_ = RequestContext::setContext(std::move(ctx)); + } + + ~RequestContextGuard() { + RequestContext::setContext(std::move(oldctx_)); + } + + private: + std::shared_ptr oldctx_; +}; + +} diff --git a/folly/io/async/TimeoutManager.h b/folly/io/async/TimeoutManager.h new file mode 100644 index 00000000..e54ef20f --- /dev/null +++ b/folly/io/async/TimeoutManager.h @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include + +namespace folly { + +class AsyncTimeout; + +/** + * Base interface to be implemented by all classes expecting to manage + * timeouts. AsyncTimeout will use implementations of this interface + * to schedule/cancel timeouts. + */ +class TimeoutManager { + public: + enum class InternalEnum { + INTERNAL, + NORMAL + }; + + virtual ~TimeoutManager() {} + + /** + * Attaches/detaches TimeoutManager to AsyncTimeout + */ + virtual void attachTimeoutManager(AsyncTimeout* obj, + InternalEnum internal) = 0; + virtual void detachTimeoutManager(AsyncTimeout* obj) = 0; + + /** + * Schedules AsyncTimeout to fire after `timeout` milliseconds + */ + virtual bool scheduleTimeout(AsyncTimeout* obj, + std::chrono::milliseconds timeout) = 0; + + /** + * Cancels the AsyncTimeout, if scheduled + */ + virtual void cancelTimeout(AsyncTimeout* obj) = 0; + + /** + * This is used to mark the beginning of a new loop cycle by the + * first handler fired within that cycle. + */ + virtual bool bumpHandlingTime() = 0; + + /** + * Helper method to know whether we are running in the timeout manager + * thread + */ + virtual bool isInTimeoutManagerThread() = 0; +}; + +} // folly -- 2.34.1