Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage adapter implementation: Implemented Quantum.Storage.Adapter behavior via PersistentETS. #318

Closed
wants to merge 7 commits into from
10 changes: 6 additions & 4 deletions lib/quantum/job_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ defmodule Quantum.JobBroadcaster do

@doc false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your merge was not clean. What changes did you make here?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I messed up a couple of times doing the rebase stuff...
I think I fixed that in the latest commit.
Still TravisCI still complains about the formatting issues.
Though when I try to do mix format a while bunch of changes pops up.
I believe I should only commit format changes to the files Travis is complaining about (there will be three of them).
I'll try do to that shortly.

def init({jobs, storage, scheduler}) do
jobs =
case storage.jobs(scheduler) do
effective_jobs =
scheduler
|> storage.jobs()
|> case do
:not_applicable ->
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Config"
Expand All @@ -52,8 +54,8 @@ defmodule Quantum.JobBroadcaster do
end

state = %{
jobs: Enum.into(jobs, %{}, fn %{name: name} = job -> {name, job} end),
buffer: for(%{state: :active} = job <- jobs, do: {:add, job}),
jobs: Enum.into(effective_jobs, %{}, fn %{name: name} = job -> {name, job} end),
buffer: for(%{state: :active} = job <- effective_jobs, do: {:add, job}),
storage: storage,
scheduler: scheduler
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just realized that the existing code was rebinding the jobs variable, so...effective_jobs variable is basically not needed.
I could probably stick with the jobs variable.
I mean this part could be reverted (what existed in the master repo storage branch is just fine).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's good like this 👍

}
Expand Down
303 changes: 303 additions & 0 deletions lib/quantum/storage/persistent_ets.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
if Code.ensure_compiled?(PersistentEts) do
defmodule Quantum.Storage.PersistentEts do
@moduledoc """
persistent_ets based implementation of a `Quantum.Storage.Adapter`.
See https://hexdocs.pm/persistent_ets
"""
require Logger
use GenServer
defstruct [:schedulers]

def start_link() do
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end

# Callbacks

defp __server__, do: __MODULE__

def init(_) do
{:ok, %__MODULE__{schedulers: %{}}}
end

def handle_call(
{:add_job, scheduler_module, job},
_from,
%__MODULE__{schedulers: schedulers} = state
) do
{
:reply,
do_add_job(scheduler_module, job),
%{
state
| schedulers:
schedulers
|> Map.put_new_lazy(scheduler_module, fn ->
create_scheduler_module_atom(scheduler_module)
end)
}
}
end

def handle_call({:jobs, scheduler_module}, _from, %__MODULE__{schedulers: schedulers} = state) do
{
:reply,
do_get_jobs(scheduler_module),
%{
state
| schedulers:
schedulers
|> Map.put_new_lazy(scheduler_module, fn ->
create_scheduler_module_atom(scheduler_module)
end)
}
}
end

def handle_call(
{:delete_job, scheduler_module, job},
_from,
%__MODULE__{schedulers: schedulers} = state
) do
{
:reply,
do_delete_job(scheduler_module, job),
%{
state
| schedulers:
schedulers
|> Map.put_new_lazy(scheduler_module, fn ->
create_scheduler_module_atom(scheduler_module)
end)
}
}
end

def handle_call(
{:update_job_state, scheduler_module, job_name, job_state},
_from,
%__MODULE__{schedulers: schedulers} = state
) do
{
:reply,
do_update_job_state(scheduler_module, job_name, job_state),
%{
state
| schedulers:
schedulers
|> Map.put_new_lazy(scheduler_module, fn ->
create_scheduler_module_atom(scheduler_module)
end)
}
}
end

def handle_call(
{:last_execution_date, scheduler_module},
_from,
%__MODULE__{schedulers: schedulers} = state
) do
{
:reply,
do_get_last_execution_date(scheduler_module),
%{
state
| schedulers:
schedulers
|> Map.put_new_lazy(scheduler_module, fn ->
create_scheduler_module_atom(scheduler_module)
end)
}
}
end

def handle_call(
{:update_last_execution_date, scheduler_module, last_execution_date},
_from,
%__MODULE__{schedulers: schedulers} = state
) do
{
:reply,
do_update_last_execution_date(scheduler_module, last_execution_date),
%{
state
| schedulers:
schedulers
|> Map.put_new_lazy(scheduler_module, fn ->
create_scheduler_module_atom(scheduler_module)
end)
}
}
end

def handle_call(
{:purge, scheduler_module},
_from,
%__MODULE__{schedulers: schedulers} = state
) do
{
:reply,
do_purge(scheduler_module),
%{
state
| schedulers:
schedulers
|> Map.put_new_lazy(scheduler_module, fn ->
create_scheduler_module_atom(scheduler_module)
end)
}
}
end

# Helpers
defp create_scheduler_module_atom(scheduler_module) do
scheduler_module
end

defp job_key(job_name) do
{:job, job_name}
end

defp get_ets_by_scheduler(scheduler_module) do
scheduler_module_atom = create_scheduler_module_atom(scheduler_module)

unless ets_exist?(scheduler_module_atom) do
PersistentEts.new(scheduler_module_atom, "#{scheduler_module_atom}.tab", [
:named_table,
:set
])
else
scheduler_module_atom
end
end

defp ets_exist?(ets_name) do
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Determining whether ETS table with name [#{
inspect(ets_name)
}] exists"
end)

result =
case :ets.info(ets_name) do
:undefined -> false
_ -> true
end

Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] ETS table with name [#{inspect(ets_name)}] #{
if result, do: ~S|exists|, else: ~S|does not exist|
}"
end)

result
end

# Private functions
defp do_add_job(scheduler_module, job) do
table = get_ets_by_scheduler(scheduler_module)
:ets.insert(table, entry = {job_key(job.name), job})

Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] inserting [#{inspect(entry)}] into Persistent ETS table [#{
table
}]"
end)

:ok
end

defp do_get_jobs(scheduler_module) do
table = get_ets_by_scheduler(scheduler_module)

result =
case :ets.match(table, {{:job, :_}, :"$1"}) do
[] -> :not_applicable
[_h | _t] = jobs -> jobs |> List.flatten()
end

Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] jobs are: #{inspect(result)}"
end)

result
end

defp do_delete_job(scheduler_module, job_name) do
table = get_ets_by_scheduler(scheduler_module)
:ets.delete(table, job_key(job_name))
:ok
end

defp do_update_job_state(scheduler_module, job_name, state) do
table = get_ets_by_scheduler(scheduler_module)

job =
case :ets.lookup(table, {:job, job_name}) do
# TODO: should we raise here or should we handle the situation with a return value of a special kind?
[] ->
raise "Job #{job_name} does not exist in the storage"

[j | _t] ->
j
end

upd_job = %{job | state: state}
:ets.update_element(table, job_key(job_name), {1, upd_job})
:ok
end

defp do_get_last_execution_date(scheduler_module) do
table = get_ets_by_scheduler(scheduler_module)

case :ets.lookup(table, :last_execution_date) do
[] -> :unknown
[{:last_execution_date, date} | _t] -> date
{:last_execution_date, d} -> d
end
end

defp do_update_last_execution_date(scheduler_module, last_execution_date) do
table = get_ets_by_scheduler(scheduler_module)
:ets.insert(table, {:last_execution_date, last_execution_date})
:ok
end

defp do_purge(scheduler_module) do
table = get_ets_by_scheduler(scheduler_module)
:ets.delete_all_objects(table)
:ok
end

@behaviour Quantum.Storage.Adapter

def jobs(scheduler_module) do
__server__ |> GenServer.call({:jobs, scheduler_module})
end

def add_job(scheduler_module, job) do
__server__ |> GenServer.call({:add_job, scheduler_module, job})
end

def delete_job(scheduler_module, job_name) do
__server__ |> GenServer.call({:delete_job, scheduler_module, job_name})
end

def update_job_state(scheduler_module, job_name, state) do
__server__ |> GenServer.call({:update_job_state, scheduler_module, job_name, state})
end

def last_execution_date(scheduler_module) do
__server__ |> GenServer.call({:last_execution_date, scheduler_module})
end

def update_last_execution_date(scheduler_module, last_execution_date) do
__server__
|> GenServer.call({:update_last_execution_date, scheduler_module, last_execution_date})
end

def purge(scheduler_module) do
__server__ |> GenServer.call({:purge, scheduler_module})
end
end
end
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ defmodule Quantum.Mixfile do
{:excoveralls, "~> 0.5", only: [:dev, :test], runtime: false},
{:inch_ex, "~> 0.5", only: [:dev, :docs], runtime: false},
{:dialyxir, "~> 0.5", only: [:dev, :test], runtime: false},
{:credo, "~> 0.7", only: [:dev, :test], runtime: false}
{:credo, "~> 0.7", only: [:dev, :test], runtime: false},
{:persistent_ets, "~> 0.1.0", optional: true},
]
end
end