/*
 * Copyright 2015 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 #include <folly/io/async/AsyncUDPSocket.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/Likely.h>
// Due to the way kernel headers are included, SO_REUSEPORT may or may not be
// defined at this point. Provide a fallback only when it is missing so that a
// definition coming from the system headers is never redefined.
// The fallback number is pulled from the 3.10 kernel headers.
#ifndef SO_REUSEPORT
#define SO_REUSEPORT 15
#endif
34 AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
35 : EventHandler(CHECK_NOTNULL(evb)),
38 readCallback_(nullptr) {
39 DCHECK(evb->isInEventBaseThread());
42 AsyncUDPSocket::~AsyncUDPSocket() {
48 void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
49 int socket = ::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
51 throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
52 "error creating async udp socket",
56 auto g = folly::makeGuard([&] { ::close(socket); });
58 // put the socket in non-blocking mode
59 int ret = fcntl(socket, F_SETFL, O_NONBLOCK);
61 throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
62 "failed to put socket in non-blocking mode",
66 // put the socket in reuse mode
68 if (setsockopt(socket,
72 sizeof(value)) != 0) {
73 throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
74 "failed to put socket in reuse mode",
79 // put the socket in port reuse mode
81 if (setsockopt(socket,
85 sizeof(value)) != 0) {
87 throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
88 "failed to put socket in reuse_port mode",
94 // bind to the address
95 sockaddr_storage addrStorage;
96 address.getAddress(&addrStorage);
97 sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
98 if (::bind(socket, saddr, address.getActualSize()) != 0) {
99 throw AsyncSocketException(
100 AsyncSocketException::NOT_OPEN,
101 "failed to bind the async udp socket for:" + address.describe(),
108 ownership_ = FDOwnership::OWNS;
110 // attach to EventHandler
111 EventHandler::changeHandlerFD(fd_);
113 if (address.getPort() != 0) {
114 localAddress_ = address;
116 localAddress_.setFromLocalAddress(fd_);
120 void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) {
121 CHECK_EQ(-1, fd_) << "Already bound to another FD";
124 ownership_ = ownership;
126 EventHandler::changeHandlerFD(fd_);
127 localAddress_.setFromLocalAddress(fd_);
130 ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address,
131 const std::unique_ptr<folly::IOBuf>& buf) {
132 // UDP's typical MTU size is 1500, so high number of buffers
133 // really do not make sense. Optimze for buffer chains with
134 // buffers less than 16, which is the highest I can think of
135 // for a real use case.
137 size_t iovec_len = buf->fillIov(vec, sizeof(vec)/sizeof(vec[0]));
138 if (UNLIKELY(iovec_len == 0)) {
140 vec[0].iov_base = const_cast<uint8_t*>(buf->data());
141 vec[0].iov_len = buf->length();
145 return writev(address, vec, iovec_len);
148 ssize_t AsyncUDPSocket::writev(const folly::SocketAddress& address,
149 const struct iovec* vec, size_t iovec_len) {
150 CHECK_NE(-1, fd_) << "Socket not yet bound";
152 sockaddr_storage addrStorage;
153 address.getAddress(&addrStorage);
156 msg.msg_name = reinterpret_cast<void*>(&addrStorage);
157 msg.msg_namelen = address.getActualSize();
158 msg.msg_iov = const_cast<struct iovec*>(vec);
159 msg.msg_iovlen = iovec_len;
160 msg.msg_control = nullptr;
161 msg.msg_controllen = 0;
164 return ::sendmsg(fd_, &msg, 0);
167 void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
168 CHECK(!readCallback_) << "Another read callback already installed";
169 CHECK_NE(-1, fd_) << "UDP server socket not yet bind to an address";
171 readCallback_ = CHECK_NOTNULL(cob);
172 if (!updateRegistration()) {
173 AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
174 "failed to register for accept events");
176 readCallback_ = nullptr;
177 cob->onReadError(ex);
182 void AsyncUDPSocket::pauseRead() {
183 // It is ok to pause an already paused socket
184 readCallback_ = nullptr;
185 updateRegistration();
188 void AsyncUDPSocket::close() {
189 DCHECK(eventBase_->isInEventBaseThread());
192 auto cob = readCallback_;
193 readCallback_ = nullptr;
198 // Unregister any events we are registered for
201 if (fd_ != -1 && ownership_ == FDOwnership::OWNS) {
208 void AsyncUDPSocket::handlerReady(uint16_t events) noexcept {
209 if (events & EventHandler::READ) {
210 DCHECK(readCallback_);
215 void AsyncUDPSocket::handleRead() noexcept {
219 readCallback_->getReadBuffer(&buf, &len);
220 if (buf == nullptr || len == 0) {
221 AsyncSocketException ex(
222 AsyncSocketException::BAD_ARGS,
223 "AsyncUDPSocket::getReadBuffer() returned empty buffer");
226 auto cob = readCallback_;
227 readCallback_ = nullptr;
229 cob->onReadError(ex);
230 updateRegistration();
234 struct sockaddr_storage addrStorage;
235 socklen_t addrLen = sizeof(addrStorage);
236 memset(&addrStorage, 0, addrLen);
237 struct sockaddr* rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
238 rawAddr->sa_family = localAddress_.getFamily();
240 ssize_t bytesRead = ::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
241 if (bytesRead >= 0) {
242 clientAddress_.setFromSockaddr(rawAddr, addrLen);
245 bool truncated = false;
246 if ((size_t)bytesRead > len) {
251 readCallback_->onDataAvailable(clientAddress_, bytesRead, truncated);
254 if (errno == EAGAIN || errno == EWOULDBLOCK) {
255 // No data could be read without blocking the socket
259 AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
260 "::recvfrom() failed",
263 // In case of UDP we can continue reading from the socket
264 // even if the current request fails. We notify the user
265 // so that he can do some logging/stats collection if he wants.
266 auto cob = readCallback_;
267 readCallback_ = nullptr;
269 cob->onReadError(ex);
270 updateRegistration();
274 bool AsyncUDPSocket::updateRegistration() noexcept {
275 uint16_t flags = NONE;
281 return registerHandler(flags | PERSIST);