/*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <folly/io/async/AsyncUDPSocket.h>
#include <folly/io/async/EventBase.h>
+#include <folly/Likely.h>
+#include <folly/portability/Fcntl.h>
+#include <folly/portability/Sockets.h>
+#include <folly/portability/Unistd.h>
#include <errno.h>
-#include <unistd.h>
-#include <fcntl.h>
// Due to the way kernel headers are included, this may or may not be defined.
// Number pulled from 3.10 kernel headers.
#define SO_REUSEPORT 15
#endif
+namespace fsp = folly::portability::sockets;
+
namespace folly {
AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
eventBase_(evb),
fd_(-1),
readCallback_(nullptr) {
- DCHECK(evb->isInEventBaseThread());
+ evb->dcheckIsInEventBaseThread();
}
AsyncUDPSocket::~AsyncUDPSocket() {
}
void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
- int socket = ::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
+ int socket = fsp::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
if (socket == -1) {
throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
"error creating async udp socket",
errno);
}
- // put the socket in reuse mode
- int value = 1;
- if (setsockopt(socket,
- SOL_SOCKET,
- SO_REUSEADDR,
- &value,
- sizeof(value)) != 0) {
- throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
- "failed to put socket in reuse mode",
- errno);
+ if (reuseAddr_) {
+ // put the socket in reuse mode
+ int value = 1;
+ if (setsockopt(socket,
+ SOL_SOCKET,
+ SO_REUSEADDR,
+ &value,
+ sizeof(value)) != 0) {
+ throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
+ "failed to put socket in reuse mode",
+ errno);
+ }
}
if (reusePort_) {
SO_REUSEPORT,
&value,
sizeof(value)) != 0) {
- ::close(socket);
throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
"failed to put socket in reuse_port mode",
errno);
}
}
+ // If we're using IPv6, make sure we don't accept V4-mapped connections
+ if (address.getFamily() == AF_INET6) {
+ int flag = 1;
+ if (setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, &flag, sizeof(flag))) {
+ throw AsyncSocketException(
+ AsyncSocketException::NOT_OPEN,
+ "Failed to set IPV6_V6ONLY",
+ errno);
+ }
+ }
+
// bind to the address
sockaddr_storage addrStorage;
address.getAddress(&addrStorage);
sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
- if (::bind(socket, saddr, address.getActualSize()) != 0) {
+ if (fsp::bind(socket, saddr, address.getActualSize()) != 0) {
throw AsyncSocketException(
AsyncSocketException::NOT_OPEN,
"failed to bind the async udp socket for:" + address.describe(),
ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address,
const std::unique_ptr<folly::IOBuf>& buf) {
- CHECK_NE(-1, fd_) << "Socket not yet bound";
+ // UDP's typical MTU size is 1500, so high number of buffers
+ // really do not make sense. Optimze for buffer chains with
+ // buffers less than 16, which is the highest I can think of
+ // for a real use case.
+ iovec vec[16];
+ size_t iovec_len = buf->fillIov(vec, sizeof(vec)/sizeof(vec[0]));
+ if (UNLIKELY(iovec_len == 0)) {
+ buf->coalesce();
+ vec[0].iov_base = const_cast<uint8_t*>(buf->data());
+ vec[0].iov_len = buf->length();
+ iovec_len = 1;
+ }
- // XXX: Use `sendmsg` instead of coalescing here
- buf->coalesce();
+ return writev(address, vec, iovec_len);
+}
+
+ssize_t AsyncUDPSocket::writev(const folly::SocketAddress& address,
+ const struct iovec* vec, size_t iovec_len) {
+ CHECK_NE(-1, fd_) << "Socket not yet bound";
sockaddr_storage addrStorage;
address.getAddress(&addrStorage);
- sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
- return ::sendto(fd_,
- buf->data(),
- buf->length(),
- MSG_DONTWAIT,
- saddr,
- address.getActualSize());
+ struct msghdr msg;
+ msg.msg_name = reinterpret_cast<void*>(&addrStorage);
+ msg.msg_namelen = address.getActualSize();
+ msg.msg_iov = const_cast<struct iovec*>(vec);
+ msg.msg_iovlen = iovec_len;
+ msg.msg_control = nullptr;
+ msg.msg_controllen = 0;
+ msg.msg_flags = 0;
+
+ return ::sendmsg(fd_, &msg, 0);
}
void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
}
void AsyncUDPSocket::close() {
- DCHECK(eventBase_->isInEventBaseThread());
+ eventBase_->dcheckIsInEventBaseThread();
if (readCallback_) {
auto cob = readCallback_;
struct sockaddr_storage addrStorage;
socklen_t addrLen = sizeof(addrStorage);
- memset(&addrStorage, 0, addrLen);
+ memset(&addrStorage, 0, size_t(addrLen));
struct sockaddr* rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
rawAddr->sa_family = localAddress_.getFamily();
- ssize_t bytesRead = ::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
+ ssize_t bytesRead = recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
if (bytesRead >= 0) {
clientAddress_.setFromSockaddr(rawAddr, addrLen);
bool truncated = false;
if ((size_t)bytesRead > len) {
truncated = true;
- bytesRead = len;
+ bytesRead = ssize_t(len);
}
- readCallback_->onDataAvailable(clientAddress_, bytesRead, truncated);
+ readCallback_->onDataAvailable(
+ clientAddress_, size_t(bytesRead), truncated);
}
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
flags |= READ;
}
- return registerHandler(flags | PERSIST);
+ return registerHandler(uint16_t(flags | PERSIST));
}
} // Namespace