proxygen
AtomicBatchDispatcher-inl.h
Go to the documentation of this file.
1 /*
2  * Copyright 2016-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 namespace folly {
17 namespace fibers {
18 
19 template <typename InputT, typename ResultT>
21  DispatchBaton(DispatchFunctionT&& dispatchFunction)
22  : expectedCount_(0), dispatchFunction_(std::move(dispatchFunction)) {}
23 
25  fulfillPromises();
26  }
27 
28  void reserve(size_t numEntries) {
29  optEntries_.reserve(numEntries);
30  }
31 
33  exceptionWrapper_ = std::move(exWrapper);
34  }
35 
36  void setExpectedCount(size_t expectedCount) {
37  assert(expectedCount_ == 0 || !"expectedCount_ being set more than once");
38  expectedCount_ = expectedCount;
39  optEntries_.resize(expectedCount_);
40  }
41 
42  Future<ResultT> getFutureResult(InputT&& input, size_t sequenceNumber) {
43  if (sequenceNumber >= optEntries_.size()) {
44  optEntries_.resize(sequenceNumber + 1);
45  }
46  folly::Optional<Entry>& optEntry = optEntries_[sequenceNumber];
47  assert(!optEntry || !"Multiple inputs have the same token sequence number");
48  optEntry = Entry(std::move(input));
49  return optEntry->promise.getFuture();
50  }
51 
52  private:
53  void setExceptionResults(const folly::exception_wrapper& exceptionWrapper) {
54  for (auto& optEntry : optEntries_) {
55  if (optEntry) {
56  optEntry->promise.setException(exceptionWrapper);
57  }
58  }
59  }
60 
61  void setExceptionResults(std::exception_ptr eptr) {
62  auto exceptionWrapper = exception_wrapper(eptr);
63  return setExceptionResults(exceptionWrapper);
64  }
65 
66  template <typename TException>
68  const TException& ex,
69  std::exception_ptr eptr = std::exception_ptr()) {
70  auto exceptionWrapper =
71  eptr ? exception_wrapper(eptr, ex) : exception_wrapper(ex);
72  return setExceptionResults(exceptionWrapper);
73  }
74 
75  void fulfillPromises() {
76  try {
77  // If an error message is set, set all promises to exception with message
78  if (exceptionWrapper_) {
79  return setExceptionResults(exceptionWrapper_);
80  }
81 
82  // Validate entries count same as expectedCount_
83  assert(
84  optEntries_.size() == expectedCount_ ||
85  !"Entries vector did not have expected size");
86  std::vector<size_t> vecTokensNotDispatched;
87  for (size_t i = 0; i < expectedCount_; ++i) {
88  if (!optEntries_[i]) {
89  vecTokensNotDispatched.push_back(i);
90  }
91  }
92  if (!vecTokensNotDispatched.empty()) {
93  return setExceptionResults(ABDTokenNotDispatchedException(
94  detail::createABDTokenNotDispatchedExMsg(vecTokensNotDispatched)));
95  }
96 
97  // Create the inputs vector
98  std::vector<InputT> inputs;
99  inputs.reserve(expectedCount_);
100  for (auto& optEntry : optEntries_) {
101  inputs.emplace_back(std::move(optEntry->input));
102  }
103 
104  // Call the user provided batch dispatch function to get all results
105  // and make sure that we have the expected number of results returned
106  auto results = dispatchFunction_(std::move(inputs));
107  if (results.size() != expectedCount_) {
108  return setExceptionResults(
110  expectedCount_, results.size())));
111  }
112 
113  // Fulfill the promises with the results from the batch dispatch
114  for (size_t i = 0; i < expectedCount_; ++i) {
115  optEntries_[i]->promise.setValue(std::move(results[i]));
116  }
117  } catch (const std::exception& ex) {
118  // Set exceptions thrown when executing the user provided dispatch func
119  return setExceptionResults(ex, std::current_exception());
120  } catch (...) {
121  // Set exceptions thrown when executing the user provided dispatch func
122  return setExceptionResults(std::current_exception());
123  }
124  }
125 
126  struct Entry {
127  InputT input;
129 
131  : input(std::move(other.input)), promise(std::move(other.promise)) {}
132 
134  input = std::move(other.input);
135  promise = std::move(other.promise);
136  return *this;
137  }
138 
139  explicit Entry(InputT&& input_) : input(std::move(input_)) {}
140  };
141 
144  std::vector<folly::Optional<Entry>> optEntries_;
146 };
147 
148 template <typename InputT, typename ResultT>
150  std::shared_ptr<DispatchBaton> baton,
151  size_t sequenceNumber)
152  : baton_(std::move(baton)), sequenceNumber_(sequenceNumber) {}
153 
154 template <typename InputT, typename ResultT>
156  return sequenceNumber_;
157 }
158 
159 template <typename InputT, typename ResultT>
161  InputT input) {
162  auto baton = std::move(baton_);
163  if (!baton) {
164  throw ABDUsageException(
165  "Dispatch called more than once on the same Token object");
166  }
167  return baton->getFutureResult(std::move(input), sequenceNumber_);
168 }
169 
170 template <typename InputT, typename ResultT>
172  DispatchFunctionT&& dispatchFunc)
173  : numTokensIssued_(0),
174  baton_(std::make_shared<DispatchBaton>(std::move(dispatchFunc))) {}
175 
176 template <typename InputT, typename ResultT>
178  if (baton_) {
179  // Set error here rather than throw because we do not want to throw from
180  // the destructor of AtomicBatchDispatcher
181  baton_->setExceptionWrapper(
182  folly::make_exception_wrapper<ABDCommitNotCalledException>());
183  commit();
184  }
185 }
186 
187 template <typename InputT, typename ResultT>
189  if (!baton_) {
190  throw ABDUsageException("Cannot call reserve(....) after calling commit()");
191  }
192  baton_->reserve(numEntries);
193 }
194 
195 template <typename InputT, typename ResultT>
197  if (!baton_) {
198  throw ABDUsageException("Cannot issue more tokens after calling commit()");
199  }
200  return Token(baton_, numTokensIssued_++);
201 }
202 
203 template <typename InputT, typename ResultT>
205  auto baton = std::move(baton_);
206  if (!baton) {
207  throw ABDUsageException(
208  "Cannot call commit() more than once on the same dispatcher");
209  }
210  baton->setExpectedCount(numTokensIssued_);
211 }
212 
213 template <typename InputT, typename ResultT>
215  folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
216  size_t initialCapacity) {
217  auto abd = AtomicBatchDispatcher<InputT, ResultT>(std::move(dispatchFunc));
218  if (initialCapacity) {
219  abd.reserve(initialCapacity);
220  }
221  return abd;
222 }
223 
224 } // namespace fibers
225 } // namespace folly
Future< ResultT > getFutureResult(InputT &&input, size_t sequenceNumber)
void setExceptionResults(const TException &ex, std::exception_ptr eptr=std::exception_ptr())
AtomicBatchDispatcher< InputT, ResultT > createAtomicBatchDispatcher(folly::Function< std::vector< ResultT >(std::vector< InputT > &&)> dispatchFunc, size_t initialCapacity)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
Token(std::shared_ptr< DispatchBaton > baton, size_t sequenceNumber)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void setExceptionWrapper(folly::exception_wrapper &&exWrapper)
requires E e noexcept(noexcept(s.error(std::move(e))))
std::shared_ptr< DispatchBaton > baton_
AtomicBatchDispatcher(DispatchFunctionT &&dispatchFunc)
std::string createABDTokenNotDispatchedExMsg(const std::vector< size_t > &vecTokensNotDispatched)
void setExceptionResults(const folly::exception_wrapper &exceptionWrapper)
std::string createUnexpectedNumResultsABDUsageExMsg(size_t numExpectedResults, size_t numActualResults)