AsyncSocket::writeRequest() and its first user wangle::FileRegion
[folly.git] / folly / wangle / channel / FileRegion.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/wangle/channel/FileRegion.h>
17
18 using namespace folly;
19 using namespace folly::wangle;
20
21 namespace {
22
23 struct FileRegionReadPool {};
24
25 Singleton<IOThreadPoolExecutor, FileRegionReadPool> readPool(
26   []{
27     return new IOThreadPoolExecutor(
28         sysconf(_SC_NPROCESSORS_ONLN),
29         std::make_shared<NamedThreadFactory>("FileRegionReadPool"));
30   });
31
32 }
33
34 namespace folly { namespace wangle {
35
36 FileRegion::FileWriteRequest::FileWriteRequest(AsyncSocket* socket,
37     WriteCallback* callback, int fd, off_t offset, size_t count)
38   : WriteRequest(socket, callback),
39     readFd_(fd), offset_(offset), count_(count) {
40 }
41
42 void FileRegion::FileWriteRequest::destroy() {
43   readBase_->runInEventBaseThread([this]{
44     delete this;
45   });
46 }
47
48 bool FileRegion::FileWriteRequest::performWrite() {
49   if (!started_) {
50     start();
51     return true;
52   }
53
54   int flags = SPLICE_F_NONBLOCK | SPLICE_F_MORE;
55   ssize_t spliced = ::splice(pipe_out_, nullptr,
56                              socket_->getFd(), nullptr,
57                              bytesInPipe_, flags);
58   if (spliced == -1) {
59     if (errno == EAGAIN) {
60       return true;
61     }
62     return false;
63   }
64
65   bytesInPipe_ -= spliced;
66   bytesWritten(spliced);
67   return true;
68 }
69
70 void FileRegion::FileWriteRequest::consume() {
71   // do nothing
72 }
73
74 bool FileRegion::FileWriteRequest::isComplete() {
75   return totalBytesWritten_ == count_;
76 }
77
78 void FileRegion::FileWriteRequest::messageAvailable(size_t&& count) {
79   bool shouldWrite = bytesInPipe_ == 0;
80   bytesInPipe_ += count;
81   if (shouldWrite) {
82     socket_->writeRequestReady();
83   }
84 }
85
// GLIBC_AT_LEAST_2_9 is defined only when building against glibc 2.9 or
// newer. FileWriteRequest::start() checks it to decide whether the
// pipe2()-based implementation is compiled in or whether the request fails
// with NOT_SUPPORTED.
#ifdef __GLIBC__
# if (__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 9))
#   define GLIBC_AT_LEAST_2_9 1
#  endif
#endif
91
92 void FileRegion::FileWriteRequest::start() {
93   started_ = true;
94   readBase_ = readPool.get()->getEventBase();
95   readBase_->runInEventBaseThread([this]{
96     auto flags = fcntl(readFd_, F_GETFL);
97     if (flags == -1) {
98       fail(__func__, AsyncSocketException(
99           AsyncSocketException::INTERNAL_ERROR,
100           "fcntl F_GETFL failed", errno));
101       return;
102     }
103
104     flags &= O_ACCMODE;
105     if (flags == O_WRONLY) {
106       fail(__func__, AsyncSocketException(
107           AsyncSocketException::BAD_ARGS, "file not open for reading"));
108       return;
109     }
110
111 #ifndef GLIBC_AT_LEAST_2_9
112     fail(__func__, AsyncSocketException(
113         AsyncSocketException::NOT_SUPPORTED,
114         "writeFile unsupported on glibc < 2.9"));
115     return;
116 #else
117     int pipeFds[2];
118     if (::pipe2(pipeFds, O_NONBLOCK) == -1) {
119       fail(__func__, AsyncSocketException(
120           AsyncSocketException::INTERNAL_ERROR,
121           "pipe2 failed", errno));
122       return;
123     }
124
125     // Max size for unprevileged processes as set in /proc/sys/fs/pipe-max-size
126     // Ignore failures and just roll with it
127     // TODO maybe read max size from /proc?
128     fcntl(pipeFds[0], F_SETPIPE_SZ, 1048576);
129     fcntl(pipeFds[1], F_SETPIPE_SZ, 1048576);
130
131     pipe_out_ = pipeFds[0];
132
133     socket_->getEventBase()->runInEventBaseThreadAndWait([&]{
134       startConsuming(socket_->getEventBase(), &queue_);
135     });
136     readHandler_ = folly::make_unique<FileReadHandler>(
137         this, pipeFds[1], count_);
138 #endif
139   });
140 }
141
142 FileRegion::FileWriteRequest::~FileWriteRequest() {
143   CHECK(readBase_->isInEventBaseThread());
144   socket_->getEventBase()->runInEventBaseThreadAndWait([&]{
145     stopConsuming();
146     if (pipe_out_ > -1) {
147       ::close(pipe_out_);
148     }
149   });
150
151 }
152
153 void FileRegion::FileWriteRequest::fail(
154     const char* fn,
155     const AsyncSocketException& ex) {
156   socket_->getEventBase()->runInEventBaseThread([=]{
157     WriteRequest::fail(fn, ex);
158   });
159 }
160
161 FileRegion::FileWriteRequest::FileReadHandler::FileReadHandler(
162     FileWriteRequest* req, int pipe_in, size_t bytesToRead)
163   : req_(req), pipe_in_(pipe_in), bytesToRead_(bytesToRead) {
164   CHECK(req_->readBase_->isInEventBaseThread());
165   initHandler(req_->readBase_, pipe_in);
166   if (!registerHandler(EventFlags::WRITE | EventFlags::PERSIST)) {
167     req_->fail(__func__, AsyncSocketException(
168         AsyncSocketException::INTERNAL_ERROR,
169         "registerHandler failed"));
170   }
171 }
172
173 FileRegion::FileWriteRequest::FileReadHandler::~FileReadHandler() {
174   CHECK(req_->readBase_->isInEventBaseThread());
175   unregisterHandler();
176   ::close(pipe_in_);
177 }
178
179 void FileRegion::FileWriteRequest::FileReadHandler::handlerReady(
180     uint16_t events) noexcept {
181   CHECK(events & EventHandler::WRITE);
182   if (bytesToRead_ == 0) {
183     unregisterHandler();
184     return;
185   }
186
187   int flags = SPLICE_F_NONBLOCK | SPLICE_F_MORE;
188   ssize_t spliced = ::splice(req_->readFd_, &req_->offset_,
189                              pipe_in_, nullptr,
190                              bytesToRead_, flags);
191   if (spliced == -1) {
192     if (errno == EAGAIN) {
193       return;
194     } else {
195       req_->fail(__func__, AsyncSocketException(
196           AsyncSocketException::INTERNAL_ERROR,
197           "splice failed", errno));
198       return;
199     }
200   }
201
202   if (spliced > 0) {
203     bytesToRead_ -= spliced;
204     try {
205       req_->queue_.putMessage(static_cast<size_t>(spliced));
206     } catch (...) {
207       req_->fail(__func__, AsyncSocketException(
208           AsyncSocketException::INTERNAL_ERROR,
209           "putMessage failed"));
210       return;
211     }
212   }
213 }
214 }} // folly::wangle