proxygen
subject.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <vector>
19 
22 
23 namespace folly {
24 namespace pushmi {
25 
26 template <class... TN>
27 struct subject;
28 
29 template <class PS, class... TN>
30 struct subject<PS, TN...> {
34 
35  struct subject_shared {
36  using receiver_t = any_receiver<std::exception_ptr, TN...>;
37  bool done_ = false;
39  std::exception_ptr ep_;
40  std::vector<receiver_t> receivers_;
// Attaches receiver `out` to this shared state. While holding `lock_`, the
// recorded outcome is checked in the order error, value, done; if one is
// present, `out` is completed immediately and not stored. Otherwise `out` is
// queued in `receivers_` for a later value()/error()/done() call.
42  PUSHMI_TEMPLATE(class Out)
43  (requires Receiver<Out>)
44  void submit(Out out) {
45  std::unique_lock<std::mutex> guard(lock_);
// An error was already recorded: hand the stored exception_ptr to `out`.
46  if (ep_) {
47  set_error(out, ep_);
48  return;
49  }
// A value was already recorded: work on a copy of the stored tuple so `t_`
// stays intact for subsequent submit() calls.
50  if (!!t_) {
51  auto args = *t_;
// NOTE(review): listing lines 52-53 are missing from this rendering. The line
// below is the tail of a call that receives (out, stored values...) as one
// tuple -- presumably apply(set_value, ...); confirm against the real header.
54  std::tuple_cat(std::tuple<Out>{std::move(out)}, std::move(args)));
55  return;
56  }
// done() was already called: complete `out` without a value.
57  if (done_) {
58  set_done(out);
59  return;
60  }
// Nothing recorded yet: wrap `out` in receiver_t and keep it.
61  receivers_.push_back(receiver_t{out});
62  }
// Processes every queued receiver with the values `vn...`, then stores the
// values in `t_` and empties the queue. Runs while holding `lock_`.
63  PUSHMI_TEMPLATE(class... VN)
64  (requires And<SemiMovable<VN>...>)
65  void value(VN&&... vn) {
66  std::unique_lock<std::mutex> guard(lock_);
67  for (auto& out : receivers_) {
// NOTE(review): listing lines 68-69 are missing from this rendering. The two
// lines below are the tail of a call that receives a tuple of (receiver,
// values...) -- presumably apply(set_value, ...); confirm against the real
// header. The values go through detail::as_const rather than being forwarded
// inside the loop.
70  std::tuple<decltype(out), std::decay_t<TN>...>{
71  out, detail::as_const(vn)...});
72  }
// Only after the loop are the arguments forwarded into `t_`; submit() reads
// `t_` to serve receivers that arrive later.
73  t_ = std::make_tuple((VN &&) vn...);
74  receivers_.clear();
75  }
// Records the error `e` in `ep_`, delivers it to every queued receiver via
// set_error, and then empties the queue. Runs while holding `lock_`.
76  PUSHMI_TEMPLATE(class E)
77  (requires SemiMovable<E>)
78  void error(E e) noexcept {
79  std::unique_lock<std::mutex> guard(lock_);
// Keep a copy so that submit() can hand the same error to later receivers.
80  ep_ = e;
81  for (auto& out : receivers_) {
// Pass a copy on every iteration. Moving `e` inside the loop would leave it
// moved-from after the first receiver, so every following receiver would be
// given an empty error (a null std::exception_ptr).
82  set_error(out, e);
83  }
84  receivers_.clear();
85  }
// Marks the shared state as done, calls set_done on every queued receiver,
// and then empties the queue. Runs while holding `lock_`.
86  void done() {
87  std::unique_lock<std::mutex> guard(lock_);
// Set the flag first; submit() checks `done_` to complete later receivers.
88  done_ = true;
89  for (auto& out : receivers_) {
90  set_done(out);
91  }
92  receivers_.clear();
93  }
94  };
95 
// Receiver-side handle of the subject: each signal it is given (value, error,
// done) is passed straight on to the subject_shared instance held in `s`.
96  struct subject_receiver {
97  using properties = property_set<is_receiver<>>;
98 
// Shared state that receives the forwarded signals.
99  std::shared_ptr<subject_shared> s;
100 
// Forwards the values, preserving their value category, to s->value().
101  PUSHMI_TEMPLATE(class... VN)
102  (requires And<SemiMovable<VN>...>)
103  void value(VN&&... vn) {
104  s->value((VN &&) vn...);
105  }
// Moves the error into s->error().
106  PUSHMI_TEMPLATE(class E)
107  (requires SemiMovable<E>)
108  void error(E e) noexcept {
109  s->error(std::move(e));
110  }
// Forwards completion to s->done().
111  void done() {
112  s->done();
113  }
114  };
115 
116  std::shared_ptr<subject_shared> s = std::make_shared<subject_shared>();
117 
// Returns the executor for this subject, which is whatever trampoline()
// yields.
118  auto executor() {
119  return trampoline();
120  }
// Sender-side entry point: moves `out` into the shared state's submit(),
// which either completes it immediately or queues it.
121  PUSHMI_TEMPLATE(class Out)
122  (requires Receiver<Out>)
123  void submit(Out out) {
124  s->submit(std::move(out));
125  }
126 
// Returns a receiver for feeding this subject: a subject_receiver holding a
// copy of the shared_ptr `s`, passed through detail::receiver_from_fn<subject>.
// NOTE(review): receiver_from_fn is defined elsewhere; presumably it wraps the
// subject_receiver in the receiver type matching this subject -- confirm.
127  auto receiver() {
128  return detail::receiver_from_fn<subject>{}(subject_receiver{s});
129  }
130 };
131 
132 } // namespace pushmi
133 } // namespace folly
requires SemiMovable< E > void error(E e) noexcept
Definition: subject.h:78
detail::delegator< E > trampoline()
Definition: trampoline.h:261
std::shared_ptr< subject_shared > s
Definition: subject.h:99
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
PUSHMI_INLINE_VAR constexpr __adl::set_error_fn set_error
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
::folly::pushmi::detail::opt< std::tuple< std::decay_t< TN >... > > t_
Definition: subject.h:38
requires E e noexcept(noexcept(s.error(std::move(e))))
tuple make_tuple()
Definition: gtest-tuple.h:675
typename std::enable_if_t< PropertySet< PS0 > &&PropertySet< PS1 >, detail::property_set_insert< PS0, PS1 >>::type property_set_insert_t
Definition: properties.h:153
property_set_insert_t< property_set< is_sender<>, is_single<>>, property_set< property_set_index_t< PS, is_single<>>>> properties
Definition: subject.h:33
requires SemiMovable< E > void error(E e) noexcept
Definition: subject.h:108
requires And< SemiMovable< VN >... > void value(VN &&...vn)
Definition: subject.h:103
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
requires Receiver< Out > void submit(Out out)
Definition: subject.h:44
requires Receiver< Out > void submit(Out out)
Definition: subject.h:123
requires requires(detail::apply_impl(std::declval< F >(), std::declval< Tuple >(), detail::tupidxs< Tuple >{}))) constexpr decltype(auto) apply(F &&f
PUSHMI_TEMPLATE(class E=std::exception_ptr, class Wrapped)(requires Sender< detail
Definition: executor.h:102
std::mutex mutex
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::detail::as_const_fn as_const
requires And< SemiMovable< VN >... > void value(VN &&...vn)
Definition: subject.h:65
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
decltype(auto) constexpr apply(F &&func, Tuple &&tuple)
Definition: ApplyTuple.h:87
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done