Fix copyright lines
[folly.git] / folly / io / async / AsyncUDPSocket.cpp
index 979c2b75702eac9c939240a6726400b24d7e6a54..5ed0204de845cf5b6017d0729e241f9a6222a3a8 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2014-present Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 #include <folly/io/async/AsyncUDPSocket.h>
 
+#include <folly/Likely.h>
 #include <folly/io/async/EventBase.h>
+#include <folly/portability/Fcntl.h>
+#include <folly/portability/Sockets.h>
+#include <folly/portability/Unistd.h>
 
 #include <errno.h>
-#include <unistd.h>
-#include <fcntl.h>
+
+// Due to the way kernel headers are included, this may or may not be defined.
+// Number pulled from 3.10 kernel headers.
+#ifndef SO_REUSEPORT
+#define SO_REUSEPORT 15
+#endif
+
+namespace fsp = folly::portability::sockets;
 
 namespace folly {
 
 AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
     : EventHandler(CHECK_NOTNULL(evb)),
+      readCallback_(nullptr),
       eventBase_(evb),
-      fd_(-1),
-      readCallback_(nullptr) {
-  DCHECK(evb->isInEventBaseThread());
+      fd_(-1) {
+  evb->dcheckIsInEventBaseThread();
 }
 
 AsyncUDPSocket::~AsyncUDPSocket() {
@@ -39,7 +49,7 @@ AsyncUDPSocket::~AsyncUDPSocket() {
 }
 
 void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
-  int socket = ::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
+  int socket = fsp::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
   if (socket == -1) {
     throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
                               "error creating async udp socket",
@@ -56,23 +66,51 @@ void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
                               errno);
   }
 
-  // put the socket in reuse mode
-  int value = 1;
-  if (setsockopt(socket,
-                 SOL_SOCKET,
-                 SO_REUSEADDR,
-                 &value,
-                 sizeof(value)) != 0) {
-    throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
-                              "failed to put socket in reuse mode",
-                              errno);
+  if (reuseAddr_) {
+    // put the socket in reuse mode
+    int value = 1;
+    if (setsockopt(socket,
+                  SOL_SOCKET,
+                  SO_REUSEADDR,
+                  &value,
+                  sizeof(value)) != 0) {
+      throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
+                                "failed to put socket in reuse mode",
+                                errno);
+    }
+  }
+
+  if (reusePort_) {
+    // put the socket in port reuse mode
+    int value = 1;
+    if (setsockopt(socket,
+                   SOL_SOCKET,
+                   SO_REUSEPORT,
+                   &value,
+                   sizeof(value)) != 0) {
+      throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
+                                "failed to put socket in reuse_port mode",
+                                errno);
+
+    }
+  }
+
+  // If we're using IPv6, make sure we don't accept V4-mapped connections
+  if (address.getFamily() == AF_INET6) {
+    int flag = 1;
+    if (setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, &flag, sizeof(flag))) {
+      throw AsyncSocketException(
+        AsyncSocketException::NOT_OPEN,
+        "Failed to set IPV6_V6ONLY",
+        errno);
+    }
   }
 
   // bind to the address
   sockaddr_storage addrStorage;
   address.getAddress(&addrStorage);
   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
-  if (::bind(socket, saddr, address.getActualSize()) != 0) {
+  if (fsp::bind(socket, saddr, address.getActualSize()) != 0) {
     throw AsyncSocketException(
         AsyncSocketException::NOT_OPEN,
         "failed to bind the async udp socket for:" + address.describe(),
@@ -106,21 +144,39 @@ void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) {
 
 ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address,
                                const std::unique_ptr<folly::IOBuf>& buf) {
-  CHECK_NE(-1, fd_) << "Socket not yet bound";
+  // UDP's typical MTU size is 1500, so high number of buffers
+  //   really do not make sense. Optimze for buffer chains with
+  //   buffers less than 16, which is the highest I can think of
+  //   for a real use case.
+  iovec vec[16];
+  size_t iovec_len = buf->fillIov(vec, sizeof(vec)/sizeof(vec[0]));
+  if (UNLIKELY(iovec_len == 0)) {
+    buf->coalesce();
+    vec[0].iov_base = const_cast<uint8_t*>(buf->data());
+    vec[0].iov_len = buf->length();
+    iovec_len = 1;
+  }
 
-  // XXX: Use `sendmsg` instead of coalescing here
-  buf->coalesce();
+  return writev(address, vec, iovec_len);
+}
+
+ssize_t AsyncUDPSocket::writev(const folly::SocketAddress& address,
+                               const struct iovec* vec, size_t iovec_len) {
+  CHECK_NE(-1, fd_) << "Socket not yet bound";
 
   sockaddr_storage addrStorage;
   address.getAddress(&addrStorage);
-  sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
 
-  return ::sendto(fd_,
-                  buf->data(),
-                  buf->length(),
-                  MSG_DONTWAIT,
-                  saddr,
-                  address.getActualSize());
+  struct msghdr msg;
+  msg.msg_name = reinterpret_cast<void*>(&addrStorage);
+  msg.msg_namelen = address.getActualSize();
+  msg.msg_iov = const_cast<struct iovec*>(vec);
+  msg.msg_iovlen = iovec_len;
+  msg.msg_control = nullptr;
+  msg.msg_controllen = 0;
+  msg.msg_flags = 0;
+
+  return sendmsg(fd_, &msg, 0);
 }
 
 void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
@@ -145,7 +201,7 @@ void AsyncUDPSocket::pauseRead() {
 }
 
 void AsyncUDPSocket::close() {
-  DCHECK(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   if (readCallback_) {
     auto cob = readCallback_;
@@ -192,11 +248,11 @@ void AsyncUDPSocket::handleRead() noexcept {
 
   struct sockaddr_storage addrStorage;
   socklen_t addrLen = sizeof(addrStorage);
-  memset(&addrStorage, 0, addrLen);
+  memset(&addrStorage, 0, size_t(addrLen));
   struct sockaddr* rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
   rawAddr->sa_family = localAddress_.getFamily();
 
-  ssize_t bytesRead = ::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
+  ssize_t bytesRead = recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
   if (bytesRead >= 0) {
     clientAddress_.setFromSockaddr(rawAddr, addrLen);
 
@@ -204,10 +260,11 @@ void AsyncUDPSocket::handleRead() noexcept {
       bool truncated = false;
       if ((size_t)bytesRead > len) {
         truncated = true;
-        bytesRead = len;
+        bytesRead = ssize_t(len);
       }
 
-      readCallback_->onDataAvailable(clientAddress_, bytesRead, truncated);
+      readCallback_->onDataAvailable(
+          clientAddress_, size_t(bytesRead), truncated);
     }
   } else {
     if (errno == EAGAIN || errno == EWOULDBLOCK) {
@@ -237,7 +294,7 @@ bool AsyncUDPSocket::updateRegistration() noexcept {
     flags |= READ;
   }
 
-  return registerHandler(flags | PERSIST);
+  return registerHandler(uint16_t(flags | PERSIST));
 }
 
-} // Namespace
+} // namespace folly