[SPARK-46937][SQL] Improve concurrency performance for FunctionRegistry #44976
ping @cloud-fan cc @MaxGekk @viirya
What's the level of concurrency you expect for function registration/lookup? Do you have perf numbers?
The perf data has been added to the PR description.
Could you rebase this to the master branch once more, @beliefer ?
This approach (and the benchmark result) looks reasonable to me for Apache Spark 4.0.0.
Do you still have some concern, @cloud-fan ?
Could you resolve the conflicts, @beliefer ?
I resolved the conflicts for you~
+1, LGTM (Pending CIs).
Also, cc @cloud-fan once more because he had a concern last time.
Let me rebase again.
Thank you, @beliefer
interesting, even with read-only cases, this change also helps. LGTM then.
Merged into master.
…cement

### What changes were proposed in this pull request?
A followup of #44976. `ConcurrentHashMap#put` has different semantics than the Scala map: it returns null if the key is new. We should update the checking code accordingly.

### Why are the changes needed?
avoid wrong warning messages

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
manual

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #46876 from cloud-fan/log.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
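The difference in `put` semantics described above can be sketched as follows. This is only an illustration, not the code from the follow-up PR: the object name `PutSemantics`, the key `upper`, and the placeholder string values are all hypothetical.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Hypothetical demo object; none of these names come from Spark.
object PutSemantics {
  // Scala's mutable.Map#put returns Option[V]: None when the key is new.
  val scalaMap = mutable.Map.empty[String, String]
  val firstScala: Option[String] = scalaMap.put("upper", "builderV1")
  val secondScala: Option[String] = scalaMap.put("upper", "builderV2")

  // ConcurrentHashMap#put returns the previous value, or null when the key is new.
  val javaMap = new ConcurrentHashMap[String, String]()
  val firstJava: String = javaMap.put("upper", "builderV1")
  val secondJava: String = javaMap.put("upper", "builderV2")

  // Wrapping the result in Option restores a Scala-style "was it replaced?" check.
  def wasReplaced(previous: String): Boolean = Option(previous).isDefined
}
```

A check written for the Scala map (for example, testing `isDefined` on the result) therefore has to be adapted, for instance by wrapping the returned value in `Option(...)` before deciding whether to log a "replaced" warning.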
@@ -1032,7 +1026,11 @@ class SimpleTableFunctionRegistry extends SimpleFunctionRegistryBase[LogicalPlan
override def clone(): SimpleTableFunctionRegistry = synchronized {
I think there is a problem here. We don't synchronize the write path, so how can we safely clone the `ConcurrentHashMap`?
We can introduce a `ReentrantReadWriteLock` guarding the `ConcurrentHashMap`.
`clone` and `clear` would take the write lock on the `ConcurrentHashMap`.
The other methods take the read lock on the map.
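A minimal sketch of the locking scheme proposed in this comment, assuming a simplified registry. The class `GuardedRegistry`, its method names, and the helper `withLock` are hypothetical and are not Spark's `SimpleFunctionRegistryBase` API.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}

// Hypothetical, simplified registry used only to illustrate the idea.
class GuardedRegistry[V <: AnyRef] {
  private val builders = new ConcurrentHashMap[String, V]()
  private val lock = new ReentrantReadWriteLock()

  // Runs `body` while holding `l`, releasing it even if `body` throws.
  private def withLock[T](l: Lock)(body: => T): T = {
    l.lock()
    try body finally l.unlock()
  }

  // Per-key operations share the read lock, so they may run concurrently
  // with each other; the ConcurrentHashMap handles their thread safety.
  def register(name: String, builder: V): Option[V] =
    withLock(lock.readLock())(Option(builders.put(name, builder)))

  def lookup(name: String): Option[V] =
    withLock(lock.readLock())(Option(builders.get(name)))

  // Whole-map operations take the write lock, which excludes everything else.
  def clear(): Unit = withLock(lock.writeLock())(builders.clear())

  def copy(): GuardedRegistry[V] = withLock(lock.writeLock()) {
    val result = new GuardedRegistry[V]
    result.builders.putAll(builders)
    result
  }
}
```

Note that "read" and "write" here refer to whether an operation needs exclusive access to the whole map, not to whether it mutates an entry, which is why `register` takes the read lock in this sketch.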
It may work, but makes the code complicated. We should only do it if this does make a difference to real-world workloads.
In fact, it's safe here, because the `entrySet()` of `ConcurrentHashMap` is thread-safe. We don't need `synchronized`.
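For context on this claim: iterators over a `ConcurrentHashMap` are weakly consistent, meaning they never throw `ConcurrentModificationException` but may or may not reflect updates made after the iterator was created. The sketch below contrasts that with `java.util.HashMap` in a single thread; the object and method names are hypothetical.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.{ConcurrentModificationException, HashMap => JHashMap, Map => JMap}

// Hypothetical demo object; not part of Spark.
object WeaklyConsistentIteration {
  // Iterates over the entry set and inserts one extra key after the first
  // element. Returns true if iteration finished without a
  // ConcurrentModificationException.
  def survivesMutation(map: JMap[String, String]): Boolean = {
    Seq("a", "b", "c").foreach(k => map.put(k, k))
    val it = map.entrySet().iterator()
    try {
      var first = true
      while (it.hasNext) {
        it.next()
        if (first) {
          map.put("added-during-iteration", "x")
          first = false
        }
      }
      true
    } catch {
      case _: ConcurrentModificationException => false
    }
  }

  val concurrentOk: Boolean = survivesMutation(new ConcurrentHashMap[String, String]())
  val plainOk: Boolean = survivesMutation(new JHashMap[String, String]())
}
```

So a copy built by iterating a `ConcurrentHashMap` will not fail under concurrent writes, but it is not guaranteed to be a point-in-time snapshot of the map.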
Shall we revert this if #44976 (comment) is a real issue? I don't think this is a critical path for performance (how much parallelism do you expect for function lookups in a Spark session?), and synchronizing |
### What changes were proposed in this pull request?
This PR proposes to improve the concurrency performance of `FunctionRegistry`.

### Why are the changes needed?
Currently, `SimpleFunctionRegistryBase` uses a `mutable.Map` to cache function infos, and is guarded by `this` to ensure thread safety. Because all the mutable state is related to `functionBuilders`, we can delegate thread safety to `ConcurrentHashMap`. `ConcurrentHashMap` offers higher concurrency and responsiveness. After this change, `FunctionRegistry` has better performance than before.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
GA. The benchmark test.
```
// Imports are not in the PR description; the packages below are assumed.
import java.util.concurrent.CyclicBarrier

import scala.util.Random

import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.util.ThreadUtils

object FunctionRegistryBenchmark extends BenchmarkBase {

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("FunctionRegistry") {
      val iters = 1000000
      val threadNum = 4
      val functionRegistry = FunctionRegistry.builtin
      val names = FunctionRegistry.expressions.keys.toSeq
      // threadNum workers plus the benchmark thread meet at the barrier.
      val barrier = new CyclicBarrier(threadNum + 1)
      val threadPool = ThreadUtils.newDaemonFixedThreadPool(threadNum, "test-function-registry")
      val benchmark = new Benchmark("SimpleFunctionRegistry", iters, output = output)

      benchmark.addCase("only read") { _ =>
        for (_ <- 1 to threadNum) {
          threadPool.execute(new Runnable {
            val random = new Random()

            override def run(): Unit = {
              barrier.await() // wait until all workers are ready
              for (_ <- 1 to iters) {
                val name = names(random.nextInt(names.size))
                val fun = functionRegistry.lookupFunction(new FunctionIdentifier(name))
                assert(fun.map(_.getName).get == name)
                functionRegistry.listFunction()
              }
              barrier.await() // signal completion
            }
          })
        }
        barrier.await() // start all workers together
        barrier.await() // wait for all workers to finish
      }

      benchmark.run()
    }
  }
}
```
The benchmark output before this PR.
```
Java HotSpot(TM) 64-Bit Server VM 17.0.9+11-LTS-201 on Mac OS X 10.14.6
Intel(R) Core(TM) i5-5350U CPU 1.80GHz
SimpleFunctionRegistry:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
only read                                         54858          55043         261          0.0       54858.1       1.0X
```
The benchmark output after this PR.
```
Java HotSpot(TM) 64-Bit Server VM 17.0.9+11-LTS-201 on Mac OS X 10.14.6
Intel(R) Core(TM) i5-5350U CPU 1.80GHz
SimpleFunctionRegistry:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
only read                                         20202          20264          88          0.0       20202.1       1.0X
```

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes apache#44976 from beliefer/SPARK-46937.

Authored-by: beliefer <[email protected]>
Signed-off-by: beliefer <[email protected]>
+1 for #44976 (comment)
I've sent out the revert PR: #46940
…ctionRegistry"

### What changes were proposed in this pull request?
Reverts #44976 as it breaks thread-safety

### Why are the changes needed?
Fix thread-safety

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
N/A

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #46940 from cloud-fan/revert.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?
This PR proposes to improve the concurrency performance of `FunctionRegistry`.

### Why are the changes needed?
Currently, `SimpleFunctionRegistryBase` uses a `mutable.Map` to cache function infos, and is guarded by `this` to ensure thread safety. Because all the mutable state is related to `functionBuilders`, we can delegate thread safety to `ConcurrentHashMap`. `ConcurrentHashMap` offers higher concurrency and responsiveness. After this change, `FunctionRegistry` has better performance than before.

### Does this PR introduce any user-facing change?
'No'.

### How was this patch tested?
GA.
The benchmark test.
The benchmark output before this PR.
The benchmark output after this PR.

### Was this patch authored or co-authored using generative AI tooling?
'No'.