class TestReadCallback : public folly::AsyncReader::ReadCallback {
public:
+ bool isBufferMovable() noexcept override {
+ return movable_;
+ }
+ void setMovable(bool movable) {
+ movable_ = movable;
+ }
+
+ void readBufferAvailable(
+ std::unique_ptr<folly::IOBuf> readBuf) noexcept override {
+ readBuffer_.append(std::move(readBuf));
+ }
+
void readDataAvailable(size_t len) noexcept override {
readBuffer_.postallocate(len);
}
return std::string((char *)buf->data(), buf->length());
}
+ void reset() {
+ movable_ = false;
+ error_ = false;
+ readBuffer_.clear();
+ }
+
folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()};
bool error_{false};
+ bool movable_{false};
};
class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
error_ = true;
}
+ void reset() {
+ writes_ = 0;
+ error_ = false;
+ }
+
uint32_t writes_{0};
bool error_{false};
};
class AsyncPipeTest: public Test {
public:
- void SetUp() override {
+ void reset(bool movable) {
+ reader_.reset();
+ readCallback_.reset();
+ writer_.reset();
+ writeCallback_.reset();
+
int rc = pipe(pipeFds_);
EXPECT_EQ(rc, 0);
&eventBase_, pipeFds_[0]);
writer_ = folly::AsyncPipeWriter::newWriter(
&eventBase_, pipeFds_[1]);
+
+ readCallback_.setMovable(movable);
}
protected:
TEST_F(AsyncPipeTest, simple) {
- reader_->setReadCB(&readCallback_);
- writer_->write(getBuf("hello"), &writeCallback_);
- writer_->closeOnEmpty();
- eventBase_.loop();
- EXPECT_EQ(readCallback_.getData(), "hello");
- EXPECT_FALSE(readCallback_.error_);
- EXPECT_EQ(writeCallback_.writes_, 1);
- EXPECT_FALSE(writeCallback_.error_);
+ for (int pass = 0; pass < 2; ++pass) {
+ reset(pass % 2 != 0);
+ reader_->setReadCB(&readCallback_);
+ writer_->write(getBuf("hello"), &writeCallback_);
+ writer_->closeOnEmpty();
+ eventBase_.loop();
+ EXPECT_EQ(readCallback_.getData(), "hello");
+ EXPECT_FALSE(readCallback_.error_);
+ EXPECT_EQ(writeCallback_.writes_, 1);
+ EXPECT_FALSE(writeCallback_.error_);
+ }
}
TEST_F(AsyncPipeTest, blocked_writes) {
- uint32_t writeAttempts = 0;
- do {
- ++writeAttempts;
- writer_->write(getBuf("hello"), &writeCallback_);
- } while (writeCallback_.writes_ == writeAttempts);
- // there is one blocked write
- writer_->closeOnEmpty();
-
- reader_->setReadCB(&readCallback_);
-
- eventBase_.loop();
- std::string expected;
- for (uint32_t i = 0; i < writeAttempts; i++) {
- expected += "hello";
+ for (int pass = 0; pass < 2; ++pass) {
+ reset(pass % 2 != 0);
+ uint32_t writeAttempts = 0;
+ do {
+ ++writeAttempts;
+ writer_->write(getBuf("hello"), &writeCallback_);
+ } while (writeCallback_.writes_ == writeAttempts);
+ // there is one blocked write
+ writer_->closeOnEmpty();
+
+ reader_->setReadCB(&readCallback_);
+
+ eventBase_.loop();
+ std::string expected;
+ for (uint32_t i = 0; i < writeAttempts; i++) {
+ expected += "hello";
+ }
+ EXPECT_EQ(readCallback_.getData(), expected);
+ EXPECT_FALSE(readCallback_.error_);
+ EXPECT_EQ(writeCallback_.writes_, writeAttempts);
+ EXPECT_FALSE(writeCallback_.error_);
}
- EXPECT_EQ(readCallback_.getData(), expected);
- EXPECT_FALSE(readCallback_.error_);
- EXPECT_EQ(writeCallback_.writes_, writeAttempts);
- EXPECT_FALSE(writeCallback_.error_);
}
TEST_F(AsyncPipeTest, writeOnClose) {
- reader_->setReadCB(&readCallback_);
- writer_->write(getBuf("hello"), &writeCallback_);
- writer_->closeOnEmpty();
- writer_->write(getBuf("hello"), &writeCallback_);
- eventBase_.loop();
- EXPECT_EQ(readCallback_.getData(), "hello");
- EXPECT_FALSE(readCallback_.error_);
- EXPECT_EQ(writeCallback_.writes_, 1);
- EXPECT_TRUE(writeCallback_.error_);
+ for (int pass = 0; pass < 2; ++pass) {
+ reset(pass % 2 != 0);
+ reader_->setReadCB(&readCallback_);
+ writer_->write(getBuf("hello"), &writeCallback_);
+ writer_->closeOnEmpty();
+ writer_->write(getBuf("hello"), &writeCallback_);
+ eventBase_.loop();
+ EXPECT_EQ(readCallback_.getData(), "hello");
+ EXPECT_FALSE(readCallback_.error_);
+ EXPECT_EQ(writeCallback_.writes_, 1);
+ EXPECT_TRUE(writeCallback_.error_);
+ }
}