proxygen
folly::pushmi::time_source_queue< E, TP, NF, Executor > Class Template Reference

#include <time_source.h>

Inheritance diagram for folly::pushmi::time_source_queue< E, TP, NF, Executor >:
folly::pushmi::time_source_queue_base< E, TP >

Public Types

using time_point = std::decay_t< TP >
 
- Public Types inherited from folly::pushmi::time_source_queue_base< E, TP >
using time_point = std::decay_t< TP >
 

Public Member Functions

 ~time_source_queue ()
 
 time_source_queue (std::weak_ptr< time_source_shared< E, time_point >> source, NF nf, Executor ex)
 
void dispatch () override
 
auto shared_from_that ()
 
template<class Exec >
void value (Exec &&)
 
template<class AE >
void error (AE e) noexcept
 
void done ()
 
- Public Member Functions inherited from folly::pushmi::time_source_queue_base< E, TP >
virtual ~time_source_queue_base ()
 
time_heap_item< E, TP > & top ()
 

Public Attributes

std::weak_ptr< time_source_shared< E, time_point > > source_
 
NF nf_
 
Executor ex_
 
- Public Attributes inherited from folly::pushmi::time_source_queue_base< E, TP >
bool dispatching_ = false
 
bool pending_ = false
 
std::priority_queue< time_heap_item< E, TP >, std::vector< time_heap_item< E, TP > >, std::greater<> > heap_
 

Detailed Description

template<class E, class TP, class NF, class Executor>
class folly::pushmi::time_source_queue< E, TP, NF, Executor >

Definition at line 106 of file time_source.h.

Member Typedef Documentation

template<class E , class TP , class NF , class Executor >
using folly::pushmi::time_source_queue< E, TP, NF, Executor >::time_point = std::decay_t<TP>

Definition at line 108 of file time_source.h.

Constructor & Destructor Documentation

template<class E , class TP , class NF , class Executor >
folly::pushmi::time_source_queue< E, TP, NF, Executor >::~time_source_queue ( )
inline

Definition at line 109 of file time_source.h.

109 {}
template<class E , class TP , class NF , class Executor >
folly::pushmi::time_source_queue< E, TP, NF, Executor >::time_source_queue ( std::weak_ptr< time_source_shared< E, time_point >>  source,
NF  nf,
Executor  ex 
)
inline

Definition at line 110 of file time_source.h.

114  : source_(std::move(source)), nf_(std::move(nf)), ex_(std::move(ex)) {}
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::weak_ptr< time_source_shared< E, time_point > > source_
Definition: time_source.h:115

Member Function Documentation

template<class E , class TP , class NF , class Executor >
void folly::pushmi::time_source_queue< E, TP, NF, Executor >::dispatch ( )
overridevirtual

Implements folly::pushmi::time_source_queue_base< E, TP >.

Definition at line 238 of file time_source.h.

References folly::pushmi::submit.

238  {
239  submit(
240  ex_, time_source_queue_receiver<E, TP, NF, Executor>{shared_from_that()});
241 }
PUSHMI_INLINE_VAR constexpr __adl::do_submit_fn submit
template<class E , class TP , class NF , class Executor >
void folly::pushmi::time_source_queue< E, TP, NF, Executor >::done ( )
inline

Definition at line 205 of file time_source.h.

References folly::gen::guard(), and s.

205  {
206  auto s = source_.lock();
207  std::unique_lock<std::mutex> guard{s->lock_};
208 
209  if (!this->dispatching_ || this->pending_) {
210  std::abort();
211  }
212  this->dispatching_ = false;
213 
214  // add back to pending_ to get the remaining items dispatched
215  s->pending_.push_back(this->shared_from_this());
216  this->pending_ = true;
217  if (this->heap_.top().when <= s->earliest_) {
218  // this is the earliest, tell worker to reset earliest_
219  ++s->dirty_;
220  s->wake_.notify_one();
221  }
222  }
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
std::priority_queue< time_heap_item< E, TP >, std::vector< time_heap_item< E, TP > >, std::greater<> > heap_
Definition: time_source.h:93
std::weak_ptr< time_source_shared< E, time_point > > source_
Definition: time_source.h:115
static set< string > s
template<class E , class TP , class NF , class Executor >
template<class AE >
void folly::pushmi::time_source_queue< E, TP, NF, Executor >::error ( AE  e)
inlinenoexcept

Definition at line 187 of file time_source.h.

References folly::pushmi::detail::as_const, folly::gen::guard(), folly::gen::move, s, folly::pushmi::set_error, folly::pushmi::top, and folly::pushmi::time_heap_item< E, TP >::what.

187  {
188  auto s = source_.lock();
189  std::unique_lock<std::mutex> guard{s->lock_};
190 
191  if (!this->dispatching_ || this->pending_) {
192  std::abort();
193  }
194 
195  while (!this->heap_.empty()) {
196  auto what{std::move(this->top().what)};
197  this->heap_.pop();
198  --s->items_;
199  guard.unlock();
200  set_error(what, detail::as_const(e));
201  guard.lock();
202  }
203  this->dispatching_ = false;
204  }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
PUSHMI_INLINE_VAR constexpr __adl::set_error_fn set_error
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
std::priority_queue< time_heap_item< E, TP >, std::vector< time_heap_item< E, TP > >, std::greater<> > heap_
Definition: time_source.h:93
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::detail::as_const_fn as_const
std::weak_ptr< time_source_shared< E, time_point > > source_
Definition: time_source.h:115
static set< string > s
time_heap_item< E, TP > & top()
Definition: time_source.h:97
template<class E , class TP , class NF , class Executor >
auto folly::pushmi::time_source_queue< E, TP, NF, Executor >::shared_from_that ( )
inline

Definition at line 121 of file time_source.h.

121  {
122  return std::static_pointer_cast<time_source_queue<E, TP, NF, Executor>>(
123  this->shared_from_this());
124  }
template<class E , class TP , class NF , class Executor >
template<class Exec >
void folly::pushmi::time_source_queue< E, TP, NF, Executor >::value ( Exec &&  )
inline

Definition at line 127 of file time_source.h.

References folly::gen::guard(), folly::gen::move, s, folly::pushmi::set_done, folly::pushmi::set_error, folly::pushmi::set_value, start, folly::pushmi::top, and folly::pushmi::time_heap_item< E, TP >::what.

127  {
128  auto s = source_.lock();
129 
130  if (s->t_.get_id() == std::this_thread::get_id()) {
131  // Executor is not allowed to use the time_source thread
132  std::abort();
133  }
134 
135  //
136  // pull ready items from the heap in order.
137 
138  // drain anything queued within the next 50ms before
139  // going back to the pending queue.
140  auto start = nf_() + std::chrono::milliseconds(50);
141 
142  std::unique_lock<std::mutex> guard{s->lock_};
143 
144  if (!this->dispatching_ || this->pending_) {
145  std::abort();
146  }
147 
148  if (this->heap_.empty()) {
149  return;
150  }
151  auto that = shared_from_that();
152  auto subEx = time_source_executor<E, TP, NF, Executor>{s, that};
153  while (!this->heap_.empty() && this->heap_.top().when <= start) {
154  auto item{std::move(this->top())};
155  this->heap_.pop();
156  guard.unlock();
157  std::this_thread::sleep_until(item.when);
158  set_value(item.what, any_time_executor_ref<E, TP>{subEx});
159  set_done(item.what);
160  guard.lock();
161  // allows set_value to queue nested items
162  --s->items_;
163  }
164 
165  if (this->heap_.empty()) {
166  // if this is empty, tell worker to check for the done condition.
167  ++s->dirty_;
168  s->wake_.notify_one();
169  } else {
170  if (!!s->error_) {
171  while (!this->heap_.empty()) {
172  try {
173  auto what{std::move(this->top().what)};
174  this->heap_.pop();
175  --s->items_;
176  guard.unlock();
177  set_error(what, *s->error_);
178  guard.lock();
179  } catch (...) {
180  // we already have an error, ignore this one.
181  }
182  }
183  }
184  }
185  }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
PUSHMI_INLINE_VAR constexpr __adl::set_error_fn set_error
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
auto start
std::priority_queue< time_heap_item< E, TP >, std::vector< time_heap_item< E, TP > >, std::greater<> > heap_
Definition: time_source.h:93
std::weak_ptr< time_source_shared< E, time_point > > source_
Definition: time_source.h:115
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
static set< string > s
time_heap_item< E, TP > & top()
Definition: time_source.h:97
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done

Member Data Documentation

template<class E , class TP , class NF , class Executor >
Executor folly::pushmi::time_source_queue< E, TP, NF, Executor >::ex_

Definition at line 117 of file time_source.h.

template<class E , class TP , class NF , class Executor >
NF folly::pushmi::time_source_queue< E, TP, NF, Executor >::nf_

Definition at line 116 of file time_source.h.

template<class E , class TP , class NF , class Executor >
std::weak_ptr<time_source_shared<E, time_point> > folly::pushmi::time_source_queue< E, TP, NF, Executor >::source_

Definition at line 115 of file time_source.h.


The documentation for this class was generated from the following file: