1 #include "caffe2/core/db.h" 5 #include "caffe2/core/blob_serialization.h" 6 #include "caffe2/core/logging.h" 10 CAFFE_KNOWN_TYPE(db::DBReader);
11 CAFFE_KNOWN_TYPE(db::Cursor);
15 CAFFE_DEFINE_REGISTRY(Caffe2DBRegistry, DB,
const string&, Mode);
25 : file_(f), lock_(*mutex), valid_(
true) {
31 void Seek(
const string& )
override {
32 LOG(FATAL) <<
"MiniDB does not support seeking to a specific key.";
36 fseek(file_, 0, SEEK_SET);
37 CAFFE_ENFORCE(!feof(file_),
"Hmm, empty file?");
45 if (fread(&key_len_,
sizeof(
int), 1, file_) == 0) {
47 VLOG(1) <<
"EOF reached, setting valid to false";
51 CAFFE_ENFORCE_EQ(fread(&value_len_,
sizeof(
int), 1, file_), 1);
52 CAFFE_ENFORCE_GT(key_len_, 0);
53 CAFFE_ENFORCE_GT(value_len_, 0);
55 if (key_len_ > key_.size()) {
56 key_.resize(key_len_);
58 if (value_len_ > value_.size()) {
59 value_.resize(value_len_);
63 fread(key_.data(),
sizeof(char), key_len_, file_), key_len_);
65 fread(value_.data(),
sizeof(char), value_len_, file_), value_len_);
70 string key()
override {
71 CAFFE_ENFORCE(valid_,
"Cursor is at invalid location!");
72 return string(key_.data(), key_len_);
76 CAFFE_ENFORCE(valid_,
"Cursor is at invalid location!");
77 return string(value_.data(), value_len_);
80 bool Valid()
override {
return valid_; }
84 std::lock_guard<std::mutex> lock_;
95 : file_(f), lock_(*mutex) {}
101 int key_len = key.size();
102 int value_len = value.size();
103 CAFFE_ENFORCE_EQ(fwrite(&key_len,
sizeof(
int), 1, file_), 1);
104 CAFFE_ENFORCE_EQ(fwrite(&value_len,
sizeof(
int), 1, file_), 1);
106 fwrite(key.c_str(),
sizeof(char), key_len, file_), key_len);
108 fwrite(value.c_str(),
sizeof(char), value_len, file_), value_len);
112 if (file_ !=
nullptr) {
113 CAFFE_ENFORCE_EQ(fflush(file_), 0);
120 std::lock_guard<std::mutex> lock_;
127 MiniDB(
const string& source, Mode mode) :
DB(source, mode), file_(
nullptr) {
130 file_ = fopen(source.c_str(),
"wb");
133 file_ = fopen(source.c_str(),
"ab");
134 fseek(file_, 0, SEEK_END);
137 file_ = fopen(source.c_str(),
"rb");
140 CAFFE_ENFORCE(file_,
"Cannot open file: " + source);
141 VLOG(1) <<
"Opened MiniDB " << source;
153 CAFFE_ENFORCE_EQ(this->mode_, READ);
154 return make_unique<MiniDBCursor>(file_, &file_access_mutex_);
158 CAFFE_ENFORCE(this->mode_ == NEW || this->mode_ == WRITE);
159 return make_unique<MiniDBTransaction>(file_, &file_access_mutex_);
166 std::mutex file_access_mutex_;
170 REGISTER_CAFFE2_DB(minidb,
MiniDB);
175 BlobSerializerBase::SerializationAcceptor acceptor) {
179 proto.set_name(name);
180 proto.set_source(reader.source_);
181 proto.set_db_type(reader.db_type_);
182 if (reader.cursor() && reader.cursor()->SupportsSeek()) {
183 proto.set_key(reader.cursor()->key());
185 BlobProto blob_proto;
186 blob_proto.set_name(name);
187 blob_proto.set_type(
"DBReader");
188 blob_proto.set_content(proto.SerializeAsString());
189 acceptor(name, blob_proto.SerializeAsString());
192 void DBReaderDeserializer::Deserialize(
const BlobProto& proto,
Blob* blob) {
193 DBReaderProto reader_proto;
195 reader_proto.ParseFromString(proto.content()),
196 "Cannot parse content into a DBReaderProto.");
202 REGISTER_BLOB_SERIALIZER((TypeMeta::Id<DBReader>()),
unique_ptr< Transaction > NewTransaction() override
Returns a transaction to write data to the database.
Blob is a general container that hosts a typed pointer.
string key() override
Returns the current key.
An abstract class for the current database transaction while writing.
An abstract class for the cursor of the database while reading.
bool Valid() override
Returns whether the current location is valid - for example, if we have reached the end of the databa...
unique_ptr< Cursor > NewCursor() override
Returns a cursor to read the database.
A reader wrapper for DB that also allows us to serialize it.
void Next() override
Go to the next location in the database.
void SeekToFirst() override
Seek to the first key in the database.
void Close() override
Closes the database.
void Put(const string &key, const string &value) override
Puts the key value pair to the database.
string value() override
Returns the current value.
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...
An abstract class for accessing a database of key-value pairs.
void Serialize(const Blob &blob, const string &name, BlobSerializerBase::SerializationAcceptor acceptor) override
Serializes a DBReader.
T * Reset(T *allocated)
Sets the underlying object to the allocated one.
void Seek(const string &) override
Seek to a specific key (or if the key does not exist, seek to the immediate next).
bool IsType() const
Checks if the content stored in the blob is of type T.
const T & Get() const
Gets the const reference of the stored object.
void Commit() override
Commits the current writes.