X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fio%2Fasync%2FAsyncSocket.h;h=5aeb159c65fb635b8bc872688560ed6712348832;hp=33924b6d807ff27cad9105c8ffb67b0367f19b6a;hb=5c74326fdc75ccdfc2152c15203625d8588096b6;hpb=ff9b70f3cd1f05fb8e8c4351248cd9f748c2644a diff --git a/folly/io/async/AsyncSocket.h b/folly/io/async/AsyncSocket.h index 33924b6d..5aeb159c 100644 --- a/folly/io/async/AsyncSocket.h +++ b/folly/io/async/AsyncSocket.h @@ -1,5 +1,5 @@ /* - * Copyright 2014 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,18 +16,21 @@ #pragma once -#include -#include -#include +#include #include -#include +#include #include -#include +#include #include +#include #include -#include #include +#include +#include + +#include +#include #include #include @@ -61,13 +64,24 @@ namespace folly { * responding and no further progress can be made sending the data. */ -class AsyncSocket : virtual public AsyncTransport { +#if defined __linux__ && !defined SO_NO_TRANSPARENT_TLS +#define SO_NO_TRANSPARENT_TLS 200 +#endif + +#ifdef _MSC_VER +// We do a dynamic_cast on this, in +// AsyncTransportWrapper::getUnderlyingTransport so be safe and +// force displacements for it. See: +// https://msdn.microsoft.com/en-us/library/7sf3txa8.aspx +#pragma vtordisp(push, 2) +#endif +class AsyncSocket : virtual public AsyncTransportWrapper { public: typedef std::unique_ptr UniquePtr; class ConnectCallback { public: - virtual ~ConnectCallback() {} + virtual ~ConnectCallback() = default; /** * connectSuccess() will be invoked when the connection has been @@ -84,104 +98,17 @@ class AsyncSocket : virtual public AsyncTransport { noexcept = 0; }; - class ReadCallback { + class EvbChangeCallback { public: - virtual ~ReadCallback() {} + virtual ~EvbChangeCallback() = default; - /** - * When data becomes available, getReadBuffer() will be invoked to get the - * buffer into which data should be read. - * - * This method allows the ReadCallback to delay buffer allocation until - * data becomes available. This allows applications to manage large - * numbers of idle connections, without having to maintain a separate read - * buffer for each idle connection. - * - * It is possible that in some cases, getReadBuffer() may be called - * multiple times before readDataAvailable() is invoked. In this case, the - * data will be written to the buffer returned from the most recent call to - * readDataAvailable(). If the previous calls to readDataAvailable() - * returned different buffers, the ReadCallback is responsible for ensuring - * that they are not leaked. - * - * If getReadBuffer() throws an exception, returns a nullptr buffer, or - * returns a 0 length, the ReadCallback will be uninstalled and its - * readError() method will be invoked. - * - * getReadBuffer() is not allowed to change the transport state before it - * returns. (For example, it should never uninstall the read callback, or - * set a different read callback.) - * - * @param bufReturn getReadBuffer() should update *bufReturn to contain the - * address of the read buffer. This parameter will never - * be nullptr. - * @param lenReturn getReadBuffer() should update *lenReturn to contain the - * maximum number of bytes that may be written to the read - * buffer. This parameter will never be nullptr. - */ - virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0; - - /** - * readDataAvailable() will be invoked when data has been successfully read - * into the buffer returned by the last call to getReadBuffer(). - * - * The read callback remains installed after readDataAvailable() returns. - * It must be explicitly uninstalled to stop receiving read events. - * getReadBuffer() will be called at least once before each call to - * readDataAvailable(). getReadBuffer() will also be called before any - * call to readEOF(). - * - * @param len The number of bytes placed in the buffer. - */ - virtual void readDataAvailable(size_t len) noexcept = 0; + // Called when the socket has been attached to a new EVB + // and is called from within that EVB thread + virtual void evbAttached(AsyncSocket* socket) = 0; - /** - * readEOF() will be invoked when the transport is closed. - * - * The read callback will be automatically uninstalled immediately before - * readEOF() is invoked. - */ - virtual void readEOF() noexcept = 0; - - /** - * readError() will be invoked if an error occurs reading from the - * transport. - * - * The read callback will be automatically uninstalled immediately before - * readError() is invoked. - * - * @param ex An exception describing the error that occurred. - */ - virtual void readErr(const AsyncSocketException& ex) - noexcept = 0; - }; - - class WriteCallback { - public: - virtual ~WriteCallback() {} - - /** - * writeSuccess() will be invoked when all of the data has been - * successfully written. - * - * Note that this mainly signals that the buffer containing the data to - * write is no longer needed and may be freed or re-used. It does not - * guarantee that the data has been fully transmitted to the remote - * endpoint. For example, on socket-based transports, writeSuccess() only - * indicates that the data has been given to the kernel for eventual - * transmission. - */ - virtual void writeSuccess() noexcept = 0; - - /** - * writeError() will be invoked if an error occurs writing the data. - * - * @param bytesWritten The number of bytes that were successfull - * @param ex An exception describing the error that occurred. - */ - virtual void writeErr(size_t bytesWritten, - const AsyncSocketException& ex) - noexcept = 0; + // Called when the socket is detached from an EVB and + // is called from the EVB thread being detached + virtual void evbDetached(AsyncSocket* socket) = 0; }; explicit AsyncSocket(); @@ -286,7 +213,7 @@ class AsyncSocket : virtual public AsyncTransport { * This prevents callers from deleting a AsyncSocket while it is invoking a * callback. */ - virtual void destroy(); + virtual void destroy() override; /** * Get the EventBase used by this socket. @@ -338,7 +265,7 @@ class AsyncSocket : virtual public AsyncTransport { typedef std::map OptionMap; static const OptionMap emptyOptionMap; - static const folly::SocketAddress anyAddress; + static const folly::SocketAddress& anyAddress(); /** * Initiate a connection. @@ -350,15 +277,28 @@ class AsyncSocket : virtual public AsyncTransport { * does not succeed within this period, * callback->connectError() will be invoked. */ - virtual void connect(ConnectCallback* callback, - const folly::SocketAddress& address, - int timeout = 0, - const OptionMap &options = emptyOptionMap, - const folly::SocketAddress& bindAddr = anyAddress - ) noexcept; - void connect(ConnectCallback* callback, const std::string& ip, uint16_t port, - int timeout = 00, - const OptionMap &options = emptyOptionMap) noexcept; + virtual void connect( + ConnectCallback* callback, + const folly::SocketAddress& address, + int timeout = 0, + const OptionMap& options = emptyOptionMap, + const folly::SocketAddress& bindAddr = anyAddress()) noexcept; + + void connect( + ConnectCallback* callback, + const std::string& ip, + uint16_t port, + int timeout = 0, + const OptionMap& options = emptyOptionMap) noexcept; + + /** + * If a connect request is in-flight, cancels it and closes the socket + * immediately. Otherwise, this is a no-op. + * + * This does not invoke any connection related callbacks. Call this to + * prevent any connect callback while cleaning up, etc. + */ + void cancelConnect(); /** * Set the send timeout. @@ -414,16 +354,22 @@ class AsyncSocket : virtual public AsyncTransport { } // Read and write methods - void setReadCB(ReadCallback* callback); - ReadCallback* getReadCallback() const; + void setReadCB(ReadCallback* callback) override; + ReadCallback* getReadCallback() const override; void write(WriteCallback* callback, const void* buf, size_t bytes, - WriteFlags flags = WriteFlags::NONE); + WriteFlags flags = WriteFlags::NONE) override; void writev(WriteCallback* callback, const iovec* vec, size_t count, - WriteFlags flags = WriteFlags::NONE); + WriteFlags flags = WriteFlags::NONE) override; void writeChain(WriteCallback* callback, std::unique_ptr&& buf, - WriteFlags flags = WriteFlags::NONE); + WriteFlags flags = WriteFlags::NONE) override; + + class WriteRequest; + virtual void writeRequest(WriteRequest* req); + void writeRequestReady() { + handleWrite(); + } // Methods inherited from AsyncTransport void close() override; @@ -446,14 +392,28 @@ class AsyncSocket : virtual public AsyncTransport { void getPeerAddress( folly::SocketAddress* address) const override; - bool isEorTrackingEnabled() const override { return false; } + bool isEorTrackingEnabled() const override { + return trackEor_; + } - void setEorTracking(bool track) override {} + void setEorTracking(bool track) override { + trackEor_ = track; + } bool connecting() const override { return (state_ == StateEnum::CONNECTING); } + virtual bool isClosedByPeer() const { + return (state_ == StateEnum::CLOSED && + (readErr_ == READ_EOF || readErr_ == READ_ERROR)); + } + + virtual bool isClosedBySelf() const { + return (state_ == StateEnum::CLOSED && + (readErr_ != READ_EOF && readErr_ != READ_ERROR)); + } + size_t getAppBytesWritten() const override { return appBytesWritten_; } @@ -470,6 +430,42 @@ class AsyncSocket : virtual public AsyncTransport { return getAppBytesReceived(); } + std::chrono::nanoseconds getConnectTime() const { + return connectEndTime_ - connectStartTime_; + } + + std::chrono::milliseconds getConnectTimeout() const { + return connectTimeout_; + } + + bool getTFOAttempted() const { + return tfoAttempted_; + } + + /** + * Returns whether or not the attempt to use TFO + * finished successfully. This does not necessarily + * mean TFO worked, just that trying to use TFO + * succeeded. + */ + bool getTFOFinished() const { + return tfoFinished_; + } + + /** + * Returns whether or not TFO attempt succeded on this + * connection. + * For servers this is pretty straightforward API and can + * be invoked right after the connection is accepted. This API + * will perform one syscall. + * This API is a bit tricky to use for clients, since clients + * only know this for sure after the SYN-ACK is returned. So it's + * appropriate to call this only after the first application + * data is read from the socket when the caller knows that + * the SYN has been ACKed by the server. + */ + bool getTFOSucceded() const; + // Methods controlling socket options /** @@ -489,6 +485,13 @@ class AsyncSocket : virtual public AsyncTransport { */ int setNoDelay(bool noDelay); + + /** + * Set the FD_CLOEXEC flag so that the socket will be closed if the program + * later forks and execs. + */ + void setCloseOnExec(); + /* * Set the Flavor of Congestion Control to be used for this Socket * Please check '/lib/modules//kernel/net/ipv4' for tcp_*.ko @@ -522,7 +525,6 @@ class AsyncSocket : virtual public AsyncTransport { #define SO_SET_NAMESPACE 41 int setTCPProfile(int profd); - /** * Generic API for reading a socket option. * @@ -530,11 +532,14 @@ class AsyncSocket : virtual public AsyncTransport { * @param optname same as the "optname" parameter in getsockopt(). * @param optval pointer to the variable in which the option value should * be returned. + * @param optlen value-result argument, initially containing the size of + * the buffer pointed to by optval, and modified on return + * to indicate the actual size of the value returned. * @return same as the return value of getsockopt(). */ template - int getSockOpt(int level, int optname, T *optval) { - return getsockopt(fd_, level, optname, optval, sizeof(T)); + int getSockOpt(int level, int optname, T* optval, socklen_t* optlen) { + return getsockopt(fd_, level, optname, (void*) optval, optlen); } /** @@ -550,12 +555,129 @@ class AsyncSocket : virtual public AsyncTransport { return setsockopt(fd_, level, optname, optval, sizeof(T)); } + virtual void setPeek(bool peek) { + peek_ = peek; + } + + /** + * Enables TFO behavior on the AsyncSocket if FOLLY_ALLOW_TFO + * is set. + */ + void enableTFO() { + // No-op if folly does not allow tfo +#if FOLLY_ALLOW_TFO + tfoEnabled_ = true; +#endif + } + + void disableTransparentTls() { + noTransparentTls_ = true; + } + enum class StateEnum : uint8_t { UNINIT, CONNECTING, ESTABLISHED, CLOSED, - ERROR + ERROR, + FAST_OPEN, + }; + + void setBufferCallback(BufferCallback* cb); + + // Callers should set this prior to connecting the socket for the safest + // behavior. + void setEvbChangedCallback(std::unique_ptr cb) { + evbChangeCb_ = std::move(cb); + } + + /** + * writeReturn is the total number of bytes written, or WRITE_ERROR on error. + * If no data has been written, 0 is returned. + * exception is a more specific exception that cause a write error. + * Not all writes have exceptions associated with them thus writeReturn + * should be checked to determine whether the operation resulted in an error. + */ + struct WriteResult { + explicit WriteResult(ssize_t ret) : writeReturn(ret) {} + + WriteResult(ssize_t ret, std::unique_ptr e) + : writeReturn(ret), exception(std::move(e)) {} + + ssize_t writeReturn; + std::unique_ptr exception; + }; + + /** + * readReturn is the number of bytes read, or READ_EOF on EOF, or + * READ_ERROR on error, or READ_BLOCKING if the operation will + * block. + * exception is a more specific exception that may have caused a read error. + * Not all read errors have exceptions associated with them thus readReturn + * should be checked to determine whether the operation resulted in an error. + */ + struct ReadResult { + explicit ReadResult(ssize_t ret) : readReturn(ret) {} + + ReadResult(ssize_t ret, std::unique_ptr e) + : readReturn(ret), exception(std::move(e)) {} + + ssize_t readReturn; + std::unique_ptr exception; + }; + + /** + * A WriteRequest object tracks information about a pending write operation. + */ + class WriteRequest { + public: + WriteRequest(AsyncSocket* socket, WriteCallback* callback) : + socket_(socket), callback_(callback) {} + + virtual void start() {} + + virtual void destroy() = 0; + + virtual WriteResult performWrite() = 0; + + virtual void consume() = 0; + + virtual bool isComplete() = 0; + + WriteRequest* getNext() const { + return next_; + } + + WriteCallback* getCallback() const { + return callback_; + } + + uint32_t getTotalBytesWritten() const { + return totalBytesWritten_; + } + + void append(WriteRequest* next) { + assert(next_ == nullptr); + next_ = next; + } + + void fail(const char* fn, const AsyncSocketException& ex) { + socket_->failWrite(fn, ex); + } + + void bytesWritten(size_t count) { + totalBytesWritten_ += uint32_t(count); + socket_->appBytesWritten_ += count; + } + + protected: + // protected destructor, to ensure callers use destroy() + virtual ~WriteRequest() {} + + AsyncSocket* socket_; ///< parent socket + WriteRequest* next_{nullptr}; ///< pointer to next WriteRequest + WriteCallback* callback_; ///< completion callback + uint32_t totalBytesWritten_{0}; ///< total bytes written }; protected: @@ -563,6 +685,11 @@ class AsyncSocket : virtual public AsyncTransport { READ_EOF = 0, READ_ERROR = -1, READ_BLOCKING = -2, + READ_NO_ERROR = -3, + }; + + enum WriteResultEnum { + WRITE_ERROR = -1, }; /** @@ -597,7 +724,7 @@ class AsyncSocket : virtual public AsyncTransport { SHUT_READ = 0x04, }; - class WriteRequest; + class BytesWriteRequest; class WriteTimeout : public AsyncTimeout { public: @@ -632,10 +759,46 @@ class AsyncSocket : virtual public AsyncTransport { void init(); + class ImmediateReadCB : public folly::EventBase::LoopCallback { + public: + explicit ImmediateReadCB(AsyncSocket* socket) : socket_(socket) {} + void runLoopCallback() noexcept override { + DestructorGuard dg(socket_); + socket_->checkForImmediateRead(); + } + private: + AsyncSocket* socket_; + }; + + /** + * Schedule checkForImmediateRead to be executed in the next loop + * iteration. + */ + void scheduleImmediateRead() noexcept { + if (good()) { + eventBase_->runInLoop(&immediateReadHandler_); + } + } + + /** + * Schedule handleInitalReadWrite to run in the next iteration. + */ + void scheduleInitialReadWrite() noexcept { + if (good()) { + DestructorGuard dg(this); + eventBase_->runInLoop([this, dg] { + if (good()) { + handleInitialReadWrite(); + } + }); + } + } + // event notification methods void ioReady(uint16_t events) noexcept; virtual void checkForImmediateRead() noexcept; virtual void handleInitialReadWrite() noexcept; + virtual void prepareReadBuffer(void** buf, size_t* buflen); virtual void handleRead() noexcept; virtual void handleWrite() noexcept; virtual void handleConnect() noexcept; @@ -647,11 +810,9 @@ class AsyncSocket : virtual public AsyncTransport { * @param buf The buffer to read data into. * @param buflen The length of the buffer. * - * @return Returns the number of bytes read, or READ_EOF on EOF, or - * READ_ERROR on error, or READ_BLOCKING if the operation will - * block. + * @return Returns a read result. See read result for details. */ - virtual ssize_t performRead(void* buf, size_t buflen); + virtual ReadResult performRead(void** buf, size_t* buflen, size_t* offset); /** * Populate an iovec array from an IOBuf and attempt to write it. @@ -700,12 +861,30 @@ class AsyncSocket : virtual public AsyncTransport { * will contain the number of bytes written in the * partially written iovec entry. * - * @return Returns the total number of bytes written, or -1 on error. If no - * data can be written immediately, 0 is returned. + * @return Returns a WriteResult. See WriteResult for more details. + */ + virtual WriteResult performWrite( + const iovec* vec, + uint32_t count, + WriteFlags flags, + uint32_t* countWritten, + uint32_t* partialWritten); + + /** + * Sends the message over the socket using sendmsg + * + * @param msg Message to send + * @param msg_flags Flags to pass to sendmsg */ - virtual ssize_t performWrite(const iovec* vec, uint32_t count, - WriteFlags flags, uint32_t* countWritten, - uint32_t* partialWritten); + AsyncSocket::WriteResult + sendSocketMessage(int fd, struct msghdr* msg, int msg_flags); + + virtual ssize_t tfoSendMsg(int fd, struct msghdr* msg, int msg_flags); + + int socketConnect(const struct sockaddr* addr, socklen_t len); + + virtual void scheduleConnectTimeout(); + void registerForConnectEvents(); bool updateEventRegistration(); @@ -729,6 +908,8 @@ class AsyncSocket : virtual public AsyncTransport { // error handling methods void startFail(); void finishFail(); + void finishFail(const AsyncSocketException& ex); + void invokeAllErrors(const AsyncSocketException& ex); void fail(const char* fn, const AsyncSocketException& ex); void failConnect(const char* fn, const AsyncSocketException& ex); void failRead(const char* fn, const AsyncSocketException& ex); @@ -736,6 +917,8 @@ class AsyncSocket : virtual public AsyncTransport { const AsyncSocketException& ex); void failWrite(const char* fn, const AsyncSocketException& ex); void failAllWrites(const AsyncSocketException& ex); + virtual void invokeConnectErr(const AsyncSocketException& ex); + virtual void invokeConnectSuccess(); void invalidState(ConnectCallback* callback); void invalidState(ReadCallback* callback); void invalidState(WriteCallback* callback); @@ -746,13 +929,15 @@ class AsyncSocket : virtual public AsyncTransport { uint8_t shutdownFlags_; ///< Shutdown state (ShutdownFlags) uint16_t eventFlags_; ///< EventBase::HandlerFlags settings int fd_; ///< The socket file descriptor - mutable - folly::SocketAddress addr_; ///< The address we tried to connect to + mutable folly::SocketAddress addr_; ///< The address we tried to connect to + mutable folly::SocketAddress localAddr_; + ///< The address we are connecting from uint32_t sendTimeout_; ///< The send timeout, in milliseconds uint16_t maxReadsPerEvent_; ///< Max reads per event loop iteration - EventBase* eventBase_; ///< The EventBase + EventBase* eventBase_; ///< The EventBase WriteTimeout writeTimeout_; ///< A timeout for connect and write IoHandler ioHandler_; ///< A EventHandler to monitor the fd + ImmediateReadCB immediateReadHandler_; ///< LoopCallback for checking read ConnectCallback* connectCallback_; ///< ConnectCallback ReadCallback* readCallback_; ///< ReadCallback @@ -761,7 +946,29 @@ class AsyncSocket : virtual public AsyncTransport { ShutdownSocketSet* shutdownSocketSet_; size_t appBytesReceived_; ///< Num of bytes received from socket size_t appBytesWritten_; ///< Num of bytes written to socket -}; + bool isBufferMovable_{false}; + + bool peek_{false}; // Peek bytes. + int8_t readErr_{READ_NO_ERROR}; ///< The read error encountered, if any. + + std::chrono::steady_clock::time_point connectStartTime_; + std::chrono::steady_clock::time_point connectEndTime_; + + std::chrono::milliseconds connectTimeout_{0}; + + BufferCallback* bufferCallback_{nullptr}; + bool tfoEnabled_{false}; + bool tfoAttempted_{false}; + bool tfoFinished_{false}; + bool noTransparentTls_{false}; + // Whether to track EOR or not. + bool trackEor_{false}; + + std::unique_ptr evbChangeCb_{nullptr}; +}; +#ifdef _MSC_VER +#pragma vtordisp(pop) +#endif } // folly