proxygen
strand.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
20 
21 #include <queue>
22 
23 namespace folly {
24 namespace pushmi {
25 
26 template <class E, class Executor>
28 
29 template <class E, class Executor>
31 
32 template <class E>
33 class strand_item {
34  public:
36  : what(std::move(out)) {}
37 
39 };
40 template <class E, class TP>
41 bool operator<(const strand_item<E>& l, const strand_item<E>& r) {
42  return l.when < r.when;
43 }
44 template <class E, class TP>
45 bool operator>(const strand_item<E>& l, const strand_item<E>& r) {
46  return l.when > r.when;
47 }
48 template <class E, class TP>
49 bool operator==(const strand_item<E>& l, const strand_item<E>& r) {
50  return l.when == r.when;
51 }
52 template <class E, class TP>
53 bool operator!=(const strand_item<E>& l, const strand_item<E>& r) {
54  return !(l == r);
55 }
56 template <class E, class TP>
57 bool operator<=(const strand_item<E>& l, const strand_item<E>& r) {
58  return !(l > r);
59 }
60 template <class E, class TP>
61 bool operator>=(const strand_item<E>& l, const strand_item<E>& r) {
62  return !(l < r);
63 }
64 
65 template <class E>
67  : public std::enable_shared_from_this<strand_queue_base<E>> {
68  public:
70  size_t remaining_ = 0;
71  std::queue<strand_item<E>> items_;
72 
73  virtual ~strand_queue_base() {}
74 
76  // :(
77  return const_cast<strand_item<E>&>(this->items_.front());
78  }
79 
80  virtual void dispatch() = 0;
81 };
82 
83 template <class E, class Executor>
84 class strand_queue : public strand_queue_base<E> {
85  public:
87  strand_queue(Executor ex) : ex_(std::move(ex)) {}
89 
90  void dispatch() override;
91 
93  return std::static_pointer_cast<strand_queue<E, Executor>>(
94  this->shared_from_this());
95  }
96 
97  template <class Exec>
98  void value(Exec&&) {
99  //
100  // pull ready items from the queue in order.
101 
102  std::unique_lock<std::mutex> guard{this->lock_};
103 
104  // only allow one at a time
105  if (this->remaining_ > 0) {
106  return;
107  }
108  // skip when empty
109  if (this->items_.empty()) {
110  return;
111  }
112 
113  // do not allow recursive queueing to block this executor
114  this->remaining_ = this->items_.size();
115 
116  auto that = shared_from_that();
117  auto subEx = strand_executor<E, Executor>{that};
118 
119  while (!this->items_.empty() && --this->remaining_ >= 0) {
120  auto item{std::move(this->front())};
121  this->items_.pop();
122  guard.unlock();
123  set_value(item.what, any_executor_ref<E>{subEx});
124  set_done(item.what);
125  guard.lock();
126  }
127  }
128  template <class AE>
129  void error(AE e) noexcept {
130  std::unique_lock<std::mutex> guard{this->lock_};
131 
132  this->remaining_ = 0;
133 
134  while (!this->items_.empty()) {
135  auto what{std::move(this->front().what)};
136  this->items_.pop();
137  guard.unlock();
139  guard.lock();
140  }
141  }
142  void done() {
143  std::unique_lock<std::mutex> guard{this->lock_};
144 
145  // only allow one at a time
146  if (this->remaining_ > 0) {
147  return;
148  }
149  // skip when empty
150  if (this->items_.empty()) {
151  return;
152  }
153 
154  auto that = shared_from_that();
156  }
157 };
158 
159 template <class E, class Executor>
160 struct strand_queue_receiver : std::shared_ptr<strand_queue<E, Executor>> {
163  std::shared_ptr<strand_queue<E, Executor>> that)
164  : std::shared_ptr<strand_queue<E, Executor>>(that) {}
166 };
167 
168 template <class E, class Executor>
170  submit(ex_, strand_queue_receiver<E, Executor>{shared_from_that()});
171 }
172 
173 //
174 // strand is used to build a fifo single_executor from a concurrent
175 // single_executor.
176 //
177 
178 template <class E, class Executor>
179 class strand_executor {
180  std::shared_ptr<strand_queue<E, Executor>> queue_;
181 
182  public:
183  using properties = property_set<
184  is_sender<>,
189 
191  : queue_(std::move(queue)) {}
192 
193  auto executor() {
194  return *this;
195  }
196 
197  PUSHMI_TEMPLATE(class Out)
198  (requires ReceiveValue<Out, any_executor_ref<E>>&&
199  ReceiveError<Out, E>)
200  void submit(Out out) {
201  // queue for later
202  std::unique_lock<std::mutex> guard{queue_->lock_};
203  queue_->items_.push(any_receiver<E, any_executor_ref<E>>{std::move(out)});
204  if (queue_->remaining_ == 0) {
205  // noone is minding the shop, send a worker
207  queue_->ex_, strand_queue_receiver<E, Executor>{queue_});
208  }
209  }
210 };
211 
212 //
213 // the strand executor factory produces a new fifo ordered queue each time that
214 // it is called.
215 //
216 
217 template <class E, class ExecutorFactory>
219  ExecutorFactory ef_;
220 
221  public:
222  explicit strand_executor_factory_fn(ExecutorFactory ef)
223  : ef_(std::move(ef)) {}
224  auto operator()() const {
225  auto ex = ef_();
226  auto queue = std::make_shared<strand_queue<E, decltype(ex)>>(std::move(ex));
227  return strand_executor<E, decltype(ex)>{queue};
228  }
229 };
230 
231 template <class Exec>
233  Exec ex_;
234 
235  public:
236  explicit same_executor_factory_fn(Exec ex) : ex_(std::move(ex)) {}
237  auto operator()() const {
238  return ex_;
239  }
240 };
241 
242 PUSHMI_TEMPLATE(class E = std::exception_ptr, class ExecutorFactory)
243 (requires Invocable<ExecutorFactory&>&&
244  Executor<invoke_result_t<ExecutorFactory&>>&& ConcurrentSequence<
246 auto strands(ExecutorFactory ef) {
248 }
249 PUSHMI_TEMPLATE(class E = std::exception_ptr, class Exec)
250 (requires Executor<Exec>&& ConcurrentSequence<Exec>)
251 auto strands(Exec ex) {
254 }
255 
256 } // namespace pushmi
257 } // namespace folly
strand_executor(std::shared_ptr< strand_queue< E, Executor >> queue)
Definition: strand.h:190
requires Invocable< ExecutorFactory & > &&Executor< invoke_result_t< ExecutorFactory & > > &&ConcurrentSequence< invoke_result_t< ExecutorFactory & > > auto strands(ExecutorFactory ef)
Definition: strand.h:246
strand_executor_factory_fn(ExecutorFactory ef)
Definition: strand.h:222
strand_queue(Executor ex)
Definition: strand.h:87
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
strand_item< E > & front()
Definition: strand.h:75
STL namespace.
bool operator!=(const strand_item< E > &l, const strand_item< E > &r)
Definition: strand.h:53
strand_queue_receiver(std::shared_ptr< strand_queue< E, Executor >> that)
Definition: strand.h:162
PUSHMI_INLINE_VAR constexpr __adl::set_error_fn set_error
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
any_receiver< E, any_executor_ref< E > > what
Definition: strand.h:38
requires E e noexcept(noexcept(s.error(std::move(e))))
bool operator==(const strand_item< E > &l, const strand_item< E > &r)
Definition: strand.h:49
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
std::enable_if_t< PropertySet< PS > &&Property< P >, decltype(detail::__property_set_index_fn< P >(PS{}))> property_set_index_t
Definition: properties.h:148
requires ReceiveValue< Out, any_executor_ref< E > > &&ReceiveError< Out, E > void submit(Out out)
Definition: strand.h:200
strand_item(any_receiver< E, any_executor_ref< E >> out)
Definition: strand.h:35
bool operator>=(const strand_item< E > &l, const strand_item< E > &r)
Definition: strand.h:61
bool operator>(const strand_item< E > &l, const strand_item< E > &r)
Definition: strand.h:45
std::queue< strand_item< E > > items_
Definition: strand.h:71
requires requires(detail::apply_impl(std::declval< F >(), std::declval< Tuple >(), detail::tupidxs< Tuple >{}))) const expr decltype(auto) apply(F &&f
PUSHMI_TEMPLATE(class E=std::exception_ptr, class Wrapped)(requires Sender< detail
Definition: executor.h:102
std::mutex mutex
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::detail::as_const_fn as_const
void error(AE e) noexcept
Definition: strand.h:129
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
auto dispatch(Type type, F &&f) -> decltype(f(std::declval< Default >()))
Definition: Instructions.h:175
PUSHMI_INLINE_VAR constexpr __adl::do_submit_fn submit
decltype(folly::pushmi::invoke(std::declval< F >(), std::declval< As >()...)) invoke_result_t
Definition: functional.h:47
void value(Exec &&)
Definition: strand.h:98
void dispatch() override
Definition: strand.h:169
std::shared_ptr< strand_queue< E, Executor > > queue_
Definition: strand.h:180
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done