Caffe2 - C++ API
A deep learning, cross platform ML framework
db.cc
1 #include "caffe2/core/db.h"
2 
3 #include <mutex>
4 
5 #include "caffe2/core/blob_serialization.h"
6 #include "caffe2/core/logging.h"
7 
8 namespace caffe2 {
9 
10 CAFFE_KNOWN_TYPE(db::DBReader);
11 CAFFE_KNOWN_TYPE(db::Cursor);
12 
13 namespace db {
14 
15 CAFFE_DEFINE_REGISTRY(Caffe2DBRegistry, DB, const string&, Mode);
16 
17 // Below, we provide a bare minimum database "minidb" as a reference
18 // implementation as well as a portable choice to store data.
19 // Note that the MiniDB classes are not exposed via a header file - they should
20 // be created directly via the db interface. See MiniDB for details.
21 
22 class MiniDBCursor : public Cursor {
23  public:
24  explicit MiniDBCursor(FILE* f, std::mutex* mutex)
25  : file_(f), lock_(*mutex), valid_(true) {
26  // We call Next() to read in the first entry.
27  Next();
28  }
29  ~MiniDBCursor() {}
30 
31  void Seek(const string& /*key*/) override {
32  LOG(FATAL) << "MiniDB does not support seeking to a specific key.";
33  }
34 
35  void SeekToFirst() override {
36  fseek(file_, 0, SEEK_SET);
37  CAFFE_ENFORCE(!feof(file_), "Hmm, empty file?");
38  // Read the first item.
39  valid_ = true;
40  Next();
41  }
42 
43  void Next() override {
44  // First, read in the key and value length.
45  if (fread(&key_len_, sizeof(int), 1, file_) == 0) {
46  // Reaching EOF.
47  VLOG(1) << "EOF reached, setting valid to false";
48  valid_ = false;
49  return;
50  }
51  CAFFE_ENFORCE_EQ(fread(&value_len_, sizeof(int), 1, file_), 1);
52  CAFFE_ENFORCE_GT(key_len_, 0);
53  CAFFE_ENFORCE_GT(value_len_, 0);
54  // Resize if the key and value len is larger than the current one.
55  if (key_len_ > key_.size()) {
56  key_.resize(key_len_);
57  }
58  if (value_len_ > value_.size()) {
59  value_.resize(value_len_);
60  }
61  // Actually read in the contents.
62  CAFFE_ENFORCE_EQ(
63  fread(key_.data(), sizeof(char), key_len_, file_), key_len_);
64  CAFFE_ENFORCE_EQ(
65  fread(value_.data(), sizeof(char), value_len_, file_), value_len_);
66  // Note(Yangqing): as we read the file, the cursor naturally moves to the
67  // beginning of the next entry.
68  }
69 
70  string key() override {
71  CAFFE_ENFORCE(valid_, "Cursor is at invalid location!");
72  return string(key_.data(), key_len_);
73  }
74 
75  string value() override {
76  CAFFE_ENFORCE(valid_, "Cursor is at invalid location!");
77  return string(value_.data(), value_len_);
78  }
79 
80  bool Valid() override { return valid_; }
81 
82  private:
83  FILE* file_;
84  std::lock_guard<std::mutex> lock_;
85  bool valid_;
86  int key_len_;
87  vector<char> key_;
88  int value_len_;
89  vector<char> value_;
90 };
91 
93  public:
94  explicit MiniDBTransaction(FILE* f, std::mutex* mutex)
95  : file_(f), lock_(*mutex) {}
97  Commit();
98  }
99 
100  void Put(const string& key, const string& value) override {
101  int key_len = key.size();
102  int value_len = value.size();
103  CAFFE_ENFORCE_EQ(fwrite(&key_len, sizeof(int), 1, file_), 1);
104  CAFFE_ENFORCE_EQ(fwrite(&value_len, sizeof(int), 1, file_), 1);
105  CAFFE_ENFORCE_EQ(
106  fwrite(key.c_str(), sizeof(char), key_len, file_), key_len);
107  CAFFE_ENFORCE_EQ(
108  fwrite(value.c_str(), sizeof(char), value_len, file_), value_len);
109  }
110 
111  void Commit() override {
112  if (file_ != nullptr) {
113  CAFFE_ENFORCE_EQ(fflush(file_), 0);
114  file_ = nullptr;
115  }
116  }
117 
118  private:
119  FILE* file_;
120  std::lock_guard<std::mutex> lock_;
121 
122  DISABLE_COPY_AND_ASSIGN(MiniDBTransaction);
123 };
124 
125 class MiniDB : public DB {
126  public:
127  MiniDB(const string& source, Mode mode) : DB(source, mode), file_(nullptr) {
128  switch (mode) {
129  case NEW:
130  file_ = fopen(source.c_str(), "wb");
131  break;
132  case WRITE:
133  file_ = fopen(source.c_str(), "ab");
134  fseek(file_, 0, SEEK_END);
135  break;
136  case READ:
137  file_ = fopen(source.c_str(), "rb");
138  break;
139  }
140  CAFFE_ENFORCE(file_, "Cannot open file: " + source);
141  VLOG(1) << "Opened MiniDB " << source;
142  }
143  ~MiniDB() { Close(); }
144 
145  void Close() override {
146  if (file_) {
147  fclose(file_);
148  }
149  file_ = nullptr;
150  }
151 
152  unique_ptr<Cursor> NewCursor() override {
153  CAFFE_ENFORCE_EQ(this->mode_, READ);
154  return make_unique<MiniDBCursor>(file_, &file_access_mutex_);
155  }
156 
157  unique_ptr<Transaction> NewTransaction() override {
158  CAFFE_ENFORCE(this->mode_ == NEW || this->mode_ == WRITE);
159  return make_unique<MiniDBTransaction>(file_, &file_access_mutex_);
160  }
161 
162  private:
163  FILE* file_;
164  // access mutex makes sure we don't have multiple cursors/transactions
165  // reading the same file.
166  std::mutex file_access_mutex_;
167 };
168 
169 REGISTER_CAFFE2_DB(MiniDB, MiniDB);
170 REGISTER_CAFFE2_DB(minidb, MiniDB);
171 
173  const Blob& blob,
174  const string& name,
175  BlobSerializerBase::SerializationAcceptor acceptor) {
176  CAFFE_ENFORCE(blob.IsType<DBReader>());
177  auto& reader = blob.Get<DBReader>();
178  DBReaderProto proto;
179  proto.set_name(name);
180  proto.set_source(reader.source_);
181  proto.set_db_type(reader.db_type_);
182  if (reader.cursor() && reader.cursor()->SupportsSeek()) {
183  proto.set_key(reader.cursor()->key());
184  }
185  BlobProto blob_proto;
186  blob_proto.set_name(name);
187  blob_proto.set_type("DBReader");
188  blob_proto.set_content(proto.SerializeAsString());
189  acceptor(name, blob_proto.SerializeAsString());
190 }
191 
192 void DBReaderDeserializer::Deserialize(const BlobProto& proto, Blob* blob) {
193  DBReaderProto reader_proto;
194  CAFFE_ENFORCE(
195  reader_proto.ParseFromString(proto.content()),
196  "Cannot parse content into a DBReaderProto.");
197  blob->Reset(new DBReader(reader_proto));
198 }
199 
200 namespace {
201 // Serialize TensorCPU.
202 REGISTER_BLOB_SERIALIZER((TypeMeta::Id<DBReader>()),
204 REGISTER_BLOB_DESERIALIZER(DBReader, DBReaderDeserializer);
205 } // namespace
206 
207 } // namespace db
208 } // namespace caffe2
unique_ptr< Transaction > NewTransaction() override
Returns a transaction to write data to the database.
Definition: db.cc:157
Blob is a general container that hosts a typed pointer.
Definition: blob.h:25
string key() override
Returns the current key.
Definition: db.cc:70
An abstract class for the current database transaction while writing.
Definition: db.h:61
An abstract class for the cursor of the database while reading.
Definition: db.h:22
bool Valid() override
Returns whether the current location is valid - for example, it is no longer valid once we have reached the end of the database.
Definition: db.cc:80
unique_ptr< Cursor > NewCursor() override
Returns a cursor to read the database.
Definition: db.cc:152
A reader wrapper for DB that also allows us to serialize it.
Definition: db.h:144
void Next() override
Go to the next location in the database.
Definition: db.cc:43
void SeekToFirst() override
Seek to the first key in the database.
Definition: db.cc:35
void Close() override
Closes the database.
Definition: db.cc:145
void Put(const string &key, const string &value) override
Puts the key value pair to the database.
Definition: db.cc:100
string value() override
Returns the current value.
Definition: db.cc:75
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...
An abstract class for accessing a database of key-value pairs.
Definition: db.h:80
void Serialize(const Blob &blob, const string &name, BlobSerializerBase::SerializationAcceptor acceptor) override
Serializes a DBReader.
Definition: db.cc:172
T * Reset(T *allocated)
Sets the underlying object to the allocated one.
Definition: blob.h:121
void Seek(const string &) override
Seek to a specific key (or if the key does not exist, seek to the immediate next).
Definition: db.cc:31
bool IsType() const
Checks if the content stored in the blob is of type T.
Definition: blob.h:58
const T & Get() const
Gets the const reference of the stored object.
Definition: blob.h:75
void Commit() override
Commits the current writes.
Definition: db.cc:111