FlatCombining.h

/*
 * Copyright 2017-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <folly/Function.h>
#include <folly/IndexedMemPool.h>
#include <folly/Portability.h>
#include <folly/concurrency/CacheLocality.h>
#include <folly/synchronization/SaturatingSemaphore.h>

#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>

namespace folly {

/// Flat combining (FC) was introduced in the SPAA 2010 paper "Flat
/// Combining and the Synchronization-Parallelism Tradeoff" by Danny
/// Hendler, Itai Incze, Nir Shavit, and Moran Tzafrir.
/// http://mcg.cs.tau.ac.il/projects/projects/flat-combining
///
/// FC is an alternative to coarse-grained locking: threads post their
/// operations in shared request records, and one thread (the combiner),
/// while holding the lock, executes the pending operations of the other
/// threads. This reduces lock contention and cache coherence traffic on
/// the shared data structure.
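///
/// A minimal usage sketch (illustrative only; the derived type FcQueue
/// and its members are hypothetical, not part of this header): a
/// sequential structure is made thread-safe by deriving from
/// FlatCombining via CRTP and routing operations through requestFC().
///
///   class FcQueue : public folly::FlatCombining<FcQueue> {
///     std::queue<int> q_; // sequential structure managed by FC
///
///    public:
///     void enqueue(int v) {
///       requestFC([&, v] { q_.push(v); }); // synchronous by default
///     }
///
///     bool tryDequeue(int& v) {
///       bool ok = false;
///       requestFC([&] {
///         if (!q_.empty()) {
///           ok = true;
///           v = q_.front();
///           q_.pop();
///         }
///       });
///       return ok;
///     }
///   };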

template <
    typename T, // concurrent data structure using FC interface
    typename Mutex = std::mutex,
    template <typename> class Atom = std::atomic,
    typename Req = /* default dummy type */ bool>
class FlatCombining {
  using SavedFn = folly::Function<void()>;

 public:
  /// Combining request record.
  class Rec {
    alignas(hardware_destructive_interference_size)
        folly::SaturatingSemaphore<false, Atom> valid_;
    folly::SaturatingSemaphore<false, Atom> done_;
    folly::SaturatingSemaphore<false, Atom> disconnected_;
    size_t index_;
    size_t next_;
    uint64_t last_;
    Req req_;
    SavedFn fn_;

   public:
    Rec() {
      setDone();
      setDisconnected();
    }

    void setValid() {
      valid_.post();
    }

    void clearValid() {
      valid_.reset();
    }

    bool isValid() const {
      return valid_.ready();
    }

    void setDone() {
      done_.post();
    }

    void clearDone() {
      done_.reset();
    }

    bool isDone() const {
      return done_.ready();
    }

    void awaitDone() {
      done_.wait();
    }

    void setDisconnected() {
      disconnected_.post();
    }

    void clearDisconnected() {
      disconnected_.reset();
    }

    bool isDisconnected() const {
      return disconnected_.ready();
    }

    void setIndex(const size_t index) {
      index_ = index;
    }

    size_t getIndex() const {
      return index_;
    }

    void setNext(const size_t next) {
      next_ = next;
    }

    size_t getNext() const {
      return next_;
    }

    void setLast(const uint64_t pass) {
      last_ = pass;
    }

    uint64_t getLast() const {
      return last_;
    }

    Req& getReq() {
      return req_;
    }

    template <typename Func>
    void setFn(Func&& fn) {
      static_assert(
          std::is_nothrow_constructible<
              folly::Function<void()>,
              _t<std::decay<Func>>>::value,
          "Try using a smaller function object that can fit in folly::Function "
          "without allocation, or use the custom interface of requestFC() to "
          "manage the requested function's arguments and results explicitly "
          "in a custom request structure without allocation.");
      fn_ = std::forward<Func>(fn);
      assert(fn_);
    }

    void clearFn() {
      fn_ = {};
      assert(!fn_);
    }

    SavedFn& getFn() {
      return fn_;
    }

    void complete() {
      clearValid();
      assert(!isDone());
      setDone();
    }
  };

  using Pool = folly::
      IndexedMemPool<Rec, 32, 4, Atom, IndexedMemPoolTraitsLazyRecycle<Rec>>;

 public:
  /// The constructor takes three optional arguments:
  /// - Dedicated combiner thread (default true)
  /// - Number of records (if 0, then kDefaultNumRecs)
  /// - A hint of the max number of combined operations per combining
  ///   session that is checked at the beginning of each pass on the
  ///   request records (if 0, then kDefaultMaxOps)
  explicit FlatCombining(
      const bool dedicated = true,
      const uint32_t numRecs = 0, // number of combining records
      const uint32_t maxOps = 0 // hint of max ops per combining session
      )
      : numRecs_(numRecs == 0 ? kDefaultNumRecs : numRecs),
        maxOps_(maxOps == 0 ? kDefaultMaxOps : maxOps),
        recs_(NULL_INDEX),
        dedicated_(dedicated),
        recsPool_(numRecs_) {
    if (dedicated_) {
      // dedicated combiner thread
      combiner_ = std::thread([this] { dedicatedCombining(); });
    }
  }

  /// Destructor: If there is a dedicated combiner, the destructor
  /// flags it to shut down. Otherwise, the destructor waits for all
  /// pending asynchronous requests to be completed.
  ~FlatCombining() {
    if (dedicated_) {
      shutdown();
      combiner_.join();
    } else {
      drainAll();
    }
  }

  // Wait for all pending operations to complete. Useful primarily
  // when there are asynchronous operations without a dedicated
  // combiner.
  void drainAll() {
    for (size_t i = getRecsHead(); i != NULL_INDEX; i = nextIndex(i)) {
      Rec& rec = recsPool_[i];
      awaitDone(rec);
    }
  }

  // Give the caller exclusive access.
  void acquireExclusive() {
    m_.lock();
  }

  // Try to give the caller exclusive access. Returns true iff successful.
  bool tryExclusive() {
    return m_.try_lock();
  }

  // Release exclusive access. The caller must have exclusive access.
  void releaseExclusive() {
    m_.unlock();
  }

  // Give the lock holder ownership of the mutex and exclusive access.
  // No need for explicit release.
  template <typename LockHolder>
  void holdLock(LockHolder& l) {
    l = LockHolder(m_);
  }

  // Give the caller's lock holder ownership of the mutex but without
  // exclusive access. The caller can later use the lock holder to try
  // to acquire exclusive access.
  template <typename LockHolder>
  void holdLock(LockHolder& l, std::defer_lock_t) {
    l = LockHolder(m_, std::defer_lock);
  }
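
  // A brief usage sketch for the lock holders above (illustrative
  // comment only; `fc` is assumed to be an object of a class derived
  // from FlatCombining):
  //
  //   std::unique_lock<std::mutex> l;
  //   fc.holdLock(l); // acquires the mutex; l releases it on destruction
  //
  //   std::unique_lock<std::mutex> l2;
  //   fc.holdLock(l2, std::defer_lock); // owns the mutex but unlocked
  //   if (l2.try_lock()) {
  //     // exclusive access until l2 is destroyed or unlocked
  //   }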

  // Execute an operation without combining
  template <typename OpFunc>
  void requestNoFC(OpFunc& opFn) {
    std::lock_guard<Mutex> guard(m_);
    opFn();
  }

  // This function first tries to execute the operation without
  // combining. If unsuccessful, it allocates a combining record if
  // needed. If there are no available records, it waits for exclusive
  // access and executes the operation. If a record is available and
  // ready for use, it fills the record and indicates that the request
  // is valid for combining. If the request is synchronous (by default
  // or necessity), it waits for the operation to be completed by a
  // combiner and optionally extracts the result, if any.
  //
  // This function can be called in several forms; a sketch follows the
  // list below:
  //   Simple forms that do not require the user to define a Req structure
  //   or to override any request processing member functions:
  //     requestFC(opFn)
  //     requestFC(opFn, rec) // provides its own pre-allocated record
  //     requestFC(opFn, rec, syncop) // asynchronous if syncop == false
  //   Custom forms that require the user to define a Req structure and to
  //   override some request processing member functions:
  //     requestFC(opFn, fillFn)
  //     requestFC(opFn, fillFn, rec)
  //     requestFC(opFn, fillFn, rec, syncop)
  //     requestFC(opFn, fillFn, resFn)
  //     requestFC(opFn, fillFn, resFn, rec)
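  //
  // A brief sketch of both interfaces (illustrative comment only; fc is
  // an object of a class derived from FlatCombining, and q_, val, doEnq,
  // and Req's members are hypothetical):
  //
  //   // Simple interface: the operation is captured in a folly::Function
  //   // and executed by whichever thread becomes the combiner.
  //   int res = 0;
  //   fc.requestFC([&] { res = q_.front(); });
  //
  //   // Custom interface: arguments and results travel in a user-defined
  //   // Req structure, executed by the combiner via an overridden
  //   // combinedOp(Req&) in the derived class.
  //   fc.requestFC(
  //       [&] { res = doEnq(val); },                   // opFn: uncombined path
  //       [&](Req& req) { req.setOp(Req::ENQ, val); }, // fillFn: fill request
  //       [&](Req& req) { res = req.getResult(); });   // resFn: extract result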
  template <typename OpFunc>
  void requestFC(OpFunc&& opFn, Rec* rec = nullptr, bool syncop = true) {
    auto dummy = [](Req&) {};
    requestOp(
        std::forward<OpFunc>(opFn),
        dummy /* fillFn */,
        dummy /* resFn */,
        rec,
        syncop,
        false /* simple */);
  }
  template <typename OpFunc, typename FillFunc>
  void requestFC(
      OpFunc&& opFn,
      const FillFunc& fillFn,
      Rec* rec = nullptr,
      bool syncop = true) {
    auto dummy = [](Req&) {};
    requestOp(
        std::forward<OpFunc>(opFn),
        fillFn,
        dummy /* resFn */,
        rec,
        syncop,
        true /* custom */);
  }
  template <typename OpFunc, typename FillFunc, typename ResFn>
  void requestFC(
      OpFunc&& opFn,
      const FillFunc& fillFn,
      const ResFn& resFn,
      Rec* rec = nullptr) {
    // must wait for result to execute resFn -- so it must be synchronous
    requestOp(
        std::forward<OpFunc>(opFn),
        fillFn,
        resFn,
        rec,
        true /* sync */,
        true /* custom */);
  }

  // Allocate a record.
  Rec* allocRec() {
    auto idx = recsPool_.allocIndex();
    if (idx == NULL_INDEX) {
      return nullptr;
    }
    Rec& rec = recsPool_[idx];
    rec.setIndex(idx);
    return &rec;
  }

  // Free a record
  void freeRec(Rec* rec) {
    if (rec == nullptr) {
      return;
    }
    auto idx = rec->getIndex();
    recsPool_.recycleIndex(idx);
  }

  // Returns the number of uncombined operations so far.
  uint64_t getNumUncombined() const {
    return uncombined_;
  }

  // Returns the number of combined operations so far.
  uint64_t getNumCombined() const {
    return combined_;
  }

  // Returns the number of combining passes so far.
  uint64_t getNumPasses() const {
    return passes_;
  }

  // Returns the number of combining sessions so far.
  uint64_t getNumSessions() const {
    return sessions_;
  }

 protected:
  const size_t NULL_INDEX = 0;
  const uint32_t kDefaultMaxOps = 100;
  const uint64_t kDefaultNumRecs = 64;
  const uint64_t kIdleThreshold = 10;

  alignas(hardware_destructive_interference_size) Mutex m_;

  alignas(hardware_destructive_interference_size)
      folly::SaturatingSemaphore<true, Atom> pending_;
  Atom<bool> shutdown_{false};

  alignas(hardware_destructive_interference_size) uint32_t numRecs_;
  uint32_t maxOps_;
  Atom<size_t> recs_; // head of linked list of assigned records
  bool dedicated_;
  std::thread combiner_;
  Pool recsPool_;

  alignas(hardware_destructive_interference_size) uint64_t uncombined_ = 0;
  uint64_t combined_ = 0;
  uint64_t passes_ = 0;
  uint64_t sessions_ = 0;

  template <typename OpFunc, typename FillFunc, typename ResFn>
  void requestOp(
      OpFunc&& opFn,
      const FillFunc& fillFn,
      const ResFn& resFn,
      Rec* rec,
      bool syncop,
      const bool custom) {
    std::unique_lock<Mutex> l(this->m_, std::defer_lock);
    if (l.try_lock()) {
      // No contention
      ++uncombined_;
      tryCombining();
      opFn();
      return;
    }

    // Try FC
    bool tc = (rec != nullptr);
    if (!tc) {
      // if an async op doesn't have a thread-cached record then turn
      // it into a synchronous op.
      syncop = true;
      rec = allocRec();
    }
    if (rec == nullptr) {
      // Can't use FC - Must acquire lock
      l.lock();
      ++uncombined_;
      tryCombining();
      opFn();
      return;
    }

    // Use FC
    // Wait if record is in use
    awaitDone(*rec);
    rec->clearDone();
    // Fill record
    if (custom) {
      // Fill the request (custom)
      Req& req = rec->getReq();
      fillFn(req);
      rec->clearFn();
    } else {
      rec->setFn(std::forward<OpFunc>(opFn));
    }
    // Indicate that record is valid
    assert(!rec->isValid());
    rec->setValid();
    // end of combining critical path
    setPending();
    // store-load order setValid before isDisconnected
    std::atomic_thread_fence(std::memory_order_seq_cst);
    if (rec->isDisconnected()) {
      rec->clearDisconnected();
      pushRec(rec->getIndex());
      setPending();
    }
    // If synchronous wait for the request to be completed
    if (syncop) {
      awaitDone(*rec);
      if (custom) {
        Req& req = rec->getReq();
        resFn(req); // Extract the result (custom)
      }
      if (!tc) {
        freeRec(rec); // Free the temporary record.
      }
    }
  }

  void pushRec(size_t idx) {
    Rec& rec = recsPool_[idx];
    while (true) {
      auto head = recs_.load(std::memory_order_acquire);
      rec.setNext(head); // there shouldn't be a data race here
      if (recs_.compare_exchange_weak(head, idx)) {
        return;
      }
    }
  }

  size_t getRecsHead() {
    return recs_.load(std::memory_order_acquire);
  }

  size_t nextIndex(size_t idx) {
    return recsPool_[idx].getNext();
  }

  void clearPending() {
    pending_.reset();
  }

  void setPending() {
    pending_.post();
  }

  bool isPending() const {
    return pending_.ready();
  }

  void awaitPending() {
    pending_.wait();
  }

  uint64_t combiningSession() {
    uint64_t combined = 0;
    do {
      uint64_t count = static_cast<T*>(this)->combiningPass();
      if (count == 0) {
        break;
      }
      combined += count;
      ++this->passes_;
    } while (combined < this->maxOps_);
    return combined;
  }

  void tryCombining() {
    if (!dedicated_) {
      while (isPending()) {
        clearPending();
        ++sessions_;
        combined_ += combiningSession();
      }
    }
  }

  void dedicatedCombining() {
    while (true) {
      awaitPending();
      clearPending();
      if (shutdown_.load()) {
        break;
      }
      while (true) {
        uint64_t count;
        ++sessions_;
        {
          std::lock_guard<Mutex> guard(m_);
          count = combiningSession();
          combined_ += count;
        }
        if (count < maxOps_) {
          break;
        }
      }
    }
  }

  void awaitDone(Rec& rec) {
    if (dedicated_) {
      rec.awaitDone();
    } else {
      awaitDoneTryLock(rec);
    }
  }

  // Waits for the request to be done, and occasionally tries to
  // acquire the lock and do combining. Used only in the absence of a
  // dedicated combiner.
  void awaitDoneTryLock(Rec& rec) {
    assert(!dedicated_);
    int count = 0;
    while (!rec.isDone()) {
      if (count == 0) {
        std::unique_lock<Mutex> l(m_, std::defer_lock);
        if (l.try_lock()) {
          setPending();
          tryCombining();
        }
      } else {
        asm_volatile_pause();
        if (++count == 1000) {
          count = 0;
        }
      }
    }
  }

  void shutdown() {
    shutdown_.store(true);
    setPending();
  }

  // The following member functions may be overridden for customization.
  void combinedOp(Req&) {
    throw std::runtime_error(
        "FlatCombining::combinedOp(Req&) must be overridden in the derived"
        " class if called.");
  }

  void processReq(Rec& rec) {
    SavedFn& opFn = rec.getFn();
    if (opFn) {
      // simple interface
      opFn();
    } else {
      // custom interface
      Req& req = rec.getReq();
      static_cast<T*>(this)->combinedOp(req); // defined in derived class
    }
    rec.setLast(passes_);
    rec.complete();
  }

  uint64_t combiningPass() {
    uint64_t count = 0;
    auto idx = getRecsHead();
    Rec* prev = nullptr;
    while (idx != NULL_INDEX) {
      Rec& rec = recsPool_[idx];
      auto next = rec.getNext();
      bool valid = rec.isValid();
      if (!valid && (passes_ - rec.getLast() > kIdleThreshold) &&
          (prev != nullptr)) {
        // Disconnect
        prev->setNext(next);
        rec.setDisconnected();
        // store-load order setDisconnected before isValid
        std::atomic_thread_fence(std::memory_order_seq_cst);
        valid = rec.isValid();
      } else {
        prev = &rec;
      }
      if (valid) {
        processReq(rec);
        ++count;
      }
      idx = next;
    }
    return count;
  }
};

} // namespace folly