2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/async/AsyncPipe.h>
18 #include <folly/io/async/EventBase.h>
19 #include <folly/Memory.h>
20 #include <gtest/gtest.h>
24 using namespace testing;
// Read-side callback used by the tests: everything handed to it is collected
// in readBuffer_ so that a test can pull it out afterwards with getData().
28 class TestReadCallback : public folly::AsyncReader::ReadCallback {
// Invoked once data has been placed in the buffer returned by
// getReadBuffer(); records that `len` bytes of that space are now in use.
30 void readDataAvailable(size_t len) noexcept override {
31 readBuffer_.postallocate(len);
// Supplies the destination for the next read: asks readBuffer_ for writable
// space (arguments 4000 and 65000) and reports the pointer and size through
// the two out-parameters.
34 void getReadBuffer(void** bufReturn, size_t* lenReturn) noexcept override {
35 auto res = readBuffer_.preallocate(4000, 65000);
36 *bufReturn = res.first;
37 *lenReturn = res.second;
// End-of-stream needs no handling in these tests.
40 void readEOF() noexcept override {}
// NOTE(review): the body of readErr() is not visible here. The tests read
// readCallback_.error_, so it presumably sets that flag -- confirm.
42 void readErr(const folly::AsyncSocketException&) noexcept override {
// Takes the accumulated contents out of readBuffer_ (leaving the queue without
// them) and returns them as a std::string.
46 std::string getData() {
47 auto buf = readBuffer_.move();
// NOTE(review): data()/length() are read from `buf` itself; verify that a
// multi-buffer chain is coalesced before this point so nothing is truncated.
49 return std::string((char *)buf->data(), buf->length());
// Storage for received data, constructed with the cacheChainLength() options.
52 folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()};
// Write-side callback used by the tests: counts completed writes in writes_.
// NOTE(review): the writes_ and error_ members read by the tests are not
// visible here; confirm their declarations and initial values.
56 class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
// One more write finished successfully.
58 void writeSuccess() noexcept override { writes_++; }
// NOTE(review): the body of writeErr() is not visible here. The tests check
// writeCallback_.error_, so it presumably sets that flag -- confirm.
60 void writeErr(size_t, const folly::AsyncSocketException&) noexcept override {
// Test fixture: owns an EventBase, a pipe, and an AsyncPipeReader /
// AsyncPipeWriter pair built on the two ends of that pipe, plus one read
// callback and one write callback shared by the test bodies.
68 class AsyncPipeTest: public Test {
// Creates the pipe, switches both descriptors to non-blocking mode and wraps
// them in the reader and the writer.
70 void SetUp() override {
// NOTE(review): the check on `rc` is not visible here; confirm that a failed
// pipe() call is detected before the descriptors are used.
71 int rc = pipe(pipeFds_);
// F_SETFL is called with O_NONBLOCK as the complete flag value (it is not
// OR-ed into the existing flags); both calls are expected to return 0.
74 EXPECT_EQ(::fcntl(pipeFds_[0], F_SETFL, O_NONBLOCK), 0);
75 EXPECT_EQ(::fcntl(pipeFds_[1], F_SETFL, O_NONBLOCK), 0);
// The reader is attached to eventBase_ and pipeFds_[0]; it is created with the
// DelayedDestruction::Destructor deleter.
76 reader_ = folly::make_unique<folly::AsyncPipeReader,
77 folly::DelayedDestruction::Destructor>(
78 &eventBase_, pipeFds_[0]);
// The writer is attached to the same eventBase_ and pipeFds_[1], with the same
// deleter type.
79 writer_ = folly::make_unique<folly::AsyncPipeWriter,
80 folly::DelayedDestruction::Destructor>(
81 &eventBase_, pipeFds_[1]);
// Event base that both pipe endpoints are registered with.
85 folly::EventBase eventBase_;
// Endpoints created in SetUp().
87 folly::AsyncPipeReader::UniquePtr reader_;
88 folly::AsyncPipeWriter::UniquePtr writer_;
// Callbacks handed to the reader / writer by the individual tests.
89 TestReadCallback readCallback_;
90 TestWriteCallback writeCallback_;
// Helper for the tests: builds an IOBuf holding a copy of the bytes of `data`
// (data.length() bytes starting at data.c_str()).
// NOTE(review): the return statement is not visible here; per the signature
// the new buffer is presumably returned to the caller -- confirm.
93 std::unique_ptr<folly::IOBuf> getBuf(const std::string& data) {
94 auto buf = folly::IOBuf::copyBuffer(data.c_str(), data.length());
98 } // anonymous namespace
// Basic round trip: a single "hello" written to the writer must arrive at the
// reader, with one successful write and no error on either side.
101 TEST_F(AsyncPipeTest, simple) {
// Install the read callback first, then queue the write and ask the writer to
// close once its queue is empty.
102 reader_->setReadCB(&readCallback_);
103 writer_->write(getBuf("hello"), &writeCallback_);
104 writer_->closeOnEmpty();
// NOTE(review): the step that drives eventBase_ is not visible here; the
// expectations below presumably run after the event loop has finished.
106 EXPECT_EQ(readCallback_.getData(), "hello");
107 EXPECT_FALSE(readCallback_.error_);
108 EXPECT_EQ(writeCallback_.writes_, 1);
109 EXPECT_FALSE(writeCallback_.error_);
// Keeps writing "hello" with no read callback installed until a write is left
// pending, then installs the read callback and checks that all the data is
// delivered and every write is reported as successful.
112 TEST_F(AsyncPipeTest, blocked_writes) {
113 uint32_t writeAttempts = 0;
// NOTE(review): the loop opener and the statement that updates writeAttempts
// are not visible here; the condition below keeps looping while the number of
// completed writes equals writeAttempts -- confirm the increment order.
116 writer_->write(getBuf("hello"), &writeCallback_);
117 } while (writeCallback_.writes_ == writeAttempts);
118 // there is one blocked write
119 writer_->closeOnEmpty();
// Only now is the read callback installed on the reader.
121 reader_->setReadCB(&readCallback_);
// Build the data the reader is expected to have received.
// NOTE(review): the loop body is not visible here; presumably it appends
// "hello" once per write attempt -- confirm.
124 std::string expected;
125 for (uint32_t i = 0; i < writeAttempts; i++) {
128 EXPECT_EQ(readCallback_.getData(), expected);
129 EXPECT_FALSE(readCallback_.error_);
// Every attempted write, including the one that was pending, must be counted
// as successful.
130 EXPECT_EQ(writeCallback_.writes_, writeAttempts);
131 EXPECT_FALSE(writeCallback_.error_);
// A write issued after closeOnEmpty() must be rejected: only the first "hello"
// is delivered and the write callback reports an error for the second.
134 TEST_F(AsyncPipeTest, writeOnClose) {
135 reader_->setReadCB(&readCallback_);
136 writer_->write(getBuf("hello"), &writeCallback_);
137 writer_->closeOnEmpty();
// This second write happens after closeOnEmpty() was requested.
138 writer_->write(getBuf("hello"), &writeCallback_);
// NOTE(review): the step that drives eventBase_ is not visible here; the
// expectations below presumably run after the event loop has finished.
// Exactly one "hello" is read, one write succeeded, and the writer side (not
// the reader side) saw an error.
140 EXPECT_EQ(readCallback_.getData(), "hello");
141 EXPECT_FALSE(readCallback_.error_);
142 EXPECT_EQ(writeCallback_.writes_, 1);
143 EXPECT_TRUE(writeCallback_.error_);