Skip to content

Commit

Permalink
[SPARK-19148][SQL] do not expose the external table concept in Catalog
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

In apache#16296, we reached a consensus that we should hide the external/managed table concept from users and only expose a custom table path.

This PR renames `Catalog.createExternalTable` to `createTable` (we still keep the old versions for backward compatibility), and only sets the table type to EXTERNAL if `path` is specified in options.

## How was this patch tested?

New tests were added in `CatalogSuite`.

Author: Wenchen Fan <[email protected]>

Closes apache#16528 from cloud-fan/create-table.
  • Loading branch information
cloud-fan authored and uzadude committed Jan 27, 2017
1 parent 417c762 commit 8d14f39
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 103 deletions.
5 changes: 4 additions & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ object MimaExcludes {
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions"),

// [SPARK-18537] Add a REST api to spark streaming
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted"),

// [SPARK-19148][SQL] do not expose the external table concept in Catalog
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.createTable")
)

// Exclude rules for 2.1.x
Expand Down
27 changes: 24 additions & 3 deletions python/pyspark/sql/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.
#

import warnings
from collections import namedtuple

from pyspark import since
Expand Down Expand Up @@ -138,7 +139,27 @@ def listColumns(self, tableName, dbName=None):

@since(2.0)
def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
"""Creates an external table based on the dataset in a data source.
"""Creates a table based on the dataset in a data source.
It returns the DataFrame associated with the external table.
The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.
Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
created external table.
:return: :class:`DataFrame`
"""
warnings.warn(
"createExternalTable is deprecated since Spark 2.2, please use createTable instead.",
DeprecationWarning)
return self.createTable(tableName, path, source, schema, **options)

@since(2.2)
def createTable(self, tableName, path=None, source=None, schema=None, **options):
"""Creates a table based on the dataset in a data source.
It returns the DataFrame associated with the external table.
Expand All @@ -157,12 +178,12 @@ def createExternalTable(self, tableName, path=None, source=None, schema=None, **
source = self._sparkSession.conf.get(
"spark.sql.sources.default", "org.apache.spark.sql.parquet")
if schema is None:
df = self._jcatalog.createExternalTable(tableName, source, options)
df = self._jcatalog.createTable(tableName, source, options)
else:
if not isinstance(schema, StructType):
raise TypeError("schema should be StructType")
scala_datatype = self._jsparkSession.parseDataType(schema.json())
df = self._jcatalog.createExternalTable(tableName, source, scala_datatype, options)
df = self._jcatalog.createTable(tableName, source, scala_datatype, options)
return DataFrame(df, self._sparkSession._wrapped)

@since(2.0)
Expand Down
129 changes: 109 additions & 20 deletions sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.catalog

import scala.collection.JavaConverters._

import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
import org.apache.spark.sql.types.StructType
Expand Down Expand Up @@ -187,82 +189,169 @@ abstract class Catalog {
def functionExists(dbName: String, functionName: String): Boolean

/**
* :: Experimental ::
* Creates an external table from the given path and returns the corresponding DataFrame.
* Creates a table from the given path and returns the corresponding DataFrame.
* It will use the default data source configured by spark.sql.sources.default.
*
* @since 2.0.0
*/
@deprecated("use createTable instead.", "2.2.0")
def createExternalTable(tableName: String, path: String): DataFrame = {
createTable(tableName, path)
}

/**
* :: Experimental ::
* Creates a table from the given path and returns the corresponding DataFrame.
* It will use the default data source configured by spark.sql.sources.default.
*
* @since 2.2.0
*/
@Experimental
@InterfaceStability.Evolving
def createExternalTable(tableName: String, path: String): DataFrame
def createTable(tableName: String, path: String): DataFrame

/**
* :: Experimental ::
* Creates an external table from the given path based on a data source
* and returns the corresponding DataFrame.
* Creates a table from the given path based on a data source and returns the corresponding
* DataFrame.
*
* @since 2.0.0
*/
@deprecated("use createTable instead.", "2.2.0")
def createExternalTable(tableName: String, path: String, source: String): DataFrame = {
createTable(tableName, path, source)
}

/**
* :: Experimental ::
* Creates a table from the given path based on a data source and returns the corresponding
* DataFrame.
*
* @since 2.2.0
*/
@Experimental
@InterfaceStability.Evolving
def createExternalTable(tableName: String, path: String, source: String): DataFrame
def createTable(tableName: String, path: String, source: String): DataFrame

/**
* :: Experimental ::
* Creates an external table from the given path based on a data source and a set of options.
* Creates a table from the given path based on a data source and a set of options.
* Then, returns the corresponding DataFrame.
*
* @since 2.0.0
*/
@deprecated("use createTable instead.", "2.2.0")
def createExternalTable(
tableName: String,
source: String,
options: java.util.Map[String, String]): DataFrame = {
createTable(tableName, source, options)
}

/**
* :: Experimental ::
* Creates a table from the given path based on a data source and a set of options.
* Then, returns the corresponding DataFrame.
*
* @since 2.2.0
*/
@Experimental
@InterfaceStability.Evolving
def createTable(
tableName: String,
source: String,
options: java.util.Map[String, String]): DataFrame = {
createTable(tableName, source, options.asScala.toMap)
}

/**
* (Scala-specific)
* Creates a table from the given path based on a data source and a set of options.
* Then, returns the corresponding DataFrame.
*
* @since 2.0.0
*/
@deprecated("use createTable instead.", "2.2.0")
def createExternalTable(
tableName: String,
source: String,
options: java.util.Map[String, String]): DataFrame
options: Map[String, String]): DataFrame = {
createTable(tableName, source, options)
}

/**
* :: Experimental ::
* (Scala-specific)
* Creates an external table from the given path based on a data source and a set of options.
* Creates a table from the given path based on a data source and a set of options.
* Then, returns the corresponding DataFrame.
*
* @since 2.0.0
* @since 2.2.0
*/
@Experimental
@InterfaceStability.Evolving
def createExternalTable(
def createTable(
tableName: String,
source: String,
options: Map[String, String]): DataFrame

/**
* :: Experimental ::
* Create an external table from the given path based on a data source, a schema and
* a set of options. Then, returns the corresponding DataFrame.
* Create a table from the given path based on a data source, a schema and a set of options.
* Then, returns the corresponding DataFrame.
*
* @since 2.0.0
*/
@deprecated("use createTable instead.", "2.2.0")
def createExternalTable(
tableName: String,
source: String,
schema: StructType,
options: java.util.Map[String, String]): DataFrame = {
createTable(tableName, source, schema, options)
}

/**
* :: Experimental ::
* Create a table from the given path based on a data source, a schema and a set of options.
* Then, returns the corresponding DataFrame.
*
* @since 2.2.0
*/
@Experimental
@InterfaceStability.Evolving
def createTable(
tableName: String,
source: String,
schema: StructType,
options: java.util.Map[String, String]): DataFrame = {
createTable(tableName, source, schema, options.asScala.toMap)
}

/**
* (Scala-specific)
* Create a table from the given path based on a data source, a schema and a set of options.
* Then, returns the corresponding DataFrame.
*
* @since 2.0.0
*/
@deprecated("use createTable instead.", "2.2.0")
def createExternalTable(
tableName: String,
source: String,
schema: StructType,
options: java.util.Map[String, String]): DataFrame
options: Map[String, String]): DataFrame = {
createTable(tableName, source, schema, options)
}

/**
* :: Experimental ::
* (Scala-specific)
* Create an external table from the given path based on a data source, a schema and
* a set of options. Then, returns the corresponding DataFrame.
* Create a table from the given path based on a data source, a schema and a set of options.
* Then, returns the corresponding DataFrame.
*
* @since 2.0.0
* @since 2.2.0
*/
@Experimental
@InterfaceStability.Evolving
def createExternalTable(
def createTable(
tableName: String,
source: String,
schema: StructType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
options = table.storage.properties ++ pathOption,
catalogTable = Some(tableWithDefaultOptions)).resolveRelation()

dataSource match {
case fs: HadoopFsRelation =>
if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty) {
throw new AnalysisException(
"Cannot create a file-based external data source table without path")
}
case _ =>
}

val partitionColumnNames = if (table.schema.nonEmpty) {
table.partitionColumnNames
} else {
Expand Down
Loading

0 comments on commit 8d14f39

Please sign in to comment.