2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/async/AsyncPipe.h>
18 #include <folly/io/async/EventBase.h>
19 #include <folly/Memory.h>
20 #include <folly/portability/GTest.h>
24 using namespace testing;
// Test implementation of folly::AsyncReader::ReadCallback that gathers
// everything delivered to it in readBuffer_ so a test can inspect it through
// getData().
// NOTE(review): this listing appears to have lines elided (several method
// bodies and closing braces are not visible); the comments below describe
// only the statements that are actually shown.
28 class TestReadCallback : public folly::AsyncReader::ReadCallback {
// Body not visible here; presumably reports the flag stored by setMovable()
// -- confirm against the full file.
30 bool isBufferMovable() noexcept override {
// Body not visible here. AsyncPipeTest::reset() calls this with its
// `movable` argument.
33 void setMovable(bool movable) {
// Takes ownership of the delivered IOBuf and appends it to readBuffer_.
37 void readBufferAvailable(
38 std::unique_ptr<folly::IOBuf> readBuf) noexcept override {
39 readBuffer_.append(std::move(readBuf));
// Reports `len` bytes to readBuffer_ via postallocate(); this is the
// counterpart of the preallocate() call made in getReadBuffer().
42 void readDataAvailable(size_t len) noexcept override {
43 readBuffer_.postallocate(len);
// Obtains writable space from readBuffer_.preallocate(4000, 65000) and hands
// the resulting pointer/length pair back through the two out-parameters.
46 void getReadBuffer(void** bufReturn, size_t* lenReturn) noexcept override {
47 auto res = readBuffer_.preallocate(4000, 65000);
48 *bufReturn = res.first;
49 *lenReturn = res.second;
// End-of-file is intentionally ignored.
52 void readEOF() noexcept override {}
// Body not visible here; the tests read readCallback_.error_ afterwards, so
// this presumably records the failure there -- confirm.
54 void readErr(const folly::AsyncSocketException&) noexcept override {
// Moves the buffered data out of readBuffer_ and returns it as a std::string
// built from buf->data() and buf->length().
// NOTE(review): data()/length() presumably describe only the first IOBuf of
// a chain; verify that the chain is coalesced before the return (a line
// between these two statements seems to be missing from this listing).
58 std::string getData() {
59 auto buf = readBuffer_.move();
61 return std::string((char *)buf->data(), buf->length());
// Accumulated read data; constructed with the cacheChainLength() options.
70 folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()};
// Test implementation of folly::AsyncWriter::WriteCallback that records the
// outcome of writes for later assertions.
// NOTE(review): the member declarations and the writeErr() body are not
// visible in this listing.
75 class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
// Counts each successfully completed write in writes_.
77 void writeSuccess() noexcept override { writes_++; }
// Body not visible here; the tests check writeCallback_.error_, so this
// presumably sets that flag -- confirm.
79 void writeErr(size_t, const folly::AsyncSocketException&) noexcept override {
// gtest fixture that owns an EventBase, a pipe, an AsyncPipeReader and
// AsyncPipeWriter attached to the pipe's two descriptors, and the read/write
// test callbacks.
// NOTE(review): some lines of this class (e.g. the pipeFds_ declaration and
// parts of reset()) are not visible in this listing.
92 class AsyncPipeTest: public Test {
// Rebuilds the fixture state for one test pass. `movable` is forwarded to
// readCallback_.setMovable().
94 void reset(bool movable) {
96 readCallback_.reset();
98 writeCallback_.reset();
// Create a fresh pipe; the check of `rc` is not visible here.
100 int rc = pipe(pipeFds_);
// Put both descriptors into non-blocking mode; each fcntl() must return 0.
103 EXPECT_EQ(::fcntl(pipeFds_[0], F_SETFL, O_NONBLOCK), 0);
104 EXPECT_EQ(::fcntl(pipeFds_[1], F_SETFL, O_NONBLOCK), 0);
// pipeFds_[0] (the read end of the pipe) backs the reader and pipeFds_[1]
// (the write end) backs the writer; both are bound to eventBase_.
105 reader_ = folly::AsyncPipeReader::newReader(
106 &eventBase_, pipeFds_[0]);
107 writer_ = folly::AsyncPipeWriter::newWriter(
108 &eventBase_, pipeFds_[1]);
110 readCallback_.setMovable(movable);
// Event base passed to both the reader and the writer.
114 folly::EventBase eventBase_;
116 folly::AsyncPipeReader::UniquePtr reader_;
117 folly::AsyncPipeWriter::UniquePtr writer_;
118 TestReadCallback readCallback_;
119 TestWriteCallback writeCallback_;
// Builds an IOBuf holding a copy of the bytes of `data` (data.length() bytes
// starting at data.c_str()).
// NOTE(review): the return statement is not visible in this listing;
// presumably `buf` is returned to the caller.
122 std::unique_ptr<folly::IOBuf> getBuf(const std::string& data) {
123 auto buf = folly::IOBuf::copyBuffer(data.c_str(), data.length());
127 } // anonymous namespace
// Writes a single "hello" through the pipe and checks that the reader
// receives exactly that data with no errors on either side.
130 TEST_F(AsyncPipeTest, simple) {
// Two passes: pass 0 calls reset(false), pass 1 calls reset(true).
131 for (int pass = 0; pass < 2; ++pass) {
132 reset(pass % 2 != 0);
133 reader_->setReadCB(&readCallback_);
134 writer_->write(getBuf("hello"), &writeCallback_);
135 writer_->closeOnEmpty();
// NOTE(review): the statement that drives eventBase_ is not visible in this
// listing; presumably the loop is run before the checks below.
137 EXPECT_EQ(readCallback_.getData(), "hello");
138 EXPECT_FALSE(readCallback_.error_);
// Exactly one write was issued, so exactly one success is expected.
139 EXPECT_EQ(writeCallback_.writes_, 1);
140 EXPECT_FALSE(writeCallback_.error_);
// Keeps writing "hello" with no reader callback attached until a write does
// not complete immediately, then attaches the reader and checks that all of
// the written data arrives and every write is reported successful.
// NOTE(review): the `do {` opener, the writeAttempts increment, the
// event-loop run and the body of the loop building `expected` are not
// visible in this listing.
144 TEST_F(AsyncPipeTest, blocked_writes) {
// Two passes: pass 0 calls reset(false), pass 1 calls reset(true).
145 for (int pass = 0; pass < 2; ++pass) {
146 reset(pass % 2 != 0);
147 uint32_t writeAttempts = 0;
// Repeat while the success count still equals writeAttempts; the loop exits
// on the first write whose success has not been reported.
150 writer_->write(getBuf("hello"), &writeCallback_);
151 } while (writeCallback_.writes_ == writeAttempts);
152 // there is one blocked write
153 writer_->closeOnEmpty();
// The reader callback is only installed after the writes were queued.
155 reader_->setReadCB(&readCallback_);
// `expected` is assembled over writeAttempts iterations (body not shown;
// presumably one "hello" per attempt -- confirm).
158 std::string expected;
159 for (uint32_t i = 0; i < writeAttempts; i++) {
162 EXPECT_EQ(readCallback_.getData(), expected);
163 EXPECT_FALSE(readCallback_.error_);
// By the end, every attempted write must have been reported successful.
164 EXPECT_EQ(writeCallback_.writes_, writeAttempts);
165 EXPECT_FALSE(writeCallback_.error_);
// Issues a second write after closeOnEmpty() and checks that only the first
// write is delivered while the write callback reports an error.
169 TEST_F(AsyncPipeTest, writeOnClose) {
// Two passes: pass 0 calls reset(false), pass 1 calls reset(true).
170 for (int pass = 0; pass < 2; ++pass) {
171 reset(pass % 2 != 0);
172 reader_->setReadCB(&readCallback_);
173 writer_->write(getBuf("hello"), &writeCallback_);
174 writer_->closeOnEmpty();
// This write comes after closeOnEmpty(); the assertions below expect it to
// fail rather than be delivered.
175 writer_->write(getBuf("hello"), &writeCallback_);
// NOTE(review): the statement that drives eventBase_ is not visible in this
// listing.
// Only the first "hello" reaches the reader, and it arrives without a read
// error.
177 EXPECT_EQ(readCallback_.getData(), "hello");
178 EXPECT_FALSE(readCallback_.error_);
// One success (the first write) and an error flag (the second write).
179 EXPECT_EQ(writeCallback_.writes_, 1);
180 EXPECT_TRUE(writeCallback_.error_);