REL-368 #25

Merged: 45 commits, Nov 17, 2014
Commits (45)
91b6a1d
Merge branch 'branch-1.1' of github.com:apache/spark into csd-1.1
markhamstra Nov 4, 2014
306464d
Merge branch 'branch-1.1' of github.com:apache/spark into csd-1.1
markhamstra Nov 7, 2014
53d19dd
Merge branch 'csd-1.1' of github.com:markhamstra/spark into csd-1.1
markhamstra Nov 10, 2014
5ff1129
Merge branch 'csd-1.1' of github.com:clearstorydata/spark into csd-1.1
markhamstra Nov 10, 2014
dc38def
[SPARK-4169] [Core] Accommodate non-English Locales in unit tests
Nov 10, 2014
cdcf546
SPARK-2548 [STREAMING] JavaRecoverableWordCount is missing
srowen Nov 10, 2014
254b135
Update RecoverableNetworkWordCount.scala
comcmipi Nov 10, 2014
86b1bd0
[SPARK-2548][HOTFIX][Streaming] Removed use of o.a.s.streaming.Durati…
tdas Nov 10, 2014
b3ef06b
[SPARK-4308][SQL] Follow up of #3175 for branch 1.1
liancheng Nov 11, 2014
64945f8
[SPARK-3971][SQL] Backport #2843 to branch-1.1
liancheng Nov 11, 2014
3d889df
[SPARK-3954][Streaming] Optimization to FileInputDStream
surongquan Nov 11, 2014
be0cc99
[SPARK-3495][SPARK-3496] Backporting block replication fixes made in …
tdas Nov 11, 2014
01d233e
Update versions for 1.1.1 release
Nov 11, 2014
8a1d818
[SQL] Backport backtick and smallint JDBC fixes to 1.1
ravipesala Nov 11, 2014
d313be8
[SPARK-4330][Doc] Link to proper URL for YARN overview
sarutak Nov 11, 2014
11798d0
[BRANCH-1.1][SPARK-2652] change the default spark.serializer in pyspa…
mengxr Nov 11, 2014
b2cb357
[branch-1.1][SPARK-3990] add a note on ALS usage
mengxr Nov 11, 2014
bf867c3
[SPARK-4295][External]Fix exception in SparkSinkSuite
Nov 11, 2014
4a37550
Merge branch 'branch-1.1' of github.com:apache/spark into csd-1.1
markhamstra Nov 11, 2014
b07f253
Version to 1.1.1-candidate-csd-1-SNAPSHOT
markhamstra Nov 11, 2014
131c626
Update CHANGES.txt
andrewor14 Nov 11, 2014
f3e62ff
[maven-release-plugin] prepare release v1.1.1-rc1
andrewor14 Nov 11, 2014
5c0032a
[maven-release-plugin] prepare for next development iteration
andrewor14 Nov 11, 2014
45a01b6
Revert "SPARK-3039: Allow spark to be built using avro-mapred for had…
Nov 12, 2014
9d13735
Revert "[maven-release-plugin] prepare for next development iteration"
Nov 12, 2014
4ac5679
Revert "[maven-release-plugin] prepare release v1.1.1-rc1"
Nov 12, 2014
837deab
[maven-release-plugin] prepare release v1.1.1-rc1
andrewor14 Nov 12, 2014
86c285c
Revert "[maven-release-plugin] prepare release v1.1.1-rc1"
Nov 12, 2014
e3a5ee9
[Release] Log build output for each distribution
Nov 12, 2014
7029301
[maven-release-plugin] prepare release v1.1.1-rc1
andrewor14 Nov 12, 2014
db22a9e
[maven-release-plugin] prepare for next development iteration
andrewor14 Nov 12, 2014
d3b808f
Revert "[maven-release-plugin] prepare for next development iteration"
Nov 12, 2014
8fe1c8c
Revert "[maven-release-plugin] prepare release v1.1.1-rc1"
Nov 12, 2014
3f9e073
[maven-release-plugin] prepare release v1.1.1-rc1
andrewor14 Nov 12, 2014
6de8881
[maven-release-plugin] prepare for next development iteration
andrewor14 Nov 12, 2014
88bc482
[Release] Bring audit scripts up-to-date
andrewor14 Nov 13, 2014
ba6d81d
[Release] Correct make-distribution.sh log path
Nov 12, 2014
6f34fa0
Revert "[maven-release-plugin] prepare for next development iteration"
Nov 13, 2014
6f7b1bc
Revert "[maven-release-plugin] prepare release v1.1.1-rc1"
Nov 13, 2014
72a4fdb
[maven-release-plugin] prepare release v1.1.1-rc1
andrewor14 Nov 13, 2014
685bdd2
[maven-release-plugin] prepare for next development iteration
andrewor14 Nov 13, 2014
269569b
Merge branch 'branch-1.1' of github.com:apache/spark into csd-1.1
markhamstra Nov 13, 2014
4b1c77c
[branch-1.1][SPARK-4355] OnlineSummarizer doesn't merge mean correctly
mengxr Nov 13, 2014
1c0d789
Added datanucleus jars to deb package
markhamstra Nov 16, 2014
b3bea87
Merge branch 'branch-1.1' of github.com:apache/spark into csd-1.1
markhamstra Nov 17, 2014
Files changed
678 changes: 678 additions & 0 deletions CHANGES.txt

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.1.0-csd-4-SNAPSHOT</version>
<version>1.1.1-candidate-csd-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

@@ -267,6 +267,16 @@
<prefix>${deb.install.path}/lib</prefix>
</mapper>
</data>
<data>
<src>${basedir}/../lib_managed/jars</src>
<type>directory</type>
<mapper>
<type>perm</type>
<user>${deb.user}</user>
<group>${deb.user}</group>
<prefix>${deb.install.path}/lib</prefix>
</mapper>
</data>
<data>
<src>${basedir}/src/deb/RELEASE</src>
<type>file</type>
2 changes: 1 addition & 1 deletion bagel/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.1.0-csd-4-SNAPSHOT</version>
<version>1.1.1-candidate-csd-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion core/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.1.0-csd-4-SNAPSHOT</version>
<version>1.1.1-candidate-csd-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1334,7 +1334,7 @@ class SparkContext(config: SparkConf) extends Logging {
*/
object SparkContext extends Logging {

private[spark] val SPARK_VERSION = "1.1.0"
private[spark] val SPARK_VERSION = "1.1.1"

private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

120 changes: 104 additions & 16 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -139,6 +139,11 @@ private[spark] class BlockManager(
private val broadcastCleaner = new MetadataCleaner(
MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)

// Field related to peer block managers that are necessary for block replication
@volatile private var cachedPeers: Seq[BlockManagerId] = _
private val peerFetchLock = new Object
private var lastPeerFetchTime = 0L

initialize()

/* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
Expand Down Expand Up @@ -822,28 +827,111 @@ private[spark] class BlockManager(
}

/**
* Replicate block to another node.
* Get peer block managers in the system.
*/
private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
peerFetchLock.synchronized {
val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
if (cachedPeers == null || forceFetch || timeout) {
cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
lastPeerFetchTime = System.currentTimeMillis
logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
}
cachedPeers
}
}

/**
* Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
*/
@volatile var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val numPeersToReplicateTo = level.replication - 1
val peersForReplication = new ArrayBuffer[BlockManagerId]
val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
if (cachedPeers == null) {
cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
val startTime = System.currentTimeMillis
val random = new Random(blockId.hashCode)

var replicationFailed = false
var failures = 0
var done = false

// Get cached list of peers
peersForReplication ++= getPeers(forceFetch = false)

// Get a random peer. Note that this selection of a peer is deterministic on the block id.
// So assuming the list of peers does not change and no replication failures,
// if there are multiple attempts in the same node to replicate the same block,
// the same set of peers will be selected.
def getRandomPeer(): Option[BlockManagerId] = {
// If replication had failed, then force update the cached list of peers and remove the peers
// that have already been used
if (replicationFailed) {
peersForReplication.clear()
peersForReplication ++= getPeers(forceFetch = true)
peersForReplication --= peersReplicatedTo
peersForReplication --= peersFailedToReplicateTo
}
if (!peersForReplication.isEmpty) {
Some(peersForReplication(random.nextInt(peersForReplication.size)))
} else {
None
}
}
for (peer: BlockManagerId <- cachedPeers) {
val start = System.nanoTime
data.rewind()
logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
s"To node: $peer")
val putBlock = PutBlock(blockId, data, tLevel)
val cmId = new ConnectionManagerId(peer.host, peer.port)
val syncPutBlockSuccess = BlockManagerWorker.syncPutBlock(putBlock, cmId)
if (!syncPutBlockSuccess) {
logError(s"Failed to call syncPutBlock to $peer")

// One by one choose a random peer and try uploading the block to it
// If replication fails (e.g., target peer is down), force the list of cached peers
// to be re-fetched from driver and then pick another random peer for replication. Also
// temporarily black list the peer for which replication failed.
//
// This selection of a peer and replication is continued in a loop until one of the
// following 3 conditions is fulfilled:
// (i) specified number of peers have been replicated to
// (ii) too many failures in replicating to peers
// (iii) no peer left to replicate to
//
while (!done) {
getRandomPeer() match {
case Some(peer) =>
val onePeerStartTime = System.currentTimeMillis
data.rewind()
logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
val putBlock = PutBlock(blockId, data, tLevel)
val cmId = new ConnectionManagerId(peer.host, peer.port)
val syncPutBlockSuccess = BlockManagerWorker.syncPutBlock(putBlock, cmId)
if (syncPutBlockSuccess) {
logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %d ms"
.format((System.currentTimeMillis - onePeerStartTime)))
peersReplicatedTo += peer
peersForReplication -= peer
replicationFailed = false
if (peersReplicatedTo.size == numPeersToReplicateTo) {
done = true // specified number of peers have been replicated to
}
} else {
logWarning(s"Failed to replicate $blockId to $peer, failure #$failures")
failures += 1
replicationFailed = true
peersFailedToReplicateTo += peer
if (failures > maxReplicationFailures) { // too many failures in replicating to peers
done = true
}
}
case None => // no peer left to replicate to
done = true
}
logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes."
.format(blockId, (System.nanoTime - start) / 1e6, data.limit()))
}
val timeTakeMs = (System.currentTimeMillis - startTime)
logTrace(s"Replicating $blockId of ${data.limit()} bytes to " +
s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logWarning(s"Block $blockId replicated to only " +
s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
}
}

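For context, the replication rewrite above is driven by two configuration keys that appear in this hunk: spark.storage.maxReplicationFailures and spark.storage.cachedPeersTtl. Below is a minimal sketch, assuming a running branch-1.1 cluster, of how an application would exercise this path; the key names come from the diff, while the values and the application itself are illustrative only.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object ReplicationTuningSketch {
  def main(args: Array[String]): Unit = {
    // spark.master is assumed to be supplied by spark-submit.
    val conf = new SparkConf()
      .setAppName("replication-tuning-sketch")
      // Read by replicate(): how many failed peer uploads are tolerated (diff default: 1).
      .set("spark.storage.maxReplicationFailures", "2")
      // Read by getPeers(): how long the fetched peer list stays cached (diff default: 60 * 1000 ms).
      .set("spark.storage.cachedPeersTtl", "120000")
    val sc = new SparkContext(conf)

    // Any replicated storage level (x2) routes each cached partition through
    // BlockManager.replicate() and its randomized peer-selection loop.
    val cached = sc.parallelize(1 to 10000).persist(StorageLevel.MEMORY_ONLY_2)
    cached.count()
    sc.stop()
  }
}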
core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -62,7 +62,9 @@ class BlockManagerId private (

def nettyPort: Int = nettyPort_

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
def isDriver: Boolean = (executorId == "<driver>")

override def writeExternal(out: ObjectOutput) {
out.writeUTF(executorId_)
out.writeUTF(host_)
out.writeInt(port_)
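The only functional change in this file is the new isDriver helper, which later hunks use in place of comparing executorId against the "<driver>" literal. A minimal sketch of the intent, placed in the org.apache.spark.storage package because these classes are package-private in branch-1.1; the constructor arguments are made up for illustration.

package org.apache.spark.storage

object IsDriverSketch {
  def main(args: Array[String]): Unit = {
    // BlockManagerId(execId, host, port, nettyPort); branch-1.1 still carries a netty port.
    val driver   = BlockManagerId("<driver>", "localhost", 50000, 0)
    val executor = BlockManagerId("5", "worker-3", 41000, 0)

    // Replaces scattered checks of the form `id.executorId == "<driver>"`.
    assert(driver.isDriver)
    assert(!executor.isDriver)
  }
}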
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -80,13 +80,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
}

/** Get ids of other nodes in the cluster from the driver */
def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
if (result.length != numPeers) {
throw new SparkException(
"Error getting peers, only got " + result.size + " instead of " + numPeers)
}
result
def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
}

/**
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -83,8 +83,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
case GetLocationsMultipleBlockIds(blockIds) =>
sender ! getLocationsMultipleBlockIds(blockIds)

case GetPeers(blockManagerId, size) =>
sender ! getPeers(blockManagerId, size)
case GetPeers(blockManagerId) =>
sender ! getPeers(blockManagerId)

case GetMemoryStatus =>
sender ! memoryStatus
@@ -173,11 +173,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
* from the executors, but not from the driver.
*/
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
// TODO: Consolidate usages of <driver>
import context.dispatcher
val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
removeFromDriver || info.blockManagerId.executorId != "<driver>"
removeFromDriver || !info.blockManagerId.isDriver
}
Future.sequence(
requiredBlockManagers.map { bm =>
@@ -213,7 +212,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
val minSeenTime = now - slaveTimeout
val toRemove = new mutable.HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime && info.blockManagerId.executorId != "<driver>") {
if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
+ (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
toRemove += info.blockManagerId
@@ -233,7 +232,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
*/
private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
blockManagerId.executorId == "<driver>" && !isLocal
blockManagerId.isDriver && !isLocal
} else {
blockManagerInfo(blockManagerId).updateLastSeenMs()
true
@@ -355,7 +354,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
tachyonSize: Long) {

if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.executorId == "<driver>" && !isLocal) {
if (blockManagerId.isDriver && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
sender ! true
@@ -403,16 +402,14 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
blockIds.map(blockId => getLocations(blockId))
}

private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray

val selfIndex = peers.indexOf(blockManagerId)
if (selfIndex == -1) {
throw new SparkException("Self index for " + blockManagerId + " not found")
/** Get the list of the peers of the given block manager */
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
val blockManagerIds = blockManagerInfo.keySet
if (blockManagerIds.contains(blockManagerId)) {
blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
} else {
Seq.empty
}

// Note that this logic will select the same node multiple times if there aren't enough peers
Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
}
}

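The rewritten getPeers drops the fixed-size, index-offset selection and instead returns every registered non-driver block manager other than the requester, or nothing when the requester is unknown. A self-contained sketch of just that selection rule, using a plain tuple as a stand-in for BlockManagerId:

object PeerSelectionSketch {
  // Stand-in for BlockManagerId: (executorId, host, port); "<driver>" marks the driver.
  type Id = (String, String, Int)

  private def isDriver(id: Id): Boolean = id._1 == "<driver>"

  // Mirrors the filterNot chain in the new BlockManagerMasterActor.getPeers.
  def getPeers(registered: Set[Id], requester: Id): Seq[Id] =
    if (registered.contains(requester)) {
      registered.filterNot(isDriver).filterNot(_ == requester).toSeq
    } else {
      Seq.empty
    }

  def main(args: Array[String]): Unit = {
    val driver = ("<driver>", "driver-host", 50000)
    val e1     = ("1", "worker-1", 41000)
    val e2     = ("2", "worker-2", 41000)
    val all    = Set(driver, e1, e2)

    println(getPeers(all, e1))                // only e2: the driver and the requester are filtered out
    println(getPeers(all, ("9", "ghost", 0))) // empty: an unknown requester gets no peers
  }
}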
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -90,7 +90,7 @@ private[spark] object BlockManagerMessages {

case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster

case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

case class RemoveExecutor(execId: String) extends ToBlockManagerMaster

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1528,7 +1528,7 @@ private[spark] object Utils extends Logging {
def isBindCollision(exception: Throwable): Boolean = {
exception match {
case e: BindException =>
if (e.getMessage != null && e.getMessage.contains("Address already in use")) {
if (e.getMessage != null) {
return true
}
isBindCollision(e.getCause)
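Dropping the "Address already in use" substring check ties in with the SPARK-4169 commit in this PR: the JVM localizes BindException messages, so the English text never appears under non-English locales. A standalone sketch of the resulting behaviour, using a hypothetical helper that mirrors (and simplifies) Utils.isBindCollision rather than the real object:

import java.net.BindException

object BindCollisionSketch {
  // Any BindException with a non-null message counts as a collision; otherwise
  // keep walking the cause chain, as the updated Utils.isBindCollision does.
  def isBindCollision(exception: Throwable): Boolean = exception match {
    case e: BindException => e.getMessage != null || isBindCollision(e.getCause)
    case e: Exception     => isBindCollision(e.getCause)
    case _                => false
  }

  def main(args: Array[String]): Unit = {
    // A German-locale JVM reports something like "Die Adresse wird bereits verwendet",
    // which the old English-only substring match would have missed.
    println(isBindCollision(new BindException("Die Adresse wird bereits verwendet")))    // true
    println(isBindCollision(new RuntimeException("wrapper", new BindException("busy")))) // true
    println(isBindCollision(new RuntimeException("unrelated failure")))                  // false
  }
}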
core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -132,7 +132,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
assert(statuses.size === 1)
statuses.head match { case (bm, status) =>
assert(bm.executorId === "<driver>", "Block should only be on the driver")
assert(bm.isDriver, "Block should only be on the driver")
assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
assert(status.memSize > 0, "Block should be in memory store on the driver")
assert(status.diskSize === 0, "Block should not be in disk store on the driver")