30 template <
class E,
class TP>
33 template <
class E,
class TP,
class NF,
class Executor>
36 template <
class E,
class TP>
49 template <
class E,
class TP>
51 return l.
when < r.when;
53 template <
class E,
class TP>
57 template <
class E,
class TP>
63 template <
class E,
class TP>
69 template <
class E,
class TP>
75 template <
class E,
class TP>
82 template <
class E,
class TP>
84 :
public std::enable_shared_from_this<time_source_queue_base<E, TP>> {
87 bool dispatching_ =
false;
88 bool pending_ =
false;
91 std::vector<time_heap_item<E, TP>>,
97 time_heap_item<E, TP>&
top() {
99 return const_cast<time_heap_item<E, TP>&
>(this->heap_.top());
105 template <
class E,
class TP,
class NF,
class Executor>
115 std::weak_ptr<time_source_shared<E, time_point>>
source_;
123 this->shared_from_this());
126 template <
class Exec>
128 auto s = source_.lock();
130 if (
s->t_.get_id() == std::this_thread::get_id()) {
140 auto start = nf_() + std::chrono::milliseconds(50);
142 std::unique_lock<std::mutex>
guard{
s->lock_};
144 if (!this->dispatching_ || this->pending_) {
148 if (this->heap_.empty()) {
151 auto that = shared_from_that();
153 while (!this->heap_.empty() && this->heap_.top().when <=
start) {
157 std::this_thread::sleep_until(item.when);
165 if (this->heap_.empty()) {
168 s->wake_.notify_one();
171 while (!this->heap_.empty()) {
188 auto s = source_.lock();
189 std::unique_lock<std::mutex>
guard{
s->lock_};
191 if (!this->dispatching_ || this->pending_) {
195 while (!this->heap_.empty()) {
203 this->dispatching_ =
false;
206 auto s = source_.lock();
207 std::unique_lock<std::mutex>
guard{
s->lock_};
209 if (!this->dispatching_ || this->pending_) {
212 this->dispatching_ =
false;
215 s->pending_.push_back(this->shared_from_this());
216 this->pending_ =
true;
217 if (this->heap_.top().when <=
s->earliest_) {
220 s->wake_.notify_one();
225 template <
class E,
class TP,
class NF,
class Executor>
227 : std::shared_ptr<time_source_queue<E, TP, NF, Executor>> {
232 source_(that->source_.
lock()) {}
234 std::shared_ptr<time_source_shared<E, TP>>
source_;
237 template <
class E,
class TP,
class NF,
class Executor>
243 template <
class E,
class TP>
247 return !q->heap_.empty();
251 template <
class E,
class TP>
260 !q->dispatching_ && !q->heap_.empty() && q->heap_.top().when <= *
start_;
261 q->dispatching_ = ready;
262 q->pending_ = !ready && !q->heap_.empty();
265 *earliest_ = !ready && !q->heap_.empty()
266 ?
min(*earliest_, q->heap_.top().when)
272 template <
class E,
class TP>
274 :
public std::enable_shared_from_this<time_source_shared_base<E, TP>> {
286 std::deque<std::shared_ptr<time_source_queue_base<E, TP>>>
pending_;
296 template <
class E,
class TP>
302 std::vector<std::shared_ptr<time_source_queue_base<E, TP>>>
ready_;
307 if (t_.joinable() || this->items_ != 0) {
317 std::unique_lock<std::mutex>
guard{that->lock_};
320 that->wake_.notify_one();
327 std::unique_lock<std::mutex>
guard{that->lock_};
330 while (!that->done_ || that->items_ > 0) {
332 that->wake_.wait_until(
guard, that->earliest_, [&]() {
333 return that->dirty_ != 0 ||
334 std::chrono::system_clock::now() >= that->earliest_;
342 auto earliest =
start + std::chrono::hours(24);
345 auto process_begin = std::partition(
346 that->pending_.begin(), that->pending_.end(), process);
347 that->earliest_ = earliest;
354 that->pending_.end(),
355 std::back_inserter(that->ready_),
359 that->pending_.erase(process_begin, that->pending_.end());
368 for (
auto& q : that->ready_) {
372 that->ready_.clear();
374 that->joined_ =
true;
382 std::unique_lock<std::mutex>
guard{that->lock_};
385 that->error_ = std::current_exception();
386 for (
auto& q : that->pending_) {
387 while (!q->heap_.empty()) {
406 std::unique_lock<std::mutex>
guard{this->lock_};
409 if (!!this->error_) {
415 if (!!this->joined_) {
422 if (!queue->dispatching_ && !queue->pending_) {
424 this->pending_.push_back(queue);
425 queue->pending_ =
true;
428 if (queue->heap_.top().when < this->earliest_) {
431 this->wake_.notify_one();
440 template <
class E,
class TP,
class NF,
class Executor>
443 std::shared_ptr<time_source_shared<E, time_point>>
source_;
444 std::shared_ptr<time_source_queue<E, time_point, NF, Executor>>
queue_;
460 return queue_->nf_();
467 (
requires Regular<TPA>&& ReceiveValue<Out, any_time_executor_ref<E, TP>>&&
468 ReceiveError<Out, E>)
483 template <
class E,
class TP,
class NF,
class ExecutorFactory>
486 std::shared_ptr<time_source_shared<E, time_point>>
source_;
499 std::make_shared<time_source_queue<E, time_point, NF, decltype(ex)>>(
525 class E = std::exception_ptr,
526 class TP = std::chrono::system_clock::time_point>
532 std::shared_ptr<time_source_shared<E, time_point>>
source_;
537 source_->start(source_);
541 (
requires Invocable<ExecutorFactory&>&&
545 auto make(NF nf, ExecutorFactory ef) {
551 source_->join(source_);
std::decay_t< TP > time_point
std::shared_ptr< time_source_shared< E, time_point > > source_
time_source_executor(std::shared_ptr< time_source_shared< E, time_point >> source, std::shared_ptr< time_source_queue< E, time_point, NF, Executor >> queue)
void insert(std::shared_ptr< time_source_queue_base< E, TP >> queue, time_heap_item< E, TP > item)
std::shared_ptr< time_source_shared< E, time_point > > source_
std::vector< std::shared_ptr< time_source_queue_base< E, TP > > > ready_
std::decay_t< TP > time_point
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
std::decay_t< TP > time_point
std::decay_t< TP > time_point
bool operator!=(const strand_item< E > &l, const strand_item< E > &r)
PUSHMI_INLINE_VAR constexpr __adl::set_error_fn set_error
time_source_executor_factory_fn(std::shared_ptr< time_source_shared< E, time_point >> source, NF nf, ExecutorFactory ef)
bool operator()(std::shared_ptr< time_source_queue_base< E, TP >> &q)
—— Concurrent Priority Queue Implementation ——
any_receiver< E, any_time_executor_ref< E, TP > > what
requires E e noexcept(noexcept(s.error(std::move(e))))
std::decay_t< time_point > time_point
std::chrono::system_clock::time_point earliest_
~time_source_queue_receiver()
bool operator==(const strand_item< E > &l, const strand_item< E > &r)
std::shared_ptr< time_source_shared< E, time_point > > source_
requires Regular< TPA > &&ReceiveValue< Out, any_time_executor_ref< E, TP > > &&ReceiveError< Out, E > void submit(TPA tp, Out out)
std::deque< std::shared_ptr< time_source_queue_base< E, TP > > > pending_
std::shared_ptr< time_source_shared< E, TP > > source_
GuardImpl guard(ErrorHandler &&handler)
auto lock(Synchronized< D, M > &synchronized, Args &&...args)
std::condition_variable wake_
bool operator>=(const strand_item< E > &l, const strand_item< E > &r)
static void start(std::shared_ptr< time_source_shared< E, TP >> that)
std::decay_t< TP > time_point
bool operator()(const std::shared_ptr< time_source_queue_base< E, TP >> &q)
std::shared_ptr< time_source_queue< E, time_point, NF, Executor > > queue_
bool operator>(const strand_item< E > &l, const strand_item< E > &r)
requires requires(detail::apply_impl(std::declval< F >(), std::declval< Tuple >(), detail::tupidxs< Tuple >{}))) const expr decltype(auto) apply(F &&f
PUSHMI_INLINE_VAR constexpr __adl::get_top_fn now
PUSHMI_TEMPLATE(class E=std::exception_ptr, class Wrapped)(requires Sender< detail
time_source_queue(std::weak_ptr< time_source_shared< E, time_point >> source, NF nf, Executor ex)
std::priority_queue< time_heap_item< E, TP >, std::vector< time_heap_item< E, TP > >, std::greater<> > heap_
PUSHMI_INLINE_VAR constexpr __adl::get_top_fn top
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::detail::as_const_fn as_const
std::weak_ptr< time_source_shared< E, time_point > > source_
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
auto dispatch(Type type, F &&f) -> decltype(f(std::declval< Default >()))
const time_point * start_
time_source_queue_receiver(std::shared_ptr< time_source_queue< E, TP, NF, Executor >> that)
time_heap_item< E, TP > & top()
PUSHMI_INLINE_VAR constexpr __adl::do_submit_fn submit
decltype(folly::pushmi::invoke(std::declval< F >(), std::declval< As >()...)) invoke_result_t
virtual ~time_source_queue_base()
void error(AE e) noexcept
time_source_shared_base()
bool operator<=(const strand_item< E > &l, const strand_item< E > &r)
static void join(std::shared_ptr< time_source_shared< E, TP >> that)
static void worker(std::shared_ptr< time_source_shared< E, TP >> that)
requires Invocable< ExecutorFactory & > &&Executor< invoke_result_t< ExecutorFactory & > > &&NeverBlocking< invoke_result_t< ExecutorFactory & > > auto make(NF nf, ExecutorFactory ef)
std::decay_t< TP > time_point
time_heap_item(time_point at, any_receiver< E, any_time_executor_ref< E, TP >> out)
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done