From 95084352f2064da765af36bb42158d9a9f99797f Mon Sep 17 00:00:00 2001 From: Dave Watson Date: Fri, 26 Sep 2014 13:49:11 -0700 Subject: [PATCH] AsyncSocket Summary: Move async socket to folly. Changes: * Made an AsyncSocketException type instead of TTransportException: Some of the exceptions didn't fit nicely in to std::exception types (like TIMED_OUT). There are some wrappers in thrift/lib/cpp/async to convert back to TTransportException, so all existing code still compiles. * Moved read/write callbacks out of AsyncTransport: filters are going to want to do the read/write stuff separately (see revproxy/tunnel/filters, and discussions in D1483148). Test Plan: fbconfig -r thrift; fbmake runtests contbuild should catch everything else - exception types shouldn't change for existing code Reviewed By: dcsommer@fb.com Subscribers: mshneer, folly-diffs@, trunkagent, doug, alandau, bmatheny, njormrod, fugalh, jsedgwick FB internal diff: D1587625 --- folly/Makefile.am | 2 + folly/io/async/AsyncSocket.cpp | 1967 +++++++++++++++++++++++++ folly/io/async/AsyncSocket.h | 766 ++++++++++ folly/io/async/AsyncSocketException.h | 79 + folly/io/async/AsyncTransport.h | 317 ++++ 5 files changed, 3131 insertions(+) create mode 100644 folly/io/async/AsyncSocket.cpp create mode 100644 folly/io/async/AsyncSocket.h create mode 100644 folly/io/async/AsyncSocketException.h create mode 100644 folly/io/async/AsyncTransport.h diff --git a/folly/Makefile.am b/folly/Makefile.am index f1ca4ac5..037daddd 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -131,6 +131,7 @@ nobase_follyinclude_HEADERS = \ io/ShutdownSocketSet.h \ io/async/AsyncTimeout.h \ io/async/AsyncServerSocket.h \ + io/async/AsyncSocket.h \ io/async/DelayedDestruction.h \ io/async/EventBase.h \ io/async/EventBaseManager.h \ @@ -258,6 +259,7 @@ libfolly_la_SOURCES = \ io/ShutdownSocketSet.cpp \ io/async/AsyncTimeout.cpp \ io/async/AsyncServerSocket.cpp \ + io/async/AsyncSocket.cpp \ io/async/EventBase.cpp \ io/async/EventBaseManager.cpp \ io/async/EventHandler.cpp \ diff --git a/folly/io/async/AsyncSocket.cpp b/folly/io/async/AsyncSocket.cpp new file mode 100644 index 00000000..69cfe867 --- /dev/null +++ b/folly/io/async/AsyncSocket.cpp @@ -0,0 +1,1967 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using std::string; +using std::unique_ptr; + +namespace folly { + +// static members initializers +const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap; +const folly::SocketAddress AsyncSocket::anyAddress = + folly::SocketAddress("0.0.0.0", 0); + +const AsyncSocketException socketClosedLocallyEx( + AsyncSocketException::END_OF_FILE, "socket closed locally"); +const AsyncSocketException socketShutdownForWritesEx( + AsyncSocketException::END_OF_FILE, "socket shutdown for writes"); + +// TODO: It might help performance to provide a version of WriteRequest that +// users could derive from, so we can avoid the extra allocation for each call +// to write()/writev(). We could templatize TFramedAsyncChannel just like the +// protocols are currently templatized for transports. +// +// We would need the version for external users where they provide the iovec +// storage space, and only our internal version would allocate it at the end of +// the WriteRequest. + +/** + * A WriteRequest object tracks information about a pending write() or writev() + * operation. + * + * A new WriteRequest operation is allocated on the heap for all write + * operations that cannot be completed immediately. + */ +class AsyncSocket::WriteRequest { + public: + static WriteRequest* newRequest(WriteCallback* callback, + const iovec* ops, + uint32_t opCount, + unique_ptr&& ioBuf, + WriteFlags flags) { + assert(opCount > 0); + // Since we put a variable size iovec array at the end + // of each WriteRequest, we have to manually allocate the memory. + void* buf = malloc(sizeof(WriteRequest) + + (opCount * sizeof(struct iovec))); + if (buf == nullptr) { + throw std::bad_alloc(); + } + + return new(buf) WriteRequest(callback, ops, opCount, std::move(ioBuf), + flags); + } + + void destroy() { + this->~WriteRequest(); + free(this); + } + + bool cork() const { + return isSet(flags_, WriteFlags::CORK); + } + + WriteFlags flags() const { + return flags_; + } + + WriteRequest* getNext() const { + return next_; + } + + WriteCallback* getCallback() const { + return callback_; + } + + uint32_t getBytesWritten() const { + return bytesWritten_; + } + + const struct iovec* getOps() const { + assert(opCount_ > opIndex_); + return writeOps_ + opIndex_; + } + + uint32_t getOpCount() const { + assert(opCount_ > opIndex_); + return opCount_ - opIndex_; + } + + void consume(uint32_t wholeOps, uint32_t partialBytes, + uint32_t totalBytesWritten) { + // Advance opIndex_ forward by wholeOps + opIndex_ += wholeOps; + assert(opIndex_ < opCount_); + + // If we've finished writing any IOBufs, release them + if (ioBuf_) { + for (uint32_t i = wholeOps; i != 0; --i) { + assert(ioBuf_); + ioBuf_ = ioBuf_->pop(); + } + } + + // Move partialBytes forward into the current iovec buffer + struct iovec* currentOp = writeOps_ + opIndex_; + assert((partialBytes < currentOp->iov_len) || (currentOp->iov_len == 0)); + currentOp->iov_base = + reinterpret_cast(currentOp->iov_base) + partialBytes; + currentOp->iov_len -= partialBytes; + + // Increment the bytesWritten_ count by totalBytesWritten + bytesWritten_ += totalBytesWritten; + } + + void append(WriteRequest* next) { + assert(next_ == nullptr); + next_ = next; + } + + private: + WriteRequest(WriteCallback* callback, + const struct iovec* ops, + uint32_t opCount, + unique_ptr&& ioBuf, + WriteFlags flags) + : next_(nullptr) + , callback_(callback) + , bytesWritten_(0) + , opCount_(opCount) + , opIndex_(0) + , flags_(flags) + , ioBuf_(std::move(ioBuf)) { + memcpy(writeOps_, ops, sizeof(*ops) * opCount_); + } + + // Private destructor, to ensure callers use destroy() + ~WriteRequest() {} + + WriteRequest* next_; ///< pointer to next WriteRequest + WriteCallback* callback_; ///< completion callback + uint32_t bytesWritten_; ///< bytes written + uint32_t opCount_; ///< number of entries in writeOps_ + uint32_t opIndex_; ///< current index into writeOps_ + WriteFlags flags_; ///< set for WriteFlags + unique_ptr ioBuf_; ///< underlying IOBuf, or nullptr if N/A + struct iovec writeOps_[]; ///< write operation(s) list +}; + +AsyncSocket::AsyncSocket(EventBase* evb) + : eventBase_(evb) + , writeTimeout_(this, evb) + , ioHandler_(this, evb) { + VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")"; + init(); +} + +AsyncSocket::AsyncSocket(EventBase* evb, + const folly::SocketAddress& address, + uint32_t connectTimeout) + : eventBase_(evb) + , writeTimeout_(this, evb) + , ioHandler_(this, evb) { + VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")"; + init(); + connect(nullptr, address, connectTimeout); +} + +AsyncSocket::AsyncSocket(EventBase* evb, + const std::string& ip, + uint16_t port, + uint32_t connectTimeout) + : eventBase_(evb) + , writeTimeout_(this, evb) + , ioHandler_(this, evb) { + VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")"; + init(); + connect(nullptr, ip, port, connectTimeout); +} + +AsyncSocket::AsyncSocket(EventBase* evb, int fd) + : eventBase_(evb) + , writeTimeout_(this, evb) + , ioHandler_(this, evb, fd) { + VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd=" + << fd << ")"; + init(); + fd_ = fd; + state_ = StateEnum::ESTABLISHED; +} + +// init() method, since constructor forwarding isn't supported in most +// compilers yet. +void AsyncSocket::init() { + assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + shutdownFlags_ = 0; + state_ = StateEnum::UNINIT; + eventFlags_ = EventHandler::NONE; + fd_ = -1; + sendTimeout_ = 0; + maxReadsPerEvent_ = 0; + connectCallback_ = nullptr; + readCallback_ = nullptr; + writeReqHead_ = nullptr; + writeReqTail_ = nullptr; + shutdownSocketSet_ = nullptr; + appBytesWritten_ = 0; + appBytesReceived_ = 0; +} + +AsyncSocket::~AsyncSocket() { + VLOG(7) << "actual destruction of AsyncSocket(this=" << this + << ", evb=" << eventBase_ << ", fd=" << fd_ + << ", state=" << state_ << ")"; +} + +void AsyncSocket::destroy() { + VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_ + << ", fd=" << fd_ << ", state=" << state_; + // When destroy is called, close the socket immediately + closeNow(); + + // Then call DelayedDestruction::destroy() to take care of + // whether or not we need immediate or delayed destruction + DelayedDestruction::destroy(); +} + +int AsyncSocket::detachFd() { + VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_ + << ", evb=" << eventBase_ << ", state=" << state_ + << ", events=" << std::hex << eventFlags_ << ")"; + // Extract the fd, and set fd_ to -1 first, so closeNow() won't + // actually close the descriptor. + if (shutdownSocketSet_) { + shutdownSocketSet_->remove(fd_); + } + int fd = fd_; + fd_ = -1; + // Call closeNow() to invoke all pending callbacks with an error. + closeNow(); + // Update the EventHandler to stop using this fd. + // This can only be done after closeNow() unregisters the handler. + ioHandler_.changeHandlerFD(-1); + return fd; +} + +void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) { + if (shutdownSocketSet_ == newSS) { + return; + } + if (shutdownSocketSet_ && fd_ != -1) { + shutdownSocketSet_->remove(fd_); + } + shutdownSocketSet_ = newSS; + if (shutdownSocketSet_ && fd_ != -1) { + shutdownSocketSet_->add(fd_); + } +} + +void AsyncSocket::connect(ConnectCallback* callback, + const folly::SocketAddress& address, + int timeout, + const OptionMap &options, + const folly::SocketAddress& bindAddr) noexcept { + DestructorGuard dg(this); + assert(eventBase_->isInEventBaseThread()); + + addr_ = address; + + // Make sure we're in the uninitialized state + if (state_ != StateEnum::UNINIT) { + return invalidState(callback); + } + + assert(fd_ == -1); + state_ = StateEnum::CONNECTING; + connectCallback_ = callback; + + sockaddr_storage addrStorage; + sockaddr* saddr = reinterpret_cast(&addrStorage); + + try { + // Create the socket + // Technically the first parameter should actually be a protocol family + // constant (PF_xxx) rather than an address family (AF_xxx), but the + // distinction is mainly just historical. In pretty much all + // implementations the PF_foo and AF_foo constants are identical. + fd_ = socket(address.getFamily(), SOCK_STREAM, 0); + if (fd_ < 0) { + throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR, + withAddr("failed to create socket"), errno); + } + if (shutdownSocketSet_) { + shutdownSocketSet_->add(fd_); + } + ioHandler_.changeHandlerFD(fd_); + + // Set the FD_CLOEXEC flag so that the socket will be closed if the program + // later forks and execs. + int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC); + if (rv != 0) { + throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR, + withAddr("failed to set close-on-exec flag"), + errno); + } + + // Put the socket in non-blocking mode + int flags = fcntl(fd_, F_GETFL, 0); + if (flags == -1) { + throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR, + withAddr("failed to get socket flags"), errno); + } + rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK); + if (rv == -1) { + throw AsyncSocketException( + AsyncSocketException::INTERNAL_ERROR, + withAddr("failed to put socket in non-blocking mode"), + errno); + } + +#if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE) + // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead + rv = fcntl(fd_, F_SETNOSIGPIPE, 1); + if (rv == -1) { + throw AsyncSocketException( + AsyncSocketException::INTERNAL_ERROR, + "failed to enable F_SETNOSIGPIPE on socket", + errno); + } +#endif + + // By default, turn on TCP_NODELAY + // If setNoDelay() fails, we continue anyway; this isn't a fatal error. + // setNoDelay() will log an error message if it fails. + if (address.getFamily() != AF_UNIX) { + (void)setNoDelay(true); + } + + VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_ + << ", fd=" << fd_ << ", host=" << address.describe().c_str(); + + // bind the socket + if (bindAddr != anyAddress) { + int one = 1; + if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) { + doClose(); + throw AsyncSocketException( + AsyncSocketException::NOT_OPEN, + "failed to setsockopt prior to bind on " + bindAddr.describe(), + errno); + } + + bindAddr.getAddress(&addrStorage); + + if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) { + doClose(); + throw AsyncSocketException(AsyncSocketException::NOT_OPEN, + "failed to bind to async socket: " + + bindAddr.describe(), + errno); + } + } + + // Apply the additional options if any. + for (const auto& opt: options) { + int rv = opt.first.apply(fd_, opt.second); + if (rv != 0) { + throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR, + withAddr("failed to set socket option"), + errno); + } + } + + // Perform the connect() + address.getAddress(&addrStorage); + + rv = ::connect(fd_, saddr, address.getActualSize()); + if (rv < 0) { + if (errno == EINPROGRESS) { + // Connection in progress. + if (timeout > 0) { + // Start a timer in case the connection takes too long. + if (!writeTimeout_.scheduleTimeout(timeout)) { + throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR, + withAddr("failed to schedule AsyncSocket connect timeout")); + } + } + + // Register for write events, so we'll + // be notified when the connection finishes/fails. + // Note that we don't register for a persistent event here. + assert(eventFlags_ == EventHandler::NONE); + eventFlags_ = EventHandler::WRITE; + if (!ioHandler_.registerHandler(eventFlags_)) { + throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR, + withAddr("failed to register AsyncSocket connect handler")); + } + return; + } else { + throw AsyncSocketException(AsyncSocketException::NOT_OPEN, + "connect failed (immediately)", errno); + } + } + + // If we're still here the connect() succeeded immediately. + // Fall through to call the callback outside of this try...catch block + } catch (const AsyncSocketException& ex) { + return failConnect(__func__, ex); + } catch (const std::exception& ex) { + // shouldn't happen, but handle it just in case + VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_ + << "): unexpected " << typeid(ex).name() << " exception: " + << ex.what(); + AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR, + withAddr(string("unexpected exception: ") + + ex.what())); + return failConnect(__func__, tex); + } + + // The connection succeeded immediately + // The read callback may not have been set yet, and no writes may be pending + // yet, so we don't have to register for any events at the moment. + VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this; + assert(readCallback_ == nullptr); + assert(writeReqHead_ == nullptr); + state_ = StateEnum::ESTABLISHED; + if (callback) { + connectCallback_ = nullptr; + callback->connectSuccess(); + } +} + +void AsyncSocket::connect(ConnectCallback* callback, + const string& ip, uint16_t port, + int timeout, + const OptionMap &options) noexcept { + DestructorGuard dg(this); + try { + connectCallback_ = callback; + connect(callback, folly::SocketAddress(ip, port), timeout, options); + } catch (const std::exception& ex) { + AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR, + ex.what()); + return failConnect(__func__, tex); + } +} + +void AsyncSocket::setSendTimeout(uint32_t milliseconds) { + sendTimeout_ = milliseconds; + assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + + // If we are currently pending on write requests, immediately update + // writeTimeout_ with the new value. + if ((eventFlags_ & EventHandler::WRITE) && + (state_ != StateEnum::CONNECTING)) { + assert(state_ == StateEnum::ESTABLISHED); + assert((shutdownFlags_ & SHUT_WRITE) == 0); + if (sendTimeout_ > 0) { + if (!writeTimeout_.scheduleTimeout(sendTimeout_)) { + AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, + withAddr("failed to reschedule send timeout in setSendTimeout")); + return failWrite(__func__, ex); + } + } else { + writeTimeout_.cancelTimeout(); + } + } +} + +void AsyncSocket::setReadCB(ReadCallback *callback) { + VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_ + << ", callback=" << callback << ", state=" << state_; + + // Short circuit if callback is the same as the existing readCallback_. + // + // Note that this is needed for proper functioning during some cleanup cases. + // During cleanup we allow setReadCallback(nullptr) to be called even if the + // read callback is already unset and we have been detached from an event + // base. This check prevents us from asserting + // eventBase_->isInEventBaseThread() when eventBase_ is nullptr. + if (callback == readCallback_) { + return; + } + + if (shutdownFlags_ & SHUT_READ) { + // Reads have already been shut down on this socket. + // + // Allow setReadCallback(nullptr) to be called in this case, but don't + // allow a new callback to be set. + // + // For example, setReadCallback(nullptr) can happen after an error if we + // invoke some other error callback before invoking readError(). The other + // error callback that is invoked first may go ahead and clear the read + // callback before we get a chance to invoke readError(). + if (callback != nullptr) { + return invalidState(callback); + } + assert((eventFlags_ & EventHandler::READ) == 0); + readCallback_ = nullptr; + return; + } + + DestructorGuard dg(this); + assert(eventBase_->isInEventBaseThread()); + + switch ((StateEnum)state_) { + case StateEnum::CONNECTING: + // For convenience, we allow the read callback to be set while we are + // still connecting. We just store the callback for now. Once the + // connection completes we'll register for read events. + readCallback_ = callback; + return; + case StateEnum::ESTABLISHED: + { + readCallback_ = callback; + uint16_t oldFlags = eventFlags_; + if (readCallback_) { + eventFlags_ |= EventHandler::READ; + } else { + eventFlags_ &= ~EventHandler::READ; + } + + // Update our registration if our flags have changed + if (eventFlags_ != oldFlags) { + // We intentionally ignore the return value here. + // updateEventRegistration() will move us into the error state if it + // fails, and we don't need to do anything else here afterwards. + (void)updateEventRegistration(); + } + + if (readCallback_) { + checkForImmediateRead(); + } + return; + } + case StateEnum::CLOSED: + case StateEnum::ERROR: + // We should never reach here. SHUT_READ should always be set + // if we are in STATE_CLOSED or STATE_ERROR. + assert(false); + return invalidState(callback); + case StateEnum::UNINIT: + // We do not allow setReadCallback() to be called before we start + // connecting. + return invalidState(callback); + } + + // We don't put a default case in the switch statement, so that the compiler + // will warn us to update the switch statement if a new state is added. + return invalidState(callback); +} + +AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const { + return readCallback_; +} + +void AsyncSocket::write(WriteCallback* callback, + const void* buf, size_t bytes, WriteFlags flags) { + iovec op; + op.iov_base = const_cast(buf); + op.iov_len = bytes; + writeImpl(callback, &op, 1, std::move(unique_ptr()), flags); +} + +void AsyncSocket::writev(WriteCallback* callback, + const iovec* vec, + size_t count, + WriteFlags flags) { + writeImpl(callback, vec, count, std::move(unique_ptr()), flags); +} + +void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr&& buf, + WriteFlags flags) { + size_t count = buf->countChainElements(); + if (count <= 64) { + iovec vec[count]; + writeChainImpl(callback, vec, count, std::move(buf), flags); + } else { + iovec* vec = new iovec[count]; + writeChainImpl(callback, vec, count, std::move(buf), flags); + delete[] vec; + } +} + +void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec, + size_t count, unique_ptr&& buf, WriteFlags flags) { + const IOBuf* head = buf.get(); + const IOBuf* next = head; + unsigned i = 0; + do { + vec[i].iov_base = const_cast(next->data()); + vec[i].iov_len = next->length(); + // IOBuf can get confused by empty iovec buffers, so increment the + // output pointer only if the iovec buffer is non-empty. We could + // end the loop with i < count, but that's ok. + if (vec[i].iov_len != 0) { + i++; + } + next = next->next(); + } while (next != head); + writeImpl(callback, vec, i, std::move(buf), flags); +} + +void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, + size_t count, unique_ptr&& buf, + WriteFlags flags) { + VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_ + << ", callback=" << callback << ", count=" << count + << ", state=" << state_; + DestructorGuard dg(this); + unique_ptrioBuf(std::move(buf)); + assert(eventBase_->isInEventBaseThread()); + + if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) { + // No new writes may be performed after the write side of the socket has + // been shutdown. + // + // We could just call callback->writeError() here to fail just this write. + // However, fail hard and use invalidState() to fail all outstanding + // callbacks and move the socket into the error state. There's most likely + // a bug in the caller's code, so we abort everything rather than trying to + // proceed as best we can. + return invalidState(callback); + } + + uint32_t countWritten = 0; + uint32_t partialWritten = 0; + int bytesWritten = 0; + bool mustRegister = false; + if (state_ == StateEnum::ESTABLISHED && !connecting()) { + if (writeReqHead_ == nullptr) { + // If we are established and there are no other writes pending, + // we can attempt to perform the write immediately. + assert(writeReqTail_ == nullptr); + assert((eventFlags_ & EventHandler::WRITE) == 0); + + bytesWritten = performWrite(vec, count, flags, + &countWritten, &partialWritten); + if (bytesWritten < 0) { + AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, + withAddr("writev failed"), errno); + return failWrite(__func__, callback, 0, ex); + } else if (countWritten == count) { + // We successfully wrote everything. + // Invoke the callback and return. + if (callback) { + callback->writeSuccess(); + } + return; + } // else { continue writing the next writeReq } + mustRegister = true; + } + } else if (!connecting()) { + // Invalid state for writing + return invalidState(callback); + } + + // Create a new WriteRequest to add to the queue + WriteRequest* req; + try { + req = WriteRequest::newRequest(callback, vec + countWritten, + count - countWritten, std::move(ioBuf), + flags); + } catch (const std::exception& ex) { + // we mainly expect to catch std::bad_alloc here + AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR, + withAddr(string("failed to append new WriteRequest: ") + ex.what())); + return failWrite(__func__, callback, bytesWritten, tex); + } + req->consume(0, partialWritten, bytesWritten); + if (writeReqTail_ == nullptr) { + assert(writeReqHead_ == nullptr); + writeReqHead_ = writeReqTail_ = req; + } else { + writeReqTail_->append(req); + writeReqTail_ = req; + } + + // Register for write events if are established and not currently + // waiting on write events + if (mustRegister) { + assert(state_ == StateEnum::ESTABLISHED); + assert((eventFlags_ & EventHandler::WRITE) == 0); + if (!updateEventRegistration(EventHandler::WRITE, 0)) { + assert(state_ == StateEnum::ERROR); + return; + } + if (sendTimeout_ > 0) { + // Schedule a timeout to fire if the write takes too long. + if (!writeTimeout_.scheduleTimeout(sendTimeout_)) { + AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, + withAddr("failed to schedule send timeout")); + return failWrite(__func__, ex); + } + } + } +} + +void AsyncSocket::close() { + VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_ + << ", state=" << state_ << ", shutdownFlags=" + << std::hex << (int) shutdownFlags_; + + // close() is only different from closeNow() when there are pending writes + // that need to drain before we can close. In all other cases, just call + // closeNow(). + // + // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or + // STATE_ERROR if close() is invoked while a previous closeNow() or failure + // is still running. (e.g., If there are multiple pending writes, and we + // call writeError() on the first one, it may call close(). In this case we + // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending + // writes will still be in the queue.) + // + // We only need to drain pending writes if we are still in STATE_CONNECTING + // or STATE_ESTABLISHED + if ((writeReqHead_ == nullptr) || + !(state_ == StateEnum::CONNECTING || + state_ == StateEnum::ESTABLISHED)) { + closeNow(); + return; + } + + // Declare a DestructorGuard to ensure that the AsyncSocket cannot be + // destroyed until close() returns. + DestructorGuard dg(this); + assert(eventBase_->isInEventBaseThread()); + + // Since there are write requests pending, we have to set the + // SHUT_WRITE_PENDING flag, and wait to perform the real close until the + // connect finishes and we finish writing these requests. + // + // Set SHUT_READ to indicate that reads are shut down, and set the + // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the + // pending writes complete. + shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING); + + // If a read callback is set, invoke readEOF() immediately to inform it that + // the socket has been closed and no more data can be read. + if (readCallback_) { + // Disable reads if they are enabled + if (!updateEventRegistration(0, EventHandler::READ)) { + // We're now in the error state; callbacks have been cleaned up + assert(state_ == StateEnum::ERROR); + assert(readCallback_ == nullptr); + } else { + ReadCallback* callback = readCallback_; + readCallback_ = nullptr; + callback->readEOF(); + } + } +} + +void AsyncSocket::closeNow() { + VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_ + << ", state=" << state_ << ", shutdownFlags=" + << std::hex << (int) shutdownFlags_; + DestructorGuard dg(this); + assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + + switch (state_) { + case StateEnum::ESTABLISHED: + case StateEnum::CONNECTING: + { + shutdownFlags_ |= (SHUT_READ | SHUT_WRITE); + state_ = StateEnum::CLOSED; + + // If the write timeout was set, cancel it. + writeTimeout_.cancelTimeout(); + + // If we are registered for I/O events, unregister. + if (eventFlags_ != EventHandler::NONE) { + eventFlags_ = EventHandler::NONE; + if (!updateEventRegistration()) { + // We will have been moved into the error state. + assert(state_ == StateEnum::ERROR); + return; + } + } + + if (fd_ >= 0) { + ioHandler_.changeHandlerFD(-1); + doClose(); + } + + if (connectCallback_) { + ConnectCallback* callback = connectCallback_; + connectCallback_ = nullptr; + callback->connectErr(socketClosedLocallyEx); + } + + failAllWrites(socketClosedLocallyEx); + + if (readCallback_) { + ReadCallback* callback = readCallback_; + readCallback_ = nullptr; + callback->readEOF(); + } + return; + } + case StateEnum::CLOSED: + // Do nothing. It's possible that we are being called recursively + // from inside a callback that we invoked inside another call to close() + // that is still running. + return; + case StateEnum::ERROR: + // Do nothing. The error handling code has performed (or is performing) + // cleanup. + return; + case StateEnum::UNINIT: + assert(eventFlags_ == EventHandler::NONE); + assert(connectCallback_ == nullptr); + assert(readCallback_ == nullptr); + assert(writeReqHead_ == nullptr); + shutdownFlags_ |= (SHUT_READ | SHUT_WRITE); + state_ = StateEnum::CLOSED; + return; + } + + LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_ + << ") called in unknown state " << state_; +} + +void AsyncSocket::closeWithReset() { + // Enable SO_LINGER, with the linger timeout set to 0. + // This will trigger a TCP reset when we close the socket. + if (fd_ >= 0) { + struct linger optLinger = {1, 0}; + if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) { + VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER " + << "on " << fd_ << ": errno=" << errno; + } + } + + // Then let closeNow() take care of the rest + closeNow(); +} + +void AsyncSocket::shutdownWrite() { + VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_ + << ", state=" << state_ << ", shutdownFlags=" + << std::hex << (int) shutdownFlags_; + + // If there are no pending writes, shutdownWrite() is identical to + // shutdownWriteNow(). + if (writeReqHead_ == nullptr) { + shutdownWriteNow(); + return; + } + + assert(eventBase_->isInEventBaseThread()); + + // There are pending writes. Set SHUT_WRITE_PENDING so that the actual + // shutdown will be performed once all writes complete. + shutdownFlags_ |= SHUT_WRITE_PENDING; +} + +void AsyncSocket::shutdownWriteNow() { + VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this + << ", fd=" << fd_ << ", state=" << state_ + << ", shutdownFlags=" << std::hex << (int) shutdownFlags_; + + if (shutdownFlags_ & SHUT_WRITE) { + // Writes are already shutdown; nothing else to do. + return; + } + + // If SHUT_READ is already set, just call closeNow() to completely + // close the socket. This can happen if close() was called with writes + // pending, and then shutdownWriteNow() is called before all pending writes + // complete. + if (shutdownFlags_ & SHUT_READ) { + closeNow(); + return; + } + + DestructorGuard dg(this); + assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + + switch (static_cast(state_)) { + case StateEnum::ESTABLISHED: + { + shutdownFlags_ |= SHUT_WRITE; + + // If the write timeout was set, cancel it. + writeTimeout_.cancelTimeout(); + + // If we are registered for write events, unregister. + if (!updateEventRegistration(0, EventHandler::WRITE)) { + // We will have been moved into the error state. + assert(state_ == StateEnum::ERROR); + return; + } + + // Shutdown writes on the file descriptor + ::shutdown(fd_, SHUT_WR); + + // Immediately fail all write requests + failAllWrites(socketShutdownForWritesEx); + return; + } + case StateEnum::CONNECTING: + { + // Set the SHUT_WRITE_PENDING flag. + // When the connection completes, it will check this flag, + // shutdown the write half of the socket, and then set SHUT_WRITE. + shutdownFlags_ |= SHUT_WRITE_PENDING; + + // Immediately fail all write requests + failAllWrites(socketShutdownForWritesEx); + return; + } + case StateEnum::UNINIT: + // Callers normally shouldn't call shutdownWriteNow() before the socket + // even starts connecting. Nonetheless, go ahead and set + // SHUT_WRITE_PENDING. Once the socket eventually connects it will + // immediately shut down the write side of the socket. + shutdownFlags_ |= SHUT_WRITE_PENDING; + return; + case StateEnum::CLOSED: + case StateEnum::ERROR: + // We should never get here. SHUT_WRITE should always be set + // in STATE_CLOSED and STATE_ERROR. + VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this + << ", fd=" << fd_ << ") in unexpected state " << state_ + << " with SHUT_WRITE not set (" + << std::hex << (int) shutdownFlags_ << ")"; + assert(false); + return; + } + + LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd=" + << fd_ << ") called in unknown state " << state_; +} + +bool AsyncSocket::readable() const { + if (fd_ == -1) { + return false; + } + struct pollfd fds[1]; + fds[0].fd = fd_; + fds[0].events = POLLIN; + fds[0].revents = 0; + int rc = poll(fds, 1, 0); + return rc == 1; +} + +bool AsyncSocket::isPending() const { + return ioHandler_.isPending(); +} + +bool AsyncSocket::hangup() const { + if (fd_ == -1) { + // sanity check, no one should ask for hangup if we are not connected. + assert(false); + return false; + } +#ifdef POLLRDHUP // Linux-only + struct pollfd fds[1]; + fds[0].fd = fd_; + fds[0].events = POLLRDHUP|POLLHUP; + fds[0].revents = 0; + poll(fds, 1, 0); + return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0; +#else + return false; +#endif +} + +bool AsyncSocket::good() const { + return ((state_ == StateEnum::CONNECTING || + state_ == StateEnum::ESTABLISHED) && + (shutdownFlags_ == 0) && (eventBase_ != nullptr)); +} + +bool AsyncSocket::error() const { + return (state_ == StateEnum::ERROR); +} + +void AsyncSocket::attachEventBase(EventBase* eventBase) { + VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_ + << ", old evb=" << eventBase_ << ", new evb=" << eventBase + << ", state=" << state_ << ", events=" + << std::hex << eventFlags_ << ")"; + assert(eventBase_ == nullptr); + assert(eventBase->isInEventBaseThread()); + + eventBase_ = eventBase; + ioHandler_.attachEventBase(eventBase); + writeTimeout_.attachEventBase(eventBase); +} + +void AsyncSocket::detachEventBase() { + VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_ + << ", old evb=" << eventBase_ << ", state=" << state_ + << ", events=" << std::hex << eventFlags_ << ")"; + assert(eventBase_ != nullptr); + assert(eventBase_->isInEventBaseThread()); + + eventBase_ = nullptr; + ioHandler_.detachEventBase(); + writeTimeout_.detachEventBase(); +} + +bool AsyncSocket::isDetachable() const { + DCHECK(eventBase_ != nullptr); + DCHECK(eventBase_->isInEventBaseThread()); + + return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled(); +} + +void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const { + address->setFromLocalAddress(fd_); +} + +void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const { + if (!addr_.isInitialized()) { + addr_.setFromPeerAddress(fd_); + } + *address = addr_; +} + +int AsyncSocket::setNoDelay(bool noDelay) { + if (fd_ < 0) { + VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket " + << this << "(state=" << state_ << ")"; + return EINVAL; + + } + + int value = noDelay ? 1 : 0; + if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) { + int errnoCopy = errno; + VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket " + << this << " (fd=" << fd_ << ", state=" << state_ << "): " + << strerror(errnoCopy); + return errnoCopy; + } + + return 0; +} + +int AsyncSocket::setCongestionFlavor(const std::string &cname) { + + #ifndef TCP_CONGESTION + #define TCP_CONGESTION 13 + #endif + + if (fd_ < 0) { + VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open " + << "socket " << this << "(state=" << state_ << ")"; + return EINVAL; + + } + + if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(), + cname.length() + 1) != 0) { + int errnoCopy = errno; + VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket " + << this << "(fd=" << fd_ << ", state=" << state_ << "): " + << strerror(errnoCopy); + return errnoCopy; + } + + return 0; +} + +int AsyncSocket::setQuickAck(bool quickack) { + if (fd_ < 0) { + VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket " + << this << "(state=" << state_ << ")"; + return EINVAL; + + } + +#ifdef TCP_QUICKACK // Linux-only + int value = quickack ? 1 : 0; + if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) { + int errnoCopy = errno; + VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket" + << this << "(fd=" << fd_ << ", state=" << state_ << "): " + << strerror(errnoCopy); + return errnoCopy; + } + + return 0; +#else + return ENOSYS; +#endif +} + +int AsyncSocket::setSendBufSize(size_t bufsize) { + if (fd_ < 0) { + VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket " + << this << "(state=" << state_ << ")"; + return EINVAL; + } + + if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) { + int errnoCopy = errno; + VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket" + << this << "(fd=" << fd_ << ", state=" << state_ << "): " + << strerror(errnoCopy); + return errnoCopy; + } + + return 0; +} + +int AsyncSocket::setRecvBufSize(size_t bufsize) { + if (fd_ < 0) { + VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket " + << this << "(state=" << state_ << ")"; + return EINVAL; + } + + if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) { + int errnoCopy = errno; + VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket" + << this << "(fd=" << fd_ << ", state=" << state_ << "): " + << strerror(errnoCopy); + return errnoCopy; + } + + return 0; +} + +int AsyncSocket::setTCPProfile(int profd) { + if (fd_ < 0) { + VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket " + << this << "(state=" << state_ << ")"; + return EINVAL; + } + + if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) { + int errnoCopy = errno; + VLOG(2) << "failed to set socket namespace option on AsyncSocket" + << this << "(fd=" << fd_ << ", state=" << state_ << "): " + << strerror(errnoCopy); + return errnoCopy; + } + + return 0; +} + +void AsyncSocket::ioReady(uint16_t events) noexcept { + VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_ + << ", events=" << std::hex << events << ", state=" << state_; + DestructorGuard dg(this); + assert(events & EventHandler::READ_WRITE); + assert(eventBase_->isInEventBaseThread()); + + uint16_t relevantEvents = events & EventHandler::READ_WRITE; + if (relevantEvents == EventHandler::READ) { + handleRead(); + } else if (relevantEvents == EventHandler::WRITE) { + handleWrite(); + } else if (relevantEvents == EventHandler::READ_WRITE) { + EventBase* originalEventBase = eventBase_; + // If both read and write events are ready, process writes first. + handleWrite(); + + // Return now if handleWrite() detached us from our EventBase + if (eventBase_ != originalEventBase) { + return; + } + + // Only call handleRead() if a read callback is still installed. + // (It's possible that the read callback was uninstalled during + // handleWrite().) + if (readCallback_) { + handleRead(); + } + } else { + VLOG(4) << "AsyncSocket::ioRead() called with unexpected events " + << std::hex << events << "(this=" << this << ")"; + abort(); + } +} + +ssize_t AsyncSocket::performRead(void* buf, size_t buflen) { + ssize_t bytes = recv(fd_, buf, buflen, MSG_DONTWAIT); + if (bytes < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // No more data to read right now. + return READ_BLOCKING; + } else { + return READ_ERROR; + } + } else { + appBytesReceived_ += bytes; + return bytes; + } +} + +void AsyncSocket::handleRead() noexcept { + VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_ + << ", state=" << state_; + assert(state_ == StateEnum::ESTABLISHED); + assert((shutdownFlags_ & SHUT_READ) == 0); + assert(readCallback_ != nullptr); + assert(eventFlags_ & EventHandler::READ); + + // Loop until: + // - a read attempt would block + // - readCallback_ is uninstalled + // - the number of loop iterations exceeds the optional maximum + // - this AsyncSocket is moved to another EventBase + // + // When we invoke readDataAvailable() it may uninstall the readCallback_, + // which is why need to check for it here. + // + // The last bullet point is slightly subtle. readDataAvailable() may also + // detach this socket from this EventBase. However, before + // readDataAvailable() returns another thread may pick it up, attach it to + // a different EventBase, and install another readCallback_. We need to + // exit immediately after readDataAvailable() returns if the eventBase_ has + // changed. (The caller must perform some sort of locking to transfer the + // AsyncSocket between threads properly. This will be sufficient to ensure + // that this thread sees the updated eventBase_ variable after + // readDataAvailable() returns.) + uint16_t numReads = 0; + EventBase* originalEventBase = eventBase_; + while (readCallback_ && eventBase_ == originalEventBase) { + // Get the buffer to read into. + void* buf = nullptr; + size_t buflen = 0; + try { + readCallback_->getReadBuffer(&buf, &buflen); + } catch (const AsyncSocketException& ex) { + return failRead(__func__, ex); + } catch (const std::exception& ex) { + AsyncSocketException tex(AsyncSocketException::BAD_ARGS, + string("ReadCallback::getReadBuffer() " + "threw exception: ") + + ex.what()); + return failRead(__func__, tex); + } catch (...) { + AsyncSocketException ex(AsyncSocketException::BAD_ARGS, + "ReadCallback::getReadBuffer() threw " + "non-exception type"); + return failRead(__func__, ex); + } + if (buf == nullptr || buflen == 0) { + AsyncSocketException ex(AsyncSocketException::BAD_ARGS, + "ReadCallback::getReadBuffer() returned " + "empty buffer"); + return failRead(__func__, ex); + } + + // Perform the read + ssize_t bytesRead = performRead(buf, buflen); + if (bytesRead > 0) { + readCallback_->readDataAvailable(bytesRead); + // Fall through and continue around the loop if the read + // completely filled the available buffer. + // Note that readCallback_ may have been uninstalled or changed inside + // readDataAvailable(). + if (bytesRead < buflen) { + return; + } + } else if (bytesRead == READ_BLOCKING) { + // No more data to read right now. + return; + } else if (bytesRead == READ_ERROR) { + AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, + withAddr("recv() failed"), errno); + return failRead(__func__, ex); + } else { + assert(bytesRead == READ_EOF); + // EOF + shutdownFlags_ |= SHUT_READ; + if (!updateEventRegistration(0, EventHandler::READ)) { + // we've already been moved into STATE_ERROR + assert(state_ == StateEnum::ERROR); + assert(readCallback_ == nullptr); + return; + } + + ReadCallback* callback = readCallback_; + readCallback_ = nullptr; + callback->readEOF(); + return; + } + if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) { + return; + } + } +} + +/** + * This function attempts to write as much data as possible, until no more data + * can be written. + * + * - If it sends all available data, it unregisters for write events, and stops + * the writeTimeout_. + * + * - If not all of the data can be sent immediately, it reschedules + * writeTimeout_ (if a non-zero timeout is set), and ensures the handler is + * registered for write events. + */ +void AsyncSocket::handleWrite() noexcept { + VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_ + << ", state=" << state_; + if (state_ == StateEnum::CONNECTING) { + handleConnect(); + return; + } + + // Normal write + assert(state_ == StateEnum::ESTABLISHED); + assert((shutdownFlags_ & SHUT_WRITE) == 0); + assert(writeReqHead_ != nullptr); + + // Loop until we run out of write requests, + // or until this socket is moved to another EventBase. + // (See the comment in handleRead() explaining how this can happen.) + EventBase* originalEventBase = eventBase_; + while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) { + uint32_t countWritten; + uint32_t partialWritten; + WriteFlags writeFlags = writeReqHead_->flags(); + if (writeReqHead_->getNext() != nullptr) { + writeFlags = writeFlags | WriteFlags::CORK; + } + int bytesWritten = performWrite(writeReqHead_->getOps(), + writeReqHead_->getOpCount(), + writeFlags, &countWritten, &partialWritten); + if (bytesWritten < 0) { + AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, + withAddr("writev() failed"), errno); + return failWrite(__func__, ex); + } else if (countWritten == writeReqHead_->getOpCount()) { + // We finished this request + WriteRequest* req = writeReqHead_; + writeReqHead_ = req->getNext(); + + if (writeReqHead_ == nullptr) { + writeReqTail_ = nullptr; + // This is the last write request. + // Unregister for write events and cancel the send timer + // before we invoke the callback. We have to update the state properly + // before calling the callback, since it may want to detach us from + // the EventBase. + if (eventFlags_ & EventHandler::WRITE) { + if (!updateEventRegistration(0, EventHandler::WRITE)) { + assert(state_ == StateEnum::ERROR); + return; + } + // Stop the send timeout + writeTimeout_.cancelTimeout(); + } + assert(!writeTimeout_.isScheduled()); + + // If SHUT_WRITE_PENDING is set, we should shutdown the socket after + // we finish sending the last write request. + // + // We have to do this before invoking writeSuccess(), since + // writeSuccess() may detach us from our EventBase. + if (shutdownFlags_ & SHUT_WRITE_PENDING) { + assert(connectCallback_ == nullptr); + shutdownFlags_ |= SHUT_WRITE; + + if (shutdownFlags_ & SHUT_READ) { + // Reads have already been shutdown. Fully close the socket and + // move to STATE_CLOSED. + // + // Note: This code currently moves us to STATE_CLOSED even if + // close() hasn't ever been called. This can occur if we have + // received EOF from the peer and shutdownWrite() has been called + // locally. Should we bother staying in STATE_ESTABLISHED in this + // case, until close() is actually called? I can't think of a + // reason why we would need to do so. No other operations besides + // calling close() or destroying the socket can be performed at + // this point. + assert(readCallback_ == nullptr); + state_ = StateEnum::CLOSED; + if (fd_ >= 0) { + ioHandler_.changeHandlerFD(-1); + doClose(); + } + } else { + // Reads are still enabled, so we are only doing a half-shutdown + ::shutdown(fd_, SHUT_WR); + } + } + } + + // Invoke the callback + WriteCallback* callback = req->getCallback(); + req->destroy(); + if (callback) { + callback->writeSuccess(); + } + // We'll continue around the loop, trying to write another request + } else { + // Partial write. + writeReqHead_->consume(countWritten, partialWritten, bytesWritten); + // Stop after a partial write; it's highly likely that a subsequent write + // attempt will just return EAGAIN. + // + // Ensure that we are registered for write events. + if ((eventFlags_ & EventHandler::WRITE) == 0) { + if (!updateEventRegistration(EventHandler::WRITE, 0)) { + assert(state_ == StateEnum::ERROR); + return; + } + } + + // Reschedule the send timeout, since we have made some write progress. + if (sendTimeout_ > 0) { + if (!writeTimeout_.scheduleTimeout(sendTimeout_)) { + AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, + withAddr("failed to reschedule write timeout")); + return failWrite(__func__, ex); + } + } + return; + } + } +} + +void AsyncSocket::checkForImmediateRead() noexcept { + // We currently don't attempt to perform optimistic reads in AsyncSocket. + // (However, note that some subclasses do override this method.) + // + // Simply calling handleRead() here would be bad, as this would call + // readCallback_->getReadBuffer(), forcing the callback to allocate a read + // buffer even though no data may be available. This would waste lots of + // memory, since the buffer will sit around unused until the socket actually + // becomes readable. + // + // Checking if the socket is readable now also seems like it would probably + // be a pessimism. In most cases it probably wouldn't be readable, and we + // would just waste an extra system call. Even if it is readable, waiting to + // find out from libevent on the next event loop doesn't seem that bad. +} + +void AsyncSocket::handleInitialReadWrite() noexcept { + // Our callers should already be holding a DestructorGuard, but grab + // one here just to make sure, in case one of our calling code paths ever + // changes. + DestructorGuard dg(this); + + // If we have a readCallback_, make sure we enable read events. We + // may already be registered for reads if connectSuccess() set + // the read calback. + if (readCallback_ && !(eventFlags_ & EventHandler::READ)) { + assert(state_ == StateEnum::ESTABLISHED); + assert((shutdownFlags_ & SHUT_READ) == 0); + if (!updateEventRegistration(EventHandler::READ, 0)) { + assert(state_ == StateEnum::ERROR); + return; + } + checkForImmediateRead(); + } else if (readCallback_ == nullptr) { + // Unregister for read events. + updateEventRegistration(0, EventHandler::READ); + } + + // If we have write requests pending, try to send them immediately. + // Since we just finished accepting, there is a very good chance that we can + // write without blocking. + // + // However, we only process them if EventHandler::WRITE is not already set, + // which means that we're already blocked on a write attempt. (This can + // happen if connectSuccess() called write() before returning.) + if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) { + // Call handleWrite() to perform write processing. + handleWrite(); + } else if (writeReqHead_ == nullptr) { + // Unregister for write event. + updateEventRegistration(0, EventHandler::WRITE); + } +} + +void AsyncSocket::handleConnect() noexcept { + VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_ + << ", state=" << state_; + assert(state_ == StateEnum::CONNECTING); + // SHUT_WRITE can never be set while we are still connecting; + // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect + // finishes + assert((shutdownFlags_ & SHUT_WRITE) == 0); + + // In case we had a connect timeout, cancel the timeout + writeTimeout_.cancelTimeout(); + // We don't use a persistent registration when waiting on a connect event, + // so we have been automatically unregistered now. Update eventFlags_ to + // reflect reality. + assert(eventFlags_ == EventHandler::WRITE); + eventFlags_ = EventHandler::NONE; + + // Call getsockopt() to check if the connect succeeded + int error; + socklen_t len = sizeof(error); + int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len); + if (rv != 0) { + AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, + withAddr("error calling getsockopt() after connect"), + errno); + VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd=" + << fd_ << " host=" << addr_.describe() + << ") exception:" << ex.what(); + return failConnect(__func__, ex); + } + + if (error != 0) { + AsyncSocketException ex(AsyncSocketException::NOT_OPEN, + "connect failed", error); + VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd=" + << fd_ << " host=" << addr_.describe() + << ") exception: " << ex.what(); + return failConnect(__func__, ex); + } + + // Move into STATE_ESTABLISHED + state_ = StateEnum::ESTABLISHED; + + // If SHUT_WRITE_PENDING is set and we don't have any write requests to + // perform, immediately shutdown the write half of the socket. + if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) { + // SHUT_READ shouldn't be set. If close() is called on the socket while we + // are still connecting we just abort the connect rather than waiting for + // it to complete. + assert((shutdownFlags_ & SHUT_READ) == 0); + ::shutdown(fd_, SHUT_WR); + shutdownFlags_ |= SHUT_WRITE; + } + + VLOG(7) << "AsyncSocket " << this << ": fd " << fd_ + << "successfully connected; state=" << state_; + + // Remember the EventBase we are attached to, before we start invoking any + // callbacks (since the callbacks may call detachEventBase()). + EventBase* originalEventBase = eventBase_; + + // Call the connect callback. + if (connectCallback_) { + ConnectCallback* callback = connectCallback_; + connectCallback_ = nullptr; + callback->connectSuccess(); + } + + // Note that the connect callback may have changed our state. + // (set or unset the read callback, called write(), closed the socket, etc.) + // The following code needs to handle these situations correctly. + // + // If the socket has been closed, readCallback_ and writeReqHead_ will + // always be nullptr, so that will prevent us from trying to read or write. + // + // The main thing to check for is if eventBase_ is still originalEventBase. + // If not, we have been detached from this event base, so we shouldn't + // perform any more operations. + if (eventBase_ != originalEventBase) { + return; + } + + handleInitialReadWrite(); +} + +void AsyncSocket::timeoutExpired() noexcept { + VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: " + << "state=" << state_ << ", events=" << std::hex << eventFlags_; + DestructorGuard dg(this); + assert(eventBase_->isInEventBaseThread()); + + if (state_ == StateEnum::CONNECTING) { + // connect() timed out + // Unregister for I/O events. + AsyncSocketException ex(AsyncSocketException::TIMED_OUT, + "connect timed out"); + failConnect(__func__, ex); + } else { + // a normal write operation timed out + assert(state_ == StateEnum::ESTABLISHED); + AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out"); + failWrite(__func__, ex); + } +} + +ssize_t AsyncSocket::performWrite(const iovec* vec, + uint32_t count, + WriteFlags flags, + uint32_t* countWritten, + uint32_t* partialWritten) { + // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL + // We correctly handle EPIPE errors, so we never want to receive SIGPIPE + // (since it may terminate the program if the main program doesn't explicitly + // ignore it). + struct msghdr msg; + msg.msg_name = nullptr; + msg.msg_namelen = 0; + msg.msg_iov = const_cast(vec); +#ifdef IOV_MAX // not defined on Android + msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX); +#else + msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV); +#endif + msg.msg_control = nullptr; + msg.msg_controllen = 0; + msg.msg_flags = 0; + + int msg_flags = MSG_DONTWAIT; + +#ifdef MSG_NOSIGNAL // Linux-only + msg_flags |= MSG_NOSIGNAL; + if (isSet(flags, WriteFlags::CORK)) { + // MSG_MORE tells the kernel we have more data to send, so wait for us to + // give it the rest of the data rather than immediately sending a partial + // frame, even when TCP_NODELAY is enabled. + msg_flags |= MSG_MORE; + } +#endif + if (isSet(flags, WriteFlags::EOR)) { + // marks that this is the last byte of a record (response) + msg_flags |= MSG_EOR; + } + ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags); + if (totalWritten < 0) { + if (errno == EAGAIN) { + // TCP buffer is full; we can't write any more data right now. + *countWritten = 0; + *partialWritten = 0; + return 0; + } + // error + *countWritten = 0; + *partialWritten = 0; + return -1; + } + + appBytesWritten_ += totalWritten; + + uint32_t bytesWritten; + uint32_t n; + for (bytesWritten = totalWritten, n = 0; n < count; ++n) { + const iovec* v = vec + n; + if (v->iov_len > bytesWritten) { + // Partial write finished in the middle of this iovec + *countWritten = n; + *partialWritten = bytesWritten; + return totalWritten; + } + + bytesWritten -= v->iov_len; + } + + assert(bytesWritten == 0); + *countWritten = n; + *partialWritten = 0; + return totalWritten; +} + +/** + * Re-register the EventHandler after eventFlags_ has changed. + * + * If an error occurs, fail() is called to move the socket into the error state + * and call all currently installed callbacks. After an error, the + * AsyncSocket is completely unregistered. + * + * @return Returns true on succcess, or false on error. + */ +bool AsyncSocket::updateEventRegistration() { + VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this + << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_ + << ", events=" << std::hex << eventFlags_; + assert(eventBase_->isInEventBaseThread()); + if (eventFlags_ == EventHandler::NONE) { + ioHandler_.unregisterHandler(); + return true; + } + + // Always register for persistent events, so we don't have to re-register + // after being called back. + if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) { + eventFlags_ = EventHandler::NONE; // we're not registered after error + AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, + withAddr("failed to update AsyncSocket event registration")); + fail("updateEventRegistration", ex); + return false; + } + + return true; +} + +bool AsyncSocket::updateEventRegistration(uint16_t enable, + uint16_t disable) { + uint16_t oldFlags = eventFlags_; + eventFlags_ |= enable; + eventFlags_ &= ~disable; + if (eventFlags_ == oldFlags) { + return true; + } else { + return updateEventRegistration(); + } +} + +void AsyncSocket::startFail() { + // startFail() should only be called once + assert(state_ != StateEnum::ERROR); + assert(getDestructorGuardCount() > 0); + state_ = StateEnum::ERROR; + // Ensure that SHUT_READ and SHUT_WRITE are set, + // so all future attempts to read or write will be rejected + shutdownFlags_ |= (SHUT_READ | SHUT_WRITE); + + if (eventFlags_ != EventHandler::NONE) { + eventFlags_ = EventHandler::NONE; + ioHandler_.unregisterHandler(); + } + writeTimeout_.cancelTimeout(); + + if (fd_ >= 0) { + ioHandler_.changeHandlerFD(-1); + doClose(); + } +} + +void AsyncSocket::finishFail() { + assert(state_ == StateEnum::ERROR); + assert(getDestructorGuardCount() > 0); + + AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, + withAddr("socket closing after error")); + if (connectCallback_) { + ConnectCallback* callback = connectCallback_; + connectCallback_ = nullptr; + callback->connectErr(ex); + } + + failAllWrites(ex); + + if (readCallback_) { + ReadCallback* callback = readCallback_; + readCallback_ = nullptr; + callback->readErr(ex); + } +} + +void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) { + VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state=" + << state_ << " host=" << addr_.describe() + << "): failed in " << fn << "(): " + << ex.what(); + startFail(); + finishFail(); +} + +void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) { + VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state=" + << state_ << " host=" << addr_.describe() + << "): failed while connecting in " << fn << "(): " + << ex.what(); + startFail(); + + if (connectCallback_ != nullptr) { + ConnectCallback* callback = connectCallback_; + connectCallback_ = nullptr; + callback->connectErr(ex); + } + + finishFail(); +} + +void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) { + VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state=" + << state_ << " host=" << addr_.describe() + << "): failed while reading in " << fn << "(): " + << ex.what(); + startFail(); + + if (readCallback_ != nullptr) { + ReadCallback* callback = readCallback_; + readCallback_ = nullptr; + callback->readErr(ex); + } + + finishFail(); +} + +void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) { + VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state=" + << state_ << " host=" << addr_.describe() + << "): failed while writing in " << fn << "(): " + << ex.what(); + startFail(); + + // Only invoke the first write callback, since the error occurred while + // writing this request. Let any other pending write callbacks be invoked in + // finishFail(). + if (writeReqHead_ != nullptr) { + WriteRequest* req = writeReqHead_; + writeReqHead_ = req->getNext(); + WriteCallback* callback = req->getCallback(); + uint32_t bytesWritten = req->getBytesWritten(); + req->destroy(); + if (callback) { + callback->writeErr(bytesWritten, ex); + } + } + + finishFail(); +} + +void AsyncSocket::failWrite(const char* fn, WriteCallback* callback, + size_t bytesWritten, + const AsyncSocketException& ex) { + // This version of failWrite() is used when the failure occurs before + // we've added the callback to writeReqHead_. + VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state=" + << state_ << " host=" << addr_.describe() + <<"): failed while writing in " << fn << "(): " + << ex.what(); + startFail(); + + if (callback != nullptr) { + callback->writeErr(bytesWritten, ex); + } + + finishFail(); +} + +void AsyncSocket::failAllWrites(const AsyncSocketException& ex) { + // Invoke writeError() on all write callbacks. + // This is used when writes are forcibly shutdown with write requests + // pending, or when an error occurs with writes pending. + while (writeReqHead_ != nullptr) { + WriteRequest* req = writeReqHead_; + writeReqHead_ = req->getNext(); + WriteCallback* callback = req->getCallback(); + if (callback) { + callback->writeErr(req->getBytesWritten(), ex); + } + req->destroy(); + } +} + +void AsyncSocket::invalidState(ConnectCallback* callback) { + VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ + << "): connect() called in invalid state " << state_; + + /* + * The invalidState() methods don't use the normal failure mechanisms, + * since we don't know what state we are in. We don't want to call + * startFail()/finishFail() recursively if we are already in the middle of + * cleaning up. + */ + + AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN, + "connect() called with socket in invalid state"); + if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) { + if (callback) { + callback->connectErr(ex); + } + } else { + // We can't use failConnect() here since connectCallback_ + // may already be set to another callback. Invoke this ConnectCallback + // here; any other connectCallback_ will be invoked in finishFail() + startFail(); + if (callback) { + callback->connectErr(ex); + } + finishFail(); + } +} + +void AsyncSocket::invalidState(ReadCallback* callback) { + VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ + << "): setReadCallback(" << callback + << ") called in invalid state " << state_; + + AsyncSocketException ex(AsyncSocketException::NOT_OPEN, + "setReadCallback() called with socket in " + "invalid state"); + if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) { + if (callback) { + callback->readErr(ex); + } + } else { + startFail(); + if (callback) { + callback->readErr(ex); + } + finishFail(); + } +} + +void AsyncSocket::invalidState(WriteCallback* callback) { + VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ + << "): write() called in invalid state " << state_; + + AsyncSocketException ex(AsyncSocketException::NOT_OPEN, + withAddr("write() called with socket in invalid state")); + if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) { + if (callback) { + callback->writeErr(0, ex); + } + } else { + startFail(); + if (callback) { + callback->writeErr(0, ex); + } + finishFail(); + } +} + +void AsyncSocket::doClose() { + if (fd_ == -1) return; + if (shutdownSocketSet_) { + shutdownSocketSet_->close(fd_); + } else { + ::close(fd_); + } + fd_ = -1; +} + +std::ostream& operator << (std::ostream& os, + const AsyncSocket::StateEnum& state) { + os << static_cast(state); + return os; +} + +std::string AsyncSocket::withAddr(const std::string& s) { + // Don't use addr_ directly because it may not be initialized + // e.g. if constructed from fd + folly::SocketAddress peer, local; + try { + getPeerAddress(&peer); + getLocalAddress(&local); + } catch (const std::exception&) { + // ignore + } catch (...) { + // ignore + } + return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")"; +} + +} // folly diff --git a/folly/io/async/AsyncSocket.h b/folly/io/async/AsyncSocket.h new file mode 100644 index 00000000..77bd2b0c --- /dev/null +++ b/folly/io/async/AsyncSocket.h @@ -0,0 +1,766 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace folly { + +/** + * A class for performing asynchronous I/O on a socket. + * + * AsyncSocket allows users to asynchronously wait for data on a socket, and + * to asynchronously send data. + * + * The APIs for reading and writing are intentionally asymmetric. Waiting for + * data to read is a persistent API: a callback is installed, and is notified + * whenever new data is available. It continues to be notified of new events + * until it is uninstalled. + * + * AsyncSocket does not provide read timeout functionality, because it + * typically cannot determine when the timeout should be active. Generally, a + * timeout should only be enabled when processing is blocked waiting on data + * from the remote endpoint. For server sockets, the timeout should not be + * active if the server is currently processing one or more outstanding + * requests for this socket. For client sockets, the timeout should not be + * active if there are no requests pending on the socket. Additionally, if a + * client has multiple pending requests, it will ususally want a separate + * timeout for each request, rather than a single read timeout. + * + * The write API is fairly intuitive: a user can request to send a block of + * data, and a callback will be informed once the entire block has been + * transferred to the kernel, or on error. AsyncSocket does provide a send + * timeout, since most callers want to give up if the remote end stops + * responding and no further progress can be made sending the data. + */ + +class AsyncSocket : virtual public AsyncTransport { + public: + typedef std::unique_ptr UniquePtr; + + class ConnectCallback { + public: + virtual ~ConnectCallback() {} + + /** + * connectSuccess() will be invoked when the connection has been + * successfully established. + */ + virtual void connectSuccess() noexcept = 0; + + /** + * connectErr() will be invoked if the connection attempt fails. + * + * @param ex An exception describing the error that occurred. + */ + virtual void connectErr(const AsyncSocketException& ex) + noexcept = 0; + }; + + class ReadCallback { + public: + virtual ~ReadCallback() {} + + /** + * When data becomes available, getReadBuffer() will be invoked to get the + * buffer into which data should be read. + * + * This method allows the ReadCallback to delay buffer allocation until + * data becomes available. This allows applications to manage large + * numbers of idle connections, without having to maintain a separate read + * buffer for each idle connection. + * + * It is possible that in some cases, getReadBuffer() may be called + * multiple times before readDataAvailable() is invoked. In this case, the + * data will be written to the buffer returned from the most recent call to + * readDataAvailable(). If the previous calls to readDataAvailable() + * returned different buffers, the ReadCallback is responsible for ensuring + * that they are not leaked. + * + * If getReadBuffer() throws an exception, returns a nullptr buffer, or + * returns a 0 length, the ReadCallback will be uninstalled and its + * readError() method will be invoked. + * + * getReadBuffer() is not allowed to change the transport state before it + * returns. (For example, it should never uninstall the read callback, or + * set a different read callback.) + * + * @param bufReturn getReadBuffer() should update *bufReturn to contain the + * address of the read buffer. This parameter will never + * be nullptr. + * @param lenReturn getReadBuffer() should update *lenReturn to contain the + * maximum number of bytes that may be written to the read + * buffer. This parameter will never be nullptr. + */ + virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0; + + /** + * readDataAvailable() will be invoked when data has been successfully read + * into the buffer returned by the last call to getReadBuffer(). + * + * The read callback remains installed after readDataAvailable() returns. + * It must be explicitly uninstalled to stop receiving read events. + * getReadBuffer() will be called at least once before each call to + * readDataAvailable(). getReadBuffer() will also be called before any + * call to readEOF(). + * + * @param len The number of bytes placed in the buffer. + */ + virtual void readDataAvailable(size_t len) noexcept = 0; + + /** + * readEOF() will be invoked when the transport is closed. + * + * The read callback will be automatically uninstalled immediately before + * readEOF() is invoked. + */ + virtual void readEOF() noexcept = 0; + + /** + * readError() will be invoked if an error occurs reading from the + * transport. + * + * The read callback will be automatically uninstalled immediately before + * readError() is invoked. + * + * @param ex An exception describing the error that occurred. + */ + virtual void readErr(const AsyncSocketException& ex) + noexcept = 0; + }; + + class WriteCallback { + public: + virtual ~WriteCallback() {} + + /** + * writeSuccess() will be invoked when all of the data has been + * successfully written. + * + * Note that this mainly signals that the buffer containing the data to + * write is no longer needed and may be freed or re-used. It does not + * guarantee that the data has been fully transmitted to the remote + * endpoint. For example, on socket-based transports, writeSuccess() only + * indicates that the data has been given to the kernel for eventual + * transmission. + */ + virtual void writeSuccess() noexcept = 0; + + /** + * writeError() will be invoked if an error occurs writing the data. + * + * @param bytesWritten The number of bytes that were successfull + * @param ex An exception describing the error that occurred. + */ + virtual void writeErr(size_t bytesWritten, + const AsyncSocketException& ex) + noexcept = 0; + }; + + /** + * Create a new unconnected AsyncSocket. + * + * connect() must later be called on this socket to establish a connection. + */ + explicit AsyncSocket(EventBase* evb); + + void setShutdownSocketSet(ShutdownSocketSet* ss); + + /** + * Create a new AsyncSocket and begin the connection process. + * + * @param evb EventBase that will manage this socket. + * @param address The address to connect to. + * @param connectTimeout Optional timeout in milliseconds for the connection + * attempt. + */ + AsyncSocket(EventBase* evb, + const folly::SocketAddress& address, + uint32_t connectTimeout = 0); + + /** + * Create a new AsyncSocket and begin the connection process. + * + * @param evb EventBase that will manage this socket. + * @param ip IP address to connect to (dotted-quad). + * @param port Destination port in host byte order. + * @param connectTimeout Optional timeout in milliseconds for the connection + * attempt. + */ + AsyncSocket(EventBase* evb, + const std::string& ip, + uint16_t port, + uint32_t connectTimeout = 0); + + /** + * Create a AsyncSocket from an already connected socket file descriptor. + * + * Note that while AsyncSocket enables TCP_NODELAY for sockets it creates + * when connecting, it does not change the socket options when given an + * existing file descriptor. If callers want TCP_NODELAY enabled when using + * this version of the constructor, they need to explicitly call + * setNoDelay(true) after the constructor returns. + * + * @param evb EventBase that will manage this socket. + * @param fd File descriptor to take over (should be a connected socket). + */ + AsyncSocket(EventBase* evb, int fd); + + /** + * Helper function to create a shared_ptr. + * + * This passes in the correct destructor object, since AsyncSocket's + * destructor is protected and cannot be invoked directly. + */ + static std::shared_ptr newSocket(EventBase* evb) { + return std::shared_ptr(new AsyncSocket(evb), + Destructor()); + } + + /** + * Helper function to create a shared_ptr. + */ + static std::shared_ptr newSocket( + EventBase* evb, + const folly::SocketAddress& address, + uint32_t connectTimeout = 0) { + return std::shared_ptr( + new AsyncSocket(evb, address, connectTimeout), + Destructor()); + } + + /** + * Helper function to create a shared_ptr. + */ + static std::shared_ptr newSocket( + EventBase* evb, + const std::string& ip, + uint16_t port, + uint32_t connectTimeout = 0) { + return std::shared_ptr( + new AsyncSocket(evb, ip, port, connectTimeout), + Destructor()); + } + + /** + * Helper function to create a shared_ptr. + */ + static std::shared_ptr newSocket(EventBase* evb, int fd) { + return std::shared_ptr(new AsyncSocket(evb, fd), + Destructor()); + } + + /** + * Destroy the socket. + * + * AsyncSocket::destroy() must be called to destroy the socket. + * The normal destructor is private, and should not be invoked directly. + * This prevents callers from deleting a AsyncSocket while it is invoking a + * callback. + */ + virtual void destroy(); + + /** + * Get the EventBase used by this socket. + */ + EventBase* getEventBase() const override { + return eventBase_; + } + + /** + * Get the file descriptor used by the AsyncSocket. + */ + virtual int getFd() const { + return fd_; + } + + /** + * Extract the file descriptor from the AsyncSocket. + * + * This will immediately cause any installed callbacks to be invoked with an + * error. The AsyncSocket may no longer be used after the file descriptor + * has been extracted. + * + * Returns the file descriptor. The caller assumes ownership of the + * descriptor, and it will not be closed when the AsyncSocket is destroyed. + */ + virtual int detachFd(); + + /** + * Uniquely identifies a handle to a socket option value. Each + * combination of level and option name corresponds to one socket + * option value. + */ + class OptionKey { + public: + bool operator<(const OptionKey& other) const { + if (level == other.level) { + return optname < other.optname; + } + return level < other.level; + } + int apply(int fd, int val) const { + return setsockopt(fd, level, optname, &val, sizeof(val)); + } + int level; + int optname; + }; + + // Maps from a socket option key to its value + typedef std::map OptionMap; + + static const OptionMap emptyOptionMap; + static const folly::SocketAddress anyAddress; + + /** + * Initiate a connection. + * + * @param callback The callback to inform when the connection attempt + * completes. + * @param address The address to connect to. + * @param timeout A timeout value, in milliseconds. If the connection + * does not succeed within this period, + * callback->connectError() will be invoked. + */ + virtual void connect(ConnectCallback* callback, + const folly::SocketAddress& address, + int timeout = 0, + const OptionMap &options = emptyOptionMap, + const folly::SocketAddress& bindAddr = anyAddress + ) noexcept; + void connect(ConnectCallback* callback, const std::string& ip, uint16_t port, + int timeout = 00, + const OptionMap &options = emptyOptionMap) noexcept; + + /** + * Set the send timeout. + * + * If write requests do not make any progress for more than the specified + * number of milliseconds, fail all pending writes and close the socket. + * + * If write requests are currently pending when setSendTimeout() is called, + * the timeout interval is immediately restarted using the new value. + * + * (See the comments for AsyncSocket for an explanation of why AsyncSocket + * provides setSendTimeout() but not setRecvTimeout().) + * + * @param milliseconds The timeout duration, in milliseconds. If 0, no + * timeout will be used. + */ + void setSendTimeout(uint32_t milliseconds) override; + + /** + * Get the send timeout. + * + * @return Returns the current send timeout, in milliseconds. A return value + * of 0 indicates that no timeout is set. + */ + uint32_t getSendTimeout() const override { + return sendTimeout_; + } + + /** + * Set the maximum number of reads to execute from the underlying + * socket each time the EventBase detects that new ingress data is + * available. The default is unlimited, but callers can use this method + * to limit the amount of data read from the socket per event loop + * iteration. + * + * @param maxReads Maximum number of reads per data-available event; + * a value of zero means unlimited. + */ + void setMaxReadsPerEvent(uint16_t maxReads) { + maxReadsPerEvent_ = maxReads; + } + + /** + * Get the maximum number of reads this object will execute from + * the underlying socket each time the EventBase detects that new + * ingress data is available. + * + * @returns Maximum number of reads per data-available event; a value + * of zero means unlimited. + */ + uint16_t getMaxReadsPerEvent() const { + return maxReadsPerEvent_; + } + + // Read and write methods + void setReadCB(ReadCallback* callback); + ReadCallback* getReadCallback() const; + + void write(WriteCallback* callback, const void* buf, size_t bytes, + WriteFlags flags = WriteFlags::NONE); + void writev(WriteCallback* callback, const iovec* vec, size_t count, + WriteFlags flags = WriteFlags::NONE); + void writeChain(WriteCallback* callback, + std::unique_ptr&& buf, + WriteFlags flags = WriteFlags::NONE); + + // Methods inherited from AsyncTransport + void close() override; + void closeNow() override; + void closeWithReset() override; + void shutdownWrite() override; + void shutdownWriteNow() override; + + bool readable() const override; + bool isPending() const override; + virtual bool hangup() const; + bool good() const override; + bool error() const override; + void attachEventBase(EventBase* eventBase) override; + void detachEventBase() override; + bool isDetachable() const override; + + void getLocalAddress( + folly::SocketAddress* address) const override; + void getPeerAddress( + folly::SocketAddress* address) const override; + + bool isEorTrackingEnabled() const override { return false; } + + void setEorTracking(bool track) override {} + + bool connecting() const override { + return (state_ == StateEnum::CONNECTING); + } + + size_t getAppBytesWritten() const override { + return appBytesWritten_; + } + + size_t getRawBytesWritten() const override { + return getAppBytesWritten(); + } + + size_t getAppBytesReceived() const override { + return appBytesReceived_; + } + + size_t getRawBytesReceived() const override { + return getAppBytesReceived(); + } + + // Methods controlling socket options + + /** + * Force writes to be transmitted immediately. + * + * This controls the TCP_NODELAY socket option. When enabled, TCP segments + * are sent as soon as possible, even if it is not a full frame of data. + * When disabled, the data may be buffered briefly to try and wait for a full + * frame of data. + * + * By default, TCP_NODELAY is enabled for AsyncSocket objects. + * + * This method will fail if the socket is not currently open. + * + * @return Returns 0 if the TCP_NODELAY flag was successfully updated, + * or a non-zero errno value on error. + */ + int setNoDelay(bool noDelay); + + /* + * Set the Flavor of Congestion Control to be used for this Socket + * Please check '/lib/modules//kernel/net/ipv4' for tcp_*.ko + * first to make sure the module is available for plugging in + * Alternatively you can choose from net.ipv4.tcp_allowed_congestion_control + */ + int setCongestionFlavor(const std::string &cname); + + /* + * Forces ACKs to be sent immediately + * + * @return Returns 0 if the TCP_QUICKACK flag was successfully updated, + * or a non-zero errno value on error. + */ + int setQuickAck(bool quickack); + + /** + * Set the send bufsize + */ + int setSendBufSize(size_t bufsize); + + /** + * Set the recv bufsize + */ + int setRecvBufSize(size_t bufsize); + + /** + * Sets a specific tcp personality + * Available only on kernels 3.2 and greater + */ + #define SO_SET_NAMESPACE 41 + int setTCPProfile(int profd); + + + /** + * Generic API for reading a socket option. + * + * @param level same as the "level" parameter in getsockopt(). + * @param optname same as the "optname" parameter in getsockopt(). + * @param optval pointer to the variable in which the option value should + * be returned. + * @return same as the return value of getsockopt(). + */ + template + int getSockOpt(int level, int optname, T *optval) { + return getsockopt(fd_, level, optname, optval, sizeof(T)); + } + + /** + * Generic API for setting a socket option. + * + * @param level same as the "level" parameter in getsockopt(). + * @param optname same as the "optname" parameter in getsockopt(). + * @param optval the option value to set. + * @return same as the return value of setsockopt(). + */ + template + int setSockOpt(int level, int optname, const T *optval) { + return setsockopt(fd_, level, optname, optval, sizeof(T)); + } + + protected: + enum ReadResultEnum { + READ_EOF = 0, + READ_ERROR = -1, + READ_BLOCKING = -2, + }; + + /** + * Protected destructor. + * + * Users of AsyncSocket must never delete it directly. Instead, invoke + * destroy() instead. (See the documentation in DelayedDestruction.h for + * more details.) + */ + ~AsyncSocket(); + + enum class StateEnum : uint8_t { + UNINIT, + CONNECTING, + ESTABLISHED, + CLOSED, + ERROR + }; + + friend std::ostream& operator << (std::ostream& os, const StateEnum& state); + + enum ShutdownFlags { + /// shutdownWrite() called, but we are still waiting on writes to drain + SHUT_WRITE_PENDING = 0x01, + /// writes have been completely shut down + SHUT_WRITE = 0x02, + /** + * Reads have been shutdown. + * + * At the moment we don't distinguish between remote read shutdown + * (received EOF from the remote end) and local read shutdown. We can + * only receive EOF when a read callback is set, and we immediately inform + * it of the EOF. Therefore there doesn't seem to be any reason to have a + * separate state of "received EOF but the local side may still want to + * read". + * + * We also don't currently provide any API for only shutting down the read + * side of a socket. (This is a no-op as far as TCP is concerned, anyway.) + */ + SHUT_READ = 0x04, + }; + + class WriteRequest; + + class WriteTimeout : public AsyncTimeout { + public: + WriteTimeout(AsyncSocket* socket, EventBase* eventBase) + : AsyncTimeout(eventBase) + , socket_(socket) {} + + virtual void timeoutExpired() noexcept { + socket_->timeoutExpired(); + } + + private: + AsyncSocket* socket_; + }; + + class IoHandler : public EventHandler { + public: + IoHandler(AsyncSocket* socket, EventBase* eventBase) + : EventHandler(eventBase, -1) + , socket_(socket) {} + IoHandler(AsyncSocket* socket, EventBase* eventBase, int fd) + : EventHandler(eventBase, fd) + , socket_(socket) {} + + virtual void handlerReady(uint16_t events) noexcept { + socket_->ioReady(events); + } + + private: + AsyncSocket* socket_; + }; + + void init(); + + // event notification methods + void ioReady(uint16_t events) noexcept; + virtual void checkForImmediateRead() noexcept; + virtual void handleInitialReadWrite() noexcept; + virtual void handleRead() noexcept; + virtual void handleWrite() noexcept; + virtual void handleConnect() noexcept; + void timeoutExpired() noexcept; + + /** + * Attempt to read from the socket. + * + * @param buf The buffer to read data into. + * @param buflen The length of the buffer. + * + * @return Returns the number of bytes read, or READ_EOF on EOF, or + * READ_ERROR on error, or READ_BLOCKING if the operation will + * block. + */ + virtual ssize_t performRead(void* buf, size_t buflen); + + /** + * Populate an iovec array from an IOBuf and attempt to write it. + * + * @param callback Write completion/error callback. + * @param vec Target iovec array; caller retains ownership. + * @param count Number of IOBufs to write, beginning at start of buf. + * @param buf Chain of iovecs. + * @param flags set of flags for the underlying write calls, like cork + */ + void writeChainImpl(WriteCallback* callback, iovec* vec, + size_t count, std::unique_ptr&& buf, + WriteFlags flags); + + /** + * Write as much data as possible to the socket without blocking, + * and queue up any leftover data to send when the socket can + * handle writes again. + * + * @param callback The callback to invoke when the write is completed. + * @param vec Array of buffers to write; this method will make a + * copy of the vector (but not the buffers themselves) + * if the write has to be completed asynchronously. + * @param count Number of elements in vec. + * @param buf The IOBuf that manages the buffers referenced by + * vec, or a pointer to nullptr if the buffers are not + * associated with an IOBuf. Note that ownership of + * the IOBuf is transferred here; upon completion of + * the write, the AsyncSocket deletes the IOBuf. + * @param flags Set of write flags. + */ + void writeImpl(WriteCallback* callback, const iovec* vec, size_t count, + std::unique_ptr&& buf, + WriteFlags flags = WriteFlags::NONE); + + /** + * Attempt to write to the socket. + * + * @param vec The iovec array pointing to the buffers to write. + * @param count The length of the iovec array. + * @param flags Set of write flags. + * @param countWritten On return, the value pointed to by this parameter + * will contain the number of iovec entries that were + * fully written. + * @param partialWritten On return, the value pointed to by this parameter + * will contain the number of bytes written in the + * partially written iovec entry. + * + * @return Returns the total number of bytes written, or -1 on error. If no + * data can be written immediately, 0 is returned. + */ + virtual ssize_t performWrite(const iovec* vec, uint32_t count, + WriteFlags flags, uint32_t* countWritten, + uint32_t* partialWritten); + + bool updateEventRegistration(); + + /** + * Update event registration. + * + * @param enable Flags of events to enable. Set it to 0 if no events + * need to be enabled in this call. + * @param disable Flags of events + * to disable. Set it to 0 if no events need to be disabled in this + * call. + * + * @return true iff the update is successful. + */ + bool updateEventRegistration(uint16_t enable, uint16_t disable); + + // Actually close the file descriptor and set it to -1 so we don't + // accidentally close it again. + void doClose(); + + // error handling methods + void startFail(); + void finishFail(); + void fail(const char* fn, const AsyncSocketException& ex); + void failConnect(const char* fn, const AsyncSocketException& ex); + void failRead(const char* fn, const AsyncSocketException& ex); + void failWrite(const char* fn, WriteCallback* callback, size_t bytesWritten, + const AsyncSocketException& ex); + void failWrite(const char* fn, const AsyncSocketException& ex); + void failAllWrites(const AsyncSocketException& ex); + void invalidState(ConnectCallback* callback); + void invalidState(ReadCallback* callback); + void invalidState(WriteCallback* callback); + + std::string withAddr(const std::string& s); + + StateEnum state_; ///< StateEnum describing current state + uint8_t shutdownFlags_; ///< Shutdown state (ShutdownFlags) + uint16_t eventFlags_; ///< EventBase::HandlerFlags settings + int fd_; ///< The socket file descriptor + mutable + folly::SocketAddress addr_; ///< The address we tried to connect to + uint32_t sendTimeout_; ///< The send timeout, in milliseconds + uint16_t maxReadsPerEvent_; ///< Max reads per event loop iteration + EventBase* eventBase_; ///< The EventBase + WriteTimeout writeTimeout_; ///< A timeout for connect and write + IoHandler ioHandler_; ///< A EventHandler to monitor the fd + + ConnectCallback* connectCallback_; ///< ConnectCallback + ReadCallback* readCallback_; ///< ReadCallback + WriteRequest* writeReqHead_; ///< Chain of WriteRequests + WriteRequest* writeReqTail_; ///< End of WriteRequest chain + ShutdownSocketSet* shutdownSocketSet_; + size_t appBytesReceived_; ///< Num of bytes received from socket + size_t appBytesWritten_; ///< Num of bytes written to socket +}; + + +} // folly diff --git a/folly/io/async/AsyncSocketException.h b/folly/io/async/AsyncSocketException.h new file mode 100644 index 00000000..762e9bc0 --- /dev/null +++ b/folly/io/async/AsyncSocketException.h @@ -0,0 +1,79 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace folly { + +class AsyncSocketException : public std::runtime_error { + public: + enum AsyncSocketExceptionType + { UNKNOWN = 0 + , NOT_OPEN = 1 + , ALREADY_OPEN = 2 + , TIMED_OUT = 3 + , END_OF_FILE = 4 + , INTERRUPTED = 5 + , BAD_ARGS = 6 + , CORRUPTED_DATA = 7 + , INTERNAL_ERROR = 8 + , NOT_SUPPORTED = 9 + , INVALID_STATE = 10 + , SSL_ERROR = 12 + , COULD_NOT_BIND = 13 + , SASL_HANDSHAKE_TIMEOUT = 14 + }; + + AsyncSocketException( + AsyncSocketExceptionType type, const std::string& message) : + std::runtime_error(message), + type_(type), errno_(0) {} + + /** Error code */ + AsyncSocketExceptionType type_; + + /** A copy of the errno. */ + int errno_; + + AsyncSocketException(AsyncSocketExceptionType type, + const std::string& message, + int errno_copy) : + std::runtime_error(getMessage(message, errno_copy)), + type_(type), errno_(errno_copy) {} + + AsyncSocketExceptionType getType() const noexcept { return type_; } + int getErrno() const noexcept { return errno_; } + + protected: + /** Just like strerror_r but returns a C++ string object. */ + std::string strerror_s(int errno_copy) { + return "errno = " + folly::to(errno_copy); + } + + /** Return a message based on the input. */ + std::string getMessage(const std::string &message, + int errno_copy) { + if (errno_copy != 0) { + return message + ": " + strerror_s(errno_copy); + } else { + return message; + } + } +}; + +} // folly diff --git a/folly/io/async/AsyncTransport.h b/folly/io/async/AsyncTransport.h new file mode 100644 index 00000000..0b929c16 --- /dev/null +++ b/folly/io/async/AsyncTransport.h @@ -0,0 +1,317 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace folly { + +/* + * flags given by the application for write* calls + */ +enum class WriteFlags : uint32_t { + NONE = 0x00, + /* + * Whether to delay the output until a subsequent non-corked write. + * (Note: may not be supported in all subclasses or on all platforms.) + */ + CORK = 0x01, + /* + * for a socket that has ACK latency enabled, it will cause the kernel + * to fire a TCP ESTATS event when the last byte of the given write call + * will be acknowledged. + */ + EOR = 0x02, +}; + +/* + * union operator + */ +inline WriteFlags operator|(WriteFlags a, WriteFlags b) { + return static_cast( + static_cast(a) | static_cast(b)); +} + +/* + * intersection operator + */ +inline WriteFlags operator&(WriteFlags a, WriteFlags b) { + return static_cast( + static_cast(a) & static_cast(b)); +} + +/* + * exclusion parameter + */ +inline WriteFlags operator~(WriteFlags a) { + return static_cast(~static_cast(a)); +} + +/* + * unset operator + */ +inline WriteFlags unSet(WriteFlags a, WriteFlags b) { + return a & ~b; +} + +/* + * inclusion operator + */ +inline bool isSet(WriteFlags a, WriteFlags b) { + return (a & b) == b; +} + + +/** + * AsyncTransport defines an asynchronous API for streaming I/O. + * + * This class provides an API to for asynchronously waiting for data + * on a streaming transport, and for asynchronously sending data. + * + * The APIs for reading and writing are intentionally asymmetric. Waiting for + * data to read is a persistent API: a callback is installed, and is notified + * whenever new data is available. It continues to be notified of new events + * until it is uninstalled. + * + * AsyncTransport does not provide read timeout functionality, because it + * typically cannot determine when the timeout should be active. Generally, a + * timeout should only be enabled when processing is blocked waiting on data + * from the remote endpoint. For server-side applications, the timeout should + * not be active if the server is currently processing one or more outstanding + * requests on this transport. For client-side applications, the timeout + * should not be active if there are no requests pending on the transport. + * Additionally, if a client has multiple pending requests, it will ususally + * want a separate timeout for each request, rather than a single read timeout. + * + * The write API is fairly intuitive: a user can request to send a block of + * data, and a callback will be informed once the entire block has been + * transferred to the kernel, or on error. AsyncTransport does provide a send + * timeout, since most callers want to give up if the remote end stops + * responding and no further progress can be made sending the data. + */ +class AsyncTransport : public DelayedDestruction { + public: + typedef std::unique_ptr UniquePtr; + + /** + * Close the transport. + * + * This gracefully closes the transport, waiting for all pending write + * requests to complete before actually closing the underlying transport. + * + * If a read callback is set, readEOF() will be called immediately. If there + * are outstanding write requests, the close will be delayed until all + * remaining writes have completed. No new writes may be started after + * close() has been called. + */ + virtual void close() = 0; + + /** + * Close the transport immediately. + * + * This closes the transport immediately, dropping any outstanding data + * waiting to be written. + * + * If a read callback is set, readEOF() will be called immediately. + * If there are outstanding write requests, these requests will be aborted + * and writeError() will be invoked immediately on all outstanding write + * callbacks. + */ + virtual void closeNow() = 0; + + /** + * Reset the transport immediately. + * + * This closes the transport immediately, sending a reset to the remote peer + * if possible to indicate abnormal shutdown. + * + * Note that not all subclasses implement this reset functionality: some + * subclasses may treat reset() the same as closeNow(). Subclasses that use + * TCP transports should terminate the connection with a TCP reset. + */ + virtual void closeWithReset() { + closeNow(); + } + + /** + * Perform a half-shutdown of the write side of the transport. + * + * The caller should not make any more calls to write() or writev() after + * shutdownWrite() is called. Any future write attempts will fail + * immediately. + * + * Not all transport types support half-shutdown. If the underlying + * transport does not support half-shutdown, it will fully shutdown both the + * read and write sides of the transport. (Fully shutting down the socket is + * better than doing nothing at all, since the caller may rely on the + * shutdownWrite() call to notify the other end of the connection that no + * more data can be read.) + * + * If there is pending data still waiting to be written on the transport, + * the actual shutdown will be delayed until the pending data has been + * written. + * + * Note: There is no corresponding shutdownRead() equivalent. Simply + * uninstall the read callback if you wish to stop reading. (On TCP sockets + * at least, shutting down the read side of the socket is a no-op anyway.) + */ + virtual void shutdownWrite() = 0; + + /** + * Perform a half-shutdown of the write side of the transport. + * + * shutdownWriteNow() is identical to shutdownWrite(), except that it + * immediately performs the shutdown, rather than waiting for pending writes + * to complete. Any pending write requests will be immediately failed when + * shutdownWriteNow() is called. + */ + virtual void shutdownWriteNow() = 0; + + /** + * Determine if transport is open and ready to read or write. + * + * Note that this function returns false on EOF; you must also call error() + * to distinguish between an EOF and an error. + * + * @return true iff the transport is open and ready, false otherwise. + */ + virtual bool good() const = 0; + + /** + * Determine if the transport is readable or not. + * + * @return true iff the transport is readable, false otherwise. + */ + virtual bool readable() const = 0; + + /** + * Determine if the there is pending data on the transport. + * + * @return true iff the if the there is pending data, false otherwise. + */ + virtual bool isPending() const { + return readable(); + } + /** + * Determine if transport is connected to the endpoint + * + * @return false iff the transport is connected, otherwise true + */ + virtual bool connecting() const = 0; + + /** + * Determine if an error has occurred with this transport. + * + * @return true iff an error has occurred (not EOF). + */ + virtual bool error() const = 0; + + /** + * Attach the transport to a EventBase. + * + * This may only be called if the transport is not currently attached to a + * EventBase (by an earlier call to detachEventBase()). + * + * This method must be invoked in the EventBase's thread. + */ + virtual void attachEventBase(EventBase* eventBase) = 0; + + /** + * Detach the transport from its EventBase. + * + * This may only be called when the transport is idle and has no reads or + * writes pending. Once detached, the transport may not be used again until + * it is re-attached to a EventBase by calling attachEventBase(). + * + * This method must be called from the current EventBase's thread. + */ + virtual void detachEventBase() = 0; + + /** + * Determine if the transport can be detached. + * + * This method must be called from the current EventBase's thread. + */ + virtual bool isDetachable() const = 0; + + /** + * Get the EventBase used by this transport. + * + * Returns nullptr if this transport is not currently attached to a + * EventBase. + */ + virtual EventBase* getEventBase() const = 0; + + /** + * Set the send timeout. + * + * If write requests do not make any progress for more than the specified + * number of milliseconds, fail all pending writes and close the transport. + * + * If write requests are currently pending when setSendTimeout() is called, + * the timeout interval is immediately restarted using the new value. + * + * @param milliseconds The timeout duration, in milliseconds. If 0, no + * timeout will be used. + */ + virtual void setSendTimeout(uint32_t milliseconds) = 0; + + /** + * Get the send timeout. + * + * @return Returns the current send timeout, in milliseconds. A return value + * of 0 indicates that no timeout is set. + */ + virtual uint32_t getSendTimeout() const = 0; + + /** + * Get the address of the local endpoint of this transport. + * + * This function may throw AsyncSocketException on error. + * + * @param address The local address will be stored in the specified + * SocketAddress. + */ + virtual void getLocalAddress(folly::SocketAddress* address) const = 0; + + /** + * Get the address of the remote endpoint to which this transport is + * connected. + * + * This function may throw AsyncSocketException on error. + * + * @param address The remote endpoint's address will be stored in the + * specified SocketAddress. + */ + virtual void getPeerAddress(folly::SocketAddress* address) const = 0; + + /** + * @return True iff end of record tracking is enabled + */ + virtual bool isEorTrackingEnabled() const = 0; + + virtual void setEorTracking(bool track) = 0; + + virtual size_t getAppBytesWritten() const = 0; + virtual size_t getRawBytesWritten() const = 0; + virtual size_t getAppBytesReceived() const = 0; + virtual size_t getRawBytesReceived() const = 0; + + protected: + virtual ~AsyncTransport() {} +}; + + +} // folly -- 2.34.1