C2W2 assignment questions on "traffic_transform.preprocessing_fn"

Hi

Could you help me figure out what I should look into to fix this problem? Here is the error message. This causes “traffic_transform.preprocessing_fn” to fail.

========================================================

TypeError Traceback (most recent call last)
in
13
14 with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
—> 15 transformed_dataset, _ = ((raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(traffic_transform.preprocessing_fn))
16
17 transformed_data, transformed_metadata = transformed_dataset

/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py in ror(self, left, label)
613 pvalueish = _SetInputPValues().visit(pvalueish, replacements)
614 self.pipeline = p
→ 615 result = p.apply(self, pvalueish, label)
616 if deferred:
617 return result

/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
696 transform.type_check_inputs(pvalueish)
697
→ 698 pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
699
700 if type_options is not None and type_options.pipeline_type_check:

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/runner.py in apply(self, transform, input, options)
183 m = getattr(self, 'apply_%s' % cls.__name__, None)
184 if m:
→ 185 return m(transform, input, options)
186 raise NotImplementedError(
187 'Execution of [%s] not implemented in runner %s.' % (transform, self))

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/runner.py in apply_PTransform(self, transform, input, options)
213 def apply_PTransform(self, transform, input, options):
214 # The base case of apply is to call the transform’s expand.
→ 215 return transform.expand(input)
216
217 def run_transform(self,

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in expand(self, dataset)
1269 # e.g. caching the values of expensive computations done in AnalyzeDataset.
1270 transform_fn = (
→ 1271 dataset | ‘AnalyzeDataset’ >> AnalyzeDataset(self._preprocessing_fn))
1272
1273 if Context.get_use_deep_copy_optimization():

/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py in ror(self, pvalueish, _unused)
1089
1090 def ror(self, pvalueish, _unused=None):
→ 1091 return self.transform.ror(pvalueish, self.label)
1092
1093 def expand(self, pvalue):

/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py in ror(self, left, label)
613 pvalueish = _SetInputPValues().visit(pvalueish, replacements)
614 self.pipeline = p
→ 615 result = p.apply(self, pvalueish, label)
616 if deferred:
617 return result

/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
650 try:
651 old_label, transform.label = transform.label, label
→ 652 return self.apply(transform, pvalueish)
653 finally:
654 transform.label = old_label

/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
696 transform.type_check_inputs(pvalueish)
697
→ 698 pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
699
700 if type_options is not None and type_options.pipeline_type_check:

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/runner.py in apply(self, transform, input, options)
183 m = getattr(self, 'apply_%s' % cls.__name__, None)
184 if m:
→ 185 return m(transform, input, options)
186 raise NotImplementedError(
187 'Execution of [%s] not implemented in runner %s.' % (transform, self))

/opt/conda/lib/python3.8/site-packages/apache_beam/runners/runner.py in apply_PTransform(self, transform, input, options)
213 def apply_PTransform(self, transform, input, options):
214 # The base case of apply is to call the transform’s expand.
→ 215 return transform.expand(input)
216
217 def run_transform(self,

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in expand(self, dataset)
1201 def expand(self, dataset):
1202 input_values, input_metadata = dataset
→ 1203 result, cache = super().expand((input_values, None, None, input_metadata))
1204 assert not cache
1205 return result

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in expand(self, dataset)
1002 # need to be serialized to SavedModel.
1003 graph, structured_inputs, structured_outputs = (
→ 1004 impl_helper.trace_preprocessing_function(self._preprocessing_fn, specs,
1005 self._use_tf_compat_v1,
1006 base_temp_dir))

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in trace_preprocessing_function(preprocessing_fn, input_specs, use_tf_compat_v1, base_temp_dir)
714 return _trace_preprocessing_fn_v1(preprocessing_fn, input_specs)
715 else:
→ 716 return _trace_preprocessing_fn_v2(preprocessing_fn, input_specs,
717 base_temp_dir)
718

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in _trace_preprocessing_fn_v2(preprocessing_fn, specs, base_temp_dir)
680 evaluated_replacements=None)
681 with annotators.object_tracker_scope(annotators.ObjectTracker()):
→ 682 concrete_fn = get_traced_transform_fn(
683 preprocessing_fn, specs, tf_graph_context).get_concrete_function()
684 return (concrete_fn.graph,

/opt/conda/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py in get_concrete_function(self, *args, **kwargs)
1231 def get_concrete_function(self, *args, **kwargs):
1232 # Implements GenericFunction.get_concrete_function.
→ 1233 concrete = self._get_concrete_function_garbage_collected(*args, **kwargs)
1234 concrete._garbage_collector.release() # pylint: disable=protected-access
1235 return concrete

/opt/conda/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py in _get_concrete_function_garbage_collected(self, *args, **kwargs)
1211 if self._stateful_fn is None:
1212 initializers = []
→ 1213 self._initialize(args, kwargs, add_initializers_to=initializers)
1214 self._initialize_uninitialized_variables(initializers)
1215

/opt/conda/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py in _initialize(self, args, kwds, add_initializers_to)
757 self._graph_deleter = FunctionDeleter(self._lifted_initializer_graph)
758 self._concrete_stateful_fn = (
→ 759 self._stateful_fn._get_concrete_function_internal_garbage_collected( # pylint: disable=protected-access
760 *args, **kwds))
761

/opt/conda/lib/python3.8/site-packages/tensorflow/python/eager/function.py in _get_concrete_function_internal_garbage_collected(self, *args, **kwargs)
3064 args, kwargs = None, None
3065 with self._lock:
→ 3066 graph_function, _ = self._maybe_define_function(args, kwargs)
3067 return graph_function
3068

/opt/conda/lib/python3.8/site-packages/tensorflow/python/eager/function.py in _maybe_define_function(self, args, kwargs)
3461
3462 self._function_cache.missed.add(call_context_key)
→ 3463 graph_function = self._create_graph_function(args, kwargs)
3464 self._function_cache.primary[cache_key] = graph_function
3465

/opt/conda/lib/python3.8/site-packages/tensorflow/python/eager/function.py in _create_graph_function(self, args, kwargs, override_flat_arg_shapes)
3296 arg_names = base_arg_names + missing_arg_names
3297 graph_function = ConcreteFunction(
→ 3298 func_graph_module.func_graph_from_py_func(
3299 self._name,
3300 self._python_function,

/opt/conda/lib/python3.8/site-packages/tensorflow/python/framework/func_graph.py in func_graph_from_py_func(name, python_func, args, kwargs, signature, func_graph, autograph, autograph_options, add_control_dependencies, arg_names, op_return_value, collections, capture_by_value, override_flat_arg_shapes, acd_record_initial_resource_uses)
1005 _, original_func = tf_decorator.unwrap(python_func)
1006
→ 1007 func_outputs = python_func(*func_args, **func_kwargs)
1008
1009 # invariant: func_outputs contains only Tensors, CompositeTensors,

/opt/conda/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py in wrapped_fn(*args, **kwds)
666 # the function a weak reference to itself to avoid a reference cycle.
667 with OptionalXlaContext(compile_with_xla):
→ 668 out = weak_wrapped_fn().wrapped(*args, **kwds)
669 return out
670

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in transform_fn(inputs)
637 inputs_copy = tf_utils.copy_tensors(inputs)
638 with tf_graph_context:
→ 639 transformed_features = preprocessing_fn(inputs_copy)
640 # An empty TENSOR_REPLACEMENTS collection symbolizes that there is no
641 # analyzer left for Transform to evaluate. Either if this collection is

~/work/traffic_transform.py in preprocessing_fn(inputs)
54 # Bucketize the feature
55 for key in _BUCKET_FEATURE_KEYS:
—> 56 outputs[_transformed_name(key)] = tft.bucketize(
57 inputs[key],
58 _FEATURE_BUCKET_COUNT[key],

/opt/conda/lib/python3.8/site-packages/tensorflow_transform/common.py in wrapped_fn(*args, **kwargs)
71 collection.append(collections.Counter())
72 collection[0][fn.name] += 1
—> 73 return fn(*args, **kwargs)
74 else:
75 return fn(*args, **kwargs)

TypeError: bucketize() got an unexpected keyword argument ‘always_return_num_quantiles’

========================================================

Patrick

There is the error: it tells you that the bucketize() function received an argument which is not recognized — maybe there is a typo in it!

Hi

I do not know what function and parameter should be used for the following.

Use tf.cast to cast the label key to float32 and fill in the missing values.

traffic_volume = tf.cast(

Could you provide some hints?

Thanks
Patrick

Yes, with regard to tf.cast you can always refer to the TensorFlow docs:

I don't remember this lab, but the error is about an argument passed to bucketize() — check what arguments this function actually takes!

What helped me recognizing the label key was the code block from section 2.5 - Transform. You’ll need to input the feature to predict in the tf.cast function.
The dtype= parameter is given in the instruction, which is tf.float32.