Hi
Could someone point me to what I should look into to fix this problem? Here is the error message; the failure happens inside traffic_transform.preprocessing_fn.
========================================================
TypeError Traceback (most recent call last)
in
13
14 with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
---> 15 transformed_dataset, _ = ((raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(traffic_transform.preprocessing_fn))
16
17 transformed_data, transformed_metadata = transformed_dataset
/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py in __ror__(self, left, label)
613 pvalueish = _SetInputPValues().visit(pvalueish, replacements)
614 self.pipeline = p
→ 615 result = p.apply(self, pvalueish, label)
616 if deferred:
617 return result
/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
696 transform.type_check_inputs(pvalueish)
697
→ 698 pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
699
700 if type_options is not None and type_options.pipeline_type_check:
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/runner.py in apply(self, transform, input, options)
183 m = getattr(self, 'apply_%s' % cls.__name__, None)
184 if m:
→ 185 return m(transform, input, options)
186 raise NotImplementedError(
187 'Execution of [%s] not implemented in runner %s.' % (transform, self))
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/runner.py in apply_PTransform(self, transform, input, options)
213 def apply_PTransform(self, transform, input, options):
214 # The base case of apply is to call the transform’s expand.
→ 215 return transform.expand(input)
216
217 def run_transform(self,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in expand(self, dataset)
1269 # e.g. caching the values of expensive computations done in AnalyzeDataset.
1270 transform_fn = (
→ 1271 dataset | 'AnalyzeDataset' >> AnalyzeDataset(self._preprocessing_fn))
1272
1273 if Context.get_use_deep_copy_optimization():
/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py in __ror__(self, pvalueish, _unused)
1089
1090 def __ror__(self, pvalueish, _unused=None):
→ 1091 return self.transform.__ror__(pvalueish, self.label)
1092
1093 def expand(self, pvalue):
/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py in __ror__(self, left, label)
613 pvalueish = _SetInputPValues().visit(pvalueish, replacements)
614 self.pipeline = p
→ 615 result = p.apply(self, pvalueish, label)
616 if deferred:
617 return result
/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
650 try:
651 old_label, transform.label = transform.label, label
→ 652 return self.apply(transform, pvalueish)
653 finally:
654 transform.label = old_label
/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
696 transform.type_check_inputs(pvalueish)
697
→ 698 pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
699
700 if type_options is not None and type_options.pipeline_type_check:
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/runner.py in apply(self, transform, input, options)
183 m = getattr(self, 'apply_%s' % cls.__name__, None)
184 if m:
→ 185 return m(transform, input, options)
186 raise NotImplementedError(
187 'Execution of [%s] not implemented in runner %s.' % (transform, self))
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/runner.py in apply_PTransform(self, transform, input, options)
213 def apply_PTransform(self, transform, input, options):
214 # The base case of apply is to call the transform’s expand.
→ 215 return transform.expand(input)
216
217 def run_transform(self,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in expand(self, dataset)
1201 def expand(self, dataset):
1202 input_values, input_metadata = dataset
→ 1203 result, cache = super().expand((input_values, None, None, input_metadata))
1204 assert not cache
1205 return result
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in expand(self, dataset)
1002 # need to be serialized to SavedModel.
1003 graph, structured_inputs, structured_outputs = (
→ 1004 impl_helper.trace_preprocessing_function(self._preprocessing_fn, specs,
1005 self._use_tf_compat_v1,
1006 base_temp_dir))
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in trace_preprocessing_function(preprocessing_fn, input_specs, use_tf_compat_v1, base_temp_dir)
714 return _trace_preprocessing_fn_v1(preprocessing_fn, input_specs)
715 else:
→ 716 return _trace_preprocessing_fn_v2(preprocessing_fn, input_specs,
717 base_temp_dir)
718
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in _trace_preprocessing_fn_v2(preprocessing_fn, specs, base_temp_dir)
680 evaluated_replacements=None)
681 with annotators.object_tracker_scope(annotators.ObjectTracker()):
→ 682 concrete_fn = get_traced_transform_fn(
683 preprocessing_fn, specs, tf_graph_context).get_concrete_function()
684 return (concrete_fn.graph,
/opt/conda/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py in get_concrete_function(self, *args, **kwargs)
1231 def get_concrete_function(self, *args, **kwargs):
1232 # Implements GenericFunction.get_concrete_function.
→ 1233 concrete = self._get_concrete_function_garbage_collected(*args, **kwargs)
1234 concrete._garbage_collector.release() # pylint: disable=protected-access
1235 return concrete
/opt/conda/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py in _get_concrete_function_garbage_collected(self, *args, **kwargs)
1211 if self._stateful_fn is None:
1212 initializers = []
→ 1213 self._initialize(args, kwargs, add_initializers_to=initializers)
1214 self._initialize_uninitialized_variables(initializers)
1215
/opt/conda/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py in _initialize(self, args, kwds, add_initializers_to)
757 self._graph_deleter = FunctionDeleter(self._lifted_initializer_graph)
758 self._concrete_stateful_fn = (
→ 759 self._stateful_fn._get_concrete_function_internal_garbage_collected( # pylint: disable=protected-access
760 *args, **kwds))
761
/opt/conda/lib/python3.8/site-packages/tensorflow/python/eager/function.py in _get_concrete_function_internal_garbage_collected(self, *args, **kwargs)
3064 args, kwargs = None, None
3065 with self._lock:
→ 3066 graph_function, _ = self._maybe_define_function(args, kwargs)
3067 return graph_function
3068
/opt/conda/lib/python3.8/site-packages/tensorflow/python/eager/function.py in _maybe_define_function(self, args, kwargs)
3461
3462 self._function_cache.missed.add(call_context_key)
→ 3463 graph_function = self._create_graph_function(args, kwargs)
3464 self._function_cache.primary[cache_key] = graph_function
3465
/opt/conda/lib/python3.8/site-packages/tensorflow/python/eager/function.py in _create_graph_function(self, args, kwargs, override_flat_arg_shapes)
3296 arg_names = base_arg_names + missing_arg_names
3297 graph_function = ConcreteFunction(
→ 3298 func_graph_module.func_graph_from_py_func(
3299 self._name,
3300 self._python_function,
/opt/conda/lib/python3.8/site-packages/tensorflow/python/framework/func_graph.py in func_graph_from_py_func(name, python_func, args, kwargs, signature, func_graph, autograph, autograph_options, add_control_dependencies, arg_names, op_return_value, collections, capture_by_value, override_flat_arg_shapes, acd_record_initial_resource_uses)
1005 _, original_func = tf_decorator.unwrap(python_func)
1006
→ 1007 func_outputs = python_func(*func_args, **func_kwargs)
1008
1009 # invariant: func_outputs contains only Tensors, CompositeTensors,
/opt/conda/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py in wrapped_fn(*args, **kwds)
666 # the function a weak reference to itself to avoid a reference cycle.
667 with OptionalXlaContext(compile_with_xla):
→ 668 out = weak_wrapped_fn().wrapped(*args, **kwds)
669 return out
670
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in transform_fn(inputs)
637 inputs_copy = tf_utils.copy_tensors(inputs)
638 with tf_graph_context:
→ 639 transformed_features = preprocessing_fn(inputs_copy)
640 # An empty TENSOR_REPLACEMENTS collection symbolizes that there is no
641 # analyzer left for Transform to evaluate. Either if this collection is
~/work/traffic_transform.py in preprocessing_fn(inputs)
54 # Bucketize the feature
55 for key in _BUCKET_FEATURE_KEYS:
---> 56 outputs[_transformed_name(key)] = tft.bucketize(
57 inputs[key],
58 _FEATURE_BUCKET_COUNT[key],
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/common.py in wrapped_fn(*args, **kwargs)
71 collection.append(collections.Counter())
72 collection[0][fn.__name__] += 1
---> 73 return fn(*args, **kwargs)
74 else:
75 return fn(*args, **kwargs)
TypeError: bucketize() got an unexpected keyword argument 'always_return_num_quantiles'
========================================================
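For context, here is a minimal, self-contained sketch of what I believe is the failing pattern. The toy data, the schema, and the _transformed_name / _BUCKET_FEATURE_KEYS / _FEATURE_BUCKET_COUNT stand-ins are simplified versions of what my module defines; only the tft.bucketize call with the always_return_num_quantiles keyword mirrors line 56 of traffic_transform.py (the value False is a guess, since the traceback cuts that line off). If the problem really is just that keyword, this snippet should raise the same TypeError:

import tempfile

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata, schema_utils

# Stand-ins for the helpers defined in traffic_transform.py
_BUCKET_FEATURE_KEYS = ['x']
_FEATURE_BUCKET_COUNT = {'x': 4}

def _transformed_name(key):
    return key + '_xf'

def preprocessing_fn(inputs):
    outputs = {}
    # Bucketize the feature (mirrors traffic_transform.py line 56)
    for key in _BUCKET_FEATURE_KEYS:
        outputs[_transformed_name(key)] = tft.bucketize(
            inputs[key],
            _FEATURE_BUCKET_COUNT[key],
            always_return_num_quantiles=False)  # the keyword named in the TypeError
    return outputs

raw_data = [{'x': 1.0}, {'x': 2.0}, {'x': 3.0}]
raw_data_metadata = dataset_metadata.DatasetMetadata(
    schema_utils.schema_from_feature_spec(
        {'x': tf.io.FixedLenFeature([], tf.float32)}))

with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    transformed_dataset, _ = (
        (raw_data, raw_data_metadata)
        | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))

If always_return_num_quantiles has simply been removed from tft.bucketize in newer tensorflow_transform releases, a pointer to what replaces it (or to the version where it went away) would already help a lot.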
Patrick