Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically reevaluate app sessions and support interrupt #1928

Merged
merged 2 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions lib/livebook/app.ex
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,15 @@ defmodule Livebook.App do
app_session = Enum.find(state.sessions, &(&1.version == state.version))

if app_session do
if state.notebook.app_settings.zero_downtime and app_session.app_status != :executed do
Enum.find(state.sessions, &(&1.app_status == :executed))
if state.notebook.app_settings.zero_downtime and not status_ready?(app_session.app_status) do
Enum.find(state.sessions, &status_ready?(&1.app_status))
end || app_session
end
end

defp status_ready?(%{execution: :executed, lifecycle: :active}), do: true
defp status_ready?(_status), do: false

defp start_eagerly(state) when state.notebook.app_settings.multi_session, do: state

defp start_eagerly(state) do
Expand All @@ -269,7 +272,7 @@ defmodule Livebook.App do
pid: session.pid,
version: state.version,
created_at: session.created_at,
app_status: :executing,
app_status: %{execution: :executing, lifecycle: :active},
client_count: 0,
started_by_id: user && user.id
}
Expand Down Expand Up @@ -318,7 +321,7 @@ defmodule Livebook.App do
defp shutdown_old_versions(state), do: state

defp shutdown_session(app_session) do
if Livebook.Session.Data.app_active?(app_session.app_status) do
if app_session.app_status.lifecycle == :active do
Livebook.Session.app_shutdown(app_session.pid)
end
end
Expand Down
11 changes: 11 additions & 0 deletions lib/livebook/notebook.ex
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,17 @@ defmodule Livebook.Notebook do
end)
end

@doc """
Removes all outputs from the notebook.
"""
@spec clear_outputs(t()) :: t()
def clear_outputs(notebook) do
update_cells(notebook, fn
%{outputs: _outputs} = cell -> %{cell | outputs: []}
cell -> cell
end)
end

@doc """
Adds new output to the given cell.

Expand Down
6 changes: 5 additions & 1 deletion lib/livebook/runtime.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ defprotocol Livebook.Runtime do
# A control element
| {:control, attrs :: map()}
# Internal output format for errors
| {:error, message :: String.t(), type :: {:missing_secret, String.t()} | :other}
| {:error, message :: String.t(),
type ::
{:missing_secret, name :: String.t()}
| {:interrupt, variant :: :normal | :error, message :: String.t()}
| :other}

@typedoc """
Additional information about a completed evaluation.
Expand Down
5 changes: 5 additions & 0 deletions lib/livebook/runtime/evaluator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,11 @@ defmodule Livebook.Runtime.Evaluator do

metadata = %{
errored: elem(result, 0) == :error,
interrupted:
match?(
{:error, _kind, error, _stacktrace} when is_struct(error, Kino.InterruptError),
result
),
evaluation_time_ms: evaluation_time_ms,
memory_usage: memory(),
code_error: code_error,
Expand Down
3 changes: 3 additions & 0 deletions lib/livebook/runtime/evaluator/default_formatter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,8 @@ defmodule Livebook.Runtime.Evaluator.DefaultFormatter do
defp error_type(%System.EnvError{env: "LB_" <> secret_name}),
do: {:missing_secret, secret_name}

defp error_type(error) when is_struct(error, Kino.InterruptError),
do: {:interrupt, error.variant, error.message}

defp error_type(_), do: :other
end
110 changes: 59 additions & 51 deletions lib/livebook/session/data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,13 @@ defmodule Livebook.Session.Data do

@type session_mode :: :default | :app

# Note that technically the first state is :initial, but we always
# expect app to start evaluating right away, so distinguishing that
# state from :executing would not bring any value
@type app_status :: :executing | :executed | :error | :shutting_down | :deactivated
@type app_status :: %{
# Note that technically the first state is :initial, but we always
# expect app to start evaluating right away, so distinguishing that
# state from :executing would not bring any value
execution: :executing | :executed | :error,
lifecycle: :active | :shutting_down | :deactivated
}

@type app_data :: %{
status: app_status()
Expand Down Expand Up @@ -242,15 +245,23 @@ defmodule Livebook.Session.Data do

notebook = opts[:notebook]

notebook =
if opts[:mode] == :app do
Livebook.Notebook.clear_outputs(notebook)
else
notebook
end

default_runtime =
case opts[:mode] do
:app -> Livebook.Config.default_app_runtime()
_ -> Livebook.Config.default_runtime()
if opts[:mode] == :app do
Livebook.Config.default_app_runtime()
else
Livebook.Config.default_runtime()
end

app_data =
if opts[:mode] == :app do
%{status: :executing}
%{status: %{execution: :executing, lifecycle: :active}}
end

hub = Hubs.fetch_hub!(notebook.hub_id)
Expand Down Expand Up @@ -529,7 +540,6 @@ defmodule Livebook.Session.Data do
end)
|> maybe_connect_runtime(data)
|> update_validity_and_evaluation()
|> app_compute_status()
|> wrap_ok()
else
:error
Expand Down Expand Up @@ -573,7 +583,6 @@ defmodule Livebook.Session.Data do
|> update_validity_and_evaluation()
|> update_smart_cell_bases(data)
|> mark_dirty_if_persisting_outputs()
|> app_compute_status()
|> wrap_ok()
else
_ -> :error
Expand All @@ -600,7 +609,7 @@ defmodule Livebook.Session.Data do
|> with_actions()
|> clear_main_evaluation()
|> update_smart_cell_bases(data)
|> app_compute_status()
|> app_update_execution_status()
|> wrap_ok()
end

Expand All @@ -610,7 +619,7 @@ defmodule Livebook.Session.Data do
|> with_actions()
|> clear_section_evaluation(section)
|> update_smart_cell_bases(data)
|> app_compute_status()
|> app_update_execution_status()
|> wrap_ok()
end
end
Expand All @@ -622,7 +631,7 @@ defmodule Livebook.Session.Data do
|> with_actions()
|> cancel_cell_evaluation(cell, section)
|> update_smart_cell_bases(data)
|> app_compute_status()
|> app_update_execution_status()
|> wrap_ok()
else
_ -> :error
Expand Down Expand Up @@ -824,7 +833,6 @@ defmodule Livebook.Session.Data do
data
|> with_actions()
|> set_runtime(data, runtime)
|> app_compute_status()
|> wrap_ok()
end

Expand Down Expand Up @@ -1213,6 +1221,7 @@ defmodule Livebook.Session.Data do
eval_info
| status: :ready,
errored: metadata.errored,
interrupted: metadata.interrupted,
evaluation_time_ms: metadata.evaluation_time_ms,
identifiers_used: metadata.identifiers_used,
identifiers_defined: metadata.identifiers_defined,
Expand Down Expand Up @@ -1598,13 +1607,7 @@ defmodule Livebook.Session.Data do
defp erase_outputs({data, _} = data_actions) do
data_actions
|> clear_all_evaluation()
|> set!(
notebook:
Notebook.update_cells(data.notebook, fn
%{outputs: _outputs} = cell -> %{cell | outputs: []}
cell -> cell
end)
)
|> set!(notebook: Notebook.clear_outputs(data.notebook))
|> update_every_cell_info(fn
%{eval: _} = info ->
info = update_in(info.eval.outputs_batch_number, &(&1 + 1))
Expand Down Expand Up @@ -1809,18 +1812,19 @@ defmodule Livebook.Session.Data do

defp app_deactivate(data_actions) do
data_actions
|> set_app_data!(status: :deactivated)
|> update_app_data!(&put_in(&1.status.lifecycle, :deactivated))
|> add_action(:app_report_status)
end

defp app_shutdown(data_actions) do
data_actions
|> set_app_data!(status: :shutting_down)
|> update_app_data!(&put_in(&1.status.lifecycle, :shutting_down))
|> add_action(:app_report_status)
end

defp app_maybe_terminate({data, _} = data_actions) do
if data.mode == :app and data.app_data.status == :shutting_down and data.clients_map == %{} do
if data.mode == :app and data.app_data.status.lifecycle == :shutting_down and
data.clients_map == %{} do
add_action(data_actions, :app_terminate)
else
data_actions
Expand Down Expand Up @@ -2009,6 +2013,7 @@ defmodule Livebook.Session.Data do
validity: :fresh,
status: :ready,
errored: false,
interrupted: false,
evaluation_digest: nil,
evaluation_time_ms: nil,
evaluation_start: nil,
Expand Down Expand Up @@ -2083,13 +2088,8 @@ defmodule Livebook.Session.Data do
Enum.all?(attrs, fn {key, _} -> Map.has_key?(struct, key) end)
end

defp set_app_data!({data, _} = data_actions, changes) do
app_data =
Enum.reduce(changes, data.app_data, fn {key, value}, app_data ->
Map.replace!(app_data, key, value)
end)

set!(data_actions, app_data: app_data)
defp update_app_data!({data, _} = data_actions, fun) do
set!(data_actions, app_data: fun.(data.app_data))
end

@doc """
Expand Down Expand Up @@ -2157,12 +2157,14 @@ defmodule Livebook.Session.Data do
data_actions
|> compute_snapshots()
|> update_validity()
|> app_update_execution_status()
|> update_reevaluates_automatically()
# After updating validity there may be new stale cells, so we check
# if any of them is configured for automatic reevaluation
# if any of them should be automatically reevaluated
|> maybe_queue_reevaluating_cells()
|> queue_prerequisite_cells_evaluation_for_queued()
|> maybe_evaluate_queued()
|> app_update_execution_status()
end

defp compute_snapshots({data, _} = data_actions) do
Expand Down Expand Up @@ -2301,6 +2303,9 @@ defmodule Livebook.Session.Data do
end)
end

defp update_reevaluates_automatically({data, _} = data_actions) when data.mode == :app,
do: data_actions

defp update_reevaluates_automatically({data, _} = data_actions) do
eval_parents = cell_evaluation_parents(data)

Expand Down Expand Up @@ -2339,7 +2344,18 @@ defmodule Livebook.Session.Data do
|> Notebook.evaluable_cells_with_section()
|> Enum.filter(fn {cell, _section} ->
info = data.cell_infos[cell.id]
match?(%{status: :ready, validity: :stale, reevaluates_automatically: true}, info.eval)

case data.mode do
:default ->
match?(
%{status: :ready, validity: :stale, reevaluates_automatically: true},
info.eval
)

:app ->
match?(%{status: :ready, validity: :stale}, info.eval) and
data.app_data.status.execution in [:executing, :executed]
end
end)

cell_ids = for {cell, _section} <- cells_to_reevaluate, do: cell.id
Expand All @@ -2351,43 +2367,43 @@ defmodule Livebook.Session.Data do
end)
end

defp app_compute_status({data, _} = data_actions)
defp app_update_execution_status({data, _} = data_actions)
when data.mode != :app,
do: data_actions

defp app_compute_status({data, _} = data_actions)
when data.app_data.status in [:shutting_down, :deactivated],
do: data_actions

defp app_compute_status({data, _} = data_actions) do
status =
defp app_update_execution_status({data, _} = data_actions) do
execution_status =
data.notebook
|> Notebook.evaluable_cells_with_section()
|> Enum.find_value(:executed, fn {cell, _section} ->
case data.cell_infos[cell.id].eval do
%{validity: :aborted} -> :error
%{interrupted: true} -> :interrupted
%{errored: true} -> :error
%{validity: :fresh} -> :executing
%{status: :evaluating} -> :executing
%{status: :queued} -> :executing
_ -> nil
end
end)

data_actions =
if data.app_data.status == status do
if data.app_data.status.execution == execution_status do
data_actions
else
add_action(data_actions, :app_report_status)
end

# If everything was executed and an error happened, it means it
# was a runtime crash and everything is aborted
data_actions =
if data.app_data.status == :executed and status == :error do
if data.app_data.status.execution == :executed and execution_status == :error do
add_action(data_actions, :app_recover)
else
data_actions
end

set_app_data!(data_actions, status: status)
update_app_data!(data_actions, &put_in(&1.status.execution, execution_status))
end

@doc """
Expand Down Expand Up @@ -2566,12 +2582,4 @@ defmodule Livebook.Session.Data do
secret.value
end
end

@doc """
Checks if the app should be accessible and accepts new clients.
"""
@spec app_active?(app_status()) :: boolean()
def app_active?(app_status) do
app_status not in [:deactivated, :shutting_down]
end
end
Loading