proxygen
ManualExecutor.h
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <cstdio>
20 #include <memory>
21 #include <mutex>
22 #include <queue>
23 
28 
29 namespace folly {
39  public ScheduledExecutor,
40  public SequencedExecutor {
41  public:
43 
44  void add(Func) override;
45 
52  size_t run();
53 
54  // Do work until there is no more work to do.
55  // Returns the number of functions that were executed (maybe 0).
56  // Unlike run, this method is not stable. It will chase an infinite tail of
57  // work so should be used with care.
58  // There will be no work available to perform at the moment that this
59  // returns.
60  size_t drain();
61 
63  void wait();
64 
66  void makeProgress() {
67  wait();
68  run();
69  }
70 
72  void drive() override {
73  makeProgress();
74  }
75 
77  template <class F>
78  void waitFor(F const& f) {
79  // TODO(5427828)
80 #if 0
81  while (!f.isReady())
82  makeProgress();
83 #else
84  while (!f.isReady()) {
85  run();
86  }
87 #endif
88  }
89 
90  void scheduleAt(Func&& f, TimePoint const& t) override {
91  std::lock_guard<std::mutex> lock(lock_);
92  scheduledFuncs_.emplace(t, std::move(f));
93  sem_.post();
94  }
95 
100  void advance(Duration const& dur) {
101  advanceTo(now_ + dur);
102  }
103 
106  void advanceTo(TimePoint const& t);
107 
108  TimePoint now() override {
109  return now_;
110  }
111 
114  std::size_t clear() {
115  std::queue<Func> funcs;
116  std::priority_queue<ScheduledFunc> scheduled_funcs;
117 
118  {
119  std::lock_guard<std::mutex> lock(lock_);
120  funcs_.swap(funcs);
121  scheduledFuncs_.swap(scheduled_funcs);
122  }
123 
124  return funcs.size() + scheduled_funcs.size();
125  }
126 
127  private:
129  std::queue<Func> funcs_;
131 
132  // helper class to enable ordering of scheduled events in the priority
133  // queue
134  struct ScheduledFunc {
136  size_t ordinal;
137  Func mutable func;
138 
139  ScheduledFunc(TimePoint const& t, Func&& f) : time(t), func(std::move(f)) {
140  static size_t seq = 0;
141  ordinal = seq++;
142  }
143 
144  bool operator<(ScheduledFunc const& b) const {
145  // Earlier-scheduled things must be *higher* priority
146  // in the max-based std::priority_queue
147  if (time == b.time) {
148  return ordinal > b.ordinal;
149  }
150  return time > b.time;
151  }
152 
153  Func&& moveOutFunc() const {
154  return std::move(func);
155  }
156  };
157  std::priority_queue<ScheduledFunc> scheduledFuncs_;
159 };
160 
161 } // namespace folly
auto f
void makeProgress()
Wait for work to do, and do it.
std::queue< Func > funcs_
char b
std::chrono::steady_clock::time_point TimePoint
void scheduleAt(Func &&f, TimePoint const &t) override
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
void waitFor(F const &f)
makeProgress until this Future is ready.
bool post()
Silently saturates if value is already 2^32-1.
Definition: LifoSem.h:361
STL namespace.
Gen seq(Value first, Value last)
Definition: Base.h:484
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void advance(Duration const &dur)
void wait()
Wait for work to do.
std::chrono::microseconds Duration
void drive() override
Implements DrivableExecutor.
LogLevel min
Definition: LogLevel.cpp:30
auto lock(Synchronized< D, M > &synchronized, Args &&...args)
void advanceTo(TimePoint const &t)
TimePoint now() override
Get this executor&#39;s notion of time. Must be threadsafe.
std::mutex mutex
bool operator<(ScheduledFunc const &b) const
ScheduledFunc(TimePoint const &t, Func &&f)
std::priority_queue< ScheduledFunc > scheduledFuncs_
void add(Func) override