Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(health): Handling SparkApplication CRD health status if dynamic allocation is enabled (#7557) #11522

Merged
merged 2 commits into from
Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,55 @@
health_status = {}
-- Can't use standard lib, math.huge equivalent
infinity = 2^1024-1

local function executor_range_api()
min_executor_instances = 0
max_executor_instances = infinity
if obj.spec.dynamicAllocation.maxExecutors then
max_executor_instances = obj.spec.dynamicAllocation.maxExecutors
end
if obj.spec.dynamicAllocation.minExecutors then
min_executor_instances = obj.spec.dynamicAllocation.minExecutors
end
return min_executor_instances, max_executor_instances
end

local function maybe_executor_range_spark_conf()
min_executor_instances = 0
max_executor_instances = infinity
if obj.spec.sparkConf["spark.streaming.dynamicAllocation.enabled"] ~= nil and
obj.spec.sparkConf["spark.streaming.dynamicAllocation.enabled"] == "true" then
if(obj.spec.sparkConf["spark.streaming.dynamicAllocation.maxExecutors"] ~= nil) then
max_executor_instances = tonumber(obj.spec.sparkConf["spark.streaming.dynamicAllocation.maxExecutors"])
end
if(obj.spec.sparkConf["spark.streaming.dynamicAllocation.minExecutors"] ~= nil) then
min_executor_instances = tonumber(obj.spec.sparkConf["spark.streaming.dynamicAllocation.minExecutors"])
end
return min_executor_instances, max_executor_instances
elseif obj.spec.sparkConf["spark.dynamicAllocation.enabled"] ~= nil and
obj.spec.sparkConf["spark.dynamicAllocation.enabled"] == "true" then
if(obj.spec.sparkConf["spark.dynamicAllocation.maxExecutors"] ~= nil) then
max_executor_instances = tonumber(obj.spec.sparkConf["spark.dynamicAllocation.maxExecutors"])
end
if(obj.spec.sparkConf["spark.dynamicAllocation.minExecutors"] ~= nil) then
min_executor_instances = tonumber(obj.spec.sparkConf["spark.dynamicAllocation.minExecutors"])
end
return min_executor_instances, max_executor_instances
else
return nil
end
end

local function maybe_executor_range()
if obj.spec["dynamicAllocation"] and obj.spec.dynamicAllocation.enabled then
return executor_range_api()
elseif obj.spec["sparkConf"] ~= nil then
return maybe_executor_range_spark_conf()
else
return nil
end
end

if obj.status ~= nil then
if obj.status.applicationState.state ~= nil then
if obj.status.applicationState.state == "" then
Expand All @@ -19,6 +70,13 @@ if obj.status ~= nil then
health_status.status = "Healthy"
health_status.message = "SparkApplication is Running"
return health_status
elseif maybe_executor_range() then
min_executor_instances, max_executor_instances = maybe_executor_range()
if count >= min_executor_instances and count <= max_executor_instances then
health_status.status = "Healthy"
health_status.message = "SparkApplication is Running"
return health_status
end
end
end
end
Expand Down Expand Up @@ -72,4 +130,4 @@ if obj.status ~= nil then
end
health_status.status = "Progressing"
health_status.message = "Waiting for Executor pods"
return health_status
return health_status
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,15 @@ tests:
status: Healthy
message: "SparkApplication is Running"
inputPath: testdata/healthy.yaml
- healthStatus:
status: Healthy
message: "SparkApplication is Running"
inputPath: testdata/healthy_dynamic_alloc.yaml
- healthStatus:
status: Healthy
message: "SparkApplication is Running"
inputPath: testdata/healthy_dynamic_alloc_dstream.yaml
- healthStatus:
status: Healthy
message: "SparkApplication is Running"
inputPath: testdata/healthy_dynamic_alloc_operator_api.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
generation: 4
labels:
argocd.argoproj.io/instance: spark-job
name: spark-job-app
namespace: spark-cluster
resourceVersion: "31812990"
uid: bfee52b0-74ca-4465-8005-f6643097ed64
spec:
executor:
instances: 4
sparkConf:
spark.dynamicAllocation.enabled: 'true'
spark.dynamicAllocation.maxExecutors: '10'
spark.dynamicAllocation.minExecutors: '2'
status:
applicationState:
state: RUNNING
driverInfo:
podName: ingestion-datalake-news-app-driver
webUIAddress: 172.20.207.161:4040
webUIPort: 4040
webUIServiceName: ingestion-datalake-news-app-ui-svc
executionAttempts: 13
executorState:
ingestion-datalake-news-app-1591613851251-exec-1: RUNNING
ingestion-datalake-news-app-1591613851251-exec-2: RUNNING
ingestion-datalake-news-app-1591613851251-exec-4: RUNNING
ingestion-datalake-news-app-1591613851251-exec-5: RUNNING
ingestion-datalake-news-app-1591613851251-exec-6: RUNNING
lastSubmissionAttemptTime: "2020-06-08T10:57:32Z"
sparkApplicationId: spark-a5920b2a5aa04d22a737c60759b5bf82
submissionAttempts: 1
submissionID: 3e713ec8-9f6c-4e78-ac28-749797c846f0
terminationTime: null
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
generation: 4
labels:
argocd.argoproj.io/instance: spark-job
name: spark-job-app
namespace: spark-cluster
resourceVersion: "31812990"
uid: bfee52b0-74ca-4465-8005-f6643097ed64
spec:
executor:
instances: 4
sparkConf:
spark.streaming.dynamicAllocation.enabled: 'true'
spark.streaming.dynamicAllocation.maxExecutors: '10'
spark.streaming.dynamicAllocation.minExecutors: '2'
status:
applicationState:
state: RUNNING
driverInfo:
podName: ingestion-datalake-news-app-driver
webUIAddress: 172.20.207.161:4040
webUIPort: 4040
webUIServiceName: ingestion-datalake-news-app-ui-svc
executionAttempts: 13
executorState:
ingestion-datalake-news-app-1591613851251-exec-1: RUNNING
ingestion-datalake-news-app-1591613851251-exec-4: RUNNING
ingestion-datalake-news-app-1591613851251-exec-6: RUNNING
lastSubmissionAttemptTime: "2020-06-08T10:57:32Z"
sparkApplicationId: spark-a5920b2a5aa04d22a737c60759b5bf82
submissionAttempts: 1
submissionID: 3e713ec8-9f6c-4e78-ac28-749797c846f0
terminationTime: null
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
generation: 4
labels:
argocd.argoproj.io/instance: spark-job
name: spark-job-app
namespace: spark-cluster
resourceVersion: "31812990"
uid: bfee52b0-74ca-4465-8005-f6643097ed64
spec:
executor:
instances: 4
dynamicAllocation:
enabled: true
initialExecutors: 2
minExecutors: 2
maxExecutors: 10
status:
applicationState:
state: RUNNING
driverInfo:
podName: ingestion-datalake-news-app-driver
webUIAddress: 172.20.207.161:4040
webUIPort: 4040
webUIServiceName: ingestion-datalake-news-app-ui-svc
executionAttempts: 13
executorState:
ingestion-datalake-news-app-1591613851251-exec-1: RUNNING
ingestion-datalake-news-app-1591613851251-exec-2: RUNNING
ingestion-datalake-news-app-1591613851251-exec-4: RUNNING
ingestion-datalake-news-app-1591613851251-exec-5: RUNNING
ingestion-datalake-news-app-1591613851251-exec-6: RUNNING
lastSubmissionAttemptTime: "2020-06-08T10:57:32Z"
sparkApplicationId: spark-a5920b2a5aa04d22a737c60759b5bf82
submissionAttempts: 1
submissionID: 3e713ec8-9f6c-4e78-ac28-749797c846f0
terminationTime: null