From 033b1acf973b12e764f45118a05cd7bd177aab79 Mon Sep 17 00:00:00 2001 From: bomeng Date: Sat, 26 Mar 2016 15:33:30 -0700 Subject: [PATCH 1/5] ddl: Function related commands --- .../sql/execution/command/commands.scala | 27 ++++++++++++++++++- .../spark/sql/execution/command/ddl.scala | 7 ----- .../execution/command/DDLCommandSuite.scala | 4 +-- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 964f0a7a7b4e7..e3bc3cb42bdb8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -21,8 +21,9 @@ import java.util.NoSuchElementException import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.catalog.CatalogFunction import org.apache.spark.sql.{Dataset, Row, SQLContext} -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical @@ -434,3 +435,27 @@ case class SetDatabaseCommand(databaseName: String) extends RunnableCommand { override val output: Seq[Attribute] = Seq.empty } + +/** + * A command for users to create a function. 
+ * The syntax of using this command in SQL is + * {{{ + * CREATE TEMPORARY FUNCTION function_name AS class_name; + * CREATE FUNCTION [db_name.]function_name AS class_name + * [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ]; + * }}} + */ +case class CreateFunction( + functionName: String, + alias: String, + resources: Seq[(String, String)], + isTemp: Boolean)(sql: String) extends RunnableCommand { + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + val functionIdentifier = FunctionIdentifier(functionName, Some(catalog.getCurrentDatabase)) + catalog.createFunction(CatalogFunction(functionIdentifier, alias)) + Seq.empty[Row] + } + + override val output: Seq[Attribute] = Seq.empty +} \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 07c89afafb6b6..374cc04a84852 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -52,13 +52,6 @@ case class CreateDatabase( props: Map[String, String])(sql: String) extends NativeDDLCommand(sql) with Logging -case class CreateFunction( - functionName: String, - alias: String, - resources: Seq[(String, String)], - isTemp: Boolean)(sql: String) - extends NativeDDLCommand(sql) with Logging - case class AlterTableRename( oldName: TableIdentifier, newName: TableIdentifier)(sql: String) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 6f1eea273fafa..bdc192a61b2d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -47,13 +47,13 @@ class 
DDLCommandSuite extends PlanTest { test("create function") { val sql1 = """ - |CREATE TEMPORARY FUNCTION helloworld as + |CREATE TEMPORARY FUNCTION helloworld AS |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1', |JAR '/path/to/jar2' """.stripMargin val sql2 = """ - |CREATE FUNCTION hello.world as + |CREATE FUNCTION hello.world AS |'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive', |FILE '/path/to/file' """.stripMargin From bdf44b4e71f180ebf970ef40d5850d558d23a073 Mon Sep 17 00:00:00 2001 From: bomeng Date: Sat, 26 Mar 2016 23:09:43 -0700 Subject: [PATCH 2/5] add test cases and update code style --- .../sql/execution/command/commands.scala | 8 ++--- .../org/apache/spark/sql/SQLQuerySuite.scala | 29 +++++++++++++++++++ 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index e3bc3cb42bdb8..32b6f4c0164ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -446,10 +446,8 @@ case class SetDatabaseCommand(databaseName: String) extends RunnableCommand { * }}} */ case class CreateFunction( - functionName: String, - alias: String, - resources: Seq[(String, String)], - isTemp: Boolean)(sql: String) extends RunnableCommand { + functionName: String, alias: String, resources: Seq[(String, String)], + isTemp: Boolean)(sql: String) extends RunnableCommand { override def run(sqlContext: SQLContext): Seq[Row] = { val catalog = sqlContext.sessionState.catalog val functionIdentifier = FunctionIdentifier(functionName, Some(catalog.getCurrentDatabase)) @@ -458,4 +456,4 @@ case class CreateFunction( } override val output: Seq[Attribute] = Seq.empty -} \ No newline at end of file +} diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index c958eac266d61..9809a1a23fa4b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -21,10 +21,13 @@ import java.math.MathContext import java.sql.Timestamp import org.apache.spark.AccumulatorSuite +import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedException +import org.apache.spark.sql.catalyst.catalog.CatalogFunction import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.catalyst.plans.logical.Aggregate import org.apache.spark.sql.execution.aggregate +import org.apache.spark.sql.execution.command.CreateFunction import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -2376,4 +2379,30 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { Row("r3c1x", "r3c2", "t1r3c3", "r3c2", "t1r3c3") :: Nil) } } + + test("SPARK-14123") { + val sql1 = + """ + |CREATE TEMPORARY FUNCTION helloworld1 AS + |'spark.example.SimpleUDFExample1' USING JAR '/path/to/jar1', + |JAR '/path/to/jar2' + """.stripMargin + val sql2 = + """ + |CREATE FUNCTION helloworld2 AS + |'spark.example.SimpleUDFExample2' USING ARCHIVE '/path/to/archive', + |FILE '/path/to/file' + """.stripMargin + sql(sql1) + sql(sql2) + + val catalog = sqlContext.sessionState.catalog + val id1 = FunctionIdentifier("helloworld1", Some(catalog.getCurrentDatabase)) + val id2 = FunctionIdentifier("helloworld2", Some(catalog.getCurrentDatabase)) + + val f1 = catalog.getFunction(id1) + val f2 = catalog.getFunction(id2) + assert(f1 == CatalogFunction(id1, "spark.example.SimpleUDFExample1")) + assert(f2 == CatalogFunction(id2, "spark.example.SimpleUDFExample2")) + } } From 
3d4761a81c0209d21e1e77656cdb7d6aa5dcda75 Mon Sep 17 00:00:00 2001 From: bomeng Date: Sun, 27 Mar 2016 08:37:47 -0700 Subject: [PATCH 3/5] add drop function support and create a new test case file --- .../sql/execution/command/commands.scala | 36 +++++++++++- .../spark/sql/execution/command/ddl.scala | 20 ------- .../org/apache/spark/sql/SQLQuerySuite.scala | 26 --------- .../sql/execution/command/DDLSuite.scala | 57 +++++++++++++++++++ 4 files changed, 91 insertions(+), 48 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 32b6f4c0164ff..74da4c1205c1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -446,14 +446,46 @@ case class SetDatabaseCommand(databaseName: String) extends RunnableCommand { * }}} */ case class CreateFunction( - functionName: String, alias: String, resources: Seq[(String, String)], + databaseName: Option[String], + functionName: String, alias: String, + resources: Seq[(String, String)], isTemp: Boolean)(sql: String) extends RunnableCommand { override def run(sqlContext: SQLContext): Seq[Row] = { val catalog = sqlContext.sessionState.catalog - val functionIdentifier = FunctionIdentifier(functionName, Some(catalog.getCurrentDatabase)) + // NOTE(review): `isTemp` and `resources` are accepted but never read in this method, + // so every function is registered through catalog.createFunction. + // An unqualified function name falls back to the session's current database. + val db = databaseName + .orElse(Some(catalog.getCurrentDatabase)) + val functionIdentifier = FunctionIdentifier(functionName, db) catalog.createFunction(CatalogFunction(functionIdentifier, alias)) Seq.empty[Row] } override val output: Seq[Attribute] = Seq.empty } + +/** + * The DDL command that drops a function. + * ifExists: returns an error if the function doesn't exist, unless this is true. 
+ * isTemp: indicates if it is a temporary function. + */ +case class DropFunction( + databaseName: Option[String], + functionName: String, + ifExists: Boolean, + isTemp: Boolean)(sql: String) extends RunnableCommand { + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + val db = databaseName.getOrElse(catalog.getCurrentDatabase) + val functionIdentifier = FunctionIdentifier(functionName, Some(db)) + // IF EXISTS: skip the drop when no function of that name is listed, so it is not an error. + // NOTE(review): assumes an exact name is a valid listFunctions pattern; isTemp is unused. + if (!ifExists || catalog.listFunctions(db, functionName).nonEmpty) { + catalog.dropFunction(functionIdentifier) + } + Seq.empty[Row] + } + + override val output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 0e51abb44b91d..ac4c093ce7142 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -86,26 +86,6 @@ case class DescribeDatabase( extended: Boolean)(sql: String) extends NativeDDLCommand(sql) with Logging -case class CreateFunction( - databaseName: Option[String], - functionName: String, - alias: String, - resources: Seq[(String, String)], - isTemp: Boolean)(sql: String) - extends NativeDDLCommand(sql) with Logging - -/** - * The DDL command that drops a function. - * ifExists: returns an error if the function doesn't exist, unless this is true. 
- */ -case class DropFunction( - databaseName: Option[String], - functionName: String, - ifExists: Boolean, - isTemp: Boolean)(sql: String) - extends NativeDDLCommand(sql) with Logging - case class AlterTableRename( oldName: TableIdentifier, newName: TableIdentifier)(sql: String) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 9809a1a23fa4b..05ff18eca5943 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2379,30 +2379,4 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { Row("r3c1x", "r3c2", "t1r3c3", "r3c2", "t1r3c3") :: Nil) } } - - test("SPARK-14123") { - val sql1 = - """ - |CREATE TEMPORARY FUNCTION helloworld1 AS - |'spark.example.SimpleUDFExample1' USING JAR '/path/to/jar1', - |JAR '/path/to/jar2' - """.stripMargin - val sql2 = - """ - |CREATE FUNCTION helloworld2 AS - |'spark.example.SimpleUDFExample2' USING ARCHIVE '/path/to/archive', - |FILE '/path/to/file' - """.stripMargin - sql(sql1) - sql(sql2) - - val catalog = sqlContext.sessionState.catalog - val id1 = FunctionIdentifier("helloworld1", Some(catalog.getCurrentDatabase)) - val id2 = FunctionIdentifier("helloworld2", Some(catalog.getCurrentDatabase)) - - val f1 = catalog.getFunction(id1) - val f2 = catalog.getFunction(id2) - assert(f1 == CatalogFunction(id1, "spark.example.SimpleUDFExample1")) - assert(f2 == CatalogFunction(id2, "spark.example.SimpleUDFExample2")) - } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala new file mode 100644 index 0000000000000..c25ba97971173 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.FunctionIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogFunction +import org.apache.spark.sql.test.SharedSQLContext + +class DDLSuite extends QueryTest with SharedSQLContext { + + test("SPARK-14123: Create / drop function DDL") { + val sql1 = + """ + |CREATE TEMPORARY FUNCTION helloworld1 AS + |'spark.example.SimpleUDFExample1' USING JAR '/path/to/jar1', + |JAR '/path/to/jar2' + """.stripMargin + val sql2 = + """ + |CREATE FUNCTION helloworld2 AS + |'spark.example.SimpleUDFExample2' USING ARCHIVE '/path/to/archive', + |FILE '/path/to/file' + """.stripMargin + sql(sql1) + sql(sql2) + + val catalog = sqlContext.sessionState.catalog + val id1 = FunctionIdentifier("helloworld1", Some(catalog.getCurrentDatabase)) + val id2 = FunctionIdentifier("helloworld2", Some(catalog.getCurrentDatabase)) + + val f1 = catalog.getFunction(id1) + val f2 = catalog.getFunction(id2) + assert(f1 == CatalogFunction(id1, "spark.example.SimpleUDFExample1")) + assert(f2 == CatalogFunction(id2, "spark.example.SimpleUDFExample2")) + + assert(catalog.listFunctions(catalog.getCurrentDatabase, "helloworld*").size == 2) + 
catalog.dropFunction(id1) + catalog.dropFunction(id2) + assert(catalog.listFunctions(catalog.getCurrentDatabase, "helloworld*").isEmpty) + } +} From 7f5325c0a6924aea1f0c75b01e73cc4febe1eb9f Mon Sep 17 00:00:00 2001 From: bomeng Date: Sun, 27 Mar 2016 08:43:24 -0700 Subject: [PATCH 4/5] update test cases --- .../apache/spark/sql/execution/command/DDLSuite.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index c25ba97971173..f4031e3a8f3f6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.test.SharedSQLContext class DDLSuite extends QueryTest with SharedSQLContext { test("SPARK-14123: Create / drop function DDL") { + // test the create functions val sql1 = """ |CREATE TEMPORARY FUNCTION helloworld1 AS @@ -49,9 +50,14 @@ class DDLSuite extends QueryTest with SharedSQLContext { assert(f1 == CatalogFunction(id1, "spark.example.SimpleUDFExample1")) assert(f2 == CatalogFunction(id2, "spark.example.SimpleUDFExample2")) + // test the drop functions assert(catalog.listFunctions(catalog.getCurrentDatabase, "helloworld*").size == 2) - catalog.dropFunction(id1) - catalog.dropFunction(id2) + + val sql3 = "DROP TEMPORARY FUNCTION helloworld1" + val sql4 = "DROP FUNCTION IF EXISTS helloworld2" + sql(sql3) + sql(sql4) + assert(catalog.listFunctions(catalog.getCurrentDatabase, "helloworld*").isEmpty) } } From 601aba50032830f29881fc83299cc970e53c4cb2 Mon Sep 17 00:00:00 2001 From: bomeng Date: Sun, 27 Mar 2016 09:10:27 -0700 Subject: [PATCH 5/5] update style check --- .../apache/spark/sql/execution/command/commands.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 74da4c1205c1a..8f017a6d96f92 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -22,15 +22,20 @@ import java.util.NoSuchElementException import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.catalog.CatalogFunction -import org.apache.spark.sql.{Dataset, Row, SQLContext} -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.CatalystTypeConverters +import org.apache.spark.sql.catalyst.FunctionIdentifier +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.Row +import org.apache.spark.sql.SQLContext /**