proxygen
time_source.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
20 
21 #include <queue>
22 
23 //
24 // time_source is used to build a time_single_executor from a single_executor.
25 //
26 
27 namespace folly {
28 namespace pushmi {
29 
30 template <class E, class TP>
32 
33 template <class E, class TP, class NF, class Executor>
35 
36 template <class E, class TP>
38  public:
39  using time_point = std::decay_t<TP>;
40 
42  time_point at,
44  : when(std::move(at)), what(std::move(out)) {}
45 
48 };
49 template <class E, class TP>
50 bool operator<(const time_heap_item<E, TP>& l, const time_heap_item<E, TP>& r) {
51  return l.when < r.when;
52 }
53 template <class E, class TP>
55  return l.when > r.when;
56 }
57 template <class E, class TP>
59  const time_heap_item<E, TP>& l,
60  const time_heap_item<E, TP>& r) {
61  return l.when == r.when;
62 }
63 template <class E, class TP>
65  const time_heap_item<E, TP>& l,
66  const time_heap_item<E, TP>& r) {
67  return !(l == r);
68 }
69 template <class E, class TP>
71  const time_heap_item<E, TP>& l,
72  const time_heap_item<E, TP>& r) {
73  return !(l > r);
74 }
75 template <class E, class TP>
77  const time_heap_item<E, TP>& l,
78  const time_heap_item<E, TP>& r) {
79  return !(l < r);
80 }
81 
82 template <class E, class TP>
84  : public std::enable_shared_from_this<time_source_queue_base<E, TP>> {
85  public:
86  using time_point = std::decay_t<TP>;
87  bool dispatching_ = false;
88  bool pending_ = false;
89  std::priority_queue<
91  std::vector<time_heap_item<E, TP>>,
92  std::greater<>>
94 
96 
97  time_heap_item<E, TP>& top() {
98  // :(
99  return const_cast<time_heap_item<E, TP>&>(this->heap_.top());
100  }
101 
102  virtual void dispatch() = 0;
103 };
104 
105 template <class E, class TP, class NF, class Executor>
107  public:
108  using time_point = std::decay_t<TP>;
111  std::weak_ptr<time_source_shared<E, time_point>> source,
112  NF nf,
113  Executor ex)
114  : source_(std::move(source)), nf_(std::move(nf)), ex_(std::move(ex)) {}
115  std::weak_ptr<time_source_shared<E, time_point>> source_;
116  NF nf_;
118 
119  void dispatch() override;
120 
122  return std::static_pointer_cast<time_source_queue<E, TP, NF, Executor>>(
123  this->shared_from_this());
124  }
125 
126  template <class Exec>
127  void value(Exec&&) {
128  auto s = source_.lock();
129 
130  if (s->t_.get_id() == std::this_thread::get_id()) {
131  // Executor is not allowed to use the time_source thread
132  std::abort();
133  }
134 
135  //
136  // pull ready items from the heap in order.
137 
138  // drain anything queued within the next 50ms before
139  // going back to the pending queue.
140  auto start = nf_() + std::chrono::milliseconds(50);
141 
142  std::unique_lock<std::mutex> guard{s->lock_};
143 
144  if (!this->dispatching_ || this->pending_) {
145  std::abort();
146  }
147 
148  if (this->heap_.empty()) {
149  return;
150  }
151  auto that = shared_from_that();
152  auto subEx = time_source_executor<E, TP, NF, Executor>{s, that};
153  while (!this->heap_.empty() && this->heap_.top().when <= start) {
154  auto item{std::move(this->top())};
155  this->heap_.pop();
156  guard.unlock();
157  std::this_thread::sleep_until(item.when);
158  set_value(item.what, any_time_executor_ref<E, TP>{subEx});
159  set_done(item.what);
160  guard.lock();
161  // allows set_value to queue nested items
162  --s->items_;
163  }
164 
165  if (this->heap_.empty()) {
166  // if this is empty, tell worker to check for the done condition.
167  ++s->dirty_;
168  s->wake_.notify_one();
169  } else {
170  if (!!s->error_) {
171  while (!this->heap_.empty()) {
172  try {
173  auto what{std::move(this->top().what)};
174  this->heap_.pop();
175  --s->items_;
176  guard.unlock();
177  set_error(what, *s->error_);
178  guard.lock();
179  } catch (...) {
180  // we already have an error, ignore this one.
181  }
182  }
183  }
184  }
185  }
186  template <class AE>
187  void error(AE e) noexcept {
188  auto s = source_.lock();
189  std::unique_lock<std::mutex> guard{s->lock_};
190 
191  if (!this->dispatching_ || this->pending_) {
192  std::abort();
193  }
194 
195  while (!this->heap_.empty()) {
196  auto what{std::move(this->top().what)};
197  this->heap_.pop();
198  --s->items_;
199  guard.unlock();
201  guard.lock();
202  }
203  this->dispatching_ = false;
204  }
205  void done() {
206  auto s = source_.lock();
207  std::unique_lock<std::mutex> guard{s->lock_};
208 
209  if (!this->dispatching_ || this->pending_) {
210  std::abort();
211  }
212  this->dispatching_ = false;
213 
214  // add back to pending_ to get the remaining items dispatched
215  s->pending_.push_back(this->shared_from_this());
216  this->pending_ = true;
217  if (this->heap_.top().when <= s->earliest_) {
218  // this is the earliest, tell worker to reset earliest_
219  ++s->dirty_;
220  s->wake_.notify_one();
221  }
222  }
223 };
224 
225 template <class E, class TP, class NF, class Executor>
227  : std::shared_ptr<time_source_queue<E, TP, NF, Executor>> {
230  std::shared_ptr<time_source_queue<E, TP, NF, Executor>> that)
231  : std::shared_ptr<time_source_queue<E, TP, NF, Executor>>(that),
232  source_(that->source_.lock()) {}
234  std::shared_ptr<time_source_shared<E, TP>> source_;
235 };
236 
237 template <class E, class TP, class NF, class Executor>
239  submit(
240  ex_, time_source_queue_receiver<E, TP, NF, Executor>{shared_from_that()});
241 }
242 
243 template <class E, class TP>
245  public:
246  bool operator()(std::shared_ptr<time_source_queue_base<E, TP>>& q) {
247  return !q->heap_.empty();
248  }
249 };
250 
251 template <class E, class TP>
253  public:
254  using time_point = std::decay_t<TP>;
257  bool operator()(const std::shared_ptr<time_source_queue_base<E, TP>>& q) {
258  // ready for dispatch if it has a ready item
259  bool ready =
260  !q->dispatching_ && !q->heap_.empty() && q->heap_.top().when <= *start_;
261  q->dispatching_ = ready;
262  q->pending_ = !ready && !q->heap_.empty();
263  // ready queues are ignored, they will update earliest_ after they have
264  // processed the ready items
265  *earliest_ = !ready && !q->heap_.empty()
266  ? min(*earliest_, q->heap_.top().when)
267  : *earliest_;
268  return q->pending_;
269  }
270 };
271 
272 template <class E, class TP>
274  : public std::enable_shared_from_this<time_source_shared_base<E, TP>> {
275  public:
276  using time_point = std::decay_t<TP>;
278  std::condition_variable wake_;
279  std::thread t_;
280  std::chrono::system_clock::time_point earliest_;
281  bool done_;
282  bool joined_;
283  int dirty_;
284  int items_;
286  std::deque<std::shared_ptr<time_source_queue_base<E, TP>>> pending_;
287 
289  : earliest_(std::chrono::system_clock::now() + std::chrono::hours(24)),
290  done_(false),
291  joined_(false),
292  dirty_(0),
293  items_(0) {}
294 };
295 
296 template <class E, class TP>
297 class time_source_shared : public time_source_shared_base<E, TP> {
298  public:
299  std::thread t_;
300  // this is safe to reuse as long as there is only one thread in the
301  // time_source_shared
302  std::vector<std::shared_ptr<time_source_queue_base<E, TP>>> ready_;
303 
305  // not allowed to be discarded without joining and completing all queued
306  // items
307  if (t_.joinable() || this->items_ != 0) {
308  std::abort();
309  }
310  }
312 
313  static void start(std::shared_ptr<time_source_shared<E, TP>> that) {
314  that->t_ = std::thread{&time_source_shared<E, TP>::worker, that};
315  }
316  static void join(std::shared_ptr<time_source_shared<E, TP>> that) {
317  std::unique_lock<std::mutex> guard{that->lock_};
318  that->done_ = true;
319  ++that->dirty_;
320  that->wake_.notify_one();
321  guard.unlock();
322  that->t_.join();
323  }
324 
325  static void worker(std::shared_ptr<time_source_shared<E, TP>> that) {
326  try {
327  std::unique_lock<std::mutex> guard{that->lock_};
328 
329  // once done_, keep going until empty
330  while (!that->done_ || that->items_ > 0) {
331  // wait for something to do
332  that->wake_.wait_until(guard, that->earliest_, [&]() {
333  return that->dirty_ != 0 ||
334  std::chrono::system_clock::now() >= that->earliest_;
335  });
336  that->dirty_ = 0;
337 
338  //
339  // select ready and empty queues and reset earliest_
340 
342  auto earliest = start + std::chrono::hours(24);
343  auto process = time_item_process_pred_fn<E, TP>{&start, &earliest};
344 
345  auto process_begin = std::partition(
346  that->pending_.begin(), that->pending_.end(), process);
347  that->earliest_ = earliest;
348 
349  // copy out the queues that have ready items so that the lock
350  // is not held during dispatch
351 
352  std::copy_if(
353  process_begin,
354  that->pending_.end(),
355  std::back_inserter(that->ready_),
357 
358  // remove processed queues from pending queue.
359  that->pending_.erase(process_begin, that->pending_.end());
360 
361  // printf("d %lu, %lu, %d, %ld\n", that->pending_.size(),
362  // that->ready_.size(), that->items_,
363  // std::chrono::duration_cast<std::chrono::milliseconds>(earliest -
364  // start).count());
365 
366  // dispatch to queues with ready items
367  guard.unlock();
368  for (auto& q : that->ready_) {
369  q->dispatch();
370  }
371  guard.lock();
372  that->ready_.clear();
373  }
374  that->joined_ = true;
375  } catch (...) {
376  //
377  // block any more items from being enqueued, all new items will be sent
378  // this error on the same context that calls submit
379  //
380  // also dispatch errors to all items already in the queues from the
381  // time thread
382  std::unique_lock<std::mutex> guard{that->lock_};
383  // creates a dependency that std::exception_ptr must be ConvertibleTo E
384  // TODO: break this dependency rather than enforce it with concepts
385  that->error_ = std::current_exception();
386  for (auto& q : that->pending_) {
387  while (!q->heap_.empty()) {
388  try {
389  auto what{std::move(q->top().what)};
390  q->heap_.pop();
391  --that->items_;
392  guard.unlock();
393  set_error(what, *that->error_);
394  guard.lock();
395  } catch (...) {
396  // we already have an error, ignore this one.
397  }
398  }
399  }
400  }
401  }
402 
403  void insert(
404  std::shared_ptr<time_source_queue_base<E, TP>> queue,
405  time_heap_item<E, TP> item) {
406  std::unique_lock<std::mutex> guard{this->lock_};
407 
408  // deliver error_ and return
409  if (!!this->error_) {
410  set_error(item.what, *this->error_);
411  return;
412  }
413  // once join() is called, new work queued to the executor is not safe unless
414  // it is nested in an existing item.
415  if (!!this->joined_) {
416  std::abort();
417  };
418 
419  queue->heap_.push(std::move(item));
420  ++this->items_;
421 
422  if (!queue->dispatching_ && !queue->pending_) {
423  // add queue to pending pending_ list if it is not already there
424  this->pending_.push_back(queue);
425  queue->pending_ = true;
426  }
427 
428  if (queue->heap_.top().when < this->earliest_) {
429  // this is the earliest, tell worker to reset earliest_
430  ++this->dirty_;
431  this->wake_.notify_one();
432  }
433  }
434 };
435 
436 //
437 // the time executor will queue the work to the time ordered heap.
438 //
439 
440 template <class E, class TP, class NF, class Executor>
441 class time_source_executor {
442  using time_point = std::decay_t<TP>;
443  std::shared_ptr<time_source_shared<E, time_point>> source_;
444  std::shared_ptr<time_source_queue<E, time_point, NF, Executor>> queue_;
445 
446  public:
447  using properties = property_set<
448  is_time<>,
453 
455  std::shared_ptr<time_source_shared<E, time_point>> source,
456  std::shared_ptr<time_source_queue<E, time_point, NF, Executor>> queue)
457  : source_(std::move(source)), queue_(std::move(queue)) {}
458 
459  auto top() {
460  return queue_->nf_();
461  }
462  auto executor() {
463  return *this;
464  }
465 
466  PUSHMI_TEMPLATE(class TPA, class Out)
467  (requires Regular<TPA>&& ReceiveValue<Out, any_time_executor_ref<E, TP>>&&
468  ReceiveError<Out, E>)
469  void submit(TPA tp, Out out) {
470  // queue for later
471  source_->insert(
472  queue_,
475  }
476 };
477 
478 //
479 // the time executor factory produces a new time ordered queue each time that it
480 // is called.
481 //
482 
483 template <class E, class TP, class NF, class ExecutorFactory>
485  using time_point = std::decay_t<TP>;
486  std::shared_ptr<time_source_shared<E, time_point>> source_;
487  NF nf_;
488  ExecutorFactory ef_;
489 
490  public:
492  std::shared_ptr<time_source_shared<E, time_point>> source,
493  NF nf,
494  ExecutorFactory ef)
495  : source_(std::move(source)), nf_(std::move(nf)), ef_(std::move(ef)) {}
496  auto operator()() {
497  auto ex = ef_();
498  auto queue =
499  std::make_shared<time_source_queue<E, time_point, NF, decltype(ex)>>(
500  source_, nf_, std::move(ex));
502  queue};
503  }
504 };
505 
506 //
507 // each time_source is an independent source of timed events
508 //
509 // a time_source is a time_single_executor factory, it is not an executor
510 // itself.
511 //
512 // each time_source has a single thread that is shared across all the
513 // time executors it produces. the thread is used to wait for the next time
514 // event. when a time event is ready the thread will use an executor from the
515 // factory passed into make() to call back on the receiver passed to submit().
516 //
517 // passing a now-function and an executor factory to time_source.make() will
518 // create a time executor factory. the time executor factory is a function that
519 // will return a time executor when called with no arguments.
520 //
521 //
522 //
523 
524 template <
525  class E = std::exception_ptr,
526  class TP = std::chrono::system_clock::time_point>
527 class time_source {
528  public:
529  using time_point = std::decay_t<TP>;
530 
531  private:
532  std::shared_ptr<time_source_shared<E, time_point>> source_;
533 
534  public:
536  : source_(std::make_shared<time_source_shared<E, time_point>>()) {
537  source_->start(source_);
538  }
539 
540  PUSHMI_TEMPLATE(class NF, class ExecutorFactory)
541  (requires Invocable<ExecutorFactory&>&&
543  NeverBlocking<invoke_result_t<
544  ExecutorFactory&>>)
545  auto make(NF nf, ExecutorFactory ef) {
547  source_, std::move(nf), std::move(ef)};
548  }
549 
550  void join() {
551  source_->join(source_);
552  }
553 };
554 } // namespace pushmi
555 } // namespace folly
chrono
Definition: CMakeCache.txt:563
std::decay_t< TP > time_point
Definition: time_source.h:39
std::shared_ptr< time_source_shared< E, time_point > > source_
Definition: time_source.h:486
time_source_executor(std::shared_ptr< time_source_shared< E, time_point >> source, std::shared_ptr< time_source_queue< E, time_point, NF, Executor >> queue)
Definition: time_source.h:454
void insert(std::shared_ptr< time_source_queue_base< E, TP >> queue, time_heap_item< E, TP > item)
Definition: time_source.h:403
std::shared_ptr< time_source_shared< E, time_point > > source_
Definition: time_source.h:532
std::vector< std::shared_ptr< time_source_queue_base< E, TP > > > ready_
Definition: time_source.h:302
std::decay_t< TP > time_point
Definition: time_source.h:529
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
bool operator!=(const strand_item< E > &l, const strand_item< E > &r)
Definition: strand.h:53
PUSHMI_INLINE_VAR constexpr __adl::set_error_fn set_error
time_source_executor_factory_fn(std::shared_ptr< time_source_shared< E, time_point >> source, NF nf, ExecutorFactory ef)
Definition: time_source.h:491
bool operator()(std::shared_ptr< time_source_queue_base< E, TP >> &q)
Definition: time_source.h:246
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
any_receiver< E, any_time_executor_ref< E, TP > > what
Definition: time_source.h:47
requires E e noexcept(noexcept(s.error(std::move(e))))
std::chrono::system_clock::time_point earliest_
Definition: time_source.h:280
bool operator==(const strand_item< E > &l, const strand_item< E > &r)
Definition: strand.h:49
std::shared_ptr< time_source_shared< E, time_point > > source_
Definition: time_source.h:443
LogLevel min
Definition: LogLevel.cpp:30
requires Regular< TPA > &&ReceiveValue< Out, any_time_executor_ref< E, TP > > &&ReceiveError< Out, E > void submit(TPA tp, Out out)
Definition: time_source.h:469
std::deque< std::shared_ptr< time_source_queue_base< E, TP > > > pending_
Definition: time_source.h:286
std::shared_ptr< time_source_shared< E, TP > > source_
Definition: time_source.h:234
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
auto lock(Synchronized< D, M > &synchronized, Args &&...args)
bool operator>=(const strand_item< E > &l, const strand_item< E > &r)
Definition: strand.h:61
static void start(std::shared_ptr< time_source_shared< E, TP >> that)
Definition: time_source.h:313
auto start
bool operator()(const std::shared_ptr< time_source_queue_base< E, TP >> &q)
Definition: time_source.h:257
std::shared_ptr< time_source_queue< E, time_point, NF, Executor > > queue_
Definition: time_source.h:444
bool operator>(const strand_item< E > &l, const strand_item< E > &r)
Definition: strand.h:45
requires requires(detail::apply_impl(std::declval< F >(), std::declval< Tuple >(), detail::tupidxs< Tuple >{}))) const expr decltype(auto) apply(F &&f
PUSHMI_INLINE_VAR constexpr __adl::get_top_fn now
PUSHMI_TEMPLATE(class E=std::exception_ptr, class Wrapped)(requires Sender< detail
Definition: executor.h:102
time_source_queue(std::weak_ptr< time_source_shared< E, time_point >> source, NF nf, Executor ex)
Definition: time_source.h:110
std::priority_queue< time_heap_item< E, TP >, std::vector< time_heap_item< E, TP > >, std::greater<> > heap_
Definition: time_source.h:93
PUSHMI_INLINE_VAR constexpr __adl::get_top_fn top
std::mutex mutex
uintptr_t start_
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::detail::as_const_fn as_const
std::weak_ptr< time_source_shared< E, time_point > > source_
Definition: time_source.h:115
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
auto dispatch(Type type, F &&f) -> decltype(f(std::declval< Default >()))
Definition: Instructions.h:175
static set< string > s
time_source_queue_receiver(std::shared_ptr< time_source_queue< E, TP, NF, Executor >> that)
Definition: time_source.h:229
time_heap_item< E, TP > & top()
Definition: time_source.h:97
PUSHMI_INLINE_VAR constexpr __adl::do_submit_fn submit
decltype(folly::pushmi::invoke(std::declval< F >(), std::declval< As >()...)) invoke_result_t
Definition: functional.h:47
bool operator<=(const strand_item< E > &l, const strand_item< E > &r)
Definition: strand.h:57
static void join(std::shared_ptr< time_source_shared< E, TP >> that)
Definition: time_source.h:316
static void worker(std::shared_ptr< time_source_shared< E, TP >> that)
Definition: time_source.h:325
requires Invocable< ExecutorFactory & > &&Executor< invoke_result_t< ExecutorFactory & > > &&NeverBlocking< invoke_result_t< ExecutorFactory & > > auto make(NF nf, ExecutorFactory ef)
Definition: time_source.h:545
time_heap_item(time_point at, any_receiver< E, any_time_executor_ref< E, TP >> out)
Definition: time_source.h:41
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done