23 #include <type_traits> 55 template <
class Derived,
class BufType>
58 template <
class D,
typename B>
75 template <
class OtherDerived,
class OtherBuf>
131 end.crtPos_ =
end.crtEnd_;
144 size_t available =
length();
146 if (available >= amount) {
150 nextBuf = nextBuf->
next();
151 available = nextBuf->
length();
192 auto* nextBuf =
crtBuf_->next();
206 Derived* p =
static_cast<Derived*
>(
this);
211 Derived other(*
this);
217 Derived* p =
static_cast<Derived*
>(
this);
222 Derived other(*
this);
223 other.retreat(offset);
237 while (crtPos == crtBuf->
tail() && crtBuf !=
buffer_->prev()) {
238 crtBuf = crtBuf->
next();
239 crtPos = crtBuf->
data();
242 const IOBuf* crtBufOther = other.crtBuf_;
243 auto crtPosOther = other.crtPos_;
245 while (crtPosOther == crtBufOther->
tail() &&
246 crtBufOther != other.buffer_->
prev()) {
247 crtBufOther = crtBufOther->
next();
248 crtPosOther = crtBufOther->
data();
250 return (crtPos == crtPosOther) && (crtBuf == crtBufOther);
260 val = loadUnaligned<T>(
data());
269 const bool result =
tryRead(val);
276 const bool result =
tryRead(val);
288 return readSlow<T>();
313 str.append(reinterpret_cast<const char*>(
data()), len);
330 char termChar =
'\0',
341 template <
typename Predicate>
350 template <
typename Predicate,
typename Output>
351 void readWhile(
const Predicate& predicate, Output& out);
359 template <
typename Predicate>
360 void skipWhile(
const Predicate& predicate);
411 memcpy(buf,
data(), len);
418 void pull(
void* buf,
size_t len) {
424 memcpy(buf,
data(), len);
438 size_t available =
length();
451 std::pair<const uint8_t*, size_t>
peek() {
453 return std::make_pair(bytes.data(), bytes.size());
456 void clone(std::unique_ptr<folly::IOBuf>& buf,
size_t len) {
458 throw_exception<std::out_of_range>(
"underflow");
464 throw_exception<std::out_of_range>(
"underflow");
472 std::unique_ptr<folly::IOBuf> tmp;
474 for (
int loopCount = 0;
true; ++loopCount) {
476 size_t available =
length();
477 if (
LIKELY(available >= len)) {
478 if (loopCount == 0) {
494 if (loopCount == 0) {
511 size_t cloneAtMost(std::unique_ptr<folly::IOBuf>& buf,
size_t len) {
513 buf = std::make_unique<folly::IOBuf>();
528 for (otherBuf = otherBuf->next();
530 otherBuf = otherBuf->next()) {
531 len += otherBuf->length();
534 if (otherBuf == other.
buffer_) {
535 throw_exception<std::out_of_range>(
"wrap-around");
541 throw_exception<std::out_of_range>(
"underflow");
558 len += curBuf->length();
559 curBuf = curBuf->next();
560 if (curBuf == buf || curBuf ==
buffer_) {
561 throw_exception<std::out_of_range>(
"wrap-around");
635 for (
size_t available; (available =
length()) < len;) {
636 str->append(reinterpret_cast<const char*>(
data()), available);
638 throw_exception<std::out_of_range>(
"string underflow");
642 str->append(reinterpret_cast<const char*>(
data()), len);
656 for (
size_t available; (available =
length()) < len;) {
657 memcpy(p,
data(), available);
665 memcpy(p,
data(), len);
673 throw_exception<std::out_of_range>(
"underflow");
679 for (
size_t available; (available =
length()) < len;) {
680 skipped += available;
688 return skipped + len;
693 throw_exception<std::out_of_range>(
"underflow");
698 size_t retreated = 0;
700 retreated += available;
707 return retreated + len;
712 throw_exception<std::out_of_range>(
"underflow");
726 template <
class OtherDerived,
class OtherBuf>
733 template <
class Derived>
739 Derived* d =
static_cast<Derived*
>(
this);
740 d->push(u8,
sizeof(
T));
745 Derived* d =
static_cast<Derived*
>(
this);
751 Derived* d =
static_cast<Derived*
>(
this);
756 Derived* d =
static_cast<Derived*
>(
this);
757 if (d->pushAtMost(buf, len) != len) {
758 throw_exception<std::out_of_range>(
"overflow");
763 if (this->pushAtMost(buf) != buf.
size()) {
764 throw_exception<std::out_of_range>(
"overflow");
769 Derived* d =
static_cast<Derived*
>(
this);
770 return d->pushAtMost(buf.
data(), buf.
size());
779 if (this->pushAtMost(cursor, len) != len) {
780 throw_exception<std::out_of_range>(
"overflow");
788 const uint8_t* crtData = currentBuffer.data();
789 size_t available = currentBuffer.size();
790 if (available == 0) {
795 if (available >= len) {
796 this->push(crtData, len);
798 return written + len;
802 this->push(crtData, available);
803 cursor.
skip(available);
804 written += available;
814 template <CursorAccess access>
823 template <
class OtherDerived,
class OtherBuf>
826 maybeShared_(true) {}
841 throw std::overflow_error(
"cannot gather() past the end of the chain");
844 this->
crtBuf_->gather(offset + n);
845 this->crtBegin_ = this->
crtBuf_->data();
847 this->
crtPos_ = this->crtBegin_ + offset;
853 this->
crtBuf_->gather(offset + size);
854 this->crtBegin_ = this->
crtBuf_->data();
856 this->
crtPos_ = this->crtBegin_ + offset;
872 size_t available = this->
length();
873 if (
LIKELY(available >= len)) {
877 memcpy(writableData(), buf, len);
885 memcpy(writableData(), buf, available);
895 void insert(std::unique_ptr<folly::IOBuf> buf) {
903 std::unique_ptr<folly::IOBuf> remaining;
906 remaining = this->
crtBuf_->cloneOne();
908 nextBuf = remaining.get();
912 nextBuf = this->
crtBuf_->next();
918 if (nextBuf == this->
buffer_) {
921 this->crtBegin_ = this->
crtBuf_->data();
944 this->crtBegin_ = this->
crtBuf_->data();
946 this->
crtPos_ = this->crtBegin_ + offset;
947 maybeShared_ =
false;
975 return crtBuf_->writableTail();
1002 throw_exception<std::out_of_range>(
"can't grow buffer chain");
1030 size_t available =
length();
1031 if (
LIKELY(available >= len)) {
1032 memcpy(writableData(), buf, len);
1034 return copied + len;
1037 memcpy(writableData(), buf, available);
1039 copied += available;
1073 void vprintf(
const char* fmt, va_list ap);
1109 : queueCache_(queue), growth_(growth) {}
1112 queueCache_.reset(queue);
1117 return queueCache_.writableData();
1121 return queueCache_.length();
1125 queueCache_.append(n);
1141 queueCache_.appendUnsafe(
sizeof(
T));
1143 writeSlow<T>(
value);
1151 if (copyLength != 0) {
1152 memcpy(writableData(), buf, copyLength);
1153 queueCache_.appendUnsafe(copyLength);
1156 size_t remaining = len - copyLength;
1158 while (remaining != 0) {
1159 auto p = queueCache_.queue()->preallocate(
1160 std::min(remaining, growth_), growth_, remaining);
1161 memcpy(p.first, buf, p.second);
1162 queueCache_.queue()->postallocate(p.second);
1164 remaining -= p.second;
1169 void insert(std::unique_ptr<folly::IOBuf> buf) {
1171 queueCache_.queue()->append(
std::move(buf),
true);
1176 insert(buf.
clone());
1184 queueCache_.queue()->preallocate(n, growth_);
1185 queueCache_.fillCache();
1191 queueCache_.queue()->preallocate(
sizeof(
T), growth_);
1192 queueCache_.fillCache();
1195 queueCache_.appendUnsafe(
sizeof(
T));
size_t pushAtMost(ByteRange buf)
const uint8_t * data() const
#define FOLLY_PRINTF_FORMAT
size_t pullAtMostSlow(void *buf, size_t len)
static std::unique_ptr< IOBuf > create(std::size_t capacity)
void insert(const folly::IOBuf &buf)
void clone(std::unique_ptr< folly::IOBuf > &buf, size_t len)
std::enable_if< std::is_arithmetic< T >::value >::type write(T value)
void gatherAtMost(size_t n)
std::enable_if< std::is_arithmetic< T >::value, bool >::type tryRead(T &val)
std::enable_if< std::is_arithmetic< T >::value >::type write(T value)
constexpr detail::Map< Move > move
const uint8_t * tail() const
Derived operator+(size_t offset) const
void append(std::unique_ptr< IOBuf > &buf, StringPiece str)
constexpr size_type size() const
void reset(IOBufQueue *queue, std::size_t growth)
const uint8_t * data() const
void push(const uint8_t *buf, size_t len)
size_t pushAtMost(const uint8_t *buf, size_t len)
size_t pullAtMost(void *buf, size_t len)
—— Concurrent Priority Queue Implementation ——
std::unique_ptr< IOBuf > clone() const
void insert(std::unique_ptr< folly::IOBuf > buf)
Appender(IOBuf *buf, std::size_t growth)
FOLLY_NOINLINE void ensureSlow(size_t n)
bool operator==(const Derived &other) const
void pull(void *buf, size_t len)
void insert(std::unique_ptr< folly::IOBuf > buf)
Cursor(const detail::CursorBase< OtherDerived, OtherBuf > &cursor)
std::pair< const uint8_t *, size_t > peek()
std::string readFixedString(size_t len)
Derived operator-(size_t offset) const
void skipNoAdvance(size_t len)
constexpr auto size(C const &c) -> decltype(c.size())
void skipWhile(const Predicate &predicate)
size_t getCurrentPosition() const
size_t cloneAtMost(std::unique_ptr< folly::IOBuf > &buf, size_t len)
std::enable_if< std::is_arithmetic< T >::value >::type FOLLY_NOINLINE writeSlow(T value)
Derived & operator+=(size_t offset)
size_t retreatAtMost(size_t len)
void retreatSlow(size_t len)
auto end(TestAdlIterable &instance)
constexpr Iter data() const
std::size_t length() const
size_t skipAtMost(size_t len)
void ensure(std::size_t n)
QueueAppender(IOBufQueue *queue, std::size_t growth)
CursorBase(const CursorBase< OtherDerived, OtherBuf > &cursor)
RWCursor< CursorAccess::UNSHARE > RWUnshareCursor
void storeUnaligned(void *p, T value)
size_t pushAtMost(Cursor cursor, size_t len)
static const char *const value
void push(Cursor cursor, size_t len)
const uint8_t * crtBegin_
RWCursor< CursorAccess::PRIVATE > RWPrivateCursor
void clone(folly::IOBuf &buf, size_t len)
void prependChain(std::unique_ptr< IOBuf > &&iobuf)
Derived & operator-=(size_t offset)
std::size_t computeChainDataLength() const
size_t operator-(const BufType *buf) const
size_t pushAtMost(const uint8_t *buf, size_t len)
void readFixedStringSlow(std::string *str, size_t len)
std::string readWhile(const Predicate &predicate)
FOLLY_NOINLINE T readSlow()
size_t skipAtMostSlow(size_t len)
size_t retreatAtMostSlow(size_t len)
RWCursor(const detail::CursorBase< OtherDerived, OtherBuf > &cursor)
void advanceBufferIfEmpty()
Range< const unsigned char * > ByteRange
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
void trimStart(std::size_t amount)
size_t pushAtMost(const uint8_t *buf, size_t len)
void dcheckIntegrity() const
#define FOLLY_PRINTF_FORMAT_ATTR(format_param, dots_param)
std::string readTerminatedString(char termChar= '\0', size_t maxLength=std::numeric_limits< size_t >::max())
void trimEnd(std::size_t amount)
void pullSlow(void *buf, size_t len)
size_t totalLength() const
void operator()(StringPiece sp)
size_t operator-(const CursorBase &other) const
void skipSlow(size_t len)
bool canAdvance(size_t amount) const
bool operator!=(const Derived &other) const
size_t cloneAtMost(folly::IOBuf &buf, size_t len)