Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix] Replace CronUtils library #196

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion brickflow/cli/entrypoint.template
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ def main() -> None:
git_repo="{{ git_https_url }}",
provider="{{ git_provider }}",
libraries=[
MavenTaskLibrary(coordinates="com.cronutils:cron-utils:9.2.0"),
# PypiTaskLibrary(package="spark-expectations=={{spark_expectations_version}}"), # Uncomment if spark-expectations is needed
],
) as f:
Expand Down
5 changes: 0 additions & 5 deletions brickflow/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1137,10 +1137,6 @@ def filter_bf_related_libraries(
if isinstance(lib, PypiTaskLibrary):
if lib.package.startswith("apache-airflow") is True:
continue
if isinstance(lib, MavenTaskLibrary):
# TODO: clean this up but no one should really be using cron-utils at the moment for outside of brickflow
if lib.coordinates.startswith("com.cronutils:cron-utils:9.2.0") is True:
continue
resp.append(lib)
return resp

Expand Down Expand Up @@ -1212,7 +1208,6 @@ def get_brickflow_libraries(enable_plugins: bool = False) -> List[TaskLibrary]:
PypiTaskLibrary("tableauserverclient==0.25"),
PypiTaskLibrary("boxsdk==3.9.2"),
PypiTaskLibrary("cerberus-python-client==2.5.4"),
MavenTaskLibrary("com.cronutils:cron-utils:9.2.0"),
]
else:
return [bf_lib]
Binary file removed brickflow_plugins/airflow/cron-utils-9.2.0.jar
Binary file not shown.
200 changes: 114 additions & 86 deletions brickflow_plugins/airflow/cronhelper.py
Original file line number Diff line number Diff line change
@@ -1,106 +1,134 @@
import re
import functools
import os
from pathlib import Path

from brickflow_plugins import log

try:
from py4j.protocol import Py4JError
except ImportError:
raise ImportError(
"You must install py4j to use cronhelper, "
"please try pip install py4j. "
"This library is not installed as "
"it is provided by databricks OOTB."
)


class CronHelper:
    """Convert cron expressions between the Unix and Quartz dialects.

    Unix expressions have 5 fields (minute, hour, day-of-month, month,
    day-of-week). Quartz expressions have 6 or 7 fields (seconds, minute,
    hour, day-of-month, month, day-of-week and an optional year).

    The conversion is implemented with string handling and ``re`` only.
    The Quartz specials ``L``, ``W`` and ``#`` are not supported.
    """

    EVERY_X_UNITS_REPLACE_PLACEHOLDER = "%s"
    QUARTZ_EVERY_X_UNITS_REGEX = re.compile(r"^0/(\d+)$")  # For handling 0/5 units
    UNIX_EVERY_X_UNITS_REGEX = re.compile(r"^\*/(\d+)$")  # For handling */5 units
    QUARTZ_EVERY_X_UNITS_REPLACE_PATTERN = f"0/{EVERY_X_UNITS_REPLACE_PLACEHOLDER}"
    UNIX_EVERY_X_UNITS_REPLACE_PATTERN = f"*/{EVERY_X_UNITS_REPLACE_PLACEHOLDER}"

    # Symbolic names that may legitimately contain the letters "L" or "W"
    # (e.g. WED, JUL); they must not be mistaken for Quartz special characters.
    _DAY_NAMES = frozenset({"SUN", "MON", "TUE", "WED", "THU", "FRI", "SAT"})
    _MONTH_NAMES = frozenset(
        {
            "JAN",
            "FEB",
            "MAR",
            "APR",
            "MAY",
            "JUN",
            "JUL",
            "AUG",
            "SEP",
            "OCT",
            "NOV",
            "DEC",
        }
    )

    @staticmethod
    def __get_expression_parts(expression: str) -> list:
        """Split a cron expression into its fields.

        Args:
            expression: Unix (5 fields) or Quartz (6 or 7 fields) expression.

        Returns:
            A list of 5 fields for Unix input, or 7 fields for Quartz input
            (a missing optional year is filled in as ``"*"``).

        Raises:
            ValueError: if the expression has any other number of fields.
        """
        # split() without an argument collapses runs of whitespace and ignores
        # leading/trailing whitespace, so "0  12 * * *" is still 5 fields.
        parts = expression.split()

        # Unix cron expression have 5 parts, Quartz cron expression have 6 or 7 parts
        if len(parts) in (5, 7):
            return parts
        # Year is an optional part in Quartz cron expression, adding the extra
        # element to mimic 7 part Quartz expression
        if len(parts) == 6:
            parts.append("*")
            return parts

        raise ValueError("Invalid cron expression!")

    @staticmethod
    def convert_interval_parts(part: str, is_quartz: bool = False) -> str:
        """Normalise an "every X units" field (``0/5`` for Quartz, ``*/5`` for Unix).

        Args:
            part: a single cron field.
            is_quartz: match the Quartz form (``0/N``) instead of the Unix
                form (``*/N``).

        Returns:
            The field rebuilt from the replace pattern of the same dialect when
            it matches the "every X units" regex, otherwise ``part`` unchanged.
            With the patterns defined on this class the rebuilt value has the
            same text as the matched input.
        """
        if is_quartz:
            regex = CronHelper.QUARTZ_EVERY_X_UNITS_REGEX
            template = CronHelper.QUARTZ_EVERY_X_UNITS_REPLACE_PATTERN
        else:
            regex = CronHelper.UNIX_EVERY_X_UNITS_REGEX
            template = CronHelper.UNIX_EVERY_X_UNITS_REPLACE_PATTERN

        match = regex.match(part)
        if match is None:
            return part
        return template.replace(
            CronHelper.EVERY_X_UNITS_REPLACE_PLACEHOLDER, match.group(1)
        )

    @staticmethod
    def _shift_dow_value(value: str, to_quartz: bool) -> str:
        """Convert one day-of-week value between the Unix and Quartz numbering.

        Quartz DOW starts from 1 (Sunday) while Unix DOW starts from 0 (Sunday).
        Day names (SUN..SAT, any case) are returned unchanged.

        Raises:
            ValueError: if the value is neither a day name nor a number in the
                valid range of the source dialect.
        """
        if value.upper() in CronHelper._DAY_NAMES:
            return value
        try:
            number = int(value)
        except ValueError as err:
            raise ValueError(
                f"Unsupported day-of-week value {value!r} in cron expression"
            ) from err

        if to_quartz:
            if not 0 <= number <= 7:
                raise ValueError(
                    f"Unix day-of-week {value!r} is outside the range 0-7"
                )
            # Unix cron Sunday can be represented as 0 or 7, but only as 1 in Quartz cron
            return "1" if number in (0, 7) else str(number + 1)

        if not 1 <= number <= 7:
            raise ValueError(f"Quartz day-of-week {value!r} is outside the range 1-7")
        return str(number - 1)

    @staticmethod
    def _shift_dow_field(dow: str, to_quartz: bool) -> str:
        """Convert a whole day-of-week field (lists, ranges and steps included)."""

        def shift_item(item: str) -> str:
            # A step suffix ("/2") is a count, not a day, so it is kept as is.
            base, slash, step = item.partition("/")
            if to_quartz and base == "0-7":
                # Both ends mean Sunday in Unix; shifting them individually
                # would give "1-1" (Sunday only) instead of the whole week.
                base = "1-7"
            elif base != "*":
                base = "-".join(
                    CronHelper._shift_dow_value(value, to_quartz)
                    for value in base.split("-")
                )
            return f"{base}{slash}{step}"

        return ",".join(shift_item(item) for item in dow.split(","))

    @staticmethod
    def _uses_unsupported_specials(parts: list) -> bool:
        """Return True if any field uses the Quartz specials ``L``, ``W`` or ``#``.

        Month and day names are skipped first so that names such as ``WED``
        or ``JUL`` are not reported as special characters.
        """
        names = CronHelper._DAY_NAMES | CronHelper._MONTH_NAMES
        for part in parts:
            for atom in re.split(r"[,\-/]", part.upper()):
                if atom in names:
                    continue
                if any(special in atom for special in "LW#"):
                    return True
        return False

    @staticmethod
    def _unix_to_quartz_expression(unix_cron: str) -> str:
        """Build the Quartz expression for ``unix_cron`` (no logging, no caching)."""
        parts = CronHelper.__get_expression_parts(expression=unix_cron)

        if len(parts) != 5:
            raise ValueError("Invalid Unix cron expression")

        minute, hour, dom, month, dow = map(CronHelper.convert_interval_parts, parts)

        # Converting Unix DOW to Quartz DOW
        quartz_dow = CronHelper._shift_dow_field(dow, to_quartz=True)
        quartz_dom = dom

        # Exactly one of day-of-month / day-of-week becomes "?" when the other
        # one carries the restriction.
        # NOTE(review): when both fields are restricted, both are emitted as
        # given; Quartz may reject such an expression - confirm with callers.
        if dom != "*" and dow == "*":
            quartz_dow = "?"
        elif dom == "*":
            quartz_dom = "?"

        # Seconds are fixed to 0 and the year to "*".
        return f"0 {minute} {hour} {quartz_dom} {month} {quartz_dow} *"

    @staticmethod
    def _quartz_to_unix_expression(quartz_cron: str) -> str:
        """Build the Unix expression for ``quartz_cron`` (no logging, no caching)."""
        parts = CronHelper.__get_expression_parts(expression=quartz_cron)

        if len(parts) != 7:
            raise ValueError("Invalid Quartz cron expression")

        if CronHelper._uses_unsupported_specials(parts):
            raise ValueError("Support for 'L, W, #' in Quartz cron is not implemented")

        # Unix cron expression does not support '?'
        parts = [part.replace("?", "*") for part in parts]

        # The seconds and year fields have no Unix counterpart and are dropped.
        _, minute, hour, dom, month, dow, _ = (
            CronHelper.convert_interval_parts(part, True) for part in parts
        )

        # Converting Quartz DOW to Unix DOW
        unix_dow = CronHelper._shift_dow_field(dow, to_quartz=False)

        return f"{minute} {hour} {dom} {month} {unix_dow}"

    # The cache key includes ``self``; the module shares one instance.
    @functools.lru_cache(maxsize=128)  # cron expression conversion will not change
    def unix_to_quartz(self, unix_cron: str) -> str:
        """Convert a 5-field Unix cron expression to a 7-field Quartz expression.

        Args:
            unix_cron: the Unix cron expression.

        Returns:
            The Quartz expression, with seconds set to ``0`` and year to ``*``.

        Raises:
            ValueError: if the expression does not have 5 fields or contains
                an invalid day-of-week value.
        """
        quartz_cron = self._unix_to_quartz_expression(unix_cron)
        log.info("Converted unix cron %s to quartz cron %s", unix_cron, quartz_cron)
        return quartz_cron

    @functools.lru_cache(maxsize=128)  # cron expression conversion will not change
    def quartz_to_unix(self, quartz_cron: str) -> str:
        """Convert a 6- or 7-field Quartz cron expression to a Unix expression.

        Args:
            quartz_cron: the Quartz cron expression.

        Returns:
            The 5-field Unix expression; seconds and year are dropped.

        Raises:
            ValueError: if the expression does not have 6 or 7 fields, uses
                the unsupported ``L``, ``W`` or ``#`` specials, or contains an
                invalid day-of-week value.
        """
        unix_cron = self._quartz_to_unix_expression(quartz_cron)
        log.info("Converted quartz cron %s to unix cron %s", quartz_cron, unix_cron)
        return unix_cron


cron_helper = CronHelper()
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ targets:
- pypi:
package: cerberus-python-client==2.5.4
repo: null
- maven:
coordinates: com.cronutils:cron-utils:9.2.0
exclusions: null
repo: null
max_retries: null
min_retry_interval_millis: null
notebook_task:
Expand Down
6 changes: 3 additions & 3 deletions tests/engine/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ def test_get_brickflow_lib_version(self):
def test_get_brickflow_libraries(self):
settings = BrickflowProjectDeploymentSettings()
settings.brickflow_project_runtime_version = "1.0.0"
assert len(get_brickflow_libraries(enable_plugins=True)) == 7
assert len(get_brickflow_libraries(enable_plugins=True)) == 6
assert len(get_brickflow_libraries(enable_plugins=False)) == 1
lib = get_brickflow_libraries(enable_plugins=False)[0].dict
expected = {
Expand All @@ -465,7 +465,7 @@ def test_get_brickflow_libraries_semver_non_numeric(self):
settings = BrickflowProjectDeploymentSettings()
tag = "1.0.1rc1234"
settings.brickflow_project_runtime_version = tag
assert len(get_brickflow_libraries(enable_plugins=True)) == 7
assert len(get_brickflow_libraries(enable_plugins=True)) == 6
assert len(get_brickflow_libraries(enable_plugins=False)) == 1
lib = get_brickflow_libraries(enable_plugins=False)[0].dict
expected = {
Expand All @@ -481,7 +481,7 @@ def test_get_brickflow_libraries_non_semver(self):
settings = BrickflowProjectDeploymentSettings()
tag = "somebranch"
settings.brickflow_project_runtime_version = tag
assert len(get_brickflow_libraries(enable_plugins=True)) == 7
assert len(get_brickflow_libraries(enable_plugins=True)) == 6
assert len(get_brickflow_libraries(enable_plugins=False)) == 1
lib = get_brickflow_libraries(enable_plugins=False)[0].dict
expected = {
Expand Down
58 changes: 42 additions & 16 deletions tests/test_plugins.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import copy
from typing import List
from unittest import mock
from unittest.mock import patch

import pluggy
import pytest
Expand Down Expand Up @@ -45,21 +44,48 @@ def test_plugins_ensure_installation_import_error(self):
get_brickflow_tasks_hook(pm)
assert_plugin_manager(pm, ["default"])

# Checks that importing cronhelper raises ImportError while the cronhelper
# module itself and the py4j modules are mapped to None in sys.modules.
def test_cron_import_nopy4j(self):
# A None entry in sys.modules makes the import of that module name fail.
remove_cron_helper = {
"brickflow_plugins.airflow.cronhelper": None,
"py4j": None,
"py4j.protocol": None,
"py4j.java_gateway": None,
}
with patch.dict("sys.modules", remove_cron_helper):
with pytest.raises(ImportError):
import brickflow_plugins.airflow.cronhelper as cronhelper # noqa
# Each case is a (Quartz expression, expected Unix expression) pair.
@pytest.mark.parametrize(
"quartz_cron, expected_unix_cron",
[
("0 * * ? * * *", "* * * * *"),
("0 */5 * ? * * *", "*/5 * * * *"),
("0 30 * ? * * *", "30 * * * *"),
("0 0 12 ? * * *", "0 12 * * *"),
("0 0 12 ? * 2 *", "0 12 * * 1"),
("0 0 0 10 * ? *", "0 0 10 * *"),
("0 0 0 1 1 ? *", "0 0 1 1 *"),
("0 0/5 14,18 * * ?", "0/5 14,18 * * *"),
("0 0 12 ? * 1,2,5-7 *", "0 12 * * 0,1,4-6"),
("0 0 12 ? * SUN,MON,THU-SAT *", "0 12 * * SUN,MON,THU-SAT"),
],
)
# Converts Quartz -> Unix -> Quartz -> Unix and checks that the first and the
# last Unix results agree, and that the first one equals the expected value.
def test_cron_conversion(self, quartz_cron, expected_unix_cron):
import brickflow_plugins.airflow.cronhelper as cronhelper # noqa

converted_unix_cron = cronhelper.cron_helper.quartz_to_unix(quartz_cron)
converted_quartz_cron = cronhelper.cron_helper.unix_to_quartz(
converted_unix_cron
)
converted_unix_cron_second = cronhelper.cron_helper.quartz_to_unix(
converted_quartz_cron
)

# NOTE(review): the un-parametrized signature below looks like the
# removed side of the diff interleaved with the new body - confirm.
def test_cron_conversion(self):
assert (
converted_unix_cron == converted_unix_cron_second
), "cron conversion should be idempotent"
assert converted_unix_cron == expected_unix_cron

# Quartz expressions whose day-of-week field uses L, W or #.
@pytest.mark.parametrize(
"quartz_cron",
[
"0 0 12 ? * L *",
"0 0 12 ? * 1L *",
"0 0 12 ? * 1W *",
"0 0 12 ? * 1#5 *",
],
)
# Expects quartz_to_unix to raise ValueError for the parametrized expression.
def test_unsupported_cron_expressions(self, quartz_cron):
import brickflow_plugins.airflow.cronhelper as cronhelper # noqa

# NOTE(review): the next four lines run a fixed round trip and rebind
# quartz_cron; they look like the removed side of the diff - confirm.
unix_cron = cronhelper.cron_helper.quartz_to_unix("0 0 12 * * ?")
quartz_cron = cronhelper.cron_helper.unix_to_quartz(unix_cron)
unix_cron_second = cronhelper.cron_helper.quartz_to_unix(quartz_cron)
assert unix_cron == unix_cron_second, "cron conversion should be idempotent"
with pytest.raises(ValueError):
cronhelper.cron_helper.quartz_to_unix(quartz_cron)