proxygen
flow_receiver.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
19 
20 namespace folly {
21 namespace pushmi {
22 
23 template <class PE, class PV, class E, class... VN>
25  bool done_ = false;
26  bool started_ = false;
27  union data {
28  void* pobj_ = nullptr;
29  char buffer_[sizeof(std::tuple<VN...>)]; // can hold V in-situ
30  } data_{};
31  template <class Wrapped>
32  static constexpr bool insitu() {
33  return sizeof(Wrapped) <= sizeof(data::buffer_) &&
35  }
36  struct vtable {
37  static void s_op(data&, data*) {}
38  static void s_done(data&) {}
39  static void s_error(data&, E) noexcept { std::terminate(); }
40  static void s_value(data&, VN...) {}
42  void (*op_)(data&, data*) = vtable::s_op;
43  void (*done_)(data&) = vtable::s_done;
44  void (*error_)(data&, E) noexcept = vtable::s_error;
45  void (*value_)(data&, VN...) = vtable::s_value;
47  };
48  static constexpr vtable const noop_ {};
49  vtable const* vptr_ = &noop_;
50  template <class Wrapped>
52  struct s {
53  static void op(data& src, data* dst) {
54  if (dst)
55  dst->pobj_ = std::exchange(src.pobj_, nullptr);
56  delete static_cast<Wrapped const*>(src.pobj_);
57  }
58  static void done(data& src) {
59  set_done(*static_cast<Wrapped*>(src.pobj_));
60  }
61  static void error(data& src, E e) noexcept {
62  set_error(*static_cast<Wrapped*>(src.pobj_), std::move(e));
63  }
64  static void value(data& src, VN... vn) {
65  set_value(*static_cast<Wrapped*>(src.pobj_), std::move(vn)...);
66  }
67  static void starting(data& src, any_receiver<PE, PV> up) {
68  set_starting(*static_cast<Wrapped*>(src.pobj_), std::move(up));
69  }
70  };
71  static const vtable vtbl{s::op, s::done, s::error, s::value, s::starting};
72  data_.pobj_ = new Wrapped(std::move(obj));
73  vptr_ = &vtbl;
74  }
75  template <class Wrapped>
77  struct s {
78  static void op(data& src, data* dst) {
79  if (dst)
80  new (dst->buffer_) Wrapped(
81  std::move(*static_cast<Wrapped*>((void*)src.buffer_)));
82  static_cast<Wrapped const*>((void*)src.buffer_)->~Wrapped();
83  }
84  static void done(data& src) {
85  set_done(*static_cast<Wrapped*>((void*)src.buffer_));
86  }
87  static void error(data& src, E e) noexcept {
88  set_error(*static_cast<Wrapped*>((void*)src.buffer_), std::move(e));
89  }
90  static void value(data& src, VN... vn) {
91  set_value(*static_cast<Wrapped*>((void*)src.buffer_), std::move(vn)...);
92  }
93  static void starting(data& src, any_receiver<PE, PV> up) {
94  set_starting(*static_cast<Wrapped*>((void*)src.buffer_), std::move(up));
95  }
96  };
97  static const vtable vtbl{s::op, s::done, s::error, s::value, s::starting};
98  new (data_.buffer_) Wrapped(std::move(obj));
99  vptr_ = &vtbl;
100  }
101  template <class T, class U = std::decay_t<T>>
102  using wrapped_t =
104 public:
106 
107  any_flow_receiver() = default;
109  that.vptr_->op_(that.data_, &data_);
110  std::swap(that.vptr_, vptr_);
111  }
112  PUSHMI_TEMPLATE(class Wrapped)
113  (requires FlowUpTo<wrapped_t<Wrapped>, any_receiver<PE, PV>> &&
114  ReceiveValue<wrapped_t<Wrapped>, VN...> &&
115  ReceiveError<wrapped_t<Wrapped>, E>)
116  explicit any_flow_receiver(Wrapped obj) noexcept(insitu<Wrapped>())
119  vptr_->op_(data_, nullptr);
120  }
122  this->~any_flow_receiver();
123  new ((void*)this) any_flow_receiver(std::move(that));
124  return *this;
125  }
126  void value(VN... vn) {
127  if (!started_) {std::abort();}
128  if (done_){ return; }
129  vptr_->value_(data_, std::move(vn)...);
130  }
131  void error(E e) noexcept {
132  if (!started_) {std::abort();}
133  if (done_){ return; }
134  done_ = true;
135  vptr_->error_(data_, std::move(e));
136  }
137  void done() {
138  if (!started_) {std::abort();}
139  if (done_){ return; }
140  done_ = true;
141  vptr_->done_(data_);
142  }
143 
145  if (started_) {std::abort();}
146  started_ = true;
147  vptr_->starting_(data_, std::move(up));
148  }
149 };
150 
151 // Class static definitions:
152 template <class PE, class PV, class E, class... VN>
153 constexpr typename any_flow_receiver<PE, PV, E, VN...>::vtable const
154  any_flow_receiver<PE, PV, E, VN...>::noop_;
155 
156 template <class VF, class EF, class DF, class StrtF>
157 #if __cpp_concepts
158  requires Invocable<DF&>
159 #endif
160 class flow_receiver<VF, EF, DF, StrtF> {
161  bool done_ = false;
162  bool started_ = false;
163  VF nf_;
164  EF ef_;
165  DF df_;
166  StrtF strtf_;
167 
168  public:
170 
171  static_assert(
172  !detail::is_v<VF, on_error_fn>,
173  "the first parameter is the value implementation, but on_error{} was passed");
174  static_assert(
175  !detail::is_v<EF, on_value_fn>,
176  "the second parameter is the error implementation, but on_value{} was passed");
177 
178  flow_receiver() = default;
179  constexpr explicit flow_receiver(VF nf)
180  : flow_receiver(std::move(nf), EF{}, DF{}) {}
181  constexpr explicit flow_receiver(EF ef)
182  : flow_receiver(VF{}, std::move(ef), DF{}) {}
183  constexpr explicit flow_receiver(DF df)
184  : flow_receiver(VF{}, EF{}, std::move(df)) {}
185  constexpr flow_receiver(EF ef, DF df)
186  : nf_(), ef_(std::move(ef)), df_(std::move(df)) {}
187  constexpr flow_receiver(
188  VF nf,
189  EF ef,
190  DF df = DF{},
191  StrtF strtf = StrtF{})
192  : nf_(std::move(nf)),
193  ef_(std::move(ef)),
194  df_(std::move(df)),
195  strtf_(std::move(strtf)) {}
196  PUSHMI_TEMPLATE (class V)
197  (requires Invocable<VF&, V>)
198  void value(V&& v) {
199  if (!started_) {std::abort();}
200  if (done_){ return; }
201  nf_((V&&) v);
202  }
203  PUSHMI_TEMPLATE (class E)
204  (requires Invocable<EF&, E>)
205  void error(E e) noexcept {
206  static_assert(NothrowInvocable<EF&, E>, "error function must be noexcept");
207  if (!started_) {std::abort();}
208  if (done_){ return; }
209  done_ = true;
210  ef_(std::move(e));
211  }
212  void done() {
213  if (!started_) {std::abort();}
214  if (done_){ return; }
215  done_ = true;
216  df_();
217  }
218  PUSHMI_TEMPLATE(class Up)
219  (requires Invocable<StrtF&, Up&&>)
220  void starting(Up&& up) {
221  if (started_) {std::abort();}
222  started_ = true;
223  strtf_( (Up &&) up);
224  }
225 };
226 
227 template<
228  PUSHMI_TYPE_CONSTRAINT(Receiver) Data,
229  class DVF,
230  class DEF,
231  class DDF,
232  class DStrtF>
233 #if __cpp_concepts
234  requires Invocable<DDF&, Data&>
235 #endif
236 class flow_receiver<Data, DVF, DEF, DDF, DStrtF> {
237  bool done_ = false;
238  bool started_ = false;
240  DVF nf_;
241  DEF ef_;
242  DDF df_;
243  DStrtF strtf_;
244 
245  public:
247 
248  static_assert(
249  !detail::is_v<DVF, on_error_fn>,
250  "the first parameter is the value implementation, but on_error{} was passed");
251  static_assert(
252  !detail::is_v<DEF, on_value_fn>,
253  "the second parameter is the error implementation, but on_value{} was passed");
254 
255  constexpr explicit flow_receiver(Data d)
256  : flow_receiver(std::move(d), DVF{}, DEF{}, DDF{}) {}
257  constexpr flow_receiver(Data d, DDF df)
258  : data_(std::move(d)), nf_(), ef_(), df_(df) {}
259  constexpr flow_receiver(Data d, DEF ef, DDF df = DDF{})
260  : data_(std::move(d)), nf_(), ef_(ef), df_(df) {}
261  constexpr flow_receiver(
262  Data d,
263  DVF nf,
264  DEF ef = DEF{},
265  DDF df = DDF{},
266  DStrtF strtf = DStrtF{})
267  : data_(std::move(d)),
268  nf_(nf),
269  ef_(ef),
270  df_(df),
271  strtf_(std::move(strtf)) {}
272 
273 
274  Data& data() { return data_; }
275 
276  PUSHMI_TEMPLATE (class V)
277  (requires Invocable<DVF&, Data&, V>)
278  void value(V&& v) {
279  if (!started_) {std::abort();}
280  if (done_){ return; }
281  nf_(data_, (V&&) v);
282  }
283  PUSHMI_TEMPLATE (class E)
284  (requires Invocable<DEF&, Data&, E>)
285  void error(E&& e) noexcept {
286  static_assert(
287  NothrowInvocable<DEF&, Data&, E>, "error function must be noexcept");
288  if (!started_) {std::abort();}
289  if (done_){ return; }
290  done_ = true;
291  ef_(data_, (E&&) e);
292  }
293  void done() {
294  if (!started_) {std::abort();}
295  if (done_){ return; }
296  done_ = true;
297  df_(data_);
298  }
299  PUSHMI_TEMPLATE (class Up)
300  (requires Invocable<DStrtF&, Data&, Up&&>)
301  void starting(Up&& up) {
302  if (started_) {std::abort();}
303  started_ = true;
304  strtf_(data_, (Up &&) up);
305  }
306 };
307 
308 template <>
310  : public flow_receiver<ignoreVF, abortEF, ignoreDF, ignoreStrtF> {
311 };
312 
314  template (class T)
315  concept FlowReceiverDataArg,
316  Receiver<T, is_flow<>> &&
317  not Invocable<T&>
318 );
319 
320 // TODO winnow down the number of make_flow_receiver overloads and deduction
321 // guides here, as was done for make_many.
322 
324 // make_flow_receiver
326  inline auto operator()() const {
327  return flow_receiver<>{};
328  }
329  PUSHMI_TEMPLATE (class VF)
331  lazy::True<>
333  not lazy::FlowReceiverDataArg<VF>)))
334  auto operator()(VF nf) const {
336  std::move(nf)};
337  }
338  template <class... EFN>
341  std::move(ef)};
342  }
343  template <class... DFN>
346  std::move(df)};
347  }
348  PUSHMI_TEMPLATE (class VF, class EF)
350  lazy::True<>
352  not lazy::FlowReceiverDataArg<VF> PUSHMI_AND
353  not lazy::Invocable<EF&>)))
354  auto operator()(VF nf, EF ef) const {
356  std::move(ef)};
357  }
358  PUSHMI_TEMPLATE(class EF, class DF)
360  lazy::True<> PUSHMI_AND
361  lazy::Invocable<DF&>
363  not lazy::FlowReceiverDataArg<EF>)))
364  auto operator()(EF ef, DF df) const {
366  }
367  PUSHMI_TEMPLATE (class VF, class EF, class DF)
369  lazy::Invocable<DF&>
371  not lazy::FlowReceiverDataArg<VF>)))
372  auto operator()(VF nf, EF ef, DF df) const {
374  std::move(ef), std::move(df)};
375  }
376  PUSHMI_TEMPLATE (class VF, class EF, class DF, class StrtF)
378  lazy::Invocable<DF&>
380  not lazy::FlowReceiverDataArg<VF>)))
381  auto operator()(VF nf, EF ef, DF df, StrtF strtf) const {
383  std::move(df), std::move(strtf)};
384  }
385  PUSHMI_TEMPLATE(class Data)
387  lazy::True<> PUSHMI_AND
388  lazy::FlowReceiverDataArg<Data>))
389  auto operator()(Data d) const {
391  std::move(d)};
392  }
393  PUSHMI_TEMPLATE(class Data, class DVF)
395  lazy::True<> PUSHMI_AND
396  lazy::FlowReceiverDataArg<Data>))
397  auto operator()(Data d, DVF nf) const {
399  std::move(d), std::move(nf)};
400  }
401  PUSHMI_TEMPLATE(class Data, class... DEFN)
403  lazy::FlowReceiverDataArg<Data>))
404  auto operator()(Data d, on_error_fn<DEFN...> ef) const {
406  std::move(d), std::move(ef)};
407  }
408  PUSHMI_TEMPLATE(class Data, class... DDFN)
410  lazy::FlowReceiverDataArg<Data>))
411  auto operator()(Data d, on_done_fn<DDFN...> df) const {
412  return flow_receiver<Data, passDVF, passDEF, on_done_fn<DDFN...>, passDStrtF>{
413  std::move(d), std::move(df)};
414  }
415  PUSHMI_TEMPLATE(class Data, class DVF, class DEF)
417  lazy::FlowReceiverDataArg<Data>
419  not lazy::Invocable<DEF&, Data&>)))
420  auto operator()(Data d, DVF nf, DEF ef) const {
422  }
423  PUSHMI_TEMPLATE(class Data, class DEF, class DDF)
425  lazy::FlowReceiverDataArg<Data> PUSHMI_AND
426  lazy::Invocable<DDF&, Data&>))
427  auto operator()(Data d, DEF ef, DDF df) const {
429  std::move(d), std::move(ef), std::move(df)};
430  }
431  PUSHMI_TEMPLATE(class Data, class DVF, class DEF, class DDF)
433  lazy::FlowReceiverDataArg<Data> PUSHMI_AND
434  lazy::Invocable<DDF&, Data&>))
435  auto operator()(Data d, DVF nf, DEF ef, DDF df) const {
437  std::move(nf), std::move(ef), std::move(df)};
438  }
439  PUSHMI_TEMPLATE(class Data, class DVF, class DEF, class DDF, class DStrtF)
441  lazy::FlowReceiverDataArg<Data> PUSHMI_AND
442  lazy::Invocable<DDF&, Data&>))
443  auto operator()(Data d, DVF nf, DEF ef, DDF df, DStrtF strtf) const {
445  std::move(nf), std::move(ef), std::move(df), std::move(strtf)};
446  }
447 } const make_flow_receiver {};
448 
450 // deduction guides
451 #if __cpp_deduction_guides >= 201703
453 
454 PUSHMI_TEMPLATE(class VF)
456  lazy::True<>
458  not lazy::FlowReceiverDataArg<VF>)))
459 flow_receiver(VF) ->
461 
462 template <class... EFN>
465 
466 template <class... DFN>
468  flow_receiver<ignoreVF, abortEF, on_done_fn<DFN...>, ignoreStrtF>;
469 
470 PUSHMI_TEMPLATE(class VF, class EF)
472  lazy::True<>
474  not lazy::FlowReceiverDataArg<VF> PUSHMI_AND
475  not lazy::Invocable<EF&>)))
476 flow_receiver(VF, EF) ->
478 
479 PUSHMI_TEMPLATE(class EF, class DF)
481  lazy::True<> PUSHMI_AND
482  lazy::Invocable<DF&>
484  not lazy::FlowReceiverDataArg<EF>)))
485 flow_receiver(EF, DF) ->
487 
488 PUSHMI_TEMPLATE(class VF, class EF, class DF)
490  lazy::Invocable<DF&>
492  not lazy::FlowReceiverDataArg<VF>)))
493 flow_receiver(VF, EF, DF) ->
495 
496 PUSHMI_TEMPLATE(class VF, class EF, class DF, class StrtF)
498  lazy::Invocable<DF&>
500  not lazy::FlowReceiverDataArg<VF>)))
501 flow_receiver(VF, EF, DF, StrtF) ->
503 
504 PUSHMI_TEMPLATE(class Data)
506  lazy::True<> PUSHMI_AND
507  lazy::FlowReceiverDataArg<Data>))
508 flow_receiver(Data d) ->
510 
511 PUSHMI_TEMPLATE(class Data, class DVF)
513  lazy::True<> PUSHMI_AND
514  lazy::FlowReceiverDataArg<Data>))
515 flow_receiver(Data d, DVF nf) ->
517 
518 PUSHMI_TEMPLATE(class Data, class... DEFN)
520  lazy::FlowReceiverDataArg<Data>))
523 
524 PUSHMI_TEMPLATE(class Data, class... DDFN)
526  lazy::FlowReceiverDataArg<Data>))
528  flow_receiver<Data, passDVF, passDEF, on_done_fn<DDFN...>, passDStrtF>;
529 
530 PUSHMI_TEMPLATE(class Data, class DDF)
532  lazy::True<> PUSHMI_AND
533  lazy::FlowReceiverDataArg<Data> PUSHMI_AND
534  lazy::Invocable<DDF&, Data&>))
535 flow_receiver(Data d, DDF) ->
537 
538 PUSHMI_TEMPLATE(class Data, class DVF, class DEF)
540  lazy::FlowReceiverDataArg<Data>
542  not lazy::Invocable<DEF&, Data&>)))
543 flow_receiver(Data d, DVF nf, DEF ef) ->
545 
546 PUSHMI_TEMPLATE(class Data, class DEF, class DDF)
548  lazy::FlowReceiverDataArg<Data> PUSHMI_AND
549  lazy::Invocable<DDF&, Data&>))
550 flow_receiver(Data d, DEF, DDF) ->
552 
553 PUSHMI_TEMPLATE(class Data, class DVF, class DEF, class DDF)
555  lazy::FlowReceiverDataArg<Data> PUSHMI_AND
556  lazy::Invocable<DDF&, Data&>))
557 flow_receiver(Data d, DVF nf, DEF ef, DDF df) ->
559 
560 PUSHMI_TEMPLATE(class Data, class DVF, class DEF, class DDF, class DStrtF)
562  lazy::FlowReceiverDataArg<Data> PUSHMI_AND
563  lazy::Invocable<DDF&, Data&> ))
564 flow_receiver(Data d, DVF nf, DEF ef, DDF df, DStrtF strtf) ->
566 #endif
567 
568 template<>
570 
571 
572 } // namespace pushmi
573 } // namespace folly
any_flow_receiver & operator=(any_flow_receiver &&that) noexcept
static void s_value(data &, VN...)
Definition: flow_receiver.h:40
static void s_starting(data &, any_receiver< PE, PV >)
Definition: flow_receiver.h:41
void starting(any_receiver< PE, PV > up)
requires Invocable< EF &, E > void error(E e) noexcept
requires PUSHMI_EXP(lazy::True<> PUSHMI_AND lazy::FlowReceiverDataArg< Data >)) auto operator()(Data d) const
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
requires Invocable< DVF &, Data &, V > void value(V &&v)
STL namespace.
PUSHMI_INLINE_VAR constexpr __adl::set_error_fn set_error
void(* error_)(data &, E) noexcept
Definition: flow_receiver.h:44
static constexpr vtable const noop_
Definition: flow_receiver.h:48
folly::std T
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
auto operator()(on_done_fn< DFN... > df) const
any_flow_receiver(Wrapped obj, std::false_type)
Definition: flow_receiver.h:51
property_set_insert_t< properties_t< Data >, property_set< is_receiver<>, is_flow<>>> properties
requires E e noexcept(noexcept(s.error(std::move(e))))
requires And< SemiMovable< VN >... > &&SemiMovable< E > auto error(E e)
Definition: error.h:48
typename std::enable_if_t< PropertySet< PS0 > &&PropertySet< PS1 >, detail::property_set_insert< PS0, PS1 >>::type property_set_insert_t
Definition: properties.h:153
bool_constant< true > true_type
Definition: gtest-port.h:2210
constexpr flow_receiver(Data d, DVF nf, DEF ef=DEF{}, DDF df=DDF{}, DStrtF strtf=DStrtF{})
requires PUSHMI_EXP(lazy::True<> PUSHMI_BROKEN_SUBSUMPTION(PUSHMI_ANDnot lazy::FlowReceiverDataArg< VF >))) auto operator()(VF nf) const
char buffer_[sizeof(std::tuple< VN... >)]
Definition: flow_receiver.h:29
static constexpr bool insitu()
Definition: flow_receiver.h:32
requires Invocable< StrtF &, Up && > void starting(Up &&up)
#define PUSHMI_AND
Definition: concept_def.h:424
any_flow_receiver(Wrapped obj, std::true_type) noexcept
Definition: flow_receiver.h:76
constexpr flow_receiver(VF nf, EF ef, DF df=DF{}, StrtF strtf=StrtF{})
requires FlowUpTo< wrapped_t< Wrapped >, any_receiver< PE, PV > > &&ReceiveValue< wrapped_t< Wrapped >, VN... > &&ReceiveError< wrapped_t< Wrapped >, E > any_flow_receiver(Wrapped obj) noexcept(insitu< Wrapped >())
requires Invocable< DEF &, Data &, E > void error(E &&e) noexcept
PUSHMI_CONCEPT_DEF(template(class PS) concept Cardinality, has_cardinality_v< PS >)
any_flow_receiver(any_flow_receiver &&that) noexcept
void(* starting_)(data &, any_receiver< PE, PV >)
Definition: flow_receiver.h:46
#define concept
static const char *const value
Definition: Conv.cpp:50
requires Invocable< VF &, V > void value(V &&v)
#define PUSHMI_INLINE_VAR
Definition: concept_def.h:60
union folly::pushmi::any_flow_receiver::data data_
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::make_flow_receiver_fn make_flow_receiver
#define PUSHMI_EXP(...)
Definition: concept_def.h:423
std::integral_constant< bool, B > bool_
Definition: concept_def.h:443
PUSHMI_INLINE_VAR constexpr __adl::set_starting_fn set_starting
requires requires(detail::apply_impl(std::declval< F >(), std::declval< Tuple >(), detail::tupidxs< Tuple >{}))) const expr decltype(auto) apply(F &&f
T exchange(T &obj, U &&new_value)
Definition: Utility.h:120
PUSHMI_TEMPLATE(class E=std::exception_ptr, class Wrapped)(requires Sender< detail
Definition: executor.h:102
auto operator()(on_error_fn< EFN... > ef) const
std::enable_if_t<!std::is_same< U, any_flow_receiver >::value, U > wrapped_t
decltype(pe) PE
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
static set< string > s
const
Definition: upload.py:398
static void s_op(data &, data *)
Definition: flow_receiver.h:37
bool_constant< false > false_type
Definition: gtest-port.h:2209
#define PUSHMI_TYPE_CONSTRAINT(...)
Definition: concept_def.h:420
void swap(SwapTrackingAlloc< T > &, SwapTrackingAlloc< T > &)
Definition: F14TestUtil.h:414
constexpr flow_receiver(Data d, DEF ef, DDF df=DDF{})
#define PUSHMI_BROKEN_SUBSUMPTION(...)
Definition: concept_def.h:419
requires Invocable< DStrtF &, Data &, Up && > void starting(Up &&up)
static void s_error(data &, E) noexcept
Definition: flow_receiver.h:39
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done