Open source AsyncPipe
[folly.git] / folly / io / async / test / AsyncPipeTest.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncPipe.h>
18 #include <folly/io/async/EventBase.h>
19 #include <folly/Memory.h>
20 #include <gtest/gtest.h>
21
22 #include <fcntl.h>
23
24 using namespace testing;
25
26 namespace {
27
28 class TestReadCallback : public folly::AsyncReader::ReadCallback {
29  public:
30   void readDataAvailable(size_t len) noexcept override {
31     readBuffer_.postallocate(len);
32   }
33
34   void getReadBuffer(void** bufReturn, size_t* lenReturn) noexcept override {
35     auto res = readBuffer_.preallocate(4000, 65000);
36     *bufReturn = res.first;
37     *lenReturn = res.second;
38   }
39
40   void readEOF() noexcept override {}
41
42   void readErr(const folly::AsyncSocketException&) noexcept override {
43     error_ = true;
44   }
45
46   std::string getData() {
47     auto buf = readBuffer_.move();
48     buf->coalesce();
49     return std::string((char *)buf->data(), buf->length());
50   }
51
52   folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()};
53   bool error_{false};
54 };
55
56 class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
57  public:
58   void writeSuccess() noexcept override { writes_++; }
59
60   void writeErr(size_t, const folly::AsyncSocketException&) noexcept override {
61     error_ = true;
62   }
63
64   uint32_t writes_{0};
65   bool error_{false};
66 };
67
68 class AsyncPipeTest: public Test {
69  public:
70   void SetUp() override {
71     int rc = pipe(pipeFds_);
72     EXPECT_EQ(rc, 0);
73
74     EXPECT_EQ(::fcntl(pipeFds_[0], F_SETFL, O_NONBLOCK), 0);
75     EXPECT_EQ(::fcntl(pipeFds_[1], F_SETFL, O_NONBLOCK), 0);
76     reader_ = folly::make_unique<folly::AsyncPipeReader,
77                                  folly::DelayedDestruction::Destructor>(
78       &eventBase_, pipeFds_[0]);
79     writer_ = folly::make_unique<folly::AsyncPipeWriter,
80                                  folly::DelayedDestruction::Destructor>(
81       &eventBase_, pipeFds_[1]);
82   }
83
84  protected:
85   folly::EventBase eventBase_;
86   int pipeFds_[2];
87   folly::AsyncPipeReader::UniquePtr reader_;
88   folly::AsyncPipeWriter::UniquePtr writer_;
89   TestReadCallback readCallback_;
90   TestWriteCallback writeCallback_;
91 };
92
93 std::unique_ptr<folly::IOBuf> getBuf(const std::string& data) {
94   auto buf = folly::IOBuf::copyBuffer(data.c_str(), data.length());
95   return buf;
96 }
97
98 } // anonymous namespace
99
100
101 TEST_F(AsyncPipeTest, simple) {
102   reader_->setReadCB(&readCallback_);
103   writer_->write(getBuf("hello"), &writeCallback_);
104   writer_->closeOnEmpty();
105   eventBase_.loop();
106   EXPECT_EQ(readCallback_.getData(), "hello");
107   EXPECT_FALSE(readCallback_.error_);
108   EXPECT_EQ(writeCallback_.writes_, 1);
109   EXPECT_FALSE(writeCallback_.error_);
110 }
111
112 TEST_F(AsyncPipeTest, blocked_writes) {
113   uint32_t writeAttempts = 0;
114   do {
115     ++writeAttempts;
116     writer_->write(getBuf("hello"), &writeCallback_);
117   } while (writeCallback_.writes_ == writeAttempts);
118   // there is one blocked write
119   writer_->closeOnEmpty();
120
121   reader_->setReadCB(&readCallback_);
122
123   eventBase_.loop();
124   std::string expected;
125   for (uint32_t i = 0; i < writeAttempts; i++) {
126     expected += "hello";
127   }
128   EXPECT_EQ(readCallback_.getData(), expected);
129   EXPECT_FALSE(readCallback_.error_);
130   EXPECT_EQ(writeCallback_.writes_, writeAttempts);
131   EXPECT_FALSE(writeCallback_.error_);
132 }
133
134 TEST_F(AsyncPipeTest, writeOnClose) {
135   reader_->setReadCB(&readCallback_);
136   writer_->write(getBuf("hello"), &writeCallback_);
137   writer_->closeOnEmpty();
138   writer_->write(getBuf("hello"), &writeCallback_);
139   eventBase_.loop();
140   EXPECT_EQ(readCallback_.getData(), "hello");
141   EXPECT_FALSE(readCallback_.error_);
142   EXPECT_EQ(writeCallback_.writes_, 1);
143   EXPECT_TRUE(writeCallback_.error_);
144 }