// Caffe2 - C++ API
// A deep learning, cross-platform ML framework.
// recurrent_network_op.h
1 #ifndef CAFFE2_OPERATORS_RECURRENT_NETWORK_OP_H_
2 #define CAFFE2_OPERATORS_RECURRENT_NETWORK_OP_H_
3 
4 #include "caffe2/core/context.h"
5 #include "caffe2/core/logging.h"
6 #include "caffe2/core/operator.h"
7 #include "caffe2/core/tensor.h"
8 #include "caffe2/operators/rnn/recurrent_network_executor.h"
9 #include "caffe2/utils/conversions.h"
10 #include "caffe2/utils/math.h"
11 
12 CAFFE2_DECLARE_bool(caffe2_rnn_executor);
13 
14 namespace caffe2 {
15 namespace detail {
16 
// One learnable parameter of the recurrent step net:
//  - param: name of the parameter blob (a forward-pass input);
//  - grad: blob accumulating its gradient across all timesteps;
//  - cellGradient: per-timestep gradient blob written by the backward
//    cell net (renamed with a "_tmpstep" suffix, see constructParams).
struct Param {
  std::string param;
  std::string grad;
  std::string cellGradient;
};
22 
// Pairs a recurrent state blob (`state`) with the external input blob
// (`input`) that supplies its initial value(s); consumed by
// initializeRecurrentInput below.
// NOTE: the `struct RecurrentInput {` opening line was dropped during
// extraction and is restored here (name grounded by its uses in
// constructRecurrentInputs and initializeRecurrentInput).
struct RecurrentInput {
  std::string state;
  std::string input;
};
27 
// Bookkeeping for the gradient of one recurrent state:
//  - param: name of the recurrent state blob;
//  - grad: blob accumulating its gradient over timesteps;
//  - externalGrad: per-timestep gradient coming from outside (offset +1);
//  - lastExternalGrad: separate gradient blob for the final timestep
//    (offset -1);
//  - offset: timestep offset used when accumulating (set to 1 in
//    constructRecurrentGradients).
// NOTE: the `struct RecurrentGradient {` opening line was dropped during
// extraction and is restored here (name grounded by its use in
// constructRecurrentGradients).
struct RecurrentGradient {
  std::string param;
  std::string grad;
  std::string externalGrad;
  std::string lastExternalGrad;
  int32_t offset;
};
35 
// Requests that blob `dst` alias (share the storage of) the trailing
// timesteps of blob `src`, starting at timestep `offset`; a negative
// offset counts back from the last timestep. Applied by
// applyOffsetAlias below.
struct OffsetAlias {
  std::string src;
  std::string dst;
  int32_t offset{0};
};
41 
// Links an `internal` blob of a step workspace to a `window`-timestep
// wide view of an `external` blob, starting at timestep t + offset.
// Materialized each timestep by RNNApplyLinkOp (added to the step net
// via AddApplyLinkOps).
struct Link {
  std::string internal;
  std::string external;
  int32_t offset{0};
  int32_t window{1};
};
48 
50  std::vector<std::shared_ptr<Workspace>> stepWorkspaces;
51  std::shared_ptr<Workspace> sharedBlobsWs = nullptr;
52 };
53 
54 inline void UpdateTimestepBlob(Workspace* ws, std::string blob_name, int t) {
55  ws->CreateBlob(blob_name)->GetMutable<TensorCPU>()->Resize(1);
56  auto timestepBlob = ws->GetBlob(blob_name);
57  CAFFE_ENFORCE(timestepBlob);
58  timestepBlob->GetMutable<TensorCPU>()->mutable_data<int32_t>()[0] = t;
59 }
60 
61 std::map<string, string> GetRecurrentMapping(
62  const std::vector<detail::Link>& links, bool backward);
63 
// Makes blob `oc.dst` an alias (shared storage, no copy) of the
// trailing timesteps of blob `oc.src`, starting at timestep `oc.offset`
// (negative offsets count back from the last timestep). T must match
// the element type of the source tensor.
template <typename T, typename Context>
void applyOffsetAlias(
    const OffsetAlias& oc,
    Workspace* ws,
    Context* /*context*/) {
  VLOG(1) << "Aliasing: " << oc.src << " to: " << oc.dst
          << " at offset: " << oc.offset;
  auto srcBlob = ws->GetBlob(oc.src);
  CAFFE_ENFORCE(srcBlob);
  auto* src = srcBlob->template GetMutable<Tensor<Context>>();
  auto* dst = ws->GetBlob(oc.dst)->template GetMutable<Tensor<Context>>();
  // Number of elements in a single timestep slice (dim 0 is time).
  auto timestep = src->size() / src->dim(0);
  auto dims = src->dims();
  const int32_t startDstTimestep =
      oc.offset >= 0 ? oc.offset : src->dim(0) + oc.offset;
  const int32_t numDstTimesteps = src->dim(0) - startDstTimestep;
  CAFFE_ENFORCE(
      numDstTimesteps >= 1, "Invalid number of timesteps: ", numDstTimesteps);
  dims[0] = numDstTimesteps;
  dst->Resize(dims);
  CAFFE_ENFORCE(timestep == dst->size() / numDstTimesteps, "Invalid offset");
  // Share storage rather than copying: dst points into src's buffer at
  // the requested timestep offset.
  dst->ShareExternalPointer(
      src->template mutable_data<T>() + startDstTimestep * timestep,
      dst->size());
}
89 
// Copies the n-element array `src` into `dst` repeat_n times, so that
// dst holds repeat_n contiguous copies of src. dst must have room for
// repeat_n * n elements of type T.
template <typename T, class Context>
void repeatCopy(
    size_t repeat_n,
    size_t n,
    const T* src,
    T* dst,
    Context* context) {
  // size_t loop index matches repeat_n: avoids the signed/unsigned
  // comparison (and truncation for counts above INT_MAX) of the old
  // `int i` loop.
  for (size_t i = 0; i < repeat_n; ++i) {
    context->template Copy<T, Context, Context>(n, src, dst + i * n);
  }
}
101 
// Initializes the recurrent state blob `rc.state` with shape
// (seqLen + initialStateLength, batchSize, stateSize) and copies the
// initial state(s) from input blob `rc.input` into its leading
// timestep(s). The input may be:
//   1-D (stateSize)                          - broadcast over the batch;
//   2-D (batchSize, stateSize)               - one initial timestep;
//   3-D (initSteps, batchSize, stateSize)    - several initial timesteps.
template <typename T, typename Context>
void initializeRecurrentInput(
    const RecurrentInput& rc,
    int32_t seqLen,
    int32_t batchSize,
    Workspace* ws,
    Context* context) {
  auto stateBlob = ws->GetBlob(rc.state);
  CAFFE_ENFORCE(stateBlob);
  auto* state = stateBlob->template GetMutable<Tensor<Context>>();

  auto inputBlob = ws->GetBlob(rc.input);
  CAFFE_ENFORCE(inputBlob);
  const auto& input = inputBlob->template Get<Tensor<Context>>();
  CAFFE_ENFORCE_GE(input.ndim(), 1, rc.input);
  CAFFE_ENFORCE_LE(input.ndim(), 3, rc.input);

  // State size is always the innermost dimension of the input.
  const auto stateSize = input.dim(input.ndim() - 1);
  // Sometimes we want to provide more than one initial step.
  // For example, if we do a convolution op in step net
  // and need a sufficient left padding around the input.
  // This could be used together with links where window != 1.
  auto initialStateLength = 1;
  if (input.ndim() == 3) {
    initialStateLength = input.dim(0);
  }
  // States at [0, ..., (T + initialStateLength - 1)] (inclusive)
  state->Resize(seqLen + initialStateLength, batchSize, stateSize);

  if (input.ndim() >= 2) {
    CAFFE_ENFORCE_EQ(input.dim(input.ndim() - 2), batchSize, rc.input);
    context->template Copy<T, Context, Context>(
        batchSize * stateSize * initialStateLength,
        input.template data<T>(),
        state->template mutable_data<T>());
  } else {
    // Usually, the initial state is the same for all inputs in the batch.
    // So the op conveniently accepts 1-D input and copies it batchSize times.
    repeatCopy<T, Context>(
        batchSize,
        stateSize,
        input.template data<T>(),
        state->template mutable_data<T>(),
        context);
  }
}
152 
153 void PrependOps(std::vector<OperatorDef> ops, NetDef* netdef);
154 
155 void AddApplyLinkOps(
156  const vector<Link>& links,
157  std::string timestep,
158  const DeviceOption& device_option,
159  NetDef* netdef);
160 
161 void extractLinks(
162  OperatorBase* op,
163  const std::string& internalArg,
164  const std::string& externalArg,
165  const std::string& offsetArg,
166  const std::string& windowArg,
167  std::vector<detail::Link>* links);
168 
169 NetDef extractNetDef(const OperatorDef& op, const std::string& argName);
170 } // namespace detail
171 
172 template <class Context>
173 class RecurrentNetworkOp final : public Operator<Context> {
174  public:
175  USE_OPERATOR_CONTEXT_FUNCTIONS;
176  RecurrentNetworkOp(const OperatorDef& operator_def, Workspace* ws)
177  : Operator<Context>(operator_def, ws),
178  sharedWs_(ws),
179  enable_rnn_executor_(OperatorBase::template GetSingleArgument<bool>(
180  "enable_rnn_executor",
181  false)),
182  timestep_(OperatorBase::template GetSingleArgument<std::string>(
183  "timestep",
184  "timestep")) {
185  CAFFE_ENFORCE(ws);
186 
187  stepNetDef_ = detail::extractNetDef(operator_def, "step_net");
188 
189  recurrentInputs_ = constructRecurrentInputs(operator_def, sharedWs_);
190  links_ = constructLinks();
191  aliases_ = constructAliases();
192 
193  stepNetDef_.add_external_input(timestep_);
194  detail::AddApplyLinkOps(
195  links_, timestep_, operator_def.device_option(), &stepNetDef_);
196 
197  if (FLAGS_caffe2_rnn_executor && enable_rnn_executor_) {
198  VLOG(1) << "Use RecurrentNetworkExecutor";
199  auto recurrent_map = detail::GetRecurrentMapping(links_, false /* backward */);
200  rnnExecutor_ =
201  createRNNExecutor<Context>(
202  stepNetDef_,
203  recurrent_map,
204  timestep_,
205  ArgumentHelper(operator_def));
206  } else {
207  // Fix for legacy models that pass "rnn" type net
208  if (stepNetDef_.type() == "rnn") {
209  stepNetDef_.set_type("async_simple");
210  }
211  CAFFE_ENFORCE(stepNetDef_.type() != "async_dag");
212  }
213  }
214 
215  size_t NumObservers() override {
216  size_t num = this->observers_list_.size();
217  if (rnnExecutor_) {
218  num += rnnExecutor_->NumObserversStepNet();
219  }
220  return num;
221  }
222 
223  std::vector<detail::RecurrentInput> constructRecurrentInputs(
224  const OperatorDef& operator_def,
225  Workspace* sharedWs) {
226  const auto states =
227  OperatorBase::GetRepeatedArgument<std::string>("recurrent_states");
228  const auto inputs =
229  OperatorBase::GetRepeatedArgument<int>("initial_recurrent_state_ids");
230  CAFFE_ENFORCE_EQ(states.size(), inputs.size(), "states/inputs mismatch");
231  std::vector<detail::RecurrentInput> ris;
232  for (auto i = 0; i < states.size(); ++i) {
233  // States need to be "global" (since they are shared between
234  // forward and backward).
235  sharedWs->CreateBlob(states[i]);
236 
238  ri.state = states[i];
239  ri.input = operator_def.input(inputs[i]);
240  ris.push_back(ri);
241  }
242  return ris;
243  }
244 
245  std::vector<detail::OffsetAlias> constructAliases() {
246  const auto& src =
247  OperatorBase::GetRepeatedArgument<std::string>("alias_src");
248  const auto& dst =
249  OperatorBase::GetRepeatedArgument<std::string>("alias_dst");
250  const auto& offset =
251  OperatorBase::GetRepeatedArgument<int32_t>("alias_offset");
252  CAFFE_ENFORCE(
253  src.size() == offset.size(), "alias_src/alias_offset mismatch");
254  CAFFE_ENFORCE(
255  dst.size() == offset.size(), "alias_dst/alias_offset mismatch");
256  std::vector<detail::OffsetAlias> aliases;
257  for (auto i = 0; i < src.size(); ++i) {
259  oc.src = src[i];
260  oc.dst = dst[i];
261  oc.offset = offset[i];
262  aliases.push_back(oc);
263  }
264  return aliases;
265  }
266 
274  std::vector<std::string> v;
275  const auto& blobs = OperatorBase::GetRepeatedArgument<std::string>(
276  "recompute_blobs_on_backward", v);
277  for (const auto& b : blobs) {
278  // Note: if the blob already was created, this is a no-op.
279  sharedBlobsWs->CreateBlob(b);
280  }
281  }
282 
283  std::vector<detail::Link> constructLinks() {
284  std::vector<detail::Link> links;
285  detail::extractLinks(
286  this,
287  "link_internal",
288  "link_external",
289  "link_offset",
290  "link_window",
291  &links);
292  return links;
293  }
294 
295  template<typename T>
296  bool DoRunWithType() {
297  const auto seqLen = Input(0).dim32(0);
298  const auto batchSize = Input(0).dim32(1);
299  for (const auto& ri : recurrentInputs_) {
300  detail::initializeRecurrentInput<T, Context>(
301  ri, seqLen, batchSize, sharedWs_, &context_);
302  }
303 
304  // If we don't have a backward step net, this operator is forward_only
305  // and we can avoid creating multiple workspaces.
306  bool has_backward_pass =
307  OperatorBase::HasSingleArgumentOfType<NetDef>("backward_step_net") ||
308  (OperatorBase::HasSingleArgumentOfType<string>("backward_step_net") &&
309  OperatorBase::GetSingleArgument<string>("backward_step_net", "") !=
310  "");
311 
312  // With backward pass: we need to create workspace for each timestep
313  detail::ScratchWorkspaces* scratch =
314  OperatorBase::Output<detail::ScratchWorkspaces>(OutputSize() - 1);
315  std::vector<std::shared_ptr<Workspace>>& stepWorkspaces =
316  scratch->stepWorkspaces;
317  std::shared_ptr<Workspace>& sharedBlobsWs = scratch->sharedBlobsWs;
318  if (!sharedBlobsWs) {
319  sharedBlobsWs = std::make_shared<Workspace>(sharedWs_);
320  }
321 
322  // Caller can decide that some of the forward activations
323  // are recomputed on backward pass. Then those activations do not
324  // have to be stored in step workspaces but can be shared.
325  initializeBlobsToRecomputeOnBackward(sharedBlobsWs.get());
326 
327  if (has_backward_pass && seqLen > stepWorkspaces.size()) {
328  stepWorkspaces.resize(seqLen);
329  }
330 
331  // In forward-only mode, we cycle over workspaces. This limits the amount
332  // of parallelism over timesteps that the RNNExecutor provides. So with
333  // RNN executor we use more workspaces to get better perf.
334  int num_workspaces_on_fwd_only = rnnExecutor_ ? 4 : 2;
335 
336  if (!has_backward_pass && stepWorkspaces.size() < num_workspaces_on_fwd_only) {
337  // Use alternating stepWorkspaces when forward_only=True.
338  // Note that the step workspaces can be shared by other ops, thus
339  // we cannot shrink it to 2 if there are more than 2 step workspaces.
340  stepWorkspaces.resize(num_workspaces_on_fwd_only);
341  }
342 
343  for (auto t = 0; t < seqLen; ++t) {
344  auto& currentStepWorkspace =
345  (has_backward_pass ? stepWorkspaces[t] :
346  stepWorkspaces[t % num_workspaces_on_fwd_only]);
347  if (!currentStepWorkspace) {
348  currentStepWorkspace = std::make_shared<Workspace>(sharedBlobsWs.get());
349  }
350 
351  if (rnnExecutor_) {
352  if (!has_backward_pass) {
353  // Need to limit timestep parallelism because we cycle over workspaces
354  rnnExecutor_->SetMaxParallelTimesteps(num_workspaces_on_fwd_only);
355  }
356  rnnExecutor_->EnsureTimestepInitialized(
357  t, currentStepWorkspace.get(), this->observers_list_);
358  } else {
359  // Use plain Caffe2 nets
360  detail::UpdateTimestepBlob(currentStepWorkspace.get(), timestep_, t);
361  auto* stepNet = currentStepWorkspace->GetNet(stepNetDef_.name());
362  if (stepNet == nullptr) {
363  stepNet = currentStepWorkspace->CreateNet(stepNetDef_);
364  }
365  CAFFE_ENFORCE(stepNet, "Step Net construction failure");
366  // Since we have a SimpleNet, there are no races here.
367  stepNet->RunAsync();
368  }
369  }
370 
371  if (rnnExecutor_) {
372  rnnExecutor_->Run(seqLen);
373  }
374 
375  for (const auto& alias : aliases_) {
376  detail::applyOffsetAlias<T, Context>(alias, sharedWs_, &context_);
377  }
378 
379  return true;
380  }
381 
382  bool RunOnDevice() override {
383  return DoRunWithType<float>();
384  }
385 
386  protected:
387  NetDef stepNetDef_;
388  Workspace* sharedWs_;
389  bool enable_rnn_executor_;
390  std::unique_ptr<RecurrentNetworkExecutorBase> rnnExecutor_;
391 
392  std::vector<detail::Link> links_;
393  std::vector<detail::OffsetAlias> aliases_;
394  std::vector<detail::RecurrentInput> recurrentInputs_;
395  std::string timestep_;
396 };
397 
398 template <class Context>
399 class RecurrentNetworkGradientOp final : public Operator<Context> {
400  public:
401  USE_OPERATOR_CONTEXT_FUNCTIONS;
402  RecurrentNetworkGradientOp(const OperatorDef& operator_def, Workspace* ws)
403  : Operator<Context>(operator_def, ws),
404  sharedWs_(ws),
405  enable_rnn_executor_(OperatorBase::template GetSingleArgument<bool>(
406  "enable_rnn_executor",
407  false)),
408  timestep_(OperatorBase::template GetSingleArgument<std::string>(
409  "timestep",
410  "timestep")),
411  gradInputs_(OperatorBase::template GetRepeatedArgument<int32_t>(
412  "outputs_with_grads")) {
413  CAFFE_ENFORCE(ws);
414 
415  stepNetDef_ = detail::extractNetDef(operator_def, "backward_step_net");
416 
417  links_ = constructLinks();
418  params_ = constructParams(operator_def);
419  recurrentGradients_ = constructRecurrentGradients(operator_def);
420  recurrentInputIds_ = OperatorBase::template GetRepeatedArgument<int32_t>(
421  "initial_recurrent_state_ids");
422 
423  /* Add operators to the backward step net to handle accumulation of
424  gradients over timesteps
425  */
426  stepNetDef_.add_external_input(timestep_);
427 
428  AddGradientInputAccumulationOps(operator_def);
429  detail::AddApplyLinkOps(
430  links_, timestep_, operator_def.device_option(), &stepNetDef_);
431  AddParamGradientAccumulationOps(operator_def);
432 
433  if (FLAGS_caffe2_rnn_executor && enable_rnn_executor_) {
434  InitializeExecutor(operator_def);
435  }
436  }
437 
438  // Renaming maps (generated by memonger.py)
439  std::string remappedName(std::string blob_name) {
440  return OperatorBase::template GetSingleArgument<std::string>(
441  blob_name + ".rename", blob_name);
442  }
443 
444  detail::Link remappedLink(const detail::Link& link) {
445  detail::Link renamed_link = link;
446  renamed_link.internal = remappedName(link.internal);
447  renamed_link.external = remappedName(link.external);
448  return renamed_link;
449  }
450 
451  void renameOpInputOutput(std::string from_name, std::string to_name) {
452  for (int j = 0; j < stepNetDef_.op_size(); j++) {
453  auto* op = stepNetDef_.mutable_op(j);
454  for (int i = 0; i < op->input_size(); i++) {
455  if (op->input(i) == from_name) {
456  op->set_input(i, to_name);
457  }
458  }
459  for (int i = 0; i < op->output_size(); i++) {
460  if (op->output(i) == from_name) {
461  op->set_output(i, to_name);
462  }
463  }
464  }
465  }
466 
467  std::vector<detail::Param> constructParams(const OperatorDef& operator_def) {
468  std::vector<detail::Param> params;
469  const auto& param = OperatorBase::GetRepeatedArgument<int32_t>("param");
470  const auto& param_grads =
471  OperatorBase::GetRepeatedArgument<string>("param_grads");
472  CAFFE_ENFORCE(
473  param_grads.empty() || param_grads.size() == param.size(),
474  param.size(),
475  " != ",
476  param_grads.size());
477  for (int i = 0; i < param.size(); ++i) {
478  detail::Param p;
479  // Forward inputs come after [outputs_with_grads] gradient inputs
480  p.param = operator_def.input(param[i] + gradInputs_.size());
481  // See GetRecurrentNetworkGradient to understand offseting here
482  p.grad = operator_def.output(i + numSequences_);
483 
484  std::string grad_blob =
485  param_grads.empty() ? p.grad : remappedName(param_grads[i]);
486  p.cellGradient = grad_blob + "_tmpstep";
487  params.push_back(p);
488 
489  renameOpInputOutput(grad_blob, p.cellGradient);
490  }
491  return params;
492  }
493 
494  std::vector<detail::RecurrentGradient> constructRecurrentGradients(
495  const OperatorDef& operator_def) {
496  std::vector<detail::RecurrentGradient> rgs;
497  const auto& recurrent =
498  OperatorBase::GetRepeatedArgument<std::string>("recurrent_states");
499  const auto& alias_src =
500  OperatorBase::GetRepeatedArgument<std::string>("alias_src");
501  const auto& offset =
502  OperatorBase::GetRepeatedArgument<int32_t>("alias_offset");
503 
504  for (auto i = 0; i < recurrent.size(); ++i) {
506  rg.param = recurrent[i];
507  rg.grad = remappedName(recurrent[i] + "_grad");
508 
509  for (int j = 0; j < alias_src.size(); ++j) {
510  if (alias_src[j] != recurrent[i]) {
511  continue;
512  }
513  int idx = -1;
514  for (int k = 0; k < gradInputs_.size(); ++k) {
515  if (gradInputs_[k] == j) {
516  idx = k;
517  }
518  }
519  if (idx == -1) {
520  continue;
521  }
522 
523  CAFFE_ENFORCE(offset[j] == 1 || offset[j] == -1);
524  if (offset[j] == 1) {
525  rg.externalGrad = operator_def.input(idx);
526  } else if (offset[j] == -1) {
527  rg.lastExternalGrad = operator_def.input(idx);
528  }
529  }
530  rg.offset = 1;
531  rgs.push_back(rg);
532  }
533  return rgs;
534  }
535 
536  std::vector<detail::Link> constructLinks() {
537  std::vector<detail::Link> links;
538  detail::extractLinks(
539  this,
540  "link_internal",
541  "link_external",
542  "link_offset",
543  "link_window",
544  &links);
545  detail::extractLinks(
546  this,
547  "backward_link_internal",
548  "backward_link_external",
549  "backward_link_offset",
550  "",
551  &links);
552  for (int i = 0; i < links.size(); i++) {
553  links[i] = remappedLink(links[i]);
554  }
555  return links;
556  }
557 
558  void InitializeExecutor(const OperatorDef& operator_def) {
559  VLOG(1) << "Use RecurrentNetworkExecutor for backward";
560  auto recurrent_map = detail::GetRecurrentMapping(links_, true /* backward */);
561  rnnExecutor_ = createRNNExecutor<Context>(
562  stepNetDef_, recurrent_map, timestep_, ArgumentHelper(operator_def));
563  }
564 
565  void AddGradientInputAccumulationOps(const OperatorDef& operator_def) {
569  std::vector<OperatorDef> ops;
570  for (const auto& rg : recurrentGradients_) {
571  if (rg.externalGrad.empty()) {
572  continue;
573  }
574  VLOG(1) << "Accumulating into: " << rg.grad << " from " << rg.externalGrad
575  << ", offset: " << rg.offset;
576 
577  OperatorDef opdef;
578  opdef.set_type("rnn_internal_accumulate_gradient_input");
579  opdef.add_input(timestep_);
580  opdef.add_input(rg.externalGrad);
581  opdef.add_input(rg.grad);
582  opdef.add_output(rg.grad);
583 
584  // Add also the linked blobs to outputs, to ensure correct
585  // chaining.
586  for (auto& l : links_) {
587  if (rg.grad == l.external) {
588  Argument* dep_arg = opdef.add_arg();
589  dep_arg->set_name("rnn_dependency." + l.internal);
590  dep_arg->set_s(l.internal);
591  }
592  }
593 
594  opdef.mutable_device_option()->CopyFrom(operator_def.device_option());
595 
596  Argument* offset_arg = opdef.add_arg();
597  offset_arg->set_name("offset");
598  offset_arg->set_i(rg.offset);
599  ops.push_back(opdef);
600 
601  stepNetDef_.add_external_input(rg.externalGrad);
602  stepNetDef_.add_external_input(rg.grad);
603  }
604  detail::PrependOps(ops, &stepNetDef_);
605  }
606 
607  void AddParamGradientAccumulationOps(const OperatorDef& operator_def) {
608  // If a user passes in param_grads mapping, we can copy dirrectly
609  // form a blob where backward cell net written data to.
610  // This becomes handy in a case where gradient from the cell net
611  // is an internal blob of the backward cell. This happens, for example,
612  // when SumOp is the first op of the cell
613  for (const auto& param : params_) {
614  OperatorDef opdef;
615  opdef.set_type("Sum");
616  opdef.add_input(param.grad);
617  opdef.add_input(param.cellGradient);
618  opdef.add_output(param.grad);
619  opdef.mutable_device_option()->CopyFrom(operator_def.device_option());
620  stepNetDef_.add_op()->CopyFrom(opdef);
621  stepNetDef_.add_external_input(param.grad);
622  }
623  }
624 
626  const std::shared_ptr<Workspace>& step0Ws,
627  Workspace* sharedBlobsWs) {
632  for (auto& op : stepNetDef_.op()) {
633  for (const string& outp : op.output()) {
634  if (!step0Ws->HasBlob(outp)) {
635  sharedBlobsWs->CreateBlob(outp);
636  }
637  }
638  }
639  }
640 
641  template<typename T>
642  bool DoRunWithType() {
643  const auto seqLen = Input(gradInputs_.size()).dim32(0);
644  VLOG(1) << "seqLen: " << seqLen;
645 
646  const detail::ScratchWorkspaces& scratch =
647  OperatorBase::Input<detail::ScratchWorkspaces>(InputSize() - 1);
648  const std::vector<std::shared_ptr<Workspace>>& stepWorkspaces =
649  scratch.stepWorkspaces;
650  CAFFE_ENFORCE_GE(stepWorkspaces.size(), seqLen);
651  Workspace& sharedBlobsWs = *scratch.sharedBlobsWs.get();
652 
653  const auto batchSize = Input(0).dim32(1);
654  for (auto& param : params_) {
655  auto pBlob = sharedWs_->GetBlob(param.param);
656  CAFFE_ENFORCE(pBlob);
657  const auto& p = pBlob->template Get<Tensor<Context>>();
658 
659  auto gBlob = sharedWs_->GetBlob(param.grad);
660  CAFFE_ENFORCE(gBlob);
661  auto* g = gBlob->template GetMutable<Tensor<Context>>();
662  g->ResizeLike(p);
663  math::Set<T, Context>(
664  g->size(),
665  convert::To<float,T>(0.0),
666  g->template mutable_data<T>(),
667  &context_);
668  }
669 
670  for (auto& rg : recurrentGradients_) {
671  auto pBlob = sharedWs_->GetBlob(rg.param);
672  CAFFE_ENFORCE(pBlob);
673  const auto& p = pBlob->template Get<Tensor<Context>>();
674 
675  auto gBlob = sharedWs_->CreateBlob(rg.grad);
676  CAFFE_ENFORCE(gBlob);
677  auto* g = gBlob->template GetMutable<Tensor<Context>>();
678  g->ResizeLike(p);
679  CAFFE_ENFORCE_EQ(g->ndim(), 3);
680  const auto timestep = g->size() / g->dim(0);
681  // Fill the last timestep with zeros for the gradient
682  math::Set<T, Context>(
683  timestep,
684  convert::To<float,T>(0.0),
685  g->template mutable_data<T>() + (g->dim(0) - 1) * timestep,
686  &context_);
687  }
688 
689  // This code assumes that there are several inputs
690  // sequences. Actually it is not supported by the rest of the code,
691  // and numSequences_ is a constant, equal to 1.
692  for (int i = 0; i < numSequences_; ++i) {
693  // Offseting as the first gradInputs_.size() inputs of the op
694  // are from GO. Then all I(0..N).
695  const int gradientInputIndex = i + gradInputs_.size();
696  const auto& inputName = this->debug_def().input(gradientInputIndex);
697  auto gradientName = remappedName(inputName + "_grad");
698  VLOG(1) << "Initializing gradient for input " << gradientInputIndex
699  << " (" << inputName << ") "
700  << " as blob " << gradientName
701  << ". Size: " << Input(gradientInputIndex).size();
702  auto pGradientBlob = sharedWs_->GetBlob(gradientName);
703  CAFFE_ENFORCE(pGradientBlob);
704  auto* g = pGradientBlob->template GetMutable<Tensor<Context>>();
705  g->ResizeLike(Input(gradientInputIndex));
706  g->template mutable_data<T>();
707  }
708 
709  auto accumulateFinalInputGradients = [&]() {
710  for (const auto& rg : recurrentGradients_) {
711  if (rg.lastExternalGrad.empty()) {
712  continue;
713  }
714  VLOG(1) << "Accumulating into: " << rg.grad << " from "
715  << rg.lastExternalGrad << " for final time step (sep. blob)";
716  auto gBlob = sharedWs_->GetBlob(rg.grad);
717  CAFFE_ENFORCE(gBlob);
718  auto* g = gBlob->template GetMutable<Tensor<Context>>();
719 
720  auto oglastBlob = sharedWs_->GetBlob(rg.lastExternalGrad);
721  CAFFE_ENFORCE(oglastBlob);
722  const auto& oglast = oglastBlob->template Get<Tensor<Context>>();
723  CAFFE_ENFORCE_EQ(g->dim(1), oglast.dim(1));
724  CAFFE_ENFORCE_EQ(g->dim(2), oglast.dim(2));
725 
726  const auto t = g->dim(0) - 1;
727  const auto timestep_size = g->size() / g->dim(0);
728  CAFFE_ENFORCE_EQ(timestep_size, oglast.size());
729  T* g_data_with_offset =
730  g->template mutable_data<T>() + t * timestep_size;
731  math::Add<T, Context>(
732  timestep_size,
733  oglast.template data<T>(),
734  g_data_with_offset,
735  g_data_with_offset,
736  &context_);
737  }
738  };
739 
740  accumulateFinalInputGradients();
741 
742  // Create shared blobs for blobs that can be shared between
743  // all timesteps.
744  if (stepWorkspaces.size() > 0) {
745  CreateSharedBlobs(stepWorkspaces[0], &sharedBlobsWs);
746  }
747  for (int32_t t = seqLen - 1; t >= 0; --t) {
748  if (rnnExecutor_) {
749  rnnExecutor_->EnsureTimestepInitialized(
750  t, stepWorkspaces[t].get(), this->observers_list_);
751  } else {
752  auto* stepNet = stepWorkspaces[t].get()->GetNet(stepNetDef_.name());
753  if (stepNet == nullptr) {
754  stepNet = stepWorkspaces[t].get()->CreateNet(stepNetDef_);
755  }
756  CAFFE_ENFORCE(stepNet);
757  stepNet->RunAsync();
758  }
759  }
760 
761  if (rnnExecutor_) {
762  rnnExecutor_->RunBackwards(seqLen);
763  }
764 
765  CAFFE_ENFORCE_EQ(recurrentInputIds_.size(), recurrentGradients_.size());
766  for (int i = 0; i < recurrentInputIds_.size(); ++i) {
767  // See GetRecurrentNetworkGradient to understand offseting here
768  // Outputs of the gradient are inputs of the forward pass.
769  // So we need to offset on all inputs that go before recurrent
770  // initial ones
771  auto outputIdx = i + params_.size() + numSequences_;
772  // because first gradInputs_.size() inputs are from GO
773  int inputId = recurrentInputIds_[i] + gradInputs_.size();
774  VLOG(1) << "Resetting output " << this->debug_def().output(outputIdx)
775  << " like input " << this->debug_def().input(inputId);
776  Output(outputIdx)->ResizeLike(Input(inputId));
777  T* output_data = Output(outputIdx)->template mutable_data<T>();
778  auto pBlob = sharedWs_->GetBlob(recurrentGradients_[i].grad);
779  CAFFE_ENFORCE(pBlob);
780  auto* p = pBlob->template GetMutable<Tensor<Context>>();
781 
782  if (Input(inputId).ndim() >= 2) {
783  // Gradient states blob should live. And if it gets changed by the
784  // backward pass, then output should be changed as well. Thus it should
785  // be okay to share data here
786  Output(outputIdx)->template ShareExternalPointer<T>(
787  p->template mutable_data<T>());
788  } else {
789  // We need to do a bunch of Adds any way. So lets not worry about
790  // copy / share data here. One way to speed this up could be a kernel
791  // which sums up several tensors together instead of going 1 by 1
792  const auto recurrentStateSize = Input(inputId).dim32(0);
793 
794  math::Set<T, Context>(
795  recurrentStateSize,
796  convert::To<float,T>(0.0),
797  output_data,
798  &context_);
799 
800  math::AddStripedBatch<T, Context>(
801  recurrentStateSize,
802  p->template data<T>(),
803  output_data,
804  recurrentStateSize,
805  batchSize,
806  &context_);
807  }
808  }
809 
810  return true;
811  }
812 
813  bool RunOnDevice() override {
814  return DoRunWithType<float>();
815  }
816 
817  protected:
818  NetDef stepNetDef_;
819  Workspace* sharedWs_;
820  bool enable_rnn_executor_;
821  std::unique_ptr<RecurrentNetworkExecutorBase> rnnExecutor_;
822  std::vector<detail::Link> links_;
823  std::vector<detail::Param> params_;
824  std::vector<detail::RecurrentGradient> recurrentGradients_;
825  std::string timestep_;
826  // For now we support only one input sequence
827  const int numSequences_{1};
828  std::vector<int32_t> recurrentInputIds_;
829  std::vector<int32_t> gradInputs_;
830 };
831 
832 template <class Context>
833 class AccumulateInputGradientOp : public Operator<Context> {
834  public:
835  AccumulateInputGradientOp(const OperatorDef& def, Workspace* ws)
836  : Operator<Context>(def, ws),
837  offset_(OperatorBase::GetSingleArgument<int>("offset", -1)) {
838  CAFFE_ENFORCE(offset_ >= 0, "Offset not set");
839  }
840  USE_OPERATOR_CONTEXT_FUNCTIONS;
841 
842  template<typename T>
843  bool DoRunWithType() {
844  const auto& t0 = OperatorBase::Input<Tensor<CPUContext>>(0);
845  const auto t = t0.template data<int32_t>()[0];
846  auto& og = Input(1);
847  auto* g = Output(0);
848 
849  T* g_data = g->template mutable_data<T>();
850  const auto timestep_size = g->size() / g->dim(0);
851 
852  CAFFE_ENFORCE(
853  (t + offset_) * timestep_size + timestep_size <= g->size(),
854  "Accumulation destination address over bounds");
855  CAFFE_ENFORCE(
856  t * timestep_size + timestep_size <= og.size(),
857  "Accumulation source address out of bounds");
858 
859  math::Add<T, Context>(
860  timestep_size,
861  og.template data<T>() + t * timestep_size,
862  g_data + (t + offset_) * timestep_size,
863  g_data + (t + offset_) * timestep_size,
864  &context_);
865  return true;
866  }
867 
868  bool RunOnDevice() override {
869  return DispatchHelper<TensorTypes<float>>::call(this, Input(1));
870  }
871 
872  private:
873  int offset_;
874 };
875 
876 template <class Context>
877 class RNNApplyLinkOp : public Operator<Context> {
878  public:
879  RNNApplyLinkOp(const OperatorDef& def, Workspace* ws)
880  : Operator<Context>(def, ws),
881  offset_(OperatorBase::GetSingleArgument<int>("offset", -1)),
882  window_(OperatorBase::GetSingleArgument<int>("window", -1)) {
883  CAFFE_ENFORCE(offset_ >= 0, "offset not set");
884  CAFFE_ENFORCE(window_ >= 0, "window not set");
885  }
886 
887  USE_OPERATOR_CONTEXT_FUNCTIONS;
888 
889  template <typename T>
890  bool DoRunWithType() {
891  // Both internal and external appear as both input and output to enforce
892  // correct dependency computation.
893  const auto& t0 = OperatorBase::Input<Tensor<CPUContext>>(0);
894  const auto t = t0.template data<int32_t>()[0];
895  auto& external = Input(1);
896 
897  auto* internal_out = Output(0);
898  auto* external_out = Output(1);
899 
900  CAFFE_ENFORCE_GT(external.size(), 0);
901  const TIndex externalTimestepSize = external.size() / external.dim(0);
902  auto* externalData = external_out->template mutable_data<T>() +
903  (t + offset_) * externalTimestepSize;
904  auto internalDims = external_out->dims();
905  internalDims[0] = window_;
906 
907  internal_out->Resize(internalDims);
908  internal_out->ShareExternalPointer(
909  externalData, externalTimestepSize * window_);
910  return true;
911  }
912 
913  bool RunOnDevice() override {
914  return DoRunWithType<float>();
915  }
916 
917  private:
918  int offset_;
919  int window_;
920 };
921 
922 } // namespace caffe2
923 
924 #endif // CAFFE2_OPERATORS_RECURRENT_NETWORK_OP_H_
/* Doxygen index residue appended by the documentation extractor (not code):
void AddGradientInputAccumulationOps(const OperatorDef &operator_def)
Blob * CreateBlob(const string &name)
Creates a blob of the given name.
Definition: workspace.cc:104
void initializeBlobsToRecomputeOnBackward(Workspace *sharedBlobsWs)
Some blobs can be marked as to be recomputed on backward pass.
A helper class to index into arguments.
Definition: proto_utils.h:198
Workspace is a class that holds all the related objects created during runtime: (1) all blobs...
Definition: workspace.h:47
const Blob * GetBlob(const string &name) const
Gets the blob with the given name as a const pointer.
Definition: workspace.cc:164
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...
T * GetMutable(bool *is_new_object=nullptr)
Gets a mutable pointer to the stored object.
Definition: blob.h:101
void CreateSharedBlobs(const std::shared_ptr< Workspace > &step0Ws, Workspace *sharedBlobsWs)
*/