Skip to content

Commit

Permalink
[SPARK-36533][SS][FOLLOWUP] Support Trigger.AvailableNow in PySpark
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR proposes to add Trigger.AvailableNow in PySpark on top of #33763.

### Why are the changes needed?

We missed adding Trigger.AvailableNow in PySpark in #33763.

### Does this PR introduce _any_ user-facing change?

Yes, Trigger.AvailableNow will be available in PySpark as well.

### How was this patch tested?

Added a simple validation example to the PySpark docstring. Manually tested as below:

```
>>> spark.readStream.format("text").load("/WorkArea/ScalaProjects/spark-apache/dist/inputs").writeStream.format("console").trigger(once=True).start()
<pyspark.sql.streaming.StreamingQuery object at 0x118dff6d0>
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
|    a|
|    b|
|    c|
|    d|
|    e|
+-----+

>>> spark.readStream.format("text").load("/WorkArea/ScalaProjects/spark-apache/dist/inputs").writeStream.format("console").trigger(availableNow=True).start()
<pyspark.sql.streaming.StreamingQuery object at 0x118dffe50>
>>> -------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
|    a|
|    b|
|    c|
|    d|
|    e|
+-----+

>>> spark.readStream.format("text").option("maxfilespertrigger", "2").load("/WorkArea/ScalaProjects/spark-apache/dist/inputs").writeStream.format("console").trigger(availableNow=True).start()
<pyspark.sql.streaming.StreamingQuery object at 0x118dff820>
>>> -------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
|    a|
|    b|
+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
|    c|
|    d|
+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+
|value|
+-----+
|    e|
+-----+

>>>
```

Closes #34592 from HeartSaVioR/SPARK-36533-FOLLOWUP-pyspark.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
HeartSaVioR committed Nov 14, 2021
1 parent 775e05f commit edbc7cf
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions python/pyspark/sql/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -1005,12 +1005,17 @@ def trigger(self, *, once: bool) -> "DataStreamWriter":
def trigger(self, *, continuous: str) -> "DataStreamWriter":
...

@overload
def trigger(self, *, availableNow: bool) -> "DataStreamWriter":
...

def trigger(
self,
*,
processingTime: Optional[str] = None,
once: Optional[bool] = None,
continuous: Optional[str] = None,
availableNow: Optional[bool] = None,
) -> "DataStreamWriter":
"""Set the trigger for the stream query. If this is not set it will run the query as fast
as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
Expand All @@ -1030,6 +1035,9 @@ def trigger(
a time interval as a string, e.g. '5 seconds', '1 minute'.
Set a trigger that runs a continuous query with a given checkpoint
interval. Only one trigger can be set.
availableNow : bool, optional
if set to True, set a trigger that processes all available data in multiple
batches then terminates the query. Only one trigger can be set.
Notes
-----
Expand All @@ -1043,12 +1051,14 @@ def trigger(
>>> writer = sdf.writeStream.trigger(once=True)
>>> # trigger the query for execution every 5 seconds
>>> writer = sdf.writeStream.trigger(continuous='5 seconds')
>>> # trigger the query for reading all available data with multiple batches
>>> writer = sdf.writeStream.trigger(availableNow=True)
"""
params = [processingTime, once, continuous]
params = [processingTime, once, continuous, availableNow]

if params.count(None) == 3:
if params.count(None) == 4:
raise ValueError("No trigger provided")
elif params.count(None) < 2:
elif params.count(None) < 3:
raise ValueError("Multiple triggers not allowed.")

jTrigger = None
Expand All @@ -1069,7 +1079,7 @@ def trigger(
self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once() # type: ignore[attr-defined]
)

else:
elif continuous is not None:
if type(continuous) != str or len(continuous.strip()) == 0:
raise ValueError(
"Value for continuous must be a non empty string. Got: %s" % continuous
Expand All @@ -1078,6 +1088,12 @@ def trigger(
jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Continuous( # type: ignore[attr-defined]
interval
)
else:
if availableNow is not True:
raise ValueError("Value for availableNow must be True. Got: %s" % availableNow)
jTrigger = (
self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.AvailableNow() # type: ignore[attr-defined]
)

self._jwrite = self._jwrite.trigger(jTrigger)
return self
Expand Down

0 comments on commit edbc7cf

Please sign in to comment.