proxygen
LockFreeRingBuffer.h
Go to the documentation of this file.
1 /*
2  * Copyright 2015-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <atomic>
20 #include <cmath>
21 #include <cstring>
22 #include <memory>
23 #include <type_traits>
24 
25 #include <boost/noncopyable.hpp>
26 
27 #include <folly/Portability.h>
28 #include <folly/Traits.h>
31 
32 namespace folly {
33 namespace detail {
34 
35 template <typename T, template <typename> class Atom>
37 } // namespace detail
38 
57 
58 template <typename T, template <typename> class Atom = std::atomic>
59 class LockFreeRingBuffer : boost::noncopyable {
60  static_assert(
62  "Element type must be nothrow default constructible");
63 
64  static_assert(
66  "Element type must be trivially copyable");
67 
68  public:
71  struct Cursor {
72  explicit Cursor(uint64_t initialTicket) noexcept : ticket(initialTicket) {}
73 
77  uint64_t prevTicket = ticket;
78  ticket += steps;
79  return prevTicket != ticket;
80  }
81 
85  uint64_t prevTicket = ticket;
86  if (steps > ticket) {
87  ticket = 0;
88  } else {
89  ticket -= steps;
90  }
91  return prevTicket != ticket;
92  }
93 
94  protected: // for test visibility reasons
96  friend class LockFreeRingBuffer;
97  };
98 
100  : capacity_(capacity),
101  slots_(new detail::RingBufferSlot<T, Atom>[capacity]),
102  ticket_(0) {}
103 
108  uint64_t ticket = ticket_.fetch_add(1);
109  slots_[idx(ticket)].write(turn(ticket), value);
110  }
111 
117  uint64_t ticket = ticket_.fetch_add(1);
118  slots_[idx(ticket)].write(turn(ticket), value);
119  return Cursor(ticket);
120  }
121 
126  bool tryRead(T& dest, const Cursor& cursor) noexcept {
127  return slots_[idx(cursor.ticket)].tryRead(dest, turn(cursor.ticket));
128  }
129 
134  bool waitAndTryRead(T& dest, const Cursor& cursor) noexcept {
135  return slots_[idx(cursor.ticket)].waitAndTryRead(dest, turn(cursor.ticket));
136  }
137 
140  return Cursor(ticket_.load());
141  }
142 
147  Cursor currentTail(double skipFraction = 0.0) noexcept {
148  assert(skipFraction >= 0.0 && skipFraction <= 1.0);
149  uint64_t ticket = ticket_.load();
150 
151  uint64_t backStep = llround((1.0 - skipFraction) * capacity_);
152 
153  // always try to move at least one step backward to something readable
154  backStep = std::max<uint64_t>(1, backStep);
155 
156  // can't go back more steps than we've taken
157  backStep = std::min(ticket, backStep);
158 
159  return Cursor(ticket - backStep);
160  }
161 
163 
164  private:
166 
167  const std::unique_ptr<detail::RingBufferSlot<T, Atom>[]> slots_;
168 
169  Atom<uint64_t> ticket_;
170 
172  return ticket % capacity_;
173  }
174 
176  return (uint32_t)(ticket / capacity_);
177  }
178 }; // LockFreeRingBuffer
179 
180 namespace detail {
181 template <typename T, template <typename> class Atom>
182 class RingBufferSlot {
183  public:
184  explicit RingBufferSlot() noexcept : sequencer_(), data() {}
185 
186  void write(const uint32_t turn, T& value) noexcept {
187  Atom<uint32_t> cutoff(0);
188  sequencer_.waitForTurn(turn * 2, cutoff, false);
189 
190  // Change to an odd-numbered turn to indicate write in process
191  sequencer_.completeTurn(turn * 2);
192 
193  data = std::move(value);
194  sequencer_.completeTurn(turn * 2 + 1);
195  // At (turn + 1) * 2
196  }
197 
199  uint32_t desired_turn = (turn + 1) * 2;
200  Atom<uint32_t> cutoff(0);
201  if (sequencer_.tryWaitForTurn(desired_turn, cutoff, false) !=
203  return false;
204  }
205  memcpy(&dest, &data, sizeof(T));
206 
207  // if it's still the same turn, we read the value successfully
208  return sequencer_.isTurn(desired_turn);
209  }
210 
211  bool tryRead(T& dest, uint32_t turn) noexcept {
212  // The write that started at turn 0 ended at turn 2
213  if (!sequencer_.isTurn((turn + 1) * 2)) {
214  return false;
215  }
216  memcpy(&dest, &data, sizeof(T));
217 
218  // if it's still the same turn, we read the value successfully
219  return sequencer_.isTurn((turn + 1) * 2);
220  }
221 
222  private:
225 }; // RingBufferSlot
226 
227 } // namespace detail
228 
229 } // namespace folly
Cursor currentTail(double skipFraction=0.0) noexcept
bool waitAndTryRead(T &dest, const Cursor &cursor) noexcept
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
dest
Definition: upload.py:394
folly::std T
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
uint32_t idx(uint64_t ticket) noexcept
static constexpr StringPiece ticket
LockFreeRingBuffer(uint32_t capacity) noexcept
void write(const uint32_t turn, T &value) noexcept
LogLevel min
Definition: LogLevel.cpp:30
Cursor(uint64_t initialTicket) noexcept
#define Atom
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
Cursor writeAndGetCursor(T &value) noexcept
std::is_trivially_copyable< T > is_trivially_copyable
Definition: Traits.h:313
const std::unique_ptr< detail::RingBufferSlot< T, Atom >[]> slots_
static const char *const value
Definition: Conv.cpp:50
TurnSequencer< Atom > sequencer_
uint32_t turn(uint64_t ticket) noexcept
bool moveBackward(uint64_t steps=1) noexcept
void write(T &value) noexcept
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
Cursor currentHead() noexcept
Returns a Cursor pointing to the first write that has not occurred yet.
bool waitAndTryRead(T &dest, uint32_t turn) noexcept
bool moveForward(uint64_t steps=1) noexcept
bool tryRead(T &dest, uint32_t turn) noexcept
bool tryRead(T &dest, const Cursor &cursor) noexcept