proxygen
folly::rcu_domain< Tag > Class Template Reference

#include <Rcu.h>

Public Member Functions

 rcu_domain (Executor *executor=nullptr) noexcept
 
 rcu_domain (const rcu_domain &)=delete
 
 rcu_domain (rcu_domain &&)=delete
 
rcu_domain & operator= (const rcu_domain &)=delete
 
rcu_domain & operator= (rcu_domain &&)=delete
 
 ~rcu_domain ()
 
FOLLY_ALWAYS_INLINE rcu_token lock_shared ()
 
FOLLY_ALWAYS_INLINE void unlock_shared (rcu_token &&)
 
template<typename T >
void call (T &&cbin)
 
void retire (list_node *node) noexcept
 
void synchronize () noexcept
 

Private Types

using list_head = typename detail::ThreadCachedLists< Tag >::ListHead
 
using list_node = typename detail::ThreadCachedLists< Tag >::Node
 

Private Member Functions

void half_sync (bool blocking, list_head &cbs)
 

Private Attributes

detail::ThreadCachedInts< Tag > counters_
 
std::atomic< uint64_t > version_ {0}
 
std::atomic< uint64_t > work_ {0}
 
detail::TurnSequencer< std::atomic > turn_
 
std::mutex syncMutex_
 
std::atomic< uint64_t > syncTime_ {0}
 
detail::ThreadCachedLists< Tag > q_
 
Executor * executor_ {nullptr}
 
list_head queues_ [2] {}
 

Static Private Attributes

static constexpr uint64_t syncTimePeriod_ {1600 * 2 }
 
static bool singleton_ = false
 

Detailed Description

template<typename Tag>
class folly::rcu_domain< Tag >

Definition at line 287 of file Rcu.h.

Member Typedef Documentation

template<typename Tag>
using folly::rcu_domain< Tag >::list_head = typename detail::ThreadCachedLists<Tag>::ListHead
private

Definition at line 312 of file Rcu.h.

template<typename Tag>
using folly::rcu_domain< Tag >::list_node = typename detail::ThreadCachedLists<Tag>::Node
private

Definition at line 313 of file Rcu.h.

Constructor & Destructor Documentation

template<typename Tag >
folly::rcu_domain< Tag >::rcu_domain ( Executor * executor = nullptr)
noexcept

Definition at line 27 of file Rcu-inl.h.

References folly::detail::AtFork::registerHandler().

29  // Please use a unique tag for each domain.
30  CHECK(!singleton_);
31  singleton_ = true;
32 
33  // Register fork handlers. Holding read locks across fork is not
34  // supported. Using read locks in other atfork handlers is not
35  // supported. Other atfork handlers launching new child threads
36  // that use read locks *is* supported.
37  detail::AtFork::registerHandler(
38  this,
39  [this]() { return syncMutex_.try_lock(); },
40  [this]() { syncMutex_.unlock(); },
41  [this]() {
42  counters_.resetAfterFork();
43  syncMutex_.unlock();
44  });
45 }
Executor * executor_
Definition: Rcu.h:368
static QueuedImmediateExecutor & instance()
std::mutex syncMutex_
Definition: Rcu.h:359
static bool singleton_
Definition: Rcu.h:369
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
detail::ThreadCachedInts< Tag > counters_
Definition: Rcu.h:350
static void registerHandler(void *object, folly::Function< bool()> prepare, folly::Function< void()> parent, folly::Function< void()> child)
Definition: AtFork.cpp:108
template<typename Tag>
folly::rcu_domain< Tag >::rcu_domain ( const rcu_domain< Tag > &  )
delete
template<typename Tag>
folly::rcu_domain< Tag >::rcu_domain ( rcu_domain< Tag > &&  )
delete
template<typename Tag >
folly::rcu_domain< Tag >::~rcu_domain ( )

Definition at line 48 of file Rcu-inl.h.

References folly::detail::AtFork::unregisterHandler().

48  {
49  detail::AtFork::unregisterHandler(this);
50 }
static void unregisterHandler(void *object)
Definition: AtFork.cpp:118

Member Function Documentation

template<typename Tag >
template<typename T >
void folly::rcu_domain< Tag >::call ( T &&  cbin)

Definition at line 69 of file Rcu-inl.h.

Referenced by folly::rcu_retire().

69  {
70  auto node = new list_node;
71  node->cb_ = [node, cb = std::forward<T>(cbin)]() {
72  cb();
73  delete node;
74  };
75  retire(node);
76 }
typename detail::ThreadCachedLists< Tag >::Node list_node
Definition: Rcu.h:313
void retire(list_node *node) noexcept
Definition: Rcu-inl.h:79
template<typename Tag >
void folly::rcu_domain< Tag >::half_sync ( bool  blocking,
list_head & cbs 
)
private

Definition at line 146 of file Rcu-inl.h.

References cpp.ast::next(), and uint64_t.

146  {
147  uint64_t curr = version_.load(std::memory_order_acquire);
148  auto next = curr + 1;
149 
150  // Push all work to a queue for moving through two epochs. One
151  // version is not enough because of late readers of the version_
152  // counter in lock_shared.
153  //
154  // Note that for a similar reason we can't swap out the q here,
155  // and instead drain it, so concurrent calls to call() are safe,
156  // and will wait for the next epoch.
157  q_.collect(queues_[0]);
158 
159  if (blocking) {
160  counters_.waitForZero(next & 1);
161  } else {
162  if (counters_.readFull(next & 1) != 0) {
163  return;
164  }
165  }
166 
167  // Run callbacks that have been through two epochs, and swap queues
168  // for those only through a single epoch.
169  finished.splice(queues_[1]);
170  queues_[1].splice(queues_[0]);
171 
172  version_.store(next, std::memory_order_release);
173  // Notify synchronous waiters in synchronize().
174  turn_.completeTurn(curr);
175 }
list_head queues_[2]
Definition: Rcu.h:372
void completeTurn(const uint32_t turn) noexcept
Unblocks a thread running waitForTurn(turn + 1)
std::atomic< uint64_t > version_
Definition: Rcu.h:352
detail::ThreadCachedInts< Tag > counters_
Definition: Rcu.h:350
detail::TurnSequencer< std::atomic > turn_
Definition: Rcu.h:357
detail::ThreadCachedLists< Tag > q_
Definition: Rcu.h:366
def next(obj)
Definition: ast.py:58
template<typename Tag >
rcu_token folly::rcu_domain< Tag >::lock_shared ( )

Definition at line 53 of file Rcu-inl.h.

53  {
54  auto idx = version_.load(std::memory_order_acquire);
55  idx &= 1;
56  counters_.increment(idx);
57 
58  return idx;
59 }
std::atomic< uint64_t > version_
Definition: Rcu.h:352
detail::ThreadCachedInts< Tag > counters_
Definition: Rcu.h:350
template<typename Tag>
rcu_domain& folly::rcu_domain< Tag >::operator= ( const rcu_domain< Tag > &  )
delete
template<typename Tag>
rcu_domain& folly::rcu_domain< Tag >::operator= ( rcu_domain< Tag > &&  )
delete
template<typename Tag >
void folly::rcu_domain< Tag >::retire ( list_node * node)
noexcept

Definition at line 79 of file Rcu-inl.h.

References count, g(), folly::gen::move, now(), folly::detail::distributed_mutex::time(), and uint64_t.

Referenced by folly::rcu_obj_base< T, D, Tag >::retire().

79  {
80  q_.push(node);
81 
82  // Note that it's likely we hold a read lock here,
83  // so we can only half_sync(false). half_sync(true)
84  // or a synchronize() call might block forever.
85  uint64_t time = std::chrono::duration_cast<std::chrono::milliseconds>(
86  std::chrono::steady_clock::now().time_since_epoch())
87  .count();
88  auto syncTime = syncTime_.load(std::memory_order_relaxed);
89  if (time > syncTime + syncTimePeriod_ &&
90  syncTime_.compare_exchange_strong(
91  syncTime, time, std::memory_order_relaxed)) {
92  list_head finished;
93  {
94  std::lock_guard<std::mutex> g(syncMutex_);
95  half_sync(false, finished);
96  }
97  // callbacks are called outside of syncMutex_
98  finished.forEach(
99  [&](list_node* item) { executor_->add(std::move(item->cb_)); });
100  }
101 }
void half_sync(bool blocking, list_head &cbs)
Definition: Rcu-inl.h:146
Executor * executor_
Definition: Rcu.h:368
std::mutex syncMutex_
Definition: Rcu.h:359
typename detail::ThreadCachedLists< Tag >::ListHead list_head
Definition: Rcu.h:312
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::chrono::steady_clock::time_point now()
static constexpr uint64_t syncTimePeriod_
Definition: Rcu.h:363
virtual void add(Func)=0
std::atomic< uint64_t > syncTime_
Definition: Rcu.h:364
int * count
detail::ThreadCachedLists< Tag > q_
Definition: Rcu.h:366
g_t g(f_t)
typename detail::ThreadCachedLists< Tag >::Node list_node
Definition: Rcu.h:313
std::chrono::nanoseconds time()
template<typename Tag >
void folly::rcu_domain< Tag >::synchronize ( )
noexcept

Definition at line 104 of file Rcu-inl.h.

References g(), and folly::gen::move.

104  {
105  auto curr = version_.load(std::memory_order_acquire);
106  // Target is two epochs away.
107  auto target = curr + 2;
108  while (true) {
109  // Try to assign ourselves to do the sync work.
110  // If someone else is already assigned, we can wait for
111  // the work to be finished by waiting on turn_.
112  auto work = work_.load(std::memory_order_acquire);
113  auto tmp = work;
114  if (work < target && work_.compare_exchange_strong(tmp, target)) {
115  list_head finished;
116  {
117  std::lock_guard<std::mutex> g(syncMutex_);
118  while (version_.load(std::memory_order_acquire) < target) {
119  half_sync(true, finished);
120  }
121  }
122  // callbacks are called outside of syncMutex_
123  finished.forEach(
124  [&](list_node* node) { executor_->add(std::move(node->cb_)); });
125  return;
126  } else {
127  if (version_.load(std::memory_order_acquire) >= target) {
128  return;
129  }
130  std::atomic<uint32_t> cutoff{100};
131  // Wait for someone to finish the work.
132  turn_.tryWaitForTurn(work, cutoff, false);
133  }
134  }
135 }
void half_sync(bool blocking, list_head &cbs)
Definition: Rcu-inl.h:146
TryWaitResult tryWaitForTurn(const uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff, const std::chrono::time_point< Clock, Duration > *absTime=nullptr) noexcept
Executor * executor_
Definition: Rcu.h:368
std::mutex syncMutex_
Definition: Rcu.h:359
typename detail::ThreadCachedLists< Tag >::ListHead list_head
Definition: Rcu.h:312
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
virtual void add(Func)=0
std::atomic< uint64_t > version_
Definition: Rcu.h:352
detail::TurnSequencer< std::atomic > turn_
Definition: Rcu.h:357
g_t g(f_t)
std::atomic< uint64_t > work_
Definition: Rcu.h:354
typename detail::ThreadCachedLists< Tag >::Node list_node
Definition: Rcu.h:313
template<typename Tag >
void folly::rcu_domain< Tag >::unlock_shared ( rcu_token &&  token)

Definition at line 62 of file Rcu-inl.h.

62  {
63  DCHECK(0 == token.epoch_ || 1 == token.epoch_);
64  counters_.decrement(token.epoch_);
65 }
detail::ThreadCachedInts< Tag > counters_
Definition: Rcu.h:350

Member Data Documentation

template<typename Tag>
detail::ThreadCachedInts<Tag> folly::rcu_domain< Tag >::counters_
private

Definition at line 350 of file Rcu.h.

template<typename Tag>
Executor* folly::rcu_domain< Tag >::executor_ {nullptr}
private

Definition at line 368 of file Rcu.h.

template<typename Tag>
detail::ThreadCachedLists<Tag> folly::rcu_domain< Tag >::q_
private

Definition at line 366 of file Rcu.h.

template<typename Tag>
list_head folly::rcu_domain< Tag >::queues_[2] {}
private

Definition at line 372 of file Rcu.h.

template<typename Tag>
bool folly::rcu_domain< Tag >::singleton_ = false
static private

Definition at line 369 of file Rcu.h.

template<typename Tag>
std::mutex folly::rcu_domain< Tag >::syncMutex_
private

Definition at line 359 of file Rcu.h.

template<typename Tag>
std::atomic<uint64_t> folly::rcu_domain< Tag >::syncTime_ {0}
private

Definition at line 364 of file Rcu.h.

template<typename Tag>
constexpr uint64_t folly::rcu_domain< Tag >::syncTimePeriod_ {1600 * 2 }
static private

Definition at line 363 of file Rcu.h.

template<typename Tag>
detail::TurnSequencer<std::atomic> folly::rcu_domain< Tag >::turn_
private

Definition at line 357 of file Rcu.h.

template<typename Tag>
std::atomic<uint64_t> folly::rcu_domain< Tag >::version_ {0}
private

Definition at line 352 of file Rcu.h.

template<typename Tag>
std::atomic<uint64_t> folly::rcu_domain< Tag >::work_ {0}
private

Definition at line 354 of file Rcu.h.


The documentation for this class was generated from the following files: