From 5ba519f63bcb57dad22112cbe1d85881b86f634a Mon Sep 17 00:00:00 2001 From: Yevgeniy Fridland Date: Thu, 1 Dec 2022 19:05:35 +0100 Subject: [PATCH] Handling SparkApplication CRD health status if dynamic allocation is enabled Signed-off-by: Yevgeniy Fridland --- .../SparkApplication/health.lua | 60 ++++++++++++++++++- .../SparkApplication/health_test.yaml | 12 ++++ .../testdata/healthy_dynamic_alloc.yaml | 37 ++++++++++++ .../healthy_dynamic_alloc_dstream.yaml | 35 +++++++++++ .../healthy_dynamic_alloc_operator_api.yaml | 38 ++++++++++++ 5 files changed, 181 insertions(+), 1 deletion(-) create mode 100644 resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc.yaml create mode 100644 resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc_dstream.yaml create mode 100644 resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc_operator_api.yaml diff --git a/resource_customizations/sparkoperator.k8s.io/SparkApplication/health.lua b/resource_customizations/sparkoperator.k8s.io/SparkApplication/health.lua index 98e8feee227db..5a504602eb83c 100644 --- a/resource_customizations/sparkoperator.k8s.io/SparkApplication/health.lua +++ b/resource_customizations/sparkoperator.k8s.io/SparkApplication/health.lua @@ -1,4 +1,55 @@ health_status = {} +-- Can't use standard lib, math.huge equivalent +infinity = 2^1024-1 + +local function executor_range_api() + min_executor_instances = 0 + max_executor_instances = infinity + if obj.spec.dynamicAllocation.maxExecutors then + max_executor_instances = obj.spec.dynamicAllocation.maxExecutors + end + if obj.spec.dynamicAllocation.minExecutors then + min_executor_instances = obj.spec.dynamicAllocation.minExecutors + end + return min_executor_instances, max_executor_instances +end + +local function maybe_executor_range_spark_conf() + min_executor_instances = 0 + max_executor_instances = infinity + if 
obj.spec.sparkConf["spark.streaming.dynamicAllocation.enabled"] ~= nil and + obj.spec.sparkConf["spark.streaming.dynamicAllocation.enabled"] == "true" then + if(obj.spec.sparkConf["spark.streaming.dynamicAllocation.maxExecutors"] ~= nil) then + max_executor_instances = tonumber(obj.spec.sparkConf["spark.streaming.dynamicAllocation.maxExecutors"]) + end + if(obj.spec.sparkConf["spark.streaming.dynamicAllocation.minExecutors"] ~= nil) then + min_executor_instances = tonumber(obj.spec.sparkConf["spark.streaming.dynamicAllocation.minExecutors"]) + end + return min_executor_instances, max_executor_instances + elseif obj.spec.sparkConf["spark.dynamicAllocation.enabled"] ~= nil and + obj.spec.sparkConf["spark.dynamicAllocation.enabled"] == "true" then + if(obj.spec.sparkConf["spark.dynamicAllocation.maxExecutors"] ~= nil) then + max_executor_instances = tonumber(obj.spec.sparkConf["spark.dynamicAllocation.maxExecutors"]) + end + if(obj.spec.sparkConf["spark.dynamicAllocation.minExecutors"] ~= nil) then + min_executor_instances = tonumber(obj.spec.sparkConf["spark.dynamicAllocation.minExecutors"]) + end + return min_executor_instances, max_executor_instances + else + return nil + end +end + +local function maybe_executor_range() + if obj.spec["dynamicAllocation"] and obj.spec.dynamicAllocation.enabled then + return executor_range_api() + elseif obj.spec["sparkConf"] ~= nil then + return maybe_executor_range_spark_conf() + else + return nil + end +end + if obj.status ~= nil then if obj.status.applicationState.state ~= nil then if obj.status.applicationState.state == "" then @@ -19,6 +70,13 @@ if obj.status ~= nil then health_status.status = "Healthy" health_status.message = "SparkApplication is Running" return health_status + elseif maybe_executor_range() then + min_executor_instances, max_executor_instances = maybe_executor_range() + if count >= min_executor_instances and count <= max_executor_instances then + health_status.status = "Healthy" + health_status.message = 
"SparkApplication is Running" + return health_status + end end end end @@ -72,4 +130,4 @@ if obj.status ~= nil then end health_status.status = "Progressing" health_status.message = "Waiting for Executor pods" -return health_status \ No newline at end of file +return health_status diff --git a/resource_customizations/sparkoperator.k8s.io/SparkApplication/health_test.yaml b/resource_customizations/sparkoperator.k8s.io/SparkApplication/health_test.yaml index 7c971cbe666b8..582b446eca324 100644 --- a/resource_customizations/sparkoperator.k8s.io/SparkApplication/health_test.yaml +++ b/resource_customizations/sparkoperator.k8s.io/SparkApplication/health_test.yaml @@ -11,3 +11,15 @@ tests: status: Healthy message: "SparkApplication is Running" inputPath: testdata/healthy.yaml +- healthStatus: + status: Healthy + message: "SparkApplication is Running" + inputPath: testdata/healthy_dynamic_alloc.yaml +- healthStatus: + status: Healthy + message: "SparkApplication is Running" + inputPath: testdata/healthy_dynamic_alloc_dstream.yaml +- healthStatus: + status: Healthy + message: "SparkApplication is Running" + inputPath: testdata/healthy_dynamic_alloc_operator_api.yaml diff --git a/resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc.yaml b/resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc.yaml new file mode 100644 index 0000000000000..9ff52e78b98c5 --- /dev/null +++ b/resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc.yaml @@ -0,0 +1,37 @@ +apiVersion: sparkoperator.k8s.io/v1beta2 +kind: SparkApplication +metadata: + generation: 4 + labels: + argocd.argoproj.io/instance: spark-job + name: spark-job-app + namespace: spark-cluster + resourceVersion: "31812990" + uid: bfee52b0-74ca-4465-8005-f6643097ed64 +spec: + executor: + instances: 4 + sparkConf: + spark.dynamicAllocation.enabled: 'true' + spark.dynamicAllocation.maxExecutors: '10' + 
spark.dynamicAllocation.minExecutors: '2' +status: + applicationState: + state: RUNNING + driverInfo: + podName: ingestion-datalake-news-app-driver + webUIAddress: 172.20.207.161:4040 + webUIPort: 4040 + webUIServiceName: ingestion-datalake-news-app-ui-svc + executionAttempts: 13 + executorState: + ingestion-datalake-news-app-1591613851251-exec-1: RUNNING + ingestion-datalake-news-app-1591613851251-exec-2: RUNNING + ingestion-datalake-news-app-1591613851251-exec-4: RUNNING + ingestion-datalake-news-app-1591613851251-exec-5: RUNNING + ingestion-datalake-news-app-1591613851251-exec-6: RUNNING + lastSubmissionAttemptTime: "2020-06-08T10:57:32Z" + sparkApplicationId: spark-a5920b2a5aa04d22a737c60759b5bf82 + submissionAttempts: 1 + submissionID: 3e713ec8-9f6c-4e78-ac28-749797c846f0 + terminationTime: null diff --git a/resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc_dstream.yaml b/resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc_dstream.yaml new file mode 100644 index 0000000000000..ce24ff77177d2 --- /dev/null +++ b/resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc_dstream.yaml @@ -0,0 +1,35 @@ +apiVersion: sparkoperator.k8s.io/v1beta2 +kind: SparkApplication +metadata: + generation: 4 + labels: + argocd.argoproj.io/instance: spark-job + name: spark-job-app + namespace: spark-cluster + resourceVersion: "31812990" + uid: bfee52b0-74ca-4465-8005-f6643097ed64 +spec: + executor: + instances: 4 + sparkConf: + spark.streaming.dynamicAllocation.enabled: 'true' + spark.streaming.dynamicAllocation.maxExecutors: '10' + spark.streaming.dynamicAllocation.minExecutors: '2' +status: + applicationState: + state: RUNNING + driverInfo: + podName: ingestion-datalake-news-app-driver + webUIAddress: 172.20.207.161:4040 + webUIPort: 4040 + webUIServiceName: ingestion-datalake-news-app-ui-svc + executionAttempts: 13 + executorState: + 
ingestion-datalake-news-app-1591613851251-exec-1: RUNNING + ingestion-datalake-news-app-1591613851251-exec-4: RUNNING + ingestion-datalake-news-app-1591613851251-exec-6: RUNNING + lastSubmissionAttemptTime: "2020-06-08T10:57:32Z" + sparkApplicationId: spark-a5920b2a5aa04d22a737c60759b5bf82 + submissionAttempts: 1 + submissionID: 3e713ec8-9f6c-4e78-ac28-749797c846f0 + terminationTime: null diff --git a/resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc_operator_api.yaml b/resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc_operator_api.yaml new file mode 100644 index 0000000000000..538a27991bb5a --- /dev/null +++ b/resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc_operator_api.yaml @@ -0,0 +1,38 @@ +apiVersion: sparkoperator.k8s.io/v1beta2 +kind: SparkApplication +metadata: + generation: 4 + labels: + argocd.argoproj.io/instance: spark-job + name: spark-job-app + namespace: spark-cluster + resourceVersion: "31812990" + uid: bfee52b0-74ca-4465-8005-f6643097ed64 +spec: + executor: + instances: 4 + dynamicAllocation: + enabled: true + initialExecutors: 2 + minExecutors: 2 + maxExecutors: 10 +status: + applicationState: + state: RUNNING + driverInfo: + podName: ingestion-datalake-news-app-driver + webUIAddress: 172.20.207.161:4040 + webUIPort: 4040 + webUIServiceName: ingestion-datalake-news-app-ui-svc + executionAttempts: 13 + executorState: + ingestion-datalake-news-app-1591613851251-exec-1: RUNNING + ingestion-datalake-news-app-1591613851251-exec-2: RUNNING + ingestion-datalake-news-app-1591613851251-exec-4: RUNNING + ingestion-datalake-news-app-1591613851251-exec-5: RUNNING + ingestion-datalake-news-app-1591613851251-exec-6: RUNNING + lastSubmissionAttemptTime: "2020-06-08T10:57:32Z" + sparkApplicationId: spark-a5920b2a5aa04d22a737c60759b5bf82 + submissionAttempts: 1 + submissionID: 3e713ec8-9f6c-4e78-ac28-749797c846f0 + terminationTime: null