Fix copyright lines
[folly.git] / folly / io / async / AsyncTransport.h
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <memory>
20
21 #include <folly/io/IOBuf.h>
22 #include <folly/io/async/AsyncSocketBase.h>
23 #include <folly/io/async/DelayedDestruction.h>
24 #include <folly/io/async/EventBase.h>
25 #include <folly/portability/OpenSSL.h>
26 #include <folly/portability/SysUio.h>
27 #include <folly/ssl/OpenSSLPtrTypes.h>
28
29 constexpr bool kOpenSslModeMoveBufferOwnership =
30 #ifdef SSL_MODE_MOVE_BUFFER_OWNERSHIP
31   true
32 #else
33   false
34 #endif
35 ;
36
37 namespace folly {
38
39 class AsyncSocketException;
40 class EventBase;
41 class SocketAddress;
42
43 /*
44  * flags given by the application for write* calls
45  */
46 enum class WriteFlags : uint32_t {
47   NONE = 0x00,
48   /*
49    * Whether to delay the output until a subsequent non-corked write.
50    * (Note: may not be supported in all subclasses or on all platforms.)
51    */
52   CORK = 0x01,
53   /*
54    * for a socket that has ACK latency enabled, it will cause the kernel
55    * to fire a TCP ESTATS event when the last byte of the given write call
56    * will be acknowledged.
57    */
58   EOR = 0x02,
59   /*
60    * this indicates that only the write side of socket should be shutdown
61    */
62   WRITE_SHUTDOWN = 0x04,
63   /*
64    * use msg zerocopy if allowed
65    */
66   WRITE_MSG_ZEROCOPY = 0x08,
67 };
68
69 /*
70  * union operator
71  */
72 inline WriteFlags operator|(WriteFlags a, WriteFlags b) {
73   return static_cast<WriteFlags>(
74     static_cast<uint32_t>(a) | static_cast<uint32_t>(b));
75 }
76
77 /*
78  * compound assignment union operator
79  */
80 inline WriteFlags& operator|=(WriteFlags& a, WriteFlags b) {
81   a = a | b;
82   return a;
83 }
84
85 /*
86  * intersection operator
87  */
88 inline WriteFlags operator&(WriteFlags a, WriteFlags b) {
89   return static_cast<WriteFlags>(
90     static_cast<uint32_t>(a) & static_cast<uint32_t>(b));
91 }
92
93 /*
94  * compound assignment intersection operator
95  */
96 inline WriteFlags& operator&=(WriteFlags& a, WriteFlags b) {
97   a = a & b;
98   return a;
99 }
100
101 /*
102  * exclusion parameter
103  */
104 inline WriteFlags operator~(WriteFlags a) {
105   return static_cast<WriteFlags>(~static_cast<uint32_t>(a));
106 }
107
108 /*
109  * unset operator
110  */
111 inline WriteFlags unSet(WriteFlags a, WriteFlags b) {
112   return a & ~b;
113 }
114
115 /*
116  * inclusion operator
117  */
118 inline bool isSet(WriteFlags a, WriteFlags b) {
119   return (a & b) == b;
120 }
121
122
123 /**
124  * AsyncTransport defines an asynchronous API for streaming I/O.
125  *
126  * This class provides an API to for asynchronously waiting for data
127  * on a streaming transport, and for asynchronously sending data.
128  *
129  * The APIs for reading and writing are intentionally asymmetric.  Waiting for
130  * data to read is a persistent API: a callback is installed, and is notified
131  * whenever new data is available.  It continues to be notified of new events
132  * until it is uninstalled.
133  *
134  * AsyncTransport does not provide read timeout functionality, because it
135  * typically cannot determine when the timeout should be active.  Generally, a
136  * timeout should only be enabled when processing is blocked waiting on data
137  * from the remote endpoint.  For server-side applications, the timeout should
138  * not be active if the server is currently processing one or more outstanding
139  * requests on this transport.  For client-side applications, the timeout
140  * should not be active if there are no requests pending on the transport.
141  * Additionally, if a client has multiple pending requests, it will ususally
142  * want a separate timeout for each request, rather than a single read timeout.
143  *
144  * The write API is fairly intuitive: a user can request to send a block of
145  * data, and a callback will be informed once the entire block has been
146  * transferred to the kernel, or on error.  AsyncTransport does provide a send
147  * timeout, since most callers want to give up if the remote end stops
148  * responding and no further progress can be made sending the data.
149  */
150 class AsyncTransport : public DelayedDestruction, public AsyncSocketBase {
151  public:
152   typedef std::unique_ptr<AsyncTransport, Destructor> UniquePtr;
153
154   /**
155    * Close the transport.
156    *
157    * This gracefully closes the transport, waiting for all pending write
158    * requests to complete before actually closing the underlying transport.
159    *
160    * If a read callback is set, readEOF() will be called immediately.  If there
161    * are outstanding write requests, the close will be delayed until all
162    * remaining writes have completed.  No new writes may be started after
163    * close() has been called.
164    */
165   virtual void close() = 0;
166
167   /**
168    * Close the transport immediately.
169    *
170    * This closes the transport immediately, dropping any outstanding data
171    * waiting to be written.
172    *
173    * If a read callback is set, readEOF() will be called immediately.
174    * If there are outstanding write requests, these requests will be aborted
175    * and writeError() will be invoked immediately on all outstanding write
176    * callbacks.
177    */
178   virtual void closeNow() = 0;
179
180   /**
181    * Reset the transport immediately.
182    *
183    * This closes the transport immediately, sending a reset to the remote peer
184    * if possible to indicate abnormal shutdown.
185    *
186    * Note that not all subclasses implement this reset functionality: some
187    * subclasses may treat reset() the same as closeNow().  Subclasses that use
188    * TCP transports should terminate the connection with a TCP reset.
189    */
190   virtual void closeWithReset() {
191     closeNow();
192   }
193
194   /**
195    * Perform a half-shutdown of the write side of the transport.
196    *
197    * The caller should not make any more calls to write() or writev() after
198    * shutdownWrite() is called.  Any future write attempts will fail
199    * immediately.
200    *
201    * Not all transport types support half-shutdown.  If the underlying
202    * transport does not support half-shutdown, it will fully shutdown both the
203    * read and write sides of the transport.  (Fully shutting down the socket is
204    * better than doing nothing at all, since the caller may rely on the
205    * shutdownWrite() call to notify the other end of the connection that no
206    * more data can be read.)
207    *
208    * If there is pending data still waiting to be written on the transport,
209    * the actual shutdown will be delayed until the pending data has been
210    * written.
211    *
212    * Note: There is no corresponding shutdownRead() equivalent.  Simply
213    * uninstall the read callback if you wish to stop reading.  (On TCP sockets
214    * at least, shutting down the read side of the socket is a no-op anyway.)
215    */
216   virtual void shutdownWrite() = 0;
217
218   /**
219    * Perform a half-shutdown of the write side of the transport.
220    *
221    * shutdownWriteNow() is identical to shutdownWrite(), except that it
222    * immediately performs the shutdown, rather than waiting for pending writes
223    * to complete.  Any pending write requests will be immediately failed when
224    * shutdownWriteNow() is called.
225    */
226   virtual void shutdownWriteNow() = 0;
227
228   /**
229    * Determine if transport is open and ready to read or write.
230    *
231    * Note that this function returns false on EOF; you must also call error()
232    * to distinguish between an EOF and an error.
233    *
234    * @return  true iff the transport is open and ready, false otherwise.
235    */
236   virtual bool good() const = 0;
237
238   /**
239    * Determine if the transport is readable or not.
240    *
241    * @return  true iff the transport is readable, false otherwise.
242    */
243   virtual bool readable() const = 0;
244
245   /**
246    * Determine if the transport is writable or not.
247    *
248    * @return  true iff the transport is writable, false otherwise.
249    */
250   virtual bool writable() const {
251     // By default return good() - leave it to implementers to override.
252     return good();
253   }
254
255   /**
256    * Determine if the there is pending data on the transport.
257    *
258    * @return  true iff the if the there is pending data, false otherwise.
259    */
260   virtual bool isPending() const {
261     return readable();
262   }
263
264   /**
265    * Determine if transport is connected to the endpoint
266    *
267    * @return  false iff the transport is connected, otherwise true
268    */
269   virtual bool connecting() const = 0;
270
271   /**
272    * Determine if an error has occurred with this transport.
273    *
274    * @return  true iff an error has occurred (not EOF).
275    */
276   virtual bool error() const = 0;
277
278   /**
279    * Attach the transport to a EventBase.
280    *
281    * This may only be called if the transport is not currently attached to a
282    * EventBase (by an earlier call to detachEventBase()).
283    *
284    * This method must be invoked in the EventBase's thread.
285    */
286   virtual void attachEventBase(EventBase* eventBase) = 0;
287
288   /**
289    * Detach the transport from its EventBase.
290    *
291    * This may only be called when the transport is idle and has no reads or
292    * writes pending.  Once detached, the transport may not be used again until
293    * it is re-attached to a EventBase by calling attachEventBase().
294    *
295    * This method must be called from the current EventBase's thread.
296    */
297   virtual void detachEventBase() = 0;
298
299   /**
300    * Determine if the transport can be detached.
301    *
302    * This method must be called from the current EventBase's thread.
303    */
304   virtual bool isDetachable() const = 0;
305
306   /**
307    * Set the send timeout.
308    *
309    * If write requests do not make any progress for more than the specified
310    * number of milliseconds, fail all pending writes and close the transport.
311    *
312    * If write requests are currently pending when setSendTimeout() is called,
313    * the timeout interval is immediately restarted using the new value.
314    *
315    * @param milliseconds  The timeout duration, in milliseconds.  If 0, no
316    *                      timeout will be used.
317    */
318   virtual void setSendTimeout(uint32_t milliseconds) = 0;
319
320   /**
321    * Get the send timeout.
322    *
323    * @return Returns the current send timeout, in milliseconds.  A return value
324    *         of 0 indicates that no timeout is set.
325    */
326   virtual uint32_t getSendTimeout() const = 0;
327
328   /**
329    * Get the address of the local endpoint of this transport.
330    *
331    * This function may throw AsyncSocketException on error.
332    *
333    * @param address  The local address will be stored in the specified
334    *                 SocketAddress.
335    */
336   virtual void getLocalAddress(SocketAddress* address) const = 0;
337
338   /**
339    * Get the address of the remote endpoint to which this transport is
340    * connected.
341    *
342    * This function may throw AsyncSocketException on error.
343    *
344    * @return         Return the local address
345    */
346   SocketAddress getLocalAddress() const {
347     SocketAddress addr;
348     getLocalAddress(&addr);
349     return addr;
350   }
351
352   void getAddress(SocketAddress* address) const override {
353     getLocalAddress(address);
354   }
355
356   /**
357    * Get the address of the remote endpoint to which this transport is
358    * connected.
359    *
360    * This function may throw AsyncSocketException on error.
361    *
362    * @param address  The remote endpoint's address will be stored in the
363    *                 specified SocketAddress.
364    */
365   virtual void getPeerAddress(SocketAddress* address) const = 0;
366
367   /**
368    * Get the address of the remote endpoint to which this transport is
369    * connected.
370    *
371    * This function may throw AsyncSocketException on error.
372    *
373    * @return         Return the remote endpoint's address
374    */
375   SocketAddress getPeerAddress() const {
376     SocketAddress addr;
377     getPeerAddress(&addr);
378     return addr;
379   }
380
381   /**
382    * Get the certificate used to authenticate the peer.
383    */
384   virtual ssl::X509UniquePtr getPeerCert() const { return nullptr; }
385
386   /**
387    * The local certificate used for this connection. May be null
388    */
389   virtual const X509* getSelfCert() const {
390     return nullptr;
391   }
392
393   /**
394    * Return the application protocol being used by the underlying transport
395    * protocol. This is useful for transports which are used to tunnel other
396    * protocols.
397    */
398   virtual std::string getApplicationProtocol() noexcept {
399     return "";
400   }
401
402   /**
403    * Returns the name of the security protocol being used.
404    */
405   virtual std::string getSecurityProtocol() const {
406     return "";
407   }
408
409   /**
410    * @return True iff end of record tracking is enabled
411    */
412   virtual bool isEorTrackingEnabled() const = 0;
413
414   virtual void setEorTracking(bool track) = 0;
415
416   virtual size_t getAppBytesWritten() const = 0;
417   virtual size_t getRawBytesWritten() const = 0;
418   virtual size_t getAppBytesReceived() const = 0;
419   virtual size_t getRawBytesReceived() const = 0;
420
421   class BufferCallback {
422    public:
423     virtual ~BufferCallback() {}
424     virtual void onEgressBuffered() = 0;
425     virtual void onEgressBufferCleared() = 0;
426   };
427
428   /**
429    * Callback class to signal when a transport that did not have replay
430    * protection gains replay protection. This is needed for 0-RTT security
431    * protocols.
432    */
433   class ReplaySafetyCallback {
434    public:
435     virtual ~ReplaySafetyCallback() = default;
436
437     /**
438      * Called when the transport becomes replay safe.
439      */
440     virtual void onReplaySafe() = 0;
441   };
442
443   /**
444    * False if the transport does not have replay protection, but will in the
445    * future.
446    */
447   virtual bool isReplaySafe() const { return true; }
448
449   /**
450    * Set the ReplaySafeCallback on this transport.
451    *
452    * This should only be called if isReplaySafe() returns false.
453    */
454   virtual void setReplaySafetyCallback(ReplaySafetyCallback* callback) {
455     if (callback) {
456       CHECK(false) << "setReplaySafetyCallback() not supported";
457     }
458   }
459
460  protected:
461   ~AsyncTransport() override = default;
462 };
463
464 class AsyncReader {
465  public:
466   class ReadCallback {
467    public:
468     virtual ~ReadCallback() = default;
469
470     /**
471      * When data becomes available, getReadBuffer() will be invoked to get the
472      * buffer into which data should be read.
473      *
474      * This method allows the ReadCallback to delay buffer allocation until
475      * data becomes available.  This allows applications to manage large
476      * numbers of idle connections, without having to maintain a separate read
477      * buffer for each idle connection.
478      *
479      * It is possible that in some cases, getReadBuffer() may be called
480      * multiple times before readDataAvailable() is invoked.  In this case, the
481      * data will be written to the buffer returned from the most recent call to
482      * readDataAvailable().  If the previous calls to readDataAvailable()
483      * returned different buffers, the ReadCallback is responsible for ensuring
484      * that they are not leaked.
485      *
486      * If getReadBuffer() throws an exception, returns a nullptr buffer, or
487      * returns a 0 length, the ReadCallback will be uninstalled and its
488      * readError() method will be invoked.
489      *
490      * getReadBuffer() is not allowed to change the transport state before it
491      * returns.  (For example, it should never uninstall the read callback, or
492      * set a different read callback.)
493      *
494      * @param bufReturn getReadBuffer() should update *bufReturn to contain the
495      *                  address of the read buffer.  This parameter will never
496      *                  be nullptr.
497      * @param lenReturn getReadBuffer() should update *lenReturn to contain the
498      *                  maximum number of bytes that may be written to the read
499      *                  buffer.  This parameter will never be nullptr.
500      */
501     virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0;
502
503     /**
504      * readDataAvailable() will be invoked when data has been successfully read
505      * into the buffer returned by the last call to getReadBuffer().
506      *
507      * The read callback remains installed after readDataAvailable() returns.
508      * It must be explicitly uninstalled to stop receiving read events.
509      * getReadBuffer() will be called at least once before each call to
510      * readDataAvailable().  getReadBuffer() will also be called before any
511      * call to readEOF().
512      *
513      * @param len       The number of bytes placed in the buffer.
514      */
515
516     virtual void readDataAvailable(size_t len) noexcept = 0;
517
518     /**
519      * When data becomes available, isBufferMovable() will be invoked to figure
520      * out which API will be used, readBufferAvailable() or
521      * readDataAvailable(). If isBufferMovable() returns true, that means
522      * ReadCallback supports the IOBuf ownership transfer and
523      * readBufferAvailable() will be used.  Otherwise, not.
524
525      * By default, isBufferMovable() always return false. If
526      * readBufferAvailable() is implemented and to be invoked, You should
527      * overwrite isBufferMovable() and return true in the inherited class.
528      *
529      * This method allows the AsyncSocket/AsyncSSLSocket do buffer allocation by
530      * itself until data becomes available.  Compared with the pre/post buffer
531      * allocation in getReadBuffer()/readDataAvailabe(), readBufferAvailable()
532      * has two advantages.  First, this can avoid memcpy. E.g., in
533      * AsyncSSLSocket, the decrypted data was copied from the openssl internal
534      * buffer to the readbuf buffer.  With the buffer ownership transfer, the
535      * internal buffer can be directly "moved" to ReadCallback. Second, the
536      * memory allocation can be more precise.  The reason is
537      * AsyncSocket/AsyncSSLSocket can allocate the memory of precise size
538      * because they have more context about the available data than
539      * ReadCallback.  Think about the getReadBuffer() pre-allocate 4072 bytes
540      * buffer, but the available data is always 16KB (max OpenSSL record size).
541      */
542
543     virtual bool isBufferMovable() noexcept {
544       return false;
545     }
546
547     /**
548      * Suggested buffer size, allocated for read operations,
549      * if callback is movable and supports folly::IOBuf
550      */
551
552     virtual size_t maxBufferSize() const {
553       return 64 * 1024; // 64K
554     }
555
556     /**
557      * readBufferAvailable() will be invoked when data has been successfully
558      * read.
559      *
560      * Note that only either readBufferAvailable() or readDataAvailable() will
561      * be invoked according to the return value of isBufferMovable(). The timing
562      * and aftereffect of readBufferAvailable() are the same as
563      * readDataAvailable()
564      *
565      * @param readBuf The unique pointer of read buffer.
566      */
567
568     virtual void readBufferAvailable(std::unique_ptr<IOBuf> /*readBuf*/)
569       noexcept {}
570
571     /**
572      * readEOF() will be invoked when the transport is closed.
573      *
574      * The read callback will be automatically uninstalled immediately before
575      * readEOF() is invoked.
576      */
577     virtual void readEOF() noexcept = 0;
578
579     /**
580      * readError() will be invoked if an error occurs reading from the
581      * transport.
582      *
583      * The read callback will be automatically uninstalled immediately before
584      * readError() is invoked.
585      *
586      * @param ex        An exception describing the error that occurred.
587      */
588     virtual void readErr(const AsyncSocketException& ex) noexcept = 0;
589   };
590
591   // Read methods that aren't part of AsyncTransport.
592   virtual void setReadCB(ReadCallback* callback) = 0;
593   virtual ReadCallback* getReadCallback() const = 0;
594
595  protected:
596   virtual ~AsyncReader() = default;
597 };
598
599 class AsyncWriter {
600  public:
601   class WriteCallback {
602    public:
603     virtual ~WriteCallback() = default;
604
605     /**
606      * writeSuccess() will be invoked when all of the data has been
607      * successfully written.
608      *
609      * Note that this mainly signals that the buffer containing the data to
610      * write is no longer needed and may be freed or re-used.  It does not
611      * guarantee that the data has been fully transmitted to the remote
612      * endpoint.  For example, on socket-based transports, writeSuccess() only
613      * indicates that the data has been given to the kernel for eventual
614      * transmission.
615      */
616     virtual void writeSuccess() noexcept = 0;
617
618     /**
619      * writeError() will be invoked if an error occurs writing the data.
620      *
621      * @param bytesWritten      The number of bytes that were successfull
622      * @param ex                An exception describing the error that occurred.
623      */
624     virtual void writeErr(size_t bytesWritten,
625                           const AsyncSocketException& ex) noexcept = 0;
626   };
627
628   // Write methods that aren't part of AsyncTransport
629   virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
630                      WriteFlags flags = WriteFlags::NONE) = 0;
631   virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
632                       WriteFlags flags = WriteFlags::NONE) = 0;
633   virtual void writeChain(WriteCallback* callback,
634                           std::unique_ptr<IOBuf>&& buf,
635                           WriteFlags flags = WriteFlags::NONE) = 0;
636
637  protected:
638   virtual ~AsyncWriter() = default;
639 };
640
641 // Transitional intermediate interface. This is deprecated.
642 // Wrapper around folly::AsyncTransport, that includes read/write callbacks
643 class AsyncTransportWrapper : virtual public AsyncTransport,
644                               virtual public AsyncReader,
645                               virtual public AsyncWriter {
646  public:
647   using UniquePtr = std::unique_ptr<AsyncTransportWrapper, Destructor>;
648
649   // Alias for inherited members from AsyncReader and AsyncWriter
650   // to keep compatibility.
651   using ReadCallback    = AsyncReader::ReadCallback;
652   using WriteCallback   = AsyncWriter::WriteCallback;
653   void setReadCB(ReadCallback* callback) override = 0;
654   ReadCallback* getReadCallback() const override = 0;
655   void write(
656       WriteCallback* callback,
657       const void* buf,
658       size_t bytes,
659       WriteFlags flags = WriteFlags::NONE) override = 0;
660   void writev(
661       WriteCallback* callback,
662       const iovec* vec,
663       size_t count,
664       WriteFlags flags = WriteFlags::NONE) override = 0;
665   void writeChain(
666       WriteCallback* callback,
667       std::unique_ptr<IOBuf>&& buf,
668       WriteFlags flags = WriteFlags::NONE) override = 0;
669   /**
670    * The transport wrapper may wrap another transport. This returns the
671    * transport that is wrapped. It returns nullptr if there is no wrapped
672    * transport.
673    */
674   virtual const AsyncTransportWrapper* getWrappedTransport() const {
675     return nullptr;
676   }
677
678   /**
679    * In many cases when we need to set socket properties or otherwise access the
680    * underlying transport from a wrapped transport. This method allows access to
681    * the derived classes of the underlying transport.
682    */
683   template <class T>
684   const T* getUnderlyingTransport() const {
685     const AsyncTransportWrapper* current = this;
686     while (current) {
687       auto sock = dynamic_cast<const T*>(current);
688       if (sock) {
689         return sock;
690       }
691       current = current->getWrappedTransport();
692     }
693     return nullptr;
694   }
695
696   template <class T>
697   T* getUnderlyingTransport() {
698     return const_cast<T*>(static_cast<const AsyncTransportWrapper*>(this)
699         ->getUnderlyingTransport<T>());
700   }
701 };
702
703 } // namespace folly