Caffe2 - C++ API
dataset_ops.cc
1 #include "caffe2/operators/dataset_ops.h"
2 
3 #include <memory>
4 #include <mutex>
5 #include <string>
6 #include <vector>
7 #include "caffe2/core/blob_serialization.h"
8 #include "caffe2/core/operator.h"
9 #include "caffe2/core/tensor.h"
10 #include "caffe2/utils/string_utils.h"
11 
12 namespace caffe2 {
13 
14 CAFFE_KNOWN_TYPE(std::unique_ptr<dataset_ops::TreeCursor>);
15 CAFFE_KNOWN_TYPE(dataset_ops::TensorVectorPtr<CPUContext>);
16 CAFFE_KNOWN_TYPE(dataset_ops::SharedTensorVectorPtr);
17 
18 namespace dataset_ops {
19 namespace {
20 
21 const char kDatasetFieldSeparator = ':';
22 const char* kDatasetLengthField = "lengths";
23 
24 // percentage by which to grow the dataset capacity when needed
25 const int kDatasetGrowthPct = 40;
26 
27 } // namespace
28 
29 TreeIterator::TreeIterator(const std::vector<std::string>& fields) {
30  // populate field vector and split field names
31  fields_.resize(fields.size());
32  std::vector<std::vector<std::string>> nameParts(fields_.size());
33  for (int i = 0; i < fields.size(); ++i) {
34  auto& field = fields_.at(i);
35  field.name = fields[i];
36  field.id = i;
37  field.lengthFieldId = -1;
38  nameParts.at(i) = split(kDatasetFieldSeparator, field.name);
39  }
40 
41  // populate lengthFields
42  for (const auto& field : fields_) {
43  const auto& parts = nameParts.at(field.id);
44  if (!parts.empty() && parts.back() == kDatasetLengthField) {
45  lengthFieldIds_.push_back(field.id);
46  }
47  }
48 
49  // for each field, find the length-field with the longest matching prefix
50  for (auto& field : fields_) {
51  // by default, we are matching against the root domain
52  int maxMatchLevel = 1;
53  int maxMatchLengthFieldId = -1;
54  for (int j = 0; j < numLengthFields(); ++j) {
55  const auto& lenField = lengthField(j);
56  // a length field can't have itself as its length field
57  if (field.id == lenField.id) {
58  continue;
59  }
60  auto lf = nameParts.at(lenField.id);
61  auto lfEnd = lf.end() - 1;
62  // check whether this lengthField is a prefix for this field name
63  if (std::mismatch(lf.begin(), lfEnd, nameParts.at(field.id).begin())
64  .first != lfEnd) {
65  continue;
66  }
67  if (lf.size() > maxMatchLevel) {
68  maxMatchLevel = lf.size();
69  maxMatchLengthFieldId = j;
70  }
71  }
72  field.lengthFieldId = maxMatchLengthFieldId;
73  }
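  // Worked example: with the fields list from the CreateTreeCursor doc below,
  // the field "c:c2:values" is prefix-matched by both "c:lengths" (prefix
  // "c", level 2) and "c:c2:lengths" (prefix "c:c2", level 3); the deeper
  // match wins, so "c:c2:lengths" becomes its length field.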
74 
75  // check that fields are topologically sorted
76  // (no length field depends on a length defined afterwards)
77  for (const auto& field : fields_) {
78  const auto* lengthField = lengthFieldFor(field);
79  CAFFE_ENFORCE(
80  (lengthField == nullptr) || (lengthField->id < field.id),
81  "Error: Field ",
82  field.id,
83  " (",
84  field.name,
85  ") ",
86  "depends on a field defined afterwards: ",
87  lengthField->id,
88  " (",
89  lengthField->name,
90  ").");
91  }
92 }
93 
94 void TreeIterator::advance(
95  const std::vector<const TLength*>& lengths,
96  std::vector<TOffset>& offsets,
97  std::vector<TOffset>& sizes,
98  std::vector<TOffset>& limits,
99  TOffset num) {
100  std::vector<TOffset> newOffsets;
101  CAFFE_ENFORCE_EQ(lengths.size(), numLengthFields());
102  CAFFE_ENFORCE_EQ(offsets.size(), numOffsetFields());
103  sizes.resize(offsets.size());
104  newOffsets.resize(offsets.size());
105  // first index, top level
106  {
107  auto limit = limits[0];
108  auto offset = offsets[0];
109  CAFFE_ENFORCE(limit >= offset, "Tried to advance past end of cursor.");
110  TOffset total = std::min(limit - offset, num);
111  sizes[0] = total;
112  newOffsets[0] = offset + total;
113  }
114  // child indices
115  for (int j = 1; j < numOffsetFields(); ++j) {
116  TOffset total = 0;
117  int parentOffsetId = offsetFieldIdFor(lengthField(j - 1));
118  const TLength* length = lengths[j - 1] + offsets[parentOffsetId];
119  for (int k = 0; k < sizes[parentOffsetId]; ++k) {
120  total += *(length++);
121  }
122  auto offset = offsets[j];
123  CAFFE_ENFORCE(
124  offset + total <= limits[j],
125  "Inconsistent field length: ",
126  "tried to advance past the end of field ",
127  j);
128  sizes[j] = total;
129  newOffsets[j] = offset + total;
130  }
131  offsets = newOffsets;
132 }
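// Worked example of advance(): with fields ["a", "b:lengths", "b:values"]
// there are two offset fields (the root and domain "b"). Advancing by num = 2
// from offsets = {0, 0} with b:lengths = [2, 3] consumes two root records and
// 2 + 3 = 5 entries of "b:values", leaving offsets = sizes = {2, 5}.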
133 
134 TreeWalker::TreeWalker(const vector<const Blob*>& inputs, TreeCursor& cursor)
135  : inputs_(inputs), cursor_(cursor), sizes_(cursor.it.numOffsetFields()) {
136  CAFFE_ENFORCE_EQ(inputs.size(), cursor.it.fields().size());
137  if (cursor.offsets.empty()) {
138  cursor.offsets.assign(cursor.it.numOffsetFields(), 0);
139  }
140 
141  for (int fieldId = 0; fieldId < cursor_.it.fields().size(); ++fieldId) {
142  fields_.emplace_back(*this, fieldId);
143  }
144 
145  gatherLengthData();
146 
147  gatherSizeLimits();
148 
149  // Invariant: the cursor is always advanced one step ahead; prevOffsets_
 // holds the offsets of the record currently exposed by the walker.
150  advance();
151 }
152 
153 void TreeWalker::advance() {
154  prevOffsets_ = cursor_.offsets;
155  cursor_.it.advance(lengths_, cursor_.offsets, sizes_, limits_, 1);
156 }
157 
158 std::vector<TIndex> TreeWalker::fieldDim(int fieldId) const {
159  auto tensorDim = input(fieldId).dims();
160  tensorDim[0] = sizes_[lengthIdx(fieldId)];
161  return tensorDim;
162 }
163 
164 void* TreeWalker::fieldPtr(int fieldId) const {
165  auto& in = input(fieldId);
166  return (char*)in.raw_data() +
167  offset(fieldId) * in.size_from_dim(1) * in.meta().itemsize();
168 }
169 
170 void TreeWalker::gatherLengthData() {
171  static const TLength lenZero = 0;
172  lengths_.resize(cursor_.it.numLengthFields());
173  for (int i = 0; i < lengths_.size(); ++i) {
174  auto& in = input(cursor_.it.lengthField(i).id);
175  if (in.size() > 0) {
176  lengths_[i] = in.data<int>();
177  } else {
178  lengths_[i] = &lenZero;
179  }
180  }
181 }
182 
183 void TreeWalker::gatherSizeLimits() {
184  limits_.assign(sizes_.size(), std::numeric_limits<TOffset>::max());
185  for (auto fieldId = 0; fieldId < cursor_.it.fields().size(); ++fieldId) {
186  auto lengthFieldIdx = lengthIdx(fieldId);
187  limits_[lengthFieldIdx] =
188  std::min(limits_[lengthFieldIdx], (TOffset)input(fieldId).dims()[0]);
189  }
190 }
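// After this, limits_[d] is the number of entries available in domain d: the
// minimum outer dimension over all input fields belonging to that domain, so
// the walker never reads past the shortest column.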
191 
192 namespace {
193 
194 class CreateTreeCursorOp : public Operator<CPUContext> {
195  public:
196  CreateTreeCursorOp(const OperatorDef& operator_def, Workspace* ws)
197  : Operator(operator_def, ws),
198  fields_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {}
199 
200  bool RunOnDevice() override {
201  *OperatorBase::Output<std::unique_ptr<TreeCursor>>(0) =
202  std::unique_ptr<TreeCursor>(new TreeCursor(TreeIterator(fields_)));
203  return true;
204  }
205 
206  private:
207  std::vector<std::string> fields_;
208 };
209 
210 class ResetCursorOp : public Operator<CPUContext> {
211  public:
212  ResetCursorOp(const OperatorDef& operator_def, Workspace* ws)
213  : Operator(operator_def, ws) {}
214 
215  bool RunOnDevice() override {
216  auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
217  std::lock_guard<std::mutex> lock(cursor->mutex_);
218  cursor->offsets.clear();
219  return true;
220  }
221 };
222 
223 class CheckDatasetConsistencyOp : public Operator<CPUContext> {
224  public:
225  CheckDatasetConsistencyOp(const OperatorDef& operator_def, Workspace* ws)
226  : Operator(operator_def, ws),
227  iterator_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {}
228 
229  bool RunOnDevice() override {
230  std::vector<const TLength*> lengths;
231  std::vector<TOffset> limits;
232  std::vector<TOffset> sizes;
233  std::vector<TOffset> offsets;
234  CAFFE_ENFORCE(
235  InputSize() == iterator_.fields().size(),
236  "Invalid number of fields. Expected ",
237  iterator_.fields().size(),
238  ", got ",
239  InputSize());
240  sizes.resize(iterator_.numOffsetFields());
241  // gather length data
242  lengths.resize(iterator_.numLengthFields());
243  for (int i = 0; i < lengths.size(); ++i) {
244  lengths[i] = Input(iterator_.lengthField(i).id).data<TLength>();
245  }
246  // gather size limits
247  limits.assign(sizes.size(), std::numeric_limits<TOffset>::max());
248  for (int i = 0; i < iterator_.fields().size(); ++i) {
249  int lengthIdx = iterator_.fields()[i].lengthFieldId + 1;
250  CAFFE_ENFORCE_GT(Input(i).ndim(), 0);
251  TOffset size = (TOffset)Input(i).dims()[0];
252  if (limits[lengthIdx] == std::numeric_limits<TOffset>::max()) {
253  limits[lengthIdx] = size;
254  } else {
255  CAFFE_ENFORCE(
256  limits[lengthIdx] == size,
257  "Inconsistent sizes for fields belonging to same domain.",
258  " Field: ",
259  i,
260  " (",
261  iterator_.fields()[i].name,
262  "); Length field index: ",
263  lengthIdx,
264  "); Previous size: ",
265  limits[lengthIdx],
266  "; New size: ",
267  size);
268  }
269  }
270  // advance to the end
271  offsets.assign(sizes.size(), 0);
272  iterator_.advance(lengths, offsets, sizes, limits, limits[0]);
273  for (int i = 0; i < limits.size(); ++i) {
274  CAFFE_ENFORCE(limits[i] == offsets[i]);
275  }
276  return true;
277  }
278 
279  private:
280  TreeIterator iterator_;
281 };
282 
283 class PackRecordsOp : public Operator<CPUContext> {
284  public:
285  PackRecordsOp(const OperatorDef& operator_def, Workspace* ws)
286  : Operator(operator_def, ws),
287  fields_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {}
288 
289  bool RunOnDevice() override {
290  // There should be one input per field
291  CAFFE_ENFORCE_EQ(InputSize(), fields_.size());
292  CAFFE_ENFORCE_EQ(OutputSize(), 1);
293 
294  TreeCursor cursor((TreeIterator(fields_)));
295 
296  TreeWalker walker(Inputs(), cursor);
297 
298  Output(0)->Resize(walker.size());
299 
300
301  auto* dst = Output(0)->mutable_data<SharedTensorVectorPtr>();
302 
303  for (int batchId = 0; batchId < walker.size(); ++batchId) {
304  dst[batchId] = std::make_shared<std::vector<TensorCPU>>();
305  dst[batchId]->reserve(walker.fields().size());
306 
307  for (const auto& field : walker.fields()) {
308  dst[batchId]->emplace_back(field.dim());
309  auto& tensor = dst[batchId]->back();
310  context_.template CopyItems<CPUContext, CPUContext>(
311  field.meta(),
312  tensor.size(),
313  field.ptr() /* src */,
314  tensor.raw_mutable_data(field.meta()) /* dst */);
315  }
316 
317  walker.advance();
318  }
319 
320  return true;
321  }
322 
323  private:
324  std::vector<std::string> fields_;
325 };
326 
327 class UnPackRecordsOp : public Operator<CPUContext> {
328  public:
329  UnPackRecordsOp(const OperatorDef& operator_def, Workspace* ws)
330  : Operator(operator_def, ws),
331  fields_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {}
332 
333  bool RunOnDevice() override {
334  const auto* inputs = Input(0).template data<SharedTensorVectorPtr>();
335  const auto numRows = Input(0).size();
336 
337  CAFFE_ENFORCE_GE(numRows, 0);
338 
339  auto numTensors = OutputSize();
340 
341  // Precompute the output sizes to avoid resizing
342  std::vector<std::vector<TIndex>> outputDims(numTensors);
343  std::vector<const TypeMeta*> metas(numTensors);
344 
345  CAFFE_ENFORCE(
346  numRows > 0 || InputSize() > 1,
347  "Unpacking empty record without shape will leave output blobs in "
348  "undefined state.");
349 
350  if (InputSize() == 1) {
351  getShapeAndMetaFromInput(outputDims, metas);
352  } else {
353  getShapeAndMetaFromPrototypeBlobs(outputDims, metas);
354  }
355 
356  for (int i = 0; i < numRows; ++i) {
357  CAFFE_ENFORCE(inputs[i]);
358  for (int j = 0; j < inputs[i]->size(); ++j) {
359  const auto& input = inputs[i]->at(j);
360 
361  // Checks to ensure that dimensions/sizes match
362  CAFFE_ENFORCE_EQ(outputDims[j].size(), input.ndim());
363  CAFFE_ENFORCE(*metas[j] == input.meta());
364  // Start from the second dimension, since we concatenate along the first.
365  for (int k = 1; k < input.ndim(); ++k) {
366  CAFFE_ENFORCE_EQ(input.dims()[k], outputDims[j][k]);
367  }
368 
369  outputDims[j][0] += input.dim(0);
370  }
371  }
372 
373  // Resize to the final output size
374  std::vector<void*> destinations(numTensors);
375  for (int i = 0; i < numTensors; ++i) {
376  Output(i)->Resize(outputDims[i]);
377  destinations[i] = Output(i)->raw_mutable_data(*metas[i]);
378  }
379 
380  for (int i = 0; i < numRows; ++i) {
381  for (int j = 0; j < numTensors; ++j) {
382  const auto& input = inputs[i]->at(j);
383 
384  context_.CopyItems<CPUContext, CPUContext>(
385  *metas[j],
386  input.size(),
387  input.raw_data() /* src */,
388  destinations[j] /* dst */
389  );
390 
391  destinations[j] =
392  (char*)destinations[j] + input.size() * input.itemsize();
393  }
394  }
395 
396  return true;
397  }
398 
399  private:
400  void getShapeAndMetaFromInput(
401  std::vector<std::vector<TIndex>>& outputDims,
402  std::vector<const TypeMeta*>& metas) {
403  const auto* inputs = Input(0).template data<SharedTensorVectorPtr>();
404 
405  const auto& inputZero = inputs[0];
406  CAFFE_ENFORCE(inputZero);
407 
408  const auto numTensors = inputZero->size();
409 
410  CAFFE_ENFORCE_EQ(numTensors, fields_.size());
411  CAFFE_ENFORCE_EQ(numTensors, OutputSize());
412 
413  for (int i = 0; i < numTensors; ++i) {
414  outputDims[i] = inputZero->at(i).dims();
415  outputDims[i][0] = 0;
416  metas[i] = &inputZero->at(i).meta();
417  }
418  }
419 
420  void getShapeAndMetaFromPrototypeBlobs(
421  std::vector<std::vector<TIndex>>& outputDims,
422  std::vector<const TypeMeta*>& metas) {
423  const auto numTensors = fields_.size();
424  CAFFE_ENFORCE_EQ(numTensors, InputSize() - 1);
425  CAFFE_ENFORCE_EQ(numTensors, OutputSize());
426  for (int i = 0; i < numTensors; ++i) {
427  const auto& input = Input(i + 1);
428  outputDims[i] = input.dims();
429  outputDims[i][0] = 0;
430  metas[i] = &input.meta();
431  }
432  }
433 
434  std::vector<std::string> fields_;
435 };
436 
437 class ReadNextBatchOp : public Operator<CPUContext> {
438  public:
439  ReadNextBatchOp(const OperatorDef& operator_def, Workspace* ws)
440  : Operator(operator_def, ws),
441  batchSize_(OperatorBase::GetSingleArgument<int>("batch_size", 1)),
442  enforceBatchSize_(OperatorBase::GetSingleArgument<bool>(
443  "enforce_batch_size",
444  false)) {}
445 
446  bool RunOnDevice() override {
447  auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
448  CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1);
449  std::vector<const TLength*> lengths;
450  std::vector<TOffset> limits;
451  std::vector<TOffset> sizes;
452  std::vector<TOffset> offsets;
453  TLength lenZero = 0;
454  sizes.resize(cursor->it.numOffsetFields());
455  // gather length data
456  lengths.resize(cursor->it.numLengthFields());
457  for (int i = 0; i < lengths.size(); ++i) {
458  auto& a = Input(cursor->it.lengthField(i).id + 1);
459  if (a.size() > 0) {
460  lengths[i] = a.data<int>();
461  } else {
462  lengths[i] = &lenZero;
463  }
464  }
465  // gather size limits
466  limits.assign(sizes.size(), std::numeric_limits<TOffset>::max());
467  for (int i = 0; i < cursor->it.fields().size(); ++i) {
468  int lengthFieldIdx = cursor->it.fields()[i].lengthFieldId + 1;
469  limits[lengthFieldIdx] =
470  std::min(limits[lengthFieldIdx], (TOffset)Input(i + 1).dims()[0]);
471  }
472  // advance cursor
473  {
474  std::lock_guard<std::mutex> lock(cursor->mutex_);
475  if (cursor->offsets.empty()) {
476  cursor->offsets.assign(sizes.size(), 0);
477  }
478  offsets = cursor->offsets;
479  cursor->it.advance(lengths, cursor->offsets, sizes, limits, batchSize_);
480  if (enforceBatchSize_ && sizes[0] < batchSize_) {
481  // if we enforce batch_size but don't have enough rows left to
482  // complete a full batch, return empty for all columns.
483  // This signals end of dataset to the caller.
484  sizes.assign(sizes.size(), 0);
485  }
486  }
487  // gather data
488  std::vector<TIndex> outDim;
489  for (int i = 0; i < cursor->it.fields().size(); ++i) {
490  auto lengthIdx = cursor->it.fields()[i].lengthFieldId + 1;
491  auto size = sizes[lengthIdx];
492  auto offset = offsets[lengthIdx];
493  auto& in = Input(i + 1);
494  auto innerSize = in.size_from_dim(1);
495  outDim = in.dims();
496  outDim[0] = size;
497  auto* out = Output(i);
498  out->Resize(outDim);
499  void* src =
500  (char*)in.raw_data() + offset * innerSize * in.meta().itemsize();
501  void* dst = out->raw_mutable_data(in.meta()); // create the tensor
502  if (out->size() == 0) {
503  continue;
504  }
505  context_.template CopyItems<CPUContext, CPUContext>(
506  in.meta(), out->size(), src, dst);
507  }
508  return true;
509  }
510  int batchSize_;
511  bool enforceBatchSize_;
512 };
513 
514 class ComputeOffsetOp : public Operator<CPUContext> {
515  public:
516  ComputeOffsetOp(const OperatorDef& operator_def, Workspace* ws)
517  : Operator(operator_def, ws) {}
518 
519  bool RunOnDevice() override {
520  auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
521  CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1);
522  auto* out = Output(0);
523  std::vector<const TLength*> lengths;
524  std::vector<TOffset> limits;
525  std::vector<TOffset> sizes;
526  std::vector<TOffset> offsets;
527  TLength lenZero = 0;
528  sizes.resize(cursor->it.numOffsetFields());
529  // gather length data
530  lengths.resize(cursor->it.numLengthFields());
531  for (int i = 0; i < lengths.size(); ++i) {
532  auto& a = Input(cursor->it.lengthField(i).id + 1);
533  if (a.size() > 0) {
534  lengths[i] = a.data<int>();
535  } else {
536  lengths[i] = &lenZero;
537  }
538  }
539  // gather size limits
540  limits.assign(sizes.size(), std::numeric_limits<TOffset>::max());
541  for (int i = 0; i < cursor->it.fields().size(); ++i) {
542  int lengthFieldIdx = cursor->it.fields()[i].lengthFieldId + 1;
543  limits[lengthFieldIdx] =
544  std::min(limits[lengthFieldIdx], (TOffset)Input(i + 1).dims()[0]);
545  }
546  out->Resize(limits.at(0) + 1, sizes.size());
547  auto* out_data = out->mutable_data<int64_t>();
548  for (int k = 0; k <= limits.at(0); k++) {
549  // advance cursor
550  if (cursor->offsets.empty()) {
551  cursor->offsets.assign(sizes.size(), 0);
552  }
553  // write output
554  std::copy(cursor->offsets.begin(), cursor->offsets.end(), out_data);
555  out_data += sizes.size();
556  cursor->it.advance(lengths, cursor->offsets, sizes, limits, 1);
557  }
558  cursor->offsets.assign(sizes.size(), 0); // reset after computing the offsets
559  return true;
560  }
561 };
562 
563 class SortAndShuffleOp : public Operator<CPUContext> {
564  public:
565  SortAndShuffleOp(const OperatorDef& operator_def, Workspace* ws)
566  : Operator(operator_def, ws),
567  sort_by_field_idx_(
568  OperatorBase::GetSingleArgument<int>("sort_by_field_idx", 1)),
569  batch_size_(OperatorBase::GetSingleArgument<int>("batch_size", 1)),
570  shuffle_size_(OperatorBase::GetSingleArgument<int>("shuffle_size", 1)) {
571  }
572 
573  bool RunOnDevice() override {
574  auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
575  CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1);
576  CAFFE_ENFORCE(-1 <= sort_by_field_idx_);
577  CAFFE_ENFORCE(cursor->it.fields().size() - sort_by_field_idx_ > 0);
578  int size;
579  if (sort_by_field_idx_ != -1) {
580  size = Input(sort_by_field_idx_ + 1).dims()[0];
581  } else {
582  size = Input(1).dims()[0];
583  }
584 
585  CAFFE_ENFORCE(
586  batch_size_ > 0 && shuffle_size_ > 0 &&
587  0 < batch_size_ * shuffle_size_);
588  // adjust shuffle_size_ if it is too large
589  if (batch_size_ * shuffle_size_ > size) {
590  shuffle_size_ = size / batch_size_;
591  }
592 
593  int num_batch = size / batch_size_;
594  auto* out = Output(0);
595  out->Resize(size);
596  auto* out_data = out->mutable_data<int64_t>();
597 
598  vector<int> shuffle_idx(size);
599  iota(shuffle_idx.begin(), shuffle_idx.end(), 0);
600 
601  if (sort_by_field_idx_ != -1) {
602  auto& sortblob = Input(sort_by_field_idx_ + 1);
603  auto* sortdata = sortblob.data<int>();
604  // must sort by a field at the root level
605  CAFFE_ENFORCE(
606  cursor->it.fields()[sort_by_field_idx_].lengthFieldId == -1);
607  sort(shuffle_idx.begin(), shuffle_idx.end(), [&sortdata](int i1, int i2) {
608  return sortdata[i1] < sortdata[i2];
609  });
610  }
611 
612  if (batch_size_ * shuffle_size_ > 1) {
613  int offset = 0;
614  while (offset + batch_size_ * shuffle_size_ < size) {
615  std::shuffle(
616  shuffle_idx.begin() + offset,
617  shuffle_idx.begin() + offset + batch_size_ * shuffle_size_,
618  std::default_random_engine());
619  offset += batch_size_ * shuffle_size_;
620  }
621  }
622 
623  vector<int> batch_idx(num_batch);
624  iota(batch_idx.begin(), batch_idx.end(), 0);
625  std::shuffle(
626  batch_idx.begin(), batch_idx.end(), std::default_random_engine());
627 
628  for (int i = 0; i < num_batch; i++) {
629  std::copy(
630  shuffle_idx.begin() + batch_idx[i] * batch_size_,
631  shuffle_idx.begin() + (batch_idx[i] + 1) * batch_size_,
632  out_data);
633  out_data += batch_size_;
634  }
635  std::copy(
636  shuffle_idx.begin() + num_batch * batch_size_,
637  shuffle_idx.end(),
638  out_data);
639 
640  return true;
641  }
642 
643  int sort_by_field_idx_;
644  int batch_size_;
645  int shuffle_size_;
646 };
647 
648 class ReadRandomBatchOp : public Operator<CPUContext> {
649  public:
650  ReadRandomBatchOp(const OperatorDef& operator_def, Workspace* ws)
651  : Operator(operator_def, ws),
652  batchSize_(OperatorBase::GetSingleArgument<int>("batch_size", 1)),
653  enforceBatchSize_(
654  OperatorBase::GetSingleArgument<bool>("enforce_batch_size", false)),
655  loopOver_(OperatorBase::GetSingleArgument<bool>("loop_over", false)) {}
656  bool RunOnDevice() override {
657  auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
658  auto& idxblob = Input(1);
659  auto& offsetsmat = Input(2);
660  CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 3);
661  auto idxvec = idxblob.template data<int64_t>();
662  auto& offsetdim = offsetsmat.dims();
663  // gather data
664  std::vector<TIndex> outDim;
665  int64_t idx;
666  {
667  std::lock_guard<std::mutex> lock(cursor->mutex_);
668  cursor->offsets.resize(1);
669  idx = cursor->offsets.at(0);
670  // if we want to enforce the batch size but don't have a complete
671  // batch, skip the last rows.
672  if (enforceBatchSize_ && idx + batchSize_ > idxblob.size()) {
673  idx = idxblob.size();
674  }
675  if (loopOver_ && idx >= idxblob.size()) {
676  cursor->offsets.at(0) = 0;
677  idx = 0;
678  }
679  cursor->offsets.at(0) += batchSize_;
680  }
681 
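  // Each row r of the offsets matrix (see ComputeOffset) holds the cursor
  // offsets before record r, so for the shuffled record index idxvec[idx]
  // the extent of each column is the difference between rows idxvec[idx] + 1
  // and idxvec[idx]; that is what the offsetptr arithmetic below computes.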
682  for (int i = 0; i < cursor->it.fields().size(); ++i) {
683  auto lengthIdx = cursor->it.fields()[i].lengthFieldId + 1;
684  auto& in = Input(i + 3);
685  outDim = in.dims();
686  outDim.at(0) = 0;
687  auto idxbegin = idx;
688  for (int j = 0; j < batchSize_; ++j) {
689  if (idx >= idxblob.size()) {
690  break;
691  }
692  CAFFE_ENFORCE(
693  (idxvec[idx] + 1) * offsetdim[1] + lengthIdx < offsetsmat.size(),
694  "Out of bound when trying to get elem from offsetsmat");
695  auto offsetptr = offsetsmat.template data<TOffset>() +
696  idxvec[idx] * offsetdim[1] + lengthIdx;
697  auto offset = *offsetptr;
698  auto size = *(offsetptr + offsetdim[1]) - offset;
699  outDim.at(0) += size; // accumulate over the batch
700  idx++;
701  }
702  idx = idxbegin; // reset
703  auto* out = Output(i);
704  out->Resize(outDim);
705  if (out->size() == 0) {
706  continue;
707  }
708  auto dst = static_cast<char*>(out->raw_mutable_data(in.meta()));
709  int block_size = in.size() / in.dim(0);
710  auto block_bytesize = in.size_from_dim(1) * in.meta().itemsize();
711  CAFFE_ENFORCE(
712  block_bytesize == in.nbytes() / in.dim(0),
713  "block_bytesize should be consistent with data dim");
714  auto src_base = static_cast<const char*>(in.raw_data());
715  int start = 0;
716  for (int j = 0; j < batchSize_; ++j) {
717  if (idx >= idxblob.size()) {
718  break;
719  }
720  auto offsetptr = offsetsmat.template data<TOffset>() +
721  idxvec[idx] * offsetdim[1] + lengthIdx;
722  auto offset = *offsetptr;
723  auto size = *(offsetptr + offsetdim[1]) - offset;
724  // copy data
725  auto src = src_base + offset * block_bytesize;
726  context_.template CopyItems<CPUContext, CPUContext>(
727  in.meta(), size * block_size, src, dst + start * block_bytesize);
728  start += size;
729  idx++;
730  }
731  idx = idxbegin; // reset
732  }
733  return true;
734  }
735  int batchSize_;
736  bool enforceBatchSize_;
737  bool loopOver_;
738 };
739 
740 template <class Context>
741 class AppendOp final : public Operator<Context> {
742  public:
743  USE_OPERATOR_CONTEXT_FUNCTIONS;
744  AppendOp(const OperatorDef& operator_def, Workspace* ws)
745  : Operator<Context>(operator_def, ws) {}
746 
747  bool RunOnDevice() override {
748  auto& a = Input(0);
749  auto& b = Input(1);
750  auto* c = Output(0);
751  CAFFE_ENFORCE(b.ndim() >= 1);
752  if (a.size() == 0 && a.dim(0) == 0) {
753  c->CopyFrom(b);
754  return true;
755  }
756  CAFFE_ENFORCE(&a == c, "First argument must be in-place.");
757  CAFFE_ENFORCE(c->ndim() == b.ndim());
759  CAFFE_ENFORCE(a.meta() == b.meta());
760  for (int i = 1; i < a.ndim(); ++i) {
761  CAFFE_ENFORCE(a.dims()[i] == b.dims()[i]);
762  }
763  auto oldSize = c->size();
764  c->Extend(b.dims()[0], kDatasetGrowthPct, &context_);
765  auto* dst = (char*)c->raw_mutable_data() + oldSize * b.meta().itemsize();
766  context_.template CopyItems<Context, Context>(
767  b.meta(), b.size(), b.raw_data(), dst);
768  return true;
769  }
770 };
771 
772 template <class Context>
773 class AtomicAppendOp final : public Operator<Context> {
774  public:
775  USE_OPERATOR_CONTEXT_FUNCTIONS;
776  AtomicAppendOp(const OperatorDef& operator_def, Workspace* ws)
777  : Operator<Context>(operator_def, ws) {}
778 
779  bool RunOnDevice() override {
780  auto& mutex = OperatorBase::Input<std::unique_ptr<std::mutex>>(0);
781  const auto numFields = (InputSize() - 1) / 2;
782  CAFFE_ENFORCE(OutputSize() == numFields);
783 
784  std::lock_guard<std::mutex> guard(*mutex);
785 
786  // 1: checks
787  for (int i = 0; i < numFields; ++i) {
788  auto& a = Input(1 + i);
789  auto& b = Input(1 + i + numFields);
790  auto* c = Output(i);
791  CAFFE_ENFORCE(b.ndim() >= 1);
792  if (a.size() == 0) {
793  continue;
794  }
795  CAFFE_ENFORCE(
796  (void*)&a == (void*)c, "Appended-to arguments must be in-place.");
797  CAFFE_ENFORCE(c->ndim() == b.ndim());
799  CAFFE_ENFORCE(a.meta() == b.meta());
800  for (int j = 1; j < a.ndim(); ++j) {
801  CAFFE_ENFORCE(a.dims()[j] == b.dims()[j]);
802  }
803  }
804 
805  // 2: copies
806  for (int i = 0; i < numFields; ++i) {
807  auto& a = Input(1 + i);
808  auto& b = Input(1 + i + numFields);
809  auto* c = Output(i);
810  if (a.size() == 0 && a.dim(0) == 0) {
811  c->CopyFrom(b);
812  continue;
813  }
814  auto oldSize = c->size();
815  c->Extend(b.dims()[0], kDatasetGrowthPct, &context_);
816  auto* dst = (char*)c->raw_mutable_data() + oldSize * b.meta().itemsize();
817  context_.template CopyItems<Context, Context>(
818  b.meta(), b.size(), b.raw_data(), dst);
819  }
820  return true;
821  }
822 };
823 
824 template <class Context>
825 class CreateTensorVectorOp final : public Operator<Context> {
826  public:
827  USE_OPERATOR_CONTEXT_FUNCTIONS;
828  using Operator<Context>::Operator;
829 
830  bool RunOnDevice() override {
831  auto ptr = make_unique<std::vector<Tensor<Context>>>();
832  *OperatorBase::Output<TensorVectorPtr<Context>>(TENSOR_VECTOR) =
833  std::move(ptr);
834  return true;
835  }
836 
837  private:
838  OUTPUT_TAGS(TENSOR_VECTOR);
839 };
840 
841 template <class Context>
842 class TensorVectorSizeOp final : public Operator<Context> {
843  public:
844  USE_OPERATOR_CONTEXT_FUNCTIONS;
845  USE_SIMPLE_CTOR_DTOR(TensorVectorSizeOp);
846 
847  bool RunOnDevice() override {
848  auto& vector_ptr =
849  OperatorBase::Input<TensorVectorPtr<Context>>(TENSOR_VECTOR);
850  auto* size = Output(SIZE);
851  size->Resize();
852  // 32-bit should be enough here
853  *size->template mutable_data<int32_t>() = vector_ptr->size();
854  return true;
855  }
856 
857  private:
858  INPUT_TAGS(TENSOR_VECTOR);
859  OUTPUT_TAGS(SIZE);
860 };
861 
862 template <class Context>
863 class ConcatTensorVectorOp final : public Operator<Context> {
864  public:
865  USE_OPERATOR_CONTEXT_FUNCTIONS;
866  using Operator<Context>::Operator;
867 
868  bool RunOnDevice() override {
869  const TensorVectorPtr<Context>& tensorVector =
870  OperatorBase::Input<TensorVectorPtr<Context>>(TENSOR_VECTOR);
871 
872  auto* tensor = Output(TENSOR);
873  CAFFE_ENFORCE(!tensorVector->empty());
874 
875  vector<TIndex> outputDims(tensorVector->at(0).dims());
876  CAFFE_ENFORCE(outputDims.size() > 0);
877  for (int i = 1; i < tensorVector->size(); i++) {
878  // the tensor shapes are the same except for the first dimension
879  for (int j = 1; j < tensorVector->at(i).ndim(); j++) {
880  CAFFE_ENFORCE(outputDims[j] == tensorVector->at(i).dims()[j]);
881  }
882  CAFFE_ENFORCE(tensorVector->at(0).meta() == tensorVector->at(i).meta());
883  outputDims[0] += tensorVector->at(i).dims()[0];
884  }
885 
886  tensor->Resize(outputDims);
887  TIndex offset = 0;
888  auto* dst = (char*)tensor->raw_mutable_data(tensorVector->at(0).meta());
889 
890  for (const auto& t : *tensorVector) {
891  context_.template CopyItems<Context, Context>(
892  t.meta(), t.size(), t.raw_data(), dst + offset);
893  offset += t.nbytes();
894  }
895 
896  return true;
897  }
898 
899  private:
900  INPUT_TAGS(TENSOR_VECTOR);
901  OUTPUT_TAGS(TENSOR);
902 };
903 
904 template <class Context>
905 class CollectTensorOp final : public Operator<Context> {
906  public:
907  USE_OPERATOR_CONTEXT_FUNCTIONS;
908  CollectTensorOp(const OperatorDef& operator_def, Workspace* ws)
909  : Operator<Context>(operator_def, ws),
910  numToCollect_(
911  OperatorBase::GetSingleArgument<int>("num_to_collect", -1)),
912  numVisited_(0) {
913  CAFFE_ENFORCE(numToCollect_ > 0);
914  }
915 
916  bool RunOnDevice() override {
917  int pos = -1;
918  if (numVisited_ < numToCollect_) {
919  // append
920  pos = numVisited_;
921  } else {
922  auto& gen = context_.RandGenerator();
923  // uniform over the inclusive range [0, numVisited_]
924  std::uniform_int_distribution<int> uniformDist(0, numVisited_);
925  pos = uniformDist(gen);
926  if (pos >= numToCollect_) {
927  // discard
928  pos = -1;
929  }
930  }
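  // This is standard reservoir sampling: each incoming tensor is kept with
  // probability numToCollect_ / (numVisited_ + 1), replacing a uniformly
  // chosen slot, so the vector stays a uniform sample of size numToCollect_
  // over all tensors visited so far.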
931 
932  for (int i = 0; i < OutputSize(); ++i) {
933  // TENSOR_VECTOR_IN is enforced to be in-place with TENSOR_VECTOR_OUT
934  TensorVectorPtr<Context>& tensorVector =
935  *OperatorBase::Output<TensorVectorPtr<Context>>(i);
936 
937  if (numVisited_ >= numToCollect_) {
938  CAFFE_ENFORCE(
939  tensorVector->size() == numToCollect_,
940  "TensorVecotor size = ",
941  tensorVector->size(),
942  " is different from numToCollect = ",
943  numToCollect_);
944  }
945 
946  const auto& tensor = Input(OutputSize() + i);
947 
948  if (pos < 0) {
949  // discard
950  CAFFE_ENFORCE(numVisited_ >= numToCollect_);
951  } else if (pos >= tensorVector->size()) {
952  // append
953  tensorVector->push_back(Tensor<Context>());
954  tensorVector->back().template CopyFrom<Context, Context>(
955  tensor, &context_);
956  } else {
957  // replace
958  tensorVector->at(pos).template CopyFrom<Context, Context>(
959  tensor, &context_);
960  }
961  }
962 
963  numVisited_++;
964  return true;
965  }
966 
967  private:
968  // number of tensors to collect
969  int numToCollect_;
970  // number of tensors visited
971  int numVisited_;
972 };
973 
974 class TrimDatasetOp : public Operator<CPUContext> {
975  public:
976  TrimDatasetOp(const OperatorDef& operator_def, Workspace* ws)
977  : Operator(operator_def, ws),
978  iterator_(OperatorBase::GetRepeatedArgument<std::string>("fields")),
979  multiple_of_(OperatorBase::GetSingleArgument<int>("multiple_of", 1)) {
980  CAFFE_ENFORCE_GE(multiple_of_, 1);
981  }
982 
983  bool RunOnDevice() override {
984  TreeCursor cursor(iterator_);
985  TreeWalker walker(Inputs(), cursor);
986 
987  int trimmedSize = (walker.size() / multiple_of_) * multiple_of_;
988  if (trimmedSize == walker.size()) {
989  // we already satisfy the condition
990  return true;
991  }
992  // advance desired number of records
993  for (int i = 0; i < trimmedSize; ++i) {
994  walker.advance();
995  }
996  // trim each column to the offset
997  for (int col = 0; col < walker.fields().size(); ++col) {
998  auto newOuterSize = walker.fields().at(col).offset();
999  Output(col)->Shrink(newOuterSize);
1000  }
1001  return true;
1002  }
1003 
1004  private:
1005  TreeIterator iterator_;
1006  int multiple_of_;
1007 };
1008 
1009 REGISTER_CPU_OPERATOR(CreateTreeCursor, CreateTreeCursorOp);
1010 REGISTER_CPU_OPERATOR(ResetCursor, ResetCursorOp);
1011 REGISTER_CPU_OPERATOR(ReadNextBatch, ReadNextBatchOp);
1012 REGISTER_CPU_OPERATOR(ComputeOffset, ComputeOffsetOp);
1013 REGISTER_CPU_OPERATOR(SortAndShuffle, SortAndShuffleOp);
1014 REGISTER_CPU_OPERATOR(ReadRandomBatch, ReadRandomBatchOp);
1015 REGISTER_CPU_OPERATOR(CheckDatasetConsistency, CheckDatasetConsistencyOp);
1016 REGISTER_CPU_OPERATOR(Append, AppendOp<CPUContext>);
1017 REGISTER_CPU_OPERATOR(AtomicAppend, AtomicAppendOp<CPUContext>);
1018 REGISTER_CPU_OPERATOR(CreateTensorVector, CreateTensorVectorOp<CPUContext>);
1019 REGISTER_CPU_OPERATOR(TensorVectorSize, TensorVectorSizeOp<CPUContext>);
1020 REGISTER_CPU_OPERATOR(ConcatTensorVector, ConcatTensorVectorOp<CPUContext>);
1021 REGISTER_CPU_OPERATOR(CollectTensor, CollectTensorOp<CPUContext>);
1022 REGISTER_CPU_OPERATOR(PackRecords, PackRecordsOp);
1023 REGISTER_CPU_OPERATOR(UnPackRecords, UnPackRecordsOp);
1024 REGISTER_CPU_OPERATOR(TrimDataset, TrimDatasetOp);
1025 
1026 OPERATOR_SCHEMA(CreateTreeCursor)
1027  .NumInputs(0)
1028  .NumOutputs(1)
1029  .SetDoc(R"DOC(
1030 Creates a cursor to iterate through a list of tensors, where some of those
1031 tensors contain the lengths for a nested schema. The schema is determined
1032 by the `fields` argument.
1033 
1034 For example, to represent the following schema:
1035 
1036  Struct(
1037  a=Int(),
1038  b=List(List(Int)),
1039  c=List(
1040  Struct(
1041  c1=String,
1042  c2=List(Int),
1043  ),
1044  ),
1045  )
1046 
1047 the field list will be:
1048  [
1049  "a",
1050  "b:lengths",
1051  "b:values:lengths",
1052  "b:values:values",
1053  "c:lengths",
1054  "c:c1",
1055  "c:c2:lengths",
1056  "c:c2:values",
1057  ]
1058 
1059 And for the following instance of the struct:
1060 
1061  Struct(
1062  a=3,
1063  b=[[4, 5], [6, 7, 8], [], [9]],
1064  c=[
1065  Struct(c1='alex', c2=[10, 11]),
1066  Struct(c1='bob', c2=[12]),
1067  ],
1068  )
1069 
1070 The values of the fields will be:
1071  {
1072  "a": [3],
1073  "b:lengths": [4],
1074  "b:values:lengths": [2, 3, 0, 1],
1075  "b:values:values": [4, 5, 6, 7, 8, 9],
1076  "c:lengths": [2],
1077  "c:c1": ["alex", "bob"],
1078  "c:c2:lengths": [2, 1],
1079  "c:c2:values", [10, 11, 12],
1080  }
1081 
1082 In general, every field name in the format "{prefix}:lengths" defines a domain
1083 "{prefix}", and every subsequent field in the format "{prefix}:{field}" belongs
1084 to that domain. The length of the domain is provided for each entry of the
1085 parent domain. In the example above, "b:lengths" defines a domain of length 4,
1086 so every field under domain "b" has 4 entries.
1087 The "lengths" field for a given domain must appear before any reference to
1088 that domain.
1089 
1090 Returns a pointer to an instance of the Cursor, which keeps the current offset
1091 on each of the domains defined by `fields`. The cursor also ensures thread
1092 safety, so that ReadNextBatch and ResetCursor can be used safely in parallel.
1093 
1094 A cursor does not contain data per se, so calls to ReadNextBatch must also
1095 pass the list of blobs containing the data to read for each of the fields.
1096 )DOC")
1097  .Output(0, "cursor", "A blob pointing to an instance of a new TreeCursor.")
1098  .Arg(
1099  "fields",
1100  "A list of strings each one representing a field of the dataset.");
1101 
1102 OPERATOR_SCHEMA(ResetCursor)
1103  .NumInputs(1)
1104  .NumOutputs(0)
1105  .SetDoc(R"DOC(
1106 Resets the offsets for the given TreeCursor. This operation is thread safe.
1107 )DOC")
1108  .Input(0, "cursor", "A blob containing a pointer to the cursor.");
1109 
1110 OPERATOR_SCHEMA(ReadNextBatch)
1111  .NumInputs(1, INT_MAX)
1112  .NumOutputs(1, INT_MAX)
1113  .SetDoc(R"DOC(
1114 Read the next batch of examples out of the given cursor and data blobs.
1115 
1116 Input(0) is a blob pointing to a TreeCursor, and
1117 [Input(1),... Input(num_fields)] a list of tensors containing the data for
1118 each field of the dataset.
1119 
1120 ReadNextBatch is thread safe.
1121 )DOC")
1122  .Input(0, "cursor", "A blob containing a pointer to the cursor.")
1123  .Input(1, "dataset_field_0", "First dataset field")
1124  .Output(0, "field_0", "Tensor containing the next batch for field 0.")
1125  .Arg("batch_size", "Number of top-level entries to read.");
1126 
1127 OPERATOR_SCHEMA(ComputeOffset)
1128  .NumInputs(1, INT_MAX)
1129  .NumOutputs(1)
1130  .SetDoc(R"DOC(
1131 Compute the offsets matrix given the cursor and data blobs. This needs to be
1132 run at the beginning or after resetting the cursor.
1133 
1134 Input(0) is a blob pointing to a TreeCursor, and
1135 [Input(1),... Input(num_fields)] a list of tensors containing the data for
1136 each field of the dataset.
1137 
1138 ComputeOffset is thread safe.
1139 )DOC")
1140  .Input(0, "cursor", "A blob containing a pointer to the cursor.")
1141  .Input(1, "dataset_field_0", "First dataset field")
1142  .Output(0, "field_0", "Tensor containing offset info for this chunk.");
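// The output offsets matrix has shape (number of records + 1) x (number of
// offset fields): row r holds the cursor offsets before record r, and the
// final extra row holds the totals, so consecutive rows bracket each record.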
1143 
1144 OPERATOR_SCHEMA(SortAndShuffle)
1145  .NumInputs(1, INT_MAX)
1146  .NumOutputs(1)
1147  .SetDoc(R"DOC(
1148 Compute the sorted indices given a field index to sort by, break the sorted
1149 indices into chunks of shuffle_size * batch_size, and shuffle each chunk;
1150 finally, shuffle between the batches. If sort_by_field_idx is -1, the sort
is skipped.
1151 
1152 For example, suppose the data are sorted as
1153 1,2,3,4,5,6,7,8,9,10,11,12
1154 
1155 with batch_size = 2 and shuffle_size = 3. Shuffling each chunk gives, e.g.:
1156 [3,1,4,6,5,2] [12,10,11,8,9,7]
1157 
1158 After this we shuffle among the batches of size 2,
1159 [3,1],[4,6],[5,2],[12,10],[11,8],[9,7]
1160 
1161 and may end up with something like
1162 [9,7],[5,2],[12,10],[4,6],[3,1],[11,8]
1163 
1164 Input(0) is a blob pointing to a TreeCursor, and
1165 [Input(1),... Input(num_fields)] a list of tensors containing the data for
1166 each field of the dataset.
1167 
1168 SortAndShuffle is thread safe.
1169 )DOC")
1170  .Input(0, "cursor", "A blob containing a pointer to the cursor.")
1171  .Input(1, "dataset_field_0", "First dataset field")
1172  .Output(0, "indices", "Tensor containing sorted indices.");
1173 
1174 OPERATOR_SCHEMA(ReadRandomBatch)
1175  .NumInputs(1, INT_MAX)
1176  .NumOutputs(1, INT_MAX)
1177  .SetDoc(R"DOC(
1178 Read the next batch of examples out of the given cursor,
1179 idx blob, offset matrix and data blobs.
1180 
1181 Input(0) is a blob pointing to a TreeCursor,
1182 Input(1) is a blob pointing to the shuffled indices,
1183 Input(2) is a blob pointing to the offset matrix, and
1184 [Input(3),... Input(num_fields)] a list of tensors containing the data for
1185 each field of the dataset.
1186 
1187 ReadRandomBatch is thread safe.
1188 )DOC")
1189  .Input(0, "cursor", "A blob containing a pointer to the cursor.")
1190  .Input(1, "idx", "idx with a shuffled order.")
1191  .Input(2, "offsetsmat", "offset matrix containing length offset info.")
1192  .Input(3, "dataset_field_0", "First dataset field")
1193  .Output(0, "field_0", "Tensor containing the next batch for field 0.")
1194  .Arg("batch_size", "Number of top-level entries to read.")
1195  .Arg("loop_over", "(bool) Repeat the dataset indefinitely");
1196 
1197 OPERATOR_SCHEMA(CheckDatasetConsistency)
1198  .NumInputs(1, INT_MAX)
1199  .NumOutputs(0)
1200  .SetDoc(R"DOC(
1201 Checks that the given data fields represent a consistent dataset under
1202 the schema specified by the `fields` argument. The operator fails if the
1203 fields are not consistent. If the data is consistent, each field's data
1204 can be safely appended to an existing dataset, keeping it consistent.
1205 )DOC")
1206  .Input(0, "field_0", "Data for field 0.")
1207  .Arg(
1208  "fields",
1209  "List of strings representing the string names in the format"
1210  "specified in the doc for CreateTreeCursor.");
1211 
1212 OPERATOR_SCHEMA(Append)
1213  .NumInputs(2)
1214  .NumOutputs(1)
1215  .EnforceInplace({{0, 0}})
1216  .SetDoc(R"DOC(
1217 Append input 2 to the end of input 1.
1218 Input 1 must be the same as output, that is, it is required to be in-place.
1219 Input 1 may have to be re-allocated in order to accommodate the new size.
1220 Currently, an exponential growth ratio is used in order to ensure amortized
1221 constant time complexity.
1222 All except the outer-most dimension must be the same between input 1 and 2.
1223 )DOC")
1224  .Input(0, "dataset", "The tensor to be appended to.")
1225  .Input(1, "new_data", "Tensor to append to the end of dataset.")
1226  .Output(0, "dataset", "Same as input 0, representing the mutated tensor.");
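// For instance, with the 40% growth used here (kDatasetGrowthPct), appending
// to a full 100-row buffer reserves capacity for roughly 140 rows, then 196,
// and so on; geometric growth keeps the average number of copies per element
// constant, which is what gives the amortized constant time noted above.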
1227 
1228 OPERATOR_SCHEMA(AtomicAppend)
1229  .NumInputs(3, INT_MAX)
1230  .NumOutputs(1, INT_MAX)
1231  .AllowInplace([](int in, int out) { return in == out + 1; });
1232 
1233 OPERATOR_SCHEMA(CreateTensorVector)
1234  .NumInputs(0)
1235  .NumOutputs(1)
1236  .SetDoc("Create a std::unique_ptr<std::vector<Tensor> >");
1237 
1238 OPERATOR_SCHEMA(TensorVectorSize)
1239  .NumInputs(1)
1240  .NumOutputs(1)
1241  .SetDoc("Get the size of the input vector")
1242  .Input(0, "tensor vector", "std::unique_ptr<std::vector<Tensor> >")
1243  .Output(0, "size", "int32_t size");
1244 
1245 OPERATOR_SCHEMA(ConcatTensorVector)
1246  .NumInputs(1)
1247  .NumOutputs(1)
1248  .SetDoc(R"DOC(
1249 Concat Tensors in the std::unique_ptr<std::vector<Tensor> >
1250 along the first dimension.
1251  )DOC")
1252  .Input(0, "vector of Tensor", "std::unique_ptr<std::vector<Tensor> >")
1253  .Output(0, "tensor", "tensor after concatenating");
1254 
1255 OPERATOR_SCHEMA(CollectTensor)
1256  .NumInputs([](int n) { return n > 0 && n % 2 == 0; })
1257  .NumOutputs(1, INT_MAX)
1258  .NumInputsOutputs([](int in, int out) { return in == out * 2; })
1259  .EnforceInplace([](int in, int out) { return in == out; })
1260  .SetDoc(R"DOC(
1261 Collects tensors into tensor vectors by reservoir sampling; the
1262 argument num_to_collect indicates the max number of tensors that will be
1263 collected. The first half of the inputs are tensor vectors, which are also the
1264 outputs. The second half of the inputs are the tensors to be collected into each
1265 vector (in the same order). The input tensors are collected in an all-or-none
1266 manner. If they are collected, they will be placed at the same index in the
1267 output vectors.
1268 )DOC")
1269  .Arg("num_to_collect", "The max number of tensors to collect");
1270 
1271 OPERATOR_SCHEMA(PackRecords)
1272  .NumInputs(1, INT_MAX)
1273  .NumOutputs(1)
1274  .SetDoc(R"DOC(
1275 Given a dataset under a schema specified by the `fields` argument, packs all
1276 the input tensors into one, where each tensor element represents a row of data
1277 (a batch of size 1). This format allows easier use with the rest of the Caffe2
1278 operators.
1279 )DOC")
1280  .Arg(
1281  "fields",
1282  "List of strings representing the string names in the format"
1283  "specified in the doc for CreateTreeCursor.")
1284  .Output(
1285  0,
1286  "tensor",
1287  "One dimensional tensor having a complex type of SharedTensorVectorPtr."
1288  " In order to reverse it back to the original input it has to be "
1289  "inserted into UnPackRecordsOp.");
1290 
1291 OPERATOR_SCHEMA(TrimDataset)
1292  .NumInputs(1, INT_MAX)
1293  .NumOutputs(1, INT_MAX)
1294  .SetDoc(R"DOC(
1295 Trim the given dataset in-place, given the dataset blobs and the field specs.
1296 Trimming happens such that the dataset will contain the largest possible number
1297 of records that is a multiple of the 'multiple_of' argument.
1298 )DOC")
1299  .EnforceInplace([](int input, int output) { return input == output; })
1300  .Arg(
1301  "fields",
1302  "List of strings representing the string names in the format"
1303  "specified in the doc for CreateTreeCursor.");
1304 
1305 OPERATOR_SCHEMA(UnPackRecords)
1306  .NumInputs(1, INT_MAX)
1307  .NumOutputs(1, INT_MAX)
1308  .SetDoc(R"DOC(
1309 Given a packed dataset (packed by the PackRecordsOp) and the `fields` argument
1310 describing the dataset's schema, returns the original dataset format. The number
1311 of returned tensors is equal to the number of fields in the `fields` argument.
1312 
1313 The first input is the packed tensor to be unpacked. Optionally, you can provide
1314 prototype tensors to give the expected shapes of the output tensors. This is
1315 helpful when you expect to unpack an empty tensor, e.g., the output of a
1316 sampling process.
1317 )DOC")
1318  .Arg(
1319  "fields",
1320  "List of strings representing the string names in the format"
1321  "specified in the doc for CreateTreeCursor.")
1322  .Input(0, "packed_tensor", "The tensor to be unpacked");
1323 
1324 SHOULD_NOT_DO_GRADIENT(CreateTreeCursor);
1325 SHOULD_NOT_DO_GRADIENT(ResetCursor);
1326 SHOULD_NOT_DO_GRADIENT(ReadNextBatch);
1327 SHOULD_NOT_DO_GRADIENT(ComputeOffset);
1328 SHOULD_NOT_DO_GRADIENT(ReadRandomBatch);
1329 SHOULD_NOT_DO_GRADIENT(CheckDatasetConsistency);
1330 SHOULD_NOT_DO_GRADIENT(Append);
1331 SHOULD_NOT_DO_GRADIENT(AtomicAppend);
1332 SHOULD_NOT_DO_GRADIENT(CreateTensorVector);
1333 SHOULD_NOT_DO_GRADIENT(TensorVectorSize);
1334 SHOULD_NOT_DO_GRADIENT(ConcatTensorVector);
1335 SHOULD_NOT_DO_GRADIENT(CollectTensor);
1336 SHOULD_NOT_DO_GRADIENT(UnPackRecords);
1337 SHOULD_NOT_DO_GRADIENT(PackRecords);
1338 
1339 class TreeCursorSerializer : public BlobSerializerBase {
1340  public:
1341  TreeCursorSerializer() {}
1342  ~TreeCursorSerializer() {}
1343 
1344  void Serialize(
1345  const Blob& blob,
1346  const string& name,
1347  SerializationAcceptor acceptor) override {
1348  auto& cursor = blob.template Get<std::unique_ptr<TreeCursor>>();
1349  BlobProto blob_proto;
1350 
1351  // serialize offsets as a tensor
1352  if (cursor->offsets.size() > 0) {
1353  Blob offsets_blob;
1354  auto* offsets = offsets_blob.template GetMutable<Tensor<CPUContext>>();
1355  offsets->Resize(cursor->offsets.size());
1356  std::copy(
1357  cursor->offsets.begin(),
1358  cursor->offsets.end(),
1359  offsets->mutable_data<TOffset>());
1360  TensorSerializer<CPUContext> ser;
1361  ser.Serialize(
1362  *offsets, name, blob_proto.mutable_tensor(), 0, offsets->size());
1363  }
1364  blob_proto.set_name(name);
1365  blob_proto.set_type("std::unique_ptr<TreeCursor>");
1366 
1367  // serialize field names in the content
1368  std::ostringstream os;
1369  for (const auto& field : cursor->it.fields()) {
1370  os << field.name << " ";
1371  }
1372  blob_proto.set_content(os.str());
1373 
1374  acceptor(name, blob_proto.SerializeAsString());
1375  }
1376 };
1377 
1378 class TreeCursorDeserializer : public BlobDeserializerBase {
1379  public:
1380  void Deserialize(const BlobProto& proto, Blob* blob) override {
1381  // deserialize the offsets
1382  TensorDeserializer<CPUContext> deser;
1383  Blob offset_blob;
1384  deser.Deserialize(proto, &offset_blob);
1385  auto& offsets = offset_blob.template Get<Tensor<CPUContext>>();
1386  auto* offsets_ptr = offsets.data<TOffset>();
1387 
1388  // deserialize the field names
1389  std::vector<std::string> fieldNames;
1390  std::istringstream is(proto.content());
1391  std::string field;
1392  while (true) {
1393  is >> field;
1394  if (is.eof()) {
1395  break;
1396  }
1397  fieldNames.push_back(field);
1398  }
1399  TreeIterator it(fieldNames);
1400 
1401  auto* base = blob->template GetMutable<std::unique_ptr<TreeCursor>>();
1402  (*base).reset(new TreeCursor(it));
1403  (*base)->offsets.assign(offsets_ptr, offsets_ptr + offsets.size());
1404  }
1405 };
1406 
1407 REGISTER_BLOB_SERIALIZER(
1408  (TypeMeta::Id<std::unique_ptr<TreeCursor>>()),
1409  TreeCursorSerializer);
1410 REGISTER_BLOB_DESERIALIZER(std::unique_ptr<TreeCursor>, TreeCursorDeserializer);
1411 
1412 } // namespace
1413 
1414 void SharedTensorVectorPtrSerializer::Serialize(
1415  const Blob& blob,
1416  const string& name,
1417  BlobSerializerBase::SerializationAcceptor acceptor) {
1418  /* This is a dummy serializer that doesn't save anything. If saving the
1419  content is desired in a future use case, you can change this serializer.
1420  Note: special care needs to be taken with the parameter initialization of
1421  LastNWindowCollectorOp and ReservoirSamplingOp if this serializer actually
1422  saves the content.
1423  */
1424  CAFFE_ENFORCE(blob.IsType<std::shared_ptr<std::vector<TensorCPU>>>());
1425  BlobProto blob_proto;
1426  blob_proto.set_name(name);
1427  blob_proto.set_type("std::shared_ptr<std::vector<TensorCPU>>");
1428  blob_proto.set_content("");
1429  acceptor(name, blob_proto.SerializeAsString());
1430 }
1431 
1432 void SharedTensorVectorPtrDeserializer::Deserialize(
1433  const BlobProto& /* unused */,
1434  Blob* blob) {
1435  /* This is a dummy deserializer, which just creates an empty (null)
1436  shared pointer in the blob. */
1437  blob->GetMutable<std::shared_ptr<std::vector<TensorCPU>>>();
1438 }
1439 
1440 REGISTER_BLOB_SERIALIZER(
1441  (TypeMeta::Id<std::shared_ptr<std::vector<TensorCPU>>>()),
1442  SharedTensorVectorPtrSerializer);
1443 
1444 REGISTER_BLOB_DESERIALIZER(
1445  std::shared_ptr<std::vector<TensorCPU>>,
1446  SharedTensorVectorPtrDeserializer);
1447 
1448 } // namespace dataset_ops
1449 } // namespace caffe2