proxygen
FutureDAG.h
Go to the documentation of this file.
1 /*
2  * Copyright 2015-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
#include <folly/futures/Future.h>
#include <folly/futures/SharedPromise.h>
20 
21 namespace folly {
22 
23 class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
24  public:
25  static std::shared_ptr<FutureDAG> create() {
26  return std::shared_ptr<FutureDAG>(new FutureDAG());
27  }
28 
29  typedef size_t Handle;
30  typedef std::function<Future<Unit>()> FutureFunc;
31 
32  Handle add(FutureFunc func, Executor* executor = nullptr) {
33  nodes.emplace_back(std::move(func), executor);
34  return nodes.size() - 1;
35  }
36 
37  void remove(Handle a) {
38  if (a >= nodes.size()) {
39  return;
40  }
41 
42  if (nodes[a].hasDependents) {
43  for (auto& node : nodes) {
44  auto& deps = node.dependencies;
45  deps.erase(
46  std::remove(std::begin(deps), std::end(deps), a), std::end(deps));
47  for (Handle& handle : deps) {
48  if (handle > a) {
49  handle--;
50  }
51  }
52  }
53  }
54 
55  nodes.erase(nodes.begin() + a);
56  }
57 
58  void reset() {
59  // Delete all but source node, and reset dependency properties
60  Handle source_node;
61  std::unordered_set<Handle> memo;
62  for (auto& node : nodes) {
63  for (Handle handle : node.dependencies) {
64  memo.insert(handle);
65  }
66  }
67  for (Handle handle = 0; handle < nodes.size(); handle++) {
68  if (memo.find(handle) == memo.end()) {
69  source_node = handle;
70  }
71  }
72 
73  nodes.erase(nodes.begin(), nodes.begin() + source_node);
74  nodes.erase(nodes.begin() + 1, nodes.end());
75  nodes[0].hasDependents = false;
76  nodes[0].dependencies.clear();
77  }
78 
79  void dependency(Handle a, Handle b) {
80  nodes[b].dependencies.push_back(a);
81  nodes[a].hasDependents = true;
82  }
83 
84  void clean_state(Handle source, Handle sink) {
85  for (auto handle : nodes[sink].dependencies) {
86  nodes[handle].hasDependents = false;
87  }
88  nodes[0].hasDependents = false;
89  remove(source);
90  remove(sink);
91  }
92 
94  if (hasCycle()) {
95  return makeFuture<Unit>(std::runtime_error("Cycle in FutureDAG graph"));
96  }
97  std::vector<Handle> rootNodes;
98  std::vector<Handle> leafNodes;
99  for (Handle handle = 0; handle < nodes.size(); handle++) {
100  if (nodes[handle].dependencies.empty()) {
101  rootNodes.push_back(handle);
102  }
103  if (!nodes[handle].hasDependents) {
104  leafNodes.push_back(handle);
105  }
106  }
107 
108  auto sinkHandle = add([] { return Future<Unit>(); });
109  for (auto handle : leafNodes) {
110  dependency(handle, sinkHandle);
111  }
112 
113  auto sourceHandle = add(nullptr);
114  for (auto handle : rootNodes) {
115  dependency(sourceHandle, handle);
116  }
117 
118  for (Handle handle = 0; handle < nodes.size() - 1; handle++) {
119  std::vector<Future<Unit>> dependencies;
120  for (auto depHandle : nodes[handle].dependencies) {
121  dependencies.push_back(nodes[depHandle].promise.getFuture());
122  }
123 
124  collect(dependencies)
125  .via(nodes[handle].executor)
126  .thenValue([this, handle](std::vector<Unit>&&) {
127  nodes[handle].func().then([this, handle](Try<Unit>&& t) {
128  nodes[handle].promise.setTry(std::move(t));
129  });
130  })
131  .onError([this, handle](exception_wrapper ew) {
132  nodes[handle].promise.setException(std::move(ew));
133  });
134  }
135 
136  nodes[sourceHandle].promise.setValue();
137  return nodes[sinkHandle].promise.getFuture().thenValue(
138  [that = shared_from_this(), sourceHandle, sinkHandle](Unit) {
139  that->clean_state(sourceHandle, sinkHandle);
140  });
141  }
142 
143  private:
144  FutureDAG() = default;
145 
146  bool hasCycle() {
147  // Perform a modified topological sort to detect cycles
148  std::vector<std::vector<Handle>> dependencies;
149  for (auto& node : nodes) {
150  dependencies.push_back(node.dependencies);
151  }
152 
153  std::vector<size_t> dependents(nodes.size());
154  for (auto& dependencyEdges : dependencies) {
155  for (auto handle : dependencyEdges) {
156  dependents[handle]++;
157  }
158  }
159 
160  std::vector<Handle> handles;
161  for (Handle handle = 0; handle < nodes.size(); handle++) {
162  if (!nodes[handle].hasDependents) {
163  handles.push_back(handle);
164  }
165  }
166 
167  while (!handles.empty()) {
168  auto handle = handles.back();
169  handles.pop_back();
170  while (!dependencies[handle].empty()) {
171  auto dependency = dependencies[handle].back();
172  dependencies[handle].pop_back();
173  if (--dependents[dependency] == 0) {
174  handles.push_back(dependency);
175  }
176  }
177  }
178 
179  for (auto& dependencyEdges : dependencies) {
180  if (!dependencyEdges.empty()) {
181  return true;
182  }
183  }
184 
185  return false;
186  }
187 
188  struct Node {
189  Node(FutureFunc&& funcArg, Executor* executorArg)
190  : func(std::move(funcArg)), executor(executorArg) {}
191 
192  FutureFunc func{nullptr};
193  Executor* executor{nullptr};
195  std::vector<Handle> dependencies;
196  bool hasDependents{false};
197  bool visited{false};
198  };
199 
200  std::vector<Node> nodes;
201 };
202 
203 // Polymorphic functor implementation
204 template <typename T>
206  public:
207  std::shared_ptr<FutureDAG> dag = FutureDAG::create();
209  std::vector<T> dep_states;
210  T result() {
211  return state;
212  }
213  // execReset() runs DAG & clears all nodes except for source
214  void execReset() {
215  this->dag->go().get();
216  this->dag->reset();
217  }
218  void exec() {
219  this->dag->go().get();
220  }
221  virtual void operator()() {}
222  explicit FutureDAGFunctor(T init_val) : state(init_val) {}
223  FutureDAGFunctor() : state() {}
224  virtual ~FutureDAGFunctor() {}
225 };
226 
227 } // namespace folly
char b
std::vector< T > dep_states
Definition: FutureDAG.h:209
Handle add(FutureFunc func, Executor *executor=nullptr)
Definition: FutureDAG.h:32
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
auto begin(TestAdlIterable &instance)
Definition: ForeachTest.cpp:56
SharedPromise< Unit > promise
Definition: FutureDAG.h:194
folly::std T
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
void dependency(Handle a, Handle b)
Definition: FutureDAG.h:79
Future< Unit > go()
Definition: FutureDAG.h:93
Node(FutureFunc &&funcArg, Executor *executorArg)
Definition: FutureDAG.h:189
std::vector< Node > nodes
Definition: FutureDAG.h:200
std::function< Future< Unit >()> FutureFunc
Definition: FutureDAG.h:30
constexpr auto empty(C const &c) -> decltype(c.empty())
Definition: Access.h:55
static std::shared_ptr< FutureDAG > create()
Definition: FutureDAG.h:25
auto end(TestAdlIterable &instance)
Definition: ForeachTest.cpp:62
FutureDAGFunctor(T init_val)
Definition: FutureDAG.h:222
char a
FutureDAG()=default
Definition: Try.h:51
virtual void operator()()
Definition: FutureDAG.h:221
Future< std::vector< typename std::iterator_traits< InputIterator >::value_type::value_type > > collect(InputIterator first, InputIterator last)
Definition: Future-inl.h:1536
void clean_state(Handle source, Handle sink)
Definition: FutureDAG.h:84
std::vector< Handle > dependencies
Definition: FutureDAG.h:195
virtual ~FutureDAGFunctor()
Definition: FutureDAG.h:224
state
Definition: http_parser.c:272