1 #include <condition_variable> 5 #if !defined(_MSC_VER) && !defined(__APPLE__) 9 #include "caffe2/core/context_gpu.h" 10 #include "caffe2/core/net_simple.h" 11 #include "caffe2/core/operator.h" 12 #include "caffe2/proto/caffe2.pb.h" 16 namespace gpu_single_thread {
// Fragment of a task descriptor (the enclosing struct's header is not visible
// in this chunk). A task is a unit of work handed to a GPUExecutor.
// NOTE(review): the stray leading numerals ("19", "20", ...) on these lines
// look like original-file line numbers fused in by extraction — confirm
// against the upstream file before treating them as code.
19 std::vector<std::unique_ptr<OperatorBase>>* ops_;  // non-owning: operators to run for this task
20 std::condition_variable* cv_;  // non-owning: signaled when the task completes (notified in WorkerFunction)
// Interior of class GPUExecutor (class header not visible in this chunk):
// a per-GPU worker that drains a task queue on a dedicated thread.
// Binds this executor to a single GPU device id.
28 explicit GPUExecutor(
int gpu_id) : gpu_id_(gpu_id) {}
// Enqueues a task for the worker thread.
// NOTE(review): original lines 36-39 are missing here — RunJob's body
// (presumably a queue push) and the header of the method that spawns the
// thread (likely start()) were lost in extraction; confirm upstream.
35 void RunJob(
Task* task) {
40 thread_ = std::thread(&GPUExecutor::WorkerFunction,
this);
// Returns the (shared) executor for the given GPU, creating it if needed.
43 static std::shared_ptr<GPUExecutor> Get(
int gpu);
// Drops the per-GPU table's reference when no outside owner remains.
44 static void Release(
int gpu);
// Worker-thread main loop: pops tasks, launches ops, synchronizes streams.
48 void WorkerFunction();
// In-class declarations of the per-GPU singleton table and the mutexes that
// guard creation/release of each slot (one entry per possible GPU).
53 static std::shared_ptr<GPUExecutor> executors_[CAFFE2_COMPILE_TIME_MAX_GPUS];
54 static std::mutex gpu_mtx_[CAFFE2_COMPILE_TIME_MAX_GPUS];
// Out-of-class definitions of the static storage declared above.
57 std::shared_ptr<GPUExecutor>
58 GPUExecutor::executors_[CAFFE2_COMPILE_TIME_MAX_GPUS];
59 std::mutex GPUExecutor::gpu_mtx_[CAFFE2_COMPILE_TIME_MAX_GPUS];
// Lazily creates, starts, and returns the shared executor for `gpu`.
// Creation is serialized by the per-GPU mutex so concurrent callers get the
// same instance.
61 std::shared_ptr<GPUExecutor> GPUExecutor::Get(
int gpu) {
62 std::lock_guard<std::mutex> grd(gpu_mtx_[gpu]);
63 if (!executors_[gpu].
get()) {
// NOTE(review): original line 64 is missing from this chunk — it must
// populate executors_[gpu] (presumably reset(new GPUExecutor(gpu))),
// otherwise the ->start() call below would dereference null. Confirm
// against the upstream file.
65 executors_[gpu].get()->start();
67 return executors_[gpu];
// Drops the table's reference to the executor for `gpu` once the table is the
// last owner (use_count() == 1), letting the executor shut down.
// NOTE(review): the use_count() check is racy in general, but here it runs
// under the same per-GPU mutex that guards Get(), which serializes ownership
// changes through the table.
70 void GPUExecutor::Release(
int gpu) {
71 std::lock_guard<std::mutex> grd(gpu_mtx_[gpu]);
72 if (executors_[gpu].use_count() == 1) {
73 executors_[gpu].reset();
// Pins the worker thread to a CPU core chosen from the GPU id (gpu_id_ modulo
// the number of hardware threads). Linux-only: compiled out on MSVC/Apple.
// NOTE(review): lines between original 83 and 88 are missing from this chunk —
// they presumably declare the cpu_set_t `mask` and CPU_ZERO it; confirm
// against the upstream file.
77 void GPUExecutor::set_affinity() {
81 #if !defined(_MSC_VER) && !defined(__APPLE__) 83 int num_cores = std::thread::hardware_concurrency();
88 CPU_SET(gpu_id_ % num_cores, &mask);
// sched_setaffinity returns non-zero on failure; affinity is best-effort,
// so failure is only logged, not fatal.
89 if (sched_setaffinity(0,
sizeof(cpu_set_t), &mask)) {
90 LOG(WARNING) <<
"Could not set CPU affinity";
// Worker-thread main loop. Pops tasks from queue_, assigns each a CUDA stream
// id (reusing ids from a free-stack before minting new ones), launches every
// operator of the task asynchronously, then synchronizes each task's stream
// and signals completion through the task's condvar.
// NOTE(review): many interior lines (original 101-103, 106, 108-116, 121,
// 123-125, 128, 130-139, 148+) are missing from this chunk — the outer loop
// header and the handling of a failed Pop (presumably shutdown/break) are not
// visible; confirm control flow against the upstream file.
98 void GPUExecutor::WorkerFunction() {
99 int stream_id_seq = 0;
100 std::stack<int> streams;  // pool of stream ids whose previous task has completed
104 Task* task =
nullptr;
105 vector<Task*> task_batch;  // tasks launched this round, to be synchronized below
107 if (!queue_.Pop(&task)) {
// Drain whatever is currently queued as one batch: the popped task plus
// everything still pending.
110 int num_tasks = 1 + queue_.size();
117 for (
int i = num_tasks - 1; i >= 0; i--) {
118 assert(task !=
nullptr);
// Prefer a recycled stream id; only mint a new one when the pool is empty.
119 if (streams.empty()) {
120 task->stream_id_ = stream_id_seq++;
122 task->stream_id_ = streams.top();
// Launch all of the task's operators asynchronously on its stream.
126 for (
auto& op : *task->ops_) {
127 op->RunAsync(task->stream_id_);
129 task_batch.push_back(task);
133 if (!queue_.Pop(&task)) {
// After launching the whole batch, wait for each task's stream, recycle its
// stream id, and wake the submitter under the task's own mutex.
140 for (
auto& pendtask : task_batch) {
141 cudaStream_t stream =
142 CUDAContext::cuda_stream(gpu_id_, pendtask->stream_id_);
143 CUDA_ENFORCE(cudaStreamSynchronize(stream));
144 streams.push(pendtask->stream_id_);
145 std::unique_lock<std::mutex> lk(*pendtask->mtx_);
146 pendtask->done_ =
true;
147 pendtask->cv_->notify_one();
// Interior of a net class built on SimpleNet (class header not visible in
// this chunk). It forwards all of its operators as one Task to the shared
// per-GPU executor and blocks until the executor marks the task done.
// NOTE(review): many interior lines are missing (e.g. the destructor header
// around 156-160, the wait on cv_/done_ after 177, CAFFE_ENFORCE context
// around 199-204) — confirm details against the upstream file.
154 using SimpleNet::SimpleNet;
// Destructor fragment: release this net's claim on the per-GPU executor.
157 if (executor_.get()) {
161 GPUExecutor::Release(gpu_id_);
// Runs the net once: packages operators_ into a Task, hands it to the
// executor, and (in lines missing here) presumably waits on cv_ until done_.
165 bool Run()
override {
166 if (!executor_.get()) {
171 std::unique_lock<std::mutex> lk(mutex_);
173 t.ops_ = &operators_;
177 executor_.get()->RunJob(&t);
187 std::condition_variable cv_;
// Constructor fragment: scan the operators to discover the single GPU id this
// net targets; the string on line 204 shows mixing GPUs is enforced away.
191 std::lock_guard<std::mutex> grd(mutex_);
196 for (
auto& op : operators_) {
197 if (op->device_option().device_type() == CUDA) {
199 gpu_id_ = op->device_option().cuda_gpu_id();
203 op->device_option().cuda_gpu_id(),
204 "One net can only have operators for one GPU");
// Acquire (and lazily create) the shared executor for the discovered GPU.
208 executor_ = GPUExecutor::Get(gpu_id_);
212 std::shared_ptr<GPUExecutor> executor_;
A global dictionary that holds information about what Caffe2 modules have been loaded in the current runtime.