Pull from FB rev 63ce89e2f2301e6bba44a111cc7d4218022156f6
[folly.git] / folly / experimental / io / IOBufQueue.cpp
1 /*
2  * Copyright 2012 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/experimental/io/IOBufQueue.h"
18
19 #include <string.h>
20
21 #include <stdexcept>
22
23 using std::make_pair;
24 using std::pair;
25 using std::unique_ptr;
26
27 namespace {
28
29 using folly::IOBuf;
30
31 const size_t MIN_ALLOC_SIZE = 2000;
32 const size_t MAX_ALLOC_SIZE = 8000; // Must fit within a uint32_t
33
34 /**
35  * Convenience function to append chain src to chain dst.
36  */
37 void
38 appendToChain(unique_ptr<IOBuf>& dst, unique_ptr<IOBuf>&& src) {
39   if (dst == NULL) {
40     dst = std::move(src);
41   } else {
42     dst->prev()->appendChain(std::move(src));
43   }
44 }
45
46 } // anonymous namespace
47
48 namespace folly {
49
50 IOBufQueue::IOBufQueue(const Options& options)
51   : options_(options),
52     chainLength_(0) {
53 }
54
55 IOBufQueue::IOBufQueue(IOBufQueue&& other)
56   : options_(other.options_),
57     chainLength_(other.chainLength_),
58     head_(std::move(other.head_)) {
59   other.chainLength_ = 0;
60 }
61
62 IOBufQueue& IOBufQueue::operator=(IOBufQueue&& other) {
63   if (&other != this) {
64     options_ = other.options_;
65     chainLength_ = other.chainLength_;
66     head_ = std::move(other.head_);
67     other.chainLength_ = 0;
68   }
69   return *this;
70 }
71
72 void
73 IOBufQueue::append(unique_ptr<IOBuf>&& buf) {
74   if (!buf) {
75     return;
76   }
77   if (options_.cacheChainLength) {
78     chainLength_ += buf->computeChainDataLength();
79   }
80   appendToChain(head_, std::move(buf));
81 }
82
83 void
84 IOBufQueue::append(IOBufQueue& other) {
85   if (!other.head_) {
86     return;
87   }
88   if (options_.cacheChainLength) {
89     if (other.options_.cacheChainLength) {
90       chainLength_ += other.chainLength_;
91     } else {
92       chainLength_ += other.head_->computeChainDataLength();
93     }
94   }
95   appendToChain(head_, std::move(other.head_));
96   other.chainLength_ = 0;
97 }
98
99 void
100 IOBufQueue::append(const void* buf, size_t len) {
101   auto src = static_cast<const uint8_t*>(buf);
102   while (len != 0) {
103     if ((head_ == NULL) || head_->prev()->isSharedOne() ||
104         (head_->prev()->tailroom() == 0)) {
105       appendToChain(head_, std::move(
106           IOBuf::create(std::max(MIN_ALLOC_SIZE,
107               std::min(len, MAX_ALLOC_SIZE)))));
108     }
109     IOBuf* last = head_->prev();
110     uint32_t copyLen = std::min(len, (size_t)last->tailroom());
111     memcpy(last->writableTail(), src, copyLen);
112     src += copyLen;
113     last->append(copyLen);
114     if (options_.cacheChainLength) {
115       chainLength_ += copyLen;
116     }
117     len -= copyLen;
118   }
119 }
120
121 pair<void*,uint32_t>
122 IOBufQueue::preallocate(uint32_t min, uint32_t max) {
123   if (head_ != NULL) {
124     // If there's enough space left over at the end of the queue, use that.
125     IOBuf* last = head_->prev();
126     if (!last->isSharedOne()) {
127       uint32_t avail = last->tailroom();
128       if (avail >= min) {
129         return make_pair(
130             last->writableTail(), std::min(max, avail));
131       }
132     }
133   }
134   // Allocate a new buffer of the requested max size.
135   unique_ptr<IOBuf> newBuf(IOBuf::create(max));
136   appendToChain(head_, std::move(newBuf));
137   IOBuf* last = head_->prev();
138   return make_pair(last->writableTail(),
139       std::min(max, last->tailroom()));
140 }
141
142 void
143 IOBufQueue::postallocate(uint32_t n) {
144   head_->prev()->append(n);
145   if (options_.cacheChainLength) {
146     chainLength_ += n;
147   }
148 }
149
150 unique_ptr<IOBuf>
151 IOBufQueue::split(size_t n) {
152   unique_ptr<IOBuf> result;
153   while (n != 0) {
154     if (head_ == NULL) {
155       throw std::underflow_error(
156           "Attempt to remove more bytes than are present in IOBufQueue");
157     } else if (head_->length() <= n) {
158       n -= head_->length();
159       if (options_.cacheChainLength) {
160         chainLength_ -= head_->length();
161       }
162       unique_ptr<IOBuf> remainder = head_->pop();
163       appendToChain(result, std::move(head_));
164       head_ = std::move(remainder);
165     } else {
166       unique_ptr<IOBuf> clone = head_->cloneOne();
167       clone->trimEnd(clone->length() - n);
168       appendToChain(result, std::move(clone));
169       head_->trimStart(n);
170       if (options_.cacheChainLength) {
171         chainLength_ -= n;
172       }
173       break;
174     }
175   }
176   return std::move(result);
177 }
178
179 void IOBufQueue::trimStart(size_t amount) {
180   while (amount > 0) {
181     if (!head_) {
182       throw std::underflow_error(
183         "Attempt to trim more bytes than are present in IOBufQueue");
184     }
185     if (head_->length() > amount) {
186       head_->trimStart(amount);
187       if (options_.cacheChainLength) {
188         chainLength_ -= amount;
189       }
190       break;
191     }
192     amount -= head_->length();
193     if (options_.cacheChainLength) {
194       chainLength_ -= head_->length();
195     }
196     head_ = head_->pop();
197   }
198 }
199
200 void IOBufQueue::trimEnd(size_t amount) {
201   while (amount > 0) {
202     if (!head_) {
203       throw std::underflow_error(
204         "Attempt to trim more bytes than are present in IOBufQueue");
205     }
206     if (head_->prev()->length() > amount) {
207       head_->prev()->trimEnd(amount);
208       if (options_.cacheChainLength) {
209         chainLength_ -= amount;
210       }
211       break;
212     }
213     amount -= head_->prev()->length();
214     if (options_.cacheChainLength) {
215       chainLength_ -= head_->prev()->length();
216     }
217     unique_ptr<IOBuf> b = head_->prev()->unlink();
218
219     // Null queue if we unlinked the head.
220     if (b.get() == head_.get()) {
221       head_.reset();
222     }
223   }
224 }
225
226 } // folly