Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GC: fix incorrect paths when listing prefixes with ADLS #8661

Merged
merged 6 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build-logic/src/main/kotlin/Utilities.kt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ fun ModuleDependency.withSparkExcludes(): ModuleDependency {
.exclude("org.eclipse.jetty", "jetty-util")
.exclude("org.apache.avro", "avro")
.exclude("org.apache.arrow", "arrow-vector")
.exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
}

fun DependencyHandlerScope.forScala(scalaVersion: String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void walkEmpty() throws Exception {

@Test
public void deleteNonExisting() {
assertThat(deleter().delete(FileReference.of(baseUri().resolve("fileX"), baseUri(), 123L)))
assertThat(deleter().delete(FileReference.of(StorageUri.of("fileX"), baseUri(), 123L)))
.isEqualTo(DeleteResult.SUCCESS);
}

Expand Down Expand Up @@ -133,7 +133,7 @@ public void manyFiles() throws Exception {
.deleteMultiple(
baseUri(),
IntStream.rangeClosed(1, deletes)
.mapToObj(i -> baseUri().resolve(dirAndFilename(i)))
.mapToObj(i -> StorageUri.of(dirAndFilename(i)))
.map(p -> FileReference.of(p, baseUri(), -1L))))
.isEqualTo(DeleteSummary.of(deletes, 0L));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.projectnessie.gc.files;

import static com.google.common.base.Preconditions.checkArgument;

import org.immutables.value.Value;
import org.projectnessie.storage.uri.StorageUri;

Expand All @@ -35,11 +37,20 @@ public interface FileReference {
@Value.Auxiliary
long modificationTimeMillisEpoch();

@Value.NonAttribute
/**
* Absolute path to the file/directory. Virtually equivalent to {@code base().resolve(path())}.
*/
@Value.Lazy
default StorageUri absolutePath() {
return base().resolve(path());
}

@Value.Check
default void check() {
checkArgument(base().isAbsolute(), "Base location must be absolute: %s", base());
checkArgument(!path().isAbsolute(), "Path must be relative: %s", path());
}

static ImmutableFileReference.Builder builder() {
return ImmutableFileReference.builder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,10 @@ public void close() {}
path -> {
long l = markAndSweep.baseLocationToNum(path);
return Stream.of(
FileReference.of(path.resolve("metadata-" + l), path, 123L),
FileReference.of(path.resolve("1-unused"), path, 123L),
FileReference.of(path.resolve("2-unused"), path, markAndSweep.newestToDeleteMillis),
FileReference.of(path.resolve("too-new-2"), path, markAndSweep.tooNewMillis));
FileReference.of(StorageUri.of("metadata-" + l), path, 123L),
FileReference.of(StorageUri.of("1-unused"), path, 123L),
FileReference.of(StorageUri.of("2-unused"), path, markAndSweep.newestToDeleteMillis),
FileReference.of(StorageUri.of("too-new-2"), path, markAndSweep.tooNewMillis));
};
FileDeleter deleter =
fileObject -> {
Expand All @@ -299,13 +299,15 @@ public void close() {}
.fileDeleter(deleter)
.expectedFileCount(100)
.contentToFiles(
contentReference ->
Stream.of(
FileReference.of(
StorageUri.of(contentReference.metadataLocation()),
markAndSweep.numToBaseLocation(
markAndSweep.contentIdToNum(contentReference.contentId())),
-1L)))
contentReference -> {
StorageUri baseLocation =
markAndSweep.numToBaseLocation(
markAndSweep.contentIdToNum(contentReference.contentId()));
StorageUri location = StorageUri.of(contentReference.metadataLocation());
return Stream.of(
FileReference.of(
baseLocation.relativize(location), baseLocation, -1L));
})
.maxFileModificationTime(markAndSweep.maxFileModificationTime)
.build())
.build();
Expand Down
4 changes: 4 additions & 0 deletions gc/gc-iceberg-files/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ dependencies {
testFixturesRuntimeOnly("com.google.cloud:google-cloud-storage")
testFixturesRuntimeOnly(libs.google.cloud.nio)

testFixturesApi(platform(libs.azuresdk.bom))
testFixturesApi("com.azure:azure-storage-file-datalake")
testFixturesRuntimeOnly("com.azure:azure-identity")

testFixturesApi(platform(libs.junit.bom))
testFixturesApi(libs.bundles.junit.testing)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,14 @@ public Stream<FileReference> listRecursively(StorageUri path) throws NessieFileI
}
return StreamSupport.stream(fileInfos.spliterator(), false)
.map(
f ->
FileReference.of(
basePath.relativize(f.location()), basePath, f.createdAtMillis()));
f -> {
StorageUri location = StorageUri.of(f.location());
if (!location.isAbsolute()) {
location = basePath.resolve("/").resolve(location);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't basePath.relativize(location) below equal to location in this case?
(Maybe I'm too tired at the moment, though.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heh no :-)

Here is a typical case with ADLS:

basePath     = abfs://container@account.dfs.core.windows.net/warehouse/ns/table/
f.location() = warehouse/ns/table/whatever.parquet

So basePath.relativize(location) would yield warehouse/ns/table/whatever.parquet, which would then be resolved against basePath, giving a wrong result:

abfs://container@account.dfs.core.windows.net/warehouse/ns/table/warehouse/ns/table/whatever.parquet

So we need to first take the "root" URI:

basePath.resolve("/") = abfs://container@account.dfs.core.windows.net/

Then resolve location against it:

root.resolve(location) = abfs://container@account.dfs.core.windows.net/warehouse/ns/table/whatever.parquet

}
return FileReference.of(
basePath.relativize(location), basePath, f.createdAtMillis());
});
}

return listHadoop(basePath);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright (C) 2024 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.gc.iceberg.files;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.projectnessie.gc.files.DeleteSummary;
import org.projectnessie.gc.files.FileReference;
import org.projectnessie.objectstoragemock.Bucket;
import org.projectnessie.objectstoragemock.MockObject;
import org.projectnessie.objectstoragemock.ObjectStorageMock;
import org.projectnessie.storage.uri.StorageUri;

public abstract class AbstractFiles {

@Test
public void iceberg() throws Exception {
StorageUri baseUri = storageUri("/path/");

Set<String> keys = new TreeSet<>();
keys.add("path/file-1");
keys.add("path/file-2");
keys.add("path/file-3");
keys.add("path/dir-1/file-4");
keys.add("path/dir-1/dir-2/file-5");

try (ObjectStorageMock.MockServer server = createServer(keys);
IcebergFiles icebergFiles = createIcebergFiles(server)) {

Set<StorageUri> expect =
keys.stream().map(this::storageUri).collect(Collectors.toCollection(HashSet::new));

try (Stream<FileReference> files = icebergFiles.listRecursively(baseUri)) {
assertThat(files)
.allSatisfy(f -> assertThat(f.base()).isEqualTo(baseUri))
.map(FileReference::absolutePath)
.containsExactlyInAnyOrderElementsOf(expect);
}

icebergFiles.deleteMultiple(
baseUri,
Stream.of(
FileReference.of(StorageUri.of("file-2"), baseUri, -1L),
FileReference.of(StorageUri.of("file-3"), baseUri, -1L)));
expect.remove(baseUri.resolve("file-2"));
expect.remove(baseUri.resolve("file-3"));

try (Stream<FileReference> files = icebergFiles.listRecursively(baseUri)) {
assertThat(files)
.allSatisfy(f -> assertThat(f.base()).isEqualTo(baseUri))
.map(FileReference::absolutePath)
.containsExactlyInAnyOrderElementsOf(expect);
}

icebergFiles.delete(FileReference.of(StorageUri.of("dir-1/file-4"), baseUri, -1L));
expect.remove(baseUri.resolve("dir-1/file-4"));

try (Stream<FileReference> files = icebergFiles.listRecursively(baseUri)) {
assertThat(files)
.allSatisfy(f -> assertThat(f.base()).isEqualTo(baseUri))
.map(FileReference::absolutePath)
.containsExactlyInAnyOrderElementsOf(expect);
}
}
}

/**
* Creates many files, lists the files, deletes 10% of the created files, lists again.
*
* <p>Minio in the used configuration is not particularly fast - creating 100000 objects with 4
* threads (more crashes w/ timeouts) takes about ~30 minutes (plus ~3 seconds for listing 100000
* objects, plus ~3 seconds for deleting 10000 objects).
*/
@ParameterizedTest
@ValueSource(ints = {500})
public void manyFiles(int numFiles) throws Exception {
StorageUri baseUri = storageUri("/path/");

Set<String> keys =
IntStream.range(0, numFiles)
.mapToObj(i -> String.format("path/%d/%d", i % 100, i))
.collect(Collectors.toCollection(HashSet::new));

try (ObjectStorageMock.MockServer server = createServer(keys);
IcebergFiles icebergFiles = createIcebergFiles(server)) {

try (Stream<FileReference> files = icebergFiles.listRecursively(baseUri)) {
assertThat(files).hasSize(numFiles);
}

int deletes = numFiles / 10;
assertThat(
icebergFiles.deleteMultiple(
baseUri,
IntStream.range(0, deletes)
.mapToObj(i -> StorageUri.of(String.format("%d/%d", i % 100, i)))
.map(p -> FileReference.of(p, baseUri, -1L))))
.isEqualTo(DeleteSummary.of(deletes, 0L));

try (Stream<FileReference> files = icebergFiles.listRecursively(baseUri)) {
assertThat(files).hasSize(numFiles - deletes);
}
}
}

private ObjectStorageMock.MockServer createServer(Set<String> keys) {
return ObjectStorageMock.builder()
.putBuckets(
bucket(),
Bucket.builder()
.lister(
(String prefix, String offset) ->
keys.stream()
.map(
key ->
new Bucket.ListElement() {
@Override
public String key() {
return key;
}

@Override
public MockObject object() {
return MockObject.builder().build();
}
}))
.object(key -> keys.contains(key) ? MockObject.builder().build() : null)
.deleter(keys::remove)
.build())
.build()
.start();
}

private IcebergFiles createIcebergFiles(ObjectStorageMock.MockServer server) {
return IcebergFiles.builder()
.properties(icebergProperties(server))
.hadoopConfiguration(hadoopConfiguration(server))
.build();
}

protected abstract String bucket();

protected abstract StorageUri storageUri(String path);

protected abstract Map<String, ? extends String> icebergProperties(
ObjectStorageMock.MockServer server);

protected abstract Configuration hadoopConfiguration(ObjectStorageMock.MockServer server);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (C) 2022 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.gc.iceberg.files;

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.projectnessie.objectstoragemock.ObjectStorageMock.MockServer;
import org.projectnessie.storage.uri.StorageUri;

/**
 * Runs the {@code AbstractFiles} listing/deletion tests against the ADLS Gen2 flavor of the
 * object-storage mock, addressing files via {@code abfs://} URIs.
 */
public class TestIcebergAdlsFiles extends AbstractFiles {

  @Override
  protected String bucket() {
    // "$root" is used as the filesystem (container) name for the mock ADLS account.
    return "$root";
  }

  @Override
  protected Map<String, ? extends String> icebergProperties(MockServer server) {
    Map<String, String> props = new HashMap<>();

    // Point the Iceberg ADLS FileIO at the mock server via a connection string; the shared-key
    // credentials are placeholders — the mock does not validate them.
    props.put("adls.connection-string.account", server.getAdlsGen2BaseUri().toString());
    props.put("adls.auth.shared-key.account.name", "[email protected]");
    props.put("adls.auth.shared-key.account.key", "key");

    return props;
  }

  // Fix: @Override was missing here, unlike on the sibling overrides. Adding it lets the
  // compiler verify this actually overrides AbstractFiles.hadoopConfiguration().
  @Override
  protected Configuration hadoopConfiguration(MockServer server) {
    Configuration conf = new Configuration();

    conf.set("fs.azure.impl", "org.apache.hadoop.fs.azure.AzureNativeFileSystemStore");
    conf.set("fs.AbstractFileSystem.azure.impl", "org.apache.hadoop.fs.azurebfs.Abfs");
    conf.set("fs.azure.storage.emulator.account.name", "account");
    conf.set("fs.azure.account.auth.type", "SharedKey");
    conf.set("fs.azure.account.key.account", "<base-64-encoded-secret>");

    return conf;
  }

  @Override
  protected StorageUri storageUri(String path) {
    // Resolve relative paths against the root abfs:// URI of the mock filesystem.
    return StorageUri.of(String.format("abfs://%s@account/", bucket())).resolve(path);
  }
}
Loading