[BUG] Brickflow plugin task fails on DBR 15.4 #195

Open · 4 tasks
maxim-mityutko (Contributor) opened this issue Jan 10, 2025 · 0 comments
Labels: bug (Something isn't working)

Describe the bug
The CronUtils library is always added to the task dependencies when plugins are enabled in the Brickflow config, and the JVM is retrieved from the SparkSession in order to use this library. However, DBR 15.4 uses Spark Connect under the hood, which does not expose the JVM on the SparkSession. As a result, any task that has cron-utils as a dependency fails. See the logs below.

[JVM_ATTRIBUTE_NOT_SUPPORTED] Directly accessing the underlying Spark driver JVM using the attribute '_jvm' is not supported on shared clusters. If you require direct access to these fields, consider using a single-user cluster. For more details on compatibility and limitations, check: https://docs.databricks.com/compute/access-mode-limitations.html#shared-access-mode-limitations-on-unity-catalog
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow_plugins/airflow/cronhelper.py:39, in CronHelper.get_jvm(cls)
     35 cron_utils = (
     36     Path(os.path.abspath(__file__)).parent.absolute()
     37     / "cron-utils-9.2.0.jar"
     38 )
---> 39 jg = JavaGateway.launch_gateway(classpath=str(cron_utils))
     40 log.info(
     41     "Launched py4j gateway with cronutils jar added to class path from py4j pip installation"
     42 )
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:2195, in JavaGateway.launch_gateway(cls, port, jarpath, classpath, javaopts, die_on_exit, redirect_stdout, redirect_stderr, daemonize_redirect, java_path, create_new_process_group, enable_auth, cwd, use_shell)
   2142 """Launch a `Gateway` in a new Java process and create a default
   2143 :class:`JavaGateway <py4j.java_gateway.JavaGateway>` to connect to
   2144 it.
   (...)
   2193     connected to the `Gateway` server.
   2194 """
-> 2195 _ret = launch_gateway(
   2196     port, jarpath, classpath, javaopts, die_on_exit,
   2197     redirect_stdout=redirect_stdout, redirect_stderr=redirect_stderr,
   2198     daemonize_redirect=daemonize_redirect, java_path=java_path,
   2199     create_new_process_group=create_new_process_group,
   2200     enable_auth=enable_auth, cwd=cwd, return_proc=True,
   2201     use_shell=use_shell)
   2202 if enable_auth:
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:297, in launch_gateway(port, jarpath, classpath, javaopts, die_on_exit, redirect_stdout, redirect_stderr, daemonize_redirect, java_path, create_new_process_group, enable_auth, cwd, return_proc, use_shell)
    296 if not os.path.exists(jarpath):
--> 297     raise Py4JError("Could not find py4j jar at {0}".format(jarpath))
    299 # Launch the server in a subprocess.
Py4JError: Could not find py4j jar at 

During handling of the above exception, another exception occurred:
PySparkAttributeError                     Traceback (most recent call last)
File <command-3899768728034622>, line 58
     54     print("done ")
     57 if __name__ == "__main__":
---> 58     main()
File <command-3899768728034622>, line 46, in main()
     42 config_path = os.path.normpath(relative_path_to_config)
     44 libraries = load_libraries(config_path)
---> 46 with Project(
     47         "weekly_refresh",
     48         git_repo="https://github.com/nike-impr-analytics/marketplace-analytics-engg.git",
     49         provider="github",
     50         libraries=libraries,
     51 ) as f:
     52     f.add_pkg(workflows)
     53 print(libraries)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow/engine/project.py:303, in Project.__exit__(self, exc_type, exc_val, exc_tb)
    301 workflow = self._project.get_workflow(wf_id)
    302 task = workflow.get_task(t_id)
--> 303 task.execute()
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow/engine/__init__.py:42, in with_brickflow_logger.<locals>.func(*args, **kwargs)
     35 logger_handler.setFormatter(
     36     logging.Formatter(
     37         f"[%(asctime)s] [%(levelname)s] [brickflow:{_self.name}] "
     38         "{%(module)s.py:%(funcName)s:%(lineno)d} - %(message)s"
     39     )
     40 )
     41 log.addHandler(logger_handler)
---> 42 resp = f(*args, **kwargs)
     44 log.handlers = [get_default_log_handler()]
     46 return resp
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow/engine/task.py:1069, in Task.execute(self, ignore_all_deps)
   1064 brickflow_execution_hook = get_brickflow_tasks_hook()
   1066 initial_resp: TaskResponse = brickflow_execution_hook.task_execute(
   1067     task=self, workflow=self.workflow
   1068 )
-> 1069 resp: TaskResponse = brickflow_execution_hook.handle_results(
   1070     resp=initial_resp, task=self, workflow=self.workflow
   1071 )
   1072 if resp.push_return_value is True:
   1073     ctx.task_coms.put(self.name, RETURN_VALUE_KEY, resp.response)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/pluggy/_hooks.py:513, in HookCaller.__call__(self, **kwargs)
    511 firstresult = self.spec.opts.get("firstresult", False) if self.spec else False
    512 # Copy because plugins may register other plugins during iteration (#438).
--> 513 return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/pluggy/_manager.py:120, in PluginManager._hookexec(self, hook_name, methods, kwargs, firstresult)
    111 def _hookexec(
    112     self,
    113     hook_name: str,
   (...)
    118     # called from all hookcaller instances.
    119     # enable_tracing will set its own wrapping function at self._inner_hookexec
--> 120     return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
    [... skipping hidden 2 frame]
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow_plugins/airflow/brickflow_task_plugin.py:59, in AirflowOperatorBrickflowTaskPluginImpl.handle_results(resp, task, workflow)
     55 if hasattr(_operator, "log"):
     56     # overwrite the operator logger if it has one to the brickflow logger
     57     setattr(_operator, "_log", ctx.log)
---> 59 context: Context = get_task_context(
     60     task.task_id,
     61     _operator,
     62     workflow.schedule_quartz_expression,
     63     epoch_to_pendulum_datetime(ctx.start_time(debug=None)),
     64     tz=workflow.timezone,
     65 )
     67 env: Optional[Environment] = Environment()
     68 env.globals.update({"macros": macros, "ti": context})
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow_plugins/airflow/context/__init__.py:80, in get_task_context(task_id, operator, quartz_cron_statement, ts, tz)
     77 def get_task_context(
     78     task_id, operator: BaseOperator, quartz_cron_statement, ts, tz=TIMEZONE
     79 ) -> Context:
---> 80     execution_ts = execution_timestamp(quartz_cron_statement, ts, tz)
     81     return Context(
     82         **{
     83             "execution_date": str(execution_ts),
   (...)
     89         }
     90     )
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow_plugins/airflow/context/__init__.py:72, in execution_timestamp(quartz_cron_statement, ts, tz)
     70 if ts is None:
     71     ts = DateTime.utcnow()
---> 72 cron = cron_helper.quartz_to_unix(quartz_cron_statement)
     73 tt = create_timetable(cron, tz)
     74 return tt.align_to_prev(ts)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow_plugins/airflow/cronhelper.py:96, in CronHelper.quartz_to_unix(self, quartz_cron)
     94 @functools.lru_cache(maxsize=128)  # cron expression conversion will not change
     95 def quartz_to_unix(self, quartz_cron: str) -> str:
---> 96     quartz_parser = self._get_quartz_parser()
     97     unix_expr = (
     98         self._j_cron_mapper.fromQuartzToUnix()
     99         .map(quartz_parser.parse(quartz_cron))
    100         .asString()
    101     )
    102     log.info("Converted quartz cron %s to unix cron %s", quartz_cron, unix_expr)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow_plugins/airflow/cronhelper.py:80, in CronHelper._get_quartz_parser(self)
     78 def _get_quartz_parser(self):
     79     if self._quartz_parser is None:
---> 80         self._initialize_jvm()
     81     return self._quartz_parser
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow_plugins/airflow/cronhelper.py:57, in CronHelper._initialize_jvm(self)
     56 def _initialize_jvm(self):
---> 57     jvm = self.get_jvm()
     59     j_cron_parser = jvm.com.cronutils.parser.CronParser
     60     j_cron_definition_builder = (
     61         jvm.com.cronutils.model.definition.CronDefinitionBuilder
     62     )
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow_plugins/airflow/cronhelper.py:51, in CronHelper.get_jvm(cls)
     46     log.info(
     47         "Could not find py4j jar, attempting to load JVM from SparkSession"
     48     )
     49     from pyspark.sql import SparkSession
---> 51     cls._jvm = SparkSession.getActiveSession()._jvm
     52 else:
     53     raise e
File /databricks/spark/python/pyspark/sql/connect/session.py:967, in SparkSession.__getattr__(self, name)
    965 def __getattr__(self, name: str) -> Any:
    966     if name in ["_jsc", "_jconf", "_jvm", "_jsparkSession", "sparkContext", "newSession"]:
--> 967         raise PySparkAttributeError(
    968             error_class="JVM_ATTRIBUTE_NOT_SUPPORTED", message_parameters={"attr_name": name}
    969         )
    970     return object.__getattribute__(self, name)
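
For reference, the root cause can be reproduced in isolation. A minimal sketch, assuming a notebook attached to a DBR 15.4 shared cluster (where the active session is Spark Connect-based):

```python
from pyspark.errors import PySparkAttributeError
from pyspark.sql import SparkSession

# On DBR 15.4 shared clusters this resolves to a Spark Connect session
# (pyspark.sql.connect.session.SparkSession), whose __getattr__ blocks '_jvm'.
spark = SparkSession.getActiveSession()

try:
    jvm = spark._jvm
except PySparkAttributeError as e:
    # [JVM_ATTRIBUTE_NOT_SUPPORTED] Directly accessing the underlying Spark
    # driver JVM using the attribute '_jvm' is not supported on shared clusters.
    print(e)

# One way to guard before attempting any py4j work: Spark Connect session
# classes live under the pyspark.sql.connect package.
def jvm_available(session) -> bool:
    return "connect" not in type(session).__module__
```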

To Reproduce
Steps to reproduce the behavior:

  1. Set up a Brickflow project with plugins enabled (a minimal entry point is sketched below)
  2. Deploy a workflow to the cluster and configure it to run on DBR 15.4
  3. Confirm that the task contains cron-utils as a dependency
  4. Start the workflow
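
A minimal entry point along the lines of the code shown in the traceback (a hedged sketch: `workflows` stands for whatever package holds the workflow definitions, and the `libraries` loading from the original code is omitted):

```python
from brickflow import Project

import workflows  # hypothetical package containing the Workflow definitions


def main():
    # Plugins must be enabled in the Brickflow config for the failure to occur;
    # that is what pulls cron-utils into the task dependencies.
    with Project(
        "weekly_refresh",
        git_repo="https://github.com/nike-impr-analytics/marketplace-analytics-engg.git",
        provider="github",
    ) as f:
        f.add_pkg(workflows)


if __name__ == "__main__":
    main()
```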

Expected behavior
CronUtils should be replaced (or the cron conversion reimplemented) so that the plugin does not need direct JVM access; task execution should then succeed on DBR 15.4.
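
One possible direction (a hypothetical sketch, not the project's chosen fix) is a pure-Python conversion covering the common Quartz constructs, with no JVM or py4j involved:

```python
import re


def quartz_to_unix(quartz_cron: str) -> str:
    """Naive Quartz -> Unix cron conversion (hypothetical sketch).

    Handles only the common cases: drops the seconds (and optional year)
    fields, maps '?' to '*', and shifts numeric day-of-week values from
    Quartz (1=SUN..7=SAT) to Unix (0=SUN..6=SAT). Special Quartz tokens
    such as 'L', 'W', '#' and step/range expressions in the day-of-week
    field are not supported here.
    """
    fields = quartz_cron.split()
    if len(fields) == 7:  # optional trailing year field
        fields = fields[:-1]
    if len(fields) != 6:
        raise ValueError(f"Unexpected Quartz expression: {quartz_cron!r}")
    _seconds, minute, hour, dom, month, dow = fields
    dom = dom.replace("?", "*")
    dow = dow.replace("?", "*")
    # Shift numeric day-of-week values down by one (names like MON pass through).
    dow = re.sub(r"\d", lambda m: str(int(m.group()) - 1), dow)
    return " ".join([minute, hour, dom, month, dow])


# Example: Quartz "0 0 12 * * ?" (noon daily) -> Unix "0 12 * * *"
print(quartz_to_unix("0 0 12 * * ?"))
```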


Cloud Information

  • AWS
  • Azure
  • GCP
  • Other



maxim-mityutko added the "bug" label on Jan 10, 2025
pariksheet added this to the v1.3.1 milestone on Jan 14, 2025