From: Yuri Putivsky Date: Wed, 3 Aug 2016 19:02:26 +0000 (-0700) Subject: folly AsyncPipeReader supports IOBuf X-Git-Tag: v2016.08.08.00~32 X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=commitdiff_plain;h=2bee060d49f2a26a4d7f345ab6c682f43a2b4a0a folly AsyncPipeReader supports IOBuf Summary: folly AsyncPipeReader takes a callback of type AsyncReader::ReadCallback. Now AsyncReader::ReadCallback class supports IOBuf as a buffer for transfer read bytes. Need to extend AsyncPipeReader class to support IOBuf as well Reviewed By: yfeldblum Differential Revision: D3650893 fbshipit-source-id: e2142341c8b8b0b2ef248c1f13a8caba9d50ba67 --- diff --git a/folly/io/async/AsyncPipe.cpp b/folly/io/async/AsyncPipe.cpp index e38cc409..06f8c911 100644 --- a/folly/io/async/AsyncPipe.cpp +++ b/folly/io/async/AsyncPipe.cpp @@ -62,36 +62,57 @@ void AsyncPipeReader::handlerReady(uint16_t events) noexcept { assert(readCallback_ != nullptr); while (readCallback_) { + // - What API does callback support? + const auto movable = readCallback_->isBufferMovable(); // noexcept + // Get the buffer to read into. void* buf = nullptr; size_t buflen = 0; - try { - readCallback_->getReadBuffer(&buf, &buflen); - } catch (const std::exception& ex) { - AsyncSocketException aex(AsyncSocketException::BAD_ARGS, - string("ReadCallback::getReadBuffer() " - "threw exception: ") + ex.what()); - failRead(aex); - return; - } catch (...) { - AsyncSocketException ex(AsyncSocketException::BAD_ARGS, - string("ReadCallback::getReadBuffer() " - "threw non-exception type")); - failRead(ex); - return; - } - if (buf == nullptr || buflen == 0) { - AsyncSocketException ex(AsyncSocketException::INVALID_STATE, - string("ReadCallback::getReadBuffer() " - "returned empty buffer")); - failRead(ex); - return; + std::unique_ptr ioBuf; + + if (movable) { + ioBuf = IOBuf::create(readCallback_->maxBufferSize()); + buf = ioBuf->writableBuffer(); + buflen = ioBuf->capacity(); + } else { + try { + readCallback_->getReadBuffer(&buf, &buflen); + } catch (const std::exception& ex) { + AsyncSocketException aex( + AsyncSocketException::BAD_ARGS, + string("ReadCallback::getReadBuffer() " + "threw exception: ") + + ex.what()); + failRead(aex); + return; + } catch (...) { + AsyncSocketException aex( + AsyncSocketException::BAD_ARGS, + string("ReadCallback::getReadBuffer() " + "threw non-exception type")); + failRead(aex); + return; + } + if (buf == nullptr || buflen == 0) { + AsyncSocketException aex( + AsyncSocketException::INVALID_STATE, + string("ReadCallback::getReadBuffer() " + "returned empty buffer")); + failRead(aex); + return; + } } // Perform the read ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen); + if (bytesRead > 0) { - readCallback_->readDataAvailable(bytesRead); + if (movable) { + ioBuf->append(bytesRead); + readCallback_->readBufferAvailable(std::move(ioBuf)); + } else { + readCallback_->readDataAvailable(bytesRead); + } // Fall through and continue around the loop if the read // completely filled the available buffer. // Note that readCallback_ may have been uninstalled or changed inside diff --git a/folly/io/async/AsyncTransport.h b/folly/io/async/AsyncTransport.h index 632a782b..a1023b2c 100644 --- a/folly/io/async/AsyncTransport.h +++ b/folly/io/async/AsyncTransport.h @@ -471,6 +471,15 @@ class AsyncReader { return false; } + /** + * Suggested buffer size, allocated for read operations, + * if callback is movable and supports folly::IOBuf + */ + + virtual size_t maxBufferSize() const { + return 64 * 1024; // 64K + } + /** * readBufferAvailable() will be invoked when data has been successfully * read. diff --git a/folly/io/async/test/AsyncPipeTest.cpp b/folly/io/async/test/AsyncPipeTest.cpp index af74811d..2a9c426f 100644 --- a/folly/io/async/test/AsyncPipeTest.cpp +++ b/folly/io/async/test/AsyncPipeTest.cpp @@ -27,6 +27,18 @@ namespace { class TestReadCallback : public folly::AsyncReader::ReadCallback { public: + bool isBufferMovable() noexcept override { + return movable_; + } + void setMovable(bool movable) { + movable_ = movable; + } + + void readBufferAvailable( + std::unique_ptr readBuf) noexcept override { + readBuffer_.append(std::move(readBuf)); + } + void readDataAvailable(size_t len) noexcept override { readBuffer_.postallocate(len); } @@ -49,8 +61,15 @@ class TestReadCallback : public folly::AsyncReader::ReadCallback { return std::string((char *)buf->data(), buf->length()); } + void reset() { + movable_ = false; + error_ = false; + readBuffer_.clear(); + } + folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()}; bool error_{false}; + bool movable_{false}; }; class TestWriteCallback : public folly::AsyncWriter::WriteCallback { @@ -61,13 +80,23 @@ class TestWriteCallback : public folly::AsyncWriter::WriteCallback { error_ = true; } + void reset() { + writes_ = 0; + error_ = false; + } + uint32_t writes_{0}; bool error_{false}; }; class AsyncPipeTest: public Test { public: - void SetUp() override { + void reset(bool movable) { + reader_.reset(); + readCallback_.reset(); + writer_.reset(); + writeCallback_.reset(); + int rc = pipe(pipeFds_); EXPECT_EQ(rc, 0); @@ -77,6 +106,8 @@ class AsyncPipeTest: public Test { &eventBase_, pipeFds_[0]); writer_ = folly::AsyncPipeWriter::newWriter( &eventBase_, pipeFds_[1]); + + readCallback_.setMovable(movable); } protected: @@ -97,46 +128,55 @@ std::unique_ptr getBuf(const std::string& data) { TEST_F(AsyncPipeTest, simple) { - reader_->setReadCB(&readCallback_); - writer_->write(getBuf("hello"), &writeCallback_); - writer_->closeOnEmpty(); - eventBase_.loop(); - EXPECT_EQ(readCallback_.getData(), "hello"); - EXPECT_FALSE(readCallback_.error_); - EXPECT_EQ(writeCallback_.writes_, 1); - EXPECT_FALSE(writeCallback_.error_); + for (int pass = 0; pass < 2; ++pass) { + reset(pass % 2 != 0); + reader_->setReadCB(&readCallback_); + writer_->write(getBuf("hello"), &writeCallback_); + writer_->closeOnEmpty(); + eventBase_.loop(); + EXPECT_EQ(readCallback_.getData(), "hello"); + EXPECT_FALSE(readCallback_.error_); + EXPECT_EQ(writeCallback_.writes_, 1); + EXPECT_FALSE(writeCallback_.error_); + } } TEST_F(AsyncPipeTest, blocked_writes) { - uint32_t writeAttempts = 0; - do { - ++writeAttempts; - writer_->write(getBuf("hello"), &writeCallback_); - } while (writeCallback_.writes_ == writeAttempts); - // there is one blocked write - writer_->closeOnEmpty(); - - reader_->setReadCB(&readCallback_); - - eventBase_.loop(); - std::string expected; - for (uint32_t i = 0; i < writeAttempts; i++) { - expected += "hello"; + for (int pass = 0; pass < 2; ++pass) { + reset(pass % 2 != 0); + uint32_t writeAttempts = 0; + do { + ++writeAttempts; + writer_->write(getBuf("hello"), &writeCallback_); + } while (writeCallback_.writes_ == writeAttempts); + // there is one blocked write + writer_->closeOnEmpty(); + + reader_->setReadCB(&readCallback_); + + eventBase_.loop(); + std::string expected; + for (uint32_t i = 0; i < writeAttempts; i++) { + expected += "hello"; + } + EXPECT_EQ(readCallback_.getData(), expected); + EXPECT_FALSE(readCallback_.error_); + EXPECT_EQ(writeCallback_.writes_, writeAttempts); + EXPECT_FALSE(writeCallback_.error_); } - EXPECT_EQ(readCallback_.getData(), expected); - EXPECT_FALSE(readCallback_.error_); - EXPECT_EQ(writeCallback_.writes_, writeAttempts); - EXPECT_FALSE(writeCallback_.error_); } TEST_F(AsyncPipeTest, writeOnClose) { - reader_->setReadCB(&readCallback_); - writer_->write(getBuf("hello"), &writeCallback_); - writer_->closeOnEmpty(); - writer_->write(getBuf("hello"), &writeCallback_); - eventBase_.loop(); - EXPECT_EQ(readCallback_.getData(), "hello"); - EXPECT_FALSE(readCallback_.error_); - EXPECT_EQ(writeCallback_.writes_, 1); - EXPECT_TRUE(writeCallback_.error_); + for (int pass = 0; pass < 2; ++pass) { + reset(pass % 2 != 0); + reader_->setReadCB(&readCallback_); + writer_->write(getBuf("hello"), &writeCallback_); + writer_->closeOnEmpty(); + writer_->write(getBuf("hello"), &writeCallback_); + eventBase_.loop(); + EXPECT_EQ(readCallback_.getData(), "hello"); + EXPECT_FALSE(readCallback_.error_); + EXPECT_EQ(writeCallback_.writes_, 1); + EXPECT_TRUE(writeCallback_.error_); + } }