1 #include "caffe2/utils/threadpool/ThreadPool.h" 2 #include "WorkersPool.h" 3 #include "caffe2/core/logging.h" 7 CAFFE2_DEFINE_bool(caffe2_threadpool_force_inline,
false,
8 "Force to always run jobs on the calling thread");
11 CAFFE2_DEFINE_int(caffe2_threadpool_android_cap,
true,
"");
14 CAFFE2_DEFINE_int(caffe2_threadpool_ios_cap,
true,
"");
// Default smallest job size (the `range` argument of ThreadPool::run()) that
// is worth splitting across worker threads; smaller jobs run on the calling
// thread. It seeds ThreadPool::minWorkSize_ in the constructor.
// NOTE(review): the platform guard selecting between these two values was
// lost. The mobile (8) vs. desktop (80) split below is reconstructed from the
// CAFFE2_ANDROID / iOS handling elsewhere in this file; confirm the exact
// condition against the original source.
#if CAFFE2_ANDROID || CAFFE2_IOS
constexpr size_t kDefaultMinWorkSize = 8;
#else
constexpr size_t kDefaultMinWorkSize = 80;
#endif
// Creates the default ThreadPool and returns it as a std::unique_ptr.
// The thread count starts at the number of logical processors reported by
// cpuinfo. A platform cap flag is then read into `applyCap`, and the visible
// code contains a path that halves the count.
// Fails via CAFFE_ENFORCE if cpuinfo cannot be initialized.
// NOTE(review): this function is only partially visible. Missing pieces:
// - the preprocessor guards around the applyCap assignments;
// - the statements following the case labels;
// - the matching #endif and the closing braces.
// How `applyCap` gates the halving is therefore not visible here. The
// comments below describe only the visible lines.
27 std::unique_ptr<ThreadPool> ThreadPool::defaultThreadPool() {
28 CAFFE_ENFORCE(cpuinfo_initialize(),
"cpuinfo initialization failed");
// Baseline: one thread per logical processor.
29 int numThreads = cpuinfo_get_processors_count();
31 bool applyCap =
false;
// NOTE(review): as shown, both assignments execute and the iOS flag wins;
// presumably each sits under its own Android / iOS #if branch — confirm.
33 applyCap = caffe2::FLAGS_caffe2_threadpool_android_cap;
35 applyCap = caffe2::FLAGS_caffe2_threadpool_ios_cap;
// Android/ARM only: dispatch on core 0's MIDR register.
// - The mask 0xFF00FFF0 keeps the implementer (bits 31:24) and part-number
//   (bits 15:4) fields and drops variant, architecture and revision, so a
//   case matches a CPU model regardless of stepping.
// - Implementer 0x51 is Qualcomm. Parts 0x211/0x201/0x205 are presumably
//   Kryo cores — confirm against vendor data.
40 #if CAFFE2_ANDROID && (CPUINFO_ARCH_ARM || CPUINFO_ARCH_ARM64) 42 switch (cpuinfo_get_core(0)->midr & UINT32_C(0xFF00FFF0)) {
43 case UINT32_C(0x51002110):
44 case UINT32_C(0x51002010):
45 case UINT32_C(0x51002050):
// NOTE(review): the code between the case labels above and this line is
// missing, so the condition under which the count is halved is not visible.
74 numThreads = numThreads / 2;
79 LOG(INFO) <<
"Constructing thread pool with " << numThreads <<
" threads";
80 return caffe2::make_unique<ThreadPool>(numThreads);
83 ThreadPool::ThreadPool(
int numThreads)
84 : minWorkSize_(kDefaultMinWorkSize), numThreads_(numThreads),
85 workersPool_(
std::make_shared<WorkersPool>()) {}
87 ThreadPool::~ThreadPool() {}
// Thread-count accessor. It takes executionMutex_, so it cannot run
// concurrently with run(), setMinWorkSize() or withPool() on the same pool.
// NOTE(review): the rest of this function (presumably `return numThreads_;`
// and the closing brace) is missing here.
89 int ThreadPool::getNumThreads()
const {
90 std::lock_guard<std::mutex> guard(executionMutex_);
// Setter for the job-size threshold that run() compares `range` against
// (minWorkSize_). It takes executionMutex_, so it is serialized with run().
// NOTE(review): the assignment (presumably `minWorkSize_ = size;`) and the
// closing brace are missing here.
97 void ThreadPool::setMinWorkSize(
size_t size) {
98 std::lock_guard<std::mutex> guard(executionMutex_);
// Applies `fn` over the index range [0, range).
// - Jobs with range < minWorkSize_, or every job when
//   caffe2_threadpool_force_inline is set, are flagged `runLocally`.
// - Otherwise the range is cut into up to numThreads_ contiguous chunks, one
//   FnTask each, and handed to workersPool_->Execute().
// The whole call holds executionMutex_, so concurrent run() / withPool()
// calls on the same pool are serialized.
// NOTE(review): many lines of this function are missing here:
// - the third runLocally operand;
// - the loop bodies;
// - FnTask's start_/end_ declarations and Run() body;
// - the empty-chunk branch body;
// - the closing braces.
// The exact arguments passed to fn are not visible — presumably
// (thread/task id, index).
102 void ThreadPool::run(
const std::function<
void(
int,
size_t)>& fn,
size_t range) {
103 std::lock_guard<std::mutex> guard(executionMutex_);
// Decide whether splitting the work across threads is worthwhile.
106 const bool runLocally = range < minWorkSize_ ||
107 FLAGS_caffe2_threadpool_force_inline ||
// Presumably the inline path (its `if (runLocally)` guard is not visible):
// visit every index on the calling thread.
112 for (
size_t i = 0; i < range; ++i) {
// Work item for one contiguous sub-range [start_, end_).
// It refers to fn through a raw pointer (fn_), so a task must not outlive
// this run() call. Presumably guaranteed by Execute() blocking — confirm.
118 struct FnTask :
public Task {
121 const std::function<void(int, size_t)> *fn_;
125 virtual void Run()
override {
126 for (
auto i = start_; i < end_; ++i) {
// Ceiling division: numThreads_ chunks of unitsPerTask indices cover range.
132 CAFFE_ENFORCE_GE(numThreads_, 1);
133 const size_t unitsPerTask = (range + numThreads_ - 1) / numThreads_;
// tasks_ appears to be a member reused across calls; executionMutex_ guards it.
134 tasks_.resize(numThreads_);
135 for (
size_t i = 0; i < numThreads_; ++i) {
137 tasks_[i].reset(
new FnTask());
// NOTE(review): C-style downcast; static_cast<FnTask *> would express the
// same conversion more explicitly.
139 auto *task = (FnTask *)tasks_[i].
get();
// Clamp both bounds so trailing chunks never extend past range.
142 task->start_ = std::min<size_t>(range, i * unitsPerTask);
143 task->end_ = std::min<size_t>(range, (i + 1) * unitsPerTask);
// Trailing chunks can be empty when range is too small to give every
// thread work. NOTE(review): the body of this branch is missing. The size
// checks below allow tasks_ to end up shorter than numThreads_.
144 if (task->start_ >= task->end_) {
148 CAFFE_ENFORCE_LE(task->start_, range);
149 CAFFE_ENFORCE_LE(task->end_, range);
// Sanity-check the task list before dispatch.
151 CAFFE_ENFORCE_LE(tasks_.size(), numThreads_);
152 CAFFE_ENFORCE_GE(tasks_.size(), 1);
// Hand the tasks to the worker pool.
153 workersPool_->Execute(tasks_);
// Calls `f` with a raw, non-owning pointer to the underlying WorkersPool.
// executionMutex_ is held for the duration, so `f` never overlaps with run()
// on this pool.
// NOTE(review): executionMutex_ is a non-recursive std::mutex. If `f` calls
// run(), getNumThreads(), setMinWorkSize() or withPool() on the same
// ThreadPool, the mutex is re-locked: undefined behavior, typically a
// deadlock. `f` should not keep the pointer beyond the call.
// NOTE(review): the closing brace of this function is missing here.
156 void ThreadPool::withPool(
const std::function<
void(WorkersPool*)>& f) {
157 std::lock_guard<std::mutex> guard(executionMutex_);
158 f(workersPool_.get());
// A global dictionary that holds information about which Caffe2 modules have been loaded in the current ...