Fix linked non-cluster-default blob storage not working for HDI cluster (#4827)

Merged 2 commits on Dec 18, 2020
ArcadiaSparkBatchRunner.kt:

@@ -66,27 +66,10 @@ class ArcadiaSparkBatchRunner : SparkBatchJobRunner() {
                 compute.workSpace.webUrl
         )
 
-        if (submitModel.jobUploadStorageModel.storageAccountType == SparkSubmitStorageType.BLOB) {
-            val fsRoot = WasbUri.parse(arcadiaModel.jobUploadStorageModel.uploadPath
-                    ?: throw ExecutionException("No uploading path set in Run Configuration"))
-            val storageKey = arcadiaModel.jobUploadStorageModel.storageKey
-            val configEntry = submitModel.submissionParameter.jobConfig[SparkSubmissionParameter.Conf]
-            val wrappedConfig = if (configEntry != null && configEntry is java.util.Map<*, *>) {
-                SparkConfigures(configEntry)
-            } else {
-                SparkConfigures()
-            }
-            submitModel.submissionParameter.jobConfig[SparkSubmissionParameter.Conf] =
-                    wrappedConfig.apply {
-                        put("spark.hadoop.fs.azure.account.key.${fsRoot.storageAccount}.blob.core.windows.net",
-                                storageKey)
-                    }
-        }
-
         val jobDeploy = SparkBatchJobDeployFactory.getInstance().buildSparkBatchJobDeploy(submitModel, compute)
 
         ArcadiaSparkBatchJob(
-                prepareSubmissionParameterWithTransformedGen2Uri(submitModel.submissionParameter),
+                updateStorageConfigForSubmissionParameter(submitModel),
                 submission,
                 jobDeploy)
     }}
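The block removed above wired the linked blob account's key into the Livy "conf" section for Arcadia runs only; that logic now lives in the shared updateStorageConfigForSubmissionParameter (see SparkBatchJobRunner.java below), so HDI clusters and the other runners pick it up too. A rough plain-Java sketch of the conf entry being built, with a hypothetical account name and key:

import java.util.HashMap;
import java.util.Map;

public class BlobKeyConfSketch {
    public static void main(String[] args) {
        // Hypothetical values; the real ones come from the Run Configuration's
        // upload path (parsed by WasbUri) and its storage key field.
        String storageAccount = "mystorageacct";
        String storageKey = "<base64-account-key>";

        Map<String, String> livyConf = new HashMap<>();
        // Spark strips the "spark.hadoop." prefix and passes the remainder into the
        // Hadoop configuration, letting the WASB driver authenticate to the account.
        livyConf.put("spark.hadoop.fs.azure.account.key."
                + storageAccount + ".blob.core.windows.net", storageKey);
        System.out.println(livyConf);
    }
}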
ArisSparkBatchRunner.kt:

@@ -89,8 +89,7 @@ class ArisSparkBatchRunner : SparkBatchJobRunner() {
             // Livy release notes: https://livy.apache.org/history/
             // JIRA: https://issues.apache.org/jira/browse/LIVY-41
             submitModel.submissionParameter.apply {
-                name = mainClassName + "_$currentUtcTime"
-                prepareSubmissionParameterWithTransformedGen2Uri(this)
+                updateStorageConfigForSubmissionParameter(submitModel).apply { name = mainClassName + "_$currentUtcTime" }
             },
             SparkBatchSubmission.getInstance(),
             jobDeploy
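A side note on the name suffix preserved by this change: Livy refuses a batch whose session name duplicates an existing one (see LIVY-41 referenced above), so each submission appends the current UTC time. A sketch of that scheme, assuming an ISO-like timestamp; the plugin's actual currentUtcTime format may differ:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class UniqueJobNameSketch {
    public static void main(String[] args) {
        String mainClassName = "com.example.SparkApp"; // hypothetical main class

        // Suffix with the submission time in UTC so repeated runs never
        // collide on the Livy session name.
        String currentUtcTime = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")
                .withZone(ZoneOffset.UTC)
                .format(Instant.now());
        String jobName = mainClassName + "_" + currentUtcTime;
        System.out.println(jobName); // e.g. com.example.SparkApp_2020-12-18_09-30-00
    }
}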
CosmosServerlessSparkBatchRunner.kt:

@@ -42,8 +42,8 @@ class CosmosServerlessSparkBatchRunner : SparkBatchJobRunner() {
         return "CosmosServerlessSparkBatchRun"
     }
 
-    override fun prepareSubmissionParameterWithTransformedGen2Uri(parameter: SparkSubmissionParameter): SparkSubmissionParameter {
-        return CreateSparkBatchJobParameters.copyOf(parameter as CreateSparkBatchJobParameters).apply {
+    override fun updateStorageConfigForSubmissionParameter(submitModel: SparkSubmitModel): SparkSubmissionParameter {
+        return CreateSparkBatchJobParameters.copyOf(submitModel.submissionParameter as CreateSparkBatchJobParameters).apply {
             referencedJars = this.referencedJars.stream()
                     .map { transformToGen2Uri(it) }
                     .collect(Collectors.toList())

@@ -70,7 +70,7 @@ class CosmosServerlessSparkBatchRunner : SparkBatchJobRunner() {
         CosmosServerlessSparkBatchJob(
                 account,
                 AdlsDeploy(storageRootPath, accessToken),
-                prepareSubmissionParameterWithTransformedGen2Uri(submissionParameter) as CreateSparkBatchJobParameters,
+                updateStorageConfigForSubmissionParameter(submitModel) as CreateSparkBatchJobParameters,
                 SparkBatchSubmission.getInstance())
     }
}
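The transformToGen2Uri call mapped over referencedJars above rewrites ADLS Gen2 "https" paths into "abfs" form (see the comment in SparkBatchJobRunner.java below). A plain-Java illustration of the mapping; the plugin itself uses its AbfsUri helper rather than this regex:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Gen2UriSketch {
    public static void main(String[] args) {
        String https = "https://accountName.dfs.core.windows.net/fs0/Reference.jar";

        // https://<account>.dfs.core.windows.net/<fileSystem>/<path>
        //   becomes
        // abfs://<fileSystem>@<account>.dfs.core.windows.net/<path>
        Matcher m = Pattern
                .compile("https://([^.]+)\\.dfs\\.core\\.windows\\.net/([^/]+)/(.*)")
                .matcher(https);
        if (m.matches()) {
            String abfs = "abfs://" + m.group(2) + "@" + m.group(1)
                    + ".dfs.core.windows.net/" + m.group(3);
            // Prints: abfs://fs0@accountName.dfs.core.windows.net/Reference.jar
            System.out.println(abfs);
        }
    }
}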
CosmosSparkBatchRunner.kt:

@@ -24,15 +24,12 @@ package com.microsoft.azure.hdinsight.spark.run
 
 import com.intellij.execution.ExecutionException
 import com.intellij.execution.configurations.RunProfile
-import com.microsoft.azure.hdinsight.common.MessageInfoType
 import com.microsoft.azure.hdinsight.sdk.common.azure.serverless.AzureSparkCosmosClusterManager
 import com.microsoft.azure.hdinsight.spark.common.*
 import com.microsoft.azure.hdinsight.spark.run.configuration.CosmosSparkRunConfiguration
 import rx.Observable
 import rx.Observable.just
-import rx.Observer
 import java.net.URI
-import java.util.AbstractMap.SimpleImmutableEntry
 
 class CosmosSparkBatchRunner : SparkBatchJobRunner() {
     override fun canRun(executorId: String, profile: RunProfile): Boolean {

@@ -55,7 +52,7 @@ class CosmosSparkBatchRunner : SparkBatchJobRunner() {
                             ?.let { just(URI.create(it)) }
                             ?: clusterDetail.get().map { it.livyUri } }
                     .map { livyUri -> CosmosSparkBatchJob(
-                            prepareSubmissionParameterWithTransformedGen2Uri(submitModel.submissionParameter),
+                            updateStorageConfigForSubmissionParameter(submitModel),
                             SparkBatchAzureSubmission(tenantId, accountName, clusterId, livyUri))
                     }
         }
SparkBatchJobRunner.java:

@@ -38,6 +38,7 @@
 import com.intellij.openapi.project.Project;
 import com.microsoft.azure.hdinsight.common.AbfsUri;
 import com.microsoft.azure.hdinsight.common.ClusterManagerEx;
+import com.microsoft.azure.hdinsight.common.WasbUri;
 import com.microsoft.azure.hdinsight.common.logger.ILogger;
 import com.microsoft.azure.hdinsight.sdk.cluster.IClusterDetail;
 import com.microsoft.azure.hdinsight.spark.common.*;

@@ -59,6 +60,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UnknownFormatConversionException;
 import java.util.stream.Collectors;
 
 import static com.microsoft.azure.hdinsight.spark.common.SparkBatchSubmission.getClusterSubmission;

@@ -85,19 +87,45 @@ protected String transformToGen2Uri(String url) {
                 : url;
     }
 
-    // If we use the virtual file system to select referenced jars or files on ADLS Gen2 storage, the selected
-    // file path will be a URI starting with "https://". Job submission would then fail with an error like
-    // "Server returned HTTP response code: 401 for URL: https://accountName.dfs.core.windows.net/fs0/Reference.jar"
-    // Therefore, we transform the Gen2 "https" URI into an "abfs" URI to avoid the error.
-    protected SparkSubmissionParameter prepareSubmissionParameterWithTransformedGen2Uri(SparkSubmissionParameter parameter) {
-        final SparkSubmissionParameter newParameter = SparkSubmissionParameter.copyOf(parameter);
-        newParameter.setReferencedJars(newParameter.getReferencedJars().stream()
+    protected SparkSubmissionParameter updateStorageConfigForSubmissionParameter(SparkSubmitModel submitModel) throws ExecutionException {
+        // If we use the virtual file system to select referenced jars or files on ADLS Gen2 storage, the selected
+        // file path will be a URI starting with "https://". Job submission would then fail with an error like
+        // "Server returned HTTP response code: 401 for URL: https://accountName.dfs.core.windows.net/fs0/Reference.jar"
+        // Therefore, we transform the Gen2 "https" URI into an "abfs" URI to avoid the error.
+        final SparkSubmissionParameter submissionParameter = submitModel.getSubmissionParameter();
+        submissionParameter.setReferencedJars(submissionParameter.getReferencedJars().stream()
                 .map(this::transformToGen2Uri)
                 .collect(Collectors.toList()));
-        newParameter.setReferencedFiles(newParameter.getReferencedFiles().stream()
+        submissionParameter.setReferencedFiles(submissionParameter.getReferencedFiles().stream()
                 .map(this::transformToGen2Uri)
                 .collect(Collectors.toList()));
-        return newParameter;
+
+        // If the job upload storage type is Azure Blob storage, we need to put the blob storage credential into the Livy configuration
+        if (submitModel.getJobUploadStorageModel().getStorageAccountType() == SparkSubmitStorageType.BLOB) {
+            try {
+                final WasbUri fsRoot = WasbUri.parse(submitModel.getJobUploadStorageModel().getUploadPath());
+                final String storageKey = submitModel.getJobUploadStorageModel().getStorageKey();
+                final Object existingConfigEntry = submissionParameter.getJobConfig().get(SparkSubmissionParameter.Conf);
+                final SparkConfigures wrappedConfig = existingConfigEntry instanceof Map
+                        ? new SparkConfigures(existingConfigEntry)
+                        : new SparkConfigures();
+                wrappedConfig.put("spark.hadoop." + fsRoot.getHadoopBlobFsPropertyKey(), storageKey);
+                submissionParameter.getJobConfig().put(SparkSubmissionParameter.Conf, wrappedConfig);
+            } catch (final UnknownFormatConversionException error) {
+                final String errorHint = "Azure blob storage uploading path is not in correct format";
+                log().warn(String.format("%s. Uploading Path: %s. Error message: %s. Stacktrace:\n%s",
+                        errorHint, submitModel.getJobUploadStorageModel().getUploadPath(), error.getMessage(),
+                        ExceptionUtils.getStackTrace(error)));
+                throw new ExecutionException(errorHint);
+            } catch (final Exception error) {
+                final String errorHint = "Failed to update config for linked Azure Blob storage";
+                log().warn(String.format("%s. Error message: %s. Stacktrace:\n%s",
+                        errorHint, error.getMessage(), ExceptionUtils.getStackTrace(error)));
+                throw new ExecutionException(errorHint);
+            }
+        }
+
+        return submissionParameter;
     }
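After this method runs, the Livy payload's "conf" entry carries the account key alongside whatever the user already configured. A sketch of the wrap-and-put step above, with plain HashMaps standing in for SparkConfigures and hypothetical account values:

import java.util.HashMap;
import java.util.Map;

public class UpdateConfSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        Map<String, Object> jobConfig = new HashMap<>();
        jobConfig.put("conf", Map.of("spark.executor.cores", "2")); // pre-existing user conf

        // Wrap any existing "conf" map so the user's settings survive, then add the key.
        Object existing = jobConfig.get("conf");
        Map<String, Object> wrapped = existing instanceof Map
                ? new HashMap<>((Map<String, Object>) existing)
                : new HashMap<>();
        wrapped.put("spark.hadoop.fs.azure.account.key.mystorageacct.blob.core.windows.net",
                "<account-key>"); // hypothetical account and key
        jobConfig.put("conf", wrapped);
        System.out.println(jobConfig);
    }
}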
@@ -123,8 +151,7 @@ public Observable<ISparkBatchJob> buildSparkBatchJob(@NotNull SparkSubmitModel submitModel) {
         final Deployable jobDeploy = SparkBatchJobDeployFactory.getInstance().buildSparkBatchJobDeploy(
                 submitModel, clusterDetail);
 
-        final SparkSubmissionParameter submissionParameter =
-                prepareSubmissionParameterWithTransformedGen2Uri(submitModel.getSubmissionParameter());
+        final SparkSubmissionParameter submissionParameter = updateStorageConfigForSubmissionParameter(submitModel);
 
         updateCurrentBackgroundableTaskIndicator(progressIndicator -> {
             progressIndicator.setFraction(1.0f);
SparkSubmissionParameter.java:

@@ -25,8 +25,6 @@
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.microsoft.azure.hdinsight.sdk.rest.IConvertible;
 import com.microsoft.azuretools.azurecommons.helpers.NotNull;
 import com.microsoft.azuretools.azurecommons.helpers.Nullable;

@@ -123,10 +121,10 @@ public static SparkSubmissionParameter copyOf(SparkSubmissionParameter parameter) {
                 parameter.getLocalArtifactPath(),
                 parameter.getFile(),
                 parameter.getMainClassName(),
-                ImmutableList.copyOf(parameter.getReferencedFiles()),
-                ImmutableList.copyOf(parameter.getReferencedJars()),
-                ImmutableList.copyOf(parameter.getArgs()),
-                ImmutableMap.copyOf(parameter.getJobConfig()));
+                new ArrayList<>(parameter.getReferencedFiles()),
+                new ArrayList<>(parameter.getReferencedJars()),
+                new ArrayList<>(parameter.getArgs()),
+                new HashMap<>(parameter.getJobConfig()));
         copiedParameter.setName(parameter.getName());
         return copiedParameter;
     }
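The switch away from Guava's immutable copies looks load-bearing: the renamed base-class method now writes the blob credential into the job config in place, and presumably a put on an immutable copy would throw at runtime. A minimal demonstration, using java.util.Map.copyOf as a stand-in for Guava's ImmutableMap.copyOf:

import java.util.HashMap;
import java.util.Map;

public class MutableCopySketch {
    public static void main(String[] args) {
        Map<String, Object> original = new HashMap<>();

        Map<String, Object> immutableCopy = Map.copyOf(original); // like ImmutableMap.copyOf
        try {
            immutableCopy.put("conf", new HashMap<>());
        } catch (UnsupportedOperationException e) {
            System.out.println("put on an immutable copy throws UnsupportedOperationException");
        }

        Map<String, Object> mutableCopy = new HashMap<>(original); // what copyOf now returns
        mutableCopy.put("conf", new HashMap<>()); // succeeds
        System.out.println(mutableCopy);
    }
}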