From: Yang Chi Date: Mon, 14 Dec 2015 23:33:46 +0000 (-0800) Subject: Remove per-write buffer callback from AsyncSocket X-Git-Tag: deprecate-dynamic-initializer~187 X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=76fcf3897e2600b0dbd719d84f0143148caf107b;p=folly.git Remove per-write buffer callback from AsyncSocket Summary: Remove per-write buffer callback from AsyncSocket Reviewed By: afrind Differential Revision: D2733142 fb-gh-sync-id: 91214a8c833bbd479bf049c2bb72d660e0c30f50 --- diff --git a/folly/io/async/AsyncPipe.cpp b/folly/io/async/AsyncPipe.cpp index b4263346..206f4557 100644 --- a/folly/io/async/AsyncPipe.cpp +++ b/folly/io/async/AsyncPipe.cpp @@ -148,8 +148,7 @@ void AsyncPipeWriter::write(unique_ptr buf, void AsyncPipeWriter::writeChain(folly::AsyncWriter::WriteCallback* callback, std::unique_ptr&& buf, - WriteFlags, - BufferCallback*) { + WriteFlags) { write(std::move(buf), callback); } diff --git a/folly/io/async/AsyncPipe.h b/folly/io/async/AsyncPipe.h index efa659b2..6f9f344b 100644 --- a/folly/io/async/AsyncPipe.h +++ b/folly/io/async/AsyncPipe.h @@ -147,20 +147,21 @@ class AsyncPipeWriter : public EventHandler, } // AsyncWriter methods - void write(folly::AsyncWriter::WriteCallback* callback, const void* buf, - size_t bytes, WriteFlags flags = WriteFlags::NONE, - BufferCallback* bufCallback = nullptr) override { - writeChain(callback, IOBuf::wrapBuffer(buf, bytes), flags, bufCallback); + void write(folly::AsyncWriter::WriteCallback* callback, + const void* buf, + size_t bytes, + WriteFlags flags = WriteFlags::NONE) override { + writeChain(callback, IOBuf::wrapBuffer(buf, bytes), flags); } - void writev(folly::AsyncWriter::WriteCallback*, const iovec*, - size_t, WriteFlags = WriteFlags::NONE, - BufferCallback* = nullptr) override { + void writev(folly::AsyncWriter::WriteCallback*, + const iovec*, + size_t, + WriteFlags = WriteFlags::NONE) override { throw std::runtime_error("writev is not supported. Please use writeChain."); } void writeChain(folly::AsyncWriter::WriteCallback* callback, std::unique_ptr&& buf, - WriteFlags flags = WriteFlags::NONE, - BufferCallback* bufCallback = nullptr) override; + WriteFlags flags = WriteFlags::NONE) override; private: void handlerReady(uint16_t events) noexcept override; diff --git a/folly/io/async/AsyncSocket.cpp b/folly/io/async/AsyncSocket.cpp index 373066e0..782a4ec4 100644 --- a/folly/io/async/AsyncSocket.cpp +++ b/folly/io/async/AsyncSocket.cpp @@ -63,16 +63,14 @@ const AsyncSocketException socketShutdownForWritesEx( */ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest { public: - static BytesWriteRequest* newRequest( - AsyncSocket* socket, - WriteCallback* callback, - const iovec* ops, - uint32_t opCount, - uint32_t partialWritten, - uint32_t bytesWritten, - unique_ptr&& ioBuf, - WriteFlags flags, - BufferCallback* bufferCallback = nullptr) { + static BytesWriteRequest* newRequest(AsyncSocket* socket, + WriteCallback* callback, + const iovec* ops, + uint32_t opCount, + uint32_t partialWritten, + uint32_t bytesWritten, + unique_ptr&& ioBuf, + WriteFlags flags) { assert(opCount > 0); // Since we put a variable size iovec array at the end // of each BytesWriteRequest, we have to manually allocate the memory. @@ -84,7 +82,7 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest { return new(buf) BytesWriteRequest(socket, callback, ops, opCount, partialWritten, bytesWritten, - std::move(ioBuf), flags, bufferCallback); + std::move(ioBuf), flags); } void destroy() override { @@ -138,9 +136,8 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest { uint32_t partialBytes, uint32_t bytesWritten, unique_ptr&& ioBuf, - WriteFlags flags, - BufferCallback* bufferCallback = nullptr) - : AsyncSocket::WriteRequest(socket, callback, bufferCallback) + WriteFlags flags) + : AsyncSocket::WriteRequest(socket, callback) , opCount_(opCount) , opIndex_(0) , flags_(flags) @@ -611,46 +608,43 @@ AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const { } void AsyncSocket::write(WriteCallback* callback, - const void* buf, size_t bytes, WriteFlags flags, - BufferCallback* bufCallback) { + const void* buf, size_t bytes, WriteFlags flags) { iovec op; op.iov_base = const_cast(buf); op.iov_len = bytes; - writeImpl(callback, &op, 1, unique_ptr(), flags, bufCallback); + writeImpl(callback, &op, 1, unique_ptr(), flags); } void AsyncSocket::writev(WriteCallback* callback, const iovec* vec, size_t count, - WriteFlags flags, - BufferCallback* bufCallback) { - writeImpl(callback, vec, count, unique_ptr(), flags, bufCallback); + WriteFlags flags) { + writeImpl(callback, vec, count, unique_ptr(), flags); } void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr&& buf, - WriteFlags flags, BufferCallback* bufCallback) { + WriteFlags flags) { constexpr size_t kSmallSizeMax = 64; size_t count = buf->countChainElements(); if (count <= kSmallSizeMax) { iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)]; - writeChainImpl(callback, vec, count, std::move(buf), flags, bufCallback); + writeChainImpl(callback, vec, count, std::move(buf), flags); } else { iovec* vec = new iovec[count]; - writeChainImpl(callback, vec, count, std::move(buf), flags, bufCallback); + writeChainImpl(callback, vec, count, std::move(buf), flags); delete[] vec; } } void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec, - size_t count, unique_ptr&& buf, WriteFlags flags, - BufferCallback* bufCallback) { + size_t count, unique_ptr&& buf, WriteFlags flags) { size_t veclen = buf->fillIov(vec, count); - writeImpl(callback, vec, veclen, std::move(buf), flags, bufCallback); + writeImpl(callback, vec, veclen, std::move(buf), flags); } void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, size_t count, unique_ptr&& buf, - WriteFlags flags, BufferCallback* bufCallback) { + WriteFlags flags) { VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_ << ", callback=" << callback << ", count=" << count << ", state=" << state_; @@ -694,11 +688,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, callback->writeSuccess(); } return; - } else { // continue writing the next writeReq - if (bufCallback) { - bufCallback->onEgressBuffered(); - } - } + } // else { continue writing the next writeReq } mustRegister = true; } } else if (!connecting()) { @@ -711,8 +701,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, try { req = BytesWriteRequest::newRequest(this, callback, vec + countWritten, count - countWritten, partialWritten, - bytesWritten, std::move(ioBuf), flags, - bufCallback); + bytesWritten, std::move(ioBuf), flags); } catch (const std::exception& ex) { // we mainly expect to catch std::bad_alloc here AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR, @@ -1515,11 +1504,6 @@ void AsyncSocket::handleWrite() noexcept { } // We'll continue around the loop, trying to write another request } else { - // Notify BufferCallback: - BufferCallback* bufferCallback = writeReqHead_->getBufferCallback(); - if (bufferCallback) { - bufferCallback->onEgressBuffered(); - } // Partial write. writeReqHead_->consume(); // Stop after a partial write; it's highly likely that a subsequent write diff --git a/folly/io/async/AsyncSocket.h b/folly/io/async/AsyncSocket.h index d158878d..e813216f 100644 --- a/folly/io/async/AsyncSocket.h +++ b/folly/io/async/AsyncSocket.h @@ -328,15 +328,12 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ReadCallback* getReadCallback() const override; void write(WriteCallback* callback, const void* buf, size_t bytes, - WriteFlags flags = WriteFlags::NONE, - BufferCallback* bufCallback = nullptr) override; + WriteFlags flags = WriteFlags::NONE) override; void writev(WriteCallback* callback, const iovec* vec, size_t count, - WriteFlags flags = WriteFlags::NONE, - BufferCallback* bufCallback = nullptr) override; + WriteFlags flags = WriteFlags::NONE) override; void writeChain(WriteCallback* callback, std::unique_ptr&& buf, - WriteFlags flags = WriteFlags::NONE, - BufferCallback* bufCallback = nullptr) override; + WriteFlags flags = WriteFlags::NONE) override; class WriteRequest; virtual void writeRequest(WriteRequest* req); @@ -518,11 +515,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper { */ class WriteRequest { public: - WriteRequest( - AsyncSocket* socket, - WriteCallback* callback, - BufferCallback* bufferCallback = nullptr) : - socket_(socket), callback_(callback), bufferCallback_(bufferCallback) {} + WriteRequest(AsyncSocket* socket, WriteCallback* callback) : + socket_(socket), callback_(callback) {} virtual void start() {}; @@ -560,10 +554,6 @@ class AsyncSocket : virtual public AsyncTransportWrapper { socket_->appBytesWritten_ += count; } - BufferCallback* getBufferCallback() const { - return bufferCallback_; - } - protected: // protected destructor, to ensure callers use destroy() virtual ~WriteRequest() {} @@ -572,7 +562,6 @@ class AsyncSocket : virtual public AsyncTransportWrapper { WriteRequest* next_{nullptr}; ///< pointer to next WriteRequest WriteCallback* callback_; ///< completion callback uint32_t totalBytesWritten_{0}; ///< total bytes written - BufferCallback* bufferCallback_{nullptr}; }; protected: @@ -696,39 +685,36 @@ class AsyncSocket : virtual public AsyncTransportWrapper { /** * Populate an iovec array from an IOBuf and attempt to write it. * - * @param callback Write completion/error callback. - * @param vec Target iovec array; caller retains ownership. - * @param count Number of IOBufs to write, beginning at start of buf. - * @param buf Chain of iovecs. - * @param flags set of flags for the underlying write calls, like cork - * @param bufCallback Callback when egress data begins to buffer + * @param callback Write completion/error callback. + * @param vec Target iovec array; caller retains ownership. + * @param count Number of IOBufs to write, beginning at start of buf. + * @param buf Chain of iovecs. + * @param flags set of flags for the underlying write calls, like cork */ void writeChainImpl(WriteCallback* callback, iovec* vec, size_t count, std::unique_ptr&& buf, - WriteFlags flags, BufferCallback* bufCallback = nullptr); + WriteFlags flags); /** * Write as much data as possible to the socket without blocking, * and queue up any leftover data to send when the socket can * handle writes again. * - * @param callback The callback to invoke when the write is completed. - * @param vec Array of buffers to write; this method will make a - * copy of the vector (but not the buffers themselves) - * if the write has to be completed asynchronously. - * @param count Number of elements in vec. - * @param buf The IOBuf that manages the buffers referenced by - * vec, or a pointer to nullptr if the buffers are not - * associated with an IOBuf. Note that ownership of - * the IOBuf is transferred here; upon completion of - * the write, the AsyncSocket deletes the IOBuf. - * @param flags Set of write flags. - * @param bufCallback Callback when egress data buffers up + * @param callback The callback to invoke when the write is completed. + * @param vec Array of buffers to write; this method will make a + * copy of the vector (but not the buffers themselves) + * if the write has to be completed asynchronously. + * @param count Number of elements in vec. + * @param buf The IOBuf that manages the buffers referenced by + * vec, or a pointer to nullptr if the buffers are not + * associated with an IOBuf. Note that ownership of + * the IOBuf is transferred here; upon completion of + * the write, the AsyncSocket deletes the IOBuf. + * @param flags Set of write flags. */ void writeImpl(WriteCallback* callback, const iovec* vec, size_t count, std::unique_ptr&& buf, - WriteFlags flags = WriteFlags::NONE, - BufferCallback* bufCallback = nullptr); + WriteFlags flags = WriteFlags::NONE); /** * Attempt to write to the socket. diff --git a/folly/io/async/AsyncTransport.h b/folly/io/async/AsyncTransport.h index 8fdbefce..40dca58b 100644 --- a/folly/io/async/AsyncTransport.h +++ b/folly/io/async/AsyncTransport.h @@ -464,12 +464,6 @@ class AsyncReader { class AsyncWriter { public: - class BufferCallback { - public: - virtual ~BufferCallback() {} - virtual void onEgressBuffered() = 0; - }; - class WriteCallback { public: virtual ~WriteCallback() = default; @@ -499,15 +493,12 @@ class AsyncWriter { // Write methods that aren't part of AsyncTransport virtual void write(WriteCallback* callback, const void* buf, size_t bytes, - WriteFlags flags = WriteFlags::NONE, - BufferCallback* bufCallback = nullptr) = 0; + WriteFlags flags = WriteFlags::NONE) = 0; virtual void writev(WriteCallback* callback, const iovec* vec, size_t count, - WriteFlags flags = WriteFlags::NONE, - BufferCallback* bufCallback = nullptr) = 0; + WriteFlags flags = WriteFlags::NONE) = 0; virtual void writeChain(WriteCallback* callback, std::unique_ptr&& buf, - WriteFlags flags = WriteFlags::NONE, - BufferCallback* bufCallback = nullptr) = 0; + WriteFlags flags = WriteFlags::NONE) = 0; protected: virtual ~AsyncWriter() = default; @@ -525,19 +516,15 @@ class AsyncTransportWrapper : virtual public AsyncTransport, // to keep compatibility. using ReadCallback = AsyncReader::ReadCallback; using WriteCallback = AsyncWriter::WriteCallback; - using BufferCallback = AsyncWriter::BufferCallback; virtual void setReadCB(ReadCallback* callback) override = 0; virtual ReadCallback* getReadCallback() const override = 0; virtual void write(WriteCallback* callback, const void* buf, size_t bytes, - WriteFlags flags = WriteFlags::NONE, - BufferCallback* bufCallback = nullptr) override = 0; + WriteFlags flags = WriteFlags::NONE) override = 0; virtual void writev(WriteCallback* callback, const iovec* vec, size_t count, - WriteFlags flags = WriteFlags::NONE, - BufferCallback* bufCallback = nullptr) override = 0; + WriteFlags flags = WriteFlags::NONE) override = 0; virtual void writeChain(WriteCallback* callback, std::unique_ptr&& buf, - WriteFlags flags = WriteFlags::NONE, - BufferCallback* bufCallback = nullptr) override = 0; + WriteFlags flags = WriteFlags::NONE) override = 0; /** * The transport wrapper may wrap another transport. This returns the * transport that is wrapped. It returns nullptr if there is no wrapped diff --git a/folly/io/async/test/AsyncSocketTest.h b/folly/io/async/test/AsyncSocketTest.h index 5d52ad20..51230014 100644 --- a/folly/io/async/test/AsyncSocketTest.h +++ b/folly/io/async/test/AsyncSocketTest.h @@ -60,23 +60,6 @@ class ConnCallback : public AsyncSocket::ConnectCallback { VoidCallback errorCallback; }; -class BufferCallback : public AsyncTransportWrapper::BufferCallback { - public: - BufferCallback() - : buffered_(false) {} - - void onEgressBuffered() override { - buffered_ = true; - } - - bool hasBuffered() const { - return buffered_; - } - - private: - bool buffered_{false}; -}; - class WriteCallback : public AsyncTransportWrapper::WriteCallback { public: WriteCallback() diff --git a/folly/io/async/test/AsyncSocketTest2.cpp b/folly/io/async/test/AsyncSocketTest2.cpp index 81acc826..1a5ebeb8 100644 --- a/folly/io/async/test/AsyncSocketTest2.cpp +++ b/folly/io/async/test/AsyncSocketTest2.cpp @@ -2238,32 +2238,3 @@ TEST(AsyncSocketTest, NumPendingMessagesInQueue) { eventBase.loop(); } - -TEST(AsyncSocketTest, BufferTest) { - TestServer server; - - EventBase evb; - AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}}; - std::shared_ptr socket = AsyncSocket::newSocket(&evb); - ConnCallback ccb; - socket->connect(&ccb, server.getAddress(), 30, option); - - - char buf[100 * 1024]; - memset(buf, 'c', sizeof(buf)); - WriteCallback wcb; - BufferCallback bcb; - socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE, &bcb); - - evb.loop(); - CHECK_EQ(ccb.state, STATE_SUCCEEDED); - CHECK_EQ(wcb.state, STATE_SUCCEEDED); - - ASSERT_TRUE(bcb.hasBuffered()); - - socket->close(); - server.verifyConnection(buf, sizeof(buf)); - - ASSERT_TRUE(socket->isClosedBySelf()); - ASSERT_FALSE(socket->isClosedByPeer()); -} diff --git a/folly/io/async/test/MockAsyncTransport.h b/folly/io/async/test/MockAsyncTransport.h index bd091056..8cff00f7 100644 --- a/folly/io/async/test/MockAsyncTransport.h +++ b/folly/io/async/test/MockAsyncTransport.h @@ -27,31 +27,15 @@ class MockAsyncTransport: public AsyncTransportWrapper { MOCK_METHOD1(setReadCB, void(ReadCallback*)); MOCK_CONST_METHOD0(getReadCallback, ReadCallback*()); MOCK_CONST_METHOD0(getReadCB, ReadCallback*()); - MOCK_METHOD5(write, void(WriteCallback*, - const void*, size_t, - WriteFlags, - BufferCallback*)); - MOCK_METHOD5(writev, void(WriteCallback*, - const iovec*, size_t, - WriteFlags, - BufferCallback*)); - MOCK_METHOD4(writeChain, - void(WriteCallback*, - std::shared_ptr, - WriteFlags, - BufferCallback*)); - + MOCK_METHOD4(write, void(WriteCallback*, const void*, size_t, WriteFlags)); + MOCK_METHOD4(writev, void(WriteCallback*, const iovec*, size_t, WriteFlags)); + MOCK_METHOD3(writeChain, + void(WriteCallback*, std::shared_ptr, WriteFlags)); void writeChain(WriteCallback* callback, std::unique_ptr&& iob, - WriteFlags flags = - WriteFlags::NONE, - BufferCallback* bufCB = nullptr) override { - writeChain( - callback, - std::shared_ptr(iob.release()), - flags, - bufCB); + WriteFlags flags = WriteFlags::NONE) override { + writeChain(callback, std::shared_ptr(iob.release()), flags); } MOCK_METHOD0(close, void());