Skip to content

Commit

Permalink
Reduce data in SnapshoRequested event
Browse files Browse the repository at this point in the history
The pruning of the transactions can be done in the
events aggregation function instead of in the update
function so we do not need to carry transactions and
utxo in the event anymore
  • Loading branch information
pgrange committed Jul 27, 2023
1 parent fd7d16e commit 07dd9cf
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 59 deletions.
114 changes: 62 additions & 52 deletions hydra-node/src/Hydra/HeadLogic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,12 @@ onOpenNetworkReqSn env ledger st otherParty sn requestedTxIds =

-- Spec: require s = ŝ + 1 and leader(s) = j
requireReqSn $
-- Spec: wait s̅ = ŝ

-- Spec: wait s̅ = ŝ
waitNoSnapshotInFlight $
-- Spec: wait ∀h ∈ Treq : (h, ·) ∈ Tall

-- Spec: wait ∀h ∈ Treq : (h, ·) ∈ Tall
waitResolvableTxs $ do
-- Spec: Treq ← {Tall [h] | h ∈ Treq#}
Expand All @@ -381,19 +385,13 @@ onOpenNetworkReqSn env ledger st otherParty sn requestedTxIds =
let nextSnapshot = Snapshot (confSn + 1) u requestedTxIds
-- Spec: σᵢ
let snapshotSignature = sign signingKey nextSnapshot
(Effects [NetworkEffect $ AckSn snapshotSignature sn] `Combined`) $ do
-- Spec: for loop which updates T̂ and L̂
let (localTxs', localUTxO') = pruneTransactions u
-- TODO: where to put this spec comment now?
-- Spec: Tall ← {tx | ∀tx ∈ Tall : tx ∉ Treq }
StateChanged
( SnapshotAppliedToLocalUTxO
{ snapshot = nextSnapshot
, txs = localTxs'
, utxo = localUTxO'
, requestedTxIds
}
)
(Effects [NetworkEffect $ AckSn snapshotSignature sn] `Combined`) $
do
-- TODO: unclear where to put these comments
-- Spec: for loop which updates T̂ and L̂
-- TODO: where to put this spec comment now?
-- Spec: Tall ← {tx | ∀tx ∈ Tall : tx ∉ Treq }
StateChanged SnapshotRequested{snapshot = nextSnapshot, requestedTxIds}
where
requireReqSn continue
| sn /= seenSn + 1 =
Expand Down Expand Up @@ -425,18 +423,6 @@ onOpenNetworkReqSn env ledger st otherParty sn requestedTxIds =
Error $ RequireFailed $ SnapshotDoesNotApply sn (txId tx) err
Right u -> cont u

pruneTransactions utxo = do
foldr go ([], utxo) localTxs
where
go tx (txs, u) =
-- XXX: We prune transactions on any error, while only some of them are
-- actually expected.
-- For example: `OutsideValidityIntervalUTxO` ledger errors are expected
-- here when a tx becomes invalid.
case applyTransactions ledger currentSlot u [tx] of
Left (_, _) -> (txs, u)
Right u' -> (txs <> [tx], u')

confSn = case confirmedSnapshot of
InitialSnapshot{} -> 0
ConfirmedSnapshot{snapshot = Snapshot{number}} -> number
Expand All @@ -447,7 +433,7 @@ onOpenNetworkReqSn env ledger st otherParty sn requestedTxIds =
InitialSnapshot{initialUTxO} -> initialUTxO
ConfirmedSnapshot{snapshot = Snapshot{utxo}} -> utxo

CoordinatedHeadState{confirmedSnapshot, seenSnapshot, localTxs, allTxs} = coordinatedHeadState
CoordinatedHeadState{confirmedSnapshot, seenSnapshot, allTxs} = coordinatedHeadState

OpenState{parameters, coordinatedHeadState, currentSlot} = st

Expand Down Expand Up @@ -739,8 +725,8 @@ update env ledger st ev = case (st, ev) of
-- * HeadState aggregate

-- | Reflect 'StateChanged' events onto the 'HeadState' aggregate.
aggregate :: (IsChainState tx) => HeadState tx -> StateChanged tx -> HeadState tx
aggregate st = \case
aggregate :: (IsChainState tx) => Ledger tx -> HeadState tx -> StateChanged tx -> HeadState tx
aggregate ledger st = \case
HeadInitialized{parameters = parameters@HeadParameters{parties}, headId, chainState} ->
Initial
InitialState
Expand All @@ -765,6 +751,20 @@ aggregate st = \case
newCommitted = Map.insert party committedUTxO committed
remainingParties = Set.delete party pendingCommits
_otherState -> st
TransactionReceived{tx} ->
case st of
Open os@OpenState{coordinatedHeadState} ->
Open
os
{ coordinatedHeadState =
-- Spec: Tall ← ̂Tall ∪ { (hash(tx), tx) }
coordinatedHeadState
{ allTxs = allTxs <> fromList [(txId tx, tx)]
}
}
where
CoordinatedHeadState{allTxs} = coordinatedHeadState
_otherState -> st
TransactionAppliedToLocalUTxO{tx, utxo} ->
case st of
Open os@OpenState{coordinatedHeadState} ->
Expand Down Expand Up @@ -796,21 +796,35 @@ aggregate st = \case
where
CoordinatedHeadState{seenSnapshot} = coordinatedHeadState
_otherState -> st
SnapshotAppliedToLocalUTxO{snapshot, txs, utxo, requestedTxIds} ->
SnapshotRequested{snapshot, requestedTxIds} ->
case st of
Open os@OpenState{coordinatedHeadState} ->
Open os@OpenState{coordinatedHeadState, currentSlot} ->
Open
os
{ coordinatedHeadState =
coordinatedHeadState
{ seenSnapshot = SeenSnapshot snapshot mempty
, localTxs = txs
, localUTxO = utxo
, localTxs = localTxs'
, localUTxO = localUTxO'
, allTxs = foldr Map.delete allTxs requestedTxIds
}
}
where
CoordinatedHeadState{allTxs} = coordinatedHeadState
(localTxs', localUTxO') = pruneTransactions snapshotUtxo
Snapshot{utxo = snapshotUtxo} = snapshot

CoordinatedHeadState{allTxs, localTxs} = coordinatedHeadState
pruneTransactions utxo = do
foldr go ([], utxo) localTxs
where
go tx (txs, u) =
-- XXX: We prune transactions on any error, while only some of them are
-- actually expected.
-- For example: `OutsideValidityIntervalUTxO` ledger errors are expected
-- here when a tx becomes invalid.
case applyTransactions ledger currentSlot u [tx] of
Left (_, _) -> (txs, u)
Right u' -> (txs <> [tx], u')
_otherState -> st
HeadAborted{chainState} -> Idle $ IdleState{chainState}
HeadClosed{chainState, contestationDeadline} ->
Expand Down Expand Up @@ -857,20 +871,6 @@ aggregate st = \case
, currentSlot = chainStateSlot chainState
}
_otherState -> st
TransactionReceived{tx} ->
case st of
Open os@OpenState{coordinatedHeadState} ->
Open
os
{ coordinatedHeadState =
-- Spec: Tall ← ̂Tall ∪ { (hash(tx), tx) }
coordinatedHeadState
{ allTxs = allTxs <> fromList [(txId tx, tx)]
}
}
where
CoordinatedHeadState{allTxs} = coordinatedHeadState
_otherState -> st
SnapshotHasBeenConfirmed{snapshot, signatures} ->
case st of
Open os@OpenState{coordinatedHeadState} ->
Expand Down Expand Up @@ -910,9 +910,14 @@ aggregate st = \case
Open ost@OpenState{} -> Open ost{currentSlot = chainSlot}
_otherState -> st

aggregateState :: IsChainState tx => HeadState tx -> Outcome tx -> HeadState tx
aggregateState s outcome =
recoverState s $ collectStateChanged outcome
aggregateState ::
IsChainState tx =>
Ledger tx ->
HeadState tx ->
Outcome tx ->
HeadState tx
aggregateState ledger s outcome =
recoverState ledger s $ collectStateChanged outcome
where
collectStateChanged = \case
NoOutcome -> []
Expand All @@ -923,5 +928,10 @@ aggregateState s outcome =
Combined l r ->
collectStateChanged l <> collectStateChanged r

recoverState :: IsChainState tx => HeadState tx -> [StateChanged tx] -> HeadState tx
recoverState = foldl' aggregate
recoverState ::
(Foldable t, IsChainState tx) =>
Ledger tx ->
HeadState tx ->
t (StateChanged tx) ->
HeadState tx
recoverState ledger = foldl' (aggregate ledger)
4 changes: 1 addition & 3 deletions hydra-node/src/Hydra/HeadLogic/Outcome.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@ data StateChanged tx
, utxo :: UTxOType tx
}
| SnapshotRequestDecided {snapshotNumber :: SnapshotNumber}
| SnapshotAppliedToLocalUTxO
| SnapshotRequested
{ snapshot :: Snapshot tx
, txs :: [tx]
, utxo :: UTxOType tx
, requestedTxIds :: [TxIdType tx]
}
| HeadAborted {chainState :: ChainStateType tx}
Expand Down
6 changes: 3 additions & 3 deletions hydra-node/src/Hydra/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ stepHydraNode tracer node = do
StateChanged sc -> do
-- TODO: We should not need to query the head state here
s <- atomically queryHeadState
save $ aggregate s sc
save $ aggregate ledger s sc
Effects _ -> pure ()
Combined l r -> handleOutcome e l >> handleOutcome e r

Expand All @@ -136,7 +136,7 @@ stepHydraNode tracer node = do

NodeState{queryHeadState} = nodeState

HydraNode{persistence, eq, env, nodeState} = node
HydraNode{persistence, eq, env, nodeState, ledger} = node

-- | The time to wait between re-enqueuing a 'Wait' outcome from 'HeadLogic'.
waitDelay :: DiffTime
Expand All @@ -151,7 +151,7 @@ processNextEvent ::
processNextEvent HydraNode{nodeState, ledger, env} e =
modifyHeadState $ \s ->
let outcome = Logic.update env ledger s e
in (outcome, aggregateState s outcome)
in (outcome, aggregateState ledger s outcome)
where
NodeState{modifyHeadState} = nodeState

Expand Down
2 changes: 1 addition & 1 deletion hydra-node/test/Hydra/HeadLogicSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ step ::
step event = do
StepState{headState, env, ledger} <- get
let outcome = update env ledger headState event
let headState' = aggregateState headState $ outcome
let headState' = aggregateState ledger headState outcome
put StepState{env, ledger, headState = headState'}
pure outcome

Expand Down

0 comments on commit 07dd9cf

Please sign in to comment.