Caffe2 - Python API
A deep learning, cross platform ML framework
dataio.py
## @package dataio
# Module caffe2.python.dataio
"""
Defines the base interface for reading and writing operations.

Readers/Writers are objects that produce operations that read/write sequences
of data. Each operation reads or writes a list of BlobReferences.

Readers and Writers must be implemented such that read and write operations
are atomic and thread safe.

Examples of possible Readers and Writers:
    QueueReader, QueueWriter,
    DatasetReader, DatasetWriter.

See `dataset.py` for an example implementation.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from caffe2.python import core
from caffe2.python.schema import Field, Struct, from_blob_list
import numpy as np


class Reader(object):
    """
    Reader is an abstract class to be implemented in order to provide
    operations capable of iterating through a dataset or stream of data.

    A Reader must implement at least one operation, `read`, which
    adds operations to a net that read the next batch of data. Readers can
    optionally support the `reset` operation, which is useful when multiple
    passes over the data are required.
    """
    def __init__(self, schema=None):
        if schema is not None:
            assert isinstance(schema, Field)
        self._schema = schema

    def schema(self):
        assert self._schema is not None, 'Schema not provided for this reader.'
        return self._schema

    def _set_schema(self, schema):
        self._schema = schema
    def setup_ex(self, init_net, finish_net):
        """Setup nets to run at task initialization and cleanup time.

        Args:
            init_net: A net invoked at task init time.
            finish_net: A net invoked at task cleanup time.
        """
        pass
    def read_ex(self, local_init_net, local_finish_net):
        read_net = core.Net('reader_body')
        return ([read_net], ) + self.read(read_net)

    def read_record_ex(self, local_init_net, local_finish_net):
        nets, should_stop, fields = self.read_ex(
            local_init_net, local_finish_net)
        if self._schema:
            fields = from_blob_list(self._schema, fields)
        return nets, should_stop, fields
    def read(self, read_net):
        """Append operations to read_net that will read a batch from the
        underlying data source.

        Operations added to `read_net` must be thread safe and atomic, that is,
        it should be possible to clone `read_net` and run multiple instances of
        it in parallel.

        Args:
            read_net: the net that will be appended with read operations

        Returns:
            A tuple (should_stop, fields), with:
                should_stop: BlobReference pointing to a boolean scalar
                             blob that indicates whether the end of data
                             has been reached (i.e. whether the read
                             operation did not produce a batch).
                fields: A tuple of BlobReference containing the latest batch
                        of data that was read.
        """
        raise NotImplementedError('Readers must implement `read`.')
    def reset(self, net):
        """Append operations to `net` that will reset the reader.

        This can be used to read the data multiple times.
        Not all readers support this operation.
        """
        raise NotImplementedError('This reader cannot be reset.')
    def read_record(self, read_net):
        should_stop, fields = self.read(read_net)
        if self._schema:
            fields = from_blob_list(self._schema, fields)
        return should_stop, fields
    def execution_step(self, reader_net_name=None, external_should_stop=None):
        """Create an execution step with a net containing read operators.

        The execution step will contain a `stop_blob` that knows how to stop
        the execution loop when end of data was reached.

        E.g.:

            read_step, fields = reader.execution_step()
            consume_net = core.Net('consume')
            consume_net.Print(fields[0], [])
            p = core.Plan('reader')
            p.AddStep(read_step.AddNet(consume_net))
            workspace.RunPlan(p)

        Args:
            reader_net_name: (optional) the name of the reader_net to be
                             created. The execution step will
                             be named accordingly.

        Returns:
            A tuple (read_step, fields), with:
                read_step: A newly created execution step containing a net with
                           read operations. The step will have `stop_blob` set,
                           in order to stop the loop on end of data.
                fields: A tuple of BlobReference containing the latest batch
                        of data that was read.
        """
        reader_net_name = reader_net_name or 'reader'
        reader_net = core.Net(reader_net_name)
        should_stop, fields = self.read_record(reader_net)
        if external_should_stop is not None:
            should_stop = reader_net.Or([external_should_stop, should_stop])
        read_step = core.execution_step(
            '{}_step'.format(reader_net_name),
            reader_net,
            should_stop_blob=should_stop)
        return (read_step, fields)


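# --- Illustrative sketch, not part of the original file ---
# A toy Reader showing the `read` contract above: append ops to `read_net`,
# then return a boolean should-stop blob plus the tuple of data blobs. This
# reader "reads" the same constant batch forever and never signals end of
# data on its own; wrap it in ReaderWithLimit (below) to bound the loop.
class ConstantReader(Reader):
    def read(self, read_net):
        should_stop = read_net.ConstantFill(
            [], 1, shape=[], value=False, dtype=core.DataType.BOOL)
        batch = read_net.ConstantFill([], 1, shape=[10], value=1.0)
        return should_stop, (batch, )

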
class Writer(object):
    """
    Writer is an abstract class to be implemented in order to provide
    operations capable of feeding a data stream or a dataset.

    A Writer must implement 2 operations:
    `write`, which adds operations to a net that write the next batch of
    data, and `commit`, which adds operations to a net in order to indicate
    that no more data will be written.
    """
    _schema = None

    def schema(self):
        return self._schema
    def write(self, writer_net, fields):
        """Add operations to `writer_net` that write the next batch of data.

        Operations added to the net must be thread safe, that is:
        multiple writers must be able to write to the dataset in parallel.

        Args:
            writer_net: the net that will be appended with write operations.
            fields: a tuple of BlobReference containing the batch of data to
                    write.
        """
        raise NotImplementedError('Writers must implement `write`.')
    def write_record(self, writer_net, fields):
        if isinstance(fields, Field):
            self._schema = fields
            fields = fields.field_blobs()
        self.write(writer_net, fields)

    def setup_ex(self, init_net, finish_net):
        """Experimental, don't use yet"""
        self.commit(finish_net)

    def write_ex(self, fields, local_init_net, local_finish_net, stop_blob):
        """Experimental extension to the interface. Don't use yet"""
        write_net = core.Net('write_net')
        self.write(write_net, fields)
        return [write_net]

    def write_record_ex(
            self, fields, local_init_net, local_finish_net, stop_blob=None):
        """Experimental extension to the interface. Don't use yet."""
        if isinstance(fields, Field):
            self._schema = fields
            fields = fields.field_blobs()
        if stop_blob is None:
            stop_blob = local_init_net.NextName("dequeue_status")
        write_nets = self.write_ex(
            fields, local_init_net, local_finish_net, stop_blob)
        return (write_nets, stop_blob)
    def commit(self, finish_net):
        """Add operations to `finish_net` that signal end of data.

        This must be implemented by all Writers, but may be a no-op for
        some of them.
        """
        pass


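# --- Illustrative sketch, not part of the original file ---
# A toy Writer satisfying the contract above: `write` appends ops that
# consume the batch (here it just prints each field blob), and `commit`
# is a no-op, which the docstring above explicitly allows.
class PrintWriter(Writer):
    def write(self, writer_net, fields):
        for field in fields:
            writer_net.Print(field, [])

    def commit(self, finish_net):
        pass

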
class ReaderBuilder(object):
    """ Allows usage of a reader in a distributed fashion. """
    def schema(self):
        raise NotImplementedError()

    def setup(self, **kwargs):
        """
        Optionally, perform one-time setup before calling new_reader().
        Subclasses should make sure this function is only called once.
        """
        raise NotImplementedError()

    def new_reader(self, **kwargs):
        raise NotImplementedError()


226  """ReaderBuilder that modifies underlying builder by calling `piper`
227  function on each new reader produced, and return the result of
228  the function. This way, it is possible to append data processing
229  pipelines that will be replicated for each reader that gets created.
230 
231  E.g.:
232 
233  PipedReaderBuilder(
234  ReaderBuilder(...),
235  lambda reader: pipe(reader, processor=my_proc))
236  """
237 
    def __init__(self, builder, piper):
        self._builder = builder
        self._piper = piper

    def schema(self):
        return self._builder.schema()

    def setup(self, **kwargs):
        self._builder.setup(**kwargs)

    def new_reader(self, **kwargs):
        # Passing everything down since you could wrap a PipedReaderBuilder in
        # another PipedReaderBuilder
        output = self._piper(
            reader=self._builder.new_reader(**kwargs),
            **kwargs
        )
        return output if isinstance(output, Reader) else output.reader()


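# --- Illustrative sketch, not part of the original file ---
# Wrapping a builder so that every reader it creates gets a processing
# stage appended. `pipe` is assumed to come from caffe2.python.pipeline;
# `my_builder` and `my_processor` are hypothetical stand-ins. Note that
# new_reader() forwards its kwargs to the piper, so the callable should
# accept them.
#
#   from caffe2.python.pipeline import pipe
#   piped = PipedReaderBuilder(
#       my_builder,
#       lambda reader, **kwargs: pipe(reader, processor=my_processor))
#   reader = piped.new_reader()

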
class Pipe(object):
    def __init__(self, schema=None, obj_key=None):
        self._num_writers = 0
        self._num_readers = 0
        self._schema = schema
        self._obj_key = obj_key

    def schema(self):
        return self._schema

    def setup(self, global_init_net):
        pass

    def reader(self):
        raise NotImplementedError()

    def writer(self):
        raise NotImplementedError()

    def num_readers(self):
        return self._num_readers

    def num_writers(self):
        return self._num_writers

    def _new_writer(self, writer_schema, writer_init_net):
        if writer_schema is not None and self._schema is None:
            self._schema = writer_schema
        self._num_writers += 1
        if self._obj_key is not None:
            writer_init_net.add_attribute(self._obj_key, self)

    def _new_reader(self, reader_init_net):
        self._num_readers += 1
        if self._obj_key is not None:
            reader_init_net.add_attribute(self._obj_key, self)


297  """ Reader that produces increasing integers. """
298  def __init__(self):
299  Reader.__init__(self, schema=Struct(('iter', np.int64)))
300  self.counter = None
301  self.should_stop = None
302 
303  def setup_ex(self, global_init_net, global_finish_net):
304  if self.counter is None:
305  self.counter = global_init_net.CreateCounter([], init_count=0)
306  self.should_stop = global_init_net.ConstantFill(
307  [], shape=[], dtype=core.DataType.BOOL, value=False)
308 
309  def read_ex(self, local_init_net, local_finish_net):
310  count_net = core.Net('limited_reader_counter')
311  value = count_net.CountUp([self.counter], 1)
312  return [count_net], self.should_stop, [value]
313 
314 
316  """Abstract Reader constrained by certain conditions.
317 
318  Base class for Reader classes which check for certain conditions to stop
319  further processing (e.g. max number of iterations or time limit).
320  Also produces a boolean blob (data_finished) that can be used to see if
321  the reader exausted all input data (true) or stopped for another reason
322  (false).
323  """
324 
    def __init__(self, reader):
        Reader.__init__(self, schema=reader._schema)
        self.reader = reader
        self.net = core.Net('reader_with_limit')
        self._data_finished = self.net.AddExternalInput(
            self.net.NextName('data_finished'))
        self.should_stop = None

    def setup_ex(self, global_init_net, global_finish_net):
        global_init_net.ConstantFill(
            [], [self._data_finished],
            shape=[], value=False, dtype=core.DataType.BOOL)
        self.reader.setup_ex(global_init_net, global_finish_net)
        self.setup_limiter(global_init_net, global_finish_net)
    def read_ex(self, local_init_net, local_finish_net):
        """Reads from an underlying Reader class, but may stop due to
        additional constraints.

        Build and return network(s) to read data from a Reader with
        additional constraints, depending on which derived class is used.
        Derived classes implement setup_limiter and check_limiter_condition,
        which determine the nature of the constraint imposed on the reader,
        e.g. iteration limits or time limit.

        Args:
            local_init_net: A net invoked at task instance init time (once
                            per parallel thread).
            local_finish_net: A net invoked at task instance cleanup time
                              (once per parallel thread).
        """

        # Check if limiting constraint is met.
        stop_condition_net = core.Net('limited_reader_condition')
        should_stop = self.check_limiter_condition(stop_condition_net)

        # Call original reader.
        nets, local_data_finished, fields = self.reader.read_ex(
            local_init_net, local_finish_net)
        self._set_schema(self.reader._schema)

        # Check if original reader is done.
        check_done_net = core.Net('limited_reader_post')
        # Copy to the same blob as the counter output to trigger reader
        # stopping - this is ok because execution will check should_stop_blob
        # after every single operation, so it has already been checked on this
        # iteration by this point.
        check_done_net.Copy(local_data_finished, should_stop)
        # Update externally-accessible flag indicating if reader is done.
        check_done_net.Or([self._data_finished, local_data_finished],
                          [self._data_finished])

        return [stop_condition_net] + nets + [check_done_net], should_stop, fields

    def setup_limiter(self, global_init_net, global_finish_net):
        """Configure task level init/cleanup nets required to implement the
        limit condition. Must be implemented by subclass.

        Args:
            global_init_net: A net invoked at task init time.
            global_finish_net: A net invoked at task cleanup time.
        """
        raise NotImplementedError("Subclass must implement `setup_limiter`")

    def check_limiter_condition(self, stop_condition_net):
        """Configure a net that is invoked between reading batches to see if
        the limit condition is met. Must be implemented by subclass.

        Args:
            stop_condition_net: A net invoked to evaluate an early termination
                                condition.
        """
        raise NotImplementedError(
            "Subclass must implement `check_limiter_condition`")
    def data_finished(self):
        """
        Return a blob that can be checked after the end of the reading task,
        which will contain a scalar boolean indicating whether the underlying
        reader has been exhausted (True) or whether we stopped because we
        reached the limit of iterations (False).
        """
        return self._data_finished


410  """Reader that stops after `num_iter` batches.
411 
412  If `num_iter` <= 0 or is None, reverts to an unconstrained reader that
413  exports a boolean blob indicating that the reader has exhausted
414  the data steam.
415  """
416  def __init__(self, reader, num_iter=1):
417  """Class initializer.
418 
419  Args:
420  reader: The underlying reader object doing the actual read.
421  num_iter: Number of batches to read. If `None`,
422  the class reverts to a normal reader except that it also
423  produces a data_finished blob as a side effect to indicate
424  whether the input stream is exhausted.
425  """
426  super(ReaderWithLimit, self).__init__(reader)
427  self.counter = None
428  self.num_iter = num_iter
429  if self.num_iter is not None:
430  self.counter = self.net.AddExternalInput(
431  self.net.NextName('counter'))
432 
    def setup_limiter(self, global_init_net, global_finish_net):
        if self.counter:
            global_init_net.CreateCounter(
                [], [self.counter], init_count=int(self.num_iter))

    def check_limiter_condition(self, stop_condition_net):
        if self.counter:
            return stop_condition_net.CountDown([self.counter], 1)
        else:
            return stop_condition_net.ConstantFill(
                [], 1,
                shape=[], value=False, dtype=core.DataType.BOOL)


def CountUntil(num_iter):
    return ReaderWithLimit(CounterReader(), num_iter)


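# --- Illustrative sketch, not part of the original file ---
# Driving the *_ex interface by hand with CountUntil. Init nets run once,
# then every read net runs in a loop; mirroring the execution_step
# semantics noted in ReaderWithLimitBase.read_ex above, the stop blob is
# checked after each net. Assumes caffe2.python.workspace is available.
def _example_count_until(num_iter=5):
    from caffe2.python import workspace
    reader = CountUntil(num_iter)
    init_net, finish_net = core.Net('init'), core.Net('finish')
    reader.setup_ex(init_net, finish_net)
    read_nets, should_stop, fields = reader.read_ex(
        core.Net('local_init'), core.Net('local_finish'))
    workspace.RunNetOnce(init_net)
    done = False
    while not done:
        for net in read_nets:
            workspace.RunNetOnce(net)
            if workspace.FetchBlob(should_stop):
                done = True
                break
        else:
            # One full pass over the read nets produced a batch.
            print(workspace.FetchBlob(fields[0]))
    workspace.RunNetOnce(finish_net)

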
452  """Reader that stops after `duration` seconds.
453 
454  If `duration` <= 0 or is None, reverts to an unconstrained reader that
455  exports a boolean blob indicating that the reader has exhausted
456  the data steam.
457  """
458  def __init__(self, reader, duration=0):
459  """Class initializer.
460 
461  Args:
462  reader: The underlying reader object doing the actual read.
463  duration: Number of seconds to read. If un-specified, None, or <= 0,
464  the class reverts to a normal reader except that it also
465  produces a data_finished blob as a side effect to indicate
466  whether the input stream is exhausted.
467  """
        super(ReaderWithTimeLimit, self).__init__(reader)

        self.timer = None
        self.duration = duration
        self.duration_ns_blob = None

    def setup_limiter(self, global_init_net, global_finish_net):
        if self.duration is not None and self.duration > 0:
            duration_ns = int(self.duration * (10**9))

            self.timer = global_init_net.TimerBegin(
                [], counter_name='epoch_timer')
            start_time = global_init_net.TimerGet(self.timer)
            self.duration_ns_blob = global_init_net.ConstantFill(
                [start_time], value=duration_ns)

            global_finish_net.TimerEnd([self.timer], [])

    def check_limiter_condition(self, stop_condition_net):
        if self.duration:
            time_elapsed = stop_condition_net.TimerGet(self.timer)
            return stop_condition_net.GE(
                [time_elapsed, self.duration_ns_blob], str(self.should_stop))
        else:
            return stop_condition_net.ConstantFill(
                [], 1, shape=[], value=False, dtype=core.DataType.BOOL
            )

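
# --- Illustrative sketch, not part of the original file ---
# Bounding an epoch by wall-clock time: wrap any Reader with a 60-second
# budget and, after the task has run, inspect data_finished() to tell an
# exhausted input stream (True) apart from a timeout (False).
# `my_reader` is a hypothetical stand-in for a concrete Reader.
#
#   limited = ReaderWithTimeLimit(my_reader, duration=60)
#   ... run the task / plan built from `limited` ...
#   exhausted = workspace.FetchBlob(limited.data_finished())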