Caffe2 - Python API
A deep learning, cross platform ML framework
task.py
1 ## @package task
2 # Module caffe2.python.task
3 from __future__ import absolute_import
4 from __future__ import division
5 from __future__ import print_function
6 from __future__ import unicode_literals
7 
8 from caffe2.python import core, context
9 from caffe2.python.schema import Field, from_blob_list
10 from collections import defaultdict
11 from copy import copy
12 from future.utils import viewitems
13 
14 
15 def _merge_node_kwargs(a, b):
16  # TODO(azzolini): consistency checks
17  if a is None:
18  return b
19  if b is None:
20  return a
21  c = copy(a)
22  c.update(b)
23  return c
24 
25 
@context.define_context(allow_default=True)
class Cluster(object):
    """
    Context that keeps track of all the node names used.
    Users shouldn't have to use them directly, since a Cluster is automatically
    generated at the first usage of 'Node'.
    """

    def __init__(self):
        # A list (rather than a set) is used so that registration order
        # is preserved.
        self._nodes = []
        self._node_kwargs = {}

    def add_node(self, node):
        """Register `node` (by name), merging its kwargs with any previously
        registered kwargs for the same node name."""
        node_name = str(node)
        if node_name not in self._nodes:
            self._nodes.append(node_name)
        self._node_kwargs[node_name] = _merge_node_kwargs(
            node.kwargs(), self._node_kwargs.get(node_name))

    def nodes(self):
        """
        Returns the list of unique node names used within this context.
        """
        return self._nodes

    def node_kwargs(self):
        """Return the dict mapping node name to its merged kwargs."""
        return self._node_kwargs
54 
55 
@context.define_context(allow_default=True)
class Node(object):
    """
    A Node context is used to indicate that all Tasks instantiated within will
    run on the given node name. (Only the name of the node actually counts.)
    Example:

        with TaskGroup() as tg:
            with Node('node1'):
                s1 = execution_step(...)
                Task(step=s1)
            with Node('node2'):
                s2 = execution_step(...)
            with Node('node1'):
                s3 = execution_step(...)

    In this example, all three execution steps will run in parallel.
    Moreover, s1 and s3 will run on the same node, and can see each
    others blobs.

    Additionally, a Node can be passed implementation-specific kwargs,
    in order to specify properties of the node.
    """

    def __init__(self, node='local', **kwargs):
        # Only the string name matters for placement; kwargs are merged
        # into the current Cluster's per-node kwargs on registration.
        self._name = str(node)
        self._kwargs = kwargs
        Cluster.current().add_node(self)

    def kwargs(self):
        """Return the implementation-specific kwargs given at construction."""
        return self._kwargs

    def __str__(self):
        return self._name
90 
91 
class WorkspaceType(object):
    """
    Enumerates how tasks of a TaskGroup interact with workspaces: either they
    run directly in the global workspace (kept alive across runs), or each run
    gets a fresh child workspace that is destroyed afterwards.
    """
    # New child workspace per run, discarded at the end of the run.
    PRIVATE = 'private'
    # Run directly in the global workspace, which persists across runs.
    GLOBAL = 'global'
100 
101 
def _collect_hook_nets(nets, into):
    # Normalize the return value of a setup()/exit() hook into `into`:
    # hooks may return a list/tuple of nets/steps, a single net/step, or
    # None (meaning "only ops added to the provided net").
    if isinstance(nets, (list, tuple)):
        into.extend(nets)
    elif isinstance(nets, (core.Net, core.ExecutionStep)):
        into.append(nets)
    elif nets is not None:
        raise TypeError('Unsupported type for setup: %s' % type(nets))


def get_setup_nets(key, steps_or_nets, target):
    """
    Collect the init and exit nets contributed by setup objects attached
    (as attributes under `key`) to the given steps or nets.

    Args:
        key: attribute name under which setup objects were registered
             (e.g. Task.TASK_SETUP, TaskGroup.LOCAL_SETUP).
        steps_or_nets: list of ExecutionSteps or Nets to scan for attributes.
        target: the Task/TaskGroup performing the collection; objects with a
                `_setup_target` different from `target` are skipped.

    Returns:
        (init_nets, exit_nets): lists of nets/steps to run before and after
        the wrapped step, respectively.
    """
    init_net = core.Net(key + '/init')
    exit_net = core.Net(key + '/exit')
    init_nets = []
    exit_nets = []
    objs = []
    for step_or_net in steps_or_nets:
        if hasattr(step_or_net, 'get_all_attributes'):
            objs += step_or_net.get_all_attributes(key)
        elif hasattr(step_or_net, 'get_attributes'):
            objs += step_or_net.get_attributes(key)
    for obj in objs:
        # these are needed in order to allow nesting of TaskGroup, which
        # is a feature not yet implemented.
        if hasattr(obj, '_setup_used') and obj._setup_used:
            continue
        if hasattr(obj, '_setup_target') and obj._setup_target != target:
            continue
        if hasattr(obj, 'setup'):
            _collect_hook_nets(obj.setup(init_net), init_nets)
            obj._setup_used = True
        if hasattr(obj, 'exit'):
            _collect_hook_nets(obj.exit(exit_net), exit_nets)
            obj._setup_used = True

    # The scratch nets are only included if a hook actually added ops to them.
    if len(init_net.Proto().op) > 0:
        init_nets.insert(0, init_net)
    if len(exit_net.Proto().op) > 0:
        exit_nets.insert(0, exit_net)
    return init_nets, exit_nets
144 
145 
def add_setup_steps(step, init_nets, exit_nets, name):
    """
    Wrap `step` with initialization and finalization sub-steps.

    Args:
        step: the ExecutionStep to wrap.
        init_nets: nets/steps to run once before `step` (may be None/empty).
        exit_nets: nets/steps to run once after `step` (may be None/empty).
        name: name for the resulting compound execution step.

    Returns:
        `step` unchanged when there is nothing to add; otherwise a new
        ExecutionStep running '<name>:init', `step`, then '<name>:exit'.
    """
    if not init_nets and not exit_nets:
        return step
    steps = []
    if init_nets:
        steps.append(core.execution_step('%s:init' % name, init_nets))
    steps.append(step)
    # Truthiness check (was `len(exit_nets) > 0`), consistent with the
    # init_nets check above and safe when exit_nets is None.
    if exit_nets:
        steps.append(core.execution_step('%s:exit' % name, exit_nets))
    return core.execution_step(name, steps)
156 
157 
@context.define_context(allow_default=False)
class TaskGroup(object):
    """
    Context that gathers tasks which will run concurrently, potentially on
    multiple nodes. All tasks in the same node will share the same workspace
    and thus can share blobs, while tasks running in different nodes won't
    be able to directly share data.

    All tasks of the task group will start concurrently, and the task group
    will finish execution when the last task of the group finishes.

    Example:
        # suppose that s1 ... s5 are execution steps or nets.
        with TaskGroup() as tg:
            # these tasks go to default node 'local'
            Task(step=s1)
            Task(step=s2)

            with Node('n2'):
                Task(step=s3)
            with Node('n1'):
                Task(step=s4)
            with Node('n2'):
                Task(step=s5)

        # this will run all steps in parallel.
        # s1 and s2 will run at default node 'local'
        # s3 and s5 will run at node 'n2'
        # s4 will run at node 'n1'
        session.run(tg)
    """
    # Attribute key under which per-node setup objects are registered
    # (see SetupNets / get_setup_nets).
    LOCAL_SETUP = 'local_setup'

    def __init__(self, workspace_type=None):
        self._plan_cache = None
        # Tasks that have been finalized into this group.
        self._tasks = []
        # Once True, no more tasks may be added.
        self._already_used = False
        self._prev_active = None
        # Tasks created inside this context but not yet finalized via add().
        self._tasks_to_add = []
        # node name -> (net, report_interval); deprecated, see report_net().
        self._report_nets = {}
        # list of (node name, execution step) pairs, see report_step().
        self._report_steps = []
        self._workspace_type = workspace_type
        # Cache of (grouped TaskGroup, node_map) built by tasks_by_node().
        self._tasks_by_node = None

    def add(self, task):
        """Finalize `task` into this group, reconciling workspace types.

        The group's workspace type and the task's must agree; whichever is
        set first propagates to the other (defaulting to PRIVATE).
        """
        assert not self._already_used, (
            'Cannot add Task to an already used TaskGroup.')
        assert (
            self._workspace_type is None or
            task._workspace_type is None or
            self._workspace_type == task._workspace_type)
        if task._workspace_type is None:
            task._workspace_type = (
                self._workspace_type or WorkspaceType.PRIVATE)
        if self._workspace_type is None:
            self._workspace_type = task._workspace_type
        # Marks the task as used (builds its step with setup nets).
        task._notify_used()
        self._tasks.append(task)

    def tasks(self):
        """Finalize any pending tasks and return the full task list.

        NOTE: this marks the group as used; no tasks can be added afterward.
        """
        for task in self._tasks_to_add:
            self.add(task)
        self._tasks_to_add = []
        self._already_used = True
        return self._tasks

    def num_registered_tasks(self):
        """Number of tasks registered so far, finalized or not."""
        return len(self._tasks_to_add) + len(self._tasks)

    def used_nodes(self):
        """Return node names referenced by this group's tasks, in order of
        first use (duplicates removed)."""
        # use list to keep order
        used = []
        for task in self._tasks + self._tasks_to_add:
            if task.node not in used:
                used.append(task.node)
        return used

    def report_step(self, step=None, node=None, interval_ms=1000):
        """
        Add a "report step" to this TaskGroup. This step will run repeatedly
        every `interval_ms` milliseconds for the duration of the TaskGroup
        execution on each of the nodes. It is guaranteed that this step
        will be run at least once after every Task in the node has finished.
        """
        step = core.to_execution_step(step)
        step.RunEveryMillis(interval_ms)
        self._report_steps.append((str(node or Node.current(node)), step))

    def report_net(self, net=None, node=None, report_interval=5):
        """
        DEPRECATED. Use report_step instead.

        Registers (or returns the previously registered) report net for
        `node`, to run every `report_interval` seconds.
        """
        node = str(node or Node.current(node))
        # Only one report net per node; passing a net twice is an error.
        assert net is None or node not in self._report_nets
        if node not in self._report_nets:
            self._report_nets[node] = (
                net if net else core.Net('%s/reporter' % node),
                report_interval)
        return self._report_nets[node][0]

    def tasks_by_node(self, node_remap=None):
        """Collapse this group into a new TaskGroup with one Task per
        (remapped) node, wrapping each node's tasks with its setup/exit nets
        and report steps.

        Args:
            node_remap: optional callable mapping an original node name to
                        the node the grouped task should run on.

        Returns:
            A TaskGroup with one 'grouped_by_node' Task per mapped node.
        """
        # tasks_by_node can't be called twice because the setup won't
        # work properly a second time.
        node_map = {}
        for task in self.tasks():
            node_map[task.node] =\
                node_remap(task.node) if node_remap else task.node
        if self._tasks_by_node is not None:
            # Repeated calls are only allowed with an identical remapping;
            # in that case the cached result is returned.
            tasks_by_node, prev_node_map = self._tasks_by_node
            assert prev_node_map == node_map, (
                'Cannot call tasks_by_node multiple times.')
            return tasks_by_node

        # now we have report_steps. report_net is deprecated
        for node, (net, interval) in viewitems(self._report_nets):
            self.report_step(net, node=node, interval_ms=interval * 1000)
        self._report_nets = {}

        tasks_by_node = defaultdict(list)
        for task in self.tasks():
            mapped_node = node_map[task.node]
            tasks_by_node[mapped_node].append(task)

        report_steps_by_node = defaultdict(list)
        for original_node, step in self._report_steps:
            report_steps_by_node[node_map[original_node]].append(step)

        grouped_by_node = TaskGroup()
        for node, tasks in viewitems(tasks_by_node):
            report_steps = report_steps_by_node[node]
            node_inits, node_exits = get_setup_nets(
                TaskGroup.LOCAL_SETUP,
                [t.get_step() for t in tasks] + report_steps,
                self)
            # shortcut for single task with no queue
            steps = report_steps
            outputs = []
            grouped_workspace_type = WorkspaceType.PRIVATE
            for task in tasks:
                step = task.get_step()
                # NOTE(review): SetCreateWorkspace is called before the
                # None-check below; presumably get_step() never returns None
                # for a finalized task — confirm.
                step.SetCreateWorkspace(
                    task.workspace_type() == WorkspaceType.PRIVATE)
                if step is not None:
                    steps.append(step)
                outputs += task.outputs()
                # If any of the tasks in the node uses the global workspace,
                # then set the grouped task to use the global workspace as well
                if task.workspace_type() == WorkspaceType.GLOBAL:
                    grouped_workspace_type = WorkspaceType.GLOBAL
            if len(steps) == 0:
                steps.append(core.execution_step('empty', []))
            if len(steps) == 1:
                step = steps[0]
            else:
                # All of the node's task steps run concurrently.
                step = core.execution_step(
                    '%s:body' % node, steps, concurrent_substeps=True)
            if len(node_inits) > 0 or len(node_exits) > 0:
                # Sandwich the body between node-level init and exit steps.
                steps = []
                if len(node_inits) > 0:
                    steps.append(
                        core.execution_step('%s:init' % node, node_inits))
                steps.append(step)
                if len(node_exits) > 0:
                    steps.append(
                        core.execution_step('%s:exit' % node, node_exits))
                step = core.execution_step(node, steps)
            Task(
                node=node, step=step, outputs=outputs,
                name='grouped_by_node',
                group=grouped_by_node, workspace_type=grouped_workspace_type)
        self._tasks_by_node = (grouped_by_node, node_map)
        return grouped_by_node

    def to_task(self, node=None):
        """Collapse the whole group into a single Task pinned to `node`
        (defaults to the current Node context)."""
        node = str(Node.current(node))
        tasks = self.tasks_by_node(lambda x: node).tasks()
        if len(tasks) == 0:
            return Task()
        return tasks[0]

    def workspace_type(self):
        return self._workspace_type
340 
341 
class TaskOutput(object):
    """
    Represents the output of a task. An output can be a blob,
    a list of blobs, or a record.
    """

    def __init__(self, names):
        self._schema = None
        self._is_scalar = False
        if isinstance(names, Field):
            # A record: remember the schema and work on its blob list.
            self._schema = names
            names = self._schema.field_blobs()
        self._is_scalar = type(names) not in (tuple, list)
        if self._is_scalar:
            names = [names]
        self.names = names
        self._values = None

    def set(self, values, _fetch_func=None):
        """Store the raw output values (one per name) and, optionally, the
        function used to fetch each value's contents."""
        assert len(values) == len(self.names)
        self._values = values
        self._fetch_func = _fetch_func

    def _unpack(self, vals):
        # Shape a raw list of values back into the caller-facing form:
        # single value, record, or plain list.
        if self._is_scalar:
            return vals[0]
        if self._schema:
            return from_blob_list(self._schema, vals)
        return vals

    def get(self):
        """Return the stored output value(s) in their original shape."""
        assert self._values is not None, 'Output value not set yet.'
        return self._unpack(self._values)

    def fetch(self):
        """Fetch the contents of each stored value and return them in the
        original shape."""
        assert self._fetch_func is not None, (
            'Cannot fetch value for this output.')
        return self._unpack([self._fetch_func(v) for v in self._values])
384 
385 
def final_output(blob_or_record):
    """
    Adds an output to the current Task, or if no task is active,
    create a dummy task that returns the given blob or record
    to the client. This will return the value of the blob or record when
    the last task of the TaskGroup for a given node finishes.
    """
    active_task = Task.current(required=False)
    if active_task is None:
        active_task = Task()
    return active_task.add_output(blob_or_record)
395 
396 
class TaskOutputList(object):
    """Keeps a list of outputs for a task."""

    def __init__(self, outputs=None):
        self.outputs = outputs or []

    def names(self):
        """
        Retrieve the output names.
        TODO(azzolini): make this schema-based.
        """
        return [name for out in self.outputs for name in out.names]

    def set_values(self, values, _fetch_func=None):
        """Distribute the flat `values` list across the outputs, each output
        consuming as many values as it has names."""
        offset = 0
        for out in self.outputs:
            count = len(out.names)
            out.set(values[offset:offset + count], _fetch_func)
            offset += count
        assert offset == len(values), 'Wrong number of output values.'
419 
420 
class Task(object):
    """
    A Task is composed of an execution step and zero or more outputs.
    Tasks are executed in the context of a TaskGroup, which, in turn, can
    be run by a Session.

    Task outputs are fetched by the session at the end of the run.

    The recommended way of creating a task is by using `net_builder.ops`.
    Example:

        from net_builder import ops
        with Node('trainer'), Task(name='my_task', num_instances=2):
            with ops.task_init():
                globl = ops.Const(0)
            with ops.task_instance_init():
                local = ops.Const(0)
            with ops.loop(100):
                ops.Copy(globl, local)
            with ops.task_instance_exit():
                ops.Add([globl, local], [globl])
            with ops.task_exit():
                ops.Mul([globl, globl], [globl])

    The task above will create 2 instances that will run in parallel.
    Each instance will copy `local` to `globl` 100 times, Then Add `local`
    to `globl` once. The `Mul` will only execute once, after all the instances
    of the task have finished.
    """

    # TASK_SETUP runs once per task, before/after all
    # concurrent task instances start/finish.
    TASK_SETUP = 'task_setup'
    # Setup will run once for each instance of the task.
    TASK_INSTANCE_SETUP = 'task_instance_setup'
    # Attribute key marking steps to run periodically for reporting.
    REPORT_STEP = 'report_step'
    # Names of tasks created without a group.
    # NOTE(review): nothing visible here ever adds to this set — group-less
    # task names may collide; confirm whether that is intended.
    _global_names_used = set()

    @staticmethod
    def _get_next_name(node, group, name):
        """Return a unique task name of the form '<node>/<name>[:<i>]',
        avoiding names already used in `group` (or globally if no group)."""
        basename = str(node) + '/' + str(name)
        names_used = (
            Task._global_names_used
            if group is None else
            set(t.name for t in group._tasks_to_add))
        cur_name = basename
        i = 0
        while cur_name in names_used:
            i += 1
            cur_name = '%s:%d' % (basename, i)
        return cur_name

    def __init__(
            self, step=None, outputs=None,
            workspace_type=None, group=None, node=None, name=None,
            num_instances=None):
        """
        Instantiate a Task and add it to the current TaskGroup and Node.

        Args:
            step: If provided, this task will run this ExecutionStep.
            outputs: If provided, the task will return the provided outputs
                     to the client at completion time.
            node: If provided, force task execution on the given node.
            name: Name of the Task.
            num_instances: If provided, this task will be cloned num_instances
                           times at runtime, and all instances will run
                           concurrently.
        """
        if not name and isinstance(step, core.ExecutionStep):
            name = step.Proto().name
        if not name:
            name = 'task'
        # register this node name with active context
        self.node = str(Node.current(None if node is None else Node(node)))
        self.group = TaskGroup.current(group, required=False)

        self.name = Task._get_next_name(self.node, self.group, name)

        # may need to be temporarily removed later if Task used as a context
        if self.group is not None:
            self.group._tasks_to_add.append(self)

        self._already_used = False
        self._step = None
        self._step_with_setup = None
        self._outputs = []
        if step is not None:
            self.set_step(step)
        if outputs is not None:
            self.add_outputs(outputs)

        self._pipeline = None
        self._is_pipeline_context = False
        self._workspace_type = workspace_type
        self._report_net = None
        self._num_instances = num_instances

    def __enter__(self):
        """Start building this task's step with a NetBuilder context."""
        # temporarily remove from _tasks_to_add to ensure correct order
        if self.group is not None:
            self.group._tasks_to_add.remove(self)
        self._assert_not_used()
        assert self._step is None, 'This Task already has an execution step.'
        from caffe2.python import net_builder
        self._net_builder = net_builder.NetBuilder(_fullname=self.name)
        self._net_builder.__enter__()
        return self

    def __exit__(self, type, value, traceback):
        # On clean exit, adopt the built step; re-register with the group
        # (undoing the temporary removal in __enter__).
        self._net_builder.__exit__(type, value, traceback)
        if type is None:
            self.set_step(self._net_builder)
        if self.group is not None:
            self.group._tasks_to_add.append(self)
        self._net_builder = None

    def workspace_type(self):
        return self._workspace_type

    def _assert_not_used(self):
        assert not self._already_used, (
            'Cannot modify task since it is already been used.')

    def add_output(self, output):
        """Register an output (blob, list of blobs, or record) and return it
        wrapped as a TaskOutput."""
        self._assert_not_used()
        output = (
            output if isinstance(output, TaskOutput) else TaskOutput(output))
        self._outputs.append(output)
        return output

    def add_outputs(self, outputs):
        """Register one output or a list/tuple of outputs; returns what
        add_output returns (a TaskOutput or a list of them)."""
        self._assert_not_used()
        if type(outputs) not in (list, tuple):
            return self.add_output(outputs)
        else:
            return [self.add_output(output) for output in outputs]

    def set_step(self, step):
        self._assert_not_used()
        self._step = core.to_execution_step(step)

    def get_step(self):
        """Build (once) and return this task's full execution step, wrapped
        with report steps, instance-level and task-level setup/exit nets."""
        if self._step_with_setup is not None:
            return self._step_with_setup

        if self._step is None:
            # No step was provided: the task runs an empty step.
            self._step_with_setup = core.execution_step(self.name, [])
            return self._step_with_setup

        report_steps = [
            s
            for s in self._step.get_all_attributes(Task.REPORT_STEP)
            if not hasattr(s, '_report_step_used')
        ]
        for step in report_steps:
            step._report_step_used = True
            if not step.Proto().run_every_ms:
                step.RunEveryMillis(1000)
        task_init_nets, task_exit_nets = get_setup_nets(
            Task.TASK_SETUP, [self._step] + report_steps, self)
        instance_init_nets, instance_exit_nets = get_setup_nets(
            Task.TASK_INSTANCE_SETUP, [self._step] + report_steps, self)
        if len(self._outputs) == 0:
            # Ensure every task has at least one output (a dummy int32 0)
            # so the session always has something to fetch.
            output_net = core.Net('%s:output' % self.name)
            self.add_output(output_net.ConstantFill(
                [], 1, dtype=core.DataType.INT32, value=0))
            task_exit_nets.append(output_net)

        # Add instance-level report steps
        body = self._step if not report_steps else core.execution_step(
            '%s:body' % self.name, report_steps + [self._step])
        # Enclose with instance-level (thread-local) setup nets
        step_with_instance_setup = add_setup_steps(
            body, instance_init_nets, instance_exit_nets,
            self.name + ':instance')
        # Set up runtime concurrent instances
        if self._num_instances and self._num_instances > 1:
            step_with_instance_setup.SetCreateWorkspace(True)
            step_with_instance_setup = core.execution_step(
                # FIX: was the literal '%s:parallel' — the `% self.name`
                # interpolation was missing, so every parallel step was
                # named "%s:parallel".
                '%s:parallel' % self.name,
                [step_with_instance_setup],
                num_concurrent_instances=self._num_instances)
        # Enclose with task-level setup nets
        self._step_with_setup = add_setup_steps(
            step_with_instance_setup, task_init_nets, task_exit_nets, self.name)

        return self._step_with_setup

    def output_list(self):
        """Return the task's outputs wrapped in a TaskOutputList."""
        return TaskOutputList(self._outputs)

    def outputs(self):
        return self._outputs

    def _notify_used(self):
        # Finalize the step and freeze the task against further changes.
        self.get_step()
        self._already_used = True
620 
621 
class SetupNets(object):
    """
    Allow to register a list of nets to be run at initialization
    and finalization of Tasks or TaskGroups.
    For example, let's say you have the following:

        init_net = core.Net('init')
        my_val = init_net.ConstantFill([], 'my_val', value=0)

        net = core.Net('counter')
        net.Add([my_val, net.Const(1),], [my_val])

        with TaskGroup() as task_group:
            with Node('trainer'):
                my_task = Task(step=[net])

    In order to have `init_net` run once before `net` runs for the
    first time, you can do one of the following:

        net.add_attribute(Task.TASK_SETUP, SetupNets([init_net]))

    or

        net.add_attribute(TaskGroup.LOCAL_SETUP, SetupNets([init_net]))

    - With Task.TASK_SETUP, init_net will run once at my_task startup.
    - With TaskGroup.LOCAL_SETUP, init_net will run once on node 'trainer',
      before any task of the task group is run on that node.

    The same SetupNets object can be added to multiple nets. It will only
    run once per Task/TaskGroup run.
    """

    def __init__(self, init_nets=None, exit_nets=None):
        # Stored as given; get_setup_nets() consumes them via setup()/exit().
        self.init_nets = init_nets
        self.exit_nets = exit_nets

    def setup(self, init_net):
        # The provided scratch net is unused; we return our own nets.
        return self.init_nets

    def exit(self, exit_net):
        # The provided scratch net is unused; we return our own nets.
        return self.exit_nets
def __init__(self, step=None, outputs=None, workspace_type=None, group=None, node=None, name=None, num_instances=None)
Definition: task.py:477
def add(self, task)
Definition: task.py:202
def get_step(self)
Definition: task.py:564
def set_step(self, step)
Definition: task.py:560
def add_output(self, output)
Definition: task.py:546
def tasks_by_node(self, node_remap=None)
Definition: task.py:258
def report_step(self, step=None, node=None, interval_ms=1000)
Definition: task.py:235
def add_outputs(self, outputs)
Definition: task.py:553
def report_net(self, net=None, node=None, report_interval=5)
Definition: task.py:246
def _assert_not_used(self)
Definition: task.py:542