/*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
#pragma once
#include <folly/Optional.h>
#include <sys/types.h>
#include <chrono>
-#include <memory>
#include <map>
+#include <memory>
namespace folly {
#define SO_NO_TRANSPARENT_TLS 200
#endif
+#if defined __linux__ && !defined SO_NO_TSOCKS
+#define SO_NO_TSOCKS 201
+#endif
+
#ifdef _MSC_VER
// We do a dynamic_cast on this, in
// AsyncTransportWrapper::getUnderlyingTransport so be safe and
virtual void errMessageError(const AsyncSocketException& ex) noexcept = 0;
};
+ class SendMsgParamsCallback {
+ public:
+ virtual ~SendMsgParamsCallback() = default;
+
+ /**
+ * getFlags() will be invoked to retrieve the desired flags to be passed
+ * to ::sendmsg() system call. This method was intentionally declared
+ * non-virtual, so there is no way to override it. Instead feel free to
+ * override getFlagsImpl(flags, defaultFlags) method instead, and enjoy
+ * the convenience of defaultFlags passed there.
+ *
+ * @param flags Write flags requested for the given write operation
+ */
+ int getFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept {
+ return getFlagsImpl(flags, getDefaultFlags(flags, zeroCopyEnabled));
+ }
+
+ /**
+ * getAncillaryData() will be invoked to initialize ancillary data
+ * buffer referred by "msg_control" field of msghdr structure passed to
+ * ::sendmsg() system call. The function assumes that the size of buffer
+ * is not smaller than the value returned by getAncillaryDataSize() method
+ * for the same combination of flags.
+ *
+ * @param flags Write flags requested for the given write operation
+ * @param data Pointer to ancillary data buffer to initialize.
+ */
+ virtual void getAncillaryData(
+ folly::WriteFlags /*flags*/,
+ void* /*data*/) noexcept {}
+
+ /**
+ * getAncillaryDataSize() will be invoked to retrieve the size of
+ * ancillary data buffer which should be passed to ::sendmsg() system call
+ *
+ * @param flags Write flags requested for the given write operation
+ */
+ virtual uint32_t getAncillaryDataSize(folly::WriteFlags /*flags*/)
+ noexcept {
+ return 0;
+ }
+
+ static const size_t maxAncillaryDataSize{0x5000};
+
+ private:
+ /**
+ * getFlagsImpl() will be invoked by getFlags(folly::WriteFlags flags)
+ * method to retrieve the flags to be passed to ::sendmsg() system call.
+ * SendMsgParamsCallback::getFlags() is calling this method, and returns
+ * its results directly to the caller in AsyncSocket.
+ * Classes inheriting from SendMsgParamsCallback are welcome to override
+ * this method to force SendMsgParamsCallback to return its own set
+ * of flags.
+ *
+ * @param flags Write flags requested for the given write operation
+ * @param defaultflags A set of message flags returned by getDefaultFlags()
+ * method for the given "flags" mask.
+ */
+ virtual int getFlagsImpl(folly::WriteFlags /*flags*/, int defaultFlags) {
+ return defaultFlags;
+ }
+
+ /**
+ * getDefaultFlags() will be invoked by getFlags(folly::WriteFlags flags)
+ * to retrieve the default set of flags, and pass them to getFlagsImpl(...)
+ *
+ * @param flags Write flags requested for the given write operation
+ */
+ int getDefaultFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept;
+ };
+
explicit AsyncSocket();
/**
* Create a new unconnected AsyncSocket.
*/
explicit AsyncSocket(EventBase* evb);
- void setShutdownSocketSet(ShutdownSocketSet* ss);
+ void setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet>& wSS);
/**
* Create a new AsyncSocket and begin the connection process.
*
* @param evb EventBase that will manage this socket.
* @param fd File descriptor to take over (should be a connected socket).
+ * @param zeroCopyBufId Zerocopy buf id to start with.
+ */
+ AsyncSocket(EventBase* evb, int fd, uint32_t zeroCopyBufId = 0);
+
+ /**
+ * Create an AsyncSocket from a different, already connected AsyncSocket.
+ *
+ * Similar to AsyncSocket(evb, fd) when fd was previously owned by an
+ * AsyncSocket.
*/
- AsyncSocket(EventBase* evb, int fd);
+ explicit AsyncSocket(AsyncSocket::UniquePtr);
/**
* Helper function to create a shared_ptr<AsyncSocket>.
* This prevents callers from deleting a AsyncSocket while it is invoking a
* callback.
*/
- virtual void destroy() override;
+ void destroy() override;
/**
* Get the EventBase used by this socket.
* error. The AsyncSocket may no longer be used after the file descriptor
* has been extracted.
*
+ * This method should be used with care as the resulting fd is not guaranteed
+ * to perfectly reflect the state of the AsyncSocket (security state,
+ * pre-received data, etc.).
+ *
* Returns the file descriptor. The caller assumes ownership of the
* descriptor, and it will not be closed when the AsyncSocket is destroyed.
*/
* )
*
*/
- void setErrMessageCB(ErrMessageCallback* callback);
+ virtual void setErrMessageCB(ErrMessageCallback* callback);
/**
* Get a pointer to ErrMessageCallback implementation currently
* registered with this socket.
*
*/
- ErrMessageCallback* getErrMessageCallback() const;
+ virtual ErrMessageCallback* getErrMessageCallback() const;
+
+ /**
+ * Set a pointer to SendMsgParamsCallback implementation which
+ * will be used to form ::sendmsg() system call parameters
+ *
+ */
+ virtual void setSendMsgParamCB(SendMsgParamsCallback* callback);
+
+ /**
+ * Get a pointer to SendMsgParamsCallback implementation currently
+ * registered with this socket.
+ *
+ */
+ virtual SendMsgParamsCallback* getSendMsgParamsCB() const;
// Read and write methods
void setReadCB(ReadCallback* callback) override;
ReadCallback* getReadCallback() const override;
+ bool setZeroCopy(bool enable);
+ bool getZeroCopy() const {
+ return zeroCopyEnabled_;
+ }
+
+ uint32_t getZeroCopyBufId() const {
+ return zeroCopyBufId_;
+ }
+
void write(WriteCallback* callback, const void* buf, size_t bytes,
WriteFlags flags = WriteFlags::NONE) override;
void writev(WriteCallback* callback, const iovec* vec, size_t count,
void shutdownWriteNow() override;
bool readable() const override;
+ bool writable() const override;
bool isPending() const override;
virtual bool hangup() const;
bool good() const override;
return setsockopt(fd_, level, optname, optval, sizeof(T));
}
- virtual void setPeek(bool peek) {
- peek_ = peek;
+ /**
+ * Virtual method for reading a socket option returning integer
+ * value, which is the most typical case. Convenient for overriding
+ * and mocking.
+ *
+ * @param level same as the "level" parameter in getsockopt().
+ * @param optname same as the "optname" parameter in getsockopt().
+ * @param optval same as "optval" parameter in getsockopt().
+ * @param optlen same as "optlen" parameter in getsockopt().
+ * @return same as the return value of getsockopt().
+ */
+ virtual int
+ getSockOptVirtual(int level, int optname, void* optval, socklen_t* optlen) {
+ return getsockopt(fd_, level, optname, optval, optlen);
+ }
+
+ /**
+ * Virtual method for setting a socket option accepting integer
+ * value, which is the most typical case. Convenient for overriding
+ * and mocking.
+ *
+ * @param level same as the "level" parameter in setsockopt().
+ * @param optname same as the "optname" parameter in setsockopt().
+ * @param optval same as "optval" parameter in setsockopt().
+ * @param optlen same as "optlen" parameter in setsockopt().
+ * @return same as the return value of setsockopt().
+ */
+ virtual int setSockOptVirtual(
+ int level,
+ int optname,
+ void const* optval,
+ socklen_t optlen) {
+ return setsockopt(fd_, level, optname, optval, optlen);
+ }
+
+ /**
+ * Set pre-received data, to be returned to read callback before any data
+ * from the socket.
+ */
+ virtual void setPreReceivedData(std::unique_ptr<IOBuf> data) {
+ if (preReceivedData_) {
+ preReceivedData_->prependChain(std::move(data));
+ } else {
+ preReceivedData_ = std::move(data);
+ }
}
/**
noTransparentTls_ = true;
}
+ void disableTSocks() {
+ noTSocks_ = true;
+ }
+
enum class StateEnum : uint8_t {
UNINIT,
CONNECTING,
evbChangeCb_ = std::move(cb);
}
+ /**
+ * Attempt to cache the current local and peer addresses (if not already
+ * cached) so that they are available from getPeerAddress() and
+ * getLocalAddress() even after the socket is closed.
+ */
+ void cacheAddresses();
+
+ /**
+ * Returns true if there is any zero copy write in progress
+ * Needs to be called from within the socket's EVB thread
+ */
+ bool isZeroCopyWriteInProgress() const noexcept;
+
+ /**
+ * Tries to process the msg error queue
+ * And returns true if there are no more zero copy writes in progress
+ */
+ bool processZeroCopyWriteInProgress() noexcept;
+
/**
* writeReturn is the total number of bytes written, or WRITE_ERROR on error.
* If no data has been written, 0 is returned.
* destroy() instead. (See the documentation in DelayedDestruction.h for
* more details.)
*/
- ~AsyncSocket();
+ ~AsyncSocket() override;
friend std::ostream& operator << (std::ostream& os, const StateEnum& state);
: AsyncTimeout(eventBase)
, socket_(socket) {}
- virtual void timeoutExpired() noexcept {
+ void timeoutExpired() noexcept override {
socket_->timeoutExpired();
}
: EventHandler(eventBase, fd)
, socket_(socket) {}
- virtual void handlerReady(uint16_t events) noexcept {
+ void handlerReady(uint16_t events) noexcept override {
socket_->ioReady(events);
}
virtual void checkForImmediateRead() noexcept;
virtual void handleInitialReadWrite() noexcept;
virtual void prepareReadBuffer(void** buf, size_t* buflen);
- virtual void handleErrMessages() noexcept;
+ virtual size_t handleErrMessages() noexcept;
virtual void handleRead() noexcept;
virtual void handleWrite() noexcept;
virtual void handleConnect() noexcept;
std::string withAddr(const std::string& s);
+ void cacheLocalAddress() const;
+ void cachePeerAddress() const;
+
+ bool isZeroCopyRequest(WriteFlags flags);
+
+ bool isZeroCopyMsg(const cmsghdr& cmsg) const;
+ void processZeroCopyMsg(const cmsghdr& cmsg);
+
+ uint32_t getNextZeroCopyBufId() {
+ return zeroCopyBufId_++;
+ }
+ void adjustZeroCopyFlags(folly::WriteFlags& flags);
+ void addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
+ void addZeroCopyBuf(folly::IOBuf* ptr);
+ void setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
+ bool containsZeroCopyBuf(folly::IOBuf* ptr);
+ void releaseZeroCopyBuf(uint32_t id);
+
+ // a folly::IOBuf can be used in multiple partial requests
+ // there is a that maps a buffer id to a raw folly::IOBuf ptr
+ // and another one that adds a ref count for a folly::IOBuf that is either
+ // the original ptr or nullptr
+ uint32_t zeroCopyBufId_{0};
+
+ struct IOBufInfo {
+ uint32_t count_{0};
+ std::unique_ptr<folly::IOBuf> buf_;
+ };
+
+ std::unordered_map<uint32_t, folly::IOBuf*> idZeroCopyBufPtrMap_;
+ std::unordered_map<folly::IOBuf*, IOBufInfo> idZeroCopyBufInfoMap_;
+
StateEnum state_; ///< StateEnum describing current state
uint8_t shutdownFlags_; ///< Shutdown state (ShutdownFlags)
uint16_t eventFlags_; ///< EventBase::HandlerFlags settings
///< The address we are connecting from
uint32_t sendTimeout_; ///< The send timeout, in milliseconds
uint16_t maxReadsPerEvent_; ///< Max reads per event loop iteration
+
+ bool isBufferMovable_{false};
+
+ int8_t readErr_{READ_NO_ERROR}; ///< The read error encountered, if any
+
EventBase* eventBase_; ///< The EventBase
WriteTimeout writeTimeout_; ///< A timeout for connect and write
IoHandler ioHandler_; ///< A EventHandler to monitor the fd
ConnectCallback* connectCallback_; ///< ConnectCallback
ErrMessageCallback* errMessageCallback_; ///< TimestampCallback
+ SendMsgParamsCallback* ///< Callback for retrieving
+ sendMsgParamCallback_; ///< ::sendmsg() parameters
ReadCallback* readCallback_; ///< ReadCallback
WriteRequest* writeReqHead_; ///< Chain of WriteRequests
WriteRequest* writeReqTail_; ///< End of WriteRequest chain
- ShutdownSocketSet* shutdownSocketSet_;
+ std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_;
size_t appBytesReceived_; ///< Num of bytes received from socket
size_t appBytesWritten_; ///< Num of bytes written to socket
- bool isBufferMovable_{false};
-
- bool peek_{false}; // Peek bytes.
- int8_t readErr_{READ_NO_ERROR}; ///< The read error encountered, if any.
+ // Pre-received data, to be returned to read callback before any data from the
+ // socket.
+ std::unique_ptr<IOBuf> preReceivedData_;
std::chrono::steady_clock::time_point connectStartTime_;
std::chrono::steady_clock::time_point connectEndTime_;
std::chrono::milliseconds connectTimeout_{0};
+ std::unique_ptr<EvbChangeCallback> evbChangeCb_{nullptr};
+
BufferCallback* bufferCallback_{nullptr};
bool tfoEnabled_{false};
bool tfoAttempted_{false};
bool tfoFinished_{false};
bool noTransparentTls_{false};
+ bool noTSocks_{false};
// Whether to track EOR or not.
bool trackEor_{false};
-
- std::unique_ptr<EvbChangeCallback> evbChangeCb_{nullptr};
+ bool zeroCopyEnabled_{false};
+ bool zeroCopyVal_{false};
};
#ifdef _MSC_VER
#pragma vtordisp(pop)
#endif
-} // folly
+} // namespace folly