Skip to content

Commit

Permalink
Switch Azure FS http client to netty
Browse files Browse the repository at this point in the history
OkHttp is abandoned at this stage so it's better to switch to maintained alternative
  • Loading branch information
wendigo committed Jan 23, 2025
1 parent 8c85bd9 commit dc763a8
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 62 deletions.
39 changes: 11 additions & 28 deletions lib/trino-filesystem-azure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-http-okhttp</artifactId>
<artifactId>azure-core-http-netty</artifactId>
</dependency>

<dependency>
Expand All @@ -36,10 +36,6 @@
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<exclusions>
<exclusion>
<groupId>com.azure</groupId>
<artifactId>azure-core-http-netty</artifactId>
</exclusion>
<exclusion>
<groupId>com.nimbusds</groupId>
<artifactId>oauth2-oidc-sdk</artifactId>
Expand All @@ -54,34 +50,16 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<exclusions>
<exclusion>
<groupId>com.azure</groupId>
<artifactId>azure-core-http-netty</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-common</artifactId>
<exclusions>
<exclusion>
<groupId>com.azure</groupId>
<artifactId>azure-core-http-netty</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-file-datalake</artifactId>
<exclusions>
<exclusion>
<groupId>com.azure</groupId>
<artifactId>azure-core-http-netty</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand All @@ -94,11 +72,6 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>configuration</artifactId>
Expand All @@ -109,11 +82,21 @@
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>

<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-core</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,19 @@
package io.trino.filesystem.azure;

import com.azure.core.http.HttpClient;
import com.azure.core.http.okhttp.OkHttpAsyncHttpClientBuilder;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.tracing.opentelemetry.OpenTelemetryTracingOptions;
import com.azure.core.util.HttpClientOptions;
import com.azure.core.util.TracingOptions;
import com.google.inject.Inject;
import io.airlift.units.DataSize;
import io.netty.channel.nio.NioEventLoopGroup;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.spi.security.ConnectorIdentity;
import jakarta.annotation.PreDestroy;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;

import java.util.concurrent.TimeUnit;
import reactor.netty.resources.ConnectionProvider;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
Expand All @@ -44,8 +41,8 @@ public class AzureFileSystemFactory
private final int maxWriteConcurrency;
private final DataSize maxSingleUploadSize;
private final TracingOptions tracingOptions;
private final OkHttpClient okHttpClient;
private final HttpClient httpClient;
private final ConnectionProvider connectionProvider;

@Inject
public AzureFileSystemFactory(OpenTelemetry openTelemetry, AzureAuth azureAuth, AzureFileSystemConfig config)
Expand Down Expand Up @@ -80,24 +77,20 @@ public AzureFileSystemFactory(
this.maxWriteConcurrency = maxWriteConcurrency;
this.maxSingleUploadSize = requireNonNull(maxSingleUploadSize, "maxSingleUploadSize is null");
this.tracingOptions = new OpenTelemetryTracingOptions().setOpenTelemetry(openTelemetry);

Dispatcher dispatcher = new Dispatcher();
dispatcher.setMaxRequests(maxHttpRequests);
dispatcher.setMaxRequestsPerHost(maxHttpRequests);
okHttpClient = new OkHttpClient.Builder()
.dispatcher(dispatcher)
.build();
this.connectionProvider = ConnectionProvider.create(applicationId, maxHttpRequests);
HttpClientOptions clientOptions = new HttpClientOptions();
clientOptions.setTracingOptions(tracingOptions);
clientOptions.setApplicationId(applicationId);
httpClient = createAzureHttpClient(okHttpClient, clientOptions);
clientOptions.setMaximumConnectionPoolSize(maxHttpRequests);
httpClient = createAzureHttpClient(connectionProvider, clientOptions);
}

@PreDestroy
public void destroy()
{
okHttpClient.dispatcher().executorService().shutdownNow();
okHttpClient.connectionPool().evictAll();
if (connectionProvider != null) {
connectionProvider.dispose();
}
}

@Override
Expand All @@ -106,20 +99,20 @@ public TrinoFileSystem create(ConnectorIdentity identity)
return new AzureFileSystem(httpClient, tracingOptions, auth, endpoint, readBlockSize, writeBlockSize, maxWriteConcurrency, maxSingleUploadSize);
}

public static HttpClient createAzureHttpClient(OkHttpClient okHttpClient, HttpClientOptions clientOptions)
public static HttpClient createAzureHttpClient(ConnectionProvider connectionProvider, HttpClientOptions clientOptions)
{
Integer poolSize = clientOptions.getMaximumConnectionPoolSize();
// By default, OkHttp uses a maximum idle connection count of 5.
int maximumConnectionPoolSize = (poolSize != null && poolSize > 0) ? poolSize : 5;
clientOptions.setMaximumConnectionPoolSize(maximumConnectionPoolSize);

return new OkHttpAsyncHttpClientBuilder(okHttpClient)
return new NettyAsyncHttpClientBuilder()
.proxy(clientOptions.getProxyOptions())
.configuration(clientOptions.getConfiguration())
.connectionTimeout(clientOptions.getConnectTimeout())
.connectTimeout(clientOptions.getConnectTimeout())
.writeTimeout(clientOptions.getWriteTimeout())
.readTimeout(clientOptions.getReadTimeout())
.connectionPool(new ConnectionPool(maximumConnectionPoolSize,
clientOptions.getConnectionIdleTimeout().toMillis(), TimeUnit.MILLISECONDS))
.connectionProvider(connectionProvider)
.eventLoopGroup(new NioEventLoopGroup(maximumConnectionPoolSize)) // default is 2 * availableProcessors
.build();
}
}
10 changes: 6 additions & 4 deletions plugin/trino-delta-lake/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,6 @@
<artifactId>azure-storage-blob</artifactId>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>com.azure</groupId>
<artifactId>azure-core-http-netty</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
Expand All @@ -289,6 +285,12 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-core</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem-azure</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import com.google.common.reflect.ClassPath;
import io.trino.plugin.hive.containers.HiveHadoop;
import io.trino.testing.QueryRunner;
import okhttp3.OkHttpClient;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.TestInstance;
import reactor.netty.resources.ConnectionProvider;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -77,13 +77,10 @@ protected HiveHadoop createHiveHadoop()
throws Exception
{
String connectionString = format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.windows.net", account, accessKey);
OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
closeAfterClass(() -> {
okHttpClient.dispatcher().executorService().shutdownNow();
okHttpClient.connectionPool().evictAll();
});
ConnectionProvider provider = ConnectionProvider.create("TestDeltaLakeAdsl");
closeAfterClass(provider::dispose);
BlobServiceClient blobServiceClient = new BlobServiceClientBuilder().connectionString(connectionString)
.httpClient(createAzureHttpClient(okHttpClient, new HttpClientOptions()))
.httpClient(createAzureHttpClient(provider, new HttpClientOptions()))
.buildClient();
this.azureContainerClient = blobServiceClient.getBlobContainerClient(container);

Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,12 @@
<version>3.7.2</version>
</dependency>

<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-core</artifactId>
<version>1.1.21</version>
</dependency>

<!-- io.confluent:kafka-avro-serializer uses multiple versions of this transitive dependency -->
<dependency>
<groupId>io.swagger.core.v3</groupId>
Expand Down

0 comments on commit dc763a8

Please sign in to comment.