From e5a79edcaa9605a6f2803dafe6851d4eedd24920 Mon Sep 17 00:00:00 2001
From: fenzhu
Date: Fri, 15 Jul 2022 17:06:25 +0800
Subject: [PATCH] [CARMEL-6076][Followup] Fix UT failure caused by compacting
 empty directory (#1010)

* [CARMEL-6076][Followup] Fix UT failure caused by compacting empty directory

dddd tttt

* show path
---
 .../apache/spark/sql/internal/SQLConf.scala   |  4 ++--
 .../datasources/InMemoryFileIndex.scala       | 20 +++++++++++++-------
 2 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ee8c0c6eea045..a13cd5fa867a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2521,11 +2521,11 @@ object SQLConf {
   val CONCURRENT_READ_LOCK_CLUSTER =
     buildConf("spark.sql.source.output.concurrent.read.lock.cluster")
       .doc("Clusters that enable read lock for empty path listing. We can configure" +
-        "it as comma seperated string list, e.g., 'hermes,apollo'.")
+        " it as comma separated string list, e.g., 'hermes,apollo'.")
       .version("3.0.0")
       .internal()
       .stringConf
-      .createWithDefault("hermes")
+      .createWithDefault("all")
 
   val TABLE_COMPACT_FLAG_PATH =
     buildConf("spark.sql.source.table.compact.path")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index d55ace1bea83c..341721ac61268 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -216,14 +216,20 @@ object InMemoryFileIndex extends Logging {
       ))
 
     val validTableLocation: Option[URI] = if (tableLocation != null && tableLocation.isDefined) {
-      val tl = tableLocation.get.getPath.trim.toLowerCase(Locale.ROOT)
+      val tl = new Path(tableLocation.get.getPath.trim)
+      val fs = SparkHadoopWriterUtils.getNewFileSystem(tl, hadoopConf)
+      val tlp = fs.makeQualified(tl).toString.toLowerCase(Locale.ROOT)
       val readLockCluster = sparkSession.sessionState.conf.concurrentReadLockCluster
-      val readLockClusters: Seq[String] =
-        readLockCluster.split(",").toSeq.map(_.trim.toLowerCase(Locale.ROOT))
-      logInfo(s"Configured cluster list to enable read lock: $readLockClusters")
-      val enforced: Boolean = readLockClusters.exists(cluster => tl.contains(cluster))
-      logInfo(s"Table location is: $tl, read lock enforced = $enforced")
-      if (enforced) tableLocation else null
+      logInfo(s"Configured cluster that enables read lock for empty directory: $readLockCluster")
+      if (readLockCluster.trim.equalsIgnoreCase("all")) {
+        tableLocation
+      } else {
+        val readLockClusters: Seq[String] =
+          readLockCluster.split(",").toSeq.map(_.trim.toLowerCase(Locale.ROOT))
+        val enforced: Boolean = readLockClusters.exists(cluster => tlp.contains(cluster))
+        logInfo(s"Table location is: $tlp, read lock enforced = $enforced")
+        if (enforced) tableLocation else null
+      }
     } else {
       tableLocation
     }
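
Reviewer note: the "show path" part of this change likely matters because java.net.URI.getPath
returns only the path component, dropping the scheme and authority; cluster names such as
'hermes' typically live in the HDFS namespace (authority), so the old substring match on the
bare path could never see them. Qualifying through the resolved FileSystem puts the scheme and
authority back into the string being matched. Below is a minimal, self-contained sketch of the
matching rule the patched code applies; the object name, method name, plain-string signature,
and example namespaces are illustrative assumptions, not part of the patch (the real logic runs
inside InMemoryFileIndex against SQLConf and a Hadoop FileSystem).

    import java.util.Locale

    // Hypothetical standalone mirror of the patched matching rule; names and
    // signature are illustrative only.
    object ReadLockMatchSketch {
      // readLockCluster mirrors spark.sql.source.output.concurrent.read.lock.cluster;
      // qualifiedLocation mirrors fs.makeQualified(tl).toString.
      def isReadLockEnforced(readLockCluster: String, qualifiedLocation: String): Boolean = {
        val location = qualifiedLocation.trim.toLowerCase(Locale.ROOT)
        if (readLockCluster.trim.equalsIgnoreCase("all")) {
          // New default "all": enforce the read lock on every cluster.
          true
        } else {
          // Otherwise treat the conf as a comma separated cluster list and
          // match each entry as a substring of the qualified location.
          readLockCluster.split(",").toSeq
            .map(_.trim.toLowerCase(Locale.ROOT))
            .exists(cluster => location.contains(cluster))
        }
      }

      def main(args: Array[String]): Unit = {
        // "all" short-circuits, regardless of the location.
        assert(isReadLockEnforced("all", "hdfs://any-ns/warehouse/db/t"))
        // With a list, the cluster name must appear in the qualified path;
        // 'hermes-ns' and 'other-ns' are made-up namespaces for illustration.
        assert(isReadLockEnforced("hermes,apollo", "hdfs://hermes-ns/warehouse/db/t"))
        assert(!isReadLockEnforced("hermes,apollo", "hdfs://other-ns/warehouse/db/t"))
      }
    }

One consequence of equalsIgnoreCase("all") worth noting in review: a literal cluster named
'all' can no longer be configured on its own, and with the default flipped from 'hermes' to
'all', every cluster now takes the read-lock path unless the conf is overridden.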