Skip to content

Commit

Permalink
Merge pull request #659 from lightninglabs/add-recv-complete-event
Browse files Browse the repository at this point in the history
Custodian emits a new asset-receive-complete event to notification subscribers
  • Loading branch information
ffranr authored Nov 27, 2023
2 parents 80e7c85 + 6cacee9 commit 346d838
Show file tree
Hide file tree
Showing 6 changed files with 545 additions and 302 deletions.
112 changes: 65 additions & 47 deletions itest/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,10 +807,7 @@ func testReattemptFailedSendUniCourier(t *harnessTest) {
// receiving an asset proof will be reattempted by the receiving tapd node. This
// test targets the universe proof courier.
func testReattemptFailedReceiveUniCourier(t *harnessTest) {
var (
ctxb = context.Background()
wg sync.WaitGroup
)
ctxb := context.Background()

// This tapd node will send the asset to the receiving tapd node.
// It will also transfer proof the related transfer proofs to the
Expand Down Expand Up @@ -892,56 +889,47 @@ func testReattemptFailedReceiveUniCourier(t *harnessTest) {

// Test to ensure that we receive the minimum expected number of backoff
// wait event notifications.
//
// This test is executed in a goroutine to ensure that we can receive
// the event notification(s) from the tapd node as soon as the proof
// courier service is restarted.
wg.Add(1)
go func() {
defer wg.Done()

// Define a target event selector to match the backoff wait
// event. This function selects for a specific event type.
targetEventSelector := func(event *taprpc.ReceiveAssetEvent) bool {
switch eventTyped := event.Event.(type) {
case *taprpc.ReceiveAssetEvent_ProofTransferBackoffWaitEvent:
ev := eventTyped.ProofTransferBackoffWaitEvent

// We are attempting to identify receive
// transfer types. Skip the event if it is not
// a receiving transfer type.
if ev.TransferType != taprpc.ProofTransferType_PROOF_TRANSFER_TYPE_RECEIVE {
return false
}
t.Logf("Waiting for the receiving tapd node to complete backoff " +
"proof retrieval attempts")

t.Logf("Found event ntfs: %v", ev)
return true
// Define a target event selector to match the backoff wait event. This
// function selects for a specific event type.
targetEventSelector := func(event *taprpc.ReceiveAssetEvent) bool {
switch eventTyped := event.Event.(type) {
case *taprpc.ReceiveAssetEvent_ProofTransferBackoffWaitEvent:
ev := eventTyped.ProofTransferBackoffWaitEvent

// We are attempting to identify receive transfer types.
// Skip the event if it is not a receiving transfer
// type.
if ev.TransferType != taprpc.ProofTransferType_PROOF_TRANSFER_TYPE_RECEIVE {
return false
}

return false
t.Logf("Found event ntfs: %v", ev)
return true
}

// Expected minimum number of events to receive.
expectedEventCount := 3

// Context timeout scales with expected number of events.
timeout := time.Duration(expectedEventCount) *
defaultProofTransferReceiverAckTimeout
// Add overhead buffer to context timeout.
timeout += 5 * time.Second
ctx, cancel := context.WithTimeout(ctxb, timeout)
defer cancel()
return false
}

assertAssetRecvNtfsEvent(
t, ctx, eventNtfns, targetEventSelector,
expectedEventCount,
)
}()
// Expected minimum number of events to receive.
expectedEventCount := 3

// Context timeout scales with expected number of events.
timeout := time.Duration(expectedEventCount) *
defaultProofTransferReceiverAckTimeout
// Add overhead buffer to context timeout.
timeout += 5 * time.Second
ctx, cancel := context.WithTimeout(ctxb, timeout)
defer cancel()

// Assert that the receiver tapd node has accomplished our minimum
// expected number of backoff procedure receive attempts.
assertAssetRecvNtfsEvent(
t, ctx, eventNtfns, targetEventSelector, expectedEventCount,
)

// Wait for the receiver node's backoff attempts to complete.
t.Logf("Waiting for the receiving tapd node to complete backoff " +
"proof retrieval attempts")
wg.Wait()
t.Logf("Finished waiting for the receiving tapd node to complete " +
"backoff procedure")

Expand All @@ -956,6 +944,13 @@ func testReattemptFailedReceiveUniCourier(t *harnessTest) {
// proof(s).
t.Logf("Attempting to confirm asset received by receiver node")
AssertNonInteractiveRecvComplete(t.t, receiveTapd, 1)

// Confirm that the sender tapd node eventually receives the asset
// transfer and publishes an asset recv complete event.
t.Logf("Check for asset recv complete event from receiver tapd node")
assertAssetRecvCompleteEvent(
t, ctxb, 5*time.Second, recvAddr.Encoded, eventNtfns,
)
}

// testOfflineReceiverEventuallyReceives tests that a receiver node will
Expand Down Expand Up @@ -1169,6 +1164,29 @@ func assertAssetRecvNtfsEvent(t *harnessTest, ctx context.Context,
expectedCount, countFound)
}

// assertAssetRecvNtfsEvent asserts that the given asset receive complete event
// notification was received. This function will block until the event is
// received or the event stream is closed.
func assertAssetRecvCompleteEvent(t *harnessTest, ctxb context.Context,
timeout time.Duration, encodedAddr string,
eventNtfns taprpc.TaprootAssets_SubscribeReceiveAssetEventNtfnsClient) {

ctx, cancel := context.WithTimeout(ctxb, timeout)
defer cancel()

eventSelector := func(event *taprpc.ReceiveAssetEvent) bool {
switch eventTyped := event.Event.(type) {
case *taprpc.ReceiveAssetEvent_AssetReceiveCompleteEvent:
ev := eventTyped.AssetReceiveCompleteEvent
return encodedAddr == ev.Address.Encoded
default:
return false
}
}

assertAssetRecvNtfsEvent(t, ctx, eventNtfns, eventSelector, 1)
}

// testMultiInputSendNonInteractiveSingleID tests that we can properly
// non-interactively send a single asset from multiple inputs.
//
Expand Down
25 changes: 22 additions & 3 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2350,7 +2350,9 @@ func (r *rpcServer) SubscribeReceiveAssetEventNtfns(
// RPC event type and sent over the stream.
case event := <-eventSubscriber.NewItemCreated.ChanOut():

rpcEvent, err := marshallReceiveAssetEvent(event)
rpcEvent, err := marshallReceiveAssetEvent(
event, r.cfg.TapAddrBook,
)
if err != nil {
return fmt.Errorf("failed to marshall "+
"asset receive event into RPC event: "+
Expand Down Expand Up @@ -2385,8 +2387,8 @@ func (r *rpcServer) SubscribeReceiveAssetEventNtfns(
}

// marshallReceiveAssetEvent maps an asset receive event to its RPC counterpart.
func marshallReceiveAssetEvent(
eventInterface fn.Event) (*taprpc.ReceiveAssetEvent, error) {
func marshallReceiveAssetEvent(eventInterface fn.Event,
db address.Storage) (*taprpc.ReceiveAssetEvent, error) {

switch event := eventInterface.(type) {
case *proof.BackoffWaitEvent:
Expand All @@ -2413,6 +2415,23 @@ func marshallReceiveAssetEvent(
Event: &eventRpc,
}, nil

case *tapgarden.AssetReceiveCompleteEvent:
rpcAddr, err := marshalAddr(&event.Address, db)
if err != nil {
return nil, fmt.Errorf("error marshaling addr: %w", err)
}

eventRpc := taprpc.ReceiveAssetEvent_AssetReceiveCompleteEvent{
AssetReceiveCompleteEvent: &taprpc.AssetReceiveCompleteEvent{
Timestamp: event.Timestamp().UnixMicro(),
Address: rpcAddr,
Outpoint: event.OutPoint.String(),
},
}
return &taprpc.ReceiveAssetEvent{
Event: &eventRpc,
}, nil

default:
return nil, fmt.Errorf("unknown event type: %T", eventInterface)
}
Expand Down
59 changes: 59 additions & 0 deletions tapgarden/custodian.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,36 @@ import (
"github.com/lightningnetwork/lnd/lnrpc"
)

// AssetReceiveCompleteEvent is an event that is sent to a subscriber once the
// asset receive process has finished for a given address and outpoint.
type AssetReceiveCompleteEvent struct {
// timestamp is the time the event was created.
timestamp time.Time

// Address is the address associated with the asset that was received.
Address address.Tap

// OutPoint is the outpoint of the transaction that was used to receive
// the asset.
OutPoint wire.OutPoint
}

// Timestamp returns the timestamp of the event.
func (e *AssetReceiveCompleteEvent) Timestamp() time.Time {
return e.timestamp
}

// NewAssetRecvCompleteEvent creates a new AssetReceiveCompleteEvent.
func NewAssetRecvCompleteEvent(addr address.Tap,
outpoint wire.OutPoint) *AssetReceiveCompleteEvent {

return &AssetReceiveCompleteEvent{
timestamp: time.Now().UTC(),
Address: addr,
OutPoint: outpoint,
}
}

// CustodianConfig houses all the items that the Custodian needs to carry out
// its duties.
type CustodianConfig struct {
Expand Down Expand Up @@ -458,6 +488,18 @@ func (c *Custodian) inspectWalletTx(walletTx *lndclient.Transaction) error {
log.Errorf("unable to import proofs: %v", err)
return
}

// At this point the "receive" process is complete. We
// will now notify all status event subscribers.
recvCompleteEvent := NewAssetRecvCompleteEvent(
*addr, op,
)
err = c.publishSubscriberStatusEvent(recvCompleteEvent)
if err != nil {
log.Errorf("unable publish status event: %v",
err)
return
}
}()
}

Expand Down Expand Up @@ -702,6 +744,23 @@ func (c *Custodian) RegisterSubscriber(receiver *fn.EventReceiver[fn.Event],
return nil
}

// publishSubscriberStatusEvent publishes an event to all status events
// subscribers.
func (c *Custodian) publishSubscriberStatusEvent(event fn.Event) error {
// Lock the subscriber mutex to ensure that we don't modify the
// subscriber map while we're iterating over it.
c.statusEventsSubsMtx.Lock()
defer c.statusEventsSubsMtx.Unlock()

for _, sub := range c.statusEventsSubs {
if !fn.SendOrQuit(sub.NewItemCreated.ChanIn(), event, c.Quit) {
return fmt.Errorf("custodian shutting down")
}
}

return nil
}

// RemoveSubscriber removes a subscriber from the set of status event
// subscribers.
func (c *Custodian) RemoveSubscriber(
Expand Down
Loading

0 comments on commit 346d838

Please sign in to comment.