2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/wangle/channel/FileRegion.h>
18 using namespace folly;
19 using namespace folly::wangle;
23 struct FileRegionReadPool {};
25 Singleton<IOThreadPoolExecutor, FileRegionReadPool> readPool(
27 return new IOThreadPoolExecutor(
28 sysconf(_SC_NPROCESSORS_ONLN),
29 std::make_shared<NamedThreadFactory>("FileRegionReadPool"));
34 namespace folly { namespace wangle {
36 FileRegion::FileWriteRequest::FileWriteRequest(AsyncSocket* socket,
37 WriteCallback* callback, int fd, off_t offset, size_t count)
38 : WriteRequest(socket, callback),
39 readFd_(fd), offset_(offset), count_(count) {
42 void FileRegion::FileWriteRequest::destroy() {
43 readBase_->runInEventBaseThread([this]{
48 bool FileRegion::FileWriteRequest::performWrite() {
54 int flags = SPLICE_F_NONBLOCK | SPLICE_F_MORE;
55 ssize_t spliced = ::splice(pipe_out_, nullptr,
56 socket_->getFd(), nullptr,
59 if (errno == EAGAIN) {
65 bytesInPipe_ -= spliced;
66 bytesWritten(spliced);
70 void FileRegion::FileWriteRequest::consume() {
74 bool FileRegion::FileWriteRequest::isComplete() {
75 return totalBytesWritten_ == count_;
78 void FileRegion::FileWriteRequest::messageAvailable(size_t&& count) {
79 bool shouldWrite = bytesInPipe_ == 0;
80 bytesInPipe_ += count;
82 socket_->writeRequestReady();
87 # if (__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 9))
88 # define GLIBC_AT_LEAST_2_9 1
92 void FileRegion::FileWriteRequest::start() {
94 readBase_ = readPool.get()->getEventBase();
95 readBase_->runInEventBaseThread([this]{
96 auto flags = fcntl(readFd_, F_GETFL);
98 fail(__func__, AsyncSocketException(
99 AsyncSocketException::INTERNAL_ERROR,
100 "fcntl F_GETFL failed", errno));
105 if (flags == O_WRONLY) {
106 fail(__func__, AsyncSocketException(
107 AsyncSocketException::BAD_ARGS, "file not open for reading"));
111 #ifndef GLIBC_AT_LEAST_2_9
112 fail(__func__, AsyncSocketException(
113 AsyncSocketException::NOT_SUPPORTED,
114 "writeFile unsupported on glibc < 2.9"));
118 if (::pipe2(pipeFds, O_NONBLOCK) == -1) {
119 fail(__func__, AsyncSocketException(
120 AsyncSocketException::INTERNAL_ERROR,
121 "pipe2 failed", errno));
126 // Max size for unprevileged processes as set in /proc/sys/fs/pipe-max-size
127 // Ignore failures and just roll with it
128 // TODO maybe read max size from /proc?
129 fcntl(pipeFds[0], F_SETPIPE_SZ, 1048576);
130 fcntl(pipeFds[1], F_SETPIPE_SZ, 1048576);
133 pipe_out_ = pipeFds[0];
135 socket_->getEventBase()->runInEventBaseThreadAndWait([&]{
136 startConsuming(socket_->getEventBase(), &queue_);
138 readHandler_ = folly::make_unique<FileReadHandler>(
139 this, pipeFds[1], count_);
144 FileRegion::FileWriteRequest::~FileWriteRequest() {
145 CHECK(readBase_->isInEventBaseThread());
146 socket_->getEventBase()->runInEventBaseThreadAndWait([&]{
148 if (pipe_out_ > -1) {
155 void FileRegion::FileWriteRequest::fail(
157 const AsyncSocketException& ex) {
158 socket_->getEventBase()->runInEventBaseThread([=]{
159 WriteRequest::fail(fn, ex);
163 FileRegion::FileWriteRequest::FileReadHandler::FileReadHandler(
164 FileWriteRequest* req, int pipe_in, size_t bytesToRead)
165 : req_(req), pipe_in_(pipe_in), bytesToRead_(bytesToRead) {
166 CHECK(req_->readBase_->isInEventBaseThread());
167 initHandler(req_->readBase_, pipe_in);
168 if (!registerHandler(EventFlags::WRITE | EventFlags::PERSIST)) {
169 req_->fail(__func__, AsyncSocketException(
170 AsyncSocketException::INTERNAL_ERROR,
171 "registerHandler failed"));
175 FileRegion::FileWriteRequest::FileReadHandler::~FileReadHandler() {
176 CHECK(req_->readBase_->isInEventBaseThread());
181 void FileRegion::FileWriteRequest::FileReadHandler::handlerReady(
182 uint16_t events) noexcept {
183 CHECK(events & EventHandler::WRITE);
184 if (bytesToRead_ == 0) {
189 int flags = SPLICE_F_NONBLOCK | SPLICE_F_MORE;
190 ssize_t spliced = ::splice(req_->readFd_, &req_->offset_,
192 bytesToRead_, flags);
194 if (errno == EAGAIN) {
197 req_->fail(__func__, AsyncSocketException(
198 AsyncSocketException::INTERNAL_ERROR,
199 "splice failed", errno));
205 bytesToRead_ -= spliced;
207 req_->queue_.putMessage(static_cast<size_t>(spliced));
209 req_->fail(__func__, AsyncSocketException(
210 AsyncSocketException::INTERNAL_ERROR,
211 "putMessage failed"));