diff --git a/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchEntityException.java b/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchEntityException.java index 27ba0569e3b..ecea805a5d2 100644 --- a/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchEntityException.java +++ b/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchEntityException.java @@ -10,7 +10,7 @@ /** This exception is thrown when an entity is not found. */ public class NoSuchEntityException extends RuntimeException { /** The no such entity message for the exception. */ - public static final String NO_SUCH_ENTITY_MESSAGE = "No such entity: %s"; + public static final String NO_SUCH_ENTITY_MESSAGE = "No such %s entity: %s"; /** * Constructs a new NoSuchEntityException. diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java index c62676293d4..a02d6499e58 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java @@ -17,8 +17,10 @@ import com.datastrato.gravitino.exceptions.NoSuchEntityException; import com.datastrato.gravitino.meta.BaseMetalake; import com.datastrato.gravitino.meta.CatalogEntity; +import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.storage.relational.service.CatalogMetaService; import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService; +import com.datastrato.gravitino.storage.relational.service.SchemaMetaService; import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper; import java.io.IOException; import java.util.List; @@ -46,6 +48,8 @@ public List list( return (List) MetalakeMetaService.getInstance().listMetalakes(); case CATALOG: return (List) CatalogMetaService.getInstance().listCatalogsByNamespace(namespace); + case SCHEMA: + return (List) SchemaMetaService.getInstance().listSchemasByNamespace(namespace); default: throw new UnsupportedEntityTypeException( "Unsupported entity type: %s for list operation", entityType); @@ -69,6 +73,8 @@ public void insert(E e, boolean overwritten) MetalakeMetaService.getInstance().insertMetalake((BaseMetalake) e, overwritten); } else if (e instanceof CatalogEntity) { CatalogMetaService.getInstance().insertCatalog((CatalogEntity) e, overwritten); + } else if (e instanceof SchemaEntity) { + SchemaMetaService.getInstance().insertSchema((SchemaEntity) e, overwritten); } else { throw new UnsupportedEntityTypeException( "Unsupported entity type: %s for insert operation", e.getClass()); @@ -84,6 +90,8 @@ public E update( return (E) MetalakeMetaService.getInstance().updateMetalake(ident, updater); case CATALOG: return (E) CatalogMetaService.getInstance().updateCatalog(ident, updater); + case SCHEMA: + return (E) SchemaMetaService.getInstance().updateSchema(ident, updater); default: throw new UnsupportedEntityTypeException( "Unsupported entity type: %s for update operation", entityType); @@ -98,6 +106,8 @@ public E get( return (E) MetalakeMetaService.getInstance().getMetalakeByIdentifier(ident); case CATALOG: return (E) CatalogMetaService.getInstance().getCatalogByIdentifier(ident); + case SCHEMA: + return (E) SchemaMetaService.getInstance().getSchemaByIdentifier(ident); default: throw new UnsupportedEntityTypeException( "Unsupported entity type: %s for get operation", entityType); @@ -111,6 +121,8 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea return MetalakeMetaService.getInstance().deleteMetalake(ident, cascade); case CATALOG: return CatalogMetaService.getInstance().deleteCatalog(ident, cascade); + case SCHEMA: + return SchemaMetaService.getInstance().deleteSchema(ident, cascade); default: throw new UnsupportedEntityTypeException( "Unsupported entity type: %s for delete operation", entityType); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java new file mode 100644 index 00000000000..3e8723b053d --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java @@ -0,0 +1,152 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relational.mapper; + +import com.datastrato.gravitino.storage.relational.po.SchemaPO; +import java.util.List; +import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; +import org.apache.ibatis.annotations.Update; + +/** + * A MyBatis Mapper for schema meta operation SQLs. + * + *

This interface class is a specification defined by MyBatis. It requires this interface class + * to identify the corresponding SQLs for execution. We can write SQLs in an additional XML file, or + * write SQLs with annotations in this interface Mapper. See: + */ +public interface SchemaMetaMapper { + String TABLE_NAME = "schema_meta"; + + @Select( + "SELECT schema_id as schemaId, schema_name as schemaName," + + " metalake_id as metalakeId, catalog_id as catalogId," + + " schema_comment as schemaComment, properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + List listSchemaPOsByCatalogId(@Param("catalogId") Long catalogId); + + @Select( + "SELECT schema_id as schemaId FROM " + + TABLE_NAME + + " WHERE catalog_id = #{catalogId} AND schema_name = #{schemaName}" + + " AND deleted_at = 0") + Long selectSchemaIdByCatalogIdAndName( + @Param("catalogId") Long catalogId, @Param("schemaName") String name); + + @Select( + "SELECT schema_id as schemaId, schema_name as schemaName," + + " metalake_id as metalakeId, catalog_id as catalogId," + + " schema_comment as schemaComment, properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE catalog_id = #{catalogId} AND schema_name = #{schemaName} AND deleted_at = 0") + SchemaPO selectSchemaMetaByCatalogIdAndName( + @Param("catalogId") Long catalogId, @Param("schemaName") String name); + + @Insert( + "INSERT INTO " + + TABLE_NAME + + "(schema_id, schema_name, metalake_id," + + " catalog_id, schema_comment, properties, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{schemaMeta.schemaId}," + + " #{schemaMeta.schemaName}," + + " #{schemaMeta.metalakeId}," + + " #{schemaMeta.catalogId}," + + " #{schemaMeta.schemaComment}," + + " #{schemaMeta.properties}," + + " #{schemaMeta.auditInfo}," + + " #{schemaMeta.currentVersion}," + + " #{schemaMeta.lastVersion}," + + " #{schemaMeta.deletedAt}" + + " )") + void insertSchemaMeta(@Param("schemaMeta") SchemaPO schemaPO); + + @Insert( + "INSERT INTO " + + TABLE_NAME + + "(schema_id, schema_name, metalake_id," + + " catalog_id, schema_comment, properties, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{schemaMeta.schemaId}," + + " #{schemaMeta.schemaName}," + + " #{schemaMeta.metalakeId}," + + " #{schemaMeta.catalogId}," + + " #{schemaMeta.schemaComment}," + + " #{schemaMeta.properties}," + + " #{schemaMeta.auditInfo}," + + " #{schemaMeta.currentVersion}," + + " #{schemaMeta.lastVersion}," + + " #{schemaMeta.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " schema_name = #{schemaMeta.schemaName}," + + " metalake_id = #{schemaMeta.metalakeId}," + + " catalog_id = #{schemaMeta.catalogId}," + + " schema_comment = #{schemaMeta.schemaComment}," + + " properties = #{schemaMeta.properties}," + + " audit_info = #{schemaMeta.auditInfo}," + + " current_version = #{schemaMeta.currentVersion}," + + " last_version = #{schemaMeta.lastVersion}," + + " deleted_at = #{schemaMeta.deletedAt}") + void insertSchemaMetaOnDuplicateKeyUpdate(@Param("schemaMeta") SchemaPO schemaPO); + + @Update( + "UPDATE " + + TABLE_NAME + + " SET schema_name = #{newSchemaMeta.schemaName}," + + " metalake_id = #{newSchemaMeta.metalakeId}," + + " catalog_id = #{newSchemaMeta.catalogId}," + + " schema_comment = #{newSchemaMeta.schemaComment}," + + " properties = #{newSchemaMeta.properties}," + + " audit_info = #{newSchemaMeta.auditInfo}," + + " current_version = #{newSchemaMeta.currentVersion}," + + " last_version = #{newSchemaMeta.lastVersion}," + + " deleted_at = #{newSchemaMeta.deletedAt}" + + " WHERE schema_id = #{oldSchemaMeta.schemaId}" + + " AND schema_name = #{oldSchemaMeta.schemaName}" + + " AND metalake_id = #{oldSchemaMeta.metalakeId}" + + " AND catalog_id = #{oldSchemaMeta.catalogId}" + + " AND schema_comment = #{oldSchemaMeta.schemaComment}" + + " AND properties = #{oldSchemaMeta.properties}" + + " AND audit_info = #{oldSchemaMeta.auditInfo}" + + " AND current_version = #{oldSchemaMeta.currentVersion}" + + " AND last_version = #{oldSchemaMeta.lastVersion}" + + " AND deleted_at = 0") + Integer updateSchemaMeta( + @Param("newSchemaMeta") SchemaPO newSchemaPO, @Param("oldSchemaMeta") SchemaPO oldSchemaPO); + + @Update( + "UPDATE " + + TABLE_NAME + + " SET deleted_at = UNIX_TIMESTAMP()" + + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + Integer softDeleteSchemaMetasBySchemaId(@Param("schemaId") Long schemaId); + + @Update( + "UPDATE " + + TABLE_NAME + + " SET deleted_at = UNIX_TIMESTAMP()" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + Integer softDeleteSchemaMetasByMetalakeId(@Param("metalakeId") Long metalakeId); + + @Update( + "UPDATE " + + TABLE_NAME + + " SET deleted_at = UNIX_TIMESTAMP()" + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + Integer softDeleteSchemaMetasByCatalogId(@Param("catalogId") Long catalogId); +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/po/CatalogPO.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/CatalogPO.java index 10f7df52c06..95d9b434f37 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/po/CatalogPO.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/CatalogPO.java @@ -6,6 +6,7 @@ package com.datastrato.gravitino.storage.relational.po; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; public class CatalogPO { private Long catalogId; @@ -164,7 +165,20 @@ public CatalogPO.Builder withDeletedAt(Long deletedAt) { return this; } + private void validate() { + Preconditions.checkArgument(metalakePO.catalogId != null, "Catalog id is required"); + Preconditions.checkArgument(metalakePO.catalogName != null, "Catalog name is required"); + Preconditions.checkArgument(metalakePO.metalakeId != null, "Metalake id is required"); + Preconditions.checkArgument(metalakePO.type != null, "Catalog type is required"); + Preconditions.checkArgument(metalakePO.provider != null, "Catalog provider is required"); + Preconditions.checkArgument(metalakePO.auditInfo != null, "Audit info is required"); + Preconditions.checkArgument(metalakePO.currentVersion != null, "Current version is required"); + Preconditions.checkArgument(metalakePO.lastVersion != null, "Last version is required"); + Preconditions.checkArgument(metalakePO.deletedAt != null, "Deleted at is required"); + } + public CatalogPO build() { + validate(); return metalakePO; } } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/po/MetalakePO.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/MetalakePO.java index 61182f10fb2..61f49545277 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/po/MetalakePO.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/MetalakePO.java @@ -6,6 +6,7 @@ package com.datastrato.gravitino.storage.relational.po; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; public class MetalakePO { private Long metalakeId; @@ -140,7 +141,18 @@ public MetalakePO.Builder withDeletedAt(Long deletedAt) { return this; } + private void validate() { + Preconditions.checkArgument(metalakePO.metalakeId != null, "Metalake id is required"); + Preconditions.checkArgument(metalakePO.metalakeName != null, "Metalake name is required"); + Preconditions.checkArgument(metalakePO.auditInfo != null, "Audit info is required"); + Preconditions.checkArgument(metalakePO.schemaVersion != null, "Schema version is required"); + Preconditions.checkArgument(metalakePO.currentVersion != null, "Current version is required"); + Preconditions.checkArgument(metalakePO.lastVersion != null, "Last version is required"); + Preconditions.checkArgument(metalakePO.deletedAt != null, "Deleted at is required"); + } + public MetalakePO build() { + validate(); return metalakePO; } } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/po/SchemaPO.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/SchemaPO.java new file mode 100644 index 00000000000..d4352e135c3 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/SchemaPO.java @@ -0,0 +1,171 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.storage.relational.po; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +public class SchemaPO { + private Long schemaId; + private String schemaName; + private Long metalakeId; + private Long catalogId; + private String schemaComment; + private String properties; + private String auditInfo; + private Long currentVersion; + private Long lastVersion; + private Long deletedAt; + + public Long getSchemaId() { + return schemaId; + } + + public String getSchemaName() { + return schemaName; + } + + public Long getMetalakeId() { + return metalakeId; + } + + public Long getCatalogId() { + return catalogId; + } + + public String getSchemaComment() { + return schemaComment; + } + + public String getProperties() { + return properties; + } + + public String getAuditInfo() { + return auditInfo; + } + + public Long getCurrentVersion() { + return currentVersion; + } + + public Long getLastVersion() { + return lastVersion; + } + + public Long getDeletedAt() { + return deletedAt; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SchemaPO)) { + return false; + } + SchemaPO schemaPO = (SchemaPO) o; + return Objects.equal(getSchemaId(), schemaPO.getSchemaId()) + && Objects.equal(getSchemaName(), schemaPO.getSchemaName()) + && Objects.equal(getMetalakeId(), schemaPO.getMetalakeId()) + && Objects.equal(getCatalogId(), schemaPO.getCatalogId()) + && Objects.equal(getSchemaComment(), schemaPO.getSchemaComment()) + && Objects.equal(getProperties(), schemaPO.getProperties()) + && Objects.equal(getAuditInfo(), schemaPO.getAuditInfo()) + && Objects.equal(getCurrentVersion(), schemaPO.getCurrentVersion()) + && Objects.equal(getLastVersion(), schemaPO.getLastVersion()) + && Objects.equal(getDeletedAt(), schemaPO.getDeletedAt()); + } + + @Override + public int hashCode() { + return Objects.hashCode( + getSchemaId(), + getSchemaName(), + getMetalakeId(), + getCatalogId(), + getSchemaComment(), + getProperties(), + getAuditInfo(), + getCurrentVersion(), + getLastVersion(), + getDeletedAt()); + } + + public static class Builder { + private final SchemaPO schemaPO; + + public Builder() { + schemaPO = new SchemaPO(); + } + + public Builder withSchemaId(Long schemaId) { + schemaPO.schemaId = schemaId; + return this; + } + + public Builder withSchemaName(String schemaName) { + schemaPO.schemaName = schemaName; + return this; + } + + public Builder withMetalakeId(Long metalakeId) { + schemaPO.metalakeId = metalakeId; + return this; + } + + public Builder withCatalogId(Long catalogId) { + schemaPO.catalogId = catalogId; + return this; + } + + public Builder withSchemaComment(String schemaComment) { + schemaPO.schemaComment = schemaComment; + return this; + } + + public Builder withProperties(String properties) { + schemaPO.properties = properties; + return this; + } + + public Builder withAuditInfo(String auditInfo) { + schemaPO.auditInfo = auditInfo; + return this; + } + + public Builder withCurrentVersion(Long currentVersion) { + schemaPO.currentVersion = currentVersion; + return this; + } + + public Builder withLastVersion(Long lastVersion) { + schemaPO.lastVersion = lastVersion; + return this; + } + + public Builder withDeletedAt(Long deletedAt) { + schemaPO.deletedAt = deletedAt; + return this; + } + + private void validate() { + Preconditions.checkArgument(schemaPO.schemaId != null, "Schema id is required"); + Preconditions.checkArgument(schemaPO.schemaName != null, "Schema name is required"); + Preconditions.checkArgument(schemaPO.metalakeId != null, "Metalake id is required"); + Preconditions.checkArgument(schemaPO.catalogId != null, "Catalog id is required"); + Preconditions.checkArgument(schemaPO.auditInfo != null, "Audit info is required"); + Preconditions.checkArgument(schemaPO.currentVersion != null, "Current version is required"); + Preconditions.checkArgument(schemaPO.lastVersion != null, "Last version is required"); + Preconditions.checkArgument(schemaPO.deletedAt != null, "Deleted at is required"); + } + + public SchemaPO build() { + validate(); + return schemaPO; + } + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java index 27e22d47f9a..4590545319d 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java @@ -5,20 +5,21 @@ package com.datastrato.gravitino.storage.relational.service; import com.datastrato.gravitino.Entity; -import com.datastrato.gravitino.EntityAlreadyExistsException; import com.datastrato.gravitino.HasIdentifier; import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.exceptions.NonEmptyEntityException; import com.datastrato.gravitino.meta.CatalogEntity; +import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.storage.relational.mapper.CatalogMetaMapper; -import com.datastrato.gravitino.storage.relational.mapper.MetalakeMetaMapper; +import com.datastrato.gravitino.storage.relational.mapper.SchemaMetaMapper; import com.datastrato.gravitino.storage.relational.po.CatalogPO; +import com.datastrato.gravitino.storage.relational.utils.ExceptionUtils; import com.datastrato.gravitino.storage.relational.utils.POConverters; import com.datastrato.gravitino.storage.relational.utils.SessionUtils; import com.google.common.base.Preconditions; import java.io.IOException; -import java.sql.SQLIntegrityConstraintViolationException; import java.util.List; import java.util.Objects; import java.util.function.Function; @@ -35,55 +36,67 @@ public static CatalogMetaService getInstance() { private CatalogMetaService() {} - public CatalogEntity getCatalogByIdentifier(NameIdentifier identifier) { - NameIdentifier.checkCatalog(identifier); - String metalakeName = identifier.namespace().level(0); - String catalogName = identifier.name(); - Long metalakeId = - SessionUtils.getWithoutCommit( - MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(metalakeName)); - if (metalakeId == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, identifier.namespace().toString()); - } + public CatalogPO getCatalogPOByMetalakeIdAndName(Long metalakeId, String catalogName) { CatalogPO catalogPO = SessionUtils.getWithoutCommit( CatalogMetaMapper.class, mapper -> mapper.selectCatalogMetaByMetalakeIdAndName(metalakeId, catalogName)); + if (catalogPO == null) { throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, identifier.toString()); + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.CATALOG.name().toLowerCase(), + catalogName); } - return POConverters.fromCatalogPO(catalogPO, identifier.namespace()); + return catalogPO; } - public List listCatalogsByNamespace(Namespace namespace) { - Namespace.checkCatalog(namespace); - String metalakeName = namespace.level(0); - Long metalakeId = + public Long getCatalogIdByMetalakeIdAndName(Long metalakeId, String catalogName) { + Long catalogId = SessionUtils.getWithoutCommit( - MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(metalakeName)); - if (metalakeId == null) { + CatalogMetaMapper.class, + mapper -> mapper.selectCatalogIdByMetalakeIdAndName(metalakeId, catalogName)); + + if (catalogId == null) { throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, namespace.toString()); + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.CATALOG.name().toLowerCase(), + catalogName); } + return catalogId; + } + + public CatalogEntity getCatalogByIdentifier(NameIdentifier identifier) { + NameIdentifier.checkCatalog(identifier); + String catalogName = identifier.name(); + + Long metalakeId = + CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace()); + + CatalogPO catalogPO = getCatalogPOByMetalakeIdAndName(metalakeId, catalogName); + + return POConverters.fromCatalogPO(catalogPO, identifier.namespace()); + } + + public List listCatalogsByNamespace(Namespace namespace) { + Namespace.checkCatalog(namespace); + + Long metalakeId = CommonMetaService.getInstance().getParentEntityIdByNamespace(namespace); + List catalogPOS = SessionUtils.getWithoutCommit( CatalogMetaMapper.class, mapper -> mapper.listCatalogPOsByMetalakeId(metalakeId)); + return POConverters.fromCatalogPOs(catalogPOS, namespace); } public void insertCatalog(CatalogEntity catalogEntity, boolean overwrite) { try { NameIdentifier.checkCatalog(catalogEntity.nameIdentifier()); + Long metalakeId = - SessionUtils.getWithoutCommit( - MetalakeMetaMapper.class, - mapper -> mapper.selectMetalakeIdMetaByName(catalogEntity.namespace().level(0))); - if (metalakeId == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, catalogEntity.namespace().toString()); - } + CommonMetaService.getInstance().getParentEntityIdByNamespace(catalogEntity.namespace()); + SessionUtils.doWithCommit( CatalogMetaMapper.class, mapper -> { @@ -95,16 +108,8 @@ public void insertCatalog(CatalogEntity catalogEntity, boolean overwrite) { } }); } catch (RuntimeException re) { - if (re.getCause() != null - && re.getCause().getCause() != null - && re.getCause().getCause() instanceof SQLIntegrityConstraintViolationException) { - // TODO We should make more fine-grained exception judgments - // Usually throwing `SQLIntegrityConstraintViolationException` means that - // SQL violates the constraints of `primary key` and `unique key`. - // We simply think that the entity already exists at this time. - throw new EntityAlreadyExistsException( - String.format("Catalog entity: %s already exists", catalogEntity.nameIdentifier())); - } + ExceptionUtils.checkSQLConstraintException( + re, Entity.EntityType.CATALOG, catalogEntity.nameIdentifier().toString()); throw re; } } @@ -112,24 +117,12 @@ public void insertCatalog(CatalogEntity catalogEntity, boolean overwrite) { public CatalogEntity updateCatalog( NameIdentifier identifier, Function updater) throws IOException { NameIdentifier.checkCatalog(identifier); - String metalakeName = identifier.namespace().level(0); + String catalogName = identifier.name(); Long metalakeId = - SessionUtils.getWithoutCommit( - MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(metalakeName)); - if (metalakeId == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, identifier.namespace().toString()); - } + CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace()); - CatalogPO oldCatalogPO = - SessionUtils.getWithoutCommit( - CatalogMetaMapper.class, - mapper -> mapper.selectCatalogMetaByMetalakeIdAndName(metalakeId, catalogName)); - if (oldCatalogPO == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, identifier.toString()); - } + CatalogPO oldCatalogPO = getCatalogPOByMetalakeIdAndName(metalakeId, catalogName); CatalogEntity oldCatalogEntity = POConverters.fromCatalogPO(oldCatalogPO, identifier.namespace()); @@ -139,6 +132,7 @@ public CatalogEntity updateCatalog( "The updated catalog entity id: %s should be same with the catalog entity id before: %s", newEntity.id(), oldCatalogEntity.id()); + Integer updateResult; try { updateResult = @@ -149,16 +143,8 @@ public CatalogEntity updateCatalog( POConverters.updateCatalogPOWithVersion(oldCatalogPO, newEntity, metalakeId), oldCatalogPO)); } catch (RuntimeException re) { - if (re.getCause() != null - && re.getCause().getCause() != null - && re.getCause().getCause() instanceof SQLIntegrityConstraintViolationException) { - // TODO We should make more fine-grained exception judgments - // Usually throwing `SQLIntegrityConstraintViolationException` means that - // SQL violates the constraints of `primary key` and `unique key`. - // We simply think that the entity already exists at this time. - throw new EntityAlreadyExistsException( - String.format("Catalog entity: %s already exists", newEntity.nameIdentifier())); - } + ExceptionUtils.checkSQLConstraintException( + re, Entity.EntityType.CATALOG, newEntity.nameIdentifier().toString()); throw re; } @@ -171,36 +157,39 @@ public CatalogEntity updateCatalog( public boolean deleteCatalog(NameIdentifier identifier, boolean cascade) { NameIdentifier.checkCatalog(identifier); - String metalakeName = identifier.namespace().level(0); + String catalogName = identifier.name(); Long metalakeId = - SessionUtils.getWithoutCommit( - MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(metalakeName)); - if (metalakeId == null) { - throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, identifier.namespace().toString()); - } - Long catalogId = - SessionUtils.getWithoutCommit( - CatalogMetaMapper.class, - mapper -> mapper.selectCatalogIdByMetalakeIdAndName(metalakeId, catalogName)); - if (catalogId != null) { - if (cascade) { - SessionUtils.doMultipleWithCommit( - () -> - SessionUtils.doWithoutCommit( - CatalogMetaMapper.class, - mapper -> mapper.softDeleteCatalogMetasByCatalogId(catalogId)), - () -> { - // TODO We will cascade delete the metadata of sub-resources under the catalog - }); - } else { - // TODO Check whether the sub-resources are empty. If the sub-resources are not empty, - // deletion is not allowed. - SessionUtils.doWithCommit( - CatalogMetaMapper.class, mapper -> mapper.softDeleteCatalogMetasByCatalogId(catalogId)); + CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace()); + + Long catalogId = getCatalogIdByMetalakeIdAndName(metalakeId, catalogName); + + if (cascade) { + SessionUtils.doMultipleWithCommit( + () -> + SessionUtils.doWithoutCommit( + CatalogMetaMapper.class, + mapper -> mapper.softDeleteCatalogMetasByCatalogId(catalogId)), + () -> + SessionUtils.doWithoutCommit( + SchemaMetaMapper.class, + mapper -> mapper.softDeleteSchemaMetasByCatalogId(catalogId)), + () -> { + // TODO We will cascade delete the metadata of sub-resources under the catalog + }); + } else { + List schemaEntities = + SchemaMetaService.getInstance() + .listSchemasByNamespace( + Namespace.ofSchema(identifier.namespace().level(0), catalogName)); + if (!schemaEntities.isEmpty()) { + throw new NonEmptyEntityException( + "Entity %s has sub-entities, you should remove sub-entities first", identifier); } + SessionUtils.doWithCommit( + CatalogMetaMapper.class, mapper -> mapper.softDeleteCatalogMetasByCatalogId(catalogId)); } + return true; } } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CommonMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CommonMetaService.java new file mode 100644 index 00000000000..20bc92fb252 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CommonMetaService.java @@ -0,0 +1,48 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relational.service; + +import com.datastrato.gravitino.Namespace; +import com.google.common.base.Preconditions; + +/** The service class for common metadata operations. */ +public class CommonMetaService { + private static final CommonMetaService INSTANCE = new CommonMetaService(); + + public static CommonMetaService getInstance() { + return INSTANCE; + } + + private CommonMetaService() {} + + public Long getParentEntityIdByNamespace(Namespace namespace) { + Preconditions.checkArgument( + !namespace.isEmpty() && namespace.levels().length <= 3, + "Namespace should not be empty and length should be less and equal than 3"); + Long parentEntityId = null; + for (int level = 0; level < namespace.levels().length; level++) { + String name = namespace.level(level); + switch (level) { + case 0: + parentEntityId = MetalakeMetaService.getInstance().getMetalakeIdByName(name); + continue; + case 1: + parentEntityId = + CatalogMetaService.getInstance() + .getCatalogIdByMetalakeIdAndName(parentEntityId, name); + continue; + case 2: + parentEntityId = + SchemaMetaService.getInstance().getSchemaIdByCatalogIdAndName(parentEntityId, name); + break; + } + } + Preconditions.checkState( + parentEntityId != null && parentEntityId > 0, + "Parent entity id should not be null and should be greater than 0."); + return parentEntityId; + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java index 3850e1e47e1..d4ae1817368 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java @@ -6,7 +6,6 @@ package com.datastrato.gravitino.storage.relational.service; import com.datastrato.gravitino.Entity; -import com.datastrato.gravitino.EntityAlreadyExistsException; import com.datastrato.gravitino.HasIdentifier; import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.Namespace; @@ -16,12 +15,13 @@ import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.storage.relational.mapper.CatalogMetaMapper; import com.datastrato.gravitino.storage.relational.mapper.MetalakeMetaMapper; +import com.datastrato.gravitino.storage.relational.mapper.SchemaMetaMapper; import com.datastrato.gravitino.storage.relational.po.MetalakePO; +import com.datastrato.gravitino.storage.relational.utils.ExceptionUtils; import com.datastrato.gravitino.storage.relational.utils.POConverters; import com.datastrato.gravitino.storage.relational.utils.SessionUtils; import com.google.common.base.Preconditions; import java.io.IOException; -import java.sql.SQLIntegrityConstraintViolationException; import java.util.List; import java.util.Objects; import java.util.function.Function; @@ -45,6 +45,19 @@ public List listMetalakes() { return POConverters.fromMetalakePOs(metalakePOS); } + public Long getMetalakeIdByName(String metalakeName) { + Long metalakeId = + SessionUtils.getWithoutCommit( + MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(metalakeName)); + if (metalakeId == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.METALAKE.name().toLowerCase(), + metalakeName); + } + return metalakeId; + } + public BaseMetalake getMetalakeByIdentifier(NameIdentifier ident) { NameIdentifier.checkMetalake(ident); MetalakePO metalakePO = @@ -52,7 +65,9 @@ public BaseMetalake getMetalakeByIdentifier(NameIdentifier ident) { MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeMetaByName(ident.name())); if (metalakePO == null) { throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, ident.toString()); + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.METALAKE.name().toLowerCase(), + ident.toString()); } return POConverters.fromMetalakePO(metalakePO); } @@ -71,16 +86,8 @@ public void insertMetalake(BaseMetalake baseMetalake, boolean overwrite) { } }); } catch (RuntimeException re) { - if (re.getCause() != null - && re.getCause().getCause() != null - && re.getCause().getCause() instanceof SQLIntegrityConstraintViolationException) { - // TODO We should make more fine-grained exception judgments - // Usually throwing `SQLIntegrityConstraintViolationException` means that - // SQL violates the constraints of `primary key` and `unique key`. - // We simply think that the entity already exists at this time. - throw new EntityAlreadyExistsException( - String.format("Metalake entity: %s already exists", baseMetalake.nameIdentifier())); - } + ExceptionUtils.checkSQLConstraintException( + re, Entity.EntityType.METALAKE, baseMetalake.nameIdentifier().toString()); throw re; } } @@ -93,7 +100,9 @@ public BaseMetalake updateMetalake( MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeMetaByName(ident.name())); if (oldMetalakePO == null) { throw new NoSuchEntityException( - NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, ident.toString()); + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.METALAKE.name().toLowerCase(), + ident.toString()); } BaseMetalake oldMetalakeEntity = POConverters.fromMetalakePO(oldMetalakePO); @@ -112,16 +121,8 @@ public BaseMetalake updateMetalake( MetalakeMetaMapper.class, mapper -> mapper.updateMetalakeMeta(newMetalakePO, oldMetalakePO)); } catch (RuntimeException re) { - if (re.getCause() != null - && re.getCause().getCause() != null - && re.getCause().getCause() instanceof SQLIntegrityConstraintViolationException) { - // TODO We should make more fine-grained exception judgments - // Usually throwing `SQLIntegrityConstraintViolationException` means that - // SQL violates the constraints of `primary key` and `unique key`. - // We simply think that the entity already exists at this time. - throw new EntityAlreadyExistsException( - String.format("Catalog entity: %s already exists", newMetalakeEntity.nameIdentifier())); - } + ExceptionUtils.checkSQLConstraintException( + re, Entity.EntityType.METALAKE, newMetalakeEntity.nameIdentifier().toString()); throw re; } @@ -134,9 +135,7 @@ public BaseMetalake updateMetalake( public boolean deleteMetalake(NameIdentifier ident, boolean cascade) { NameIdentifier.checkMetalake(ident); - Long metalakeId = - SessionUtils.getWithoutCommit( - MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(ident.name())); + Long metalakeId = getMetalakeIdByName(ident.name()); if (metalakeId != null) { if (cascade) { SessionUtils.doMultipleWithCommit( @@ -148,6 +147,10 @@ public boolean deleteMetalake(NameIdentifier ident, boolean cascade) { SessionUtils.doWithoutCommit( CatalogMetaMapper.class, mapper -> mapper.softDeleteCatalogMetasByMetalakeId(metalakeId)), + () -> + SessionUtils.doWithoutCommit( + SchemaMetaMapper.class, + mapper -> mapper.softDeleteSchemaMetasByMetalakeId(metalakeId)), () -> { // TODO We will cascade delete the metadata of sub-resources under the metalake }); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java new file mode 100644 index 00000000000..2d71b405aca --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java @@ -0,0 +1,197 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.storage.relational.service; + +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.HasIdentifier; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.meta.SchemaEntity; +import com.datastrato.gravitino.storage.relational.mapper.SchemaMetaMapper; +import com.datastrato.gravitino.storage.relational.po.SchemaPO; +import com.datastrato.gravitino.storage.relational.utils.ExceptionUtils; +import com.datastrato.gravitino.storage.relational.utils.POConverters; +import com.datastrato.gravitino.storage.relational.utils.SessionUtils; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +/** The service class for schema metadata. It provides the basic database operations for schema. */ +public class SchemaMetaService { + private static final SchemaMetaService INSTANCE = new SchemaMetaService(); + + public static SchemaMetaService getInstance() { + return INSTANCE; + } + + private SchemaMetaService() {} + + public SchemaPO getSchemaPOByCatalogIdAndName(Long catalogId, String schemaName) { + SchemaPO schemaPO = + SessionUtils.getWithoutCommit( + SchemaMetaMapper.class, + mapper -> mapper.selectSchemaMetaByCatalogIdAndName(catalogId, schemaName)); + + if (schemaPO == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.SCHEMA.name().toLowerCase(), + schemaName); + } + return schemaPO; + } + + public Long getSchemaIdByCatalogIdAndName(Long catalogId, String schemaName) { + Long schemaId = + SessionUtils.getWithoutCommit( + SchemaMetaMapper.class, + mapper -> mapper.selectSchemaIdByCatalogIdAndName(catalogId, schemaName)); + + if (schemaId == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.SCHEMA.name().toLowerCase(), + schemaName); + } + return schemaId; + } + + public SchemaEntity getSchemaByIdentifier(NameIdentifier identifier) { + NameIdentifier.checkSchema(identifier); + String schemaName = identifier.name(); + + Long catalogId = + CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace()); + + SchemaPO schemaPO = getSchemaPOByCatalogIdAndName(catalogId, schemaName); + + return POConverters.fromSchemaPO(schemaPO, identifier.namespace()); + } + + public List listSchemasByNamespace(Namespace namespace) { + Namespace.checkSchema(namespace); + + Long catalogId = CommonMetaService.getInstance().getParentEntityIdByNamespace(namespace); + + List schemaPOs = + SessionUtils.getWithoutCommit( + SchemaMetaMapper.class, mapper -> mapper.listSchemaPOsByCatalogId(catalogId)); + return POConverters.fromSchemaPOs(schemaPOs, namespace); + } + + public void insertSchema(SchemaEntity schemaEntity, boolean overwrite) { + try { + NameIdentifier.checkSchema(schemaEntity.nameIdentifier()); + + SchemaPO.Builder builder = new SchemaPO.Builder(); + fillSchemaPOBuilderParentEntityId(builder, schemaEntity.namespace()); + + SessionUtils.doWithCommit( + SchemaMetaMapper.class, + mapper -> { + SchemaPO po = POConverters.initializeSchemaPOWithVersion(schemaEntity, builder); + if (overwrite) { + mapper.insertSchemaMetaOnDuplicateKeyUpdate(po); + } else { + mapper.insertSchemaMeta(po); + } + }); + } catch (RuntimeException re) { + ExceptionUtils.checkSQLConstraintException( + re, Entity.EntityType.SCHEMA, schemaEntity.nameIdentifier().toString()); + throw re; + } + } + + public SchemaEntity updateSchema( + NameIdentifier identifier, Function updater) throws IOException { + NameIdentifier.checkSchema(identifier); + + String schemaName = identifier.name(); + Long catalogId = + CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace()); + + SchemaPO oldSchemaPO = getSchemaPOByCatalogIdAndName(catalogId, schemaName); + + SchemaEntity oldSchemaEntity = POConverters.fromSchemaPO(oldSchemaPO, identifier.namespace()); + SchemaEntity newEntity = (SchemaEntity) updater.apply((E) oldSchemaEntity); + Preconditions.checkArgument( + Objects.equals(oldSchemaEntity.id(), newEntity.id()), + "The updated schema entity id: %s should be same with the schema entity id before: %s", + newEntity.id(), + oldSchemaEntity.id()); + + Integer updateResult; + try { + updateResult = + SessionUtils.doWithCommitAndFetchResult( + SchemaMetaMapper.class, + mapper -> + mapper.updateSchemaMeta( + POConverters.updateSchemaPOWithVersion(oldSchemaPO, newEntity), oldSchemaPO)); + } catch (RuntimeException re) { + ExceptionUtils.checkSQLConstraintException( + re, Entity.EntityType.SCHEMA, newEntity.nameIdentifier().toString()); + throw re; + } + + if (updateResult > 0) { + return newEntity; + } else { + throw new IOException("Failed to update the entity: " + identifier); + } + } + + public boolean deleteSchema(NameIdentifier identifier, boolean cascade) { + NameIdentifier.checkSchema(identifier); + + String schemaName = identifier.name(); + Long catalogId = + CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace()); + Long schemaId = getSchemaIdByCatalogIdAndName(catalogId, schemaName); + + if (schemaId != null) { + if (cascade) { + SessionUtils.doMultipleWithCommit( + () -> + SessionUtils.doWithoutCommit( + SchemaMetaMapper.class, + mapper -> mapper.softDeleteSchemaMetasBySchemaId(schemaId)), + () -> { + // TODO We will cascade delete the metadata of sub-resources under the schema + }); + } else { + // TODO Check whether the sub-resources are empty. If the sub-resources are not empty, + // deletion is not allowed. + SessionUtils.doWithCommit( + SchemaMetaMapper.class, mapper -> mapper.softDeleteSchemaMetasBySchemaId(schemaId)); + } + } + return true; + } + + private void fillSchemaPOBuilderParentEntityId(SchemaPO.Builder builder, Namespace namespace) { + Namespace.checkSchema(namespace); + Long parentEntityId = null; + for (int level = 0; level < namespace.levels().length; level++) { + String name = namespace.level(level); + switch (level) { + case 0: + parentEntityId = MetalakeMetaService.getInstance().getMetalakeIdByName(name); + builder.withMetalakeId(parentEntityId); + continue; + case 1: + parentEntityId = + CatalogMetaService.getInstance() + .getCatalogIdByMetalakeIdAndName(parentEntityId, name); + builder.withCatalogId(parentEntityId); + break; + } + } + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java index b8dcd440115..618e1a65860 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java @@ -9,6 +9,7 @@ import com.datastrato.gravitino.Configs; import com.datastrato.gravitino.storage.relational.mapper.CatalogMetaMapper; import com.datastrato.gravitino.storage.relational.mapper.MetalakeMetaMapper; +import com.datastrato.gravitino.storage.relational.mapper.SchemaMetaMapper; import com.google.common.base.Preconditions; import java.sql.SQLException; import java.time.Duration; @@ -76,6 +77,7 @@ public void init(Config config) { Configuration configuration = new Configuration(environment); configuration.addMapper(MetalakeMetaMapper.class); configuration.addMapper(CatalogMetaMapper.class); + configuration.addMapper(SchemaMetaMapper.class); // Create the SqlSessionFactory object, it is a singleton object if (sqlSessionFactory == null) { diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/ExceptionUtils.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/ExceptionUtils.java new file mode 100644 index 00000000000..271389e21ba --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/ExceptionUtils.java @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.storage.relational.utils; + +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.EntityAlreadyExistsException; +import java.sql.SQLIntegrityConstraintViolationException; + +public class ExceptionUtils { + private ExceptionUtils() {} + + public static void checkSQLConstraintException( + RuntimeException re, Entity.EntityType type, String entityName) { + if (re.getCause() != null + && re.getCause().getCause() != null + && re.getCause().getCause() instanceof SQLIntegrityConstraintViolationException) { + // TODO We should make more fine-grained exception judgments + // Usually throwing `SQLIntegrityConstraintViolationException` means that + // SQL violates the constraints of `primary key` and `unique key`. + // We simply think that the entity already exists at this time. + throw new EntityAlreadyExistsException( + String.format("%s entity: %s already exists", type.name(), entityName)); + } + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java index 4aa72329d73..8f9ed18dfd0 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java @@ -11,9 +11,11 @@ import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.BaseMetalake; import com.datastrato.gravitino.meta.CatalogEntity; +import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.meta.SchemaVersion; import com.datastrato.gravitino.storage.relational.po.CatalogPO; import com.datastrato.gravitino.storage.relational.po.MetalakePO; +import com.datastrato.gravitino.storage.relational.po.SchemaPO; import com.fasterxml.jackson.core.JsonProcessingException; import java.util.List; import java.util.Map; @@ -107,11 +109,11 @@ public static BaseMetalake fromMetalakePO(MetalakePO metalakePO) { /** * Convert list of {@link MetalakePO} to list of {@link BaseMetalake} * - * @param metalakePOS list of MetalakePO objects + * @param metalakePOs list of MetalakePO objects * @return list of BaseMetalake objects from list of MetalakePO objects */ - public static List fromMetalakePOs(List metalakePOS) { - return metalakePOS.stream().map(POConverters::fromMetalakePO).collect(Collectors.toList()); + public static List fromMetalakePOs(List metalakePOs) { + return metalakePOs.stream().map(POConverters::fromMetalakePO).collect(Collectors.toList()); } /** @@ -200,16 +202,105 @@ public static CatalogEntity fromCatalogPO(CatalogPO catalogPO, Namespace namespa } /** - * Convert list of {@link MetalakePO} to list of {@link BaseMetalake} + * Convert list of {@link CatalogPO} to list of {@link CatalogEntity} * - * @param catalogPOS list of MetalakePO objects - * @param namespace Namespace object to be associated with the metalake - * @return list of BaseMetalake objects from list of MetalakePO objects + * @param catalogPOs list of CatalogPO objects + * @param namespace Namespace object to be associated with the catalog + * @return list of CatalogEntity objects from list of CatalogPO objects */ public static List fromCatalogPOs( - List catalogPOS, Namespace namespace) { - return catalogPOS.stream() + List catalogPOs, Namespace namespace) { + return catalogPOs.stream() .map(catalogPO -> POConverters.fromCatalogPO(catalogPO, namespace)) .collect(Collectors.toList()); } + + /** + * Initialize SchemaPO + * + * @param schemaEntity SchemaEntity object + * @return CatalogPO object with version initialized + */ + public static SchemaPO initializeSchemaPOWithVersion( + SchemaEntity schemaEntity, SchemaPO.Builder builder) { + try { + return builder + .withSchemaId(schemaEntity.id()) + .withSchemaName(schemaEntity.name()) + .withSchemaComment(schemaEntity.comment()) + .withProperties(JsonUtils.anyFieldMapper().writeValueAsString(schemaEntity.properties())) + .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(schemaEntity.auditInfo())) + .withCurrentVersion(1L) + .withLastVersion(1L) + .withDeletedAt(0L) + .build(); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize json object:", e); + } + } + + /** + * Update SchemaPO version + * + * @param oldSchemaPO the old SchemaPO object + * @param newSchema the new SchemaEntity object + * @return SchemaPO object with updated version + */ + public static SchemaPO updateSchemaPOWithVersion(SchemaPO oldSchemaPO, SchemaEntity newSchema) { + Long lastVersion = oldSchemaPO.getLastVersion(); + // Will set the version to the last version + 1 when having some fields need be multiple version + Long nextVersion = lastVersion; + try { + return new SchemaPO.Builder() + .withSchemaId(oldSchemaPO.getSchemaId()) + .withSchemaName(newSchema.name()) + .withMetalakeId(oldSchemaPO.getMetalakeId()) + .withCatalogId(oldSchemaPO.getCatalogId()) + .withSchemaComment(newSchema.comment()) + .withProperties(JsonUtils.anyFieldMapper().writeValueAsString(newSchema.properties())) + .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(newSchema.auditInfo())) + .withCurrentVersion(nextVersion) + .withLastVersion(nextVersion) + .withDeletedAt(0L) + .build(); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize json object:", e); + } + } + + /** + * Convert {@link SchemaPO} to {@link SchemaEntity} + * + * @param schemaPO SchemaPO object to be converted + * @param namespace Namespace object to be associated with the schema + * @return SchemaEntity object from SchemaPO object + */ + public static SchemaEntity fromSchemaPO(SchemaPO schemaPO, Namespace namespace) { + try { + return new SchemaEntity.Builder() + .withId(schemaPO.getSchemaId()) + .withName(schemaPO.getSchemaName()) + .withNamespace(namespace) + .withComment(schemaPO.getSchemaComment()) + .withProperties(JsonUtils.anyFieldMapper().readValue(schemaPO.getProperties(), Map.class)) + .withAuditInfo( + JsonUtils.anyFieldMapper().readValue(schemaPO.getAuditInfo(), AuditInfo.class)) + .build(); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to deserialize json object:", e); + } + } + + /** + * Convert list of {@link SchemaPO} to list of {@link SchemaEntity} + * + * @param schemaPOs list of SchemaPO objects + * @param namespace Namespace object to be associated with the schema + * @return list of SchemaEntity objects from list of SchemaPO objects + */ + public static List fromSchemaPOs(List schemaPOs, Namespace namespace) { + return schemaPOs.stream() + .map(schemaPO -> POConverters.fromSchemaPO(schemaPO, namespace)) + .collect(Collectors.toList()); + } } diff --git a/core/src/main/resources/mysql/mysql_init.sql b/core/src/main/resources/mysql/mysql_init.sql index 613d0b692b3..78811a4cb71 100644 --- a/core/src/main/resources/mysql/mysql_init.sql +++ b/core/src/main/resources/mysql/mysql_init.sql @@ -31,4 +31,20 @@ CREATE TABLE IF NOT EXISTS `catalog_meta` ( `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'catalog deleted at', PRIMARY KEY (`catalog_id`), UNIQUE KEY `uk_mid_cn_del` (`metalake_id`, `catalog_name`, `deleted_at`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'catalog metadata'; \ No newline at end of file +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'catalog metadata'; + +CREATE TABLE IF NOT EXISTS `schema_meta` ( + `schema_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'schema id', + `schema_name` VARCHAR(128) NOT NULL COMMENT 'schema name', + `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id', + `catalog_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id', + `schema_comment` VARCHAR(256) DEFAULT '' COMMENT 'schema comment', + `properties` MEDIUMTEXT DEFAULT NULL COMMENT 'schema properties', + `audit_info` MEDIUMTEXT NOT NULL COMMENT 'schema audit info', + `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'schema current version', + `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'schema last version', + `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'schema deleted at', + PRIMARY KEY (`schema_id`), + UNIQUE KEY `uk_cid_sn_del` (`catalog_id`, `schema_name`, `deleted_at`), + KEY `idx_mid` (`metalake_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'schema metadata'; \ No newline at end of file diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalEntityStore.java b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalEntityStore.java index ccf84585d20..f0be65117eb 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalEntityStore.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalEntityStore.java @@ -31,6 +31,7 @@ import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.BaseMetalake; import com.datastrato.gravitino.meta.CatalogEntity; +import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.meta.SchemaVersion; import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper; import java.io.BufferedReader; @@ -135,8 +136,7 @@ public static void tearDown() { } @Test - public void testPutAndGet() throws IOException { - // metalake + public void testMetalakePutAndGet() throws IOException { BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); entityStore.put(metalake, false); BaseMetalake insertedMetalake = @@ -158,14 +158,20 @@ public void testPutAndGet() throws IOException { assertEquals( 1, entityStore.list(Namespace.empty(), BaseMetalake.class, Entity.EntityType.METALAKE).size()); - assertEquals("test_metalake2", insertedMetalake1.name()); - assertEquals("this is test2", insertedMetalake1.comment()); + assertEquals(overittenMetalake.name(), insertedMetalake1.name()); + assertEquals(overittenMetalake.comment(), insertedMetalake1.comment()); + } + + @Test + public void testCatalogPutAndGet() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); + entityStore.put(metalake, false); - // catalog CatalogEntity catalog = createCatalog( - 1L, "test_catalog", Namespace.ofCatalog("test_metalake2"), "this is catalog test"); + 1L, "test_catalog", Namespace.ofCatalog(metalake.name()), "this is catalog test"); entityStore.put(catalog, false); + CatalogEntity insertedCatalog = entityStore.get(catalog.nameIdentifier(), Entity.EntityType.CATALOG, CatalogEntity.class); assertNotNull(insertedCatalog); @@ -174,14 +180,14 @@ public void testPutAndGet() throws IOException { // overwrite false CatalogEntity duplicateCatalog = createCatalog( - 1L, "test_catalog", Namespace.ofCatalog("test_metalake2"), "this is catalog test"); + 1L, "test_catalog", Namespace.ofCatalog(metalake.name()), "this is catalog test"); assertThrows( EntityAlreadyExistsException.class, () -> entityStore.put(duplicateCatalog, false)); // overwrite true CatalogEntity overittenCatalog = createCatalog( - 1L, "test_catalog1", Namespace.ofCatalog("test_metalake2"), "this is catalog test1"); + 1L, "test_catalog1", Namespace.ofCatalog(metalake.name()), "this is catalog test1"); entityStore.put(overittenCatalog, true); CatalogEntity insertedCatalog1 = entityStore.get( @@ -191,13 +197,64 @@ public void testPutAndGet() throws IOException { entityStore .list(overittenCatalog.namespace(), CatalogEntity.class, Entity.EntityType.CATALOG) .size()); - assertEquals("test_catalog1", insertedCatalog1.name()); - assertEquals("this is catalog test1", insertedCatalog1.getComment()); + assertEquals(overittenCatalog.name(), insertedCatalog1.name()); + assertEquals(overittenCatalog.getComment(), insertedCatalog1.getComment()); + } + + @Test + public void testSchemaPutAndGet() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); + entityStore.put(metalake, false); + + CatalogEntity catalog = + createCatalog(1L, "test_metalake", Namespace.ofCatalog(metalake.name()), "this is test"); + entityStore.put(catalog, false); + + SchemaEntity schema = + createSchema( + 1L, + "test_schema", + Namespace.ofSchema(metalake.name(), catalog.name()), + "this is schema test"); + entityStore.put(schema, false); + + SchemaEntity insertedSchema = + entityStore.get(schema.nameIdentifier(), Entity.EntityType.SCHEMA, SchemaEntity.class); + assertNotNull(insertedSchema); + assertTrue(checkSchemaEquals(schema, insertedSchema)); + + // overwrite false + SchemaEntity duplicateSchema = + createSchema( + 1L, + "test_schema", + Namespace.ofSchema(metalake.name(), catalog.name()), + "this is schema test"); + assertThrows(EntityAlreadyExistsException.class, () -> entityStore.put(duplicateSchema, false)); + + // overwrite true + SchemaEntity overittenSchema = + createSchema( + 1L, + "test_schema1", + Namespace.ofSchema(metalake.name(), catalog.name()), + "this is schema test1"); + entityStore.put(overittenSchema, true); + SchemaEntity insertedSchema1 = + entityStore.get( + overittenSchema.nameIdentifier(), Entity.EntityType.SCHEMA, SchemaEntity.class); + + assertEquals( + 1, + entityStore + .list(insertedSchema1.namespace(), SchemaEntity.class, Entity.EntityType.SCHEMA) + .size()); + assertEquals(overittenSchema.name(), insertedSchema1.name()); + assertEquals(overittenSchema.comment(), insertedSchema1.comment()); } @Test - public void testPutAndList() throws IOException { - // metalake + public void testMetalakePutAndList() throws IOException { BaseMetalake metalake1 = createMetalake(1L, "test_metalake1", "this is test 1"); BaseMetalake metalake2 = createMetalake(2L, "test_metalake2", "this is test 2"); List beforePutList = @@ -216,14 +273,19 @@ public void testPutAndList() throws IOException { assertEquals(2, metalakes.size()); assertTrue(checkMetalakeEquals(metalake1, metalakes.get(0))); assertTrue(checkMetalakeEquals(metalake2, metalakes.get(1))); + } + + @Test + public void testCatalogPutAndList() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test 1"); + entityStore.put(metalake, false); - // catalog CatalogEntity catalog1 = createCatalog( - 1L, "test_catalog1", Namespace.ofCatalog(metalake1.name()), "this is catalog 1"); + 1L, "test_catalog1", Namespace.ofCatalog(metalake.name()), "this is catalog 1"); CatalogEntity catalog2 = createCatalog( - 2L, "test_catalog2", Namespace.ofCatalog(metalake1.name()), "this is catalog 2"); + 2L, "test_catalog2", Namespace.ofCatalog(metalake.name()), "this is catalog 2"); List beforeCatalogList = entityStore.list(catalog1.namespace(), CatalogEntity.class, Entity.EntityType.CATALOG); assertNotNull(beforeCatalogList); @@ -243,13 +305,52 @@ public void testPutAndList() throws IOException { } @Test - public void testPutAndDelete() throws IOException, InterruptedException { - // metalake + public void testSchemaPutAndList() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test 1"); + entityStore.put(metalake, false); + + CatalogEntity catalog = + createCatalog(1L, "test_catalog", Namespace.ofCatalog(metalake.name()), "this is test"); + entityStore.put(catalog, false); + + SchemaEntity schema1 = + createSchema( + 1L, + "test_schema1", + Namespace.ofSchema(metalake.name(), catalog.name()), + "this is schema 1"); + SchemaEntity schema2 = + createSchema( + 2L, + "test_schema2", + Namespace.ofSchema(metalake.name(), catalog.name()), + "this is schema 2"); + List beforeSchemaList = + entityStore.list(schema1.namespace(), SchemaEntity.class, Entity.EntityType.SCHEMA); + assertNotNull(beforeSchemaList); + assertEquals(0, beforeSchemaList.size()); + + entityStore.put(schema1, false); + entityStore.put(schema2, false); + List schemaEntities = + entityStore.list(schema1.namespace(), SchemaEntity.class, Entity.EntityType.SCHEMA).stream() + .sorted(Comparator.comparing(SchemaEntity::id)) + .collect(Collectors.toList()); + assertNotNull(schemaEntities); + assertEquals(2, schemaEntities.size()); + assertTrue(checkSchemaEquals(schema1, schemaEntities.get(0))); + assertTrue(checkSchemaEquals(schema2, schemaEntities.get(1))); + } + + @Test + public void testMetalakePutAndDelete() throws IOException, InterruptedException { BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); entityStore.put(metalake, false); + assertNotNull( entityStore.get(metalake.nameIdentifier(), Entity.EntityType.METALAKE, BaseMetalake.class)); entityStore.delete(metalake.nameIdentifier(), Entity.EntityType.METALAKE, false); + assertThrows( NoSuchEntityException.class, () -> @@ -260,13 +361,21 @@ public void testPutAndDelete() throws IOException, InterruptedException { Thread.sleep(1000); // test cascade delete - BaseMetalake metalake1 = createMetalake(2L, "test_metalake", "this is test"); + BaseMetalake metalake1 = createMetalake(2L, "test_metalake1", "this is test"); entityStore.put(metalake1, false); CatalogEntity subCatalog = createCatalog( 1L, "test_catalog", Namespace.ofCatalog(metalake1.name()), "test cascade deleted"); entityStore.put(subCatalog, false); + SchemaEntity subSchema = + createSchema( + 1L, + "test_schema", + Namespace.ofSchema(metalake1.name(), subCatalog.name()), + "test cascade deleted"); + entityStore.put(subSchema, false); + // cascade is false assertThrows( NonEmptyEntityException.class, @@ -276,16 +385,22 @@ public void testPutAndDelete() throws IOException, InterruptedException { entityStore.delete(metalake1.nameIdentifier(), Entity.EntityType.METALAKE, true); assertFalse(entityStore.exists(metalake1.nameIdentifier(), Entity.EntityType.METALAKE)); assertFalse(entityStore.exists(subCatalog.nameIdentifier(), Entity.EntityType.CATALOG)); + assertFalse(entityStore.exists(subSchema.nameIdentifier(), Entity.EntityType.SCHEMA)); + } + + @Test + public void testCatalogPutAndDelete() throws IOException, InterruptedException { + BaseMetalake metalake = createMetalake(3L, "test_metalake", "this is test"); + entityStore.put(metalake, false); - // catalog - BaseMetalake metalake2 = createMetalake(3L, "test_metalake", "this is test"); - entityStore.put(metalake2, false); CatalogEntity catalog = - createCatalog(2L, "test_catalog", Namespace.ofCatalog("test_metalake"), "this is test"); + createCatalog(2L, "test_catalog", Namespace.ofCatalog(metalake.name()), "this is test"); entityStore.put(catalog, false); + assertNotNull( entityStore.get(catalog.nameIdentifier(), Entity.EntityType.CATALOG, CatalogEntity.class)); entityStore.delete(catalog.nameIdentifier(), Entity.EntityType.CATALOG, false); + assertThrows( NoSuchEntityException.class, () -> @@ -298,22 +413,70 @@ public void testPutAndDelete() throws IOException, InterruptedException { // test cascade delete CatalogEntity catalog1 = createCatalog( - 3L, "test_catalog1", Namespace.ofCatalog(metalake2.name()), "test cascade deleted"); + 3L, "test_catalog1", Namespace.ofCatalog(metalake.name()), "test cascade deleted"); entityStore.put(catalog1, false); + SchemaEntity subSchema = + createSchema( + 1L, + "test_schema", + Namespace.ofSchema(metalake.name(), catalog1.name()), + "test cascade deleted"); + entityStore.put(subSchema, false); + // cascade is false assertThrows( NonEmptyEntityException.class, - () -> entityStore.delete(metalake2.nameIdentifier(), Entity.EntityType.METALAKE, false)); + () -> entityStore.delete(catalog1.nameIdentifier(), Entity.EntityType.CATALOG, false)); // cascade is true entityStore.delete(catalog1.nameIdentifier(), Entity.EntityType.CATALOG, true); assertFalse(entityStore.exists(catalog1.nameIdentifier(), Entity.EntityType.CATALOG)); + assertFalse(entityStore.exists(subSchema.nameIdentifier(), Entity.EntityType.SCHEMA)); + } + + @Test + public void testSchemaPutAndDelete() throws IOException, InterruptedException { + BaseMetalake metalake = createMetalake(3L, "test_metalake", "this is test"); + entityStore.put(metalake, false); + + CatalogEntity catalog = + createCatalog(2L, "test_catalog", Namespace.ofCatalog(metalake.name()), "this is test"); + entityStore.put(catalog, false); + + SchemaEntity schema = + createSchema( + 2L, "test_schema", Namespace.ofSchema(metalake.name(), catalog.name()), "this is test"); + entityStore.put(schema, false); + + assertNotNull( + entityStore.get(schema.nameIdentifier(), Entity.EntityType.SCHEMA, SchemaEntity.class)); + entityStore.delete(schema.nameIdentifier(), Entity.EntityType.SCHEMA, false); + + assertThrows( + NoSuchEntityException.class, + () -> + entityStore.get(schema.nameIdentifier(), Entity.EntityType.SCHEMA, SchemaEntity.class)); + + // sleep 1s to make delete_at seconds differently + Thread.sleep(1000); + + // test cascade delete + SchemaEntity schema1 = + createSchema( + 3L, + "test_schema1", + Namespace.ofSchema(metalake.name(), catalog.name()), + "test cascade deleted"); + entityStore.put(schema1, false); + + // cascade is true + entityStore.delete(schema1.nameIdentifier(), Entity.EntityType.SCHEMA, true); + assertFalse(entityStore.exists(schema1.nameIdentifier(), Entity.EntityType.SCHEMA)); } @Test - public void testPutAndUpdate() throws IOException { - // metalake + public void testMetalakePutAndUpdate() throws IOException { BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); entityStore.put(metalake, false); @@ -355,6 +518,7 @@ public void testPutAndUpdate() throws IOException { .withVersion(m.getVersion()); return builder.build(); }); + BaseMetalake storedMetalake = entityStore.get( updatedMetalake.nameIdentifier(), Entity.EntityType.METALAKE, BaseMetalake.class); @@ -365,6 +529,7 @@ public void testPutAndUpdate() throws IOException { BaseMetalake metalake3 = createMetalake(3L, "test_metalake3", "this is test 3"); entityStore.put(metalake3, false); + assertThrows( EntityAlreadyExistsException.class, () -> @@ -376,6 +541,7 @@ public void testPutAndUpdate() throws IOException { BaseMetalake.Builder builder = new BaseMetalake.Builder() .withId(metalake3.id()) + // metalake name already exists .withName("test_metalake2") .withComment(metalake3.comment()) .withProperties(new HashMap<>()) @@ -383,12 +549,18 @@ public void testPutAndUpdate() throws IOException { .withVersion(m.getVersion()); return builder.build(); })); + } + + @Test + public void testCatalogPutAndUpdate() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); + entityStore.put(metalake, false); - // catalog CatalogEntity catalog = createCatalog( - 1L, "test_catalog", Namespace.ofCatalog("test_metalake2"), "this is catalog test"); + 1L, "test_catalog", Namespace.ofCatalog(metalake.name()), "this is catalog test"); entityStore.put(catalog, false); + assertThrows( RuntimeException.class, () -> @@ -402,7 +574,7 @@ public void testPutAndUpdate() throws IOException { // Change the id, which is not allowed .withId(2L) .withName("test_catalog2") - .withNamespace(Namespace.ofCatalog(updatedMetalake.name())) + .withNamespace(Namespace.ofCatalog(metalake.name())) .withType(Catalog.Type.RELATIONAL) .withProvider("test") .withComment("this is catalog test 2") @@ -411,6 +583,8 @@ public void testPutAndUpdate() throws IOException { return builder.build(); })); + AuditInfo changedAuditInfo = + AuditInfo.builder().withCreator("changed_creator").withCreateTime(Instant.now()).build(); CatalogEntity updatedCatalog = entityStore.update( catalog.nameIdentifier(), @@ -421,7 +595,7 @@ public void testPutAndUpdate() throws IOException { CatalogEntity.builder() .withId(c.id()) .withName("test_catalog2") - .withNamespace(Namespace.ofCatalog(updatedMetalake.name())) + .withNamespace(Namespace.ofCatalog(metalake.name())) .withType(Catalog.Type.RELATIONAL) .withProvider("test") .withComment("this is catalog test 2") @@ -429,9 +603,11 @@ public void testPutAndUpdate() throws IOException { .withAuditInfo(changedAuditInfo); return builder.build(); }); + CatalogEntity storedCatalog = entityStore.get( updatedCatalog.nameIdentifier(), Entity.EntityType.CATALOG, CatalogEntity.class); + assertEquals(catalog.id(), storedCatalog.id()); assertEquals("test_catalog2", updatedCatalog.name()); assertEquals("this is catalog test 2", updatedCatalog.getComment()); @@ -439,7 +615,7 @@ public void testPutAndUpdate() throws IOException { CatalogEntity catalog3 = createCatalog( - 3L, "test_catalog3", Namespace.ofCatalog("test_metalake2"), "this is catalog test 3"); + 3L, "test_catalog3", Namespace.ofCatalog(metalake.name()), "this is catalog test 3"); entityStore.put(catalog3, false); assertThrows( EntityAlreadyExistsException.class, @@ -452,8 +628,9 @@ public void testPutAndUpdate() throws IOException { CatalogEntity.Builder builder = CatalogEntity.builder() .withId(catalog3.id()) + // catalog name already exists .withName("test_catalog2") - .withNamespace(Namespace.ofCatalog(updatedMetalake.name())) + .withNamespace(Namespace.ofCatalog(metalake.name())) .withType(Catalog.Type.RELATIONAL) .withProvider("test") .withComment(catalog3.getComment()) @@ -464,19 +641,136 @@ public void testPutAndUpdate() throws IOException { } @Test - public void testPutAndExists() throws IOException, InterruptedException { - // metalake + public void testSchemaPutAndUpdate() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); + entityStore.put(metalake, false); + + CatalogEntity catalog = + createCatalog( + 1L, "test_catalog", Namespace.ofCatalog(metalake.name()), "this is catalog test"); + entityStore.put(catalog, false); + + SchemaEntity schema = + createSchema( + 1L, + "test_schema", + Namespace.ofSchema(metalake.name(), catalog.name()), + "this is schema test"); + entityStore.put(schema, false); + + assertThrows( + RuntimeException.class, + () -> + entityStore.update( + schema.nameIdentifier(), + SchemaEntity.class, + Entity.EntityType.SCHEMA, + s -> { + SchemaEntity.Builder builder = + new SchemaEntity.Builder() + // Change the id, which is not allowed + .withId(2L) + .withName("test_schema2") + .withNamespace(Namespace.ofSchema(metalake.name(), catalog.name())) + .withComment("this is schema test 2") + .withProperties(new HashMap<>()) + .withAuditInfo(s.auditInfo()); + return builder.build(); + })); + + AuditInfo changedAuditInfo = + AuditInfo.builder().withCreator("changed_creator").withCreateTime(Instant.now()).build(); + SchemaEntity updatedSchema = + entityStore.update( + schema.nameIdentifier(), + SchemaEntity.class, + Entity.EntityType.SCHEMA, + s -> { + SchemaEntity.Builder builder = + new SchemaEntity.Builder() + .withId(s.id()) + .withName("test_schema2") + .withNamespace(Namespace.ofSchema(metalake.name(), catalog.name())) + .withComment("this is schema test 2") + .withProperties(new HashMap<>()) + .withAuditInfo(changedAuditInfo); + return builder.build(); + }); + + SchemaEntity storedSchema = + entityStore.get( + updatedSchema.nameIdentifier(), Entity.EntityType.SCHEMA, SchemaEntity.class); + + assertEquals(catalog.id(), storedSchema.id()); + assertEquals("test_schema2", storedSchema.name()); + assertEquals("this is schema test 2", storedSchema.comment()); + assertEquals(changedAuditInfo.creator(), storedSchema.auditInfo().creator()); + + SchemaEntity schema3 = + createSchema( + 3L, + "test_schema3", + Namespace.ofSchema(metalake.name(), catalog.name()), + "this is schema test 3"); + entityStore.put(schema3, false); + + assertThrows( + EntityAlreadyExistsException.class, + () -> + entityStore.update( + schema3.nameIdentifier(), + SchemaEntity.class, + Entity.EntityType.SCHEMA, + s -> { + SchemaEntity.Builder builder = + new SchemaEntity.Builder() + .withId(schema3.id()) + // schema name already exists + .withName("test_schema2") + .withNamespace(Namespace.ofSchema(metalake.name(), catalog.name())) + .withComment(s.comment()) + .withProperties(new HashMap<>()) + .withAuditInfo(s.auditInfo()); + return builder.build(); + })); + } + + @Test + public void testMetalakePutAndExists() throws IOException { BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); entityStore.put(metalake, false); assertTrue(entityStore.exists(metalake.nameIdentifier(), Entity.EntityType.METALAKE)); + } + + @Test + public void testCatalogPutAndExists() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); + entityStore.put(metalake, false); - // catalog CatalogEntity catalog = - createCatalog(1L, "test_catalog", Namespace.ofCatalog("test_metalake"), "this is test"); + createCatalog(1L, "test_catalog", Namespace.ofCatalog(metalake.name()), "this is test"); entityStore.put(catalog, false); + assertTrue(entityStore.exists(catalog.nameIdentifier(), Entity.EntityType.CATALOG)); } + @Test + public void testSchemaPutAndExists() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); + entityStore.put(metalake, false); + + CatalogEntity catalog = + createCatalog(1L, "test_catalog", Namespace.ofCatalog(metalake.name()), "this is test"); + entityStore.put(catalog, false); + + SchemaEntity schema = + createSchema( + 1L, "test_schema", Namespace.ofSchema(metalake.name(), catalog.name()), "this is test"); + entityStore.put(schema, false); + + assertTrue(entityStore.exists(schema.nameIdentifier(), Entity.EntityType.SCHEMA)); + } + private static BaseMetalake createMetalake(Long id, String name, String comment) { AuditInfo auditInfo = AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build(); @@ -519,6 +813,7 @@ private static boolean checkCatalogEquals(CatalogEntity expected, CatalogEntity return expected.id().equals(actual.id()) && expected.name().equals(actual.name()) && expected.namespace().equals(actual.namespace()) + && expected.getComment().equals(actual.getComment()) && expected.getType().equals(actual.getType()) && expected.getProvider().equals(actual.getProvider()) && expected.getProperties() != null @@ -526,6 +821,30 @@ private static boolean checkCatalogEquals(CatalogEntity expected, CatalogEntity && expected.auditInfo().equals(actual.auditInfo()); } + private static SchemaEntity createSchema( + Long id, String name, Namespace namespace, String comment) { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build(); + return new SchemaEntity.Builder() + .withId(id) + .withName(name) + .withNamespace(namespace) + .withComment(comment) + .withProperties(new HashMap<>()) + .withAuditInfo(auditInfo) + .build(); + } + + private static boolean checkSchemaEquals(SchemaEntity expected, SchemaEntity actual) { + return expected.id().equals(actual.id()) + && expected.name().equals(actual.name()) + && expected.namespace().equals(actual.namespace()) + && expected.comment().equals(actual.comment()) + && expected.properties() != null + && expected.properties().equals(actual.properties()) + && expected.auditInfo().equals(actual.auditInfo()); + } + private static void truncateAllTables() { try (SqlSession sqlSession = SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true)) { diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java b/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java index 544b5e713e6..1babed4e6a7 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java @@ -13,9 +13,11 @@ import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.BaseMetalake; import com.datastrato.gravitino.meta.CatalogEntity; +import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.meta.SchemaVersion; import com.datastrato.gravitino.storage.relational.po.CatalogPO; import com.datastrato.gravitino.storage.relational.po.MetalakePO; +import com.datastrato.gravitino.storage.relational.po.SchemaPO; import com.fasterxml.jackson.core.JsonProcessingException; import java.time.Instant; import java.time.LocalDateTime; @@ -72,6 +74,26 @@ public void testFromCatalogPO() throws JsonProcessingException { assertEquals(expectedCatalog.auditInfo().creator(), convertedCatalog.auditInfo().creator()); } + @Test + public void testFromSchemaPO() throws JsonProcessingException { + SchemaPO schemaPO = createSchemaPO(1L, "test", 1L, 1L, "this is test"); + + SchemaEntity expectedSchema = + createSchema( + 1L, "test", Namespace.ofSchema("test_metalake", "test_catalog"), "this is test"); + + SchemaEntity convertedSchema = + POConverters.fromSchemaPO(schemaPO, Namespace.ofSchema("test_metalake", "test_catalog")); + + // Assert + assertEquals(expectedSchema.id(), convertedSchema.id()); + assertEquals(expectedSchema.name(), convertedSchema.name()); + assertEquals(expectedSchema.comment(), convertedSchema.comment()); + assertEquals(expectedSchema.namespace(), convertedSchema.namespace()); + assertEquals(expectedSchema.properties().get("key"), convertedSchema.properties().get("key")); + assertEquals(expectedSchema.auditInfo().creator(), convertedSchema.auditInfo().creator()); + } + @Test public void testFromMetalakePOs() throws JsonProcessingException { MetalakePO metalakePO1 = createMetalakePO(1L, "test", "this is test"); @@ -132,6 +154,37 @@ public void testFromCatalogPOs() throws JsonProcessingException { } } + @Test + public void testFromSchemaPOs() throws JsonProcessingException { + SchemaPO schemaPO1 = createSchemaPO(1L, "test", 1L, 1L, "this is test"); + SchemaPO schemaPO2 = createSchemaPO(2L, "test2", 1L, 1L, "this is test2"); + List schemaPOs = new ArrayList<>(Arrays.asList(schemaPO1, schemaPO2)); + List convertedSchemas = + POConverters.fromSchemaPOs(schemaPOs, Namespace.ofSchema("test_metalake", "test_catalog")); + + SchemaEntity expectedSchema1 = + createSchema( + 1L, "test", Namespace.ofSchema("test_metalake", "test_catalog"), "this is test"); + SchemaEntity expectedSchema2 = + createSchema( + 2L, "test2", Namespace.ofSchema("test_metalake", "test_catalog"), "this is test2"); + List expectedSchemas = + new ArrayList<>(Arrays.asList(expectedSchema1, expectedSchema2)); + + // Assert + int index = 0; + for (SchemaEntity schema : convertedSchemas) { + assertEquals(expectedSchemas.get(index).id(), schema.id()); + assertEquals(expectedSchemas.get(index).name(), schema.name()); + assertEquals(expectedSchemas.get(index).comment(), schema.comment()); + assertEquals(expectedSchemas.get(index).namespace(), schema.namespace()); + assertEquals( + expectedSchemas.get(index).properties().get("key"), schema.properties().get("key")); + assertEquals(expectedSchemas.get(index).auditInfo().creator(), schema.auditInfo().creator()); + index++; + } + } + @Test public void testInitMetalakePOVersion() { BaseMetalake metalake = createMetalake(1L, "test", "this is test"); @@ -151,6 +204,20 @@ public void testInitCatalogPOVersion() { assertEquals(0, initPO.getDeletedAt()); } + @Test + public void testInitSchemaPOVersion() { + SchemaEntity schema = + createSchema( + 1L, "test", Namespace.ofSchema("test_metalake", "test_catalog"), "this is test"); + SchemaPO.Builder builder = new SchemaPO.Builder(); + builder.withMetalakeId(1L); + builder.withCatalogId(1L); + SchemaPO initPO = POConverters.initializeSchemaPOWithVersion(schema, builder); + assertEquals(1, initPO.getCurrentVersion()); + assertEquals(1, initPO.getLastVersion()); + assertEquals(0, initPO.getDeletedAt()); + } + @Test public void testUpdateMetalakePOVersion() { BaseMetalake metalake = createMetalake(1L, "test", "this is test"); @@ -177,6 +244,25 @@ public void testUpdateCatalogPOVersion() { assertEquals("this is test2", updatePO.getCatalogComment()); } + @Test + public void testUpdateSchemaPOVersion() { + SchemaEntity schema = + createSchema( + 1L, "test", Namespace.ofSchema("test_metalake", "test_catalog"), "this is test"); + SchemaEntity updatedSchema = + createSchema( + 1L, "test", Namespace.ofSchema("test_metalake", "test_catalog"), "this is test2"); + SchemaPO.Builder builder = new SchemaPO.Builder(); + builder.withMetalakeId(1L); + builder.withCatalogId(1L); + SchemaPO initPO = POConverters.initializeSchemaPOWithVersion(schema, builder); + SchemaPO updatePO = POConverters.updateSchemaPOWithVersion(initPO, updatedSchema); + assertEquals(1, initPO.getCurrentVersion()); + assertEquals(1, initPO.getLastVersion()); + assertEquals(0, initPO.getDeletedAt()); + assertEquals("this is test2", updatePO.getSchemaComment()); + } + private static BaseMetalake createMetalake(Long id, String name, String comment) { AuditInfo auditInfo = AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build(); @@ -205,6 +291,9 @@ private static MetalakePO createMetalakePO(Long id, String name, String comment) .withProperties(JsonUtils.anyFieldMapper().writeValueAsString(properties)) .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(auditInfo)) .withSchemaVersion(JsonUtils.anyFieldMapper().writeValueAsString(SchemaVersion.V_0_1)) + .withCurrentVersion(1L) + .withLastVersion(1L) + .withDeletedAt(0L) .build(); } @@ -241,6 +330,46 @@ private static CatalogPO createCatalogPO(Long id, String name, Long metalakeId, .withCatalogComment(comment) .withProperties(JsonUtils.anyFieldMapper().writeValueAsString(properties)) .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(auditInfo)) + .withCurrentVersion(1L) + .withLastVersion(1L) + .withDeletedAt(0L) + .build(); + } + + private static SchemaEntity createSchema( + Long id, String name, Namespace namespace, String comment) { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build(); + Map properties = new HashMap<>(); + properties.put("key", "value"); + return new SchemaEntity.Builder() + .withId(id) + .withName(name) + .withNamespace(namespace) + .withComment(comment) + .withProperties(properties) + .withAuditInfo(auditInfo) + .build(); + } + + private static SchemaPO createSchemaPO( + Long id, String name, Long metalakeId, Long catalogId, String comment) + throws JsonProcessingException { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build(); + Map properties = new HashMap<>(); + properties.put("key", "value"); + return new SchemaPO.Builder() + .withSchemaId(id) + .withSchemaName(name) + .withMetalakeId(metalakeId) + .withCatalogId(catalogId) + .withSchemaComment(comment) + .withProperties(JsonUtils.anyFieldMapper().writeValueAsString(properties)) + .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(auditInfo)) + .withCurrentVersion(1L) + .withLastVersion(1L) + .withDeletedAt(0L) .build(); } } diff --git a/core/src/test/resources/h2/h2-init.sql b/core/src/test/resources/h2/h2-init.sql index 9f47ee11dfe..4c9f4959531 100644 --- a/core/src/test/resources/h2/h2-init.sql +++ b/core/src/test/resources/h2/h2-init.sql @@ -18,8 +18,7 @@ CREATE TABLE IF NOT EXISTS `metalake_meta` ( ) ENGINE = InnoDB; -CREATE TABLE IF NOT EXISTS `catalog_meta` -( +CREATE TABLE IF NOT EXISTS `catalog_meta` ( `catalog_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id', `catalog_name` VARCHAR(128) NOT NULL COMMENT 'catalog name', `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id', @@ -33,4 +32,21 @@ CREATE TABLE IF NOT EXISTS `catalog_meta` `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'catalog deleted at', PRIMARY KEY (catalog_id), CONSTRAINT uk_mid_cn_del UNIQUE (metalake_id, catalog_name, deleted_at) +) ENGINE=InnoDB; + + +CREATE TABLE IF NOT EXISTS `schema_meta` ( + `schema_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'schema id', + `schema_name` VARCHAR(128) NOT NULL COMMENT 'schema name', + `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id', + `catalog_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id', + `schema_comment` VARCHAR(256) DEFAULT '' COMMENT 'schema comment', + `properties` MEDIUMTEXT DEFAULT NULL COMMENT 'schema properties', + `audit_info` MEDIUMTEXT NOT NULL COMMENT 'schema audit info', + `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'schema current version', + `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'schema last version', + `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'schema deleted at', + PRIMARY KEY (schema_id), + CONSTRAINT uk_cid_sn_del UNIQUE (catalog_id, schema_name, deleted_at), + KEY idx_mid (metalake_id) ) ENGINE=InnoDB; \ No newline at end of file