move asyncserversocket to folly
[folly.git] / folly / io / async / AsyncServerSocket.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/io/async/DelayedDestruction.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/NotificationQueue.h>
23 #include <folly/io/async/AsyncTimeout.h>
24 #include <folly/io/ShutdownSocketSet.h>
25 #include <folly/SocketAddress.h>
26 #include <memory>
27 #include <exception>
28 #include <vector>
29 #include <limits.h>
30 #include <stddef.h>
31 #include <sys/socket.h>
32
33 namespace folly {
34
35 /**
36  * A listening socket that asynchronously informs a callback whenever a new
37  * connection has been accepted.
38  *
39  * Unlike most async interfaces that always invoke their callback in the same
40  * EventBase thread, AsyncServerSocket is unusual in that it can distribute
41  * the callbacks across multiple EventBase threads.
42  *
43  * This supports a common use case for network servers to distribute incoming
44  * connections across a number of EventBase threads.  (Servers typically run
45  * with one EventBase thread per CPU.)
46  *
47  * Despite being able to invoke callbacks in multiple EventBase threads,
48  * AsyncServerSocket still has one "primary" EventBase.  Operations that
49  * modify the AsyncServerSocket state may only be performed from the primary
50  * EventBase thread.
51  */
52 class AsyncServerSocket : public DelayedDestruction {
53  public:
54   typedef std::unique_ptr<AsyncServerSocket, Destructor> UniquePtr;
55
56   class AcceptCallback {
57    public:
58     virtual ~AcceptCallback() {}
59
60     /**
61      * connectionAccepted() is called whenever a new client connection is
62      * received.
63      *
64      * The AcceptCallback will remain installed after connectionAccepted()
65      * returns.
66      *
67      * @param fd          The newly accepted client socket.  The AcceptCallback
68      *                    assumes ownership of this socket, and is responsible
69      *                    for closing it when done.  The newly accepted file
70      *                    descriptor will have already been put into
71      *                    non-blocking mode.
72      * @param clientAddr  A reference to a TSocketAddress struct containing the
73      *                    client's address.  This struct is only guaranteed to
74      *                    remain valid until connectionAccepted() returns.
75      */
76     virtual void connectionAccepted(int fd,
77                                     const SocketAddress& clientAddr)
78       noexcept = 0;
79
80     /**
81      * acceptError() is called if an error occurs while accepting.
82      *
83      * The AcceptCallback will remain installed even after an accept error,
84      * as the errors are typically somewhat transient, such as being out of
85      * file descriptors.  The server socket must be explicitly stopped if you
86      * wish to stop accepting after an error.
87      *
88      * @param ex  An exception representing the error.
89      */
90     virtual void acceptError(const std::exception& ex) noexcept = 0;
91
92     /**
93      * acceptStarted() will be called in the callback's EventBase thread
94      * after this callback has been added to the AsyncServerSocket.
95      *
96      * acceptStarted() will be called before any calls to connectionAccepted()
97      * or acceptError() are made on this callback.
98      *
99      * acceptStarted() makes it easier for callbacks to perform initialization
100      * inside the callback thread.  (The call to addAcceptCallback() must
101      * always be made from the AsyncServerSocket's primary EventBase thread.
102      * acceptStarted() provides a hook that will always be invoked in the
103      * callback's thread.)
104      *
105      * Note that the call to acceptStarted() is made once the callback is
106      * added, regardless of whether or not the AsyncServerSocket is actually
107      * accepting at the moment.  acceptStarted() will be called even if the
108      * AsyncServerSocket is paused when the callback is added (including if
109      * the initial call to startAccepting() on the AsyncServerSocket has not
110      * been made yet).
111      */
112     virtual void acceptStarted() noexcept {}
113
114     /**
115      * acceptStopped() will be called when this AcceptCallback is removed from
116      * the AsyncServerSocket, or when the AsyncServerSocket is destroyed,
117      * whichever occurs first.
118      *
119      * No more calls to connectionAccepted() or acceptError() will be made
120      * after acceptStopped() is invoked.
121      */
122     virtual void acceptStopped() noexcept {}
123   };
124
125   static const uint32_t kDefaultMaxAcceptAtOnce = 30;
126   static const uint32_t kDefaultCallbackAcceptAtOnce = 5;
127   static const uint32_t kDefaultMaxMessagesInQueue = 0;
128   /**
129    * Create a new AsyncServerSocket with the specified EventBase.
130    *
131    * @param eventBase  The EventBase to use for driving the asynchronous I/O.
132    *                   If this parameter is nullptr, attachEventBase() must be
133    *                   called before this socket can begin accepting
134    *                   connections.
135    */
136   explicit AsyncServerSocket(EventBase* eventBase = nullptr);
137
138   /**
139    * Helper function to create a shared_ptr<AsyncServerSocket>.
140    *
141    * This passes in the correct destructor object, since AsyncServerSocket's
142    * destructor is protected and cannot be invoked directly.
143    */
144   static std::shared_ptr<AsyncServerSocket>
145   newSocket(EventBase* evb = nullptr) {
146     return std::shared_ptr<AsyncServerSocket>(new AsyncServerSocket(evb),
147                                                  Destructor());
148   }
149
150   void setShutdownSocketSet(ShutdownSocketSet* newSS);
151
152   /**
153    * Destroy the socket.
154    *
155    * AsyncServerSocket::destroy() must be called to destroy the socket.
156    * The normal destructor is private, and should not be invoked directly.
157    * This prevents callers from deleting a AsyncServerSocket while it is
158    * invoking a callback.
159    *
160    * destroy() must be invoked from the socket's primary EventBase thread.
161    *
162    * If there are AcceptCallbacks still installed when destroy() is called,
163    * acceptStopped() will be called on these callbacks to notify them that
164    * accepting has stopped.  Accept callbacks being driven by other EventBase
165    * threads may continue to receive new accept callbacks for a brief period of
166    * time after destroy() returns.  They will not receive any more callback
167    * invocations once acceptStopped() is invoked.
168    */
169   virtual void destroy();
170
171   /**
172    * Attach this AsyncServerSocket to its primary EventBase.
173    *
174    * This may only be called if the AsyncServerSocket is not already attached
175    * to a EventBase.  The AsyncServerSocket must be attached to a EventBase
176    * before it can begin accepting connections.
177    */
178   void attachEventBase(EventBase *eventBase);
179
180   /**
181    * Detach the AsyncServerSocket from its primary EventBase.
182    *
183    * detachEventBase() may only be called if the AsyncServerSocket is not
184    * currently accepting connections.
185    */
186   void detachEventBase();
187
188   /**
189    * Get the EventBase used by this socket.
190    */
191   EventBase* getEventBase() const {
192     return eventBase_;
193   }
194
195   /**
196    * Create a AsyncServerSocket from an existing socket file descriptor.
197    *
198    * useExistingSocket() will cause the AsyncServerSocket to take ownership of
199    * the specified file descriptor, and use it to listen for new connections.
200    * The AsyncServerSocket will close the file descriptor when it is
201    * destroyed.
202    *
203    * useExistingSocket() must be called before bind() or listen().
204    *
205    * The supplied file descriptor will automatically be put into non-blocking
206    * mode.  The caller may have already directly called bind() and possibly
207    * listen on the file descriptor.  If so the caller should skip calling the
208    * corresponding AsyncServerSocket::bind() and listen() methods.
209    *
210    * On error a TTransportException will be thrown and the caller will retain
211    * ownership of the file descriptor.
212    */
213   void useExistingSocket(int fd);
214   void useExistingSockets(const std::vector<int>& fds);
215
216   /**
217    * Return the underlying file descriptor
218    */
219   std::vector<int> getSockets() const {
220     std::vector<int> sockets;
221     for (auto& handler : sockets_) {
222       sockets.push_back(handler.socket_);
223     }
224     return sockets;
225   }
226
227   /**
228    * Backwards compatible getSocket, warns if > 1 socket
229    */
230   int getSocket() const {
231     if (sockets_.size() > 1) {
232       VLOG(2) << "Warning: getSocket can return multiple fds, " <<
233         "but getSockets was not called, so only returning the first";
234     }
235     if (sockets_.size() == 0) {
236       return -1;
237     } else {
238       return sockets_[0].socket_;
239     }
240   }
241
242   /**
243    * Bind to the specified address.
244    *
245    * This must be called from the primary EventBase thread.
246    *
247    * Throws TTransportException on error.
248    */
249   virtual void bind(const SocketAddress& address);
250
251   /**
252    * Bind to the specified port.
253    *
254    * This must be called from the primary EventBase thread.
255    *
256    * Throws TTransportException on error.
257    */
258   virtual void bind(uint16_t port);
259
260   /**
261    * Get the local address to which the socket is bound.
262    *
263    * Throws TTransportException on error.
264    */
265   void getAddress(SocketAddress* addressReturn) const;
266
267   /**
268    * Get all the local addresses to which the socket is bound.
269    *
270    * Throws TTransportException on error.
271    */
272   std::vector<SocketAddress> getAddresses() const;
273
274   /**
275    * Begin listening for connections.
276    *
277    * This calls ::listen() with the specified backlog.
278    *
279    * Once listen() is invoked the socket will actually be open so that remote
280    * clients may establish connections.  (Clients that attempt to connect
281    * before listen() is called will receive a connection refused error.)
282    *
283    * At least one callback must be set and startAccepting() must be called to
284    * actually begin notifying the accept callbacks of newly accepted
285    * connections.  The backlog parameter controls how many connections the
286    * kernel will accept and buffer internally while the accept callbacks are
287    * paused (or if accepting is enabled but the callbacks cannot keep up).
288    *
289    * bind() must be called before calling listen().
290    * listen() must be called from the primary EventBase thread.
291    *
292    * Throws TTransportException on error.
293    */
294   virtual void listen(int backlog);
295
296   /**
297    * Add an AcceptCallback.
298    *
299    * When a new socket is accepted, one of the AcceptCallbacks will be invoked
300    * with the new socket.  The AcceptCallbacks are invoked in a round-robin
301    * fashion.  This allows the accepted sockets to distributed among a pool of
302    * threads, each running its own EventBase object.  This is a common model,
303    * since most asynchronous-style servers typically run one EventBase thread
304    * per CPU.
305    *
306    * The EventBase object associated with each AcceptCallback must be running
307    * its loop.  If the EventBase loop is not running, sockets will still be
308    * scheduled for the callback, but the callback cannot actually get invoked
309    * until the loop runs.
310    *
311    * This method must be invoked from the AsyncServerSocket's primary
312    * EventBase thread.
313    *
314    * Note that startAccepting() must be called on the AsyncServerSocket to
315    * cause it to actually start accepting sockets once callbacks have been
316    * installed.
317    *
318    * @param callback   The callback to invoke.
319    * @param eventBase  The EventBase to use to invoke the callback.  This
320    *     parameter may be nullptr, in which case the callback will be invoked in
321    *     the AsyncServerSocket's primary EventBase.
322    * @param maxAtOnce  The maximum number of connections to accept in this
323    *                   callback on a single iteration of the event base loop.
324    *                   This only takes effect when eventBase is non-nullptr.
325    *                   When using a nullptr eventBase for the callback, the
326    *                   setMaxAcceptAtOnce() method controls how many
327    *                   connections the main event base will accept at once.
328    */
329   virtual void addAcceptCallback(
330     AcceptCallback *callback,
331     EventBase *eventBase,
332     uint32_t maxAtOnce = kDefaultCallbackAcceptAtOnce);
333
334   /**
335    * Remove an AcceptCallback.
336    *
337    * This allows a single AcceptCallback to be removed from the round-robin
338    * pool.
339    *
340    * This method must be invoked from the AsyncServerSocket's primary
341    * EventBase thread.  Use EventBase::runInEventBaseThread() to schedule the
342    * operation in the correct EventBase if your code is not in the server
343    * socket's primary EventBase.
344    *
345    * Given that the accept callback is being driven by a different EventBase,
346    * the AcceptCallback may continue to be invoked for a short period of time
347    * after removeAcceptCallback() returns in this thread.  Once the other
348    * EventBase thread receives the notification to stop, it will call
349    * acceptStopped() on the callback to inform it that it is fully stopped and
350    * will not receive any new sockets.
351    *
352    * If the last accept callback is removed while the socket is accepting,
353    * the socket will implicitly pause accepting.  If a callback is later added,
354    * it will resume accepting immediately, without requiring startAccepting()
355    * to be invoked.
356    *
357    * @param callback   The callback to uninstall.
358    * @param eventBase  The EventBase associated with this callback.  This must
359    *     be the same EventBase that was used when the callback was installed
360    *     with addAcceptCallback().
361    */
362   void removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase);
363
364   /**
365    * Begin accepting connctions on this socket.
366    *
367    * bind() and listen() must be called before calling startAccepting().
368    *
369    * When a AsyncServerSocket is initially created, it will not begin
370    * accepting connections until at least one callback has been added and
371    * startAccepting() has been called.  startAccepting() can also be used to
372    * resume accepting connections after a call to pauseAccepting().
373    *
374    * If startAccepting() is called when there are no accept callbacks
375    * installed, the socket will not actually begin accepting until an accept
376    * callback is added.
377    *
378    * This method may only be called from the primary EventBase thread.
379    */
380   virtual void startAccepting();
381
382   /**
383    * Pause accepting connections.
384    *
385    * startAccepting() may be called to resume accepting.
386    *
387    * This method may only be called from the primary EventBase thread.
388    * If there are AcceptCallbacks being driven by other EventBase threads they
389    * may continue to receive callbacks for a short period of time after
390    * pauseAccepting() returns.
391    *
392    * Unlike removeAcceptCallback() or destroy(), acceptStopped() will not be
393    * called on the AcceptCallback objects simply due to a temporary pause.  If
394    * the server socket is later destroyed while paused, acceptStopped() will be
395    * called all of the installed AcceptCallbacks.
396    */
397   void pauseAccepting();
398
399   /**
400    * Shutdown the listen socket and notify all callbacks that accept has
401    * stopped, but don't close the socket.  This invokes shutdown(2) with the
402    * supplied argument.  Passing -1 will close the socket now.  Otherwise, the
403    * close will be delayed until this object is destroyed.
404    *
405    * Only use this if you have reason to pass special flags to shutdown.
406    * Otherwise just destroy the socket.
407    *
408    * This method has no effect when a ShutdownSocketSet option is used.
409    *
410    * Returns the result of shutdown on sockets_[n-1]
411    */
412   int stopAccepting(int shutdownFlags = -1);
413
414   /**
415    * Get the maximum number of connections that will be accepted each time
416    * around the event loop.
417    */
418   uint32_t getMaxAcceptAtOnce() const {
419     return maxAcceptAtOnce_;
420   }
421
422   /**
423    * Set the maximum number of connections that will be accepted each time
424    * around the event loop.
425    *
426    * This provides a very coarse-grained way of controlling how fast the
427    * AsyncServerSocket will accept connections.  If you find that when your
428    * server is overloaded AsyncServerSocket accepts connections more quickly
429    * than your code can process them, you can try lowering this number so that
430    * fewer connections will be accepted each event loop iteration.
431    *
432    * For more explicit control over the accept rate, you can also use
433    * pauseAccepting() to temporarily pause accepting when your server is
434    * overloaded, and then use startAccepting() later to resume accepting.
435    */
436   void setMaxAcceptAtOnce(uint32_t numConns) {
437     maxAcceptAtOnce_ = numConns;
438   }
439
440   /**
441    * Get the maximum number of unprocessed messages which a NotificationQueue
442    * can hold.
443    */
444   uint32_t getMaxNumMessagesInQueue() const {
445     return maxNumMsgsInQueue_;
446   }
447
448   /**
449    * Set the maximum number of unprocessed messages in NotificationQueue.
450    * No new message will be sent to that NotificationQueue if there are more
451    * than such number of unprocessed messages in that queue.
452    *
453    * Only works if called before addAcceptCallback.
454    */
455   void setMaxNumMessagesInQueue(uint32_t num) {
456     maxNumMsgsInQueue_ = num;
457   }
458
459   /**
460    * Get the speed of adjusting connection accept rate.
461    */
462   double getAcceptRateAdjustSpeed() const {
463     return acceptRateAdjustSpeed_;
464   }
465
466   /**
467    * Set the speed of adjusting connection accept rate.
468    */
469   void setAcceptRateAdjustSpeed(double speed) {
470     acceptRateAdjustSpeed_ = speed;
471   }
472
473   /**
474    * Get the number of connections dropped by the AsyncServerSocket
475    */
476   uint64_t getNumDroppedConnections() const {
477     return numDroppedConnections_;
478   }
479
480   /**
481    * Set whether or not SO_KEEPALIVE should be enabled on the server socket
482    * (and thus on all subsequently-accepted connections). By default, keepalive
483    * is enabled.
484    *
485    * Note that TCP keepalive usually only kicks in after the connection has
486    * been idle for several hours. Applications should almost always have their
487    * own, shorter idle timeout.
488    */
489   void setKeepAliveEnabled(bool enabled) {
490     keepAliveEnabled_ = enabled;
491
492     for (auto& handler : sockets_) {
493       if (handler.socket_ < 0) {
494         continue;
495       }
496
497       int val = (enabled) ? 1 : 0;
498       if (setsockopt(handler.socket_, SOL_SOCKET,
499                      SO_KEEPALIVE, &val, sizeof(val)) != 0) {
500         LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: %s" <<
501                 strerror(errno);
502       }
503     }
504   }
505
506   /**
507    * Get whether or not SO_KEEPALIVE is enabled on the server socket.
508    */
509   bool getKeepAliveEnabled() const {
510     return keepAliveEnabled_;
511   }
512
513   /**
514    * Set whether or not the socket should close during exec() (FD_CLOEXEC). By
515    * default, this is enabled
516    */
517   void setCloseOnExec(bool closeOnExec) {
518     closeOnExec_ = closeOnExec;
519   }
520
521   /**
522    * Get whether or not FD_CLOEXEC is enabled on the server socket.
523    */
524   bool getCloseOnExec() const {
525     return closeOnExec_;
526   }
527
528  protected:
529   /**
530    * Protected destructor.
531    *
532    * Invoke destroy() instead to destroy the AsyncServerSocket.
533    */
534   virtual ~AsyncServerSocket();
535
536  private:
537   enum class MessageType {
538     MSG_NEW_CONN = 0,
539     MSG_ERROR = 1
540   };
541
542   struct QueueMessage {
543     MessageType type;
544     int fd;
545     int err;
546     SocketAddress address;
547     std::string msg;
548   };
549
550   /**
551    * A class to receive notifications to invoke AcceptCallback objects
552    * in other EventBase threads.
553    *
554    * A RemoteAcceptor object is created for each AcceptCallback that
555    * is installed in a separate EventBase thread.  The RemoteAcceptor
556    * receives notification of new sockets via a NotificationQueue,
557    * and then invokes the AcceptCallback.
558    */
559   class RemoteAcceptor
560       : private NotificationQueue<QueueMessage>::Consumer {
561   public:
562     explicit RemoteAcceptor(AcceptCallback *callback)
563       : callback_(callback) {}
564
565     ~RemoteAcceptor() {}
566
567     void start(EventBase *eventBase, uint32_t maxAtOnce, uint32_t maxInQueue);
568     void stop(EventBase* eventBase, AcceptCallback* callback);
569
570     virtual void messageAvailable(QueueMessage&& message);
571
572     NotificationQueue<QueueMessage>* getQueue() {
573       return &queue_;
574     }
575
576   private:
577     AcceptCallback *callback_;
578
579     NotificationQueue<QueueMessage> queue_;
580   };
581
582   /**
583    * A struct to keep track of the callbacks associated with this server
584    * socket.
585    */
586   struct CallbackInfo {
587     CallbackInfo(AcceptCallback *cb, EventBase *evb)
588       : callback(cb),
589         eventBase(evb),
590         consumer(nullptr) {}
591
592     AcceptCallback *callback;
593     EventBase *eventBase;
594
595     RemoteAcceptor* consumer;
596   };
597
598   class BackoffTimeout;
599
600   virtual void handlerReady(
601     uint16_t events, int socket, sa_family_t family) noexcept;
602
603   int createSocket(int family);
604   void setupSocket(int fd);
605   void dispatchSocket(int socket, SocketAddress&& address);
606   void dispatchError(const char *msg, int errnoValue);
607   void enterBackoff();
608   void backoffTimeoutExpired();
609
610   CallbackInfo* nextCallback() {
611     CallbackInfo* info = &callbacks_[callbackIndex_];
612
613     ++callbackIndex_;
614     if (callbackIndex_ >= callbacks_.size()) {
615       callbackIndex_ = 0;
616     }
617
618     return info;
619   }
620
621   struct ServerEventHandler : public EventHandler {
622     ServerEventHandler(EventBase* eventBase, int socket,
623                        AsyncServerSocket* parent,
624                       sa_family_t addressFamily)
625         : EventHandler(eventBase, socket)
626         , eventBase_(eventBase)
627         , socket_(socket)
628         , parent_(parent)
629         , addressFamily_(addressFamily) {}
630
631     ServerEventHandler(const ServerEventHandler& other)
632     : EventHandler(other.eventBase_, other.socket_)
633     , eventBase_(other.eventBase_)
634     , socket_(other.socket_)
635     , parent_(other.parent_)
636     , addressFamily_(other.addressFamily_) {}
637
638     ServerEventHandler& operator=(
639         const ServerEventHandler& other) {
640       if (this != &other) {
641         eventBase_ = other.eventBase_;
642         socket_ = other.socket_;
643         parent_ = other.parent_;
644         addressFamily_ = other.addressFamily_;
645
646         detachEventBase();
647         attachEventBase(other.eventBase_);
648         changeHandlerFD(other.socket_);
649       }
650       return *this;
651     }
652
653     // Inherited from EventHandler
654     virtual void handlerReady(uint16_t events) noexcept {
655       parent_->handlerReady(events, socket_, addressFamily_);
656     }
657
658     EventBase* eventBase_;
659     int socket_;
660     AsyncServerSocket* parent_;
661     sa_family_t addressFamily_;
662   };
663
664   EventBase *eventBase_;
665   std::vector<ServerEventHandler> sockets_;
666   std::vector<int> pendingCloseSockets_;
667   bool accepting_;
668   uint32_t maxAcceptAtOnce_;
669   uint32_t maxNumMsgsInQueue_;
670   double acceptRateAdjustSpeed_;  //0 to disable auto adjust
671   double acceptRate_;
672   std::chrono::time_point<std::chrono::steady_clock> lastAccepTimestamp_;
673   uint64_t numDroppedConnections_;
674   uint32_t callbackIndex_;
675   BackoffTimeout *backoffTimeout_;
676   std::vector<CallbackInfo> callbacks_;
677   bool keepAliveEnabled_;
678   bool closeOnExec_;
679   ShutdownSocketSet* shutdownSocketSet_;
680 };
681
682 } // folly