X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fio%2Fasync%2FAsyncServerSocket.h;h=6589c667e91b6a047d225cb17ea2784b857f86d7;hb=fbc4c23895b0ee3874d9a36401d580a2a8957ba9;hp=935e1917049722fea6079d9af4655e2c60be52da;hpb=19b1f5f765dc59354416fbd6df24fa7a39d4b390;p=folly.git diff --git a/folly/io/async/AsyncServerSocket.h b/folly/io/async/AsyncServerSocket.h index 935e1917..6589c667 100644 --- a/folly/io/async/AsyncServerSocket.h +++ b/folly/io/async/AsyncServerSocket.h @@ -1,5 +1,5 @@ /* - * Copyright 2015 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,21 +16,21 @@ #pragma once +#include +#include +#include +#include #include -#include #include +#include #include -#include -#include -#include -#include -#include -#include -#include +#include + #include #include -#include - +#include +#include +#include // Due to the way kernel headers are included, this may or may not be defined. // Number pulled from 3.10 kernel headers. @@ -38,6 +38,10 @@ #define SO_REUSEPORT 15 #endif +#if defined __linux__ && !defined SO_NO_TRANSPARENT_TLS +#define SO_NO_TRANSPARENT_TLS 200 +#endif + namespace folly { /** @@ -64,6 +68,71 @@ class AsyncServerSocket : public DelayedDestruction // Disallow copy, move, and default construction. AsyncServerSocket(AsyncServerSocket&&) = delete; + /** + * A callback interface to get notified of client socket events. + * + * The ConnectionEventCallback implementations need to be thread-safe as the + * callbacks may be called from different threads. + */ + class ConnectionEventCallback { + public: + virtual ~ConnectionEventCallback() = default; + + /** + * onConnectionAccepted() is called right after a client connection + * is accepted using the system accept()/accept4() APIs. + */ + virtual void onConnectionAccepted(const int socket, + const SocketAddress& addr) noexcept = 0; + + /** + * onConnectionAcceptError() is called when an error occurred accepting + * a connection. + */ + virtual void onConnectionAcceptError(const int err) noexcept = 0; + + /** + * onConnectionDropped() is called when a connection is dropped, + * probably because of some error encountered. + */ + virtual void onConnectionDropped(const int socket, + const SocketAddress& addr) noexcept = 0; + + /** + * onConnectionEnqueuedForAcceptorCallback() is called when the + * connection is successfully enqueued for an AcceptCallback to pick up. + */ + virtual void onConnectionEnqueuedForAcceptorCallback( + const int socket, + const SocketAddress& addr) noexcept = 0; + + /** + * onConnectionDequeuedByAcceptorCallback() is called when the + * connection is successfully dequeued by an AcceptCallback. + */ + virtual void onConnectionDequeuedByAcceptorCallback( + const int socket, + const SocketAddress& addr) noexcept = 0; + + /** + * onBackoffStarted is called when the socket has successfully started + * backing off accepting new client sockets. + */ + virtual void onBackoffStarted() noexcept = 0; + + /** + * onBackoffEnded is called when the backoff period has ended and the socket + * has successfully resumed accepting new connections if there is any + * AcceptCallback registered. + */ + virtual void onBackoffEnded() noexcept = 0; + + /** + * onBackoffError is called when there is an error entering backoff + */ + virtual void onBackoffError() noexcept = 0; + }; + class AcceptCallback { public: virtual ~AcceptCallback() = default; @@ -135,7 +204,7 @@ class AsyncServerSocket : public DelayedDestruction static const uint32_t kDefaultMaxAcceptAtOnce = 30; static const uint32_t kDefaultCallbackAcceptAtOnce = 5; - static const uint32_t kDefaultMaxMessagesInQueue = 0; + static const uint32_t kDefaultMaxMessagesInQueue = 1024; /** * Create a new AsyncServerSocket with the specified EventBase. * @@ -177,7 +246,7 @@ class AsyncServerSocket : public DelayedDestruction * time after destroy() returns. They will not receive any more callback * invocations once acceptStopped() is invoked. */ - virtual void destroy(); + void destroy() override; /** * Attach this AsyncServerSocket to its primary EventBase. @@ -199,7 +268,7 @@ class AsyncServerSocket : public DelayedDestruction /** * Get the EventBase used by this socket. */ - EventBase* getEventBase() const { + EventBase* getEventBase() const override { return eventBase_; } @@ -250,6 +319,11 @@ class AsyncServerSocket : public DelayedDestruction } } + /* enable zerocopy support for the server sockets - the s = accept sockets + * inherit it + */ + bool setZeroCopy(bool enable); + /** * Bind to the specified address. * @@ -284,7 +358,18 @@ class AsyncServerSocket : public DelayedDestruction * * Throws TTransportException on error. */ - void getAddress(SocketAddress* addressReturn) const; + void getAddress(SocketAddress* addressReturn) const override; + + /** + * Get the local address to which the socket is bound. + * + * Throws TTransportException on error. + */ + SocketAddress getAddress() const { + SocketAddress ret; + getAddress(&ret); + return ret; + } /** * Get all the local addresses to which the socket is bound. @@ -320,8 +405,8 @@ class AsyncServerSocket : public DelayedDestruction * * When a new socket is accepted, one of the AcceptCallbacks will be invoked * with the new socket. The AcceptCallbacks are invoked in a round-robin - * fashion. This allows the accepted sockets to distributed among a pool of - * threads, each running its own EventBase object. This is a common model, + * fashion. This allows the accepted sockets to be distributed among a pool + * of threads, each running its own EventBase object. This is a common model, * since most asynchronous-style servers typically run one EventBase thread * per CPU. * @@ -499,6 +584,25 @@ class AsyncServerSocket : public DelayedDestruction return numDroppedConnections_; } + /** + * Get the current number of unprocessed messages in NotificationQueue. + * + * This method must be invoked from the AsyncServerSocket's primary + * EventBase thread. Use EventBase::runInEventBaseThread() to schedule the + * operation in the correct EventBase if your code is not in the server + * socket's primary EventBase. + */ + int64_t getNumPendingMessagesInQueue() const { + if (eventBase_) { + eventBase_->dcheckIsInEventBaseThread(); + } + int64_t numMsgs = 0; + for (const auto& callback : callbacks_) { + numMsgs += callback.consumer->getQueue()->size(); + } + return numMsgs; + } + /** * Set whether or not SO_KEEPALIVE should be enabled on the server socket * (and thus on all subsequently-accepted connections). By default, keepalive @@ -577,6 +681,21 @@ class AsyncServerSocket : public DelayedDestruction return closeOnExec_; } + /** + * Tries to enable TFO if the machine supports it. + */ + void setTFOEnabled(bool enabled, uint32_t maxTFOQueueSize) { + tfo_ = enabled; + tfoMaxQueueSize_ = maxTFOQueueSize; + } + + /** + * Do not attempt the transparent TLS handshake + */ + void disableTransparentTls() { + noTransparentTls_ = true; + } + /** * Get whether or not the socket is accepting new connections */ @@ -584,13 +703,28 @@ class AsyncServerSocket : public DelayedDestruction return accepting_; } + /** + * Set the ConnectionEventCallback + */ + void setConnectionEventCallback( + ConnectionEventCallback* const connectionEventCallback) { + connectionEventCallback_ = connectionEventCallback; + } + + /** + * Get the ConnectionEventCallback + */ + ConnectionEventCallback* getConnectionEventCallback() const { + return connectionEventCallback_; + } + protected: /** * Protected destructor. * * Invoke destroy() instead to destroy the AsyncServerSocket. */ - virtual ~AsyncServerSocket(); + ~AsyncServerSocket() override; private: enum class MessageType { @@ -617,23 +751,26 @@ class AsyncServerSocket : public DelayedDestruction */ class RemoteAcceptor : private NotificationQueue::Consumer { - public: - explicit RemoteAcceptor(AcceptCallback *callback) - : callback_(callback) {} + public: + explicit RemoteAcceptor(AcceptCallback *callback, + ConnectionEventCallback *connectionEventCallback) + : callback_(callback), + connectionEventCallback_(connectionEventCallback) {} - ~RemoteAcceptor() = default; + ~RemoteAcceptor() override = default; void start(EventBase *eventBase, uint32_t maxAtOnce, uint32_t maxInQueue); void stop(EventBase* eventBase, AcceptCallback* callback); - virtual void messageAvailable(QueueMessage&& message); + void messageAvailable(QueueMessage&& message) noexcept override; NotificationQueue* getQueue() { return &queue_; } - private: + private: AcceptCallback *callback_; + ConnectionEventCallback* connectionEventCallback_; NotificationQueue queue_; }; @@ -660,7 +797,7 @@ class AsyncServerSocket : public DelayedDestruction uint16_t events, int socket, sa_family_t family) noexcept; int createSocket(int family); - void setupSocket(int fd); + void setupSocket(int fd, int family); void bindSocket(int fd, const SocketAddress& address, bool isExistingSocket); void dispatchSocket(int socket, SocketAddress&& address); void dispatchError(const char *msg, int errnoValue); @@ -711,7 +848,7 @@ class AsyncServerSocket : public DelayedDestruction } // Inherited from EventHandler - virtual void handlerReady(uint16_t events) noexcept { + void handlerReady(uint16_t events) noexcept override { parent_->handlerReady(events, socket_, addressFamily_); } @@ -737,7 +874,11 @@ class AsyncServerSocket : public DelayedDestruction bool keepAliveEnabled_; bool reusePortEnabled_{false}; bool closeOnExec_; + bool tfo_{false}; + bool noTransparentTls_{false}; + uint32_t tfoMaxQueueSize_{0}; ShutdownSocketSet* shutdownSocketSet_; + ConnectionEventCallback* connectionEventCallback_{nullptr}; }; -} // folly +} // namespace folly