diff --git a/python/raydp/spark/ray_cluster.py b/python/raydp/spark/ray_cluster.py index be4d0b5f..2e7b0242 100644 --- a/python/raydp/spark/ray_cluster.py +++ b/python/raydp/spark/ray_cluster.py @@ -39,7 +39,8 @@ def __init__(self, executor_cores, executor_memory, enable_hive, - configs): + configs, + runtime_env=None): super().__init__(None) self._app_name = app_name self._spark_master = None @@ -49,24 +50,31 @@ def __init__(self, self._enable_hive = enable_hive self._configs = configs self._prepare_spark_configs() - self._set_up_master(resources=self._get_master_resources(self._configs), kwargs=None) + self._set_up_master(resources=self._get_master_resources(self._configs), + runtime_env=runtime_env) self._spark_session: SparkSession = None - def _set_up_master(self, resources: Dict[str, float], kwargs: Dict[Any, Any]): + def _set_up_master(self, resources: Dict[str, float], runtime_env: Dict[Any, Any]): # TODO: specify the app master resource spark_master_name = self._app_name + RAYDP_SPARK_MASTER_SUFFIX + if runtime_env is None: + runtime_env = {} + if resources: num_cpu = 1 if "CPU" in resources: num_cpu = resources["CPU"] resources.pop("CPU", None) + self._spark_master_handle = RayDPSparkMaster.options(name=spark_master_name, num_cpus=num_cpu, - resources=resources) \ + resources=resources, + runtime_env=runtime_env) \ .remote(self._configs) else: - self._spark_master_handle = RayDPSparkMaster.options(name=spark_master_name) \ + self._spark_master_handle = RayDPSparkMaster.options(name=spark_master_name, + runtime_env=runtime_env) \ .remote(self._configs) ray.get(self._spark_master_handle.start_up.remote())