X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fio%2Fasync%2FAsyncSocket.h;h=6462f115d7a0a52e301195681f212fd65c972377;hp=33924b6d807ff27cad9105c8ffb67b0367f19b6a;hb=4c7a736d6529f22451a0ec965e093e7e318695e3;hpb=ff9b70f3cd1f05fb8e8c4351248cd9f748c2644a diff --git a/folly/io/async/AsyncSocket.h b/folly/io/async/AsyncSocket.h index 33924b6d..6462f115 100644 --- a/folly/io/async/AsyncSocket.h +++ b/folly/io/async/AsyncSocket.h @@ -1,5 +1,5 @@ /* - * Copyright 2014 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,18 +16,21 @@ #pragma once -#include -#include -#include +#include #include -#include +#include #include -#include +#include #include +#include #include -#include #include +#include +#include +#include + +#include #include #include @@ -61,13 +64,28 @@ namespace folly { * responding and no further progress can be made sending the data. */ -class AsyncSocket : virtual public AsyncTransport { +#if defined __linux__ && !defined SO_NO_TRANSPARENT_TLS +#define SO_NO_TRANSPARENT_TLS 200 +#endif + +#if defined __linux__ && !defined SO_NO_TSOCKS +#define SO_NO_TSOCKS 201 +#endif + +#ifdef _MSC_VER +// We do a dynamic_cast on this, in +// AsyncTransportWrapper::getUnderlyingTransport so be safe and +// force displacements for it. See: +// https://msdn.microsoft.com/en-us/library/7sf3txa8.aspx +#pragma vtordisp(push, 2) +#endif +class AsyncSocket : virtual public AsyncTransportWrapper { public: typedef std::unique_ptr UniquePtr; class ConnectCallback { public: - virtual ~ConnectCallback() {} + virtual ~ConnectCallback() = default; /** * connectSuccess() will be invoked when the connection has been @@ -84,104 +102,116 @@ class AsyncSocket : virtual public AsyncTransport { noexcept = 0; }; - class ReadCallback { + class EvbChangeCallback { + public: + virtual ~EvbChangeCallback() = default; + + // Called when the socket has been attached to a new EVB + // and is called from within that EVB thread + virtual void evbAttached(AsyncSocket* socket) = 0; + + // Called when the socket is detached from an EVB and + // is called from the EVB thread being detached + virtual void evbDetached(AsyncSocket* socket) = 0; + }; + + /** + * This interface is implemented only for platforms supporting + * per-socket error queues. + */ + class ErrMessageCallback { public: - virtual ~ReadCallback() {} + virtual ~ErrMessageCallback() = default; /** - * When data becomes available, getReadBuffer() will be invoked to get the - * buffer into which data should be read. - * - * This method allows the ReadCallback to delay buffer allocation until - * data becomes available. This allows applications to manage large - * numbers of idle connections, without having to maintain a separate read - * buffer for each idle connection. - * - * It is possible that in some cases, getReadBuffer() may be called - * multiple times before readDataAvailable() is invoked. In this case, the - * data will be written to the buffer returned from the most recent call to - * readDataAvailable(). If the previous calls to readDataAvailable() - * returned different buffers, the ReadCallback is responsible for ensuring - * that they are not leaked. - * - * If getReadBuffer() throws an exception, returns a nullptr buffer, or - * returns a 0 length, the ReadCallback will be uninstalled and its - * readError() method will be invoked. - * - * getReadBuffer() is not allowed to change the transport state before it - * returns. (For example, it should never uninstall the read callback, or - * set a different read callback.) + * errMessage() will be invoked when kernel puts a message to + * the error queue associated with the socket. * - * @param bufReturn getReadBuffer() should update *bufReturn to contain the - * address of the read buffer. This parameter will never - * be nullptr. - * @param lenReturn getReadBuffer() should update *lenReturn to contain the - * maximum number of bytes that may be written to the read - * buffer. This parameter will never be nullptr. + * @param cmsg Reference to cmsghdr structure describing + * a message read from error queue associated + * with the socket. */ - virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0; + virtual void + errMessage(const cmsghdr& cmsg) noexcept = 0; /** - * readDataAvailable() will be invoked when data has been successfully read - * into the buffer returned by the last call to getReadBuffer(). + * errMessageError() will be invoked if an error occurs reading a message + * from the socket error stream. * - * The read callback remains installed after readDataAvailable() returns. - * It must be explicitly uninstalled to stop receiving read events. - * getReadBuffer() will be called at least once before each call to - * readDataAvailable(). getReadBuffer() will also be called before any - * call to readEOF(). - * - * @param len The number of bytes placed in the buffer. + * @param ex An exception describing the error that occurred. */ - virtual void readDataAvailable(size_t len) noexcept = 0; + virtual void errMessageError(const AsyncSocketException& ex) noexcept = 0; + }; + + class SendMsgParamsCallback { + public: + virtual ~SendMsgParamsCallback() = default; /** - * readEOF() will be invoked when the transport is closed. + * getFlags() will be invoked to retrieve the desired flags to be passed + * to ::sendmsg() system call. This method was intentionally declared + * non-virtual, so there is no way to override it. Instead feel free to + * override getFlagsImpl(flags, defaultFlags) method instead, and enjoy + * the convenience of defaultFlags passed there. * - * The read callback will be automatically uninstalled immediately before - * readEOF() is invoked. + * @param flags Write flags requested for the given write operation */ - virtual void readEOF() noexcept = 0; + int getFlags(folly::WriteFlags flags) noexcept { + return getFlagsImpl(flags, getDefaultFlags(flags)); + } /** - * readError() will be invoked if an error occurs reading from the - * transport. + * getAncillaryData() will be invoked to initialize ancillary data + * buffer referred by "msg_control" field of msghdr structure passed to + * ::sendmsg() system call. The function assumes that the size of buffer + * is not smaller than the value returned by getAncillaryDataSize() method + * for the same combination of flags. * - * The read callback will be automatically uninstalled immediately before - * readError() is invoked. + * @param flags Write flags requested for the given write operation + * @param data Pointer to ancillary data buffer to initialize. + */ + virtual void getAncillaryData( + folly::WriteFlags /*flags*/, + void* /*data*/) noexcept {} + + /** + * getAncillaryDataSize() will be invoked to retrieve the size of + * ancillary data buffer which should be passed to ::sendmsg() system call * - * @param ex An exception describing the error that occurred. + * @param flags Write flags requested for the given write operation */ - virtual void readErr(const AsyncSocketException& ex) - noexcept = 0; - }; + virtual uint32_t getAncillaryDataSize(folly::WriteFlags /*flags*/) + noexcept { + return 0; + } - class WriteCallback { - public: - virtual ~WriteCallback() {} + static const size_t maxAncillaryDataSize{0x5000}; + private: /** - * writeSuccess() will be invoked when all of the data has been - * successfully written. + * getFlagsImpl() will be invoked by getFlags(folly::WriteFlags flags) + * method to retrieve the flags to be passed to ::sendmsg() system call. + * SendMsgParamsCallback::getFlags() is calling this method, and returns + * its results directly to the caller in AsyncSocket. + * Classes inheriting from SendMsgParamsCallback are welcome to override + * this method to force SendMsgParamsCallback to return its own set + * of flags. * - * Note that this mainly signals that the buffer containing the data to - * write is no longer needed and may be freed or re-used. It does not - * guarantee that the data has been fully transmitted to the remote - * endpoint. For example, on socket-based transports, writeSuccess() only - * indicates that the data has been given to the kernel for eventual - * transmission. + * @param flags Write flags requested for the given write operation + * @param defaultflags A set of message flags returned by getDefaultFlags() + * method for the given "flags" mask. */ - virtual void writeSuccess() noexcept = 0; + virtual int getFlagsImpl(folly::WriteFlags /*flags*/, int defaultFlags) { + return defaultFlags; + } /** - * writeError() will be invoked if an error occurs writing the data. + * getDefaultFlags() will be invoked by getFlags(folly::WriteFlags flags) + * to retrieve the default set of flags, and pass them to getFlagsImpl(...) * - * @param bytesWritten The number of bytes that were successfull - * @param ex An exception describing the error that occurred. + * @param flags Write flags requested for the given write operation */ - virtual void writeErr(size_t bytesWritten, - const AsyncSocketException& ex) - noexcept = 0; + int getDefaultFlags(folly::WriteFlags flags) noexcept; }; explicit AsyncSocket(); @@ -234,6 +264,14 @@ class AsyncSocket : virtual public AsyncTransport { */ AsyncSocket(EventBase* evb, int fd); + /** + * Create an AsyncSocket from a different, already connected AsyncSocket. + * + * Similar to AsyncSocket(evb, fd) when fd was previously owned by an + * AsyncSocket. + */ + explicit AsyncSocket(AsyncSocket::UniquePtr); + /** * Helper function to create a shared_ptr. * @@ -286,7 +324,7 @@ class AsyncSocket : virtual public AsyncTransport { * This prevents callers from deleting a AsyncSocket while it is invoking a * callback. */ - virtual void destroy(); + void destroy() override; /** * Get the EventBase used by this socket. @@ -309,6 +347,10 @@ class AsyncSocket : virtual public AsyncTransport { * error. The AsyncSocket may no longer be used after the file descriptor * has been extracted. * + * This method should be used with care as the resulting fd is not guaranteed + * to perfectly reflect the state of the AsyncSocket (security state, + * pre-received data, etc.). + * * Returns the file descriptor. The caller assumes ownership of the * descriptor, and it will not be closed when the AsyncSocket is destroyed. */ @@ -338,7 +380,7 @@ class AsyncSocket : virtual public AsyncTransport { typedef std::map OptionMap; static const OptionMap emptyOptionMap; - static const folly::SocketAddress anyAddress; + static const folly::SocketAddress& anyAddress(); /** * Initiate a connection. @@ -350,15 +392,28 @@ class AsyncSocket : virtual public AsyncTransport { * does not succeed within this period, * callback->connectError() will be invoked. */ - virtual void connect(ConnectCallback* callback, - const folly::SocketAddress& address, - int timeout = 0, - const OptionMap &options = emptyOptionMap, - const folly::SocketAddress& bindAddr = anyAddress - ) noexcept; - void connect(ConnectCallback* callback, const std::string& ip, uint16_t port, - int timeout = 00, - const OptionMap &options = emptyOptionMap) noexcept; + virtual void connect( + ConnectCallback* callback, + const folly::SocketAddress& address, + int timeout = 0, + const OptionMap& options = emptyOptionMap, + const folly::SocketAddress& bindAddr = anyAddress()) noexcept; + + void connect( + ConnectCallback* callback, + const std::string& ip, + uint16_t port, + int timeout = 0, + const OptionMap& options = emptyOptionMap) noexcept; + + /** + * If a connect request is in-flight, cancels it and closes the socket + * immediately. Otherwise, this is a no-op. + * + * This does not invoke any connection related callbacks. Call this to + * prevent any connect callback while cleaning up, etc. + */ + void cancelConnect(); /** * Set the send timeout. @@ -413,17 +468,55 @@ class AsyncSocket : virtual public AsyncTransport { return maxReadsPerEvent_; } + /** + * Set a pointer to ErrMessageCallback implementation which will be + * receiving notifications for messages posted to the error queue + * associated with the socket. + * ErrMessageCallback is implemented only for platforms with + * per-socket error message queus support (recvmsg() system call must + * ) + * + */ + virtual void setErrMessageCB(ErrMessageCallback* callback); + + /** + * Get a pointer to ErrMessageCallback implementation currently + * registered with this socket. + * + */ + virtual ErrMessageCallback* getErrMessageCallback() const; + + /** + * Set a pointer to SendMsgParamsCallback implementation which + * will be used to form ::sendmsg() system call parameters + * + */ + virtual void setSendMsgParamCB(SendMsgParamsCallback* callback); + + /** + * Get a pointer to SendMsgParamsCallback implementation currently + * registered with this socket. + * + */ + virtual SendMsgParamsCallback* getSendMsgParamsCB() const; + // Read and write methods - void setReadCB(ReadCallback* callback); - ReadCallback* getReadCallback() const; + void setReadCB(ReadCallback* callback) override; + ReadCallback* getReadCallback() const override; void write(WriteCallback* callback, const void* buf, size_t bytes, - WriteFlags flags = WriteFlags::NONE); + WriteFlags flags = WriteFlags::NONE) override; void writev(WriteCallback* callback, const iovec* vec, size_t count, - WriteFlags flags = WriteFlags::NONE); + WriteFlags flags = WriteFlags::NONE) override; void writeChain(WriteCallback* callback, std::unique_ptr&& buf, - WriteFlags flags = WriteFlags::NONE); + WriteFlags flags = WriteFlags::NONE) override; + + class WriteRequest; + virtual void writeRequest(WriteRequest* req); + void writeRequestReady() { + handleWrite(); + } // Methods inherited from AsyncTransport void close() override; @@ -433,6 +526,7 @@ class AsyncSocket : virtual public AsyncTransport { void shutdownWriteNow() override; bool readable() const override; + bool writable() const override; bool isPending() const override; virtual bool hangup() const; bool good() const override; @@ -446,14 +540,28 @@ class AsyncSocket : virtual public AsyncTransport { void getPeerAddress( folly::SocketAddress* address) const override; - bool isEorTrackingEnabled() const override { return false; } + bool isEorTrackingEnabled() const override { + return trackEor_; + } - void setEorTracking(bool track) override {} + void setEorTracking(bool track) override { + trackEor_ = track; + } bool connecting() const override { return (state_ == StateEnum::CONNECTING); } + virtual bool isClosedByPeer() const { + return (state_ == StateEnum::CLOSED && + (readErr_ == READ_EOF || readErr_ == READ_ERROR)); + } + + virtual bool isClosedBySelf() const { + return (state_ == StateEnum::CLOSED && + (readErr_ != READ_EOF && readErr_ != READ_ERROR)); + } + size_t getAppBytesWritten() const override { return appBytesWritten_; } @@ -470,6 +578,42 @@ class AsyncSocket : virtual public AsyncTransport { return getAppBytesReceived(); } + std::chrono::nanoseconds getConnectTime() const { + return connectEndTime_ - connectStartTime_; + } + + std::chrono::milliseconds getConnectTimeout() const { + return connectTimeout_; + } + + bool getTFOAttempted() const { + return tfoAttempted_; + } + + /** + * Returns whether or not the attempt to use TFO + * finished successfully. This does not necessarily + * mean TFO worked, just that trying to use TFO + * succeeded. + */ + bool getTFOFinished() const { + return tfoFinished_; + } + + /** + * Returns whether or not TFO attempt succeded on this + * connection. + * For servers this is pretty straightforward API and can + * be invoked right after the connection is accepted. This API + * will perform one syscall. + * This API is a bit tricky to use for clients, since clients + * only know this for sure after the SYN-ACK is returned. So it's + * appropriate to call this only after the first application + * data is read from the socket when the caller knows that + * the SYN has been ACKed by the server. + */ + bool getTFOSucceded() const; + // Methods controlling socket options /** @@ -489,6 +633,13 @@ class AsyncSocket : virtual public AsyncTransport { */ int setNoDelay(bool noDelay); + + /** + * Set the FD_CLOEXEC flag so that the socket will be closed if the program + * later forks and execs. + */ + void setCloseOnExec(); + /* * Set the Flavor of Congestion Control to be used for this Socket * Please check '/lib/modules//kernel/net/ipv4' for tcp_*.ko @@ -522,7 +673,6 @@ class AsyncSocket : virtual public AsyncTransport { #define SO_SET_NAMESPACE 41 int setTCPProfile(int profd); - /** * Generic API for reading a socket option. * @@ -530,11 +680,14 @@ class AsyncSocket : virtual public AsyncTransport { * @param optname same as the "optname" parameter in getsockopt(). * @param optval pointer to the variable in which the option value should * be returned. + * @param optlen value-result argument, initially containing the size of + * the buffer pointed to by optval, and modified on return + * to indicate the actual size of the value returned. * @return same as the return value of getsockopt(). */ template - int getSockOpt(int level, int optname, T *optval) { - return getsockopt(fd_, level, optname, optval, sizeof(T)); + int getSockOpt(int level, int optname, T* optval, socklen_t* optlen) { + return getsockopt(fd_, level, optname, (void*) optval, optlen); } /** @@ -550,12 +703,183 @@ class AsyncSocket : virtual public AsyncTransport { return setsockopt(fd_, level, optname, optval, sizeof(T)); } + /** + * Virtual method for reading a socket option returning integer + * value, which is the most typical case. Convenient for overriding + * and mocking. + * + * @param level same as the "level" parameter in getsockopt(). + * @param optname same as the "optname" parameter in getsockopt(). + * @param optval same as "optval" parameter in getsockopt(). + * @param optlen same as "optlen" parameter in getsockopt(). + * @return same as the return value of getsockopt(). + */ + virtual int + getSockOptVirtual(int level, int optname, void* optval, socklen_t* optlen) { + return getsockopt(fd_, level, optname, optval, optlen); + } + + /** + * Virtual method for setting a socket option accepting integer + * value, which is the most typical case. Convenient for overriding + * and mocking. + * + * @param level same as the "level" parameter in setsockopt(). + * @param optname same as the "optname" parameter in setsockopt(). + * @param optval same as "optval" parameter in setsockopt(). + * @param optlen same as "optlen" parameter in setsockopt(). + * @return same as the return value of setsockopt(). + */ + virtual int setSockOptVirtual( + int level, + int optname, + void const* optval, + socklen_t optlen) { + return setsockopt(fd_, level, optname, optval, optlen); + } + + /** + * Set pre-received data, to be returned to read callback before any data + * from the socket. + */ + virtual void setPreReceivedData(std::unique_ptr data) { + if (preReceivedData_) { + preReceivedData_->prependChain(std::move(data)); + } else { + preReceivedData_ = std::move(data); + } + } + + /** + * Enables TFO behavior on the AsyncSocket if FOLLY_ALLOW_TFO + * is set. + */ + void enableTFO() { + // No-op if folly does not allow tfo +#if FOLLY_ALLOW_TFO + tfoEnabled_ = true; +#endif + } + + void disableTransparentTls() { + noTransparentTls_ = true; + } + + void disableTSocks() { + noTSocks_ = true; + } + enum class StateEnum : uint8_t { UNINIT, CONNECTING, ESTABLISHED, CLOSED, - ERROR + ERROR, + FAST_OPEN, + }; + + void setBufferCallback(BufferCallback* cb); + + // Callers should set this prior to connecting the socket for the safest + // behavior. + void setEvbChangedCallback(std::unique_ptr cb) { + evbChangeCb_ = std::move(cb); + } + + /** + * Attempt to cache the current local and peer addresses (if not already + * cached) so that they are available from getPeerAddress() and + * getLocalAddress() even after the socket is closed. + */ + void cacheAddresses(); + + /** + * writeReturn is the total number of bytes written, or WRITE_ERROR on error. + * If no data has been written, 0 is returned. + * exception is a more specific exception that cause a write error. + * Not all writes have exceptions associated with them thus writeReturn + * should be checked to determine whether the operation resulted in an error. + */ + struct WriteResult { + explicit WriteResult(ssize_t ret) : writeReturn(ret) {} + + WriteResult(ssize_t ret, std::unique_ptr e) + : writeReturn(ret), exception(std::move(e)) {} + + ssize_t writeReturn; + std::unique_ptr exception; + }; + + /** + * readReturn is the number of bytes read, or READ_EOF on EOF, or + * READ_ERROR on error, or READ_BLOCKING if the operation will + * block. + * exception is a more specific exception that may have caused a read error. + * Not all read errors have exceptions associated with them thus readReturn + * should be checked to determine whether the operation resulted in an error. + */ + struct ReadResult { + explicit ReadResult(ssize_t ret) : readReturn(ret) {} + + ReadResult(ssize_t ret, std::unique_ptr e) + : readReturn(ret), exception(std::move(e)) {} + + ssize_t readReturn; + std::unique_ptr exception; + }; + + /** + * A WriteRequest object tracks information about a pending write operation. + */ + class WriteRequest { + public: + WriteRequest(AsyncSocket* socket, WriteCallback* callback) : + socket_(socket), callback_(callback) {} + + virtual void start() {} + + virtual void destroy() = 0; + + virtual WriteResult performWrite() = 0; + + virtual void consume() = 0; + + virtual bool isComplete() = 0; + + WriteRequest* getNext() const { + return next_; + } + + WriteCallback* getCallback() const { + return callback_; + } + + uint32_t getTotalBytesWritten() const { + return totalBytesWritten_; + } + + void append(WriteRequest* next) { + assert(next_ == nullptr); + next_ = next; + } + + void fail(const char* fn, const AsyncSocketException& ex) { + socket_->failWrite(fn, ex); + } + + void bytesWritten(size_t count) { + totalBytesWritten_ += uint32_t(count); + socket_->appBytesWritten_ += count; + } + + protected: + // protected destructor, to ensure callers use destroy() + virtual ~WriteRequest() {} + + AsyncSocket* socket_; ///< parent socket + WriteRequest* next_{nullptr}; ///< pointer to next WriteRequest + WriteCallback* callback_; ///< completion callback + uint32_t totalBytesWritten_{0}; ///< total bytes written }; protected: @@ -563,6 +887,11 @@ class AsyncSocket : virtual public AsyncTransport { READ_EOF = 0, READ_ERROR = -1, READ_BLOCKING = -2, + READ_NO_ERROR = -3, + }; + + enum WriteResultEnum { + WRITE_ERROR = -1, }; /** @@ -572,7 +901,7 @@ class AsyncSocket : virtual public AsyncTransport { * destroy() instead. (See the documentation in DelayedDestruction.h for * more details.) */ - ~AsyncSocket(); + ~AsyncSocket() override; friend std::ostream& operator << (std::ostream& os, const StateEnum& state); @@ -597,7 +926,7 @@ class AsyncSocket : virtual public AsyncTransport { SHUT_READ = 0x04, }; - class WriteRequest; + class BytesWriteRequest; class WriteTimeout : public AsyncTimeout { public: @@ -605,7 +934,7 @@ class AsyncSocket : virtual public AsyncTransport { : AsyncTimeout(eventBase) , socket_(socket) {} - virtual void timeoutExpired() noexcept { + void timeoutExpired() noexcept override { socket_->timeoutExpired(); } @@ -622,7 +951,7 @@ class AsyncSocket : virtual public AsyncTransport { : EventHandler(eventBase, fd) , socket_(socket) {} - virtual void handlerReady(uint16_t events) noexcept { + void handlerReady(uint16_t events) noexcept override { socket_->ioReady(events); } @@ -632,10 +961,47 @@ class AsyncSocket : virtual public AsyncTransport { void init(); + class ImmediateReadCB : public folly::EventBase::LoopCallback { + public: + explicit ImmediateReadCB(AsyncSocket* socket) : socket_(socket) {} + void runLoopCallback() noexcept override { + DestructorGuard dg(socket_); + socket_->checkForImmediateRead(); + } + private: + AsyncSocket* socket_; + }; + + /** + * Schedule checkForImmediateRead to be executed in the next loop + * iteration. + */ + void scheduleImmediateRead() noexcept { + if (good()) { + eventBase_->runInLoop(&immediateReadHandler_); + } + } + + /** + * Schedule handleInitalReadWrite to run in the next iteration. + */ + void scheduleInitialReadWrite() noexcept { + if (good()) { + DestructorGuard dg(this); + eventBase_->runInLoop([this, dg] { + if (good()) { + handleInitialReadWrite(); + } + }); + } + } + // event notification methods void ioReady(uint16_t events) noexcept; virtual void checkForImmediateRead() noexcept; virtual void handleInitialReadWrite() noexcept; + virtual void prepareReadBuffer(void** buf, size_t* buflen); + virtual void handleErrMessages() noexcept; virtual void handleRead() noexcept; virtual void handleWrite() noexcept; virtual void handleConnect() noexcept; @@ -647,11 +1013,9 @@ class AsyncSocket : virtual public AsyncTransport { * @param buf The buffer to read data into. * @param buflen The length of the buffer. * - * @return Returns the number of bytes read, or READ_EOF on EOF, or - * READ_ERROR on error, or READ_BLOCKING if the operation will - * block. + * @return Returns a read result. See read result for details. */ - virtual ssize_t performRead(void* buf, size_t buflen); + virtual ReadResult performRead(void** buf, size_t* buflen, size_t* offset); /** * Populate an iovec array from an IOBuf and attempt to write it. @@ -700,12 +1064,30 @@ class AsyncSocket : virtual public AsyncTransport { * will contain the number of bytes written in the * partially written iovec entry. * - * @return Returns the total number of bytes written, or -1 on error. If no - * data can be written immediately, 0 is returned. + * @return Returns a WriteResult. See WriteResult for more details. */ - virtual ssize_t performWrite(const iovec* vec, uint32_t count, - WriteFlags flags, uint32_t* countWritten, - uint32_t* partialWritten); + virtual WriteResult performWrite( + const iovec* vec, + uint32_t count, + WriteFlags flags, + uint32_t* countWritten, + uint32_t* partialWritten); + + /** + * Sends the message over the socket using sendmsg + * + * @param msg Message to send + * @param msg_flags Flags to pass to sendmsg + */ + AsyncSocket::WriteResult + sendSocketMessage(int fd, struct msghdr* msg, int msg_flags); + + virtual ssize_t tfoSendMsg(int fd, struct msghdr* msg, int msg_flags); + + int socketConnect(const struct sockaddr* addr, socklen_t len); + + virtual void scheduleConnectTimeout(); + void registerForConnectEvents(); bool updateEventRegistration(); @@ -729,39 +1111,78 @@ class AsyncSocket : virtual public AsyncTransport { // error handling methods void startFail(); void finishFail(); + void finishFail(const AsyncSocketException& ex); + void invokeAllErrors(const AsyncSocketException& ex); void fail(const char* fn, const AsyncSocketException& ex); void failConnect(const char* fn, const AsyncSocketException& ex); void failRead(const char* fn, const AsyncSocketException& ex); + void failErrMessageRead(const char* fn, const AsyncSocketException& ex); void failWrite(const char* fn, WriteCallback* callback, size_t bytesWritten, const AsyncSocketException& ex); void failWrite(const char* fn, const AsyncSocketException& ex); void failAllWrites(const AsyncSocketException& ex); + virtual void invokeConnectErr(const AsyncSocketException& ex); + virtual void invokeConnectSuccess(); void invalidState(ConnectCallback* callback); + void invalidState(ErrMessageCallback* callback); void invalidState(ReadCallback* callback); void invalidState(WriteCallback* callback); std::string withAddr(const std::string& s); - StateEnum state_; ///< StateEnum describing current state - uint8_t shutdownFlags_; ///< Shutdown state (ShutdownFlags) - uint16_t eventFlags_; ///< EventBase::HandlerFlags settings - int fd_; ///< The socket file descriptor - mutable - folly::SocketAddress addr_; ///< The address we tried to connect to - uint32_t sendTimeout_; ///< The send timeout, in milliseconds - uint16_t maxReadsPerEvent_; ///< Max reads per event loop iteration - EventBase* eventBase_; ///< The EventBase - WriteTimeout writeTimeout_; ///< A timeout for connect and write - IoHandler ioHandler_; ///< A EventHandler to monitor the fd - - ConnectCallback* connectCallback_; ///< ConnectCallback - ReadCallback* readCallback_; ///< ReadCallback - WriteRequest* writeReqHead_; ///< Chain of WriteRequests - WriteRequest* writeReqTail_; ///< End of WriteRequest chain + void cacheLocalAddress() const; + void cachePeerAddress() const; + + StateEnum state_; ///< StateEnum describing current state + uint8_t shutdownFlags_; ///< Shutdown state (ShutdownFlags) + uint16_t eventFlags_; ///< EventBase::HandlerFlags settings + int fd_; ///< The socket file descriptor + mutable folly::SocketAddress addr_; ///< The address we tried to connect to + mutable folly::SocketAddress localAddr_; + ///< The address we are connecting from + uint32_t sendTimeout_; ///< The send timeout, in milliseconds + uint16_t maxReadsPerEvent_; ///< Max reads per event loop iteration + EventBase* eventBase_; ///< The EventBase + WriteTimeout writeTimeout_; ///< A timeout for connect and write + IoHandler ioHandler_; ///< A EventHandler to monitor the fd + ImmediateReadCB immediateReadHandler_; ///< LoopCallback for checking read + + ConnectCallback* connectCallback_; ///< ConnectCallback + ErrMessageCallback* errMessageCallback_; ///< TimestampCallback + SendMsgParamsCallback* ///< Callback for retreaving + sendMsgParamCallback_; ///< ::sendmsg() parameters + ReadCallback* readCallback_; ///< ReadCallback + WriteRequest* writeReqHead_; ///< Chain of WriteRequests + WriteRequest* writeReqTail_; ///< End of WriteRequest chain ShutdownSocketSet* shutdownSocketSet_; - size_t appBytesReceived_; ///< Num of bytes received from socket - size_t appBytesWritten_; ///< Num of bytes written to socket -}; + size_t appBytesReceived_; ///< Num of bytes received from socket + size_t appBytesWritten_; ///< Num of bytes written to socket + bool isBufferMovable_{false}; + // Pre-received data, to be returned to read callback before any data from the + // socket. + std::unique_ptr preReceivedData_; + + int8_t readErr_{READ_NO_ERROR}; ///< The read error encountered, if any + + std::chrono::steady_clock::time_point connectStartTime_; + std::chrono::steady_clock::time_point connectEndTime_; + + std::chrono::milliseconds connectTimeout_{0}; + + BufferCallback* bufferCallback_{nullptr}; + bool tfoEnabled_{false}; + bool tfoAttempted_{false}; + bool tfoFinished_{false}; + bool noTransparentTls_{false}; + bool noTSocks_{false}; + // Whether to track EOR or not. + bool trackEor_{false}; + + std::unique_ptr evbChangeCb_{nullptr}; +}; +#ifdef _MSC_VER +#pragma vtordisp(pop) +#endif } // folly