Bump version to 52:0
[folly.git] / folly / wangle / channel / FileRegion.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/wangle/channel/FileRegion.h>
17
18 using namespace folly;
19 using namespace folly::wangle;
20
21 namespace {
22
23 struct FileRegionReadPool {};
24
25 Singleton<IOThreadPoolExecutor, FileRegionReadPool> readPool(
26   []{
27     return new IOThreadPoolExecutor(
28         sysconf(_SC_NPROCESSORS_ONLN),
29         std::make_shared<NamedThreadFactory>("FileRegionReadPool"));
30   });
31
32 }
33
34 namespace folly { namespace wangle {
35
36 FileRegion::FileWriteRequest::FileWriteRequest(AsyncSocket* socket,
37     WriteCallback* callback, int fd, off_t offset, size_t count)
38   : WriteRequest(socket, callback),
39     readFd_(fd), offset_(offset), count_(count) {
40 }
41
42 void FileRegion::FileWriteRequest::destroy() {
43   readBase_->runInEventBaseThread([this]{
44     delete this;
45   });
46 }
47
48 bool FileRegion::FileWriteRequest::performWrite() {
49   if (!started_) {
50     start();
51     return true;
52   }
53
54   int flags = SPLICE_F_NONBLOCK | SPLICE_F_MORE;
55   ssize_t spliced = ::splice(pipe_out_, nullptr,
56                              socket_->getFd(), nullptr,
57                              bytesInPipe_, flags);
58   if (spliced == -1) {
59     if (errno == EAGAIN) {
60       return true;
61     }
62     return false;
63   }
64
65   bytesInPipe_ -= spliced;
66   bytesWritten(spliced);
67   return true;
68 }
69
70 void FileRegion::FileWriteRequest::consume() {
71   // do nothing
72 }
73
74 bool FileRegion::FileWriteRequest::isComplete() {
75   return totalBytesWritten_ == count_;
76 }
77
78 void FileRegion::FileWriteRequest::messageAvailable(size_t&& count) {
79   bool shouldWrite = bytesInPipe_ == 0;
80   bytesInPipe_ += count;
81   if (shouldWrite) {
82     socket_->writeRequestReady();
83   }
84 }
85
// GLIBC_AT_LEAST_2_9 is defined (as 1) only when compiling against glibc 2.9
// or newer. start() reports NOT_SUPPORTED when it is not defined.
#if defined(__GLIBC__) && \
    (__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 9))
# define GLIBC_AT_LEAST_2_9 1
#endif
91
92 void FileRegion::FileWriteRequest::start() {
93   started_ = true;
94   readBase_ = readPool.get()->getEventBase();
95   readBase_->runInEventBaseThread([this]{
96     auto flags = fcntl(readFd_, F_GETFL);
97     if (flags == -1) {
98       fail(__func__, AsyncSocketException(
99           AsyncSocketException::INTERNAL_ERROR,
100           "fcntl F_GETFL failed", errno));
101       return;
102     }
103
104     flags &= O_ACCMODE;
105     if (flags == O_WRONLY) {
106       fail(__func__, AsyncSocketException(
107           AsyncSocketException::BAD_ARGS, "file not open for reading"));
108       return;
109     }
110
111 #ifndef GLIBC_AT_LEAST_2_9
112     fail(__func__, AsyncSocketException(
113         AsyncSocketException::NOT_SUPPORTED,
114         "writeFile unsupported on glibc < 2.9"));
115     return;
116 #else
117     int pipeFds[2];
118     if (::pipe2(pipeFds, O_NONBLOCK) == -1) {
119       fail(__func__, AsyncSocketException(
120           AsyncSocketException::INTERNAL_ERROR,
121           "pipe2 failed", errno));
122       return;
123     }
124
125 #ifdef F_SETPIPE_SZ
126     // Max size for unprevileged processes as set in /proc/sys/fs/pipe-max-size
127     // Ignore failures and just roll with it
128     // TODO maybe read max size from /proc?
129     fcntl(pipeFds[0], F_SETPIPE_SZ, 1048576);
130     fcntl(pipeFds[1], F_SETPIPE_SZ, 1048576);
131 #endif
132
133     pipe_out_ = pipeFds[0];
134
135     socket_->getEventBase()->runInEventBaseThreadAndWait([&]{
136       startConsuming(socket_->getEventBase(), &queue_);
137     });
138     readHandler_ = folly::make_unique<FileReadHandler>(
139         this, pipeFds[1], count_);
140 #endif
141   });
142 }
143
144 FileRegion::FileWriteRequest::~FileWriteRequest() {
145   CHECK(readBase_->isInEventBaseThread());
146   socket_->getEventBase()->runInEventBaseThreadAndWait([&]{
147     stopConsuming();
148     if (pipe_out_ > -1) {
149       ::close(pipe_out_);
150     }
151   });
152
153 }
154
155 void FileRegion::FileWriteRequest::fail(
156     const char* fn,
157     const AsyncSocketException& ex) {
158   socket_->getEventBase()->runInEventBaseThread([=]{
159     WriteRequest::fail(fn, ex);
160   });
161 }
162
163 FileRegion::FileWriteRequest::FileReadHandler::FileReadHandler(
164     FileWriteRequest* req, int pipe_in, size_t bytesToRead)
165   : req_(req), pipe_in_(pipe_in), bytesToRead_(bytesToRead) {
166   CHECK(req_->readBase_->isInEventBaseThread());
167   initHandler(req_->readBase_, pipe_in);
168   if (!registerHandler(EventFlags::WRITE | EventFlags::PERSIST)) {
169     req_->fail(__func__, AsyncSocketException(
170         AsyncSocketException::INTERNAL_ERROR,
171         "registerHandler failed"));
172   }
173 }
174
175 FileRegion::FileWriteRequest::FileReadHandler::~FileReadHandler() {
176   CHECK(req_->readBase_->isInEventBaseThread());
177   unregisterHandler();
178   ::close(pipe_in_);
179 }
180
181 void FileRegion::FileWriteRequest::FileReadHandler::handlerReady(
182     uint16_t events) noexcept {
183   CHECK(events & EventHandler::WRITE);
184   if (bytesToRead_ == 0) {
185     unregisterHandler();
186     return;
187   }
188
189   int flags = SPLICE_F_NONBLOCK | SPLICE_F_MORE;
190   ssize_t spliced = ::splice(req_->readFd_, &req_->offset_,
191                              pipe_in_, nullptr,
192                              bytesToRead_, flags);
193   if (spliced == -1) {
194     if (errno == EAGAIN) {
195       return;
196     } else {
197       req_->fail(__func__, AsyncSocketException(
198           AsyncSocketException::INTERNAL_ERROR,
199           "splice failed", errno));
200       return;
201     }
202   }
203
204   if (spliced > 0) {
205     bytesToRead_ -= spliced;
206     try {
207       req_->queue_.putMessage(static_cast<size_t>(spliced));
208     } catch (...) {
209       req_->fail(__func__, AsyncSocketException(
210           AsyncSocketException::INTERNAL_ERROR,
211           "putMessage failed"));
212       return;
213     }
214   }
215 }
216 }} // folly::wangle