Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

static addr loopin: add support for sweepless sweep batching #844

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions loopd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
Store: staticAddressLoopInStore,
WalletKit: d.lnd.WalletKit,
ChainNotifier: d.lnd.ChainNotifier,
NotificationManager: notificationManager,
ChainParams: d.lnd.ChainParams,
Signer: d.lnd.Signer,
ValidateLoopInContract: loop.ValidateLoopInContract,
Expand Down
7 changes: 2 additions & 5 deletions loopd/swapclient_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1796,14 +1796,11 @@ func toClientStaticAddressLoopInState(
case loopin.HtlcTimeoutSwept:
return looprpc.StaticAddressLoopInSwapState_HTLC_STATIC_ADDRESS_TIMEOUT_SWEPT

case loopin.FetchSignPushSweeplessSweepTx:
return looprpc.StaticAddressLoopInSwapState_FETCH_SIGN_PUSH_SWEEPLESS_SWEEP_TX

case loopin.Succeeded:
return looprpc.StaticAddressLoopInSwapState_SUCCEEDED

case loopin.SucceededSweeplessSigFailed:
return looprpc.StaticAddressLoopInSwapState_SUCCEEDED_SWEEPLESS_SIG_FAILED
case loopin.SucceededTransitioningFailed:
return looprpc.StaticAddressLoopInSwapState_SUCCEEDED_TRANSITIONING_FAILED

case loopin.UnlockDeposits:
return looprpc.StaticAddressLoopInSwapState_UNLOCK_DEPOSITS
Expand Down
346 changes: 170 additions & 176 deletions looprpc/client.pb.go

Large diffs are not rendered by default.

12 changes: 4 additions & 8 deletions looprpc/client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1855,23 +1855,19 @@ enum StaticAddressLoopInSwapState {

/*
*/
FETCH_SIGN_PUSH_SWEEPLESS_SWEEP_TX = 8;
SUCCEEDED = 8;

/*
*/
SUCCEEDED = 9;
SUCCEEDED_TRANSITIONING_FAILED = 9;

/*
*/
SUCCEEDED_SWEEPLESS_SIG_FAILED = 10;
UNLOCK_DEPOSITS = 10;

/*
*/
UNLOCK_DEPOSITS = 11;

/*
*/
FAILED_STATIC_ADDRESS_SWAP = 12;
FAILED_STATIC_ADDRESS_SWAP = 11;
}

message StaticAddressLoopInRequest {
Expand Down
3 changes: 1 addition & 2 deletions looprpc/client.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1634,9 +1634,8 @@
"SWEEP_STATIC_ADDRESS_HTLC_TIMEOUT",
"MONITOR_HTLC_TIMEOUT_SWEEP",
"HTLC_STATIC_ADDRESS_TIMEOUT_SWEPT",
"FETCH_SIGN_PUSH_SWEEPLESS_SWEEP_TX",
"SUCCEEDED",
"SUCCEEDED_SWEEPLESS_SIG_FAILED",
"SUCCEEDED_TRANSITIONING_FAILED",
"UNLOCK_DEPOSITS",
"FAILED_STATIC_ADDRESS_SWAP"
],
Expand Down
60 changes: 53 additions & 7 deletions notifications/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ const (
// NotificationTypeReservation is the notification type for reservation
// notifications.
NotificationTypeReservation

// NotificationTypeStaticLoopInSweepRequest is the notification type for
// static loop in sweep requests.
NotificationTypeStaticLoopInSweepRequest
)

// Client is the interface that the notification manager needs to implement in
Expand Down Expand Up @@ -79,7 +83,8 @@ func (m *Manager) SubscribeReservations(ctx context.Context,

m.addSubscriber(NotificationTypeReservation, sub)

// Start a goroutine to remove the subscriber when the context is canceled
// Start a goroutine to remove the subscriber when the context is
// canceled.
go func() {
<-ctx.Done()
m.removeSubscriber(NotificationTypeReservation, sub)
Expand All @@ -89,6 +94,34 @@ func (m *Manager) SubscribeReservations(ctx context.Context,
return notifChan
}

// SubscribeStaticLoopInSweepRequests subscribes to the static loop in sweep
// requests.
func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
) <-chan *swapserverrpc.ServerStaticLoopInSweepNotification {

notifChan := make(
chan *swapserverrpc.ServerStaticLoopInSweepNotification, 1,
)
sub := subscriber{
subCtx: ctx,
recvChan: notifChan,
}

m.addSubscriber(NotificationTypeStaticLoopInSweepRequest, sub)

// Start a goroutine to remove the subscriber when the context is
// canceled.
go func() {
<-ctx.Done()
m.removeSubscriber(
NotificationTypeStaticLoopInSweepRequest, sub,
)
close(notifChan)
}()

return notifChan
}

// Run starts the notification manager. It will keep on running until the
// context is canceled. It will subscribe to notifications and forward them to
// the subscribers. On a first successful connection to the server, it will
Expand Down Expand Up @@ -160,7 +193,7 @@ func (m *Manager) subscribeNotifications(ctx context.Context,
for {
notification, err := notifStream.Recv()
if err == nil && notification != nil {
log.Debugf("Received notification: %v", notification)
log.Tracef("Received notification: %v", notification)
m.handleNotification(notification)
continue
}
Expand All @@ -173,13 +206,13 @@ func (m *Manager) subscribeNotifications(ctx context.Context,

// handleNotification handles an incoming notification from the server,
// forwarding it to the appropriate subscribers.
func (m *Manager) handleNotification(notification *swapserverrpc.
func (m *Manager) handleNotification(ntfn *swapserverrpc.
SubscribeNotificationsResponse) {

switch notification.Notification.(type) {
case *swapserverrpc.SubscribeNotificationsResponse_ReservationNotification:
switch ntfn.Notification.(type) {
case *swapserverrpc.SubscribeNotificationsResponse_ReservationNotification: // nolint: lll
// We'll forward the reservation notification to all subscribers.
reservationNtfn := notification.GetReservationNotification()
reservationNtfn := ntfn.GetReservationNotification()
m.Lock()
defer m.Unlock()

Expand All @@ -189,10 +222,23 @@ func (m *Manager) handleNotification(notification *swapserverrpc.

recvChan <- reservationNtfn
}
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInSweep: // nolint: lll
// We'll forward the static loop in sweep request to all
// subscribers.
staticLoopInSweepRequestNtfn := ntfn.GetStaticLoopInSweep()
m.Lock()
defer m.Unlock()

for _, sub := range m.subscribers[NotificationTypeStaticLoopInSweepRequest] { // nolint: lll
recvChan := sub.recvChan.(chan *swapserverrpc.
ServerStaticLoopInSweepNotification)

recvChan <- staticLoopInSweepRequestNtfn
}

default:
log.Warnf("Received unknown notification type: %v",
notification)
ntfn)
}
}

Expand Down
11 changes: 0 additions & 11 deletions staticaddr/address/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,6 @@ func (m *mockStaticAddressClient) PushStaticAddressSweeplessSigs(ctx context.Con
args.Error(1)
}

func (m *mockStaticAddressClient) FetchSweeplessSweepTx(ctx context.Context,
in *swapserverrpc.FetchSweeplessSweepTxRequest,
opts ...grpc.CallOption) (
*swapserverrpc.FetchSweeplessSweepTxResponse, error) {

args := m.Called(ctx, in, opts)

return args.Get(0).(*swapserverrpc.FetchSweeplessSweepTxResponse),
args.Error(1)
}

func (m *mockStaticAddressClient) PushStaticAddressHtlcSigs(ctx context.Context,
in *swapserverrpc.PushStaticAddressHtlcSigsRequest,
opts ...grpc.CallOption) (
Expand Down
11 changes: 0 additions & 11 deletions staticaddr/deposit/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,6 @@ func (m *mockStaticAddressClient) PushStaticAddressSweeplessSigs(ctx context.Con
args.Error(1)
}

func (m *mockStaticAddressClient) FetchSweeplessSweepTx(ctx context.Context,
in *swapserverrpc.FetchSweeplessSweepTxRequest,
opts ...grpc.CallOption) (
*swapserverrpc.FetchSweeplessSweepTxResponse, error) {

args := m.Called(ctx, in, opts)

return args.Get(0).(*swapserverrpc.FetchSweeplessSweepTxResponse),
args.Error(1)
}

func (m *mockStaticAddressClient) PushStaticAddressHtlcSigs(ctx context.Context,
in *swapserverrpc.PushStaticAddressHtlcSigsRequest,
opts ...grpc.CallOption) (
Expand Down
Loading
Loading