19 template <
typename InputT,
typename ResultT>
22 : expectedCount_(0), dispatchFunction_(
std::
move(dispatchFunction)) {}
29 optEntries_.reserve(numEntries);
37 assert(expectedCount_ == 0 || !
"expectedCount_ being set more than once");
38 expectedCount_ = expectedCount;
39 optEntries_.resize(expectedCount_);
43 if (sequenceNumber >= optEntries_.size()) {
44 optEntries_.resize(sequenceNumber + 1);
47 assert(!optEntry || !
"Multiple inputs have the same token sequence number");
49 return optEntry->promise.getFuture();
54 for (
auto& optEntry : optEntries_) {
56 optEntry->promise.setException(exceptionWrapper);
63 return setExceptionResults(exceptionWrapper);
66 template <
typename TException>
69 std::exception_ptr eptr = std::exception_ptr()) {
70 auto exceptionWrapper =
72 return setExceptionResults(exceptionWrapper);
78 if (exceptionWrapper_) {
79 return setExceptionResults(exceptionWrapper_);
84 optEntries_.size() == expectedCount_ ||
85 !
"Entries vector did not have expected size");
86 std::vector<size_t> vecTokensNotDispatched;
87 for (
size_t i = 0;
i < expectedCount_; ++
i) {
88 if (!optEntries_[
i]) {
89 vecTokensNotDispatched.push_back(i);
92 if (!vecTokensNotDispatched.empty()) {
98 std::vector<InputT> inputs;
99 inputs.reserve(expectedCount_);
100 for (
auto& optEntry : optEntries_) {
101 inputs.emplace_back(
std::move(optEntry->input));
106 auto results = dispatchFunction_(
std::move(inputs));
107 if (results.size() != expectedCount_) {
108 return setExceptionResults(
110 expectedCount_, results.size())));
114 for (
size_t i = 0;
i < expectedCount_; ++
i) {
115 optEntries_[
i]->promise.setValue(
std::move(results[
i]));
117 }
catch (
const std::exception& ex) {
119 return setExceptionResults(ex, std::current_exception());
122 return setExceptionResults(std::current_exception());
131 : input(
std::
move(other.input)), promise(
std::
move(other.promise)) {}
148 template <
typename InputT,
typename ResultT>
150 std::shared_ptr<DispatchBaton> baton,
151 size_t sequenceNumber)
152 :
baton_(
std::
move(baton)), sequenceNumber_(sequenceNumber) {}
154 template <
typename InputT,
typename ResultT>
159 template <
typename InputT,
typename ResultT>
165 "Dispatch called more than once on the same Token object");
170 template <
typename InputT,
typename ResultT>
176 template <
typename InputT,
typename ResultT>
181 baton_->setExceptionWrapper(
182 folly::make_exception_wrapper<ABDCommitNotCalledException>());
187 template <
typename InputT,
typename ResultT>
192 baton_->reserve(numEntries);
195 template <
typename InputT,
typename ResultT>
203 template <
typename InputT,
typename ResultT>
208 "Cannot call commit() more than once on the same dispatcher");
213 template <
typename InputT,
typename ResultT>
215 folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
216 size_t initialCapacity) {
218 if (initialCapacity) {
void reserve(size_t numEntries)
Future< ResultT > getFutureResult(InputT &&input, size_t sequenceNumber)
DispatchFunctionT dispatchFunction_
void setExceptionResults(const TException &ex, std::exception_ptr eptr=std::exception_ptr())
AtomicBatchDispatcher< InputT, ResultT > createAtomicBatchDispatcher(folly::Function< std::vector< ResultT >(std::vector< InputT > &&)> dispatchFunc, size_t initialCapacity)
constexpr detail::Map< Move > move
Token(std::shared_ptr< DispatchBaton > baton, size_t sequenceNumber)
—— Concurrent Priority Queue Implementation ——
void setExceptionWrapper(folly::exception_wrapper &&exWrapper)
requires E e noexcept(noexcept(s.error(std::move(e))))
std::shared_ptr< DispatchBaton > baton_
AtomicBatchDispatcher(DispatchFunctionT &&dispatchFunc)
void setExceptionResults(std::exception_ptr eptr)
folly::exception_wrapper exceptionWrapper_
void setExpectedCount(size_t expectedCount)
std::string createABDTokenNotDispatchedExMsg(const std::vector< size_t > &vecTokensNotDispatched)
void setExceptionResults(const folly::exception_wrapper &exceptionWrapper)
DispatchBaton(DispatchFunctionT &&dispatchFunction)
Entry(Entry &&other) noexcept
std::vector< folly::Optional< Entry > > optEntries_
std::string createUnexpectedNumResultsABDUsageExMsg(size_t numExpectedResults, size_t numActualResults)
void reserve(size_t numEntries)
Future< ResultT > dispatch(InputT input)
folly::Promise< ResultT > promise
std::shared_ptr< DispatchBaton > baton_
size_t sequenceNumber() const
Entry & operator=(Entry &&other) noexcept