Skip to content

Commit

Permalink
Solution: Storage Adapter (#313)
Browse files Browse the repository at this point in the history
  • Loading branch information
maennchen authored and c-rack committed Apr 5, 2018
1 parent 5c2eb20 commit bab91cb
Show file tree
Hide file tree
Showing 13 changed files with 516 additions and 42 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ This project adheres to [Semantic Versioning](http://semver.org/).

## Unreleased

### Added
- Experimental Storage API

Diff for [unreleased]

## 2.2.7 - 2018-03-22
Expand Down
3 changes: 3 additions & 0 deletions lib/quantum.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Quantum do
alias Quantum.Normalizer
alias Quantum.Job
alias Quantum.RunStrategy.Random
alias Quantum.Storage.Noop

@defaults [
global: false,
Expand Down Expand Up @@ -46,13 +47,15 @@ defmodule Quantum do
task_supervisor = Module.concat(quantum, Task.Supervisor)

config
|> Keyword.put_new(:quantum, quantum)
|> update_in([:schedule], &Normalizer.normalize_schedule/1)
|> Keyword.put_new(:task_stages_supervisor, task_stages_supervisor)
|> Keyword.put_new(:job_broadcaster, job_broadcaster)
|> Keyword.put_new(:execution_broadcaster, execution_broadcaster)
|> Keyword.put_new(:executor_supervisor, executor_supervisor)
|> Keyword.put_new(:task_registry, task_registry)
|> Keyword.put_new(:task_supervisor, task_supervisor)
|> Keyword.put_new(:storage, Noop)
end

@doc """
Expand Down
69 changes: 58 additions & 11 deletions lib/quantum/execution_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ defmodule Quantum.ExecutionBroadcaster do
require Logger

alias Quantum.{Job, Util, DateLibrary}
alias Crontab.{Scheduler, CronExpression}
alias Quantum.DateLibrary.{InvalidDateTimeForTimezoneError, InvalidTimezoneError}
alias Crontab.Scheduler, as: CrontabScheduler
alias Crontab.CronExpression
alias Quantum.Storage.Adapter
alias Quantum.Scheduler

defmodule JobInPastError do
defexception message:
Expand All @@ -25,22 +28,57 @@ defmodule Quantum.ExecutionBroadcaster do
* `job_broadcaster` - The name of the stage to listen to
"""
@spec start_link(GenServer.server(), GenServer.server(), boolean()) :: GenServer.on_start()
def start_link(name, job_broadcaster, debug_logging) do
@spec start_link(GenServer.server(), GenServer.server(), Adapter, Scheduler, boolean()) ::
GenServer.on_start()
def start_link(name, job_broadcaster, storage, scheduler, debug_logging) do
__MODULE__
|> GenStage.start_link({job_broadcaster, debug_logging}, name: name)
|> GenStage.start_link({job_broadcaster, storage, scheduler, debug_logging}, name: name)
|> Util.start_or_link()
end

@doc false
@spec child_spec({GenServer.server(), GenServer.server(), boolean()}) :: Supervisor.child_spec()
def child_spec({name, job_broadcaster, debug_logging}) do
%{super([]) | start: {__MODULE__, :start_link, [name, job_broadcaster, debug_logging]}}
@spec child_spec({GenServer.server(), GenServer.server(), Adapter, Scheduler, boolean()}) ::
Supervisor.child_spec()
def child_spec({name, job_broadcaster, storage, scheduler, debug_logging}) do
%{
super([])
| start:
{__MODULE__, :start_link, [name, job_broadcaster, storage, scheduler, debug_logging]}
}
end

@doc false
def init({job_broadcaster, debug_logging}) do
state = %{jobs: [], time: NaiveDateTime.utc_now(), timer: nil, debug_logging: debug_logging}
def init({job_broadcaster, storage, scheduler, debug_logging}) do
last_execution_date =
case storage.last_execution_date(scheduler) do
%NaiveDateTime{} = date ->
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Using last known execution time #{
NaiveDateTime.to_iso8601(date)
}"
end)

date

:unknown ->
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Unknown last execution time, using now"
end)

NaiveDateTime.utc_now()
end

state = %{
jobs: [],
time: last_execution_date,
timer: nil,
storage: storage,
scheduler: scheduler,
debug_logging: debug_logging
}

{:producer_consumer, state, subscribe_to: [job_broadcaster]}
end

Expand Down Expand Up @@ -71,8 +109,15 @@ defmodule Quantum.ExecutionBroadcaster do

def handle_info(
:execute,
%{jobs: [{time_to_execute, jobs_to_execute} | tail], debug_logging: debug_logging} = state
%{
jobs: [{time_to_execute, jobs_to_execute} | tail],
storage: storage,
scheduler: scheduler,
debug_logging: debug_logging
} = state
) do
:ok = storage.update_last_execution_date(scheduler, time_to_execute)

state =
state
|> Map.put(:timer, nil)
Expand Down Expand Up @@ -126,6 +171,8 @@ defmodule Quantum.ExecutionBroadcaster do
end)

%{state | jobs: jobs}
|> sort_state
|> reset_timer
end

defp add_job_to_state(
Expand Down Expand Up @@ -162,7 +209,7 @@ defmodule Quantum.ExecutionBroadcaster do
time
) do
schedule
|> Scheduler.get_next_run_date(DateLibrary.to_tz!(time, timezone))
|> CrontabScheduler.get_next_run_date(DateLibrary.to_tz!(time, timezone))
|> case do
{:ok, date} ->
{:ok, DateLibrary.to_utc!(date, timezone)}
Expand Down
3 changes: 2 additions & 1 deletion lib/quantum/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ defmodule Quantum.Job do
state: :active
]

@type name :: atom | reference()
@type state :: :active | :inactive
@type task :: {atom, atom, [any]} | (() -> any)
@type timezone :: :utc | :local | String.t()
@type schedule :: Crontab.CronExpression.t()

@type t :: %__MODULE__{
name: atom | Reference,
name: name,
schedule: schedule | nil,
task: task | nil,
state: state,
Expand Down
82 changes: 67 additions & 15 deletions lib/quantum/job_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ defmodule Quantum.JobBroadcaster do

require Logger

alias Quantum.{Job, Util}
alias Quantum.{Job, Util, Scheduler}
alias Quantum.Storage.Adapter

@doc """
Start Job Broadcaster
Expand All @@ -18,24 +19,52 @@ defmodule Quantum.JobBroadcaster do
* `jobs` - Array of `Quantum.Job`
"""
@spec start_link(GenServer.server(), [Job.t()], boolean()) :: GenServer.on_start()
def start_link(name, jobs, debug_logging) do
@spec start_link(GenServer.server(), [Job.t()], Adapter, Scheduler, boolean()) ::
GenServer.on_start()
def start_link(name, jobs, storage, scheduler, debug_logging) do
__MODULE__
|> GenStage.start_link({jobs, debug_logging}, name: name)
|> GenStage.start_link({jobs, storage, scheduler, debug_logging}, name: name)
|> Util.start_or_link()
end

@doc false
@spec child_spec({GenServer.server(), [Job.t()], boolean()}) :: Supervisor.child_spec()
def child_spec({name, jobs, debug_logging}) do
%{super([]) | start: {__MODULE__, :start_link, [name, jobs, debug_logging]}}
@spec child_spec({GenServer.server(), [Job.t()], Adapter, Scheduler, boolean()}) ::
Supervisor.child_spec()
def child_spec({name, jobs, storage, scheduler, debug_logging}) do
%{
super([])
| start: {__MODULE__, :start_link, [name, jobs, storage, scheduler, debug_logging]}
}
end

@doc false
def init({jobs, debug_logging}) do
def init({jobs, storage, scheduler, debug_logging}) do
effective_jobs =
scheduler
|> storage.jobs()
|> case do
:not_applicable ->
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Config"
end)

jobs

storage_jobs when is_list(storage_jobs) ->
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Storage, skipping config"
end)

storage_jobs
end

state = %{
jobs: Enum.into(jobs, %{}, fn %{name: name} = job -> {name, job} end),
buffer: for(%{state: :active} = job <- jobs, do: {:add, job}),
jobs: Enum.into(effective_jobs, %{}, fn %{name: name} = job -> {name, job} end),
buffer: for(%{state: :active} = job <- effective_jobs, do: {:add, job}),
storage: storage,
scheduler: scheduler,
debug_logging: debug_logging
}

Expand All @@ -50,39 +79,53 @@ defmodule Quantum.JobBroadcaster do

def handle_cast(
{:add, %Job{state: :active, name: job_name} = job},
%{jobs: jobs, debug_logging: debug_logging} = state
%{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} =
state
) do
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}"
end)

:ok = storage.add_job(scheduler, job)

{:noreply, [{:add, job}], %{state | jobs: Map.put(jobs, job_name, job)}}
end

def handle_cast(
{:add, %Job{state: :inactive, name: job_name} = job},
%{jobs: jobs, debug_logging: debug_logging} = state
%{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} =
state
) do
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}"
end)

:ok = storage.add_job(scheduler, job)

{:noreply, [], %{state | jobs: Map.put(jobs, job_name, job)}}
end

def handle_cast({:delete, name}, %{jobs: jobs, debug_logging: debug_logging} = state) do
def handle_cast(
{:delete, name},
%{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} =
state
) do
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Deleting job #{inspect(name)}"
end)

case Map.fetch(jobs, name) do
{:ok, %{state: :active}} ->
:ok = storage.delete_job(scheduler, name)

{:noreply, [{:remove, name}], %{state | jobs: Map.delete(jobs, name)}}

{:ok, %{state: :inactive}} ->
:ok = storage.delete_job(scheduler, name)

{:noreply, [], %{state | jobs: Map.delete(jobs, name)}}

:error ->
Expand All @@ -92,7 +135,8 @@ defmodule Quantum.JobBroadcaster do

def handle_cast(
{:change_state, name, new_state},
%{jobs: jobs, debug_logging: debug_logging} = state
%{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} =
state
) do
debug_logging &&
Logger.debug(fn ->
Expand All @@ -109,6 +153,8 @@ defmodule Quantum.JobBroadcaster do
{:ok, job} ->
jobs = Map.update!(jobs, name, &Job.set_state(&1, new_state))

:ok = storage.update_job_state(scheduler, job.name, new_state)

case new_state do
:active ->
{:noreply, [{:add, %{job | state: new_state}}], %{state | jobs: jobs}}
Expand All @@ -119,14 +165,20 @@ defmodule Quantum.JobBroadcaster do
end
end

def handle_cast(:delete_all, %{jobs: jobs, debug_logging: debug_logging} = state) do
def handle_cast(
:delete_all,
%{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} =
state
) do
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Deleting all jobs"
end)

messages = for {name, %Job{state: :active}} <- jobs, do: {:remove, name}

:ok = storage.purge(scheduler)

{:noreply, messages, %{state | jobs: %{}}}
end

Expand Down
64 changes: 64 additions & 0 deletions lib/quantum/storage/adapter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
defmodule Quantum.Storage.Adapter do
@moduledoc """
Bahaviour to be implemented by all Storage Adapters.
**WARNING: This Adapter is experimental and will therefore not adhere to semantic versioning.
It could undergo massive changes even in patch releases.**
"""

alias Quantum.Job

@typedoc """
The calling scheduler Module
"""
@type scheduler_module :: atom

@typedoc """
The expected return is `:ok`, every other result will terminate the scheduler.
"""
@type ok :: :ok

@doc """
Load saved jobs from storage
Returns `:not_applicable` if the storage has never received an `add_job` call or after it has been purged.
In this case the jobs from the configuration weill be loaded.
"""
@callback jobs(scheduler_module) :: :not_applicable | [Job.t()]

@doc """
Save new job in storage.
"""
@callback add_job(scheduler_module, job :: Job.t()) :: ok

@doc """
Delete new job in storage.
"""
@callback delete_job(scheduler_module, job :: Job.name()) :: ok

@doc """
Change Job State from given job.
"""
@callback update_job_state(scheduler_module, job :: Job.name(), state :: Job.state()) :: ok

@doc """
Load last execution time from storage
Returns `:unknown` if the storage does not know the last execution time.
In this case all jobs will be run at the next applicable date.
"""
@callback last_execution_date(scheduler_module) :: :unknown | NaiveDateTime.t()

@doc """
Update last execution time to given date.
"""
@callback update_last_execution_date(
scheduler_module,
last_execution_date :: NaiveDateTime.t()
) :: ok

@doc """
Purge all date from storage and go back to initial state.
"""
@callback purge(scheduler_module) :: ok
end
Loading

0 comments on commit bab91cb

Please sign in to comment.