Caffe2 - C++ API
A deep learning, cross-platform ML framework
partition_ops.h
#ifndef CAFFE2_OPERATORS_PARTITION_OPS_H_
#define CAFFE2_OPERATORS_PARTITION_OPS_H_

#include "caffe2/core/context.h"
#include "caffe2/core/operator.h"

namespace caffe2 {

template <typename Index>
static inline int moduloPartition(Index key, int numPartitions) {
  int shard = key % numPartitions;
  // equivalent to `if (shard < 0) shard += numPartitions;`
  shard += numPartitions & (shard >> (sizeof(int) * 8 - 1));
  return shard;
}
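
// Worked example of the branchless fix-up above: C++ integer division truncates
// toward zero, so for key = -3 and numPartitions = 4, `key % numPartitions` is
// -3. The arithmetic right shift of the negative shard (by 31 for a 32-bit int)
// yields all ones, so `numPartitions & (shard >> 31)` equals 4 and the shard
// becomes 1, exactly what `if (shard < 0) shard += numPartitions;` would give.
// For a non-negative key such as 7, the shift yields 0 and the shard stays at
// 7 % 4 = 3.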

class GatherByKeyOp : public Operator<CPUContext> {
 public:
  USE_DISPATCH_HELPER;
  USE_OPERATOR_FUNCTIONS(CPUContext);
  GatherByKeyOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator<CPUContext>(operator_def, ws) {}

 private:
  bool RunOnDevice() override {
    return DispatchHelper<TensorTypes<int32_t, int64_t>>::call(this, Input(0));
  }

 private:
  template <typename Index>
  bool DoRunWithType() {
    const auto numPartitions = InputSize() - 1;
    CAFFE_ENFORCE_GE(numPartitions, 1);
    const auto& keysTensor = Input(0);
    const auto* keysData = keysTensor.template data<Index>();
    const auto& keysShape = Input(0).dims();
    CAFFE_ENFORCE_EQ(
        keysShape.size(), 1, "Only 1D keys tensor supported currently.");

    // 1. Shape and type consistency checks
    const auto& in0Shape = Input(1).dims();
    CAFFE_ENFORCE_GE(in0Shape.size(), 1);

    vector<TIndex> outShape(keysShape);
    outShape.insert(outShape.end(), in0Shape.begin() + 1, in0Shape.end());

    CAFFE_ENFORCE_GE(outShape.size(), 1);
    auto totalSize = in0Shape[0];
    auto meta = Input(1).meta();
    for (int i = 2; i < InputSize(); ++i) {
      const auto& input = Input(i);
      CAFFE_ENFORCE(meta == input.meta());
      CAFFE_ENFORCE_GE(input.ndim(), 1);
      CAFFE_ENFORCE(std::equal(
          outShape.begin() + keysShape.size(),
          outShape.end(),
          input.dims().begin() + 1));
      totalSize += input.dim(0);
    }
    CAFFE_ENFORCE_EQ(keysTensor.size(), totalSize);

    auto* outTensor = Output(0);
    outTensor->Resize(outShape);
    auto* outData = static_cast<char*>(outTensor->raw_mutable_data(meta));
    const auto blockSize = outTensor->size_from_dim(1);

    inputDatas_.resize(numPartitions);
    for (int i = 0; i < numPartitions; ++i) {
      inputDatas_[i] = static_cast<const char*>(Input(i + 1).raw_data());
    }
    inStartOffsets_.assign(numPartitions, 0);
    Index outStartOffset = 0;
    int currentShard = -1;

    // 2. copy from inputs into output based on shard for each input key
    const auto numEntries = keysTensor.size();
    for (int64_t i = 0; i <= numEntries; ++i) {
      auto newShard =
          i < numEntries ? moduloPartition(keysData[i], numPartitions) : -1;
      if (newShard != currentShard) {
        if (currentShard != -1) {
          auto inStartOffset = inStartOffsets_[currentShard];
          auto numItems = i - outStartOffset;
          context_.template CopyItems<CPUContext, CPUContext>(
              meta,
              numItems * blockSize,
              inputDatas_[currentShard] +
                  inStartOffset * blockSize * meta.itemsize(),
              outData + outStartOffset * blockSize * meta.itemsize());
          inStartOffsets_[currentShard] += numItems;
        }
        currentShard = newShard;
        outStartOffset = i;
      }
    }

    return true;
  }

  std::vector<const char*> inputDatas_;
  std::vector<int64_t> inStartOffsets_;
};
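
// Illustration: GatherByKeyOp is the inverse of modulo partitioning. Given keys
// [5, 9, 2, 6] and two shard tensors -- shard 0 holding the rows for {2, 6} and
// shard 1 the rows for {5, 9} -- it walks the keys in order, recomputes each
// key's shard, and copies the next unread row of that shard into the output,
// restoring the row order of the original, unpartitioned input.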

class PartitionOpBase : public Operator<CPUContext> {
 public:
  USE_OPERATOR_FUNCTIONS(CPUContext);

  PartitionOpBase(const OperatorDef& operator_def, Workspace* ws)
      : Operator<CPUContext>(operator_def, ws),
        OP_SINGLE_ARG(int, "pack_first_input", pack_first_input_, 0) {}

 protected:
  template <typename Index>
  void ApplyPartition(bool skipFirstArgument) {
    CAFFE_ENFORCE_EQ(
        OutputSize() % InputSize(),
        0,
        "Output number must be a multiple of input number");
    int partitions = OutputSize() / InputSize();
    int inputSize = InputSize();
    int mainInputIndex = skipFirstArgument;
    CAFFE_ENFORCE_GT(partitions, 0, "Invalid number of partitions");

    auto& main_input = Input(mainInputIndex);
    TIndex size = main_input.size();
    const Index* data = main_input.template data<Index>();
    counts_.assign(partitions, 0);
    for (TIndex p = 0; p < size; p++) {
      int shard = moduloPartition(data[p], partitions);
      ++counts_[shard];
    }

    raw_datas_.resize(inputSize);
    block_sizes_.resize(inputSize);
    metas_.resize(inputSize);
    out_datas_.resize(OutputSize());
    for (int i = mainInputIndex; i < inputSize; ++i) {
      auto& input = Input(i);
      if (i > mainInputIndex) {
        CAFFE_ENFORCE_GE(
            input.ndim(),
            main_input.ndim(),
            "Prefix of extra input's shape must match main input's shape, ",
            "input: ",
            i);
        for (int j = 0; j < main_input.ndim(); ++j) {
          CAFFE_ENFORCE_GE(
              input.dim(j),
              main_input.dim(j),
              "Prefix of extra input's shape must match main input's shape, ",
              "input: ",
              i,
              ", dim ",
              j);
        }
      }
      raw_datas_[i] = input.raw_data();
      block_sizes_[i] = input.size_from_dim(main_input.ndim());
      metas_[i] = input.meta();
      // shape = partition_size + suffix of input dims
      vector<TIndex> shape(
          input.dims().begin() + main_input.ndim() - 1, input.dims().end());
      for (int j = 0; j < partitions; ++j) {
        int out_idx = i + j * inputSize;
        auto output = Output(out_idx);
        shape[0] = counts_[j];
        output->Resize(shape);
        out_datas_[out_idx] = output->raw_mutable_data(input.meta());
      }
    }

    counts_.assign(partitions, 0);
    for (TIndex p = 0; p < size; p++) {
      int shard = moduloPartition(data[p], partitions);
      TIndex idx = counts_[shard]++;

      // special case first input
      static_cast<Index*>(out_datas_[shard * inputSize + mainInputIndex])[idx] =
          pack_first_input_ ? ((data[p] - shard) / partitions) : data[p];

      int baseIndex = shard * inputSize;
      for (int i = mainInputIndex + 1; i < inputSize; ++i) {
        auto bs = block_sizes_[i];
        auto meta = metas_[i];
        // special case for small bs?
        context_.template CopyItems<CPUContext, CPUContext>(
            meta,
            bs,
            static_cast<const char*>(raw_datas_[i]) + p * bs * meta.itemsize(),
            static_cast<char*>(out_datas_[baseIndex + i]) +
                idx * bs * meta.itemsize());
      }
    }
  }

  bool pack_first_input_;

  // use member fields to reuse memory
  vector<TIndex> counts_;
  vector<TIndex> block_sizes_;
  vector<TypeMeta> metas_;
  vector<const void*> raw_datas_;
  vector<void*> out_datas_;
};
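
// Illustration: ApplyPartition makes two passes over the main (index) input.
// The first pass only counts how many entries fall into each shard so that
// every output can be resized once up front; the second pass copies each entry
// (and the matching rows of every extra input) into the next free slot of its
// shard. With keys [5, 9, 2, 6] and 2 partitions, counts_ ends up as {2, 2},
// so output j of input i (output index i + j * InputSize()) is resized to hold
// counts_[j] rows before any data is copied.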

class PartitionOp : public PartitionOpBase {
 public:
  USE_DISPATCH_HELPER;

  PartitionOp(const OperatorDef& operator_def, Workspace* ws)
      : PartitionOpBase(operator_def, ws) {}

  bool RunOnDevice() override {
    return DispatchHelper<TensorTypes<int32_t, int64_t>>::call(this, Input(0));
  }

 private:
  template <typename Index>
  bool DoRunWithType() {
    ApplyPartition<Index>(false /* skipFirstArgument */);
    return true;
  }

  DISABLE_COPY_AND_ASSIGN(PartitionOp);
};
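
// Illustration of the pack_first_input argument: with pack_first_input=1 the
// key itself is renumbered within its shard as (key - shard) / partitions. For
// key 9 and 4 partitions the shard is 1 and the stored value is (9 - 1) / 4 = 2,
// i.e. key 9 is the third key (index 2) of shard 1's key space. With the
// default pack_first_input=0, the original key value is copied through
// unchanged.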

class LengthsPartitionOp : public PartitionOpBase {
 public:
  USE_DISPATCH_HELPER;

  LengthsPartitionOp(const OperatorDef& operator_def, Workspace* ws)
      : PartitionOpBase(operator_def, ws) {}

  bool RunOnDevice() override {
    return DispatchHelper<TensorTypes<int32_t, int64_t>>::call(this, Input(1));
  }

 private:
  template <typename Index>
  bool DoRunWithType() {
    CAFFE_ENFORCE(
        OutputSize() % InputSize() == 0,
        "Output number must be a multiple of input number");
    int partitions = OutputSize() / InputSize();
    CAFFE_ENFORCE_GT(partitions, 0, "Invalid number of partitions");
    CAFFE_ENFORCE_EQ(
        Input(1).ndim(),
        1,
        "Only 1-D tensors supported as a partitioning tensor for sharding");

    // Apply sharding to all parameters except lengths
    ApplyPartition<Index>(true /* skipFirstArgument */);

    // Compute lengths after sharding
    auto& main_input = Input(1);
    TIndex size = main_input.size();
    const Index* data = main_input.template data<Index>();

    auto& length_input = Input(0);
    TIndex elements = length_input.size();
    const int32_t* lengths_data = length_input.template data<int32_t>();
    out_length_.resize(partitions);
    for (int i = 0; i < partitions; ++i) {
      auto& output = *Output(i * InputSize());
      output.Resize(elements);
      out_length_[i] = output.template mutable_data<int32_t>();
    }

    int total_length = 0;
    for (int i = 0; i < elements; ++i) {
      total_length += lengths_data[i];
    }
    CAFFE_ENFORCE(
        total_length == size,
        "Total length is not matching to the number of elements");

    int index = 0;
    for (int i = 0; i < elements; ++i) {
      for (int j = 0; j < partitions; ++j) {
        out_length_[j][i] = 0;
      }
      for (int j = 0; j < lengths_data[i]; ++j, ++index) {
        int shard = moduloPartition(data[index], partitions);
        ++out_length_[shard][i];
      }
    }
    return true;
  }

  DISABLE_COPY_AND_ASSIGN(LengthsPartitionOp);

  vector<int32_t*> out_length_;
};
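
// Illustration: LengthsPartitionOp shards every input except the first (the
// lengths tensor) and then rewrites the lengths per shard. With lengths [2, 1],
// indices [5, 2, 6] and 2 partitions, segment 0 sends key 5 to shard 1 and key
// 2 to shard 0, while segment 1 sends key 6 to shard 0; shard 0's lengths
// become [1, 1] and shard 1's lengths become [1, 0]. Each shard keeps one
// length entry per original segment.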

} // namespace caffe2

#endif // CAFFE2_OPERATORS_PARTITION_OPS_H_
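
A minimal usage sketch, assuming the standard Caffe2 Workspace/CreateOperator
C++ API and the "Partition" name under which PartitionOp is registered in the
corresponding .cc file; the blob names below are arbitrary and the snippet is
illustrative rather than part of this header:

#include "caffe2/core/operator.h"
#include "caffe2/core/workspace.h"

void PartitionExample() {
  caffe2::Workspace ws;

  // Fill a 1-D int64 keys blob; its values double as the partitioning keys.
  auto* keys = ws.CreateBlob("keys")->GetMutable<caffe2::TensorCPU>();
  keys->Resize(4);
  auto* keysData = keys->mutable_data<int64_t>();
  keysData[0] = 5;
  keysData[1] = 9;
  keysData[2] = 2;
  keysData[3] = 6;

  // One input and two outputs, so OutputSize() / InputSize() = 2 partitions.
  caffe2::OperatorDef def;
  def.set_type("Partition");
  def.add_input("keys");
  def.add_output("keys_0");
  def.add_output("keys_1");

  auto op = caffe2::CreateOperator(def, &ws);
  op->Run();
  // "keys_0" now holds [2, 6] and "keys_1" holds [5, 9]; GatherByKey with the
  // original keys and these two shards would reassemble [5, 9, 2, 6].
}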