proxygen
DeterministicScheduleTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
21 
22 using namespace folly::test;
23 
26  int buckets[10] = {};
27  for (int i = 0; i < 100000; ++i) {
28  buckets[p(10)]++;
29  }
30  for (int i = 0; i < 10; ++i) {
31  EXPECT_TRUE(buckets[i] > 9000);
32  }
33 }
34 
35 TEST(DeterministicSchedule, uniformSubset) {
36  auto ps = DeterministicSchedule::uniformSubset(0, 3, 100);
37  int buckets[10] = {};
38  std::set<int> seen;
39  for (int i = 0; i < 100000; ++i) {
40  if (i > 0 && (i % 100) == 0) {
41  EXPECT_EQ(seen.size(), 3);
42  seen.clear();
43  }
44  int x = ps(10);
45  seen.insert(x);
46  EXPECT_TRUE(seen.size() <= 3);
47  buckets[x]++;
48  }
49  for (int i = 0; i < 10; ++i) {
50  EXPECT_TRUE(buckets[i] > 9000);
51  }
52 }
53 
55  for (bool bug : {false, true}) {
57  if (bug) {
58  FOLLY_TEST_DSCHED_VLOG("Test with race condition");
59  } else {
60  FOLLY_TEST_DSCHED_VLOG("Test without race condition");
61  }
63  // The use of DeterministicAtomic is not needed here, but it makes
64  // it easier to understand the sequence of events in logs.
66  DeterministicAtomic<int> baseline{0};
67  int numThreads = 10;
68  std::vector<std::thread> threads(numThreads);
69  for (int t = 0; t < numThreads; ++t) {
70  threads[t] = DeterministicSchedule::thread([&, t] {
71  baseline.fetch_add(1);
72  // Atomic increment of test protected by mutex m
73  do {
74  // Some threads use lock() others use try_lock()
75  if ((t & 1) == 0) {
76  m.lock();
77  } else {
78  if (!m.try_lock()) {
79  continue;
80  }
81  }
82  int newval = test.load() + 1;
83  if (bug) {
84  // Break the atomicity of the increment operation
85  m.unlock();
86  m.lock();
87  }
88  test.store(newval);
89  m.unlock();
90  break;
91  } while (true);
92  }); // thread lambda
93  } // for t
94  for (auto& t : threads) {
96  }
97  if (!bug) {
98  EXPECT_EQ(test.load(), baseline.load());
99  } else {
100  if (test.load() == baseline.load()) {
101  FOLLY_TEST_DSCHED_VLOG("Didn't catch the bug");
102  } else {
103  FOLLY_TEST_DSCHED_VLOG("Caught the bug");
104  }
105  }
106  } // for bug
107 }
108 
109 /*
110  * Test DSched support for auxiliary data and global invariants
111  *
112  * How to use DSched support for auxiliary data and global invariants
113  * (Let Foo<T, Atom> be the template to be tested):
114  * 1. Add friend AnnotatedFoo<T> to Foo<T,Atom> (Typically, in Foo.h).
115  * 2. Define a class AuxData for whatever auxiliary data is needed
116  * to maintain global knowledge of shared and private state.
117  * 3. Define:
118  * static AuxData* aux_;
119  * static FOLLY_TLS uint32_t tid_;
120  * 4. (Optional) Define gflags for command line options. E.g.:
121  * DEFINE_int64(seed, 0, "Seed for random number generators");
122  * 5. (Optional) Define macros for management of auxiliary data. E.g.,
123  * #define AUX_THR(x) (aux_->t_[tid_]->x)
124  * 6. (Optional) Define macro for creating auxiliary actions. E.g.,
125  * #define AUX_ACT(act) \
126  * { \
127  * AUX_THR(func_) = __func__; \
128  * AUX_THR(line_) = __LINE__; \
129  * AuxAct auxact([&](bool success) { if (success); act}); \
130  * DeterministicSchedule::setAuxAct(auxact); \
131  * }
132  * [Note: Auxiliary actions must not contain any standard shared
133  * accesses, or else deadlock will occur. Use the load_direct()
134  * member function of DeterministicAtomic instead.]
135  * 7. Define AnnotatedFoo<T> derived from Foo<T,DeterministicAtomic>.
136  * 8. Define member functions in AnnotatedFoo to manage DSched::auxChk.
137  * 9. Define member functions for logging and checking global invariants.
138  * 10. Define member functions for direct access to data members of Foo.
139  * 11. (Optional) Add a member function dummyStep() to update
140  * auxiliary data race-free when the next step is unknown or
141  * not conveniently accessible (e.g., in a different
142  * library). The function adds a dummy shared step to force
143  * DSched to invoke the auxiliary action at a known point. This
144  * is needed for now because DSched allows threads to run in
145  * parallel between shared accesses. Hence, concurrent updates
146  * of shared auxiliary data can be racy if executed outside
147  * auxiliary actions. This may be obviated in the future if
148  * DSched supports fully serialized execution.
149  * void dummyStep() {
150  * DeterministicSchedule::beforeSharedAccess();
151  * DeterministicSchedule::afterSharedAccess(true);
152  * }
153  * 12. Override member functions of Foo as needed in order to
154  * annotate the code with auxiliary actions. [Note: There may be
155  * a lot of duplication of Foo's code. Alternatively, Foo can be
156  * annotated directly.]
157  * 13. Define TEST using instances of AuxData and AnnotatedFoo.
158  * 14. For debugging, iteratively add (as needed) auxiliary data,
159  * global invariants, logging details, command line flags as
160  * needed and selectively generate relevant logs to detect the
161  * race condition shortly after it occurs.
162  *
163  * In the following example Foo = AtomicCounter
164  */
165 
167 
169 template <typename T>
171 
173 template <typename T, template <typename> class Atom = std::atomic>
176  friend struct AnnotatedAtomicCounter<T>;
177 
178  public:
179  explicit AtomicCounter(T val) : counter_(val) {}
180 
181  void inc() {
182  this->counter_.fetch_add(1);
183  }
184 
185  void incBug() {
186  this->counter_.store(this->counter_.load() + 1);
187  }
188 
189  T load() {
190  return this->counter_.load();
191  }
192 
193  private:
194  Atom<T> counter_ = {0};
195 };
196 
198 struct AuxData {
199  using T = int;
200 
201  /* General */
202  uint64_t step_ = {0};
203  uint64_t lastUpdate_ = {0};
204 
205  struct PerThread {
206  /* General */
208  int line_;
209  /* Custom */
210  T count_ = {0};
211  };
212 
213  std::vector<PerThread> t_;
214 
215  explicit AuxData(int nthr) : t_(nthr) {}
216 };
217 
218 static AuxData* aux_;
219 static FOLLY_TLS uint32_t tid_;
220 
221 /* Command line flags */
222 DEFINE_int64(seed, 0, "Seed for random number generators");
223 DEFINE_int64(max_steps, 1000000, "Max. number of shared steps for the test");
224 DEFINE_int64(num_reps, 1, "Number of test repetitions");
225 DEFINE_int64(num_ops, 1000, "Number of increments per repetition");
226 DEFINE_int64(liveness_thresh, 1000000, "Liveness threshold");
227 DEFINE_int64(log_begin, 0, "Step number to start logging. No logging if <= 0");
228 DEFINE_int64(log_length, 1000, "Length of step by step log (if log_begin > 0)");
229 DEFINE_int64(log_freq, 100000, "Log every so many steps");
230 DEFINE_int32(num_threads, 1, "Number of producers");
231 DEFINE_bool(bug, false, "Introduce bug");
232 
/* Accesses field x of the PerThread record indexed by the thread-local tid_. */
#define AUX_THR(x) (aux_->t_[tid_].x)
/* Records step_ + 1 as the step of the most recent update. */
#define AUX_UPDATE() (aux_->lastUpdate_ = aux_->step_ + 1)

/*
 * Installs `act` as the pending auxiliary action.
 *
 * The macro first stores the enclosing function name and source line in
 * the current thread's auxiliary record, then wraps `act` in an AuxAct
 * callable taking `bool success` and passes it to
 * DeterministicSchedule::setAuxAct. `success` is in scope for `act`; the
 * wrapper itself does not use it.
 */
#define AUX_ACT(act)                         \
  do {                                       \
    AUX_THR(func_) = __func__;               \
    AUX_THR(line_) = __LINE__;               \
    AuxAct auxfn([&](bool success) {         \
      static_cast<void>(success);            \
      { act }                                \
    });                                      \
    DeterministicSchedule::setAuxAct(auxfn); \
  } while (false)
251 
253 template <typename T>
255 
257 template <typename T>
258 struct AnnotatedAtomicCounter : public Base<T> {
260  void setAuxChk() {
261  AuxChk auxfn([&](uint64_t step) {
262  auxLog(step);
263  auxCheck();
264  });
266  }
267 
268  void clearAuxChk() {
270  }
271 
273  void auxLog(uint64_t step) {
274  if (aux_->step_ == 0) {
275  aux_->lastUpdate_ = step;
276  }
277  aux_->step_ = step;
278  if (step > (uint64_t)FLAGS_max_steps) {
279  exit(0);
280  }
281  bool doLog =
282  (((FLAGS_log_begin > 0) && (step >= (uint64_t)FLAGS_log_begin) &&
283  (step <= (uint64_t)FLAGS_log_begin + FLAGS_log_length)) ||
284  ((step % FLAGS_log_freq) == 0));
285  if (doLog) {
286  doAuxLog(step);
287  }
288  }
289 
290  void doAuxLog(uint64_t step) {
291  std::stringstream ss;
292  /* General */
293  ss << step << " - " << aux_->lastUpdate_ << " --";
294  /* Shared */
295  ss << " counter =" << this->counter_.load_direct();
296  /* Thread */
297  ss << " -- t" << tid_ << " " << AUX_THR(func_) << ":" << AUX_THR(line_);
298  ss << " count[" << tid_ << "] = " << AUX_THR(count_);
299  /* Output */
300  std::cerr << ss.str() << std::endl;
301  }
302 
303  void auxCheck() {
304  /* Liveness */
305  CHECK_LT(aux_->step_, aux_->lastUpdate_ + FLAGS_liveness_thresh);
306  /* Safety */
307  int sum = {0};
308  for (auto& t : aux_->t_) {
309  sum += t.count_;
310  }
311  CHECK_EQ(this->counter_.load_direct(), sum);
312  }
313 
314  /* Direct access without going through DSched */
316  return this->counter_.load_direct();
317  }
318 
319  /* Constructor -- calls original constructor */
320  explicit AnnotatedAtomicCounter(int val) : Base<T>(val) {}
321 
322  /* Overloads of original member functions (as needed) */
323 
324  void inc() {
325  AUX_ACT({ ++AUX_THR(count_); });
326  this->counter_.fetch_add(1);
327  }
328 
329  void incBug() {
330  AUX_ACT({});
331  T newval = this->counter_.load() + 1;
332  AUX_ACT({ ++AUX_THR(count_); });
333  this->counter_.store(newval);
334  }
335 };
336 
338 
339 TEST(DeterministicSchedule, global_invariants) {
340  CHECK_GT(FLAGS_num_threads, 0);
341 
342  DSched sched(DSched::uniform(FLAGS_seed));
343  for (int i = 0; i < FLAGS_num_reps; ++i) {
344  aux_ = new AuxData(FLAGS_num_threads);
345  Annotated annotated(0);
346  annotated.setAuxChk();
347 
348  std::vector<std::thread> threads(FLAGS_num_threads);
349  for (int tid = 0; tid < FLAGS_num_threads; ++tid) {
350  threads[tid] = DSched::thread([&, tid]() {
351  tid_ = tid;
352  for (int j = tid; j < FLAGS_num_ops; j += FLAGS_num_threads) {
353  (FLAGS_bug) ? annotated.incBug() : annotated.inc();
354  }
355  });
356  }
357  for (auto& t : threads) {
358  DSched::join(t);
359  }
360  std::cerr << "====== rep " << i << " completed in step " << aux_->step_
361  << std::endl;
362  annotated.doAuxLog(aux_->step_);
363  std::cerr << std::endl;
364  EXPECT_EQ(annotated.loadDirect(), FLAGS_num_ops);
365  annotated.clearAuxChk();
366  delete aux_;
367  }
368 }
369 
370 int main(int argc, char** argv) {
371  testing::InitGoogleTest(&argc, argv);
372  gflags::ParseCommandLineFlags(&argc, &argv, true);
373  return RUN_ALL_TESTS();
374 }
Definition: InvokeTest.cpp:58
std::atomic< int64_t > sum(0)
#define FOLLY_TEST_DSCHED_VLOG(...)
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
Definition: gtest.h:2232
static const int seed
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
static AuxData * aux_
const int x
DEFINE_int32(num_threads, 1,"Number of producers")
const int nthr[]
double val
Definition: String.cpp:273
folly::std T
static std::thread thread(Func &&func, Args &&...args)
#define AUX_THR(x)
DEFINE_int64(threadtimeout_ms, 60000,"Idle time before ThreadPoolExecutor threads are joined")
std::unordered_set< std::pair< const IValidator *, const dynamic * > > seen
Definition: JSONSchema.cpp:92
std::vector< std::thread::id > threads
#define AUX_ACT(act)
char ** argv
static std::function< size_t(size_t)> uniform(uint64_t seed)
std::function< void(uint64_t)> AuxChk
#define Atom
static map< string, int > m
AtomicCounter< T, DeterministicAtomic > Base
static std::function< size_t(size_t)> uniformSubset(uint64_t seed, size_t n=2, size_t m=64)
TEST(ProgramOptionsTest, Errors)
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
DEFINE_bool(bug, false,"Introduce bug")
const char * string
Definition: Conv.cpp:212
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
Definition: gtest.cc:5370
folly::detail::CompressionCounter * counter_
static void join(std::thread &child)
int main(int argc, char **argv)
std::vector< PerThread > t_
static FOLLY_TLS uint32_t tid_