Return if we handle any error messages to avoid unnecessarily calling recv/send
[folly.git] / folly / io / async / AsyncServerSocket.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef __STDC_FORMAT_MACROS
18   #define __STDC_FORMAT_MACROS
19 #endif
20
21 #include <folly/io/async/AsyncServerSocket.h>
22
23 #include <folly/FileUtil.h>
24 #include <folly/Portability.h>
25 #include <folly/SocketAddress.h>
26 #include <folly/String.h>
27 #include <folly/detail/SocketFastOpen.h>
28 #include <folly/io/async/EventBase.h>
29 #include <folly/io/async/NotificationQueue.h>
30 #include <folly/portability/Fcntl.h>
31 #include <folly/portability/Sockets.h>
32 #include <folly/portability/Unistd.h>
33
34 #include <errno.h>
35 #include <string.h>
36 #include <sys/types.h>
37
38 namespace fsp = folly::portability::sockets;
39
40 namespace folly {
41
42 static constexpr bool msgErrQueueSupported =
43 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
44     true;
45 #else
46     false;
47 #endif // FOLLY_HAVE_MSG_ERRQUEUE
48
49 const uint32_t AsyncServerSocket::kDefaultMaxAcceptAtOnce;
50 const uint32_t AsyncServerSocket::kDefaultCallbackAcceptAtOnce;
51 const uint32_t AsyncServerSocket::kDefaultMaxMessagesInQueue;
52
53 int setCloseOnExec(int fd, int value) {
54   // Read the current flags
55   int old_flags = fcntl(fd, F_GETFD, 0);
56
57   // If reading the flags failed, return error indication now
58   if (old_flags < 0) {
59     return -1;
60   }
61
62   // Set just the flag we want to set
63   int new_flags;
64   if (value != 0) {
65     new_flags = old_flags | FD_CLOEXEC;
66   } else {
67     new_flags = old_flags & ~FD_CLOEXEC;
68   }
69
70   // Store modified flag word in the descriptor
71   return fcntl(fd, F_SETFD, new_flags);
72 }
73
74 void AsyncServerSocket::RemoteAcceptor::start(
75   EventBase* eventBase, uint32_t maxAtOnce, uint32_t maxInQueue) {
76   setMaxReadAtOnce(maxAtOnce);
77   queue_.setMaxQueueSize(maxInQueue);
78
79   if (!eventBase->runInEventBaseThread([=](){
80         callback_->acceptStarted();
81         this->startConsuming(eventBase, &queue_);
82       })) {
83     throw std::invalid_argument("unable to start waiting on accept "
84                             "notification queue in the specified "
85                             "EventBase thread");
86   }
87 }
88
89 void AsyncServerSocket::RemoteAcceptor::stop(
90   EventBase* eventBase, AcceptCallback* callback) {
91   if (!eventBase->runInEventBaseThread([=](){
92         callback->acceptStopped();
93         delete this;
94       })) {
95     throw std::invalid_argument("unable to start waiting on accept "
96                             "notification queue in the specified "
97                             "EventBase thread");
98   }
99 }
100
101 void AsyncServerSocket::RemoteAcceptor::messageAvailable(
102     QueueMessage&& msg) noexcept {
103   switch (msg.type) {
104     case MessageType::MSG_NEW_CONN:
105     {
106       if (connectionEventCallback_) {
107         connectionEventCallback_->onConnectionDequeuedByAcceptorCallback(
108             msg.fd, msg.address);
109       }
110       callback_->connectionAccepted(msg.fd, msg.address);
111       break;
112     }
113     case MessageType::MSG_ERROR:
114     {
115       std::runtime_error ex(msg.msg);
116       callback_->acceptError(ex);
117       break;
118     }
119     default:
120     {
121       LOG(ERROR) << "invalid accept notification message type "
122                  << int(msg.type);
123       std::runtime_error ex(
124         "received invalid accept notification message type");
125       callback_->acceptError(ex);
126     }
127   }
128 }
129
130 /*
131  * AsyncServerSocket::BackoffTimeout
132  */
133 class AsyncServerSocket::BackoffTimeout : public AsyncTimeout {
134  public:
135   // Disallow copy, move, and default constructors.
136   BackoffTimeout(BackoffTimeout&&) = delete;
137   explicit BackoffTimeout(AsyncServerSocket* socket)
138       : AsyncTimeout(socket->getEventBase()), socket_(socket) {}
139
140   void timeoutExpired() noexcept override { socket_->backoffTimeoutExpired(); }
141
142  private:
143   AsyncServerSocket* socket_;
144 };
145
146 /*
147  * AsyncServerSocket methods
148  */
149
150 AsyncServerSocket::AsyncServerSocket(EventBase* eventBase)
151     : eventBase_(eventBase),
152       accepting_(false),
153       maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce),
154       maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue),
155       acceptRateAdjustSpeed_(0),
156       acceptRate_(1),
157       lastAccepTimestamp_(std::chrono::steady_clock::now()),
158       numDroppedConnections_(0),
159       callbackIndex_(0),
160       backoffTimeout_(nullptr),
161       callbacks_(),
162       keepAliveEnabled_(true),
163       closeOnExec_(true) {}
164
165 void AsyncServerSocket::setShutdownSocketSet(
166     const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
167   const auto newSS = wNewSS.lock();
168   const auto shutdownSocketSet = wShutdownSocketSet_.lock();
169
170   if (shutdownSocketSet == newSS) {
171     return;
172   }
173
174   if (shutdownSocketSet) {
175     for (auto& h : sockets_) {
176       shutdownSocketSet->remove(h.socket_);
177     }
178   }
179
180   if (newSS) {
181     for (auto& h : sockets_) {
182       newSS->add(h.socket_);
183     }
184   }
185
186   wShutdownSocketSet_ = wNewSS;
187 }
188
189 AsyncServerSocket::~AsyncServerSocket() {
190   assert(callbacks_.empty());
191 }
192
193 int AsyncServerSocket::stopAccepting(int shutdownFlags) {
194   int result = 0;
195   for (auto& handler : sockets_) {
196     VLOG(10) << "AsyncServerSocket::stopAccepting " << this <<
197               handler.socket_;
198   }
199   if (eventBase_) {
200     eventBase_->dcheckIsInEventBaseThread();
201   }
202
203   // When destroy is called, unregister and close the socket immediately.
204   accepting_ = false;
205
206   // Close the sockets in reverse order as they were opened to avoid
207   // the condition where another process concurrently tries to open
208   // the same port, succeed to bind the first socket but fails on the
209   // second because it hasn't been closed yet.
210   for (; !sockets_.empty(); sockets_.pop_back()) {
211     auto& handler = sockets_.back();
212     handler.unregisterHandler();
213     if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
214       shutdownSocketSet->close(handler.socket_);
215     } else if (shutdownFlags >= 0) {
216       result = shutdownNoInt(handler.socket_, shutdownFlags);
217       pendingCloseSockets_.push_back(handler.socket_);
218     } else {
219       closeNoInt(handler.socket_);
220     }
221   }
222
223   // Destroy the backoff timout.  This will cancel it if it is running.
224   delete backoffTimeout_;
225   backoffTimeout_ = nullptr;
226
227   // Close all of the callback queues to notify them that they are being
228   // destroyed.  No one should access the AsyncServerSocket any more once
229   // destroy() is called.  However, clear out callbacks_ before invoking the
230   // accept callbacks just in case.  This will potentially help us detect the
231   // bug if one of the callbacks calls addAcceptCallback() or
232   // removeAcceptCallback().
233   std::vector<CallbackInfo> callbacksCopy;
234   callbacks_.swap(callbacksCopy);
235   for (std::vector<CallbackInfo>::iterator it = callbacksCopy.begin();
236        it != callbacksCopy.end();
237        ++it) {
238     // consumer may not be set if we are running in primary event base
239     if (it->consumer) {
240       DCHECK(it->eventBase);
241       it->consumer->stop(it->eventBase, it->callback);
242     } else {
243       DCHECK(it->callback);
244       it->callback->acceptStopped();
245     }
246   }
247
248   return result;
249 }
250
251 void AsyncServerSocket::destroy() {
252   stopAccepting();
253   for (auto s : pendingCloseSockets_) {
254     closeNoInt(s);
255   }
256   // Then call DelayedDestruction::destroy() to take care of
257   // whether or not we need immediate or delayed destruction
258   DelayedDestruction::destroy();
259 }
260
261 void AsyncServerSocket::attachEventBase(EventBase *eventBase) {
262   assert(eventBase_ == nullptr);
263   eventBase->dcheckIsInEventBaseThread();
264
265   eventBase_ = eventBase;
266   for (auto& handler : sockets_) {
267     handler.attachEventBase(eventBase);
268   }
269 }
270
271 void AsyncServerSocket::detachEventBase() {
272   assert(eventBase_ != nullptr);
273   eventBase_->dcheckIsInEventBaseThread();
274   assert(!accepting_);
275
276   eventBase_ = nullptr;
277   for (auto& handler : sockets_) {
278     handler.detachEventBase();
279   }
280 }
281
282 void AsyncServerSocket::useExistingSockets(const std::vector<int>& fds) {
283   if (eventBase_) {
284     eventBase_->dcheckIsInEventBaseThread();
285   }
286
287   if (sockets_.size() > 0) {
288     throw std::invalid_argument(
289                               "cannot call useExistingSocket() on a "
290                               "AsyncServerSocket that already has a socket");
291   }
292
293   for (auto fd: fds) {
294     // Set addressFamily_ from this socket.
295     // Note that the socket may not have been bound yet, but
296     // setFromLocalAddress() will still work and get the correct address family.
297     // We will update addressFamily_ again anyway if bind() is called later.
298     SocketAddress address;
299     address.setFromLocalAddress(fd);
300
301 #if __linux__
302     if (noTransparentTls_) {
303       // Ignore return value, errors are ok
304       setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
305     }
306 #endif
307
308     setupSocket(fd, address.getFamily());
309     sockets_.emplace_back(eventBase_, fd, this, address.getFamily());
310     sockets_.back().changeHandlerFD(fd);
311   }
312 }
313
314 void AsyncServerSocket::useExistingSocket(int fd) {
315   useExistingSockets({fd});
316 }
317
318 void AsyncServerSocket::bindSocket(
319     int fd,
320     const SocketAddress& address,
321     bool isExistingSocket) {
322   sockaddr_storage addrStorage;
323   address.getAddress(&addrStorage);
324   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
325
326   if (fsp::bind(fd, saddr, address.getActualSize()) != 0) {
327     if (!isExistingSocket) {
328       closeNoInt(fd);
329     }
330     folly::throwSystemError(errno,
331         "failed to bind to async server socket: " +
332         address.describe());
333   }
334
335 #if __linux__
336   if (noTransparentTls_) {
337     // Ignore return value, errors are ok
338     setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
339   }
340 #endif
341
342   // If we just created this socket, update the EventHandler and set socket_
343   if (!isExistingSocket) {
344     sockets_.emplace_back(eventBase_, fd, this, address.getFamily());
345   }
346 }
347
348 bool AsyncServerSocket::setZeroCopy(bool enable) {
349   if (msgErrQueueSupported) {
350     int fd = getSocket();
351     int val = enable ? 1 : 0;
352     int ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
353
354     return (0 == ret);
355   }
356
357   return false;
358 }
359
360 void AsyncServerSocket::bind(const SocketAddress& address) {
361   if (eventBase_) {
362     eventBase_->dcheckIsInEventBaseThread();
363   }
364
365   // useExistingSocket() may have been called to initialize socket_ already.
366   // However, in the normal case we need to create a new socket now.
367   // Don't set socket_ yet, so that socket_ will remain uninitialized if an
368   // error occurs.
369   int fd;
370   if (sockets_.size() == 0) {
371     fd = createSocket(address.getFamily());
372   } else if (sockets_.size() == 1) {
373     if (address.getFamily() != sockets_[0].addressFamily_) {
374       throw std::invalid_argument(
375                                 "Attempted to bind address to socket with "
376                                 "different address family");
377     }
378     fd = sockets_[0].socket_;
379   } else {
380     throw std::invalid_argument(
381                               "Attempted to bind to multiple fds");
382   }
383
384   bindSocket(fd, address, !sockets_.empty());
385 }
386
387 void AsyncServerSocket::bind(
388     const std::vector<IPAddress>& ipAddresses,
389     uint16_t port) {
390   if (ipAddresses.empty()) {
391     throw std::invalid_argument("No ip addresses were provided");
392   }
393   if (!sockets_.empty()) {
394     throw std::invalid_argument("Cannot call bind on a AsyncServerSocket "
395                                 "that already has a socket.");
396   }
397
398   for (const IPAddress& ipAddress : ipAddresses) {
399     SocketAddress address(ipAddress.toFullyQualified(), port);
400     int fd = createSocket(address.getFamily());
401
402     bindSocket(fd, address, false);
403   }
404   if (sockets_.size() == 0) {
405     throw std::runtime_error(
406         "did not bind any async server socket for port and addresses");
407   }
408 }
409
410 void AsyncServerSocket::bind(uint16_t port) {
411   struct addrinfo hints, *res0;
412   char sport[sizeof("65536")];
413
414   memset(&hints, 0, sizeof(hints));
415   hints.ai_family = AF_UNSPEC;
416   hints.ai_socktype = SOCK_STREAM;
417   hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;
418   snprintf(sport, sizeof(sport), "%u", port);
419
420   // On Windows the value we need to pass to bind to all available
421   // addresses is an empty string. Everywhere else, it's nullptr.
422   constexpr const char* kWildcardNode = kIsWindows ? "" : nullptr;
423   if (getaddrinfo(kWildcardNode, sport, &hints, &res0)) {
424     throw std::invalid_argument(
425                               "Attempted to bind address to socket with "
426                               "bad getaddrinfo");
427   }
428
429   SCOPE_EXIT { freeaddrinfo(res0); };
430
431   auto setupAddress = [&] (struct addrinfo* res) {
432     int s = fsp::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
433     // IPv6/IPv4 may not be supported by the kernel
434     if (s < 0 && errno == EAFNOSUPPORT) {
435       return;
436     }
437     CHECK_GE(s, 0);
438
439     try {
440       setupSocket(s, res->ai_family);
441     } catch (...) {
442       closeNoInt(s);
443       throw;
444     }
445
446     if (res->ai_family == AF_INET6) {
447       int v6only = 1;
448       CHECK(0 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY,
449                             &v6only, sizeof(v6only)));
450     }
451
452     // Bind to the socket
453     if (fsp::bind(s, res->ai_addr, socklen_t(res->ai_addrlen)) != 0) {
454       folly::throwSystemError(
455           errno,
456           "failed to bind to async server socket for port ",
457           SocketAddress::getPortFrom(res->ai_addr),
458           " family ",
459           SocketAddress::getFamilyNameFrom(res->ai_addr, "<unknown>"));
460     }
461
462 #if __linux__
463     if (noTransparentTls_) {
464       // Ignore return value, errors are ok
465       setsockopt(s, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
466     }
467 #endif
468
469     SocketAddress address;
470     address.setFromLocalAddress(s);
471
472     sockets_.emplace_back(eventBase_, s, this, address.getFamily());
473   };
474
475   const int kNumTries = 25;
476   for (int tries = 1; true; tries++) {
477     // Prefer AF_INET6 addresses. RFC 3484 mandates that getaddrinfo
478     // should return IPv6 first and then IPv4 addresses, but glibc's
479     // getaddrinfo(nullptr) with AI_PASSIVE returns:
480     // - 0.0.0.0 (IPv4-only)
481     // - :: (IPv6+IPv4) in this order
482     // See: https://sourceware.org/bugzilla/show_bug.cgi?id=9981
483     for (struct addrinfo* res = res0; res; res = res->ai_next) {
484       if (res->ai_family == AF_INET6) {
485         setupAddress(res);
486       }
487     }
488
489     // If port == 0, then we should try to bind to the same port on ipv4 and
490     // ipv6.  So if we did bind to ipv6, figure out that port and use it.
491     if (sockets_.size() == 1 && port == 0) {
492       SocketAddress address;
493       address.setFromLocalAddress(sockets_.back().socket_);
494       snprintf(sport, sizeof(sport), "%u", address.getPort());
495       freeaddrinfo(res0);
496       CHECK_EQ(0, getaddrinfo(nullptr, sport, &hints, &res0));
497     }
498
499     try {
500       for (struct addrinfo* res = res0; res; res = res->ai_next) {
501         if (res->ai_family != AF_INET6) {
502           setupAddress(res);
503         }
504       }
505     } catch (const std::system_error&) {
506       // If we can't bind to the same port on ipv4 as ipv6 when using
507       // port=0 then we will retry again before giving up after
508       // kNumTries attempts.  We do this by closing the sockets that
509       // were opened, then restarting from scratch.
510       if (port == 0 && !sockets_.empty() && tries != kNumTries) {
511         for (const auto& socket : sockets_) {
512           if (socket.socket_ <= 0) {
513             continue;
514           } else if (
515               const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
516             shutdownSocketSet->close(socket.socket_);
517           } else {
518             closeNoInt(socket.socket_);
519           }
520         }
521         sockets_.clear();
522         snprintf(sport, sizeof(sport), "%u", port);
523         freeaddrinfo(res0);
524         CHECK_EQ(0, getaddrinfo(nullptr, sport, &hints, &res0));
525         continue;
526       }
527
528       throw;
529     }
530
531     break;
532   }
533
534   if (sockets_.size() == 0) {
535     throw std::runtime_error(
536         "did not bind any async server socket for port");
537   }
538 }
539
540 void AsyncServerSocket::listen(int backlog) {
541   if (eventBase_) {
542     eventBase_->dcheckIsInEventBaseThread();
543   }
544
545   // Start listening
546   for (auto& handler : sockets_) {
547     if (fsp::listen(handler.socket_, backlog) == -1) {
548       folly::throwSystemError(errno,
549                                     "failed to listen on async server socket");
550     }
551   }
552 }
553
554 void AsyncServerSocket::getAddress(SocketAddress* addressReturn) const {
555   CHECK(sockets_.size() >= 1);
556   VLOG_IF(2, sockets_.size() > 1)
557     << "Warning: getAddress() called and multiple addresses available ("
558     << sockets_.size() << "). Returning only the first one.";
559
560   addressReturn->setFromLocalAddress(sockets_[0].socket_);
561 }
562
563 std::vector<SocketAddress> AsyncServerSocket::getAddresses()
564     const {
565   CHECK(sockets_.size() >= 1);
566   auto tsaVec = std::vector<SocketAddress>(sockets_.size());
567   auto tsaIter = tsaVec.begin();
568   for (const auto& socket : sockets_) {
569     (tsaIter++)->setFromLocalAddress(socket.socket_);
570   };
571   return tsaVec;
572 }
573
574 void AsyncServerSocket::addAcceptCallback(AcceptCallback *callback,
575                                            EventBase *eventBase,
576                                            uint32_t maxAtOnce) {
577   if (eventBase_) {
578     eventBase_->dcheckIsInEventBaseThread();
579   }
580
581   // If this is the first accept callback and we are supposed to be accepting,
582   // start accepting once the callback is installed.
583   bool runStartAccepting = accepting_ && callbacks_.empty();
584
585   callbacks_.emplace_back(callback, eventBase);
586
587   SCOPE_SUCCESS {
588     // If this is the first accept callback and we are supposed to be accepting,
589     // start accepting.
590     if (runStartAccepting) {
591       startAccepting();
592     }
593   };
594
595   if (!eventBase) {
596     // Run in AsyncServerSocket's eventbase; notify that we are
597     // starting to accept connections
598     callback->acceptStarted();
599     return;
600   }
601
602   // Start the remote acceptor.
603   //
604   // It would be nice if we could avoid starting the remote acceptor if
605   // eventBase == eventBase_.  However, that would cause issues if
606   // detachEventBase() and attachEventBase() were ever used to change the
607   // primary EventBase for the server socket.  Therefore we require the caller
608   // to specify a nullptr EventBase if they want to ensure that the callback is
609   // always invoked in the primary EventBase, and to be able to invoke that
610   // callback more efficiently without having to use a notification queue.
611   RemoteAcceptor* acceptor = nullptr;
612   try {
613     acceptor = new RemoteAcceptor(callback, connectionEventCallback_);
614     acceptor->start(eventBase, maxAtOnce, maxNumMsgsInQueue_);
615   } catch (...) {
616     callbacks_.pop_back();
617     delete acceptor;
618     throw;
619   }
620   callbacks_.back().consumer = acceptor;
621 }
622
623 void AsyncServerSocket::removeAcceptCallback(AcceptCallback *callback,
624                                               EventBase *eventBase) {
625   if (eventBase_) {
626     eventBase_->dcheckIsInEventBaseThread();
627   }
628
629   // Find the matching AcceptCallback.
630   // We just do a simple linear search; we don't expect removeAcceptCallback()
631   // to be called frequently, and we expect there to only be a small number of
632   // callbacks anyway.
633   std::vector<CallbackInfo>::iterator it = callbacks_.begin();
634   uint32_t n = 0;
635   while (true) {
636     if (it == callbacks_.end()) {
637       throw std::runtime_error("AsyncServerSocket::removeAcceptCallback(): "
638                               "accept callback not found");
639     }
640     if (it->callback == callback &&
641         (it->eventBase == eventBase || eventBase == nullptr)) {
642       break;
643     }
644     ++it;
645     ++n;
646   }
647
648   // Remove this callback from callbacks_.
649   //
650   // Do this before invoking the acceptStopped() callback, in case
651   // acceptStopped() invokes one of our methods that examines callbacks_.
652   //
653   // Save a copy of the CallbackInfo first.
654   CallbackInfo info(*it);
655   callbacks_.erase(it);
656   if (n < callbackIndex_) {
657     // We removed an element before callbackIndex_.  Move callbackIndex_ back
658     // one step, since things after n have been shifted back by 1.
659     --callbackIndex_;
660   } else {
661     // We removed something at or after callbackIndex_.
662     // If we removed the last element and callbackIndex_ was pointing at it,
663     // we need to reset callbackIndex_ to 0.
664     if (callbackIndex_ >= callbacks_.size()) {
665       callbackIndex_ = 0;
666     }
667   }
668
669   if (info.consumer) {
670     // consumer could be nullptr is we run callbacks in primary event
671     // base
672     DCHECK(info.eventBase);
673     info.consumer->stop(info.eventBase, info.callback);
674   } else {
675     // callback invoked in the primary event base, just call directly
676     DCHECK(info.callback);
677     callback->acceptStopped();
678   }
679
680   // If we are supposed to be accepting but the last accept callback
681   // was removed, unregister for events until a callback is added.
682   if (accepting_ && callbacks_.empty()) {
683     for (auto& handler : sockets_) {
684       handler.unregisterHandler();
685     }
686   }
687 }
688
689 void AsyncServerSocket::startAccepting() {
690   if (eventBase_) {
691     eventBase_->dcheckIsInEventBaseThread();
692   }
693
694   accepting_ = true;
695   if (callbacks_.empty()) {
696     // We can't actually begin accepting if no callbacks are defined.
697     // Wait until a callback is added to start accepting.
698     return;
699   }
700
701   for (auto& handler : sockets_) {
702     if (!handler.registerHandler(
703           EventHandler::READ | EventHandler::PERSIST)) {
704       throw std::runtime_error("failed to register for accept events");
705     }
706   }
707 }
708
709 void AsyncServerSocket::pauseAccepting() {
710   if (eventBase_) {
711     eventBase_->dcheckIsInEventBaseThread();
712   }
713   accepting_ = false;
714   for (auto& handler : sockets_) {
715    handler. unregisterHandler();
716   }
717
718   // If we were in the accept backoff state, disable the backoff timeout
719   if (backoffTimeout_) {
720     backoffTimeout_->cancelTimeout();
721   }
722 }
723
724 int AsyncServerSocket::createSocket(int family) {
725   int fd = fsp::socket(family, SOCK_STREAM, 0);
726   if (fd == -1) {
727     folly::throwSystemError(errno, "error creating async server socket");
728   }
729
730   try {
731     setupSocket(fd, family);
732   } catch (...) {
733     closeNoInt(fd);
734     throw;
735   }
736   return fd;
737 }
738
739 void AsyncServerSocket::setupSocket(int fd, int family) {
740   // Put the socket in non-blocking mode
741   if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) {
742     folly::throwSystemError(errno,
743                             "failed to put socket in non-blocking mode");
744   }
745
746   // Set reuseaddr to avoid 2MSL delay on server restart
747   int one = 1;
748   if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) != 0) {
749     // This isn't a fatal error; just log an error message and continue
750     LOG(ERROR) << "failed to set SO_REUSEADDR on async server socket " << errno;
751   }
752
753   // Set reuseport to support multiple accept threads
754   int zero = 0;
755   if (reusePortEnabled_ &&
756       setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(int)) != 0) {
757     LOG(ERROR) << "failed to set SO_REUSEPORT on async server socket "
758                << strerror(errno);
759 #ifdef WIN32
760     folly::throwSystemError(errno, "failed to bind to the async server socket");
761 #else
762     SocketAddress address;
763     address.setFromLocalAddress(fd);
764     folly::throwSystemError(errno,
765                             "failed to bind to async server socket: " +
766                             address.describe());
767 #endif
768   }
769
770   // Set keepalive as desired
771   if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE,
772                  (keepAliveEnabled_) ? &one : &zero, sizeof(int)) != 0) {
773     LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: " <<
774             strerror(errno);
775   }
776
777   // Setup FD_CLOEXEC flag
778   if (closeOnExec_ &&
779       (-1 == folly::setCloseOnExec(fd, closeOnExec_))) {
780     LOG(ERROR) << "failed to set FD_CLOEXEC on async server socket: " <<
781             strerror(errno);
782   }
783
784   // Set TCP nodelay if available, MAC OS X Hack
785   // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
786 #ifndef TCP_NOPUSH
787   if (family != AF_UNIX) {
788     if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) != 0) {
789       // This isn't a fatal error; just log an error message and continue
790       LOG(ERROR) << "failed to set TCP_NODELAY on async server socket: " <<
791               strerror(errno);
792     }
793   }
794 #endif
795
796 #if FOLLY_ALLOW_TFO
797   if (tfo_ && detail::tfo_enable(fd, tfoMaxQueueSize_) != 0) {
798     // This isn't a fatal error; just log an error message and continue
799     LOG(WARNING) << "failed to set TCP_FASTOPEN on async server socket: "
800                  << folly::errnoStr(errno);
801   }
802 #endif
803
804   if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
805     shutdownSocketSet->add(fd);
806   }
807 }
808
809 void AsyncServerSocket::handlerReady(uint16_t /* events */,
810                                      int fd,
811                                      sa_family_t addressFamily) noexcept {
812   assert(!callbacks_.empty());
813   DestructorGuard dg(this);
814
815   // Only accept up to maxAcceptAtOnce_ connections at a time,
816   // to avoid starving other I/O handlers using this EventBase.
817   for (uint32_t n = 0; n < maxAcceptAtOnce_; ++n) {
818     SocketAddress address;
819
820     sockaddr_storage addrStorage;
821     socklen_t addrLen = sizeof(addrStorage);
822     sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
823
824     // In some cases, accept() doesn't seem to update these correctly.
825     saddr->sa_family = addressFamily;
826     if (addressFamily == AF_UNIX) {
827       addrLen = sizeof(struct sockaddr_un);
828     }
829
830     // Accept a new client socket
831 #ifdef SOCK_NONBLOCK
832     int clientSocket = accept4(fd, saddr, &addrLen, SOCK_NONBLOCK);
833 #else
834     int clientSocket = accept(fd, saddr, &addrLen);
835 #endif
836
837     address.setFromSockaddr(saddr, addrLen);
838
839     if (clientSocket >= 0 && connectionEventCallback_) {
840       connectionEventCallback_->onConnectionAccepted(clientSocket, address);
841     }
842
843     std::chrono::time_point<std::chrono::steady_clock> nowMs =
844       std::chrono::steady_clock::now();
845     auto timeSinceLastAccept = std::max<int64_t>(
846       0,
847       nowMs.time_since_epoch().count() -
848       lastAccepTimestamp_.time_since_epoch().count());
849     lastAccepTimestamp_ = nowMs;
850     if (acceptRate_ < 1) {
851       acceptRate_ *= 1 + acceptRateAdjustSpeed_ * timeSinceLastAccept;
852       if (acceptRate_ >= 1) {
853         acceptRate_ = 1;
854       } else if (rand() > acceptRate_ * RAND_MAX) {
855         ++numDroppedConnections_;
856         if (clientSocket >= 0) {
857           closeNoInt(clientSocket);
858           if (connectionEventCallback_) {
859             connectionEventCallback_->onConnectionDropped(clientSocket,
860                                                           address);
861           }
862         }
863         continue;
864       }
865     }
866
867     if (clientSocket < 0) {
868       if (errno == EAGAIN) {
869         // No more sockets to accept right now.
870         // Check for this code first, since it's the most common.
871         return;
872       } else if (errno == EMFILE || errno == ENFILE) {
873         // We're out of file descriptors.  Perhaps we're accepting connections
874         // too quickly. Pause accepting briefly to back off and give the server
875         // a chance to recover.
876         LOG(ERROR) << "accept failed: out of file descriptors; entering accept "
877                 "back-off state";
878         enterBackoff();
879
880         // Dispatch the error message
881         dispatchError("accept() failed", errno);
882       } else {
883         dispatchError("accept() failed", errno);
884       }
885       if (connectionEventCallback_) {
886         connectionEventCallback_->onConnectionAcceptError(errno);
887       }
888       return;
889     }
890
891 #ifndef SOCK_NONBLOCK
892     // Explicitly set the new connection to non-blocking mode
893     if (fcntl(clientSocket, F_SETFL, O_NONBLOCK) != 0) {
894       closeNoInt(clientSocket);
895       dispatchError("failed to set accepted socket to non-blocking mode",
896                     errno);
897       if (connectionEventCallback_) {
898         connectionEventCallback_->onConnectionDropped(clientSocket, address);
899       }
900       return;
901     }
902 #endif
903
904     // Inform the callback about the new connection
905     dispatchSocket(clientSocket, std::move(address));
906
907     // If we aren't accepting any more, break out of the loop
908     if (!accepting_ || callbacks_.empty()) {
909       break;
910     }
911   }
912 }
913
914 void AsyncServerSocket::dispatchSocket(int socket,
915                                         SocketAddress&& address) {
916   uint32_t startingIndex = callbackIndex_;
917
918   // Short circuit if the callback is in the primary EventBase thread
919
920   CallbackInfo *info = nextCallback();
921   if (info->eventBase == nullptr) {
922     info->callback->connectionAccepted(socket, address);
923     return;
924   }
925
926   const SocketAddress addr(address);
927   // Create a message to send over the notification queue
928   QueueMessage msg;
929   msg.type = MessageType::MSG_NEW_CONN;
930   msg.address = std::move(address);
931   msg.fd = socket;
932
933   // Loop until we find a free queue to write to
934   while (true) {
935     if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
936       if (connectionEventCallback_) {
937         connectionEventCallback_->onConnectionEnqueuedForAcceptorCallback(
938             socket,
939             addr);
940       }
941       // Success! return.
942       return;
943     }
944
945     // We couldn't add to queue.  Fall through to below
946
947     ++numDroppedConnections_;
948     if (acceptRateAdjustSpeed_ > 0) {
949       // aggressively decrease accept rate when in trouble
950       static const double kAcceptRateDecreaseSpeed = 0.1;
951       acceptRate_ *= 1 - kAcceptRateDecreaseSpeed;
952     }
953
954
955     if (callbackIndex_ == startingIndex) {
956       // The notification queue was full
957       // We can't really do anything at this point other than close the socket.
958       //
959       // This should only happen if a user's service is behaving extremely
960       // badly and none of the EventBase threads are looping fast enough to
961       // process the incoming connections.  If the service is overloaded, it
962       // should use pauseAccepting() to temporarily back off accepting new
963       // connections, before they reach the point where their threads can't
964       // even accept new messages.
965       LOG_EVERY_N(ERROR, 100) << "failed to dispatch newly accepted socket:"
966                               << " all accept callback queues are full";
967       closeNoInt(socket);
968       if (connectionEventCallback_) {
969         connectionEventCallback_->onConnectionDropped(socket, addr);
970       }
971       return;
972     }
973
974     info = nextCallback();
975   }
976 }
977
978 void AsyncServerSocket::dispatchError(const char *msgstr, int errnoValue) {
979   uint32_t startingIndex = callbackIndex_;
980   CallbackInfo *info = nextCallback();
981
982   // Create a message to send over the notification queue
983   QueueMessage msg;
984   msg.type = MessageType::MSG_ERROR;
985   msg.err = errnoValue;
986   msg.msg = std::move(msgstr);
987
988   while (true) {
989     // Short circuit if the callback is in the primary EventBase thread
990     if (info->eventBase == nullptr) {
991       std::runtime_error ex(
992         std::string(msgstr) +  folly::to<std::string>(errnoValue));
993       info->callback->acceptError(ex);
994       return;
995     }
996
997     if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
998       return;
999     }
1000     // Fall through and try another callback
1001
1002     if (callbackIndex_ == startingIndex) {
1003       // The notification queues for all of the callbacks were full.
1004       // We can't really do anything at this point.
1005       LOG_EVERY_N(ERROR, 100)
1006           << "failed to dispatch accept error: all accept"
1007           << " callback queues are full: error msg:  " << msg.msg << ": "
1008           << errnoValue;
1009       return;
1010     }
1011     info = nextCallback();
1012   }
1013 }
1014
1015 void AsyncServerSocket::enterBackoff() {
1016   // If this is the first time we have entered the backoff state,
1017   // allocate backoffTimeout_.
1018   if (backoffTimeout_ == nullptr) {
1019     try {
1020       backoffTimeout_ = new BackoffTimeout(this);
1021     } catch (const std::bad_alloc&) {
1022       // Man, we couldn't even allocate the timer to re-enable accepts.
1023       // We must be in pretty bad shape.  Don't pause accepting for now,
1024       // since we won't be able to re-enable ourselves later.
1025       LOG(ERROR) << "failed to allocate AsyncServerSocket backoff"
1026                  << " timer; unable to temporarly pause accepting";
1027       if (connectionEventCallback_) {
1028         connectionEventCallback_->onBackoffError();
1029       }
1030       return;
1031     }
1032   }
1033
1034   // For now, we simply pause accepting for 1 second.
1035   //
1036   // We could add some smarter backoff calculation here in the future.  (e.g.,
1037   // start sleeping for longer if we keep hitting the backoff frequently.)
1038   // Typically the user needs to figure out why the server is overloaded and
1039   // fix it in some other way, though.  The backoff timer is just a simple
1040   // mechanism to try and give the connection processing code a little bit of
1041   // breathing room to catch up, and to avoid just spinning and failing to
1042   // accept over and over again.
1043   const uint32_t timeoutMS = 1000;
1044   if (!backoffTimeout_->scheduleTimeout(timeoutMS)) {
1045     LOG(ERROR) << "failed to schedule AsyncServerSocket backoff timer;"
1046                << "unable to temporarly pause accepting";
1047     if (connectionEventCallback_) {
1048       connectionEventCallback_->onBackoffError();
1049     }
1050     return;
1051   }
1052
1053   // The backoff timer is scheduled to re-enable accepts.
1054   // Go ahead and disable accepts for now.  We leave accepting_ set to true,
1055   // since that tracks the desired state requested by the user.
1056   for (auto& handler : sockets_) {
1057     handler.unregisterHandler();
1058   }
1059   if (connectionEventCallback_) {
1060     connectionEventCallback_->onBackoffStarted();
1061   }
1062 }
1063
1064 void AsyncServerSocket::backoffTimeoutExpired() {
1065   // accepting_ should still be true.
1066   // If pauseAccepting() was called while in the backoff state it will cancel
1067   // the backoff timeout.
1068   assert(accepting_);
1069   // We can't be detached from the EventBase without being paused
1070   assert(eventBase_ != nullptr);
1071   eventBase_->dcheckIsInEventBaseThread();
1072
1073   // If all of the callbacks were removed, we shouldn't re-enable accepts
1074   if (callbacks_.empty()) {
1075     if (connectionEventCallback_) {
1076       connectionEventCallback_->onBackoffEnded();
1077     }
1078     return;
1079   }
1080
1081   // Register the handler.
1082   for (auto& handler : sockets_) {
1083     if (!handler.registerHandler(
1084           EventHandler::READ | EventHandler::PERSIST)) {
1085       // We're hosed.  We could just re-schedule backoffTimeout_ to
1086       // re-try again after a little bit.  However, we don't want to
1087       // loop retrying forever if we can't re-enable accepts.  Just
1088       // abort the entire program in this state; things are really bad
1089       // and restarting the entire server is probably the best remedy.
1090       LOG(ERROR)
1091         << "failed to re-enable AsyncServerSocket accepts after backoff; "
1092         << "crashing now";
1093       abort();
1094     }
1095   }
1096   if (connectionEventCallback_) {
1097     connectionEventCallback_->onBackoffEnded();
1098   }
1099 }
1100
1101 } // namespace folly