Replace ShutdownSocketSet to singleton
authorVitaly Berov <vitalyb@fb.com>
Fri, 13 Oct 2017 16:30:02 +0000 (09:30 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Fri, 13 Oct 2017 16:34:43 +0000 (09:34 -0700)
Summary:
We recently found out that ShutdownSocketSet consumes 150+MB for our service, which uses duplex channels. The problem is that we create ~1000 of ThriftServers, and each of the creates its own ShutdownSocketSet.
In reality, ShutdownSocketSet is only needed to kill all socket's FD in emergency before crash dump is taken, so they don't hand around waiting for crash dump to complete. There is no need to keep a SSS per ThriftServer, singleton should work just fine.
There is a problem here, though. Currently a ThriftServer has 'immediateShutdown' method, which kills all sockets from SSS. So, if SSS becomes a singleton, and we have more than one ThriftServer, calling 'immediateShutdown' on one will kill sockets from the other one. First, it's a quite surprising behavior, and second, it complicates unit tests, which emulate thrift servers running in different processes.

As a result,
1. ShutdownSocketSet is created as a singleton, but each ThriftServer still keeps weak ptr to it (mostly for unit tests support).
2. replaceShutdownSocketSet method is added to ThriftServer.h, so unit tests could set different SSS for different ThriftServers.
3. method immediateShutdown is removed from ThriftServer, because its behavior would be 'surprising'.

There still may be unexpected consequences of this change for the tests because of Singleton, but let's see.

Reviewed By: yfeldblum

Differential Revision: D6015576

fbshipit-source-id: dab70dbf82d01bcc71bbe063f983e862911ceb24

folly/io/ShutdownSocketSet.cpp
folly/io/ShutdownSocketSet.h
folly/io/async/AsyncServerSocket.cpp
folly/io/async/AsyncServerSocket.h
folly/io/async/AsyncSocket.cpp
folly/io/async/AsyncSocket.h

index 09988e7a2593471f15fd019c164f5667d938055d..a64ae49ae5e9ed8595dbe115d78cbea45cbed4c2 100644 (file)
 #include <glog/logging.h>
 
 #include <folly/FileUtil.h>
+#include <folly/Singleton.h>
 #include <folly/portability/Sockets.h>
 
 namespace folly {
 
+namespace {
+struct PrivateTag {};
+folly::Singleton<folly::ShutdownSocketSet, PrivateTag> singleton;
+} // namespace
+
 ShutdownSocketSet::ShutdownSocketSet(int maxFd)
     : maxFd_(maxFd),
       data_(static_cast<std::atomic<uint8_t>*>(
           folly::checkedCalloc(size_t(maxFd), sizeof(std::atomic<uint8_t>)))),
       nullFile_("/dev/null", O_RDWR) {}
 
+std::shared_ptr<ShutdownSocketSet> ShutdownSocketSet::getInstance() {
+  return singleton.try_get();
+}
+
 void ShutdownSocketSet::add(int fd) {
   // Silently ignore any fds >= maxFd_, very unlikely
   DCHECK_GE(fd, 0);
index 7d0853c216e5e53204d22360b74f32e4a8330e50..9ba145db080b22db9080ec633392f14500e39b60 100644 (file)
@@ -39,6 +39,10 @@ class ShutdownSocketSet : private boost::noncopyable {
    */
   explicit ShutdownSocketSet(int maxFd = 1 << 18);
 
+  // Singleton instance used by all thrift servers.
+  // May return nullptr on startup/shutdown.
+  static std::shared_ptr<ShutdownSocketSet> getInstance();
+
   /**
    * Add an already open socket to the list of sockets managed by
    * ShutdownSocketSet. You MUST close the socket by calling
@@ -73,8 +77,24 @@ class ShutdownSocketSet : private boost::noncopyable {
   void shutdown(int fd, bool abortive=false);
 
   /**
-   * Shut down all sockets managed by ShutdownSocketSet. This is
-   * async-signal-safe and ignores errors.
+   * Immediate shutdown of all connections. This is a hard-hitting hammer;
+   * all reads and writes will return errors and no new connections will
+   * be accepted.
+   *
+   * To be used only in dire situations. We're using it from the failure
+   * signal handler to close all connections quickly, even though the server
+   * might take multiple seconds to finish crashing.
+   *
+   * The optional bool parameter indicates whether to set the active
+   * connections in to not linger.  The effect of that includes RST packets
+   * being immediately sent to clients which will result
+   * in errors (and not normal EOF) on the client side.  This also causes
+   * the local (ip, tcp port number) tuple to be reusable immediately, instead
+   * of having to wait the standard amount of time.  For full details see
+   * the `shutdown` method of `ShutdownSocketSet` (incl. notes about the
+   * `abortive` parameter).
+   *
+   * This is async-signal-safe and ignores errors.
    */
   void shutdownAll(bool abortive=false);
 
index c9f391f08640612a7a01972d4dcb4a066c12ea2d..72f29d0caef2daaae17bfade745a5fdee6f2413d 100644 (file)
@@ -146,37 +146,42 @@ class AsyncServerSocket::BackoffTimeout : public AsyncTimeout {
  */
 
 AsyncServerSocket::AsyncServerSocket(EventBase* eventBase)
-:   eventBase_(eventBase),
-    accepting_(false),
-    maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce),
-    maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue),
-    acceptRateAdjustSpeed_(0),
-    acceptRate_(1),
-    lastAccepTimestamp_(std::chrono::steady_clock::now()),
-    numDroppedConnections_(0),
-    callbackIndex_(0),
-    backoffTimeout_(nullptr),
-    callbacks_(),
-    keepAliveEnabled_(true),
-    closeOnExec_(true),
-    shutdownSocketSet_(nullptr) {
-}
-
-void AsyncServerSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
-  if (shutdownSocketSet_ == newSS) {
+    : eventBase_(eventBase),
+      accepting_(false),
+      maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce),
+      maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue),
+      acceptRateAdjustSpeed_(0),
+      acceptRate_(1),
+      lastAccepTimestamp_(std::chrono::steady_clock::now()),
+      numDroppedConnections_(0),
+      callbackIndex_(0),
+      backoffTimeout_(nullptr),
+      callbacks_(),
+      keepAliveEnabled_(true),
+      closeOnExec_(true) {}
+
+void AsyncServerSocket::setShutdownSocketSet(
+    const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
+  const auto newSS = wNewSS.lock();
+  const auto shutdownSocketSet = wShutdownSocketSet_.lock();
+
+  if (shutdownSocketSet == newSS) {
     return;
   }
-  if (shutdownSocketSet_) {
+
+  if (shutdownSocketSet) {
     for (auto& h : sockets_) {
-      shutdownSocketSet_->remove(h.socket_);
+      shutdownSocketSet->remove(h.socket_);
     }
   }
-  shutdownSocketSet_ = newSS;
-  if (shutdownSocketSet_) {
+
+  if (newSS) {
     for (auto& h : sockets_) {
-      shutdownSocketSet_->add(h.socket_);
+      newSS->add(h.socket_);
     }
   }
+
+  wShutdownSocketSet_ = wNewSS;
 }
 
 AsyncServerSocket::~AsyncServerSocket() {
@@ -203,8 +208,8 @@ int AsyncServerSocket::stopAccepting(int shutdownFlags) {
   for (; !sockets_.empty(); sockets_.pop_back()) {
     auto& handler = sockets_.back();
     handler.unregisterHandler();
-    if (shutdownSocketSet_) {
-      shutdownSocketSet_->close(handler.socket_);
+    if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+      shutdownSocketSet->close(handler.socket_);
     } else if (shutdownFlags >= 0) {
       result = shutdownNoInt(handler.socket_, shutdownFlags);
       pendingCloseSockets_.push_back(handler.socket_);
@@ -504,8 +509,9 @@ void AsyncServerSocket::bind(uint16_t port) {
         for (const auto& socket : sockets_) {
           if (socket.socket_ <= 0) {
             continue;
-          } else if (shutdownSocketSet_) {
-            shutdownSocketSet_->close(socket.socket_);
+          } else if (
+              const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+            shutdownSocketSet->close(socket.socket_);
           } else {
             closeNoInt(socket.socket_);
           }
@@ -793,8 +799,8 @@ void AsyncServerSocket::setupSocket(int fd, int family) {
   }
 #endif
 
-  if (shutdownSocketSet_) {
-    shutdownSocketSet_->add(fd);
+  if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+    shutdownSocketSet->add(fd);
   }
 }
 
index 6589c667e91b6a047d225cb17ea2784b857f86d7..0a07d546ec274433282a9f4d62f6214c4aac4048 100644 (file)
@@ -227,7 +227,7 @@ class AsyncServerSocket : public DelayedDestruction
                                                  Destructor());
   }
 
-  void setShutdownSocketSet(ShutdownSocketSet* newSS);
+  void setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet>& wNewSS);
 
   /**
    * Destroy the socket.
@@ -877,7 +877,7 @@ class AsyncServerSocket : public DelayedDestruction
   bool tfo_{false};
   bool noTransparentTls_{false};
   uint32_t tfoMaxQueueSize_{0};
-  ShutdownSocketSet* shutdownSocketSet_;
+  std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_;
   ConnectionEventCallback* connectionEventCallback_{nullptr};
 };
 
index 6d32ca61063d04dc456a2b296c1a410ddc3e2b5c..7347dc6890279d65b0cd9373ce9b4fe18579d2a9 100644 (file)
@@ -307,7 +307,7 @@ void AsyncSocket::init() {
   readCallback_ = nullptr;
   writeReqHead_ = nullptr;
   writeReqTail_ = nullptr;
-  shutdownSocketSet_ = nullptr;
+  wShutdownSocketSet_.reset();
   appBytesWritten_ = 0;
   appBytesReceived_ = 0;
   sendMsgParamCallback_ = &defaultSendMsgParamsCallback;
@@ -336,8 +336,8 @@ int AsyncSocket::detachFd() {
           << ", events=" << std::hex << eventFlags_ << ")";
   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
   // actually close the descriptor.
-  if (shutdownSocketSet_) {
-    shutdownSocketSet_->remove(fd_);
+  if (const auto socketSet = wShutdownSocketSet_.lock()) {
+    socketSet->remove(fd_);
   }
   int fd = fd_;
   fd_ = -1;
@@ -355,17 +355,24 @@ const folly::SocketAddress& AsyncSocket::anyAddress() {
   return anyAddress;
 }
 
-void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
-  if (shutdownSocketSet_ == newSS) {
+void AsyncSocket::setShutdownSocketSet(
+    const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
+  const auto newSS = wNewSS.lock();
+  const auto shutdownSocketSet = wShutdownSocketSet_.lock();
+
+  if (newSS == shutdownSocketSet) {
     return;
   }
-  if (shutdownSocketSet_ && fd_ != -1) {
-    shutdownSocketSet_->remove(fd_);
+
+  if (shutdownSocketSet && fd_ != -1) {
+    shutdownSocketSet->remove(fd_);
   }
-  shutdownSocketSet_ = newSS;
-  if (shutdownSocketSet_ && fd_ != -1) {
-    shutdownSocketSet_->add(fd_);
+
+  if (newSS && fd_ != -1) {
+    newSS->add(fd_);
   }
+
+  wShutdownSocketSet_ = wNewSS;
 }
 
 void AsyncSocket::setCloseOnExec() {
@@ -420,8 +427,8 @@ void AsyncSocket::connect(ConnectCallback* callback,
           withAddr("failed to create socket"),
           errnoCopy);
     }
-    if (shutdownSocketSet_) {
-      shutdownSocketSet_->add(fd_);
+    if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+      shutdownSocketSet->add(fd_);
     }
     ioHandler_.changeHandlerFD(fd_);
 
@@ -2685,8 +2692,8 @@ void AsyncSocket::invalidState(WriteCallback* callback) {
 
 void AsyncSocket::doClose() {
   if (fd_ == -1) return;
-  if (shutdownSocketSet_) {
-    shutdownSocketSet_->close(fd_);
+  if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+    shutdownSocketSet->close(fd_);
   } else {
     ::close(fd_);
   }
index e99300fb238a6ae491f453bdbc886b405037adda..a4c637bbb7e8220431311782e2749ac2f35ce365 100644 (file)
@@ -222,7 +222,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
    */
   explicit AsyncSocket(EventBase* evb);
 
-  void setShutdownSocketSet(ShutdownSocketSet* ss);
+  void setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet>& wSS);
 
   /**
    * Create a new AsyncSocket and begin the connection process.
@@ -1195,7 +1195,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
   ReadCallback* readCallback_;           ///< ReadCallback
   WriteRequest* writeReqHead_;           ///< Chain of WriteRequests
   WriteRequest* writeReqTail_;           ///< End of WriteRequest chain
-  ShutdownSocketSet* shutdownSocketSet_;
+  std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_;
   size_t appBytesReceived_;              ///< Num of bytes received from socket
   size_t appBytesWritten_;               ///< Num of bytes written to socket
   bool isBufferMovable_{false};