SNAP-656 Delink RDD partitions from buckets #297

Merged
merged 25 commits into from
Sep 1, 2016
Changes from 10 commits
Commits
25 commits
ee33123
Combining multiple buckets in sys.SET_BUCKETS_FOR_LOCAL_EXECUTION
Jul 5, 2016
9949a3e
Merge branch 'master' of https://github.com/SnappyDataInc/snappydata …
Jul 5, 2016
7eafc96
Mapping of buckets to RDD partitions for unified cluster mode.
Jul 11, 2016
84d720a
Merge branch 'master' of https://github.com/SnappyDataInc/snappydata …
Jul 13, 2016
3a9f3a6
Using MultiBucketExecutorPartition in unified mode, fixing precheckin…
Jul 13, 2016
f303d7d
minor change
Jul 13, 2016
1c28ce5
Merge branch 'master' into SNAP-656
Aug 11, 2016
d732dfe
Northwind trader's changes
Aug 11, 2016
d186347
replace hardcoded path
Aug 11, 2016
d994a10
+ multi-bucket query execution for SparkShellRowRDD, ColumnarStorePar…
Aug 16, 2016
ca48486
Moved queries in separate object, made it configurable
Aug 17, 2016
231f48a
precheckin failures and dunit tests for northwind schema
Aug 17, 2016
aa49696
Merge branch 'master' into SNAP-656
Aug 23, 2016
48939bf
Merge branch 'master' into SNAP-656
Aug 23, 2016
77e9787
Use numBuckets in ShuffleExchange and HashPartitioning to decide shu…
Aug 29, 2016
2c4457d
moved these classes in cluster
Aug 29, 2016
26d1c00
Merge branch 'master' into SNAP-656
Aug 29, 2016
a4272a4
Merge branch 'master' of https://github.com/SnappyDataInc/snappydata …
Aug 30, 2016
fb57a30
a minor change in import
Aug 30, 2016
533cab4
Merge branch 'master' into SNAP-656
Aug 30, 2016
7852b86
a minor change
Aug 31, 2016
efa3a5f
Keep numPartitions same as numBuckets in RowFormatScanRDD and Columna…
Aug 31, 2016
a58ea03
fixing precheckin failures
Aug 31, 2016
8d8c6a2
precheckin failures
Sep 1, 2016
4923cbe
Merge branch 'master' into SNAP-656
Sep 1, 2016
98 changes: 80 additions & 18 deletions core/src/main/scala/io/snappydata/impl/SparkShellRDDHelper.scala
@@ -18,10 +18,6 @@ package io.snappydata.impl

import java.sql.{Connection, ResultSet, SQLException, Statement}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

import com.gemstone.gemfire.cache.Region
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember
import com.gemstone.gemfire.internal.SocketCreator
@@ -30,16 +26,20 @@ import com.pivotal.gemfirexd.internal.engine.Misc
import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils
import com.pivotal.gemfirexd.jdbc.ClientAttribute
import io.snappydata.Constant

import org.apache.spark.Partition
import org.apache.spark.sql.collection.ExecutorLocalShellPartition
import org.apache.spark.sql.collection.{ExecutorMultiBucketLocalShellPartition}
import org.apache.spark.sql.execution.ConnectionPool
import org.apache.spark.sql.execution.columnar.{ExternalStore, ExternalStoreUtils}
import org.apache.spark.sql.execution.columnar.ExternalStoreUtils
import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry
import org.apache.spark.sql.row.GemFireXDClientDialect
import org.apache.spark.sql.sources.ConnectionProperties
import org.apache.spark.sql.store.StoreUtils

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

final class SparkShellRDDHelper {

var useLocatorURL: Boolean = false
@@ -54,19 +54,24 @@ final class SparkShellRDDHelper {
def executeQuery(conn: Connection, tableName: String,
split: Partition, query: String): (Statement, ResultSet) = {
DriverRegistry.register(Constant.JDBC_CLIENT_DRIVER)
val resolvedName = ExternalStoreUtils.lookupName(tableName, conn.getSchema)
val par = split.index
val statement = conn.createStatement()
val resolvedName = StoreUtils.lookupName(tableName, conn.getSchema)

val partition = split.asInstanceOf[ExecutorMultiBucketLocalShellPartition]
var bucketString = ""
partition.buckets.foreach( bucket => {
bucketString = bucketString + bucket + ","
})
val buckets = bucketString.substring(0, bucketString.length-1)
val statement = conn.createStatement()
if (!useLocatorURL)
statement.execute(s"call sys.SET_BUCKETS_FOR_LOCAL_EXECUTION('$resolvedName', $par)")
statement.execute(s"call sys.SET_BUCKETS_FOR_LOCAL_EXECUTION('$resolvedName', '$buckets')")

val rs = statement.executeQuery(query)
(statement, rs)
}

def getConnection(connectionProperties: ConnectionProperties, split: Partition): Connection = {
val urlsOfNetServerHost = split.asInstanceOf[ExecutorLocalShellPartition].hostList
val urlsOfNetServerHost = split.asInstanceOf[ExecutorMultiBucketLocalShellPartition].hostList
useLocatorURL = SparkShellRDDHelper.useLocatorUrl(urlsOfNetServerHost)
createConnection(connectionProperties, urlsOfNetServerHost)
}
@@ -106,14 +111,71 @@ object SparkShellRDDHelper {
object SparkShellRDDHelper {

def getPartitions(tableName: String, conn: Connection): Array[Partition] = {
val resolvedName = ExternalStoreUtils.lookupName(tableName, conn.getSchema)
val resolvedName = StoreUtils.lookupName(tableName, conn.getSchema)
val bucketToServerList = getBucketToServerMapping(resolvedName)
val numPartitions = bucketToServerList.length
val partitions = new Array[Partition](numPartitions)
for (p <- 0 until numPartitions) {
partitions(p) = new ExecutorLocalShellPartition(p, bucketToServerList(p))
val numBuckets = bucketToServerList.length
Misc.getRegionForTable(resolvedName, true).asInstanceOf[Region[_, _]] match {
case pr: PartitionedRegion =>
val serverToBuckets = new mutable.HashMap[InternalDistributedMember,
mutable.HashSet[Int]]()
var totalBuckets = new mutable.HashSet[Int]()
for (p <- 0 until numBuckets) {
val owner = pr.getBucketPrimary(p)
val bucketSet = {
if (serverToBuckets.contains(owner)) serverToBuckets.get(owner).get
else new mutable.HashSet[Int]()
}
bucketSet += p
totalBuckets += p
serverToBuckets put(owner, bucketSet)
}
val numCores = Runtime.getRuntime.availableProcessors()
val numServers = GemFireXDUtils.getGfxdAdvisor.adviseDataStores(null).size()
val numPartitions = numServers * numCores
val partitions = {
if (numBuckets < numPartitions) {
new Array[Partition](numBuckets)
} else {
new Array[Partition](numPartitions)
}
}
var partCnt = 0;
serverToBuckets foreach (e => {
var numCoresPending = numCores
var localBuckets = e._2
assert(!localBuckets.isEmpty)
var maxBucketsPerPart = Math.ceil(e._2.size.toFloat / numCores)
assert(maxBucketsPerPart >= 1)
while (partCnt <= numPartitions && !localBuckets.isEmpty) {
var cntr = 0;
val bucketsPerPart = new mutable.HashSet[Int]()
maxBucketsPerPart = Math.ceil(localBuckets.size.toFloat / numCoresPending)
assert(maxBucketsPerPart >= 1)
while (cntr < maxBucketsPerPart && !localBuckets.isEmpty) {
val buck = localBuckets.head
bucketsPerPart += buck
localBuckets = localBuckets - buck
totalBuckets = totalBuckets - buck
cntr += 1
}
partitions(partCnt) = new ExecutorMultiBucketLocalShellPartition(
partCnt, bucketsPerPart, bucketToServerList(bucketsPerPart.head))
partCnt += 1
numCoresPending -= 1
}
})
partitions
case pr: DistributedRegion =>
val numPartitions = bucketToServerList.length
val partitions = new Array[Partition](numPartitions)
for (p <- 0 until numPartitions) {
partitions(p) = new ExecutorMultiBucketLocalShellPartition(
p,
mutable.HashSet.empty,
bucketToServerList(p))
}
partitions
}
partitions
}

private def useLocatorUrl(hostList: ArrayBuffer[(String, String)]): Boolean =
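The core of the new getPartitions above is splitting each server's primary buckets across at most numCores partitions, so a single RDD partition can now carry several buckets instead of exactly one. Below is a minimal self-contained sketch of that splitting; the object name, method name, server map and core count are made up for illustration and are not the PR's code.

```scala
// Standalone sketch: split each server's buckets into at most `coresPerServer`
// groups, mirroring the ceil(localBuckets.size / numCoresPending) step above.
object BucketGroupingSketch {

  // serverToBuckets: hypothetical "server name -> primary buckets it owns"
  def groupBuckets(serverToBuckets: Map[String, Set[Int]],
      coresPerServer: Int): Seq[(String, Set[Int])] = {
    serverToBuckets.toSeq.flatMap { case (server, buckets) =>
      var remaining = buckets.toList
      var coresLeft = coresPerServer
      val groups = scala.collection.mutable.ArrayBuffer.empty[(String, Set[Int])]
      while (remaining.nonEmpty && coresLeft > 0) {
        // Each group takes ceil(remaining / remainingCores) buckets so the
        // later groups are not left nearly empty.
        val take = math.ceil(remaining.size.toDouble / coresLeft).toInt
        groups += server -> remaining.take(take).toSet
        remaining = remaining.drop(take)
        coresLeft -= 1
      }
      groups
    }
  }

  def main(args: Array[String]): Unit = {
    // 7 buckets on one server with 3 cores yields partitions of 3, 2 and 2 buckets.
    val parts = groupBuckets(Map("server1" -> (0 until 7).toSet), coresPerServer = 3)
    parts.zipWithIndex.foreach { case ((srv, bks), i) =>
      println(s"partition $i on $srv -> buckets $bks")
    }
  }
}
```

With 7 buckets and 3 cores the sketch produces three partitions of sizes 3, 2 and 2, which matches the proportional splitting the diff performs per server before wrapping each group in an ExecutorMultiBucketLocalShellPartition.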
21 changes: 14 additions & 7 deletions core/src/main/scala/org/apache/spark/sql/collection/Utils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.collection
import java.io.ObjectOutputStream
import java.sql.DriverManager

import org.apache.spark.storage.BlockManagerId

import scala.collection.mutable
import scala.collection.{Map => SMap}
import scala.language.existentials
@@ -686,12 +688,16 @@ class ExecutorLocalPartition(override val index: Int,
override def toString = s"ExecutorLocalPartition($index, $blockId)"
}

class MultiExecutorLocalPartition(override val index: Int,
val blockIds: Seq[BlockManagerId]) extends Partition {
class MultiBucketExecutorPartition(override val index: Int,
val buckets: mutable.HashSet[Int],
val blockIds: Seq[BlockManagerId]) extends Partition {

def hostExecutorIds = blockIds.map(blockId => Utils.getHostExecutorId(blockId))
def hostExecutorIds : Seq[String] = {
val execs = blockIds.map(blockId => Utils.getHostExecutorId(blockId))
execs
}

override def toString = s"MultiExecutorLocalPartition($index, $blockIds)"
override def toString = s"MultiBucketExecutorPartition($index, $buckets, $blockIds)"
}


@@ -730,9 +736,10 @@ private[spark] class CoGroupExecutorLocalPartition(
override def hashCode(): Int = idx
}

class ExecutorLocalShellPartition(override val index: Int,
val hostList: mutable.ArrayBuffer[(String, String)]) extends Partition {
override def toString = s"ExecutorLocalShellPartition($index, $hostList"
class ExecutorMultiBucketLocalShellPartition(override val index: Int,
val buckets: mutable.HashSet[Int],
val hostList: mutable.ArrayBuffer[(String, String)]) extends Partition {
override def toString = s"ExecutorMultiBucketLocalShellPartition($index, $buckets, $hostList"
}

object ToolsCallbackInit extends Logging {
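The renamed partition classes above now carry the set of bucket ids alongside the host or block-manager information. A hedged stand-in showing only the data they hold and how preferred locations are read from the host list, as in the getPreferredLocations overrides later in this PR; the class and object names here are illustrative, not the real types.

```scala
import scala.collection.mutable

// Minimal stand-in for the new partition shape: an index, the bucket ids this
// partition covers, and a list of (host, connection URL) pairs.
case class MultiBucketPartitionSketch(index: Int,
    buckets: mutable.HashSet[Int],
    hostList: mutable.ArrayBuffer[(String, String)]) {

  // Preferred locations are just the host names from the host list.
  def preferredHosts: Seq[String] = hostList.map(_._1).toSeq
}

object MultiBucketPartitionSketchDemo extends App {
  val part = MultiBucketPartitionSketch(0,
    mutable.HashSet(3, 17, 42),
    mutable.ArrayBuffer(("host-a", "host-a:1527"), ("host-b", "host-b:1527")))
  println(part.preferredHosts)        // List(host-a, host-b)
  println(part.buckets.mkString(","))  // e.g. 3,17,42 (set order not guaranteed)
}
```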
@@ -45,6 +45,8 @@ object ExternalStoreUtils {

final val DEFAULT_TABLE_BUCKETS = "113"
final val DEFAULT_SAMPLE_TABLE_BUCKETS = "53"
val numCores = Runtime.getRuntime.availableProcessors()
//final val DEFAULT_TABLE_BUCKETS_LOCAL_MODE = s"$numCores"
final val DEFAULT_TABLE_BUCKETS_LOCAL_MODE = "11"
final val DEFAULT_SAMPLE_TABLE_BUCKETS_LOCAL_MODE = "7"
final val INDEX_TYPE = "INDEX_TYPE"
@@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.columnar.impl

import java.sql.Connection

import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils

import com.gemstone.gemfire.internal.cache.PartitionedRegion
import com.pivotal.gemfirexd.internal.engine.Misc

@@ -90,7 +92,15 @@ class BaseColumnFormatRelation(
@transient protected lazy val region = Misc.getRegionForTable(resolvedName,
true).asInstanceOf[PartitionedRegion]

override lazy val numPartitions: Int = region.getTotalNumberOfBuckets
override lazy val numPartitions: Int = {
// val region = Misc.getRegionForTable(resolvedName, true).
// asInstanceOf[PartitionedRegion]
// region.getTotalNumberOfBuckets
val numCores = Runtime.getRuntime.availableProcessors()
Review comment (Contributor):
This is evaluated at the driver node. We need to consider the server nodes; the driver node's processor count is not useful to us. Can you please look at SchedulerBackend.defaultParallelism? It takes the total cores across the slaves into consideration.
The catch, however, is that spark.default.parallelism gets priority, and if somebody configures it badly we will suffer.

val numServers = GemFireXDUtils.getGfxdAdvisor.adviseDataStores(null).size()
val numPartitions = numServers * numCores
numPartitions
}

override def partitionColumns: Seq[String] = {
partitioningColumns
@@ -134,6 +144,7 @@ class BaseColumnFormatRelation(
leftItr ++ rightItr
}
case _ =>

val rowRdd = new SparkShellRowRDD(
sqlContext.sparkContext,
executorConnector,
Expand All @@ -144,7 +155,6 @@ class BaseColumnFormatRelation(
connProperties,
filters
).asInstanceOf[RDD[Row]]

rowRdd.zipPartitions(colRdd) { (leftItr, rightItr) =>
leftItr ++ rightItr
}
@@ -339,6 +349,8 @@ class BaseColumnFormatRelation(
val sql = s"CREATE TABLE $tableName $schemaExtensions " + " DISABLE CONCURRENCY CHECKS "
logInfo(s"Applying DDL (url=${connProperties.url}; " +
s"props=${connProperties.connProps}): $sql")
println(s"Applying DDL (url=${connProperties.url}; " +
Review comment (Contributor): Remove println

Reply (Contributor Author): Removed println
s"props=${connProperties.connProps}): $sql")
JdbcExtendedUtils.executeUpdate(sql, conn)
dialect match {
case d: JdbcExtendedDialect => d.initializeTable(tableName,
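Regarding the review comment on BaseColumnFormatRelation.numPartitions above: a sketch of the suggested alternative, deriving the partition count from the cluster-wide default parallelism rather than the driver's processor count. This is an assumption about how the suggestion could look, not what this commit implements; `sc` is assumed to be the active SparkContext.

```scala
import org.apache.spark.SparkContext

// Sketch of the reviewer's suggestion: size partitions from cluster parallelism.
object NumPartitionsSketch {
  def clusterBasedNumPartitions(sc: SparkContext, numBuckets: Int): Int = {
    // defaultParallelism usually reflects total executor cores, but the
    // spark.default.parallelism setting overrides it, as the reviewer notes.
    val parallelism = sc.defaultParallelism
    // Cap at the bucket count, mirroring getPartitions in SparkShellRDDHelper.
    math.min(numBuckets, parallelism)
  }
}
```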
@@ -18,23 +18,20 @@ package org.apache.spark.sql.execution.columnar.impl

import java.sql.{Connection, ResultSet, Statement}

import scala.reflect.ClassTag

import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember
import com.gemstone.gemfire.internal.cache.{AbstractRegion, PartitionedRegion}
import com.pivotal.gemfirexd.internal.engine.Misc
import io.snappydata.impl.SparkShellRDDHelper

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.collection._
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.row.RowFormatScanRDD
import org.apache.spark.sql.sources.{ConnectionProperties, Filter}
import org.apache.spark.sql.store.StoreUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}

import scala.reflect.ClassTag

/**
* Column Store implementation for GemFireXD.
*/
@@ -105,11 +102,15 @@ class ColumnarStorePartitionedRDD[T: ClassTag](@transient _sc: SparkContext,
conn => {
val resolvedName = ExternalStoreUtils.lookupName(tableName,
conn.getSchema)
val par = split.index
val ps1 = conn.prepareStatement(
"call sys.SET_BUCKETS_FOR_LOCAL_EXECUTION(?, ?)")
ps1.setString(1, resolvedName)
ps1.setInt(2, par)
val partition = split.asInstanceOf[MultiBucketExecutorPartition]
var bucketString = ""
partition.buckets.foreach( bucket => {
bucketString = bucketString + bucket + ","
})
ps1.setString(2, bucketString.substring(0, bucketString.length-1))
ps1.execute()

val ps = conn.prepareStatement("select " + requiredColumns.mkString(
@@ -122,7 +123,7 @@ class ColumnarStorePartitionedRDD[T: ClassTag](@transient _sc: SparkContext,
}

override def getPreferredLocations(split: Partition): Seq[String] = {
split.asInstanceOf[MultiExecutorLocalPartition].hostExecutorIds
split.asInstanceOf[MultiBucketExecutorPartition].hostExecutorIds
}

override protected def getPartitions: Array[Partition] = {
Expand Down Expand Up @@ -153,7 +154,7 @@ class SparkShellCachedBatchRDD[T: ClassTag](@transient _sc: SparkContext,
}

override def getPreferredLocations(split: Partition): Seq[String] = {
split.asInstanceOf[ExecutorLocalShellPartition]
split.asInstanceOf[ExecutorMultiBucketLocalShellPartition]
.hostList.map(_._1.asInstanceOf[String]).toSeq
}

Expand All @@ -179,20 +180,25 @@ class SparkShellRowRDD[T: ClassTag](@transient sc: SparkContext,
val helper = new SparkShellRDDHelper
val conn: Connection = helper.getConnection(
connProperties, thePart)
val resolvedName = StoreUtils.lookupName(tableName, conn.getSchema)

if (isPartitioned) {
// TODO: this will fail if no network server is available unless SNAP-365 is
// fixed with the approach of having an iterator that can fetch from remote
if(isPartitioned) {
val ps = conn.prepareStatement(
"call sys.SET_BUCKETS_FOR_LOCAL_EXECUTION(?, ?)")
try {
ps.setString(1, tableName)
ps.setInt(2, thePart.index)
ps.executeUpdate()
} finally {
ps.close()
}
ps.setString(1, resolvedName)
val partition = thePart.asInstanceOf[ExecutorMultiBucketLocalShellPartition]
var bucketString = ""
partition.buckets.foreach(bucket => {
bucketString = bucketString + bucket + ","
})
ps.setString(2, bucketString.substring(0, bucketString.length - 1))
ps.executeUpdate()
ps.close()
}
val sqlText = s"SELECT $columnList FROM $resolvedName$filterWhereClause"

val sqlText = s"SELECT $columnList FROM $tableName$filterWhereClause"
val args = filterWhereArgs
val stmt = conn.prepareStatement(sqlText)
if (args ne null) {
@@ -208,7 +214,7 @@ }
}

override def getPreferredLocations(split: Partition): Seq[String] = {
split.asInstanceOf[ExecutorLocalShellPartition]
split.asInstanceOf[ExecutorMultiBucketLocalShellPartition]
.hostList.map(_._1.asInstanceOf[String]).toSeq
}

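All three call sites changed above (SparkShellRDDHelper.executeQuery, ColumnarStorePartitionedRDD.compute and SparkShellRowRDD.compute) follow the same pattern: build a comma-separated list of the partition's buckets, pass it to sys.SET_BUCKETS_FOR_LOCAL_EXECUTION, then run the actual query on the same connection. A compact sketch of that sequence; the helper object, method name and parameters are made up, and only the procedure call itself mirrors the diff.

```scala
import java.sql.{Connection, ResultSet}

// Illustrative helper, not part of the PR: pin a query to one partition's
// buckets before executing it, following the compute() pattern above.
object BucketPinnedQuerySketch {
  def executeForBuckets(conn: Connection, resolvedName: String,
      buckets: Set[Int], query: String): ResultSet = {
    require(buckets.nonEmpty, "a multi-bucket partition carries at least one bucket")
    val bucketString = buckets.mkString(",") // e.g. "3,17,42"
    val ps = conn.prepareStatement("call sys.SET_BUCKETS_FOR_LOCAL_EXECUTION(?, ?)")
    try {
      ps.setString(1, resolvedName)
      ps.setString(2, bucketString)
      ps.executeUpdate()
    } finally {
      ps.close()
    }
    conn.createStatement().executeQuery(query)
  }
}
```

Using buckets.mkString(",") also sidesteps the manual trailing-comma trimming that the diff performs with substring on the hand-built bucket string.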
@@ -23,6 +23,7 @@ import com.gemstone.gemfire.internal.cache.{LocalRegion, PartitionedRegion}
import com.pivotal.gemfirexd.internal.engine.Misc
import com.pivotal.gemfirexd.internal.engine.access.index.GfxdIndexManager
import com.pivotal.gemfirexd.internal.engine.ddl.resolver.GfxdPartitionByExpressionResolver
import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils

import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
@@ -138,11 +139,20 @@ class RowFormatRelation(
*/
override lazy val numPartitions: Int = {
region match {
case pr: PartitionedRegion => pr.getTotalNumberOfBuckets
case _ => 1
case pr: PartitionedRegion =>
getNumPartitions
case _ =>
1
}
}

def getNumPartitions : Int = {
Review comment (Contributor): See comments above

val numCores = Runtime.getRuntime.availableProcessors()
val numServers = GemFireXDUtils.getGfxdAdvisor.adviseDataStores(null).size()
val numPartitions = numServers * numCores
numPartitions
}

override def partitionColumns: Seq[String] = {
region match {
case pr: PartitionedRegion =>