[SPARK-9853][Core][Follow-up] Regularize all the shuffle configurations related to adaptive execution #26147
Conversation
Test build #112213 has finished for PR 26147 at commit
Test build #112220 has finished for PR 26147 at commit
thanks, merging to master!
```diff
  val FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED =
-   buildConf("spark.sql.adaptive.fetchShuffleBlocksInBatch.enabled")
+   buildConf("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled")
```
It seems we need to improve the documentation, because the batch fetch feature doesn't seem to support the old shuffle service:
19/11/21 01:02:51 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 33.2 (TID 192743, hdc49-mcc10-01-0710-2509-029-tess0035.stratus.rno.ebay.com, executor 734): FetchFailed(BlockManagerId(730, hdc49-mcc10-01-0710-4003-037-tess0035.stratus.rno.ebay.com, 7337, None), shuffleId=14, mapIndex=2991, mapId=2991, reduceId=6507, message=
org.apache.spark.shuffle.FetchFailedException: Failure while fetching StreamChunkId{streamId=1015050587051, chunkIndex=1}: java.lang.IndexOutOfBoundsException
at java.nio.Buffer.checkIndex(Buffer.java:540)
at java.nio.ByteBufferAsLongBufferB.get(ByteBufferAsLongBufferB.java:115)
at org.apache.spark.network.shuffle.ShuffleIndexInformation.getIndex(ShuffleIndexInformation.java:64)
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:242)
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:175)
at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler$ManagedBufferIterator.next(ExternalShuffleBlockHandler.java:252)
at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler$ManagedBufferIterator.next(ExternalShuffleBlockHandler.java:208)
at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92)
at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:96)
at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
at org.spark_project.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:38)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:353)
at org.spark_project.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at org.spark_project.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:649)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:562)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:69)
at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:337)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:425)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:428)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=1015050587051, chunkIndex=1}: java.lang.IndexOutOfBoundsException
at java.nio.Buffer.checkIndex(Buffer.java:540)
at java.nio.ByteBufferAsLongBufferB.get(ByteBufferAsLongBufferB.java:115)
at org.apache.spark.network.shuffle.ShuffleIndexInformation.getIndex(ShuffleIndexInformation.java:64)
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:242)
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:175)
at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler$ManagedBufferIterator.next(ExternalShuffleBlockHandler.java:252)
at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler$ManagedBufferIterator.next(ExternalShuffleBlockHandler.java:208)
at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92)
at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:96)
at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
at org.spark_project.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:38)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:353)
at org.spark_project.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at org.spark_project.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:139)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
We should improve the error message as well. cc @xuanyuanking
Thanks for reporting this; fixed in #26663.
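For illustration, a hedged sketch of the kind of guard that addresses this — not the actual patch in #26663. It assumes `spark.shuffle.useOldFetchProtocol` as the signal for the old protocol; the real check lives in `BlockStoreShuffleReader`:

```scala
import org.apache.spark.SparkConf

// Hypothetical sketch: keep batch fetch disabled when the old fetch protocol
// (and hence an old external shuffle service) is in use, since batched
// continuous-block fetch is only understood by the new protocol.
object BatchFetchGuard {
  def shouldBatchFetch(conf: SparkConf, fetchMultiPartitions: Boolean): Boolean = {
    val batchFetchEnabled = conf
      .get("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled", "false")
      .toBoolean
    val useOldProtocol = conf
      .get("spark.shuffle.useOldFetchProtocol", "false")
      .toBoolean
    // Batch only when the feature is on, the reader spans multiple map output
    // partitions, and the new fetch protocol is available.
    batchFetchEnabled && fetchMultiPartitions && !useOldProtocol
  }
}
```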
```diff
@@ -349,53 +349,43 @@ object SQLConf {
     .checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
     .createWithDefault(200)

   val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
```
We should update/add the corresponding descriptions for all the SQLConf entries that are affected by this conf. Otherwise, end users might not know the relationship between these confs.
Thanks, done in #26664.
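As an illustration of the suggestion above, a hypothetical doc update for one of the affected entries — both the conf name shown and the wording are assumptions, not the text merged in #26664:

```scala
// Hypothetical illustration, inside object SQLConf: each dependent conf states
// explicitly that it only takes effect when spark.sql.adaptive.enabled is true.
val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
  buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled")
    .doc("When true and spark.sql.adaptive.enabled is true, reduce the number of " +
      "post-shuffle partitions based on map output statistics.")
    .booleanConf
    .createWithDefault(true)
```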
What changes were proposed in this pull request?
Rename the adaptive-execution shuffle configs under the `spark.sql.adaptive.shuffle` prefix, and consolidate the batch-fetch check in `BlockStoreShuffleReader.shouldBatchFetch`.

Why are the changes needed?
It's a follow-up PR for #26040, regularizing the existing configs into the `spark.sql.adaptive.shuffle` namespace in SQLConf.

Does this PR introduce any user-facing change?
Yes. One released user config, `spark.sql.adaptive.minNumPostShufflePartitions`, is renamed to `spark.sql.adaptive.shuffle.minNumPostShufflePartitions`; the other changed configs have not been released yet.

How was this patch tested?
Existing UT.
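A minimal usage sketch of the renamed config, assuming a local SparkSession; the app name, master, and values are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Minimal usage sketch of the renamed config under adaptive execution.
object AdaptiveShuffleConfDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("adaptive-shuffle-conf-demo")
      .master("local[*]")
      .config("spark.sql.adaptive.enabled", "true")
      // Renamed in this PR from spark.sql.adaptive.minNumPostShufflePartitions:
      .config("spark.sql.adaptive.shuffle.minNumPostShufflePartitions", "4")
      .getOrCreate()

    // Any shuffle-producing query exercises the post-shuffle partition logic.
    spark.range(0, 10000).selectExpr("id % 100 AS k")
      .groupBy("k").count()
      .show(5)

    spark.stop()
  }
}
```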