Make IOBuf support 64-bit length and capacity
[folly.git] / folly / io / Cursor.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_CURSOR_H
18 #define FOLLY_CURSOR_H
19
20 #include <assert.h>
21 #include <stdexcept>
22 #include <string.h>
23 #include <type_traits>
24 #include <memory>
25
26 #include "folly/Bits.h"
27 #include "folly/io/IOBuf.h"
28 #include "folly/io/IOBufQueue.h"
29 #include "folly/Likely.h"
30 #include "folly/Memory.h"
31
32 /**
33  * Cursor class for fast iteration over IOBuf chains.
34  *
35  * Cursor - Read-only access
36  *
37  * RWPrivateCursor - Read-write access, assumes private access to IOBuf chain
38  * RWUnshareCursor - Read-write access, calls unshare on write (COW)
39  * Appender        - Write access, assumes private access to IOBuf chian
40  *
41  * Note that RW cursors write in the preallocated part of buffers (that is,
42  * between the buffer's data() and tail()), while Appenders append to the end
43  * of the buffer (between the buffer's tail() and bufferEnd()).  Appenders
44  * automatically adjust the buffer pointers, so you may only use one
45  * Appender with a buffer chain; for this reason, Appenders assume private
46  * access to the buffer (you need to call unshare() yourself if necessary).
47  **/
48 namespace folly { namespace io {
49 namespace detail {
50
51 template <class Derived, typename BufType>
52 class CursorBase {
53  public:
54   const uint8_t* data() const {
55     return crtBuf_->data() + offset_;
56   }
57
58   /*
59    * Return the remaining space available in the current IOBuf.
60    *
61    * May return 0 if the cursor is at the end of an IOBuf.  Use peek() instead
62    * if you want to avoid this.  peek() will advance to the next non-empty
63    * IOBuf (up to the end of the chain) if the cursor is currently pointing at
64    * the end of a buffer.
65    */
66   size_t length() const {
67     return crtBuf_->length() - offset_;
68   }
69
70   /*
71    * Return the space available until the end of the entire IOBuf chain.
72    */
73   size_t totalLength() const {
74     if (crtBuf_ == buffer_) {
75       return crtBuf_->computeChainDataLength() - offset_;
76     }
77     CursorBase end(buffer_->prev());
78     end.offset_ = end.buffer_->length();
79     return end - *this;
80   }
81
82   Derived& operator+=(size_t offset) {
83     Derived* p = static_cast<Derived*>(this);
84     p->skip(offset);
85     return *p;
86   }
87
88   template <class T>
89   typename std::enable_if<std::is_integral<T>::value, T>::type
90   read() {
91     T val;
92     pull(&val, sizeof(T));
93     return val;
94   }
95
96   template <class T>
97   T readBE() {
98     return Endian::big(read<T>());
99   }
100
101   template <class T>
102   T readLE() {
103     return Endian::little(read<T>());
104   }
105
106   /**
107    * Read a fixed-length string.
108    *
109    * The std::string-based APIs should probably be avoided unless you
110    * ultimately want the data to live in an std::string. You're better off
111    * using the pull() APIs to copy into a raw buffer otherwise.
112    */
113   std::string readFixedString(size_t len) {
114     std::string str;
115
116     str.reserve(len);
117     for (;;) {
118       // Fast path: it all fits in one buffer.
119       size_t available = length();
120       if (LIKELY(available >= len)) {
121         str.append(reinterpret_cast<const char*>(data()), len);
122         offset_ += len;
123         return str;
124       }
125
126       str.append(reinterpret_cast<const char*>(data()), available);
127       if (UNLIKELY(!tryAdvanceBuffer())) {
128         throw std::out_of_range("string underflow");
129       }
130       len -= available;
131     }
132   }
133
134   /**
135    * Read a string consisting of bytes until the given terminator character is
136    * seen. Raises an std::length_error if maxLength bytes have been processed
137    * before the terminator is seen.
138    *
139    * See comments in readFixedString() about when it's appropriate to use this
140    * vs. using pull().
141    */
142   std::string readTerminatedString(
143     char termChar = '\0',
144     size_t maxLength = std::numeric_limits<size_t>::max()) {
145     std::string str;
146
147     for (;;) {
148       const uint8_t* buf = data();
149       size_t buflen = length();
150
151       size_t i = 0;
152       while (i < buflen && buf[i] != termChar) {
153         ++i;
154
155         // Do this check after incrementing 'i', as even though we start at the
156         // 0 byte, it still represents a single character
157         if (str.length() + i >= maxLength) {
158           throw std::length_error("string overflow");
159         }
160       }
161
162       str.append(reinterpret_cast<const char*>(buf), i);
163       if (i < buflen) {
164         skip(i + 1);
165         return str;
166       }
167
168       skip(i);
169
170       if (UNLIKELY(!tryAdvanceBuffer())) {
171         throw std::out_of_range("string underflow");
172       }
173     }
174   }
175
176   explicit CursorBase(BufType* buf)
177     : crtBuf_(buf)
178     , offset_(0)
179     , buffer_(buf) {}
180
181   // Make all the templated classes friends for copy constructor.
182   template <class D, typename B> friend class CursorBase;
183
184   template <class T>
185   explicit CursorBase(const T& cursor) {
186     crtBuf_ = cursor.crtBuf_;
187     offset_ = cursor.offset_;
188     buffer_ = cursor.buffer_;
189   }
190
191   // reset cursor to point to a new buffer.
192   void reset(BufType* buf) {
193     crtBuf_ = buf;
194     buffer_ = buf;
195     offset_ = 0;
196   }
197
198   /**
199    * Return the available data in the current buffer.
200    * If you want to gather more data from the chain into a contiguous region
201    * (for hopefully zero-copy access), use gather() before peek().
202    */
203   std::pair<const uint8_t*, size_t> peek() {
204     // Ensure that we're pointing to valid data
205     size_t available = length();
206     while (UNLIKELY(available == 0 && tryAdvanceBuffer())) {
207       available = length();
208     }
209
210     return std::make_pair(data(), available);
211   }
212
213   void pull(void* buf, size_t len) {
214     if (UNLIKELY(pullAtMost(buf, len) != len)) {
215       throw std::out_of_range("underflow");
216     }
217   }
218
219   void clone(std::unique_ptr<folly::IOBuf>& buf, size_t len) {
220     if (UNLIKELY(cloneAtMost(buf, len) != len)) {
221       throw std::out_of_range("underflow");
222     }
223   }
224
225   void clone(folly::IOBuf& buf, size_t len) {
226     if (UNLIKELY(cloneAtMost(buf, len) != len)) {
227       throw std::out_of_range("underflow");
228     }
229   }
230
231   void skip(size_t len) {
232     if (UNLIKELY(skipAtMost(len) != len)) {
233       throw std::out_of_range("underflow");
234     }
235   }
236
237   size_t pullAtMost(void* buf, size_t len) {
238     uint8_t* p = reinterpret_cast<uint8_t*>(buf);
239     size_t copied = 0;
240     for (;;) {
241       // Fast path: it all fits in one buffer.
242       size_t available = length();
243       if (LIKELY(available >= len)) {
244         memcpy(p, data(), len);
245         offset_ += len;
246         return copied + len;
247       }
248
249       memcpy(p, data(), available);
250       copied += available;
251       if (UNLIKELY(!tryAdvanceBuffer())) {
252         return copied;
253       }
254       p += available;
255       len -= available;
256     }
257   }
258
259   size_t cloneAtMost(folly::IOBuf& buf, size_t len) {
260     buf = folly::IOBuf();
261
262     std::unique_ptr<folly::IOBuf> tmp;
263     size_t copied = 0;
264     for (int loopCount = 0; true; ++loopCount) {
265       // Fast path: it all fits in one buffer.
266       size_t available = length();
267       if (LIKELY(available >= len)) {
268         if (loopCount == 0) {
269           crtBuf_->cloneOneInto(buf);
270           buf.trimStart(offset_);
271           buf.trimEnd(buf.length() - len);
272         } else {
273           tmp = crtBuf_->cloneOne();
274           tmp->trimStart(offset_);
275           tmp->trimEnd(tmp->length() - len);
276           buf.prependChain(std::move(tmp));
277         }
278
279         offset_ += len;
280         return copied + len;
281       }
282
283
284       if (loopCount == 0) {
285         crtBuf_->cloneOneInto(buf);
286         buf.trimStart(offset_);
287       } else {
288         tmp = crtBuf_->cloneOne();
289         tmp->trimStart(offset_);
290         buf.prependChain(std::move(tmp));
291       }
292
293       copied += available;
294       if (UNLIKELY(!tryAdvanceBuffer())) {
295         return copied;
296       }
297       len -= available;
298     }
299   }
300
301   size_t cloneAtMost(std::unique_ptr<folly::IOBuf>& buf, size_t len) {
302     if (!buf) {
303       buf = make_unique<folly::IOBuf>();
304     }
305
306     return cloneAtMost(*buf, len);
307   }
308
309   size_t skipAtMost(size_t len) {
310     size_t skipped = 0;
311     for (;;) {
312       // Fast path: it all fits in one buffer.
313       size_t available = length();
314       if (LIKELY(available >= len)) {
315         offset_ += len;
316         return skipped + len;
317       }
318
319       skipped += available;
320       if (UNLIKELY(!tryAdvanceBuffer())) {
321         return skipped;
322       }
323       len -= available;
324     }
325   }
326
327   /**
328    * Return the distance between two cursors.
329    */
330   size_t operator-(const CursorBase& other) const {
331     BufType *otherBuf = other.crtBuf_;
332     size_t len = 0;
333
334     if (otherBuf != crtBuf_) {
335       len += otherBuf->length() - other.offset_;
336
337       for (otherBuf = otherBuf->next();
338            otherBuf != crtBuf_ && otherBuf != other.buffer_;
339            otherBuf = otherBuf->next()) {
340         len += otherBuf->length();
341       }
342
343       if (otherBuf == other.buffer_) {
344         throw std::out_of_range("wrap-around");
345       }
346
347       len += offset_;
348     } else {
349       if (offset_ < other.offset_) {
350         throw std::out_of_range("underflow");
351       }
352
353       len += offset_ - other.offset_;
354     }
355
356     return len;
357   }
358
359   /**
360    * Return the distance from the given IOBuf to the this cursor.
361    */
362   size_t operator-(const BufType* buf) const {
363     size_t len = 0;
364
365     BufType *curBuf = buf;
366     while (curBuf != crtBuf_) {
367       len += curBuf->length();
368       curBuf = curBuf->next();
369       if (curBuf == buf || curBuf == buffer_) {
370         throw std::out_of_range("wrap-around");
371       }
372     }
373
374     len += offset_;
375     return len;
376   }
377
378  protected:
379   BufType* crtBuf_;
380   size_t offset_;
381
382   ~CursorBase(){}
383
384   BufType* head() {
385     return buffer_;
386   }
387
388   bool tryAdvanceBuffer() {
389     BufType* nextBuf = crtBuf_->next();
390     if (UNLIKELY(nextBuf == buffer_)) {
391       offset_ = crtBuf_->length();
392       return false;
393     }
394
395     offset_ = 0;
396     crtBuf_ = nextBuf;
397     static_cast<Derived*>(this)->advanceDone();
398     return true;
399   }
400
401  private:
402   void advanceDone() {
403   }
404
405   BufType* buffer_;
406 };
407
408 template <class Derived>
409 class Writable {
410  public:
411   template <class T>
412   typename std::enable_if<std::is_integral<T>::value>::type
413   write(T value) {
414     const uint8_t* u8 = reinterpret_cast<const uint8_t*>(&value);
415     Derived* d = static_cast<Derived*>(this);
416     d->push(u8, sizeof(T));
417   }
418
419   template <class T>
420   void writeBE(T value) {
421     Derived* d = static_cast<Derived*>(this);
422     d->write(Endian::big(value));
423   }
424
425   template <class T>
426   void writeLE(T value) {
427     Derived* d = static_cast<Derived*>(this);
428     d->write(Endian::little(value));
429   }
430
431   void push(const uint8_t* buf, size_t len) {
432     Derived* d = static_cast<Derived*>(this);
433     if (d->pushAtMost(buf, len) != len) {
434       throw std::out_of_range("overflow");
435     }
436   }
437 };
438
439 } // namespace detail
440
441 class Cursor : public detail::CursorBase<Cursor, const IOBuf> {
442  public:
443   explicit Cursor(const IOBuf* buf)
444     : detail::CursorBase<Cursor, const IOBuf>(buf) {}
445
446   template <class CursorType>
447   explicit Cursor(CursorType& cursor)
448     : detail::CursorBase<Cursor, const IOBuf>(cursor) {}
449 };
450
451 enum class CursorAccess {
452   PRIVATE,
453   UNSHARE
454 };
455
456 template <CursorAccess access>
457 class RWCursor
458   : public detail::CursorBase<RWCursor<access>, IOBuf>,
459     public detail::Writable<RWCursor<access>> {
460   friend class detail::CursorBase<RWCursor<access>, IOBuf>;
461  public:
462   explicit RWCursor(IOBuf* buf)
463     : detail::CursorBase<RWCursor<access>, IOBuf>(buf),
464       maybeShared_(true) {}
465
466   template <class CursorType>
467   explicit RWCursor(CursorType& cursor)
468     : detail::CursorBase<RWCursor<access>, IOBuf>(cursor),
469       maybeShared_(true) {}
470   /**
471    * Gather at least n bytes contiguously into the current buffer,
472    * by coalescing subsequent buffers from the chain as necessary.
473    */
474   void gather(size_t n) {
475     // Forbid attempts to gather beyond the end of this IOBuf chain.
476     // Otherwise we could try to coalesce the head of the chain and end up
477     // accidentally freeing it, invalidating the pointer owned by external
478     // code.
479     //
480     // If crtBuf_ == head() then IOBuf::gather() will perform all necessary
481     // checking.  We only have to perform an explicit check here when calling
482     // gather() on a non-head element.
483     if (this->crtBuf_ != this->head() && this->totalLength() < n) {
484       throw std::overflow_error("cannot gather() past the end of the chain");
485     }
486     this->crtBuf_->gather(this->offset_ + n);
487   }
488   void gatherAtMost(size_t n) {
489     size_t size = std::min(n, this->totalLength());
490     return this->crtBuf_->gather(this->offset_ + size);
491   }
492
493   size_t pushAtMost(const uint8_t* buf, size_t len) {
494     size_t copied = 0;
495     for (;;) {
496       // Fast path: the current buffer is big enough.
497       size_t available = this->length();
498       if (LIKELY(available >= len)) {
499         if (access == CursorAccess::UNSHARE) {
500           maybeUnshare();
501         }
502         memcpy(writableData(), buf, len);
503         this->offset_ += len;
504         return copied + len;
505       }
506
507       if (access == CursorAccess::UNSHARE) {
508         maybeUnshare();
509       }
510       memcpy(writableData(), buf, available);
511       copied += available;
512       if (UNLIKELY(!this->tryAdvanceBuffer())) {
513         return copied;
514       }
515       buf += available;
516       len -= available;
517     }
518   }
519
520   void insert(std::unique_ptr<folly::IOBuf> buf) {
521     folly::IOBuf* nextBuf;
522     if (this->offset_ == 0) {
523       // Can just prepend
524       nextBuf = this->crtBuf_;
525       this->crtBuf_->prependChain(std::move(buf));
526     } else {
527       std::unique_ptr<folly::IOBuf> remaining;
528       if (this->crtBuf_->length() - this->offset_ > 0) {
529         // Need to split current IOBuf in two.
530         remaining = this->crtBuf_->cloneOne();
531         remaining->trimStart(this->offset_);
532         nextBuf = remaining.get();
533         buf->prependChain(std::move(remaining));
534       } else {
535         // Can just append
536         nextBuf = this->crtBuf_->next();
537       }
538       this->crtBuf_->trimEnd(this->length());
539       this->crtBuf_->appendChain(std::move(buf));
540     }
541     // Jump past the new links
542     this->offset_ = 0;
543     this->crtBuf_ = nextBuf;
544   }
545
546   uint8_t* writableData() {
547     return this->crtBuf_->writableData() + this->offset_;
548   }
549
550  private:
551   void maybeUnshare() {
552     if (UNLIKELY(maybeShared_)) {
553       this->crtBuf_->unshareOne();
554       maybeShared_ = false;
555     }
556   }
557
558   void advanceDone() {
559     maybeShared_ = true;
560   }
561
562   bool maybeShared_;
563 };
564
565 typedef RWCursor<CursorAccess::PRIVATE> RWPrivateCursor;
566 typedef RWCursor<CursorAccess::UNSHARE> RWUnshareCursor;
567
568 /**
569  * Append to the end of a buffer chain, growing the chain (by allocating new
570  * buffers) in increments of at least growth bytes every time.  Won't grow
571  * (and push() and ensure() will throw) if growth == 0.
572  *
573  * TODO(tudorb): add a flavor of Appender that reallocates one IOBuf instead
574  * of chaining.
575  */
576 class Appender : public detail::Writable<Appender> {
577  public:
578   Appender(IOBuf* buf, uint64_t growth)
579     : buffer_(buf),
580       crtBuf_(buf->prev()),
581       growth_(growth) {
582   }
583
584   uint8_t* writableData() {
585     return crtBuf_->writableTail();
586   }
587
588   size_t length() const {
589     return crtBuf_->tailroom();
590   }
591
592   /**
593    * Mark n bytes (must be <= length()) as appended, as per the
594    * IOBuf::append() method.
595    */
596   void append(size_t n) {
597     crtBuf_->append(n);
598   }
599
600   /**
601    * Ensure at least n contiguous bytes available to write.
602    * Postcondition: length() >= n.
603    */
604   void ensure(uint64_t n) {
605     if (LIKELY(length() >= n)) {
606       return;
607     }
608
609     // Waste the rest of the current buffer and allocate a new one.
610     // Don't make it too small, either.
611     if (growth_ == 0) {
612       throw std::out_of_range("can't grow buffer chain");
613     }
614
615     n = std::max(n, growth_);
616     buffer_->prependChain(IOBuf::create(n));
617     crtBuf_ = buffer_->prev();
618   }
619
620   size_t pushAtMost(const uint8_t* buf, size_t len) {
621     size_t copied = 0;
622     for (;;) {
623       // Fast path: it all fits in one buffer.
624       size_t available = length();
625       if (LIKELY(available >= len)) {
626         memcpy(writableData(), buf, len);
627         append(len);
628         return copied + len;
629       }
630
631       memcpy(writableData(), buf, available);
632       append(available);
633       copied += available;
634       if (UNLIKELY(!tryGrowChain())) {
635         return copied;
636       }
637       buf += available;
638       len -= available;
639     }
640   }
641
642  private:
643   bool tryGrowChain() {
644     assert(crtBuf_->next() == buffer_);
645     if (growth_ == 0) {
646       return false;
647     }
648
649     buffer_->prependChain(IOBuf::create(growth_));
650     crtBuf_ = buffer_->prev();
651     return true;
652   }
653
654   IOBuf* buffer_;
655   IOBuf* crtBuf_;
656   uint64_t growth_;
657 };
658
659 class QueueAppender : public detail::Writable<QueueAppender> {
660  public:
661   /**
662    * Create an Appender that writes to a IOBufQueue.  When we allocate
663    * space in the queue, we grow no more than growth bytes at once
664    * (unless you call ensure() with a bigger value yourself).
665    */
666   QueueAppender(IOBufQueue* queue, uint64_t growth) {
667     reset(queue, growth);
668   }
669
670   void reset(IOBufQueue* queue, uint64_t growth) {
671     queue_ = queue;
672     growth_ = growth;
673   }
674
675   uint8_t* writableData() {
676     return static_cast<uint8_t*>(queue_->writableTail());
677   }
678
679   size_t length() const { return queue_->tailroom(); }
680
681   void append(size_t n) { queue_->postallocate(n); }
682
683   // Ensure at least n contiguous; can go above growth_, throws if
684   // not enough room.
685   void ensure(uint64_t n) { queue_->preallocate(n, growth_); }
686
687   template <class T>
688   typename std::enable_if<std::is_integral<T>::value>::type
689   write(T value) {
690     // We can't fail.
691     auto p = queue_->preallocate(sizeof(T), growth_);
692     storeUnaligned(p.first, value);
693     queue_->postallocate(sizeof(T));
694   }
695
696
697   size_t pushAtMost(const uint8_t* buf, size_t len) {
698     size_t remaining = len;
699     while (remaining != 0) {
700       auto p = queue_->preallocate(std::min(remaining, growth_),
701                                    growth_,
702                                    remaining);
703       memcpy(p.first, buf, p.second);
704       queue_->postallocate(p.second);
705       buf += p.second;
706       remaining -= p.second;
707     }
708
709     return len;
710   }
711
712   void insert(std::unique_ptr<folly::IOBuf> buf) {
713     if (buf) {
714       queue_->append(std::move(buf), true);
715     }
716   }
717
718  private:
719   folly::IOBufQueue* queue_;
720   size_t growth_;
721 };
722
723 }}  // folly::io
724
725 #endif // FOLLY_CURSOR_H