proxygen
folly::FutureDAG Class Reference

#include <FutureDAG.h>

Inheritance diagram for folly::FutureDAG:

Classes

struct  Node
 

Public Types

typedef size_t Handle
 
typedef std::function< Future< Unit >()> FutureFunc
 

Public Member Functions

Handle add (FutureFunc func, Executor *executor=nullptr)
 
void remove (Handle a)
 
void reset ()
 
void dependency (Handle a, Handle b)
 
void clean_state (Handle source, Handle sink)
 
Future< Unit > go ()
 

Static Public Member Functions

static std::shared_ptr< FutureDAG > create ()
 

Private Member Functions

 FutureDAG ()=default
 
bool hasCycle ()
 

Private Attributes

std::vector< Node > nodes
 

Detailed Description

Definition at line 23 of file FutureDAG.h.

Member Typedef Documentation

typedef std::function<Future<Unit>()> folly::FutureDAG::FutureFunc

Definition at line 30 of file FutureDAG.h.

typedef size_t folly::FutureDAG::Handle

Definition at line 29 of file FutureDAG.h.

Constructor & Destructor Documentation

folly::FutureDAG::FutureDAG ( )
private default

Referenced by create(), and go().

Member Function Documentation

Handle folly::FutureDAG::add ( FutureFunc  func,
Executor *  executor = nullptr 
)
inline

Definition at line 32 of file FutureDAG.h.

References folly::pushmi::executor, folly::gen::move, and nodes.

Referenced by go().

32  {
33  nodes.emplace_back(std::move(func), executor);
34  return nodes.size() - 1;
35  }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
std::vector< Node > nodes
Definition: FutureDAG.h:200
void folly::FutureDAG::clean_state ( Handle  source,
Handle  sink 
)
inline

Definition at line 84 of file FutureDAG.h.

References nodes.

84  {
85  for (auto handle : nodes[sink].dependencies) {
86  nodes[handle].hasDependents = false;
87  }
88  nodes[0].hasDependents = false;
89  remove(source);
90  remove(sink);
91  }
std::vector< Node > nodes
Definition: FutureDAG.h:200
static std::shared_ptr<FutureDAG> folly::FutureDAG::create ( )
inline static

Definition at line 25 of file FutureDAG.h.

References FutureDAG().

Referenced by TEST_F().

25  {
26  return std::shared_ptr<FutureDAG>(new FutureDAG());
27  }
FutureDAG()=default
void folly::FutureDAG::dependency ( Handle  a,
Handle  b 
)
inline

Definition at line 79 of file FutureDAG.h.

References a, b, and nodes.

Referenced by go(), and hasCycle().

79  {
80  nodes[b].dependencies.push_back(a);
81  nodes[a].hasDependents = true;
82  }
char b
std::vector< Node > nodes
Definition: FutureDAG.h:200
char a
Future<Unit> folly::FutureDAG::go ( )
inline

Definition at line 93 of file FutureDAG.h.

References add(), folly::collect(), dependency(), folly::pushmi::executor, FutureDAG(), hasCycle(), folly::gen::move, nodes, and folly::pushmi::detail::t.

93  {
94  if (hasCycle()) {
95  return makeFuture<Unit>(std::runtime_error("Cycle in FutureDAG graph"));
96  }
97  std::vector<Handle> rootNodes;
98  std::vector<Handle> leafNodes;
99  for (Handle handle = 0; handle < nodes.size(); handle++) {
100  if (nodes[handle].dependencies.empty()) {
101  rootNodes.push_back(handle);
102  }
103  if (!nodes[handle].hasDependents) {
104  leafNodes.push_back(handle);
105  }
106  }
107 
108  auto sinkHandle = add([] { return Future<Unit>(); });
109  for (auto handle : leafNodes) {
110  dependency(handle, sinkHandle);
111  }
112 
113  auto sourceHandle = add(nullptr);
114  for (auto handle : rootNodes) {
115  dependency(sourceHandle, handle);
116  }
117 
118  for (Handle handle = 0; handle < nodes.size() - 1; handle++) {
119  std::vector<Future<Unit>> dependencies;
120  for (auto depHandle : nodes[handle].dependencies) {
121  dependencies.push_back(nodes[depHandle].promise.getFuture());
122  }
123 
124  collect(dependencies)
125  .via(nodes[handle].executor)
126  .thenValue([this, handle](std::vector<Unit>&&) {
127  nodes[handle].func().then([this, handle](Try<Unit>&& t) {
128  nodes[handle].promise.setTry(std::move(t));
129  });
130  })
131  .onError([this, handle](exception_wrapper ew) {
132  nodes[handle].promise.setException(std::move(ew));
133  });
134  }
135 
136  nodes[sourceHandle].promise.setValue();
137  return nodes[sinkHandle].promise.getFuture().thenValue(
138  [that = shared_from_this(), sourceHandle, sinkHandle](Unit) {
139  that->clean_state(sourceHandle, sinkHandle);
140  });
141  }
Handle add(FutureFunc func, Executor *executor=nullptr)
Definition: FutureDAG.h:32
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
void dependency(Handle a, Handle b)
Definition: FutureDAG.h:79
std::vector< Node > nodes
Definition: FutureDAG.h:200
Future< std::vector< typename std::iterator_traits< InputIterator >::value_type::value_type > > collect(InputIterator first, InputIterator last)
Definition: Future-inl.h:1536
bool folly::FutureDAG::hasCycle ( )
inline private

Definition at line 146 of file FutureDAG.h.

References dependency(), folly::empty(), and nodes.

Referenced by go().

146  {
147  // Perform a modified topological sort to detect cycles
148  std::vector<std::vector<Handle>> dependencies;
149  for (auto& node : nodes) {
150  dependencies.push_back(node.dependencies);
151  }
152 
153  std::vector<size_t> dependents(nodes.size());
154  for (auto& dependencyEdges : dependencies) {
155  for (auto handle : dependencyEdges) {
156  dependents[handle]++;
157  }
158  }
159 
160  std::vector<Handle> handles;
161  for (Handle handle = 0; handle < nodes.size(); handle++) {
162  if (!nodes[handle].hasDependents) {
163  handles.push_back(handle);
164  }
165  }
166 
167  while (!handles.empty()) {
168  auto handle = handles.back();
169  handles.pop_back();
170  while (!dependencies[handle].empty()) {
171  auto dependency = dependencies[handle].back();
172  dependencies[handle].pop_back();
173  if (--dependents[dependency] == 0) {
174  handles.push_back(dependency);
175  }
176  }
177  }
178 
179  for (auto& dependencyEdges : dependencies) {
180  if (!dependencyEdges.empty()) {
181  return true;
182  }
183  }
184 
185  return false;
186  }
void dependency(Handle a, Handle b)
Definition: FutureDAG.h:79
std::vector< Node > nodes
Definition: FutureDAG.h:200
constexpr auto empty(C const &c) -> decltype(c.empty())
Definition: Access.h:55
void folly::FutureDAG::remove ( Handle  a)
inline

Definition at line 37 of file FutureDAG.h.

References a, folly::test::begin(), folly::test::end(), and nodes.

37  {
38  if (a >= nodes.size()) {
39  return;
40  }
41 
42  if (nodes[a].hasDependents) {
43  for (auto& node : nodes) {
44  auto& deps = node.dependencies;
45  deps.erase(
46  std::remove(std::begin(deps), std::end(deps), a), std::end(deps));
47  for (Handle& handle : deps) {
48  if (handle > a) {
49  handle--;
50  }
51  }
52  }
53  }
54 
55  nodes.erase(nodes.begin() + a);
56  }
auto begin(TestAdlIterable &instance)
Definition: ForeachTest.cpp:56
std::vector< Node > nodes
Definition: FutureDAG.h:200
auto end(TestAdlIterable &instance)
Definition: ForeachTest.cpp:62
char a
void folly::FutureDAG::reset ( )
inline

Definition at line 58 of file FutureDAG.h.

References nodes.

58  {
59  // Delete all but source node, and reset dependency properties
60  Handle source_node;
61  std::unordered_set<Handle> memo;
62  for (auto& node : nodes) {
63  for (Handle handle : node.dependencies) {
64  memo.insert(handle);
65  }
66  }
67  for (Handle handle = 0; handle < nodes.size(); handle++) {
68  if (memo.find(handle) == memo.end()) {
69  source_node = handle;
70  }
71  }
72 
73  nodes.erase(nodes.begin(), nodes.begin() + source_node);
74  nodes.erase(nodes.begin() + 1, nodes.end());
75  nodes[0].hasDependents = false;
76  nodes[0].dependencies.clear();
77  }
std::vector< Node > nodes
Definition: FutureDAG.h:200

Member Data Documentation

std::vector<Node> folly::FutureDAG::nodes
private

Definition at line 200 of file FutureDAG.h.

Referenced by add(), clean_state(), dependency(), go(), hasCycle(), remove(), and reset().


The documentation for this class was generated from the following file: