Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solution: Add telemetry to track metrics #453

Merged
merged 17 commits into from
Sep 11, 2020
44 changes: 38 additions & 6 deletions lib/quantum/executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ defmodule Quantum.Executor do
@spec execute(StartOpts.t(), Job.t(), Node.t()) :: :ok
# Execute task on all given nodes without checking for overlap
defp execute(
%StartOpts{task_supervisor_reference: task_supervisor, debug_logging: debug_logging},
%StartOpts{
task_supervisor_reference: task_supervisor,
debug_logging: debug_logging,
scheduler: scheduler
},
%Job{overlap: true} = job,
node
) do
run(node, job, task_supervisor, debug_logging)
run(node, job, task_supervisor, debug_logging, scheduler)

:ok
end
Expand All @@ -39,7 +43,8 @@ defmodule Quantum.Executor do
%StartOpts{
task_supervisor_reference: task_supervisor,
task_registry_reference: task_registry,
debug_logging: debug_logging
debug_logging: debug_logging,
scheduler: scheduler
},
%Job{overlap: false, name: job_name} = job,
node
Expand All @@ -51,7 +56,7 @@ defmodule Quantum.Executor do

case TaskRegistry.mark_running(task_registry, job_name, node) do
:marked_running ->
%Task{ref: ref} = run(node, job, task_supervisor, debug_logging)
%Task{ref: ref} = run(node, job, task_supervisor, debug_logging, scheduler)

receive do
{^ref, _} ->
Expand All @@ -69,8 +74,8 @@ defmodule Quantum.Executor do
end

# Ececute the given function on a given node via the task supervisor
@spec run(Node.t(), Job.t(), GenServer.server(), boolean()) :: Task.t()
defp run(node, %{name: job_name, task: task}, task_supervisor, debug_logging) do
@spec run(Node.t(), Job.t(), GenServer.server(), boolean(), atom()) :: Task.t()
defp run(node, %{name: job_name, task: task}, task_supervisor, debug_logging, scheduler) do
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Task for job #{inspect(job_name)} started on node #{
Expand All @@ -84,6 +89,15 @@ defmodule Quantum.Executor do
"[#{inspect(Node.self())}][#{__MODULE__}] Execute started for job #{inspect(job_name)}"
end)

# Note: we are intentionally mimicking the ":telemetry.span" here to keep current functionality
start_monotonic_time = :erlang.monotonic_time()

:telemetry.execute([:quantum, :job, :start], %{system_time: start_monotonic_time}, %{
job_name: job_name,
node: inspect(node),
scheduler: scheduler
})

try do
execute_task(task)
catch
Expand All @@ -94,6 +108,16 @@ defmodule Quantum.Executor do
inspect(job_name)
}, which failed due to: #{Exception.format(type, value, __STACKTRACE__)}"
end)

duration = :erlang.monotonic_time() - start_monotonic_time

:telemetry.execute([:quantum, :job, :exception], %{duration: duration}, %{
job_name: job_name,
node: inspect(node),
reason: value,
stacktrace: __STACKTRACE__,
scheduler: scheduler
})
else
result ->
debug_logging &&
Expand All @@ -102,6 +126,14 @@ defmodule Quantum.Executor do
inspect(job_name)
}, which yielded result: #{inspect(result)}"
end)

duration = :erlang.monotonic_time() - start_monotonic_time

:telemetry.execute([:quantum, :job, :stop], %{duration: duration}, %{
job_name: job_name,
node: inspect(node),
scheduler: scheduler
})
end

:ok
Expand Down
6 changes: 4 additions & 2 deletions lib/quantum/executor/start_opts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ defmodule Quantum.Executor.StartOpts do
@type t :: %__MODULE__{
task_supervisor_reference: GenServer.server(),
task_registry_reference: GenServer.server(),
debug_logging: boolean
debug_logging: boolean,
scheduler: atom()
}

@enforce_keys [
:task_supervisor_reference,
:task_registry_reference,
:debug_logging
:debug_logging,
:scheduler
]
defstruct @enforce_keys
end
6 changes: 4 additions & 2 deletions lib/quantum/executor_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ defmodule Quantum.ExecutorSupervisor do
:node_selector_broadcaster_reference,
:task_supervisor_reference,
:task_registry_reference,
:debug_logging
:debug_logging,
:scheduler
])
),
name: name
Expand Down Expand Up @@ -49,7 +50,8 @@ defmodule Quantum.ExecutorSupervisor do
Map.take(opts, [
:task_supervisor_reference,
:task_registry_reference,
:debug_logging
:debug_logging,
:scheduler
])
)

Expand Down
6 changes: 4 additions & 2 deletions lib/quantum/executor_supervisor/init_opts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ defmodule Quantum.ExecutorSupervisor.InitOpts do
node_selector_broadcaster_reference: GenServer.server(),
task_supervisor_reference: GenServer.server(),
task_registry_reference: GenServer.server(),
debug_logging: boolean
debug_logging: boolean,
scheduler: atom()
}

@enforce_keys [
:node_selector_broadcaster_reference,
:task_supervisor_reference,
:task_registry_reference,
:debug_logging
:debug_logging,
:scheduler
]
defstruct @enforce_keys
end
6 changes: 4 additions & 2 deletions lib/quantum/executor_supervisor/start_opts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ defmodule Quantum.ExecutorSupervisor.StartOpts do
node_selector_broadcaster_reference: GenServer.server(),
task_supervisor_reference: GenServer.server(),
task_registry_reference: GenServer.server(),
debug_logging: boolean()
debug_logging: boolean(),
scheduler: atom()
}

@enforce_keys [
:name,
:node_selector_broadcaster_reference,
:task_supervisor_reference,
:task_registry_reference,
:debug_logging
:debug_logging,
:scheduler
]
defstruct @enforce_keys
end
96 changes: 94 additions & 2 deletions lib/quantum/job_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Storage, skipping config"
end)

for %Job{state: :active, name: name} = job <- storage_jobs do
# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :add], %{}, %{
job_name: name,
job: job,
node: inspect(Node.self()),
scheduler: scheduler
})
end

storage_jobs
end

Expand Down Expand Up @@ -99,6 +109,14 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}"
end)

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :update], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

:ok = storage.delete_job(storage_pid, job_name)
:ok = storage.add_job(storage_pid, job)

Expand All @@ -111,6 +129,14 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}"
end)

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :update], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

:ok = storage.delete_job(storage_pid, job_name)
:ok = storage.add_job(storage_pid, job)

Expand All @@ -122,6 +148,14 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}"
end)

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :add], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

:ok = storage.add_job(storage_pid, job)

{:noreply, [{:add, job}], %{state | jobs: Map.put(jobs, job_name, job)}}
Expand All @@ -144,6 +178,14 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}"
end)

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :update], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

:ok = storage.delete_job(storage_pid, job_name)
:ok = storage.add_job(storage_pid, job)

Expand All @@ -155,6 +197,14 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}"
end)

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :update], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

:ok = storage.delete_job(storage_pid, job_name)
:ok = storage.add_job(storage_pid, job)

Expand All @@ -166,6 +216,14 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}"
end)

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :add], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

:ok = storage.add_job(storage_pid, job)

{:noreply, [], %{state | jobs: Map.put(jobs, job_name, job)}}
Expand All @@ -187,12 +245,28 @@ defmodule Quantum.JobBroadcaster do
end)

case Map.fetch(jobs, name) do
{:ok, %{state: :active}} ->
{:ok, %{state: :active, name: name} = job} ->
# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :delete], %{}, %{
job_name: name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

:ok = storage.delete_job(storage_pid, name)

{:noreply, [{:remove, name}], %{state | jobs: Map.delete(jobs, name)}}

{:ok, %{state: :inactive}} ->
{:ok, %{state: :inactive, name: name} = job} ->
# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :delete], %{}, %{
job_name: name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

:ok = storage.delete_job(storage_pid, name)

{:noreply, [], %{state | jobs: Map.delete(jobs, name)}}
Expand Down Expand Up @@ -224,6 +298,14 @@ defmodule Quantum.JobBroadcaster do
{:noreply, [], state}

{:ok, job} ->
# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :update], %{}, %{
job_name: name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

jobs = Map.update!(jobs, name, &Job.set_state(&1, new_state))

:ok = storage.update_job_state(storage_pid, job.name, new_state)
Expand Down Expand Up @@ -252,6 +334,16 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Deleting all jobs"
end)

for {name, %Job{} = job} <- jobs do
# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :delete], %{}, %{
job_name: name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})
end

messages = for {name, %Job{state: :active}} <- jobs, do: {:remove, name}

:ok = storage.purge(storage_pid)
Expand Down
1 change: 1 addition & 0 deletions lib/quantum/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ defmodule Quantum.Supervisor do
|> Map.put(:task_supervisor_reference, task_supervisor_name)
|> Map.put(:task_registry_reference, task_registry_name)
|> Map.put(:name, executor_supervisor_name)
|> Map.put(:scheduler, scheduler)
)

Supervisor.init(
Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ defmodule Quantum.Mixfile do
{:ex_doc, "~> 0.19", only: [:dev, :docs], runtime: false},
{:excoveralls, "~> 0.5", only: [:test], runtime: false},
{:dialyxir, "~> 1.0-rc", only: [:dev], runtime: false},
{:credo, "~> 1.0", only: [:dev], runtime: false}
{:credo, "~> 1.0", only: [:dev], runtime: false},
{:telemetry, "~> 0.4"}
]
end
end
3 changes: 3 additions & 0 deletions pages/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,6 @@ Timezones can also be configured on a per-job basis. This overrides the default
timezone: "America/New_York"
}
```

## Telemetry Support

Loading