diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index b2d06f22f3365..53a098ce49858 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -1005,12 +1005,17 @@ def trigger(self, *, once: bool) -> "DataStreamWriter":
     def trigger(self, *, continuous: str) -> "DataStreamWriter":
         ...
 
+    @overload
+    def trigger(self, *, availableNow: bool) -> "DataStreamWriter":
+        ...
+
     def trigger(
         self,
         *,
         processingTime: Optional[str] = None,
         once: Optional[bool] = None,
         continuous: Optional[str] = None,
+        availableNow: Optional[bool] = None,
     ) -> "DataStreamWriter":
         """Set the trigger for the stream query. If this is not set it will run the query as fast
         as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
@@ -1030,6 +1035,9 @@ def trigger(
             a time interval as a string, e.g. '5 seconds', '1 minute'.
             Set a trigger that runs a continuous query with a given checkpoint
             interval. Only one trigger can be set.
+        availableNow : bool, optional
+            if set to True, set a trigger that processes all available data in multiple
+            batches then terminates the query. Only one trigger can be set.
 
         Notes
         -----
@@ -1043,12 +1051,14 @@ def trigger(
         >>> writer = sdf.writeStream.trigger(once=True)
         >>> # trigger the query for execution every 5 seconds
         >>> writer = sdf.writeStream.trigger(continuous='5 seconds')
+        >>> # trigger the query for reading all available data with multiple batches
+        >>> writer = sdf.writeStream.trigger(availableNow=True)
         """
-        params = [processingTime, once, continuous]
+        params = [processingTime, once, continuous, availableNow]
 
-        if params.count(None) == 3:
+        if params.count(None) == 4:
             raise ValueError("No trigger provided")
-        elif params.count(None) < 2:
+        elif params.count(None) < 3:
             raise ValueError("Multiple triggers not allowed.")
 
         jTrigger = None
@@ -1069,7 +1079,7 @@ def trigger(
                 self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once()  # type: ignore[attr-defined]
             )
-        else:
+        elif continuous is not None:
             if type(continuous) != str or len(continuous.strip()) == 0:
                 raise ValueError(
                     "Value for continuous must be a non empty string. Got: %s" % continuous
                 )
@@ -1078,6 +1088,12 @@ def trigger(
             jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Continuous(  # type: ignore[attr-defined]
                 interval
             )
+        else:
+            if availableNow is not True:
+                raise ValueError("Value for availableNow must be True. Got: %s" % availableNow)
+            jTrigger = (
+                self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.AvailableNow()  # type: ignore[attr-defined]
+            )
 
         self._jwrite = self._jwrite.trigger(jTrigger)
         return self
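
For context, here is a minimal end-to-end sketch of the new trigger in use. It assumes a PySpark build that includes this patch; the rate source, console sink, app name, and checkpoint path are illustrative choices, not part of the change itself.

```python
# Minimal sketch: run a streaming query with the new availableNow trigger.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("available-now-demo").getOrCreate()

# The built-in "rate" source generates rows continuously; any streaming
# source could stand in here.
sdf = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

# availableNow=True processes all data available when the query starts,
# possibly across multiple micro-batches, then terminates -- unlike
# once=True, which forces everything into a single batch.
query = (
    sdf.writeStream.format("console")
    .option("checkpointLocation", "/tmp/available-now-checkpoint")  # illustrative path
    .trigger(availableNow=True)
    .start()
)
query.awaitTermination()  # returns once all available data has been processed
```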