Caffe2 - C++ API
A deep-learning, cross-platform ML framework
ThreadPool.cc
1 #include "caffe2/utils/threadpool/ThreadPool.h"
2 #include "WorkersPool.h"
3 #include "caffe2/core/logging.h"
4 
5 #include <cpuinfo.h>
6 
7 CAFFE2_DEFINE_bool(caffe2_threadpool_force_inline, false,
8  "Force to always run jobs on the calling thread");
9 
10 // Whether or not threadpool caps apply to Android
11 CAFFE2_DEFINE_int(caffe2_threadpool_android_cap, true, "");
12 
13 // Whether or not threadpool caps apply to iOS
14 CAFFE2_DEFINE_int(caffe2_threadpool_ios_cap, true, "");
15 
16 
17 namespace caffe2 {
18 
// Initial value of ThreadPool's minimum work size: ThreadPool::run executes
// any range smaller than this inline on the calling thread instead of
// partitioning it across workers. setMinWorkSize() can change it at runtime.
// Android builds start from a much lower threshold than other platforms.
#if !CAFFE2_ANDROID
constexpr size_t kDefaultMinWorkSize = 80;
#else
constexpr size_t kDefaultMinWorkSize = 8;
#endif
26 
27 std::unique_ptr<ThreadPool> ThreadPool::defaultThreadPool() {
28  CAFFE_ENFORCE(cpuinfo_initialize(), "cpuinfo initialization failed");
29  int numThreads = cpuinfo_get_processors_count();
30 
31  bool applyCap = false;
32 #if CAFFE2_ANDROID
33  applyCap = caffe2::FLAGS_caffe2_threadpool_android_cap;
34 #elif CAFFE2_IOS
35  applyCap = caffe2::FLAGS_caffe2_threadpool_ios_cap;
36 #endif
37 
38  if (applyCap) {
39  switch (numThreads) {
40 #if CAFFE2_ANDROID && (CPUINFO_ARCH_ARM || CPUINFO_ARCH_ARM64)
41  case 4:
42  switch (cpuinfo_get_core(0)->midr & UINT32_C(0xFF00FFF0)) {
43  case UINT32_C(0x51002110): /* Snapdragon 820 Kryo Silver */
44  case UINT32_C(0x51002010): /* Snapdragon 821 Kryo Silver */
45  case UINT32_C(0x51002050): /* Snapdragon 820/821 Kryo Gold */
46  /* Kryo: 2+2 big.LITTLE */
47  numThreads = 2;
48  break;
49  default:
50  /* Anything else: assume homogeneous architecture */
51  numThreads = 4;
52  break;
53  }
54  break;
55 #endif
56  case 5:
57  /* 4+1 big.LITTLE */
58  numThreads = 4;
59  break;
60  case 6:
61  /* 2+4 big.LITTLE */
62  numThreads = 2;
63  break;
64  case 8:
65  /* 4+4 big.LITTLE */
66  numThreads = 4;
67  break;
68  case 10:
69  /* 4+4+2 Min.Med.Max, running on Med cores */
70  numThreads = 4;
71  break;
72  default:
73  if (numThreads > 4) {
74  numThreads = numThreads / 2;
75  }
76  break;
77  }
78  }
79  LOG(INFO) << "Constructing thread pool with " << numThreads << " threads";
80  return caffe2::make_unique<ThreadPool>(numThreads);
81 }
82 
83 ThreadPool::ThreadPool(int numThreads)
84  : minWorkSize_(kDefaultMinWorkSize), numThreads_(numThreads),
85  workersPool_(std::make_shared<WorkersPool>()) {}
86 
87 ThreadPool::~ThreadPool() {}
88 
89 int ThreadPool::getNumThreads() const {
90  std::lock_guard<std::mutex> guard(executionMutex_);
91  return numThreads_;
92 }
93 
94 // Sets the minimum work size (range) for which to invoke the
95 // threadpool; work sizes smaller than this will just be run on the
96 // main (calling) thread
97 void ThreadPool::setMinWorkSize(size_t size) {
98  std::lock_guard<std::mutex> guard(executionMutex_);
99  minWorkSize_ = size;
100 }
101 
102 void ThreadPool::run(const std::function<void(int, size_t)>& fn, size_t range) {
103  std::lock_guard<std::mutex> guard(executionMutex_);
104  // If there are no worker threads, or if the range is too small (too
105  // little work), just run locally
106  const bool runLocally = range < minWorkSize_ ||
107  FLAGS_caffe2_threadpool_force_inline ||
108  (numThreads_ == 0);
109  if (runLocally) {
110  // Work is small enough to just run locally; multithread overhead
111  // is too high
112  for (size_t i = 0; i < range; ++i) {
113  fn(0, i);
114  }
115  return;
116  }
117 
118  struct FnTask : public Task {
119  FnTask(){};
120  virtual ~FnTask(){};
121  const std::function<void(int, size_t)> *fn_;
122  int idx_;
123  size_t start_;
124  size_t end_;
125  virtual void Run() override {
126  for (auto i = start_; i < end_; ++i) {
127  (*fn_)(idx_, i);
128  }
129  }
130  };
131 
132  CAFFE_ENFORCE_GE(numThreads_, 1);
133  const size_t unitsPerTask = (range + numThreads_ - 1) / numThreads_;
134  tasks_.resize(numThreads_);
135  for (size_t i = 0; i < numThreads_; ++i) {
136  if (!tasks_[i]) {
137  tasks_[i].reset(new FnTask());
138  }
139  auto *task = (FnTask *)tasks_[i].get();
140  task->fn_ = &fn;
141  task->idx_ = i;
142  task->start_ = std::min<size_t>(range, i * unitsPerTask);
143  task->end_ = std::min<size_t>(range, (i + 1) * unitsPerTask);
144  if (task->start_ >= task->end_) {
145  tasks_.resize(i);
146  break;
147  }
148  CAFFE_ENFORCE_LE(task->start_, range);
149  CAFFE_ENFORCE_LE(task->end_, range);
150  }
151  CAFFE_ENFORCE_LE(tasks_.size(), numThreads_);
152  CAFFE_ENFORCE_GE(tasks_.size(), 1);
153  workersPool_->Execute(tasks_);
154 }
155 
156 void ThreadPool::withPool(const std::function<void(WorkersPool*)>& f) {
157  std::lock_guard<std::mutex> guard(executionMutex_);
158  f(workersPool_.get());
159 }
160 
161 } // namespace caffe2
Definition: types.h:72
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...