// Command-line flags for the async net executor.
// NOTE(review): the flag-defining macro names and default values did not
// survive in this view; only flag names and help strings are visible.
// caffe2_streams_per_gpu: modulus for the per-GPU round-robin in stream().
1 #include "caffe2/core/net_async_polling.h" 3 #include "caffe2/core/operator.h" 4 #include "caffe2/core/timer.h" 7 caffe2_streams_per_gpu,
9 "Number of streams per GPU to use in GPU thread pool");
// Declaration only (presumably defined in another translation unit); not
// referenced by the code visible here.
11 CAFFE2_DECLARE_bool(caffe2_dag_net_collect_stats);
// When set, run() calls Finish() on the last operator's event of each chain.
14 caffe2_net_async_finish_chain,
16 "Wait for chain to finish");
// Number of per-GPU pool slots; also the exclusive upper bound on GPU ids
// accepted by pool().
19 caffe2_net_async_max_gpus,
21 "Max number of GPUs allowed in net async executor");
// Number of per-NUMA-node CPU pool slots; also the exclusive upper bound on
// NUMA node ids accepted by pool().
24 caffe2_net_async_max_numa_nodes,
26 "Max number of NUMA nodes allowed in net async executor");
// Thread count for CPU pools; values <= 0 fall back to
// std::thread::hardware_concurrency() in GetAsyncNetCPUThreadPool().
29 caffe2_net_async_cpu_pool_size,
31 "Number of threads in CPU pool (default - number of cores)");
// When set, stream() keeps advancing to the next stream until isStreamFree()
// reports the candidate as free.
34 caffe2_net_async_check_stream_status,
36 "Select next non-busy stream");
40 thread_local std::vector<int> AsyncNetBase::stream_counters_;
// Builds the task graph used for asynchronous execution of `net_def`:
//  - prepares one operator node per op and records raw operator pointers;
//  - groups operators into execution chains (the schedulable "tasks");
//  - builds the chain-level dependency graph (chain_nodes_);
//  - records, per chain, a pointer to the event of the chain's last operator;
//  - sizes the per-GPU / per-NUMA-node pool tables from flags and creates the
//    default CPU thread pool.
// NOTE(review): the declaration of the `ws` parameter used below is not
// visible in this view.
42 AsyncNetBase::AsyncNetBase(
43 const std::shared_ptr<const NetDef>& net_def,
45 : NetBase(net_def, ws) {
46 operator_nodes_ = dag_utils::prepareOperatorNodes(net_def, ws);
// operators_ holds raw pointers obtained via .get(); the nodes keep ownership.
47 operators_.reserve(operator_nodes_.size());
48 for (
const auto& node : operator_nodes_) {
49 operators_.push_back(node.operator_.get());
// computeChains() yields key/value pairs; only the values (each chain being a
// list of operator indices, first op at front(), last op at back()) are kept,
// in the container's iteration order. A task id is an index into chains_.
52 const auto& execution_chains = dag_utils::computeChains(operator_nodes_);
53 chains_.reserve(execution_chains.size());
54 for (
const auto& kv : execution_chains) {
55 chains_.push_back(kv.second);
57 chain_nodes_ = dag_utils::prepareChainGraphNodes(operator_nodes_, chains_);
// One entry per chain, index-aligned with chains_: the address of the chain's
// last operator's event (the same event that event(task_id) returns).
59 events_.reserve(chains_.size());
60 for (
const auto& chain : chains_) {
61 const auto& op = operators_[chain.back()];
62 events_.push_back(&op->event());
// Pool tables start with one empty slot per allowed GPU / NUMA node; slots are
// filled in by pool_getter(). The default CPU pool is created eagerly here.
65 gpu_pools_.resize(FLAGS_caffe2_net_async_max_gpus);
66 cpu_pools_.resize(FLAGS_caffe2_net_async_max_numa_nodes);
67 DeviceOption cpu_option;
68 cpu_option.set_device_type(CPU);
69 cpu_pool_ = ThreadPoolRegistry()->Create(
70 DeviceTypeName(cpu_option.device_type()), cpu_option);
// Returns the thread pool held in pools[pool_idx], creating it through
// ThreadPoolRegistry() (keyed by the device type name of `device_option`) and
// storing it back into the slot. The lookup/create/store sequence runs while
// holding pools_mutex_.
// NOTE(review): the `pool_idx` parameter declaration, the condition guarding
// creation (presumably "slot is still empty"), and the return statement are
// not visible in this view.
73 std::shared_ptr<TaskThreadPool> AsyncNetBase::pool_getter(
74 std::vector<std::shared_ptr<TaskThreadPool>>& pools,
76 const DeviceOption& device_option) {
77 std::unique_lock<std::mutex> pools_lock(pools_mutex_);
78 auto pool = pools[pool_idx];
80 pool = ThreadPoolRegistry()->Create(
81 DeviceTypeName(device_option.device_type()), device_option);
82 pools[pool_idx] = pool;
// Selects the thread pool for `device_option`:
//  - CPU: numa_node_id == -1 takes a separate path (not visible here;
//    presumably the default cpu_pool_). Otherwise the id must be below
//    FLAGS_caffe2_net_async_max_numa_nodes and the pool comes from cpu_pools_.
//  - CUDA: the GPU id must lie in [0, FLAGS_caffe2_net_async_max_gpus) and the
//    pool comes from gpu_pools_.
//  - Any other device type appears to be rejected with an
//    "Unsupported device type ..." error.
87 std::shared_ptr<TaskThreadPool> AsyncNetBase::pool(
88 const DeviceOption& device_option) {
89 if (device_option.device_type() == CPU) {
90 auto numa_node_id = device_option.numa_node_id();
91 if (numa_node_id == -1) {
// NOTE(review): only the upper bound of the NUMA id check is visible; confirm
// a `>= 0` check precedes it, since any other negative id would index
// cpu_pools_ out of range.
96 numa_node_id < FLAGS_caffe2_net_async_max_numa_nodes,
97 "Invalid NUMA node id: " + caffe2::to_string(numa_node_id));
98 return pool_getter(cpu_pools_, numa_node_id, device_option);
100 }
// CUDA devices use one pool slot per GPU id.
else if (device_option.device_type() == CUDA) {
101 auto gpu_id = device_option.cuda_gpu_id();
103 gpu_id >= 0 && gpu_id < FLAGS_caffe2_net_async_max_gpus,
104 "Invalid GPU id: " + caffe2::to_string(gpu_id));
105 return pool_getter(gpu_pools_, gpu_id, device_option);
108 "Unsupported device type " +
109 caffe2::to_string(device_option.device_type()));
// Picks the stream id on which task `task_id` should run.
// For CUDA tasks (judged by the device option of the task's event) the id is
// chosen round-robin per GPU using the thread-local stream_counters_, which
// are grown on demand and wrap at FLAGS_caffe2_streams_per_gpu. When
// FLAGS_caffe2_net_async_check_stream_status is set, the selection appears to
// repeat until isStreamFree() reports the candidate stream as free.
// NOTE(review): the stream_id declaration/default, the loop header, and the
// return are not visible here; presumably non-CUDA tasks get stream 0.
// NOTE(review): if every stream stays busy the loop spins without yielding,
// and FLAGS_caffe2_streams_per_gpu == 0 would make the `%=` a division by zero.
113 int AsyncNetBase::stream(
int task_id) {
114 const auto& device_option = event(task_id).GetDeviceOption();
116 if (device_option.device_type() == CUDA) {
117 int gpu_id = device_option.cuda_gpu_id();
118 CAFFE_ENFORCE_GE(gpu_id, 0,
"Invalid gpu id: " + caffe2::to_string(gpu_id));
// gpu_id is known non-negative here, so comparing it against the unsigned
// size() is safe.
119 if (gpu_id >= stream_counters_.size()) {
120 stream_counters_.resize(gpu_id + 1, 0);
123 stream_id = stream_counters_[gpu_id]++;
124 stream_counters_[gpu_id] %= FLAGS_caffe2_streams_per_gpu;
125 }
while (!isStreamFree(task_id, stream_id) &&
126 FLAGS_caffe2_net_async_check_stream_status);
131 bool AsyncNetBase::isStreamFree(
int task_id,
int stream_id)
const {
132 auto& task = chains_[task_id];
133 auto& last_task_op = operators_[task.back()];
134 return last_task_op->IsStreamFree(stream_id);
// Decides whether task `task_id` may be scheduled, given the state of its
// parent tasks. For each parent, the parent's status is taken either from
// `status` (indexed by parent id) or by querying the event of the parent's
// last operator. Event::CanSchedule() then judges the parent's last-op event
// type (and, presumably, the parent status) against the child's first-op
// event type and whether that op supports async scheduling.
// NOTE(review): the `task_id` parameter, the branch choosing between `status`
// and Query() (presumably "status != nullptr"), one CanSchedule() argument,
// and the handling of its result are not visible in this view.
137 bool AsyncNetBase::canSchedule(
139 const std::vector<EventStatus>* status) {
140 auto first_child_op_id = chains_[task_id].front();
141 for (
auto parent_id : parents(task_id)) {
142 auto last_parent_op_id = chains_[parent_id].back();
143 EventStatus parent_status;
145 parent_status = status->at(parent_id);
147 parent_status = operators_[last_parent_op_id]->event().Query();
149 bool can_schedule = Event::CanSchedule(
150 operators_[last_parent_op_id]->event().GetType(),
152 operators_[first_child_op_id]->event().GetType(),
153 operators_[first_child_op_id]->SupportsAsyncScheduling());
162 int AsyncNetBase::tasksNum()
const {
163 return chains_.size();
166 Event& AsyncNetBase::event(
int task_id)
const {
167 auto& task = chains_[task_id];
168 auto& last_task_op = operators_[task.back()];
169 return last_task_op->event();
172 EventStatus AsyncNetBase::query(
int task_id)
const {
173 return event(task_id).Query();
176 const std::vector<int>& AsyncNetBase::children(
int task_id)
const {
177 const auto& task_node = chain_nodes_[task_id];
178 return task_node.children_;
181 const std::vector<int>& AsyncNetBase::parents(
int task_id)
const {
182 const auto& task_node = chain_nodes_[task_id];
183 return task_node.parents_;
// Makes the first operator of task `task_id` wait, on stream `stream_id`, for
// the events of every task listed in `wait_task_ids`. Each task's event is
// obtained through event(), i.e. the event of that task's last operator.
// NOTE(review): the `task_id` / `stream_id` parameter declarations are not
// visible in this view.
186 void AsyncNetBase::asyncWait(
189 const std::vector<int>& wait_task_ids)
const {
190 auto first_op_id = chains_[task_id].front();
191 auto& first_op = operators_[first_op_id];
// Collect non-owning pointers to the events to wait on.
192 std::vector<const Event*> events;
193 events.reserve(wait_task_ids.size());
194 for (
auto wait_task_id : wait_task_ids) {
195 events.push_back(&event(wait_task_id));
197 first_op->WaitEvents(events, stream_id);
// Runs the operators of task (chain) `task_id`, in chain order, on stream
// `stream_id`. Each operator's RunAsync() must report success (enforced).
// Failures appear to be rethrown with the failing op's type appended (or
// " unknown" when the op has no debug def); the throw statements and the
// opening `try` are not visible in this view.
// If FLAGS_caffe2_net_async_finish_chain is set, Finish() is then called on the
// event of the chain's last operator.
200 void AsyncNetBase::run(
int task_id,
int stream_id) {
202 for (
auto& op_id : chains_[task_id]) {
203 auto& op = operators_[op_id];
205 CAFFE_ENFORCE(op->RunAsync(stream_id),
"Failed to execute an op");
206 }
catch (
const std::exception& e) {
208 std::string(e.what()) +
", op " +
209 (op->has_debug_def() ? op->type() :
" unknown"));
// Catch-all path (its handler header is not visible here): reports an
// unknown error for the same op.
212 "Failed to execute task: unknown error, op " +
213 (op->has_debug_def() ? op->type() :
" unknown"));
217 if (FLAGS_caffe2_net_async_finish_chain) {
218 operators_[chains_[task_id].back()]->event().Finish();
222 void AsyncNetBase::finishTasks(
const std::unordered_set<int>& task_ids) {
223 for (
const auto& task_id : task_ids) {
224 event(task_id).Finish();
228 void AsyncNetBase::finalizeEvents() {
229 for (
auto task_id = 0; task_id < tasksNum(); ++task_id) {
230 auto status = query(task_id);
231 if (status == EventStatus::EVENT_SCHEDULED) {
232 event(task_id).Finish();
233 }
else if (status == EventStatus::EVENT_INITIALIZED) {
234 event(task_id).SetFinished();
239 AsyncNetBase::~AsyncNetBase() {}
// Defines the shared registry (presumably ThreadPoolRegistry, which is used
// above to create TaskThreadPool instances keyed by device type name). Its
// creators receive a `const DeviceOption&`.
// NOTE(review): the middle macro arguments are not visible in this view.
241 CAFFE_DEFINE_SHARED_REGISTRY(
244 const DeviceOption&);
// ThreadPoolRegistry creator for CPU devices. It checks that `device_option`
// describes a CPU device ("Unexpected device type for CPU thread pool"
// otherwise; the start of that check is not visible here). It then returns the
// shared CPU pool for the option's NUMA node.
247 std::shared_ptr<TaskThreadPool> AsyncNetCPUThreadPoolCreator(
248 const DeviceOption& device_option) {
250 device_option.device_type(),
252 "Unexpected device type for CPU thread pool");
253 return GetAsyncNetCPUThreadPool(device_option.numa_node_id());
// Registers the creator above under the CPU key. This is presumably what
// ThreadPoolRegistry()->Create(DeviceTypeName(CPU), ...) resolves to.
257 CAFFE_REGISTER_CREATOR(ThreadPoolRegistry, CPU, AsyncNetCPUThreadPoolCreator);
// Returns a CPU thread pool for `numa_node_id`, shared by all callers in the
// process. Pools are cached as weak_ptrs in a function-local static map
// guarded by a static mutex. The cache therefore does not keep a pool alive:
// once every caller releases it, lock() yields null. Presumably a new pool is
// created and cached whenever no live pool is found.
// The pool size comes from FLAGS_caffe2_net_async_cpu_pool_size, falling back
// to the hardware thread count when the flag is <= 0.
// NOTE(review): the creation guard and the final return are not visible in
// this view.
260 std::shared_ptr<TaskThreadPool> GetAsyncNetCPUThreadPool(
int numa_node_id) {
263 static std::unordered_map<int, std::weak_ptr<TaskThreadPool>> pools;
264 static std::mutex pool_mutex;
265 std::lock_guard<std::mutex> lock(pool_mutex);
267 std::shared_ptr<TaskThreadPool> shared_pool =
nullptr;
// NOTE(review): count() followed by at() performs two lookups; a single
// find() would suffice.
268 if (pools.count(numa_node_id)) {
269 shared_pool = pools.at(numa_node_id).lock();
272 auto pool_size = FLAGS_caffe2_net_async_cpu_pool_size;
273 if (pool_size <= 0) {
// hardware_concurrency() returns 0 when the value is not computable; the
// enforce below turns that into an error.
274 auto num_cores = std::thread::hardware_concurrency();
275 CAFFE_ENFORCE(num_cores > 0,
"Failed to get number of CPU cores");
276 pool_size = num_cores;
278 LOG(INFO) <<
"Using cpu pool size: " << pool_size;
279 shared_pool = std::make_shared<TaskThreadPool>(pool_size, numa_node_id);
// Cache only a weak reference (the map holds std::weak_ptr).
280 pools[numa_node_id] = shared_pool;
// A global dictionary that holds information about which Caffe2 modules have been loaded in the current ...