proxygen
ConcurrentFlowSingleSender Class Reference
Inheritance diagram for ConcurrentFlowSingleSender:
testing::Test

Protected Types

using TNT = mi::invoke_result_t< decltype(make_time), mi::time_source<> &, NT & >
 

Protected Member Functions

void reset ()
 
void join ()
 
template<class MakeTokens >
void cancellation_test (std::chrono::system_clock::time_point at, MakeTokens make_tokens)
 
- Protected Member Functions inherited from testing::Test
 Test ()
 
virtual void SetUp ()
 
virtual void TearDown ()
 
 Test ()
 
virtual void SetUp ()
 
virtual void TearDown ()
 
 Test ()
 
virtual void SetUp ()
 
virtual void TearDown ()
 

Protected Attributes

NT ntproduce_ {mi::new_thread()}
 
mi::time_source timeproduce_ {}
 
TNT tnt_ {make_time(timeproduce_, ntproduce_)}
 
NT ntcancel_ {mi::new_thread()}
 
mi::time_source timecancel_ {}
 
TNT tcncl_ {make_time(timecancel_, ntcancel_)}
 
std::atomic< int > signals_ {0}
 
std::atomic< int > terminal_ {0}
 
std::atomic< int > cancel_ {0}
 
std::chrono::system_clock::time_point at_ {mi::now(tnt_) + 100ms}
 

Additional Inherited Members

- Public Types inherited from testing::Test
typedef internal::SetUpTestCaseFunc SetUpTestCaseFunc
 
typedef internal::TearDownTestCaseFunc TearDownTestCaseFunc
 
typedef internal::SetUpTestCaseFunc SetUpTestCaseFunc
 
typedef internal::TearDownTestCaseFunc TearDownTestCaseFunc
 
typedef internal::SetUpTestCaseFunc SetUpTestCaseFunc
 
typedef internal::TearDownTestCaseFunc TearDownTestCaseFunc
 
- Public Member Functions inherited from testing::Test
virtual ~Test ()
 
virtual ~Test ()
 
virtual ~Test ()
 
- Static Public Member Functions inherited from testing::Test
static void SetUpTestCase ()
 
static void TearDownTestCase ()
 
static bool HasFatalFailure ()
 
static bool HasNonfatalFailure ()
 
static bool HasFailure ()
 
static void RecordProperty (const std::string &key, const std::string &value)
 
static void RecordProperty (const std::string &key, int value)
 
static void SetUpTestCase ()
 
static void TearDownTestCase ()
 
static bool HasFatalFailure ()
 
static bool HasNonfatalFailure ()
 
static bool HasFailure ()
 
static void RecordProperty (const std::string &key, const std::string &value)
 
static void RecordProperty (const std::string &key, int value)
 
static void SetUpTestCase ()
 
static void TearDownTestCase ()
 
static bool HasFatalFailure ()
 
static bool HasNonfatalFailure ()
 
static bool HasFailure ()
 
static void RecordProperty (const std::string &key, const std::string &value)
 
static void RecordProperty (const std::string &key, int value)
 

Detailed Description

Definition at line 130 of file FlowTest.cpp.

Member Typedef Documentation

Definition at line 132 of file FlowTest.cpp.

Member Function Documentation

template<class MakeTokens >
void ConcurrentFlowSingleSender::cancellation_test ( std::chrono::system_clock::time_point  at,
MakeTokens  make_tokens 
)
inlineprotected

Definition at line 147 of file FlowTest.cpp.

References data, f, folly::pushmi::lock_both(), MAKE, folly::gen::move, folly::pushmi::__adl::noexcept(), folly::pushmi::on_done(), folly::pushmi::on_error(), folly::pushmi::on_starting(), folly::pushmi::on_value(), folly::pushmi::set_done, stop(), submit, folly::pushmi::operators::submit_at, tokens, and folly::fibers::yield().

149  {
150  auto f = mi::MAKE(flow_single_sender)([&, make_tokens](auto out) {
151  // boolean cancellation
152  bool stop = false;
153  auto set_stop = [](auto& stop) {
154  if (!!stop) {
155  *stop = true;
156  }
157  };
158  auto tokens = make_tokens(stop, set_stop);
159 
160  using Stopper = decltype(tokens.second);
161  struct Data : mi::receiver<> {
162  explicit Data(Stopper stopper) : stopper(std::move(stopper)) {}
163  Stopper stopper;
164  };
165  auto up = mi::MAKE(receiver)(
166  Data{std::move(tokens.second)},
167  [&](auto& data, auto) noexcept {
168  signals_ += 100000;
169  ++cancel_;
170  auto both = lock_both(data.stopper);
171  (*(both.first))(both.second);
172  },
173  mi::on_done([&](auto& data) {
174  signals_ += 10000;
175  ++cancel_;
176  auto both = lock_both(data.stopper);
177  (*(both.first))(both.second);
178  }));
179 
180  // make all the signals come from the same thread
181  tnt_ |
182  op::submit([&,
183  stoppee = std::move(tokens.first),
184  up_not_a_shadow_howtoeven = std::move(up),
185  out = std::move(out)](auto tnt) mutable {
186  // pass reference for cancellation.
187  ::mi::set_starting(out, std::move(up_not_a_shadow_howtoeven));
188 
189  // submit work to happen later
190  tnt |
191  op::submit_at(
192  at_,
193  [stoppee = std::move(stoppee),
194  out = std::move(out)](auto) mutable {
195  // check boolean to select signal
196  auto both = lock_both(stoppee);
197  if (!!both.first && !*(both.first)) {
198  ::mi::set_value(out, 42);
199  ::mi::set_done(out);
200  } else {
201  // cancellation is not an error
202  ::mi::set_done(out);
203  }
204  });
205  });
206  });
207 
208  f |
209  op::submit(
210  mi::on_value([&](int) { signals_ += 100; }),
211  mi::on_error([&](auto) noexcept {
212  signals_ += 1000;
213  ++terminal_;
214  }),
215  mi::on_done([&]() {
216  signals_ += 1;
217  ++terminal_;
218  }),
219  // stop producer before it is scheduled to run
220  mi::on_starting([&, at](auto up) {
221  signals_ += 10;
222  tcncl_ | op::submit_at(at, [up = std::move(up)](auto) mutable {
223  ::mi::set_done(up);
224  });
225  }));
226 
227  while (terminal_ == 0 || cancel_ == 0) {
229  }
230  }
locked_entangled_pair< T, Dual > lock_both(entangled< T, Dual > &e)
Definition: entangle.h:257
auto f
auto on_value(Fns...fns) -> on_value_fn< Fns... >
Definition: boosters.h:262
std::atomic< int > terminal_
Definition: FlowTest.cpp:241
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
auto on_error(Fns...fns) -> on_error_fn< Fns... >
Definition: boosters.h:273
auto on_starting(Fns...fns) -> on_starting_fn< Fns... >
Definition: boosters.h:296
requires E e noexcept(noexcept(s.error(std::move(e))))
#define MAKE(x)
Definition: FlowTest.cpp:42
static void stop()
PUSHMI_INLINE_VAR constexpr detail::submit_at_fn submit_at
Definition: submit.h:387
auto on_done(Fn fn) -> on_done_fn< Fn >
Definition: boosters.h:285
std::atomic< int > signals_
Definition: FlowTest.cpp:240
std::atomic< int > cancel_
Definition: FlowTest.cpp:242
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
static const char tokens[256]
Definition: http_parser.c:184
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done
void ConcurrentFlowSingleSender::join ( )
inlineprotected

Definition at line 141 of file FlowTest.cpp.

141  {
142  timeproduce_.join();
143  timecancel_.join();
144  }
mi::time_source timeproduce_
Definition: FlowTest.cpp:233
mi::time_source timecancel_
Definition: FlowTest.cpp:237
void ConcurrentFlowSingleSender::reset ( )
inlineprotected

Definition at line 134 of file FlowTest.cpp.

References now().

134  {
135  at_ = mi::now(tnt_) + 100ms;
136  signals_ = 0;
137  terminal_ = 0;
138  cancel_ = 0;
139  }
std::atomic< int > terminal_
Definition: FlowTest.cpp:241
std::chrono::steady_clock::time_point now()
std::atomic< int > signals_
Definition: FlowTest.cpp:240
std::atomic< int > cancel_
Definition: FlowTest.cpp:242
std::chrono::system_clock::time_point at_
Definition: FlowTest.cpp:243

Member Data Documentation

std::chrono::system_clock::time_point ConcurrentFlowSingleSender::at_ {mi::now(tnt_) + 100ms}
protected

Definition at line 243 of file FlowTest.cpp.

std::atomic<int> ConcurrentFlowSingleSender::cancel_ {0}
protected

Definition at line 242 of file FlowTest.cpp.

NT ConcurrentFlowSingleSender::ntcancel_ {mi::new_thread()}
protected

Definition at line 236 of file FlowTest.cpp.

NT ConcurrentFlowSingleSender::ntproduce_ {mi::new_thread()}
protected

Definition at line 232 of file FlowTest.cpp.

std::atomic<int> ConcurrentFlowSingleSender::signals_ {0}
protected

Definition at line 240 of file FlowTest.cpp.

TNT ConcurrentFlowSingleSender::tcncl_ {make_time(timecancel_, ntcancel_)}
protected

Definition at line 238 of file FlowTest.cpp.

std::atomic<int> ConcurrentFlowSingleSender::terminal_ {0}
protected

Definition at line 241 of file FlowTest.cpp.

mi::time_source ConcurrentFlowSingleSender::timecancel_ {}
protected

Definition at line 237 of file FlowTest.cpp.

mi::time_source ConcurrentFlowSingleSender::timeproduce_ {}
protected

Definition at line 233 of file FlowTest.cpp.

TNT ConcurrentFlowSingleSender::tnt_ {make_time(timeproduce_, ntproduce_)}
protected

Definition at line 234 of file FlowTest.cpp.


The documentation for this class was generated from the following file: