proxygen
IOBufQueue.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/io/IOBufQueue.h>
18 
19 #include <string.h>
20 
21 #include <stdexcept>
22 
23 using std::make_pair;
24 using std::pair;
25 using std::unique_ptr;
26 
27 namespace {
28 
29 using folly::IOBuf;
30 
31 const size_t MIN_ALLOC_SIZE = 2000;
32 const size_t MAX_ALLOC_SIZE = 8000;
33 const size_t MAX_PACK_COPY = 4096;
34 
38 void appendToChain(unique_ptr<IOBuf>& dst, unique_ptr<IOBuf>&& src, bool pack) {
39  if (dst == nullptr) {
40  dst = std::move(src);
41  } else {
42  IOBuf* tail = dst->prev();
43  if (pack) {
44  // Copy up to MAX_PACK_COPY bytes if we can free buffers; this helps
45  // reduce wastage (the tail's tailroom and the head's headroom) when
46  // joining two IOBufQueues together.
47  size_t copyRemaining = MAX_PACK_COPY;
48  std::size_t n;
49  while (src && (n = src->length()) < copyRemaining &&
50  n < tail->tailroom() && n > 0) {
51  memcpy(tail->writableTail(), src->data(), n);
52  tail->append(n);
53  copyRemaining -= n;
54  src = src->pop();
55  }
56  }
57  if (src) {
58  tail->appendChain(std::move(src));
59  }
60  }
61 }
62 
63 } // namespace
64 
65 namespace folly {
66 
68  : options_(options), cachePtr_(&localCache_) {
69  localCache_.attached = true;
70 }
71 
74 }
75 
78  other.clearWritableRangeCache();
79  head_ = std::move(other.head_);
80  chainLength_ = other.chainLength_;
81 
82  tailStart_ = other.tailStart_;
83  localCache_.cachedRange = other.localCache_.cachedRange;
84  localCache_.attached = true;
85 
86  other.chainLength_ = 0;
87  other.tailStart_ = nullptr;
88  other.localCache_.cachedRange = {nullptr, nullptr};
89 }
90 
92  if (&other != this) {
95 
96  options_ = other.options_;
97  head_ = std::move(other.head_);
98  chainLength_ = other.chainLength_;
99 
100  tailStart_ = other.tailStart_;
101  localCache_.cachedRange = other.localCache_.cachedRange;
102  localCache_.attached = true;
103 
104  other.chainLength_ = 0;
105  other.tailStart_ = nullptr;
106  other.localCache_.cachedRange = {nullptr, nullptr};
107  }
108  return *this;
109 }
110 
111 std::pair<void*, std::size_t> IOBufQueue::headroom() {
112  // Note, headroom is independent from the tail, so we don't need to flush the
113  // cache.
114  if (head_) {
115  return std::make_pair(head_->writableBuffer(), head_->headroom());
116  } else {
117  return std::make_pair(nullptr, 0);
118  }
119 }
120 
121 void IOBufQueue::markPrepended(std::size_t n) {
122  if (n == 0) {
123  return;
124  }
125  // Note, headroom is independent from the tail, so we don't need to flush the
126  // cache.
127  assert(head_);
128  head_->prepend(n);
129  chainLength_ += n;
130 }
131 
132 void IOBufQueue::prepend(const void* buf, std::size_t n) {
133  // We're not touching the tail, so we don't need to flush the cache.
134  auto hroom = head_->headroom();
135  if (!head_ || hroom < n) {
136  throw std::overflow_error("Not enough room to prepend");
137  }
138  memcpy(head_->writableBuffer() + hroom - n, buf, n);
139  head_->prepend(n);
140  chainLength_ += n;
141 }
142 
143 void IOBufQueue::append(unique_ptr<IOBuf>&& buf, bool pack) {
144  if (!buf) {
145  return;
146  }
147  auto guard = updateGuard();
149  chainLength_ += buf->computeChainDataLength();
150  }
151  appendToChain(head_, std::move(buf), pack);
152 }
153 
154 void IOBufQueue::append(IOBufQueue& other, bool pack) {
155  if (!other.head_) {
156  return;
157  }
158  // We're going to chain other, thus we need to grab both guards.
159  auto otherGuard = other.updateGuard();
160  auto guard = updateGuard();
162  if (other.options_.cacheChainLength) {
163  chainLength_ += other.chainLength_;
164  } else {
165  chainLength_ += other.head_->computeChainDataLength();
166  }
167  }
168  appendToChain(head_, std::move(other.head_), pack);
169  other.chainLength_ = 0;
170 }
171 
172 void IOBufQueue::append(const void* buf, size_t len) {
173  auto guard = updateGuard();
174  auto src = static_cast<const uint8_t*>(buf);
175  while (len != 0) {
176  if ((head_ == nullptr) || head_->prev()->isSharedOne() ||
177  (head_->prev()->tailroom() == 0)) {
178  appendToChain(
179  head_,
181  std::max(MIN_ALLOC_SIZE, std::min(len, MAX_ALLOC_SIZE))),
182  false);
183  }
184  IOBuf* last = head_->prev();
185  std::size_t copyLen = std::min(len, (size_t)last->tailroom());
186  memcpy(last->writableTail(), src, copyLen);
187  src += copyLen;
188  last->append(copyLen);
189  chainLength_ += copyLen;
190  len -= copyLen;
191  }
192 }
193 
195  const void* buf,
196  size_t len,
197  std::size_t blockSize) {
198  auto src = static_cast<const uint8_t*>(buf);
199  while (len != 0) {
200  size_t n = std::min(len, size_t(blockSize));
201  append(IOBuf::wrapBuffer(src, n));
202  src += n;
203  len -= n;
204  }
205 }
206 
208  std::size_t min,
209  std::size_t newAllocationSize,
210  std::size_t max) {
211  // Avoid grabbing update guard, since we're manually setting the cache ptrs.
212  flushCache();
213  // Allocate a new buffer of the requested max size.
214  unique_ptr<IOBuf> newBuf(IOBuf::create(std::max(min, newAllocationSize)));
215 
216  tailStart_ = newBuf->writableTail();
217  cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
218  tailStart_, tailStart_ + newBuf->tailroom());
219  appendToChain(head_, std::move(newBuf), false);
220  return make_pair(writableTail(), std::min<std::size_t>(max, tailroom()));
221 }
222 
223 unique_ptr<IOBuf> IOBufQueue::split(size_t n, bool throwOnUnderflow) {
224  auto guard = updateGuard();
225  unique_ptr<IOBuf> result;
226  while (n != 0) {
227  if (head_ == nullptr) {
228  if (throwOnUnderflow) {
229  throw std::underflow_error(
230  "Attempt to remove more bytes than are present in IOBufQueue");
231  } else {
232  break;
233  }
234  } else if (head_->length() <= n) {
235  n -= head_->length();
236  chainLength_ -= head_->length();
237  unique_ptr<IOBuf> remainder = head_->pop();
238  appendToChain(result, std::move(head_), false);
239  head_ = std::move(remainder);
240  } else {
241  unique_ptr<IOBuf> clone = head_->cloneOne();
242  clone->trimEnd(clone->length() - n);
243  appendToChain(result, std::move(clone), false);
244  head_->trimStart(n);
245  chainLength_ -= n;
246  break;
247  }
248  }
249  if (UNLIKELY(result == nullptr)) {
250  return IOBuf::create(0);
251  }
252  return result;
253 }
254 
255 void IOBufQueue::trimStart(size_t amount) {
256  auto trimmed = trimStartAtMost(amount);
257  if (trimmed != amount) {
258  throw std::underflow_error(
259  "Attempt to trim more bytes than are present in IOBufQueue");
260  }
261 }
262 
263 size_t IOBufQueue::trimStartAtMost(size_t amount) {
264  auto guard = updateGuard();
265  auto original = amount;
266  while (amount > 0) {
267  if (!head_) {
268  break;
269  }
270  if (head_->length() > amount) {
271  head_->trimStart(amount);
272  chainLength_ -= amount;
273  amount = 0;
274  break;
275  }
276  amount -= head_->length();
277  chainLength_ -= head_->length();
278  head_ = head_->pop();
279  }
280  return original - amount;
281 }
282 
283 void IOBufQueue::trimEnd(size_t amount) {
284  auto trimmed = trimEndAtMost(amount);
285  if (trimmed != amount) {
286  throw std::underflow_error(
287  "Attempt to trim more bytes than are present in IOBufQueue");
288  }
289 }
290 
291 size_t IOBufQueue::trimEndAtMost(size_t amount) {
292  auto guard = updateGuard();
293  auto original = amount;
294  while (amount > 0) {
295  if (!head_) {
296  break;
297  }
298  if (head_->prev()->length() > amount) {
299  head_->prev()->trimEnd(amount);
300  chainLength_ -= amount;
301  amount = 0;
302  break;
303  }
304  amount -= head_->prev()->length();
305  chainLength_ -= head_->prev()->length();
306 
307  if (head_->isChained()) {
308  head_->prev()->unlink();
309  } else {
310  head_.reset();
311  }
312  }
313  return original - amount;
314 }
315 
316 std::unique_ptr<folly::IOBuf> IOBufQueue::pop_front() {
317  auto guard = updateGuard();
318  if (!head_) {
319  return nullptr;
320  }
321  chainLength_ -= head_->length();
322  std::unique_ptr<folly::IOBuf> retBuf = std::move(head_);
323  head_ = retBuf->pop();
324  return retBuf;
325 }
326 
328  if (!head_) {
329  return;
330  }
331  auto guard = updateGuard();
332  IOBuf* buf = head_.get();
333  do {
334  buf->clear();
335  buf = buf->next();
336  } while (buf != head_.get());
337  chainLength_ = 0;
338 }
339 
341  if (!head_) {
342  return;
343  }
344  auto len = options_.cacheChainLength
346  : head_->computeChainDataLength() +
347  (cachePtr_->cachedRange.first - tailStart_);
348  out.reserve(out.size() + len);
349 
350  for (auto range : *head_) {
351  out.append(reinterpret_cast<const char*>(range.data()), range.size());
352  }
353 
354  if (tailStart_ != cachePtr_->cachedRange.first) {
355  out.append(
356  reinterpret_cast<const char*>(tailStart_),
357  cachePtr_->cachedRange.first - tailStart_);
358  }
359 }
360 
361 void IOBufQueue::gather(std::size_t maxLength) {
362  auto guard = updateGuard();
363  if (head_ != nullptr) {
364  head_->gather(maxLength);
365  }
366 }
367 
368 } // namespace folly
WritableRangeCacheData * cachePtr_
Definition: IOBufQueue.h:556
std::unique_ptr< folly::IOBuf > split(size_t n)
Definition: IOBufQueue.h:420
void append(std::unique_ptr< folly::IOBuf > &&buf, bool pack=false)
Definition: IOBufQueue.cpp:143
std::pair< void *, std::size_t > preallocateSlow(std::size_t min, std::size_t newAllocationSize, std::size_t max)
Definition: IOBufQueue.cpp:207
void prepend(const void *buf, std::size_t n)
Definition: IOBufQueue.cpp:132
static std::unique_ptr< IOBuf > create(std::size_t capacity)
Definition: IOBuf.cpp:229
LogLevel max
Definition: LogLevel.cpp:31
static std::unique_ptr< IOBuf > wrapBuffer(const void *buf, std::size_t capacity)
Definition: IOBuf.cpp:353
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
void clearWritableRangeCache()
Definition: IOBufQueue.h:596
size_t tailroom() const
Definition: IOBufQueue.h:403
void markPrepended(std::size_t n)
Definition: IOBufQueue.cpp:121
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void appendToString(std::string &out) const
Definition: IOBufQueue.cpp:340
requires E e noexcept(noexcept(s.error(std::move(e))))
void * writableTail() const
Definition: IOBufQueue.h:398
std::size_t tailroom() const
Definition: IOBuf.h:551
uint8_t * writableTail()
Definition: IOBuf.h:526
size_t trimStartAtMost(size_t amount)
Definition: IOBufQueue.cpp:263
auto updateGuard()
Definition: IOBufQueue.h:43
IOBufQueue(const Options &options=Options())
Definition: IOBufQueue.cpp:67
std::unique_ptr< IOBuf > pop()
Definition: IOBuf.h:859
WritableRangeCacheData localCache_
Definition: IOBufQueue.h:557
LogLevel min
Definition: LogLevel.cpp:30
std::pair< void *, std::size_t > headroom()
Definition: IOBufQueue.cpp:111
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
void wrapBuffer(const void *buf, size_t len, std::size_t blockSize=(1U<< 31))
Definition: IOBufQueue.cpp:194
constexpr Range< Iter > range(Iter first, Iter last)
Definition: Range.h:1114
void flushCache() const
Definition: IOBufQueue.h:610
std::unique_ptr< folly::IOBuf > head_
Definition: IOBufQueue.h:553
IOBuf * next()
Definition: IOBuf.h:600
size_t trimEndAtMost(size_t amount)
Definition: IOBufQueue.cpp:291
IOBufQueue & operator=(IOBufQueue &&)
Definition: IOBufQueue.cpp:91
void clear()
Definition: IOBuf.h:728
const char * string
Definition: Conv.cpp:212
Definition: Traits.h:577
void trimStart(size_t amount)
Definition: IOBufQueue.cpp:255
std::pair< uint8_t *, uint8_t * > cachedRange
Definition: IOBufQueue.h:49
#define UNLIKELY(x)
Definition: Likely.h:48
void trimEnd(size_t amount)
Definition: IOBufQueue.cpp:283
void gather(std::size_t maxLength)
Definition: IOBufQueue.cpp:361
uint8_t * tailStart_
Definition: IOBufQueue.h:555
void append(std::size_t amount)
Definition: IOBuf.h:689
std::unique_ptr< folly::IOBuf > pop_front()
Definition: IOBufQueue.cpp:316