From c82d2fd1124d0fc0671ccf6316c22cd540f3ec87 Mon Sep 17 00:00:00 2001 From: Amit Salunke Date: Sat, 24 Feb 2024 14:28:56 -0800 Subject: [PATCH] configure BlobServiceClient to use workloadIdentity --- plugins/repository-azure/build.gradle | 8 +- .../azure/AzureRepositoryPlugin.java | 1 + .../azure/AzureStorageService.java | 46 +++++++- .../azure/AzureStorageSettings.java | 104 ++++++++++-------- 4 files changed, 109 insertions(+), 50 deletions(-) diff --git a/plugins/repository-azure/build.gradle b/plugins/repository-azure/build.gradle index 51f2057b4bedb..ca89074e5d5f5 100644 --- a/plugins/repository-azure/build.gradle +++ b/plugins/repository-azure/build.gradle @@ -44,7 +44,7 @@ opensearchplugin { } dependencies { - api 'com.azure:azure-core:1.39.0' + api 'com.azure:azure-core:1.46.0' api 'com.azure:azure-json:1.0.1' api 'com.azure:azure-storage-common:12.21.2' api 'com.azure:azure-core-http-netty:1.12.8' @@ -55,7 +55,11 @@ dependencies { api "io.netty:netty-resolver-dns:${versions.netty}" api "io.netty:netty-transport-native-unix-common:${versions.netty}" implementation project(':modules:transport-netty4') - api 'com.azure:azure-storage-blob:12.23.0' + api 'com.azure:azure-storage-blob:12.25.1' + api 'com.azure:azure-identity:1.11.2' + api 'com.microsoft.azure:msal4j:1.14.2' + api 'com.nimbusds:oauth2-oidc-sdk:11.10' + api 'net.minidev:json-smart:1.0.6.3' api "io.projectreactor.netty:reactor-netty-core:${versions.reactor_netty}" api "io.projectreactor.netty:reactor-netty-http:${versions.reactor_netty}" api "org.slf4j:slf4j-api:${versions.slf4j}" diff --git a/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepositoryPlugin.java b/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepositoryPlugin.java index 78db7cb2d0ea7..543e603c45ebf 100644 --- a/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepositoryPlugin.java +++ b/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepositoryPlugin.java @@ -91,6 +91,7 @@ public List> getSettings() { AzureStorageSettings.ACCOUNT_SETTING, AzureStorageSettings.KEY_SETTING, AzureStorageSettings.SAS_TOKEN_SETTING, + AzureStorageSettings.FEDERATED_TOKEN_FILE_SETTING, AzureStorageSettings.ENDPOINT_SUFFIX_SETTING, AzureStorageSettings.TIMEOUT_SETTING, AzureStorageSettings.MAX_RETRIES_SETTING, diff --git a/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureStorageService.java index 74edd4f3eb23c..3c0e88033d761 100644 --- a/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureStorageService.java @@ -32,6 +32,7 @@ package org.opensearch.repositories.azure; +import com.azure.core.credential.TokenCredential; import com.azure.core.http.HttpPipelineCallContext; import com.azure.core.http.HttpPipelineNextPolicy; import com.azure.core.http.HttpPipelinePosition; @@ -40,11 +41,15 @@ import com.azure.core.http.ProxyOptions; import com.azure.core.http.netty.NettyAsyncHttpClientBuilder; import com.azure.core.http.policy.HttpPipelinePolicy; +import com.azure.core.util.ClientOptions; import com.azure.core.util.Configuration; import com.azure.core.util.Context; import com.azure.core.util.logging.ClientLogger; +import com.azure.identity.WorkloadIdentityCredential; +import com.azure.identity.WorkloadIdentityCredentialBuilder; import com.azure.storage.blob.BlobServiceClient; import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.storage.blob.models.ParallelTransferOptions; import com.azure.storage.blob.specialized.BlockBlobAsyncClient; import com.azure.storage.common.implementation.connectionstring.StorageConnectionString; @@ -61,12 +66,17 @@ import java.net.Authenticator; import java.net.PasswordAuthentication; +import java.net.URI; import java.net.URISyntaxException; +import java.security.AccessController; import java.security.InvalidKeyException; +import java.security.PrivilegedExceptionAction; import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -216,8 +226,12 @@ protected PasswordAuthentication getPasswordAuthentication() { * migration guide for mode details: */ private BlobServiceClientBuilder applyLocationMode(final BlobServiceClientBuilder builder, final AzureStorageSettings settings) { - final StorageConnectionString storageConnectionString = StorageConnectionString.create(settings.getConnectString(), logger); - final StorageEndpoint endpoint = storageConnectionString.getBlobEndpoint(); + StorageEndpoint endpoint; + if (settings.useWorkloadIdentity()) { + endpoint = new StorageEndpoint(URI.create(settings.getEndpoint())); + } else { + endpoint = StorageConnectionString.create(settings.getConnectString(), logger).getBlobEndpoint(); + } if (endpoint == null || endpoint.getPrimaryUri() == null) { throw new IllegalArgumentException("connectionString missing required settings to derive blob service primary endpoint."); @@ -247,9 +261,31 @@ private BlobServiceClientBuilder applyLocationMode(final BlobServiceClientBuilde return builder; } - private static BlobServiceClientBuilder createClientBuilder(AzureStorageSettings settings) throws InvalidKeyException, - URISyntaxException { - return SocketAccess.doPrivilegedException(() -> new BlobServiceClientBuilder().connectionString(settings.getConnectString())); + private static BlobServiceClientBuilder createClientBuilder(AzureStorageSettings settings) + throws InvalidKeyException, + URISyntaxException { + return SocketAccess.doPrivilegedException(() -> { + PrivilegedExceptionAction privilegedAction = Executors::newSingleThreadExecutor; + ExecutorService executorService = AccessController.doPrivileged(privilegedAction); + BlobServiceClientBuilder b = + new BlobServiceClientBuilder() + .credential(new DefaultAzureCredentialBuilder().executorService(executorService).build()); + + if (settings.useWorkloadIdentity()) { + TokenCredential workloadIdentityCredential = + new WorkloadIdentityCredentialBuilder() + .tokenFilePath(settings.getFederatedTokenFile()) + .executorService(executorService) + .build(); + b.endpoint(settings.getEndpoint()) + .credential(workloadIdentityCredential); + } else if (!settings.getConnectString().isEmpty()) { + b.connectionString(settings.getConnectString()); + } + + return b; + } + ); } /** diff --git a/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureStorageSettings.java b/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureStorageSettings.java index e73ded679cf2b..fdefafb601a20 100644 --- a/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureStorageSettings.java +++ b/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureStorageSettings.java @@ -70,11 +70,19 @@ final class AzureStorageSettings { key -> SecureSetting.secureString(key, null) ); + /** Azure web identity token */ + public static final AffixSetting FEDERATED_TOKEN_FILE_SETTING = Setting.affixKeySetting( + AZURE_CLIENT_PREFIX_KEY, + "federated_token_file", + key -> Setting.simpleString(key, Property.NodeScope), + () -> ACCOUNT_SETTING + ); + /** Azure SAS token */ public static final AffixSetting SAS_TOKEN_SETTING = Setting.affixKeySetting( - AZURE_CLIENT_PREFIX_KEY, - "sas_token", - key -> SecureSetting.secureString(key, null) + AZURE_CLIENT_PREFIX_KEY, + "sas_token", + key -> SecureSetting.secureString(key, null) ); /** max_retries: Number of retries in case of Azure errors. Defaults to 3 (RetryPolicy.DEFAULT_CLIENT_RETRY_COUNT). */ @@ -82,8 +90,7 @@ final class AzureStorageSettings { AZURE_CLIENT_PREFIX_KEY, "max_retries", (key) -> Setting.intSetting(key, 3, Setting.Property.NodeScope), - () -> ACCOUNT_SETTING, - () -> KEY_SETTING + () -> ACCOUNT_SETTING ); /** * Azure endpoint suffix. Default to core.windows.net (CloudStorageAccount.DEFAULT_DNS). @@ -100,8 +107,7 @@ final class AzureStorageSettings { AZURE_CLIENT_PREFIX_KEY, "timeout", (key) -> Setting.timeSetting(key, TimeValue.timeValueMinutes(-1), Property.NodeScope), - () -> ACCOUNT_SETTING, - () -> KEY_SETTING + () -> ACCOUNT_SETTING ); // See please NettyAsyncHttpClientBuilder#DEFAULT_CONNECT_TIMEOUT @@ -109,8 +115,7 @@ final class AzureStorageSettings { AZURE_CLIENT_PREFIX_KEY, "connect.timeout", (key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(10), Property.NodeScope), - () -> ACCOUNT_SETTING, - () -> KEY_SETTING + () -> ACCOUNT_SETTING ); // See please NettyAsyncHttpClientBuilder#DEFAULT_WRITE_TIMEOUT @@ -118,8 +123,7 @@ final class AzureStorageSettings { AZURE_CLIENT_PREFIX_KEY, "write.timeout", (key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(60), Property.NodeScope), - () -> ACCOUNT_SETTING, - () -> KEY_SETTING + () -> ACCOUNT_SETTING ); // See please NettyAsyncHttpClientBuilder#DEFAULT_READ_TIMEOUT @@ -127,8 +131,7 @@ final class AzureStorageSettings { AZURE_CLIENT_PREFIX_KEY, "read.timeout", (key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(60), Property.NodeScope), - () -> ACCOUNT_SETTING, - () -> KEY_SETTING + () -> ACCOUNT_SETTING ); // See please NettyAsyncHttpClientBuilder#DEFAULT_RESPONSE_TIMEOUT @@ -136,8 +139,7 @@ final class AzureStorageSettings { AZURE_CLIENT_PREFIX_KEY, "response.timeout", (key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(60), Property.NodeScope), - () -> ACCOUNT_SETTING, - () -> KEY_SETTING + () -> ACCOUNT_SETTING ); /** The type of the proxy to connect to azure through. Can be direct (no proxy, default), http or socks */ @@ -145,8 +147,7 @@ final class AzureStorageSettings { AZURE_CLIENT_PREFIX_KEY, "proxy.type", (key) -> new Setting<>(key, "direct", s -> ProxySettings.ProxyType.valueOf(s.toUpperCase(Locale.ROOT)), Property.NodeScope), - () -> ACCOUNT_SETTING, - () -> KEY_SETTING + () -> ACCOUNT_SETTING ); /** The host name of a proxy to connect to azure through. */ @@ -154,7 +155,6 @@ final class AzureStorageSettings { AZURE_CLIENT_PREFIX_KEY, "proxy.host", (key) -> Setting.simpleString(key, Property.NodeScope), - () -> KEY_SETTING, () -> ACCOUNT_SETTING, () -> PROXY_TYPE_SETTING ); @@ -164,7 +164,6 @@ final class AzureStorageSettings { AZURE_CLIENT_PREFIX_KEY, "proxy.port", (key) -> Setting.intSetting(key, 0, 0, 65535, Setting.Property.NodeScope), - () -> KEY_SETTING, () -> ACCOUNT_SETTING, () -> PROXY_TYPE_SETTING, () -> PROXY_HOST_SETTING @@ -175,7 +174,6 @@ final class AzureStorageSettings { AZURE_CLIENT_PREFIX_KEY, "proxy.username", key -> SecureSetting.secureString(key, null), - () -> KEY_SETTING, () -> ACCOUNT_SETTING, () -> PROXY_TYPE_SETTING, () -> PROXY_HOST_SETTING @@ -186,7 +184,6 @@ final class AzureStorageSettings { AZURE_CLIENT_PREFIX_KEY, "proxy.password", key -> SecureSetting.secureString(key, null), - () -> KEY_SETTING, () -> ACCOUNT_SETTING, () -> PROXY_TYPE_SETTING, () -> PROXY_HOST_SETTING, @@ -195,6 +192,7 @@ final class AzureStorageSettings { private final String account; private final String connectString; + private final String federatedTokenFile; private final String endpointSuffix; private final TimeValue timeout; private final int maxRetries; @@ -207,20 +205,22 @@ final class AzureStorageSettings { // copy-constructor private AzureStorageSettings( - String account, - String connectString, - String endpointSuffix, - TimeValue timeout, - int maxRetries, - LocationMode locationMode, - TimeValue connectTimeout, - TimeValue writeTimeout, - TimeValue readTimeout, - TimeValue responseTimeout, - ProxySettings proxySettings + String account, + String connectString, + String federatedTokenFile, + String endpointSuffix, + TimeValue timeout, + int maxRetries, + LocationMode locationMode, + TimeValue connectTimeout, + TimeValue writeTimeout, + TimeValue readTimeout, + TimeValue responseTimeout, + ProxySettings proxySettings ) { this.account = account; this.connectString = connectString; + this.federatedTokenFile = federatedTokenFile; this.endpointSuffix = endpointSuffix; this.timeout = timeout; this.maxRetries = maxRetries; @@ -235,6 +235,7 @@ private AzureStorageSettings( private AzureStorageSettings( String account, String key, + String federatedTokenFile, String sasToken, String endpointSuffix, TimeValue timeout, @@ -246,7 +247,8 @@ private AzureStorageSettings( ProxySettings proxySettings ) { this.account = account; - this.connectString = buildConnectString(account, key, sasToken, endpointSuffix); + this.connectString = buildConnectString(account, key, sasToken, federatedTokenFile, endpointSuffix); + this.federatedTokenFile = federatedTokenFile; this.endpointSuffix = endpointSuffix; this.timeout = timeout; this.maxRetries = maxRetries; @@ -278,20 +280,32 @@ public String getConnectString() { return connectString; } - private static String buildConnectString(String account, @Nullable String key, @Nullable String sasToken, String endpointSuffix) { + public String getFederatedTokenFile() { + return federatedTokenFile; + } + + public Boolean useWorkloadIdentity() { + return !federatedTokenFile.isEmpty(); + } + + private static String buildConnectString(String account, @Nullable String key, @Nullable String sasToken, @Nullable String federatedTokenFile, String endpointSuffix) { final boolean hasSasToken = Strings.hasText(sasToken); final boolean hasKey = Strings.hasText(key); - if (hasSasToken == false && hasKey == false) { - throw new SettingsException("Neither a secret key nor a shared access token was set."); - } + final boolean hasFederatedTokenFile = Strings.hasText(federatedTokenFile); if (hasSasToken && hasKey) { throw new SettingsException("Both a secret as well as a shared access token were set."); + } else if (hasSasToken && hasFederatedTokenFile) { + throw new SettingsException("Both a shared access token as well as an azure federated token file were set."); + } else if (hasKey && hasFederatedTokenFile) { + throw new SettingsException("Both a secret as well as an azure federated token file were set."); } final StringBuilder connectionStringBuilder = new StringBuilder(); - connectionStringBuilder.append("DefaultEndpointsProtocol=https").append(";AccountName=").append(account); + if (hasSasToken || hasKey) { + connectionStringBuilder.append("DefaultEndpointsProtocol=https").append(";AccountName=").append(account); + } if (hasKey) { connectionStringBuilder.append(";AccountKey=").append(key); - } else { + } else if (hasSasToken) { connectionStringBuilder.append(";SharedAccessSignature=").append(sasToken); } if (Strings.hasText(endpointSuffix)) { @@ -300,6 +314,10 @@ private static String buildConnectString(String account, @Nullable String key, @ return connectionStringBuilder.toString(); } + public String getEndpoint() { + return "https://" + account + ".blob." + endpointSuffix; + } + public LocationMode getLocationMode() { return locationMode; } @@ -369,6 +387,7 @@ private static AzureStorageSettings getClientSettings(Settings settings, String return new AzureStorageSettings( account.toString(), key.toString(), + getValue(settings, clientName, FEDERATED_TOKEN_FILE_SETTING), sasToken.toString(), getValue(settings, clientName, ENDPOINT_SUFFIX_SETTING), getValue(settings, clientName, TIMEOUT_SETTING), @@ -377,8 +396,7 @@ private static AzureStorageSettings getClientSettings(Settings settings, String getValue(settings, clientName, WRITE_TIMEOUT_SETTING), getValue(settings, clientName, READ_TIMEOUT_SETTING), getValue(settings, clientName, RESPONSE_TIMEOUT_SETTING), - validateAndCreateProxySettings(settings, clientName) - ); + validateAndCreateProxySettings(settings, clientName)); } } @@ -431,6 +449,7 @@ static Map overrideLocationMode( new AzureStorageSettings( entry.getValue().account, entry.getValue().connectString, + entry.getValue().federatedTokenFile, entry.getValue().endpointSuffix, entry.getValue().timeout, entry.getValue().maxRetries, @@ -439,8 +458,7 @@ static Map overrideLocationMode( entry.getValue().writeTimeout, entry.getValue().readTimeout, entry.getValue().responseTimeout, - entry.getValue().getProxySettings() - ) + entry.getValue().getProxySettings()) ); } return mapBuilder.immutableMap();