Remove per-write buffer callback from AsyncSocket
authorYang Chi <yangchi@fb.com>
Mon, 14 Dec 2015 23:33:46 +0000 (15:33 -0800)
committerfacebook-github-bot-1 <folly-bot@fb.com>
Tue, 15 Dec 2015 00:20:23 +0000 (16:20 -0800)
Summary: Remove per-write buffer callback from AsyncSocket

Reviewed By: afrind

Differential Revision: D2733142

fb-gh-sync-id: 91214a8c833bbd479bf049c2bb72d660e0c30f50

folly/io/async/AsyncPipe.cpp
folly/io/async/AsyncPipe.h
folly/io/async/AsyncSocket.cpp
folly/io/async/AsyncSocket.h
folly/io/async/AsyncTransport.h
folly/io/async/test/AsyncSocketTest.h
folly/io/async/test/AsyncSocketTest2.cpp
folly/io/async/test/MockAsyncTransport.h

index b4263346917849e7ec7d8b9b3dc57a190849543c..206f455765fab3d2e366901196ee0d1e9d1dccf8 100644 (file)
@@ -148,8 +148,7 @@ void AsyncPipeWriter::write(unique_ptr<folly::IOBuf> buf,
 
 void AsyncPipeWriter::writeChain(folly::AsyncWriter::WriteCallback* callback,
                                  std::unique_ptr<folly::IOBuf>&& buf,
-                                 WriteFlags,
-                                 BufferCallback*) {
+                                 WriteFlags) {
   write(std::move(buf), callback);
 }
 
index efa659b24687a524536ed9ca30925a896d579950..6f9f344b7355c2dad57c5c4ebfd05ed3d6712a82 100644 (file)
@@ -147,20 +147,21 @@ class AsyncPipeWriter : public EventHandler,
   }
 
   // AsyncWriter methods
-  void write(folly::AsyncWriter::WriteCallback* callback, const void* buf,
-             size_t bytes, WriteFlags flags = WriteFlags::NONE,
-             BufferCallback* bufCallback = nullptr) override {
-    writeChain(callback, IOBuf::wrapBuffer(buf, bytes), flags, bufCallback);
+  void write(folly::AsyncWriter::WriteCallback* callback,
+             const void* buf,
+             size_t bytes,
+             WriteFlags flags = WriteFlags::NONE) override {
+    writeChain(callback, IOBuf::wrapBuffer(buf, bytes), flags);
   }
-  void writev(folly::AsyncWriter::WriteCallback*, const iovec*,
-              size_t, WriteFlags = WriteFlags::NONE,
-              BufferCallback* = nullptr) override {
+  void writev(folly::AsyncWriter::WriteCallback*,
+              const iovec*,
+              size_t,
+              WriteFlags = WriteFlags::NONE) override {
     throw std::runtime_error("writev is not supported. Please use writeChain.");
   }
   void writeChain(folly::AsyncWriter::WriteCallback* callback,
                   std::unique_ptr<folly::IOBuf>&& buf,
-                  WriteFlags flags = WriteFlags::NONE,
-                  BufferCallback* bufCallback = nullptr) override;
+                  WriteFlags flags = WriteFlags::NONE) override;
 
  private:
   void handlerReady(uint16_t events) noexcept override;
index 373066e0a40fd07ebb7048ae752e9a32ef7d9deb..782a4ec41e161f9d3312d7616809c358a1e3babe 100644 (file)
@@ -63,16 +63,14 @@ const AsyncSocketException socketShutdownForWritesEx(
  */
 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
  public:
-  static BytesWriteRequest* newRequest(
-      AsyncSocket* socket,
-      WriteCallback* callback,
-      const iovec* ops,
-      uint32_t opCount,
-      uint32_t partialWritten,
-      uint32_t bytesWritten,
-      unique_ptr<IOBuf>&& ioBuf,
-      WriteFlags flags,
-      BufferCallback* bufferCallback = nullptr) {
+  static BytesWriteRequest* newRequest(AsyncSocket* socket,
+                                       WriteCallback* callback,
+                                       const iovec* ops,
+                                       uint32_t opCount,
+                                       uint32_t partialWritten,
+                                       uint32_t bytesWritten,
+                                       unique_ptr<IOBuf>&& ioBuf,
+                                       WriteFlags flags) {
     assert(opCount > 0);
     // Since we put a variable size iovec array at the end
     // of each BytesWriteRequest, we have to manually allocate the memory.
@@ -84,7 +82,7 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
 
     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
                                       partialWritten, bytesWritten,
-                                      std::move(ioBuf), flags, bufferCallback);
+                                      std::move(ioBuf), flags);
   }
 
   void destroy() override {
@@ -138,9 +136,8 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
                     uint32_t partialBytes,
                     uint32_t bytesWritten,
                     unique_ptr<IOBuf>&& ioBuf,
-                    WriteFlags flags,
-                    BufferCallback* bufferCallback = nullptr)
-    : AsyncSocket::WriteRequest(socket, callback, bufferCallback)
+                    WriteFlags flags)
+    : AsyncSocket::WriteRequest(socket, callback)
     , opCount_(opCount)
     , opIndex_(0)
     , flags_(flags)
@@ -611,46 +608,43 @@ AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
 }
 
 void AsyncSocket::write(WriteCallback* callback,
-                         const void* buf, size_t bytes, WriteFlags flags,
-                         BufferCallback* bufCallback) {
+                         const void* buf, size_t bytes, WriteFlags flags) {
   iovec op;
   op.iov_base = const_cast<void*>(buf);
   op.iov_len = bytes;
-  writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags, bufCallback);
+  writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
 }
 
 void AsyncSocket::writev(WriteCallback* callback,
                           const iovec* vec,
                           size_t count,
-                          WriteFlags flags,
-                          BufferCallback* bufCallback) {
-  writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags, bufCallback);
+                          WriteFlags flags) {
+  writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
 }
 
 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
-                              WriteFlags flags, BufferCallback* bufCallback) {
+                              WriteFlags flags) {
   constexpr size_t kSmallSizeMax = 64;
   size_t count = buf->countChainElements();
   if (count <= kSmallSizeMax) {
     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
-    writeChainImpl(callback, vec, count, std::move(buf), flags, bufCallback);
+    writeChainImpl(callback, vec, count, std::move(buf), flags);
   } else {
     iovec* vec = new iovec[count];
-    writeChainImpl(callback, vec, count, std::move(buf), flags, bufCallback);
+    writeChainImpl(callback, vec, count, std::move(buf), flags);
     delete[] vec;
   }
 }
 
 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
-    size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags,
-    BufferCallback* bufCallback) {
+    size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
   size_t veclen = buf->fillIov(vec, count);
-  writeImpl(callback, vec, veclen, std::move(buf), flags, bufCallback);
+  writeImpl(callback, vec, veclen, std::move(buf), flags);
 }
 
 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
                              size_t count, unique_ptr<IOBuf>&& buf,
-                             WriteFlags flags, BufferCallback* bufCallback) {
+                             WriteFlags flags) {
   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
           << ", callback=" << callback << ", count=" << count
           << ", state=" << state_;
@@ -694,11 +688,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
           callback->writeSuccess();
         }
         return;
-      } else { // continue writing the next writeReq
-        if (bufCallback) {
-          bufCallback->onEgressBuffered();
-        }
-      }
+      } // else { continue writing the next writeReq }
       mustRegister = true;
     }
   } else if (!connecting()) {
@@ -711,8 +701,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
   try {
     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
                                         count - countWritten, partialWritten,
-                                        bytesWritten, std::move(ioBuf), flags,
-                                        bufCallback);
+                                        bytesWritten, std::move(ioBuf), flags);
   } catch (const std::exception& ex) {
     // we mainly expect to catch std::bad_alloc here
     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
@@ -1515,11 +1504,6 @@ void AsyncSocket::handleWrite() noexcept {
       }
       // We'll continue around the loop, trying to write another request
     } else {
-      // Notify BufferCallback:
-      BufferCallback* bufferCallback = writeReqHead_->getBufferCallback();
-      if (bufferCallback) {
-        bufferCallback->onEgressBuffered();
-      }
       // Partial write.
       writeReqHead_->consume();
       // Stop after a partial write; it's highly likely that a subsequent write
index d158878dac2922adbc78e6eed9df8bea87a9cca3..e813216f6405df368da50588ca303c2b6e6a8d62 100644 (file)
@@ -328,15 +328,12 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
   ReadCallback* getReadCallback() const override;
 
   void write(WriteCallback* callback, const void* buf, size_t bytes,
-             WriteFlags flags = WriteFlags::NONE,
-             BufferCallback* bufCallback = nullptr) override;
+             WriteFlags flags = WriteFlags::NONE) override;
   void writev(WriteCallback* callback, const iovec* vec, size_t count,
-              WriteFlags flags = WriteFlags::NONE,
-              BufferCallback* bufCallback = nullptr) override;
+              WriteFlags flags = WriteFlags::NONE) override;
   void writeChain(WriteCallback* callback,
                   std::unique_ptr<folly::IOBuf>&& buf,
-                  WriteFlags flags = WriteFlags::NONE,
-                  BufferCallback* bufCallback = nullptr) override;
+                  WriteFlags flags = WriteFlags::NONE) override;
 
   class WriteRequest;
   virtual void writeRequest(WriteRequest* req);
@@ -518,11 +515,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
    */
   class WriteRequest {
    public:
-    WriteRequest(
-        AsyncSocket* socket,
-        WriteCallback* callback,
-        BufferCallback* bufferCallback = nullptr) :
-      socket_(socket), callback_(callback), bufferCallback_(bufferCallback) {}
+    WriteRequest(AsyncSocket* socket, WriteCallback* callback) :
+      socket_(socket), callback_(callback) {}
 
     virtual void start() {};
 
@@ -560,10 +554,6 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
       socket_->appBytesWritten_ += count;
     }
 
-    BufferCallback* getBufferCallback() const {
-      return bufferCallback_;
-    }
-
    protected:
     // protected destructor, to ensure callers use destroy()
     virtual ~WriteRequest() {}
@@ -572,7 +562,6 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
     WriteRequest* next_{nullptr};          ///< pointer to next WriteRequest
     WriteCallback* callback_;     ///< completion callback
     uint32_t totalBytesWritten_{0};  ///< total bytes written
-    BufferCallback* bufferCallback_{nullptr};
   };
 
  protected:
@@ -696,39 +685,36 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
   /**
    * Populate an iovec array from an IOBuf and attempt to write it.
    *
-   * @param callback    Write completion/error callback.
-   * @param vec         Target iovec array; caller retains ownership.
-   * @param count       Number of IOBufs to write, beginning at start of buf.
-   * @param buf         Chain of iovecs.
-   * @param flags       set of flags for the underlying write calls, like cork
-   * @param bufCallback Callback when egress data begins to buffer
+   * @param callback Write completion/error callback.
+   * @param vec      Target iovec array; caller retains ownership.
+   * @param count    Number of IOBufs to write, beginning at start of buf.
+   * @param buf      Chain of iovecs.
+   * @param flags    set of flags for the underlying write calls, like cork
    */
   void writeChainImpl(WriteCallback* callback, iovec* vec,
                       size_t count, std::unique_ptr<folly::IOBuf>&& buf,
-                      WriteFlags flags, BufferCallback* bufCallback = nullptr);
+                      WriteFlags flags);
 
   /**
    * Write as much data as possible to the socket without blocking,
    * and queue up any leftover data to send when the socket can
    * handle writes again.
    *
-   * @param callback    The callback to invoke when the write is completed.
-   * @param vec         Array of buffers to write; this method will make a
-   *                    copy of the vector (but not the buffers themselves)
-   *                    if the write has to be completed asynchronously.
-   * @param count       Number of elements in vec.
-   * @param buf         The IOBuf that manages the buffers referenced by
-   *                    vec, or a pointer to nullptr if the buffers are not
-   *                    associated with an IOBuf.  Note that ownership of
-   *                    the IOBuf is transferred here; upon completion of
-   *                    the write, the AsyncSocket deletes the IOBuf.
-   * @param flags       Set of write flags.
-   * @param bufCallback Callback when egress data buffers up
+   * @param callback The callback to invoke when the write is completed.
+   * @param vec      Array of buffers to write; this method will make a
+   *                 copy of the vector (but not the buffers themselves)
+   *                 if the write has to be completed asynchronously.
+   * @param count    Number of elements in vec.
+   * @param buf      The IOBuf that manages the buffers referenced by
+   *                 vec, or a pointer to nullptr if the buffers are not
+   *                 associated with an IOBuf.  Note that ownership of
+   *                 the IOBuf is transferred here; upon completion of
+   *                 the write, the AsyncSocket deletes the IOBuf.
+   * @param flags    Set of write flags.
    */
   void writeImpl(WriteCallback* callback, const iovec* vec, size_t count,
                  std::unique_ptr<folly::IOBuf>&& buf,
-                 WriteFlags flags = WriteFlags::NONE,
-                 BufferCallback* bufCallback = nullptr);
+                 WriteFlags flags = WriteFlags::NONE);
 
   /**
    * Attempt to write to the socket.
index 8fdbefce013f869f5c81d7cbe6a2a40c4855daed..40dca58b8e18acb6f61303008268cf7d445490e6 100644 (file)
@@ -464,12 +464,6 @@ class AsyncReader {
 
 class AsyncWriter {
  public:
-  class BufferCallback {
-   public:
-    virtual ~BufferCallback() {}
-    virtual void onEgressBuffered() = 0;
-  };
-
   class WriteCallback {
    public:
     virtual ~WriteCallback() = default;
@@ -499,15 +493,12 @@ class AsyncWriter {
 
   // Write methods that aren't part of AsyncTransport
   virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
-                     WriteFlags flags = WriteFlags::NONE,
-                     BufferCallback* bufCallback = nullptr) = 0;
+                     WriteFlags flags = WriteFlags::NONE) = 0;
   virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
-                      WriteFlags flags = WriteFlags::NONE,
-                      BufferCallback* bufCallback = nullptr) = 0;
+                      WriteFlags flags = WriteFlags::NONE) = 0;
   virtual void writeChain(WriteCallback* callback,
                           std::unique_ptr<IOBuf>&& buf,
-                          WriteFlags flags = WriteFlags::NONE,
-                          BufferCallback* bufCallback = nullptr) = 0;
+                          WriteFlags flags = WriteFlags::NONE) = 0;
 
  protected:
   virtual ~AsyncWriter() = default;
@@ -525,19 +516,15 @@ class AsyncTransportWrapper : virtual public AsyncTransport,
   // to keep compatibility.
   using ReadCallback    = AsyncReader::ReadCallback;
   using WriteCallback   = AsyncWriter::WriteCallback;
-  using BufferCallback  = AsyncWriter::BufferCallback;
   virtual void setReadCB(ReadCallback* callback) override = 0;
   virtual ReadCallback* getReadCallback() const override = 0;
   virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
-                     WriteFlags flags = WriteFlags::NONE,
-                     BufferCallback* bufCallback = nullptr) override = 0;
+                     WriteFlags flags = WriteFlags::NONE) override = 0;
   virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
-                      WriteFlags flags = WriteFlags::NONE,
-                      BufferCallback* bufCallback = nullptr) override = 0;
+                      WriteFlags flags = WriteFlags::NONE) override = 0;
   virtual void writeChain(WriteCallback* callback,
                           std::unique_ptr<IOBuf>&& buf,
-                          WriteFlags flags = WriteFlags::NONE,
-                          BufferCallback* bufCallback = nullptr) override = 0;
+                          WriteFlags flags = WriteFlags::NONE) override = 0;
   /**
    * The transport wrapper may wrap another transport. This returns the
    * transport that is wrapped. It returns nullptr if there is no wrapped
index 5d52ad204f68faa2859d91ec4c4c8bb42b58126d..51230014203c038cda8fabb1872c61a1b6623ff5 100644 (file)
@@ -60,23 +60,6 @@ class ConnCallback : public AsyncSocket::ConnectCallback {
   VoidCallback errorCallback;
 };
 
-class BufferCallback : public AsyncTransportWrapper::BufferCallback {
- public:
-  BufferCallback()
-    : buffered_(false) {}
-
-  void onEgressBuffered() override {
-    buffered_ = true;
-  }
-
-  bool hasBuffered() const {
-    return buffered_;
-  }
-
- private:
-  bool buffered_{false};
-};
-
 class WriteCallback : public AsyncTransportWrapper::WriteCallback {
  public:
   WriteCallback()
index 81acc8265a1e74f07d96185ffe9a19b4130628e1..1a5ebeb84e1d79f9d5a18f62161e90b7a6cd1495 100644 (file)
@@ -2238,32 +2238,3 @@ TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
 
   eventBase.loop();
 }
-
-TEST(AsyncSocketTest, BufferTest) {
-  TestServer server;
-
-  EventBase evb;
-  AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
-  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
-  ConnCallback ccb;
-  socket->connect(&ccb, server.getAddress(), 30, option);
-
-
-  char buf[100 * 1024];
-  memset(buf, 'c', sizeof(buf));
-  WriteCallback wcb;
-  BufferCallback bcb;
-  socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE, &bcb);
-
-  evb.loop();
-  CHECK_EQ(ccb.state, STATE_SUCCEEDED);
-  CHECK_EQ(wcb.state, STATE_SUCCEEDED);
-
-  ASSERT_TRUE(bcb.hasBuffered());
-
-  socket->close();
-  server.verifyConnection(buf, sizeof(buf));
-
-  ASSERT_TRUE(socket->isClosedBySelf());
-  ASSERT_FALSE(socket->isClosedByPeer());
-}
index bd091056514341e8348212be7622cd81eb5f9c00..8cff00f7ab214ca6c29f70a64e25673f795a023b 100644 (file)
@@ -27,31 +27,15 @@ class MockAsyncTransport: public AsyncTransportWrapper {
   MOCK_METHOD1(setReadCB, void(ReadCallback*));
   MOCK_CONST_METHOD0(getReadCallback, ReadCallback*());
   MOCK_CONST_METHOD0(getReadCB, ReadCallback*());
-  MOCK_METHOD5(write, void(WriteCallback*,
-                           const void*, size_t,
-                           WriteFlags,
-                           BufferCallback*));
-  MOCK_METHOD5(writev, void(WriteCallback*,
-                            const iovec*, size_t,
-                            WriteFlags,
-                            BufferCallback*));
-  MOCK_METHOD4(writeChain,
-               void(WriteCallback*,
-                    std::shared_ptr<folly::IOBuf>,
-                    WriteFlags,
-                    BufferCallback*));
-
+  MOCK_METHOD4(write, void(WriteCallback*, const void*, size_t, WriteFlags));
+  MOCK_METHOD4(writev, void(WriteCallback*, const iovec*, size_t, WriteFlags));
+  MOCK_METHOD3(writeChain,
+               void(WriteCallback*, std::shared_ptr<folly::IOBuf>, WriteFlags));
 
   void writeChain(WriteCallback* callback,
                   std::unique_ptr<folly::IOBuf>&& iob,
-                  WriteFlags flags =
-                  WriteFlags::NONE,
-                  BufferCallback* bufCB = nullptr) override {
-    writeChain(
-        callback,
-        std::shared_ptr<folly::IOBuf>(iob.release()),
-        flags,
-        bufCB);
+                  WriteFlags flags = WriteFlags::NONE) override {
+    writeChain(callback, std::shared_ptr<folly::IOBuf>(iob.release()), flags);
   }
 
   MOCK_METHOD0(close, void());