proxygen
ThreadedExecutor.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <atomic>
20 #include <condition_variable>
21 #include <deque>
22 #include <map>
23 #include <memory>
24 #include <mutex>
25 #include <thread>
26 
27 #include <folly/Executor.h>
29 
30 namespace folly {
31 
32 /***
33  * ThreadedExecutor
34  *
35  * An executor for blocking tasks.
36  *
37  * This executor runs each task in its own thread. It works well for tasks
38  * which mostly sleep, but works poorly for tasks which mostly compute.
39  *
40  * For each task given to the executor with `add`, the executor spawns a new
41  * thread for that task, runs the task in that thread, and joins the thread
42  * after the task has completed.
43  *
44  * Spawning and joining task threads are done in the executor's internal
45  * control thread. Calls to `add` put the tasks to be run into a queue, where
46  * the control thread will find them.
47  *
48  * There is currently no limitation on, or throttling of, concurrency.
49  *
50  * This executor is not currently optimized for performance. For example, it
51  * makes no attempt to re-use task threads. Rather, it exists primarily to
52  * offload sleep-heavy tasks from the CPU executor, where they might otherwise
53  * be run.
54  */
55 class ThreadedExecutor : public virtual folly::Executor {
56  public:
57  explicit ThreadedExecutor(
58  std::shared_ptr<ThreadFactory> threadFactory = newDefaultThreadFactory());
59  ~ThreadedExecutor() override;
60 
61  ThreadedExecutor(ThreadedExecutor const&) = delete;
63 
66 
67  void add(Func func) override;
68 
69  private:
70  static std::shared_ptr<ThreadFactory> newDefaultThreadFactory();
71 
72  void notify();
73  void control();
74  void controlWait();
75  bool controlPerformAll();
78 
79  void work(Func& func);
80 
81  std::shared_ptr<ThreadFactory> threadFactory_;
82 
83  std::atomic<bool> stopping_{false};
84 
86  std::condition_variable controlc_;
87  bool controls_ = false;
88  std::thread controlt_;
89 
91  std::deque<Func> enqueued_;
92 
93  // Accessed only by the control thread, so no synchronization.
94  std::map<std::thread::id, std::thread> running_;
95 
97  std::deque<std::thread::id> finished_;
98 };
99 } // namespace folly
std::deque< Func > enqueued_
std::condition_variable controlc_
static std::shared_ptr< ThreadFactory > newDefaultThreadFactory()
ThreadedExecutor & operator=(ThreadedExecutor const &)=delete
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::deque< std::thread::id > finished_
std::shared_ptr< ThreadFactory > threadFactory_
void add(Func func) override
std::atomic< bool > stopping_
std::mutex mutex
std::map< std::thread::id, std::thread > running_
ThreadedExecutor(std::shared_ptr< ThreadFactory > threadFactory=newDefaultThreadFactory())