proxygen
ObserverTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2016-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <thread>
18 
22 
23 using namespace folly::observer;
24 
25 TEST(Observer, Observable) {
26  SimpleObservable<int> observable(42);
27  auto observer = observable.getObserver();
28 
29  EXPECT_EQ(42, **observer);
30 
31  folly::Baton<> baton;
32  auto waitingObserver = makeObserver([observer, &baton]() {
33  *observer;
34  baton.post();
35  return folly::Unit();
36  });
37  baton.reset();
38 
39  observable.setValue(24);
40 
41  EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
42 
43  EXPECT_EQ(24, **observer);
44 }
45 
46 TEST(Observer, MakeObserver) {
47  SimpleObservable<int> observable(42);
48 
49  auto observer = makeObserver(
50  [child = observable.getObserver()]() { return **child + 1; });
51 
52  EXPECT_EQ(43, **observer);
53 
54  folly::Baton<> baton;
55  auto waitingObserver = makeObserver([observer, &baton]() {
56  *observer;
57  baton.post();
58  return folly::Unit();
59  });
60  baton.reset();
61 
62  observable.setValue(24);
63 
64  EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
65 
66  EXPECT_EQ(25, **observer);
67 }
68 
69 TEST(Observer, MakeObserverDiamond) {
70  SimpleObservable<int> observable(42);
71 
72  auto observer1 = makeObserver(
73  [child = observable.getObserver()]() { return **child + 1; });
74 
75  auto observer2 = makeObserver([child = observable.getObserver()]() {
76  return std::make_shared<int>(**child + 2);
77  });
78 
79  auto observer = makeObserver(
80  [observer1, observer2]() { return (**observer1) * (**observer2); });
81 
82  EXPECT_EQ(43 * 44, *observer.getSnapshot());
83 
84  folly::Baton<> baton;
85  auto waitingObserver = makeObserver([observer, &baton]() {
86  *observer;
87  baton.post();
88  return folly::Unit();
89  });
90  baton.reset();
91 
92  observable.setValue(24);
93 
94  EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
95 
96  EXPECT_EQ(25 * 26, **observer);
97 }
98 
99 TEST(Observer, CreateException) {
100  struct ExpectedException {};
101  EXPECT_THROW(
102  auto observer = makeObserver(
103  []() -> std::shared_ptr<int> { throw ExpectedException(); }),
104  ExpectedException);
105 
106  EXPECT_THROW(
107  auto observer =
108  makeObserver([]() -> std::shared_ptr<int> { return nullptr; }),
109  std::logic_error);
110 }
111 
112 TEST(Observer, NullValue) {
113  SimpleObservable<int> observable(41);
114  auto oddObserver = makeObserver([innerObserver = observable.getObserver()]() {
115  auto value = **innerObserver;
116 
117  if (value % 2 != 0) {
118  return value * 2;
119  }
120 
121  throw std::logic_error("I prefer odd numbers");
122  });
123 
124  folly::Baton<> baton;
125  auto waitingObserver = makeObserver([oddObserver, &baton]() {
126  *oddObserver;
127  baton.post();
128  return folly::Unit();
129  });
130 
131  baton.reset();
132  EXPECT_EQ(82, **oddObserver);
133 
134  observable.setValue(2);
135 
136  // Waiting observer shouldn't be updated
137  EXPECT_FALSE(baton.try_wait_for(std::chrono::seconds{1}));
138  baton.reset();
139 
140  EXPECT_EQ(82, **oddObserver);
141 
142  observable.setValue(23);
143 
144  EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
145 
146  EXPECT_EQ(46, **oddObserver);
147 }
148 
149 TEST(Observer, Cycle) {
150  SimpleObservable<int> observable(0);
151  auto observer = observable.getObserver();
153 
154  auto observerA = makeObserver([observer, &observerB]() {
155  auto value = **observer;
156  if (value == 1) {
157  **observerB;
158  }
159  return value;
160  });
161 
162  observerB = makeObserver([observerA]() { return **observerA; });
163 
164  auto collectObserver = makeObserver([observer, observerA, &observerB]() {
165  auto value = **observer;
166  auto valueA = **observerA;
167  auto valueB = ***observerB;
168 
169  if (value == 1) {
170  if (valueA == 0) {
171  EXPECT_EQ(0, valueB);
172  } else {
173  EXPECT_EQ(1, valueA);
174  EXPECT_EQ(0, valueB);
175  }
176  } else if (value == 2) {
177  EXPECT_EQ(value, valueA);
178  EXPECT_TRUE(valueB == 0 || valueB == 2);
179  } else {
180  EXPECT_EQ(value, valueA);
181  EXPECT_EQ(value, valueB);
182  }
183 
184  return value;
185  });
186 
187  folly::Baton<> baton;
188  auto waitingObserver = makeObserver([collectObserver, &baton]() {
189  *collectObserver;
190  baton.post();
191  return folly::Unit();
192  });
193 
194  baton.reset();
195  EXPECT_EQ(0, **collectObserver);
196 
197  for (size_t i = 1; i <= 3; ++i) {
198  observable.setValue(i);
199 
200  EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
201  baton.reset();
202 
203  EXPECT_EQ(i, **collectObserver);
204  }
205 }
206 
207 TEST(Observer, Stress) {
208  SimpleObservable<int> observable(0);
209 
210  auto values = std::make_shared<folly::Synchronized<std::vector<int>>>();
211 
212  auto observer = makeObserver([child = observable.getObserver(), values]() {
213  auto value = **child * 10;
214  values->withWLock([&](std::vector<int>& vals) { vals.push_back(value); });
215  return value;
216  });
217 
218  EXPECT_EQ(0, **observer);
219  values->withRLock([](const std::vector<int>& vals) {
220  EXPECT_EQ(1, vals.size());
221  EXPECT_EQ(0, vals.back());
222  });
223 
224  constexpr size_t numIters = 10000;
225 
226  for (size_t i = 1; i <= numIters; ++i) {
227  observable.setValue(i);
228  }
229 
230  while (**observer != numIters * 10) {
232  }
233 
234  values->withRLock([numIters = numIters](const std::vector<int>& vals) {
235  EXPECT_EQ(numIters * 10, vals.back());
236  EXPECT_LT(vals.size(), numIters / 2);
237 
238  EXPECT_EQ(0, vals[0]);
239  EXPECT_EQ(numIters * 10, vals.back());
240 
241  for (auto value : vals) {
242  EXPECT_EQ(0, value % 10);
243  }
244 
245  for (size_t i = 0; i < vals.size() - 1; ++i) {
246  EXPECT_LE(vals[i], vals[i + 1]);
247  }
248  });
249 }
250 
252  auto createTLObserver = [](int value) {
253  return folly::observer::makeTLObserver([=] { return value; });
254  };
255 
256  auto k =
257  std::make_unique<folly::observer::TLObserver<int>>(createTLObserver(42));
258  EXPECT_EQ(42, ***k);
259  k = std::make_unique<folly::observer::TLObserver<int>>(createTLObserver(41));
260  EXPECT_EQ(41, ***k);
261 }
262 
263 TEST(Observer, SubscribeCallback) {
264  static auto mainThreadId = std::this_thread::get_id();
265  static std::function<void()> updatesCob;
266  static bool slowGet = false;
267  static std::atomic<size_t> getCallsStart{0};
268  static std::atomic<size_t> getCallsFinish{0};
269 
270  struct Observable {
271  ~Observable() {
272  EXPECT_EQ(mainThreadId, std::this_thread::get_id());
273  }
274  };
275  struct Traits {
276  using element_type = int;
277  static std::shared_ptr<const int> get(Observable&) {
278  ++getCallsStart;
279  if (slowGet) {
280  /* sleep override */ std::this_thread::sleep_for(
281  std::chrono::seconds{2});
282  }
283  ++getCallsFinish;
284  return std::make_shared<const int>(42);
285  }
286 
287  static void subscribe(Observable&, std::function<void()> cob) {
288  updatesCob = std::move(cob);
289  }
290 
291  static void unsubscribe(Observable&) {}
292  };
293 
294  std::thread cobThread;
295  {
296  auto observer =
298 
299  EXPECT_TRUE(updatesCob);
300  EXPECT_EQ(2, getCallsStart);
301  EXPECT_EQ(2, getCallsFinish);
302 
303  updatesCob();
304  EXPECT_EQ(3, getCallsStart);
305  EXPECT_EQ(3, getCallsFinish);
306 
307  slowGet = true;
308  cobThread = std::thread([] { updatesCob(); });
309  /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds{1});
310  EXPECT_EQ(4, getCallsStart);
311  EXPECT_EQ(3, getCallsFinish);
312 
313  // Observer is destroyed here
314  }
315 
316  // Make sure that destroying the observer actually joined the updates callback
317  EXPECT_EQ(4, getCallsStart);
318  EXPECT_EQ(4, getCallsFinish);
319  cobThread.join();
320 }
321 
322 TEST(Observer, SetCallback) {
324  auto observer = observable.getObserver();
325  folly::Baton<> baton;
326  int callbackValue = 0;
327  size_t callbackCallsCount = 0;
328 
329  auto callbackHandle =
330  observer.addCallback([&](folly::observer::Snapshot<int> snapshot) {
331  ++callbackCallsCount;
332  callbackValue = *snapshot;
333  baton.post();
334  });
335  baton.wait();
336  baton.reset();
337  EXPECT_EQ(42, callbackValue);
338  EXPECT_EQ(1, callbackCallsCount);
339 
340  observable.setValue(43);
341  baton.wait();
342  baton.reset();
343  EXPECT_EQ(43, callbackValue);
344  EXPECT_EQ(2, callbackCallsCount);
345 
346  callbackHandle.cancel();
347 
348  observable.setValue(44);
349  EXPECT_FALSE(baton.timed_wait(std::chrono::milliseconds{100}));
350  EXPECT_EQ(43, callbackValue);
351  EXPECT_EQ(2, callbackCallsCount);
352 }
353 
355  if (n == 0) {
356  return 0;
357  }
358  return **makeObserver([=] { return makeObserverRecursion(n - 1) + 1; });
359 }
360 
361 TEST(Observer, NestedMakeObserver) {
363 }
#define EXPECT_LE(val1, val2)
Definition: gtest.h:1928
int makeObserverRecursion(int n)
#define EXPECT_THROW(statement, expected_exception)
Definition: gtest.h:1843
void subscribe(uint32_t iters, int N)
Definition: RxBenchmark.cpp:54
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
FOLLY_ALWAYS_INLINE bool timed_wait(const std::chrono::duration< Rep, Period > &timeout) noexcept
Alias to try_wait_for. Deprecated.
Definition: Baton.h:241
Observer< observer_detail::ResultOfUnwrapSharedPtr< F > > makeObserver(F &&creator)
Definition: Observer-inl.h:37
FOLLY_ALWAYS_INLINE bool try_wait_for(const std::chrono::duration< Rep, Period > &timeout, const WaitOptions &opt=wait_options()) noexcept
Definition: Baton.h:206
FOLLY_ALWAYS_INLINE void wait(const WaitOptions &opt=wait_options()) noexcept
Definition: Baton.h:170
TLObserver< T > makeTLObserver(Observer< T > observer)
Definition: Observer.h:211
void post() noexcept
Definition: Baton.h:123
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
folly::Function< void()> child
Definition: AtFork.cpp:35
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
#define EXPECT_LT(val1, val2)
Definition: gtest.h:1930
KeyT k
TEST(SequencedExecutor, CPUThreadPoolExecutor)
void reset() noexcept
Definition: Baton.h:96
std::vector< int > values(1'000)