template<typename InputT, typename ResultT>
class folly::fibers::AtomicBatchDispatcher< InputT, ResultT >
AtomicBatchDispatcher should be used if you want to process fiber tasks in parallel but need to synchronize them at some point. The canonical example is creating a database transaction dispatch round. This API enforces that all tasks in the batch have reached the synchronization point before the user-provided dispatch function is called with all the inputs in a single call. It also guarantees that the inputs in the vector passed to the user-provided dispatch function appear in the same order in which the tokens for the jobs were issued.
Use this when you want all the inputs in the batch to be processed by a single call to the user-provided dispatch function. The dispatch function takes a vector of InputT as input and returns a vector of ResultT.

To use an AtomicBatchDispatcher, create it by providing a dispatch function to either:
- the constructor of the AtomicBatchDispatcher class (you can then call the reserve method on the dispatcher to reserve space for the number of inputs expected), or
- the createAtomicBatchDispatcher function in the folly::fibers namespace (which optionally takes an initial capacity for the number of inputs expected).

The AtomicBatchDispatcher object created by either method is the only object that can issue the tokens (Token objects) used to add an input to the batch. A single Token is issued when the user calls the getToken function on the dispatcher. Token objects cannot be copied, only moved.

The user calls the public dispatch function on a Token, providing a single input value. The dispatch function returns a folly::Future<ResultT> that the user can then wait on to obtain a ResultT value. The ResultT value only becomes available once dispatch has been called on all the Tokens in the batch and the user has called dispatcher.commit() to indicate that no more batched transactions are to be added.

User code pertaining to a task can run between the point where a token for the task is issued and the call to dispatch on that token. Since this code can potentially throw, the token issued for a task should be moved into this processing code in such a way that if an exception is thrown and then handled, the token object for the task is destroyed. The batch dispatcher waits until every token has either been destroyed or had its dispatch function called, so leaking an issued token will cause the batch dispatch to wait forever.
The AtomicBatchDispatcher object is referred to as the dispatcher below.
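A minimal sketch of the two creation paths follows. The exact dispatch-function signature (std::vector<ResultT> taking std::vector<InputT>&&), the int/std::string types, makeDispatchFunc and kExpectedInputs are illustrative assumptions, not guarantees from this documentation.

  #include <folly/Function.h>
  #include <folly/fibers/AtomicBatchDispatcher.h>
  #include <cstddef>
  #include <string>
  #include <vector>

  // Hypothetical batch function: converts every input in the batch to a string.
  folly::Function<std::vector<std::string>(std::vector<int>&&)> makeDispatchFunc() {
    return [](std::vector<int>&& inputs) {
      std::vector<std::string> results;
      results.reserve(inputs.size());
      for (int value : inputs) {
        results.push_back(std::to_string(value));
      }
      return results;
    };
  }

  void createDispatchers() {
    constexpr std::size_t kExpectedInputs = 8; // hypothetical batch size

    // Option 1: construct directly, then reserve space for the expected inputs.
    folly::fibers::AtomicBatchDispatcher<int, std::string> dispatcherA(makeDispatchFunc());
    dispatcherA.reserve(kExpectedInputs);

    // Option 2: use the factory function with an optional initial capacity.
    auto dispatcherB = folly::fibers::createAtomicBatchDispatcher(makeDispatchFunc(), kExpectedInputs);

    // Commit each batch before its dispatcher goes out of scope; forgetting to
    // call commit is error case 1 below.
    dispatcherA.commit();
    dispatcherB.commit();
  }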
POSSIBLE ERRORS:

1) The dispatcher is destroyed before commit is called on it, for example because the user forgot to call commit OR an exception was thrown in user code before the call to commit:
- The future ResultT has an exception of type ABDCommitNotCalledException set for all tokens that were issued by the dispatcher (once all tokens are either destroyed or have called dispatch).

2) The dispatch function is called more than once on the same Token object (or a moved version of the same Token):
- Subsequent calls to dispatch (after the first one) throw an ABDUsageException (the batch itself will not have any errors and will get processed).

3) One or more of the issued Tokens are destroyed before dispatch is called on them:
- The future ResultT has an ABDTokenNotDispatchedException set for all tokens that were issued by the dispatcher (once all tokens are either destroyed or have called dispatch).

4) dispatcher.getToken() is called after dispatcher.commit():
- The call to getToken() throws an ABDUsageException (the batch itself will not have any errors and will get processed).

5) All tokens were issued and dispatched, the user-provided batch dispatch function is called, but that function throws an exception:
- The future ResultT has an exception set for all tokens that were issued by the dispatcher. The result will contain the wrapped user exception.
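The sketch below shows how the future-borne errors above (cases 1, 3 and 5) surface when a result is read; cases 2 and 4 are thrown synchronously from dispatch and getToken instead. It assumes the exception types live in the folly::fibers namespace alongside the dispatcher, and that the future passed in is already fulfilled.

  #include <folly/fibers/AtomicBatchDispatcher.h>
  #include <folly/futures/Future.h>

  template <typename ResultT>
  void readBatchResult(folly::Future<ResultT>& future) {
    try {
      // value() rethrows any exception stored in the future.
      ResultT result = future.value();
      (void)result; // ... use result ...
    } catch (const folly::fibers::ABDCommitNotCalledException&) {
      // Error case 1: the dispatcher was destroyed before commit() was called.
    } catch (const folly::fibers::ABDTokenNotDispatchedException&) {
      // Error case 3: some token in the batch was destroyed without dispatching.
    } catch (...) {
      // Error case 5: the user-provided batch dispatch function threw; the
      // wrapped user exception is rethrown by value().
    }
  }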
EXAMPLE (There are other ways to achieve this, but this is one example):
- The user creates an AtomicBatchDispatcher on the stack:
  auto dispatcher = folly::fibers::createAtomicBatchDispatcher(dispatchFunc, count);
- User creates "count" number of token objects by calling "getToken" count number of times std::vector<Job> jobs; for (size_t i = 0; i < count; ++i) { auto token = dispatcher.getToken(); jobs.push_back(Job(std::move(token), singleInputValueToProcess); }
- The user calls commit() on the dispatcher to indicate that no new tokens will be issued for this batch:
  dispatcher.commit();
- Use any single-threaded executor that will process the jobs.
- On each execution (fiber), preprocess a single "Job" that has been moved in from the original vector "jobs". This way, if the preprocessing throws, the Job object being processed is destroyed and so is its token.
- On each execution (fiber), call dispatch on the token:
  auto future = job.token.dispatch(job.input);
- Save the returned future so that eventually you can wait on the result:
  ResultT result;
  try {
    result = future.value(); // future.hasValue() is true
  } catch (...) {
    // future.hasException() is true; handle the error as appropriate
  }
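Putting these steps together, the following is a hedged end-to-end sketch that uses a FiberManager as the single-threaded executor. The int/std::string types, the inline dispatch function, and the assumed dispatch-function signature (std::vector<ResultT> taking std::vector<InputT>&&) are illustrative; the futures are collected inside the fiber tasks and read only after the EventBase loop finishes, so no fiber ever blocks on a future.

  #include <folly/Function.h>
  #include <folly/fibers/AtomicBatchDispatcher.h>
  #include <folly/fibers/FiberManagerMap.h>
  #include <folly/futures/Future.h>
  #include <folly/io/async/EventBase.h>
  #include <cstddef>
  #include <string>
  #include <vector>

  int main() {
    constexpr std::size_t kCount = 4; // hypothetical batch size

    folly::EventBase evb;
    auto& fiberManager = folly::fibers::getFiberManager(evb);

    // Hypothetical batch function: turns the whole batch of ints into strings.
    folly::Function<std::vector<std::string>(std::vector<int>&&)> dispatchFunc =
        [](std::vector<int>&& inputs) {
          std::vector<std::string> results;
          results.reserve(inputs.size());
          for (int value : inputs) {
            results.push_back(std::to_string(value));
          }
          return results;
        };

    auto dispatcher = folly::fibers::createAtomicBatchDispatcher(
        std::move(dispatchFunc), kCount);

    std::vector<folly::Future<std::string>> futures;
    futures.reserve(kCount);

    // Issue one token per input and move each token into its own fiber task.
    for (std::size_t i = 0; i < kCount; ++i) {
      auto token = dispatcher.getToken();
      fiberManager.addTask([&futures, token = std::move(token), i]() mutable {
        // Per-task preprocessing that might throw would go here; if it threw,
        // the token owned by this task would be destroyed along with it.
        futures.push_back(token.dispatch(static_cast<int>(i)));
      });
    }

    // No more tokens will be issued for this batch.
    dispatcher.commit();

    // Run the fibers; once every token has dispatched, the user-provided batch
    // function runs and all futures become ready.
    evb.loop();

    for (auto& future : futures) {
      if (future.hasValue()) {
        // use future.value()
      } else {
        // future.hasException() is true; handle the error here
      }
    }
    return 0;
  }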
NOTES:
Definition at line 168 of file AtomicBatchDispatcher.h.