"""
Implementation of an in-memory dataset with structured schema.

Use this to store and iterate through datasets with complex schema that
fit in memory.

Iterating through entries of this dataset is very fast since the dataset
is stored as a set of native Caffe2 tensors, thus no type conversion or
deserialization is necessary.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import numpy as np

from caffe2.python import core, workspace
from caffe2.python.dataio import Reader, Writer
from caffe2.python.schema import (
    Struct, from_blob_list, from_column_list, InitEmptyRecord)


class _DatasetReader(Reader):
    def __init__(self, dataset, name, batch_size=1, enforce_batch_size=False):
        """Don't call this directly. Instead, use dataset.reader()"""
        Reader.__init__(self, dataset.content())
        self.dataset = dataset
        self.name = name or (dataset.name + '_cursor')
        self.batch_size = batch_size
        self.enforce_batch_size = enforce_batch_size
        self.cursor = None

    def setup_ex(self, init_net, exit_net):
        if self.cursor is None:
            self.cursor = init_net.CreateTreeCursor(
                [],
                init_net.NextScopedBlob(self.name),
                fields=self.dataset.fields)

    def read(self, read_net):
        assert self.cursor, 'setup not called.'
        content = self.dataset.content()
        with core.NameScope(read_net.NextName(self.name)):
            fields = read_net.ReadNextBatch(
                [self.cursor] + content.field_blobs(),
                content.field_names(),
                batch_size=self.batch_size,
                enforce_batch_size=self.enforce_batch_size)
            return (read_net.IsEmpty([fields[0]]), fields)

    def reset(self, net):
        net.ResetCursor([self.cursor], [])
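
# A minimal sketch of driving a _DatasetReader by hand (hedged: `ds` is
# assumed to be an already-initialized Dataset; Dataset.reader() below is
# the intended entry point):
#
#     init_net = core.Net('init')
#     reader = ds.reader(init_net, batch_size=32)
#     read_net = core.Net('read')
#     should_stop, fields = reader.read(read_net)
#     workspace.RunNetOnce(init_net)
#     workspace.RunNetOnce(read_net)  # `fields` blobs now hold the next batch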


class _DatasetRandomReader(Reader):
    def __init__(self, dataset, name, indices, batch_size=1, loop_over=False,
                 enforce_batch_size=False):
        """Don't call this directly. Instead, use dataset.random_reader()"""
        Reader.__init__(self, dataset.content())
        self.dataset = dataset
        self.cursor = None
        self.name = name or (dataset.name + '_cursor')
        self.indices = indices
        self.batch_size = batch_size
        self.loop_over = loop_over
        self.enforce_batch_size = enforce_batch_size

    def setup_ex(self, init_net, exit_net):
        if self.cursor is None:
            self.cursor = init_net.CreateTreeCursor(
                [],
                init_net.NextScopedBlob(self.name),
                fields=self.dataset.fields)

    def reset(self, net):
        net.ResetCursor([self.cursor], [])

    def computeoffset(self, net):
        self.reset(net)
        offsets = net.ComputeOffset(
            [self.cursor] + self.dataset.content().field_blobs(),
            'offsets')
        self.offsets = offsets

    def sort_and_shuffle(self, net, sort_by_field=None,
                         shuffle_size=1, batch_size=1):
        # No sorting by default.
        content = self.dataset.content()
        sort_by_field_idx = -1
        if sort_by_field:
            assert sort_by_field in content.field_names(), (
                'Must be valid field.')
            sort_by_field_idx = content.field_names().index(sort_by_field)
        self.reset(net)

        indices = net.SortAndShuffle(
            [self.cursor] + content.field_blobs(),
            'indices',
            sort_by_field_idx=sort_by_field_idx,
            shuffle_size=shuffle_size,
            batch_size=batch_size)
        self.indices = indices

    def read(self, read_net):
        with core.NameScope(read_net.NextName(self.name)):
            fields = read_net.ReadRandomBatch(
                [self.cursor, self.indices, self.offsets] + (
                    self.dataset.content().field_blobs()),
                self.dataset.content().field_names(),
                batch_size=self.batch_size,
                enforce_batch_size=self.enforce_batch_size,
                loop_over=self.loop_over)
            return (read_net.IsEmpty([fields[0]]), fields)
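
# Hedged sketch of random-order reading (`ds` is an assumed initialized
# Dataset): permutation indices come from sort_and_shuffle() and record
# offsets from computeoffset(); both must be set up before any read():
#
#     init_net = core.Net('init')
#     reader = ds.random_reader(init_net, batch_size=16)
#     reader.sort_and_shuffle(init_net, shuffle_size=8, batch_size=16)
#     reader.computeoffset(init_net)
#     read_net = core.Net('read')
#     should_stop, fields = reader.read(read_net)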
122 """Don't call this directly. Use dataset.writer() instead.""" 126 def setup_ex(self, init_net, exit_net):
127 if self.
mutex is None:
128 self.
mutex = init_net.CreateMutex([])

    def write(self, writer_net, fields):
        """
        Add operations to `writer_net` that append the blobs in `fields` to
        the end of the dataset. An additional operator will also be added
        that checks the consistency of the data in `fields` against the
        dataset schema.

        Args:
            writer_net: The net that will contain the Append operators.
            fields: A list of BlobReference to be appended to this dataset.
        """
        assert self.mutex is not None, 'setup not called.'
        field_blobs = self._content.field_blobs()
        assert len(fields) == len(field_blobs), (
            'Expected %s fields, got %s.' % (len(field_blobs), len(fields)))
        writer_net.CheckDatasetConsistency(
            fields, [], fields=self._content.field_names())
        writer_net.AtomicAppend(
            [self.mutex] + field_blobs + list(fields),
            field_blobs)
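
    # Hedged usage sketch for write() (assumes `ds` is an initialized Dataset
    # and `new_fields` is a list of BlobReference matching its schema):
    #
    #     writer = ds.writer(init_net)
    #     writer.write(write_net, new_fields)  # verifies schema, then appends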
151 """Commit is a no-op for an in-memory dataset.""" 155 def Const(net, value, dtype=None, name=None):
157 Create a 'constant' by first creating an external input in the given 158 net, and then feeding the corresponding blob with its provided value 159 in the current workspace. The name is automatically generated in order 160 to avoid clashes with existing blob names. 162 assert isinstance(net, core.Net),
'net must be a core.Net instance.' 163 value = np.array(value, dtype=dtype)
164 blob = net.AddExternalInput(net.NextName(prefix=name))
165 workspace.FeedBlob(str(blob), value)
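
# A minimal sketch of Const (the net and values are illustrative): the blob
# is fed immediately into the current workspace, so it can be fetched even
# before `net` runs:
#
#     net = core.Net('init')
#     ones = Const(net, np.ones(3, dtype=np.float32), name='ones')
#     print(workspace.FetchBlob(str(ones)))  # -> [1. 1. 1.]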


def execution_step_with_progress(name, init_net, substeps, rows_read):
    # Add a concurrent substep that periodically prints the number of rows
    # read so far.
    report_net = core.Net('report_net')
    report_net.Print([rows_read], [])
    return core.execution_step(
        name,
        substeps,
        report_net=report_net,
        concurrent_substeps=True,
        report_interval=5)
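
# Hedged usage sketch (`read_step` and `rows_read_blob` are assumptions of
# this example, not part of the module): wrap reading substeps so that the
# rows-read counter is printed concurrently while they run:
#
#     step = execution_step_with_progress(
#         'read_all', init_net, [read_step], rows_read_blob)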
182 """Represents an in-memory dataset with fixed schema. 184 Use this to store and iterate through datasets with complex schema that 187 Iterating through entries of this dataset is very fast since the dataset 188 is stored as a set of native Caffe2 tensors, thus no type conversion or 189 deserialization is necessary. 193 """Create an un-initialized dataset with schema provided by `fields`. 195 Before this dataset can be used, it must be initialized, either by 196 `init_empty` or `init_from_dataframe`. 199 fields: either a schema.Struct or a list of field names in a format 200 compatible with the one described in schema.py. 201 name: optional name to prepend to blobs that will store the data. 203 assert isinstance(fields, list)
or isinstance(fields, Struct), (
204 'fields must be either a Struct or a list of raw field names.')
205 if isinstance(fields, list):
206 fields = from_column_list(fields)
208 self.
fields = fields.field_names()
210 self.
name = name
or 'dataset' 211 self.
field_blobs = fields.field_blobs()
if fields.has_blobs()
else None 213 def trim(self, net, multiple_of):

    def trim(self, net, multiple_of):
        """
        Trims the contents of this dataset so that the number of records is
        a multiple of the given argument.
        """
        net.TrimDataset(
            self.field_blobs,
            self.field_blobs,
            fields=self.fields,
            multiple_of=multiple_of)
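
    # Hedged example: when sharding reads across workers, one might trim so
    # every worker sees the same number of full batches (`num_shards` is
    # hypothetical):
    #
    #     ds.trim(net, multiple_of=num_shards)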
225 """Initialize the blobs for this dataset with empty values. 227 Empty arrays will be immediately fed into the current workspace, 228 and `init_net` will take those blobs as external inputs. 231 init_net, self.schema.clone_schema()).field_blobs()
234 """Initialize the blobs for this dataset from a Pandas dataframe. 236 Each column of the dataframe will be immediately fed into the current 237 workspace, and the `net` will take this blobs as external inputs. 239 assert len(self.
fields) == len(dataframe.columns)
241 Const(net, dataframe.as_matrix([col]).flatten(), name=field)
242 for col, field
in enumerate(self.
fields)]
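
    # Hedged example (pandas and the column values are assumptions of this
    # sketch); column order must match the schema's field order:
    #
    #     df = pd.DataFrame({'label': [0, 1], 'weight': [1.0, 0.5]})
    #     ds = Dataset(['label', 'weight'])
    #     ds.init_from_dataframe(net, df)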

    def get_blobs(self):
        """
        Return the list of BlobReference pointing to the blobs that contain
        the data for this dataset.
        """
        assert self.field_blobs, 'Dataset not initialized.'
        return self.field_blobs

    def content(self):
        """
        Return a Record of BlobReferences pointing to the full content of
        this dataset.
        """
        return from_blob_list(self.schema, self.field_blobs)

    def field_names(self):
        """Return the list of field names for this dataset."""
        return self.fields

    def field_types(self):
        """
        Return the list of field dtypes for this dataset.

        If a list of strings, not a schema.Struct, was passed to the
        constructor, this will return a list of dtype(np.void).
        """
        return self.field_types

    def reader(self, init_net=None, cursor_name=None, batch_size=1,
               enforce_batch_size=False):
        """Create a Reader object that is used to iterate through the dataset.

        This will append operations to `init_net` that create a TreeCursor,
        used to iterate through the data.

        NOTE: Currently, it is not safe to append to a dataset while reading.

        Args:
            init_net: net that will be run once to create the cursor.
            cursor_name: optional name for the blob containing a pointer
                         to the cursor.
            batch_size: how many samples to read per iteration.

        Returns:
            A _DatasetReader that can be used to create operators that will
            iterate through the dataset.
        """
        assert self.field_blobs, 'Dataset not initialized.'
        reader = _DatasetReader(self, cursor_name, batch_size,
                                enforce_batch_size)
        if init_net is not None:
            reader.setup_ex(init_net, None)
        return reader

    def random_reader(self, init_net=None, indices=None, cursor_name=None,
                      batch_size=1, loop_over=False,
                      enforce_batch_size=False):
        """Create a Reader object that is used to iterate through the dataset.

        NOTE: The reader order depends on the order in indices.

        Args:
            init_net: net that will be run once to create the cursor.
            indices: blob of reading order.
            cursor_name: optional name for the blob containing a pointer
                         to the cursor.
            batch_size: how many samples to read per iteration.
            loop_over: repeat the dataset indefinitely (in the same order).

        Returns:
            A _DatasetRandomReader that can be used to create operators that
            will iterate through the dataset according to `indices`.
        """
        assert self.field_blobs, 'Dataset not initialized.'
        reader = _DatasetRandomReader(
            self, cursor_name, indices, batch_size, loop_over,
            enforce_batch_size)
        if init_net is not None:
            reader.setup_ex(init_net, None)
        return reader
325 """Create a Writer that can be used to append entries into the dataset. 327 NOTE: Currently, it is not safe to append to a dataset 328 while reading from it. 329 NOTE: Currently implementation of writer is not thread safe. 333 init_net: net that will be run once in order to create the writer. 336 assert self.
field_blobs,
'Dataset not initialized.' 338 if init_net
is not None:
339 writer.setup_ex(init_net,
None)
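

# End-to-end usage sketch (hedged: the field names and values below are
# illustrative, not part of the module). The typical life cycle is: build a
# Dataset from a schema, initialize its backing blobs, append rows through a
# writer, then iterate with a reader:
#
#     ds = Dataset(['label', 'weight'], name='toy')
#     init_net = core.Net('init')
#     ds.init_empty(init_net)
#
#     # Feed one batch of data as constants and append it atomically.
#     write_net = core.Net('write')
#     new_fields = [
#         Const(write_net, np.array([0, 1], dtype=np.int32), name='label'),
#         Const(write_net, np.array([1.0, 0.5], dtype=np.float32),
#               name='weight'),
#     ]
#     writer = ds.writer(init_net)
#     writer.write(write_net, new_fields)
#
#     # Iterate over the appended rows, one at a time.
#     reader = ds.reader(init_net, batch_size=1)
#     read_net = core.Net('read')
#     should_stop, fields = reader.read(read_net)
#
#     workspace.RunNetOnce(init_net)
#     workspace.RunNetOnce(write_net)
#     workspace.RunNetOnce(read_net)  # first row now in the `fields` blobs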