proxygen
FlatCombiningExamples.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <atomic>
20 #include <memory>
21 #include <mutex>
22 
25 
26 namespace folly {
27 
28 struct alignas(hardware_destructive_interference_size) Line {
30 };
31 
32 class Data { // Sequential data structure
33  public:
34  explicit Data(size_t size) : size_(size) {
35  x_ = std::make_unique<Line[]>(size_);
36  }
37 
39  uint64_t val = x_[0].val_;
40  for (size_t i = 1; i < size_; ++i) {
41  assert(x_[i].val_ == val);
42  }
43  return val;
44  }
45 
46  // add
47 
48  void add(uint64_t val) {
49  uint64_t oldval = x_[0].val_;
50  for (size_t i = 0; i < size_; ++i) {
51  assert(x_[i].val_ == oldval);
52  x_[i].val_ = oldval + val;
53  }
54  }
55 
57  uint64_t res = x_[0].val_;
58  for (size_t i = 0; i < size_; ++i) {
59  assert(x_[i].val_ == res);
60  x_[i].val_ += val;
61  }
62  return res;
63  }
64 
65  private:
66  size_t size_;
67  std::unique_ptr<Line[]> x_;
68 };
69 
70 // Example of FC concurrent data structure using simple interface
71 
72 template <
73  typename Mutex = std::mutex,
74  template <typename> class Atom = std::atomic>
76  : public FlatCombining<FcSimpleExample<Mutex, Atom>, Mutex, Atom> {
78  using Rec = typename FC::Rec;
79 
80  public:
81  explicit FcSimpleExample(
82  size_t size,
83  bool dedicated = true,
84  uint32_t numRecs = 0,
85  uint32_t maxOps = 0)
86  : FC(dedicated, numRecs, maxOps), data_(size) {}
87 
89  return data_.getVal();
90  }
91 
92  // add
93 
95  this->requestNoFC([&] { data_.add(val); });
96  }
97 
98  void add(uint64_t val, Rec* rec = nullptr) {
99  auto opFn = [&, val] { // asynchronous -- capture val by value
100  data_.add(val);
101  };
102  this->requestFC(opFn, rec, false);
103  }
104 
105  // fetchAdd
106 
108  uint64_t res;
109  auto opFn = [&] { res = data_.fetchAdd(val); };
110  this->requestNoFC(opFn);
111  return res;
112  }
113 
114  uint64_t fetchAdd(uint64_t val, Rec* rec = nullptr) {
115  uint64_t res;
116  auto opFn = [&] { res = data_.fetchAdd(val); };
117  this->requestFC(opFn, rec);
118  return res;
119  }
120 
121  private:
123 };
124 
125 // Example of FC data structure using custom request processing
126 
/// Request descriptor used by the custom-processing example: it records
/// which operation is wanted, its operand, and (for FETCHADD) the result.
class Req {
 public:
  /// Kind of operation the request describes.
  enum class Type { ADD, FETCHADD };

  /// Sets the operation kind.
  void setType(Type type) {
    type_ = type;
  }

  /// Returns the operation kind.
  Type getType() const {
    return type_;
  }

  /// Sets the operand.
  void setVal(uint64_t val) {
    val_ = val;
  }

  /// Returns the operand.
  uint64_t getVal() const {
    return val_;
  }

  /// Stores the result of the operation.
  void setRes(uint64_t res) {
    res_ = res;
  }

  /// Returns the stored result.
  uint64_t getRes() const {
    return res_;
  }

 private:
  // Default-initialized so that a getter called before the matching setter
  // yields a defined value rather than an indeterminate one.
  Type type_{Type::ADD};
  uint64_t val_{0};
  uint64_t res_{0};
};
160 
161 template <
162  typename Req,
163  typename Mutex = std::mutex,
164  template <typename> class Atom = std::atomic>
166  FcCustomExample<Req, Mutex, Atom>,
167  Mutex,
168  Atom,
169  Req> {
171  using Rec = typename FC::Rec;
172 
173  public:
174  explicit FcCustomExample(
175  int size,
176  bool dedicated = true,
177  uint32_t numRecs = 0,
178  uint32_t maxOps = 0)
179  : FC(dedicated, numRecs, maxOps), data_(size) {}
180 
182  return data_.getVal();
183  }
184 
185  // add
186 
188  this->requestNoFC([&] { data_.add(val); });
189  }
190 
191  void add(uint64_t val, Rec* rec = nullptr) {
192  auto opFn = [&, val] { data_.add(val); };
193  auto fillFn = [&](Req& req) {
194  req.setType(Req::Type::ADD);
195  req.setVal(val);
196  };
197  this->requestFC(opFn, fillFn, rec, false); // asynchronous
198  }
199 
200  // fetchAdd
201 
203  uint64_t res;
204  auto opFn = [&] { res = data_.fetchAdd(val); };
205  this->requestNoFC(opFn);
206  return res;
207  }
208 
209  uint64_t fetchAdd(uint64_t val, Rec* rec = nullptr) {
210  uint64_t res;
211  auto opFn = [&] { res = data_.fetchAdd(val); };
212  auto fillFn = [&](Req& req) {
213  req.setType(Req::Type::FETCHADD);
214  req.setVal(val);
215  };
216  auto resFn = [&](Req& req) { res = req.getRes(); };
217  this->requestFC(opFn, fillFn, resFn, rec);
218  return res;
219  }
220 
221  // custom combined op processing - overrides FlatCombining::combinedOp(Req&)
222  void combinedOp(Req& req) {
223  switch (req.getType()) {
224  case Req::Type::ADD:
225  data_.add(req.getVal());
226  return;
227  case Req::Type::FETCHADD:
228  req.setRes(data_.fetchAdd(req.getVal()));
229  return;
230  }
232  }
233 
234  private:
236 };
237 
238 } // namespace folly
uint64_t fetchAddNoFC(uint64_t val)
PskType type
std::unique_ptr< Line[]> x_
#define Mutex
double val
Definition: String.cpp:273
void add(uint64_t val)
uint64_t fetchAddNoFC(uint64_t val)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
uint64_t fetchAdd(uint64_t val, Rec *rec=nullptr)
FcSimpleExample(size_t size, bool dedicated=true, uint32_t numRecs=0, uint32_t maxOps=0)
static bool dedicated
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
FOLLY_ALWAYS_INLINE void assume_unreachable()
Definition: Assume.h:59
Type type_
Definition: JSONSchema.cpp:208
#define Atom
uint64_t fetchAdd(uint64_t val, Rec *rec=nullptr)
void add(uint64_t val, Rec *rec=nullptr)
void setVal(uint64_t val)
Data(size_t size)
void setType(Type type)
std::mutex mutex
FcCustomExample(int size, bool dedicated=true, uint32_t numRecs=0, uint32_t maxOps=0)
void addNoFC(uint64_t val)
int x_
void add(uint64_t val, Rec *rec=nullptr)
StringPiece data_
void setRes(uint64_t res)
uint64_t fetchAdd(uint64_t val)