2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/wangle/channel/FileRegion.h>
18 using namespace folly;
19 using namespace folly::wangle;
23 struct FileRegionReadPool {};
25 Singleton<IOThreadPoolExecutor, FileRegionReadPool> readPool(
27 return new IOThreadPoolExecutor(
28 sysconf(_SC_NPROCESSORS_ONLN),
29 std::make_shared<NamedThreadFactory>("FileRegionReadPool"));
34 namespace folly { namespace wangle {
36 FileRegion::FileWriteRequest::FileWriteRequest(AsyncSocket* socket,
37 WriteCallback* callback, int fd, off_t offset, size_t count)
38 : WriteRequest(socket, callback),
39 readFd_(fd), offset_(offset), count_(count) {
42 void FileRegion::FileWriteRequest::destroy() {
43 readBase_->runInEventBaseThread([this]{
48 bool FileRegion::FileWriteRequest::performWrite() {
54 int flags = SPLICE_F_NONBLOCK | SPLICE_F_MORE;
55 ssize_t spliced = ::splice(pipe_out_, nullptr,
56 socket_->getFd(), nullptr,
59 if (errno == EAGAIN) {
65 bytesInPipe_ -= spliced;
66 bytesWritten(spliced);
70 void FileRegion::FileWriteRequest::consume() {
74 bool FileRegion::FileWriteRequest::isComplete() {
75 return totalBytesWritten_ == count_;
78 void FileRegion::FileWriteRequest::messageAvailable(size_t&& count) {
79 bool shouldWrite = bytesInPipe_ == 0;
80 bytesInPipe_ += count;
82 socket_->writeRequestReady();
87 # if (__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 9))
88 # define GLIBC_AT_LEAST_2_9 1
92 void FileRegion::FileWriteRequest::start() {
94 readBase_ = readPool.get()->getEventBase();
95 readBase_->runInEventBaseThread([this]{
96 auto flags = fcntl(readFd_, F_GETFL);
98 fail(__func__, AsyncSocketException(
99 AsyncSocketException::INTERNAL_ERROR,
100 "fcntl F_GETFL failed", errno));
105 if (flags == O_WRONLY) {
106 fail(__func__, AsyncSocketException(
107 AsyncSocketException::BAD_ARGS, "file not open for reading"));
111 #ifndef GLIBC_AT_LEAST_2_9
112 fail(__func__, AsyncSocketException(
113 AsyncSocketException::NOT_SUPPORTED,
114 "writeFile unsupported on glibc < 2.9"));
118 if (::pipe2(pipeFds, O_NONBLOCK) == -1) {
119 fail(__func__, AsyncSocketException(
120 AsyncSocketException::INTERNAL_ERROR,
121 "pipe2 failed", errno));
125 // Max size for unprevileged processes as set in /proc/sys/fs/pipe-max-size
126 // Ignore failures and just roll with it
127 // TODO maybe read max size from /proc?
128 fcntl(pipeFds[0], F_SETPIPE_SZ, 1048576);
129 fcntl(pipeFds[1], F_SETPIPE_SZ, 1048576);
131 pipe_out_ = pipeFds[0];
133 socket_->getEventBase()->runInEventBaseThreadAndWait([&]{
134 startConsuming(socket_->getEventBase(), &queue_);
136 readHandler_ = folly::make_unique<FileReadHandler>(
137 this, pipeFds[1], count_);
142 FileRegion::FileWriteRequest::~FileWriteRequest() {
143 CHECK(readBase_->isInEventBaseThread());
144 socket_->getEventBase()->runInEventBaseThreadAndWait([&]{
146 if (pipe_out_ > -1) {
153 void FileRegion::FileWriteRequest::fail(
155 const AsyncSocketException& ex) {
156 socket_->getEventBase()->runInEventBaseThread([=]{
157 WriteRequest::fail(fn, ex);
161 FileRegion::FileWriteRequest::FileReadHandler::FileReadHandler(
162 FileWriteRequest* req, int pipe_in, size_t bytesToRead)
163 : req_(req), pipe_in_(pipe_in), bytesToRead_(bytesToRead) {
164 CHECK(req_->readBase_->isInEventBaseThread());
165 initHandler(req_->readBase_, pipe_in);
166 if (!registerHandler(EventFlags::WRITE | EventFlags::PERSIST)) {
167 req_->fail(__func__, AsyncSocketException(
168 AsyncSocketException::INTERNAL_ERROR,
169 "registerHandler failed"));
173 FileRegion::FileWriteRequest::FileReadHandler::~FileReadHandler() {
174 CHECK(req_->readBase_->isInEventBaseThread());
179 void FileRegion::FileWriteRequest::FileReadHandler::handlerReady(
180 uint16_t events) noexcept {
181 CHECK(events & EventHandler::WRITE);
182 if (bytesToRead_ == 0) {
187 int flags = SPLICE_F_NONBLOCK | SPLICE_F_MORE;
188 ssize_t spliced = ::splice(req_->readFd_, &req_->offset_,
190 bytesToRead_, flags);
192 if (errno == EAGAIN) {
195 req_->fail(__func__, AsyncSocketException(
196 AsyncSocketException::INTERNAL_ERROR,
197 "splice failed", errno));
203 bytesToRead_ -= spliced;
205 req_->queue_.putMessage(static_cast<size_t>(spliced));
207 req_->fail(__func__, AsyncSocketException(
208 AsyncSocketException::INTERNAL_ERROR,
209 "putMessage failed"));