Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custodian emits a new asset-receive-complete event to notification subscribers #659

Merged
merged 4 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 65 additions & 47 deletions itest/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,10 +807,7 @@ func testReattemptFailedSendUniCourier(t *harnessTest) {
// receiving an asset proof will be reattempted by the receiving tapd node. This
// test targets the universe proof courier.
func testReattemptFailedReceiveUniCourier(t *harnessTest) {
var (
ctxb = context.Background()
wg sync.WaitGroup
)
ctxb := context.Background()

// This tapd node will send the asset to the receiving tapd node.
// It will also transfer proof the related transfer proofs to the
Expand Down Expand Up @@ -892,56 +889,47 @@ func testReattemptFailedReceiveUniCourier(t *harnessTest) {

// Test to ensure that we receive the minimum expected number of backoff
// wait event notifications.
//
// This test is executed in a goroutine to ensure that we can receive
// the event notification(s) from the tapd node as soon as the proof
// courier service is restarted.
wg.Add(1)
go func() {
defer wg.Done()

// Define a target event selector to match the backoff wait
// event. This function selects for a specific event type.
targetEventSelector := func(event *taprpc.ReceiveAssetEvent) bool {
switch eventTyped := event.Event.(type) {
case *taprpc.ReceiveAssetEvent_ProofTransferBackoffWaitEvent:
ev := eventTyped.ProofTransferBackoffWaitEvent

// We are attempting to identify receive
// transfer types. Skip the event if it is not
// a receiving transfer type.
if ev.TransferType != taprpc.ProofTransferType_PROOF_TRANSFER_TYPE_RECEIVE {
return false
}
t.Logf("Waiting for the receiving tapd node to complete backoff " +
"proof retrieval attempts")

t.Logf("Found event ntfs: %v", ev)
return true
// Define a target event selector to match the backoff wait event. This
// function selects for a specific event type.
targetEventSelector := func(event *taprpc.ReceiveAssetEvent) bool {
switch eventTyped := event.Event.(type) {
case *taprpc.ReceiveAssetEvent_ProofTransferBackoffWaitEvent:
ev := eventTyped.ProofTransferBackoffWaitEvent

// We are attempting to identify receive transfer types.
// Skip the event if it is not a receiving transfer
// type.
if ev.TransferType != taprpc.ProofTransferType_PROOF_TRANSFER_TYPE_RECEIVE {
return false
}

return false
t.Logf("Found event ntfs: %v", ev)
return true
}

// Expected minimum number of events to receive.
expectedEventCount := 3

// Context timeout scales with expected number of events.
timeout := time.Duration(expectedEventCount) *
defaultProofTransferReceiverAckTimeout
// Add overhead buffer to context timeout.
timeout += 5 * time.Second
ctx, cancel := context.WithTimeout(ctxb, timeout)
defer cancel()
return false
}

assertAssetRecvNtfsEvent(
t, ctx, eventNtfns, targetEventSelector,
expectedEventCount,
)
}()
// Expected minimum number of events to receive.
expectedEventCount := 3

// Context timeout scales with expected number of events.
timeout := time.Duration(expectedEventCount) *
defaultProofTransferReceiverAckTimeout
// Add overhead buffer to context timeout.
timeout += 5 * time.Second
ctx, cancel := context.WithTimeout(ctxb, timeout)
defer cancel()

// Assert that the receiver tapd node has accomplished our minimum
// expected number of backoff procedure receive attempts.
assertAssetRecvNtfsEvent(
t, ctx, eventNtfns, targetEventSelector, expectedEventCount,
)

// Wait for the receiver node's backoff attempts to complete.
t.Logf("Waiting for the receiving tapd node to complete backoff " +
"proof retrieval attempts")
wg.Wait()
t.Logf("Finished waiting for the receiving tapd node to complete " +
"backoff procedure")

Expand All @@ -956,6 +944,13 @@ func testReattemptFailedReceiveUniCourier(t *harnessTest) {
// proof(s).
t.Logf("Attempting to confirm asset received by receiver node")
AssertNonInteractiveRecvComplete(t.t, receiveTapd, 1)

// Confirm that the sender tapd node eventually receives the asset
// transfer and publishes an asset recv complete event.
t.Logf("Check for asset recv complete event from receiver tapd node")
assertAssetRecvCompleteEvent(
t, ctxb, 5*time.Second, recvAddr.Encoded, eventNtfns,
)
}

// testOfflineReceiverEventuallyReceives tests that a receiver node will
Expand Down Expand Up @@ -1169,6 +1164,29 @@ func assertAssetRecvNtfsEvent(t *harnessTest, ctx context.Context,
expectedCount, countFound)
}

// assertAssetRecvNtfsEvent asserts that the given asset receive complete event
// notification was received. This function will block until the event is
// received or the event stream is closed.
func assertAssetRecvCompleteEvent(t *harnessTest, ctxb context.Context,
timeout time.Duration, encodedAddr string,
eventNtfns taprpc.TaprootAssets_SubscribeReceiveAssetEventNtfnsClient) {

ctx, cancel := context.WithTimeout(ctxb, timeout)
defer cancel()

eventSelector := func(event *taprpc.ReceiveAssetEvent) bool {
switch eventTyped := event.Event.(type) {
case *taprpc.ReceiveAssetEvent_AssetReceiveCompleteEvent:
ev := eventTyped.AssetReceiveCompleteEvent
return encodedAddr == ev.Address.Encoded
default:
return false
}
}

assertAssetRecvNtfsEvent(t, ctx, eventNtfns, eventSelector, 1)
}

// testMultiInputSendNonInteractiveSingleID tests that we can properly
// non-interactively send a single asset from multiple inputs.
//
Expand Down
25 changes: 22 additions & 3 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2350,7 +2350,9 @@ func (r *rpcServer) SubscribeReceiveAssetEventNtfns(
// RPC event type and sent over the stream.
case event := <-eventSubscriber.NewItemCreated.ChanOut():

rpcEvent, err := marshallReceiveAssetEvent(event)
rpcEvent, err := marshallReceiveAssetEvent(
event, r.cfg.TapAddrBook,
)
if err != nil {
return fmt.Errorf("failed to marshall "+
"asset receive event into RPC event: "+
Expand Down Expand Up @@ -2385,8 +2387,8 @@ func (r *rpcServer) SubscribeReceiveAssetEventNtfns(
}

// marshallReceiveAssetEvent maps an asset receive event to its RPC counterpart.
func marshallReceiveAssetEvent(
eventInterface fn.Event) (*taprpc.ReceiveAssetEvent, error) {
func marshallReceiveAssetEvent(eventInterface fn.Event,
db address.Storage) (*taprpc.ReceiveAssetEvent, error) {

switch event := eventInterface.(type) {
case *proof.BackoffWaitEvent:
Expand All @@ -2413,6 +2415,23 @@ func marshallReceiveAssetEvent(
Event: &eventRpc,
}, nil

case *tapgarden.AssetReceiveCompleteEvent:
rpcAddr, err := marshalAddr(&event.Address, db)
if err != nil {
return nil, fmt.Errorf("error marshaling addr: %w", err)
}

eventRpc := taprpc.ReceiveAssetEvent_AssetReceiveCompleteEvent{
AssetReceiveCompleteEvent: &taprpc.AssetReceiveCompleteEvent{
Timestamp: event.Timestamp().UnixMicro(),
Address: rpcAddr,
Outpoint: event.OutPoint.String(),
},
}
return &taprpc.ReceiveAssetEvent{
Event: &eventRpc,
}, nil

default:
return nil, fmt.Errorf("unknown event type: %T", eventInterface)
}
Expand Down
59 changes: 59 additions & 0 deletions tapgarden/custodian.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,36 @@ import (
"github.com/lightningnetwork/lnd/lnrpc"
)

// AssetReceiveCompleteEvent is an event that is sent to a subscriber once the
// asset receive process has finished for a given address and outpoint.
type AssetReceiveCompleteEvent struct {
// timestamp is the time the event was created.
timestamp time.Time

// Address is the address associated with the asset that was received.
Address address.Tap

// OutPoint is the outpoint of the transaction that was used to receive
// the asset.
OutPoint wire.OutPoint
}

// Timestamp returns the timestamp of the event.
func (e *AssetReceiveCompleteEvent) Timestamp() time.Time {
return e.timestamp
}

// NewAssetRecvCompleteEvent creates a new AssetReceiveCompleteEvent.
func NewAssetRecvCompleteEvent(addr address.Tap,
outpoint wire.OutPoint) *AssetReceiveCompleteEvent {

return &AssetReceiveCompleteEvent{
timestamp: time.Now().UTC(),
Address: addr,
OutPoint: outpoint,
}
}

// CustodianConfig houses all the items that the Custodian needs to carry out
// its duties.
type CustodianConfig struct {
Expand Down Expand Up @@ -458,6 +488,18 @@ func (c *Custodian) inspectWalletTx(walletTx *lndclient.Transaction) error {
log.Errorf("unable to import proofs: %v", err)
return
}

// At this point the "receive" process is complete. We
// will now notify all status event subscribers.
recvCompleteEvent := NewAssetRecvCompleteEvent(
*addr, op,
)
err = c.publishSubscriberStatusEvent(recvCompleteEvent)
if err != nil {
log.Errorf("unable publish status event: %v",
err)
return
}
}()
}

Expand Down Expand Up @@ -702,6 +744,23 @@ func (c *Custodian) RegisterSubscriber(receiver *fn.EventReceiver[fn.Event],
return nil
}

// publishSubscriberStatusEvent publishes an event to all status events
// subscribers.
func (c *Custodian) publishSubscriberStatusEvent(event fn.Event) error {
// Lock the subscriber mutex to ensure that we don't modify the
// subscriber map while we're iterating over it.
c.statusEventsSubsMtx.Lock()
defer c.statusEventsSubsMtx.Unlock()

for _, sub := range c.statusEventsSubs {
if !fn.SendOrQuit(sub.NewItemCreated.ChanIn(), event, c.Quit) {
return fmt.Errorf("custodian shutting down")
}
}

return nil
}

// RemoveSubscriber removes a subscriber from the set of status event
// subscribers.
func (c *Custodian) RemoveSubscriber(
Expand Down
Loading