Caffe2 - C++ API
A deep learning, cross platform ML framework
recurrent_network_executor.h
1 #ifndef CAFFE2_OPERATORS_RECURRENT_NETWORK_EXECUTOR_H_
2 #define CAFFE2_OPERATORS_RECURRENT_NETWORK_EXECUTOR_H_
3 
4 #include <map>
5 #include <unordered_set>
6 #include <vector>
7 
8 #include "caffe2/core/context.h"
9 #include "caffe2/core/logging.h"
10 #include "caffe2/core/operator.h"
11 #include "caffe2/core/timer.h"
12 #include "caffe2/operators/rnn/recurrent_network_executor_incl.h"
13 #include "caffe2/operators/rnn/rnn_capable_operator_observer.h"
14 
15 namespace caffe2 {
16 
32 class RecurrentNetworkExecutorBase {
33  protected:
34  RecurrentNetworkExecutorBase(
35  const NetDef& step_net_def,
36  std::map<string, string>& recurrent_input_map,
37  std::string timestep_blob)
38  : step_net_def_(step_net_def),
39  recurrent_input_map_(recurrent_input_map),
40  timestep_blob_(timestep_blob) {
41  for (int i = 0; i < step_net_def_.op_size(); i++) {
42  op_deps_.push_back(op_deps(i));
43  }
44  }
45 
46  public:
47  virtual ~RecurrentNetworkExecutorBase() {
48  if (debug_) {
49  if (timestep_ops_.size() > 0) {
50  PrintInfo(0);
51  }
52  }
53  }
54 
55  virtual bool Run(int T) = 0;
56 
57  virtual bool RunBackwards(int T) = 0;
58 
66  void EnsureTimestepInitialized(
67  int t,
68  Workspace* ws,
69  const std::vector<std::unique_ptr<ObserverBase<OperatorBase>>>&
70  observers_list) {
71  if (timestep_ops_template_.size() == 0) {
72  // First invocation -- compute dependencies
73  CalculateInternalDependencies();
74 
75  // Label ops based on whether they contain a reference to the timestep
76  // blob. This is an optimization to avoid string comparisons later.
77  for (auto& rnn_op : timestep_ops_template_) {
78  rnn_op.has_timestep_blob = false;
79  const OperatorDef& op = step_net_def_.op(rnn_op.order);
80  for (int i = 0; i < op.input_size(); i++) {
81  if (op.input(i) == timestep_blob_) {
82  rnn_op.has_timestep_blob = true;
83  break;
84  }
85  }
86  CAFFE_ENFORCE(
87  !HasOutput(op, timestep_blob_),
88  "Timestep cannot be output of an op: ",
89  timestep_blob_,
90  " op=" + ProtoDebugString(op));
91  }
92  }
93 
94  // Initialize this timestep if it has not been initialized yet
95  if (timestep_ops_.size() <= t ||
96  (timestep_ops_.size() > t && timestep_ops_[t].size() == 0)) {
97  // Initialize empty timestep ops vectors for each timestep preceding
98  // this.
99  for (int j = timestep_ops_.size(); j < t + 1; j++) {
100  timestep_ops_.push_back(std::vector<RNNNetOperator>());
101  timestep_ops_.back().reserve(timestep_ops_template_.size());
102  }
103 
104  // Keep track of workspaces for optimization in the forward-only case
105  if (workspaces_.size() < t + 1) {
106  workspaces_.resize(t + 1);
107  }
108  workspaces_[t] = ws;
109 
110  // Create a specific timestep blob for this timestep. This is to
111  // avoid conflicting timestep blobs when reusing workspaces, as with
112  // the forward-only mode.
113  std::string this_timestep_blob =
114  timestep_blob_ + "_rnnexec_t" + caffe2::to_string(t);
115  ws->CreateBlob(this_timestep_blob)->GetMutable<TensorCPU>()->Resize(1);
116  auto b = ws->GetBlob(this_timestep_blob);
117  CAFFE_ENFORCE(b);
118  b->GetMutable<TensorCPU>()->mutable_data<int32_t>()[0] = t;
119 
120  // Copy the operators from template
121  for (auto& template_rnn_op : timestep_ops_template_) {
122  auto& rnn_op = template_rnn_op;
123 
124  // For ops that have the timestep blob as an input we need to
125  // create a new operator definition with the timestep-specific
126  // timestep blob. This is required to avoid race conditions when
127  // multiple timesteps execute in parallel.
128  if (rnn_op.has_timestep_blob) {
129  OperatorDef op_copy = step_net_def_.op(rnn_op.order);
130 
131  for (int i = 0; i < op_copy.input_size(); i++) {
132  if (op_copy.input(i) == timestep_blob_) {
133  op_copy.set_input(i, this_timestep_blob);
134  }
135  }
136 
137  rnn_op.op = CreateOperator(op_copy, ws);
138  for (const auto& observer : observers_list) {
139  auto rnn_observer =
140  dynamic_cast_if_rtti<const RNNCapableOperatorObserver*>(
141  observer.get());
142  if (rnn_observer) {
143  std::unique_ptr<ObserverBase<OperatorBase>> rnn_observer_copy =
144  rnn_observer->rnnCopy(rnn_op.op.get(), rnn_op.order);
145  CAFFE_ENFORCE(
146  rnn_observer_copy,
147  "Observers without rnnCopy() implemented cannot be attached "
148  "to RNN using RNNExecutor.");
149  rnn_op.op->AttachObserver(std::move(rnn_observer_copy));
150  }
151  }
152  } else {
153  // Optimization for forward-only models where workspaces can be shared
154  // across timesteps: then we can just copy the op reference.
155  if (t > max_parallel_timesteps_ && max_parallel_timesteps_ > 0 &&
156  workspaces_[t - max_parallel_timesteps_] == ws) {
157  rnn_op.op =
158  timestep_ops_[t - max_parallel_timesteps_][rnn_op.order].op;
159  } else {
160  // Otherwise, we need to create a brand new op with the workspace
161  // owned by this timestep.
162  rnn_op.op = CreateOperator(step_net_def_.op(rnn_op.order), ws);
163  for (const auto& observer : observers_list) {
164  auto rnn_observer =
165  dynamic_cast_if_rtti<const RNNCapableOperatorObserver*>(
166  observer.get());
167  if (rnn_observer) {
168  std::unique_ptr<ObserverBase<OperatorBase>> rnn_observer_copy =
169  rnn_observer->rnnCopy(rnn_op.op.get(), rnn_op.order);
170  CAFFE_ENFORCE(
171  rnn_observer_copy,
172  "Observers without rnnCopy() implemented cannot be attached "
173  "to RNN using RNNExecutor.");
174  rnn_op.op->AttachObserver(std::move(rnn_observer_copy));
175  }
176  }
177  }
178  }
179  rnn_op.op->DisableEvent();
180 
181  timestep_ops_[t].emplace_back(rnn_op);
182  }
183  }
184  }
185 
191  void SetMaxParallelTimesteps(int p) {
192  max_parallel_timesteps_ = p;
193  }
194 
195  size_t NumObserversStepNet() {
196  size_t num = 0;
197  for (auto& ops_at_timestep_t : timestep_ops_) {
198  for (auto& rnn_op : ops_at_timestep_t) {
199  num += rnn_op.op->NumObservers();
200  }
201  }
202  return num;
203  }
204 
205  private:
206  // Utility method to check if any of the op's inputs or control inputs
207  // contain the given blob.
208  bool has_input(std::string x, int opidx) {
209  for (auto& inp : step_net_def_.op(opidx).input()) {
210  if (inp == x) {
211  return true;
212  }
213  }
214  for (auto& inp : step_net_def_.op(opidx).control_input()) {
215  if (inp == x) {
216  return true;
217  }
218  }
219  return false;
220  }
221 
222  // Return all outbound dependencies of an op. Special case for
223  // RNN dependencies, which are set in recurrent_network_op.
224  std::vector<string> op_deps(int i) {
225  std::vector<string> outs;
226  auto& opdef = step_net_def_.op(i);
227  for (string o : opdef.output()) {
228  outs.push_back(o);
229  }
230  for (auto& arg : opdef.arg()) {
231  if (arg.name().find("rnn_dependency") == 0) {
232  outs.push_back(arg.s());
233  }
234  }
235  return outs;
236  }
237 
242  void infer_dependencies(
243  int start_i,
244  std::unordered_set<string> outputs,
245  std::vector<RNNNetOperator>& rnn_ops,
246  std::unordered_set<int>* dep_ops) {
247  std::unordered_set<int> already_accounted_deps;
248  int num_ops = step_net_def_.op_size();
249  bool ignore_links = this->ignoreLinkDependencies();
250  for (int j = 0; j < num_ops - 1 && !outputs.empty(); j++) {
251  int i = (start_i + j) % num_ops;
252  if (ignore_links && rnn_ops[i].link_op) {
253  continue;
254  }
255  for (auto& outp : outputs) {
256  if (has_input(outp, i)) {
257  if (already_accounted_deps.find(i) == already_accounted_deps.end()) {
258  dep_ops->insert(i);
259  }
260 
261  // Now we can take the deps of this op and not
262  // add them again
263  for (int odep : rnn_ops[i].dependencies) {
264  already_accounted_deps.insert(odep);
265  }
266  for (string& dep_out : op_deps_[i]) {
267  auto oit = outputs.find(dep_out);
268  if (oit != outputs.end()) {
269  // This op produces an output of the original op, so the dependency
270  // is passed through that op
271  outputs.erase(oit);
272  }
273  }
274  break;
275  }
276  }
277  }
278  }
279 
287  void add_race_conflict_dependencies(
288  int opidx,
289  std::vector<RNNNetOperator>& rnn_ops,
290  std::unordered_set<int>* dep_ops) {
291  for (int i = 0; i < rnn_ops.size(); i++) {
292  if (i == opidx) {
293  continue;
294  }
295  if (rnn_ops[i].link_op && this->ignoreLinkDependencies()) {
296  continue;
297  }
298  for (auto& dep_blob : op_deps_[i]) {
299  for (auto& inp : step_net_def_.op(opidx).input()) {
300  if (inp == dep_blob) {
301  dep_ops->insert(i);
302  break;
303  }
304  }
305  if (i < opidx) {
306  for (auto& outp : step_net_def_.op(opidx).output()) {
307  if (outp == dep_blob) {
308  dep_ops->insert(i);
309  break;
310  }
311  }
312  }
313  }
314  }
315  }
316 
322  void CalculateInternalDependencies() {
323  for (int i = 0; i < step_net_def_.op_size(); i++) {
324  timestep_ops_template_.push_back(RNNNetOperator(step_net_def_.op(i), i));
325  }
326  // Then see which outputs appear as inputs, and those are
327  // the internal blobs.
328  for (auto& rnn_op : timestep_ops_template_) {
329  std::unordered_set<string> dep_outputs;
330  for (auto& outp : op_deps_[rnn_op.order]) {
331  dep_outputs.insert(outp);
332  }
333 
334  // Add recurrent dependencies as 'outputs' for this op
335  for (auto& outp : dep_outputs) {
336  auto rit = recurrent_input_map_.find(outp);
337  if (rit != recurrent_input_map_.end()) {
338  dep_outputs.insert(rit->second);
339  } else {
340  dep_outputs.insert(outp);
341  }
342  }
343 
344  // Compute dependencies of this op.
345  if (!rnn_op.link_op || !this->ignoreLinkDependencies()) {
346  std::unordered_set<int> dependent_ops;
347  infer_dependencies(
348  rnn_op.order + 1,
349  dep_outputs,
350  timestep_ops_template_,
351  &dependent_ops);
352 
353  // Race conditions arise when an operator writes a blob that is
354  // being read by another.
355  if (!this->ignoreLinkDependencies()) {
356  add_race_conflict_dependencies(
357  rnn_op.order, timestep_ops_template_, &dependent_ops);
358  }
359 
360  for (int i : dependent_ops) {
361  rnn_op.dependencies.push_back(i);
362  }
363 
364  // Sort in ascending order of dependency distance. If op
365  // j > i, then distance is j - i. But if j < i, then distance
366  // from i to j passes the timestep boundary and is j + num ops - i.
367  std::sort(
368  rnn_op.dependencies.begin(),
369  rnn_op.dependencies.end(),
370  [&](const int& a, const int& b) {
371  if (a < rnn_op.order && b < rnn_op.order) {
372  return a < b;
373  }
374  if (a >= rnn_op.order && b >= rnn_op.order) {
375  return a < b;
376  }
377  if (a >= rnn_op.order && b < rnn_op.order) {
378  return true;
379  }
380  return false;
381  });
382  }
383  }
384 
385  // Update dependency counts
386  for (auto& rnn_op : timestep_ops_template_) {
387  for (int i : rnn_op.dependencies) {
388  timestep_ops_template_[i].num_dynamic_inputs++;
389 
390  if (i > rnn_op.order) {
391  timestep_ops_template_[i].frontier = false;
392  } else {
393  timestep_ops_template_[i].num_recurrent_inputs++;
394  }
395  }
396  }
397  // Find ops that have no recurrent inputs, and bind them
398  // to the last op of the timestep. If there is only one op
399  // in the step net, then it will depend on itself. Note that
400  // we do not increase the dynamic input counter.
401  for (auto& rnn_op : timestep_ops_template_) {
402  if (rnn_op.num_dynamic_inputs == 0 && rnn_op.num_recurrent_inputs == 0) {
403  if (rnn_op.link_op && this->ignoreLinkDependencies()) {
404  continue;
405  }
406  timestep_ops_template_.back().dependencies.push_back(rnn_op.order);
407  }
408  }
409 
410  // compute parents
411  for (auto& rnn_op : timestep_ops_template_) {
412  for (int dep : rnn_op.dependencies) {
413  timestep_ops_template_[dep].parents.push_back(rnn_op.order);
414  }
415  }
416  AnalyzeOps();
417  }
418 
419  protected:
424  void PrintInfo(int t) {
425  auto& rnn_ops = timestep_ops_[t];
426 
427  LOG(INFO) << "Timestep: " << t;
428  for (auto& rnn_op : rnn_ops) {
429  auto& op = rnn_op.op;
430  LOG(INFO) << "Operator " << rnn_op.order << ": " << op->type()
431  << " dep inputs:" << rnn_op.num_dynamic_inputs
432  << " rec inputs:" << rnn_op.num_recurrent_inputs
433  << " frontier: " << rnn_op.frontier;
434  for (auto& inp : rnn_op.op->debug_def().input()) {
435  LOG(INFO) << " ---- input: " << inp;
436  }
437  for (auto& outp : rnn_op.op->debug_def().output()) {
438  LOG(INFO) << " ---- output: " << outp;
439  }
440  for (auto j : rnn_op.dependencies) {
441  LOG(INFO) << " dep: " << j << ": " << rnn_ops[j].op->type();
442  }
443  for (auto j : rnn_op.parents) {
444  LOG(INFO) << " parent: " << j << ": " << rnn_ops[j].op->type();
445  }
446  }
447 
448  LOG(INFO) << "recurrent_inputs:" << recurrent_input_map_;
449 
450  for (auto& rnn_op : rnn_ops) {
451  LOG(INFO) << "Operator " << rnn_op.order;
452  LOG(INFO) << ProtoDebugString(rnn_op.op->debug_def());
453  }
454  }
455 
456  virtual void AnalyzeOps() {}
457 
458  virtual bool ignoreLinkDependencies() = 0;
459 
460  std::vector<std::vector<RNNNetOperator>> timestep_ops_;
461  std::vector<OperatorBase*> op_ptrs_;
462 
463  std::vector<RNNNetOperator> timestep_ops_template_;
464 
465  NetDef step_net_def_;
466  std::vector<std::vector<string>> op_deps_;
467  std::vector<Workspace*> workspaces_;
468  std::map<string, string> recurrent_input_map_;
469  std::string timestep_blob_;
470 
471  int max_parallel_timesteps_ = -1;
472 
473  public:
474  bool debug_ = false;
475 };
476 
477 template <class Context>
478 std::unique_ptr<RecurrentNetworkExecutorBase> createRNNExecutor(
479  const NetDef& step_net_def,
480  std::map<string, string>& recurrent_input_map,
481  std::string timestep_blob,
482  ArgumentHelper rnn_args);
483 
484 class ThreadedRecurrentNetworkExecutor : public RecurrentNetworkExecutorBase {
485  public:
486  ThreadedRecurrentNetworkExecutor(
487  const NetDef& step_net_def,
488  std::map<string, string>& recurrent_input_map,
489  std::string timestep_blob)
490  : RecurrentNetworkExecutorBase(step_net_def, recurrent_input_map, timestep_blob),
491  failed_(false) {}
492 
493  ~ThreadedRecurrentNetworkExecutor() {
494  task_queue_.NoMoreJobs();
495  VLOG(1) << "Joining workers.";
496  for (auto& worker : workers_) {
497  worker.join();
498  }
499  }
500 
501  bool Run(int T) override;
502 
503  bool RunBackwards(int T) override;
504 
505  bool ignoreLinkDependencies() override {
506  return false;
507  }
508 
509  void setNumThreads(int n) {
510  num_threads_ = n;
511  }
512 
513  private:
514  void _ExecRange(int from, int to);
515 
516  void _Exec();
517 
518  void WorkerFunction();
519 
520  void RunOp(OpTask job, int thread_id);
521 
522  SimpleQueue<OpTask> task_queue_;
523  std::atomic<int> countdown_;
524  std::atomic<bool> failed_;
525  std::atomic<int> finished_timesteps_;
526  int num_ops_;
527  std::mutex countdown_mtx_;
528  std::condition_variable cv_;
529  std::vector<std::thread> workers_;
530  int num_threads_ = 4;
531 };
532 
533 } // namespace caffe2
534 
535 #endif // CAFFE2_OPERATORS_RECURRENT_NETWORK_EXECUTOR_H_
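For readers tracing CalculateInternalDependencies, the snippet below is a standalone illustration (not part of this header) of the dependency-distance ordering applied by the comparator at lines 364-381 of the listing: dependencies after the op's own position come first, ordered by j - i, followed by dependencies before it, ordered by j + num_ops - i, since reaching them crosses the timestep boundary. The five-op step net and the dependency set are invented for the example.

// Standalone illustration (not from this header): replicates the comparator
// used to sort rnn_op.dependencies by dependency distance.
#include <algorithm>
#include <iostream>
#include <vector>

int main() {
  const int num_ops = 5;                  // hypothetical step net with 5 ops
  const int order = 2;                    // position of the op being processed
  std::vector<int> dependencies = {0, 1, 3, 4};

  // Ops after `order` have distance j - order; ops before it wrap around the
  // timestep boundary and have distance j + num_ops - order.
  std::sort(
      dependencies.begin(),
      dependencies.end(),
      [&](const int& a, const int& b) {
        if (a < order && b < order) {
          return a < b;
        }
        if (a >= order && b >= order) {
          return a < b;
        }
        return a >= order && b < order;
      });

  for (int j : dependencies) {
    const int distance = (j >= order) ? (j - order) : (j + num_ops - order);
    std::cout << "op " << j << " distance " << distance << "\n";
  }
  // Output: op 3 distance 1, op 4 distance 2, op 0 distance 3, op 1 distance 4
}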
Blob * CreateBlob(const string &name)
Creates a blob of the given name.
Definition: workspace.cc:104
RecurrentNetworkExecutor is a specialized runtime for recurrent neural networks (RNNs).
Struct for an operator in a timestep and its dependencies.
Data structure for a scheduled task in the task queue.
void EnsureTimestepInitialized(int t, Workspace *ws, const std::vector< std::unique_ptr< ObserverBase< OperatorBase >>> &observers_list)
Callers must call EnsureTimestepInitialized before starting execution for each of the relevant timesteps; a usage sketch follows at the end of this page.
void PrintInfo(int t)
For debug purposes, print the dependency structure.
A helper class to index into arguments.
Definition: proto_utils.h:198
Workspace is a class that holds all the related objects created during runtime: (1) all blobs...
Definition: workspace.h:47
const Blob * GetBlob(const string &name) const
Gets the blob with the given name as a const pointer.
Definition: workspace.cc:164
void SetMaxParallelTimesteps(int p)
Set limit for the number of timesteps that run in parallel.
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...
T * GetMutable(bool *is_new_object=nullptr)
Gets a mutable pointer to the stored object.
Definition: blob.h:101
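Putting the pieces above together, here is a minimal, hypothetical usage sketch of the executor API for a CPU context. The helper function RunStepNet, the per-timestep workspace vector, the observer list, the timestep blob name "timestep", and the parallelism cap of 4 are all illustrative assumptions, not taken from this header; only createRNNExecutor, EnsureTimestepInitialized, SetMaxParallelTimesteps, and Run come from the API documented above.

// Hypothetical driver, assuming the header above is available at this path
// and that the caller already owns one workspace per timestep.
#include "caffe2/operators/rnn/recurrent_network_executor.h"

namespace caffe2 {

bool RunStepNet(
    const NetDef& step_net_def,
    std::map<string, string>& recurrent_input_map,
    const std::vector<Workspace*>& step_workspaces,
    const std::vector<std::unique_ptr<ObserverBase<OperatorBase>>>& observers,
    ArgumentHelper rnn_args) {
  const int T = static_cast<int>(step_workspaces.size());

  // The factory selects the executor implementation for the given context.
  std::unique_ptr<RecurrentNetworkExecutorBase> executor =
      createRNNExecutor<CPUContext>(
          step_net_def, recurrent_input_map, "timestep", rnn_args);

  // Per the documented contract, every relevant timestep must be initialized
  // before execution starts.
  for (int t = 0; t < T; ++t) {
    executor->EnsureTimestepInitialized(t, step_workspaces[t], observers);
  }

  // Illustrative: cap the number of timesteps that may run in parallel.
  executor->SetMaxParallelTimesteps(4);

  return executor->Run(T);
}

} // namespace caffe2

A design note visible in the listing: the forward-only workspace-sharing path in EnsureTimestepInitialized only applies when a parallelism limit has been set and the same workspace pointer recurs across timesteps, which is the t > max_parallel_timesteps_ branch at lines 155-158.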