5b12812616f60ef84c2c62c3f613d9bc255c2239
[folly.git] / folly / io / async / AsyncUDPSocket.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncUDPSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/Likely.h>
21
22 #include <errno.h>
23 #include <unistd.h>
24 #include <fcntl.h>
25
26 // Due to the way kernel headers are included, this may or may not be defined.
27 // Number pulled from 3.10 kernel headers.
28 #ifndef SO_REUSEPORT
29 #define SO_REUSEPORT 15
30 #endif
31
32 namespace folly {
33
34 AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
35     : EventHandler(CHECK_NOTNULL(evb)),
36       eventBase_(evb),
37       fd_(-1),
38       readCallback_(nullptr) {
39   DCHECK(evb->isInEventBaseThread());
40 }
41
42 AsyncUDPSocket::~AsyncUDPSocket() {
43   if (fd_ != -1) {
44     close();
45   }
46 }
47
48 void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
49   int socket = ::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
50   if (socket == -1) {
51     throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
52                               "error creating async udp socket",
53                               errno);
54   }
55
56   auto g = folly::makeGuard([&] { ::close(socket); });
57
58   // put the socket in non-blocking mode
59   int ret = fcntl(socket, F_SETFL, O_NONBLOCK);
60   if (ret != 0) {
61     throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
62                               "failed to put socket in non-blocking mode",
63                               errno);
64   }
65
66   // put the socket in reuse mode
67   int value = 1;
68   if (setsockopt(socket,
69                  SOL_SOCKET,
70                  SO_REUSEADDR,
71                  &value,
72                  sizeof(value)) != 0) {
73     throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
74                               "failed to put socket in reuse mode",
75                               errno);
76   }
77
78   if (reusePort_) {
79     // put the socket in port reuse mode
80     int value = 1;
81     if (setsockopt(socket,
82                    SOL_SOCKET,
83                    SO_REUSEPORT,
84                    &value,
85                    sizeof(value)) != 0) {
86       ::close(socket);
87       throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
88                                 "failed to put socket in reuse_port mode",
89                                 errno);
90
91     }
92   }
93
94   // bind to the address
95   sockaddr_storage addrStorage;
96   address.getAddress(&addrStorage);
97   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
98   if (::bind(socket, saddr, address.getActualSize()) != 0) {
99     throw AsyncSocketException(
100         AsyncSocketException::NOT_OPEN,
101         "failed to bind the async udp socket for:" + address.describe(),
102         errno);
103   }
104
105   // success
106   g.dismiss();
107   fd_ = socket;
108   ownership_ = FDOwnership::OWNS;
109
110   // attach to EventHandler
111   EventHandler::changeHandlerFD(fd_);
112
113   if (address.getPort() != 0) {
114     localAddress_ = address;
115   } else {
116     localAddress_.setFromLocalAddress(fd_);
117   }
118 }
119
120 void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) {
121   CHECK_EQ(-1, fd_) << "Already bound to another FD";
122
123   fd_ = fd;
124   ownership_ = ownership;
125
126   EventHandler::changeHandlerFD(fd_);
127   localAddress_.setFromLocalAddress(fd_);
128 }
129
130 ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address,
131                                const std::unique_ptr<folly::IOBuf>& buf) {
132   // UDP's typical MTU size is 1500, so high number of buffers
133   //   really do not make sense. Optimze for buffer chains with
134   //   buffers less than 16, which is the highest I can think of
135   //   for a real use case.
136   iovec vec[16];
137   size_t iovec_len = buf->fillIov(vec, sizeof(vec)/sizeof(vec[0]));
138   if (UNLIKELY(iovec_len == 0)) {
139     buf->coalesce();
140     vec[0].iov_base = const_cast<uint8_t*>(buf->data());
141     vec[0].iov_len = buf->length();
142     iovec_len = 1;
143   }
144
145   return writev(address, vec, iovec_len);
146 }
147
148 ssize_t AsyncUDPSocket::writev(const folly::SocketAddress& address,
149                                const struct iovec* vec, size_t iovec_len) {
150   CHECK_NE(-1, fd_) << "Socket not yet bound";
151
152   sockaddr_storage addrStorage;
153   address.getAddress(&addrStorage);
154
155   struct msghdr msg;
156   msg.msg_name = reinterpret_cast<void*>(&addrStorage);
157   msg.msg_namelen = address.getActualSize();
158   msg.msg_iov = const_cast<struct iovec*>(vec);
159   msg.msg_iovlen = iovec_len;
160   msg.msg_control = nullptr;
161   msg.msg_controllen = 0;
162   msg.msg_flags = 0;
163
164   return ::sendmsg(fd_, &msg, 0);
165 }
166
167 void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
168   CHECK(!readCallback_) << "Another read callback already installed";
169   CHECK_NE(-1, fd_) << "UDP server socket not yet bind to an address";
170
171   readCallback_ = CHECK_NOTNULL(cob);
172   if (!updateRegistration()) {
173     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
174                            "failed to register for accept events");
175
176     readCallback_ = nullptr;
177     cob->onReadError(ex);
178     return;
179   }
180 }
181
182 void AsyncUDPSocket::pauseRead() {
183   // It is ok to pause an already paused socket
184   readCallback_ = nullptr;
185   updateRegistration();
186 }
187
188 void AsyncUDPSocket::close() {
189   DCHECK(eventBase_->isInEventBaseThread());
190
191   if (readCallback_) {
192     auto cob = readCallback_;
193     readCallback_ = nullptr;
194
195     cob->onReadClosed();
196   }
197
198   // Unregister any events we are registered for
199   unregisterHandler();
200
201   if (fd_ != -1 && ownership_ == FDOwnership::OWNS) {
202     ::close(fd_);
203   }
204
205   fd_ = -1;
206 }
207
208 void AsyncUDPSocket::handlerReady(uint16_t events) noexcept {
209   if (events & EventHandler::READ) {
210     DCHECK(readCallback_);
211     handleRead();
212   }
213 }
214
215 void AsyncUDPSocket::handleRead() noexcept {
216   void* buf{nullptr};
217   size_t len{0};
218
219   readCallback_->getReadBuffer(&buf, &len);
220   if (buf == nullptr || len == 0) {
221     AsyncSocketException ex(
222         AsyncSocketException::BAD_ARGS,
223         "AsyncUDPSocket::getReadBuffer() returned empty buffer");
224
225
226     auto cob = readCallback_;
227     readCallback_ = nullptr;
228
229     cob->onReadError(ex);
230     updateRegistration();
231     return;
232   }
233
234   struct sockaddr_storage addrStorage;
235   socklen_t addrLen = sizeof(addrStorage);
236   memset(&addrStorage, 0, addrLen);
237   struct sockaddr* rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
238   rawAddr->sa_family = localAddress_.getFamily();
239
240   ssize_t bytesRead = ::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
241   if (bytesRead >= 0) {
242     clientAddress_.setFromSockaddr(rawAddr, addrLen);
243
244     if (bytesRead > 0) {
245       bool truncated = false;
246       if ((size_t)bytesRead > len) {
247         truncated = true;
248         bytesRead = len;
249       }
250
251       readCallback_->onDataAvailable(clientAddress_, bytesRead, truncated);
252     }
253   } else {
254     if (errno == EAGAIN || errno == EWOULDBLOCK) {
255       // No data could be read without blocking the socket
256       return;
257     }
258
259     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
260                            "::recvfrom() failed",
261                            errno);
262
263     // In case of UDP we can continue reading from the socket
264     // even if the current request fails. We notify the user
265     // so that he can do some logging/stats collection if he wants.
266     auto cob = readCallback_;
267     readCallback_ = nullptr;
268
269     cob->onReadError(ex);
270     updateRegistration();
271   }
272 }
273
274 bool AsyncUDPSocket::updateRegistration() noexcept {
275   uint16_t flags = NONE;
276
277   if (readCallback_) {
278     flags |= READ;
279   }
280
281   return registerHandler(flags | PERSIST);
282 }
283
284 } // Namespace