Skip to content

Commit

Permalink
[#6380] fix(postgres-sql): Fix errors for PG backend about `delete...…
Browse files Browse the repository at this point in the history
…limit..` clause. (#6393)

### What changes were proposed in this pull request?

PostgreSQL does not support SQL sentences like `DELETE FROM xxxx_table
where xxxx limit 10` , Cluase `limit xxx` is not allowed in the `Delete
syntax`

### Why are the changes needed?

it's a bug.

Fix: #6380 

### Does this PR introduce _any_ user-facing change?

N/A

### How was this patch tested?

UT
  • Loading branch information
yuqi1129 authored Feb 6, 2025
1 parent 71998d9 commit d74ce36
Show file tree
Hide file tree
Showing 22 changed files with 226 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public void start() {
garbageCollectorPool.scheduleAtFixedRate(this::collectAndClean, 5, frequency, TimeUnit.MINUTES);
}

private void collectAndClean() {
@VisibleForTesting
public void collectAndClean() {
long threadId = Thread.currentThread().getId();
LOG.info("Thread {} start to collect garbage...", threadId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ public String softDeleteCatalogMetasByMetalakeId(Long metalakeId) {
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
}

@Override
public String deleteCatalogMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ TABLE_NAME
+ " WHERE catalog_id IN (SELECT catalog_id FROM "
+ TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}

@Override
public String insertCatalogMetaOnDuplicateKeyUpdate(CatalogPO catalogPO) {
return "INSERT INTO "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.gravitino.storage.relational.mapper.provider.base.FilesetMetaBaseSQLProvider;
import org.apache.gravitino.storage.relational.po.FilesetPO;
import org.apache.ibatis.annotations.Param;

public class FilesetMetaPostgreSQLProvider extends FilesetMetaBaseSQLProvider {
@Override
Expand Down Expand Up @@ -60,6 +61,16 @@ public String softDeleteFilesetMetasByFilesetId(Long filesetId) {
+ " WHERE fileset_id = #{filesetId} AND deleted_at = 0";
}

@Override
public String deleteFilesetMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ META_TABLE_NAME
+ " WHERE fileset_id IN (SELECT fileset_id FROM "
+ META_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}

@Override
public String insertFilesetMetaOnDuplicateKeyUpdate(FilesetPO filesetPO) {
return "INSERT INTO "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.gravitino.storage.relational.mapper.provider.base.FilesetVersionBaseSQLProvider;
import org.apache.gravitino.storage.relational.po.FilesetVersionPO;
import org.apache.ibatis.annotations.Param;

public class FilesetVersionPostgreSQLProvider extends FilesetVersionBaseSQLProvider {
@Override
Expand Down Expand Up @@ -60,6 +61,16 @@ public String softDeleteFilesetVersionsByFilesetId(Long filesetId) {
+ " WHERE fileset_id = #{filesetId} AND deleted_at = 0";
}

@Override
public String deleteFilesetVersionsByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ VERSION_TABLE_NAME
+ " WHERE id IN (SELECT id FROM "
+ VERSION_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}

@Override
public String softDeleteFilesetVersionsByRetentionLine(
Long filesetId, long versionRetentionLine, int limit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.gravitino.storage.relational.mapper.provider.base.GroupMetaBaseSQLProvider;
import org.apache.gravitino.storage.relational.po.GroupPO;
import org.apache.ibatis.annotations.Param;

public class GroupMetaPostgreSQLProvider extends GroupMetaBaseSQLProvider {
@Override
Expand Down Expand Up @@ -95,4 +96,14 @@ public String listExtendedGroupPOsByMetalakeId(Long metalakeId) {
+ " gt.metalake_id = #{metalakeId}"
+ " GROUP BY gt.group_id";
}

@Override
public String deleteGroupMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ GROUP_TABLE_NAME
+ " WHERE group_id IN (SELECT group_id FROM "
+ GROUP_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.util.List;
import org.apache.gravitino.storage.relational.mapper.provider.base.GroupRoleRelBaseSQLProvider;
import org.apache.ibatis.annotations.Param;

public class GroupRoleRelPostgreSQLProvider extends GroupRoleRelBaseSQLProvider {
@Override
Expand Down Expand Up @@ -70,4 +71,14 @@ public String softDeleteGroupRoleRelByRoleId(Long roleId) {
+ " timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE role_id = #{roleId} AND deleted_at = 0";
}

@Override
public String deleteGroupRoleRelMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ GROUP_ROLE_RELATION_TABLE_NAME
+ " WHERE id IN (SELECT id FROM "
+ GROUP_ROLE_RELATION_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,14 @@ public String updateMetalakeMeta(
+ " AND last_version = #{oldMetalakeMeta.lastVersion}"
+ " AND deleted_at = 0";
}

@Override
public String deleteMetalakeMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ TABLE_NAME
+ " WHERE metalake_id IN (SELECT metalake_id FROM "
+ TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,14 @@ public String softDeleteModelMetasBySchemaId(@Param("schemaId") Long schemaId) {
+ " timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE schema_id = #{schemaId} AND deleted_at = 0";
}

@Override
public String deleteModelMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ ModelMetaMapper.TABLE_NAME
+ " WHERE model_id IN (SELECT model_id FROM "
+ ModelMetaMapper.TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,14 @@ public String softDeleteModelVersionAliasRelsByMetalakeId(@Param("metalakeId") L
+ ModelMetaMapper.TABLE_NAME
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0) AND deleted_at = 0";
}

@Override
public String deleteModelVersionAliasRelsByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ ModelVersionAliasRelMapper.TABLE_NAME
+ " WHERE id IN (SELECT id FROM "
+ ModelVersionAliasRelMapper.TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,14 @@ public String softDeleteModelVersionMetasByMetalakeId(@Param("metalakeId") Long
+ " timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
}

@Override
public String deleteModelVersionMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ ModelVersionMetaMapper.TABLE_NAME
+ " WHERE id IN (SELECT id FROM "
+ ModelMetaMapper.TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
import org.apache.gravitino.storage.relational.mapper.TopicMetaMapper;
import org.apache.gravitino.storage.relational.mapper.provider.base.OwnerMetaBaseSQLProvider;
import org.apache.ibatis.annotations.Param;

public class OwnerMetaPostgreSQLProvider extends OwnerMetaBaseSQLProvider {
@Override
Expand Down Expand Up @@ -117,4 +118,14 @@ public String sotDeleteOwnerRelBySchemaId(Long schemaId) {
+ "ft.fileset_id = ot.metadata_object_id AND ot.metadata_object_type = 'FILESET'"
+ ")";
}

@Override
public String deleteOwnerMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ OWNER_TABLE_NAME
+ " WHERE id IN (SELECT id FROM "
+ OWNER_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.gravitino.storage.relational.mapper.provider.base.RoleMetaBaseSQLProvider;
import org.apache.gravitino.storage.relational.po.RolePO;
import org.apache.ibatis.annotations.Param;

public class RoleMetaPostgreSQLProvider extends RoleMetaBaseSQLProvider {
@Override
Expand Down Expand Up @@ -67,4 +68,14 @@ public String insertRoleMetaOnDuplicateKeyUpdate(RolePO rolePO) {
+ " last_version = #{roleMeta.lastVersion},"
+ " deleted_at = #{roleMeta.deletedAt}";
}

@Override
public String deleteRoleMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ ROLE_TABLE_NAME
+ " WHERE role_id IN (SELECT role_id FROM "
+ ROLE_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.gravitino.storage.relational.mapper.provider.base.SchemaMetaBaseSQLProvider;
import org.apache.gravitino.storage.relational.po.SchemaPO;
import org.apache.ibatis.annotations.Param;

public class SchemaMetaPostgreSQLProvider extends SchemaMetaBaseSQLProvider {
@Override
Expand Down Expand Up @@ -81,4 +82,13 @@ public String softDeleteSchemaMetasByCatalogId(Long catalogId) {
+ " timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
}

public String deleteSchemaMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ TABLE_NAME
+ " WHERE schema_id IN (SELECT schema_id FROM "
+ TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,14 @@ public String softDeleteObjectRelsBySchemaId(@Param("schemaId") Long schemaId) {
+ "ft.fileset_id = sect.metadata_object_id AND sect.type = 'FILESET'"
+ ")";
}

@Override
public String deleteSecurableObjectsByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ SECURABLE_OBJECT_TABLE_NAME
+ " WHERE id IN (SELECT id FROM "
+ ROLE_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,14 @@ public String softDeleteColumnsBySchemaId(@Param("schemaId") Long schemaId) {
+ " timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE schema_id = #{schemaId} AND deleted_at = 0";
}

@Override
public String deleteColumnPOsByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ TableColumnMapper.COLUMN_TABLE_NAME
+ " WHERE id IN (SELECT id FROM "
+ TableColumnMapper.COLUMN_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.gravitino.storage.relational.mapper.provider.base.TableMetaBaseSQLProvider;
import org.apache.gravitino.storage.relational.po.TablePO;
import org.apache.ibatis.annotations.Param;

public class TableMetaPostgreSQLProvider extends TableMetaBaseSQLProvider {
@Override
Expand Down Expand Up @@ -88,4 +89,14 @@ public String softDeleteTableMetasBySchemaId(Long schemaId) {
+ " timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE schema_id = #{schemaId} AND deleted_at = 0";
}

@Override
public String deleteTableMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ TABLE_NAME
+ " WHERE table_id IN (SELECT table_id FROM "
+ TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,14 @@ public String updateTagMeta(
+ " AND last_version = #{oldTagMeta.lastVersion}"
+ " AND deleted_at = 0";
}

@Override
public String deleteTagMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ TAG_TABLE_NAME
+ " WHERE tag_id IN (SELECT tag_id FROM "
+ TAG_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,14 @@ public String listTagMetadataObjectRelsByMetalakeAndTagName(String metalakeName,
+ " WHERE mm.metalake_name = #{metalakeName} AND tm.tag_name = #{tagName}"
+ " AND te.deleted_at = 0 AND tm.deleted_at = 0 AND mm.deleted_at = 0";
}

@Override
public String deleteTagEntityRelsByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ TagMetadataObjectRelMapper.TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+ " WHERE id IN (SELECT id FROM "
+ TagMetadataObjectRelMapper.TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.gravitino.storage.relational.mapper.provider.base.TopicMetaBaseSQLProvider;
import org.apache.gravitino.storage.relational.po.TopicPO;
import org.apache.ibatis.annotations.Param;

public class TopicMetaPostgreSQLProvider extends TopicMetaBaseSQLProvider {

Expand Down Expand Up @@ -93,4 +94,14 @@ public String insertTopicMetaOnDuplicateKeyUpdate(TopicPO topicPO) {
+ " last_version = #{topicMeta.lastVersion},"
+ " deleted_at = #{topicMeta.deletedAt}";
}

@Override
public String deleteTopicMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ TABLE_NAME
+ " WHERE topic_id IN (SELECT topic_id FROM "
+ TABLE_NAME
+ " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.gravitino.storage.relational.mapper.provider.base.UserMetaBaseSQLProvider;
import org.apache.gravitino.storage.relational.po.UserPO;
import org.apache.ibatis.annotations.Param;

public class UserMetaPostgreSQLProvider extends UserMetaBaseSQLProvider {
@Override
Expand Down Expand Up @@ -95,4 +96,14 @@ public String listExtendedUserPOsByMetalakeId(Long metalakeId) {
+ " ut.metalake_id = #{metalakeId}"
+ " GROUP BY ut.user_id";
}

@Override
public String deleteUserMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
return "DELETE FROM "
+ USER_TABLE_NAME
+ " WHERE user_id IN (SELECT user_id FROM "
+ USER_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
}
}
Loading

0 comments on commit d74ce36

Please sign in to comment.