Caffe2 - Python API
A deep learning, cross-platform ML framework
dataset.py
## @package dataset
# Module caffe2.python.dataset
"""
Implementation of an in-memory dataset with structured schema.

Use this to store and iterate through datasets with complex schema that
fit in memory.

Iterating through entries of this dataset is very fast since the dataset
is stored as a set of native Caffe2 tensors, thus no type conversion or
deserialization is necessary.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from caffe2.python import core, workspace
from caffe2.python.dataio import Reader, Writer
from caffe2.python.schema import (
    Struct, from_blob_list, from_column_list, InitEmptyRecord)
import numpy as np


class _DatasetReader(Reader):
    def __init__(self, dataset, name, batch_size=1, enforce_batch_size=False):
        """Don't call this directly. Instead, use dataset.reader()"""
        Reader.__init__(self, dataset.content())
        self.dataset = dataset
        self.name = name or (dataset.name + '_cursor')
        self.batch_size = batch_size
        self.enforce_batch_size = enforce_batch_size
        self.cursor = None

    def setup_ex(self, init_net, exit_net):
        if self.cursor is None:
            self.cursor = init_net.CreateTreeCursor(
                [],
                init_net.NextScopedBlob(self.name),
                fields=self.dataset.fields)

    def read(self, read_net):
        assert self.cursor, 'setup not called.'
        content = self.dataset.content()
        with core.NameScope(read_net.NextName(self.name)):
            fields = read_net.ReadNextBatch(
                [self.cursor] + content.field_blobs(),
                content.field_names(),
                batch_size=self.batch_size,
                enforce_batch_size=self.enforce_batch_size)
            if type(fields) is core.BlobReference:
                fields = [fields]
            return (read_net.IsEmpty([fields[0]]), fields)

    def reset(self, net):
        net.ResetCursor([self.cursor], [])


class _DatasetRandomReader(Reader):
    def __init__(self, dataset, name, indices, batch_size=1, loop_over=False,
                 enforce_batch_size=False):
        """Don't call this directly. Instead, use dataset.random_reader()"""
        Reader.__init__(self, dataset.content())
        self.dataset = dataset
        self.cursor = None
        self.name = name or (dataset.name + '_cursor')
        self.indices = indices
        self.batch_size = batch_size
        self.loop_over = loop_over
        self.enforce_batch_size = enforce_batch_size

    def setup_ex(self, init_net, exit_net):
        if self.cursor is None:
            self.cursor = init_net.CreateTreeCursor(
                [],
                [self.name],
                fields=self.dataset.fields)

    def reset(self, net):
        net.ResetCursor([self.cursor], [])

    def computeoffset(self, net):
        self.reset(net)
        offsets = net.ComputeOffset(
            [self.cursor] + self.dataset.content().field_blobs(),
            'offsets')
        self.offsets = offsets

    def sort_and_shuffle(self, net, sort_by_field=None,
                         shuffle_size=1, batch_size=1):
        # no sorting by default
        content = self.dataset.content()
        sort_by_field_idx = -1
        if sort_by_field:
            assert sort_by_field in content.field_names(), (
                'Must be valid field.')
            sort_by_field_idx = content.field_names().index(sort_by_field)
        self.reset(net)

        indices = net.SortAndShuffle(
            [self.cursor] + content.field_blobs(),
            'indices',
            sort_by_field_idx=sort_by_field_idx,
            shuffle_size=shuffle_size,
            batch_size=batch_size)
        self.indices = indices

    def read(self, read_net):
        with core.NameScope(read_net.NextName(self.name)):
            fields = read_net.ReadRandomBatch(
                [self.cursor, self.indices, self.offsets] + (
                    self.dataset.content().field_blobs()),
                self.dataset.content().field_names(),
                batch_size=self.batch_size,
                enforce_batch_size=self.enforce_batch_size,
                loop_over=self.loop_over)
            return (read_net.IsEmpty([fields[0]]), fields)


class _DatasetWriter(Writer):
    def __init__(self, content):
        """Don't call this directly. Use dataset.writer() instead."""
        self._content = content
        self.mutex = None

    def setup_ex(self, init_net, exit_net):
        if self.mutex is None:
            self.mutex = init_net.CreateMutex([])

    def write(self, writer_net, fields):
        """
        Add operations to `net` that append the blobs in `fields` to the end
        of the dataset. An additional operator will also be added that checks
        the consistency of the data in `fields` against the dataset schema.

        Args:
            writer_net: The net that will contain the Append operators.
            fields: A list of BlobReference to be appended to this dataset.
        """
        assert self.mutex is not None, 'setup not called.'
        field_blobs = self._content.field_blobs()
        assert len(fields) == len(field_blobs), (
            'Expected %s fields, got %s.' % (len(field_blobs), len(fields)))
        writer_net.CheckDatasetConsistency(
            fields, [], fields=self._content.field_names())
        writer_net.AtomicAppend(
            [self.mutex] + field_blobs + list(fields),
            field_blobs)

    def commit(self, finish_net):
        """Commit is a no-op for an in-memory dataset."""
        pass


def Const(net, value, dtype=None, name=None):
    """
    Create a 'constant' by first creating an external input in the given
    net, and then feeding the corresponding blob with its provided value
    in the current workspace. The name is automatically generated in order
    to avoid clashes with existing blob names.
    """
    assert isinstance(net, core.Net), 'net must be a core.Net instance.'
    value = np.array(value, dtype=dtype)
    blob = net.AddExternalInput(net.NextName(prefix=name))
    workspace.FeedBlob(str(blob), value)
    return blob

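For example, a minimal usage sketch of Const (the net and blob names below are illustrative and not part of dataset.py):

    import numpy as np
    from caffe2.python import core, workspace
    from caffe2.python.dataset import Const

    net = core.Net('example')
    ones = Const(net, np.ones(3, dtype=np.float32), name='ones')
    # The blob is fed into the current workspace immediately, even before
    # `net` is run, and `net` gains it as an external input.
    print(workspace.FetchBlob(str(ones)))  # -> [1. 1. 1.]
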
def execution_step_with_progress(name, init_net, substeps, rows_read):
    # progress reporter
    report_net = core.Net('report_net')
    report_net.Print([rows_read], [])
    return core.execution_step(
        name,
        substeps,
        report_net=report_net,
        concurrent_substeps=True,
        report_interval=5)


class Dataset(object):
    """Represents an in-memory dataset with fixed schema.

    Use this to store and iterate through datasets with complex schema that
    fit in memory.

    Iterating through entries of this dataset is very fast since the dataset
    is stored as a set of native Caffe2 tensors, thus no type conversion or
    deserialization is necessary.
    """

    def __init__(self, fields, name=None):
        """Create an un-initialized dataset with schema provided by `fields`.

        Before this dataset can be used, it must be initialized, either by
        `init_empty` or `init_from_dataframe`.

        Args:
            fields: either a schema.Struct or a list of field names in a
                format compatible with the one described in schema.py.
            name: optional name to prepend to blobs that will store the data.
        """
        assert isinstance(fields, list) or isinstance(fields, Struct), (
            'fields must be either a Struct or a list of raw field names.')
        if isinstance(fields, list):
            fields = from_column_list(fields)
        self.schema = fields
        self.fields = fields.field_names()
        self.field_types = fields.field_types()
        self.name = name or 'dataset'
        self.field_blobs = fields.field_blobs() if fields.has_blobs() else None

    def trim(self, net, multiple_of):
        """
        Trims the contents of this dataset so that the number of records is
        a multiple of the given argument.
        """
        net.TrimDataset(
            self.field_blobs,
            self.field_blobs,
            fields=self.fields,
            multiple_of=multiple_of)

    def init_empty(self, init_net):
        """Initialize the blobs for this dataset with empty values.

        Empty arrays will be immediately fed into the current workspace,
        and `init_net` will take those blobs as external inputs.
        """
        self.field_blobs = InitEmptyRecord(
            init_net, self.schema.clone_schema()).field_blobs()

    def init_from_dataframe(self, net, dataframe):
        """Initialize the blobs for this dataset from a Pandas dataframe.

        Each column of the dataframe will be immediately fed into the current
        workspace, and `net` will take these blobs as external inputs.
        """
        assert len(self.fields) == len(dataframe.columns)
        self.field_blobs = [
            Const(net, dataframe.as_matrix([col]).flatten(), name=field)
            for col, field in enumerate(self.fields)]

    def get_blobs(self):
        """
        Return the list of BlobReference pointing to the blobs that contain
        the data for this dataset.
        """
        assert self
        return self.field_blobs

    def content(self):
        """
        Return a Record of BlobReferences pointing to the full content of
        this dataset.
        """
        return from_blob_list(self.schema, self.field_blobs)

    def field_names(self):
        """Return the list of field names for this dataset."""
        return self.fields

    def field_types(self):
        """
        Return the list of field dtypes for this dataset.

        If a list of strings, not a schema.Struct, was passed to the
        constructor, this will return a list of dtype(np.void).
        """
        return self.field_types

    def reader(self, init_net=None, cursor_name=None, batch_size=1,
               enforce_batch_size=False):
        """Create a Reader object that is used to iterate through the dataset.

        This will append operations to `init_net` that create a TreeCursor,
        used to iterate through the data.

        NOTE: Currently, it is not safe to append to a dataset while reading.

        Args:
            init_net: net that will be run once to create the cursor.
            cursor_name: optional name for the blob containing a pointer
                to the cursor.
            batch_size: how many samples to read per iteration.

        Returns:
            A _DatasetReader that can be used to create operators that will
            iterate through the dataset.
        """
        assert self.field_blobs, 'Dataset not initialized.'
        reader = _DatasetReader(self, cursor_name, batch_size,
                                enforce_batch_size)
        if init_net is not None:
            reader.setup_ex(init_net, None)
        return reader

    def random_reader(self, init_net=None, indices=None, cursor_name=None,
                      batch_size=1, loop_over=False, enforce_batch_size=False):
        """Create a Reader object that is used to iterate through the dataset.

        NOTE: The reader order depends on the order in indices.

        Args:
            init_net: net that will be run once to create the cursor.
            indices: blob of reading order
            cursor_name: optional name for the blob containing a pointer
                to the cursor.
            batch_size: how many samples to read per iteration.
            loop_over: repeat the dataset indefinitely (in the same order)

        Returns:
            A DatasetReader that can be used to create operators that will
            iterate through the dataset according to indices.
        """
        assert self.field_blobs, 'Dataset not initialized.'
        reader = _DatasetRandomReader(
            self, cursor_name, indices, batch_size, loop_over,
            enforce_batch_size)
        if init_net is not None:
            reader.setup_ex(init_net, None)
        return reader

    def writer(self, init_net=None):
        """Create a Writer that can be used to append entries into the dataset.

        NOTE: Currently, it is not safe to append to a dataset
        while reading from it.
        NOTE: The current implementation of the writer is not thread safe.
        TODO: fixme

        Args:
            init_net: net that will be run once in order to create the writer.
                (currently not used)
        """
        assert self.field_blobs, 'Dataset not initialized.'
        writer = _DatasetWriter(self.content())
        if init_net is not None:
            writer.setup_ex(init_net, None)
        return writer
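A minimal end-to-end usage sketch follows. It is not part of dataset.py; the schema fields, blob values, and net names are illustrative assumptions chosen for the example:

    import numpy as np
    from caffe2.python import core, schema, workspace
    from caffe2.python.dataset import Const, Dataset

    workspace.ResetWorkspace()

    # Two scalar columns: an int32 label and a float32 weight.
    fields = schema.Struct(
        ('label', schema.Scalar(np.int32)),
        ('weight', schema.Scalar(np.float32)),
    )
    ds = Dataset(fields, name='my_dataset')

    init_net = core.Net('init')
    ds.init_empty(init_net)             # creates and feeds empty storage blobs
    writer = ds.writer(init_net)        # adds CreateMutex to init_net
    reader = ds.reader(init_net, batch_size=2)  # adds CreateTreeCursor

    # Append three rows to the dataset.
    write_net = core.Net('write')
    labels = Const(write_net, np.array([1, 2, 3], dtype=np.int32))
    weights = Const(write_net, np.array([0.1, 0.2, 0.3], dtype=np.float32))
    writer.write(write_net, [labels, weights])

    # Read the first batch (two rows) back out. should_stop is a blob that
    # becomes true once the cursor is exhausted.
    read_net = core.Net('read')
    should_stop, batch = reader.read(read_net)

    workspace.RunNetOnce(init_net)
    workspace.RunNetOnce(write_net)
    workspace.RunNetOnce(read_net)
    print(workspace.FetchBlob(str(batch[0])))   # labels of the first batch

The same record could instead be populated with init_from_dataframe, which feeds one column of a Pandas dataframe per field.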
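Random-order reading builds on the same dataset. A sketch, assuming the populated `ds` from the previous example (the cursor name and net names are again illustrative):

    # A separate cursor for random access.
    rand_init_net = core.Net('rand_init')
    rand_reader = ds.random_reader(
        rand_init_net, cursor_name='my_dataset_rand_cursor', batch_size=2)

    # SortAndShuffle produces the 'indices' blob; ComputeOffset produces the
    # per-record 'offsets' blob that ReadRandomBatch needs.
    shuffle_net = core.Net('shuffle')
    rand_reader.sort_and_shuffle(shuffle_net, shuffle_size=1, batch_size=2)
    rand_reader.computeoffset(shuffle_net)

    read_rand_net = core.Net('read_random')
    should_stop, batch = rand_reader.read(read_rand_net)

    workspace.RunNetOnce(rand_init_net)   # creates the cursor
    workspace.RunNetOnce(shuffle_net)     # computes 'indices' and 'offsets'
    workspace.RunNetOnce(read_rand_net)
    print(workspace.FetchBlob(str(batch[0])))   # labels, in shuffled order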