#include <folly/io/async/AsyncSocket.h>
#include <folly/io/async/EventBase.h>
+#include <folly/io/async/EventHandler.h>
+#include <folly/Singleton.h>
#include <folly/SocketAddress.h>
#include <folly/io/IOBuf.h>
#include <errno.h>
#include <limits.h>
#include <unistd.h>
+#include <thread>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
const AsyncSocketException socketShutdownForWritesEx(
AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
-// TODO: It might help performance to provide a version of WriteRequest that
+// TODO: It might help performance to provide a version of BytesWriteRequest that
// users could derive from, so we can avoid the extra allocation for each call
// to write()/writev(). We could templatize TFramedAsyncChannel just like the
// protocols are currently templatized for transports.
// storage space, and only our internal version would allocate it at the end of
// the WriteRequest.
-/**
- * A WriteRequest object tracks information about a pending write operation.
- */
-class AsyncSocket::WriteRequest {
- public:
- WriteRequest(AsyncSocket* socket,
- WriteRequest* next,
- WriteCallback* callback,
- uint32_t totalBytesWritten) :
- socket_(socket), next_(next), callback_(callback),
- totalBytesWritten_(totalBytesWritten) {}
-
- virtual void destroy() = 0;
-
- virtual bool performWrite() = 0;
-
- virtual void consume() = 0;
-
- virtual bool isComplete() = 0;
-
- WriteRequest* getNext() const {
- return next_;
- }
-
- WriteCallback* getCallback() const {
- return callback_;
- }
-
- uint32_t getTotalBytesWritten() const {
- return totalBytesWritten_;
- }
-
- void append(WriteRequest* next) {
- assert(next_ == nullptr);
- next_ = next;
- }
-
- protected:
- // protected destructor, to ensure callers use destroy()
- virtual ~WriteRequest() {}
-
- AsyncSocket* socket_; ///< parent socket
- WriteRequest* next_; ///< pointer to next WriteRequest
- WriteCallback* callback_; ///< completion callback
- uint32_t totalBytesWritten_; ///< total bytes written
-};
-
/* The default WriteRequest implementation, used for write(), writev() and
* writeChain()
*
uint32_t bytesWritten,
unique_ptr<IOBuf>&& ioBuf,
WriteFlags flags)
- : AsyncSocket::WriteRequest(socket, nullptr, callback, 0)
+ : AsyncSocket::WriteRequest(socket, callback)
, opCount_(opCount)
, opIndex_(0)
, flags_(flags)
}
}
+void AsyncSocket::writeRequest(WriteRequest* req) {
+ if (writeReqTail_ == nullptr) {
+ assert(writeReqHead_ == nullptr);
+ writeReqHead_ = writeReqTail_ = req;
+ req->start();
+ } else {
+ writeReqTail_->append(req);
+ writeReqTail_ = req;
+ }
+}
+
void AsyncSocket::close() {
VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
<< ", state=" << state_ << ", shutdownFlags="