# Customize transform function to process data
This document introduces how to define a function for the `function` argument of `tf.keras.layers.Lambda` or the `normalizer_fn` argument of `tf.feature_column.numeric_column` to transform features.
For `tf.keras.layers.Lambda`:

```python
tf.keras.layers.Lambda(
    lambda x: transform_fn(x)
)
```
For `tf.feature_column.numeric_column`:

```python
tf.feature_column.numeric_column(
    name,
    dtype=tf.int32,
    normalizer_fn=transform_fn
)
```
The transform function must not only process data during training but also be saved in the SavedModel along with the Keras model. Therefore, we implement the transform function with TensorFlow operators, so that it can be exported to a graph and saved in the SavedModel.
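For example, here is a minimal sketch (the transform and the model are illustrative, not part of this document's API) showing that a transform function built only from TensorFlow operators is exported together with the model:

```python
import tensorflow as tf

# A hypothetical transform built only from TensorFlow operators.
transform_fn = lambda x: tf.math.log1p(tf.cast(x, tf.float32))

inputs = tf.keras.Input(shape=(1,), dtype=tf.int32)
transformed = tf.keras.layers.Lambda(lambda x: transform_fn(x))(inputs)
outputs = tf.keras.layers.Dense(1)(transformed)
model = tf.keras.Model(inputs, outputs)

# The transform is traced into the graph and saved in the SavedModel.
model.save("saved_model_dir")
```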
## Why do we need to transform features to ids?
We can use `tf.feature_column.embedding_column` to wrap a categorical column and transform the feature into a dense tensor. However, an `embedding_column` can only transform a single column, so we must create many `embedding_column`s when we have many categorical features. In that case, we must also create separate embedding weights for each `embedding_column`, which requires more memory.
Sometimes we want to split the features into groups and share one embedding table among the features in each group. To make the embedding, we need to transform each feature value to an id and then use a `tf.keras.layers.Embedding` layer. During the transformation, we must guarantee that the ids of different features in the same group do not conflict. For example, suppose there are 5 features in the training data:
age | workclass | education | marital_status | hours_per_week |
---|---|---|---|---|
39 | State-gov | Bachelors | Never-married | 40 |
50 | Private | Bachelors | Divorced | 45 |
38 | Local-gov | Doctorate | Separated | 35 |
Suppose we place "age" and "workclass" in one group, and "education", "marital_status" and "hours_per_week" in another group. For the numeric features "age" and "hours_per_week", we can bucketize the values into ids. For the other, categorical features, we can transform the values into ids by looking them up in a vocabulary list.
Suppose the boundaries for "age" and "hours_per_week" are:

- "age": [49, 100]
- "hours_per_week": [30, 40, 50]
The vocabularies are:

- "workclass": ["State-gov", "Private", "Local-gov"]
- "education": ["Bachelors", "Doctorate"]
- "marital_status": ["Never-married", "Divorced", "Separated"]
In the first group, the id range for "age" is [0, 1, 2] and the id range for "workclass" is [3, 4, 5]: the ids of the elements in ["State-gov", "Private", "Local-gov"] must be shifted by the offset len([0, 1, 2]) = 3 to avoid conflicting with the ids of "age". In the second group, the id range for "education" is [0, 1], the id range for "marital_status" is [2, 3, 4], and the id range for "hours_per_week" is [5, 6, 7, 8]. Now we can transform the training data into the following table:
age | workclass | education | marital_status | hours_per_week |
---|---|---|---|---|
0 | 3 | 0 | 2 | 7 |
1 | 4 | 0 | 3 | 7 |
0 | 5 | 1 | 4 | 6 |
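Because the ids in a group do not conflict, all features in the group can share a single embedding table. Here is a minimal sketch, assuming the id tensors of the first group are already computed (the embedding dimension 8 is illustrative):

```python
import tensorflow as tf

age_ids = tf.keras.Input(shape=(1,), dtype=tf.int32)
workclass_ids = tf.keras.Input(shape=(1,), dtype=tf.int32)

# One embedding table for the whole group: input_dim is the total
# number of ids in the group, i.e. 3 ("age") + 3 ("workclass") = 6.
group_ids = tf.keras.layers.Concatenate(axis=1)([age_ids, workclass_ids])
group_embedding = tf.keras.layers.Embedding(input_dim=6, output_dim=8)(group_ids)
```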
We can utilize `tf.feature_column.numeric_column` and define its `normalizer_fn` to transform the features to ids column by column.
```python
tf.feature_column.numeric_column(
    name,
    dtype=tf.int32,
    normalizer_fn=transform_fn
)
```
The following sections introduce how to define `normalizer_fn` to transform categorical and numeric features to ids.
### Hash the feature value to an id

```python
import tensorflow as tf


def hash_bucket_id(x, bucket_size, offset=0):
    # Hashing requires string input, so convert numeric values first.
    if x.dtype is not tf.string:
        x = tf.strings.as_string(x)
    return tf.strings.to_hash_bucket_fast(x, bucket_size) + offset


# HASH_BUCKET_SIZE and id_offset are placeholders defined by the caller.
transform_fn = (
    lambda x, size=HASH_BUCKET_SIZE, offset=id_offset: (
        hash_bucket_id(x, size, offset)
    )
)
```
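For example, a quick eager-mode check of `hash_bucket_id` (the bucket size and offset are illustrative):

```python
x = tf.constant([["State-gov"], ["Private"], ["Local-gov"]])
# Each value is hashed into one of 10 buckets, then shifted by 3,
# so the resulting ids fall in [3, 13).
ids = hash_bucket_id(x, bucket_size=10, offset=3)
```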
### Look up the feature value in a vocabulary

```python
from tensorflow.python.ops import lookup_ops


def vocabulary_lookup_id(x, vocabulary_list, offset=0):
    # Ids follow the order of vocabulary_list; out-of-vocabulary values
    # fall into one extra OOV bucket after the in-vocabulary ids.
    table = lookup_ops.index_table_from_tensor(
        vocabulary_list=vocabulary_list, num_oov_buckets=1, default_value=-1
    )
    return table.lookup(x) + offset


# vocabulary_list and id_offset are placeholders defined by the caller.
transform_fn = (
    lambda x, voca_list=vocabulary_list, offset=id_offset: (
        vocabulary_lookup_id(x, voca_list, offset)
    )
)
```
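For example, a quick eager-mode check of `vocabulary_lookup_id` with the "marital_status" vocabulary and offset from the example above:

```python
x = tf.constant([["Never-married"], ["Divorced"], ["Separated"]])
ids = vocabulary_lookup_id(
    x, ["Never-married", "Divorced", "Separated"], offset=2
)
# ids: [[2], [3], [4]]
```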
### Bucketize the numeric feature value to an id

```python
from tensorflow.python.ops import math_ops


def bucket_id(x, boundaries, offset=0):
    # Bucketization works on float values, so convert the input first.
    if x.dtype is tf.string:
        x = tf.strings.to_number(x, out_type=tf.float32)
    else:
        x = tf.cast(x, tf.float32)
    # math_ops._bucketize is a private TensorFlow API wrapping the Bucketize
    # op: the id of a value is the number of boundaries <= the value.
    bucket_id = math_ops._bucketize(x, boundaries=boundaries)
    return bucket_id + offset


# LOG_BOUNDARIES and id_offset are placeholders defined by the caller.
transform_fn = (
    lambda x, boundaries=LOG_BOUNDARIES, offset=id_offset: (
        bucket_id(x, boundaries, offset)
    )
)
```
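For example, a minimal sketch (eager mode; the values come from the example table above) that checks `bucket_id` and wires it into a feature column:

```python
# Reproduce the "age" ids from the table: 39 -> 0, 50 -> 1, 38 -> 0.
ages = tf.constant([[39], [50], [38]])
print(bucket_id(ages, boundaries=[49, 100]))

# Wire the transform into a feature column. DenseFeatures casts the ids
# to float32, so cast them back to integers before an embedding lookup.
age_column = tf.feature_column.numeric_column(
    "age",
    dtype=tf.int32,
    normalizer_fn=lambda x: bucket_id(x, boundaries=[49, 100]),
)
ids = tf.keras.layers.DenseFeatures([age_column])({"age": ages})
```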
### Pad the sequence feature to a fixed length

A sequence feature (e.g. a comma-separated list of values) has a variable length per example, so we convert it to a `tf.RaggedTensor` and pad it to a fixed length. The following function is adapted from TensorFlow's `RaggedTensor.to_tensor` implementation and additionally supports `maxlen` and pre-padding:

```python
import tensorflow as tf
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops, gen_array_ops, math_ops
from tensorflow.python.ops.ragged import ragged_tensor_value


def is_ragged(value):
    """Returns true if `value` is a ragged tensor or ragged tensor value."""
    return isinstance(
        value, (tf.RaggedTensor, ragged_tensor_value.RaggedTensorValue)
    )


def pad_ragged_tensor(
    ragged_tensor, default_value=None, name=None, maxlen=None, padding="pre"
):
    self = ragged_tensor
    with ops.name_scope(name, "RaggedToTensor", [self, default_value]):
        if default_value is not None:
            default_value = ops.convert_to_tensor(
                default_value, name="default_value", dtype=self.dtype
            )

        # If ragged_rank > 1, then recursively convert the ragged values
        # into a `Tensor` before we proceed.
        values = self.values
        if is_ragged(values):
            values = pad_ragged_tensor(values, maxlen=maxlen, padding=padding)

        # Tile the default value, if necessary.
        if default_value is not None:
            if values.shape.ndims is not None:
                default_value.shape.with_rank_at_most(values.shape.ndims - 1)
            if (
                values.shape.ndims is None
                or default_value.shape.ndims is None
                or values.shape.ndims != default_value.shape.ndims + 1
            ):
                value_shape = array_ops.shape(values)[1:]
                default_value = array_ops.broadcast_to(
                    default_value, value_shape
                )
            default_value.shape.assert_is_compatible_with(values.shape[1:])

        # Get the expected dense shape ([nrows, ncols] + value_shape).
        rt_row_lengths = [self.row_splits[1:] - self.row_splits[:-1]]
        nrows = (
            array_ops.shape(self.row_splits, out_type=self.row_splits.dtype)[0]
            - 1
        )
        ncols = math_ops.maximum(math_ops.reduce_max(rt_row_lengths), 0)
        if maxlen is not None and not is_ragged(self.values):
            ncols = tf.constant(maxlen, dtype=self.row_splits.dtype)
        values_shape = array_ops.shape(values, out_type=self.row_splits.dtype)
        value_shape = values_shape[1:]
        nvals = values_shape[0]

        # Build a default value if none was supplied.
        if default_value is None:
            default_value = array_ops.zeros(value_shape, dtype=values.dtype)
        default_value.shape.assert_is_compatible_with(values.shape[1:])
        default_value.set_shape(values.shape[1:])

        # Get the row start indices, and expand to shape=[nrows, 1].
        starts = array_ops.expand_dims(self.row_splits[:-1], 1)
        # Get the row limit indices, and expand to shape=[nrows, 1].
        limits = array_ops.expand_dims(self.row_splits[1:], 1)
        # Get the column indices, and expand to shape=[1, ncols].
        columns = array_ops.expand_dims(math_ops.range(0, ncols), 0)
        reverse_columns = array_ops.reverse(columns, [-1])

        # Build a list containing the values plus the default value. We
        # will use tf.gather to collect values from this list for the
        # `Tensor` (using nvals as the index for the default value).
        values_and_default = array_ops.concat(
            [values, array_ops.stack([default_value])], axis=0
        )

        # Construct a matrix "indices" pointing into values_and_default.
        # I.e., output[r, c] = values_and_default[indices[r, c]].
        if padding == "pre" and not is_ragged(self.values):
            has_value = starts + reverse_columns < limits
            nondefault_index = (
                starts
                + columns
                - gen_array_ops.reshape(ncols - rt_row_lengths[0], (-1, 1))
            )
        else:
            nondefault_index = starts + columns
            has_value = nondefault_index < limits
        default_index = array_ops.fill(array_ops.stack([nrows, ncols]), nvals)
        indices = array_ops.where(has_value, nondefault_index, default_index)

        # Gather the results into a `Tensor`.
        return array_ops.gather(values_and_default, indices)
```
Finally, define the `transform_fn` for a sequence feature whose value is a comma-separated string:

```python
def pad_sequence(x, maxlen):
    # `x` has shape [batch_size, 1] and each element is a string like
    # "1,2,3". tf.strings.split returns a RaggedTensor of shape
    # [batch_size, 1, None]; its `values` is a RaggedTensor of shape
    # [batch_size, None], which we pad to `maxlen`.
    x = tf.strings.split(x, sep=",")
    x = tf.strings.to_number(x, tf.int64)
    return pad_ragged_tensor(x.values, maxlen=maxlen, padding="pre")


transform_fn = lambda x, maxlen=50: pad_sequence(x, maxlen)
```
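For example, a quick eager-mode check of `pad_sequence` (the input values are illustrative):

```python
x = tf.constant([["1,2,3"], ["4,5"]])
padded = pad_sequence(x, maxlen=4)
# padded:
# [[0, 1, 2, 3],
#  [0, 0, 4, 5]]
```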