Skip to content

Commit

Permalink
[SDP-1397] Circle Payouts pt2: error handling & retries (#491)
Browse files Browse the repository at this point in the history
### What

Errors in the POST /recipient endpoint are not being properly handled. What;s missing?
- When POST /recipient fails with `ErrCircleRecipientCreationFailedTooManyTimes`, we update the payments table with the failed status explaining the reason.
- When a payment that failed due to recipient creation issues is retried, the recipient is reset with `status=NULL` and `sync_attempts=0`.
- The method `ensureRecipientIsReady` was split into different sub-methods for better modularity and clarity, as suggested in #486 (comment).

<img width="1151" alt="Screenshot 2024-12-12 at 4 39 08 PM" src="https://github.com/user-attachments/assets/53ed59d9-bba0-42e2-b149-0fca3bfb5a66" />

- [x] Tested with scheduled jobs
- [x] Tested with Kafka

### Why

To wrap up https://stellarorg.atlassian.net/browse/SDP-1397
  • Loading branch information
marcelosalloum authored Dec 13, 2024
1 parent 38b91ca commit f9e2520
Show file tree
Hide file tree
Showing 6 changed files with 569 additions and 73 deletions.
35 changes: 35 additions & 0 deletions internal/data/circle_recipient.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"
"time"

"github.com/lib/pq"

"github.com/stellar/stellar-disbursement-platform-backend/db"
)

Expand Down Expand Up @@ -123,6 +125,39 @@ func (m CircleRecipientModel) Insert(ctx context.Context, receiverWalletID strin
return &circleRecipient, nil
}

// ResetRecipientsForRetryIfNeeded resets the status of the circle recipients for the given payment IDs to NULL if the status is not active.
func (m CircleRecipientModel) ResetRecipientsForRetryIfNeeded(ctx context.Context, sqlExec db.SQLExecuter, paymentIDs ...string) ([]*CircleRecipient, error) {
if len(paymentIDs) == 0 {
return nil, fmt.Errorf("at least one payment ID is required: %w", ErrMissingInput)
}

const query = `
UPDATE
circle_recipients
SET
status = NULL,
sync_attempts = 0,
last_sync_attempt_at = NULL,
response_body = NULL
WHERE
receiver_wallet_id IN (
SELECT DISTINCT receiver_wallet_id
FROM payments
WHERE id = ANY($1)
)
AND (status != $2 OR status IS NULL)
RETURNING
` + circleRecipientFields

var updatedRecipients []*CircleRecipient
err := sqlExec.SelectContext(ctx, &updatedRecipients, query, pq.Array(paymentIDs), CircleRecipientStatusActive)
if err != nil {
return nil, fmt.Errorf("getting context: %w", err)
}

return updatedRecipients, nil
}

func (m CircleRecipientModel) Update(ctx context.Context, receiverWalletID string, update CircleRecipientUpdate) (*CircleRecipient, error) {
if receiverWalletID == "" {
return nil, fmt.Errorf("receiverWalletID is required")
Expand Down
175 changes: 175 additions & 0 deletions internal/data/circle_recipient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package data

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -159,3 +160,177 @@ func Test_CircleRecipientModel_GetByReceiverWalletID(t *testing.T) {
assert.Equal(t, insertedCircleRecipient, fetchedCircleRecipient)
})
}

func Test_CircleRecipientModel_ResetRecipientsForRetryIfNeeded(t *testing.T) {
dbt := dbtest.Open(t)
defer dbt.Close()
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
require.NoError(t, err)
defer dbConnectionPool.Close()

ctx := context.Background()

m, err := NewModels(dbConnectionPool)
require.NoError(t, err)

asset := CreateAssetFixture(t, ctx, dbConnectionPool, "FOO1", "GA5ZSEJYB37JRC5AVCIA5MOP4RHTM335X2KGX3IHOJAPP5RE34K4KZVV")
receiver := CreateReceiverFixture(t, ctx, dbConnectionPool, &Receiver{})

walletA := CreateWalletFixture(t, ctx, dbConnectionPool, "walletA", "https://www.a.com", "www.a.com", "a://")
disbursementA := CreateDisbursementFixture(t, ctx, dbConnectionPool, m.Disbursements, &Disbursement{
Wallet: walletA,
Status: ReadyDisbursementStatus,
Asset: asset,
ReceiverRegistrationMessageTemplate: "Disbursement SMS Registration Message Template A1",
})
rwA := CreateReceiverWalletFixture(t, ctx, dbConnectionPool, receiver.ID, walletA.ID, RegisteredReceiversWalletStatus)
paymentA1 := CreatePaymentFixture(t, ctx, dbConnectionPool, m.Payment, &Payment{
ReceiverWallet: rwA,
Disbursement: disbursementA,
Asset: *asset,
Status: ReadyPaymentStatus,
Amount: "100",
})
paymentA2 := CreatePaymentFixture(t, ctx, dbConnectionPool, m.Payment, &Payment{
ReceiverWallet: rwA,
Disbursement: disbursementA,
Asset: *asset,
Status: ReadyPaymentStatus,
Amount: "100",
})

walletB := CreateWalletFixture(t, ctx, dbConnectionPool, "walletB", "https://www.b.com", "www.b.com", "b://")
disbursementB := CreateDisbursementFixture(t, ctx, dbConnectionPool, m.Disbursements, &Disbursement{
Wallet: walletB,
Status: ReadyDisbursementStatus,
Asset: asset,
ReceiverRegistrationMessageTemplate: "Disbursement SMS Registration Message Template A1",
})
rwB := CreateReceiverWalletFixture(t, ctx, dbConnectionPool, receiver.ID, walletB.ID, RegisteredReceiversWalletStatus)
paymentB1 := CreatePaymentFixture(t, ctx, dbConnectionPool, m.Payment, &Payment{
ReceiverWallet: rwB,
Disbursement: disbursementB,
Asset: *asset,
Status: ReadyPaymentStatus,
Amount: "100",
})
paymentB2 := CreatePaymentFixture(t, ctx, dbConnectionPool, m.Payment, &Payment{
ReceiverWallet: rwB,
Disbursement: disbursementA,
Asset: *asset,
Status: ReadyPaymentStatus,
Amount: "100",
})

t.Run("🔴 fails if no Payment IDs are provided", func(t *testing.T) {
circleRecipients, err := m.CircleRecipient.ResetRecipientsForRetryIfNeeded(ctx, dbConnectionPool)
require.ErrorContains(t, err, "at least one payment ID is required")
require.Nil(t, circleRecipients)
})

now := time.Now()
type TestCases struct {
name string
prepareFixturesFn func(t *testing.T, models *Models)
paymentIDs []string
wantUpdatedIDs []string
}
testCases := []TestCases{
{
name: "🟡 nothing happens if no paymentIDs are found",
paymentIDs: []string{"non-existent", "non-existent-2"},
wantUpdatedIDs: []string{},
},
{
name: "🟡 nothing happens if no circle recipients are found for the given payment IDs",
paymentIDs: []string{paymentA1.ID, paymentA2.ID, paymentB1.ID, paymentB2.ID},
wantUpdatedIDs: []string{},
},
{
name: "🟢 only 'paymentA*' related recipients were updated",
prepareFixturesFn: func(t *testing.T, models *Models) {
for _, rwID := range []string{rwA.ID, rwB.ID} {
CreateCircleRecipientFixture(t, ctx, dbConnectionPool, CircleRecipient{
IdempotencyKey: uuid.NewString(),
ReceiverWalletID: rwID,
UpdatedAt: now,
CreatedAt: now,
Status: CircleRecipientStatusDenied,
})
}
},
paymentIDs: []string{paymentA1.ID, paymentA2.ID},
wantUpdatedIDs: []string{rwA.ID},
},
{
name: "🟢 only 'payment*1' related recipients were updated",
prepareFixturesFn: func(t *testing.T, models *Models) {
for _, rwID := range []string{rwA.ID, rwB.ID} {
CreateCircleRecipientFixture(t, ctx, dbConnectionPool, CircleRecipient{
IdempotencyKey: uuid.NewString(),
ReceiverWalletID: rwID,
UpdatedAt: now,
CreatedAt: now,
Status: CircleRecipientStatusDenied,
})
}
},
paymentIDs: []string{paymentA1.ID, paymentB1.ID},
wantUpdatedIDs: []string{rwA.ID, rwB.ID},
},
}

for _, nonActiveStatus := range []CircleRecipientStatus{
CircleRecipientStatusDenied,
CircleRecipientStatusInactive,
CircleRecipientStatusPending,
"",
} {
testCases = append(testCases, TestCases{
name: fmt.Sprintf("🟢 all recipients were updated [status=%s]", nonActiveStatus),
prepareFixturesFn: func(t *testing.T, models *Models) {
for _, rwID := range []string{rwA.ID, rwB.ID} {
CreateCircleRecipientFixture(t, ctx, dbConnectionPool, CircleRecipient{
IdempotencyKey: uuid.NewString(),
ReceiverWalletID: rwID,
UpdatedAt: now,
CreatedAt: now,
Status: nonActiveStatus,
})
}
},
paymentIDs: []string{paymentA1.ID, paymentA2.ID, paymentB1.ID, paymentB2.ID},
wantUpdatedIDs: []string{rwA.ID, rwB.ID},
})
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer DeleteAllCircleRecipientsFixtures(t, ctx, m.DBConnectionPool)
if tc.prepareFixturesFn != nil {
tc.prepareFixturesFn(t, m)
}

circleRecipients, err := m.CircleRecipient.ResetRecipientsForRetryIfNeeded(ctx, dbConnectionPool, tc.paymentIDs...)
require.NoError(t, err)

updatedIDs := make([]string, 0, len(circleRecipients))
for _, cr := range circleRecipients {
updatedIDs = append(updatedIDs, cr.ReceiverWalletID)
}
require.ElementsMatch(t, tc.wantUpdatedIDs, updatedIDs)

if len(updatedIDs) > 0 {
for _, rwID := range updatedIDs {
circleRecipient, err := m.CircleRecipient.GetByReceiverWalletID(ctx, rwID)
require.NoError(t, err)
require.NotNil(t, circleRecipient)
assert.Empty(t, circleRecipient.Status)
assert.Empty(t, circleRecipient.SyncAttempts)
assert.Empty(t, circleRecipient.LastSyncAttemptAt)
assert.Empty(t, circleRecipient.ResponseBody)
}
}
})
}
}
11 changes: 11 additions & 0 deletions internal/serve/httphandler/payments_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stellar/stellar-disbursement-platform-backend/internal/transactionsubmission/engine/signing"
"github.com/stellar/stellar-disbursement-platform-backend/internal/utils"
"github.com/stellar/stellar-disbursement-platform-backend/stellar-auth/pkg/auth"
"github.com/stellar/stellar-disbursement-platform-backend/stellar-multitenant/pkg/tenant"
)

type PaymentsHandler struct {
Expand Down Expand Up @@ -178,6 +179,16 @@ func (p PaymentsHandler) RetryPayments(rw http.ResponseWriter, req *http.Request
return nil, fmt.Errorf("retrying failed payments: %w", err)
}

var tnt *tenant.Tenant
if tnt, err = tenant.GetTenantFromContext(ctx); err != nil {
return nil, fmt.Errorf("getting tenant from context: %w", err)
} else if tnt.DistributionAccountType.IsCircle() {
_, err = p.Models.CircleRecipient.ResetRecipientsForRetryIfNeeded(ctx, dbTx, reqBody.PaymentIDs...)
if err != nil {
return nil, fmt.Errorf("resetting circle recipients for retry if needed: %w", err)
}
}

// Producing event to send ready payments to TSS
var payments []*data.Payment
payments, err = p.Models.Payment.GetReadyByID(ctx, dbTx, reqBody.PaymentIDs...)
Expand Down
40 changes: 26 additions & 14 deletions internal/serve/httphandler/payments_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,8 @@ func Test_PaymentHandler_RetryPayments(t *testing.T) {

t.Run("successfully retries failed circle payment", func(t *testing.T) {
data.DeleteAllPaymentsFixtures(t, ctx, dbConnectionPool)
data.DeleteAllCircleRecipientsFixtures(t, ctx, dbConnectionPool)
data.DeleteAllCircleTransferRequests(t, ctx, dbConnectionPool)

failedPayment := data.CreatePaymentFixture(t, ctx, dbConnectionPool, models.Payment, &data.Payment{
Amount: "1",
Expand All @@ -1129,22 +1131,31 @@ func Test_PaymentHandler_RetryPayments(t *testing.T) {
ReceiverWallet: receiverWallet,
Asset: *asset,
})
circleRecipient := data.CreateCircleRecipientFixture(t, ctx, dbConnectionPool, data.CircleRecipient{
ReceiverWalletID: receiverWallet.ID,
Status: data.CircleRecipientStatusDenied,
CircleRecipientID: "circle-recipient-id-1",
SyncAttempts: 5,
LastSyncAttemptAt: time.Now(),
})

ctx = context.WithValue(ctx, middleware.TokenContextKey, "mytoken")
circleTnt := tenant.Tenant{ID: "tenant-id", DistributionAccountType: schema.DistributionAccountCircleDBVault}
circleCtx := tenant.SaveTenantInContext(context.Background(), &circleTnt)
circleCtx = context.WithValue(circleCtx, middleware.TokenContextKey, "mytoken")

payload := strings.NewReader(fmt.Sprintf(`{ "payment_ids": [%q] } `, failedPayment.ID))
req, err := http.NewRequestWithContext(ctx, http.MethodPatch, "/retry", payload)
req, err := http.NewRequestWithContext(circleCtx, http.MethodPatch, "/retry", payload)
require.NoError(t, err)

// Prepare the handler and its mocks
authManagerMock := auth.NewAuthManagerMock(t)
authManagerMock.
On("GetUser", ctx, "mytoken").
On("GetUser", circleCtx, "mytoken").
Return(&auth.User{Email: "[email protected]"}, nil).
Once()
eventProducerMock := events.NewMockProducer(t)
eventProducerMock.
On("WriteMessages", ctx, []events.Message{
On("WriteMessages", circleCtx, []events.Message{
{
Topic: events.CirclePaymentReadyToPayTopic,
Key: tnt.ID,
Expand Down Expand Up @@ -1175,20 +1186,27 @@ func Test_PaymentHandler_RetryPayments(t *testing.T) {

rw := httptest.NewRecorder()
http.HandlerFunc(handler.RetryPayments).ServeHTTP(rw, req)

resp := rw.Result()
defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
require.NoError(t, err)

// Assert response
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.JSONEq(t, `{"message": "Payments retried successfully"}`, string(respBody))

previouslyFailedPayment, err := models.Payment.Get(ctx, failedPayment.ID, dbConnectionPool)
// Assert payment status
previouslyFailedPayment, err := models.Payment.Get(circleCtx, failedPayment.ID, dbConnectionPool)
require.NoError(t, err)

assert.Equal(t, data.ReadyPaymentStatus, previouslyFailedPayment.Status)

// Assert circle transfer request status
circleRecipient, err = models.CircleRecipient.GetByReceiverWalletID(circleCtx, circleRecipient.ReceiverWalletID)
require.NoError(t, err)
assert.Empty(t, circleRecipient.Status)
assert.Empty(t, circleRecipient.SyncAttempts)
assert.Empty(t, circleRecipient.LastSyncAttemptAt)
assert.Empty(t, circleRecipient.ResponseBody)
})

t.Run("returns error when tenant is not in the context", func(t *testing.T) {
Expand Down Expand Up @@ -1231,10 +1249,6 @@ func Test_PaymentHandler_RetryPayments(t *testing.T) {
Return(&auth.User{Email: "[email protected]"}, nil).
Once()
distAccountResolverMock := sigMocks.NewMockDistributionAccountResolver(t)
distAccountResolverMock.
On("DistributionAccountFromContext", mock.Anything).
Return(schema.TransactionAccount{Type: schema.DistributionAccountStellarEnv}, nil).
Once()
handler := PaymentsHandler{
Models: models,
DBConnectionPool: dbConnectionPool,
Expand All @@ -1244,10 +1258,8 @@ func Test_PaymentHandler_RetryPayments(t *testing.T) {

rw := httptest.NewRecorder()
http.HandlerFunc(handler.RetryPayments).ServeHTTP(rw, req)

resp := rw.Result()
defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
require.NoError(t, err)

Expand Down
Loading

0 comments on commit f9e2520

Please sign in to comment.