MapGroupsWithState, and FlatMapGroupsWithState #34

Closed
wants to merge 124 commits into from

Conversation

tdas
Owner

@tdas tdas commented Jan 21, 2017

BLAH

CodingCat and others added 30 commits January 16, 2017 18:33
…om JobScheduler.jobSets

## What changes were proposed in this pull request?

The current implementation of Spark Streaming considers a batch completed regardless of the outcome of its jobs (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203).
Let's consider the following case:
A micro-batch contains 2 jobs that read from two different Kafka topics respectively. One of these jobs fails due to a problem in the user-defined logic, after the other one has finished successfully.
1. The main thread in the Spark Streaming application executes the line mentioned above,
2. and another thread (the checkpoint writer) makes a checkpoint file immediately after this line is executed.
3. Then, due to the current error-handling mechanism in Spark Streaming, the StreamingContext is closed (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214).
4. The user recovers from the checkpoint file, and because the JobSet containing the failed job was removed (treated as completed) before the checkpoint was constructed, the data being processed by the failed job is never reprocessed.

This PR fixes this by removing a JobSet from JobScheduler.jobSets only when all jobs in the JobSet have finished successfully.
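A minimal sketch of that idea, using illustrative names rather than the actual `JobScheduler` internals:

```scala
import scala.collection.mutable

// Illustrative only: track pending batches and forget one only when every
// job in it succeeded, so a checkpoint written afterwards still remembers a
// batch that had a failed job and can re-run it on recovery.
object JobSetTrackingSketch {
  case class JobResult(succeeded: Boolean)
  case class JobSet(time: Long, results: Seq[JobResult]) {
    def allSucceeded: Boolean = results.forall(_.succeeded)
  }

  private val jobSets = mutable.Map.empty[Long, JobSet]

  def onBatchCompleted(jobSet: JobSet): Unit = {
    if (jobSet.allSucceeded) {
      jobSets.remove(jobSet.time) // safe to forget this batch
    }
    // otherwise keep it, so checkpoint recovery reprocesses the batch
  }
}
```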

## How was this patch tested?

existing tests

Author: CodingCat <[email protected]>
Author: Nan Zhu <[email protected]>

Closes apache#16542 from CodingCat/SPARK-18905.
## What changes were proposed in this pull request?

In apache#16296, we reached a consensus that we should hide the external/managed table concept from users and only expose custom table paths.

This PR renames `Catalog.createExternalTable` to `createTable` (keeping the old versions for backward compatibility), and sets the table type to EXTERNAL only if `path` is specified in the options.
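A hedged usage sketch of the renamed API (table name and path are made up for illustration; assumes a running `SparkSession`):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("create-table-sketch").getOrCreate()

// With a "path" option the created table is treated as EXTERNAL;
// without it, the table is managed by Spark.
spark.catalog.createTable(
  tableName = "events",                       // hypothetical table name
  source = "parquet",
  options = Map("path" -> "/tmp/events"))     // hypothetical path
```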

## How was this patch tested?

new tests in `CatalogSuite`

Author: Wenchen Fan <[email protected]>

Closes apache#16528 from cloud-fan/create-table.
…n directory.

## What changes were proposed in this pull request?
apache#16092 moves YARN resource manager related code to the resource-managers/yarn directory. The test case ```YarnSchedulerBackendSuite``` was added after that, but in the wrong place. This PR moves it to the correct directory.

## How was this patch tested?
Existing test.

Author: Yanbo Liang <[email protected]>

Closes apache#16595 from yanboliang/yarn.
## What changes were proposed in this pull request?

SET LOCATION also works on a managed table (or a table created without a custom path). The behavior is a little surprising, but since we already support it, we should add a test that explicitly documents the behavior.
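A minimal sketch of the statements such a test would exercise (table name and location are illustrative; assumes an existing `SparkSession` named `spark`):

```scala
// Illustrative only: SET LOCATION on a table created without a custom path
// (i.e. a managed table); the new test documents what happens in this case.
spark.sql("CREATE TABLE t (i INT) USING parquet")
spark.sql("ALTER TABLE t SET LOCATION '/tmp/t_new_location'")
```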

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes apache#16597 from cloud-fan/set-location.
## What changes were proposed in this pull request?

Changing the default parquet logging levels to reflect the changes made in PR [apache#15538](apache#15538), in order to prevent the flood of log messages by default.

## How was this patch tested?

Default log output when reading from parquet 1.6 files was compared with and without this change. The change eliminates the extraneous logging and makes the output readable.

Author: Nick Lavers <[email protected]>

Closes apache#16580 from nicklavers/spark-19219-set_default_parquet_log_level.
…n `sbt/sbt unidoc`

## What changes were proposed in this pull request?

This PR proposes to fix the ambiguous-link warnings by simply rendering them as code blocks for both javadoc and scaladoc.

```
[warn] .../spark/core/src/main/scala/org/apache/spark/Accumulator.scala:20: The link target "SparkContext#accumulator" is ambiguous. Several members fit the target:
[warn] .../spark/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala:281: The link target "runMiniBatchSGD" is ambiguous. Several members fit the target:
[warn] .../spark/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala:83: The link target "run" is ambiguous. Several members fit the target:
...
```
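A sketch of the kind of rewrite this implies, assuming the ambiguous `[[...]]` links are turned into plain backticked code spans (dummy class added so the comments attach to something):

```scala
// Before: an ambiguous scaladoc link that several overloads match.
/** See [[SparkContext#accumulator]] for details. */
// After: the same reference rendered as a code block, which raises no warning.
/** See `SparkContext#accumulator` for details. */
class AccumulatorDocExample
```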

This PR also fixes the javadoc8 break below:

```
[error] .../spark/sql/core/target/java/org/apache/spark/sql/LowPrioritySQLImplicits.java:7: error: reference not found
[error]  * newProductEncoder - to disambiguate for {link List}s which are both {link Seq} and {link Product}
[error]                                                   ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/LowPrioritySQLImplicits.java:7: error: reference not found
[error]  * newProductEncoder - to disambiguate for {link List}s which are both {link Seq} and {link Product}
[error]                                                                                ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/LowPrioritySQLImplicits.java:7: error: reference not found
[error]  * newProductEncoder - to disambiguate for {link List}s which are both {link Seq} and {link Product}
[error]                                                                                                ^
[info] 3 errors
```

## How was this patch tested?

Manually via `sbt unidoc > output.txt`, then checked via `cat output.txt | grep ambiguous` and `sbt unidoc | grep error`.

Author: hyukjinkwon <[email protected]>

Closes apache#16604 from HyukjinKwon/SPARK-3249.
…ate docs

## What changes were proposed in this pull request?

The `spark.yarn.access.namenodes` configuration name does not actually reflect its usage: in the code it is the Hadoop filesystems we obtain tokens for, not NameNodes. So this PR proposes to update the name of this configuration, and also changes the related code and docs.

## How was this patch tested?

Local verification.

Author: jerryshao <[email protected]>

Closes apache#16560 from jerryshao/SPARK-19179.
… cloudpickle changes for PySpark to work with Python 3.6.0

## What changes were proposed in this pull request?

Currently, PySpark does not work with Python 3.6.0.

Running `./bin/pyspark` simply throws the error as below and PySpark does not work at all:

```
Traceback (most recent call last):
  File ".../spark/python/pyspark/shell.py", line 30, in <module>
    import pyspark
  File ".../spark/python/pyspark/__init__.py", line 46, in <module>
    from pyspark.context import SparkContext
  File ".../spark/python/pyspark/context.py", line 36, in <module>
    from pyspark.java_gateway import launch_gateway
  File ".../spark/python/pyspark/java_gateway.py", line 31, in <module>
    from py4j.java_gateway import java_import, JavaGateway, GatewayClient
  File "<frozen importlib._bootstrap>", line 961, in _find_and_load
  File "<frozen importlib._bootstrap>", line 950, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 646, in _load_unlocked
  File "<frozen importlib._bootstrap>", line 616, in _load_backward_compatible
  File ".../spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 18, in <module>
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pydoc.py", line 62, in <module>
    import pkgutil
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pkgutil.py", line 22, in <module>
    ModuleInfo = namedtuple('ModuleInfo', 'module_finder name ispkg')
  File ".../spark/python/pyspark/serializers.py", line 394, in namedtuple
    cls = _old_namedtuple(*args, **kwargs)
TypeError: namedtuple() missing 3 required keyword-only arguments: 'verbose', 'rename', and 'module'
```

The root cause seems to be that some arguments of `namedtuple` became keyword-only arguments as of Python 3.6.0 (see https://bugs.python.org/issue25628).

We currently copy this function via `types.FunctionType`, which does not set the default values of keyword-only arguments (i.e. `namedtuple.__kwdefaults__`), and this seems to cause missing values inside the function (unbound arguments).

This PR proposes to work around this by manually setting the defaults via `kwargs`, as `types.FunctionType` does not seem to support setting them.

Also, this PR ports the changes in cloudpickle for compatibility for Python 3.6.0.

## How was this patch tested?

Manually tested with Python 2.7.6 and Python 3.6.0.

```
./bin/pyspark
```

as well as manual creation of `namedtuple` both locally and in an RDD with Python 3.6.0,

and Jenkins tests for other Python versions.

Also,

```
./run-tests --python-executables=python3.6
```

```
Will test against the following Python executables: ['python3.6']
Will test the following Python modules: ['pyspark-core', 'pyspark-ml', 'pyspark-mllib', 'pyspark-sql', 'pyspark-streaming']
Finished test(python3.6): pyspark.sql.tests (192s)
Finished test(python3.6): pyspark.accumulators (3s)
Finished test(python3.6): pyspark.mllib.tests (198s)
Finished test(python3.6): pyspark.broadcast (3s)
Finished test(python3.6): pyspark.conf (2s)
Finished test(python3.6): pyspark.context (14s)
Finished test(python3.6): pyspark.ml.classification (21s)
Finished test(python3.6): pyspark.ml.evaluation (11s)
Finished test(python3.6): pyspark.ml.clustering (20s)
Finished test(python3.6): pyspark.ml.linalg.__init__ (0s)
Finished test(python3.6): pyspark.streaming.tests (240s)
Finished test(python3.6): pyspark.tests (240s)
Finished test(python3.6): pyspark.ml.recommendation (19s)
Finished test(python3.6): pyspark.ml.feature (36s)
Finished test(python3.6): pyspark.ml.regression (37s)
Finished test(python3.6): pyspark.ml.tuning (28s)
Finished test(python3.6): pyspark.mllib.classification (26s)
Finished test(python3.6): pyspark.mllib.evaluation (18s)
Finished test(python3.6): pyspark.mllib.clustering (44s)
Finished test(python3.6): pyspark.mllib.linalg.__init__ (0s)
Finished test(python3.6): pyspark.mllib.feature (26s)
Finished test(python3.6): pyspark.mllib.fpm (23s)
Finished test(python3.6): pyspark.mllib.random (8s)
Finished test(python3.6): pyspark.ml.tests (92s)
Finished test(python3.6): pyspark.mllib.stat.KernelDensity (0s)
Finished test(python3.6): pyspark.mllib.linalg.distributed (25s)
Finished test(python3.6): pyspark.mllib.stat._statistics (15s)
Finished test(python3.6): pyspark.mllib.recommendation (24s)
Finished test(python3.6): pyspark.mllib.regression (26s)
Finished test(python3.6): pyspark.profiler (9s)
Finished test(python3.6): pyspark.mllib.tree (16s)
Finished test(python3.6): pyspark.shuffle (1s)
Finished test(python3.6): pyspark.mllib.util (18s)
Finished test(python3.6): pyspark.serializers (11s)
Finished test(python3.6): pyspark.rdd (20s)
Finished test(python3.6): pyspark.sql.conf (8s)
Finished test(python3.6): pyspark.sql.catalog (17s)
Finished test(python3.6): pyspark.sql.column (18s)
Finished test(python3.6): pyspark.sql.context (18s)
Finished test(python3.6): pyspark.sql.group (27s)
Finished test(python3.6): pyspark.sql.dataframe (33s)
Finished test(python3.6): pyspark.sql.functions (35s)
Finished test(python3.6): pyspark.sql.types (6s)
Finished test(python3.6): pyspark.sql.streaming (13s)
Finished test(python3.6): pyspark.streaming.util (0s)
Finished test(python3.6): pyspark.sql.session (16s)
Finished test(python3.6): pyspark.sql.window (4s)
Finished test(python3.6): pyspark.sql.readwriter (35s)
Tests passed in 433 seconds
```

Author: hyukjinkwon <[email protected]>

Closes apache#16429 from HyukjinKwon/SPARK-19019.
## What changes were proposed in this pull request?

`dropDuplicates` will create an Alias using the same exprId, so `StreamExecution` should also replace Alias if necessary.

## How was this patch tested?

test("SPARK-19065: dropDuplicates should not create expressions using the same id")

Author: Shixiong Zhu <[email protected]>

Closes apache#16564 from zsxwing/SPARK-19065.
…partition spec

### What changes were proposed in this pull request?
Empty partition column values are not valid in a partition specification. Before this PR, we accepted them; the Hive metastore does not detect or disallow them either. Thus, users hit the following surprising behavior.

```Scala
val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name")
df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable")
spark.sql("alter table partitionedTable drop partition(partCol1='')")
spark.table("partitionedTable").show()
```

In the above example, the WHOLE table is DROPPED when users specify a partition spec containing only one partition column with empty values.

When there is more than one partition column, the Hive metastore APIs simply ignore the columns with empty values and treat the spec as a partial spec. This is also unexpected and does not follow actual Hive behavior. This PR disallows such invalid partition specs in the `SessionCatalog` APIs.

### How was this patch tested?
Added test cases

Author: gatorsmile <[email protected]>

Closes apache#16583 from gatorsmile/disallowEmptyPartColValue.
…ify the column in jdbc API

## What changes were proposed in this pull request?

The `jdbc` API does not check `lowerBound` and `upperBound` when `column` is specified, and just throws the following exception:

>```int() argument must be a string or a number, not 'NoneType'```

If we check the parameters, we can give a friendlier suggestion.
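For reference, a sketch of the equivalent Scala call showing which partitioning parameters belong together (URL and table name are placeholders; assumes an existing `SparkSession` named `spark`):

```scala
import java.util.Properties

// When `columnName` is given, `lowerBound`, `upperBound` and `numPartitions`
// must be supplied as well; this PR adds a clearer error when they are missing.
val props = new Properties()
val df = spark.read.jdbc(
  url = "jdbc:postgresql://host:5432/db",   // placeholder
  table = "events",                         // placeholder
  columnName = "id",
  lowerBound = 0L,
  upperBound = 1000000L,
  numPartitions = 8,
  connectionProperties = props)
```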

## How was this patch tested?
Test using the pyspark shell, without the lowerBound and upperBound parameters.

Author: DjvuLee <[email protected]>

Closes apache#16599 from djvulee/pysparkFix.
…in.withOrigin()

## What changes were proposed in this pull request?

Remove duplicate call of reset() function in CurrentOrigin.withOrigin().

## How was this patch tested?

Existing test cases.

Author: jiangxingbo <[email protected]>

Closes apache#16615 from jiangxb1987/dummy-code.
## What changes were proposed in this pull request?
In append mode, we check whether the schema of the write is compatible with the schema of the existing data. Finding the existing schema for files can be a significant performance issue in cloud environments. This patch removes the check.

Note that for catalog tables, we always do the check, as discussed in apache#16339 (comment)

## How was this patch tested?
N/A

Closes apache#16339.

Author: Reynold Xin <[email protected]>

Closes apache#16622 from rxin/SPARK-18917.
## What changes were proposed in this pull request?

Added outer_explode, outer_posexplode, and outer_inline functions and expressions.
Also fixes a bug in GenerateExec.scala for CollectionGenerator: previously the outer case was not handled correctly for empty collections, only for nulls.
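An illustrative sketch of the intended behavior, assuming the DataFrame API exposes the outer variant as `explode_outer` (as in later Spark releases) and an existing `SparkSession` named `spark`:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// The outer variant keeps rows whose collection is null or empty,
// emitting null for the generated column instead of dropping the row.
val df = Seq(
  (1, Seq("a", "b")),
  (2, Seq.empty[String]),
  (3, null.asInstanceOf[Seq[String]])
).toDF("id", "items")

df.select($"id", explode_outer($"items")).show()
// ids 2 and 3 survive with a null value, unlike with plain explode
```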

## How was this patch tested?

New tests added to GeneratorFunctionSuite

Author: Bogdan Raducanu <[email protected]>

Closes apache#16608 from bogdanrdc/SPARK-13721.
…,LiR

## What changes were proposed in this pull request?

Add instrumentation for MLP, NB, LDA, AFT, GLM, Isotonic, and LiR.
## How was this patch tested?

local test in spark-shell

Author: Zheng RuiFeng <[email protected]>
Author: Ruifeng Zheng <[email protected]>

Closes apache#15671 from zhengruifeng/lir_instr.
## What changes were proposed in this pull request?

Inserting data into Hive tables has its own implementation that is distinct from data sources: `InsertIntoHiveTable`, `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`.

Note that one other major difference is that data source tables write directly to the final destination without using a staging directory, and then Spark itself adds the partitions/tables to the catalog. Hive tables actually write to a staging directory, and then call the Hive metastore's loadPartition/loadTable functions to load that data in. So we still need to keep `InsertIntoHiveTable` to hold this special logic. In the future, we should think about writing directly to the Hive table location, so that we don't need to call `loadTable`/`loadPartition` at the end and can remove `InsertIntoHiveTable`.

This PR removes `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`, and creates a `HiveFileFormat` to implement the write logic. In the future, we should also implement the read logic in `HiveFileFormat`.

## How was this patch tested?

existing tests

Author: Wenchen Fan <[email protected]>

Closes apache#16517 from cloud-fan/insert-hive.
## What changes were proposed in this pull request?
Remove unused imports and outdated comments, and fix some minor code style issues.

## How was this patch tested?
existing ut

Author: uncleGen <[email protected]>

Closes apache#16591 from uncleGen/SPARK-19227.
**What changes were proposed in this pull request?**

Use Hadoop 2.6.5 for the Hadoop 2.6 profile; the release notes list a number of fixes, including security ones, that we should pick up.

**How was this patch tested?**

Running the unit tests with IBM's SDK for Java; we'll see what happens with OpenJDK in the community builders. No trouble is expected, as it is only a minor release.

Author: Adam Roberts <[email protected]>

Closes apache#16616 from a-roberts/Hadoop265Bumper.
## What changes were proposed in this pull request?

On CREATE/ALTER VIEW, it is no longer necessary to generate a SQL text string from the LogicalPlan; instead we store the SQL query text, the output column names of the query plan, and the current database in the CatalogTable. Permanent views created by this approach can be resolved by the current view resolution approach.

The main advantages include:
1. If you update an underlying view, the current view also gets updated;
2. It gives us a chance to get rid of SQL generation for operators.

Major changes of this PR:
1. Generate the view-specific properties (e.g. view default database, view query output column names) during permanent view creation and store them as properties in the CatalogTable;
2. Update the commands `CreateViewCommand` and `AlterViewAsCommand`, getting rid of SQL generation from them.

## How was this patch tested?
Existing tests.

Author: jiangxingbo <[email protected]>

Closes apache#16613 from jiangxb1987/view-write-path.
…which are based on HadoopRDD or NewHadoopRDD

## What changes were proposed in this pull request?

For some datasources which are based on HadoopRDD or NewHadoopRDD, such as spark-xml, InputFileBlockHolder doesn't work with Python UDF.

To reproduce it, run the following code with `bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1`:

    from pyspark.sql.functions import udf,input_file_name
    from pyspark.sql.types import StringType
    from pyspark.sql import SparkSession

    def filename(path):
        return path

    session = SparkSession.builder.appName('APP').getOrCreate()

    session.udf.register('sameText', filename)
    sameText = udf(filename, StringType())

    df = session.read.format('xml').load('a.xml', rowTag='root').select('*', input_file_name().alias('file'))
    df.select('file').show() # works
    df.select(sameText(df['file'])).show()   # returns empty content

The issue is that in `HadoopRDD` and `NewHadoopRDD` we set the file block's info in `InputFileBlockHolder` before the returned iterator begins to be consumed. `InputFileBlockHolder` records this info in a thread-local variable. When running Python UDFs in a batch, we set up another thread to consume the iterator from the child plan's output RDD, so the info cannot be read back from that other thread.

To fix this, we have to set the info in `InputFileBlockHolder` once the iterator begins to be consumed, so the info can be read in the correct thread.
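A minimal sketch of that idea (illustrative names; in Spark the thread-local would be set via `InputFileBlockHolder.set`):

```scala
// Wrap an iterator so the per-file info is recorded lazily, from the thread
// that actually consumes the iterator, not the thread that constructed it.
def withFileInfo[T](file: String, start: Long, length: Long,
    underlying: Iterator[T]): Iterator[T] = new Iterator[T] {
  private var initialized = false
  private def ensureInfo(): Unit = {
    if (!initialized) {
      // In Spark this would be: InputFileBlockHolder.set(file, start, length)
      initialized = true
    }
  }
  override def hasNext: Boolean = { ensureInfo(); underlying.hasNext }
  override def next(): T = { ensureInfo(); underlying.next() }
}
```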

## How was this patch tested?

Manually tested with the above example code for the spark-xml package on pyspark: `bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1`.

Added pyspark test.


Author: Liang-Chi Hsieh <[email protected]>

Closes apache#16585 from viirya/fix-inputfileblock-hadooprdd.
…park release

## What changes were proposed in this pull request?

When R starts as a package and needs to download the Spark release distribution, we need to handle errors from the download and untar steps, and clean up; otherwise it will get stuck.

## How was this patch tested?

manually

Author: Felix Cheung <[email protected]>

Closes apache#16589 from felixcheung/rtarreturncode.
…eceiver idempotent.

## What changes were proposed in this pull request?

Method canCommit sends AskPermissionToCommitOutput using askWithRetry. If it times out, it will send the message again, so AskPermissionToCommitOutput can be received multiple times. Method canCommit should return the same value when called by the same attempt multiple times.

In the implementation before this fix, handleAskPermissionToCommit just checks whether a committer is already registered, which is not enough. When a worker retries AskPermissionToCommitOutput it gets a CommitDeniedException, then the task fails with reason TaskCommitDenied, which is not regarded as a task failure (SPARK-11178), so TaskScheduler will reschedule the task indefinitely.

This fix replaces `askWithRetry` with `ask` in `canCommit` and makes the receiver idempotent.
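A self-contained sketch of what an idempotent permission check could look like (simplified types; not the actual OutputCommitCoordinator code):

```scala
import scala.collection.mutable

object CommitCoordinatorSketch {
  // (stage, partition) -> attempt number that holds the commit permission
  private val authorizedCommitters = mutable.Map.empty[(Int, Int), Int]

  def handleAskPermissionToCommit(stage: Int, partition: Int, attempt: Int): Boolean =
    synchronized {
      authorizedCommitters.get((stage, partition)) match {
        case None =>
          authorizedCommitters((stage, partition)) = attempt
          true
        case Some(existing) =>
          existing == attempt // a retried ask from the same attempt gets the same answer
      }
    }
}
```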

## How was this patch tested?

Added a new unit test to OutputCommitCoordinatorSuite.

Author: jinxing <[email protected]>

Closes apache#16503 from jinxing64/SPARK-18113.
…waitInitialization to avoid breaking tests

## What changes were proposed in this pull request?

apache#16492 missed one race condition: `StreamExecution.awaitInitialization` may throw fatal errors and fail the test. This PR just ignores `StreamingQueryException` thrown from `awaitInitialization` so that we can verify the exception in the `ExpectFailure` action later. It's fine since `StopStream` or `ExpectFailure` will catch `StreamingQueryException` as well.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes apache#16567 from zsxwing/SPARK-19113-2.
… error

## What changes were proposed in this pull request?

We should call `StateStore.abort()` when any error occurs before the store is committed.
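A minimal sketch of the intended pattern (simplified `StateStore`-like trait for illustration; not the actual Spark interface):

```scala
trait StateStoreSketch {
  def commit(): Unit
  def abort(): Unit
}

// Ensure the store is either committed or aborted, never left dangling
// when the body throws before commit.
def withStore[T](store: StateStoreSketch)(body: => T): T = {
  var committed = false
  try {
    val result = body
    store.commit()
    committed = true
    result
  } finally {
    if (!committed) store.abort()
  }
}
```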

## How was this patch tested?

Manually.

Author: Liwei Lin <[email protected]>

Closes apache#16547 from lw-lin/append-filter.
…ner to not block UI when generating Streaming jobs

## What changes were proposed in this pull request?

When DStreamGraph is generating a job, it holds a lock and blocks other APIs. Because StreamingJobProgressListener (numInactiveReceivers, streamName(streamId: Int), streamIds) needs to call DStreamGraph's methods to access some information, the UI may hang if generating a job is very slow (e.g., talking to a slow Kafka cluster to fetch metadata).
It's better to optimize the locks in DStreamGraph and StreamingJobProgressListener so the UI is not blocked by job generation.

## How was this patch tested?
existing ut

cc zsxwing

Author: uncleGen <[email protected]>

Closes apache#16601 from uncleGen/SPARK-19182.
WeichenXu123 and others added 18 commits January 26, 2017 20:10
…tiplication and solve potential OOM and low parallelism usage problem By split middle dimension in matrix multiplication

## What changes were proposed in this pull request?

### The problem in the current block matrix multiplication

As described in JIRA https://issues.apache.org/jira/browse/SPARK-18218, block matrix multiplication in Spark may cause problems. Suppose we have an `M*N` matrix A multiplied by an `N*P` matrix B; when N is much larger than M and P, the following problems may occur:
- when the middle dimension N is too large, it will cause reducer OOM.
- even if OOM does not occur, parallelism will still be too low.
- when N is much larger than M and P, and matrices A and B have many partitions, there may be too many partitions on the M and P dimensions, which causes a much larger shuffled data size. (I will explain this in detail below.)

### Key point of my improvement

In this PR, I introduce a `numMidDimSplits` parameter and improve the algorithm to resolve this problem.

In order to understand the improvement in this PR, let me first give a simple case to explain how the current multiplication works and what causes the problems above:

suppose we have block matrix A, contains 200 blocks (`2 numRowBlocks * 100 numColBlocks`), blocks arranged in 2 rows, 100 cols:
```
A00 A01 A02 ... A0,99
A10 A11 A12 ... A1,99
```
and we have block matrix B, also contains 200 blocks (`100 numRowBlocks * 2 numColBlocks`), blocks arranged in 100 rows, 2 cols:
```
B00    B01
B10    B11
B20    B21
...
B99,0  B99,1
```
Suppose all blocks in the two matrices are dense for now.
Now we call A.multiply(B). Suppose the generated `resultPartitioner` contains 2 rowPartitions and 2 colPartitions (there can't be more partitions because the result matrix only contains `2 * 2` blocks); the current algorithm then involves two shuffle steps:

**step-1**
Step-1 generates 4 reducers, which I tag as reducer-00, reducer-01, reducer-10, and reducer-11, and the data is shuffled as follows:
```
A00 A01 A02 ... A0,99
B00 B10 B20 ... B99,0    shuffled into reducer-00

A00 A01 A02 ... A0,99
B01 B11 B21 ... B99,1    shuffled into reducer-01

A10 A11 A12 ... A1,99
B00 B10 B20 ... B99,0    shuffled into reducer-10

A10 A11 A12 ... A1,99
B01 B11 B21 ... B99,1    shuffled into reducer-11
```

The shuffling above is a `cogroup` transform; note that each reducer contains **only one group**.

**step-2**
Step-2 does an `aggregateByKey` transform on the result of step-1, also using 4 reducers, and generates the final result RDD, which contains 4 partitions, each holding one block.

The main problems are in step-1. We have only 4 reducers, but matrices A and B have 400 blocks in total, so the reducer number is obviously too small.
Also, each reducer contains only one group (in the `cogroup` sense), and each group contains 200 blocks. This is terrible because the `cogroup` transform loads each group into memory when computing, so the algorithm does not scale. Suppose matrix A has 10000 column blocks or more instead of 100: then each reducer will load 20000 blocks into memory, which will easily cause reducer OOM.

This PR tries to resolve the problem described above.
When a matrix A with dimensions M * N multiplies a matrix B with dimensions N * P, the middle dimension N is the key point. If N is large, the current multiplication implementation works badly.
In this PR, I introduce a `numMidDimSplits` parameter, representing how many splits to cut on the middle dimension N.
Still using the example described above, if we now set `numMidDimSplits = 10`, we can generate 40 reducers in **step-1**:

Each reducer-ij above is now split into 10 reducers: reducer-ij0, reducer-ij1, ..., reducer-ij9, and each reducer receives 20 blocks.
The shuffle now works as follows:

**reducer-000 to reducer-009**
```
A0,0 A0,10 A0,20 ... A0,90
B0,0 B10,0 B20,0 ... B90,0    shuffled into reducer-000

A0,1 A0,11 A0,21 ... A0,91
B1,0 B11,0 B21,0 ... B91,0    shuffled into reducer-001

A0,2 A0,12 A0,22 ... A0,92
B2,0 B12,0 B22,0 ... B92,0    shuffled into reducer-002

...

A0,9 A0,19 A0,29 ... A0,99
B9,0 B19,0 B29,0 ... B99,0    shuffled into reducer-009
```

**reducer-010 to reducer-019**
```
A0,0 A0,10 A0,20 ... A0,90
B0,1 B10,1 B20,1 ... B90,1    shuffled into reducer-010

A0,1 A0,11 A0,21 ... A0,91
B1,1 B11,1 B21,1 ... B91,1    shuffled into reducer-011

A0,2 A0,12 A0,22 ... A0,92
B2,1 B12,1 B22,1 ... B92,1    shuffled into reducer-012

...

A0,9 A0,19 A0,29 ... A0,99
B9,1 B19,1 B29,1 ... B99,1    shuffled into reducer-019
```

**reducer-100 to reducer-109** and **reducer-110 to reducer-119** are similar to the above; I omit them here.

### API for this optimized algorithm

I add a new API as follows:
```
  def multiply(
      other: BlockMatrix,
      numMidDimSplits: Int // middle dimension split number, explained above
): BlockMatrix
```
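A hedged usage sketch of the new overload (tiny single-block matrices, just to show the call; assumes an existing `SparkContext` named `sc`):

```scala
import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// Two 2x2 single-block matrices, multiplied with an explicit middle-dimension split.
val blocksA = sc.parallelize(Seq(((0, 0), Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0)))))
val blocksB = sc.parallelize(Seq(((0, 0), Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0)))))
val A = new BlockMatrix(blocksA, 2, 2)
val B = new BlockMatrix(blocksB, 2, 2)
val C = A.multiply(B, 2) // numMidDimSplits = 2; larger values suit a large middle dimension
```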

### Shuffled data size analysis (compared under the same parallelism)

The optimization has a subtle influence on the total shuffled data size. An appropriate `numMidDimSplits` will significantly reduce the shuffled data size,
but too large a `numMidDimSplits` may instead increase it. For now I don't want to introduce a formula and make things too complex, so I only use a simple case to illustrate it here:

Suppose we have two square matrices X and Y of the same size, both with `16 numRowBlocks * 16 numColBlocks`, and both dense. Now let me analyze the shuffled data size in the following cases:

**case 1: X and Y both partitioned in 16 rowPartitions and 16 colPartitions, numMidDimSplits = 1**
ShufflingDataSize = (16 * 16 * (16 + 16) + 16 * 16) blocks = 8448 blocks
parallelism = 16 * 16 * 1 = 256 // use the step-1 reducer count as the parallelism, because step-1 costs most of the computation time in this algorithm

**case 2: X and Y both partitioned in 8 rowPartitions and 8 colPartitions, numMidDimSplits = 4**
ShufflingDataSize = (8 * 8 * (32 + 32) + 16 * 16 * 4) blocks = 5120 blocks
parallelism = 8 * 8 * 4 = 256 // use the step-1 reducer count as the parallelism, because step-1 costs most of the computation time in this algorithm

**Both cases above have parallelism = 256.** Case 1 (`numMidDimSplits = 1`) is equivalent to the current implementation in MLlib, but case 2 shuffles only 60.6% of the data of case 1, **which shows that under the same parallelism, a proper `numMidDimSplits` significantly reduces the shuffled data size**.

## How was this patch tested?

Test suites added.
Running result:
![blockmatrix](https://cloud.githubusercontent.com/assets/19235986/21600989/5e162cc2-d1bf-11e6-868c-0ec29190b605.png)

Author: WeichenXu <[email protected]>

Closes apache#15730 from WeichenXu123/optim_block_matrix.
## What changes were proposed in this pull request?

Add an R wrapper for bisecting k-means.

As JIRA is down, I will update the title to link to the corresponding JIRA later.

## How was this patch tested?

Add new unit tests.

Author: [email protected] <[email protected]>

Closes apache#16566 from wangmiao1981/bk.
## What changes were proposed in this pull request?

With a doc note saying this converts the DataFrame into an RDD.

## How was this patch tested?

unit tests, manual tests

Author: Felix Cheung <[email protected]>

Closes apache#16668 from felixcheung/rgetnumpartitions.
## What changes were proposed in this pull request?
I propose to add the full Tweedie family to the GeneralizedLinearRegression model. The Tweedie family is characterized by a power variance function. Currently supported distributions such as the Gaussian, Poisson and Gamma families are special cases of the Tweedie: https://en.wikipedia.org/wiki/Tweedie_distribution.
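A hedged usage sketch, assuming the Tweedie family is selected via `setFamily("tweedie")` together with a variance power, and assuming an existing `SparkSession` named `spark` (the tiny training data is made up for illustration):

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.GeneralizedLinearRegression
import spark.implicits._

// Hypothetical toy training data: positive labels, two features.
val training = Seq(
  (1.0, Vectors.dense(0.0, 1.0)),
  (2.0, Vectors.dense(1.0, 0.5)),
  (3.0, Vectors.dense(1.5, 1.0)),
  (4.0, Vectors.dense(2.0, 1.5))
).toDF("label", "features")

val glr = new GeneralizedLinearRegression()
  .setFamily("tweedie")
  .setVariancePower(1.5) // 1 < p < 2 gives a compound Poisson-Gamma model

val model = glr.fit(training)
```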

yanboliang srowen sethah

Author: actuaryzhang <[email protected]>
Author: Wayne Zhang <[email protected]>

Closes apache#16344 from actuaryzhang/tweedie.
…Java

## What changes were proposed in this pull request?

This PR fixes two things:

a javadoc8 break,

```
[error] .../spark/sql/hive/target/java/org/apache/spark/sql/hive/FindHiveSerdeTable.java:3: error: reference not found
[error]  * Replaces {link SimpleCatalogRelation} with {link MetastoreRelation} if its table provider is hive.
```

and the `StructType` example, rewritten as a self-contained example as below:

```scala
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val struct =
  StructType(
    StructField("a", IntegerType, true) ::
    StructField("b", LongType, false) ::
    StructField("c", BooleanType, false) :: Nil)

// Extract a single StructField.
val singleField = struct("b")
// singleField: StructField = StructField(b,LongType,false)

// If this struct does not have a field called "d", it throws an exception.
struct("d")
// java.lang.IllegalArgumentException: Field "d" does not exist.
//   ...

// Extract multiple StructFields. Field names are provided in a set.
// A StructType object will be returned.
val twoFields = struct(Set("b", "c"))
// twoFields: StructType =
//   StructType(StructField(b,LongType,false), StructField(c,BooleanType,false))

// Any names without matching fields will throw an exception.
// For the case shown below, an exception is thrown due to "d".
struct(Set("b", "c", "d"))
// java.lang.IllegalArgumentException: Field "d" does not exist.
//    ...
```

```scala
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val innerStruct =
  StructType(
    StructField("f1", IntegerType, true) ::
    StructField("f2", LongType, false) ::
    StructField("f3", BooleanType, false) :: Nil)

val struct = StructType(
  StructField("a", innerStruct, true) :: Nil)

// Create a Row with the schema defined by struct
val row = Row(Row(1, 2, true))
```

Also, when the column is missing, it now throws an exception rather than ignoring it.

## How was this patch tested?

Manually via `sbt unidoc`.

- Scaladoc

  <img width="665" alt="2017-01-26 12 54 13" src="https://cloud.githubusercontent.com/assets/6477701/22297905/1245620e-e362-11e6-9e22-43bb8d9871af.png">

- Javadoc

  <img width="722" alt="2017-01-26 12 54 27" src="https://cloud.githubusercontent.com/assets/6477701/22297899/0fd87e0c-e362-11e6-9033-7590bda1aea6.png">

  <img width="702" alt="2017-01-26 12 54 32" src="https://cloud.githubusercontent.com/assets/6477701/22297900/0fe14154-e362-11e6-9882-768381c53163.png">

Author: hyukjinkwon <[email protected]>

Closes apache#16703 from HyukjinKwon/SPARK-12970.
## What changes were proposed in this pull request?

add header

## How was this patch tested?

Manual run to check vignettes html is created properly

Author: Felix Cheung <[email protected]>

Closes apache#16709 from felixcheung/rfilelicense.
…parkR

## What changes were proposed in this pull request?

This mostly affects running a job from the driver in client mode when results are expected on stdout (which should be somewhat rare, but possible).

Before:
```
> a <- as.DataFrame(cars)
> b <- group_by(a, "dist")
> c <- count(b)
> sparkR.callJMethod(c$countjc, "explain", TRUE)
NULL
```

After:
```
> a <- as.DataFrame(cars)
> b <- group_by(a, "dist")
> c <- count(b)
> sparkR.callJMethod(c$countjc, "explain", TRUE)
count#11L
NULL
```

Now, `column.explain()` doesn't seem very useful (we can get more extensive output with `DataFrame.explain()`), but there are other, more complex examples with calls to `println` on the Scala/JVM side whose output was getting dropped.

## How was this patch tested?

manual

Author: Felix Cheung <[email protected]>

Closes apache#16670 from felixcheung/rjvmstdout.
## What changes were proposed in this pull request?

Right now Netty RPC serializes `RequestMessage` using Java serialization, and the size of a single message (e.g., `RequestMessage(..., "hello")`) is almost 1KB.

This PR optimizes it by serializing `RequestMessage` manually (eliminating unnecessary information from most messages, e.g., the class names of `RequestMessage`, `NettyRpcEndpointRef`, ...), and reduces the above message size to 100+ bytes.

## How was this patch tested?

Jenkins

I did a simple test to measure the improvement:

Before
```
$ bin/spark-shell --master local-cluster[1,4,1024]
...
scala> for (i <- 1 to 10) {
     |   val start = System.nanoTime
     |   val s = sc.parallelize(1 to 1000000, 10 * 1000).count()
     |   val end = System.nanoTime
     |   println(s"$i\t" + ((end - start)/1000/1000))
     | }
1       6830
2       4353
3       3322
4       3107
5       3235
6       3139
7       3156
8       3166
9       3091
10      3029
```
After:
```
$ bin/spark-shell --master local-cluster[1,4,1024]
...
scala> for (i <- 1 to 10) {
     |   val start = System.nanoTime
     |   val s = sc.parallelize(1 to 1000000, 10 * 1000).count()
     |   val end = System.nanoTime
     |   println(s"$i\t" + ((end - start)/1000/1000))
     | }
1       6431
2       3643
3       2913
4       2679
5       2760
6       2710
7       2747
8       2793
9       2679
10      2651
```

I also captured the TCP packets for this test. Before this patch, the total size of TCP packets is ~1.5GB. After it, it reduces to ~1.2GB.

Author: Shixiong Zhu <[email protected]>

Closes apache#16706 from zsxwing/rpc-opt.
## What changes were proposed in this pull request?

Add Python API for the newly added LinearSVC algorithm.

## How was this patch tested?

Add new doc string test.

Author: [email protected] <[email protected]>

Closes apache#16694 from wangmiao1981/ser.
…pper-case by HiveExternalCatalog

## What changes were proposed in this pull request?

The Hive metastore is not case preserving and keeps partition columns with lower-case names.

If Spark SQL creates a table with an upper-case partition name using HiveExternalCatalog, then when we rename a partition it first calls the HiveClient's renamePartition, which creates a new lower-case partition path; Spark SQL then renames the lower-case path to the upper-case one.

However, if the renamed partition contains more than one level, e.g. A=1/B=2, Hive's renamePartition changes it to a=1/b=2 and Spark SQL then renames it to A=1/B=2, but a=1 still exists in the filesystem; we should also delete it.

## How was this patch tested?
unit test added

Author: windpiger <[email protected]>

Closes apache#16700 from windpiger/clearUselessPathAfterRenamPartition.
## What changes were proposed in this pull request?
unpersist the input dataset if `handlePersistence` = true

## How was this patch tested?
existing tests

Author: Zheng RuiFeng <[email protected]>

Closes apache#16718 from zhengruifeng/isoReg_unpersisit.
… with upper-case by HiveExternalCatalog

### What changes were proposed in this pull request?

This PR reverts the changes made in apache#16700. They could cause data loss after a partition rename, because we have a bug in the file renaming.

Not all OSs behave the same. For example, on macOS, if we rename a path from `.../tbl/a=5/b=6` to `.../tbl/A=5/B=6`, the result is `.../tbl/a=5/B=6`, while the expected result is `.../tbl/A=5/B=6`. Thus, renaming on macOS is not recursive. However, the systems used in Jenkins do not have this issue. Although this PR is not the root cause, it exposes an existing issue in the code `tablePath.getFileSystem(hadoopConf).rename(wrongPath, rightPath)`

---

The Hive metastore is not case preserving and keeps partition columns with lower-case names.

If Spark SQL creates a table with an upper-case partition name using HiveExternalCatalog, then when we rename a partition it first calls the HiveClient's renamePartition, which creates a new lower-case partition path; Spark SQL then renames the lower-case path to the upper-case one.

However, if the renamed partition contains more than one level, e.g. A=1/B=2, Hive's renamePartition changes it to a=1/b=2 and Spark SQL then renames it to A=1/B=2, but a=1 still exists in the filesystem; we should also delete it.

### How was this patch tested?
N/A

Author: gatorsmile <[email protected]>

Closes apache#16728 from gatorsmile/revert-pr-16700.
## What changes were proposed in this pull request?

After apache#16552, `CreateHiveTableAsSelectCommand` has become very similar to `CreateDataSourceTableAsSelectCommand`, and we can further simplify it by only creating the table in the table-does-not-exist branch.

This PR also adds Hive provider checking in the DataStream reader/writer, which was missed in apache#16552.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes apache#16693 from cloud-fan/minor.
## What changes were proposed in this pull request?
This PR adds the first set of tests for EXISTS subquery.

File name                        | Brief description
------------------------| -----------------
exists-basic.sql              |Tests EXISTS and NOT EXISTS subqueries with both correlated and local predicates.
exists-within-and-or.sql|Tests EXISTS and NOT EXISTS subqueries embedded in AND or OR expression.

DB2 results are attached here as reference :

[exists-basic-db2.txt](https://github.com/apache/spark/files/733031/exists-basic-db2.txt)
[exists-and-or-db2.txt](https://github.com/apache/spark/files/733030/exists-and-or-db2.txt)

## How was this patch tested?
This patch is adding tests.

Author: Dilip Biswal <[email protected]>

Closes apache#16710 from dilipbiswal/exist-basic.
…ot expect such cases that l.hashcode > r.hashcode

## What changes were proposed in this pull request?

During canonicalization, the `NOT(...(l, r))` rules should not expect cases where `l.hashcode > r.hashcode`.

Take the rule `case NOT(GreaterThan(l, r)) if l.hashcode > r.hashcode` for example: it should never be matched, since `GreaterThan(l, r)` itself would be re-written as `GreaterThan(r, l)` given `l.hashcode > r.hashcode` after canonicalization.

This patch consolidates rules like `case NOT(GreaterThan(l, r)) if l.hashcode > r.hashcode` and `case NOT(GreaterThan(l, r))`.
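A toy, self-contained sketch (not the Catalyst code) of why the guarded rule is redundant once comparisons are canonicalized first:

```scala
// Simplified expression ADT, purely for illustration.
sealed trait Expr
case class Attr(name: String) extends Expr
case class GreaterThan(l: Expr, r: Expr) extends Expr
case class LessThanOrEqual(l: Expr, r: Expr) extends Expr
case class Not(child: Expr) extends Expr

// By the time NOT is simplified, comparisons have already been put into a
// canonical operand order, so one unguarded rule per operator suffices.
def simplifyNot(e: Expr): Expr = e match {
  case Not(GreaterThan(l, r)) => LessThanOrEqual(l, r) // no hashCode guard needed
  case other                  => other
}
```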

## How was this patch tested?

This patch expanded the `NOT` test case to cover both cases where:
- `l.hashcode > r.hashcode`
- `l.hashcode < r.hashcode`

Author: Liwei Lin <[email protected]>

Closes apache#16719 from lw-lin/canonicalize.
## What changes were proposed in this pull request?

This removes from the `__all__` list the class names that are not defined (visible) in `pyspark.sql.column`.

## How was this patch tested?

Existing unit tests.

Author: zero323 <[email protected]>

Closes apache#16742 from zero323/SPARK-19403.
tdas and others added 6 commits January 30, 2017 13:29
### What changes were proposed in this pull request?
JDBC options are case-insensitive after PR apache#15884 was merged into Spark 2.1.

### How was this patch tested?
N/A

Author: gatorsmile <[email protected]>

Closes apache#16734 from gatorsmile/fixDocCaseInsensitive.
### What changes were proposed in this pull request?
Currently, the function `to_json` allows users to provide options for generating JSON. However, it does not pass them to `JacksonGenerator`, so the user-provided options are ignored. This PR fixes that. Below is an example.

```Scala
val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a")
val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")
df.select(to_json($"a", options)).show(false)
```
The current output is like
```
+--------------------------------------+
|structtojson(a)                       |
+--------------------------------------+
|{"_1":"2015-08-26T18:00:00.000-07:00"}|
+--------------------------------------+
```

After the fix, the output is like
```
+-------------------------+
|structtojson(a)          |
+-------------------------+
|{"_1":"26/08/2015 18:00"}|
+-------------------------+
```
### How was this patch tested?
Added test cases for both `from_json` and `to_json`

Author: gatorsmile <[email protected]>

Closes apache#16745 from gatorsmile/toJson.
## What changes were proposed in this pull request?

With extract `[[` or replace `[[<-`, the parameter `i` is a column index; that needs to be corrected in the doc. Also a few minor updates: examples, links.

## How was this patch tested?

manual

Author: Felix Cheung <[email protected]>

Closes apache#16721 from felixcheung/rsubsetdoc.
## What changes were proposed in this pull request?

According to the discussion on apache#16281, which tried to upgrade to Apache Parquet 1.9.0, the Apache Spark community prefers to upgrade to 1.8.2 instead of 1.9.0. Apache Parquet 1.8.2 was officially released last week, on 26 Jan, so we can use it now.

https://lists.apache.org/thread.html/af0c813f1419899289a336d96ec02b3bbeecaea23aa6ef69f435c142%3Cdev.parquet.apache.org%3E

This PR only aims to bump the Parquet version to 1.8.2. It does not touch any other code.

## How was this patch tested?

Pass the existing tests and also manually by doing `./dev/test-dependencies.sh`.

Author: Dongjoon Hyun <[email protected]>

Closes apache#16751 from dongjoon-hyun/SPARK-19409.
@tdas tdas closed this Jan 31, 2017
tdas pushed a commit that referenced this pull request Sep 4, 2020
### What changes were proposed in this pull request?

To support formatted explain for AQE.

### Why are the changes needed?

AQE does not support formatted explain yet. It's good to support it for better user experience, debugging, etc.

Before:
```
== Physical Plan ==
AdaptiveSparkPlan (1)
+- * HashAggregate (unknown)
   +- CustomShuffleReader (unknown)
      +- ShuffleQueryStage (unknown)
         +- Exchange (unknown)
            +- * HashAggregate (unknown)
               +- * Project (unknown)
                  +- * BroadcastHashJoin Inner BuildRight (unknown)
                     :- * LocalTableScan (unknown)
                     +- BroadcastQueryStage (unknown)
                        +- BroadcastExchange (unknown)
                           +- LocalTableScan (unknown)

(1) AdaptiveSparkPlan
Output [4]: [k#7, count(v1)#32L, sum(v1)#33L, avg(v2)#34]
Arguments: HashAggregate(keys=[k#7], functions=[count(1), sum(cast(v1#8 as bigint)), avg(cast(v2#19 as bigint))]), AdaptiveExecutionContext(org.apache.spark.sql.SparkSession104ab57b), [PlanAdaptiveSubqueries(Map())], false
```

After:
```
== Physical Plan ==
 AdaptiveSparkPlan (14)
 +- * HashAggregate (13)
    +- CustomShuffleReader (12)
       +- ShuffleQueryStage (11)
          +- Exchange (10)
             +- * HashAggregate (9)
                +- * Project (8)
                   +- * BroadcastHashJoin Inner BuildRight (7)
                      :- * Project (2)
                      :  +- * LocalTableScan (1)
                      +- BroadcastQueryStage (6)
                         +- BroadcastExchange (5)
                            +- * Project (4)
                               +- * LocalTableScan (3)

 (1) LocalTableScan [codegen id : 2]
 Output [2]: [_1#x, _2#x]
 Arguments: [_1#x, _2#x]

 (2) Project [codegen id : 2]
 Output [2]: [_1#x AS k#x, _2#x AS v1#x]
 Input [2]: [_1#x, _2#x]

 (3) LocalTableScan [codegen id : 1]
 Output [2]: [_1#x, _2#x]
 Arguments: [_1#x, _2#x]

 (4) Project [codegen id : 1]
 Output [2]: [_1#x AS k#x, _2#x AS v2#x]
 Input [2]: [_1#x, _2#x]

 (5) BroadcastExchange
 Input [2]: [k#x, v2#x]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#x]

 (6) BroadcastQueryStage
 Output [2]: [k#x, v2#x]
 Arguments: 0

 (7) BroadcastHashJoin [codegen id : 2]
 Left keys [1]: [k#x]
 Right keys [1]: [k#x]
 Join condition: None

 (8) Project [codegen id : 2]
 Output [3]: [k#x, v1#x, v2#x]
 Input [4]: [k#x, v1#x, k#x, v2#x]

 (9) HashAggregate [codegen id : 2]
 Input [3]: [k#x, v1#x, v2#x]
 Keys [1]: [k#x]
 Functions [3]: [partial_count(1), partial_sum(cast(v1#x as bigint)), partial_avg(cast(v2#x as bigint))]
 Aggregate Attributes [4]: [count#xL, sum#xL, sum#x, count#xL]
 Results [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]

 (10) Exchange
 Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
 Arguments: hashpartitioning(k#x, 5), true, [id=#x]

 (11) ShuffleQueryStage
 Output [5]: [sum#xL, k#x, sum#x, count#xL, count#xL]
 Arguments: 1

 (12) CustomShuffleReader
 Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
 Arguments: coalesced

 (13) HashAggregate [codegen id : 3]
 Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
 Keys [1]: [k#x]
 Functions [3]: [count(1), sum(cast(v1#x as bigint)), avg(cast(v2#x as bigint))]
 Aggregate Attributes [3]: [count(1)#xL, sum(cast(v1#x as bigint))#xL, avg(cast(v2#x as bigint))#x]
 Results [4]: [k#x, count(1)#xL AS count(v1)#xL, sum(cast(v1#x as bigint))#xL AS sum(v1)#xL, avg(cast(v2#x as bigint))#x AS avg(v2)#x]

 (14) AdaptiveSparkPlan
 Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x]
 Arguments: isFinalPlan=true
```

### Does this PR introduce any user-facing change?

No, this should be a new feature along with AQE in Spark 3.0.

### How was this patch tested?

Added a query file: `explain-aqe.sql` and a unit test.
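For reference, a hedged sketch of how a formatted plan like the "After" output above would be produced with AQE enabled, assuming the `Dataset.explain(mode)` API with mode "formatted" and an existing `SparkSession` named `spark`:

```scala
import org.apache.spark.sql.functions.col

spark.conf.set("spark.sql.adaptive.enabled", "true")

val df = spark.range(100).groupBy((col("id") % 5).as("k")).count()
df.explain("formatted") // prints a numbered, formatted plan
```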

Closes apache#28271 from Ngone51/support_formatted_explain_for_aqe.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
tdas pushed a commit that referenced this pull request Sep 4, 2020
…or its output partitioning

### What changes were proposed in this pull request?

Currently, the `BroadcastHashJoinExec`'s `outputPartitioning` only uses the streamed side's `outputPartitioning`. However, if the join type of `BroadcastHashJoinExec` is an inner-like join, the build side's info (the join keys) can be added to `BroadcastHashJoinExec`'s `outputPartitioning`.

 For example,
```Scala
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500")
val t1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val t2 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i2", "j2")
val t3 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i3", "j3")
val t4 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i4", "j4")

// join1 is a sort merge join.
val join1 = t1.join(t2, t1("i1") === t2("i2"))

// join2 is a broadcast join where t3 is broadcasted.
val join2 = join1.join(t3, join1("i1") === t3("i3"))

// Join on the column from the broadcasted side (i3).
val join3 = join2.join(t4, join2("i3") === t4("i4"))

join3.explain
```
You see that `Exchange hashpartitioning(i3#29, 200)` is introduced because there is no output partitioning info from the build side.
```
== Physical Plan ==
*(6) SortMergeJoin [i3#29], [i4#40], Inner
:- *(4) Sort [i3#29 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i3#29, 200), true, [id=apache#55]
:     +- *(3) BroadcastHashJoin [i1#7], [i3#29], Inner, BuildRight
:        :- *(3) SortMergeJoin [i1#7], [i2#18], Inner
:        :  :- *(1) Sort [i1#7 ASC NULLS FIRST], false, 0
:        :  :  +- Exchange hashpartitioning(i1#7, 200), true, [id=#28]
:        :  :     +- LocalTableScan [i1#7, j1#8]
:        :  +- *(2) Sort [i2#18 ASC NULLS FIRST], false, 0
:        :     +- Exchange hashpartitioning(i2#18, 200), true, [id=#29]
:        :        +- LocalTableScan [i2#18, j2#19]
:        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#34]
:           +- LocalTableScan [i3#29, j3#30]
+- *(5) Sort [i4#40 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i4#40, 200), true, [id=apache#39]
      +- LocalTableScan [i4#40, j4#41]
```
This PR proposes to introduce output partitioning for the build side for `BroadcastHashJoinExec` if the streamed side has a `HashPartitioning` or a collection of `HashPartitioning`s.

There is a new internal config `spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit`, which limits the number of partitionings a `HashPartitioning` can expand to. It can be set to "0" to disable this feature.
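A minimal sketch of setting the config named above (assuming an existing `SparkSession` named `spark`); "0" disables the expansion:

```scala
// Internal config introduced by this PR; lower values limit how many
// partitionings a HashPartitioning may expand to, and 0 disables the feature.
spark.conf.set("spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit", "0")
```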

### Why are the changes needed?

To remove unnecessary shuffle.

### Does this PR introduce _any_ user-facing change?

Yes, now the shuffle in the above example can be eliminated:
```
== Physical Plan ==
*(5) SortMergeJoin [i3#108], [i4#119], Inner
:- *(3) Sort [i3#108 ASC NULLS FIRST], false, 0
:  +- *(3) BroadcastHashJoin [i1#86], [i3#108], Inner, BuildRight
:     :- *(3) SortMergeJoin [i1#86], [i2#97], Inner
:     :  :- *(1) Sort [i1#86 ASC NULLS FIRST], false, 0
:     :  :  +- Exchange hashpartitioning(i1#86, 200), true, [id=apache#120]
:     :  :     +- LocalTableScan [i1#86, j1#87]
:     :  +- *(2) Sort [i2#97 ASC NULLS FIRST], false, 0
:     :     +- Exchange hashpartitioning(i2#97, 200), true, [id=apache#121]
:     :        +- LocalTableScan [i2#97, j2#98]
:     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=apache#126]
:        +- LocalTableScan [i3#108, j3#109]
+- *(4) Sort [i4#119 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i4#119, 200), true, [id=apache#130]
      +- LocalTableScan [i4#119, j4#120]
```

### How was this patch tested?

Added new tests.

Closes apache#28676 from imback82/broadcast_join_output.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
tdas pushed a commit that referenced this pull request Sep 9, 2020
### What changes were proposed in this pull request?

To support formatted explain for AQE.

### Why are the changes needed?

AQE does not support formatted explain yet. It's good to support it for better user experience, debugging, etc.

Before:
```
== Physical Plan ==
AdaptiveSparkPlan (1)
+- * HashAggregate (unknown)
   +- CustomShuffleReader (unknown)
      +- ShuffleQueryStage (unknown)
         +- Exchange (unknown)
            +- * HashAggregate (unknown)
               +- * Project (unknown)
                  +- * BroadcastHashJoin Inner BuildRight (unknown)
                     :- * LocalTableScan (unknown)
                     +- BroadcastQueryStage (unknown)
                        +- BroadcastExchange (unknown)
                           +- LocalTableScan (unknown)

(1) AdaptiveSparkPlan
Output [4]: [k#7, count(v1)#32L, sum(v1)#33L, avg(v2)#34]
Arguments: HashAggregate(keys=[k#7], functions=[count(1), sum(cast(v1#8 as bigint)), avg(cast(v2#19 as bigint))]), AdaptiveExecutionContext(org.apache.spark.sql.SparkSession104ab57b), [PlanAdaptiveSubqueries(Map())], false
```

After:
```
== Physical Plan ==
 AdaptiveSparkPlan (14)
 +- * HashAggregate (13)
    +- CustomShuffleReader (12)
       +- ShuffleQueryStage (11)
          +- Exchange (10)
             +- * HashAggregate (9)
                +- * Project (8)
                   +- * BroadcastHashJoin Inner BuildRight (7)
                      :- * Project (2)
                      :  +- * LocalTableScan (1)
                      +- BroadcastQueryStage (6)
                         +- BroadcastExchange (5)
                            +- * Project (4)
                               +- * LocalTableScan (3)

 (1) LocalTableScan [codegen id : 2]
 Output [2]: [_1#x, _2#x]
 Arguments: [_1#x, _2#x]

 (2) Project [codegen id : 2]
 Output [2]: [_1#x AS k#x, _2#x AS v1#x]
 Input [2]: [_1#x, _2#x]

 (3) LocalTableScan [codegen id : 1]
 Output [2]: [_1#x, _2#x]
 Arguments: [_1#x, _2#x]

 (4) Project [codegen id : 1]
 Output [2]: [_1#x AS k#x, _2#x AS v2#x]
 Input [2]: [_1#x, _2#x]

 (5) BroadcastExchange
 Input [2]: [k#x, v2#x]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#x]

 (6) BroadcastQueryStage
 Output [2]: [k#x, v2#x]
 Arguments: 0

 (7) BroadcastHashJoin [codegen id : 2]
 Left keys [1]: [k#x]
 Right keys [1]: [k#x]
 Join condition: None

 (8) Project [codegen id : 2]
 Output [3]: [k#x, v1#x, v2#x]
 Input [4]: [k#x, v1#x, k#x, v2#x]

 (9) HashAggregate [codegen id : 2]
 Input [3]: [k#x, v1#x, v2#x]
 Keys [1]: [k#x]
 Functions [3]: [partial_count(1), partial_sum(cast(v1#x as bigint)), partial_avg(cast(v2#x as bigint))]
 Aggregate Attributes [4]: [count#xL, sum#xL, sum#x, count#xL]
 Results [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]

 (10) Exchange
 Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
 Arguments: hashpartitioning(k#x, 5), true, [id=#x]

 (11) ShuffleQueryStage
 Output [5]: [sum#xL, k#x, sum#x, count#xL, count#xL]
 Arguments: 1

 (12) CustomShuffleReader
 Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
 Arguments: coalesced

 (13) HashAggregate [codegen id : 3]
 Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
 Keys [1]: [k#x]
 Functions [3]: [count(1), sum(cast(v1#x as bigint)), avg(cast(v2#x as bigint))]
 Aggregate Attributes [3]: [count(1)#xL, sum(cast(v1#x as bigint))#xL, avg(cast(v2#x as bigint))#x]
 Results [4]: [k#x, count(1)#xL AS count(v1)#xL, sum(cast(v1#x as bigint))#xL AS sum(v1)#xL, avg(cast(v2#x as bigint))#x AS avg(v2)#x]

 (14) AdaptiveSparkPlan
 Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x]
 Arguments: isFinalPlan=true
```

### Does this PR introduce any user-facing change?

No, this should be a new feature along with AQE in Spark 3.0.

### How was this patch tested?

Added a query file: `explain-aqe.sql` and a unit test.

Closes apache#28271 from Ngone51/support_formatted_explain_for_aqe.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 8fbfdb3)
Signed-off-by: Wenchen Fan <[email protected]>