Beat [4/4]: implement Consumer in chainWatcher #9277

Merged
18 changes: 14 additions & 4 deletions chainntnfs/txnotifier.go
@@ -1757,10 +1757,6 @@ func (n *TxNotifier) NotifyHeight(height uint32) error {
for ntfn := range n.ntfnsByConfirmHeight[height] {
confSet := n.confNotifications[ntfn.ConfRequest]

Log.Debugf("Dispatching %v confirmation notification for "+
"conf_id=%v, %v", ntfn.NumConfirmations, ntfn.ConfID,
ntfn.ConfRequest)

// The default notification we assigned above includes the
// block along with the rest of the details. However not all
// clients want the block, so we make a copy here w/o the block
@@ -1770,6 +1766,20 @@ func (n *TxNotifier) NotifyHeight(height uint32) error {
confDetails.Block = nil
}

// If the `confDetails` has already been sent before, we'll
// skip it and continue processing the next one.
if ntfn.dispatched {
Log.Debugf("Skipped dispatched conf details for "+
"request %v conf_id=%v", ntfn.ConfRequest,
ntfn.ConfID)

continue
}

Log.Debugf("Dispatching %v confirmation notification for "+
"conf_id=%v, %v", ntfn.NumConfirmations, ntfn.ConfID,
ntfn.ConfRequest)

select {
case ntfn.Event.Confirmed <- &confDetails:
ntfn.dispatched = true
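
The new `dispatched` guard makes it safe for the notifier to see the same height more than once (as happens when a blockbeat is re-dispatched after a restore): an already-delivered confirmation is skipped instead of being sent a second time. A minimal, self-contained sketch of that idempotency, using hypothetical types rather than the real TxNotifier internals:

package main

import "fmt"

// fakeNtfn is a stand-in for a confirmation notification: a buffered
// event channel plus a flag recording whether it was already sent.
type fakeNtfn struct {
	dispatched bool
	confirmed  chan int
}

// notifyOnce mirrors the pattern above: skip if already dispatched,
// otherwise try to send and mark the notification as delivered.
func notifyOnce(n *fakeNtfn, height int) {
	if n.dispatched {
		return
	}

	select {
	case n.confirmed <- height:
		n.dispatched = true
	default:
	}
}

func main() {
	n := &fakeNtfn{confirmed: make(chan int, 1)}

	notifyOnce(n, 100)
	notifyOnce(n, 100) // Second call is a no-op thanks to the flag.

	fmt.Println(len(n.confirmed)) // Prints 1, not 2.
}
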
13 changes: 12 additions & 1 deletion chanrestore.go
@@ -8,6 +8,7 @@ import (
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
@@ -286,6 +287,9 @@ func (c *chanDBRestorer) RestoreChansFromSingles(backups ...chanbackup.Single) e

ltndLog.Infof("Informing chain watchers of new restored channels")

// Create a slice of channel points.
chanPoints := make([]wire.OutPoint, 0, len(channelShells))

// Finally, we'll need to inform the chain arbitrator of these new
// channels so we'll properly watch for their ultimate closure on chain
// and sweep them via the DLP.
@@ -294,8 +298,15 @@ func (c *chanDBRestorer) RestoreChansFromSingles(backups ...chanbackup.Single) e
if err != nil {
return err
}

chanPoints = append(
chanPoints, restoredChannel.Chan.FundingOutpoint,
)
}

// With all the channels restored, we'll now re-send the blockbeat.
c.chainArb.RedispatchBlockbeat(chanPoints)

return nil
}

@@ -314,7 +325,7 @@ func (s *server) ConnectPeer(nodePub *btcec.PublicKey, addrs []net.Addr) error {
// to ensure the new connection is created after this new link/channel
// is known.
if err := s.DisconnectPeer(nodePub); err != nil {
ltndLog.Infof("Peer(%v) is already connected, proceeding "+
ltndLog.Infof("Peer(%x) is already connected, proceeding "+
"with chan restore", nodePub.SerializeCompressed())
}

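
The `%v` to `%x` change in the log above matters because the argument is a serialized public key: with `%v` a `[]byte` is printed as decimal byte values, while `%x` gives the usual compact hex form of a node key. A quick standalone illustration with dummy bytes, not lnd code:

package main

import "fmt"

func main() {
	// Pretend this is a compressed public key as returned by
	// SerializeCompressed() (shortened dummy bytes for illustration).
	pubKey := []byte{0x02, 0xab, 0xcd, 0xef}

	fmt.Printf("%v\n", pubKey) // [2 171 205 239]
	fmt.Printf("%x\n", pubKey) // 02abcdef
}
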
9 changes: 9 additions & 0 deletions contractcourt/briefcase.go
@@ -249,6 +249,15 @@ func (a ArbitratorState) String() string {
}
}

// IsContractClosed returns a bool to indicate whether the closing/breaching tx
// has been confirmed onchain. If the state is StateContractClosed,
// StateWaitingFullResolution, or StateFullyResolved, it means the contract has
// been closed and all related contracts have been launched.
func (a ArbitratorState) IsContractClosed() bool {
return a == StateContractClosed || a == StateWaitingFullResolution ||
a == StateFullyResolved
}

// resolverType is an enum that enumerates the various types of resolvers. When
// writing resolvers to disk, we prepend this to the raw bytes stored. This
// allows us to properly decode the resolver into the proper type.
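
A hedged sketch of how the new IsContractClosed predicate's behaviour could be pinned down in a unit test. This test is not part of the diff; it assumes the existing StateDefault value and the testify/require package already used elsewhere in the repo:

package contractcourt

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// TestIsContractClosed checks that only the three post-confirmation
// states report the contract as closed.
func TestIsContractClosed(t *testing.T) {
	closedStates := []ArbitratorState{
		StateContractClosed,
		StateWaitingFullResolution,
		StateFullyResolved,
	}
	for _, state := range closedStates {
		require.True(t, state.IsContractClosed())
	}

	// A channel that hasn't gone to chain yet is not closed.
	require.False(t, StateDefault.IsContractClosed())
}
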
69 changes: 63 additions & 6 deletions contractcourt/chain_arbitrator.go
@@ -578,9 +578,6 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
// Set the current beat.
c.beat = beat

log.Infof("ChainArbitrator starting at height %d with budget=[%v]",
&c.cfg.Budget, c.beat.Height())

// First, we'll fetch all the channels that are still open, in order to
// collect them within our set of active contracts.
if err := c.loadOpenChannels(); err != nil {
@@ -690,6 +687,11 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
c.dispatchBlocks()
}()

log.Infof("ChainArbitrator starting at height %d with %d chain "+
"watchers, %d channel arbitrators, and budget config=[%v]",
c.beat.Height(), len(c.activeWatchers), len(c.activeChannels),
&c.cfg.Budget)

// TODO(roasbeef): eventually move all breach watching here

return nil
@@ -727,19 +729,34 @@ func (c *ChainArbitrator) handleBlockbeat(beat chainio.Blockbeat) {

// Create a slice to record active channel arbitrator.
channels := make([]chainio.Consumer, 0, len(c.activeChannels))
watchers := make([]chainio.Consumer, 0, len(c.activeWatchers))

// Copy the active channels to the slice.
for _, channel := range c.activeChannels {
channels = append(channels, channel)
}

for _, watcher := range c.activeWatchers {
watchers = append(watchers, watcher)
}

c.Unlock()

// Iterate all the copied watchers and send the blockbeat to them.
err := chainio.DispatchConcurrent(beat, watchers)
if err != nil {
log.Errorf("Notify blockbeat for chainWatcher failed: %v", err)
}

// Iterate all the copied channels and send the blockbeat to them.
//
// NOTE: This method will timeout if the processing of blocks of the
// subsystems is too long (60s).
err := chainio.DispatchConcurrent(beat, channels)
err = chainio.DispatchConcurrent(beat, channels)
if err != nil {
log.Errorf("Notify blockbeat for ChannelArbitrator failed: %v",
err)
}

// Notify the chain arbitrator has processed the block.
c.NotifyBlockProcessed(beat, err)
@@ -1046,8 +1063,8 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error

chanPoint := newChan.FundingOutpoint

log.Infof("Creating new ChannelArbitrator for ChannelPoint(%v)",
chanPoint)
log.Infof("Creating new chainWatcher and ChannelArbitrator for "+
"ChannelPoint(%v)", chanPoint)

// If we're already watching this channel, then we'll ignore this
// request.
Expand Down Expand Up @@ -1356,3 +1373,43 @@ func (c *ChainArbitrator) loadPendingCloseChannels() error {

return nil
}

// RedispatchBlockbeat resends the current blockbeat to the channels specified
// by the chanPoints. It is used when a channel is added to the chain
// arbitrator after it has been started, e.g., during the channel restore
// process.
func (c *ChainArbitrator) RedispatchBlockbeat(chanPoints []wire.OutPoint) {
// Get the current blockbeat.
beat := c.beat

// Prepare two sets of consumers.
channels := make([]chainio.Consumer, 0, len(chanPoints))
watchers := make([]chainio.Consumer, 0, len(chanPoints))

// Read the active channels in a lock.
c.Lock()
for _, op := range chanPoints {
if channel, ok := c.activeChannels[op]; ok {
channels = append(channels, channel)
}

if watcher, ok := c.activeWatchers[op]; ok {
watchers = append(watchers, watcher)
}
}
c.Unlock()

// Iterate all the copied watchers and send the blockbeat to them.
err := chainio.DispatchConcurrent(beat, watchers)
if err != nil {
log.Errorf("Notify blockbeat for chainWatcher failed: %v", err)
}

// Iterate all the copied channels and send the blockbeat to them.
err = chainio.DispatchConcurrent(beat, channels)
if err != nil {
// Shutdown lnd if there's an error processing the block.
log.Errorf("Notify blockbeat for ChannelArbitrator failed: %v",
err)
}
}
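
Both handleBlockbeat and RedispatchBlockbeat rely on chainio.DispatchConcurrent fanning the beat out to every consumer in parallel and only returning once all of them have finished or the timeout fires. A rough, self-contained sketch of that shape, using a hypothetical consumer interface and dispatcher rather than the real chainio API:

package main

import (
	"fmt"
	"sync"
	"time"
)

// beatConsumer is a stand-in for chainio.Consumer; only the
// fan-out-and-wait shape is illustrated here, not the real interface.
type beatConsumer interface {
	ProcessBlock(height int32) error
}

// dispatchConcurrent sends the height to every consumer in parallel,
// waits for all of them, and gives up after the timeout.
func dispatchConcurrent(height int32, consumers []beatConsumer,
	timeout time.Duration) error {

	errChan := make(chan error, len(consumers))

	var wg sync.WaitGroup
	for _, c := range consumers {
		wg.Add(1)
		go func(c beatConsumer) {
			defer wg.Done()
			errChan <- c.ProcessBlock(height)
		}(c)
	}

	// Wait for every consumer, but never longer than the timeout.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
	case <-time.After(timeout):
		return fmt.Errorf("blockbeat dispatch timed out after %v",
			timeout)
	}

	// Surface the first error any consumer reported.
	close(errChan)
	for err := range errChan {
		if err != nil {
			return err
		}
	}

	return nil
}

// noopConsumer is a trivial consumer used to exercise the sketch.
type noopConsumer struct{}

func (noopConsumer) ProcessBlock(height int32) error { return nil }

func main() {
	consumers := []beatConsumer{noopConsumer{}, noopConsumer{}}
	if err := dispatchConcurrent(100, consumers, time.Minute); err != nil {
		fmt.Println("dispatch failed:", err)
	}
}
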