// Constructor: stores host, port and prefix in the corresponding members and
// opens the Redis connection kept in redis_.
// NOTE(review): the parameter list and the declaration of `tv` are not
// visible in this excerpt; only the visible statements are described.
1 #include "redis_store_handler.h" 3 #include <caffe2/core/logging.h> 11 RedisStoreHandler::RedisStoreHandler(
15 : host_(host), port_(port), prefix_(prefix) {
// 5 second / 0 microsecond timeout fields; presumably the initializer of the
// `tv` passed to redisConnectWithTimeout below — confirm.
17 .tv_sec = 5, .tv_usec = 0,
20 redis_ = redisConnectWithTimeout(host.c_str(), port, tv);
// Enforce that a context was returned, then that it reports no error;
// redis_->errstr is attached as the failure message of the second check.
21 CAFFE_ENFORCE_NE(redis_, (redisContext*)
nullptr);
22 CAFFE_ENFORCE_EQ(redis_->err, 0, redis_->errstr);
// Destructor.
// NOTE(review): the body is not visible in this excerpt — confirm that it
// releases the connection context held in redis_.
25 RedisStoreHandler::~RedisStoreHandler() {
29 std::string RedisStoreHandler::compoundKey(
const std::string& name) {
30 return prefix_ + name;
// Sends a command for the prefixed key of `name` and validates the reply.
// NOTE(review): the command format string, its arguments and part of the
// final check are missing from this excerpt; only the visible statements are
// described here.
33 void RedisStoreHandler::set(
const std::string& name,
const std::string& data) {
// All keys go through compoundKey(), which prepends prefix_.
34 auto key = compoundKey(name);
35 void* ptr = redisCommand(
// Enforce a non-null reply; redis_->errstr is used as the failure message.
42 CAFFE_ENFORCE_NE(ptr, (
void*)
nullptr, redis_->errstr);
43 redisReply* reply =
static_cast<redisReply*
>(ptr);
// The reply is required to be an integer reply.
// NOTE(review): no freeReplyObject() call for `reply` is visible in this
// excerpt — confirm the reply is released.
44 CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
51 " (perhaps you reused a run ID you have used before?)");
// Issues a GET for the prefixed key of `name` and returns the reply payload
// as a std::string built from the reply's str/len fields.
// NOTE(review): the source lines between the signature and the first visible
// statement are missing from this excerpt; only visible statements are
// described here.
54 std::string RedisStoreHandler::get(
const std::string& name) {
58 auto key = compoundKey(name);
// The key is passed as a pointer/length pair for the %b placeholder.
59 void* ptr = redisCommand(redis_,
"GET %b", key.c_str(), (size_t)key.size());
// Enforce a non-null reply; redis_->errstr is used as the failure message.
60 CAFFE_ENFORCE_NE(ptr, (
void*)
nullptr, redis_->errstr);
61 redisReply* reply =
static_cast<redisReply*
>(ptr);
// Only a string reply is accepted.
62 CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_STRING);
// NOTE(review): no freeReplyObject() call for `reply` is visible in this
// excerpt — confirm the reply is released after the copy below.
63 return std::string(reply->str, reply->len);
66 int64_t RedisStoreHandler::add(
const std::string& name, int64_t value) {
67 auto key = compoundKey(name);
68 void* ptr = redisCommand(
69 redis_,
"INCRBY %b %ld", key.c_str(), (size_t)key.size(), value);
70 CAFFE_ENFORCE_NE(ptr, (
void*)
nullptr, redis_->errstr);
71 redisReply* reply =
static_cast<redisReply*
>(ptr);
72 CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
73 return reply->integer;
76 bool RedisStoreHandler::check(
const std::vector<std::string>& names) {
77 std::vector<std::string> args;
78 args.push_back(
"EXISTS");
79 for (
const auto& name : names) {
80 args.push_back(compoundKey(name));
83 std::vector<const char*> argv;
84 std::vector<size_t> argvlen;
85 for (
const auto& arg : args) {
86 argv.push_back(arg.c_str());
87 argvlen.push_back(arg.length());
90 auto argc = argv.size();
91 void* ptr = redisCommandArgv(redis_, argc, argv.data(), argvlen.data());
92 CAFFE_ENFORCE_NE(ptr, (
void*)
nullptr, redis_->errstr);
93 redisReply* reply =
static_cast<redisReply*
>(ptr);
94 CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
95 return reply->integer == names.size();
// Polls check(names) until it returns true, sleeping 10 ms between attempts,
// and reports a timeout through STORE_HANDLER_TIMEOUT once the elapsed time
// exceeds `timeout` (the timeout test is skipped when timeout == kNoTimeout).
// NOTE(review): the source lines between the signature and the first visible
// statement, and the closing lines, are missing from this excerpt.
98 void RedisStoreHandler::wait(
99 const std::vector<std::string>& names,
100 const std::chrono::milliseconds& timeout) {
// steady_clock is used for the elapsed-time measurement.
105 const auto start = std::chrono::steady_clock::now();
106 while (!check(names)) {
// NOTE(review): elapsed is truncated to whole seconds before being compared
// with the millisecond timeout, so a sub-second timeout cannot fire until a
// full second has passed — consider duration_cast<std::chrono::milliseconds>.
107 const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
108 std::chrono::steady_clock::now() - start);
109 if (timeout != kNoTimeout && elapsed > timeout) {
// The message lists the awaited names joined by a single space.
110 STORE_HANDLER_TIMEOUT(
"Wait timeout for name(s): ", Join(
" ", names));
// Back off for 10 ms before the next check() call.
113 std::this_thread::sleep_for(std::chrono::milliseconds(10));
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...