From dc763a8b513272ff6a229c455a380a527ee9a3b3 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Wed, 22 Jan 2025 23:47:06 +0100 Subject: [PATCH] Switch Azure FS http client to netty OkHttp is abandoned at this stage so it's better to switch to maintained alternative --- lib/trino-filesystem-azure/pom.xml | 39 ++++++------------- .../azure/AzureFileSystemFactory.java | 39 ++++++++----------- plugin/trino-delta-lake/pom.xml | 10 +++-- .../TestDeltaLakeAdlsConnectorSmokeTest.java | 11 ++---- pom.xml | 6 +++ 5 files changed, 43 insertions(+), 62 deletions(-) diff --git a/lib/trino-filesystem-azure/pom.xml b/lib/trino-filesystem-azure/pom.xml index e67100cd4db4..eabe63f5f5e3 100644 --- a/lib/trino-filesystem-azure/pom.xml +++ b/lib/trino-filesystem-azure/pom.xml @@ -24,7 +24,7 @@ com.azure - azure-core-http-okhttp + azure-core-http-netty @@ -36,10 +36,6 @@ com.azure azure-identity - - com.azure - azure-core-http-netty - com.nimbusds oauth2-oidc-sdk @@ -54,34 +50,16 @@ com.azure azure-storage-blob - - - com.azure - azure-core-http-netty - - com.azure azure-storage-common - - - com.azure - azure-core-http-netty - - com.azure azure-storage-file-datalake - - - com.azure - azure-core-http-netty - - @@ -94,11 +72,6 @@ guice - - com.squareup.okhttp3 - okhttp - - io.airlift configuration @@ -109,11 +82,21 @@ units + + io.netty + netty-transport + + io.opentelemetry opentelemetry-api + + io.projectreactor.netty + reactor-netty-core + + io.trino trino-filesystem diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java index 95f7a5cb1d06..a05b71198a66 100644 --- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java +++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java @@ -14,22 +14,19 @@ package io.trino.filesystem.azure; import com.azure.core.http.HttpClient; -import com.azure.core.http.okhttp.OkHttpAsyncHttpClientBuilder; +import com.azure.core.http.netty.NettyAsyncHttpClientBuilder; import com.azure.core.tracing.opentelemetry.OpenTelemetryTracingOptions; import com.azure.core.util.HttpClientOptions; import com.azure.core.util.TracingOptions; import com.google.inject.Inject; import io.airlift.units.DataSize; +import io.netty.channel.nio.NioEventLoopGroup; import io.opentelemetry.api.OpenTelemetry; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.spi.security.ConnectorIdentity; import jakarta.annotation.PreDestroy; -import okhttp3.ConnectionPool; -import okhttp3.Dispatcher; -import okhttp3.OkHttpClient; - -import java.util.concurrent.TimeUnit; +import reactor.netty.resources.ConnectionProvider; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -44,8 +41,8 @@ public class AzureFileSystemFactory private final int maxWriteConcurrency; private final DataSize maxSingleUploadSize; private final TracingOptions tracingOptions; - private final OkHttpClient okHttpClient; private final HttpClient httpClient; + private final ConnectionProvider connectionProvider; @Inject public AzureFileSystemFactory(OpenTelemetry openTelemetry, AzureAuth azureAuth, AzureFileSystemConfig config) @@ -80,24 +77,20 @@ public AzureFileSystemFactory( this.maxWriteConcurrency = maxWriteConcurrency; this.maxSingleUploadSize = requireNonNull(maxSingleUploadSize, "maxSingleUploadSize is null"); this.tracingOptions = new OpenTelemetryTracingOptions().setOpenTelemetry(openTelemetry); - - Dispatcher dispatcher = new Dispatcher(); - dispatcher.setMaxRequests(maxHttpRequests); - dispatcher.setMaxRequestsPerHost(maxHttpRequests); - okHttpClient = new OkHttpClient.Builder() - .dispatcher(dispatcher) - .build(); + this.connectionProvider = ConnectionProvider.create(applicationId, maxHttpRequests); HttpClientOptions clientOptions = new HttpClientOptions(); clientOptions.setTracingOptions(tracingOptions); clientOptions.setApplicationId(applicationId); - httpClient = createAzureHttpClient(okHttpClient, clientOptions); + clientOptions.setMaximumConnectionPoolSize(maxHttpRequests); + httpClient = createAzureHttpClient(connectionProvider, clientOptions); } @PreDestroy public void destroy() { - okHttpClient.dispatcher().executorService().shutdownNow(); - okHttpClient.connectionPool().evictAll(); + if (connectionProvider != null) { + connectionProvider.dispose(); + } } @Override @@ -106,20 +99,20 @@ public TrinoFileSystem create(ConnectorIdentity identity) return new AzureFileSystem(httpClient, tracingOptions, auth, endpoint, readBlockSize, writeBlockSize, maxWriteConcurrency, maxSingleUploadSize); } - public static HttpClient createAzureHttpClient(OkHttpClient okHttpClient, HttpClientOptions clientOptions) + public static HttpClient createAzureHttpClient(ConnectionProvider connectionProvider, HttpClientOptions clientOptions) { Integer poolSize = clientOptions.getMaximumConnectionPoolSize(); - // By default, OkHttp uses a maximum idle connection count of 5. int maximumConnectionPoolSize = (poolSize != null && poolSize > 0) ? poolSize : 5; + clientOptions.setMaximumConnectionPoolSize(maximumConnectionPoolSize); - return new OkHttpAsyncHttpClientBuilder(okHttpClient) + return new NettyAsyncHttpClientBuilder() .proxy(clientOptions.getProxyOptions()) .configuration(clientOptions.getConfiguration()) - .connectionTimeout(clientOptions.getConnectTimeout()) + .connectTimeout(clientOptions.getConnectTimeout()) .writeTimeout(clientOptions.getWriteTimeout()) .readTimeout(clientOptions.getReadTimeout()) - .connectionPool(new ConnectionPool(maximumConnectionPoolSize, - clientOptions.getConnectionIdleTimeout().toMillis(), TimeUnit.MILLISECONDS)) + .connectionProvider(connectionProvider) + .eventLoopGroup(new NioEventLoopGroup(maximumConnectionPoolSize)) // default is 2 * availableProcessors .build(); } } diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml index efc785b7db2e..7fad2e4291e0 100644 --- a/plugin/trino-delta-lake/pom.xml +++ b/plugin/trino-delta-lake/pom.xml @@ -260,10 +260,6 @@ azure-storage-blob runtime - - com.azure - azure-core-http-netty - com.fasterxml.jackson.dataformat jackson-dataformat-xml @@ -289,6 +285,12 @@ runtime + + io.projectreactor.netty + reactor-netty-core + runtime + + io.trino trino-filesystem-azure diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java index 8c1e94a1addb..1ef73182529a 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java @@ -25,9 +25,9 @@ import com.google.common.reflect.ClassPath; import io.trino.plugin.hive.containers.HiveHadoop; import io.trino.testing.QueryRunner; -import okhttp3.OkHttpClient; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.TestInstance; +import reactor.netty.resources.ConnectionProvider; import java.io.IOException; import java.io.UncheckedIOException; @@ -77,13 +77,10 @@ protected HiveHadoop createHiveHadoop() throws Exception { String connectionString = format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.windows.net", account, accessKey); - OkHttpClient okHttpClient = new OkHttpClient.Builder().build(); - closeAfterClass(() -> { - okHttpClient.dispatcher().executorService().shutdownNow(); - okHttpClient.connectionPool().evictAll(); - }); + ConnectionProvider provider = ConnectionProvider.create("TestDeltaLakeAdsl"); + closeAfterClass(provider::dispose); BlobServiceClient blobServiceClient = new BlobServiceClientBuilder().connectionString(connectionString) - .httpClient(createAzureHttpClient(okHttpClient, new HttpClientOptions())) + .httpClient(createAzureHttpClient(provider, new HttpClientOptions())) .buildClient(); this.azureContainerClient = blobServiceClient.getBlobContainerClient(container); diff --git a/pom.xml b/pom.xml index f92830c9b173..5791fcd2814b 100644 --- a/pom.xml +++ b/pom.xml @@ -945,6 +945,12 @@ 3.7.2 + + io.projectreactor.netty + reactor-netty-core + 1.1.21 + + io.swagger.core.v3