Caffe2 - C++ API
A deep learning, cross platform ML framework
net_singlethread_async_gpu.cc
1 #include <condition_variable>
2 #include <mutex>
3 #include <stack>
4 
5 #if !defined(_MSC_VER) && !defined(__APPLE__)
6 #include <sched.h>
7 #endif
8 
9 #include "caffe2/core/context_gpu.h"
10 #include "caffe2/core/net_simple.h"
11 #include "caffe2/core/operator.h"
12 #include "caffe2/proto/caffe2.pb.h"
13 
14 namespace caffe2 {
15 
16 namespace gpu_single_thread {
17 
// A unit of work handed to a GPUExecutor: a list of operators to launch on
// one CUDA stream, plus the synchronization state the submitting thread
// waits on. All pointer members are borrowed, not owned.
struct Task {
 // Operators to launch asynchronously on stream_id_ (not owned).
 std::vector<std::unique_ptr<OperatorBase>>* ops_;
 // Notified by the executor once the task's stream has synchronized.
 std::condition_variable* cv_;
 // Guards done_; held while setting done_ and notifying cv_.
 std::mutex* mtx_;
 // Assigned by the executor worker when the task is scheduled.
 int stream_id_;
 // Set to true (under *mtx_) when all ops have finished on the stream.
 bool done_ = false;
};
25 
26 class GPUExecutor {
27  public:
28  explicit GPUExecutor(int gpu_id) : gpu_id_(gpu_id) {}
29 
30  ~GPUExecutor() {
31  queue_.NoMoreJobs();
32  thread_.join();
33  }
34 
35  void RunJob(Task* task) {
36  queue_.Push(task);
37  }
38 
39  void start() {
40  thread_ = std::thread(&GPUExecutor::WorkerFunction, this);
41  }
42 
43  static std::shared_ptr<GPUExecutor> Get(int gpu);
44  static void Release(int gpu);
45 
46  private:
47  void set_affinity();
48  void WorkerFunction();
49 
50  std::thread thread_;
51  int gpu_id_;
52  SimpleQueue<Task*> queue_;
53  static std::shared_ptr<GPUExecutor> executors_[CAFFE2_COMPILE_TIME_MAX_GPUS];
54  static std::mutex gpu_mtx_[CAFFE2_COMPILE_TIME_MAX_GPUS];
55 };
56 
// Out-of-class definitions for the static per-GPU executor registry
// declared inside GPUExecutor; indexed by GPU id.
std::shared_ptr<GPUExecutor>
    GPUExecutor::executors_[CAFFE2_COMPILE_TIME_MAX_GPUS];
std::mutex GPUExecutor::gpu_mtx_[CAFFE2_COMPILE_TIME_MAX_GPUS];
60 
61 std::shared_ptr<GPUExecutor> GPUExecutor::Get(int gpu) {
62  std::lock_guard<std::mutex> grd(gpu_mtx_[gpu]);
63  if (!executors_[gpu].get()) {
64  executors_[gpu].reset(new GPUExecutor(gpu));
65  executors_[gpu].get()->start();
66  }
67  return executors_[gpu];
68 }
69 
70 void GPUExecutor::Release(int gpu) {
71  std::lock_guard<std::mutex> grd(gpu_mtx_[gpu]);
72  if (executors_[gpu].use_count() == 1) {
73  executors_[gpu].reset();
74  }
75 }
76 
77 void GPUExecutor::set_affinity() {
78 // TODO: find a Windows-compatible affinity setting approach.
79 // Currently, set_affinity has no effect in Windows. The code is still
80 // correct with possible slowdowns.
81 #if !defined(_MSC_VER) && !defined(__APPLE__)
82  /* Set CPU affinity */
83  int num_cores = std::thread::hardware_concurrency();
84  if (num_cores > 0) {
85  cpu_set_t mask;
86  CPU_ZERO(&mask);
87 
88  CPU_SET(gpu_id_ % num_cores, &mask);
89  if (sched_setaffinity(0, sizeof(cpu_set_t), &mask)) {
90  LOG(WARNING) << "Could not set CPU affinity";
91  }
92  }
93 #endif
94 }
95 
// Worker that takes list of operators from the queue
// and executes them.
void GPUExecutor::WorkerFunction() {
 // Next fresh stream id to hand out when no recycled one is available.
 int stream_id_seq = 0;
 // Pool of stream ids whose previous task has fully synchronized;
 // reused to bound the number of streams created.
 std::stack<int> streams;
 set_affinity();

 while (true) {
 Task* task = nullptr;
 vector<Task*> task_batch;

 // Blocks for the next task; returns false once NoMoreJobs() has been
 // called and the queue is drained, terminating the worker thread.
 if (!queue_.Pop(&task)) {
 return;
 }
 int num_tasks = 1 + queue_.size();

 // Grab all tasks currently in queue so we can run them in parallel
 // Since we have only one producer, we know this does not block

 // TODO: launch ops in "zig-zag" manner so that we can start multiple
 // streams as simultaneously as possible
 for (int i = num_tasks - 1; i >= 0; i--) {
 assert(task != nullptr);
 // Give each task its own stream: recycle a synchronized stream id if
 // one is available, otherwise mint a new one.
 if (streams.empty()) {
 task->stream_id_ = stream_id_seq++;
 } else {
 task->stream_id_ = streams.top();
 streams.pop();
 }

 // Launch every op of this task asynchronously on its stream.
 for (auto& op : *task->ops_) {
 op->RunAsync(task->stream_id_);
 }
 task_batch.push_back(task);

 // Get the next one
 if (i > 0) {
 if (!queue_.Pop(&task)) {
 return;
 }
 }
 }

 // Wait for the currently executing streams
 for (auto& pendtask : task_batch) {
 cudaStream_t stream =
 CUDAContext::cuda_stream(gpu_id_, pendtask->stream_id_);
 CUDA_ENFORCE(cudaStreamSynchronize(stream));
 // Stream is idle again; return its id to the reuse pool.
 streams.push(pendtask->stream_id_);
 // Mark the task done under its mutex and wake the submitting thread
 // (which waits on cv_ in Run()).
 std::unique_lock<std::mutex> lk(*pendtask->mtx_);
 pendtask->done_ = true;
 pendtask->cv_->notify_one();
 }
 }
}
151 
153  public:
154  using SimpleNet::SimpleNet;
155 
157  if (executor_.get()) {
 // Explicitly reset my holding of the executor so it can be
 // killed.
160  executor_.reset();
161  GPUExecutor::Release(gpu_id_);
162  }
163  }
164 
165  bool Run() override {
166  if (!executor_.get()) {
167  initialize();
168  }
169 
170  // Dispatch jobs to the gpu-specific executor thread
171  std::unique_lock<std::mutex> lk(mutex_);
172  Task t;
173  t.ops_ = &operators_;
174  t.cv_ = &cv_;
175  t.mtx_ = &mutex_;
176  t.done_ = false;
177  executor_.get()->RunJob(&t);
178 
179  while (!t.done_) {
180  cv_.wait(lk);
181  }
182 
183  return true;
184  }
185 
 private:
 // Signaled by the executor thread when a submitted task completes.
 std::condition_variable cv_;
 // Guards Task::done_ and serializes Run()/initialize().
 std::mutex mutex_;
189 
190  void initialize() {
191  std::lock_guard<std::mutex> grd(mutex_);
192 
193  /* Check the gpu id of this net and check that only one
194  GPU has operators on this net */
195  gpu_id_ = (-1);
196  for (auto& op : operators_) {
197  if (op->device_option().device_type() == CUDA) {
198  if (gpu_id_ < 0) {
199  gpu_id_ = op->device_option().cuda_gpu_id();
200  } else {
201  CAFFE_ENFORCE_EQ(
202  gpu_id_,
203  op->device_option().cuda_gpu_id(),
204  "One net can only have operators for one GPU");
205  }
206  }
207  }
208  executor_ = GPUExecutor::Get(gpu_id_);
209  }
210 
 // GPU id this net's CUDA operators target; assigned in initialize().
 int gpu_id_;
 // Shared, lazily-started executor thread for gpu_id_; non-null after
 // initialize() succeeds.
 std::shared_ptr<GPUExecutor> executor_;
213 };
214 
// Expose this net implementation under the name "singlethread_async".
REGISTER_NET(singlethread_async, SingleThreadAsyncNet)
216 
217 } // namespace gpu_single_thread
218 } // namespace caffe2
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...