8af868f7b79e586bedf18a3d706e67eac2658363
[folly.git] / folly / io / async / AsyncTransport.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <memory>
20
21 #include <folly/io/IOBuf.h>
22 #include <folly/io/async/AsyncSocketBase.h>
23 #include <folly/io/async/DelayedDestruction.h>
24 #include <folly/io/async/EventBase.h>
25 #include <folly/io/async/ssl/OpenSSLPtrTypes.h>
26 #include <folly/portability/SysUio.h>
27
28 #include <openssl/ssl.h>
29
30 constexpr bool kOpenSslModeMoveBufferOwnership =
31 #ifdef SSL_MODE_MOVE_BUFFER_OWNERSHIP
32   true
33 #else
34   false
35 #endif
36 ;
37
38 namespace folly {
39
40 class AsyncSocketException;
41 class EventBase;
42 class SocketAddress;
43
44 /*
45  * flags given by the application for write* calls
46  */
47 enum class WriteFlags : uint32_t {
48   NONE = 0x00,
49   /*
50    * Whether to delay the output until a subsequent non-corked write.
51    * (Note: may not be supported in all subclasses or on all platforms.)
52    */
53   CORK = 0x01,
54   /*
55    * for a socket that has ACK latency enabled, it will cause the kernel
56    * to fire a TCP ESTATS event when the last byte of the given write call
57    * will be acknowledged.
58    */
59   EOR = 0x02,
60   /*
61    * this indicates that only the write side of socket should be shutdown
62    */
63   WRITE_SHUTDOWN = 0x04,
64 };
65
66 /*
67  * union operator
68  */
69 inline WriteFlags operator|(WriteFlags a, WriteFlags b) {
70   return static_cast<WriteFlags>(
71     static_cast<uint32_t>(a) | static_cast<uint32_t>(b));
72 }
73
74 /*
75  * compound assignment union operator
76  */
77 inline WriteFlags& operator|=(WriteFlags& a, WriteFlags b) {
78   a = a | b;
79   return a;
80 }
81
82 /*
83  * intersection operator
84  */
85 inline WriteFlags operator&(WriteFlags a, WriteFlags b) {
86   return static_cast<WriteFlags>(
87     static_cast<uint32_t>(a) & static_cast<uint32_t>(b));
88 }
89
90 /*
91  * compound assignment intersection operator
92  */
93 inline WriteFlags& operator&=(WriteFlags& a, WriteFlags b) {
94   a = a & b;
95   return a;
96 }
97
98 /*
99  * exclusion parameter
100  */
101 inline WriteFlags operator~(WriteFlags a) {
102   return static_cast<WriteFlags>(~static_cast<uint32_t>(a));
103 }
104
105 /*
106  * unset operator
107  */
108 inline WriteFlags unSet(WriteFlags a, WriteFlags b) {
109   return a & ~b;
110 }
111
112 /*
113  * inclusion operator
114  */
115 inline bool isSet(WriteFlags a, WriteFlags b) {
116   return (a & b) == b;
117 }
118
119
120 /**
121  * AsyncTransport defines an asynchronous API for streaming I/O.
122  *
123  * This class provides an API to for asynchronously waiting for data
124  * on a streaming transport, and for asynchronously sending data.
125  *
126  * The APIs for reading and writing are intentionally asymmetric.  Waiting for
127  * data to read is a persistent API: a callback is installed, and is notified
128  * whenever new data is available.  It continues to be notified of new events
129  * until it is uninstalled.
130  *
131  * AsyncTransport does not provide read timeout functionality, because it
132  * typically cannot determine when the timeout should be active.  Generally, a
133  * timeout should only be enabled when processing is blocked waiting on data
134  * from the remote endpoint.  For server-side applications, the timeout should
135  * not be active if the server is currently processing one or more outstanding
136  * requests on this transport.  For client-side applications, the timeout
137  * should not be active if there are no requests pending on the transport.
138  * Additionally, if a client has multiple pending requests, it will ususally
139  * want a separate timeout for each request, rather than a single read timeout.
140  *
141  * The write API is fairly intuitive: a user can request to send a block of
142  * data, and a callback will be informed once the entire block has been
143  * transferred to the kernel, or on error.  AsyncTransport does provide a send
144  * timeout, since most callers want to give up if the remote end stops
145  * responding and no further progress can be made sending the data.
146  */
147 class AsyncTransport : public DelayedDestruction, public AsyncSocketBase {
148  public:
149   typedef std::unique_ptr<AsyncTransport, Destructor> UniquePtr;
150
151   /**
152    * Close the transport.
153    *
154    * This gracefully closes the transport, waiting for all pending write
155    * requests to complete before actually closing the underlying transport.
156    *
157    * If a read callback is set, readEOF() will be called immediately.  If there
158    * are outstanding write requests, the close will be delayed until all
159    * remaining writes have completed.  No new writes may be started after
160    * close() has been called.
161    */
162   virtual void close() = 0;
163
164   /**
165    * Close the transport immediately.
166    *
167    * This closes the transport immediately, dropping any outstanding data
168    * waiting to be written.
169    *
170    * If a read callback is set, readEOF() will be called immediately.
171    * If there are outstanding write requests, these requests will be aborted
172    * and writeError() will be invoked immediately on all outstanding write
173    * callbacks.
174    */
175   virtual void closeNow() = 0;
176
177   /**
178    * Reset the transport immediately.
179    *
180    * This closes the transport immediately, sending a reset to the remote peer
181    * if possible to indicate abnormal shutdown.
182    *
183    * Note that not all subclasses implement this reset functionality: some
184    * subclasses may treat reset() the same as closeNow().  Subclasses that use
185    * TCP transports should terminate the connection with a TCP reset.
186    */
187   virtual void closeWithReset() {
188     closeNow();
189   }
190
191   /**
192    * Perform a half-shutdown of the write side of the transport.
193    *
194    * The caller should not make any more calls to write() or writev() after
195    * shutdownWrite() is called.  Any future write attempts will fail
196    * immediately.
197    *
198    * Not all transport types support half-shutdown.  If the underlying
199    * transport does not support half-shutdown, it will fully shutdown both the
200    * read and write sides of the transport.  (Fully shutting down the socket is
201    * better than doing nothing at all, since the caller may rely on the
202    * shutdownWrite() call to notify the other end of the connection that no
203    * more data can be read.)
204    *
205    * If there is pending data still waiting to be written on the transport,
206    * the actual shutdown will be delayed until the pending data has been
207    * written.
208    *
209    * Note: There is no corresponding shutdownRead() equivalent.  Simply
210    * uninstall the read callback if you wish to stop reading.  (On TCP sockets
211    * at least, shutting down the read side of the socket is a no-op anyway.)
212    */
213   virtual void shutdownWrite() = 0;
214
215   /**
216    * Perform a half-shutdown of the write side of the transport.
217    *
218    * shutdownWriteNow() is identical to shutdownWrite(), except that it
219    * immediately performs the shutdown, rather than waiting for pending writes
220    * to complete.  Any pending write requests will be immediately failed when
221    * shutdownWriteNow() is called.
222    */
223   virtual void shutdownWriteNow() = 0;
224
225   /**
226    * Determine if transport is open and ready to read or write.
227    *
228    * Note that this function returns false on EOF; you must also call error()
229    * to distinguish between an EOF and an error.
230    *
231    * @return  true iff the transport is open and ready, false otherwise.
232    */
233   virtual bool good() const = 0;
234
235   /**
236    * Determine if the transport is readable or not.
237    *
238    * @return  true iff the transport is readable, false otherwise.
239    */
240   virtual bool readable() const = 0;
241
242   /**
243    * Determine if the there is pending data on the transport.
244    *
245    * @return  true iff the if the there is pending data, false otherwise.
246    */
247   virtual bool isPending() const {
248     return readable();
249   }
250
251   /**
252    * Determine if transport is connected to the endpoint
253    *
254    * @return  false iff the transport is connected, otherwise true
255    */
256   virtual bool connecting() const = 0;
257
258   /**
259    * Determine if an error has occurred with this transport.
260    *
261    * @return  true iff an error has occurred (not EOF).
262    */
263   virtual bool error() const = 0;
264
265   /**
266    * Attach the transport to a EventBase.
267    *
268    * This may only be called if the transport is not currently attached to a
269    * EventBase (by an earlier call to detachEventBase()).
270    *
271    * This method must be invoked in the EventBase's thread.
272    */
273   virtual void attachEventBase(EventBase* eventBase) = 0;
274
275   /**
276    * Detach the transport from its EventBase.
277    *
278    * This may only be called when the transport is idle and has no reads or
279    * writes pending.  Once detached, the transport may not be used again until
280    * it is re-attached to a EventBase by calling attachEventBase().
281    *
282    * This method must be called from the current EventBase's thread.
283    */
284   virtual void detachEventBase() = 0;
285
286   /**
287    * Determine if the transport can be detached.
288    *
289    * This method must be called from the current EventBase's thread.
290    */
291   virtual bool isDetachable() const = 0;
292
293   /**
294    * Set the send timeout.
295    *
296    * If write requests do not make any progress for more than the specified
297    * number of milliseconds, fail all pending writes and close the transport.
298    *
299    * If write requests are currently pending when setSendTimeout() is called,
300    * the timeout interval is immediately restarted using the new value.
301    *
302    * @param milliseconds  The timeout duration, in milliseconds.  If 0, no
303    *                      timeout will be used.
304    */
305   virtual void setSendTimeout(uint32_t milliseconds) = 0;
306
307   /**
308    * Get the send timeout.
309    *
310    * @return Returns the current send timeout, in milliseconds.  A return value
311    *         of 0 indicates that no timeout is set.
312    */
313   virtual uint32_t getSendTimeout() const = 0;
314
315   /**
316    * Get the address of the local endpoint of this transport.
317    *
318    * This function may throw AsyncSocketException on error.
319    *
320    * @param address  The local address will be stored in the specified
321    *                 SocketAddress.
322    */
323   virtual void getLocalAddress(SocketAddress* address) const = 0;
324
325   virtual void getAddress(SocketAddress* address) const {
326     getLocalAddress(address);
327   }
328
329   /**
330    * Get the address of the remote endpoint to which this transport is
331    * connected.
332    *
333    * This function may throw AsyncSocketException on error.
334    *
335    * @param address  The remote endpoint's address will be stored in the
336    *                 specified SocketAddress.
337    */
338   virtual void getPeerAddress(SocketAddress* address) const = 0;
339
340   /**
341    * Get the certificate used to authenticate the peer.
342    */
343   virtual ssl::X509UniquePtr getPeerCert() const { return nullptr; }
344
345   /**
346    * The local certificate used for this connection. May be null
347    */
348   virtual const X509* getSelfCert() const {
349     return nullptr;
350   }
351
352   /**
353    * @return True iff end of record tracking is enabled
354    */
355   virtual bool isEorTrackingEnabled() const = 0;
356
357   virtual void setEorTracking(bool track) = 0;
358
359   virtual size_t getAppBytesWritten() const = 0;
360   virtual size_t getRawBytesWritten() const = 0;
361   virtual size_t getAppBytesReceived() const = 0;
362   virtual size_t getRawBytesReceived() const = 0;
363
364   class BufferCallback {
365    public:
366     virtual ~BufferCallback() {}
367     virtual void onEgressBuffered() = 0;
368     virtual void onEgressBufferCleared() = 0;
369   };
370
371   /**
372    * Callback class to signal when a transport that did not have replay
373    * protection gains replay protection. This is needed for 0-RTT security
374    * protocols.
375    */
376   class ReplaySafetyCallback {
377    public:
378     virtual ~ReplaySafetyCallback() = default;
379
380     /**
381      * Called when the transport becomes replay safe.
382      */
383     virtual void onReplaySafe() = 0;
384   };
385
386   /**
387    * False if the transport does not have replay protection, but will in the
388    * future.
389    */
390   virtual bool isReplaySafe() const { return true; }
391
392   /**
393    * Set the ReplaySafeCallback on this transport.
394    *
395    * This should only be called if isReplaySafe() returns false.
396    */
397   virtual void setReplaySafetyCallback(ReplaySafetyCallback* callback) {
398     if (callback) {
399       CHECK(false) << "setReplaySafetyCallback() not supported";
400     }
401   }
402
403  protected:
404   virtual ~AsyncTransport() = default;
405 };
406
407 class AsyncReader {
408  public:
409   class ReadCallback {
410    public:
411     virtual ~ReadCallback() = default;
412
413     /**
414      * When data becomes available, getReadBuffer() will be invoked to get the
415      * buffer into which data should be read.
416      *
417      * This method allows the ReadCallback to delay buffer allocation until
418      * data becomes available.  This allows applications to manage large
419      * numbers of idle connections, without having to maintain a separate read
420      * buffer for each idle connection.
421      *
422      * It is possible that in some cases, getReadBuffer() may be called
423      * multiple times before readDataAvailable() is invoked.  In this case, the
424      * data will be written to the buffer returned from the most recent call to
425      * readDataAvailable().  If the previous calls to readDataAvailable()
426      * returned different buffers, the ReadCallback is responsible for ensuring
427      * that they are not leaked.
428      *
429      * If getReadBuffer() throws an exception, returns a nullptr buffer, or
430      * returns a 0 length, the ReadCallback will be uninstalled and its
431      * readError() method will be invoked.
432      *
433      * getReadBuffer() is not allowed to change the transport state before it
434      * returns.  (For example, it should never uninstall the read callback, or
435      * set a different read callback.)
436      *
437      * @param bufReturn getReadBuffer() should update *bufReturn to contain the
438      *                  address of the read buffer.  This parameter will never
439      *                  be nullptr.
440      * @param lenReturn getReadBuffer() should update *lenReturn to contain the
441      *                  maximum number of bytes that may be written to the read
442      *                  buffer.  This parameter will never be nullptr.
443      */
444     virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0;
445
446     /**
447      * readDataAvailable() will be invoked when data has been successfully read
448      * into the buffer returned by the last call to getReadBuffer().
449      *
450      * The read callback remains installed after readDataAvailable() returns.
451      * It must be explicitly uninstalled to stop receiving read events.
452      * getReadBuffer() will be called at least once before each call to
453      * readDataAvailable().  getReadBuffer() will also be called before any
454      * call to readEOF().
455      *
456      * @param len       The number of bytes placed in the buffer.
457      */
458
459     virtual void readDataAvailable(size_t len) noexcept = 0;
460
461     /**
462      * When data becomes available, isBufferMovable() will be invoked to figure
463      * out which API will be used, readBufferAvailable() or
464      * readDataAvailable(). If isBufferMovable() returns true, that means
465      * ReadCallback supports the IOBuf ownership transfer and
466      * readBufferAvailable() will be used.  Otherwise, not.
467
468      * By default, isBufferMovable() always return false. If
469      * readBufferAvailable() is implemented and to be invoked, You should
470      * overwrite isBufferMovable() and return true in the inherited class.
471      *
472      * This method allows the AsyncSocket/AsyncSSLSocket do buffer allocation by
473      * itself until data becomes available.  Compared with the pre/post buffer
474      * allocation in getReadBuffer()/readDataAvailabe(), readBufferAvailable()
475      * has two advantages.  First, this can avoid memcpy. E.g., in
476      * AsyncSSLSocket, the decrypted data was copied from the openssl internal
477      * buffer to the readbuf buffer.  With the buffer ownership transfer, the
478      * internal buffer can be directly "moved" to ReadCallback. Second, the
479      * memory allocation can be more precise.  The reason is
480      * AsyncSocket/AsyncSSLSocket can allocate the memory of precise size
481      * because they have more context about the available data than
482      * ReadCallback.  Think about the getReadBuffer() pre-allocate 4072 bytes
483      * buffer, but the available data is always 16KB (max OpenSSL record size).
484      */
485
486     virtual bool isBufferMovable() noexcept {
487       return false;
488     }
489
490     /**
491      * Suggested buffer size, allocated for read operations,
492      * if callback is movable and supports folly::IOBuf
493      */
494
495     virtual size_t maxBufferSize() const {
496       return 64 * 1024; // 64K
497     }
498
499     /**
500      * readBufferAvailable() will be invoked when data has been successfully
501      * read.
502      *
503      * Note that only either readBufferAvailable() or readDataAvailable() will
504      * be invoked according to the return value of isBufferMovable(). The timing
505      * and aftereffect of readBufferAvailable() are the same as
506      * readDataAvailable()
507      *
508      * @param readBuf The unique pointer of read buffer.
509      */
510
511     virtual void readBufferAvailable(std::unique_ptr<IOBuf> /*readBuf*/)
512       noexcept {}
513
514     /**
515      * readEOF() will be invoked when the transport is closed.
516      *
517      * The read callback will be automatically uninstalled immediately before
518      * readEOF() is invoked.
519      */
520     virtual void readEOF() noexcept = 0;
521
522     /**
523      * readError() will be invoked if an error occurs reading from the
524      * transport.
525      *
526      * The read callback will be automatically uninstalled immediately before
527      * readError() is invoked.
528      *
529      * @param ex        An exception describing the error that occurred.
530      */
531     virtual void readErr(const AsyncSocketException& ex) noexcept = 0;
532   };
533
534   // Read methods that aren't part of AsyncTransport.
535   virtual void setReadCB(ReadCallback* callback) = 0;
536   virtual ReadCallback* getReadCallback() const = 0;
537
538  protected:
539   virtual ~AsyncReader() = default;
540 };
541
542 class AsyncWriter {
543  public:
544   class WriteCallback {
545    public:
546     virtual ~WriteCallback() = default;
547
548     /**
549      * writeSuccess() will be invoked when all of the data has been
550      * successfully written.
551      *
552      * Note that this mainly signals that the buffer containing the data to
553      * write is no longer needed and may be freed or re-used.  It does not
554      * guarantee that the data has been fully transmitted to the remote
555      * endpoint.  For example, on socket-based transports, writeSuccess() only
556      * indicates that the data has been given to the kernel for eventual
557      * transmission.
558      */
559     virtual void writeSuccess() noexcept = 0;
560
561     /**
562      * writeError() will be invoked if an error occurs writing the data.
563      *
564      * @param bytesWritten      The number of bytes that were successfull
565      * @param ex                An exception describing the error that occurred.
566      */
567     virtual void writeErr(size_t bytesWritten,
568                           const AsyncSocketException& ex) noexcept = 0;
569   };
570
571   // Write methods that aren't part of AsyncTransport
572   virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
573                      WriteFlags flags = WriteFlags::NONE) = 0;
574   virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
575                       WriteFlags flags = WriteFlags::NONE) = 0;
576   virtual void writeChain(WriteCallback* callback,
577                           std::unique_ptr<IOBuf>&& buf,
578                           WriteFlags flags = WriteFlags::NONE) = 0;
579
580  protected:
581   virtual ~AsyncWriter() = default;
582 };
583
584 // Transitional intermediate interface. This is deprecated.
585 // Wrapper around folly::AsyncTransport, that includes read/write callbacks
586 class AsyncTransportWrapper : virtual public AsyncTransport,
587                               virtual public AsyncReader,
588                               virtual public AsyncWriter {
589  public:
590   using UniquePtr = std::unique_ptr<AsyncTransportWrapper, Destructor>;
591
592   // Alias for inherited members from AsyncReader and AsyncWriter
593   // to keep compatibility.
594   using ReadCallback    = AsyncReader::ReadCallback;
595   using WriteCallback   = AsyncWriter::WriteCallback;
596   virtual void setReadCB(ReadCallback* callback) override = 0;
597   virtual ReadCallback* getReadCallback() const override = 0;
598   virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
599                      WriteFlags flags = WriteFlags::NONE) override = 0;
600   virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
601                       WriteFlags flags = WriteFlags::NONE) override = 0;
602   virtual void writeChain(WriteCallback* callback,
603                           std::unique_ptr<IOBuf>&& buf,
604                           WriteFlags flags = WriteFlags::NONE) override = 0;
605   /**
606    * The transport wrapper may wrap another transport. This returns the
607    * transport that is wrapped. It returns nullptr if there is no wrapped
608    * transport.
609    */
610   virtual const AsyncTransportWrapper* getWrappedTransport() const {
611     return nullptr;
612   }
613
614   /**
615    * In many cases when we need to set socket properties or otherwise access the
616    * underlying transport from a wrapped transport. This method allows access to
617    * the derived classes of the underlying transport.
618    */
619   template <class T>
620   const T* getUnderlyingTransport() const {
621     const AsyncTransportWrapper* current = this;
622     while (current) {
623       auto sock = dynamic_cast<const T*>(current);
624       if (sock) {
625         return sock;
626       }
627       current = current->getWrappedTransport();
628     }
629     return nullptr;
630   }
631
632   template <class T>
633   T* getUnderlyingTransport() {
634     return const_cast<T*>(static_cast<const AsyncTransportWrapper*>(this)
635         ->getUnderlyingTransport<T>());
636   }
637
638   /**
639    * Return the application protocol being used by the underlying transport
640    * protocol. This is useful for transports which are used to tunnel other
641    * protocols.
642    */
643   virtual std::string getApplicationProtocol() noexcept {
644     return "";
645   }
646
647   /**
648    * Returns the name of the security protocol being used.
649    */
650   virtual std::string getSecurityProtocol() const { return ""; }
651 };
652
653 } // folly