
[SPARK-23099][SS] Migrate foreach sink to DataSourceV2 #20552

Closed · wants to merge 6 commits

Conversation

jose-torres (Contributor)

What changes were proposed in this pull request?

Migrate the foreach sink to the DataSourceV2 API.

How was this patch tested?

Existing unit tests, plus a new test to verify an edge case.

@jose-torres (Contributor, Author):

/cc @tdas

@SparkQA commented Feb 8, 2018

Test build #87230 has finished for PR 20552 at commit 44de1ea.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport
  • case class ForeachInternalWriter[T: Encoder](
  • case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
  • class ForeachDataWriter[T : Encoder](

@tdas (Contributor) left a comment:

This is fine, just a few comments. But one major concern I have is the way we are indirectly exposing the batchId to the DataWriter (via job properties). If we are not exposing it directly, then essentially we are saying that other data sources built by general developers are not supposed to use batch ids in the executors for any purpose. That is rather odd: there is transactional logic you can write with ForeachWriter (which exposes the id) that you cannot write with the DataSourceV2 APIs.

I know the alternative is also not great; we would have to split StreamWriter back into separate APIs for MicroBatch and Continuous.
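For illustration only, the indirect route through job properties might look like the sketch below; the property key here is made up, not the one this PR actually uses. Local properties set on the driver propagate to tasks and can be read back on the executor side via TaskContext:

```scala
import org.apache.spark.TaskContext

// Hypothetical sketch: "sql.streaming.batchId" is an assumed key, not the
// property name this PR uses. The value is set on the driver and read back
// inside the running task on the executor.
def currentBatchId(): Long =
  TaskContext.get().getLocalProperty("sql.streaming.batchId").toLong
```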

}

case class ForeachInternalWriter[T: Encoder](
writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
Contributor:

nit: params on different lines
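That is, each parameter on its own line, e.g.:

```scala
case class ForeachInternalWriter[T: Encoder](
    writer: ForeachWriter[T],
    encoder: ExpressionEncoder[T])
```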

}
}

case class ForeachInternalWriter[T: Encoder](
Contributor:

nit: This is a really small class. Maybe inline it rather than define the confusing name ...InternalWriter.

}
}

case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
Contributor:

Similarly, maybe inline this class as well; it's very small.

Contributor:

Actually, we probably should not inline this one; its outer closure may not be serializable in that case.
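A minimal sketch of the pitfall (class names are made up): an anonymous class that uses a member of its enclosing instance keeps an $outer reference, so serializing it also tries to serialize the enclosing object.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

class Enclosing { // deliberately not Serializable
  val label = "enclosing"
  // Referencing `label` forces the anonymous class to capture an $outer
  // pointer to this Enclosing instance.
  def factory: Serializable = new Serializable {
    override def toString: String = label
  }
}

// Throws java.io.NotSerializableException: Enclosing
new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(new Enclosing().factory)
```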

}
}

class ForeachDataWriter[T : Encoder](
Contributor:

Add docs describing the implementation of this DataWriter, especially the lifecycle of the ForeachWriter (this should go here rather than in inline comments).
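A sketch of the kind of class-level doc being asked for; the wording here is assumed rather than taken from the PR:

```scala
/**
 * A DataWriter that forwards every row of a partition to a user-provided
 * ForeachWriter.
 *
 * Lifecycle of the wrapped ForeachWriter, per partition and epoch:
 *  1. open(partitionId, epochId) is called on a fresh copy of the writer;
 *     if it returns false, the rows of this epoch are skipped.
 *  2. process(value) is called once for each incoming row.
 *  3. close(errorOrNull) is called when the epoch ends, with the error if
 *     the task failed.
 */
```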

private def openAndSetState(epochId: Long) = {
// Create a new writer by roundtripping through the serialization for compatibility.
// In the old API, a writer instantiation would never get reused.
val byteStream = new ByteArrayOutputStream()
Contributor:

Why are you serializing and deserializing here? If you are re-serializing the ForeachWriter, doesn't this mean you will retain state (the non-transient fields) across copies? Is that what you want?

It seems the best thing to do is to serialize the writer at the driver, send the bytes to the task, and then deserialize repeatedly. Then you only incur the cost of deserialization between epochs, and you always start with a fresh copy of the ForeachWriter.

Contributor (Author):

You're right; this suggestion is what we really want.
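Concretely, the suggestion might look like this minimal sketch (helper names are made up):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.sql.ForeachWriter

// Driver side: serialize the user's writer exactly once.
def serializeWriter[T](writer: ForeachWriter[T]): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(writer)
  out.close()
  bytes.toByteArray
}

// Executor side: deserialize a fresh copy at the start of every epoch, so no
// non-transient state survives across epochs.
def freshWriter[T](serialized: Array[Byte]): ForeachWriter[T] = {
  val in = new ObjectInputStream(new ByteArrayInputStream(serialized))
  try in.readObject().asInstanceOf[ForeachWriter[T]] finally in.close()
}
```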

}

class ForeachDataWriter[T : Encoder](
private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int)
Contributor:

nit: params on separate lines.

@@ -255,6 +255,32 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
query.stop()
}
}

Contributor:

I think there should be a test with continuous processing + foreach.

Contributor (Author):

Good instinct; it didn't quite work. Added the test.
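For reference, the rough shape of such a test (the source, trigger interval, and writer body are assumed; the test actually added may differ):

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().master("local[2]").getOrCreate()

val query = spark.readStream
  .format("rate") // a source that supports continuous mode
  .load()
  .writeStream
  .trigger(Trigger.Continuous("1 second"))
  .foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, epochId: Long): Boolean = true
    override def process(value: Row): Unit = println(value)
    override def close(errorOrNull: Throwable): Unit = ()
  })
  .start()

query.awaitTermination(5000) // let a few epochs run
query.stop()
```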

@SparkQA commented Feb 8, 2018

Test build #87231 has finished for PR 20552 at commit 87d0bc8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jose-torres (Contributor, Author) commented Feb 9, 2018

It's my intent to say that other data sources built by general developers aren't supposed to use batch ids in the executors for any purpose. In addition to the issue you mentioned, I don't think there's a compelling reason to do so in the DataSourceV2 model, and I worry the functionality would make it easy to write transactional logic which seems correct but isn't.

Since this interface is still evolving, I think it makes sense to revisit the question if we notice a scenario where it's infeasible to rewrite a piece of transactional logic to not use the batch ID in the executor.

@SparkQA commented Feb 9, 2018

Test build #87241 has finished for PR 20552 at commit a33a35c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jose-torres (Contributor, Author):

Filed SPARK-23416 for the unrelated failure in build 87241.

@SparkQA commented Feb 13, 2018

Test build #87417 has finished for PR 20552 at commit 66270c5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jose-torres (Contributor, Author):

This is obsolete - we're changing the lifecycle of DataWriter.

@jose-torres closed this Mar 7, 2018
ghost pushed a commit to dbtsai/spark that referenced this pull request Apr 3, 2018
## What changes were proposed in this pull request?

Migrate foreach sink to DataSourceV2.

Since the previous attempt at this PR (apache#20552), we've changed and strictly defined the lifecycle of writer components. This means we no longer need the complicated lifecycle shim from that PR; it just naturally works.

## How was this patch tested?

existing tests

Author: Jose Torres <[email protected]>

Closes apache#20951 from jose-torres/foreach.
robert3005 pushed a commit to palantir/spark that referenced this pull request Apr 4, 2018, with the same commit message (apache#20951).
mshtelma pushed a commit to mshtelma/spark that referenced this pull request Apr 5, 2018, with the same commit message (apache#20951).