Open source AsyncPipe
[folly.git] / folly / io / async / AsyncPipe.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/io/async/AsyncPipe.h>
17
18 #include <folly/FileUtil.h>
19 #include <folly/io/async/AsyncSocketException.h>
20
21 using std::string;
22 using std::unique_ptr;
23 using folly::IOBuf;
24 using folly::IOBufQueue;
25
26 namespace folly {
27
28 AsyncPipeReader::~AsyncPipeReader() {
29   close();
30 }
31
32 void AsyncPipeReader::failRead(const AsyncSocketException& ex) {
33   VLOG(5) << "AsyncPipeReader(this=" << this << ", fd=" << fd_ <<
34     "): failed while reading: " << ex.what();
35
36   DCHECK(readCallback_ != nullptr);
37   AsyncReader::ReadCallback* callback = readCallback_;
38   readCallback_ = nullptr;
39   callback->readErr(ex);
40   close();
41 }
42
43 void AsyncPipeReader::close() {
44   unregisterHandler();
45   if (fd_ >= 0) {
46     changeHandlerFD(-1);
47
48     if (closeCb_) {
49       closeCb_(fd_);
50     } else {
51       ::close(fd_);
52     }
53     fd_ = -1;
54   }
55 }
56
57 void AsyncPipeReader::handlerReady(uint16_t events) noexcept {
58   DestructorGuard dg(this);
59   CHECK(events & EventHandler::READ);
60
61   VLOG(5) << "AsyncPipeReader::handlerReady() this=" << this << ", fd=" << fd_;
62   assert(readCallback_ != nullptr);
63
64   uint16_t numReads = 0;
65   while (readCallback_) {
66     // Get the buffer to read into.
67     void* buf = nullptr;
68     size_t buflen = 0;
69     try {
70       readCallback_->getReadBuffer(&buf, &buflen);
71     } catch (const std::exception& ex) {
72       AsyncSocketException aex(AsyncSocketException::BAD_ARGS,
73                                string("ReadCallback::getReadBuffer() "
74                                       "threw exception: ") + ex.what());
75       failRead(aex);
76       return;
77     } catch (...) {
78       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
79                               string("ReadCallback::getReadBuffer() "
80                                      "threw non-exception type"));
81       failRead(ex);
82       return;
83     }
84     if (buf == nullptr || buflen == 0) {
85       AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
86                               string("ReadCallback::getReadBuffer() "
87                                      "returned empty buffer"));
88       failRead(ex);
89       return;
90     }
91
92     // Perform the read
93     ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen);
94     if (bytesRead > 0) {
95       readCallback_->readDataAvailable(bytesRead);
96       // Fall through and continue around the loop if the read
97       // completely filled the available buffer.
98       // Note that readCallback_ may have been uninstalled or changed inside
99       // readDataAvailable().
100       if (static_cast<size_t>(bytesRead) < buflen) {
101         return;
102       }
103     } else if (bytesRead < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
104       // No more data to read right now.
105       return;
106     } else if (bytesRead < 0) {
107       AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
108                               "read failed", errno);
109       failRead(ex);
110       return;
111     } else {
112       assert(bytesRead == 0);
113       // EOF
114
115       unregisterHandler();
116       AsyncReader::ReadCallback* callback = readCallback_;
117       readCallback_ = nullptr;
118       callback->readEOF();
119       return;
120     }
121     // Max reads per loop?
122   }
123 }
124
125
126 void AsyncPipeWriter::write(unique_ptr<folly::IOBuf> buf,
127                             AsyncWriter::WriteCallback* callback) {
128   if (closed()) {
129     if (callback) {
130       AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
131                               "attempt to write to closed pipe");
132       callback->writeErr(0, ex);
133     }
134     return;
135   }
136   bool wasEmpty = (queue_.empty());
137   folly::IOBufQueue iobq;
138   iobq.append(std::move(buf));
139   std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*> p(
140     std::move(iobq), callback);
141   queue_.emplace_back(std::move(p));
142   if (wasEmpty)  {
143     handleWrite();
144   } else {
145     CHECK(!queue_.empty());
146     CHECK(isHandlerRegistered());
147   }
148 }
149
150 void AsyncPipeWriter::writeChain(folly::AsyncWriter::WriteCallback* callback,
151                                  std::unique_ptr<folly::IOBuf>&& buf,
152                                  WriteFlags) {
153   write(std::move(buf), callback);
154 }
155
156 void AsyncPipeWriter::closeOnEmpty() {
157   VLOG(5) << "close on empty";
158   if (queue_.empty()) {
159     closeNow();
160   } else {
161     closeOnEmpty_ = true;
162     CHECK(isHandlerRegistered());
163   }
164 }
165
166 void AsyncPipeWriter::closeNow() {
167   VLOG(5) << "close now";
168   if (!queue_.empty()) {
169     failAllWrites(AsyncSocketException(AsyncSocketException::NOT_OPEN,
170                                        "closed with pending writes"));
171   }
172   if (fd_ >= 0) {
173     unregisterHandler();
174     changeHandlerFD(-1);
175     if (closeCb_) {
176       closeCb_(fd_);
177     } else {
178       close(fd_);
179     }
180     fd_ = -1;
181   }
182 }
183
184 void AsyncPipeWriter::failAllWrites(const AsyncSocketException& ex) {
185   DestructorGuard dg(this);
186   while (!queue_.empty()) {
187     // the first entry of the queue could have had a partial write, but needs to
188     // be tracked.
189     if (queue_.front().second) {
190       queue_.front().second->writeErr(0, ex);
191     }
192     queue_.pop_front();
193   }
194 }
195
196
197 void AsyncPipeWriter::handlerReady(uint16_t events) noexcept {
198   CHECK(events & EventHandler::WRITE);
199
200   handleWrite();
201 }
202
203 void AsyncPipeWriter::handleWrite() {
204   DestructorGuard dg(this);
205   assert(!queue_.empty());
206   do {
207     auto& front = queue_.front();
208     folly::IOBufQueue &curQueue = front.first;
209     DCHECK(!curQueue.empty());
210     // someday, support writev.  The logic for partial writes is a bit complex
211     const IOBuf* head = curQueue.front();
212     CHECK(head->length());
213     ssize_t rc = folly::writeNoInt(fd_, head->data(), head->length());
214     if (rc < 0) {
215       if (errno == EAGAIN || errno == EWOULDBLOCK) {
216         // pipe is full
217         VLOG(5) << "write blocked";
218         registerHandler(EventHandler::WRITE);
219         return;
220       } else {
221         failAllWrites(AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
222                                            "write failed", errno));
223         closeNow();
224         return;
225       }
226     } else if (rc == 0) {
227       registerHandler(EventHandler::WRITE);
228       return;
229     }
230     curQueue.trimStart(rc);
231     if (curQueue.empty()) {
232       auto cb = front.second;
233       queue_.pop_front();
234       if (cb) {
235         cb->writeSuccess();
236       }
237     } else {
238       VLOG(5) << "partial write blocked";
239     }
240   } while (!queue_.empty());
241
242   if (closeOnEmpty_) {
243     closeNow();
244   } else {
245     unregisterHandler();
246   }
247 }
248
249 } // folly