79fd4cc789f0c4a3b2b70d92f1142f774cd1d42e
[folly.git] / folly / io / Cursor.h
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_CURSOR_H
18 #define FOLLY_CURSOR_H
19
20 #include <assert.h>
21 #include <stdexcept>
22 #include <string.h>
23 #include <type_traits>
24 #include <memory>
25
26 #include "folly/Bits.h"
27 #include "folly/io/IOBuf.h"
28 #include "folly/io/IOBufQueue.h"
29 #include "folly/Likely.h"
30
31 /**
32  * Cursor class for fast iteration over IOBuf chains.
33  *
34  * Cursor - Read-only access
35  *
36  * RWPrivateCursor - Read-write access, assumes private access to IOBuf chain
37  * RWUnshareCursor - Read-write access, calls unshare on write (COW)
38  * Appender        - Write access, assumes private access to IOBuf chian
39  *
40  * Note that RW cursors write in the preallocated part of buffers (that is,
41  * between the buffer's data() and tail()), while Appenders append to the end
42  * of the buffer (between the buffer's tail() and bufferEnd()).  Appenders
43  * automatically adjust the buffer pointers, so you may only use one
44  * Appender with a buffer chain; for this reason, Appenders assume private
45  * access to the buffer (you need to call unshare() yourself if necessary).
46  **/
47 namespace folly { namespace io {
48 namespace detail {
49
50 template <class Derived, typename BufType>
51 class CursorBase {
52  public:
53   const uint8_t* data() const {
54     return crtBuf_->data() + offset_;
55   }
56
57   // Space available in the current IOBuf.  May be 0; use peek() instead which
58   // will always point to a non-empty chunk of data or at the end of the
59   // chain.
60   size_t length() const {
61     return crtBuf_->length() - offset_;
62   }
63
64   Derived& operator+=(size_t offset) {
65     Derived* p = static_cast<Derived*>(this);
66     p->skip(offset);
67     return *p;
68   }
69
70   template <class T>
71   typename std::enable_if<std::is_integral<T>::value, T>::type
72   read() {
73     T val;
74     pull(&val, sizeof(T));
75     return val;
76   }
77
78   template <class T>
79   T readBE() {
80     return Endian::big(read<T>());
81   }
82
83   template <class T>
84   T readLE() {
85     return Endian::little(read<T>());
86   }
87
88   /**
89    * Read a fixed-length string.
90    *
91    * The std::string-based APIs should probably be avoided unless you
92    * ultimately want the data to live in an std::string. You're better off
93    * using the pull() APIs to copy into a raw buffer otherwise.
94    */
95   std::string readFixedString(size_t len) {
96     std::string str;
97
98     str.reserve(len);
99     for (;;) {
100       // Fast path: it all fits in one buffer.
101       size_t available = length();
102       if (LIKELY(available >= len)) {
103         str.append(reinterpret_cast<const char*>(data()), len);
104         offset_ += len;
105         return str;
106       }
107
108       str.append(reinterpret_cast<const char*>(data()), available);
109       if (UNLIKELY(!tryAdvanceBuffer())) {
110         throw std::out_of_range("string underflow");
111       }
112       len -= available;
113     }
114   }
115
116   /**
117    * Read a string consisting of bytes until the given terminator character is
118    * seen. Raises an std::length_error if maxLength bytes have been processed
119    * before the terminator is seen.
120    *
121    * See comments in readFixedString() about when it's appropriate to use this
122    * vs. using pull().
123    */
124   std::string readTerminatedString(
125     char termChar = '\0',
126     size_t maxLength = std::numeric_limits<size_t>::max()) {
127     std::string str;
128
129     for (;;) {
130       const uint8_t* buf = data();
131       size_t buflen = length();
132
133       size_t i = 0;
134       while (i < buflen && buf[i] != termChar) {
135         ++i;
136
137         // Do this check after incrementing 'i', as even though we start at the
138         // 0 byte, it still represents a single character
139         if (str.length() + i >= maxLength) {
140           throw std::length_error("string overflow");
141         }
142       }
143
144       str.append(reinterpret_cast<const char*>(buf), i);
145       if (i < buflen) {
146         skip(i + 1);
147         return str;
148       }
149
150       skip(i);
151
152       if (UNLIKELY(!tryAdvanceBuffer())) {
153         throw std::out_of_range("string underflow");
154       }
155     }
156   }
157
158   explicit CursorBase(BufType* buf)
159     : crtBuf_(buf)
160     , offset_(0)
161     , buffer_(buf) {}
162
163   // Make all the templated classes friends for copy constructor.
164   template <class D, typename B> friend class CursorBase;
165
166   template <class T>
167   explicit CursorBase(const T& cursor) {
168     crtBuf_ = cursor.crtBuf_;
169     offset_ = cursor.offset_;
170     buffer_ = cursor.buffer_;
171   }
172
173   // reset cursor to point to a new buffer.
174   void reset(BufType* buf) {
175     crtBuf_ = buf;
176     buffer_ = buf;
177     offset_ = 0;
178   }
179
180   /**
181    * Return the available data in the current buffer.
182    * If you want to gather more data from the chain into a contiguous region
183    * (for hopefully zero-copy access), use gather() before peek().
184    */
185   std::pair<const uint8_t*, size_t> peek() {
186     // Ensure that we're pointing to valid data
187     size_t available = length();
188     while (UNLIKELY(available == 0 && tryAdvanceBuffer())) {
189       available = length();
190     }
191
192     return std::make_pair(data(), available);
193   }
194
195   void pull(void* buf, size_t len) {
196     if (UNLIKELY(pullAtMost(buf, len) != len)) {
197       throw std::out_of_range("underflow");
198     }
199   }
200
201   void clone(std::unique_ptr<folly::IOBuf>& buf, size_t len) {
202     if (UNLIKELY(cloneAtMost(buf, len) != len)) {
203       throw std::out_of_range("underflow");
204     }
205   }
206
207   void skip(size_t len) {
208     if (UNLIKELY(skipAtMost(len) != len)) {
209       throw std::out_of_range("underflow");
210     }
211   }
212
213   size_t pullAtMost(void* buf, size_t len) {
214     uint8_t* p = reinterpret_cast<uint8_t*>(buf);
215     size_t copied = 0;
216     for (;;) {
217       // Fast path: it all fits in one buffer.
218       size_t available = length();
219       if (LIKELY(available >= len)) {
220         memcpy(p, data(), len);
221         offset_ += len;
222         return copied + len;
223       }
224
225       memcpy(p, data(), available);
226       copied += available;
227       if (UNLIKELY(!tryAdvanceBuffer())) {
228         return copied;
229       }
230       p += available;
231       len -= available;
232     }
233   }
234
235   size_t cloneAtMost(std::unique_ptr<folly::IOBuf>& buf, size_t len) {
236     buf.reset(nullptr);
237
238     std::unique_ptr<folly::IOBuf> tmp;
239     size_t copied = 0;
240     for (;;) {
241       // Fast path: it all fits in one buffer.
242       size_t available = length();
243       if (LIKELY(available >= len)) {
244         tmp = crtBuf_->cloneOne();
245         tmp->trimStart(offset_);
246         tmp->trimEnd(tmp->length() - len);
247         offset_ += len;
248         if (!buf) {
249           buf = std::move(tmp);
250         } else {
251           buf->prependChain(std::move(tmp));
252         }
253         return copied + len;
254       }
255
256       tmp = crtBuf_->cloneOne();
257       tmp->trimStart(offset_);
258       if (!buf) {
259         buf = std::move(tmp);
260       } else {
261         buf->prependChain(std::move(tmp));
262       }
263
264       copied += available;
265       if (UNLIKELY(!tryAdvanceBuffer())) {
266         return copied;
267       }
268       len -= available;
269     }
270   }
271
272   size_t skipAtMost(size_t len) {
273     size_t skipped = 0;
274     for (;;) {
275       // Fast path: it all fits in one buffer.
276       size_t available = length();
277       if (LIKELY(available >= len)) {
278         offset_ += len;
279         return skipped + len;
280       }
281
282       skipped += available;
283       if (UNLIKELY(!tryAdvanceBuffer())) {
284         return skipped;
285       }
286       len -= available;
287     }
288   }
289
290   /**
291    * Return the distance between two cursors.
292    */
293   size_t operator-(const CursorBase& other) const {
294     BufType *otherBuf = other.crtBuf_;
295     size_t len = 0;
296
297     if (otherBuf != crtBuf_) {
298       len += otherBuf->length() - other.offset_;
299
300       for (otherBuf = otherBuf->next();
301            otherBuf != crtBuf_ && otherBuf != other.buffer_;
302            otherBuf = otherBuf->next()) {
303         len += otherBuf->length();
304       }
305
306       if (otherBuf == other.buffer_) {
307         throw std::out_of_range("wrap-around");
308       }
309
310       len += offset_;
311     } else {
312       if (offset_ < other.offset_) {
313         throw std::out_of_range("underflow");
314       }
315
316       len += offset_ - other.offset_;
317     }
318
319     return len;
320   }
321
322   /**
323    * Return the distance from the given IOBuf to the this cursor.
324    */
325   size_t operator-(const BufType* buf) const {
326     size_t len = 0;
327
328     BufType *curBuf = buf;
329     while (curBuf != crtBuf_) {
330       len += curBuf->length();
331       curBuf = curBuf->next();
332       if (curBuf == buf || curBuf == buffer_) {
333         throw std::out_of_range("wrap-around");
334       }
335     }
336
337     len += offset_;
338     return len;
339   }
340
341  protected:
342   BufType* crtBuf_;
343   size_t offset_;
344
345   ~CursorBase(){}
346
347   bool tryAdvanceBuffer() {
348     BufType* nextBuf = crtBuf_->next();
349     if (UNLIKELY(nextBuf == buffer_)) {
350       offset_ = crtBuf_->length();
351       return false;
352     }
353
354     offset_ = 0;
355     crtBuf_ = nextBuf;
356     static_cast<Derived*>(this)->advanceDone();
357     return true;
358   }
359
360  private:
361   void advanceDone() {
362   }
363
364   BufType* buffer_;
365 };
366
367 template <class Derived>
368 class Writable {
369  public:
370   template <class T>
371   typename std::enable_if<std::is_integral<T>::value>::type
372   write(T value) {
373     const uint8_t* u8 = reinterpret_cast<const uint8_t*>(&value);
374     push(u8, sizeof(T));
375   }
376
377   template <class T>
378   void writeBE(T value) {
379     write(Endian::big(value));
380   }
381
382   template <class T>
383   void writeLE(T value) {
384     write(Endian::little(value));
385   }
386
387   void push(const uint8_t* buf, size_t len) {
388     Derived* d = static_cast<Derived*>(this);
389     if (d->pushAtMost(buf, len) != len) {
390       throw std::out_of_range("overflow");
391     }
392   }
393 };
394
395 } // namespace detail
396
397 class Cursor : public detail::CursorBase<Cursor, const IOBuf> {
398  public:
399   explicit Cursor(const IOBuf* buf)
400     : detail::CursorBase<Cursor, const IOBuf>(buf) {}
401
402   template <class CursorType>
403   explicit Cursor(CursorType& cursor)
404     : detail::CursorBase<Cursor, const IOBuf>(cursor) {}
405 };
406
407 enum class CursorAccess {
408   PRIVATE,
409   UNSHARE
410 };
411
412 template <CursorAccess access>
413 class RWCursor
414   : public detail::CursorBase<RWCursor<access>, IOBuf>,
415     public detail::Writable<RWCursor<access>> {
416   friend class detail::CursorBase<RWCursor<access>, IOBuf>;
417  public:
418   explicit RWCursor(IOBuf* buf)
419     : detail::CursorBase<RWCursor<access>, IOBuf>(buf),
420       maybeShared_(true) {}
421
422   template <class CursorType>
423   explicit RWCursor(CursorType& cursor)
424     : detail::CursorBase<RWCursor<access>, IOBuf>(cursor),
425       maybeShared_(true) {}
426   /**
427    * Gather at least n bytes contiguously into the current buffer,
428    * by coalescing subsequent buffers from the chain as necessary.
429    */
430   void gather(size_t n) {
431     this->crtBuf_->gather(this->offset_ + n);
432   }
433
434   size_t pushAtMost(const uint8_t* buf, size_t len) {
435     size_t copied = 0;
436     for (;;) {
437       // Fast path: the current buffer is big enough.
438       size_t available = this->length();
439       if (LIKELY(available >= len)) {
440         if (access == CursorAccess::UNSHARE) {
441           maybeUnshare();
442         }
443         memcpy(writableData(), buf, len);
444         this->offset_ += len;
445         return copied + len;
446       }
447
448       if (access == CursorAccess::UNSHARE) {
449         maybeUnshare();
450       }
451       memcpy(writableData(), buf, available);
452       copied += available;
453       if (UNLIKELY(!this->tryAdvanceBuffer())) {
454         return copied;
455       }
456       buf += available;
457       len -= available;
458     }
459   }
460
461   void insert(std::unique_ptr<folly::IOBuf> buf) {
462     folly::IOBuf* nextBuf;
463     if (this->offset_ == 0) {
464       // Can just prepend
465       nextBuf = buf.get();
466       this->crtBuf_->prependChain(std::move(buf));
467     } else {
468       std::unique_ptr<folly::IOBuf> remaining;
469       if (this->crtBuf_->length() - this->offset_ > 0) {
470         // Need to split current IOBuf in two.
471         remaining = this->crtBuf_->cloneOne();
472         remaining->trimStart(this->offset_);
473         nextBuf = remaining.get();
474         buf->prependChain(std::move(remaining));
475       } else {
476         // Can just append
477         nextBuf = this->crtBuf_->next();
478       }
479       this->crtBuf_->trimEnd(this->length());
480       this->crtBuf_->appendChain(std::move(buf));
481     }
482     // Jump past the new links
483     this->offset_ = 0;
484     this->crtBuf_ = nextBuf;
485   }
486
487   uint8_t* writableData() {
488     return this->crtBuf_->writableData() + this->offset_;
489   }
490
491  private:
492   void maybeUnshare() {
493     if (UNLIKELY(maybeShared_)) {
494       this->crtBuf_->unshareOne();
495       maybeShared_ = false;
496     }
497   }
498
499   void advanceDone() {
500     maybeShared_ = true;
501   }
502
503   bool maybeShared_;
504 };
505
506 typedef RWCursor<CursorAccess::PRIVATE> RWPrivateCursor;
507 typedef RWCursor<CursorAccess::UNSHARE> RWUnshareCursor;
508
509 /**
510  * Append to the end of a buffer chain, growing the chain (by allocating new
511  * buffers) in increments of at least growth bytes every time.  Won't grow
512  * (and push() and ensure() will throw) if growth == 0.
513  *
514  * TODO(tudorb): add a flavor of Appender that reallocates one IOBuf instead
515  * of chaining.
516  */
517 class Appender : public detail::Writable<Appender> {
518  public:
519   Appender(IOBuf* buf, uint32_t growth)
520     : buffer_(buf),
521       crtBuf_(buf->prev()),
522       growth_(growth) {
523   }
524
525   uint8_t* writableData() {
526     return crtBuf_->writableTail();
527   }
528
529   size_t length() const {
530     return crtBuf_->tailroom();
531   }
532
533   /**
534    * Mark n bytes (must be <= length()) as appended, as per the
535    * IOBuf::append() method.
536    */
537   void append(size_t n) {
538     crtBuf_->append(n);
539   }
540
541   /**
542    * Ensure at least n contiguous bytes available to write.
543    * Postcondition: length() >= n.
544    */
545   void ensure(uint32_t n) {
546     if (LIKELY(length() >= n)) {
547       return;
548     }
549
550     // Waste the rest of the current buffer and allocate a new one.
551     // Don't make it too small, either.
552     if (growth_ == 0) {
553       throw std::out_of_range("can't grow buffer chain");
554     }
555
556     n = std::max(n, growth_);
557     buffer_->prependChain(IOBuf::create(n));
558     crtBuf_ = buffer_->prev();
559   }
560
561   size_t pushAtMost(const uint8_t* buf, size_t len) {
562     size_t copied = 0;
563     for (;;) {
564       // Fast path: it all fits in one buffer.
565       size_t available = length();
566       if (LIKELY(available >= len)) {
567         memcpy(writableData(), buf, len);
568         append(len);
569         return copied + len;
570       }
571
572       memcpy(writableData(), buf, available);
573       append(available);
574       copied += available;
575       if (UNLIKELY(!tryGrowChain())) {
576         return copied;
577       }
578       buf += available;
579       len -= available;
580     }
581   }
582
583  private:
584   bool tryGrowChain() {
585     assert(crtBuf_->next() == buffer_);
586     if (growth_ == 0) {
587       return false;
588     }
589
590     buffer_->prependChain(IOBuf::create(growth_));
591     crtBuf_ = buffer_->prev();
592     return true;
593   }
594
595   IOBuf* buffer_;
596   IOBuf* crtBuf_;
597   uint32_t growth_;
598 };
599
600 class QueueAppender : public detail::Writable<QueueAppender> {
601  public:
602   /**
603    * Create an Appender that writes to a IOBufQueue.  When we allocate
604    * space in the queue, we grow no more than growth bytes at once
605    * (unless you call ensure() with a bigger value yourself).
606    * Throw if we ever need to allocate more than maxTotalGrowth.
607    */
608   QueueAppender(IOBufQueue* queue,
609                 uint32_t growth,
610                 size_t maxTotalGrowth = std::numeric_limits<size_t>::max()) {
611     reset(queue, growth, maxTotalGrowth);
612   }
613
614   void reset(IOBufQueue* queue,
615              uint32_t growth,
616              size_t maxTotalGrowth = std::numeric_limits<size_t>::max()) {
617     queue_ = queue;
618     growth_ = growth;
619     remainingGrowth_ = maxTotalGrowth;
620     next_ = nullptr;
621     available_ = 0;
622   }
623
624   uint8_t* writableData() { return next_; }
625
626   size_t length() const { return available_; }
627
628   void append(size_t n) {
629     assert(n <= available_);
630     assert(n <= remainingGrowth_);
631     queue_->postallocate(n);
632     next_ += n;
633     available_ -= n;
634     remainingGrowth_ -= n;
635   }
636
637   // Ensure at least n contiguous; can go above growth_, throws if
638   // not enough room.
639   void ensure(uint32_t n) {
640     if (UNLIKELY(n > remainingGrowth_)) {
641       throw std::out_of_range("overflow");
642     }
643
644     if (LIKELY(length() >= n)) {
645       return;
646     }
647
648     size_t desired = std::min(growth_, remainingGrowth_ - n);
649
650     // Grab some more.
651     auto p = queue_->preallocate(n, desired);
652
653     next_ = static_cast<uint8_t*>(p.first);
654     available_ = p.second;
655   }
656
657   size_t pushAtMost(const uint8_t* buf, size_t len) {
658     if (UNLIKELY(len > remainingGrowth_)) {
659       len = remainingGrowth_;
660     }
661
662     size_t remaining = len;
663     while (remaining != 0) {
664       ensure(std::min(remaining, growth_));
665       size_t n = std::min(remaining, available_);
666       memcpy(next_, buf, n);
667       buf += n;
668       remaining -= n;
669       append(n);
670     }
671
672     return len;
673   }
674
675   // insert doesn't count towards maxTotalGrowth
676   void insert(std::unique_ptr<folly::IOBuf> buf) {
677     if (buf) {
678       queue_->append(std::move(buf), true);
679       next_ = nullptr;
680       available_ = 0;
681     }
682   }
683
684  private:
685   folly::IOBufQueue* queue_;
686   size_t growth_;
687   size_t remainingGrowth_;
688   uint8_t* next_;
689   size_t available_;
690 };
691
692 }}  // folly::io
693
694 #endif // FOLLY_CURSOR_H