2a9c426f7cbb5eaf964c49a654fa21663c9d5d7e
[folly.git] / folly / io / async / test / AsyncPipeTest.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncPipe.h>
18 #include <folly/io/async/EventBase.h>
19 #include <folly/Memory.h>
20 #include <gtest/gtest.h>
21
22 #include <fcntl.h>
23
24 using namespace testing;
25
26 namespace {
27
28 class TestReadCallback : public folly::AsyncReader::ReadCallback {
29  public:
30   bool isBufferMovable() noexcept override {
31     return movable_;
32   }
33   void setMovable(bool movable) {
34     movable_ = movable;
35   }
36
37   void readBufferAvailable(
38       std::unique_ptr<folly::IOBuf> readBuf) noexcept override {
39     readBuffer_.append(std::move(readBuf));
40   }
41
42   void readDataAvailable(size_t len) noexcept override {
43     readBuffer_.postallocate(len);
44   }
45
46   void getReadBuffer(void** bufReturn, size_t* lenReturn) noexcept override {
47     auto res = readBuffer_.preallocate(4000, 65000);
48     *bufReturn = res.first;
49     *lenReturn = res.second;
50   }
51
52   void readEOF() noexcept override {}
53
54   void readErr(const folly::AsyncSocketException&) noexcept override {
55     error_ = true;
56   }
57
58   std::string getData() {
59     auto buf = readBuffer_.move();
60     buf->coalesce();
61     return std::string((char *)buf->data(), buf->length());
62   }
63
64   void reset() {
65     movable_ = false;
66     error_ = false;
67     readBuffer_.clear();
68   }
69
70   folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()};
71   bool error_{false};
72   bool movable_{false};
73 };
74
75 class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
76  public:
77   void writeSuccess() noexcept override { writes_++; }
78
79   void writeErr(size_t, const folly::AsyncSocketException&) noexcept override {
80     error_ = true;
81   }
82
83   void reset() {
84     writes_ = 0;
85     error_ = false;
86   }
87
88   uint32_t writes_{0};
89   bool error_{false};
90 };
91
92 class AsyncPipeTest: public Test {
93  public:
94   void reset(bool movable) {
95     reader_.reset();
96     readCallback_.reset();
97     writer_.reset();
98     writeCallback_.reset();
99
100     int rc = pipe(pipeFds_);
101     EXPECT_EQ(rc, 0);
102
103     EXPECT_EQ(::fcntl(pipeFds_[0], F_SETFL, O_NONBLOCK), 0);
104     EXPECT_EQ(::fcntl(pipeFds_[1], F_SETFL, O_NONBLOCK), 0);
105     reader_ = folly::AsyncPipeReader::newReader(
106       &eventBase_, pipeFds_[0]);
107     writer_ = folly::AsyncPipeWriter::newWriter(
108       &eventBase_, pipeFds_[1]);
109
110     readCallback_.setMovable(movable);
111   }
112
113  protected:
114   folly::EventBase eventBase_;
115   int pipeFds_[2];
116   folly::AsyncPipeReader::UniquePtr reader_;
117   folly::AsyncPipeWriter::UniquePtr writer_;
118   TestReadCallback readCallback_;
119   TestWriteCallback writeCallback_;
120 };
121
122 std::unique_ptr<folly::IOBuf> getBuf(const std::string& data) {
123   auto buf = folly::IOBuf::copyBuffer(data.c_str(), data.length());
124   return buf;
125 }
126
127 } // anonymous namespace
128
129
130 TEST_F(AsyncPipeTest, simple) {
131   for (int pass = 0; pass < 2; ++pass) {
132     reset(pass % 2 != 0);
133     reader_->setReadCB(&readCallback_);
134     writer_->write(getBuf("hello"), &writeCallback_);
135     writer_->closeOnEmpty();
136     eventBase_.loop();
137     EXPECT_EQ(readCallback_.getData(), "hello");
138     EXPECT_FALSE(readCallback_.error_);
139     EXPECT_EQ(writeCallback_.writes_, 1);
140     EXPECT_FALSE(writeCallback_.error_);
141   }
142 }
143
144 TEST_F(AsyncPipeTest, blocked_writes) {
145   for (int pass = 0; pass < 2; ++pass) {
146     reset(pass % 2 != 0);
147     uint32_t writeAttempts = 0;
148     do {
149       ++writeAttempts;
150       writer_->write(getBuf("hello"), &writeCallback_);
151     } while (writeCallback_.writes_ == writeAttempts);
152     // there is one blocked write
153     writer_->closeOnEmpty();
154
155     reader_->setReadCB(&readCallback_);
156
157     eventBase_.loop();
158     std::string expected;
159     for (uint32_t i = 0; i < writeAttempts; i++) {
160       expected += "hello";
161     }
162     EXPECT_EQ(readCallback_.getData(), expected);
163     EXPECT_FALSE(readCallback_.error_);
164     EXPECT_EQ(writeCallback_.writes_, writeAttempts);
165     EXPECT_FALSE(writeCallback_.error_);
166   }
167 }
168
169 TEST_F(AsyncPipeTest, writeOnClose) {
170   for (int pass = 0; pass < 2; ++pass) {
171     reset(pass % 2 != 0);
172     reader_->setReadCB(&readCallback_);
173     writer_->write(getBuf("hello"), &writeCallback_);
174     writer_->closeOnEmpty();
175     writer_->write(getBuf("hello"), &writeCallback_);
176     eventBase_.loop();
177     EXPECT_EQ(readCallback_.getData(), "hello");
178     EXPECT_FALSE(readCallback_.error_);
179     EXPECT_EQ(writeCallback_.writes_, 1);
180     EXPECT_TRUE(writeCallback_.error_);
181   }
182 }