Commit
Remove file listing during query and invalidate cache for stale segments
ShreelekhyaG committed Jul 4, 2022
1 parent b8511b6 commit 0415996
Showing 6 changed files with 43 additions and 66 deletions.
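In brief, the hunks below drop the per-query segInfoCache (and the segment-wide file listing that populated it) in favour of resolving each block's metadata directly, and they make the delete and update commands evict the invalidated segments from the driver's index cache. A minimal Scala sketch of that eviction call, using only names visible in the hunks; the wrapper function and its parameters are illustrative, not something added by this commit:

// Sketch only: clearing stale segments from the driver index cache after an IUD operation.
// CarbonTable, Segment and IndexStoreManager are existing carbondata-core classes used in
// the hunks below; clearStaleIndexCache itself is a hypothetical helper for illustration.
import scala.collection.JavaConverters._
import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable

def clearStaleIndexCache(carbonTable: CarbonTable, invalidSegments: Seq[Segment]): Unit = {
  // evict index entries of the affected segments so they are no longer reported
  // (or reused) by SHOW METACACHE and subsequent queries
  IndexStoreManager.getInstance()
    .clearInvalidSegments(carbonTable, invalidSegments.map(_.getSegmentNo).toList.asJava)
}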
@@ -67,11 +67,6 @@ public BlockletIndexStore(CarbonLRUCache lruCache) {
 
   @Override
   public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper) {
-    return get(identifierWrapper, null);
-  }
-
-  public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
-      Map<String, Map<String, BlockMetaInfo>> segInfoCache) {
     TableBlockIndexUniqueIdentifier identifier =
         identifierWrapper.getTableBlockIndexUniqueIdentifier();
     String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
@@ -83,24 +78,11 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
       SegmentIndexFileStore indexFileStore =
           new SegmentIndexFileStore(identifierWrapper.getConfiguration());
       Set<String> filesRead = new HashSet<>();
-      String segmentFilePath = identifier.getIndexFilePath();
-      if (segInfoCache == null) {
-        segInfoCache = new HashMap<>();
-      }
-      Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping =
-          segInfoCache.get(segmentFilePath);
-      if (carbonDataFileBlockMetaInfoMapping == null) {
-        carbonDataFileBlockMetaInfoMapping =
-            BlockletIndexUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath,
-                identifierWrapper.getConfiguration());
-        segInfoCache.put(segmentFilePath, carbonDataFileBlockMetaInfoMapping);
-      }
       // if the identifier is not a merge file we can directly load the indexes
       if (identifier.getMergeIndexFileName() == null) {
         List<DataFileFooter> indexInfos = new ArrayList<>();
         Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletIndexUtil
-            .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead,
-                carbonDataFileBlockMetaInfoMapping, indexInfos);
+            .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead, indexInfos);
         BlockIndex blockIndex =
             loadAndGetIndex(identifier, indexFileStore, blockMetaInfoMap,
                 identifierWrapper.getCarbonTable(),
@@ -120,8 +102,7 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
           List<DataFileFooter> indexInfos = new ArrayList<>();
           Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletIndexUtil.getBlockMetaInfoMap(
               new TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier,
-                  identifierWrapper.getCarbonTable()), indexFileStore, filesRead,
-                  carbonDataFileBlockMetaInfoMapping, indexInfos);
+                  identifierWrapper.getCarbonTable()), indexFileStore, filesRead, indexInfos);
           if (!blockMetaInfoMap.isEmpty()) {
             BlockIndex blockIndex =
                 loadAndGetIndex(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
@@ -157,8 +138,6 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
   public List<BlockletIndexWrapper> getAll(
       List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiers)
       throws IOException {
-    Map<String, Map<String, BlockMetaInfo>> segInfoCache =
-        new HashMap<String, Map<String, BlockMetaInfo>>();
 
     List<BlockletIndexWrapper> blockletIndexWrappers =
         new ArrayList<>(tableSegmentUniqueIdentifiers.size());
@@ -177,7 +156,7 @@ public List<BlockletIndexWrapper> getAll(
       }
       if (missedIdentifiersWrapper.size() > 0) {
         for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : missedIdentifiersWrapper) {
-          blockletIndexWrapper = get(identifierWrapper, segInfoCache);
+          blockletIndexWrapper = get(identifierWrapper);
           blockletIndexWrappers.add(blockletIndexWrapper);
         }
       }
@@ -79,8 +79,7 @@ public static Set<TableBlockIndexUniqueIdentifier> getSegmentUniqueIdentifiers(S
 
   public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
       TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
-      SegmentIndexFileStore indexFileStore, Set<String> filesRead,
-      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, List<DataFileFooter> indexInfos)
+      SegmentIndexFileStore indexFileStore, Set<String> filesRead, List<DataFileFooter> indexInfos)
       throws IOException {
     boolean isTransactionalTable = true;
     TableBlockIndexUniqueIdentifier identifier =
@@ -130,8 +129,7 @@ public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
       }
       String blockPath = footer.getBlockInfo().getFilePath();
       if (null == blockMetaInfoMap.get(blockPath)) {
-        BlockMetaInfo blockMetaInfo = createBlockMetaInfo(
-            fileNameToMetaInfoMapping, footer.getBlockInfo());
+        BlockMetaInfo blockMetaInfo = createBlockMetaInfo(footer.getBlockInfo());
         // if blockMetaInfo is null that means the file has been deleted from the file system.
         // This can happen in case IUD scenarios where after deleting or updating the data the
         // complete block is deleted but the entry still exists in index or merge index file
@@ -143,38 +141,7 @@
     return blockMetaInfoMap;
   }
 
-  /**
-   * This method will create file name to block Meta Info Mapping. This method will reduce the
-   * number of nameNode calls and using this method one namenode will fetch 1000 entries
-   *
-   * @param segmentFilePath
-   * @return
-   * @throws IOException
-   */
-  public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
-      String segmentFilePath, Configuration configuration) throws IOException {
-    Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap();
-    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath, configuration);
-    if (carbonFile instanceof AbstractDFSCarbonFile && !(carbonFile instanceof S3CarbonFile)) {
-      PathFilter pathFilter = new PathFilter() {
-        @Override
-        public boolean accept(Path path) {
-          return CarbonTablePath.isCarbonDataFile(path.getName());
-        }
-      };
-      CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles(pathFilter);
-      for (CarbonFile file : carbonFiles) {
-        String[] location = file.getLocations();
-        long len = file.getSize();
-        BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
-        fileNameToMetaInfoMapping.put(file.getPath(), blockMetaInfo);
-      }
-    }
-    return fileNameToMetaInfoMapping;
-  }
-
-  private static BlockMetaInfo createBlockMetaInfo(
-      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, TableBlockInfo blockInfo)
+  private static BlockMetaInfo createBlockMetaInfo(TableBlockInfo blockInfo)
       throws IOException {
     String carbonDataFile = blockInfo.getFilePath();
     FileFactory.FileType fileType = FileFactory.getFileType(carbonDataFile);
@@ -193,7 +160,14 @@ private static BlockMetaInfo createBlockMetaInfo(
         CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
         return new BlockMetaInfo(new String[] { "localhost" }, carbonFile.getSize());
       default:
-        return fileNameToMetaInfoMapping.get(FileFactory.getFormattedPath(carbonDataFile));
+        if (!FileFactory.isFileExist(carbonDataFile)) {
+          return null;
+        }
+        CarbonFile file = FileFactory.getCarbonFile(FileFactory.getFormattedPath(carbonDataFile));
+        String[] location = file.getLocations();
+        long len = file.getSize();
+        BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
+        return blockMetaInfo;
     }
   }
 
@@ -127,8 +127,6 @@ BlockletIndexDetailsWithSchema> createRecordReader(InputSplit inputSplit,
     Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletIndexWrapper> cache =
         CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_INDEX);
     private Iterator<TableBlockIndexUniqueIdentifier> iterator;
-    // Cache to avoid multiple times listing of files
-    private Map<String, Map<String, BlockMetaInfo>> segInfoCache = new HashMap<>();
 
     @Override
     public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
@@ -152,8 +150,7 @@ public boolean nextKeyValue() {
             new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, table,
                 false, true, true);
         this.tableBlockIndexUniqueIdentifierWrapper = tableBlockIndexUniqueIdentifierWrapper;
-        wrapper = ((BlockletIndexStore) cache)
-            .get(tableBlockIndexUniqueIdentifierWrapper, segInfoCache);
+        wrapper = ((BlockletIndexStore) cache).get(tableBlockIndexUniqueIdentifierWrapper);
         return true;
       }
       return false;
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command.mutation
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -29,6 +31,7 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
+import org.apache.carbondata.core.index.IndexStoreManager
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -135,6 +138,11 @@ private[sql] case class CarbonProjectForDeleteCommand(
         throw new Exception(executorErrors.errorMsg)
       }
 
+      // clear invalid segments from cache
+      IndexStoreManager.getInstance()
+        .clearInvalidSegments(carbonTable,
+          deletedSegments.map(_.getSegmentNo).toList.asJava)
+
       // call IUD Compaction.
       HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable)
 
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command.mutation
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -34,7 +36,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
-import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
@@ -190,6 +192,10 @@ private[sql] case class CarbonProjectForUpdateCommand(
             updateTableModel,
             executionErrors)
 
+          // clear invalid segments from cache
+          IndexStoreManager.getInstance()
+            .clearInvalidSegments(carbonTable,
+              segmentsToBeDeleted.map(_.getSegmentNo).toList.asJava)
           // pre-priming for update command
           DeleteExecution.reloadDistributedSegmentCache(carbonTable,
             segmentsToBeDeleted, operationContext)(sparkSession)
@@ -442,6 +442,19 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists carbonTable2")
   }
 
+  test("test cache after delete") {
+    sql("drop table if exists carbonTable1")
+    sql("create table carbonTable1(col1 int, col2 string,col3 string) stored as carbondata")
+    sql("insert into carbonTable1 select 1, 'ab', 'vf'")
+    sql("insert into carbonTable1 select 1, 'ab', 'vf'")
+    var showCache = sql("show metacache on table carbonTable1").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("0/2 index files cached"))
+    sql("delete from carbonTable1 where col3 ='vf'").collect()
+    showCache = sql("show metacache on table carbonTable1").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("0/0 index files cached"))
+    sql("drop table if exists carbonTable1")
+  }
+
   // Runs only when index server is enabled.
   test("test embedded pruning", false) {
     val mock: MockUp[CarbonInputFormat[Object]] = new MockUp[CarbonInputFormat[Object]]() {
