Allow capacity and length to be different for user buffers
[folly.git] / folly / experimental / io / IOBufQueue.cpp
1 /*
2  * Copyright 2012 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/experimental/io/IOBufQueue.h"
18
19 #include <string.h>
20
21 #include <stdexcept>
22
23 using std::make_pair;
24 using std::pair;
25 using std::unique_ptr;
26
27 namespace {
28
29 using folly::IOBuf;
30
31 const size_t MIN_ALLOC_SIZE = 2000;
32 const size_t MAX_ALLOC_SIZE = 8000; // Must fit within a uint32_t
33
34 /**
35  * Convenience function to append chain src to chain dst.
36  */
37 void
38 appendToChain(unique_ptr<IOBuf>& dst, unique_ptr<IOBuf>&& src) {
39   if (dst == NULL) {
40     dst = std::move(src);
41   } else {
42     dst->prev()->appendChain(std::move(src));
43   }
44 }
45
46 } // anonymous namespace
47
48 namespace folly {
49
50 IOBufQueue::IOBufQueue(const Options& options)
51   : options_(options),
52     chainLength_(0) {
53 }
54
55 IOBufQueue::IOBufQueue(IOBufQueue&& other)
56   : options_(other.options_),
57     chainLength_(other.chainLength_),
58     head_(std::move(other.head_)) {
59   other.chainLength_ = 0;
60 }
61
62 IOBufQueue& IOBufQueue::operator=(IOBufQueue&& other) {
63   if (&other != this) {
64     options_ = other.options_;
65     chainLength_ = other.chainLength_;
66     head_ = std::move(other.head_);
67     other.chainLength_ = 0;
68   }
69   return *this;
70 }
71
72 std::pair<void*, uint32_t>
73 IOBufQueue::headroom() {
74   if (head_) {
75     return std::make_pair(head_->writableBuffer(), head_->headroom());
76   } else {
77     return std::make_pair(nullptr, 0);
78   }
79 }
80
81 void
82 IOBufQueue::markPrepended(uint32_t n) {
83   if (n == 0) {
84     return;
85   }
86   assert(head_);
87   head_->prepend(n);
88   if (options_.cacheChainLength) {
89     chainLength_ += n;
90   }
91 }
92
93 void
94 IOBufQueue::prepend(const void* buf, uint32_t n) {
95   auto p = headroom();
96   if (n > p.second) {
97     throw std::overflow_error("Not enough room to prepend");
98   }
99   memcpy(static_cast<char*>(p.first) + p.second - n, buf, n);
100   markPrepended(n);
101 }
102
103 void
104 IOBufQueue::append(unique_ptr<IOBuf>&& buf) {
105   if (!buf) {
106     return;
107   }
108   if (options_.cacheChainLength) {
109     chainLength_ += buf->computeChainDataLength();
110   }
111   appendToChain(head_, std::move(buf));
112 }
113
114 void
115 IOBufQueue::append(IOBufQueue& other) {
116   if (!other.head_) {
117     return;
118   }
119   if (options_.cacheChainLength) {
120     if (other.options_.cacheChainLength) {
121       chainLength_ += other.chainLength_;
122     } else {
123       chainLength_ += other.head_->computeChainDataLength();
124     }
125   }
126   appendToChain(head_, std::move(other.head_));
127   other.chainLength_ = 0;
128 }
129
130 void
131 IOBufQueue::append(const void* buf, size_t len) {
132   auto src = static_cast<const uint8_t*>(buf);
133   while (len != 0) {
134     if ((head_ == NULL) || head_->prev()->isSharedOne() ||
135         (head_->prev()->tailroom() == 0)) {
136       appendToChain(head_, std::move(
137           IOBuf::create(std::max(MIN_ALLOC_SIZE,
138               std::min(len, MAX_ALLOC_SIZE)))));
139     }
140     IOBuf* last = head_->prev();
141     uint32_t copyLen = std::min(len, (size_t)last->tailroom());
142     memcpy(last->writableTail(), src, copyLen);
143     src += copyLen;
144     last->append(copyLen);
145     if (options_.cacheChainLength) {
146       chainLength_ += copyLen;
147     }
148     len -= copyLen;
149   }
150 }
151
152 void
153 IOBufQueue::wrapBuffer(const void* buf, size_t len, uint32_t blockSize) {
154   auto src = static_cast<const uint8_t*>(buf);
155   while (len != 0) {
156     size_t n = std::min(len, size_t(blockSize));
157     append(IOBuf::wrapBuffer(src, n));
158     src += n;
159     len -= n;
160   }
161 }
162
163 pair<void*,uint32_t>
164 IOBufQueue::preallocate(uint32_t min, uint32_t max) {
165   if (head_ != NULL) {
166     // If there's enough space left over at the end of the queue, use that.
167     IOBuf* last = head_->prev();
168     if (!last->isSharedOne()) {
169       uint32_t avail = last->tailroom();
170       if (avail >= min) {
171         return make_pair(
172             last->writableTail(), std::min(max, avail));
173       }
174     }
175   }
176   // Allocate a new buffer of the requested max size.
177   unique_ptr<IOBuf> newBuf(IOBuf::create(max));
178   appendToChain(head_, std::move(newBuf));
179   IOBuf* last = head_->prev();
180   return make_pair(last->writableTail(),
181       std::min(max, last->tailroom()));
182 }
183
184 void
185 IOBufQueue::postallocate(uint32_t n) {
186   head_->prev()->append(n);
187   if (options_.cacheChainLength) {
188     chainLength_ += n;
189   }
190 }
191
192 unique_ptr<IOBuf>
193 IOBufQueue::split(size_t n) {
194   unique_ptr<IOBuf> result;
195   while (n != 0) {
196     if (head_ == NULL) {
197       throw std::underflow_error(
198           "Attempt to remove more bytes than are present in IOBufQueue");
199     } else if (head_->length() <= n) {
200       n -= head_->length();
201       if (options_.cacheChainLength) {
202         chainLength_ -= head_->length();
203       }
204       unique_ptr<IOBuf> remainder = head_->pop();
205       appendToChain(result, std::move(head_));
206       head_ = std::move(remainder);
207     } else {
208       unique_ptr<IOBuf> clone = head_->cloneOne();
209       clone->trimEnd(clone->length() - n);
210       appendToChain(result, std::move(clone));
211       head_->trimStart(n);
212       if (options_.cacheChainLength) {
213         chainLength_ -= n;
214       }
215       break;
216     }
217   }
218   return std::move(result);
219 }
220
221 void IOBufQueue::trimStart(size_t amount) {
222   while (amount > 0) {
223     if (!head_) {
224       throw std::underflow_error(
225         "Attempt to trim more bytes than are present in IOBufQueue");
226     }
227     if (head_->length() > amount) {
228       head_->trimStart(amount);
229       if (options_.cacheChainLength) {
230         chainLength_ -= amount;
231       }
232       break;
233     }
234     amount -= head_->length();
235     if (options_.cacheChainLength) {
236       chainLength_ -= head_->length();
237     }
238     head_ = head_->pop();
239   }
240 }
241
242 void IOBufQueue::trimEnd(size_t amount) {
243   while (amount > 0) {
244     if (!head_) {
245       throw std::underflow_error(
246         "Attempt to trim more bytes than are present in IOBufQueue");
247     }
248     if (head_->prev()->length() > amount) {
249       head_->prev()->trimEnd(amount);
250       if (options_.cacheChainLength) {
251         chainLength_ -= amount;
252       }
253       break;
254     }
255     amount -= head_->prev()->length();
256     if (options_.cacheChainLength) {
257       chainLength_ -= head_->prev()->length();
258     }
259     unique_ptr<IOBuf> b = head_->prev()->unlink();
260
261     // Null queue if we unlinked the head.
262     if (b.get() == head_.get()) {
263       head_.reset();
264     }
265   }
266 }
267
268 } // folly