Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rest] Refactor RESTTokenFileIO to cache FileIO in static cache #4965

Merged
merged 5 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.annotation.concurrent.ThreadSafe;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
Expand Down Expand Up @@ -62,7 +63,7 @@
*/
@Public
@ThreadSafe
public interface FileIO extends Serializable {
public interface FileIO extends Serializable, Closeable {

Logger LOG = LoggerFactory.getLogger(FileIO.class);

Expand Down Expand Up @@ -230,6 +231,13 @@ default FileStatus[] listDirectories(Path path) throws IOException {
*/
boolean rename(Path src, Path dst) throws IOException;

/**
* Override this method to empty, many FileIO implementation classes rely on static variables
* and do not have the ability to close them.
*/
@Override
default void close() {}

// -------------------------------------------------------------------------
// utils
// -------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public FileIO fileIO() {
return fileIO;
}

@Override
public FileIO fileIO(Path path) {
protected FileIO fileIO(Path path) {
return fileIO;
}

Expand Down Expand Up @@ -370,7 +369,12 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
new RenamingSnapshotCommit.Factory(
lockFactory().orElse(null), lockContext().orElse(null));
return CatalogUtils.loadTable(
this, identifier, fileIO(), this::loadTableMetadata, commitFactory);
this,
identifier,
p -> fileIO(),
this::fileIO,
this::loadTableMetadata,
commitFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.annotation.Public;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -375,9 +374,6 @@ default void repairTable(Identifier identifier) throws TableNotExistException {
/** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */
FileIO fileIO();

/** {@link FileIO} of this catalog. */
FileIO fileIO(Path path);

/** Catalog options for re-creating this catalog. */
Map<String, String> options();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
import static org.apache.paimon.CoreOptions.PARTITION_GENERATE_LEGCY_NAME;
Expand Down Expand Up @@ -171,7 +172,8 @@ public static List<Partition> listPartitionsFromFileSystem(Table table) {
public static Table loadTable(
Catalog catalog,
Identifier identifier,
FileIO fileIO,
Function<Path, FileIO> dataFileIO,
Function<Path, FileIO> objectFileIO,
TableMetadata.Loader metadataLoader,
SnapshotCommit.Factory commitFactory)
throws Catalog.TableNotExistException {
Expand All @@ -190,10 +192,11 @@ public static Table loadTable(
new CatalogEnvironment(
identifier, metadata.uuid(), catalog.catalogLoader(), commitFactory);
Path path = new Path(schema.options().get(PATH.key()));
FileStoreTable table = FileStoreTableFactory.create(fileIO, path, schema, catalogEnv);
FileStoreTable table =
FileStoreTableFactory.create(dataFileIO.apply(path), path, schema, catalogEnv);

if (options.type() == TableType.OBJECT_TABLE) {
table = toObjectTable(catalog, table);
table = toObjectTable(objectFileIO, table);
}

if (identifier.isSystemTable()) {
Expand Down Expand Up @@ -265,10 +268,11 @@ private static FormatTable toFormatTable(Identifier identifier, TableSchema sche
.build();
}

private static ObjectTable toObjectTable(Catalog catalog, FileStoreTable underlyingTable) {
private static ObjectTable toObjectTable(
Function<Path, FileIO> fileIOLoader, FileStoreTable underlyingTable) {
CoreOptions options = underlyingTable.coreOptions();
String objectLocation = options.objectLocation();
FileIO objectFileIO = catalog.fileIO(new Path(objectLocation));
FileIO objectFileIO = fileIOLoader.apply(new Path(objectLocation));
return ObjectTable.builder()
.underlyingTable(underlyingTable)
.objectLocation(objectLocation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.catalog;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -62,11 +61,6 @@ public FileIO fileIO() {
return wrapped.fileIO();
}

@Override
public FileIO fileIO(Path path) {
return wrapped.fileIO(path);
}

@Override
public List<String> listDatabases() {
return wrapped.listDatabases();
Expand Down
123 changes: 56 additions & 67 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.rest.auth.AuthSession;
Expand All @@ -56,6 +55,7 @@
import org.apache.paimon.rest.responses.ErrorResponseResourceType;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
import org.apache.paimon.rest.responses.GetTableTokenResponse;
import org.apache.paimon.rest.responses.GetViewResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.rest.responses.ListPartitionsResponse;
Expand All @@ -75,10 +75,8 @@

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -95,104 +93,81 @@
import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static org.apache.paimon.rest.RESTUtil.extractPrefixMap;
import static org.apache.paimon.rest.auth.AuthSession.createAuthSession;
import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;

/** A catalog implementation for REST. */
public class RESTCatalog implements Catalog {

private static final Logger LOG = LoggerFactory.getLogger(RESTCatalog.class);
public static final String HEADER_PREFIX = "header.";

private final RESTClient client;
private final ResourcePaths resourcePaths;
private final AuthSession catalogAuth;
private final Options options;
private final boolean fileIORefreshCredentialEnable;
private final CatalogContext context;
private final boolean dataTokenEnabled;
private final FileIO fileIO;

private volatile ScheduledExecutorService refreshExecutor = null;

public RESTCatalog(CatalogContext context) {
if (context.options().getOptional(CatalogOptions.WAREHOUSE).isPresent()) {
throw new IllegalArgumentException("Can not config warehouse in RESTCatalog.");
}
this(context, true);
}

public RESTCatalog(CatalogContext context, boolean configRequired) {
this.client = new HttpClient(context.options());
this.catalogAuth = createAuthSession(context.options(), tokenRefreshExecutor());

Map<String, String> initHeaders =
RESTUtil.merge(
extractPrefixMap(context.options(), HEADER_PREFIX),
catalogAuth.getHeaders());
this.options =
new Options(
client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, initHeaders)
.merge(context.options().toMap()));
this.resourcePaths = ResourcePaths.forCatalogProperties(options);

this.fileIORefreshCredentialEnable =
options.get(RESTCatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE);
try {
if (fileIORefreshCredentialEnable) {
this.fileIO = null;
} else {
String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
this.fileIO =
FileIO.get(
new Path(warehouseStr),
CatalogContext.create(
options, context.preferIO(), context.fallbackIO()));
Options options = context.options();
if (configRequired) {
if (context.options().contains(WAREHOUSE)) {
throw new IllegalArgumentException("Can not config warehouse in RESTCatalog.");
}
} catch (IOException e) {
LOG.warn("Can not get FileIO from options.");
throw new RuntimeException(e);

Map<String, String> initHeaders =
RESTUtil.merge(
extractPrefixMap(context.options(), HEADER_PREFIX),
catalogAuth.getHeaders());
options =
new Options(
client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, initHeaders)
.merge(context.options().toMap()));
}
}

protected RESTCatalog(Options options, FileIO fileIO) {
this.client = new HttpClient(options);
this.catalogAuth = createAuthSession(options, tokenRefreshExecutor());
this.options = options;
context = CatalogContext.create(options, context.preferIO(), context.fallbackIO());
this.context = context;
this.resourcePaths = ResourcePaths.forCatalogProperties(options);
this.fileIO = fileIO;
this.fileIORefreshCredentialEnable =
options.get(RESTCatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE);

this.dataTokenEnabled = options.get(RESTCatalogOptions.DATA_TOKEN_ENABLED);
this.fileIO = dataTokenEnabled ? null : fileIOFromOptions(new Path(options.get(WAREHOUSE)));
}

@Override
public String warehouse() {
return options.get(CatalogOptions.WAREHOUSE);
return context.options().get(WAREHOUSE);
}

@Override
public Map<String, String> options() {
return options.toMap();
return context.options().toMap();
}

@Override
public RESTCatalogLoader catalogLoader() {
return new RESTCatalogLoader(options, fileIO);
return new RESTCatalogLoader(context);
}

@Override
public FileIO fileIO() {
if (fileIORefreshCredentialEnable) {
// TODO remove Catalog.fileIO
if (dataTokenEnabled) {
throw new UnsupportedOperationException();
}
return fileIO;
}

@Override
public FileIO fileIO(Path path) {
try {
return FileIO.get(path, CatalogContext.create(options));
} catch (IOException e) {
LOG.warn("Can not get FileIO from options.");
throw new RuntimeException(e);
}
}

@Override
public List<String> listDatabases() {
ListDatabasesResponse response =
Expand Down Expand Up @@ -306,11 +281,33 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
return CatalogUtils.loadTable(
this,
identifier,
this.fileIO(identifier),
path -> fileIOForData(path, identifier),
this::fileIOFromOptions,
this::loadTableMetadata,
new RESTSnapshotCommitFactory(catalogLoader()));
}

private FileIO fileIOForData(Path path, Identifier identifier) {
return dataTokenEnabled
? new RESTTokenFileIO(catalogLoader(), this, identifier, path)
: this.fileIO;
}

private FileIO fileIOFromOptions(Path path) {
try {
return FileIO.get(path, context);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

protected GetTableTokenResponse loadTableToken(Identifier identifier) {
return client.get(
resourcePaths.tableToken(identifier.getDatabaseName(), identifier.getObjectName()),
GetTableTokenResponse.class,
catalogAuth.getHeaders());
}

public boolean commitSnapshot(Identifier identifier, Snapshot snapshot) {
CommitTableRequest request = new CommitTableRequest(identifier, snapshot);
CommitTableResponse response =
Expand Down Expand Up @@ -630,7 +627,7 @@ public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfN

@Override
public boolean caseSensitive() {
return options.getOptional(CASE_SENSITIVE).orElse(true);
return context.options().getOptional(CASE_SENSITIVE).orElse(true);
}

@Override
Expand Down Expand Up @@ -663,12 +660,4 @@ private ScheduledExecutorService tokenRefreshExecutor() {

return refreshExecutor;
}

private FileIO fileIO(Identifier identifier) {
if (fileIORefreshCredentialEnable) {
return new RefreshCredentialFileIO(
resourcePaths, catalogAuth, options, client, identifier);
}
return fileIO;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,26 @@

package org.apache.paimon.rest;

import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.options.Options;

/** Loader to create {@link RESTCatalog}. */
public class RESTCatalogLoader implements CatalogLoader {

private static final long serialVersionUID = 1L;

private final Options options;
private final FileIO fileIO;
private final CatalogContext context;

public RESTCatalogLoader(Options options, FileIO fileIO) {
this.options = options;
this.fileIO = fileIO;
public RESTCatalogLoader(CatalogContext context) {
this.context = context;
}

public CatalogContext context() {
return context;
}

@Override
public RESTCatalog load() {
return new RESTCatalog(options, fileIO);
return new RESTCatalog(context, false);
}
}
Loading
Loading