Add handshake and connect times
[folly.git] / folly / io / async / AsyncSocket.h
index 77bd2b0c4db6e50c1333517c35243d7ac9ac1b07..b7eeafc7eca21b0d2144283c5de974876ac20078 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2015 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
 #include <folly/io/async/EventHandler.h>
 #include <folly/io/async/DelayedDestruction.h>
 
+#include <chrono>
 #include <memory>
 #include <map>
 
@@ -61,13 +62,13 @@ namespace folly {
  * responding and no further progress can be made sending the data.
  */
 
-class AsyncSocket : virtual public AsyncTransport {
+class AsyncSocket : virtual public AsyncTransportWrapper {
  public:
   typedef std::unique_ptr<AsyncSocket, Destructor> UniquePtr;
 
   class ConnectCallback {
    public:
-    virtual ~ConnectCallback() {}
+    virtual ~ConnectCallback() = default;
 
     /**
      * connectSuccess() will be invoked when the connection has been
@@ -84,106 +85,7 @@ class AsyncSocket : virtual public AsyncTransport {
       noexcept = 0;
   };
 
-  class ReadCallback {
-   public:
-    virtual ~ReadCallback() {}
-
-    /**
-     * When data becomes available, getReadBuffer() will be invoked to get the
-     * buffer into which data should be read.
-     *
-     * This method allows the ReadCallback to delay buffer allocation until
-     * data becomes available.  This allows applications to manage large
-     * numbers of idle connections, without having to maintain a separate read
-     * buffer for each idle connection.
-     *
-     * It is possible that in some cases, getReadBuffer() may be called
-     * multiple times before readDataAvailable() is invoked.  In this case, the
-     * data will be written to the buffer returned from the most recent call to
-     * readDataAvailable().  If the previous calls to readDataAvailable()
-     * returned different buffers, the ReadCallback is responsible for ensuring
-     * that they are not leaked.
-     *
-     * If getReadBuffer() throws an exception, returns a nullptr buffer, or
-     * returns a 0 length, the ReadCallback will be uninstalled and its
-     * readError() method will be invoked.
-     *
-     * getReadBuffer() is not allowed to change the transport state before it
-     * returns.  (For example, it should never uninstall the read callback, or
-     * set a different read callback.)
-     *
-     * @param bufReturn getReadBuffer() should update *bufReturn to contain the
-     *                  address of the read buffer.  This parameter will never
-     *                  be nullptr.
-     * @param lenReturn getReadBuffer() should update *lenReturn to contain the
-     *                  maximum number of bytes that may be written to the read
-     *                  buffer.  This parameter will never be nullptr.
-     */
-    virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0;
-
-    /**
-     * readDataAvailable() will be invoked when data has been successfully read
-     * into the buffer returned by the last call to getReadBuffer().
-     *
-     * The read callback remains installed after readDataAvailable() returns.
-     * It must be explicitly uninstalled to stop receiving read events.
-     * getReadBuffer() will be called at least once before each call to
-     * readDataAvailable().  getReadBuffer() will also be called before any
-     * call to readEOF().
-     *
-     * @param len       The number of bytes placed in the buffer.
-     */
-    virtual void readDataAvailable(size_t len) noexcept = 0;
-
-    /**
-     * readEOF() will be invoked when the transport is closed.
-     *
-     * The read callback will be automatically uninstalled immediately before
-     * readEOF() is invoked.
-     */
-    virtual void readEOF() noexcept = 0;
-
-    /**
-     * readError() will be invoked if an error occurs reading from the
-     * transport.
-     *
-     * The read callback will be automatically uninstalled immediately before
-     * readError() is invoked.
-     *
-     * @param ex        An exception describing the error that occurred.
-     */
-    virtual void readErr(const AsyncSocketException& ex)
-      noexcept = 0;
-  };
-
-  class WriteCallback {
-   public:
-    virtual ~WriteCallback() {}
-
-    /**
-     * writeSuccess() will be invoked when all of the data has been
-     * successfully written.
-     *
-     * Note that this mainly signals that the buffer containing the data to
-     * write is no longer needed and may be freed or re-used.  It does not
-     * guarantee that the data has been fully transmitted to the remote
-     * endpoint.  For example, on socket-based transports, writeSuccess() only
-     * indicates that the data has been given to the kernel for eventual
-     * transmission.
-     */
-    virtual void writeSuccess() noexcept = 0;
-
-    /**
-     * writeError() will be invoked if an error occurs writing the data.
-     *
-     * @param bytesWritten      The number of bytes that were successfull
-     * @param ex                An exception describing the error that occurred.
-     */
-    virtual void writeErr(size_t bytesWritten,
-                            const AsyncSocketException& ex)
-      noexcept = 0;
-  };
-
+  explicit AsyncSocket();
   /**
    * Create a new unconnected AsyncSocket.
    *
@@ -285,7 +187,7 @@ class AsyncSocket : virtual public AsyncTransport {
    * This prevents callers from deleting a AsyncSocket while it is invoking a
    * callback.
    */
-  virtual void destroy();
+  virtual void destroy() override;
 
   /**
    * Get the EventBase used by this socket.
@@ -337,7 +239,7 @@ class AsyncSocket : virtual public AsyncTransport {
   typedef std::map<OptionKey, int> OptionMap;
 
   static const OptionMap emptyOptionMap;
-  static const folly::SocketAddress anyAddress;
+  static const folly::SocketAddress& anyAddress();
 
   /**
    * Initiate a connection.
@@ -353,12 +255,21 @@ class AsyncSocket : virtual public AsyncTransport {
                const folly::SocketAddress& address,
                int timeout = 0,
                const OptionMap &options = emptyOptionMap,
-               const folly::SocketAddress& bindAddr = anyAddress
+               const folly::SocketAddress& bindAddr = anyAddress()
                ) noexcept;
   void connect(ConnectCallback* callback, const std::string& ip, uint16_t port,
                int timeout = 00,
                const OptionMap &options = emptyOptionMap) noexcept;
 
+  /**
+   * If a connect request is in-flight, cancels it and closes the socket
+   * immediately. Otherwise, this is a no-op.
+   *
+   * This does not invoke any connection related callbacks. Call this to
+   * prevent any connect callback while cleaning up, etc.
+   */
+  void cancelConnect();
+
   /**
    * Set the send timeout.
    *
@@ -413,16 +324,22 @@ class AsyncSocket : virtual public AsyncTransport {
   }
 
   // Read and write methods
-  void setReadCB(ReadCallback* callback);
-  ReadCallback* getReadCallback() const;
+  void setReadCB(ReadCallback* callback) override;
+  ReadCallback* getReadCallback() const override;
 
   void write(WriteCallback* callback, const void* buf, size_t bytes,
-             WriteFlags flags = WriteFlags::NONE);
+             WriteFlags flags = WriteFlags::NONE) override;
   void writev(WriteCallback* callback, const iovec* vec, size_t count,
-              WriteFlags flags = WriteFlags::NONE);
+              WriteFlags flags = WriteFlags::NONE) override;
   void writeChain(WriteCallback* callback,
                   std::unique_ptr<folly::IOBuf>&& buf,
-                  WriteFlags flags = WriteFlags::NONE);
+                  WriteFlags flags = WriteFlags::NONE) override;
+
+  class WriteRequest;
+  virtual void writeRequest(WriteRequest* req);
+  void writeRequestReady() {
+    handleWrite();
+  }
 
   // Methods inherited from AsyncTransport
   void close() override;
@@ -447,12 +364,22 @@ class AsyncSocket : virtual public AsyncTransport {
 
   bool isEorTrackingEnabled() const override { return false; }
 
-  void setEorTracking(bool track) override {}
+  void setEorTracking(bool /*track*/) override {}
 
   bool connecting() const override {
     return (state_ == StateEnum::CONNECTING);
   }
 
+  virtual bool isClosedByPeer() const {
+    return (state_ == StateEnum::CLOSED &&
+            (readErr_ == READ_EOF || readErr_ == READ_ERROR));
+  }
+
+  virtual bool isClosedBySelf() const {
+    return (state_ == StateEnum::CLOSED &&
+            (readErr_ != READ_EOF && readErr_ != READ_ERROR));
+  }
+
   size_t getAppBytesWritten() const override {
     return appBytesWritten_;
   }
@@ -469,6 +396,10 @@ class AsyncSocket : virtual public AsyncTransport {
     return getAppBytesReceived();
   }
 
+  std::chrono::nanoseconds getConnectTime() const {
+    return connectEndTime_ - connectStartTime_;
+  }
+
   // Methods controlling socket options
 
   /**
@@ -488,6 +419,13 @@ class AsyncSocket : virtual public AsyncTransport {
    */
   int setNoDelay(bool noDelay);
 
+
+  /**
+   * Set the FD_CLOEXEC flag so that the socket will be closed if the program
+   * later forks and execs.
+   */
+  void setCloseOnExec();
+
   /*
    * Set the Flavor of Congestion Control to be used for this Socket
    * Please check '/lib/modules/<kernel>/kernel/net/ipv4' for tcp_*.ko
@@ -529,11 +467,14 @@ class AsyncSocket : virtual public AsyncTransport {
    * @param optname   same as the "optname" parameter in getsockopt().
    * @param optval    pointer to the variable in which the option value should
    *                  be returned.
+   * @param optlen    value-result argument, initially containing the size of
+   *                  the buffer pointed to by optval, and modified on return
+   *                  to indicate the actual size of the value returned.
    * @return          same as the return value of getsockopt().
    */
   template <typename T>
-  int  getSockOpt(int level, int optname, T *optval) {
-    return getsockopt(fd_, level, optname, optval, sizeof(T));
+  int getSockOpt(int level, int optname, T* optval, socklen_t* optlen) {
+    return getsockopt(fd_, level, optname, (void*) optval, optlen);
   }
 
   /**
@@ -549,11 +490,78 @@ class AsyncSocket : virtual public AsyncTransport {
     return setsockopt(fd_, level, optname, optval, sizeof(T));
   }
 
+  virtual void setPeek(bool peek) {
+    peek_ = peek;
+  }
+
+  enum class StateEnum : uint8_t {
+    UNINIT,
+    CONNECTING,
+    ESTABLISHED,
+    CLOSED,
+    ERROR
+  };
+
+  /**
+   * A WriteRequest object tracks information about a pending write operation.
+   */
+  class WriteRequest {
+   public:
+    WriteRequest(AsyncSocket* socket, WriteCallback* callback) :
+      socket_(socket), callback_(callback) {}
+
+    virtual void start() {};
+
+    virtual void destroy() = 0;
+
+    virtual bool performWrite() = 0;
+
+    virtual void consume() = 0;
+
+    virtual bool isComplete() = 0;
+
+    WriteRequest* getNext() const {
+      return next_;
+    }
+
+    WriteCallback* getCallback() const {
+      return callback_;
+    }
+
+    uint32_t getTotalBytesWritten() const {
+      return totalBytesWritten_;
+    }
+
+    void append(WriteRequest* next) {
+      assert(next_ == nullptr);
+      next_ = next;
+    }
+
+    void fail(const char* fn, const AsyncSocketException& ex) {
+      socket_->failWrite(fn, ex);
+    }
+
+    void bytesWritten(size_t count) {
+      totalBytesWritten_ += count;
+      socket_->appBytesWritten_ += count;
+    }
+
+   protected:
+    // protected destructor, to ensure callers use destroy()
+    virtual ~WriteRequest() {}
+
+    AsyncSocket* socket_;         ///< parent socket
+    WriteRequest* next_{nullptr};          ///< pointer to next WriteRequest
+    WriteCallback* callback_;     ///< completion callback
+    uint32_t totalBytesWritten_{0};  ///< total bytes written
+  };
+
  protected:
   enum ReadResultEnum {
     READ_EOF = 0,
     READ_ERROR = -1,
     READ_BLOCKING = -2,
+    READ_NO_ERROR = -3,
   };
 
   /**
@@ -565,14 +573,6 @@ class AsyncSocket : virtual public AsyncTransport {
    */
   ~AsyncSocket();
 
-  enum class StateEnum : uint8_t {
-    UNINIT,
-    CONNECTING,
-    ESTABLISHED,
-    CLOSED,
-    ERROR
-  };
-
   friend std::ostream& operator << (std::ostream& os, const StateEnum& state);
 
   enum ShutdownFlags {
@@ -596,7 +596,7 @@ class AsyncSocket : virtual public AsyncTransport {
     SHUT_READ = 0x04,
   };
 
-  class WriteRequest;
+  class BytesWriteRequest;
 
   class WriteTimeout : public AsyncTimeout {
    public:
@@ -631,10 +631,32 @@ class AsyncSocket : virtual public AsyncTransport {
 
   void init();
 
+  class ImmediateReadCB : public folly::EventBase::LoopCallback {
+   public:
+    explicit ImmediateReadCB(AsyncSocket* socket) : socket_(socket) {}
+    void runLoopCallback() noexcept override {
+      DestructorGuard dg(socket_);
+      socket_->checkForImmediateRead();
+    }
+   private:
+    AsyncSocket* socket_;
+  };
+
+  /**
+   * Schedule checkForImmediateRead to be executed in the next loop
+   * iteration.
+   */
+  void scheduleImmediateRead() noexcept {
+    if (good()) {
+      eventBase_->runInLoop(&immediateReadHandler_);
+    }
+  }
+
   // event notification methods
   void ioReady(uint16_t events) noexcept;
   virtual void checkForImmediateRead() noexcept;
   virtual void handleInitialReadWrite() noexcept;
+  virtual void prepareReadBuffer(void** buf, size_t* buflen) noexcept;
   virtual void handleRead() noexcept;
   virtual void handleWrite() noexcept;
   virtual void handleConnect() noexcept;
@@ -650,7 +672,7 @@ class AsyncSocket : virtual public AsyncTransport {
    * READ_ERROR on error, or READ_BLOCKING if the operation will
    * block.
    */
-  virtual ssize_t performRead(void* buf, size_t buflen);
+  virtual ssize_t performRead(void** buf, size_t* buflen, size_t* offset);
 
   /**
    * Populate an iovec array from an IOBuf and attempt to write it.
@@ -735,6 +757,8 @@ class AsyncSocket : virtual public AsyncTransport {
                  const AsyncSocketException& ex);
   void failWrite(const char* fn, const AsyncSocketException& ex);
   void failAllWrites(const AsyncSocketException& ex);
+  void invokeConnectErr(const AsyncSocketException& ex);
+  void invokeConnectSuccess();
   void invalidState(ConnectCallback* callback);
   void invalidState(ReadCallback* callback);
   void invalidState(WriteCallback* callback);
@@ -752,6 +776,7 @@ class AsyncSocket : virtual public AsyncTransport {
   EventBase* eventBase_;               ///< The EventBase
   WriteTimeout writeTimeout_;           ///< A timeout for connect and write
   IoHandler ioHandler_;                 ///< A EventHandler to monitor the fd
+  ImmediateReadCB immediateReadHandler_; ///< LoopCallback for checking read
 
   ConnectCallback* connectCallback_;    ///< ConnectCallback
   ReadCallback* readCallback_;          ///< ReadCallback
@@ -760,6 +785,14 @@ class AsyncSocket : virtual public AsyncTransport {
   ShutdownSocketSet* shutdownSocketSet_;
   size_t appBytesReceived_;             ///< Num of bytes received from socket
   size_t appBytesWritten_;              ///< Num of bytes written to socket
+  bool isBufferMovable_{false};
+
+  bool peek_{false}; // Peek bytes.
+
+  int8_t readErr_{READ_NO_ERROR};      ///< The read error encountered, if any.
+
+  std::chrono::steady_clock::time_point connectStartTime_;
+  std::chrono::steady_clock::time_point connectEndTime_;
 };