proxygen
ThreadCachedLists.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <atomic>
20 
21 #include <folly/Function.h>
22 #include <folly/Synchronized.h>
23 #include <folly/ThreadLocal.h>
24 #include <glog/logging.h>
25 
26 namespace folly {
27 
28 namespace detail {
29 
30 // This is a thread-local cached, multi-producer single-consumer
31 // queue, similar to a concurrent version of std::list.
32 //
34  public:
// Singly-linked list node. next_ defaults to nullptr and is the link that
// the list code in this file follows and rewrites.
// NOTE(review): the embedded listing numbers jump from 35 to 37, so a member
// line appears to have been dropped from this extract - confirm against the
// upstream header before relying on this definition.
35  struct Node {
37  Node* next_{nullptr};
38  };
39 };
40 
41 template <typename Tag>
43  public:
44  struct AtomicListHead {
45  std::atomic<Node*> tail_{nullptr};
46  std::atomic<Node*> head_{nullptr};
47  };
48 
49  // Non-concurrent list, similar to std::list.
50  struct ListHead {
51  Node* head_{nullptr};
52  Node* tail_{nullptr};
53 
54  // Run func on each list node.
55  template <typename Func>
56  void forEach(Func func) {
57  auto node = tail_;
58  while (node != nullptr) {
59  auto next = node->next_;
60  func(node);
61  node = next;
62  }
63  }
64 
65  // Splice other in to this list.
66  // Afterwards, other is a valid empty listhead.
67  void splice(ListHead& other);
68 
69  void splice(AtomicListHead& other);
70  };
71 
72  // Push a node on a thread-local list. This function returns nothing;
73  // use collect() to gather the pushed nodes.
74  void push(Node* node);
75 
76  // Collect all thread local lists to a single local list.
77  // This function is threadsafe with concurrent push()es,
78  // but only a single thread may call collect() at a time.
79  void collect(ListHead& list);
80 
81  private:
82  // Push list to the global list.
83  void pushGlobal(ListHead& list);
84 
86 
// Per-thread list head. It is an AtomicListHead that also remembers the
// ThreadCachedLists instance that created it.
// NOTE(review): parent_ is used below but its declaration is not visible in
// this extract (the embedded listing numbers skip 88) - presumably
// `ThreadCachedLists* parent_;`, confirm against the upstream header.
87  struct TLHead : public AtomicListHead {
89 
90  public:
// Records the owning instance; push() constructs this with `this`.
91  TLHead(ThreadCachedLists* parent) : parent_(parent) {}
92 
// On destruction, hand any nodes still held here to the parent's global
// list so they are not lost with this head.
// NOTE(review): ghead_ is declared outside this extract; operator-> on it
// presumably acquires its lock for the duration of the call - confirm.
93  ~TLHead() {
94  parent_->ghead_->splice(*this);
95  }
96  };
97 
99 };
100 
101 // push() and splice() are optimistic w.r.t setting the list head: The
102 // first pusher cas's the list head, which functions as a lock until
103 // tail != null. The first pusher then sets tail_ = head_.
104 //
105 // splice() does the opposite: steals the tail_ via exchange, then
106 // unlocks the list again by setting head_ to null.
// Body of the out-of-class definition of push(): appends `node` to the
// calling thread's TLHead without taking a mutex.
// NOTE(review): the signature line is missing from this extract (the
// embedded listing numbers skip 108) - presumably
// `void ThreadCachedLists<Tag>::push(Node* node) {` per the in-class
// declaration; confirm against the upstream header.
107 template <typename Tag>
// The node must arrive unlinked (checked in debug builds only).
109  DCHECK(node->next_ == nullptr);
// Function-local thread_local: one cached pointer per thread for this
// template instantiation, not per object.
// NOTE(review): if two instances share a Tag, a thread would keep using the
// TLHead of whichever instance it pushed to first - confirm that a single
// instance per Tag is intended.
110  static thread_local TLHead* cache_{nullptr};
111 
// First push on this thread: fetch the thread's TLHead from lhead_,
// creating and registering it if it does not exist yet.
112  if (!cache_) {
113  auto l = lhead_.get();
114  if (!l) {
115  lhead_.reset(new TLHead(this));
116  l = lhead_.get();
117  DCHECK(l);
118  }
119  cache_ = l;
120  }
121 
// Retry loop: each pass re-reads head_ and tail_ and attempts one CAS.
// compare_exchange_weak may fail spuriously, which is simply another retry.
122  while (true) {
123  auto head = cache_->head_.load(std::memory_order_relaxed);
124  if (!head) {
// Empty list: claim it by installing node as head_, then publish tail_.
// Until tail_ is stored, other parties see head_ != null and tail_ == null
// and spin (head_ acts as the lock described in the comment above).
125  node->next_ = nullptr;
126  if (cache_->head_.compare_exchange_weak(head, node)) {
127  cache_->tail_.store(node);
128  break;
129  }
130  } else {
// Non-empty list: link node in front of the current tail_ and swing tail_
// to node. A null tail_ here means the list is mid-update, so loop again.
131  auto tail = cache_->tail_.load(std::memory_order_relaxed);
132  if (tail) {
133  node->next_ = tail;
// On failure the observed tail_ is written into node->next_, but the next
// pass reloads and reassigns it anyway.
134  if (cache_->tail_.compare_exchange_weak(node->next_, node)) {
135  break;
136  }
137  }
138  }
139  }
140 }
141 
// Body of the out-of-class definition of collect(): moves the nodes of every
// thread's list, and then of the global list, into `list`.
// NOTE(review): the signature line is missing from this extract (the
// embedded listing numbers skip 143) - presumably
// `void ThreadCachedLists<Tag>::collect(ListHead& list) {` per the in-class
// declaration; confirm against the upstream header.
142 template <typename Tag>
// Accessor over all threads' TLHead objects; lhead_ is declared outside
// this extract.
144  auto acc = lhead_.accessAllThreads();
145 
// Drain each thread's head. TLHead derives from AtomicListHead, so this
// presumably selects the splice(AtomicListHead&) overload.
146  for (auto& thr : acc) {
147  list.splice(thr);
148  }
149 
// Finally take whatever is on the global list; wlock() presumably yields
// exclusive access to the ListHead for the duration of this statement.
150  list.splice(*ghead_.wlock());
151 }
152 
// Body of the out-of-class definition of ListHead::splice(ListHead&): moves
// all nodes of `other` into this list and leaves `other` empty.
// NOTE(review): the signature line is missing from this extract (the
// embedded listing numbers skip 154) - presumably
// `void ThreadCachedLists<Tag>::ListHead::splice(ListHead& other) {`;
// confirm against the upstream header.
153 template <typename Tag>
// Consistency check: head_ and tail_ of `other` are either both set or both
// null. An empty `other` means there is nothing to move.
155  if (other.head_ != nullptr) {
156  DCHECK(other.tail_ != nullptr);
157  } else {
158  DCHECK(other.tail_ == nullptr);
159  return;
160  }
161 
162  if (head_) {
// This list is non-empty: head_ is its last node in traversal order (its
// next_ is null), so chain it to other's first node (other.tail_) and adopt
// other's head_ as the new end.
163  DCHECK(tail_ != nullptr);
164  DCHECK(head_->next_ == nullptr);
165  head_->next_ = other.tail_;
166  head_ = other.head_;
167  } else {
// This list is empty: take over both ends of `other`.
168  DCHECK(head_ == nullptr);
169  head_ = other.head_;
170  tail_ = other.tail_;
171  }
172 
// Leave `other` as a valid empty list head.
173  other.head_ = nullptr;
174  other.tail_ = nullptr;
175 }
176 
// Body of the out-of-class definition of ListHead::splice(AtomicListHead&):
// drains an atomic list head into this list.
// NOTE(review): the signature line is missing from this extract (the
// embedded listing numbers skip 178). The body names the parameter `list`
// while the in-class declaration calls it `other` - presumably
// `void ThreadCachedLists<Tag>::ListHead::splice(AtomicListHead& list) {`;
// confirm against the upstream header.
177 template <typename Tag>
179  ListHead local;
180 
// A null tail_ means the list is empty or a first push has not yet published
// its tail; in both cases nothing is taken on this call.
181  auto tail = list.tail_.load();
182  if (tail) {
// Order matters: take tail_ first, then clear head_. Per the comment above
// push(), clearing head_ is what releases the list for the next first push.
183  local.tail_ = list.tail_.exchange(nullptr);
184  local.head_ = list.head_.exchange(nullptr);
// Reuse the non-atomic overload to append the stolen nodes to this list.
185  splice(local);
186  }
187 }
188 
189 } // namespace detail
190 } // namespace folly
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
folly::ThreadLocalPtr< TLHead, Tag > lhead_
folly::Synchronized< ListHead > ghead_
Encoder::MutableCompressedList list
Future< std::vector< typename std::iterator_traits< InputIterator >::value_type::value_type > > collect(InputIterator first, InputIterator last)
Definition: Future-inl.h:1536
folly::Function< void()> parent
Definition: AtFork.cpp:34
def next(obj)
Definition: ast.py:58