From a8679427c91f208ec436c13a35046974109f34f9 Mon Sep 17 00:00:00 2001 From: Chris White Date: Tue, 29 Dec 2020 19:13:04 -0800 Subject: [PATCH 1/5] Initial untested implementation of flow group timezones --- Changelog.md | 8 ++++++++ changes/pr157.yaml | 2 -- src/prefect_server/api/flow_groups.py | 16 ++++++++++++++-- src/prefect_server/graphql/flow_groups.py | 6 +++++- src/prefect_server/graphql/schema/flows.graphql | 2 ++ 5 files changed, 29 insertions(+), 5 deletions(-) delete mode 100644 changes/pr157.yaml diff --git a/Changelog.md b/Changelog.md index 4ebc373d..2c0c89bd 100644 --- a/Changelog.md +++ b/Changelog.md @@ -1,5 +1,13 @@ # Changelog +## December 16, 2020 + +Released on December 16, 2020. + +### Enhancements + +- Add the `events` RBAC permission to agents created by the helm chart - [#157](https://github.com/PrefectHQ/cloud/pull/157) + ## December 14, 2020 Released on December 14, 2020. diff --git a/changes/pr157.yaml b/changes/pr157.yaml deleted file mode 100644 index 24b11a27..00000000 --- a/changes/pr157.yaml +++ /dev/null @@ -1,2 +0,0 @@ -enhancement: - - "Add the `events` RBAC permission to agents created by the helm chart - [#157](https://github.com/PrefectHQ/cloud/pull/157)" diff --git a/src/prefect_server/api/flow_groups.py b/src/prefect_server/api/flow_groups.py index 9918aad7..84f628ad 100644 --- a/src/prefect_server/api/flow_groups.py +++ b/src/prefect_server/api/flow_groups.py @@ -67,13 +67,16 @@ async def set_flow_group_default_parameters( @register_api("flow_groups.set_flow_group_schedule") -async def set_flow_group_schedule(flow_group_id: str, clocks: List[dict]) -> bool: +async def set_flow_group_schedule( + flow_group_id: str, clocks: List[dict], timezone: str = None +) -> bool: """ Sets a schedule for a flow group Args: - flow_group_id (str): the ID of the flow group to update - clocks (List[dict]): a list of dictionaries defining clocks for the schedule + - timezone (str, optional): an optional timezone to set for the schedule Returns: - bool: whether setting the schedule was successful @@ -81,6 +84,15 @@ async def set_flow_group_schedule(flow_group_id: str, clocks: List[dict]) -> boo Raises: - ValueError: if flow group ID isn't provided """ + if timezone: + if timezone not in pendulum.timezones: + raise ValueError(f"Invalid timezone provided for schedule: {timezone}") + start_date = { + "dt": pendulum.now("utc").naive().to_iso8601_string(), + "tz": timezone, + } + else: + start_date = None for clock in clocks: try: ClockSchema().load(clock) @@ -89,7 +101,7 @@ async def set_flow_group_schedule(flow_group_id: str, clocks: List[dict]) -> boo if not flow_group_id: raise ValueError("Invalid flow group ID") result = await models.FlowGroup.where(id=flow_group_id).update( - set=dict(schedule=dict(type="Schedule", clocks=clocks)) + set=dict(schedule=dict(type="Schedule", clocks=clocks, start_date=start_date)) ) deleted_runs = await models.FlowRun.where( diff --git a/src/prefect_server/graphql/flow_groups.py b/src/prefect_server/graphql/flow_groups.py index 30f5d1c9..440eef7f 100644 --- a/src/prefect_server/graphql/flow_groups.py +++ b/src/prefect_server/graphql/flow_groups.py @@ -1,3 +1,4 @@ +import pendulum from typing import Any from graphql import GraphQLResolveInfo @@ -58,8 +59,11 @@ async def resolve_set_flow_group_schedule( interval_clocks.append(clock) clocks = cron_clocks + interval_clocks + result = await api.flow_groups.set_flow_group_schedule( - flow_group_id=input["flow_group_id"], clocks=clocks + flow_group_id=input["flow_group_id"], + clocks=clocks, + timezone=input.get("timezone"), ) return {"success": result} diff --git a/src/prefect_server/graphql/schema/flows.graphql b/src/prefect_server/graphql/schema/flows.graphql index 23449cbd..32889cc4 100644 --- a/src/prefect_server/graphql/schema/flows.graphql +++ b/src/prefect_server/graphql/schema/flows.graphql @@ -168,6 +168,8 @@ input set_flow_group_schedule_input { cron_clocks: [cron_clock_input!] "A list of interval clocks for the schedule" interval_clocks: [interval_clock_input!] + "An optional timezone to use for the schedule + timezone: String } input cron_clock_input { From b0b25c61f4ac8286184f7136b4b0b79a87fa219c Mon Sep 17 00:00:00 2001 From: Chris White Date: Tue, 29 Dec 2020 19:33:04 -0800 Subject: [PATCH 2/5] Add api level test --- src/prefect_server/api/flow_groups.py | 4 ++- tests/api/test_flow_groups.py | 48 +++++++++++++++++++++++++-- 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/src/prefect_server/api/flow_groups.py b/src/prefect_server/api/flow_groups.py index 84f628ad..faecee04 100644 --- a/src/prefect_server/api/flow_groups.py +++ b/src/prefect_server/api/flow_groups.py @@ -1,3 +1,4 @@ +import pendulum from typing import List, Dict, Any from prefect import api, models @@ -94,6 +95,7 @@ async def set_flow_group_schedule( else: start_date = None for clock in clocks: + clock["start_date"] = start_date try: ClockSchema().load(clock) except: @@ -101,7 +103,7 @@ async def set_flow_group_schedule( if not flow_group_id: raise ValueError("Invalid flow group ID") result = await models.FlowGroup.where(id=flow_group_id).update( - set=dict(schedule=dict(type="Schedule", clocks=clocks, start_date=start_date)) + set=dict(schedule=dict(type="Schedule", clocks=clocks)) ) deleted_runs = await models.FlowRun.where( diff --git a/tests/api/test_flow_groups.py b/tests/api/test_flow_groups.py index 8c907c7d..b77c51c5 100644 --- a/tests/api/test_flow_groups.py +++ b/tests/api/test_flow_groups.py @@ -3,6 +3,7 @@ import pytest from prefect import api, models +from prefect.serialization.schedule import ScheduleSchema class TestSetFlowGroupDefaultParameter: @@ -172,6 +173,49 @@ async def test_set_flow_group_schedule(self, flow_group_id, clock): flow_group = await models.FlowGroup.where(id=flow_group_id).first({"schedule"}) assert flow_group.schedule["clocks"][0] == clock + schema = ScheduleSchema() + schedule = schema.load(flow_group.schedule) + assert schedule.clocks[0].start_date is None + + @pytest.mark.parametrize( + "clock", + [ + {"type": "CronClock", "cron": "42 0 0 * * *"}, + {"type": "IntervalClock", "interval": 420000000}, + ], + ) + async def test_set_flow_group_schedule_raises_for_invalid_timezones( + self, flow_group_id, clock + ): + with pytest.raises(ValueError, match="timezone"): + await api.flow_groups.set_flow_group_schedule( + flow_group_id=flow_group_id, clocks=[clock], timezone="Kalamazoo" + ) + + @pytest.mark.parametrize( + "clock", + [ + {"type": "CronClock", "cron": "42 0 0 * * *"}, + {"type": "IntervalClock", "interval": 420000000}, + ], + ) + async def test_set_flow_group_schedule_respects_passed_timezones( + self, flow_group_id, clock + ): + flow_group = await models.FlowGroup.where(id=flow_group_id).first({"schedule"}) + assert flow_group.schedule is None + + success = await api.flow_groups.set_flow_group_schedule( + flow_group_id=flow_group_id, clocks=[clock], timezone="US/Pacific" + ) + assert success is True + + schema = ScheduleSchema() + flow_group = await models.FlowGroup.where(id=flow_group_id).first({"schedule"}) + schedule = schema.load(flow_group.schedule) + + assert schedule.clocks[0].start_date.timezone_name == "US/Pacific" + async def test_setting_schedule_deletes_runs(self, flow_id, flow_group_id): """ This test takes a flow with a schedule and ensures that updating the Flow Group @@ -227,8 +271,8 @@ async def test_set_schedule_with_two_clocks(self, flow_group_id): flow_group = await models.FlowGroup.where(id=flow_group_id).first({"schedule"}) assert flow_group.schedule["clocks"] == [ - {"type": "CronClock", "cron": "42 0 0 * * *"}, - {"type": "CronClock", "cron": "43 0 0 * * *"}, + {"type": "CronClock", "cron": "42 0 0 * * *", "start_date": None}, + {"type": "CronClock", "cron": "43 0 0 * * *", "start_date": None}, ] @pytest.mark.parametrize( From 2e810343610a2748f6d120558303372dc1f7511e Mon Sep 17 00:00:00 2001 From: Chris White Date: Tue, 29 Dec 2020 19:43:48 -0800 Subject: [PATCH 3/5] Add additional test at the graphQL layer --- src/prefect_server/api/flow_groups.py | 2 +- .../graphql/schema/flows.graphql | 2 +- tests/api/test_flow_groups.py | 2 + tests/graphql/test_flow_groups.py | 40 +++++++++++++++++++ 4 files changed, 44 insertions(+), 2 deletions(-) diff --git a/src/prefect_server/api/flow_groups.py b/src/prefect_server/api/flow_groups.py index faecee04..24761ad7 100644 --- a/src/prefect_server/api/flow_groups.py +++ b/src/prefect_server/api/flow_groups.py @@ -89,7 +89,7 @@ async def set_flow_group_schedule( if timezone not in pendulum.timezones: raise ValueError(f"Invalid timezone provided for schedule: {timezone}") start_date = { - "dt": pendulum.now("utc").naive().to_iso8601_string(), + "dt": pendulum.now(timezone).naive().to_iso8601_string(), "tz": timezone, } else: diff --git a/src/prefect_server/graphql/schema/flows.graphql b/src/prefect_server/graphql/schema/flows.graphql index 32889cc4..10380c04 100644 --- a/src/prefect_server/graphql/schema/flows.graphql +++ b/src/prefect_server/graphql/schema/flows.graphql @@ -168,7 +168,7 @@ input set_flow_group_schedule_input { cron_clocks: [cron_clock_input!] "A list of interval clocks for the schedule" interval_clocks: [interval_clock_input!] - "An optional timezone to use for the schedule + "An optional timezone to use for the schedule" timezone: String } diff --git a/tests/api/test_flow_groups.py b/tests/api/test_flow_groups.py index b77c51c5..9eee244f 100644 --- a/tests/api/test_flow_groups.py +++ b/tests/api/test_flow_groups.py @@ -1,4 +1,5 @@ import uuid +import pendulum import pytest @@ -214,6 +215,7 @@ async def test_set_flow_group_schedule_respects_passed_timezones( flow_group = await models.FlowGroup.where(id=flow_group_id).first({"schedule"}) schedule = schema.load(flow_group.schedule) + assert schedule.clocks[0].start_date <= pendulum.now("utc") assert schedule.clocks[0].start_date.timezone_name == "US/Pacific" async def test_setting_schedule_deletes_runs(self, flow_id, flow_group_id): diff --git a/tests/graphql/test_flow_groups.py b/tests/graphql/test_flow_groups.py index af87f069..7bf8b194 100644 --- a/tests/graphql/test_flow_groups.py +++ b/tests/graphql/test_flow_groups.py @@ -1,3 +1,4 @@ +import pendulum import pytest from prefect import models @@ -171,6 +172,45 @@ async def test_add_cron_and_interval_clocks_to_flow_group_schedule( ], } + async def test_add_cron_and_interval_clocks_to_flow_group_schedule_with_timezone( + self, run_query, flow_group_id + ): + result = await run_query( + query=self.mutation, + variables=dict( + input=dict( + flow_group_id=flow_group_id, + cron_clocks=[{"cron": "42 0 0 * * *"}], + interval_clocks=[{"interval": 4200}], + timezone="US/Pacific", + ) + ), + ) + assert result.data.set_flow_group_schedule.success is True + flow_group = await models.FlowGroup.where(id=flow_group_id).first({"schedule"}) + + schedule = ScheduleSchema().load(flow_group.schedule) + assert len(schedule.clocks) == 2 + assert all([c.start_date <= pendulum.now("utc") for c in schedule.clocks]) + assert all( + [c.start_date.timezone_name == "US/Pacific" for c in schedule.clocks] + ) + + async def test_add_clocks_to_flow_group_schedule_with_timezone_raises_informative_error( + self, run_query, flow_group_id + ): + result = await run_query( + query=self.mutation, + variables=dict( + input=dict( + flow_group_id=flow_group_id, + cron_clocks=[{"cron": "42 0 0 * * *"}], + timezone="US/Ocean", + ) + ), + ) + assert "Invalid timezone" in result.errors[0].message + async def test_interval_clock_units(self, run_query, flow_group_id): await run_query( query=self.mutation, From e2bcaf8397a7af24df4c0d0621d18d3dd526b952 Mon Sep 17 00:00:00 2001 From: Chris White Date: Tue, 29 Dec 2020 19:46:19 -0800 Subject: [PATCH 4/5] Add changelog entry --- changes/pr169.yaml | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 changes/pr169.yaml diff --git a/changes/pr169.yaml b/changes/pr169.yaml new file mode 100644 index 00000000..4e8862fc --- /dev/null +++ b/changes/pr169.yaml @@ -0,0 +1,2 @@ +enhancement: + - "Allow for timezone specification on flow group schedules - [#169](https://github.com/PrefectHQ/server/pull/169)" From 41a55954848460677f084ba8281546fc6b3b8dd4 Mon Sep 17 00:00:00 2001 From: Chris White Date: Tue, 29 Dec 2020 20:16:27 -0800 Subject: [PATCH 5/5] FIx up tests --- tests/graphql/test_flow_groups.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/graphql/test_flow_groups.py b/tests/graphql/test_flow_groups.py index 7bf8b194..c9b5cc95 100644 --- a/tests/graphql/test_flow_groups.py +++ b/tests/graphql/test_flow_groups.py @@ -115,8 +115,9 @@ async def test_set_cron_clocks_for_flow_group_schedule( "type": "CronClock", "cron": "42 0 0 * * *", "parameter_defaults": {"meep": "morp"}, + "start_date": None, }, - {"type": "CronClock", "cron": "43 0 0 * * *"}, + {"type": "CronClock", "cron": "43 0 0 * * *", "start_date": None}, ], } @@ -144,8 +145,9 @@ async def test_add_interval_clocks_to_flow_group_schedule( "type": "IntervalClock", "interval": 4200000000, "parameter_defaults": {"meep": "morp"}, + "start_date": None, }, - {"type": "IntervalClock", "interval": 4300000000}, + {"type": "IntervalClock", "interval": 4300000000, "start_date": None}, ], } @@ -167,8 +169,8 @@ async def test_add_cron_and_interval_clocks_to_flow_group_schedule( assert flow_group.schedule == { "type": "Schedule", "clocks": [ - {"type": "CronClock", "cron": "42 0 0 * * *"}, - {"type": "IntervalClock", "interval": 4200000000}, + {"type": "CronClock", "cron": "42 0 0 * * *", "start_date": None}, + {"type": "IntervalClock", "interval": 4200000000, "start_date": None}, ], }