proxygen
folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT > Class Template Reference

#include <BatchDispatcher.h>

Classes

struct  DispatchState
 

Public Types

using ValueBatchT = std::vector< ValueT >
 
using ResultBatchT = std::vector< ResultT >
 
using PromiseBatchT = std::vector< folly::Promise< ResultT >>
 
using DispatchFunctionT = folly::Function< ResultBatchT(ValueBatchT &&)>
 

Public Member Functions

 BatchDispatcher (ExecutorT &executor, DispatchFunctionT dispatchFunc)
 
Future< ResultT > add (ValueT value)
 

Static Private Member Functions

static void dispatchFunctionWrapper (DispatchState &state)
 

Private Attributes

ExecutorT & executor_
 
std::shared_ptr< DispatchStatestate_
 

Detailed Description

template<typename ValueT, typename ResultT, typename ExecutorT>
class folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >

BatchDispatcher is useful for batching values while doing I/O. For example, if you are launching multiple tasks which take a single id and each task fetches from database, you can use BatchDispatcher to batch those ids and do a single query requesting all those ids.

To use this, create a BatchDispatcher with a dispatch function which consumes a vector of values and returns a vector of results in the same order. Add values to BatchDispatcher using add function, which returns a future to the result set in your dispatch function.

Implementation Logic:

  • using FiberManager as executor example, user creates a thread_local BatchDispatcher, on which user calls add(value).
  • add(value) adds the value in a vector and also schedules a new task(BatchDispatchFunction) which will read the vector of values and call user's DispatchFunction() on it.
  • assuming the executor queues all the task and runs them in order of their creation time, then BatchDispatcher will run later than all the tasks already created. Depending on this, all the values were added in these tasks would be picked up by BatchDispatchFunction()

Example:

Note:

  • This only works with executors which runs the tasks in order of their schedule time.
  • BatchDispatcher is not thread safe.

Definition at line 72 of file BatchDispatcher.h.

Member Typedef Documentation

template<typename ValueT , typename ResultT , typename ExecutorT >
using folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >::DispatchFunctionT = folly::Function<ResultBatchT(ValueBatchT&&)>

Definition at line 77 of file BatchDispatcher.h.

template<typename ValueT , typename ResultT , typename ExecutorT >
using folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >::PromiseBatchT = std::vector<folly::Promise<ResultT>>

Definition at line 76 of file BatchDispatcher.h.

template<typename ValueT , typename ResultT , typename ExecutorT >
using folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >::ResultBatchT = std::vector<ResultT>

Definition at line 75 of file BatchDispatcher.h.

template<typename ValueT , typename ResultT , typename ExecutorT >
using folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >::ValueBatchT = std::vector<ValueT>

Definition at line 74 of file BatchDispatcher.h.

Constructor & Destructor Documentation

template<typename ValueT , typename ResultT , typename ExecutorT >
folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >::BatchDispatcher ( ExecutorT &  executor,
DispatchFunctionT  dispatchFunc 
)
inline

Definition at line 79 of file BatchDispatcher.h.

81  state_(new DispatchState(std::move(dispatchFunc))) {}
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
std::shared_ptr< DispatchState > state_

Member Function Documentation

template<typename ValueT , typename ResultT , typename ExecutorT >
Future<ResultT> folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >::add ( ValueT  value)
inline

Definition at line 83 of file BatchDispatcher.h.

References folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >::dispatchFunctionWrapper(), folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >::executor_, folly::Promise< T >::getFuture(), folly::gen::move, and folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >::state_.

Referenced by doubleBatchInnerDispatch(), doubleBatchOuterDispatch(), and singleBatchDispatch().

83  {
84  if (state_->values.empty()) {
86  }
87 
88  folly::Promise<ResultT> resultPromise;
89  auto resultFuture = resultPromise.getFuture();
90 
91  state_->values.emplace_back(std::move(value));
92  state_->promises.emplace_back(std::move(resultPromise));
93 
94  return resultFuture;
95  }
static void dispatchFunctionWrapper(DispatchState &state)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
Future< T > getFuture()
Definition: Promise-inl.h:97
std::shared_ptr< DispatchState > state_
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
state
Definition: http_parser.c:272
template<typename ValueT , typename ResultT , typename ExecutorT >
static void folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >::dispatchFunctionWrapper ( DispatchState state)
inlinestaticprivate

Definition at line 107 of file BatchDispatcher.h.

References folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >::DispatchState::dispatchFunc, i, folly::gen::move, folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >::DispatchState::promises, and folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >::DispatchState::values.

Referenced by folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >::add().

107  {
109  PromiseBatchT promises;
110  state.values.swap(values);
111  state.promises.swap(promises);
112 
113  try {
114  auto results = state.dispatchFunc(std::move(values));
115  if (results.size() != promises.size()) {
116  throw std::logic_error(
117  "Unexpected number of results returned from dispatch function");
118  }
119 
120  for (size_t i = 0; i < promises.size(); i++) {
121  promises[i].setValue(std::move(results[i]));
122  }
123  } catch (const std::exception& ex) {
124  for (size_t i = 0; i < promises.size(); i++) {
125  promises[i].setException(
126  exception_wrapper(std::current_exception(), ex));
127  }
128  } catch (...) {
129  for (size_t i = 0; i < promises.size(); i++) {
130  promises[i].setException(exception_wrapper(std::current_exception()));
131  }
132  }
133  }
std::vector< folly::Promise< ResultT >> PromiseBatchT
std::vector< ValueT > ValueBatchT
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
state
Definition: http_parser.c:272
std::vector< int > values(1'000)

Member Data Documentation

template<typename ValueT , typename ResultT , typename ExecutorT >
ExecutorT& folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >::executor_
private
template<typename ValueT , typename ResultT , typename ExecutorT >
std::shared_ptr<DispatchState> folly::fibers::BatchDispatcher< ValueT, ResultT, ExecutorT >::state_
private

The documentation for this class was generated from the following file: