[SPARK-37154][PYTHON] Inline hints for pyspark.rdd #35252
Conversation
@overload
def get(self, key: str) -> Optional[str]:
    ...

@overload
def get(self, key: str, defaultValue: None) -> Optional[str]:
    ...

@overload
def get(self, key: str, defaultValue: str) -> str:
    ...
These are added to clearly indicate which calls can result in None and, in turn, to avoid ignores or casts in rdd.py.
qq: Maybe that is a stupid question, but I'd like to ask if

def get(self, key: str, defaultValue: Optional[str]) -> Optional[str]:

can't cover both

def get(self, key: str, defaultValue: None) -> Optional[str]:

and

def get(self, key: str, defaultValue: str) -> str:

?
It covers both, but doesn't capture the same relationship between the arguments. With (SparkConf, str, str) -> str we know that conf.get("foo", "42") is str, which saves us a cast / ignore / assert not None later. With only (SparkConf, str, Optional[str]) -> Optional[str] we still have to assert that the result is not None.

(There might be another way of capturing this through type parameters, i.e. (SparkConf, str, T) -> T where T is TypeVar("T", None, str).)
def dumps(self, obj):
    """
    Serialize an object into a byte array.
    When batching is used, this will be called with an array of objects.
    """
    raise NotImplementedError
In rdd.py we assume that implementations have a dumps method.
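For example, a minimal concrete serializer satisfying that assumption could look like this (illustrative only; PySpark's real serializers live in pyspark.serializers):

import pickle

class Serializer:
    def dumps(self, obj):
        """Serialize an object into a byte array."""
        raise NotImplementedError

class PickleLikeSerializer(Serializer):
    def dumps(self, obj):
        # Concrete subclasses provide the dumps that rdd.py relies on.
        return pickle.dumps(obj)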
@@ -1421,7 +1422,7 @@ def runJob(
         self,
         rdd: "RDD[T]",
         partitionFunc: Callable[[Iterable[T]], Iterable[U]],
-        partitions: Optional[List[int]] = None,
+        partitions: Optional[Sequence[int]] = None,
We use range in rdd.py, so we need a generic collection type.
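A quick sketch of why List[int] is too narrow here (the function names are illustrative):

from typing import List, Sequence

def takes_list(partitions: List[int]) -> None: ...
def takes_sequence(partitions: Sequence[int]) -> None: ...

takes_sequence(range(4))  # OK: range implements the Sequence protocol
takes_list(range(4))      # mypy error: "range" is not compatible with "List[int]"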
""" | ||
The :class:`SparkContext` that this RDD was created on. | ||
""" | ||
return self.ctx | ||
|
||
def cache(self): | ||
def cache(self: "RDD[T]") -> "RDD[T]": |
Covariant types cannot be used reliably, so I used T wherever an arbitrary RDD is used.
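A minimal sketch of the pattern, with MyRDD standing in for the real class:

from typing import Generic, TypeVar

T = TypeVar("T")

class MyRDD(Generic[T]):
    def cache(self: "MyRDD[T]") -> "MyRDD[T]":
        # Annotating self threads the element type through the call,
        # so cache() on a MyRDD[int] is still a MyRDD[int].
        return self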
@@ -1440,9 +1666,9 @@ def mean(self):
         >>> sc.parallelize([1, 2, 3]).mean()
         2.0
         """
-        return self.stats().mean()
+        return self.stats().mean()  # type: ignore[return-value]
We might revisit StatCounter later (maybe it should be generic, but that is tricky to do right if the input is empty), but for now let's use ignores.
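A rough sketch of why a generic StatCounter would be tricky (GenericStatCounter is hypothetical, not the real class): the running aggregates need a numeric seed, and an empty input leaves nothing to infer the type parameter from.

from typing import Generic, Iterable, TypeVar

N = TypeVar("N", int, float)

class GenericStatCounter(Generic[N]):
    def __init__(self, values: Iterable[N] = ()) -> None:
        self.mu = 0.0  # float seed, already imprecise when N is int

counter = GenericStatCounter()  # mypy: cannot infer N from an empty input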
@overload
def toDF(
    self: "RDD[RowLike]",
    schema: Optional[Union[List[str], Tuple[str, ...]]] = None,
    sampleRatio: Optional[float] = None,
) -> "DataFrame":
    ...

@overload
def toDF(
    self: "RDD[RowLike]", schema: Optional[Union["StructType", str]] = None
) -> "DataFrame":
    ...

@overload
def toDF(
    self: "RDD[AtomicValue]",
    schema: Union["AtomicType", str],
) -> "DataFrame":
    ...

def toDF(
    self: "RDD[Any]", schema: Optional[Any] = None, sampleRatio: Optional[float] = None
) -> "DataFrame":
    raise RuntimeError("""RDD.toDF was called before SparkSession was initialized.""")
I am not very happy about this, but as far as I can tell it is the only way to type check toDF.
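Illustrative calls that the three overloads are meant to distinguish (assuming sc is a live SparkContext and an active SparkSession has replaced this placeholder implementation):

pairs = sc.parallelize([("Alice", 1), ("Bob", 2)])
df1 = pairs.toDF(["name", "value"])  # RDD[RowLike] with column names

atoms = sc.parallelize([1, 2, 3])
df2 = atoms.toDF("int")              # RDD[AtomicValue] with an atomic type string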
Just noticed that this was merged, so I guess we can wait for the mypy upgrade on dev and drop the implementation.
Looks fine otherwise.
I didn't look at this in detail, but it looks fine.
Merged into master. Thanks all!
What changes were proposed in this pull request?
This PR proposes the migration of type hints for pyspark.rdd from a stub file to inline annotations.

Why are the changes needed?
It is a part of the ongoing process of migrating stubs to inline hints.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests + new data tests.