diff --git a/lib/quantum/clock_broadcaster.ex b/lib/quantum/clock_broadcaster.ex index a2f9715..f477d3e 100644 --- a/lib/quantum/clock_broadcaster.ex +++ b/lib/quantum/clock_broadcaster.ex @@ -35,29 +35,41 @@ defmodule Quantum.ClockBroadcaster do :unknown -> start_time date -> date end + |> NaiveDateTime.truncate(:second) + # Roll back one second since handle_tick will start at `now + 1`. + |> NaiveDateTime.add(-1, :second) + + :timer.send_interval(1000, :tick) {:producer, %State{ - time: %{start_time | microsecond: {0, 0}}, + time: start_time, debug_logging: debug_logging, - remaining_demand: 0, - timer: nil + remaining_demand: 0 }} end @impl GenStage - def handle_demand( - demand, - %State{remaining_demand: remaining_demand, time: time, timer: nil} = state - ) - when demand > 0 do - expected_event_count = demand + remaining_demand + def handle_demand(demand, %State{remaining_demand: remaining_demand} = state) do + handle_tick(%State{state | remaining_demand: remaining_demand + demand}) + end + + @impl GenStage + def handle_info(:tick, state) do + handle_tick(state) + end + + defp handle_tick(%State{remaining_demand: 0} = state) do + {:noreply, [], state} + end - now = NaiveDateTime.utc_now() + defp handle_tick(%State{remaining_demand: remaining_demand, time: time} = state) + when remaining_demand > 0 do + now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second) {events, new_time} = Enum.reduce_while( - 1..expected_event_count, + 1..remaining_demand, {[], time}, fn _, {list, time} = acc -> new_time = NaiveDateTime.add(time, 1, :second) @@ -66,7 +78,10 @@ defmodule Quantum.ClockBroadcaster do :lt -> {:cont, {[%Event{time: new_time, catch_up: true} | list], new_time}} - _ -> + :eq -> + {:cont, {[%Event{time: new_time, catch_up: false} | list], new_time}} + + :gt -> {:halt, acc} end end @@ -74,66 +89,13 @@ defmodule Quantum.ClockBroadcaster do events = Enum.reverse(events) - new_remaining_demand = expected_event_count - Enum.count(events) + new_remaining_demand = remaining_demand - Enum.count(events) if remaining_demand > 0 and new_remaining_demand == 0 do log_catched_up(state) end - new_timer = - if new_remaining_demand > 0 do - schedule_next_event_timer(new_time, now) - end - - {:noreply, events, - %{state | time: new_time, remaining_demand: new_remaining_demand, timer: new_timer}} - end - - def handle_demand(demand, %State{timer: timer} = state) do - Process.cancel_timer(timer) - handle_demand(demand, %{state | timer: nil}) - end - - @impl GenStage - def handle_info(:ping, %State{remaining_demand: 0} = state) do - {:noreply, [], state} - end - - def handle_info(:ping, %State{time: time, remaining_demand: remaining_demand} = state) - when remaining_demand > 0 do - now = NaiveDateTime.utc_now() - new_time = NaiveDateTime.add(time, 1, :second) - - case NaiveDateTime.compare(new_time, now) do - :lt -> - timer = schedule_next_event_timer(new_time, now) - - {:noreply, [%Event{time: new_time, catch_up: false}], - %{state | time: new_time, timer: timer}} - - _ -> - warn_event_too_early() - - timer = schedule_next_event_timer(time, now) - - {:noreply, [], %{state | timer: timer}} - end - end - - defp schedule_next_event_timer(time, now) do - next_event_diff = - %{time | microsecond: {0, 0}} - |> NaiveDateTime.add(1, :second) - |> NaiveDateTime.diff(now, :millisecond) - - next_event_diff = - if next_event_diff < 0 do - 0 - else - next_event_diff - end - - Process.send_after(self(), :ping, next_event_diff) + {:noreply, events, %State{state | time: new_time, remaining_demand: new_remaining_demand}} end defp log_catched_up(%State{debug_logging: false}), do: :ok @@ -143,10 +105,4 @@ defmodule Quantum.ClockBroadcaster do Logger.debug(fn -> "[#{inspect(Node.self())}][#{__MODULE__}] Clock Producer catched up with past times and is now running in normal time" end) - - defp warn_event_too_early, - do: - Logger.warn(fn -> - "[#{inspect(Node.self())}][#{__MODULE__}] Clock Producer received a too early ping event, rescheduling" - end) end diff --git a/lib/quantum/clock_broadcaster/state.ex b/lib/quantum/clock_broadcaster/state.ex index c5dec9b..7ec1574 100644 --- a/lib/quantum/clock_broadcaster/state.ex +++ b/lib/quantum/clock_broadcaster/state.ex @@ -6,12 +6,10 @@ defmodule Quantum.ClockBroadcaster.State do @type t :: %__MODULE__{ debug_logging: boolean(), time: NaiveDateTime.t(), - # catch_up: boolean(), - remaining_demand: non_neg_integer, - timer: reference | nil + remaining_demand: non_neg_integer } - @enforce_keys [:debug_logging, :time, :remaining_demand, :timer] + @enforce_keys [:debug_logging, :time, :remaining_demand] defstruct @enforce_keys end