proxygen
IOBufQueue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <folly/ScopeGuard.h>
20 #include <folly/io/IOBuf.h>
21 
22 #include <stdexcept>
23 #include <string>
24 
25 namespace folly {
26 
35 class IOBufQueue {
36  private:
// Flushes any bytes recorded only in the writable-tail cache into the chain
// now, and returns a scope guard that re-derives the cached writable range
// once the caller's mutation of the queue finishes.
 43  auto updateGuard() {
 44  flushCache();
 45  return folly::makeGuard([this] { updateWritableTailCache(); });
 46  }
47 
49  std::pair<uint8_t*, uint8_t*> cachedRange;
50  bool attached{false};
51 
52  WritableRangeCacheData() = default;
53 
55  : cachedRange(other.cachedRange), attached(other.attached) {
56  other.cachedRange = {};
57  other.attached = false;
58  }
60  cachedRange = other.cachedRange;
61  attached = other.attached;
62 
63  other.cachedRange = {};
64  other.attached = false;
65 
66  return *this;
67  }
68 
71  };
72 
73  public:
74  struct Options {
75  Options() : cacheChainLength(false) {}
77  };
78 
85  options.cacheChainLength = true;
86  return options;
87  }
88 
102  public:
// Construct a cache, optionally bound to a queue. When a queue is given,
// the cache is filled (and attached) from it immediately.
103  explicit WritableRangeCache(folly::IOBufQueue* q = nullptr) : queue_(q) {
104  if (queue_) {
105  fillCache();
106  }
107  }
108 
114  : data_(std::move(other.data_)), queue_(other.queue_) {
115  if (data_.attached) {
116  queue_->updateCacheRef(data_);
117  }
118  }
120  if (data_.attached) {
121  queue_->clearWritableRangeCache();
122  }
123 
124  data_ = std::move(other.data_);
125  queue_ = other.queue_;
126 
127  if (data_.attached) {
128  queue_->updateCacheRef(data_);
129  }
130 
131  return *this;
132  }
133 
138  : queue_(other.queue_) {}
140  if (data_.attached) {
141  queue_->clearWritableRangeCache();
142  }
143 
144  queue_ = other.queue_;
145 
146  return *this;
147  }
148 
150  if (data_.attached) {
151  queue_->clearWritableRangeCache();
152  }
153  }
154 
// Rebind this cache to a (possibly null) queue: detach from the current
// queue if attached, then attach to and fill from the new one if non-null.
158  void reset(IOBufQueue* q) {
159  if (data_.attached) {
160  queue_->clearWritableRangeCache();
161  }
162 
163  queue_ = q;
164 
165  if (queue_) {
166  fillCache();
167  }
168  }
169 
174  return queue_;
175  }
176 
183  dcheckIntegrity();
184  return data_.cachedRange.first;
185  }
186 
// Number of writable bytes remaining in the cached tail range
// (end minus current position; 0 when the cache is empty).
192  size_t length() {
193  dcheckIntegrity();
194  return data_.cachedRange.second - data_.cachedRange.first;
195  }
196 
// Mark n cached-tail bytes as written by advancing the cached range start.
// Fast path when the cache holds a valid range; otherwise falls back to the
// out-of-line appendSlow (which defers to the queue's postallocate).
200  void append(size_t n) {
201  dcheckIntegrity();
202  // This can happen only if somebody is misusing the interface.
203  // E.g. calling append after touching IOBufQueue or without checking
204  // the length().
205  if (LIKELY(data_.cachedRange.first != nullptr)) {
206  DCHECK_LE(n, length());
207  data_.cachedRange.first += n;
208  } else {
209  appendSlow(n);
210  }
211  }
212 
// Like append(n) but with no integrity or bounds checks at all — the caller
// must guarantee the cache is valid and n <= length().
218  void appendUnsafe(size_t n) {
219  data_.cachedRange.first += n;
220  }
221 
// (Re)populate this cache from the bound queue. Precondition: queue_ is
// non-null — no check is performed here.
225  void fillCache() {
226  queue_->fillWritableRangeCache(data_);
227  }
228 
229  private:
232 
// Out-of-line slow path for append(): defers to the queue's postallocate.
// Kept non-inline so the fast path in append() stays small.
233  FOLLY_NOINLINE void appendSlow(size_t n) {
234  queue_->postallocate(n);
235  }
236 
238  // Tail start should always be less than tail end.
239  DCHECK_LE(
240  (void*)data_.cachedRange.first, (void*)data_.cachedRange.second);
241  DCHECK(
242  data_.cachedRange.first != nullptr ||
243  data_.cachedRange.second == nullptr);
244 
245  // Cached range should be always empty if the cache is not attached.
246  DCHECK(
247  data_.attached ||
248  (data_.cachedRange.first == nullptr &&
249  data_.cachedRange.second == nullptr));
250 
251  // We cannot be in attached state if the queue_ is not set.
252  DCHECK(queue_ != nullptr || !data_.attached);
253 
254  // If we're attached and the cache is not empty, then it should coincide
255  // with the tail buffer.
256  DCHECK(
257  !data_.attached || data_.cachedRange.first == nullptr ||
258  (queue_->head_ != nullptr &&
259  data_.cachedRange.first >= queue_->head_->prev()->writableTail() &&
260  data_.cachedRange.second ==
261  queue_->head_->prev()->writableTail() +
262  queue_->head_->prev()->tailroom()));
263  }
264  };
265 
266  explicit IOBufQueue(const Options& options = Options());
267  ~IOBufQueue();
268 
272  std::pair<void*, std::size_t> headroom();
273 
277  void markPrepended(std::size_t n);
278 
283  void prepend(const void* buf, std::size_t n);
284 
294  void append(std::unique_ptr<folly::IOBuf>&& buf, bool pack = false);
295 
300  void append(IOBufQueue& other, bool pack = false);
301  void append(IOBufQueue&& other, bool pack = false) {
302  append(other, pack); // call lvalue reference overload, above
303  }
304 
309  void append(const void* buf, size_t len);
310 
315  void append(StringPiece sp) {
316  append(sp.data(), sp.size());
317  }
318 
331  void wrapBuffer(
332  const void* buf,
333  size_t len,
334  std::size_t blockSize = (1U << 31)); // default block size: 2GB
335 
356  std::pair<void*, std::size_t> preallocate(
357  std::size_t min,
358  std::size_t newAllocationSize,
359  std::size_t max = std::numeric_limits<std::size_t>::max()) {
361 
362  if (LIKELY(writableTail() != nullptr && tailroom() >= min)) {
363  return std::make_pair(
364  writableTail(), std::min<std::size_t>(max, tailroom()));
365  }
366 
367  return preallocateSlow(min, newAllocationSize, max);
368  }
369 
380  void postallocate(std::size_t n) {
382  DCHECK_LE(
383  (void*)(cachePtr_->cachedRange.first + n),
384  (void*)cachePtr_->cachedRange.second);
385  cachePtr_->cachedRange.first += n;
386  }
387 
// Obtain a pointer to exactly n contiguous writable bytes at the tail,
// committing them immediately (preallocate followed by postallocate).
392  void* allocate(std::size_t n) {
393  void* p = preallocate(n, n).first;
394  postallocate(n);
395  return p;
396  }
397 
398  void* writableTail() const {
400  return cachePtr_->cachedRange.first;
401  }
402 
403  size_t tailroom() const {
405  return cachePtr_->cachedRange.second - cachePtr_->cachedRange.first;
406  }
407 
// Split off the first n bytes of the queue as a new chain. Delegates to the
// private split(n, throwOnUnderflow=true): underflow is an error here.
420  std::unique_ptr<folly::IOBuf> split(size_t n) {
421  return split(n, true);
422  }
423 
// Split off up to n bytes; unlike split(), passes throwOnUnderflow=false so
// a queue shorter than n is not an error.
428  std::unique_ptr<folly::IOBuf> splitAtMost(size_t n) {
429  return split(n, false);
430  }
431 
436  void trimStart(size_t amount);
437 
442  size_t trimStartAtMost(size_t amount);
443 
448  void trimEnd(size_t amount);
449 
454  size_t trimEndAtMost(size_t amount);
455 
// Transfer ownership of the entire chain to the caller, leaving the queue
// empty. updateGuard() flushes pending cached-tail bytes first and refreshes
// the writable-tail cache on exit; chainLength_ is reset to zero.
459  std::unique_ptr<folly::IOBuf> move() {
460  auto guard = updateGuard();
461  std::unique_ptr<folly::IOBuf> res = std::move(head_);
462  chainLength_ = 0;
463  return res;
464  }
465 
// Non-owning pointer to the first buffer of the chain (nullptr when empty).
// Flushes the cache first so the head reflects all appended bytes.
476  const folly::IOBuf* front() const {
477  flushCache();
478  return head_.get();
479  }
480 
486  std::unique_ptr<folly::IOBuf> pop_front();
487 
492  size_t chainLength() const {
494  throw std::invalid_argument("IOBufQueue: chain length not cached");
495  }
497  return chainLength_ + (cachePtr_->cachedRange.first - tailStart_);
498  }
499 
503  bool empty() const {
505  return !head_ ||
506  (head_->empty() && cachePtr_->cachedRange.first == tailStart_);
507  }
508 
// Read-only access to the options this queue was constructed with.
509  const Options& options() const {
510  return options_;
511  }
512 
518  void clear();
519 
523  void appendToString(std::string& out) const;
524 
528  void gather(std::size_t maxLength);
529 
533 
534  private:
535  std::unique_ptr<folly::IOBuf> split(size_t n, bool throwOnUnderflow);
536 
537  static const size_t kChainLengthNotCached = (size_t)-1;
539  IOBufQueue(const IOBufQueue&) = delete;
540  IOBufQueue& operator=(const IOBufQueue&) = delete;
541 
543 
544  // NOTE that chainLength_ is still updated even if !options_.cacheChainLength
545  // because doing it unchecked in postallocate() is faster (no (mis)predicted
546  // branch)
547  mutable size_t chainLength_{0};
553  std::unique_ptr<folly::IOBuf> head_;
554 
555  mutable uint8_t* tailStart_{nullptr};
558 
// Debug-build invariant checks tying together tailStart_, the attached
// cache's cachedRange, and the chain's actual tail buffer. No-ops in
// release builds (DCHECK).
559  void dcheckCacheIntegrity() const {
560  // Tail start should always be less than tail end.
561  DCHECK_LE((void*)tailStart_, (void*)cachePtr_->cachedRange.first);
562  DCHECK_LE(
563  (void*)cachePtr_->cachedRange.first,
564  (void*)cachePtr_->cachedRange.second);
565  DCHECK(
566  cachePtr_->cachedRange.first != nullptr ||
567  cachePtr_->cachedRange.second == nullptr);
568 
569  // There is always an attached cache instance.
570  DCHECK(cachePtr_->attached);
571 
572  // Either cache is empty or it coincides with the tail.
573  DCHECK(
574  cachePtr_->cachedRange.first == nullptr ||
575  (head_ != nullptr && tailStart_ == head_->prev()->writableTail() &&
576  tailStart_ <= cachePtr_->cachedRange.first &&
577  cachePtr_->cachedRange.first >= head_->prev()->writableTail() &&
578  cachePtr_->cachedRange.second ==
579  head_->prev()->writableTail() + head_->prev()->tailroom()));
580  }
581 
587  if (cachePtr_ != &dest) {
588  dest = std::move(*cachePtr_);
589  cachePtr_ = &dest;
590  }
591  }
592 
597  flushCache();
598 
599  if (cachePtr_ != &localCache_) {
600  localCache_ = std::move(*cachePtr_);
602  }
603 
604  DCHECK(cachePtr_ == &localCache_ && localCache_.attached);
605  }
606 
610  void flushCache() const {
612 
613  if (tailStart_ != cachePtr_->cachedRange.first) {
614  auto buf = head_->prev();
615  DCHECK_EQ(
616  (void*)(buf->writableTail() + buf->tailroom()),
617  (void*)cachePtr_->cachedRange.second);
618  auto len = cachePtr_->cachedRange.first - tailStart_;
619  buf->append(len);
620  chainLength_ += len;
621  tailStart_ += len;
622  }
623  }
624 
625  // For WritableRangeCache move assignment/construction.
627  cachePtr_ = &newRef;
628  }
629 
634  if (LIKELY(head_ != nullptr)) {
635  IOBuf* buf = head_->prev();
636  if (LIKELY(!buf->isSharedOne())) {
637  tailStart_ = buf->writableTail();
638  cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
639  tailStart_, tailStart_ + buf->tailroom());
640  return;
641  }
642  }
643  tailStart_ = nullptr;
644  cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>();
645  }
646 
647  std::pair<void*, std::size_t> preallocateSlow(
648  std::size_t min,
649  std::size_t newAllocationSize,
650  std::size_t max);
651 };
652 
653 } // namespace folly
void append(IOBufQueue &&other, bool pack=false)
Definition: IOBufQueue.h:301
WritableRangeCacheData * cachePtr_
Definition: IOBufQueue.h:556
std::unique_ptr< folly::IOBuf > split(size_t n)
Definition: IOBufQueue.h:420
const folly::IOBuf * front() const
Definition: IOBufQueue.h:476
void append(std::unique_ptr< folly::IOBuf > &&buf, bool pack=false)
Definition: IOBufQueue.cpp:143
size_t chainLength() const
Definition: IOBufQueue.h:492
std::pair< void *, std::size_t > preallocateSlow(std::size_t min, std::size_t newAllocationSize, std::size_t max)
Definition: IOBufQueue.cpp:207
void prepend(const void *buf, std::size_t n)
Definition: IOBufQueue.cpp:132
std::unique_ptr< folly::IOBuf > splitAtMost(size_t n)
Definition: IOBufQueue.h:428
WritableRangeCache & operator=(const WritableRangeCache &other)
Definition: IOBufQueue.h:139
LogLevel max
Definition: LogLevel.cpp:31
WritableRangeCache(WritableRangeCache &&other)
Definition: IOBufQueue.h:113
void dcheckCacheIntegrity() const
Definition: IOBufQueue.h:559
WritableRangeCacheData & operator=(WritableRangeCacheData &&other)
Definition: IOBufQueue.h:59
bool empty() const
Definition: IOBufQueue.h:503
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
WritableRangeCacheData data_
Definition: IOBufQueue.h:230
#define LIKELY(x)
Definition: Likely.h:47
void clearWritableRangeCache()
Definition: IOBufQueue.h:596
bool isSharedOne() const
Definition: IOBuf.h:952
size_t tailroom() const
Definition: IOBufQueue.h:403
dest
Definition: upload.py:394
STL namespace.
constexpr size_type size() const
Definition: Range.h:431
void markPrepended(std::size_t n)
Definition: IOBufQueue.cpp:121
std::unique_ptr< folly::IOBuf > move()
Definition: IOBufQueue.h:459
void append(StringPiece sp)
Definition: IOBufQueue.h:315
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void appendToString(std::string &out) const
Definition: IOBufQueue.cpp:340
requires E e noexcept(noexcept(s.error(std::move(e))))
void * writableTail() const
Definition: IOBufQueue.h:398
const Options & options() const
Definition: IOBufQueue.h:509
std::size_t tailroom() const
Definition: IOBuf.h:551
uint8_t * writableTail()
Definition: IOBuf.h:526
std::pair< void *, std::size_t > preallocate(std::size_t min, std::size_t newAllocationSize, std::size_t max=std::numeric_limits< std::size_t >::max())
Definition: IOBufQueue.h:356
size_t trimStartAtMost(size_t amount)
Definition: IOBufQueue.cpp:263
WritableRangeCache & operator=(WritableRangeCache &&other)
Definition: IOBufQueue.h:119
void fillWritableRangeCache(WritableRangeCacheData &dest)
Definition: IOBufQueue.h:585
auto updateGuard()
Definition: IOBufQueue.h:43
#define FOLLY_NOINLINE
Definition: CPortability.h:142
IOBufQueue(const Options &options=Options())
Definition: IOBufQueue.cpp:67
WritableRangeCacheData localCache_
Definition: IOBufQueue.h:557
LogLevel min
Definition: LogLevel.cpp:30
static Options cacheChainLength()
Definition: IOBufQueue.h:83
static const size_t kChainLengthNotCached
Definition: IOBufQueue.h:537
std::pair< void *, std::size_t > headroom()
Definition: IOBufQueue.cpp:111
constexpr Iter data() const
Definition: Range.h:446
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
void wrapBuffer(const void *buf, size_t len, std::size_t blockSize=(1U<< 31))
Definition: IOBufQueue.cpp:194
void flushCache() const
Definition: IOBufQueue.h:610
std::unique_ptr< folly::IOBuf > head_
Definition: IOBufQueue.h:553
void * allocate(std::size_t n)
Definition: IOBufQueue.h:392
size_t trimEndAtMost(size_t amount)
Definition: IOBufQueue.cpp:291
FOLLY_NOINLINE void appendSlow(size_t n)
Definition: IOBufQueue.h:233
IOBuf * prev()
Definition: IOBuf.h:610
void updateWritableTailCache()
Definition: IOBufQueue.h:633
FOLLY_NODISCARD detail::ScopeGuardImplDecay< F, true > makeGuard(F &&f) noexcept(noexcept(detail::ScopeGuardImplDecay< F, true >(static_cast< F && >(f))))
Definition: ScopeGuard.h:184
const char * string
Definition: Conv.cpp:212
WritableRangeCache(folly::IOBufQueue *q=nullptr)
Definition: IOBufQueue.h:103
WritableRangeCache(const WritableRangeCache &other)
Definition: IOBufQueue.h:137
void trimStart(size_t amount)
Definition: IOBufQueue.cpp:255
WritableRangeCacheData(WritableRangeCacheData &&other)
Definition: IOBufQueue.h:54
std::pair< uint8_t *, uint8_t * > cachedRange
Definition: IOBufQueue.h:49
#define UNLIKELY(x)
Definition: Likely.h:48
void trimEnd(size_t amount)
Definition: IOBufQueue.cpp:283
void postallocate(std::size_t n)
Definition: IOBufQueue.h:380
void gather(std::size_t maxLength)
Definition: IOBufQueue.cpp:361
uint8_t * tailStart_
Definition: IOBufQueue.h:555
StringPiece data_
std::unique_ptr< folly::IOBuf > pop_front()
Definition: IOBufQueue.cpp:316
void updateCacheRef(WritableRangeCacheData &newRef)
Definition: IOBufQueue.h:626