[SPARK-31999][SQL] Add REFRESH FUNCTION command #28840
Test build #124115 has finished for PR 28840 at commit
@cloud-fan @maropu @HyukjinKwon thanks for the review.
Does Hive support this feature? Either way, please update the SQL doc too.
Hive support
### Description

`REFRESH FUNCTION` statement invalidates the cached entries, which include class name
and resource location of the given function. The invalidated cache is populated right now.
This differs slightly from `REFRESH TABLE`: populating the function cache is cheap, so we do it right away.
limitations under the License.
---

### Description
cc: @huaxingao
Test build #124145 has finished for PR 28840 at commit
### Description

`REFRESH FUNCTION` statement invalidates the cached entries, which include class name
and resource location of the given function. The invalidated cache is populated right now.
is populated right now -> is populated right away?
You may want to add a little more detail, something like refresh function only works for permanent function, refresh native function or temporary function will cause Exception.
OK, I will update this later.
### Description

`REFRESH FUNCTION` statement invalidates the cached entries, which include class name
the cached entries -> the cached function entry
/**
 * REFRESH FUNCTION statement, as parsed from SQL
 */
case class RefreshFunctionStatement(
Since it's a new command, can we follow `CommentOnTable` and use the new command framework?
OK, I will move it later.
/**
 * The logical plan of the REFRESH FUNCTION command that works for v2 catalogs.
 */
case class RefreshFunction(func: Seq[String]) extends Command
Can we create an `UnresolvedFunc`, similar to `UnresolvedTable`?
The key point is to do the resolution in the analyzer, not at runtime in `RefreshFunctionCommand.run`.
Got it.
Test build #124164 has finished for PR 28840 at commit
Test build #124161 has finished for PR 28840 at commit
}

// TODO: move function related v2 statements to the new framework.
private def parseSessionCatalogFunctionIdentifier(
Move this method to `LookupCatalog.CatalogAndFunctionIdentifier` and drop the `sql` param.
Does this PR need this change?
Test build #124165 has finished for PR 28840 at commit
Test build #124168 has finished for PR 28840 at commit
`REFRESH FUNCTION` statement invalidates the cached function entry, which include class name
and resource location of the given function. The invalidated cache is populated right away.
Note that, refresh function only works for permanent function. Refresh native function or temporary function will cause exception.
Sorry, the suggestion I gave you yesterday has a few grammar mistakes.
which include class name -> which includes the class name
Note that, refresh function only works for permanent function. -> Note that `REFRESH FUNCTION` only works for permanent functions.
Refresh native function or temporary function will cause exception. -> Refreshing native functions or temporary functions will cause an exception.
* **function_identifier**

    Specifies a function name, which is either a qualified or unqualified name. If no database identifier is provided, use the current database.
use the current database -> uses the current database
  catalog.registerFunction(func, true)
} else if (catalog.isRegisteredFunction(identifier)) {
  // clear cached function.
  catalog.unregisterFunction(identifier, true)
Does `unregisterFunction` need to take a boolean parameter?
removed.
Test build #125839 has finished for PR 28840 at commit
Test build #125834 has finished for PR 28840 at commit
retest this please
Test build #125915 has finished for PR 28840 at commit
  // register overwrite function.
  val func = catalog.getFunctionMetadata(identifier)
  catalog.registerFunction(func, true)
} else if (catalog.isRegisteredFunction(identifier)) {
nit: we can simplify it
... else {
  catalog.unregisterFunction(identifier)
}
`unregisterFunction` will fail if the function is not registered.
done.
sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
Test build #125969 has finished for PR 28840 at commit
Test build #126007 has finished for PR 28840 at commit
Test build #126026 has finished for PR 28840 at commit
retest this please
Test build #126224 has finished for PR 28840 at commit
Test build #126241 has finished for PR 28840 at commit
} else {
  // clear cached function, if not exists throw exception
  if (!catalog.unregisterFunction(identifier)) {
    throw new NoSuchFunctionException(identifier.database.get, identifier.funcName)
Sorry, I may not have made myself clear.
I meant we should go back to your original proposal, which always throws an exception if the function doesn't exist in the metastore. That is, we should do
catalog.unregisterFunction(identifier)
throw new NoSuchFunctionException(identifier.database.get, identifier.funcName)
Oh, got it!
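The behaviour agreed on in this thread can be sketched without Spark. This is only an illustration under stated assumptions: `ToyCatalog`, `FuncId`, `FuncMeta`, `NoSuchFunction` and the `com.example.*` class names are hypothetical stand-ins, not Spark's `SessionCatalog` API. It shows the two branches: overwrite the cached entry when the function exists in the metastore, otherwise clear the cache and always throw.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Spark's catalog types; not the real API.
final case class FuncId(database: String, funcName: String)
final case class FuncMeta(id: FuncId, className: String)
final class NoSuchFunction(id: FuncId)
  extends Exception(s"Undefined function: '${id.funcName}' in database '${id.database}'")

final class ToyCatalog {
  // Persistent definitions, which another client may change at any time.
  val metastore = mutable.Map.empty[FuncId, FuncMeta]
  // Session-local cache of functions that have already been loaded.
  val registry = mutable.Map.empty[FuncId, FuncMeta]

  def refreshFunction(id: FuncId): Unit =
    metastore.get(id) match {
      case Some(meta) =>
        // The function still exists: overwrite the cached entry right away.
        registry.update(id, meta)
      case None =>
        // The function is gone: clear any stale entry, then always fail.
        registry.remove(id)
        throw new NoSuchFunction(id)
    }
}

object RefreshDemo {
  // Returns (class name after refresh, whether the last refresh threw, whether a cache entry remains).
  def run(): (String, Boolean, Boolean) = {
    val catalog = new ToyCatalog
    val id = FuncId("default", "func1")
    catalog.metastore(id) = FuncMeta(id, "com.example.OldImpl")
    catalog.refreshFunction(id)
    // Simulate another client replacing the function definition.
    catalog.metastore(id) = FuncMeta(id, "com.example.NewImpl")
    catalog.refreshFunction(id)
    val refreshed = catalog.registry(id).className
    // Simulate another client dropping the function.
    catalog.metastore.remove(id)
    val threw =
      try { catalog.refreshFunction(id); false }
      catch { case _: NoSuchFunction => true }
    (refreshed, threw, catalog.registry.contains(id))
  }
}
```

Clearing the cache before throwing means a dropped function cannot linger in the session even though the command itself reports an error.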
Test build #126249 has finished for PR 28840 at commit
All GitHub Actions pass, merging to master, thanks!
@cloud-fan Thanks for merging! Thanks, all!
override def run(sparkSession: SparkSession): Seq[Row] = {
  val catalog = sparkSession.sessionState.catalog
  if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) {
We can still create a persistent function with the same name as a built-in function. For example,
CREATE FUNCTION rand AS 'org.apache.spark.sql.catalyst.expressions.Abs'
DESC FUNCTION default.rand
I think we should still allow this case.
It seems meaningless to refresh a persistent function whose name is the same as a built-in function's.
Yes, we can create a persistent function with the same name as a built-in function, but it is only created in the metastore. The function actually used is the built-in one, because built-in functions are pre-cached in the registry and we look up cached functions first.
E.g., after `CREATE FUNCTION rand AS 'xxx'`, `DESC FUNCTION rand` will always return `Class: org.apache.spark.sql.catalyst.expressions.Rand`.
BTW, maybe that is why we create a function and load it lazily, acting just as a Hive client; otherwise we couldn't create functions such as `rand` or `md5` in the metastore. @cloud-fan
How about
CREATE FUNCTION rand AS 'xxx';
DESC FUNCTION default.rand;
I think this is similar to tables and temp views. Spark will try to look up the temp view first, so if the names conflict, the temp view is preferred. But users can still use a qualified table name to read the table explicitly.
You are right.
I missed the qualified-name case; I will fix this in a follow-up.
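The name-resolution point made in this thread can be illustrated with a small model. This is a sketch under assumptions, not Spark's lookup code: `FuncName`, `LookupDemo` and `resolve` are hypothetical, and the two maps only mirror the `rand` example from the comments above. An unqualified name prefers the pre-cached built-in, while a database-qualified name reaches the persistent function.

```scala
// Hypothetical model of function name resolution; not Spark's actual classes.
final case class FuncName(name: String, database: Option[String] = None)

object LookupDemo {
  // Pre-cached built-in functions, keyed by unqualified name.
  val builtins: Map[String, String] =
    Map("rand" -> "org.apache.spark.sql.catalyst.expressions.Rand")

  // Persistent functions stored in the metastore, keyed by (database, name).
  val persistent: Map[(String, String), String] =
    Map(("default", "rand") -> "org.apache.spark.sql.catalyst.expressions.Abs")

  val currentDatabase = "default"

  def resolve(f: FuncName): Option[String] = f.database match {
    // A qualified name skips the built-ins and goes to the persistent function.
    case Some(db) => persistent.get((db, f.name))
    // An unqualified name prefers the built-in, then falls back to the current database.
    case None => builtins.get(f.name).orElse(persistent.get((currentDatabase, f.name)))
  }
}
```

Under this model, a refresh command that rejects every name matching a built-in would also reject `default.rand`, which is the qualified-name case raised by the reviewers.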
override def run(sparkSession: SparkSession): Seq[Row] = {
  val catalog = sparkSession.sessionState.catalog
  if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) {
    throw new AnalysisException(s"Cannot refresh builtin function $functionName")
Nit: built-in
Got it.
val func = FunctionIdentifier("func1", Some("default"))
sql("CREATE FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
assert(!spark.sessionState.catalog.isRegisteredFunction(func))
sql("REFRESH FUNCTION func1")
This is the only positive test case. Could you think more and try to cover more cases?
### What changes were proposed in this pull request?
Address the [#comment](#28840 (comment)).
### Why are the changes needed?
Make code robust.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
ut.
Closes #29453 from ulysses-you/SPARK-31999-FOLLOWUP.
Authored-by: ulysses <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
In Hive mode, permanent functions are shared with the Hive metastore, so functions may be modified by other Hive clients. In a long-lived Spark application, it is hard to pick up such changes.
Here are 2 reasons:
FunctionRegistry
.replace function.
Note that we use the v2 command code path to add the new command.
Why are the changes needed?
Provide an easy way to keep the Spark function registry in sync with the Hive metastore.
Then we can call
Does this PR introduce any user-facing change?
Yes, new command.
How was this patch tested?
New UT.