Fix copyright lines
[folly.git] / folly / io / async / AsyncServerSocket.h
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/SocketAddress.h>
20 #include <folly/io/ShutdownSocketSet.h>
21 #include <folly/io/async/AsyncSocketBase.h>
22 #include <folly/io/async/AsyncTimeout.h>
23 #include <folly/io/async/DelayedDestruction.h>
24 #include <folly/io/async/EventBase.h>
25 #include <folly/io/async/EventHandler.h>
26 #include <folly/io/async/NotificationQueue.h>
27 #include <folly/portability/Sockets.h>
28
29 #include <limits.h>
30 #include <stddef.h>
31 #include <exception>
32 #include <memory>
33 #include <vector>
34
35 // Due to the way kernel headers are included, this may or may not be defined.
36 // Number pulled from 3.10 kernel headers.
37 #ifndef SO_REUSEPORT
38 #define SO_REUSEPORT 15
39 #endif
40
41 #if defined __linux__ && !defined SO_NO_TRANSPARENT_TLS
42 #define SO_NO_TRANSPARENT_TLS 200
43 #endif
44
45 namespace folly {
46
47 /**
48  * A listening socket that asynchronously informs a callback whenever a new
49  * connection has been accepted.
50  *
51  * Unlike most async interfaces that always invoke their callback in the same
52  * EventBase thread, AsyncServerSocket is unusual in that it can distribute
53  * the callbacks across multiple EventBase threads.
54  *
55  * This supports a common use case for network servers to distribute incoming
56  * connections across a number of EventBase threads.  (Servers typically run
57  * with one EventBase thread per CPU.)
58  *
59  * Despite being able to invoke callbacks in multiple EventBase threads,
60  * AsyncServerSocket still has one "primary" EventBase.  Operations that
61  * modify the AsyncServerSocket state may only be performed from the primary
62  * EventBase thread.
63  */
64 class AsyncServerSocket : public DelayedDestruction
65                         , public AsyncSocketBase {
66  public:
67   typedef std::unique_ptr<AsyncServerSocket, Destructor> UniquePtr;
68   // Disallow copy, move, and default construction.
69   AsyncServerSocket(AsyncServerSocket&&) = delete;
70
71   /**
72    * A callback interface to get notified of client socket events.
73    *
74    * The ConnectionEventCallback implementations need to be thread-safe as the
75    * callbacks may be called from different threads.
76    */
77   class ConnectionEventCallback {
78    public:
79     virtual ~ConnectionEventCallback() = default;
80
81     /**
82      * onConnectionAccepted() is called right after a client connection
83      * is accepted using the system accept()/accept4() APIs.
84      */
85     virtual void onConnectionAccepted(const int socket,
86                                       const SocketAddress& addr) noexcept = 0;
87
88     /**
89      * onConnectionAcceptError() is called when an error occurred accepting
90      * a connection.
91      */
92     virtual void onConnectionAcceptError(const int err) noexcept = 0;
93
94     /**
95      * onConnectionDropped() is called when a connection is dropped,
96      * probably because of some error encountered.
97      */
98     virtual void onConnectionDropped(const int socket,
99                                      const SocketAddress& addr) noexcept = 0;
100
101     /**
102      * onConnectionEnqueuedForAcceptorCallback() is called when the
103      * connection is successfully enqueued for an AcceptCallback to pick up.
104      */
105     virtual void onConnectionEnqueuedForAcceptorCallback(
106         const int socket,
107         const SocketAddress& addr) noexcept = 0;
108
109     /**
110      * onConnectionDequeuedByAcceptorCallback() is called when the
111      * connection is successfully dequeued by an AcceptCallback.
112      */
113     virtual void onConnectionDequeuedByAcceptorCallback(
114         const int socket,
115         const SocketAddress& addr) noexcept = 0;
116
117     /**
118      * onBackoffStarted is called when the socket has successfully started
119      * backing off accepting new client sockets.
120      */
121     virtual void onBackoffStarted() noexcept = 0;
122
123     /**
124      * onBackoffEnded is called when the backoff period has ended and the socket
125      * has successfully resumed accepting new connections if there is any
126      * AcceptCallback registered.
127      */
128     virtual void onBackoffEnded() noexcept = 0;
129
130     /**
131      * onBackoffError is called when there is an error entering backoff
132      */
133     virtual void onBackoffError() noexcept = 0;
134   };
135
136   class AcceptCallback {
137    public:
138     virtual ~AcceptCallback() = default;
139
140     /**
141      * connectionAccepted() is called whenever a new client connection is
142      * received.
143      *
144      * The AcceptCallback will remain installed after connectionAccepted()
145      * returns.
146      *
147      * @param fd          The newly accepted client socket.  The AcceptCallback
148      *                    assumes ownership of this socket, and is responsible
149      *                    for closing it when done.  The newly accepted file
150      *                    descriptor will have already been put into
151      *                    non-blocking mode.
152      * @param clientAddr  A reference to a SocketAddress struct containing the
153      *                    client's address.  This struct is only guaranteed to
154      *                    remain valid until connectionAccepted() returns.
155      */
156     virtual void connectionAccepted(int fd,
157                                     const SocketAddress& clientAddr)
158       noexcept = 0;
159
160     /**
161      * acceptError() is called if an error occurs while accepting.
162      *
163      * The AcceptCallback will remain installed even after an accept error,
164      * as the errors are typically somewhat transient, such as being out of
165      * file descriptors.  The server socket must be explicitly stopped if you
166      * wish to stop accepting after an error.
167      *
168      * @param ex  An exception representing the error.
169      */
170     virtual void acceptError(const std::exception& ex) noexcept = 0;
171
172     /**
173      * acceptStarted() will be called in the callback's EventBase thread
174      * after this callback has been added to the AsyncServerSocket.
175      *
176      * acceptStarted() will be called before any calls to connectionAccepted()
177      * or acceptError() are made on this callback.
178      *
179      * acceptStarted() makes it easier for callbacks to perform initialization
180      * inside the callback thread.  (The call to addAcceptCallback() must
181      * always be made from the AsyncServerSocket's primary EventBase thread.
182      * acceptStarted() provides a hook that will always be invoked in the
183      * callback's thread.)
184      *
185      * Note that the call to acceptStarted() is made once the callback is
186      * added, regardless of whether or not the AsyncServerSocket is actually
187      * accepting at the moment.  acceptStarted() will be called even if the
188      * AsyncServerSocket is paused when the callback is added (including if
189      * the initial call to startAccepting() on the AsyncServerSocket has not
190      * been made yet).
191      */
192     virtual void acceptStarted() noexcept {}
193
194     /**
195      * acceptStopped() will be called when this AcceptCallback is removed from
196      * the AsyncServerSocket, or when the AsyncServerSocket is destroyed,
197      * whichever occurs first.
198      *
199      * No more calls to connectionAccepted() or acceptError() will be made
200      * after acceptStopped() is invoked.
201      */
202     virtual void acceptStopped() noexcept {}
203   };
204
205   static const uint32_t kDefaultMaxAcceptAtOnce = 30;
206   static const uint32_t kDefaultCallbackAcceptAtOnce = 5;
207   static const uint32_t kDefaultMaxMessagesInQueue = 1024;
208   /**
209    * Create a new AsyncServerSocket with the specified EventBase.
210    *
211    * @param eventBase  The EventBase to use for driving the asynchronous I/O.
212    *                   If this parameter is nullptr, attachEventBase() must be
213    *                   called before this socket can begin accepting
214    *                   connections.
215    */
216   explicit AsyncServerSocket(EventBase* eventBase = nullptr);
217
218   /**
219    * Helper function to create a shared_ptr<AsyncServerSocket>.
220    *
221    * This passes in the correct destructor object, since AsyncServerSocket's
222    * destructor is protected and cannot be invoked directly.
223    */
224   static std::shared_ptr<AsyncServerSocket>
225   newSocket(EventBase* evb = nullptr) {
226     return std::shared_ptr<AsyncServerSocket>(new AsyncServerSocket(evb),
227                                                  Destructor());
228   }
229
230   void setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet>& wNewSS);
231
232   /**
233    * Destroy the socket.
234    *
235    * AsyncServerSocket::destroy() must be called to destroy the socket.
236    * The normal destructor is private, and should not be invoked directly.
237    * This prevents callers from deleting a AsyncServerSocket while it is
238    * invoking a callback.
239    *
240    * destroy() must be invoked from the socket's primary EventBase thread.
241    *
242    * If there are AcceptCallbacks still installed when destroy() is called,
243    * acceptStopped() will be called on these callbacks to notify them that
244    * accepting has stopped.  Accept callbacks being driven by other EventBase
245    * threads may continue to receive new accept callbacks for a brief period of
246    * time after destroy() returns.  They will not receive any more callback
247    * invocations once acceptStopped() is invoked.
248    */
249   void destroy() override;
250
251   /**
252    * Attach this AsyncServerSocket to its primary EventBase.
253    *
254    * This may only be called if the AsyncServerSocket is not already attached
255    * to a EventBase.  The AsyncServerSocket must be attached to a EventBase
256    * before it can begin accepting connections.
257    */
258   void attachEventBase(EventBase *eventBase);
259
260   /**
261    * Detach the AsyncServerSocket from its primary EventBase.
262    *
263    * detachEventBase() may only be called if the AsyncServerSocket is not
264    * currently accepting connections.
265    */
266   void detachEventBase();
267
268   /**
269    * Get the EventBase used by this socket.
270    */
271   EventBase* getEventBase() const override {
272     return eventBase_;
273   }
274
275   /**
276    * Create a AsyncServerSocket from an existing socket file descriptor.
277    *
278    * useExistingSocket() will cause the AsyncServerSocket to take ownership of
279    * the specified file descriptor, and use it to listen for new connections.
280    * The AsyncServerSocket will close the file descriptor when it is
281    * destroyed.
282    *
283    * useExistingSocket() must be called before bind() or listen().
284    *
285    * The supplied file descriptor will automatically be put into non-blocking
286    * mode.  The caller may have already directly called bind() and possibly
287    * listen on the file descriptor.  If so the caller should skip calling the
288    * corresponding AsyncServerSocket::bind() and listen() methods.
289    *
290    * On error a TTransportException will be thrown and the caller will retain
291    * ownership of the file descriptor.
292    */
293   void useExistingSocket(int fd);
294   void useExistingSockets(const std::vector<int>& fds);
295
296   /**
297    * Return the underlying file descriptor
298    */
299   std::vector<int> getSockets() const {
300     std::vector<int> sockets;
301     for (auto& handler : sockets_) {
302       sockets.push_back(handler.socket_);
303     }
304     return sockets;
305   }
306
307   /**
308    * Backwards compatible getSocket, warns if > 1 socket
309    */
310   int getSocket() const {
311     if (sockets_.size() > 1) {
312       VLOG(2) << "Warning: getSocket can return multiple fds, " <<
313         "but getSockets was not called, so only returning the first";
314     }
315     if (sockets_.size() == 0) {
316       return -1;
317     } else {
318       return sockets_[0].socket_;
319     }
320   }
321
322   /* enable zerocopy support for the server sockets - the s = accept sockets
323    * inherit it
324    */
325   bool setZeroCopy(bool enable);
326
327   /**
328    * Bind to the specified address.
329    *
330    * This must be called from the primary EventBase thread.
331    *
332    * Throws TTransportException on error.
333    */
334   virtual void bind(const SocketAddress& address);
335
336   /**
337    * Bind to the specified port for the specified addresses.
338    *
339    * This must be called from the primary EventBase thread.
340    *
341    * Throws TTransportException on error.
342    */
343   virtual void bind(
344       const std::vector<IPAddress>& ipAddresses,
345       uint16_t port);
346
347   /**
348    * Bind to the specified port.
349    *
350    * This must be called from the primary EventBase thread.
351    *
352    * Throws TTransportException on error.
353    */
354   virtual void bind(uint16_t port);
355
356   /**
357    * Get the local address to which the socket is bound.
358    *
359    * Throws TTransportException on error.
360    */
361   void getAddress(SocketAddress* addressReturn) const override;
362
363   /**
364    * Get the local address to which the socket is bound.
365    *
366    * Throws TTransportException on error.
367    */
368   SocketAddress getAddress() const {
369     SocketAddress ret;
370     getAddress(&ret);
371     return ret;
372   }
373
374   /**
375    * Get all the local addresses to which the socket is bound.
376    *
377    * Throws TTransportException on error.
378    */
379   std::vector<SocketAddress> getAddresses() const;
380
381   /**
382    * Begin listening for connections.
383    *
384    * This calls ::listen() with the specified backlog.
385    *
386    * Once listen() is invoked the socket will actually be open so that remote
387    * clients may establish connections.  (Clients that attempt to connect
388    * before listen() is called will receive a connection refused error.)
389    *
390    * At least one callback must be set and startAccepting() must be called to
391    * actually begin notifying the accept callbacks of newly accepted
392    * connections.  The backlog parameter controls how many connections the
393    * kernel will accept and buffer internally while the accept callbacks are
394    * paused (or if accepting is enabled but the callbacks cannot keep up).
395    *
396    * bind() must be called before calling listen().
397    * listen() must be called from the primary EventBase thread.
398    *
399    * Throws TTransportException on error.
400    */
401   virtual void listen(int backlog);
402
403   /**
404    * Add an AcceptCallback.
405    *
406    * When a new socket is accepted, one of the AcceptCallbacks will be invoked
407    * with the new socket.  The AcceptCallbacks are invoked in a round-robin
408    * fashion.  This allows the accepted sockets to be distributed among a pool
409    * of threads, each running its own EventBase object.  This is a common model,
410    * since most asynchronous-style servers typically run one EventBase thread
411    * per CPU.
412    *
413    * The EventBase object associated with each AcceptCallback must be running
414    * its loop.  If the EventBase loop is not running, sockets will still be
415    * scheduled for the callback, but the callback cannot actually get invoked
416    * until the loop runs.
417    *
418    * This method must be invoked from the AsyncServerSocket's primary
419    * EventBase thread.
420    *
421    * Note that startAccepting() must be called on the AsyncServerSocket to
422    * cause it to actually start accepting sockets once callbacks have been
423    * installed.
424    *
425    * @param callback   The callback to invoke.
426    * @param eventBase  The EventBase to use to invoke the callback.  This
427    *     parameter may be nullptr, in which case the callback will be invoked in
428    *     the AsyncServerSocket's primary EventBase.
429    * @param maxAtOnce  The maximum number of connections to accept in this
430    *                   callback on a single iteration of the event base loop.
431    *                   This only takes effect when eventBase is non-nullptr.
432    *                   When using a nullptr eventBase for the callback, the
433    *                   setMaxAcceptAtOnce() method controls how many
434    *                   connections the main event base will accept at once.
435    */
436   virtual void addAcceptCallback(
437     AcceptCallback *callback,
438     EventBase *eventBase,
439     uint32_t maxAtOnce = kDefaultCallbackAcceptAtOnce);
440
441   /**
442    * Remove an AcceptCallback.
443    *
444    * This allows a single AcceptCallback to be removed from the round-robin
445    * pool.
446    *
447    * This method must be invoked from the AsyncServerSocket's primary
448    * EventBase thread.  Use EventBase::runInEventBaseThread() to schedule the
449    * operation in the correct EventBase if your code is not in the server
450    * socket's primary EventBase.
451    *
452    * Given that the accept callback is being driven by a different EventBase,
453    * the AcceptCallback may continue to be invoked for a short period of time
454    * after removeAcceptCallback() returns in this thread.  Once the other
455    * EventBase thread receives the notification to stop, it will call
456    * acceptStopped() on the callback to inform it that it is fully stopped and
457    * will not receive any new sockets.
458    *
459    * If the last accept callback is removed while the socket is accepting,
460    * the socket will implicitly pause accepting.  If a callback is later added,
461    * it will resume accepting immediately, without requiring startAccepting()
462    * to be invoked.
463    *
464    * @param callback   The callback to uninstall.
465    * @param eventBase  The EventBase associated with this callback.  This must
466    *     be the same EventBase that was used when the callback was installed
467    *     with addAcceptCallback().
468    */
469   void removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase);
470
471   /**
472    * Begin accepting connctions on this socket.
473    *
474    * bind() and listen() must be called before calling startAccepting().
475    *
476    * When a AsyncServerSocket is initially created, it will not begin
477    * accepting connections until at least one callback has been added and
478    * startAccepting() has been called.  startAccepting() can also be used to
479    * resume accepting connections after a call to pauseAccepting().
480    *
481    * If startAccepting() is called when there are no accept callbacks
482    * installed, the socket will not actually begin accepting until an accept
483    * callback is added.
484    *
485    * This method may only be called from the primary EventBase thread.
486    */
487   virtual void startAccepting();
488
489   /**
490    * Pause accepting connections.
491    *
492    * startAccepting() may be called to resume accepting.
493    *
494    * This method may only be called from the primary EventBase thread.
495    * If there are AcceptCallbacks being driven by other EventBase threads they
496    * may continue to receive callbacks for a short period of time after
497    * pauseAccepting() returns.
498    *
499    * Unlike removeAcceptCallback() or destroy(), acceptStopped() will not be
500    * called on the AcceptCallback objects simply due to a temporary pause.  If
501    * the server socket is later destroyed while paused, acceptStopped() will be
502    * called all of the installed AcceptCallbacks.
503    */
504   void pauseAccepting();
505
506   /**
507    * Shutdown the listen socket and notify all callbacks that accept has
508    * stopped, but don't close the socket.  This invokes shutdown(2) with the
509    * supplied argument.  Passing -1 will close the socket now.  Otherwise, the
510    * close will be delayed until this object is destroyed.
511    *
512    * Only use this if you have reason to pass special flags to shutdown.
513    * Otherwise just destroy the socket.
514    *
515    * This method has no effect when a ShutdownSocketSet option is used.
516    *
517    * Returns the result of shutdown on sockets_[n-1]
518    */
519   int stopAccepting(int shutdownFlags = -1);
520
521   /**
522    * Get the maximum number of connections that will be accepted each time
523    * around the event loop.
524    */
525   uint32_t getMaxAcceptAtOnce() const {
526     return maxAcceptAtOnce_;
527   }
528
529   /**
530    * Set the maximum number of connections that will be accepted each time
531    * around the event loop.
532    *
533    * This provides a very coarse-grained way of controlling how fast the
534    * AsyncServerSocket will accept connections.  If you find that when your
535    * server is overloaded AsyncServerSocket accepts connections more quickly
536    * than your code can process them, you can try lowering this number so that
537    * fewer connections will be accepted each event loop iteration.
538    *
539    * For more explicit control over the accept rate, you can also use
540    * pauseAccepting() to temporarily pause accepting when your server is
541    * overloaded, and then use startAccepting() later to resume accepting.
542    */
543   void setMaxAcceptAtOnce(uint32_t numConns) {
544     maxAcceptAtOnce_ = numConns;
545   }
546
547   /**
548    * Get the maximum number of unprocessed messages which a NotificationQueue
549    * can hold.
550    */
551   uint32_t getMaxNumMessagesInQueue() const {
552     return maxNumMsgsInQueue_;
553   }
554
555   /**
556    * Set the maximum number of unprocessed messages in NotificationQueue.
557    * No new message will be sent to that NotificationQueue if there are more
558    * than such number of unprocessed messages in that queue.
559    *
560    * Only works if called before addAcceptCallback.
561    */
562   void setMaxNumMessagesInQueue(uint32_t num) {
563     maxNumMsgsInQueue_ = num;
564   }
565
566   /**
567    * Get the speed of adjusting connection accept rate.
568    */
569   double getAcceptRateAdjustSpeed() const {
570     return acceptRateAdjustSpeed_;
571   }
572
573   /**
574    * Set the speed of adjusting connection accept rate.
575    */
576   void setAcceptRateAdjustSpeed(double speed) {
577     acceptRateAdjustSpeed_ = speed;
578   }
579
580   /**
581    * Get the number of connections dropped by the AsyncServerSocket
582    */
583   uint64_t getNumDroppedConnections() const {
584     return numDroppedConnections_;
585   }
586
587   /**
588    * Get the current number of unprocessed messages in NotificationQueue.
589    *
590    * This method must be invoked from the AsyncServerSocket's primary
591    * EventBase thread.  Use EventBase::runInEventBaseThread() to schedule the
592    * operation in the correct EventBase if your code is not in the server
593    * socket's primary EventBase.
594    */
595   int64_t getNumPendingMessagesInQueue() const {
596     if (eventBase_) {
597       eventBase_->dcheckIsInEventBaseThread();
598     }
599     int64_t numMsgs = 0;
600     for (const auto& callback : callbacks_) {
601       numMsgs += callback.consumer->getQueue()->size();
602     }
603     return numMsgs;
604   }
605
606   /**
607    * Set whether or not SO_KEEPALIVE should be enabled on the server socket
608    * (and thus on all subsequently-accepted connections). By default, keepalive
609    * is enabled.
610    *
611    * Note that TCP keepalive usually only kicks in after the connection has
612    * been idle for several hours. Applications should almost always have their
613    * own, shorter idle timeout.
614    */
615   void setKeepAliveEnabled(bool enabled) {
616     keepAliveEnabled_ = enabled;
617
618     for (auto& handler : sockets_) {
619       if (handler.socket_ < 0) {
620         continue;
621       }
622
623       int val = (enabled) ? 1 : 0;
624       if (setsockopt(handler.socket_, SOL_SOCKET,
625                      SO_KEEPALIVE, &val, sizeof(val)) != 0) {
626         LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: %s" <<
627                 strerror(errno);
628       }
629     }
630   }
631
632   /**
633    * Get whether or not SO_KEEPALIVE is enabled on the server socket.
634    */
635   bool getKeepAliveEnabled() const {
636     return keepAliveEnabled_;
637   }
638
639   /**
640    * Set whether or not SO_REUSEPORT should be enabled on the server socket,
641    * allowing multiple binds to the same port
642    */
643   void setReusePortEnabled(bool enabled) {
644     reusePortEnabled_ = enabled;
645
646     for (auto& handler : sockets_) {
647       if (handler.socket_ < 0) {
648         continue;
649       }
650
651       int val = (enabled) ? 1 : 0;
652       if (setsockopt(handler.socket_, SOL_SOCKET,
653                      SO_REUSEPORT, &val, sizeof(val)) != 0) {
654         LOG(ERROR) <<
655           "failed to set SO_REUSEPORT on async server socket " << errno;
656         folly::throwSystemError(errno,
657                                 "failed to bind to async server socket");
658       }
659     }
660   }
661
662   /**
663    * Get whether or not SO_REUSEPORT is enabled on the server socket.
664    */
665   bool getReusePortEnabled_() const {
666     return reusePortEnabled_;
667   }
668
669   /**
670    * Set whether or not the socket should close during exec() (FD_CLOEXEC). By
671    * default, this is enabled
672    */
673   void setCloseOnExec(bool closeOnExec) {
674     closeOnExec_ = closeOnExec;
675   }
676
677   /**
678    * Get whether or not FD_CLOEXEC is enabled on the server socket.
679    */
680   bool getCloseOnExec() const {
681     return closeOnExec_;
682   }
683
684   /**
685    * Tries to enable TFO if the machine supports it.
686    */
687   void setTFOEnabled(bool enabled, uint32_t maxTFOQueueSize) {
688     tfo_ = enabled;
689     tfoMaxQueueSize_ = maxTFOQueueSize;
690   }
691
692   /**
693    * Do not attempt the transparent TLS handshake
694    */
695   void disableTransparentTls() {
696     noTransparentTls_ = true;
697   }
698
699   /**
700    * Get whether or not the socket is accepting new connections
701    */
702   bool getAccepting() const {
703     return accepting_;
704   }
705
706   /**
707    * Set the ConnectionEventCallback
708    */
709   void setConnectionEventCallback(
710       ConnectionEventCallback* const connectionEventCallback) {
711     connectionEventCallback_ = connectionEventCallback;
712   }
713
714   /**
715    * Get the ConnectionEventCallback
716    */
717   ConnectionEventCallback* getConnectionEventCallback() const {
718     return connectionEventCallback_;
719   }
720
721  protected:
722   /**
723    * Protected destructor.
724    *
725    * Invoke destroy() instead to destroy the AsyncServerSocket.
726    */
727   ~AsyncServerSocket() override;
728
729  private:
730   enum class MessageType {
731     MSG_NEW_CONN = 0,
732     MSG_ERROR = 1
733   };
734
735   struct QueueMessage {
736     MessageType type;
737     int fd;
738     int err;
739     SocketAddress address;
740     std::string msg;
741   };
742
743   /**
744    * A class to receive notifications to invoke AcceptCallback objects
745    * in other EventBase threads.
746    *
747    * A RemoteAcceptor object is created for each AcceptCallback that
748    * is installed in a separate EventBase thread.  The RemoteAcceptor
749    * receives notification of new sockets via a NotificationQueue,
750    * and then invokes the AcceptCallback.
751    */
752   class RemoteAcceptor
753       : private NotificationQueue<QueueMessage>::Consumer {
754    public:
755     explicit RemoteAcceptor(AcceptCallback *callback,
756                             ConnectionEventCallback *connectionEventCallback)
757       : callback_(callback),
758         connectionEventCallback_(connectionEventCallback) {}
759
760     ~RemoteAcceptor() override = default;
761
762     void start(EventBase *eventBase, uint32_t maxAtOnce, uint32_t maxInQueue);
763     void stop(EventBase* eventBase, AcceptCallback* callback);
764
765     void messageAvailable(QueueMessage&& message) noexcept override;
766
767     NotificationQueue<QueueMessage>* getQueue() {
768       return &queue_;
769     }
770
771    private:
772     AcceptCallback *callback_;
773     ConnectionEventCallback* connectionEventCallback_;
774
775     NotificationQueue<QueueMessage> queue_;
776   };
777
778   /**
779    * A struct to keep track of the callbacks associated with this server
780    * socket.
781    */
782   struct CallbackInfo {
783     CallbackInfo(AcceptCallback *cb, EventBase *evb)
784       : callback(cb),
785         eventBase(evb),
786         consumer(nullptr) {}
787
788     AcceptCallback *callback;
789     EventBase *eventBase;
790
791     RemoteAcceptor* consumer;
792   };
793
794   class BackoffTimeout;
795
796   virtual void handlerReady(
797     uint16_t events, int socket, sa_family_t family) noexcept;
798
799   int createSocket(int family);
800   void setupSocket(int fd, int family);
801   void bindSocket(int fd, const SocketAddress& address, bool isExistingSocket);
802   void dispatchSocket(int socket, SocketAddress&& address);
803   void dispatchError(const char *msg, int errnoValue);
804   void enterBackoff();
805   void backoffTimeoutExpired();
806
807   CallbackInfo* nextCallback() {
808     CallbackInfo* info = &callbacks_[callbackIndex_];
809
810     ++callbackIndex_;
811     if (callbackIndex_ >= callbacks_.size()) {
812       callbackIndex_ = 0;
813     }
814
815     return info;
816   }
817
818   struct ServerEventHandler : public EventHandler {
819     ServerEventHandler(EventBase* eventBase, int socket,
820                        AsyncServerSocket* parent,
821                       sa_family_t addressFamily)
822         : EventHandler(eventBase, socket)
823         , eventBase_(eventBase)
824         , socket_(socket)
825         , parent_(parent)
826         , addressFamily_(addressFamily) {}
827
828     ServerEventHandler(const ServerEventHandler& other)
829     : EventHandler(other.eventBase_, other.socket_)
830     , eventBase_(other.eventBase_)
831     , socket_(other.socket_)
832     , parent_(other.parent_)
833     , addressFamily_(other.addressFamily_) {}
834
835     ServerEventHandler& operator=(
836         const ServerEventHandler& other) {
837       if (this != &other) {
838         eventBase_ = other.eventBase_;
839         socket_ = other.socket_;
840         parent_ = other.parent_;
841         addressFamily_ = other.addressFamily_;
842
843         detachEventBase();
844         attachEventBase(other.eventBase_);
845         changeHandlerFD(other.socket_);
846       }
847       return *this;
848     }
849
850     // Inherited from EventHandler
851     void handlerReady(uint16_t events) noexcept override {
852       parent_->handlerReady(events, socket_, addressFamily_);
853     }
854
855     EventBase* eventBase_;
856     int socket_;
857     AsyncServerSocket* parent_;
858     sa_family_t addressFamily_;
859   };
860
861   EventBase *eventBase_;
862   std::vector<ServerEventHandler> sockets_;
863   std::vector<int> pendingCloseSockets_;
864   bool accepting_;
865   uint32_t maxAcceptAtOnce_;
866   uint32_t maxNumMsgsInQueue_;
867   double acceptRateAdjustSpeed_;  //0 to disable auto adjust
868   double acceptRate_;
869   std::chrono::time_point<std::chrono::steady_clock> lastAccepTimestamp_;
870   uint64_t numDroppedConnections_;
871   uint32_t callbackIndex_;
872   BackoffTimeout *backoffTimeout_;
873   std::vector<CallbackInfo> callbacks_;
874   bool keepAliveEnabled_;
875   bool reusePortEnabled_{false};
876   bool closeOnExec_;
877   bool tfo_{false};
878   bool noTransparentTls_{false};
879   uint32_t tfoMaxQueueSize_{0};
880   std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_;
881   ConnectionEventCallback* connectionEventCallback_{nullptr};
882 };
883
884 } // namespace folly