Caffe2 - C++ API
A deep learning, cross-platform ML framework
pthreadpool.cc
1 /* Standard C headers */
2 #include <stdint.h>
3 #include <stdbool.h>
4 #include <stdlib.h>
5 #include <string.h>
6 #include <assert.h>
7 
8 /* POSIX headers */
9 #include <pthread.h>
10 #include <unistd.h>
11 
12 /* Library header */
13 #include "caffe2/core/logging.h"
14 #include "caffe2/utils/fixed_divisor.h"
15 #include "caffe2/utils/threadpool/pthreadpool.h"
16 
17 
/*
 * Ceiling division: the number of divisor-sized chunks needed to cover
 * dividend items. The divisor must be non-zero, since both / and % are
 * evaluated on it.
 */
static inline size_t divide_round_up(size_t dividend, size_t divisor) {
  const size_t quotient = dividend / divisor;
  const size_t remainder = dividend % divisor;
  /* Any leftover items need one extra, partially filled chunk. */
  return (remainder != 0) ? quotient + 1 : quotient;
}
25 
/* Returns the smaller of two sizes. */
static inline size_t min(size_t a, size_t b) {
  if (a < b) {
    return a;
  }
  return b;
}
29 
31  pthreadpool_function_1d_tiled_t function;
32  void* argument;
33  size_t range;
34  size_t tile;
35 };
36 
37 static void compute_1d_tiled(const struct compute_1d_tiled_context* context, size_t linear_index) {
38  const size_t tile_index = linear_index;
39  const size_t index = tile_index * context->tile;
40  const size_t tile = min(context->tile, context->range - index);
41  context->function(context->argument, index, tile);
42 }
43 
44 void pthreadpool_compute_1d_tiled(
45  pthreadpool_t threadpool,
46  pthreadpool_function_1d_tiled_t function,
47  void* argument,
48  size_t range,
49  size_t tile)
50 {
51  if (threadpool == NULL) {
52  /* No thread pool provided: execute function sequentially on the calling thread */
53  for (size_t i = 0; i < range; i += tile) {
54  function(argument, i, min(range - i, tile));
55  }
56  } else {
57  /* Execute in parallel on the thread pool using linearized index */
58  const size_t tile_range = divide_round_up(range, tile);
59  struct compute_1d_tiled_context context = {
60  .function = function,
61  .argument = argument,
62  .range = range,
63  .tile = tile
64  };
65  pthreadpool_compute_1d(threadpool, (pthreadpool_function_1d_t) compute_1d_tiled, &context, tile_range);
66  }
67 }
68 
70  pthreadpool_function_2d_t function;
71  void* argument;
73 };
74 
75 static void compute_2d(const struct compute_2d_context* context, size_t linear_index) {
76  DCHECK_LE(linear_index, std::numeric_limits<int>::max());
77 
78  int q;
79  int r;
80  context->range_j.divMod((int) linear_index, q, r);
81  context->function(context->argument, q, r);
82 }
83 
84 void pthreadpool_compute_2d(
85  struct pthreadpool* threadpool,
86  pthreadpool_function_2d_t function,
87  void* argument,
88  size_t range_i,
89  size_t range_j)
90 {
91  if (threadpool == NULL) {
92  /* No thread pool provided: execute function sequentially on the calling thread */
93  for (size_t i = 0; i < range_i; i++) {
94  for (size_t j = 0; j < range_j; j++) {
95  function(argument, i, j);
96  }
97  }
98  } else {
99  DCHECK_LE(range_i * range_j, (size_t) std::numeric_limits<int>::max());
100  /* Execute in parallel on the thread pool using linearized index */
101  struct compute_2d_context context = {
102  .function = function,
103  .argument = argument,
104  .range_j = caffe2::FixedDivisor<int>(range_j)
105  };
106  pthreadpool_compute_1d(threadpool, (pthreadpool_function_1d_t) compute_2d, &context, range_i * range_j);
107  }
108 }
109 
111  pthreadpool_function_2d_tiled_t function;
112  void* argument;
113  caffe2::FixedDivisor<int> tile_range_j;
114  size_t range_i;
115  size_t range_j;
116  size_t tile_i;
117  size_t tile_j;
118 };
119 
120 static void compute_2d_tiled(const struct compute_2d_tiled_context* context, size_t linear_index) {
121  int q;
122  int r;
123 
124  context->tile_range_j.divMod(linear_index, q, r);
125  const size_t max_tile_i = context->tile_i;
126  const size_t max_tile_j = context->tile_j;
127  const size_t index_i = q * max_tile_i;
128  const size_t index_j = r * max_tile_j;
129  const size_t tile_i = min(max_tile_i, context->range_i - index_i);
130  const size_t tile_j = min(max_tile_j, context->range_j - index_j);
131  context->function(context->argument, index_i, index_j, tile_i, tile_j);
132 }
133 
134 void pthreadpool_compute_2d_tiled(
135  pthreadpool_t threadpool,
136  pthreadpool_function_2d_tiled_t function,
137  void* argument,
138  size_t range_i,
139  size_t range_j,
140  size_t tile_i,
141  size_t tile_j)
142 {
143  if (threadpool == NULL) {
144  /* No thread pool provided: execute function sequentially on the calling thread */
145  for (size_t i = 0; i < range_i; i += tile_i) {
146  for (size_t j = 0; j < range_j; j += tile_j) {
147  function(argument, i, j, min(range_i - i, tile_i), min(range_j - j, tile_j));
148  }
149  }
150  } else {
151  /* Execute in parallel on the thread pool using linearized index */
152  const size_t tile_range_i = divide_round_up(range_i, tile_i);
153  const size_t tile_range_j = divide_round_up(range_j, tile_j);
154  DCHECK_LE(tile_range_i * tile_range_j, (size_t) std::numeric_limits<int>::max());
155  struct compute_2d_tiled_context context = {
156  .function = function,
157  .argument = argument,
158  .tile_range_j = caffe2::FixedDivisor<int>(tile_range_j),
159  .range_i = range_i,
160  .range_j = range_j,
161  .tile_i = tile_i,
162  .tile_j = tile_j
163  };
164  pthreadpool_compute_1d(threadpool, (pthreadpool_function_1d_t) compute_2d_tiled, &context, tile_range_i * tile_range_j);
165  }
166 }