From 07dd9cf7b0d056f92cc46e4f6a2c96f70af9fa6f Mon Sep 17 00:00:00 2001 From: Pascal Grange Date: Thu, 27 Jul 2023 09:48:47 +0200 Subject: [PATCH] Reduce data in SnapshoRequested event The pruning of the transactions can be done in the events aggregation function instead of in the update function so we do not need to carry transactions and utxo in the event anymore --- hydra-node/src/Hydra/HeadLogic.hs | 114 ++++++++++++---------- hydra-node/src/Hydra/HeadLogic/Outcome.hs | 4 +- hydra-node/src/Hydra/Node.hs | 6 +- hydra-node/test/Hydra/HeadLogicSpec.hs | 2 +- 4 files changed, 67 insertions(+), 59 deletions(-) diff --git a/hydra-node/src/Hydra/HeadLogic.hs b/hydra-node/src/Hydra/HeadLogic.hs index eda7eddab6b..e4805f8d882 100644 --- a/hydra-node/src/Hydra/HeadLogic.hs +++ b/hydra-node/src/Hydra/HeadLogic.hs @@ -369,8 +369,12 @@ onOpenNetworkReqSn env ledger st otherParty sn requestedTxIds = -- Spec: require s = ŝ + 1 and leader(s) = j requireReqSn $ + -- Spec: wait s̅ = ŝ + -- Spec: wait s̅ = ŝ waitNoSnapshotInFlight $ + -- Spec: wait ∀h ∈ Treq : (h, ·) ∈ Tall + -- Spec: wait ∀h ∈ Treq : (h, ·) ∈ Tall waitResolvableTxs $ do -- Spec: Treq ← {Tall [h] | h ∈ Treq#} @@ -381,19 +385,13 @@ onOpenNetworkReqSn env ledger st otherParty sn requestedTxIds = let nextSnapshot = Snapshot (confSn + 1) u requestedTxIds -- Spec: σᵢ let snapshotSignature = sign signingKey nextSnapshot - (Effects [NetworkEffect $ AckSn snapshotSignature sn] `Combined`) $ do - -- Spec: for loop which updates T̂ and L̂ - let (localTxs', localUTxO') = pruneTransactions u - -- TODO: where to put this spec comment now? - -- Spec: Tall ← {tx | ∀tx ∈ Tall : tx ∉ Treq } - StateChanged - ( SnapshotAppliedToLocalUTxO - { snapshot = nextSnapshot - , txs = localTxs' - , utxo = localUTxO' - , requestedTxIds - } - ) + (Effects [NetworkEffect $ AckSn snapshotSignature sn] `Combined`) $ + do + -- TODO: unclear where to put these comments + -- Spec: for loop which updates T̂ and L̂ + -- TODO: where to put this spec comment now? + -- Spec: Tall ← {tx | ∀tx ∈ Tall : tx ∉ Treq } + StateChanged SnapshotRequested{snapshot = nextSnapshot, requestedTxIds} where requireReqSn continue | sn /= seenSn + 1 = @@ -425,18 +423,6 @@ onOpenNetworkReqSn env ledger st otherParty sn requestedTxIds = Error $ RequireFailed $ SnapshotDoesNotApply sn (txId tx) err Right u -> cont u - pruneTransactions utxo = do - foldr go ([], utxo) localTxs - where - go tx (txs, u) = - -- XXX: We prune transactions on any error, while only some of them are - -- actually expected. - -- For example: `OutsideValidityIntervalUTxO` ledger errors are expected - -- here when a tx becomes invalid. - case applyTransactions ledger currentSlot u [tx] of - Left (_, _) -> (txs, u) - Right u' -> (txs <> [tx], u') - confSn = case confirmedSnapshot of InitialSnapshot{} -> 0 ConfirmedSnapshot{snapshot = Snapshot{number}} -> number @@ -447,7 +433,7 @@ onOpenNetworkReqSn env ledger st otherParty sn requestedTxIds = InitialSnapshot{initialUTxO} -> initialUTxO ConfirmedSnapshot{snapshot = Snapshot{utxo}} -> utxo - CoordinatedHeadState{confirmedSnapshot, seenSnapshot, localTxs, allTxs} = coordinatedHeadState + CoordinatedHeadState{confirmedSnapshot, seenSnapshot, allTxs} = coordinatedHeadState OpenState{parameters, coordinatedHeadState, currentSlot} = st @@ -739,8 +725,8 @@ update env ledger st ev = case (st, ev) of -- * HeadState aggregate -- | Reflect 'StateChanged' events onto the 'HeadState' aggregate. -aggregate :: (IsChainState tx) => HeadState tx -> StateChanged tx -> HeadState tx -aggregate st = \case +aggregate :: (IsChainState tx) => Ledger tx -> HeadState tx -> StateChanged tx -> HeadState tx +aggregate ledger st = \case HeadInitialized{parameters = parameters@HeadParameters{parties}, headId, chainState} -> Initial InitialState @@ -765,6 +751,20 @@ aggregate st = \case newCommitted = Map.insert party committedUTxO committed remainingParties = Set.delete party pendingCommits _otherState -> st + TransactionReceived{tx} -> + case st of + Open os@OpenState{coordinatedHeadState} -> + Open + os + { coordinatedHeadState = + -- Spec: Tall ← ̂Tall ∪ { (hash(tx), tx) } + coordinatedHeadState + { allTxs = allTxs <> fromList [(txId tx, tx)] + } + } + where + CoordinatedHeadState{allTxs} = coordinatedHeadState + _otherState -> st TransactionAppliedToLocalUTxO{tx, utxo} -> case st of Open os@OpenState{coordinatedHeadState} -> @@ -796,21 +796,35 @@ aggregate st = \case where CoordinatedHeadState{seenSnapshot} = coordinatedHeadState _otherState -> st - SnapshotAppliedToLocalUTxO{snapshot, txs, utxo, requestedTxIds} -> + SnapshotRequested{snapshot, requestedTxIds} -> case st of - Open os@OpenState{coordinatedHeadState} -> + Open os@OpenState{coordinatedHeadState, currentSlot} -> Open os { coordinatedHeadState = coordinatedHeadState { seenSnapshot = SeenSnapshot snapshot mempty - , localTxs = txs - , localUTxO = utxo + , localTxs = localTxs' + , localUTxO = localUTxO' , allTxs = foldr Map.delete allTxs requestedTxIds } } where - CoordinatedHeadState{allTxs} = coordinatedHeadState + (localTxs', localUTxO') = pruneTransactions snapshotUtxo + Snapshot{utxo = snapshotUtxo} = snapshot + + CoordinatedHeadState{allTxs, localTxs} = coordinatedHeadState + pruneTransactions utxo = do + foldr go ([], utxo) localTxs + where + go tx (txs, u) = + -- XXX: We prune transactions on any error, while only some of them are + -- actually expected. + -- For example: `OutsideValidityIntervalUTxO` ledger errors are expected + -- here when a tx becomes invalid. + case applyTransactions ledger currentSlot u [tx] of + Left (_, _) -> (txs, u) + Right u' -> (txs <> [tx], u') _otherState -> st HeadAborted{chainState} -> Idle $ IdleState{chainState} HeadClosed{chainState, contestationDeadline} -> @@ -857,20 +871,6 @@ aggregate st = \case , currentSlot = chainStateSlot chainState } _otherState -> st - TransactionReceived{tx} -> - case st of - Open os@OpenState{coordinatedHeadState} -> - Open - os - { coordinatedHeadState = - -- Spec: Tall ← ̂Tall ∪ { (hash(tx), tx) } - coordinatedHeadState - { allTxs = allTxs <> fromList [(txId tx, tx)] - } - } - where - CoordinatedHeadState{allTxs} = coordinatedHeadState - _otherState -> st SnapshotHasBeenConfirmed{snapshot, signatures} -> case st of Open os@OpenState{coordinatedHeadState} -> @@ -910,9 +910,14 @@ aggregate st = \case Open ost@OpenState{} -> Open ost{currentSlot = chainSlot} _otherState -> st -aggregateState :: IsChainState tx => HeadState tx -> Outcome tx -> HeadState tx -aggregateState s outcome = - recoverState s $ collectStateChanged outcome +aggregateState :: + IsChainState tx => + Ledger tx -> + HeadState tx -> + Outcome tx -> + HeadState tx +aggregateState ledger s outcome = + recoverState ledger s $ collectStateChanged outcome where collectStateChanged = \case NoOutcome -> [] @@ -923,5 +928,10 @@ aggregateState s outcome = Combined l r -> collectStateChanged l <> collectStateChanged r -recoverState :: IsChainState tx => HeadState tx -> [StateChanged tx] -> HeadState tx -recoverState = foldl' aggregate +recoverState :: + (Foldable t, IsChainState tx) => + Ledger tx -> + HeadState tx -> + t (StateChanged tx) -> + HeadState tx +recoverState ledger = foldl' (aggregate ledger) diff --git a/hydra-node/src/Hydra/HeadLogic/Outcome.hs b/hydra-node/src/Hydra/HeadLogic/Outcome.hs index fa198eae012..9beba1e3815 100644 --- a/hydra-node/src/Hydra/HeadLogic/Outcome.hs +++ b/hydra-node/src/Hydra/HeadLogic/Outcome.hs @@ -58,10 +58,8 @@ data StateChanged tx , utxo :: UTxOType tx } | SnapshotRequestDecided {snapshotNumber :: SnapshotNumber} - | SnapshotAppliedToLocalUTxO + | SnapshotRequested { snapshot :: Snapshot tx - , txs :: [tx] - , utxo :: UTxOType tx , requestedTxIds :: [TxIdType tx] } | HeadAborted {chainState :: ChainStateType tx} diff --git a/hydra-node/src/Hydra/Node.hs b/hydra-node/src/Hydra/Node.hs index 29b1f4c7051..7c30f509d26 100644 --- a/hydra-node/src/Hydra/Node.hs +++ b/hydra-node/src/Hydra/Node.hs @@ -119,7 +119,7 @@ stepHydraNode tracer node = do StateChanged sc -> do -- TODO: We should not need to query the head state here s <- atomically queryHeadState - save $ aggregate s sc + save $ aggregate ledger s sc Effects _ -> pure () Combined l r -> handleOutcome e l >> handleOutcome e r @@ -136,7 +136,7 @@ stepHydraNode tracer node = do NodeState{queryHeadState} = nodeState - HydraNode{persistence, eq, env, nodeState} = node + HydraNode{persistence, eq, env, nodeState, ledger} = node -- | The time to wait between re-enqueuing a 'Wait' outcome from 'HeadLogic'. waitDelay :: DiffTime @@ -151,7 +151,7 @@ processNextEvent :: processNextEvent HydraNode{nodeState, ledger, env} e = modifyHeadState $ \s -> let outcome = Logic.update env ledger s e - in (outcome, aggregateState s outcome) + in (outcome, aggregateState ledger s outcome) where NodeState{modifyHeadState} = nodeState diff --git a/hydra-node/test/Hydra/HeadLogicSpec.hs b/hydra-node/test/Hydra/HeadLogicSpec.hs index ea47e3b66ec..5aea06e19a9 100644 --- a/hydra-node/test/Hydra/HeadLogicSpec.hs +++ b/hydra-node/test/Hydra/HeadLogicSpec.hs @@ -637,7 +637,7 @@ step :: step event = do StepState{headState, env, ledger} <- get let outcome = update env ledger headState event - let headState' = aggregateState headState $ outcome + let headState' = aggregateState ledger headState outcome put StepState{env, ledger, headState = headState'} pure outcome