fix flaky ConnectTFOTimeout and ConnectTFOFallbackTimeout tests
[folly.git] / folly / io / async / AsyncUDPSocket.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncUDPSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/Likely.h>
21 #include <folly/portability/Fcntl.h>
22 #include <folly/portability/Sockets.h>
23 #include <folly/portability/Unistd.h>
24
25 #include <errno.h>
26
27 // Due to the way kernel headers are included, this may or may not be defined.
28 // Number pulled from 3.10 kernel headers.
29 #ifndef SO_REUSEPORT
30 #define SO_REUSEPORT 15
31 #endif
32
33 namespace folly {
34
35 AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
36     : EventHandler(CHECK_NOTNULL(evb)),
37       eventBase_(evb),
38       fd_(-1),
39       readCallback_(nullptr) {
40   DCHECK(evb->isInEventBaseThread());
41 }
42
43 AsyncUDPSocket::~AsyncUDPSocket() {
44   if (fd_ != -1) {
45     close();
46   }
47 }
48
49 void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
50   int socket = ::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
51   if (socket == -1) {
52     throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
53                               "error creating async udp socket",
54                               errno);
55   }
56
57   auto g = folly::makeGuard([&] { ::close(socket); });
58
59   // put the socket in non-blocking mode
60   int ret = fcntl(socket, F_SETFL, O_NONBLOCK);
61   if (ret != 0) {
62     throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
63                               "failed to put socket in non-blocking mode",
64                               errno);
65   }
66
67   if (reuseAddr_) {
68     // put the socket in reuse mode
69     int value = 1;
70     if (setsockopt(socket,
71                   SOL_SOCKET,
72                   SO_REUSEADDR,
73                   &value,
74                   sizeof(value)) != 0) {
75       throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
76                                 "failed to put socket in reuse mode",
77                                 errno);
78     }
79   }
80
81   if (reusePort_) {
82     // put the socket in port reuse mode
83     int value = 1;
84     if (setsockopt(socket,
85                    SOL_SOCKET,
86                    SO_REUSEPORT,
87                    &value,
88                    sizeof(value)) != 0) {
89       ::close(socket);
90       throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
91                                 "failed to put socket in reuse_port mode",
92                                 errno);
93
94     }
95   }
96
97   // If we're using IPv6, make sure we don't accept V4-mapped connections
98   if (address.getFamily() == AF_INET6) {
99     int flag = 1;
100     if (::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY,
101                      &flag, sizeof(flag))) {
102       throw AsyncSocketException(
103         AsyncSocketException::NOT_OPEN,
104         "Failed to set IPV6_V6ONLY",
105         errno);
106     }
107   }
108
109   // bind to the address
110   sockaddr_storage addrStorage;
111   address.getAddress(&addrStorage);
112   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
113   if (::bind(socket, saddr, address.getActualSize()) != 0) {
114     throw AsyncSocketException(
115         AsyncSocketException::NOT_OPEN,
116         "failed to bind the async udp socket for:" + address.describe(),
117         errno);
118   }
119
120   // success
121   g.dismiss();
122   fd_ = socket;
123   ownership_ = FDOwnership::OWNS;
124
125   // attach to EventHandler
126   EventHandler::changeHandlerFD(fd_);
127
128   if (address.getPort() != 0) {
129     localAddress_ = address;
130   } else {
131     localAddress_.setFromLocalAddress(fd_);
132   }
133 }
134
135 void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) {
136   CHECK_EQ(-1, fd_) << "Already bound to another FD";
137
138   fd_ = fd;
139   ownership_ = ownership;
140
141   EventHandler::changeHandlerFD(fd_);
142   localAddress_.setFromLocalAddress(fd_);
143 }
144
145 ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address,
146                                const std::unique_ptr<folly::IOBuf>& buf) {
147   // UDP's typical MTU size is 1500, so high number of buffers
148   //   really do not make sense. Optimze for buffer chains with
149   //   buffers less than 16, which is the highest I can think of
150   //   for a real use case.
151   iovec vec[16];
152   size_t iovec_len = buf->fillIov(vec, sizeof(vec)/sizeof(vec[0]));
153   if (UNLIKELY(iovec_len == 0)) {
154     buf->coalesce();
155     vec[0].iov_base = const_cast<uint8_t*>(buf->data());
156     vec[0].iov_len = buf->length();
157     iovec_len = 1;
158   }
159
160   return writev(address, vec, iovec_len);
161 }
162
163 ssize_t AsyncUDPSocket::writev(const folly::SocketAddress& address,
164                                const struct iovec* vec, size_t iovec_len) {
165   CHECK_NE(-1, fd_) << "Socket not yet bound";
166
167   sockaddr_storage addrStorage;
168   address.getAddress(&addrStorage);
169
170   struct msghdr msg;
171   msg.msg_name = reinterpret_cast<void*>(&addrStorage);
172   msg.msg_namelen = address.getActualSize();
173   msg.msg_iov = const_cast<struct iovec*>(vec);
174   msg.msg_iovlen = iovec_len;
175   msg.msg_control = nullptr;
176   msg.msg_controllen = 0;
177   msg.msg_flags = 0;
178
179   return ::sendmsg(fd_, &msg, 0);
180 }
181
182 void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
183   CHECK(!readCallback_) << "Another read callback already installed";
184   CHECK_NE(-1, fd_) << "UDP server socket not yet bind to an address";
185
186   readCallback_ = CHECK_NOTNULL(cob);
187   if (!updateRegistration()) {
188     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
189                            "failed to register for accept events");
190
191     readCallback_ = nullptr;
192     cob->onReadError(ex);
193     return;
194   }
195 }
196
197 void AsyncUDPSocket::pauseRead() {
198   // It is ok to pause an already paused socket
199   readCallback_ = nullptr;
200   updateRegistration();
201 }
202
203 void AsyncUDPSocket::close() {
204   DCHECK(eventBase_->isInEventBaseThread());
205
206   if (readCallback_) {
207     auto cob = readCallback_;
208     readCallback_ = nullptr;
209
210     cob->onReadClosed();
211   }
212
213   // Unregister any events we are registered for
214   unregisterHandler();
215
216   if (fd_ != -1 && ownership_ == FDOwnership::OWNS) {
217     ::close(fd_);
218   }
219
220   fd_ = -1;
221 }
222
223 void AsyncUDPSocket::handlerReady(uint16_t events) noexcept {
224   if (events & EventHandler::READ) {
225     DCHECK(readCallback_);
226     handleRead();
227   }
228 }
229
230 void AsyncUDPSocket::handleRead() noexcept {
231   void* buf{nullptr};
232   size_t len{0};
233
234   readCallback_->getReadBuffer(&buf, &len);
235   if (buf == nullptr || len == 0) {
236     AsyncSocketException ex(
237         AsyncSocketException::BAD_ARGS,
238         "AsyncUDPSocket::getReadBuffer() returned empty buffer");
239
240
241     auto cob = readCallback_;
242     readCallback_ = nullptr;
243
244     cob->onReadError(ex);
245     updateRegistration();
246     return;
247   }
248
249   struct sockaddr_storage addrStorage;
250   socklen_t addrLen = sizeof(addrStorage);
251   memset(&addrStorage, 0, addrLen);
252   struct sockaddr* rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
253   rawAddr->sa_family = localAddress_.getFamily();
254
255   ssize_t bytesRead = ::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
256   if (bytesRead >= 0) {
257     clientAddress_.setFromSockaddr(rawAddr, addrLen);
258
259     if (bytesRead > 0) {
260       bool truncated = false;
261       if ((size_t)bytesRead > len) {
262         truncated = true;
263         bytesRead = len;
264       }
265
266       readCallback_->onDataAvailable(clientAddress_, bytesRead, truncated);
267     }
268   } else {
269     if (errno == EAGAIN || errno == EWOULDBLOCK) {
270       // No data could be read without blocking the socket
271       return;
272     }
273
274     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
275                            "::recvfrom() failed",
276                            errno);
277
278     // In case of UDP we can continue reading from the socket
279     // even if the current request fails. We notify the user
280     // so that he can do some logging/stats collection if he wants.
281     auto cob = readCallback_;
282     readCallback_ = nullptr;
283
284     cob->onReadError(ex);
285     updateRegistration();
286   }
287 }
288
289 bool AsyncUDPSocket::updateRegistration() noexcept {
290   uint16_t flags = NONE;
291
292   if (readCallback_) {
293     flags |= READ;
294   }
295
296   return registerHandler(flags | PERSIST);
297 }
298
299 } // Namespace