6505c287fcaa3faa277d8274f4d05ba1c057914b
[folly.git] / folly / io / async / AsyncTransport.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <memory>
20
21 #include <folly/io/IOBuf.h>
22 #include <folly/io/async/AsyncSocketBase.h>
23 #include <folly/io/async/DelayedDestruction.h>
24 #include <folly/io/async/EventBase.h>
25 #include <folly/io/async/ssl/OpenSSLPtrTypes.h>
26 #include <folly/portability/SysUio.h>
27
28 #include <openssl/ssl.h>
29
30 constexpr bool kOpenSslModeMoveBufferOwnership =
31 #ifdef SSL_MODE_MOVE_BUFFER_OWNERSHIP
32   true
33 #else
34   false
35 #endif
36 ;
37
38 namespace folly {
39
40 class AsyncSocketException;
41 class EventBase;
42 class SocketAddress;
43
44 /*
45  * flags given by the application for write* calls
46  */
47 enum class WriteFlags : uint32_t {
48   NONE = 0x00,
49   /*
50    * Whether to delay the output until a subsequent non-corked write.
51    * (Note: may not be supported in all subclasses or on all platforms.)
52    */
53   CORK = 0x01,
54   /*
55    * for a socket that has ACK latency enabled, it will cause the kernel
56    * to fire a TCP ESTATS event when the last byte of the given write call
57    * will be acknowledged.
58    */
59   EOR = 0x02,
60   /*
61    * this indicates that only the write side of socket should be shutdown
62    */
63   WRITE_SHUTDOWN = 0x04,
64 };
65
66 /*
67  * union operator
68  */
69 inline WriteFlags operator|(WriteFlags a, WriteFlags b) {
70   return static_cast<WriteFlags>(
71     static_cast<uint32_t>(a) | static_cast<uint32_t>(b));
72 }
73
74 /*
75  * compound assignment union operator
76  */
77 inline WriteFlags& operator|=(WriteFlags& a, WriteFlags b) {
78   a = a | b;
79   return a;
80 }
81
82 /*
83  * intersection operator
84  */
85 inline WriteFlags operator&(WriteFlags a, WriteFlags b) {
86   return static_cast<WriteFlags>(
87     static_cast<uint32_t>(a) & static_cast<uint32_t>(b));
88 }
89
90 /*
91  * compound assignment intersection operator
92  */
93 inline WriteFlags& operator&=(WriteFlags& a, WriteFlags b) {
94   a = a & b;
95   return a;
96 }
97
98 /*
99  * exclusion parameter
100  */
101 inline WriteFlags operator~(WriteFlags a) {
102   return static_cast<WriteFlags>(~static_cast<uint32_t>(a));
103 }
104
105 /*
106  * unset operator
107  */
108 inline WriteFlags unSet(WriteFlags a, WriteFlags b) {
109   return a & ~b;
110 }
111
112 /*
113  * inclusion operator
114  */
115 inline bool isSet(WriteFlags a, WriteFlags b) {
116   return (a & b) == b;
117 }
118
119
120 /**
121  * AsyncTransport defines an asynchronous API for streaming I/O.
122  *
123  * This class provides an API to for asynchronously waiting for data
124  * on a streaming transport, and for asynchronously sending data.
125  *
126  * The APIs for reading and writing are intentionally asymmetric.  Waiting for
127  * data to read is a persistent API: a callback is installed, and is notified
128  * whenever new data is available.  It continues to be notified of new events
129  * until it is uninstalled.
130  *
131  * AsyncTransport does not provide read timeout functionality, because it
132  * typically cannot determine when the timeout should be active.  Generally, a
133  * timeout should only be enabled when processing is blocked waiting on data
134  * from the remote endpoint.  For server-side applications, the timeout should
135  * not be active if the server is currently processing one or more outstanding
136  * requests on this transport.  For client-side applications, the timeout
137  * should not be active if there are no requests pending on the transport.
138  * Additionally, if a client has multiple pending requests, it will ususally
139  * want a separate timeout for each request, rather than a single read timeout.
140  *
141  * The write API is fairly intuitive: a user can request to send a block of
142  * data, and a callback will be informed once the entire block has been
143  * transferred to the kernel, or on error.  AsyncTransport does provide a send
144  * timeout, since most callers want to give up if the remote end stops
145  * responding and no further progress can be made sending the data.
146  */
147 class AsyncTransport : public DelayedDestruction, public AsyncSocketBase {
148  public:
149   typedef std::unique_ptr<AsyncTransport, Destructor> UniquePtr;
150
151   /**
152    * Close the transport.
153    *
154    * This gracefully closes the transport, waiting for all pending write
155    * requests to complete before actually closing the underlying transport.
156    *
157    * If a read callback is set, readEOF() will be called immediately.  If there
158    * are outstanding write requests, the close will be delayed until all
159    * remaining writes have completed.  No new writes may be started after
160    * close() has been called.
161    */
162   virtual void close() = 0;
163
164   /**
165    * Close the transport immediately.
166    *
167    * This closes the transport immediately, dropping any outstanding data
168    * waiting to be written.
169    *
170    * If a read callback is set, readEOF() will be called immediately.
171    * If there are outstanding write requests, these requests will be aborted
172    * and writeError() will be invoked immediately on all outstanding write
173    * callbacks.
174    */
175   virtual void closeNow() = 0;
176
177   /**
178    * Reset the transport immediately.
179    *
180    * This closes the transport immediately, sending a reset to the remote peer
181    * if possible to indicate abnormal shutdown.
182    *
183    * Note that not all subclasses implement this reset functionality: some
184    * subclasses may treat reset() the same as closeNow().  Subclasses that use
185    * TCP transports should terminate the connection with a TCP reset.
186    */
187   virtual void closeWithReset() {
188     closeNow();
189   }
190
191   /**
192    * Perform a half-shutdown of the write side of the transport.
193    *
194    * The caller should not make any more calls to write() or writev() after
195    * shutdownWrite() is called.  Any future write attempts will fail
196    * immediately.
197    *
198    * Not all transport types support half-shutdown.  If the underlying
199    * transport does not support half-shutdown, it will fully shutdown both the
200    * read and write sides of the transport.  (Fully shutting down the socket is
201    * better than doing nothing at all, since the caller may rely on the
202    * shutdownWrite() call to notify the other end of the connection that no
203    * more data can be read.)
204    *
205    * If there is pending data still waiting to be written on the transport,
206    * the actual shutdown will be delayed until the pending data has been
207    * written.
208    *
209    * Note: There is no corresponding shutdownRead() equivalent.  Simply
210    * uninstall the read callback if you wish to stop reading.  (On TCP sockets
211    * at least, shutting down the read side of the socket is a no-op anyway.)
212    */
213   virtual void shutdownWrite() = 0;
214
215   /**
216    * Perform a half-shutdown of the write side of the transport.
217    *
218    * shutdownWriteNow() is identical to shutdownWrite(), except that it
219    * immediately performs the shutdown, rather than waiting for pending writes
220    * to complete.  Any pending write requests will be immediately failed when
221    * shutdownWriteNow() is called.
222    */
223   virtual void shutdownWriteNow() = 0;
224
225   /**
226    * Determine if transport is open and ready to read or write.
227    *
228    * Note that this function returns false on EOF; you must also call error()
229    * to distinguish between an EOF and an error.
230    *
231    * @return  true iff the transport is open and ready, false otherwise.
232    */
233   virtual bool good() const = 0;
234
235   /**
236    * Determine if the transport is readable or not.
237    *
238    * @return  true iff the transport is readable, false otherwise.
239    */
240   virtual bool readable() const = 0;
241
242   /**
243    * Determine if the there is pending data on the transport.
244    *
245    * @return  true iff the if the there is pending data, false otherwise.
246    */
247   virtual bool isPending() const {
248     return readable();
249   }
250
251   /**
252    * Determine if transport is connected to the endpoint
253    *
254    * @return  false iff the transport is connected, otherwise true
255    */
256   virtual bool connecting() const = 0;
257
258   /**
259    * Determine if an error has occurred with this transport.
260    *
261    * @return  true iff an error has occurred (not EOF).
262    */
263   virtual bool error() const = 0;
264
265   /**
266    * Attach the transport to a EventBase.
267    *
268    * This may only be called if the transport is not currently attached to a
269    * EventBase (by an earlier call to detachEventBase()).
270    *
271    * This method must be invoked in the EventBase's thread.
272    */
273   virtual void attachEventBase(EventBase* eventBase) = 0;
274
275   /**
276    * Detach the transport from its EventBase.
277    *
278    * This may only be called when the transport is idle and has no reads or
279    * writes pending.  Once detached, the transport may not be used again until
280    * it is re-attached to a EventBase by calling attachEventBase().
281    *
282    * This method must be called from the current EventBase's thread.
283    */
284   virtual void detachEventBase() = 0;
285
286   /**
287    * Determine if the transport can be detached.
288    *
289    * This method must be called from the current EventBase's thread.
290    */
291   virtual bool isDetachable() const = 0;
292
293   /**
294    * Set the send timeout.
295    *
296    * If write requests do not make any progress for more than the specified
297    * number of milliseconds, fail all pending writes and close the transport.
298    *
299    * If write requests are currently pending when setSendTimeout() is called,
300    * the timeout interval is immediately restarted using the new value.
301    *
302    * @param milliseconds  The timeout duration, in milliseconds.  If 0, no
303    *                      timeout will be used.
304    */
305   virtual void setSendTimeout(uint32_t milliseconds) = 0;
306
307   /**
308    * Get the send timeout.
309    *
310    * @return Returns the current send timeout, in milliseconds.  A return value
311    *         of 0 indicates that no timeout is set.
312    */
313   virtual uint32_t getSendTimeout() const = 0;
314
315   /**
316    * Get the address of the local endpoint of this transport.
317    *
318    * This function may throw AsyncSocketException on error.
319    *
320    * @param address  The local address will be stored in the specified
321    *                 SocketAddress.
322    */
323   virtual void getLocalAddress(SocketAddress* address) const = 0;
324
325   /**
326    * Get the address of the remote endpoint to which this transport is
327    * connected.
328    *
329    * This function may throw AsyncSocketException on error.
330    *
331    * @return         Return the local address
332    */
333   SocketAddress getLocalAddress() const {
334     SocketAddress addr;
335     getLocalAddress(&addr);
336     return addr;
337   }
338
339   virtual void getAddress(SocketAddress* address) const {
340     getLocalAddress(address);
341   }
342
343   /**
344    * Get the address of the remote endpoint to which this transport is
345    * connected.
346    *
347    * This function may throw AsyncSocketException on error.
348    *
349    * @param address  The remote endpoint's address will be stored in the
350    *                 specified SocketAddress.
351    */
352   virtual void getPeerAddress(SocketAddress* address) const = 0;
353
354   /**
355    * Get the address of the remote endpoint to which this transport is
356    * connected.
357    *
358    * This function may throw AsyncSocketException on error.
359    *
360    * @return         Return the remote endpoint's address
361    */
362   SocketAddress getPeerAddress() const {
363     SocketAddress addr;
364     getPeerAddress(&addr);
365     return addr;
366   }
367
368   /**
369    * Get the certificate used to authenticate the peer.
370    */
371   virtual ssl::X509UniquePtr getPeerCert() const { return nullptr; }
372
373   /**
374    * The local certificate used for this connection. May be null
375    */
376   virtual const X509* getSelfCert() const {
377     return nullptr;
378   }
379
380   /**
381    * @return True iff end of record tracking is enabled
382    */
383   virtual bool isEorTrackingEnabled() const = 0;
384
385   virtual void setEorTracking(bool track) = 0;
386
387   virtual size_t getAppBytesWritten() const = 0;
388   virtual size_t getRawBytesWritten() const = 0;
389   virtual size_t getAppBytesReceived() const = 0;
390   virtual size_t getRawBytesReceived() const = 0;
391
392   class BufferCallback {
393    public:
394     virtual ~BufferCallback() {}
395     virtual void onEgressBuffered() = 0;
396     virtual void onEgressBufferCleared() = 0;
397   };
398
399   /**
400    * Callback class to signal when a transport that did not have replay
401    * protection gains replay protection. This is needed for 0-RTT security
402    * protocols.
403    */
404   class ReplaySafetyCallback {
405    public:
406     virtual ~ReplaySafetyCallback() = default;
407
408     /**
409      * Called when the transport becomes replay safe.
410      */
411     virtual void onReplaySafe() = 0;
412   };
413
414   /**
415    * False if the transport does not have replay protection, but will in the
416    * future.
417    */
418   virtual bool isReplaySafe() const { return true; }
419
420   /**
421    * Set the ReplaySafeCallback on this transport.
422    *
423    * This should only be called if isReplaySafe() returns false.
424    */
425   virtual void setReplaySafetyCallback(ReplaySafetyCallback* callback) {
426     if (callback) {
427       CHECK(false) << "setReplaySafetyCallback() not supported";
428     }
429   }
430
431  protected:
432   virtual ~AsyncTransport() = default;
433 };
434
435 class AsyncReader {
436  public:
437   class ReadCallback {
438    public:
439     virtual ~ReadCallback() = default;
440
441     /**
442      * When data becomes available, getReadBuffer() will be invoked to get the
443      * buffer into which data should be read.
444      *
445      * This method allows the ReadCallback to delay buffer allocation until
446      * data becomes available.  This allows applications to manage large
447      * numbers of idle connections, without having to maintain a separate read
448      * buffer for each idle connection.
449      *
450      * It is possible that in some cases, getReadBuffer() may be called
451      * multiple times before readDataAvailable() is invoked.  In this case, the
452      * data will be written to the buffer returned from the most recent call to
453      * readDataAvailable().  If the previous calls to readDataAvailable()
454      * returned different buffers, the ReadCallback is responsible for ensuring
455      * that they are not leaked.
456      *
457      * If getReadBuffer() throws an exception, returns a nullptr buffer, or
458      * returns a 0 length, the ReadCallback will be uninstalled and its
459      * readError() method will be invoked.
460      *
461      * getReadBuffer() is not allowed to change the transport state before it
462      * returns.  (For example, it should never uninstall the read callback, or
463      * set a different read callback.)
464      *
465      * @param bufReturn getReadBuffer() should update *bufReturn to contain the
466      *                  address of the read buffer.  This parameter will never
467      *                  be nullptr.
468      * @param lenReturn getReadBuffer() should update *lenReturn to contain the
469      *                  maximum number of bytes that may be written to the read
470      *                  buffer.  This parameter will never be nullptr.
471      */
472     virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0;
473
474     /**
475      * readDataAvailable() will be invoked when data has been successfully read
476      * into the buffer returned by the last call to getReadBuffer().
477      *
478      * The read callback remains installed after readDataAvailable() returns.
479      * It must be explicitly uninstalled to stop receiving read events.
480      * getReadBuffer() will be called at least once before each call to
481      * readDataAvailable().  getReadBuffer() will also be called before any
482      * call to readEOF().
483      *
484      * @param len       The number of bytes placed in the buffer.
485      */
486
487     virtual void readDataAvailable(size_t len) noexcept = 0;
488
489     /**
490      * When data becomes available, isBufferMovable() will be invoked to figure
491      * out which API will be used, readBufferAvailable() or
492      * readDataAvailable(). If isBufferMovable() returns true, that means
493      * ReadCallback supports the IOBuf ownership transfer and
494      * readBufferAvailable() will be used.  Otherwise, not.
495
496      * By default, isBufferMovable() always return false. If
497      * readBufferAvailable() is implemented and to be invoked, You should
498      * overwrite isBufferMovable() and return true in the inherited class.
499      *
500      * This method allows the AsyncSocket/AsyncSSLSocket do buffer allocation by
501      * itself until data becomes available.  Compared with the pre/post buffer
502      * allocation in getReadBuffer()/readDataAvailabe(), readBufferAvailable()
503      * has two advantages.  First, this can avoid memcpy. E.g., in
504      * AsyncSSLSocket, the decrypted data was copied from the openssl internal
505      * buffer to the readbuf buffer.  With the buffer ownership transfer, the
506      * internal buffer can be directly "moved" to ReadCallback. Second, the
507      * memory allocation can be more precise.  The reason is
508      * AsyncSocket/AsyncSSLSocket can allocate the memory of precise size
509      * because they have more context about the available data than
510      * ReadCallback.  Think about the getReadBuffer() pre-allocate 4072 bytes
511      * buffer, but the available data is always 16KB (max OpenSSL record size).
512      */
513
514     virtual bool isBufferMovable() noexcept {
515       return false;
516     }
517
518     /**
519      * Suggested buffer size, allocated for read operations,
520      * if callback is movable and supports folly::IOBuf
521      */
522
523     virtual size_t maxBufferSize() const {
524       return 64 * 1024; // 64K
525     }
526
527     /**
528      * readBufferAvailable() will be invoked when data has been successfully
529      * read.
530      *
531      * Note that only either readBufferAvailable() or readDataAvailable() will
532      * be invoked according to the return value of isBufferMovable(). The timing
533      * and aftereffect of readBufferAvailable() are the same as
534      * readDataAvailable()
535      *
536      * @param readBuf The unique pointer of read buffer.
537      */
538
539     virtual void readBufferAvailable(std::unique_ptr<IOBuf> /*readBuf*/)
540       noexcept {}
541
542     /**
543      * readEOF() will be invoked when the transport is closed.
544      *
545      * The read callback will be automatically uninstalled immediately before
546      * readEOF() is invoked.
547      */
548     virtual void readEOF() noexcept = 0;
549
550     /**
551      * readError() will be invoked if an error occurs reading from the
552      * transport.
553      *
554      * The read callback will be automatically uninstalled immediately before
555      * readError() is invoked.
556      *
557      * @param ex        An exception describing the error that occurred.
558      */
559     virtual void readErr(const AsyncSocketException& ex) noexcept = 0;
560   };
561
562   // Read methods that aren't part of AsyncTransport.
563   virtual void setReadCB(ReadCallback* callback) = 0;
564   virtual ReadCallback* getReadCallback() const = 0;
565
566  protected:
567   virtual ~AsyncReader() = default;
568 };
569
570 class AsyncWriter {
571  public:
572   class WriteCallback {
573    public:
574     virtual ~WriteCallback() = default;
575
576     /**
577      * writeSuccess() will be invoked when all of the data has been
578      * successfully written.
579      *
580      * Note that this mainly signals that the buffer containing the data to
581      * write is no longer needed and may be freed or re-used.  It does not
582      * guarantee that the data has been fully transmitted to the remote
583      * endpoint.  For example, on socket-based transports, writeSuccess() only
584      * indicates that the data has been given to the kernel for eventual
585      * transmission.
586      */
587     virtual void writeSuccess() noexcept = 0;
588
589     /**
590      * writeError() will be invoked if an error occurs writing the data.
591      *
592      * @param bytesWritten      The number of bytes that were successfull
593      * @param ex                An exception describing the error that occurred.
594      */
595     virtual void writeErr(size_t bytesWritten,
596                           const AsyncSocketException& ex) noexcept = 0;
597   };
598
599   // Write methods that aren't part of AsyncTransport
600   virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
601                      WriteFlags flags = WriteFlags::NONE) = 0;
602   virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
603                       WriteFlags flags = WriteFlags::NONE) = 0;
604   virtual void writeChain(WriteCallback* callback,
605                           std::unique_ptr<IOBuf>&& buf,
606                           WriteFlags flags = WriteFlags::NONE) = 0;
607
608  protected:
609   virtual ~AsyncWriter() = default;
610 };
611
612 // Transitional intermediate interface. This is deprecated.
613 // Wrapper around folly::AsyncTransport, that includes read/write callbacks
614 class AsyncTransportWrapper : virtual public AsyncTransport,
615                               virtual public AsyncReader,
616                               virtual public AsyncWriter {
617  public:
618   using UniquePtr = std::unique_ptr<AsyncTransportWrapper, Destructor>;
619
620   // Alias for inherited members from AsyncReader and AsyncWriter
621   // to keep compatibility.
622   using ReadCallback    = AsyncReader::ReadCallback;
623   using WriteCallback   = AsyncWriter::WriteCallback;
624   virtual void setReadCB(ReadCallback* callback) override = 0;
625   virtual ReadCallback* getReadCallback() const override = 0;
626   virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
627                      WriteFlags flags = WriteFlags::NONE) override = 0;
628   virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
629                       WriteFlags flags = WriteFlags::NONE) override = 0;
630   virtual void writeChain(WriteCallback* callback,
631                           std::unique_ptr<IOBuf>&& buf,
632                           WriteFlags flags = WriteFlags::NONE) override = 0;
633   /**
634    * The transport wrapper may wrap another transport. This returns the
635    * transport that is wrapped. It returns nullptr if there is no wrapped
636    * transport.
637    */
638   virtual const AsyncTransportWrapper* getWrappedTransport() const {
639     return nullptr;
640   }
641
642   /**
643    * In many cases when we need to set socket properties or otherwise access the
644    * underlying transport from a wrapped transport. This method allows access to
645    * the derived classes of the underlying transport.
646    */
647   template <class T>
648   const T* getUnderlyingTransport() const {
649     const AsyncTransportWrapper* current = this;
650     while (current) {
651       auto sock = dynamic_cast<const T*>(current);
652       if (sock) {
653         return sock;
654       }
655       current = current->getWrappedTransport();
656     }
657     return nullptr;
658   }
659
660   template <class T>
661   T* getUnderlyingTransport() {
662     return const_cast<T*>(static_cast<const AsyncTransportWrapper*>(this)
663         ->getUnderlyingTransport<T>());
664   }
665
666   /**
667    * Return the application protocol being used by the underlying transport
668    * protocol. This is useful for transports which are used to tunnel other
669    * protocols.
670    */
671   virtual std::string getApplicationProtocol() noexcept {
672     return "";
673   }
674
675   /**
676    * Returns the name of the security protocol being used.
677    */
678   virtual std::string getSecurityProtocol() const { return ""; }
679 };
680
681 } // folly