proxygen
ConcurrentFlowManySender Class Reference
Inheritance diagram for ConcurrentFlowManySender:
testing::Test

Protected Types

using TNT = mi::invoke_result_t< decltype(make_time), mi::time_source<> &, NT & >
 

Protected Member Functions

void reset ()
 
void join ()
 
void cancellation_test (std::chrono::system_clock::time_point at)
 
- Protected Member Functions inherited from testing::Test
 Test ()
 
virtual void SetUp ()
 
virtual void TearDown ()
 
 Test ()
 
virtual void SetUp ()
 
virtual void TearDown ()
 
 Test ()
 
virtual void SetUp ()
 
virtual void TearDown ()
 

Protected Attributes

NT ntproduce_ {mi::new_thread()}
 
mi::time_source timeproduce_ {}
 
TNT tnt_ {make_time(timeproduce_, ntproduce_)}
 
NT ntcancel_ {mi::new_thread()}
 
mi::time_source timecancel_ {}
 
TNT tcncl_ {make_time(timecancel_, ntcancel_)}
 
std::atomic< int > signals_ {0}
 
std::atomic< int > terminal_ {0}
 
std::atomic< int > cancel_ {0}
 
std::chrono::system_clock::time_point at_ {mi::now(tnt_) + 100ms}
 

Additional Inherited Members

- Public Types inherited from testing::Test
typedef internal::SetUpTestCaseFunc SetUpTestCaseFunc
 
typedef internal::TearDownTestCaseFunc TearDownTestCaseFunc
 
typedef internal::SetUpTestCaseFunc SetUpTestCaseFunc
 
typedef internal::TearDownTestCaseFunc TearDownTestCaseFunc
 
typedef internal::SetUpTestCaseFunc SetUpTestCaseFunc
 
typedef internal::TearDownTestCaseFunc TearDownTestCaseFunc
 
- Public Member Functions inherited from testing::Test
virtual ~Test ()
 
virtual ~Test ()
 
virtual ~Test ()
 
- Static Public Member Functions inherited from testing::Test
static void SetUpTestCase ()
 
static void TearDownTestCase ()
 
static bool HasFatalFailure ()
 
static bool HasNonfatalFailure ()
 
static bool HasFailure ()
 
static void RecordProperty (const std::string &key, const std::string &value)
 
static void RecordProperty (const std::string &key, int value)
 
static void SetUpTestCase ()
 
static void TearDownTestCase ()
 
static bool HasFatalFailure ()
 
static bool HasNonfatalFailure ()
 
static bool HasFailure ()
 
static void RecordProperty (const std::string &key, const std::string &value)
 
static void RecordProperty (const std::string &key, int value)
 
static void SetUpTestCase ()
 
static void TearDownTestCase ()
 
static bool HasFatalFailure ()
 
static bool HasNonfatalFailure ()
 
static bool HasFailure ()
 
static void RecordProperty (const std::string &key, const std::string &value)
 
static void RecordProperty (const std::string &key, int value)
 

Detailed Description

Definition at line 130 of file FlowManyTest.cpp.

Member Typedef Documentation

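using ConcurrentFlowManySender::TNT = mi::invoke_result_t< decltype(make_time), mi::time_source<> &, NT & >
protected

Definition at line 132 of file FlowManyTest.cpp.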

Member Function Documentation

void ConcurrentFlowManySender::cancellation_test ( std::chrono::system_clock::time_point  at)
inline protected

Definition at line 146 of file FlowManyTest.cpp.

References data, f, MAKE, folly::gen::move, folly::pushmi::__adl::noexcept(), folly::pushmi::on_done(), folly::pushmi::on_error(), folly::pushmi::on_starting(), folly::pushmi::on_value(), s, folly::pushmi::set_done, folly::pushmi::set_starting, folly::pushmi::set_value, stop(), submit, folly::pushmi::operators::submit_at, and folly::fibers::yield().

146  {
147  auto f = mi::MAKE(flow_many_sender)([&](auto out) {
148  using Out = decltype(out);
149 
150  // boolean cancellation
151  struct producer {
152  producer(Out out, TNT tnt, bool s)
153  : out(std::move(out)), tnt(std::move(tnt)), stop(s) {}
154  Out out;
155  TNT tnt;
156  std::atomic<bool> stop;
157  };
158  auto p = std::make_shared<producer>(std::move(out), tnt_, false);
159 
160  struct Data : mi::receiver<> {
161  explicit Data(std::shared_ptr<producer> p) : p(std::move(p)) {}
162  std::shared_ptr<producer> p;
163  };
164 
165  auto up = mi::MAKE(receiver)(
166  Data{p},
167  [&](auto& data, auto requested) {
168  signals_ += 1000000;
169  if (requested < 1) {
170  return;
171  }
172  // submit work to happen later
173  data.p->tnt | op::submit_at(at_, [p = data.p](auto) {
174  // check boolean to select signal
175  if (!p->stop) {
176  ::mi::set_value(p->out, 42);
177  ::mi::set_done(p->out);
178  }
179  });
180  },
181  [&](auto& data, auto) noexcept {
182  signals_ += 100000;
183  data.p->stop.store(true);
184  data.p->tnt |
185  op::submit([p = data.p](auto) { ::mi::set_done(p->out); });
186  ++cancel_;
187  },
188  [&](auto& data) {
189  signals_ += 10000;
190  data.p->stop.store(true);
191  data.p->tnt |
192  op::submit([p = data.p](auto) { ::mi::set_done(p->out); });
193  ++cancel_;
194  });
195 
196  tnt_ | op::submit([p, sup = std::move(up)](auto) mutable {
197  // pass reference for cancellation.
198  ::mi::set_starting(p->out, std::move(sup));
199  });
200  });
201  f |
202  op::submit(mi::MAKE(flow_receiver)(
203  mi::on_value([&](int) { signals_ += 100; }),
204  mi::on_error([&](auto) noexcept {
205  signals_ += 1000;
206  ++terminal_;
207  }),
208  mi::on_done([&]() {
209  signals_ += 1;
210  ++terminal_;
211  }),
212  // stop producer before it is scheduled to run
213  mi::on_starting([&, at](auto up) {
214  signals_ += 10;
215  mi::set_value(up, 1);
216  tcncl_ | op::submit_at(at, [up = std::move(up)](auto) mutable {
217  ::mi::set_done(up);
218  });
219  })));
220 
221  while (terminal_ == 0 || cancel_ == 0) {
222  std::this_thread::yield();
223  }
224  }
auto f
auto on_value(Fns...fns) -> on_value_fn< Fns... >
Definition: boosters.h:262
std::atomic< int > cancel_
std::atomic< int > terminal_
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
std::chrono::system_clock::time_point at_
auto on_error(Fns...fns) -> on_error_fn< Fns... >
Definition: boosters.h:273
auto on_starting(Fns...fns) -> on_starting_fn< Fns... >
Definition: boosters.h:296
requires E e noexcept(noexcept(s.error(std::move(e))))
static void stop()
PUSHMI_INLINE_VAR constexpr detail::submit_at_fn submit_at
Definition: submit.h:387
#define MAKE(x)
PUSHMI_INLINE_VAR constexpr __adl::set_starting_fn set_starting
std::atomic< int > signals_
auto on_done(Fn fn) -> on_done_fn< Fn >
Definition: boosters.h:285
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
static set< string > s
mi::invoke_result_t< decltype(make_time), mi::time_source<> &, NT & > TNT
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done
void ConcurrentFlowManySender::join ( )
inline protected

Definition at line 141 of file FlowManyTest.cpp.

141  {
142  timeproduce_.join();
143  timecancel_.join();
144  }
mi::time_source timeproduce_
mi::time_source timecancel_
void ConcurrentFlowManySender::reset ( )
inline protected

Definition at line 134 of file FlowManyTest.cpp.

References now().

134  {
135  at_ = mi::now(tnt_) + 100ms;
136  signals_ = 0;
137  terminal_ = 0;
138  cancel_ = 0;
139  }
std::atomic< int > cancel_
std::atomic< int > terminal_
std::chrono::steady_clock::time_point now()
std::chrono::system_clock::time_point at_
std::atomic< int > signals_

Member Data Documentation

std::chrono::system_clock::time_point ConcurrentFlowManySender::at_ {mi::now(tnt_) + 100ms}
protected

Definition at line 237 of file FlowManyTest.cpp.

std::atomic<int> ConcurrentFlowManySender::cancel_ {0}
protected

Definition at line 236 of file FlowManyTest.cpp.

NT ConcurrentFlowManySender::ntcancel_ {mi::new_thread()}
protected

Definition at line 230 of file FlowManyTest.cpp.

NT ConcurrentFlowManySender::ntproduce_ {mi::new_thread()}
protected

Definition at line 226 of file FlowManyTest.cpp.

std::atomic<int> ConcurrentFlowManySender::signals_ {0}
protected

Definition at line 234 of file FlowManyTest.cpp.

TNT ConcurrentFlowManySender::tcncl_ {make_time(timecancel_, ntcancel_)}
protected

Definition at line 232 of file FlowManyTest.cpp.

std::atomic<int> ConcurrentFlowManySender::terminal_ {0}
protected

Definition at line 235 of file FlowManyTest.cpp.

mi::time_source ConcurrentFlowManySender::timecancel_ {}
protected

Definition at line 231 of file FlowManyTest.cpp.

mi::time_source ConcurrentFlowManySender::timeproduce_ {}
protected

Definition at line 227 of file FlowManyTest.cpp.

TNT ConcurrentFlowManySender::tnt_ {make_time(timeproduce_, ntproduce_)}
protected

Definition at line 228 of file FlowManyTest.cpp.


The documentation for this class was generated from the following file: