// Registers the dataset ops' custom blob payload types with Caffe2's type
// system so they can be held in (and serialized from) Blobs.
// NOTE(review): the embedded original line numbers jump (1,7..10,14), so some
// lines (likely the `namespace caffe2 {` opener and blank lines) were dropped
// by the extraction — verify against the upstream file.
1 #include "caffe2/operators/dataset_ops.h" 7 #include "caffe2/core/blob_serialization.h" 8 #include "caffe2/core/operator.h" 9 #include "caffe2/core/tensor.h" 10 #include "caffe2/utils/string_utils.h" 14 CAFFE_KNOWN_TYPE(std::unique_ptr<dataset_ops::TreeCursor>);
15 CAFFE_KNOWN_TYPE(dataset_ops::TensorVectorPtr<CPUContext>);
16 CAFFE_KNOWN_TYPE(dataset_ops::SharedTensorVectorPtr);
18 namespace dataset_ops {
// Separator used inside field names to encode nesting, e.g. "b:values:lengths".
21 const char kDatasetFieldSeparator =
':';
// A field whose last name component equals this suffix holds the per-entry
// lengths for its enclosing domain (see CreateTreeCursor's schema doc below).
22 const char* kDatasetLengthField =
"lengths";
// Growth percentage passed to Tensor::Extend when appending, to get
// amortized-constant-time appends (see AppendOp / AtomicAppendOp).
25 const int kDatasetGrowthPct = 40;
// Builds iterator metadata from the colon-separated field names:
//  1) split each name into its nesting components;
//  2) record which fields are "lengths" fields;
//  3) link every field to its closest enclosing length field (longest
//     name-prefix match);
//  4) enforce that a length field is declared before any field that uses it.
// NOTE(review): the embedded original numbering has gaps throughout this
// constructor (e.g. 35->37, 38->42, 45->50, 63->67), so closing braces and
// some statements were dropped by the extraction — verify against upstream.
29 TreeIterator::TreeIterator(
const std::vector<std::string>& fields) {
31 fields_.resize(fields.size());
32 std::vector<std::vector<std::string>> nameParts(fields_.size());
// Pass 1: initialize each field record and split its name on ':'.
33 for (
int i = 0; i < fields.size(); ++i) {
34 auto& field = fields_.at(i);
35 field.name = fields[i];
// -1 means "no parent length field" (top-level domain) until proven otherwise.
37 field.lengthFieldId = -1;
38 nameParts.at(i) = split(kDatasetFieldSeparator, field.name);
// Pass 2: collect the ids of fields ending in the "lengths" suffix.
42 for (
const auto& field : fields_) {
43 const auto& parts = nameParts.at(field.id);
44 if (!parts.empty() && parts.back() == kDatasetLengthField) {
45 lengthFieldIds_.push_back(field.id);
// Pass 3: for each field, find the length field with the longest name prefix
// in common with it — that length field governs the field's domain.
50 for (
auto& field : fields_) {
52 int maxMatchLevel = 1;
53 int maxMatchLengthFieldId = -1;
54 for (
int j = 0; j < numLengthFields(); ++j) {
55 const auto& lenField = lengthField(j);
// A length field is not its own parent.
57 if (field.id == lenField.id) {
// Compare the length field's name components (minus the trailing "lengths")
// against the candidate field's components.
60 auto lf = nameParts.at(lenField.id);
61 auto lfEnd = lf.end() - 1;
63 if (std::mismatch(lf.begin(), lfEnd, nameParts.at(field.id).begin())
// Deeper (longer-prefix) matches win.
67 if (lf.size() > maxMatchLevel) {
68 maxMatchLevel = lf.size();
69 maxMatchLengthFieldId = j;
72 field.lengthFieldId = maxMatchLengthFieldId;
// Pass 4: a field's governing length field must appear earlier in the list;
// the enforce's message text is partially missing from this dump.
77 for (
const auto& field : fields_) {
78 const auto* lengthField = lengthFieldFor(field);
80 (lengthField ==
nullptr) || (lengthField->id < field.id),
86 "depends on a field defined afterwards: ",
// Advances the cursor offsets by up to `num` top-level rows, writing the
// number of entries consumed per domain into `sizes` and the new offsets
// into `offsets`. `lengths[j]` points at the length data for length field j;
// `limits` caps each domain's offset.
// NOTE(review): numbering gaps (98->100, 120->122, 126->129) indicate dropped
// lines (a parameter, loop braces, part of a CAFFE_ENFORCE message).
94 void TreeIterator::advance(
95 const std::vector<const TLength*>& lengths,
96 std::vector<TOffset>& offsets,
97 std::vector<TOffset>& sizes,
98 std::vector<TOffset>& limits,
100 std::vector<TOffset> newOffsets;
101 CAFFE_ENFORCE_EQ(lengths.size(), numLengthFields());
102 CAFFE_ENFORCE_EQ(offsets.size(), numOffsetFields());
103 sizes.resize(offsets.size());
104 newOffsets.resize(offsets.size());
// Offset field 0 is the top-level domain: advance it by at most `num`,
// clamped to the remaining rows before `limit`.
107 auto limit = limits[0];
108 auto offset = offsets[0];
109 CAFFE_ENFORCE(limit >= offset,
"Tried to advance past end of cursor.");
110 TOffset total = std::min(limit - offset, num);
112 newOffsets[0] = offset + total;
// Each nested domain j advances by the sum of the parent's length entries
// covered by the parent's advance (`sizes[parentOffsetId]` entries).
115 for (
int j = 1; j < numOffsetFields(); ++j) {
117 int parentOffsetId = offsetFieldIdFor(lengthField(j - 1));
118 const TLength* length = lengths[j - 1] + offsets[parentOffsetId];
119 for (
int k = 0; k < sizes[parentOffsetId]; ++k) {
120 total += *(length++);
122 auto offset = offsets[j];
124 offset + total <= limits[j],
125 "Inconsistent field length: ",
126 "tried to advance past the end of field ",
129 newOffsets[j] = offset + total;
// Commit all new offsets at once.
131 offsets = newOffsets;
// Wraps a cursor plus the input blobs so callers can walk the dataset row by
// row. Initializes the cursor offsets on first use and builds one Field
// accessor per schema field.
// NOTE(review): numbering gaps (138->141, 142->153) suggest dropped lines
// (closing braces and likely calls to gather length/limit data) — verify
// against upstream.
134 TreeWalker::TreeWalker(
const vector<const Blob*>& inputs, TreeCursor& cursor)
135 : inputs_(inputs), cursor_(cursor), sizes_(cursor.it.numOffsetFields()) {
136 CAFFE_ENFORCE_EQ(inputs.size(), cursor.it.fields().size());
// A fresh cursor starts at offset 0 in every domain.
137 if (cursor.offsets.empty()) {
138 cursor.offsets.assign(cursor.it.numOffsetFields(), 0);
141 for (
int fieldId = 0; fieldId < cursor_.it.fields().size(); ++fieldId) {
142 fields_.emplace_back(*
this, fieldId);
// Advances the walker by exactly one top-level row, remembering the previous
// offsets so Field::offset() can report where the row started.
153 void TreeWalker::advance() {
154 prevOffsets_ = cursor_.offsets;
155 cursor_.it.advance(lengths_, cursor_.offsets, sizes_, limits_, 1);
// Returns the dims for the current slice of the given field: the input
// tensor's dims with the outer dimension replaced by the current walk size
// of the field's domain.
// NOTE(review): the `return tensorDim;` and closing brace appear to have been
// dropped by the extraction (numbering jumps 160->164).
158 std::vector<TIndex> TreeWalker::fieldDim(
int fieldId)
const {
159 auto tensorDim = input(fieldId).dims();
160 tensorDim[0] = sizes_[lengthIdx(fieldId)];
// Returns a raw pointer into the field's input tensor at the walker's current
// offset for that field: base + offset * row_size_in_bytes.
164 void* TreeWalker::fieldPtr(
int fieldId)
const {
165 auto& in = input(fieldId);
166 return (
char*)in.raw_data() +
167 offset(fieldId) * in.size_from_dim(1) * in.meta().itemsize();
// Caches a data pointer for each length field. Empty length tensors fall back
// to a pointer at a static zero so downstream reads are safe.
// NOTE(review): numbering gaps (174->176, 176->178) indicate the if/else
// around the two assignments (presumably guarding on in.size() > 0) was
// dropped by the extraction — verify against upstream.
170 void TreeWalker::gatherLengthData() {
171 static const TLength lenZero = 0;
172 lengths_.resize(cursor_.it.numLengthFields());
173 for (
int i = 0; i < lengths_.size(); ++i) {
174 auto& in = input(cursor_.it.lengthField(i).id);
176 lengths_[i] = in.data<
int>();
178 lengths_[i] = &lenZero;
// Computes, per domain, the maximum offset the walk may reach: the minimum
// outer-dimension size over all fields belonging to that domain.
183 void TreeWalker::gatherSizeLimits() {
184 limits_.assign(sizes_.size(), std::numeric_limits<TOffset>::max());
185 for (
auto fieldId = 0; fieldId < cursor_.it.fields().size(); ++fieldId) {
186 auto lengthFieldIdx = lengthIdx(fieldId);
187 limits_[lengthFieldIdx] =
188 std::min(limits_[lengthFieldIdx], (TOffset)input(fieldId).dims()[0]);
// Operator: creates a TreeCursor blob from the `fields` argument; the cursor
// is later consumed by ReadNextBatch / ResetCursor / etc.
// NOTE(review): `public:`, `return true;` and closing braces appear to have
// been dropped by the extraction (numbering gaps 194->196, 202->207).
194 class CreateTreeCursorOp :
public Operator<CPUContext> {
196 CreateTreeCursorOp(
const OperatorDef& operator_def, Workspace* ws)
197 : Operator(operator_def, ws),
198 fields_(OperatorBase::GetRepeatedArgument<
std::string>(
"fields")) {}
200 bool RunOnDevice()
override {
201 *OperatorBase::Output<std::unique_ptr<TreeCursor>>(0) =
202 std::unique_ptr<TreeCursor>(
new TreeCursor(TreeIterator(fields_)));
// Schema field names, as passed via the `fields` argument.
207 std::vector<std::string> fields_;
// Operator: resets a cursor to the beginning of the dataset. Takes the
// cursor's mutex, so it is safe to run concurrently with ReadNextBatch.
210 class ResetCursorOp :
public Operator<CPUContext> {
212 ResetCursorOp(
const OperatorDef& operator_def, Workspace* ws)
213 : Operator(operator_def, ws) {}
215 bool RunOnDevice()
override {
216 auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
217 std::lock_guard<std::mutex> lock(cursor->mutex_);
// Empty offsets mean "uninitialized"; readers re-assign zeros on next use.
218 cursor->offsets.clear();
// Operator: validates that the input blobs form a consistent dataset under
// the `fields` schema — all fields in a domain have equal outer sizes, and
// walking the whole dataset consumes every domain exactly to its limit.
// NOTE(review): numbering gaps throughout (233->235, 237->240, 253->256,
// 264->271) indicate dropped CAFFE_ENFORCE message fragments and braces.
223 class CheckDatasetConsistencyOp :
public Operator<CPUContext> {
225 CheckDatasetConsistencyOp(
const OperatorDef& operator_def, Workspace* ws)
226 : Operator(operator_def, ws),
227 iterator_(OperatorBase::GetRepeatedArgument<
std::string>(
"fields")) {}
229 bool RunOnDevice()
override {
230 std::vector<const TLength*> lengths;
231 std::vector<TOffset> limits;
232 std::vector<TOffset> sizes;
233 std::vector<TOffset> offsets;
// One input blob per schema field.
235 InputSize() == iterator_.fields().size(),
236 "Invalid number of fields. Expected ",
237 iterator_.fields().size(),
240 sizes.resize(iterator_.numOffsetFields());
// Gather raw length pointers for every length field.
242 lengths.resize(iterator_.numLengthFields());
243 for (
int i = 0; i < lengths.size(); ++i) {
244 lengths[i] = Input(iterator_.lengthField(i).id).data<TLength>();
// Each domain's limit is the common outer size of its fields; mismatches
// within a domain are a consistency error.
247 limits.assign(sizes.size(), std::numeric_limits<TOffset>::max());
248 for (
int i = 0; i < iterator_.fields().size(); ++i) {
249 int lengthIdx = iterator_.fields()[i].lengthFieldId + 1;
250 CAFFE_ENFORCE_GT(Input(i).ndim(), 0);
251 TOffset size = (TOffset)Input(i).dims()[0];
252 if (limits[lengthIdx] == std::numeric_limits<TOffset>::max()) {
253 limits[lengthIdx] = size;
256 limits[lengthIdx] == size,
257 "Inconsistent sizes for fields belonging to same domain.",
261 iterator_.fields()[i].name,
262 "); Length field index: ",
264 "); Previous size: ",
// Walk the full dataset in one advance; every domain must land exactly on
// its limit, otherwise lengths don't add up.
271 offsets.assign(sizes.size(), 0);
272 iterator_.advance(lengths, offsets, sizes, limits, limits[0]);
273 for (
int i = 0; i < limits.size(); ++i) {
274 CAFFE_ENFORCE(limits[i] == offsets[i]);
280 TreeIterator iterator_;
// Operator: walks the input dataset and packs each top-level row into one
// element of a single 1-D output of SharedTensorVectorPtr (one tensor vector
// per row), for later unpacking by UnPackRecordsOp.
// NOTE(review): numbering gaps (310->314, 314->324) indicate dropped
// CopyItems arguments and the walker.advance() call — verify upstream.
283 class PackRecordsOp :
public Operator<CPUContext> {
285 PackRecordsOp(
const OperatorDef& operator_def, Workspace* ws)
286 : Operator(operator_def, ws),
287 fields_(OperatorBase::GetRepeatedArgument<
std::string>(
"fields")) {}
289 bool RunOnDevice()
override {
291 CAFFE_ENFORCE_EQ(InputSize(), fields_.size());
292 CAFFE_ENFORCE_EQ(OutputSize(), 1);
// Local cursor: packing always walks the whole dataset from the start.
294 TreeCursor cursor((TreeIterator(fields_)));
296 TreeWalker walker(Inputs(), cursor);
// One output element per top-level row.
298 Output(0)->Resize(walker.size());
301 auto* dst = Output(0)->mutable_data<SharedTensorVectorPtr>();
303 for (
int batchId = 0; batchId < walker.size(); ++batchId) {
304 dst[batchId] = std::make_shared<std::vector<TensorCPU>>();
305 dst[batchId]->reserve(walker.fields().size());
// Copy the current slice of every field into the row's tensor vector.
307 for (
const auto& field : walker.fields()) {
308 dst[batchId]->emplace_back(field.dim());
309 auto& tensor = dst[batchId]->back();
310 context_.template CopyItems<CPUContext, CPUContext>(
314 tensor.raw_mutable_data(field.meta()) );
324 std::vector<std::string> fields_;
// Operator: reverses PackRecordsOp — concatenates the per-row tensor vectors
// back into one output tensor per field. Output shapes/metas come either
// from the packed input itself (row 0) or, when rows may be empty, from
// prototype blobs passed as extra inputs.
// NOTE(review): numbering gaps (347->350, 359->362, 384->392) indicate
// dropped string fragments, braces, and CopyItems arguments.
327 class UnPackRecordsOp :
public Operator<CPUContext> {
329 UnPackRecordsOp(
const OperatorDef& operator_def, Workspace* ws)
330 : Operator(operator_def, ws),
331 fields_(OperatorBase::GetRepeatedArgument<
std::string>(
"fields")) {}
333 bool RunOnDevice()
override {
334 const auto* inputs = Input(0).template data<SharedTensorVectorPtr>();
335 const auto numRows = Input(0).size();
337 CAFFE_ENFORCE_GE(numRows, 0);
339 auto numTensors = OutputSize();
// First pass: establish output dims/metas...
342 std::vector<std::vector<TIndex>> outputDims(numTensors);
343 std::vector<const TypeMeta*> metas(numTensors);
// ...either from the first packed row, or from prototype blobs when the
// packed record may be empty.
346 numRows > 0 || InputSize() > 1,
347 "Unpacking empty record without shape will leave output blobs in " 350 if (InputSize() == 1) {
351 getShapeAndMetaFromInput(outputDims, metas);
353 getShapeAndMetaFromPrototypeBlobs(outputDims, metas);
// Validate every row against the established dims/metas and accumulate the
// total outer dimension per field.
356 for (
int i = 0; i < numRows; ++i) {
357 CAFFE_ENFORCE(inputs[i]);
358 for (
int j = 0; j < inputs[i]->size(); ++j) {
359 const auto& input = inputs[i]->at(j);
362 CAFFE_ENFORCE_EQ(outputDims[j].size(), input.ndim());
363 CAFFE_ENFORCE(*metas[j] == input.meta());
// Inner dims must match exactly; only the outer dim may vary per row.
365 for (
int k = 1; k < input.ndim(); ++k) {
366 CAFFE_ENFORCE_EQ(input.dims()[k], outputDims[j][k]);
369 outputDims[j][0] += input.dim(0);
// Second pass: allocate outputs and copy each row's data in order.
374 std::vector<void*> destinations(numTensors);
375 for (
int i = 0; i < numTensors; ++i) {
376 Output(i)->Resize(outputDims[i]);
377 destinations[i] = Output(i)->raw_mutable_data(*metas[i]);
380 for (
int i = 0; i < numRows; ++i) {
381 for (
int j = 0; j < numTensors; ++j) {
382 const auto& input = inputs[i]->at(j);
384 context_.CopyItems<CPUContext, CPUContext>(
// Advance the destination pointer past the bytes just copied.
392 (
char*)destinations[j] + input.size() * input.itemsize();
// Derives output dims (outer dim zeroed) and metas from packed row 0.
400 void getShapeAndMetaFromInput(
401 std::vector<std::vector<TIndex>>& outputDims,
402 std::vector<const TypeMeta*>& metas) {
403 const auto* inputs = Input(0).template data<SharedTensorVectorPtr>();
405 const auto& inputZero = inputs[0];
406 CAFFE_ENFORCE(inputZero);
408 const auto numTensors = inputZero->size();
410 CAFFE_ENFORCE_EQ(numTensors, fields_.size());
411 CAFFE_ENFORCE_EQ(numTensors, OutputSize());
413 for (
int i = 0; i < numTensors; ++i) {
414 outputDims[i] = inputZero->at(i).dims();
415 outputDims[i][0] = 0;
416 metas[i] = &inputZero->at(i).meta();
// Derives output dims/metas from the prototype blobs (inputs 1..N).
420 void getShapeAndMetaFromPrototypeBlobs(
421 std::vector<std::vector<TIndex>>& outputDims,
422 std::vector<const TypeMeta*>& metas) {
423 const auto numTensors = fields_.size();
424 CAFFE_ENFORCE_EQ(numTensors, InputSize() - 1);
425 CAFFE_ENFORCE_EQ(numTensors, OutputSize());
426 for (
int i = 0; i < numTensors; ++i) {
427 const auto& input = Input(i + 1);
428 outputDims[i] = input.dims();
429 outputDims[i][0] = 0;
430 metas[i] = &input.meta();
434 std::vector<std::string> fields_;
// Operator: reads the next `batch_size` top-level rows out of the cursor and
// data blobs, producing one output tensor per field. Only the cursor advance
// is under the cursor mutex; the copies happen outside it.
// NOTE(review): numbering gaps (458->460, 494->497, 502->505) indicate
// dropped if/else guards, Resize calls, and braces.
437 class ReadNextBatchOp :
public Operator<CPUContext> {
439 ReadNextBatchOp(
const OperatorDef& operator_def, Workspace* ws)
440 : Operator(operator_def, ws),
441 batchSize_(OperatorBase::GetSingleArgument<int>(
"batch_size", 1)),
442 enforceBatchSize_(OperatorBase::GetSingleArgument<bool>(
443 "enforce_batch_size",
446 bool RunOnDevice()
override {
447 auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
448 CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1);
449 std::vector<const TLength*> lengths;
450 std::vector<TOffset> limits;
451 std::vector<TOffset> sizes;
452 std::vector<TOffset> offsets;
454 sizes.resize(cursor->it.numOffsetFields());
// Gather length pointers (inputs are offset by 1 because input 0 is the
// cursor); empty length tensors fall back to a zero sentinel.
456 lengths.resize(cursor->it.numLengthFields());
457 for (
int i = 0; i < lengths.size(); ++i) {
458 auto& a = Input(cursor->it.lengthField(i).id + 1);
460 lengths[i] = a.data<
int>();
462 lengths[i] = &lenZero;
// Per-domain limits = min outer size of the domain's fields.
466 limits.assign(sizes.size(), std::numeric_limits<TOffset>::max());
467 for (
int i = 0; i < cursor->it.fields().size(); ++i) {
468 int lengthFieldIdx = cursor->it.fields()[i].lengthFieldId + 1;
469 limits[lengthFieldIdx] =
470 std::min(limits[lengthFieldIdx], (TOffset)Input(i + 1).dims()[0]);
// Advance the shared cursor under its mutex.
474 std::lock_guard<std::mutex> lock(cursor->mutex_);
475 if (cursor->offsets.empty()) {
476 cursor->offsets.assign(sizes.size(), 0);
478 offsets = cursor->offsets;
479 cursor->it.advance(lengths, cursor->offsets, sizes, limits, batchSize_);
// With enforce_batch_size, a short final batch is emitted as empty outputs.
480 if (enforceBatchSize_ && sizes[0] < batchSize_) {
484 sizes.assign(sizes.size(), 0);
// Copy each field's slice [offset, offset+size) into its output.
488 std::vector<TIndex> outDim;
489 for (
int i = 0; i < cursor->it.fields().size(); ++i) {
490 auto lengthIdx = cursor->it.fields()[i].lengthFieldId + 1;
491 auto size = sizes[lengthIdx];
492 auto offset = offsets[lengthIdx];
493 auto& in = Input(i + 1);
494 auto innerSize = in.size_from_dim(1);
497 auto* out = Output(i);
500 (
char*)in.raw_data() + offset * innerSize * in.meta().itemsize();
501 void* dst = out->raw_mutable_data(in.meta());
// Nothing to copy for an empty slice.
502 if (out->size() == 0) {
505 context_.template CopyItems<CPUContext, CPUContext>(
506 in.meta(), out->size(), src, dst);
511 bool enforceBatchSize_;
// Operator: precomputes the full offsets matrix (one row of per-domain
// offsets per top-level row, plus a final row) by advancing the cursor one
// row at a time. Intended to run at the start / after a cursor reset; the
// result is consumed by ReadRandomBatchOp.
// NOTE(review): numbering gaps (534->536, 548->550, 556->558) indicate
// dropped if/else guards and loop braces.
514 class ComputeOffsetOp :
public Operator<CPUContext> {
516 ComputeOffsetOp(
const OperatorDef& operator_def, Workspace* ws)
517 : Operator(operator_def, ws) {}
519 bool RunOnDevice()
override {
520 auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
521 CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1);
522 auto* out = Output(0);
523 std::vector<const TLength*> lengths;
524 std::vector<TOffset> limits;
525 std::vector<TOffset> sizes;
526 std::vector<TOffset> offsets;
528 sizes.resize(cursor->it.numOffsetFields());
// Same length/limit gathering pattern as ReadNextBatchOp (inputs shifted by
// 1 because input 0 is the cursor).
530 lengths.resize(cursor->it.numLengthFields());
531 for (
int i = 0; i < lengths.size(); ++i) {
532 auto& a = Input(cursor->it.lengthField(i).id + 1);
534 lengths[i] = a.data<
int>();
536 lengths[i] = &lenZero;
540 limits.assign(sizes.size(), std::numeric_limits<TOffset>::max());
541 for (
int i = 0; i < cursor->it.fields().size(); ++i) {
542 int lengthFieldIdx = cursor->it.fields()[i].lengthFieldId + 1;
543 limits[lengthFieldIdx] =
544 std::min(limits[lengthFieldIdx], (TOffset)Input(i + 1).dims()[0]);
// Output is (num_rows + 1) x num_domains: row k holds the offsets at the
// start of top-level row k; the extra row holds the end offsets.
546 out->Resize(limits.at(0) + 1, sizes.size());
547 auto* out_data = out->mutable_data<int64_t>();
548 for (
int k = 0; k <= limits.at(0); k++) {
550 if (cursor->offsets.empty()) {
551 cursor->offsets.assign(sizes.size(), 0);
554 std::copy(cursor->offsets.begin(), cursor->offsets.end(), out_data);
555 out_data += sizes.size();
556 cursor->it.advance(lengths, cursor->offsets, sizes, limits, 1);
// Leave the cursor reset to the beginning when done.
558 cursor->offsets.assign(sizes.size(), 0);
// Operator: produces a shuffled index tensor. Optionally sorts indices by a
// scalar field first, then shuffles within chunks of batch_size*shuffle_size,
// then shuffles the order of the batches themselves. sort_by_field_idx == -1
// skips the sort. See the SortAndShuffle schema doc below for an example.
// NOTE(review): numbering gaps (570->573, 594->596, 633->636) indicate
// dropped initializer-list pieces, Resize call, and a trailing copy.
563 class SortAndShuffleOp :
public Operator<CPUContext> {
565 SortAndShuffleOp(
const OperatorDef& operator_def, Workspace* ws)
566 : Operator(operator_def, ws),
568 OperatorBase::GetSingleArgument<int>(
"sort_by_field_idx", 1)),
569 batch_size_(OperatorBase::GetSingleArgument<int>(
"batch_size", 1)),
570 shuffle_size_(OperatorBase::GetSingleArgument<int>(
"shuffle_size", 1)) {
573 bool RunOnDevice()
override {
574 auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
575 CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1);
576 CAFFE_ENFORCE(-1 <= sort_by_field_idx_);
577 CAFFE_ENFORCE(cursor->it.fields().size() - sort_by_field_idx_ > 0);
// Number of rows comes from the sort field if given, else from input 1.
579 if (sort_by_field_idx_ != -1) {
580 size = Input(sort_by_field_idx_ + 1).dims()[0];
582 size = Input(1).dims()[0];
586 batch_size_ > 0 && shuffle_size_ > 0 &&
587 0 < batch_size_ * shuffle_size_);
// Shrink the shuffle window if it exceeds the dataset.
589 if (batch_size_ * shuffle_size_ > size) {
590 shuffle_size_ = size / batch_size_;
593 int num_batch = size / batch_size_;
594 auto* out = Output(0);
596 auto* out_data = out->mutable_data<int64_t>();
598 vector<int> shuffle_idx(size);
599 iota(shuffle_idx.begin(), shuffle_idx.end(), 0);
// Optional sort by a scalar (non-nested) field's values.
601 if (sort_by_field_idx_ != -1) {
602 auto& sortblob = Input(sort_by_field_idx_ + 1);
603 auto* sortdata = sortblob.data<
int>();
// Sorting by a nested field is not supported.
606 cursor->it.fields()[sort_by_field_idx_].lengthFieldId == -1);
607 sort(shuffle_idx.begin(), shuffle_idx.end(), [&sortdata](
int i1,
int i2) {
608 return sortdata[i1] < sortdata[i2];
// Shuffle within consecutive windows of batch_size * shuffle_size indices.
612 if (batch_size_ * shuffle_size_ > 1) {
614 while (offset + batch_size_ * shuffle_size_ < size) {
616 shuffle_idx.begin() + offset,
617 shuffle_idx.begin() + offset + batch_size_ * shuffle_size_,
618 std::default_random_engine());
619 offset += batch_size_ * shuffle_size_;
// Then shuffle the order of whole batches and emit them.
623 vector<int> batch_idx(num_batch);
624 iota(batch_idx.begin(), batch_idx.end(), 0);
626 batch_idx.begin(), batch_idx.end(), std::default_random_engine());
628 for (
int i = 0; i < num_batch; i++) {
630 shuffle_idx.begin() + batch_idx[i] * batch_size_,
631 shuffle_idx.begin() + (batch_idx[i] + 1) * batch_size_,
633 out_data += batch_size_;
// Leftover rows (size % batch_size) are appended at the end.
636 shuffle_idx.begin() + num_batch * batch_size_,
643 int sort_by_field_idx_;
// Operator: reads a batch of rows in the order given by a shuffled index blob
// (input 1), using the precomputed offsets matrix (input 2, from
// ComputeOffset) to locate each row's slice in every field. Two passes per
// field: first sum sizes to compute the output dim, then copy the slices.
// NOTE(review): numbering gaps (662->664, 684->688, 699->703, 727->736)
// indicate dropped statements (outDim setup, Resize, idx/start bookkeeping,
// braces) — verify against upstream.
648 class ReadRandomBatchOp :
public Operator<CPUContext> {
650 ReadRandomBatchOp(
const OperatorDef& operator_def, Workspace* ws)
651 : Operator(operator_def, ws),
652 batchSize_(OperatorBase::GetSingleArgument<int>(
"batch_size", 1)),
654 OperatorBase::GetSingleArgument<bool>(
"enforce_batch_size", false)),
655 loopOver_(OperatorBase::GetSingleArgument<bool>(
"loop_over", false)) {}
656 bool RunOnDevice()
override {
657 auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
658 auto& idxblob = Input(1);
659 auto& offsetsmat = Input(2);
660 CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 3);
661 auto idxvec = idxblob.template data<int64_t>();
662 auto& offsetdim = offsetsmat.dims();
664 std::vector<TIndex> outDim;
// Claim the next batchSize_ index positions under the cursor mutex; the
// cursor's single offset tracks position within the index blob.
667 std::lock_guard<std::mutex> lock(cursor->mutex_);
668 cursor->offsets.resize(1);
669 idx = cursor->offsets.at(0);
// With enforce_batch_size, a short final batch is skipped entirely.
672 if (enforceBatchSize_ && idx + batchSize_ > idxblob.size()) {
673 idx = idxblob.size();
// loop_over restarts from the beginning once the index blob is exhausted.
675 if (loopOver_ && idx >= idxblob.size()) {
676 cursor->offsets.at(0) = 0;
679 cursor->offsets.at(0) += batchSize_;
682 for (
int i = 0; i < cursor->it.fields().size(); ++i) {
683 auto lengthIdx = cursor->it.fields()[i].lengthFieldId + 1;
684 auto& in = Input(i + 3);
// Pass 1: accumulate the total outer size of the batch for this field.
688 for (
int j = 0; j < batchSize_; ++j) {
689 if (idx >= idxblob.size()) {
693 (idxvec[idx] + 1) * offsetdim[1] + lengthIdx < offsetsmat.size(),
694 "Out of bound when trying to get elem from offsetsmat");
// Row r of the offsets matrix holds per-domain start offsets for top-level
// row r; the size of the slice is the delta to the next row.
695 auto offsetptr = offsetsmat.template data<TOffset>() +
696 idxvec[idx] * offsetdim[1] + lengthIdx;
697 auto offset = *offsetptr;
698 auto size = *(offsetptr + offsetdim[1]) - offset;
699 outDim.at(0) += size;
703 auto* out = Output(i);
705 if (out->size() == 0) {
// Pass 2: copy each indexed slice into the output.
708 auto dst =
static_cast<char*
>(out->raw_mutable_data(in.meta()));
709 int block_size = in.size() / in.dim(0);
710 auto block_bytesize = in.size_from_dim(1) * in.meta().itemsize();
712 block_bytesize == in.nbytes() / in.dim(0),
713 "block_bytesize should be consistent with data dim");
714 auto src_base =
static_cast<const char*
>(in.raw_data());
716 for (
int j = 0; j < batchSize_; ++j) {
717 if (idx >= idxblob.size()) {
720 auto offsetptr = offsetsmat.template data<TOffset>() +
721 idxvec[idx] * offsetdim[1] + lengthIdx;
722 auto offset = *offsetptr;
723 auto size = *(offsetptr + offsetdim[1]) - offset;
725 auto src = src_base + offset * block_bytesize;
726 context_.template CopyItems<CPUContext, CPUContext>(
727 in.meta(), size * block_size, src, dst + start * block_bytesize);
736 bool enforceBatchSize_;
// Operator: appends input 1 to input 0 in-place (output must alias input 0).
// Uses Tensor::Extend with kDatasetGrowthPct for amortized-constant appends.
// NOTE(review): numbering gaps (747->751, 752->756) indicate dropped lines
// (the a/b/c aliases and the early-copy path for an empty destination).
// NOTE(review): lines 757/758 enforce c->ndim()==b.ndim() twice — looks
// redundant; confirm against upstream before cleaning up.
740 template <
class Context>
741 class AppendOp final :
public Operator<Context> {
743 USE_OPERATOR_CONTEXT_FUNCTIONS;
744 AppendOp(
const OperatorDef& operator_def, Workspace* ws)
745 : Operator<Context>(operator_def, ws) {}
747 bool RunOnDevice()
override {
751 CAFFE_ENFORCE(b.ndim() >= 1);
// Empty destination: presumably copies b into c wholesale (dropped lines).
752 if (a.size() == 0 && a.dim(0) == 0) {
756 CAFFE_ENFORCE(&a == c,
"First argument must be in-place.");
757 CAFFE_ENFORCE(c->ndim() == b.ndim());
758 CAFFE_ENFORCE(b.ndim() == c->ndim());
759 CAFFE_ENFORCE(a.meta() == b.meta());
// All dims except the outer one must match.
760 for (
int i = 1; i < a.ndim(); ++i) {
761 CAFFE_ENFORCE(a.dims()[i] == b.dims()[i]);
763 auto oldSize = c->size();
764 c->Extend(b.dims()[0], kDatasetGrowthPct, &context_);
765 auto* dst = (
char*)c->raw_mutable_data() + oldSize * b.meta().itemsize();
766 context_.template CopyItems<Context, Context>(
767 b.meta(), b.size(), b.raw_data(), dst);
// Operator: appends numFields pairs of tensors in-place under a shared mutex
// (input 0), in two phases: validate everything first, then mutate — so a
// validation failure leaves all outputs untouched.
// NOTE(review): numbering gaps (784->787, 791->796, 810->814) indicate
// dropped lines (the c alias declarations and the empty-destination path).
772 template <
class Context>
773 class AtomicAppendOp final :
public Operator<Context> {
775 USE_OPERATOR_CONTEXT_FUNCTIONS;
776 AtomicAppendOp(
const OperatorDef& operator_def, Workspace* ws)
777 : Operator<Context>(operator_def, ws) {}
779 bool RunOnDevice()
override {
780 auto& mutex = OperatorBase::Input<std::unique_ptr<std::mutex>>(0);
781 const auto numFields = (InputSize() - 1) / 2;
782 CAFFE_ENFORCE(OutputSize() == numFields);
// Hold the shared mutex for the whole validate+append sequence.
784 std::lock_guard<std::mutex> guard(*mutex);
// Phase 1: validate all pairs before touching any output.
787 for (
int i = 0; i < numFields; ++i) {
788 auto& a = Input(1 + i);
789 auto& b = Input(1 + i + numFields);
791 CAFFE_ENFORCE(b.ndim() >= 1);
796 (
void*)&a == (
void*)c,
"Appended-to arguments must be in-place.");
797 CAFFE_ENFORCE(c->ndim() == b.ndim());
798 CAFFE_ENFORCE(b.ndim() == c->ndim());
799 CAFFE_ENFORCE(a.meta() == b.meta());
800 for (
int j = 1; j < a.ndim(); ++j) {
801 CAFFE_ENFORCE(a.dims()[j] == b.dims()[j]);
// Phase 2: perform the appends (same Extend+CopyItems pattern as AppendOp).
806 for (
int i = 0; i < numFields; ++i) {
807 auto& a = Input(1 + i);
808 auto& b = Input(1 + i + numFields);
810 if (a.size() == 0 && a.dim(0) == 0) {
814 auto oldSize = c->size();
815 c->Extend(b.dims()[0], kDatasetGrowthPct, &context_);
816 auto* dst = (
char*)c->raw_mutable_data() + oldSize * b.meta().itemsize();
817 context_.template CopyItems<Context, Context>(
818 b.meta(), b.size(), b.raw_data(), dst);
// Operator: creates an empty std::unique_ptr<std::vector<Tensor<Context>>>
// blob, to be filled by CollectTensor and consumed by ConcatTensorVector.
// NOTE(review): numbering gap (832->838) indicates the move-assignment of
// `ptr` into the output and `return true;` were dropped by the extraction.
824 template <
class Context>
825 class CreateTensorVectorOp final :
public Operator<Context> {
827 USE_OPERATOR_CONTEXT_FUNCTIONS;
828 using Operator<Context>::Operator;
830 bool RunOnDevice()
override {
831 auto ptr = make_unique<std::vector<Tensor<Context>>>();
832 *OperatorBase::Output<TensorVectorPtr<Context>>(TENSOR_VECTOR) =
838 OUTPUT_TAGS(TENSOR_VECTOR);
// Operator: outputs the number of tensors currently held in a tensor-vector
// blob, as a scalar int32 tensor.
// NOTE(review): numbering gaps (847->849, 850->853) indicate the vector_ptr
// declaration and the output Resize were dropped by the extraction.
841 template <
class Context>
842 class TensorVectorSizeOp final :
public Operator<Context> {
844 USE_OPERATOR_CONTEXT_FUNCTIONS;
845 USE_SIMPLE_CTOR_DTOR(TensorVectorSizeOp);
847 bool RunOnDevice()
override {
849 OperatorBase::Input<TensorVectorPtr<Context>>(TENSOR_VECTOR);
850 auto* size = Output(SIZE);
853 *size->template mutable_data<int32_t>() = vector_ptr->size();
858 INPUT_TAGS(TENSOR_VECTOR);
// Operator: concatenates all tensors in a tensor-vector blob along the first
// dimension into a single output tensor. All tensors must share inner dims
// and meta; the vector must be non-empty.
862 template <
class Context>
863 class ConcatTensorVectorOp final :
public Operator<Context> {
865 USE_OPERATOR_CONTEXT_FUNCTIONS;
866 using Operator<Context>::Operator;
868 bool RunOnDevice()
override {
869 const TensorVectorPtr<Context>& tensorVector =
870 OperatorBase::Input<TensorVectorPtr<Context>>(TENSOR_VECTOR);
872 auto* tensor = Output(TENSOR);
873 CAFFE_ENFORCE(!tensorVector->empty());
// Output dims = tensor 0's dims with the outer dim summed over all tensors;
// inner dims and metas are validated to match along the way.
875 vector<TIndex> outputDims(tensorVector->at(0).dims());
876 CAFFE_ENFORCE(outputDims.size() > 0);
877 for (
int i = 1; i < tensorVector->size(); i++) {
879 for (
int j = 1; j < tensorVector->at(i).ndim(); j++) {
880 CAFFE_ENFORCE(outputDims[j] == tensorVector->at(i).dims()[j]);
882 CAFFE_ENFORCE(tensorVector->at(0).meta() == tensorVector->at(i).meta());
883 outputDims[0] += tensorVector->at(i).dims()[0];
886 tensor->Resize(outputDims);
888 auto* dst = (
char*)tensor->raw_mutable_data(tensorVector->at(0).meta());
// Byte-wise append each tensor's payload at the running offset.
890 for (
const auto& t : *tensorVector) {
891 context_.template CopyItems<Context, Context>(
892 t.meta(), t.size(), t.raw_data(), dst + offset);
893 offset += t.nbytes();
900 INPUT_TAGS(TENSOR_VECTOR);
// Operator: reservoir sampling of tensors into tensor vectors. Keeps the
// first num_to_collect items; afterwards replaces a uniformly random slot
// (or skips the item) so every item has equal probability of being kept.
// The same `pos` decision is applied to all output vectors (all-or-none).
// NOTE(review): numbering gaps (913->916, 918->922, 926->932, 942->946,
// 954->958) show dropped lines (member init, the pos assignment for the fill
// phase, braces, CopyFrom arguments, numVisited_ increment).
904 template <
class Context>
905 class CollectTensorOp final :
public Operator<Context> {
907 USE_OPERATOR_CONTEXT_FUNCTIONS;
908 CollectTensorOp(
const OperatorDef& operator_def, Workspace* ws)
909 : Operator<Context>(operator_def, ws),
911 OperatorBase::GetSingleArgument<int>(
"num_to_collect", -1)),
913 CAFFE_ENFORCE(numToCollect_ > 0);
916 bool RunOnDevice()
override {
// Fill phase: reservoir not yet full — always keep the item.
918 if (numVisited_ < numToCollect_) {
// Sample phase: keep with probability numToCollect_/(numVisited_+1).
922 auto& gen = context_.RandGenerator();
924 std::uniform_int_distribution<int> uniformDist(0, numVisited_);
925 pos = uniformDist(gen);
// pos >= numToCollect_ means "discard this item".
926 if (pos >= numToCollect_) {
932 for (
int i = 0; i < OutputSize(); ++i) {
// Inputs [OutputSize(), 2*OutputSize()) are the tensors to collect; outputs
// double as the first half of the inputs (enforced in-place).
934 TensorVectorPtr<Context>& tensorVector =
935 *OperatorBase::Output<TensorVectorPtr<Context>>(i);
937 if (numVisited_ >= numToCollect_) {
939 tensorVector->size() == numToCollect_,
940 "TensorVecotor size = ",
941 tensorVector->size(),
942 " is different from numToCollect = ",
946 const auto& tensor = Input(OutputSize() + i);
// pos == -1 (skip) is only valid once the reservoir is full.
950 CAFFE_ENFORCE(numVisited_ >= numToCollect_);
951 }
// Append when still filling; otherwise overwrite the chosen slot.
else if (pos >= tensorVector->size()) {
954 tensorVector->back().template CopyFrom<Context, Context>(
958 tensorVector->at(pos).template CopyFrom<Context, Context>(
// Operator: shrinks every field in-place so the number of top-level rows is
// a multiple of `multiple_of`, by walking the kept rows and Shrink-ing each
// output to the offset reached.
// NOTE(review): numbering gaps (980->983, 988->993, 993->997) indicate
// dropped lines (closing braces, the early-return when nothing is trimmed,
// and the walker.advance() call inside the loop).
974 class TrimDatasetOp :
public Operator<CPUContext> {
976 TrimDatasetOp(
const OperatorDef& operator_def, Workspace* ws)
977 : Operator(operator_def, ws),
978 iterator_(OperatorBase::GetRepeatedArgument<
std::string>(
"fields")),
979 multiple_of_(OperatorBase::GetSingleArgument<int>(
"multiple_of", 1)) {
980 CAFFE_ENFORCE_GE(multiple_of_, 1);
983 bool RunOnDevice()
override {
984 TreeCursor cursor(iterator_);
985 TreeWalker walker(Inputs(), cursor);
// Largest row count <= walker.size() divisible by multiple_of_.
987 int trimmedSize = (walker.size() / multiple_of_) * multiple_of_;
988 if (trimmedSize == walker.size()) {
// Advance the walker over the rows we keep...
993 for (
int i = 0; i < trimmedSize; ++i) {
// ...then shrink each field to the offset the walk reached.
997 for (
int col = 0; col < walker.fields().size(); ++col) {
998 auto newOuterSize = walker.fields().at(col).offset();
999 Output(col)->Shrink(newOuterSize);
1005 TreeIterator iterator_;
// CPU registrations for all dataset operators defined above.
1009 REGISTER_CPU_OPERATOR(CreateTreeCursor, CreateTreeCursorOp);
1010 REGISTER_CPU_OPERATOR(ResetCursor, ResetCursorOp);
1011 REGISTER_CPU_OPERATOR(ReadNextBatch, ReadNextBatchOp);
1012 REGISTER_CPU_OPERATOR(ComputeOffset, ComputeOffsetOp);
1013 REGISTER_CPU_OPERATOR(SortAndShuffle, SortAndShuffleOp);
1014 REGISTER_CPU_OPERATOR(ReadRandomBatch, ReadRandomBatchOp);
1015 REGISTER_CPU_OPERATOR(CheckDatasetConsistency, CheckDatasetConsistencyOp);
1016 REGISTER_CPU_OPERATOR(Append, AppendOp<CPUContext>);
1017 REGISTER_CPU_OPERATOR(AtomicAppend, AtomicAppendOp<CPUContext>);
1018 REGISTER_CPU_OPERATOR(CreateTensorVector, CreateTensorVectorOp<CPUContext>);
1019 REGISTER_CPU_OPERATOR(TensorVectorSize, TensorVectorSizeOp<CPUContext>);
1020 REGISTER_CPU_OPERATOR(ConcatTensorVector, ConcatTensorVectorOp<CPUContext>);
1021 REGISTER_CPU_OPERATOR(CollectTensor, CollectTensorOp<CPUContext>);
1022 REGISTER_CPU_OPERATOR(PackRecords, PackRecordsOp);
1023 REGISTER_CPU_OPERATOR(UnPackRecords, UnPackRecordsOp);
1024 REGISTER_CPU_OPERATOR(TrimDataset, TrimDatasetOp);
// Operator schema declarations. The doc text below lives inside SetDoc/Input/
// Output/Arg string literals (runtime data), so it is left byte-for-byte.
// NOTE(review): many schema lines (NumInputs/NumOutputs counts, some .Arg
// calls, SetDoc openers) were dropped by the extraction — the embedded
// numbering has large gaps; verify against the upstream file.
1026 OPERATOR_SCHEMA(CreateTreeCursor)
1030 Creates a cursor to iterate through a list of tensors, where some of those 1031 tensors contains the lengths in a nested schema. The schema is determined by 1032 the `fields` arguments. 1034 For example, to represent the following schema: 1047 the field list will be: 1059 And for the following instance of the struct: 1063 b=[[4, 5], [6, 7, 8], [], [9]], 1065 Struct(c1='alex', c2=[10, 11]), 1066 Struct(c1='bob', c2=[12]), 1070 The values of the fields will be: 1074 "b:values:lengths": [2, 3, 0, 1], 1075 "b:values:values": [4, 5, 6, 7, 8, 9], 1077 "c:c1": ["alex", "bob"], 1078 "c:c2:lengths": [2, 1], 1079 "c:c2:values", [10, 11, 12], 1082 In general, every field name in the format "{prefix}:lengths" defines a domain 1083 "{prefix}", and every subsequent field in the format "{prefix}:{field}" will 1084 be in that domain, and the length of the domain is provided for each entry of 1085 the parent domain. In the example, "b:lengths" defines a domain of length 4, so 1086 every field under domain "b" will have 4 entries. 1087 The "lengths" field for a given domain must appear before any reference to 1090 Returns a pointer to an instance of the Cursor, which keeps the current offset 1091 on each of the domains defined by `fields`. Cursor also ensures thread-safety 1092 such that ReadNextBatch and ResetCursor can be used safely in parallel. 1094 A cursor does not contain data per se, so calls to ReadNextBatch actually need 1095 to pass a list of blobs containing the data to read for each one of the fields. 1097 .Output(0, "cursor",
"A blob pointing to an instance of a new TreeCursor.")
1100 "A list of strings each one representing a field of the dataset.");
1102 OPERATOR_SCHEMA(ResetCursor)
1106 Resets the offsets for the given TreeCursor. This operation is thread safe. 1108 .Input(0, "cursor",
"A blob containing a pointer to the cursor.");
1110 OPERATOR_SCHEMA(ReadNextBatch)
1111 .NumInputs(1, INT_MAX)
1112 .NumOutputs(1, INT_MAX)
1114 Read the next batch of examples out of the given cursor and data blobs. 1116 Input(0) is a blob pointing to a TreeCursor, and 1117 [Input(1),... Input(num_fields)] a list of tensors containing the data for 1118 each field of the dataset. 1120 ReadNextBatch is thread safe. 1122 .Input(0, "cursor",
"A blob containing a pointer to the cursor.")
1123 .Input(1,
"dataset_field_0",
"First dataset field")
1124 .Output(0,
"field_0",
"Tensor containing the next batch for field 0.")
1125 .Arg(
"batch_size",
"Number of top-level entries to read.");
1127 OPERATOR_SCHEMA(ComputeOffset)
1128 .NumInputs(1, INT_MAX)
1131 Compute the offsets matrix given cursor and data blobs. Need to be ran at 1132 beginning or after reseting cursor 1134 Input(0) is a blob pointing to a TreeCursor, and 1135 [Input(1),... Input(num_fields)] a list of tensors containing the data for 1136 each field of the dataset. 1138 ComputeOffset is thread safe. 1140 .Input(0, "cursor",
"A blob containing a pointer to the cursor.")
1141 .Input(1,
"dataset_field_0",
"First dataset field")
1142 .Output(0,
"field_0",
"Tensor containing offset info for this chunk.");
1144 OPERATOR_SCHEMA(SortAndShuffle)
1145 .NumInputs(1, INT_MAX)
1148 Compute the sorted indices given a field index to sort by and break the sorted 1149 indices into chunks of shuffle_size * batch_size and shuffle each chunk, 1150 finally we shuffle between batches. If sort_by_field_idx is -1 we skip sort. 1152 For example, we have data sorted as 1153 1,2,3,4,5,6,7,8,9,10,11,12 1155 and batchSize = 2 and shuffleSize = 3, when we shuffle we get: 1156 [3,1,4,6,5,2] [12,10,11,8,9,7] 1158 After this we will shuffle among different batches with size 2 1159 [3,1],[4,6],[5,2],[12,10],[11,8],[9,7] 1161 We may end up with something like 1162 [9,7],[5,2],[12,10],[4,6],[3,1],[11,8] 1164 Input(0) is a blob pointing to a TreeCursor, and 1165 [Input(1),... Input(num_fields)] a list of tensors containing the data for 1166 each field of the dataset. 1168 SortAndShuffle is thread safe. 1170 .Input(0, "cursor",
"A blob containing a pointer to the cursor.")
1171 .Input(1,
"dataset_field_0",
"First dataset field")
1172 .Output(0,
"indices",
"Tensor containing sorted indices.")
1174 OPERATOR_SCHEMA(ReadRandomBatch)
1175 .NumInputs(1, INT_MAX)
1176 .NumOutputs(1, INT_MAX)
1178 Read the next batch of examples out of the given cursor, 1179 idx blob, offset matrix and data blobs. 1181 Input(0) is a blob pointing to a TreeCursor, 1182 Input(1) is a blob pointing to the shuffled idx 1183 Input(2) is a blob pointing to the offset matrix and 1184 [Input(3),... Input(num_fields)] a list of tensors containing the data for 1185 each field of the dataset. 1187 ReadRandomBatch is thread safe. 1189 .Input(0, "cursor",
"A blob containing a pointer to the cursor.")
1190 .Input(1,
"idx",
"idx with a shuffled order.")
1191 .Input(2,
"offsetsmat",
"offset matrix containing length offset info.")
1192 .Input(3,
"dataset_field_0",
"First dataset field")
1193 .Output(0,
"field_0",
"Tensor containing the next batch for field 0.")
1194 .Arg(
"batch_size",
"Number of top-level entries to read.")
1195 .Arg(
"loop_over",
"(bool) Repeat the dataset indefinitely");
1197 OPERATOR_SCHEMA(CheckDatasetConsistency)
1198 .NumInputs(1, INT_MAX)
1201 Checks that the given data fields represents a consistent dataset under 1202 the schema specified by the `fields` argument. Operator fails if the fields 1203 are not consistent. If data is consistent, each field's data can be safely 1204 appended to an existing dataset, keeping it consistent. 1206 .Input(0, "field_0",
"Data for field 0.")
1209 "List of strings representing the string names in the format" 1210 "specified in the doc for CreateTreeCursor.");
1212 OPERATOR_SCHEMA(Append)
1215 .EnforceInplace({{0, 0}})
1217 Append input 2 to the end of input 1. 1218 Input 1 must be the same as output, that is, it is required to be in-place. 1219 Input 1 may have to be re-allocated in order for accommodate to the new size. 1220 Currently, an exponential growth ratio is used in order to ensure amortized 1221 constant time complexity. 1222 All except the outer-most dimension must be the same between input 1 and 2. 1224 .Input(0, "dataset",
"The tensor to be appended to.")
1225 .Input(1,
"new_data",
"Tensor to append to the end of dataset.")
1226 .Output(0,
"dataset",
"Same as input 0, representing the mutated tensor.");
1228 OPERATOR_SCHEMA(AtomicAppend)
1229 .NumInputs(3, INT_MAX)
1230 .NumOutputs(1, INT_MAX)
1231 .AllowInplace([](
int in,
int out) {
return in == out + 1; });
1233 OPERATOR_SCHEMA(CreateTensorVector)
1236 .SetDoc(
"Create a std::unique_ptr<std::vector<Tensor> >");
1238 OPERATOR_SCHEMA(TensorVectorSize)
1241 .SetDoc(
"Get the size of the input vector")
1242 .Input(0,
"tensor vector",
"std::unique_ptr<std::vector<Tensor> >")
1243 .Output(0,
"size",
"int32_t size");
1245 OPERATOR_SCHEMA(ConcatTensorVector)
1249 Concat Tensors in the std::unique_ptr<std::vector<Tensor> > 1250 along the first dimension. 1252 .Input(0, "vector of Tensor",
"std::unique_ptr<std::vector<Tensor> >")
1253 .Output(0,
"tensor",
"tensor after concatenating");
1255 OPERATOR_SCHEMA(CollectTensor)
1256 .NumInputs([](
int n) {
return n > 0 && n % 2 == 0; })
1257 .NumOutputs(1, INT_MAX)
1258 .NumInputsOutputs([](
int in,
int out) {
return in == out * 2; })
1259 .EnforceInplace([](
int in,
int out) {
return in == out; })
1261 Collect tensor into tensor vector by reservoir sampling, 1262 argument num_to_collect indicates the max number of tensors that will be 1263 collected. The first half of the inputs are tensor vectors, which are also the 1264 outputs. The second half of the inputs are the tensors to be collected into each 1265 vector (in the same order). The input tensors are collected in all-or-none 1266 manner. If they are collected, they will be placed at the same index in the 1269 .Arg("num_to_collect",
"The max number of tensors to collect");
1271 OPERATOR_SCHEMA(PackRecords)
1272 .NumInputs(1, INT_MAX)
1275 Given a dataset under a schema specified by the `fields` argument will pack all 1276 the input tensors into one, where each tensor element represents a row of data 1277 (batch of size 1). This format allows easier use with the rest of Caffe2 1282 "List of strings representing the string names in the format" 1283 "specified in the doc for CreateTreeCursor.")
1287 "One dimensional tensor having a complex type of SharedTensorVectorPtr." 1288 " In order to reverse it back to the original input it has to be " 1289 "inserted into UnPackRecordsOp.");
1291 OPERATOR_SCHEMA(TrimDataset)
1292 .NumInputs(1, INT_MAX)
1293 .NumOutputs(1, INT_MAX)
1295 Trim the given dataset inplace, given the dataset blobs and the field specs. 1296 Trimming happens such that the dataset will contain the largest possible number 1297 of records that is a multiple of the 'multiple_of' argument. 1299 .EnforceInplace([](int input,
int output) {
return input == output; })
1302 "List of strings representing the string names in the format" 1303 "specified in the doc for CreateTreeCursor.");
1305 OPERATOR_SCHEMA(UnPackRecords)
1306 .NumInputs(1, INT_MAX)
1307 .NumOutputs(1, INT_MAX)
1309 Given a packed dataset (packed by the PackRecordsOp) and the `fields` argument 1310 describing the datasets schema returns the original dataset format. Number of 1311 returned tensors is equal to the number of fields in the `fields` argument. 1313 The first input is the packed tensor to be unpacked. Optionally, you can provide 1314 prototype tensors to give the expected shapes of the output tensors. This is 1315 helpful when you expected to unpack empty tensor, e.g., output of a sampling 1320 "List of strings representing the string names in the format" 1321 "specified in the doc for CreateTreeCursor.")
1322 .Input(0,
"packed_tensor",
"The tensor to be unpacked");
1324 SHOULD_NOT_DO_GRADIENT(CreateTreeCursor);
1325 SHOULD_NOT_DO_GRADIENT(ResetCursor);
1326 SHOULD_NOT_DO_GRADIENT(ReadNextBatch);
1327 SHOULD_NOT_DO_GRADIENT(ComputeOffset);
1328 SHOULD_NOT_DO_GRADIENT(ReadRandomBatch);
1329 SHOULD_NOT_DO_GRADIENT(CheckDatasetConsistency);
1330 SHOULD_NOT_DO_GRADIENT(Append);
1331 SHOULD_NOT_DO_GRADIENT(AtomicAppend);
1332 SHOULD_NOT_DO_GRADIENT(CreateTensorVector);
1333 SHOULD_NOT_DO_GRADIENT(TensorVectorSize);
1334 SHOULD_NOT_DO_GRADIENT(ConcatTensorVector);
1335 SHOULD_NOT_DO_GRADIENT(CollectTensor);
1336 SHOULD_NOT_DO_GRADIENT(UnPackRecords);
1337 SHOULD_NOT_DO_GRADIENT(PackRecords);
1339 class TreeCursorSerializer :
public BlobSerializerBase {
1341 TreeCursorSerializer() {}
1342 ~TreeCursorSerializer() {}
1347 SerializationAcceptor acceptor)
override {
1348 auto& cursor = blob.template Get<std::unique_ptr<TreeCursor>>();
1349 BlobProto blob_proto;
1352 if (cursor->offsets.size() > 0) {
1354 auto* offsets = offsets_blob.template GetMutable<Tensor<CPUContext>>();
1355 offsets->Resize(cursor->offsets.size());
1357 cursor->offsets.begin(),
1358 cursor->offsets.end(),
1359 offsets->mutable_data<TOffset>());
1360 TensorSerializer<CPUContext> ser;
1362 *offsets, name, blob_proto.mutable_tensor(), 0, offsets->size());
1364 blob_proto.set_name(name);
1365 blob_proto.set_type(
"std::unique_ptr<TreeCursor>");
1368 std::ostringstream os;
1369 for (
const auto& field : cursor->it.fields()) {
1370 os << field.name <<
" ";
1372 blob_proto.set_content(os.str());
1374 acceptor(name, blob_proto.SerializeAsString());
1378 class TreeCursorDeserializer :
public BlobDeserializerBase {
1380 void Deserialize(
const BlobProto& proto, Blob* blob)
override {
1382 TensorDeserializer<CPUContext> deser;
1384 deser.Deserialize(proto, &offset_blob);
1385 auto& offsets = offset_blob.template Get<Tensor<CPUContext>>();
1386 auto* offsets_ptr = offsets.data<TOffset>();
1389 std::vector<std::string> fieldNames;
1390 std::istringstream is(proto.content());
1397 fieldNames.push_back(field);
1399 TreeIterator it(fieldNames);
1401 auto* base = blob->template GetMutable<std::unique_ptr<TreeCursor>>();
1402 (*base).reset(
new TreeCursor(it));
1403 (*base)->offsets.assign(offsets_ptr, offsets_ptr + offsets.size());
1407 REGISTER_BLOB_SERIALIZER(
1409 TreeCursorSerializer);
1410 REGISTER_BLOB_DESERIALIZER(std::unique_ptr<TreeCursor>, TreeCursorDeserializer);
1414 void SharedTensorVectorPtrSerializer::Serialize(
1417 BlobSerializerBase::SerializationAcceptor acceptor) {
1424 CAFFE_ENFORCE(blob.IsType<std::shared_ptr<std::vector<TensorCPU>>>());
1425 BlobProto blob_proto;
1426 blob_proto.set_name(name);
1427 blob_proto.set_type(
"std::shared_ptr<std::vector<TensorCPU>>");
1428 blob_proto.set_content(
"");
1429 acceptor(name, blob_proto.SerializeAsString());
1432 void SharedTensorVectorPtrDeserializer::Deserialize(
1437 blob->GetMutable<std::shared_ptr<std::vector<TensorCPU>>>();
1440 REGISTER_BLOB_SERIALIZER(
1441 (
TypeMeta::Id<std::shared_ptr<std::vector<TensorCPU>>>()),
1442 SharedTensorVectorPtrSerializer);
1444 REGISTER_BLOB_DESERIALIZER(
1445 std::shared_ptr<std::vector<TensorCPU>>,
1446 SharedTensorVectorPtrDeserializer);
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...