// Include block for the async DAG GPU net, followed by the definition of the
// boolean flag caffe2_use_nvtx (default false), which gates NVTX range
// profiling in ProfiledRange below.
// NOTE(review): nvToolsExt.h is included only under CAFFE2_USE_NVTX; the
// matching #endif is not visible in this listing.
1 #include "caffe2/core/net_async_dag_gpu.h" 5 #include <unordered_map> 6 #include <unordered_set> 8 #include "caffe2/core/operator.h" 9 #include "caffe2/core/static_tracepoint.h" 10 #include "caffe2/core/timer.h" 11 #include "caffe2/proto/caffe2.pb.h" 12 #include "caffe2/utils/proto_utils.h" 14 #include "caffe2/core/context_gpu.h" 16 #ifdef CAFFE2_USE_NVTX 17 #include <nvToolsExt.h> 20 CAFFE2_DEFINE_bool(caffe2_use_nvtx,
false,
"Use NVTX ranges for profiling");
// Second flag: caffe2_async_dag_use_multiple_streams. When set, RunAt asks
// stream() for a stream id instead of using its default.
// NOTE(review): the macro opener and the default value of this flag are not
// visible in this listing.
23 caffe2_async_dag_use_multiple_streams,
25 "Use multiple streams per thread");
// Flags declared here and read in this file (presumably defined in another
// translation unit — confirm):
//   caffe2_dag_net_collect_stats         - gates the stats sections in RunAt.
//   caffe2_net_async_finish_chain        - RunAt calls Finish() on the sink
//                                          operator's event when set.
//   caffe2_streams_per_gpu               - modulus for the per-GPU stream
//                                          counter in stream().
//   caffe2_net_async_check_stream_status - makes stream() keep looping while
//                                          the chosen stream is not free.
27 CAFFE2_DECLARE_bool(caffe2_dag_net_collect_stats);
29 CAFFE2_DECLARE_bool(caffe2_net_async_finish_chain);
31 CAFFE2_DECLARE_int(caffe2_streams_per_gpu);
33 CAFFE2_DECLARE_bool(caffe2_net_async_check_stream_status);
// Definition of the static thread_local counter vector; stream() indexes it
// by GPU id, so each thread keeps its own counter per GPU.
37 thread_local std::vector<int> AsyncDAGNet::stream_counters_;
// Colours handed to ProfiledRange. The NVTX variant sets colorType to
// NVTX_COLOR_ARGB, so these values are interpreted as ARGB.
41 using Color = int32_t;
// Used around each operator's RunAsync call in RunAt.
42 constexpr Color kRunColor = 0x0000CCFF;
// NOTE(review): not referenced in the lines visible here — confirm whether
// it is still used.
43 constexpr Color kRecordColor = 0x00FF3300;
// Used around the WaitEvents call in RunAt.
44 constexpr Color kWaitColor = 0x0066FF33;
// NVTX-enabled ProfiledRange (compiled only when CAFFE2_USE_NVTX is defined).
// The constructor opens an NVTX range labelled with the operator's type
// string and drawn in the given colour.
// NOTE(review): the class header, the bodies of the two flag checks and the
// code that ends the range are not visible in this listing.
46 #ifdef CAFFE2_USE_NVTX 50 ProfiledRange(
const OperatorDef& def, Color color) {
// Checked first when profiling is disabled at runtime; the body of this
// branch is not shown (presumably an early return — confirm).
51 if (!FLAGS_caffe2_use_nvtx) {
// Fill in the NVTX event attributes: version/size header, ARGB colour, and
// an ASCII message.
54 nvtxEventAttributes_t eventAttrib = {0};
55 eventAttrib.version = NVTX_VERSION;
56 eventAttrib.
size = NVTX_EVENT_ATTRIB_STRUCT_SIZE;
57 eventAttrib.colorType = NVTX_COLOR_ARGB;
58 eventAttrib.color = color;
59 eventAttrib.messageType = NVTX_MESSAGE_TYPE_ASCII;
// The message points into def's type string; it is not copied here.
60 eventAttrib.message.ascii = def.type().c_str();
// Start the range and keep its id; a zero id is treated as a failure.
61 range_ = nvtxRangeStartEx(&eventAttrib);
62 CAFFE_ENFORCE(range_,
"Start range is invalid.");
// Second runtime flag check; the surrounding function is not visible
// (presumably the destructor that ends the range — confirm).
66 if (!FLAGS_caffe2_use_nvtx) {
// Id of the range opened by the constructor; stays 0 if none was started.
73 nvtxRangeId_t range_ = 0;
// Copy construction and assignment are disabled for this type.
74 DISABLE_COPY_AND_ASSIGN(ProfiledRange);
// Fallback ProfiledRange whose constructor ignores both arguments and does
// nothing, so call sites need no conditional compilation.
// NOTE(review): presumably the #else branch of the CAFFE2_USE_NVTX guard;
// the #else and the class header are not visible in this listing.
81 ProfiledRange(
const OperatorDef& def, Color color) {}
// Copy construction and assignment are disabled, as in the NVTX variant.
84 DISABLE_COPY_AND_ASSIGN(ProfiledRange);
// Constructs the async DAG net: forwards net_def and ws to DAGNetBase, sizes
// the per-operator "event recorded" flags, and collects the events that mark
// the end of the net.
// NOTE(review): the declaration of the `ws` parameter and the closing braces
// are not visible in this listing.
87 #endif // ifdef CAFFE2_USE_NVTX 91 AsyncDAGNet::AsyncDAGNet(
92 const std::shared_ptr<const NetDef>& net_def,
94 : DAGNetBase(net_def, ws) {
95 VLOG(1) <<
"Constructing Async DAG Net " << net_def->name();
// One flag per operator in the net definition.
96 eventRecorded_.resize(net_def->op_size());
// For every execution chain, look at its last operator; if that operator has
// no children, its event is added to events_. Capacity is reserved for the
// worst case of one event per chain.
101 events_.reserve(execution_chains_.size());
102 for (
const auto& chain : execution_chains_) {
103 const int tail_op_idx = chain.second.back();
104 if (operator_nodes_[tail_op_idx].children_.empty()) {
105 events_.push_back(&operator_nodes_[tail_op_idx].operator_->event());
108 VLOG(1) <<
"Total " << execution_chains_.size()
109 <<
" chains, final waiting on " << events_.size() <<
" events";
// Chooses a stream id for the given device. For CUDA devices the id comes
// from a per-thread, per-GPU counter that advances by one on each pick and
// wraps modulo FLAGS_caffe2_streams_per_gpu.
// NOTE(review): the declaration of stream_id, the opening of the loop and
// the return statement are not visible in this listing.
112 int AsyncDAGNet::stream(
const DeviceOption& device_option) {
114 if (device_option.device_type() == CUDA) {
115 int gpu_id = device_option.cuda_gpu_id();
116 CAFFE_ENFORCE_GE(gpu_id, 0,
"Invalid gpu id: " + caffe2::to_string(gpu_id));
// Grow the counter vector on demand so gpu_id is a valid index; new slots
// start at 0.
// NOTE(review): this compares a signed int with an unsigned size(); it is
// safe only because gpu_id >= 0 was enforced just above.
117 if (gpu_id >= stream_counters_.size()) {
118 stream_counters_.resize(gpu_id + 1, 0);
// Take the current counter value as the stream id, then advance and wrap.
// NOTE(review): assumes FLAGS_caffe2_streams_per_gpu is positive; a zero
// value would make the modulo undefined — confirm it is validated elsewhere.
121 stream_id = stream_counters_[gpu_id]++;
122 stream_counters_[gpu_id] %= FLAGS_caffe2_streams_per_gpu;
// When stream-status checking is enabled, keep repeating while the picked
// stream is reported as not free.
123 }
while (FLAGS_caffe2_net_async_check_stream_status &&
124 !CUDAContext::IsStreamFree(device_option, stream_id));
// Runs one execution chain: waits on the events of the first operator's
// parents, runs every operator in the chain asynchronously on one stream id,
// and marks the last operator's event as recorded.
// NOTE(review): many lines of this function are not visible in this listing
// (call openers, local declarations, closing braces, the return statement);
// the comments below describe only what the visible lines show.
129 bool AsyncDAGNet::RunAt(
int chain_id,
const std::vector<int>& chain) {
130 CAFFE_ENFORCE(!chain.empty(),
"Chain should not be empty.");
// The chain's first operator decides which parents must be waited on.
131 const auto source_idx = chain.front();
132 const auto& parents = operator_nodes_[source_idx].parents_;
// Sanity check on the parents' recorded flags; the start of this check is
// not visible. The predicate tests eventRecorded_ for a parent index.
140 [
this](
int p) {
return eventRecorded_[p]; }),
141 "None of the parent is recorded for an event.");
// With multiple streams enabled, the device option of the source operator's
// event is passed on (presumably to stream() to obtain stream_id — the line
// with the call is not visible).
144 if (FLAGS_caffe2_async_dag_use_multiple_streams) {
146 operator_nodes_[source_idx].operator_->event().GetDeviceOption());
// Gather pointers to the events of every parent of the source operator.
149 std::vector<const Event*> parent_events;
150 parent_events.reserve(operator_nodes_[source_idx].parents_.size());
151 for (
auto source_parent_idx : operator_nodes_[source_idx].parents_) {
152 parent_events.push_back(
153 &operator_nodes_[source_parent_idx].operator_->event());
// Have the source operator wait on its parents' events on stream_id. The
// kWaitColor argument belongs to a ProfiledRange whose opening line is not
// visible.
157 operator_nodes_[source_idx].operator_->debug_def(), kWaitColor);
158 operator_nodes_[source_idx].operator_->WaitEvents(parent_events, stream_id);
// Optional stats: report the chain timer's elapsed microseconds into the
// stats entry for the source operator's device type. The reporting call's
// opener and the statistic's name are not visible.
161 if (FLAGS_caffe2_dag_net_collect_stats) {
162 const auto& device_option =
163 operator_nodes_[source_idx].operator_->event().GetDeviceOption();
165 stats_[device_option.device_type()],
167 task_timers_[chain_id]->MicroSeconds());
// Run each operator of the chain asynchronously on the same stream id, each
// inside a run-coloured profiling range; success accumulates every result.
172 for (
auto idx : chain) {
173 ProfiledRange r(operator_nodes_[idx].operator_->debug_def(), kRunColor);
174 success &= operator_nodes_[idx].operator_->RunAsync(stream_id);
// The chain's last operator carries the event that children wait on.
177 const auto& sink_idx = chain.back();
// If everything succeeded and the finish-chain flag is set, call Finish()
// on the sink operator's event.
178 if (success && FLAGS_caffe2_net_async_finish_chain) {
179 operator_nodes_[sink_idx].operator_->event().Finish();
// The sink's event must not have been recorded yet (the opener of this
// check is not visible); then mark it as recorded.
182 !eventRecorded_[sink_idx],
185 " should not be recorded.");
186 eventRecorded_[sink_idx] = 1;
// Optional stats: report the chain timer's elapsed microseconds under
// task_time_to_scheduled_us for the source operator's device type.
188 if (FLAGS_caffe2_dag_net_collect_stats) {
189 const auto& device_option =
190 operator_nodes_[source_idx].operator_->event().GetDeviceOption();
192 stats_[device_option.device_type()],
193 task_time_to_scheduled_us,
194 task_timers_[chain_id]->MicroSeconds());
// Starts one asynchronous run of the net.
// NOTE(review): the rest of the body, including the return statement, is
// not visible in this listing.
199 bool AsyncDAGNet::DoRunAsync() {
// Reset every per-operator "event recorded" flag to 0 before the run.
201 eventRecorded_.assign(eventRecorded_.size(), 0);
// Delegate the actual scheduling to the base class and keep its result.
203 const auto result = DAGNetBase::DoRunAsync();
// Registers AsyncDAGNet under the key async_dag (presumably so a net of that
// type name is instantiated as AsyncDAGNet — confirm against REGISTER_NET).
207 REGISTER_NET(async_dag, AsyncDAGNet);
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...