Skip to content

Commit

Permalink
[Redelegation] fix: EventRedelegation unmarshaling failure (#435)
Browse files Browse the repository at this point in the history
Co-authored-by: Dmitry K <[email protected]>
  • Loading branch information
red-0ne and okdas authored Mar 21, 2024
1 parent c77435d commit 1c13536
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 83 deletions.
47 changes: 15 additions & 32 deletions .github/workflows-helpers/run-e2e-test.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Enhanced Script for Debugging and Error Handling

# Log environment variables for debugging
echo "Environment variables:"
echo "NAMESPACE: ${NAMESPACE}"
Expand All @@ -13,34 +11,23 @@ while :; do
# Log the command
echo "Running kubectl command to get pods with matching purpose=validator:"

# Get all pods with the matching purpose
PODS_JSON=$(kubectl get pods -n "${NAMESPACE}" -l pokt.network/purpose=validator -o json)

# Log the raw output for debugging
echo "${PODS_JSON}"

# Validate JSON output
if ! echo "${PODS_JSON}" | jq empty; then
echo "Error: kubectl command did not produce valid JSON."
exit 1
fi

# Check if any pods are running and have the correct image SHA
READY_POD=$(echo "${PODS_JSON}" | jq -r ".items[] | select(.status.phase == \"Running\") | select(.spec.containers[].image | contains(\"${IMAGE_TAG}\")) | .metadata.name")
READY_POD=$(kubectl get pods -n "${NAMESPACE}" -l pokt.network/purpose=validator -o json | jq -r ".items[] | select(.status.phase == \"Running\") | select(any(.spec.containers[]; .image | contains(\"${IMAGE_TAG}\"))) | .metadata.name")

# Check for non-running pods with incorrect image SHA to delete
NON_RUNNING_PODS=$(echo "${PODS_JSON}" | jq -r ".items[] | select(.status.phase != \"Running\") | .metadata.name")
INCORRECT_POD=$(echo "${NON_RUNNING_PODS}" | jq -r "select(.spec.containers[].image | contains(\"${IMAGE_TAG}\") | not) | .metadata.name")
kubectl get pods -n "${NAMESPACE}" -l pokt.network/purpose=validator -o json | jq -r ".items[] | select(.status.phase != \"Running\") | select(any(.spec.containers[]; .image | contains(\"${IMAGE_TAG}\") | not)) | .metadata.name" | while read INCORRECT_POD; do
if [[ -n "${INCORRECT_POD}" ]]; then
echo "Non-ready pod with incorrect image found: ${INCORRECT_POD}. Deleting..."
kubectl delete pod -n "${NAMESPACE}" "${INCORRECT_POD}"
echo "Pod deleted. StatefulSet will recreate the pod."
# Wait for a short duration to allow the StatefulSet to recreate the pod before checking again
sleep 10
fi
done

if [[ -n "${READY_POD}" ]]; then
echo "Ready pod found: ${READY_POD}"
break
elif [[ -n "${INCORRECT_POD}" ]]; then
echo "Non-ready pod with incorrect image found: ${INCORRECT_POD}. Deleting..."
kubectl delete pod -n ${NAMESPACE} ${INCORRECT_POD}
echo "Pod deleted. StatefulSet will recreate the pod."
# Wait for a short duration to allow the StatefulSet to recreate the pod before checking again
sleep 10
else
echo "Validator with image ${IMAGE_TAG} is not ready yet and no incorrect pods found. Will retry checking for ready or incorrect pods in 10 seconds..."
sleep 10
Expand All @@ -65,9 +52,9 @@ kubectl apply -f job.yaml
# Wait for the pod to be created and be in a running state
echo "Waiting for the e2e test pod to be in the running state..."
while :; do
POD_NAME=$(kubectl get pods -n ${NAMESPACE} --selector=job-name=${JOB_NAME} -o jsonpath='{.items[*].metadata.name}')
POD_NAME=$(kubectl get pods -n "${NAMESPACE}" --selector=job-name=${JOB_NAME} -o jsonpath='{.items[*].metadata.name}')
[[ -z "${POD_NAME}" ]] && echo "Waiting for pod to be scheduled..." && sleep 5 && continue
POD_STATUS=$(kubectl get pod ${POD_NAME} -n ${NAMESPACE} -o jsonpath='{.status.phase}')
POD_STATUS=$(kubectl get pod "${POD_NAME}" -n "${NAMESPACE}" -o jsonpath='{.status.phase}')
[[ "${POD_STATUS}" == "Running" ]] && break
echo "Current pod status: ${POD_STATUS}. Waiting for 'Running' status..."
sleep 5
Expand All @@ -76,23 +63,19 @@ done
echo "Pod is running. Monitoring logs and status..."

# Stream the pod logs in the background
kubectl logs -f ${POD_NAME} -n ${NAMESPACE} &
kubectl logs -f "${POD_NAME}" -n "${NAMESPACE}" &

# Monitor pod status in a loop
while :; do
CURRENT_STATUS=$(kubectl get pod ${POD_NAME} -n ${NAMESPACE} -o jsonpath="{.status.containerStatuses[0].state}")
CURRENT_STATUS=$(kubectl get pod "${POD_NAME}" -n "${NAMESPACE}" -o jsonpath="{.status.containerStatuses[0].state}")
if echo $CURRENT_STATUS | grep -q 'terminated'; then
EXIT_CODE=$(echo $CURRENT_STATUS | jq '.terminated.exitCode')
if [[ "$EXIT_CODE" != "0" ]]; then
echo "Container terminated with exit code ${EXIT_CODE}"
kubectl delete job ${JOB_NAME} -n ${NAMESPACE}
kubectl delete job "${JOB_NAME}" -n "${NAMESPACE}"
exit 1
fi
break
fi
sleep 5
done

# If the loop exits without failure, the job succeeded
echo "Job completed successfully"
kubectl delete job ${JOB_NAME} -n ${NAMESPACE}
24 changes: 7 additions & 17 deletions pkg/client/delegation/redelegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ package delegation
// of listening to all events and doing a verbose filter.

import (
"encoding/json"
"strconv"

"cosmossdk.io/api/tendermint/abci"

"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/client/events"
"github.com/pokt-network/poktroll/pkg/client/tx"
)

// redelegationEventType is the type of the EventRedelegation event emitted by
Expand All @@ -20,10 +18,6 @@ const redelegationEventType = "pocket.application.EventRedelegation"

var _ client.Redelegation = (*redelegation)(nil)

// TxEvent is an alias for the CometBFT TxResult type used to decode the
// response bytes from the EventsQueryClient's subscription
type TxEvent = abci.TxResult

// redelegation wraps the EventRedelegation event emitted by the application
// module, for use in the observable, it is one of the log entries embedded
// within the log field of the response struct from the app module's query.
Expand All @@ -49,19 +43,15 @@ func (d redelegation) GetGatewayAddress() string {
// fails then the error is returned.
func newRedelegationEventFactoryFn() events.NewEventsFn[client.Redelegation] {
return func(eventBz []byte) (client.Redelegation, error) {
txEvent := new(TxEvent)
// Try to deserialize the provided bytes into a TxEvent.
if err := json.Unmarshal(eventBz, txEvent); err != nil {
// Try to deserialize the provided bytes into an abci.TxResult.
txResult, err := tx.UnmarshalTxResult(eventBz)
if err != nil {
return nil, err
}
// Check if the TxEvent has empty transaction bytes, which indicates
// the message is probably not a valid transaction event.
if len(txEvent.Tx) == 0 {
return nil, events.ErrEventsUnmarshalEvent.Wrap("empty transaction bytes")
}

// Iterate through the log entries to find EventRedelegation
for _, event := range txEvent.Result.Events {
if event.GetType_() != redelegationEventType {
for _, event := range txResult.Result.Events {
if event.GetType() != redelegationEventType {
continue
}
var redelegationEvent redelegation
Expand Down
14 changes: 7 additions & 7 deletions pkg/client/tx/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ const (
// In order to simplify the logic of the TxClient
var _ client.TxClient = (*txClient)(nil)

// cometTxEvent is used to deserialize incoming transaction event messages
// CometTxEvent is used to deserialize incoming transaction event messages
// from the respective events query subscription. This structure is adapted
// to handle CometBFT's unique serialization format, which diverges from
// conventional approaches seen in implementations like rollkit's. The design
// ensures accurate parsing and compatibility with CometBFT's serialization
// of transaction results.
type cometTxEvent struct {
type CometTxEvent struct {
Data struct {
// TxResult is nested to accommodate CometBFT's serialization format,
// ensuring correct deserialization of transaction results.
Expand Down Expand Up @@ -512,24 +512,24 @@ func (txnClient *txClient) getTxTimeoutError(ctx context.Context, txHashHex stri
// If the resulting TxResult has empty transaction bytes, it assumes that
// the message was not a transaction results and returns an error.
func UnmarshalTxResult(txResultBz []byte) (*abci.TxResult, error) {
var rpcReponse rpctypes.RPCResponse
var rpcResponse rpctypes.RPCResponse

// Try to deserialize the provided bytes into an RPCResponse.
if err := json.Unmarshal(txResultBz, &rpcReponse); err != nil {
if err := json.Unmarshal(txResultBz, &rpcResponse); err != nil {
return nil, events.ErrEventsUnmarshalEvent.Wrap(err.Error())
}

var txResult cometTxEvent
var txResult CometTxEvent

// Try to deserialize the provided bytes into a TxResult.
if err := json.Unmarshal(rpcReponse.Result, &txResult); err != nil {
if err := json.Unmarshal(rpcResponse.Result, &txResult); err != nil {
return nil, events.ErrEventsUnmarshalEvent.Wrap(err.Error())
}

// Check if the TxResult has empty transaction bytes, which indicates
// the message might not be a valid transaction event.
if bytes.Equal(txResult.Data.Value.TxResult.Tx, []byte{}) {
return nil, events.ErrEventsUnmarshalEvent.Wrap("event bytes do not correspond to an comettypes.EventDataTx event")
return nil, events.ErrEventsUnmarshalEvent.Wrap("event bytes do not correspond to an abci.TxResult")
}

return &txResult.Data.Value.TxResult, nil
Expand Down
27 changes: 2 additions & 25 deletions pkg/client/tx/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"cosmossdk.io/depinject"
"cosmossdk.io/math"
abci "github.com/cometbft/cometbft/abci/types"
cometbytes "github.com/cometbft/cometbft/libs/bytes"
"github.com/cometbft/cometbft/libs/json"
rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types"
Expand Down Expand Up @@ -108,13 +107,8 @@ func TestTxClient_SignAndBroadcast_Succeeds(t *testing.T) {
require.NoError(t, err)

// Construct the expected transaction event bytes from the expected transaction bytes.
txResultEvent := &testTxEvent{
Data: testTxEventDataStruct{
Value: testTxEventValueStruct{
TxResult: abci.TxResult{Tx: expectedTx},
},
},
}
txResultEvent := &tx.CometTxEvent{}
txResultEvent.Data.Value.TxResult.Tx = expectedTx

txResultBz, err := json.Marshal(txResultEvent)
require.NoError(t, err)
Expand Down Expand Up @@ -438,20 +432,3 @@ func TestTxClient_SignAndBroadcast_Timeout(t *testing.T) {
func TestTxClient_SignAndBroadcast_MultipleMsgs(t *testing.T) {
t.SkipNow()
}

// TODO_BLOCKER: Fix duplicate definitions of this type across tests & source code.
// This duplicates the unexported `cometTxEvent` from `pkg/client/tx/client.go`.
// We need to answer the following questions to avoid this:
// - Should tests be their own packages? (i.e. `package block` vs `package block_test`)
// - Should we prefer export types which are not required for API consumption?
// - Should we use `//go:build“ test constraint on new files using it for testing purposes?
// - Should we enforce all tests to use `-tags=test`?
type testTxEvent struct {
Data testTxEventDataStruct `json:"data"`
}
type testTxEventDataStruct struct {
Value testTxEventValueStruct `json:"value"`
}
type testTxEventValueStruct struct {
TxResult abci.TxResult
}
22 changes: 20 additions & 2 deletions testutil/testclient/testdelegation/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import (
"testing"

"cosmossdk.io/depinject"
"github.com/cometbft/cometbft/libs/json"
rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/client/delegation"
"github.com/pokt-network/poktroll/pkg/client/tx"
"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/testutil/mockclient"
Expand Down Expand Up @@ -149,6 +152,21 @@ func NewRedelegationEventBytes(
) []byte {
t.Helper()
jsonTemplate := `{"tx":"SGVsbG8sIHdvcmxkIQ==","result":{"events":[{"type":"message","attributes":[{"key":"action","value":"/pocket.application.MsgDelegateToGateway"},{"key":"sender","value":"pokt1exampleaddress"},{"key":"module","value":"application"}]},{"type":"pocket.application.EventRedelegation","attributes":[{"key":"app_address","value":"\"%s\""},{"key":"gateway_address","value":"\"%s\""}]}]}}`
json := fmt.Sprintf(jsonTemplate, appAddress, gatewayAddress)
return []byte(json)

txResultEvent := &tx.CometTxEvent{}

err := json.Unmarshal(
[]byte(fmt.Sprintf(jsonTemplate, appAddress, gatewayAddress)),
&txResultEvent.Data.Value.TxResult,
)
require.NoError(t, err)

txResultBz, err := json.Marshal(txResultEvent)
require.NoError(t, err)

rpcResult := &rpctypes.RPCResponse{Result: txResultBz}
rpcResultBz, err := json.Marshal(rpcResult)
require.NoError(t, err)

return rpcResultBz
}

0 comments on commit 1c13536

Please sign in to comment.