6360ae3589acd7a2adbda18bf4ace04490064a80
[folly.git] / folly / wangle / channel / FileRegion.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/Singleton.h>
19 #include <folly/io/async/AsyncTransport.h>
20 #include <folly/io/async/AsyncSocket.h>
21 #include <folly/io/async/NotificationQueue.h>
22 #include <folly/futures/Future.h>
23 #include <folly/futures/Promise.h>
24 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
25
26 namespace folly { namespace wangle {
27
28 class FileRegion {
29  public:
30   FileRegion(int fd, off_t offset, size_t count)
31     : fd_(fd), offset_(offset), count_(count) {}
32
33   Future<void> transferTo(std::shared_ptr<AsyncTransport> transport) {
34     auto socket = std::dynamic_pointer_cast<AsyncSocket>(
35         transport);
36     CHECK(socket);
37     auto cb = new WriteCallback();
38     auto f = cb->promise_.getFuture();
39     auto req = new FileWriteRequest(socket.get(), cb, fd_, offset_, count_);
40     socket->writeRequest(req);
41     return f;
42   }
43
44  private:
45   class WriteCallback : private AsyncSocket::WriteCallback {
46     void writeSuccess() noexcept override {
47       promise_.setValue();
48       delete this;
49     }
50
51     void writeErr(size_t bytesWritten,
52                   const AsyncSocketException& ex)
53       noexcept override {
54       promise_.setException(ex);
55       delete this;
56     }
57
58     friend class FileRegion;
59     folly::Promise<void> promise_;
60   };
61
62   const int fd_;
63   const off_t offset_;
64   const size_t count_;
65
66   class FileWriteRequest : public AsyncSocket::WriteRequest,
67                            public NotificationQueue<size_t>::Consumer {
68    public:
69     FileWriteRequest(AsyncSocket* socket, WriteCallback* callback,
70                      int fd, off_t offset, size_t count);
71
72     void destroy() override;
73
74     bool performWrite() override;
75
76     void consume() override;
77
78     bool isComplete() override;
79
80     void messageAvailable(size_t&& count) override;
81
82     void start() override;
83
84     class FileReadHandler : public folly::EventHandler {
85      public:
86       FileReadHandler(FileWriteRequest* req, int pipe_in, size_t bytesToRead);
87
88       ~FileReadHandler();
89
90       void handlerReady(uint16_t events) noexcept override;
91
92      private:
93       FileWriteRequest* req_;
94       int pipe_in_;
95       size_t bytesToRead_;
96     };
97
98    private:
99     ~FileWriteRequest();
100
101     void fail(const char* fn, const AsyncSocketException& ex);
102
103     const int readFd_;
104     off_t offset_;
105     const size_t count_;
106     bool started_{false};
107     int pipe_out_{-1};
108
109     size_t bytesInPipe_{0};
110     folly::EventBase* readBase_;
111     folly::NotificationQueue<size_t> queue_;
112     std::unique_ptr<FileReadHandler> readHandler_;
113   };
114 };
115
116 }} // folly::wangle