from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from collections import OrderedDict
from future.utils import viewitems, viewkeys, viewvalues
import copy
import logging

import numpy as np

from caffe2.python import \
    model_helper, dyndep, scope, workspace, core, memonger, utils
from caffe2.proto import caffe2_pb2
dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/nccl:nccl_ops")
dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/gloo:gloo_ops")
dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/gloo:gloo_ops_gpu")

log = logging.getLogger("data_parallel_model")
log.setLevel(logging.INFO)

_DEFAULT_TIMEOUT_SEC = 30


def Parallelize_GPU(*args, **kwargs):
    kwargs['cpu_device'] = False
    Parallelize(*args, **kwargs)


def Parallelize_CPU(*args, **kwargs):
    kwargs['cpu_device'] = True
    Parallelize(*args, **kwargs)


def Parallelize(
    model_helper_obj,
    input_builder_fun,
    forward_pass_builder_fun,
    param_update_builder_fun=None,
    optimizer_builder_fun=None,
    post_sync_builder_fun=None,
    devices=None,
    rendezvous=None,
    net_type='dag',
    broadcast_computed_params=True,
    optimize_gradient_memory=False,
    dynamic_memory_management=False,
    blobs_to_keep=None,
    use_nccl=False,
    max_concurrent_distributed_ops=16,
    cpu_device=False,
    num_threads_per_device=4,
    shared_model=False,
    combine_spatial_bn=False,
):
    '''
    Function to create a model that can run on many GPUs or CPUs.
      model_helper_obj: an object of ModelHelper
      input_builder_fun:
                        Function that adds the input operators
                        Note: Remember to instantiate reader outside of this
                        function so all devices share same reader object.
                        Signature: input_builder_fun(model)
      forward_pass_builder_fun:
                        Function to add the operators to the model.
                        Must return list of loss-blob references that
                        are used to build the gradient. Loss scale parameter
                        is passed, as you should scale the loss of your model
                        by 1.0 / the total number of devices.
                        Signature: forward_pass_builder_fun(model, loss_scale)
      param_update_builder_fun:
                        Function that adds operators that are run after
                        gradient update, such as updating the weights and
                        weight decaying. This is called for each GPU separately.
                        Signature: param_update_builder_fun(model)
      optimizer_builder_fun:
                        Alternative to param_update_builder_fun, allows one
                        to add an optimizer for the whole model. Called only
                        once, without name or devicescope.
      post_sync_builder_fun:
                        Function applied after initial parameter sync has been
                        completed, such as keeping multi-precision parameters
                        in sync.
                        Signature: post_sync_builder_fun(model)
      devices:          List of GPU ids, such as [0, 1, 2, 3],
      rendezvous:       used for rendezvous in distributed computation, if None
                        then only one node is used. To create rendezvous,
      net_type:         Network type
      optimize_gradient_memory: whether to apply 'memonger' to share blobs
      shared_model      (only for CPU) use same parameters on each device
                        in gradient computation to reduce memory footprint.
      dynamic_memory_management: Whether to apply dynamic memory optimization
                        by freeing unused blobs. The underlying (de)allocation
                        uses cached allocator. For GPU training PLEASE MAKE SURE
                        caffe2_cuda_memory_pool is set.
      blobs_to_keep:    A list of blob names to keep and don't free during
                        dynamic memory optimization (for example loss blob).
      cpu_device:       Use CPU instead of GPU.
      combine_spatial_bn:
                        When set to True, applies batch normalization across
                        all devices within the node. If False, batch
                        normalization will be done separately for each device.
                        This option is currently only supported on the CPU.
    '''
    assert scope.CurrentDeviceScope() is None \
        or scope.CurrentDeviceScope().device_type == caffe2_pb2.CPU, \
        "Parallelize must be called without device-scope, \
        device scope was: {}".format(scope.CurrentDeviceScope())
    if devices is None:
        devices = list(range(0, workspace.NumCudaDevices()))

    if not cpu_device:
        for gpu in devices:
            if gpu >= workspace.NumCudaDevices():
                log.warning("** Only {} GPUs available, GPUs {} requested".format(
                    workspace.NumCudaDevices(), devices))
        model_helper_obj._device_type = caffe2_pb2.CUDA
        model_helper_obj._device_prefix = "gpu"
        model_helper_obj._shared_model = False
        device_name = "GPU"
        assert shared_model is False, "Shared model only supported on CPU"
    else:
        model_helper_obj._device_type = caffe2_pb2.CPU
        model_helper_obj._device_prefix = "cpu"
        device_name = "CPU"
        model_helper_obj._shared_model = shared_model
        if shared_model and rendezvous is not None:
            assert False, \
                "Shared model only supported on single-node currently"

    log.info("Parallelizing model for devices: {}".format(devices))
    extra_workers = 8 if rendezvous is not None else 0
    num_workers = len(devices) * num_threads_per_device + extra_workers
    max_concurrent_distributed_ops =\
        min(max_concurrent_distributed_ops, num_workers - 1)
    model_helper_obj.net.Proto().num_workers = num_workers
    model_helper_obj.net.Proto().type = net_type

    model_helper_obj._devices = devices
    model_helper_obj._rendezvous = rendezvous
    model_helper_obj._broadcast_context = None
    model_helper_obj._grad_names = []
    assert isinstance(model_helper_obj, model_helper.ModelHelper)

    # Keep track of params that were in the model before: they are not
    # data parallel, so we need to handle them separately
    non_datapar_params = copy.copy(model_helper_obj.params)

    # Add input and model
    log.info("Create input and model training operators")

    num_shards = 1 if rendezvous is None else rendezvous['num_shards']
    loss_scale = 1.0 / (len(devices) * num_shards)

    has_parameter_updates = param_update_builder_fun is not None or \
        optimizer_builder_fun is not None
    assert not (
        param_update_builder_fun is not None and
        optimizer_builder_fun is not None
    ), 'Can only specify one of param_update_builder_fun, optimizer_builder_fun'

    if not has_parameter_updates and model_helper_obj.init_params:
        log.warning("############# WARNING #############")
        log.warning("Model {}/{} is used for testing/validation but".format(
            model_helper_obj.name, model_helper_obj))
        log.warning("has init_params=True!")
        log.warning("This can conflict with model training.")
        log.warning("Please ensure model = ModelHelper(init_params=False)")
        log.warning('####################################')
    losses_by_gpu = {}

    for device in devices:
        device_opt = core.DeviceOption(model_helper_obj._device_type, device)
        with core.DeviceScope(device_opt):
            with core.NameScope("{}_{}".format(model_helper_obj._device_prefix,
                                               device)):
                log.info("Model for {} : {}".format(device_name, device))
                input_builder_fun(model_helper_obj)
                losses = forward_pass_builder_fun(model_helper_obj, loss_scale)
                # Losses are not needed for a test/validation net
                if has_parameter_updates:
                    assert isinstance(losses, list), \
                        'Model builder function must return list of loss blobs'
                    for loss in losses:
                        assert isinstance(loss, core.BlobReference), \
                            'Model builder func must return list of loss blobs'

                losses_by_gpu[device] = losses
    _ValidateParams(model_helper_obj.params)
    model_helper_obj._device_grouped_blobs =\
        _GroupByDevice(model_helper_obj, devices,
                       model_helper_obj.params, non_datapar_params)

    computed_params_grouped =\
        _GroupByDevice(model_helper_obj, devices,
                       model_helper_obj.GetComputedParams(''), [])
    model_helper_obj._device_grouped_blobs.update(computed_params_grouped)

    model_helper_obj._param_names =\
        list(viewkeys(model_helper_obj._device_grouped_blobs))
    model_helper_obj._computed_param_names =\
        list(viewkeys(computed_params_grouped))
    if not has_parameter_updates:
        log.info("Parameter update function not defined --> only forward")
        _InferBlobDevice(model_helper_obj)
        return

    log.info("Adding gradient operators")
    _AddGradientOperators(devices, model_helper_obj, losses_by_gpu)
    if combine_spatial_bn:
        assert(cpu_device), \
            'combine_spatial_bn is currently only supported on the CPU'
        assert(has_parameter_updates), \
            'combine_spatial_bn should only be used for train model'
        _InterleaveOps(model_helper_obj)
        _InterDeviceBatchNormalization(model_helper_obj)

    _ValidateParams(model_helper_obj.params)
    # Group gradients by device and register to blob lookup
    param_to_grad = model_helper_obj.param_to_grad
    grads_ordered = [param_to_grad[p] for p in
                     model_helper_obj.params if p in param_to_grad]
    non_datapar_grads = [param_to_grad[p] for p in non_datapar_params]

    gradients_grouped = _GroupByDevice(
        model_helper_obj,
        devices,
        grads_ordered,
        non_datapar_grads,
    )
    model_helper_obj._device_grouped_blobs.update(gradients_grouped)
    model_helper_obj._grad_names = list(viewkeys(gradients_grouped))
    model_helper_obj._losses_by_gpu = losses_by_gpu

    _InferBlobDevice(model_helper_obj)
    log.info("Add gradient all-reduces for SyncSGD")
    if broadcast_computed_params:
        _BroadcastComputedParams(devices, model_helper_obj, rendezvous, use_nccl)

    if len(model_helper_obj._grad_names) > 0:
        # Gradients in reverse order
        reverse_ordered_grads = _GetReverseOrderedGrads(model_helper_obj)
        assert(len(reverse_ordered_grads) > 0)
        _AllReduceBlobs(
            reverse_ordered_grads,
            devices,
            model_helper_obj,
            model_helper_obj.net,
            rendezvous,
            use_nccl,
            max_concurrent_distributed_ops,
        )
    else:
        log.info("NOTE: Param builder function did not create any parameters.")
    log.info("Post-iteration operators for updating params")
    num_shards = 1 if rendezvous is None else rendezvous['num_shards']

    all_params = set(model_helper_obj.GetParams(''))
    if shared_model:
        _PruneParametersForSharing(model_helper_obj)
    if param_update_builder_fun is not None:
        for device in devices:
            device_opt = core.DeviceOption(model_helper_obj._device_type, device)
            with core.DeviceScope(device_opt):
                with core.NameScope(
                    "{}_{}".format(model_helper_obj._device_prefix, device)
                ):
                    param_update_builder_fun(model_helper_obj)
    else:
        log.info("Calling optimizer builder function")
        optimizer = optimizer_builder_fun(model_helper_obj)
        model_helper_obj._optimizer = optimizer
    (sync_blobs, sync_names) = _ComputeBlobsToSync(model_helper_obj)
    sync_blobs_grouped = _GroupByDevice(
        model_helper_obj,
        devices,
        sync_blobs,
        [],
    )
    model_helper_obj._device_grouped_blobs.update(sync_blobs_grouped)

    _InferBlobDevice(model_helper_obj)
    _AnalyzeOperators(model_helper_obj)

    # Configure dagnet to run with only one worker on the first iteration,
    # to prevent concurrency problems with allocs and nccl.
    arg = model_helper_obj.Proto().arg.add()
    arg.name = "first_iter_only_one_worker"
    arg.i = 1

    # Add initial parameter sync
    log.info("Add initial parameter sync")
    _SyncAllParams(
        devices,
        model_helper_obj,
        model_helper_obj.param_init_net,
        model_helper_obj.param_init_net,
        rendezvous,
        sync_names,
        max_concurrent_distributed_ops=1
    )
    # Handle any operations that need to be done after parameter sync,
    # i.e. making sure multi-precision copies of parameters are up-to-date
    if post_sync_builder_fun is not None:
        for device in devices:
            device_opt = core.DeviceOption(model_helper_obj._device_type, device)
            with core.DeviceScope(device_opt):
                with core.NameScope(
                    "{}_{}".format(model_helper_obj._device_prefix, device)
                ):
                    post_sync_builder_fun(model_helper_obj)
    assert not (optimize_gradient_memory and dynamic_memory_management), \
        """It is not advised to use gradient optimization ('memonger')
        with dynamic memory management."""

    if optimize_gradient_memory:
        _OptimizeGradientMemorySimple(model_helper_obj, losses_by_gpu, devices)

    if dynamic_memory_management:
        _AddDynamicMemoryOptimization(model_helper_obj, blobs_to_keep, devices)

    model_helper_obj._data_parallel_model_init_nets = [
        model_helper_obj.param_init_net,
    ]
    model_helper_obj._data_parallel_model_nets = [model_helper_obj.net]

    if shared_model:
        _RemapParameterBlobsForSharedModel(model_helper_obj, all_params)
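

# Illustrative usage sketch for Parallelize (not part of the original module).
# The builder functions and names below are hypothetical placeholders; they
# only follow the signatures documented in the docstring above:
# input_builder_fun(model), forward_pass_builder_fun(model, loss_scale) and
# optimizer_builder_fun(model).
#
#   train_model = model_helper.ModelHelper(name="train")
#
#   def add_input(model):
#       # Create input ops here; instantiate the reader outside this function
#       # so that all devices share the same reader object.
#       pass
#
#   def add_forward_pass(model, loss_scale):
#       # Build the forward net and return a list of loss blobs, each scaled
#       # by loss_scale.
#       return [loss]
#
#   def add_optimizer(model):
#       # Add an optimizer for the whole model (e.g. via caffe2.python.optimizer).
#       pass
#
#   Parallelize(
#       train_model,
#       input_builder_fun=add_input,
#       forward_pass_builder_fun=add_forward_pass,
#       optimizer_builder_fun=add_optimizer,
#       devices=[0, 1, 2, 3],
#   )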


def Parallelize_GPU_BMUF(*args, **kwargs):
    kwargs['cpu_device'] = False
    Parallelize_BMUF(*args, **kwargs)


def Parallelize_CPU_BMUF(*args, **kwargs):
    kwargs['cpu_device'] = True
    Parallelize_BMUF(*args, **kwargs)


def Parallelize_BMUF(
    model_helper_obj,
    input_builder_fun,
    forward_pass_builder_fun,
    param_update_builder_fun,
    block_learning_rate=1.0,
    block_momentum=None,
    devices=None,
    rendezvous=None,
    net_type='dag',
    master_device=None,
    use_nccl=False,
    nesterov=False,
    optimize_gradient_memory=False,
    reset_momentum_sgd=False,
    warmup_iterations=None,
    max_concurrent_distributed_ops=4,
    add_blobs_to_sync=None,
    num_threads_per_device=4,
    cpu_device=False,
):
    '''
    Function to create a model that runs on many GPUs and creates a net for
    parameter updates that can be run independently for a number of iterations,
    then followed by another net that runs once to compute the final parameter
    updates according to the block-wise model update filtering rule described
    in: Scalable Training of Deep Learning Machines by Incremental Block
    Training with Intra-block Parallel Optimization and Blockwise Model-Update
    Filtering (ICASSP 2016).
    '''
    assert scope.CurrentDeviceScope() is None \
        or scope.CurrentDeviceScope().device_type == caffe2_pb2.CPU, \
        "Parallelize must be called without device-scope, \
        device scope was: {}".format(scope.CurrentDeviceScope())

    assert isinstance(model_helper_obj, model_helper.ModelHelper)

    if devices is None:
        devices = list(range(0, workspace.NumCudaDevices()))
    if master_device is None:
        master_device = devices[0]

    if not cpu_device:
        for gpu in devices:
            if gpu >= workspace.NumCudaDevices():
                log.warning("** Only {} GPUs available, GPUs {} requested".format(
                    workspace.NumCudaDevices(), devices))
        model_helper_obj._device_type = caffe2_pb2.CUDA
        model_helper_obj._device_prefix = "gpu"
    else:
        model_helper_obj._device_type = caffe2_pb2.CPU
        model_helper_obj._device_prefix = "cpu"

    model_helper_obj._devices = devices
    model_helper_obj._rendezvous = rendezvous
    model_helper_obj._broadcast_context = None
    model_helper_obj._shared_model = False
    master_dev_opt = core.DeviceOption(model_helper_obj._device_type, master_device)

    num_shards = rendezvous['num_shards'] if rendezvous else 1
    num_devices = len(devices) * num_shards
    num_workers = num_threads_per_device * len(devices)

    loss_scale = 1.0 / num_devices
    if block_momentum is None:
        block_momentum = 1.0 - 1.0 / num_devices

    max_concurrent_distributed_ops = min(
        max_concurrent_distributed_ops,
        num_workers - 1
    )

    model_helper_obj.net.Proto().num_workers = num_workers
    model_helper_obj.net.Proto().type = net_type

    # A net that initializes the global model state.
    model_helper_obj._global_model_init_net = core.Net('global_model_init')
    model_helper_obj._global_model_init_net.Proto().type = net_type
    model_helper_obj._global_model_init_net.Proto().num_workers = \
        num_workers

    # A net that computes the final (blockwise filtered) parameter updates.
    model_helper_obj._global_model_param_updates_net = core.Net('global_model')
    model_helper_obj._global_model_param_updates_net.Proto().type = net_type
    model_helper_obj._global_model_param_updates_net.Proto().num_workers = \
        num_workers

    def _v(param):
        return "{}_v".format(param)

    def _g(param):
        return "{}_g".format(param)

    def _v_prev(param):
        return "{}_prev".format(param)

    non_datapar_params = copy.copy(model_helper_obj.params)
    model_helper_obj._losses_by_gpu = {}

    def _InitializeModels(gpu_id):
        input_builder_fun(model_helper_obj)
        loss = forward_pass_builder_fun(model_helper_obj, loss_scale)
        model_helper_obj._losses_by_gpu[gpu_id] = loss
    _ForEachDevice(
        devices,
        _InitializeModels,
        device_type=model_helper_obj._device_type,
        device_prefix=model_helper_obj._device_prefix,
        scoped=True
    )
    _ValidateParams(model_helper_obj.params)

    model_helper_obj._device_grouped_blobs =\
        _GroupByDevice(model_helper_obj, devices,
                       model_helper_obj.params, non_datapar_params)

    model_helper_obj._param_names =\
        list(viewkeys(model_helper_obj._device_grouped_blobs))

    _AddGradientOperators(
        devices, model_helper_obj, model_helper_obj._losses_by_gpu
    )
    _ValidateParams(model_helper_obj.params)

    _InferBlobDevice(model_helper_obj)

    def _InitializeParamUpdate(gpu_id):
        param_update_builder_fun(model_helper_obj)
    _ForEachDevice(
        devices,
        _InitializeParamUpdate,
        device_type=model_helper_obj._device_type,
        device_prefix=model_helper_obj._device_prefix,
        scoped=True
    )

    model_parameter_names = list(
        viewkeys(model_helper_obj._device_grouped_blobs)
    )
    if warmup_iterations is not None:
        model_helper_obj._warmup_iterations = warmup_iterations
        # A net for broadcasting the master-device parameters after running
        # the warmup iterations.
        model_helper_obj._warmup_broadcast = core.Net('warmup-broadcast')
        model_helper_obj._warmup_broadcast.Proto().type = net_type
        model_helper_obj._warmup_broadcast.Proto().num_workers = \
            num_workers

        _SyncAllParams(
            devices,
            model_helper_obj,
            model_helper_obj.param_init_net,
            model_helper_obj._warmup_broadcast,
            rendezvous,
            model_parameter_names,
            max_concurrent_distributed_ops
        )
        for param_name in viewkeys(model_helper_obj._device_grouped_blobs):
            param = model_helper_obj._device_grouped_blobs[param_name][master_device]
            with core.DeviceScope(master_dev_opt):
                model_helper_obj._warmup_broadcast.Copy(param, _g(param))

    # Initialize the momentum (_v) and global-model (_g) blobs on the master
    # device.
    for param_name in viewkeys(model_helper_obj._device_grouped_blobs):
        param = model_helper_obj._device_grouped_blobs[param_name][master_device]
        with core.DeviceScope(master_dev_opt):
            model_helper_obj._global_model_init_net.ConstantFill(
                param, _v(param), value=0.0
            )
            model_helper_obj._global_model_init_net.Copy(param, _g(param))
            if nesterov:
                model_helper_obj._global_model_init_net.ConstantFill(
                    param, _v_prev(param), value=0.0
                )

    # Sum the model parameters across devices (and shards) before applying the
    # block update.
    _AllReduceBlobs(
        model_parameter_names,
        devices,
        model_helper_obj,
        model_helper_obj._global_model_param_updates_net,
        rendezvous,
        use_nccl,
        max_concurrent_distributed_ops
    )

    # Blockwise model-update filtering on the master device:
    #   param_avg = sum_over_devices(param) / num_devices
    #   v = block_momentum * v + block_learning_rate * (param_avg - g)
    #   g = g + v   (with an additional Nesterov correction when nesterov=True)
    #   param = g
    for param_name in model_parameter_names:
        param = model_helper_obj._device_grouped_blobs[param_name][master_device]
        with core.DeviceScope(master_dev_opt):
            model_helper_obj._global_model_param_updates_net.Scale(
                param, param, scale=1.0 / num_devices
            )
            model_helper_obj._global_model_param_updates_net.Sub(
                [param, _g(param)], param
            )
            model_helper_obj._global_model_param_updates_net.Scale(
                param, param, scale=block_learning_rate
            )
            model_helper_obj._global_model_param_updates_net.Scale(
                _v(param), _v(param), scale=block_momentum
            )
            model_helper_obj._global_model_param_updates_net.Add(
                [_v(param), param], _v(param)
            )
            model_helper_obj._global_model_param_updates_net.Add(
                [_g(param), _v(param)], _g(param)
            )
            if nesterov:
                model_helper_obj._global_model_param_updates_net.Sub(
                    [_v(param), _v_prev(param)], _v_prev(param)
                )
                model_helper_obj._global_model_param_updates_net.Scale(
                    _v_prev(param), _v_prev(param), scale=block_momentum
                )
                model_helper_obj._global_model_param_updates_net.Sub(
                    [_g(param), _v_prev(param)], _g(param)
                )
                model_helper_obj._global_model_param_updates_net.Copy(
                    _v(param), _v_prev(param)
                )
            model_helper_obj._global_model_param_updates_net.Copy(
                _g(param), param
            )

    # Broadcast the updated global model back to all devices.
    _SyncAllParams(
        devices,
        model_helper_obj,
        model_helper_obj.param_init_net,
        model_helper_obj._global_model_param_updates_net,
        rendezvous,
        model_parameter_names,
        max_concurrent_distributed_ops
    )

    # Add additional syncs
    if add_blobs_to_sync is not None:
        AddBlobSync(
            model_helper_obj,
            add_blobs_to_sync,
            net=model_helper_obj._global_model_param_updates_net)

    if reset_momentum_sgd:
        momentum_ops = [op for op in model_helper_obj.net.Proto().op
                        if op.type == 'MomentumSGDUpdate']
        for op in momentum_ops:
            momentum_blob = op.input[1]
            with core.DeviceScope(op.device_option):
                model_helper_obj._global_model_param_updates_net.ConstantFill(
                    [momentum_blob], momentum_blob, value=0.0
                )

    if optimize_gradient_memory:
        _OptimizeGradientMemorySimple(
            model_helper_obj, model_helper_obj._losses_by_gpu, devices
        )

    model_helper_obj._data_parallel_model_init_nets = [
        model_helper_obj.param_init_net,
        model_helper_obj._global_model_init_net
    ]

    model_helper_obj._data_parallel_model_nets = [
        model_helper_obj.net,
        (model_helper_obj._global_model_param_updates_net, 1)
    ]


def RunInitNet(model):
    for init_net in model._data_parallel_model_init_nets:
        workspace.RunNetOnce(init_net)
    for net_iters in model._data_parallel_model_nets:
        if isinstance(net_iters, tuple):
            workspace.CreateNet(net_iters[0])
        else:
            workspace.CreateNet(net_iters)


def RunWarmup(model):
    workspace.RunNet(model.net, model._warmup_iterations)
    workspace.RunNetOnce(model._warmup_broadcast)


def RunNet(model, num_iterations):
    for net_iter in model._data_parallel_model_nets:
        if isinstance(net_iter, tuple):
            workspace.RunNet(net_iter[0].Proto().name, net_iter[1])
        else:
            workspace.RunNet(net_iter, num_iterations)
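

# Illustrative run-loop sketch (not part of the original module). It assumes a
# model already built by Parallelize or Parallelize_BMUF; `model`, `num_epochs`
# and `iters_per_epoch` are hypothetical names.
#
#   RunInitNet(model)
#   if hasattr(model, "_warmup_iterations"):
#       RunWarmup(model)
#   for epoch in range(num_epochs):
#       RunNet(model, iters_per_epoch)
#       # For Parallelize_BMUF models, the global parameter-update net is
#       # registered to run once per RunNet call, after the local iterations.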


barrier_instance = 0


def Synchronize(model, timeout_sec=_DEFAULT_TIMEOUT_SEC):
    if model._rendezvous is None or model._rendezvous['num_shards'] <= 1:
        # Single-node case: nothing to synchronize
        return

    log.info("Creating synchronization barrier net")
    assert model._rendezvous['engine'] == 'GLOO', "Engine does not support barrier"
    global barrier_instance
    instance = barrier_instance
    barrier_instance += 1
    barrier_net = core.Net("sync_barrier_net_" + str(instance))
    comm_world = _CreateOrCloneCommonWorld(
        barrier_net,
        "sync_barrier_cw_" + str(instance),
        rendezvous=model._rendezvous,
        status_blob="sync_barrier_cw_status_" + str(instance),
        timeout_sec=timeout_sec,
    )
    barrier_net.Barrier(
        inputs=[comm_world],
        outputs=[],
        engine=model._rendezvous['engine'],
        status_blob="sync_barrier_status_" + str(instance),
    )
    workspace.RunNetOnce(barrier_net)


def ConvertNetForDevice(net, device=None):
    '''
    Converts all blobs in the net to have namescope gpu_X, and correct
    device scope. You can use this to enable AppendNet with a
    forward_pass_builder_fun:

       def builder_fun(model):
          ...
          model.net.AppendNet(
             data_parallel_model.ConvertNetForDevice(othermodel.net))
          model.param_init_net.AppendNet(
             data_parallel_model.ConvertNetForDevice(othermodel.param_init_net))
    '''
    mnet = copy.deepcopy(net)

    if device is None:
        device = scope.CurrentDeviceScope()

    device_prefix = "gpu" if device.device_type == caffe2_pb2.CUDA else "cpu"

    namescope = "{}_{}/".format(device_prefix, device.cuda_gpu_id)
    for op in mnet.Proto().op:
        if "RecurrentNetwork" in op.type:
            raise NotImplementedError(
                "RecurrentNetwork conversion not yet supported")
        for i, inputb in enumerate(op.input):
            op.input[i] = namescope + inputb
        for i, outputb in enumerate(op.output):
            op.output[i] = namescope + outputb
        for i, blob in enumerate(op.control_input):
            op.control_input[i] = namescope + blob
        op.device_option.CopyFrom(device)
    for i, einp in enumerate(mnet.Proto().external_input):
        mnet.Proto().external_input[i] = namescope + einp
    for i, eoutp in enumerate(mnet.Proto().external_output):
        mnet.Proto().external_output[i] = namescope + eoutp
    return mnet


def _ForEachDevice(devices, f, device_type, device_prefix, scoped=False,
                   *args, **kwargs):
    for device in devices:
        device_opt = core.DeviceOption(device_type, device)
        with core.DeviceScope(device_opt):
            if scoped:
                with core.NameScope("{}_{}".format(device_prefix, device)):
                    f(device, *args, **kwargs)
            else:
                f(device, *args, **kwargs)


def _AddGradientOperators(devices, model, losses_by_gpu):
    def create_grad(lossp):
        return model.ConstantFill(lossp, str(lossp) + "_grad", value=1.0)

    # Create a constant (1.0) gradient blob for each loss on each device
    loss_grad = {}
    for gpu_id in devices:
        device = core.DeviceOption(model._device_type, gpu_id)
        with core.DeviceScope(device):
            for l in losses_by_gpu[gpu_id]:
                lg = create_grad(l)
                loss_grad[str(l)] = str(lg)

    model.AddGradientOperators(loss_grad)


def ExtractPredictorNet(model, inputs, outputs, device):
    '''
    Returns (net, params) that can be exported to be used as a prediction
    net.
    '''
    master_device = model._devices[0]
    prefix = "{}_{}/".format(model._device_prefix, master_device)
    prefix_inputs = [prefix + str(b) for b in inputs]
    prefix_outputs = [prefix + str(b) for b in outputs]
    (predictor_net, export_blobs) = model_helper.ExtractPredictorNet(
        net_proto=model.net.Proto(),
        input_blobs=prefix_inputs,
        output_blobs=prefix_outputs,
        device=device,
        renames={
            a: b
            for (a, b) in zip(prefix_inputs + prefix_outputs, inputs + outputs)
        },
    )

    return (predictor_net, export_blobs)
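

# Illustrative sketch (not part of the original module): exporting a
# single-device prediction net from a parallelized training model. The blob
# names "data" and "softmax" are hypothetical.
#
#   (predict_net, export_blobs) = ExtractPredictorNet(
#       train_model,
#       inputs=["data"],
#       outputs=["softmax"],
#       device=core.DeviceOption(caffe2_pb2.CPU),
#   )
#   # The requested inputs/outputs are renamed back to their unprefixed names;
#   # export_blobs lists the blobs (e.g. parameters) the net depends on.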


def GetCheckpointParams(model):
    '''
    Returns a set of blobs that are needed for a complete check point.
    They are blobs for the first gpu and iteration blobs.
    '''
    (all_blobs, _) = _ComputeBlobsToSync(model)
    first_gpu_blobs = {
        b
        for b in all_blobs
        if str(b)
        .startswith("{}_{}/".format(model._device_prefix, model._devices[0]))
    }

    # Add iteration blobs that do not have a namescope separately, since
    # it is important to checkpoint the iteration counter.
    iteration_blobs = set()
    for op in model.net.Proto().op:
        if op.type == 'Iter' or op.type == 'AtomicIter':
            if not op.output[0].startswith("{}_".format(model._device_prefix)):
                iteration_blobs.add(op.output[0])

    return first_gpu_blobs.union(iteration_blobs)


def FinalizeAfterCheckpoint(model, blobs=None):
    '''
    This function should be called after loading parameters from a
    checkpoint / initial parameters file.
    '''

    if not hasattr(model, "_checkpoint_net"):
        if blobs is None:
            (_, uniq_blob_names) = _ComputeBlobsToSync(model)
        else:
            uniq_blob_names = [stripBlobName(p) for p in blobs]

        log.info("Creating checkpoint synchronization net")
        devices = model.GetDevices()
        for name in uniq_blob_names:
            if name not in model._device_grouped_blobs:
                grouped = {
                    d:
                    core.BlobReference("{}_{}{}{}".format(
                        model._device_prefix,
                        d,
                        scope._NAMESCOPE_SEPARATOR,
                        name)
                    ) for d in devices}
                model._device_grouped_blobs[name] = grouped

        model._checkpoint_net = core.Net("checkpoint_sync_net")
        model._checkpoint_net.RunAllOnGPU()

        checkpoint_init_net = None
        if (model._rendezvous is not None and model._rendezvous['num_shards'] > 1):
            checkpoint_init_net = core.Net("checkpoint_init_net")
            checkpoint_init_net.RunAllOnGPU()

        _SyncAllParams(
            devices,
            model,
            checkpoint_init_net,
            model._checkpoint_net,
            model._rendezvous,
            uniq_blob_names,
            max_concurrent_distributed_ops=1
        )
        if (checkpoint_init_net):
            workspace.RunNetOnce(checkpoint_init_net)

        workspace.CreateNet(model._checkpoint_net)

    # Run the sync
    log.info("Run checkpoint net")
    workspace.RunNet(model._checkpoint_net.Proto().name)
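

# Illustrative checkpointing sketch (not part of the original module). The
# save/load steps are placeholders for whatever checkpointing code is used.
#
#   # Save: persist only the master-device parameters plus iteration counters.
#   blobs_to_save = GetCheckpointParams(train_model)
#   # ... serialize the blobs in blobs_to_save ...
#
#   # Restore: load the blobs back into the workspace, then broadcast the
#   # master copy to every device (and shard) before resuming training.
#   # ... deserialize blobs into the workspace ...
#   FinalizeAfterCheckpoint(train_model)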


def GetLearningRateBlobNames(model):
    '''
    Returns a list of learning rate blob names used in the optimizer.
    '''
    if model._optimizer is not None:
        if model._device_type == caffe2_pb2.CPU:
            return [model._optimizer.get_cpu_blob_name('lr')]
        elif model._device_type == caffe2_pb2.CUDA:
            return [model._optimizer.get_gpu_blob_name('lr', gpu, '')
                    for gpu in model._devices]
        else:
            raise Exception(
                "Unsupported device type : {}".format(model._device_type)
            )
    else:
        lr_blob_names = []
        for op in model.net.Proto().op:
            if op.type == "LearningRate":
                lr_blob_names.append(op.output[0])
        return lr_blob_names


def _Broadcast(devices, model, net, param, use_nccl=False):
    # Copy params from the master device to the others
    master_dev = devices[0]

    if use_nccl:
        if _IsGPUBlob(model, param):
            master_device_opt = core.DeviceOption(model._device_type, master_dev)
            with core.DeviceScope(master_device_opt):
                # Note: the root is the root _rank_, not the root _device_,
                # so root=0 is used regardless of the devices involved.
                net.NCCLBroadcast(
                    list(viewvalues(model._device_grouped_blobs[param])),
                    list(viewvalues(model._device_grouped_blobs[param])),
                    root=0,
                )
                return

    for dev_idx in devices[1:]:
        if _IsGPUBlob(model, param):
            device_opt = core.DeviceOption(caffe2_pb2.CUDA, dev_idx)
        else:
            device_opt = core.DeviceOption(caffe2_pb2.CPU, 0)
        with core.DeviceScope(device_opt):
            net.Copy(
                model._device_grouped_blobs[param][master_dev],
                model._device_grouped_blobs[param][dev_idx]
            )


def _AllReduce(devices, model, net, param, use_nccl=False, control_input=None):
    blobs_group = list(viewvalues(model._device_grouped_blobs[param]))
    if model._device_type == caffe2_pb2.CUDA and use_nccl:
        model.NCCLAllreduce(
            blobs_group, blobs_group, control_input=control_input
        )
        return

    if model._device_type == caffe2_pb2.CUDA:
        p2p_access_pattern = workspace.GetCudaPeerAccessPattern()
    else:
        p2p_access_pattern = None

    def sumN(*dev_indices):
        """Create a Sum op for 2 or more blobs on different devices.
        Saves the result on the first device.

        Arguments:
        dev_indices -- a list of device indices, which can be translated into
                       CUDA identifiers with model._devices
        """
        devices = [model._devices[idx] for idx in dev_indices]
        blobs = [blobs_group[idx] for idx in dev_indices]
        for i, peer in enumerate(devices):
            if i == 0:
                continue  # Skip the first device
            if p2p_access_pattern is not None and not p2p_access_pattern[
                devices[0], peer
            ]:
                # Copy from the peer device to the first device
                blobs[i] = model.Copy(
                    blobs[i],
                    'gpu_{}/{}_gpu{}_copy'.format(devices[0], param, peer)
                )
        device_opt = core.DeviceOption(model._device_type, devices[0])
        with core.DeviceScope(device_opt):
            net.Sum(blobs, [blobs[0]], name='dpm')

    if len(devices) == 16:
        # Special tree reduction for 16 gpus
        for j in range(8):
            sumN(j * 2, j * 2 + 1)
        for j in range(4):
            sumN(j * 4, j * 4 + 2)
        for j in range(2):
            sumN(j * 8, j * 8 + 4)
        sumN(0, 8)
    elif len(devices) == 8:
        for j in range(4):
            sumN(j * 2, j * 2 + 1)
        for j in range(2):
            sumN(j * 4, j * 4 + 2)
        sumN(0, 4)
    elif len(devices) == 4:
        sumN(0, 1)
        sumN(2, 3)
        sumN(0, 2)
    else:
        sumN(*range(len(devices)))

    _Broadcast(devices, model, net, param)


def _SyncAllParams(
    devices,
    model,
    init_net,
    net,
    rendezvous,
    unique_param_names,
    max_concurrent_distributed_ops=4
):
    if rendezvous is None or rendezvous['num_shards'] <= 1:
        _SyncAllParamsSingleHost(devices, model, net, unique_param_names)
    else:
        _SyncAllParamsDistributed(
            devices,
            model,
            init_net,
            net,
            rendezvous,
            unique_param_names,
            max_concurrent_distributed_ops
        )


def AddBlobSync(model, blobs, net=None):
    '''
    Sync a blob across devices and hosts
    '''
    net = model.net if net is None else net
    for b in blobs:
        assert not b.startswith(model._device_prefix), \
            "Provide unprefixed blob name: {}".format(b)
        model._device_grouped_blobs[b] = {
            d: core.BlobReference("{}_{}/{}".format(model._device_prefix, d, b))
            for d in model._devices
        }

    _SyncAllParams(
        model._devices,
        model,
        model.param_init_net,
        net,
        model._rendezvous,
        set(blobs))


def AddDistributedBlobSync(model, blobs):
    '''
    Sync blobs across machines (but not across devices)
    '''
    if model._rendezvous is None:
        return
    synth_name = "_".join([str(b) for b in blobs])
    comm_world = _CreateOrCloneCommonWorld(
        model.param_init_net,
        "blob_sync_cw_" + synth_name,
        rendezvous=model._rendezvous,
        status_blob="create_blob_sync_cw_{}_cw_status".format(
            synth_name,
        ),
    )

    model.net.Allreduce(
        inputs=[comm_world] + blobs,
        outputs=blobs,
        engine=model._rendezvous['engine'],
        status_blob="blob_sync_allred_{}_status".format(synth_name),
    )


def _SyncAllParamsDistributed(
    devices,
    model,
    init_net,
    net,
    rendezvous,
    unique_param_names,
    max_concurrent_distributed_ops
):
    assert rendezvous['num_shards'] > 1

    gpu_device_opt = core.DeviceOption(model._device_type, devices[0])
    cpu_device_opt = core.DeviceOption(caffe2_pb2.CPU)

    if model._broadcast_context is None:
        model._broadcast_context = CollectivesConcurrencyControl(
            "broadcast",
            max_concurrent_distributed_ops,
            init_net,
            rendezvous
        )
    context = model._broadcast_context

    for param_name in sorted(unique_param_names):
        master_param = model._device_grouped_blobs[param_name][devices[0]]
        params_group = list(viewvalues(model._device_grouped_blobs[param_name]))

        def broadcast(params):
            comm_world, control_input = context.get_control_and_context(params)
            net.Broadcast(
                inputs=[comm_world] + params,
                outputs=params,
                engine=rendezvous['engine'],
                status_blob="broadcast_{}_status".format(str(param_name)),
                control_input=control_input
            )

        device_opt = gpu_device_opt if _IsGPUBlob(
            model, param_name
        ) else cpu_device_opt

        if rendezvous['engine'] == 'GLOO':
            with core.DeviceScope(device_opt):
                broadcast(params_group)
        else:
            # Copy between GPU and CPU
            with core.DeviceScope(device_opt):
                param_cpu = net.CopyGPUToCPU(
                    master_param,
                    str(master_param) + "cpu"
                )
            with core.DeviceScope(cpu_device_opt):
                broadcast([param_cpu])
            with core.DeviceScope(device_opt):
                net.CopyCPUToGPU(param_cpu, master_param)

        # Broadcast locally
        _Broadcast(devices, model, net, param_name)


def _SyncAllParamsSingleHost(devices, model, net, unique_param_names):
    for param in unique_param_names:
        _Broadcast(devices, model, net, param)


def _AllReduceBlobs(blob_names, devices, model, net, rendezvous, use_nccl,
                    max_concurrent_distributed_ops):
    if rendezvous is None or rendezvous['num_shards'] <= 1:
        _AllReduceBlobsSingleHost(
            blob_names,
            devices,
            model,
            net,
            use_nccl
        )
    else:
        _AllReduceBlobsDistributed(
            blob_names,
            devices,
            model,
            net,
            rendezvous,
            max_concurrent_distributed_ops,
        )


def _PruneParametersForSharing(model):
    assert model._shared_model
    master_prefix = "{}_{}/".format(model._device_prefix, model._devices[0])

    model.params = model.GetParams(master_prefix)
    paramset = set(model.params)

    model.param_to_grad = {
        p: model.param_to_grad[p]
        for p in model.param_to_grad if p in paramset
    }
    model.weights = [w for w in model.weights if w in paramset]
    model.biases = [w for w in model.biases if w in paramset]


def _RemapParameterBlobsForSharedModel(model, all_params):
    assert model._shared_model
    master_prefix = "{}_{}/".format(
        model._device_prefix, model._devices[0])
    log.info("Remapping param blobs to master -> {}".format(master_prefix))
    master_params = set(model.GetParams())

    # Remove all but master params
    def modify_ops(net):
        ops = []
        for op in net.Proto().op:
            delete_op = False
            # Delete ops that output a non-master copy of a parameter
            for outp in op.output:
                if outp in all_params and outp not in master_params:
                    delete_op = True
                    log.debug("Delete b/c {}: {}".format(outp, str(op)))
                    break
            if delete_op:
                continue
            # Remap inputs to point to the master copy
            for j, inp in enumerate(op.input):
                if inp in all_params and inp not in master_params:
                    op.input[j] = master_prefix + stripBlobName(inp)
            ops.append(op)
        del net.Proto().op[:]
        net.Proto().op.extend(ops)

    modify_ops(model.param_init_net)
    modify_ops(model.net)


class CollectivesConcurrencyControl(object):
    """
    Creates common worlds (up to max_concurrent_context) and manages the
    sequential execution of collectives that share the same context with
    cyclic control inputs.
    """
    def __init__(
        self,
        name,
        max_concurrent_context,
        param_init_net,
        rendezvous
    ):
        self.name = name
        self.param_init_net = param_init_net
        self.max_concurrent_context = max_concurrent_context
        self.counter = 0
        self.common_worlds = []
        self.control_inputs = []
        self.rendezvous = rendezvous

    def get_control_and_context(self, control_output_blob):
        common_world, control_input = [None, None]
        current_slot = self.counter % self.max_concurrent_context
        if len(self.common_worlds) < self.max_concurrent_context:
            common_world = _CreateOrCloneCommonWorld(
                self.param_init_net,
                "{}_{}_cw".format(self.name, current_slot),
                rendezvous=self.rendezvous,
                status_blob="create_{}_cw_{}_status".format(
                    self.name,
                    current_slot
                ),
            )
            self.common_worlds.append(common_world)
            self.control_inputs.append(control_output_blob)
        else:
            common_world = self.common_worlds[current_slot]
            control_input = self.control_inputs[current_slot]
            self.control_inputs[current_slot] = control_output_blob
        self.counter += 1
        return common_world, control_input


def _AllReduceBlobsDistributed(
    blob_names,
    devices,
    model,
    net,
    rendezvous,
    max_concurrent_distributed_ops,
):
    num_workers = model.net.Proto().num_workers
    assert num_workers > 1, "Please specify more than 1 worker"
    all_reduce_engine = rendezvous['engine']

    master_device_opt = core.DeviceOption(model._device_type, devices[0])

    reducing_device_opt = master_device_opt

    context = CollectivesConcurrencyControl(
        "allreduce",
        max_concurrent_distributed_ops,
        model.param_init_net,
        rendezvous
    )

    nccl_control_blob = None

    for blob_name in blob_names:
        master_blob = model._device_grouped_blobs[blob_name][devices[0]]
        blobs_group = list(viewvalues(model._device_grouped_blobs[blob_name]))

        assert master_blob in blobs_group

        # Remark: NCCLReduce does not support in-place modifications
        # so we need a temporary blob
        reduced_blob = str(master_blob) + "_red"

        def allreduce(blobs, **kwargs):
            with core.DeviceScope(reducing_device_opt):
                comm_world, control_input = \
                    context.get_control_and_context(blobs[0])
                net.Allreduce(
                    inputs=[comm_world] + blobs,
                    outputs=blobs,
                    engine=all_reduce_engine,
                    control_input=control_input,
                    status_blob="allreduce_{}_status".format(blob_name),
                    **kwargs
                )

        if rendezvous['engine'] == 'GLOO':
            # With Gloo, cross-GPU and cross-machine allreduce can be executed
            # in a single operation; use GPUDirect when transport is ibverbs.
            allreduce(
                blobs_group,
                gpu_direct=(rendezvous.get("transport", None) == "ibverbs"),
            )
        else:
            # Step 1: sum blobs from local GPUs to the master GPU.
            with core.DeviceScope(master_device_opt):
                model.ConstantFill(master_blob, reduced_blob, value=0.0)

                net.NCCLAllreduce(
                    blobs_group,
                    blobs_group,
                    control_input=nccl_control_blob,
                )
                nccl_control_blob = blobs_group[0]
                net.Copy(master_blob, reduced_blob)

            # Step 2: allreduce between all hosts, between master GPUs
            allreduce([reduced_blob])

            with core.DeviceScope(master_device_opt):
                net.Copy(reduced_blob, master_blob)

            # Step 3: broadcast locally
            _Broadcast(devices, model, net, blob_name)


def _AllReduceBlobsSingleHost(blob_names, devices, model, net, use_nccl):
    """Performs NCCL AllReduce to distribute blobs to all the GPUs."""

    if len(devices) == 1:
        return

    # Pick the first device as the master for the reductions.
    master_device_opt = core.DeviceOption(model._device_type, devices[0])
    last_out = None
    concatenated_idx = set()

    for blob_name in blob_names:
        blobs_group = list(viewvalues(model._device_grouped_blobs[blob_name]))
        if len(blobs_group) == 1:
            continue
        assert len(blobs_group) == len(devices), \
            "Each GPU from {}, should have a copy of {}.".format(
                devices, blob_name)

        if _IsGPUBlob(model, blob_name):
            with core.DeviceScope(master_device_opt):
                if not isinstance(blobs_group[0], core.GradientSlice):
                    _AllReduce(
                        devices, model, net, blob_name, use_nccl, last_out
                    )
                    # last_out is used to serialize the execution of nccl ops
                    last_out = blobs_group[0]

                else:
                    master_ns = "{}_{}".format(model._device_prefix, devices[0])
                    # Skip if we have already copied concatenated indices
                    # to the indices of GradientSlice. This happens when two
                    # or more grad blobs are gathered with the same indices
                    # blob.
                    skip_idx_concat = False
                    for g in blobs_group:
                        if g.indices in concatenated_idx:
                            skip_idx_concat = True

                    if not skip_idx_concat:
                        grad_idx_concat, _ = net.Concat(
                            [g.indices for g in blobs_group],
                            ["{}/{}_index_concat".format(master_ns, blob_name),
                             "{}/{}_index_splitinfo".format(master_ns, blob_name)],
                            axis=0, name="note:data_parallel_model")

                        for gpu, g in viewitems(model._device_grouped_blobs[blob_name]):
                            device_opt = core.DeviceOption(model._device_type, gpu)
                            with core.DeviceScope(device_opt):
                                model.Copy(grad_idx_concat, g.indices)
                                concatenated_idx.add(g.indices)

                    grad_val_concat, _ = net.Concat(
                        [g.values for g in blobs_group],
                        ["{}/{}_val_concat".format(master_ns, blob_name),
                         "{}/{}_val_splitinfo".format(master_ns, blob_name)],
                        axis=0, name="note:data_parallel_model")

                    for gpu, g in viewitems(model._device_grouped_blobs[blob_name]):
                        device_opt = core.DeviceOption(model._device_type, gpu)
                        with core.DeviceScope(device_opt):
                            model.Copy(grad_val_concat, g.values)

        else:
            assert not isinstance(blobs_group[0], core.GradientSlice), \
                "Synchronizing gradient slices not supported"
            with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
                # Poor man's allreduce
                net.Sum(blobs_group, [blobs_group[0]])
                if not model._shared_model:
                    _Broadcast(devices, model, net, blob_name)


def _BroadcastComputedParams(devices, model, rendezvous, use_nccl=False):
    if rendezvous is None:
        _BroadcastComputedParamsSingleHost(devices, model, use_nccl)
    else:
        _BroadcastComputedParamsDistributed(devices, model, rendezvous, use_nccl)


def _BroadcastComputedParamsDistributed(
    devices,
    model,
    rendezvous,
    use_nccl=False
):
    _BroadcastComputedParamsSingleHost(devices, model, use_nccl)
    log.warn("Distributed broadcast of computed params is not implemented yet")


def _BroadcastComputedParamsSingleHost(devices, model, use_nccl=False):
    '''
    Average computed params over all devices
    '''
    if len(devices) == 1:
        return

    for param_name in model._computed_param_names:
        # Copy from the master device to the others
        _Broadcast(devices, model, model.net, param_name, use_nccl)


def _GetReverseOrderedGrads(model):
    '''
    Returns the gradients in reverse order (namespace stripped),
    for the optimal synchronization order.
    '''
    return list(reversed(model._grad_names))


def stripBlobName(param):
    # Strip the device namescope, e.g. "gpu_0/fc_w" -> "fc_w"
    if isinstance(param, core.GradientSlice):
        return stripBlobName(param.indices) + ":" + stripBlobName(param.values)
    else:
        name = str(param)
        return name[name.index(scope._NAMESCOPE_SEPARATOR) + 1:]


def _AnalyzeOperators(model):
    '''
    Look at all the operators and check that they do not cross device scopes
    '''
    for op in model.Proto().op:
        if "NCCL" in op.type or "Copy" in op.type or "Concat" in op.type:
            continue
        if "Sum" == op.type and op.name == "dpm":
            continue
        if "Allreduce" in op.type and "GLOO" in op.engine:
            continue

        op_dev = op.device_option
        op_gpu = op_dev.cuda_gpu_id

        # This avoids failing on operators that are only for CPU
        if op_dev.device_type != caffe2_pb2.CUDA:
            continue

        namescope = "{}_{}/".format(model._device_prefix, op_gpu)
        for inp in list(op.input) + list(op.output):
            if inp.startswith("{}_".format(model._device_prefix)
                              ) and not inp.startswith(namescope):
                raise Exception(
                    "Blob {} of op {}, should have namescope {}. Op: {}".format(
                        inp,
                        op.type,
                        "{}_{}/".format(model._device_prefix, op_gpu),
                        str(op),
                    )
                )


def _InferBlobDevice(model):
    '''
    Assign blob to device option based on the operator outputing it
    '''
    mapping = {}

    def map_ops(proto):
        for op in proto.op:
            device_option = op.device_option
            if op.type == "Iter":
                # Hack for Iter ops whose blob lives in CPU context
                device_option = caffe2_pb2.DeviceOption()
                device_option.device_type = caffe2_pb2.CPU
            for b in list(op.input) + list(op.output):
                if b not in mapping:
                    mapping[b] = device_option
            if op.type.startswith('RecurrentNetwork'):
                step_args = [a for a in op.arg if a.name.endswith("step_net")]
                for step_arg in step_args:
                    map_ops(step_arg.n)
    map_ops(model.param_init_net.Proto())
    map_ops(model.net.Proto())
    model._blob_to_device = mapping


def _IsGPUBlob(model, blob_name):
    if blob_name in model._blob_to_device:
        return model._blob_to_device[blob_name].device_type == caffe2_pb2.CUDA
    else:
        blob_name = "{}_{}/{}".format(
            model._device_prefix, model._devices[0], blob_name
        )
        if blob_name not in model._blob_to_device:
            return model._device_type == caffe2_pb2.CUDA
        return model._blob_to_device[blob_name].device_type == caffe2_pb2.CUDA


def _GroupByDevice(model, devices, params, non_data_params):
    '''
    Groups blobs by device, returning a map of [blobname] = {0: BlobRef, 1: ..}.
    Returns ordered dictionary, ensuring the original order.
    '''
    grouped = OrderedDict()
    # Only consider params that were created to be "data parallel"
    params = params[len(non_data_params):]

    for _i, p in enumerate(params):
        assert isinstance(p, core.BlobReference) or \
            isinstance(p, core.GradientSlice), \
            "Param {} is not BlobReference or GradientSlice".format(p)

        name = stripBlobName(p)
        gpuid = None

        if isinstance(p, core.BlobReference):
            gpuid = int(p.GetNameScope().split("_")[1].split("/")[0])
            assert "{}_{}/".format(model._device_prefix, gpuid) in p.GetNameScope(),\
                "Param {} expected to have namescope '{}_{}'".format(str(p), model._device_prefix, gpuid)
        else:
            gpuid = int(p.indices.GetNameScope().split("_")[1].split("/")[0])
            assert "{}_{}/".format(model._device_prefix, gpuid) in p.indices.GetNameScope(),\
                "Indices {} expected to have namescope '{}_{}'".format(str(p), model._device_prefix, gpuid)
            assert "{}_{}/".format(model._device_prefix, gpuid) in p.values.GetNameScope(),\
                "Values {} expected to have namescope '{}_{}'".format(str(p), model._device_prefix, gpuid)

        if name not in grouped:
            grouped[name] = {}
        grouped[name][gpuid] = p

    return grouped


def _ValidateParams(params):
    set_params = set(params)
    if len(params) > len(set_params):
        dupes = []
        sp = sorted(str(p) for p in params)
        for j, p in enumerate(sp):
            if j > 0 and sp[j - 1] == p:
                dupes.append(p)

        assert len(params) == len(set_params), \
            "Duplicate entries in params: {}".format(dupes)


def _ComputeBlobsToSync(model):
    '''
    We sync all blobs that are generated by param init net and
    are 'data parallel', i.e assigned to a device
    '''
    sync_names = set()

    # We do not sync params if the model is shared
    if model._shared_model:
        blobs_to_sync = [str(p) for p in model.GetComputedParams('')]
        sync_names = [stripBlobName(p) for p in blobs_to_sync]
    else:
        blobs_to_sync = []

        for op in model.param_init_net.Proto().op:
            dp_outputs = [
                o for o in op.output
                if o.startswith("{}_".format(model._device_prefix))
            ]
            sync_names.update([stripBlobName(o) for o in dp_outputs])
            blobs_to_sync.extend(dp_outputs)

        # Sanity check
        diff = set(model._param_names) - sync_names
        assert diff == set(), \
            "Some params not instantiated in param init net: {}".format(diff)

    # Remove duplicates and sort
    prefixlen = len(model._device_prefix) + 1

    def extract_sort_key(b):
        # Sort first based on device id, and then by whole string
        deviceid = int(b[prefixlen:b.index(scope._NAMESCOPE_SEPARATOR)])
        return (deviceid, b)

    blobs_to_sync = sorted(
        list(set(blobs_to_sync)),
        key=extract_sort_key)

    blobs_to_sync = [core.BlobReference(b) for b in blobs_to_sync]
    return (blobs_to_sync, sync_names)


def _OptimizeGradientMemorySimple(model, losses_by_gpu, devices):
    log.warning("------- DEPRECATED API, please use " +
                "data_parallel_model.OptimizeGradientMemory() ----- ")
    for device in devices:
        namescope = "{}_{}/".format(model._device_prefix, device)
        model.net._net = memonger.share_grad_blobs(
            model.net,
            losses_by_gpu[device],
            set(viewvalues(model.param_to_grad)),
            namescope,
            share_activations=False,
        )


def _AddDynamicMemoryOptimization(model, blobs_to_keep, devices):
    blobs_to_keep_all_devices = set()
    if blobs_to_keep is not None:
        for device in devices:
            for blob_name in blobs_to_keep:
                blobs_to_keep_all_devices.add(
                    "{}_{}/{}".format(model._device_prefix, device, blob_name)
                )

    if model._rendezvous is not None:
        # GLOO operators expect the tensor addresses to remain the same over
        # iterations, so the gradient blobs cannot be freed.
        blobs_to_keep_all_devices.update(
            [str(b) for b in viewvalues(model.param_to_grad)]
        )

    model.net._net = memonger.release_blobs_when_used(
        model.net.Proto(),
        blobs_to_keep_all_devices
    )


def OptimizeGradientMemory(model,
                           input_shapes,
                           excluded_blobs,
                           recycle_activations):
    """
    Optimize memory usage of the backward pass by recycling blobs for gradient
    inputs that have been 'used'.
    input_shapes:  dict of blob name to shape for the inputs of the model.
                   Pass empty dictionary if not known.
    excluded_blobs: list of blobs that cannot be recycled. These are blobs
                   that you will access externally.
    recycle_activations: whether to also recycle forward pass activations
    """
    if input_shapes is not None:
        input_shapes_all_devices = {}
        for b, shp in viewitems(input_shapes):
            for d in model._devices:
                input_shapes_all_devices["{}_{}/{}".
                                         format(model._device_prefix, d, b)] = shp

        (shapes, types) = workspace.InferShapesAndTypes(
            [model.param_init_net, model.net],
            input_shapes_all_devices,
        )
    else:
        shapes = None

    for device in model._devices:
        namescope = "{}_{}/".format(model._device_prefix, device)
        excluded_blobs_by_device = set(namescope + b for b in excluded_blobs)
        model.net._net = memonger.share_grad_blobs(
            model.net,
            model._losses_by_gpu[device],
            set(viewvalues(model.param_to_grad)),
            namescope,
            dont_share_blobs=excluded_blobs_by_device,
            share_activations=recycle_activations,
            blob_shapes=shapes,
        )
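

# Illustrative usage sketch (not part of the original module). Call after
# Parallelize; the blob name "data" and its shape are hypothetical.
#
#   OptimizeGradientMemory(
#       train_model,
#       input_shapes={"data": (batch_per_device, 3, 224, 224)},
#       excluded_blobs=[],
#       recycle_activations=False,
#   )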


def _CreateOrCloneCommonWorld(
        net,
        common_world_blob,
        rendezvous,
        name=None,
        status_blob=None,
        timeout_sec=None):

    if timeout_sec is None:
        timeout_sec = _DEFAULT_TIMEOUT_SEC

    timeout_ms = timeout_sec * 1000

    # If the net already has a CreateCommonWorld op with the same timeout,
    # clone the common world it creates instead of making a new one.
    existing = None
    for op in net.Proto().op:
        if op.type != "CreateCommonWorld":
            continue

        op_timeout_ms = -1
        for arg in op.arg:
            if arg.name == 'timeout_ms':
                op_timeout_ms = arg.i
                break
        if op_timeout_ms != timeout_ms:
            continue

        existing = op.output[0]
        break

    if name is None:
        name = "{}_op".format(common_world_blob)

    if existing is not None:
        comm_world = net.CloneCommonWorld(
            [existing],
            common_world_blob,
            name=name,
            engine=rendezvous['engine'],
            status_blob=status_blob,
        )
    else:
        kwargs = dict()
        if 'transport' in rendezvous:
            kwargs['transport'] = rendezvous['transport']
        if 'interface' in rendezvous:
            kwargs['interface'] = rendezvous['interface']
        if 'mpi_rendezvous' in rendezvous:
            kwargs['mpi_rendezvous'] = rendezvous['mpi_rendezvous']
        comm_world = net.CreateCommonWorld(
            rendezvous['kv_handler'] or [],
            common_world_blob,
            name=name,
            size=rendezvous['num_shards'],
            rank=rendezvous['shard_id'],
            engine=rendezvous['engine'],
            status_blob=status_blob,
            timeout_ms=timeout_ms,
            **kwargs
        )

    return comm_world


def _RunComparison(model, blob_name, device=None):
    if device is None:
        device = model._blob_to_device[blob_name]
    with core.DeviceScope(device):
        rendezvous = model._rendezvous
        if rendezvous is None or rendezvous['num_shards'] == 1:
            return

        test_data_arr = np.zeros(rendezvous['num_shards']).astype(np.float32)
        test_data_arr[rendezvous['shard_id']] = 1
        workspace.FeedBlob("compare_arr", test_data_arr)

        comparison_net = core.Net("allcompare_net")

        kwargs = {}
        if 'mpi_rendezvous' in rendezvous:
            kwargs['mpi_rendezvous'] = rendezvous['mpi_rendezvous']
        comm_world = comparison_net.CreateCommonWorld(
            rendezvous['kv_handler'] or [],
            "allcompare_cw",
            name=model.net.Proto().name + ".cw_master_select",
            size=rendezvous['num_shards'],
            rank=rendezvous['shard_id'],
            engine=rendezvous['engine'],
            status_blob="cw_master_select",
            **kwargs
        )

        blob_name_checksum = blob_name + "_checksum"
        comparison_net.SumSqrElements(
            [blob_name], [blob_name_checksum], average=False
        )

        blob_name_gather = blob_name + "_gather"
        comparison_net.Mul(
            inputs=["compare_arr", blob_name_checksum],
            outputs=blob_name_gather,
            broadcast=1
        )

        comparison_net.Allreduce(
            inputs=[comm_world, blob_name_gather],
            outputs=[blob_name_gather],
            engine=rendezvous['engine'],
            status_blob="all_reduce_master_select_status",
        )

        workspace.RunNetOnce(comparison_net)
        gather_arr = workspace.FetchBlob(blob_name_gather)

        baseline = gather_arr[0]
        for i in range(rendezvous['num_shards']):
            assert gather_arr[i] == baseline, \
                "allcompare failed on shard {}.".format(rendezvous['shard_id'])


def _InterleaveOps(model):
    '''
    Data Parallel Model creates a net with ops in one device grouped together.
    This will interleave the ops so that each op for each device is next
    to each other in the net. Kind of like combining decks of cards. This
    ensures that progress is made along the critical path roughly concurrently
    for each device, which is important due to the extra intra-node
    synchronization required for multi-device batch normalization.
    '''
    orig_ops = list(model.net.Proto().op)
    num_devices = len(model._devices)
    num_ops_per_dev = len(orig_ops) // num_devices
    assert num_devices * num_ops_per_dev == len(orig_ops), \
        'Number of ops per device in original net is not uniform'

    new_ops = []
    ops = {d: [] for d in range(num_devices)}
    for op in orig_ops:
        ops[op.device_option.cuda_gpu_id].append(op)

    for j in range(num_ops_per_dev):
        tp = None
        for d in model._devices:
            if tp is None:
                tp = ops[d][j].type
            new_ops.append(ops[d][j])
            # Sanity check that the nets are indeed interleavable
            assert ops[d][j].type == tp, \
                "Type mismatch {} / {}".format(tp, ops[d][j].type)

    del model.net.Proto().op[:]
    model.net.Proto().op.extend(new_ops)
1854 orig_ops = list(model.net.Proto().op)
1856 num_devices = len(model._devices)
1860 spatial_bn_phase =
False 1864 input_blob_name =
None 1866 spatial_bn_gradient_phase =
False 1867 scale_grad_blobs = []
1868 bias_grad_blobs = []
1871 if op.type !=
'SpatialBN' and op.type !=
'SpatialBNGradient':
1872 if spatial_bn_phase:
1873 new_ops.extend(injected_ops)
1875 core.CreateOperator(
"Sum",
1877 input_blob_name +
"_sums_combined"))
1879 core.CreateOperator(
"Sum",
1881 input_blob_name +
"_sumsq_combined"))
1882 new_ops.extend(batch_norm_ops)
1887 spatial_bn_phase =
False 1888 input_blob_name =
None 1889 elif spatial_bn_gradient_phase:
1890 new_ops.extend(injected_ops)
1892 "cpu_0/" + stripBlobName(scale_grad_blobs[0]) +
"_combined" 1894 "cpu_0/" + stripBlobName(bias_grad_blobs[0]) +
"_combined" 1896 core.CreateOperator(
"Sum", scale_grad_blobs, scale_blob))
1898 core.CreateOperator(
"Sum", bias_grad_blobs, bias_blob))
1899 for blob
in scale_grad_blobs:
1901 core.CreateOperator(
"Copy", scale_blob, blob))
1902 for blob
in bias_grad_blobs:
1903 new_ops.append(core.CreateOperator(
"Copy", bias_blob, blob))
1904 new_ops.extend(batch_norm_ops)
1907 scale_grad_blobs = []
1908 bias_grad_blobs = []
1909 spatial_bn_gradient_phase =
False 1911 elif op.type ==
'SpatialBN':
1912 spatial_bn_phase =
True 1913 if input_blob_name
is None:
1914 input_blob_name = op.input[0]
1916 injected_ops.append(
1917 core.CreateOperator(
1920 [name +
"_sums", name +
"_sumsq"]))
1921 sums_blobs.append(name +
"_sums")
1922 sumsq_blobs.append(name +
"_sumsq")
1923 op.input.append(input_blob_name +
"_sums_combined")
1924 op.input.append(input_blob_name +
"_sumsq_combined")
1925 op.arg.extend([utils.MakeArgument(
"num_batches", num_devices)])
1926 batch_norm_ops.append(op)
1927 elif op.type ==
'SpatialBNGradient':
1928 spatial_bn_gradient_phase =
True 1929 injected_ops.append(
1930 core.CreateOperator(
"ChannelBackpropStats",
1931 [op.input[0], op.input[3], op.input[4],
1933 [op.output[1], op.output[2]]))
1934 scale_grad_blobs.append(op.output[1])
1935 bias_grad_blobs.append(op.output[2])
1936 op.arg.extend([utils.MakeArgument(
"num_batches", num_devices)])
1937 op.input.extend([op.output[1], op.output[2]])
1938 batch_norm_ops.append(op)
1940 assert not spatial_bn_phase, \
1941 "Net modification for inter-device batch normalization failed" 1942 del model.net.Proto().op[:]
1943 model.net.Proto().op.extend(new_ops)