Caffe2 - C++ API
A deep learning, cross-platform ML framework
rocksdb.cc
1 
17 #include "caffe2/core/db.h"
18 #include "caffe2/core/logging.h"
19 #include "caffe2/core/flags.h"
20 #include "rocksdb/db.h"
21 #include "rocksdb/utilities/leveldb_options.h"
22 
23 CAFFE2_DEFINE_int(caffe2_rocksdb_block_size, 65536,
24  "The caffe2 rocksdb block size when writing a rocksdb.");
25 
26 namespace caffe2 {
27 namespace db {
28 
29 class RocksDBCursor : public Cursor {
30  public:
31  explicit RocksDBCursor(rocksdb::DB* db)
32  : iter_(db->NewIterator(rocksdb::ReadOptions())) {
33  SeekToFirst();
34  }
35  ~RocksDBCursor() {}
36  void Seek(const string& key) override { iter_->Seek(key); }
37  bool SupportsSeek() override { return true; }
38  void SeekToFirst() override { iter_->SeekToFirst(); }
39  void Next() override { iter_->Next(); }
40  string key() override { return iter_->key().ToString(); }
41  string value() override { return iter_->value().ToString(); }
42  bool Valid() override { return iter_->Valid(); }
43 
44  private:
45  std::unique_ptr<rocksdb::Iterator> iter_;
46 };
47 
49  public:
50  explicit RocksDBTransaction(rocksdb::DB* db) : db_(db) {
51  CAFFE_ENFORCE(db_);
52  batch_.reset(new rocksdb::WriteBatch());
53  }
54  ~RocksDBTransaction() { Commit(); }
55  void Put(const string& key, const string& value) override {
56  batch_->Put(key, value);
57  }
58  void Commit() override {
59  rocksdb::Status status = db_->Write(rocksdb::WriteOptions(), batch_.get());
60  batch_.reset(new rocksdb::WriteBatch());
61  CAFFE_ENFORCE(
62  status.ok(), "Failed to write batch to rocksdb: " + status.ToString());
63  }
64 
65  private:
66  rocksdb::DB* db_;
67  std::unique_ptr<rocksdb::WriteBatch> batch_;
68 
69  DISABLE_COPY_AND_ASSIGN(RocksDBTransaction);
70 };
71 
72 class RocksDB : public DB {
73  public:
74  RocksDB(const string& source, Mode mode) : DB(source, mode) {
75  rocksdb::LevelDBOptions options;
76  options.block_size = FLAGS_caffe2_rocksdb_block_size;
77  options.write_buffer_size = 268435456;
78  options.max_open_files = 100;
79  options.error_if_exists = mode == NEW;
80  options.create_if_missing = mode != READ;
81  rocksdb::Options rocksdb_options = rocksdb::ConvertOptions(options);
82 
83  rocksdb::DB* db_temp;
84  rocksdb::Status status = rocksdb::DB::Open(
85  rocksdb_options, source, &db_temp);
86  CAFFE_ENFORCE(
87  status.ok(),
88  "Failed to open rocksdb ",
89  source,
90  "\n",
91  status.ToString());
92  db_.reset(db_temp);
93  VLOG(1) << "Opened rocksdb " << source;
94  }
95 
96  void Close() override { db_.reset(); }
97  unique_ptr<Cursor> NewCursor() override {
98  return make_unique<RocksDBCursor>(db_.get());
99  }
100  unique_ptr<Transaction> NewTransaction() override {
101  return make_unique<RocksDBTransaction>(db_.get());
102  }
103 
104  private:
105  std::unique_ptr<rocksdb::DB> db_;
106 };
107 
108 REGISTER_CAFFE2_DB(RocksDB, RocksDB);
109 // For lazy-minded, one can also call with lower-case name.
110 REGISTER_CAFFE2_DB(rocksdb, RocksDB);
111 
112 } // namespace db
113 
114 CAFFE2_MODULE(caffe2_rocksdb, "RocksDB implementation for caffe2::DB.");
115 } // namespace caffe2
bool Valid() override
Returns whether the current location is valid - for example, if we have reached the end of the database, return false.
Definition: rocksdb.cc:42
void Next() override
Go to the next location in the database.
Definition: rocksdb.cc:39
unique_ptr< Cursor > NewCursor() override
Returns a cursor to read the database.
Definition: rocksdb.cc:97
An abstract class for the current database transaction while writing.
Definition: db.h:61
An abstract class for the cursor of the database while reading.
Definition: db.h:22
unique_ptr< Transaction > NewTransaction() override
Returns a transaction to write data to the database.
Definition: rocksdb.cc:100
string value() override
Returns the current value.
Definition: rocksdb.cc:41
A global dictionary that holds information about what Caffe2 modules have been loaded in the current runtime, and also utility functions to load modules.
An abstract class for accessing a database of key-value pairs.
Definition: db.h:80
void SeekToFirst() override
Seek to the first key in the database.
Definition: rocksdb.cc:38
void Put(const string &key, const string &value) override
Puts the key value pair to the database.
Definition: rocksdb.cc:55
void Seek(const string &key) override
Seek to a specific key (or if the key does not exist, seek to the immediate next).
Definition: rocksdb.cc:36
void Commit() override
Commits the current writes.
Definition: rocksdb.cc:58
string key() override
Returns the current key.
Definition: rocksdb.cc:40
Commandline flags support for Caffe2.
void Close() override
Closes the database.
Definition: rocksdb.cc:96