proxygen
Rcu-inl.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/Function.h>
18 #include <folly/detail/AtFork.h>
20 
21 namespace folly {
22 
// Out-of-class definition of the per-Tag static flag, initially false.
// The rcu_domain constructor in this file CHECKs that the flag is still
// false and then sets it to true, so it records that a domain for this
// Tag has already been constructed.
23 template <typename Tag>
24 bool rcu_domain<Tag>::singleton_ = false;
25 
// Constructor body for rcu_domain<Tag>.
//
// NOTE(review): listing lines 27-28 (the signature, and presumably a member
// initializer list) are missing from this extraction. The symbol index at
// the end of this page places `rcu_domain(Executor *executor=nullptr)
// noexcept` at Rcu-inl.h:27. Restore those lines from the original header
// before trying to compile this.
26 template <typename Tag>
29  // Please use a unique tag for each domain.
30  CHECK(!singleton_);
31  singleton_ = true;
32 
33  // Register fork handlers. Holding read locks across fork is not
34  // supported. Using read locks in other atfork handlers is not
35  // supported. Other atfork handlers launching new child threads
36  // that use read locks *is* supported.
// NOTE(review): listing line 37 (the call that receives the four arguments
// below) is missing. The symbol index lists
// `registerHandler(object, prepare, parent, child)` from AtFork.cpp, which
// matches this argument shape -- confirm against the original header.
38  this,
// First lambda (bool-returning): tries to take syncMutex_ and reports
// whether that succeeded.
39  [this]() { return syncMutex_.try_lock(); },
// Second lambda: releases syncMutex_.
40  [this]() { syncMutex_.unlock(); },
// Third lambda: resets the reader counters, then releases syncMutex_.
41  [this]() {
42  counters_.resetAfterFork();
43  syncMutex_.unlock();
44  });
45 }
46 
// NOTE(review): listing lines 48-49 are missing from this extraction, so
// only the template header and the closing brace of this definition
// survive. Presumably this is the rcu_domain destructor and the missing
// body calls `unregisterHandler`, which the symbol index at the end of this
// page lists from AtFork.cpp -- confirm against the original header.
47 template <typename Tag>
50 }
51 
// Reader-side entry point: records the caller as a reader of the current
// epoch and returns that epoch's index (0 or 1).
//
// NOTE(review): listing line 53 (the signature) is missing from this
// extraction. The symbol index at the end of this page places
// `FOLLY_ALWAYS_INLINE rcu_token lock_shared()` at Rcu-inl.h:53.
52 template <typename Tag>
54  auto idx = version_.load(std::memory_order_acquire);
// Only the low bit of the version is used as the epoch index.
55  idx &= 1;
// Bump the reader counter for that epoch; half_sync() checks or waits on
// these counters before it advances version_.
56  counters_.increment(idx);
57 
58  return idx;
59 }
60 
// Reader-side exit point: decrements the reader counter of the epoch stored
// in `token`.
//
// NOTE(review): listing line 62 (the signature) is missing from this
// extraction. The symbol index at the end of this page places
// `FOLLY_ALWAYS_INLINE void unlock_shared(rcu_token &&)` at Rcu-inl.h:62.
61 template <typename Tag>
// The epoch index must be 0 or 1 (checked with DCHECK).
63  DCHECK(0 == token.epoch_ || 1 == token.epoch_);
64  counters_.decrement(token.epoch_);
65 }
66 
67 template <typename Tag>
68 template <typename T>
69 void rcu_domain<Tag>::call(T&& cbin) {
70  auto node = new list_node;
71  node->cb_ = [node, cb = std::forward<T>(cbin)]() {
72  cb();
73  delete node;
74  };
75  retire(node);
76 }
77 
// Queues `node` on q_ and, at most once per syncTimePeriod_, makes one
// non-blocking attempt to advance the epoch, handing any callbacks that
// became ready to executor_.
//
// NOTE(review): listing line 79 (the signature) is missing from this
// extraction. The symbol index at the end of this page places
// `void retire(list_node *node) noexcept` at Rcu-inl.h:79.
78 template <typename Tag>
80  q_.push(node);
81 
82  // Note that it's likely we hold a read lock here,
83  // so we can only half_sync(false). half_sync(true)
84  // or a synchronize() call might block forever.
// Current steady-clock time as a millisecond count.
85  uint64_t time = std::chrono::duration_cast<std::chrono::milliseconds>(
86  std::chrono::steady_clock::now().time_since_epoch())
87  .count();
88  auto syncTime = syncTime_.load(std::memory_order_relaxed);
// Proceed only if more than syncTimePeriod_ has elapsed since the recorded
// sync time AND this thread wins the compare-exchange that publishes the new
// time; a thread whose compare-exchange fails skips the sync attempt.
89  if (time > syncTime + syncTimePeriod_ &&
90  syncTime_.compare_exchange_strong(
91  syncTime, time, std::memory_order_relaxed)) {
92  list_head finished;
93  {
// half_sync() is always invoked with syncMutex_ held in this file.
94  std::lock_guard<std::mutex> g(syncMutex_);
95  half_sync(false, finished);
96  }
97  // callbacks are called outside of syncMutex_
98  finished.forEach(
99  [&](list_node* item) { executor_->add(std::move(item->cb_)); });
100  }
101 }
102 
// Returns once version_ has advanced by two from the value observed on
// entry. Either this thread performs the epoch advances itself, or it waits
// for another thread that has already claimed the work.
//
// NOTE(review): listing line 104 (the signature) is missing from this
// extraction. The symbol index at the end of this page places
// `void synchronize() noexcept` at Rcu-inl.h:104.
103 template <typename Tag>
105  auto curr = version_.load(std::memory_order_acquire);
106  // Target is two epochs away.
107  auto target = curr + 2;
108  while (true) {
109  // Try to assign ourselves to do the sync work.
110  // If someone else is already assigned, we can wait for
111  // the work to be finished by waiting on turn_.
112  auto work = work_.load(std::memory_order_acquire);
// Compare-exchange on a copy so `work` keeps the value that was loaded; it
// is still needed as the turn to wait for in the else-branch.
113  auto tmp = work;
114  if (work < target && work_.compare_exchange_strong(tmp, target)) {
115  list_head finished;
116  {
117  std::lock_guard<std::mutex> g(syncMutex_);
// Each blocking half_sync() stores version_ + 1, so loop until the target
// version is reached.
118  while (version_.load(std::memory_order_acquire) < target) {
119  half_sync(true, finished);
120  }
121  }
122  // callbacks are called outside of syncMutex_
123  finished.forEach(
124  [&](list_node* node) { executor_->add(std::move(node->cb_)); });
125  return;
126  } else {
// Someone else owns the work (or the compare-exchange lost a race): done if
// the target version has already been published.
127  if (version_.load(std::memory_order_acquire) >= target) {
128  return;
129  }
// NOTE(review): the meaning of the cutoff value 100 and of the trailing
// `false` is defined by turn_'s tryWaitForTurn, which is not visible here.
130  std::atomic<uint32_t> cutoff{100};
131  // Wait for someone to finish the work.
132  turn_.tryWaitForTurn(work, cutoff, false);
133  }
134  }
135 }
136 
137 /*
138  * Not multithread safe, but it could be with proper version
139  * checking and stronger increment of version. See
140  * https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/gracesharingurcu-2016.pdf
141  *
142  * This version, however, can go to sleep if there are outstanding
143  * readers, and does not spin or need rescheduling, unless blocking = false.
144  */
// Attempts to advance version_ by one epoch.
//
// blocking: when true, waits (counters_.waitForZero) until the reader
//           counter for the next epoch's index is zero; when false, returns
//           early without advancing version_ if that counter is non-zero.
// finished: receives the nodes spliced out of queues_[1]. This function does
//           not run their callbacks; both callers in this file hand them to
//           executor_ after releasing syncMutex_.
//
// Both callers in this file (retire() and synchronize()) hold syncMutex_
// around the call.
145 template <typename Tag>
146 void rcu_domain<Tag>::half_sync(bool blocking, list_head& finished) {
147  uint64_t curr = version_.load(std::memory_order_acquire);
148  auto next = curr + 1;
149 
150  // Push all work to a queue for moving through two epochs. One
151  // version is not enough because of late readers of the version_
152  // counter in lock_shared.
153  //
154  // Note that for a similar reason we can't swap out the q here,
155  // and instead drain it, so concurrent calls to call() are safe,
156  // and will wait for the next epoch.
157  q_.collect(queues_[0]);
158 
// `next & 1` is the counter index that lock_shared() will use once version_
// becomes `next`.
159  if (blocking) {
160  counters_.waitForZero(next & 1);
161  } else {
162  if (counters_.readFull(next & 1) != 0) {
// Non-blocking early exit: version_ is left unchanged. The nodes collected
// above stay in queues_[0].
163  return;
164  }
165  }
166 
167  // Run callbacks that have been through two epochs, and swap queues
168  // for those only through a single epoch.
169  finished.splice(queues_[1]);
170  queues_[1].splice(queues_[0]);
171 
// Publish the new epoch.
172  version_.store(next, std::memory_order_release);
173  // Notify synchronous waiters in synchronize().
174  turn_.completeTurn(curr);
175 }
176 
177 } // namespace folly
void half_sync(bool blocking, list_head &cbs)
Definition: Rcu-inl.h:146
static QueuedImmediateExecutor & instance()
static bool singleton_
Definition: Rcu.h:369
typename detail::ThreadCachedLists< Tag >::ListHead list_head
Definition: Rcu.h:312
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::chrono::steady_clock::time_point now()
FOLLY_ALWAYS_INLINE void unlock_shared(rcu_token &&)
Definition: Rcu-inl.h:62
folly::std T
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
rcu_domain(Executor *executor=nullptr) noexcept
Definition: Rcu-inl.h:27
requires E e noexcept(noexcept(s.error(std::move(e))))
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
void call(T &&cbin)
Definition: Rcu-inl.h:69
FOLLY_ALWAYS_INLINE rcu_token lock_shared()
Definition: Rcu-inl.h:53
void synchronize() noexcept
Definition: Rcu-inl.h:104
int * count
g_t g(f_t)
typename detail::ThreadCachedLists< Tag >::Node list_node
Definition: Rcu.h:313
static void unregisterHandler(void *object)
Definition: AtFork.cpp:118
static void registerHandler(void *object, folly::Function< bool()> prepare, folly::Function< void()> parent, folly::Function< void()> child)
Definition: AtFork.cpp:108
std::chrono::nanoseconds time()
void retire(list_node *node) noexcept
Definition: Rcu-inl.h:79
def next(obj)
Definition: ast.py:58