/*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <folly/io/async/EventBase.h>
#include <folly/io/async/NotificationQueue.h>
#include <folly/io/async/AsyncTimeout.h>
+#include <folly/io/async/AsyncSocketBase.h>
#include <folly/io/ShutdownSocketSet.h>
#include <folly/SocketAddress.h>
#include <memory>
* modify the AsyncServerSocket state may only be performed from the primary
* EventBase thread.
*/
-class AsyncServerSocket : public DelayedDestruction {
+class AsyncServerSocket : public DelayedDestruction
+ , public AsyncSocketBase {
public:
typedef std::unique_ptr<AsyncServerSocket, Destructor> UniquePtr;
+ // Disallow copy, move, and default construction.
+ AsyncServerSocket(AsyncServerSocket&&) = delete;
+
+ /**
+ * A callback interface to get notified of client socket events.
+ *
+ * The ConnectionEventCallback implementations need to be thread-safe as the
+ * callbacks may be called from different threads.
+ */
+ class ConnectionEventCallback {
+ public:
+ virtual ~ConnectionEventCallback() = default;
+
+ /**
+ * onConnectionAccepted() is called right after a client connection
+ * is accepted using the system accept()/accept4() APIs.
+ */
+ virtual void onConnectionAccepted(const int socket,
+ const SocketAddress& addr) noexcept = 0;
+
+ /**
+ * onConnectionAcceptError() is called when an error occurred accepting
+ * a connection.
+ */
+ virtual void onConnectionAcceptError(const int err) noexcept = 0;
+
+ /**
+ * onConnectionDropped() is called when a connection is dropped,
+ * probably because of some error encountered.
+ */
+ virtual void onConnectionDropped(const int socket,
+ const SocketAddress& addr) noexcept = 0;
+
+ /**
+ * onConnectionEnqueuedForAcceptorCallback() is called when the
+ * connection is successfully enqueued for an AcceptCallback to pick up.
+ */
+ virtual void onConnectionEnqueuedForAcceptorCallback(
+ const int socket,
+ const SocketAddress& addr) noexcept = 0;
+
+ /**
+ * onConnectionDequeuedByAcceptorCallback() is called when the
+ * connection is successfully dequeued by an AcceptCallback.
+ */
+ virtual void onConnectionDequeuedByAcceptorCallback(
+ const int socket,
+ const SocketAddress& addr) noexcept = 0;
+
+ /**
+ * onBackoffStarted is called when the socket has successfully started
+ * backing off accepting new client sockets.
+ */
+ virtual void onBackoffStarted() noexcept = 0;
+
+ /**
+ * onBackoffEnded is called when the backoff period has ended and the socket
+ * has successfully resumed accepting new connections if there is any
+ * AcceptCallback registered.
+ */
+ virtual void onBackoffEnded() noexcept = 0;
+
+ /**
+ * onBackoffError is called when there is an error entering backoff
+ */
+ virtual void onBackoffError() noexcept = 0;
+ };
class AcceptCallback {
public:
- virtual ~AcceptCallback() {}
+ virtual ~AcceptCallback() = default;
/**
* connectionAccepted() is called whenever a new client connection is
* for closing it when done. The newly accepted file
* descriptor will have already been put into
* non-blocking mode.
- * @param clientAddr A reference to a TSocketAddress struct containing the
+ * @param clientAddr A reference to a SocketAddress struct containing the
* client's address. This struct is only guaranteed to
* remain valid until connectionAccepted() returns.
*/
static const uint32_t kDefaultMaxAcceptAtOnce = 30;
static const uint32_t kDefaultCallbackAcceptAtOnce = 5;
- static const uint32_t kDefaultMaxMessagesInQueue = 0;
+ static const uint32_t kDefaultMaxMessagesInQueue = 1024;
/**
* Create a new AsyncServerSocket with the specified EventBase.
*
*/
virtual void bind(const SocketAddress& address);
+ /**
+ * Bind to the specified port for the specified addresses.
+ *
+ * This must be called from the primary EventBase thread.
+ *
+ * Throws TTransportException on error.
+ */
+ virtual void bind(
+ const std::vector<IPAddress>& ipAddresses,
+ uint16_t port);
+
/**
* Bind to the specified port.
*
*
* When a new socket is accepted, one of the AcceptCallbacks will be invoked
* with the new socket. The AcceptCallbacks are invoked in a round-robin
- * fashion. This allows the accepted sockets to distributed among a pool of
- * threads, each running its own EventBase object. This is a common model,
+ * fashion. This allows the accepted sockets to be distributed among a pool
+ * of threads, each running its own EventBase object. This is a common model,
* since most asynchronous-style servers typically run one EventBase thread
* per CPU.
*
return numDroppedConnections_;
}
+ /**
+ * Get the current number of unprocessed messages in NotificationQueue.
+ *
+ * This method must be invoked from the AsyncServerSocket's primary
+ * EventBase thread. Use EventBase::runInEventBaseThread() to schedule the
+ * operation in the correct EventBase if your code is not in the server
+ * socket's primary EventBase.
+ */
+ int64_t getNumPendingMessagesInQueue() const {
+ assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ int64_t numMsgs = 0;
+ for (const auto& callback : callbacks_) {
+ numMsgs += callback.consumer->getQueue()->size();
+ }
+ return numMsgs;
+ }
+
/**
* Set whether or not SO_KEEPALIVE should be enabled on the server socket
* (and thus on all subsequently-accepted connections). By default, keepalive
return closeOnExec_;
}
+ /**
+ * Get whether or not the socket is accepting new connections
+ */
+ bool getAccepting() const {
+ return accepting_;
+ }
+
+ /**
+ * Set the ConnectionEventCallback
+ */
+ void setConnectionEventCallback(
+ ConnectionEventCallback* const connectionEventCallback) {
+ connectionEventCallback_ = connectionEventCallback;
+ }
+
+ /**
+ * Get the ConnectionEventCallback
+ */
+ ConnectionEventCallback* getConnectionEventCallback() const {
+ return connectionEventCallback_;
+ }
+
protected:
/**
* Protected destructor.
class RemoteAcceptor
: private NotificationQueue<QueueMessage>::Consumer {
public:
- explicit RemoteAcceptor(AcceptCallback *callback)
- : callback_(callback) {}
+ explicit RemoteAcceptor(AcceptCallback *callback,
+ ConnectionEventCallback *connectionEventCallback)
+ : callback_(callback),
+ connectionEventCallback_(connectionEventCallback) {}
- ~RemoteAcceptor() {}
+ ~RemoteAcceptor() = default;
void start(EventBase *eventBase, uint32_t maxAtOnce, uint32_t maxInQueue);
void stop(EventBase* eventBase, AcceptCallback* callback);
private:
AcceptCallback *callback_;
+ ConnectionEventCallback* connectionEventCallback_;
NotificationQueue<QueueMessage> queue_;
};
uint16_t events, int socket, sa_family_t family) noexcept;
int createSocket(int family);
- void setupSocket(int fd);
+ void setupSocket(int fd, int family);
+ void bindSocket(int fd, const SocketAddress& address, bool isExistingSocket);
void dispatchSocket(int socket, SocketAddress&& address);
void dispatchError(const char *msg, int errnoValue);
void enterBackoff();
bool reusePortEnabled_{false};
bool closeOnExec_;
ShutdownSocketSet* shutdownSocketSet_;
+ ConnectionEventCallback* connectionEventCallback_{nullptr};
};
} // folly