proxygen
HTTP2PriorityQueue.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  *
9  */
10 #pragma once
11 
12 #include <folly/IntrusiveList.h>
17 
18 #include <list>
19 #include <deque>
20 #include <boost/intrusive/unordered_set.hpp>
21 
22 namespace proxygen {
23 
24 class HTTPTransaction;
25 
26 
28  public:
29  class BaseNode {
30  public:
31  virtual ~BaseNode() {}
32  virtual bool isEnqueued() const = 0;
33  virtual uint64_t calculateDepth(bool includeVirtual = true) const = 0;
34  };
35 
36  using Handle = BaseNode*;
37 
40  HTTPTransaction *txn, bool permanent = false,
41  uint64_t* depth = nullptr) = 0;
42 
43  // update the priority of an existing node
44  virtual Handle updatePriority(
45  Handle handle,
47  uint64_t* depth = nullptr) = 0;
48 
49  // Remove the transaction from the priority tree
50  virtual void removeTransaction(Handle handle) = 0;
51 
52  // Notify the queue when a transaction has egress
53  virtual void signalPendingEgress(Handle h) = 0;
54 
55  // Notify the queue when a transaction no longer has egress
56  virtual void clearPendingEgress(Handle h) = 0;
57 };
58 
60 
61  private:
62  class Node;
63  using NodeMap = boost::intrusive::unordered_set<
64  Node, boost::intrusive::constant_time_size<false>>;
65 
66  static const size_t kNumBuckets = 100;
67 
68  public:
69 
71  : nodes_(NodeMap::bucket_traits(nodeBuckets_, kNumBuckets)) {
72  root_.setPermanent();
73  }
74 
75  explicit HTTP2PriorityQueue(const WheelTimerInstance& timeout)
76  : nodes_(NodeMap::bucket_traits(nodeBuckets_, kNumBuckets)),
77  timeout_(timeout) {
78  root_.setPermanent();
79  }
80 
81  void attachThreadLocals(const WheelTimerInstance& timeout);
82 
83  void detachThreadLocals();
84 
85  void setMaxVirtualNodes(uint32_t maxVirtualNodes) {
86  maxVirtualNodes_ = maxVirtualNodes;
87  }
88 
89  // Notify the queue when a transaction has egress
90  void signalPendingEgress(Handle h) override;
91 
92  // Notify the queue when a transaction no longer has egress
93  void clearPendingEgress(Handle h) override;
94 
96  HTTPCodec::StreamID parent) override{
97  addTransaction(id, {(uint32_t)parent, false, 0}, nullptr, true);
98  }
99 
100  void addOrUpdatePriorityNode(HTTPCodec::StreamID id,
102 
104  root_.dropPriorityNodes();
105  }
106 
107  // adds new transaction (possibly nullptr) to the priority tree
109  HTTPTransaction *txn, bool permanent = false,
110  uint64_t* depth = nullptr) override;
111 
112  // update the priority of an existing node
114  Handle handle,
116  uint64_t* depth = nullptr) override;
117 
118  // Remove the transaction from the priority tree
119  void removeTransaction(Handle handle) override;
120 
121  // Returns true if there are no transaction with pending egress
122  bool empty() const {
123  return activeCount_ == 0;
124  }
125 
126  // The number with pending egress
128  return activeCount_;
129  }
130 
132  return numVirtualNodes_;
133  }
134 
135  void iterate(const std::function<bool(HTTPCodec::StreamID,
136  HTTPTransaction *, double)>& fn,
137  const std::function<bool()>& stopFn, bool all) {
138  updateEnqueuedWeight();
139  root_.iterate(fn, stopFn, all);
140  }
141 
142  // stopFn is only evaluated once per level
143  void iterateBFS(const std::function<bool(HTTP2PriorityQueue&,
145  HTTPTransaction *, double)>& fn,
146  const std::function<bool()>& stopFn, bool all);
147 
148  using NextEgressResult = std::vector<std::pair<HTTPTransaction*, double>>;
149 
150  void nextEgress(NextEgressResult& result, bool spdyMode = false);
151 
152  static void setNodeLifetime(std::chrono::milliseconds lifetime) {
153  kNodeLifetime_ = lifetime;
154  }
155 
157  // Rebuilds tree by making all non-root nodes direct children of the root and
158  // weight reset to the default 16
159  void rebuildTree();
160  uint32_t getRebuildCount() const { return rebuildCount_; }
161  bool isRebuilt() const { return rebuildCount_ > 0; }
162 
163 
164  private:
165  // Find the node in priority tree
166  Node* find(HTTPCodec::StreamID id, uint64_t* depth = nullptr);
167 
169  if (id == 0) {
170  return &root_;
171  }
172  return find(id);
173  }
174 
175  bool allowDanglingNodes() const {
176  return timeout_ && kNodeLifetime_.count() > 0;
177  }
178 
180  if (timeout_) {
181  VLOG(5) << "scheduling expiration for node=" << node->getID();
182  DCHECK_GT(kNodeLifetime_.count(), 0);
183  timeout_.scheduleTimeout(node, kNodeLifetime_);
184  }
185  }
186 
187  static bool nextEgressResult(HTTP2PriorityQueue& queue,
189  double r);
190 
191  void updateEnqueuedWeight();
192 
193  private:
194  typedef boost::intrusive::link_mode<boost::intrusive::auto_unlink> link_mode;
195 
196  class Node : public BaseNode,
198  public boost::intrusive::unordered_set_base_hook<link_mode> {
199  public:
200 
201  static const uint16_t kDefaultWeight = 16;
202 
203  Node(HTTP2PriorityQueue& queue, Node* inParent, HTTPCodec::StreamID id,
204  uint8_t weight, HTTPTransaction *txn);
205 
206  ~Node() override;
207 
208  // Functor comparing id to node and vice-versa
209  struct IdNodeEqual {
210  bool operator()(const HTTPCodec::StreamID& id, const Node& node) {
211  return id == node.id_;
212  }
213  bool operator()(const Node& node, const HTTPCodec::StreamID& id) {
214  return node.id_ == id;
215  }
216  };
217 
218  // Hash function
219  struct IdHash {
220  size_t operator()(const HTTPCodec::StreamID& id) const {
221  return boost::hash<HTTPCodec::StreamID>()(id);
222  }
223  };
224 
225  // Equality and hash operators (for intrusive set)
226  friend bool operator==(const Node& lhs, const Node& rhs) {
227  return lhs.id_ == rhs.id_;
228  }
229  friend std::size_t hash_value(const Node& node) {
230  return IdHash()(node.id_);
231  }
232 
233  void setPermanent() {
234  isPermanent_ = true;
235  }
236 
237  Node* getParent() const {
238  return parent_;
239  }
240 
242  return id_;
243  }
244 
246  if (parent_) {
247  return parent_->id_;
248  }
249  return 0;
250  }
251 
253  return txn_;
254  }
255 
257  txn_ = nullptr;
258  }
259 
260  // Add a new node as a child of this node
261  Node* emplaceNode(std::unique_ptr<Node> node, bool exclusive);
262 
263  // Removes the node from the tree
264  void removeFromTree();
265 
266  void signalPendingEgress();
267 
268  void clearPendingEgress();
269 
270  uint16_t getWeight() const {
271  return weight_;
272  }
273 
274  // Set a new weight for this node
275  void updateWeight(uint8_t weight);
276 
277  Node* reparent(Node* newParent, bool exclusive);
278 
279  // Returns true if this is a descendant of node
280  bool isDescendantOf(Node *node) const;
281 
282  // True if this Node is in the egress queue
283  bool isEnqueued() const override {
284  return (txn_ != nullptr && enqueued_);
285  }
286 
287  // True if this Node is in the egress tree even if the node itself is
288  // virtual but has enqueued descendants.
289  bool inEgressTree() const {
290  return isEnqueued() || totalEnqueuedWeight_ > 0;
291  }
292 
293  double getRelativeWeight() const {
294  if (!parent_) {
295  return 1.0;
296  }
297 
298  return static_cast<double>(weight_) / parent_->totalChildWeight_;
299  }
300 
301  double getRelativeEnqueuedWeight() const {
302  if (!parent_) {
303  return 1.0;
304  }
305 
306  if (parent_->totalEnqueuedWeight_ == 0) {
307  return 0.0;
308  }
309 
310  return static_cast<double>(weight_) / parent_->totalEnqueuedWeight_;
311  }
312 
313  /* Execute the given function on this node and all child nodes presently
314  * enqueued, until one of them asks to stop, or the stop function returns
315  * true.
316  *
317  * The all parameter visits every node, even the ones not currently
318  * enqueued.
319  *
320  * The arguments to the function are
321  * txn - HTTPTransaction for the node
322  * ratio - weight of this txn relative to all peers (not just enequeued)
323  */
324  bool iterate(const std::function<bool(HTTPCodec::StreamID,
325  HTTPTransaction *, double)>& fn,
326  const std::function<bool()>& stopFn, bool all);
327 
328  struct PendingNode {
331  double ratio;
333  id(i), node(n), ratio(r) {}
334  };
335 
336  using PendingList = std::deque<PendingNode>;
337  bool visitBFS(double relativeParentWeight,
338  const std::function<bool(HTTP2PriorityQueue& queue,
340  HTTPTransaction *, double)>& fn,
341  bool all,
342  PendingList& pendingNodes, bool enqueuedChildren);
343 
344  void updateEnqueuedWeight(bool activeNodes);
345 
346  void dropPriorityNodes();
347 
348  void convertVirtualNode(HTTPTransaction* txn);
349 
350  uint64_t calculateDepth(bool includeVirtual = true) const override;
351 
352  // Internal error recovery
353  void flattenSubtree();
354  void flattenSubtreeDFS(Node* subtreeRoot);
355  static void addChildToNewSubtreeRoot(std::unique_ptr<Node> child,
356  Node* subtreeRoot);
357 
358  private:
359  Node* addChild(std::unique_ptr<Node> child);
360 
361  void addChildren(std::list<std::unique_ptr<Node>>&& children);
362 
363  std::unique_ptr<Node> detachChild(Node* node);
364 
365  void addEnqueuedChild(HTTP2PriorityQueue::Node* node);
366 
367  void removeEnqueuedChild(HTTP2PriorityQueue::Node* node);
368 
369  static void propagatePendingEgressSignal(Node *node);
370 
371  static void propagatePendingEgressClear(Node* node);
372 
373  void timeoutExpired() noexcept override {
374  VLOG(5) << "Node=" << id_ << " expired";
375  CHECK(txn_ == nullptr);
376  queue_.pendingWeightChange_ = true;
377  removeFromTree();
378  }
379 
380  void refreshTimeout() {
381  if (!txn_ && !isPermanent_ && isScheduled()) {
382  queue_.scheduleNodeExpiration(this);
383  }
384  }
385 
387  Node *parent_{nullptr};
389  uint16_t weight_{kDefaultWeight};
390  HTTPTransaction *txn_{nullptr};
391  bool isPermanent_{false};
392  bool enqueued_{false};
393 #ifndef NDEBUG
394  uint64_t totalEnqueuedWeightCheck_{0};
395 #endif
396  uint64_t totalEnqueuedWeight_{0};
397  uint64_t totalChildWeight_{0};
398  std::list<std::unique_ptr<Node>> children_;
399  std::list<std::unique_ptr<Node>>::iterator self_;
400  // enqueuedChildren_ includes all children that are themselves enqueued_
401  // or have enqueued descendants. Therefore, enqueuedChildren_ may contain
402  // direct children that have enqueued_ == false
405  };
406 
407  typename NodeMap::bucket_type nodeBuckets_[kNumBuckets];
409  Node root_{*this, nullptr, 0, 1, nullptr};
410  uint32_t rebuildCount_{0};
412  uint64_t activeCount_{0};
413  uint32_t maxVirtualNodes_{50};
414  uint32_t numVirtualNodes_{0};
415  bool pendingWeightChange_{false};
417 
418  NextEgressResult* nextEgressResults_{nullptr};
419  static std::chrono::milliseconds kNodeLifetime_;
420 };
421 
422 }
std::deque< PendingNode > PendingList
void addPriorityNode(HTTPCodec::StreamID id, HTTPCodec::StreamID parent) override
boost::intrusive::unordered_set< Node, boost::intrusive::constant_time_size< false >> NodeMap
*than *hazptr_holder h
Definition: Hazptr.h:116
void setMaxVirtualNodes(uint32_t maxVirtualNodes)
void scheduleNodeExpiration(Node *node)
size_t operator()(const HTTPCodec::StreamID &id) const
Node * findInternal(HTTPCodec::StreamID id)
std::vector< std::pair< HTTPTransaction *, double >> NextEgressResult
virtual Handle updatePriority(Handle handle, http2::PriorityUpdate pri, uint64_t *depth=nullptr)=0
std::list< std::unique_ptr< Node > >::iterator self_
folly::IntrusiveList< Node,&Node::enqueuedHook_ > enqueuedChildren_
virtual uint64_t calculateDepth(bool includeVirtual=true) const =0
requires E e noexcept(noexcept(s.error(std::move(e))))
FOLLY_PUSH_WARNING RHS rhs
Definition: Traits.h:649
HTTPCodec::StreamID getID() const
virtual void removeTransaction(Handle handle)=0
friend std::size_t hash_value(const Node &node)
virtual Handle addTransaction(HTTPCodec::StreamID id, http2::PriorityUpdate pri, HTTPTransaction *txn, bool permanent=false, uint64_t *depth=nullptr)=0
boost::intrusive::list_member_hook< boost::intrusive::link_mode< boost::intrusive::auto_unlink >> IntrusiveListHook
Definition: IntrusiveList.h:32
folly::IntrusiveListHook enqueuedHook_
Encoder::MutableCompressedList list
bool operator()(const HTTPCodec::StreamID &id, const Node &node)
void iterate(const std::function< bool(HTTPCodec::StreamID, HTTPTransaction *, double)> &fn, const std::function< bool()> &stopFn, bool all)
PendingNode(HTTPCodec::StreamID i, Node *n, double r)
static std::chrono::milliseconds kNodeLifetime_
HTTPCodec::StreamID parentID() const
bool operator()(const Node &node, const HTTPCodec::StreamID &id)
HTTPTransaction * getTransaction() const
boost::intrusive::list< T, boost::intrusive::member_hook< T, IntrusiveListHook, PtrToMember >, boost::intrusive::constant_time_size< false >> IntrusiveList
Definition: IntrusiveList.h:68
HTTP2PriorityQueue(const WheelTimerInstance &timeout)
Node(int v=0, Node *n=nullptr, bool=false) noexcept
Definition: HazptrTest.cpp:103
uint64_t StreamID
Definition: HTTPCodec.h:49
static void setNodeLifetime(std::chrono::milliseconds lifetime)
folly::Function< void()> child
Definition: AtFork.cpp:35
friend bool operator==(const Node &lhs, const Node &rhs)
void timeoutExpired() noexceptoverride
virtual void clearPendingEgress(Handle h)=0
boost::intrusive::link_mode< boost::intrusive::auto_unlink > link_mode
folly::Function< void()> parent
Definition: AtFork.cpp:34
std::list< std::unique_ptr< Node > > children_
Composed all(Predicate pred=Predicate())
Definition: Base.h:786
virtual void signalPendingEgress(Handle h)=0