Skip to content

Commit

Permalink
Solution: Add telemetry to track metrics (#453)
Browse files Browse the repository at this point in the history
* finished adding, deleteing and updating jobs telemetry tests

* Fixed CI async overlay by passing a unique value for each test

* lint

* added logs to debug CI failure

* added Job_name to prevent collision in CI

* fixed typo

* added telemetry job span tracking

* fixed readme

* fixed dialyzer error

* requested changes

* lint

* Injected scheduler to incude in telemetry metrics

* lint

* Apply suggestions from code review

Co-authored-by: Jonatan Männchen <[email protected]>

* fixed tests, dialyzer doesnt like Module.t

* Module.t() to atom

Co-authored-by: Marc Smith <[email protected]>
Co-authored-by: Jonatan Männchen <[email protected]>
  • Loading branch information
3 people authored Sep 11, 2020
1 parent 5467582 commit e922d4a
Show file tree
Hide file tree
Showing 11 changed files with 474 additions and 40 deletions.
44 changes: 38 additions & 6 deletions lib/quantum/executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ defmodule Quantum.Executor do
@spec execute(StartOpts.t(), Job.t(), Node.t()) :: :ok
# Execute task on all given nodes without checking for overlap
defp execute(
%StartOpts{task_supervisor_reference: task_supervisor, debug_logging: debug_logging},
%StartOpts{
task_supervisor_reference: task_supervisor,
debug_logging: debug_logging,
scheduler: scheduler
},
%Job{overlap: true} = job,
node
) do
run(node, job, task_supervisor, debug_logging)
run(node, job, task_supervisor, debug_logging, scheduler)

:ok
end
Expand All @@ -39,7 +43,8 @@ defmodule Quantum.Executor do
%StartOpts{
task_supervisor_reference: task_supervisor,
task_registry_reference: task_registry,
debug_logging: debug_logging
debug_logging: debug_logging,
scheduler: scheduler
},
%Job{overlap: false, name: job_name} = job,
node
Expand All @@ -51,7 +56,7 @@ defmodule Quantum.Executor do

case TaskRegistry.mark_running(task_registry, job_name, node) do
:marked_running ->
%Task{ref: ref} = run(node, job, task_supervisor, debug_logging)
%Task{ref: ref} = run(node, job, task_supervisor, debug_logging, scheduler)

receive do
{^ref, _} ->
Expand All @@ -69,8 +74,8 @@ defmodule Quantum.Executor do
end

# Ececute the given function on a given node via the task supervisor
@spec run(Node.t(), Job.t(), GenServer.server(), boolean()) :: Task.t()
defp run(node, %{name: job_name, task: task}, task_supervisor, debug_logging) do
@spec run(Node.t(), Job.t(), GenServer.server(), boolean(), atom()) :: Task.t()
defp run(node, %{name: job_name, task: task}, task_supervisor, debug_logging, scheduler) do
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Task for job #{inspect(job_name)} started on node #{
Expand All @@ -84,6 +89,15 @@ defmodule Quantum.Executor do
"[#{inspect(Node.self())}][#{__MODULE__}] Execute started for job #{inspect(job_name)}"
end)

# Note: we are intentionally mimicking the ":telemetry.span" here to keep current functionality
start_monotonic_time = :erlang.monotonic_time()

:telemetry.execute([:quantum, :job, :start], %{system_time: start_monotonic_time}, %{
job_name: job_name,
node: inspect(node),
scheduler: scheduler
})

try do
execute_task(task)
catch
Expand All @@ -94,6 +108,16 @@ defmodule Quantum.Executor do
inspect(job_name)
}, which failed due to: #{Exception.format(type, value, __STACKTRACE__)}"
end)

duration = :erlang.monotonic_time() - start_monotonic_time

:telemetry.execute([:quantum, :job, :exception], %{duration: duration}, %{
job_name: job_name,
node: inspect(node),
reason: value,
stacktrace: __STACKTRACE__,
scheduler: scheduler
})
else
result ->
debug_logging &&
Expand All @@ -102,6 +126,14 @@ defmodule Quantum.Executor do
inspect(job_name)
}, which yielded result: #{inspect(result)}"
end)

duration = :erlang.monotonic_time() - start_monotonic_time

:telemetry.execute([:quantum, :job, :stop], %{duration: duration}, %{
job_name: job_name,
node: inspect(node),
scheduler: scheduler
})
end

:ok
Expand Down
6 changes: 4 additions & 2 deletions lib/quantum/executor/start_opts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ defmodule Quantum.Executor.StartOpts do
@type t :: %__MODULE__{
task_supervisor_reference: GenServer.server(),
task_registry_reference: GenServer.server(),
debug_logging: boolean
debug_logging: boolean,
scheduler: atom()
}

@enforce_keys [
:task_supervisor_reference,
:task_registry_reference,
:debug_logging
:debug_logging,
:scheduler
]
defstruct @enforce_keys
end
6 changes: 4 additions & 2 deletions lib/quantum/executor_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ defmodule Quantum.ExecutorSupervisor do
:node_selector_broadcaster_reference,
:task_supervisor_reference,
:task_registry_reference,
:debug_logging
:debug_logging,
:scheduler
])
),
name: name
Expand Down Expand Up @@ -49,7 +50,8 @@ defmodule Quantum.ExecutorSupervisor do
Map.take(opts, [
:task_supervisor_reference,
:task_registry_reference,
:debug_logging
:debug_logging,
:scheduler
])
)

Expand Down
6 changes: 4 additions & 2 deletions lib/quantum/executor_supervisor/init_opts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ defmodule Quantum.ExecutorSupervisor.InitOpts do
node_selector_broadcaster_reference: GenServer.server(),
task_supervisor_reference: GenServer.server(),
task_registry_reference: GenServer.server(),
debug_logging: boolean
debug_logging: boolean,
scheduler: atom()
}

@enforce_keys [
:node_selector_broadcaster_reference,
:task_supervisor_reference,
:task_registry_reference,
:debug_logging
:debug_logging,
:scheduler
]
defstruct @enforce_keys
end
6 changes: 4 additions & 2 deletions lib/quantum/executor_supervisor/start_opts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ defmodule Quantum.ExecutorSupervisor.StartOpts do
node_selector_broadcaster_reference: GenServer.server(),
task_supervisor_reference: GenServer.server(),
task_registry_reference: GenServer.server(),
debug_logging: boolean()
debug_logging: boolean(),
scheduler: atom()
}

@enforce_keys [
:name,
:node_selector_broadcaster_reference,
:task_supervisor_reference,
:task_registry_reference,
:debug_logging
:debug_logging,
:scheduler
]
defstruct @enforce_keys
end
96 changes: 94 additions & 2 deletions lib/quantum/job_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Storage, skipping config"
end)

for %Job{state: :active, name: name} = job <- storage_jobs do
# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :add], %{}, %{
job_name: name,
job: job,
node: inspect(Node.self()),
scheduler: scheduler
})
end

storage_jobs
end

Expand Down Expand Up @@ -99,6 +109,14 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}"
end)

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :update], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

:ok = storage.delete_job(storage_pid, job_name)
:ok = storage.add_job(storage_pid, job)

Expand All @@ -111,6 +129,14 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}"
end)

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :update], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

:ok = storage.delete_job(storage_pid, job_name)
:ok = storage.add_job(storage_pid, job)

Expand All @@ -122,6 +148,14 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}"
end)

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :add], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

:ok = storage.add_job(storage_pid, job)

{:noreply, [{:add, job}], %{state | jobs: Map.put(jobs, job_name, job)}}
Expand All @@ -144,6 +178,14 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}"
end)

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :update], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

:ok = storage.delete_job(storage_pid, job_name)
:ok = storage.add_job(storage_pid, job)

Expand All @@ -155,6 +197,14 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}"
end)

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :update], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

:ok = storage.delete_job(storage_pid, job_name)
:ok = storage.add_job(storage_pid, job)

Expand All @@ -166,6 +216,14 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}"
end)

# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :add], %{}, %{
job_name: job_name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

:ok = storage.add_job(storage_pid, job)

{:noreply, [], %{state | jobs: Map.put(jobs, job_name, job)}}
Expand All @@ -187,12 +245,28 @@ defmodule Quantum.JobBroadcaster do
end)

case Map.fetch(jobs, name) do
{:ok, %{state: :active}} ->
{:ok, %{state: :active, name: name} = job} ->
# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :delete], %{}, %{
job_name: name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

:ok = storage.delete_job(storage_pid, name)

{:noreply, [{:remove, name}], %{state | jobs: Map.delete(jobs, name)}}

{:ok, %{state: :inactive}} ->
{:ok, %{state: :inactive, name: name} = job} ->
# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :delete], %{}, %{
job_name: name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

:ok = storage.delete_job(storage_pid, name)

{:noreply, [], %{state | jobs: Map.delete(jobs, name)}}
Expand Down Expand Up @@ -224,6 +298,14 @@ defmodule Quantum.JobBroadcaster do
{:noreply, [], state}

{:ok, job} ->
# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :update], %{}, %{
job_name: name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})

jobs = Map.update!(jobs, name, &Job.set_state(&1, new_state))

:ok = storage.update_job_state(storage_pid, job.name, new_state)
Expand Down Expand Up @@ -252,6 +334,16 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Deleting all jobs"
end)

for {name, %Job{} = job} <- jobs do
# Send event to telemetry incase the end user wants to monitor events
:telemetry.execute([:quantum, :job, :delete], %{}, %{
job_name: name,
job: job,
node: inspect(Node.self()),
scheduler: state.scheduler
})
end

messages = for {name, %Job{state: :active}} <- jobs, do: {:remove, name}

:ok = storage.purge(storage_pid)
Expand Down
1 change: 1 addition & 0 deletions lib/quantum/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ defmodule Quantum.Supervisor do
|> Map.put(:task_supervisor_reference, task_supervisor_name)
|> Map.put(:task_registry_reference, task_registry_name)
|> Map.put(:name, executor_supervisor_name)
|> Map.put(:scheduler, scheduler)
)

Supervisor.init(
Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ defmodule Quantum.Mixfile do
{:ex_doc, "~> 0.19", only: [:dev, :docs], runtime: false},
{:excoveralls, "~> 0.5", only: [:test], runtime: false},
{:dialyxir, "~> 1.0-rc", only: [:dev], runtime: false},
{:credo, "~> 1.0", only: [:dev], runtime: false}
{:credo, "~> 1.0", only: [:dev], runtime: false},
{:telemetry, "~> 0.4"}
]
end
end
3 changes: 3 additions & 0 deletions pages/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,6 @@ Timezones can also be configured on a per-job basis. This overrides the default
timezone: "America/New_York"
}
```

## Telemetry Support

Loading

0 comments on commit e922d4a

Please sign in to comment.