folly::io::QueueAppender speedup
[folly.git] / folly / io / Cursor.h
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_CURSOR_H
18 #define FOLLY_CURSOR_H
19
20 #include <assert.h>
21 #include <stdexcept>
22 #include <string.h>
23 #include <type_traits>
24 #include <memory>
25
26 #include "folly/Bits.h"
27 #include "folly/io/IOBuf.h"
28 #include "folly/io/IOBufQueue.h"
29 #include "folly/Likely.h"
30
31 /**
32  * Cursor class for fast iteration over IOBuf chains.
33  *
34  * Cursor - Read-only access
35  *
36  * RWPrivateCursor - Read-write access, assumes private access to IOBuf chain
37  * RWUnshareCursor - Read-write access, calls unshare on write (COW)
38  * Appender        - Write access, assumes private access to IOBuf chian
39  *
40  * Note that RW cursors write in the preallocated part of buffers (that is,
41  * between the buffer's data() and tail()), while Appenders append to the end
42  * of the buffer (between the buffer's tail() and bufferEnd()).  Appenders
43  * automatically adjust the buffer pointers, so you may only use one
44  * Appender with a buffer chain; for this reason, Appenders assume private
45  * access to the buffer (you need to call unshare() yourself if necessary).
46  **/
47 namespace folly { namespace io {
48 namespace detail {
49
50 template <class Derived, typename BufType>
51 class CursorBase {
52  public:
53   const uint8_t* data() const {
54     return crtBuf_->data() + offset_;
55   }
56
57   // Space available in the current IOBuf.  May be 0; use peek() instead which
58   // will always point to a non-empty chunk of data or at the end of the
59   // chain.
60   size_t length() const {
61     return crtBuf_->length() - offset_;
62   }
63
64   Derived& operator+=(size_t offset) {
65     Derived* p = static_cast<Derived*>(this);
66     p->skip(offset);
67     return *p;
68   }
69
70   template <class T>
71   typename std::enable_if<std::is_integral<T>::value, T>::type
72   read() {
73     T val;
74     pull(&val, sizeof(T));
75     return val;
76   }
77
78   template <class T>
79   T readBE() {
80     return Endian::big(read<T>());
81   }
82
83   template <class T>
84   T readLE() {
85     return Endian::little(read<T>());
86   }
87
88   /**
89    * Read a fixed-length string.
90    *
91    * The std::string-based APIs should probably be avoided unless you
92    * ultimately want the data to live in an std::string. You're better off
93    * using the pull() APIs to copy into a raw buffer otherwise.
94    */
95   std::string readFixedString(size_t len) {
96     std::string str;
97
98     str.reserve(len);
99     for (;;) {
100       // Fast path: it all fits in one buffer.
101       size_t available = length();
102       if (LIKELY(available >= len)) {
103         str.append(reinterpret_cast<const char*>(data()), len);
104         offset_ += len;
105         return str;
106       }
107
108       str.append(reinterpret_cast<const char*>(data()), available);
109       if (UNLIKELY(!tryAdvanceBuffer())) {
110         throw std::out_of_range("string underflow");
111       }
112       len -= available;
113     }
114   }
115
116   /**
117    * Read a string consisting of bytes until the given terminator character is
118    * seen. Raises an std::length_error if maxLength bytes have been processed
119    * before the terminator is seen.
120    *
121    * See comments in readFixedString() about when it's appropriate to use this
122    * vs. using pull().
123    */
124   std::string readTerminatedString(
125     char termChar = '\0',
126     size_t maxLength = std::numeric_limits<size_t>::max()) {
127     std::string str;
128
129     for (;;) {
130       const uint8_t* buf = data();
131       size_t buflen = length();
132
133       size_t i = 0;
134       while (i < buflen && buf[i] != termChar) {
135         ++i;
136
137         // Do this check after incrementing 'i', as even though we start at the
138         // 0 byte, it still represents a single character
139         if (str.length() + i >= maxLength) {
140           throw std::length_error("string overflow");
141         }
142       }
143
144       str.append(reinterpret_cast<const char*>(buf), i);
145       if (i < buflen) {
146         skip(i + 1);
147         return str;
148       }
149
150       skip(i);
151
152       if (UNLIKELY(!tryAdvanceBuffer())) {
153         throw std::out_of_range("string underflow");
154       }
155     }
156   }
157
158   explicit CursorBase(BufType* buf)
159     : crtBuf_(buf)
160     , offset_(0)
161     , buffer_(buf) {}
162
163   // Make all the templated classes friends for copy constructor.
164   template <class D, typename B> friend class CursorBase;
165
166   template <class T>
167   explicit CursorBase(const T& cursor) {
168     crtBuf_ = cursor.crtBuf_;
169     offset_ = cursor.offset_;
170     buffer_ = cursor.buffer_;
171   }
172
173   // reset cursor to point to a new buffer.
174   void reset(BufType* buf) {
175     crtBuf_ = buf;
176     buffer_ = buf;
177     offset_ = 0;
178   }
179
180   /**
181    * Return the available data in the current buffer.
182    * If you want to gather more data from the chain into a contiguous region
183    * (for hopefully zero-copy access), use gather() before peek().
184    */
185   std::pair<const uint8_t*, size_t> peek() {
186     // Ensure that we're pointing to valid data
187     size_t available = length();
188     while (UNLIKELY(available == 0 && tryAdvanceBuffer())) {
189       available = length();
190     }
191
192     return std::make_pair(data(), available);
193   }
194
195   void pull(void* buf, size_t len) {
196     if (UNLIKELY(pullAtMost(buf, len) != len)) {
197       throw std::out_of_range("underflow");
198     }
199   }
200
201   void clone(std::unique_ptr<folly::IOBuf>& buf, size_t len) {
202     if (UNLIKELY(cloneAtMost(buf, len) != len)) {
203       throw std::out_of_range("underflow");
204     }
205   }
206
207   void skip(size_t len) {
208     if (UNLIKELY(skipAtMost(len) != len)) {
209       throw std::out_of_range("underflow");
210     }
211   }
212
213   size_t pullAtMost(void* buf, size_t len) {
214     uint8_t* p = reinterpret_cast<uint8_t*>(buf);
215     size_t copied = 0;
216     for (;;) {
217       // Fast path: it all fits in one buffer.
218       size_t available = length();
219       if (LIKELY(available >= len)) {
220         memcpy(p, data(), len);
221         offset_ += len;
222         return copied + len;
223       }
224
225       memcpy(p, data(), available);
226       copied += available;
227       if (UNLIKELY(!tryAdvanceBuffer())) {
228         return copied;
229       }
230       p += available;
231       len -= available;
232     }
233   }
234
235   size_t cloneAtMost(std::unique_ptr<folly::IOBuf>& buf, size_t len) {
236     buf.reset(nullptr);
237
238     std::unique_ptr<folly::IOBuf> tmp;
239     size_t copied = 0;
240     for (;;) {
241       // Fast path: it all fits in one buffer.
242       size_t available = length();
243       if (LIKELY(available >= len)) {
244         tmp = crtBuf_->cloneOne();
245         tmp->trimStart(offset_);
246         tmp->trimEnd(tmp->length() - len);
247         offset_ += len;
248         if (!buf) {
249           buf = std::move(tmp);
250         } else {
251           buf->prependChain(std::move(tmp));
252         }
253         return copied + len;
254       }
255
256       tmp = crtBuf_->cloneOne();
257       tmp->trimStart(offset_);
258       if (!buf) {
259         buf = std::move(tmp);
260       } else {
261         buf->prependChain(std::move(tmp));
262       }
263
264       copied += available;
265       if (UNLIKELY(!tryAdvanceBuffer())) {
266         return copied;
267       }
268       len -= available;
269     }
270   }
271
272   size_t skipAtMost(size_t len) {
273     size_t skipped = 0;
274     for (;;) {
275       // Fast path: it all fits in one buffer.
276       size_t available = length();
277       if (LIKELY(available >= len)) {
278         offset_ += len;
279         return skipped + len;
280       }
281
282       skipped += available;
283       if (UNLIKELY(!tryAdvanceBuffer())) {
284         return skipped;
285       }
286       len -= available;
287     }
288   }
289
290   /**
291    * Return the distance between two cursors.
292    */
293   size_t operator-(const CursorBase& other) const {
294     BufType *otherBuf = other.crtBuf_;
295     size_t len = 0;
296
297     if (otherBuf != crtBuf_) {
298       len += otherBuf->length() - other.offset_;
299
300       for (otherBuf = otherBuf->next();
301            otherBuf != crtBuf_ && otherBuf != other.buffer_;
302            otherBuf = otherBuf->next()) {
303         len += otherBuf->length();
304       }
305
306       if (otherBuf == other.buffer_) {
307         throw std::out_of_range("wrap-around");
308       }
309
310       len += offset_;
311     } else {
312       if (offset_ < other.offset_) {
313         throw std::out_of_range("underflow");
314       }
315
316       len += offset_ - other.offset_;
317     }
318
319     return len;
320   }
321
322   /**
323    * Return the distance from the given IOBuf to the this cursor.
324    */
325   size_t operator-(const BufType* buf) const {
326     size_t len = 0;
327
328     BufType *curBuf = buf;
329     while (curBuf != crtBuf_) {
330       len += curBuf->length();
331       curBuf = curBuf->next();
332       if (curBuf == buf || curBuf == buffer_) {
333         throw std::out_of_range("wrap-around");
334       }
335     }
336
337     len += offset_;
338     return len;
339   }
340
341  protected:
342   BufType* crtBuf_;
343   size_t offset_;
344
345   ~CursorBase(){}
346
347   bool tryAdvanceBuffer() {
348     BufType* nextBuf = crtBuf_->next();
349     if (UNLIKELY(nextBuf == buffer_)) {
350       offset_ = crtBuf_->length();
351       return false;
352     }
353
354     offset_ = 0;
355     crtBuf_ = nextBuf;
356     static_cast<Derived*>(this)->advanceDone();
357     return true;
358   }
359
360  private:
361   void advanceDone() {
362   }
363
364   BufType* buffer_;
365 };
366
367 template <class Derived>
368 class Writable {
369  public:
370   template <class T>
371   typename std::enable_if<std::is_integral<T>::value>::type
372   write(T value) {
373     const uint8_t* u8 = reinterpret_cast<const uint8_t*>(&value);
374     Derived* d = static_cast<Derived*>(this);
375     push(u8, sizeof(T));
376   }
377
378   template <class T>
379   void writeBE(T value) {
380     Derived* d = static_cast<Derived*>(this);
381     d->write(Endian::big(value));
382   }
383
384   template <class T>
385   void writeLE(T value) {
386     Derived* d = static_cast<Derived*>(this);
387     d->write(Endian::little(value));
388   }
389
390   void push(const uint8_t* buf, size_t len) {
391     Derived* d = static_cast<Derived*>(this);
392     if (d->pushAtMost(buf, len) != len) {
393       throw std::out_of_range("overflow");
394     }
395   }
396 };
397
398 } // namespace detail
399
400 class Cursor : public detail::CursorBase<Cursor, const IOBuf> {
401  public:
402   explicit Cursor(const IOBuf* buf)
403     : detail::CursorBase<Cursor, const IOBuf>(buf) {}
404
405   template <class CursorType>
406   explicit Cursor(CursorType& cursor)
407     : detail::CursorBase<Cursor, const IOBuf>(cursor) {}
408 };
409
410 enum class CursorAccess {
411   PRIVATE,
412   UNSHARE
413 };
414
415 template <CursorAccess access>
416 class RWCursor
417   : public detail::CursorBase<RWCursor<access>, IOBuf>,
418     public detail::Writable<RWCursor<access>> {
419   friend class detail::CursorBase<RWCursor<access>, IOBuf>;
420  public:
421   explicit RWCursor(IOBuf* buf)
422     : detail::CursorBase<RWCursor<access>, IOBuf>(buf),
423       maybeShared_(true) {}
424
425   template <class CursorType>
426   explicit RWCursor(CursorType& cursor)
427     : detail::CursorBase<RWCursor<access>, IOBuf>(cursor),
428       maybeShared_(true) {}
429   /**
430    * Gather at least n bytes contiguously into the current buffer,
431    * by coalescing subsequent buffers from the chain as necessary.
432    */
433   void gather(size_t n) {
434     this->crtBuf_->gather(this->offset_ + n);
435   }
436
437   size_t pushAtMost(const uint8_t* buf, size_t len) {
438     size_t copied = 0;
439     for (;;) {
440       // Fast path: the current buffer is big enough.
441       size_t available = this->length();
442       if (LIKELY(available >= len)) {
443         if (access == CursorAccess::UNSHARE) {
444           maybeUnshare();
445         }
446         memcpy(writableData(), buf, len);
447         this->offset_ += len;
448         return copied + len;
449       }
450
451       if (access == CursorAccess::UNSHARE) {
452         maybeUnshare();
453       }
454       memcpy(writableData(), buf, available);
455       copied += available;
456       if (UNLIKELY(!this->tryAdvanceBuffer())) {
457         return copied;
458       }
459       buf += available;
460       len -= available;
461     }
462   }
463
464   void insert(std::unique_ptr<folly::IOBuf> buf) {
465     folly::IOBuf* nextBuf;
466     if (this->offset_ == 0) {
467       // Can just prepend
468       nextBuf = buf.get();
469       this->crtBuf_->prependChain(std::move(buf));
470     } else {
471       std::unique_ptr<folly::IOBuf> remaining;
472       if (this->crtBuf_->length() - this->offset_ > 0) {
473         // Need to split current IOBuf in two.
474         remaining = this->crtBuf_->cloneOne();
475         remaining->trimStart(this->offset_);
476         nextBuf = remaining.get();
477         buf->prependChain(std::move(remaining));
478       } else {
479         // Can just append
480         nextBuf = this->crtBuf_->next();
481       }
482       this->crtBuf_->trimEnd(this->length());
483       this->crtBuf_->appendChain(std::move(buf));
484     }
485     // Jump past the new links
486     this->offset_ = 0;
487     this->crtBuf_ = nextBuf;
488   }
489
490   uint8_t* writableData() {
491     return this->crtBuf_->writableData() + this->offset_;
492   }
493
494  private:
495   void maybeUnshare() {
496     if (UNLIKELY(maybeShared_)) {
497       this->crtBuf_->unshareOne();
498       maybeShared_ = false;
499     }
500   }
501
502   void advanceDone() {
503     maybeShared_ = true;
504   }
505
506   bool maybeShared_;
507 };
508
509 typedef RWCursor<CursorAccess::PRIVATE> RWPrivateCursor;
510 typedef RWCursor<CursorAccess::UNSHARE> RWUnshareCursor;
511
512 /**
513  * Append to the end of a buffer chain, growing the chain (by allocating new
514  * buffers) in increments of at least growth bytes every time.  Won't grow
515  * (and push() and ensure() will throw) if growth == 0.
516  *
517  * TODO(tudorb): add a flavor of Appender that reallocates one IOBuf instead
518  * of chaining.
519  */
520 class Appender : public detail::Writable<Appender> {
521  public:
522   Appender(IOBuf* buf, uint32_t growth)
523     : buffer_(buf),
524       crtBuf_(buf->prev()),
525       growth_(growth) {
526   }
527
528   uint8_t* writableData() {
529     return crtBuf_->writableTail();
530   }
531
532   size_t length() const {
533     return crtBuf_->tailroom();
534   }
535
536   /**
537    * Mark n bytes (must be <= length()) as appended, as per the
538    * IOBuf::append() method.
539    */
540   void append(size_t n) {
541     crtBuf_->append(n);
542   }
543
544   /**
545    * Ensure at least n contiguous bytes available to write.
546    * Postcondition: length() >= n.
547    */
548   void ensure(uint32_t n) {
549     if (LIKELY(length() >= n)) {
550       return;
551     }
552
553     // Waste the rest of the current buffer and allocate a new one.
554     // Don't make it too small, either.
555     if (growth_ == 0) {
556       throw std::out_of_range("can't grow buffer chain");
557     }
558
559     n = std::max(n, growth_);
560     buffer_->prependChain(IOBuf::create(n));
561     crtBuf_ = buffer_->prev();
562   }
563
564   size_t pushAtMost(const uint8_t* buf, size_t len) {
565     size_t copied = 0;
566     for (;;) {
567       // Fast path: it all fits in one buffer.
568       size_t available = length();
569       if (LIKELY(available >= len)) {
570         memcpy(writableData(), buf, len);
571         append(len);
572         return copied + len;
573       }
574
575       memcpy(writableData(), buf, available);
576       append(available);
577       copied += available;
578       if (UNLIKELY(!tryGrowChain())) {
579         return copied;
580       }
581       buf += available;
582       len -= available;
583     }
584   }
585
586  private:
587   bool tryGrowChain() {
588     assert(crtBuf_->next() == buffer_);
589     if (growth_ == 0) {
590       return false;
591     }
592
593     buffer_->prependChain(IOBuf::create(growth_));
594     crtBuf_ = buffer_->prev();
595     return true;
596   }
597
598   IOBuf* buffer_;
599   IOBuf* crtBuf_;
600   uint32_t growth_;
601 };
602
603 class QueueAppender : public detail::Writable<QueueAppender> {
604  public:
605   /**
606    * Create an Appender that writes to a IOBufQueue.  When we allocate
607    * space in the queue, we grow no more than growth bytes at once
608    * (unless you call ensure() with a bigger value yourself).
609    */
610   QueueAppender(IOBufQueue* queue, uint32_t growth) {
611     reset(queue, growth);
612   }
613
614   void reset(IOBufQueue* queue, uint32_t growth) {
615     queue_ = queue;
616     growth_ = growth;
617   }
618
619   uint8_t* writableData() {
620     return static_cast<uint8_t*>(queue_->writableTail());
621   }
622
623   size_t length() const { return queue_->tailroom(); }
624
625   void append(size_t n) { queue_->postallocate(n); }
626
627   // Ensure at least n contiguous; can go above growth_, throws if
628   // not enough room.
629   void ensure(uint32_t n) { queue_->preallocate(n, growth_); }
630
631   template <class T>
632   typename std::enable_if<std::is_integral<T>::value>::type
633   write(T value) {
634     // We can't fail.
635     auto p = queue_->preallocate(sizeof(T), growth_);
636     storeUnaligned(p.first, value);
637     queue_->postallocate(sizeof(T));
638   }
639
640
641   size_t pushAtMost(const uint8_t* buf, size_t len) {
642     size_t remaining = len;
643     while (remaining != 0) {
644       auto p = queue_->preallocate(std::min(remaining, growth_),
645                                    growth_,
646                                    remaining);
647       memcpy(p.first, buf, p.second);
648       queue_->postallocate(p.second);
649       buf += p.second;
650       remaining -= p.second;
651     }
652
653     return len;
654   }
655
656   void insert(std::unique_ptr<folly::IOBuf> buf) {
657     if (buf) {
658       queue_->append(std::move(buf), true);
659     }
660   }
661
662  private:
663   folly::IOBufQueue* queue_;
664   size_t growth_;
665 };
666
667 }}  // folly::io
668
669 #endif // FOLLY_CURSOR_H