Enable EOR flag configuration for folly::AsyncSocket.
[folly.git] / folly / io / async / AsyncServerSocket.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/SocketAddress.h>
20 #include <folly/io/ShutdownSocketSet.h>
21 #include <folly/io/async/AsyncSocketBase.h>
22 #include <folly/io/async/AsyncTimeout.h>
23 #include <folly/io/async/DelayedDestruction.h>
24 #include <folly/io/async/EventBase.h>
25 #include <folly/io/async/EventHandler.h>
26 #include <folly/io/async/NotificationQueue.h>
27 #include <folly/portability/Sockets.h>
28
29 #include <limits.h>
30 #include <stddef.h>
31 #include <exception>
32 #include <memory>
33 #include <vector>
34
35 // Due to the way kernel headers are included, this may or may not be defined.
36 // Number pulled from 3.10 kernel headers.
37 #ifndef SO_REUSEPORT
38 #define SO_REUSEPORT 15
39 #endif
40
41 namespace folly {
42
43 /**
44  * A listening socket that asynchronously informs a callback whenever a new
45  * connection has been accepted.
46  *
47  * Unlike most async interfaces that always invoke their callback in the same
48  * EventBase thread, AsyncServerSocket is unusual in that it can distribute
49  * the callbacks across multiple EventBase threads.
50  *
51  * This supports a common use case for network servers to distribute incoming
52  * connections across a number of EventBase threads.  (Servers typically run
53  * with one EventBase thread per CPU.)
54  *
55  * Despite being able to invoke callbacks in multiple EventBase threads,
56  * AsyncServerSocket still has one "primary" EventBase.  Operations that
57  * modify the AsyncServerSocket state may only be performed from the primary
58  * EventBase thread.
59  */
60 class AsyncServerSocket : public DelayedDestruction
61                         , public AsyncSocketBase {
62  public:
63   typedef std::unique_ptr<AsyncServerSocket, Destructor> UniquePtr;
64   // Disallow copy, move, and default construction.
65   AsyncServerSocket(AsyncServerSocket&&) = delete;
66
67   /**
68    * A callback interface to get notified of client socket events.
69    *
70    * The ConnectionEventCallback implementations need to be thread-safe as the
71    * callbacks may be called from different threads.
72    */
73   class ConnectionEventCallback {
74    public:
75     virtual ~ConnectionEventCallback() = default;
76
77     /**
78      * onConnectionAccepted() is called right after a client connection
79      * is accepted using the system accept()/accept4() APIs.
80      */
81     virtual void onConnectionAccepted(const int socket,
82                                       const SocketAddress& addr) noexcept = 0;
83
84     /**
85      * onConnectionAcceptError() is called when an error occurred accepting
86      * a connection.
87      */
88     virtual void onConnectionAcceptError(const int err) noexcept = 0;
89
90     /**
91      * onConnectionDropped() is called when a connection is dropped,
92      * probably because of some error encountered.
93      */
94     virtual void onConnectionDropped(const int socket,
95                                      const SocketAddress& addr) noexcept = 0;
96
97     /**
98      * onConnectionEnqueuedForAcceptorCallback() is called when the
99      * connection is successfully enqueued for an AcceptCallback to pick up.
100      */
101     virtual void onConnectionEnqueuedForAcceptorCallback(
102         const int socket,
103         const SocketAddress& addr) noexcept = 0;
104
105     /**
106      * onConnectionDequeuedByAcceptorCallback() is called when the
107      * connection is successfully dequeued by an AcceptCallback.
108      */
109     virtual void onConnectionDequeuedByAcceptorCallback(
110         const int socket,
111         const SocketAddress& addr) noexcept = 0;
112
113     /**
114      * onBackoffStarted is called when the socket has successfully started
115      * backing off accepting new client sockets.
116      */
117     virtual void onBackoffStarted() noexcept = 0;
118
119     /**
120      * onBackoffEnded is called when the backoff period has ended and the socket
121      * has successfully resumed accepting new connections if there is any
122      * AcceptCallback registered.
123      */
124     virtual void onBackoffEnded() noexcept = 0;
125
126     /**
127      * onBackoffError is called when there is an error entering backoff
128      */
129     virtual void onBackoffError() noexcept = 0;
130   };
131
132   class AcceptCallback {
133    public:
134     virtual ~AcceptCallback() = default;
135
136     /**
137      * connectionAccepted() is called whenever a new client connection is
138      * received.
139      *
140      * The AcceptCallback will remain installed after connectionAccepted()
141      * returns.
142      *
143      * @param fd          The newly accepted client socket.  The AcceptCallback
144      *                    assumes ownership of this socket, and is responsible
145      *                    for closing it when done.  The newly accepted file
146      *                    descriptor will have already been put into
147      *                    non-blocking mode.
148      * @param clientAddr  A reference to a SocketAddress struct containing the
149      *                    client's address.  This struct is only guaranteed to
150      *                    remain valid until connectionAccepted() returns.
151      */
152     virtual void connectionAccepted(int fd,
153                                     const SocketAddress& clientAddr)
154       noexcept = 0;
155
156     /**
157      * acceptError() is called if an error occurs while accepting.
158      *
159      * The AcceptCallback will remain installed even after an accept error,
160      * as the errors are typically somewhat transient, such as being out of
161      * file descriptors.  The server socket must be explicitly stopped if you
162      * wish to stop accepting after an error.
163      *
164      * @param ex  An exception representing the error.
165      */
166     virtual void acceptError(const std::exception& ex) noexcept = 0;
167
168     /**
169      * acceptStarted() will be called in the callback's EventBase thread
170      * after this callback has been added to the AsyncServerSocket.
171      *
172      * acceptStarted() will be called before any calls to connectionAccepted()
173      * or acceptError() are made on this callback.
174      *
175      * acceptStarted() makes it easier for callbacks to perform initialization
176      * inside the callback thread.  (The call to addAcceptCallback() must
177      * always be made from the AsyncServerSocket's primary EventBase thread.
178      * acceptStarted() provides a hook that will always be invoked in the
179      * callback's thread.)
180      *
181      * Note that the call to acceptStarted() is made once the callback is
182      * added, regardless of whether or not the AsyncServerSocket is actually
183      * accepting at the moment.  acceptStarted() will be called even if the
184      * AsyncServerSocket is paused when the callback is added (including if
185      * the initial call to startAccepting() on the AsyncServerSocket has not
186      * been made yet).
187      */
188     virtual void acceptStarted() noexcept {}
189
190     /**
191      * acceptStopped() will be called when this AcceptCallback is removed from
192      * the AsyncServerSocket, or when the AsyncServerSocket is destroyed,
193      * whichever occurs first.
194      *
195      * No more calls to connectionAccepted() or acceptError() will be made
196      * after acceptStopped() is invoked.
197      */
198     virtual void acceptStopped() noexcept {}
199   };
200
201   static const uint32_t kDefaultMaxAcceptAtOnce = 30;
202   static const uint32_t kDefaultCallbackAcceptAtOnce = 5;
203   static const uint32_t kDefaultMaxMessagesInQueue = 1024;
204   /**
205    * Create a new AsyncServerSocket with the specified EventBase.
206    *
207    * @param eventBase  The EventBase to use for driving the asynchronous I/O.
208    *                   If this parameter is nullptr, attachEventBase() must be
209    *                   called before this socket can begin accepting
210    *                   connections.
211    */
212   explicit AsyncServerSocket(EventBase* eventBase = nullptr);
213
214   /**
215    * Helper function to create a shared_ptr<AsyncServerSocket>.
216    *
217    * This passes in the correct destructor object, since AsyncServerSocket's
218    * destructor is protected and cannot be invoked directly.
219    */
220   static std::shared_ptr<AsyncServerSocket>
221   newSocket(EventBase* evb = nullptr) {
222     return std::shared_ptr<AsyncServerSocket>(new AsyncServerSocket(evb),
223                                                  Destructor());
224   }
225
226   void setShutdownSocketSet(ShutdownSocketSet* newSS);
227
228   /**
229    * Destroy the socket.
230    *
231    * AsyncServerSocket::destroy() must be called to destroy the socket.
232    * The normal destructor is private, and should not be invoked directly.
233    * This prevents callers from deleting a AsyncServerSocket while it is
234    * invoking a callback.
235    *
236    * destroy() must be invoked from the socket's primary EventBase thread.
237    *
238    * If there are AcceptCallbacks still installed when destroy() is called,
239    * acceptStopped() will be called on these callbacks to notify them that
240    * accepting has stopped.  Accept callbacks being driven by other EventBase
241    * threads may continue to receive new accept callbacks for a brief period of
242    * time after destroy() returns.  They will not receive any more callback
243    * invocations once acceptStopped() is invoked.
244    */
245   virtual void destroy();
246
247   /**
248    * Attach this AsyncServerSocket to its primary EventBase.
249    *
250    * This may only be called if the AsyncServerSocket is not already attached
251    * to a EventBase.  The AsyncServerSocket must be attached to a EventBase
252    * before it can begin accepting connections.
253    */
254   void attachEventBase(EventBase *eventBase);
255
256   /**
257    * Detach the AsyncServerSocket from its primary EventBase.
258    *
259    * detachEventBase() may only be called if the AsyncServerSocket is not
260    * currently accepting connections.
261    */
262   void detachEventBase();
263
264   /**
265    * Get the EventBase used by this socket.
266    */
267   EventBase* getEventBase() const {
268     return eventBase_;
269   }
270
271   /**
272    * Create a AsyncServerSocket from an existing socket file descriptor.
273    *
274    * useExistingSocket() will cause the AsyncServerSocket to take ownership of
275    * the specified file descriptor, and use it to listen for new connections.
276    * The AsyncServerSocket will close the file descriptor when it is
277    * destroyed.
278    *
279    * useExistingSocket() must be called before bind() or listen().
280    *
281    * The supplied file descriptor will automatically be put into non-blocking
282    * mode.  The caller may have already directly called bind() and possibly
283    * listen on the file descriptor.  If so the caller should skip calling the
284    * corresponding AsyncServerSocket::bind() and listen() methods.
285    *
286    * On error a TTransportException will be thrown and the caller will retain
287    * ownership of the file descriptor.
288    */
289   void useExistingSocket(int fd);
290   void useExistingSockets(const std::vector<int>& fds);
291
292   /**
293    * Return the underlying file descriptor
294    */
295   std::vector<int> getSockets() const {
296     std::vector<int> sockets;
297     for (auto& handler : sockets_) {
298       sockets.push_back(handler.socket_);
299     }
300     return sockets;
301   }
302
303   /**
304    * Backwards compatible getSocket, warns if > 1 socket
305    */
306   int getSocket() const {
307     if (sockets_.size() > 1) {
308       VLOG(2) << "Warning: getSocket can return multiple fds, " <<
309         "but getSockets was not called, so only returning the first";
310     }
311     if (sockets_.size() == 0) {
312       return -1;
313     } else {
314       return sockets_[0].socket_;
315     }
316   }
317
318   /**
319    * Bind to the specified address.
320    *
321    * This must be called from the primary EventBase thread.
322    *
323    * Throws TTransportException on error.
324    */
325   virtual void bind(const SocketAddress& address);
326
327   /**
328    * Bind to the specified port for the specified addresses.
329    *
330    * This must be called from the primary EventBase thread.
331    *
332    * Throws TTransportException on error.
333    */
334   virtual void bind(
335       const std::vector<IPAddress>& ipAddresses,
336       uint16_t port);
337
338   /**
339    * Bind to the specified port.
340    *
341    * This must be called from the primary EventBase thread.
342    *
343    * Throws TTransportException on error.
344    */
345   virtual void bind(uint16_t port);
346
347   /**
348    * Get the local address to which the socket is bound.
349    *
350    * Throws TTransportException on error.
351    */
352   void getAddress(SocketAddress* addressReturn) const;
353
354   /**
355    * Get the local address to which the socket is bound.
356    *
357    * Throws TTransportException on error.
358    */
359   SocketAddress getAddress() const {
360     SocketAddress ret;
361     getAddress(&ret);
362     return ret;
363   }
364
365   /**
366    * Get all the local addresses to which the socket is bound.
367    *
368    * Throws TTransportException on error.
369    */
370   std::vector<SocketAddress> getAddresses() const;
371
372   /**
373    * Begin listening for connections.
374    *
375    * This calls ::listen() with the specified backlog.
376    *
377    * Once listen() is invoked the socket will actually be open so that remote
378    * clients may establish connections.  (Clients that attempt to connect
379    * before listen() is called will receive a connection refused error.)
380    *
381    * At least one callback must be set and startAccepting() must be called to
382    * actually begin notifying the accept callbacks of newly accepted
383    * connections.  The backlog parameter controls how many connections the
384    * kernel will accept and buffer internally while the accept callbacks are
385    * paused (or if accepting is enabled but the callbacks cannot keep up).
386    *
387    * bind() must be called before calling listen().
388    * listen() must be called from the primary EventBase thread.
389    *
390    * Throws TTransportException on error.
391    */
392   virtual void listen(int backlog);
393
394   /**
395    * Add an AcceptCallback.
396    *
397    * When a new socket is accepted, one of the AcceptCallbacks will be invoked
398    * with the new socket.  The AcceptCallbacks are invoked in a round-robin
399    * fashion.  This allows the accepted sockets to be distributed among a pool
400    * of threads, each running its own EventBase object.  This is a common model,
401    * since most asynchronous-style servers typically run one EventBase thread
402    * per CPU.
403    *
404    * The EventBase object associated with each AcceptCallback must be running
405    * its loop.  If the EventBase loop is not running, sockets will still be
406    * scheduled for the callback, but the callback cannot actually get invoked
407    * until the loop runs.
408    *
409    * This method must be invoked from the AsyncServerSocket's primary
410    * EventBase thread.
411    *
412    * Note that startAccepting() must be called on the AsyncServerSocket to
413    * cause it to actually start accepting sockets once callbacks have been
414    * installed.
415    *
416    * @param callback   The callback to invoke.
417    * @param eventBase  The EventBase to use to invoke the callback.  This
418    *     parameter may be nullptr, in which case the callback will be invoked in
419    *     the AsyncServerSocket's primary EventBase.
420    * @param maxAtOnce  The maximum number of connections to accept in this
421    *                   callback on a single iteration of the event base loop.
422    *                   This only takes effect when eventBase is non-nullptr.
423    *                   When using a nullptr eventBase for the callback, the
424    *                   setMaxAcceptAtOnce() method controls how many
425    *                   connections the main event base will accept at once.
426    */
427   virtual void addAcceptCallback(
428     AcceptCallback *callback,
429     EventBase *eventBase,
430     uint32_t maxAtOnce = kDefaultCallbackAcceptAtOnce);
431
432   /**
433    * Remove an AcceptCallback.
434    *
435    * This allows a single AcceptCallback to be removed from the round-robin
436    * pool.
437    *
438    * This method must be invoked from the AsyncServerSocket's primary
439    * EventBase thread.  Use EventBase::runInEventBaseThread() to schedule the
440    * operation in the correct EventBase if your code is not in the server
441    * socket's primary EventBase.
442    *
443    * Given that the accept callback is being driven by a different EventBase,
444    * the AcceptCallback may continue to be invoked for a short period of time
445    * after removeAcceptCallback() returns in this thread.  Once the other
446    * EventBase thread receives the notification to stop, it will call
447    * acceptStopped() on the callback to inform it that it is fully stopped and
448    * will not receive any new sockets.
449    *
450    * If the last accept callback is removed while the socket is accepting,
451    * the socket will implicitly pause accepting.  If a callback is later added,
452    * it will resume accepting immediately, without requiring startAccepting()
453    * to be invoked.
454    *
455    * @param callback   The callback to uninstall.
456    * @param eventBase  The EventBase associated with this callback.  This must
457    *     be the same EventBase that was used when the callback was installed
458    *     with addAcceptCallback().
459    */
460   void removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase);
461
462   /**
463    * Begin accepting connctions on this socket.
464    *
465    * bind() and listen() must be called before calling startAccepting().
466    *
467    * When a AsyncServerSocket is initially created, it will not begin
468    * accepting connections until at least one callback has been added and
469    * startAccepting() has been called.  startAccepting() can also be used to
470    * resume accepting connections after a call to pauseAccepting().
471    *
472    * If startAccepting() is called when there are no accept callbacks
473    * installed, the socket will not actually begin accepting until an accept
474    * callback is added.
475    *
476    * This method may only be called from the primary EventBase thread.
477    */
478   virtual void startAccepting();
479
480   /**
481    * Pause accepting connections.
482    *
483    * startAccepting() may be called to resume accepting.
484    *
485    * This method may only be called from the primary EventBase thread.
486    * If there are AcceptCallbacks being driven by other EventBase threads they
487    * may continue to receive callbacks for a short period of time after
488    * pauseAccepting() returns.
489    *
490    * Unlike removeAcceptCallback() or destroy(), acceptStopped() will not be
491    * called on the AcceptCallback objects simply due to a temporary pause.  If
492    * the server socket is later destroyed while paused, acceptStopped() will be
493    * called all of the installed AcceptCallbacks.
494    */
495   void pauseAccepting();
496
497   /**
498    * Shutdown the listen socket and notify all callbacks that accept has
499    * stopped, but don't close the socket.  This invokes shutdown(2) with the
500    * supplied argument.  Passing -1 will close the socket now.  Otherwise, the
501    * close will be delayed until this object is destroyed.
502    *
503    * Only use this if you have reason to pass special flags to shutdown.
504    * Otherwise just destroy the socket.
505    *
506    * This method has no effect when a ShutdownSocketSet option is used.
507    *
508    * Returns the result of shutdown on sockets_[n-1]
509    */
510   int stopAccepting(int shutdownFlags = -1);
511
512   /**
513    * Get the maximum number of connections that will be accepted each time
514    * around the event loop.
515    */
516   uint32_t getMaxAcceptAtOnce() const {
517     return maxAcceptAtOnce_;
518   }
519
520   /**
521    * Set the maximum number of connections that will be accepted each time
522    * around the event loop.
523    *
524    * This provides a very coarse-grained way of controlling how fast the
525    * AsyncServerSocket will accept connections.  If you find that when your
526    * server is overloaded AsyncServerSocket accepts connections more quickly
527    * than your code can process them, you can try lowering this number so that
528    * fewer connections will be accepted each event loop iteration.
529    *
530    * For more explicit control over the accept rate, you can also use
531    * pauseAccepting() to temporarily pause accepting when your server is
532    * overloaded, and then use startAccepting() later to resume accepting.
533    */
534   void setMaxAcceptAtOnce(uint32_t numConns) {
535     maxAcceptAtOnce_ = numConns;
536   }
537
538   /**
539    * Get the maximum number of unprocessed messages which a NotificationQueue
540    * can hold.
541    */
542   uint32_t getMaxNumMessagesInQueue() const {
543     return maxNumMsgsInQueue_;
544   }
545
546   /**
547    * Set the maximum number of unprocessed messages in NotificationQueue.
548    * No new message will be sent to that NotificationQueue if there are more
549    * than such number of unprocessed messages in that queue.
550    *
551    * Only works if called before addAcceptCallback.
552    */
553   void setMaxNumMessagesInQueue(uint32_t num) {
554     maxNumMsgsInQueue_ = num;
555   }
556
557   /**
558    * Get the speed of adjusting connection accept rate.
559    */
560   double getAcceptRateAdjustSpeed() const {
561     return acceptRateAdjustSpeed_;
562   }
563
564   /**
565    * Set the speed of adjusting connection accept rate.
566    */
567   void setAcceptRateAdjustSpeed(double speed) {
568     acceptRateAdjustSpeed_ = speed;
569   }
570
571   /**
572    * Get the number of connections dropped by the AsyncServerSocket
573    */
574   uint64_t getNumDroppedConnections() const {
575     return numDroppedConnections_;
576   }
577
578   /**
579    * Get the current number of unprocessed messages in NotificationQueue.
580    *
581    * This method must be invoked from the AsyncServerSocket's primary
582    * EventBase thread.  Use EventBase::runInEventBaseThread() to schedule the
583    * operation in the correct EventBase if your code is not in the server
584    * socket's primary EventBase.
585    */
586   int64_t getNumPendingMessagesInQueue() const {
587     assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
588     int64_t numMsgs = 0;
589     for (const auto& callback : callbacks_) {
590       numMsgs += callback.consumer->getQueue()->size();
591     }
592     return numMsgs;
593   }
594
595   /**
596    * Set whether or not SO_KEEPALIVE should be enabled on the server socket
597    * (and thus on all subsequently-accepted connections). By default, keepalive
598    * is enabled.
599    *
600    * Note that TCP keepalive usually only kicks in after the connection has
601    * been idle for several hours. Applications should almost always have their
602    * own, shorter idle timeout.
603    */
604   void setKeepAliveEnabled(bool enabled) {
605     keepAliveEnabled_ = enabled;
606
607     for (auto& handler : sockets_) {
608       if (handler.socket_ < 0) {
609         continue;
610       }
611
612       int val = (enabled) ? 1 : 0;
613       if (setsockopt(handler.socket_, SOL_SOCKET,
614                      SO_KEEPALIVE, &val, sizeof(val)) != 0) {
615         LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: %s" <<
616                 strerror(errno);
617       }
618     }
619   }
620
621   /**
622    * Get whether or not SO_KEEPALIVE is enabled on the server socket.
623    */
624   bool getKeepAliveEnabled() const {
625     return keepAliveEnabled_;
626   }
627
628   /**
629    * Set whether or not SO_REUSEPORT should be enabled on the server socket,
630    * allowing multiple binds to the same port
631    */
632   void setReusePortEnabled(bool enabled) {
633     reusePortEnabled_ = enabled;
634
635     for (auto& handler : sockets_) {
636       if (handler.socket_ < 0) {
637         continue;
638       }
639
640       int val = (enabled) ? 1 : 0;
641       if (setsockopt(handler.socket_, SOL_SOCKET,
642                      SO_REUSEPORT, &val, sizeof(val)) != 0) {
643         LOG(ERROR) <<
644           "failed to set SO_REUSEPORT on async server socket " << errno;
645         folly::throwSystemError(errno,
646                                 "failed to bind to async server socket");
647       }
648     }
649   }
650
651   /**
652    * Get whether or not SO_REUSEPORT is enabled on the server socket.
653    */
654   bool getReusePortEnabled_() const {
655     return reusePortEnabled_;
656   }
657
658   /**
659    * Set whether or not the socket should close during exec() (FD_CLOEXEC). By
660    * default, this is enabled
661    */
662   void setCloseOnExec(bool closeOnExec) {
663     closeOnExec_ = closeOnExec;
664   }
665
666   /**
667    * Get whether or not FD_CLOEXEC is enabled on the server socket.
668    */
669   bool getCloseOnExec() const {
670     return closeOnExec_;
671   }
672
673   /**
674    * Tries to enable TFO if the machine supports it.
675    */
676   void setTFOEnabled(bool enabled, uint32_t maxTFOQueueSize) {
677     tfo_ = enabled;
678     tfoMaxQueueSize_ = maxTFOQueueSize;
679   }
680
681   /**
682    * Get whether or not the socket is accepting new connections
683    */
684   bool getAccepting() const {
685     return accepting_;
686   }
687
688   /**
689    * Set the ConnectionEventCallback
690    */
691   void setConnectionEventCallback(
692       ConnectionEventCallback* const connectionEventCallback) {
693     connectionEventCallback_ = connectionEventCallback;
694   }
695
696   /**
697    * Get the ConnectionEventCallback
698    */
699   ConnectionEventCallback* getConnectionEventCallback() const {
700     return connectionEventCallback_;
701   }
702
703  protected:
704   /**
705    * Protected destructor.
706    *
707    * Invoke destroy() instead to destroy the AsyncServerSocket.
708    */
709   virtual ~AsyncServerSocket();
710
711  private:
712   enum class MessageType {
713     MSG_NEW_CONN = 0,
714     MSG_ERROR = 1
715   };
716
717   struct QueueMessage {
718     MessageType type;
719     int fd;
720     int err;
721     SocketAddress address;
722     std::string msg;
723   };
724
725   /**
726    * A class to receive notifications to invoke AcceptCallback objects
727    * in other EventBase threads.
728    *
729    * A RemoteAcceptor object is created for each AcceptCallback that
730    * is installed in a separate EventBase thread.  The RemoteAcceptor
731    * receives notification of new sockets via a NotificationQueue,
732    * and then invokes the AcceptCallback.
733    */
734   class RemoteAcceptor
735       : private NotificationQueue<QueueMessage>::Consumer {
736   public:
737     explicit RemoteAcceptor(AcceptCallback *callback,
738                             ConnectionEventCallback *connectionEventCallback)
739       : callback_(callback),
740         connectionEventCallback_(connectionEventCallback) {}
741
742     ~RemoteAcceptor() = default;
743
744     void start(EventBase *eventBase, uint32_t maxAtOnce, uint32_t maxInQueue);
745     void stop(EventBase* eventBase, AcceptCallback* callback);
746
747     virtual void messageAvailable(QueueMessage&& message);
748
749     NotificationQueue<QueueMessage>* getQueue() {
750       return &queue_;
751     }
752
753   private:
754     AcceptCallback *callback_;
755     ConnectionEventCallback* connectionEventCallback_;
756
757     NotificationQueue<QueueMessage> queue_;
758   };
759
760   /**
761    * A struct to keep track of the callbacks associated with this server
762    * socket.
763    */
764   struct CallbackInfo {
765     CallbackInfo(AcceptCallback *cb, EventBase *evb)
766       : callback(cb),
767         eventBase(evb),
768         consumer(nullptr) {}
769
770     AcceptCallback *callback;
771     EventBase *eventBase;
772
773     RemoteAcceptor* consumer;
774   };
775
776   class BackoffTimeout;
777
778   virtual void handlerReady(
779     uint16_t events, int socket, sa_family_t family) noexcept;
780
781   int createSocket(int family);
782   void setupSocket(int fd, int family);
783   void bindSocket(int fd, const SocketAddress& address, bool isExistingSocket);
784   void dispatchSocket(int socket, SocketAddress&& address);
785   void dispatchError(const char *msg, int errnoValue);
786   void enterBackoff();
787   void backoffTimeoutExpired();
788
789   CallbackInfo* nextCallback() {
790     CallbackInfo* info = &callbacks_[callbackIndex_];
791
792     ++callbackIndex_;
793     if (callbackIndex_ >= callbacks_.size()) {
794       callbackIndex_ = 0;
795     }
796
797     return info;
798   }
799
800   struct ServerEventHandler : public EventHandler {
801     ServerEventHandler(EventBase* eventBase, int socket,
802                        AsyncServerSocket* parent,
803                       sa_family_t addressFamily)
804         : EventHandler(eventBase, socket)
805         , eventBase_(eventBase)
806         , socket_(socket)
807         , parent_(parent)
808         , addressFamily_(addressFamily) {}
809
810     ServerEventHandler(const ServerEventHandler& other)
811     : EventHandler(other.eventBase_, other.socket_)
812     , eventBase_(other.eventBase_)
813     , socket_(other.socket_)
814     , parent_(other.parent_)
815     , addressFamily_(other.addressFamily_) {}
816
817     ServerEventHandler& operator=(
818         const ServerEventHandler& other) {
819       if (this != &other) {
820         eventBase_ = other.eventBase_;
821         socket_ = other.socket_;
822         parent_ = other.parent_;
823         addressFamily_ = other.addressFamily_;
824
825         detachEventBase();
826         attachEventBase(other.eventBase_);
827         changeHandlerFD(other.socket_);
828       }
829       return *this;
830     }
831
832     // Inherited from EventHandler
833     virtual void handlerReady(uint16_t events) noexcept {
834       parent_->handlerReady(events, socket_, addressFamily_);
835     }
836
837     EventBase* eventBase_;
838     int socket_;
839     AsyncServerSocket* parent_;
840     sa_family_t addressFamily_;
841   };
842
843   EventBase *eventBase_;
844   std::vector<ServerEventHandler> sockets_;
845   std::vector<int> pendingCloseSockets_;
846   bool accepting_;
847   uint32_t maxAcceptAtOnce_;
848   uint32_t maxNumMsgsInQueue_;
849   double acceptRateAdjustSpeed_;  //0 to disable auto adjust
850   double acceptRate_;
851   std::chrono::time_point<std::chrono::steady_clock> lastAccepTimestamp_;
852   uint64_t numDroppedConnections_;
853   uint32_t callbackIndex_;
854   BackoffTimeout *backoffTimeout_;
855   std::vector<CallbackInfo> callbacks_;
856   bool keepAliveEnabled_;
857   bool reusePortEnabled_{false};
858   bool closeOnExec_;
859   bool tfo_{false};
860   uint32_t tfoMaxQueueSize_{0};
861   ShutdownSocketSet* shutdownSocketSet_;
862   ConnectionEventCallback* connectionEventCallback_{nullptr};
863 };
864
865 } // folly