proxygen
submit.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
24 #include <functional>
25 
26 namespace folly {
27 namespace pushmi {
28 namespace detail {
29 namespace submit_detail {
30 
32  template(class In, class... AN)(
33  concept AutoSenderTo)(In, AN...),
34  Sender<In>&& SenderTo<In, receiver_type_t<In, AN...>>
35 );
37  template(class In, class... AN)(
38  concept AutoConstrainedSenderTo)(In, AN...),
39  ConstrainedSenderTo<In, receiver_type_t<In, AN...>>
40 );
42  template(class In, class... AN)(
43  concept AutoTimeSenderTo)(In, AN...),
44  TimeSenderTo<In, receiver_type_t<In, AN...>>
45 );
46 } // namespace submit_detail
47 
48 struct submit_fn {
49  private:
50  // TODO - only move, move-only types..
51  // if out can be copied, then submit can be called multiple
52  // times..
53  template <class... AN>
54  struct fn {
55  std::tuple<AN...> args_;
56  PUSHMI_TEMPLATE(class In)
57  (requires submit_detail::AutoSenderTo<In, AN...>)
58  In operator()(In in) {
59  auto out{
62  return in;
63  }
64  };
65 
66  public:
67  template <class... AN>
68  auto operator()(AN&&... an) const {
69  return submit_fn::fn<AN...>{std::tuple<AN...>{(AN &&) an...}};
70  }
71 };
72 
73 struct submit_at_fn {
74  private:
75  template <class TP, class... AN>
76  struct fn {
77  TP at_;
78  std::tuple<AN...> args_;
79  PUSHMI_TEMPLATE(class In)
80  (requires submit_detail::AutoTimeSenderTo<In, AN...>)
81  In operator()(In in) {
82  auto out{
85  return in;
86  }
87  };
88 
89  public:
90  PUSHMI_TEMPLATE(class TP, class... AN)
91  (requires Regular<TP>)
92  auto operator()(TP at, AN... an) const {
93  return submit_at_fn::fn<TP, AN...>{std::move(at),
94  std::tuple<AN...>{(AN &&) an...}};
95  }
96 };
97 
99  private:
100  template <class D, class... AN>
101  struct fn {
103  std::tuple<AN...> args_;
104  PUSHMI_TEMPLATE(class In)
105  (requires submit_detail::AutoTimeSenderTo<In, AN...>)
106  In operator()(In in) {
107  // TODO - only move, move-only types..
108  // if out can be copied, then submit can be called multiple
109  // times..
110  auto out{
112  auto at = ::folly::pushmi::now(in) + std::move(after_);
114  return in;
115  }
116  };
117 
118  public:
119  PUSHMI_TEMPLATE(class D, class... AN)
120  (requires Regular<D>)
121  auto operator()(D after, AN... an) const {
122  return submit_after_fn::fn<D, AN...>{std::move(after),
123  std::tuple<AN...>{(AN &&) an...}};
124  }
125 };
126 
128  private:
129  struct lock_state {
130  bool done{false};
131  std::atomic<int> nested{0};
133  std::condition_variable signaled;
134  };
135  template <class Out>
137  PUSHMI_TEMPLATE(class Exec)
138  (requires Sender<Exec>&& Executor<Exec>)
141  : state_(state), ex_(std::move(ex)) {}
143  Exec ex_;
144 
145  template <class U>
147 
148  PUSHMI_TEMPLATE(class Ex)
149  (requires Sender<Ex>&& Executor<Ex>&&
150  detail::is_v<Ex, test_for_this>)
151  static auto make(lock_state*, Ex ex) {
152  return ex;
153  }
154  PUSHMI_TEMPLATE(class Ex)
155  (requires Sender<Ex>&& Executor<Ex> &&
156  not detail::is_v<Ex, test_for_this>)
157  static auto make(lock_state* state, Ex ex) {
158  return nested_executor_impl<Ex>{state, ex};
159  }
160 
162 
163  auto executor() {
164  return make(state_, ::folly::pushmi::executor(ex_));
165  }
166 
167  PUSHMI_TEMPLATE(class... ZN)
168  (requires Constrained<Exec>)
169  auto top() {
171  }
172 
173  PUSHMI_TEMPLATE(class CV, class Out)
174  (requires Receiver<Out>&& Constrained<Exec>)
175  void submit(CV cv, Out out) {
176  ++state_->nested;
178  ex_, cv, nested_receiver_impl<Out>{state_, std::move(out)});
179  }
180 
181  PUSHMI_TEMPLATE(class Out)
182  (requires Receiver<Out> && not Constrained<Exec>)
183  void submit(Out out) {
184  ++state_->nested;
186  ex_, nested_receiver_impl<Out>{state_, std::move(out)});
187  }
188  };
189  template <class Out>
190  struct nested_receiver_impl {
192  : state_(state), out_(std::move(out)) {}
194  Out out_;
195 
197 
198  template <class V>
199  void value(V&& v) {
200  std::exception_ptr e;
201  using executor_t = remove_cvref_t<V>;
202  auto n = nested_executor_impl<executor_t>::make(state_, (V &&) v);
203  set_value(out_, any_executor_ref<>{n});
204  }
205  template <class E>
206  void error(E&& e) noexcept {
207  set_error(out_, (E &&) e);
208  if (--state_->nested == 0) {
209  state_->signaled.notify_all();
210  }
211  }
212  void done() {
213  std::exception_ptr e;
214  try {
215  set_done(out_);
216  } catch (...) {
217  e = std::current_exception();
218  }
219  if (--state_->nested == 0) {
220  state_->signaled.notify_all();
221  }
222  if (e) {
223  std::rethrow_exception(e);
224  }
225  }
226  };
228  PUSHMI_TEMPLATE(class Exec)
230  auto operator()(lock_state* state, Exec ex) const {
231  return nested_executor_impl<Exec>::make(state, std::move(ex));
232  }
233  };
234  struct on_value_impl {
236  PUSHMI_TEMPLATE(class Out, class Value)
237  (requires Executor<std::decay_t<Value>>&& ReceiveValue<
238  Out,
241  lock_state*,
242  std::decay_t<Value>>>)
243  void operator()(Out out, Value&& v) const {
244  ++state_->nested;
245  set_value(out, nested_executor_impl_fn{}(state_, (Value &&) v));
246  if (--state_->nested == 0) {
247  std::unique_lock<std::mutex> guard{state_->lock};
248  state_->signaled.notify_all();
249  }
250  }
251  PUSHMI_TEMPLATE(class Out, class... VN)
252  (requires True<>&& ReceiveValue<Out, VN...> &&
253  not(sizeof...(VN) == 1 && And<Executor<std::decay_t<VN>>...>))
254  void operator()(Out out, VN&&... vn) const {
255  set_value(out, (VN &&) vn...);
256  }
257  };
258  struct on_error_impl {
259  lock_state* state_;
260  PUSHMI_TEMPLATE(class Out, class E)
261  (requires ReceiveError<Out, E>)
262  void operator()(Out out, E e) const noexcept {
263  set_error(out, std::move(e));
264  std::unique_lock<std::mutex> guard{state_->lock};
265  state_->done = true;
266  state_->signaled.notify_all();
267  }
268  };
269  struct on_done_impl {
270  lock_state* state_;
271  PUSHMI_TEMPLATE(class Out)
272  (requires Receiver<Out>)
273  void operator()(Out out) const {
274  set_done(out);
275  std::unique_lock<std::mutex> guard{state_->lock};
276  state_->done = true;
277  state_->signaled.notify_all();
278  }
279  };
280 
281  template <class In>
282  struct receiver_impl {
283  PUSHMI_TEMPLATE(class... AN)
284  (requires Sender<In>)
285  auto operator()(
286  lock_state* state,
287  std::tuple<AN...> args) const {
288  return ::folly::pushmi::detail::receiver_from_fn<In>()(
289  std::move(args),
290  on_value_impl{state},
291  on_error_impl{state},
292  on_done_impl{state});
293  }
294  };
295  template <class In>
296  struct submit_impl {
297  PUSHMI_TEMPLATE(class Out)
298  (requires Receiver<Out>&& SenderTo<In, Out>)
299  void operator()(In& in, Out out) const {
301  }
302  };
303  // TODO - only move, move-only types..
304  // if out can be copied, then submit can be called multiple
305  // times..
306  template <class... AN>
307  struct fn {
308  std::tuple<AN...> args_;
309 
310  PUSHMI_TEMPLATE(class In)
311  (requires Sender<In>&& Invocable<
313  In&,
316  lock_state*,
317  std::tuple<AN...>&&>> &&
318  not AlwaysBlocking<In>)
319  In operator()(In in) {
320  lock_state state{};
321 
322  auto make = receiver_impl<In>{};
323  auto submit = submit_impl<In>{};
324  submit(in, make(&state, std::move(args_)));
325 
326  std::unique_lock<std::mutex> guard{state.lock};
327  state.signaled.wait(
328  guard, [&] { return state.done && state.nested.load() == 0; });
329  return in;
330  }
331  };
332 
333  public:
334  template <class... AN>
335  auto operator()(AN... an) const {
336  return blocking_submit_fn::fn<AN...>{std::tuple<AN...>{(AN &&) an...}};
337  }
338 };
339 
340 template <class T>
341 struct get_fn {
342  private:
343  struct on_value_impl {
345  template <class... TN>
346  void operator()(TN&&... tn) const {
347  *result_ = T{(TN &&) tn...};
348  }
349  };
350  struct on_error_impl {
352  template <class E>
353  void operator()(E e) const noexcept {
354  *ep_ = std::make_exception_ptr(e);
355  }
356  void operator()(std::exception_ptr ep) const noexcept {
357  *ep_ = ep;
358  }
359  };
360 
361  public:
362  // TODO constrain this better
363  PUSHMI_TEMPLATE(class In)
364  (requires Sender<In>)
365  T operator()(In in) const {
369  on_value_impl{&result_}, on_error_impl{&ep_});
370  using Out = decltype(out);
371  static_assert(
372  SenderTo<In, Out>,
373  "'In' does not deliver value compatible with 'T' to 'Out'");
374  std::conditional_t<AlwaysBlocking<In>, submit_fn, blocking_submit_fn>{}(
375  std::move(out))(std::move(in));
376  if (!!ep_) {
377  std::rethrow_exception(*ep_);
378  }
379  return std::move(*result_);
380  }
381 };
382 
383 } // namespace detail
384 
385 namespace operators {
390 template <class T>
392 } // namespace operators
393 
394 } // namespace pushmi
395 } // namespace folly
PUSHMI_CONCEPT_DEF(template(class In, class...AN)(concept AutoSenderTo)(In, AN...), Sender< In > &&SenderTo< In, receiver_type_t< In, AN... >>)
std::true_type True
Definition: TypeList.h:82
void operator()(std::exception_ptr ep) const noexcept
Definition: submit.h:356
::folly::pushmi::detail::opt< std::exception_ptr > * ep_
Definition: submit.h:351
auto v
std::remove_cv_t< std::remove_reference_t< T >> remove_cvref_t
Definition: traits.h:81
typename receiver_from_fn< In >::template receiver_type< AN... > receiver_type_t
auto operator()(AN &&...an) const
Definition: submit.h:68
std::enable_if_t< PropertySet< __properties_t< property_set_traits< T >>>, __properties_t< property_set_traits< T >>> properties_t
Definition: properties.h:105
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
PUSHMI_INLINE_VAR constexpr __adl::set_error_fn set_error
folly::std T
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
requires Sender< Ex > &&Executor< Ex > &&static detail::is_v< Ex, test_for_this > auto make(lock_state *, Ex ex)
Definition: submit.h:151
PUSHMI_INLINE_VAR constexpr detail::blocking_submit_fn blocking_submit
Definition: submit.h:389
requires Receiver< Out > &&Constrained< Exec > void submit(CV cv, Out out)
Definition: submit.h:175
#define D(name, bit)
Definition: CpuId.h:145
bool Value(const T &value, M matcher)
requires Sender< Ex > &&Executor< Ex > &&not static detail::is_v< Ex, test_for_this > auto make(lock_state *state, Ex ex)
Definition: submit.h:157
#define concept
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
requires requires(::folly::pushmi::invoke(std::declval< F >(), std::get< Is >(std::declval< Tuple >())...))) const expr decltype(auto) apply_impl(F &&f
PUSHMI_INLINE_VAR constexpr detail::submit_at_fn submit_at
Definition: submit.h:387
void operator()(TN &&...tn) const
Definition: submit.h:346
void operator()(E e) const noexcept
Definition: submit.h:353
#define PUSHMI_INLINE_VAR
Definition: concept_def.h:60
std::tuple< AN... > args_
Definition: submit.h:55
std::string & out_
Definition: json.cpp:185
PUSHMI_INLINE_VAR constexpr __adl::get_top_fn top
std::mutex mutex
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
::folly::pushmi::detail::opt< T > * result_
Definition: submit.h:344
PUSHMI_INLINE_VAR constexpr __adl::do_submit_fn submit
PUSHMI_TEMPLATE(class In, class Out, bool SenderRequires, bool SingleSenderRequires, bool TimeSingleSenderRequires)(requires Sender< In > &&Receiver< Out >) constexpr bool sender_requires_from()
decltype(folly::pushmi::invoke(std::declval< F >(), std::declval< As >()...)) invoke_result_t
Definition: functional.h:47
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::make_receiver_fn make_receiver
PUSHMI_INLINE_VAR constexpr detail::submit_after_fn submit_after
Definition: submit.h:388
requires Receiver< Out > &&not Constrained< Exec > void submit(Out out)
Definition: submit.h:183
state
Definition: http_parser.c:272
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done