From d80834b4ac06a7cda75907d8fb509195dafbb0b7 Mon Sep 17 00:00:00 2001 From: jack86596 Date: Sat, 9 Oct 2021 17:10:28 +0800 Subject: [PATCH] [CARBONDATA-4300] Clean files command supports specify segment ids --- .../core/constants/CarbonCommonConstants.java | 2 + .../statusmanager/SegmentStatusManager.java | 311 +++++++++--------- .../core/util/DeleteLoadFolders.java | 204 +++++------- docs/clean-files.md | 10 +- .../secondaryindex/TestCleanFilesWithSI.scala | 163 +++++++++ .../carbondata/events/CleanFilesEvents.scala | 1 + .../carbondata/spark/util/CommonUtil.scala | 4 + .../carbondata/trash/DataTrashManager.scala | 8 +- .../management/CarbonCleanFilesCommand.scala | 15 +- .../events/CleanFilesPostEventListener.scala | 11 +- .../org/apache/spark/util/CleanFiles.scala | 1 + .../cleanfiles/TestCleanFileCommand.scala | 5 +- .../TestCleanFilesCommandPartitionTable.scala | 5 +- ...sCommandPartitionTableWithSegmentIds.scala | 149 +++++++++ .../TestCleanFilesCommandWithSegmentIds.scala | 227 +++++++++++++ 15 files changed, 829 insertions(+), 287 deletions(-) create mode 100644 index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCleanFilesWithSI.scala create mode 100644 integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTableWithSegmentIds.scala create mode 100644 integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandWithSegmentIds.scala diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index d72d6c1b9f0..786312fe527 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1996,6 +1996,8 @@ private CarbonCommonConstants() { */ public static final long SEGMENT_LOAD_TIME_DEFAULT = -1; + public static final String SEGMENT_ID_PATTERN = "^\\d+(\\.\\d+)?$"; + /** * default name of data base */ diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index 5735839e980..d338445d097 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -29,6 +29,7 @@ import java.nio.charset.Charset; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -56,6 +57,7 @@ import org.apache.carbondata.core.util.DeleteLoadFolders; import org.apache.carbondata.core.util.path.CarbonTablePath; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR; import static org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET; import com.google.gson.Gson; @@ -1033,123 +1035,170 @@ private static List updateLoadMetadataFromOldToNew( return newListMetadata; } - private static class ReturnTuple { - LoadMetadataDetails[] details; - Set loadsToDelete; - ReturnTuple(LoadMetadataDetails[] details, Set loadsToDelete) { - this.details = details; - this.loadsToDelete = loadsToDelete; - } - } - - private static ReturnTuple 
isUpdateRequired(boolean isForceDeletion, CarbonTable carbonTable, - AbsoluteTableIdentifier absoluteTableIdentifier, LoadMetadataDetails[] details, - boolean cleanStaleInProgress) { - // Delete marked loads - Set loadsToDelete = DeleteLoadFolders - .deleteLoadFoldersFromFileSystem(absoluteTableIdentifier, isForceDeletion, details, - carbonTable.getMetadataPath(), cleanStaleInProgress); - return new ReturnTuple(details, loadsToDelete); - } - - public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable, boolean isForceDeletion, - List partitionSpecs, boolean cleanStaleInprogress, - boolean isCleanFilesOperation) throws IOException { - LoadMetadataDetails[] metadataDetails = - SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); + public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable, + List filterSegmentList, boolean isForceDeletion, List partitionSpecs, + boolean cleanStaleInprogress, boolean checkSegmentExistence) throws IOException { // delete the expired segment lock files CarbonLockUtil.deleteExpiredSegmentLockFiles(carbonTable); - if (isLoadDeletionRequired(metadataDetails)) { - AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); - boolean updateCompletionStatus = false; - Set loadsToDelete = new HashSet<>(); - LoadMetadataDetails[] newAddedLoadHistoryList = null; - ReturnTuple tuple = - isUpdateRequired(isForceDeletion, carbonTable, identifier, metadataDetails, - cleanStaleInprogress); - if (!tuple.loadsToDelete.isEmpty()) { - ICarbonLock carbonTableStatusLock = - CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK); - boolean locked = false; - try { - int retryCount = CarbonLockUtil - .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, - CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); - int maxTimeout = CarbonLockUtil - .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, - CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); - // Update load metadata file after cleaning deleted nodes - locked = carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout); - if (locked) { - LOG.info("Table status lock has been successfully acquired."); - // Again read status and check to verify update required or not. 
- LoadMetadataDetails[] details =
- SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
- ReturnTuple tuple2 =
- isUpdateRequired(isForceDeletion, carbonTable,
- identifier, details, cleanStaleInprogress);
- if (tuple2.loadsToDelete.isEmpty()) {
- return;
+ AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+ ICarbonLock carbonTableStatusLock =
+ CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK);
+ boolean locked = false;
+ LoadMetadataDetails[] segmentToBeDeletePhysically = null;
+ try {
+ int retryCount = CarbonLockUtil
+ .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+ CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+ int maxTimeout = CarbonLockUtil
+ .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+ CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+ locked = carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout);
+ if (locked) {
+ LOG.info("Table status lock has been successfully acquired.");
+ LoadMetadataDetails[] metadataDetails =
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+ int maxSegmentId = SegmentStatusManager.getMaxSegmentId(metadataDetails);
+ if (filterSegmentList != null) {
+ Map<Boolean, List<LoadMetadataDetails>> metadataMap = Arrays.stream(metadataDetails)
+ .collect(Collectors.partitioningBy(
+ detail -> filterSegmentList.contains(detail.getLoadName())));
+ List<LoadMetadataDetails> moveToHistoryDetails = metadataMap.get(true);
+ if (checkSegmentExistence) {
+ if (moveToHistoryDetails.isEmpty() && !filterSegmentList.isEmpty()) {
+ String errorMsg = "Clean files request failed for "
+ + carbonTable.getQualifiedName() + ". Segments "
+ + String.join(",", filterSegmentList)
+ + " do not exist, please retry later or specify other segments.";
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
}
- // read latest table status again.
- LoadMetadataDetails[] latestMetadata =
- SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
-
- int invisibleSegmentPreserveCnt =
- CarbonProperties.getInstance().getInvisibleSegmentPreserveCount();
- int maxSegmentId = SegmentStatusManager.getMaxSegmentId(tuple2.details);
- int invisibleSegmentCnt = SegmentStatusManager.countInvisibleSegments(
- tuple2.details, maxSegmentId);
- // if execute command 'clean files' or the number of invisible segment info
- // exceeds the value of 'carbon.invisible.segments.preserve.count',
- // it need to append the invisible segment list to 'tablestatus.history' file.
- if (isCleanFilesOperation || invisibleSegmentCnt > invisibleSegmentPreserveCnt) {
- TableStatusReturnTuple tableStatusReturn = separateVisibleAndInvisibleSegments(
- tuple2.details, latestMetadata, invisibleSegmentCnt, maxSegmentId);
- LoadMetadataDetails[] oldLoadHistoryList = readLoadHistoryMetadata(
- carbonTable.getMetadataPath());
- LoadMetadataDetails[] newLoadHistoryList = appendLoadHistoryList(
- oldLoadHistoryList, tableStatusReturn.arrayOfLoadHistoryDetails);
- writeLoadDetailsIntoFile(
- CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()),
- tableStatusReturn.arrayOfLoadDetails);
- writeLoadDetailsIntoFile(
- CarbonTablePath.getTableStatusHistoryFilePath(carbonTable.getTablePath()),
- newLoadHistoryList);
- // the segments which will be moved to history file need to be deleted
- newAddedLoadHistoryList = tableStatusReturn.arrayOfLoadHistoryDetails;
- } else {
- // update the metadata details from old to new status. 
- List<LoadMetadataDetails> latestStatus =
- updateLoadMetadataFromOldToNew(tuple2.details, latestMetadata);
- writeLoadDetailsIntoFile(
- CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()),
- latestStatus.toArray(new LoadMetadataDetails[0]));
+ List<String> nonExistentFilterSegments = new ArrayList<>();
+ for (String segmentId : filterSegmentList) {
+ boolean found = false;
+ for (LoadMetadataDetails detail : moveToHistoryDetails) {
+ if (detail.getLoadName().equalsIgnoreCase(segmentId)) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ nonExistentFilterSegments.add(segmentId);
+ }
}
- updateCompletionStatus = true;
- loadsToDelete = tuple2.loadsToDelete;
- } else {
- String dbName = identifier.getCarbonTableIdentifier().getDatabaseName();
- String tableName = identifier.getCarbonTableIdentifier().getTableName();
- String errorMsg = "Clean files request is failed for " +
- dbName + "." + tableName +
- ". Not able to acquire the table status lock due to other operation " +
- "running in the background.";
+ if (!nonExistentFilterSegments.isEmpty()) {
+ String errorMsg = "Clean files request failed for "
+ + carbonTable.getQualifiedName() + ". Segments "
+ + String.join(",", nonExistentFilterSegments)
+ + " do not exist, please retry later or specify other segments.";
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+ }
+ if (moveToHistoryDetails.isEmpty()) {
+ return;
+ }
+ List<String> loadCannotBeDeleted = DeleteLoadFolders.loadsCannotBeDeleted(
+ identifier, moveToHistoryDetails);
+ if (checkSegmentExistence && !loadCannotBeDeleted.isEmpty()) {
+ String errorMsg = "Clean files request failed for " + carbonTable.getQualifiedName()
+ + ". Segments " + String.join(",", loadCannotBeDeleted)
+ + " cannot be deleted, please retry later or specify other segments.";
LOG.error(errorMsg);
- throw new IOException(errorMsg + " Please try after some time.");
+ throw new IOException(errorMsg);
}
- } finally {
- if (locked) {
- CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
+ List<LoadMetadataDetails> retainInStatusDetails = metadataMap.get(false);
+ segmentToBeDeletePhysically =
+ prepareSegmentsToBeDeletedPhysicallyAndSaveMetadataDetails(carbonTable,
+ moveToHistoryDetails, retainInStatusDetails, maxSegmentId);
+ } else {
+ if (isLoadDeletionRequired(metadataDetails)) {
+ boolean isUpdateRequired = DeleteLoadFolders.deleteLoadFoldersFromFileSystem(identifier,
+ isForceDeletion, metadataDetails, cleanStaleInprogress);
+ if (isUpdateRequired) {
+ int invisibleSegmentPreserveCnt =
+ CarbonProperties.getInstance().getInvisibleSegmentPreserveCount();
+ int invisibleSegmentCnt =
+ SegmentStatusManager.countInvisibleSegments(metadataDetails, maxSegmentId);
+ // if force deletion is requested or the number of invisible segment entries
+ // exceeds the value of 'carbon.invisible.segments.preserve.count',
+ // the invisible segment list needs to be appended to the 'tablestatus.history' file.
+ if (isForceDeletion || invisibleSegmentCnt > invisibleSegmentPreserveCnt) {
+ Map<Boolean, List<LoadMetadataDetails>> metadataMap =
+ Arrays.stream(metadataDetails).collect(Collectors.partitioningBy(detail ->
+ detail.getVisibility().equalsIgnoreCase("false")));
+ List<LoadMetadataDetails> moveToHistoryDetails = metadataMap.get(true);
+ List<LoadMetadataDetails> retainInStatusDetails = metadataMap.get(false);
+ segmentToBeDeletePhysically =
+ prepareSegmentsToBeDeletedPhysicallyAndSaveMetadataDetails(carbonTable,
+ moveToHistoryDetails, retainInStatusDetails, maxSegmentId);
+ } else {
+ // even if no history details are newly added, the visibility of a load may change
+ // during the deleteLoadFoldersFromFileSystem call, so the change needs to be saved to disk
+ writeLoadDetailsIntoFile(
+ CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()),
+ metadataDetails);
+ }
+ }
}
+ } else {
+ String errorMsg = "Clean files request failed for " + carbonTable.getQualifiedName()
+ + ". Not able to acquire the table status lock due to other operation "
+ + "running in the background.";
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
}
} finally {
+ if (locked) {
+ CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
+ }
+ if (segmentToBeDeletePhysically != null) {
+ DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(carbonTable,
+ segmentToBeDeletePhysically, partitionSpecs);
+ }
+ }
+ }
+
+ public static LoadMetadataDetails[] prepareSegmentsToBeDeletedPhysicallyAndSaveMetadataDetails(
+ CarbonTable carbonTable, List<LoadMetadataDetails> moveToHistoryDetails,
+ List<LoadMetadataDetails> retainInStatusDetails, int maxSegmentId) throws IOException {
+ LoadMetadataDetails[] segmentToBeDeletedPhysically =
+ new LoadMetadataDetails[moveToHistoryDetails.size()];
+ Iterator<LoadMetadataDetails> iterator = moveToHistoryDetails.iterator();
+ int i = 0;
+ while (iterator.hasNext()) {
+ LoadMetadataDetails detail = iterator.next();
+ // the segment which will be moved to the history file needs to be deleted
+ segmentToBeDeletedPhysically[i++] = detail;
+ // segment 0 and the segment with the max id are special: they should always be kept in
+ // the tablestatus file, so remove them from the to-be-moved details list and add them
+ // back to the retained details list
+ if (detail.getLoadName().equalsIgnoreCase("0")) {
+ iterator.remove();
+ retainInStatusDetails.add(0, detail);
+ } else if (detail.getLoadName().equalsIgnoreCase(String.valueOf(maxSegmentId))) {
+ iterator.remove();
+ int position = -1;
+ for (int j = retainInStatusDetails.size() - 1; j >= 0; j--) {
+ if (detail.getLoadStartTime() > retainInStatusDetails.get(j).getLoadStartTime()) {
+ position = j;
+ break;
}
}
+ retainInStatusDetails.add(position + 1, detail);
}
}
+ writeLoadDetailsIntoFile(
+ CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()),
+ retainInStatusDetails.toArray(new LoadMetadataDetails[0]));
+ if (!moveToHistoryDetails.isEmpty()) {
+ LoadMetadataDetails[] oldLoadHistoryList =
+ readLoadHistoryMetadata(carbonTable.getMetadataPath());
+ LoadMetadataDetails[] newLoadHistoryList = appendLoadHistoryList(
+ oldLoadHistoryList, moveToHistoryDetails.toArray(new LoadMetadataDetails[0]));
+ writeLoadDetailsIntoFile(
+ CarbonTablePath.getTableStatusHistoryFilePath(carbonTable.getTablePath()),
+ newLoadHistoryList);
+ }
+ return segmentToBeDeletedPhysically;
}

public static void truncateTable(CarbonTable carbonTable)
@@ -1226,56 +1275,6 @@ public static int countInvisibleSegments(
return invisibleSegmentCnt;
}
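To make the retention rule in prepareSegmentsToBeDeletedPhysicallyAndSaveMetadataDetails above concrete, here is a minimal, self-contained sketch of just that step. The `Load` class is a hypothetical stand-in for LoadMetadataDetails (only a load name and a load start time), and the physical-deletion capture is omitted; this illustrates the rule under those simplifications, it is not the production code. Segment 0 always goes back to the head of the retained list, and the max-id segment is reinserted in load-start-time order, while both still remain scheduled for physical deletion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RetainSpecialSegmentsSketch {
  // Hypothetical stand-in for LoadMetadataDetails: only a load name and a load start time.
  static class Load {
    final String name;
    final long startTime;
    Load(String name, long startTime) { this.name = name; this.startTime = startTime; }
    @Override public String toString() { return name; }
  }

  public static void main(String[] args) {
    int maxSegmentId = 3;
    // All four segments were selected for cleaning; nothing is retained yet.
    List<Load> moveToHistory = new ArrayList<>(Arrays.asList(
        new Load("0", 100L), new Load("1", 200L), new Load("2", 300L), new Load("3", 400L)));
    List<Load> retainInStatus = new ArrayList<>();

    Iterator<Load> it = moveToHistory.iterator();
    while (it.hasNext()) {
      Load load = it.next();
      if (load.name.equals("0")) {
        // Segment 0 is always kept at the head of the tablestatus entries.
        it.remove();
        retainInStatus.add(0, load);
      } else if (load.name.equals(String.valueOf(maxSegmentId))) {
        // The max-id segment is kept too, reinserted in load-start-time order.
        it.remove();
        int position = -1;
        for (int j = retainInStatus.size() - 1; j >= 0; j--) {
          if (load.startTime > retainInStatus.get(j).startTime) {
            position = j;
            break;
          }
        }
        retainInStatus.add(position + 1, load);
      }
    }
    // Prints: retained in tablestatus: [0, 3], moved to history: [1, 2]
    System.out.println("retained in tablestatus: " + retainInStatus);
    System.out.println("moved to history: " + moveToHistory);
  }
}
```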
- private static class TableStatusReturnTuple {
- LoadMetadataDetails[] arrayOfLoadDetails;
- LoadMetadataDetails[] arrayOfLoadHistoryDetails;
- TableStatusReturnTuple(LoadMetadataDetails[] arrayOfLoadDetails,
- LoadMetadataDetails[] arrayOfLoadHistoryDetails) {
- this.arrayOfLoadDetails = arrayOfLoadDetails;
- this.arrayOfLoadHistoryDetails = arrayOfLoadHistoryDetails;
- }
- }
-
- /**
- * Separate visible and invisible segments into two array.
- */
- public static TableStatusReturnTuple separateVisibleAndInvisibleSegments(
- LoadMetadataDetails[] oldList,
- LoadMetadataDetails[] newList,
- int invisibleSegmentCnt,
- int maxSegmentId) {
- int newSegmentsLength = newList.length;
- int visibleSegmentCnt = newSegmentsLength - invisibleSegmentCnt;
- LoadMetadataDetails[] arrayOfVisibleSegments = new LoadMetadataDetails[visibleSegmentCnt];
- LoadMetadataDetails[] arrayOfInvisibleSegments = new LoadMetadataDetails[invisibleSegmentCnt];
- int oldSegmentsLength = oldList.length;
- int visibleIdx = 0;
- int invisibleIdx = 0;
- for (int i = 0; i < newSegmentsLength; i++) {
- LoadMetadataDetails newSegment = newList[i];
- if (i < oldSegmentsLength) {
- LoadMetadataDetails oldSegment = oldList[i];
- if (newSegment.getLoadName().equalsIgnoreCase("0")
- || newSegment.getLoadName().equalsIgnoreCase(String.valueOf(maxSegmentId))) {
- newSegment.setVisibility(oldSegment.getVisibility());
- arrayOfVisibleSegments[visibleIdx] = newSegment;
- visibleIdx++;
- } else if ("false".equalsIgnoreCase(oldSegment.getVisibility())) {
- newSegment.setVisibility("false");
- arrayOfInvisibleSegments[invisibleIdx] = newSegment;
- invisibleIdx++;
- } else {
- arrayOfVisibleSegments[visibleIdx] = newSegment;
- visibleIdx++;
- }
- } else {
- arrayOfVisibleSegments[visibleIdx] = newSegment;
- visibleIdx++;
- }
- }
- return new TableStatusReturnTuple(arrayOfVisibleSegments, arrayOfInvisibleSegments);
- }
-
/**
* Return an array containing all invisible segment entries in appendList and historyList. 
*/ diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java index 746fbb6e04f..391095dfcaa 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java @@ -19,9 +19,7 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; @@ -38,7 +36,6 @@ import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; -import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -67,44 +64,14 @@ private static String getSegmentPath(AbsoluteTableIdentifier identifier, return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId); } - public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable, - LoadMetadataDetails[] newAddedLoadHistoryList, - boolean isForceDelete, - List specs, - boolean cleanStaleInProgress, - Set loadsToDelete) { - LoadMetadataDetails[] currentDetails = - SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); - physicalFactAndMeasureMetadataDeletion(carbonTable, - currentDetails, - isForceDelete, - specs, - currentDetails, - cleanStaleInProgress, - loadsToDelete); - if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0) { - physicalFactAndMeasureMetadataDeletion(carbonTable, - newAddedLoadHistoryList, - isForceDelete, - specs, - currentDetails, - cleanStaleInProgress, - loadsToDelete); - } - } - /** * Delete the invalid data physically from table. * @param carbonTable table * @param loadDetails Load details which need clean up - * @param isForceDelete Force delete Compacted and MFD segments. it will empty the trash folder * @param specs Partition specs - * @param currLoadDetails Current table status load details which are required for update manager. 
*/ - private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable, - LoadMetadataDetails[] loadDetails, boolean isForceDelete, List specs, - LoadMetadataDetails[] currLoadDetails, boolean cleanStaleInProgress, - Set loadsToDelete) { + public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable, + LoadMetadataDetails[] loadDetails, List specs) { List indexes = new ArrayList<>(); try { for (TableIndex index : IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)) { @@ -118,69 +85,69 @@ private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTab carbonTable.getAbsoluteTableIdentifier().getDatabaseName(), carbonTable.getAbsoluteTableIdentifier().getTableName())); } + LoadMetadataDetails[] currLoadDetails = + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(carbonTable, currLoadDetails); for (final LoadMetadataDetails oneLoad : loadDetails) { - if (loadsToDelete.contains(oneLoad.getLoadName())) { - try { - if (oneLoad.getSegmentFile() != null) { - String tablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath(); - Segment segment = new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()); - // No need to delete physical data for external segments. - if (oneLoad.getPath() == null || oneLoad.getPath().equalsIgnoreCase("NA")) { - SegmentFileStore.deleteSegment(tablePath, segment, specs, updateStatusManager); - } - // delete segment files for all segments. - SegmentFileStore.deleteSegmentFile(tablePath, segment); - } else { - String path = getSegmentPath(carbonTable.getAbsoluteTableIdentifier(), oneLoad); - boolean status = false; - if (FileFactory.isFileExist(path)) { - CarbonFile file = FileFactory.getCarbonFile(path); - CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() { - - @Override - public boolean accept(CarbonFile file) { - return (CarbonTablePath.isCarbonDataFile(file.getName()) || - CarbonTablePath.isCarbonIndexFile(file.getName())); - } - }); + try { + if (oneLoad.getSegmentFile() != null) { + String tablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath(); + Segment segment = new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()); + // No need to delete physical data for external segments. + if (oneLoad.getPath() == null || oneLoad.getPath().equalsIgnoreCase("NA")) { + SegmentFileStore.deleteSegment(tablePath, segment, specs, updateStatusManager); + } + // delete segment files for all segments. + SegmentFileStore.deleteSegmentFile(tablePath, segment); + } else { + String path = getSegmentPath(carbonTable.getAbsoluteTableIdentifier(), oneLoad); + boolean status = false; + if (FileFactory.isFileExist(path)) { + CarbonFile file = FileFactory.getCarbonFile(path); + CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() { + + @Override + public boolean accept(CarbonFile file) { + return (CarbonTablePath.isCarbonDataFile(file.getName()) || + CarbonTablePath.isCarbonIndexFile(file.getName())); + } + }); - //if there are no fact and msr metadata files present then no need to keep - //entry in metadata. - if (filesToBeDeleted.length == 0) { - status = true; - } else { + //if there are no fact and msr metadata files present then no need to keep + //entry in metadata. 
+ if (filesToBeDeleted.length == 0) {
+ status = true;
+ } else {
- for (CarbonFile eachFile : filesToBeDeleted) {
- if (!eachFile.delete()) {
- LOGGER.warn("Unable to delete the file as per delete command " + eachFile
- .getAbsolutePath());
- status = false;
- } else {
- status = true;
- }
- }
- }
- // need to delete the complete folder.
- if (status) {
- if (!file.delete()) {
- LOGGER.warn("Unable to delete the folder as per delete command " + file
+ for (CarbonFile eachFile : filesToBeDeleted) {
+ if (!eachFile.delete()) {
+ LOGGER.warn("Unable to delete the file as per delete command " + eachFile
.getAbsolutePath());
+ status = false;
+ } else {
+ status = true;
}
}
- }
+ // need to delete the complete folder.
+ if (status) {
+ if (!file.delete()) {
+ LOGGER.warn("Unable to delete the folder as per delete command " + file
+ .getAbsolutePath());
+ }
+ }
+ }
- List<Segment> segments = new ArrayList<>(1);
- for (TableIndex index : indexes) {
- segments.clear();
- segments.add(new Segment(oneLoad.getLoadName()));
- index.deleteIndexData(segments);
- }
- } catch (Exception e) {
- LOGGER.warn("Unable to delete the file as per delete command " + oneLoad.getLoadName());
}
+ List<Segment> segments = new ArrayList<>(1);
+ for (TableIndex index : indexes) {
+ segments.clear();
+ segments.add(new Segment(oneLoad.getLoadName()));
+ index.deleteIndexData(segments);
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Unable to delete the file as per delete command " + oneLoad.getLoadName());
}
}
}
@@ -195,6 +162,30 @@ private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
return false;
}
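The contract of loadsCannotBeDeleted, introduced just below, is deliberately all-or-nothing: if any requested segment is not in a deletable state, nothing is cleaned and the whole command fails with the offending ids. Here is a minimal sketch of that contract under assumed simplifications: a plain enum instead of CarbonData's real SegmentStatus, and a trivial deletability rule in which only Marked for Delete and Compacted segments count as deletable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AllOrNothingCleanSketch {
  enum Status { SUCCESS, MARKED_FOR_DELETE, COMPACTED }

  // Simplified stand-in for the real deletability check.
  static boolean canBeDeleted(Status status) {
    return status == Status.MARKED_FOR_DELETE || status == Status.COMPACTED;
  }

  public static void main(String[] args) {
    Map<String, Status> segments = new LinkedHashMap<>();
    segments.put("0", Status.MARKED_FOR_DELETE);
    segments.put("1", Status.SUCCESS);
    segments.put("2", Status.COMPACTED);
    List<String> requested = Arrays.asList("0", "1", "2");

    // Collect every requested segment that fails the deletability check.
    List<String> cannotBeDeleted = new ArrayList<>();
    for (String id : requested) {
      if (!canBeDeleted(segments.get(id))) {
        cannotBeDeleted.add(id);
      }
    }
    if (!cannotBeDeleted.isEmpty()) {
      // Mirrors the patch's behaviour: nothing is cleaned, the command fails listing "1".
      throw new IllegalStateException(
          "Segments " + String.join(",", cannotBeDeleted) + " cannot be deleted");
    }
    System.out.println("all requested segments cleaned");
  }
}
```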
+ /**
+ * Used by clean files when specific segment ids are given: the clean files operation
+ * continues only if all of the specified segments can be deleted; otherwise an exception
+ * is thrown listing the segments which cannot be deleted.
+ * @return segment id list which contains all the segments which cannot be deleted
+ */
+ public static List<String> loadsCannotBeDeleted(
+ AbsoluteTableIdentifier absoluteTableIdentifier, List<LoadMetadataDetails> details) {
+ List<String> loadsCannotBeDeleted = new ArrayList<>();
+ if (details != null && !details.isEmpty()) {
+ for (LoadMetadataDetails oneLoad : details) {
+ if (checkIfLoadCanBeDeleted(oneLoad, true, true, absoluteTableIdentifier)) {
+ oneLoad.setVisibility("false");
+ LOGGER.info("Deleted the load " + oneLoad.getLoadName());
+ } else {
+ loadsCannotBeDeleted.add(oneLoad.getLoadName());
+ LOGGER.info("Segment " + oneLoad.getLoadName() + " cannot be deleted at this moment, its"
+ + " status is " + oneLoad.getSegmentStatus());
+ }
+ }
+ }
+ return loadsCannotBeDeleted;
+ }
+
public static Boolean canDeleteThisLoad(LoadMetadataDetails oneLoad, boolean
isForceDelete, boolean cleanStaleInProgress,
AbsoluteTableIdentifier absoluteTableIdentifier) {
@@ -229,44 +220,21 @@ public static Boolean canDeleteThisLoad(LoadMetadataDetails oneLoad, boolean
}
}
- private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId,
- String metadataPath) {
- LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
- for (LoadMetadataDetails oneLoad : currentDetails) {
- if (oneLoad.getLoadName().equalsIgnoreCase(segmentId)) {
- return oneLoad;
- }
- }
- return null;
- }
-
- public static Set<String> deleteLoadFoldersFromFileSystem(
- AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete, LoadMetadataDetails[]
- details, String metadataPath, boolean cleanStaleInProgress) {
- Set<String> loadsToDelete = new HashSet<>();
+ public static boolean deleteLoadFoldersFromFileSystem(
+ AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
+ LoadMetadataDetails[] details, boolean cleanStaleInProgress) {
+ boolean isDeleted = false;
if (details != null && details.length != 0) {
for (LoadMetadataDetails oneLoad : details) {
if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete, cleanStaleInProgress,
absoluteTableIdentifier)) {
- if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
- || oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
- LoadMetadataDetails currentDetails =
- getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath);
- if (currentDetails != null && checkIfLoadCanBeDeleted(currentDetails,
- isForceDelete, cleanStaleInProgress, absoluteTableIdentifier)) {
- oneLoad.setVisibility("false");
- loadsToDelete.add(oneLoad.getLoadName());
- LOGGER.info("Deleted the load " + oneLoad.getLoadName());
- }
- } else {
- oneLoad.setVisibility("false");
- loadsToDelete.add(oneLoad.getLoadName());
- LOGGER.info("Deleted the load " + oneLoad.getLoadName());
- }
+ oneLoad.setVisibility("false");
+ isDeleted = true;
+ LOGGER.info("Deleted the load " + oneLoad.getLoadName());
}
}
}
- return loadsToDelete;
+ return isDeleted;
}

private static boolean canSegmentLockBeAcquired(LoadMetadataDetails oneLoad,
diff --git a/docs/clean-files.md b/docs/clean-files.md
index e5bde49ac43..dc69d2f6c89 100644
--- a/docs/clean-files.md
+++ b/docs/clean-files.md
@@ -101,4 +101,12 @@ clean files operation, the user can disable that option by using ```statistics =
``` CLEAN FILES FOR TABLE TABLE_NAME options('statistics'='false') ```
- \ No newline at end of file
+
+### SEGMENT_IDS
+Clean files operation can specify the segments to be deleted, instead of deleting all the Marked For Delete and Compacted segments once the number of these segments reaches 
carbon.invisible.segments.preserve.count.
+Users can specify the segments to delete with the ```segment_ids``` option, whose value is a comma-separated list of the segment ids to be deleted. Only Marked for Delete and Compacted segment ids are valid; if invalid ids are given, the operation fails directly.
+If segments are specified, the ```force``` option will be ignored.
+
+ ```
+ CLEAN FILES FOR TABLE TABLE_NAME options('segment_ids'='0,1,2')
+ ```
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCleanFilesWithSI.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCleanFilesWithSI.scala
new file mode 100644
index 00000000000..049e95827a0
--- /dev/null
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCleanFilesWithSI.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import java.io.File
+
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+
+/**
+ * test cases for testing clean files command on table with SI
+ */
+class TestCleanFilesWithSI extends QueryTest with BeforeAndAfterEach {
+
+ var mainTable : CarbonTable = _
+ var mainTablePath : String = _
+ var indexTable : CarbonTable = _
+ var indexTablePath : String = _
+
+ override protected def beforeEach(): Unit = {
+ sql("drop table if exists test.maintable")
+ sql("drop database if exists test cascade")
+ sql("create database test")
+ sql("CREATE TABLE test.maintable(a INT, b STRING, c STRING) stored as carbondata")
+ sql("CREATE INDEX indextable1 on table test.maintable(c) as 'carbondata'")
+ sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+ sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+ sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+ sql("DELETE FROM TABLE test.maintable WHERE SEGMENT.ID IN(0,1,2)")
+
+ mainTable =
+ CarbonEnv.getCarbonTable(Option("test"), "maintable")(sqlContext.sparkSession)
+ mainTablePath =
+ CarbonTablePath.getPartitionDir(mainTable.getTablePath)
+ indexTable =
+ CarbonEnv.getCarbonTable(Option("test"), "indextable1")(sqlContext.sparkSession)
+ indexTablePath =
+ CarbonTablePath.getPartitionDir(indexTable.getTablePath)
+ }
+
+ test("Test clean files on table with secondary index and si missing segment 0") {
+ // simulate main table and SI table segment inconsistent scenario
+ sql("CLEAN FILES FOR TABLE 
test.indextable1 options('segment_ids'='0')") + sql("CLEAN FILES FOR TABLE test.maintable options('segment_ids'='0,1')") + + var mainTableSegmentCount = sql("SHOW SEGMENTS FOR TABLE test.maintable").count() + // main table segment 2 + assert(mainTableSegmentCount == 1) + var siTableSegmentCount = sql("SHOW SEGMENTS FOR TABLE test.indextable1").count() + // si table segment 2 + assert(siTableSegmentCount == 1) + + var segmentFolders = new File(mainTablePath).listFiles() + assert(segmentFolders.length == 1) + segmentFolders = new File(indexTablePath).listFiles() + assert(segmentFolders.length == 1) + + sql("CLEAN FILES FOR TABLE test.maintable options('segment_ids'='2')") + mainTableSegmentCount = sql("SHOW SEGMENTS FOR TABLE test.maintable").count() + // main table has no segment left + assert(mainTableSegmentCount == 0) + siTableSegmentCount = sql("SHOW SEGMENTS FOR TABLE test.indextable1").count() + // si table has no segment left + assert(siTableSegmentCount == 0) + + segmentFolders = new File(mainTablePath).listFiles() + assert(segmentFolders.length == 0) + segmentFolders = new File(indexTablePath).listFiles() + assert(segmentFolders.length == 0) + } + + test("Test clean files on table with secondary index and si missing segment 1") { + // simulate main table and SI table segment inconsistent scenario + sql("CLEAN FILES FOR TABLE test.indextable1 options('segment_ids'='1')") + sql("CLEAN FILES FOR TABLE test.maintable options('segment_ids'='0,1')") + + var mainTableSegmentCount = sql("SHOW SEGMENTS FOR TABLE test.maintable").count() + // main table segment 2 + assert(mainTableSegmentCount == 1) + var siTableSegmentCount = sql("SHOW SEGMENTS FOR TABLE test.indextable1").count() + // si table segment 2 + assert(siTableSegmentCount == 1) + + var segmentFolders = new File(mainTablePath).listFiles() + assert(segmentFolders.length == 1) + segmentFolders = new File(indexTablePath).listFiles() + assert(segmentFolders.length == 1) + + sql("CLEAN FILES FOR TABLE test.maintable options('segment_ids'='2')") + mainTableSegmentCount = sql("SHOW SEGMENTS FOR TABLE test.maintable").count() + // main table has no segment left + assert(mainTableSegmentCount == 0) + siTableSegmentCount = sql("SHOW SEGMENTS FOR TABLE test.indextable1").count() + // si table has no segment left + assert(siTableSegmentCount == 0) + + segmentFolders = new File(mainTablePath).listFiles() + assert(segmentFolders.length == 0) + segmentFolders = new File(indexTablePath).listFiles() + assert(segmentFolders.length == 0) + } + + test("Test clean files on table with secondary index" + + " and si metadata and physical files is not consistent") { + // simulate SI table metadata and physical files not consistent, remove segment 1 from si + // tablestatus + val siLoadDetails = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath) + val modifiedLoadDetails = new Array[LoadMetadataDetails](siLoadDetails.length - 1) + modifiedLoadDetails(0) = siLoadDetails(0) + modifiedLoadDetails(1) = siLoadDetails(2) + SegmentStatusManager.writeLoadDetailsIntoFile( + CarbonTablePath.getTableStatusFilePath(indexTable.getTablePath), modifiedLoadDetails) + + sql("CLEAN FILES FOR TABLE test.maintable options('segment_ids'='0,1')") + var mainTableSegmentCount = sql("SHOW SEGMENTS FOR TABLE test.maintable").count() + // main table segment 2 + assert(mainTableSegmentCount == 1) + var siTableSegmentCount = sql("SHOW SEGMENTS FOR TABLE test.indextable1").count() + // si table segment 2 + assert(siTableSegmentCount == 1) + + var segmentFolders = new 
File(mainTablePath).listFiles() + assert(segmentFolders.length == 1) + segmentFolders = new File(indexTablePath).listFiles() + // si table Segment_1 folder is not cleaned because segment 1 is not presented in tablestatus + assert(segmentFolders.length == 2) + + sql("CLEAN FILES FOR TABLE test.maintable options('segment_ids'='2')") + mainTableSegmentCount = sql("SHOW SEGMENTS FOR TABLE test.maintable").count() + // main table has no segment left + assert(mainTableSegmentCount == 0) + siTableSegmentCount = sql("SHOW SEGMENTS FOR TABLE test.indextable1").count() + // si table has no segment left + assert(siTableSegmentCount == 0) + + segmentFolders = new File(mainTablePath).listFiles() + assert(segmentFolders.length == 0) + segmentFolders = new File(indexTablePath).listFiles() + // si table Segment_1 folder is not cleaned because segment 1 is not presented in tablestatus + assert(segmentFolders.length == 1) + assert(segmentFolders.head.getName.equalsIgnoreCase("Segment_1")) + } +} diff --git a/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala b/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala index 44add8b59d6..b951ee8638c 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala @@ -25,6 +25,7 @@ case class CleanFilesPreEvent(carbonTable: CarbonTable, sparkSession: SparkSessi case class CleanFilesPostEvent( carbonTable: CarbonTable, + filterSegmentList: java.util.List[String], sparkSession: SparkSession, options: Map[String, String]) extends Event with CleanFilesEventInfo diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 19056da6358..62a34ac3a33 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -1067,4 +1067,8 @@ object CommonUtil { } } } + + def isSegmentId(str: String): Boolean = { + CarbonCommonConstants.SEGMENT_ID_PATTERN.r.pattern.matcher(str).matches(); + } } diff --git a/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala b/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala index a41a323bf91..5554517efde 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala @@ -49,6 +49,7 @@ object DataTrashManager { */ def cleanGarbageData( carbonTable: CarbonTable, + filterSegmentList: java.util.List[String], isForceDelete: Boolean, cleanStaleInProgress: Boolean, showStatistics: Boolean, @@ -88,13 +89,13 @@ object DataTrashManager { if (showStatistics) { val metadataDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) val sizeBeforeCleaning = getPreOpSizeSnapshot(carbonTable, metadataDetails) - checkAndCleanExpiredSegments(carbonTable, isForceDelete, + checkAndCleanExpiredSegments(carbonTable, filterSegmentList, isForceDelete, cleanStaleInProgress, partitionSpecs) val sizeAfterCleaning = getPostOpSizeSnapshot(carbonTable, metadataDetails .map(a => a.getLoadName).toSet) (sizeBeforeCleaning - sizeAfterCleaning + trashFolderSizeStats._1).abs } else { - checkAndCleanExpiredSegments(carbonTable, isForceDelete, + 
checkAndCleanExpiredSegments(carbonTable, filterSegmentList, isForceDelete, cleanStaleInProgress, partitionSpecs) 0 } @@ -192,11 +193,12 @@ object DataTrashManager { private def checkAndCleanExpiredSegments( carbonTable: CarbonTable, + filterSegmentList: java.util.List[String], isForceDelete: Boolean, cleanStaleInProgress: Boolean, partitionSpecsOption: Option[Seq[PartitionSpec]]): Unit = { val partitionSpecs = partitionSpecsOption.map(_.asJava).orNull - SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, + SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, filterSegmentList, isForceDelete, partitionSpecs, cleanStaleInProgress, true) if (carbonTable.isHivePartitionTable && partitionSpecsOption.isDefined) { SegmentFileStore.cleanSegments(carbonTable, partitionSpecs, isForceDelete) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala index 6461604e350..d22f43432d1 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala @@ -30,6 +30,7 @@ import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.ByteUtil import org.apache.carbondata.events._ +import org.apache.carbondata.spark.util.CommonUtil import org.apache.carbondata.trash.DataTrashManager /** @@ -91,11 +92,23 @@ case class CarbonCleanFilesCommand( Seq(Row(ByteUtil.convertByteToReadable(result._1), ByteUtil .convertByteToReadable(result._2))) } else { + val segment_ids = options.get("segment_ids"); + import collection.JavaConverters._ + val segmentList: java.util.List[String] = segment_ids match { + case Some(str) if str.nonEmpty => str.split(",").map(_.trim).toList.asJava + case Some(str) if str.isEmpty => List.empty.asJava + case _ => null + } + if (segmentList != null && !segmentList.isEmpty && + segmentList.asScala.exists(!CommonUtil.isSegmentId(_))) { + throw new MalformedCarbonCommandException("Invalid segment id in clean files command.") + } val preEvent = CleanFilesPreEvent(carbonTable, sparkSession) - val postEvent = CleanFilesPostEvent(carbonTable, sparkSession, options) + val postEvent = CleanFilesPostEvent(carbonTable, segmentList, sparkSession, options) withEvents(preEvent, postEvent) { sizeCleaned = DataTrashManager.cleanGarbageData( carbonTable, + segmentList, options.getOrElse("force", "false").toBoolean, options.getOrElse("stale_inprogress", "false").toBoolean, showStats, diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala index 2c946090872..008eadf8456 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala @@ -54,6 +54,7 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging { cleanFilesForIndex( cleanFilesPostEvent.sparkSession, cleanFilesPostEvent.carbonTable, + cleanFilesPostEvent.filterSegmentList, 
cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean, cleanFilesPostEvent.options.getOrElse("stale_inprogress", "false").toBoolean) @@ -67,6 +68,7 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging { private def cleanFilesForIndex( sparkSession: SparkSession, carbonTable: CarbonTable, + filterSegmentList: java.util.List[String], isForceDelete: Boolean, cleanStaleInProgress: Boolean): Unit = { val indexTables = CarbonIndexUtil @@ -76,10 +78,11 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging { Seq.empty[Expression], sparkSession, indexTable) - SegmentStatusManager.deleteLoadsAndUpdateMetadata( - indexTable, isForceDelete, partitions.map(_.asJava).orNull, cleanStaleInProgress, - true) - cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable) + SegmentStatusManager.deleteLoadsAndUpdateMetadata(indexTable, filterSegmentList, + isForceDelete, partitions.map(_.asJava).orNull, cleanStaleInProgress, false) + if (filterSegmentList == null) { + cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable) + } } } diff --git a/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala index 40bd805bc4b..e92e9129b48 100644 --- a/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala +++ b/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala @@ -38,6 +38,7 @@ object CleanFiles { val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark) DataTrashManager.cleanGarbageData( carbonTable, + null, isForceDeletion, cleanStaleInProgress, false, diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala index b36872c3447..eecf9166a4d 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala @@ -73,7 +73,7 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll { val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession) .getTablePath val trashFolderPath = CarbonTablePath.getTrashFolderPath(path) - editTableStatusFile(path) + assert(editTableStatusFile(path)) assert(!FileFactory.isFileExist(trashFolderPath)) val segmentNumber1 = sql(s"""show segments for table cleantest""").count() @@ -531,7 +531,7 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll { } - def editTableStatusFile(carbonTablePath: String) : Unit = { + def editTableStatusFile(carbonTablePath: String) : Boolean = { // original table status file val f1 = new File(CarbonTablePath.getTableStatusFilePath(carbonTablePath)) val f2 = new File(CarbonTablePath.getMetadataPath(carbonTablePath) + CarbonCommonConstants @@ -547,6 +547,7 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll { // scalastyle:on println bufferedSource.close() w.close() + f1.delete(); f2.renameTo(f1) } diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala index 7f8ef03c264..724390c2106 100644 --- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala @@ -41,7 +41,7 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession) .getTablePath val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR - editTableStatusFile(path) + assert(editTableStatusFile(path)) assert(!FileFactory.isFileExist(trashFolderPath)) val segmentNumber1 = sql(s"""show segments for table cleantest""").count() assert(segmentNumber1 == 4) @@ -291,7 +291,7 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA sql("""DROP TABLE IF EXISTS CLEANTEST""") } - def editTableStatusFile(carbonTablePath: String) : Unit = { + def editTableStatusFile(carbonTablePath: String) : Boolean = { // Original Table status file val f1 = new File(CarbonTablePath.getTableStatusFilePath(carbonTablePath)) // duplicate @@ -308,6 +308,7 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA // scalastyle:on println bufferedSource.close w.close() + f1.delete(); f2.renameTo(f1) } diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTableWithSegmentIds.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTableWithSegmentIds.scala new file mode 100644 index 00000000000..331fc992f2e --- /dev/null +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTableWithSegmentIds.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.testsuite.cleanfiles + +import java.io.File + +import org.apache.spark.sql.CarbonEnv +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterEach + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil + +class TestCleanFilesCommandPartitionTableWithSegmentIds extends QueryTest with BeforeAndAfterEach { + + override protected def beforeEach(): Unit = { + createPartitionTable() + loadData() + } + + test("Test clean files after delete command") { + sql(s"""Delete from table cleantest where segment.id in(0, 1, 2, 3)""") + var showSegments = sql("show segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.length == 4) + assert(showSegments.count(_._2 == "Marked for Delete") == 4) + sql("CLEAN FILES FOR TABLE cleantest OPTIONS('segment_ids'='0, 1,\t2,3')") + showSegments = sql("show segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.isEmpty) + showSegments = sql("show history segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.length == 4) + assert(showSegments.count(_._2 == "Marked for Delete") == 4) + + // check all segments data are deleted successfully + val table = CarbonEnv.getCarbonTable(None, "cleantest")(sqlContext.sparkSession) + val tableFolder = new File(table.getTablePath).listFiles() + assert(!tableFolder.exists(f => + f.getName.contains("add=abc") || f.getName.contains("add=adc"))) + } + + test("Test clean files after compaction command with segment_ids option") { + sql(s"""alter table cleantest compact 'custom' where segment.id in(0, 1, 2, 3)""") + var showSegments = sql("show segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.length == 5) + assert(showSegments.count(_._2 == "Compacted") == 4) + assert(showSegments.count(_._2 == "Success") == 1) + sql("CLEAN FILES FOR TABLE cleantest OPTIONS('segment_ids'='0, 1,\t2,3')") + showSegments = sql("show segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.length == 1) + + // check all segments data before compaction are deleted successfully + val table = CarbonEnv.getCarbonTable(None, "cleantest")(sqlContext.sparkSession) + val abcFiles = new File( + table.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + "add=abc").listFiles() + val adcFiles = new File( + table.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + "add=adc").listFiles() + for (file <- abcFiles) { + if (file.getPath.endsWith("carbondata")) { + assert(DataFileUtil.getSegmentNo(file.getName).equals("0.1")) + } + } + for (file <- adcFiles) { + if (file.getPath.endsWith("carbondata")) { + assert(DataFileUtil.getSegmentNo(file.getName).equals("0.1")) + } + } + } + + test("Test clean files after compaction command without segment_ids option") { + sql(s"""alter table cleantest compact 'custom' where segment.id in(0, 1, 2, 3)""") + var showSegments = sql("show segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.length == 5) + assert(showSegments.count(_._2 == "Compacted") == 4) + assert(showSegments.count(_._2 == "Success") == 1) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true") + sql("CLEAN FILES FOR TABLE 
cleantest OPTIONS('force'='true')") + CarbonProperties.getInstance() + .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED) + showSegments = sql("show segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.length == 1) + + // check all segments data before compaction are deleted successfully + val table = CarbonEnv.getCarbonTable(None, "cleantest")(sqlContext.sparkSession) + val abcFiles = new File( + table.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + "add=abc").listFiles() + val adcFiles = new File( + table.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + "add=adc").listFiles() + for (file <- abcFiles) { + if (file.getPath.endsWith("carbondata")) { + assert(DataFileUtil.getSegmentNo(file.getName).equals("0.1")) + } + } + for (file <- adcFiles) { + if (file.getPath.endsWith("carbondata")) { + assert(DataFileUtil.getSegmentNo(file.getName).equals("0.1")) + } + } + } + + override protected def afterEach(): Unit = { + sql("drop table if exists cleantest") + } + + def createPartitionTable() : Unit = { + sql("""DROP TABLE IF EXISTS CLEANTEST""") + sql( + """ + | CREATE TABLE CLEANTEST (id Int, id1 INT, name STRING ) PARTITIONED BY (add String) + | STORED AS carbondata + """.stripMargin) + } + + def loadData() : Unit = { + sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"bob","abc"""") + sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"jack","abc"""") + sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"johnny","adc"""") + sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"Reddit","adc"""") + } +} diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandWithSegmentIds.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandWithSegmentIds.scala new file mode 100644 index 00000000000..c6a02847b24 --- /dev/null +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandWithSegmentIds.scala @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.testsuite.cleanfiles + +import java.io.{File, IOException} + +import org.apache.spark.sql.CarbonEnv +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterEach + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.spark.util.CommonUtil + +class TestCleanFilesCommandWithSegmentIds extends QueryTest with BeforeAndAfterEach { + override protected def beforeEach(): Unit = { + sql( + """ + | CREATE TABLE if not exists cleantest (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int, empno int) + | STORED AS carbondata + """.stripMargin) + loadData() + } + + test("Test clean files after delete command") { + sql(s"""Delete from table cleantest where segment.id in(0, 1, 2, 3)""") + var showSegments = sql("show segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.length == 4) + assert(showSegments.count(_._2 == "Marked for Delete") == 4) + sql("CLEAN FILES FOR TABLE cleantest OPTIONS('segment_ids'='0, 1,\t2,3')") + showSegments = sql("show segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.isEmpty) + showSegments = sql("show history segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.length == 4) + assert(showSegments.count(_._2 == "Marked for Delete") == 4) + + // check all segments data are deleted successfully + val table = CarbonEnv.getCarbonTable(None, "cleantest")(sqlContext.sparkSession) + val path = CarbonTablePath.getPartitionDir(table.getTablePath) + val segmentFolders = new File(path).listFiles() + assert(segmentFolders.isEmpty) + } + + test("Test clean files after delete command with empty segment_ids option") { + sql(s"""Delete from table cleantest where segment.id in(0, 1, 2, 3)""") + var showSegments = sql("show segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.length == 4) + assert(showSegments.count(_._2 == "Marked for Delete") == 4) + sql("CLEAN FILES FOR TABLE cleantest OPTIONS('segment_ids'='')") + showSegments = sql("show segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.length == 4) + assert(showSegments.count(_._2 == "Marked for Delete") == 4) + + // check no segment data are deleted + val table = CarbonEnv.getCarbonTable(None, "cleantest")(sqlContext.sparkSession) + val path = CarbonTablePath.getPartitionDir(table.getTablePath) + val segmentFolders = new File(path).listFiles() + assert(segmentFolders.length == 4) + } + + test("Test clean files after compaction command with segment_ids option") { + sql(s"""alter table cleantest compact 'custom' where segment.id in(0, 1, 2, 3)""") + var showSegments = sql("show segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.length == 5) + assert(showSegments.count(_._2 == "Compacted") == 4) + assert(showSegments.count(_._2 == "Success") == 1) + sql("CLEAN FILES FOR TABLE cleantest OPTIONS('segment_ids'='0, 1,\t2,3')") + showSegments = 
sql("show segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.length == 1) + + // check all segments data are deleted successfully + val table = CarbonEnv.getCarbonTable(None, "cleantest")(sqlContext.sparkSession) + val path = CarbonTablePath.getPartitionDir(table.getTablePath) + val segmentFolders = new File(path).listFiles() + assert(segmentFolders.length == 1) + assert(segmentFolders.head.getName.contains("Segment_0.1")) + } + + test("Test clean files after compaction command with segment_ids option with 1 segment") { + sql(s"""alter table cleantest compact 'custom' where segment.id in(0, 1, 2, 3)""") + var showSegments = sql("show segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.length == 5) + assert(showSegments.count(_._2 == "Compacted") == 4) + assert(showSegments.count(_._2 == "Success") == 1) + sql("CLEAN FILES FOR TABLE cleantest OPTIONS('segment_ids'='0')") + showSegments = sql("show segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.length == 4) + + // check all segments data are deleted successfully + val table = CarbonEnv.getCarbonTable(None, "cleantest")(sqlContext.sparkSession) + val path = CarbonTablePath.getPartitionDir(table.getTablePath) + val segmentFolders = new File(path).listFiles() + assert(segmentFolders.length == 4) + } + + test("Test clean files after compaction command without segment_ids option") { + sql(s"""alter table cleantest compact 'custom' where segment.id in(0, 1, 2, 3)""") + var showSegments = sql("show segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.length == 5) + assert(showSegments.count(_._2 == "Compacted") == 4) + assert(showSegments.count(_._2 == "Success") == 1) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true") + sql("CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')") + CarbonProperties.getInstance() + .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED) + showSegments = sql("show segments for table cleantest").collect().map { + a => (a.get(0), a.get(1)) + } + assert(showSegments.length == 1) + + // check all segments data are deleted successfully + val table = CarbonEnv.getCarbonTable(None, "cleantest")(sqlContext.sparkSession) + val path = CarbonTablePath.getPartitionDir(table.getTablePath) + val segmentFolders = new File(path).listFiles() + assert(segmentFolders.length == 1) + assert(segmentFolders.head.getName.contains("Segment_0.1")) + } + + test("Test clean files on Success segment") { + // clean files on all segments + val ex = intercept[IOException] { + sql("CLEAN FILES FOR TABLE cleantest OPTIONS('segment_ids'='0, 1, 2, 3')") + } + assert(ex.getMessage.contains( + "Clean files request is failed for default.cleantest. Segment 0,1,2,3")) + + // check no segment data are deleted + val table = CarbonEnv.getCarbonTable(None, "cleantest")(sqlContext.sparkSession) + val path = CarbonTablePath.getPartitionDir(table.getTablePath) + val segmentFolders = new File(path).listFiles() + assert(segmentFolders.length == 4) + } + + test("Test clean files on non exists segment") { + // clean files on non exists segments + val ex = intercept[IOException] { + sql("CLEAN FILES FOR TABLE cleantest OPTIONS('segment_ids'='4')") + } + assert(ex.getMessage.contains( + "Clean files request is failed for default.cleantest. 
+      "Clean files request is failed for default.cleantest. Segment 4 are not exists"))
+    val showSegments = sql("show segments for table cleantest").collect().map {
+      a => (a.get(0), a.get(1))
+    }
+    assert(showSegments.length == 4)
+    assert(showSegments.count(_._2 == "Success") == 4)
+
+    // check that no segment data is deleted
+    val table = CarbonEnv.getCarbonTable(None, "cleantest")(sqlContext.sparkSession)
+    val path = CarbonTablePath.getPartitionDir(table.getTablePath)
+    val segmentFolders = new File(path).listFiles()
+    assert(segmentFolders.length == 4)
+  }
+
+  test("Test clean files with incorrect segment_ids option") {
+    val ex = intercept[MalformedCarbonCommandException] {
+      sql("CLEAN FILES FOR TABLE cleantest OPTIONS('segment_ids'='s, d, e, f')")
+    }
+    assert(ex.getMessage.contains("Invalid segment id in clean files command."))
+  }
+
+  test("Test CommonUtil.isSegmentId") {
+    val segmentIds = List("0", "1", "2", "0.1", "1.1")
+    val nonSegmentIds = List("1.", "1..", ".0", "1..1", "a", "1.a")
+    assert(segmentIds.count(CommonUtil.isSegmentId) == 5)
+    assert(!nonSegmentIds.exists(CommonUtil.isSegmentId))
+  }
+
+  override protected def afterEach(): Unit = {
+    sql("drop table if exists cleantest")
+  }
+
+  def loadData(): Unit = {
+    // load the same CSV four times to create segments 0 to 3
+    (1 to 4).foreach { _ =>
+      sql(
+        s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE cleantest OPTIONS
+           |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    }
+  }
+}
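
For reviewers trying the feature locally, a minimal end-to-end sketch of the new option, assuming a hypothetical table `t1` with at least one loaded segment. As the tests above exercise, a segment must first be Marked for Delete (or Compacted) before it can be cleaned by id, and an id must be numeric with an optional compaction suffix (CommonUtil.isSegmentId accepts "0" and "0.1" but rejects "a" and "1."):

    // mark segment 0 for delete, then physically remove it by id
    sql("DELETE FROM TABLE t1 WHERE SEGMENT.ID IN (0)")
    sql("CLEAN FILES FOR TABLE t1 OPTIONS('segment_ids'='0')")

Cleaning a Success segment or a non-existent id fails with an IOException, and a non-numeric id fails with MalformedCarbonCommandException, as asserted in the tests above.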