Caffe2 - C++ API
A deep learning, cross platform ML framework
redis_store_handler.cc
1 #include "redis_store_handler.h"
2 
3 #include <caffe2/core/logging.h>
4 
5 #include <chrono>
6 #include <thread>
7 #include <vector>
8 
9 namespace caffe2 {
10 
11 RedisStoreHandler::RedisStoreHandler(
12  std::string& host,
13  int port,
14  std::string& prefix)
15  : host_(host), port_(port), prefix_(prefix) {
16  struct timeval tv = {
17  .tv_sec = 5, .tv_usec = 0,
18  };
19 
20  redis_ = redisConnectWithTimeout(host.c_str(), port, tv);
21  CAFFE_ENFORCE_NE(redis_, (redisContext*)nullptr);
22  CAFFE_ENFORCE_EQ(redis_->err, 0, redis_->errstr);
23 }
24 
25 RedisStoreHandler::~RedisStoreHandler() {
26  redisFree(redis_);
27 }
28 
29 std::string RedisStoreHandler::compoundKey(const std::string& name) {
30  return prefix_ + name;
31 }
32 
33 void RedisStoreHandler::set(const std::string& name, const std::string& data) {
34  auto key = compoundKey(name);
35  void* ptr = redisCommand(
36  redis_,
37  "SETNX %b %b",
38  key.c_str(),
39  (size_t)key.size(),
40  data.c_str(),
41  (size_t)data.size());
42  CAFFE_ENFORCE_NE(ptr, (void*)nullptr, redis_->errstr);
43  redisReply* reply = static_cast<redisReply*>(ptr);
44  CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
45  CAFFE_ENFORCE_EQ(
46  reply->integer,
47  1,
48  "Value at ",
49  name,
50  " was already set",
51  " (perhaps you reused a run ID you have used before?)");
52 }
53 
54 std::string RedisStoreHandler::get(const std::string& name) {
55  // Block until key is set
56  wait({name});
57 
58  auto key = compoundKey(name);
59  void* ptr = redisCommand(redis_, "GET %b", key.c_str(), (size_t)key.size());
60  CAFFE_ENFORCE_NE(ptr, (void*)nullptr, redis_->errstr);
61  redisReply* reply = static_cast<redisReply*>(ptr);
62  CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_STRING);
63  return std::string(reply->str, reply->len);
64 }
65 
66 int64_t RedisStoreHandler::add(const std::string& name, int64_t value) {
67  auto key = compoundKey(name);
68  void* ptr = redisCommand(
69  redis_, "INCRBY %b %ld", key.c_str(), (size_t)key.size(), value);
70  CAFFE_ENFORCE_NE(ptr, (void*)nullptr, redis_->errstr);
71  redisReply* reply = static_cast<redisReply*>(ptr);
72  CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
73  return reply->integer;
74 }
75 
76 bool RedisStoreHandler::check(const std::vector<std::string>& names) {
77  std::vector<std::string> args;
78  args.push_back("EXISTS");
79  for (const auto& name : names) {
80  args.push_back(compoundKey(name));
81  }
82 
83  std::vector<const char*> argv;
84  std::vector<size_t> argvlen;
85  for (const auto& arg : args) {
86  argv.push_back(arg.c_str());
87  argvlen.push_back(arg.length());
88  }
89 
90  auto argc = argv.size();
91  void* ptr = redisCommandArgv(redis_, argc, argv.data(), argvlen.data());
92  CAFFE_ENFORCE_NE(ptr, (void*)nullptr, redis_->errstr);
93  redisReply* reply = static_cast<redisReply*>(ptr);
94  CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
95  return reply->integer == names.size();
96 }
97 
98 void RedisStoreHandler::wait(
99  const std::vector<std::string>& names,
100  const std::chrono::milliseconds& timeout) {
101  // Simple approach: poll...
102  // Complex approach: use pub/sub.
103  // Polling is fine for the typical rendezvous use case, as it is
104  // only done at initialization time and not at run time.
105  const auto start = std::chrono::steady_clock::now();
106  while (!check(names)) {
107  const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
108  std::chrono::steady_clock::now() - start);
109  if (timeout != kNoTimeout && elapsed > timeout) {
110  STORE_HANDLER_TIMEOUT("Wait timeout for name(s): ", Join(" ", names));
111  }
112  /* sleep override */
113  std::this_thread::sleep_for(std::chrono::milliseconds(10));
114  }
115 }
116 }
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...