
Flink/Azure job graph serialization fails when used with storage account shared key authentication #10245

Closed · ms1111 opened this issue on Apr 28, 2024 · 3 comments
Labels: bug, stale

ms1111 commented Apr 28, 2024

Apache Iceberg version

1.5.1 (latest release)

Query engine

Flink

Please describe the bug 🐞

ADLSFileIO holds an AzureProperties object. When ADLS_SHARED_KEY_ACCOUNT_NAME or ADLS_SHARED_KEY_ACCOUNT_KEY is set, AzureProperties creates a StorageSharedKeyCredential in its constructor. StorageSharedKeyCredential is not Serializable, so Flink's job graph serialization fails at job startup.

If the storage account key is not supplied, DefaultAzureCredential will try to get credentials from the Azure CLI or another source like workload identity. That appears to work, but some environments may require shared key authentication.
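
The failure is plain Java serialization rather than anything Flink-specific. A minimal sketch that reproduces it without Flink (assuming the 1.5.1 behavior described above; the account name and key values are placeholders):

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Map;
import org.apache.iceberg.azure.AzureProperties;

public class AzurePropertiesSerializationRepro {
    public static void main(String[] args) throws Exception {
        // With shared-key properties set, the 1.5.1 constructor eagerly
        // creates a StorageSharedKeyCredential field, which is not Serializable.
        AzureProperties props = new AzureProperties(Map.of(
                AzureProperties.ADLS_SHARED_KEY_ACCOUNT_NAME, "myaccount",
                AzureProperties.ADLS_SHARED_KEY_ACCOUNT_KEY, "bXlrZXk="));

        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            // Throws java.io.NotSerializableException:
            //   com.azure.storage.common.StorageSharedKeyCredential
            out.writeObject(props);
        }
    }
}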

The serialization error is below:

Caused by: java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not serialize object for key serializedUDF.
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:322)
	... 13 more
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not serialize object for key serializedUDF.
	at org.apache.flink.streaming.api.graph.StreamConfig.lambda$serializeAllConfigs$1(StreamConfig.java:203)
	at java.base/java.util.HashMap.forEach(HashMap.java:1421)
	at org.apache.flink.streaming.api.graph.StreamConfig.serializeAllConfigs(StreamConfig.java:197)
	at org.apache.flink.streaming.api.graph.StreamConfig.lambda$triggerSerializationAndReturnFuture$0(StreamConfig.java:174)
	at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718)
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.NotSerializableException: com.azure.storage.common.StorageSharedKeyCredential
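
A serialization-safe alternative is to keep the shared-key account name and key as plain String fields and build the credential only when a client is configured, so it never crosses the serialization boundary. A sketch of that general pattern (HypotheticalAzureProperties and applyCredentialConfiguration are illustrative names, not the actual Iceberg API, and this is not necessarily how the upstream fix works):

import java.io.Serializable;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeFileSystemClientBuilder;

public class HypotheticalAzureProperties implements Serializable {
    // Plain strings serialize fine; the credential object does not.
    private final String sharedKeyAccountName;
    private final String sharedKeyAccountKey;

    public HypotheticalAzureProperties(String accountName, String accountKey) {
        this.sharedKeyAccountName = accountName;
        this.sharedKeyAccountKey = accountKey;
    }

    public void applyCredentialConfiguration(DataLakeFileSystemClientBuilder builder) {
        if (sharedKeyAccountName != null && sharedKeyAccountKey != null) {
            // The credential is created on demand, after deserialization on
            // the task managers, rather than eagerly in the constructor.
            builder.credential(
                    new StorageSharedKeyCredential(sharedKeyAccountName, sharedKeyAccountKey));
        }
    }
}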

A sample app to trigger it is below.

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.azure.AzureProperties;
import org.apache.iceberg.azure.adlsv2.ADLSFileIO;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.types.Types;

/**
 * Run with environment variables:
 * <ul>
 *     <li>STORAGE_ACCOUNT=storage account name</li>
 *     <li>STORAGE_ACCOUNT_KEY=key</li>
 *     <li>CONTAINER=name of storage container</li>
 * </ul>
 */
public class ADLSSharedKeyAuthIssue {
    public static void main(String[] args) throws Exception {
        final String storageAccount = System.getenv("STORAGE_ACCOUNT");
        final String storageAccountKey = System.getenv("STORAGE_ACCOUNT_KEY");
        final String container = System.getenv("CONTAINER");

        Map<String, String> options = new HashMap<>();
        options.put("warehouse", "abfss://" + container + "@" + storageAccount + ".dfs.core.windows.net");
        options.put("uri", "http://localhost:19120/api/v1");
        options.put(CatalogProperties.FILE_IO_IMPL, ADLSFileIO.class.getCanonicalName());
        options.put(AzureProperties.ADLS_SHARED_KEY_ACCOUNT_NAME, storageAccount);
        options.put(AzureProperties.ADLS_SHARED_KEY_ACCOUNT_KEY, storageAccountKey);

        CatalogLoader catalogLoader = CatalogLoader.custom(
                "flink",
                options,
                new Configuration(),
                CatalogUtil.ICEBERG_CATALOG_NESSIE);

        Catalog catalog = catalogLoader.loadCatalog();

        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()));

        PartitionSpec spec = PartitionSpec.builderFor(schema).build();
        Namespace namespace = Namespace.of("nsname_" + UUID.randomUUID().toString().substring(0, 4));
        ((SupportsNamespaces) catalog).createNamespace(namespace);
        TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "t1");
        Table table = catalog.createTable(tableIdentifier, schema, spec);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Row> source = env.fromElements(1L) // long literal to match the schema's LongType
                .map(data -> {
                    Row row = new Row(1);
                    row.setField(0, data);
                    return row;
                });

        FlinkSink.forRow(source, FlinkSchemaUtil.toSchema(schema))
                .tableLoader(TableLoader.fromCatalog(catalogLoader, tableIdentifier))
                .overwrite(true)
                .append();

        env.execute();
    }
}

POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>adls-shared-key-auth-issue</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <iceberg.version>1.5.1</iceberg.version>
        <flink.version>1.18.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-core</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-api</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-parquet</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <!-- Needed to load the Nessie catalog -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-nessie</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-azure</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-data</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-identity</artifactId>
        </dependency>

        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-storage-file-datalake</artifactId>
        </dependency>

        <!-- Needed to be able to create Parquet files -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.3</version>
        </dependency>

        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-column</artifactId>
            <version>1.13.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-1.18</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-dropwizard</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.azure</groupId>
                <artifactId>azure-sdk-bom</artifactId>
                <version>1.2.22</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>
ms1111 added the bug label on Apr 28, 2024

nastra (Contributor) commented Apr 29, 2024

This will be fixed by #10045


github-actions bot commented Nov 1, 2024

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the 'not-stale' label, but commenting on the issue is preferred when possible.

github-actions bot added the stale label on Nov 1, 2024

ms1111 (Author) commented Nov 1, 2024

Fixed in 1.6.0 by #10045
