From c63d26c86937fc9c654c40e0bd2ee2e83ef02bab Mon Sep 17 00:00:00 2001
From: ShreelekhyaG
Date: Mon, 4 Jul 2022 19:00:20 +0530
Subject: [PATCH] remove list files while query and invalid cache

---
 .../core/indexstore/BlockletIndexStore.java   | 28 ++--------
 .../core/util/BlockletIndexUtil.java          | 54 ++++---------
 .../jobs/BlockletIndexInputFormat.java        |  8 +--
 .../CarbonProjectForDeleteCommand.scala       |  8 +++
 .../CarbonProjectForUpdateCommand.scala       |  8 ++-
 .../commands/TestCarbonShowCacheCommand.scala | 13 +++++
 6 files changed, 43 insertions(+), 76 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java
index 14d0f775226..2320bdc579d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java
@@ -19,7 +19,6 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -67,11 +66,6 @@ public BlockletIndexStore(CarbonLRUCache lruCache) {
 
   @Override
   public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper) {
-    return get(identifierWrapper, null);
-  }
-
-  public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
-      Map<String, Map<String, BlockMetaInfo>> segInfoCache) {
     TableBlockIndexUniqueIdentifier identifier =
         identifierWrapper.getTableBlockIndexUniqueIdentifier();
     String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
@@ -83,24 +77,11 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
         SegmentIndexFileStore indexFileStore =
             new SegmentIndexFileStore(identifierWrapper.getConfiguration());
         Set<String> filesRead = new HashSet<>();
-        String segmentFilePath = identifier.getIndexFilePath();
-        if (segInfoCache == null) {
-          segInfoCache = new HashMap<>();
-        }
-        Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping =
-            segInfoCache.get(segmentFilePath);
-        if (carbonDataFileBlockMetaInfoMapping == null) {
-          carbonDataFileBlockMetaInfoMapping =
-              BlockletIndexUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath,
-                  identifierWrapper.getConfiguration());
-          segInfoCache.put(segmentFilePath, carbonDataFileBlockMetaInfoMapping);
-        }
         // if the identifier is not a merge file we can directly load the indexes
         if (identifier.getMergeIndexFileName() == null) {
           List<DataFileFooter> indexInfos = new ArrayList<>();
           Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletIndexUtil
-              .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead,
-                  carbonDataFileBlockMetaInfoMapping, indexInfos);
+              .getBlockMetaInfoMap(identifierWrapper, indexFileStore, filesRead, indexInfos);
           BlockIndex blockIndex =
               loadAndGetIndex(identifier, indexFileStore, blockMetaInfoMap,
                   identifierWrapper.getCarbonTable(),
@@ -120,8 +101,7 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
             List<DataFileFooter> indexInfos = new ArrayList<>();
             Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletIndexUtil.getBlockMetaInfoMap(
                 new TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier,
-                    identifierWrapper.getCarbonTable()), indexFileStore, filesRead,
-                    carbonDataFileBlockMetaInfoMapping, indexInfos);
+                    identifierWrapper.getCarbonTable()), indexFileStore, filesRead, indexInfos);
             if (!blockMetaInfoMap.isEmpty()) {
               BlockIndex blockIndex =
                   loadAndGetIndex(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
@@ -157,8 +137,6 @@ public BlockletIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifie
   public List<BlockletIndexWrapper> getAll(
       List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiers)
       throws IOException {
-    Map<String, Map<String, BlockMetaInfo>> segInfoCache =
-        new HashMap<String, Map<String, BlockMetaInfo>>();
     List<BlockletIndexWrapper> blockletIndexWrappers =
         new ArrayList<>(tableSegmentUniqueIdentifiers.size());
@@ -177,7 +155,7 @@ public List<BlockletIndexWrapper> getAll(
       }
       if (missedIdentifiersWrapper.size() > 0) {
         for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : missedIdentifiersWrapper) {
-          blockletIndexWrapper = get(identifierWrapper, segInfoCache);
+          blockletIndexWrapper = get(identifierWrapper);
           blockletIndexWrappers.add(blockletIndexWrapper);
         }
       }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletIndexUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletIndexUtil.java
index 6f427682e27..dc6f7dea707 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletIndexUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletIndexUtil.java
@@ -31,16 +31,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.S3CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.index.Segment;
 import org.apache.carbondata.core.indexstore.BlockMetaInfo;
@@ -61,9 +58,6 @@
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.log4j.Logger;
 
 public class BlockletIndexUtil {
@@ -79,8 +73,7 @@ public static Set<TableBlockIndexUniqueIdentifier> getSegmentUniqueIdentifiers(S
 
   public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
       TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
-      SegmentIndexFileStore indexFileStore, Set<String> filesRead,
-      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, List<DataFileFooter> indexInfos)
+      SegmentIndexFileStore indexFileStore, Set<String> filesRead, List<DataFileFooter> indexInfos)
       throws IOException {
     boolean isTransactionalTable = true;
     TableBlockIndexUniqueIdentifier identifier =
@@ -130,8 +123,7 @@ public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
       }
       String blockPath = footer.getBlockInfo().getFilePath();
       if (null == blockMetaInfoMap.get(blockPath)) {
-        BlockMetaInfo blockMetaInfo = createBlockMetaInfo(
-            fileNameToMetaInfoMapping, footer.getBlockInfo());
+        BlockMetaInfo blockMetaInfo = createBlockMetaInfo(footer.getBlockInfo());
         // if blockMetaInfo is null that means the file has been deleted from the file system.
         // This can happen in case IUD scenarios where after deleting or updating the data the
         // complete block is deleted but the entry still exists in index or merge index file
@@ -143,38 +135,7 @@ public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
     }
     return blockMetaInfoMap;
   }
-  /**
-   * This method will create file name to block Meta Info Mapping. This method will reduce the
-   * number of nameNode calls and using this method one namenode will fetch 1000 entries
-   *
-   * @param segmentFilePath
-   * @return
-   * @throws IOException
-   */
-  public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
-      String segmentFilePath, Configuration configuration) throws IOException {
-    Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap<String, BlockMetaInfo>();
-    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath, configuration);
-    if (carbonFile instanceof AbstractDFSCarbonFile && !(carbonFile instanceof S3CarbonFile)) {
-      PathFilter pathFilter = new PathFilter() {
-        @Override
-        public boolean accept(Path path) {
-          return CarbonTablePath.isCarbonDataFile(path.getName());
-        }
-      };
-      CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles(pathFilter);
-      for (CarbonFile file : carbonFiles) {
-        String[] location = file.getLocations();
-        long len = file.getSize();
-        BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
-        fileNameToMetaInfoMapping.put(file.getPath(), blockMetaInfo);
-      }
-    }
-    return fileNameToMetaInfoMapping;
-  }
-
-  private static BlockMetaInfo createBlockMetaInfo(
-      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, TableBlockInfo blockInfo)
+  private static BlockMetaInfo createBlockMetaInfo(TableBlockInfo blockInfo)
       throws IOException {
     String carbonDataFile = blockInfo.getFilePath();
     FileFactory.FileType fileType = FileFactory.getFileType(carbonDataFile);
@@ -193,7 +154,14 @@ private static BlockMetaInfo createBlockMetaInfo(
         CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
         return new BlockMetaInfo(new String[] { "localhost" }, carbonFile.getSize());
       default:
-        return fileNameToMetaInfoMapping.get(FileFactory.getFormattedPath(carbonDataFile));
+        if (!FileFactory.isFileExist(carbonDataFile)) {
+          return null;
+        }
+        CarbonFile file = FileFactory.getCarbonFile(FileFactory.getFormattedPath(carbonDataFile));
+        String[] location = file.getLocations();
+        long len = file.getSize();
+        BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
+        return blockMetaInfo;
     }
   }
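Note: the removed createCarbonDataFileBlockMetaInfoMapping() pre-listed the whole segment directory (carbonFile.locationAwareListFiles) to build a file-to-BlockMetaInfo map, precisely to cut down NameNode calls. After this change, createBlockMetaInfo() stats each carbondata file on demand and returns null when the file is already gone, which is the IUD case described in the comment retained above. Below is a minimal sketch of that on-demand lookup, wrapped in a hypothetical helper class and method name; the FileFactory/CarbonFile/BlockMetaInfo calls are the same ones used in the new default branch of createBlockMetaInfo().

    import java.io.IOException;

    import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    import org.apache.carbondata.core.datastore.impl.FileFactory;
    import org.apache.carbondata.core.indexstore.BlockMetaInfo;

    public final class BlockMetaInfoLookupSketch {
      // Hypothetical helper: resolve block metadata for a single carbondata file on demand,
      // instead of pre-listing the segment directory and caching the listing per query.
      static BlockMetaInfo lookUp(String carbonDataFilePath) throws IOException {
        // After an update/delete the block file may already be removed while the (merge)
        // index file still references it, so callers must tolerate a null result.
        if (!FileFactory.isFileExist(carbonDataFilePath)) {
          return null;
        }
        CarbonFile file =
            FileFactory.getCarbonFile(FileFactory.getFormattedPath(carbonDataFilePath));
        return new BlockMetaInfo(file.getLocations(), file.getSize());
      }
    }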
diff --git a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/jobs/BlockletIndexInputFormat.java b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/jobs/BlockletIndexInputFormat.java
index 19a48eeec33..dab102d3cae 100644
--- a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/jobs/BlockletIndexInputFormat.java
+++ b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/jobs/BlockletIndexInputFormat.java
@@ -20,11 +20,9 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -38,7 +36,6 @@
 import org.apache.carbondata.core.index.dev.CacheableIndex;
 import org.apache.carbondata.core.index.dev.IndexFactory;
 import org.apache.carbondata.core.index.dev.expr.IndexExprWrapper;
-import org.apache.carbondata.core.indexstore.BlockMetaInfo;
 import org.apache.carbondata.core.indexstore.BlockletIndexStore;
 import org.apache.carbondata.core.indexstore.BlockletIndexWrapper;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
@@ -127,8 +124,6 @@ BlockletIndexDetailsWithSchema> createRecordReader(InputSplit inputSplit,
       Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletIndexWrapper> cache =
           CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_INDEX);
       private Iterator<TableBlockIndexUniqueIdentifier> iterator;
-      // Cache to avoid multiple times listing of files
-      private Map<String, Map<String, BlockMetaInfo>> segInfoCache = new HashMap<>();
 
       @Override
       public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
@@ -152,8 +147,7 @@ public boolean nextKeyValue() {
               new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, table,
                   false, true, true);
           this.tableBlockIndexUniqueIdentifierWrapper = tableBlockIndexUniqueIdentifierWrapper;
-          wrapper = ((BlockletIndexStore) cache)
-              .get(tableBlockIndexUniqueIdentifierWrapper, segInfoCache);
+          wrapper = ((BlockletIndexStore) cache).get(tableBlockIndexUniqueIdentifierWrapper);
           return true;
         }
         return false;
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 5435ed374da..268abd4143d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command.mutation
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -29,6 +31,7 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
+import org.apache.carbondata.core.index.IndexStoreManager
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -135,6 +138,11 @@ private[sql] case class CarbonProjectForDeleteCommand(
         throw new Exception(executorErrors.errorMsg)
       }
 
+      // clear invalid segments from cache
+      IndexStoreManager.getInstance()
+        .clearInvalidSegments(carbonTable,
+          deletedSegments.map(_.getSegmentNo).toList.asJava)
+
       // call IUD Compaction.
       HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable)
 
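The hook added above is what actually evicts the stale entries: once the delete marks segments as deleted, their Blocklet index entries would otherwise remain in the driver's LRU cache, which is the situation the new SHOW METACACHE test further below guards against; the same call is added to CarbonProjectForUpdateCommand next. A minimal sketch of the equivalent invocation from the Java side follows, assuming a CarbonTable handle and the segment numbers collected by the command; IndexStoreManager.clearInvalidSegments is the existing API used in the Scala change, and the checked IOException on it is an assumption here.

    import java.io.IOException;
    import java.util.List;

    import org.apache.carbondata.core.index.IndexStoreManager;
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

    public final class SegmentCacheInvalidationSketch {
      // Evict cached index entries of segments invalidated by a delete/update, mirroring
      // the clearInvalidSegments(...) call added to CarbonProjectForDeleteCommand above.
      static void clearDeletedSegmentsFromCache(CarbonTable table, List<String> deletedSegmentNos)
          throws IOException {
        IndexStoreManager.getInstance().clearInvalidSegments(table, deletedSegmentNos);
      }
    }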
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 773c4a10691..d611456436b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command.mutation
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -34,7 +36,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
-import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
@@ -190,6 +192,10 @@ private[sql] case class CarbonProjectForUpdateCommand(
           updateTableModel,
           executionErrors)
 
+        // clear invalid segments from cache
+        IndexStoreManager.getInstance()
+          .clearInvalidSegments(carbonTable,
+            segmentsToBeDeleted.map(_.getSegmentNo).toList.asJava)
         // pre-priming for update command
         DeleteExecution.reloadDistributedSegmentCache(carbonTable,
           segmentsToBeDeleted, operationContext)(sparkSession)
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala b/integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
index a282fe01447..22d2a571f26 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala
@@ -442,6 +442,19 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists carbonTable2")
   }
 
+  test("test cache after delete") {
+    sql("drop table if exists carbonTable1")
+    sql("create table carbonTable1(col1 int, col2 string,col3 string) stored as carbondata")
+    sql("insert into carbonTable1 select 1, 'ab', 'vf'")
+    sql("insert into carbonTable1 select 1, 'ab', 'vf'")
+    var showCache = sql("show metacache on table carbonTable1").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("0/2 index files cached"))
+    sql("delete from carbonTable1 where col3 ='vf'").collect()
+    showCache = sql("show metacache on table carbonTable1").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("0/0 index files cached"))
+    sql("drop table if exists carbonTable1")
+  }
+
   // Runs only when index server is enabled.
   test("test embedded pruning", false) {
     val mock: MockUp[CarbonInputFormat[Object]] = new MockUp[CarbonInputFormat[Object]]() {