Skip to content
This repository has been archived by the owner on Dec 20, 2023. It is now read-only.

Commit

Permalink
Cron schedule with offset (#188)
Browse files Browse the repository at this point in the history
* Extend `CronSchedule` to support `schedule` and `offset` added in flyteorg/flyteidl#83

Co-authored-by: tnsetting <[email protected]>
  • Loading branch information
honnix and tnsetting authored Oct 4, 2020
1 parent 7e7a78f commit 6d36883
Show file tree
Hide file tree
Showing 11 changed files with 267 additions and 35 deletions.
4 changes: 2 additions & 2 deletions flytekit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ test: lint ## Run tests
shellcheck **/*.sh

requirements.txt: export CUSTOM_COMPILE_COMMAND := make requirements.txt
requirements.txt: requirements.in _install-piptools
requirements.txt: requirements.in install-piptools
$(call PIP_COMPILE,requirements.in)

dev-requirements.txt: export CUSTOM_COMPILE_COMMAND := make dev-requirements.txt
dev-requirements.txt: dev-requirements.in requirements.txt _install-piptools
dev-requirements.txt: dev-requirements.in requirements.txt install-piptools
$(call PIP_COMPILE,dev-requirements.in)

.PHONY: requirements
Expand Down
11 changes: 4 additions & 7 deletions flytekit/dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# This file is autogenerated by pip-compile
# To update, run:
#
# pip-compile dev-requirements.in
# make dev-requirements.txt
#
appdirs==1.4.4 # via -c requirements.txt, black
attrs==20.2.0 # via -c requirements.txt, black, pytest
Expand All @@ -11,24 +11,21 @@ click==7.1.2 # via -c requirements.txt, black
coverage==5.3 # via -r dev-requirements.in
flake8-black==0.2.1 # via -r dev-requirements.in
flake8-isort==4.0.0 # via -r dev-requirements.in
flake8==3.8.3 # via -r dev-requirements.in, flake8-black, flake8-isort
importlib-metadata==1.7.0 # via -c requirements.txt, flake8, pluggy, pytest
flake8==3.8.4 # via -r dev-requirements.in, flake8-black, flake8-isort
iniconfig==1.0.1 # via pytest
isort==5.5.3 # via -r dev-requirements.in, flake8-isort
isort==5.5.4 # via -r dev-requirements.in, flake8-isort
mccabe==0.6.1 # via flake8
mock==4.0.2 # via -r dev-requirements.in
more-itertools==8.5.0 # via pytest
packaging==20.4 # via pytest
pathspec==0.8.0 # via -c requirements.txt, black
pluggy==0.13.1 # via pytest
py==1.9.0 # via pytest
pycodestyle==2.6.0 # via flake8
pyflakes==2.2.0 # via flake8
pyparsing==2.4.7 # via packaging
pytest==6.0.2 # via -r dev-requirements.in
pytest==6.1.1 # via -r dev-requirements.in
regex==2020.9.27 # via -c requirements.txt, black
six==1.15.0 # via -c requirements.txt, packaging
testfixtures==6.14.2 # via flake8-isort
toml==0.10.1 # via -c requirements.txt, black, pytest
typed-ast==1.4.1 # via -c requirements.txt, black
zipp==3.1.0 # via -c requirements.txt, importlib-metadata
2 changes: 1 addition & 1 deletion flytekit/flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import flytekit.plugins # noqa: F401

__version__ = "0.13.0b11"
__version__ = "0.13.0b12"

logger = _logging.getLogger("flytekit")

Expand Down
2 changes: 2 additions & 0 deletions flytekit/flytekit/common/launch_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ def is_scheduled(self):
return True
elif self.entity_metadata.schedule.rate and self.entity_metadata.schedule.rate.value:
return True
elif self.entity_metadata.schedule.cron_schedule and self.entity_metadata.schedule.cron_schedule.schedule:
return True
else:
return False

Expand Down
78 changes: 73 additions & 5 deletions flytekit/flytekit/common/schedules.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime as _datetime
import re as _re

import croniter as _croniter

Expand All @@ -18,13 +19,56 @@ def from_flyte_idl(cls, proto):


class CronSchedule(_ExtendedSchedule, metaclass=_sdk_bases.ExtendedSdkType):
def __init__(self, cron_expression, kickoff_time_input_arg=None):
_VALID_CRON_ALIASES = [
"hourly",
"hours",
"@hourly",
"daily",
"days",
"@daily",
"weekly",
"weeks",
"@weekly",
"monthly",
"months",
"@monthly",
"annually",
"@annually",
"yearly",
"years",
"@yearly",
]

# Not a perfect regex but good enough and simple to reason about
_OFFSET_PATTERN = _re.compile("([-+]?)P([-+0-9YMWD]+)?(T([-+0-9HMS.,]+)?)?")

def __init__(self, cron_expression=None, schedule=None, offset=None, kickoff_time_input_arg=None):
"""
:param Text cron_expression:
:param Text schedule:
:param Text offset:
:param Text kickoff_time_input_arg:
"""
CronSchedule._validate_expression(cron_expression)
super(CronSchedule, self).__init__(kickoff_time_input_arg, cron_expression=cron_expression)
if cron_expression is None and schedule is None:
raise _user_exceptions.FlyteAssertion("Either `cron_expression` or `schedule` should be specified.")

if cron_expression is not None and offset is not None:
raise _user_exceptions.FlyteAssertion("Only `schedule` is supported when specifying `offset`.")

if cron_expression is not None:
CronSchedule._validate_expression(cron_expression)

if schedule is not None:
CronSchedule._validate_schedule(schedule)

if offset is not None:
CronSchedule._validate_offset(offset)

super(CronSchedule, self).__init__(
kickoff_time_input_arg,
cron_expression=cron_expression,
cron_schedule=_schedule_models.Schedule.CronSchedule(schedule, offset) if schedule is not None else None,
)

@staticmethod
def _validate_expression(cron_expression):
Expand All @@ -40,7 +84,8 @@ def _validate_expression(cron_expression):
if len(tokens) != 6:
raise _user_exceptions.FlyteAssertion(
"Cron expression is invalid. A cron expression must have 6 fields. Cron expressions are in the "
"format of: `minute hour day-of-month month day-of-week year`. Received: `{}`".format(cron_expression)
"format of: `minute hour day-of-month month day-of-week year`. "
"Use `schedule` for 5 fields cron expression. Received: `{}`".format(cron_expression)
)

if tokens[2] != "?" and tokens[4] != "?":
Expand All @@ -62,13 +107,36 @@ def _validate_expression(cron_expression):
" Provided cron expr: {}".format(cron_expression)
)

@staticmethod
def _validate_schedule(schedule):
    """
    Check that ``schedule`` is either a recognised cron alias or an expression croniter can parse.

    :param Text schedule: cron alias (matched case-insensitively against ``_VALID_CRON_ALIASES``) or a
        cron expression.
    :raises _user_exceptions.FlyteAssertion: if ``schedule`` is not an alias and croniter rejects it.
    """
    # Aliases are accepted as-is; only non-alias values are handed to croniter.
    if schedule.lower() in CronSchedule._VALID_CRON_ALIASES:
        return

    try:
        _croniter.croniter(schedule)
    except Exception as exc:
        # NOTE(review): the broad catch is kept from the original; presumably croniter can raise several
        # exception types for malformed input -- confirm before narrowing.
        # Chain explicitly so the underlying croniter error is preserved as the cause.
        raise _user_exceptions.FlyteAssertion(
            "Schedule is invalid. It must be set to either a cron alias or valid cron expression."
            " Provided schedule: {}".format(schedule)
        ) from exc

@staticmethod
def _validate_offset(offset):
    """
    Check that ``offset`` has the shape of an ISO 8601 duration (e.g. ``P1D``, ``PT2H30M``, ``-P1DT12H``).

    :param Text offset: candidate duration string.
    :raises _user_exceptions.FlyteAssertion: if ``offset`` does not match ``_OFFSET_PATTERN`` in full, or
        if it matches but carries neither a date nor a time component.
    """
    match = CronSchedule._OFFSET_PATTERN.fullmatch(offset)
    # Every component in the pattern is optional, so the regex alone accepts a bare designator such as
    # "P" or "PT". Group 2 holds the date components and group 4 the time components; a duration needs
    # at least one of them.
    if match is None or not (match.group(2) or match.group(4)):
        raise _user_exceptions.FlyteAssertion(
            "Offset is invalid. It must be an ISO 8601 duration. Provided offset: {}".format(offset)
        )

@classmethod
def promote_from_model(cls, base_model):
"""
:param flytekit.models.schedule.Schedule base_model:
:rtype: CronSchedule
"""
return cls(base_model.cron_expression, kickoff_time_input_arg=base_model.kickoff_time_input_arg,)
return cls(
cron_expression=base_model.cron_expression,
schedule=base_model.cron_schedule.schedule if base_model.cron_schedule is not None else None,
offset=base_model.cron_schedule.offset if base_model.cron_schedule is not None else None,
kickoff_time_input_arg=base_model.kickoff_time_input_arg,
)


class FixedRate(_ExtendedSchedule, metaclass=_sdk_bases.ExtendedSdkType):
Expand Down
54 changes: 52 additions & 2 deletions flytekit/flytekit/models/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,56 @@ def from_flyte_idl(cls, pb2_object):
"""
return cls(pb2_object.value, pb2_object.unit)

def __init__(self, kickoff_time_input_arg, cron_expression=None, rate=None):
class CronSchedule(_common.FlyteIdlEntity):
    """
    Model wrapper around the ``CronSchedule`` protobuf message: a schedule string plus an offset.
    """

    def __init__(self, schedule, offset):
        """
        :param Text schedule: cron expression or aliases
        :param Text offset: ISO_8601 Duration
        """
        self._schedule = schedule
        self._offset = offset

    @property
    def schedule(self):
        """
        :rtype: Text
        """
        return self._schedule

    @property
    def offset(self):
        """
        :rtype: Text
        """
        return self._offset

    def to_flyte_idl(self):
        """
        :rtype: flyteidl.admin.schedule_pb2.CronSchedule
        """
        return _schedule_pb2.CronSchedule(schedule=self.schedule, offset=self.offset)

    @classmethod
    def from_flyte_idl(cls, pb2_object):
        """
        :param flyteidl.admin.schedule_pb2.CronSchedule pb2_object:
        :rtype: Schedule.CronSchedule
        """
        # Falsy (e.g. empty-string) proto fields are normalised to None.
        return cls(pb2_object.schedule or None, pb2_object.offset or None)

def __init__(self, kickoff_time_input_arg, cron_expression=None, rate=None, cron_schedule=None):
    """
    Store the schedule definition. Callers are expected to supply one of ``cron_expression``, ``rate`` or
    ``cron_schedule``; this constructor stores the values as given and does not validate that itself.

    :param Text kickoff_time_input_arg:
    :param Text cron_expression: [Optional]
    :param Schedule.FixedRate rate: [Optional]
    :param Schedule.CronSchedule cron_schedule: [Optional]
    """
    self._kickoff_time_input_arg = kickoff_time_input_arg
    self._cron_expression = cron_expression
    self._rate = rate
    self._cron_schedule = cron_schedule

@property
def kickoff_time_input_arg(self):
Expand All @@ -91,9 +130,16 @@ def rate(self):
"""
return self._rate

@property
def cron_schedule(self):
    """
    Read-only access to the cron schedule model held by this object; may be None.

    :rtype: Schedule.CronSchedule
    """
    return self._cron_schedule

@property
def schedule_expression(self):
return self.cron_expression or self.rate
return self.cron_expression or self.rate or self.cron_schedule

def to_flyte_idl(self):
"""
Expand All @@ -103,6 +149,7 @@ def to_flyte_idl(self):
kickoff_time_input_arg=self.kickoff_time_input_arg,
cron_expression=self.cron_expression,
rate=self.rate.to_flyte_idl() if self.rate is not None else None,
cron_schedule=self.cron_schedule.to_flyte_idl() if self.cron_schedule is not None else None,
)

@classmethod
Expand All @@ -115,4 +162,7 @@ def from_flyte_idl(cls, pb2_object):
pb2_object.kickoff_time_input_arg,
cron_expression=pb2_object.cron_expression if pb2_object.HasField("cron_expression") else None,
rate=Schedule.FixedRate.from_flyte_idl(pb2_object.rate) if pb2_object.HasField("rate") else None,
cron_schedule=Schedule.CronSchedule.from_flyte_idl(pb2_object.cron_schedule)
if pb2_object.HasField("cron_schedule")
else None,
)
24 changes: 13 additions & 11 deletions flytekit/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,36 @@
# This file is autogenerated by pip-compile
# To update, run:
#
# pip-compile requirements.in
# make requirements.txt
#
-e file:.#egg=flytekit # via -r requirements.in
ansiwrap==0.8.4 # via papermill
appdirs==1.4.4 # via black
appnope==0.1.0 # via ipykernel, ipython
async-generator==1.10 # via nbclient
attrs==20.2.0 # via black, jsonschema
backcall==0.2.0 # via ipython
black==19.10b0 # via flytekit, papermill
boto3==1.15.2 # via flytekit
botocore==1.18.2 # via boto3, s3transfer
boto3==1.15.11 # via flytekit
botocore==1.18.11 # via boto3, s3transfer
certifi==2020.6.20 # via requests
cffi==1.14.3 # via cryptography
chardet==3.0.4 # via requests
click==7.1.2 # via black, flytekit, hmsclient, papermill
croniter==0.3.34 # via flytekit
cryptography==3.1.1 # via secretstorage
decorator==4.4.2 # via ipython
deprecated==1.2.10 # via flytekit
entrypoints==0.3 # via papermill
flyteidl==0.18.6 # via flytekit
flyteidl==0.18.8 # via flytekit
future==0.18.2 # via torch
grpcio==1.32.0 # via flytekit
hmsclient==0.1.1 # via flytekit
idna==2.10 # via requests
importlib-metadata==1.7.0 # via jsonschema, keyring
ipykernel==5.3.4 # via flytekit
ipython-genutils==0.2.0 # via nbformat, traitlets
ipython==7.18.1 # via ipykernel
jedi==0.17.2 # via ipython
jeepney==0.4.3 # via keyring, secretstorage
jmespath==0.10.0 # via boto3, botocore
jsonschema==3.2.0 # via nbformat
jupyter-client==6.1.7 # via ipykernel, nbclient
Expand All @@ -40,10 +41,10 @@ keyring==21.4.0 # via flytekit
natsort==7.0.1 # via croniter
nbclient==0.5.0 # via papermill
nbformat==5.0.7 # via nbclient, papermill
nest-asyncio==1.4.0 # via nbclient
nest-asyncio==1.4.1 # via nbclient
numpy==1.19.2 # via flytekit, pandas, pyarrow, torch
pandas==1.1.2 # via flytekit
papermill==2.1.3 # via flytekit
papermill==2.2.0 # via flytekit
parso==0.7.1 # via jedi
pathspec==0.8.0 # via black
pexpect==4.8.0 # via ipython
Expand All @@ -53,6 +54,7 @@ protobuf==3.13.0 # via flyteidl, flytekit, k8s-proto
ptyprocess==0.6.0 # via pexpect
py4j==0.10.7 # via pyspark
pyarrow==0.17.1 # via flytekit
pycparser==2.20 # via cffi
pygments==2.7.1 # via ipython
pyrsistent==0.17.3 # via jsonschema
pyspark==2.4.7 # via flytekit
Expand All @@ -65,7 +67,8 @@ regex==2020.9.27 # via black
requests==2.24.0 # via flytekit, papermill, responses
responses==0.12.0 # via flytekit
s3transfer==0.3.3 # via boto3
six==1.15.0 # via flytekit, grpcio, jsonschema, protobuf, python-dateutil, responses, tenacity, thrift
secretstorage==3.1.2 # via keyring
six==1.15.0 # via cryptography, flytekit, grpcio, jsonschema, protobuf, python-dateutil, responses, tenacity, thrift
sortedcontainers==2.2.2 # via flytekit
statsd==3.3.0 # via flytekit
tenacity==6.2.0 # via papermill
Expand All @@ -74,13 +77,12 @@ thrift==0.13.0 # via hmsclient
toml==0.10.1 # via black
torch==1.6.0 # via flytekit
tornado==6.0.4 # via ipykernel, jupyter-client
tqdm==4.49.0 # via papermill
tqdm==4.50.0 # via papermill
traitlets==5.0.4 # via ipykernel, ipython, jupyter-client, jupyter-core, nbclient, nbformat
typed-ast==1.4.1 # via black
urllib3==1.25.10 # via botocore, flytekit, requests, responses
wcwidth==0.2.5 # via prompt-toolkit
wrapt==1.12.1 # via deprecated, flytekit
zipp==3.1.0 # via importlib-metadata

# The following packages are considered to be unsafe in a requirements file:
# setuptools
2 changes: 1 addition & 1 deletion flytekit/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
]
},
install_requires=[
"flyteidl>=0.18.6,<1.0.0",
"flyteidl>=0.18.8,<1.0.0",
"click>=6.6,<8.0",
"croniter>=0.3.20,<4.0.0",
"deprecated>=1.0,<2.0",
Expand Down
Loading

0 comments on commit 6d36883

Please sign in to comment.