[MINOR][DOCS][Structured Streaming] Minor doc fixes around DataFrameWriter and DataStreamWriter #13952

Closed · wants to merge 3 commits
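For context, the rename these doc fixes chase: the pre-release `df.write.startStream()` API became `df.writeStream.start()` in the final Spark 2.0 API. A minimal sketch of the current call chain (assumes a local session; the `rate` source used here shipped in a later release than this PR, so treat the source/sink choices as illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("writestream-start-example")
      .master("local[2]")
      .getOrCreate()

    // A streaming DataFrame; the built-in "rate" source emits
    // (timestamp, value) rows at a fixed rate.
    val stream = spark.readStream.format("rate").load()

    // Old (removed):  stream.write.startStream()
    // Current:        stream.writeStream.start()
    val query = stream.writeStream
      .format("console")      // print each micro-batch to stdout
      .outputMode("append")
      .start()                // returns a StreamingQuery

    query.awaitTermination(5000)  // let it run briefly
    query.stop()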
4 changes: 2 additions & 2 deletions python/pyspark/sql/dataframe.py
@@ -257,8 +257,8 @@ def isLocal(self):
     def isStreaming(self):
         """Returns true if this :class:`Dataset` contains one or more sources that continuously
         return data as it arrives. A :class:`Dataset` that reads data from a streaming source
-        must be executed as a :class:`StreamingQuery` using the :func:`startStream` method in
-        :class:`DataFrameWriter`. Methods that return a single answer, (e.g., :func:`count` or
+        must be executed as a :class:`StreamingQuery` using the :func:`start` method in
+        :class:`DataStreamWriter`. Methods that return a single answer, (e.g., :func:`count` or
         :func:`collect`) will throw an :class:`AnalysisException` when there is a streaming
         source present.
 
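The docstring above describes two execution paths; a sketch of the distinction in Scala (reusing the `spark` session from the sketch near the top):

    val batch  = spark.range(10).toDF()                   // batch Dataset
    val stream = spark.readStream.format("rate").load()   // streaming Dataset

    assert(!batch.isStreaming)
    assert(stream.isStreaming)

    batch.count()      // fine: single-answer methods work on batch data
    // stream.count()  // would throw AnalysisException (see the checker below)
    val q = stream.writeStream.format("console").start()  // the supported path
    q.stop()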
@@ -30,7 +30,7 @@ object UnsupportedOperationChecker {
   def checkForBatch(plan: LogicalPlan): Unit = {
     plan.foreachUp {
       case p if p.isStreaming =>
-        throwError("Queries with streaming sources must be executed with write.startStream()")(p)
+        throwError("Queries with streaming sources must be executed with writeStream.start()")(p)
 
       case _ =>
     }
@@ -40,7 +40,7 @@ object UnsupportedOperationChecker {
 
     if (!plan.isStreaming) {
       throwError(
-        "Queries without streaming sources cannot be executed with write.startStream()")(plan)
+        "Queries without streaming sources cannot be executed with writeStream.start()")(plan)
     }
 
     // Disallow multiple streaming aggregations
@@ -154,7 +154,7 @@ object UnsupportedOperationChecker {
 
       case ReturnAnswer(child) if child.isStreaming =>
        throwError("Cannot return immediate result on streaming DataFrames/Dataset. Queries " +
-         "with streaming DataFrames/Datasets must be executed with write.startStream().")
+         "with streaming DataFrames/Datasets must be executed with writeStream.start().")
 
      case _ =>
    }
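These messages reach users as `AnalysisException`s at analysis time; roughly (a sketch, again reusing the `spark` session and `rate` source from above):

    import scala.util.{Failure, Try}
    import org.apache.spark.sql.AnalysisException

    val stream = spark.readStream.format("rate").load()

    // A streaming plan executed as a batch query trips checkForBatch:
    Try(stream.count()) match {
      case Failure(e: AnalysisException) =>
        println(e.getMessage)  // mentions "writeStream.start()" after this change
      case other =>
        println(s"unexpected: $other")
    }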
@@ -53,12 +53,12 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
   assertNotSupportedInBatchPlan(
     "streaming source",
     streamRelation,
-    Seq("with streaming source", "startStream"))
+    Seq("with streaming source", "start"))
 
   assertNotSupportedInBatchPlan(
     "select on streaming source",
     streamRelation.select($"count(*)"),
-    Seq("with streaming source", "startStream"))
+    Seq("with streaming source", "start"))
 
 
   /*
@@ -70,7 +70,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
   // Batch plan in streaming query
   testError(
     "streaming plan - no streaming source",
-    Seq("without streaming source", "startStream")) {
+    Seq("without streaming source", "start")) {
     UnsupportedOperationChecker.checkForStreaming(batchRelation.select($"count(*)"), Append)
   }
 
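The suite's `assertNotSupportedInBatchPlan`/`testError` helpers are internal to Spark; the same expectation can be written with plain ScalaTest, sketched here (ScalaTest 3.1+ class names and a local session assumed):

    import org.apache.spark.sql.{AnalysisException, SparkSession}
    import org.scalatest.funsuite.AnyFunSuite

    class StartMessageSuite extends AnyFunSuite {
      private lazy val spark = SparkSession.builder()
        .master("local[2]").appName("start-message-test").getOrCreate()

      test("batch action on a streaming Dataset points at start()") {
        val stream = spark.readStream.format("rate").load()
        val e = intercept[AnalysisException] { stream.count() }
        assert(e.getMessage.contains("start"))
      }
    }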
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -463,8 +463,8 @@ class Dataset[T] private[sql](
   /**
    * Returns true if this Dataset contains one or more sources that continuously
    * return data as it arrives. A Dataset that reads data from a streaming source
-   * must be executed as a [[StreamingQuery]] using the `startStream()` method in
-   * [[DataFrameWriter]]. Methods that return a single answer, e.g. `count()` or
+   * must be executed as a [[StreamingQuery]] using the `start()` method in
+   * [[DataStreamWriter]]. Methods that return a single answer, e.g. `count()` or
    * `collect()`, will throw an [[AnalysisException]] when there is a streaming
    * source present.
    *
@@ -109,7 +109,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   /**
    * :: Experimental ::
-   * Specifies the name of the [[StreamingQuery]] that can be started with `startStream()`.
+   * Specifies the name of the [[StreamingQuery]] that can be started with `start()`.
    * This name must be unique among all the currently active queries in the associated SQLContext.
    *
    * @since 2.0.0
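Usage after the rename, as a sketch (the built-in `memory` sink registers an in-memory table named after the query, which makes the name visible):

    // queryName must be unique among the session's active queries.
    val named = stream.writeStream
      .queryName("rate_rows")
      .format("memory")        // in-memory table takes its name from queryName
      .outputMode("append")
      .start()

    spark.sql("SELECT count(*) FROM rate_rows").show()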
@@ -31,8 +31,8 @@ trait StreamingQuery {
 
   /**
    * Returns the name of the query. This name is unique across all active queries. This can be
-   * set in the[[org.apache.spark.sql.DataFrameWriter DataFrameWriter]] as
-   * `dataframe.write().queryName("query").startStream()`.
+   * set in the [[org.apache.spark.sql.DataStreamWriter DataStreamWriter]] as
+   * `dataframe.writeStream.queryName("query").start()`.
   * @since 2.0.0
   */
  def name: String
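Continuing the sketch above, the name set through `queryName` is what `StreamingQuery.name` returns:

    assert(named.name == "rate_rows")

    // Every active query in the session carries its unique name:
    spark.streams.active.foreach(q => println(q.name))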
@@ -35,9 +35,9 @@ abstract class StreamingQueryListener {
   /**
    * Called when a query is started.
    * @note This is called synchronously with
-   *       [[org.apache.spark.sql.DataFrameWriter `DataFrameWriter.startStream()`]],
+   *       [[org.apache.spark.sql.DataStreamWriter `DataStreamWriter.start()`]],
    *       that is, `onQueryStart` will be called on all listeners before
-   *       `DataFrameWriter.startStream()` returns the corresponding [[StreamingQuery]]. Please
+   *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]]. Please
    *       don't block this method as it will block your query.
    * @since 2.0.0
    */
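A listener sketch against the event API as it settled after 2.0 (the event class names shifted between early releases, so treat the signatures as illustrative):

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    class LoggingListener extends StreamingQueryListener {
      // Runs synchronously inside DataStreamWriter.start(), as documented
      // above -- keep it cheap or query startup will block.
      override def onQueryStarted(event: QueryStartedEvent): Unit =
        println(s"query started: ${event.name}")

      override def onQueryProgress(event: QueryProgressEvent): Unit = ()
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
    }

    spark.streams.addListener(new LoggingListener)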
@@ -120,12 +120,12 @@ class StreamSuite extends StreamTest {
     }
 
     // Running streaming plan as a batch query
-    assertError("startStream" :: Nil) {
+    assertError("start" :: Nil) {
       streamInput.toDS.map { i => i }.count()
     }
 
     // Running non-streaming plan with as a streaming query
-    assertError("without streaming sources" :: "startStream" :: Nil) {
+    assertError("without streaming sources" :: "start" :: Nil) {
       val ds = batchInput.map { i => i }
       testStream(ds)()
     }