Skip to content

Commit

Permalink
Merge pull request #169 from PrefectHQ/fg-schedule-timezones
Browse files Browse the repository at this point in the history
Flow Group schedule timezones
  • Loading branch information
cicdw authored Dec 30, 2020
2 parents 8a18341 + 41a5595 commit 91960a3
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 10 deletions.
8 changes: 8 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## December 16, 2020 <Badge text="beta" type="success" />

Released on December 16, 2020.

### Enhancements

- Add the `events` RBAC permission to agents created by the helm chart - [#157](https://github.com/PrefectHQ/cloud/pull/157)

## December 14, 2020 <Badge text="beta" type="success" />

Released on December 14, 2020.
Expand Down
2 changes: 0 additions & 2 deletions changes/pr157.yaml

This file was deleted.

2 changes: 2 additions & 0 deletions changes/pr169.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
enhancement:
- "Allow for timezone specification on flow group schedules - [#169](https://github.com/PrefectHQ/server/pull/169)"
16 changes: 15 additions & 1 deletion src/prefect_server/api/flow_groups.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pendulum
from typing import List, Dict, Any

from prefect import api, models
Expand Down Expand Up @@ -67,21 +68,34 @@ async def set_flow_group_default_parameters(


@register_api("flow_groups.set_flow_group_schedule")
async def set_flow_group_schedule(flow_group_id: str, clocks: List[dict]) -> bool:
async def set_flow_group_schedule(
flow_group_id: str, clocks: List[dict], timezone: str = None
) -> bool:
"""
Sets a schedule for a flow group
Args:
- flow_group_id (str): the ID of the flow group to update
- clocks (List[dict]): a list of dictionaries defining clocks for the schedule
- timezone (str, optional): an optional timezone to set for the schedule
Returns:
- bool: whether setting the schedule was successful
Raises:
- ValueError: if flow group ID isn't provided
"""
if timezone:
if timezone not in pendulum.timezones:
raise ValueError(f"Invalid timezone provided for schedule: {timezone}")
start_date = {
"dt": pendulum.now(timezone).naive().to_iso8601_string(),
"tz": timezone,
}
else:
start_date = None
for clock in clocks:
clock["start_date"] = start_date
try:
ClockSchema().load(clock)
except:
Expand Down
6 changes: 5 additions & 1 deletion src/prefect_server/graphql/flow_groups.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pendulum
from typing import Any

from graphql import GraphQLResolveInfo
Expand Down Expand Up @@ -58,8 +59,11 @@ async def resolve_set_flow_group_schedule(
interval_clocks.append(clock)

clocks = cron_clocks + interval_clocks

result = await api.flow_groups.set_flow_group_schedule(
flow_group_id=input["flow_group_id"], clocks=clocks
flow_group_id=input["flow_group_id"],
clocks=clocks,
timezone=input.get("timezone"),
)
return {"success": result}

Expand Down
2 changes: 2 additions & 0 deletions src/prefect_server/graphql/schema/flows.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ input set_flow_group_schedule_input {
cron_clocks: [cron_clock_input!]
"A list of interval clocks for the schedule"
interval_clocks: [interval_clock_input!]
"An optional timezone to use for the schedule"
timezone: String
}

input cron_clock_input {
Expand Down
50 changes: 48 additions & 2 deletions tests/api/test_flow_groups.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import uuid
import pendulum

import pytest

from prefect import api, models
from prefect.serialization.schedule import ScheduleSchema


class TestSetFlowGroupDefaultParameter:
Expand Down Expand Up @@ -172,6 +174,50 @@ async def test_set_flow_group_schedule(self, flow_group_id, clock):
flow_group = await models.FlowGroup.where(id=flow_group_id).first({"schedule"})
assert flow_group.schedule["clocks"][0] == clock

schema = ScheduleSchema()
schedule = schema.load(flow_group.schedule)
assert schedule.clocks[0].start_date is None

@pytest.mark.parametrize(
"clock",
[
{"type": "CronClock", "cron": "42 0 0 * * *"},
{"type": "IntervalClock", "interval": 420000000},
],
)
async def test_set_flow_group_schedule_raises_for_invalid_timezones(
self, flow_group_id, clock
):
with pytest.raises(ValueError, match="timezone"):
await api.flow_groups.set_flow_group_schedule(
flow_group_id=flow_group_id, clocks=[clock], timezone="Kalamazoo"
)

@pytest.mark.parametrize(
"clock",
[
{"type": "CronClock", "cron": "42 0 0 * * *"},
{"type": "IntervalClock", "interval": 420000000},
],
)
async def test_set_flow_group_schedule_respects_passed_timezones(
self, flow_group_id, clock
):
flow_group = await models.FlowGroup.where(id=flow_group_id).first({"schedule"})
assert flow_group.schedule is None

success = await api.flow_groups.set_flow_group_schedule(
flow_group_id=flow_group_id, clocks=[clock], timezone="US/Pacific"
)
assert success is True

schema = ScheduleSchema()
flow_group = await models.FlowGroup.where(id=flow_group_id).first({"schedule"})
schedule = schema.load(flow_group.schedule)

assert schedule.clocks[0].start_date <= pendulum.now("utc")
assert schedule.clocks[0].start_date.timezone_name == "US/Pacific"

async def test_setting_schedule_deletes_runs(self, flow_id, flow_group_id):
"""
This test takes a flow with a schedule and ensures that updating the Flow Group
Expand Down Expand Up @@ -227,8 +273,8 @@ async def test_set_schedule_with_two_clocks(self, flow_group_id):

flow_group = await models.FlowGroup.where(id=flow_group_id).first({"schedule"})
assert flow_group.schedule["clocks"] == [
{"type": "CronClock", "cron": "42 0 0 * * *"},
{"type": "CronClock", "cron": "43 0 0 * * *"},
{"type": "CronClock", "cron": "42 0 0 * * *", "start_date": None},
{"type": "CronClock", "cron": "43 0 0 * * *", "start_date": None},
]

@pytest.mark.parametrize(
Expand Down
50 changes: 46 additions & 4 deletions tests/graphql/test_flow_groups.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pendulum
import pytest

from prefect import models
Expand Down Expand Up @@ -114,8 +115,9 @@ async def test_set_cron_clocks_for_flow_group_schedule(
"type": "CronClock",
"cron": "42 0 0 * * *",
"parameter_defaults": {"meep": "morp"},
"start_date": None,
},
{"type": "CronClock", "cron": "43 0 0 * * *"},
{"type": "CronClock", "cron": "43 0 0 * * *", "start_date": None},
],
}

Expand Down Expand Up @@ -143,8 +145,9 @@ async def test_add_interval_clocks_to_flow_group_schedule(
"type": "IntervalClock",
"interval": 4200000000,
"parameter_defaults": {"meep": "morp"},
"start_date": None,
},
{"type": "IntervalClock", "interval": 4300000000},
{"type": "IntervalClock", "interval": 4300000000, "start_date": None},
],
}

Expand All @@ -166,11 +169,50 @@ async def test_add_cron_and_interval_clocks_to_flow_group_schedule(
assert flow_group.schedule == {
"type": "Schedule",
"clocks": [
{"type": "CronClock", "cron": "42 0 0 * * *"},
{"type": "IntervalClock", "interval": 4200000000},
{"type": "CronClock", "cron": "42 0 0 * * *", "start_date": None},
{"type": "IntervalClock", "interval": 4200000000, "start_date": None},
],
}

async def test_add_cron_and_interval_clocks_to_flow_group_schedule_with_timezone(
self, run_query, flow_group_id
):
result = await run_query(
query=self.mutation,
variables=dict(
input=dict(
flow_group_id=flow_group_id,
cron_clocks=[{"cron": "42 0 0 * * *"}],
interval_clocks=[{"interval": 4200}],
timezone="US/Pacific",
)
),
)
assert result.data.set_flow_group_schedule.success is True
flow_group = await models.FlowGroup.where(id=flow_group_id).first({"schedule"})

schedule = ScheduleSchema().load(flow_group.schedule)
assert len(schedule.clocks) == 2
assert all([c.start_date <= pendulum.now("utc") for c in schedule.clocks])
assert all(
[c.start_date.timezone_name == "US/Pacific" for c in schedule.clocks]
)

async def test_add_clocks_to_flow_group_schedule_with_timezone_raises_informative_error(
self, run_query, flow_group_id
):
result = await run_query(
query=self.mutation,
variables=dict(
input=dict(
flow_group_id=flow_group_id,
cron_clocks=[{"cron": "42 0 0 * * *"}],
timezone="US/Ocean",
)
),
)
assert "Invalid timezone" in result.errors[0].message

async def test_interval_clock_units(self, run_query, flow_group_id):
await run_query(
query=self.mutation,
Expand Down

0 comments on commit 91960a3

Please sign in to comment.