diff --git a/lib/quantum/executor.ex b/lib/quantum/executor.ex index b77852c..60ea3f0 100644 --- a/lib/quantum/executor.ex +++ b/lib/quantum/executor.ex @@ -25,11 +25,15 @@ defmodule Quantum.Executor do @spec execute(StartOpts.t(), Job.t(), Node.t()) :: :ok # Execute task on all given nodes without checking for overlap defp execute( - %StartOpts{task_supervisor_reference: task_supervisor, debug_logging: debug_logging}, + %StartOpts{ + task_supervisor_reference: task_supervisor, + debug_logging: debug_logging, + scheduler: scheduler + }, %Job{overlap: true} = job, node ) do - run(node, job, task_supervisor, debug_logging) + run(node, job, task_supervisor, debug_logging, scheduler) :ok end @@ -39,7 +43,8 @@ defmodule Quantum.Executor do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Job{overlap: false, name: job_name} = job, node @@ -51,7 +56,7 @@ defmodule Quantum.Executor do case TaskRegistry.mark_running(task_registry, job_name, node) do :marked_running -> - %Task{ref: ref} = run(node, job, task_supervisor, debug_logging) + %Task{ref: ref} = run(node, job, task_supervisor, debug_logging, scheduler) receive do {^ref, _} -> @@ -69,8 +74,8 @@ defmodule Quantum.Executor do end # Ececute the given function on a given node via the task supervisor - @spec run(Node.t(), Job.t(), GenServer.server(), boolean()) :: Task.t() - defp run(node, %{name: job_name, task: task}, task_supervisor, debug_logging) do + @spec run(Node.t(), Job.t(), GenServer.server(), boolean(), atom()) :: Task.t() + defp run(node, %{name: job_name, task: task}, task_supervisor, debug_logging, scheduler) do debug_logging && Logger.debug(fn -> "[#{inspect(Node.self())}][#{__MODULE__}] Task for job #{inspect(job_name)} started on node #{ @@ -84,6 +89,15 @@ defmodule Quantum.Executor do "[#{inspect(Node.self())}][#{__MODULE__}] Execute started for job #{inspect(job_name)}" end) + # Note: we are intentionally mimicking the ":telemetry.span" here to keep current functionality + start_monotonic_time = :erlang.monotonic_time() + + :telemetry.execute([:quantum, :job, :start], %{system_time: start_monotonic_time}, %{ + job_name: job_name, + node: inspect(node), + scheduler: scheduler + }) + try do execute_task(task) catch @@ -94,6 +108,16 @@ defmodule Quantum.Executor do inspect(job_name) }, which failed due to: #{Exception.format(type, value, __STACKTRACE__)}" end) + + duration = :erlang.monotonic_time() - start_monotonic_time + + :telemetry.execute([:quantum, :job, :exception], %{duration: duration}, %{ + job_name: job_name, + node: inspect(node), + reason: value, + stacktrace: __STACKTRACE__, + scheduler: scheduler + }) else result -> debug_logging && @@ -102,6 +126,14 @@ defmodule Quantum.Executor do inspect(job_name) }, which yielded result: #{inspect(result)}" end) + + duration = :erlang.monotonic_time() - start_monotonic_time + + :telemetry.execute([:quantum, :job, :stop], %{duration: duration}, %{ + job_name: job_name, + node: inspect(node), + scheduler: scheduler + }) end :ok diff --git a/lib/quantum/executor/start_opts.ex b/lib/quantum/executor/start_opts.ex index 2115332..e26d9e8 100644 --- a/lib/quantum/executor/start_opts.ex +++ b/lib/quantum/executor/start_opts.ex @@ -6,13 +6,15 @@ defmodule Quantum.Executor.StartOpts do @type t :: %__MODULE__{ task_supervisor_reference: GenServer.server(), task_registry_reference: GenServer.server(), - debug_logging: boolean + debug_logging: boolean, + scheduler: atom() } @enforce_keys [ :task_supervisor_reference, :task_registry_reference, - :debug_logging + :debug_logging, + :scheduler ] defstruct @enforce_keys end diff --git a/lib/quantum/executor_supervisor.ex b/lib/quantum/executor_supervisor.ex index 5644186..ba98275 100644 --- a/lib/quantum/executor_supervisor.ex +++ b/lib/quantum/executor_supervisor.ex @@ -19,7 +19,8 @@ defmodule Quantum.ExecutorSupervisor do :node_selector_broadcaster_reference, :task_supervisor_reference, :task_registry_reference, - :debug_logging + :debug_logging, + :scheduler ]) ), name: name @@ -49,7 +50,8 @@ defmodule Quantum.ExecutorSupervisor do Map.take(opts, [ :task_supervisor_reference, :task_registry_reference, - :debug_logging + :debug_logging, + :scheduler ]) ) diff --git a/lib/quantum/executor_supervisor/init_opts.ex b/lib/quantum/executor_supervisor/init_opts.ex index aa1edbf..cbff5f4 100644 --- a/lib/quantum/executor_supervisor/init_opts.ex +++ b/lib/quantum/executor_supervisor/init_opts.ex @@ -7,14 +7,16 @@ defmodule Quantum.ExecutorSupervisor.InitOpts do node_selector_broadcaster_reference: GenServer.server(), task_supervisor_reference: GenServer.server(), task_registry_reference: GenServer.server(), - debug_logging: boolean + debug_logging: boolean, + scheduler: atom() } @enforce_keys [ :node_selector_broadcaster_reference, :task_supervisor_reference, :task_registry_reference, - :debug_logging + :debug_logging, + :scheduler ] defstruct @enforce_keys end diff --git a/lib/quantum/executor_supervisor/start_opts.ex b/lib/quantum/executor_supervisor/start_opts.ex index 09b6fee..d42fe57 100644 --- a/lib/quantum/executor_supervisor/start_opts.ex +++ b/lib/quantum/executor_supervisor/start_opts.ex @@ -8,7 +8,8 @@ defmodule Quantum.ExecutorSupervisor.StartOpts do node_selector_broadcaster_reference: GenServer.server(), task_supervisor_reference: GenServer.server(), task_registry_reference: GenServer.server(), - debug_logging: boolean() + debug_logging: boolean(), + scheduler: atom() } @enforce_keys [ @@ -16,7 +17,8 @@ defmodule Quantum.ExecutorSupervisor.StartOpts do :node_selector_broadcaster_reference, :task_supervisor_reference, :task_registry_reference, - :debug_logging + :debug_logging, + :scheduler ] defstruct @enforce_keys end diff --git a/lib/quantum/job_broadcaster.ex b/lib/quantum/job_broadcaster.ex index 369d1e4..0467337 100644 --- a/lib/quantum/job_broadcaster.ex +++ b/lib/quantum/job_broadcaster.ex @@ -60,6 +60,16 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Storage, skipping config" end) + for %Job{state: :active, name: name} = job <- storage_jobs do + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :add], %{}, %{ + job_name: name, + job: job, + node: inspect(Node.self()), + scheduler: scheduler + }) + end + storage_jobs end @@ -99,6 +109,14 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}" end) + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :update], %{}, %{ + job_name: job_name, + job: job, + node: inspect(Node.self()), + scheduler: state.scheduler + }) + :ok = storage.delete_job(storage_pid, job_name) :ok = storage.add_job(storage_pid, job) @@ -111,6 +129,14 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}" end) + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :update], %{}, %{ + job_name: job_name, + job: job, + node: inspect(Node.self()), + scheduler: state.scheduler + }) + :ok = storage.delete_job(storage_pid, job_name) :ok = storage.add_job(storage_pid, job) @@ -122,6 +148,14 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}" end) + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :add], %{}, %{ + job_name: job_name, + job: job, + node: inspect(Node.self()), + scheduler: state.scheduler + }) + :ok = storage.add_job(storage_pid, job) {:noreply, [{:add, job}], %{state | jobs: Map.put(jobs, job_name, job)}} @@ -144,6 +178,14 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}" end) + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :update], %{}, %{ + job_name: job_name, + job: job, + node: inspect(Node.self()), + scheduler: state.scheduler + }) + :ok = storage.delete_job(storage_pid, job_name) :ok = storage.add_job(storage_pid, job) @@ -155,6 +197,14 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}" end) + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :update], %{}, %{ + job_name: job_name, + job: job, + node: inspect(Node.self()), + scheduler: state.scheduler + }) + :ok = storage.delete_job(storage_pid, job_name) :ok = storage.add_job(storage_pid, job) @@ -166,6 +216,14 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}" end) + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :add], %{}, %{ + job_name: job_name, + job: job, + node: inspect(Node.self()), + scheduler: state.scheduler + }) + :ok = storage.add_job(storage_pid, job) {:noreply, [], %{state | jobs: Map.put(jobs, job_name, job)}} @@ -187,12 +245,28 @@ defmodule Quantum.JobBroadcaster do end) case Map.fetch(jobs, name) do - {:ok, %{state: :active}} -> + {:ok, %{state: :active, name: name} = job} -> + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :delete], %{}, %{ + job_name: name, + job: job, + node: inspect(Node.self()), + scheduler: state.scheduler + }) + :ok = storage.delete_job(storage_pid, name) {:noreply, [{:remove, name}], %{state | jobs: Map.delete(jobs, name)}} - {:ok, %{state: :inactive}} -> + {:ok, %{state: :inactive, name: name} = job} -> + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :delete], %{}, %{ + job_name: name, + job: job, + node: inspect(Node.self()), + scheduler: state.scheduler + }) + :ok = storage.delete_job(storage_pid, name) {:noreply, [], %{state | jobs: Map.delete(jobs, name)}} @@ -224,6 +298,14 @@ defmodule Quantum.JobBroadcaster do {:noreply, [], state} {:ok, job} -> + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :update], %{}, %{ + job_name: name, + job: job, + node: inspect(Node.self()), + scheduler: state.scheduler + }) + jobs = Map.update!(jobs, name, &Job.set_state(&1, new_state)) :ok = storage.update_job_state(storage_pid, job.name, new_state) @@ -252,6 +334,16 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Deleting all jobs" end) + for {name, %Job{} = job} <- jobs do + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :delete], %{}, %{ + job_name: name, + job: job, + node: inspect(Node.self()), + scheduler: state.scheduler + }) + end + messages = for {name, %Job{state: :active}} <- jobs, do: {:remove, name} :ok = storage.purge(storage_pid) diff --git a/lib/quantum/supervisor.ex b/lib/quantum/supervisor.ex index a548c70..0468d2f 100644 --- a/lib/quantum/supervisor.ex +++ b/lib/quantum/supervisor.ex @@ -88,6 +88,7 @@ defmodule Quantum.Supervisor do |> Map.put(:task_supervisor_reference, task_supervisor_name) |> Map.put(:task_registry_reference, task_registry_name) |> Map.put(:name, executor_supervisor_name) + |> Map.put(:scheduler, scheduler) ) Supervisor.init( diff --git a/mix.exs b/mix.exs index 5942654..72013bf 100644 --- a/mix.exs +++ b/mix.exs @@ -101,7 +101,8 @@ defmodule Quantum.Mixfile do {:ex_doc, "~> 0.19", only: [:dev, :docs], runtime: false}, {:excoveralls, "~> 0.5", only: [:test], runtime: false}, {:dialyxir, "~> 1.0-rc", only: [:dev], runtime: false}, - {:credo, "~> 1.0", only: [:dev], runtime: false} + {:credo, "~> 1.0", only: [:dev], runtime: false}, + {:telemetry, "~> 0.4"} ] end end diff --git a/pages/configuration.md b/pages/configuration.md index e4dae7d..3e683c5 100644 --- a/pages/configuration.md +++ b/pages/configuration.md @@ -154,3 +154,6 @@ Timezones can also be configured on a per-job basis. This overrides the default timezone: "America/New_York" } ``` + +## Telemetry Support + diff --git a/test/quantum/executor_test.exs b/test/quantum/executor_test.exs index 6f58f32..2201a07 100644 --- a/test/quantum/executor_test.exs +++ b/test/quantum/executor_test.exs @@ -17,6 +17,65 @@ defmodule Quantum.ExecutorTest do use Quantum, otp_app: :job_broadcaster_test end + defmodule TelemetryTestHandler do + require Logger + + def handle_event( + [:quantum, :job, :start], + %{system_time: _system_time} = _measurements, + %{job_name: job_name, node: _node, scheduler: _scheduler} = _metadata, + %{parent_thread: parent_thread, test_id: test_id} + ) do + send(parent_thread, %{test_id: test_id, job_name: job_name, type: :start}) + end + + def handle_event( + [:quantum, :job, :stop], + %{duration: _duration} = _measurements, + %{job_name: job_name, node: _node, scheduler: _scheduler} = _metadata, + %{parent_thread: parent_thread, test_id: test_id} + ) do + send(parent_thread, %{test_id: test_id, job_name: job_name, type: :stop}) + end + + def handle_event( + [:quantum, :job, :exception], + %{duration: _duration} = _measurements, + %{ + job_name: job_name, + node: _node, + reason: reason, + stacktrace: stacktrace, + scheduler: _scheduler + } = _metadata, + %{parent_thread: parent_thread, test_id: test_id} + ) do + send(parent_thread, %{ + test_id: test_id, + job_name: job_name, + type: :exception, + reason: reason, + stacktrace: stacktrace + }) + end + end + + defp attach_telemetry(test_id, parent_thread) do + :telemetry.attach_many( + test_id, + [ + [:quantum, :job, :start], + [:quantum, :job, :stop], + [:quantum, :job, :exception] + ], + &TelemetryTestHandler.handle_event/4, + %{ + parent_thread: parent_thread, + test_id: test_id + } + ) + end + setup tags do {:ok, _task_supervisor} = start_supervised({Task.Supervisor, [name: Module.concat(__MODULE__, TaskSupervisor)]}) @@ -39,7 +98,8 @@ defmodule Quantum.ExecutorTest do %{ task_supervisor: Module.concat(__MODULE__, TaskSupervisor), task_registry: Module.concat(__MODULE__, TaskRegistry), - debug_logging: true + debug_logging: true, + scheduler: TestScheduler } } end @@ -48,10 +108,15 @@ defmodule Quantum.ExecutorTest do test "executes given task using anonymous function", %{ task_supervisor: task_supervisor, task_registry: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler } do caller = self() + test_id = "log-anonymous-job-handler" + + :ok = attach_telemetry(test_id, self()) + job = TestScheduler.new_job() |> Job.set_task(fn -> send(caller, :executed) end) @@ -61,22 +126,31 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: Node.self()} ) assert_receive :executed + + assert_receive %{test_id: ^test_id, type: :start} + assert_receive %{test_id: ^test_id, type: :stop}, 2000 end) end test "executes given task using function tuple", %{ task_supervisor: task_supervisor, task_registry: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler } do caller = self() + test_id = "log-function-tuple-job-handler" + + :ok = attach_telemetry(test_id, self()) + job = TestScheduler.new_job() |> Job.set_task({__MODULE__, :send, [caller]}) @@ -86,21 +160,29 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: Node.self()} ) assert_receive :executed end) + + assert_receive %{test_id: ^test_id, type: :start} + assert_receive %{test_id: ^test_id, type: :stop}, 2000 end test "executes given task without overlap", %{ task_supervisor: task_supervisor, task_registry: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler } do caller = self() + test_id = "log-task-no-overlap-handler" + + :ok = attach_telemetry(test_id, self()) job = TestScheduler.new_job() @@ -115,7 +197,8 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: Node.self()} ) @@ -124,7 +207,8 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: Node.self()} ) @@ -132,14 +216,21 @@ defmodule Quantum.ExecutorTest do assert_receive :executed refute_receive :executed end) + + assert_receive %{test_id: ^test_id, type: :start} + assert_receive %{test_id: ^test_id, type: :stop}, 2000 end test "releases lock on success", %{ task_supervisor: task_supervisor, task_registry: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler } do caller = self() + test_id = "release-lock-on-success-handler" + + :ok = attach_telemetry(test_id, self()) job = TestScheduler.new_job() @@ -162,7 +253,8 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: node} ) @@ -179,13 +271,21 @@ defmodule Quantum.ExecutorTest do assert :marked_running = TaskRegistry.mark_running(task_registry, job.name, Node.self()) end) + + assert_receive %{test_id: ^test_id, type: :start} + assert_receive %{test_id: ^test_id, type: :stop}, 2000 end test "releases lock on error", %{ task_supervisor: task_supervisor, task_registry: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler } do + test_id = "release-lock-on-error-handler" + + :ok = attach_telemetry(test_id, self()) + job = TestScheduler.new_job() |> Job.set_task(fn -> raise "failed" end) @@ -200,7 +300,8 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: Node.self()} ) @@ -209,13 +310,33 @@ defmodule Quantum.ExecutorTest do end) assert :marked_running = TaskRegistry.mark_running(task_registry, job.name, Node.self()) + assert_receive %{test_id: ^test_id, type: :start} + + assert_receive %{ + test_id: ^test_id, + type: :exception, + reason: %RuntimeError{message: "failed"}, + stacktrace: [ + {Quantum.ExecutorTest, _, _, _}, + {Quantum.Executor, _, _, _}, + {Task.Supervised, _, _, _}, + {Task.Supervised, _, _, _}, + {:proc_lib, _, _, _} + ] + }, + 2000 end test "logs error", %{ task_supervisor: task_supervisor, task_registry: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler } do + test_id = "logs-error-handler" + + :ok = attach_telemetry(test_id, self()) + job = TestScheduler.new_job() |> Job.set_task(fn -> raise "failed" end) @@ -228,7 +349,8 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: Node.self()} ) @@ -237,13 +359,33 @@ defmodule Quantum.ExecutorTest do end) assert logs =~ ~r/\(RuntimeError\) failed/ + assert_receive %{test_id: ^test_id, type: :start} + + assert_receive %{ + test_id: ^test_id, + type: :exception, + reason: %RuntimeError{message: "failed"}, + stacktrace: [ + {Quantum.ExecutorTest, _, _, _}, + {Quantum.Executor, _, _, _}, + {Task.Supervised, _, _, _}, + {Task.Supervised, _, _, _}, + {:proc_lib, _, _, _} + ] + }, + 2000 end test "logs exit", %{ task_supervisor: task_supervisor, task_registry: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler } do + test_id = "logs-exit-handler" + + :ok = attach_telemetry(test_id, self()) + job = TestScheduler.new_job() |> Job.set_task(fn -> exit(:failure) end) @@ -256,7 +398,8 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: Node.self()} ) @@ -265,13 +408,33 @@ defmodule Quantum.ExecutorTest do end) assert logs =~ ~r/\(exit\) :failure/ + assert_receive %{test_id: ^test_id, type: :start} + + assert_receive %{ + test_id: ^test_id, + type: :exception, + reason: :failure, + stacktrace: [ + {Quantum.ExecutorTest, _, _, _}, + {Quantum.Executor, _, _, _}, + {Task.Supervised, _, _, _}, + {Task.Supervised, _, _, _}, + {:proc_lib, _, _, _} + ] + }, + 2000 end test "logs throw", %{ task_supervisor: task_supervisor, task_registry: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler } do + test_id = "logs-throw-handler" + + :ok = attach_telemetry(test_id, self()) + ref = make_ref() job = @@ -286,7 +449,8 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: Node.self()} ) @@ -295,6 +459,21 @@ defmodule Quantum.ExecutorTest do end) assert logs =~ "(throw) #{inspect(ref)}" + assert_receive %{test_id: ^test_id, type: :start} + + assert_receive %{ + test_id: ^test_id, + type: :exception, + reason: ^ref, + stacktrace: [ + {Quantum.ExecutorTest, _, _, _}, + {Quantum.Executor, _, _, _}, + {Task.Supervised, _, _, _}, + {Task.Supervised, _, _, _}, + {:proc_lib, _, _, _} + ] + }, + 2000 end end diff --git a/test/quantum/job_broadcaster_test.exs b/test/quantum/job_broadcaster_test.exs index 6a977e8..a5b9a04 100644 --- a/test/quantum/job_broadcaster_test.exs +++ b/test/quantum/job_broadcaster_test.exs @@ -1,7 +1,8 @@ defmodule Quantum.JobBroadcasterTest do @moduledoc false - use ExUnit.Case, async: true + # , async: true causes random failures due to race conditions with telemetry tests + use ExUnit.Case alias Quantum.{Job, JobBroadcaster, JobBroadcaster.StartOpts} alias Quantum.Storage.Test, as: TestStorage @@ -20,6 +21,49 @@ defmodule Quantum.JobBroadcasterTest do use Quantum, otp_app: :job_broadcaster_test end + defmodule TelemetryTestHandler do + require Logger + + def handle_event( + [:quantum, :job, :add], + _measurements, + %{job_name: job_name, job: _job, node: _node, scheduler: _scheduler} = _metadata, + %{parent_thread: parent_thread, test_id: test_id} + ) do + send(parent_thread, %{test_id: test_id, job_name: job_name, type: :add}) + end + + def handle_event( + [:quantum, :job, :delete], + _measurements, + %{job_name: job_name, job: _job, node: _node, scheduler: _scheduler} = _metadata, + %{parent_thread: parent_thread, test_id: test_id} + ) do + send(parent_thread, %{test_id: test_id, job_name: job_name, type: :delete}) + end + + def handle_event( + [:quantum, :job, :update], + _measurements, + %{job_name: job_name, job: _job, node: _node, scheduler: _scheduler} = _metadata, + %{parent_thread: parent_thread, test_id: test_id} + ) do + send(parent_thread, %{test_id: test_id, job_name: job_name, type: :update}) + end + end + + defp attach_telemetry(last_atom, test_id, parent_thread) do + :telemetry.attach( + test_id, + [:quantum, :job, last_atom], + &TelemetryTestHandler.handle_event/4, + %{ + parent_thread: parent_thread, + test_id: test_id + } + ) + end + setup tags do if tags[:listen_storage] do Process.put(:test_pid, self()) @@ -84,6 +128,10 @@ defmodule Quantum.JobBroadcasterTest do @tag manual_dispatch: true test "storage jobs", %{active_job: active_job, inactive_job: inactive_job} do + test_id = "init-storage-jobs-handler" + + :ok = attach_telemetry(:add, test_id, self()) + capture_log(fn -> defmodule FullStorage do @moduledoc false @@ -114,12 +162,20 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:received, {:add, _}} refute_receive {:received, {:add, _}} end) + + # Ensure exactly one :add telemetry notifation because only one job is active + assert_receive %{test_id: ^test_id, type: :add} + refute_receive %{test_id: ^test_id, type: :add} end end describe "add" do @tag listen_storage: true test "active", %{broadcaster: broadcaster, active_job: active_job} do + test_id = "add-active-job-handler" + + :ok = attach_telemetry(:add, test_id, self()) + assert capture_log(fn -> TestScheduler.add_job(broadcaster, active_job) @@ -127,6 +183,8 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:add_job, ^active_job, _} end) =~ "Adding job #Reference" + + assert_receive %{test_id: ^test_id} end test "active (without debug-logging)", %{init_jobs: init_jobs, active_job: active_job} do @@ -157,6 +215,10 @@ defmodule Quantum.JobBroadcasterTest do @tag listen_storage: true test "inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do + test_id = "add-inactive-job-handler" + + :ok = attach_telemetry(:add, test_id, self()) + capture_log(fn -> TestScheduler.add_job(broadcaster, inactive_job) @@ -164,6 +226,8 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:add_job, ^inactive_job, _} end) + + assert_receive %{test_id: ^test_id} end @tag listen_storage: true @@ -264,6 +328,10 @@ defmodule Quantum.JobBroadcasterTest do test "active", %{broadcaster: broadcaster, active_job: active_job} do active_job_name = active_job.name + test_id = "log-delete-active-job-handler" + + :ok = attach_telemetry(:delete, test_id, self()) + capture_log(fn -> TestScheduler.delete_job(broadcaster, active_job.name) @@ -275,6 +343,8 @@ defmodule Quantum.JobBroadcasterTest do key == active_job_name end) end) + + assert_receive %{test_id: ^test_id, type: :delete} end @tag listen_storage: true @@ -290,6 +360,10 @@ defmodule Quantum.JobBroadcasterTest do @tag jobs: :inactive, listen_storage: true test "inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do + test_id = "delete-inactive-job-handler" + + :ok = attach_telemetry(:delete, test_id, self()) + capture_log(fn -> inactive_job_name = inactive_job.name @@ -303,6 +377,8 @@ defmodule Quantum.JobBroadcasterTest do key == inactive_job.name end) end) + + assert_receive %{test_id: ^test_id, type: :delete} end end @@ -311,6 +387,10 @@ defmodule Quantum.JobBroadcasterTest do test "active => inactive", %{broadcaster: broadcaster, active_job: active_job} do active_job_name = active_job.name + test_id = "update-active-to-inactive-job-handler" + + :ok = attach_telemetry(:update, test_id, self()) + capture_log(fn -> TestScheduler.deactivate_job(broadcaster, active_job.name) @@ -318,10 +398,16 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:update_job_state, {_, _}, _} end) + + assert_receive %{test_id: ^test_id} end @tag jobs: :inactive, listen_storage: true test "inactive => active", %{broadcaster: broadcaster, inactive_job: inactive_job} do + test_id = "update-inactive-to-active-job-handler" + + :ok = attach_telemetry(:update, test_id, self()) + capture_log(fn -> TestScheduler.activate_job(broadcaster, inactive_job.name) @@ -331,45 +417,70 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:update_job_state, {_, _}, _} end) + + assert_receive %{test_id: ^test_id} end @tag jobs: :active, listen_storage: true test "active => active", %{broadcaster: broadcaster, active_job: active_job} do + test_id = "update-active-to-active-job-handler" + + :ok = attach_telemetry(:update, test_id, self()) + # Initial assert_receive {:received, {:add, ^active_job}} + name = active_job.name capture_log(fn -> - TestScheduler.activate_job(broadcaster, active_job.name) + TestScheduler.activate_job(broadcaster, name) refute_receive {:received, {:add, ^active_job}} refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) + + refute_receive %{test_id: ^test_id, job_name: ^name} end @tag jobs: :inactive, listen_storage: true test "inactive => inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do + test_id = "update-inactive-to-inactive-job-handler" + + :ok = attach_telemetry(:update, test_id, self()) + inactive_job_name = inactive_job.name capture_log(fn -> - TestScheduler.deactivate_job(broadcaster, inactive_job.name) + TestScheduler.deactivate_job(broadcaster, inactive_job_name) refute_receive {:received, {:remove, ^inactive_job_name}} refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) + + refute_receive %{test_id: ^test_id, job_name: ^inactive_job_name} end @tag listen_storage: true test "missing", %{broadcaster: broadcaster} do + test_id = "update-missing-job-handler" + + :ok = attach_telemetry(:update, test_id, self()) + + ref1 = make_ref() + ref2 = make_ref() + capture_log(fn -> - TestScheduler.deactivate_job(broadcaster, make_ref()) - TestScheduler.activate_job(broadcaster, make_ref()) + TestScheduler.deactivate_job(broadcaster, ref1) + TestScheduler.activate_job(broadcaster, ref2) refute_receive {:received, {:remove, _}} refute_receive {:received, {:add, _}} refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) + + refute_receive %{test_id: ^test_id, job_name: ^ref1} + refute_receive %{test_id: ^test_id, job_name: ^ref2} end end @@ -380,6 +491,10 @@ defmodule Quantum.JobBroadcasterTest do active_job: active_job, inactive_job: inactive_job } do + test_id = "delete-all-active-jobs-handler" + + :ok = attach_telemetry(:delete, test_id, self()) + active_job_name = active_job.name inactive_job_name = inactive_job.name @@ -391,6 +506,9 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:purge, _, _} end) + + assert_receive %{test_id: ^test_id, job_name: ^inactive_job_name, type: :delete} + assert_receive %{test_id: ^test_id, job_name: ^active_job_name, type: :delete} end end