1 #ifndef CAFFE2_OPERATORS_PARTITION_OPS_H_ 2 #define CAFFE2_OPERATORS_PARTITION_OPS_H_ 4 #include "caffe2/core/context.h" 5 #include "caffe2/core/operator.h" 9 template <
typename Index>
10 static inline int moduloPartition(Index key,
int numPartitions) {
11 int shard = key % numPartitions;
13 shard += numPartitions & (shard >> (
sizeof(int) * 8 - 1));
25 bool RunOnDevice()
override {
30 template <
typename Index>
31 bool DoRunWithType() {
32 const auto numPartitions = InputSize() - 1;
33 CAFFE_ENFORCE_GE(numPartitions, 1);
34 const auto& keysTensor = Input(0);
35 const auto* keysData = keysTensor.template data<Index>();
36 const auto& keysShape = Input(0).
dims();
38 keysShape.size(), 1,
"Only 1D keys tensor supported currently.");
41 const auto& in0Shape = Input(1).
dims();
42 CAFFE_ENFORCE_GE(in0Shape.size(), 1);
44 vector<TIndex> outShape(keysShape);
45 outShape.insert(outShape.end(), in0Shape.begin() + 1, in0Shape.end());
47 CAFFE_ENFORCE_GE(outShape.size(), 1);
48 auto totalSize = in0Shape[0];
49 auto meta = Input(1).
meta();
50 for (
int i = 2; i < InputSize(); ++i) {
51 const auto& input = Input(i);
52 CAFFE_ENFORCE(meta == input.meta());
53 CAFFE_ENFORCE_GE(input.ndim(), 1);
54 CAFFE_ENFORCE(std::equal(
55 outShape.begin() + keysShape.size(),
57 input.dims().begin() + 1));
58 totalSize += input.dim(0);
60 CAFFE_ENFORCE_EQ(keysTensor.size(), totalSize);
62 auto* outTensor = Output(0);
63 outTensor->Resize(outShape);
64 auto* outData =
static_cast<char*
>(outTensor->raw_mutable_data(meta));
65 const auto blockSize = outTensor->size_from_dim(1);
67 inputDatas_.resize(numPartitions);
68 for (
int i = 0; i < numPartitions; ++i) {
69 inputDatas_[i] =
static_cast<const char*
>(Input(i + 1).
raw_data());
71 inStartOffsets_.assign(numPartitions, 0);
72 Index outStartOffset = 0;
73 int currentShard = -1;
76 const auto numEntries = keysTensor.size();
77 for (int64_t i = 0; i <= numEntries; ++i) {
79 i < numEntries ? moduloPartition(keysData[i], numPartitions) : -1;
80 if (newShard != currentShard) {
81 if (currentShard != -1) {
82 auto inStartOffset = inStartOffsets_[currentShard];
83 auto numItems = i - outStartOffset;
84 context_.template CopyItems<CPUContext, CPUContext>(
87 inputDatas_[currentShard] +
88 inStartOffset * blockSize * meta.itemsize(),
89 outData + outStartOffset * blockSize * meta.itemsize());
90 inStartOffsets_[currentShard] += numItems;
92 currentShard = newShard;
// Scratch state kept as members so the buffers are reused across runs:
// raw data pointer of each shard input, and a per-shard read cursor that
// advances as runs are copied out.
std::vector<const char*> inputDatas_;
std::vector<int64_t> inStartOffsets_;
110 OP_SINGLE_ARG(
int,
"pack_first_input", pack_first_input_, 0) {}
113 template <
typename Index>
114 void ApplyPartition(
bool skipFirstArgument) {
116 OutputSize() % InputSize(),
118 "Output number must be a multiple of input number");
119 int partitions = OutputSize() / InputSize();
120 int inputSize = InputSize();
121 int mainInputIndex = skipFirstArgument;
122 CAFFE_ENFORCE_GT(partitions, 0,
"Invalid number of partitions");
124 auto& main_input = Input(mainInputIndex);
125 TIndex size = main_input.size();
126 const Index* data = main_input.template data<Index>();
127 counts_.assign(partitions, 0);
128 for (TIndex p = 0; p < size; p++) {
129 int shard = moduloPartition(data[p], partitions);
133 raw_datas_.resize(inputSize);
134 block_sizes_.resize(inputSize);
135 metas_.resize(inputSize);
136 out_datas_.resize(OutputSize());
137 for (
int i = mainInputIndex; i < inputSize; ++i) {
138 auto& input = Input(i);
139 if (i > mainInputIndex) {
143 "Prefix of extra input's shape must match main input's shape, ",
146 for (
int j = 0; j < main_input.ndim(); ++j) {
150 "Prefix of extra input's shape must match main input's shape, ",
157 raw_datas_[i] = input.raw_data();
158 block_sizes_[i] = input.size_from_dim(main_input.ndim());
159 metas_[i] = input.meta();
161 vector<TIndex> shape(
162 input.dims().begin() + main_input.ndim() - 1, input.dims().end());
163 for (
int j = 0; j < partitions; ++j) {
164 int out_idx = i + j * inputSize;
165 auto output = Output(out_idx);
166 shape[0] = counts_[j];
167 output->Resize(shape);
168 out_datas_[out_idx] = output->raw_mutable_data(input.meta());
172 counts_.assign(partitions, 0);
173 for (TIndex p = 0; p < size; p++) {
174 int shard = moduloPartition(data[p], partitions);
175 TIndex idx = counts_[shard]++;
178 static_cast<Index*
>(out_datas_[shard * inputSize + mainInputIndex])[idx] =
179 pack_first_input_ ? ((data[p] - shard) / partitions) : data[p];
181 int baseIndex = shard * inputSize;
182 for (
int i = mainInputIndex + 1; i < inputSize; ++i) {
183 auto bs = block_sizes_[i];
184 auto meta = metas_[i];
186 context_.template CopyItems<CPUContext, CPUContext>(
189 static_cast<const char*
>(raw_datas_[i]) + p * bs * meta.itemsize(),
190 static_cast<char*
>(out_datas_[baseIndex + i]) +
191 idx * bs * meta.itemsize());
196 bool pack_first_input_;
199 vector<TIndex> counts_;
200 vector<TIndex> block_sizes_;
201 vector<TypeMeta> metas_;
202 vector<const void*> raw_datas_;
203 vector<void*> out_datas_;
213 bool RunOnDevice()
override {
218 template <
typename Index>
219 bool DoRunWithType() {
220 ApplyPartition<Index>(
false );
234 bool RunOnDevice()
override {
239 template <
typename Index>
240 bool DoRunWithType() {
242 OutputSize() % InputSize() == 0,
243 "Output number must be a multiple of input number");
244 int partitions = OutputSize() / InputSize();
245 CAFFE_ENFORCE_GT(partitions, 0,
"Invalid number of partitions");
249 "Only 1-D tensors supported as a partitioning tensor for sharding");
252 ApplyPartition<Index>(
true );
255 auto& main_input = Input(1);
256 TIndex size = main_input.size();
257 const Index* data = main_input.template data<Index>();
259 auto& length_input = Input(0);
260 TIndex elements = length_input.size();
261 const int32_t* lengths_data = length_input.template data<int32_t>();
262 out_length_.resize(partitions);
263 for (
int i = 0; i < partitions; ++i) {
264 auto& output = *Output(i * InputSize());
265 output.Resize(elements);
266 out_length_[i] = output.template mutable_data<int32_t>();
269 int total_length = 0;
270 for (
int i = 0; i < elements; ++i) {
271 total_length += lengths_data[i];
274 total_length == size,
275 "Total length is not matching to the number of elements");
278 for (
int i = 0; i < elements; ++i) {
279 for (
int j = 0; j < partitions; ++j) {
280 out_length_[j][i] = 0;
282 for (
int j = 0; j < lengths_data[i]; ++j, ++index) {
283 int shard = moduloPartition(data[index], partitions);
284 ++out_length_[shard][i];
292 vector<int32_t*> out_length_;
#endif // CAFFE2_OPERATORS_PARTITION_OPS_H_

// --- Documentation tooltips fused into this paste, preserved as notes ---
// const TypeMeta& meta() const:
//   Returns the TypeMeta object associated with the current data type.
// CPUContext:
//   The CPU context, representing the bare minimum of what a Context class
//   in Caffe2 should implement.
// const vector<TIndex>& dims() const:
//   Returns the dimensions of the tensor as a vector.
// Workspace:
//   A class that holds all the related objects created during runtime,
//   e.g. all blobs.
// (Module registry):
//   A global dictionary that holds information about what Caffe2 modules
//   have been loaded in the current runtime.
// const void* raw_data() const:
//   Returns a const raw void* pointer of the underlying storage.