move shutdown socket set
authorDave Watson <davejwatson@fb.com>
Fri, 3 Oct 2014 19:05:37 +0000 (12:05 -0700)
committerdcsommer <dcsommer@fb.com>
Fri, 17 Oct 2014 18:44:29 +0000 (11:44 -0700)
Summary:
Move shutdownsocketset to folly, since it is a dep of the asyncsockets

Previoulsy tried moving it in to the server directly: D1583629, but had issues - close(fd) is called before the error callback, so we can't remove the fd before the close, which is essential to it working properly.

Just move it to folly instead.

Test Plan: fbconfig -r thrift/lib/cpp thrift/lib/cpp2; fbmake runtests

Reviewed By: dcsommer@fb.com

Subscribers: mshneer, trunkagent, fugalh, jsedgwick, doug, alandau, bmatheny, njormrod

FB internal diff: D1594950

folly/Makefile.am
folly/io/ShutdownSocketSet.cpp [new file with mode: 0644]
folly/io/ShutdownSocketSet.h [new file with mode: 0644]

index 24ed0e2b00dd1b1e3caec877dcf39e0d03e023aa..a2f4b72229d1a26ea1e0548a8ff0ff280d546b24 100644 (file)
@@ -113,6 +113,7 @@ nobase_follyinclude_HEADERS = \
        io/RecordIO.h \
        io/RecordIO-inl.h \
        io/TypedIOBuf.h \
+       io/ShutdownSocketSet.h \
        io/async/AsyncTimeout.h \
        io/async/DelayedDestruction.h \
        io/async/EventBase.h \
@@ -238,6 +239,7 @@ libfolly_la_SOURCES = \
        io/IOBuf.cpp \
        io/IOBufQueue.cpp \
        io/RecordIO.cpp \
+       io/ShutdownSocketSet.cpp \
        io/async/AsyncTimeout.cpp \
        io/async/EventBase.cpp \
        io/async/EventBaseManager.cpp \
diff --git a/folly/io/ShutdownSocketSet.cpp b/folly/io/ShutdownSocketSet.cpp
new file mode 100644 (file)
index 0000000..52125c2
--- /dev/null
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/io/ShutdownSocketSet.h>
+
+#include <sys/socket.h>
+#include <sys/types.h>
+
+#include <chrono>
+#include <thread>
+
+#include <glog/logging.h>
+
+#include <folly/FileUtil.h>
+#include <folly/Malloc.h>
+
+namespace folly {
+
+ShutdownSocketSet::ShutdownSocketSet(size_t maxFd)
+  : maxFd_(maxFd),
+    data_(static_cast<std::atomic<uint8_t>*>(
+            folly::checkedCalloc(maxFd, sizeof(std::atomic<uint8_t>)))),
+    nullFile_("/dev/null", O_RDWR) {
+}
+
+void ShutdownSocketSet::add(int fd) {
+  // Silently ignore any fds >= maxFd_, very unlikely
+  DCHECK_GE(fd, 0);
+  if (fd >= maxFd_) {
+    return;
+  }
+
+  auto& sref = data_[fd];
+  uint8_t prevState = FREE;
+  CHECK(sref.compare_exchange_strong(prevState,
+                                     IN_USE,
+                                     std::memory_order_acq_rel))
+    << "Invalid prev state for fd " << fd << ": " << int(prevState);
+}
+
+void ShutdownSocketSet::remove(int fd) {
+  DCHECK_GE(fd, 0);
+  if (fd >= maxFd_) {
+    return;
+  }
+
+  auto& sref = data_[fd];
+  uint8_t prevState = 0;
+
+retry_load:
+  prevState = sref.load(std::memory_order_relaxed);
+
+retry:
+  switch (prevState) {
+  case IN_SHUTDOWN:
+    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    goto retry_load;
+  case FREE:
+    LOG(FATAL) << "Invalid prev state for fd " << fd << ": " << int(prevState);
+  }
+
+  if (!sref.compare_exchange_weak(prevState,
+                                  FREE,
+                                  std::memory_order_acq_rel)) {
+    goto retry;
+  }
+}
+
+int ShutdownSocketSet::close(int fd) {
+  DCHECK_GE(fd, 0);
+  if (fd >= maxFd_) {
+    return folly::closeNoInt(fd);
+  }
+
+  auto& sref = data_[fd];
+  uint8_t prevState = sref.load(std::memory_order_relaxed);
+  uint8_t newState = 0;
+
+retry:
+  switch (prevState) {
+  case IN_USE:
+  case SHUT_DOWN:
+    newState = FREE;
+    break;
+  case IN_SHUTDOWN:
+    newState = MUST_CLOSE;
+    break;
+  default:
+    LOG(FATAL) << "Invalid prev state for fd " << fd << ": " << int(prevState);
+  }
+
+  if (!sref.compare_exchange_weak(prevState,
+                                  newState,
+                                  std::memory_order_acq_rel)) {
+    goto retry;
+  }
+
+  return newState == FREE ? folly::closeNoInt(fd) : 0;
+}
+
+void ShutdownSocketSet::shutdown(int fd, bool abortive) {
+  DCHECK_GE(fd, 0);
+  if (fd >= maxFd_) {
+    doShutdown(fd, abortive);
+    return;
+  }
+
+  auto& sref = data_[fd];
+  uint8_t prevState = IN_USE;
+  if (!sref.compare_exchange_strong(prevState,
+                                    IN_SHUTDOWN,
+                                    std::memory_order_acq_rel)) {
+    return;
+  }
+
+  doShutdown(fd, abortive);
+
+  prevState = IN_SHUTDOWN;
+  if (sref.compare_exchange_strong(prevState,
+                                   SHUT_DOWN,
+                                   std::memory_order_acq_rel)) {
+    return;
+  }
+
+  CHECK_EQ(prevState, MUST_CLOSE)
+    << "Invalid prev state for fd " << fd << ": " << int(prevState);
+
+  folly::closeNoInt(fd);  // ignore errors, nothing to do
+
+  CHECK(sref.compare_exchange_strong(prevState,
+                                     FREE,
+                                     std::memory_order_acq_rel))
+    << "Invalid prev state for fd " << fd << ": " << int(prevState);
+}
+
+void ShutdownSocketSet::shutdownAll(bool abortive) {
+  for (size_t i = 0; i < maxFd_; ++i) {
+    auto& sref = data_[i];
+    if (sref.load(std::memory_order_acquire) == IN_USE) {
+      shutdown(i, abortive);
+    }
+  }
+}
+
+void ShutdownSocketSet::doShutdown(int fd, bool abortive) {
+  // shutdown() the socket first, to awaken any threads blocked on the fd
+  // (subsequent IO will fail because it's been shutdown); close()ing the
+  // socket does not wake up blockers, see
+  // http://stackoverflow.com/a/3624545/1736339
+  folly::shutdownNoInt(fd, SHUT_RDWR);
+
+  // If abortive shutdown is desired, we'll set the SO_LINGER option on
+  // the socket with a timeout of 0; this will cause RST to be sent on
+  // close.
+  if (abortive) {
+    struct linger l = {1, 0};
+    if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) != 0) {
+      // Probably not a socket, ignore.
+      return;
+    }
+  }
+
+  // We can't close() the socket, as that would be dangerous; a new file
+  // could be opened and get the same file descriptor, and then code assuming
+  // the old fd would do IO in the wrong place. We'll (atomically) dup2
+  // /dev/null onto the fd instead.
+  folly::dup2NoInt(nullFile_.fd(), fd);
+}
+
+}  // namespaces
diff --git a/folly/io/ShutdownSocketSet.h b/folly/io/ShutdownSocketSet.h
new file mode 100644 (file)
index 0000000..a395e13
--- /dev/null
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <cstdlib>
+#include <memory>
+
+#include <boost/noncopyable.hpp>
+
+#include <folly/File.h>
+
+namespace folly {
+
+/**
+ * Set of sockets that allows immediate, take-no-prisoners abort.
+ */
+class ShutdownSocketSet : private boost::noncopyable {
+ public:
+  /**
+   * Create a socket set that can handle file descriptors < maxFd.
+   * The default value (256Ki) is high enough for just about all
+   * applications, even if you increased the number of file descriptors
+   * on your system.
+   */
+  explicit ShutdownSocketSet(size_t maxFd = 1 << 18);
+
+  /**
+   * Add an already open socket to the list of sockets managed by
+   * ShutdownSocketSet. You MUST close the socket by calling
+   * ShutdownSocketSet::close (which will, as a side effect, also handle EINTR
+   * properly) and not by calling close() on the file descriptor.
+   */
+  void add(int fd);
+
+  /**
+   * Remove a socket from the list of sockets managed by ShutdownSocketSet.
+   * Note that remove() might block! (which we lamely implement by
+   * sleep()-ing) in the extremely rare case that the fd is currently
+   * being shutdown().
+   */
+  void remove(int fd);
+
+  /**
+   * Close a socket managed by ShutdownSocketSet. Returns the same return code
+   * as ::close() (and sets errno accordingly).
+   */
+  int close(int fd);
+
+  /**
+   * Shut down a socket. If abortive is true, we perform an abortive
+   * shutdown (send RST rather than FIN). Note that we might still end up
+   * sending FIN due to the rather interesting implementation.
+   *
+   * This is async-signal-safe and ignores errors.  Obviously, subsequent
+   * read() and write() operations to the socket will fail. During normal
+   * operation, just call ::shutdown() on the socket.
+   */
+  void shutdown(int fd, bool abortive=false);
+
+  /**
+   * Shut down all sockets managed by ShutdownSocketSet. This is
+   * async-signal-safe and ignores errors.
+   */
+  void shutdownAll(bool abortive=false);
+
+ private:
+  void doShutdown(int fd, bool abortive);
+
+  // State transitions:
+  // add():
+  //   FREE -> IN_USE
+  //
+  // close():
+  //   IN_USE -> (::close()) -> FREE
+  //   SHUT_DOWN -> (::close()) -> FREE
+  //   IN_SHUTDOWN -> MUST_CLOSE
+  //   (If the socket is currently being shut down, we must defer the
+  //    ::close() until the shutdown completes)
+  //
+  // shutdown():
+  //   IN_USE -> IN_SHUTDOWN
+  //   (::shutdown())
+  //   IN_SHUTDOWN -> SHUT_DOWN
+  //   MUST_CLOSE -> (::close()) -> FREE
+  enum State : uint8_t {
+    FREE = 0,
+    IN_USE,
+    IN_SHUTDOWN,
+    SHUT_DOWN,
+    MUST_CLOSE
+  };
+
+  struct Free {
+    template <class T>
+    void operator()(T* ptr) const {
+      ::free(ptr);
+    }
+  };
+
+  const size_t maxFd_;
+  std::unique_ptr<std::atomic<uint8_t>[], Free> data_;
+  folly::File nullFile_;
+};
+
+}  // namespaces