Skip to content

Commit

Permalink
[CARBONDATA-4300] Clean files command supports specify segment ids
Browse files Browse the repository at this point in the history
  • Loading branch information
jack86596 committed Oct 9, 2021
1 parent 8b3d78b commit d80834b
Show file tree
Hide file tree
Showing 15 changed files with 829 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1996,6 +1996,8 @@ private CarbonCommonConstants() {
*/
public static final long SEGMENT_LOAD_TIME_DEFAULT = -1;

public static final String SEGMENT_ID_PATTERN = "^\\d+(\\.\\d+)?$";

/**
* default name of data base
*/
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
Expand All @@ -38,7 +36,6 @@
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;
Expand Down Expand Up @@ -67,44 +64,14 @@ private static String getSegmentPath(AbsoluteTableIdentifier identifier,
return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
}

public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
LoadMetadataDetails[] newAddedLoadHistoryList,
boolean isForceDelete,
List<PartitionSpec> specs,
boolean cleanStaleInProgress,
Set<String> loadsToDelete) {
LoadMetadataDetails[] currentDetails =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
physicalFactAndMeasureMetadataDeletion(carbonTable,
currentDetails,
isForceDelete,
specs,
currentDetails,
cleanStaleInProgress,
loadsToDelete);
if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0) {
physicalFactAndMeasureMetadataDeletion(carbonTable,
newAddedLoadHistoryList,
isForceDelete,
specs,
currentDetails,
cleanStaleInProgress,
loadsToDelete);
}
}

/**
* Delete the invalid data physically from table.
* @param carbonTable table
* @param loadDetails Load details which need clean up
* @param isForceDelete Force delete Compacted and MFD segments. it will empty the trash folder
* @param specs Partition specs
* @param currLoadDetails Current table status load details which are required for update manager.
*/
private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
LoadMetadataDetails[] loadDetails, boolean isForceDelete, List<PartitionSpec> specs,
LoadMetadataDetails[] currLoadDetails, boolean cleanStaleInProgress,
Set<String> loadsToDelete) {
public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
LoadMetadataDetails[] loadDetails, List<PartitionSpec> specs) {
List<TableIndex> indexes = new ArrayList<>();
try {
for (TableIndex index : IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)) {
Expand All @@ -118,69 +85,69 @@ private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTab
carbonTable.getAbsoluteTableIdentifier().getDatabaseName(),
carbonTable.getAbsoluteTableIdentifier().getTableName()));
}
LoadMetadataDetails[] currLoadDetails =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
SegmentUpdateStatusManager updateStatusManager =
new SegmentUpdateStatusManager(carbonTable, currLoadDetails);
for (final LoadMetadataDetails oneLoad : loadDetails) {
if (loadsToDelete.contains(oneLoad.getLoadName())) {
try {
if (oneLoad.getSegmentFile() != null) {
String tablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
Segment segment = new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile());
// No need to delete physical data for external segments.
if (oneLoad.getPath() == null || oneLoad.getPath().equalsIgnoreCase("NA")) {
SegmentFileStore.deleteSegment(tablePath, segment, specs, updateStatusManager);
}
// delete segment files for all segments.
SegmentFileStore.deleteSegmentFile(tablePath, segment);
} else {
String path = getSegmentPath(carbonTable.getAbsoluteTableIdentifier(), oneLoad);
boolean status = false;
if (FileFactory.isFileExist(path)) {
CarbonFile file = FileFactory.getCarbonFile(path);
CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {

@Override
public boolean accept(CarbonFile file) {
return (CarbonTablePath.isCarbonDataFile(file.getName()) ||
CarbonTablePath.isCarbonIndexFile(file.getName()));
}
});
try {
if (oneLoad.getSegmentFile() != null) {
String tablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
Segment segment = new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile());
// No need to delete physical data for external segments.
if (oneLoad.getPath() == null || oneLoad.getPath().equalsIgnoreCase("NA")) {
SegmentFileStore.deleteSegment(tablePath, segment, specs, updateStatusManager);
}
// delete segment files for all segments.
SegmentFileStore.deleteSegmentFile(tablePath, segment);
} else {
String path = getSegmentPath(carbonTable.getAbsoluteTableIdentifier(), oneLoad);
boolean status = false;
if (FileFactory.isFileExist(path)) {
CarbonFile file = FileFactory.getCarbonFile(path);
CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {

@Override
public boolean accept(CarbonFile file) {
return (CarbonTablePath.isCarbonDataFile(file.getName()) ||
CarbonTablePath.isCarbonIndexFile(file.getName()));
}
});

//if there are no fact and msr metadata files present then no need to keep
//entry in metadata.
if (filesToBeDeleted.length == 0) {
status = true;
} else {
//if there are no fact and msr metadata files present then no need to keep
//entry in metadata.
if (filesToBeDeleted.length == 0) {
status = true;
} else {

for (CarbonFile eachFile : filesToBeDeleted) {
if (!eachFile.delete()) {
LOGGER.warn("Unable to delete the file as per delete command " + eachFile
.getAbsolutePath());
status = false;
} else {
status = true;
}
}
}
// need to delete the complete folder.
if (status) {
if (!file.delete()) {
LOGGER.warn("Unable to delete the folder as per delete command " + file
for (CarbonFile eachFile : filesToBeDeleted) {
if (!eachFile.delete()) {
LOGGER.warn("Unable to delete the file as per delete command " + eachFile
.getAbsolutePath());
status = false;
} else {
status = true;
}
}

}
// need to delete the complete folder.
if (status) {
if (!file.delete()) {
LOGGER.warn("Unable to delete the folder as per delete command " + file
.getAbsolutePath());
}
}

}
List<Segment> segments = new ArrayList<>(1);
for (TableIndex index : indexes) {
segments.clear();
segments.add(new Segment(oneLoad.getLoadName()));
index.deleteIndexData(segments);
}
} catch (Exception e) {
LOGGER.warn("Unable to delete the file as per delete command " + oneLoad.getLoadName());
}
List<Segment> segments = new ArrayList<>(1);
for (TableIndex index : indexes) {
segments.clear();
segments.add(new Segment(oneLoad.getLoadName()));
index.deleteIndexData(segments);
}
} catch (Exception e) {
LOGGER.warn("Unable to delete the file as per delete command " + oneLoad.getLoadName());
}
}
}
Expand All @@ -195,6 +162,30 @@ private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
return false;
}

/**
* Used for clean files with specific segment ids, when segment ids are specified,
* only all specified segments can be deleted, then we continue the clean files operation,
* otherwise, throw exception and show the segment which cannot be deleted.
* @return segment id list which contains all the segments which cannot be deleted
*/
public static List<String> loadsCannotBeDeleted(
AbsoluteTableIdentifier absoluteTableIdentifier, List<LoadMetadataDetails> details) {
List<String> loadsCannotBeDeleted = new ArrayList<>();
if (details != null && !details.isEmpty()) {
for (LoadMetadataDetails oneLoad : details) {
if (checkIfLoadCanBeDeleted(oneLoad, true, true, absoluteTableIdentifier)) {
oneLoad.setVisibility("false");
LOGGER.info("Deleted the load " + oneLoad.getLoadName());
} else {
loadsCannotBeDeleted.add(oneLoad.getLoadName());
LOGGER.info("Segment " + oneLoad.getLoadName() + " cannot be deleted at this moment, its"
+ " status is " + oneLoad.getSegmentStatus());
}
}
}
return loadsCannotBeDeleted;
}

public static Boolean canDeleteThisLoad(LoadMetadataDetails oneLoad, boolean
isForceDelete, boolean cleanStaleInProgress, AbsoluteTableIdentifier
absoluteTableIdentifier) {
Expand Down Expand Up @@ -229,44 +220,21 @@ public static Boolean canDeleteThisLoad(LoadMetadataDetails oneLoad, boolean
}
}

private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId,
String metadataPath) {
LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
for (LoadMetadataDetails oneLoad : currentDetails) {
if (oneLoad.getLoadName().equalsIgnoreCase(segmentId)) {
return oneLoad;
}
}
return null;
}

public static Set<String> deleteLoadFoldersFromFileSystem(
AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete, LoadMetadataDetails[]
details, String metadataPath, boolean cleanStaleInProgress) {
Set<String> loadsToDelete = new HashSet<>();
public static boolean deleteLoadFoldersFromFileSystem(
AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
LoadMetadataDetails[] details, boolean cleanStaleInProgress) {
boolean isDeleted = false;
if (details != null && details.length != 0) {
for (LoadMetadataDetails oneLoad : details) {
if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete, cleanStaleInProgress,
absoluteTableIdentifier)) {
if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
|| oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
LoadMetadataDetails currentDetails =
getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath);
if (currentDetails != null && checkIfLoadCanBeDeleted(currentDetails,
isForceDelete, cleanStaleInProgress, absoluteTableIdentifier)) {
oneLoad.setVisibility("false");
loadsToDelete.add(oneLoad.getLoadName());
LOGGER.info("Deleted the load " + oneLoad.getLoadName());
}
} else {
oneLoad.setVisibility("false");
loadsToDelete.add(oneLoad.getLoadName());
LOGGER.info("Deleted the load " + oneLoad.getLoadName());
}
oneLoad.setVisibility("false");
isDeleted = true;
LOGGER.info("Deleted the load " + oneLoad.getLoadName());
}
}
}
return loadsToDelete;
return isDeleted;
}

private static boolean canSegmentLockBeAcquired(LoadMetadataDetails oneLoad,
Expand Down
10 changes: 9 additions & 1 deletion docs/clean-files.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,12 @@ clean files operation, the user can disable that option by using ```statistics =
```
CLEAN FILES FOR TABLE TABLE_NAME options('statistics'='false')
```


### SEGMENT_IDS
Clean files operation can specify segments to be deleted instead of delete all the Marked For Delete and Compacted segments after the number of theses segments reaches carbon.invisible.segments.preserve.count.
User can specify segments with option ```segment_ids```. Value of this option is the segment ids user wants to delete. Only Marked for Delete and Compacted segment ids are valid. If invalid ids are given, operation will fail directly.
If segments are specified, ```force``` option will be ignored.

```
CLEAN FILES FOR TABLE TABLE_NAME options('segment_ids'='0,1,2')
```
Loading

0 comments on commit d80834b

Please sign in to comment.