/* Copyright 2015 Adobe Systems Incorporated Distributed under the MIT License (see license at http://stlab.adobe.com/licenses.html) This file is intended as example code and is not production quality. */ /**************************************************************************************************/ #include #include #include #include #include #include /**************************************************************************************************/ using namespace std; using namespace boost::multiprecision; /**************************************************************************************************/ template T power(T x, N n, O op) { if (n == 0) return identity_element(op); while ((n & 1) == 0) { n >>= 1; x = op(x, x); } T result = x; n >>= 1; while (n != 0) { x = op(x, x); if ((n & 1) != 0) result = op(result, x); n >>= 1; } return result; } /**************************************************************************************************/ template struct multiply_2x2 { array operator()(const array& x, const array& y) { return { x[0] * y[0] + x[1] * y[2], x[0] * y[1] + x[1] * y[3], x[2] * y[0] + x[3] * y[2], x[2] * y[1] + x[3] * y[3] }; } }; template array identity_element(const multiply_2x2&) { return { N(1), N(0), N(0), N(1) }; } template R fibonacci(N n) { if (n == 0) return R(0); return power(array{ 1, 1, 1, 0 }, N(n - 1), multiply_2x2())[0]; } /**************************************************************************************************/ using lock_t = unique_lock; class notification_queue { deque> _q; bool _done{false}; mutex _mutex; condition_variable _ready; public: bool try_pop(function& x) { lock_t lock{_mutex, try_to_lock}; if (!lock || _q.empty()) return false; x = move(_q.front()); _q.pop_front(); return true; } template bool try_push(F&& f) { { lock_t lock{_mutex, try_to_lock}; if (!lock) return false; _q.emplace_back(forward(f)); } _ready.notify_one(); return true; } void done() { { unique_lock lock{_mutex}; _done = true; } _ready.notify_all(); } bool pop(function& x) { lock_t lock{_mutex}; while (_q.empty() && !_done) _ready.wait(lock); if (_q.empty()) return false; x = move(_q.front()); _q.pop_front(); return true; } template void push(F&& f) { { lock_t lock{_mutex}; _q.emplace_back(forward(f)); } _ready.notify_one(); } }; /**************************************************************************************************/ class task_system { const unsigned _count{thread::hardware_concurrency()}; vector _threads; vector _q{_count}; atomic _index{0}; void run(unsigned i) { while (true) { function f; for (unsigned n = 0; n != _count * 32; ++n) { if (_q[(i + n) % _count].try_pop(f)) break; } if (!f && !_q[i].pop(f)) break; f(); } } public: task_system() { for (unsigned n = 0; n != _count; ++n) { _threads.emplace_back([&, n]{ run(n); }); } } ~task_system() { for (auto& e : _q) e.done(); for (auto& e : _threads) e.join(); } template void async_(F&& f) { auto i = _index++; for (unsigned n = 0; n != _count; ++n) { if (_q[(i + n) % _count].try_push(forward(f))) return; } _q[i % _count].push(forward(f)); } }; /**************************************************************************************************/ task_system _system; /**************************************************************************************************/ template struct result_of_; template struct result_of_ { using type = R; }; template using result_of_t_ = typename result_of_::type; /**************************************************************************************************/ template struct shared_base { vector _r; // optional mutex _mutex; condition_variable _ready; vector> _then; virtual ~shared_base() { } void set(R&& r) { vector> then; { lock_t lock{_mutex}; _r.push_back(move(r)); swap(_then, then); } _ready.notify_all(); for (const auto& f : then) _system.async_(move(f)); } template void then(F&& f) { bool resolved{false}; { lock_t lock{_mutex}; if (_r.empty()) _then.push_back(forward(f)); else resolved = true; } if (resolved) _system.async_(move(f)); } const R& get() { lock_t lock{_mutex}; while (_r.empty()) _ready.wait(lock); return _r.back(); } }; template struct shared; // not defined template struct shared : shared_base { function _f; template shared(F&& f) : _f(forward(f)) { } template void operator()(A&&... args) { this->set(_f(forward(args)...)); _f = nullptr; } }; template class packaged_task; //not defined template class future; template auto package(F&& f) -> pair, future>>; template class future { shared_ptr> _p; template friend auto package(F&& f) -> pair, future>>; explicit future(shared_ptr> p) : _p(move(p)) { } public: future() = default; template auto then(F&& f) { auto pack = package()>([p = _p, f = forward(f)](){ return f(p->_r.back()); }); _p->then(move(pack.first)); return pack.second; } const R& get() const { return _p->get(); } }; template class packaged_task { weak_ptr> _p; template friend auto package(F&& f) -> pair, future>>; explicit packaged_task(weak_ptr> p) : _p(move(p)) { } public: packaged_task() = default; template void operator()(A&&... args) const { auto p = _p.lock(); if (p) (*p)(forward(args)...); } }; template auto package(F&& f) -> pair, future>> { auto p = make_shared>(forward(f)); return make_pair(packaged_task(p), future>(p)); } /**************************************************************************************************/ template auto async(F&& f, Args&&... args) { using result_type = result_of_t; using packaged_type = packaged_task; auto pack = package(bind(forward(f), forward(args)...)); _system.async_(move(get<0>(pack))); return get<1>(pack); } /**************************************************************************************************/ int main() { future x = async([]{ return fibonacci(100); }); future y = x.then([](const cpp_int& x){ return cpp_int(x * 2); }); future z = x.then([](const cpp_int& x){ return cpp_int(x / 15); }); cout << y.get() << endl; cout << z.get() << endl; }