Return if we handle any error messages to avoid unnecessarily calling recv/send
[folly.git] / folly / io / async / AsyncPipe.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <list>
20 #include <system_error>
21
22 #include <folly/io/IOBufQueue.h>
23 #include <folly/io/async/AsyncTransport.h>
24 #include <folly/io/async/DelayedDestruction.h>
25 #include <folly/io/async/EventHandler.h>
26
27 namespace folly {
28
29 class AsyncSocketException;
30
31 /**
32  * Read from a pipe in an async manner.
33  */
34 class AsyncPipeReader : public EventHandler,
35                         public AsyncReader,
36                         public DelayedDestruction {
37  public:
38   typedef std::unique_ptr<AsyncPipeReader,
39                           folly::DelayedDestruction::Destructor> UniquePtr;
40
41   template <typename... Args>
42   static UniquePtr newReader(Args&&... args) {
43     return UniquePtr(new AsyncPipeReader(std::forward<Args>(args)...));
44   }
45
46   AsyncPipeReader(folly::EventBase* eventBase, int pipeFd)
47     : EventHandler(eventBase, pipeFd),
48     fd_(pipeFd) {}
49
50   /**
51    * Set the read callback and automatically install/uninstall the handler
52    * for events.
53    */
54   void setReadCB(AsyncReader::ReadCallback* callback) override {
55     if (callback == readCallback_) {
56       return;
57     }
58     readCallback_ = callback;
59     if (readCallback_ && !isHandlerRegistered()) {
60       registerHandler(EventHandler::READ | EventHandler::PERSIST);
61     } else if (!readCallback_ && isHandlerRegistered()) {
62       unregisterHandler();
63     }
64   }
65
66   /**
67    * Get the read callback
68    */
69   AsyncReader::ReadCallback* getReadCallback() const override {
70     return readCallback_;
71   }
72
73   /**
74    * Set a special hook to close the socket (otherwise, will call close())
75    */
76   void setCloseCallback(std::function<void(int)> closeCb) {
77     closeCb_ = closeCb;
78   }
79
80  private:
81   ~AsyncPipeReader() override;
82
83   void handlerReady(uint16_t events) noexcept override;
84   void failRead(const AsyncSocketException& ex);
85   void close();
86
87   int fd_;
88   AsyncReader::ReadCallback* readCallback_{nullptr};
89   std::function<void(int)> closeCb_;
90 };
91
92 /**
93  * Write to a pipe in an async manner.
94  */
95 class AsyncPipeWriter : public EventHandler,
96                         public AsyncWriter,
97                         public DelayedDestruction {
98  public:
99   typedef std::unique_ptr<AsyncPipeWriter,
100                           folly::DelayedDestruction::Destructor> UniquePtr;
101
102   template <typename... Args>
103   static UniquePtr newWriter(Args&&... args) {
104     return UniquePtr(new AsyncPipeWriter(std::forward<Args>(args)...));
105   }
106
107   AsyncPipeWriter(folly::EventBase* eventBase, int pipeFd)
108     : EventHandler(eventBase, pipeFd),
109     fd_(pipeFd) {}
110
111   /**
112    * Asynchronously write the given iobuf to this pipe, and invoke the callback
113    * on success/error.
114    */
115   void write(std::unique_ptr<folly::IOBuf> iob,
116              AsyncWriter::WriteCallback* wcb = nullptr);
117
118   /**
119    * Set a special hook to close the socket (otherwise, will call close())
120    */
121   void setCloseCallback(std::function<void(int)> closeCb) {
122     closeCb_ = closeCb;
123   }
124
125   /**
126    * Returns true if the pipe is closed
127    */
128   bool closed() const {
129     return (fd_ < 0 || closeOnEmpty_);
130   }
131
132   /**
133    * Notify the pipe to close as soon as all pending writes complete
134    */
135   void closeOnEmpty();
136
137   /**
138    * Close the pipe immediately, and fail all pending writes
139    */
140   void closeNow();
141
142   /**
143    * Return true if there are currently writes pending (eg: the pipe is blocked
144    * for writing)
145    */
146   bool hasPendingWrites() const {
147     return !queue_.empty();
148   }
149
150   // AsyncWriter methods
151   void write(folly::AsyncWriter::WriteCallback* callback,
152              const void* buf,
153              size_t bytes,
154              WriteFlags flags = WriteFlags::NONE) override {
155     writeChain(callback, IOBuf::wrapBuffer(buf, bytes), flags);
156   }
157   void writev(folly::AsyncWriter::WriteCallback*,
158               const iovec*,
159               size_t,
160               WriteFlags = WriteFlags::NONE) override {
161     throw std::runtime_error("writev is not supported. Please use writeChain.");
162   }
163   void writeChain(folly::AsyncWriter::WriteCallback* callback,
164                   std::unique_ptr<folly::IOBuf>&& buf,
165                   WriteFlags flags = WriteFlags::NONE) override;
166
167  private:
168   void handlerReady(uint16_t events) noexcept override;
169   void handleWrite();
170   void failAllWrites(const AsyncSocketException& ex);
171
172   int fd_;
173   std::list<std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*>> queue_;
174   bool closeOnEmpty_{false};
175   std::function<void(int)> closeCb_;
176
177   ~AsyncPipeWriter() override {
178     closeNow();
179   }
180 };
181
182 } // namespace folly