Skip to content

Commit

Permalink
Add local storage ops implementation (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackye1995 authored Jan 24, 2025
1 parent 720f9d4 commit 2cd73a8
Show file tree
Hide file tree
Showing 17 changed files with 419 additions and 132 deletions.
19 changes: 12 additions & 7 deletions core/src/main/java/io/trinitylake/ObjectDefinitions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
import io.trinitylake.models.LakehouseDef;
import io.trinitylake.models.NamespaceDef;
import io.trinitylake.models.TableDef;
import io.trinitylake.storage.Storage;
import io.trinitylake.storage.LakehouseStorage;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class ObjectDefinitions {

public static void writeLakehouseDef(Storage storage, String path, LakehouseDef lakehouseDef) {
public static void writeLakehouseDef(
LakehouseStorage storage, String path, LakehouseDef lakehouseDef) {

try (OutputStream stream = storage.startWrite(path)) {
lakehouseDef.writeTo(stream);
Expand All @@ -35,7 +36,7 @@ public static void writeLakehouseDef(Storage storage, String path, LakehouseDef
}
}

public static LakehouseDef readLakehouseDef(Storage storage, String path) {
public static LakehouseDef readLakehouseDef(LakehouseStorage storage, String path) {
try (InputStream stream = storage.startRead(path)) {
return LakehouseDef.parseFrom(stream);
} catch (IOException e) {
Expand All @@ -48,7 +49,7 @@ public static LakehouseDef readLakehouseDef(Storage storage, String path) {
}

public static void writeNamespaceDef(
Storage storage, String path, String namespaceName, NamespaceDef namespaceDef) {
LakehouseStorage storage, String path, String namespaceName, NamespaceDef namespaceDef) {
try (OutputStream stream = storage.startWrite(path)) {
namespaceDef.writeTo(stream);
} catch (IOException e) {
Expand All @@ -61,7 +62,7 @@ public static void writeNamespaceDef(
}
}

public static NamespaceDef readNamespaceDef(Storage storage, String path) {
public static NamespaceDef readNamespaceDef(LakehouseStorage storage, String path) {
try (InputStream stream = storage.startRead(path)) {
return NamespaceDef.parseFrom(stream);
} catch (IOException e) {
Expand All @@ -74,7 +75,11 @@ public static NamespaceDef readNamespaceDef(Storage storage, String path) {
}

public static void writeTableDef(
Storage storage, String path, String namespaceName, String tableName, TableDef tableDef) {
LakehouseStorage storage,
String path,
String namespaceName,
String tableName,
TableDef tableDef) {
try (OutputStream stream = storage.startWrite(path)) {
tableDef.writeTo(stream);
} catch (IOException e) {
Expand All @@ -88,7 +93,7 @@ public static void writeTableDef(
}
}

public static TableDef readTableDef(Storage storage, String path) {
public static TableDef readTableDef(LakehouseStorage storage, String path) {
try (InputStream stream = storage.startRead(path)) {
return TableDef.parseFrom(stream);
} catch (IOException e) {
Expand Down
38 changes: 23 additions & 15 deletions core/src/main/java/io/trinitylake/TrinityLake.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.trinitylake.models.NamespaceDef;
import io.trinitylake.models.TableDef;
import io.trinitylake.storage.FilePaths;
import io.trinitylake.storage.Storage;
import io.trinitylake.storage.LakehouseStorage;
import io.trinitylake.tree.BasicTreeNode;
import io.trinitylake.tree.TreeKeys;
import io.trinitylake.tree.TreeNode;
Expand All @@ -34,7 +34,7 @@

public class TrinityLake {

public static void createLakehouse(Storage storage, LakehouseDef lakehouseDef) {
public static void createLakehouse(LakehouseStorage storage, LakehouseDef lakehouseDef) {
String lakehouseDefFilePath = FilePaths.newLakehouseDefFilePath();
ObjectDefinitions.writeLakehouseDef(storage, lakehouseDefFilePath, lakehouseDef);

Expand All @@ -45,11 +45,12 @@ public static void createLakehouse(Storage storage, LakehouseDef lakehouseDef) {
TreeOperations.writeNodeFile(storage, rootNodeFilePath, root);
}

public static RunningTransaction beginTransaction(Storage storage) {
public static RunningTransaction beginTransaction(LakehouseStorage storage) {
return beginTransaction(storage, ImmutableMap.of());
}

public static RunningTransaction beginTransaction(Storage storage, Map<String, String> options) {
public static RunningTransaction beginTransaction(
LakehouseStorage storage, Map<String, String> options) {
TreeNode current = TreeOperations.findLatestRoot(storage);
IsolationLevel isolationLevel = TransactionOptions.isolationLevel(options);
return ImmutableRunningTransaction.builder()
Expand All @@ -62,7 +63,7 @@ public static RunningTransaction beginTransaction(Storage storage, Map<String, S
}

public static CommittedTransaction commitTransaction(
Storage storage, RunningTransaction transaction) throws CommitFailureException {
LakehouseStorage storage, RunningTransaction transaction) throws CommitFailureException {
Preconditions.checkArgument(
!TreeOperations.hasVersion(transaction.runningRoot()),
"There is no change to be committed");
Expand All @@ -75,7 +76,8 @@ public static CommittedTransaction commitTransaction(
.build();
}

public static List<String> showNamespaces(Storage storage, RunningTransaction transaction) {
public static List<String> showNamespaces(
LakehouseStorage storage, RunningTransaction transaction) {
return transaction.runningRoot().allKeyValues().stream()
.map(Map.Entry::getKey)
.filter(TreeKeys::isNamespaceKey)
Expand All @@ -84,7 +86,7 @@ public static List<String> showNamespaces(Storage storage, RunningTransaction tr
}

public static NamespaceDef describeNamespace(
Storage storage, RunningTransaction transaction, String namespaceName)
LakehouseStorage storage, RunningTransaction transaction, String namespaceName)
throws ObjectNotFoundException {
LakehouseDef lakehouseDef = TreeOperations.findLakehouseDef(storage, transaction.runningRoot());
String namespaceKey = TreeKeys.namespaceKey(namespaceName, lakehouseDef);
Expand All @@ -96,7 +98,7 @@ public static NamespaceDef describeNamespace(
}

public static RunningTransaction createNamespace(
Storage storage,
LakehouseStorage storage,
RunningTransaction transaction,
String namespaceName,
NamespaceDef namespaceDef)
Expand All @@ -115,7 +117,7 @@ public static RunningTransaction createNamespace(
}

public static RunningTransaction alterNamespace(
Storage storage,
LakehouseStorage storage,
RunningTransaction transaction,
String namespaceName,
NamespaceDef namespaceDef)
Expand All @@ -134,7 +136,7 @@ public static RunningTransaction alterNamespace(
}

public static RunningTransaction dropNamespace(
Storage storage, RunningTransaction transaction, String namespaceName)
LakehouseStorage storage, RunningTransaction transaction, String namespaceName)
throws ObjectNotFoundException, CommitFailureException {
LakehouseDef lakehouseDef = TreeOperations.findLakehouseDef(storage, transaction.runningRoot());
String namespaceKey = TreeKeys.namespaceKey(namespaceName, lakehouseDef);
Expand All @@ -148,7 +150,7 @@ public static RunningTransaction dropNamespace(
}

public static List<String> showTables(
Storage storage, RunningTransaction transaction, String namespaceName)
LakehouseStorage storage, RunningTransaction transaction, String namespaceName)
throws ObjectNotFoundException {
return transaction.runningRoot().allKeyValues().stream()
.map(Map.Entry::getKey)
Expand All @@ -158,7 +160,10 @@ public static List<String> showTables(
}

public static TableDef describeTable(
Storage storage, RunningTransaction transaction, String namespaceName, String tableName)
LakehouseStorage storage,
RunningTransaction transaction,
String namespaceName,
String tableName)
throws ObjectNotFoundException {
LakehouseDef lakehouseDef = TreeOperations.findLakehouseDef(storage, transaction.runningRoot());
String namespaceKey = TreeKeys.namespaceKey(namespaceName, lakehouseDef);
Expand All @@ -175,7 +180,7 @@ public static TableDef describeTable(
}

public static RunningTransaction createTable(
Storage storage,
LakehouseStorage storage,
RunningTransaction transaction,
String namespaceName,
String tableName,
Expand All @@ -200,7 +205,7 @@ public static RunningTransaction createTable(
}

public static RunningTransaction alterTable(
Storage storage,
LakehouseStorage storage,
RunningTransaction transaction,
String namespaceName,
String tableName,
Expand All @@ -225,7 +230,10 @@ public static RunningTransaction alterTable(
}

public static RunningTransaction dropTable(
Storage storage, RunningTransaction transaction, String namespaceName, String tableName)
LakehouseStorage storage,
RunningTransaction transaction,
String namespaceName,
String tableName)
throws ObjectNotFoundException, CommitFailureException {
LakehouseDef lakehouseDef = TreeOperations.findLakehouseDef(storage, transaction.runningRoot());
String namespaceKey = TreeKeys.namespaceKey(namespaceName, lakehouseDef);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trinitylake.exception;

public class StorageCommitConflictException extends TrinityLakeRuntimeException {

public StorageCommitConflictException(Throwable cause) {
super(cause);
}

public StorageCommitConflictException(Throwable cause, String message, Object... args) {
super(cause, message, args);
}

public StorageCommitConflictException(String message, Object... args) {
super(message, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trinitylake.exception;

public class StorageListFailureException extends TrinityLakeRuntimeException {

public StorageListFailureException(Throwable cause) {
super(cause);
}

public StorageListFailureException(Throwable cause, String message, Object... args) {
super(cause, message, args);
}

public StorageListFailureException(String message, Object... args) {
super(message, args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,22 @@
*/
package io.trinitylake.storage;

import io.trinitylake.exception.CommitFailureException;
import java.io.IOException;
import java.io.OutputStream;

public abstract class PositionOutputStream extends OutputStream {
public abstract class AtomicOutputStream extends OutputStream {

/**
* Return the current position in the OutputStream.
* Atomically seal the file that is being written to
*
* @return current position in bytes from the start of the stream
* @throws IOException If the underlying stream throws IOException
* @throws CommitFailureException if the sealing process fails due to atomicity conflict
* @throws IOException for any other failure in write
*/
public abstract long getPos() throws IOException;
public abstract void seal() throws CommitFailureException, IOException;

@Override
public void close() throws IOException {
super.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
*/
package io.trinitylake.storage;

public class BasicStorage implements Storage {
public class BasicLakehouseStorage implements LakehouseStorage {

private final URI root;
private final StorageOps ops;

public BasicStorage(URI root, StorageOps ops) {
public BasicLakehouseStorage(URI root, StorageOps ops) {
this.ops = ops;
this.root = root;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class BasicStorageOpsProperties implements StorageOpsProperties {
public class CommonStorageOpsProperties implements StorageOpsProperties {

private static final BasicStorageOpsProperties INSTANCE = new BasicStorageOpsProperties();
private static final CommonStorageOpsProperties INSTANCE = new CommonStorageOpsProperties();

public static final String DELETE_BATCH_SIZE = "delete.batch-size";
public static final int DELETE_BATCH_SIZE_DEFAULT = 1000;
Expand All @@ -39,22 +39,28 @@ public class BasicStorageOpsProperties implements StorageOpsProperties {
public static final String PREPARE_READ_STAGING_DIRECTORY_PATH_DEFAULT =
System.getProperty("java.io.tmpdir");

public static final String WRITE_STAGING_DIRECTORY = "write.staging-dir";
public static final String WRITE_STAGING_DIRECTORY_PATH_DEFAULT =
System.getProperty("java.io.tmpdir");

private final Map<String, String> propertiesMap;
private final int deleteBatchSize;
private final int prepareReadCacheSize;
private final long prepareReadCacheExpirationMillis;
private final String prepareReadStagingDirectoryPath;
private volatile File prepareReadStagingDirectory;
private final String writeStagingDirectoryPath;
private volatile File writeStagingDirectory;

public BasicStorageOpsProperties() {
public CommonStorageOpsProperties() {
this(ImmutableMap.of());
}

public static BasicStorageOpsProperties instance() {
public static CommonStorageOpsProperties instance() {
return INSTANCE;
}

public BasicStorageOpsProperties(Map<String, String> input) {
public CommonStorageOpsProperties(Map<String, String> input) {
this.propertiesMap = ImmutableMap.copyOf(input);
this.deleteBatchSize =
PropertyUtil.propertyAsInt(input, DELETE_BATCH_SIZE, DELETE_BATCH_SIZE_DEFAULT);
Expand All @@ -68,6 +74,9 @@ public BasicStorageOpsProperties(Map<String, String> input) {
this.prepareReadStagingDirectoryPath =
PropertyUtil.propertyAsString(
input, PREPARE_READ_STAGING_DIRECTORY, PREPARE_READ_STAGING_DIRECTORY_PATH_DEFAULT);
this.writeStagingDirectoryPath =
PropertyUtil.propertyAsString(
input, WRITE_STAGING_DIRECTORY, WRITE_STAGING_DIRECTORY_PATH_DEFAULT);
}

@Override
Expand Down Expand Up @@ -99,4 +108,17 @@ public File prepareReadStagingDirectory() {
}
return prepareReadStagingDirectory;
}

public File writeStagingDirectory() {
if (writeStagingDirectory == null) {
synchronized (this) {
if (writeStagingDirectory == null) {
File path = new File(writeStagingDirectoryPath);
FileUtil.createStagingDirectoryIfNotExists(path);
this.writeStagingDirectory = path;
}
}
}
return writeStagingDirectory;
}
}
Loading

0 comments on commit 2cd73a8

Please sign in to comment.