Describe the bug
The CronUtils library is always added to the task dependencies when plugins are enabled in the Brickflow config, and the JVM is retrieved from the SparkSession in order to use that library. However, DBR 15.4 leverages Spark Connect under the hood, which does not expose the JVM on the SparkSession. As a result, any task that has "cron-utils" as a dependency fails. See the logs below.
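For reference, the access that ultimately fails reduces to the following (a hypothetical standalone repro, not project code; on a DBR 15.4 shared cluster the active session is a Spark Connect session, which blocks all JVM attributes):

```python
# Hypothetical minimal repro: run on a DBR 15.4 shared cluster.
from pyspark.sql import SparkSession

spark = SparkSession.getActiveSession()

# On a Spark Connect session this raises
# PySparkAttributeError [JVM_ATTRIBUTE_NOT_SUPPORTED]:
jvm = spark._jvm
```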
[JVM_ATTRIBUTE_NOT_SUPPORTED] Directly accessing the underlying Spark driver JVM using the attribute '_jvm' is not supported on shared clusters. If you require direct access to these fields, consider using a single-user cluster. For more details on compatibility and limitations, check: https://docs.databricks.com/compute/access-mode-limitations.html#shared-access-mode-limitations-on-unity-catalog
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow_plugins/airflow/cronhelper.py:39, in CronHelper.get_jvm(cls)
35 cron_utils = (
36 Path(os.path.abspath(__file__)).parent.absolute()
37 / "cron-utils-9.2.0.jar"
38 )
---> 39 jg = JavaGateway.launch_gateway(classpath=str(cron_utils))
40 log.info(
41 "Launched py4j gateway with cronutils jar added to class path from py4j pip installation"
42 )
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:2195, in JavaGateway.launch_gateway(cls, port, jarpath, classpath, javaopts, die_on_exit, redirect_stdout, redirect_stderr, daemonize_redirect, java_path, create_new_process_group, enable_auth, cwd, use_shell)
2142 """Launch a `Gateway` in a new Java process and create a default
2143 :class:`JavaGateway <py4j.java_gateway.JavaGateway>` to connect to
2144 it.
(...)
2193 connected to the `Gateway` server.
2194 """
-> 2195 _ret = launch_gateway(
2196 port, jarpath, classpath, javaopts, die_on_exit,
2197 redirect_stdout=redirect_stdout, redirect_stderr=redirect_stderr,
2198 daemonize_redirect=daemonize_redirect, java_path=java_path,
2199 create_new_process_group=create_new_process_group,
2200 enable_auth=enable_auth, cwd=cwd, return_proc=True,
2201 use_shell=use_shell)
2202 if enable_auth:
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:297, in launch_gateway(port, jarpath, classpath, javaopts, die_on_exit, redirect_stdout, redirect_stderr, daemonize_redirect, java_path, create_new_process_group, enable_auth, cwd, return_proc, use_shell)
296 if not os.path.exists(jarpath):
--> 297 raise Py4JError("Could not find py4j jar at {0}".format(jarpath))
299 # Launch the server in a subprocess.
Py4JError: Could not find py4j jar at
During handling of the above exception, another exception occurred:
PySparkAttributeError Traceback (most recent call last)
File <command-3899768728034622>, line 58
54 print("done ")
57 if __name__ == "__main__":
---> 58 main()
File <command-3899768728034622>, line 46, in main()
42 config_path = os.path.normpath(relative_path_to_config)
44 libraries = load_libraries(config_path)
---> 46 with Project(
47 "weekly_refresh",
48     git_repo="https://github.com/nike-impr-analytics/marketplace-analytics-engg.git",
49 provider="github",
50 libraries=libraries,
51 ) as f:
52 f.add_pkg(workflows)
53 print(libraries)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow/engine/project.py:303, in Project.__exit__(self, exc_type, exc_val, exc_tb)
301 workflow = self._project.get_workflow(wf_id)
302 task = workflow.get_task(t_id)
--> 303 task.execute()
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow/engine/__init__.py:42, in with_brickflow_logger.<locals>.func(*args, **kwargs)
35 logger_handler.setFormatter(
36 logging.Formatter(
37 f"[%(asctime)s] [%(levelname)s] [brickflow:{_self.name}] "
38 "{%(module)s.py:%(funcName)s:%(lineno)d} - %(message)s"
39 )
40 )
41 log.addHandler(logger_handler)
---> 42 resp = f(*args, **kwargs)
44 log.handlers = [get_default_log_handler()]
46 return resp
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow/engine/task.py:1069, in Task.execute(self, ignore_all_deps)
1064 brickflow_execution_hook = get_brickflow_tasks_hook()
1066 initial_resp: TaskResponse = brickflow_execution_hook.task_execute(
1067 task=self, workflow=self.workflow
1068 )
-> 1069 resp: TaskResponse = brickflow_execution_hook.handle_results(
1070 resp=initial_resp, task=self, workflow=self.workflow
1071 )
1072 if resp.push_return_value is True:
1073 ctx.task_coms.put(self.name, RETURN_VALUE_KEY, resp.response)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/pluggy/_hooks.py:513, in HookCaller.__call__(self, **kwargs)
511 firstresult = self.spec.opts.get("firstresult", False) if self.spec else False
512 # Copy because plugins may register other plugins during iteration (#438).
--> 513 return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/pluggy/_manager.py:120, in PluginManager._hookexec(self, hook_name, methods, kwargs, firstresult)
111 def _hookexec(
112 self,
113 hook_name: str,
(...)
118 # called from all hookcaller instances.
119 # enable_tracing will set its own wrapping function at self._inner_hookexec
--> 120 return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
[... skipping hidden 2 frame]
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow_plugins/airflow/brickflow_task_plugin.py:59, in AirflowOperatorBrickflowTaskPluginImpl.handle_results(resp, task, workflow)
55 if hasattr(_operator, "log"):
56 # overwrite the operator logger if it has one to the brickflow logger
57 setattr(_operator, "_log", ctx.log)
---> 59 context: Context = get_task_context(
60 task.task_id,
61 _operator,
62 workflow.schedule_quartz_expression,
63 epoch_to_pendulum_datetime(ctx.start_time(debug=None)),
64 tz=workflow.timezone,
65 )
67 env: Optional[Environment] = Environment()
68 env.globals.update({"macros": macros, "ti": context})
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow_plugins/airflow/context/__init__.py:80, in get_task_context(task_id, operator, quartz_cron_statement, ts, tz)
77 def get_task_context(
78 task_id, operator: BaseOperator, quartz_cron_statement, ts, tz=TIMEZONE
79 ) -> Context:
---> 80 execution_ts = execution_timestamp(quartz_cron_statement, ts, tz)
81 return Context(
82 **{
83 "execution_date": str(execution_ts),
(...)
89 }
90 )
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow_plugins/airflow/context/__init__.py:72, in execution_timestamp(quartz_cron_statement, ts, tz)
70 if ts is None:
71 ts = DateTime.utcnow()
---> 72 cron = cron_helper.quartz_to_unix(quartz_cron_statement)
73 tt = create_timetable(cron, tz)
74 return tt.align_to_prev(ts)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow_plugins/airflow/cronhelper.py:96, in CronHelper.quartz_to_unix(self, quartz_cron)
94 @functools.lru_cache(maxsize=128) # cron expression conversion will not change
95 def quartz_to_unix(self, quartz_cron: str) -> str:
---> 96 quartz_parser = self._get_quartz_parser()
97 unix_expr = (
98 self._j_cron_mapper.fromQuartzToUnix()
99 .map(quartz_parser.parse(quartz_cron))
100 .asString()
101 )
102 log.info("Converted quartz cron %s to unix cron %s", quartz_cron, unix_expr)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow_plugins/airflow/cronhelper.py:80, in CronHelper._get_quartz_parser(self)
78 def _get_quartz_parser(self):
79 if self._quartz_parser is None:
---> 80 self._initialize_jvm()
81 return self._quartz_parser
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow_plugins/airflow/cronhelper.py:57, in CronHelper._initialize_jvm(self)
56 def _initialize_jvm(self):
---> 57 jvm = self.get_jvm()
59 j_cron_parser = jvm.com.cronutils.parser.CronParser
60 j_cron_definition_builder = (
61 jvm.com.cronutils.model.definition.CronDefinitionBuilder
62 )
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/brickflow_plugins/airflow/cronhelper.py:51, in CronHelper.get_jvm(cls)
46 log.info(
47 "Could not find py4j jar, attempting to load JVM from SparkSession"
48 )
49 from pyspark.sql import SparkSession
---> 51 cls._jvm = SparkSession.getActiveSession()._jvm
52 else:
53 raise e
File /databricks/spark/python/pyspark/sql/connect/session.py:967, in SparkSession.__getattr__(self, name)
965 def __getattr__(self, name: str) -> Any:
966 if name in ["_jsc", "_jconf", "_jvm", "_jsparkSession", "sparkContext", "newSession"]:
--> 967 raise PySparkAttributeError(
968 error_class="JVM_ATTRIBUTE_NOT_SUPPORTED", message_parameters={"attr_name": name}
969 )
970 return object.__getattribute__(self, name)
To Reproduce
Steps to reproduce the behavior:
1. Set up a Brickflow project with plugins enabled
2. Deploy a workflow to the cluster and configure it to run on DBR 15.4
3. Confirm that the task contains cron-utils as a dependency
4. Start the workflow (a minimal sketch of the entry point follows below)
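For context, the entry point that triggers the failure looks roughly like this (a minimal sketch reconstructed from the traceback above; the `workflows` module is a placeholder, and the libraries list loaded from the Brickflow config is omitted):

```python
# Sketch of the deployment entry point seen in the traceback; `workflows`
# is a hypothetical module holding the workflow definitions, and the
# libraries loaded from the Brickflow config (plugins enabled) are omitted.
from brickflow.engine.project import Project

import workflows  # hypothetical: package with the workflow definitions

def main():
    with Project(
        "weekly_refresh",
        git_repo="https://github.com/nike-impr-analytics/marketplace-analytics-engg.git",
        provider="github",
    ) as f:
        # With plugins enabled, Brickflow injects cron-utils into the task
        # dependencies, which leads to the JVM lookup shown in the logs.
        f.add_pkg(workflows)

if __name__ == "__main__":
    main()
```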
Expected behavior
CronUtils should be replaced so that the JVM is not accessed directly, and task execution succeeds.
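One possible direction is a pure-Python conversion. The sketch below is only an illustration of the idea, not a drop-in replacement for cron-utils: it assumes a 6- or 7-field Quartz expression and handles only the common cases (dropping seconds/year, mapping `?` to `*`, shifting the day-of-week offset):

```python
# Illustrative pure-Python quartz -> unix cron conversion (assumption:
# simple numeric/wildcard fields only; cron-utils covers far more syntax).
def quartz_to_unix(quartz_cron: str) -> str:
    fields = quartz_cron.split()
    if len(fields) not in (6, 7):
        raise ValueError(f"Expected 6 or 7 quartz fields, got {len(fields)}")

    # Drop the leading seconds field and the optional trailing year field.
    minute, hour, dom, month, dow = fields[1:6]

    # Quartz uses '?' for "no specific value"; unix cron uses '*'.
    dom = "*" if dom == "?" else dom
    dow = "*" if dow == "?" else dow

    # Quartz day-of-week is 1-7 (SUN-SAT); unix cron is 0-6 (SUN-SAT).
    if dow.isdigit():
        dow = str(int(dow) - 1)

    return " ".join([minute, hour, dom, month, dow])

print(quartz_to_unix("0 30 6 ? * 2"))  # -> "30 6 * * 1" (Mondays at 06:30)
```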
Screenshots
Cloud Information
AWS
Azure
GCP
Other