Caffe2 - Python API
A deep learning, cross-platform ML framework
data_parallel_model_utils.py
1 ## @package data_parallel_model_utils
2 # Module caffe2.python.data_parallel_model_utils
3 from __future__ import absolute_import
4 from __future__ import division
5 from __future__ import print_function
6 
7 from future.utils import viewitems, viewkeys, viewvalues
8 
9 import logging
10 
11 from caffe2.python import core
12 from caffe2.python.data_parallel_model import stripBlobName
13 
14 log = logging.getLogger("data_parallel_model_utils")
15 log.setLevel(logging.INFO)
16 
17 
def GetActivationBlobs(model):
    # Hacky way to get activations, think of a better way
    first_gpu_prefix = "{}_{}/".format(model._device_prefix, model._devices[0])

    # Every blob that appears as an input to some operator in the net.
    all_inputs = set()
    for op in model.net.Proto().op:
        all_inputs.update(op.input)

    params = set(model.GetParams(''))

    # An activation is an output on the first device that is consumed
    # downstream, is not a parameter, and whose "_grad" counterpart is
    # also consumed by some operator.
    activations = []
    for op in model.net.Proto().op:
        for blob in op.output:
            if not blob.startswith(first_gpu_prefix) or blob.endswith("_grad"):
                continue
            if (blob in all_inputs and blob not in params
                    and blob + "_grad" in all_inputs):
                activations.append(stripBlobName(blob))
    return activations
36 
37 
def _ShiftActivationDevices(model, activations, from_device, to_device):
    # Move operators touching the given activations from `from_device` to
    # `to_device`, propagating the new device assignment to related blobs.
    prefix = "{}_{}/".format(model._device_prefix, from_device)
    shifted = {prefix + name for name in activations}
    every_activation = {prefix + name for name in GetActivationBlobs(model)}
    source_ops = [
        op for op in model.net.Proto().op
        if op.device_option.cuda_gpu_id == from_device
    ]
    # Activations being shifted map to the target device; all other known
    # activations are pinned to the source device.
    device_mapping = {blob: to_device for blob in shifted}
    for blob in every_activation:
        if blob not in shifted:
            device_mapping[blob] = from_device

    # Assign each blob to a device in a label propagation manner. activations
    # override, and if multiple activations in same op, the output activations
    # determine.
    for op in source_ops:
        chosen_device = None
        for blob in list(op.input) + list(op.output):
            if blob in device_mapping:
                # Activations always win; other mapped blobs only set the
                # device if nothing has chosen one yet. Outputs are visited
                # last, so output activations take precedence.
                if blob in every_activation or chosen_device is None:
                    chosen_device = device_mapping[blob]
        if chosen_device is None:
            chosen_device = op.device_option.cuda_gpu_id
        # Propagate the decision to any yet-unmapped blobs of this op.
        for blob in list(op.input) + list(op.output):
            if blob not in device_mapping and blob.startswith(prefix):
                device_mapping[blob] = chosen_device
        op.device_option.cuda_gpu_id = chosen_device

    # Change param_init_net accordingly
    for op in model.param_init_net.Proto().op:
        if op.output[0] in device_mapping:
            op.device_option.cuda_gpu_id = device_mapping[op.output[0]]
68 
69 
def ShiftActivationDevices(model, activations, shifts):
    '''
    Enable simple model-parallelism for data_parallel_model models.

    'shifts' is a dictionary from_gpu -> to_gpu, and 'activations' is a
    list of activation blob names without the gpu_x/ prefix (use
    GetActivationBlobs() to obtain them).

    Operators handling these activations are moved to the gpu declared in
    'shifts', together with related operators such as gradient operators,
    and the appropriate cross-device copy operators are inserted.

    This shifts memory usage from one gpu to another, enabling bigger
    models to be trained.
    '''
    # A gpu may not be both a source and a target of a shift.
    sources = set(viewkeys(shifts))
    targets = set(viewvalues(shifts))
    assert targets.intersection(sources) == set()

    for src_gpu, dst_gpu in viewitems(shifts):
        log.info(
            "Shifting {} activations from {} --> {}".
            format(len(activations), src_gpu, dst_gpu)
        )
        _ShiftActivationDevices(model, activations, src_gpu, dst_gpu)

    # Insert the copy operators implied by the new device assignments.
    param_init_net, blob_to_device = core.InjectCrossDeviceCopies(
        model.param_init_net
    )
    net, _blob_to_device = core.InjectCrossDeviceCopies(
        model.net, blob_to_device
    )
    model.param_init_net = param_init_net
    model.net = net