// NOTE(review): this is a line-numbered listing with gaps; the comments here
// describe only the statements that are actually visible.
//
// FutureDAG derives from std::enable_shared_from_this, which lets member code
// obtain an owning pointer to the object (shared_from_this() is used by the
// continuation that calls clean_state()).
23 class FutureDAG :
public std::enable_shared_from_this<FutureDAG> {
// Factory: allocates a FutureDAG with `new` and hands ownership to a
// std::shared_ptr.
25 static std::shared_ptr<FutureDAG>
create() {
26 return std::shared_ptr<FutureDAG>(
new FutureDAG());
// Returns the index of the last element of `nodes`.
// NOTE(review): presumably the tail of add(); the enclosing signature is not
// visible in this listing.
34 return nodes.size() - 1;
// remove(a): operates on the node identified by handle `a`.
// NOTE(review): only part of the body is visible in this listing.
37 void remove(Handle
a) {
// The other nodes' dependency lists are scanned only when `a` is flagged as
// having dependents.
42 if (
nodes[
a].hasDependents) {
43 for (
auto& node :
nodes) {
44 auto& deps = node.dependencies;
// Handles are bound by non-const reference.
// NOTE(review): presumably so they can be rewritten in place; the loop body
// is not visible here -- confirm.
47 for (Handle& handle : deps) {
// `memo` is a set of handles built while walking every node's dependency
// list.
// NOTE(review): the statement inside the inner loop is not visible;
// presumably each handle that appears as a dependency is inserted -- confirm.
61 std::unordered_set<Handle> memo;
62 for (
auto& node :
nodes) {
63 for (Handle handle : node.dependencies) {
// Scan every handle and single out those that are absent from `memo`.
// NOTE(review): the body of the `if` (presumably assigning source_node) is
// not visible.
67 for (Handle handle = 0; handle < nodes.size(); handle++) {
68 if (memo.find(handle) == memo.end()) {
// Erase everything before index `source_node`, then everything after the new
// first element, so only the node that was at `source_node` remains.
73 nodes.erase(nodes.begin(), nodes.begin() + source_node);
74 nodes.erase(nodes.begin() + 1, nodes.end());
// Return the surviving node to an unconnected state.
75 nodes[0].hasDependents =
false;
76 nodes[0].dependencies.clear();
// Records an edge between `a` and `b`: `a` is appended to b's dependency list
// and `a` is flagged as having a dependent.
// NOTE(review): presumably the body of dependency(Handle a, Handle b); the
// signature line is not visible in this listing.
80 nodes[
b].dependencies.push_back(a);
81 nodes[
a].hasDependents =
true;
// Clears the hasDependents flag on every node that `sink` lists as a
// dependency.
// NOTE(review): presumably part of clean_state(Handle source, Handle sink);
// the signature and any remaining statements are not visible here.
85 for (
auto handle :
nodes[sink].dependencies) {
86 nodes[handle].hasDependents =
false;
// Node 0 gets the same flag cleared unconditionally.
88 nodes[0].hasDependents =
false;
// Early exit: hand back a Future<Unit> that carries a std::runtime_error.
// NOTE(review): the guarding condition is not visible; presumably a cycle
// check -- confirm.
95 return makeFuture<Unit>(std::runtime_error(
"Cycle in FutureDAG graph"));
// Classify every node: no dependencies -> rootNodes, no dependents ->
// leafNodes. The two tests are independent, so a node can land in both lists.
97 std::vector<Handle> rootNodes;
98 std::vector<Handle> leafNodes;
99 for (Handle handle = 0; handle <
nodes.size(); handle++) {
100 if (
nodes[handle].dependencies.empty()) {
101 rootNodes.push_back(handle);
103 if (!
nodes[handle].hasDependents) {
104 leafNodes.push_back(handle);
// NOTE(review): the body of this loop over the leaves is not visible.
109 for (
auto handle : leafNodes) {
// Add one more node with a null function and remember its handle.
113 auto sourceHandle =
add(
nullptr);
// NOTE(review): the body of this loop over the roots is not visible.
114 for (
auto handle : rootNodes) {
// Visit every handle except the last one (bound is nodes.size() - 1).
118 for (Handle handle = 0; handle <
nodes.size() - 1; handle++) {
// Gather one future per dependency, taken from that dependency's promise.
119 std::vector<Future<Unit>> dependencies;
120 for (
auto depHandle :
nodes[handle].dependencies) {
121 dependencies.push_back(
nodes[depHandle].promise.getFuture());
// NOTE(review): this continuation captures raw `this`, whereas the final
// continuation below captures shared_from_this(); confirm the DAG is
// guaranteed to outlive these callbacks.
126 .thenValue([
this, handle](std::vector<Unit>&&) {
// Fulfil the source node's promise.
136 nodes[sourceHandle].promise.setValue();
// The returned future is chained off the sink node's promise; its
// continuation holds a shared_ptr to this object and calls clean_state().
137 return nodes[sinkHandle].promise.getFuture().thenValue(
138 [that = shared_from_this(), sourceHandle, sinkHandle](
Unit) {
139 that->clean_state(sourceHandle, sinkHandle);
// Work on a private copy of every node's dependency list so the edges can be
// consumed below without touching `nodes`.
148 std::vector<std::vector<Handle>> dependencies;
149 for (
auto& node :
nodes) {
150 dependencies.push_back(node.dependencies);
// dependents[h] counts how many times handle h appears as a dependency.
153 std::vector<size_t> dependents(nodes.size());
154 for (
auto& dependencyEdges : dependencies) {
155 for (
auto handle : dependencyEdges) {
156 dependents[handle]++;
// Seed the work list with every node that has no dependents.
160 std::vector<Handle> handles;
161 for (Handle handle = 0; handle < nodes.size(); handle++) {
162 if (!nodes[handle].hasDependents) {
163 handles.push_back(handle);
// Process the work list from the back, consuming each node's copied edges
// one at a time.
// NOTE(review): the statements that pop `handles` and update `dependents`
// are not visible in this listing.
167 while (!handles.empty()) {
168 auto handle = handles.back();
170 while (!dependencies[handle].
empty()) {
171 auto dependency = dependencies[handle].back();
172 dependencies[handle].pop_back();
// Final pass: look for any edge list that was not fully consumed.
// NOTE(review): presumably a leftover edge means the graph has a cycle; the
// statement inside the `if` is not visible -- confirm.
179 for (
auto& dependencyEdges : dependencies) {
180 if (!dependencyEdges.empty()) {
// Template over a single type parameter T.
// NOTE(review): the declaration this header introduces is not visible;
// presumably FutureDAGFunctor, per the symbol index further down.
204 template <
typename T>
// Both visible statements call go() on the held `dag` and then get() on the
// returned future.
// NOTE(review): the enclosing member functions are not visible in this
// listing.
215 this->dag->go().get();
219 this->dag->go().get();
std::vector< T > dep_states
Handle add(FutureFunc func, Executor *executor=nullptr)
constexpr detail::Map< Move > move
auto begin(TestAdlIterable &instance)
SharedPromise< Unit > promise
—— Concurrent Priority Queue Implementation ——
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
void dependency(Handle a, Handle b)
Node(FutureFunc &&funcArg, Executor *executorArg)
std::vector< Node > nodes
std::function< Future< Unit >()> FutureFunc
constexpr auto empty(C const &c) -> decltype(c.empty())
static std::shared_ptr< FutureDAG > create()
auto end(TestAdlIterable &instance)
FutureDAGFunctor(T init_val)
virtual void operator()()
Future< std::vector< typename std::iterator_traits< InputIterator >::value_type::value_type > > collect(InputIterator first, InputIterator last)
void clean_state(Handle source, Handle sink)
std::vector< Handle > dependencies
virtual ~FutureDAGFunctor()