Add logs for TFO succeded
[folly.git] / folly / io / async / AsyncUDPSocket.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncUDPSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/Likely.h>
21 #include <folly/portability/Fcntl.h>
22 #include <folly/portability/Sockets.h>
23 #include <folly/portability/Unistd.h>
24
25 #include <errno.h>
26
27 // Due to the way kernel headers are included, this may or may not be defined.
28 // Number pulled from 3.10 kernel headers.
29 #ifndef SO_REUSEPORT
30 #define SO_REUSEPORT 15
31 #endif
32
33 namespace fsp = folly::portability::sockets;
34
35 namespace folly {
36
37 AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
38     : EventHandler(CHECK_NOTNULL(evb)),
39       eventBase_(evb),
40       fd_(-1),
41       readCallback_(nullptr) {
42   DCHECK(evb->isInEventBaseThread());
43 }
44
45 AsyncUDPSocket::~AsyncUDPSocket() {
46   if (fd_ != -1) {
47     close();
48   }
49 }
50
51 void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
52   int socket = fsp::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
53   if (socket == -1) {
54     throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
55                               "error creating async udp socket",
56                               errno);
57   }
58
59   auto g = folly::makeGuard([&] { ::close(socket); });
60
61   // put the socket in non-blocking mode
62   int ret = fcntl(socket, F_SETFL, O_NONBLOCK);
63   if (ret != 0) {
64     throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
65                               "failed to put socket in non-blocking mode",
66                               errno);
67   }
68
69   if (reuseAddr_) {
70     // put the socket in reuse mode
71     int value = 1;
72     if (setsockopt(socket,
73                   SOL_SOCKET,
74                   SO_REUSEADDR,
75                   &value,
76                   sizeof(value)) != 0) {
77       throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
78                                 "failed to put socket in reuse mode",
79                                 errno);
80     }
81   }
82
83   if (reusePort_) {
84     // put the socket in port reuse mode
85     int value = 1;
86     if (setsockopt(socket,
87                    SOL_SOCKET,
88                    SO_REUSEPORT,
89                    &value,
90                    sizeof(value)) != 0) {
91       ::close(socket);
92       throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
93                                 "failed to put socket in reuse_port mode",
94                                 errno);
95
96     }
97   }
98
99   // If we're using IPv6, make sure we don't accept V4-mapped connections
100   if (address.getFamily() == AF_INET6) {
101     int flag = 1;
102     if (setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, &flag, sizeof(flag))) {
103       throw AsyncSocketException(
104         AsyncSocketException::NOT_OPEN,
105         "Failed to set IPV6_V6ONLY",
106         errno);
107     }
108   }
109
110   // bind to the address
111   sockaddr_storage addrStorage;
112   address.getAddress(&addrStorage);
113   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
114   if (fsp::bind(socket, saddr, address.getActualSize()) != 0) {
115     throw AsyncSocketException(
116         AsyncSocketException::NOT_OPEN,
117         "failed to bind the async udp socket for:" + address.describe(),
118         errno);
119   }
120
121   // success
122   g.dismiss();
123   fd_ = socket;
124   ownership_ = FDOwnership::OWNS;
125
126   // attach to EventHandler
127   EventHandler::changeHandlerFD(fd_);
128
129   if (address.getPort() != 0) {
130     localAddress_ = address;
131   } else {
132     localAddress_.setFromLocalAddress(fd_);
133   }
134 }
135
136 void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) {
137   CHECK_EQ(-1, fd_) << "Already bound to another FD";
138
139   fd_ = fd;
140   ownership_ = ownership;
141
142   EventHandler::changeHandlerFD(fd_);
143   localAddress_.setFromLocalAddress(fd_);
144 }
145
146 ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address,
147                                const std::unique_ptr<folly::IOBuf>& buf) {
148   // UDP's typical MTU size is 1500, so high number of buffers
149   //   really do not make sense. Optimze for buffer chains with
150   //   buffers less than 16, which is the highest I can think of
151   //   for a real use case.
152   iovec vec[16];
153   size_t iovec_len = buf->fillIov(vec, sizeof(vec)/sizeof(vec[0]));
154   if (UNLIKELY(iovec_len == 0)) {
155     buf->coalesce();
156     vec[0].iov_base = const_cast<uint8_t*>(buf->data());
157     vec[0].iov_len = buf->length();
158     iovec_len = 1;
159   }
160
161   return writev(address, vec, iovec_len);
162 }
163
164 ssize_t AsyncUDPSocket::writev(const folly::SocketAddress& address,
165                                const struct iovec* vec, size_t iovec_len) {
166   CHECK_NE(-1, fd_) << "Socket not yet bound";
167
168   sockaddr_storage addrStorage;
169   address.getAddress(&addrStorage);
170
171   struct msghdr msg;
172   msg.msg_name = reinterpret_cast<void*>(&addrStorage);
173   msg.msg_namelen = address.getActualSize();
174   msg.msg_iov = const_cast<struct iovec*>(vec);
175   msg.msg_iovlen = iovec_len;
176   msg.msg_control = nullptr;
177   msg.msg_controllen = 0;
178   msg.msg_flags = 0;
179
180   return ::sendmsg(fd_, &msg, 0);
181 }
182
183 void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
184   CHECK(!readCallback_) << "Another read callback already installed";
185   CHECK_NE(-1, fd_) << "UDP server socket not yet bind to an address";
186
187   readCallback_ = CHECK_NOTNULL(cob);
188   if (!updateRegistration()) {
189     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
190                            "failed to register for accept events");
191
192     readCallback_ = nullptr;
193     cob->onReadError(ex);
194     return;
195   }
196 }
197
198 void AsyncUDPSocket::pauseRead() {
199   // It is ok to pause an already paused socket
200   readCallback_ = nullptr;
201   updateRegistration();
202 }
203
204 void AsyncUDPSocket::close() {
205   DCHECK(eventBase_->isInEventBaseThread());
206
207   if (readCallback_) {
208     auto cob = readCallback_;
209     readCallback_ = nullptr;
210
211     cob->onReadClosed();
212   }
213
214   // Unregister any events we are registered for
215   unregisterHandler();
216
217   if (fd_ != -1 && ownership_ == FDOwnership::OWNS) {
218     ::close(fd_);
219   }
220
221   fd_ = -1;
222 }
223
224 void AsyncUDPSocket::handlerReady(uint16_t events) noexcept {
225   if (events & EventHandler::READ) {
226     DCHECK(readCallback_);
227     handleRead();
228   }
229 }
230
231 void AsyncUDPSocket::handleRead() noexcept {
232   void* buf{nullptr};
233   size_t len{0};
234
235   readCallback_->getReadBuffer(&buf, &len);
236   if (buf == nullptr || len == 0) {
237     AsyncSocketException ex(
238         AsyncSocketException::BAD_ARGS,
239         "AsyncUDPSocket::getReadBuffer() returned empty buffer");
240
241
242     auto cob = readCallback_;
243     readCallback_ = nullptr;
244
245     cob->onReadError(ex);
246     updateRegistration();
247     return;
248   }
249
250   struct sockaddr_storage addrStorage;
251   socklen_t addrLen = sizeof(addrStorage);
252   memset(&addrStorage, 0, addrLen);
253   struct sockaddr* rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
254   rawAddr->sa_family = localAddress_.getFamily();
255
256   ssize_t bytesRead = recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
257   if (bytesRead >= 0) {
258     clientAddress_.setFromSockaddr(rawAddr, addrLen);
259
260     if (bytesRead > 0) {
261       bool truncated = false;
262       if ((size_t)bytesRead > len) {
263         truncated = true;
264         bytesRead = len;
265       }
266
267       readCallback_->onDataAvailable(clientAddress_, bytesRead, truncated);
268     }
269   } else {
270     if (errno == EAGAIN || errno == EWOULDBLOCK) {
271       // No data could be read without blocking the socket
272       return;
273     }
274
275     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
276                            "::recvfrom() failed",
277                            errno);
278
279     // In case of UDP we can continue reading from the socket
280     // even if the current request fails. We notify the user
281     // so that he can do some logging/stats collection if he wants.
282     auto cob = readCallback_;
283     readCallback_ = nullptr;
284
285     cob->onReadError(ex);
286     updateRegistration();
287   }
288 }
289
290 bool AsyncUDPSocket::updateRegistration() noexcept {
291   uint16_t flags = NONE;
292
293   if (readCallback_) {
294     flags |= READ;
295   }
296
297   return registerHandler(flags | PERSIST);
298 }
299
300 } // Namespace