Include <folly/portability/Fcntl.h> where needed
[folly.git] / folly / io / async / AsyncUDPSocket.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncUDPSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/Likely.h>
21 #include <folly/portability/Fcntl.h>
22
23 #include <errno.h>
24 #include <unistd.h>
25
26 // Due to the way kernel headers are included, this may or may not be defined.
27 // Number pulled from 3.10 kernel headers.
28 #ifndef SO_REUSEPORT
29 #define SO_REUSEPORT 15
30 #endif
31
32 namespace folly {
33
34 AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
35     : EventHandler(CHECK_NOTNULL(evb)),
36       eventBase_(evb),
37       fd_(-1),
38       readCallback_(nullptr) {
39   DCHECK(evb->isInEventBaseThread());
40 }
41
42 AsyncUDPSocket::~AsyncUDPSocket() {
43   if (fd_ != -1) {
44     close();
45   }
46 }
47
48 void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
49   int socket = ::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
50   if (socket == -1) {
51     throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
52                               "error creating async udp socket",
53                               errno);
54   }
55
56   auto g = folly::makeGuard([&] { ::close(socket); });
57
58   // put the socket in non-blocking mode
59   int ret = fcntl(socket, F_SETFL, O_NONBLOCK);
60   if (ret != 0) {
61     throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
62                               "failed to put socket in non-blocking mode",
63                               errno);
64   }
65
66   if (reuseAddr_) {
67     // put the socket in reuse mode
68     int value = 1;
69     if (setsockopt(socket,
70                   SOL_SOCKET,
71                   SO_REUSEADDR,
72                   &value,
73                   sizeof(value)) != 0) {
74       throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
75                                 "failed to put socket in reuse mode",
76                                 errno);
77     }
78   }
79
80   if (reusePort_) {
81     // put the socket in port reuse mode
82     int value = 1;
83     if (setsockopt(socket,
84                    SOL_SOCKET,
85                    SO_REUSEPORT,
86                    &value,
87                    sizeof(value)) != 0) {
88       ::close(socket);
89       throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
90                                 "failed to put socket in reuse_port mode",
91                                 errno);
92
93     }
94   }
95
96   // If we're using IPv6, make sure we don't accept V4-mapped connections
97   if (address.getFamily() == AF_INET6) {
98     int flag = 1;
99     if (::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY,
100                      &flag, sizeof(flag))) {
101       throw AsyncSocketException(
102         AsyncSocketException::NOT_OPEN,
103         "Failed to set IPV6_V6ONLY",
104         errno);
105     }
106   }
107
108   // bind to the address
109   sockaddr_storage addrStorage;
110   address.getAddress(&addrStorage);
111   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
112   if (::bind(socket, saddr, address.getActualSize()) != 0) {
113     throw AsyncSocketException(
114         AsyncSocketException::NOT_OPEN,
115         "failed to bind the async udp socket for:" + address.describe(),
116         errno);
117   }
118
119   // success
120   g.dismiss();
121   fd_ = socket;
122   ownership_ = FDOwnership::OWNS;
123
124   // attach to EventHandler
125   EventHandler::changeHandlerFD(fd_);
126
127   if (address.getPort() != 0) {
128     localAddress_ = address;
129   } else {
130     localAddress_.setFromLocalAddress(fd_);
131   }
132 }
133
134 void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) {
135   CHECK_EQ(-1, fd_) << "Already bound to another FD";
136
137   fd_ = fd;
138   ownership_ = ownership;
139
140   EventHandler::changeHandlerFD(fd_);
141   localAddress_.setFromLocalAddress(fd_);
142 }
143
144 ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address,
145                                const std::unique_ptr<folly::IOBuf>& buf) {
146   // UDP's typical MTU size is 1500, so high number of buffers
147   //   really do not make sense. Optimze for buffer chains with
148   //   buffers less than 16, which is the highest I can think of
149   //   for a real use case.
150   iovec vec[16];
151   size_t iovec_len = buf->fillIov(vec, sizeof(vec)/sizeof(vec[0]));
152   if (UNLIKELY(iovec_len == 0)) {
153     buf->coalesce();
154     vec[0].iov_base = const_cast<uint8_t*>(buf->data());
155     vec[0].iov_len = buf->length();
156     iovec_len = 1;
157   }
158
159   return writev(address, vec, iovec_len);
160 }
161
162 ssize_t AsyncUDPSocket::writev(const folly::SocketAddress& address,
163                                const struct iovec* vec, size_t iovec_len) {
164   CHECK_NE(-1, fd_) << "Socket not yet bound";
165
166   sockaddr_storage addrStorage;
167   address.getAddress(&addrStorage);
168
169   struct msghdr msg;
170   msg.msg_name = reinterpret_cast<void*>(&addrStorage);
171   msg.msg_namelen = address.getActualSize();
172   msg.msg_iov = const_cast<struct iovec*>(vec);
173   msg.msg_iovlen = iovec_len;
174   msg.msg_control = nullptr;
175   msg.msg_controllen = 0;
176   msg.msg_flags = 0;
177
178   return ::sendmsg(fd_, &msg, 0);
179 }
180
181 void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
182   CHECK(!readCallback_) << "Another read callback already installed";
183   CHECK_NE(-1, fd_) << "UDP server socket not yet bind to an address";
184
185   readCallback_ = CHECK_NOTNULL(cob);
186   if (!updateRegistration()) {
187     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
188                            "failed to register for accept events");
189
190     readCallback_ = nullptr;
191     cob->onReadError(ex);
192     return;
193   }
194 }
195
196 void AsyncUDPSocket::pauseRead() {
197   // It is ok to pause an already paused socket
198   readCallback_ = nullptr;
199   updateRegistration();
200 }
201
202 void AsyncUDPSocket::close() {
203   DCHECK(eventBase_->isInEventBaseThread());
204
205   if (readCallback_) {
206     auto cob = readCallback_;
207     readCallback_ = nullptr;
208
209     cob->onReadClosed();
210   }
211
212   // Unregister any events we are registered for
213   unregisterHandler();
214
215   if (fd_ != -1 && ownership_ == FDOwnership::OWNS) {
216     ::close(fd_);
217   }
218
219   fd_ = -1;
220 }
221
222 void AsyncUDPSocket::handlerReady(uint16_t events) noexcept {
223   if (events & EventHandler::READ) {
224     DCHECK(readCallback_);
225     handleRead();
226   }
227 }
228
229 void AsyncUDPSocket::handleRead() noexcept {
230   void* buf{nullptr};
231   size_t len{0};
232
233   readCallback_->getReadBuffer(&buf, &len);
234   if (buf == nullptr || len == 0) {
235     AsyncSocketException ex(
236         AsyncSocketException::BAD_ARGS,
237         "AsyncUDPSocket::getReadBuffer() returned empty buffer");
238
239
240     auto cob = readCallback_;
241     readCallback_ = nullptr;
242
243     cob->onReadError(ex);
244     updateRegistration();
245     return;
246   }
247
248   struct sockaddr_storage addrStorage;
249   socklen_t addrLen = sizeof(addrStorage);
250   memset(&addrStorage, 0, addrLen);
251   struct sockaddr* rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
252   rawAddr->sa_family = localAddress_.getFamily();
253
254   ssize_t bytesRead = ::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
255   if (bytesRead >= 0) {
256     clientAddress_.setFromSockaddr(rawAddr, addrLen);
257
258     if (bytesRead > 0) {
259       bool truncated = false;
260       if ((size_t)bytesRead > len) {
261         truncated = true;
262         bytesRead = len;
263       }
264
265       readCallback_->onDataAvailable(clientAddress_, bytesRead, truncated);
266     }
267   } else {
268     if (errno == EAGAIN || errno == EWOULDBLOCK) {
269       // No data could be read without blocking the socket
270       return;
271     }
272
273     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
274                            "::recvfrom() failed",
275                            errno);
276
277     // In case of UDP we can continue reading from the socket
278     // even if the current request fails. We notify the user
279     // so that he can do some logging/stats collection if he wants.
280     auto cob = readCallback_;
281     readCallback_ = nullptr;
282
283     cob->onReadError(ex);
284     updateRegistration();
285   }
286 }
287
288 bool AsyncUDPSocket::updateRegistration() noexcept {
289   uint16_t flags = NONE;
290
291   if (readCallback_) {
292     flags |= READ;
293   }
294
295   return registerHandler(flags | PERSIST);
296 }
297
298 } // Namespace