d64a164d6c4706a96559a51ca7e8163e1de7a3d2
[folly.git] / folly / io / async / AsyncPipe.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/io/async/AsyncPipe.h>
17
18 #include <folly/FileUtil.h>
19 #include <folly/io/async/AsyncSocketException.h>
20
21 using std::string;
22 using std::unique_ptr;
23 using folly::IOBuf;
24 using folly::IOBufQueue;
25
26 namespace folly {
27
28 AsyncPipeReader::~AsyncPipeReader() {
29   close();
30 }
31
32 void AsyncPipeReader::failRead(const AsyncSocketException& ex) {
33   VLOG(5) << "AsyncPipeReader(this=" << this << ", fd=" << fd_ <<
34     "): failed while reading: " << ex.what();
35
36   DCHECK(readCallback_ != nullptr);
37   AsyncReader::ReadCallback* callback = readCallback_;
38   readCallback_ = nullptr;
39   callback->readErr(ex);
40   close();
41 }
42
43 void AsyncPipeReader::close() {
44   unregisterHandler();
45   if (fd_ >= 0) {
46     changeHandlerFD(-1);
47
48     if (closeCb_) {
49       closeCb_(fd_);
50     } else {
51       ::close(fd_);
52     }
53     fd_ = -1;
54   }
55 }
56
57 void AsyncPipeReader::handlerReady(uint16_t events) noexcept {
58   DestructorGuard dg(this);
59   CHECK(events & EventHandler::READ);
60
61   VLOG(5) << "AsyncPipeReader::handlerReady() this=" << this << ", fd=" << fd_;
62   assert(readCallback_ != nullptr);
63
64   while (readCallback_) {
65     // - What API does callback support?
66     const auto movable = readCallback_->isBufferMovable(); // noexcept
67
68     // Get the buffer to read into.
69     void* buf = nullptr;
70     size_t buflen = 0;
71     std::unique_ptr<IOBuf> ioBuf;
72
73     if (movable) {
74       ioBuf = IOBuf::create(readCallback_->maxBufferSize());
75       buf = ioBuf->writableBuffer();
76       buflen = ioBuf->capacity();
77     } else {
78       try {
79         readCallback_->getReadBuffer(&buf, &buflen);
80       } catch (const std::exception& ex) {
81         AsyncSocketException aex(
82             AsyncSocketException::BAD_ARGS,
83             string("ReadCallback::getReadBuffer() "
84                    "threw exception: ") +
85                 ex.what());
86         failRead(aex);
87         return;
88       } catch (...) {
89         AsyncSocketException aex(
90             AsyncSocketException::BAD_ARGS,
91             string("ReadCallback::getReadBuffer() "
92                    "threw non-exception type"));
93         failRead(aex);
94         return;
95       }
96       if (buf == nullptr || buflen == 0) {
97         AsyncSocketException aex(
98             AsyncSocketException::INVALID_STATE,
99             string("ReadCallback::getReadBuffer() "
100                    "returned empty buffer"));
101         failRead(aex);
102         return;
103       }
104     }
105
106     // Perform the read
107     ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen);
108
109     if (bytesRead > 0) {
110       if (movable) {
111         ioBuf->append(uint64_t(bytesRead));
112         readCallback_->readBufferAvailable(std::move(ioBuf));
113       } else {
114         readCallback_->readDataAvailable(size_t(bytesRead));
115       }
116       // Fall through and continue around the loop if the read
117       // completely filled the available buffer.
118       // Note that readCallback_ may have been uninstalled or changed inside
119       // readDataAvailable().
120       if (static_cast<size_t>(bytesRead) < buflen) {
121         return;
122       }
123     } else if (bytesRead < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
124       // No more data to read right now.
125       return;
126     } else if (bytesRead < 0) {
127       AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
128                               "read failed", errno);
129       failRead(ex);
130       return;
131     } else {
132       assert(bytesRead == 0);
133       // EOF
134
135       unregisterHandler();
136       AsyncReader::ReadCallback* callback = readCallback_;
137       readCallback_ = nullptr;
138       callback->readEOF();
139       return;
140     }
141     // Max reads per loop?
142   }
143 }
144
145
146 void AsyncPipeWriter::write(unique_ptr<folly::IOBuf> buf,
147                             AsyncWriter::WriteCallback* callback) {
148   if (closed()) {
149     if (callback) {
150       AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
151                               "attempt to write to closed pipe");
152       callback->writeErr(0, ex);
153     }
154     return;
155   }
156   bool wasEmpty = (queue_.empty());
157   folly::IOBufQueue iobq;
158   iobq.append(std::move(buf));
159   std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*> p(
160     std::move(iobq), callback);
161   queue_.emplace_back(std::move(p));
162   if (wasEmpty)  {
163     handleWrite();
164   } else {
165     CHECK(!queue_.empty());
166     CHECK(isHandlerRegistered());
167   }
168 }
169
170 void AsyncPipeWriter::writeChain(folly::AsyncWriter::WriteCallback* callback,
171                                  std::unique_ptr<folly::IOBuf>&& buf,
172                                  WriteFlags) {
173   write(std::move(buf), callback);
174 }
175
176 void AsyncPipeWriter::closeOnEmpty() {
177   VLOG(5) << "close on empty";
178   if (queue_.empty()) {
179     closeNow();
180   } else {
181     closeOnEmpty_ = true;
182     CHECK(isHandlerRegistered());
183   }
184 }
185
186 void AsyncPipeWriter::closeNow() {
187   VLOG(5) << "close now";
188   if (!queue_.empty()) {
189     failAllWrites(AsyncSocketException(AsyncSocketException::NOT_OPEN,
190                                        "closed with pending writes"));
191   }
192   if (fd_ >= 0) {
193     unregisterHandler();
194     changeHandlerFD(-1);
195     if (closeCb_) {
196       closeCb_(fd_);
197     } else {
198       close(fd_);
199     }
200     fd_ = -1;
201   }
202 }
203
204 void AsyncPipeWriter::failAllWrites(const AsyncSocketException& ex) {
205   DestructorGuard dg(this);
206   while (!queue_.empty()) {
207     // the first entry of the queue could have had a partial write, but needs to
208     // be tracked.
209     if (queue_.front().second) {
210       queue_.front().second->writeErr(0, ex);
211     }
212     queue_.pop_front();
213   }
214 }
215
216
217 void AsyncPipeWriter::handlerReady(uint16_t events) noexcept {
218   CHECK(events & EventHandler::WRITE);
219
220   handleWrite();
221 }
222
223 void AsyncPipeWriter::handleWrite() {
224   DestructorGuard dg(this);
225   assert(!queue_.empty());
226   do {
227     auto& front = queue_.front();
228     folly::IOBufQueue &curQueue = front.first;
229     DCHECK(!curQueue.empty());
230     // someday, support writev.  The logic for partial writes is a bit complex
231     const IOBuf* head = curQueue.front();
232     CHECK(head->length());
233     ssize_t rc = folly::writeNoInt(fd_, head->data(), head->length());
234     if (rc < 0) {
235       if (errno == EAGAIN || errno == EWOULDBLOCK) {
236         // pipe is full
237         VLOG(5) << "write blocked";
238         registerHandler(EventHandler::WRITE);
239         return;
240       } else {
241         failAllWrites(AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
242                                            "write failed", errno));
243         closeNow();
244         return;
245       }
246     } else if (rc == 0) {
247       registerHandler(EventHandler::WRITE);
248       return;
249     }
250     curQueue.trimStart(size_t(rc));
251     if (curQueue.empty()) {
252       auto cb = front.second;
253       queue_.pop_front();
254       if (cb) {
255         cb->writeSuccess();
256       }
257     } else {
258       VLOG(5) << "partial write blocked";
259     }
260   } while (!queue_.empty());
261
262   if (closeOnEmpty_) {
263     closeNow();
264   } else {
265     unregisterHandler();
266   }
267 }
268
269 } // folly