-
Notifications
You must be signed in to change notification settings - Fork 208
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix epoch block marshaling #1688
Merged
Merged
Changes from 4 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
f1d2ad8
Extend transaction tracker to track blocks as well
piersy b848c70
Rename transaction_tracker.go -> tracker.go
piersy 5393345
Fix epoch block marshaling and add test for it
piersy 37ed12d
Merge branch 'master' into piersy/fix-epoch-block-marshaling
piersy 3dda340
Don't skip nil fields when unmarshaling EpochSnarkData
piersy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,202 @@ | ||
package test | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"sync" | ||
|
||
ethereum "github.com/celo-org/celo-blockchain" | ||
"github.com/celo-org/celo-blockchain/common" | ||
"github.com/celo-org/celo-blockchain/core/types" | ||
"github.com/celo-org/celo-blockchain/ethclient" | ||
"github.com/celo-org/celo-blockchain/event" | ||
) | ||
|
||
var ( | ||
errStopped = errors.New("transaction tracker closed") | ||
) | ||
|
||
// Tracker tracks processed blocks and transactions through a subscription with | ||
// an ethclient. It provides the ability to check whether blocks or | ||
// transactions have been processed and to wait till those blocks or | ||
// transactions have been processed. | ||
type Tracker struct { | ||
client *ethclient.Client | ||
heads chan *types.Header | ||
sub ethereum.Subscription | ||
wg sync.WaitGroup | ||
// processedTxs maps transaction hashes to the block they were processed in. | ||
processedTxs map[common.Hash]*types.Block | ||
// processedBlocks maps block number to processed blocks. | ||
processedBlocks map[uint64]*types.Block | ||
processedMu sync.Mutex | ||
stopCh chan struct{} | ||
newBlock event.Feed | ||
} | ||
|
||
// NewTracker creates a new tracker. | ||
func NewTracker() *Tracker { | ||
return &Tracker{ | ||
heads: make(chan *types.Header, 10), | ||
processedTxs: make(map[common.Hash]*types.Block), | ||
processedBlocks: make(map[uint64]*types.Block), | ||
} | ||
} | ||
|
||
// GetProcessedTx returns the processed transaction with the given hash or nil | ||
// if the tracker has not seen a processed transaction with the given hash. | ||
func (tr *Tracker) GetProcessedTx(hash common.Hash) *types.Transaction { | ||
tr.processedMu.Lock() | ||
defer tr.processedMu.Unlock() | ||
return tr.processedTxs[hash].Transaction(hash) | ||
} | ||
|
||
// GetProcessedBlockForTx returns the block that a transaction with the given | ||
// hash was processed in or nil if the tracker has not seen a processed | ||
// transaction with the given hash. | ||
func (tr *Tracker) GetProcessedBlockForTx(hash common.Hash) *types.Block { | ||
tr.processedMu.Lock() | ||
defer tr.processedMu.Unlock() | ||
return tr.processedTxs[hash] | ||
} | ||
|
||
// GetProcessedBlock returns processed block with the given num or nil if the | ||
// tracker has not seen a processed block with that num. | ||
func (tr *Tracker) GetProcessedBlock(num uint64) *types.Block { | ||
tr.processedMu.Lock() | ||
defer tr.processedMu.Unlock() | ||
return tr.processedBlocks[num] | ||
} | ||
|
||
// StartTracking subscribes to new head events on the client and starts | ||
// processing the events in a goroutine. | ||
func (tr *Tracker) StartTracking(client *ethclient.Client) error { | ||
if tr.sub != nil { | ||
return errors.New("attempted to start already started tracker") | ||
} | ||
// The subscription client will buffer 20000 notifications before closing | ||
// the subscription, if that happens the Err() chan will return | ||
// ErrSubscriptionQueueOverflow | ||
sub, err := client.SubscribeNewHead(context.Background(), tr.heads) | ||
if err != nil { | ||
return err | ||
} | ||
tr.client = client | ||
tr.sub = sub | ||
tr.stopCh = make(chan struct{}) | ||
|
||
tr.wg.Add(1) | ||
go func() { | ||
defer tr.wg.Done() | ||
err := tr.track() | ||
if err != nil { | ||
fmt.Printf("track failed with error: %v\n", err) | ||
} | ||
}() | ||
return nil | ||
} | ||
|
||
// track reads new heads from the heads channel and for each head retrieves the | ||
// block, places the block in processedBlocks and places the transactions into | ||
// processedTxs. It signals the sub Subscription for each retrieved block. | ||
func (tr *Tracker) track() error { | ||
for { | ||
select { | ||
case h := <-tr.heads: | ||
b, err := tr.client.BlockByHash(context.Background(), h.Hash()) | ||
if err != nil { | ||
return err | ||
} | ||
tr.processedMu.Lock() | ||
tr.processedBlocks[b.NumberU64()] = b | ||
// If we have transactions then process them | ||
if len(b.Transactions()) > 0 { | ||
for _, t := range b.Transactions() { | ||
tr.processedTxs[t.Hash()] = b | ||
} | ||
} | ||
tr.processedMu.Unlock() | ||
// signal | ||
tr.newBlock.Send(struct{}{}) | ||
case err := <-tr.sub.Err(): | ||
// Will be nil if closed by calling Unsubscribe() | ||
return err | ||
case <-tr.stopCh: | ||
return nil | ||
} | ||
} | ||
} | ||
|
||
// AwaitTransactions waits for the transactions listed in hashes to be | ||
// processed, it will return the ctx.Err() if ctx expires before all the | ||
// transactions in hashes were processed or ErrStopped if StopTracking is | ||
// called before all the transactions in hashes were processed. | ||
func (tr *Tracker) AwaitTransactions(ctx context.Context, hashes []common.Hash) error { | ||
hashmap := make(map[common.Hash]struct{}, len(hashes)) | ||
for i := range hashes { | ||
hashmap[hashes[i]] = struct{}{} | ||
} | ||
condition := func() bool { | ||
for hash := range hashmap { | ||
_, ok := tr.processedTxs[hash] | ||
if ok { | ||
delete(hashmap, hash) | ||
} | ||
} | ||
// If there are no transactions left then they have all been processed. | ||
return len(hashmap) == 0 | ||
} | ||
return tr.await(ctx, condition) | ||
} | ||
|
||
// AwaitBlock waits for a block with the given num to be processed, it will | ||
// return the ctx.Err() if ctx expires before a block with that number has been | ||
// processed or ErrStopped if StopTracking is called before a block with that | ||
// number is processed. | ||
func (tr *Tracker) AwaitBlock(ctx context.Context, num uint64) error { | ||
condition := func() bool { | ||
return tr.processedBlocks[num] != nil | ||
} | ||
return tr.await(ctx, condition) | ||
} | ||
|
||
// await waits for the provided condition to return true, it rechecks the | ||
// condition every time a new block is received by the Tracker. Await returns | ||
// nil when the condition returns true, otherwise it will return ctx.Err() if | ||
// ctx expires before the condition returns true or ErrStopped if StopTracking | ||
// is called before the condition returns true. | ||
func (tr *Tracker) await(ctx context.Context, condition func() bool) error { | ||
ch := make(chan struct{}, 10) | ||
sub := tr.newBlock.Subscribe(ch) | ||
defer sub.Unsubscribe() | ||
for { | ||
tr.processedMu.Lock() | ||
found := condition() | ||
tr.processedMu.Unlock() | ||
// If we found what we are looking for then return. | ||
if found { | ||
return nil | ||
} | ||
select { | ||
case <-ch: | ||
continue | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
case <-tr.stopCh: | ||
return errStopped | ||
} | ||
} | ||
} | ||
|
||
// StopTracking shuts down all the goroutines in the tracker. | ||
func (tr *Tracker) StopTracking() error { | ||
if tr.sub == nil { | ||
return errors.New("attempted to stop already stopped tracker") | ||
} | ||
tr.sub.Unsubscribe() | ||
close(tr.stopCh) | ||
tr.wg.Wait() | ||
tr.wg = sync.WaitGroup{} | ||
return nil | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What should
r.UnmarshalJSON()
do if the receivedbitmap
andsignature
are nil, butr
holds some values for it? Should it:r
values with the nil interpretation of itI have a hunch that it should do 2, but I'm honestly asking.
Even so, this implementation seems that it does a mix. If bitmap is nil, it won't overwrite the value, but if signature is not, it would, while not returning any error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not clear what it should do, but I would veer towards something that in most cases would make (marshal(unmarshal(json)) be as idempotent as possible.
Or maybe I'm just overthinking this and it's not that important
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, its not that clear.
I think there's only one valid case for calling UnmarshalJson on non empty object, that would be to re-use a struct for unmarshaling to avoid re-allocating memory.
I think we should disallow code from setting some fields of a struct and expecting the rest to be set by unmarshaling, it seems like a fragile and confusing approach.
So to support that behavior, the unmarshal function would need to set all fields when executed. Interestingly though it looks like that implementation would violate the convention you quoted above, but I wrote a test on go playground to check this and it seems that Unmarshal will overwrite a struct with nil when decoding []byte("null") so it looks like its not a no-op.
https://play.golang.org/p/dHiWmj8X7KD
So based on that I think we can drop the nil checks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok so I actually raised an issue on the golang repo because this was not clear to me golang/go#48646, but for now I still think dropping the nil checks is the best course of action.