9214d231fc7fa18c0d3246a2e5f2950b810fc7d9
[folly.git] / folly / io / async / AsyncServerSocket.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/SocketAddress.h>
20 #include <folly/io/ShutdownSocketSet.h>
21 #include <folly/io/async/AsyncSocketBase.h>
22 #include <folly/io/async/AsyncTimeout.h>
23 #include <folly/io/async/DelayedDestruction.h>
24 #include <folly/io/async/EventBase.h>
25 #include <folly/io/async/EventHandler.h>
26 #include <folly/io/async/NotificationQueue.h>
27 #include <limits.h>
28 #include <stddef.h>
29 #include <sys/socket.h>
30 #include <exception>
31 #include <memory>
32 #include <vector>
33
34 // Due to the way kernel headers are included, this may or may not be defined.
35 // Number pulled from 3.10 kernel headers.
36 #ifndef SO_REUSEPORT
37 #define SO_REUSEPORT 15
38 #endif
39
40 namespace folly {
41
42 /**
43  * A listening socket that asynchronously informs a callback whenever a new
44  * connection has been accepted.
45  *
46  * Unlike most async interfaces that always invoke their callback in the same
47  * EventBase thread, AsyncServerSocket is unusual in that it can distribute
48  * the callbacks across multiple EventBase threads.
49  *
50  * This supports a common use case for network servers to distribute incoming
51  * connections across a number of EventBase threads.  (Servers typically run
52  * with one EventBase thread per CPU.)
53  *
54  * Despite being able to invoke callbacks in multiple EventBase threads,
55  * AsyncServerSocket still has one "primary" EventBase.  Operations that
56  * modify the AsyncServerSocket state may only be performed from the primary
57  * EventBase thread.
58  */
59 class AsyncServerSocket : public DelayedDestruction
60                         , public AsyncSocketBase {
61  public:
62   typedef std::unique_ptr<AsyncServerSocket, Destructor> UniquePtr;
63   // Disallow copy, move, and default construction.
64   AsyncServerSocket(AsyncServerSocket&&) = delete;
65
66   /**
67    * A callback interface to get notified of client socket events.
68    *
69    * The ConnectionEventCallback implementations need to be thread-safe as the
70    * callbacks may be called from different threads.
71    */
72   class ConnectionEventCallback {
73    public:
74     virtual ~ConnectionEventCallback() = default;
75
76     /**
77      * onConnectionAccepted() is called right after a client connection
78      * is accepted using the system accept()/accept4() APIs.
79      */
80     virtual void onConnectionAccepted(const int socket,
81                                       const SocketAddress& addr) noexcept = 0;
82
83     /**
84      * onConnectionAcceptError() is called when an error occurred accepting
85      * a connection.
86      */
87     virtual void onConnectionAcceptError(const int err) noexcept = 0;
88
89     /**
90      * onConnectionDropped() is called when a connection is dropped,
91      * probably because of some error encountered.
92      */
93     virtual void onConnectionDropped(const int socket,
94                                      const SocketAddress& addr) noexcept = 0;
95
96     /**
97      * onConnectionEnqueuedForAcceptorCallback() is called when the
98      * connection is successfully enqueued for an AcceptCallback to pick up.
99      */
100     virtual void onConnectionEnqueuedForAcceptorCallback(
101         const int socket,
102         const SocketAddress& addr) noexcept = 0;
103
104     /**
105      * onConnectionDequeuedByAcceptorCallback() is called when the
106      * connection is successfully dequeued by an AcceptCallback.
107      */
108     virtual void onConnectionDequeuedByAcceptorCallback(
109         const int socket,
110         const SocketAddress& addr) noexcept = 0;
111
112     /**
113      * onBackoffStarted is called when the socket has successfully started
114      * backing off accepting new client sockets.
115      */
116     virtual void onBackoffStarted() noexcept = 0;
117
118     /**
119      * onBackoffEnded is called when the backoff period has ended and the socket
120      * has successfully resumed accepting new connections if there is any
121      * AcceptCallback registered.
122      */
123     virtual void onBackoffEnded() noexcept = 0;
124
125     /**
126      * onBackoffError is called when there is an error entering backoff
127      */
128     virtual void onBackoffError() noexcept = 0;
129   };
130
131   class AcceptCallback {
132    public:
133     virtual ~AcceptCallback() = default;
134
135     /**
136      * connectionAccepted() is called whenever a new client connection is
137      * received.
138      *
139      * The AcceptCallback will remain installed after connectionAccepted()
140      * returns.
141      *
142      * @param fd          The newly accepted client socket.  The AcceptCallback
143      *                    assumes ownership of this socket, and is responsible
144      *                    for closing it when done.  The newly accepted file
145      *                    descriptor will have already been put into
146      *                    non-blocking mode.
147      * @param clientAddr  A reference to a SocketAddress struct containing the
148      *                    client's address.  This struct is only guaranteed to
149      *                    remain valid until connectionAccepted() returns.
150      */
151     virtual void connectionAccepted(int fd,
152                                     const SocketAddress& clientAddr)
153       noexcept = 0;
154
155     /**
156      * acceptError() is called if an error occurs while accepting.
157      *
158      * The AcceptCallback will remain installed even after an accept error,
159      * as the errors are typically somewhat transient, such as being out of
160      * file descriptors.  The server socket must be explicitly stopped if you
161      * wish to stop accepting after an error.
162      *
163      * @param ex  An exception representing the error.
164      */
165     virtual void acceptError(const std::exception& ex) noexcept = 0;
166
167     /**
168      * acceptStarted() will be called in the callback's EventBase thread
169      * after this callback has been added to the AsyncServerSocket.
170      *
171      * acceptStarted() will be called before any calls to connectionAccepted()
172      * or acceptError() are made on this callback.
173      *
174      * acceptStarted() makes it easier for callbacks to perform initialization
175      * inside the callback thread.  (The call to addAcceptCallback() must
176      * always be made from the AsyncServerSocket's primary EventBase thread.
177      * acceptStarted() provides a hook that will always be invoked in the
178      * callback's thread.)
179      *
180      * Note that the call to acceptStarted() is made once the callback is
181      * added, regardless of whether or not the AsyncServerSocket is actually
182      * accepting at the moment.  acceptStarted() will be called even if the
183      * AsyncServerSocket is paused when the callback is added (including if
184      * the initial call to startAccepting() on the AsyncServerSocket has not
185      * been made yet).
186      */
187     virtual void acceptStarted() noexcept {}
188
189     /**
190      * acceptStopped() will be called when this AcceptCallback is removed from
191      * the AsyncServerSocket, or when the AsyncServerSocket is destroyed,
192      * whichever occurs first.
193      *
194      * No more calls to connectionAccepted() or acceptError() will be made
195      * after acceptStopped() is invoked.
196      */
197     virtual void acceptStopped() noexcept {}
198   };
199
200   static const uint32_t kDefaultMaxAcceptAtOnce = 30;
201   static const uint32_t kDefaultCallbackAcceptAtOnce = 5;
202   static const uint32_t kDefaultMaxMessagesInQueue = 1024;
203   /**
204    * Create a new AsyncServerSocket with the specified EventBase.
205    *
206    * @param eventBase  The EventBase to use for driving the asynchronous I/O.
207    *                   If this parameter is nullptr, attachEventBase() must be
208    *                   called before this socket can begin accepting
209    *                   connections.
210    */
211   explicit AsyncServerSocket(EventBase* eventBase = nullptr);
212
213   /**
214    * Helper function to create a shared_ptr<AsyncServerSocket>.
215    *
216    * This passes in the correct destructor object, since AsyncServerSocket's
217    * destructor is protected and cannot be invoked directly.
218    */
219   static std::shared_ptr<AsyncServerSocket>
220   newSocket(EventBase* evb = nullptr) {
221     return std::shared_ptr<AsyncServerSocket>(new AsyncServerSocket(evb),
222                                                  Destructor());
223   }
224
225   void setShutdownSocketSet(ShutdownSocketSet* newSS);
226
227   /**
228    * Destroy the socket.
229    *
230    * AsyncServerSocket::destroy() must be called to destroy the socket.
231    * The normal destructor is private, and should not be invoked directly.
232    * This prevents callers from deleting a AsyncServerSocket while it is
233    * invoking a callback.
234    *
235    * destroy() must be invoked from the socket's primary EventBase thread.
236    *
237    * If there are AcceptCallbacks still installed when destroy() is called,
238    * acceptStopped() will be called on these callbacks to notify them that
239    * accepting has stopped.  Accept callbacks being driven by other EventBase
240    * threads may continue to receive new accept callbacks for a brief period of
241    * time after destroy() returns.  They will not receive any more callback
242    * invocations once acceptStopped() is invoked.
243    */
244   virtual void destroy();
245
246   /**
247    * Attach this AsyncServerSocket to its primary EventBase.
248    *
249    * This may only be called if the AsyncServerSocket is not already attached
250    * to a EventBase.  The AsyncServerSocket must be attached to a EventBase
251    * before it can begin accepting connections.
252    */
253   void attachEventBase(EventBase *eventBase);
254
255   /**
256    * Detach the AsyncServerSocket from its primary EventBase.
257    *
258    * detachEventBase() may only be called if the AsyncServerSocket is not
259    * currently accepting connections.
260    */
261   void detachEventBase();
262
263   /**
264    * Get the EventBase used by this socket.
265    */
266   EventBase* getEventBase() const {
267     return eventBase_;
268   }
269
270   /**
271    * Create a AsyncServerSocket from an existing socket file descriptor.
272    *
273    * useExistingSocket() will cause the AsyncServerSocket to take ownership of
274    * the specified file descriptor, and use it to listen for new connections.
275    * The AsyncServerSocket will close the file descriptor when it is
276    * destroyed.
277    *
278    * useExistingSocket() must be called before bind() or listen().
279    *
280    * The supplied file descriptor will automatically be put into non-blocking
281    * mode.  The caller may have already directly called bind() and possibly
282    * listen on the file descriptor.  If so the caller should skip calling the
283    * corresponding AsyncServerSocket::bind() and listen() methods.
284    *
285    * On error a TTransportException will be thrown and the caller will retain
286    * ownership of the file descriptor.
287    */
288   void useExistingSocket(int fd);
289   void useExistingSockets(const std::vector<int>& fds);
290
291   /**
292    * Return the underlying file descriptor
293    */
294   std::vector<int> getSockets() const {
295     std::vector<int> sockets;
296     for (auto& handler : sockets_) {
297       sockets.push_back(handler.socket_);
298     }
299     return sockets;
300   }
301
302   /**
303    * Backwards compatible getSocket, warns if > 1 socket
304    */
305   int getSocket() const {
306     if (sockets_.size() > 1) {
307       VLOG(2) << "Warning: getSocket can return multiple fds, " <<
308         "but getSockets was not called, so only returning the first";
309     }
310     if (sockets_.size() == 0) {
311       return -1;
312     } else {
313       return sockets_[0].socket_;
314     }
315   }
316
317   /**
318    * Bind to the specified address.
319    *
320    * This must be called from the primary EventBase thread.
321    *
322    * Throws TTransportException on error.
323    */
324   virtual void bind(const SocketAddress& address);
325
326   /**
327    * Bind to the specified port for the specified addresses.
328    *
329    * This must be called from the primary EventBase thread.
330    *
331    * Throws TTransportException on error.
332    */
333   virtual void bind(
334       const std::vector<IPAddress>& ipAddresses,
335       uint16_t port);
336
337   /**
338    * Bind to the specified port.
339    *
340    * This must be called from the primary EventBase thread.
341    *
342    * Throws TTransportException on error.
343    */
344   virtual void bind(uint16_t port);
345
346   /**
347    * Get the local address to which the socket is bound.
348    *
349    * Throws TTransportException on error.
350    */
351   void getAddress(SocketAddress* addressReturn) const;
352
353   /**
354    * Get all the local addresses to which the socket is bound.
355    *
356    * Throws TTransportException on error.
357    */
358   std::vector<SocketAddress> getAddresses() const;
359
360   /**
361    * Begin listening for connections.
362    *
363    * This calls ::listen() with the specified backlog.
364    *
365    * Once listen() is invoked the socket will actually be open so that remote
366    * clients may establish connections.  (Clients that attempt to connect
367    * before listen() is called will receive a connection refused error.)
368    *
369    * At least one callback must be set and startAccepting() must be called to
370    * actually begin notifying the accept callbacks of newly accepted
371    * connections.  The backlog parameter controls how many connections the
372    * kernel will accept and buffer internally while the accept callbacks are
373    * paused (or if accepting is enabled but the callbacks cannot keep up).
374    *
375    * bind() must be called before calling listen().
376    * listen() must be called from the primary EventBase thread.
377    *
378    * Throws TTransportException on error.
379    */
380   virtual void listen(int backlog);
381
382   /**
383    * Add an AcceptCallback.
384    *
385    * When a new socket is accepted, one of the AcceptCallbacks will be invoked
386    * with the new socket.  The AcceptCallbacks are invoked in a round-robin
387    * fashion.  This allows the accepted sockets to be distributed among a pool
388    * of threads, each running its own EventBase object.  This is a common model,
389    * since most asynchronous-style servers typically run one EventBase thread
390    * per CPU.
391    *
392    * The EventBase object associated with each AcceptCallback must be running
393    * its loop.  If the EventBase loop is not running, sockets will still be
394    * scheduled for the callback, but the callback cannot actually get invoked
395    * until the loop runs.
396    *
397    * This method must be invoked from the AsyncServerSocket's primary
398    * EventBase thread.
399    *
400    * Note that startAccepting() must be called on the AsyncServerSocket to
401    * cause it to actually start accepting sockets once callbacks have been
402    * installed.
403    *
404    * @param callback   The callback to invoke.
405    * @param eventBase  The EventBase to use to invoke the callback.  This
406    *     parameter may be nullptr, in which case the callback will be invoked in
407    *     the AsyncServerSocket's primary EventBase.
408    * @param maxAtOnce  The maximum number of connections to accept in this
409    *                   callback on a single iteration of the event base loop.
410    *                   This only takes effect when eventBase is non-nullptr.
411    *                   When using a nullptr eventBase for the callback, the
412    *                   setMaxAcceptAtOnce() method controls how many
413    *                   connections the main event base will accept at once.
414    */
415   virtual void addAcceptCallback(
416     AcceptCallback *callback,
417     EventBase *eventBase,
418     uint32_t maxAtOnce = kDefaultCallbackAcceptAtOnce);
419
420   /**
421    * Remove an AcceptCallback.
422    *
423    * This allows a single AcceptCallback to be removed from the round-robin
424    * pool.
425    *
426    * This method must be invoked from the AsyncServerSocket's primary
427    * EventBase thread.  Use EventBase::runInEventBaseThread() to schedule the
428    * operation in the correct EventBase if your code is not in the server
429    * socket's primary EventBase.
430    *
431    * Given that the accept callback is being driven by a different EventBase,
432    * the AcceptCallback may continue to be invoked for a short period of time
433    * after removeAcceptCallback() returns in this thread.  Once the other
434    * EventBase thread receives the notification to stop, it will call
435    * acceptStopped() on the callback to inform it that it is fully stopped and
436    * will not receive any new sockets.
437    *
438    * If the last accept callback is removed while the socket is accepting,
439    * the socket will implicitly pause accepting.  If a callback is later added,
440    * it will resume accepting immediately, without requiring startAccepting()
441    * to be invoked.
442    *
443    * @param callback   The callback to uninstall.
444    * @param eventBase  The EventBase associated with this callback.  This must
445    *     be the same EventBase that was used when the callback was installed
446    *     with addAcceptCallback().
447    */
448   void removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase);
449
450   /**
451    * Begin accepting connctions on this socket.
452    *
453    * bind() and listen() must be called before calling startAccepting().
454    *
455    * When a AsyncServerSocket is initially created, it will not begin
456    * accepting connections until at least one callback has been added and
457    * startAccepting() has been called.  startAccepting() can also be used to
458    * resume accepting connections after a call to pauseAccepting().
459    *
460    * If startAccepting() is called when there are no accept callbacks
461    * installed, the socket will not actually begin accepting until an accept
462    * callback is added.
463    *
464    * This method may only be called from the primary EventBase thread.
465    */
466   virtual void startAccepting();
467
468   /**
469    * Pause accepting connections.
470    *
471    * startAccepting() may be called to resume accepting.
472    *
473    * This method may only be called from the primary EventBase thread.
474    * If there are AcceptCallbacks being driven by other EventBase threads they
475    * may continue to receive callbacks for a short period of time after
476    * pauseAccepting() returns.
477    *
478    * Unlike removeAcceptCallback() or destroy(), acceptStopped() will not be
479    * called on the AcceptCallback objects simply due to a temporary pause.  If
480    * the server socket is later destroyed while paused, acceptStopped() will be
481    * called all of the installed AcceptCallbacks.
482    */
483   void pauseAccepting();
484
485   /**
486    * Shutdown the listen socket and notify all callbacks that accept has
487    * stopped, but don't close the socket.  This invokes shutdown(2) with the
488    * supplied argument.  Passing -1 will close the socket now.  Otherwise, the
489    * close will be delayed until this object is destroyed.
490    *
491    * Only use this if you have reason to pass special flags to shutdown.
492    * Otherwise just destroy the socket.
493    *
494    * This method has no effect when a ShutdownSocketSet option is used.
495    *
496    * Returns the result of shutdown on sockets_[n-1]
497    */
498   int stopAccepting(int shutdownFlags = -1);
499
500   /**
501    * Get the maximum number of connections that will be accepted each time
502    * around the event loop.
503    */
504   uint32_t getMaxAcceptAtOnce() const {
505     return maxAcceptAtOnce_;
506   }
507
508   /**
509    * Set the maximum number of connections that will be accepted each time
510    * around the event loop.
511    *
512    * This provides a very coarse-grained way of controlling how fast the
513    * AsyncServerSocket will accept connections.  If you find that when your
514    * server is overloaded AsyncServerSocket accepts connections more quickly
515    * than your code can process them, you can try lowering this number so that
516    * fewer connections will be accepted each event loop iteration.
517    *
518    * For more explicit control over the accept rate, you can also use
519    * pauseAccepting() to temporarily pause accepting when your server is
520    * overloaded, and then use startAccepting() later to resume accepting.
521    */
522   void setMaxAcceptAtOnce(uint32_t numConns) {
523     maxAcceptAtOnce_ = numConns;
524   }
525
526   /**
527    * Get the maximum number of unprocessed messages which a NotificationQueue
528    * can hold.
529    */
530   uint32_t getMaxNumMessagesInQueue() const {
531     return maxNumMsgsInQueue_;
532   }
533
534   /**
535    * Set the maximum number of unprocessed messages in NotificationQueue.
536    * No new message will be sent to that NotificationQueue if there are more
537    * than such number of unprocessed messages in that queue.
538    *
539    * Only works if called before addAcceptCallback.
540    */
541   void setMaxNumMessagesInQueue(uint32_t num) {
542     maxNumMsgsInQueue_ = num;
543   }
544
545   /**
546    * Get the speed of adjusting connection accept rate.
547    */
548   double getAcceptRateAdjustSpeed() const {
549     return acceptRateAdjustSpeed_;
550   }
551
552   /**
553    * Set the speed of adjusting connection accept rate.
554    */
555   void setAcceptRateAdjustSpeed(double speed) {
556     acceptRateAdjustSpeed_ = speed;
557   }
558
559   /**
560    * Get the number of connections dropped by the AsyncServerSocket
561    */
562   uint64_t getNumDroppedConnections() const {
563     return numDroppedConnections_;
564   }
565
566   /**
567    * Get the current number of unprocessed messages in NotificationQueue.
568    *
569    * This method must be invoked from the AsyncServerSocket's primary
570    * EventBase thread.  Use EventBase::runInEventBaseThread() to schedule the
571    * operation in the correct EventBase if your code is not in the server
572    * socket's primary EventBase.
573    */
574   int64_t getNumPendingMessagesInQueue() const {
575     assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
576     int64_t numMsgs = 0;
577     for (const auto& callback : callbacks_) {
578       numMsgs += callback.consumer->getQueue()->size();
579     }
580     return numMsgs;
581   }
582
583   /**
584    * Set whether or not SO_KEEPALIVE should be enabled on the server socket
585    * (and thus on all subsequently-accepted connections). By default, keepalive
586    * is enabled.
587    *
588    * Note that TCP keepalive usually only kicks in after the connection has
589    * been idle for several hours. Applications should almost always have their
590    * own, shorter idle timeout.
591    */
592   void setKeepAliveEnabled(bool enabled) {
593     keepAliveEnabled_ = enabled;
594
595     for (auto& handler : sockets_) {
596       if (handler.socket_ < 0) {
597         continue;
598       }
599
600       int val = (enabled) ? 1 : 0;
601       if (setsockopt(handler.socket_, SOL_SOCKET,
602                      SO_KEEPALIVE, &val, sizeof(val)) != 0) {
603         LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: %s" <<
604                 strerror(errno);
605       }
606     }
607   }
608
609   /**
610    * Get whether or not SO_KEEPALIVE is enabled on the server socket.
611    */
612   bool getKeepAliveEnabled() const {
613     return keepAliveEnabled_;
614   }
615
616   /**
617    * Set whether or not SO_REUSEPORT should be enabled on the server socket,
618    * allowing multiple binds to the same port
619    */
620   void setReusePortEnabled(bool enabled) {
621     reusePortEnabled_ = enabled;
622
623     for (auto& handler : sockets_) {
624       if (handler.socket_ < 0) {
625         continue;
626       }
627
628       int val = (enabled) ? 1 : 0;
629       if (setsockopt(handler.socket_, SOL_SOCKET,
630                      SO_REUSEPORT, &val, sizeof(val)) != 0) {
631         LOG(ERROR) <<
632           "failed to set SO_REUSEPORT on async server socket " << errno;
633         folly::throwSystemError(errno,
634                                 "failed to bind to async server socket");
635       }
636     }
637   }
638
639   /**
640    * Get whether or not SO_REUSEPORT is enabled on the server socket.
641    */
642   bool getReusePortEnabled_() const {
643     return reusePortEnabled_;
644   }
645
646   /**
647    * Set whether or not the socket should close during exec() (FD_CLOEXEC). By
648    * default, this is enabled
649    */
650   void setCloseOnExec(bool closeOnExec) {
651     closeOnExec_ = closeOnExec;
652   }
653
654   /**
655    * Get whether or not FD_CLOEXEC is enabled on the server socket.
656    */
657   bool getCloseOnExec() const {
658     return closeOnExec_;
659   }
660
661   /**
662    * Tries to enable TFO if the machine supports it.
663    */
664   void setTFOEnabled(bool enabled, uint32_t maxTFOQueueSize) {
665     tfo_ = enabled;
666     tfoMaxQueueSize_ = maxTFOQueueSize;
667   }
668
669   /**
670    * Get whether or not the socket is accepting new connections
671    */
672   bool getAccepting() const {
673     return accepting_;
674   }
675
676   /**
677    * Set the ConnectionEventCallback
678    */
679   void setConnectionEventCallback(
680       ConnectionEventCallback* const connectionEventCallback) {
681     connectionEventCallback_ = connectionEventCallback;
682   }
683
684   /**
685    * Get the ConnectionEventCallback
686    */
687   ConnectionEventCallback* getConnectionEventCallback() const {
688     return connectionEventCallback_;
689   }
690
691  protected:
692   /**
693    * Protected destructor.
694    *
695    * Invoke destroy() instead to destroy the AsyncServerSocket.
696    */
697   virtual ~AsyncServerSocket();
698
699  private:
700   enum class MessageType {
701     MSG_NEW_CONN = 0,
702     MSG_ERROR = 1
703   };
704
705   struct QueueMessage {
706     MessageType type;
707     int fd;
708     int err;
709     SocketAddress address;
710     std::string msg;
711   };
712
713   /**
714    * A class to receive notifications to invoke AcceptCallback objects
715    * in other EventBase threads.
716    *
717    * A RemoteAcceptor object is created for each AcceptCallback that
718    * is installed in a separate EventBase thread.  The RemoteAcceptor
719    * receives notification of new sockets via a NotificationQueue,
720    * and then invokes the AcceptCallback.
721    */
722   class RemoteAcceptor
723       : private NotificationQueue<QueueMessage>::Consumer {
724   public:
725     explicit RemoteAcceptor(AcceptCallback *callback,
726                             ConnectionEventCallback *connectionEventCallback)
727       : callback_(callback),
728         connectionEventCallback_(connectionEventCallback) {}
729
730     ~RemoteAcceptor() = default;
731
732     void start(EventBase *eventBase, uint32_t maxAtOnce, uint32_t maxInQueue);
733     void stop(EventBase* eventBase, AcceptCallback* callback);
734
735     virtual void messageAvailable(QueueMessage&& message);
736
737     NotificationQueue<QueueMessage>* getQueue() {
738       return &queue_;
739     }
740
741   private:
742     AcceptCallback *callback_;
743     ConnectionEventCallback* connectionEventCallback_;
744
745     NotificationQueue<QueueMessage> queue_;
746   };
747
748   /**
749    * A struct to keep track of the callbacks associated with this server
750    * socket.
751    */
752   struct CallbackInfo {
753     CallbackInfo(AcceptCallback *cb, EventBase *evb)
754       : callback(cb),
755         eventBase(evb),
756         consumer(nullptr) {}
757
758     AcceptCallback *callback;
759     EventBase *eventBase;
760
761     RemoteAcceptor* consumer;
762   };
763
764   class BackoffTimeout;
765
766   virtual void handlerReady(
767     uint16_t events, int socket, sa_family_t family) noexcept;
768
769   int createSocket(int family);
770   void setupSocket(int fd, int family);
771   void bindSocket(int fd, const SocketAddress& address, bool isExistingSocket);
772   void dispatchSocket(int socket, SocketAddress&& address);
773   void dispatchError(const char *msg, int errnoValue);
774   void enterBackoff();
775   void backoffTimeoutExpired();
776
777   CallbackInfo* nextCallback() {
778     CallbackInfo* info = &callbacks_[callbackIndex_];
779
780     ++callbackIndex_;
781     if (callbackIndex_ >= callbacks_.size()) {
782       callbackIndex_ = 0;
783     }
784
785     return info;
786   }
787
788   struct ServerEventHandler : public EventHandler {
789     ServerEventHandler(EventBase* eventBase, int socket,
790                        AsyncServerSocket* parent,
791                       sa_family_t addressFamily)
792         : EventHandler(eventBase, socket)
793         , eventBase_(eventBase)
794         , socket_(socket)
795         , parent_(parent)
796         , addressFamily_(addressFamily) {}
797
798     ServerEventHandler(const ServerEventHandler& other)
799     : EventHandler(other.eventBase_, other.socket_)
800     , eventBase_(other.eventBase_)
801     , socket_(other.socket_)
802     , parent_(other.parent_)
803     , addressFamily_(other.addressFamily_) {}
804
805     ServerEventHandler& operator=(
806         const ServerEventHandler& other) {
807       if (this != &other) {
808         eventBase_ = other.eventBase_;
809         socket_ = other.socket_;
810         parent_ = other.parent_;
811         addressFamily_ = other.addressFamily_;
812
813         detachEventBase();
814         attachEventBase(other.eventBase_);
815         changeHandlerFD(other.socket_);
816       }
817       return *this;
818     }
819
820     // Inherited from EventHandler
821     virtual void handlerReady(uint16_t events) noexcept {
822       parent_->handlerReady(events, socket_, addressFamily_);
823     }
824
825     EventBase* eventBase_;
826     int socket_;
827     AsyncServerSocket* parent_;
828     sa_family_t addressFamily_;
829   };
830
831   EventBase *eventBase_;
832   std::vector<ServerEventHandler> sockets_;
833   std::vector<int> pendingCloseSockets_;
834   bool accepting_;
835   uint32_t maxAcceptAtOnce_;
836   uint32_t maxNumMsgsInQueue_;
837   double acceptRateAdjustSpeed_;  //0 to disable auto adjust
838   double acceptRate_;
839   std::chrono::time_point<std::chrono::steady_clock> lastAccepTimestamp_;
840   uint64_t numDroppedConnections_;
841   uint32_t callbackIndex_;
842   BackoffTimeout *backoffTimeout_;
843   std::vector<CallbackInfo> callbacks_;
844   bool keepAliveEnabled_;
845   bool reusePortEnabled_{false};
846   bool closeOnExec_;
847   bool tfo_{false};
848   uint32_t tfoMaxQueueSize_{0};
849   ShutdownSocketSet* shutdownSocketSet_;
850   ConnectionEventCallback* connectionEventCallback_{nullptr};
851 };
852
853 } // folly