26 template <
class E,
class Executor>
29 template <
class E,
class Executor>
40 template <
class E,
class TP>
42 return l.when < r.when;
44 template <
class E,
class TP>
46 return l.when > r.when;
48 template <
class E,
class TP>
50 return l.when == r.when;
52 template <
class E,
class TP>
56 template <
class E,
class TP>
60 template <
class E,
class TP>
67 :
public std::enable_shared_from_this<strand_queue_base<E>> {
70 size_t remaining_ = 0;
83 template <
class E,
class Executor>
94 this->shared_from_this());
102 std::unique_lock<std::mutex>
guard{this->lock_};
105 if (this->remaining_ > 0) {
109 if (this->items_.empty()) {
114 this->remaining_ = this->items_.size();
116 auto that = shared_from_that();
119 while (!this->items_.empty() && --this->remaining_ >= 0) {
130 std::unique_lock<std::mutex>
guard{this->lock_};
132 this->remaining_ = 0;
134 while (!this->items_.empty()) {
143 std::unique_lock<std::mutex>
guard{this->lock_};
146 if (this->remaining_ > 0) {
150 if (this->items_.empty()) {
154 auto that = shared_from_that();
159 template <
class E,
class Executor>
168 template <
class E,
class Executor>
178 template <
class E,
class Executor>
180 std::shared_ptr<strand_queue<E, Executor>>
queue_;
198 (
requires ReceiveValue<Out, any_executor_ref<E>>&&
199 ReceiveError<Out, E>)
202 std::unique_lock<std::mutex>
guard{queue_->lock_};
204 if (queue_->remaining_ == 0) {
217 template <
class E,
class ExecutorFactory>
226 auto queue = std::make_shared<strand_queue<E, decltype(ex)>>(
std::move(ex));
231 template <
class Exec>
243 (
requires Invocable<ExecutorFactory&>&&
strand_executor(std::shared_ptr< strand_queue< E, Executor >> queue)
requires Invocable< ExecutorFactory & > &&Executor< invoke_result_t< ExecutorFactory & > > &&ConcurrentSequence< invoke_result_t< ExecutorFactory & > > auto strands(ExecutorFactory ef)
strand_executor_factory_fn(ExecutorFactory ef)
strand_queue(Executor ex)
constexpr detail::Map< Move > move
strand_item< E > & front()
bool operator!=(const strand_item< E > &l, const strand_item< E > &r)
strand_queue_receiver(std::shared_ptr< strand_queue< E, Executor >> that)
PUSHMI_INLINE_VAR constexpr __adl::set_error_fn set_error
—— Concurrent Priority Queue Implementation ——
any_receiver< E, any_executor_ref< E > > what
requires E e noexcept(noexcept(s.error(std::move(e))))
bool operator==(const strand_item< E > &l, const strand_item< E > &r)
virtual ~strand_queue_base()
GuardImpl guard(ErrorHandler &&handler)
std::enable_if_t< PropertySet< PS > &&Property< P >, decltype(detail::__property_set_index_fn< P >(PS{}))> property_set_index_t
requires ReceiveValue< Out, any_executor_ref< E > > &&ReceiveError< Out, E > void submit(Out out)
strand_item(any_receiver< E, any_executor_ref< E >> out)
bool operator>=(const strand_item< E > &l, const strand_item< E > &r)
same_executor_factory_fn(Exec ex)
bool operator>(const strand_item< E > &l, const strand_item< E > &r)
std::queue< strand_item< E > > items_
requires requires(detail::apply_impl(std::declval< F >(), std::declval< Tuple >(), detail::tupidxs< Tuple >{}))) const expr decltype(auto) apply(F &&f
PUSHMI_TEMPLATE(class E=std::exception_ptr, class Wrapped)(requires Sender< detail
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::detail::as_const_fn as_const
void error(AE e) noexcept
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
auto dispatch(Type type, F &&f) -> decltype(f(std::declval< Default >()))
PUSHMI_INLINE_VAR constexpr __adl::do_submit_fn submit
decltype(folly::pushmi::invoke(std::declval< F >(), std::declval< As >()...)) invoke_result_t
std::shared_ptr< strand_queue< E, Executor > > queue_
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done