Skip to content

Commit

Permalink
Revert "Update typecheck err msg (#32880)"
Browse files (browse the repository at this point in the history)
This reverts commit 76c5d56.
  • Loading branch information
jrmccluskey authored Nov 11, 2024
1 parent 2488ca1 commit c17e964
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 66 deletions.
3 changes: 1 addition & 2 deletions sdks/python/apache_beam/io/gcp/pubsub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -901,8 +901,7 @@ def test_write_messages_with_attributes_error(self, mock_pubsub):

options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with self.assertRaisesRegex(Exception,
r'requires.*PubsubMessage.*applied.*str'):
with self.assertRaisesRegex(Exception, r'Type hint violation'):
with TestPipeline(options=options) as p:
_ = (
p
Expand Down
25 changes: 4 additions & 21 deletions sdks/python/apache_beam/transforms/ptransform.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,12 +497,13 @@ def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
at_context = ' %s %s' % (input_or_output, context) if context else ''
raise TypeCheckError(
'{type} type hint violation at {label}{context}: expected {hint}, '
'got {actual_type}'.format(
'got {actual_type}\nFull type hint:\n{debug_str}'.format(
type=input_or_output.title(),
label=self.label,
context=at_context,
hint=hint,
actual_type=pvalue_.element_type))
actual_type=pvalue_.element_type,
debug_str=type_hints.debug_str()))

def _infer_output_coder(self, input_type=None, input_coder=None):
# type: (...) -> Optional[coders.Coder]
Expand Down Expand Up @@ -938,25 +939,7 @@ def element_type(side_input):
bindings = getcallargs_forhints(argspec_fn, *arg_types, **kwargs_types)
hints = getcallargs_forhints(
argspec_fn, *input_types[0], **input_types[1])

# First check the main input.
arg_hints = iter(hints.items())
element_arg, element_hint = next(arg_hints)
if not typehints.is_consistent_with(
bindings.get(element_arg, typehints.Any), element_hint):
transform_nest_level = self.label.count("/")
split_producer_label = pvalueish.producer.full_label.split("/")
producer_label = "/".join(
split_producer_label[:transform_nest_level + 1])
raise TypeCheckError(
f"The transform '{self.label}' requires "
f"PCollections of type '{element_hint}' "
f"but was applied to a PCollection of type"
f" '{bindings[element_arg]}' "
f"(produced by the transform '{producer_label}'). ")

# Now check the side inputs.
for arg, hint in arg_hints:
for arg, hint in hints.items():
if arg.startswith('__unknown__'):
continue
if hint is None:
Expand Down
90 changes: 64 additions & 26 deletions sdks/python/apache_beam/transforms/ptransform_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1298,13 +1298,17 @@ class ToUpperCaseWithPrefix(beam.DoFn):
def process(self, element, prefix):
return [prefix + element.upper()]

with self.assertRaisesRegex(typehints.TypeCheckError,
r'Upper.*requires.*str.*applied.*int'):
with self.assertRaises(typehints.TypeCheckError) as e:
(
self.p
| 'T' >> beam.Create([1, 2, 3]).with_output_types(int)
| 'Upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello'))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Upper': "
"requires {} but got {} for element".format(str, int))

def test_do_fn_pipeline_runtime_type_check_satisfied(self):
self.p._options.view_as(TypeOptions).runtime_type_check = True

Expand All @@ -1331,14 +1335,18 @@ class AddWithNum(beam.DoFn):
def process(self, element, num):
return [element + num]

with self.assertRaisesRegex(typehints.TypeCheckError,
r'Add.*requires.*int.*applied.*str'):
with self.assertRaises(typehints.TypeCheckError) as e:
(
self.p
| 'T' >> beam.Create(['1', '2', '3']).with_output_types(str)
| 'Add' >> beam.ParDo(AddWithNum(), 5))
self.p.run()

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Add': "
"requires {} but got {} for element".format(int, str))

def test_pardo_does_not_type_check_using_type_hint_decorators(self):
@with_input_types(a=int)
@with_output_types(typing.List[str])
Expand All @@ -1347,13 +1355,17 @@ def int_to_str(a):

# The function above is expecting an int for its only parameter. However, it
# will receive a str instead, which should result in a raised exception.
with self.assertRaisesRegex(typehints.TypeCheckError,
r'ToStr.*requires.*int.*applied.*str'):
with self.assertRaises(typehints.TypeCheckError) as e:
(
self.p
| 'S' >> beam.Create(['b', 'a', 'r']).with_output_types(str)
| 'ToStr' >> beam.FlatMap(int_to_str))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'ToStr': "
"requires {} but got {} for a".format(int, str))

def test_pardo_properly_type_checks_using_type_hint_decorators(self):
@with_input_types(a=str)
@with_output_types(typing.List[str])
Expand All @@ -1375,8 +1387,7 @@ def to_all_upper_case(a):
def test_pardo_does_not_type_check_using_type_hint_methods(self):
# The first ParDo outputs PCollections of type int; however, the second ParDo
# is expecting PCollections of type str instead.
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Upper.*requires.*str.*applied.*int'):
with self.assertRaises(typehints.TypeCheckError) as e:
(
self.p
| 'S' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
Expand All @@ -1387,6 +1398,11 @@ def test_pardo_does_not_type_check_using_type_hint_methods(self):
'Upper' >> beam.FlatMap(lambda x: [x.upper()]).with_input_types(
str).with_output_types(str)))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Upper': "
"requires {} but got {} for x".format(str, int))

def test_pardo_properly_type_checks_using_type_hint_methods(self):
# Pipeline should be created successfully without an error
d = (
Expand All @@ -1403,14 +1419,18 @@ def test_pardo_properly_type_checks_using_type_hint_methods(self):
def test_map_does_not_type_check_using_type_hints_methods(self):
# The transform before 'Map' has indicated that it outputs a PCollection of
# ints, while Map is expecting a PCollection of strs.
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Upper.*requires.*str.*applied.*int'):
with self.assertRaises(typehints.TypeCheckError) as e:
(
self.p
| 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
| 'Upper' >> beam.Map(lambda x: x.upper()).with_input_types(
str).with_output_types(str))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Upper': "
"requires {} but got {} for x".format(str, int))

def test_map_properly_type_checks_using_type_hints_methods(self):
# No error should be raised if this type-checks properly.
d = (
Expand All @@ -1429,13 +1449,17 @@ def upper(s):

# Hinted function above expects a str at pipeline construction.
# However, 'Map' should detect that Create has hinted an int instead.
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Upper.*requires.*str.*applied.*int'):
with self.assertRaises(typehints.TypeCheckError) as e:
(
self.p
| 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
| 'Upper' >> beam.Map(upper))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Upper': "
"requires {} but got {} for s".format(str, int))

def test_map_properly_type_checks_using_type_hints_decorator(self):
@with_input_types(a=bool)
@with_output_types(int)
Expand All @@ -1453,8 +1477,7 @@ def bool_to_int(a):
def test_filter_does_not_type_check_using_type_hints_method(self):
# Filter is expecting an int but instead looks to the 'left' and sees a str
# incoming.
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Below 3.*requires.*int.*applied.*str'):
with self.assertRaises(typehints.TypeCheckError) as e:
(
self.p
| 'Strs' >> beam.Create(['1', '2', '3', '4', '5'
Expand All @@ -1463,6 +1486,11 @@ def test_filter_does_not_type_check_using_type_hints_method(self):
str).with_output_types(str)
| 'Below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Below 3': "
"requires {} but got {} for x".format(int, str))

def test_filter_type_checks_using_type_hints_method(self):
# No error should be raised if this type-checks properly.
d = (
Expand All @@ -1480,13 +1508,17 @@ def more_than_half(a):
return a > 0.50

# Func above was hinted to only take a float, yet a str will be passed.
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Half.*requires.*float.*applied.*str'):
with self.assertRaises(typehints.TypeCheckError) as e:
(
self.p
| 'Ints' >> beam.Create(['1', '2', '3', '4']).with_output_types(str)
| 'Half' >> beam.Filter(more_than_half))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Half': "
"requires {} but got {} for a".format(float, str))

def test_filter_type_checks_using_type_hints_decorator(self):
@with_input_types(b=int)
def half(b):
Expand Down Expand Up @@ -2096,10 +2128,14 @@ def test_mean_globally_pipeline_checking_violated(self):
self.p
| 'C' >> beam.Create(['test']).with_output_types(str)
| 'Mean' >> combine.Mean.Globally())
err_msg = e.exception.args[0]
assert "CombinePerKey" in err_msg
assert "Tuple[TypeVariable[K]" in err_msg
assert "Tuple[None, <class 'str'>" in err_msg

expected_msg = \
"Type hint violation for 'CombinePerKey': " \
"requires Tuple[TypeVariable[K], Union[<class 'float'>, <class 'int'>, " \
"<class 'numpy.float64'>, <class 'numpy.int64'>]] " \
"but got Tuple[None, <class 'str'>] for element"

self.assertStartswith(e.exception.args[0], expected_msg)

def test_mean_globally_runtime_checking_satisfied(self):
self.p._options.view_as(TypeOptions).runtime_type_check = True
Expand Down Expand Up @@ -2159,12 +2195,14 @@ def test_mean_per_key_pipeline_checking_violated(self):
typing.Tuple[str, str]))
| 'EvenMean' >> combine.Mean.PerKey())
self.p.run()
err_msg = e.exception.args[0]
assert "CombinePerKey(MeanCombineFn)" in err_msg
assert "requires" in err_msg
assert "Tuple[TypeVariable[K]" in err_msg
assert "applied" in err_msg
assert "Tuple[<class 'str'>, <class 'str'>]" in err_msg

expected_msg = \
"Type hint violation for 'CombinePerKey(MeanCombineFn)': " \
"requires Tuple[TypeVariable[K], Union[<class 'float'>, <class 'int'>, " \
"<class 'numpy.float64'>, <class 'numpy.int64'>]] " \
"but got Tuple[<class 'str'>, <class 'str'>] for element"

self.assertStartswith(e.exception.args[0], expected_msg)

def test_mean_per_key_runtime_checking_satisfied(self):
self.p._options.view_as(TypeOptions).runtime_type_check = True
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/typehints/decorators_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ def fn(a: int) -> int:
return a

with self.assertRaisesRegex(TypeCheckError,
r'requires .*int.* but was applied .*str'):
r'requires .*int.* but got .*str'):
_ = ['a', 'b', 'c'] | Map(fn)

# Same pipeline doesn't raise without annotations on fn.
Expand All @@ -423,7 +423,7 @@ def fn(a: int) -> int:
_ = [1, 2, 3] | Map(fn) # Doesn't raise - correct types.

with self.assertRaisesRegex(TypeCheckError,
r'requires .*int.* but was applied .*str'):
r'requires .*int.* but got .*str'):
_ = ['a', 'b', 'c'] | Map(fn)

@decorators.no_annotations
Expand Down
30 changes: 15 additions & 15 deletions sdks/python/apache_beam/typehints/typed_pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ def process(self, element):
self.assertEqual(['1', '2', '3'], sorted(result))

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*applied.*str'):
r'requires.*int.*got.*str'):
['a', 'b', 'c'] | beam.ParDo(MyDoFn())

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*applied.*str'):
r'requires.*int.*got.*str'):
[1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))

def test_typed_dofn_method(self):
Expand All @@ -104,11 +104,11 @@ def process(self, element: int) -> typehints.Tuple[str]:
self.assertEqual(['1', '2', '3'], sorted(result))

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*applied.*str'):
r'requires.*int.*got.*str'):
_ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn())

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*applied.*str'):
r'requires.*int.*got.*str'):
_ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))

def test_typed_dofn_method_with_class_decorators(self):
Expand All @@ -124,12 +124,12 @@ def process(self, element: int) -> typehints.Tuple[str]:

with self.assertRaisesRegex(
typehints.TypeCheckError,
r'requires.*Tuple\[<class \'int\'>, <class \'int\'>\].*applied.*str'):
r'requires.*Tuple\[<class \'int\'>, <class \'int\'>\].*got.*str'):
_ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn())

with self.assertRaisesRegex(
typehints.TypeCheckError,
r'requires.*Tuple\[<class \'int\'>, <class \'int\'>\].*applied.*int'):
r'requires.*Tuple\[<class \'int\'>, <class \'int\'>\].*got.*int'):
_ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))

def test_typed_callable_iterable_output(self):
Expand All @@ -156,11 +156,11 @@ def process(self, element: typehints.Tuple[int, int]) -> \
self.assertEqual(['1', '2', '3'], sorted(result))

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*applied.*str'):
r'requires.*int.*got.*str'):
_ = ['a', 'b', 'c'] | beam.ParDo(my_do_fn)

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*applied.*str'):
r'requires.*int.*got.*str'):
_ = [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn))

def test_typed_callable_instance(self):
Expand All @@ -177,11 +177,11 @@ def do_fn(element: typehints.Tuple[int, int]) -> typehints.Generator[str]:
self.assertEqual(['1', '2', '3'], sorted(result))

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*applied.*str'):
r'requires.*int.*got.*str'):
_ = ['a', 'b', 'c'] | pardo

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*applied.*str'):
r'requires.*int.*got.*str'):
_ = [1, 2, 3] | (pardo | 'again' >> pardo)

def test_filter_type_hint(self):
Expand Down Expand Up @@ -430,7 +430,7 @@ def fn(element: float):
return pcoll | beam.ParDo(fn)

with self.assertRaisesRegex(typehints.TypeCheckError,
r'ParDo.*requires.*float.*applied.*str'):
r'ParDo.*requires.*float.*got.*str'):
_ = ['1', '2', '3'] | MyMap()
with self.assertRaisesRegex(typehints.TypeCheckError,
r'MyMap.*expected.*str.*got.*bytes'):
Expand Down Expand Up @@ -632,14 +632,14 @@ def produces_unkown(e):
return e

@typehints.with_input_types(int)
def accepts_int(e):
def requires_int(e):
return e

class MyPTransform(beam.PTransform):
def expand(self, pcoll):
unknowns = pcoll | beam.Map(produces_unkown)
ints = pcoll | beam.Map(int)
return (unknowns, ints) | beam.Flatten() | beam.Map(accepts_int)
return (unknowns, ints) | beam.Flatten() | beam.Map(requires_int)

_ = [1, 2, 3] | MyPTransform()

Expand Down Expand Up @@ -761,8 +761,8 @@ def test_var_positional_only_side_input_hint(self):

with self.assertRaisesRegex(
typehints.TypeCheckError,
r'requires.*Tuple\[Union\[<class \'int\'>, <class \'str\'>\], ...\].*'
r'applied.*Tuple\[Union\[<class \'float\'>, <class \'int\'>\], ...\]'):
r'requires Tuple\[Union\[<class \'int\'>, <class \'str\'>\], ...\] but '
r'got Tuple\[Union\[<class \'float\'>, <class \'int\'>\], ...\]'):
_ = [1.2] | beam.Map(lambda *_: 'a', 5).with_input_types(int, str)

def test_var_keyword_side_input_hint(self):
Expand Down

0 comments on commit c17e964

Please sign in to comment.