-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-31999][SQL] Add REFRESH FUNCTION command #28840
Changes from 10 commits
69a47a1
a95dcb6
3fc807e
b282348
f677a4a
a6c5d8b
de54470
9e09875
63695c0
9e9d5ce
c434821
35fd44b
f83fd8b
e444943
afd510b
1241bde
93f5d71
dc684b5
0ea7dd6
643969c
6cb2edd
cffc207
4b6408d
5d5fe71
4ba345b
6765395
dc86b82
a38d656
cdea55b
5e227d7
703ad47
a79f72b
a4d144a
3bd8d23
60ac2a0
b36b760
c5937a2
56ec5ea
c129a54
a956144
711656d
5d4c152
94fa132
fc4789f
e83194f
b18437c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
--- | ||
layout: global | ||
title: REFRESH FUNCTION | ||
displayTitle: REFRESH FUNCTION | ||
license: | | ||
Licensed to the Apache Software Foundation (ASF) under one or more | ||
contributor license agreements. See the NOTICE file distributed with | ||
this work for additional information regarding copyright ownership. | ||
The ASF licenses this file to You under the Apache License, Version 2.0 | ||
(the "License"); you may not use this file except in compliance with | ||
the License. You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
--- | ||
|
||
### Description | ||
|
||
`REFRESH FUNCTION` statement invalidates the cached function entry, which include class name | ||
and resource location of the given function. The invalidated cache is populated right away. | ||
Note that, refresh function only works for permanent function. Refresh native function or temporary function will cause exception. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, the suggestion I gave you yesterday has a few grammar mistakes. which include class name -> which includes the class name Note that, refresh function only works for permanent function. -> Note that Refresh native function or temporary function will cause exception. -> |
||
|
||
### Syntax | ||
|
||
```sql | ||
REFRESH FUNCTION function_identifier | ||
``` | ||
|
||
### Parameters | ||
|
||
* **function_identifier** | ||
|
||
Specifies a function name, which is either a qualified or unqualified name. If no database identifier is provided, use the current database. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use the current database -> uses the current database |
||
|
||
**Syntax:** `[ database_name. ] function_name` | ||
|
||
### Examples | ||
|
||
```sql | ||
-- The cached entries of the function will be refreshed | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
-- The function is resolved from the current database as the function name is unqualified. | ||
REFRESH FUNCTION func1; | ||
|
||
-- The cached entries of the function will be refreshed | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto: |
||
-- The function is resolved from tempDB database as the function name is qualified. | ||
REFRESH FUNCTION tempDB.func1; | ||
``` | ||
|
||
### Related Statements | ||
|
||
* [CACHE TABLE](sql-ref-syntax-aux-cache-cache-table.html) | ||
* [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html) | ||
* [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to mention the above three data-related statement? The following There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just feel they are part of the |
||
* [REFRESH TABLE](sql-ref-syntax-aux-refresh-table.html) | ||
* [REFRESH](sql-ref-syntax-aux-cache-refresh.html) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -217,6 +217,7 @@ class Analyzer( | |
ResolveInsertInto :: | ||
ResolveRelations :: | ||
ResolveTables :: | ||
ResolveFunc(catalogManager) :: | ||
ResolveReferences :: | ||
ResolveCreateNamedStruct :: | ||
ResolveDeserializer :: | ||
|
@@ -834,6 +835,14 @@ class Analyzer( | |
} | ||
} | ||
|
||
case class ResolveFunc(catalogManager: CatalogManager) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we resolve |
||
extends Rule[LogicalPlan] with LookupCatalog { | ||
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { | ||
case UnresolvedFunc(CatalogAndFunctionIdentifier(catalog, identifier)) => | ||
ResolvedFunc(catalog, identifier) | ||
} | ||
} | ||
|
||
private def isResolvingView: Boolean = AnalysisContext.get.catalogAndNamespace.nonEmpty | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
|
||
package org.apache.spark.sql.catalyst.analysis | ||
|
||
import org.apache.spark.sql.catalyst.FunctionIdentifier | ||
import org.apache.spark.sql.catalyst.expressions.Attribute | ||
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} | ||
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, SupportsNamespaces, Table, TableCatalog} | ||
|
@@ -50,6 +51,11 @@ case class UnresolvedTableOrView(multipartIdentifier: Seq[String]) extends LeafN | |
override def output: Seq[Attribute] = Nil | ||
} | ||
|
||
case class UnresolvedFunc(multipartIdentifier: Seq[String]) extends LeafNode { | ||
override lazy val resolved: Boolean = false | ||
override def output: Seq[Attribute] = Nil | ||
} | ||
|
||
/** | ||
* A plan containing resolved namespace. | ||
*/ | ||
|
@@ -74,3 +80,8 @@ case class ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: T | |
case class ResolvedView(identifier: Identifier) extends LeafNode { | ||
override def output: Seq[Attribute] = Nil | ||
} | ||
|
||
case class ResolvedFunc(catalog: CatalogPlugin, functionIdentifier: FunctionIdentifier) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shall we put There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added it. BTW do we need a |
||
extends LeafNode { | ||
override def output: Seq[Attribute] = Nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1341,6 +1341,16 @@ class SessionCatalog( | |
functionRegistry.registerFunction(func, info, builder) | ||
} | ||
|
||
/** | ||
* Unregister a temporary or permanent function from a session-specific [[FunctionRegistry]] | ||
*/ | ||
def unregisterFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not used. If the function not exists, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we remove it now? |
||
if (!functionRegistry.dropFunction(name) && !ignoreIfNotExists) { | ||
throw new NoSuchFunctionException( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, it does not throw this exception because this check's already done in https://github.com/apache/spark/pull/28840/files#diff-d2a203f08c862bd762e6740c16e972f7R267-R268 ? |
||
formatDatabaseName(name.database.getOrElse(currentDb)), name.funcName) | ||
} | ||
} | ||
|
||
/** | ||
* Drop a temporary function. | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,7 @@ package org.apache.spark.sql.connector.catalog | |
|
||
import org.apache.spark.internal.Logging | ||
import org.apache.spark.sql.AnalysisException | ||
import org.apache.spark.sql.catalyst.TableIdentifier | ||
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} | ||
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} | ||
|
||
/** | ||
|
@@ -155,4 +155,37 @@ private[sql] trait LookupCatalog extends Logging { | |
None | ||
} | ||
} | ||
|
||
/** | ||
* Extract catalog and function identifier from a multi-part name with the current catalog if | ||
* needed. | ||
* | ||
* Note that: now function is only supported in v1 catalog. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
*/ | ||
object CatalogAndFunctionIdentifier { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have you checked my comment? https://github.com/apache/spark/pull/28840/files#r442021563 I personally think you don't need this refactoring. Could you just use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for remind, missed here. Because of the dependency that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, moving There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will revert related change later. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After some thought. We have to modify the
And after this, we also have to update the existing code in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Sorry, but what does it mean? I made a PR for your branch. Please check my suggestion: ulysses-you#6 |
||
def unapply(nameParts: Seq[String]): Some[(CatalogPlugin, FunctionIdentifier)] = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
if (nameParts.length == 1 && catalogManager.v1SessionCatalog.isTempFunction(nameParts.head)) { | ||
return Some(currentCatalog, FunctionIdentifier(nameParts.head)) | ||
} | ||
|
||
nameParts match { | ||
case SessionCatalogAndIdentifier(catalog, ident) => | ||
if (nameParts.length == 1) { | ||
// If there is only one name part, it means the current catalog is the session catalog. | ||
// Here we don't fill the default database, to keep the error message unchanged for | ||
// v1 commands. | ||
Some(catalog, FunctionIdentifier(nameParts.head, None)) | ||
} else { | ||
ident.namespace match { | ||
case Array(db) => Some(catalog, FunctionIdentifier(ident.name, Some(db))) | ||
case _ => | ||
throw new AnalysisException(s"Unsupported function name '$ident'") | ||
} | ||
} | ||
|
||
case _ => throw new AnalysisException(s"Function command is only supported in v1 catalog") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: drop |
||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -566,24 +566,19 @@ class ResolveSessionCatalog( | |
case ShowTableProperties(r: ResolvedView, propertyKey) => | ||
ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey) | ||
|
||
case DescribeFunctionStatement(nameParts, extended) => | ||
val functionIdent = | ||
parseSessionCatalogFunctionIdentifier(nameParts, "DESCRIBE FUNCTION") | ||
case DescribeFunctionStatement(CatalogAndFunctionIdentifier(_, functionIdent), extended) => | ||
DescribeFunctionCommand(functionIdent, extended) | ||
|
||
case ShowFunctionsStatement(userScope, systemScope, pattern, fun) => | ||
val (database, function) = fun match { | ||
case Some(nameParts) => | ||
val FunctionIdentifier(fn, db) = | ||
parseSessionCatalogFunctionIdentifier(nameParts, "SHOW FUNCTIONS") | ||
case Some(CatalogAndFunctionIdentifier(_, FunctionIdentifier(fn, db))) => | ||
(db, Some(fn)) | ||
case None => (None, pattern) | ||
} | ||
ShowFunctionsCommand(database, function, userScope, systemScope) | ||
|
||
case DropFunctionStatement(nameParts, ifExists, isTemp) => | ||
val FunctionIdentifier(function, database) = | ||
parseSessionCatalogFunctionIdentifier(nameParts, "DROP FUNCTION") | ||
case DropFunctionStatement( | ||
CatalogAndFunctionIdentifier(_, FunctionIdentifier(function, database)), ifExists, isTemp) => | ||
DropFunctionCommand(database, function, ifExists, isTemp) | ||
|
||
case CreateFunctionStatement(nameParts, | ||
|
@@ -606,38 +601,16 @@ class ResolveSessionCatalog( | |
ignoreIfExists, | ||
replace) | ||
} else { | ||
val FunctionIdentifier(function, database) = | ||
parseSessionCatalogFunctionIdentifier(nameParts, "CREATE FUNCTION") | ||
CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists, | ||
replace) | ||
} | ||
} | ||
|
||
// TODO: move function related v2 statements to the new framework. | ||
private def parseSessionCatalogFunctionIdentifier( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move this method to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This PR needs the change? |
||
nameParts: Seq[String], | ||
sql: String): FunctionIdentifier = { | ||
if (nameParts.length == 1 && isTempFunction(nameParts.head)) { | ||
return FunctionIdentifier(nameParts.head) | ||
} | ||
|
||
nameParts match { | ||
case SessionCatalogAndIdentifier(_, ident) => | ||
if (nameParts.length == 1) { | ||
// If there is only one name part, it means the current catalog is the session catalog. | ||
// Here we don't fill the default database, to keep the error message unchanged for | ||
// v1 commands. | ||
FunctionIdentifier(nameParts.head, None) | ||
} else { | ||
ident.namespace match { | ||
case Array(db) => FunctionIdentifier(ident.name, Some(db)) | ||
case _ => | ||
throw new AnalysisException(s"Unsupported function name '$ident'") | ||
} | ||
nameParts match { | ||
case CatalogAndFunctionIdentifier(_, FunctionIdentifier(function, database)) => | ||
CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists, | ||
replace) | ||
} | ||
} | ||
|
||
case _ => throw new AnalysisException(s"$sql is only supported in v1 catalog") | ||
} | ||
case RefreshFunction(ResolvedFunc(_, func)) => | ||
// Fallback to v1 command | ||
RefreshFunctionCommand(func.database, func.funcName) | ||
} | ||
|
||
private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = tableName match { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc: @huaxingao