diff --git a/chainntnfs/txnotifier.go b/chainntnfs/txnotifier.go index 71b9c929df..12366abf95 100644 --- a/chainntnfs/txnotifier.go +++ b/chainntnfs/txnotifier.go @@ -1757,10 +1757,6 @@ func (n *TxNotifier) NotifyHeight(height uint32) error { for ntfn := range n.ntfnsByConfirmHeight[height] { confSet := n.confNotifications[ntfn.ConfRequest] - Log.Debugf("Dispatching %v confirmation notification for "+ - "conf_id=%v, %v", ntfn.NumConfirmations, ntfn.ConfID, - ntfn.ConfRequest) - // The default notification we assigned above includes the // block along with the rest of the details. However not all // clients want the block, so we make a copy here w/o the block @@ -1770,6 +1766,20 @@ func (n *TxNotifier) NotifyHeight(height uint32) error { confDetails.Block = nil } + // If the `confDetails` has already been sent before, we'll + // skip it and continue processing the next one. + if ntfn.dispatched { + Log.Debugf("Skipped dispatched conf details for "+ + "request %v conf_id=%v", ntfn.ConfRequest, + ntfn.ConfID) + + continue + } + + Log.Debugf("Dispatching %v confirmation notification for "+ + "conf_id=%v, %v", ntfn.NumConfirmations, ntfn.ConfID, + ntfn.ConfRequest) + select { case ntfn.Event.Confirmed <- &confDetails: ntfn.dispatched = true diff --git a/chanrestore.go b/chanrestore.go index 5b221c105a..a041f571a8 100644 --- a/chanrestore.go +++ b/chanrestore.go @@ -8,6 +8,7 @@ import ( "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/chanbackup" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" @@ -286,6 +287,9 @@ func (c *chanDBRestorer) RestoreChansFromSingles(backups ...chanbackup.Single) e ltndLog.Infof("Informing chain watchers of new restored channels") + // Create a slice of channel points. + chanPoints := make([]wire.OutPoint, 0, len(channelShells)) + // Finally, we'll need to inform the chain arbitrator of these new // channels so we'll properly watch for their ultimate closure on chain // and sweep them via the DLP. @@ -294,8 +298,15 @@ func (c *chanDBRestorer) RestoreChansFromSingles(backups ...chanbackup.Single) e if err != nil { return err } + + chanPoints = append( + chanPoints, restoredChannel.Chan.FundingOutpoint, + ) } + // With all the channels restored, we'll now re-send the blockbeat. + c.chainArb.RedispatchBlockbeat(chanPoints) + return nil } @@ -314,7 +325,7 @@ func (s *server) ConnectPeer(nodePub *btcec.PublicKey, addrs []net.Addr) error { // to ensure the new connection is created after this new link/channel // is known. if err := s.DisconnectPeer(nodePub); err != nil { - ltndLog.Infof("Peer(%v) is already connected, proceeding "+ + ltndLog.Infof("Peer(%x) is already connected, proceeding "+ "with chan restore", nodePub.SerializeCompressed()) } diff --git a/contractcourt/briefcase.go b/contractcourt/briefcase.go index 7d199c5c28..3e58147c61 100644 --- a/contractcourt/briefcase.go +++ b/contractcourt/briefcase.go @@ -249,6 +249,15 @@ func (a ArbitratorState) String() string { } } +// IsContractClosed returns a bool to indicate whether the closing/breaching tx +// has been confirmed onchain. If the state is StateContractClosed, +// StateWaitingFullResolution, or StateFullyResolved, it means the contract has +// been closed and all related contracts have been launched. 
+func (a ArbitratorState) IsContractClosed() bool { + return a == StateContractClosed || a == StateWaitingFullResolution || + a == StateFullyResolved +} + // resolverType is an enum that enumerates the various types of resolvers. When // writing resolvers to disk, we prepend this to the raw bytes stored. This // allows us to properly decode the resolver into the proper type. diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 011b5225cd..83ae56c7a6 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -578,9 +578,6 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error { // Set the current beat. c.beat = beat - log.Infof("ChainArbitrator starting at height %d with budget=[%v]", - &c.cfg.Budget, c.beat.Height()) - // First, we'll fetch all the channels that are still open, in order to // collect them within our set of active contracts. if err := c.loadOpenChannels(); err != nil { @@ -690,6 +687,11 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error { c.dispatchBlocks() }() + log.Infof("ChainArbitrator starting at height %d with %d chain "+ + "watchers, %d channel arbitrators, and budget config=[%v]", + c.beat.Height(), len(c.activeWatchers), len(c.activeChannels), + &c.cfg.Budget) + // TODO(roasbeef): eventually move all breach watching here return nil @@ -727,19 +729,34 @@ func (c *ChainArbitrator) handleBlockbeat(beat chainio.Blockbeat) { // Create a slice to record active channel arbitrator. channels := make([]chainio.Consumer, 0, len(c.activeChannels)) + watchers := make([]chainio.Consumer, 0, len(c.activeWatchers)) // Copy the active channels to the slice. for _, channel := range c.activeChannels { channels = append(channels, channel) } + for _, watcher := range c.activeWatchers { + watchers = append(watchers, watcher) + } + c.Unlock() + // Iterate all the copied watchers and send the blockbeat to them. + err := chainio.DispatchConcurrent(beat, watchers) + if err != nil { + log.Errorf("Notify blockbeat for chainWatcher failed: %v", err) + } + // Iterate all the copied channels and send the blockbeat to them. // // NOTE: This method will timeout if the processing of blocks of the // subsystems is too long (60s). - err := chainio.DispatchConcurrent(beat, channels) + err = chainio.DispatchConcurrent(beat, channels) + if err != nil { + log.Errorf("Notify blockbeat for ChannelArbitrator failed: %v", + err) + } // Notify the chain arbitrator has processed the block. c.NotifyBlockProcessed(beat, err) @@ -1046,8 +1063,8 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error chanPoint := newChan.FundingOutpoint - log.Infof("Creating new ChannelArbitrator for ChannelPoint(%v)", - chanPoint) + log.Infof("Creating new chainWatcher and ChannelArbitrator for "+ + "ChannelPoint(%v)", chanPoint) // If we're already watching this channel, then we'll ignore this // request. @@ -1356,3 +1373,43 @@ func (c *ChainArbitrator) loadPendingCloseChannels() error { return nil } + +// RedispatchBlockbeat resends the current blockbeat to the channels specified +// by the chanPoints. It is used when a channel is added to the chain +// arbitrator after it has been started, e.g., during the channel restore +// process. +func (c *ChainArbitrator) RedispatchBlockbeat(chanPoints []wire.OutPoint) { + // Get the current blockbeat. + beat := c.beat + + // Prepare two sets of consumers. 
+ channels := make([]chainio.Consumer, 0, len(chanPoints)) + watchers := make([]chainio.Consumer, 0, len(chanPoints)) + + // Read the active channels in a lock. + c.Lock() + for _, op := range chanPoints { + if channel, ok := c.activeChannels[op]; ok { + channels = append(channels, channel) + } + + if watcher, ok := c.activeWatchers[op]; ok { + watchers = append(watchers, watcher) + } + } + c.Unlock() + + // Iterate all the copied watchers and send the blockbeat to them. + err := chainio.DispatchConcurrent(beat, watchers) + if err != nil { + log.Errorf("Notify blockbeat for chainWatcher failed: %v", err) + } + + // Iterate all the copied channels and send the blockbeat to them. + err = chainio.DispatchConcurrent(beat, channels) + if err != nil { + // Shutdown lnd if there's an error processing the block. + log.Errorf("Notify blockbeat for ChannelArbitrator failed: %v", + err) + } +} diff --git a/contractcourt/chain_watcher.go b/contractcourt/chain_watcher.go index e29f21e7f4..fef70b945a 100644 --- a/contractcourt/chain_watcher.go +++ b/contractcourt/chain_watcher.go @@ -16,6 +16,7 @@ import ( "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/chainio" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/fn/v2" @@ -210,6 +211,10 @@ type chainWatcher struct { started int32 // To be used atomically. stopped int32 // To be used atomically. + // Embed the blockbeat consumer struct to get access to the method + // `NotifyBlockProcessed` and the `BlockbeatChan`. + chainio.BeatConsumer + quit chan struct{} wg sync.WaitGroup @@ -219,13 +224,6 @@ type chainWatcher struct { // the current state number on the commitment transactions. stateHintObfuscator [lnwallet.StateHintSize]byte - // fundingPkScript is the pkScript of the funding output. - fundingPkScript []byte - - // heightHint is the height hint used to checkpoint scans on chain for - // conf/spend events. - heightHint uint32 - // All the fields below are protected by this mutex. sync.Mutex @@ -236,6 +234,22 @@ type chainWatcher struct { // clientSubscriptions is a map that keeps track of all the active // client subscriptions for events related to this channel. clientSubscriptions map[uint64]*ChainEventSubscription + + // fundingSpendNtfn is the spending notification subscription for the + // funding outpoint. + fundingSpendNtfn *chainntnfs.SpendEvent + + // fundingConfirmedNtfn is the confirmation notification subscription + // for the funding outpoint. This is only created if the channel is + // both taproot and pending confirmation. + // + // For taproot pkscripts, `RegisterSpendNtfn` will only notify on the + // outpoint being spent and not the outpoint+pkscript due to + // `ComputePkScript` being unable to compute the pkscript if a key + // spend is used. We need to add a `RegisterConfirmationsNtfn` here to + // ensure that the outpoint+pkscript pair is confirmed before calling + // `RegisterSpendNtfn`. + fundingConfirmedNtfn *chainntnfs.ConfirmationEvent } // newChainWatcher returns a new instance of a chainWatcher for a channel given @@ -260,12 +274,61 @@ func newChainWatcher(cfg chainWatcherConfig) (*chainWatcher, error) { ) } - return &chainWatcher{ + // Get the witness script for the funding output. + fundingPkScript, err := deriveFundingPkScript(chanState) + if err != nil { + return nil, err + } + + // Get the channel opening block height. 
+ heightHint := deriveHeightHint(chanState) + + // We'll register for a notification to be dispatched if the funding + // output is spent. + spendNtfn, err := cfg.notifier.RegisterSpendNtfn( + &chanState.FundingOutpoint, fundingPkScript, heightHint, + ) + if err != nil { + return nil, err + } + + c := &chainWatcher{ cfg: cfg, stateHintObfuscator: stateHint, quit: make(chan struct{}), clientSubscriptions: make(map[uint64]*ChainEventSubscription), - }, nil + fundingSpendNtfn: spendNtfn, + } + + // If this is a pending taproot channel, we need to register for a + // confirmation notification of the funding tx. Check the docs in + // `fundingConfirmedNtfn` for details. + if c.cfg.chanState.IsPending && c.cfg.chanState.ChanType.IsTaproot() { + confNtfn, err := cfg.notifier.RegisterConfirmationsNtfn( + &chanState.FundingOutpoint.Hash, fundingPkScript, 1, + heightHint, + ) + if err != nil { + return nil, err + } + + c.fundingConfirmedNtfn = confNtfn + } + + // Mount the block consumer. + c.BeatConsumer = chainio.NewBeatConsumer(c.quit, c.Name()) + + return c, nil +} + +// Compile-time check for the chainio.Consumer interface. +var _ chainio.Consumer = (*chainWatcher)(nil) + +// Name returns the name of the watcher. +// +// NOTE: part of the `chainio.Consumer` interface. +func (c *chainWatcher) Name() string { + return fmt.Sprintf("ChainWatcher(%v)", c.cfg.chanState.FundingOutpoint) } // Start starts all goroutines that the chainWatcher needs to perform its @@ -275,75 +338,11 @@ func (c *chainWatcher) Start() error { return nil } - chanState := c.cfg.chanState log.Debugf("Starting chain watcher for ChannelPoint(%v)", - chanState.FundingOutpoint) - - // First, we'll register for a notification to be dispatched if the - // funding output is spent. - fundingOut := &chanState.FundingOutpoint - - // As a height hint, we'll try to use the opening height, but if the - // channel isn't yet open, then we'll use the height it was broadcast - // at. This may be an unconfirmed zero-conf channel. - c.heightHint = c.cfg.chanState.ShortChanID().BlockHeight - if c.heightHint == 0 { - c.heightHint = chanState.BroadcastHeight() - } - - // Since no zero-conf state is stored in a channel backup, the below - // logic will not be triggered for restored, zero-conf channels. Set - // the height hint for zero-conf channels. - if chanState.IsZeroConf() { - if chanState.ZeroConfConfirmed() { - // If the zero-conf channel is confirmed, we'll use the - // confirmed SCID's block height. - c.heightHint = chanState.ZeroConfRealScid().BlockHeight - } else { - // The zero-conf channel is unconfirmed. We'll need to - // use the FundingBroadcastHeight. 
- c.heightHint = chanState.BroadcastHeight() - } - } - - localKey := chanState.LocalChanCfg.MultiSigKey.PubKey - remoteKey := chanState.RemoteChanCfg.MultiSigKey.PubKey - - var ( - err error - ) - if chanState.ChanType.IsTaproot() { - c.fundingPkScript, _, err = input.GenTaprootFundingScript( - localKey, remoteKey, 0, chanState.TapscriptRoot, - ) - if err != nil { - return err - } - } else { - multiSigScript, err := input.GenMultiSigScript( - localKey.SerializeCompressed(), - remoteKey.SerializeCompressed(), - ) - if err != nil { - return err - } - c.fundingPkScript, err = input.WitnessScriptHash(multiSigScript) - if err != nil { - return err - } - } - - spendNtfn, err := c.cfg.notifier.RegisterSpendNtfn( - fundingOut, c.fundingPkScript, c.heightHint, - ) - if err != nil { - return err - } + c.cfg.chanState.FundingOutpoint) - // With the spend notification obtained, we'll now dispatch the - // closeObserver which will properly react to any changes. c.wg.Add(1) - go c.closeObserver(spendNtfn) + go c.closeObserver() return nil } @@ -555,7 +554,7 @@ func newChainSet(chanState *channeldb.OpenChannel) (*chainSet, error) { localCommit, remoteCommit, err := chanState.LatestCommitments() if err != nil { return nil, fmt.Errorf("unable to fetch channel state for "+ - "chan_point=%v", chanState.FundingOutpoint) + "chan_point=%v: %v", chanState.FundingOutpoint, err) } log.Tracef("ChannelPoint(%v): local_commit_type=%v, local_commit=%v", @@ -622,167 +621,49 @@ func newChainSet(chanState *channeldb.OpenChannel) (*chainSet, error) { // close observer will assembled the proper materials required to claim the // funds of the channel on-chain (if required), then dispatch these as // notifications to all subscribers. -func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) { +func (c *chainWatcher) closeObserver() { defer c.wg.Done() + defer c.fundingSpendNtfn.Cancel() log.Infof("Close observer for ChannelPoint(%v) active", c.cfg.chanState.FundingOutpoint) - // If this is a taproot channel, before we proceed, we want to ensure - // that the expected funding output has confirmed on chain. - if c.cfg.chanState.ChanType.IsTaproot() { - fundingPoint := c.cfg.chanState.FundingOutpoint - - confNtfn, err := c.cfg.notifier.RegisterConfirmationsNtfn( - &fundingPoint.Hash, c.fundingPkScript, 1, c.heightHint, - ) - if err != nil { - log.Warnf("unable to register for conf: %v", err) - } - - log.Infof("Waiting for taproot ChannelPoint(%v) to confirm...", - c.cfg.chanState.FundingOutpoint) - + for { select { - case _, ok := <-confNtfn.Confirmed: + // A new block is received, we will check whether this block + // contains a spending tx that we are interested in. + case beat := <-c.BlockbeatChan: + log.Debugf("ChainWatcher(%v) received blockbeat %v", + c.cfg.chanState.FundingOutpoint, beat.Height()) + + // Process the block. + c.handleBlockbeat(beat) + + // If the funding outpoint is spent, we now go ahead and handle + // it. Note that we cannot rely solely on the `block` event + // above to trigger a close event, as deep down, the receiving + // of block notifications and the receiving of spending + // notifications are done in two different goroutines, so the + // expected order: [receive block -> receive spend] is not + // guaranteed . + case spend, ok := <-c.fundingSpendNtfn.Spend: // If the channel was closed, then this means that the // notifier exited, so we will as well. if !ok { return } - case <-c.quit: - return - } - } - - select { - // We've detected a spend of the channel onchain! 
Depending on the type - // of spend, we'll act accordingly, so we'll examine the spending - // transaction to determine what we should do. - // - // TODO(Roasbeef): need to be able to ensure this only triggers - // on confirmation, to ensure if multiple txns are broadcast, we - // act on the one that's timestamped - case commitSpend, ok := <-spendNtfn.Spend: - // If the channel was closed, then this means that the notifier - // exited, so we will as well. - if !ok { - return - } - - // Otherwise, the remote party might have broadcast a prior - // revoked state...!!! - commitTxBroadcast := commitSpend.SpendingTx - - // First, we'll construct the chainset which includes all the - // data we need to dispatch an event to our subscribers about - // this possible channel close event. - chainSet, err := newChainSet(c.cfg.chanState) - if err != nil { - log.Errorf("unable to create commit set: %v", err) - return - } - // Decode the state hint encoded within the commitment - // transaction to determine if this is a revoked state or not. - obfuscator := c.stateHintObfuscator - broadcastStateNum := c.cfg.extractStateNumHint( - commitTxBroadcast, obfuscator, - ) - - // We'll go on to check whether it could be our own commitment - // that was published and know is confirmed. - ok, err = c.handleKnownLocalState( - commitSpend, broadcastStateNum, chainSet, - ) - if err != nil { - log.Errorf("Unable to handle known local state: %v", - err) - return - } - - if ok { - return - } - - // Now that we know it is neither a non-cooperative closure nor - // a local close with the latest state, we check if it is the - // remote that closed with any prior or current state. - ok, err = c.handleKnownRemoteState( - commitSpend, broadcastStateNum, chainSet, - ) - if err != nil { - log.Errorf("Unable to handle known remote state: %v", - err) - return - } - - if ok { - return - } - - // Next, we'll check to see if this is a cooperative channel - // closure or not. This is characterized by having an input - // sequence number that's finalized. This won't happen with - // regular commitment transactions due to the state hint - // encoding scheme. - switch commitTxBroadcast.TxIn[0].Sequence { - case wire.MaxTxInSequenceNum: - fallthrough - case mempool.MaxRBFSequence: - // TODO(roasbeef): rare but possible, need itest case - // for - err := c.dispatchCooperativeClose(commitSpend) + err := c.handleCommitSpend(spend) if err != nil { - log.Errorf("unable to handle co op close: %v", err) + log.Errorf("Failed to handle commit spend: %v", + err) } - return - } - - log.Warnf("Unknown commitment broadcast for "+ - "ChannelPoint(%v) ", c.cfg.chanState.FundingOutpoint) - // We'll try to recover as best as possible from losing state. - // We first check if this was a local unknown state. This could - // happen if we force close, then lose state or attempt - // recovery before the commitment confirms. - ok, err = c.handleUnknownLocalState( - commitSpend, broadcastStateNum, chainSet, - ) - if err != nil { - log.Errorf("Unable to handle known local state: %v", - err) - return - } - - if ok { - return - } - - // Since it was neither a known remote state, nor a local state - // that was published, it most likely mean we lost state and - // the remote node closed. In this case we must start the DLP - // protocol in hope of getting our money back. 
- ok, err = c.handleUnknownRemoteState( - commitSpend, broadcastStateNum, chainSet, - ) - if err != nil { - log.Errorf("Unable to handle unknown remote state: %v", - err) - return - } - - if ok { + // The chainWatcher has been signalled to exit, so we'll do so + // now. + case <-c.quit: return } - - log.Warnf("Unable to handle spending tx %v of channel point %v", - commitTxBroadcast.TxHash(), c.cfg.chanState.FundingOutpoint) - return - - // The chainWatcher has been signalled to exit, so we'll do so now. - case <-c.quit: - return } } @@ -1412,3 +1293,263 @@ func (c *chainWatcher) waitForCommitmentPoint() *btcec.PublicKey { } } } + +// deriveFundingPkScript derives the script used in the funding output. +func deriveFundingPkScript(chanState *channeldb.OpenChannel) ([]byte, error) { + localKey := chanState.LocalChanCfg.MultiSigKey.PubKey + remoteKey := chanState.RemoteChanCfg.MultiSigKey.PubKey + + var ( + err error + fundingPkScript []byte + ) + + if chanState.ChanType.IsTaproot() { + fundingPkScript, _, err = input.GenTaprootFundingScript( + localKey, remoteKey, 0, chanState.TapscriptRoot, + ) + if err != nil { + return nil, err + } + } else { + multiSigScript, err := input.GenMultiSigScript( + localKey.SerializeCompressed(), + remoteKey.SerializeCompressed(), + ) + if err != nil { + return nil, err + } + fundingPkScript, err = input.WitnessScriptHash(multiSigScript) + if err != nil { + return nil, err + } + } + + return fundingPkScript, nil +} + +// deriveHeightHint derives the block height for the channel opening. +func deriveHeightHint(chanState *channeldb.OpenChannel) uint32 { + // As a height hint, we'll try to use the opening height, but if the + // channel isn't yet open, then we'll use the height it was broadcast + // at. This may be an unconfirmed zero-conf channel. + heightHint := chanState.ShortChanID().BlockHeight + if heightHint == 0 { + heightHint = chanState.BroadcastHeight() + } + + // Since no zero-conf state is stored in a channel backup, the below + // logic will not be triggered for restored, zero-conf channels. Set + // the height hint for zero-conf channels. + if chanState.IsZeroConf() { + if chanState.ZeroConfConfirmed() { + // If the zero-conf channel is confirmed, we'll use the + // confirmed SCID's block height. + heightHint = chanState.ZeroConfRealScid().BlockHeight + } else { + // The zero-conf channel is unconfirmed. We'll need to + // use the FundingBroadcastHeight. + heightHint = chanState.BroadcastHeight() + } + } + + return heightHint +} + +// handleCommitSpend takes a spending tx of the funding output and handles the +// channel close based on the closure type. +func (c *chainWatcher) handleCommitSpend( + commitSpend *chainntnfs.SpendDetail) error { + + commitTxBroadcast := commitSpend.SpendingTx + + // First, we'll construct the chainset which includes all the data we + // need to dispatch an event to our subscribers about this possible + // channel close event. + chainSet, err := newChainSet(c.cfg.chanState) + if err != nil { + return fmt.Errorf("create commit set: %w", err) + } + + // Decode the state hint encoded within the commitment transaction to + // determine if this is a revoked state or not. + obfuscator := c.stateHintObfuscator + broadcastStateNum := c.cfg.extractStateNumHint( + commitTxBroadcast, obfuscator, + ) + + // We'll go on to check whether it could be our own commitment that was + // published and know is confirmed. 
+ ok, err := c.handleKnownLocalState( + commitSpend, broadcastStateNum, chainSet, + ) + if err != nil { + return fmt.Errorf("handle known local state: %w", err) + } + if ok { + return nil + } + + // Now that we know it is neither a non-cooperative closure nor a local + // close with the latest state, we check if it is the remote that + // closed with any prior or current state. + ok, err = c.handleKnownRemoteState( + commitSpend, broadcastStateNum, chainSet, + ) + if err != nil { + return fmt.Errorf("handle known remote state: %w", err) + } + if ok { + return nil + } + + // Next, we'll check to see if this is a cooperative channel closure or + // not. This is characterized by having an input sequence number that's + // finalized. This won't happen with regular commitment transactions + // due to the state hint encoding scheme. + switch commitTxBroadcast.TxIn[0].Sequence { + case wire.MaxTxInSequenceNum: + fallthrough + case mempool.MaxRBFSequence: + // TODO(roasbeef): rare but possible, need itest case for + err := c.dispatchCooperativeClose(commitSpend) + if err != nil { + return fmt.Errorf("handle coop close: %w", err) + } + + return nil + } + + log.Warnf("Unknown commitment broadcast for ChannelPoint(%v) ", + c.cfg.chanState.FundingOutpoint) + + // We'll try to recover as best as possible from losing state. We + // first check if this was a local unknown state. This could happen if + // we force close, then lose state or attempt recovery before the + // commitment confirms. + ok, err = c.handleUnknownLocalState( + commitSpend, broadcastStateNum, chainSet, + ) + if err != nil { + return fmt.Errorf("handle known local state: %w", err) + } + if ok { + return nil + } + + // Since it was neither a known remote state, nor a local state that + // was published, it most likely mean we lost state and the remote node + // closed. In this case we must start the DLP protocol in hope of + // getting our money back. + ok, err = c.handleUnknownRemoteState( + commitSpend, broadcastStateNum, chainSet, + ) + if err != nil { + return fmt.Errorf("handle unknown remote state: %w", err) + } + if ok { + return nil + } + + log.Errorf("Unable to handle spending tx %v of channel point %v", + commitTxBroadcast.TxHash(), c.cfg.chanState.FundingOutpoint) + + return nil +} + +// checkFundingSpend performs a non-blocking read on the spendNtfn channel to +// check whether there's a commit spend already. Returns the spend details if +// found. +func (c *chainWatcher) checkFundingSpend() *chainntnfs.SpendDetail { + select { + // We've detected a spend of the channel onchain! Depending on the type + // of spend, we'll act accordingly, so we'll examine the spending + // transaction to determine what we should do. + // + // TODO(Roasbeef): need to be able to ensure this only triggers + // on confirmation, to ensure if multiple txns are broadcast, we + // act on the one that's timestamped + case spend, ok := <-c.fundingSpendNtfn.Spend: + // If the channel was closed, then this means that the notifier + // exited, so we will as well. + if !ok { + return nil + } + + log.Debugf("Found spend details for funding output: %v", + spend.SpenderTxHash) + + return spend + + default: + } + + return nil +} + +// chanPointConfirmed checks whether the given channel point has confirmed. +// This is used to ensure that the funding output has confirmed on chain before +// we proceed with the rest of the close observer logic for taproot channels. +// Check the docs in `fundingConfirmedNtfn` for details. 
+func (c *chainWatcher) chanPointConfirmed() bool { + op := c.cfg.chanState.FundingOutpoint + + select { + case _, ok := <-c.fundingConfirmedNtfn.Confirmed: + // If the channel was closed, then this means that the notifier + // exited, so we will as well. + if !ok { + return false + } + + log.Debugf("Taproot ChannelPoint(%v) confirmed", op) + + // The channel point has confirmed on chain. We now cancel the + // subscription. + c.fundingConfirmedNtfn.Cancel() + + return true + + default: + log.Infof("Taproot ChannelPoint(%v) not confirmed yet", op) + + return false + } +} + +// handleBlockbeat takes a blockbeat and queries for a spending tx for the +// funding output. If the spending tx is found, it will be handled based on the +// closure type. +func (c *chainWatcher) handleBlockbeat(beat chainio.Blockbeat) { + // Notify the chain watcher has processed the block. + defer c.NotifyBlockProcessed(beat, nil) + + // If we have a fundingConfirmedNtfn, it means this is a taproot + // channel that is pending, before we proceed, we want to ensure that + // the expected funding output has confirmed on chain. Check the docs + // in `fundingConfirmedNtfn` for details. + if c.fundingConfirmedNtfn != nil { + // If the funding output hasn't confirmed in this block, we + // will check it again in the next block. + if !c.chanPointConfirmed() { + return + } + } + + // Perform a non-blocking read to check whether the funding output was + // spent. + spend := c.checkFundingSpend() + if spend == nil { + log.Tracef("No spend found for ChannelPoint(%v) in block %v", + c.cfg.chanState.FundingOutpoint, beat.Height()) + + return + } + + // The funding output was spent, we now handle it by sending a close + // event to the channel arbitrator. + err := c.handleCommitSpend(spend) + if err != nil { + log.Errorf("Failed to handle commit spend: %v", err) + } +} diff --git a/contractcourt/chain_watcher_test.go b/contractcourt/chain_watcher_test.go index baea8d8738..0b3e23e714 100644 --- a/contractcourt/chain_watcher_test.go +++ b/contractcourt/chain_watcher_test.go @@ -9,10 +9,11 @@ import ( "time" "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/chainio" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/input" - "github.com/lightningnetwork/lnd/lntest/mock" + lnmock "github.com/lightningnetwork/lnd/lntest/mock" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/stretchr/testify/require" @@ -33,8 +34,8 @@ func TestChainWatcherRemoteUnilateralClose(t *testing.T) { // With the channels created, we'll now create a chain watcher instance // which will be watching for any closes of Alice's channel. - aliceNotifier := &mock.ChainNotifier{ - SpendChan: make(chan *chainntnfs.SpendDetail), + aliceNotifier := &lnmock.ChainNotifier{ + SpendChan: make(chan *chainntnfs.SpendDetail, 1), EpochChan: make(chan *chainntnfs.BlockEpoch), ConfChan: make(chan *chainntnfs.TxConfirmation), } @@ -49,6 +50,20 @@ func TestChainWatcherRemoteUnilateralClose(t *testing.T) { require.NoError(t, err, "unable to start chain watcher") defer aliceChainWatcher.Stop() + // Create a mock blockbeat and send it to Alice's BlockbeatChan. + mockBeat := &chainio.MockBlockbeat{} + + // Mock the logger. We don't care how many times it's called as it's + // not critical. + mockBeat.On("logger").Return(log) + + // Mock a fake block height - this is called based on the debuglevel. 
+ mockBeat.On("Height").Return(int32(1)).Maybe() + + // Mock `NotifyBlockProcessed` to be call once. + mockBeat.On("NotifyBlockProcessed", + nil, aliceChainWatcher.quit).Return().Once() + // We'll request a new channel event subscription from Alice's chain // watcher. chanEvents := aliceChainWatcher.SubscribeChannelEvents() @@ -61,7 +76,19 @@ func TestChainWatcherRemoteUnilateralClose(t *testing.T) { SpenderTxHash: &bobTxHash, SpendingTx: bobCommit, } - aliceNotifier.SpendChan <- bobSpend + + // Here we mock the behavior of a restart. + select { + case aliceNotifier.SpendChan <- bobSpend: + case <-time.After(1 * time.Second): + t.Fatalf("unable to send spend details") + } + + select { + case aliceChainWatcher.BlockbeatChan <- mockBeat: + case <-time.After(time.Second * 1): + t.Fatalf("unable to send blockbeat") + } // We should get a new spend event over the remote unilateral close // event channel. @@ -117,7 +144,7 @@ func TestChainWatcherRemoteUnilateralClosePendingCommit(t *testing.T) { // With the channels created, we'll now create a chain watcher instance // which will be watching for any closes of Alice's channel. - aliceNotifier := &mock.ChainNotifier{ + aliceNotifier := &lnmock.ChainNotifier{ SpendChan: make(chan *chainntnfs.SpendDetail), EpochChan: make(chan *chainntnfs.BlockEpoch), ConfChan: make(chan *chainntnfs.TxConfirmation), @@ -165,7 +192,32 @@ func TestChainWatcherRemoteUnilateralClosePendingCommit(t *testing.T) { SpenderTxHash: &bobTxHash, SpendingTx: bobCommit, } - aliceNotifier.SpendChan <- bobSpend + + // Create a mock blockbeat and send it to Alice's BlockbeatChan. + mockBeat := &chainio.MockBlockbeat{} + + // Mock the logger. We don't care how many times it's called as it's + // not critical. + mockBeat.On("logger").Return(log) + + // Mock a fake block height - this is called based on the debuglevel. + mockBeat.On("Height").Return(int32(1)).Maybe() + + // Mock `NotifyBlockProcessed` to be call once. + mockBeat.On("NotifyBlockProcessed", + nil, aliceChainWatcher.quit).Return().Once() + + select { + case aliceNotifier.SpendChan <- bobSpend: + case <-time.After(1 * time.Second): + t.Fatalf("unable to send spend details") + } + + select { + case aliceChainWatcher.BlockbeatChan <- mockBeat: + case <-time.After(time.Second * 1): + t.Fatalf("unable to send blockbeat") + } // We should get a new spend event over the remote unilateral close // event channel. @@ -279,7 +331,7 @@ func TestChainWatcherDataLossProtect(t *testing.T) { // With the channels created, we'll now create a chain watcher // instance which will be watching for any closes of Alice's // channel. - aliceNotifier := &mock.ChainNotifier{ + aliceNotifier := &lnmock.ChainNotifier{ SpendChan: make(chan *chainntnfs.SpendDetail), EpochChan: make(chan *chainntnfs.BlockEpoch), ConfChan: make(chan *chainntnfs.TxConfirmation), @@ -326,7 +378,34 @@ func TestChainWatcherDataLossProtect(t *testing.T) { SpenderTxHash: &bobTxHash, SpendingTx: bobCommit, } - aliceNotifier.SpendChan <- bobSpend + + // Create a mock blockbeat and send it to Alice's + // BlockbeatChan. + mockBeat := &chainio.MockBlockbeat{} + + // Mock the logger. We don't care how many times it's called as + // it's not critical. + mockBeat.On("logger").Return(log) + + // Mock a fake block height - this is called based on the + // debuglevel. + mockBeat.On("Height").Return(int32(1)).Maybe() + + // Mock `NotifyBlockProcessed` to be call once. 
+ mockBeat.On("NotifyBlockProcessed", + nil, aliceChainWatcher.quit).Return().Once() + + select { + case aliceNotifier.SpendChan <- bobSpend: + case <-time.After(time.Second * 1): + t.Fatalf("failed to send spend notification") + } + + select { + case aliceChainWatcher.BlockbeatChan <- mockBeat: + case <-time.After(time.Second * 1): + t.Fatalf("unable to send blockbeat") + } // We should get a new uni close resolution that indicates we // processed the DLP scenario. @@ -453,7 +532,7 @@ func TestChainWatcherLocalForceCloseDetect(t *testing.T) { // With the channels created, we'll now create a chain watcher // instance which will be watching for any closes of Alice's // channel. - aliceNotifier := &mock.ChainNotifier{ + aliceNotifier := &lnmock.ChainNotifier{ SpendChan: make(chan *chainntnfs.SpendDetail), EpochChan: make(chan *chainntnfs.BlockEpoch), ConfChan: make(chan *chainntnfs.TxConfirmation), @@ -497,7 +576,33 @@ func TestChainWatcherLocalForceCloseDetect(t *testing.T) { SpenderTxHash: &aliceTxHash, SpendingTx: aliceCommit, } - aliceNotifier.SpendChan <- aliceSpend + // Create a mock blockbeat and send it to Alice's + // BlockbeatChan. + mockBeat := &chainio.MockBlockbeat{} + + // Mock the logger. We don't care how many times it's called as + // it's not critical. + mockBeat.On("logger").Return(log) + + // Mock a fake block height - this is called based on the + // debuglevel. + mockBeat.On("Height").Return(int32(1)).Maybe() + + // Mock `NotifyBlockProcessed` to be call once. + mockBeat.On("NotifyBlockProcessed", + nil, aliceChainWatcher.quit).Return().Once() + + select { + case aliceNotifier.SpendChan <- aliceSpend: + case <-time.After(time.Second * 1): + t.Fatalf("unable to send spend notification") + } + + select { + case aliceChainWatcher.BlockbeatChan <- mockBeat: + case <-time.After(time.Second * 1): + t.Fatalf("unable to send blockbeat") + } // We should get a local force close event from Alice as she // should be able to detect the close based on the commitment diff --git a/contractcourt/channel_arbitrator.go b/contractcourt/channel_arbitrator.go index 9b554cb648..0b0a9961c2 100644 --- a/contractcourt/channel_arbitrator.go +++ b/contractcourt/channel_arbitrator.go @@ -2795,8 +2795,6 @@ func (c *ChannelArbitrator) updateActiveHTLCs() { // Nursery for incubation, and ultimate sweeping. // // NOTE: This MUST be run as a goroutine. -// -//nolint:funlen func (c *ChannelArbitrator) channelAttendant(bestHeight int32, commitSet *CommitSet) { @@ -2860,26 +2858,11 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32, // We've cooperatively closed the channel, so we're no longer // needed. We'll mark the channel as resolved and exit. case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure: - log.Infof("ChannelArbitrator(%v) marking channel "+ - "cooperatively closed", c.cfg.ChanPoint) - - err := c.cfg.MarkChannelClosed( - closeInfo.ChannelCloseSummary, - channeldb.ChanStatusCoopBroadcasted, - ) + err := c.handleCoopCloseEvent(closeInfo) if err != nil { - log.Errorf("Unable to mark channel closed: "+ - "%v", err) - return - } + log.Errorf("Failed to handle coop close: %v", + err) - // We'll now advance our state machine until it reaches - // a terminal state, and the channel is marked resolved. 
- _, _, err = c.advanceState( - closeInfo.CloseHeight, coopCloseTrigger, nil, - ) - if err != nil { - log.Errorf("Unable to advance state: %v", err) return } @@ -2892,226 +2875,39 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32, c.cfg.ChanPoint) } - closeTx := closeInfo.CloseTx - - resolutions, err := closeInfo.ContractResolutions. - UnwrapOrErr( - fmt.Errorf("resolutions not found"), - ) - if err != nil { - log.Errorf("ChannelArbitrator(%v): unable to "+ - "get resolutions: %v", c.cfg.ChanPoint, - err) - - return - } - - // We make sure that the htlc resolutions are present - // otherwise we would panic dereferencing the pointer. - // - // TODO(ziggie): Refactor ContractResolutions to use - // options. - if resolutions.HtlcResolutions == nil { - log.Errorf("ChannelArbitrator(%v): htlc "+ - "resolutions not found", - c.cfg.ChanPoint) - - return - } - - log.Infof("ChannelArbitrator(%v): local force close "+ - "tx=%v confirmed", c.cfg.ChanPoint, - closeTx.TxHash()) - - contractRes := &ContractResolutions{ - CommitHash: closeTx.TxHash(), - CommitResolution: resolutions.CommitResolution, - HtlcResolutions: *resolutions.HtlcResolutions, - AnchorResolution: resolutions.AnchorResolution, - } - - // When processing a unilateral close event, we'll - // transition to the ContractClosed state. We'll log - // out the set of resolutions such that they are - // available to fetch in that state, we'll also write - // the commit set so we can reconstruct our chain - // actions on restart. - err = c.log.LogContractResolutions(contractRes) + err := c.handleLocalForceCloseEvent(closeInfo) if err != nil { - log.Errorf("Unable to write resolutions: %v", - err) - return - } - err = c.log.InsertConfirmedCommitSet( - &closeInfo.CommitSet, - ) - if err != nil { - log.Errorf("Unable to write commit set: %v", - err) - return - } + log.Errorf("Failed to handle local force "+ + "close: %v", err) - // After the set of resolutions are successfully - // logged, we can safely close the channel. After this - // succeeds we won't be getting chain events anymore, - // so we must make sure we can recover on restart after - // it is marked closed. If the next state transition - // fails, we'll start up in the prior state again, and - // we won't be longer getting chain events. In this - // case we must manually re-trigger the state - // transition into StateContractClosed based on the - // close status of the channel. - err = c.cfg.MarkChannelClosed( - closeInfo.ChannelCloseSummary, - channeldb.ChanStatusLocalCloseInitiator, - ) - if err != nil { - log.Errorf("Unable to mark "+ - "channel closed: %v", err) return } - // We'll now advance our state machine until it reaches - // a terminal state. - _, _, err = c.advanceState( - uint32(closeInfo.SpendingHeight), - localCloseTrigger, &closeInfo.CommitSet, - ) - if err != nil { - log.Errorf("Unable to advance state: %v", err) - } - // The remote party has broadcast the commitment on-chain. // We'll examine our state to determine if we need to act at // all. case uniClosure := <-c.cfg.ChainEvents.RemoteUnilateralClosure: - log.Infof("ChannelArbitrator(%v): remote party has "+ - "closed channel out on-chain", c.cfg.ChanPoint) - - // If we don't have a self output, and there are no - // active HTLC's, then we can immediately mark the - // contract as fully resolved and exit. 
- contractRes := &ContractResolutions{ - CommitHash: *uniClosure.SpenderTxHash, - CommitResolution: uniClosure.CommitResolution, - HtlcResolutions: *uniClosure.HtlcResolutions, - AnchorResolution: uniClosure.AnchorResolution, - } - - // When processing a unilateral close event, we'll - // transition to the ContractClosed state. We'll log - // out the set of resolutions such that they are - // available to fetch in that state, we'll also write - // the commit set so we can reconstruct our chain - // actions on restart. - err := c.log.LogContractResolutions(contractRes) - if err != nil { - log.Errorf("Unable to write resolutions: %v", - err) - return - } - err = c.log.InsertConfirmedCommitSet( - &uniClosure.CommitSet, - ) + err := c.handleRemoteForceCloseEvent(uniClosure) if err != nil { - log.Errorf("Unable to write commit set: %v", - err) - return - } + log.Errorf("Failed to handle remote force "+ + "close: %v", err) - // After the set of resolutions are successfully - // logged, we can safely close the channel. After this - // succeeds we won't be getting chain events anymore, - // so we must make sure we can recover on restart after - // it is marked closed. If the next state transition - // fails, we'll start up in the prior state again, and - // we won't be longer getting chain events. In this - // case we must manually re-trigger the state - // transition into StateContractClosed based on the - // close status of the channel. - closeSummary := &uniClosure.ChannelCloseSummary - err = c.cfg.MarkChannelClosed( - closeSummary, - channeldb.ChanStatusRemoteCloseInitiator, - ) - if err != nil { - log.Errorf("Unable to mark channel closed: %v", - err) return } - // We'll now advance our state machine until it reaches - // a terminal state. - _, _, err = c.advanceState( - uint32(uniClosure.SpendingHeight), - remoteCloseTrigger, &uniClosure.CommitSet, - ) - if err != nil { - log.Errorf("Unable to advance state: %v", err) - } - // The remote has breached the channel. As this is handled by // the ChainWatcher and BreachArbitrator, we don't have to do // anything in particular, so just advance our state and // gracefully exit. case breachInfo := <-c.cfg.ChainEvents.ContractBreach: - log.Infof("ChannelArbitrator(%v): remote party has "+ - "breached channel!", c.cfg.ChanPoint) - - // In the breach case, we'll only have anchor and - // breach resolutions. - contractRes := &ContractResolutions{ - CommitHash: breachInfo.CommitHash, - BreachResolution: breachInfo.BreachResolution, - AnchorResolution: breachInfo.AnchorResolution, - } - - // We'll transition to the ContractClosed state and log - // the set of resolutions such that they can be turned - // into resolvers later on. We'll also insert the - // CommitSet of the latest set of commitments. - err := c.log.LogContractResolutions(contractRes) + err := c.handleContractBreach(breachInfo) if err != nil { - log.Errorf("Unable to write resolutions: %v", - err) - return - } - err = c.log.InsertConfirmedCommitSet( - &breachInfo.CommitSet, - ) - if err != nil { - log.Errorf("Unable to write commit set: %v", - err) - return - } + log.Errorf("Failed to handle contract breach: "+ + "%v", err) - // The channel is finally marked pending closed here as - // the BreachArbitrator and channel arbitrator have - // persisted the relevant states. 
- closeSummary := &breachInfo.CloseSummary - err = c.cfg.MarkChannelClosed( - closeSummary, - channeldb.ChanStatusRemoteCloseInitiator, - ) - if err != nil { - log.Errorf("Unable to mark channel closed: %v", - err) return } - log.Infof("Breached channel=%v marked pending-closed", - breachInfo.BreachResolution.FundingOutPoint) - - // We'll advance our state machine until it reaches a - // terminal state. - _, _, err = c.advanceState( - uint32(bestHeight), breachCloseTrigger, - &breachInfo.CommitSet, - ) - if err != nil { - log.Errorf("Unable to advance state: %v", err) - } - // A new contract has just been resolved, we'll now check our // log to see if all contracts have been resolved. If so, then // we can exit as the contract is fully resolved. @@ -3196,6 +2992,22 @@ func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error { // Notify we've processed the block. defer c.NotifyBlockProcessed(beat, nil) + // If the state is StateContractClosed, StateWaitingFullResolution, or + // StateFullyResolved, there's no need to read the close event channel + // and launch the resolvers since the arbitrator can only get to this + // state after processing a previous close event and launched all its + // resolvers. + if c.state.IsContractClosed() { + log.Infof("ChannelArbitrator(%v): skipping launching "+ + "resolvers in state=%v", c.cfg.ChanPoint, c.state) + + return nil + } + + // Perform a non-blocking read on the close events in case the channel + // is closed in this blockbeat. + c.receiveAndProcessCloseEvent() + // Try to advance the state if we are in StateDefault. if c.state == StateDefault { // Now that a new block has arrived, we'll attempt to advance @@ -3214,6 +3026,60 @@ func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error { return nil } +// receiveAndProcessCloseEvent does a non-blocking read on all the channel +// close event channels. If an event is received, it will be further processed. +func (c *ChannelArbitrator) receiveAndProcessCloseEvent() { + select { + // Received a coop close event, we now mark the channel as resolved and + // exit. + case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure: + err := c.handleCoopCloseEvent(closeInfo) + if err != nil { + log.Errorf("Failed to handle coop close: %v", err) + return + } + + // We have broadcast our commitment, and it is now confirmed onchain. + case closeInfo := <-c.cfg.ChainEvents.LocalUnilateralClosure: + if c.state != StateCommitmentBroadcasted { + log.Errorf("ChannelArbitrator(%v): unexpected "+ + "local on-chain channel close", c.cfg.ChanPoint) + } + + err := c.handleLocalForceCloseEvent(closeInfo) + if err != nil { + log.Errorf("Failed to handle local force close: %v", + err) + + return + } + + // The remote party has broadcast the commitment. We'll examine our + // state to determine if we need to act at all. + case uniClosure := <-c.cfg.ChainEvents.RemoteUnilateralClosure: + err := c.handleRemoteForceCloseEvent(uniClosure) + if err != nil { + log.Errorf("Failed to handle remote force close: %v", + err) + + return + } + + // The remote has breached the channel! We now launch the breach + // contract resolvers. + case breachInfo := <-c.cfg.ChainEvents.ContractBreach: + err := c.handleContractBreach(breachInfo) + if err != nil { + log.Errorf("Failed to handle contract breach: %v", err) + return + } + + default: + log.Infof("ChannelArbitrator(%v) no close event", + c.cfg.ChanPoint) + } +} + // Name returns a human-readable string for this subsystem. 
 //
 // NOTE: Part of chainio.Consumer interface.
@@ -3506,3 +3372,226 @@ func (c *ChannelArbitrator) abandonForwards(htlcs fn.Set[uint64]) error {
 
 	return nil
 }
+
+// handleCoopCloseEvent takes a coop close event from ChainEvents, marks the
+// channel as closed and advances the state.
+func (c *ChannelArbitrator) handleCoopCloseEvent(
+	closeInfo *CooperativeCloseInfo) error {
+
+	log.Infof("ChannelArbitrator(%v) marking channel cooperatively closed "+
+		"at height %v", c.cfg.ChanPoint, closeInfo.CloseHeight)
+
+	err := c.cfg.MarkChannelClosed(
+		closeInfo.ChannelCloseSummary,
+		channeldb.ChanStatusCoopBroadcasted,
+	)
+	if err != nil {
+		return fmt.Errorf("unable to mark channel closed: %w", err)
+	}
+
+	// We'll now advance our state machine until it reaches a terminal
+	// state, and the channel is marked resolved.
+	_, _, err = c.advanceState(closeInfo.CloseHeight, coopCloseTrigger, nil)
+	if err != nil {
+		log.Errorf("Unable to advance state: %v", err)
+	}
+
+	return nil
+}
+
+// handleLocalForceCloseEvent takes a local force close event from ChainEvents,
+// saves the contract resolutions to disk, marks the channel as closed and
+// advances the state.
+func (c *ChannelArbitrator) handleLocalForceCloseEvent(
+	closeInfo *LocalUnilateralCloseInfo) error {
+
+	closeTx := closeInfo.CloseTx
+
+	resolutions, err := closeInfo.ContractResolutions.
+		UnwrapOrErr(
+			fmt.Errorf("resolutions not found"),
+		)
+	if err != nil {
+		return fmt.Errorf("unable to get resolutions: %w", err)
+	}
+
+	// We make sure that the htlc resolutions are present,
+	// otherwise we would panic dereferencing the pointer.
+	//
+	// TODO(ziggie): Refactor ContractResolutions to use
+	// options.
+	if resolutions.HtlcResolutions == nil {
+		return fmt.Errorf("htlc resolutions is nil")
+	}
+
+	log.Infof("ChannelArbitrator(%v): local force close tx=%v confirmed",
+		c.cfg.ChanPoint, closeTx.TxHash())
+
+	contractRes := &ContractResolutions{
+		CommitHash:       closeTx.TxHash(),
+		CommitResolution: resolutions.CommitResolution,
+		HtlcResolutions:  *resolutions.HtlcResolutions,
+		AnchorResolution: resolutions.AnchorResolution,
+	}
+
+	// When processing a unilateral close event, we'll transition to the
+	// ContractClosed state. We'll log out the set of resolutions such that
+	// they are available to fetch in that state, we'll also write the
+	// commit set so we can reconstruct our chain actions on restart.
+	err = c.log.LogContractResolutions(contractRes)
+	if err != nil {
+		return fmt.Errorf("unable to write resolutions: %w", err)
+	}
+
+	err = c.log.InsertConfirmedCommitSet(&closeInfo.CommitSet)
+	if err != nil {
+		return fmt.Errorf("unable to write commit set: %w", err)
+	}
+
+	// After the set of resolutions is successfully logged, we can safely
+	// close the channel. After this succeeds we won't be getting chain
+	// events anymore, so we must make sure we can recover on restart after
+	// it is marked closed. If the next state transition fails, we'll start
+	// up in the prior state again, and we'll no longer be getting chain
+	// events. In this case we must manually re-trigger the state
+	// transition into StateContractClosed based on the close status of the
+	// channel.
+	err = c.cfg.MarkChannelClosed(
+		closeInfo.ChannelCloseSummary,
+		channeldb.ChanStatusLocalCloseInitiator,
+	)
+	if err != nil {
+		return fmt.Errorf("unable to mark channel closed: %w", err)
+	}
+
+	// We'll now advance our state machine until it reaches a terminal
+	// state.
+	_, _, err = c.advanceState(
+		uint32(closeInfo.SpendingHeight),
+		localCloseTrigger, &closeInfo.CommitSet,
+	)
+	if err != nil {
+		log.Errorf("Unable to advance state: %v", err)
+	}
+
+	return nil
+}
+
+// handleRemoteForceCloseEvent takes a remote force close event from
+// ChainEvents, saves the contract resolutions to disk, marks the channel as
+// closed and advances the state.
+func (c *ChannelArbitrator) handleRemoteForceCloseEvent(
+	closeInfo *RemoteUnilateralCloseInfo) error {
+
+	log.Infof("ChannelArbitrator(%v): remote party has force closed "+
+		"channel at height %v", c.cfg.ChanPoint,
+		closeInfo.SpendingHeight)
+
+	// If we don't have a self output, and there are no active HTLC's, then
+	// we can immediately mark the contract as fully resolved and exit.
+	contractRes := &ContractResolutions{
+		CommitHash:       *closeInfo.SpenderTxHash,
+		CommitResolution: closeInfo.CommitResolution,
+		HtlcResolutions:  *closeInfo.HtlcResolutions,
+		AnchorResolution: closeInfo.AnchorResolution,
+	}
+
+	// When processing a unilateral close event, we'll transition to the
+	// ContractClosed state. We'll log out the set of resolutions such that
+	// they are available to fetch in that state, we'll also write the
+	// commit set so we can reconstruct our chain actions on restart.
+	err := c.log.LogContractResolutions(contractRes)
+	if err != nil {
+		return fmt.Errorf("unable to write resolutions: %w", err)
+	}
+
+	err = c.log.InsertConfirmedCommitSet(&closeInfo.CommitSet)
+	if err != nil {
+		return fmt.Errorf("unable to write commit set: %w", err)
+	}
+
+	// After the set of resolutions is successfully logged, we can safely
+	// close the channel. After this succeeds we won't be getting chain
+	// events anymore, so we must make sure we can recover on restart after
+	// it is marked closed. If the next state transition fails, we'll start
+	// up in the prior state again, and we'll no longer be getting chain
+	// events. In this case we must manually re-trigger the state
+	// transition into StateContractClosed based on the close status of the
+	// channel.
+	closeSummary := &closeInfo.ChannelCloseSummary
+	err = c.cfg.MarkChannelClosed(
+		closeSummary,
+		channeldb.ChanStatusRemoteCloseInitiator,
+	)
+	if err != nil {
+		return fmt.Errorf("unable to mark channel closed: %w", err)
+	}
+
+	// We'll now advance our state machine until it reaches a terminal
+	// state.
+	_, _, err = c.advanceState(
+		uint32(closeInfo.SpendingHeight),
+		remoteCloseTrigger, &closeInfo.CommitSet,
+	)
+	if err != nil {
+		log.Errorf("Unable to advance state: %v", err)
+	}
+
+	return nil
+}
+
+// handleContractBreach takes a breach close event from ChainEvents, saves the
+// contract resolutions to disk, marks the channel as closed and advances the
+// state.
+func (c *ChannelArbitrator) handleContractBreach(
+	breachInfo *BreachCloseInfo) error {
+
+	closeSummary := &breachInfo.CloseSummary
+
+	log.Infof("ChannelArbitrator(%v): remote party has breached channel "+
+		"at height %v!", c.cfg.ChanPoint, closeSummary.CloseHeight)
+
+	// In the breach case, we'll only have anchor and breach resolutions.
+	contractRes := &ContractResolutions{
+		CommitHash:       breachInfo.CommitHash,
+		BreachResolution: breachInfo.BreachResolution,
+		AnchorResolution: breachInfo.AnchorResolution,
+	}
+
+	// We'll transition to the ContractClosed state and log the set of
+	// resolutions such that they can be turned into resolvers later on.
+	// We'll also insert the CommitSet of the latest set of commitments.
+	err := c.log.LogContractResolutions(contractRes)
+	if err != nil {
+		return fmt.Errorf("unable to write resolutions: %w", err)
+	}
+
+	err = c.log.InsertConfirmedCommitSet(&breachInfo.CommitSet)
+	if err != nil {
+		return fmt.Errorf("unable to write commit set: %w", err)
+	}
+
+	// The channel is finally marked pending closed here as the
+	// BreachArbitrator and channel arbitrator have persisted the relevant
+	// states.
+	err = c.cfg.MarkChannelClosed(
+		closeSummary, channeldb.ChanStatusRemoteCloseInitiator,
+	)
+	if err != nil {
+		return fmt.Errorf("unable to mark channel closed: %w", err)
+	}
+
+	log.Infof("Breached channel=%v marked pending-closed",
+		breachInfo.BreachResolution.FundingOutPoint)
+
+	// We'll advance our state machine until it reaches a terminal state.
+	_, _, err = c.advanceState(
+		closeSummary.CloseHeight, breachCloseTrigger,
+		&breachInfo.CommitSet,
+	)
+	if err != nil {
+		log.Errorf("Unable to advance state: %v", err)
+	}
+
+	return nil
+}
diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md
index 0b576fc7de..d2ef485d6b 100644
--- a/docs/release-notes/release-notes-0.19.0.md
+++ b/docs/release-notes/release-notes-0.19.0.md
@@ -120,6 +120,18 @@
 
 * LND updates channel.backup file at shutdown time.
 
+* A new subsystem `chainio` is
+  [introduced](https://github.com/lightningnetwork/lnd/pull/9277) to make sure
+  the subsystems are in sync with their current best block. Previously, when
+  resolving a force-closed channel, the sweeping of HTLCs could be delayed by
+  one or two blocks because block heights were not in sync across the relevant
+  subsystems (`ChainArbitrator`, `UtxoSweeper` and `TxPublisher`), causing a
+  slight inaccuracy when deciding the sweeping feerate and urgency. With
+  `chainio`, this is now fixed, as these subsystems now share the same view of
+  the best block. Check
+  [here](https://github.com/lightningnetwork/lnd/blob/master/chainio/README.md)
+  to learn more.
+
 ## RPC Updates
 
 * Some RPCs that previously just returned an empty response message now at least
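
Reviewer note (not part of the patch): below is a minimal sketch of the blockbeat consumer pattern this patch applies to `chainWatcher` and `ChannelArbitrator`. It only uses the `chainio` API already visible in the diff (`BeatConsumer`, `NewBeatConsumer`, `BlockbeatChan`, `NotifyBlockProcessed`, and the `Consumer` interface's `Name` method); the package and type names (`chainiodemo`, `demoConsumer`) are illustrative only and do not exist in the change set.

```go
package chainiodemo

import (
	"sync"

	"github.com/lightningnetwork/lnd/chainio"
)

// demoConsumer mirrors how chainWatcher mounts chainio.BeatConsumer in this
// patch. It is an illustrative sketch, not code from the change set.
type demoConsumer struct {
	// Embedding BeatConsumer provides BlockbeatChan and
	// NotifyBlockProcessed, exactly as chainWatcher does above.
	chainio.BeatConsumer

	quit chan struct{}
	wg   sync.WaitGroup
}

// Compile-time check, mirroring the one added for chainWatcher.
var _ chainio.Consumer = (*demoConsumer)(nil)

// newDemoConsumer constructs the consumer and mounts the beat consumer, in
// the same way newChainWatcher does above.
func newDemoConsumer() *demoConsumer {
	d := &demoConsumer{
		quit: make(chan struct{}),
	}
	d.BeatConsumer = chainio.NewBeatConsumer(d.quit, d.Name())

	return d
}

// Name returns the name of the consumer.
//
// NOTE: part of the `chainio.Consumer` interface.
func (d *demoConsumer) Name() string {
	return "DemoConsumer"
}

// start launches the block-processing loop.
func (d *demoConsumer) start() {
	d.wg.Add(1)
	go func() {
		defer d.wg.Done()

		for {
			select {
			// A new blockbeat arrives from the dispatcher.
			case beat := <-d.BlockbeatChan:
				// Do the per-block work here, then ack the
				// beat so the dispatcher can move on to the
				// next consumer and, eventually, the next
				// block.
				d.NotifyBlockProcessed(beat, nil)

			case <-d.quit:
				return
			}
		}
	}()
}
```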