2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/Singleton.h>
19 #include <folly/io/async/AsyncTransport.h>
20 #include <folly/io/async/AsyncSocket.h>
21 #include <folly/io/async/NotificationQueue.h>
22 #include <folly/futures/Future.h>
23 #include <folly/futures/Promise.h>
24 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
26 namespace folly { namespace wangle {
// Constructs a FileRegion from a file descriptor plus an offset and a count.
// The three arguments are stored verbatim in fd_, offset_ and count_; the
// constructor body is empty, so no I/O or validation happens here.
// NOTE(review): presumably this describes the byte range
// [offset, offset + count) of fd -- confirm against FileWriteRequest.
// NOTE(review): fd is neither duplicated nor closed on the visible lines;
// presumably the caller keeps it open while a transfer is in flight -- confirm.
30 FileRegion(int fd, off_t offset, size_t count)
31 : fd_(fd), offset_(offset), count_(count) {}
// Submits this region as a write request on the given transport.
//
// Visible steps: down-cast to AsyncSocket, allocate a WriteCallback, take the
// future of its promise, allocate a FileWriteRequest carrying fd_, offset_
// and count_, and pass that request to AsyncSocket::writeRequest().
// NOTE(review): the return statement is not visible in this excerpt;
// presumably the future `f` obtained below is what gets returned -- confirm.
33 Future<Unit> transferTo(std::shared_ptr<AsyncTransport> transport) {
// std::dynamic_pointer_cast yields an empty pointer when the dynamic type is
// not AsyncSocket. NOTE(review): the cast's argument and any null check are
// not visible in this excerpt -- confirm one exists before socket is used.
34 auto socket = std::dynamic_pointer_cast<AsyncSocket>(
// Heap-allocated with raw new. NOTE(review): the matching delete is not
// visible here; presumably the callback frees itself once it fires -- confirm.
37 auto cb = new WriteCallback();
// The future is taken from the callback's promise before the request is
// handed to the socket.
38 auto f = cb->promise_.getFuture();
// The request receives a raw AsyncSocket* (socket.get()), not the shared_ptr.
// NOTE(review): who releases `req` is not visible here; FileWriteRequest
// declares destroy() override, which is presumably the release path -- confirm.
39 auto req = new FileWriteRequest(socket.get(), cb, fd_, offset_, count_);
40 socket->writeRequest(req);
// Write-completion callback that carries a folly::Promise<Unit>.
// It inherits privately from AsyncSocket::WriteCallback and befriends
// FileRegion, whose transferTo() reads promise_ directly to obtain the future.
45 class WriteCallback : private AsyncSocket::WriteCallback {
// Success notification. NOTE(review): the body is not visible in this
// excerpt; presumably it fulfils promise_ -- confirm.
46 void writeSuccess() noexcept override {
// Failure notification: the visible line forwards the socket exception `ex`
// to promise_. bytesWritten is not used on any visible line.
51 void writeErr(size_t bytesWritten,
52 const AsyncSocketException& ex)
54 promise_.setException(ex);
// Grants FileRegion access to the members declared in this class.
58 friend class FileRegion;
// Promise whose future FileRegion::transferTo() obtains via getFuture().
59 folly::Promise<Unit> promise_;
// Write request that FileRegion::transferTo() allocates and passes to
// AsyncSocket::writeRequest(). It derives from AsyncSocket::WriteRequest and
// from NotificationQueue<size_t>::Consumer.
// Only declarations are visible here; the member definitions live elsewhere.
// NOTE(review): the names pipe_in and bytesInPipe_ suggest file data is
// staged through a pipe before reaching the socket -- confirm against the
// implementation.
66 class FileWriteRequest : public AsyncSocket::WriteRequest,
67 public NotificationQueue<size_t>::Consumer {
// Parameter order matches the call in transferTo():
// (socket.get(), cb, fd_, offset_, count_).
69 FileWriteRequest(AsyncSocket* socket, WriteCallback* callback,
70 int fd, off_t offset, size_t count);
// The overrides below are declared without bodies; which base class each one
// overrides is not visible in this excerpt.
72 void destroy() override;
// NOTE(review): the meaning of the bool result is not visible here -- confirm
// against AsyncSocket::WriteRequest.
74 bool performWrite() override;
76 void consume() override;
78 bool isComplete() override;
// Takes a size_t by rvalue reference, matching the element type of
// NotificationQueue<size_t>. NOTE(review): presumably the queue consumer
// callback -- confirm.
80 void messageAvailable(size_t&& count) override;
82 void start() override;
// Nested EventHandler subclass; instances are held by readHandler_ below.
84 class FileReadHandler : public folly::EventHandler {
// Takes the owning request, a pipe descriptor and a byte count.
// NOTE(review): presumably pipe_in is the write end of a pipe that this
// handler fills from the file -- confirm.
86 FileReadHandler(FileWriteRequest* req, int pipe_in, size_t bytesToRead);
// EventHandler callback invoked with the ready event mask.
90 void handlerReady(uint16_t events) noexcept override;
// Raw pointer to the request passed to the constructor.
// NOTE(review): presumably non-owning -- confirm.
93 FileWriteRequest* req_;
// Error helper taking a name string and the socket exception.
// NOTE(review): presumably `fn` names the failing call for reporting --
// confirm.
101 void fail(const char* fn, const AsyncSocketException& ex);
// Default-initialised to false.
106 bool started_{false};
// Default-initialised to 0.
109 size_t bytesInPipe_{0};
// No in-class initialiser. NOTE(review): confirm it is assigned before first
// use.
110 folly::EventBase* readBase_;
// Queue of size_t messages; this class is also declared as a Consumer of the
// same queue type.
111 folly::NotificationQueue<size_t> queue_;
// Exclusively owns the FileReadHandler (std::unique_ptr).
112 std::unique_ptr<FileReadHandler> readHandler_;