proxygen
ProducerConsumerQueueBenchmark.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 // @author: Bert Maher <bertrand@fb.com>
18 
20 
21 #include <cstdio>
22 #include <iostream>
23 #include <thread>
24 
25 #include <glog/logging.h>
26 
27 #include <folly/Benchmark.h>
31 #include <folly/stats/Histogram.h>
32 
33 namespace {
34 
35 using namespace folly;
36 
37 typedef unsigned int ThroughputType;
38 typedef ProducerConsumerQueue<ThroughputType> ThroughputQueueType;
39 
40 typedef unsigned long LatencyType;
41 typedef ProducerConsumerQueue<LatencyType> LatencyQueueType;
42 
43 template <class QueueType>
44 struct ThroughputTest {
45  explicit ThroughputTest(size_t size, int iters, int cpu0, int cpu1)
46  : queue_(size), done_(false), iters_(iters), cpu0_(cpu0), cpu1_(cpu1) {}
47 
48  void producer() {
49  if (cpu0_ > -1) {
50  cpu_set_t cpuset;
51  CPU_ZERO(&cpuset);
52  CPU_SET(cpu0_, &cpuset);
53  pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
54  }
55  for (int i = 0; i < iters_; ++i) {
56  ThroughputType item = i;
57  while (!queue_.write((ThroughputType)item)) {
58  }
59  }
60  }
61 
62  void consumer() {
63  if (cpu1_ > -1) {
64  cpu_set_t cpuset;
65  CPU_ZERO(&cpuset);
66  CPU_SET(cpu1_, &cpuset);
67  pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
68  }
69  for (int i = 0; i < iters_; ++i) {
70  ThroughputType item = 0;
71  while (!queue_.read(item)) {
72  }
73  doNotOptimizeAway(item);
74  }
75  }
76 
77  QueueType queue_;
78  std::atomic<bool> done_;
79  const int iters_;
80  int cpu0_;
81  int cpu1_;
82 };
83 
84 template <class QueueType>
85 struct LatencyTest {
86  explicit LatencyTest(size_t size, int iters, int cpu0, int cpu1)
87  : queue_(size),
88  done_(false),
89  iters_(iters),
90  cpu0_(cpu0),
91  cpu1_(cpu1),
92  hist_(1, 0, 30) {
93  computeTimeCost();
94  }
95 
96  static uint64_t timespecDiff(timespec end, timespec start) {
97  if (end.tv_sec == start.tv_sec) {
98  assert(end.tv_nsec >= start.tv_nsec);
99  return uint64_t(end.tv_nsec - start.tv_nsec);
100  }
101  assert(end.tv_sec > start.tv_sec);
102  auto diff = uint64_t(end.tv_sec - start.tv_sec);
103  assert(diff < std::numeric_limits<uint64_t>::max() / 1000000000ULL);
104  return diff * 1000000000ULL + end.tv_nsec - start.tv_nsec;
105  }
106 
107  void computeTimeCost() {
108  timespec start, end;
109  clock_gettime(CLOCK_REALTIME, &start);
110  for (int i = 0; i < iters_; ++i) {
111  timespec tv;
112  clock_gettime(CLOCK_REALTIME, &tv);
113  }
114  clock_gettime(CLOCK_REALTIME, &end);
115  time_cost_ = 2 * timespecDiff(end, start) / iters_;
116  }
117 
118  void producer() {
119  if (cpu0_ > -1) {
120  cpu_set_t cpuset;
121  CPU_ZERO(&cpuset);
122  CPU_SET(cpu0_, &cpuset);
123  pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
124  }
125  for (int i = 0; i < iters_; ++i) {
126  timespec sleeptime, sleepstart;
127  clock_gettime(CLOCK_REALTIME, &sleepstart);
128  do {
129  clock_gettime(CLOCK_REALTIME, &sleeptime);
130  } while (timespecDiff(sleeptime, sleepstart) < 1000000);
131 
132  timespec tv;
133  clock_gettime(CLOCK_REALTIME, &tv);
134  while (!queue_.write((LatencyType)tv.tv_nsec)) {
135  }
136  }
137  }
138 
139  void consumer() {
140  if (cpu1_ > -1) {
141  cpu_set_t cpuset;
142  CPU_ZERO(&cpuset);
143  CPU_SET(cpu1_, &cpuset);
144  pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
145  }
146  for (int i = 0; i < iters_; ++i) {
147  unsigned long enqueue_nsec;
148  while (!queue_.read(enqueue_nsec)) {
149  }
150 
151  timespec tv;
152  clock_gettime(CLOCK_REALTIME, &tv);
153  int diff = tv.tv_nsec - enqueue_nsec - time_cost_;
154  if (diff < 0) {
155  continue;
156  }
157 
158  // Naive log-scale bucketing.
159  int bucket;
160  for (bucket = 0; bucket <= 30 && (1 << bucket) <= diff; ++bucket) {
161  }
162  hist_.addValue(bucket - 1);
163  }
164  }
165 
166  void printHistogram() {
167  hist_.toTSV(std::cout);
168  }
169 
170  QueueType queue_;
171  std::atomic<bool> done_;
172  int time_cost_;
173  const int iters_;
174  int cpu0_;
175  int cpu1_;
176  Histogram<int> hist_;
177 };
178 
179 void BM_ProducerConsumer(int iters, int size) {
180  BenchmarkSuspender susp;
181  CHECK_GT(size, 0);
182  ThroughputTest<ThroughputQueueType>* test =
183  new ThroughputTest<ThroughputQueueType>(size, iters, -1, -1);
184  susp.dismiss();
185 
186  std::thread producer([test] { test->producer(); });
187  std::thread consumer([test] { test->consumer(); });
188 
189  producer.join();
190  test->done_ = true;
191  consumer.join();
192  delete test;
193 }
194 
195 void BM_ProducerConsumerAffinity(int iters, int size) {
196  BenchmarkSuspender susp;
197  CHECK_GT(size, 0);
198  ThroughputTest<ThroughputQueueType>* test =
199  new ThroughputTest<ThroughputQueueType>(size, iters, 0, 1);
200  susp.dismiss();
201 
202  std::thread producer([test] { test->producer(); });
203  std::thread consumer([test] { test->consumer(); });
204 
205  producer.join();
206  test->done_ = true;
207  consumer.join();
208  delete test;
209 }
210 
211 void BM_ProducerConsumerLatency(int /* iters */, int size) {
212  BenchmarkSuspender susp;
213  CHECK_GT(size, 0);
214  LatencyTest<LatencyQueueType>* test =
215  new LatencyTest<LatencyQueueType>(size, 100000, 0, 1);
216  susp.dismiss();
217 
218  std::thread producer([test] { test->producer(); });
219  std::thread consumer([test] { test->consumer(); });
220 
221  producer.join();
222  test->done_ = true;
223  consumer.join();
224  test->printHistogram();
225  delete test;
226 }
227 
229 
// Register the three benchmarks, each with 1048574 as its parameter.
// NOTE(review): the parameter presumably arrives as the `size` argument of
// each function, i.e. the queue capacity -- confirm against BENCHMARK_PARAM.
230 BENCHMARK_PARAM(BM_ProducerConsumer, 1048574)
231 BENCHMARK_PARAM(BM_ProducerConsumerAffinity, 1048574)
232 BENCHMARK_PARAM(BM_ProducerConsumerLatency, 1048574)
233 
234 } // namespace
235 
236 int main(int argc, char** argv) {
237  google::InitGoogleLogging(argv[0]);
238  gflags::ParseCommandLineFlags(&argc, &argv, true);
239 
240  runBenchmarks();
241  return 0;
242 }
243 
244 #if 0
245 /*
246 Benchmark
247 
248 $ lscpu
249 Architecture: x86_64
250 CPU op-mode(s): 32-bit, 64-bit
251 Byte Order: Little Endian
252 CPU(s): 24
253 On-line CPU(s) list: 0-23
254 Thread(s) per core: 1
255 Core(s) per socket: 1
256 Socket(s): 24
257 NUMA node(s): 1
258 Vendor ID: GenuineIntel
259 CPU family: 6
260 Model: 60
261 Model name: Intel Core Processor (Haswell, no TSX)
262 Stepping: 1
263 CPU MHz: 2494.244
264 BogoMIPS: 4988.48
265 Hypervisor vendor: KVM
266 Virtualization type: full
267 L1d cache: 32K
268 L1i cache: 32K
269 L2 cache: 4096K
270 NUMA node0 CPU(s): 0-23
271 
272 $ ../buck-out/gen/folly/test/producer_consumer_queue_benchmark
273 5 6 1 5
274 6 7 1893 11358
275 7 8 39671 277697
276 8 9 34921 279368
277 9 10 17799 160191
278 10 11 3685 36850
279 11 12 1075 11825
280 12 13 456 5472
281 13 14 422 5486
282 14 15 64 896
283 15 16 7 105
284 16 17 3 48
285 17 18 3 51
286 ============================================================================
287 folly/test/ProducerConsumerQueueBenchmark.cpp relative time/iter iters/s
288 ============================================================================
289 ----------------------------------------------------------------------------
290 BM_ProducerConsumer(1048574) 5.82ns 171.75M
291 BM_ProducerConsumerAffinity(1048574) 7.36ns 135.83M
292 BM_ProducerConsumerLatency(1048574) 1.67min 9.99m
293 ============================================================================
294 */
295 #endif
int(* clock_gettime)(clockid_t, timespec *ts)
LogLevel max
Definition: LogLevel.cpp:31
static uint64_t test(std::string name, bool fc_, bool dedicated_, bool tc_, bool syncops_, uint64_t base)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void runBenchmarks()
Definition: Benchmark.cpp:456
char ** argv
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
auto end(TestAdlIterable &instance)
Definition: ForeachTest.cpp:62
int main(int argc, char **argv)
auto start
#define BENCHMARK_PARAM(name, param)
Definition: Benchmark.h:417
uint64_t diff(uint64_t a, uint64_t b)
Definition: FutexTest.cpp:135
BENCHMARK_DRAW_LINE()
auto doNotOptimizeAway(const T &datum) -> typename std::enable_if< !detail::DoNotOptimizeAwayNeedsIndirect< T >::value >::type
Definition: Benchmark.h:258