Caffe2 - Python API
A deep learning, cross-platform ML framework
feature_sparse_to_dense.py
# @package sparse_to_dense
# Module caffe2.python.layers.sparse_to_dense
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from caffe2.python import schema
from caffe2.python.layers.layers import (
    ModelLayer,
)
import numpy as np


class FeatureSparseToDense(ModelLayer):

    def __init__(self, model, input_record, input_specs,
                 name='feature_sparse_to_dense', **kwargs):
        """
        `input_specs` follows the format of FeatureSpec from schema. To be
        more precise, it's a namedtuple that should have:
            'feature_type', 'feature_names', 'feature_ids'
        """
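        # Illustrative example of such a spec list (the field name
        # 'float_features', the names, and the ids are hypothetical):
        #
        #     input_specs = [
        #         ('float_features', FeatureSpec(
        #             feature_type='FLOAT',
        #             feature_names=['f1', 'f2'],
        #             feature_ids=[11, 12],
        #         )),
        #     ]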
        super(FeatureSparseToDense, self).__init__(model, name,
                                                   input_record, **kwargs)

        self.input_specs = input_specs

        outputs = []
        for field, feature_specs in self.input_specs:
            assert len(feature_specs.feature_names) == \
                len(feature_specs.feature_ids)
            if feature_specs.feature_type == 'FLOAT':
                outputs.append((
                    field,
                    schema.Scalar(
                        (np.float32, (len(feature_specs.feature_ids), )),
                        self.get_next_blob_reference(field + '_output')
                    )
                ))
            elif feature_specs.feature_type == 'ID_LIST':
                outputs.append((
                    field,
                    schema.Struct(
                        ('ranges',
                            schema.Scalar(
                                (
                                    np.int32,
                                    (len(feature_specs.feature_ids), 2)
                                ),
                                self.get_next_blob_reference(
                                    field + '_ranges')
                            ),
                        ),
                        ('values',
                            schema.Scalar(np.int64,
                                          self.get_next_blob_reference(
                                              field + '_values')
                                          ),
                        )
                    )
                ))
            elif feature_specs.feature_type == 'ID_SCORE_LIST':
                outputs.append((
                    field,
                    schema.Struct(
                        ('ranges',
                            schema.Scalar(
                                (
                                    np.int32,
                                    (len(feature_specs.feature_ids), 2)
                                ),
                                self.get_next_blob_reference(
                                    field + '_ranges')
                            ),
                        ),
                        ('ids',
                            schema.Scalar(np.int64,
                                          self.get_next_blob_reference(
                                              field + '_ids')
                                          ),
                        ),
                        ('scores',
                            schema.Scalar(np.float32,
                                          self.get_next_blob_reference(
                                              field + '_scores')
                                          ),
                        )
                    )
                ))
            elif feature_specs.feature_type == 'EMBEDDING':
                # We don't know the dimensions of the embeddings in the input
                # data. Even though they should match the dimensions from the
                # feature config, we keep the ranges blob so we can check the
                # input data later.
                outputs.append((
                    field,
                    schema.Struct(
                        ('ranges',
                            schema.Scalar(
                                (
                                    np.int32,
                                    (len(feature_specs.feature_ids), 2)
                                ),
                                self.get_next_blob_reference(
                                    field + '_ranges')
                            ),
                        ),
                        ('values',
                            schema.Scalar(np.float32,
                                          self.get_next_blob_reference(
                                              field + '_values')
                                          ),
                        )
                    )
                ))
            else:
                raise TypeError(
                    "Unsupported input type: {0}".
                    format(feature_specs.feature_type))

        # TODO(amalevich): This schema produces ranges, so anything consuming
        # it needs to support ranges as well. That might be confusing unless
        # we add better support for ranges or make this a first layer.
        self.output_schema = schema.Struct(
            *outputs
        )

        # TODO(amalevich): Consider moving this data into the schema instead.
        # Structs don't support attaching metadata to them, and cloning will
        # break things badly, but this is the most elegant way to pass this
        # info around. Should we change it, or is that too much work for too
        # little benefit?
        for field, feature_specs in input_specs:
            schema.attach_metadata_to_scalars(
                self.output_schema[field],
                schema.Metadata(
                    feature_specs=feature_specs)
            )
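        # ZERO and ZERO_RANGE are global constant blobs provided by the
        # model; they supply the default values SparseToDenseMask falls back
        # to for features absent from an example (a zero scalar for FLOAT
        # features, an empty range for the list-type features).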
        self.zero = model.global_constants['ZERO']
        self.zero_range = model.global_constants['ZERO_RANGE']

    # Add operators for all feature types that need to be densified.
    def add_ops(self, net):
        record = self.input_record
        for field, feature_specs in self.input_specs:
            if feature_specs.feature_type == 'FLOAT':
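                # SparseToDenseMask scatters the sparse (key, value) pairs
                # into a dense blob whose j-th position corresponds to
                # mask[j] (the ordered feature_ids); features missing from an
                # example are filled from the default blob (self.zero).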
                net.SparseToDenseMask(
                    [
                        record[field].keys(),
                        record[field].values(),
                        self.zero,
                        record[field].lengths(),
                    ],
                    [
                        self.output_schema[field](),
                    ],
                    mask=feature_specs.feature_ids,
                )
            elif feature_specs.feature_type == 'ID_LIST':
                id_list_ranges = net.LengthsToRanges(
                    record[field].values.lengths(),
                    net.NextScopedBlob('id_list_ranges')
                )
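                # LengthsToRanges turns the per-list lengths into
                # (offset, length) pairs indexing into the flattened values
                # blob, so each variable-length list can be densified as a
                # fixed-size range.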
                net.SparseToDenseMask(
                    [
                        record[field].keys(), id_list_ranges, self.zero_range,
                        record[field].lengths()
                    ],
                    self.output_schema[field].ranges(),
                    mask=feature_specs.feature_ids,
                )
                # Alias helps to enforce the fact that all SparseToDense calls
                # produce new blobs. Reusing blob names could have odd
                # consequences at delivery time, when the content of the blobs
                # is generated based on the input specs.
                net.Alias(record[field].values.items(),
                          self.output_schema[field].values())
            elif feature_specs.feature_type == 'ID_SCORE_LIST':
                # TODO: merge this with the case above?
                id_list_ranges = net.LengthsToRanges(
                    record[field].values.lengths(),
                    net.NextScopedBlob('id_score_list_ranges')
                )
                net.SparseToDenseMask(
                    [
                        record[field].keys(), id_list_ranges, self.zero_range,
                        record[field].lengths()
                    ],
                    self.output_schema[field].ranges(),
                    mask=feature_specs.feature_ids,
                )
                # Alias helps to enforce the fact that all SparseToDense calls
                # produce new blobs. Reusing blob names could have odd
                # consequences at delivery time, when the content of the blobs
                # is generated based on the input specs.
                net.Alias(record[field].values.keys(),
                          self.output_schema[field].ids())
                net.Alias(record[field].values.values(),
                          self.output_schema[field].scores())
            elif feature_specs.feature_type == 'EMBEDDING':
                ranges = net.LengthsToRanges(
                    record[field].values.lengths(),
                    net.NextScopedBlob('embeddings_ranges')
                )
                net.SparseToDenseMask(
                    [
                        record[field].keys(),
                        ranges,
                        self.zero_range,
                        record[field].lengths()
                    ],
                    self.output_schema[field].ranges(),
                    mask=feature_specs.feature_ids,
                )
                # Alias helps to enforce the fact that all SparseToDense calls
                # produce new blobs. Reusing blob names could have odd
                # consequences at delivery time, when the content of the blobs
                # is generated based on the input specs.
                net.Alias(record[field].values.items(),
                          self.output_schema[field].values())

    def get_metadata(self):
        metadata = []
        for field, feature_specs in self.input_specs:
            metadata.append(
                (
                    {
                        'type': feature_specs.feature_type,
                        'names': feature_specs.feature_names,
                        'ids': feature_specs.feature_ids,
                    },
                    self.output_schema[field].field_blobs(),
                    self.output_schema[field].field_types()
                )
            )
            if feature_specs.feature_type == 'FLOAT':
                metadata[-1][0]['cardinality'] = 1
        return metadata
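
For reference, here is a minimal, hypothetical sketch of how an `input_specs` list maps onto the dense output types that `__init__` builds. The `FeatureSpec` namedtuple below is a stand-in modeling only the three fields this layer reads; the field names and ids are made up.

from collections import namedtuple

import numpy as np

# Stand-in for the FeatureSpec namedtuple described in the docstring above.
FeatureSpec = namedtuple('FeatureSpec',
                         ['feature_type', 'feature_names', 'feature_ids'])

input_specs = [
    ('float_features', FeatureSpec('FLOAT', ['f1', 'f2'], [11, 12])),
    ('id_list_features', FeatureSpec('ID_LIST', ['g1'], [21])),
]

# Mirror the output schema that FeatureSparseToDense.__init__ constructs.
for field, spec in input_specs:
    n = len(spec.feature_ids)
    if spec.feature_type == 'FLOAT':
        # One dense float32 column per feature id.
        print(field, '->', (np.float32, (n,)))
    elif spec.feature_type == 'ID_LIST':
        # One (offset, length) range per feature id; values stay int64.
        print(field, '->', {'ranges': (np.int32, (n, 2)), 'values': np.int64})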