/*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <folly/io/async/EventHandler.h>
#include <folly/io/async/DelayedDestruction.h>
+#include <chrono>
#include <memory>
#include <map>
* responding and no further progress can be made sending the data.
*/
-class AsyncSocket : virtual public AsyncTransport {
+class AsyncSocket : virtual public AsyncTransportWrapper {
public:
typedef std::unique_ptr<AsyncSocket, Destructor> UniquePtr;
class ConnectCallback {
public:
- virtual ~ConnectCallback() {}
+ virtual ~ConnectCallback() = default;
/**
* connectSuccess() will be invoked when the connection has been
noexcept = 0;
};
- class ReadCallback {
- public:
- virtual ~ReadCallback() {}
-
- /**
- * When data becomes available, getReadBuffer() will be invoked to get the
- * buffer into which data should be read.
- *
- * This method allows the ReadCallback to delay buffer allocation until
- * data becomes available. This allows applications to manage large
- * numbers of idle connections, without having to maintain a separate read
- * buffer for each idle connection.
- *
- * It is possible that in some cases, getReadBuffer() may be called
- * multiple times before readDataAvailable() is invoked. In this case, the
- * data will be written to the buffer returned from the most recent call to
- * readDataAvailable(). If the previous calls to readDataAvailable()
- * returned different buffers, the ReadCallback is responsible for ensuring
- * that they are not leaked.
- *
- * If getReadBuffer() throws an exception, returns a nullptr buffer, or
- * returns a 0 length, the ReadCallback will be uninstalled and its
- * readError() method will be invoked.
- *
- * getReadBuffer() is not allowed to change the transport state before it
- * returns. (For example, it should never uninstall the read callback, or
- * set a different read callback.)
- *
- * @param bufReturn getReadBuffer() should update *bufReturn to contain the
- * address of the read buffer. This parameter will never
- * be nullptr.
- * @param lenReturn getReadBuffer() should update *lenReturn to contain the
- * maximum number of bytes that may be written to the read
- * buffer. This parameter will never be nullptr.
- */
- virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0;
-
- /**
- * readDataAvailable() will be invoked when data has been successfully read
- * into the buffer returned by the last call to getReadBuffer().
- *
- * The read callback remains installed after readDataAvailable() returns.
- * It must be explicitly uninstalled to stop receiving read events.
- * getReadBuffer() will be called at least once before each call to
- * readDataAvailable(). getReadBuffer() will also be called before any
- * call to readEOF().
- *
- * @param len The number of bytes placed in the buffer.
- */
- virtual void readDataAvailable(size_t len) noexcept = 0;
-
- /**
- * readEOF() will be invoked when the transport is closed.
- *
- * The read callback will be automatically uninstalled immediately before
- * readEOF() is invoked.
- */
- virtual void readEOF() noexcept = 0;
-
- /**
- * readError() will be invoked if an error occurs reading from the
- * transport.
- *
- * The read callback will be automatically uninstalled immediately before
- * readError() is invoked.
- *
- * @param ex An exception describing the error that occurred.
- */
- virtual void readErr(const AsyncSocketException& ex)
- noexcept = 0;
- };
-
- class WriteCallback {
- public:
- virtual ~WriteCallback() {}
-
- /**
- * writeSuccess() will be invoked when all of the data has been
- * successfully written.
- *
- * Note that this mainly signals that the buffer containing the data to
- * write is no longer needed and may be freed or re-used. It does not
- * guarantee that the data has been fully transmitted to the remote
- * endpoint. For example, on socket-based transports, writeSuccess() only
- * indicates that the data has been given to the kernel for eventual
- * transmission.
- */
- virtual void writeSuccess() noexcept = 0;
-
- /**
- * writeError() will be invoked if an error occurs writing the data.
- *
- * @param bytesWritten The number of bytes that were successfull
- * @param ex An exception describing the error that occurred.
- */
- virtual void writeErr(size_t bytesWritten,
- const AsyncSocketException& ex)
- noexcept = 0;
- };
-
+ explicit AsyncSocket();
/**
* Create a new unconnected AsyncSocket.
*
* This prevents callers from deleting a AsyncSocket while it is invoking a
* callback.
*/
- virtual void destroy();
+ virtual void destroy() override;
/**
* Get the EventBase used by this socket.
typedef std::map<OptionKey, int> OptionMap;
static const OptionMap emptyOptionMap;
- static const folly::SocketAddress anyAddress;
+ static const folly::SocketAddress& anyAddress();
/**
* Initiate a connection.
const folly::SocketAddress& address,
int timeout = 0,
const OptionMap &options = emptyOptionMap,
- const folly::SocketAddress& bindAddr = anyAddress
+ const folly::SocketAddress& bindAddr = anyAddress()
) noexcept;
void connect(ConnectCallback* callback, const std::string& ip, uint16_t port,
int timeout = 00,
const OptionMap &options = emptyOptionMap) noexcept;
+ /**
+ * If a connect request is in-flight, cancels it and closes the socket
+ * immediately. Otherwise, this is a no-op.
+ *
+ * This does not invoke any connection related callbacks. Call this to
+ * prevent any connect callback while cleaning up, etc.
+ */
+ void cancelConnect();
+
/**
* Set the send timeout.
*
}
// Read and write methods
- void setReadCB(ReadCallback* callback);
- ReadCallback* getReadCallback() const;
+ void setReadCB(ReadCallback* callback) override;
+ ReadCallback* getReadCallback() const override;
void write(WriteCallback* callback, const void* buf, size_t bytes,
- WriteFlags flags = WriteFlags::NONE);
+ WriteFlags flags = WriteFlags::NONE) override;
void writev(WriteCallback* callback, const iovec* vec, size_t count,
- WriteFlags flags = WriteFlags::NONE);
+ WriteFlags flags = WriteFlags::NONE) override;
void writeChain(WriteCallback* callback,
std::unique_ptr<folly::IOBuf>&& buf,
- WriteFlags flags = WriteFlags::NONE);
+ WriteFlags flags = WriteFlags::NONE) override;
+
+ class WriteRequest;
+ virtual void writeRequest(WriteRequest* req);
+ void writeRequestReady() {
+ handleWrite();
+ }
// Methods inherited from AsyncTransport
void close() override;
bool isEorTrackingEnabled() const override { return false; }
- void setEorTracking(bool track) override {}
+ void setEorTracking(bool /*track*/) override {}
bool connecting() const override {
return (state_ == StateEnum::CONNECTING);
}
+ virtual bool isClosedByPeer() const {
+ return (state_ == StateEnum::CLOSED &&
+ (readErr_ == READ_EOF || readErr_ == READ_ERROR));
+ }
+
+ virtual bool isClosedBySelf() const {
+ return (state_ == StateEnum::CLOSED &&
+ (readErr_ != READ_EOF && readErr_ != READ_ERROR));
+ }
+
size_t getAppBytesWritten() const override {
return appBytesWritten_;
}
return getAppBytesReceived();
}
+ std::chrono::nanoseconds getConnectTime() const {
+ return connectEndTime_ - connectStartTime_;
+ }
+
// Methods controlling socket options
/**
*/
int setNoDelay(bool noDelay);
+
+ /**
+ * Set the FD_CLOEXEC flag so that the socket will be closed if the program
+ * later forks and execs.
+ */
+ void setCloseOnExec();
+
/*
* Set the Flavor of Congestion Control to be used for this Socket
* Please check '/lib/modules/<kernel>/kernel/net/ipv4' for tcp_*.ko
* @param optname same as the "optname" parameter in getsockopt().
* @param optval pointer to the variable in which the option value should
* be returned.
+ * @param optlen value-result argument, initially containing the size of
+ * the buffer pointed to by optval, and modified on return
+ * to indicate the actual size of the value returned.
* @return same as the return value of getsockopt().
*/
template <typename T>
- int getSockOpt(int level, int optname, T *optval) {
- return getsockopt(fd_, level, optname, optval, sizeof(T));
+ int getSockOpt(int level, int optname, T* optval, socklen_t* optlen) {
+ return getsockopt(fd_, level, optname, (void*) optval, optlen);
}
/**
return setsockopt(fd_, level, optname, optval, sizeof(T));
}
+ virtual void setPeek(bool peek) {
+ peek_ = peek;
+ }
+
+ enum class StateEnum : uint8_t {
+ UNINIT,
+ CONNECTING,
+ ESTABLISHED,
+ CLOSED,
+ ERROR
+ };
+
+ /**
+ * A WriteRequest object tracks information about a pending write operation.
+ */
+ class WriteRequest {
+ public:
+ WriteRequest(AsyncSocket* socket, WriteCallback* callback) :
+ socket_(socket), callback_(callback) {}
+
+ virtual void start() {};
+
+ virtual void destroy() = 0;
+
+ virtual bool performWrite() = 0;
+
+ virtual void consume() = 0;
+
+ virtual bool isComplete() = 0;
+
+ WriteRequest* getNext() const {
+ return next_;
+ }
+
+ WriteCallback* getCallback() const {
+ return callback_;
+ }
+
+ uint32_t getTotalBytesWritten() const {
+ return totalBytesWritten_;
+ }
+
+ void append(WriteRequest* next) {
+ assert(next_ == nullptr);
+ next_ = next;
+ }
+
+ void fail(const char* fn, const AsyncSocketException& ex) {
+ socket_->failWrite(fn, ex);
+ }
+
+ void bytesWritten(size_t count) {
+ totalBytesWritten_ += count;
+ socket_->appBytesWritten_ += count;
+ }
+
+ protected:
+ // protected destructor, to ensure callers use destroy()
+ virtual ~WriteRequest() {}
+
+ AsyncSocket* socket_; ///< parent socket
+ WriteRequest* next_{nullptr}; ///< pointer to next WriteRequest
+ WriteCallback* callback_; ///< completion callback
+ uint32_t totalBytesWritten_{0}; ///< total bytes written
+ };
+
protected:
enum ReadResultEnum {
READ_EOF = 0,
READ_ERROR = -1,
READ_BLOCKING = -2,
+ READ_NO_ERROR = -3,
};
/**
*/
~AsyncSocket();
- enum class StateEnum : uint8_t {
- UNINIT,
- CONNECTING,
- ESTABLISHED,
- CLOSED,
- ERROR
- };
-
friend std::ostream& operator << (std::ostream& os, const StateEnum& state);
enum ShutdownFlags {
SHUT_READ = 0x04,
};
- class WriteRequest;
+ class BytesWriteRequest;
class WriteTimeout : public AsyncTimeout {
public:
void init();
+ class ImmediateReadCB : public folly::EventBase::LoopCallback {
+ public:
+ explicit ImmediateReadCB(AsyncSocket* socket) : socket_(socket) {}
+ void runLoopCallback() noexcept override {
+ DestructorGuard dg(socket_);
+ socket_->checkForImmediateRead();
+ }
+ private:
+ AsyncSocket* socket_;
+ };
+
+ /**
+ * Schedule checkForImmediateRead to be executed in the next loop
+ * iteration.
+ */
+ void scheduleImmediateRead() noexcept {
+ if (good()) {
+ eventBase_->runInLoop(&immediateReadHandler_);
+ }
+ }
+
// event notification methods
void ioReady(uint16_t events) noexcept;
virtual void checkForImmediateRead() noexcept;
virtual void handleInitialReadWrite() noexcept;
+ virtual void prepareReadBuffer(void** buf, size_t* buflen) noexcept;
virtual void handleRead() noexcept;
virtual void handleWrite() noexcept;
virtual void handleConnect() noexcept;
* READ_ERROR on error, or READ_BLOCKING if the operation will
* block.
*/
- virtual ssize_t performRead(void* buf, size_t buflen);
+ virtual ssize_t performRead(void** buf, size_t* buflen, size_t* offset);
/**
* Populate an iovec array from an IOBuf and attempt to write it.
const AsyncSocketException& ex);
void failWrite(const char* fn, const AsyncSocketException& ex);
void failAllWrites(const AsyncSocketException& ex);
+ void invokeConnectErr(const AsyncSocketException& ex);
+ void invokeConnectSuccess();
void invalidState(ConnectCallback* callback);
void invalidState(ReadCallback* callback);
void invalidState(WriteCallback* callback);
EventBase* eventBase_; ///< The EventBase
WriteTimeout writeTimeout_; ///< A timeout for connect and write
IoHandler ioHandler_; ///< A EventHandler to monitor the fd
+ ImmediateReadCB immediateReadHandler_; ///< LoopCallback for checking read
ConnectCallback* connectCallback_; ///< ConnectCallback
ReadCallback* readCallback_; ///< ReadCallback
ShutdownSocketSet* shutdownSocketSet_;
size_t appBytesReceived_; ///< Num of bytes received from socket
size_t appBytesWritten_; ///< Num of bytes written to socket
+ bool isBufferMovable_{false};
+
+ bool peek_{false}; // Peek bytes.
+
+ int8_t readErr_{READ_NO_ERROR}; ///< The read error encountered, if any.
+
+ std::chrono::steady_clock::time_point connectStartTime_;
+ std::chrono::steady_clock::time_point connectEndTime_;
};