Merge remote-tracking branch 'upstream/master' into expr_bin
Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
viirya committed Jun 15, 2015
2 parents dea9c12 + 4eb48ed commit 0cf20f2
Showing 54 changed files with 958 additions and 837 deletions.
33 changes: 21 additions & 12 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -832,11 +832,7 @@ private[spark] object SparkSubmitUtils {
ivyConfName: String,
md: DefaultModuleDescriptor): Unit = {
// Add scala exclusion rule
val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
val scalaDependencyExcludeRule =
new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
scalaDependencyExcludeRule.addConfiguration(ivyConfName)
md.addExcludeRule(scalaDependencyExcludeRule)
md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName))

// We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
// other spark-streaming utility components. Underscore is there to differentiate between
@@ -845,13 +841,8 @@ private[spark] object SparkSubmitUtils {
"sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")

components.foreach { comp =>
val sparkArtifacts =
new ArtifactId(new ModuleId("org.apache.spark", s"spark-$comp*"), "*", "*", "*")
val sparkDependencyExcludeRule =
new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
sparkDependencyExcludeRule.addConfiguration(ivyConfName)

md.addExcludeRule(sparkDependencyExcludeRule)
md.addExcludeRule(createExclusion(s"org.apache.spark:spark-$comp*:*", ivySettings,
ivyConfName))
}
}

@@ -864,13 +855,15 @@ private[spark] object SparkSubmitUtils {
* @param coordinates Comma-delimited string of maven coordinates
* @param remoteRepos Comma-delimited string of remote repositories other than maven central
* @param ivyPath The path to the local ivy repository
* @param exclusions Exclusions to apply when resolving transitive dependencies
* @return The comma-delimited path to the jars of the given maven artifacts including their
* transitive dependencies
*/
def resolveMavenCoordinates(
coordinates: String,
remoteRepos: Option[String],
ivyPath: Option[String],
exclusions: Seq[String] = Nil,
isTest: Boolean = false): String = {
if (coordinates == null || coordinates.trim.isEmpty) {
""
@@ -928,6 +921,10 @@ private[spark] object SparkSubmitUtils {
// add all supplied maven artifacts as dependencies
addDependenciesToIvy(md, artifacts, ivyConfName)

exclusions.foreach { e =>
md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
}

// resolve dependencies
val rr: ResolveReport = ivy.resolve(md, resolveOptions)
if (rr.hasError) {
@@ -944,6 +941,18 @@ private[spark] object SparkSubmitUtils {
}
}
}

private def createExclusion(
coords: String,
ivySettings: IvySettings,
ivyConfName: String): ExcludeRule = {
val c = extractMavenCoordinates(coords)(0)
val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", "*")
val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null)
rule.addConfiguration(ivyConfName)
rule
}

}
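For illustration only (not part of this diff): a minimal sketch of a call that exercises the new exclusions parameter. The parameter names follow the signature shown above; the package placement, object name, coordinates, and excluded group:artifact string are assumptions.

package org.apache.spark.deploy

// Illustrative sketch: resolve a made-up artifact while excluding one of its
// transitive dependencies. Lives in the deploy package only because
// SparkSubmitUtils is private[spark].
object ResolveWithExclusionsSketch {
  def resolvedJars(): String = SparkSubmitUtils.resolveMavenCoordinates(
    coordinates = "com.example:example-lib_2.10:1.0.0", // made-up coordinate
    remoteRepos = None,                                 // nothing beyond the default repositories
    ivyPath = None,                                     // default local ivy cache
    exclusions = Seq("org.example:unwanted-dep"),       // expanded internally to "org.example:unwanted-dep:*"
    isTest = true)                                      // as the tests in SparkSubmitUtilsSuite pass it
}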

core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
@@ -56,9 +56,6 @@ private[spark] object UnsafeShuffleManager extends Logging {
} else if (dependency.aggregator.isDefined) {
log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because an aggregator is defined")
false
} else if (dependency.keyOrdering.isDefined) {
log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key ordering is defined")
false
} else if (dependency.partitioner.numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS) {
log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has more than " +
s"$MAX_SHUFFLE_OUTPUT_PARTITIONS partitions")
core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -106,7 +106,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
IvyTestUtils.withRepository(main, None, None) { repo =>
// end to end
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, Option(repo),
Option(tempIvyPath), true)
Option(tempIvyPath), isTest = true)
assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path")
}
}
@@ -115,29 +115,31 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val main = new MavenCoordinate("my.awesome.lib", "mylib", "0.1")
// Local M2 repository
IvyTestUtils.withRepository(main, None, Some(SparkSubmitUtils.m2Path)) { repo =>
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None, true)
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
}
// Local Ivy Repository
val settings = new IvySettings
val ivyLocal = new File(settings.getDefaultIvyUserDir, "local" + File.separator)
IvyTestUtils.withRepository(main, None, Some(ivyLocal), true) { repo =>
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None, true)
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
}
// Local ivy repository with modified home
val dummyIvyLocal = new File(tempIvyPath, "local" + File.separator)
IvyTestUtils.withRepository(main, None, Some(dummyIvyLocal), true) { repo =>
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None,
Some(tempIvyPath), true)
Some(tempIvyPath), isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
}
}

test("dependency not found throws RuntimeException") {
intercept[RuntimeException] {
SparkSubmitUtils.resolveMavenCoordinates("a:b:c", None, None, true)
SparkSubmitUtils.resolveMavenCoordinates("a:b:c", None, None, isTest = true)
}
}

@@ -149,12 +151,12 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
",org.apache.spark:spark-core_fake:1.2.0"

val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, true)
val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, isTest = true)
assert(path === "", "should return empty path")
val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0")
IvyTestUtils.withRepository(main, None, None) { repo =>
val files = SparkSubmitUtils.resolveMavenCoordinates(coordinates + "," + main.toString,
Some(repo), None, true)
Some(repo), None, isTest = true)
assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
}
}
core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
@@ -76,6 +76,15 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers {
mapSideCombine = false
)))

// Shuffles with key orderings are supported as long as no aggregator is specified
assert(canUseUnsafeShuffle(shuffleDep(
partitioner = new HashPartitioner(2),
serializer = kryo,
keyOrdering = Some(mock(classOf[Ordering[Any]])),
aggregator = None,
mapSideCombine = false
)))

}

test("unsupported shuffle dependencies") {
@@ -100,22 +109,14 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers {
mapSideCombine = false
)))

// We do not support shuffles that perform any kind of aggregation or sorting of keys
assert(!canUseUnsafeShuffle(shuffleDep(
partitioner = new HashPartitioner(2),
serializer = kryo,
keyOrdering = Some(mock(classOf[Ordering[Any]])),
aggregator = None,
mapSideCombine = false
)))
// We do not support shuffles that perform aggregation
assert(!canUseUnsafeShuffle(shuffleDep(
partitioner = new HashPartitioner(2),
serializer = kryo,
keyOrdering = None,
aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
mapSideCombine = false
)))
// We do not support shuffles that perform any kind of aggregation or sorting of keys
assert(!canUseUnsafeShuffle(shuffleDep(
partitioner = new HashPartitioner(2),
serializer = kryo,
4 changes: 2 additions & 2 deletions docs/sql-programming-guide.md
@@ -995,11 +995,11 @@ List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>()
schemaPeople # The DataFrame from the previous example.

# DataFrames can be saved as Parquet files, maintaining the schema information.
schemaPeople.read.parquet("people.parquet")
schemaPeople.write.parquet("people.parquet")

# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = sqlContext.write.parquet("people.parquet")
parquetFile = sqlContext.read.parquet("people.parquet")

# Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
26 changes: 12 additions & 14 deletions docs/streaming-custom-receivers.md
@@ -4,7 +4,7 @@ title: Spark Streaming Custom Receivers
---

Spark Streaming can receive streaming data from any arbitrary data source beyond
the one's for which it has in-built support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.).
the ones for which it has built-in support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.).
This requires the developer to implement a *receiver* that is customized for receiving data from
the concerned data source. This guide walks through the process of implementing a custom receiver
and using it in a Spark Streaming application. Note that custom receivers can be implemented
@@ -21,15 +21,15 @@ A custom receiver must extend this abstract class by implementing two methods
- `onStop()`: Things to do to stop receiving data.

Both `onStart()` and `onStop()` must not block indefinitely. Typically, `onStart()` would start the threads
that responsible for receiving the data and `onStop()` would ensure that the receiving by those threads
that are responsible for receiving the data, and `onStop()` would ensure that these threads receiving the data
are stopped. The receiving threads can also use `isStopped()`, a `Receiver` method, to check whether they
should stop receiving data.
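
(Illustrative sketch, not part of this commit: the onStart()/onStop()/isStopped() contract described above, with the actual data source left abstract.)

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Minimal skeleton of the threading pattern described above.
class SkeletonReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  def onStart(): Unit = {
    // Start a thread that does the receiving, then return immediately.
    new Thread("Skeleton Receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          // fetch data from the source and call store(...) here
        }
      }
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to do: the receiving thread polls isStopped() and exits on its own.
  }
}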

Once the data is received, that data can be stored inside Spark
by calling `store(data)`, which is a method provided by the Receiver class.
There are number of flavours of `store()` which allow you store the received data
record-at-a-time or as whole collection of objects / serialized bytes. Note that the flavour of
`store()` used to implemented a receiver affects its reliability and fault-tolerance semantics.
There are a number of flavors of `store()` which allow one to store the received data
record-at-a-time or as whole collection of objects / serialized bytes. Note that the flavor of
`store()` used to implement a receiver affects its reliability and fault-tolerance semantics.
This is discussed [later](#receiver-reliability) in more detail.
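
(Illustrative sketch, not part of this commit: the record-at-a-time and whole-collection flavors of store() named above, as a receiving thread might call them; `receiver` stands in for the enclosing Receiver[String] instance.)

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.streaming.receiver.Receiver

// The two store() flavors discussed above.
object StoreFlavorsSketch {
  // store(single-record): simplest, but only unreliable-receiver semantics.
  def pushRecordAtATime(receiver: Receiver[String], records: Seq[String]): Unit =
    records.foreach(r => receiver.store(r))

  // store(multiple-records): blocks until the whole block is stored (and replicated,
  // if enabled) -- the flavor a reliable receiver builds on (see the reliability section below).
  def pushAsOneBlock(receiver: Receiver[String], records: Seq[String]): Unit =
    receiver.store(ArrayBuffer(records: _*))
}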

Any exception in the receiving threads should be caught and handled properly to avoid silent
@@ -60,7 +60,7 @@ class CustomReceiver(host: String, port: Int)

def onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
// is designed to stop by itself if isStopped() returns false
}

/** Create a socket connection and receive data until receiver is stopped */
@@ -123,7 +123,7 @@ public class JavaCustomReceiver extends Receiver<String> {

public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
// is designed to stop by itself if isStopped() returns false
}

/** Create a socket connection and receive data until receiver is stopped */
@@ -167,7 +167,7 @@ public class JavaCustomReceiver extends Receiver<String> {

The custom receiver can be used in a Spark Streaming application by using
`streamingContext.receiverStream(<instance of custom receiver>)`. This will create
input DStream using data received by the instance of custom receiver, as shown below
an input DStream using data received by the instance of custom receiver, as shown below:

<div class="codetabs">
<div data-lang="scala" markdown="1" >
@@ -206,22 +206,20 @@ there are two kinds of receivers based on their reliability and fault-tolerance
and stored in Spark reliably (that is, replicated successfully). Usually,
implementing this receiver involves careful consideration of the semantics of source
acknowledgements.
1. *Unreliable Receiver* - These are receivers for unreliable sources that do not support
acknowledging. Even for reliable sources, one may implement an unreliable receiver that
do not go into the complexity of acknowledging correctly.
1. *Unreliable Receiver* - An *unreliable receiver* does *not* send acknowledgement to a source. This can be used for sources that do not support acknowledgement, or even for reliable sources when one does not want or need to go into the complexity of acknowledgement.

To implement a *reliable receiver*, you have to use `store(multiple-records)` to store data.
This flavour of `store` is a blocking call which returns only after all the given records have
This flavor of `store` is a blocking call which returns only after all the given records have
been stored inside Spark. If the receiver's configured storage level uses replication
(enabled by default), then this call returns after replication has completed.
Thus it ensures that the data is reliably stored, and the receiver can now acknowledge the
source appropriately. This ensures that no data is caused when the receiver fails in the middle
source appropriately. This ensures that no data is lost when the receiver fails in the middle
of replicating data -- the buffered data will not be acknowledged and hence will be later resent
by the source.
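
(Illustrative sketch, not part of this commit: the buffer / blocking-store / acknowledge sequence described above. pullBatch and ackBatch are assumed hooks into a hypothetical source that supports acknowledgement, and storeBlock stands for the receiver's blocking store(multiple-records) call.)

import scala.collection.mutable.ArrayBuffer

object ReliableReceiverSketch {
  def storeAndAck(
      pullBatch: () => (ArrayBuffer[String], Long),  // records plus a source-side batch id
      ackBatch: Long => Unit,                        // tells the source the batch is safe
      storeBlock: ArrayBuffer[String] => Unit): Unit = {
    val (records, batchId) = pullBatch()
    if (records.nonEmpty) {
      storeBlock(records)  // returns only after the data is stored (and replicated) inside Spark
      ackBatch(batchId)    // safe to acknowledge now; a failure before this point leaves the
                           // batch unacknowledged, so the source will resend it
    }
  }
}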

An *unreliable receiver* does not have to implement any of this logic. It can simply receive
records from the source and insert them one-at-a-time using `store(single-record)`. While it does
not get the reliability guarantees of `store(multiple-records)`, it has the following advantages.
not get the reliability guarantees of `store(multiple-records)`, it has the following advantages:

- The system takes care of chunking that data into appropriate sized blocks (look for block
interval in the [Spark Streaming Programming Guide](streaming-programming-guide.html)).
