/*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2014-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <folly/io/async/AsyncServerSocket.h>
#include <folly/FileUtil.h>
+#include <folly/Portability.h>
#include <folly/SocketAddress.h>
+#include <folly/String.h>
+#include <folly/detail/SocketFastOpen.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/NotificationQueue.h>
+#include <folly/portability/Fcntl.h>
+#include <folly/portability/Sockets.h>
+#include <folly/portability/Unistd.h>
#include <errno.h>
-#include <fcntl.h>
-#include <netinet/tcp.h>
#include <string.h>
-#include <sys/socket.h>
#include <sys/types.h>
-#include <unistd.h>
+
+namespace fsp = folly::portability::sockets;
namespace folly {
+static constexpr bool msgErrQueueSupported =
+#ifdef FOLLY_HAVE_MSG_ERRQUEUE
+ true;
+#else
+ false;
+#endif // FOLLY_HAVE_MSG_ERRQUEUE
+
const uint32_t AsyncServerSocket::kDefaultMaxAcceptAtOnce;
const uint32_t AsyncServerSocket::kDefaultCallbackAcceptAtOnce;
const uint32_t AsyncServerSocket::kDefaultMaxMessagesInQueue;
int old_flags = fcntl(fd, F_GETFD, 0);
// If reading the flags failed, return error indication now
- if (old_flags < 0)
+ if (old_flags < 0) {
return -1;
+ }
// Set just the flag we want to set
int new_flags;
- if (value != 0)
+ if (value != 0) {
new_flags = old_flags | FD_CLOEXEC;
- else
+ } else {
new_flags = old_flags & ~FD_CLOEXEC;
+ }
// Store modified flag word in the descriptor
return fcntl(fd, F_SETFD, new_flags);
}
void AsyncServerSocket::RemoteAcceptor::messageAvailable(
- QueueMessage&& msg) {
-
+ QueueMessage&& msg) noexcept {
switch (msg.type) {
case MessageType::MSG_NEW_CONN:
{
public:
// Disallow copy, move, and default constructors.
BackoffTimeout(BackoffTimeout&&) = delete;
- BackoffTimeout(AsyncServerSocket* socket)
+ explicit BackoffTimeout(AsyncServerSocket* socket)
: AsyncTimeout(socket->getEventBase()), socket_(socket) {}
void timeoutExpired() noexcept override { socket_->backoffTimeoutExpired(); }
*/
AsyncServerSocket::AsyncServerSocket(EventBase* eventBase)
-: eventBase_(eventBase),
- accepting_(false),
- maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce),
- maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue),
- acceptRateAdjustSpeed_(0),
- acceptRate_(1),
- lastAccepTimestamp_(std::chrono::steady_clock::now()),
- numDroppedConnections_(0),
- callbackIndex_(0),
- backoffTimeout_(nullptr),
- callbacks_(),
- keepAliveEnabled_(true),
- closeOnExec_(true),
- shutdownSocketSet_(nullptr) {
-}
-
-void AsyncServerSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
- if (shutdownSocketSet_ == newSS) {
+ : eventBase_(eventBase),
+ accepting_(false),
+ maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce),
+ maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue),
+ acceptRateAdjustSpeed_(0),
+ acceptRate_(1),
+ lastAccepTimestamp_(std::chrono::steady_clock::now()),
+ numDroppedConnections_(0),
+ callbackIndex_(0),
+ backoffTimeout_(nullptr),
+ callbacks_(),
+ keepAliveEnabled_(true),
+ closeOnExec_(true) {}
+
+void AsyncServerSocket::setShutdownSocketSet(
+ const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
+ const auto newSS = wNewSS.lock();
+ const auto shutdownSocketSet = wShutdownSocketSet_.lock();
+
+ if (shutdownSocketSet == newSS) {
return;
}
- if (shutdownSocketSet_) {
+
+ if (shutdownSocketSet) {
for (auto& h : sockets_) {
- shutdownSocketSet_->remove(h.socket_);
+ shutdownSocketSet->remove(h.socket_);
}
}
- shutdownSocketSet_ = newSS;
- if (shutdownSocketSet_) {
+
+ if (newSS) {
for (auto& h : sockets_) {
- shutdownSocketSet_->add(h.socket_);
+ newSS->add(h.socket_);
}
}
+
+ wShutdownSocketSet_ = wNewSS;
}
AsyncServerSocket::~AsyncServerSocket() {
VLOG(10) << "AsyncServerSocket::stopAccepting " << this <<
handler.socket_;
}
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
// When destroy is called, unregister and close the socket immediately.
accepting_ = false;
for (; !sockets_.empty(); sockets_.pop_back()) {
auto& handler = sockets_.back();
handler.unregisterHandler();
- if (shutdownSocketSet_) {
- shutdownSocketSet_->close(handler.socket_);
+ if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+ shutdownSocketSet->close(handler.socket_);
} else if (shutdownFlags >= 0) {
result = shutdownNoInt(handler.socket_, shutdownFlags);
pendingCloseSockets_.push_back(handler.socket_);
for (std::vector<CallbackInfo>::iterator it = callbacksCopy.begin();
it != callbacksCopy.end();
++it) {
- it->consumer->stop(it->eventBase, it->callback);
+ // consumer may not be set if we are running in primary event base
+ if (it->consumer) {
+ DCHECK(it->eventBase);
+ it->consumer->stop(it->eventBase, it->callback);
+ } else {
+ DCHECK(it->callback);
+ it->callback->acceptStopped();
+ }
}
return result;
void AsyncServerSocket::attachEventBase(EventBase *eventBase) {
assert(eventBase_ == nullptr);
- assert(eventBase->isInEventBaseThread());
+ eventBase->dcheckIsInEventBaseThread();
eventBase_ = eventBase;
for (auto& handler : sockets_) {
void AsyncServerSocket::detachEventBase() {
assert(eventBase_ != nullptr);
- assert(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
assert(!accepting_);
eventBase_ = nullptr;
}
void AsyncServerSocket::useExistingSockets(const std::vector<int>& fds) {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
if (sockets_.size() > 0) {
throw std::invalid_argument(
SocketAddress address;
address.setFromLocalAddress(fd);
- setupSocket(fd);
+#if __linux__
+ if (noTransparentTls_) {
+ // Ignore return value, errors are ok
+ setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
+ }
+#endif
+
+ setupSocket(fd, address.getFamily());
sockets_.emplace_back(eventBase_, fd, this, address.getFamily());
sockets_.back().changeHandlerFD(fd);
}
sockaddr_storage addrStorage;
address.getAddress(&addrStorage);
sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
- if (::bind(fd, saddr, address.getActualSize()) != 0) {
+
+ if (fsp::bind(fd, saddr, address.getActualSize()) != 0) {
if (!isExistingSocket) {
closeNoInt(fd);
}
address.describe());
}
+#if __linux__
+ if (noTransparentTls_) {
+ // Ignore return value, errors are ok
+ setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
+ }
+#endif
+
// If we just created this socket, update the EventHandler and set socket_
if (!isExistingSocket) {
sockets_.emplace_back(eventBase_, fd, this, address.getFamily());
}
}
+bool AsyncServerSocket::setZeroCopy(bool enable) {
+ if (msgErrQueueSupported) {
+ int fd = getSocket();
+ int val = enable ? 1 : 0;
+ int ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
+
+ return (0 == ret);
+ }
+
+ return false;
+}
+
void AsyncServerSocket::bind(const SocketAddress& address) {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
// useExistingSocket() may have been called to initialize socket_ already.
// However, in the normal case we need to create a new socket now.
}
void AsyncServerSocket::bind(uint16_t port) {
- struct addrinfo hints, *res, *res0;
+ struct addrinfo hints, *res0;
char sport[sizeof("65536")];
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
- hints.ai_flags = AI_PASSIVE;
+ hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;
snprintf(sport, sizeof(sport), "%u", port);
- if (getaddrinfo(nullptr, sport, &hints, &res0)) {
+ // On Windows the value we need to pass to bind to all available
+ // addresses is an empty string. Everywhere else, it's nullptr.
+ constexpr const char* kWildcardNode = kIsWindows ? "" : nullptr;
+ if (getaddrinfo(kWildcardNode, sport, &hints, &res0)) {
throw std::invalid_argument(
"Attempted to bind address to socket with "
"bad getaddrinfo");
SCOPE_EXIT { freeaddrinfo(res0); };
auto setupAddress = [&] (struct addrinfo* res) {
- int s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+ int s = fsp::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
// IPv6/IPv4 may not be supported by the kernel
if (s < 0 && errno == EAFNOSUPPORT) {
return;
CHECK_GE(s, 0);
try {
- setupSocket(s);
+ setupSocket(s, res->ai_family);
} catch (...) {
closeNoInt(s);
throw;
&v6only, sizeof(v6only)));
}
+ // Bind to the socket
+ if (fsp::bind(s, res->ai_addr, socklen_t(res->ai_addrlen)) != 0) {
+ folly::throwSystemError(
+ errno,
+ "failed to bind to async server socket for port ",
+ SocketAddress::getPortFrom(res->ai_addr),
+ " family ",
+ SocketAddress::getFamilyNameFrom(res->ai_addr, "<unknown>"));
+ }
+
+#if __linux__
+ if (noTransparentTls_) {
+ // Ignore return value, errors are ok
+ setsockopt(s, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
+ }
+#endif
+
SocketAddress address;
address.setFromLocalAddress(s);
sockets_.emplace_back(eventBase_, s, this, address.getFamily());
-
- // Bind to the socket
- if (::bind(s, res->ai_addr, res->ai_addrlen) != 0) {
- folly::throwSystemError(
- errno,
- "failed to bind to async server socket for port");
- }
};
const int kNumTries = 25;
// - 0.0.0.0 (IPv4-only)
// - :: (IPv6+IPv4) in this order
// See: https://sourceware.org/bugzilla/show_bug.cgi?id=9981
- for (res = res0; res; res = res->ai_next) {
+ for (struct addrinfo* res = res0; res; res = res->ai_next) {
if (res->ai_family == AF_INET6) {
setupAddress(res);
}
}
try {
- for (res = res0; res; res = res->ai_next) {
+ for (struct addrinfo* res = res0; res; res = res->ai_next) {
if (res->ai_family != AF_INET6) {
setupAddress(res);
}
}
- } catch (const std::system_error& e) {
+ } catch (const std::system_error&) {
// If we can't bind to the same port on ipv4 as ipv6 when using
// port=0 then we will retry again before giving up after
// kNumTries attempts. We do this by closing the sockets that
for (const auto& socket : sockets_) {
if (socket.socket_ <= 0) {
continue;
- } else if (shutdownSocketSet_) {
- shutdownSocketSet_->close(socket.socket_);
+ } else if (
+ const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+ shutdownSocketSet->close(socket.socket_);
} else {
closeNoInt(socket.socket_);
}
}
void AsyncServerSocket::listen(int backlog) {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
// Start listening
for (auto& handler : sockets_) {
- if (::listen(handler.socket_, backlog) == -1) {
+ if (fsp::listen(handler.socket_, backlog) == -1) {
folly::throwSystemError(errno,
"failed to listen on async server socket");
}
void AsyncServerSocket::addAcceptCallback(AcceptCallback *callback,
EventBase *eventBase,
uint32_t maxAtOnce) {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
// If this is the first accept callback and we are supposed to be accepting,
// start accepting once the callback is installed.
bool runStartAccepting = accepting_ && callbacks_.empty();
+ callbacks_.emplace_back(callback, eventBase);
+
+ SCOPE_SUCCESS {
+ // If this is the first accept callback and we are supposed to be accepting,
+ // start accepting.
+ if (runStartAccepting) {
+ startAccepting();
+ }
+ };
+
if (!eventBase) {
- eventBase = eventBase_; // Run in AsyncServerSocket's eventbase
+ // Run in AsyncServerSocket's eventbase; notify that we are
+ // starting to accept connections
+ callback->acceptStarted();
+ return;
}
- callbacks_.emplace_back(callback, eventBase);
-
// Start the remote acceptor.
//
// It would be nice if we could avoid starting the remote acceptor if
throw;
}
callbacks_.back().consumer = acceptor;
-
- // If this is the first accept callback and we are supposed to be accepting,
- // start accepting.
- if (runStartAccepting) {
- startAccepting();
- }
}
void AsyncServerSocket::removeAcceptCallback(AcceptCallback *callback,
EventBase *eventBase) {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
// Find the matching AcceptCallback.
// We just do a simple linear search; we don't expect removeAcceptCallback()
}
}
- info.consumer->stop(info.eventBase, info.callback);
+ if (info.consumer) {
+ // consumer could be nullptr is we run callbacks in primary event
+ // base
+ DCHECK(info.eventBase);
+ info.consumer->stop(info.eventBase, info.callback);
+ } else {
+ // callback invoked in the primary event base, just call directly
+ DCHECK(info.callback);
+ callback->acceptStopped();
+ }
// If we are supposed to be accepting but the last accept callback
// was removed, unregister for events until a callback is added.
}
void AsyncServerSocket::startAccepting() {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
accepting_ = true;
if (callbacks_.empty()) {
}
void AsyncServerSocket::pauseAccepting() {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ if (eventBase_) {
+ eventBase_->dcheckIsInEventBaseThread();
+ }
accepting_ = false;
for (auto& handler : sockets_) {
handler. unregisterHandler();
}
int AsyncServerSocket::createSocket(int family) {
- int fd = socket(family, SOCK_STREAM, 0);
+ int fd = fsp::socket(family, SOCK_STREAM, 0);
if (fd == -1) {
folly::throwSystemError(errno, "error creating async server socket");
}
try {
- setupSocket(fd);
+ setupSocket(fd, family);
} catch (...) {
closeNoInt(fd);
throw;
return fd;
}
-void AsyncServerSocket::setupSocket(int fd) {
- // Get the address family
- SocketAddress address;
- address.setFromLocalAddress(fd);
-
+void AsyncServerSocket::setupSocket(int fd, int family) {
// Put the socket in non-blocking mode
if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) {
folly::throwSystemError(errno,
setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(int)) != 0) {
LOG(ERROR) << "failed to set SO_REUSEPORT on async server socket "
<< strerror(errno);
+#ifdef WIN32
+ folly::throwSystemError(errno, "failed to bind to the async server socket");
+#else
+ SocketAddress address;
+ address.setFromLocalAddress(fd);
folly::throwSystemError(errno,
"failed to bind to async server socket: " +
address.describe());
+#endif
}
// Set keepalive as desired
// Set TCP nodelay if available, MAC OS X Hack
// See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
#ifndef TCP_NOPUSH
- auto family = address.getFamily();
if (family != AF_UNIX) {
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) != 0) {
// This isn't a fatal error; just log an error message and continue
}
#endif
- if (shutdownSocketSet_) {
- shutdownSocketSet_->add(fd);
+#if FOLLY_ALLOW_TFO
+ if (tfo_ && detail::tfo_enable(fd, tfoMaxQueueSize_) != 0) {
+ // This isn't a fatal error; just log an error message and continue
+ LOG(WARNING) << "failed to set TCP_FASTOPEN on async server socket: "
+ << folly::errnoStr(errno);
+ }
+#endif
+
+ if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+ shutdownSocketSet->add(fd);
}
}
-void AsyncServerSocket::handlerReady(
- uint16_t events, int fd, sa_family_t addressFamily) noexcept {
+void AsyncServerSocket::handlerReady(uint16_t /* events */,
+ int fd,
+ sa_family_t addressFamily) noexcept {
assert(!callbacks_.empty());
DestructorGuard dg(this);
// should use pauseAccepting() to temporarily back off accepting new
// connections, before they reach the point where their threads can't
// even accept new messages.
- LOG(ERROR) << "failed to dispatch newly accepted socket:"
- << " all accept callback queues are full";
+ LOG_EVERY_N(ERROR, 100) << "failed to dispatch newly accepted socket:"
+ << " all accept callback queues are full";
closeNoInt(socket);
if (connectionEventCallback_) {
connectionEventCallback_->onConnectionDropped(socket, addr);
if (callbackIndex_ == startingIndex) {
// The notification queues for all of the callbacks were full.
// We can't really do anything at this point.
- LOG(ERROR) << "failed to dispatch accept error: all accept callback "
- "queues are full: error msg: " <<
- msg.msg.c_str() << errnoValue;
+ LOG_EVERY_N(ERROR, 100)
+ << "failed to dispatch accept error: all accept"
+ << " callback queues are full: error msg: " << msg.msg << ": "
+ << errnoValue;
return;
}
info = nextCallback();
if (backoffTimeout_ == nullptr) {
try {
backoffTimeout_ = new BackoffTimeout(this);
- } catch (const std::bad_alloc& ex) {
+ } catch (const std::bad_alloc&) {
// Man, we couldn't even allocate the timer to re-enable accepts.
// We must be in pretty bad shape. Don't pause accepting for now,
// since we won't be able to re-enable ourselves later.
// the backoff timeout.
assert(accepting_);
// We can't be detached from the EventBase without being paused
- assert(eventBase_ != nullptr && eventBase_->isInEventBaseThread());
+ assert(eventBase_ != nullptr);
+ eventBase_->dcheckIsInEventBaseThread();
// If all of the callbacks were removed, we shouldn't re-enable accepts
if (callbacks_.empty()) {
}
}
-
-
-} // folly
+} // namespace folly