From 6d36883064f08b37ea7e979c877deb4fe0084a55 Mon Sep 17 00:00:00 2001 From: Honnix Date: Sun, 4 Oct 2020 20:27:19 +0200 Subject: [PATCH] Cron schedule with offset (#188) * Extend `CronSchedule` to support `schedule` and `offset` added in https://github.com/lyft/flyteidl/pull/83 Co-authored-by: tnsetting --- flytekit/Makefile | 4 +- flytekit/dev-requirements.txt | 11 +-- flytekit/flytekit/__init__.py | 2 +- flytekit/flytekit/common/launch_plan.py | 2 + flytekit/flytekit/common/schedules.py | 78 ++++++++++++++++-- flytekit/flytekit/models/schedule.py | 54 ++++++++++++- flytekit/requirements.txt | 24 +++--- flytekit/setup.py | 2 +- .../unit/common_tests/test_launch_plan.py | 24 ++++-- .../unit/common_tests/test_schedules.py | 79 +++++++++++++++++++ .../flytekit/unit/models/test_schedule.py | 22 +++++- 11 files changed, 267 insertions(+), 35 deletions(-) diff --git a/flytekit/Makefile b/flytekit/Makefile index 14b045bdf..c9971f0b0 100644 --- a/flytekit/Makefile +++ b/flytekit/Makefile @@ -37,11 +37,11 @@ test: lint ## Run tests shellcheck **/*.sh requirements.txt: export CUSTOM_COMPILE_COMMAND := make requirements.txt -requirements.txt: requirements.in _install-piptools +requirements.txt: requirements.in install-piptools $(call PIP_COMPILE,requirements.in) dev-requirements.txt: export CUSTOM_COMPILE_COMMAND := make dev-requirements.txt -dev-requirements.txt: dev-requirements.in requirements.txt _install-piptools +dev-requirements.txt: dev-requirements.in requirements.txt install-piptools $(call PIP_COMPILE,dev-requirements.in) .PHONY: requirements diff --git a/flytekit/dev-requirements.txt b/flytekit/dev-requirements.txt index 69fef405f..7959318f4 100644 --- a/flytekit/dev-requirements.txt +++ b/flytekit/dev-requirements.txt @@ -2,7 +2,7 @@ # This file is autogenerated by pip-compile # To update, run: # -# pip-compile dev-requirements.in +# make dev-requirements.txt # appdirs==1.4.4 # via -c requirements.txt, black attrs==20.2.0 # via -c requirements.txt, black, pytest @@ -11,13 +11,11 @@ click==7.1.2 # via -c requirements.txt, black coverage==5.3 # via -r dev-requirements.in flake8-black==0.2.1 # via -r dev-requirements.in flake8-isort==4.0.0 # via -r dev-requirements.in -flake8==3.8.3 # via -r dev-requirements.in, flake8-black, flake8-isort -importlib-metadata==1.7.0 # via -c requirements.txt, flake8, pluggy, pytest +flake8==3.8.4 # via -r dev-requirements.in, flake8-black, flake8-isort iniconfig==1.0.1 # via pytest -isort==5.5.3 # via -r dev-requirements.in, flake8-isort +isort==5.5.4 # via -r dev-requirements.in, flake8-isort mccabe==0.6.1 # via flake8 mock==4.0.2 # via -r dev-requirements.in -more-itertools==8.5.0 # via pytest packaging==20.4 # via pytest pathspec==0.8.0 # via -c requirements.txt, black pluggy==0.13.1 # via pytest @@ -25,10 +23,9 @@ py==1.9.0 # via pytest pycodestyle==2.6.0 # via flake8 pyflakes==2.2.0 # via flake8 pyparsing==2.4.7 # via packaging -pytest==6.0.2 # via -r dev-requirements.in +pytest==6.1.1 # via -r dev-requirements.in regex==2020.9.27 # via -c requirements.txt, black six==1.15.0 # via -c requirements.txt, packaging testfixtures==6.14.2 # via flake8-isort toml==0.10.1 # via -c requirements.txt, black, pytest typed-ast==1.4.1 # via -c requirements.txt, black -zipp==3.1.0 # via -c requirements.txt, importlib-metadata diff --git a/flytekit/flytekit/__init__.py b/flytekit/flytekit/__init__.py index ea33af850..c2864bbeb 100644 --- a/flytekit/flytekit/__init__.py +++ b/flytekit/flytekit/__init__.py @@ -2,7 +2,7 @@ import flytekit.plugins # noqa: F401 -__version__ = "0.13.0b11" +__version__ = "0.13.0b12" logger = _logging.getLogger("flytekit") diff --git a/flytekit/flytekit/common/launch_plan.py b/flytekit/flytekit/common/launch_plan.py index f720e5c38..f8e8d1c2d 100644 --- a/flytekit/flytekit/common/launch_plan.py +++ b/flytekit/flytekit/common/launch_plan.py @@ -148,6 +148,8 @@ def is_scheduled(self): return True elif self.entity_metadata.schedule.rate and self.entity_metadata.schedule.rate.value: return True + elif self.entity_metadata.schedule.cron_schedule and self.entity_metadata.schedule.cron_schedule.schedule: + return True else: return False diff --git a/flytekit/flytekit/common/schedules.py b/flytekit/flytekit/common/schedules.py index 82af7deac..250b7aff3 100644 --- a/flytekit/flytekit/common/schedules.py +++ b/flytekit/flytekit/common/schedules.py @@ -1,4 +1,5 @@ import datetime as _datetime +import re as _re import croniter as _croniter @@ -18,13 +19,56 @@ def from_flyte_idl(cls, proto): class CronSchedule(_ExtendedSchedule, metaclass=_sdk_bases.ExtendedSdkType): - def __init__(self, cron_expression, kickoff_time_input_arg=None): + _VALID_CRON_ALIASES = [ + "hourly", + "hours", + "@hourly", + "daily", + "days", + "@daily", + "weekly", + "weeks", + "@weekly", + "monthly", + "months", + "@monthly", + "annually", + "@annually", + "yearly", + "years", + "@yearly", + ] + + # Not a perfect regex but good enough and simple to reason about + _OFFSET_PATTERN = _re.compile("([-+]?)P([-+0-9YMWD]+)?(T([-+0-9HMS.,]+)?)?") + + def __init__(self, cron_expression=None, schedule=None, offset=None, kickoff_time_input_arg=None): """ :param Text cron_expression: + :param Text schedule: + :param Text offset: :param Text kickoff_time_input_arg: """ - CronSchedule._validate_expression(cron_expression) - super(CronSchedule, self).__init__(kickoff_time_input_arg, cron_expression=cron_expression) + if cron_expression is None and schedule is None: + raise _user_exceptions.FlyteAssertion("Either `cron_expression` or `schedule` should be specified.") + + if cron_expression is not None and offset is not None: + raise _user_exceptions.FlyteAssertion("Only `schedule` is supported when specifying `offset`.") + + if cron_expression is not None: + CronSchedule._validate_expression(cron_expression) + + if schedule is not None: + CronSchedule._validate_schedule(schedule) + + if offset is not None: + CronSchedule._validate_offset(offset) + + super(CronSchedule, self).__init__( + kickoff_time_input_arg, + cron_expression=cron_expression, + cron_schedule=_schedule_models.Schedule.CronSchedule(schedule, offset) if schedule is not None else None, + ) @staticmethod def _validate_expression(cron_expression): @@ -40,7 +84,8 @@ def _validate_expression(cron_expression): if len(tokens) != 6: raise _user_exceptions.FlyteAssertion( "Cron expression is invalid. A cron expression must have 6 fields. Cron expressions are in the " - "format of: `minute hour day-of-month month day-of-week year`. Received: `{}`".format(cron_expression) + "format of: `minute hour day-of-month month day-of-week year`. " + "Use `schedule` for 5 fields cron expression. Received: `{}`".format(cron_expression) ) if tokens[2] != "?" and tokens[4] != "?": @@ -62,13 +107,36 @@ def _validate_expression(cron_expression): " Provided cron expr: {}".format(cron_expression) ) + @staticmethod + def _validate_schedule(schedule): + if schedule.lower() not in CronSchedule._VALID_CRON_ALIASES: + try: + _croniter.croniter(schedule) + except Exception: + raise _user_exceptions.FlyteAssertion( + "Schedule is invalid. It must be set to either a cron alias or valid cron expression." + " Provided schedule: {}".format(schedule) + ) + + @staticmethod + def _validate_offset(offset): + if CronSchedule._OFFSET_PATTERN.fullmatch(offset) is None: + raise _user_exceptions.FlyteAssertion( + "Offset is invalid. It must be an ISO 8601 duration. Provided offset: {}".format(offset) + ) + @classmethod def promote_from_model(cls, base_model): """ :param flytekit.models.schedule.Schedule base_model: :rtype: CronSchedule """ - return cls(base_model.cron_expression, kickoff_time_input_arg=base_model.kickoff_time_input_arg,) + return cls( + cron_expression=base_model.cron_expression, + schedule=base_model.cron_schedule.schedule if base_model.cron_schedule is not None else None, + offset=base_model.cron_schedule.offset if base_model.cron_schedule is not None else None, + kickoff_time_input_arg=base_model.kickoff_time_input_arg, + ) class FixedRate(_ExtendedSchedule, metaclass=_sdk_bases.ExtendedSdkType): diff --git a/flytekit/flytekit/models/schedule.py b/flytekit/flytekit/models/schedule.py index f49fbda18..5a06cc010 100644 --- a/flytekit/flytekit/models/schedule.py +++ b/flytekit/flytekit/models/schedule.py @@ -61,17 +61,56 @@ def from_flyte_idl(cls, pb2_object): """ return cls(pb2_object.value, pb2_object.unit) - def __init__(self, kickoff_time_input_arg, cron_expression=None, rate=None): + class CronSchedule(_common.FlyteIdlEntity): + def __init__(self, schedule, offset): + """ + :param Text schedule: cron expression or aliases + :param Text offset: ISO_8601 Duration + """ + self._schedule = schedule + self._offset = offset + + @property + def schedule(self): + """ + :rtype: Text + """ + return self._schedule + + @property + def offset(self): + """ + :rtype: Text + """ + return self._offset + + def to_flyte_idl(self): + """ + :rtype: flyteidl.admin.schedule_pb2.FixedRate + """ + return _schedule_pb2.CronSchedule(schedule=self.schedule, offset=self.offset) + + @classmethod + def from_flyte_idl(cls, pb2_object): + """ + :param flyteidl.admin.schedule_pb2.CronSchedule pb2_object: + :rtype: Schedule.CronSchedule + """ + return cls(pb2_object.schedule or None, pb2_object.offset or None) + + def __init__(self, kickoff_time_input_arg, cron_expression=None, rate=None, cron_schedule=None): """ One of cron_expression or fixed rate must be specified. :param Text kickoff_time_input_arg: :param Text cron_expression: [Optional] :param Schedule.FixedRate rate: [Optional] + :param Schedule.CronSchedule cron_schedule: [Optional] """ self._kickoff_time_input_arg = kickoff_time_input_arg self._cron_expression = cron_expression self._rate = rate + self._cron_schedule = cron_schedule @property def kickoff_time_input_arg(self): @@ -91,9 +130,16 @@ def rate(self): """ return self._rate + @property + def cron_schedule(self): + """ + :rtype: Schedule.CronSchedule + """ + return self._cron_schedule + @property def schedule_expression(self): - return self.cron_expression or self.rate + return self.cron_expression or self.rate or self.cron_schedule def to_flyte_idl(self): """ @@ -103,6 +149,7 @@ def to_flyte_idl(self): kickoff_time_input_arg=self.kickoff_time_input_arg, cron_expression=self.cron_expression, rate=self.rate.to_flyte_idl() if self.rate is not None else None, + cron_schedule=self.cron_schedule.to_flyte_idl() if self.cron_schedule is not None else None, ) @classmethod @@ -115,4 +162,7 @@ def from_flyte_idl(cls, pb2_object): pb2_object.kickoff_time_input_arg, cron_expression=pb2_object.cron_expression if pb2_object.HasField("cron_expression") else None, rate=Schedule.FixedRate.from_flyte_idl(pb2_object.rate) if pb2_object.HasField("rate") else None, + cron_schedule=Schedule.CronSchedule.from_flyte_idl(pb2_object.cron_schedule) + if pb2_object.HasField("cron_schedule") + else None, ) diff --git a/flytekit/requirements.txt b/flytekit/requirements.txt index 1f7cc486a..9f47dcdda 100644 --- a/flytekit/requirements.txt +++ b/flytekit/requirements.txt @@ -2,35 +2,36 @@ # This file is autogenerated by pip-compile # To update, run: # -# pip-compile requirements.in +# make requirements.txt # -e file:.#egg=flytekit # via -r requirements.in ansiwrap==0.8.4 # via papermill appdirs==1.4.4 # via black -appnope==0.1.0 # via ipykernel, ipython async-generator==1.10 # via nbclient attrs==20.2.0 # via black, jsonschema backcall==0.2.0 # via ipython black==19.10b0 # via flytekit, papermill -boto3==1.15.2 # via flytekit -botocore==1.18.2 # via boto3, s3transfer +boto3==1.15.11 # via flytekit +botocore==1.18.11 # via boto3, s3transfer certifi==2020.6.20 # via requests +cffi==1.14.3 # via cryptography chardet==3.0.4 # via requests click==7.1.2 # via black, flytekit, hmsclient, papermill croniter==0.3.34 # via flytekit +cryptography==3.1.1 # via secretstorage decorator==4.4.2 # via ipython deprecated==1.2.10 # via flytekit entrypoints==0.3 # via papermill -flyteidl==0.18.6 # via flytekit +flyteidl==0.18.8 # via flytekit future==0.18.2 # via torch grpcio==1.32.0 # via flytekit hmsclient==0.1.1 # via flytekit idna==2.10 # via requests -importlib-metadata==1.7.0 # via jsonschema, keyring ipykernel==5.3.4 # via flytekit ipython-genutils==0.2.0 # via nbformat, traitlets ipython==7.18.1 # via ipykernel jedi==0.17.2 # via ipython +jeepney==0.4.3 # via keyring, secretstorage jmespath==0.10.0 # via boto3, botocore jsonschema==3.2.0 # via nbformat jupyter-client==6.1.7 # via ipykernel, nbclient @@ -40,10 +41,10 @@ keyring==21.4.0 # via flytekit natsort==7.0.1 # via croniter nbclient==0.5.0 # via papermill nbformat==5.0.7 # via nbclient, papermill -nest-asyncio==1.4.0 # via nbclient +nest-asyncio==1.4.1 # via nbclient numpy==1.19.2 # via flytekit, pandas, pyarrow, torch pandas==1.1.2 # via flytekit -papermill==2.1.3 # via flytekit +papermill==2.2.0 # via flytekit parso==0.7.1 # via jedi pathspec==0.8.0 # via black pexpect==4.8.0 # via ipython @@ -53,6 +54,7 @@ protobuf==3.13.0 # via flyteidl, flytekit, k8s-proto ptyprocess==0.6.0 # via pexpect py4j==0.10.7 # via pyspark pyarrow==0.17.1 # via flytekit +pycparser==2.20 # via cffi pygments==2.7.1 # via ipython pyrsistent==0.17.3 # via jsonschema pyspark==2.4.7 # via flytekit @@ -65,7 +67,8 @@ regex==2020.9.27 # via black requests==2.24.0 # via flytekit, papermill, responses responses==0.12.0 # via flytekit s3transfer==0.3.3 # via boto3 -six==1.15.0 # via flytekit, grpcio, jsonschema, protobuf, python-dateutil, responses, tenacity, thrift +secretstorage==3.1.2 # via keyring +six==1.15.0 # via cryptography, flytekit, grpcio, jsonschema, protobuf, python-dateutil, responses, tenacity, thrift sortedcontainers==2.2.2 # via flytekit statsd==3.3.0 # via flytekit tenacity==6.2.0 # via papermill @@ -74,13 +77,12 @@ thrift==0.13.0 # via hmsclient toml==0.10.1 # via black torch==1.6.0 # via flytekit tornado==6.0.4 # via ipykernel, jupyter-client -tqdm==4.49.0 # via papermill +tqdm==4.50.0 # via papermill traitlets==5.0.4 # via ipykernel, ipython, jupyter-client, jupyter-core, nbclient, nbformat typed-ast==1.4.1 # via black urllib3==1.25.10 # via botocore, flytekit, requests, responses wcwidth==0.2.5 # via prompt-toolkit wrapt==1.12.1 # via deprecated, flytekit -zipp==3.1.0 # via importlib-metadata # The following packages are considered to be unsafe in a requirements file: # setuptools diff --git a/flytekit/setup.py b/flytekit/setup.py index b41f639fc..f045990df 100644 --- a/flytekit/setup.py +++ b/flytekit/setup.py @@ -23,7 +23,7 @@ ] }, install_requires=[ - "flyteidl>=0.18.6,<1.0.0", + "flyteidl>=0.18.8,<1.0.0", "click>=6.6,<8.0", "croniter>=0.3.20,<4.0.0", "deprecated>=1.0,<2.0", diff --git a/flytekit/tests/flytekit/unit/common_tests/test_launch_plan.py b/flytekit/tests/flytekit/unit/common_tests/test_launch_plan.py index ffef6da28..663e343f8 100644 --- a/flytekit/tests/flytekit/unit/common_tests/test_launch_plan.py +++ b/flytekit/tests/flytekit/unit/common_tests/test_launch_plan.py @@ -8,6 +8,7 @@ from flytekit.common import schedules as _schedules from flytekit.common.exceptions import user as _user_exceptions from flytekit.models import common as _common_models +from flytekit.models import schedule as _schedule from flytekit.models import types as _type_models from flytekit.models.core import execution as _execution from flytekit.models.core import identifier as _identifier @@ -127,7 +128,21 @@ def test_no_additional_inputs(): assert lp.default_inputs.parameters["required_input"].required is True -def test_schedule(): +@_pytest.mark.parametrize( + "schedule,cron_expression,cron_schedule", + [ + (_schedules.CronSchedule("* * ? * * *"), "* * ? * * *", None), + (_schedules.CronSchedule(cron_expression="* * ? * * *"), "* * ? * * *", None), + (_schedules.CronSchedule(cron_expression="0/15 * * * ? *"), "0/15 * * * ? *", None), + (_schedules.CronSchedule(schedule="* * * * *"), None, _schedule.Schedule.CronSchedule("* * * * *", None)), + ( + _schedules.CronSchedule(schedule="* * * * *", offset="P1D"), + None, + _schedule.Schedule.CronSchedule("* * * * *", "P1D"), + ), + ], +) +def test_schedule(schedule, cron_expression, cron_schedule): workflow_to_test = _workflow.workflow( {}, inputs={ @@ -135,11 +150,10 @@ def test_schedule(): "default_input": _workflow.Input(_types.Types.Integer, default=5), }, ) - lp = workflow_to_test.create_launch_plan( - fixed_inputs={"required_input": 5}, schedule=_schedules.CronSchedule("* * ? * * *"), role="what", - ) + lp = workflow_to_test.create_launch_plan(fixed_inputs={"required_input": 5}, schedule=schedule, role="what",) assert lp.entity_metadata.schedule.kickoff_time_input_arg is None - assert lp.entity_metadata.schedule.cron_expression == "* * ? * * *" + assert lp.entity_metadata.schedule.cron_expression == cron_expression + assert lp.entity_metadata.schedule.cron_schedule == cron_schedule assert lp.is_scheduled diff --git a/flytekit/tests/flytekit/unit/common_tests/test_schedules.py b/flytekit/tests/flytekit/unit/common_tests/test_schedules.py index 14d920350..0009b424d 100644 --- a/flytekit/tests/flytekit/unit/common_tests/test_schedules.py +++ b/flytekit/tests/flytekit/unit/common_tests/test_schedules.py @@ -13,6 +13,13 @@ def test_cron(): assert obj == _schedules.CronSchedule.from_flyte_idl(obj.to_flyte_idl()) +def test_cron_karg(): + obj = _schedules.CronSchedule(cron_expression="* * ? * * *", kickoff_time_input_arg="abc") + assert obj.kickoff_time_input_arg == "abc" + assert obj.cron_expression == "* * ? * * *" + assert obj == _schedules.CronSchedule.from_flyte_idl(obj.to_flyte_idl()) + + def test_cron_validation(): with _pytest.raises(_user_exceptions.FlyteAssertion): _schedules.CronSchedule("* * * * * *", kickoff_time_input_arg="abc") @@ -49,3 +56,75 @@ def test_fixed_rate_bad_duration(): def test_fixed_rate_negative_duration(): pass + + +@_pytest.mark.parametrize( + "schedule", + [ + "hourly", + "hours", + "HOURS", + "@hourly", + "daily", + "days", + "DAYS", + "@daily", + "weekly", + "weeks", + "WEEKS", + "@weekly", + "monthly", + "months", + "MONTHS", + "@monthly", + "annually", + "@annually", + "yearly", + "years", + "YEARS", + "@yearly", + "* * * * *", + ], +) +def test_cron_schedule_schedule_validation(schedule): + obj = _schedules.CronSchedule(schedule=schedule, kickoff_time_input_arg="abc") + assert obj.cron_schedule.schedule == schedule + + +@_pytest.mark.parametrize( + "schedule", ["foo", "* *"], +) +def test_cron_schedule_schedule_validation_invalid(schedule): + with _pytest.raises(_user_exceptions.FlyteAssertion): + _schedules.CronSchedule(schedule=schedule, kickoff_time_input_arg="abc") + + +def test_cron_schedule_offset_validation_invalid(): + with _pytest.raises(_user_exceptions.FlyteAssertion): + _schedules.CronSchedule(schedule="days", offset="foo", kickoff_time_input_arg="abc") + + +def test_cron_schedule(): + obj = _schedules.CronSchedule(schedule="days", kickoff_time_input_arg="abc") + assert obj.cron_schedule.schedule == "days" + assert obj.cron_schedule.offset is None + assert obj == _schedules.CronSchedule.from_flyte_idl(obj.to_flyte_idl()) + + +def test_cron_schedule_offset(): + obj = _schedules.CronSchedule(schedule="days", offset="P1D", kickoff_time_input_arg="abc") + assert obj.cron_schedule.schedule == "days" + assert obj.cron_schedule.offset == "P1D" + assert obj == _schedules.CronSchedule.from_flyte_idl(obj.to_flyte_idl()) + + +def test_both_cron_expression_and_cron_schedule_schedule(): + with _pytest.raises(_user_exceptions.FlyteAssertion): + _schedules.CronSchedule( + cron_expression="* * ? * * *", schedule="days", offset="foo", kickoff_time_input_arg="abc" + ) + + +def test_cron_expression_and_cron_schedule_offset(): + with _pytest.raises(_user_exceptions.FlyteAssertion): + _schedules.CronSchedule(cron_expression="* * ? * * *", offset="foo", kickoff_time_input_arg="abc") diff --git a/flytekit/tests/flytekit/unit/models/test_schedule.py b/flytekit/tests/flytekit/unit/models/test_schedule.py index ac405e4a7..8bade49fc 100644 --- a/flytekit/tests/flytekit/unit/models/test_schedule.py +++ b/flytekit/tests/flytekit/unit/models/test_schedule.py @@ -1,7 +1,9 @@ +import pytest as _pytest + from flytekit.models import schedule as _schedule -def test_schedule(): +def test_schedule_cron_expression(): obj = _schedule.Schedule(kickoff_time_input_arg="fdsa", cron_expression="1 2 3 4 5 6") assert obj.rate is None assert obj.cron_expression == "1 2 3 4 5 6" @@ -32,3 +34,21 @@ def test_schedule_fixed_rate(): assert obj2.kickoff_time_input_arg == "fdsa" assert obj2.rate == fr assert obj2.schedule_expression == fr + + +@_pytest.mark.parametrize( + "offset", [None, "P1D"], +) +def test_schedule_cron_schedule(offset): + cs = _schedule.Schedule.CronSchedule("days", offset) + obj = _schedule.Schedule(cron_schedule=cs, kickoff_time_input_arg="fdsa") + assert obj.cron_schedule.schedule == "days" + assert obj.schedule_expression == cs + assert obj.rate is None + assert obj.cron_expression is None + + obj2 = _schedule.Schedule.from_flyte_idl(obj.to_flyte_idl()) + assert obj == obj2 + assert obj2.schedule_expression == cs + assert obj.rate is None + assert obj.cron_expression is None