proxygen
RcuTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
17 
18 #include <thread>
19 #include <vector>
20 
21 #include <glog/logging.h>
22 
23 #include <folly/Benchmark.h>
24 #include <folly/Random.h>
27 
28 using namespace folly;
29 
30 DEFINE_int64(iters, 100000, "Number of iterations"); // Loop count read by Perf, ResetPerf, Stress (as iters / 100) and ThreadLocalList.
31 DEFINE_uint64(threads, 32, "Number of threads"); // Worker-thread count read by Stress, Synchronize and ThreadLocalList.
32 
33 TEST(RcuTest, Basic) {
34  auto foo = new int(2);
35  rcu_retire(foo);
36 }
37 
// Test helper: the destructor writes `true` through the bool pointer given
// at construction, so a test can observe that an instance was destroyed.
class des {
  bool* d_; // Flag to set on destruction; not owned by this object.

 public:
  // `explicit` prevents a bare bool* from silently converting into a des.
  explicit des(bool* d) : d_(d) {}
  ~des() {
    *d_ = true;
  }
};
47 
48 TEST(RcuTest, Guard) {
49  bool del = false;
50  auto foo = new des(&del);
51  { rcu_reader g; }
52  rcu_retire(foo);
54  EXPECT_TRUE(del);
55 }
56 
57 TEST(RcuTest, Perf) {
58  long i = FLAGS_iters;
60  while (i-- > 0) {
61  rcu_reader g;
62  }
64  printf(
65  "Total time %li ns \n",
66  std::chrono::duration_cast<std::chrono::nanoseconds>(diff).count() /
67  FLAGS_iters);
68 }
69 
70 TEST(RcuTest, ResetPerf) {
71  long i = FLAGS_iters;
73  while (i-- > 0) {
74  rcu_retire<int>(nullptr, [](int*) {});
75  }
77  printf(
78  "Total time %li ns \n",
79  std::chrono::duration_cast<std::chrono::nanoseconds>(diff).count() /
80  FLAGS_iters);
81 }
82 
83 TEST(RcuTest, SlowReader) {
84  std::thread t;
85  {
86  rcu_reader g;
87 
88  t = std::thread([&]() { synchronize_rcu(); });
89  usleep(100); // Wait for synchronize to start
90  }
91  t.join();
92 }
93 
95  rcu_reader g;
96  rcu_retire(obj);
97  return g;
98 }
99 
100 TEST(RcuTest, CopyGuard) {
101  bool del = false;
102  auto foo = new des(&del);
103  {
104  auto res = tryretire(foo);
105  EXPECT_FALSE(del);
106  }
107  rcu_barrier();
108  EXPECT_TRUE(del);
109 }
110 
111 TEST(RcuTest, Stress) {
112  std::vector<std::thread> threads;
113  constexpr uint32_t sz = 1000;
114  std::atomic<int*> ints[sz];
115  for (uint32_t i = 0; i < sz; i++) {
116  ints[i].store(new int(0));
117  }
118  for (unsigned th = 0; th < FLAGS_threads; th++) {
119  threads.push_back(std::thread([&]() {
120  for (int i = 0; i < FLAGS_iters / 100; i++) {
121  rcu_reader g;
122  int sum = 0;
123  int* ptrs[sz];
124  for (uint32_t j = 0; j < sz; j++) {
125  ptrs[j] = ints[j].load(std::memory_order_acquire);
126  }
127  for (uint32_t j = 0; j < sz; j++) {
128  sum += *ptrs[j];
129  }
130  EXPECT_EQ(sum, 0);
131  }
132  }));
133  }
134  std::atomic<bool> done{false};
135  std::thread updater([&]() {
136  while (!done.load()) {
137  auto newint = new int(0);
138  auto oldint = ints[folly::Random::rand32() % sz].exchange(newint);
139  rcu_retire<int>(oldint, [](int* obj) {
140  *obj = folly::Random::rand32();
141  delete obj;
142  });
143  }
144  });
145  for (auto& t : threads) {
146  t.join();
147  }
148  done = true;
149  updater.join();
150  // Cleanup for asan
151  synchronize_rcu();
152  for (uint32_t i = 0; i < sz; i++) {
153  delete ints[i].exchange(nullptr);
154  }
155 }
156 
157 TEST(RcuTest, Synchronize) {
158  std::vector<std::thread> threads;
159  for (unsigned th = 0; th < FLAGS_threads; th++) {
160  threads.push_back(std::thread([&]() {
161  for (int i = 0; i < 10; i++) {
162  synchronize_rcu();
163  }
164  }));
165  }
166  for (auto& t : threads) {
167  t.join();
168  }
169 }
170 
171 TEST(RcuTest, NewDomainTest) {
172  struct UniqueTag;
173  rcu_domain<UniqueTag> newdomain(nullptr);
174  synchronize_rcu(&newdomain);
175 }
176 
177 TEST(RcuTest, NewDomainGuardTest) {
178  struct UniqueTag;
179  rcu_domain<UniqueTag> newdomain(nullptr);
180  bool del = false;
181  auto foo = new des(&del);
182  { rcu_reader_domain<UniqueTag> g(&newdomain); }
183  rcu_retire(foo, {}, &newdomain);
184  synchronize_rcu(&newdomain);
185  EXPECT_TRUE(del);
186 }
187 
188 TEST(RcuTest, MovableReader) {
189  {
190  rcu_reader g;
191  rcu_reader f(std::move(g));
192  }
193  synchronize_rcu();
194  {
195  rcu_reader g(std::defer_lock);
196  rcu_reader f;
197  g = std::move(f);
198  }
199  synchronize_rcu();
200 }
201 
202 TEST(RcuTest, SynchronizeInCall) {
203  rcu_default_domain()->call([]() { synchronize_rcu(); });
204  synchronize_rcu();
205 }
206 
207 TEST(RcuTest, MoveReaderBetweenThreads) {
208  rcu_reader g;
209  std::thread t([f = std::move(g)] {});
210  t.join();
211  synchronize_rcu();
212 }
213 
214 TEST(RcuTest, ForkTest) {
215  rcu_token epoch;
216  std::thread t([&]() { epoch = rcu_default_domain()->lock_shared(); });
217  t.join();
218  auto pid = fork();
219  if (pid) {
220  // parent
221  rcu_default_domain()->unlock_shared(std::move(epoch));
222  synchronize_rcu();
223  int status;
224  auto pid2 = wait(&status);
225  EXPECT_EQ(status, 0);
226  EXPECT_EQ(pid, pid2);
227  } else {
228  // child
229  synchronize_rcu();
230  exit(0); // Do not print gtest results
231  }
232 }
233 
234 TEST(RcuTest, ThreadLocalList) {
235  struct TTag;
237  std::vector<std::thread> threads{FLAGS_threads};
238  std::atomic<unsigned long> done{FLAGS_threads};
239  for (auto& tr : threads) {
240  tr = std::thread([&]() {
241  for (int i = 0; i < FLAGS_iters; i++) {
243  lists.push(node);
244  }
245  --done;
246  });
247  }
248  while (done.load() > 0) {
250  lists.collect(list);
252  delete node;
253  });
254  }
255  for (auto& thread : threads) {
256  thread.join();
257  }
258  // Run cleanup pass one more time to make ASAN happy
260  lists.collect(list);
261  list.forEach(
262  [](folly::detail::ThreadCachedLists<TTag>::Node* node) { delete node; });
263 }
264 
265 TEST(RcuTest, ThreadDeath) {
266  bool del = false;
267  std::thread t([&] {
268  auto foo = new des(&del);
269  rcu_retire(foo);
270  });
271  t.join();
272  synchronize_rcu();
273  EXPECT_TRUE(del);
274 }
275 
276 TEST(RcuTest, RcuObjBase) {
277  bool retired = false;
278  struct base_test : rcu_obj_base<base_test> {
279  bool* ret_;
280  base_test(bool* ret) : ret_(ret) {}
281  ~base_test() {
282  (*ret_) = true;
283  }
284  };
285 
286  auto foo = new base_test(&retired);
287  foo->retire();
288  synchronize_rcu();
289  EXPECT_TRUE(retired);
290 }
auto f
std::atomic< int64_t > sum(0)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
~des()
Definition: RcuTest.cpp:43
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::chrono::steady_clock::time_point now()
Definition: RcuTest.cpp:38
DEFINE_uint64(threads, 32,"Number of threads")
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
DEFINE_int64(threadtimeout_ms, 60000,"Idle time before ThreadPoolExecutor threads are joined")
std::vector< std::thread::id > threads
des(bool *d)
Definition: RcuTest.cpp:42
auto tr
Encoder::MutableCompressedList list
void rcu_retire(T *p, D d={}, rcu_domain< Tag > *domain=rcu_default_domain())
Definition: Rcu.h:466
bool wait(Waiter *waiter, bool shouldSleep, Waiter *&next)
rcu_domain< RcuTag > * rcu_default_domain()
Definition: Rcu.h:385
auto start
int * count
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
void rcu_barrier(rcu_domain< Tag > *domain=rcu_default_domain()) noexcept
Definition: Rcu.h:456
uint64_t diff(uint64_t a, uint64_t b)
Definition: FutexTest.cpp:135
g_t g(f_t)
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
static uint32_t rand32()
Definition: Random.h:213
TEST(SequencedExecutor, CPUThreadPoolExecutor)
bool * d_
Definition: RcuTest.cpp:39
rcu_reader tryretire(des *obj)
Definition: RcuTest.cpp:94
void synchronize_rcu(rcu_domain< Tag > *domain=rcu_default_domain()) noexcept
Definition: Rcu.h:450