proxygen
folly::fibers::AtomicBatchDispatcher< InputT, ResultT > Class Template Reference

#include <AtomicBatchDispatcher.h>

Classes

struct  DispatchBaton
 
class  Token
 

Public Types

using DispatchFunctionT = folly::Function< std::vector< ResultT >(std::vector< InputT > &&)>
 

Public Member Functions

 AtomicBatchDispatcher (DispatchFunctionT &&dispatchFunc)
 
 ~AtomicBatchDispatcher ()
 
void reserve (size_t numEntries)
 
Token getToken ()
 
void commit ()
 
 AtomicBatchDispatcher (AtomicBatchDispatcher &&)=default
 
AtomicBatchDispatcher & operator= (AtomicBatchDispatcher &&)=default
 

Private Member Functions

 AtomicBatchDispatcher (const AtomicBatchDispatcher &)=delete
 
AtomicBatchDispatcher & operator= (const AtomicBatchDispatcher &)=delete
 

Private Attributes

size_t numTokensIssued_
 
std::shared_ptr< DispatchBaton > baton_
 

Friends

struct DispatchBaton
 

Detailed Description

template<typename InputT, typename ResultT>
class folly::fibers::AtomicBatchDispatcher< InputT, ResultT >

AtomicBatchDispatcher should be used if you want to process fiber tasks in parallel but need to synchronize them at some point. The canonical example is a database transaction dispatch round. The API enforces that every task in the batch has reached the synchronization point before the user-provided dispatch function is called, and that function receives all the inputs in a single call. It also guarantees that the inputs in the vector passed to the user-provided dispatch function appear in the same order in which the tokens for the jobs were issued.

Use this when you want all the inputs in the batch to be processed by a single call to the user-provided dispatch function. That function takes a vector of InputT as input and returns a vector of ResultT.

To use an AtomicBatchDispatcher, create it by providing a dispatch function to EITHER the constructor of the AtomicBatchDispatcher class (you can then call the reserve method on the dispatcher to reserve space for the number of inputs expected), OR the createAtomicBatchDispatcher function in the folly::fibers namespace (which optionally takes an initial capacity for the number of inputs expected).

The AtomicBatchDispatcher object created this way (the dispatcher) is the only object that can issue tokens (Token objects), which are used to add an input to the batch. A single Token is issued each time the user calls the getToken function on the dispatcher. Token objects cannot be copied; they can only be moved. The user calls the public dispatch function on the Token, providing a single input value. The dispatch function returns a folly::Future<ResultT> that the user can then wait on to obtain a ResultT value. The ResultT value becomes available only once the dispatch function has been called on all the Tokens in the batch and the user has called dispatcher.commit() to indicate that no more batched transactions are to be added.

User code pertaining to a task can run between the point where a token for the task has been issued and the call to the dispatch function on that token. Since this code can potentially throw, the token issued for a task should be moved into this processing code in such a way that if an exception is thrown and then handled, the token object for the task is destroyed. The batch query dispatcher waits until all tokens have either been destroyed or have had the dispatch function called on them. Leaking an issued token will cause the batch dispatch to wait forever.
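The contract described above (one call to the dispatch function, inputs ordered by token issue, results available only after commit) can be illustrated with a small standard-library model. This is a sketch under stated assumptions, not folly's implementation: MiniBatchDispatcher and Slot are hypothetical names, Slot stands in for folly::Future<ResultT>, everything runs on one thread, InputT is assumed default-constructible, and the usage checks and error paths of the real class are omitted.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical single-threaded model of the batching contract (not folly code).
template <typename InputT, typename ResultT>
class MiniBatchDispatcher {
 public:
  using DispatchFn =
      std::function<std::vector<ResultT>(std::vector<InputT>&&)>;

  // Stand-in for folly::Future<ResultT>: `ready` flips once the batch has run.
  struct Slot {
    bool ready = false;
    ResultT value{};
  };

 private:
  struct State {
    DispatchFn fn;
    std::vector<InputT> inputs;                // indexed by token sequence number
    std::vector<std::shared_ptr<Slot>> slots;  // one result slot per token
    std::size_t dispatched = 0;
    bool committed = false;

    void runIfComplete() {
      if (!committed || dispatched != inputs.size()) {
        return;  // still waiting for commit() or for outstanding tokens
      }
      // The single call that receives every input, in token order.
      std::vector<ResultT> results = fn(std::move(inputs));
      for (std::size_t i = 0; i < slots.size() && i < results.size(); ++i) {
        slots[i]->value = std::move(results[i]);
        slots[i]->ready = true;
      }
    }
  };

 public:
  class Token {
   public:
    Token(std::shared_ptr<State> state, std::size_t seq)
        : state_(std::move(state)), seq_(seq) {}
    Token(Token&&) = default;
    Token(const Token&) = delete;  // move-only, like the documented Token

    // Model only: calling dispatch twice on one token is not detected here.
    std::shared_ptr<Slot> dispatch(InputT input) {
      state_->inputs[seq_] = std::move(input);
      ++state_->dispatched;
      std::shared_ptr<Slot> slot = state_->slots[seq_];
      state_->runIfComplete();
      return slot;
    }

   private:
    std::shared_ptr<State> state_;
    std::size_t seq_;
  };

  explicit MiniBatchDispatcher(DispatchFn fn)
      : state_(std::make_shared<State>()) {
    state_->fn = std::move(fn);
  }

  Token getToken() {
    state_->inputs.emplace_back();
    state_->slots.push_back(std::make_shared<Slot>());
    return Token(state_, state_->inputs.size() - 1);
  }

  void commit() {
    state_->committed = true;
    state_->runIfComplete();
  }

 private:
  std::shared_ptr<State> state_;
};
```

In this model, dispatching the second token before the first still hands the dispatch function its inputs in issue order, and no slot becomes ready until commit() has been called and every token has been dispatched.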

The AtomicBatchDispatcher object is referred to as the dispatcher below.

POSSIBLE ERRORS:

1) The dispatcher is destroyed before commit is called on it, for example because the user forgot to call commit OR an exception was thrown in user code before the call to commit:
  • The future ResultT has an exception of type ABDCommitNotCalledException set for all tokens that were issued by the dispatcher (once all tokens are either destroyed or have called dispatch).
2) The dispatch function is called more than once on the same Token object (or a moved version of the same Token):
  • Subsequent calls to dispatch (after the first one) will throw an ABDUsageException exception (the batch itself will not have any errors and will get processed).
3) One or more of the Tokens issued are destroyed before dispatch is called on them:
  • The future ResultT has an ABDTokenNotDispatchedException set for all tokens that were issued by the dispatcher (once all tokens are either destroyed or have called dispatch).
4) dispatcher.getToken() is called after calling dispatcher.commit():
  • The call to getToken() will throw an ABDUsageException exception (the batch itself will not have any errors and will get processed).
5) All tokens were issued and dispatched, and the user-provided batch dispatch function is called, but that function throws an exception:
  • The future ResultT has an exception for all tokens that were issued by the dispatcher. The result will contain the wrapped user exception.
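Error case 3 and the warning about leaked tokens both follow from the same bookkeeping: a token counts as resolved when it is either dispatched or destroyed, and the batch cannot complete until every issued token is resolved. The sketch below models only that counting. BatchState and DropAwareToken are hypothetical names, the outcome strings are illustrative, and none of this is folly's actual DispatchBaton logic.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>

// Hypothetical shared bookkeeping; not folly's DispatchBaton.
struct BatchState {
  std::size_t issued = 0;
  std::size_t resolved = 0;  // tokens that were dispatched or destroyed
  bool tokenDropped = false;
  bool committed = false;

  // The batch can complete only after commit and once every token is resolved.
  bool readyToComplete() const { return committed && resolved == issued; }

  std::string outcome() const {
    if (!readyToComplete()) {
      return "pending";  // a leaked token keeps the batch here forever
    }
    return tokenDropped ? "error: token not dispatched" : "dispatched";
  }
};

// Hypothetical move-only token that reports its own destruction.
class DropAwareToken {
 public:
  explicit DropAwareToken(std::shared_ptr<BatchState> state)
      : state_(std::move(state)) {
    ++state_->issued;
  }
  DropAwareToken(DropAwareToken&&) = default;  // moved-from token holds null
  DropAwareToken(const DropAwareToken&) = delete;

  void dispatch() {
    dispatched_ = true;
    ++state_->resolved;
  }

  ~DropAwareToken() {
    // A live token destroyed without dispatch poisons the whole batch.
    if (state_ && !dispatched_) {
      state_->tokenDropped = true;
      ++state_->resolved;
    }
  }

 private:
  std::shared_ptr<BatchState> state_;
  bool dispatched_ = false;
};
```

With two tokens issued and only one dispatched, the model stays "pending" while the second token is alive and switches to the error outcome once that token is destroyed, which mirrors why destroying a token is recoverable while leaking one is not.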

EXAMPLE (There are other ways to achieve this, but this is one example):

  • User creates an AtomicBatchDispatcher on the stack:
      auto dispatcher = folly::fibers::createAtomicBatchDispatcher(dispatchFunc, count);
  • User creates "count" token objects by calling getToken "count" times:
      std::vector<Job> jobs;
      for (size_t i = 0; i < count; ++i) {
        auto token = dispatcher.getToken();
        jobs.push_back(Job(std::move(token), singleInputValueToProcess));
      }
  • User calls commit() on the dispatcher to indicate that no new tokens will be issued for this batch:
      dispatcher.commit();
  • Use any single-threaded executor that will process the jobs.
  • On each execution (fiber), preprocess a single "Job" that has been moved in from the original vector "jobs". This way, if the preprocessing throws, the Job object being processed is destroyed and so is the token.
  • On each execution (fiber), call dispatch on the token:
      auto future = job.token.dispatch(job.input);
  • Save the future returned so that eventually you can wait on the results:
      ResultT result;
      try {
        result = future.value(); // future.hasValue() is true
      } catch (...) {
        // future.hasException() is true
        <DO WHATEVER YOU WANT IN CASE OF ERROR>
      }

NOTES:

Definition at line 168 of file AtomicBatchDispatcher.h.

Member Typedef Documentation

template<typename InputT, typename ResultT>
using folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::DispatchFunctionT = folly::Function<std::vector<ResultT>(std::vector<InputT>&&)>

Definition at line 175 of file AtomicBatchDispatcher.h.

Constructor & Destructor Documentation

template<typename InputT , typename ResultT >
folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::AtomicBatchDispatcher ( DispatchFunctionT &&  dispatchFunc)
explicit

Definition at line 171 of file AtomicBatchDispatcher-inl.h.

    : numTokensIssued_(0),
      baton_(std::make_shared<DispatchBaton>(std::move(dispatchFunc))) {}

template<typename InputT , typename ResultT >
folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::~AtomicBatchDispatcher ( )

Definition at line 177 of file AtomicBatchDispatcher-inl.h.

References folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::baton_, and folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::commit().

{
  if (baton_) {
    // Set error here rather than throw because we do not want to throw from
    // the destructor of AtomicBatchDispatcher
    baton_->setExceptionWrapper(
        folly::make_exception_wrapper<ABDCommitNotCalledException>());
    commit();
  }
}

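The destructor above records a failure on the shared state instead of throwing, then runs the normal commit path so that waiters are released. A minimal standard-library sketch of that pattern follows. SharedState and Owner are hypothetical stand-ins for DispatchBaton and the dispatcher, and std::exception_ptr is used here in place of folly::exception_wrapper.

```cpp
#include <exception>
#include <memory>
#include <stdexcept>
#include <utility>

// Hypothetical shared state; stands in for DispatchBaton.
struct SharedState {
  std::exception_ptr error;  // null unless a failure was recorded
  bool committed = false;
};

// Hypothetical owner type; stands in for the dispatcher.
class Owner {
 public:
  explicit Owner(std::shared_ptr<SharedState> state)
      : state_(std::move(state)) {}
  Owner(const Owner&) = delete;
  Owner& operator=(const Owner&) = delete;

  void commit() {
    auto state = std::move(state_);  // state_ is null from here on
    if (!state) {
      throw std::logic_error("commit() called more than once");
    }
    state->committed = true;
  }

  ~Owner() {
    if (state_) {
      // Record the problem for whoever holds the state; never throw here.
      state_->error = std::make_exception_ptr(
          std::runtime_error("commit() was never called"));
      commit();  // cannot throw: state_ was just checked to be non-null
    }
  }

 private:
  std::shared_ptr<SharedState> state_;
};
```

If an Owner goes out of scope without commit(), the shared state ends up committed with an error attached; if commit() was called first, the destructor sees a null pointer and does nothing.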
template<typename InputT, typename ResultT>
folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::AtomicBatchDispatcher ( AtomicBatchDispatcher< InputT, ResultT > &&  )
default
template<typename InputT, typename ResultT>
folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::AtomicBatchDispatcher ( const AtomicBatchDispatcher< InputT, ResultT > &  )
private delete

Member Function Documentation

template<typename InputT , typename ResultT >
void folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::commit ( )

Definition at line 204 of file AtomicBatchDispatcher-inl.h.

References folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::baton_, folly::gen::move, and folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::numTokensIssued_.

Referenced by folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::~AtomicBatchDispatcher().

{
  auto baton = std::move(baton_);
  if (!baton) {
    throw ABDUsageException(
        "Cannot call commit() more than once on the same dispatcher");
  }
  baton->setExpectedCount(numTokensIssued_);
}

template<typename InputT , typename ResultT >
auto folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::getToken ( )

Definition at line 196 of file AtomicBatchDispatcher-inl.h.

References folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::baton_, and folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::numTokensIssued_.

Referenced by AtomicBatchDispatcherTesting::createJobs().

{
  if (!baton_) {
    throw ABDUsageException("Cannot issue more tokens after calling commit()");
  }
  return Token(baton_, numTokensIssued_++);
}

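Both commit() and getToken() use the baton_ pointer itself as the "already committed" flag: commit() moves it out, and any later call finds it null and throws. getToken() also hands out consecutive sequence numbers, which is what lets inputs be ordered by token issue. The sketch below reproduces only that pattern with the standard library. Issuer, Baton, and throwsLogicError are hypothetical names, std::logic_error stands in for ABDUsageException, and unlike the real commit() this one returns the baton so the expected count can be inspected.

```cpp
#include <cstddef>
#include <memory>
#include <stdexcept>
#include <utility>

// Hypothetical stand-in for DispatchBaton: only records the expected count.
struct Baton {
  std::size_t expectedCount = 0;
};

// Hypothetical issuer; mirrors the null-check pattern, not folly's real types.
class Issuer {
 public:
  Issuer() : baton_(std::make_shared<Baton>()) {}

  // Returns the sequence number that a real Token would carry.
  std::size_t getToken() {
    if (!baton_) {
      throw std::logic_error("Cannot issue more tokens after calling commit()");
    }
    return numTokensIssued_++;
  }

  // Hands the baton off; the member is null afterwards.
  std::shared_ptr<Baton> commit() {
    auto baton = std::move(baton_);
    if (!baton) {
      throw std::logic_error("Cannot call commit() more than once");
    }
    baton->expectedCount = numTokensIssued_;
    return baton;
  }

 private:
  std::size_t numTokensIssued_ = 0;
  std::shared_ptr<Baton> baton_;
};

// Helper for checking the guards: true if calling f throws std::logic_error.
template <typename F>
bool throwsLogicError(F&& f) {
  try {
    f();
  } catch (const std::logic_error&) {
    return true;
  }
  return false;
}
```

Using the pointer as the flag means there is no separate boolean to keep in sync, and the shared state stays alive only through whoever received it at commit time.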
template<typename InputT, typename ResultT>
AtomicBatchDispatcher& folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::operator= ( AtomicBatchDispatcher< InputT, ResultT > &&  )
default
template<typename InputT, typename ResultT>
AtomicBatchDispatcher& folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::operator= ( const AtomicBatchDispatcher< InputT, ResultT > &  )
private delete
template<typename InputT , typename ResultT >
void folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::reserve ( size_t  numEntries)

Definition at line 188 of file AtomicBatchDispatcher-inl.h.

References folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::baton_.

Referenced by folly::fibers::createAtomicBatchDispatcher().

{
  if (!baton_) {
    throw ABDUsageException("Cannot call reserve(....) after calling commit()");
  }
  baton_->reserve(numEntries);
}

Friends And Related Function Documentation

template<typename InputT, typename ResultT>
friend struct DispatchBaton
friend

Definition at line 170 of file AtomicBatchDispatcher.h.

Member Data Documentation

template<typename InputT, typename ResultT>
size_t folly::fibers::AtomicBatchDispatcher< InputT, ResultT >::numTokensIssued_
private

The documentation for this class was generated from the following files: