proxygen
HTTP2PriorityQueue.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  *
9  */
11 
12 using std::list;
13 using std::unique_ptr;
14 
15 namespace proxygen {
16 
18 std::chrono::milliseconds HTTP2PriorityQueue::kNodeLifetime_ =
19  std::chrono::seconds(30);
20 
22  HTTP2PriorityQueue::Node* inParent,
24  uint8_t weight, HTTPTransaction *txn)
25  : queue_(queue),
26  parent_(inParent),
27  id_(id),
28  weight_(weight + 1),
29  txn_(txn) {
30  DCHECK(queue_.nodes_.find(id_,
31  IdHash(), IdNodeEqual()) == queue_.nodes_.end());
32  queue_.nodes_.insert(*this);
33 }
34 
36  if (!txn_) {
38  }
39 }
40 
41 // Add a new node as a child of this node
44  unique_ptr<HTTP2PriorityQueue::Node> node, bool exclusive) {
45  CHECK(!node->isEnqueued());
46  list<unique_ptr<Node>> children;
47  CHECK_NE(id_, node->id_) << "Tried to create a loop in the tree";
48  if (exclusive) {
49  // this->children become new node's children
50  std::swap(children, children_);
52  bool wasInEgressTree = inEgressTree();
54 #ifndef NDEBUG
56 #endif
57  if (wasInEgressTree && !inEgressTree()) {
59  }
60  }
61  auto res = addChild(std::move(node));
62  res->addChildren(std::move(children));
63  return res;
64 }
65 
66 void
67 HTTP2PriorityQueue::Node::addChildren(list<unique_ptr<Node>>&& children) {
68  list<unique_ptr<Node>> emptyChilden;
69  uint64_t totalEnqueuedWeight = 0;
70  for (auto& child: children) {
71  if (child->inEgressTree()) {
72  totalEnqueuedWeight += child->weight_;
73  child->parent_->removeEnqueuedChild(child.get());
74  CHECK(!child->enqueuedHook_.is_linked());
75  addEnqueuedChild(child.get());
76  } else {
77  CHECK(!child->enqueuedHook_.is_linked());
78  }
80  }
81  std::swap(children, emptyChilden);
82  if (totalEnqueuedWeight > 0) {
83  if (!inEgressTree()) {
85  }
86  totalEnqueuedWeight_ += totalEnqueuedWeight;
87  }
88 }
89 
92  unique_ptr<HTTP2PriorityQueue::Node> child) {
93  CHECK_NE(id_, child->id_) << "Tried to create a loop in the tree";
94  child->parent_ = this;
95  totalChildWeight_ += child->weight_;
96  Node* raw = child.get();
97  raw->self_ = children_.insert(children_.end(), std::move(child));
98  cancelTimeout();
99  return raw;
100 }
101 
102 unique_ptr<HTTP2PriorityQueue::Node>
104  CHECK(!node->isEnqueued());
105  totalChildWeight_ -= node->weight_;
106  auto it = node->self_;
107  auto res = std::move(*node->self_);
108  children_.erase(it);
109  node->parent_ = nullptr;
110  if (children_.empty() && !txn_ && !isPermanent_) {
112  }
113  return res;
114 }
115 
118  bool exclusive) {
119  // Save enqueued_ and totalEnqueuedWeight_, clear them and restore
120  // after reparenting
121  bool wasInEgressTree = inEgressTree();
122  bool enqueued = enqueued_;
123  uint64_t totalEnqueuedWeight = totalEnqueuedWeight_;
125  enqueued_ = false;
126  if (wasInEgressTree) {
128  }
129 
130  auto self = parent_->detachChild(this);
131  (void)newParent->emplaceNode(std::move(self), exclusive);
132 
133  // Restore state
134  enqueued_ = enqueued;
135  if (wasInEgressTree) {
137  }
138  totalEnqueuedWeight_ += totalEnqueuedWeight;
139 
140  return this;
141 }
142 
143 // Returns true if this is a descendant of node
// The walk starts at parent_, so a node is never reported as a descendant of
// itself. Ancestors are matched by id_ rather than by pointer identity, and
// the loop ends once it runs off the top of the tree (parent_ is null).
144 bool
146  Node* cur = parent_;
147  while (cur) {
148  if (cur->id_ == node->id_) {
149  return true;
150  }
151  cur = cur->parent_;
152  }
153  return false;
154 }
155 
156 // Here "enqueued" means enqueued or enqueued descendent - part of the
157 // nextEgress computation.
158 
// Appends node to the back of this node's enqueuedChildren_ intrusive list.
// CHECK-fails if node's enqueuedHook_ is already linked, i.e. if it is
// currently a member of some enqueuedChildren_ list.
159 void
161  CHECK(!node->enqueuedHook_.is_linked());
162  enqueuedChildren_.push_back(*node);
163 }
164 
// Unlinks node from this node's enqueuedChildren_ intrusive list.
// CHECK-fails if node's enqueuedHook_ is not linked. The check only proves
// that node is on *a* list; the caller is responsible for passing a node that
// is on this node's list.
165 void
167  CHECK(node->enqueuedHook_.is_linked());
168  enqueuedChildren_.erase(enqueuedChildren_.iterator_to(*node));
169 }
170 
171 void
173  enqueued_ = true;
175 }
176 
// Propagates "this subtree now has pending egress" from node towards the root.
// At each step the current node's weight_ is added to its parent's
// totalEnqueuedWeight_ and the node is linked into the parent's
// enqueuedChildren_ list.
// Nothing is propagated when node already had enqueued descendants
// (totalEnqueuedWeight_ > 0) on entry.
177 void
179  HTTP2PriorityQueue::Node *node) {
180  Node* parent = node->parent_;
181  bool stop = node->totalEnqueuedWeight_ > 0;
182  // Continue adding node->weight_ to parent_->totalEnqueuedWeight_ as
183  // long as node state changed from no-egress-in-subtree to
184  // egress-in-subtree
185  while (parent && !stop) {
// Sample the parent's state *before* modifying it: a parent that was already
// in the egress tree is still updated below, but it is the last one visited.
186  stop = parent->inEgressTree();
187  parent->totalEnqueuedWeight_ += node->weight_;
188  parent->addEnqueuedChild(node);
189  node = parent;
190  parent = parent->parent_;
191  }
192 }
193 
194 void
196  CHECK(enqueued_);
197  enqueued_ = false;
199 }
200 
// Propagates "this subtree no longer has pending egress" from node towards the
// root. At each step the current node's weight_ is subtracted from its
// parent's totalEnqueuedWeight_ and the node is unlinked from the parent's
// enqueuedChildren_ list.
// Nothing is propagated when node itself is still in the egress tree on entry.
201 void
203  HTTP2PriorityQueue::Node* node) {
204  Node* parent = node->parent_;
205  bool stop = node->inEgressTree();
206  // Continue subtracting node->weight_ from parent_->totalEnqueuedWeight_
207  // as long as node state changes from egress-in-subtree to
208  // no-egress-in-subtree
209  while (parent && !stop) {
// Guard against unsigned underflow of the parent's running total.
210  CHECK_GE(parent->totalEnqueuedWeight_, node->weight_);
211  parent->totalEnqueuedWeight_ -= node->weight_;
212  parent->removeEnqueuedChild(node);
// Sample the parent's state *after* the update: if it is still in the egress
// tree, its own ancestors are left unchanged.
213  stop = parent->inEgressTree();
214  node = parent;
215  parent = parent->parent_;
216  }
217 }
218 
219 // Set a new weight for this node
// weight_ holds the supplied weight plus one, the same convention the Node
// constructor uses, so delta is (weight + 1) - weight_ and may be negative.
// The parent's totalChildWeight_ is always adjusted by delta; its
// totalEnqueuedWeight_ is adjusted only while this node is in the egress tree.
// parent_ is dereferenced unconditionally, so this must not be called on a
// node whose parent_ is null (the root).
220 void
222  int16_t delta = weight - weight_ + 1;
223  weight_ = weight + 1;
224  parent_->totalChildWeight_ += delta;
225  if (inEgressTree()) {
226  parent_->totalEnqueuedWeight_ += delta;
227  }
228  refreshTimeout();
229 }
230 
231 // Removes the node from the tree
232 void
234  if (!children_.empty()) {
235  // update child weights so they sum to (approximately) this node's weight.
236  double r = double(weight_) / totalChildWeight_;
237  for (auto& child: children_) {
238  uint64_t newWeight = std::max(uint64_t(child->weight_ * r), uint64_t(1));
239  CHECK_LE(newWeight, 256);
240  child->updateWeight(uint8_t(newWeight) - 1);
241  }
242  }
243 
244  CHECK(!isEnqueued());
245  if (inEgressTree()) {
246  // Gah this is tricky.
247  // The children of this node are moving to this node's parent. We need the
248  // tree in a consistent state before calling addChildren, so mark the
249  // current node's totalEnqueuedWeight_ as 0 and propagate the clear upwards.
250  // addChildren will handle re-signalling egress.
253  }
254 
255  // move my children to my parent
257  (void)parent_->detachChild(this);
258 }
259 
260 bool
262  const std::function<bool(HTTPCodec::StreamID,
263  HTTPTransaction *, double)>& fn,
264  const std::function<bool()>& stopFn, bool all) {
265  bool stop = false;
266  if (stopFn()) {
267  return true;
268  }
269 #ifndef NDEBUG
271 #endif
272  if (parent_ /* exclude root */ && (all || isEnqueued())) {
273  stop = fn(id_, txn_, getRelativeWeight());
274  }
275  for (auto& child: children_) {
276  if (stop || stopFn()) {
277  return true;
278  }
279  stop = child->iterate(fn, stopFn, all);
280  }
281  return stop;
282 }
283 
284 bool
286  double relativeParentWeight,
287  const std::function<bool(HTTP2PriorityQueue& queue, HTTPCodec::StreamID,
288  HTTPTransaction *, double)>& fn,
289  bool all,
290  PendingList& pendingNodes, bool enqueuedChildren) {
291  bool invoke = (parent_ != nullptr && (all || isEnqueued()));
292  auto relativeEnqueuedWeight = getRelativeEnqueuedWeight();
293 
294 #ifndef NDEBUG
296 #endif
297  // Add children when all==true, or for any not invoked node with
298  // pending children
299  if (all || (!invoke && totalEnqueuedWeight_ > 0)) {
300  double newRelWeight = relativeParentWeight * relativeEnqueuedWeight;
301  if (enqueuedChildren) {
302  for (auto child = enqueuedChildren_.begin();
303  child != enqueuedChildren_.end();
304  child++) {
305  pendingNodes.emplace_back(child->id_, &(*child), newRelWeight);
306  }
307  } else {
308  for (auto& child: children_) {
309  pendingNodes.emplace_back(child->id_, child.get(), newRelWeight);
310  }
311  }
312  }
313 
314  // Invoke fn last in case it deletes this node
315  if (invoke && fn(queue_, id_, txn_,
316  relativeParentWeight * relativeEnqueuedWeight)) {
317  return true;
318  }
319 
320  return false;
321 }
322 
323 #ifndef NDEBUG
324 void
327  for (auto& child: children_) {
328  child->updateEnqueuedWeight(activeNodes);
329  }
330  if (activeNodes) {
331  if (totalEnqueuedWeightCheck_ == 0 && !isEnqueued()) {
332  // Must only be called with activeCount_ > 0, root cannot be dequeued
333  CHECK_NOTNULL(parent_)->totalEnqueuedWeightCheck_ -= weight_;
334  } else {
335  CHECK(parent_ == nullptr || enqueuedHook_.is_linked());
336  }
337  } else {
339  }
340 }
341 #endif
342 
// Recursively removes every node in this subtree that has no transaction
// (txn_ is null) and is not marked permanent. Children are processed before
// the node itself.
343 void
345  for (auto it = children_.begin(); it != children_.end(); ) {
// Advance the iterator before recursing: the child may remove itself from
// children_ (removeFromTree ends by detaching from its parent), which would
// invalidate an iterator still pointing at it.
346  auto& child = *it++;
347  child->dropPriorityNodes();
348  }
349  if (!txn_ && !isPermanent_) {
350  removeFromTree();
351  }
352 }
353 
354 void
356  CHECK(!txn_);
357  CHECK(!isPermanent_);
358  CHECK_GT(queue_.numVirtualNodes_, 0);
360  txn_ = txn;
361  cancelTimeout();
362 }
363 
364 uint64_t
365 HTTP2PriorityQueue::Node::calculateDepth(bool includeVirtual) const {
366  uint64_t depth = 0;
367  const Node* cur = this;
368  while (cur->getParent() != nullptr) {
369  if (cur->txn_ || includeVirtual) {
370  depth += 1;
371  }
372  cur = cur->getParent();
373  }
374  return depth;
375 }
376 
377 void
379  std::list<std::unique_ptr<Node>> oldChildren_;
380  // Move the old children to a temporary list
381  std::swap(oldChildren_, children_);
382  // Reparent the children
383  for (auto& child : oldChildren_) {
384  child->flattenSubtreeDFS(this);
386  }
387  // Update the weights
389 #ifndef NDEBUG
391 #endif
392  totalChildWeight_ = 0;
394  children_.begin(),
395  children_.end(),
396  [this](const std::unique_ptr<Node>& child) {
397  totalChildWeight_ += child->weight_;
398  if (child->enqueued_) {
399  totalEnqueuedWeight_ += child->weight_;
400 #ifndef NDEBUG
401  totalEnqueuedWeightCheck_ += child->weight_;
402 #endif
403  }
404  }
405  );
406 }
407 
408 void
410  for (auto& child : children_) {
411  child->flattenSubtreeDFS(subtreeRoot);
413  }
414 }
415 
// Re-attaches child directly underneath subtreeRoot as a childless node with
// default weight. Ownership of child moves into subtreeRoot->children_.
// The child's enqueued_ flag is not modified here.
// NOTE(review): clearing child->children_ destroys whatever the list still
// owns; this presumably relies on the caller having already moved the
// grandchildren out -- confirm against flattenSubtreeDFS.
416 void
418  Node* subtreeRoot) {
419  child->children_.clear();
420  child->parent_ = subtreeRoot;
421  child->weight_ = kDefaultWeight;
422  child->totalChildWeight_ = 0;
423  child->totalEnqueuedWeight_ = 0;
424 #ifndef NDEBUG
425  child->totalEnqueuedWeightCheck_ = 0;
426 #endif
// Grab the raw pointer before std::move empties child, then record the node's
// own position in the new parent's list in self_.
427  Node* raw = child.get();
428  raw->self_ = subtreeRoot->children_.insert(subtreeRoot->children_.end(),
429  std::move(child));
430 }
431 
434  timeout_ = timeout;
435 }
436 
438  // a bit harsh, we could cancel and reschedule the timeout
441 }
442 
// Applies priority information to a node that has no transaction.
// If a node with this id already exists, it must have no transaction
// (CHECK-enforced) and only its priority is updated. Otherwise a new node is
// added with a null transaction and permanent == false.
443 void
445  http2::PriorityUpdate pri) {
446  auto handle = find(id);
447  if (handle) {
448  // already added
449  CHECK(handle->getTransaction() == nullptr);
450  updatePriority(handle, pri);
451  } else {
452  // brand new
453  addTransaction(id, pri, nullptr, false /* not permanent */);
454  }
455 }
456 
460  HTTPTransaction *txn,
461  bool permanent,
462  uint64_t* depth) {
463  CHECK_NE(id, 0);
464  CHECK_NE(id, pri.streamDependency) << "Tried to create a loop in the tree";
465  CHECK(!txn || !permanent);
466  Node *existingNode = find(id, depth);
467  if (existingNode) {
468  CHECK(!permanent);
469  existingNode->convertVirtualNode(CHECK_NOTNULL(txn));
470  updatePriority(existingNode, pri);
471  return existingNode;
472  }
473  if (!txn) {
475  return nullptr;
476  }
478  }
479 
480  Node* parent = &root_;
481  if (depth) {
482  *depth = 1;
483  }
484  if (pri.streamDependency != 0) {
485  Node* dep = find(pri.streamDependency, depth);
486  if (dep == nullptr) {
487  // specified a missing parent (timed out an idle node)?
488  VLOG(4) << "assigning default priority to txn=" << id;
489  // No point to try to instantiate one more virtual node
490  // if we already reached the virtual node limit
492  // The parent node hasn't arrived yet. For now setting
493  // its priority fields to default.
494  parent = dynamic_cast<Node*>(
497  nullptr,
498  permanent,
499  depth));
500  CHECK_NOTNULL(parent);
501  if (depth) {
502  *depth += 1;
503  }
504  } else {
505  VLOG(4) << "Virtual node limit reached, ignoring stream dependency "
506  << pri.streamDependency << " for new node ID " << id;
507  }
508  } else {
509  parent = dep;
510  if (depth) {
511  *depth += 1;
512  }
513  }
514  }
515  VLOG(4) << "Adding id=" << id << " with parent=" << parent->getID() <<
516  " and weight=" << ((uint16_t)pri.weight + 1);
517  auto node = std::make_unique<Node>(*this, parent, id, pri.weight, txn);
518  if (permanent) {
519  node->setPermanent();
520  } else if (!txn) {
521  scheduleNodeExpiration(node.get());
522  }
523  auto result = parent->emplaceNode(std::move(node), pri.exclusive);
524  pendingWeightChange_ = true;
525  return result;
526 }
527 
531  uint64_t* depth) {
532  Node* node = CHECK_NOTNULL(dynamic_cast<HTTP2PriorityQueue::Node*>(handle));
533  pendingWeightChange_ = true;
534  VLOG(4) << "Updating id=" << node->getID() << " with parent=" <<
535  pri.streamDependency << " and weight=" << ((uint16_t)pri.weight + 1);
536  node->updateWeight(pri.weight);
537  CHECK_NE(pri.streamDependency, node->getID()) <<
538  "Tried to create a loop in the tree";
539  if (pri.streamDependency == node->parentID() && !pri.exclusive) {
540  // no move
541  if (depth) {
542  *depth = handle->calculateDepth();
543  }
544  return handle;
545  }
546 
547  Node* newParent = find(pri.streamDependency, depth);
548  if (!newParent) {
550  newParent = &root_;
551  } else {
552  // allocate a virtual node for non-existing parent in my dependency tree
553  // then do normal priority processing
554  newParent = dynamic_cast<Node*>(
557  nullptr,
558  false,
559  depth));
560 
561  CHECK_NOTNULL(newParent);
562  VLOG(4) << "updatePriority missing parent, creating virtual parent="
563  << newParent->getID() << " for txn=" << node->getID();
564  }
565  }
566 
567  if (newParent->isDescendantOf(node)) {
568  newParent = newParent->reparent(node->getParent(), false);
569  }
570  node = node->reparent(newParent, pri.exclusive);
571  if (depth) {
572  *depth = node->calculateDepth();
573  }
574  return node;
575 }
576 
577 void
579  Node* node = CHECK_NOTNULL(dynamic_cast<HTTP2PriorityQueue::Node*>(handle));
580  pendingWeightChange_ = true;
581  // TODO: or require the node to do it?
582  if (node->isEnqueued()) {
583  clearPendingEgress(handle);
584  }
586  node->clearTransaction();
589  } else {
590  VLOG(5) << "Deleting dangling node over max id=" << node->getID();
591  node->removeFromTree();
592  }
593 }
594 
// Marks the node behind handle as having pending egress.
// Does nothing if the handle already reports isEnqueued(). Otherwise the
// handle is downcast to Node (CHECK-fails if the cast yields null), the node
// is signalled, activeCount_ is incremented and pendingWeightChange_ is set.
595 void
597  if (!handle->isEnqueued()) {
598  CHECK_NOTNULL(dynamic_cast<HTTP2PriorityQueue::Node*>(handle))
599  ->signalPendingEgress();
600  activeCount_++;
601  pendingWeightChange_ = true;
602  }
603 }
604 
// Clears the pending-egress mark on the node behind handle.
// Requires activeCount_ > 0 (CHECK-enforced) and a handle that downcasts to
// Node (CHECK-fails on null). On success activeCount_ is decremented and
// pendingWeightChange_ is set.
605 void
607  CHECK_GT(activeCount_, 0);
608  // clear does a CHECK on handle->isEnqueued()
609  CHECK_NOTNULL(dynamic_cast<HTTP2PriorityQueue::Node*>(handle))
610  ->clearPendingEgress();
611  activeCount_--;
612  pendingWeightChange_ = true;
613 }
614 
615 void
617  const std::function<bool(HTTP2PriorityQueue&, HTTPCodec::StreamID,
618  HTTPTransaction *, double)>& fn,
619  const std::function<bool()>& stopFn, bool all) {
620  Node::PendingList pendingNodes{{0, &root_, 1.0}};
621  Node::PendingList newPendingNodes;
622  bool stop = false;
623 
625  while (!stop && !stopFn() && !pendingNodes.empty()) {
626  CHECK(newPendingNodes.empty());
627  while (!stop && !pendingNodes.empty()) {
628  Node* node = findInternal(pendingNodes.front().id);
629  if (node) {
630  stop = node->visitBFS(pendingNodes.front().ratio, fn, all,
631  newPendingNodes, false /* all children */);
632  }
633  pendingNodes.pop_front();
634  }
635  std::swap(pendingNodes, newPendingNodes);
636  }
637 }
638 
// Visitor callback handed to Node::visitBFS by nextEgress.
// Appends the (txn, r) pair to the vector that queue.nextEgressResults_
// points to; that pointer must be non-null (nextEgress sets it before the
// traversal and resets it to nullptr afterwards).
// Always returns false, so this callback never causes visitBFS to return true.
639 bool
642  HTTPTransaction* txn, double r) {
643  queue.nextEgressResults_->emplace_back(txn, r);
644  return false;
645 }
646 
647 void
649  bool spdyMode) {
650  struct WeightCmp {
651  bool operator()(const std::pair<HTTPTransaction*, double>& t1,
652  const std::pair<HTTPTransaction*, double>& t2) {
653  return t1.second > t2.second;
654  }
655  };
656 
657  result.reserve(activeCount_);
658  nextEgressResults_ = &result;
659 
661  Node::PendingList pendingNodes;
662  Node::PendingList pendingNodesTmp;
663  pendingNodes.emplace_back(0, &root_, 1.0);
664  bool stop = false;
665  do {
666  while (!stop && !pendingNodes.empty()) {
667  Node* node = pendingNodes.front().node;
668  if (node) {
669  stop = node->visitBFS(pendingNodes.front().ratio, nextEgressResult,
670  false, pendingNodesTmp,
671  true /* enqueued children */);
672  }
673  pendingNodes.pop_front();
674  }
675  // In SPDY mode, we stop as soon as one level of the tree produces results,
676  // then normalize the ratios.
677  if (spdyMode && !result.empty() && !pendingNodesTmp.empty()) {
678  double totalRatio = 0;
679  for (auto &txnPair: result) {
680  totalRatio += txnPair.second;
681  }
682  CHECK_GT(totalRatio, 0);
683  for (auto &txnPair: result) {
684  txnPair.second = txnPair.second / totalRatio;
685  }
686  break;
687  }
688  std::swap(pendingNodes, pendingNodesTmp);
689  } while (!stop && !pendingNodes.empty());
690  std::sort(result.begin(), result.end(), WeightCmp());
691  nextEgressResults_ = nullptr;
692 }
693 
696  if (id == 0) {
697  return nullptr;
698  }
699  auto it = nodes_.find(id, Node::IdHash(), Node::IdNodeEqual());
700  if (it == nodes_.end()) {
701  return nullptr;
702  }
703  if (depth) {
704  *depth = it->calculateDepth();
705  }
706  return &(*it);
707 }
708 
709 void
711 #ifndef NDEBUG
712  if (pendingWeightChange_) {
714  pendingWeightChange_ = false;
715  }
716 #endif
717 }
718 
719 // Internal error handling
720 
721 void
723  CHECK_LE(rebuildCount_ + 1, kMaxRebuilds_);
725  rebuildCount_++;
726 }
727 
728 }
Node * emplaceNode(std::unique_ptr< Node > node, bool exclusive)
std::deque< PendingNode > PendingList
void convertVirtualNode(HTTPTransaction *txn)
Node * addChild(std::unique_ptr< Node > child)
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::invoke_fn invoke
LogLevel max
Definition: LogLevel.cpp:31
void scheduleNodeExpiration(Node *node)
Node * findInternal(HTTPCodec::StreamID id)
void flattenSubtreeDFS(Node *subtreeRoot)
std::vector< std::pair< HTTPTransaction *, double >> NextEgressResult
uint64_t calculateDepth(bool includeVirtual=true) const override
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::list< std::unique_ptr< Node > >::iterator self_
void updateEnqueuedWeight(bool activeNodes)
folly::IntrusiveList< Node,&Node::enqueuedHook_ > enqueuedChildren_
bool iterate(const std::function< bool(HTTPCodec::StreamID, HTTPTransaction *, double)> &fn, const std::function< bool()> &stopFn, bool all)
virtual uint64_t calculateDepth(bool includeVirtual=true) const =0
void rebuildTree()
Error handling code.
void addChildren(std::list< std::unique_ptr< Node >> &&children)
void attachThreadLocals(const WheelTimerInstance &timeout)
class HTTP2PriorityQueue
static void addChildToNewSubtreeRoot(std::unique_ptr< Node > child, Node *subtreeRoot)
std::unique_ptr< Node > detachChild(Node *node)
bool visitBFS(double relativeParentWeight, const std::function< bool(HTTP2PriorityQueue &queue, HTTPCodec::StreamID, HTTPTransaction *, double)> &fn, bool all, PendingList &pendingNodes, bool enqueuedChildren)
static void stop()
HTTPCodec::StreamID getID() const
Node * reparent(Node *newParent, bool exclusive)
folly::IntrusiveListHook enqueuedHook_
Encoder::MutableCompressedList list
static std::chrono::milliseconds kNodeLifetime_
HTTPCodec::StreamID parentID() const
Handle addTransaction(HTTPCodec::StreamID id, http2::PriorityUpdate pri, HTTPTransaction *txn, bool permanent=false, uint64_t *depth=nullptr) override
Node * find(HTTPCodec::StreamID id, uint64_t *depth=nullptr)
const PriorityUpdate DefaultPriority
Definition: HTTP2Framer.cpp:21
static bool nextEgressResult(HTTP2PriorityQueue &queue, HTTPCodec::StreamID id, HTTPTransaction *txn, double r)
void addEnqueuedChild(HTTP2PriorityQueue::Node *node)
void clearPendingEgress(Handle h) override
Handle updatePriority(Handle handle, http2::PriorityUpdate pri, uint64_t *depth=nullptr) override
void signalPendingEgress(Handle h) override
void iterateBFS(const std::function< bool(HTTP2PriorityQueue &, HTTPCodec::StreamID, HTTPTransaction *, double)> &fn, const std::function< bool()> &stopFn, bool all)
static void propagatePendingEgressSignal(Node *node)
void nextEgress(NextEgressResult &result, bool spdyMode=false)
uint64_t StreamID
Definition: HTTPCodec.h:49
void addOrUpdatePriorityNode(HTTPCodec::StreamID id, http2::PriorityUpdate pri)
folly::Function< void()> child
Definition: AtFork.cpp:35
NextEgressResult * nextEgressResults_
void swap(SwapTrackingAlloc< T > &, SwapTrackingAlloc< T > &)
Definition: F14TestUtil.h:414
void for_each(T const &range, Function< void(typename T::value_type const &) const > const &func)
void removeTransaction(Handle handle) override
static void propagatePendingEgressClear(Node *node)
Node(HTTP2PriorityQueue &queue, Node *inParent, HTTPCodec::StreamID id, uint8_t weight, HTTPTransaction *txn)
folly::Function< void()> parent
Definition: AtFork.cpp:34
void removeEnqueuedChild(HTTP2PriorityQueue::Node *node)
std::list< std::unique_ptr< Node > > children_
Composed all(Predicate pred=Predicate())
Definition: Base.h:786