Skip to content

Commit

Permalink
Commit messages tracker
Browse files Browse the repository at this point in the history
- Removes oldest commit message when capacity is reached
- Efficient removal of messages if they get processed
- Efficient cleanup of oldest message
- Uses a bit more space to store each block hash
  • Loading branch information
qdm12 committed Apr 26, 2022
1 parent 54c8b6b commit 9e8b032
Show file tree
Hide file tree
Showing 3 changed files with 459 additions and 15 deletions.
35 changes: 20 additions & 15 deletions lib/grandpa/message_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,26 @@ type tracker struct {
blockState BlockState
handler *MessageHandler
votes votesTracker

// map of commit block hash to commit message
commitMessages map[common.Hash]*CommitMessage
mapLock sync.Mutex
in chan *types.Block // receive imported block from BlockState
stopped chan struct{}
commits commitsTracker
mapLock sync.Mutex
in chan *types.Block // receive imported block from BlockState
stopped chan struct{}

catchUpResponseMessageMutex sync.Mutex
// round(uint64) is used as key and *CatchUpResponse as value
catchUpResponseMessages map[uint64]*CatchUpResponse
}

func newTracker(bs BlockState, handler *MessageHandler) *tracker {
const votesCapacity = 1000
const (
votesCapacity = 1000
commitsCapacity = 1000
)
return &tracker{
blockState: bs,
handler: handler,
votes: newVotesTracker(votesCapacity),
commitMessages: make(map[common.Hash]*CommitMessage),
commits: newCommitsTracker(commitsCapacity),
mapLock: sync.Mutex{},
in: bs.GetImportedBlockNotifierChannel(),
stopped: make(chan struct{}),
Expand Down Expand Up @@ -68,7 +69,7 @@ func (t *tracker) addVote(peerID peer.ID, message *VoteMessage) {
func (t *tracker) addCommit(cm *CommitMessage) {
t.mapLock.Lock()
defer t.mapLock.Unlock()
t.commitMessages[cm.Vote.Hash] = cm
t.commits.add(cm)
}

func (t *tracker) addCatchUpResponse(_ *CatchUpResponse) {
Expand Down Expand Up @@ -115,13 +116,14 @@ func (t *tracker) handleBlock(b *types.Block) {

t.votes.delete(h)

if cm, has := t.commitMessages[h]; has {
cm := t.commits.getMessageForBlockHash(h)
if cm != nil {
_, err := t.handler.handleMessage("", cm)
if err != nil {
logger.Warnf("failed to handle commit message %v: %s", cm, err)
}

delete(t.commitMessages, h)
t.commits.delete(h)
}
}

Expand All @@ -145,13 +147,16 @@ func (t *tracker) handleTick() {
t.votes.delete(blockHashDone)
}

for _, cm := range t.commitMessages {
t.commits.forEach(func(cm *CommitMessage) {
_, err := t.handler.handleMessage("", cm)
if err != nil {
logger.Debugf("failed to handle commit message %v: %s", cm, err)
continue
return
}

delete(t.commitMessages, cm.Vote.Hash)
}
// deleting while iterating is safe to do since
// each block hash has at most 1 commit message we
// just handled above.
t.commits.delete(cm.Vote.Hash)
})
}
118 changes: 118 additions & 0 deletions lib/grandpa/tracker_commits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2022 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package grandpa

import (
"container/list"

"github.com/ChainSafe/gossamer/lib/common"
)

// commitsTracker tracks vote messages that could
// not be processed, and removes the oldest ones once
// its maximum capacity is reached.
// It is NOT THREAD SAFE to use.
type commitsTracker struct {
// map of commit block hash to data
// data = message + tracking linked list element pointer
mapping map[common.Hash]commitMessageMapData
// double linked list of block hash
// to track the order commit messages were added in.
linkedList *list.List
capacity int
}

type commitMessageMapData struct {
message *CommitMessage
// element contains a block hash value.
element *list.Element
}

// newCommitsTracker creates a new commit messages tracker
// with the capacity specified.
func newCommitsTracker(capacity int) commitsTracker {
return commitsTracker{
mapping: make(map[common.Hash]commitMessageMapData, capacity),
linkedList: list.New(),
capacity: capacity,
}
}

// add adds a commit message to the commit message tracker.
// If the commit message tracker capacity is reached,
// the oldest commit message is removed.
func (ct *commitsTracker) add(commitMessage *CommitMessage) {
blockHash := commitMessage.Vote.Hash

data, has := ct.mapping[blockHash]
if has {
// commit already exists so override the commit for the block hash;
// do not move the list element in the linked list to avoid
// someone re-sending the same commit message and going at the
// front of the list, hence erasing other possible valid commit messages
// in the tracker.
data.message = commitMessage
ct.mapping[blockHash] = data
return
}

// add new block hash in tracker
ct.cleanup()
element := ct.linkedList.PushFront(blockHash)
data = commitMessageMapData{
message: commitMessage,
element: element,
}
ct.mapping[blockHash] = data
}

// cleanup removes the oldest commit message from the tracker
// if the number of commit messages is at the tracker capacity.
// This method is designed to be called automatically from the
// add method and should not be called elsewhere.
func (ct *commitsTracker) cleanup() {
if ct.linkedList.Len() < ct.capacity {
return
}

oldestElement := ct.linkedList.Back()
ct.linkedList.Remove(oldestElement)

oldestBlockHash := oldestElement.Value.(common.Hash)
delete(ct.mapping, oldestBlockHash)
}

// delete deletes all the vote messages for a particular
// block hash from the vote messages tracker.
func (ct *commitsTracker) delete(blockHash common.Hash) {
data, has := ct.mapping[blockHash]
if !has {
return
}

ct.linkedList.Remove(data.element)
delete(ct.mapping, blockHash)
}

// getMessageForBlockHash returns a pointer to the
// commit message for a particular block hash from
// the tracker. It returns nil if the block hash
// does not exist in the tracker
func (ct *commitsTracker) getMessageForBlockHash(
blockHash common.Hash) (message *CommitMessage) {
data, ok := ct.mapping[blockHash]
if !ok {
return nil
}

return data.message
}

// forEach runs the function `f` on each
// commit message stored in the tracker.
func (ct *commitsTracker) forEach(f func(message *CommitMessage)) {
for _, data := range ct.mapping {
f(data.message)
}
}
Loading

0 comments on commit 9e8b032

Please sign in to comment.