2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/io/async/DelayedDestruction.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/NotificationQueue.h>
23 #include <folly/io/async/AsyncTimeout.h>
24 #include <folly/io/ShutdownSocketSet.h>
25 #include <folly/SocketAddress.h>
31 #include <sys/socket.h>
36 * A listening socket that asynchronously informs a callback whenever a new
37 * connection has been accepted.
39 * Unlike most async interfaces that always invoke their callback in the same
40 * EventBase thread, AsyncServerSocket is unusual in that it can distribute
41 * the callbacks across multiple EventBase threads.
43 * This supports a common use case for network servers to distribute incoming
44 * connections across a number of EventBase threads. (Servers typically run
45 * with one EventBase thread per CPU.)
47 * Despite being able to invoke callbacks in multiple EventBase threads,
48 * AsyncServerSocket still has one "primary" EventBase. Operations that
49 * modify the AsyncServerSocket state may only be performed from the primary
52 class AsyncServerSocket : public DelayedDestruction {
54 typedef std::unique_ptr<AsyncServerSocket, Destructor> UniquePtr;
56 class AcceptCallback {
58 virtual ~AcceptCallback() {}
61 * connectionAccepted() is called whenever a new client connection is
64 * The AcceptCallback will remain installed after connectionAccepted()
67 * @param fd The newly accepted client socket. The AcceptCallback
68 * assumes ownership of this socket, and is responsible
69 * for closing it when done. The newly accepted file
70 * descriptor will have already been put into
72 * @param clientAddr A reference to a SocketAddress struct containing the
73 * client's address. This struct is only guaranteed to
74 * remain valid until connectionAccepted() returns.
76 virtual void connectionAccepted(int fd,
77 const SocketAddress& clientAddr)
81 * acceptError() is called if an error occurs while accepting.
83 * The AcceptCallback will remain installed even after an accept error,
84 * as the errors are typically somewhat transient, such as being out of
85 * file descriptors. The server socket must be explicitly stopped if you
86 * wish to stop accepting after an error.
88 * @param ex An exception representing the error.
90 virtual void acceptError(const std::exception& ex) noexcept = 0;
93 * acceptStarted() will be called in the callback's EventBase thread
94 * after this callback has been added to the AsyncServerSocket.
96 * acceptStarted() will be called before any calls to connectionAccepted()
97 * or acceptError() are made on this callback.
99 * acceptStarted() makes it easier for callbacks to perform initialization
100 * inside the callback thread. (The call to addAcceptCallback() must
101 * always be made from the AsyncServerSocket's primary EventBase thread.
102 * acceptStarted() provides a hook that will always be invoked in the
103 * callback's thread.)
105 * Note that the call to acceptStarted() is made once the callback is
106 * added, regardless of whether or not the AsyncServerSocket is actually
107 * accepting at the moment. acceptStarted() will be called even if the
108 * AsyncServerSocket is paused when the callback is added (including if
109 * the initial call to startAccepting() on the AsyncServerSocket has not
112 virtual void acceptStarted() noexcept {}
115 * acceptStopped() will be called when this AcceptCallback is removed from
116 * the AsyncServerSocket, or when the AsyncServerSocket is destroyed,
117 * whichever occurs first.
119 * No more calls to connectionAccepted() or acceptError() will be made
120 * after acceptStopped() is invoked.
122 virtual void acceptStopped() noexcept {}
125 static const uint32_t kDefaultMaxAcceptAtOnce = 30;
126 static const uint32_t kDefaultCallbackAcceptAtOnce = 5;
127 static const uint32_t kDefaultMaxMessagesInQueue = 0;
129 * Create a new AsyncServerSocket with the specified EventBase.
131 * @param eventBase The EventBase to use for driving the asynchronous I/O.
132 * If this parameter is nullptr, attachEventBase() must be
133 * called before this socket can begin accepting
136 explicit AsyncServerSocket(EventBase* eventBase = nullptr);
139 * Helper function to create a shared_ptr<AsyncServerSocket>.
141 * This passes in the correct destructor object, since AsyncServerSocket's
142 * destructor is protected and cannot be invoked directly.
144 static std::shared_ptr<AsyncServerSocket>
145 newSocket(EventBase* evb = nullptr) {
146 return std::shared_ptr<AsyncServerSocket>(new AsyncServerSocket(evb),
150 void setShutdownSocketSet(ShutdownSocketSet* newSS);
153 * Destroy the socket.
155 * AsyncServerSocket::destroy() must be called to destroy the socket.
156 * The normal destructor is protected, and should not be invoked directly.
157 * This prevents callers from deleting an AsyncServerSocket while it is
158 * invoking a callback.
160 * destroy() must be invoked from the socket's primary EventBase thread.
162 * If there are AcceptCallbacks still installed when destroy() is called,
163 * acceptStopped() will be called on these callbacks to notify them that
164 * accepting has stopped. Accept callbacks being driven by other EventBase
165 * threads may continue to receive new accept callbacks for a brief period of
166 * time after destroy() returns. They will not receive any more callback
167 * invocations once acceptStopped() is invoked.
169 virtual void destroy();
172 * Attach this AsyncServerSocket to its primary EventBase.
174 * This may only be called if the AsyncServerSocket is not already attached
175 * to an EventBase. The AsyncServerSocket must be attached to an EventBase
176 * before it can begin accepting connections.
178 void attachEventBase(EventBase *eventBase);
181 * Detach the AsyncServerSocket from its primary EventBase.
183 * detachEventBase() may only be called if the AsyncServerSocket is not
184 * currently accepting connections.
186 void detachEventBase();
189 * Get the EventBase used by this socket.
191 EventBase* getEventBase() const {
196 * Create an AsyncServerSocket from an existing socket file descriptor.
198 * useExistingSocket() will cause the AsyncServerSocket to take ownership of
199 * the specified file descriptor, and use it to listen for new connections.
200 * The AsyncServerSocket will close the file descriptor when it is
203 * useExistingSocket() must be called before bind() or listen().
205 * The supplied file descriptor will automatically be put into non-blocking
206 * mode. The caller may have already directly called bind() and possibly
207 * listen() on the file descriptor. If so, the caller should skip calling the
208 * corresponding AsyncServerSocket::bind() and listen() methods.
210 * On error a TTransportException will be thrown and the caller will retain
211 * ownership of the file descriptor.
213 void useExistingSocket(int fd);
214 void useExistingSockets(const std::vector<int>& fds);
217 * Return the underlying file descriptors.
219 std::vector<int> getSockets() const {
220 std::vector<int> sockets;
221 for (auto& handler : sockets_) {
222 sockets.push_back(handler.socket_);
228 * Backwards compatible getSocket, warns if > 1 socket
230 int getSocket() const {
231 if (sockets_.size() > 1) {
232 VLOG(2) << "Warning: getSocket can return multiple fds, " <<
233 "but getSockets was not called, so only returning the first";
235 if (sockets_.size() == 0) {
238 return sockets_[0].socket_;
243 * Bind to the specified address.
245 * This must be called from the primary EventBase thread.
247 * Throws TTransportException on error.
249 virtual void bind(const SocketAddress& address);
252 * Bind to the specified port.
254 * This must be called from the primary EventBase thread.
256 * Throws TTransportException on error.
258 virtual void bind(uint16_t port);
261 * Get the local address to which the socket is bound.
263 * Throws TTransportException on error.
265 void getAddress(SocketAddress* addressReturn) const;
268 * Get all the local addresses to which the socket is bound.
270 * Throws TTransportException on error.
272 std::vector<SocketAddress> getAddresses() const;
275 * Begin listening for connections.
277 * This calls ::listen() with the specified backlog.
279 * Once listen() is invoked the socket will actually be open so that remote
280 * clients may establish connections. (Clients that attempt to connect
281 * before listen() is called will receive a connection refused error.)
283 * At least one callback must be set and startAccepting() must be called to
284 * actually begin notifying the accept callbacks of newly accepted
285 * connections. The backlog parameter controls how many connections the
286 * kernel will accept and buffer internally while the accept callbacks are
287 * paused (or if accepting is enabled but the callbacks cannot keep up).
289 * bind() must be called before calling listen().
290 * listen() must be called from the primary EventBase thread.
292 * Throws TTransportException on error.
294 virtual void listen(int backlog);
297 * Add an AcceptCallback.
299 * When a new socket is accepted, one of the AcceptCallbacks will be invoked
300 * with the new socket. The AcceptCallbacks are invoked in a round-robin
301 * fashion. This allows the accepted sockets to be distributed among a pool of
302 * threads, each running its own EventBase object. This is a common model,
303 * since most asynchronous-style servers typically run one EventBase thread
306 * The EventBase object associated with each AcceptCallback must be running
307 * its loop. If the EventBase loop is not running, sockets will still be
308 * scheduled for the callback, but the callback cannot actually get invoked
309 * until the loop runs.
311 * This method must be invoked from the AsyncServerSocket's primary
314 * Note that startAccepting() must be called on the AsyncServerSocket to
315 * cause it to actually start accepting sockets once callbacks have been
318 * @param callback The callback to invoke.
319 * @param eventBase The EventBase to use to invoke the callback. This
320 * parameter may be nullptr, in which case the callback will be invoked in
321 * the AsyncServerSocket's primary EventBase.
322 * @param maxAtOnce The maximum number of connections to accept in this
323 * callback on a single iteration of the event base loop.
324 * This only takes effect when eventBase is non-nullptr.
325 * When using a nullptr eventBase for the callback, the
326 * setMaxAcceptAtOnce() method controls how many
327 * connections the main event base will accept at once.
329 virtual void addAcceptCallback(
330 AcceptCallback *callback,
331 EventBase *eventBase,
332 uint32_t maxAtOnce = kDefaultCallbackAcceptAtOnce);
335 * Remove an AcceptCallback.
337 * This allows a single AcceptCallback to be removed from the round-robin
340 * This method must be invoked from the AsyncServerSocket's primary
341 * EventBase thread. Use EventBase::runInEventBaseThread() to schedule the
342 * operation in the correct EventBase if your code is not in the server
343 * socket's primary EventBase.
345 * Given that the accept callback is being driven by a different EventBase,
346 * the AcceptCallback may continue to be invoked for a short period of time
347 * after removeAcceptCallback() returns in this thread. Once the other
348 * EventBase thread receives the notification to stop, it will call
349 * acceptStopped() on the callback to inform it that it is fully stopped and
350 * will not receive any new sockets.
352 * If the last accept callback is removed while the socket is accepting,
353 * the socket will implicitly pause accepting. If a callback is later added,
354 * it will resume accepting immediately, without requiring startAccepting()
357 * @param callback The callback to uninstall.
358 * @param eventBase The EventBase associated with this callback. This must
359 * be the same EventBase that was used when the callback was installed
360 * with addAcceptCallback().
362 void removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase);
365 * Begin accepting connections on this socket.
367 * bind() and listen() must be called before calling startAccepting().
369 * When an AsyncServerSocket is initially created, it will not begin
370 * accepting connections until at least one callback has been added and
371 * startAccepting() has been called. startAccepting() can also be used to
372 * resume accepting connections after a call to pauseAccepting().
374 * If startAccepting() is called when there are no accept callbacks
375 * installed, the socket will not actually begin accepting until an accept
378 * This method may only be called from the primary EventBase thread.
380 virtual void startAccepting();
383 * Pause accepting connections.
385 * startAccepting() may be called to resume accepting.
387 * This method may only be called from the primary EventBase thread.
388 * If there are AcceptCallbacks being driven by other EventBase threads they
389 * may continue to receive callbacks for a short period of time after
390 * pauseAccepting() returns.
392 * Unlike removeAcceptCallback() or destroy(), acceptStopped() will not be
393 * called on the AcceptCallback objects simply due to a temporary pause. If
394 * the server socket is later destroyed while paused, acceptStopped() will be
395 * called on all of the installed AcceptCallbacks.
397 void pauseAccepting();
400 * Shutdown the listen socket and notify all callbacks that accept has
401 * stopped, but don't close the socket. This invokes shutdown(2) with the
402 * supplied argument. Passing -1 will close the socket now. Otherwise, the
403 * close will be delayed until this object is destroyed.
405 * Only use this if you have reason to pass special flags to shutdown.
406 * Otherwise just destroy the socket.
408 * This method has no effect when a ShutdownSocketSet option is used.
410 * Returns the result of shutdown on sockets_[n-1]
412 int stopAccepting(int shutdownFlags = -1);
415 * Get the maximum number of connections that will be accepted each time
416 * around the event loop.
418 uint32_t getMaxAcceptAtOnce() const {
419 return maxAcceptAtOnce_;
423 * Set the maximum number of connections that will be accepted each time
424 * around the event loop.
426 * This provides a very coarse-grained way of controlling how fast the
427 * AsyncServerSocket will accept connections. If you find that when your
428 * server is overloaded AsyncServerSocket accepts connections more quickly
429 * than your code can process them, you can try lowering this number so that
430 * fewer connections will be accepted each event loop iteration.
432 * For more explicit control over the accept rate, you can also use
433 * pauseAccepting() to temporarily pause accepting when your server is
434 * overloaded, and then use startAccepting() later to resume accepting.
436 void setMaxAcceptAtOnce(uint32_t numConns) {
437 maxAcceptAtOnce_ = numConns;
441 * Get the maximum number of unprocessed messages which a NotificationQueue
444 uint32_t getMaxNumMessagesInQueue() const {
445 return maxNumMsgsInQueue_;
449 * Set the maximum number of unprocessed messages in NotificationQueue.
450 * No new message will be sent to that NotificationQueue if there are more
451 * than such number of unprocessed messages in that queue.
453 * Only works if called before addAcceptCallback.
455 void setMaxNumMessagesInQueue(uint32_t num) {
456 maxNumMsgsInQueue_ = num;
460 * Get the speed of adjusting connection accept rate.
462 double getAcceptRateAdjustSpeed() const {
463 return acceptRateAdjustSpeed_;
467 * Set the speed of adjusting connection accept rate.
469 void setAcceptRateAdjustSpeed(double speed) {
470 acceptRateAdjustSpeed_ = speed;
474 * Get the number of connections dropped by the AsyncServerSocket
476 uint64_t getNumDroppedConnections() const {
477 return numDroppedConnections_;
481 * Set whether or not SO_KEEPALIVE should be enabled on the server socket
482 * (and thus on all subsequently-accepted connections). By default, keepalive
485 * Note that TCP keepalive usually only kicks in after the connection has
486 * been idle for several hours. Applications should almost always have their
487 * own, shorter idle timeout.
489 void setKeepAliveEnabled(bool enabled) {
490 keepAliveEnabled_ = enabled;
492 for (auto& handler : sockets_) {
493 if (handler.socket_ < 0) {
497 int val = (enabled) ? 1 : 0;
498 if (setsockopt(handler.socket_, SOL_SOCKET,
499 SO_KEEPALIVE, &val, sizeof(val)) != 0) {
500 LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: %s" <<
507 * Get whether or not SO_KEEPALIVE is enabled on the server socket.
509 bool getKeepAliveEnabled() const {
510 return keepAliveEnabled_;
514 * Set whether or not the socket should close during exec() (FD_CLOEXEC). By
515 * default, this is enabled
517 void setCloseOnExec(bool closeOnExec) {
518 closeOnExec_ = closeOnExec;
522 * Get whether or not FD_CLOEXEC is enabled on the server socket.
524 bool getCloseOnExec() const {
530 * Protected destructor.
532 * Invoke destroy() instead to destroy the AsyncServerSocket.
534 virtual ~AsyncServerSocket();
537 enum class MessageType {
542 struct QueueMessage {
546 SocketAddress address;
551 * A class to receive notifications to invoke AcceptCallback objects
552 * in other EventBase threads.
554 * A RemoteAcceptor object is created for each AcceptCallback that
555 * is installed in a separate EventBase thread. The RemoteAcceptor
556 * receives notification of new sockets via a NotificationQueue,
557 * and then invokes the AcceptCallback.
560 : private NotificationQueue<QueueMessage>::Consumer {
562 explicit RemoteAcceptor(AcceptCallback *callback)
563 : callback_(callback) {}
567 void start(EventBase *eventBase, uint32_t maxAtOnce, uint32_t maxInQueue);
568 void stop(EventBase* eventBase, AcceptCallback* callback);
570 virtual void messageAvailable(QueueMessage&& message);
572 NotificationQueue<QueueMessage>* getQueue() {
577 AcceptCallback *callback_;
579 NotificationQueue<QueueMessage> queue_;
583 * A struct to keep track of the callbacks associated with this server
586 struct CallbackInfo {
587 CallbackInfo(AcceptCallback *cb, EventBase *evb)
592 AcceptCallback *callback;
593 EventBase *eventBase;
595 RemoteAcceptor* consumer;
598 class BackoffTimeout;
600 virtual void handlerReady(
601 uint16_t events, int socket, sa_family_t family) noexcept;
603 int createSocket(int family);
604 void setupSocket(int fd);
605 void dispatchSocket(int socket, SocketAddress&& address);
606 void dispatchError(const char *msg, int errnoValue);
608 void backoffTimeoutExpired();
610 CallbackInfo* nextCallback() {
611 CallbackInfo* info = &callbacks_[callbackIndex_];
614 if (callbackIndex_ >= callbacks_.size()) {
621 struct ServerEventHandler : public EventHandler {
622 ServerEventHandler(EventBase* eventBase, int socket,
623 AsyncServerSocket* parent,
624 sa_family_t addressFamily)
625 : EventHandler(eventBase, socket)
626 , eventBase_(eventBase)
629 , addressFamily_(addressFamily) {}
631 ServerEventHandler(const ServerEventHandler& other)
632 : EventHandler(other.eventBase_, other.socket_)
633 , eventBase_(other.eventBase_)
634 , socket_(other.socket_)
635 , parent_(other.parent_)
636 , addressFamily_(other.addressFamily_) {}
638 ServerEventHandler& operator=(
639 const ServerEventHandler& other) {
640 if (this != &other) {
641 eventBase_ = other.eventBase_;
642 socket_ = other.socket_;
643 parent_ = other.parent_;
644 addressFamily_ = other.addressFamily_;
647 attachEventBase(other.eventBase_);
648 changeHandlerFD(other.socket_);
653 // Inherited from EventHandler
654 virtual void handlerReady(uint16_t events) noexcept {
655 parent_->handlerReady(events, socket_, addressFamily_);
658 EventBase* eventBase_;
660 AsyncServerSocket* parent_;
661 sa_family_t addressFamily_;
664 EventBase *eventBase_;
665 std::vector<ServerEventHandler> sockets_;
666 std::vector<int> pendingCloseSockets_;
668 uint32_t maxAcceptAtOnce_;
669 uint32_t maxNumMsgsInQueue_;
670 double acceptRateAdjustSpeed_; //0 to disable auto adjust
672 std::chrono::time_point<std::chrono::steady_clock> lastAccepTimestamp_;
673 uint64_t numDroppedConnections_;
674 uint32_t callbackIndex_;
675 BackoffTimeout *backoffTimeout_;
676 std::vector<CallbackInfo> callbacks_;
677 bool keepAliveEnabled_;
679 ShutdownSocketSet* shutdownSocketSet_;