/*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2014-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
namespace folly {
+static constexpr bool msgErrQueueSupported =
+#ifdef FOLLY_HAVE_MSG_ERRQUEUE
+ true;
+#else
+ false;
+#endif // FOLLY_HAVE_MSG_ERRQUEUE
+
const uint32_t AsyncServerSocket::kDefaultMaxAcceptAtOnce;
const uint32_t AsyncServerSocket::kDefaultCallbackAcceptAtOnce;
const uint32_t AsyncServerSocket::kDefaultMaxMessagesInQueue;
int old_flags = fcntl(fd, F_GETFD, 0);
// If reading the flags failed, return error indication now
- if (old_flags < 0)
+ if (old_flags < 0) {
return -1;
+ }
// Set just the flag we want to set
int new_flags;
- if (value != 0)
+ if (value != 0) {
new_flags = old_flags | FD_CLOEXEC;
- else
+ } else {
new_flags = old_flags & ~FD_CLOEXEC;
+ }
// Store modified flag word in the descriptor
return fcntl(fd, F_SETFD, new_flags);
*/
AsyncServerSocket::AsyncServerSocket(EventBase* eventBase)
-: eventBase_(eventBase),
- accepting_(false),
- maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce),
- maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue),
- acceptRateAdjustSpeed_(0),
- acceptRate_(1),
- lastAccepTimestamp_(std::chrono::steady_clock::now()),
- numDroppedConnections_(0),
- callbackIndex_(0),
- backoffTimeout_(nullptr),
- callbacks_(),
- keepAliveEnabled_(true),
- closeOnExec_(true),
- shutdownSocketSet_(nullptr) {
-}
-
-void AsyncServerSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
- if (shutdownSocketSet_ == newSS) {
+ : eventBase_(eventBase),
+ accepting_(false),
+ maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce),
+ maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue),
+ acceptRateAdjustSpeed_(0),
+ acceptRate_(1),
+ lastAccepTimestamp_(std::chrono::steady_clock::now()),
+ numDroppedConnections_(0),
+ callbackIndex_(0),
+ backoffTimeout_(nullptr),
+ callbacks_(),
+ keepAliveEnabled_(true),
+ closeOnExec_(true) {}
+
+void AsyncServerSocket::setShutdownSocketSet(
+ const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
+ const auto newSS = wNewSS.lock();
+ const auto shutdownSocketSet = wShutdownSocketSet_.lock();
+
+ if (shutdownSocketSet == newSS) {
return;
}
- if (shutdownSocketSet_) {
+
+ if (shutdownSocketSet) {
for (auto& h : sockets_) {
- shutdownSocketSet_->remove(h.socket_);
+ shutdownSocketSet->remove(h.socket_);
}
}
- shutdownSocketSet_ = newSS;
- if (shutdownSocketSet_) {
+
+ if (newSS) {
for (auto& h : sockets_) {
- shutdownSocketSet_->add(h.socket_);
+ newSS->add(h.socket_);
}
}
+
+ wShutdownSocketSet_ = wNewSS;
}
AsyncServerSocket::~AsyncServerSocket() {
for (; !sockets_.empty(); sockets_.pop_back()) {
auto& handler = sockets_.back();
handler.unregisterHandler();
- if (shutdownSocketSet_) {
- shutdownSocketSet_->close(handler.socket_);
+ if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+ shutdownSocketSet->close(handler.socket_);
} else if (shutdownFlags >= 0) {
result = shutdownNoInt(handler.socket_, shutdownFlags);
pendingCloseSockets_.push_back(handler.socket_);
}
}
+bool AsyncServerSocket::setZeroCopy(bool enable) {
+ if (msgErrQueueSupported) {
+ int fd = getSocket();
+ int val = enable ? 1 : 0;
+ int ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
+
+ return (0 == ret);
+ }
+
+ return false;
+}
+
void AsyncServerSocket::bind(const SocketAddress& address) {
if (eventBase_) {
eventBase_->dcheckIsInEventBaseThread();
for (const auto& socket : sockets_) {
if (socket.socket_ <= 0) {
continue;
- } else if (shutdownSocketSet_) {
- shutdownSocketSet_->close(socket.socket_);
+ } else if (
+ const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+ shutdownSocketSet->close(socket.socket_);
} else {
closeNoInt(socket.socket_);
}
}
#endif
- if (shutdownSocketSet_) {
- shutdownSocketSet_->add(fd);
+ if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+ shutdownSocketSet->add(fd);
}
}
// should use pauseAccepting() to temporarily back off accepting new
// connections, before they reach the point where their threads can't
// even accept new messages.
- LOG(ERROR) << "failed to dispatch newly accepted socket:"
- << " all accept callback queues are full";
+ LOG_EVERY_N(ERROR, 100) << "failed to dispatch newly accepted socket:"
+ << " all accept callback queues are full";
closeNoInt(socket);
if (connectionEventCallback_) {
connectionEventCallback_->onConnectionDropped(socket, addr);
if (callbackIndex_ == startingIndex) {
// The notification queues for all of the callbacks were full.
// We can't really do anything at this point.
- LOG(ERROR) << "failed to dispatch accept error: all accept callback "
- "queues are full: error msg: " <<
- msg.msg.c_str() << errnoValue;
+ LOG_EVERY_N(ERROR, 100)
+ << "failed to dispatch accept error: all accept"
+ << " callback queues are full: error msg: " << msg.msg << ": "
+ << errnoValue;
return;
}
info = nextCallback();