Skip to content

Commit

Permalink
Remove old device_connections in worker, remove connection state st…
Browse files Browse the repository at this point in the history
…ored on `device` (#1807)

- Introduce `device.latest_connection`, which references the latest
`device_connection` record for the device
- Start cleaning up `device_connections` every hour with
`NervesHub.Workers.DeleteOldDeviceConnections`
- This targets rows that have a `last_seen_at` older than two weeks by
default (configurable), a `status` of `:disconnected` and does not
delete a device's last `device_connection`, even if the previous
criteria are met
- Remove `connection_status`, `connection_established_at`, and
`connection_last_seen_at` from the `device` schema.
- Slot in `device.latest_connection`, simplifying a lot of queries and
logic related to `device_connections`
- Remove `Devices.clean_connection_states` and its call from a worker
since `device.connection_status` was removed and connection logic is
moving to `device.latest_connection`
- When a device connects to the socket, we set the created
`device_connection` as `device.latest_connection`
- Create `device.latest_connection` when seeding devices

Todo:
- [x] check filtering UI locally
  • Loading branch information
nshoes authored Jan 23, 2025
1 parent bf0efe9 commit 53e6dc5
Show file tree
Hide file tree
Showing 22 changed files with 225 additions and 259 deletions.
5 changes: 3 additions & 2 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ config :nerves_hub, Oban,
{Oban.Plugins.Pruner, max_age: 604_800},
{Oban.Plugins.Cron,
crontab: [
{"0 * * * *", NervesHub.Workers.ScheduleOrgAuditLogTruncation, max_attempts: 1},
{"*/1 * * * *", NervesHub.Workers.CleanDeviceConnectionStates},
{"0 * * * *", NervesHub.Workers.ScheduleOrgAuditLogTruncation},
{"*/1 * * * *", NervesHub.Workers.CleanStaleDeviceConnections},
{"0 */1 * * *", NervesHub.Workers.DeleteOldDeviceConnections},
{"*/5 * * * *", NervesHub.Workers.ExpireInflightUpdates},
{"*/15 * * * *", NervesHub.Workers.DeviceHealthTruncation}
]}
Expand Down
4 changes: 4 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ config :nerves_hub,
String.to_integer(System.get_env("DEVICE_DEPLOYMENT_CHANGE_JITTER_SECONDS", "10")),
device_last_seen_update_interval_minutes:
String.to_integer(System.get_env("DEVICE_LAST_SEEN_UPDATE_INTERVAL_MINUTES", "5")),
device_connection_max_age_days:
String.to_integer(System.get_env("DEVICE_CONNECTION_MAX_AGE_DAYS", "14")),
device_connection_delete_limit:
String.to_integer(System.get_env("DEVICE_CONNECTION_DELETE_LIMIT", "100000")),
deployment_calculator_interval_seconds:
String.to_integer(System.get_env("DEPLOYMENT_CALCULATOR_INTERVAL_SECONDS", "3600")),
mapbox_access_token: System.get_env("MAPBOX_ACCESS_TOKEN"),
Expand Down
3 changes: 0 additions & 3 deletions lib/mix/tasks/gen.devices.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ defmodule Mix.Tasks.NervesHub.Gen.Devices do
org_id: org.id,
product_id: product.id,
identifier: "generated-#{i}",
connection_status: :connected,
connection_established_at: DateTime.now!("Etc/UTC"),
connection_last_seen_at: DateTime.now!("Etc/UTC"),
connection_metadata: %{
"location" => %{
"longitude" => lng,
Expand Down
77 changes: 26 additions & 51 deletions lib/nerves_hub/devices.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ defmodule NervesHub.Devices do
alias NervesHub.Deployments.Deployment
alias NervesHub.Deployments.Orchestrator
alias NervesHub.Devices.CACertificate
alias NervesHub.Devices.Connections
alias NervesHub.Devices.Device
alias NervesHub.Devices.DeviceCertificate
alias NervesHub.Devices.DeviceHealth
Expand All @@ -36,13 +35,6 @@ defmodule NervesHub.Devices do
Repo.get(Device, device_id)
end

def get_device(device_id, :preload_latest_connection) when is_integer(device_id) do
Device
|> where(id: ^device_id)
|> Connections.preload_latest_connection()
|> Repo.one()
end

def get_active_device(filters) do
Device
|> Repo.exclude_deleted()
Expand All @@ -61,15 +53,6 @@ defmodule NervesHub.Devices do
|> Repo.all()
end

def get_devices_by_org_id_and_product_id(org_id, product_id, :preload_latest_connection) do
Device
|> where([d], d.org_id == ^org_id)
|> where([d], d.product_id == ^product_id)
|> Connections.preload_latest_connection()
|> Repo.exclude_deleted()
|> Repo.all()
end

def get_devices_by_org_id_and_product_id(org_id, product_id, opts) do
{entries, _pager} = get_devices_by_org_id_and_product_id_with_pager(org_id, product_id, opts)
entries
Expand All @@ -89,11 +72,16 @@ defmodule NervesHub.Devices do
|> join(:left, [d, o], p in assoc(d, :product))
|> join(:left, [d, o, p], dp in assoc(d, :deployment))
|> join(:left, [d, o, p, dp], f in assoc(dp, :firmware))
|> join(:left, [d, o, p, dp, f], lc in assoc(d, :latest_connection), as: :latest_connection)
|> Repo.exclude_deleted()
|> order_by(^sort_devices(sorting))
|> sort_devices(sorting)
|> Filtering.build_filters(filters)
|> preload([d, o, p, dp, f], org: o, product: p, deployment: {dp, firmware: f})
|> Connections.preload_latest_connection()
|> preload([d, o, p, dp, f, latest_connection: lc],
org: o,
product: p,
deployment: {dp, firmware: f},
latest_connection: lc
)
|> Flop.run(flop)
end

Expand Down Expand Up @@ -126,10 +114,11 @@ defmodule NervesHub.Devices do

Device
|> where([d], d.product_id == ^product_id)
|> Connections.preload_latest_connection()
|> Repo.exclude_deleted()
|> join(:left, [d], dc in assoc(d, :latest_connection), as: :latest_connection)
|> preload([latest_connection: lc], latest_connection: lc)
|> Filtering.build_filters(filters)
|> order_by(^sort_devices(sorting))
|> sort_devices(sorting)
|> Flop.run(flop)
|> then(fn {entries, meta} ->
meta
Expand All @@ -145,14 +134,15 @@ defmodule NervesHub.Devices do

def get_minimal_device_location_by_org_id_and_product_id(org_id, product_id) do
Device
|> join(:inner, [d], dc in DeviceConnection, on: d.latest_connection_id == dc.id)
|> where(org_id: ^org_id)
|> where(product_id: ^product_id)
|> where([d], not is_nil(fragment("?->'location'->'latitude'", d.connection_metadata)))
|> where([d], not is_nil(fragment("?->'location'->'longitude'", d.connection_metadata)))
|> select([d, dc], %{
id: d.id,
identifier: d.identifier,
connection_status: d.connection_status,
connection_status: dc.connection_status,
latitude: fragment("?->'location'->'latitude'", d.connection_metadata),
longitude: fragment("?->'location'->'longitude'", d.connection_metadata),
firmware_uuid: fragment("?->'uuid'", d.firmware_metadata)
Expand Down Expand Up @@ -204,13 +194,19 @@ defmodule NervesHub.Devices do
end)
end

defp sort_devices({:asc, :connection_last_seen_at}),
do: {:asc_nulls_first, :connection_last_seen_at}
defp sort_devices(query, {:asc, :connection_last_seen_at}) do
order_by(query, [latest_connection: latest_connection],
desc_nulls_last: latest_connection.last_seen_at
)
end

defp sort_devices({:desc, :connection_last_seen_at}),
do: {:desc_nulls_last, :connection_last_seen_at}
defp sort_devices(query, {:desc, :connection_last_seen_at}) do
order_by(query, [latest_connection: latest_connection],
asc_nulls_first: latest_connection.last_seen_at
)
end

defp sort_devices(sort), do: sort
defp sort_devices(query, sort), do: order_by(query, [], ^sort)

def get_device_count_by_org_id(org_id) do
q =
Expand Down Expand Up @@ -299,7 +295,8 @@ defmodule NervesHub.Devices do

defp join_and_preload(query, :latest_connection) do
query
|> Connections.preload_latest_connection()
|> join(:left, [d], dc in assoc(d, :latest_connection), as: :latest_connection)
|> preload([latest_connection: lc], latest_connection: lc)
end

@spec get_shared_secret_auth(String.t()) ::
Expand Down Expand Up @@ -655,28 +652,6 @@ defmodule NervesHub.Devices do
|> Repo.all()
end

def clean_connection_states() do
interval = Application.get_env(:nerves_hub, :device_last_seen_update_interval_minutes)
a_minute_ago = DateTime.shift(DateTime.utc_now(), minute: -(interval + 1))

Device
|> where(connection_status: :connected)
|> where([d], d.connection_last_seen_at < ^a_minute_ago)
|> Repo.update_all(
set: [
connection_status: :disconnected,
connection_disconnected_at: DateTime.utc_now()
]
)
end

def connected_count(product) do
Device
|> where(connection_status: :connected)
|> where(product_id: ^product.id)
|> Repo.aggregate(:count)
end

def update_firmware_metadata(device, nil) do
{:ok, device}
end
Expand Down
104 changes: 47 additions & 57 deletions lib/nerves_hub/devices/connections.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,6 @@ defmodule NervesHub.Devices.Connections do
|> Repo.one()
end

@doc """
Preload latest respective connection in a device query.
"""
@spec preload_latest_connection(Ecto.Query.t()) :: Ecto.Query.t()
def preload_latest_connection(query) do
query
|> preload(device_connections: ^distinct_on_device())
end

@doc """
Creates a device connection, reported from device socket
"""
Expand All @@ -48,14 +39,25 @@ defmodule NervesHub.Devices.Connections do
def device_connected(device_id) do
now = DateTime.utc_now()

%{
device_id: device_id,
established_at: now,
last_seen_at: now,
status: :connected
}
|> DeviceConnection.create_changeset()
|> Repo.insert()
changeset =
DeviceConnection.create_changeset(%{
device_id: device_id,
established_at: now,
last_seen_at: now,
status: :connected
})

case Repo.insert(changeset) do
{:ok, device_connection} ->
Device
|> where(id: ^device_id)
|> Repo.update_all(set: [latest_connection_id: device_connection.id])

{:ok, device_connection}

{:error, _} = error ->
error
end
end

@doc """
Expand Down Expand Up @@ -89,46 +91,6 @@ defmodule NervesHub.Devices.Connections do
|> Repo.update()
end

@doc """
Selects devices id's which has provided status in it's latest connection record.
"""
@spec query_devices_with_connection_status(String.t()) :: Ecto.Query.t()
def query_devices_with_connection_status(status) do
(lr in subquery(latest_row_query()))
|> from()
|> where([lr], lr.rn == 1)
|> where(
[lr],
lr.status == ^String.to_existing_atom(status)
)
|> join(:inner, [lr], d in Device, on: lr.device_id == d.id)
|> select([lr, d], d.id)
end

@doc """
Generates a query to retrieve the most recent `DeviceConnection` for devices.
The query includes the row number (`rn`)
for each record, which is used to identify the most recent connection.
Returns an Ecto query.
"""
@spec latest_row_query() :: Ecto.Query.t()
def latest_row_query() do
DeviceConnection
|> select([dc], %{
device_id: dc.device_id,
status: dc.status,
last_seen_at: dc.last_seen_at,
rn: row_number() |> over(partition_by: dc.device_id, order_by: [desc: dc.last_seen_at])
})
end

defp distinct_on_device() do
DeviceConnection
|> distinct(:device_id)
|> order_by([:device_id, desc: :last_seen_at])
end

def clean_stale_connections() do
interval = Application.get_env(:nerves_hub, :device_last_seen_update_interval_minutes)
a_minute_ago = DateTime.shift(DateTime.utc_now(), minute: -(interval + 1))
Expand All @@ -144,4 +106,32 @@ defmodule NervesHub.Devices.Connections do
]
)
end

@doc """
Deletes old `DeviceConnection` rows in batches until none are left to delete.

Reads two values from the `:nerves_hub` application environment:

  * `:device_connection_max_age_days` - rows whose `last_seen_at` is older
    than this many days are eligible for deletion
  * `:device_connection_delete_limit` - maximum number of rows selected for
    deletion per pass

A row is selected only when its `status` is not `:connected` and it is not
the row referenced by its device's `latest_connection_id`.

Returns `:ok` once a pass deletes zero rows. Between passes the calling
process sleeps for two seconds.
"""
def delete_old_connections() do
interval = Application.get_env(:nerves_hub, :device_connection_max_age_days)
delete_limit = Application.get_env(:nerves_hub, :device_connection_delete_limit)
# Cutoff timestamp: anything last seen before this is considered old.
days_ago = DateTime.shift(DateTime.utc_now(), day: -interval)

# Ids of the rows to remove in this pass, capped at `delete_limit`.
query =
DeviceConnection
|> join(:inner, [dc], d in Device, on: dc.device_id == d.id)
|> where([dc, _d], dc.last_seen_at < ^days_ago)
|> where([dc, _d], dc.status != :connected)
# Never delete the connection the device currently points at.
# NOTE(review): if `d.latest_connection_id` is NULL this comparison is
# presumably NULL in SQL, so none of that device's rows would be selected —
# confirm this is the intended behaviour.
|> where([dc, d], dc.id != d.latest_connection_id)
|> select([dc], dc.id)
|> limit(^delete_limit)

# The limit lives in the subquery; the outer delete removes exactly those ids.
{delete_count, _} =
DeviceConnection
|> where([d], d.id in subquery(query))
|> Repo.delete_all()

if delete_count == 0 do
:ok
else
# relax stress on Ecto pool and go again
Process.sleep(2000)
delete_old_connections()
end
end
end
23 changes: 10 additions & 13 deletions lib/nerves_hub/devices/device.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ defmodule NervesHub.Devices.Device do
:updates_blocked_until,
:connecting_code,
:deployment_id,
:connection_status,
:connection_established_at,
:connection_disconnected_at,
:connection_last_seen_at,
:connection_types,
:connection_metadata,
:status,
Expand All @@ -41,6 +37,7 @@ defmodule NervesHub.Devices.Device do
belongs_to(:org, Org, where: [deleted_at: nil])
belongs_to(:product, Product, where: [deleted_at: nil])
belongs_to(:deployment, Deployment)
belongs_to(:latest_connection, DeviceConnection, type: :binary_id)
embeds_one(:firmware_metadata, FirmwareMetadata, on_replace: :update)
has_many(:device_certificates, DeviceCertificate, on_delete: :delete_all)
has_many(:device_connections, DeviceConnection, on_delete: :delete_all)
Expand All @@ -67,15 +64,15 @@ defmodule NervesHub.Devices.Device do

timestamps()

# Deprecated fields, replaced with device_connections table.
field(:connection_status, Ecto.Enum,
values: [:connected, :disconnected, :not_seen],
default: :not_seen
)

field(:connection_established_at, :utc_datetime)
field(:connection_disconnected_at, :utc_datetime)
field(:connection_last_seen_at, :utc_datetime)
# Deprecated fields, remove these any time after 29/1/2025.
# Also remove index from NervesHub.Repo.Migrations.AddConnectionStatusIndexToDevices.
# field(:connection_status, Ecto.Enum,
# values: [:connected, :disconnected, :not_seen],
# default: :not_seen
# )
# field(:connection_established_at, :utc_datetime)
# field(:connection_disconnected_at, :utc_datetime)
# field(:connection_last_seen_at, :utc_datetime)
embeds_one(:extensions, DeviceExtensionsSetting, on_replace: :update)
end

Expand Down
Loading

0 comments on commit 53e6dc5

Please sign in to comment.