-
Notifications
You must be signed in to change notification settings - Fork 203
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNAP-656 Delink RDD partitions from buckets #297
Changes from 10 commits
ee33123
9949a3e
7eafc96
84d720a
3a9f3a6
f303d7d
1c28ce5
d732dfe
d186347
d994a10
ca48486
231f48a
aa49696
48939bf
77e9787
2c4457d
26d1c00
a4272a4
fb57a30
533cab4
7852b86
efa3a5f
a58ea03
8d8c6a2
4923cbe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.columnar.impl | |
|
||
import java.sql.Connection | ||
|
||
import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils | ||
|
||
import com.gemstone.gemfire.internal.cache.PartitionedRegion | ||
import com.pivotal.gemfirexd.internal.engine.Misc | ||
|
||
|
@@ -90,7 +92,15 @@ class BaseColumnFormatRelation( | |
@transient protected lazy val region = Misc.getRegionForTable(resolvedName, | ||
true).asInstanceOf[PartitionedRegion] | ||
|
||
/**
 * Number of RDD partitions for scans over this column table.
 *
 * SNAP-656: partition count is delinked from the region's bucket count
 * (previously `region.getTotalNumberOfBuckets`) and instead sized as one
 * partition per core across all data-store members.
 *
 * NOTE(review): `Runtime.getRuntime.availableProcessors()` is evaluated on
 * the driver node, whose core count may not match the servers'. Consider
 * `SchedulerBackend.defaultParallelism` instead — TODO confirm.
 */
override lazy val numPartitions: Int = {
  val numCores = Runtime.getRuntime.availableProcessors()
  // One entry per data-store member currently hosting data for this table.
  val numServers = GemFireXDUtils.getGfxdAdvisor.adviseDataStores(null).size()
  numServers * numCores
}
|
||
override def partitionColumns: Seq[String] = { | ||
partitioningColumns | ||
|
@@ -134,6 +144,7 @@ class BaseColumnFormatRelation( | |
leftItr ++ rightItr | ||
} | ||
case _ => | ||
|
||
val rowRdd = new SparkShellRowRDD( | ||
sqlContext.sparkContext, | ||
executorConnector, | ||
|
@@ -144,7 +155,6 @@ class BaseColumnFormatRelation( | |
connProperties, | ||
filters | ||
).asInstanceOf[RDD[Row]] | ||
|
||
rowRdd.zipPartitions(colRdd) { (leftItr, rightItr) => | ||
leftItr ++ rightItr | ||
} | ||
|
@@ -339,6 +349,8 @@ class BaseColumnFormatRelation( | |
val sql = s"CREATE TABLE $tableName $schemaExtensions " + " DISABLE CONCURRENCY CHECKS " | ||
logInfo(s"Applying DDL (url=${connProperties.url}; " + | ||
s"props=${connProperties.connProps}): $sql") | ||
println(s"Applying DDL (url=${connProperties.url}; " + | ||
There was a problem hiding this comment. Choose a reason for hiding this comment; the reason will be displayed to describe this comment to others. Learn more. Reviewer: "Remove println" — Author: "Removed println" |
||
s"props=${connProperties.connProps}): $sql") | ||
JdbcExtendedUtils.executeUpdate(sql, conn) | ||
dialect match { | ||
case d: JdbcExtendedDialect => d.initializeTable(tableName, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ import com.gemstone.gemfire.internal.cache.{LocalRegion, PartitionedRegion} | |
import com.pivotal.gemfirexd.internal.engine.Misc | ||
import com.pivotal.gemfirexd.internal.engine.access.index.GfxdIndexManager | ||
import com.pivotal.gemfirexd.internal.engine.ddl.resolver.GfxdPartitionByExpressionResolver | ||
import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils | ||
|
||
import org.apache.spark.Partition | ||
import org.apache.spark.rdd.RDD | ||
|
@@ -138,11 +139,20 @@ class RowFormatRelation( | |
*/ | ||
/**
 * Number of RDD partitions for scans over this row table.
 *
 * Partitioned regions are sized via [[getNumPartitions]] (cores across
 * data-store members, SNAP-656) rather than by bucket count; replicated or
 * local regions are scanned as a single partition.
 */
override lazy val numPartitions: Int = {
  region match {
    // Binding is unused, so match on the type only (avoids an
    // unused-variable warning from `case pr: PartitionedRegion`).
    case _: PartitionedRegion => getNumPartitions
    case _ => 1
  }
}
|
||
/**
 * Computes the partition count as one partition per available core across
 * all data-store members of the cluster.
 *
 * NOTE(review): `availableProcessors()` runs on the driver node, whose core
 * count may differ from the servers' — the review suggests
 * `SchedulerBackend.defaultParallelism` (which accounts for total slave
 * cores) instead, with the caveat that `spark.default.parallelism` takes
 * priority if configured. TODO confirm.
 */
def getNumPartitions: Int = {
  val numCores = Runtime.getRuntime.availableProcessors()
  val numServers = GemFireXDUtils.getGfxdAdvisor.adviseDataStores(null).size()
  numServers * numCores
}
|
||
override def partitionColumns: Seq[String] = { | ||
region match { | ||
case pr: PartitionedRegion => | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is evaluated on the driver node, but we need to consider the server nodes — the driver node's processor count is not useful to us. Can you please look at SchedulerBackend.defaultParallelism? That takes the total cores across the slaves into consideration.
The catch, however, is that spark.default.parallelism takes priority, so if somebody configures it badly we will suffer.