Caffe2 - C++ API
A deep learning, cross-platform ML framework
recurrent_network_executor.cc
1 #include "caffe2/operators/rnn/recurrent_network_executor.h"
2 
3 #include "caffe2/core/timer.h"
4 
5 namespace caffe2 {
6 
12 template <>
13 std::unique_ptr<RecurrentNetworkExecutorBase> createRNNExecutor<CPUContext>(
14  const NetDef& step_net_def,
15  std::map<string, string>& recurrent_input_map,
16  std::string timestep_blob,
17  ArgumentHelper rnn_args) {
18  auto* exec = new ThreadedRecurrentNetworkExecutor(
19  step_net_def, recurrent_input_map, timestep_blob);
20  int num_threads =
21  rnn_args.GetSingleArgument<int>("rnn_executor.num_threads", 0);
22  if (num_threads > 0) {
23  exec->setNumThreads(num_threads);
24  LOG(INFO) << "Set num threads: " << num_threads;
25  }
26  exec->debug_ = rnn_args.GetSingleArgument<int>("rnn_executor_debug", 0);
27  return std::unique_ptr<RecurrentNetworkExecutorBase>(exec);
28 }
29 
// Body of the forward pass over T timesteps.
// NOTE(review): the signature line is absent from this listing; the reference
// index at the end of the listing gives it as `bool Run(int T) override` --
// confirm against the real source file.
// NOTE(review): with T == 0 the frontier ops of timestep_ops_[0] would still
// be queued while countdown_ is 0 -- confirm callers never pass T == 0.
34  CAFFE_ENFORCE(timestep_ops_.size() >= T); // need at least T entries in timestep_ops_
35  countdown_ = T * timestep_ops_[0].size(); // ops still to run; presumably every timestep has as many ops as timestep 0
36  finished_timesteps_ = 0;
37 
38  CHECK(task_queue_.size() == 0); // nothing may be left over in the queue
39 
40  for (auto& rnn_op : timestep_ops_[0]) {
41  // Launch "frontier"-ops first.
42  if (rnn_op.frontier) {
43  task_queue_.Push(OpTask(0, rnn_op.order, T, 1)); // timestep 0, direction +1
44  }
45  }
46 
47  _Exec(); // waits until countdown_ reaches zero; enforces that no failure was flagged
48  return true;
49 }
50 
// Body of the backward pass over T timesteps: work is seeded at the last
// timestep (T - 1) and tasks carry direction -1.
// NOTE(review): the signature line is absent from this listing; the reference
// index at the end of the listing gives it as `bool RunBackwards(int T)
// override` -- confirm against the real source file.
// NOTE(review): with T == 0, timestep_ops_[T - 1] below is indexed with -1 --
// confirm callers never pass T == 0.
55  CAFFE_ENFORCE(timestep_ops_.size() >= T); // need at least T entries in timestep_ops_
56  countdown_ = T * timestep_ops_[0].size(); // ops still to run; presumably every timestep has as many ops as timestep 0
57  finished_timesteps_ = 0;
58 
59  // Frontier
60  CHECK(task_queue_.size() == 0); // nothing may be left over in the queue
61 
62  for (auto& rnn_op : timestep_ops_[T - 1]) {
63  if (rnn_op.frontier) {
64  task_queue_.Push(OpTask(T - 1, rnn_op.order, T, -1)); // last timestep, direction -1
65  }
66  }
67 
68  _Exec(); // waits until countdown_ reaches zero; enforces that no failure was flagged
69  return true;
70 }
71 
76 void ThreadedRecurrentNetworkExecutor::RunOp(OpTask job, int thread_id) {
77  bool first_timestep =
78  ((job.forward() && job.timestep == 0) ||
79  (job.backward() && job.timestep == job.T - 1));
80  bool last_timestep =
81  ((job.backward() && job.timestep == 0) ||
82  (job.forward() && job.timestep == job.T - 1));
83  auto& rnn_op = timestep_ops_[job.timestep][job.op_idx];
84  if (rnn_op.num_dynamic_inputs > 0 && !rnn_op.frontier) {
85  CAFFE_ENFORCE_EQ(
86  rnn_op.proc_inputs,
87  rnn_op.num_dynamic_inputs -
88  first_timestep * rnn_op.num_recurrent_inputs,
89  "Error at operator ",
90  job.op_idx,
91  " on timestep ",
92  job.timestep,
93  " T=",
94  job.T,
95  " first =",
96  first_timestep);
97  }
98 
99  // Reset input dependency counter
100  rnn_op.proc_inputs = 0;
101 
102  // Run the operator
103  rnn_op.op->Run();
104 
105  // Knock down dependencies and start next ops, if this
106  // was last dependency fulfilled.
107  for (int depidx : rnn_op.dependencies) {
108  int t = job.timestep;
109  bool for_next_timestep = depidx <= rnn_op.order;
110  if (!last_timestep && for_next_timestep) {
111  t += job.direction;
112  } else if (for_next_timestep) {
113  continue;
114  }
115 
116  auto& dep_op = timestep_ops_[t][depidx];
117  int proc_inputs = dep_op.proc_inputs.fetch_add(1) + 1;
118 
119  // Schedule next op, if this was the last dependency. Note that on
120  // first timestep we don't have recurrent inputs.
121  int num_req_inputs = dep_op.num_dynamic_inputs;
122  if (first_timestep && !for_next_timestep) {
123  num_req_inputs -= dep_op.num_recurrent_inputs;
124  }
125 
126  if (proc_inputs == num_req_inputs || num_req_inputs == 0) {
127  task_queue_.Push(OpTask(t, depidx, job.T, job.direction));
128  }
129  }
130 
131  // Decrement countdown: when at zero, we have run all ops and can
132  // notify the caller thread.
133  if (countdown_.fetch_sub(1) == 1) {
134  CAFFE_ENFORCE_EQ(0, task_queue_.size());
135  std::unique_lock<std::mutex> lk(countdown_mtx_);
136  cv_.notify_one();
137  }
138 }
139 
144 void ThreadedRecurrentNetworkExecutor::WorkerFunction() {
145  size_t num_jobs = 0;
146  static std::atomic<int> seq(0);
147  int id = seq.fetch_add(1);
148 
149  while (!failed_) {
150  OpTask job;
151  if (!task_queue_.Pop(&job)) {
152  break;
153  }
154 
155  // Check for limited timestep parallelism, and if too many timesteps would
156  // be started concurrently, return the task to task queue.
157  if (max_parallel_timesteps_ > 0) {
158  int t = (job.direction == 1 ? job.timestep : job.T - job.timestep + 1);
159  if (t - finished_timesteps_ >= max_parallel_timesteps_) {
160  // Return to queue
161  task_queue_.Push(job);
162  continue;
163  }
164  }
165 
166  try {
167  RunOp(job, id);
168  if (job.op_idx == timestep_ops_template_.size() - 1) {
169  finished_timesteps_.fetch_add(1);
170  }
171  num_jobs++;
172  } catch (::caffe2::EnforceNotMet& enf) {
173  std::unique_lock<std::mutex> lk(countdown_mtx_);
174  LOG(ERROR) << "Crash at thread " << id << " timestep " << job.timestep
175  << " op:" << ProtoDebugString(step_net_def_.op(job.op_idx))
176  << enf.what();
177  task_queue_.NoMoreJobs();
178  failed_ = true;
179  cv_.notify_one();
180  return;
181  }
182  }
183  VLOG(1) << "Worker exiting, did run: " << num_jobs << " jobs";
184 }
185 
190 void ThreadedRecurrentNetworkExecutor::_Exec() {
191  CAFFE_ENFORCE_EQ(
192  false, failed_, "Tried to execute a previously failed RNN executor");
193 
194  // Start threads if not started
195  std::unique_lock<std::mutex> lk(countdown_mtx_);
196  while (workers_.size() < num_threads_) {
197  VLOG(1) << "Start RNN worker " << workers_.size() << " / " << num_threads_;
198  workers_.push_back(
199  std::thread(&ThreadedRecurrentNetworkExecutor::WorkerFunction, this));
200  }
201 
202  // Wait until threads finish.
203  Timer t;
204  while (!failed_ && countdown_ > 0) {
205  cv_.wait_for(lk, std::chrono::seconds(30), [&] {
206  // Log if we are still running, so that we catch deadlocks.. there
207  // should not be any deadlocks, but...
208  if (t.Seconds() > 10) {
209  LOG(INFO) << "RNN Executor still running, remaining ops: "
210  << countdown_;
211  }
212  return failed_ || countdown_ == 0;
213  });
214  }
215 
216  CAFFE_ENFORCE_EQ(
217  false,
218  failed_,
219  "RNN executor encountered failure. See prior error logs for details.");
220 }
221 
222 } // namespace caffe2
Data structure for a scheduled task in the task queue.
std::unique_ptr< RecurrentNetworkExecutorBase > createRNNExecutor< CPUContext >(const NetDef &step_net_def, std::map< string, string > &recurrent_input_map, std::string timestep_blob, ArgumentHelper rnn_args)
Implementation of RecurrentNetworkExecutor that uses thread pool for multithreaded execution of RNNs...
A helper class to index into arguments.
Definition: proto_utils.h:198
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...
float Seconds()
Returns the elapsed time in seconds.
Definition: timer.h:40
bool RunBackwards(int T) override
Run backward pass with T timesteps.
A simple timer object for measuring time.
Definition: timer.h:16
bool Run(int T) override
Run forward pass with T timesteps.