From cf2ac421e167fae844fe45810f7b81e9976ecadf Mon Sep 17 00:00:00 2001 From: Kayla Firestack Date: Thu, 26 Sep 2024 16:06:35 -0400 Subject: [PATCH 1/5] feat(ex/mix): add `dns_cluster` To be able to synchronize multiple Skate instances in production, we need to set up distributed Elixir. Following the way https://github.com/mbta/ride_along did it, this uses `DNSCluster` to find the other Skate instances, and if the variable is not present, it does nothing so this is safe to deploy to production before the corresponding devops PR. --- config/config.exs | 4 ++++ config/runtime.exs | 2 ++ lib/skate/application.ex | 1 + mix.exs | 1 + mix.lock | 1 + 5 files changed, 9 insertions(+) diff --git a/config/config.exs b/config/config.exs index 7fa6e690f..328be2159 100644 --- a/config/config.exs +++ b/config/config.exs @@ -130,6 +130,10 @@ config :skate, Oban, } ] +config :skate, DNSCluster, + query: :ignore, + log: :info + # Include 2 logger backends config :logger, backends: [:console, Sentry.LoggerBackend] diff --git a/config/runtime.exs b/config/runtime.exs index a345591b5..d41f217f6 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -104,4 +104,6 @@ if config_env() == :prod do # Configure TripModifications to publish if the env var is present config :skate, Skate.Detours.TripModificationPublisher, start: true end + + config :skate, DNSCluster, query: System.get_env("DNS_CLUSTER_QUERY") || :ignore end diff --git a/lib/skate/application.ex b/lib/skate/application.ex index bdbfa54f2..7587f1e4b 100644 --- a/lib/skate/application.ex +++ b/lib/skate/application.ex @@ -38,6 +38,7 @@ defmodule Skate.Application do end ++ [ {Phoenix.PubSub, name: Skate.PubSub}, + {DNSCluster, Application.get_env(:skate, DNSCluster)}, SkateWeb.Endpoint, Skate.Migrate, {Oban, Application.fetch_env!(:skate, Oban)}, diff --git a/mix.exs b/mix.exs index 2d9de237e..8e4a4b53b 100644 --- a/mix.exs +++ b/mix.exs @@ -62,6 +62,7 @@ defmodule Skate.MixProject do {:csv, "~> 2.4.1"}, {:dialyxir, "~> 1.4", 
only: [:dev, :test], runtime: false}, {:diskusage_logger, "~> 0.2.0"}, + {:dns_cluster, "~> 0.1.3"}, {:ecto_sql, "~> 3.4"}, {:ehmon, github: "mbta/ehmon", only: :prod}, {:emqtt_failover, "~> 0.3.0"}, diff --git a/mix.lock b/mix.lock index 00f4723eb..c224c8758 100644 --- a/mix.lock +++ b/mix.lock @@ -15,6 +15,7 @@ "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, "dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"}, "diskusage_logger": {:hex, :diskusage_logger, "0.2.0", "04fc48b538fe4de43153542a71ea94f623d54707d85844123baacfceedf625c3", [:mix], [], "hexpm", "e3f2aed1b0fc4590931c089a6453a4c4eb4c945912aa97bcabcc0cff7851f34d"}, + "dns_cluster": {:hex, :dns_cluster, "0.1.3", "0bc20a2c88ed6cc494f2964075c359f8c2d00e1bf25518a6a6c7fd277c9b0c66", [:mix], [], "hexpm", "46cb7c4a1b3e52c7ad4cbe33ca5079fbde4840dedeafca2baf77996c2da1bc33"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, "ecto": {:hex, :ecto, "3.11.2", "e1d26be989db350a633667c5cda9c3d115ae779b66da567c68c80cfb26a8c9ee", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3c38bca2c6f8d8023f2145326cc8a80100c3ffe4dcbd9842ff867f7fc6156c65"}, "ecto_sql": {:hex, :ecto_sql, "3.11.1", "e9abf28ae27ef3916b43545f9578b4750956ccea444853606472089e7d169470", [:mix], [{:db_connection, "~> 2.5 or ~> 2.4.1", [hex: :db_connection, repo: 
"hexpm", optional: false]}, {:ecto, "~> 3.11.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16.0 or ~> 0.17.0 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ce14063ab3514424276e7e360108ad6c2308f6d88164a076aac8a387e1fea634"}, From 5ed633d061293175036b2778de4af55dfa773d1f Mon Sep 17 00:00:00 2001 From: Kayla Firestack Date: Fri, 27 Sep 2024 08:10:19 -0400 Subject: [PATCH 2/5] feat(ex/cfg/dev): get `PORT` from env if set --- config/dev.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/dev.exs b/config/dev.exs index 4af18e3f2..fc03f84f0 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -19,7 +19,7 @@ config :skate, Skate.Detours.TripModificationPublisher, start: false config :skate, SkateWeb.Endpoint, https: [ ip: {0, 0, 0, 0}, - port: 4000, + port: System.get_env("PORT") || 4000, cipher_suite: :strong, keyfile: "priv/cert/selfsigned_key.pem", certfile: "priv/cert/selfsigned.pem", From c17eea037e98867e568c048eab73a377b1646f74 Mon Sep 17 00:00:00 2001 From: Kayla Firestack Date: Fri, 27 Sep 2024 08:10:19 -0400 Subject: [PATCH 3/5] fix:test: use server instead of `__MODULE__` like other tests --- test/notifications/notification_server_test.exs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/notifications/notification_server_test.exs b/test/notifications/notification_server_test.exs index 097d11e91..6e618a909 100644 --- a/test/notifications/notification_server_test.exs +++ b/test/notifications/notification_server_test.exs @@ -621,7 +621,7 @@ defmodule Notifications.NotificationServerTest do %{server: server} end - test "saves to database" do + test "saves to database", %{server: server} do notification_count = 3 # create new notification for _ 
<- 1..notification_count do @@ -629,7 +629,7 @@ defmodule Notifications.NotificationServerTest do detour = insert(:detour) - NotificationServer.detour_activated(detour, notify_finished: self(), server: __MODULE__) + NotificationServer.detour_activated(detour, notify_finished: self(), server: server) assert_receive {:new_notification, detour: ^id} end From 01be4d066b6a7bf2b135d4a277a952c7f578b1e9 Mon Sep 17 00:00:00 2001 From: Kayla Firestack Date: Fri, 27 Sep 2024 08:10:19 -0400 Subject: [PATCH 4/5] feat(ex/notifications/notification_server): send notification to all connected Skate instances refactor: reuse `detour_activated_notification` cast for all detour notifications --- lib/notifications/notification.ex | 5 +- lib/notifications/notification_server.ex | 63 +++++++++++++++++++++--- 2 files changed, 60 insertions(+), 8 deletions(-) diff --git a/lib/notifications/notification.ex b/lib/notifications/notification.ex index 8298bfd63..177527685 100644 --- a/lib/notifications/notification.ex +++ b/lib/notifications/notification.ex @@ -31,7 +31,10 @@ defmodule Notifications.Notification do id: id(), created_at: Util.Time.timestamp(), state: NotificationState.t(), - content: Notifications.Db.BlockWaiver.t() | Notifications.Db.BridgeMovement.t() + content: + Notifications.Db.BlockWaiver.t() + | Notifications.Db.BridgeMovement.t() + | Notifications.Db.Detour.t() } @derive Jason.Encoder diff --git a/lib/notifications/notification_server.ex b/lib/notifications/notification_server.ex index 0b30c01af..bfbc1e770 100644 --- a/lib/notifications/notification_server.ex +++ b/lib/notifications/notification_server.ex @@ -22,8 +22,8 @@ defmodule Notifications.NotificationServer do @spec start_link(Keyword.t()) :: GenServer.on_start() def start_link(opts \\ []) do - name = Keyword.get(opts, :name, __MODULE__) - GenServer.start_link(__MODULE__, nil, name: name) + opts = Keyword.put_new(opts, :name, default_name()) + GenServer.start_link(__MODULE__, opts, name: opts[:name]) end @spec 
new_block_waivers(BlockWaiver.block_waivers_by_block_key(), GenServer.server()) :: :ok @@ -68,9 +68,11 @@ defmodule Notifications.NotificationServer do # Server + @enforce_keys [:name] + defstruct [:name] @impl GenServer - def init(_) do - {:ok, nil} + def init(opts \\ []) do + {:ok, struct(__MODULE__, opts)} end @impl true @@ -102,11 +104,31 @@ defmodule Notifications.NotificationServer do }, state ) do - detour - |> Notifications.Notification.create_activated_detour_notification_from_detour() - |> broadcast(self()) + notification = + Notifications.Notification.create_activated_detour_notification_from_detour(detour) + + broadcast(notification, self()) notify_caller_new_notification(notify_finished_caller_id, detour: id) + # Send to processes with same name on other nodes + broadcast_notification_to_other_instances(notification, state.name) + + {:noreply, state} + end + + @impl true + # "Private" method for fetching and sending notifications from distributed + # Elixir + def handle_cast( + { + :broadcast_new_detour_notification, + notification_id + }, + state + ) do + notification_id + |> Notifications.Notification.get_detour_notification() + |> broadcast(self()) {:noreply, state} end @@ -120,6 +142,33 @@ defmodule Notifications.NotificationServer do send(caller_id, {:new_notification, value}) end + defp broadcast_notification_to_other_instances( + %Notifications.Notification{ + id: notification_id, + content: %Notifications.Db.Detour{} + }, + server + ) + when not is_nil(notification_id) do + # Currently, we've implemented our own "PubSub" for notifications and we + # are not using the provided `Phoenix.PubSub` that comes with Phoenix + # channels. This means we don't benefit from Phoenix PubSub's ability to + # send messages using distributed Elixir, and that we need to implement + # this ourselves at this current time. 
+ # Ideally, Notifications would be delivered using + # `Phoenix.Channel.broadcast` instead of our custom `broadcast` function + # in `NotificationServer`. To do this, we'd need to implement the same + # filtering mechanism that this module has implemented. For now, we'll + # send messages to other Skate instances letting them know about new + # Notifications. + + # Skate instances currently do not "specialize", and therefore we need to + # send the notification to all instances + for node <- Node.list() do + GenServer.cast({server, node}, {:broadcast_new_detour_notification, notification_id}) + end + end + @spec convert_new_block_waivers_to_notifications([BlockWaiver.t()]) :: [ Notification.t() ] From 9a070d9e9b3b91bd681a30718cd39de8ac97a38e Mon Sep 17 00:00:00 2001 From: Kayla Firestack Date: Fri, 4 Oct 2024 11:07:47 -0400 Subject: [PATCH 5/5] fix: expose EPMD and Erlang RPC ports --- Dockerfile | 3 ++- lib/notifications/notification_server.ex | 10 +++++++++- rel/env.sh.eex | 14 ++++++++++---- rel/vm.args.eex | 4 ++++ 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/Dockerfile b/Dockerfile index 6c8f1d6de..d2811f919 100644 --- a/Dockerfile +++ b/Dockerfile @@ -71,7 +71,8 @@ COPY --from=app-builder --chown=skate:skate /root/_build/prod/rel/skate . 
COPY --from=app-builder --chown=skate:skate /root/aws-cert-bundle.pem ./priv/aws-cert-bundle.pem -EXPOSE 4000 +# Expose HTTP, EPMD, and Erlang RPC +EXPOSE 4000 4369 57195 ENTRYPOINT ["/usr/bin/dumb-init", "--"] diff --git a/lib/notifications/notification_server.ex b/lib/notifications/notification_server.ex index bfbc1e770..f10d4daf7 100644 --- a/lib/notifications/notification_server.ex +++ b/lib/notifications/notification_server.ex @@ -8,6 +8,8 @@ defmodule Notifications.NotificationServer do use GenServer + require Logger + alias Notifications.Bridge alias Notifications.Notification alias Notifications.NotificationReason @@ -164,7 +166,13 @@ defmodule Notifications.NotificationServer do # Skate instances currently do not "specialize", and therefore we need to # send the notification to all instances - for node <- Node.list() do + nodes = Node.list() + + Logger.info( + "notifying other instances of detour notification_id=#{notification_id} nodes=#{inspect(nodes)}" + ) + + for node <- nodes do GenServer.cast({server, node}, {:broadcast_new_detour_notification, notification_id}) end end diff --git a/rel/env.sh.eex b/rel/env.sh.eex index 3114eb703..5ab3f0c24 100644 --- a/rel/env.sh.eex +++ b/rel/env.sh.eex @@ -1,5 +1,11 @@ #!/bin/sh -INSTANCE_ID=$(curl --max-time 5 -s http://169.254.169.254/latest/meta-data/instance-id || true) -if [ -n "${INSTANCE_ID}" ]; then - export RELEASE_NODE=skate-${INSTANCE_ID} -fi +ip_address=$(hostname -i) + +## Use "long names" for node names for Distributed Elixir +## https://hexdocs.pm/elixir/main/config-and-releases.html#operating-system-environment-configuration +export RELEASE_DISTRIBUTION=name +## Set our node's "long name" so we can find each instance using DNSCluster +export RELEASE_NODE="skate@$ip_address" + +echo "Release Distribution: $RELEASE_DISTRIBUTION" +echo "Release Node: $RELEASE_NODE" diff --git a/rel/vm.args.eex b/rel/vm.args.eex index 8ab7174a9..7091d232c 100644 --- a/rel/vm.args.eex +++ b/rel/vm.args.eex @@ -3,3 +3,7 
@@ ## Give more memory to the literal allocator, used by :persistent_term +MIscs 2048 + +## Set and restrict Erlang RPC port +-kernel inet_dist_listen_min 57195 +-kernel inet_dist_listen_max 57195