"""
Defines the base interface for reading and writing operations.

Readers/Writers are objects that produce operations that read/write sequences
of data. Each operation reads or writes a list of BlobReferences.

Readers and Writers must be implemented such that read and write operations
are atomic and thread safe.

Examples of possible Readers and Writers:
    QueueReader, QueueWriter,
    DatasetReader, DatasetWriter,

See `dataset.py` for an example of implementation.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from caffe2.python import core
from caffe2.python.schema import Field, Struct, from_blob_list

import numpy as np

class Reader(object):
    """
    Reader is an abstract class to be implemented in order to provide
    operations capable of iterating through a dataset or stream of data.

    A Reader must implement at least one operation, `read`, which
    adds operations to a net that read the next batch of data. Readers can
    optionally support the `reset` operation, which is useful when multiple
    passes over the data are required.
    """
    def __init__(self, schema=None):
        if schema is not None:
            assert isinstance(schema, Field)
        self._schema = schema

    def schema(self):
        assert self._schema is not None, 'Schema not provided for this reader.'
        return self._schema

    def _set_schema(self, schema):
        self._schema = schema
51 """Setup nets to run at task initialization and cleanup time. 54 global_init_net: A net invoked at task init time. 55 global_finish_net: A net invoked at task cleanup time. 59 def read_ex(self, local_init_net, local_finish_net):
        read_net = core.Net('reader_body')
        return ([read_net], ) + self.read(read_net)

    def read_record_ex(self, local_init_net, local_finish_net):
        nets, should_stop, fields = self.read_ex(
            local_init_net, local_finish_net)
        if self._schema:
            fields = from_blob_list(self._schema, fields)
        return nets, should_stop, fields
71 """Append operations to read_net that will read a batch from the 72 underlying data soruce. 74 Operations added to `read_net` must be thread safe and atomic, that is, 75 it should be possible to clone `read_net` and run multiple instances of 79 read_net: the net that will be appended with read operations 82 A tuple (should_stop, fields), with: 83 should_stop: BlobReference pointing to a boolean scalar 84 blob that indicates whether the read operation 85 was succesfull or whether the end of data has 87 fields: A tuple of BlobReference containing the latest batch 88 of data that was read. 90 raise NotImplementedError(
'Readers must implement `read`.')
93 """Append operations to `net` that will reset the reader. 95 This can be used to read the data multiple times. 96 Not all readers support this operation. 98 raise NotImplementedError(
'This reader cannot be resetted.')

    def read_record(self, read_net):
        should_stop, fields = self.read(read_net)
        if self._schema:
            fields = from_blob_list(self._schema, fields)
        return should_stop, fields
107 """Create an execution step with a net containing read operators. 109 The execution step will contain a `stop_blob` that knows how to stop 110 the execution loop when end of data was reached. 114 read_step, fields = reader.execution_step() 115 consume_net = core.Net('consume') 116 consume_net.Print(fields[0], []) 117 p = core.Plan('reader') 118 p.AddStep(read_step.AddNet(consume_net)) 122 reader_net_name: (optional) the name of the reader_net to be 123 created. The execution step will 124 be named accordingly. 127 A tuple (read_step, fields), with: 128 read_step: A newly created execution step containing a net with 129 read operations. The step will have `stop_blob` set, 130 in order to stop the loop on end of data. 131 fields: A tuple of BlobReference containing the latest batch 132 of data that was read. 134 reader_net =
core.Net(reader_net_name
or 'reader')
136 if external_should_stop
is not None:
137 should_stop = reader_net.Or([external_should_stop, should_stop])
138 read_step = core.execution_step(
139 '{}_step'.format(reader_net_name),
141 should_stop_blob=should_stop)
142 return (read_step, fields)
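

# Illustrative sketch, not part of the interface: a minimal concrete Reader
# and a helper that drives it through `execution_step`. The names
# `_ExampleCountReader` and `_example_reader_plan` are hypothetical; the
# sketch only uses operators (CreateCounter, CountDown) and plan-building
# calls already used in this module.
class _ExampleCountReader(Reader):
    """Emits `num_iter` empty batches, then signals end of data."""
    def __init__(self, num_iter):
        Reader.__init__(self)
        self._num_iter = num_iter
        self._counter = None

    def setup_ex(self, init_net, finish_net):
        if self._counter is None:
            self._counter = init_net.CreateCounter(
                [], init_count=self._num_iter)

    def read(self, read_net):
        # CountDown outputs a boolean blob that becomes True once the
        # counter reaches zero, which stops the read loop.
        should_stop = read_net.CountDown([self._counter], 1)
        return should_stop, ()


def _example_reader_plan(reader):
    """Hypothetical helper: build a plan that runs `reader` to exhaustion.

    Run the result with workspace.RunPlan(plan).
    """
    init_net = core.Net('example_init')
    finish_net = core.Net('example_finish')
    reader.setup_ex(init_net, finish_net)
    read_step, fields = reader.execution_step('example_reader')
    plan = core.Plan('example')
    plan.AddStep(core.execution_step('example_init_step', init_net))
    plan.AddStep(read_step)
    plan.AddStep(core.execution_step('example_finish_step', finish_net))
    return plan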


class Writer(object):
    """
    Writer is an abstract class to be implemented in order to provide
    operations capable of feeding a data stream or a dataset.

    A Writer must implement 2 operations:
    `write`, which adds operations to a net that write the next batch of
    data, and `commit`, which adds operations to a net in order to indicate
    that no more data will be written.
    """
    _schema = None

    def schema(self):
        return self._schema

    def write(self, writer_net, fields):
161 """Add operations to `writer_net` that write the next batch of data. 163 Operations added to the net must be thread-safe and unique, that is: 164 multiple writers must be able to write to the dataset in parallel. 167 fields: a tuple of BlobReference containing the batch of data to 170 raise NotImplementedError(
'Writers must implement write.')

    def write_record(self, writer_net, fields):
        if isinstance(fields, Field):
            self._schema = fields
            fields = fields.field_blobs()
        self.write(writer_net, fields)
179 """Experimental, don't use yet""" 182 def write_ex(self, fields, local_init_net, local_finish_net, stop_blob):
183 """Experimental extension to the interface. Don't use yet""" 185 self.
write(write_net, fields)

    def write_record_ex(
            self, fields, local_init_net, local_finish_net, stop_blob=None):
        """Experimental extension to the interface. Don't use yet."""
        if isinstance(fields, Field):
            self._schema = fields
            fields = fields.field_blobs()
        if stop_blob is None:
            stop_blob = local_init_net.NextName("dequeue_status")
        write_nets = self.write_ex(
            fields, local_init_net, local_finish_net, stop_blob)
        return (write_nets, stop_blob)
201 """Add operations to `finish_net` that signal end of data. 203 This must be implemented by all Writers, but may be no-op for some 209 class ReaderBuilder(object):
210 """ Allow usage of a reader in distributed fashion. """ 212 raise NotImplementedError()

    def setup(self, **kwargs):
        """
        Optionally, perform one-time setup before calling new_reader().
        Subclass should make sure this function is only called once.
        """
        raise NotImplementedError()

    def new_reader(self, **kwargs):
        raise NotImplementedError()
226 """ReaderBuilder that modifies underlying builder by calling `piper` 227 function on each new reader produced, and return the result of 228 the function. This way, it is possible to append data processing 229 pipelines that will be replicated for each reader that gets created. 235 lambda reader: pipe(reader, processor=my_proc)) 238 def __init__(self, builder, piper):
243 return self._builder.schema()

    def setup(self, **kwargs):
        self._builder.setup(**kwargs)

    def new_reader(self, **kwargs):
        # Pass everything down and let the underlying builder decide
        # what to use.
        output = self._piper(
            reader=self._builder.new_reader(**kwargs),
            **kwargs
        )
        return output if isinstance(output, Reader) else output.reader()
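

# Usage sketch: wrap a base builder so every reader it creates is piped
# through a processing step. `base_builder` and `my_processor` are
# hypothetical; `pipe` is provided by caffe2.python.pipeline.
#
#     from caffe2.python.pipeline import pipe
#     piped_builder = PipedReaderBuilder(
#         base_builder,
#         lambda reader: pipe(reader, processor=my_processor))
#     reader = piped_builder.new_reader()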


class Pipe(object):
    def __init__(self, schema=None, obj_key=None):
        self._num_writers = 0
        self._num_readers = 0
        self._schema = schema
        self._obj_key = obj_key

    def schema(self):
        return self._schema

    def setup(self, global_init_net):
        pass

    def reader(self):
        raise NotImplementedError()

    def writer(self):
        raise NotImplementedError()

    def num_readers(self):
        return self._num_readers

    def num_writers(self):
        return self._num_writers

    def _new_writer(self, writer_schema, writer_init_net):
        if writer_schema is not None and self._schema is None:
            self._schema = writer_schema
        self._num_writers += 1
        if self._obj_key is not None:
            writer_init_net.add_attribute(self._obj_key, self)

    def _new_reader(self, reader_init_net):
        self._num_readers += 1
        if self._obj_key is not None:
            reader_init_net.add_attribute(self._obj_key, self)
297 """ Reader that produces increasing integers. """ 299 Reader.__init__(self, schema=
Struct((
'iter', np.int64)))

    def setup_ex(self, global_init_net, global_finish_net):
        if self.counter is None:
            self.counter = global_init_net.CreateCounter([], init_count=0)
            self.should_stop = global_init_net.ConstantFill(
                [], shape=[], dtype=core.DataType.BOOL, value=False)
309 def read_ex(self, local_init_net, local_finish_net):
310 count_net =
core.Net(
'limited_reader_counter')
311 value = count_net.CountUp([self.
counter], 1)
316 """Abstract Reader constrained by certain conditions. 318 Base class for Reader classes which check for certain conditions to stop 319 further processing (e.g. max number of iterations or time limit). 320 Also produces a boolean blob (data_finished) that can be used to see if 321 the reader exausted all input data (true) or stopped for another reason 325 def __init__(self, reader):
        Reader.__init__(self, schema=reader._schema)
        self.reader = reader
        self.net = core.Net('reader_with_limit')
        self._data_finished = self.net.AddExternalInput(
            self.net.NextName('data_finished'))
        self.should_stop = None

    def setup_ex(self, global_init_net, global_finish_net):
        global_init_net.ConstantFill(
            [], [self._data_finished],
            shape=[], value=False, dtype=core.DataType.BOOL)
        self.reader.setup_ex(global_init_net, global_finish_net)
        self.setup_limiter(global_init_net, global_finish_net)

    def read_ex(self, local_init_net, local_finish_net):
        """Reads from an underlying Reader class, but may stop due to
        additional constraints.

        Build and return network(s) to read data from a Reader with
        additional constraints, depending on which derived class is used.
        Derived classes implement setup_limiter and check_limiter_condition,
        which determine the nature of the constraint imposed on the reader,
        e.g. iteration limits or time limit.

        Args:
            local_init_net: A net invoked at task instance init time (once per
                parallel thread).
            local_finish_net: A net invoked at task instance cleanup time (once
                per parallel thread).
        """
        # Check if the limiting constraint is met.
        stop_condition_net = core.Net('limited_reader_condition')
        should_stop = self.check_limiter_condition(stop_condition_net)

        # Call the original reader.
        nets, local_data_finished, fields = self.reader.read_ex(
            local_init_net, local_finish_net)

        # Check if the original reader is done.
        check_done_net = core.Net('limited_reader_post')
        # Copy into the same blob as the limiter condition output to trigger
        # reader stopping; execution checks should_stop_blob after every op,
        # so the limiter condition has already been evaluated this iteration.
        check_done_net.Copy(local_data_finished, should_stop)
        # Update the externally visible flag indicating whether the
        # underlying reader has exhausted its data.
        check_done_net.Or([self._data_finished, local_data_finished],
                          [self._data_finished])

        return [stop_condition_net] + nets + [check_done_net], should_stop, fields
380 """Configure task level init/cleanup nets required to implement limit 381 condition. Must be implemented by subclass. 384 global_init_net: A net invoked at task init time. 385 global_finish_net: A net invoked at task cleanup time. 387 raise NotImplementedError(
"Subclass must implement `setup_limiter`")
390 """Configure a net that is invoked between reading batches to see if 391 limit condition is met. Must be implemented by subclass. 394 stop_condition_net: A net invoked to evaluate an early termination 397 raise NotImplementedError(
"Subclass must implement `check_limiter_condition")

    def data_finished(self):
        """
        Return a blob that can be checked after the end of the reading task,
        which will contain a boolean scalar indicating whether the underlying
        reader has been exhausted (True) or whether we stopped because we
        reached the limit of iterations (False).
        """
        return self._data_finished


class ReaderWithLimit(ReaderWithLimitBase):
    """Reader that stops after `num_iter` batches.

    If `num_iter` <= 0 or is None, reverts to an unconstrained reader that
    exports a boolean blob indicating that the reader has exhausted
    the data stream.
    """
    def __init__(self, reader, num_iter=1):
        """Class initializer.

        Args:
            reader: The underlying reader object doing the actual read.
            num_iter: Number of batches to read. If `None`,
                the class reverts to a normal reader except that it also
                produces a data_finished blob as a side effect to indicate
                whether the input stream is exhausted.
        """
        super(ReaderWithLimit, self).__init__(reader)
        self.counter = None
        self.num_iter = num_iter
        if self.num_iter is not None:
            self.counter = self.net.AddExternalInput(
                self.net.NextName('counter'))

    def setup_limiter(self, global_init_net, global_finish_net):
        if self.counter:
            global_init_net.CreateCounter(
                [], [self.counter], init_count=int(self.num_iter))

    def check_limiter_condition(self, stop_condition_net):
        if self.counter:
            return stop_condition_net.CountDown([self.counter], 1)
        else:
            return stop_condition_net.ConstantFill(
                [], 1,
                shape=[], value=False, dtype=core.DataType.BOOL)


def CountUntil(num_iter):
    return ReaderWithLimit(CounterReader(), num_iter)
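

# Usage sketch: `CountUntil(n)` yields a bounded stream of increasing
# integers under the field name 'iter', e.g. to drive a fixed number of
# iterations in a pipeline; `ReaderWithLimit` caps any existing reader.
# `base_reader` below is hypothetical.
#
#     epoch_counter = CountUntil(1000)            # stops after 1000 batches
#     capped = ReaderWithLimit(base_reader, 100)  # at most 100 batches
#     # After the task runs, capped.data_finished() is True only if
#     # base_reader ran out of data before the 100-batch limit was hit.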
452 """Reader that stops after `duration` seconds. 454 If `duration` <= 0 or is None, reverts to an unconstrained reader that 455 exports a boolean blob indicating that the reader has exhausted 459 """Class initializer. 462 reader: The underlying reader object doing the actual read. 463 duration: Number of seconds to read. If un-specified, None, or <= 0, 464 the class reverts to a normal reader except that it also 465 produces a data_finished blob as a side effect to indicate 466 whether the input stream is exhausted. 468 super(ReaderWithTimeLimit, self).
__init__(reader)

    def setup_limiter(self, global_init_net, global_finish_net):
        if self.duration is not None and self.duration > 0:
            duration_ns = int(self.duration * (10**9))

            self.timer = global_init_net.TimerBegin(
                [], counter_name='epoch_timer')
            start_time = global_init_net.TimerGet(self.timer)
            self.duration_ns_blob = global_init_net.ConstantFill(
                [start_time], value=duration_ns)

            global_finish_net.TimerEnd([self.timer], [])

    def check_limiter_condition(self, stop_condition_net):
        if self.duration:
            time_elapsed = stop_condition_net.TimerGet(self.timer)
            return stop_condition_net.GE(
                [time_elapsed, self.duration_ns_blob], str(self.should_stop))
        else:
            return stop_condition_net.ConstantFill(
                [], 1, shape=[], value=False, dtype=core.DataType.BOOL
            )
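

# Usage sketch: cap reading at 30 minutes of wall-clock time per epoch.
# `base_reader` is hypothetical.
#
#     timed = ReaderWithTimeLimit(base_reader, duration=30 * 60)
#     # After the task runs, timed.data_finished() distinguishes "ran out
#     # of data" (True) from "stopped because time ran out" (False).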