
SNAP-656 Delink RDD partitions from buckets #4

Merged: 3 commits merged into snappy/branch-2.0 on Sep 1, 2016

Conversation


@ymahajan commented Aug 29, 2016

What changes were proposed in this pull request?

Use child's numBuckets to decide partitions of shuffled relation in ShuffleExchange
Pass numBuckets to HashPartitioning and HashPartitioner
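
To make the proposal concrete, here is a minimal standalone sketch of carrying numBuckets alongside numPartitions; the class and field names (Column, BucketedHashPartitioning, compatibleWith) are illustrative stand-ins, not the actual snappy-spark classes:

```
// Simplified stand-ins for the partitioning classes, showing how a numBuckets
// field can ride along with numPartitions. Names are illustrative only.
case class Column(name: String)

case class BucketedHashPartitioning(expressions: Seq[Column],
                                    numPartitions: Int,
                                    numBuckets: Int) {
  // Two sides of a join can be co-located only if they hash the same
  // expressions into the same number of buckets.
  def compatibleWith(other: BucketedHashPartitioning): Boolean =
    expressions == other.expressions && numBuckets == other.numBuckets
}

object BucketedPartitioningDemo extends App {
  val left  = BucketedHashPartitioning(Seq(Column("id")), numPartitions = 8,  numBuckets = 32)
  val right = BucketedHashPartitioning(Seq(Column("id")), numPartitions = 16, numBuckets = 32)
  val other = BucketedHashPartitioning(Seq(Column("id")), numPartitions = 8,  numBuckets = 64)

  println(left.compatibleWith(right)) // true: same buckets, no shuffle needed
  println(left.compatibleWith(other)) // false: different buckets, shuffle needed
}
```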

How was this patch tested?

Other PRs:
Store - TIBCOSoftware/snappy-store#85
Spark - #4
SnappyData - TIBCOSoftware/snappydata#297

…huffleExchange

Pass numBuckets to HashPartitioning and HashPartitioner
@rishitesh

Can we try one thing: leave everything as is at the catalyst layer.
a) The determination of whether or not to shuffle can be left alone. I think outputPartitioning is only used to decide on a shuffle; it should not have a bearing on the actual scan. If we leave it as it is, working on numBuckets, my best guess is that we should be fine. Do you see any issues with that?

b) Once catalyst determines what to do, we can put the num-CPU logic only at the RDD scan layer, i.e. each relation's buildScan, which we are already doing (a rough sketch follows).
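A rough standalone sketch of (b), assuming a scan-layer step that folds buckets onto the available cores; the function and the numbers are purely illustrative, not the actual buildScan code:

```
// Group bucket ids into at most `availableCores` RDD partitions, so the scan
// parallelism follows the CPUs while catalyst keeps thinking in buckets.
object BucketScanSketch extends App {
  def planScanPartitions(numBuckets: Int, availableCores: Int): Seq[Seq[Int]] = {
    val numPartitions = math.min(numBuckets, availableCores)
    (0 until numBuckets).groupBy(_ % numPartitions).toSeq
      .sortBy(_._1)
      .map(_._2)
  }

  // 32 buckets scanned on 8 available cores -> 8 partitions of 4 buckets each.
  planScanPartitions(numBuckets = 32, availableCores = 8).foreach(println)
}
```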


sumwale commented Aug 29, 2016

@rishitesh the issue is that we want a shuffle even when the partitions are equal but the buckets are different. The Spark layer will say no shuffle, which is incorrect.

@ymahajan Did what we discussed about implementing a new Partitioning not work, either by marking Partitioning unsealed in snappy-spark or by extending HashPartitioning?

@rishitesh

@sumwale Correct. That's why I suggested leaving OrderlessHashPartitioning as it is today, i.e. working on numBuckets. So if two tables have different numbers of buckets it will cause a shuffle, but if they are equal catalyst will say no to the shuffle. Then the RDD layer will take care of the scan and the CPU logic will kick in.

@ymahajan (Author)

@sumwale Yes, I tried both of your approaches, a new partitioning and extending HashPartitioning. We had to make changes at the catalyst layer (in EnsureRequirements and ShuffleExchange) to get this working even for the embedded mode, and for split mode all these changes were not possible, so I had to disable this optimization there.

@rishitesh Wouldn't this optimization be beneficial for both scans and joins? outputPartitioning also decides how many target partitions get created; if that is the same as numPartitions (as in other cases), it simplifies things. numBuckets would be a useful input for catalyst in general as well. Do you see any side effects of these catalyst-related changes?


sumwale commented Aug 30, 2016

@rishitesh Just using OrderlessHashPartitioning will not work. The partitionings will match because of the buckets, but then the join will fail since the numbers of partitions may not match. I guess the numbers of partitions not matching is a concern only if server groups are being used, but otherwise they should always match? It also means that, to keep the number of partitions consistent, the total number of cores (and thus the partitions and their distribution) has to be determined only once at top-level query execution (perhaps in SnappySession just before query execution) and propagated to all RDDs below, to handle stray cases of new nodes joining.
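A small sketch of that "decide once and propagate" idea; the context class and method names are hypothetical stand-ins, not the real SnappySession code:

```
// Decide the partition count a single time just before query execution and
// hand the same value to every scan, instead of each RDD asking for the
// current core count on its own. All names here are illustrative.
final case class QueryPlanningContext(partitionsPerQuery: Int)

object DecideOnceSketch extends App {
  // Pretend this is derived from the cluster's total cores at query start.
  def currentTotalCores(): Int = Runtime.getRuntime.availableProcessors()

  // Decided once, e.g. in the session right before execution ...
  val ctx = QueryPlanningContext(partitionsPerQuery = currentTotalCores())

  // ... and every scan below reuses the same number, so the two sides of a
  // join line up even if a new node happens to join mid-query.
  def planScan(table: String, ctx: QueryPlanningContext): String =
    s"$table -> ${ctx.partitionsPerQuery} partitions"

  println(planScan("tableA", ctx))
  println(planScan("tableB", ctx))
}
```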

@ymahajan Why were additional changes required? Today OrderlessHashPartitioning works with no additional changes to other parts.

@ymahajan (Author)

@sumwale Catalyst decides whether to use the existing partitioning or go for a shuffle by comparing (via guarantees) with the child's output partitioning. To check the guarantees it creates a HashPartitioning on the fly (with the expressions and numPartitions) in EnsureRequirements#ensureDistributionAndOrdering. If we don't change HashPartitioning to consider numBuckets, both partitionings (our OrderlessHashPartitioning and HashPartitioning) satisfy the guarantees and the existing partitioning gets used even for joins between two different numBuckets. So we need to provide numBuckets as an input to catalyst so that it creates the HashPartitioning with numBuckets and compares it against our OrderlessHashPartitioning.
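To illustrate the comparison being described, here is a stripped-down standalone sketch; SimplePartitioning and its fields are simplified stand-ins, not the real EnsureRequirements/HashPartitioning code:

```
// Simplified stand-in for the guarantees check: the required partitioning that
// catalyst builds on the fly is only "guaranteed" by the child's existing
// partitioning when the bucket counts also line up. Names are illustrative.
case class SimplePartitioning(expressions: Seq[String],
                              numPartitions: Int,
                              numBuckets: Int) {
  def guarantees(required: SimplePartitioning): Boolean =
    expressions == required.expressions && numBuckets == required.numBuckets
}

object EnsureRequirementsSketch extends App {
  val childOutput = SimplePartitioning(Seq("id"), numPartitions = 8, numBuckets = 32)

  // Required partitioning built on the fly; if catalyst could only fill in
  // numPartitions (no numBuckets input), this check could pass too eagerly.
  val required = SimplePartitioning(Seq("id"), numPartitions = 8, numBuckets = 64)

  println(childOutput.guarantees(required)) // false -> insert a ShuffleExchange
}
```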


sumwale commented Aug 30, 2016

@rishitesh Looking at the comment again, it seems you are saying we should return numBuckets in the Partitioning but use the actual partitions for RDD execution? That is unlikely to work, because a lot of logic depends on both being the same (perhaps there are even assertions in the code): Spark will assume that the output has numBuckets partitions and then try to apply further operators, including shuffles, which will likely fail. We can give it a shot, but it looks like a landmine to me.

@ymahajan Looking at the code, there are two calls there: Partitioning.allCompatible and then, inside it, child.outputPartitioning.guarantees. I guess you are saying that the latter won't be able to determine whether or not it guarantees, since the argument is created on the fly. However, you actually don't need it to. In fact, in guarantees() you should only check whether the number of buckets equals the provided number of partitions (and if so mark a flag in PartitionedPhysicalRDD to use the number of buckets as the partitions), because that is the only way to guarantee. For normal cases the code will not even get in there if Partitioning.allCompatible returns true (i.e. check the buckets on both sides only in compatibleWith).
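A minimal sketch of that suggestion, assuming hypothetical stand-in classes (the flag name and the scan class below are illustrative, not the real PartitionedPhysicalRDD API):

```
// guarantees() only says "yes" when the bucket count equals the requested
// partition count; in that case a flag is set so the physical scan uses the
// buckets themselves as its partitions. All names are illustrative.
class PartitionedScanSketch {
  var useBucketsAsPartitions: Boolean = false
}

class OrderlessHashPartitioningSketch(val numBuckets: Int) {
  def guarantees(requiredNumPartitions: Int, scan: PartitionedScanSketch): Boolean = {
    val ok = numBuckets == requiredNumPartitions
    if (ok) scan.useBucketsAsPartitions = true // stand-in for the flag on the scan
    ok
  }
}

object GuaranteesSketch extends App {
  val scan = new PartitionedScanSketch
  val part = new OrderlessHashPartitioningSketch(numBuckets = 16)

  println(part.guarantees(requiredNumPartitions = 16, scan = scan)) // true, flag gets set
  println(scan.useBucketsAsPartitions)                              // true
}
```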

@rishitesh

Yes, I just checked the code. There is an assertion comparing the RDD partition length with the Spark plan partition length, so my suggestion won't work here.

@ymahajan merged commit 9ce9332 into snappy/branch-2.0 on Sep 1, 2016

sumwale commented Sep 7, 2016

@ymahajan @rishitesh I don't agree with merging these changes, as noted before. It means that for split mode the delinking will not work and we are back to the old way.

Perhaps we can keep this for the moment for 0.6, but it should be reverted and a better fix is needed for 1.0.

ymahajan added a commit that referenced this pull request Jan 13, 2017
* In catalyst, EnsureRequirements#ensureDistributionAndOrdering now uses numBuckets to decide whether a ShuffleExchange should be applied
* Modified OrderlessHashPartitioner and HashPartitioner to use numBuckets
ymahajan pushed a commit that referenced this pull request Jan 26, 2018
…pressions

## What changes were proposed in this pull request?

This PR changes the direction of expression transformation in the DecimalPrecision rule. Previously, the expressions were transformed down, which led to incorrect result types when decimal expressions had other decimal expressions as their operands. The root cause of this issue was in visiting outer nodes before their children. Consider the example below:

```
    val inputSchema = StructType(StructField("col", DecimalType(26, 6)) :: Nil)
    val sc = spark.sparkContext
    val rdd = sc.parallelize(1 to 2).map(_ => Row(BigDecimal(12)))
    val df = spark.createDataFrame(rdd, inputSchema)

    // Works correctly since no nested decimal expression is involved
    // Expected result type: (26, 6) * (26, 6) = (38, 12)
    df.select($"col" * $"col").explain(true)
    df.select($"col" * $"col").printSchema()

    // Gives a wrong result since there is a nested decimal expression that should be visited first
    // Expected result type: ((26, 6) * (26, 6)) * (26, 6) = (38, 12) * (26, 6) = (38, 18)
    df.select($"col" * $"col" * $"col").explain(true)
    df.select($"col" * $"col" * $"col").printSchema()
```

The example above gives the following output:

```
// Correct result without sub-expressions
== Parsed Logical Plan ==
'Project [('col * 'col) AS (col * col)#4]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
(col * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- (col * col): decimal(38,12) (nullable = true)

// Incorrect result with sub-expressions
== Parsed Logical Plan ==
'Project [(('col * 'col) * 'col) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
((col * col) * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- ((col * col) * col): decimal(38,12) (nullable = true)
```
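To make the visit-order point concrete, here is a tiny standalone sketch (not the actual Catalyst TreeNode API) contrasting parent-first and children-first traversal of the (col * col) * col expression:

```
// A toy expression tree: transforming "down" visits a node before its children,
// so the outer multiplication is handled before its decimal operands are fixed
// up; transforming "up" visits children first. Names are illustrative.
sealed trait Node { def children: Seq[Node] }
case class Leaf(name: String) extends Node { def children: Seq[Node] = Nil }
case class Mul(left: Node, right: Node) extends Node { def children: Seq[Node] = Seq(left, right) }

object VisitOrderSketch extends App {
  def visitDown(n: Node, log: StringBuilder): Unit = {
    log.append(n.getClass.getSimpleName).append(" ")   // parent first
    n.children.foreach(visitDown(_, log))
  }

  def visitUp(n: Node, log: StringBuilder): Unit = {
    n.children.foreach(visitUp(_, log))                // children first
    log.append(n.getClass.getSimpleName).append(" ")
  }

  val expr = Mul(Mul(Leaf("col"), Leaf("col")), Leaf("col")) // (col * col) * col

  val down = new StringBuilder; visitDown(expr, down)
  val up = new StringBuilder; visitUp(expr, up)

  println(s"down: $down") // Mul Mul Leaf Leaf Leaf -> outer node typed before its operands
  println(s"up:   $up")   // Leaf Leaf Mul Leaf Mul -> operands resolved before the outer node
}
```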

## How was this patch tested?

This PR was tested with available unit tests. Moreover, there are tests to cover previously failing scenarios.

Author: aokolnychyi <[email protected]>

Closes apache#18583 from aokolnychyi/spark-21332.

(cherry picked from commit 0be5fb4)
Signed-off-by: gatorsmile <[email protected]>
sumwale pushed a commit that referenced this pull request Oct 13, 2021
…pressions
