Caffe2 - C++ API
A deep learning, cross-platform ML framework
net_async_base.cc
1 #include "caffe2/core/net_async_polling.h"
2 
3 #include "caffe2/core/operator.h"
4 #include "caffe2/core/timer.h"
5 
// Number of CUDA streams each GPU rotates through when assigning tasks.
CAFFE2_DEFINE_int(
    caffe2_streams_per_gpu,
    32,
    "Number of streams per GPU to use in GPU thread pool");

// Defined in another translation unit; only declared here.
CAFFE2_DECLARE_bool(caffe2_dag_net_collect_stats);

// If true, run() blocks on the chain's terminal event after executing it.
CAFFE2_DEFINE_bool(
    caffe2_net_async_finish_chain,
    false,
    "Wait for chain to finish");

// Upper bound on GPU ids; sizes the per-GPU thread pool table.
CAFFE2_DEFINE_int(
    caffe2_net_async_max_gpus,
    16,
    "Max number of GPUs allowed in net async executor");

// Upper bound on NUMA node ids; sizes the per-node CPU pool table.
CAFFE2_DEFINE_int(
    caffe2_net_async_max_numa_nodes,
    8,
    "Max number of NUMA nodes allowed in net async executor");

// CPU pool size; <= 0 means "use std::thread::hardware_concurrency()".
CAFFE2_DEFINE_int(
    caffe2_net_async_cpu_pool_size,
    0,
    "Number of threads in CPU pool (default - number of cores)");

// If true, stream() keeps rotating until it finds a non-busy stream.
CAFFE2_DEFINE_bool(
    caffe2_net_async_check_stream_status,
    true,
    "Select next non-busy stream");
37 
38 namespace caffe2 {
39 
// Per-thread round-robin counters, indexed by GPU id; used by stream() to
// rotate over the available CUDA streams without cross-thread contention.
thread_local std::vector<int> AsyncNetBase::stream_counters_;
41 
42 AsyncNetBase::AsyncNetBase(
43  const std::shared_ptr<const NetDef>& net_def,
44  Workspace* ws)
45  : NetBase(net_def, ws) {
46  operator_nodes_ = dag_utils::prepareOperatorNodes(net_def, ws);
47  operators_.reserve(operator_nodes_.size());
48  for (const auto& node : operator_nodes_) {
49  operators_.push_back(node.operator_.get());
50  }
51 
52  const auto& execution_chains = dag_utils::computeChains(operator_nodes_);
53  chains_.reserve(execution_chains.size());
54  for (const auto& kv : execution_chains) {
55  chains_.push_back(kv.second);
56  }
57  chain_nodes_ = dag_utils::prepareChainGraphNodes(operator_nodes_, chains_);
58 
59  events_.reserve(chains_.size());
60  for (const auto& chain : chains_) {
61  const auto& op = operators_[chain.back()];
62  events_.push_back(&op->event());
63  }
64 
65  gpu_pools_.resize(FLAGS_caffe2_net_async_max_gpus);
66  cpu_pools_.resize(FLAGS_caffe2_net_async_max_numa_nodes);
67  DeviceOption cpu_option;
68  cpu_option.set_device_type(CPU);
69  cpu_pool_ = ThreadPoolRegistry()->Create(
70  DeviceTypeName(cpu_option.device_type()), cpu_option);
71 }
72 
73 std::shared_ptr<TaskThreadPool> AsyncNetBase::pool_getter(
74  std::vector<std::shared_ptr<TaskThreadPool>>& pools,
75  int pool_idx,
76  const DeviceOption& device_option) {
77  std::unique_lock<std::mutex> pools_lock(pools_mutex_);
78  auto pool = pools[pool_idx];
79  if (!pool) {
80  pool = ThreadPoolRegistry()->Create(
81  DeviceTypeName(device_option.device_type()), device_option);
82  pools[pool_idx] = pool;
83  }
84  return pool;
85 }
86 
87 std::shared_ptr<TaskThreadPool> AsyncNetBase::pool(
88  const DeviceOption& device_option) {
89  if (device_option.device_type() == CPU) {
90  auto numa_node_id = device_option.numa_node_id();
91  if (numa_node_id == -1) {
92  return cpu_pool_;
93  } else {
94  CAFFE_ENFORCE(
95  numa_node_id >= 0 &&
96  numa_node_id < FLAGS_caffe2_net_async_max_numa_nodes,
97  "Invalid NUMA node id: " + caffe2::to_string(numa_node_id));
98  return pool_getter(cpu_pools_, numa_node_id, device_option);
99  }
100  } else if (device_option.device_type() == CUDA) {
101  auto gpu_id = device_option.cuda_gpu_id();
102  CAFFE_ENFORCE(
103  gpu_id >= 0 && gpu_id < FLAGS_caffe2_net_async_max_gpus,
104  "Invalid GPU id: " + caffe2::to_string(gpu_id));
105  return pool_getter(gpu_pools_, gpu_id, device_option);
106  } else {
107  CAFFE_THROW(
108  "Unsupported device type " +
109  caffe2::to_string(device_option.device_type()));
110  }
111 }
112 
113 int AsyncNetBase::stream(int task_id) {
114  const auto& device_option = event(task_id).GetDeviceOption();
115  int stream_id = 0;
116  if (device_option.device_type() == CUDA) {
117  int gpu_id = device_option.cuda_gpu_id();
118  CAFFE_ENFORCE_GE(gpu_id, 0, "Invalid gpu id: " + caffe2::to_string(gpu_id));
119  if (gpu_id >= stream_counters_.size()) {
120  stream_counters_.resize(gpu_id + 1, 0);
121  }
122  do {
123  stream_id = stream_counters_[gpu_id]++;
124  stream_counters_[gpu_id] %= FLAGS_caffe2_streams_per_gpu;
125  } while (!isStreamFree(task_id, stream_id) &&
126  FLAGS_caffe2_net_async_check_stream_status);
127  }
128  return stream_id;
129 }
130 
131 bool AsyncNetBase::isStreamFree(int task_id, int stream_id) const {
132  auto& task = chains_[task_id];
133  auto& last_task_op = operators_[task.back()];
134  return last_task_op->IsStreamFree(stream_id);
135 }
136 
137 bool AsyncNetBase::canSchedule(
138  int task_id,
139  const std::vector<EventStatus>* status) {
140  auto first_child_op_id = chains_[task_id].front();
141  for (auto parent_id : parents(task_id)) {
142  auto last_parent_op_id = chains_[parent_id].back();
143  EventStatus parent_status;
144  if (status) {
145  parent_status = status->at(parent_id);
146  } else {
147  parent_status = operators_[last_parent_op_id]->event().Query();
148  }
149  bool can_schedule = Event::CanSchedule(
150  operators_[last_parent_op_id]->event().GetType(),
151  parent_status,
152  operators_[first_child_op_id]->event().GetType(),
153  operators_[first_child_op_id]->SupportsAsyncScheduling());
154  if (!can_schedule) {
155  return false;
156  }
157  }
158 
159  return true;
160 }
161 
162 int AsyncNetBase::tasksNum() const {
163  return chains_.size();
164 }
165 
166 Event& AsyncNetBase::event(int task_id) const {
167  auto& task = chains_[task_id];
168  auto& last_task_op = operators_[task.back()];
169  return last_task_op->event();
170 }
171 
172 EventStatus AsyncNetBase::query(int task_id) const {
173  return event(task_id).Query();
174 }
175 
176 const std::vector<int>& AsyncNetBase::children(int task_id) const {
177  const auto& task_node = chain_nodes_[task_id];
178  return task_node.children_;
179 }
180 
181 const std::vector<int>& AsyncNetBase::parents(int task_id) const {
182  const auto& task_node = chain_nodes_[task_id];
183  return task_node.parents_;
184 }
185 
186 void AsyncNetBase::asyncWait(
187  int task_id,
188  int stream_id,
189  const std::vector<int>& wait_task_ids) const {
190  auto first_op_id = chains_[task_id].front();
191  auto& first_op = operators_[first_op_id];
192  std::vector<const Event*> events;
193  events.reserve(wait_task_ids.size());
194  for (auto wait_task_id : wait_task_ids) {
195  events.push_back(&event(wait_task_id));
196  }
197  first_op->WaitEvents(events, stream_id);
198 }
199 
200 void AsyncNetBase::run(int task_id, int stream_id) {
201  std::string err_msg;
202  for (auto& op_id : chains_[task_id]) {
203  auto& op = operators_[op_id];
204  try {
205  CAFFE_ENFORCE(op->RunAsync(stream_id), "Failed to execute an op");
206  } catch (const std::exception& e) {
207  CAFFE_THROW(
208  std::string(e.what()) + ", op " +
209  (op->has_debug_def() ? op->type() : " unknown"));
210  } catch (...) {
211  CAFFE_THROW(
212  "Failed to execute task: unknown error, op " +
213  (op->has_debug_def() ? op->type() : " unknown"));
214  }
215  }
216 
217  if (FLAGS_caffe2_net_async_finish_chain) {
218  operators_[chains_[task_id].back()]->event().Finish();
219  }
220 }
221 
222 void AsyncNetBase::finishTasks(const std::unordered_set<int>& task_ids) {
223  for (const auto& task_id : task_ids) {
224  event(task_id).Finish();
225  }
226 }
227 
228 void AsyncNetBase::finalizeEvents() {
229  for (auto task_id = 0; task_id < tasksNum(); ++task_id) {
230  auto status = query(task_id);
231  if (status == EventStatus::EVENT_SCHEDULED) {
232  event(task_id).Finish();
233  } else if (status == EventStatus::EVENT_INITIALIZED) {
234  event(task_id).SetFinished();
235  }
236  }
237 }
238 
239 AsyncNetBase::~AsyncNetBase() {}
240 
// Registry mapping device type names to TaskThreadPool factories; used by
// the constructor and pool_getter() above to create pools on demand.
CAFFE_DEFINE_SHARED_REGISTRY(
    ThreadPoolRegistry,
    TaskThreadPool,
    const DeviceOption&);
245 
namespace {
// Factory registered for CPU below: validates the device type and delegates
// to the shared, NUMA-node-keyed pool cache.
std::shared_ptr<TaskThreadPool> AsyncNetCPUThreadPoolCreator(
    const DeviceOption& device_option) {
  CAFFE_ENFORCE_EQ(
      device_option.device_type(),
      CPU,
      "Unexpected device type for CPU thread pool");
  return GetAsyncNetCPUThreadPool(device_option.numa_node_id());
}
} // namespace

CAFFE_REGISTER_CREATOR(ThreadPoolRegistry, CPU, AsyncNetCPUThreadPoolCreator);
258 
259 /* static */
260 std::shared_ptr<TaskThreadPool> GetAsyncNetCPUThreadPool(int numa_node_id) {
261  // Note: numa_node_id = -1 (DeviceOption's default value) corresponds to
262  // no NUMA used
263  static std::unordered_map<int, std::weak_ptr<TaskThreadPool>> pools;
264  static std::mutex pool_mutex;
265  std::lock_guard<std::mutex> lock(pool_mutex);
266 
267  std::shared_ptr<TaskThreadPool> shared_pool = nullptr;
268  if (pools.count(numa_node_id)) {
269  shared_pool = pools.at(numa_node_id).lock();
270  }
271  if (!shared_pool) {
272  auto pool_size = FLAGS_caffe2_net_async_cpu_pool_size;
273  if (pool_size <= 0) {
274  auto num_cores = std::thread::hardware_concurrency();
275  CAFFE_ENFORCE(num_cores > 0, "Failed to get number of CPU cores");
276  pool_size = num_cores;
277  }
278  LOG(INFO) << "Using cpu pool size: " << pool_size;
279  shared_pool = std::make_shared<TaskThreadPool>(pool_size, numa_node_id);
280  pools[numa_node_id] = shared_pool;
281  }
282  return shared_pool;
283 }
284 
285 } // namespace caffe2
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...