Copyright 2014->2015
[folly.git] / folly / io / async / AsyncServerSocket.cpp
index 7dce10c1c6d6155eaf33cddb96ef35a6b0a386c0..da84c68b5002c814d4e1720bf3b1296530e449f5 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2015 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 #include <folly/io/async/AsyncServerSocket.h>
 
+#include <folly/FileUtil.h>
+#include <folly/SocketAddress.h>
 #include <folly/io/async/EventBase.h>
 #include <folly/io/async/NotificationQueue.h>
-#include <folly/SocketAddress.h>
 
 #include <errno.h>
-#include <string.h>
-#include <unistd.h>
 #include <fcntl.h>
-#include <sys/types.h>
-#include <sys/socket.h>
 #include <netinet/tcp.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
 
 namespace folly {
 
@@ -185,10 +186,10 @@ int AsyncServerSocket::stopAccepting(int shutdownFlags) {
     if (shutdownSocketSet_) {
       shutdownSocketSet_->close(handler.socket_);
     } else if (shutdownFlags >= 0) {
-      result = ::shutdown(handler.socket_, shutdownFlags);
+      result = shutdownNoInt(handler.socket_, shutdownFlags);
       pendingCloseSockets_.push_back(handler.socket_);
     } else {
-      ::close(handler.socket_);
+      closeNoInt(handler.socket_);
     }
   }
   sockets_.clear();
@@ -216,8 +217,8 @@ int AsyncServerSocket::stopAccepting(int shutdownFlags) {
 
 void AsyncServerSocket::destroy() {
   stopAccepting();
-  for (auto s: pendingCloseSockets_) {
-    ::close(s);
+  for (auto s : pendingCloseSockets_) {
+    closeNoInt(s);
   }
   // Then call DelayedDestruction::destroy() to take care of
   // whether or not we need immediate or delayed destruction
@@ -273,6 +274,29 @@ void AsyncServerSocket::useExistingSocket(int fd) {
   useExistingSockets({fd});
 }
 
+void AsyncServerSocket::bindSocket(
+    int fd,
+    const SocketAddress& address,
+    bool isExistingSocket) {
+  sockaddr_storage addrStorage;
+  address.getAddress(&addrStorage);
+  sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
+  if (::bind(fd, saddr, address.getActualSize()) != 0) {
+    if (!isExistingSocket) {
+      closeNoInt(fd);
+    }
+    folly::throwSystemError(errno,
+        "failed to bind to async server socket: " +
+        address.describe());
+  }
+
+  // If we just created this socket, update the EventHandler and set socket_
+  if (!isExistingSocket) {
+    sockets_.push_back(
+      ServerEventHandler(eventBase_, fd, this, address.getFamily()));
+  }
+}
+
 void AsyncServerSocket::bind(const SocketAddress& address) {
   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
 
@@ -295,27 +319,29 @@ void AsyncServerSocket::bind(const SocketAddress& address) {
                               "Attempted to bind to multiple fds");
   }
 
-  // Bind to the socket
-  sockaddr_storage addrStorage;
-  address.getAddress(&addrStorage);
-  sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
-  if (::bind(fd, saddr, address.getActualSize()) != 0) {
-    if (sockets_.size() == 0) {
-      ::close(fd);
-    }
-    folly::throwSystemError(errno,
-                              "failed to bind to async server socket: " +
-                                address.describe());
+  bindSocket(fd, address, !sockets_.empty());
+}
+
+void AsyncServerSocket::bind(
+    const std::vector<IPAddress>& ipAddresses,
+    uint16_t port) {
+  if (ipAddresses.empty()) {
+    throw std::invalid_argument("No ip addresses were provided");
+  }
+  if (!sockets_.empty()) {
+    throw std::invalid_argument("Cannot call bind on a AsyncServerSocket "
+                                "that already has a socket.");
   }
 
-  // Record the address family that we are using,
-  // so we know how much address space we need to record accepted addresses.
+  for (const IPAddress& ipAddress : ipAddresses) {
+    SocketAddress address(ipAddress.toFullyQualified(), port);
+    int fd = createSocket(address.getFamily());
 
-  // If we just created this socket, update the EventHandler and set socket_
+    bindSocket(fd, address, false);
+  }
   if (sockets_.size() == 0) {
-    sockets_.push_back(
-      ServerEventHandler(eventBase_, fd, this, address.getFamily()));
-    sockets_[0].changeHandlerFD(fd);
+    throw std::runtime_error(
+        "did not bind any async server socket for port and addresses");
   }
 }
 
@@ -335,23 +361,20 @@ void AsyncServerSocket::bind(uint16_t port) {
                               "bad getaddrinfo");
   }
 
-  folly::ScopeGuard guard = folly::makeGuard([&]{
-      freeaddrinfo(res0);
-    });
-  DCHECK(&guard);
+  SCOPE_EXIT { freeaddrinfo(res0); };
 
-  for (res = res0; res; res = res->ai_next) {
+  auto setupAddress = [&] (struct addrinfo* res) {
     int s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
     // IPv6/IPv4 may not be supported by the kernel
     if (s < 0 && errno == EAFNOSUPPORT) {
-      continue;
+      return;
     }
-    CHECK(s);
+    CHECK_GE(s, 0);
 
     try {
       setupSocket(s);
     } catch (...) {
-      ::close(s);
+      closeNoInt(s);
       throw;
     }
 
@@ -373,7 +396,65 @@ void AsyncServerSocket::bind(uint16_t port) {
         errno,
         "failed to bind to async server socket for port");
     }
+  };
+
+  const int kNumTries = 5;
+  for (int tries = 1; true; tries++) {
+    // Prefer AF_INET6 addresses. RFC 3484 mandates that getaddrinfo
+    // should return IPv6 first and then IPv4 addresses, but glibc's
+    // getaddrinfo(nullptr) with AI_PASSIVE returns:
+    // - 0.0.0.0 (IPv4-only)
+    // - :: (IPv6+IPv4) in this order
+    // See: https://sourceware.org/bugzilla/show_bug.cgi?id=9981
+    for (res = res0; res; res = res->ai_next) {
+      if (res->ai_family == AF_INET6) {
+        setupAddress(res);
+      }
+    }
+
+    // If port == 0, then we should try to bind to the same port on ipv4 and
+    // ipv6.  So if we did bind to ipv6, figure out that port and use it,
+    // except for the last attempt when we just use any port available.
+    if (sockets_.size() == 1 && port == 0) {
+      SocketAddress address;
+      address.setFromLocalAddress(sockets_.back().socket_);
+      snprintf(sport, sizeof(sport), "%u", address.getPort());
+      freeaddrinfo(res0);
+      CHECK_EQ(0, getaddrinfo(nullptr, sport, &hints, &res0));
+    }
+
+    try {
+      for (res = res0; res; res = res->ai_next) {
+        if (res->ai_family != AF_INET6) {
+          setupAddress(res);
+        }
+      }
+    } catch (const std::system_error& e) {
+      // if we can't bind to the same port on ipv4 as ipv6 when using port=0
+      // then we will try again another 2 times before giving up.  We do this
+      // by closing the sockets that were opened, then redoing the whole thing
+      if (port == 0 && !sockets_.empty() && tries != kNumTries) {
+        for (const auto& socket : sockets_) {
+          if (socket.socket_ <= 0) {
+            continue;
+          } else if (shutdownSocketSet_) {
+            shutdownSocketSet_->close(socket.socket_);
+          } else {
+            closeNoInt(socket.socket_);
+          }
+        }
+        sockets_.clear();
+        snprintf(sport, sizeof(sport), "%u", port);
+        freeaddrinfo(res0);
+        CHECK_EQ(0, getaddrinfo(nullptr, sport, &hints, &res0));
+        continue;
+      }
+      throw;
+    }
+
+    break;
   }
+
   if (sockets_.size() == 0) {
     throw std::runtime_error(
         "did not bind any async server socket for port");
@@ -394,10 +475,10 @@ void AsyncServerSocket::listen(int backlog) {
 
 void AsyncServerSocket::getAddress(SocketAddress* addressReturn) const {
   CHECK(sockets_.size() >= 1);
-  if (sockets_.size() > 1) {
-    VLOG(2) << "Warning: getAddress can return multiple addresses, " <<
-      "but getAddress was called, so only returning the first";
-  }
+  VLOG_IF(2, sockets_.size() > 1)
+    << "Warning: getAddress() called and multiple addresses available ("
+    << sockets_.size() << "). Returning only the first one.";
+
   addressReturn->setFromLocalAddress(sockets_[0].socket_);
 }
 
@@ -549,7 +630,7 @@ int AsyncServerSocket::createSocket(int family) {
   try {
     setupSocket(fd);
   } catch (...) {
-    ::close(fd);
+    closeNoInt(fd);
     throw;
   }
   return fd;
@@ -574,8 +655,18 @@ void AsyncServerSocket::setupSocket(int fd) {
     LOG(ERROR) << "failed to set SO_REUSEADDR on async server socket " << errno;
   }
 
-  // Set keepalive as desired
+  // Set reuseport to support multiple accept threads
   int zero = 0;
+  if (reusePortEnabled_ &&
+      setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(int)) != 0) {
+    LOG(ERROR) << "failed to set SO_REUSEPORT on async server socket "
+               << strerror(errno);
+    folly::throwSystemError(errno,
+                            "failed to bind to async server socket: " +
+                            address.describe());
+  }
+
+  // Set keepalive as desired
   if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE,
                  (keepAliveEnabled_) ? &one : &zero, sizeof(int)) != 0) {
     LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: " <<
@@ -649,7 +740,7 @@ void AsyncServerSocket::handlerReady(
       } else if (rand() > acceptRate_ * RAND_MAX) {
         ++numDroppedConnections_;
         if (clientSocket >= 0) {
-          ::close(clientSocket);
+          closeNoInt(clientSocket);
         }
         continue;
       }
@@ -679,7 +770,7 @@ void AsyncServerSocket::handlerReady(
 #ifndef SOCK_NONBLOCK
     // Explicitly set the new connection to non-blocking mode
     if (fcntl(clientSocket, F_SETFL, O_NONBLOCK) != 0) {
-      ::close(clientSocket);
+      closeNoInt(clientSocket);
       dispatchError("failed to set accepted socket to non-blocking mode",
                     errno);
       return;
@@ -743,7 +834,7 @@ void AsyncServerSocket::dispatchSocket(int socket,
       // even accept new messages.
       LOG(ERROR) << "failed to dispatch newly accepted socket:"
                  << " all accept callback queues are full";
-      ::close(socket);
+      closeNoInt(socket);
       return;
     }
 
@@ -764,7 +855,8 @@ void AsyncServerSocket::dispatchError(const char *msgstr, int errnoValue) {
   while (true) {
     // Short circuit if the callback is in the primary EventBase thread
     if (info->eventBase == nullptr) {
-      std::runtime_error ex(msgstr + errnoValue);
+      std::runtime_error ex(
+        std::string(msgstr) +  folly::to<std::string>(errnoValue));
       info->callback->acceptError(ex);
       return;
     }