diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 964f0a7a7b4e7..8f017a6d96f92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -21,14 +21,15 @@ import java.util.NoSuchElementException
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Row, SQLContext}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, FunctionIdentifier, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -434,3 +440,55 @@ case class SetDatabaseCommand(databaseName: String) extends RunnableCommand {
 
   override val output: Seq[Attribute] = Seq.empty
 }
+
+/**
+ * A command for users to create a function.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   CREATE TEMPORARY FUNCTION function_name AS class_name;
+ *   CREATE FUNCTION [db_name.]function_name AS class_name
+ *   [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
+ * }}}
+ */
+case class CreateFunction(
+    databaseName: Option[String],
+    functionName: String,
+    alias: String,
+    resources: Seq[(String, String)],
+    isTemp: Boolean)(sql: String) extends RunnableCommand {
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    // Fall back to the session's current database when none is given.
+    val db = databaseName.orElse(Some(catalog.getCurrentDatabase))
+    val functionIdentifier = FunctionIdentifier(functionName, db)
+    // Note: `resources` and `isTemp` are not consulted yet; the function is
+    // always registered in the session catalog.
+    catalog.createFunction(CatalogFunction(functionIdentifier, alias))
+    Seq.empty[Row]
+  }
+
+  override val output: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * The DDL command that drops a function.
+ * ifExists: if true, no error is raised when the function does not exist.
+ * isTemp: indicates whether the function is a temporary one.
+ */
+case class DropFunction(
+    databaseName: Option[String],
+    functionName: String,
+    ifExists: Boolean,
+    isTemp: Boolean)(sql: String) extends RunnableCommand {
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    // Fall back to the session's current database when none is given.
+    val db = databaseName.orElse(Some(catalog.getCurrentDatabase))
+    val functionIdentifier = FunctionIdentifier(functionName, db)
+    // Note: `ifExists` and `isTemp` are not consulted here yet.
+    catalog.dropFunction(functionIdentifier)
+    Seq.empty[Row]
+  }
+
+  override val output: Seq[Attribute] = Seq.empty
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 0e51abb44b91d..ac4c093ce7142 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -86,26 +86,6 @@ case class DescribeDatabase(
     extended: Boolean)(sql: String)
   extends NativeDDLCommand(sql) with Logging
 
-case class CreateFunction(
-    databaseName: Option[String],
-    functionName: String,
-    alias: String,
-    resources: Seq[(String, String)],
-    isTemp: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
-/**
- * The DDL command that drops a function.
- * ifExists: returns an error if the function doesn't exist, unless this is true.
- * isTemp: indicates if it is a temporary function.
- */
-case class DropFunction(
-    databaseName: Option[String],
-    functionName: String,
-    ifExists: Boolean,
-    isTemp: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
 case class AlterTableRename(
     oldName: TableIdentifier,
     newName: TableIdentifier)(sql: String)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index c958eac266d61..05ff18eca5943 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -21,10 +21,13 @@ import java.math.MathContext
 import java.sql.Timestamp
 
 import org.apache.spark.AccumulatorSuite
+import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
 import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.catalyst.plans.logical.Aggregate
 import org.apache.spark.sql.execution.aggregate
+import org.apache.spark.sql.execution.command.CreateFunction
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 7a6343748ba9e..1f636e8017fea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -142,13 +142,13 @@
   test("create function") {
     val sql1 =
       """
-        |CREATE TEMPORARY FUNCTION helloworld as
+        |CREATE TEMPORARY FUNCTION helloworld AS
         |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1',
         |JAR '/path/to/jar2'
       """.stripMargin
     val sql2 =
       """
-        |CREATE FUNCTION hello.world as
+        |CREATE FUNCTION hello.world AS
         |'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
         |FILE '/path/to/file'
       """.stripMargin
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
new file mode 100644
index 0000000000000..f4031e3a8f3f6
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.test.SharedSQLContext
+
+class DDLSuite extends QueryTest with SharedSQLContext {
+
+  test("SPARK-14123: Create / drop function DDL") {
+    // Create one temporary and one permanent function.
+    val sql1 =
+      """
+        |CREATE TEMPORARY FUNCTION helloworld1 AS
+        |'spark.example.SimpleUDFExample1' USING JAR '/path/to/jar1',
+        |JAR '/path/to/jar2'
+      """.stripMargin
+    val sql2 =
+      """
+        |CREATE FUNCTION helloworld2 AS
+        |'spark.example.SimpleUDFExample2' USING ARCHIVE '/path/to/archive',
+        |FILE '/path/to/file'
+      """.stripMargin
+    sql(sql1)
+    sql(sql2)
+
+    val catalog = sqlContext.sessionState.catalog
+    val id1 = FunctionIdentifier("helloworld1", Some(catalog.getCurrentDatabase))
+    val id2 = FunctionIdentifier("helloworld2", Some(catalog.getCurrentDatabase))
+
+    val f1 = catalog.getFunction(id1)
+    val f2 = catalog.getFunction(id2)
+    assert(f1 == CatalogFunction(id1, "spark.example.SimpleUDFExample1"))
+    assert(f2 == CatalogFunction(id2, "spark.example.SimpleUDFExample2"))
+
+    // Both functions should be visible before dropping them.
+    assert(catalog.listFunctions(catalog.getCurrentDatabase, "helloworld*").size == 2)
+
+    val sql3 = "DROP TEMPORARY FUNCTION helloworld1"
+    val sql4 = "DROP FUNCTION IF EXISTS helloworld2"
+    sql(sql3)
+    sql(sql4)
+
+    assert(catalog.listFunctions(catalog.getCurrentDatabase, "helloworld*").isEmpty)
+  }
+}
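
For reference, a minimal usage sketch of the code path this patch adds. This is a sketch, not part of the patch: it assumes the same test harness as DDLSuite above (SharedSQLContext supplies `sqlContext` and `sql`), the suite and test names are placeholders, and the class name 'com.example.MyUpper' is a dummy; CreateFunction does not load or validate the class at this stage.

  import org.apache.spark.sql.QueryTest
  import org.apache.spark.sql.catalyst.FunctionIdentifier
  import org.apache.spark.sql.catalyst.catalog.CatalogFunction
  import org.apache.spark.sql.test.SharedSQLContext

  class CreateDropFunctionSketch extends QueryTest with SharedSQLContext {
    test("round-trip a function through the session catalog") {
      // CREATE FUNCTION registers a CatalogFunction under the current database.
      sql("CREATE FUNCTION myUpper AS 'com.example.MyUpper'")

      val catalog = sqlContext.sessionState.catalog
      val id = FunctionIdentifier("myUpper", Some(catalog.getCurrentDatabase))
      assert(catalog.getFunction(id) == CatalogFunction(id, "com.example.MyUpper"))

      // DROP FUNCTION removes the entry from the catalog again.
      sql("DROP FUNCTION myUpper")
      assert(catalog.listFunctions(catalog.getCurrentDatabase, "myUpper").isEmpty)
    }
  }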