Copyright 2014->2015
[folly.git] / folly / io / async / AsyncUDPSocket.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncUDPSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20
21 #include <errno.h>
22 #include <unistd.h>
23 #include <fcntl.h>
24
25 // Due to the way kernel headers are included, this may or may not be defined.
26 // Number pulled from 3.10 kernel headers.
27 #ifndef SO_REUSEPORT
28 #define SO_REUSEPORT 15
29 #endif
30
31 namespace folly {
32
33 AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
34     : EventHandler(CHECK_NOTNULL(evb)),
35       eventBase_(evb),
36       fd_(-1),
37       readCallback_(nullptr) {
38   DCHECK(evb->isInEventBaseThread());
39 }
40
41 AsyncUDPSocket::~AsyncUDPSocket() {
42   if (fd_ != -1) {
43     close();
44   }
45 }
46
47 void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
48   int socket = ::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
49   if (socket == -1) {
50     throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
51                               "error creating async udp socket",
52                               errno);
53   }
54
55   auto g = folly::makeGuard([&] { ::close(socket); });
56
57   // put the socket in non-blocking mode
58   int ret = fcntl(socket, F_SETFL, O_NONBLOCK);
59   if (ret != 0) {
60     throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
61                               "failed to put socket in non-blocking mode",
62                               errno);
63   }
64
65   // put the socket in reuse mode
66   int value = 1;
67   if (setsockopt(socket,
68                  SOL_SOCKET,
69                  SO_REUSEADDR,
70                  &value,
71                  sizeof(value)) != 0) {
72     throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
73                               "failed to put socket in reuse mode",
74                               errno);
75   }
76
77   if (reusePort_) {
78     // put the socket in port reuse mode
79     int value = 1;
80     if (setsockopt(socket,
81                    SOL_SOCKET,
82                    SO_REUSEPORT,
83                    &value,
84                    sizeof(value)) != 0) {
85       ::close(socket);
86       throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
87                                 "failed to put socket in reuse_port mode",
88                                 errno);
89
90     }
91   }
92
93   // bind to the address
94   sockaddr_storage addrStorage;
95   address.getAddress(&addrStorage);
96   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
97   if (::bind(socket, saddr, address.getActualSize()) != 0) {
98     throw AsyncSocketException(
99         AsyncSocketException::NOT_OPEN,
100         "failed to bind the async udp socket for:" + address.describe(),
101         errno);
102   }
103
104   // success
105   g.dismiss();
106   fd_ = socket;
107   ownership_ = FDOwnership::OWNS;
108
109   // attach to EventHandler
110   EventHandler::changeHandlerFD(fd_);
111
112   if (address.getPort() != 0) {
113     localAddress_ = address;
114   } else {
115     localAddress_.setFromLocalAddress(fd_);
116   }
117 }
118
119 void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) {
120   CHECK_EQ(-1, fd_) << "Already bound to another FD";
121
122   fd_ = fd;
123   ownership_ = ownership;
124
125   EventHandler::changeHandlerFD(fd_);
126   localAddress_.setFromLocalAddress(fd_);
127 }
128
129 ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address,
130                                const std::unique_ptr<folly::IOBuf>& buf) {
131   CHECK_NE(-1, fd_) << "Socket not yet bound";
132
133   // XXX: Use `sendmsg` instead of coalescing here
134   buf->coalesce();
135
136   sockaddr_storage addrStorage;
137   address.getAddress(&addrStorage);
138   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
139
140   return ::sendto(fd_,
141                   buf->data(),
142                   buf->length(),
143                   MSG_DONTWAIT,
144                   saddr,
145                   address.getActualSize());
146 }
147
148 void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
149   CHECK(!readCallback_) << "Another read callback already installed";
150   CHECK_NE(-1, fd_) << "UDP server socket not yet bind to an address";
151
152   readCallback_ = CHECK_NOTNULL(cob);
153   if (!updateRegistration()) {
154     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
155                            "failed to register for accept events");
156
157     readCallback_ = nullptr;
158     cob->onReadError(ex);
159     return;
160   }
161 }
162
163 void AsyncUDPSocket::pauseRead() {
164   // It is ok to pause an already paused socket
165   readCallback_ = nullptr;
166   updateRegistration();
167 }
168
169 void AsyncUDPSocket::close() {
170   DCHECK(eventBase_->isInEventBaseThread());
171
172   if (readCallback_) {
173     auto cob = readCallback_;
174     readCallback_ = nullptr;
175
176     cob->onReadClosed();
177   }
178
179   // Unregister any events we are registered for
180   unregisterHandler();
181
182   if (fd_ != -1 && ownership_ == FDOwnership::OWNS) {
183     ::close(fd_);
184   }
185
186   fd_ = -1;
187 }
188
189 void AsyncUDPSocket::handlerReady(uint16_t events) noexcept {
190   if (events & EventHandler::READ) {
191     DCHECK(readCallback_);
192     handleRead();
193   }
194 }
195
196 void AsyncUDPSocket::handleRead() noexcept {
197   void* buf{nullptr};
198   size_t len{0};
199
200   readCallback_->getReadBuffer(&buf, &len);
201   if (buf == nullptr || len == 0) {
202     AsyncSocketException ex(
203         AsyncSocketException::BAD_ARGS,
204         "AsyncUDPSocket::getReadBuffer() returned empty buffer");
205
206
207     auto cob = readCallback_;
208     readCallback_ = nullptr;
209
210     cob->onReadError(ex);
211     updateRegistration();
212     return;
213   }
214
215   struct sockaddr_storage addrStorage;
216   socklen_t addrLen = sizeof(addrStorage);
217   memset(&addrStorage, 0, addrLen);
218   struct sockaddr* rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
219   rawAddr->sa_family = localAddress_.getFamily();
220
221   ssize_t bytesRead = ::recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
222   if (bytesRead >= 0) {
223     clientAddress_.setFromSockaddr(rawAddr, addrLen);
224
225     if (bytesRead > 0) {
226       bool truncated = false;
227       if ((size_t)bytesRead > len) {
228         truncated = true;
229         bytesRead = len;
230       }
231
232       readCallback_->onDataAvailable(clientAddress_, bytesRead, truncated);
233     }
234   } else {
235     if (errno == EAGAIN || errno == EWOULDBLOCK) {
236       // No data could be read without blocking the socket
237       return;
238     }
239
240     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
241                            "::recvfrom() failed",
242                            errno);
243
244     // In case of UDP we can continue reading from the socket
245     // even if the current request fails. We notify the user
246     // so that he can do some logging/stats collection if he wants.
247     auto cob = readCallback_;
248     readCallback_ = nullptr;
249
250     cob->onReadError(ex);
251     updateRegistration();
252   }
253 }
254
255 bool AsyncUDPSocket::updateRegistration() noexcept {
256   uint16_t flags = NONE;
257
258   if (readCallback_) {
259     flags |= READ;
260   }
261
262   return registerHandler(flags | PERSIST);
263 }
264
265 } // Namespace