Skip to content

Commit

Permalink
Fix job upload path display error issue
Browse files Browse the repository at this point in the history
  • Loading branch information
t-rufang committed Oct 23, 2018
1 parent e719ae0 commit 4c966b7
Showing 1 changed file with 30 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import com.microsoft.azure.hdinsight.common.ClusterManagerEx
import com.microsoft.azure.hdinsight.common.logger.ILogger
import com.microsoft.azure.hdinsight.sdk.cluster.IClusterDetail
import com.microsoft.azure.hdinsight.sdk.common.azure.serverless.AzureSparkServerlessCluster
import com.microsoft.azure.hdinsight.sdk.storage.ADLSStorageAccount
import com.microsoft.azure.hdinsight.sdk.storage.HDStorageAccount
import com.microsoft.azure.hdinsight.sdk.storage.IHDIStorageAccount
import com.microsoft.azure.hdinsight.spark.common.SparkSubmitJobUploadStorageModel
import com.microsoft.azure.hdinsight.spark.common.SparkSubmitStorageType
import com.microsoft.tooling.msservices.helpers.azure.sdk.StorageClientSDKManager
Expand Down Expand Up @@ -131,14 +134,19 @@ abstract class SparkSubmissionJobUploadStorageCtrl(val view: SparkSubmissionJobU
try {
clusterDetail.getConfigurationInfo()
val defaultStorageAccount = clusterDetail.storageAccount
val defaultContainerOrRootPath = defaultStorageAccount.defaultContainerOrRootPath
if (defaultStorageAccount != null && defaultContainerOrRootPath != null) {
errorMsg = null
uploadPath = if (clusterDetail is AzureSparkServerlessCluster) getAzureDataLakeStoragePath(defaultContainerOrRootPath) else getAzureBlobStoragePath(defaultStorageAccount.name, defaultContainerOrRootPath)
storageAccountType = SparkSubmitStorageType.DEFAULT_STORAGE_ACCOUNT
} else {
errorMsg = "Cluster have no storage account or storage container"
if (defaultStorageAccount == null) {
errorMsg = "Cluster have no storage account"
uploadPath = "-"
} else {
val path = getUploadPath(defaultStorageAccount)
if (path == null) {
errorMsg = "Error getting upload path from storage account"
uploadPath = "-"
} else {
errorMsg = null
uploadPath = path
storageAccountType = SparkSubmitStorageType.DEFAULT_STORAGE_ACCOUNT
}
}
} catch (ex: Exception) {
errorMsg = "Error getting cluster storage configuration"
Expand All @@ -152,7 +160,7 @@ abstract class SparkSubmissionJobUploadStorageCtrl(val view: SparkSubmissionJobU
uploadPath = "-"
errorMsg = "Azure Blob storage form is not completed"
} else {
uploadPath = getAzureBlobStoragePath(storageAccount, containersModel.selectedItem as String)
uploadPath = getAzureBlobStoragePath(ClusterManagerEx.getInstance().getBlobFullName(storageAccount), containersModel.selectedItem as String)
errorMsg = null
}
storageAccountType = SparkSubmitStorageType.BLOB
Expand Down Expand Up @@ -189,7 +197,7 @@ abstract class SparkSubmissionJobUploadStorageCtrl(val view: SparkSubmissionJobU
containersModel = DefaultComboBoxModel(containers)
containersModel.selectedItem = containersModel.getElementAt(0)
selectedContainer = containersModel.getElementAt(0)
uploadPath = getAzureBlobStoragePath(storageAccount, selectedContainer)
uploadPath = getAzureBlobStoragePath(ClusterManagerEx.getInstance().getBlobFullName(storageAccount), selectedContainer)
val credentialAccount = getCredentialAzureBlobAccount()
credentialAccount?.let {
view.secureStore?.savePassword(credentialAccount, storageAccount, storageKey) }
Expand Down Expand Up @@ -222,7 +230,7 @@ abstract class SparkSubmissionJobUploadStorageCtrl(val view: SparkSubmissionJobU
} else {
toUpdate.apply {
val selectedContainer = toUpdate.containersModel.selectedItem as String
uploadPath = getAzureBlobStoragePath(storageAccount, selectedContainer)
uploadPath = getAzureBlobStoragePath(ClusterManagerEx.getInstance().getBlobFullName(storageAccount), selectedContainer)
errorMsg = null
}
}
Expand All @@ -235,17 +243,18 @@ abstract class SparkSubmissionJobUploadStorageCtrl(val view: SparkSubmissionJobU
}
}

private fun getAzureBlobStoragePath(storageAccountName: String?, container: String?): String? {
return if (StringUtils.isEmpty(storageAccountName) || StringUtils.isEmpty(container)) null else
"wasbs://$container@${ClusterManagerEx.getInstance().getBlobFullName(storageAccountName)}/SparkSubmission/"
}

private fun getRootPathEndsWithSlash(rootPath: String): String {
return if (rootPath.endsWith("/")) rootPath else "$rootPath/"
private fun getAzureBlobStoragePath(fullStorageBlobName: String?, container: String?): String? {
return if (StringUtils.isBlank(fullStorageBlobName) || StringUtils.isBlank(container)) null else
"wasbs://$container@$fullStorageBlobName/SparkSubmission/"
}

private fun getAzureDataLakeStoragePath(rootPath: String?): String? {
return if (StringUtils.isEmpty(rootPath)) null else
"${getRootPathEndsWithSlash(rootPath.orEmpty())}SparkSubmission/"
}
private fun getUploadPath(account: IHDIStorageAccount): String? =
when (account) {
is HDStorageAccount -> getAzureBlobStoragePath(account.fullStorageBlobName, account.defaultContainer)
is ADLSStorageAccount ->
if (StringUtils.isBlank(account.name) || StringUtils.isBlank(account.defaultContainerOrRootPath)) null
else "adl://${account.name}.azuredatalakestore.net${account.defaultContainerOrRootPath}SparkSubmission/"
is AzureSparkServerlessCluster.StorageAccount -> account.defaultContainerOrRootPath?.let { "${it}SparkSubmission/" }
else -> null
}
}

0 comments on commit 4c966b7

Please sign in to comment.