proxygen
inline_executor_flow_many Struct Reference

Public Types

using properties = mi::property_set< mi::is_sender<>, mi::is_flow<>, mi::is_fifo_sequence<>, mi::is_maybe_blocking<>, mi::is_many<>>
 

Public Member Functions

 inline_executor_flow_many ()
 
 inline_executor_flow_many (std::atomic< int > &c)
 
auto executor ()
 
template<class Out >
void submit (Out out)
 

Public Attributes

std::atomic< int > * counter
 

Detailed Description

Definition at line 213 of file PushmiBenchmarks.cpp.

Member Typedef Documentation

Constructor & Destructor Documentation

inline_executor_flow_many::inline_executor_flow_many ( )
inline

Definition at line 214 of file PushmiBenchmarks.cpp.

214 : counter(nullptr) {}
std::atomic< int > * counter
inline_executor_flow_many::inline_executor_flow_many ( std::atomic< int > &  c)
inline

Definition at line 215 of file PushmiBenchmarks.cpp.

215 : counter(&c) {}
std::atomic< int > * counter
char c

Member Function Documentation

auto inline_executor_flow_many::executor ( )
inline

Definition at line 226 of file PushmiBenchmarks.cpp.

226  {
227  return inline_time_executor{};
228  }
template<class Out >
void inline_executor_flow_many::submit ( Out  out)
inline

Definition at line 230 of file PushmiBenchmarks.cpp.

References counter, folly::data(), MAKE, folly::gen::move, folly::pushmi::__adl::noexcept(), s, folly::pushmi::set_done, folly::pushmi::set_starting, folly::pushmi::set_value, and stop().

230  {
231  // boolean cancellation
232  struct producer {
233  producer(Out out, bool s) : out(std::move(out)), stop(s) {}
234  Out out;
235  std::atomic<bool> stop;
236  };
237  auto p = std::make_shared<producer>(std::move(out), false);
238 
239  struct Data : mi::receiver<> {
240  explicit Data(std::shared_ptr<producer> p) : p(std::move(p)) {}
241  std::shared_ptr<producer> p;
242  };
243 
244  auto up = mi::MAKE(receiver)(
245  Data{p},
246  [counter = this->counter](auto& data, auto requested) {
247  if (requested < 1) {
248  return;
249  }
250  // this is re-entrant
251  while (!data.p->stop && --requested >= 0 &&
252  (!counter || --*counter >= 0)) {
254  data.p->out,
255  !!counter ? inline_executor_flow_many{*counter}
257  }
258  if (!counter || *counter == 0) {
259  ::mi::set_done(data.p->out);
260  }
261  },
262  [](auto& data, auto e) noexcept {
263  data.p->stop.store(true);
264  ::mi::set_done(data.p->out);
265  },
266  [](auto& data) {
267  data.p->stop.store(true);
268  ::mi::set_done(data.p->out);
269  });
270 
271  // pass reference for cancellation.
272  ::mi::set_starting(p->out, std::move(up));
273  }
std::atomic< int > * counter
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
requires E e noexcept(noexcept(s.error(std::move(e))))
#define MAKE(x)
Definition: pool.h:29
static void stop()
PUSHMI_INLINE_VAR constexpr __adl::set_starting_fn set_starting
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
static set< string > s
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done

Member Data Documentation

std::atomic<int>* inline_executor_flow_many::counter

Definition at line 217 of file PushmiBenchmarks.cpp.


The documentation for this struct was generated from the following file: