diff --git a/brickflow/cli/entrypoint.template b/brickflow/cli/entrypoint.template index 447e948d..55200763 100644 --- a/brickflow/cli/entrypoint.template +++ b/brickflow/cli/entrypoint.template @@ -11,7 +11,6 @@ def main() -> None: git_repo="{{ git_https_url }}", provider="{{ git_provider }}", libraries=[ - MavenTaskLibrary(coordinates="com.cronutils:cron-utils:9.2.0"), # PypiTaskLibrary(package="spark-expectations=={{spark_expectations_version}}"), # Uncomment if spark-expectations is needed ], ) as f: diff --git a/brickflow/engine/task.py b/brickflow/engine/task.py index 97709b16..3a74cbb0 100644 --- a/brickflow/engine/task.py +++ b/brickflow/engine/task.py @@ -1137,10 +1137,6 @@ def filter_bf_related_libraries( if isinstance(lib, PypiTaskLibrary): if lib.package.startswith("apache-airflow") is True: continue - if isinstance(lib, MavenTaskLibrary): - # TODO: clean this up but no one should really be using cron-utils at the moment for outside of brickflow - if lib.coordinates.startswith("com.cronutils:cron-utils:9.2.0") is True: - continue resp.append(lib) return resp @@ -1212,7 +1208,6 @@ def get_brickflow_libraries(enable_plugins: bool = False) -> List[TaskLibrary]: PypiTaskLibrary("tableauserverclient==0.25"), PypiTaskLibrary("boxsdk==3.9.2"), PypiTaskLibrary("cerberus-python-client==2.5.4"), - MavenTaskLibrary("com.cronutils:cron-utils:9.2.0"), ] else: return [bf_lib] diff --git a/brickflow_plugins/airflow/cron-utils-9.2.0.jar b/brickflow_plugins/airflow/cron-utils-9.2.0.jar deleted file mode 100644 index e3f056b5..00000000 Binary files a/brickflow_plugins/airflow/cron-utils-9.2.0.jar and /dev/null differ diff --git a/brickflow_plugins/airflow/cronhelper.py b/brickflow_plugins/airflow/cronhelper.py index 33e8a241..66b9bc70 100644 --- a/brickflow_plugins/airflow/cronhelper.py +++ b/brickflow_plugins/airflow/cronhelper.py @@ -1,106 +1,134 @@ +import re import functools -import os -from pathlib import Path from brickflow_plugins import log -try: - from py4j.protocol import Py4JError -except ImportError: - raise ImportError( - "You must install py4j to use cronhelper, " - "please try pip install py4j. " - "This library is not installed as " - "it is provided by databricks OOTB." - ) - class CronHelper: - _jvm = None - - def __init__(self): - self._j_cron_mapper = None - self._unix_parser = None - self._quartz_parser = None - - @classmethod - def get_jvm(cls): - if cls._jvm is None: # Initialize the JVM only once and cache it - try: - log.info( - "Attempting to load JVM from pip installation of py4j for cronhelper" - ) - from py4j.java_gateway import JavaGateway - - cron_utils = ( - Path(os.path.abspath(__file__)).parent.absolute() - / "cron-utils-9.2.0.jar" - ) - jg = JavaGateway.launch_gateway(classpath=str(cron_utils)) - log.info( - "Launched py4j gateway with cronutils jar added to class path from py4j pip installation" - ) - cls._jvm = jg.jvm - except Py4JError as e: - if str(e).startswith("Could not find py4j jar"): - log.info( - "Could not find py4j jar, attempting to load JVM from SparkSession" - ) - from pyspark.sql import SparkSession - - cls._jvm = SparkSession.getActiveSession()._jvm - else: - raise e - return cls._jvm - - def _initialize_jvm(self): - jvm = self.get_jvm() - - j_cron_parser = jvm.com.cronutils.parser.CronParser - j_cron_definition_builder = ( - jvm.com.cronutils.model.definition.CronDefinitionBuilder - ) - j_cron_type = jvm.com.cronutils.model.CronType - - self._j_cron_mapper = jvm.com.cronutils.mapper.CronMapper - self._unix_parser = j_cron_parser( - j_cron_definition_builder.instanceDefinitionFor(j_cron_type.UNIX) + EVERY_X_UNITS_REPLACE_PLACEHOLDER = "%s" + QUARTZ_EVERY_X_UNITS_REGEX = re.compile(r"^0/(\d+)$") # For handling 0/5 units + UNIX_EVERY_X_UNITS_REGEX = re.compile(r"^\*/(\d+)$") # For handling */5 units + QUARTZ_EVERY_X_UNITS_REPLACE_PATTERN = f"0/{EVERY_X_UNITS_REPLACE_PLACEHOLDER}" + UNIX_EVERY_X_UNITS_REPLACE_PATTERN = f"*/{EVERY_X_UNITS_REPLACE_PLACEHOLDER}" + + @staticmethod + def __get_expression_parts(expression: str) -> list: + parts = [part.strip() for part in expression.split(" ")] + + # Unix cron expression have 5 parts, Quartz cron expression have 6 or 7 parts + if len(parts) in [5, 7]: + return parts + # Year is an optional part in Quartz cron expression, adding the extra element to mimic 7 part Quartz expression + if len(parts) == 6: + parts.append("*") + return parts + + raise ValueError("Invalid cron expression!") + + @staticmethod + def convert_interval_parts(part: str, is_quartz: bool = False) -> str: + every_x_units_pattern = ( + CronHelper.QUARTZ_EVERY_X_UNITS_REGEX + if is_quartz + else CronHelper.UNIX_EVERY_X_UNITS_REGEX ) - self._quartz_parser = j_cron_parser( - j_cron_definition_builder.instanceDefinitionFor(j_cron_type.QUARTZ) + matches = every_x_units_pattern.match(part) + every_x_units_replace_pattern = ( + CronHelper.QUARTZ_EVERY_X_UNITS_REPLACE_PATTERN + if is_quartz + else CronHelper.UNIX_EVERY_X_UNITS_REPLACE_PATTERN ) - def _get_unix_parser(self): - if self._unix_parser is None: - self._initialize_jvm() - return self._unix_parser + if matches: + return every_x_units_replace_pattern.replace( + CronHelper.EVERY_X_UNITS_REPLACE_PLACEHOLDER, matches.group(1) + ) - def _get_quartz_parser(self): - if self._quartz_parser is None: - self._initialize_jvm() - return self._quartz_parser + return part @functools.lru_cache(maxsize=128) # cron expression conversion will not change def unix_to_quartz(self, unix_cron: str) -> str: - unix_parser = self._get_unix_parser() - quartz_expr = ( - self._j_cron_mapper.fromUnixToQuartz() - .map(unix_parser.parse(unix_cron)) - .asString() - ) - log.info("Converted unix cron %s to quartz cron %s", unix_cron, quartz_expr) - return quartz_expr + parts = self.__get_expression_parts(expression=unix_cron) + + if len(parts) != 5: + raise ValueError("Invalid Unix cron expression") + + minute, hour, dom, month, dow = map(self.convert_interval_parts, parts) + + # Converting Unix DOW to Quartz DOW + def shift_days(day: str) -> str: + """ + Quartz DOW starts from 1 (Sunday) while Unix DOW starts from 0 (Sunday) + """ + if "-" in day: + return "-".join([shift_days(day=d) for d in day.split("-")]) + + # Unix cron Sunday can be represented as 0 or 7, but only as 1 in Quartz cron + if day in ["0", "7"]: + return "1" + if day in ["SUN", "MON", "TUE", "WED", "THU", "FRI", "SAT"]: + return day + return str(int(day) + 1) + + if "," in dow: + quartz_dow = ",".join([shift_days(day=day) for day in dow.split(",")]) + elif dow == "*": + quartz_dow = dow + else: + quartz_dow = shift_days(day=dow) + + quartz_dom = dom + + if dom != "*" and dow == "*": + quartz_dow = "?" + elif dom == "*": + quartz_dom = "?" + + quartz_cron = f"0 {minute} {hour} {quartz_dom} {month} {quartz_dow} *" + log.info("Converted unix cron %s to quartz cron %s", unix_cron, quartz_cron) + return quartz_cron @functools.lru_cache(maxsize=128) # cron expression conversion will not change def quartz_to_unix(self, quartz_cron: str) -> str: - quartz_parser = self._get_quartz_parser() - unix_expr = ( - self._j_cron_mapper.fromQuartzToUnix() - .map(quartz_parser.parse(quartz_cron)) - .asString() + parts = self.__get_expression_parts(expression=quartz_cron) + + if len(parts) != 7: + raise ValueError("Invalid Quartz cron expression") + + if "L" in quartz_cron or "W" in quartz_cron or "#" in quartz_cron: + raise ValueError("Support for 'L, W, #' in Quartz cron is not implemented") + + # Unix cron expression does not support '?' + parts = [part.replace("?", "*") for part in parts] + + _, minute, hour, dom, month, dow, _ = map( + lambda part: self.convert_interval_parts(part, True), parts ) - log.info("Converted quartz cron %s to unix cron %s", quartz_cron, unix_expr) - return unix_expr + + # Converting Quartz DOW to Unix DOW + def shift_days(day: str) -> str: + """ + Quartz DOW starts from 1 (Sunday) while Unix DOW starts from 0 (Sunday) + """ + if "-" in day: + return "-".join([shift_days(day=d) for d in day.split("-")]) + if day in ["SUN", "MON", "TUE", "WED", "THU", "FRI", "SAT"]: + return day + + return str(int(day) - 1) + + if "," in dow: + unix_dow = ",".join([shift_days(day=day) for day in dow.split(",")]) + elif dow == "*": + unix_dow = "*" + else: + unix_dow = shift_days(day=dow) + + unix_dom = dom + + unix_cron = f"{minute} {hour} {unix_dom} {month} {unix_dow}" + log.info("Converted quartz cron %s to unix cron %s", quartz_cron, unix_cron) + return unix_cron cron_helper = CronHelper() diff --git a/tests/codegen/expected_bundles/dev_bundle_polyrepo_with_auto_libs.yml b/tests/codegen/expected_bundles/dev_bundle_polyrepo_with_auto_libs.yml index 43c89df0..f44efbb5 100644 --- a/tests/codegen/expected_bundles/dev_bundle_polyrepo_with_auto_libs.yml +++ b/tests/codegen/expected_bundles/dev_bundle_polyrepo_with_auto_libs.yml @@ -48,10 +48,6 @@ targets: - pypi: package: cerberus-python-client==2.5.4 repo: null - - maven: - coordinates: com.cronutils:cron-utils:9.2.0 - exclusions: null - repo: null max_retries: null min_retry_interval_millis: null notebook_task: diff --git a/tests/engine/test_task.py b/tests/engine/test_task.py index 70f408af..2747144b 100644 --- a/tests/engine/test_task.py +++ b/tests/engine/test_task.py @@ -449,7 +449,7 @@ def test_get_brickflow_lib_version(self): def test_get_brickflow_libraries(self): settings = BrickflowProjectDeploymentSettings() settings.brickflow_project_runtime_version = "1.0.0" - assert len(get_brickflow_libraries(enable_plugins=True)) == 7 + assert len(get_brickflow_libraries(enable_plugins=True)) == 6 assert len(get_brickflow_libraries(enable_plugins=False)) == 1 lib = get_brickflow_libraries(enable_plugins=False)[0].dict expected = { @@ -465,7 +465,7 @@ def test_get_brickflow_libraries_semver_non_numeric(self): settings = BrickflowProjectDeploymentSettings() tag = "1.0.1rc1234" settings.brickflow_project_runtime_version = tag - assert len(get_brickflow_libraries(enable_plugins=True)) == 7 + assert len(get_brickflow_libraries(enable_plugins=True)) == 6 assert len(get_brickflow_libraries(enable_plugins=False)) == 1 lib = get_brickflow_libraries(enable_plugins=False)[0].dict expected = { @@ -481,7 +481,7 @@ def test_get_brickflow_libraries_non_semver(self): settings = BrickflowProjectDeploymentSettings() tag = "somebranch" settings.brickflow_project_runtime_version = tag - assert len(get_brickflow_libraries(enable_plugins=True)) == 7 + assert len(get_brickflow_libraries(enable_plugins=True)) == 6 assert len(get_brickflow_libraries(enable_plugins=False)) == 1 lib = get_brickflow_libraries(enable_plugins=False)[0].dict expected = { diff --git a/tests/test_plugins.py b/tests/test_plugins.py index a77fa193..ed8ddecc 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -1,7 +1,6 @@ import copy from typing import List from unittest import mock -from unittest.mock import patch import pluggy import pytest @@ -45,21 +44,48 @@ def test_plugins_ensure_installation_import_error(self): get_brickflow_tasks_hook(pm) assert_plugin_manager(pm, ["default"]) - def test_cron_import_nopy4j(self): - remove_cron_helper = { - "brickflow_plugins.airflow.cronhelper": None, - "py4j": None, - "py4j.protocol": None, - "py4j.java_gateway": None, - } - with patch.dict("sys.modules", remove_cron_helper): - with pytest.raises(ImportError): - import brickflow_plugins.airflow.cronhelper as cronhelper # noqa + @pytest.mark.parametrize( + "quartz_cron, expected_unix_cron", + [ + ("0 * * ? * * *", "* * * * *"), + ("0 */5 * ? * * *", "*/5 * * * *"), + ("0 30 * ? * * *", "30 * * * *"), + ("0 0 12 ? * * *", "0 12 * * *"), + ("0 0 12 ? * 2 *", "0 12 * * 1"), + ("0 0 0 10 * ? *", "0 0 10 * *"), + ("0 0 0 1 1 ? *", "0 0 1 1 *"), + ("0 0/5 14,18 * * ?", "0/5 14,18 * * *"), + ("0 0 12 ? * 1,2,5-7 *", "0 12 * * 0,1,4-6"), + ("0 0 12 ? * SUN,MON,THU-SAT *", "0 12 * * SUN,MON,THU-SAT"), + ], + ) + def test_cron_conversion(self, quartz_cron, expected_unix_cron): + import brickflow_plugins.airflow.cronhelper as cronhelper # noqa + + converted_unix_cron = cronhelper.cron_helper.quartz_to_unix(quartz_cron) + converted_quartz_cron = cronhelper.cron_helper.unix_to_quartz( + converted_unix_cron + ) + converted_unix_cron_second = cronhelper.cron_helper.quartz_to_unix( + converted_quartz_cron + ) - def test_cron_conversion(self): + assert ( + converted_unix_cron == converted_unix_cron_second + ), "cron conversion should be idempotent" + assert converted_unix_cron == expected_unix_cron + + @pytest.mark.parametrize( + "quartz_cron", + [ + "0 0 12 ? * L *", + "0 0 12 ? * 1L *", + "0 0 12 ? * 1W *", + "0 0 12 ? * 1#5 *", + ], + ) + def test_unsupported_cron_expressions(self, quartz_cron): import brickflow_plugins.airflow.cronhelper as cronhelper # noqa - unix_cron = cronhelper.cron_helper.quartz_to_unix("0 0 12 * * ?") - quartz_cron = cronhelper.cron_helper.unix_to_quartz(unix_cron) - unix_cron_second = cronhelper.cron_helper.quartz_to_unix(quartz_cron) - assert unix_cron == unix_cron_second, "cron conversion should be idempotent" + with pytest.raises(ValueError): + cronhelper.cron_helper.quartz_to_unix(quartz_cron)