X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fio%2Fasync%2FAsyncServerSocket.h;h=dcebbc59fb7ed15c5bf92fdb9ebda3062e8497bf;hp=935e1917049722fea6079d9af4655e2c60be52da;hb=321542683a01c3f334047531e9b487f047129775;hpb=19b1f5f765dc59354416fbd6df24fa7a39d4b390 diff --git a/folly/io/async/AsyncServerSocket.h b/folly/io/async/AsyncServerSocket.h index 935e1917..dcebbc59 100644 --- a/folly/io/async/AsyncServerSocket.h +++ b/folly/io/async/AsyncServerSocket.h @@ -1,5 +1,5 @@ /* - * Copyright 2015 Facebook, Inc. + * Copyright 2016 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -64,6 +64,71 @@ class AsyncServerSocket : public DelayedDestruction // Disallow copy, move, and default construction. AsyncServerSocket(AsyncServerSocket&&) = delete; + /** + * A callback interface to get notified of client socket events. + * + * The ConnectionEventCallback implementations need to be thread-safe as the + * callbacks may be called from different threads. + */ + class ConnectionEventCallback { + public: + virtual ~ConnectionEventCallback() = default; + + /** + * onConnectionAccepted() is called right after a client connection + * is accepted using the system accept()/accept4() APIs. + */ + virtual void onConnectionAccepted(const int socket, + const SocketAddress& addr) noexcept = 0; + + /** + * onConnectionAcceptError() is called when an error occurred accepting + * a connection. + */ + virtual void onConnectionAcceptError(const int err) noexcept = 0; + + /** + * onConnectionDropped() is called when a connection is dropped, + * probably because of some error encountered. + */ + virtual void onConnectionDropped(const int socket, + const SocketAddress& addr) noexcept = 0; + + /** + * onConnectionEnqueuedForAcceptorCallback() is called when the + * connection is successfully enqueued for an AcceptCallback to pick up. + */ + virtual void onConnectionEnqueuedForAcceptorCallback( + const int socket, + const SocketAddress& addr) noexcept = 0; + + /** + * onConnectionDequeuedByAcceptorCallback() is called when the + * connection is successfully dequeued by an AcceptCallback. + */ + virtual void onConnectionDequeuedByAcceptorCallback( + const int socket, + const SocketAddress& addr) noexcept = 0; + + /** + * onBackoffStarted is called when the socket has successfully started + * backing off accepting new client sockets. + */ + virtual void onBackoffStarted() noexcept = 0; + + /** + * onBackoffEnded is called when the backoff period has ended and the socket + * has successfully resumed accepting new connections if there is any + * AcceptCallback registered. + */ + virtual void onBackoffEnded() noexcept = 0; + + /** + * onBackoffError is called when there is an error entering backoff + */ + virtual void onBackoffError() noexcept = 0; + }; + class AcceptCallback { public: virtual ~AcceptCallback() = default; @@ -135,7 +200,7 @@ class AsyncServerSocket : public DelayedDestruction static const uint32_t kDefaultMaxAcceptAtOnce = 30; static const uint32_t kDefaultCallbackAcceptAtOnce = 5; - static const uint32_t kDefaultMaxMessagesInQueue = 0; + static const uint32_t kDefaultMaxMessagesInQueue = 1024; /** * Create a new AsyncServerSocket with the specified EventBase. * @@ -320,8 +385,8 @@ class AsyncServerSocket : public DelayedDestruction * * When a new socket is accepted, one of the AcceptCallbacks will be invoked * with the new socket. The AcceptCallbacks are invoked in a round-robin - * fashion. This allows the accepted sockets to distributed among a pool of - * threads, each running its own EventBase object. This is a common model, + * fashion. This allows the accepted sockets to be distributed among a pool + * of threads, each running its own EventBase object. This is a common model, * since most asynchronous-style servers typically run one EventBase thread * per CPU. * @@ -499,6 +564,23 @@ class AsyncServerSocket : public DelayedDestruction return numDroppedConnections_; } + /** + * Get the current number of unprocessed messages in NotificationQueue. + * + * This method must be invoked from the AsyncServerSocket's primary + * EventBase thread. Use EventBase::runInEventBaseThread() to schedule the + * operation in the correct EventBase if your code is not in the server + * socket's primary EventBase. + */ + int64_t getNumPendingMessagesInQueue() const { + assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + int64_t numMsgs = 0; + for (const auto& callback : callbacks_) { + numMsgs += callback.consumer->getQueue()->size(); + } + return numMsgs; + } + /** * Set whether or not SO_KEEPALIVE should be enabled on the server socket * (and thus on all subsequently-accepted connections). By default, keepalive @@ -584,6 +666,21 @@ class AsyncServerSocket : public DelayedDestruction return accepting_; } + /** + * Set the ConnectionEventCallback + */ + void setConnectionEventCallback( + ConnectionEventCallback* const connectionEventCallback) { + connectionEventCallback_ = connectionEventCallback; + } + + /** + * Get the ConnectionEventCallback + */ + ConnectionEventCallback* getConnectionEventCallback() const { + return connectionEventCallback_; + } + protected: /** * Protected destructor. @@ -618,8 +715,10 @@ class AsyncServerSocket : public DelayedDestruction class RemoteAcceptor : private NotificationQueue::Consumer { public: - explicit RemoteAcceptor(AcceptCallback *callback) - : callback_(callback) {} + explicit RemoteAcceptor(AcceptCallback *callback, + ConnectionEventCallback *connectionEventCallback) + : callback_(callback), + connectionEventCallback_(connectionEventCallback) {} ~RemoteAcceptor() = default; @@ -634,6 +733,7 @@ class AsyncServerSocket : public DelayedDestruction private: AcceptCallback *callback_; + ConnectionEventCallback* connectionEventCallback_; NotificationQueue queue_; }; @@ -660,7 +760,7 @@ class AsyncServerSocket : public DelayedDestruction uint16_t events, int socket, sa_family_t family) noexcept; int createSocket(int family); - void setupSocket(int fd); + void setupSocket(int fd, int family); void bindSocket(int fd, const SocketAddress& address, bool isExistingSocket); void dispatchSocket(int socket, SocketAddress&& address); void dispatchError(const char *msg, int errnoValue); @@ -738,6 +838,7 @@ class AsyncServerSocket : public DelayedDestruction bool reusePortEnabled_{false}; bool closeOnExec_; ShutdownSocketSet* shutdownSocketSet_; + ConnectionEventCallback* connectionEventCallback_{nullptr}; }; } // folly