Allow underlying transport to be accessible from AsyncTransportWrapper
[folly.git] / folly / io / async / AsyncTransport.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <memory>
20 #include <sys/uio.h>
21
22 #include <folly/io/async/DelayedDestruction.h>
23 #include <folly/io/async/EventBase.h>
24 #include <folly/io/async/AsyncSocketBase.h>
25
26 #include <openssl/ssl.h>
27
28 constexpr bool kOpenSslModeMoveBufferOwnership =
29 #ifdef SSL_MODE_MOVE_BUFFER_OWNERSHIP
30   true
31 #else
32   false
33 #endif
34 ;
35
36 namespace folly {
37
38 class AsyncSocketException;
39 class EventBase;
40 class IOBuf;
41 class SocketAddress;
42
43 /*
44  * flags given by the application for write* calls
45  */
46 enum class WriteFlags : uint32_t {
47   NONE = 0x00,
48   /*
49    * Whether to delay the output until a subsequent non-corked write.
50    * (Note: may not be supported in all subclasses or on all platforms.)
51    */
52   CORK = 0x01,
53   /*
54    * for a socket that has ACK latency enabled, it will cause the kernel
55    * to fire a TCP ESTATS event when the last byte of the given write call
56    * will be acknowledged.
57    */
58   EOR = 0x02,
59   /*
60    * this indicates that only the write side of socket should be shutdown
61    */
62   WRITE_SHUTDOWN = 0x04,
63 };
64
65 /*
66  * union operator
67  */
68 inline WriteFlags operator|(WriteFlags a, WriteFlags b) {
69   return static_cast<WriteFlags>(
70     static_cast<uint32_t>(a) | static_cast<uint32_t>(b));
71 }
72
73 /*
74  * intersection operator
75  */
76 inline WriteFlags operator&(WriteFlags a, WriteFlags b) {
77   return static_cast<WriteFlags>(
78     static_cast<uint32_t>(a) & static_cast<uint32_t>(b));
79 }
80
81 /*
82  * exclusion parameter
83  */
84 inline WriteFlags operator~(WriteFlags a) {
85   return static_cast<WriteFlags>(~static_cast<uint32_t>(a));
86 }
87
88 /*
89  * unset operator
90  */
91 inline WriteFlags unSet(WriteFlags a, WriteFlags b) {
92   return a & ~b;
93 }
94
95 /*
96  * inclusion operator
97  */
98 inline bool isSet(WriteFlags a, WriteFlags b) {
99   return (a & b) == b;
100 }
101
102
103 /**
104  * AsyncTransport defines an asynchronous API for streaming I/O.
105  *
106  * This class provides an API to for asynchronously waiting for data
107  * on a streaming transport, and for asynchronously sending data.
108  *
109  * The APIs for reading and writing are intentionally asymmetric.  Waiting for
110  * data to read is a persistent API: a callback is installed, and is notified
111  * whenever new data is available.  It continues to be notified of new events
112  * until it is uninstalled.
113  *
114  * AsyncTransport does not provide read timeout functionality, because it
115  * typically cannot determine when the timeout should be active.  Generally, a
116  * timeout should only be enabled when processing is blocked waiting on data
117  * from the remote endpoint.  For server-side applications, the timeout should
118  * not be active if the server is currently processing one or more outstanding
119  * requests on this transport.  For client-side applications, the timeout
120  * should not be active if there are no requests pending on the transport.
121  * Additionally, if a client has multiple pending requests, it will ususally
122  * want a separate timeout for each request, rather than a single read timeout.
123  *
124  * The write API is fairly intuitive: a user can request to send a block of
125  * data, and a callback will be informed once the entire block has been
126  * transferred to the kernel, or on error.  AsyncTransport does provide a send
127  * timeout, since most callers want to give up if the remote end stops
128  * responding and no further progress can be made sending the data.
129  */
130 class AsyncTransport : public DelayedDestruction, public AsyncSocketBase {
131  public:
132   typedef std::unique_ptr<AsyncTransport, Destructor> UniquePtr;
133
134   /**
135    * Close the transport.
136    *
137    * This gracefully closes the transport, waiting for all pending write
138    * requests to complete before actually closing the underlying transport.
139    *
140    * If a read callback is set, readEOF() will be called immediately.  If there
141    * are outstanding write requests, the close will be delayed until all
142    * remaining writes have completed.  No new writes may be started after
143    * close() has been called.
144    */
145   virtual void close() = 0;
146
147   /**
148    * Close the transport immediately.
149    *
150    * This closes the transport immediately, dropping any outstanding data
151    * waiting to be written.
152    *
153    * If a read callback is set, readEOF() will be called immediately.
154    * If there are outstanding write requests, these requests will be aborted
155    * and writeError() will be invoked immediately on all outstanding write
156    * callbacks.
157    */
158   virtual void closeNow() = 0;
159
160   /**
161    * Reset the transport immediately.
162    *
163    * This closes the transport immediately, sending a reset to the remote peer
164    * if possible to indicate abnormal shutdown.
165    *
166    * Note that not all subclasses implement this reset functionality: some
167    * subclasses may treat reset() the same as closeNow().  Subclasses that use
168    * TCP transports should terminate the connection with a TCP reset.
169    */
170   virtual void closeWithReset() {
171     closeNow();
172   }
173
174   /**
175    * Perform a half-shutdown of the write side of the transport.
176    *
177    * The caller should not make any more calls to write() or writev() after
178    * shutdownWrite() is called.  Any future write attempts will fail
179    * immediately.
180    *
181    * Not all transport types support half-shutdown.  If the underlying
182    * transport does not support half-shutdown, it will fully shutdown both the
183    * read and write sides of the transport.  (Fully shutting down the socket is
184    * better than doing nothing at all, since the caller may rely on the
185    * shutdownWrite() call to notify the other end of the connection that no
186    * more data can be read.)
187    *
188    * If there is pending data still waiting to be written on the transport,
189    * the actual shutdown will be delayed until the pending data has been
190    * written.
191    *
192    * Note: There is no corresponding shutdownRead() equivalent.  Simply
193    * uninstall the read callback if you wish to stop reading.  (On TCP sockets
194    * at least, shutting down the read side of the socket is a no-op anyway.)
195    */
196   virtual void shutdownWrite() = 0;
197
198   /**
199    * Perform a half-shutdown of the write side of the transport.
200    *
201    * shutdownWriteNow() is identical to shutdownWrite(), except that it
202    * immediately performs the shutdown, rather than waiting for pending writes
203    * to complete.  Any pending write requests will be immediately failed when
204    * shutdownWriteNow() is called.
205    */
206   virtual void shutdownWriteNow() = 0;
207
208   /**
209    * Determine if transport is open and ready to read or write.
210    *
211    * Note that this function returns false on EOF; you must also call error()
212    * to distinguish between an EOF and an error.
213    *
214    * @return  true iff the transport is open and ready, false otherwise.
215    */
216   virtual bool good() const = 0;
217
218   /**
219    * Determine if the transport is readable or not.
220    *
221    * @return  true iff the transport is readable, false otherwise.
222    */
223   virtual bool readable() const = 0;
224
225   /**
226    * Determine if the there is pending data on the transport.
227    *
228    * @return  true iff the if the there is pending data, false otherwise.
229    */
230   virtual bool isPending() const {
231     return readable();
232   }
233
234   /**
235    * Determine if transport is connected to the endpoint
236    *
237    * @return  false iff the transport is connected, otherwise true
238    */
239   virtual bool connecting() const = 0;
240
241   /**
242    * Determine if an error has occurred with this transport.
243    *
244    * @return  true iff an error has occurred (not EOF).
245    */
246   virtual bool error() const = 0;
247
248   /**
249    * Attach the transport to a EventBase.
250    *
251    * This may only be called if the transport is not currently attached to a
252    * EventBase (by an earlier call to detachEventBase()).
253    *
254    * This method must be invoked in the EventBase's thread.
255    */
256   virtual void attachEventBase(EventBase* eventBase) = 0;
257
258   /**
259    * Detach the transport from its EventBase.
260    *
261    * This may only be called when the transport is idle and has no reads or
262    * writes pending.  Once detached, the transport may not be used again until
263    * it is re-attached to a EventBase by calling attachEventBase().
264    *
265    * This method must be called from the current EventBase's thread.
266    */
267   virtual void detachEventBase() = 0;
268
269   /**
270    * Determine if the transport can be detached.
271    *
272    * This method must be called from the current EventBase's thread.
273    */
274   virtual bool isDetachable() const = 0;
275
276   /**
277    * Set the send timeout.
278    *
279    * If write requests do not make any progress for more than the specified
280    * number of milliseconds, fail all pending writes and close the transport.
281    *
282    * If write requests are currently pending when setSendTimeout() is called,
283    * the timeout interval is immediately restarted using the new value.
284    *
285    * @param milliseconds  The timeout duration, in milliseconds.  If 0, no
286    *                      timeout will be used.
287    */
288   virtual void setSendTimeout(uint32_t milliseconds) = 0;
289
290   /**
291    * Get the send timeout.
292    *
293    * @return Returns the current send timeout, in milliseconds.  A return value
294    *         of 0 indicates that no timeout is set.
295    */
296   virtual uint32_t getSendTimeout() const = 0;
297
298   /**
299    * Get the address of the local endpoint of this transport.
300    *
301    * This function may throw AsyncSocketException on error.
302    *
303    * @param address  The local address will be stored in the specified
304    *                 SocketAddress.
305    */
306   virtual void getLocalAddress(SocketAddress* address) const = 0;
307
308   virtual void getAddress(SocketAddress* address) const {
309     getLocalAddress(address);
310   }
311
312   /**
313    * Get the address of the remote endpoint to which this transport is
314    * connected.
315    *
316    * This function may throw AsyncSocketException on error.
317    *
318    * @param address  The remote endpoint's address will be stored in the
319    *                 specified SocketAddress.
320    */
321   virtual void getPeerAddress(SocketAddress* address) const = 0;
322
323   /**
324    * @return True iff end of record tracking is enabled
325    */
326   virtual bool isEorTrackingEnabled() const = 0;
327
328   virtual void setEorTracking(bool track) = 0;
329
330   virtual size_t getAppBytesWritten() const = 0;
331   virtual size_t getRawBytesWritten() const = 0;
332   virtual size_t getAppBytesReceived() const = 0;
333   virtual size_t getRawBytesReceived() const = 0;
334
335  protected:
336   virtual ~AsyncTransport() = default;
337 };
338
339 class AsyncReader {
340  public:
341   class ReadCallback {
342    public:
343     virtual ~ReadCallback() = default;
344
345     /**
346      * When data becomes available, getReadBuffer() will be invoked to get the
347      * buffer into which data should be read.
348      *
349      * This method allows the ReadCallback to delay buffer allocation until
350      * data becomes available.  This allows applications to manage large
351      * numbers of idle connections, without having to maintain a separate read
352      * buffer for each idle connection.
353      *
354      * It is possible that in some cases, getReadBuffer() may be called
355      * multiple times before readDataAvailable() is invoked.  In this case, the
356      * data will be written to the buffer returned from the most recent call to
357      * readDataAvailable().  If the previous calls to readDataAvailable()
358      * returned different buffers, the ReadCallback is responsible for ensuring
359      * that they are not leaked.
360      *
361      * If getReadBuffer() throws an exception, returns a nullptr buffer, or
362      * returns a 0 length, the ReadCallback will be uninstalled and its
363      * readError() method will be invoked.
364      *
365      * getReadBuffer() is not allowed to change the transport state before it
366      * returns.  (For example, it should never uninstall the read callback, or
367      * set a different read callback.)
368      *
369      * @param bufReturn getReadBuffer() should update *bufReturn to contain the
370      *                  address of the read buffer.  This parameter will never
371      *                  be nullptr.
372      * @param lenReturn getReadBuffer() should update *lenReturn to contain the
373      *                  maximum number of bytes that may be written to the read
374      *                  buffer.  This parameter will never be nullptr.
375      */
376     virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0;
377
378     /**
379      * readDataAvailable() will be invoked when data has been successfully read
380      * into the buffer returned by the last call to getReadBuffer().
381      *
382      * The read callback remains installed after readDataAvailable() returns.
383      * It must be explicitly uninstalled to stop receiving read events.
384      * getReadBuffer() will be called at least once before each call to
385      * readDataAvailable().  getReadBuffer() will also be called before any
386      * call to readEOF().
387      *
388      * @param len       The number of bytes placed in the buffer.
389      */
390
391     virtual void readDataAvailable(size_t len) noexcept = 0;
392
393     /**
394      * When data becomes available, isBufferMovable() will be invoked to figure
395      * out which API will be used, readBufferAvailable() or
396      * readDataAvailable(). If isBufferMovable() returns true, that means
397      * ReadCallback supports the IOBuf ownership transfer and
398      * readBufferAvailable() will be used.  Otherwise, not.
399
400      * By default, isBufferMovable() always return false. If
401      * readBufferAvailable() is implemented and to be invoked, You should
402      * overwrite isBufferMovable() and return true in the inherited class.
403      *
404      * This method allows the AsyncSocket/AsyncSSLSocket do buffer allocation by
405      * itself until data becomes available.  Compared with the pre/post buffer
406      * allocation in getReadBuffer()/readDataAvailabe(), readBufferAvailable()
407      * has two advantages.  First, this can avoid memcpy. E.g., in
408      * AsyncSSLSocket, the decrypted data was copied from the openssl internal
409      * buffer to the readbuf buffer.  With the buffer ownership transfer, the
410      * internal buffer can be directly "moved" to ReadCallback. Second, the
411      * memory allocation can be more precise.  The reason is
412      * AsyncSocket/AsyncSSLSocket can allocate the memory of precise size
413      * because they have more context about the available data than
414      * ReadCallback.  Think about the getReadBuffer() pre-allocate 4072 bytes
415      * buffer, but the available data is always 16KB (max OpenSSL record size).
416      */
417
418     virtual bool isBufferMovable() noexcept {
419       return false;
420     }
421
422     /**
423      * readBufferAvailable() will be invoked when data has been successfully
424      * read.
425      *
426      * Note that only either readBufferAvailable() or readDataAvailable() will
427      * be invoked according to the return value of isBufferMovable(). The timing
428      * and aftereffect of readBufferAvailable() are the same as
429      * readDataAvailable()
430      *
431      * @param readBuf The unique pointer of read buffer.
432      */
433
434     virtual void readBufferAvailable(std::unique_ptr<IOBuf> /*readBuf*/)
435       noexcept {};
436
437     /**
438      * readEOF() will be invoked when the transport is closed.
439      *
440      * The read callback will be automatically uninstalled immediately before
441      * readEOF() is invoked.
442      */
443     virtual void readEOF() noexcept = 0;
444
445     /**
446      * readError() will be invoked if an error occurs reading from the
447      * transport.
448      *
449      * The read callback will be automatically uninstalled immediately before
450      * readError() is invoked.
451      *
452      * @param ex        An exception describing the error that occurred.
453      */
454     virtual void readErr(const AsyncSocketException& ex) noexcept = 0;
455   };
456
457   // Read methods that aren't part of AsyncTransport.
458   virtual void setReadCB(ReadCallback* callback) = 0;
459   virtual ReadCallback* getReadCallback() const = 0;
460
461  protected:
462   virtual ~AsyncReader() = default;
463 };
464
465 class AsyncWriter {
466  public:
467   class WriteCallback {
468    public:
469     virtual ~WriteCallback() = default;
470
471     /**
472      * writeSuccess() will be invoked when all of the data has been
473      * successfully written.
474      *
475      * Note that this mainly signals that the buffer containing the data to
476      * write is no longer needed and may be freed or re-used.  It does not
477      * guarantee that the data has been fully transmitted to the remote
478      * endpoint.  For example, on socket-based transports, writeSuccess() only
479      * indicates that the data has been given to the kernel for eventual
480      * transmission.
481      */
482     virtual void writeSuccess() noexcept = 0;
483
484     /**
485      * writeError() will be invoked if an error occurs writing the data.
486      *
487      * @param bytesWritten      The number of bytes that were successfull
488      * @param ex                An exception describing the error that occurred.
489      */
490     virtual void writeErr(size_t bytesWritten,
491                           const AsyncSocketException& ex) noexcept = 0;
492   };
493
494   // Write methods that aren't part of AsyncTransport
495   virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
496                      WriteFlags flags = WriteFlags::NONE) = 0;
497   virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
498                       WriteFlags flags = WriteFlags::NONE) = 0;
499   virtual void writeChain(WriteCallback* callback,
500                           std::unique_ptr<IOBuf>&& buf,
501                           WriteFlags flags = WriteFlags::NONE) = 0;
502
503  protected:
504   virtual ~AsyncWriter() = default;
505 };
506
507 // Transitional intermediate interface. This is deprecated.
508 // Wrapper around folly::AsyncTransport, that includes read/write callbacks
509 class AsyncTransportWrapper : virtual public AsyncTransport,
510                               virtual public AsyncReader,
511                               virtual public AsyncWriter {
512  public:
513   using UniquePtr = std::unique_ptr<AsyncTransportWrapper, Destructor>;
514
515   // Alias for inherited members from AsyncReader and AsyncWriter
516   // to keep compatibility.
517   using ReadCallback    = AsyncReader::ReadCallback;
518   using WriteCallback   = AsyncWriter::WriteCallback;
519   virtual void setReadCB(ReadCallback* callback) override = 0;
520   virtual ReadCallback* getReadCallback() const override = 0;
521   virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
522                      WriteFlags flags = WriteFlags::NONE) override = 0;
523   virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
524                       WriteFlags flags = WriteFlags::NONE) override = 0;
525   virtual void writeChain(WriteCallback* callback,
526                           std::unique_ptr<IOBuf>&& buf,
527                           WriteFlags flags = WriteFlags::NONE) override = 0;
528   /**
529    * The transport wrapper may wrap another transport. This returns the
530    * transport that is wrapped. It returns nullptr if there is no wrapped
531    * transport.
532    */
533   virtual AsyncTransportWrapper* getWrappedTransport() {
534     return nullptr;
535   }
536 };
537
538 } // folly