Skip to content

Commit

Permalink
refactor gateway bft test
Browse files Browse the repository at this point in the history
Signed-off-by: Fedor Partanskiy <[email protected]>
  • Loading branch information
pfi79 committed Jan 27, 2024
1 parent ef20365 commit 0a0ee31
Showing 1 changed file with 55 additions and 30 deletions.
85 changes: 55 additions & 30 deletions integration/gateway/gateway_bft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var _ = Describe("GatewayService with BFT ordering service", func() {
peerProcesses ifrit.Process
channel = "testchannel1"
peerGinkgoRunner []*ginkgomon.Runner
ordererRunners []*ginkgomon.Runner
)

BeforeEach(func() {
Expand All @@ -62,6 +63,7 @@ var _ = Describe("GatewayService with BFT ordering service", func() {
for _, orderer := range network.Orderers {
runner := network.OrdererRunner(orderer,
"ORDERER_GENERAL_BACKOFF_MAXDELAY=20s")
ordererRunners = append(ordererRunners, runner)
proc := ifrit.Invoke(runner)
ordererProcesses[orderer.Name] = proc
Eventually(proc.Ready(), network.EventuallyTimeout).Should(BeClosed())
Expand Down Expand Up @@ -112,28 +114,27 @@ var _ = Describe("GatewayService with BFT ordering service", func() {
})

It("Submit transaction", func() {
ctx, cancel := context.WithTimeout(context.Background(), network.EventuallyTimeout*2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

org1Peer0 := network.Peer("Org1", "peer0")
conn := network.PeerClientConn(org1Peer0)
defer conn.Close()
gw := gateway.NewGatewayClient(conn)
signer := network.PeerUserSigner(org1Peer0, "User1")

By("Submitting a new transaction")
submitRequest := prepareTransaction(ctx, gw, signer, channel, "gatewaycc", "invoke", []string{"a", "b", "10"})
_, err := gw.Submit(ctx, submitRequest)
By("Submitting a new transaction 1")
submitRequest := prepareTransaction(ctx, gw, signer, channel, "gatewaycc", "invoke", []string{"a", "b", "10"}, network.EventuallyTimeout)
err := submitWithTimeout(ctx, gw, submitRequest, network.EventuallyTimeout)
Expect(err).NotTo(HaveOccurred())

waitForCommit(ctx, gw, signer, channel, submitRequest.TransactionId)
waitForCommit(ctx, gw, signer, channel, submitRequest.TransactionId, network.EventuallyTimeout)

By("Checking the ledger state")
result := evaluateTransaction(ctx, gw, signer, channel, "gatewaycc", "query", []string{"a"})
By("Checking the ledger state 1")
result := evaluateTransaction(ctx, gw, signer, channel, "gatewaycc", "query", []string{"a"}, network.EventuallyTimeout)
Expect(result.Payload).To(Equal([]byte("90")))

By("Resubmitting the same transaction")
_, err = gw.Submit(ctx, submitRequest)
By("Resubmitting the same transaction 1")
err = submitWithTimeout(ctx, gw, submitRequest, network.EventuallyTimeout)
Expect(err).To(HaveOccurred())
rpcErr := status.Convert(err)
Expect(rpcErr.Message()).To(Equal("insufficient number of orderers could successfully process transaction to satisfy quorum requirement"))
Expand All @@ -144,24 +145,24 @@ var _ = Describe("GatewayService with BFT ordering service", func() {
ordererProcesses["orderer2"].Signal(syscall.SIGTERM)
Eventually(ordererProcesses["orderer2"].Wait(), network.EventuallyTimeout).Should(Receive())

By("Submitting a new transaction")
submitRequest = prepareTransaction(ctx, gw, signer, channel, "gatewaycc", "invoke", []string{"a", "b", "10"})
_, err = gw.Submit(ctx, submitRequest)
By("Submitting a new transaction 2")
submitRequest = prepareTransaction(ctx, gw, signer, channel, "gatewaycc", "invoke", []string{"a", "b", "10"}, network.EventuallyTimeout)
err = submitWithTimeout(ctx, gw, submitRequest, network.EventuallyTimeout)
Expect(err).NotTo(HaveOccurred())

waitForCommit(ctx, gw, signer, channel, submitRequest.TransactionId)
waitForCommit(ctx, gw, signer, channel, submitRequest.TransactionId, network.EventuallyTimeout*2)

By("Checking the ledger state")
result = evaluateTransaction(ctx, gw, signer, channel, "gatewaycc", "query", []string{"a"})
By("Checking the ledger state 2")
result = evaluateTransaction(ctx, gw, signer, channel, "gatewaycc", "query", []string{"a"}, network.EventuallyTimeout)
Expect(result.Payload).To(Equal([]byte("80")))

By("Shutting down orderer1 - no longer quorate")
ordererProcesses["orderer1"].Signal(syscall.SIGTERM)
Eventually(ordererProcesses["orderer1"].Wait(), network.EventuallyTimeout).Should(Receive())

By("Submitting a new transaction")
submitRequest = prepareTransaction(ctx, gw, signer, channel, "gatewaycc", "invoke", []string{"a", "b", "10"})
_, err = gw.Submit(ctx, submitRequest)
By("Submitting a new transaction 3")
submitRequest = prepareTransaction(ctx, gw, signer, channel, "gatewaycc", "invoke", []string{"a", "b", "10"}, network.EventuallyTimeout)
err = submitWithTimeout(ctx, gw, submitRequest, network.EventuallyTimeout)
Expect(err).To(HaveOccurred())
rpcErr = status.Convert(err)
Expect(rpcErr.Message()).To(Equal("insufficient number of orderers could successfully process transaction to satisfy quorum requirement"))
Expand All @@ -175,29 +176,33 @@ var _ = Describe("GatewayService with BFT ordering service", func() {
By("Restarting orderer2")
runner := network.OrdererRunner(network.Orderers[1],
"ORDERER_GENERAL_BACKOFF_MAXDELAY=20s")
ordererRunners[1] = runner
ordererProcesses["orderer2"] = ifrit.Invoke(runner)
Eventually(ordererProcesses["orderer2"].Ready(), network.EventuallyTimeout).Should(BeClosed())
// wait peer conecting to orderer2
Eventually(peerGinkgoRunner[0].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say(fmt.Sprintf("%s, \\{READY", needAdr)))
Eventually(ordererRunners[1].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Starting view with number 0"))
// awaiting the selection of a new leader
Eventually(ordererRunners[1].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Starting view with number"))

By("Resubmitting the same transaction")
_, err = gw.Submit(ctx, submitRequest)
By("Resubmitting the same transaction 2")
err = submitWithTimeout(ctx, gw, submitRequest, network.EventuallyTimeout)
Expect(err).NotTo(HaveOccurred())
waitForCommit(ctx, gw, signer, channel, submitRequest.TransactionId)
waitForCommit(ctx, gw, signer, channel, submitRequest.TransactionId, network.EventuallyTimeout*3)

By("Checking the ledger state")
result = evaluateTransaction(ctx, gw, signer, channel, "gatewaycc", "query", []string{"a"})
By("Checking the ledger state 3")
result = evaluateTransaction(ctx, gw, signer, channel, "gatewaycc", "query", []string{"a"}, network.EventuallyTimeout)
Expect(result.Payload).To(Equal([]byte("70")))

By("Submitting a new transaction")
submitRequest = prepareTransaction(ctx, gw, signer, channel, "gatewaycc", "invoke", []string{"a", "b", "10"})
_, err = gw.Submit(ctx, submitRequest)
By("Submitting a new transaction 4")
submitRequest = prepareTransaction(ctx, gw, signer, channel, "gatewaycc", "invoke", []string{"a", "b", "10"}, network.EventuallyTimeout)
err = submitWithTimeout(ctx, gw, submitRequest, network.EventuallyTimeout)
Expect(err).NotTo(HaveOccurred())

waitForCommit(ctx, gw, signer, channel, submitRequest.TransactionId)
waitForCommit(ctx, gw, signer, channel, submitRequest.TransactionId, network.EventuallyTimeout)

By("Checking the ledger state")
result = evaluateTransaction(ctx, gw, signer, channel, "gatewaycc", "query", []string{"a"})
By("Checking the ledger state 4")
result = evaluateTransaction(ctx, gw, signer, channel, "gatewaycc", "query", []string{"a"}, network.EventuallyTimeout)
Expect(result.Payload).To(Equal([]byte("60")))
})
})
Expand All @@ -224,6 +229,14 @@ func scanAddrGRPCconnectStructInLog(data []byte, listenPort uint16) string {
return string(data[start : start+12])
}

func submitWithTimeout(ctx context.Context, gw gateway.GatewayClient, submitRequest *gateway.SubmitRequest, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

_, err := gw.Submit(ctx, submitRequest)
return err
}

func prepareTransaction(
ctx context.Context,
gatewayClient gateway.GatewayClient,
Expand All @@ -232,6 +245,7 @@ func prepareTransaction(
chaincode string,
transactionName string,
arguments []string,
timeout time.Duration,
) *gateway.SubmitRequest {
args := [][]byte{}
for _, arg := range arguments {
Expand All @@ -252,6 +266,9 @@ func prepareTransaction(
ProposedTransaction: proposedTransaction,
}

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

endorseResponse, err := gatewayClient.Endorse(ctx, endorseRequest)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -272,6 +289,7 @@ func waitForCommit(
signer *nwo.SigningIdentity,
channel string,
transactionId string,
timeout time.Duration,
) {
idBytes, err := signer.Serialize()
Expect(err).NotTo(HaveOccurred())
Expand All @@ -292,6 +310,9 @@ func waitForCommit(
Signature: signature,
}

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

statusResponse, err := gatewayClient.CommitStatus(ctx, signedStatusRequest)
Expect(err).NotTo(HaveOccurred())
Expect(statusResponse.Result).To(Equal(peer.TxValidationCode_VALID))
Expand All @@ -305,6 +326,7 @@ func evaluateTransaction(
chaincode string,
transactionName string,
arguments []string,
timeout time.Duration,
) *peer.Response {
args := [][]byte{}
for _, arg := range arguments {
Expand All @@ -325,6 +347,9 @@ func evaluateTransaction(
ProposedTransaction: proposedTransaction,
}

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

evaluateResponse, err := gatewayClient.Evaluate(ctx, evaluateRequest)
Expect(err).NotTo(HaveOccurred())

Expand Down

0 comments on commit 0a0ee31

Please sign in to comment.