from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import argparse
import logging
import os
import time

import numpy as np

from caffe2.python import core, workspace, experiment_util, data_parallel_model
from caffe2.python import data_parallel_model_utils, dyndep, optimizer
from caffe2.python import timeout_guard, model_helper, brew
from caffe2.proto import caffe2_pb2

# NOTE(review): the following imports were elided from the garbled source but
# are required by the visible code below (brew, resnet, pred_exp, pred_utils,
# predictor_constants, initializers) — confirm against upstream.
import caffe2.python.models.resnet as resnet
import caffe2.python.predictor.predictor_exporter as pred_exp
import caffe2.python.predictor.predictor_py_utils as pred_utils
from caffe2.python.predictor_constants import predictor_constants
from caffe2.python.modeling.initializers import Initializer, PseudoFP16Initializer
'''
Parallelized multi-GPU distributed trainer for Resnet 50. Can be used to train
on imagenet data, for example.

Run the trainer in single-machine multi-gpu mode by setting num_shards = 1.

To run the trainer in multi-machine multi-gpu mode with M machines,
run the same program on all machines, specifying num_shards = M, and
shard_id = a unique integer in the set [0, M-1].

For rendezvous (the trainer processes have to know about each other),
you can either use a directory path that is visible to all processes
(e.g. NFS directory), or use a Redis instance. Use the former by
passing the `file_store_path` argument. Use the latter by passing the
`redis_host` and `redis_port` arguments.
'''

# Module-level logger for this trainer script.
log = logging.getLogger("resnet50_trainer")
log.setLevel(logging.DEBUG)

# Register the distributed store-handler operator libraries used for
# multi-machine rendezvous (file-based and Redis-based).
dyndep.InitOpsLibrary('@/caffe2/caffe2/distributed:file_store_handler_ops')
dyndep.InitOpsLibrary('@/caffe2/caffe2/distributed:redis_store_handler_ops')
def AddImageInput(model, reader, batch_size, img_size, dtype, is_test):
    '''
    The image input operator loads image and label data from the reader and
    applies transformations to the images (random cropping, mirroring, ...).

    NOTE(review): several ImageInput arguments were elided in the garbled
    source; the mean/std/scale/crop/mirror values below are reconstructed
    from the upstream caffe2 example — verify before relying on them.
    '''
    data, label = brew.image_input(
        model,
        reader, ["data", "label"],
        batch_size=batch_size,
        output_type=dtype,
        # GPU device type is 1 in caffe2's DeviceType enum; do the image
        # transform on-device when running on GPU.
        use_gpu_transform=True if model._device_type == 1 else False,
        use_caffe_datum=True,
        mean=128.,
        std=128.,
        scale=256,
        crop=img_size,
        mirror=1,
        is_test=is_test,
    )

    # Labels/images come from a reader; never backprop into the input.
    data = model.StopGradient(data, data)
def AddNullInput(model, reader, batch_size, img_size, dtype):
    '''
    The null input function uses a gaussian fill operator to emulate real image
    input. A label blob is hardcoded to a single value. This is useful if you
    want to test compute throughput or don't have a dataset available.

    `reader` is unused here; the parameter exists so the function matches the
    AddImageInput-style input-builder signature.
    '''
    suffix = "_fp16" if dtype == "float16" else ""
    # Fill "data" (or "data_fp16") with gaussian noise of the image shape.
    model.param_init_net.GaussianFill(
        [],
        ["data" + suffix],
        shape=[batch_size, 3, img_size, img_size],
    )
    if dtype == "float16":
        # Convert the float32 fill into the half-precision "data" blob.
        model.param_init_net.FloatToHalf("data" + suffix, "data")

    # Hardcode every label to the same value; only throughput matters here.
    model.param_init_net.ConstantFill(
        [],
        ["label"],
        shape=[batch_size],
        value=1,
        dtype=core.DataType.INT32,
    )
def SaveModel(args, train_model, epoch):
    '''
    Export the trained model (checkpoint params + predict net) for `epoch`
    to "<file_store_path>/<save_model_name>_<epoch>.mdl" as a minidb.
    '''
    # BUGFIX: the original used "[]_{}".format(...), which ignores the first
    # argument and yields a literal "[]_0" prefix, so the exported input/output
    # blob names would never match the real "<device_prefix>_<device>" blobs.
    # Use the same "{}_{}" pattern as the rest of this file.
    prefix = "{}_{}".format(train_model._device_prefix, train_model._devices[0])
    predictor_export_meta = pred_exp.PredictorExportMeta(
        predict_net=train_model.net.Proto(),
        parameters=data_parallel_model.GetCheckpointParams(train_model),
        inputs=[prefix + "/data"],
        outputs=[prefix + "/softmax"],
        shapes={
            prefix + "/softmax": (1, args.num_labels),
            prefix + "/data": (args.num_channels, args.image_size, args.image_size)
        }
    )

    # Save the train_model for the current epoch.
    model_path = "%s/%s_%d.mdl" % (
        args.file_store_path,
        args.save_model_name,
        epoch,
    )

    # "minidb" is used as db_type; presumably chosen for portable
    # serialization of the exported predictor — confirm against caller usage.
    pred_exp.save_to_db(
        db_type="minidb",
        db_destination=model_path,
        predictor_export_meta=predictor_export_meta,
    )
def LoadModel(path, model):
    '''
    Load pretrained model from file.

    Runs the global-init and predict-init nets stored in the minidb at `path`
    on GPU, then pins the optimizer iteration counter back onto CPU.
    '''
    log.info("Loading path: {}".format(path))
    meta_net_def = pred_exp.load_from_db(path, 'minidb')
    init_net = core.Net(pred_utils.GetNet(
        meta_net_def, predictor_constants.GLOBAL_INIT_NET_TYPE))
    predict_init_net = core.Net(pred_utils.GetNet(
        meta_net_def, predictor_constants.PREDICT_INIT_NET_TYPE))

    predict_init_net.RunAllOnGPU()
    init_net.RunAllOnGPU()

    assert workspace.RunNetOnce(predict_init_net)
    assert workspace.RunNetOnce(init_net)

    # The iteration counter must live on CPU for the optimizer's Iter op;
    # after loading it ends up in CUDA context, so re-feed it on CPU.
    # NOTE(review): the FeedBlob wrapper was elided in the garbled source and
    # is reconstructed here — confirm against upstream.
    itercnt = workspace.FetchBlob("optimizer_iteration")
    workspace.FeedBlob(
        "optimizer_iteration",
        itercnt,
        device_option=core.DeviceOption(caffe2_pb2.CPU, 0)
    )
def RunEpoch(
    args,
    epoch,
    train_model,
    test_model,
    total_batch_size,
    num_shards,
    expname,
    explog
):
    '''
    Run one epoch of the trainer.
    TODO: add checkpointing here.

    Returns the incremented epoch number.

    NOTE(review): the def line and the per-iteration timing statements were
    elided in the garbled source and are reconstructed here — verify against
    the upstream caffe2 example.
    '''
    log.info("Starting epoch {}/{}".format(epoch, args.num_epochs))
    epoch_iters = int(args.epoch_size / total_batch_size / num_shards)
    for i in range(epoch_iters):
        # The first iteration includes net construction / cudnn autotuning,
        # so give it a much longer deadline than steady-state iterations.
        timeout = 600.0 if i == 0 else 60.0
        with timeout_guard.CompleteInTimeOrDie(timeout):
            t1 = time.time()
            workspace.RunNet(train_model.net.Proto().name)
            t2 = time.time()
            dt = t2 - t1

        fmt = "Finished iteration {}/{} of epoch {} ({:.2f} images/sec)"
        log.info(fmt.format(i + 1, epoch_iters, epoch, total_batch_size / dt))
        # Report metrics from the first device of this shard.
        prefix = "{}_{}".format(
            train_model._device_prefix,
            train_model._devices[0])
        accuracy = workspace.FetchBlob(prefix + '/accuracy')
        loss = workspace.FetchBlob(prefix + '/loss')
        train_fmt = "Training loss: {}, accuracy: {}"
        log.info(train_fmt.format(loss, accuracy))

    num_images = epoch * epoch_iters * total_batch_size
    prefix = "{}_{}".format(train_model._device_prefix, train_model._devices[0])
    accuracy = workspace.FetchBlob(prefix + '/accuracy')
    loss = workspace.FetchBlob(prefix + '/loss')
    learning_rate = workspace.FetchBlob(
        data_parallel_model.GetLearningRateBlobNames(train_model)[0]
    )
    test_accuracy = 0
    if (test_model is not None):
        # Run 100 iters of testing and average accuracy over all devices.
        ntests = 0
        for _ in range(0, 100):
            workspace.RunNet(test_model.net.Proto().name)
            for g in test_model._devices:
                test_accuracy += np.asscalar(workspace.FetchBlob(
                    "{}_{}".format(test_model._device_prefix, g) + '/accuracy'
                ))
                ntests += 1
        test_accuracy /= ntests
    else:
        # Sentinel meaning "no test set was run".
        test_accuracy = (-1)

    explog.log(
        input_count=num_images,
        batch_count=(i + epoch * epoch_iters),
        additional_values={
            'accuracy': accuracy,
            'loss': loss,
            'learning_rate': learning_rate,
            'epoch': epoch,
            'test_accuracy': test_accuracy,
        }
    )
    assert loss < 40, "Exploded gradients :("

    return epoch + 1
def Train(args):
    '''
    Build and run the parallelized (optionally distributed) Resnet-50
    training loop described by `args`.

    NOTE(review): the def line and several interior chunks were elided in the
    garbled source; this body is reconstructed around the visible fragments —
    verify against the upstream caffe2 example before relying on details.
    '''
    # Either use the explicit device list or generate one from --num_gpus.
    if args.gpus is not None:
        gpus = [int(x) for x in args.gpus.split(',')]
        num_gpus = len(gpus)
    else:
        gpus = list(range(args.num_gpus))
        num_gpus = args.num_gpus

    log.info("Running on GPUs: {}".format(gpus))

    # Verify valid batch size
    total_batch_size = args.batch_size
    batch_per_device = total_batch_size // num_gpus
    assert \
        total_batch_size % num_gpus == 0, \
        "Number of GPUs must divide batch size"

    # Round down epoch size to closest multiple of batch size across machines
    global_batch_size = total_batch_size * args.num_shards
    epoch_iters = int(args.epoch_size / global_batch_size)

    assert \
        epoch_iters > 0, \
        "Epoch size must be larger than batch size times shard count"

    args.epoch_size = epoch_iters * global_batch_size
    log.info("Using epoch size: {}".format(args.epoch_size))

    # Create ModelHelper object for the training net.
    train_arg_scope = {
        'order': 'NCHW',
        'use_cudnn': True,
        'cudnn_exhaustive_search': True,
        'ws_nbytes_limit': (args.cudnn_workspace_limit_mb * 1024 * 1024),
    }
    train_model = model_helper.ModelHelper(
        name="resnet50", arg_scope=train_arg_scope
    )

    num_shards = args.num_shards
    shard_id = args.shard_id

    # Expect interfaces to be comma separated. Use of multiple network
    # interfaces is not yet complete, so simply use the first one in the list.
    interfaces = args.distributed_interfaces.split(",")

    # Rendezvous using MPI when run with mpirun
    if os.getenv("OMPI_COMM_WORLD_SIZE") is not None:
        num_shards = int(os.getenv("OMPI_COMM_WORLD_SIZE", 1))
        shard_id = int(os.getenv("OMPI_COMM_WORLD_RANK", 0))
        if num_shards > 1:
            rendezvous = dict(
                kv_handler=None,
                num_shards=num_shards,
                shard_id=shard_id,
                engine="GLOO",
                transport=args.distributed_transport,
                interface=interfaces[0],
                mpi_rendezvous=True,
                exit_nets=None)
        else:
            rendezvous = None
    elif num_shards > 1:
        # Create rendezvous for distributed computation
        store_handler = "store_handler"
        if args.redis_host is not None:
            # Use Redis for rendezvous if Redis host is specified.
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "RedisStoreHandlerCreate", [], [store_handler],
                    host=args.redis_host,
                    port=args.redis_port,
                    prefix=args.run_id,
                )
            )
        else:
            # Use filesystem (e.g. NFS path) for rendezvous otherwise.
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "FileStoreHandlerCreate", [], [store_handler],
                    path=args.file_store_path,
                    prefix=args.run_id,
                )
            )

        rendezvous = dict(
            kv_handler=store_handler,
            shard_id=shard_id,
            num_shards=num_shards,
            engine="GLOO",
            transport=args.distributed_transport,
            interface=interfaces[0],
            exit_nets=None)
    else:
        rendezvous = None

    # Model building functions
    def create_resnet50_model_ops(model, loss_scale):
        # Forward pass + loss; shared between train and test nets.
        initializer = (PseudoFP16Initializer if args.dtype == 'float16'
                       else Initializer)

        with brew.arg_scope([brew.conv, brew.fc],
                            WeightInitializer=initializer,
                            BiasInitializer=initializer,
                            enable_tensor_core=args.enable_tensor_core,
                            float16_compute=args.float16_compute):
            pred = resnet.create_resnet50(
                model,
                "data",
                num_input_channels=args.num_channels,
                num_labels=args.num_labels,
                no_bias=True,
                no_loss=True,
            )

        if args.dtype == 'float16':
            pred = model.net.HalfToFloat(pred, pred + '_fp32')

        softmax, loss = model.SoftmaxWithLoss([pred, 'label'],
                                              ['softmax', 'loss'])
        loss = model.Scale(loss, scale=loss_scale)
        brew.accuracy(model, [softmax, "label"], "accuracy")
        return [loss]

    def add_optimizer(model):
        # Step LR schedule: decay by 10x every 30 epochs.
        stepsz = int(30 * args.epoch_size / total_batch_size / num_shards)

        if args.float16_compute:
            # TODO: merge with multi-precision optimizer
            opt = optimizer.build_fp16_sgd(
                model,
                args.base_learning_rate,
                momentum=0.9,
                nesterov=1,
                weight_decay=args.weight_decay,  # weight decay included
                policy="step",
                stepsize=stepsz,
                gamma=0.1
            )
        else:
            optimizer.add_weight_decay(model, args.weight_decay)
            opt = optimizer.build_multi_precision_sgd(
                model,
                args.base_learning_rate,
                momentum=0.9,
                nesterov=1,
                policy="step",
                stepsize=stepsz,
                gamma=0.1
            )
        return opt

    # Input function: either emulated data ("null") or a shared DB reader.
    if args.train_data == "null":
        def add_image_input(model):
            AddNullInput(
                model,
                None,
                batch_size=batch_per_device,
                img_size=args.image_size,
                dtype=args.dtype,
            )
    else:
        reader = train_model.CreateDB(
            "reader",
            db=args.train_data,
            db_type=args.db_type,
            num_shards=num_shards,
            shard_id=shard_id,
        )

        def add_image_input(model):
            AddImageInput(
                model,
                reader,
                batch_size=batch_per_device,
                img_size=args.image_size,
                dtype=args.dtype,
                is_test=False,
            )

    def add_post_sync_ops(model):
        """Add ops applied after initial parameter sync."""
        for param_info in model.GetOptimizationParamInfo(model.GetParams()):
            if param_info.blob_copy is not None:
                model.param_init_net.HalfToFloat(
                    param_info.blob,
                    param_info.blob_copy[core.DataType.FLOAT]
                )

    data_parallel_model.Parallelize(
        train_model,
        input_builder_fun=add_image_input,
        forward_pass_builder_fun=create_resnet50_model_ops,
        optimizer_builder_fun=add_optimizer,
        post_sync_builder_fun=add_post_sync_ops,
        devices=gpus,
        rendezvous=rendezvous,
        optimize_gradient_memory=False,
        cpu_device=args.use_cpu,
        shared_model=args.use_cpu,
        combine_spatial_bn=args.use_cpu,
    )

    if args.model_parallel:
        # Shift half of the activations to another GPU
        assert workspace.NumCudaDevices() >= 2 * args.num_gpus
        activations = data_parallel_model_utils.GetActivationBlobs(train_model)
        data_parallel_model_utils.ShiftActivationDevices(
            train_model,
            activations=activations[len(activations) // 2:],
            shifts={g: args.num_gpus + g for g in range(args.num_gpus)},
        )

    data_parallel_model.OptimizeGradientMemory(train_model, {}, set(), False)

    workspace.RunNetOnce(train_model.param_init_net)
    workspace.CreateNet(train_model.net)

    # Add test model, if specified
    test_model = None
    if (args.test_data is not None):
        log.info("----- Create test net ----")
        test_arg_scope = {
            'order': "NCHW",
            'use_cudnn': True,
            'cudnn_exhaustive_search': True,
        }
        test_model = model_helper.ModelHelper(
            name="resnet50_test", arg_scope=test_arg_scope, init_params=False
        )

        test_reader = test_model.CreateDB(
            "test_reader",
            db=args.test_data,
            db_type=args.db_type,
        )

        def test_input_fn(model):
            AddImageInput(
                model,
                test_reader,
                batch_size=batch_per_device,
                img_size=args.image_size,
                dtype=args.dtype,
                is_test=True,
            )

        data_parallel_model.Parallelize(
            test_model,
            input_builder_fun=test_input_fn,
            forward_pass_builder_fun=create_resnet50_model_ops,
            post_sync_builder_fun=add_post_sync_ops,
            param_update_builder_fun=None,
            devices=gpus,
            cpu_device=args.use_cpu,
        )
        workspace.RunNetOnce(test_model.param_init_net)
        workspace.CreateNet(test_model.net)

    epoch = 0
    # Load the pre-trained model and reset epoch
    if args.load_model_path is not None:
        LoadModel(args.load_model_path, train_model)

        # Sync the model params
        data_parallel_model.FinalizeAfterCheckpoint(train_model)

        # Reset epoch: load_model_path should end with *_X.mdl,
        # where X is the epoch number.
        last_str = args.load_model_path.split('_')[-1]
        if last_str.endswith('.mdl'):
            epoch = int(last_str[:-4])
            log.info("Reset epoch to {}".format(epoch))
        else:
            log.warning("The format of load_model_path doesn't match!")

    expname = "resnet50_gpu%d_b%d_L%d_lr%.2f_v2" % (
        args.num_gpus,
        total_batch_size,
        args.num_labels,
        args.base_learning_rate,
    )

    explog = experiment_util.ModelTrainerLog(expname, args)

    # Run the training one epoch at a time.
    while epoch < args.num_epochs:
        epoch = RunEpoch(
            args,
            epoch,
            train_model,
            test_model,
            total_batch_size,
            num_shards,
            expname,
            explog
        )

        # Save the model for each epoch
        SaveModel(args, train_model, epoch)

        model_path = "%s/%s_" % (
            args.file_store_path,
            args.save_model_name
        )
        # Remove the saved model from the previous epoch if it exists.
        if os.path.isfile(model_path + str(epoch - 1) + ".mdl"):
            os.remove(model_path + str(epoch - 1) + ".mdl")
def main():
    '''
    Parse command-line arguments and launch training.

    NOTE(review): the `def main()` line was elided in the garbled source;
    it is restored here so the __main__ guard has something to call.
    '''
    # TODO: use argv
    parser = argparse.ArgumentParser(
        description="Caffe2: Resnet-50 training"
    )
    parser.add_argument("--train_data", type=str, default=None, required=True,
                        help="Path to training data (or 'null' to simulate)")
    parser.add_argument("--test_data", type=str, default=None,
                        help="Path to test data")
    parser.add_argument("--db_type", type=str, default="lmdb",
                        help="Database type (such as lmdb or leveldb)")
    parser.add_argument("--gpus", type=str,
                        help="Comma separated list of GPU devices to use")
    parser.add_argument("--num_gpus", type=int, default=1,
                        help="Number of GPU devices (instead of --gpus)")
    # NOTE(review): argparse `type=bool` treats any non-empty string
    # (including "False") as True; kept as-is to preserve the existing CLI,
    # but callers should pass no value / rely on the default.
    parser.add_argument("--model_parallel", type=bool, default=False,
                        help="Split model over 2 x num_gpus")
    parser.add_argument("--num_channels", type=int, default=3,
                        help="Number of color channels")
    parser.add_argument("--image_size", type=int, default=227,
                        help="Input image size (to crop to)")
    parser.add_argument("--num_labels", type=int, default=1000,
                        help="Number of labels")
    parser.add_argument("--batch_size", type=int, default=32,
                        help="Batch size, total over all GPUs")
    parser.add_argument("--epoch_size", type=int, default=1500000,
                        help="Number of images/epoch, total over all machines")
    parser.add_argument("--num_epochs", type=int, default=1000,
                        help="Num epochs.")
    parser.add_argument("--base_learning_rate", type=float, default=0.1,
                        help="Initial learning rate.")
    parser.add_argument("--weight_decay", type=float, default=1e-4,
                        help="Weight decay (L2 regularization)")
    parser.add_argument("--cudnn_workspace_limit_mb", type=int, default=64,
                        help="CuDNN workspace limit in MBs")
    parser.add_argument("--num_shards", type=int, default=1,
                        help="Number of machines in distributed run")
    parser.add_argument("--shard_id", type=int, default=0,
                        help="Shard id.")
    parser.add_argument("--run_id", type=str,
                        help="Unique run identifier (e.g. uuid)")
    parser.add_argument("--redis_host", type=str,
                        help="Host of Redis server (for rendezvous)")
    parser.add_argument("--redis_port", type=int, default=6379,
                        help="Port of Redis server (for rendezvous)")
    parser.add_argument("--file_store_path", type=str, default="/tmp",
                        help="Path to directory to use for rendezvous")
    parser.add_argument("--save_model_name", type=str, default="resnet50_model",
                        help="Save the trained model to a given name")
    parser.add_argument("--load_model_path", type=str, default=None,
                        help="Load previously saved model to continue training")
    # NOTE(review): same argparse `type=bool` pitfall as --model_parallel.
    parser.add_argument("--use_cpu", type=bool, default=False,
                        help="Use CPU instead of GPU")
    parser.add_argument('--dtype', default='float',
                        choices=['float', 'float16'],
                        help='Data type used for training')
    parser.add_argument('--float16_compute', action='store_true',
                        help="Use float 16 compute, if available")
    parser.add_argument('--enable_tensor_core', action='store_true',
                        help='Enable Tensor Core math for Conv and FC ops')
    parser.add_argument("--distributed_transport", type=str, default="tcp",
                        help="Transport to use for distributed run [tcp|ibverbs]")
    parser.add_argument("--distributed_interfaces", type=str, default="",
                        help="Network interfaces to use for distributed run")

    args = parser.parse_args()

    Train(args)
if __name__ == '__main__':
    # Initialize the caffe2 workspace before anything else runs, then
    # hand off to main() (the call was elided in the garbled source).
    workspace.GlobalInit(['caffe2', '--caffe2_log_level=2'])
    main()