Return if we handle any error messages to avoid unnecessarily calling recv/send
[folly.git] / folly / io / async / AsyncSocket.h
index 4056066045bdf7012d2780c53a3f26c18fd872ea..c85ac6b87bd51392d9a667cffadbb8c5fe886bff 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2017-present Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 #pragma once
 
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <glog/logging.h>
+#include <folly/Optional.h>
 #include <folly/SocketAddress.h>
-#include <folly/io/ShutdownSocketSet.h>
+#include <folly/detail/SocketFastOpen.h>
 #include <folly/io/IOBuf.h>
-#include <folly/io/async/AsyncTimeout.h>
+#include <folly/io/ShutdownSocketSet.h>
 #include <folly/io/async/AsyncSocketException.h>
+#include <folly/io/async/AsyncTimeout.h>
 #include <folly/io/async/AsyncTransport.h>
-#include <folly/io/async/EventHandler.h>
 #include <folly/io/async/DelayedDestruction.h>
+#include <folly/io/async/EventHandler.h>
+#include <folly/portability/Sockets.h>
 
-#include <memory>
+#include <sys/types.h>
+
+#include <chrono>
 #include <map>
+#include <memory>
 
 namespace folly {
 
@@ -61,13 +63,28 @@ namespace folly {
  * responding and no further progress can be made sending the data.
  */
 
-class AsyncSocket : virtual public AsyncTransport {
+#if defined __linux__ && !defined SO_NO_TRANSPARENT_TLS
+#define SO_NO_TRANSPARENT_TLS 200
+#endif
+
+#if defined __linux__ && !defined SO_NO_TSOCKS
+#define SO_NO_TSOCKS 201
+#endif
+
+#ifdef _MSC_VER
+// We do a dynamic_cast on this, in
+// AsyncTransportWrapper::getUnderlyingTransport so be safe and
+// force displacements for it. See:
+// https://msdn.microsoft.com/en-us/library/7sf3txa8.aspx
+#pragma vtordisp(push, 2)
+#endif
+class AsyncSocket : virtual public AsyncTransportWrapper {
  public:
   typedef std::unique_ptr<AsyncSocket, Destructor> UniquePtr;
 
   class ConnectCallback {
    public:
-    virtual ~ConnectCallback() {}
+    virtual ~ConnectCallback() = default;
 
     /**
      * connectSuccess() will be invoked when the connection has been
@@ -84,104 +101,116 @@ class AsyncSocket : virtual public AsyncTransport {
       noexcept = 0;
   };
 
-  class ReadCallback {
+  class EvbChangeCallback {
    public:
-    virtual ~ReadCallback() {}
+    virtual ~EvbChangeCallback() = default;
+
+    // Called when the socket has been attached to a new EVB
+    // and is called from within that EVB thread
+    virtual void evbAttached(AsyncSocket* socket) = 0;
+
+    // Called when the socket is detached from an EVB and
+    // is called from the EVB thread being detached
+    virtual void evbDetached(AsyncSocket* socket) = 0;
+  };
+
+  /**
+   * This interface is implemented only for platforms supporting
+   * per-socket error queues.
+   */
+  class ErrMessageCallback {
+   public:
+    virtual ~ErrMessageCallback() = default;
 
     /**
-     * When data becomes available, getReadBuffer() will be invoked to get the
-     * buffer into which data should be read.
-     *
-     * This method allows the ReadCallback to delay buffer allocation until
-     * data becomes available.  This allows applications to manage large
-     * numbers of idle connections, without having to maintain a separate read
-     * buffer for each idle connection.
-     *
-     * It is possible that in some cases, getReadBuffer() may be called
-     * multiple times before readDataAvailable() is invoked.  In this case, the
-     * data will be written to the buffer returned from the most recent call to
-     * readDataAvailable().  If the previous calls to readDataAvailable()
-     * returned different buffers, the ReadCallback is responsible for ensuring
-     * that they are not leaked.
+     * errMessage() will be invoked when kernel puts a message to
+     * the error queue associated with the socket.
      *
-     * If getReadBuffer() throws an exception, returns a nullptr buffer, or
-     * returns a 0 length, the ReadCallback will be uninstalled and its
-     * readErr() method will be invoked.
-     *
-     * getReadBuffer() is not allowed to change the transport state before it
-     * returns.  (For example, it should never uninstall the read callback, or
-     * set a different read callback.)
-     *
-     * @param bufReturn getReadBuffer() should update *bufReturn to contain the
-     *                  address of the read buffer.  This parameter will never
-     *                  be nullptr.
-     * @param lenReturn getReadBuffer() should update *lenReturn to contain the
-     *                  maximum number of bytes that may be written to the read
-     *                  buffer.  This parameter will never be nullptr.
+     * @param cmsg      Reference to cmsghdr structure describing
+     *                  a message read from error queue associated
+     *                  with the socket.
      */
-    virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0;
+    virtual void
+    errMessage(const cmsghdr& cmsg) noexcept = 0;
 
     /**
-     * readDataAvailable() will be invoked when data has been successfully read
-     * into the buffer returned by the last call to getReadBuffer().
-     *
-     * The read callback remains installed after readDataAvailable() returns.
-     * It must be explicitly uninstalled to stop receiving read events.
-     * getReadBuffer() will be called at least once before each call to
-     * readDataAvailable().  getReadBuffer() will also be called before any
-     * call to readEOF().
+     * errMessageError() will be invoked if an error occurs reading a message
+     * from the socket error stream.
      *
-     * @param len       The number of bytes placed in the buffer.
+     * @param ex        An exception describing the error that occurred.
      */
-    virtual void readDataAvailable(size_t len) noexcept = 0;
+    virtual void errMessageError(const AsyncSocketException& ex) noexcept = 0;
+  };
+
+  class SendMsgParamsCallback {
+   public:
+    virtual ~SendMsgParamsCallback() = default;
 
     /**
-     * readEOF() will be invoked when the transport is closed.
+     * getFlags() will be invoked to retrieve the desired flags to be passed
+     * to ::sendmsg() system call. This method was intentionally declared
+     * non-virtual, so there is no way to override it. Instead feel free to
+     * override getFlagsImpl(flags, defaultFlags) method instead, and enjoy
+     * the convenience of defaultFlags passed there.
      *
-     * The read callback will be automatically uninstalled immediately before
-     * readEOF() is invoked.
+     * @param flags     Write flags requested for the given write operation
      */
-    virtual void readEOF() noexcept = 0;
+    int getFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept {
+      return getFlagsImpl(flags, getDefaultFlags(flags, zeroCopyEnabled));
+    }
 
     /**
-     * readErr() will be invoked if an error occurs reading from the
-     * transport.
+     * getAncillaryData() will be invoked to initialize ancillary data
+     * buffer referred by "msg_control" field of msghdr structure passed to
+     * ::sendmsg() system call. The function assumes that the size of buffer
+     * is not smaller than the value returned by getAncillaryDataSize() method
+     * for the same combination of flags.
      *
-     * The read callback will be automatically uninstalled immediately before
-     * readErr() is invoked.
+     * @param flags     Write flags requested for the given write operation
+     * @param data      Pointer to ancillary data buffer to initialize.
+     */
+    virtual void getAncillaryData(
+      folly::WriteFlags /*flags*/,
+      void* /*data*/) noexcept {}
+
+    /**
+     * getAncillaryDataSize() will be invoked to retrieve the size of
+     * ancillary data buffer which should be passed to ::sendmsg() system call
      *
-     * @param ex        An exception describing the error that occurred.
+     * @param flags     Write flags requested for the given write operation
      */
-    virtual void readErr(const AsyncSocketException& ex)
-      noexcept = 0;
-  };
+    virtual uint32_t getAncillaryDataSize(folly::WriteFlags /*flags*/)
+        noexcept {
+      return 0;
+    }
 
-  class WriteCallback {
-   public:
-    virtual ~WriteCallback() {}
+    static const size_t maxAncillaryDataSize{0x5000};
 
+   private:
     /**
-     * writeSuccess() will be invoked when all of the data has been
-     * successfully written.
+     * getFlagsImpl() will be invoked by getFlags(folly::WriteFlags flags)
+     * method to retrieve the flags to be passed to ::sendmsg() system call.
+     * SendMsgParamsCallback::getFlags() is calling this method, and returns
+     * its results directly to the caller in AsyncSocket.
+     * Classes inheriting from SendMsgParamsCallback are welcome to override
+     * this method to force SendMsgParamsCallback to return its own set
+     * of flags.
      *
-     * Note that this mainly signals that the buffer containing the data to
-     * write is no longer needed and may be freed or re-used.  It does not
-     * guarantee that the data has been fully transmitted to the remote
-     * endpoint.  For example, on socket-based transports, writeSuccess() only
-     * indicates that the data has been given to the kernel for eventual
-     * transmission.
+     * @param flags        Write flags requested for the given write operation
+     * @param defaultflags A set of message flags returned by getDefaultFlags()
+     *                     method for the given "flags" mask.
      */
-    virtual void writeSuccess() noexcept = 0;
+    virtual int getFlagsImpl(folly::WriteFlags /*flags*/, int defaultFlags) {
+      return defaultFlags;
+    }
 
     /**
-     * writeErr() will be invoked if an error occurs writing the data.
+     * getDefaultFlags() will be invoked by  getFlags(folly::WriteFlags flags)
+     * to retrieve the default set of flags, and pass them to getFlagsImpl(...)
      *
-     * @param bytesWritten      The number of bytes that were successfull
-     * @param ex                An exception describing the error that occurred.
+     * @param flags     Write flags requested for the given write operation
      */
-    virtual void writeErr(size_t bytesWritten,
-                            const AsyncSocketException& ex)
-      noexcept = 0;
+    int getDefaultFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept;
   };
 
   explicit AsyncSocket();
@@ -192,7 +221,7 @@ class AsyncSocket : virtual public AsyncTransport {
    */
   explicit AsyncSocket(EventBase* evb);
 
-  void setShutdownSocketSet(ShutdownSocketSet* ss);
+  void setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet>& wSS);
 
   /**
    * Create a new AsyncSocket and begin the connection process.
@@ -231,8 +260,17 @@ class AsyncSocket : virtual public AsyncTransport {
    *
    * @param evb EventBase that will manage this socket.
    * @param fd  File descriptor to take over (should be a connected socket).
+   * @param zeroCopyBufId Zerocopy buf id to start with.
    */
-  AsyncSocket(EventBase* evb, int fd);
+  AsyncSocket(EventBase* evb, int fd, uint32_t zeroCopyBufId = 0);
+
+  /**
+   * Create an AsyncSocket from a different, already connected AsyncSocket.
+   *
+   * Similar to AsyncSocket(evb, fd) when fd was previously owned by an
+   * AsyncSocket.
+   */
+  explicit AsyncSocket(AsyncSocket::UniquePtr);
 
   /**
    * Helper function to create a shared_ptr<AsyncSocket>.
@@ -286,7 +324,7 @@ class AsyncSocket : virtual public AsyncTransport {
    * This prevents callers from deleting a AsyncSocket while it is invoking a
    * callback.
    */
-  virtual void destroy();
+  void destroy() override;
 
   /**
    * Get the EventBase used by this socket.
@@ -309,6 +347,10 @@ class AsyncSocket : virtual public AsyncTransport {
    * error.  The AsyncSocket may no longer be used after the file descriptor
    * has been extracted.
    *
+   * This method should be used with care as the resulting fd is not guaranteed
+   * to perfectly reflect the state of the AsyncSocket (security state,
+   * pre-received data, etc.).
+   *
    * Returns the file descriptor.  The caller assumes ownership of the
    * descriptor, and it will not be closed when the AsyncSocket is destroyed.
    */
@@ -338,7 +380,7 @@ class AsyncSocket : virtual public AsyncTransport {
   typedef std::map<OptionKey, int> OptionMap;
 
   static const OptionMap emptyOptionMap;
-  static const folly::SocketAddress anyAddress;
+  static const folly::SocketAddress& anyAddress();
 
   /**
    * Initiate a connection.
@@ -350,15 +392,28 @@ class AsyncSocket : virtual public AsyncTransport {
    *                  does not succeed within this period,
    *                  callback->connectError() will be invoked.
    */
-  virtual void connect(ConnectCallback* callback,
-               const folly::SocketAddress& address,
-               int timeout = 0,
-               const OptionMap &options = emptyOptionMap,
-               const folly::SocketAddress& bindAddr = anyAddress
-               ) noexcept;
-  void connect(ConnectCallback* callback, const std::string& ip, uint16_t port,
-               int timeout = 00,
-               const OptionMap &options = emptyOptionMap) noexcept;
+  virtual void connect(
+      ConnectCallback* callback,
+      const folly::SocketAddress& address,
+      int timeout = 0,
+      const OptionMap& options = emptyOptionMap,
+      const folly::SocketAddress& bindAddr = anyAddress()) noexcept;
+
+  void connect(
+      ConnectCallback* callback,
+      const std::string& ip,
+      uint16_t port,
+      int timeout = 0,
+      const OptionMap& options = emptyOptionMap) noexcept;
+
+  /**
+   * If a connect request is in-flight, cancels it and closes the socket
+   * immediately. Otherwise, this is a no-op.
+   *
+   * This does not invoke any connection related callbacks. Call this to
+   * prevent any connect callback while cleaning up, etc.
+   */
+  void cancelConnect();
 
   /**
    * Set the send timeout.
@@ -413,17 +468,64 @@ class AsyncSocket : virtual public AsyncTransport {
     return maxReadsPerEvent_;
   }
 
+  /**
+   * Set a pointer to ErrMessageCallback implementation which will be
+   * receiving notifications for messages posted to the error queue
+   * associated with the socket.
+   * ErrMessageCallback is implemented only for platforms with
+   * per-socket error message queus support (recvmsg() system call must
+   * )
+   *
+   */
+  virtual void setErrMessageCB(ErrMessageCallback* callback);
+
+  /**
+   * Get a pointer to ErrMessageCallback implementation currently
+   * registered with this socket.
+   *
+   */
+  virtual ErrMessageCallback* getErrMessageCallback() const;
+
+  /**
+   * Set a pointer to SendMsgParamsCallback implementation which
+   * will be used to form ::sendmsg() system call parameters
+   *
+   */
+  virtual void setSendMsgParamCB(SendMsgParamsCallback* callback);
+
+  /**
+   * Get a pointer to SendMsgParamsCallback implementation currently
+   * registered with this socket.
+   *
+   */
+  virtual SendMsgParamsCallback* getSendMsgParamsCB() const;
+
   // Read and write methods
-  void setReadCB(ReadCallback* callback);
-  ReadCallback* getReadCallback() const;
+  void setReadCB(ReadCallback* callback) override;
+  ReadCallback* getReadCallback() const override;
+
+  bool setZeroCopy(bool enable);
+  bool getZeroCopy() const {
+    return zeroCopyEnabled_;
+  }
+
+  uint32_t getZeroCopyBufId() const {
+    return zeroCopyBufId_;
+  }
 
   void write(WriteCallback* callback, const void* buf, size_t bytes,
-             WriteFlags flags = WriteFlags::NONE);
+             WriteFlags flags = WriteFlags::NONE) override;
   void writev(WriteCallback* callback, const iovec* vec, size_t count,
-              WriteFlags flags = WriteFlags::NONE);
+              WriteFlags flags = WriteFlags::NONE) override;
   void writeChain(WriteCallback* callback,
                   std::unique_ptr<folly::IOBuf>&& buf,
-                  WriteFlags flags = WriteFlags::NONE);
+                  WriteFlags flags = WriteFlags::NONE) override;
+
+  class WriteRequest;
+  virtual void writeRequest(WriteRequest* req);
+  void writeRequestReady() {
+    handleWrite();
+  }
 
   // Methods inherited from AsyncTransport
   void close() override;
@@ -433,6 +535,7 @@ class AsyncSocket : virtual public AsyncTransport {
   void shutdownWriteNow() override;
 
   bool readable() const override;
+  bool writable() const override;
   bool isPending() const override;
   virtual bool hangup() const;
   bool good() const override;
@@ -446,14 +549,28 @@ class AsyncSocket : virtual public AsyncTransport {
   void getPeerAddress(
     folly::SocketAddress* address) const override;
 
-  bool isEorTrackingEnabled() const override { return false; }
+  bool isEorTrackingEnabled() const override {
+    return trackEor_;
+  }
 
-  void setEorTracking(bool track) override {}
+  void setEorTracking(bool track) override {
+    trackEor_ = track;
+  }
 
   bool connecting() const override {
     return (state_ == StateEnum::CONNECTING);
   }
 
+  virtual bool isClosedByPeer() const {
+    return (state_ == StateEnum::CLOSED &&
+            (readErr_ == READ_EOF || readErr_ == READ_ERROR));
+  }
+
+  virtual bool isClosedBySelf() const {
+    return (state_ == StateEnum::CLOSED &&
+            (readErr_ != READ_EOF && readErr_ != READ_ERROR));
+  }
+
   size_t getAppBytesWritten() const override {
     return appBytesWritten_;
   }
@@ -470,6 +587,42 @@ class AsyncSocket : virtual public AsyncTransport {
     return getAppBytesReceived();
   }
 
+  std::chrono::nanoseconds getConnectTime() const {
+    return connectEndTime_ - connectStartTime_;
+  }
+
+  std::chrono::milliseconds getConnectTimeout() const {
+    return connectTimeout_;
+  }
+
+  bool getTFOAttempted() const {
+    return tfoAttempted_;
+  }
+
+  /**
+   * Returns whether or not the attempt to use TFO
+   * finished successfully. This does not necessarily
+   * mean TFO worked, just that trying to use TFO
+   * succeeded.
+   */
+  bool getTFOFinished() const {
+    return tfoFinished_;
+  }
+
+  /**
+   * Returns whether or not TFO attempt succeded on this
+   * connection.
+   * For servers this is pretty straightforward API and can
+   * be invoked right after the connection is accepted. This API
+   * will perform one syscall.
+   * This API is a bit tricky to use for clients, since clients
+   * only know this for sure after the SYN-ACK is returned. So it's
+   * appropriate to call this only after the first application
+   * data is read from the socket when the caller knows that
+   * the SYN has been ACKed by the server.
+   */
+  bool getTFOSucceded() const;
+
   // Methods controlling socket options
 
   /**
@@ -489,6 +642,13 @@ class AsyncSocket : virtual public AsyncTransport {
    */
   int setNoDelay(bool noDelay);
 
+
+  /**
+   * Set the FD_CLOEXEC flag so that the socket will be closed if the program
+   * later forks and execs.
+   */
+  void setCloseOnExec();
+
   /*
    * Set the Flavor of Congestion Control to be used for this Socket
    * Please check '/lib/modules/<kernel>/kernel/net/ipv4' for tcp_*.ko
@@ -522,7 +682,6 @@ class AsyncSocket : virtual public AsyncTransport {
   #define SO_SET_NAMESPACE        41
   int setTCPProfile(int profd);
 
-
   /**
    * Generic API for reading a socket option.
    *
@@ -553,12 +712,195 @@ class AsyncSocket : virtual public AsyncTransport {
     return setsockopt(fd_, level, optname, optval, sizeof(T));
   }
 
+  /**
+   * Virtual method for reading a socket option returning integer
+   * value, which is the most typical case. Convenient for overriding
+   * and mocking.
+   *
+   * @param level     same as the "level" parameter in getsockopt().
+   * @param optname   same as the "optname" parameter in getsockopt().
+   * @param optval    same as "optval" parameter in getsockopt().
+   * @param optlen    same as "optlen" parameter in getsockopt().
+   * @return          same as the return value of getsockopt().
+   */
+  virtual int
+  getSockOptVirtual(int level, int optname, void* optval, socklen_t* optlen) {
+    return getsockopt(fd_, level, optname, optval, optlen);
+  }
+
+  /**
+   * Virtual method for setting a socket option accepting integer
+   * value, which is the most typical case. Convenient for overriding
+   * and mocking.
+   *
+   * @param level     same as the "level" parameter in setsockopt().
+   * @param optname   same as the "optname" parameter in setsockopt().
+   * @param optval    same as "optval" parameter in setsockopt().
+   * @param optlen    same as "optlen" parameter in setsockopt().
+   * @return          same as the return value of setsockopt().
+   */
+  virtual int setSockOptVirtual(
+      int level,
+      int optname,
+      void const* optval,
+      socklen_t optlen) {
+    return setsockopt(fd_, level, optname, optval, optlen);
+  }
+
+  /**
+   * Set pre-received data, to be returned to read callback before any data
+   * from the socket.
+   */
+  virtual void setPreReceivedData(std::unique_ptr<IOBuf> data) {
+    if (preReceivedData_) {
+      preReceivedData_->prependChain(std::move(data));
+    } else {
+      preReceivedData_ = std::move(data);
+    }
+  }
+
+  /**
+   * Enables TFO behavior on the AsyncSocket if FOLLY_ALLOW_TFO
+   * is set.
+   */
+  void enableTFO() {
+    // No-op if folly does not allow tfo
+#if FOLLY_ALLOW_TFO
+    tfoEnabled_ = true;
+#endif
+  }
+
+  void disableTransparentTls() {
+    noTransparentTls_ = true;
+  }
+
+  void disableTSocks() {
+    noTSocks_ = true;
+  }
+
   enum class StateEnum : uint8_t {
     UNINIT,
     CONNECTING,
     ESTABLISHED,
     CLOSED,
-    ERROR
+    ERROR,
+    FAST_OPEN,
+  };
+
+  void setBufferCallback(BufferCallback* cb);
+
+  // Callers should set this prior to connecting the socket for the safest
+  // behavior.
+  void setEvbChangedCallback(std::unique_ptr<EvbChangeCallback> cb) {
+    evbChangeCb_ = std::move(cb);
+  }
+
+  /**
+   * Attempt to cache the current local and peer addresses (if not already
+   * cached) so that they are available from getPeerAddress() and
+   * getLocalAddress() even after the socket is closed.
+   */
+  void cacheAddresses();
+
+  /**
+   * Returns true if there is any zero copy write in progress
+   * Needs to be called from within the socket's EVB thread
+   */
+  bool isZeroCopyWriteInProgress() const noexcept;
+
+  /**
+   * Tries to process the msg error queue
+   * And returns true if there are no more zero copy writes in progress
+   */
+  bool processZeroCopyWriteInProgress() noexcept;
+
+  /**
+   * writeReturn is the total number of bytes written, or WRITE_ERROR on error.
+   * If no data has been written, 0 is returned.
+   * exception is a more specific exception that cause a write error.
+   * Not all writes have exceptions associated with them thus writeReturn
+   * should be checked to determine whether the operation resulted in an error.
+   */
+  struct WriteResult {
+    explicit WriteResult(ssize_t ret) : writeReturn(ret) {}
+
+    WriteResult(ssize_t ret, std::unique_ptr<const AsyncSocketException> e)
+        : writeReturn(ret), exception(std::move(e)) {}
+
+    ssize_t writeReturn;
+    std::unique_ptr<const AsyncSocketException> exception;
+  };
+
+  /**
+   * readReturn is the number of bytes read, or READ_EOF on EOF, or
+   * READ_ERROR on error, or READ_BLOCKING if the operation will
+   * block.
+   * exception is a more specific exception that may have caused a read error.
+   * Not all read errors have exceptions associated with them thus readReturn
+   * should be checked to determine whether the operation resulted in an error.
+   */
+  struct ReadResult {
+    explicit ReadResult(ssize_t ret) : readReturn(ret) {}
+
+    ReadResult(ssize_t ret, std::unique_ptr<const AsyncSocketException> e)
+        : readReturn(ret), exception(std::move(e)) {}
+
+    ssize_t readReturn;
+    std::unique_ptr<const AsyncSocketException> exception;
+  };
+
+  /**
+   * A WriteRequest object tracks information about a pending write operation.
+   */
+  class WriteRequest {
+   public:
+    WriteRequest(AsyncSocket* socket, WriteCallback* callback) :
+      socket_(socket), callback_(callback) {}
+
+    virtual void start() {}
+
+    virtual void destroy() = 0;
+
+    virtual WriteResult performWrite() = 0;
+
+    virtual void consume() = 0;
+
+    virtual bool isComplete() = 0;
+
+    WriteRequest* getNext() const {
+      return next_;
+    }
+
+    WriteCallback* getCallback() const {
+      return callback_;
+    }
+
+    uint32_t getTotalBytesWritten() const {
+      return totalBytesWritten_;
+    }
+
+    void append(WriteRequest* next) {
+      assert(next_ == nullptr);
+      next_ = next;
+    }
+
+    void fail(const char* fn, const AsyncSocketException& ex) {
+      socket_->failWrite(fn, ex);
+    }
+
+    void bytesWritten(size_t count) {
+      totalBytesWritten_ += uint32_t(count);
+      socket_->appBytesWritten_ += count;
+    }
+
+   protected:
+    // protected destructor, to ensure callers use destroy()
+    virtual ~WriteRequest() {}
+
+    AsyncSocket* socket_;         ///< parent socket
+    WriteRequest* next_{nullptr};          ///< pointer to next WriteRequest
+    WriteCallback* callback_;     ///< completion callback
+    uint32_t totalBytesWritten_{0};  ///< total bytes written
   };
 
  protected:
@@ -566,6 +908,11 @@ class AsyncSocket : virtual public AsyncTransport {
     READ_EOF = 0,
     READ_ERROR = -1,
     READ_BLOCKING = -2,
+    READ_NO_ERROR = -3,
+  };
+
+  enum WriteResultEnum {
+    WRITE_ERROR = -1,
   };
 
   /**
@@ -575,7 +922,7 @@ class AsyncSocket : virtual public AsyncTransport {
    * destroy() instead.  (See the documentation in DelayedDestruction.h for
    * more details.)
    */
-  ~AsyncSocket();
+  ~AsyncSocket() override;
 
   friend std::ostream& operator << (std::ostream& os, const StateEnum& state);
 
@@ -600,7 +947,7 @@ class AsyncSocket : virtual public AsyncTransport {
     SHUT_READ = 0x04,
   };
 
-  class WriteRequest;
+  class BytesWriteRequest;
 
   class WriteTimeout : public AsyncTimeout {
    public:
@@ -608,7 +955,7 @@ class AsyncSocket : virtual public AsyncTransport {
       : AsyncTimeout(eventBase)
       , socket_(socket) {}
 
-    virtual void timeoutExpired() noexcept {
+    void timeoutExpired() noexcept override {
       socket_->timeoutExpired();
     }
 
@@ -625,7 +972,7 @@ class AsyncSocket : virtual public AsyncTransport {
       : EventHandler(eventBase, fd)
       , socket_(socket) {}
 
-    virtual void handlerReady(uint16_t events) noexcept {
+    void handlerReady(uint16_t events) noexcept override {
       socket_->ioReady(events);
     }
 
@@ -635,10 +982,47 @@ class AsyncSocket : virtual public AsyncTransport {
 
   void init();
 
+  class ImmediateReadCB : public folly::EventBase::LoopCallback {
+   public:
+    explicit ImmediateReadCB(AsyncSocket* socket) : socket_(socket) {}
+    void runLoopCallback() noexcept override {
+      DestructorGuard dg(socket_);
+      socket_->checkForImmediateRead();
+    }
+   private:
+    AsyncSocket* socket_;
+  };
+
+  /**
+   * Schedule checkForImmediateRead to be executed in the next loop
+   * iteration.
+   */
+  void scheduleImmediateRead() noexcept {
+    if (good()) {
+      eventBase_->runInLoop(&immediateReadHandler_);
+    }
+  }
+
+  /**
+   * Schedule handleInitalReadWrite to run in the next iteration.
+   */
+  void scheduleInitialReadWrite() noexcept {
+    if (good()) {
+      DestructorGuard dg(this);
+      eventBase_->runInLoop([this, dg] {
+        if (good()) {
+          handleInitialReadWrite();
+        }
+      });
+    }
+  }
+
   // event notification methods
   void ioReady(uint16_t events) noexcept;
   virtual void checkForImmediateRead() noexcept;
   virtual void handleInitialReadWrite() noexcept;
+  virtual void prepareReadBuffer(void** buf, size_t* buflen);
+  virtual size_t handleErrMessages() noexcept;
   virtual void handleRead() noexcept;
   virtual void handleWrite() noexcept;
   virtual void handleConnect() noexcept;
@@ -650,11 +1034,9 @@ class AsyncSocket : virtual public AsyncTransport {
    * @param buf      The buffer to read data into.
    * @param buflen   The length of the buffer.
    *
-   * @return Returns the number of bytes read, or READ_EOF on EOF, or
-   * READ_ERROR on error, or READ_BLOCKING if the operation will
-   * block.
+   * @return Returns a read result. See read result for details.
    */
-  virtual ssize_t performRead(void* buf, size_t buflen);
+  virtual ReadResult performRead(void** buf, size_t* buflen, size_t* offset);
 
   /**
    * Populate an iovec array from an IOBuf and attempt to write it.
@@ -703,12 +1085,30 @@ class AsyncSocket : virtual public AsyncTransport {
    *                          will contain the number of bytes written in the
    *                          partially written iovec entry.
    *
-   * @return Returns the total number of bytes written, or -1 on error.  If no
-   *     data can be written immediately, 0 is returned.
+   * @return Returns a WriteResult. See WriteResult for more details.
+   */
+  virtual WriteResult performWrite(
+      const iovec* vec,
+      uint32_t count,
+      WriteFlags flags,
+      uint32_t* countWritten,
+      uint32_t* partialWritten);
+
+  /**
+   * Sends the message over the socket using sendmsg
+   *
+   * @param msg       Message to send
+   * @param msg_flags Flags to pass to sendmsg
    */
-  virtual ssize_t performWrite(const iovec* vec, uint32_t count,
-                               WriteFlags flags, uint32_t* countWritten,
-                               uint32_t* partialWritten);
+  AsyncSocket::WriteResult
+  sendSocketMessage(int fd, struct msghdr* msg, int msg_flags);
+
+  virtual ssize_t tfoSendMsg(int fd, struct msghdr* msg, int msg_flags);
+
+  int socketConnect(const struct sockaddr* addr, socklen_t len);
+
+  virtual void scheduleConnectTimeout();
+  void registerForConnectEvents();
 
   bool updateEventRegistration();
 
@@ -732,39 +1132,111 @@ class AsyncSocket : virtual public AsyncTransport {
   // error handling methods
   void startFail();
   void finishFail();
+  void finishFail(const AsyncSocketException& ex);
+  void invokeAllErrors(const AsyncSocketException& ex);
   void fail(const char* fn, const AsyncSocketException& ex);
   void failConnect(const char* fn, const AsyncSocketException& ex);
   void failRead(const char* fn, const AsyncSocketException& ex);
+  void failErrMessageRead(const char* fn, const AsyncSocketException& ex);
   void failWrite(const char* fn, WriteCallback* callback, size_t bytesWritten,
                  const AsyncSocketException& ex);
   void failWrite(const char* fn, const AsyncSocketException& ex);
   void failAllWrites(const AsyncSocketException& ex);
+  virtual void invokeConnectErr(const AsyncSocketException& ex);
+  virtual void invokeConnectSuccess();
   void invalidState(ConnectCallback* callback);
+  void invalidState(ErrMessageCallback* callback);
   void invalidState(ReadCallback* callback);
   void invalidState(WriteCallback* callback);
 
   std::string withAddr(const std::string& s);
 
-  StateEnum state_;                     ///< StateEnum describing current state
-  uint8_t shutdownFlags_;               ///< Shutdown state (ShutdownFlags)
-  uint16_t eventFlags_;                 ///< EventBase::HandlerFlags settings
-  int fd_;                              ///< The socket file descriptor
-  mutable
-    folly::SocketAddress addr_;    ///< The address we tried to connect to
-  uint32_t sendTimeout_;                ///< The send timeout, in milliseconds
-  uint16_t maxReadsPerEvent_;           ///< Max reads per event loop iteration
-  EventBase* eventBase_;               ///< The EventBase
-  WriteTimeout writeTimeout_;           ///< A timeout for connect and write
-  IoHandler ioHandler_;                 ///< A EventHandler to monitor the fd
-
-  ConnectCallback* connectCallback_;    ///< ConnectCallback
-  ReadCallback* readCallback_;          ///< ReadCallback
-  WriteRequest* writeReqHead_;          ///< Chain of WriteRequests
-  WriteRequest* writeReqTail_;          ///< End of WriteRequest chain
-  ShutdownSocketSet* shutdownSocketSet_;
-  size_t appBytesReceived_;             ///< Num of bytes received from socket
-  size_t appBytesWritten_;              ///< Num of bytes written to socket
-};
+  void cacheLocalAddress() const;
+  void cachePeerAddress() const;
 
+  bool isZeroCopyRequest(WriteFlags flags);
+
+  bool isZeroCopyMsg(const cmsghdr& cmsg) const;
+  void processZeroCopyMsg(const cmsghdr& cmsg);
+
+  uint32_t getNextZeroCopyBufId() {
+    return zeroCopyBufId_++;
+  }
+  void adjustZeroCopyFlags(folly::WriteFlags& flags);
+  void addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
+  void addZeroCopyBuf(folly::IOBuf* ptr);
+  void setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
+  bool containsZeroCopyBuf(folly::IOBuf* ptr);
+  void releaseZeroCopyBuf(uint32_t id);
+
+  // a folly::IOBuf can be used in multiple partial requests
+  // there is a that maps a buffer id to a raw folly::IOBuf ptr
+  // and another one that adds a ref count for a folly::IOBuf that is either
+  // the original ptr or nullptr
+  uint32_t zeroCopyBufId_{0};
+
+  struct IOBufInfo {
+    uint32_t count_{0};
+    std::unique_ptr<folly::IOBuf> buf_;
+  };
+
+  std::unordered_map<uint32_t, folly::IOBuf*> idZeroCopyBufPtrMap_;
+  std::unordered_map<folly::IOBuf*, IOBufInfo> idZeroCopyBufInfoMap_;
+
+  StateEnum state_;                      ///< StateEnum describing current state
+  uint8_t shutdownFlags_;                ///< Shutdown state (ShutdownFlags)
+  uint16_t eventFlags_;                  ///< EventBase::HandlerFlags settings
+  int fd_;                               ///< The socket file descriptor
+  mutable folly::SocketAddress addr_;    ///< The address we tried to connect to
+  mutable folly::SocketAddress localAddr_;
+                                         ///< The address we are connecting from
+  uint32_t sendTimeout_;                 ///< The send timeout, in milliseconds
+  uint16_t maxReadsPerEvent_;            ///< Max reads per event loop iteration
+
+  bool isBufferMovable_{false};
+
+  int8_t readErr_{READ_NO_ERROR}; ///< The read error encountered, if any
+
+  EventBase* eventBase_;                 ///< The EventBase
+  WriteTimeout writeTimeout_;            ///< A timeout for connect and write
+  IoHandler ioHandler_;                  ///< A EventHandler to monitor the fd
+  ImmediateReadCB immediateReadHandler_; ///< LoopCallback for checking read
+
+  ConnectCallback* connectCallback_;     ///< ConnectCallback
+  ErrMessageCallback* errMessageCallback_; ///< TimestampCallback
+  SendMsgParamsCallback* ///< Callback for retrieving
+      sendMsgParamCallback_; ///< ::sendmsg() parameters
+  ReadCallback* readCallback_;           ///< ReadCallback
+  WriteRequest* writeReqHead_;           ///< Chain of WriteRequests
+  WriteRequest* writeReqTail_;           ///< End of WriteRequest chain
+  std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_;
+  size_t appBytesReceived_;              ///< Num of bytes received from socket
+  size_t appBytesWritten_;               ///< Num of bytes written to socket
+
+  // Pre-received data, to be returned to read callback before any data from the
+  // socket.
+  std::unique_ptr<IOBuf> preReceivedData_;
+
+  std::chrono::steady_clock::time_point connectStartTime_;
+  std::chrono::steady_clock::time_point connectEndTime_;
+
+  std::chrono::milliseconds connectTimeout_{0};
+
+  std::unique_ptr<EvbChangeCallback> evbChangeCb_{nullptr};
+
+  BufferCallback* bufferCallback_{nullptr};
+  bool tfoEnabled_{false};
+  bool tfoAttempted_{false};
+  bool tfoFinished_{false};
+  bool noTransparentTls_{false};
+  bool noTSocks_{false};
+  // Whether to track EOR or not.
+  bool trackEor_{false};
+  bool zeroCopyEnabled_{false};
+  bool zeroCopyVal_{false};
+};
+#ifdef _MSC_VER
+#pragma vtordisp(pop)
+#endif
 
-} // folly
+} // namespace folly