Caffe2 - Python API
A deep learning, cross-platform ML framework
muji.py
## @package muji
# Module caffe2.python.muji
"""muji.py does multi-gpu training for caffe2 with no need to change the c++
side code. Everything is defined on the computation graph level.

Currently, only the following use cases are supported:
  - 2 gpus, where peer access is enabled between them.
  - 4 gpus, where peer access is enabled between all of them.
  - 8 gpus, where peer access is enabled in two groups,
    between {1, 2, 3, 4} and {5, 6, 7, 8}.
"""

from caffe2.proto import caffe2_pb2


def OnGPU(gpu_id):
    """A utility function that returns a device option protobuf of the
    specified gpu id.
    """
    device_option = caffe2_pb2.DeviceOption()
    device_option.device_type = caffe2_pb2.CUDA
    device_option.cuda_gpu_id = gpu_id
    return device_option


def OnCPU():
    """A utility function that returns a CPU device option protobuf."""
    device_option = caffe2_pb2.DeviceOption()
    device_option.device_type = caffe2_pb2.CPU
    return device_option


def Allreduce(net, blobs, reduced_affix="_reduced", gpu_indices=None):
    """The general Allreduce interface that reroutes the function calls.
    """
    if gpu_indices is None:
        gpu_indices = list(range(len(blobs)))
    if len(gpu_indices) != len(blobs):
        raise RuntimeError(
            "gpu_indices length and blobs length mismatch: %d vs %d" %
            (len(gpu_indices), len(blobs))
        )
    if len(blobs) == 2:
        return Allreduce2(net, blobs, reduced_affix, gpu_indices)
    elif len(blobs) == 4:
        return Allreduce4(net, blobs, reduced_affix, gpu_indices)
    elif len(blobs) == 8:
        return Allreduce8(net, blobs, reduced_affix, gpu_indices)
    else:
        return AllreduceFallback(net, blobs, reduced_affix, gpu_indices)


def Allreduce2(net, blobs, reduced_affix, gpu_indices):
    """Allreduce for 2 gpus.

    Algorithm: 0r <- 0 + 1, 1r <- 0r, where r means "reduced"
    """
    a, b = blobs
    gpu_a, gpu_b = gpu_indices
    a_reduced = net.Add([a, b], a + reduced_affix, device_option=OnGPU(gpu_a))
    b_reduced = a_reduced.Copy(
        [],
        b + reduced_affix,
        device_option=OnGPU(gpu_b)
    )
    return a_reduced, b_reduced


def Allreduce4(net, blobs, reduced_affix, gpu_indices):
    """Allreduce for 4 gpus.

    Algorithm: 2 level reduction.
        0r <- 0 + 1, 2r <- 2 + 3
        0r <- 0r + 2r
        2r <- 0r,
        1r <- 0r, 3r <- 2r
    """
    a, b, c, d = blobs
    gpu_a, gpu_b, gpu_c, gpu_d = gpu_indices
    # a_reduced <- a + b, c_reduced <- c + d
    a_reduced = net.Add(
        [a, b],
        str(a) + reduced_affix,
        device_option=OnGPU(gpu_a)
    )
    c_reduced = net.Add(
        [c, d],
        str(c) + reduced_affix,
        device_option=OnGPU(gpu_c)
    )
    # a_reduced <- a_reduced + c_reduced
    a_reduced = a_reduced.Add(c_reduced, a_reduced, device_option=OnGPU(gpu_a))
    # broadcast a_reduced to c_reduced
    c_reduced = a_reduced.Copy([], c_reduced, device_option=OnGPU(gpu_c))
    # broadcast to b and d
    b_reduced = a_reduced.Copy(
        [],
        str(b) + reduced_affix,
        device_option=OnGPU(gpu_b)
    )
    d_reduced = c_reduced.Copy(
        [],
        str(d) + reduced_affix,
        device_option=OnGPU(gpu_d)
    )
    return a_reduced, b_reduced, c_reduced, d_reduced


def Allreduce8(net, blobs, reduced_affix, gpu_indices):
    """Allreduce for 8 gpus.

    Algorithm: 3 level reduction.
        0r <- 0 + 1, 2r <- 2 + 3, 4r <- 4 + 5, 6r <- 6 + 7
        0r <- 0r + 2r, 4r <- 4r + 6r
        0r <- 0r + 4r
        4r <- 0r
        2r <- 0r, 6r <- 4r
        1r <- 0r, 3r <- 2r, 5r <- 4r, 7r <- 6r
    """
    reduced = [None] * 8
    # Reduction level 1
    for i in [0, 2, 4, 6]:
        reduced[i] = net.Add(
            [blobs[i], blobs[i + 1]],
            blobs[i] + reduced_affix,
            device_option=OnGPU(gpu_indices[i])
        )
    # Reduction level 2
    for i in [0, 4]:
        reduced[i] = net.Add(
            [reduced[i], reduced[i + 2]],
            str(blobs[i]) + reduced_affix,
            device_option=OnGPU(gpu_indices[i])
        )
    # Reduction level 3: this involves a copy.
    reduced_4_copy = reduced[4].Copy(
        [],
        str(reduced[4]) + '_copy',
        device_option=OnGPU(gpu_indices[0])
    )
    reduced[0] = reduced[0].Add(
        reduced_4_copy,
        reduced[0],
        device_option=OnGPU(gpu_indices[0])
    )
    # Broadcast level 1
    reduced[4] = reduced[0].Copy(
        [],
        reduced[4],
        device_option=OnGPU(gpu_indices[4])
    )
    # Broadcast level 2
    for i in [2, 6]:
        reduced[i] = reduced[i - 2].Copy(
            [],
            reduced[i],
            device_option=OnGPU(gpu_indices[i])
        )
    # Broadcast level 3
    for i in [1, 3, 5, 7]:
        reduced[i] = reduced[i - 1].Copy(
            [],
            blobs[i] + reduced_affix,
            device_option=OnGPU(gpu_indices[i])
        )
    return reduced


def AllreduceFallback(net, blobs, reduced_affix, gpu_indices):
    """A fallback option for Allreduce with no assumption on p2p.

    Algorithm: a flat reduction on the first gpu in gpu_indices
        0r <- 0
        0r <- 0r + i for i in gpu_indices[1:]
        ir <- 0r for i in gpu_indices[1:]
    """
    reduced = [None] * len(gpu_indices)
    # copy first
    reduced[0] = net.Copy(
        blobs[0],
        blobs[0] + reduced_affix,
        device_option=OnGPU(gpu_indices[0])
    )
    # do temp copy and add
    temp_name = reduced[0] + '_temp_copy'
    for i in range(1, len(gpu_indices)):
        temp = net.Copy(
            blobs[i],
            temp_name,
            device_option=OnGPU(gpu_indices[0])
        )
        reduced[0] = reduced[0].Add(
            temp,
            reduced[0],
            device_option=OnGPU(gpu_indices[0])
        )
    # Broadcast to everyone else
    for i in range(1, len(gpu_indices)):
        reduced[i] = net.Copy(
            reduced[0],
            blobs[i] + reduced_affix,
            device_option=OnGPU(gpu_indices[i])
        )
    return reduced
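
Below is a minimal usage sketch, not part of the module source. It assumes a CUDA build of Caffe2 with at least two GPUs that have peer access enabled, and uses the standard core and workspace helpers from caffe2.python; the net name, blob names, shapes, and fill values are illustrative.

# Usage sketch (illustrative; requires a CUDA build with 2 peer-enabled GPUs).
from caffe2.python import core, workspace
from caffe2.python import muji

net = core.Net("muji_example")
blobs = []
for gpu_id in range(2):
    # Fill one blob per GPU so there is something to reduce.
    blobs.append(net.ConstantFill(
        [],
        "data_gpu%d" % gpu_id,
        shape=[16],
        value=float(gpu_id + 1),
        device_option=muji.OnGPU(gpu_id)
    ))

# With two blobs this dispatches to Allreduce2; each "_reduced" blob should
# end up holding the elementwise sum 1.0 + 2.0 = 3.0 on its own GPU.
reduced = muji.Allreduce(net, blobs, reduced_affix="_reduced")

workspace.ResetWorkspace()
workspace.RunNetOnce(net)
print(workspace.FetchBlob("data_gpu0_reduced"))

If the machine's peer access layout does not match the assumptions in the module docstring, AllreduceFallback can be called directly instead; caffe2.python.workspace exposes GetCudaPeerAccessPattern() to inspect which GPU pairs can access each other.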