proxygen
flow_many_sender.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
21 
22 namespace folly {
23 namespace pushmi {
24 
25 template <class PE, class PV, class E, class... VN>
// Type-erased storage for the wrapped sender. Exactly one member is in use:
// pobj_ when the wrapped object was allocated with new, or buffer_ when the
// wrapped object was placement-constructed directly inside this union.
27  union data {
// Pointer to a heap-allocated wrapped object; nullptr when nothing is held.
28  void* pobj_ = nullptr;
// Inline byte storage, sized to std::tuple<VN...>.
29  char buffer_[sizeof(std::tuple<VN...>)]; // can hold a V in-situ
// data_ is the storage instance owned by the enclosing object.
30  } data_{};
31  template <class Wrapped>
32  static constexpr bool insitu() {
33  return sizeof(Wrapped) <= sizeof(data::buffer_) &&
35  }
36  struct vtable {
37  static void s_op(data&, data*) {}
38  static any_executor<E> s_executor(data&) { return {}; }
40  void (*op_)(data&, data*) = vtable::s_op;
42  void (*submit_)(data&, any_flow_receiver<PE, PV, E, VN...>) = vtable::s_submit;
43  };
44  static constexpr vtable const noop_ {};
45  vtable const* vptr_ = &noop_;
46  template <class Wrapped>
48  struct s {
// Move-or-destroy operation for a heap-allocated Wrapped.
//   src: storage whose pobj_ refers to the Wrapped object.
//   dst: destination storage that takes over the pointer, or nullptr to
//        simply destroy the object held by src.
49  static void op(data& src, data* dst) {
// Transfer: dst receives the pointer and src.pobj_ becomes nullptr, so the
// delete below then acts on a null pointer and frees nothing.
50  if (dst)
51  dst->pobj_ = std::exchange(src.pobj_, nullptr);
// Destroy: when no transfer happened, this releases the Wrapped object.
52  delete static_cast<Wrapped const*>(src.pobj_);
53  }
54  static any_executor<E> executor(data& src) {
55  return any_executor<E>{
56  ::folly::pushmi::executor(*static_cast<Wrapped*>(src.pobj_))};
57  }
58  static void submit(data& src, any_flow_receiver<PE, PV, E, VN...> out) {
60  *static_cast<Wrapped*>(src.pobj_), std::move(out));
61  }
62  };
63  static const vtable vtbl{s::op, s::executor, s::submit};
64  data_.pobj_ = new Wrapped(std::move(obj));
65  vptr_ = &vtbl;
66  }
67  template <class Wrapped>
70  struct s {
// Move-or-destroy operation for a Wrapped stored inline in buffer_.
//   src: storage whose buffer_ currently holds a live Wrapped.
//   dst: destination storage to move-construct into, or nullptr to only
//        destroy the object held by src.
71  static void op(data& src, data* dst) {
// Transfer: placement-new a Wrapped in dst->buffer_, move-constructed from
// the object living in src.buffer_.
72  if (dst)
73  new (dst->buffer_) Wrapped(
74  std::move(*static_cast<Wrapped*>((void*)src.buffer_)));
// The source object is always destroyed, whether or not it was moved from.
75  static_cast<Wrapped const*>((void*)src.buffer_)->~Wrapped();
76  }
77  static any_executor<E> executor(data& src) {
79  *static_cast<Wrapped*>((void*)src.buffer_))};
80  }
81  static void submit(data& src, any_flow_receiver<PE, PV, E, VN...> out) {
83  *static_cast<Wrapped*>((void*)src.buffer_), std::move(out));
84  }
85  };
86  static const vtable vtbl{s::op, s::executor, s::submit};
87  new (data_.buffer_) Wrapped(std::move(obj));
88  vptr_ = &vtbl;
89  }
90  template <class T, class U = std::decay_t<T>>
91  using wrapped_t =
93  public:
95 
96  any_flow_many_sender() = default;
99  that.vptr_->op_(that.data_, &data_);
100  std::swap(that.vptr_, vptr_);
101  }
102  PUSHMI_TEMPLATE (class Wrapped)
103  (requires FlowSender<wrapped_t<Wrapped>, is_many<>>)
104  explicit any_flow_many_sender(Wrapped obj) noexcept(insitu<Wrapped>())
107  vptr_->op_(data_, nullptr);
108  }
110  this->~any_flow_many_sender();
111  new ((void*)this) any_flow_many_sender(std::move(that));
112  return *this;
113  }
115  return vptr_->executor_(data_);
116  }
118  vptr_->submit_(data_, std::move(out));
119  }
120 };
121 
122 // Class static definitions:
// Namespace-scope definition of the static constexpr member noop_ that is
// declared (with its initializer) inside any_flow_many_sender. The class
// takes the address of noop_ as the default value of vptr_.
123 template <class PE, class PV, class E, class... VN>
124 constexpr typename any_flow_many_sender<PE, PV, E, VN...>::vtable const
125  any_flow_many_sender<PE, PV, E, VN...>::noop_;
126 
// Partial specialization of flow_many_sender built from two callables:
// SF is invoked by submit() and EXF is invoked by executor().
127 template <class SF, class EXF>
128 class flow_many_sender<SF, EXF> {
// Callable that submit() invokes with the receiver.
129  SF sf_;
// Nullary callable whose result executor() returns.
130  EXF exf_;
131 
132  public:
// Property set declaring this type a sender with the flow and many traits.
133  using properties = property_set<is_sender<>, is_flow<>, is_many<>>;
134 
// Defaulted: no member initializers are given for sf_ or exf_.
135  constexpr flow_many_sender() = default;
// Stores sf; exf_ is left default-initialized.
136  constexpr explicit flow_many_sender(SF sf)
137  : sf_(std::move(sf)) {}
// Stores both callables.
138  constexpr flow_many_sender(SF sf, EXF exf)
139  : sf_(std::move(sf)), exf_(std::move(exf)) {}
140 
// Returns the result of calling exf_ with no arguments (type deduced).
141  auto executor() { return exf_(); }
// Passes the receiver to sf_ by move. Constrained to Out types that satisfy
// FlowReceiver and for which SF& is invocable with Out.
142  PUSHMI_TEMPLATE(class Out)
143  (requires FlowReceiver<Out> && Invocable<SF&, Out>)
144  void submit(Out out) {
145  sf_(std::move(out));
146  }
147 };
148 
// Partial specialization of flow_many_sender that carries a Data object and
// passes it to both callables: DSF is invoked by submit() with (data_, out)
// and DEXF is invoked by executor() with (data_).
// NOTE(review): listing lines 151 and 156 are absent from this view; the
// data_ member used below is presumably declared on one of them - confirm
// against the real header.
149 template <PUSHMI_TYPE_CONSTRAINT(Sender<is_many<>, is_flow<>>) Data, class DSF, class DEXF>
150 class flow_many_sender<Data, DSF, DEXF> {
// Callable that submit() invokes with data_ and the receiver.
152  DSF sf_;
// Callable that executor() invokes with data_.
153  DEXF exf_;
154 
155  public:
157 
// Defaulted: no member initializers are given here.
158  constexpr flow_many_sender() = default;
// Stores data only; sf_ and exf_ are left default-initialized.
159  constexpr explicit flow_many_sender(Data data)
160  : data_(std::move(data)) {}
// Stores data and the submit callable; exf_ is left default-initialized.
161  constexpr flow_many_sender(Data data, DSF sf)
162  : data_(std::move(data)), sf_(std::move(sf)) {}
// Stores data and both callables.
163  constexpr flow_many_sender(Data data, DSF sf, DEXF exf)
164  : data_(std::move(data)), sf_(std::move(sf)), exf_(std::move(exf)) {}
165 
// Returns the result of calling exf_ with data_ (type deduced).
166  auto executor() { return exf_(data_); }
// Passes data_ (as an lvalue) and the receiver (by move) to sf_. Constrained
// to Out types satisfying FlowReceiver for which DSF& is invocable with
// (Data&, Out).
167  PUSHMI_TEMPLATE(class Out)
168  (requires PUSHMI_EXP(lazy::FlowReceiver<Out> PUSHMI_AND
169  lazy::Invocable<DSF&, Data&, Out>))
170  void submit(Out out) {
171  sf_(data_, std::move(out));
172  }
173 };
174 
175 template <>
177  : public flow_many_sender<ignoreSF, trampolineEXF> {
178 public:
179  flow_many_sender() = default;
180 };
181 
183 // make_flow_many_sender
185  inline auto operator()() const {
187  }
188  PUSHMI_TEMPLATE(class SF)
189  (requires True<> PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>))
190  auto operator()(SF sf) const {
192  }
193  PUSHMI_TEMPLATE(class SF, class EXF)
194  (requires True<> && Invocable<EXF&> PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>))
195  auto operator()(SF sf, EXF exf) const {
197  }
198  PUSHMI_TEMPLATE(class Data)
199  (requires True<> && Sender<Data, is_many<>, is_flow<>>)
200  auto operator()(Data d) const {
202  }
203  PUSHMI_TEMPLATE(class Data, class DSF)
204  (requires Sender<Data, is_many<>, is_flow<>>)
205  auto operator()(Data d, DSF sf) const {
207  }
208  PUSHMI_TEMPLATE(class Data, class DSF, class DEXF)
209  (requires Sender<Data, is_many<>, is_flow<>> && Invocable<DEXF&, Data&>)
210  auto operator()(Data d, DSF sf, DEXF exf) const {
212  }
213 } const make_flow_many_sender {};
214 
216 // deduction guides
217 #if __cpp_deduction_guides >= 201703
219 
220 PUSHMI_TEMPLATE(class SF)
221  (requires True<> PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>))
223 
224 PUSHMI_TEMPLATE(class SF, class EXF)
225  (requires True<> && Invocable<EXF&> PUSHMI_BROKEN_SUBSUMPTION(&& not Sender<SF>))
227 
228 PUSHMI_TEMPLATE(class Data)
229  (requires True<> && Sender<Data, is_many<>, is_flow<>>)
231 
232 PUSHMI_TEMPLATE(class Data, class DSF)
233  (requires Sender<Data, is_many<>, is_flow<>>)
235 
236 PUSHMI_TEMPLATE(class Data, class DSF, class DEXF)
237  (requires Sender<Data, is_many<>, is_flow<>> && Invocable<DEXF&, Data&>)
239 #endif
240 
241 template<>
244 
245 // // TODO constrain me
246 // template <class V, class E = std::exception_ptr, Sender Wrapped>
247 // auto erase_cast(Wrapped w) {
248 // return flow_many_sender<V, E>{std::move(w)};
249 // }
250 
251 } // namespace pushmi
252 } // namespace folly
constexpr flow_many_sender(Data data, DSF sf, DEXF exf)
any_executor< E >(* executor_)(data &)
std::true_type True
Definition: TypeList.h:82
void(* submit_)(data &, any_flow_receiver< PE, PV, E, VN... >)
char buffer_[sizeof(std::tuple< VN... >)]
property_set_insert_t< properties_t< Data >, property_set< is_sender<>, is_flow<>, is_many<>>> properties
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
typename std::enable_if_t< PropertySet< PS0 > &&PropertySet< PS1 >, detail::property_set_insert< PS0, PS1 >>::type property_set_insert_t
Definition: properties.h:153
static void s_submit(data &, any_flow_receiver< PE, PV, E, VN... >)
bool_constant< true > true_type
Definition: gtest-port.h:2210
static any_executor< E > s_executor(data &)
static constexpr vtable const noop_
#define PUSHMI_AND
Definition: concept_def.h:424
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::make_flow_many_sender_fn make_flow_many_sender
any_flow_many_sender & operator=(any_flow_many_sender &&that) noexcept
any_flow_many_sender(Wrapped obj, std::true_type) noexcept
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
requires FlowSender< wrapped_t< Wrapped >, is_many<> > any_flow_many_sender(Wrapped obj) noexcept(insitu< Wrapped >())
static const char *const value
Definition: Conv.cpp:50
#define PUSHMI_INLINE_VAR
Definition: concept_def.h:60
std::integral_constant< bool, B > bool_
Definition: concept_def.h:443
requires requires(detail::apply_impl(std::declval< F >(), std::declval< Tuple >(), detail::tupidxs< Tuple >{}))) const expr decltype(auto) apply(F &&f
T exchange(T &obj, U &&new_value)
Definition: Utility.h:120
PUSHMI_TEMPLATE(class E=std::exception_ptr, class Wrapped)(requires Sender< detail
Definition: executor.h:102
union folly::pushmi::any_flow_many_sender::data data_
any_flow_many_sender(Wrapped obj, std::false_type)
decltype(pe) PE
requires PUSHMI_EXP(lazy::FlowReceiver< Out > PUSHMI_AND lazy::Invocable< DSF &, Data &, Out >)) void submit(Out out)
static set< string > s
const
Definition: upload.py:398
requires FlowReceiver< Out > &&Invocable< SF &, Out > void submit(Out out)
any_flow_many_sender(any_flow_many_sender &&that) noexcept
bool_constant< false > false_type
Definition: gtest-port.h:2209
void submit(any_flow_receiver< PE, PV, E, VN... > out)
void swap(SwapTrackingAlloc< T > &, SwapTrackingAlloc< T > &)
Definition: F14TestUtil.h:414
#define PUSHMI_BROKEN_SUBSUMPTION(...)
Definition: concept_def.h:419
std::enable_if_t<!std::is_same< U, any_flow_many_sender >::value, U > wrapped_t