From 01e851f24be74587c6ef2252a33550794daa85dc Mon Sep 17 00:00:00 2001 From: raviranak <126759945+raviranak@users.noreply.github.com> Date: Tue, 9 Apr 2024 06:55:31 +0530 Subject: [PATCH] dynamic allocation executor pending for addition flood fix (#396) * dynamicallocation executor pending for addition flood fix * scala checkstyle fix * scala checkstyle fix * scala checkstyle fix --- .../org/apache/spark/deploy/raydp/RayAppMaster.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala index 08945732..4eef0fd5 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala @@ -270,6 +270,16 @@ class RayAppMaster(host: String, .map{ case (name, amount) => s"${name}: ${amount}"}.mkString(", ")} }..") // TODO: Support generic fractional logical resources using prefix spark.ray.actor.resource.* + // This will check with dynamic auto scale no additional pending executor actor added more + // than max executors count as this result in executor even running after job completion + val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false) + if (dynamicAllocationEnabled) { + val maxExecutor = conf.getInt("spark.dynamicAllocation.maxExecutors", 0) + if (restartedExecutors.size >= maxExecutor) { + return + } + } + val handler = RayExecutorUtils.createExecutorActor( executorId, getAppMasterEndpointUrl(), rayActorCPU,