2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/async/AsyncUDPSocket.h>
19 #include <folly/io/async/EventBase.h>
25 // Due to the way kernel headers are included, this may or may not be defined.
26 // Number pulled from 3.10 kernel headers.
28 #define SO_REUSEPORT 15
33 AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
34 : EventHandler(CHECK_NOTNULL(evb)),
37 readCallback_(nullptr) {
38 DCHECK(evb->isInEventBaseThread());
41 AsyncUDPSocket::~AsyncUDPSocket() {
47 void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
48 int socket = ::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
50 throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
51 "error creating async udp socket",
55 auto g = folly::makeGuard([&] { ::close(socket); });
57 // put the socket in non-blocking mode
58 int ret = fcntl(socket, F_SETFL, O_NONBLOCK);
60 throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
61 "failed to put socket in non-blocking mode",
65 // put the socket in reuse mode
67 if (setsockopt(socket,
71 sizeof(value)) != 0) {
72 throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
73 "failed to put socket in reuse mode",
78 // put the socket in port reuse mode
80 if (setsockopt(socket,
84 sizeof(value)) != 0) {
86 throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
87 "failed to put socket in reuse_port mode",
93 // bind to the address
94 sockaddr_storage addrStorage;
95 address.getAddress(&addrStorage);
96 sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
97 if (::bind(socket, saddr, address.getActualSize()) != 0) {
98 throw AsyncSocketException(
99 AsyncSocketException::NOT_OPEN,
100 "failed to bind the async udp socket for:" + address.describe(),
107 ownership_ = FDOwnership::OWNS;
109 // attach to EventHandler
110 EventHandler::changeHandlerFD(fd_);
112 if (address.getPort() != 0) {
113 localAddress_ = address;
115 localAddress_.setFromLocalAddress(fd_);
119 void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) {
120 CHECK_EQ(-1, fd_) << "Already bound to another FD";
123 ownership_ = ownership;
125 EventHandler::changeHandlerFD(fd_);
126 localAddress_.setFromLocalAddress(fd_);
129 ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address,
130 const std::unique_ptr<folly::IOBuf>& buf) {
131 CHECK_NE(-1, fd_) << "Socket not yet bound";
133 // XXX: Use `sendmsg` instead of coalescing here
136 sockaddr_storage addrStorage;
137 address.getAddress(&addrStorage);
138 sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
145 address.getActualSize());
148 void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
149 CHECK(!readCallback_) << "Another read callback already installed";
150 CHECK_NE(-1, fd_) << "UDP server socket not yet bind to an address";
152 readCallback_ = CHECK_NOTNULL(cob);
153 if (!updateRegistration()) {
154 AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
155 "failed to register for accept events");
157 readCallback_ = nullptr;
158 cob->onReadError(ex);
163 void AsyncUDPSocket::pauseRead() {
164 // It is ok to pause an already paused socket
165 readCallback_ = nullptr;
166 updateRegistration();
169 void AsyncUDPSocket::close() {
170 DCHECK(eventBase_->isInEventBaseThread());
173 auto cob = readCallback_;
174 readCallback_ = nullptr;
179 // Unregister any events we are registered for
182 if (fd_ != -1 && ownership_ == FDOwnership::OWNS) {
189 void AsyncUDPSocket::handlerReady(uint16_t events) noexcept {
190 if (events & EventHandler::READ) {
191 DCHECK(readCallback_);
196 void AsyncUDPSocket::handleRead() noexcept {
200 readCallback_->getReadBuffer(&buf, &len);
201 if (buf == nullptr || len == 0) {
202 AsyncSocketException ex(
203 AsyncSocketException::BAD_ARGS,
204 "AsyncUDPSocket::getReadBuffer() returned empty buffer");
207 auto cob = readCallback_;
208 readCallback_ = nullptr;
210 cob->onReadError(ex);
211 updateRegistration();
215 struct sockaddr_storage addrStorage;
216 socklen_t addrLen = sizeof(addrStorage);
217 memset(&addrStorage, 0, addrLen);
218 struct sockaddr* rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
219 rawAddr->sa_family = localAddress_.getFamily();
221 ssize_t bytesRead = ::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
222 if (bytesRead >= 0) {
223 clientAddress_.setFromSockaddr(rawAddr, addrLen);
226 bool truncated = false;
227 if ((size_t)bytesRead > len) {
232 readCallback_->onDataAvailable(clientAddress_, bytesRead, truncated);
235 if (errno == EAGAIN || errno == EWOULDBLOCK) {
236 // No data could be read without blocking the socket
240 AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
241 "::recvfrom() failed",
244 // In case of UDP we can continue reading from the socket
245 // even if the current request fails. We notify the user
246 // so that he can do some logging/stats collection if he wants.
247 auto cob = readCallback_;
248 readCallback_ = nullptr;
250 cob->onReadError(ex);
251 updateRegistration();
255 bool AsyncUDPSocket::updateRegistration() noexcept {
256 uint16_t flags = NONE;
262 return registerHandler(flags | PERSIST);