[SPARK-22673][SQL] InMemoryRelation should utilize existing stats whenever possible #19864
Conversation
Test build #84382 has finished for PR 19864 at commit.
Test build #84384 has finished for PR 19864 at commit.
// available, return the default statistics.
Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
children.filter(_.isInstanceOf[LogicalRelation]) match {
  case Seq(c @ LogicalRelation(_, _, _, _), _) if c.conf.cboEnabled =>
InMemoryRelation is a logical.LeafNode. I think it has no children?
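For reference, this is roughly why the match above can never fire — a sketch of the catalyst definition, paraphrased from memory rather than quoted exactly:

// In org.apache.spark.sql.catalyst.plans.logical, LeafNode pins children to Nil,
// so filtering InMemoryRelation's children can only ever yield an empty Seq and
// the case above never matches.
abstract class LeafNode extends LogicalPlan {
  override def children: Seq[LogicalPlan] = Nil
}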
thanks, @viirya, yes, you're right! I misread the generated plan; working on it
    tableName)
if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
  inMemoryRelation.setStatsFromCachedPlan(planToCache)
}
I have to make InMemoryRelation stateful to avoid breaking APIs.....
I think a more ideal change is to put the original plan's stats into the constructor of InMemoryRelation, instead of making it mutable.
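A rough sketch of what that suggestion could look like — the parameter name and placement mirror the diff quoted later in this thread, but this is not the exact merged code, and imports are elided:

// Pass the stats of the plan being cached through the curried constructor,
// so InMemoryRelation never needs a mutable setter like setStatsFromCachedPlan.
case class InMemoryRelation(
    output: Seq[Attribute],
    useCompression: Boolean,
    batchSize: Int,
    storageLevel: StorageLevel,
    @transient child: SparkPlan,
    tableName: Option[String])(
    @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
    statsOfPlanToCache: Statistics = null)
  extends logical.LeafNode {

  override def computeStats(): Statistics = {
    if (batchStats.value == 0L) {
      // Cache not materialized yet: fall back to the stats captured from the
      // original plan instead of defaultSizeInBytes.
      statsOfPlanToCache
    } else {
      // Materialized: the accumulator holds the real in-memory byte count.
      Statistics(sizeInBytes = batchStats.value.longValue)
    }
  }
}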
Looks like I have no way to access InMemoryRelation from outside of the spark package, though it is not a package-private class... how is that achieved? If this is the case, I can modify the constructor.
Thanks @cloud-fan
InMemoryRelation is not part of the public API and should be treated as unstable/internal. You can use it at your own risk. Changing the constructor is fine.
+1
Test build #84390 has finished for PR 19864 at commit.
Test build #84392 has finished for PR 19864 at commit.
Are these initial statistics important? After the columnar RDD is materialized, we will get accurate statistics, won't we?
@viirya yes, we can get more accurate stats later; however, the first stats matter too, since they determine what the user pays for the first run. The current implementation always chooses the most expensive plan in the first run, e.g. it always resorts to a sort-merge join instead of a broadcast join even when the latter is possible, and CBO is effectively disabled for any operator downstream of the InMemoryRelation. Additionally, it makes the execution plan inconsistent even for the same query over the same dataset. Of course, all of these issues only happen in the first run, but IMHO we have a chance to make it better, so why not?
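To make that concrete: the planner only picks a broadcast join when the estimated size is below the broadcast threshold, so an inflated first-run estimate forces a sort-merge join. A minimal, hedged illustration — the table names are hypothetical and the config values are the stock defaults:

// Broadcast joins apply only below this size estimate (default 10 MB).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)

val small = spark.table("small_table").cache()   // hypothetical small table
val joined = small.join(spark.table("big_table"), "id")

// If small's estimated sizeInBytes is under the threshold, the plan shows
// BroadcastHashJoin; if the estimate falls back to defaultSizeInBytes
// (Long.MaxValue by default), it shows SortMergeJoin instead.
joined.explain()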
@viirya any thoughts?
  storageLevel,
  sparkSession.sessionState.executePlan(planToCache).executedPlan,
  tableName)
if (planToCache.conf.cboEnabled && planToCache.stats.rowCount.isDefined) {
Do we need to limit this to those conditions? I think we can pass the stats into the created InMemoryRelation even when the two conditions don't hold.
the reason I put it here is that when we don't enable CBO, the stats of the underlying plan might be much smaller than the actual size in memory, leading to a potential risk of OOM errors.
The underlying cause is that without CBO enabled, the size of the plan is calculated with BaseRelation's sizeInBytes, but with CBO we can have a more accurate estimation:
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala (lines 42 to 46 in 03fdc92):

override def computeStats(): Statistics = {
  catalogTable
    .flatMap(_.stats.map(_.toPlanStats(output, conf.cboEnabled)))
    .getOrElse(Statistics(sizeInBytes = relation.sizeInBytes))
}
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala (lines 370 to 381 in 03fdc92):

def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
  if (cboEnabled && rowCount.isDefined) {
    val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _)))
    // Estimate size as number of rows * row size.
    val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
    Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
  } else {
    // When CBO is disabled or the table doesn't have other statistics, we apply the size-only
    // estimation strategy and only propagate sizeInBytes in statistics.
    Statistics(sizeInBytes = sizeInBytes)
  }
}
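So the accurate path only kicks in when the catalog actually has row counts and column stats. For context, a user would populate them with the standard ANALYZE commands — a hedged example, with hypothetical table and column names:

// Table-level stats (row count, total size in bytes):
spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
// Per-column stats (avgLen, distinct count, ...) consumed by toPlanStats:
spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS FOR COLUMNS col1, col2")
// CBO must be on for toPlanStats to use the row count:
spark.conf.set("spark.sql.cbo.enabled", "true")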
When CBO is disabled, don't we just set the sizeInBytes to defaultSizeInBytes? Is it different from the current statistics of the first run?
no, if CBO is disabled, the relation's sizeInBytes is the file size:

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala (line 85 in 5c3a1f3):

override def sizeInBytes: Long = location.sizeInBytes
LogicalRelation uses the statistics from the relation only when there is no given catalogTable. In that case, it doesn't consider whether CBO is enabled or not. Only catalogTable considers CBO when computing its statistics in toPlanStats. It doesn't refer to the relation's statistics, IIUC.
If a catalog table doesn't have statistics in its metadata, we will fill it with defaultSizeInBytes.

spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala (lines 121 to 134 in 326f1d6):

val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
  try {
    val hadoopConf = session.sessionState.newHadoopConf()
    val tablePath = new Path(table.location)
    val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    fs.getContentSummary(tablePath).getLength
  } catch {
    case e: IOException =>
      logWarning("Failed to get table size from hdfs.", e)
      session.sessionState.conf.defaultSizeInBytes
  }
} else {
  session.sessionState.conf.defaultSizeInBytes
}
@viirya you're right! Thanks for clearing up the confusion.
However, to prevent using the relation's stats, which can be much smaller than the in-memory size and lead to a potential OOM error, we should still have this condition here (we can remove cboEnabled though), right?
The statistics from the relation are based on file size; will that easily cause an OOM issue? I think in cases other than a cached query, we still use this relation's statistics. If this is an issue, doesn't it also affect the other cases?
that's true, I believe it affects them too... there is a similar discussion in #19743
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.LongAccumulator

object InMemoryRelation {
Unnecessary change.
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
Unused import.
Test build #84494 has finished for PR 19864 at commit.
thanks @viirya, @cloud-fan and @hvanhovell, just addressed the comments and answered the question.
Test build #84496 has finished for PR 19864 at commit.
Test build #84499 has finished for PR 19864 at commit.
test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") { | ||
withSQLConf("spark.sql.cbo.enabled" -> "true") { | ||
val workDir = s"${Utils.createTempDir()}/table1" |
nit: use withTempDir
done
assert(inMemoryRelation.computeStats().sizeInBytes === 16)

// test of catalog table
val dfFromTable = spark.catalog.createTable("table1", workDir).cache()
nit: wrap with withTable, which will clean up the table automatically at the end.
done
// InMemoryRelation's stats should be updated after calculating stats of the table
spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
assert(inMemoryRelation2.computeStats().sizeInBytes === 16)
what happened here? InMemoryRelation.statsOfPlanToCache gets updated automatically?
it was a mistake here; AnalyzeTableCommand actually forces the table to be evaluated with count(), which hits the longAccumulator's value.
Fixed in the latest commit.
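In other words, anything that materializes the cached RDD flips computeStats() from the pre-cache estimate to the accumulator value. A hedged illustration of that effect:

val df = spark.read.table("table1").cache()
// Before any action: batchStats is still 0, so computeStats() falls back to
// the stats captured from the plan that was cached.
df.count()
// The count() materialized the columnar batches, so batchStats now holds the
// real byte size and computeStats() reports it instead of the estimate.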
Test build #85030 has finished for PR 19864 at commit.
retest this please
Let's hold off on retesting. Tests seem to be failing globally.
Test build #85035 has finished for PR 19864 at commit.
retest this please
withSQLConf("spark.sql.cbo.enabled" -> "true") { | ||
withTempDir { workDir => | ||
withTable("table1") { | ||
val workDirPath = workDir.getAbsolutePath + "/table1" |
seems you can use withTempPath, which just gives you a path string without creating it.
then we don't need to append /table1 to the path
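i.e. something like this — a sketch using the SQLTestUtils helpers; the data being written is illustrative, not the test's actual contents:

withSQLConf("spark.sql.cbo.enabled" -> "true") {
  withTempPath { workDir =>            // a fresh path that is not created yet
    withTable("table1") {              // drops table1 automatically afterwards
      val path = workDir.getAbsolutePath  // no need to append "/table1"
      spark.range(4).write.parquet(path)
      val dfFromFile = spark.read.parquet(path).cache()
      // ... assertions on the InMemoryRelation stats ...
    }
  }
}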
LGTM except one comment
Test build #85051 has finished for PR 19864 at commit.
Test build #85067 has finished for PR 19864 at commit.
spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS") | ||
val inMemoryRelation3 = spark.read.table("table1").cache().queryExecution.optimizedPlan. | ||
collect { case plan: InMemoryRelation => plan }.head | ||
assert(inMemoryRelation3.computeStats().sizeInBytes === 48) |
missed this one: why does it have different stats than the table cache stats of 16?
because 16 is the exact in-memory size, obtained by reading the accumulator's value after evaluating the RDD.
48 is calculated by EstimationUtils:
Lines 65 to 88 in bdb5e55:

def getOutputSize(
    attributes: Seq[Attribute],
    outputRowCount: BigInt,
    attrStats: AttributeMap[ColumnStat] = AttributeMap(Nil)): BigInt = {
  // We assign a generic overhead for a Row object, the actual overhead is different for different
  // Row format.
  val sizePerRow = 8 + attributes.map { attr =>
    if (attrStats.contains(attr)) {
      attr.dataType match {
        case StringType =>
          // UTF8String: base + offset + numBytes
          attrStats(attr).avgLen + 8 + 4
        case _ =>
          attrStats(attr).avgLen
      }
    } else {
      attr.dataType.defaultSize
    }
  }.sum
  // Output size can't be zero, or sizeInBytes of BinaryNode will also be zero
  // (simple computation of statistics returns product of children).
  if (outputRowCount > 0) outputRowCount * sizePerRow else 1
}
(8 bytes of generic row overhead + 4, the sum of the average attribute lengths) * 4 rows = 48
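A quick sanity check of that arithmetic, assuming a single int-like column with avgLen = 4 and the 4 rows from the test:

val rowOverhead = 8   // generic Row object overhead in getOutputSize
val avgAttrLen  = 4   // sum of average attribute lengths for this schema
val rowCount    = 4
assert(rowCount * (rowOverhead + avgAttrLen) == 48)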
thanks, merging to master!
thanks
…tion's size

## What changes were proposed in this pull request?

As per the discussion in #19864 (comment), the current HadoopFsRelation size estimate is purely based on the underlying file size, which is not accurate and makes the execution vulnerable to errors like OOM. Users can enable CBO with the functionality in #19864 to avoid this issue. This JIRA proposes to add a configurable factor to the sizeInBytes method in the HadoopFsRelation class so that users can mitigate this problem without CBO.

## How was this patch tested?

Existing tests

Author: CodingCat <[email protected]>
Author: Nan Zhu <[email protected]>

Closes #20072 from CodingCat/SPARK-22790. (cherry picked from commit ba891ec) Signed-off-by: gatorsmile <[email protected]>
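For reference, the factor introduced by that follow-up (SPARK-22790) would be used roughly like this — the exact config key is quoted from memory and may differ:

// Scale HadoopFsRelation's file-size-based estimate to approximate the
// decompressed/in-memory size, mitigating under-estimation without CBO.
spark.conf.set("spark.sql.sources.fileCompressionFactor", "3.0")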
@@ -60,7 +62,8 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
+    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
+    statsOfPlanToCache: Statistics = null)
Why do we set this to null?
eh... we don't have other options; it's more of a placeholder. Since InMemoryRelation is created by CacheManager through apply() in the companion object, it's harmless here IMHO.
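Roughly, the flow looks like this — a sketch rather than the exact merged code. CacheManager goes through the companion object's apply(), which always supplies real stats, so the null default never survives past construction in production paths:

object InMemoryRelation {
  def apply(
      useCompression: Boolean,
      batchSize: Int,
      storageLevel: StorageLevel,
      child: SparkPlan,
      tableName: Option[String],
      statsOfPlanToCache: Statistics): InMemoryRelation =
    // The curried parameter list keeps the case-class copy/equality semantics
    // intact while still letting the caller inject the pre-cache stats.
    new InMemoryRelation(child.output, useCompression, batchSize,
      storageLevel, child, tableName)(statsOfPlanToCache = statsOfPlanToCache)
}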
## What changes were proposed in this pull request?

This is a regression introduced by #19864. When we look up the cache, we should not carry the hint info, as this cache entry might have been added by a plan with hint info, while the input plan for this lookup may have no hint info, or different hint info.

## How was this patch tested?

A new test.

Author: Wenchen Fan <[email protected]>

Closes #20394 from cloud-fan/cache. (cherry picked from commit 5b5447c) Signed-off-by: gatorsmile <[email protected]>
What changes were proposed in this pull request?
The current implementation of InMemoryRelation always uses the most expensive execution plan when writing the cache. With CBO enabled, we can actually have a more exact estimation of the underlying table size...
How was this patch tested?
Existing tests.