Add a buffer callback to AsyncSocket
[folly.git] / folly / io / async / AsyncPipe.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/io/async/AsyncPipe.h>
17
18 #include <folly/FileUtil.h>
19 #include <folly/io/async/AsyncSocketException.h>
20
21 using std::string;
22 using std::unique_ptr;
23 using folly::IOBuf;
24 using folly::IOBufQueue;
25
26 namespace folly {
27
28 AsyncPipeReader::~AsyncPipeReader() {
29   close();
30 }
31
32 void AsyncPipeReader::failRead(const AsyncSocketException& ex) {
33   VLOG(5) << "AsyncPipeReader(this=" << this << ", fd=" << fd_ <<
34     "): failed while reading: " << ex.what();
35
36   DCHECK(readCallback_ != nullptr);
37   AsyncReader::ReadCallback* callback = readCallback_;
38   readCallback_ = nullptr;
39   callback->readErr(ex);
40   close();
41 }
42
43 void AsyncPipeReader::close() {
44   unregisterHandler();
45   if (fd_ >= 0) {
46     changeHandlerFD(-1);
47
48     if (closeCb_) {
49       closeCb_(fd_);
50     } else {
51       ::close(fd_);
52     }
53     fd_ = -1;
54   }
55 }
56
57 void AsyncPipeReader::handlerReady(uint16_t events) noexcept {
58   DestructorGuard dg(this);
59   CHECK(events & EventHandler::READ);
60
61   VLOG(5) << "AsyncPipeReader::handlerReady() this=" << this << ", fd=" << fd_;
62   assert(readCallback_ != nullptr);
63
64   while (readCallback_) {
65     // Get the buffer to read into.
66     void* buf = nullptr;
67     size_t buflen = 0;
68     try {
69       readCallback_->getReadBuffer(&buf, &buflen);
70     } catch (const std::exception& ex) {
71       AsyncSocketException aex(AsyncSocketException::BAD_ARGS,
72                                string("ReadCallback::getReadBuffer() "
73                                       "threw exception: ") + ex.what());
74       failRead(aex);
75       return;
76     } catch (...) {
77       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
78                               string("ReadCallback::getReadBuffer() "
79                                      "threw non-exception type"));
80       failRead(ex);
81       return;
82     }
83     if (buf == nullptr || buflen == 0) {
84       AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
85                               string("ReadCallback::getReadBuffer() "
86                                      "returned empty buffer"));
87       failRead(ex);
88       return;
89     }
90
91     // Perform the read
92     ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen);
93     if (bytesRead > 0) {
94       readCallback_->readDataAvailable(bytesRead);
95       // Fall through and continue around the loop if the read
96       // completely filled the available buffer.
97       // Note that readCallback_ may have been uninstalled or changed inside
98       // readDataAvailable().
99       if (static_cast<size_t>(bytesRead) < buflen) {
100         return;
101       }
102     } else if (bytesRead < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
103       // No more data to read right now.
104       return;
105     } else if (bytesRead < 0) {
106       AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
107                               "read failed", errno);
108       failRead(ex);
109       return;
110     } else {
111       assert(bytesRead == 0);
112       // EOF
113
114       unregisterHandler();
115       AsyncReader::ReadCallback* callback = readCallback_;
116       readCallback_ = nullptr;
117       callback->readEOF();
118       return;
119     }
120     // Max reads per loop?
121   }
122 }
123
124
125 void AsyncPipeWriter::write(unique_ptr<folly::IOBuf> buf,
126                             AsyncWriter::WriteCallback* callback) {
127   if (closed()) {
128     if (callback) {
129       AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
130                               "attempt to write to closed pipe");
131       callback->writeErr(0, ex);
132     }
133     return;
134   }
135   bool wasEmpty = (queue_.empty());
136   folly::IOBufQueue iobq;
137   iobq.append(std::move(buf));
138   std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*> p(
139     std::move(iobq), callback);
140   queue_.emplace_back(std::move(p));
141   if (wasEmpty)  {
142     handleWrite();
143   } else {
144     CHECK(!queue_.empty());
145     CHECK(isHandlerRegistered());
146   }
147 }
148
149 void AsyncPipeWriter::writeChain(folly::AsyncWriter::WriteCallback* callback,
150                                  std::unique_ptr<folly::IOBuf>&& buf,
151                                  WriteFlags,
152                                  BufferCallback*) {
153   write(std::move(buf), callback);
154 }
155
156 void AsyncPipeWriter::closeOnEmpty() {
157   VLOG(5) << "close on empty";
158   if (queue_.empty()) {
159     closeNow();
160   } else {
161     closeOnEmpty_ = true;
162     CHECK(isHandlerRegistered());
163   }
164 }
165
166 void AsyncPipeWriter::closeNow() {
167   VLOG(5) << "close now";
168   if (!queue_.empty()) {
169     failAllWrites(AsyncSocketException(AsyncSocketException::NOT_OPEN,
170                                        "closed with pending writes"));
171   }
172   if (fd_ >= 0) {
173     unregisterHandler();
174     changeHandlerFD(-1);
175     if (closeCb_) {
176       closeCb_(fd_);
177     } else {
178       close(fd_);
179     }
180     fd_ = -1;
181   }
182 }
183
184 void AsyncPipeWriter::failAllWrites(const AsyncSocketException& ex) {
185   DestructorGuard dg(this);
186   while (!queue_.empty()) {
187     // the first entry of the queue could have had a partial write, but needs to
188     // be tracked.
189     if (queue_.front().second) {
190       queue_.front().second->writeErr(0, ex);
191     }
192     queue_.pop_front();
193   }
194 }
195
196
197 void AsyncPipeWriter::handlerReady(uint16_t events) noexcept {
198   CHECK(events & EventHandler::WRITE);
199
200   handleWrite();
201 }
202
203 void AsyncPipeWriter::handleWrite() {
204   DestructorGuard dg(this);
205   assert(!queue_.empty());
206   do {
207     auto& front = queue_.front();
208     folly::IOBufQueue &curQueue = front.first;
209     DCHECK(!curQueue.empty());
210     // someday, support writev.  The logic for partial writes is a bit complex
211     const IOBuf* head = curQueue.front();
212     CHECK(head->length());
213     ssize_t rc = folly::writeNoInt(fd_, head->data(), head->length());
214     if (rc < 0) {
215       if (errno == EAGAIN || errno == EWOULDBLOCK) {
216         // pipe is full
217         VLOG(5) << "write blocked";
218         registerHandler(EventHandler::WRITE);
219         return;
220       } else {
221         failAllWrites(AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
222                                            "write failed", errno));
223         closeNow();
224         return;
225       }
226     } else if (rc == 0) {
227       registerHandler(EventHandler::WRITE);
228       return;
229     }
230     curQueue.trimStart(rc);
231     if (curQueue.empty()) {
232       auto cb = front.second;
233       queue_.pop_front();
234       if (cb) {
235         cb->writeSuccess();
236       }
237     } else {
238       VLOG(5) << "partial write blocked";
239     }
240   } while (!queue_.empty());
241
242   if (closeOnEmpty_) {
243     closeNow();
244   } else {
245     unregisterHandler();
246   }
247 }
248
249 } // folly