proxygen
then_execute_2.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <algorithm>
17 #include <cassert>
18 #include <iostream>
19 #include <vector>
20 
21 #include <futures.h>
22 #include <futures_static_thread_pool.h>
23 #include <atomic>
24 #include <functional>
25 #include <memory>
26 #include <thread>
27 #include <utility>
28 
30 
35 
36 using namespace folly::pushmi::aliases;
37 
38 struct inline_executor {
39  public:
40  friend bool operator==(
41  const inline_executor&,
42  const inline_executor&) noexcept {
43  return true;
44  }
45  friend bool operator!=(
46  const inline_executor&,
47  const inline_executor&) noexcept {
48  return false;
49  }
50  template <class Function>
51  void execute(Function f) const noexcept {
52  f();
53  }
54  constexpr bool query(std::experimental::execution::oneway_t) {
55  return true;
56  }
57  constexpr bool query(std::experimental::execution::twoway_t) {
58  return false;
59  }
60  constexpr bool query(std::experimental::execution::single_t) {
61  return true;
62  }
63 };
64 
65 namespace p1054 {
66 // A promise refers to a promise and is associated with a future,
67 // either through type-erasure or through construction of an
68 // underlying promise with an overload of make_promise_contract().
69 
70 // make_promise_contract() cannot be written to produce a lazy future.
71 // the promise has to exist prior to .then() getting a continuation.
72 // there must be a shared allocation to connect the promise and future.
73 template <class T, class Executor>
74 std::pair<
75  std::experimental::standard_promise<T>,
76  std::experimental::standard_future<T, std::decay_t<Executor>>>
77 make_promise_contract(const Executor& e) {
78  std::experimental::standard_promise<T> promise;
79  auto ex = e;
80  return {promise, promise.get_future(std::move(ex))};
81 }
82 
83 template <class Executor, class Function, class Future>
84 std::experimental::standard_future<
85  std::result_of_t<
86  Function(std::decay_t<typename std::decay_t<Future>::value_type>&&)>,
87  std::decay_t<Executor>>
88 then_execute(Executor&& e, Function&& f, Future&& pred) {
89  using V = std::decay_t<typename std::decay_t<Future>::value_type>;
90  using T = std::result_of_t<Function(V &&)>;
91  auto pc = make_promise_contract<T>(e);
92  auto p = std::get<0>(pc);
93  auto r = std::get<1>(pc);
94  ((Future &&) pred).then([e, p, f](V v) mutable {
95  e.execute([p, f, v]() mutable { p.set_value(f(v)); });
96  return 0;
97  });
98  return r;
99 }
100 
101 } // namespace p1054
102 
103 namespace p1055 {
104 
105 template <class Executor, class Function, class Future>
106 auto then_execute(Executor&& e, Function&& f, Future&& pred) {
107  return pred | op::via(mi::strands(e)) |
108  op::transform([f](auto v) { return f(v); });
109 }
110 
111 } // namespace p1055
112 
113 int main() {
114  mi::pool p{std::max(1u, std::thread::hardware_concurrency())};
115 
116  std::experimental::futures_static_thread_pool sp{
117  std::max(1u, std::thread::hardware_concurrency())};
118 
119  auto pc = p1054::make_promise_contract<int>(inline_executor{});
120  auto& pr = std::get<0>(pc);
121  auto& r = std::get<1>(pc);
122  auto f = p1054::then_execute(
123  sp.executor(), [](int v) { return v * 2; }, std::move(r));
124  pr.set_value(42);
125  f.get();
126 
127  p1055::then_execute(p.executor(), [](int v) { return v * 2; }, op::just(21)) |
128  op::get<int>;
129 
130  sp.stop();
131  sp.wait();
132  p.stop();
133  p.wait();
134 
135  std::cout << "OK" << std::endl;
136 }
propagate_const< T > pc
auto f
auto v
requires Invocable< ExecutorFactory & > &&Executor< invoke_result_t< ExecutorFactory & > > &&ConcurrentSequence< invoke_result_t< ExecutorFactory & > > auto strands(ExecutorFactory ef)
Definition: strand.h:246
LogLevel max
Definition: LogLevel.cpp:31
std::pair< std::experimental::standard_promise< T >, std::experimental::standard_future< T, std::decay_t< Executor > > > make_promise_contract(const Executor &e)
int main()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
void execute(Function f) const noexcept
friend bool operator==(const inline_executor &, const inline_executor &) noexcept
friend bool operator!=(const inline_executor &, const inline_executor &) noexcept
folly::std T
constexpr bool query(std::experimental::execution::oneway_t)
requires E e noexcept(noexcept(s.error(std::move(e))))
PUSHMI_INLINE_VAR constexpr detail::transform_fn transform
Definition: transform.h:158
PUSHMI_INLINE_VAR constexpr detail::via_fn via
Definition: via.h:166
std::experimental::standard_future< std::result_of_t< Function(std::decay_t< typename std::decay_t< Future >::value_type > &&)>, std::decay_t< Executor > > then_execute(Executor &&e, Function &&f, Future &&pred)
auto then_execute(Executor &&e, Function &&f, Future &&pred)
constexpr bool query(std::experimental::execution::single_t)
constexpr bool query(std::experimental::execution::twoway_t)
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::operators::just_fn just