Caffe2 - C++ API
A deep learning, cross-platform ML framework
net_async_dag_gpu.cc
1 #include "caffe2/core/net_async_dag_gpu.h"
2 
3 #include <set>
4 #include <stack>
5 #include <unordered_map>
6 #include <unordered_set>
7 
8 #include "caffe2/core/operator.h"
9 #include "caffe2/core/static_tracepoint.h"
10 #include "caffe2/core/timer.h"
11 #include "caffe2/proto/caffe2.pb.h"
12 #include "caffe2/utils/proto_utils.h"
13 
14 #include "caffe2/core/context_gpu.h"
15 
16 #ifdef CAFFE2_USE_NVTX
17 #include <nvToolsExt.h>
18 #endif
19 
20 CAFFE2_DEFINE_bool(caffe2_use_nvtx, false, "Use NVTX ranges for profiling");
21 
22 CAFFE2_DEFINE_bool(
23  caffe2_async_dag_use_multiple_streams,
24  false,
25  "Use multiple streams per thread");
26 
27 CAFFE2_DECLARE_bool(caffe2_dag_net_collect_stats);
28 
29 CAFFE2_DECLARE_bool(caffe2_net_async_finish_chain);
30 
31 CAFFE2_DECLARE_int(caffe2_streams_per_gpu);
32 
33 CAFFE2_DECLARE_bool(caffe2_net_async_check_stream_status);
34 
35 namespace caffe2 {
36 
37 thread_local std::vector<int> AsyncDAGNet::stream_counters_;
38 
39 namespace {
40 
// Colour codes handed to NVTX (interpreted as ARGB by ProfiledRange).
using Color = int32_t;
constexpr Color kRunColor{0x0000CCFF}; // blue: used while operators run
constexpr Color kRecordColor{0x00FF3300}; // red
constexpr Color kWaitColor{0x0066FF33}; // green: used while waiting on events
45 
46 #ifdef CAFFE2_USE_NVTX
47 
48 class ProfiledRange {
49  public:
50  ProfiledRange(const OperatorDef& def, Color color) {
51  if (!FLAGS_caffe2_use_nvtx) {
52  return;
53  }
54  nvtxEventAttributes_t eventAttrib = {0};
55  eventAttrib.version = NVTX_VERSION;
56  eventAttrib.size = NVTX_EVENT_ATTRIB_STRUCT_SIZE;
57  eventAttrib.colorType = NVTX_COLOR_ARGB;
58  eventAttrib.color = color;
59  eventAttrib.messageType = NVTX_MESSAGE_TYPE_ASCII;
60  eventAttrib.message.ascii = def.type().c_str();
61  range_ = nvtxRangeStartEx(&eventAttrib);
62  CAFFE_ENFORCE(range_, "Start range is invalid.");
63  }
64 
65  ~ProfiledRange() {
66  if (!FLAGS_caffe2_use_nvtx) {
67  return;
68  }
69  nvtxRangeEnd(range_);
70  }
71 
72  private:
73  nvtxRangeId_t range_ = 0;
74  DISABLE_COPY_AND_ASSIGN(ProfiledRange);
75 };
76 
77 #else
78 
79 class ProfiledRange {
80  public:
81  ProfiledRange(const OperatorDef& def, Color color) {}
82 
83  private:
84  DISABLE_COPY_AND_ASSIGN(ProfiledRange);
85 };
86 
87 #endif // ifdef CAFFE2_USE_NVTX
88 
89 } // namespace
90 
91 AsyncDAGNet::AsyncDAGNet(
92  const std::shared_ptr<const NetDef>& net_def,
93  Workspace* ws)
94  : DAGNetBase(net_def, ws) {
95  VLOG(1) << "Constructing Async DAG Net " << net_def->name();
96  eventRecorded_.resize(net_def->op_size());
97 
98  // For all chains, their tail should consist the list of events that we are
99  // needing for synchronization in the Run() inteface, unless there are other
100  // chains depending on it.
101  events_.reserve(execution_chains_.size());
102  for (const auto& chain : execution_chains_) {
103  const int tail_op_idx = chain.second.back();
104  if (operator_nodes_[tail_op_idx].children_.empty()) {
105  events_.push_back(&operator_nodes_[tail_op_idx].operator_->event());
106  }
107  }
108  VLOG(1) << "Total " << execution_chains_.size()
109  << " chains, final waiting on " << events_.size() << " events";
110 }
111 
112 int AsyncDAGNet::stream(const DeviceOption& device_option) {
113  int stream_id = 0;
114  if (device_option.device_type() == CUDA) {
115  int gpu_id = device_option.cuda_gpu_id();
116  CAFFE_ENFORCE_GE(gpu_id, 0, "Invalid gpu id: " + caffe2::to_string(gpu_id));
117  if (gpu_id >= stream_counters_.size()) {
118  stream_counters_.resize(gpu_id + 1, 0);
119  }
120  do {
121  stream_id = stream_counters_[gpu_id]++;
122  stream_counters_[gpu_id] %= FLAGS_caffe2_streams_per_gpu;
123  } while (FLAGS_caffe2_net_async_check_stream_status &&
124  !CUDAContext::IsStreamFree(device_option, stream_id));
125  }
126  return stream_id;
127 }
128 
129 bool AsyncDAGNet::RunAt(int chain_id, const std::vector<int>& chain) {
130  CAFFE_ENFORCE(!chain.empty(), "Chain should not be empty.");
131  const auto source_idx = chain.front();
132  const auto& parents = operator_nodes_[source_idx].parents_;
133  // Help ensure that our chaining is correct by verifying at least
134  // one parent recorded an event.
135  CAFFE_ENFORCE(
136  parents.empty() ||
137  std::any_of(
138  parents.begin(),
139  parents.end(),
140  [this](int p) { return eventRecorded_[p]; }),
141  "None of the parent is recorded for an event.");
142 
143  int stream_id = 0;
144  if (FLAGS_caffe2_async_dag_use_multiple_streams) {
145  stream_id = stream(
146  operator_nodes_[source_idx].operator_->event().GetDeviceOption());
147  }
148 
149  std::vector<const Event*> parent_events;
150  parent_events.reserve(operator_nodes_[source_idx].parents_.size());
151  for (auto source_parent_idx : operator_nodes_[source_idx].parents_) {
152  parent_events.push_back(
153  &operator_nodes_[source_parent_idx].operator_->event());
154  }
155  {
156  ProfiledRange r(
157  operator_nodes_[source_idx].operator_->debug_def(), kWaitColor);
158  operator_nodes_[source_idx].operator_->WaitEvents(parent_events, stream_id);
159  }
160 
161  if (FLAGS_caffe2_dag_net_collect_stats) {
162  const auto& device_option =
163  operator_nodes_[source_idx].operator_->event().GetDeviceOption();
164  CAFFE_EVENT(
165  stats_[device_option.device_type()],
166  task_wait_time_us,
167  task_timers_[chain_id]->MicroSeconds());
168  }
169 
170  // We've waited on all our parent indices.
171  bool success = true;
172  for (auto idx : chain) {
173  ProfiledRange r(operator_nodes_[idx].operator_->debug_def(), kRunColor);
174  success &= operator_nodes_[idx].operator_->RunAsync(stream_id);
175  }
176 
177  const auto& sink_idx = chain.back();
178  if (success && FLAGS_caffe2_net_async_finish_chain) {
179  operator_nodes_[sink_idx].operator_->event().Finish();
180  }
181  CAFFE_ENFORCE(
182  !eventRecorded_[sink_idx],
183  "An event for ",
184  sink_idx,
185  " should not be recorded.");
186  eventRecorded_[sink_idx] = 1;
187 
188  if (FLAGS_caffe2_dag_net_collect_stats) {
189  const auto& device_option =
190  operator_nodes_[source_idx].operator_->event().GetDeviceOption();
191  CAFFE_EVENT(
192  stats_[device_option.device_type()],
193  task_time_to_scheduled_us,
194  task_timers_[chain_id]->MicroSeconds());
195  }
196  return success;
197 }
198 
199 bool AsyncDAGNet::DoRunAsync() {
200  // Reset the event tracking at each iteration
201  eventRecorded_.assign(eventRecorded_.size(), 0);
202 
203  const auto result = DAGNetBase::DoRunAsync();
204  return result;
205 }
206 
207 REGISTER_NET(async_dag, AsyncDAGNet);
208 
209 } // namespace caffe2
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...