e38cc40996eca0ddccb9bc68ff9191110e161611
[folly.git] / folly / io / async / AsyncPipe.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/io/async/AsyncPipe.h>
17
18 #include <folly/FileUtil.h>
19 #include <folly/io/async/AsyncSocketException.h>
20
21 using std::string;
22 using std::unique_ptr;
23 using folly::IOBuf;
24 using folly::IOBufQueue;
25
26 namespace folly {
27
28 AsyncPipeReader::~AsyncPipeReader() {
29   close();
30 }
31
32 void AsyncPipeReader::failRead(const AsyncSocketException& ex) {
33   VLOG(5) << "AsyncPipeReader(this=" << this << ", fd=" << fd_ <<
34     "): failed while reading: " << ex.what();
35
36   DCHECK(readCallback_ != nullptr);
37   AsyncReader::ReadCallback* callback = readCallback_;
38   readCallback_ = nullptr;
39   callback->readErr(ex);
40   close();
41 }
42
43 void AsyncPipeReader::close() {
44   unregisterHandler();
45   if (fd_ >= 0) {
46     changeHandlerFD(-1);
47
48     if (closeCb_) {
49       closeCb_(fd_);
50     } else {
51       ::close(fd_);
52     }
53     fd_ = -1;
54   }
55 }
56
57 void AsyncPipeReader::handlerReady(uint16_t events) noexcept {
58   DestructorGuard dg(this);
59   CHECK(events & EventHandler::READ);
60
61   VLOG(5) << "AsyncPipeReader::handlerReady() this=" << this << ", fd=" << fd_;
62   assert(readCallback_ != nullptr);
63
64   while (readCallback_) {
65     // Get the buffer to read into.
66     void* buf = nullptr;
67     size_t buflen = 0;
68     try {
69       readCallback_->getReadBuffer(&buf, &buflen);
70     } catch (const std::exception& ex) {
71       AsyncSocketException aex(AsyncSocketException::BAD_ARGS,
72                                string("ReadCallback::getReadBuffer() "
73                                       "threw exception: ") + ex.what());
74       failRead(aex);
75       return;
76     } catch (...) {
77       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
78                               string("ReadCallback::getReadBuffer() "
79                                      "threw non-exception type"));
80       failRead(ex);
81       return;
82     }
83     if (buf == nullptr || buflen == 0) {
84       AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
85                               string("ReadCallback::getReadBuffer() "
86                                      "returned empty buffer"));
87       failRead(ex);
88       return;
89     }
90
91     // Perform the read
92     ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen);
93     if (bytesRead > 0) {
94       readCallback_->readDataAvailable(bytesRead);
95       // Fall through and continue around the loop if the read
96       // completely filled the available buffer.
97       // Note that readCallback_ may have been uninstalled or changed inside
98       // readDataAvailable().
99       if (static_cast<size_t>(bytesRead) < buflen) {
100         return;
101       }
102     } else if (bytesRead < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
103       // No more data to read right now.
104       return;
105     } else if (bytesRead < 0) {
106       AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
107                               "read failed", errno);
108       failRead(ex);
109       return;
110     } else {
111       assert(bytesRead == 0);
112       // EOF
113
114       unregisterHandler();
115       AsyncReader::ReadCallback* callback = readCallback_;
116       readCallback_ = nullptr;
117       callback->readEOF();
118       return;
119     }
120     // Max reads per loop?
121   }
122 }
123
124
125 void AsyncPipeWriter::write(unique_ptr<folly::IOBuf> buf,
126                             AsyncWriter::WriteCallback* callback) {
127   if (closed()) {
128     if (callback) {
129       AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
130                               "attempt to write to closed pipe");
131       callback->writeErr(0, ex);
132     }
133     return;
134   }
135   bool wasEmpty = (queue_.empty());
136   folly::IOBufQueue iobq;
137   iobq.append(std::move(buf));
138   std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*> p(
139     std::move(iobq), callback);
140   queue_.emplace_back(std::move(p));
141   if (wasEmpty)  {
142     handleWrite();
143   } else {
144     CHECK(!queue_.empty());
145     CHECK(isHandlerRegistered());
146   }
147 }
148
149 void AsyncPipeWriter::writeChain(folly::AsyncWriter::WriteCallback* callback,
150                                  std::unique_ptr<folly::IOBuf>&& buf,
151                                  WriteFlags) {
152   write(std::move(buf), callback);
153 }
154
155 void AsyncPipeWriter::closeOnEmpty() {
156   VLOG(5) << "close on empty";
157   if (queue_.empty()) {
158     closeNow();
159   } else {
160     closeOnEmpty_ = true;
161     CHECK(isHandlerRegistered());
162   }
163 }
164
165 void AsyncPipeWriter::closeNow() {
166   VLOG(5) << "close now";
167   if (!queue_.empty()) {
168     failAllWrites(AsyncSocketException(AsyncSocketException::NOT_OPEN,
169                                        "closed with pending writes"));
170   }
171   if (fd_ >= 0) {
172     unregisterHandler();
173     changeHandlerFD(-1);
174     if (closeCb_) {
175       closeCb_(fd_);
176     } else {
177       close(fd_);
178     }
179     fd_ = -1;
180   }
181 }
182
183 void AsyncPipeWriter::failAllWrites(const AsyncSocketException& ex) {
184   DestructorGuard dg(this);
185   while (!queue_.empty()) {
186     // the first entry of the queue could have had a partial write, but needs to
187     // be tracked.
188     if (queue_.front().second) {
189       queue_.front().second->writeErr(0, ex);
190     }
191     queue_.pop_front();
192   }
193 }
194
195
196 void AsyncPipeWriter::handlerReady(uint16_t events) noexcept {
197   CHECK(events & EventHandler::WRITE);
198
199   handleWrite();
200 }
201
202 void AsyncPipeWriter::handleWrite() {
203   DestructorGuard dg(this);
204   assert(!queue_.empty());
205   do {
206     auto& front = queue_.front();
207     folly::IOBufQueue &curQueue = front.first;
208     DCHECK(!curQueue.empty());
209     // someday, support writev.  The logic for partial writes is a bit complex
210     const IOBuf* head = curQueue.front();
211     CHECK(head->length());
212     ssize_t rc = folly::writeNoInt(fd_, head->data(), head->length());
213     if (rc < 0) {
214       if (errno == EAGAIN || errno == EWOULDBLOCK) {
215         // pipe is full
216         VLOG(5) << "write blocked";
217         registerHandler(EventHandler::WRITE);
218         return;
219       } else {
220         failAllWrites(AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
221                                            "write failed", errno));
222         closeNow();
223         return;
224       }
225     } else if (rc == 0) {
226       registerHandler(EventHandler::WRITE);
227       return;
228     }
229     curQueue.trimStart(rc);
230     if (curQueue.empty()) {
231       auto cb = front.second;
232       queue_.pop_front();
233       if (cb) {
234         cb->writeSuccess();
235       }
236     } else {
237       VLOG(5) << "partial write blocked";
238     }
239   } while (!queue_.empty());
240
241   if (closeOnEmpty_) {
242     closeNow();
243   } else {
244     unregisterHandler();
245   }
246 }
247
248 } // folly