34 #include <glog/logging.h> 71 static_assert(
sizeof(
SpinLock) == 1,
"missized");
194 template <
typename T>
198 "void futures are not supported. Use Unit instead.");
217 template <
typename...
Args>
224 Core& operator=(
Core const&) =
delete;
233 auto const state = state_.load(std::memory_order_acquire);
245 auto state = core->state_.load(std::memory_order_acquire);
248 state = core->state_.load(std::memory_order_acquire);
283 while (core->state_.load(std::memory_order_relaxed) ==
State::Proxy) {
286 return core->result_;
291 while (core->state_.load(std::memory_order_relaxed) ==
State::Proxy) {
294 return core->result_;
305 template <
typename F>
307 DCHECK(!hasCallback());
313 auto state = state_.load(std::memory_order_acquire);
316 if (state_.compare_exchange_strong(
324 state_.store(
State::Done, std::memory_order_relaxed);
330 return proxyCallback();
333 terminate_with<std::logic_error>(
"setCallback unexpected state");
343 DCHECK(!hasResult());
347 auto state = state_.load(std::memory_order_acquire);
350 if (state_.compare_exchange_strong(
362 terminate_with<std::logic_error>(
"setCallback unexpected state");
377 DCHECK(!hasResult());
381 auto state = state_.load(std::memory_order_acquire);
385 if (state_.compare_exchange_strong(
393 if (state_.compare_exchange_strong(
401 terminate_with<std::logic_error>(
"setResult unexpected state");
429 priority_ = priority;
437 return executor_.get();
454 std::lock_guard<SpinLock>
lock(interruptLock_);
455 if (!interrupt_ && !hasResult()) {
456 interrupt_ = std::make_unique<exception_wrapper>(
std::move(e));
457 if (interruptHandler_) {
458 interruptHandler_(*interrupt_);
464 if (!interruptHandlerSet_.load(std::memory_order_acquire)) {
467 std::lock_guard<SpinLock>
lock(interruptLock_);
468 return interruptHandler_;
482 template <
typename F>
484 std::lock_guard<SpinLock>
lock(interruptLock_);
489 setInterruptHandlerNoLock(std::forward<F>(fn));
496 interruptHandlerSet_.store(
true, std::memory_order_relaxed);
506 template <
typename...
Args>
514 DCHECK(attached_ == 0);
515 auto state = state_.load(std::memory_order_relaxed);
525 proxy_->detachFuture();
532 terminate_with<std::logic_error>(
"~Core unexpected state");
552 : core_(
exchange(o.core_,
nullptr)) {}
561 core_->derefCallback();
573 int8_t priority = priority_;
586 attached_.fetch_add(2, std::memory_order_relaxed);
587 callbackReferences_.fetch_add(2, std::memory_order_relaxed);
592 if (
LIKELY(
x->getNumPriorities() == 1)) {
593 xPtr->add([core_ref =
std::move(guard_lambda),
596 Core*
const core = cr.getCore();
601 xPtr->addWithPriority(
605 Core*
const core = cr.getCore();
611 }
catch (
const std::exception& e) {
622 attached_.fetch_add(1, std::memory_order_relaxed);
635 proxy_->setExecutor(
std::move(executor_), priority_);
637 proxy_->detachFuture();
643 auto a = attached_.fetch_sub(1, std::memory_order_acq_rel);
651 auto c = callbackReferences_.fetch_sub(1, std::memory_order_acq_rel);
659 using Context = std::shared_ptr<RequestContext>;
672 std::atomic<unsigned char> callbackReferences_{0};
673 std::atomic<bool> interruptHandlerSet_{
false};
680 std::unique_ptr<exception_wrapper> interrupt_{};
681 std::function<void(exception_wrapper const&)> interruptHandler_{
nullptr};
SpinLock is and must stay a 1-byte object because of how Core is laid out.
CoreAndCallbackReference(Core *core) noexcept
std::atomic< State > state_
std::atomic< unsigned char > attached_
static const int8_t MID_PRI
constexpr State operator~(State a)
void detachOne() noexcept
int8_t getPriority() const
Core * getCore() const noexcept
void setProxy(Core *proxy)
constexpr detail::Map< Move > move
void derefCallback() noexcept
Try< T > const & getTry() const
internal::ArgsMatcher< InnerMatcher > Args(const InnerMatcher &matcher)
static Core< T > * make(in_place_t, Args &&...args)
—— Concurrent Priority Queue Implementation ——
static Core * make(Try< T > &&t)
in_place_tag in_place(in_place_tag={})
requires E e noexcept(noexcept(s.error(std::move(e))))
in_place_tag(&)(in_place_tag) in_place_t
constexpr State operator&(State a, State b)
std::function< void(exception_wrapper const &)> getInterruptHandler()
void detachFuture() noexcept
constexpr T const & as_const(T &t) noexcept
bool ready() const noexcept
constexpr State operator^(State a, State b)
State
See Core for details.
void detachPromise() noexcept
~CoreAndCallbackReference() noexcept
auto lock(Synchronized< D, M > &synchronized, Args &&...args)
void setInterruptHandler(F &&fn)
Executor::KeepAlive executor_
bool hasCallback() const noexcept
May call from any thread.
void setExecutor(Executor::KeepAlive<> x, int8_t priority=Executor::MID_PRI)
static const char *const value
Core(in_place_t, Args &&...args) noexcept(std::is_nothrow_constructible< T, Args &&... >::value)
Executor * getExecutor() const
T exchange(T &obj, U &&new_value)
void setExecutor(Executor *x, int8_t priority=Executor::MID_PRI)
void setInterruptHandlerNoLock(std::function< void(exception_wrapper const &)> fn)
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
void setCallback(F &&func, std::shared_ptr< folly::RequestContext > context)
CoreAndCallbackReference(CoreAndCallbackReference &&o) noexcept
void setResult(Try< T > &&t)
folly::Function< void()> callback_
Executor::KeepAlive< ExecutorT > getKeepAliveToken(ExecutorT *executor)
std::shared_ptr< RequestContext > Context
FOLLY_ALWAYS_INLINE void assume(bool cond)
bool hasResult() const noexcept
#define FOLLY_FALLTHROUGH
constexpr State operator|(State a, State b)
static Core * make()
State will be Start.