Skip to content

Commit

Permalink
Flush query should include begin and end block events (#1125)
Browse files Browse the repository at this point in the history
* Include begin and end block events

* disable flushing when termination condition is set

* Still flush for FlushLifecycle

* Add sort for flush logging to avoid confusion
  • Loading branch information
agouin authored Mar 20, 2023
1 parent 72d1ad2 commit 15317d7
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 13 deletions.
56 changes: 49 additions & 7 deletions relayer/chains/cosmos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"

abci "github.com/cometbft/cometbft/abci/types"
Expand Down Expand Up @@ -52,14 +53,55 @@ func (cc *CosmosProvider) queryIBCMessages(ctx context.Context, log *zap.Logger,
return nil, errors.New("limit must greater than 0")
}

res, err := cc.RPCClient.TxSearch(ctx, query, true, &page, &limit, "")
if err != nil {
return nil, err
}
var ibcMsgs []ibcMessage
var eg errgroup.Group
chainID := cc.ChainId()
for _, tx := range res.Txs {
ibcMsgs = append(ibcMsgs, ibcMessagesFromEvents(log, tx.TxResult.Events, chainID, 0, base64Encoded)...)
var ibcMsgs []ibcMessage
var mu sync.Mutex

eg.Go(func() error {
res, err := cc.RPCClient.BlockSearch(ctx, query, &page, &limit, "")
if err != nil {
return err
}

var nestedEg errgroup.Group

for _, b := range res.Blocks {
b := b
nestedEg.Go(func() error {
block, err := cc.RPCClient.BlockResults(ctx, &b.Block.Height)
if err != nil {
return err
}

mu.Lock()
defer mu.Unlock()
ibcMsgs = append(ibcMsgs, ibcMessagesFromEvents(log, block.BeginBlockEvents, chainID, 0, base64Encoded)...)
ibcMsgs = append(ibcMsgs, ibcMessagesFromEvents(log, block.EndBlockEvents, chainID, 0, base64Encoded)...)

return nil
})
}
return nestedEg.Wait()
})

eg.Go(func() error {
res, err := cc.RPCClient.TxSearch(ctx, query, true, &page, &limit, "")
if err != nil {
return err
}

mu.Lock()
defer mu.Unlock()
for _, tx := range res.Txs {
ibcMsgs = append(ibcMsgs, ibcMessagesFromEvents(log, tx.TxResult.Events, chainID, 0, base64Encoded)...)
}

return nil
})

if err := eg.Wait(); err != nil {
return nil, err
}

return ibcMsgs, nil
Expand Down
31 changes: 25 additions & 6 deletions relayer/processor/path_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,7 @@ func NewPathProcessor(
clientUpdateThresholdTime time.Duration,
flushInterval time.Duration,
) *PathProcessor {
if flushInterval == 0 {
// "disable" periodic flushing by using a large value.
flushInterval = 200 * 24 * 365 * time.Hour
}
return &PathProcessor{
pp := &PathProcessor{
log: log,
pathEnd1: newPathEndRuntime(log, pathEnd1, metrics),
pathEnd2: newPathEndRuntime(log, pathEnd2, metrics),
Expand All @@ -109,10 +105,33 @@ func NewPathProcessor(
flushInterval: flushInterval,
metrics: metrics,
}
if flushInterval == 0 {
pp.disablePeriodicFlush()
}
return pp
}

// disablePeriodicFlush will "disable" periodic flushing by using a large value.
func (pp *PathProcessor) disablePeriodicFlush() {
pp.flushInterval = 200 * 24 * 365 * time.Hour
}

func (pp *PathProcessor) SetMessageLifecycle(messageLifecycle MessageLifecycle) {
pp.messageLifecycle = messageLifecycle
if !pp.shouldFlush() {
// disable flushing when termination conditions are set, e.g. connection/channel handshakes
pp.disablePeriodicFlush()
}
}

func (pp *PathProcessor) shouldFlush() bool {
if pp.messageLifecycle == nil {
return true
}
if _, ok := pp.messageLifecycle.(*FlushLifecycle); ok {
return true
}
return false
}

// TEST USE ONLY
Expand Down Expand Up @@ -299,7 +318,7 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) {
continue
}

if !pp.initialFlushComplete {
if pp.shouldFlush() && !pp.initialFlushComplete {
pp.flush(ctx)
pp.initialFlushComplete = true
} else if pp.shouldTerminateForFlushComplete(ctx, cancel) {
Expand Down
3 changes: 3 additions & 0 deletions relayer/processor/path_processor_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,9 @@ func queryPacketCommitments(
for i, p := range c.Commitments {
commitments[k][i] = p.Sequence
}
sort.SliceStable(commitments[k], func(i, j int) bool {
return commitments[k][i] < commitments[k][j]
})
return nil
}
}
Expand Down

0 comments on commit 15317d7

Please sign in to comment.