diff --git a/build-logic/src/main/kotlin/Utilities.kt b/build-logic/src/main/kotlin/Utilities.kt
index 3f2c248d014..bf8ec9c8070 100644
--- a/build-logic/src/main/kotlin/Utilities.kt
+++ b/build-logic/src/main/kotlin/Utilities.kt
@@ -64,6 +64,7 @@ fun ModuleDependency.withSparkExcludes(): ModuleDependency {
     .exclude("org.eclipse.jetty", "jetty-util")
     .exclude("org.apache.avro", "avro")
     .exclude("org.apache.arrow", "arrow-vector")
+    .exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
 }
 
 fun DependencyHandlerScope.forScala(scalaVersion: String) {
diff --git a/gc/gc-base-tests/src/main/java/org/projectnessie/gc/files/tests/AbstractFiles.java b/gc/gc-base-tests/src/main/java/org/projectnessie/gc/files/tests/AbstractFiles.java
index ac7b937aec4..8d2df75fcfa 100644
--- a/gc/gc-base-tests/src/main/java/org/projectnessie/gc/files/tests/AbstractFiles.java
+++ b/gc/gc-base-tests/src/main/java/org/projectnessie/gc/files/tests/AbstractFiles.java
@@ -45,7 +45,7 @@ public void walkEmpty() throws Exception {
 
   @Test
   public void deleteNonExisting() {
-    assertThat(deleter().delete(FileReference.of(baseUri().resolve("fileX"), baseUri(), 123L)))
+    assertThat(deleter().delete(FileReference.of(StorageUri.of("fileX"), baseUri(), 123L)))
         .isEqualTo(DeleteResult.SUCCESS);
   }
 
@@ -133,7 +133,7 @@ public void manyFiles() throws Exception {
             .deleteMultiple(
                 baseUri(),
                 IntStream.rangeClosed(1, deletes)
-                    .mapToObj(i -> baseUri().resolve(dirAndFilename(i)))
+                    .mapToObj(i -> StorageUri.of(dirAndFilename(i)))
                     .map(p -> FileReference.of(p, baseUri(), -1L))))
         .isEqualTo(DeleteSummary.of(deletes, 0L));
 
diff --git a/gc/gc-base/src/main/java/org/projectnessie/gc/files/FileReference.java b/gc/gc-base/src/main/java/org/projectnessie/gc/files/FileReference.java
index c8a713b30b2..e13abdeec17 100644
--- a/gc/gc-base/src/main/java/org/projectnessie/gc/files/FileReference.java
+++ b/gc/gc-base/src/main/java/org/projectnessie/gc/files/FileReference.java
@@ -15,6 +15,8 @@
  */
 package org.projectnessie.gc.files;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import org.immutables.value.Value;
 import org.projectnessie.storage.uri.StorageUri;
 
@@ -35,11 +37,20 @@ public interface FileReference {
   @Value.Auxiliary
   long modificationTimeMillisEpoch();
 
-  @Value.NonAttribute
+  /**
+   * Absolute path to the file/directory. Virtually equivalent to {@code base().resolve(path())}.
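+   *
+   * <p>Declared {@code @Value.Lazy}, so the value is computed on first access and then memoized.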
+   */
+  @Value.Lazy
   default StorageUri absolutePath() {
     return base().resolve(path());
   }
 
+  @Value.Check
+  default void check() {
+    checkArgument(base().isAbsolute(), "Base location must be absolute: %s", base());
+    checkArgument(!path().isAbsolute(), "Path must be relative: %s", path());
+  }
+
   static ImmutableFileReference.Builder builder() {
     return ImmutableFileReference.builder();
   }
diff --git a/gc/gc-base/src/test/java/org/projectnessie/gc/roundtrip/TestMarkAndSweep.java b/gc/gc-base/src/test/java/org/projectnessie/gc/roundtrip/TestMarkAndSweep.java
index efee3116a18..42a6d3d4648 100644
--- a/gc/gc-base/src/test/java/org/projectnessie/gc/roundtrip/TestMarkAndSweep.java
+++ b/gc/gc-base/src/test/java/org/projectnessie/gc/roundtrip/TestMarkAndSweep.java
@@ -279,10 +279,10 @@ public void close() {}
         path -> {
           long l = markAndSweep.baseLocationToNum(path);
           return Stream.of(
-              FileReference.of(path.resolve("metadata-" + l), path, 123L),
-              FileReference.of(path.resolve("1-unused"), path, 123L),
-              FileReference.of(path.resolve("2-unused"), path, markAndSweep.newestToDeleteMillis),
-              FileReference.of(path.resolve("too-new-2"), path, markAndSweep.tooNewMillis));
+              FileReference.of(StorageUri.of("metadata-" + l), path, 123L),
+              FileReference.of(StorageUri.of("1-unused"), path, 123L),
+              FileReference.of(StorageUri.of("2-unused"), path, markAndSweep.newestToDeleteMillis),
+              FileReference.of(StorageUri.of("too-new-2"), path, markAndSweep.tooNewMillis));
         };
     FileDeleter deleter =
         fileObject -> {
@@ -299,13 +299,15 @@ public void close() {}
                 .fileDeleter(deleter)
                 .expectedFileCount(100)
                 .contentToFiles(
-                    contentReference ->
-                        Stream.of(
-                            FileReference.of(
-                                StorageUri.of(contentReference.metadataLocation()),
-                                markAndSweep.numToBaseLocation(
-                                    markAndSweep.contentIdToNum(contentReference.contentId())),
-                                -1L)))
+                    contentReference -> {
+                      StorageUri baseLocation =
+                          markAndSweep.numToBaseLocation(
+                              markAndSweep.contentIdToNum(contentReference.contentId()));
+                      StorageUri location = StorageUri.of(contentReference.metadataLocation());
+                      return Stream.of(
+                          FileReference.of(
+                              baseLocation.relativize(location), baseLocation, -1L));
+                    })
                 .maxFileModificationTime(markAndSweep.maxFileModificationTime)
                 .build())
         .build();
diff --git a/gc/gc-iceberg-files/build.gradle.kts b/gc/gc-iceberg-files/build.gradle.kts
index ea458479323..fd0fe481b67 100644
--- a/gc/gc-iceberg-files/build.gradle.kts
+++ b/gc/gc-iceberg-files/build.gradle.kts
@@ -76,6 +76,10 @@ dependencies {
   testFixturesRuntimeOnly("com.google.cloud:google-cloud-storage")
   testFixturesRuntimeOnly(libs.google.cloud.nio)
 
+  testFixturesApi(platform(libs.azuresdk.bom))
+  testFixturesApi("com.azure:azure-storage-file-datalake")
+  testFixturesRuntimeOnly("com.azure:azure-identity")
+
   testFixturesApi(platform(libs.junit.bom))
   testFixturesApi(libs.bundles.junit.testing)
 }
diff --git a/gc/gc-iceberg-files/src/main/java/org/projectnessie/gc/iceberg/files/IcebergFiles.java b/gc/gc-iceberg-files/src/main/java/org/projectnessie/gc/iceberg/files/IcebergFiles.java
index ddbfae6f43e..7dcbfd0939e 100644
--- a/gc/gc-iceberg-files/src/main/java/org/projectnessie/gc/iceberg/files/IcebergFiles.java
+++ b/gc/gc-iceberg-files/src/main/java/org/projectnessie/gc/iceberg/files/IcebergFiles.java
@@ -140,9 +140,14 @@ public Stream listRecursively(StorageUri path) throws NessieFileI
       }
       return StreamSupport.stream(fileInfos.spliterator(), false)
           .map(
-              f ->
-                  FileReference.of(
-                      basePath.relativize(f.location()), basePath, f.createdAtMillis()));
+              f -> {
+                StorageUri location = StorageUri.of(f.location());
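+                // Some FileIO implementations report locations relative to the filesystem
+                // root instead of fully qualified URIs; resolve those against the root of
+                // the base location before relativizing.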
+                if (!location.isAbsolute()) {
+                  location = basePath.resolve("/").resolve(location);
+                }
+                return FileReference.of(
+                    basePath.relativize(location), basePath, f.createdAtMillis());
+              });
     }
 
     return listHadoop(basePath);
diff --git a/gc/gc-iceberg-files/src/test/java/org/projectnessie/gc/iceberg/files/AbstractFiles.java b/gc/gc-iceberg-files/src/test/java/org/projectnessie/gc/iceberg/files/AbstractFiles.java
new file mode 100644
index 00000000000..af82944d088
--- /dev/null
+++ b/gc/gc-iceberg-files/src/test/java/org/projectnessie/gc/iceberg/files/AbstractFiles.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.gc.iceberg.files;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.projectnessie.gc.files.DeleteSummary;
+import org.projectnessie.gc.files.FileReference;
+import org.projectnessie.objectstoragemock.Bucket;
+import org.projectnessie.objectstoragemock.MockObject;
+import org.projectnessie.objectstoragemock.ObjectStorageMock;
+import org.projectnessie.storage.uri.StorageUri;
+
+public abstract class AbstractFiles {
+
+  @Test
+  public void iceberg() throws Exception {
+    StorageUri baseUri = storageUri("/path/");
+
+    Set<String> keys = new TreeSet<>();
+    keys.add("path/file-1");
+    keys.add("path/file-2");
+    keys.add("path/file-3");
+    keys.add("path/dir-1/file-4");
+    keys.add("path/dir-1/dir-2/file-5");
+
+    try (ObjectStorageMock.MockServer server = createServer(keys);
+        IcebergFiles icebergFiles = createIcebergFiles(server)) {
+
+      Set<StorageUri> expect =
+          keys.stream().map(this::storageUri).collect(Collectors.toCollection(HashSet::new));
+
+      try (Stream<FileReference> files = icebergFiles.listRecursively(baseUri)) {
+        assertThat(files)
+            .allSatisfy(f -> assertThat(f.base()).isEqualTo(baseUri))
+            .map(FileReference::absolutePath)
+            .containsExactlyInAnyOrderElementsOf(expect);
+      }
+
+      icebergFiles.deleteMultiple(
+          baseUri,
+          Stream.of(
+              FileReference.of(StorageUri.of("file-2"), baseUri, -1L),
+              FileReference.of(StorageUri.of("file-3"), baseUri, -1L)));
+      expect.remove(baseUri.resolve("file-2"));
+      expect.remove(baseUri.resolve("file-3"));
+
+      try (Stream<FileReference> files = icebergFiles.listRecursively(baseUri)) {
+        assertThat(files)
+            .allSatisfy(f -> assertThat(f.base()).isEqualTo(baseUri))
+            .map(FileReference::absolutePath)
+            .containsExactlyInAnyOrderElementsOf(expect);
+      }
+
+      icebergFiles.delete(FileReference.of(StorageUri.of("dir-1/file-4"), baseUri, -1L));
+      expect.remove(baseUri.resolve("dir-1/file-4"));
+
+      try (Stream<FileReference> files = icebergFiles.listRecursively(baseUri)) {
+        assertThat(files)
+            .allSatisfy(f -> assertThat(f.base()).isEqualTo(baseUri))
+            .map(FileReference::absolutePath)
+            .containsExactlyInAnyOrderElementsOf(expect);
+      }
+    }
+  }
+
+  /**
+   * Creates many files, lists the files, deletes 10% of the created files, lists again.
+   *
+   * <p>Minio in the configuration used here is not particularly fast: creating 100000 objects
+   * with 4 threads takes about 30 minutes (more threads make Minio crash with timeouts), plus
+   * ~3 seconds for listing 100000 objects and ~3 seconds for deleting 10000 objects.
+   */
+  @ParameterizedTest
+  @ValueSource(ints = {500})
+  public void manyFiles(int numFiles) throws Exception {
+    StorageUri baseUri = storageUri("/path/");
+
+    Set<String> keys =
+        IntStream.range(0, numFiles)
+            .mapToObj(i -> String.format("path/%d/%d", i % 100, i))
+            .collect(Collectors.toCollection(HashSet::new));
+
+    try (ObjectStorageMock.MockServer server = createServer(keys);
+        IcebergFiles icebergFiles = createIcebergFiles(server)) {
+
+      try (Stream<FileReference> files = icebergFiles.listRecursively(baseUri)) {
+        assertThat(files).hasSize(numFiles);
+      }
+
+      int deletes = numFiles / 10;
+      assertThat(
+              icebergFiles.deleteMultiple(
+                  baseUri,
+                  IntStream.range(0, deletes)
+                      .mapToObj(i -> StorageUri.of(String.format("%d/%d", i % 100, i)))
+                      .map(p -> FileReference.of(p, baseUri, -1L))))
+          .isEqualTo(DeleteSummary.of(deletes, 0L));
+
+      try (Stream<FileReference> files = icebergFiles.listRecursively(baseUri)) {
+        assertThat(files).hasSize(numFiles - deletes);
+      }
+    }
+  }
+
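+  /**
+   * Builds an object-storage mock serving the given keys. Note that the mock's lister ignores
+   * the prefix and offset parameters and always streams all keys.
+   */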
+  private ObjectStorageMock.MockServer createServer(Set<String> keys) {
+    return ObjectStorageMock.builder()
+        .putBuckets(
+            bucket(),
+            Bucket.builder()
+                .lister(
+                    (String prefix, String offset) ->
+                        keys.stream()
+                            .map(
+                                key ->
+                                    new Bucket.ListElement() {
+                                      @Override
+                                      public String key() {
+                                        return key;
+                                      }
+
+                                      @Override
+                                      public MockObject object() {
+                                        return MockObject.builder().build();
+                                      }
+                                    }))
+                .object(key -> keys.contains(key) ? MockObject.builder().build() : null)
+                .deleter(keys::remove)
+                .build())
+        .build()
+        .start();
+  }
+
+  private IcebergFiles createIcebergFiles(ObjectStorageMock.MockServer server) {
+    return IcebergFiles.builder()
+        .properties(icebergProperties(server))
+        .hadoopConfiguration(hadoopConfiguration(server))
+        .build();
+  }
+
+  protected abstract String bucket();
+
+  protected abstract StorageUri storageUri(String path);
+
+  protected abstract Map<String, String> icebergProperties(
+      ObjectStorageMock.MockServer server);
+
+  protected abstract Configuration hadoopConfiguration(ObjectStorageMock.MockServer server);
+}
diff --git a/gc/gc-iceberg-files/src/test/java/org/projectnessie/gc/iceberg/files/TestIcebergAdlsFiles.java b/gc/gc-iceberg-files/src/test/java/org/projectnessie/gc/iceberg/files/TestIcebergAdlsFiles.java
new file mode 100644
index 00000000000..1d497001a2c
--- /dev/null
+++ b/gc/gc-iceberg-files/src/test/java/org/projectnessie/gc/iceberg/files/TestIcebergAdlsFiles.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2022 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.gc.iceberg.files;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.projectnessie.objectstoragemock.ObjectStorageMock.MockServer;
+import org.projectnessie.storage.uri.StorageUri;
+
+public class TestIcebergAdlsFiles extends AbstractFiles {
+
+  @Override
+  protected String bucket() {
+    return "$root";
+  }
+
+  @Override
+  protected Map<String, String> icebergProperties(MockServer server) {
+    Map<String, String> props = new HashMap<>();
+
+    props.put("adls.connection-string.account", server.getAdlsGen2BaseUri().toString());
+    props.put("adls.auth.shared-key.account.name", "account@account.dfs.core.windows.net");
+    props.put("adls.auth.shared-key.account.key", "key");
+
+    return props;
+  }
+
+  protected Configuration hadoopConfiguration(MockServer server) {
+    Configuration conf = new Configuration();
+
+    conf.set("fs.azure.impl", "org.apache.hadoop.fs.azure.AzureNativeFileSystemStore");
+    conf.set("fs.AbstractFileSystem.azure.impl", "org.apache.hadoop.fs.azurebfs.Abfs");
+    conf.set("fs.azure.storage.emulator.account.name", "account");
+    conf.set("fs.azure.account.auth.type", "SharedKey");
+    conf.set("fs.azure.account.key.account", "");
+
+    return conf;
+  }
+
+  @Override
+  protected StorageUri storageUri(String path) {
+    return StorageUri.of(String.format("abfs://%s@account/", bucket())).resolve(path);
+  }
+}
diff --git a/gc/gc-iceberg-files/src/test/java/org/projectnessie/gc/iceberg/files/TestIcebergGCSFiles.java b/gc/gc-iceberg-files/src/test/java/org/projectnessie/gc/iceberg/files/TestIcebergGCSFiles.java
new file mode 100644
index 00000000000..bf61e58c414
--- /dev/null
+++ b/gc/gc-iceberg-files/src/test/java/org/projectnessie/gc/iceberg/files/TestIcebergGCSFiles.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2022 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.gc.iceberg.files;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Disabled;
+import org.projectnessie.objectstoragemock.ObjectStorageMock.MockServer;
+import org.projectnessie.storage.uri.StorageUri;
+
+@Disabled(
+    "Requires implementation of the /batch/storage/v1 endpoint in object-storage-mock. "
+        + "That endpoint consumes multipart/mixed content, which contains a series of serialized HTTP requests.")
+public class TestIcebergGCSFiles extends AbstractFiles {
+
+  @Override
+  protected String bucket() {
+    return "bucket";
+  }
+
+  @Override
+  protected Map<String, String> icebergProperties(MockServer server) {
+    Map<String, String> props = new HashMap<>();
+
+    props.put("gcs.project-id", "my-project");
+    // MUST NOT end with a trailing slash, otherwise code like
+    // com.google.cloud.storage.spi.v1.HttpStorageRpc.DefaultRpcBatch.submit inserts an ambiguous
+    // empty path segment ("//").
+    String uri = server.getGcsBaseUri().toString();
+    uri = uri.substring(0, uri.length() - 1);
+    props.put("gcs.service.host", uri);
+    props.put("gcs.no-auth", "true");
+
+    return props;
+  }
+
+  protected Configuration hadoopConfiguration(MockServer server) {
+    Configuration conf = new Configuration();
+
+    conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
+    conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
+    conf.set("fs.gs.project.id", "projectId");
+    conf.set("fs.gs.auth.type", "none");
+
+    return conf;
+  }
+
+  @Override
+  protected StorageUri storageUri(String path) {
+    return StorageUri.of(String.format("gs://%s/", bucket())).resolve(path);
+  }
+}
diff --git a/gc/gc-iceberg-files/src/test/java/org/projectnessie/gc/iceberg/files/TestIcebergS3Files.java b/gc/gc-iceberg-files/src/test/java/org/projectnessie/gc/iceberg/files/TestIcebergS3Files.java
index 6e5038b345b..83fb6e3d5fb 100644
--- a/gc/gc-iceberg-files/src/test/java/org/projectnessie/gc/iceberg/files/TestIcebergS3Files.java
+++ b/gc/gc-iceberg-files/src/test/java/org/projectnessie/gc/iceberg/files/TestIcebergS3Files.java
@@ -15,164 +15,45 @@
  */
 package org.projectnessie.gc.iceberg.files;
 
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-import org.projectnessie.gc.files.DeleteSummary;
-import org.projectnessie.gc.files.FileReference;
-import org.projectnessie.objectstoragemock.Bucket;
-import org.projectnessie.objectstoragemock.Bucket.ListElement;
-import org.projectnessie.objectstoragemock.MockObject;
-import org.projectnessie.objectstoragemock.ObjectStorageMock;
 import org.projectnessie.objectstoragemock.ObjectStorageMock.MockServer;
 import org.projectnessie.storage.uri.StorageUri;
 
-public class TestIcebergS3Files {
-
-  private static final String BUCKET = "bucket";
-
-  @Test
-  public void icebergS3() throws Exception {
-    StorageUri baseUri = icebergBaseUri("/path/");
-
-    Set<String> keys = new TreeSet<>();
-    keys.add("path/file-1");
-    keys.add("path/file-2");
-    keys.add("path/file-3");
-    keys.add("path/dir-1/file-4");
-    keys.add("path/dir-1/dir-2/file-5");
-
-    try (MockServer server = createServer(keys);
-        IcebergFiles s3 = createIcebergFiles(server)) {
-
-      Set<StorageUri> expect =
-          keys.stream()
-              .map(TestIcebergS3Files::icebergBaseUri)
-              .collect(Collectors.toCollection(HashSet::new));
+public class TestIcebergS3Files extends AbstractFiles {
 
-      try (Stream<FileReference> files = s3.listRecursively(baseUri)) {
-        assertThat(files)
-            .allSatisfy(f -> assertThat(f.base()).isEqualTo(baseUri))
-            .map(FileReference::absolutePath)
-            .containsExactlyInAnyOrderElementsOf(expect);
-      }
-
-      s3.deleteMultiple(
-          baseUri,
-          Stream.of(
-              FileReference.of(baseUri.resolve("file-2"), baseUri, -1L),
-              FileReference.of(baseUri.resolve("file-3"), baseUri, -1L)));
-      expect.remove(baseUri.resolve("file-2"));
-      expect.remove(baseUri.resolve("file-3"));
-
-      try (Stream<FileReference> files = s3.listRecursively(baseUri)) {
-        assertThat(files)
-            .allSatisfy(f -> assertThat(f.base()).isEqualTo(baseUri))
-            .map(FileReference::absolutePath)
-            .containsExactlyInAnyOrderElementsOf(expect);
-      }
-
-      s3.delete(FileReference.of(baseUri.resolve("dir-1/file-4"), baseUri, -1L));
-      expect.remove(baseUri.resolve("dir-1/file-4"));
-
-      try (Stream<FileReference> files = s3.listRecursively(baseUri)) {
-        assertThat(files)
-            .allSatisfy(f -> assertThat(f.base()).isEqualTo(baseUri))
-            .map(FileReference::absolutePath)
-            .containsExactlyInAnyOrderElementsOf(expect);
-      }
-    }
+  @Override
+  protected String bucket() {
+    return "bucket";
   }
 
-  /**
-   * Creates many files, lists the files, deletes 10% of the created files, lists again.
-   *
-   * <p>Minio in the used configuration is not particularly fast - creating 100000 objects with 4
-   * threads (more crashes w/ timeouts) takes about ~30 minutes (plus ~3 seconds for listing 100000
-   * objects, plus ~3 seconds for deleting 10000 objects).
-   */
-  @ParameterizedTest
-  @ValueSource(ints = {500})
-  public void manyFiles(int numFiles) throws Exception {
-    StorageUri baseUri = icebergBaseUri("/path/");
-
-    Set<String> keys =
-        IntStream.range(0, numFiles)
-            .mapToObj(i -> String.format("path/%d/%d", i % 100, i))
-            .collect(Collectors.toCollection(HashSet::new));
+  @Override
+  protected Map<String, String> icebergProperties(MockServer server) {
+    Map<String, String> props = new HashMap<>();
 
-    try (MockServer server = createServer(keys);
-        IcebergFiles s3 = createIcebergFiles(server)) {
+    props.put("s3.access-key-id", "accessKey");
+    props.put("s3.secret-access-key", "secretKey");
+    props.put("s3.endpoint", server.getS3BaseUri().toString());
+    // must enforce path-style access because S3Resource has the bucket name in its path
+    props.put("s3.path-style-access", "true");
+    props.put("http-client.type", "urlconnection");
 
-      try (Stream<FileReference> files = s3.listRecursively(baseUri)) {
-        assertThat(files).hasSize(numFiles);
-      }
-
-      int deletes = numFiles / 10;
-      assertThat(
-              s3.deleteMultiple(
-                  baseUri,
-                  IntStream.range(0, deletes)
-                      .mapToObj(i -> baseUri.resolve(String.format("%d/%d", i % 100, i)))
-                      .map(p -> FileReference.of(p, baseUri, -1L))))
-          .isEqualTo(DeleteSummary.of(deletes, 0L));
+    return props;
+  }
 
-      try (Stream<FileReference> files = s3.listRecursively(baseUri)) {
-        assertThat(files).hasSize(numFiles - deletes);
-      }
-    }
+  protected Configuration hadoopConfiguration(MockServer server) {
+    Configuration conf = new Configuration();
 
-  private IcebergFiles createIcebergFiles(MockServer server) {
-    return IcebergFiles.builder()
-        .properties(server.icebergProperties())
-        .hadoopConfiguration(hadoopConfiguration(server))
-        .build();
-  }
+    conf.set("fs.s3a.access.key", "accessKey");
+    conf.set("fs.s3a.secret.key", "secretKey");
+    conf.set("fs.s3a.endpoint", server.getS3BaseUri().toString());
 
-  private static MockServer createServer(Set<String> keys) {
-    return ObjectStorageMock.builder()
-        .putBuckets(
-            BUCKET,
-            Bucket.builder()
-                .lister(
-                    (String prefix, String offset) ->
-                        keys.stream()
-                            .map(
-                                key ->
-                                    new ListElement() {
-                                      @Override
-                                      public String key() {
-                                        return key;
-                                      }
+    return conf;
+  }
 
-                                      @Override
-                                      public MockObject object() {
-                                        return MockObject.builder().build();
-                                      }
-                                    }))
-                .deleter(keys::remove)
-                .build())
-        .build()
-        .start();
-  }
-
-  protected static StorageUri icebergBaseUri(String path) {
-    return StorageUri.of(String.format("s3://%s/", BUCKET)).resolve(path);
+  @Override
+  protected StorageUri storageUri(String path) {
+    return StorageUri.of(String.format("s3://%s/", bucket())).resolve(path);
   }
 
-  protected Configuration hadoopConfiguration(MockServer server) {
-    Configuration conf = new Configuration();
-    server.hadoopConfiguration().forEach(conf::set);
-    return conf;
-  }
 }
diff --git a/gc/gc-iceberg-inttest/build.gradle.kts b/gc/gc-iceberg-inttest/build.gradle.kts
index 00b81bbaa22..9bcea350c5b 100644
--- a/gc/gc-iceberg-inttest/build.gradle.kts
+++ b/gc/gc-iceberg-inttest/build.gradle.kts
@@ -22,12 +22,17 @@ plugins {
 
 extra["maven.name"] = "Nessie - GC - Integration tests"
 
-val sparkScala = useSparkScalaVersionsForProject("3.4", "2.12")
+val sparkScala = useSparkScalaVersionsForProject("3.5", "2.12")
 
 dependencies {
   implementation(libs.hadoop.client)
   implementation(platform(libs.iceberg.bom))
+
+  // Enforce a single version of Netty among dependencies
+  // (Spark, Hadoop and Azure)
+  implementation(enforcedPlatform(libs.netty.bom))
+
   implementation("org.apache.iceberg:iceberg-core")
   implementation("org.apache.iceberg:iceberg-aws")
   implementation("org.apache.iceberg:iceberg-gcp")
diff --git a/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/AbstractITSparkIcebergNessieObjectStorage.java b/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/AbstractITSparkIcebergNessieObjectStorage.java
index 82e8d564185..4cf8dcdd6e7 100644
--- a/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/AbstractITSparkIcebergNessieObjectStorage.java
+++ b/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/AbstractITSparkIcebergNessieObjectStorage.java
@@ -15,6 +15,7 @@
  */
 package org.projectnessie.gc.iceberg.inttest;
 
+import static org.assertj.core.api.Assumptions.assumeThat;
 import static org.projectnessie.gc.iceberg.inttest.AbstractITSparkIcebergNessieObjectStorage.Step.dml;
 import static org.projectnessie.gc.iceberg.inttest.AbstractITSparkIcebergNessieObjectStorage.Step.expiredDdl;
 import static org.projectnessie.gc.iceberg.inttest.AbstractITSparkIcebergNessieObjectStorage.Step.expiredDml;
@@ -25,6 +26,7 @@
 import jakarta.annotation.Nullable;
 import java.time.Instant;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -120,6 +122,11 @@ static ImmutableTestCase.Builder testCase() {
     Map policies();
 
     String namespace();
+
+    @Value.Default
+    default Set<Storage> compatibleStorages() {
+      return EnumSet.allOf(Storage.class);
+    }
   }
 
   static Stream<TestCase> testCases() {
@@ -151,6 +158,8 @@
         // 3
         testCase()
            .namespace("tc_3")
+            // ADLS FileIO does not support special characters in column names
+            .addCompatibleStorages(Storage.S3, Storage.GCS)
            .addSteps(
                expiredDdl(
                    // Note: intentional special chars in the column name
@@ -169,6 +178,8 @@
   @MethodSource("testCases")
   public void roundTrips(TestCase testCase) throws Exception {
+    assumeThat(testCase.compatibleStorages()).contains(storage());
+
     api.createNamespace()
         .namespace(testCase.namespace())
         .refName(api.getConfig().getDefaultBranch())
@@ -279,4 +290,12 @@ protected Set allFiles(IcebergFiles icebergFiles) throws NessieFileI
 
   protected abstract StorageUri bucketUri();
+
+  abstract Storage storage();
+
+  enum Storage {
+    S3,
+    GCS,
+    ADLS,
+  }
 }
diff --git a/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/ITSparkIcebergNessieAzure.java b/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/ITSparkIcebergNessieAzure.java
index cbbf375829d..54e1284a220 100644
--- a/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/ITSparkIcebergNessieAzure.java
+++ b/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/ITSparkIcebergNessieAzure.java
@@ -26,7 +26,9 @@
 import org.projectnessie.testing.azurite.AzuriteAccess;
 import org.projectnessie.testing.azurite.AzuriteExtension;
 
-@Disabled("Needs an Iceberg release with https://github.com/apache/iceberg/pull/10045")
+@Disabled(
+    "1) Needs an Iceberg release with https://github.com/apache/iceberg/pull/10045 "
+        + "2) Azurite is incompatible with ADLS v2 list-prefix REST endpoint")
 @ExtendWith(AzuriteExtension.class)
 public class ITSparkIcebergNessieAzure extends AbstractITSparkIcebergNessieObjectStorage {
 
@@ -34,6 +36,11 @@ public class ITSparkIcebergNessieAzure extends AbstractITSparkIcebergNessieObjec
 
   private static @Azurite AzuriteAccess azuriteAccess;
 
+  @Override
+  Storage storage() {
+    return Storage.ADLS;
+  }
+
   @Override
   protected String warehouseURI() {
     return azuriteAccess.location(BUCKET_URI);
diff --git a/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/ITSparkIcebergNessieAzureMock.java b/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/ITSparkIcebergNessieAzureMock.java
new file mode 100644
index 00000000000..d523cad1a87
--- /dev/null
+++ b/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/ITSparkIcebergNessieAzureMock.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.gc.iceberg.inttest;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.net.URI;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.projectnessie.gc.iceberg.files.IcebergFiles;
+import org.projectnessie.objectstoragemock.HeapStorageBucket;
+import org.projectnessie.objectstoragemock.ObjectStorageMock;
+import org.projectnessie.objectstoragemock.ObjectStorageMock.MockServer;
+import org.projectnessie.storage.uri.StorageUri;
+
+@Disabled("Needs an Iceberg release with https://github.com/apache/iceberg/pull/10045")
+public class ITSparkIcebergNessieAzureMock extends AbstractITSparkIcebergNessieObjectStorage {
+
+  private static final String FILESYSTEM = "filesystem1";
+  private static final String ACCOUNT = "account123";
+  private static final String ACCOUNT_FQ = ACCOUNT + ".dfs.core.windows.net";
+  private static final String SECRET = "s3cr3t";
+  private static final String SECRET_BASE_64 =
+      new String(Base64.getEncoder().encode(SECRET.getBytes(UTF_8)));
+  private static final String ADLS_WAREHOUSE_LOCATION =
+      "abfs://" + FILESYSTEM + "@" + ACCOUNT_FQ + "/warehouse";
+
+  private static MockServer server;
+  private static URI endpoint;
+
+  @Override
+  Storage storage() {
+    return Storage.ADLS;
+  }
+
+  @BeforeAll
+  static void beforeAll() {
+    HeapStorageBucket bucket = HeapStorageBucket.newHeapStorageBucket();
+    server = ObjectStorageMock.builder().putBuckets(FILESYSTEM, bucket.bucket()).build().start();
+    endpoint = server.getAdlsGen2BaseUri().resolve(FILESYSTEM);
+  }
+
+  @AfterAll
+  static void afterAll() throws Exception {
+    if (server != null) {
+      server.close();
+    }
+  }
+
+  @Override
+  protected StorageUri bucketUri() {
+    return StorageUri.of(ADLS_WAREHOUSE_LOCATION);
+  }
+
+  @Override
+  protected String warehouseURI() {
+    return bucketUri().location();
+  }
+
+  @Override
+  protected Map<String, String> sparkHadoop() {
+    Map<String, String> r = new HashMap<>();
+    r.put("fs.azure.impl", "org.apache.hadoop.fs.azure.AzureNativeFileSystemStore");
+    r.put("fs.AbstractFileSystem.azure.impl", "org.apache.hadoop.fs.azurebfs.Abfs");
+    // Point the ABFS client at the in-process mock server, over plain HTTP
+    r.put("fs.azure.always.use.https", "false");
+    r.put("fs.azure.abfs.endpoint", endpoint.getHost() + ":" + endpoint.getPort());
+    r.put("fs.azure.test.emulator", "true");
+    r.put("fs.azure.storage.emulator.account.name", ACCOUNT);
+    r.put("fs.azure.account.auth.type", "SharedKey");
+    r.put("fs.azure.account.key." + ACCOUNT_FQ, SECRET_BASE_64);
+    return r;
+  }
+
+  @Override
+  protected Map<String, String> nessieParams() {
+    Map<String, String> r = new HashMap<>(super.nessieParams());
+    r.put("io-impl", "org.apache.iceberg.azure.adlsv2.ADLSFileIO");
+    r.put("adls.connection-string." + ACCOUNT_FQ, endpoint.toString());
+    r.put("adls.auth.shared-key.account.name", ACCOUNT);
+    r.put("adls.auth.shared-key.account.key", SECRET_BASE_64);
+    return r;
+  }
+
+  @Override
+  IcebergFiles icebergFiles() {
+    Configuration conf = new Configuration();
+    sparkHadoop().forEach(conf::set);
+    return IcebergFiles.builder().properties(nessieParams()).hadoopConfiguration(conf).build();
+  }
+}
diff --git a/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/ITSparkIcebergNessieGCP.java b/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/ITSparkIcebergNessieGCP.java
index 357174fa527..1c3affe3d6c 100644
--- a/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/ITSparkIcebergNessieGCP.java
+++ b/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/ITSparkIcebergNessieGCP.java
@@ -32,6 +32,11 @@ public class ITSparkIcebergNessieGCP extends AbstractITSparkIcebergNessieObjectS
 
   private static @Gcs GcsAccess gcsAccess;
 
+  @Override
+  Storage storage() {
+    return Storage.GCS;
+  }
+
   @Override
   protected String warehouseURI() {
     return gcsAccess.bucketUri(BUCKET_URI).toString();
diff --git a/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/ITSparkIcebergNessieS3.java b/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/ITSparkIcebergNessieS3.java
index 40a2e2c23db..8a22b87ae56 100644
--- a/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/ITSparkIcebergNessieS3.java
+++ b/gc/gc-iceberg-inttest/src/intTest/java/org/projectnessie/gc/iceberg/inttest/ITSparkIcebergNessieS3.java
@@ -41,6 +41,11 @@ public class ITSparkIcebergNessieS3 extends AbstractITSparkIcebergNessieObjectSt
 
   @Minio static MinioAccess minio;
 
+  @Override
+  Storage storage() {
+    return Storage.S3;
+  }
+
   @Override
   protected String warehouseURI() {
     return minio.s3BucketUri(S3_BUCKET_URI).toString();
diff --git a/gc/gc-iceberg-inttest/src/intTest/resources/logback-test.xml b/gc/gc-iceberg-inttest/src/intTest/resources/logback-test.xml
index 2a6be300dc2..a63d8c2769f 100644
--- a/gc/gc-iceberg-inttest/src/intTest/resources/logback-test.xml
+++ b/gc/gc-iceberg-inttest/src/intTest/resources/logback-test.xml
@@ -23,8 +23,7 @@
       <pattern>%date{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
-
-
+
diff --git a/gc/gc-iceberg/src/main/java/org/projectnessie/gc/iceberg/IcebergContentToFiles.java b/gc/gc-iceberg/src/main/java/org/projectnessie/gc/iceberg/IcebergContentToFiles.java
index 61c971a9565..bf7ff19eed7 100644
--- a/gc/gc-iceberg/src/main/java/org/projectnessie/gc/iceberg/IcebergContentToFiles.java
+++ b/gc/gc-iceberg/src/main/java/org/projectnessie/gc/iceberg/IcebergContentToFiles.java
@@ -53,7 +53,10 @@ public abstract class IcebergContentToFiles implements ContentToFiles {
   public static final String S3_KEY_NOT_FOUND =
       "software.amazon.awssdk.services.s3.model.NoSuchKeyException";
   public static final String GCS_STORAGE_EXCEPTION = "com.google.cloud.storage.StorageException";
+  public static final String ADLS_STORAGE_EXCEPTION =
+      "com.azure.storage.blob.models.BlobStorageException";
   public static final String GCS_NOT_FOUND_START = "404 Not Found";
+  public static final String ADLS_NOT_FOUND_CODE = "PathNotFound";
 
   public static Builder builder() {
     return ImmutableIcebergContentToFiles.builder();
@@ -89,9 +92,16 @@ public Stream extractFiles(ContentReference contentReference) {
         || S3_KEY_NOT_FOUND.equals(notFoundCandidate.getClass().getName())) {
       notFound = true;
     } else {
-      for (Throwable c = notFoundCandidate.getCause(); c != null; c = c.getCause()) {
-        if (GCS_STORAGE_EXCEPTION.equals(c.getClass().getName())
-            && c.getMessage().startsWith(GCS_NOT_FOUND_START)) {
+      for (Throwable c = notFoundCandidate; c != null; c = c.getCause()) {
+        String exceptionClass = c.getClass().getName();
+        // getMessage() can legitimately be null, guard before matching on it
+        String message = c.getMessage();
+        if (message == null) {
+          continue;
+        }
+        if (GCS_STORAGE_EXCEPTION.equals(exceptionClass)
+            && message.startsWith(GCS_NOT_FOUND_START)) {
+          notFound = true;
+          break;
+        }
+        if (ADLS_STORAGE_EXCEPTION.equals(exceptionClass)
+            && message.contains(ADLS_NOT_FOUND_CODE)) {
           notFound = true;
           break;
         }
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index bd388d1a229..3832a018d30 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -20,6 +20,7 @@ logback = "1.5.6"
 mavenResolver = "1.9.20"
 mockito="4.11.0"
 nessieClientVersion = "0.77.1" # Must be in sync with Nessie version in the Iceberg release.
+netty = "4.1.110.Final"
 opentelemetry = "1.38.0"
 opentelemetryAlpha = "1.31.0-alpha"
 opentracing = "0.33.0"
@@ -114,6 +115,7 @@ mockito-junit-jupiter = { module = "org.mockito:mockito-junit-jupiter", version.
 mongodb-driver-sync = { module = "org.mongodb:mongodb-driver-sync", version = "5.1.0" }
 nessie-runner-common = { module = "org.projectnessie.nessie-runner:nessie-runner-common", version = "0.32.2" }
 nessie-ui = { module = "org.projectnessie.nessie.ui:nessie-ui", version = "0.63.6" }
+netty-bom = { module = "io.netty:netty-bom", version.ref = "netty" }
 opentelemetry-bom = { module = "io.opentelemetry:opentelemetry-bom", version.ref = "opentelemetry" }
 opentelemetry-bom-alpha = { module = "io.opentelemetry:opentelemetry-bom-alpha", version.ref = "opentelemetryAlpha" }
 opentelemetry-instrumentation-bom-alpha = { module = "io.opentelemetry.instrumentation:opentelemetry-instrumentation-bom-alpha", version.ref = "opentelemetryAlpha" }
diff --git a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AdlsGen2Resource.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AdlsGen2Resource.java
index 62bea6658cb..b59f1e921b0 100644
--- a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AdlsGen2Resource.java
+++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AdlsGen2Resource.java
@@ -295,6 +295,7 @@ public Response list(
                         RFC_1123_DATE_TIME.format(
                             ZonedDateTime.ofInstant(
                                 Instant.ofEpochMilli(obj.lastModified()), ZoneId.of("UTC"))))
+                    .creationTime(1000L) // cannot be zero
                     .directory(false)
                     .build());
             keyCount++;
diff --git a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/ObjectStorageMock.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/ObjectStorageMock.java
index 3618025101d..b40491a53c6 100644
--- a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/ObjectStorageMock.java
+++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/ObjectStorageMock.java
@@ -110,14 +110,6 @@ default Map icebergProperties() {
     props.put("http-client.type", "urlconnection");
     return props;
   }
-
-  default Map<String, String> hadoopConfiguration() {
-    Map<String, String> conf = new HashMap<>();
-    conf.put("fs.s3a.access.key", "accessKey");
-    conf.put("fs.s3a.secret.key", "secretKey");
-    conf.put("fs.s3a.endpoint", getS3BaseUri().toString());
-    return conf;
-  }
 }
 
 private static final class MockServerImpl implements MockServer {
diff --git a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/adlsgen2/Path.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/adlsgen2/Path.java
index b6fb1fe6f8d..7ed88e6d77a 100644
--- a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/adlsgen2/Path.java
+++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/adlsgen2/Path.java
@@ -39,6 +39,12 @@ public interface Path {
 
   String lastModified();
 
+  /**
+   * Not mentioned in the specs, but required by the ADLS client. See {@code
+   * com.azure.storage.file.datalake.models.PathItem}.
+   */
+  long creationTime();
+
   String name();
 
   @Nullable