Skip to content

Commit

Permalink
Merge pull request #844 from sputn1ck/static_sweep_batching
Browse files Browse the repository at this point in the history
static addr loopin: add support for sweepless sweep batching
  • Loading branch information
sputn1ck authored Nov 18, 2024
2 parents 0d87b69 + 144b62c commit 627a830
Show file tree
Hide file tree
Showing 19 changed files with 1,098 additions and 1,253 deletions.
1 change: 1 addition & 0 deletions loopd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
Store: staticAddressLoopInStore,
WalletKit: d.lnd.WalletKit,
ChainNotifier: d.lnd.ChainNotifier,
NotificationManager: notificationManager,
ChainParams: d.lnd.ChainParams,
Signer: d.lnd.Signer,
ValidateLoopInContract: loop.ValidateLoopInContract,
Expand Down
7 changes: 2 additions & 5 deletions loopd/swapclient_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1796,14 +1796,11 @@ func toClientStaticAddressLoopInState(
case loopin.HtlcTimeoutSwept:
return looprpc.StaticAddressLoopInSwapState_HTLC_STATIC_ADDRESS_TIMEOUT_SWEPT

case loopin.FetchSignPushSweeplessSweepTx:
return looprpc.StaticAddressLoopInSwapState_FETCH_SIGN_PUSH_SWEEPLESS_SWEEP_TX

case loopin.Succeeded:
return looprpc.StaticAddressLoopInSwapState_SUCCEEDED

case loopin.SucceededSweeplessSigFailed:
return looprpc.StaticAddressLoopInSwapState_SUCCEEDED_SWEEPLESS_SIG_FAILED
case loopin.SucceededTransitioningFailed:
return looprpc.StaticAddressLoopInSwapState_SUCCEEDED_TRANSITIONING_FAILED

case loopin.UnlockDeposits:
return looprpc.StaticAddressLoopInSwapState_UNLOCK_DEPOSITS
Expand Down
346 changes: 170 additions & 176 deletions looprpc/client.pb.go

Large diffs are not rendered by default.

12 changes: 4 additions & 8 deletions looprpc/client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1855,23 +1855,19 @@ enum StaticAddressLoopInSwapState {

/*
*/
FETCH_SIGN_PUSH_SWEEPLESS_SWEEP_TX = 8;
SUCCEEDED = 8;

/*
*/
SUCCEEDED = 9;
SUCCEEDED_TRANSITIONING_FAILED = 9;

/*
*/
SUCCEEDED_SWEEPLESS_SIG_FAILED = 10;
UNLOCK_DEPOSITS = 10;

/*
*/
UNLOCK_DEPOSITS = 11;

/*
*/
FAILED_STATIC_ADDRESS_SWAP = 12;
FAILED_STATIC_ADDRESS_SWAP = 11;
}

message StaticAddressLoopInRequest {
Expand Down
3 changes: 1 addition & 2 deletions looprpc/client.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1634,9 +1634,8 @@
"SWEEP_STATIC_ADDRESS_HTLC_TIMEOUT",
"MONITOR_HTLC_TIMEOUT_SWEEP",
"HTLC_STATIC_ADDRESS_TIMEOUT_SWEPT",
"FETCH_SIGN_PUSH_SWEEPLESS_SWEEP_TX",
"SUCCEEDED",
"SUCCEEDED_SWEEPLESS_SIG_FAILED",
"SUCCEEDED_TRANSITIONING_FAILED",
"UNLOCK_DEPOSITS",
"FAILED_STATIC_ADDRESS_SWAP"
],
Expand Down
60 changes: 53 additions & 7 deletions notifications/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ const (
// NotificationTypeReservation is the notification type for reservation
// notifications.
NotificationTypeReservation

// NotificationTypeStaticLoopInSweepRequest is the notification type for
// static loop in sweep requests.
NotificationTypeStaticLoopInSweepRequest
)

// Client is the interface that the notification manager needs to implement in
Expand Down Expand Up @@ -79,7 +83,8 @@ func (m *Manager) SubscribeReservations(ctx context.Context,

m.addSubscriber(NotificationTypeReservation, sub)

// Start a goroutine to remove the subscriber when the context is canceled
// Start a goroutine to remove the subscriber when the context is
// canceled.
go func() {
<-ctx.Done()
m.removeSubscriber(NotificationTypeReservation, sub)
Expand All @@ -89,6 +94,34 @@ func (m *Manager) SubscribeReservations(ctx context.Context,
return notifChan
}

// SubscribeStaticLoopInSweepRequests subscribes to the static loop in sweep
// requests.
func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
) <-chan *swapserverrpc.ServerStaticLoopInSweepNotification {

notifChan := make(
chan *swapserverrpc.ServerStaticLoopInSweepNotification, 1,
)
sub := subscriber{
subCtx: ctx,
recvChan: notifChan,
}

m.addSubscriber(NotificationTypeStaticLoopInSweepRequest, sub)

// Start a goroutine to remove the subscriber when the context is
// canceled.
go func() {
<-ctx.Done()
m.removeSubscriber(
NotificationTypeStaticLoopInSweepRequest, sub,
)
close(notifChan)
}()

return notifChan
}

// Run starts the notification manager. It will keep on running until the
// context is canceled. It will subscribe to notifications and forward them to
// the subscribers. On a first successful connection to the server, it will
Expand Down Expand Up @@ -160,7 +193,7 @@ func (m *Manager) subscribeNotifications(ctx context.Context,
for {
notification, err := notifStream.Recv()
if err == nil && notification != nil {
log.Debugf("Received notification: %v", notification)
log.Tracef("Received notification: %v", notification)
m.handleNotification(notification)
continue
}
Expand All @@ -173,13 +206,13 @@ func (m *Manager) subscribeNotifications(ctx context.Context,

// handleNotification handles an incoming notification from the server,
// forwarding it to the appropriate subscribers.
func (m *Manager) handleNotification(notification *swapserverrpc.
func (m *Manager) handleNotification(ntfn *swapserverrpc.
SubscribeNotificationsResponse) {

switch notification.Notification.(type) {
case *swapserverrpc.SubscribeNotificationsResponse_ReservationNotification:
switch ntfn.Notification.(type) {
case *swapserverrpc.SubscribeNotificationsResponse_ReservationNotification: // nolint: lll
// We'll forward the reservation notification to all subscribers.
reservationNtfn := notification.GetReservationNotification()
reservationNtfn := ntfn.GetReservationNotification()
m.Lock()
defer m.Unlock()

Expand All @@ -189,10 +222,23 @@ func (m *Manager) handleNotification(notification *swapserverrpc.

recvChan <- reservationNtfn
}
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInSweep: // nolint: lll
// We'll forward the static loop in sweep request to all
// subscribers.
staticLoopInSweepRequestNtfn := ntfn.GetStaticLoopInSweep()
m.Lock()
defer m.Unlock()

for _, sub := range m.subscribers[NotificationTypeStaticLoopInSweepRequest] { // nolint: lll
recvChan := sub.recvChan.(chan *swapserverrpc.
ServerStaticLoopInSweepNotification)

recvChan <- staticLoopInSweepRequestNtfn
}

default:
log.Warnf("Received unknown notification type: %v",
notification)
ntfn)
}
}

Expand Down
11 changes: 0 additions & 11 deletions staticaddr/address/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,6 @@ func (m *mockStaticAddressClient) PushStaticAddressSweeplessSigs(ctx context.Con
args.Error(1)
}

func (m *mockStaticAddressClient) FetchSweeplessSweepTx(ctx context.Context,
in *swapserverrpc.FetchSweeplessSweepTxRequest,
opts ...grpc.CallOption) (
*swapserverrpc.FetchSweeplessSweepTxResponse, error) {

args := m.Called(ctx, in, opts)

return args.Get(0).(*swapserverrpc.FetchSweeplessSweepTxResponse),
args.Error(1)
}

func (m *mockStaticAddressClient) PushStaticAddressHtlcSigs(ctx context.Context,
in *swapserverrpc.PushStaticAddressHtlcSigsRequest,
opts ...grpc.CallOption) (
Expand Down
11 changes: 0 additions & 11 deletions staticaddr/deposit/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,6 @@ func (m *mockStaticAddressClient) PushStaticAddressSweeplessSigs(ctx context.Con
args.Error(1)
}

func (m *mockStaticAddressClient) FetchSweeplessSweepTx(ctx context.Context,
in *swapserverrpc.FetchSweeplessSweepTxRequest,
opts ...grpc.CallOption) (
*swapserverrpc.FetchSweeplessSweepTxResponse, error) {

args := m.Called(ctx, in, opts)

return args.Get(0).(*swapserverrpc.FetchSweeplessSweepTxResponse),
args.Error(1)
}

func (m *mockStaticAddressClient) PushStaticAddressHtlcSigs(ctx context.Context,
in *swapserverrpc.PushStaticAddressHtlcSigsRequest,
opts ...grpc.CallOption) (
Expand Down
Loading

0 comments on commit 627a830

Please sign in to comment.