From 6420985bdd087fd62f41900c7dd5ecbb73197ae2 Mon Sep 17 00:00:00 2001 From: Marc Smith Date: Mon, 31 Aug 2020 16:54:07 -0700 Subject: [PATCH 01/16] finished adding, deleteing and updating jobs telemetry tests --- .tool-versions | 2 + Makefile | 14 +++ README.md | 10 ++- lib/quantum/job_broadcaster.ex | 28 ++++++ mix.exs | 3 +- test/quantum/job_broadcaster_test.exs | 121 ++++++++++++++++++++++++++ 6 files changed, 173 insertions(+), 5 deletions(-) create mode 100644 .tool-versions create mode 100644 Makefile diff --git a/.tool-versions b/.tool-versions new file mode 100644 index 0000000..df178cf --- /dev/null +++ b/.tool-versions @@ -0,0 +1,2 @@ +elixir 1.9.4-otp-22 +erlang 22.1.8 diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d33c41e --- /dev/null +++ b/Makefile @@ -0,0 +1,14 @@ +pre-push: + make compile && make lint && make test + +compile: + rm -rf _build/dev/lib/quantum && mix compile --warnings-as-errors + +lint: + mix format && make dialyzer + +dialyzer: + mix dialyzer --format dialyxir + +test: + mix test diff --git a/README.md b/README.md index 1670782..a2ee27e 100644 --- a/README.md +++ b/README.md @@ -100,10 +100,12 @@ terms of this contract." 1. Check for [open issues](https://github.com/quantum-elixir/quantum-core/issues) or [open a new issue](https://github.com/quantum-elixir/quantum-core/issues/new) to start a discussion around [a problem](https://www.youtube.com/watch?v=_QF9sFJGJuc). 2. Issues SHALL be named as "Problem: _description of the problem_". 3. Fork the [quantum-elixir repository on GitHub](https://github.com/quantum-elixir/quantum-core) to start making your changes -4. If possible, write a test which shows that the problem was solved. -5. Send a pull request. -6. Pull requests SHALL be named as "Solution: _description of your solution_" -7. Your pull request is merged and you are added to the [list of contributors](https://github.com/quantum-elixir/quantum-core/graphs/contributors) +4. If needed, run `mix deps.get` +5. If possible, write a test which shows that the problem was solved. +6. run `make pre-push` to verify all tests pass +7. Send a pull request. +8. Pull requests SHALL be named as "Solution: _description of your solution_" +9. Your pull request is merged and you are added to the [list of contributors](https://github.com/quantum-elixir/quantum-core/graphs/contributors) ### Code Contributors diff --git a/lib/quantum/job_broadcaster.ex b/lib/quantum/job_broadcaster.ex index 99f4d98..f0e82d9 100644 --- a/lib/quantum/job_broadcaster.ex +++ b/lib/quantum/job_broadcaster.ex @@ -96,6 +96,13 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}" end) + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :add], %{}, %{ + job_name: job_name, + node: inspect(Node.self()), + module: __MODULE__ + }) + :ok = storage.add_job(storage_pid, job) {:noreply, [{:add, job}], %{state | jobs: Map.put(jobs, job_name, job)}} @@ -115,6 +122,13 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}" end) + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :add], %{}, %{ + job_name: job_name, + node: inspect(Node.self()), + module: __MODULE__ + }) + :ok = storage.add_job(storage_pid, job) {:noreply, [], %{state | jobs: Map.put(jobs, job_name, job)}} @@ -134,6 +148,13 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Deleting job #{inspect(name)}" end) + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :delete], %{}, %{ + job_name: name, + node: inspect(Node.self()), + module: __MODULE__ + }) + case Map.fetch(jobs, name) do {:ok, %{state: :active}} -> :ok = storage.delete_job(storage_pid, name) @@ -172,6 +193,13 @@ defmodule Quantum.JobBroadcaster do {:noreply, [], state} {:ok, job} -> + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :update], %{}, %{ + job_name: name, + node: inspect(Node.self()), + module: __MODULE__ + }) + jobs = Map.update!(jobs, name, &Job.set_state(&1, new_state)) :ok = storage.update_job_state(storage_pid, job.name, new_state) diff --git a/mix.exs b/mix.exs index 5942654..72013bf 100644 --- a/mix.exs +++ b/mix.exs @@ -101,7 +101,8 @@ defmodule Quantum.Mixfile do {:ex_doc, "~> 0.19", only: [:dev, :docs], runtime: false}, {:excoveralls, "~> 0.5", only: [:test], runtime: false}, {:dialyxir, "~> 1.0-rc", only: [:dev], runtime: false}, - {:credo, "~> 1.0", only: [:dev], runtime: false} + {:credo, "~> 1.0", only: [:dev], runtime: false}, + {:telemetry, "~> 0.4"} ] end end diff --git a/test/quantum/job_broadcaster_test.exs b/test/quantum/job_broadcaster_test.exs index c1c0abc..20118c1 100644 --- a/test/quantum/job_broadcaster_test.exs +++ b/test/quantum/job_broadcaster_test.exs @@ -18,6 +18,37 @@ defmodule Quantum.JobBroadcasterTest do use Quantum, otp_app: :job_broadcaster_test end + defmodule TelemetryTestHandler do + require Logger + + def handle_event( + [:quantum, :job, :add], + _measurements, + %{job_name: _job_name, module: _module, node: _node} = _metadata, + %{parent_thread: parent_thread} + ) do + send(parent_thread, :telemetry_pass) + end + + def handle_event( + [:quantum, :job, :delete], + _measurements, + %{job_name: _job_name, module: _module, node: _node} = _metadata, + %{parent_thread: parent_thread} + ) do + send(parent_thread, :telemetry_pass) + end + + def handle_event( + [:quantum, :job, :update], + _measurements, + %{job_name: _job_name, module: _module, node: _node} = _metadata, + %{parent_thread: parent_thread} + ) do + send(parent_thread, :telemetry_pass) + end + end + setup tags do if tags[:listen_storage] do Process.put(:test_pid, self()) @@ -118,6 +149,14 @@ defmodule Quantum.JobBroadcasterTest do describe "add" do @tag listen_storage: true test "active", %{broadcaster: broadcaster, active_job: active_job} do + :ok = + :telemetry.attach( + "add-active-job-handler", + [:quantum, :job, :add], + &TelemetryTestHandler.handle_event/4, + %{parent_thread: self()} + ) + assert capture_log(fn -> TestScheduler.add_job(broadcaster, active_job) @@ -125,6 +164,8 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:add_job, ^active_job, _} end) =~ "Adding job #Reference" + + assert_receive :telemetry_pass end test "active (without debug-logging)", %{init_jobs: init_jobs, active_job: active_job} do @@ -155,6 +196,14 @@ defmodule Quantum.JobBroadcasterTest do @tag listen_storage: true test "inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do + :ok = + :telemetry.attach( + "add-inactive-job-handler", + [:quantum, :job, :add], + &TelemetryTestHandler.handle_event/4, + %{parent_thread: self()} + ) + capture_log(fn -> TestScheduler.add_job(broadcaster, inactive_job) @@ -162,6 +211,8 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:add_job, ^inactive_job, _} end) + + assert_receive :telemetry_pass end end @@ -170,6 +221,14 @@ defmodule Quantum.JobBroadcasterTest do test "active", %{broadcaster: broadcaster, active_job: active_job} do active_job_name = active_job.name + :ok = + :telemetry.attach( + "log-delete-active-job-handler", + [:quantum, :job, :delete], + &TelemetryTestHandler.handle_event/4, + %{parent_thread: self()} + ) + capture_log(fn -> TestScheduler.delete_job(broadcaster, active_job.name) @@ -181,6 +240,8 @@ defmodule Quantum.JobBroadcasterTest do key == active_job_name end) end) + + assert_receive :telemetry_pass end @tag listen_storage: true @@ -196,6 +257,14 @@ defmodule Quantum.JobBroadcasterTest do @tag jobs: :inactive, listen_storage: true test "inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do + :ok = + :telemetry.attach( + "delete-inactive-job-handler", + [:quantum, :job, :delete], + &TelemetryTestHandler.handle_event/4, + %{parent_thread: self()} + ) + capture_log(fn -> inactive_job_name = inactive_job.name @@ -209,6 +278,8 @@ defmodule Quantum.JobBroadcasterTest do key == inactive_job.name end) end) + + assert_receive :telemetry_pass end end @@ -217,6 +288,14 @@ defmodule Quantum.JobBroadcasterTest do test "active => inactive", %{broadcaster: broadcaster, active_job: active_job} do active_job_name = active_job.name + :ok = + :telemetry.attach( + "update-active-to-inactive-job-handler", + [:quantum, :job, :update], + &TelemetryTestHandler.handle_event/4, + %{parent_thread: self()} + ) + capture_log(fn -> TestScheduler.deactivate_job(broadcaster, active_job.name) @@ -224,10 +303,20 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:update_job_state, {_, _}, _} end) + + assert_receive :telemetry_pass end @tag jobs: :inactive, listen_storage: true test "inactive => active", %{broadcaster: broadcaster, inactive_job: inactive_job} do + :ok = + :telemetry.attach( + "update-inactive-to-active-job-handler", + [:quantum, :job, :update], + &TelemetryTestHandler.handle_event/4, + %{parent_thread: self()} + ) + capture_log(fn -> TestScheduler.activate_job(broadcaster, inactive_job.name) @@ -237,10 +326,20 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:update_job_state, {_, _}, _} end) + + assert_receive :telemetry_pass end @tag jobs: :active, listen_storage: true test "active => active", %{broadcaster: broadcaster, active_job: active_job} do + :ok = + :telemetry.attach( + "update-active-to-active-job-handler", + [:quantum, :job, :update], + &TelemetryTestHandler.handle_event/4, + %{parent_thread: self()} + ) + # Initial assert_receive {:received, {:add, ^active_job}} @@ -251,10 +350,20 @@ defmodule Quantum.JobBroadcasterTest do refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) + + refute_receive :telemetry_pass end @tag jobs: :inactive, listen_storage: true test "inactive => inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do + :ok = + :telemetry.attach( + "update-inactive-to-inactive-job-handler", + [:quantum, :job, :update], + &TelemetryTestHandler.handle_event/4, + %{parent_thread: self()} + ) + inactive_job_name = inactive_job.name capture_log(fn -> @@ -264,10 +373,20 @@ defmodule Quantum.JobBroadcasterTest do refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) + + refute_receive :telemetry_pass end @tag listen_storage: true test "missing", %{broadcaster: broadcaster} do + :ok = + :telemetry.attach( + "update-missing-job-handler", + [:quantum, :job, :update], + &TelemetryTestHandler.handle_event/4, + %{parent_thread: self()} + ) + capture_log(fn -> TestScheduler.deactivate_job(broadcaster, make_ref()) TestScheduler.activate_job(broadcaster, make_ref()) @@ -276,6 +395,8 @@ defmodule Quantum.JobBroadcasterTest do refute_receive {:received, {:add, _}} refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) + + refute_receive :telemetry_pass end end From 615f94520c03dc47a6a650aada4a02026d41d0f0 Mon Sep 17 00:00:00 2001 From: Marc Smith Date: Tue, 1 Sep 2020 13:46:20 -0700 Subject: [PATCH 02/16] Fixed CI async overlay by passing a unique value for each test --- test/quantum/job_broadcaster_test.exs | 103 +++++++++++++++++--------- 1 file changed, 70 insertions(+), 33 deletions(-) diff --git a/test/quantum/job_broadcaster_test.exs b/test/quantum/job_broadcaster_test.exs index 20118c1..247b453 100644 --- a/test/quantum/job_broadcaster_test.exs +++ b/test/quantum/job_broadcaster_test.exs @@ -25,27 +25,27 @@ defmodule Quantum.JobBroadcasterTest do [:quantum, :job, :add], _measurements, %{job_name: _job_name, module: _module, node: _node} = _metadata, - %{parent_thread: parent_thread} + %{parent_thread: parent_thread, test_id: test_id} ) do - send(parent_thread, :telemetry_pass) + send(parent_thread, %{ test_id: test_id }) end def handle_event( [:quantum, :job, :delete], _measurements, %{job_name: _job_name, module: _module, node: _node} = _metadata, - %{parent_thread: parent_thread} + %{parent_thread: parent_thread, test_id: test_id} ) do - send(parent_thread, :telemetry_pass) + send(parent_thread, %{ test_id: test_id }) end def handle_event( [:quantum, :job, :update], _measurements, %{job_name: _job_name, module: _module, node: _node} = _metadata, - %{parent_thread: parent_thread} + %{parent_thread: parent_thread, test_id: test_id} ) do - send(parent_thread, :telemetry_pass) + send(parent_thread, %{ test_id: test_id }) end end @@ -149,12 +149,16 @@ defmodule Quantum.JobBroadcasterTest do describe "add" do @tag listen_storage: true test "active", %{broadcaster: broadcaster, active_job: active_job} do + test_id = "add-active-job-handler" :ok = :telemetry.attach( - "add-active-job-handler", + test_id, [:quantum, :job, :add], &TelemetryTestHandler.handle_event/4, - %{parent_thread: self()} + %{ + parent_thread: self(), + test_id: test_id + } ) assert capture_log(fn -> @@ -165,7 +169,7 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:add_job, ^active_job, _} end) =~ "Adding job #Reference" - assert_receive :telemetry_pass + assert_receive %{ test_id: ^test_id } end test "active (without debug-logging)", %{init_jobs: init_jobs, active_job: active_job} do @@ -196,12 +200,16 @@ defmodule Quantum.JobBroadcasterTest do @tag listen_storage: true test "inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do + test_id = "add-inactive-job-handler" :ok = :telemetry.attach( - "add-inactive-job-handler", + test_id, [:quantum, :job, :add], &TelemetryTestHandler.handle_event/4, - %{parent_thread: self()} + %{ + parent_thread: self(), + test_id: test_id + } ) capture_log(fn -> @@ -212,7 +220,7 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:add_job, ^inactive_job, _} end) - assert_receive :telemetry_pass + assert_receive %{ test_id: ^test_id } end end @@ -221,12 +229,16 @@ defmodule Quantum.JobBroadcasterTest do test "active", %{broadcaster: broadcaster, active_job: active_job} do active_job_name = active_job.name + test_id = "log-delete-active-job-handler" :ok = :telemetry.attach( - "log-delete-active-job-handler", + test_id, [:quantum, :job, :delete], &TelemetryTestHandler.handle_event/4, - %{parent_thread: self()} + %{ + parent_thread: self(), + test_id: test_id + } ) capture_log(fn -> @@ -241,7 +253,7 @@ defmodule Quantum.JobBroadcasterTest do end) end) - assert_receive :telemetry_pass + assert_receive %{ test_id: ^test_id } end @tag listen_storage: true @@ -257,12 +269,16 @@ defmodule Quantum.JobBroadcasterTest do @tag jobs: :inactive, listen_storage: true test "inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do + test_id = "delete-inactive-job-handler" :ok = :telemetry.attach( - "delete-inactive-job-handler", + test_id, [:quantum, :job, :delete], &TelemetryTestHandler.handle_event/4, - %{parent_thread: self()} + %{ + parent_thread: self(), + test_id: test_id + } ) capture_log(fn -> @@ -279,7 +295,7 @@ defmodule Quantum.JobBroadcasterTest do end) end) - assert_receive :telemetry_pass + assert_receive %{ test_id: ^test_id } end end @@ -288,12 +304,16 @@ defmodule Quantum.JobBroadcasterTest do test "active => inactive", %{broadcaster: broadcaster, active_job: active_job} do active_job_name = active_job.name + test_id = "update-active-to-inactive-job-handler" :ok = :telemetry.attach( - "update-active-to-inactive-job-handler", + test_id, [:quantum, :job, :update], &TelemetryTestHandler.handle_event/4, - %{parent_thread: self()} + %{ + parent_thread: self(), + test_id: test_id + } ) capture_log(fn -> @@ -304,17 +324,21 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:update_job_state, {_, _}, _} end) - assert_receive :telemetry_pass + assert_receive %{ test_id: ^test_id } end @tag jobs: :inactive, listen_storage: true test "inactive => active", %{broadcaster: broadcaster, inactive_job: inactive_job} do + test_id = "update-inactive-to-active-job-handler" :ok = :telemetry.attach( - "update-inactive-to-active-job-handler", + test_id, [:quantum, :job, :update], &TelemetryTestHandler.handle_event/4, - %{parent_thread: self()} + %{ + parent_thread: self(), + test_id: test_id + } ) capture_log(fn -> @@ -327,17 +351,22 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:update_job_state, {_, _}, _} end) - assert_receive :telemetry_pass + assert_receive %{ test_id: ^test_id } end @tag jobs: :active, listen_storage: true test "active => active", %{broadcaster: broadcaster, active_job: active_job} do + + test_id = "update-active-to-active-job-handler" :ok = :telemetry.attach( - "update-active-to-active-job-handler", + test_id, [:quantum, :job, :update], &TelemetryTestHandler.handle_event/4, - %{parent_thread: self()} + %{ + parent_thread: self(), + test_id: test_id + } ) # Initial @@ -351,17 +380,21 @@ defmodule Quantum.JobBroadcasterTest do refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) - refute_receive :telemetry_pass + refute_receive %{ test_id: ^test_id } end @tag jobs: :inactive, listen_storage: true test "inactive => inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do + test_id = "update-inactive-to-inactive-job-handler" :ok = :telemetry.attach( - "update-inactive-to-inactive-job-handler", + test_id, [:quantum, :job, :update], &TelemetryTestHandler.handle_event/4, - %{parent_thread: self()} + %{ + parent_thread: self(), + test_id: test_id + } ) inactive_job_name = inactive_job.name @@ -374,17 +407,21 @@ defmodule Quantum.JobBroadcasterTest do refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) - refute_receive :telemetry_pass + refute_receive %{ test_id: ^test_id } end @tag listen_storage: true test "missing", %{broadcaster: broadcaster} do + test_id = "update-missing-job-handler" :ok = :telemetry.attach( - "update-missing-job-handler", + test_id, [:quantum, :job, :update], &TelemetryTestHandler.handle_event/4, - %{parent_thread: self()} + %{ + parent_thread: self(), + test_id: test_id + } ) capture_log(fn -> @@ -396,7 +433,7 @@ defmodule Quantum.JobBroadcasterTest do refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) - refute_receive :telemetry_pass + refute_receive %{ test_id: ^test_id } end end From c0003dc9dc50f80a4e552dbb555e510dd05c0753 Mon Sep 17 00:00:00 2001 From: Marc Smith Date: Tue, 1 Sep 2020 13:49:13 -0700 Subject: [PATCH 03/16] lint --- test/quantum/job_broadcaster_test.exs | 70 +++++++++++++++------------ 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/test/quantum/job_broadcaster_test.exs b/test/quantum/job_broadcaster_test.exs index 247b453..2cea400 100644 --- a/test/quantum/job_broadcaster_test.exs +++ b/test/quantum/job_broadcaster_test.exs @@ -27,7 +27,7 @@ defmodule Quantum.JobBroadcasterTest do %{job_name: _job_name, module: _module, node: _node} = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do - send(parent_thread, %{ test_id: test_id }) + send(parent_thread, %{test_id: test_id}) end def handle_event( @@ -36,7 +36,7 @@ defmodule Quantum.JobBroadcasterTest do %{job_name: _job_name, module: _module, node: _node} = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do - send(parent_thread, %{ test_id: test_id }) + send(parent_thread, %{test_id: test_id}) end def handle_event( @@ -45,7 +45,7 @@ defmodule Quantum.JobBroadcasterTest do %{job_name: _job_name, module: _module, node: _node} = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do - send(parent_thread, %{ test_id: test_id }) + send(parent_thread, %{test_id: test_id}) end end @@ -150,14 +150,15 @@ defmodule Quantum.JobBroadcasterTest do @tag listen_storage: true test "active", %{broadcaster: broadcaster, active_job: active_job} do test_id = "add-active-job-handler" + :ok = :telemetry.attach( test_id, [:quantum, :job, :add], &TelemetryTestHandler.handle_event/4, %{ - parent_thread: self(), - test_id: test_id + parent_thread: self(), + test_id: test_id } ) @@ -169,7 +170,7 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:add_job, ^active_job, _} end) =~ "Adding job #Reference" - assert_receive %{ test_id: ^test_id } + assert_receive %{test_id: ^test_id} end test "active (without debug-logging)", %{init_jobs: init_jobs, active_job: active_job} do @@ -201,14 +202,15 @@ defmodule Quantum.JobBroadcasterTest do @tag listen_storage: true test "inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do test_id = "add-inactive-job-handler" + :ok = :telemetry.attach( test_id, [:quantum, :job, :add], &TelemetryTestHandler.handle_event/4, %{ - parent_thread: self(), - test_id: test_id + parent_thread: self(), + test_id: test_id } ) @@ -220,7 +222,7 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:add_job, ^inactive_job, _} end) - assert_receive %{ test_id: ^test_id } + assert_receive %{test_id: ^test_id} end end @@ -230,14 +232,15 @@ defmodule Quantum.JobBroadcasterTest do active_job_name = active_job.name test_id = "log-delete-active-job-handler" + :ok = :telemetry.attach( test_id, [:quantum, :job, :delete], &TelemetryTestHandler.handle_event/4, %{ - parent_thread: self(), - test_id: test_id + parent_thread: self(), + test_id: test_id } ) @@ -253,7 +256,7 @@ defmodule Quantum.JobBroadcasterTest do end) end) - assert_receive %{ test_id: ^test_id } + assert_receive %{test_id: ^test_id} end @tag listen_storage: true @@ -270,14 +273,15 @@ defmodule Quantum.JobBroadcasterTest do @tag jobs: :inactive, listen_storage: true test "inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do test_id = "delete-inactive-job-handler" + :ok = :telemetry.attach( test_id, [:quantum, :job, :delete], &TelemetryTestHandler.handle_event/4, %{ - parent_thread: self(), - test_id: test_id + parent_thread: self(), + test_id: test_id } ) @@ -295,7 +299,7 @@ defmodule Quantum.JobBroadcasterTest do end) end) - assert_receive %{ test_id: ^test_id } + assert_receive %{test_id: ^test_id} end end @@ -305,14 +309,15 @@ defmodule Quantum.JobBroadcasterTest do active_job_name = active_job.name test_id = "update-active-to-inactive-job-handler" + :ok = :telemetry.attach( test_id, [:quantum, :job, :update], &TelemetryTestHandler.handle_event/4, %{ - parent_thread: self(), - test_id: test_id + parent_thread: self(), + test_id: test_id } ) @@ -324,20 +329,21 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:update_job_state, {_, _}, _} end) - assert_receive %{ test_id: ^test_id } + assert_receive %{test_id: ^test_id} end @tag jobs: :inactive, listen_storage: true test "inactive => active", %{broadcaster: broadcaster, inactive_job: inactive_job} do test_id = "update-inactive-to-active-job-handler" + :ok = :telemetry.attach( test_id, [:quantum, :job, :update], &TelemetryTestHandler.handle_event/4, %{ - parent_thread: self(), - test_id: test_id + parent_thread: self(), + test_id: test_id } ) @@ -351,21 +357,21 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:update_job_state, {_, _}, _} end) - assert_receive %{ test_id: ^test_id } + assert_receive %{test_id: ^test_id} end @tag jobs: :active, listen_storage: true test "active => active", %{broadcaster: broadcaster, active_job: active_job} do - test_id = "update-active-to-active-job-handler" + :ok = :telemetry.attach( test_id, [:quantum, :job, :update], &TelemetryTestHandler.handle_event/4, %{ - parent_thread: self(), - test_id: test_id + parent_thread: self(), + test_id: test_id } ) @@ -380,20 +386,21 @@ defmodule Quantum.JobBroadcasterTest do refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) - refute_receive %{ test_id: ^test_id } + refute_receive %{test_id: ^test_id} end @tag jobs: :inactive, listen_storage: true test "inactive => inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do test_id = "update-inactive-to-inactive-job-handler" + :ok = :telemetry.attach( test_id, [:quantum, :job, :update], &TelemetryTestHandler.handle_event/4, %{ - parent_thread: self(), - test_id: test_id + parent_thread: self(), + test_id: test_id } ) @@ -407,20 +414,21 @@ defmodule Quantum.JobBroadcasterTest do refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) - refute_receive %{ test_id: ^test_id } + refute_receive %{test_id: ^test_id} end @tag listen_storage: true test "missing", %{broadcaster: broadcaster} do test_id = "update-missing-job-handler" + :ok = :telemetry.attach( test_id, [:quantum, :job, :update], &TelemetryTestHandler.handle_event/4, %{ - parent_thread: self(), - test_id: test_id + parent_thread: self(), + test_id: test_id } ) @@ -433,7 +441,7 @@ defmodule Quantum.JobBroadcasterTest do refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) - refute_receive %{ test_id: ^test_id } + refute_receive %{test_id: ^test_id} end end From 2e3f22f886ffaf404243b484ee27d9c586344752 Mon Sep 17 00:00:00 2001 From: Marc Smith Date: Wed, 2 Sep 2020 09:53:00 -0700 Subject: [PATCH 04/16] added logs to debug CI failure --- test/quantum/job_broadcaster_test.exs | 29 ++++++++++++++++++--------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/test/quantum/job_broadcaster_test.exs b/test/quantum/job_broadcaster_test.exs index 2cea400..ec02df6 100644 --- a/test/quantum/job_broadcaster_test.exs +++ b/test/quantum/job_broadcaster_test.exs @@ -24,28 +24,28 @@ defmodule Quantum.JobBroadcasterTest do def handle_event( [:quantum, :job, :add], _measurements, - %{job_name: _job_name, module: _module, node: _node} = _metadata, + %{job_name: job_name, module: _module, node: _node} = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do - send(parent_thread, %{test_id: test_id}) + send(parent_thread, %{test_id: test_id, job_name: job_name}) end def handle_event( [:quantum, :job, :delete], _measurements, - %{job_name: _job_name, module: _module, node: _node} = _metadata, + %{job_name: job_name, module: _module, node: _node} = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do - send(parent_thread, %{test_id: test_id}) + send(parent_thread, %{test_id: test_id, job_name: job_name}) end def handle_event( [:quantum, :job, :update], _measurements, - %{job_name: _job_name, module: _module, node: _node} = _metadata, + %{job_name: job_name, module: _module, node: _node} = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do - send(parent_thread, %{test_id: test_id}) + send(parent_thread, %{test_id: test_id, job_name: job_name}) end end @@ -334,6 +334,8 @@ defmodule Quantum.JobBroadcasterTest do @tag jobs: :inactive, listen_storage: true test "inactive => active", %{broadcaster: broadcaster, inactive_job: inactive_job} do + IO.inspect "testingA" + :erlang.process_info(self(), :messages) |> IO.inspect() test_id = "update-inactive-to-active-job-handler" :ok = @@ -356,12 +358,17 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:update_job_state, {_, _}, _} end) - + IO.inspect "testingB" + :erlang.process_info(self(), :messages) |> IO.inspect() assert_receive %{test_id: ^test_id} + + end @tag jobs: :active, listen_storage: true test "active => active", %{broadcaster: broadcaster, active_job: active_job} do + IO.inspect "process mailbox before active-active test" + :erlang.process_info(self(), :messages) |> IO.inspect() test_id = "update-active-to-active-job-handler" :ok = @@ -377,16 +384,18 @@ defmodule Quantum.JobBroadcasterTest do # Initial assert_receive {:received, {:add, ^active_job}} - + name = active_job.name capture_log(fn -> - TestScheduler.activate_job(broadcaster, active_job.name) + TestScheduler.activate_job(broadcaster, name) refute_receive {:received, {:add, ^active_job}} refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) - refute_receive %{test_id: ^test_id} + IO.inspect "process mailbox after active-active test" + :erlang.process_info(self(), :messages) |> IO.inspect() + refute_receive %{test_id: ^test_id, job_name: ^name} end @tag jobs: :inactive, listen_storage: true From 896c86a3114da55169c6d125c6dd11d486c4d8f8 Mon Sep 17 00:00:00 2001 From: Marc Smith Date: Wed, 2 Sep 2020 10:09:36 -0700 Subject: [PATCH 05/16] added Job_name to prevent collision in CI --- test/quantum/job_broadcaster_test.exs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/test/quantum/job_broadcaster_test.exs b/test/quantum/job_broadcaster_test.exs index ec02df6..9df97bd 100644 --- a/test/quantum/job_broadcaster_test.exs +++ b/test/quantum/job_broadcaster_test.exs @@ -334,7 +334,7 @@ defmodule Quantum.JobBroadcasterTest do @tag jobs: :inactive, listen_storage: true test "inactive => active", %{broadcaster: broadcaster, inactive_job: inactive_job} do - IO.inspect "testingA" + IO.inspect("testingA") :erlang.process_info(self(), :messages) |> IO.inspect() test_id = "update-inactive-to-active-job-handler" @@ -358,17 +358,14 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:update_job_state, {_, _}, _} end) - IO.inspect "testingB" + + IO.inspect("testingB") :erlang.process_info(self(), :messages) |> IO.inspect() assert_receive %{test_id: ^test_id} - - end @tag jobs: :active, listen_storage: true test "active => active", %{broadcaster: broadcaster, active_job: active_job} do - IO.inspect "process mailbox before active-active test" - :erlang.process_info(self(), :messages) |> IO.inspect() test_id = "update-active-to-active-job-handler" :ok = @@ -385,6 +382,7 @@ defmodule Quantum.JobBroadcasterTest do # Initial assert_receive {:received, {:add, ^active_job}} name = active_job.name + capture_log(fn -> TestScheduler.activate_job(broadcaster, name) @@ -393,8 +391,6 @@ defmodule Quantum.JobBroadcasterTest do refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) - IO.inspect "process mailbox after active-active test" - :erlang.process_info(self(), :messages) |> IO.inspect() refute_receive %{test_id: ^test_id, job_name: ^name} end @@ -416,14 +412,14 @@ defmodule Quantum.JobBroadcasterTest do inactive_job_name = inactive_job.name capture_log(fn -> - TestScheduler.deactivate_job(broadcaster, inactive_job.name) + TestScheduler.deactivate_job(broadcaster, inactive_job_name) refute_receive {:received, {:remove, ^inactive_job_name}} refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) - refute_receive %{test_id: ^test_id} + refute_receive %{test_id: ^test_id, job_name: ^inactive_job_name} end @tag listen_storage: true @@ -441,16 +437,20 @@ defmodule Quantum.JobBroadcasterTest do } ) + ref1 = make_ref() + ref2 = make_ref() + capture_log(fn -> - TestScheduler.deactivate_job(broadcaster, make_ref()) - TestScheduler.activate_job(broadcaster, make_ref()) + TestScheduler.deactivate_job(broadcaster, ref1) + TestScheduler.activate_job(broadcaster, ref2) refute_receive {:received, {:remove, _}} refute_receive {:received, {:add, _}} refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) - refute_receive %{test_id: ^test_id} + refute_receive %{test_id: ^test_id, job_name: ref1} + refute_receive %{test_id: ^test_id, job_name: ref2} end end From 9716f95b7ae03f4ffbfeebd02dfbf879cfd64a36 Mon Sep 17 00:00:00 2001 From: Marc Smith Date: Wed, 2 Sep 2020 10:31:12 -0700 Subject: [PATCH 06/16] fixed typo --- test/quantum/job_broadcaster_test.exs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/test/quantum/job_broadcaster_test.exs b/test/quantum/job_broadcaster_test.exs index 9df97bd..6e59b5b 100644 --- a/test/quantum/job_broadcaster_test.exs +++ b/test/quantum/job_broadcaster_test.exs @@ -334,8 +334,6 @@ defmodule Quantum.JobBroadcasterTest do @tag jobs: :inactive, listen_storage: true test "inactive => active", %{broadcaster: broadcaster, inactive_job: inactive_job} do - IO.inspect("testingA") - :erlang.process_info(self(), :messages) |> IO.inspect() test_id = "update-inactive-to-active-job-handler" :ok = @@ -359,8 +357,6 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:update_job_state, {_, _}, _} end) - IO.inspect("testingB") - :erlang.process_info(self(), :messages) |> IO.inspect() assert_receive %{test_id: ^test_id} end @@ -449,8 +445,8 @@ defmodule Quantum.JobBroadcasterTest do refute_receive {:update_job_state, {TestScheduler, _, _}, _} end) - refute_receive %{test_id: ^test_id, job_name: ref1} - refute_receive %{test_id: ^test_id, job_name: ref2} + refute_receive %{test_id: ^test_id, job_name: ^ref1} + refute_receive %{test_id: ^test_id, job_name: ^ref2} end end From ecdd52839bcc354973edc8be30c1a5d50dd62a07 Mon Sep 17 00:00:00 2001 From: Marc Smith Date: Wed, 2 Sep 2020 15:48:00 -0700 Subject: [PATCH 07/16] added telemetry job span tracking --- .tool-versions | 2 - Makefile | 14 --- lib/quantum/executor.ex | 27 +++++ test/quantum/executor_test.exs | 161 ++++++++++++++++++++++++++ test/quantum/job_broadcaster_test.exs | 111 ++++-------------- 5 files changed, 209 insertions(+), 106 deletions(-) delete mode 100644 .tool-versions delete mode 100644 Makefile diff --git a/.tool-versions b/.tool-versions deleted file mode 100644 index df178cf..0000000 --- a/.tool-versions +++ /dev/null @@ -1,2 +0,0 @@ -elixir 1.9.4-otp-22 -erlang 22.1.8 diff --git a/Makefile b/Makefile deleted file mode 100644 index d33c41e..0000000 --- a/Makefile +++ /dev/null @@ -1,14 +0,0 @@ -pre-push: - make compile && make lint && make test - -compile: - rm -rf _build/dev/lib/quantum && mix compile --warnings-as-errors - -lint: - mix format && make dialyzer - -dialyzer: - mix dialyzer --format dialyxir - -test: - mix test diff --git a/lib/quantum/executor.ex b/lib/quantum/executor.ex index b77852c..37d70f3 100644 --- a/lib/quantum/executor.ex +++ b/lib/quantum/executor.ex @@ -84,6 +84,15 @@ defmodule Quantum.Executor do "[#{inspect(Node.self())}][#{__MODULE__}] Execute started for job #{inspect(job_name)}" end) + # Note: we are intentionally mimicking the ":telemetry.span" here to keep current functionality + startMonotonicTime = :erlang.monotonic_time() + + :telemetry.execute([:quantum, :job, :start], %{system_time: startMonotonicTime}, %{ + job_name: job_name, + node: inspect(node), + module: __MODULE__ + }) + try do execute_task(task) catch @@ -94,6 +103,16 @@ defmodule Quantum.Executor do inspect(job_name) }, which failed due to: #{Exception.format(type, value, __STACKTRACE__)}" end) + + duration = :erlang.monotonic_time() - startMonotonicTime + + :telemetry.execute([:quantum, :job, :exception], %{duration: duration}, %{ + job_name: job_name, + node: inspect(node), + module: __MODULE__, + reason: value, + stacktrace: __STACKTRACE__ + }) else result -> debug_logging && @@ -102,6 +121,14 @@ defmodule Quantum.Executor do inspect(job_name) }, which yielded result: #{inspect(result)}" end) + + duration = :erlang.monotonic_time() - startMonotonicTime + + :telemetry.execute([:quantum, :job, :stop], %{duration: duration}, %{ + job_name: job_name, + node: inspect(node), + module: __MODULE__ + }) end :ok diff --git a/test/quantum/executor_test.exs b/test/quantum/executor_test.exs index aaa1780..ae3f122 100644 --- a/test/quantum/executor_test.exs +++ b/test/quantum/executor_test.exs @@ -17,6 +17,65 @@ defmodule Quantum.ExecutorTest do use Quantum, otp_app: :job_broadcaster_test end + defmodule TelemetryTestHandler do + require Logger + + def handle_event( + [:quantum, :job, :start], + %{system_time: _system_time} = _measurements, + %{job_name: job_name, module: _module, node: _node} = _metadata, + %{parent_thread: parent_thread, test_id: test_id} + ) do + send(parent_thread, %{test_id: test_id, job_name: job_name, type: :start}) + end + + def handle_event( + [:quantum, :job, :stop], + %{duration: _duration} = _measurements, + %{job_name: job_name, module: _module, node: _node} = _metadata, + %{parent_thread: parent_thread, test_id: test_id} + ) do + send(parent_thread, %{test_id: test_id, job_name: job_name, type: :stop}) + end + + def handle_event( + [:quantum, :job, :exception], + %{duration: _duration} = _measurements, + %{ + job_name: job_name, + module: _module, + node: _node, + reason: reason, + stacktrace: stacktrace + } = _metadata, + %{parent_thread: parent_thread, test_id: test_id} + ) do + send(parent_thread, %{ + test_id: test_id, + job_name: job_name, + type: :exception, + reason: reason, + stacktrace: stacktrace + }) + end + end + + defp attach_telemetry(test_id, parent_thread) do + :telemetry.attach_many( + test_id, + [ + [:quantum, :job, :start], + [:quantum, :job, :stop], + [:quantum, :job, :exception] + ], + &TelemetryTestHandler.handle_event/4, + %{ + parent_thread: parent_thread, + test_id: test_id + } + ) + end + setup do {:ok, _task_supervisor} = start_supervised({Task.Supervisor, [name: Module.concat(__MODULE__, TaskSupervisor)]}) @@ -44,6 +103,10 @@ defmodule Quantum.ExecutorTest do } do caller = self() + test_id = "log-anonymous-job-handler" + + :ok = attach_telemetry(test_id, self()) + job = TestScheduler.new_job() |> Job.set_task(fn -> send(caller, :executed) end) @@ -59,6 +122,9 @@ defmodule Quantum.ExecutorTest do ) assert_receive :executed + + assert_receive %{test_id: ^test_id, type: :start} + assert_receive %{test_id: ^test_id, type: :stop}, 2000 end) end @@ -69,6 +135,10 @@ defmodule Quantum.ExecutorTest do } do caller = self() + test_id = "log-function-tuple-job-handler" + + :ok = attach_telemetry(test_id, self()) + job = TestScheduler.new_job() |> Job.set_task({__MODULE__, :send, [caller]}) @@ -85,6 +155,9 @@ defmodule Quantum.ExecutorTest do assert_receive :executed end) + + assert_receive %{test_id: ^test_id, type: :start} + assert_receive %{test_id: ^test_id, type: :stop}, 2000 end test "executes given task without overlap", %{ @@ -93,6 +166,9 @@ defmodule Quantum.ExecutorTest do debug_logging: debug_logging } do caller = self() + test_id = "log-task-no-overlap-handler" + + :ok = attach_telemetry(test_id, self()) job = TestScheduler.new_job() @@ -124,6 +200,9 @@ defmodule Quantum.ExecutorTest do assert_receive :executed refute_receive :executed end) + + assert_receive %{test_id: ^test_id, type: :start} + assert_receive %{test_id: ^test_id, type: :stop}, 2000 end test "releases lock on success", %{ @@ -132,6 +211,9 @@ defmodule Quantum.ExecutorTest do debug_logging: debug_logging } do caller = self() + test_id = "release-lock-on-success-handler" + + :ok = attach_telemetry(test_id, self()) job = TestScheduler.new_job() @@ -161,6 +243,9 @@ defmodule Quantum.ExecutorTest do assert :marked_running = TaskRegistry.mark_running(task_registry, job.name, Node.self()) end) + + assert_receive %{test_id: ^test_id, type: :start} + assert_receive %{test_id: ^test_id, type: :stop}, 2000 end test "releases lock on error", %{ @@ -168,6 +253,10 @@ defmodule Quantum.ExecutorTest do task_registry: task_registry, debug_logging: debug_logging } do + test_id = "release-lock-on-error-handler" + + :ok = attach_telemetry(test_id, self()) + job = TestScheduler.new_job() |> Job.set_task(fn -> raise "failed" end) @@ -188,6 +277,21 @@ defmodule Quantum.ExecutorTest do end) assert :marked_running = TaskRegistry.mark_running(task_registry, job.name, Node.self()) + assert_receive %{test_id: ^test_id, type: :start} + + assert_receive %{ + test_id: ^test_id, + type: :exception, + reason: %RuntimeError{message: "failed"}, + stacktrace: [ + {Quantum.ExecutorTest, _, _, _}, + {Quantum.Executor, _, _, _}, + {Task.Supervised, _, _, _}, + {Task.Supervised, _, _, _}, + {:proc_lib, _, _, _} + ] + }, + 2000 end test "logs error", %{ @@ -195,6 +299,10 @@ defmodule Quantum.ExecutorTest do task_registry: task_registry, debug_logging: debug_logging } do + test_id = "logs-error-handler" + + :ok = attach_telemetry(test_id, self()) + job = TestScheduler.new_job() |> Job.set_task(fn -> raise "failed" end) @@ -216,6 +324,21 @@ defmodule Quantum.ExecutorTest do end) assert logs =~ ~r/\(RuntimeError\) failed/ + assert_receive %{test_id: ^test_id, type: :start} + + assert_receive %{ + test_id: ^test_id, + type: :exception, + reason: %RuntimeError{message: "failed"}, + stacktrace: [ + {Quantum.ExecutorTest, _, _, _}, + {Quantum.Executor, _, _, _}, + {Task.Supervised, _, _, _}, + {Task.Supervised, _, _, _}, + {:proc_lib, _, _, _} + ] + }, + 2000 end test "logs exit", %{ @@ -223,6 +346,10 @@ defmodule Quantum.ExecutorTest do task_registry: task_registry, debug_logging: debug_logging } do + test_id = "logs-exit-handler" + + :ok = attach_telemetry(test_id, self()) + job = TestScheduler.new_job() |> Job.set_task(fn -> exit(:failure) end) @@ -244,6 +371,21 @@ defmodule Quantum.ExecutorTest do end) assert logs =~ ~r/\(exit\) :failure/ + assert_receive %{test_id: ^test_id, type: :start} + + assert_receive %{ + test_id: ^test_id, + type: :exception, + reason: :failure, + stacktrace: [ + {Quantum.ExecutorTest, _, _, _}, + {Quantum.Executor, _, _, _}, + {Task.Supervised, _, _, _}, + {Task.Supervised, _, _, _}, + {:proc_lib, _, _, _} + ] + }, + 2000 end test "logs throw", %{ @@ -251,6 +393,10 @@ defmodule Quantum.ExecutorTest do task_registry: task_registry, debug_logging: debug_logging } do + test_id = "logs-throw-handler" + + :ok = attach_telemetry(test_id, self()) + ref = make_ref() job = @@ -274,6 +420,21 @@ defmodule Quantum.ExecutorTest do end) assert logs =~ "(throw) #{inspect(ref)}" + assert_receive %{test_id: ^test_id, type: :start} + + assert_receive %{ + test_id: ^test_id, + type: :exception, + reason: ^ref, + stacktrace: [ + {Quantum.ExecutorTest, _, _, _}, + {Quantum.Executor, _, _, _}, + {Task.Supervised, _, _, _}, + {Task.Supervised, _, _, _}, + {:proc_lib, _, _, _} + ] + }, + 2000 end end diff --git a/test/quantum/job_broadcaster_test.exs b/test/quantum/job_broadcaster_test.exs index 6e59b5b..f29c952 100644 --- a/test/quantum/job_broadcaster_test.exs +++ b/test/quantum/job_broadcaster_test.exs @@ -49,6 +49,18 @@ defmodule Quantum.JobBroadcasterTest do end end + defp attach_telemetry(last_atom, test_id, parent_thread) do + :telemetry.attach( + test_id, + [:quantum, :job, last_atom], + &TelemetryTestHandler.handle_event/4, + %{ + parent_thread: parent_thread, + test_id: test_id + } + ) + end + setup tags do if tags[:listen_storage] do Process.put(:test_pid, self()) @@ -151,16 +163,7 @@ defmodule Quantum.JobBroadcasterTest do test "active", %{broadcaster: broadcaster, active_job: active_job} do test_id = "add-active-job-handler" - :ok = - :telemetry.attach( - test_id, - [:quantum, :job, :add], - &TelemetryTestHandler.handle_event/4, - %{ - parent_thread: self(), - test_id: test_id - } - ) + :ok = attach_telemetry(:add, test_id, self()) assert capture_log(fn -> TestScheduler.add_job(broadcaster, active_job) @@ -203,16 +206,7 @@ defmodule Quantum.JobBroadcasterTest do test "inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do test_id = "add-inactive-job-handler" - :ok = - :telemetry.attach( - test_id, - [:quantum, :job, :add], - &TelemetryTestHandler.handle_event/4, - %{ - parent_thread: self(), - test_id: test_id - } - ) + :ok = attach_telemetry(:add, test_id, self()) capture_log(fn -> TestScheduler.add_job(broadcaster, inactive_job) @@ -233,16 +227,7 @@ defmodule Quantum.JobBroadcasterTest do test_id = "log-delete-active-job-handler" - :ok = - :telemetry.attach( - test_id, - [:quantum, :job, :delete], - &TelemetryTestHandler.handle_event/4, - %{ - parent_thread: self(), - test_id: test_id - } - ) + :ok = attach_telemetry(:delete, test_id, self()) capture_log(fn -> TestScheduler.delete_job(broadcaster, active_job.name) @@ -274,16 +259,7 @@ defmodule Quantum.JobBroadcasterTest do test "inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do test_id = "delete-inactive-job-handler" - :ok = - :telemetry.attach( - test_id, - [:quantum, :job, :delete], - &TelemetryTestHandler.handle_event/4, - %{ - parent_thread: self(), - test_id: test_id - } - ) + :ok = attach_telemetry(:delete, test_id, self()) capture_log(fn -> inactive_job_name = inactive_job.name @@ -310,16 +286,7 @@ defmodule Quantum.JobBroadcasterTest do test_id = "update-active-to-inactive-job-handler" - :ok = - :telemetry.attach( - test_id, - [:quantum, :job, :update], - &TelemetryTestHandler.handle_event/4, - %{ - parent_thread: self(), - test_id: test_id - } - ) + :ok = attach_telemetry(:update, test_id, self()) capture_log(fn -> TestScheduler.deactivate_job(broadcaster, active_job.name) @@ -336,16 +303,7 @@ defmodule Quantum.JobBroadcasterTest do test "inactive => active", %{broadcaster: broadcaster, inactive_job: inactive_job} do test_id = "update-inactive-to-active-job-handler" - :ok = - :telemetry.attach( - test_id, - [:quantum, :job, :update], - &TelemetryTestHandler.handle_event/4, - %{ - parent_thread: self(), - test_id: test_id - } - ) + :ok = attach_telemetry(:update, test_id, self()) capture_log(fn -> TestScheduler.activate_job(broadcaster, inactive_job.name) @@ -364,16 +322,7 @@ defmodule Quantum.JobBroadcasterTest do test "active => active", %{broadcaster: broadcaster, active_job: active_job} do test_id = "update-active-to-active-job-handler" - :ok = - :telemetry.attach( - test_id, - [:quantum, :job, :update], - &TelemetryTestHandler.handle_event/4, - %{ - parent_thread: self(), - test_id: test_id - } - ) + :ok = attach_telemetry(:update, test_id, self()) # Initial assert_receive {:received, {:add, ^active_job}} @@ -394,16 +343,7 @@ defmodule Quantum.JobBroadcasterTest do test "inactive => inactive", %{broadcaster: broadcaster, inactive_job: inactive_job} do test_id = "update-inactive-to-inactive-job-handler" - :ok = - :telemetry.attach( - test_id, - [:quantum, :job, :update], - &TelemetryTestHandler.handle_event/4, - %{ - parent_thread: self(), - test_id: test_id - } - ) + :ok = attach_telemetry(:update, test_id, self()) inactive_job_name = inactive_job.name @@ -422,16 +362,7 @@ defmodule Quantum.JobBroadcasterTest do test "missing", %{broadcaster: broadcaster} do test_id = "update-missing-job-handler" - :ok = - :telemetry.attach( - test_id, - [:quantum, :job, :update], - &TelemetryTestHandler.handle_event/4, - %{ - parent_thread: self(), - test_id: test_id - } - ) + :ok = attach_telemetry(:update, test_id, self()) ref1 = make_ref() ref2 = make_ref() From dadec6d8590b101e9e42ef547e07825b6e4c2f24 Mon Sep 17 00:00:00 2001 From: Marc Smith Date: Wed, 2 Sep 2020 16:01:11 -0700 Subject: [PATCH 08/16] fixed readme --- README.md | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index a2ee27e..1670782 100644 --- a/README.md +++ b/README.md @@ -100,12 +100,10 @@ terms of this contract." 1. Check for [open issues](https://github.com/quantum-elixir/quantum-core/issues) or [open a new issue](https://github.com/quantum-elixir/quantum-core/issues/new) to start a discussion around [a problem](https://www.youtube.com/watch?v=_QF9sFJGJuc). 2. Issues SHALL be named as "Problem: _description of the problem_". 3. Fork the [quantum-elixir repository on GitHub](https://github.com/quantum-elixir/quantum-core) to start making your changes -4. If needed, run `mix deps.get` -5. If possible, write a test which shows that the problem was solved. -6. run `make pre-push` to verify all tests pass -7. Send a pull request. -8. Pull requests SHALL be named as "Solution: _description of your solution_" -9. Your pull request is merged and you are added to the [list of contributors](https://github.com/quantum-elixir/quantum-core/graphs/contributors) +4. If possible, write a test which shows that the problem was solved. +5. Send a pull request. +6. Pull requests SHALL be named as "Solution: _description of your solution_" +7. Your pull request is merged and you are added to the [list of contributors](https://github.com/quantum-elixir/quantum-core/graphs/contributors) ### Code Contributors From a7c7b76db00a59bb410252efad734c60bcf0bb9b Mon Sep 17 00:00:00 2001 From: Marc Smith Date: Wed, 2 Sep 2020 16:22:42 -0700 Subject: [PATCH 09/16] fixed dialyzer error --- lib/quantum/executor.ex | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/quantum/executor.ex b/lib/quantum/executor.ex index 37d70f3..bc49ae9 100644 --- a/lib/quantum/executor.ex +++ b/lib/quantum/executor.ex @@ -85,9 +85,9 @@ defmodule Quantum.Executor do end) # Note: we are intentionally mimicking the ":telemetry.span" here to keep current functionality - startMonotonicTime = :erlang.monotonic_time() + start_monotonic_time = :erlang.monotonic_time() - :telemetry.execute([:quantum, :job, :start], %{system_time: startMonotonicTime}, %{ + :telemetry.execute([:quantum, :job, :start], %{system_time: start_monotonic_time}, %{ job_name: job_name, node: inspect(node), module: __MODULE__ @@ -104,7 +104,7 @@ defmodule Quantum.Executor do }, which failed due to: #{Exception.format(type, value, __STACKTRACE__)}" end) - duration = :erlang.monotonic_time() - startMonotonicTime + duration = :erlang.monotonic_time() - start_monotonic_time :telemetry.execute([:quantum, :job, :exception], %{duration: duration}, %{ job_name: job_name, @@ -122,7 +122,7 @@ defmodule Quantum.Executor do }, which yielded result: #{inspect(result)}" end) - duration = :erlang.monotonic_time() - startMonotonicTime + duration = :erlang.monotonic_time() - start_monotonic_time :telemetry.execute([:quantum, :job, :stop], %{duration: duration}, %{ job_name: job_name, From b600a9ab7201097ba1ee565b26746082e2599232 Mon Sep 17 00:00:00 2001 From: Marc Smith Date: Tue, 8 Sep 2020 16:58:28 -0700 Subject: [PATCH 10/16] requested changes --- lib/quantum/job_broadcaster.ex | 53 ++++++++++++++++++++++----- pages/configuration.md | 3 ++ test/quantum/job_broadcaster_test.exs | 30 +++++++++++---- 3 files changed, 69 insertions(+), 17 deletions(-) diff --git a/lib/quantum/job_broadcaster.ex b/lib/quantum/job_broadcaster.ex index f0e82d9..9d4a1a2 100644 --- a/lib/quantum/job_broadcaster.ex +++ b/lib/quantum/job_broadcaster.ex @@ -60,6 +60,16 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Loading Initial Jobs from Storage, skipping config" end) + for %Job{state: :active, name: name} = job <- storage_jobs do + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :add], %{}, %{ + job_name: name, + job: job, + node: inspect(Node.self()), + module: __MODULE__ + }) + end + storage_jobs end @@ -99,6 +109,7 @@ defmodule Quantum.JobBroadcaster do # Send event to telemetry incase the end user wants to monitor events :telemetry.execute([:quantum, :job, :add], %{}, %{ job_name: job_name, + job: job, node: inspect(Node.self()), module: __MODULE__ }) @@ -125,6 +136,7 @@ defmodule Quantum.JobBroadcaster do # Send event to telemetry incase the end user wants to monitor events :telemetry.execute([:quantum, :job, :add], %{}, %{ job_name: job_name, + job: job, node: inspect(Node.self()), module: __MODULE__ }) @@ -148,20 +160,29 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Deleting job #{inspect(name)}" end) - # Send event to telemetry incase the end user wants to monitor events - :telemetry.execute([:quantum, :job, :delete], %{}, %{ - job_name: name, - node: inspect(Node.self()), - module: __MODULE__ - }) - case Map.fetch(jobs, name) do - {:ok, %{state: :active}} -> + {:ok, %{state: :active, name: name} = job} -> + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :delete], %{}, %{ + job_name: name, + job: job, + node: inspect(Node.self()), + module: __MODULE__ + }) + :ok = storage.delete_job(storage_pid, name) {:noreply, [{:remove, name}], %{state | jobs: Map.delete(jobs, name)}} - {:ok, %{state: :inactive}} -> + {:ok, %{state: :inactive, name: name} = job} -> + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :delete], %{}, %{ + job_name: name, + job: job, + node: inspect(Node.self()), + module: __MODULE__ + }) + :ok = storage.delete_job(storage_pid, name) {:noreply, [], %{state | jobs: Map.delete(jobs, name)}} @@ -196,6 +217,7 @@ defmodule Quantum.JobBroadcaster do # Send event to telemetry incase the end user wants to monitor events :telemetry.execute([:quantum, :job, :update], %{}, %{ job_name: name, + job: job, node: inspect(Node.self()), module: __MODULE__ }) @@ -228,7 +250,18 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Deleting all jobs" end) - messages = for {name, %Job{state: :active}} <- jobs, do: {:remove, name} + messages = + for {name, %Job{state: :active} = job} <- jobs do + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :delete], %{}, %{ + job_name: name, + job: job, + node: inspect(Node.self()), + module: __MODULE__ + }) + + {:remove, name} + end :ok = storage.purge(storage_pid) diff --git a/pages/configuration.md b/pages/configuration.md index e4dae7d..3e683c5 100644 --- a/pages/configuration.md +++ b/pages/configuration.md @@ -154,3 +154,6 @@ Timezones can also be configured on a per-job basis. This overrides the default timezone: "America/New_York" } ``` + +## Telemetry Support + diff --git a/test/quantum/job_broadcaster_test.exs b/test/quantum/job_broadcaster_test.exs index f29c952..52a72d7 100644 --- a/test/quantum/job_broadcaster_test.exs +++ b/test/quantum/job_broadcaster_test.exs @@ -1,7 +1,8 @@ defmodule Quantum.JobBroadcasterTest do @moduledoc false - use ExUnit.Case, async: true + # , async: true causes random failures due to race conditions with telemetry tests + use ExUnit.Case alias Quantum.{Job, JobBroadcaster, JobBroadcaster.StartOpts} alias Quantum.Storage.Test, as: TestStorage @@ -24,28 +25,28 @@ defmodule Quantum.JobBroadcasterTest do def handle_event( [:quantum, :job, :add], _measurements, - %{job_name: job_name, module: _module, node: _node} = _metadata, + %{job_name: job_name, job: _job, module: _module, node: _node} = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do - send(parent_thread, %{test_id: test_id, job_name: job_name}) + send(parent_thread, %{test_id: test_id, job_name: job_name, type: :add}) end def handle_event( [:quantum, :job, :delete], _measurements, - %{job_name: job_name, module: _module, node: _node} = _metadata, + %{job_name: job_name, job: _job, module: _module, node: _node} = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do - send(parent_thread, %{test_id: test_id, job_name: job_name}) + send(parent_thread, %{test_id: test_id, job_name: job_name, type: :delete}) end def handle_event( [:quantum, :job, :update], _measurements, - %{job_name: job_name, module: _module, node: _node} = _metadata, + %{job_name: job_name, job: _job, module: _module, node: _node} = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do - send(parent_thread, %{test_id: test_id, job_name: job_name}) + send(parent_thread, %{test_id: test_id, job_name: job_name, type: :update}) end end @@ -125,6 +126,10 @@ defmodule Quantum.JobBroadcasterTest do @tag manual_dispatch: true test "storage jobs", %{active_job: active_job, inactive_job: inactive_job} do + test_id = "init-storage-jobs-handler" + + :ok = attach_telemetry(:add, test_id, self()) + capture_log(fn -> defmodule FullStorage do @moduledoc false @@ -155,6 +160,10 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:received, {:add, _}} refute_receive {:received, {:add, _}} end) + + # Ensure exactly one :add telemetry notifation because only one job is active + assert_receive %{test_id: ^test_id, type: :add} + refute_receive %{test_id: ^test_id, type: :add} end end @@ -388,6 +397,10 @@ defmodule Quantum.JobBroadcasterTest do active_job: active_job, inactive_job: inactive_job } do + test_id = "delete-all-active-jobs-handler" + + :ok = attach_telemetry(:delete, test_id, self()) + active_job_name = active_job.name inactive_job_name = inactive_job.name @@ -399,6 +412,9 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:purge, _, _} end) + + refute_receive %{test_id: ^test_id, job_name: ^inactive_job_name, type: :delete} + assert_receive %{test_id: ^test_id, job_name: ^active_job_name, type: :delete} end end From 9efa1938b9be321a57fe0ee6c94c60ffcdffd992 Mon Sep 17 00:00:00 2001 From: Marc Smith Date: Thu, 10 Sep 2020 14:14:08 -0700 Subject: [PATCH 11/16] lint --- lib/quantum/job_broadcaster.ex | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/lib/quantum/job_broadcaster.ex b/lib/quantum/job_broadcaster.ex index c4b1783..09690b1 100644 --- a/lib/quantum/job_broadcaster.ex +++ b/lib/quantum/job_broadcaster.ex @@ -108,6 +108,7 @@ defmodule Quantum.JobBroadcaster do Logger.debug(fn -> "[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}" end) + # Send event to telemetry incase the end user wants to monitor events :telemetry.execute([:quantum, :job, :add], %{}, %{ job_name: job_name, @@ -115,7 +116,7 @@ defmodule Quantum.JobBroadcaster do node: inspect(Node.self()), module: __MODULE__ }) - + :ok = storage.delete_job(storage_pid, job_name) :ok = storage.add_job(storage_pid, job) @@ -138,14 +139,15 @@ defmodule Quantum.JobBroadcaster do Logger.debug(fn -> "[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}" end) - - # Send event to telemetry incase the end user wants to monitor events + + # Send event to telemetry incase the end user wants to monitor events :telemetry.execute([:quantum, :job, :add], %{}, %{ job_name: job_name, job: job, node: inspect(Node.self()), module: __MODULE__ }) + :ok = storage.add_job(storage_pid, job) {:noreply, [{:add, job}], %{state | jobs: Map.put(jobs, job_name, job)}} @@ -167,14 +169,15 @@ defmodule Quantum.JobBroadcaster do Logger.debug(fn -> "[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}" end) - - # Send event to telemetry incase the end user wants to monitor events + + # Send event to telemetry incase the end user wants to monitor events :telemetry.execute([:quantum, :job, :add], %{}, %{ job_name: job_name, job: job, node: inspect(Node.self()), module: __MODULE__ }) + :ok = storage.delete_job(storage_pid, job_name) :ok = storage.add_job(storage_pid, job) @@ -185,8 +188,6 @@ defmodule Quantum.JobBroadcaster do Logger.debug(fn -> "[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}" end) - - :ok = storage.delete_job(storage_pid, job_name) :ok = storage.add_job(storage_pid, job) @@ -198,6 +199,7 @@ defmodule Quantum.JobBroadcaster do Logger.debug(fn -> "[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}" end) + # Send event to telemetry incase the end user wants to monitor events :telemetry.execute([:quantum, :job, :add], %{}, %{ job_name: job_name, @@ -205,6 +207,7 @@ defmodule Quantum.JobBroadcaster do node: inspect(Node.self()), module: __MODULE__ }) + :ok = storage.add_job(storage_pid, job) {:noreply, [], %{state | jobs: Map.put(jobs, job_name, job)}} From 93a1b1ff73a589d7b5ae01537a37ec7617141cb9 Mon Sep 17 00:00:00 2001 From: Marc Smith Date: Thu, 10 Sep 2020 16:39:44 -0700 Subject: [PATCH 12/16] Injected scheduler to incude in telemetry metrics --- lib/quantum/executor.ex | 22 ++++--- lib/quantum/executor/start_opts.ex | 6 +- lib/quantum/executor_supervisor.ex | 6 +- lib/quantum/executor_supervisor/init_opts.ex | 6 +- lib/quantum/executor_supervisor/start_opts.ex | 6 +- lib/quantum/job_broadcaster.ex | 27 +++++--- lib/quantum/supervisor.ex | 1 + test/quantum/executor_test.exs | 61 ++++++++++++------- test/quantum/job_broadcaster_test.exs | 9 ++- 9 files changed, 94 insertions(+), 50 deletions(-) diff --git a/lib/quantum/executor.ex b/lib/quantum/executor.ex index bc49ae9..2d8e781 100644 --- a/lib/quantum/executor.ex +++ b/lib/quantum/executor.ex @@ -25,11 +25,11 @@ defmodule Quantum.Executor do @spec execute(StartOpts.t(), Job.t(), Node.t()) :: :ok # Execute task on all given nodes without checking for overlap defp execute( - %StartOpts{task_supervisor_reference: task_supervisor, debug_logging: debug_logging}, + %StartOpts{task_supervisor_reference: task_supervisor, debug_logging: debug_logging, scheduler: scheduler}, %Job{overlap: true} = job, node ) do - run(node, job, task_supervisor, debug_logging) + run(node, job, task_supervisor, debug_logging, scheduler) :ok end @@ -39,7 +39,8 @@ defmodule Quantum.Executor do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Job{overlap: false, name: job_name} = job, node @@ -51,7 +52,7 @@ defmodule Quantum.Executor do case TaskRegistry.mark_running(task_registry, job_name, node) do :marked_running -> - %Task{ref: ref} = run(node, job, task_supervisor, debug_logging) + %Task{ref: ref} = run(node, job, task_supervisor, debug_logging, scheduler) receive do {^ref, _} -> @@ -69,8 +70,8 @@ defmodule Quantum.Executor do end # Ececute the given function on a given node via the task supervisor - @spec run(Node.t(), Job.t(), GenServer.server(), boolean()) :: Task.t() - defp run(node, %{name: job_name, task: task}, task_supervisor, debug_logging) do + @spec run(Node.t(), Job.t(), GenServer.server(), boolean(), Scheduler) :: Task.t() + defp run(node, %{name: job_name, task: task}, task_supervisor, debug_logging, scheduler) do debug_logging && Logger.debug(fn -> "[#{inspect(Node.self())}][#{__MODULE__}] Task for job #{inspect(job_name)} started on node #{ @@ -90,7 +91,8 @@ defmodule Quantum.Executor do :telemetry.execute([:quantum, :job, :start], %{system_time: start_monotonic_time}, %{ job_name: job_name, node: inspect(node), - module: __MODULE__ + module: __MODULE__, + scheduler: scheduler }) try do @@ -111,7 +113,8 @@ defmodule Quantum.Executor do node: inspect(node), module: __MODULE__, reason: value, - stacktrace: __STACKTRACE__ + stacktrace: __STACKTRACE__, + scheduler: scheduler }) else result -> @@ -127,7 +130,8 @@ defmodule Quantum.Executor do :telemetry.execute([:quantum, :job, :stop], %{duration: duration}, %{ job_name: job_name, node: inspect(node), - module: __MODULE__ + module: __MODULE__, + scheduler: scheduler }) end diff --git a/lib/quantum/executor/start_opts.ex b/lib/quantum/executor/start_opts.ex index 2115332..84901a5 100644 --- a/lib/quantum/executor/start_opts.ex +++ b/lib/quantum/executor/start_opts.ex @@ -6,13 +6,15 @@ defmodule Quantum.Executor.StartOpts do @type t :: %__MODULE__{ task_supervisor_reference: GenServer.server(), task_registry_reference: GenServer.server(), - debug_logging: boolean + debug_logging: boolean, + scheduler: Scheduler } @enforce_keys [ :task_supervisor_reference, :task_registry_reference, - :debug_logging + :debug_logging, + :scheduler ] defstruct @enforce_keys end diff --git a/lib/quantum/executor_supervisor.ex b/lib/quantum/executor_supervisor.ex index 5644186..ba98275 100644 --- a/lib/quantum/executor_supervisor.ex +++ b/lib/quantum/executor_supervisor.ex @@ -19,7 +19,8 @@ defmodule Quantum.ExecutorSupervisor do :node_selector_broadcaster_reference, :task_supervisor_reference, :task_registry_reference, - :debug_logging + :debug_logging, + :scheduler ]) ), name: name @@ -49,7 +50,8 @@ defmodule Quantum.ExecutorSupervisor do Map.take(opts, [ :task_supervisor_reference, :task_registry_reference, - :debug_logging + :debug_logging, + :scheduler ]) ) diff --git a/lib/quantum/executor_supervisor/init_opts.ex b/lib/quantum/executor_supervisor/init_opts.ex index aa1edbf..dd580d4 100644 --- a/lib/quantum/executor_supervisor/init_opts.ex +++ b/lib/quantum/executor_supervisor/init_opts.ex @@ -7,14 +7,16 @@ defmodule Quantum.ExecutorSupervisor.InitOpts do node_selector_broadcaster_reference: GenServer.server(), task_supervisor_reference: GenServer.server(), task_registry_reference: GenServer.server(), - debug_logging: boolean + debug_logging: boolean, + scheduler: Scheduler } @enforce_keys [ :node_selector_broadcaster_reference, :task_supervisor_reference, :task_registry_reference, - :debug_logging + :debug_logging, + :scheduler ] defstruct @enforce_keys end diff --git a/lib/quantum/executor_supervisor/start_opts.ex b/lib/quantum/executor_supervisor/start_opts.ex index 09b6fee..4ae0a19 100644 --- a/lib/quantum/executor_supervisor/start_opts.ex +++ b/lib/quantum/executor_supervisor/start_opts.ex @@ -8,7 +8,8 @@ defmodule Quantum.ExecutorSupervisor.StartOpts do node_selector_broadcaster_reference: GenServer.server(), task_supervisor_reference: GenServer.server(), task_registry_reference: GenServer.server(), - debug_logging: boolean() + debug_logging: boolean(), + scheduler: Scheduler } @enforce_keys [ @@ -16,7 +17,8 @@ defmodule Quantum.ExecutorSupervisor.StartOpts do :node_selector_broadcaster_reference, :task_supervisor_reference, :task_registry_reference, - :debug_logging + :debug_logging, + :scheduler ] defstruct @enforce_keys end diff --git a/lib/quantum/job_broadcaster.ex b/lib/quantum/job_broadcaster.ex index 09690b1..fc90034 100644 --- a/lib/quantum/job_broadcaster.ex +++ b/lib/quantum/job_broadcaster.ex @@ -66,7 +66,8 @@ defmodule Quantum.JobBroadcaster do job_name: name, job: job, node: inspect(Node.self()), - module: __MODULE__ + module: __MODULE__, + scheduler: scheduler }) end @@ -114,7 +115,8 @@ defmodule Quantum.JobBroadcaster do job_name: job_name, job: job, node: inspect(Node.self()), - module: __MODULE__ + module: __MODULE__, + scheduler: state.scheduler }) :ok = storage.delete_job(storage_pid, job_name) @@ -145,7 +147,8 @@ defmodule Quantum.JobBroadcaster do job_name: job_name, job: job, node: inspect(Node.self()), - module: __MODULE__ + module: __MODULE__, + scheduler: state.scheduler }) :ok = storage.add_job(storage_pid, job) @@ -175,7 +178,8 @@ defmodule Quantum.JobBroadcaster do job_name: job_name, job: job, node: inspect(Node.self()), - module: __MODULE__ + module: __MODULE__, + scheduler: state.scheduler }) :ok = storage.delete_job(storage_pid, job_name) @@ -205,7 +209,8 @@ defmodule Quantum.JobBroadcaster do job_name: job_name, job: job, node: inspect(Node.self()), - module: __MODULE__ + module: __MODULE__, + scheduler: state.scheduler }) :ok = storage.add_job(storage_pid, job) @@ -235,7 +240,8 @@ defmodule Quantum.JobBroadcaster do job_name: name, job: job, node: inspect(Node.self()), - module: __MODULE__ + module: __MODULE__, + scheduler: state.scheduler }) :ok = storage.delete_job(storage_pid, name) @@ -248,7 +254,8 @@ defmodule Quantum.JobBroadcaster do job_name: name, job: job, node: inspect(Node.self()), - module: __MODULE__ + module: __MODULE__, + scheduler: state.scheduler }) :ok = storage.delete_job(storage_pid, name) @@ -287,7 +294,8 @@ defmodule Quantum.JobBroadcaster do job_name: name, job: job, node: inspect(Node.self()), - module: __MODULE__ + module: __MODULE__, + scheduler: state.scheduler }) jobs = Map.update!(jobs, name, &Job.set_state(&1, new_state)) @@ -325,7 +333,8 @@ defmodule Quantum.JobBroadcaster do job_name: name, job: job, node: inspect(Node.self()), - module: __MODULE__ + module: __MODULE__, + scheduler: state.scheduler }) {:remove, name} diff --git a/lib/quantum/supervisor.ex b/lib/quantum/supervisor.ex index a548c70..0468d2f 100644 --- a/lib/quantum/supervisor.ex +++ b/lib/quantum/supervisor.ex @@ -88,6 +88,7 @@ defmodule Quantum.Supervisor do |> Map.put(:task_supervisor_reference, task_supervisor_name) |> Map.put(:task_registry_reference, task_registry_name) |> Map.put(:name, executor_supervisor_name) + |> Map.put(:scheduler, scheduler) ) Supervisor.init( diff --git a/test/quantum/executor_test.exs b/test/quantum/executor_test.exs index d52701e..1ec363f 100644 --- a/test/quantum/executor_test.exs +++ b/test/quantum/executor_test.exs @@ -23,7 +23,7 @@ defmodule Quantum.ExecutorTest do def handle_event( [:quantum, :job, :start], %{system_time: _system_time} = _measurements, - %{job_name: job_name, module: _module, node: _node} = _metadata, + %{job_name: job_name, module: _module, node: _node, scheduler: _scheduler} = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do send(parent_thread, %{test_id: test_id, job_name: job_name, type: :start}) @@ -32,7 +32,7 @@ defmodule Quantum.ExecutorTest do def handle_event( [:quantum, :job, :stop], %{duration: _duration} = _measurements, - %{job_name: job_name, module: _module, node: _node} = _metadata, + %{job_name: job_name, module: _module, node: _node, scheduler: _scheduler} = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do send(parent_thread, %{test_id: test_id, job_name: job_name, type: :stop}) @@ -46,7 +46,8 @@ defmodule Quantum.ExecutorTest do module: _module, node: _node, reason: reason, - stacktrace: stacktrace + stacktrace: stacktrace, + scheduler: _scheduler } = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do @@ -98,7 +99,8 @@ defmodule Quantum.ExecutorTest do %{ task_supervisor: Module.concat(__MODULE__, TaskSupervisor), task_registry: Module.concat(__MODULE__, TaskRegistry), - debug_logging: true + debug_logging: true, + scheduler: TestScheduler } } end @@ -107,7 +109,8 @@ defmodule Quantum.ExecutorTest do test "executes given task using anonymous function", %{ task_supervisor: task_supervisor, task_registry: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler } do caller = self() @@ -124,7 +127,8 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: Node.self()} ) @@ -139,7 +143,8 @@ defmodule Quantum.ExecutorTest do test "executes given task using function tuple", %{ task_supervisor: task_supervisor, task_registry: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler } do caller = self() @@ -156,7 +161,8 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: Node.self()} ) @@ -171,7 +177,8 @@ defmodule Quantum.ExecutorTest do test "executes given task without overlap", %{ task_supervisor: task_supervisor, task_registry: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler } do caller = self() test_id = "log-task-no-overlap-handler" @@ -191,7 +198,8 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: Node.self()} ) @@ -200,7 +208,8 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: Node.self()} ) @@ -216,7 +225,8 @@ defmodule Quantum.ExecutorTest do test "releases lock on success", %{ task_supervisor: task_supervisor, task_registry: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler } do caller = self() test_id = "release-lock-on-success-handler" @@ -244,7 +254,8 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: node} ) @@ -269,7 +280,8 @@ defmodule Quantum.ExecutorTest do test "releases lock on error", %{ task_supervisor: task_supervisor, task_registry: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler } do test_id = "release-lock-on-error-handler" @@ -289,7 +301,8 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: Node.self()} ) @@ -318,7 +331,8 @@ defmodule Quantum.ExecutorTest do test "logs error", %{ task_supervisor: task_supervisor, task_registry: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler } do test_id = "logs-error-handler" @@ -336,7 +350,8 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: Node.self()} ) @@ -365,7 +380,8 @@ defmodule Quantum.ExecutorTest do test "logs exit", %{ task_supervisor: task_supervisor, task_registry: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler } do test_id = "logs-exit-handler" @@ -383,7 +399,8 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: Node.self()} ) @@ -412,7 +429,8 @@ defmodule Quantum.ExecutorTest do test "logs throw", %{ task_supervisor: task_supervisor, task_registry: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler } do test_id = "logs-throw-handler" @@ -432,7 +450,8 @@ defmodule Quantum.ExecutorTest do %StartOpts{ task_supervisor_reference: task_supervisor, task_registry_reference: task_registry, - debug_logging: debug_logging + debug_logging: debug_logging, + scheduler: scheduler }, %Event{job: job, node: Node.self()} ) diff --git a/test/quantum/job_broadcaster_test.exs b/test/quantum/job_broadcaster_test.exs index 08eb426..599c80b 100644 --- a/test/quantum/job_broadcaster_test.exs +++ b/test/quantum/job_broadcaster_test.exs @@ -27,7 +27,8 @@ defmodule Quantum.JobBroadcasterTest do def handle_event( [:quantum, :job, :add], _measurements, - %{job_name: job_name, job: _job, module: _module, node: _node} = _metadata, + %{job_name: job_name, job: _job, module: _module, node: _node, scheduler: _scheduler} = + _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do send(parent_thread, %{test_id: test_id, job_name: job_name, type: :add}) @@ -36,7 +37,8 @@ defmodule Quantum.JobBroadcasterTest do def handle_event( [:quantum, :job, :delete], _measurements, - %{job_name: job_name, job: _job, module: _module, node: _node} = _metadata, + %{job_name: job_name, job: _job, module: _module, node: _node, scheduler: _scheduler} = + _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do send(parent_thread, %{test_id: test_id, job_name: job_name, type: :delete}) @@ -45,7 +47,8 @@ defmodule Quantum.JobBroadcasterTest do def handle_event( [:quantum, :job, :update], _measurements, - %{job_name: job_name, job: _job, module: _module, node: _node} = _metadata, + %{job_name: job_name, job: _job, module: _module, node: _node, scheduler: _scheduler} = + _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do send(parent_thread, %{test_id: test_id, job_name: job_name, type: :update}) From e960b42db284109bb5902d6d1de41ece443f937f Mon Sep 17 00:00:00 2001 From: Marc Smith Date: Thu, 10 Sep 2020 16:40:44 -0700 Subject: [PATCH 13/16] lint --- lib/quantum/executor.ex | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/quantum/executor.ex b/lib/quantum/executor.ex index 2d8e781..465a9e9 100644 --- a/lib/quantum/executor.ex +++ b/lib/quantum/executor.ex @@ -25,7 +25,11 @@ defmodule Quantum.Executor do @spec execute(StartOpts.t(), Job.t(), Node.t()) :: :ok # Execute task on all given nodes without checking for overlap defp execute( - %StartOpts{task_supervisor_reference: task_supervisor, debug_logging: debug_logging, scheduler: scheduler}, + %StartOpts{ + task_supervisor_reference: task_supervisor, + debug_logging: debug_logging, + scheduler: scheduler + }, %Job{overlap: true} = job, node ) do From d797fd6ba9e5e0f97f918ed119107ca78c1b63c2 Mon Sep 17 00:00:00 2001 From: Marc Smith Date: Fri, 11 Sep 2020 09:37:26 -0700 Subject: [PATCH 14/16] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jonatan Männchen --- lib/quantum/executor.ex | 5 +---- lib/quantum/executor/start_opts.ex | 2 +- lib/quantum/executor_supervisor/init_opts.ex | 2 +- lib/quantum/executor_supervisor/start_opts.ex | 2 +- lib/quantum/job_broadcaster.ex | 13 ++----------- 5 files changed, 6 insertions(+), 18 deletions(-) diff --git a/lib/quantum/executor.ex b/lib/quantum/executor.ex index 465a9e9..bb3ecac 100644 --- a/lib/quantum/executor.ex +++ b/lib/quantum/executor.ex @@ -74,7 +74,7 @@ defmodule Quantum.Executor do end # Ececute the given function on a given node via the task supervisor - @spec run(Node.t(), Job.t(), GenServer.server(), boolean(), Scheduler) :: Task.t() + @spec run(Node.t(), Job.t(), GenServer.server(), boolean(), Module.t()) :: Task.t() defp run(node, %{name: job_name, task: task}, task_supervisor, debug_logging, scheduler) do debug_logging && Logger.debug(fn -> @@ -95,7 +95,6 @@ defmodule Quantum.Executor do :telemetry.execute([:quantum, :job, :start], %{system_time: start_monotonic_time}, %{ job_name: job_name, node: inspect(node), - module: __MODULE__, scheduler: scheduler }) @@ -115,7 +114,6 @@ defmodule Quantum.Executor do :telemetry.execute([:quantum, :job, :exception], %{duration: duration}, %{ job_name: job_name, node: inspect(node), - module: __MODULE__, reason: value, stacktrace: __STACKTRACE__, scheduler: scheduler @@ -134,7 +132,6 @@ defmodule Quantum.Executor do :telemetry.execute([:quantum, :job, :stop], %{duration: duration}, %{ job_name: job_name, node: inspect(node), - module: __MODULE__, scheduler: scheduler }) end diff --git a/lib/quantum/executor/start_opts.ex b/lib/quantum/executor/start_opts.ex index 84901a5..a12a421 100644 --- a/lib/quantum/executor/start_opts.ex +++ b/lib/quantum/executor/start_opts.ex @@ -7,7 +7,7 @@ defmodule Quantum.Executor.StartOpts do task_supervisor_reference: GenServer.server(), task_registry_reference: GenServer.server(), debug_logging: boolean, - scheduler: Scheduler + scheduler: Module.t() } @enforce_keys [ diff --git a/lib/quantum/executor_supervisor/init_opts.ex b/lib/quantum/executor_supervisor/init_opts.ex index dd580d4..e2b27f0 100644 --- a/lib/quantum/executor_supervisor/init_opts.ex +++ b/lib/quantum/executor_supervisor/init_opts.ex @@ -8,7 +8,7 @@ defmodule Quantum.ExecutorSupervisor.InitOpts do task_supervisor_reference: GenServer.server(), task_registry_reference: GenServer.server(), debug_logging: boolean, - scheduler: Scheduler + scheduler: Module.t() } @enforce_keys [ diff --git a/lib/quantum/executor_supervisor/start_opts.ex b/lib/quantum/executor_supervisor/start_opts.ex index 4ae0a19..59761c2 100644 --- a/lib/quantum/executor_supervisor/start_opts.ex +++ b/lib/quantum/executor_supervisor/start_opts.ex @@ -9,7 +9,7 @@ defmodule Quantum.ExecutorSupervisor.StartOpts do task_supervisor_reference: GenServer.server(), task_registry_reference: GenServer.server(), debug_logging: boolean(), - scheduler: Scheduler + scheduler: Module.t() } @enforce_keys [ diff --git a/lib/quantum/job_broadcaster.ex b/lib/quantum/job_broadcaster.ex index fc90034..3d69e3f 100644 --- a/lib/quantum/job_broadcaster.ex +++ b/lib/quantum/job_broadcaster.ex @@ -66,7 +66,6 @@ defmodule Quantum.JobBroadcaster do job_name: name, job: job, node: inspect(Node.self()), - module: __MODULE__, scheduler: scheduler }) end @@ -111,11 +110,10 @@ defmodule Quantum.JobBroadcaster do end) # Send event to telemetry incase the end user wants to monitor events - :telemetry.execute([:quantum, :job, :add], %{}, %{ + :telemetry.execute([:quantum, :job, :update], %{}, %{ job_name: job_name, job: job, node: inspect(Node.self()), - module: __MODULE__, scheduler: state.scheduler }) @@ -147,7 +145,6 @@ defmodule Quantum.JobBroadcaster do job_name: job_name, job: job, node: inspect(Node.self()), - module: __MODULE__, scheduler: state.scheduler }) @@ -174,11 +171,10 @@ defmodule Quantum.JobBroadcaster do end) # Send event to telemetry incase the end user wants to monitor events - :telemetry.execute([:quantum, :job, :add], %{}, %{ + :telemetry.execute([:quantum, :job, :update], %{}, %{ job_name: job_name, job: job, node: inspect(Node.self()), - module: __MODULE__, scheduler: state.scheduler }) @@ -209,7 +205,6 @@ defmodule Quantum.JobBroadcaster do job_name: job_name, job: job, node: inspect(Node.self()), - module: __MODULE__, scheduler: state.scheduler }) @@ -240,7 +235,6 @@ defmodule Quantum.JobBroadcaster do job_name: name, job: job, node: inspect(Node.self()), - module: __MODULE__, scheduler: state.scheduler }) @@ -254,7 +248,6 @@ defmodule Quantum.JobBroadcaster do job_name: name, job: job, node: inspect(Node.self()), - module: __MODULE__, scheduler: state.scheduler }) @@ -294,7 +287,6 @@ defmodule Quantum.JobBroadcaster do job_name: name, job: job, node: inspect(Node.self()), - module: __MODULE__, scheduler: state.scheduler }) @@ -333,7 +325,6 @@ defmodule Quantum.JobBroadcaster do job_name: name, job: job, node: inspect(Node.self()), - module: __MODULE__, scheduler: state.scheduler }) From 695ce423490b051e104387d531ea2fa5c04308d7 Mon Sep 17 00:00:00 2001 From: Marc Smith Date: Fri, 11 Sep 2020 14:05:12 -0700 Subject: [PATCH 15/16] fixed tests, dialyzer doesnt like Module.t --- lib/quantum/job_broadcaster.ex | 37 +++++++++++++++++++-------- test/quantum/executor_test.exs | 5 ++-- test/quantum/job_broadcaster_test.exs | 15 +++++------ 3 files changed, 34 insertions(+), 23 deletions(-) diff --git a/lib/quantum/job_broadcaster.ex b/lib/quantum/job_broadcaster.ex index 3d69e3f..0467337 100644 --- a/lib/quantum/job_broadcaster.ex +++ b/lib/quantum/job_broadcaster.ex @@ -129,6 +129,14 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}" end) + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :update], %{}, %{ + job_name: job_name, + job: job, + node: inspect(Node.self()), + scheduler: state.scheduler + }) + :ok = storage.delete_job(storage_pid, job_name) :ok = storage.add_job(storage_pid, job) @@ -189,6 +197,14 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Replacing job #{inspect(job_name)}" end) + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :update], %{}, %{ + job_name: job_name, + job: job, + node: inspect(Node.self()), + scheduler: state.scheduler + }) + :ok = storage.delete_job(storage_pid, job_name) :ok = storage.add_job(storage_pid, job) @@ -318,18 +334,17 @@ defmodule Quantum.JobBroadcaster do "[#{inspect(Node.self())}][#{__MODULE__}] Deleting all jobs" end) - messages = - for {name, %Job{state: :active} = job} <- jobs do - # Send event to telemetry incase the end user wants to monitor events - :telemetry.execute([:quantum, :job, :delete], %{}, %{ - job_name: name, - job: job, - node: inspect(Node.self()), - scheduler: state.scheduler - }) + for {name, %Job{} = job} <- jobs do + # Send event to telemetry incase the end user wants to monitor events + :telemetry.execute([:quantum, :job, :delete], %{}, %{ + job_name: name, + job: job, + node: inspect(Node.self()), + scheduler: state.scheduler + }) + end - {:remove, name} - end + messages = for {name, %Job{state: :active}} <- jobs, do: {:remove, name} :ok = storage.purge(storage_pid) diff --git a/test/quantum/executor_test.exs b/test/quantum/executor_test.exs index 1ec363f..2201a07 100644 --- a/test/quantum/executor_test.exs +++ b/test/quantum/executor_test.exs @@ -23,7 +23,7 @@ defmodule Quantum.ExecutorTest do def handle_event( [:quantum, :job, :start], %{system_time: _system_time} = _measurements, - %{job_name: job_name, module: _module, node: _node, scheduler: _scheduler} = _metadata, + %{job_name: job_name, node: _node, scheduler: _scheduler} = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do send(parent_thread, %{test_id: test_id, job_name: job_name, type: :start}) @@ -32,7 +32,7 @@ defmodule Quantum.ExecutorTest do def handle_event( [:quantum, :job, :stop], %{duration: _duration} = _measurements, - %{job_name: job_name, module: _module, node: _node, scheduler: _scheduler} = _metadata, + %{job_name: job_name, node: _node, scheduler: _scheduler} = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do send(parent_thread, %{test_id: test_id, job_name: job_name, type: :stop}) @@ -43,7 +43,6 @@ defmodule Quantum.ExecutorTest do %{duration: _duration} = _measurements, %{ job_name: job_name, - module: _module, node: _node, reason: reason, stacktrace: stacktrace, diff --git a/test/quantum/job_broadcaster_test.exs b/test/quantum/job_broadcaster_test.exs index 599c80b..a5b9a04 100644 --- a/test/quantum/job_broadcaster_test.exs +++ b/test/quantum/job_broadcaster_test.exs @@ -27,8 +27,7 @@ defmodule Quantum.JobBroadcasterTest do def handle_event( [:quantum, :job, :add], _measurements, - %{job_name: job_name, job: _job, module: _module, node: _node, scheduler: _scheduler} = - _metadata, + %{job_name: job_name, job: _job, node: _node, scheduler: _scheduler} = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do send(parent_thread, %{test_id: test_id, job_name: job_name, type: :add}) @@ -37,8 +36,7 @@ defmodule Quantum.JobBroadcasterTest do def handle_event( [:quantum, :job, :delete], _measurements, - %{job_name: job_name, job: _job, module: _module, node: _node, scheduler: _scheduler} = - _metadata, + %{job_name: job_name, job: _job, node: _node, scheduler: _scheduler} = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do send(parent_thread, %{test_id: test_id, job_name: job_name, type: :delete}) @@ -47,8 +45,7 @@ defmodule Quantum.JobBroadcasterTest do def handle_event( [:quantum, :job, :update], _measurements, - %{job_name: job_name, job: _job, module: _module, node: _node, scheduler: _scheduler} = - _metadata, + %{job_name: job_name, job: _job, node: _node, scheduler: _scheduler} = _metadata, %{parent_thread: parent_thread, test_id: test_id} ) do send(parent_thread, %{test_id: test_id, job_name: job_name, type: :update}) @@ -347,7 +344,7 @@ defmodule Quantum.JobBroadcasterTest do end) end) - assert_receive %{test_id: ^test_id} + assert_receive %{test_id: ^test_id, type: :delete} end @tag listen_storage: true @@ -381,7 +378,7 @@ defmodule Quantum.JobBroadcasterTest do end) end) - assert_receive %{test_id: ^test_id} + assert_receive %{test_id: ^test_id, type: :delete} end end @@ -510,7 +507,7 @@ defmodule Quantum.JobBroadcasterTest do assert_receive {:purge, _, _} end) - refute_receive %{test_id: ^test_id, job_name: ^inactive_job_name, type: :delete} + assert_receive %{test_id: ^test_id, job_name: ^inactive_job_name, type: :delete} assert_receive %{test_id: ^test_id, job_name: ^active_job_name, type: :delete} end end From 5a0b9f2c1917ba71c5967bfe6be674472d71c4cb Mon Sep 17 00:00:00 2001 From: Marc Smith Date: Fri, 11 Sep 2020 14:16:28 -0700 Subject: [PATCH 16/16] Module.t() to atom --- lib/quantum/executor.ex | 2 +- lib/quantum/executor/start_opts.ex | 2 +- lib/quantum/executor_supervisor/init_opts.ex | 2 +- lib/quantum/executor_supervisor/start_opts.ex | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/quantum/executor.ex b/lib/quantum/executor.ex index bb3ecac..60ea3f0 100644 --- a/lib/quantum/executor.ex +++ b/lib/quantum/executor.ex @@ -74,7 +74,7 @@ defmodule Quantum.Executor do end # Ececute the given function on a given node via the task supervisor - @spec run(Node.t(), Job.t(), GenServer.server(), boolean(), Module.t()) :: Task.t() + @spec run(Node.t(), Job.t(), GenServer.server(), boolean(), atom()) :: Task.t() defp run(node, %{name: job_name, task: task}, task_supervisor, debug_logging, scheduler) do debug_logging && Logger.debug(fn -> diff --git a/lib/quantum/executor/start_opts.ex b/lib/quantum/executor/start_opts.ex index a12a421..e26d9e8 100644 --- a/lib/quantum/executor/start_opts.ex +++ b/lib/quantum/executor/start_opts.ex @@ -7,7 +7,7 @@ defmodule Quantum.Executor.StartOpts do task_supervisor_reference: GenServer.server(), task_registry_reference: GenServer.server(), debug_logging: boolean, - scheduler: Module.t() + scheduler: atom() } @enforce_keys [ diff --git a/lib/quantum/executor_supervisor/init_opts.ex b/lib/quantum/executor_supervisor/init_opts.ex index e2b27f0..cbff5f4 100644 --- a/lib/quantum/executor_supervisor/init_opts.ex +++ b/lib/quantum/executor_supervisor/init_opts.ex @@ -8,7 +8,7 @@ defmodule Quantum.ExecutorSupervisor.InitOpts do task_supervisor_reference: GenServer.server(), task_registry_reference: GenServer.server(), debug_logging: boolean, - scheduler: Module.t() + scheduler: atom() } @enforce_keys [ diff --git a/lib/quantum/executor_supervisor/start_opts.ex b/lib/quantum/executor_supervisor/start_opts.ex index 59761c2..d42fe57 100644 --- a/lib/quantum/executor_supervisor/start_opts.ex +++ b/lib/quantum/executor_supervisor/start_opts.ex @@ -9,7 +9,7 @@ defmodule Quantum.ExecutorSupervisor.StartOpts do task_supervisor_reference: GenServer.server(), task_registry_reference: GenServer.server(), debug_logging: boolean(), - scheduler: Module.t() + scheduler: atom() } @enforce_keys [