Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show closed and disabled channels, fix policy order #76

Merged
merged 4 commits into from
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
TransactionCreated = "transaction.created"
WalletBalanceUpdated = "wallet.balance.updated"
RoutingEventUpdated = "routing.event.updated"
GraphUpdated = "graph.updated"
)

type Event struct {
Expand Down
2 changes: 2 additions & 0 deletions network/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ type Backend interface {
SubscribeTransactions(context.Context, chan *models.Transaction) error

SubscribeRoutingEvents(context.Context, chan *models.RoutingEvent) error

SubscribeGraphEvents(context.Context, chan *models.ChannelEdgeUpdate) error
}
103 changes: 82 additions & 21 deletions network/backend/lnd/lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lnd

import (
"context"
"encoding/hex"
"fmt"
"time"

Expand All @@ -21,7 +22,7 @@ import (

const (
lndDefaultInvoiceExpiry = 3600
lndMinPoolCapacity = 4
lndMinPoolCapacity = 6
)

type Client struct {
Expand Down Expand Up @@ -91,7 +92,7 @@ func (l Backend) SubscribeInvoice(ctx context.Context, channelInvoice chan *mode
for {
select {
case <-ctx.Done():
break
return nil
default:
invoice, err := cltInvoices.Recv()
if err != nil {
Expand Down Expand Up @@ -123,7 +124,7 @@ func (l Backend) SubscribeTransactions(ctx context.Context, channel chan *models
for {
select {
case <-ctx.Done():
break
return nil
default:
transaction, err := cltTransactions.Recv()
if err != nil {
Expand All @@ -141,24 +142,84 @@ func (l Backend) SubscribeTransactions(ctx context.Context, channel chan *models
}

func (l Backend) SubscribeChannels(ctx context.Context, events chan *models.ChannelUpdate) error {
_, err := l.Client(ctx)
clt, err := l.Client(ctx)
if err != nil {
return err
}
defer clt.Close()

// events, err := clt.SubscribeChannelEvents(ctx, &lnrpc.ChannelEventSubscription{})
// if err != nil {
// return err
// }
channelEvents, err := clt.SubscribeChannelEvents(ctx, &lnrpc.ChannelEventSubscription{})
if err != nil {
return err
}

// for {
// event, err := events.Recv()
// if err != nil {
// return err
// }
// events <-
//}
return nil
for {
select {
case <-ctx.Done():
return nil
default:
event, err := channelEvents.Recv()
if err != nil {
st, ok := status.FromError(err)
if ok && st.Code() == codes.Canceled {
l.logger.Debug("stopping subscribe channels: context canceled")
return nil
}
return err
}
if event.Type == lnrpc.ChannelEventUpdate_FULLY_RESOLVED_CHANNEL {
events <- &models.ChannelUpdate{}
}

}
}
}

func chanpointToString(c *lnrpc.ChannelPoint) string {
hash := c.GetFundingTxidBytes()
for i := 0; i < len(hash)/2; i++ {
hash[i], hash[len(hash)-i-1] = hash[len(hash)-i-1], hash[i]
}
output := c.OutputIndex
result := fmt.Sprintf("%s:%d", hex.EncodeToString(hash), output)
return result
}

func (l Backend) SubscribeGraphEvents(ctx context.Context, events chan *models.ChannelEdgeUpdate) error {
clt, err := l.Client(ctx)
if err != nil {
return err
}
defer clt.Close()

graphEvents, err := clt.SubscribeChannelGraph(ctx, &lnrpc.GraphTopologySubscription{})
if err != nil {
return err
}

for {
select {
case <-ctx.Done():
return nil
default:
event, err := graphEvents.Recv()
if err != nil {
st, ok := status.FromError(err)
if ok && st.Code() == codes.Canceled {
l.logger.Debug("stopping subscribe graph: context canceled")
return nil
}
return err
}
chanPoints := []string{}
for _, c := range event.ChannelUpdates {
chanPoints = append(chanPoints, chanpointToString(c.ChanPoint))
}
if len(chanPoints) > 0 {
events <- &models.ChannelEdgeUpdate{ChanPoints: chanPoints}
}
}
}
}

func (l Backend) SubscribeRoutingEvents(ctx context.Context, channelEvents chan *models.RoutingEvent) error {
Expand All @@ -176,7 +237,7 @@ func (l Backend) SubscribeRoutingEvents(ctx context.Context, channelEvents chan
for {
select {
case <-ctx.Done():
break
return nil
default:
event, err := cltRoutingEvents.Recv()
if err != nil {
Expand Down Expand Up @@ -351,15 +412,15 @@ func (l Backend) GetChannelInfo(ctx context.Context, channel *models.Channel) er

t := time.Unix(int64(uint64(resp.LastUpdate)), 0)
channel.LastUpdate = &t
channel.Policy1 = protoToRoutingPolicy(resp.Node1Policy)
channel.Policy2 = protoToRoutingPolicy(resp.Node2Policy)
channel.LocalPolicy = protoToRoutingPolicy(resp.Node1Policy)
channel.RemotePolicy = protoToRoutingPolicy(resp.Node2Policy)

info, err := clt.GetInfo(ctx, &lnrpc.GetInfoRequest{})
if err != nil {
return errors.WithStack(err)
}
if info != nil {
channel.WeFirst = resp.Node1Pub == info.IdentityPubkey
if info != nil && resp.Node1Pub != info.IdentityPubkey {
channel.LocalPolicy, channel.RemotePolicy = channel.RemotePolicy, channel.LocalPolicy
}

return nil
Expand Down
6 changes: 3 additions & 3 deletions network/backend/lnd/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,11 @@ func nodeProtoToNode(resp *lnrpc.NodeInfo) *models.Node {
ID: c.ChannelId,
ChannelPoint: c.ChanPoint,
Capacity: c.Capacity,
Policy1: protoToRoutingPolicy(c.Node1Policy),
Policy2: protoToRoutingPolicy(c.Node2Policy),
LocalPolicy: protoToRoutingPolicy(c.Node1Policy),
RemotePolicy: protoToRoutingPolicy(c.Node2Policy),
}
if c.Node1Pub != resp.Node.PubKey {
ch.Policy1, ch.Policy2 = ch.Policy2, ch.Policy1
ch.LocalPolicy, ch.RemotePolicy = ch.RemotePolicy, ch.LocalPolicy
}
channels = append(channels, ch)
}
Expand Down
4 changes: 4 additions & 0 deletions network/backend/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (b *Backend) SubscribeRoutingEvents(ctx context.Context, channel chan *mode
return nil
}

func (b *Backend) SubscribeGraphEvents(ctx context.Context, channel chan *models.ChannelEdgeUpdate) error {
return nil
}

func (b *Backend) GetNode(ctx context.Context, pubkey string, includeChannels bool) (*models.Node, error) {
return &models.Node{}, nil
}
Expand Down
12 changes: 8 additions & 4 deletions network/models/channel.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package models

import (
"strings"
"time"

"github.com/edouardparis/lntop/logging"
Expand All @@ -13,6 +14,7 @@ const (
ChannelClosing
ChannelForceClosing
ChannelWaitingClose
ChannelClosed
)

type ChannelsBalance struct {
Expand Down Expand Up @@ -47,9 +49,8 @@ type Channel struct {
PendingHTLC []*HTLC
LastUpdate *time.Time
Node *Node
WeFirst bool
Policy1 *RoutingPolicy
Policy2 *RoutingPolicy
LocalPolicy *RoutingPolicy
RemotePolicy *RoutingPolicy
}

func (m Channel) MarshalLogObject(enc logging.ObjectEncoder) error {
Expand Down Expand Up @@ -78,7 +79,7 @@ func (m Channel) ShortAlias() (alias string, forced bool) {
} else if m.Node == nil || m.Node.Alias == "" {
alias = m.RemotePubKey[:24]
} else {
alias = m.Node.Alias
alias = strings.ReplaceAll(m.Node.Alias, "\ufe0f", "")
}
if len(alias) > 25 {
alias = alias[:24]
Expand All @@ -89,6 +90,9 @@ func (m Channel) ShortAlias() (alias string, forced bool) {
type ChannelUpdate struct {
}

type ChannelEdgeUpdate struct {
ChanPoints []string
}
type RoutingPolicy struct {
TimeLockDelta uint32
MinHtlc int64
Expand Down
60 changes: 60 additions & 0 deletions pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,64 @@ func (p *PubSub) routingUpdates(ctx context.Context, sub chan *events.Event) {
}()
}

func (p *PubSub) graphUpdates(ctx context.Context, sub chan *events.Event) {
p.wg.Add(3)
graphUpdates := make(chan *models.ChannelEdgeUpdate)
ctx, cancel := context.WithCancel(ctx)

go func() {
for gu := range graphUpdates {
p.logger.Debug("receive graph update")
sub <- events.NewWithData(events.GraphUpdated, gu)
}
p.wg.Done()
}()

go func() {
err := p.network.SubscribeGraphEvents(ctx, graphUpdates)
if err != nil {
p.logger.Error("SubscribeGraphEvents returned an error", logging.Error(err))
}
p.wg.Done()
}()

go func() {
<-p.stop
cancel()
close(graphUpdates)
p.wg.Done()
}()
}

func (p *PubSub) channels(ctx context.Context, sub chan *events.Event) {
p.wg.Add(3)
channels := make(chan *models.ChannelUpdate)
ctx, cancel := context.WithCancel(ctx)

go func() {
for range channels {
p.logger.Debug("channels updated")
sub <- events.New(events.ChannelActive)
}
p.wg.Done()
}()

go func() {
err := p.network.SubscribeChannels(ctx, channels)
if err != nil {
p.logger.Error("SubscribeChannels returned an error", logging.Error(err))
}
p.wg.Done()
}()

go func() {
<-p.stop
cancel()
close(channels)
p.wg.Done()
}()
}

func (p *PubSub) Stop() {
p.stop <- true
close(p.stop)
Expand All @@ -129,6 +187,8 @@ func (p *PubSub) Run(ctx context.Context, sub chan *events.Event) {
p.invoices(ctx, sub)
p.transactions(ctx, sub)
p.routingUpdates(ctx, sub)
p.channels(ctx, sub)
p.graphUpdates(ctx, sub)
p.ticker(ctx, sub,
withTickerInfo(),
withTickerChannelsBalance(),
Expand Down
2 changes: 2 additions & 0 deletions ui/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ func (c *controller) Listen(ctx context.Context, g *gocui.Gui, sub chan *events.
refresh(c.models.RefreshInfo)
case events.RoutingEventUpdated:
refresh(c.models.RefreshRouting(event.Data))
case events.GraphUpdated:
refresh(c.models.RefreshPolicies(event.Data))
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions ui/models/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ func (c *Channels) Update(newChannel *models.Channel) {
oldChannel.LastUpdate = newChannel.LastUpdate
}

if newChannel.Policy1 != nil {
oldChannel.Policy1 = newChannel.Policy1
if newChannel.LocalPolicy != nil {
oldChannel.LocalPolicy = newChannel.LocalPolicy
}

if newChannel.Policy2 != nil {
oldChannel.Policy2 = newChannel.Policy2
if newChannel.RemotePolicy != nil {
oldChannel.RemotePolicy = newChannel.RemotePolicy
}
}

Expand Down
Loading