13 using std::unique_ptr;
19 std::chrono::seconds(30);
44 unique_ptr<HTTP2PriorityQueue::Node> node,
bool exclusive) {
45 CHECK(!node->isEnqueued());
46 list<unique_ptr<Node>> children;
47 CHECK_NE(
id_, node->id_) <<
"Tried to create a loop in the tree";
68 list<unique_ptr<Node>> emptyChilden;
70 for (
auto&
child: children) {
71 if (
child->inEgressTree()) {
72 totalEnqueuedWeight +=
child->weight_;
73 child->parent_->removeEnqueuedChild(
child.get());
74 CHECK(!
child->enqueuedHook_.is_linked());
77 CHECK(!
child->enqueuedHook_.is_linked());
82 if (totalEnqueuedWeight > 0) {
92 unique_ptr<HTTP2PriorityQueue::Node>
child) {
93 CHECK_NE(
id_, child->id_) <<
"Tried to create a loop in the tree";
94 child->parent_ =
this;
96 Node* raw = child.get();
102 unique_ptr<HTTP2PriorityQueue::Node>
106 auto it = node->
self_;
126 if (wasInEgressTree) {
135 if (wasInEgressTree) {
148 if (cur->
id_ == node->
id_) {
185 while (parent && !stop) {
209 while (parent && !stop) {
223 weight_ = weight + 1;
239 CHECK_LE(newWeight, 256);
264 const std::function<
bool()>& stopFn,
bool all) {
276 if (stop || stopFn()) {
279 stop =
child->iterate(fn, stopFn, all);
286 double relativeParentWeight,
290 PendingList& pendingNodes,
bool enqueuedChildren) {
300 double newRelWeight = relativeParentWeight * relativeEnqueuedWeight;
301 if (enqueuedChildren) {
305 pendingNodes.emplace_back(
child->id_, &(*
child), newRelWeight);
309 pendingNodes.emplace_back(
child->id_,
child.get(), newRelWeight);
316 relativeParentWeight * relativeEnqueuedWeight)) {
328 child->updateEnqueuedWeight(activeNodes);
347 child->dropPriorityNodes();
367 const Node* cur =
this;
369 if (cur->
txn_ || includeVirtual) {
379 std::list<std::unique_ptr<Node>> oldChildren_;
383 for (
auto&
child : oldChildren_) {
384 child->flattenSubtreeDFS(
this);
396 [
this](
const std::unique_ptr<Node>&
child) {
398 if (
child->enqueued_) {
411 child->flattenSubtreeDFS(subtreeRoot);
419 child->children_.clear();
420 child->parent_ = subtreeRoot;
422 child->totalChildWeight_ = 0;
423 child->totalEnqueuedWeight_ = 0;
425 child->totalEnqueuedWeightCheck_ = 0;
427 Node* raw = child.get();
446 auto handle =
find(
id);
449 CHECK(handle->getTransaction() ==
nullptr);
465 CHECK(!txn || !permanent);
466 Node *existingNode =
find(
id, depth);
486 if (dep ==
nullptr) {
488 VLOG(4) <<
"assigning default priority to txn=" << id;
494 parent =
dynamic_cast<Node*
>(
500 CHECK_NOTNULL(parent);
505 VLOG(4) <<
"Virtual node limit reached, ignoring stream dependency " 515 VLOG(4) <<
"Adding id=" <<
id <<
" with parent=" << parent->
getID() <<
517 auto node = std::make_unique<Node>(*
this,
parent, id, pri.
weight, txn);
519 node->setPermanent();
532 Node* node = CHECK_NOTNULL(dynamic_cast<HTTP2PriorityQueue::Node*>(handle));
534 VLOG(4) <<
"Updating id=" << node->
getID() <<
" with parent=" <<
538 "Tried to create a loop in the tree";
554 newParent =
dynamic_cast<Node*
>(
561 CHECK_NOTNULL(newParent);
562 VLOG(4) <<
"updatePriority missing parent, creating virtual parent=" 563 << newParent->
getID() <<
" for txn=" << node->
getID();
579 Node* node = CHECK_NOTNULL(dynamic_cast<HTTP2PriorityQueue::Node*>(handle));
590 VLOG(5) <<
"Deleting dangling node over max id=" << node->
getID();
598 CHECK_NOTNULL(dynamic_cast<HTTP2PriorityQueue::Node*>(handle))
599 ->signalPendingEgress();
609 CHECK_NOTNULL(dynamic_cast<HTTP2PriorityQueue::Node*>(handle))
610 ->clearPendingEgress();
619 const std::function<
bool()>& stopFn,
bool all) {
625 while (!stop && !stopFn() && !pendingNodes.empty()) {
626 CHECK(newPendingNodes.empty());
627 while (!stop && !pendingNodes.empty()) {
630 stop = node->
visitBFS(pendingNodes.front().ratio, fn,
all,
631 newPendingNodes,
false );
633 pendingNodes.pop_front();
635 std::swap(pendingNodes, newPendingNodes);
651 bool operator()(
const std::pair<HTTPTransaction*, double>& t1,
652 const std::pair<HTTPTransaction*, double>& t2) {
653 return t1.second > t2.second;
663 pendingNodes.emplace_back(0, &
root_, 1.0);
666 while (!stop && !pendingNodes.empty()) {
667 Node* node = pendingNodes.front().node;
670 false, pendingNodesTmp,
673 pendingNodes.pop_front();
677 if (spdyMode && !result.empty() && !pendingNodesTmp.empty()) {
678 double totalRatio = 0;
679 for (
auto &txnPair: result) {
680 totalRatio += txnPair.second;
682 CHECK_GT(totalRatio, 0);
683 for (
auto &txnPair: result) {
684 txnPair.second = txnPair.second / totalRatio;
688 std::swap(pendingNodes, pendingNodesTmp);
689 }
while (!stop && !pendingNodes.empty());
690 std::sort(result.begin(), result.end(), WeightCmp());
704 *depth = it->calculateDepth();
Node * emplaceNode(std::unique_ptr< Node > node, bool exclusive)
std::deque< PendingNode > PendingList
void convertVirtualNode(HTTPTransaction *txn)
Node * addChild(std::unique_ptr< Node > child)
void signalPendingEgress()
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::invoke_fn invoke
uint32_t maxVirtualNodes_
uint32_t streamDependency
uint32_t numVirtualNodes_
void scheduleNodeExpiration(Node *node)
Node * findInternal(HTTPCodec::StreamID id)
void flattenSubtreeDFS(Node *subtreeRoot)
std::vector< std::pair< HTTPTransaction *, double >> NextEgressResult
uint64_t calculateDepth(bool includeVirtual=true) const override
WheelTimerInstance timeout_
constexpr detail::Map< Move > move
HTTP2PriorityQueue & queue_
std::list< std::unique_ptr< Node > >::iterator self_
void updateEnqueuedWeight(bool activeNodes)
folly::IntrusiveList< Node,&Node::enqueuedHook_ > enqueuedChildren_
bool iterate(const std::function< bool(HTTPCodec::StreamID, HTTPTransaction *, double)> &fn, const std::function< bool()> &stopFn, bool all)
static const uint16_t kDefaultWeight
virtual uint64_t calculateDepth(bool includeVirtual=true) const =0
void rebuildTree()
Error handling code.
void addChildren(std::list< std::unique_ptr< Node >> &&children)
void attachThreadLocals(const WheelTimerInstance &timeout)
class HTTP2PriorityQueue
static void addChildToNewSubtreeRoot(std::unique_ptr< Node > child, Node *subtreeRoot)
std::unique_ptr< Node > detachChild(Node *node)
bool visitBFS(double relativeParentWeight, const std::function< bool(HTTP2PriorityQueue &queue, HTTPCodec::StreamID, HTTPTransaction *, double)> &fn, bool all, PendingList &pendingNodes, bool enqueuedChildren)
HTTPCodec::StreamID getID() const
bool isDescendantOf(Node *node) const
static uint32_t kMaxRebuilds_
virtual bool isEnqueued() const =0
Node * reparent(Node *newParent, bool exclusive)
folly::IntrusiveListHook enqueuedHook_
void updateEnqueuedWeight()
Encoder::MutableCompressedList list
static std::chrono::milliseconds kNodeLifetime_
HTTPCodec::StreamID parentID() const
Handle addTransaction(HTTPCodec::StreamID id, http2::PriorityUpdate pri, HTTPTransaction *txn, bool permanent=false, uint64_t *depth=nullptr) override
uint64_t totalChildWeight_
void clearPendingEgress()
Node * find(HTTPCodec::StreamID id, uint64_t *depth=nullptr)
const PriorityUpdate DefaultPriority
void detachThreadLocals()
bool isEnqueued() const override
static bool nextEgressResult(HTTP2PriorityQueue &queue, HTTPCodec::StreamID id, HTTPTransaction *txn, double r)
double getRelativeEnqueuedWeight() const
void addEnqueuedChild(HTTP2PriorityQueue::Node *node)
void clearPendingEgress(Handle h) override
Handle updatePriority(Handle handle, http2::PriorityUpdate pri, uint64_t *depth=nullptr) override
bool pendingWeightChange_
void signalPendingEgress(Handle h) override
void iterateBFS(const std::function< bool(HTTP2PriorityQueue &, HTTPCodec::StreamID, HTTPTransaction *, double)> &fn, const std::function< bool()> &stopFn, bool all)
bool allowDanglingNodes() const
static void propagatePendingEgressSignal(Node *node)
void nextEgress(NextEgressResult &result, bool spdyMode=false)
void addOrUpdatePriorityNode(HTTPCodec::StreamID id, http2::PriorityUpdate pri)
uint64_t totalEnqueuedWeightCheck_
folly::Function< void()> child
NextEgressResult * nextEgressResults_
uint64_t totalEnqueuedWeight_
void swap(SwapTrackingAlloc< T > &, SwapTrackingAlloc< T > &)
void for_each(T const &range, Function< void(typename T::value_type const &) const > const &func)
void removeTransaction(Handle handle) override
double getRelativeWeight() const
static void propagatePendingEgressClear(Node *node)
Node(HTTP2PriorityQueue &queue, Node *inParent, HTTPCodec::StreamID id, uint8_t weight, HTTPTransaction *txn)
bool inEgressTree() const
folly::Function< void()> parent
void removeEnqueuedChild(HTTP2PriorityQueue::Node *node)
std::list< std::unique_ptr< Node > > children_
Composed all(Predicate pred=Predicate())
void updateWeight(uint8_t weight)