3 from __future__
import absolute_import
4 from __future__
import division
5 from __future__
import print_function
6 from __future__
import unicode_literals
10 from collections
import defaultdict
12 from future.utils
import viewitems
15 def _merge_node_kwargs(a, b):
26 @context.define_context(allow_default=
True)
29 Context that keeps track of all the node names used. 30 Users shouldn't have to use them directly, since a Cluster is automatically 31 generated at the first usage of 'Node'. 39 def add_node(self, node):
40 if str(node)
not in self.
_nodes:
41 self._nodes.append(str(node))
44 self._node_kwargs.get(str(node)))
48 Returns the list of unique node names used within this context. 52 def node_kwargs(self):
59 A Node context is used to indicate that all Tasks instantiated within will 60 run on the given node name. (Only the name of the node actually counts.) 63 with TaskGroup() as tg: 65 s1 = execution_step(...) 68 s2 = execution_step(...) 70 s3 = execution_step(...) 72 In this example, all three execution steps will run in parallel. 73 Moreover, s1 and s3 will run on the same node, and can see each 76 Additionally, a Node can be passed implementation-specific kwargs, 77 in order to specify properties of the node. 80 def __init__(self, node='local', **kwargs):
81 self.
_name = str(node)
83 Cluster.current().add_node(self)
94 Determines whether tasks of a TaskGroup will run directly at the global 95 workspace, which is kept alive across runs, or whether a new child 96 workspace will be created for the run and destroyed afterwards. 102 def get_setup_nets(key, steps_or_nets, target):
108 for step_or_net
in steps_or_nets:
109 if hasattr(step_or_net,
'get_all_attributes'):
110 objs += step_or_net.get_all_attributes(key)
111 elif hasattr(step_or_net,
'get_attributes'):
112 objs += step_or_net.get_attributes(key)
116 if hasattr(obj,
'_setup_used')
and obj._setup_used:
118 if hasattr(obj,
'_setup_target')
and obj._setup_target != target:
120 if hasattr(obj,
'setup'):
121 nets = obj.setup(init_net)
122 if isinstance(nets, (list, tuple)):
125 init_nets.append(nets)
126 elif nets
is not None:
127 raise TypeError(
'Unsupported type for setup: %s' % type(nets))
128 obj._setup_used =
True 129 if hasattr(obj,
'exit'):
130 nets = obj.exit(exit_net)
131 if isinstance(nets, (list, tuple)):
134 exit_nets.append(nets)
135 elif nets
is not None:
136 raise TypeError(
'Unsupported type for setup: %s' % type(nets))
137 obj._setup_used =
True 139 if len(init_net.Proto().op) > 0:
140 init_nets.insert(0, init_net)
141 if len(exit_net.Proto().op) > 0:
142 exit_nets.insert(0, exit_net)
143 return init_nets, exit_nets
146 def add_setup_steps(step, init_nets, exit_nets, name):
147 if not init_nets
and not exit_nets:
151 steps.append(core.execution_step(
'%s:init' % name, init_nets))
153 if len(exit_nets) > 0:
154 steps.append(core.execution_step(
'%s:exit' % name, exit_nets))
155 return core.execution_step(name, steps)
161 Context that gathers tasks which will run concurrently, potentially on 162 multiple nodes. All tasks in the same node will share the same workspace 163 and thus can share blobs, while tasks running in different nodes won't 164 be able to directly share data. 166 All tasks of the task group will start concurrently, and the task group 167 will finish execution when the last task of the group finishes. 170 # suppose that s1 ... s5 are execution steps or nets. 171 with TaskGroup() as tg: 172 # these tasks go to default node 'local' 183 # this will run all steps in parallel. 184 # s1 and s2 will run at default node 'local' 185 # s3 and s5 will run at node 'n2' 186 # s4 will run at node 'n1' 189 LOCAL_SETUP =
'local_setup' 191 def __init__(self, workspace_type=None):
204 'Cannot add Task to an already used TaskGroup.')
207 task._workspace_type
is None or 209 if task._workspace_type
is None:
210 task._workspace_type = (
215 self._tasks.append(task)
224 def num_registered_tasks(self):
227 def used_nodes(self):
231 if task.node
not in used:
232 used.append(task.node)
237 Add a "report step" to this TaskGroup. This step will run repeatedly 238 every `interval_ms` milliseconds for the duration of the TaskGroup 239 execution on each of the nodes. It is guaranteed that this step 240 will be run at least once after every Task in the node has finished. 242 step = core.to_execution_step(step)
243 step.RunEveryMillis(interval_ms)
244 self._report_steps.append((str(node
or Node.current(node)), step))
246 def report_net(self, net=None, node=None, report_interval=5):
248 DEPRECATED. Use report_step instead. 250 node = str(node
or Node.current(node))
254 net
if net
else core.Net(
'%s/reporter' % node),
258 def tasks_by_node(self, node_remap=None):
262 for task
in self.
tasks():
263 node_map[task.node] =\
264 node_remap(task.node)
if node_remap
else task.node
267 assert prev_node_map == node_map, (
268 'Cannot call tasks_by_node multiple times.')
272 for node, (net, interval)
in viewitems(self.
_report_nets):
273 self.
report_step(net, node=node, interval_ms=interval * 1000)
276 tasks_by_node = defaultdict(list)
277 for task
in self.
tasks():
278 mapped_node = node_map[task.node]
279 tasks_by_node[mapped_node].append(task)
281 report_steps_by_node = defaultdict(list)
283 report_steps_by_node[node_map[original_node]].append(step)
286 for node, tasks
in viewitems(tasks_by_node):
287 report_steps = report_steps_by_node[node]
288 node_inits, node_exits = get_setup_nets(
289 TaskGroup.LOCAL_SETUP,
290 [t.get_step()
for t
in tasks] + report_steps,
295 grouped_workspace_type = WorkspaceType.PRIVATE
297 step = task.get_step()
298 step.SetCreateWorkspace(
299 task.workspace_type() == WorkspaceType.PRIVATE)
302 outputs += task.outputs()
305 if task.workspace_type() == WorkspaceType.GLOBAL:
306 grouped_workspace_type = WorkspaceType.GLOBAL
308 steps.append(core.execution_step(
'empty', []))
312 step = core.execution_step(
313 '%s:body' % node, steps, concurrent_substeps=
True)
314 if len(node_inits) > 0
or len(node_exits) > 0:
316 if len(node_inits) > 0:
318 core.execution_step(
'%s:init' % node, node_inits))
320 if len(node_exits) > 0:
322 core.execution_step(
'%s:exit' % node, node_exits))
323 step = core.execution_step(node, steps)
325 node=node, step=step, outputs=outputs,
326 name=
'grouped_by_node',
327 group=grouped_by_node, workspace_type=grouped_workspace_type)
329 return grouped_by_node
331 def to_task(self, node=None):
332 node = str(Node.current(node))
338 def workspace_type(self):
344 Represents the output of a task. An output can be a blob, 345 a list of blobs, or a record. 348 def __init__(self, names):
351 if isinstance(names, Field):
353 names = self._schema.field_blobs()
354 self.
_is_scalar = type(names)
not in (tuple, list)
360 def set(self, values, _fetch_func=None):
361 assert len(values) == len(self.
names)
366 assert self.
_values is not None,
'Output value not set yet.' 376 'Cannot fetch value for this output.')
379 return fetched_vals[0]
381 return from_blob_list(self.
_schema, fetched_vals)
def final_output(blob_or_record):
    """
    Register `blob_or_record` as an output of the currently active Task.

    When no Task is active, a fresh dummy Task is created so that the given
    blob or record is still returned to the client. The value of the blob or
    record is returned when the last task of the TaskGroup for a given node
    finishes.

    Args:
        blob_or_record: the blob or record to expose as a task output.

    Returns:
        Whatever `Task.add_output` returns for the registered output.
    """
    # `required=False` lets the lookup come back empty instead of failing,
    # in which case we fall back to a brand new Task.
    active_task = Task.current(required=False)
    if not active_task:
        active_task = Task()
    return active_task.add_output(blob_or_record)
398 """ Keeps a list of outputs for a task """ 399 def __init__(self, outputs=None):
404 Retrieve the output names. 405 TODO(azzolini): make this schema-based. 412 def set_values(self, values, _fetch_func=None):
416 o.set(values[offset:offset + num], _fetch_func)
418 assert offset == len(values),
'Wrong number of output values.' 424 A Task is composed of an execution step and zero or more outputs. 425 Tasks are executed in the context of a TaskGroup, which, in turn, can 428 Task outputs are fetched by the session at the end of the run. 430 The recommended way of creating a task is by using `net_builder.ops`. 433 from net_builder import ops 434 with Node('trainer'), Task(name='my_task', num_instances=2): 435 with ops.task_init(): 437 with ops.task_instance_init(): 440 ops.Copy(globl, local) 441 with ops.task_instance_exit(): 442 ops.Add([globl, local], [globl]) 443 with ops.task_exit(): 444 ops.Mul([globl, globl], [globl]) 446 The task above will create 2 instances that will run in parallel. 447 Each instance will copy `local` to `globl` 100 times, then Add `local` 448 to `globl` once. The `Mul` will only execute once, after all the instances 449 of the task have finished. 454 TASK_SETUP =
'task_setup' 456 TASK_INSTANCE_SETUP =
'task_instance_setup' 457 REPORT_STEP =
'report_step' 458 _global_names_used = set()
461 def _get_next_name(node, group, name):
462 basename = str(node) +
'/' + str(name)
464 Task._global_names_used
465 if group
is None else 466 set(t.name
for t
in group._tasks_to_add))
469 while cur_name
in names_used:
471 cur_name =
'%s:%d' % (basename, i)
475 self, step=
None, outputs=
None,
476 workspace_type=
None, group=
None, node=
None, name=
None,
479 Instantiate a Task and add it to the current TaskGroup and Node. 482 step: If provided, this task will run this ExecutionStep. 483 outputs: If provided, the task will return the provided outputs 484 to the client at completion time. 485 node: If provided, force task execution on the given node. 486 name: Name of the Task. 487 num_instances: If provided, this task will be cloned num_instances 488 times at runtime, and all instances will run 492 name = step.Proto().name
496 self.
node = str(Node.current(
None if node
is None else Node(node)))
497 self.
group = TaskGroup.current(group, required=
False)
502 if self.
group is not None:
503 self.group._tasks_to_add.append(self)
511 if outputs
is not None:
522 if self.
group is not None:
523 self.group._tasks_to_add.remove(self)
525 assert self.
_step is None,
'This Task already has an execution step.' 528 self._net_builder.__enter__()
531 def __exit__(self, type, value, traceback):
532 self._net_builder.__exit__(type, value, traceback)
535 if self.
group is not None:
536 self.group._tasks_to_add.append(self)
539 def workspace_type(self):
542 def _assert_not_used(self):
544 'Cannot modify task since it is already been used.')
546 def add_output(self, output):
549 output
if isinstance(output, TaskOutput)
else TaskOutput(output))
550 self._outputs.append(output)
553 def add_outputs(self, outputs):
555 if type(outputs)
not in (list, tuple):
558 return [self.
add_output(output)
for output
in outputs]
560 def set_step(self, step):
562 self.
_step = core.to_execution_step(step)
568 if self.
_step is None:
574 for s
in self._step.get_all_attributes(Task.REPORT_STEP)
575 if not hasattr(s,
'_report_step_used')
577 for step
in report_steps:
578 step._report_step_used =
True 579 if not step.Proto().run_every_ms:
580 step.RunEveryMillis(1000)
581 task_init_nets, task_exit_nets = get_setup_nets(
582 Task.TASK_SETUP, [self.
_step] + report_steps, self)
583 instance_init_nets, instance_exit_nets = get_setup_nets(
584 Task.TASK_INSTANCE_SETUP, [self.
_step] + report_steps, self)
588 [], 1, dtype=core.DataType.INT32, value=0))
589 task_exit_nets.append(output_net)
592 body = self.
_step if not report_steps
else core.execution_step(
593 '%s:body' % self.
name, report_steps + [self.
_step])
595 step_with_instance_setup = add_setup_steps(
596 body, instance_init_nets, instance_exit_nets,
597 self.
name +
':instance')
600 step_with_instance_setup.SetCreateWorkspace(
True)
601 step_with_instance_setup = core.execution_step(
603 [step_with_instance_setup],
607 step_with_instance_setup, task_init_nets, task_exit_nets, self.
name)
611 def output_list(self):
617 def _notify_used(self):
624 Allow to register a list of nets to be run at initialization 625 and finalization of Tasks or TaskGroups. 626 For example, let's say you have the following: 628 init_net = core.Net('init') 629 my_val = init_net.ConstantFill([], 'my_val', value=0) 631 net = core.Net('counter') 632 net.Add([my_val, net.Const(1),], [my_val]) 634 with TaskGroup() as task_group: 635 with Node('trainer'): 636 my_task = Task(step=[net]) 638 In order to have `init_net` run once before `net` runs for the 639 first time, you can do one of the following: 641 net.add_attribute(Task.TASK_SETUP, SetupNets([init_net])) 645 net.add_attribute(TaskGroup.LOCAL_SETUP, SetupNets([init_net])) 647 - With Task.TASK_SETUP, init_net will run once at my_task startup. 648 - With TaskGroup.LOCAL_SETUP, init_net will run once on node 'trainer', 649 before any task of the task group is run on that node. 651 The same SetupNets object can be added to multiple nets. It will only 652 run once per Task/TaskGroup run. 655 def __init__(self, init_nets=None, exit_nets=None):
659 def setup(self, init_net):
662 def exit(self, exit_net):
def __init__(self, step=None, outputs=None, workspace_type=None, group=None, node=None, name=None, num_instances=None)
def add_output(self, output)
def tasks_by_node(self, node_remap=None)
def report_step(self, step=None, node=None, interval_ms=1000)
def add_outputs(self, outputs)
def report_net(self, net=None, node=None, report_interval=5)
def _assert_not_used(self)