diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 055b690608..d888f3edd1 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -575,9 +575,6 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error { // Set the current beat. c.beat = beat - log.Infof("ChainArbitrator starting at height %d with budget=[%v]", - &c.cfg.Budget, c.beat.Height()) - // First, we'll fetch all the channels that are still open, in order to // collect them within our set of active contracts. if err := c.loadOpenChannels(); err != nil { @@ -687,6 +684,11 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error { c.dispatchBlocks() }() + log.Infof("ChainArbitrator starting at height %d with %d chain "+ + "watchers, %d channel arbitrators, and budget config=[%v]", + c.beat.Height(), len(c.activeWatchers), len(c.activeChannels), + &c.cfg.Budget) + // TODO(roasbeef): eventually move all breach watching here return nil @@ -1058,8 +1060,8 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error chanPoint := newChan.FundingOutpoint - log.Infof("Creating new ChannelArbitrator for ChannelPoint(%v)", - newChan.FundingOutpoint) + log.Infof("Creating new Chainwatcher and ChannelArbitrator for "+ + "ChannelPoint(%v)", newChan.FundingOutpoint) // If we're already watching this channel, then we'll ignore this // request. diff --git a/contractcourt/channel_arbitrator.go b/contractcourt/channel_arbitrator.go index bc2e0d9e79..da7c60b090 100644 --- a/contractcourt/channel_arbitrator.go +++ b/contractcourt/channel_arbitrator.go @@ -2954,6 +2954,10 @@ func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error { // Notify we've processed the block. defer c.NotifyBlockProcessed(beat, nil) + // Perform a non-blocking read on the close events in case the channel + // is closed in this blockbeat. + c.receiveAndProcessCloseEvent() + // Try to advance the state if we are in StateDefault. if c.state == StateDefault { // Now that a new block has arrived, we'll attempt to advance @@ -2972,6 +2976,59 @@ func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error { return nil } +// receiveAndProcessCloseEvent does a non-blocking read on all the channel +// close event channels. If an event is received, it will be further processed. +func (c *ChannelArbitrator) receiveAndProcessCloseEvent() { + select { + // Received a coop close event, we now mark the channel as resolved and + // exit. + case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure: + err := c.handleCoopCloseEvent(closeInfo) + if err != nil { + log.Errorf("Failed to handle coop close: %v", err) + return + } + + // We have broadcast our commitment, and it is now confirmed onchain. + case closeInfo := <-c.cfg.ChainEvents.LocalUnilateralClosure: + if c.state != StateCommitmentBroadcasted { + log.Errorf("ChannelArbitrator(%v): unexpected "+ + "local on-chain channel close", c.id()) + } + + err := c.handleLocalForceCloseEvent(closeInfo) + if err != nil { + log.Errorf("Failed to handle local force close: %v", + err) + + return + } + + // The remote party has broadcast the commitment. We'll examine our + // state to determine if we need to act at all. + case uniClosure := <-c.cfg.ChainEvents.RemoteUnilateralClosure: + err := c.handleRemoteForceCloseEvent(uniClosure) + if err != nil { + log.Errorf("Failed to handle remote force close: %v", + err) + + return + } + + // The remote has breached the channel! We now launch the + // breach contract resolvers. + case breachInfo := <-c.cfg.ChainEvents.ContractBreach: + err := c.handleContractBreach(breachInfo) + if err != nil { + log.Errorf("Failed to handle contract breach: %v", err) + return + } + + default: + log.Infof("ChannelArbitrator(%v) no close event", c.id()) + } +} + // Name returns a human-readable string for this subsystem. // // NOTE: Part of chainio.Consumer interface.