From 80fc8a63d53ca41cb9b19a8c20cae2d9450a9295 Mon Sep 17 00:00:00 2001 From: Andrii Date: Mon, 23 Sep 2024 17:38:19 +0300 Subject: [PATCH 1/3] Added new endpoint with test cases for it --- access/grpc/client.go | 7 +++ access/grpc/grpc.go | 60 ++++++++++++++++++++- access/grpc/grpc_test.go | 113 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 179 insertions(+), 1 deletion(-) diff --git a/access/grpc/client.go b/access/grpc/client.go index 746bddae3..23f84682b 100644 --- a/access/grpc/client.go +++ b/access/grpc/client.go @@ -196,6 +196,13 @@ func (c *Client) GetTransactionResultsByBlockID(ctx context.Context, blockID flo return c.grpc.GetTransactionResultsByBlockID(ctx, blockID) } +func (c *Client) SendAndSubscribeTransactionStatuses( + ctx context.Context, + tx flow.Transaction, +) (<-chan flow.TransactionStatus, <-chan error, error) { + return c.grpc.SendAndSubscribeTransactionStatuses(ctx, tx) +} + func (c *Client) GetAccount(ctx context.Context, address flow.Address) (*flow.Account, error) { return c.grpc.GetAccount(ctx, address) } diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index 9c547b25b..baff35324 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -811,7 +811,6 @@ func (c *BaseClient) GetExecutionDataByBlockID( } return convert.MessageToBlockExecutionData(ed.GetBlockExecutionData()) - } func (c *BaseClient) SubscribeExecutionDataByBlockID( @@ -993,3 +992,62 @@ func (c *BaseClient) subscribeEvents( return sub, errChan, nil } + +func (c *BaseClient) SendAndSubscribeTransactionStatuses( + ctx context.Context, + tx flow.Transaction, + opts ...grpc.CallOption, +) (<-chan flow.TransactionStatus, <-chan error, error) { + txMsg, err := convert.TransactionToMessage(tx) + if err != nil { + return nil, nil, newEntityToMessageError(entityTransaction, err) + } + + req := &access.SendAndSubscribeTransactionStatusesRequest{ + Transaction: txMsg, + EventEncodingVersion: c.eventEncoding, + } + + subscribeClient, err := c.rpcClient.SendAndSubscribeTransactionStatuses(ctx, req, opts...) + if err != nil { + return nil, nil, newRPCError(err) + } + + txStatusChan := make(chan flow.TransactionStatus) + errChan := make(chan error) + + sendErr := func(err error) { + select { + case <-ctx.Done(): + case errChan <- err: + } + } + + go func() { + defer close(txStatusChan) + defer close(errChan) + + for { + // Receive the next txStatus response + txStatusResponse, err := subscribeClient.Recv() + if err != nil { + if err == io.EOF { + // End of stream, return gracefully + return + } + sendErr(fmt.Errorf("error receiving blockHeader: %w", err)) + return + } + + txStatus := flow.TransactionStatus(txStatusResponse.GetTransactionResults().Status) + + select { + case <-ctx.Done(): + return + case txStatusChan <- txStatus: + } + } + }() + + return txStatusChan, errChan, nil +} diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index 1e9a077ff..a92be2885 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -1969,3 +1969,116 @@ func (m *mockExecutionDataStream) Recv() (*executiondata.SubscribeExecutionDataR return m.responses[m.offset], nil } + +func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { + transactions := test.TransactionGenerator() + results := test.TransactionResultGenerator(flow.EventEncodingVersionCCF) + + generateTransactionStatusResponses := func(count uint64) []*access.SendAndSubscribeTransactionStatusesResponse { + var resTransactionStatuses []*access.SendAndSubscribeTransactionStatusesResponse + + for i := uint64(0); i < count; i++ { + expectedResult := results.New() + transactionResult, _ := convert.TransactionResultToMessage(expectedResult, flow.EventEncodingVersionCCF) + + response := &access.SendAndSubscribeTransactionStatusesResponse{ + TransactionResults: transactionResult, + } + + resTransactionStatuses = append(resTransactionStatuses, response) + } + + return resTransactionStatuses + } + + t.Run("Happy Path", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + responseCount := uint64(100) + tx := transactions.New() + + ctx, cancel := context.WithCancel(ctx) + stream := &mockTransactionStatusesClientStream{ + ctx: ctx, + responses: generateTransactionStatusResponses(responseCount), + } + + rpc.On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything).Return(stream, nil) + + txStatusesCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualTxStatus := <-txStatusesCh + expectedTxStatus := flow.TransactionStatus(stream.responses[i].GetTransactionResults().Status) + require.Equal(t, expectedTxStatus, actualTxStatus) + } + cancel() + + wg.Wait() + })) + + t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + ctx, cancel := context.WithCancel(ctx) + stream := &mockTransactionStatusesClientStream{ + ctx: ctx, + err: status.Error(codes.Internal, "internal error"), + } + + rpc. + On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything). + Return(stream, nil) + + txStatusChan, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, flow.Transaction{}) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoTxStatuses(t, txStatusChan, wg.Done) + + errorCount := 0 + for e := range errCh { + require.Error(t, e) + require.ErrorIs(t, e, stream.err) + errorCount += 1 + } + cancel() + + require.Equalf(t, 1, errorCount, "only 1 error is expected") + + wg.Wait() + })) + +} + +type mockTransactionStatusesClientStream struct { + grpc.ClientStream + + ctx context.Context + err error + offset int + responses []*access.SendAndSubscribeTransactionStatusesResponse +} + +func (m *mockTransactionStatusesClientStream) Recv() (*access.SendAndSubscribeTransactionStatusesResponse, error) { + if m.err != nil { + return nil, m.err + } + + if m.offset >= len(m.responses) { + <-m.ctx.Done() + return nil, io.EOF + } + defer func() { m.offset++ }() + + return m.responses[m.offset], nil +} + +func assertNoTxStatuses[TxStatus any](t *testing.T, txStatusChan <-chan TxStatus, done func()) { + defer done() + for range txStatusChan { + require.FailNow(t, "should not receive txStatus") + } +} From df00aa29188fa8a4398bbb7dbd3af948b549a22a Mon Sep 17 00:00:00 2001 From: Andrii Date: Wed, 25 Sep 2024 12:53:49 +0300 Subject: [PATCH 2/3] Added test for different encoding types --- access/grpc/grpc_test.go | 39 ++++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index a92be2885..4751c45fb 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -1972,14 +1972,14 @@ func (m *mockExecutionDataStream) Recv() (*executiondata.SubscribeExecutionDataR func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { transactions := test.TransactionGenerator() - results := test.TransactionResultGenerator(flow.EventEncodingVersionCCF) - generateTransactionStatusResponses := func(count uint64) []*access.SendAndSubscribeTransactionStatusesResponse { + generateTransactionStatusResponses := func(count uint64, encodingVersion flow.EventEncodingVersion) []*access.SendAndSubscribeTransactionStatusesResponse { var resTransactionStatuses []*access.SendAndSubscribeTransactionStatusesResponse + results := test.TransactionResultGenerator(encodingVersion) for i := uint64(0); i < count; i++ { expectedResult := results.New() - transactionResult, _ := convert.TransactionResultToMessage(expectedResult, flow.EventEncodingVersionCCF) + transactionResult, _ := convert.TransactionResultToMessage(expectedResult, encodingVersion) response := &access.SendAndSubscribeTransactionStatusesResponse{ TransactionResults: transactionResult, @@ -1991,14 +1991,43 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { return resTransactionStatuses } - t.Run("Happy Path", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + t.Run("Happy Path - CCF", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { responseCount := uint64(100) tx := transactions.New() ctx, cancel := context.WithCancel(ctx) stream := &mockTransactionStatusesClientStream{ ctx: ctx, - responses: generateTransactionStatusResponses(responseCount), + responses: generateTransactionStatusResponses(responseCount, flow.EventEncodingVersionCCF), + } + + rpc.On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything).Return(stream, nil) + + txStatusesCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go assertNoErrors(t, errCh, wg.Done) + + for i := uint64(0); i < responseCount; i++ { + actualTxStatus := <-txStatusesCh + expectedTxStatus := flow.TransactionStatus(stream.responses[i].GetTransactionResults().Status) + require.Equal(t, expectedTxStatus, actualTxStatus) + } + cancel() + + wg.Wait() + })) + + t.Run("Happy Path - JSON-CDC", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + responseCount := uint64(100) + tx := transactions.New() + + ctx, cancel := context.WithCancel(ctx) + stream := &mockTransactionStatusesClientStream{ + ctx: ctx, + responses: generateTransactionStatusResponses(responseCount, flow.EventEncodingVersionJSONCDC), } rpc.On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything).Return(stream, nil) From 64d91e20069e4db0902ab1fe1d90fe5980df4721 Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 27 Sep 2024 14:37:51 +0300 Subject: [PATCH 3/3] Changed endpoint to return the whole tx result, added msg index check --- access/grpc/client.go | 2 +- access/grpc/grpc.go | 27 +++++++++++++++++++------- access/grpc/grpc_test.go | 42 ++++++++++++++++++++++++++-------------- 3 files changed, 48 insertions(+), 23 deletions(-) diff --git a/access/grpc/client.go b/access/grpc/client.go index e5fcebb32..6cfcff0c2 100644 --- a/access/grpc/client.go +++ b/access/grpc/client.go @@ -207,7 +207,7 @@ func (c *Client) GetTransactionResultsByBlockID(ctx context.Context, blockID flo func (c *Client) SendAndSubscribeTransactionStatuses( ctx context.Context, tx flow.Transaction, -) (<-chan flow.TransactionStatus, <-chan error, error) { +) (<-chan flow.TransactionResult, <-chan error, error) { return c.grpc.SendAndSubscribeTransactionStatuses(ctx, tx) } diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index 8ca586b5d..d3bfe99e7 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -1133,7 +1133,7 @@ func (c *BaseClient) SendAndSubscribeTransactionStatuses( ctx context.Context, tx flow.Transaction, opts ...grpc.CallOption, -) (<-chan flow.TransactionStatus, <-chan error, error) { +) (<-chan flow.TransactionResult, <-chan error, error) { txMsg, err := convert.TransactionToMessage(tx) if err != nil { return nil, nil, newEntityToMessageError(entityTransaction, err) @@ -1149,7 +1149,7 @@ func (c *BaseClient) SendAndSubscribeTransactionStatuses( return nil, nil, newRPCError(err) } - txStatusChan := make(chan flow.TransactionStatus) + txStatusChan := make(chan flow.TransactionResult) errChan := make(chan error) sendErr := func(err error) { @@ -1163,24 +1163,37 @@ func (c *BaseClient) SendAndSubscribeTransactionStatuses( defer close(txStatusChan) defer close(errChan) + messageIndex := uint64(0) + for { - // Receive the next txStatus response - txStatusResponse, err := subscribeClient.Recv() + // Receive the next txResult response + txResultsResponse, err := subscribeClient.Recv() if err != nil { if err == io.EOF { // End of stream, return gracefully return } - sendErr(fmt.Errorf("error receiving blockHeader: %w", err)) + sendErr(fmt.Errorf("error receiving transaction result: %w", err)) + return + } + + if messageIndex != txResultsResponse.GetMessageIndex() { + sendErr(fmt.Errorf("tx result response was lost")) + return + } + + txResult, err := convert.MessageToTransactionResult(txResultsResponse.GetTransactionResults(), c.jsonOptions) + if err != nil { + sendErr(fmt.Errorf("error converting transaction result: %w", err)) return } - txStatus := flow.TransactionStatus(txStatusResponse.GetTransactionResults().Status) + messageIndex++ select { case <-ctx.Done(): return - case txStatusChan <- txStatus: + case txStatusChan <- txResult: } } }() diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index c596f378a..e67052de0 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -2154,7 +2154,7 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { transactions := test.TransactionGenerator() generateTransactionStatusResponses := func(count uint64, encodingVersion flow.EventEncodingVersion) []*access.SendAndSubscribeTransactionStatusesResponse { - var resTransactionStatuses []*access.SendAndSubscribeTransactionStatusesResponse + var resTransactionResults []*access.SendAndSubscribeTransactionStatusesResponse results := test.TransactionResultGenerator(encodingVersion) for i := uint64(0); i < count; i++ { @@ -2163,12 +2163,13 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { response := &access.SendAndSubscribeTransactionStatusesResponse{ TransactionResults: transactionResult, + MessageIndex: i, } - resTransactionStatuses = append(resTransactionStatuses, response) + resTransactionResults = append(resTransactionResults, response) } - return resTransactionStatuses + return resTransactionResults } t.Run("Happy Path - CCF", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { @@ -2183,17 +2184,23 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { rpc.On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything).Return(stream, nil) - txStatusesCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx) + txResultCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx) require.NoError(t, err) wg := sync.WaitGroup{} wg.Add(1) go assertNoErrors(t, errCh, wg.Done) + expectedCounter := uint64(0) + for i := uint64(0); i < responseCount; i++ { - actualTxStatus := <-txStatusesCh - expectedTxStatus := flow.TransactionStatus(stream.responses[i].GetTransactionResults().Status) - require.Equal(t, expectedTxStatus, actualTxStatus) + actualTxResult := <-txResultCh + expectedTxResult, err := convert.MessageToTransactionResult(stream.responses[i].GetTransactionResults(), DefaultClientOptions().jsonOptions) + require.NoError(t, err) + require.Equal(t, expectedTxResult, actualTxResult) + require.Equal(t, expectedCounter, stream.responses[i].MessageIndex) + + expectedCounter++ } cancel() @@ -2212,17 +2219,22 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { rpc.On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything).Return(stream, nil) - txStatusesCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx) + txResultCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx) require.NoError(t, err) wg := sync.WaitGroup{} wg.Add(1) go assertNoErrors(t, errCh, wg.Done) + expectedCounter := uint64(0) for i := uint64(0); i < responseCount; i++ { - actualTxStatus := <-txStatusesCh - expectedTxStatus := flow.TransactionStatus(stream.responses[i].GetTransactionResults().Status) - require.Equal(t, expectedTxStatus, actualTxStatus) + actualTxResult := <-txResultCh + expectedTxResult, err := convert.MessageToTransactionResult(stream.responses[i].GetTransactionResults(), DefaultClientOptions().jsonOptions) + require.NoError(t, err) + require.Equal(t, expectedTxResult, actualTxResult) + require.Equal(t, expectedCounter, stream.responses[i].MessageIndex) + + expectedCounter++ } cancel() @@ -2240,12 +2252,12 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything). Return(stream, nil) - txStatusChan, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, flow.Transaction{}) + txResultChan, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, flow.Transaction{}) require.NoError(t, err) wg := sync.WaitGroup{} wg.Add(1) - go assertNoTxStatuses(t, txStatusChan, wg.Done) + go assertNoTxResults(t, txResultChan, wg.Done) errorCount := 0 for e := range errCh { @@ -2285,9 +2297,9 @@ func (m *mockTransactionStatusesClientStream) Recv() (*access.SendAndSubscribeTr return m.responses[m.offset], nil } -func assertNoTxStatuses[TxStatus any](t *testing.T, txStatusChan <-chan TxStatus, done func()) { +func assertNoTxResults[TxStatus any](t *testing.T, txResultChan <-chan TxStatus, done func()) { defer done() - for range txStatusChan { + for range txResultChan { require.FailNow(t, "should not receive txStatus") } }