Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-23186][SQL] Initialize DriverManager first before loading JDBC Drivers #20359

Closed
wants to merge 2 commits into from
Closed

[SPARK-23186][SQL] Initialize DriverManager first before loading JDBC Drivers #20359

wants to merge 2 commits into from

Conversation

dongjoon-hyun
Copy link
Member

@dongjoon-hyun dongjoon-hyun commented Jan 23, 2018

What changes were proposed in this pull request?

Since some JDBC Drivers have class initialization code to call DriverManager, we need to initialize DriverManager first in order to avoid potential executor-side deadlock situations like the following (or STORM-2527).

Thread 9587: (state = BLOCKED)
 - sun.reflect.NativeConstructorAccessorImpl.newInstance0(java.lang.reflect.Constructor, java.lang.Object[]) @bci=0 (Compiled frame; information may be imprecise)
 - sun.reflect.NativeConstructorAccessorImpl.newInstance(java.lang.Object[]) @bci=85, line=62 (Compiled frame)
 - sun.reflect.DelegatingConstructorAccessorImpl.newInstance(java.lang.Object[]) @bci=5, line=45 (Compiled frame)
 - java.lang.reflect.Constructor.newInstance(java.lang.Object[]) @bci=79, line=423 (Compiled frame)
 - java.lang.Class.newInstance() @bci=138, line=442 (Compiled frame)
 - java.util.ServiceLoader$LazyIterator.nextService() @bci=119, line=380 (Interpreted frame)
 - java.util.ServiceLoader$LazyIterator.next() @bci=11, line=404 (Interpreted frame)
 - java.util.ServiceLoader$1.next() @bci=37, line=480 (Interpreted frame)
 - java.sql.DriverManager$2.run() @bci=21, line=603 (Interpreted frame)
 - java.sql.DriverManager$2.run() @bci=1, line=583 (Interpreted frame)
 - java.security.AccessController.doPrivileged(java.security.PrivilegedAction) @bci=0 (Compiled frame)
 - java.sql.DriverManager.loadInitialDrivers() @bci=27, line=583 (Interpreted frame)
 - java.sql.DriverManager.<clinit>() @bci=32, line=101 (Interpreted frame)
 - org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(java.lang.String, java.lang.Integer, java.lang.String, java.util.Properties) @bci=12, line=98 (Interpreted frame)
 - org.apache.phoenix.mapreduce.util.ConnectionUtil.getInputConnection(org.apache.hadoop.conf.Configuration, java.util.Properties) @bci=22, line=57 (Interpreted frame)
 - org.apache.phoenix.mapreduce.PhoenixInputFormat.getQueryPlan(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.conf.Configuration) @bci=61, line=116 (Interpreted frame)
 - org.apache.phoenix.mapreduce.PhoenixInputFormat.createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext) @bci=10, line=71 (Interpreted frame)
 - org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(org.apache.spark.rdd.NewHadoopRDD, org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=233, line=156 (Interpreted frame)

Thread 9170: (state = BLOCKED)
 - org.apache.phoenix.jdbc.PhoenixDriver.<clinit>() @bci=35, line=125 (Interpreted frame)
 - sun.reflect.NativeConstructorAccessorImpl.newInstance0(java.lang.reflect.Constructor, java.lang.Object[]) @bci=0 (Compiled frame)
 - sun.reflect.NativeConstructorAccessorImpl.newInstance(java.lang.Object[]) @bci=85, line=62 (Compiled frame)
 - sun.reflect.DelegatingConstructorAccessorImpl.newInstance(java.lang.Object[]) @bci=5, line=45 (Compiled frame)
 - java.lang.reflect.Constructor.newInstance(java.lang.Object[]) @bci=79, line=423 (Compiled frame)
 - java.lang.Class.newInstance() @bci=138, line=442 (Compiled frame)
 - org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(java.lang.String) @bci=89, line=46 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply() @bci=7, line=53 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply() @bci=1, line=52 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.<init>(org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD, org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=81, line=347 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=7, line=339 (Interpreted frame)

How was this patch tested?

N/A

@SparkQA
Copy link

SparkQA commented Jan 23, 2018

Test build #86514 has finished for PR 20359 at commit 234a637.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

retest this please

@SparkQA
Copy link

SparkQA commented Jan 23, 2018

Test build #86530 has finished for PR 20359 at commit 234a637.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -32,6 +32,9 @@ import org.apache.spark.util.Utils
*/
object DriverRegistry extends Logging {

// Initialize DriverManager first to prevent potential deadlocks between DriverManager and Driver
DriverManager.getDrivers
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @rxin .
Could you give me some comments on this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More context about this change? Based on your PR description, this is to resolve the deadlocks among executors? How does it work after applying this change?

Copy link
Member Author

@dongjoon-hyun dongjoon-hyun Jan 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the same situation like STORM in the PR description and this occurs in Spark, too.
In the Spark executor, the stacks shows the deadlock between DriverManager and Driver.

  • Pheonix library call DriverManager.loadInitialDrivers()
  • Spark DriverRegistry call PhoenixDriver constructor before DriverManager created.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, so far, I only have this log only. It's difficult to reproduce this deadlock.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did you test it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a test; otherwise, it is not the right thing to merge such a PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean a unit test case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if you can merge this PR if I can test this patch in somewhere of our customer cluster. :)

Copy link
Contributor

@cloud-fan cloud-fan Jan 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's too hard to write a UT, I think a manual test is also fine.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for review, @cloud-fan .
So far, this deadlock situation is reported intermittently without any logs.

@dongjoon-hyun
Copy link
Member Author

Ping, @gatorsmile and @cloud-fan .

@@ -32,6 +32,9 @@ import org.apache.spark.util.Utils
*/
object DriverRegistry extends Logging {

// Initialize DriverManager first to prevent potential deadlocks between DriverManager and Driver
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to say more about why this can avoid deadlock.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can copy something from the storm PR: https://github.com/apache/storm/pull/2134/files

@dongjoon-hyun
Copy link
Member Author

Thank you for review, @cloud-fan and @gatorsmile .
So far, there is no test case to generate this kind of Spark job hung (in executor-side).
I fully understand your viewpoints. I'll try this way in our customer production environment first after internal QE. Since this is intermittent deadlock situation, we don't know whether this is clearly removed or not. But, we can monitor that at least.

@SparkQA
Copy link

SparkQA commented Feb 3, 2018

Test build #87020 has finished for PR 20359 at commit 52e6f19.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member Author

It's a well-known irrelevant failure.

  • org.apache.spark.sql.kafka010.KafkaContinuousSourceStressForDontFailOnDataLossSuite.stress test for failOnDataLoss=false

@dongjoon-hyun
Copy link
Member Author

Retest this please.

@SparkQA
Copy link

SparkQA commented Feb 3, 2018

Test build #87025 has finished for PR 20359 at commit 52e6f19.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member Author

Retest this please

@SparkQA
Copy link

SparkQA commented Feb 3, 2018

Test build #87036 has finished for PR 20359 at commit 52e6f19.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member Author

FYI, this is tested on a production cluster for two weeks and the deadlock issue is not reported until now.

@dongjoon-hyun
Copy link
Member Author

Thank you for review and approval, @srowen .

@cloud-fan
Copy link
Contributor

thanks, merging to master/2.3!

asfgit pushed a commit that referenced this pull request Feb 9, 2018
… Drivers

## What changes were proposed in this pull request?

Since some JDBC Drivers have class initialization code to call `DriverManager`, we need to initialize `DriverManager` first in order to avoid potential executor-side **deadlock** situations like the following (or [STORM-2527](https://issues.apache.org/jira/browse/STORM-2527)).

```
Thread 9587: (state = BLOCKED)
 - sun.reflect.NativeConstructorAccessorImpl.newInstance0(java.lang.reflect.Constructor, java.lang.Object[]) bci=0 (Compiled frame; information may be imprecise)
 - sun.reflect.NativeConstructorAccessorImpl.newInstance(java.lang.Object[]) bci=85, line=62 (Compiled frame)
 - sun.reflect.DelegatingConstructorAccessorImpl.newInstance(java.lang.Object[]) bci=5, line=45 (Compiled frame)
 - java.lang.reflect.Constructor.newInstance(java.lang.Object[]) bci=79, line=423 (Compiled frame)
 - java.lang.Class.newInstance() bci=138, line=442 (Compiled frame)
 - java.util.ServiceLoader$LazyIterator.nextService() bci=119, line=380 (Interpreted frame)
 - java.util.ServiceLoader$LazyIterator.next() bci=11, line=404 (Interpreted frame)
 - java.util.ServiceLoader$1.next() bci=37, line=480 (Interpreted frame)
 - java.sql.DriverManager$2.run() bci=21, line=603 (Interpreted frame)
 - java.sql.DriverManager$2.run() bci=1, line=583 (Interpreted frame)
 - java.security.AccessController.doPrivileged(java.security.PrivilegedAction) bci=0 (Compiled frame)
 - java.sql.DriverManager.loadInitialDrivers() bci=27, line=583 (Interpreted frame)
 - java.sql.DriverManager.<clinit>() bci=32, line=101 (Interpreted frame)
 - org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(java.lang.String, java.lang.Integer, java.lang.String, java.util.Properties) bci=12, line=98 (Interpreted frame)
 - org.apache.phoenix.mapreduce.util.ConnectionUtil.getInputConnection(org.apache.hadoop.conf.Configuration, java.util.Properties) bci=22, line=57 (Interpreted frame)
 - org.apache.phoenix.mapreduce.PhoenixInputFormat.getQueryPlan(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.conf.Configuration) bci=61, line=116 (Interpreted frame)
 - org.apache.phoenix.mapreduce.PhoenixInputFormat.createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext) bci=10, line=71 (Interpreted frame)
 - org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(org.apache.spark.rdd.NewHadoopRDD, org.apache.spark.Partition, org.apache.spark.TaskContext) bci=233, line=156 (Interpreted frame)

Thread 9170: (state = BLOCKED)
 - org.apache.phoenix.jdbc.PhoenixDriver.<clinit>() bci=35, line=125 (Interpreted frame)
 - sun.reflect.NativeConstructorAccessorImpl.newInstance0(java.lang.reflect.Constructor, java.lang.Object[]) bci=0 (Compiled frame)
 - sun.reflect.NativeConstructorAccessorImpl.newInstance(java.lang.Object[]) bci=85, line=62 (Compiled frame)
 - sun.reflect.DelegatingConstructorAccessorImpl.newInstance(java.lang.Object[]) bci=5, line=45 (Compiled frame)
 - java.lang.reflect.Constructor.newInstance(java.lang.Object[]) bci=79, line=423 (Compiled frame)
 - java.lang.Class.newInstance() bci=138, line=442 (Compiled frame)
 - org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(java.lang.String) bci=89, line=46 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply() bci=7, line=53 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply() bci=1, line=52 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.<init>(org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD, org.apache.spark.Partition, org.apache.spark.TaskContext) bci=81, line=347 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) bci=7, line=339 (Interpreted frame)
```

## How was this patch tested?

N/A

Author: Dongjoon Hyun <[email protected]>

Closes #20359 from dongjoon-hyun/SPARK-23186.

(cherry picked from commit 8cbcc33)
Signed-off-by: Wenchen Fan <[email protected]>
@asfgit asfgit closed this in 8cbcc33 Feb 9, 2018
@dongjoon-hyun
Copy link
Member Author

Thank you for merging, @cloud-fan .
And thank you again, @HyukjinKwon , @gatorsmile , and @srowen !

@dongjoon-hyun dongjoon-hyun deleted the SPARK-23186 branch February 9, 2018 06:05
@dongjoon-hyun
Copy link
Member Author

dongjoon-hyun commented Feb 9, 2018

@cloud-fan . Can we have this in branch-2.2, too? Although it's cherry-pickable, I'll make a PR if you want to trigger the Jenkins test.

@cloud-fan
Copy link
Contributor

let's send a new PR

@dongjoon-hyun
Copy link
Member Author

Thank you!

robert3005 pushed a commit to palantir/spark that referenced this pull request Feb 12, 2018
… Drivers

## What changes were proposed in this pull request?

Since some JDBC Drivers have class initialization code to call `DriverManager`, we need to initialize `DriverManager` first in order to avoid potential executor-side **deadlock** situations like the following (or [STORM-2527](https://issues.apache.org/jira/browse/STORM-2527)).

```
Thread 9587: (state = BLOCKED)
 - sun.reflect.NativeConstructorAccessorImpl.newInstance0(java.lang.reflect.Constructor, java.lang.Object[]) bci=0 (Compiled frame; information may be imprecise)
 - sun.reflect.NativeConstructorAccessorImpl.newInstance(java.lang.Object[]) bci=85, line=62 (Compiled frame)
 - sun.reflect.DelegatingConstructorAccessorImpl.newInstance(java.lang.Object[]) bci=5, line=45 (Compiled frame)
 - java.lang.reflect.Constructor.newInstance(java.lang.Object[]) bci=79, line=423 (Compiled frame)
 - java.lang.Class.newInstance() bci=138, line=442 (Compiled frame)
 - java.util.ServiceLoader$LazyIterator.nextService() bci=119, line=380 (Interpreted frame)
 - java.util.ServiceLoader$LazyIterator.next() bci=11, line=404 (Interpreted frame)
 - java.util.ServiceLoader$1.next() bci=37, line=480 (Interpreted frame)
 - java.sql.DriverManager$2.run() bci=21, line=603 (Interpreted frame)
 - java.sql.DriverManager$2.run() bci=1, line=583 (Interpreted frame)
 - java.security.AccessController.doPrivileged(java.security.PrivilegedAction) bci=0 (Compiled frame)
 - java.sql.DriverManager.loadInitialDrivers() bci=27, line=583 (Interpreted frame)
 - java.sql.DriverManager.<clinit>() bci=32, line=101 (Interpreted frame)
 - org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(java.lang.String, java.lang.Integer, java.lang.String, java.util.Properties) bci=12, line=98 (Interpreted frame)
 - org.apache.phoenix.mapreduce.util.ConnectionUtil.getInputConnection(org.apache.hadoop.conf.Configuration, java.util.Properties) bci=22, line=57 (Interpreted frame)
 - org.apache.phoenix.mapreduce.PhoenixInputFormat.getQueryPlan(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.conf.Configuration) bci=61, line=116 (Interpreted frame)
 - org.apache.phoenix.mapreduce.PhoenixInputFormat.createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext) bci=10, line=71 (Interpreted frame)
 - org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(org.apache.spark.rdd.NewHadoopRDD, org.apache.spark.Partition, org.apache.spark.TaskContext) bci=233, line=156 (Interpreted frame)

Thread 9170: (state = BLOCKED)
 - org.apache.phoenix.jdbc.PhoenixDriver.<clinit>() bci=35, line=125 (Interpreted frame)
 - sun.reflect.NativeConstructorAccessorImpl.newInstance0(java.lang.reflect.Constructor, java.lang.Object[]) bci=0 (Compiled frame)
 - sun.reflect.NativeConstructorAccessorImpl.newInstance(java.lang.Object[]) bci=85, line=62 (Compiled frame)
 - sun.reflect.DelegatingConstructorAccessorImpl.newInstance(java.lang.Object[]) bci=5, line=45 (Compiled frame)
 - java.lang.reflect.Constructor.newInstance(java.lang.Object[]) bci=79, line=423 (Compiled frame)
 - java.lang.Class.newInstance() bci=138, line=442 (Compiled frame)
 - org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(java.lang.String) bci=89, line=46 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply() bci=7, line=53 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply() bci=1, line=52 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.<init>(org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD, org.apache.spark.Partition, org.apache.spark.TaskContext) bci=81, line=347 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) bci=7, line=339 (Interpreted frame)
```

## How was this patch tested?

N/A

Author: Dongjoon Hyun <[email protected]>

Closes apache#20359 from dongjoon-hyun/SPARK-23186.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants