/*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
// Disallow copy, move, and default construction.
AsyncServerSocket(AsyncServerSocket&&) = delete;
+ /**
+ * A callback interface to get notified of client socket events.
+ *
+ * The ConnectionEventCallback implementations need to be thread-safe as the
+ * callbacks may be called from different threads.
+ */
+ class ConnectionEventCallback {
+ public:
+ virtual ~ConnectionEventCallback() = default;
+
+ /**
+ * onConnectionAccepted() is called right after a client connection
+ * is accepted using the system accept()/accept4() APIs.
+ */
+ virtual void onConnectionAccepted(const int socket,
+ const SocketAddress& addr) noexcept = 0;
+
+ /**
+ * onConnectionAcceptError() is called when an error occurred accepting
+ * a connection.
+ */
+ virtual void onConnectionAcceptError(const int err) noexcept = 0;
+
+ /**
+ * onConnectionDropped() is called when a connection is dropped,
+ * probably because of some error encountered.
+ */
+ virtual void onConnectionDropped(const int socket,
+ const SocketAddress& addr) noexcept = 0;
+
+ /**
+ * onConnectionEnqueuedForAcceptorCallback() is called when the
+ * connection is successfully enqueued for an AcceptCallback to pick up.
+ */
+ virtual void onConnectionEnqueuedForAcceptorCallback(
+ const int socket,
+ const SocketAddress& addr) noexcept = 0;
+
+ /**
+ * onConnectionDequeuedByAcceptorCallback() is called when the
+ * connection is successfully dequeued by an AcceptCallback.
+ */
+ virtual void onConnectionDequeuedByAcceptorCallback(
+ const int socket,
+ const SocketAddress& addr) noexcept = 0;
+
+ /**
+ * onBackoffStarted is called when the socket has successfully started
+ * backing off accepting new client sockets.
+ */
+ virtual void onBackoffStarted() noexcept = 0;
+
+ /**
+ * onBackoffEnded is called when the backoff period has ended and the socket
+ * has successfully resumed accepting new connections if there is any
+ * AcceptCallback registered.
+ */
+ virtual void onBackoffEnded() noexcept = 0;
+
+ /**
+ * onBackoffError is called when there is an error entering backoff
+ */
+ virtual void onBackoffError() noexcept = 0;
+ };
+
class AcceptCallback {
public:
virtual ~AcceptCallback() = default;
static const uint32_t kDefaultMaxAcceptAtOnce = 30;
static const uint32_t kDefaultCallbackAcceptAtOnce = 5;
- static const uint32_t kDefaultMaxMessagesInQueue = 0;
+ static const uint32_t kDefaultMaxMessagesInQueue = 1024;
/**
* Create a new AsyncServerSocket with the specified EventBase.
*
*
* When a new socket is accepted, one of the AcceptCallbacks will be invoked
* with the new socket. The AcceptCallbacks are invoked in a round-robin
- * fashion. This allows the accepted sockets to distributed among a pool of
- * threads, each running its own EventBase object. This is a common model,
+ * fashion. This allows the accepted sockets to be distributed among a pool
+ * of threads, each running its own EventBase object. This is a common model,
* since most asynchronous-style servers typically run one EventBase thread
* per CPU.
*
return numDroppedConnections_;
}
+ /**
+ * Get the current number of unprocessed messages in NotificationQueue.
+ *
+ * This method must be invoked from the AsyncServerSocket's primary
+ * EventBase thread. Use EventBase::runInEventBaseThread() to schedule the
+ * operation in the correct EventBase if your code is not in the server
+ * socket's primary EventBase.
+ */
+ int64_t getNumPendingMessagesInQueue() const {
+ assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ int64_t numMsgs = 0;
+ for (const auto& callback : callbacks_) {
+ numMsgs += callback.consumer->getQueue()->size();
+ }
+ return numMsgs;
+ }
+
/**
* Set whether or not SO_KEEPALIVE should be enabled on the server socket
* (and thus on all subsequently-accepted connections). By default, keepalive
return closeOnExec_;
}
+ /**
+ * Get whether or not the socket is accepting new connections
+ */
+ bool getAccepting() const {
+ return accepting_;
+ }
+
+ /**
+ * Set the ConnectionEventCallback
+ */
+ void setConnectionEventCallback(
+ ConnectionEventCallback* const connectionEventCallback) {
+ connectionEventCallback_ = connectionEventCallback;
+ }
+
+ /**
+ * Get the ConnectionEventCallback
+ */
+ ConnectionEventCallback* getConnectionEventCallback() const {
+ return connectionEventCallback_;
+ }
+
protected:
/**
* Protected destructor.
class RemoteAcceptor
: private NotificationQueue<QueueMessage>::Consumer {
public:
- explicit RemoteAcceptor(AcceptCallback *callback)
- : callback_(callback) {}
+ explicit RemoteAcceptor(AcceptCallback *callback,
+ ConnectionEventCallback *connectionEventCallback)
+ : callback_(callback),
+ connectionEventCallback_(connectionEventCallback) {}
~RemoteAcceptor() = default;
private:
AcceptCallback *callback_;
+ ConnectionEventCallback* connectionEventCallback_;
NotificationQueue<QueueMessage> queue_;
};
uint16_t events, int socket, sa_family_t family) noexcept;
int createSocket(int family);
- void setupSocket(int fd);
+ void setupSocket(int fd, int family);
void bindSocket(int fd, const SocketAddress& address, bool isExistingSocket);
void dispatchSocket(int socket, SocketAddress&& address);
void dispatchError(const char *msg, int errnoValue);
bool reusePortEnabled_{false};
bool closeOnExec_;
ShutdownSocketSet* shutdownSocketSet_;
+ ConnectionEventCallback* connectionEventCallback_{nullptr};
};
} // folly