Caffe2 - C++ API
A deep learning, cross platform ML framework
blob_serialization.h
1 #ifndef CAFFE2_CORE_BLOB_SERIALIZATION_H_
2 #define CAFFE2_CORE_BLOB_SERIALIZATION_H_
3 
4 #include <limits>
5 #include <future>
6 
7 #include <google/protobuf/repeated_field.h>
8 
9 #include "caffe2/core/blob.h"
10 #include "caffe2/core/blob_serializer_base.h"
11 #include "caffe2/core/tensor.h"
12 #include "caffe2/core/typeid.h"
13 #include "caffe2/core/types.h"
14 #include "caffe2/utils/simple_queue.h"
15 
16 CAFFE2_DECLARE_int(caffe2_tensor_chunk_size);
17 CAFFE2_DECLARE_int(caffe2_max_tensor_serializer_threads);
18 CAFFE2_DECLARE_bool(caffe2_serialize_fp16_as_bytes);
19 
20 namespace caffe2 {
21 
22 constexpr auto kTensorBlobType = "Tensor";
23 // String used to separate chunk id from the blob name when storing in DB
24 constexpr auto kChunkIdSeparator = "#%";
25 
26 // The Blob serialization registry and serializer creator functions.
27 CAFFE_DECLARE_TYPED_REGISTRY(
28  BlobSerializerRegistry,
29  CaffeTypeId,
30  BlobSerializerBase,
31  std::unique_ptr);
32 #define REGISTER_BLOB_SERIALIZER(id, ...) \
33  CAFFE_REGISTER_TYPED_CLASS(BlobSerializerRegistry, id, __VA_ARGS__)
34 // Creates a serializer for the given registered type id.
35 inline unique_ptr<BlobSerializerBase> CreateSerializer(CaffeTypeId id) {
36  return BlobSerializerRegistry()->Create(id);
37 }
38 
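For orientation, here is a minimal sketch of how this registry is typically driven, assuming a CPU tensor blob and an illustrative blob name; the acceptor lambda follows the BlobSerializerBase::SerializationAcceptor signature and error handling is omitted:

Blob blob;
auto* tensor = blob.GetMutable<Tensor<CPUContext>>();
tensor->Resize(4);
tensor->mutable_data<float>();  // allocate storage so there is data to serialize

// Look up the serializer registered for the blob's stored type.
auto serializer = CreateSerializer(blob.meta().id());
serializer->Serialize(
    blob,
    "my_blob",  // illustrative name
    [](const std::string& blobName, const std::string& data) {
      // Each serialized chunk arrives here, e.g. to be written to a DB.
    });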
39 /**
40  * @brief TensorSerializer is the serializer for Tensors.
41  *
42  * TensorSerializer takes a blob that contains a Tensor, and serializes it
43  * into a TensorProto protocol buffer.
44  */
45 template <class Context>
46 class TensorSerializer : public BlobSerializerBase {
47  public:
48  TensorSerializer() : context_() {}
49  ~TensorSerializer() override {}
50  /**
51  * Serializes a Blob. The blob is assumed to contain a Tensor<Context>;
52  * this is enforced at serialization time.
53  */
54  void Serialize(
55  const Blob& blob,
56  const string& name,
57  SerializationAcceptor acceptor) override;
58  void SerializeWithChunkSize(
59  const Blob& blob,
60  const string& name,
61  SerializationAcceptor acceptor,
62  int chunk_size) override;
63 
64  void Serialize(const Tensor<Context>& tensor, const string& name,
65  TensorProto* proto, size_t chunkBegin, int32_t chunkSize);
66 
67  private:
68  // A utility function to store the device context details.
69  void StoreDeviceDetail(const Tensor<Context>& input, TensorProto* proto);
70  Context context_;
71 };
72 
73 /**
74  * @brief BlobDeserializerBase is an abstract class that deserializes a blob
75  * from a BlobProto or a TensorProto.
76  */
77 class BlobDeserializerBase {
78  public:
79  virtual ~BlobDeserializerBase() {}
80 
81  // Deserializes from a BlobProto object.
82  virtual void Deserialize(const BlobProto& proto, Blob* blob) = 0;
83 };
84 
85 CAFFE_DECLARE_REGISTRY(BlobDeserializerRegistry, BlobDeserializerBase);
86 #define REGISTER_BLOB_DESERIALIZER(name, ...) \
87  CAFFE_REGISTER_CLASS(BlobDeserializerRegistry, name, __VA_ARGS__)
88 // Creates a deserializer for the given serialized blob type name.
89 inline unique_ptr<BlobDeserializerBase> CreateDeserializer(const string& type) {
90  return BlobDeserializerRegistry()->Create(type);
91 }
92 
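Deserialization mirrors this: BlobProto::type (kTensorBlobType for tensors) selects the deserializer from the registry. A hedged sketch, where `serialized` stands in for one chunk produced by an acceptor as shown earlier:

BlobProto blob_proto;
CAFFE_ENFORCE(blob_proto.ParseFromString(serialized));  // `serialized` is assumed input
Blob blob;
auto deserializer = CreateDeserializer(blob_proto.type());
deserializer->Deserialize(blob_proto, &blob);
// Blob::Deserialize(serialized) performs the same lookup internally.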
93 /**
94  * @brief TensorDeserializer is the deserializer for Tensors.
95  *
96  * The device that the deserialized Tensor will live under is determined by
97  * the device_detail field of the TensorProto. To deserialize onto a
98  * specific device, adjust the proto's device_detail fields before calling
99  * Deserialize.
100  */
101 template <class Context>
102 class TensorDeserializer : public BlobDeserializerBase {
103  public:
104  void Deserialize(const BlobProto& proto, Blob* blob) override;
105  void Deserialize(const TensorProto& proto, Tensor<Context>* tensor);
106 };
107 
108 ////////////////////////////////////////////////////////////////////////////////
109 // Implementations
110 ////////////////////////////////////////////////////////////////////////////////
111 
112 namespace detail {
113 template <typename SrcType, typename DstType, class Context>
114 inline void CopyToProtoAsIs(
115  const size_t size,
116  const SrcType* src,
117  google::protobuf::RepeatedField<DstType>* field,
118  Context* context) {
119  static_assert(
120  sizeof(SrcType) == sizeof(DstType),
121  "The source type and dest type cannot be copied as-is. Did "
122  "you mean CopyToProtoWithCast?");
123  field->Reserve(size);
124  for (int i = 0; i < size; ++i) {
125  field->Add(0);
126  }
127  context->template Copy<SrcType, Context, CPUContext>(
128  size, src, reinterpret_cast<SrcType*>(field->mutable_data()));
129  // Make sure that we finish the copy into the protobuf.
130  context->FinishDeviceComputation();
131 }
132 
133 template <typename SrcType, typename DstType, class Context>
134 inline void CopyToProtoWithCast(
135  const size_t size,
136  const SrcType* src,
137  google::protobuf::RepeatedField<DstType>* field,
138  Context* context) {
139  // TODO: there is one unnecessary copy here if the context is already
140  // CPUContext. Remove it if it becomes performance critical.
141  unique_ptr<SrcType[]> buffer(new SrcType[size]);
142  context->template Copy<SrcType, Context, CPUContext>(
143  size, src, buffer.get());
144  context->FinishDeviceComputation();
145  field->Reserve(size);
146  for (int i = 0; i < size; ++i) {
147  field->Add(static_cast<DstType>(buffer[i]));
148  }
149 }
150 
151 template <typename SrcType, typename DstType, class Context>
152 inline void CopyFromProtoAsIs(
153  const size_t size,
154  const google::protobuf::RepeatedField<SrcType>& field,
155  DstType* dst,
156  Context* context) {
157  static_assert(
158  sizeof(SrcType) == sizeof(DstType),
159  "The source type and dest type cannot be copied as-is. Did "
160  "you mean CopyFromProtoWithCast?");
161  CAFFE_ENFORCE_EQ(size, field.size(), "Incorrect proto field size.");
162  context->template Copy<DstType, CPUContext, Context>(
163  size, reinterpret_cast<const DstType*>(field.data()), dst);
164 }
165 
166 template <typename SrcType, typename DstType, class Context>
167 inline void CopyFromProtoWithCast(
168  const size_t size,
169  const google::protobuf::RepeatedField<SrcType>& field,
170  DstType* dst,
171  Context* context) {
172  CAFFE_ENFORCE_EQ(size, field.size(), "Incorrect proto field size.");
173  // TODO: there is one unnecessary copy here if the context is already
174  // CPUContext. Remove it if it becomes performance critical.
175  unique_ptr<DstType[]> buffer(new DstType[size]);
176  const SrcType* src = field.data();
177  for (int i = 0; i < size; ++i) {
178  buffer[i] = static_cast<DstType>(src[i]);
179  }
180  context->template Copy<DstType, CPUContext, Context>(size, buffer.get(), dst);
181 }
182 
183 } // namespace detail
184 
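These helpers are how tensor data moves between device memory and the repeated fields of TensorProto in the code below: the AsIs variants apply when source and destination have the same width (float, int64), the WithCast variants when a widening cast into int32_data is needed (bool, uint8, int16, ...). A CPU-only sketch of the calling convention, assuming CPUContext:

CPUContext cpu_context;
bool values[3] = {true, false, true};
google::protobuf::RepeatedField<int32_t> int32_field;
detail::CopyToProtoWithCast(
    3, values, &int32_field, &cpu_context);
// int32_field now holds {1, 0, 1}.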
185 template <class Context>
186 void TensorSerializer<Context>::Serialize(
187  const Blob& blob,
188  const string& name,
189  BlobSerializerBase::SerializationAcceptor acceptor) {
190  this->SerializeWithChunkSize(blob, name, acceptor, kDefaultChunkSize);
191 }
192 
193 template <class Context>
194 void TensorSerializer<Context>::SerializeWithChunkSize(
195  const Blob& blob,
196  const string& name,
197  BlobSerializerBase::SerializationAcceptor acceptor,
198  int chunk_size) {
199  CAFFE_ENFORCE(blob.IsType<Tensor<Context>>());
200  const auto& tensor = blob.template Get<Tensor<Context>>();
201  if (chunk_size == kNoChunking) {
202  chunk_size = tensor.size() + 1; // to account for empty tensors
203  } else if (chunk_size == kDefaultChunkSize) {
204  chunk_size = FLAGS_caffe2_tensor_chunk_size;
205  }
206 
207  auto processChunk = [&](int64_t chunkStart) {
208  BlobProto blob_proto;
209  blob_proto.set_name(name);
210  blob_proto.set_type(kTensorBlobType);
211  TensorProto& proto = *blob_proto.mutable_tensor();
212  proto.set_name(name);
213  this->Serialize(
214  tensor, name, blob_proto.mutable_tensor(), chunkStart, chunk_size);
215  acceptor(
216  MakeString(name, kChunkIdSeparator, chunkStart / chunk_size),
217  blob_proto.SerializeAsString());
218  };
219 
220 #ifndef __ANDROID__
221  std::vector<std::future<void>> futures;
222  // Poor man's I/O-bound thread pool.
223  SimpleQueue<size_t> chunkQueue;
224  auto task = [&]() {
225  size_t chunkStart;
226  while (chunkQueue.Pop(&chunkStart)) {
227  processChunk(chunkStart);
228  }
229  };
230  if (tensor.size() > chunk_size) {
231  for (int i = 0; i < FLAGS_caffe2_max_tensor_serializer_threads; ++i) {
232  futures.emplace_back(std::async(std::launch::async, task));
233  }
234  }
235 #endif
236 
237  VLOG(1) << "Serializing blob " << name;
238  // Serialize the whole vector. If the vector is empty, its shape still needs
239  // to be serialized in an empty proto.
240  for (size_t chunkBegin = 0;
241  chunkBegin < std::max(tensor.size(), static_cast<TIndex>(1));
242  chunkBegin += chunk_size) {
243  VLOG(2) << "Starting a chunk at " << chunkBegin;
244 #ifndef __ANDROID__
245  if (tensor.size() > chunk_size) {
246  chunkQueue.Push(chunkBegin);
247  } else {
248  // Sync mode for small tensors
249  processChunk(chunkBegin);
250  }
251 #else
252  // Since Android does not have std::future, we will always do sync mode
253  processChunk(chunkBegin);
254 #endif
255  }
256 
257 #ifndef __ANDROID__
258  chunkQueue.NoMoreJobs();
259  for (auto& fut : futures) {
260  fut.get();
261  }
262 #endif
263 }
264 
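Note the key handed to the acceptor above: MakeString(name, kChunkIdSeparator, chunkStart / chunk_size). For a blob named "weights" this yields keys such as "weights#%0", "weights#%1", and so on, one per chunk (the exact count depends on the tensor size and FLAGS_caffe2_tensor_chunk_size). A consumer that only needs the blob name can strip the suffix, as in this small illustrative sketch:

std::string key = "weights#%1";  // illustrative chunk key
std::string blob_name = key.substr(0, key.find(kChunkIdSeparator));  // "weights"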
265 template <class Context>
266 void TensorSerializer<Context>::Serialize(
267  const Tensor<Context>& input,
268  const string& /*name*/,
269  TensorProto* proto_ptr,
270  size_t chunkBegin,
271  int32_t chunkSize) {
272  CAFFE_ENFORCE(
273  chunkBegin <= input.size(),
274  "Chunk begin is out of tensor: ",
275  chunkBegin,
276  ' ',
277  input.size());
278  if (chunkBegin + chunkSize > input.size()) {
279  chunkSize = input.size() - chunkBegin;
280  }
281 
282  CAFFE_ENFORCE(
283  input.raw_data() || chunkSize == 0,
284  "The input does not have data yet. This is probably because you "
285  "created a tensor of non-zero shape but never filled its data via "
286  "mutable_data() calls. This means that it makes no sense to serialize "
287  "the tensor content.");
288 
289  TensorProto& proto = *proto_ptr;
290  proto.mutable_segment()->set_begin(chunkBegin);
291  proto.mutable_segment()->set_end(chunkBegin + chunkSize);
292 
293  for (int i = 0; i < input.ndim(); ++i) {
294  proto.add_dims(input.dim(i));
295  }
296  const TensorProto::DataType data_type = TypeMetaToDataType(input.meta());
297  proto.set_data_type(data_type);
298  StoreDeviceDetail(input, &proto);
299 
300  // A lot of copy-paste is error-prone. Should we create a macro for this?
301  switch (data_type) {
302  case TensorProto_DataType_FLOAT:
303  detail::CopyToProtoAsIs(
304  chunkSize,
305  input.template data<float>() + chunkBegin,
306  proto.mutable_float_data(),
307  &this->context_);
308  break;
309  case TensorProto_DataType_INT32:
310  detail::CopyToProtoAsIs(
311  chunkSize,
312  input.template data<int>() + chunkBegin,
313  proto.mutable_int32_data(),
314  &this->context_);
315  break;
316  case TensorProto_DataType_BYTE:
317  LOG(FATAL) << "This should not happen. When serializing, "
318  "BYTE is deprecated and moved to UINT8.";
319  break;
320  case TensorProto_DataType_STRING:
321  {
322  proto.mutable_string_data()->Reserve(chunkSize);
323  const string* content = input.template data<string>();
324  for (int i = chunkBegin; i < chunkBegin + chunkSize; ++i) {
325  proto.add_string_data(content[i]);
326  }
327  break;
328  }
329  case TensorProto_DataType_BOOL:
330  detail::CopyToProtoWithCast(
331  chunkSize,
332  input.template data<bool>() + chunkBegin,
333  proto.mutable_int32_data(),
334  &this->context_);
335  break;
336  case TensorProto_DataType_UINT8:
337  detail::CopyToProtoWithCast(
338  chunkSize,
339  input.template data<uint8_t>() + chunkBegin,
340  proto.mutable_int32_data(),
341  &this->context_);
342  break;
343  case TensorProto_DataType_INT8:
344  detail::CopyToProtoWithCast(
345  chunkSize,
346  input.template data<int8_t>() + chunkBegin,
347  proto.mutable_int32_data(),
348  &this->context_);
349  break;
350  case TensorProto_DataType_UINT16:
351  detail::CopyToProtoWithCast(
352  chunkSize,
353  input.template data<uint16_t>() + chunkBegin,
354  proto.mutable_int32_data(),
355  &this->context_);
356  break;
357  case TensorProto_DataType_INT16:
358  detail::CopyToProtoWithCast(
359  chunkSize,
360  input.template data<int16_t>() + chunkBegin,
361  proto.mutable_int32_data(),
362  &this->context_);
363  break;
364  case TensorProto_DataType_INT64:
365  detail::CopyToProtoAsIs(
366  chunkSize,
367  input.template data<int64_t>() + chunkBegin,
368  proto.mutable_int64_data(),
369  &this->context_);
370  break;
371  case TensorProto_DataType_FLOAT16: {
372  if (FLAGS_caffe2_serialize_fp16_as_bytes) {
373  const int kValue = 1;
374  CAFFE_ENFORCE_EQ(
375  reinterpret_cast<const char*>(&kValue)[0],
376  1,
377  "Serialization of FLOAT16 on big-endian platforms "
378  "is not implemented yet.");
379  unique_ptr<char[]> buffer(new char[2 * chunkSize]);
380  this->context_.template Copy<char, Context, CPUContext>(
381  2 * chunkSize,
382  reinterpret_cast<const char*>(
383  input.template data<float16>() + chunkBegin),
384  buffer.get());
385  this->context_.FinishDeviceComputation();
386  proto.set_byte_data(buffer.release(), 2 * chunkSize);
387  } else {
388  detail::CopyToProtoWithCast(
389  chunkSize,
390  reinterpret_cast<const uint16_t*>(input.template data<float16>()) +
391  chunkBegin,
392  proto.mutable_int32_data(),
393  &this->context_);
394  }
395  } break;
396  case TensorProto_DataType_DOUBLE:
397  detail::CopyToProtoAsIs(
398  chunkSize,
399  input.template data<double>() + chunkBegin,
400  proto.mutable_double_data(),
401  &this->context_);
402  break;
403  case TensorProto_DataType_UNDEFINED: {
404  proto.mutable_string_data()->Reserve(chunkSize);
405  Blob temp_blob;
406  const char* raw_data = static_cast<const char*>(input.raw_data());
407  for (int i = chunkBegin; i < chunkBegin + chunkSize; ++i) {
408  temp_blob.ShareExternal(
409  const_cast<char*>(raw_data + i * input.itemsize()), input.meta());
410  proto.add_string_data(temp_blob.Serialize(""));
411  }
412  } break;
413  // Note: we intentionally do not provide "default:" so if any new data types
414  // are added, the compiler should warn the user to add the case here.
415  }
416 }
417 
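As a concrete reference for the switch above, this sketch serializes one full chunk of a small CPU float tensor and checks the fields the proto ends up with; it only uses the public Serialize overload declared earlier and standard protobuf accessors:

Tensor<CPUContext> tensor;
tensor.Resize(2, 3);
tensor.mutable_data<float>();  // allocate so raw_data() is non-null

TensorProto proto;
TensorSerializer<CPUContext> serializer;
serializer.Serialize(
    tensor, "weights", &proto, /*chunkBegin=*/0,
    /*chunkSize=*/static_cast<int32_t>(tensor.size()));

CAFFE_ENFORCE_EQ(proto.dims_size(), 2);          // shape {2, 3}
CAFFE_ENFORCE_EQ(proto.data_type(), TensorProto_DataType_FLOAT);
CAFFE_ENFORCE_EQ(proto.float_data_size(), 6);    // one value per element
CAFFE_ENFORCE_EQ(proto.segment().begin(), 0);    // this chunk covers [0, 6)
CAFFE_ENFORCE_EQ(proto.segment().end(), 6);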
418 template <class Context>
419 void TensorDeserializer<Context>::Deserialize(
420  const BlobProto& blob_proto,
421  Blob* blob) {
422  Deserialize(blob_proto.tensor(), blob->GetMutable<Tensor<Context>>());
423 }
424 
425 template <class Context>
426 void TensorDeserializer<Context>::Deserialize(
427  const TensorProto& proto,
428  Tensor<Context>* tensor) {
429  // We create a local context for deserializing. Since Caffe2 contexts are
430  // usually lightweight, this should not involve too much overhead.
431  Context context(proto.device_detail());
432  context.SwitchToDevice(0);
433  vector<TIndex> dims;
434  for (const TIndex d : proto.dims()) {
435  dims.push_back(d);
436  }
437  tensor->Resize(dims);
438 
439  int64_t chunkBegin = 0;
440  auto chunkEnd = tensor->size();
441  if (proto.has_segment()) {
442  chunkBegin = proto.segment().begin();
443  chunkEnd = proto.segment().end();
444  }
445  CAFFE_ENFORCE(
446  0 <= chunkBegin && chunkBegin <= chunkEnd && chunkEnd <= tensor->size(),
447  "Invalid chunk ",
448  chunkBegin,
449  ' ',
450  chunkEnd,
451  " with total tensor size ",
452  tensor->size());
453  auto chunkSize = chunkEnd - chunkBegin;
454 
455  switch (proto.data_type()) {
456  case TensorProto_DataType_FLOAT:
457  detail::CopyFromProtoAsIs(
458  chunkSize,
459  proto.float_data(),
460  tensor->template mutable_data<float>() + chunkBegin,
461  &context);
462  break;
463  case TensorProto_DataType_INT32:
464  detail::CopyFromProtoAsIs(
465  chunkSize,
466  proto.int32_data(),
467  tensor->template mutable_data<int>() + chunkBegin,
468  &context);
469  break;
470  case TensorProto_DataType_BYTE:
471  // Since BYTE stores the data in a string field instead of a repeated
472  // field, we special-case it.
473  CAFFE_ENFORCE_EQ(
474  chunkSize, proto.byte_data().size(), "Incorrect proto field size.");
475  context.template Copy<uint8_t, Context, CPUContext>(
476  chunkSize,
477  reinterpret_cast<const uint8_t*>(proto.byte_data().data()),
478  tensor->template mutable_data<uint8_t>() + chunkBegin);
479  break;
480  case TensorProto_DataType_STRING:
481  // Special handling of string because it is a non-fundamental type.
482  {
483  string* content = tensor->template mutable_data<string>();
484  for (int i = 0; i < chunkSize; ++i) {
485  content[i + chunkBegin] = proto.string_data(i);
486  }
487  }
488  break;
489  case TensorProto_DataType_BOOL:
490  detail::CopyFromProtoWithCast(
491  chunkSize,
492  proto.int32_data(),
493  tensor->template mutable_data<bool>() + chunkBegin,
494  &context);
495  break;
496  case TensorProto_DataType_UINT8:
497  detail::CopyFromProtoWithCast(
498  chunkSize,
499  proto.int32_data(),
500  tensor->template mutable_data<uint8_t>() + chunkBegin,
501  &context);
502  break;
503  case TensorProto_DataType_INT8:
504  detail::CopyFromProtoWithCast(
505  chunkSize,
506  proto.int32_data(),
507  tensor->template mutable_data<int8_t>() + chunkBegin,
508  &context);
509  break;
510  case TensorProto_DataType_UINT16:
511  detail::CopyFromProtoWithCast(
512  chunkSize,
513  proto.int32_data(),
514  tensor->template mutable_data<uint16_t>() + chunkBegin,
515  &context);
516  break;
517  case TensorProto_DataType_INT16:
518  detail::CopyFromProtoWithCast(
519  chunkSize,
520  proto.int32_data(),
521  tensor->template mutable_data<int16_t>() + chunkBegin,
522  &context);
523  break;
524  case TensorProto_DataType_INT64:
525  detail::CopyFromProtoAsIs(
526  chunkSize,
527  proto.int64_data(),
528  tensor->template mutable_data<int64_t>() + chunkBegin,
529  &context);
530  break;
531  case TensorProto_DataType_FLOAT16:
532  if (proto.has_byte_data()) {
533  const int kValue = 1;
534  CAFFE_ENFORCE_EQ(
535  reinterpret_cast<const char*>(&kValue)[0],
536  1,
537  "Serialization of FLOAT16 on big-endian platforms "
538  "is not implemented yet.");
539  CAFFE_ENFORCE_EQ(
540  2 * chunkSize,
541  proto.byte_data().size(),
542  "Incorrect proto field size.");
543  context.template Copy<float16, Context, CPUContext>(
544  chunkSize,
545  reinterpret_cast<const float16*>(proto.byte_data().data()),
546  tensor->template mutable_data<float16>() + chunkBegin);
547  } else {
548  // Backward compatibility with models which used int32_data field
549  detail::CopyFromProtoWithCast(
550  chunkSize,
551  proto.int32_data(),
552  reinterpret_cast<uint16_t*>(
553  tensor->template mutable_data<float16>()) +
554  chunkBegin,
555  &context);
556  }
557  break;
558  case TensorProto_DataType_DOUBLE:
559  detail::CopyFromProtoAsIs(
560  chunkSize,
561  proto.double_data(),
562  tensor->template mutable_data<double>() + chunkBegin,
563  &context);
564  break;
565  case TensorProto_DataType_UNDEFINED: {
566  Blob temp_blob;
567  void* raw_ptr = nullptr;
568  for (int i = 0; i < chunkSize; ++i) {
569  temp_blob.Deserialize(proto.string_data(i));
570  if (i == 0) {
571  raw_ptr = tensor->raw_mutable_data(temp_blob.meta());
572  }
573  temp_blob.meta().copy()(
574  temp_blob.GetRaw(),
575  static_cast<char*>(raw_ptr) +
576  (i + chunkBegin) * temp_blob.meta().itemsize(),
577  1);
578  }
579  }
580  }
581  context.FinishDeviceComputation();
582 }
583 
584 } // namespace caffe2
585 
586 #endif // CAFFE2_CORE_BLOB_SERIALIZATION_H_
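Putting the pieces together, a round trip through these APIs might look like the following sketch (CPU only; a std::map stands in for a real DB, the blob name and contents are illustrative, and small tensors are serialized synchronously, so no locking is needed around the acceptor):

#include <map>
#include <string>

#include "caffe2/core/blob.h"
#include "caffe2/core/blob_serialization.h"

void RoundTripExample() {
  using namespace caffe2;

  // Build a small CPU tensor blob.
  Blob blob;
  auto* tensor = blob.GetMutable<Tensor<CPUContext>>();
  tensor->Resize(2, 3);
  float* data = tensor->mutable_data<float>();
  for (TIndex i = 0; i < tensor->size(); ++i) {
    data[i] = static_cast<float>(i);
  }

  // Serialize; each chunk lands in the map under "weights#%<chunk-id>".
  std::map<std::string, std::string> db;
  blob.Serialize(
      "weights",
      [&db](const std::string& key, const std::string& value) {
        db[key] = value;
      });

  // Deserialize every chunk back into a fresh blob.
  Blob restored;
  for (const auto& kv : db) {
    restored.Deserialize(kv.second);
  }
}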