proxygen
folly::gen::detail::ClosableMPMCQueue< T > Class Template Reference

#include <Parallel-inl.h>

Public Member Functions

 ClosableMPMCQueue (size_t capacity)
 
 ~ClosableMPMCQueue ()
 
void openProducer ()
 
void openConsumer ()
 
void closeInputProducer ()
 
void closeOutputConsumer ()
 
size_t producers () const
 
size_t consumers () const
 
template<typename... Args>
bool writeUnlessFull (Args &&...args) noexcept
 
template<typename... Args>
bool writeUnlessClosed (Args &&...args)
 
bool readUnlessEmpty (T &out)
 
bool readUnlessClosed (T &out)
 

Private Attributes

MPMCQueue< Tqueue_
 
std::atomic< size_t > producers_ {0}
 
std::atomic< size_t > consumers_ {0}
 
folly::EventCount wakeProducer_
 
folly::EventCount wakeConsumer_
 

Detailed Description

template<typename T>
class folly::gen::detail::ClosableMPMCQueue< T >

Definition at line 33 of file Parallel-inl.h.

Constructor & Destructor Documentation

template<typename T>
folly::gen::detail::ClosableMPMCQueue< T >::ClosableMPMCQueue ( size_t  capacity)
inlineexplicit

Definition at line 41 of file Parallel-inl.h.

41 : queue_(capacity) {}
template<typename T>
folly::gen::detail::ClosableMPMCQueue< T >::~ClosableMPMCQueue ( )
inline

Definition at line 43 of file Parallel-inl.h.

43  {
44  CHECK(!producers());
45  CHECK(!consumers());
46  }

Member Function Documentation

template<typename T>
bool folly::gen::detail::ClosableMPMCQueue< T >::readUnlessClosed ( T out)
inline

Definition at line 116 of file Parallel-inl.h.

Referenced by folly::gen::detail::Parallel< Ops >::Generator< Input, Source, InputDecayed, Composed, Output, OutputDecayed >::Puller::apply(), folly::gen::detail::Parallel< Ops >::Generator< Input, Source, InputDecayed, Composed, Output, OutputDecayed >::Puller::foreach(), and folly::gen::detail::Parallel< Ops >::Generator< Input, Source, InputDecayed, Composed, Output, OutputDecayed >::Executor< all >::readUnlessClosed().

116  {
117  while (!queue_.readIfNotEmpty(out)) {
118  auto key = wakeConsumer_.prepareWait();
119  if (!producers()) {
120  // wake producers to fill empty space
122  return false;
123  }
124  wakeConsumer_.wait(key);
125  }
126  // wake writers blocked by full queue
128  return true;
129  }
void wait(Key key) noexcept
Definition: EventCount.h:163
void notify() noexcept
Definition: EventCount.h:134
Key prepareWait() noexcept
Definition: EventCount.h:150
template<typename T>
bool folly::gen::detail::ClosableMPMCQueue< T >::readUnlessEmpty ( T out)
inline

Definition at line 107 of file Parallel-inl.h.

Referenced by folly::gen::detail::Parallel< Ops >::Generator< Input, Source, InputDecayed, Composed, Output, OutputDecayed >::Executor< all >::readUnlessEmpty().

107  {
108  if (queue_.read(out)) {
109  // wake producers to fill empty space
111  return true;
112  }
113  return false;
114  }
void notify() noexcept
Definition: EventCount.h:134
template<typename T>
template<typename... Args>
bool folly::gen::detail::ClosableMPMCQueue< T >::writeUnlessClosed ( Args &&...  args)
inline

Definition at line 90 of file Parallel-inl.h.

Referenced by folly::gen::detail::Parallel< Ops >::Generator< Input, Source, InputDecayed, Composed, Output, OutputDecayed >::Pusher< all >::compose(), and folly::gen::detail::Parallel< Ops >::Generator< Input, Source, InputDecayed, Composed, Output, OutputDecayed >::Executor< all >::writeUnlessClosed().

90  {
91  // write if there's room
92  while (!queue_.writeIfNotFull(std::forward<Args>(args)...)) {
93  // if write fails, check if there are still consumers listening
94  auto key = wakeProducer_.prepareWait();
95  if (!consumers()) {
96  // no consumers left; bail out
98  return false;
99  }
100  wakeProducer_.wait(key);
101  }
102  // wake consumers to pick up new value
104  return true;
105  }
void wait(Key key) noexcept
Definition: EventCount.h:163
void cancelWait() noexcept
Definition: EventCount.h:155
void notify() noexcept
Definition: EventCount.h:134
Key prepareWait() noexcept
Definition: EventCount.h:150
template<typename T>
template<typename... Args>
bool folly::gen::detail::ClosableMPMCQueue< T >::writeUnlessFull ( Args &&...  args)
inlinenoexcept

Definition at line 80 of file Parallel-inl.h.

Referenced by folly::gen::detail::Parallel< Ops >::Generator< Input, Source, InputDecayed, Composed, Output, OutputDecayed >::Executor< all >::writeUnlessFull().

80  {
81  if (queue_.write(std::forward<Args>(args)...)) {
82  // wake consumers to pick up new value
84  return true;
85  }
86  return false;
87  }
void notify() noexcept
Definition: EventCount.h:134

Member Data Documentation

template<typename T>
MPMCQueue<T> folly::gen::detail::ClosableMPMCQueue< T >::queue_
private

Definition at line 34 of file Parallel-inl.h.

template<typename T>
folly::EventCount folly::gen::detail::ClosableMPMCQueue< T >::wakeConsumer_
private

Definition at line 38 of file Parallel-inl.h.

template<typename T>
folly::EventCount folly::gen::detail::ClosableMPMCQueue< T >::wakeProducer_
private

Definition at line 37 of file Parallel-inl.h.


The documentation for this class was generated from the following file: