From 606c811a6d9eeca2b65c2b8648d16bfeacb18ca6 Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Sun, 23 Apr 2023 18:26:41 -0700 Subject: [PATCH 1/6] upgrade to scala 2.13 --- build.sbt | 23 +++++++++---------- .../sharing/server/SharedTableManager.scala | 12 +++++----- .../internal/DeltaSharingCDCReader.scala | 6 ++--- .../internal/PartitionFilterUtils.scala | 4 ++-- .../sharing/server/CloudFileSignerSuite.scala | 4 ++-- .../server/DeltaSharingServiceSuite.scala | 5 ++-- .../server/SharedTableManagerSuite.scala | 4 ++-- .../server/config/ServerConfigSuite.scala | 4 ++-- .../JsonPredicateFilterUtilsSuite.scala | 4 ++-- .../internal/JsonPredicateSuite.scala | 4 ++-- .../internal/PartitionFilterUtilsSuite.scala | 13 +++++++---- 11 files changed, 44 insertions(+), 39 deletions(-) diff --git a/build.sbt b/build.sbt index 68c395908..7d4667b39 100644 --- a/build.sbt +++ b/build.sbt @@ -18,11 +18,11 @@ import sbt.ExclusionRule ThisBuild / parallelExecution := false -val sparkVersion = "3.1.1" +val sparkVersion = "3.3.2" lazy val commonSettings = Seq( organization := "io.delta", - scalaVersion := "2.12.10", + scalaVersion := "2.13.5", fork := true, javacOptions ++= Seq("-source", "1.8", "-target", "1.8"), scalacOptions += "-target:jvm-1.8", @@ -76,18 +76,17 @@ lazy val server = (project in file("server")) enablePlugins(JavaAppPackaging) se libraryDependencies ++= Seq( // Pin versions for jackson libraries as the new version of `jackson-module-scala` introduces a // breaking change making us not able to use `delta-standalone`. - "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7", - "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.3", - "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.7.1", - "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % "2.6.7", - "org.json4s" %% "json4s-jackson" % "3.5.3" excludeAll( + "com.fasterxml.jackson.core" % "jackson-core" % "2.13.5", + "com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5", + "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.13.5", + "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % "2.13.5", + "org.json4s" %% "json4s-jackson" % "3.6.6" excludeAll( ExclusionRule("com.fasterxml.jackson.core"), ExclusionRule("com.fasterxml.jackson.module") ), - "com.linecorp.armeria" %% "armeria-scalapb" % "1.6.0" excludeAll( + "com.linecorp.armeria" %% "armeria-scalapb" % "1.15.0" excludeAll( ExclusionRule("com.fasterxml.jackson.core"), - ExclusionRule("com.fasterxml.jackson.module"), - ExclusionRule("org.json4s") + ExclusionRule("com.fasterxml.jackson.module") ), "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf" excludeAll( ExclusionRule("com.fasterxml.jackson.core"), @@ -134,7 +133,7 @@ lazy val server = (project in file("server")) enablePlugins(JavaAppPackaging) se ExclusionRule("com.fasterxml.jackson.module"), ExclusionRule("com.google.guava", "guava") ), - "org.apache.spark" %% "spark-sql" % "2.4.7" excludeAll( + "org.apache.spark" %% "spark-sql" % "3.3.2" excludeAll( ExclusionRule("org.slf4j"), ExclusionRule("io.netty"), ExclusionRule("com.fasterxml.jackson.core"), @@ -146,7 +145,7 @@ lazy val server = (project in file("server")) enablePlugins(JavaAppPackaging) se "org.slf4j" % "slf4j-simple" % "1.6.1", "net.sourceforge.argparse4j" % "argparse4j" % "0.9.0", - "org.scalatest" %% "scalatest" % "3.0.5" % "test" + "org.scalatest" %% "scalatest" % "3.2.15" % "test" ), Compile / PB.targets := Seq( scalapb.gen() -> (Compile / sourceManaged).value / "scalapb" diff --git a/server/src/main/scala/io/delta/sharing/server/SharedTableManager.scala b/server/src/main/scala/io/delta/sharing/server/SharedTableManager.scala index 285204e1b..db884f8b7 100644 --- a/server/src/main/scala/io/delta/sharing/server/SharedTableManager.scala +++ b/server/src/main/scala/io/delta/sharing/server/SharedTableManager.scala @@ -106,9 +106,9 @@ class SharedTableManager(serverConfig: ServerConfig) { nextPageToken: Option[String] = None, maxResults: Option[Int] = None): (Seq[Share], Option[String]) = { getPage(nextPageToken, None, None, maxResults, shares.size) { (start, end) => - shares.asScala.map { share => + shares.asScala.toSeq.map { share => Share().withName(share.getName) - }.slice(start, end) + }.slice(start, end).seq } } @@ -124,7 +124,7 @@ class SharedTableManager(serverConfig: ServerConfig) { val shareConfig = getShareInternal(share) getPage(nextPageToken, Some(share), None, maxResults, shareConfig.getSchemas.size) { (start, end) => - shareConfig.getSchemas.asScala.map { schemaConfig => + shareConfig.getSchemas.asScala.toSeq.map { schemaConfig => Schema().withName(schemaConfig.getName).withShare(share) }.slice(start, end) } @@ -138,7 +138,7 @@ class SharedTableManager(serverConfig: ServerConfig) { val schemaConfig = getSchema(getShareInternal(share), schema) getPage(nextPageToken, Some(share), Some(schema), maxResults, schemaConfig.getTables.size) { (start, end) => - schemaConfig.getTables.asScala.map { + schemaConfig.getTables.asScala.toSeq.map { tableConfig => Table( name = Some(tableConfig.getName), @@ -158,8 +158,8 @@ class SharedTableManager(serverConfig: ServerConfig) { val totalSize = shareConfig.schemas.asScala.map(_.tables.size).sum getPage(nextPageToken, Some(share), None, maxResults, totalSize) { (start, end) => - shareConfig.schemas.asScala.flatMap { schema => - schema.tables.asScala.map { + shareConfig.schemas.asScala.toSeq.flatMap { schema => + schema.tables.asScala.toSeq.map { table => Table( name = Some(table.getName), diff --git a/server/src/main/scala/io/delta/standalone/internal/DeltaSharingCDCReader.scala b/server/src/main/scala/io/delta/standalone/internal/DeltaSharingCDCReader.scala index b1142b820..dd403d1d2 100644 --- a/server/src/main/scala/io/delta/standalone/internal/DeltaSharingCDCReader.scala +++ b/server/src/main/scala/io/delta/standalone/internal/DeltaSharingCDCReader.scala @@ -262,7 +262,7 @@ class DeltaSharingCDCReader(val deltaLog: DeltaLogImpl, val conf: Configuration) // generated, either of them will result in the correct cdc data. // If there are CDC actions, we read them exclusively, and ignore the add/remove actions. if (cdcActions.nonEmpty) { - changeFiles.append(CDCDataSpec(v, ts, cdcActions)) + changeFiles.append(CDCDataSpec(v, ts, cdcActions.toSeq)) } else { // MERGE will sometimes rewrite files in a way which *could* have changed data // (so dataChange = true) but did not actually do so (so no CDC will be produced). @@ -294,8 +294,8 @@ class DeltaSharingCDCReader(val deltaLog: DeltaLogImpl, val conf: Configuration) val removeActions = actions.collect { case r: RemoveFile if r.dataChange => r } - addFiles.append(CDCDataSpec(v, ts, addActions)) - removeFiles.append(CDCDataSpec(v, ts, removeActions)) + addFiles.append(CDCDataSpec(v, ts, addActions.toSeq)) + removeFiles.append(CDCDataSpec(v, ts, removeActions.toSeq)) } } } diff --git a/server/src/main/scala/io/delta/standalone/internal/PartitionFilterUtils.scala b/server/src/main/scala/io/delta/standalone/internal/PartitionFilterUtils.scala index a2b34040b..f1ed28bec 100644 --- a/server/src/main/scala/io/delta/standalone/internal/PartitionFilterUtils.scala +++ b/server/src/main/scala/io/delta/standalone/internal/PartitionFilterUtils.scala @@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory object PartitionFilterUtils { private val logger = LoggerFactory.getLogger(this.getClass) - private lazy val sqlParser = new SparkSqlParser(new SQLConf) + private lazy val sqlParser = new SparkSqlParser() def evaluatePredicate( schemaString: String, @@ -56,7 +56,7 @@ object PartitionFilterUtils { if (exprs.isEmpty) { addFiles } else { - val predicate = InterpretedPredicate.create(exprs.reduce(And), attrs) + val predicate = Predicate.create(exprs.reduce(And), attrs) predicate.initialize(0) addFiles.filter { addFile => val converter = CatalystTypeConverters.createToCatalystConverter(addSchema) diff --git a/server/src/test/scala/io/delta/sharing/server/CloudFileSignerSuite.scala b/server/src/test/scala/io/delta/sharing/server/CloudFileSignerSuite.scala index e632699cb..d6c6042de 100644 --- a/server/src/test/scala/io/delta/sharing/server/CloudFileSignerSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/CloudFileSignerSuite.scala @@ -17,9 +17,9 @@ package io.delta.sharing.server import org.apache.hadoop.fs.Path -import org.scalatest.FunSuite +import org.scalatest.funsuite.AnyFunSuite -class CloudFileSignerSuite extends FunSuite { +class CloudFileSignerSuite extends AnyFunSuite { test("GCSFileSigner.getBucketAndObjectNames") { assert(GCSFileSigner.getBucketAndObjectNames(new Path("gs://delta-sharing-test/foo")) diff --git a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala index f2c0364ba..b1b073404 100644 --- a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala @@ -28,7 +28,8 @@ import scala.collection.mutable.ArrayBuffer import com.linecorp.armeria.server.Server import io.delta.standalone.internal.DeltaCDFErrors import org.apache.commons.io.IOUtils -import org.scalatest.{BeforeAndAfterAll, FunSuite} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite import scalapb.json4s.JsonFormat import io.delta.sharing.server.config.ServerConfig @@ -37,7 +38,7 @@ import io.delta.sharing.server.protocol._ import io.delta.sharing.server.util.JsonUtils // scalastyle:off maxLineLength -class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { +class DeltaSharingServiceSuite extends AnyFunSuite with BeforeAndAfterAll { def shouldRunIntegrationTest: Boolean = { sys.env.get("AWS_ACCESS_KEY_ID").exists(_.length > 0) && diff --git a/server/src/test/scala/io/delta/sharing/server/SharedTableManagerSuite.scala b/server/src/test/scala/io/delta/sharing/server/SharedTableManagerSuite.scala index e04d142f2..1b07f705f 100644 --- a/server/src/test/scala/io/delta/sharing/server/SharedTableManagerSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/SharedTableManagerSuite.scala @@ -21,12 +21,12 @@ import java.util.{Arrays, Collections} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import org.scalatest.FunSuite +import org.scalatest.funsuite.AnyFunSuite import io.delta.sharing.server.config.{SchemaConfig, ServerConfig, ShareConfig, TableConfig} import io.delta.sharing.server.protocol.{Schema, Share, Table} -class SharedTableManagerSuite extends FunSuite { +class SharedTableManagerSuite extends AnyFunSuite { test("list shares") { val serverConfig = new ServerConfig() diff --git a/server/src/test/scala/io/delta/sharing/server/config/ServerConfigSuite.scala b/server/src/test/scala/io/delta/sharing/server/config/ServerConfigSuite.scala index 04e716b6c..737862c30 100644 --- a/server/src/test/scala/io/delta/sharing/server/config/ServerConfigSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/config/ServerConfigSuite.scala @@ -22,9 +22,9 @@ import java.nio.file.Files import java.util.Arrays import org.apache.commons.io.FileUtils -import org.scalatest.FunSuite +import org.scalatest.funsuite.AnyFunSuite -class ServerConfigSuite extends FunSuite { +class ServerConfigSuite extends AnyFunSuite { def testConfig(content: String, serverConfig: ServerConfig): Unit = { val tempFile = Files.createTempFile("delta-sharing-server", ".yaml").toFile diff --git a/server/src/test/scala/io/delta/standalone/internal/JsonPredicateFilterUtilsSuite.scala b/server/src/test/scala/io/delta/standalone/internal/JsonPredicateFilterUtilsSuite.scala index 94c1c4291..6f998889a 100644 --- a/server/src/test/scala/io/delta/standalone/internal/JsonPredicateFilterUtilsSuite.scala +++ b/server/src/test/scala/io/delta/standalone/internal/JsonPredicateFilterUtilsSuite.scala @@ -17,11 +17,11 @@ package io.delta.standalone.internal import io.delta.standalone.internal.actions.AddFile -import org.scalatest.FunSuite +import org.scalatest.funsuite.AnyFunSuite import io.delta.sharing.server.util.JsonUtils -class JsonPredicateFilterUtilsSuite extends FunSuite { +class JsonPredicateFilterUtilsSuite extends AnyFunSuite { import JsonPredicateFilterUtils._ diff --git a/server/src/test/scala/io/delta/standalone/internal/JsonPredicateSuite.scala b/server/src/test/scala/io/delta/standalone/internal/JsonPredicateSuite.scala index 0251306f7..a1dd18725 100644 --- a/server/src/test/scala/io/delta/standalone/internal/JsonPredicateSuite.scala +++ b/server/src/test/scala/io/delta/standalone/internal/JsonPredicateSuite.scala @@ -18,11 +18,11 @@ package io.delta.standalone.internal -import org.scalatest.FunSuite +import org.scalatest.funsuite.AnyFunSuite import io.delta.sharing.server.util.JsonUtils -class JsonPredicateSuite extends FunSuite { +class JsonPredicateSuite extends AnyFunSuite { /** * A wrapper around op evaluation. diff --git a/server/src/test/scala/io/delta/standalone/internal/PartitionFilterUtilsSuite.scala b/server/src/test/scala/io/delta/standalone/internal/PartitionFilterUtilsSuite.scala index 50d50bf8b..3e39a5324 100644 --- a/server/src/test/scala/io/delta/standalone/internal/PartitionFilterUtilsSuite.scala +++ b/server/src/test/scala/io/delta/standalone/internal/PartitionFilterUtilsSuite.scala @@ -17,15 +17,20 @@ package io.delta.standalone.internal import io.delta.standalone.internal.actions.AddFile -import org.apache.spark.sql.types.StructType -import org.scalatest.FunSuite +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.scalatest.funsuite.AnyFunSuite -class PartitionFilterUtilsSuite extends FunSuite { +class PartitionFilterUtilsSuite extends AnyFunSuite { import PartitionFilterUtils._ test("evaluatePredicate") { - val schema = StructType.fromDDL("c1 INT, c2 INT").json + val a = StructType(Array( + StructField("c1", IntegerType, true), + StructField("c2", IntegerType, true) + )) + val schema = a.json +// val schema = StructType.fromDDL("c1 INT, c2 INT").json val add1 = AddFile("foo1", Map("c2" -> "0"), 1, 1, true) val add2 = AddFile("foo2", Map("c2" -> "1"), 1, 1, true) val addFiles = add1 :: add2 :: Nil From 9fbf773c9bee32170a7611af450d5fbb9b554aba Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Sun, 23 Apr 2023 18:40:16 -0700 Subject: [PATCH 2/6] Revert "upgrade to scala 2.13" This reverts commit 606c811a6d9eeca2b65c2b8648d16bfeacb18ca6. --- build.sbt | 23 ++++++++++--------- .../sharing/server/SharedTableManager.scala | 12 +++++----- .../internal/DeltaSharingCDCReader.scala | 6 ++--- .../internal/PartitionFilterUtils.scala | 4 ++-- .../sharing/server/CloudFileSignerSuite.scala | 4 ++-- .../server/DeltaSharingServiceSuite.scala | 5 ++-- .../server/SharedTableManagerSuite.scala | 4 ++-- .../server/config/ServerConfigSuite.scala | 4 ++-- .../JsonPredicateFilterUtilsSuite.scala | 4 ++-- .../internal/JsonPredicateSuite.scala | 4 ++-- .../internal/PartitionFilterUtilsSuite.scala | 13 ++++------- 11 files changed, 39 insertions(+), 44 deletions(-) diff --git a/build.sbt b/build.sbt index 7d4667b39..68c395908 100644 --- a/build.sbt +++ b/build.sbt @@ -18,11 +18,11 @@ import sbt.ExclusionRule ThisBuild / parallelExecution := false -val sparkVersion = "3.3.2" +val sparkVersion = "3.1.1" lazy val commonSettings = Seq( organization := "io.delta", - scalaVersion := "2.13.5", + scalaVersion := "2.12.10", fork := true, javacOptions ++= Seq("-source", "1.8", "-target", "1.8"), scalacOptions += "-target:jvm-1.8", @@ -76,17 +76,18 @@ lazy val server = (project in file("server")) enablePlugins(JavaAppPackaging) se libraryDependencies ++= Seq( // Pin versions for jackson libraries as the new version of `jackson-module-scala` introduces a // breaking change making us not able to use `delta-standalone`. - "com.fasterxml.jackson.core" % "jackson-core" % "2.13.5", - "com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5", - "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.13.5", - "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % "2.13.5", - "org.json4s" %% "json4s-jackson" % "3.6.6" excludeAll( + "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7", + "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.3", + "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.7.1", + "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % "2.6.7", + "org.json4s" %% "json4s-jackson" % "3.5.3" excludeAll( ExclusionRule("com.fasterxml.jackson.core"), ExclusionRule("com.fasterxml.jackson.module") ), - "com.linecorp.armeria" %% "armeria-scalapb" % "1.15.0" excludeAll( + "com.linecorp.armeria" %% "armeria-scalapb" % "1.6.0" excludeAll( ExclusionRule("com.fasterxml.jackson.core"), - ExclusionRule("com.fasterxml.jackson.module") + ExclusionRule("com.fasterxml.jackson.module"), + ExclusionRule("org.json4s") ), "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf" excludeAll( ExclusionRule("com.fasterxml.jackson.core"), @@ -133,7 +134,7 @@ lazy val server = (project in file("server")) enablePlugins(JavaAppPackaging) se ExclusionRule("com.fasterxml.jackson.module"), ExclusionRule("com.google.guava", "guava") ), - "org.apache.spark" %% "spark-sql" % "3.3.2" excludeAll( + "org.apache.spark" %% "spark-sql" % "2.4.7" excludeAll( ExclusionRule("org.slf4j"), ExclusionRule("io.netty"), ExclusionRule("com.fasterxml.jackson.core"), @@ -145,7 +146,7 @@ lazy val server = (project in file("server")) enablePlugins(JavaAppPackaging) se "org.slf4j" % "slf4j-simple" % "1.6.1", "net.sourceforge.argparse4j" % "argparse4j" % "0.9.0", - "org.scalatest" %% "scalatest" % "3.2.15" % "test" + "org.scalatest" %% "scalatest" % "3.0.5" % "test" ), Compile / PB.targets := Seq( scalapb.gen() -> (Compile / sourceManaged).value / "scalapb" diff --git a/server/src/main/scala/io/delta/sharing/server/SharedTableManager.scala b/server/src/main/scala/io/delta/sharing/server/SharedTableManager.scala index db884f8b7..285204e1b 100644 --- a/server/src/main/scala/io/delta/sharing/server/SharedTableManager.scala +++ b/server/src/main/scala/io/delta/sharing/server/SharedTableManager.scala @@ -106,9 +106,9 @@ class SharedTableManager(serverConfig: ServerConfig) { nextPageToken: Option[String] = None, maxResults: Option[Int] = None): (Seq[Share], Option[String]) = { getPage(nextPageToken, None, None, maxResults, shares.size) { (start, end) => - shares.asScala.toSeq.map { share => + shares.asScala.map { share => Share().withName(share.getName) - }.slice(start, end).seq + }.slice(start, end) } } @@ -124,7 +124,7 @@ class SharedTableManager(serverConfig: ServerConfig) { val shareConfig = getShareInternal(share) getPage(nextPageToken, Some(share), None, maxResults, shareConfig.getSchemas.size) { (start, end) => - shareConfig.getSchemas.asScala.toSeq.map { schemaConfig => + shareConfig.getSchemas.asScala.map { schemaConfig => Schema().withName(schemaConfig.getName).withShare(share) }.slice(start, end) } @@ -138,7 +138,7 @@ class SharedTableManager(serverConfig: ServerConfig) { val schemaConfig = getSchema(getShareInternal(share), schema) getPage(nextPageToken, Some(share), Some(schema), maxResults, schemaConfig.getTables.size) { (start, end) => - schemaConfig.getTables.asScala.toSeq.map { + schemaConfig.getTables.asScala.map { tableConfig => Table( name = Some(tableConfig.getName), @@ -158,8 +158,8 @@ class SharedTableManager(serverConfig: ServerConfig) { val totalSize = shareConfig.schemas.asScala.map(_.tables.size).sum getPage(nextPageToken, Some(share), None, maxResults, totalSize) { (start, end) => - shareConfig.schemas.asScala.toSeq.flatMap { schema => - schema.tables.asScala.toSeq.map { + shareConfig.schemas.asScala.flatMap { schema => + schema.tables.asScala.map { table => Table( name = Some(table.getName), diff --git a/server/src/main/scala/io/delta/standalone/internal/DeltaSharingCDCReader.scala b/server/src/main/scala/io/delta/standalone/internal/DeltaSharingCDCReader.scala index dd403d1d2..b1142b820 100644 --- a/server/src/main/scala/io/delta/standalone/internal/DeltaSharingCDCReader.scala +++ b/server/src/main/scala/io/delta/standalone/internal/DeltaSharingCDCReader.scala @@ -262,7 +262,7 @@ class DeltaSharingCDCReader(val deltaLog: DeltaLogImpl, val conf: Configuration) // generated, either of them will result in the correct cdc data. // If there are CDC actions, we read them exclusively, and ignore the add/remove actions. if (cdcActions.nonEmpty) { - changeFiles.append(CDCDataSpec(v, ts, cdcActions.toSeq)) + changeFiles.append(CDCDataSpec(v, ts, cdcActions)) } else { // MERGE will sometimes rewrite files in a way which *could* have changed data // (so dataChange = true) but did not actually do so (so no CDC will be produced). @@ -294,8 +294,8 @@ class DeltaSharingCDCReader(val deltaLog: DeltaLogImpl, val conf: Configuration) val removeActions = actions.collect { case r: RemoveFile if r.dataChange => r } - addFiles.append(CDCDataSpec(v, ts, addActions.toSeq)) - removeFiles.append(CDCDataSpec(v, ts, removeActions.toSeq)) + addFiles.append(CDCDataSpec(v, ts, addActions)) + removeFiles.append(CDCDataSpec(v, ts, removeActions)) } } } diff --git a/server/src/main/scala/io/delta/standalone/internal/PartitionFilterUtils.scala b/server/src/main/scala/io/delta/standalone/internal/PartitionFilterUtils.scala index f1ed28bec..a2b34040b 100644 --- a/server/src/main/scala/io/delta/standalone/internal/PartitionFilterUtils.scala +++ b/server/src/main/scala/io/delta/standalone/internal/PartitionFilterUtils.scala @@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory object PartitionFilterUtils { private val logger = LoggerFactory.getLogger(this.getClass) - private lazy val sqlParser = new SparkSqlParser() + private lazy val sqlParser = new SparkSqlParser(new SQLConf) def evaluatePredicate( schemaString: String, @@ -56,7 +56,7 @@ object PartitionFilterUtils { if (exprs.isEmpty) { addFiles } else { - val predicate = Predicate.create(exprs.reduce(And), attrs) + val predicate = InterpretedPredicate.create(exprs.reduce(And), attrs) predicate.initialize(0) addFiles.filter { addFile => val converter = CatalystTypeConverters.createToCatalystConverter(addSchema) diff --git a/server/src/test/scala/io/delta/sharing/server/CloudFileSignerSuite.scala b/server/src/test/scala/io/delta/sharing/server/CloudFileSignerSuite.scala index d6c6042de..e632699cb 100644 --- a/server/src/test/scala/io/delta/sharing/server/CloudFileSignerSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/CloudFileSignerSuite.scala @@ -17,9 +17,9 @@ package io.delta.sharing.server import org.apache.hadoop.fs.Path -import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.FunSuite -class CloudFileSignerSuite extends AnyFunSuite { +class CloudFileSignerSuite extends FunSuite { test("GCSFileSigner.getBucketAndObjectNames") { assert(GCSFileSigner.getBucketAndObjectNames(new Path("gs://delta-sharing-test/foo")) diff --git a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala index b1b073404..f2c0364ba 100644 --- a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala @@ -28,8 +28,7 @@ import scala.collection.mutable.ArrayBuffer import com.linecorp.armeria.server.Server import io.delta.standalone.internal.DeltaCDFErrors import org.apache.commons.io.IOUtils -import org.scalatest.BeforeAndAfterAll -import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.{BeforeAndAfterAll, FunSuite} import scalapb.json4s.JsonFormat import io.delta.sharing.server.config.ServerConfig @@ -38,7 +37,7 @@ import io.delta.sharing.server.protocol._ import io.delta.sharing.server.util.JsonUtils // scalastyle:off maxLineLength -class DeltaSharingServiceSuite extends AnyFunSuite with BeforeAndAfterAll { +class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { def shouldRunIntegrationTest: Boolean = { sys.env.get("AWS_ACCESS_KEY_ID").exists(_.length > 0) && diff --git a/server/src/test/scala/io/delta/sharing/server/SharedTableManagerSuite.scala b/server/src/test/scala/io/delta/sharing/server/SharedTableManagerSuite.scala index 1b07f705f..e04d142f2 100644 --- a/server/src/test/scala/io/delta/sharing/server/SharedTableManagerSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/SharedTableManagerSuite.scala @@ -21,12 +21,12 @@ import java.util.{Arrays, Collections} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.FunSuite import io.delta.sharing.server.config.{SchemaConfig, ServerConfig, ShareConfig, TableConfig} import io.delta.sharing.server.protocol.{Schema, Share, Table} -class SharedTableManagerSuite extends AnyFunSuite { +class SharedTableManagerSuite extends FunSuite { test("list shares") { val serverConfig = new ServerConfig() diff --git a/server/src/test/scala/io/delta/sharing/server/config/ServerConfigSuite.scala b/server/src/test/scala/io/delta/sharing/server/config/ServerConfigSuite.scala index 737862c30..04e716b6c 100644 --- a/server/src/test/scala/io/delta/sharing/server/config/ServerConfigSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/config/ServerConfigSuite.scala @@ -22,9 +22,9 @@ import java.nio.file.Files import java.util.Arrays import org.apache.commons.io.FileUtils -import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.FunSuite -class ServerConfigSuite extends AnyFunSuite { +class ServerConfigSuite extends FunSuite { def testConfig(content: String, serverConfig: ServerConfig): Unit = { val tempFile = Files.createTempFile("delta-sharing-server", ".yaml").toFile diff --git a/server/src/test/scala/io/delta/standalone/internal/JsonPredicateFilterUtilsSuite.scala b/server/src/test/scala/io/delta/standalone/internal/JsonPredicateFilterUtilsSuite.scala index 6f998889a..94c1c4291 100644 --- a/server/src/test/scala/io/delta/standalone/internal/JsonPredicateFilterUtilsSuite.scala +++ b/server/src/test/scala/io/delta/standalone/internal/JsonPredicateFilterUtilsSuite.scala @@ -17,11 +17,11 @@ package io.delta.standalone.internal import io.delta.standalone.internal.actions.AddFile -import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.FunSuite import io.delta.sharing.server.util.JsonUtils -class JsonPredicateFilterUtilsSuite extends AnyFunSuite { +class JsonPredicateFilterUtilsSuite extends FunSuite { import JsonPredicateFilterUtils._ diff --git a/server/src/test/scala/io/delta/standalone/internal/JsonPredicateSuite.scala b/server/src/test/scala/io/delta/standalone/internal/JsonPredicateSuite.scala index a1dd18725..0251306f7 100644 --- a/server/src/test/scala/io/delta/standalone/internal/JsonPredicateSuite.scala +++ b/server/src/test/scala/io/delta/standalone/internal/JsonPredicateSuite.scala @@ -18,11 +18,11 @@ package io.delta.standalone.internal -import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.FunSuite import io.delta.sharing.server.util.JsonUtils -class JsonPredicateSuite extends AnyFunSuite { +class JsonPredicateSuite extends FunSuite { /** * A wrapper around op evaluation. diff --git a/server/src/test/scala/io/delta/standalone/internal/PartitionFilterUtilsSuite.scala b/server/src/test/scala/io/delta/standalone/internal/PartitionFilterUtilsSuite.scala index 3e39a5324..50d50bf8b 100644 --- a/server/src/test/scala/io/delta/standalone/internal/PartitionFilterUtilsSuite.scala +++ b/server/src/test/scala/io/delta/standalone/internal/PartitionFilterUtilsSuite.scala @@ -17,20 +17,15 @@ package io.delta.standalone.internal import io.delta.standalone.internal.actions.AddFile -import org.apache.spark.sql.types.{IntegerType, StructField, StructType} -import org.scalatest.funsuite.AnyFunSuite +import org.apache.spark.sql.types.StructType +import org.scalatest.FunSuite -class PartitionFilterUtilsSuite extends AnyFunSuite { +class PartitionFilterUtilsSuite extends FunSuite { import PartitionFilterUtils._ test("evaluatePredicate") { - val a = StructType(Array( - StructField("c1", IntegerType, true), - StructField("c2", IntegerType, true) - )) - val schema = a.json -// val schema = StructType.fromDDL("c1 INT, c2 INT").json + val schema = StructType.fromDDL("c1 INT, c2 INT").json val add1 = AddFile("foo1", Map("c2" -> "0"), 1, 1, true) val add2 = AddFile("foo2", Map("c2" -> "1"), 1, 1, true) val addFiles = add1 :: add2 :: Nil From 788aeea0fb5314aacf06d3c6d12b52ca9c59422d Mon Sep 17 00:00:00 2001 From: Abhijit Chakankar <98838463+chakankardb@users.noreply.github.com> Date: Thu, 27 Apr 2023 18:47:54 -0700 Subject: [PATCH 3/6] Optimize delta sharing spark client handling of presigned url response. (#294) * Optimize delta sharing spark client handling of presigned url response. * fix test. --- .../sharing/spark/DeltaSharingClient.scala | 60 ++++++++++++++----- 1 file changed, 45 insertions(+), 15 deletions(-) diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala index 36fa7842f..a470b87af 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala @@ -16,6 +16,7 @@ package io.delta.sharing.spark +import java.io.{BufferedReader, InputStream, InputStreamReader} import java.net.{URL, URLEncoder} import java.nio.charset.StandardCharsets.UTF_8 import java.sql.Timestamp @@ -23,9 +24,10 @@ import java.time.LocalDateTime import java.time.format.DateTimeFormatter.ISO_DATE_TIME import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, ListBuffer} import org.apache.commons.io.IOUtils +import org.apache.commons.io.input.BoundedInputStream import org.apache.hadoop.util.VersionInfo import org.apache.http.{HttpHeaders, HttpHost, HttpStatus} import org.apache.http.client.config.RequestConfig @@ -188,7 +190,7 @@ private[spark] class DeltaSharingRestClient( val target = getTargetUrl(s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/" + s"$encodedTableName/version$encodedParam") - val (version, _) = getResponse(new HttpGet(target), true) + val (version, _) = getResponse(new HttpGet(target), true, true) version.getOrElse { throw new IllegalStateException("Cannot find Delta-Table-Version in the header") } @@ -324,14 +326,14 @@ private[spark] class DeltaSharingRestClient( } private def getNDJson(target: String, requireVersion: Boolean = true): (Long, Seq[String]) = { - val (version, response) = getResponse(new HttpGet(target)) + val (version, lines) = getResponse(new HttpGet(target)) version.getOrElse { if (requireVersion) { throw new IllegalStateException("Cannot find Delta-Table-Version in the header") } else { 0L } - } -> response.split("[\n\r]+") + } -> lines } private def getNDJson[T: Manifest](target: String, data: T): (Long, Seq[String]) = { @@ -339,15 +341,20 @@ private[spark] class DeltaSharingRestClient( val json = JsonUtils.toJson(data) httpPost.setHeader("Content-type", "application/json") httpPost.setEntity(new StringEntity(json, UTF_8)) - val (version, response) = getResponse(httpPost) + val (version, lines) = getResponse(httpPost) version.getOrElse { throw new IllegalStateException("Cannot find Delta-Table-Version in the header") - } -> response.split("[\n\r]+") + } -> lines } private def getJson[R: Manifest](target: String): R = { - val (_, response) = getResponse(new HttpGet(target)) - JsonUtils.fromJson[R](response) + val (_, response) = getResponse(new HttpGet(target), false, true) + if (response.size != 1) { + throw new IllegalStateException( + "Unexpected response for target: " + target + ", response=" + response + ) + } + JsonUtils.fromJson[R](response(0)) } private def getHttpHost(endpoint: String): HttpHost = { @@ -393,11 +400,17 @@ private[spark] class DeltaSharingRestClient( /** * Send the http request and return the table version in the header if any, and the response * content. + * + * The response can be: + * - empty if allowNoContent is true. + * - single string, if fetchAsOneString is true. + * - multi-line response (typically, one per action). This is the default. */ private def getResponse( httpRequest: HttpRequestBase, - allowNoContent: Boolean = false - ): (Option[Long], String) = + allowNoContent: Boolean = false, + fetchAsOneString: Boolean = false + ): (Option[Long], Seq[String]) = { RetryUtils.runWithExponentialBackoff(numRetries) { val profile = profileProvider.getProfile val response = client.execute( @@ -408,12 +421,26 @@ private[spark] class DeltaSharingRestClient( try { val status = response.getStatusLine() val entity = response.getEntity() - val body = if (entity == null) { - "" + val lines = if (entity == null) { + List("") } else { val input = entity.getContent() try { - IOUtils.toString(input, UTF_8) + if (fetchAsOneString) { + Seq(IOUtils.toString(input, UTF_8)) + } else { + val reader = new BufferedReader( + new InputStreamReader(new BoundedInputStream(input), UTF_8) + ) + var line: Option[String] = None + val lineBuffer = ListBuffer[String]() + while ({ + line = Option(reader.readLine()); line.isDefined + }) { + lineBuffer += line.get + } + lineBuffer.toList + } } finally { input.close() } @@ -427,15 +454,18 @@ private[spark] class DeltaSharingRestClient( additionalErrorInfo = s"It may be caused by an expired token as it has expired " + s"at ${profile.expirationTime}" } + // Only show the last 100 lines in the error to keep it contained. + val responseToShow = lines.drop(lines.size - 100).mkString("\n") throw new UnexpectedHttpStatus( - s"HTTP request failed with status: $status $body. $additionalErrorInfo", + s"HTTP request failed with status: $status $responseToShow. $additionalErrorInfo", statusCode) } - Option(response.getFirstHeader("Delta-Table-Version")).map(_.getValue.toLong) -> body + Option(response.getFirstHeader("Delta-Table-Version")).map(_.getValue.toLong) -> lines } finally { response.close() } } + } // Add SparkStructuredStreaming in the USER_AGENT header, in order for the delta sharing server // to recognize the request for streaming, and take corresponding actions. From 5d55e301e08c2159672e56953b2697a95bb10b82 Mon Sep 17 00:00:00 2001 From: Abhijit Chakankar <98838463+chakankardb@users.noreply.github.com> Date: Wed, 10 May 2023 22:18:37 -0700 Subject: [PATCH 4/6] Enable json predicates. (#303) --- .../scala/io/delta/sharing/spark/RemoteDeltaFileIndex.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaFileIndex.scala b/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaFileIndex.scala index e4c26047f..10464459a 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaFileIndex.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaFileIndex.scala @@ -111,7 +111,7 @@ private[sharing] abstract class RemoteDeltaFileIndexBase( // not perform json predicate based filtering. protected def convertToJsonPredicate(partitionFilters: Seq[Expression]) : Option[String] = { if (!params.spark.sessionState.conf.getConfString( - "spark.delta.sharing.jsonPredicateHints.enabled", "false").toBoolean) { + "spark.delta.sharing.jsonPredicateHints.enabled", "true").toBoolean) { return None } try { From 7f0922aecdc47081a135405255416f0639c35cf7 Mon Sep 17 00:00:00 2001 From: Lin Zhou <87341375+linzhou-db@users.noreply.github.com> Date: Thu, 11 May 2023 19:21:45 -0700 Subject: [PATCH 5/6] Correct refreshing logic of pre-signed urls for delta sharing streaming (#301) * Refresh * fix tests * refresh * add comment * fix lint * fix lint * add param doc * add flag --- .../sharing/spark/DeltaSharingSource.scala | 58 ++++++++++++++++++- .../spark/RemoteDeltaCDFRelation.scala | 7 ++- .../delta/sharing/PreSignedUrlCache.scala | 26 +++++++-- .../sharing/CachedTableManagerSuite.scala | 19 +++++- 4 files changed, 101 insertions(+), 9 deletions(-) diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala index 8eccf2cad..8b3145887 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala @@ -132,6 +132,9 @@ case class DeltaSharingSource( private val tableId = initSnapshot.metadata.id + private val refreshPresignedUrls = spark.sessionState.conf.getConfString( + "spark.delta.sharing.source.refreshPresignedUrls.enabled", "false").toBoolean + // Records until which offset the delta sharing source has been processing the table files. private var previousOffset: DeltaSharingSourceOffset = null @@ -149,6 +152,11 @@ case class DeltaSharingSource( // a variable to be used by the CachedTableManager to refresh the presigned urls if the query // runs for a long time. private var latestRefreshFunc = () => { Map.empty[String, String] } + // The latest timestamp in millisecond, records the time of the last rpc sent to the server to + // fetch the pre-signed urls. + // This is used to track whether the pre-signed urls stored in sortedFetchedFiles are going to + // expire and need a refresh. + private var lastQueryTableTimestamp: Long = -1 // Check the latest table version from the delta sharing server through the client.getTableVersion // RPC. Adding a minimum interval of QUERY_TABLE_VERSION_INTERVAL_MILLIS between two consecutive @@ -231,6 +239,7 @@ case class DeltaSharingSource( fromIndex: Long, isStartingVersion: Boolean, currentLatestVersion: Long): Unit = { + lastQueryTableTimestamp = System.currentTimeMillis() if (isStartingVersion) { // If isStartingVersion is true, it means to fetch the snapshot at the fromVersion, which may // include table changes from previous versions. @@ -307,6 +316,7 @@ case class DeltaSharingSource( fromVersion: Long, fromIndex: Long, currentLatestVersion: Long): Unit = { + lastQueryTableTimestamp = System.currentTimeMillis() val tableFiles = deltaLog.client.getCDFFiles( deltaLog.table, Map(DeltaSharingOptions.CDF_START_VERSION -> fromVersion.toString), true) latestRefreshFunc = () => { @@ -459,6 +469,51 @@ case class DeltaSharingSource( endOffset: DeltaSharingSourceOffset): DataFrame = { maybeGetFileChanges(startVersion, startIndex, isStartingVersion) + if (refreshPresignedUrls && + (CachedTableManager.INSTANCE.preSignedUrlExpirationMs + lastQueryTableTimestamp - + System.currentTimeMillis() < CachedTableManager.INSTANCE.refreshThresholdMs)) { + // force a refresh if needed. + lastQueryTableTimestamp = System.currentTimeMillis() + val newIdToUrl = latestRefreshFunc() + sortedFetchedFiles = sortedFetchedFiles.map { indexedFile => + IndexedFile( + version = indexedFile.version, + index = indexedFile.index, + add = if (indexedFile.add == null) { + null + } else { + val newUrl = newIdToUrl.getOrElse( + indexedFile.add.id, + throw new IllegalStateException(s"cannot find url for id ${indexedFile.add.id} " + + s"when refreshing table ${deltaLog.path}") + ) + indexedFile.add.copy(url = newUrl) + }, + remove = if (indexedFile.remove == null) { + null + } else { + val newUrl = newIdToUrl.getOrElse( + indexedFile.remove.id, + throw new IllegalStateException(s"cannot find url for id ${indexedFile.remove.id} " + + s"when refreshing table ${deltaLog.path}") + ) + indexedFile.remove.copy(url = newUrl) + }, + cdc = if (indexedFile.cdc == null) { + null + } else { + val newUrl = newIdToUrl.getOrElse( + indexedFile.cdc.id, + throw new IllegalStateException(s"cannot find url for id ${indexedFile.cdc.id} " + + s"when refreshing table ${deltaLog.path}") + ) + indexedFile.cdc.copy(url = newUrl) + }, + isLast = indexedFile.isLast + ) + } + } + val fileActions = sortedFetchedFiles.takeWhile { case IndexedFile(version, index, _, _, _, _) => version < endOffset.tableVersion || @@ -545,7 +600,8 @@ case class DeltaSharingSource( removeFiles, schema, isStreaming = true, - latestRefreshFunc + latestRefreshFunc, + lastQueryTableTimestamp ) } diff --git a/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaCDFRelation.scala b/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaCDFRelation.scala index 9fb3fa6d4..994cbeb16 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaCDFRelation.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaCDFRelation.scala @@ -71,7 +71,9 @@ object DeltaSharingCDFReader { removeFiles: Seq[RemoveFile], schema: StructType, isStreaming: Boolean, - refresher: () => Map[String, String]): DataFrame = { + refresher: () => Map[String, String], + lastQueryTableTimestamp: Long = System.currentTimeMillis() + ): DataFrame = { val dfs = ListBuffer[DataFrame]() val refs = ListBuffer[WeakReference[AnyRef]]() @@ -92,7 +94,8 @@ object DeltaSharingCDFReader { getIdToUrl(addFiles, cdfFiles, removeFiles), refs, params.profileProvider, - refresher + refresher, + lastQueryTableTimestamp ) dfs.reduce((df1, df2) => df1.unionAll(df2)) diff --git a/spark/src/main/scala/org/apache/spark/delta/sharing/PreSignedUrlCache.scala b/spark/src/main/scala/org/apache/spark/delta/sharing/PreSignedUrlCache.scala index f5f003e79..2e52128cb 100644 --- a/spark/src/main/scala/org/apache/spark/delta/sharing/PreSignedUrlCache.scala +++ b/spark/src/main/scala/org/apache/spark/delta/sharing/PreSignedUrlCache.scala @@ -45,9 +45,9 @@ class CachedTable( val refresher: () => Map[String, String]) class CachedTableManager( - preSignedUrlExpirationMs: Long, + val preSignedUrlExpirationMs: Long, refreshCheckIntervalMs: Long, - refreshThresholdMs: Long, + val refreshThresholdMs: Long, expireAfterAccessMs: Long) extends Logging { private val cache = new java.util.concurrent.ConcurrentHashMap[String, CachedTable]() @@ -134,19 +134,35 @@ class CachedTableManager( * signed url cache of this table form the cache. * @param profileProvider a profile Provider that can provide customized refresher function. * @param refresher A function to re-generate pre signed urls for the table. + * @param lastQueryTableTimestamp A timestamp to indicate the last time the idToUrl mapping is + * generated, to refresh the urls in time based on it. */ def register( tablePath: String, idToUrl: Map[String, String], refs: Seq[WeakReference[AnyRef]], profileProvider: DeltaSharingProfileProvider, - refresher: () => Map[String, String]): Unit = { + refresher: () => Map[String, String], + lastQueryTableTimestamp: Long = System.currentTimeMillis()): Unit = { val customTablePath = profileProvider.getCustomTablePath(tablePath) val customRefresher = profileProvider.getCustomRefresher(refresher) val cachedTable = new CachedTable( - preSignedUrlExpirationMs + System.currentTimeMillis(), - idToUrl, + if (preSignedUrlExpirationMs + lastQueryTableTimestamp - System.currentTimeMillis() < + refreshThresholdMs) { + // If there is a refresh, start counting from now. + preSignedUrlExpirationMs + System.currentTimeMillis() + } else { + // Otherwise, start counting from lastQueryTableTimestamp. + preSignedUrlExpirationMs + lastQueryTableTimestamp + }, + idToUrl = if (preSignedUrlExpirationMs + lastQueryTableTimestamp - System.currentTimeMillis() + < refreshThresholdMs) { + // force a refresh upon register + customRefresher() + } else { + idToUrl + }, refs, System.currentTimeMillis(), customRefresher diff --git a/spark/src/test/scala/org/apache/spark/delta/sharing/CachedTableManagerSuite.scala b/spark/src/test/scala/org/apache/spark/delta/sharing/CachedTableManagerSuite.scala index 1522e0c56..51c9be90c 100644 --- a/spark/src/test/scala/org/apache/spark/delta/sharing/CachedTableManagerSuite.scala +++ b/spark/src/test/scala/org/apache/spark/delta/sharing/CachedTableManagerSuite.scala @@ -81,6 +81,22 @@ class CachedTableManagerSuite extends SparkFunSuite { intercept[IllegalStateException](manager.getPreSignedUrl( provider.getCustomTablePath("test-table-path3"), "id1")) } + + manager.register( + "test-table-path4", + Map("id1" -> "url1", "id2" -> "url2"), + Seq(new WeakReference(ref)), + provider, + () => { + Map("id1" -> "url3", "id2" -> "url4") + }, + -1 + ) + // We should get new urls immediately because it's refreshed upon register + assert(manager.getPreSignedUrl(provider.getCustomTablePath("test-table-path4"), + "id1")._1 == "url3") + assert(manager.getPreSignedUrl(provider.getCustomTablePath("test-table-path4"), + "id2")._1 == "url4") } finally { manager.stop() } @@ -108,7 +124,8 @@ class CachedTableManagerSuite extends SparkFunSuite { Thread.sleep(1000) // We should remove the cached table when it's not accessed intercept[IllegalStateException](manager.getPreSignedUrl( - provider.getCustomTablePath("test-table-path"), "id1")) + provider.getCustomTablePath("test-table-path"), "id1") + ) } finally { manager.stop() } From 24771a3f5b12c10333a5245f1bdec8874e77d4dc Mon Sep 17 00:00:00 2001 From: Lin Zhou <87341375+linzhou-db@users.noreply.github.com> Date: Thu, 11 May 2023 23:25:25 -0700 Subject: [PATCH 6/6] Set spark.delta.sharing.source.refreshPresignedUrls.enabled default to true (#306) --- .../main/scala/io/delta/sharing/spark/DeltaSharingSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala index 8b3145887..ed55b5218 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala @@ -133,7 +133,7 @@ case class DeltaSharingSource( private val tableId = initSnapshot.metadata.id private val refreshPresignedUrls = spark.sessionState.conf.getConfString( - "spark.delta.sharing.source.refreshPresignedUrls.enabled", "false").toBoolean + "spark.delta.sharing.source.refreshPresignedUrls.enabled", "true").toBoolean // Records until which offset the delta sharing source has been processing the table files. private var previousOffset: DeltaSharingSourceOffset = null