proxygen
folly::pushmi::time_source_shared< E, TP > Class Template Reference

#include <time_source.h>

Inheritance diagram for folly::pushmi::time_source_shared< E, TP >:
folly::pushmi::time_source_shared_base< E, TP >

Public Member Functions

 ~time_source_shared ()
 
 time_source_shared ()
 
void insert (std::shared_ptr< time_source_queue_base< E, TP >> queue, time_heap_item< E, TP > item)
 
- Public Member Functions inherited from folly::pushmi::time_source_shared_base< E, TP >
 time_source_shared_base ()
 

Static Public Member Functions

static void start (std::shared_ptr< time_source_shared< E, TP >> that)
 
static void join (std::shared_ptr< time_source_shared< E, TP >> that)
 
static void worker (std::shared_ptr< time_source_shared< E, TP >> that)
 

Public Attributes

std::thread t_
 
std::vector< std::shared_ptr< time_source_queue_base< E, TP > > > ready_
 
- Public Attributes inherited from folly::pushmi::time_source_shared_base< E, TP >
std::mutex lock_
 
std::condition_variable wake_
 
std::thread t_
 
std::chrono::system_clock::time_point earliest_
 
bool done_
 
bool joined_
 
int dirty_
 
int items_
 
detail::opt< Eerror_
 
std::deque< std::shared_ptr< time_source_queue_base< E, TP > > > pending_
 

Additional Inherited Members

- Public Types inherited from folly::pushmi::time_source_shared_base< E, TP >
using time_point = std::decay_t< TP >
 

Detailed Description

template<class E, class TP>
class folly::pushmi::time_source_shared< E, TP >

Definition at line 31 of file time_source.h.

Constructor & Destructor Documentation

template<class E, class TP>
folly::pushmi::time_source_shared< E, TP >::~time_source_shared ( )
inline

Definition at line 304 of file time_source.h.

304  {
305  // not allowed to be discarded without joining and completing all queued
306  // items
307  if (t_.joinable() || this->items_ != 0) {
308  std::abort();
309  }
310  }
template<class E, class TP>
folly::pushmi::time_source_shared< E, TP >::time_source_shared ( )
inline

Definition at line 311 of file time_source.h.

311 {}

Member Function Documentation

template<class E, class TP>
void folly::pushmi::time_source_shared< E, TP >::insert ( std::shared_ptr< time_source_queue_base< E, TP >>  queue,
time_heap_item< E, TP >  item 
)
inline

Definition at line 403 of file time_source.h.

405  {
406  std::unique_lock<std::mutex> guard{this->lock_};
407 
408  // deliver error_ and return
409  if (!!this->error_) {
410  set_error(item.what, *this->error_);
411  return;
412  }
413  // once join() is called, new work queued to the executor is not safe unless
414  // it is nested in an existing item.
415  if (!!this->joined_) {
416  std::abort();
417  };
418 
419  queue->heap_.push(std::move(item));
420  ++this->items_;
421 
422  if (!queue->dispatching_ && !queue->pending_) {
423  // add queue to pending pending_ list if it is not already there
424  this->pending_.push_back(queue);
425  queue->pending_ = true;
426  }
427 
428  if (queue->heap_.top().when < this->earliest_) {
429  // this is the earliest, tell worker to reset earliest_
430  ++this->dirty_;
431  this->wake_.notify_one();
432  }
433  }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
PUSHMI_INLINE_VAR constexpr __adl::set_error_fn set_error
std::chrono::system_clock::time_point earliest_
Definition: time_source.h:280
std::deque< std::shared_ptr< time_source_queue_base< E, TP > > > pending_
Definition: time_source.h:286
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
template<class E, class TP>
static void folly::pushmi::time_source_shared< E, TP >::join ( std::shared_ptr< time_source_shared< E, TP >>  that)
inlinestatic

Definition at line 316 of file time_source.h.

316  {
317  std::unique_lock<std::mutex> guard{that->lock_};
318  that->done_ = true;
319  ++that->dirty_;
320  that->wake_.notify_one();
321  guard.unlock();
322  that->t_.join();
323  }
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
template<class E, class TP>
static void folly::pushmi::time_source_shared< E, TP >::start ( std::shared_ptr< time_source_shared< E, TP >>  that)
inlinestatic

Definition at line 313 of file time_source.h.

313  {
314  that->t_ = std::thread{&time_source_shared<E, TP>::worker, that};
315  }
static void worker(std::shared_ptr< time_source_shared< E, TP >> that)
Definition: time_source.h:325
template<class E, class TP>
static void folly::pushmi::time_source_shared< E, TP >::worker ( std::shared_ptr< time_source_shared< E, TP >>  that)
inlinestatic

Definition at line 325 of file time_source.h.

325  {
326  try {
327  std::unique_lock<std::mutex> guard{that->lock_};
328 
329  // once done_, keep going until empty
330  while (!that->done_ || that->items_ > 0) {
331  // wait for something to do
332  that->wake_.wait_until(guard, that->earliest_, [&]() {
333  return that->dirty_ != 0 ||
334  std::chrono::system_clock::now() >= that->earliest_;
335  });
336  that->dirty_ = 0;
337 
338  //
339  // select ready and empty queues and reset earliest_
340 
342  auto earliest = start + std::chrono::hours(24);
343  auto process = time_item_process_pred_fn<E, TP>{&start, &earliest};
344 
345  auto process_begin = std::partition(
346  that->pending_.begin(), that->pending_.end(), process);
347  that->earliest_ = earliest;
348 
349  // copy out the queues that have ready items so that the lock
350  // is not held during dispatch
351 
352  std::copy_if(
353  process_begin,
354  that->pending_.end(),
355  std::back_inserter(that->ready_),
356  time_queue_dispatch_pred_fn<E, TP>{});
357 
358  // remove processed queues from pending queue.
359  that->pending_.erase(process_begin, that->pending_.end());
360 
361  // printf("d %lu, %lu, %d, %ld\n", that->pending_.size(),
362  // that->ready_.size(), that->items_,
363  // std::chrono::duration_cast<std::chrono::milliseconds>(earliest -
364  // start).count());
365 
366  // dispatch to queues with ready items
367  guard.unlock();
368  for (auto& q : that->ready_) {
369  q->dispatch();
370  }
371  guard.lock();
372  that->ready_.clear();
373  }
374  that->joined_ = true;
375  } catch (...) {
376  //
377  // block any more items from being enqueued, all new items will be sent
378  // this error on the same context that calls submit
379  //
380  // also dispatch errors to all items already in the queues from the
381  // time thread
382  std::unique_lock<std::mutex> guard{that->lock_};
383  // creates a dependency that std::exception_ptr must be ConvertibleTo E
384  // TODO: break this dependency rather than enforce it with concepts
385  that->error_ = std::current_exception();
386  for (auto& q : that->pending_) {
387  while (!q->heap_.empty()) {
388  try {
389  auto what{std::move(q->top().what)};
390  q->heap_.pop();
391  --that->items_;
392  guard.unlock();
393  set_error(what, *that->error_);
394  guard.lock();
395  } catch (...) {
396  // we already have an error, ignore this one.
397  }
398  }
399  }
400  }
401  }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::chrono::steady_clock::time_point now()
PUSHMI_INLINE_VAR constexpr __adl::set_error_fn set_error
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
static void start(std::shared_ptr< time_source_shared< E, TP >> that)
Definition: time_source.h:313

Member Data Documentation

template<class E, class TP>
std::vector<std::shared_ptr<time_source_queue_base<E, TP> > > folly::pushmi::time_source_shared< E, TP >::ready_

Definition at line 302 of file time_source.h.

template<class E, class TP>
std::thread folly::pushmi::time_source_shared< E, TP >::t_

Definition at line 299 of file time_source.h.


The documentation for this class was generated from the following file: