proxygen
composition_2.cpp File Reference
#include <algorithm>
#include <cassert>
#include <iostream>
#include <vector>
#include <folly/experimental/pushmi/examples/pool.h>
#include <folly/experimental/pushmi/o/transform.h>
#include <folly/experimental/pushmi/o/via.h>
#include <folly/experimental/pushmi/strand.h>

Go to the source code of this file.

Classes

struct  f_t
 
struct  g_t
 

Functions

f_t f ()
 
g_t g (f_t)
 
template<class CPUExecutor , class IOExecutor >
void lisp (CPUExecutor cpu, IOExecutor io)
 
template<class CPUExecutor , class IOExecutor >
void sugar (CPUExecutor cpu, IOExecutor io)
 
template<class CPUExecutor , class IOExecutor >
void pipe (CPUExecutor cpu, IOExecutor io)
 
int main ()
 

Function Documentation

f_t f ( )

Definition at line 30 of file composition_2.cpp.

Referenced by lisp(), pipe(), and sugar().

30  {
31  return {};
32 }
g_t g ( f_t  )

Definition at line 34 of file composition_2.cpp.

Referenced by folly::detail::NodeRecycler< NodeType, NodeAlloc, typename std::enable_if< !NodeType::template DestroyIsNoOp< NodeAlloc >::value >::type >::add(), folly::TestExecutor::addImpl(), folly::threadlocal_detail::StaticMetaBase::allocate(), folly::SimpleAllocator::allocate(), folly::detail::BufferedStat< DigestT, ClockT >::append(), folly::detail::DigestBuilder< DigestT >::append(), proxygen::HTTPMessage::atomicDumpMessage(), BENCHMARK(), folly::AsyncUDPSocket::bind(), folly::detail::DigestBuilder< DigestT >::build(), folly::ssl::cleanup(), folly::detail::ConcurrentHashMapSegment< KeyType, ValueType, ShardBits, HashFn, KeyEqual, Allocator, Atom, Mutex >::clear(), folly::NotificationQueue< MessageT >::Consumer::consumeUntilDrained(), folly::NotificationQueue< MessageT >::SimpleConsumer::consumeUntilDrained(), folly::SimpleAllocator::deallocate(), folly::threadlocal_detail::StaticMetaBase::destroy(), QueueTest::destroyCallback(), folly::portability::ssl::DH_set0_pqg(), folly::detail::BufferedStat< DigestT, ClockT >::doUpdate(), wangle::ConnectionManager::DrainHelper::drainConnections(), wangle::ConnectionManager::dropAllConnections(), wangle::ConnectionManager::dropConnections(), BaselinePQ< T, PriorityQueue, Mutex >::empty(), testing::gmock_matchers_test::EMString(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::ensureSignal(), folly::detail::ConcurrentHashMapSegment< KeyType, ValueType, ShardBits, HashFn, KeyEqual, Allocator, Atom, Mutex >::erase_internal(), TestAsyncTransport::failPendingWrites(), testing::gmock_matchers_test::FindBacktrackingMaxBPM(), proxygen::HTTPSession::flowControlTimeoutExpired(), folly::detail::BufferedStat< DigestT, ClockT >::flush(), folly::compression::generateRandomList(), folly::detail::BufferedDigest< DigestT, ClockT >::get(), folly::detail::BufferedSlidingWindow< DigestT, ClockT >::get(), wangle::SSLSessionCacheManager::getLocalCache(), wangle::SSLUtil::getRSAExIndex(), folly::SharedPromise< T 
>::getSemiFuture(), wangle::SSLUtil::getSSLCtxExIndex(), wangle::SSLUtil::getSSLSessionExStrIndex(), HTTPDownstreamTest< SPDY3_1CodecPair >::gracefulShutdown(), folly::ConcurrentSkipList< T, Comp, NodeAlloc, MAX_HEIGHT >::growHeight(), folly::HandshakeCallback::handshakeErr(), folly::HandshakeCallback::handshakeSuc(), folly::ssl::init(), folly::NotificationQueue< MessageT >::Consumer::init(), folly::detail::ConcurrentHashMapSegment< KeyType, ValueType, ShardBits, HashFn, KeyEqual, Allocator, Atom, Mutex >::insert_internal(), proxygen::HTTPSession::invokeOnAllTransactions(), folly::SharedPromise< T >::isFulfilled(), lisp(), wangle::ShardedLocalSSLSessionCache::lookupSession(), folly::ssl::markInitialized(), folly::detail::ConcurrentHashMapSegment< KeyType, ValueType, ShardBits, HashFn, KeyEqual, Allocator, Atom, Mutex >::max_load_factor(), testing::gmock_generated_function_mockers_test::MockFoo::MockFoo(), proxygen::HTTPTransaction::onEgressBodyFirstByte(), proxygen::HTTPTransaction::onEgressBodyLastByte(), proxygen::HTTPTransaction::onEgressHeaderFirstByte(), proxygen::HTTPTransaction::onEgressLastByteAck(), proxygen::HTTPTransaction::onEgressTimeout(), proxygen::HTTPTransaction::onEgressTrackedByte(), proxygen::HTTPTransaction::onError(), proxygen::HTTPTransaction::onExTransaction(), proxygen::HTTPSession::onGoaway(), proxygen::HTTPTransaction::onGoaway(), proxygen::HTTP1xCodec::onHeadersComplete(), proxygen::HTTPTransaction::onIngressBody(), proxygen::HTTPTransaction::onIngressHeadersComplete(), proxygen::HTTPTransaction::onIngressTimeout(), proxygen::HTTPTransaction::onIngressWindowUpdate(), proxygen::HTTPTransaction::onPushedTransaction(), proxygen::HTTPSession::onSettings(), folly::threadlocal_detail::StaticMetaBase::onThreadExit(), proxygen::HTTPTransaction::onWriteReady(), proxygen::HTTP2Codec::parseHeadersDecodeFrames(), proxygen::HTTPTransaction::pauseEgress(), proxygen::HTTPTransaction::pauseIngress(), 
testing::gmock_generated_actions_test::SubstractAction::Perform(), pipe(), GlobalLockPQ< T >::pop(), proxygen::HTTPTransaction::processIngressBody(), proxygen::HTTPTransaction::processIngressChunkComplete(), proxygen::HTTPTransaction::processIngressChunkHeader(), proxygen::HTTPTransaction::processIngressEOM(), proxygen::HTTPTransaction::processIngressHeadersComplete(), proxygen::HTTPTransaction::processIngressTrailers(), proxygen::HTTPTransaction::processIngressUpgrade(), GlobalLockPQ< T >::push(), folly::threadlocal_detail::StaticMetaBase::pushBackLocked(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessageImpl(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessagesImpl(), folly::detail::NodeRecycler< NodeType, NodeAlloc, typename std::enable_if< !NodeType::template DestroyIsNoOp< NodeAlloc >::value >::type >::releaseRef(), wangle::ShardedLocalSSLSessionCache::removeSession(), folly::threadlocal_detail::StaticMetaBase::reserve(), proxygen::HTTPTransaction::resumeEgress(), proxygen::HTTPTransaction::resumeIngress(), proxygen::HTTPSession::resumeTransactions(), folly::rcu_domain< Tag >::retire(), runFairness(), runTest(), proxygen::HTTPTransaction::sendAbort(), proxygen::HTTPTransaction::sendEOM(), folly::SharedPromise< T >::setInterruptHandler(), folly::ssl::setLockTypes(), folly::ssl::setLockTypesAndInit(), folly::SharedPromise< T >::setTry(), shared_ptr_test(), wangle::SSLSessionCacheManager::shutdown(), folly::FunctionScheduler::shutdown(), TestAsyncTransport::shutdownWriteNow(), BaselinePQ< T, PriorityQueue, Mutex >::size(), folly::SharedPromise< T >::size(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::size(), testing::internal::MatchMatrix::SpaceIndex(), folly::NotificationQueue< MessageT >::Consumer::stopConsuming(), wangle::ShardedLocalSSLSessionCache::storeSession(), sugar(), folly::rcu_domain< Tag >::synchronize(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage 
>::syncSignalAndQueue(), TEST(), testing::gmock_generated_actions_test::TEST(), TEST_F(), testing::gmock_matchers_test::TEST_F(), HTTPDownstreamTest< C >::testChunks(), folly::EventBaseManager::trackEventBase(), BaselinePQ< T, PriorityQueue, Mutex >::try_peek(), BaselinePQ< T, PriorityQueue, Mutex >::try_pop(), BaselinePQ< T, PriorityQueue, Mutex >::try_push(), testing::internal::MaxBipartiteMatchState::TryAugment(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::tryConsume(), tryretire(), unique_ptr_test(), folly::EventBaseManager::untrackEventBase(), folly::detail::BufferedStat< DigestT, ClockT >::updateIfExpired(), proxygen::HTTPTransaction::validateIngressStateTransition(), folly::EventBaseManager::withEventBaseSet(), proxygen::HTTPSession::writeTimeoutExpired(), testing::gmock_generated_function_mockers_test::FooInterface::~FooInterface(), wangle::LocalSSLSessionCache::~LocalSSLSessionCache(), and folly::SimpleAllocator::~SimpleAllocator().

34  {
35  return {};
36 }
template<class CPUExecutor , class IOExecutor >
void lisp ( CPUExecutor  cpu, IOExecutor  io )

Definition at line 40 of file composition_2.cpp.

References f(), g(), folly::pushmi::strands(), submit, folly::pushmi::operators::transform, and folly::pushmi::operators::via.

Referenced by main().

40  {
41  // f on cpu - g on cpu (implicit: a single task on the cpu executor runs all
42  // the functions)
43  op::submit([](g_t) {})(op::transform([](f_t ft) { return g(ft); })(
44  op::transform([](auto) { return f(); })(cpu)));
45 
46  // f on cpu - g on cpu (explicit: the first cpu task runs f and a second cpu
47  // task runs g)
48  op::submit([](g_t) {})(op::transform([](f_t ft) { return g(ft); })(
49  op::via(mi::strands(cpu))(op::transform([](auto) { return f(); })(cpu))));
50 
51  // f on io - g on cpu
52  op::submit([](g_t) {})(op::transform([](f_t ft) { return g(ft); })(
53  op::via(mi::strands(cpu))(op::transform([](auto) { return f(); })(io))));
54 }
requires Invocable< ExecutorFactory & > && Executor< invoke_result_t< ExecutorFactory & > > && ConcurrentSequence< invoke_result_t< ExecutorFactory & > > auto strands(ExecutorFactory ef)
Definition: strand.h:246
PUSHMI_INLINE_VAR constexpr detail::transform_fn transform
Definition: transform.h:158
PUSHMI_INLINE_VAR constexpr detail::via_fn via
Definition: via.h:166
f_t f()
g_t g(f_t)
int main ( void  )

Definition at line 101 of file composition_2.cpp.

References lisp(), max, folly::pushmi::pipe, and sugar().

101  {
102  mi::pool cpuPool{std::max(1u, std::thread::hardware_concurrency())};
103  mi::pool ioPool{std::max(1u, std::thread::hardware_concurrency())};
104 
105  lisp(cpuPool.executor(), ioPool.executor());
106  sugar(cpuPool.executor(), ioPool.executor());
107  pipe(cpuPool.executor(), ioPool.executor());
108 
109  ioPool.wait();
110  cpuPool.wait();
111 
112  std::cout << "OK" << std::endl;
113 }
LogLevel max
Definition: LogLevel.cpp:31
void sugar(CPUExecutor cpu, IOExecutor io)
void lisp(CPUExecutor cpu, IOExecutor io)
void pipe(CPUExecutor cpu, IOExecutor io)
template<class CPUExecutor , class IOExecutor >
void pipe ( CPUExecutor  cpu, IOExecutor  io )

Definition at line 74 of file composition_2.cpp.

References f(), g(), folly::pushmi::strands(), submit, folly::pushmi::operators::transform, and folly::pushmi::operators::via.

Referenced by BENCHMARK(), folly::Subprocess::findByChildFd(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::NotificationQueue(), folly::Subprocess::spawn(), folly::Subprocess::spawnInternal(), TEST(), and folly::TEST().

74  {
75  // f on cpu - g on cpu (implicit: a single task on the cpu executor runs all
76  // the functions)
77  mi::pipe(
78  cpu,
79  op::transform([](auto) { return f(); }),
80  op::transform([](f_t ft) { return g(ft); }),
81  op::submit([](g_t) {}));
82 
83  // f on cpu - g on cpu (explicit: the first cpu task runs f and a second cpu
84  // task runs g)
85  mi::pipe(
86  cpu,
87  op::transform([](auto) { return f(); }),
88  op::via(mi::strands(cpu)),
89  op::transform([](f_t ft) { return g(ft); }),
90  op::submit([](g_t) {}));
91 
92  // f on io - g on cpu
93  mi::pipe(
94  io,
95  op::transform([](auto) { return f(); }),
96  op::via(mi::strands(cpu)),
97  op::transform([](f_t ft) { return g(ft); }),
98  op::submit([](g_t) {}));
99 }
requires Invocable< ExecutorFactory & > && Executor< invoke_result_t< ExecutorFactory & > > && ConcurrentSequence< invoke_result_t< ExecutorFactory & > > auto strands(ExecutorFactory ef)
Definition: strand.h:246
PUSHMI_INLINE_VAR constexpr detail::transform_fn transform
Definition: transform.h:158
PUSHMI_INLINE_VAR constexpr detail::via_fn via
Definition: via.h:166
f_t f()
g_t g(f_t)
void pipe(CPUExecutor cpu, IOExecutor io)
template<class CPUExecutor , class IOExecutor >
void sugar ( CPUExecutor  cpu, IOExecutor  io )

Definition at line 57 of file composition_2.cpp.

References f(), g(), folly::pushmi::strands(), submit, folly::pushmi::operators::transform, and folly::pushmi::operators::via.

Referenced by main().

57  {
58  // f on cpu - g on cpu (implicit: a single task on the cpu executor runs all
59  // the functions)
60  cpu | op::transform([](auto) { return f(); }) |
61  op::transform([](f_t ft) { return g(ft); }) | op::submit([](g_t) {});
62 
63  // f on cpu - g on cpu (explicit: the first cpu task runs f and a second cpu
64  // task runs g)
65  cpu | op::transform([](auto) { return f(); }) | op::via(mi::strands(cpu)) |
66  op::transform([](f_t ft) { return g(ft); }) | op::submit([](g_t) {});
67 
68  // f on io - g on cpu
69  io | op::transform([](auto) { return f(); }) | op::via(mi::strands(cpu)) |
70  op::transform([](f_t ft) { return g(ft); }) | op::submit([](g_t) {});
71 }
requires Invocable< ExecutorFactory & > && Executor< invoke_result_t< ExecutorFactory & > > && ConcurrentSequence< invoke_result_t< ExecutorFactory & > > auto strands(ExecutorFactory ef)
Definition: strand.h:246
PUSHMI_INLINE_VAR constexpr detail::transform_fn transform
Definition: transform.h:158
PUSHMI_INLINE_VAR constexpr detail::via_fn via
Definition: via.h:166
f_t f()
g_t g(f_t)