proxygen
LockFreeRingBufferTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2015-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <iostream>
18 #include <thread>
19 
23 
24 namespace folly {
25 
26 TEST(LockFreeRingBuffer, writeReadSequentially) {
27  const int capacity = 256;
28  const int turns = 4;
29 
30  LockFreeRingBuffer<int> rb(capacity);
32  for (unsigned int turn = 0; turn < turns; turn++) {
33  for (unsigned int write = 0; write < capacity; write++) {
34  int val = turn * capacity + write;
35  rb.write(val);
36  }
37 
38  for (unsigned int write = 0; write < capacity; write++) {
39  int dest = 0;
40  ASSERT_TRUE(rb.tryRead(dest, cur));
41  ASSERT_EQ(turn * capacity + write, dest);
42  cur.moveForward();
43  }
44  }
45 }
46 
47 TEST(LockFreeRingBuffer, writeReadSequentiallyBackward) {
48  const int capacity = 256;
49  const int turns = 4;
50 
51  LockFreeRingBuffer<int> rb(capacity);
52  for (unsigned int turn = 0; turn < turns; turn++) {
53  for (unsigned int write = 0; write < capacity; write++) {
54  int val = turn * capacity + write;
55  rb.write(val);
56  }
57 
59  cur.moveBackward(1);
60  for (int write = capacity - 1; write >= 0; write--) {
61  int foo = 0;
62  ASSERT_TRUE(rb.tryRead(foo, cur));
63  ASSERT_EQ(turn * capacity + write, foo);
64  cur.moveBackward();
65  }
66  }
67 }
68 
69 TEST(LockFreeRingBuffer, readsCanBlock) {
70  // Start a reader thread, confirm that reading can block
71  std::atomic<bool> readerHasRun(false);
73  auto cursor = rb.currentHead();
74  cursor.moveForward(3); // wait for the 4th write
75 
76  const int sentinel = 0xfaceb00c;
77 
78  auto reader = std::thread([&]() {
79  int val = 0;
80  EXPECT_TRUE(rb.waitAndTryRead(val, cursor));
81  readerHasRun = true;
82  EXPECT_EQ(sentinel, val);
83  });
84 
85  for (int i = 0; i < 4; i++) {
86  EXPECT_FALSE(readerHasRun);
87  int val = sentinel;
88  rb.write(val);
89  }
90  reader.join();
91  EXPECT_TRUE(readerHasRun);
92 }
93 
94 // expose the cursor raw value via a wrapper type
95 template <typename T, template <typename> class Atom>
97  typedef typename LockFreeRingBuffer<T, Atom>::Cursor RBCursor;
98 
99  struct ExposedCursor : RBCursor {
100  ExposedCursor(const RBCursor& cursor) : RBCursor(cursor) {}
101  uint64_t value() {
102  return this->ticket;
103  }
104  };
105  return ExposedCursor(rbcursor).value();
106 }
107 
108 template <template <typename> class Atom>
111  std::atomic<int32_t>& writes) {
112  int32_t idx;
113  while ((idx = writes--) > 0) {
114  rb.write(idx);
115  }
116 }
117 
118 template <template <typename> class Atom>
119 void runWritesNeverFail(int capacity, int writes, int writers) {
121 
122  DeterministicSchedule sched(DeterministicSchedule::uniform(0));
123  LockFreeRingBuffer<int, Atom> rb(capacity);
124 
125  std::atomic<int32_t> writes_remaining(writes);
126  std::vector<std::thread> threads(writers);
127 
128  for (int i = 0; i < writers; i++) {
129  threads[i] = DeterministicSchedule::thread(
130  std::bind(runReader<Atom>, std::ref(rb), std::ref(writes_remaining)));
131  }
132 
133  for (auto& thread : threads) {
135  }
136 
137  EXPECT_EQ(writes, (value<int, Atom>)(rb.currentHead()));
138 }
139 
140 TEST(LockFreeRingBuffer, writesNeverFail) {
143 
144  runWritesNeverFail<DeterministicAtomic>(1, 100, 4);
145  runWritesNeverFail<DeterministicAtomic>(10, 100, 4);
146  runWritesNeverFail<DeterministicAtomic>(100, 1000, 8);
147  runWritesNeverFail<DeterministicAtomic>(1000, 10000, 16);
148 
149  runWritesNeverFail<std::atomic>(1, 100, 4);
150  runWritesNeverFail<std::atomic>(10, 100, 4);
151  runWritesNeverFail<std::atomic>(100, 1000, 8);
152  runWritesNeverFail<std::atomic>(1000, 10000, 16);
153 
154  runWritesNeverFail<EmulatedFutexAtomic>(1, 100, 4);
155  runWritesNeverFail<EmulatedFutexAtomic>(10, 100, 4);
156  runWritesNeverFail<EmulatedFutexAtomic>(100, 1000, 8);
157  runWritesNeverFail<EmulatedFutexAtomic>(1000, 10000, 16);
158 }
159 
160 TEST(LockFreeRingBuffer, readerCanDetectSkips) {
161  const int capacity = 4;
162  const int rounds = 4;
163 
164  LockFreeRingBuffer<int> rb(capacity);
165  auto cursor = rb.currentHead();
166  cursor.moveForward(1);
167 
168  for (int round = 0; round < rounds; round++) {
169  for (int i = 0; i < capacity; i++) {
170  int val = round * capacity + i;
171  rb.write(val);
172  }
173  }
174 
175  int result = -1;
176  EXPECT_FALSE(rb.tryRead(result, cursor));
177  EXPECT_FALSE(rb.waitAndTryRead(result, cursor));
178  EXPECT_EQ(-1, result);
179 
180  cursor = rb.currentTail();
181  EXPECT_TRUE(rb.tryRead(result, cursor));
182  EXPECT_EQ(capacity * (rounds - 1), result);
183 
184  cursor = rb.currentTail(1.0);
185  EXPECT_TRUE(rb.tryRead(result, cursor));
186  EXPECT_EQ((capacity * rounds) - 1, result);
187 }
188 
189 TEST(LockFreeRingBuffer, currentTailRange) {
190  const int capacity = 4;
191  LockFreeRingBuffer<int> rb(capacity);
192 
193  // Workaround for template deduction failure
194  auto (&cursorValue)(value<int, std::atomic>);
195 
196  // Empty buffer - everything points to 0
197  EXPECT_EQ(0, cursorValue(rb.currentTail(0)));
198  EXPECT_EQ(0, cursorValue(rb.currentTail(0.5)));
199  EXPECT_EQ(0, cursorValue(rb.currentTail(1)));
200 
201  // Half-full
202  int val = 5;
203  rb.write(val);
204  rb.write(val);
205 
206  EXPECT_EQ(0, cursorValue(rb.currentTail(0)));
207  EXPECT_EQ(1, cursorValue(rb.currentTail(1)));
208 
209  // Full
210  rb.write(val);
211  rb.write(val);
212 
213  EXPECT_EQ(0, cursorValue(rb.currentTail(0)));
214  EXPECT_EQ(3, cursorValue(rb.currentTail(1)));
215 
216  auto midvalue = cursorValue(rb.currentTail(0.5));
217  // both rounding behaviours are acceptable
218  EXPECT_TRUE(midvalue == 1 || midvalue == 2);
219 }
220 
221 TEST(LockFreeRingBuffer, cursorFromWrites) {
222  const int capacity = 3;
223  LockFreeRingBuffer<int> rb(capacity);
224 
225  // Workaround for template deduction failure
226  auto (&cursorValue)(value<int, std::atomic>);
227 
228  int val = 0xfaceb00c;
229  EXPECT_EQ(0, cursorValue(rb.writeAndGetCursor(val)));
230  EXPECT_EQ(1, cursorValue(rb.writeAndGetCursor(val)));
231  EXPECT_EQ(2, cursorValue(rb.writeAndGetCursor(val)));
232 
233  // Check that rb is giving out actual cursors and not just
234  // pointing to the current slot.
235  EXPECT_EQ(3, cursorValue(rb.writeAndGetCursor(val)));
236 }
237 
238 TEST(LockFreeRingBuffer, moveBackwardsCanFail) {
239  const int capacity = 3;
240  LockFreeRingBuffer<int> rb(capacity);
241 
242  // Workaround for template deduction failure
243  auto (&cursorValue)(value<int, std::atomic>);
244 
245  int val = 0xfaceb00c;
246  rb.write(val);
247  rb.write(val);
248 
249  auto cursor = rb.currentHead(); // points to 2
250  EXPECT_EQ(2, cursorValue(cursor));
251  EXPECT_TRUE(cursor.moveBackward());
252  EXPECT_TRUE(cursor.moveBackward()); // now at 0
253  EXPECT_FALSE(cursor.moveBackward()); // moving back does nothing
254 }
255 
256 } // namespace folly
Cursor currentTail(double skipFraction=0.0) noexcept
void write(const T &in, folly::io::Appender &appender)
Definition: Types-inl.h:112
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
bool waitAndTryRead(T &dest, const Cursor &cursor) noexcept
constexpr To round(std::chrono::duration< Rep, Period > const &d)
Definition: Chrono.h:139
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
DeterministicAtomicImpl< T, DeterministicSchedule > DeterministicAtomic
dest
Definition: upload.py:394
double val
Definition: String.cpp:273
void runWritesNeverFail(int capacity, int writes, int writers)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
static constexpr StringPiece ticket
std::vector< std::thread::id > threads
#define Atom
Cursor writeAndGetCursor(T &value) noexcept
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
bool moveBackward(uint64_t steps=1) noexcept
void write(T &value) noexcept
int bind(NetworkSocket s, const sockaddr *name, socklen_t namelen)
Definition: NetOps.cpp:76
#define join
void runReader(LockFreeRingBuffer< int, Atom > &rb, std::atomic< int32_t > &writes)
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
Cursor currentHead() noexcept
Returns a Cursor pointing to the first write that has not occurred yet.
bool moveForward(uint64_t steps=1) noexcept
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
TEST(SequencedExecutor, CPUThreadPoolExecutor)
bool tryRead(T &dest, const Cursor &cursor) noexcept