Commit

Merge branch 'master' into SPARK-34037
yaooqinn committed Jan 7, 2021
2 parents 4288380 + 9b5df2a commit 238cd1c
Showing 95 changed files with 4,008 additions and 2,731 deletions.
9 changes: 5 additions & 4 deletions R/pkg/inst/worker/worker.R
@@ -196,25 +196,26 @@ if (isEmpty != 0) {
outputs <- list()
for (i in seq_len(length(data))) {
# Timing reading input data for execution
inputElap <- elapsedSecs()
computeStart <- elapsedSecs()
output <- compute(mode, partition, serializer, deserializer, keys[[i]],
colNames, computeFunc, data[[i]])
computeElap <- elapsedSecs()
if (serializer == "arrow") {
outputs[[length(outputs) + 1L]] <- output
} else {
outputResult(serializer, output, outputCon)
outputComputeElapsDiff <- outputComputeElapsDiff + (elapsedSecs() - computeElap)
}
outputElap <- elapsedSecs()
computeInputElapsDiff <- computeInputElapsDiff + (computeElap - inputElap)
outputComputeElapsDiff <- outputComputeElapsDiff + (outputElap - computeElap)
computeInputElapsDiff <- computeInputElapsDiff + (computeElap - computeStart)
}

if (serializer == "arrow") {
# See https://stat.ethz.ch/pipermail/r-help/2010-September/252046.html
# rbind.fill might be an alternative to make it faster if plyr is installed.
outputStart <- elapsedSecs()
combined <- do.call("rbind", outputs)
SparkR:::writeSerializeInArrow(outputCon, combined)
outputComputeElapsDiff <- elapsedSecs() - outputStart
}
}
} else {
@@ -366,10 +366,12 @@ private[spark] class AppStatusListener(

// Implicitly exclude every available executor for the stage associated with this node
Option(liveStages.get((stageId, stageAttemptId))).foreach { stage =>
val executorIds = liveExecutors.values.filter(_.host == hostId).map(_.executorId).toSeq
val executorIds = liveExecutors.values.filter(exec => exec.host == hostId
&& exec.executorId != SparkContext.DRIVER_IDENTIFIER).map(_.executorId).toSeq
setStageExcludedStatus(stage, now, executorIds: _*)
}
liveExecutors.values.filter(_.hostname == hostId).foreach { exec =>
liveExecutors.values.filter(exec => exec.hostname == hostId
&& exec.executorId != SparkContext.DRIVER_IDENTIFIER).foreach { exec =>
addExcludedStageTo(exec, stageId, now)
}
}
@@ -416,7 +418,7 @@ private[spark] class AppStatusListener(

// Implicitly (un)exclude every executor associated with the node.
liveExecutors.values.foreach { exec =>
if (exec.hostname == host) {
if (exec.hostname == host && exec.executorId != SparkContext.DRIVER_IDENTIFIER) {
updateExecExclusionStatus(exec, excluded, now)
}
}
@@ -16,7 +16,7 @@
"totalInputBytes" : 0,
"totalShuffleRead" : 0,
"totalShuffleWrite" : 0,
"isBlacklisted" : true,
"isBlacklisted" : false,
"maxMemory" : 908381388,
"addTime" : "2016-11-16T22:33:31.477GMT",
"executorLogs" : { },
@@ -30,7 +30,7 @@
"attributes" : { },
"resources" : { },
"resourceProfileId" : 0,
"isExcluded" : true,
"isExcluded" : false,
"excludedInStages" : [ ]
}, {
"id" : "3",
@@ -16,7 +16,7 @@
"totalInputBytes" : 0,
"totalShuffleRead" : 0,
"totalShuffleWrite" : 0,
"isBlacklisted" : true,
"isBlacklisted" : false,
"maxMemory" : 908381388,
"addTime" : "2016-11-16T22:33:31.477GMT",
"executorLogs" : { },
@@ -30,7 +30,7 @@
"attributes" : { },
"resources" : { },
"resourceProfileId" : 0,
"isExcluded" : true,
"isExcluded" : false,
"excludedInStages" : [ ]
}, {
"id" : "3",
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7-hive-2.3
@@ -58,7 +58,7 @@ curator-recipes/2.7.1//curator-recipes-2.7.1.jar
datanucleus-api-jdo/4.2.4//datanucleus-api-jdo-4.2.4.jar
datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar
datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar
derby/10.12.1.1//derby-10.12.1.1.jar
derby/10.14.2.0//derby-10.14.2.0.jar
dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
flatbuffers-java/1.9.0//flatbuffers-java-1.9.0.jar
generex/1.0.2//generex-1.0.2.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -55,7 +55,7 @@ curator-recipes/2.13.0//curator-recipes-2.13.0.jar
datanucleus-api-jdo/4.2.4//datanucleus-api-jdo-4.2.4.jar
datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar
datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar
derby/10.12.1.1//derby-10.12.1.1.jar
derby/10.14.2.0//derby-10.14.2.0.jar
dnsjava/2.1.7//dnsjava-2.1.7.jar
dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
ehcache/3.3.1//ehcache-3.3.1.jar
135 changes: 129 additions & 6 deletions docs/sql-data-sources-orc.md
@@ -19,12 +19,115 @@ license: |
limitations under the License.
---

Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format for ORC files.
To do that, the following configurations are newly added. The vectorized reader is used for the
native ORC tables (e.g., the ones created using the clause `USING ORC`) when `spark.sql.orc.impl`
is set to `native` and `spark.sql.orc.enableVectorizedReader` is set to `true`. For the Hive ORC
serde tables (e.g., the ones created using the clause `USING HIVE OPTIONS (fileFormat 'ORC')`),
the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is also set to `true`.
* Table of contents
{:toc}

[Apache ORC](https://orc.apache.org) is a columnar format that offers advanced features such as native zstd compression, bloom filters, and columnar encryption.

### ORC Implementation

Spark supports two ORC implementations (`native` and `hive`), selected via `spark.sql.orc.impl`.
The two implementations share most functionality but have different design goals.
- The `native` implementation is designed to follow Spark's data source behavior, like `Parquet`.
- The `hive` implementation is designed to follow Hive's behavior and uses Hive SerDe.

For example, historically, the `native` implementation handled `CHAR/VARCHAR` with Spark's native `String` type while the `hive` implementation handled it via Hive `CHAR/VARCHAR`, so query results differed. Since Spark 3.1.0, [SPARK-33480](https://issues.apache.org/jira/browse/SPARK-33480) removes this difference by supporting `CHAR/VARCHAR` on the Spark side.
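
As an illustrative sketch, the implementation can be switched per session with the `spark.sql.orc.impl` configuration:

<div class="codetabs">
<div data-lang="SQL" markdown="1">

{% highlight sql %}
-- Use the native, Parquet-like implementation (the default since Spark 2.3)
SET spark.sql.orc.impl=native;

-- Or switch to the Hive SerDe based implementation
SET spark.sql.orc.impl=hive;
{% endhighlight %}
</div>
</div>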

### Vectorized Reader

The `native` implementation supports a vectorized ORC reader and has been the default ORC implementation since Spark 2.3.
The vectorized reader is used for native ORC tables (e.g., the ones created using the clause `USING ORC`) when `spark.sql.orc.impl` is set to `native` and `spark.sql.orc.enableVectorizedReader` is set to `true`.
For Hive ORC serde tables (e.g., the ones created using the clause `USING HIVE OPTIONS (fileFormat 'ORC')`),
the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is also set to `true`, which is the default.
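
For example, a minimal sketch of the settings involved (all of them default to the values shown):

<div class="codetabs">
<div data-lang="SQL" markdown="1">

{% highlight sql %}
-- Vectorized reads for native ORC tables (USING ORC)
SET spark.sql.orc.impl=native;
SET spark.sql.orc.enableVectorizedReader=true;

-- Additionally required for Hive ORC serde tables
SET spark.sql.hive.convertMetastoreOrc=true;
{% endhighlight %}
</div>
</div>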

### Schema Merging

Like Protocol Buffer, Avro, and Thrift, ORC also supports schema evolution. Users can start with
a simple schema, and gradually add more columns to the schema as needed. In this way, users may end
up with multiple ORC files with different but mutually compatible schemas. The ORC data
source is now able to automatically detect this case and merge schemas of all these files.

Since schema merging is a relatively expensive operation, and is not a necessity in most cases, it is
turned off by default. You may enable it by

1. setting data source option `mergeSchema` to `true` when reading ORC files, or
2. setting the global SQL option `spark.sql.orc.mergeSchema` to `true`.
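
For example, a brief sketch showing both ways (the table name and location path are hypothetical):

<div class="codetabs">
<div data-lang="SQL" markdown="1">

{% highlight sql %}
-- 1. Per data source, via the mergeSchema option
CREATE TABLE orc_events
USING ORC
OPTIONS (mergeSchema 'true')
LOCATION '/tmp/orc/events';

-- 2. Globally for the session, via the SQL option
SET spark.sql.orc.mergeSchema=true;
{% endhighlight %}
</div>
</div>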

### Zstandard

Spark supports both Hadoop 2 and 3. Since Spark 3.2, you can take advantage
of Zstandard compression in ORC files on both Hadoop versions.
Please see [Zstandard](https://facebook.github.io/zstd/) for the benefits.

<div class="codetabs">
<div data-lang="SQL" markdown="1">

{% highlight sql %}
CREATE TABLE compressed (
key STRING,
value STRING
)
USING ORC
OPTIONS (
compression 'zstd'
)
{% endhighlight %}
</div>
</div>

### Bloom Filters

You can control bloom filters and dictionary encodings for ORC data sources. The following ORC example will create a bloom filter and use dictionary encoding only for `favorite_color`. For more detailed information about the extra ORC options, visit the official Apache ORC website.

<div class="codetabs">
<div data-lang="SQL" markdown="1">

{% highlight sql %}
CREATE TABLE users_with_options (
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
)
USING ORC
OPTIONS (
orc.bloom.filter.columns 'favorite_color',
orc.dictionary.key.threshold '1.0',
orc.column.encoding.direct 'name'
)
{% endhighlight %}
</div>
</div>

### Columnar Encryption

Since Spark 3.2, columnar encryption is supported for ORC tables with Apache ORC 1.6.
The following example uses Hadoop KMS as a key provider at the given location.
Please visit [Apache Hadoop KMS](https://hadoop.apache.org/docs/current/hadoop-kms/index.html) for details.

<div class="codetabs">
<div data-lang="SQL" markdown="1">
{% highlight sql %}
CREATE TABLE encrypted (
ssn STRING,
email STRING,
name STRING
)
USING ORC
OPTIONS (
hadoop.security.key.provider.path "kms://http@localhost:9600/kms",
orc.key.provider "hadoop",
orc.encrypt "pii:ssn,email",
orc.mask "nullify:ssn;sha256:email"
)
{% endhighlight %}
</div>
</div>

### Hive metastore ORC table conversion

When reading from and inserting into Hive metastore ORC tables, Spark SQL will try to use its own ORC support instead of the Hive SerDe for better performance. For CTAS statements, only non-partitioned Hive metastore ORC tables are converted. This behavior is controlled by the `spark.sql.hive.convertMetastoreOrc` configuration and is turned on by default.
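
If a workload needs the Hive SerDe behavior instead, the conversion can be disabled, as in this sketch:

<div class="codetabs">
<div data-lang="SQL" markdown="1">

{% highlight sql %}
-- Read and write Hive metastore ORC tables through the Hive SerDe
SET spark.sql.hive.convertMetastoreOrc=false;
{% endhighlight %}
</div>
</div>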

### Configuration

<table class="table">
<tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th><th><b>Since Version</b></th></tr>
@@ -48,4 +151,24 @@ the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is also
</td>
<td>2.3.0</td>
</tr>
<tr>
<td><code>spark.sql.orc.mergeSchema</code></td>
<td>false</td>
<td>
<p>
When true, the ORC data source merges schemas collected from all data files;
otherwise the schema is picked from a random data file.
</p>
</td>
<td>3.0.0</td>
</tr>
<tr>
<td><code>spark.sql.hive.convertMetastoreOrc</code></td>
<td>true</td>
<td>
When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of the built-in
support.
</td>
<td>2.0.0</td>
</tr>
</table>
2 changes: 1 addition & 1 deletion docs/sql-migration-guide.md
@@ -30,7 +30,7 @@ license: |

- In Spark 3.2, `ALTER TABLE .. RENAME TO PARTITION` throws `PartitionAlreadyExistsException` instead of `AnalysisException` for tables from the Hive external catalog when the target partition already exists.

- In Spark 3.2, script transform default FIELD DELIMIT is `\u0001` for no serde mode. In Spark 3.1 or earlier, the default FIELD DELIMIT is `\t`.
- In Spark 3.2, the default FIELD DELIMIT for script transforms is `\u0001` in no-serde mode, and the serde property `field.delim` is `\t` in Hive serde mode when the user specifies a serde. In Spark 3.1 or earlier, the default FIELD DELIMIT is `\t`, and the serde property `field.delim` is `\u0001` in Hive serde mode when the user specifies a serde.
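
  As a hedged sketch (the table and column names here are hypothetical), the delimiter can be pinned explicitly so the result does not depend on a version-specific default:

  ```sql
  SELECT TRANSFORM(a, b)
      ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
      USING 'cat'
      AS (a, b)
      ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  FROM tbl;
  ```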

## Upgrading from Spark SQL 3.0 to 3.1

60 changes: 59 additions & 1 deletion docs/sql-ref-syntax-qry-select-like.md
@@ -21,12 +21,14 @@ license: |

### Description

A LIKE predicate is used to search for a specific pattern.
A LIKE predicate is used to search for a specific pattern. This predicate also supports matching multiple patterns using the quantifiers `ANY`, `SOME` and `ALL`.

### Syntax

```sql
[ NOT ] { LIKE search_pattern [ ESCAPE esc_char ] | [ RLIKE | REGEXP ] regex_pattern }

[ NOT ] { LIKE quantifiers ( search_pattern [ , ... ]) }
```

### Parameters
@@ -45,6 +47,10 @@ A LIKE predicate is used to search for a specific pattern.
* **regex_pattern**

Specifies a regular expression search pattern to be searched by the `RLIKE` or `REGEXP` clause.

* **quantifiers**

Specifies the predicate quantifiers, which include `ANY`, `SOME` and `ALL`. `ANY` or `SOME` returns true if at least one of the patterns matches the input; `ALL` returns true only if all of the patterns match the input.

### Examples

@@ -111,6 +117,58 @@ SELECT * FROM person WHERE name LIKE '%$_%' ESCAPE '$';
+---+------+---+
|500|Evan_W| 16|
+---+------+---+

SELECT * FROM person WHERE name LIKE ALL ('%an%', '%an');
+---+----+----+
| id|name| age|
+---+----+----+
|400| Dan| 50|
+---+----+----+

SELECT * FROM person WHERE name LIKE ANY ('%an%', '%an');
+---+------+---+
| id| name|age|
+---+------+---+
|400| Dan| 50|
|500|Evan_W| 16|
+---+------+---+

SELECT * FROM person WHERE name LIKE SOME ('%an%', '%an');
+---+------+---+
| id| name|age|
+---+------+---+
|400| Dan| 50|
|500|Evan_W| 16|
+---+------+---+

SELECT * FROM person WHERE name NOT LIKE ALL ('%an%', '%an');
+---+----+----+
| id|name| age|
+---+----+----+
|100|John| 30|
|200|Mary|null|
|300|Mike| 80|
+---+----+----+

SELECT * FROM person WHERE name NOT LIKE ANY ('%an%', '%an');
+---+------+----+
| id| name| age|
+---+------+----+
|100| John| 30|
|200| Mary|null|
|300| Mike| 80|
|500|Evan_W| 16|
+---+------+----+

SELECT * FROM person WHERE name NOT LIKE SOME ('%an%', '%an');
+---+------+----+
| id| name| age|
+---+------+----+
|100| John| 30|
|200| Mary|null|
|300| Mike| 80|
|500|Evan_W| 16|
+---+------+----+
```

### Related Statements
@@ -273,8 +273,8 @@ private[spark] object KafkaTokenUtil extends Logging {
sparkConf: SparkConf,
params: ju.Map[String, Object],
clusterConfig: Option[KafkaTokenClusterConf]): Boolean = {
if (HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka") &&
clusterConfig.isDefined && params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
if (clusterConfig.isDefined && params.containsKey(SaslConfigs.SASL_JAAS_CONFIG) &&
HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka")) {
logDebug("Delegation token used by connector, checking if uses the latest token.")
val connectorJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
getTokenJaasParams(clusterConfig.get) != connectorJaasParams
11 changes: 9 additions & 2 deletions pom.xml
@@ -134,7 +134,8 @@
<hive.version.short>2.3</hive.version.short>
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
<kafka.version>2.6.0</kafka.version>
<derby.version>10.12.1.1</derby.version>
<!-- After 10.15.1.3, the minimum required version is JDK9 -->
<derby.version>10.14.2.0</derby.version>
<parquet.version>1.10.1</parquet.version>
<orc.version>1.6.6</orc.version>
<jetty.version>9.4.34.v20201102</jetty.version>
@@ -2468,7 +2469,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.0</version>
<version>4.3.0</version>
<executions>
<execution>
<id>eclipse-add-source</id>
@@ -3371,6 +3372,12 @@
<properties>
<leveldbjni.group>org.openlabtesting.leveldbjni</leveldbjni.group>
</properties>
<activation>
<os>
<family>linux</family>
<arch>aarch64</arch>
</os>
</activation>
</profile>
</profiles>
</project>