From 67e6a6aba3da82d3ee053fe651da07a144ea0fe3 Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Wed, 29 Sep 2021 23:23:29 +0100 Subject: [PATCH 01/21] Add start stop validators e2e test --- e2e_test/e2e_test.go | 110 +++++++++++++++++++++++++++++++++- test/node.go | 138 ++++++++++++++++++++++--------------------- 2 files changed, 180 insertions(+), 68 deletions(-) diff --git a/e2e_test/e2e_test.go b/e2e_test/e2e_test.go index d108145d26..0bb31ac8f0 100644 --- a/e2e_test/e2e_test.go +++ b/e2e_test/e2e_test.go @@ -2,9 +2,11 @@ package e2e_test import ( "context" + "errors" "testing" "time" + "github.com/celo-org/celo-blockchain/core/types" "github.com/celo-org/celo-blockchain/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -14,7 +16,7 @@ func init() { // This statement is commented out but left here since its very useful for // debugging problems and its non trivial to construct. // - //log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) + // log.Root().SetHandler(log.LvlFilterHandler(log.LvlDebug, log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) } // This test starts a network submits a transaction and waits for the whole @@ -66,3 +68,109 @@ func TestEpochBlockMarshaling(t *testing.T) { assert.True(t, len(b.EpochSnarkData().Signature) > 0) assert.True(t, b.EpochSnarkData().Bitmap.Uint64() > 0) } + +// This test checks that a network can have validators shut down mid operation +// and that it can continue to function, it also checks that if more than f +// validators are shut down, when they restart the network is able to continue. +func TestStartStopValidators(t *testing.T) { + accounts := test.Accounts(4) + gc, ec, err := test.BuildConfig(accounts) + require.NoError(t, err) + network, err := test.NewNetwork(accounts, gc, ec) + require.NoError(t, err) + defer network.Shutdown() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + var txs []*types.Transaction + + // Send 1 celo from the dev account attached to node 0 to the dev account + // attached to node 1. + tx, err := network[0].SendCelo(ctx, network[1].DevAddress, 1) + require.NoError(t, err) + txs = append(txs, tx) + + // Wait for the whole network to process the transaction. + err = network.AwaitTransactions(ctx, txs...) + require.NoError(t, err) + + // Stop one node, the rest of the network should still be able to progress + err = network[3].Close() + require.NoError(t, err) + + // Send 1 celo from the dev account attached to node 0 to the dev account + // attached to node 1. + tx, err = network[0].SendCelo(ctx, network[1].DevAddress, 1) + require.NoError(t, err) + txs = append(txs, tx) + + // Check that the remaining network can still process this transction. + err = network[:3].AwaitTransactions(ctx, txs...) + require.NoError(t, err) + + // Stop another node, the network should now be stuck + err = network[2].Close() + require.NoError(t, err) + + // Now we will check that the network does not process transactions in this + // state, by waiting for a reasonable amount of time for it to process a + // transaction and assuming it is not processing transactions if we time out. + shortCtx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + // Send 1 celo from the dev account attached to node 0 to the dev account + // attached to node 1. 
+ tx, err = network[0].SendCelo(shortCtx, network[1].DevAddress, 1) + require.NoError(t, err) + txs = append(txs, tx) + + err = network[:2].AwaitTransactions(shortCtx, txs...) + // Expect DeadlineExceeded error + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("expecting %q, instead got: %v ", context.DeadlineExceeded.Error(), err) + } + + // Start the last stopped node + err = network[2].Start() + require.NoError(t, err) + + // Connect last stopped node to running nodes + network[2].AddPeers(network[:2]...) + time.Sleep(25 * time.Millisecond) + for _, n := range network[:3] { + err = n.GossipEnodeCertificatge() + require.NoError(t, err) + } + + // Check that the network now processes the previous transaction. + err = network[:3].AwaitTransactions(ctx, txs...) + require.NoError(t, err) + + // Check that the network now quickly processes incoming transactions. + tx, err = network[0].SendCelo(ctx, network[1].DevAddress, 1) + require.NoError(t, err) + txs = append(txs, tx) + + err = network[:3].AwaitTransactions(ctx, txs...) + require.NoError(t, err) + + // Start the first stopped node + err = network[3].Start() + require.NoError(t, err) + + // Connect final node to rest of network + network[3].AddPeers(network[:3]...) + time.Sleep(25 * time.Millisecond) + for _, n := range network { + err = n.GossipEnodeCertificatge() + require.NoError(t, err) + } + + // Check that the network continues to quickly processes incoming transactions. + tx, err = network[0].SendCelo(ctx, network[1].DevAddress, 1) + require.NoError(t, err) + txs = append(txs, tx) + + err = network.AwaitTransactions(ctx, txs...) + require.NoError(t, err) + +} diff --git a/test/node.go b/test/node.go index c5b4abf27c..630822a6c0 100644 --- a/test/node.go +++ b/test/node.go @@ -8,15 +8,14 @@ import ( "fmt" "io/ioutil" "math/big" - "net" "os" - "strconv" "time" ethereum "github.com/celo-org/celo-blockchain" "github.com/celo-org/celo-blockchain/accounts/keystore" "github.com/celo-org/celo-blockchain/common" + "github.com/celo-org/celo-blockchain/common/hexutil" "github.com/celo-org/celo-blockchain/consensus/istanbul" "github.com/celo-org/celo-blockchain/consensus/istanbul/backend" "github.com/celo-org/celo-blockchain/core" @@ -25,7 +24,6 @@ import ( "github.com/celo-org/celo-blockchain/eth" "github.com/celo-org/celo-blockchain/eth/downloader" "github.com/celo-org/celo-blockchain/ethclient" - "github.com/celo-org/celo-blockchain/log" "github.com/celo-org/celo-blockchain/mycelo/env" "github.com/celo-org/celo-blockchain/mycelo/genesis" "github.com/celo-org/celo-blockchain/node" @@ -69,11 +67,14 @@ var ( // 50ms is a really low timeout, we set this here because nodes // fail the first round of consensus and so setting this higher // makes tests run slower. - RequestTimeout: 50, - Epoch: 10, - ProposerPolicy: istanbul.ShuffledRoundRobin, - DefaultLookbackWindow: 3, - BlockPeriod: 0, + RequestTimeout: 200, + TimeoutBackoffFactor: 200, + MinResendRoundChangeTimeout: 200, + MaxResendRoundChangeTimeout: 10000, + Epoch: 20, + ProposerPolicy: istanbul.ShuffledRoundRobin, + DefaultLookbackWindow: 3, + BlockPeriod: 0, }, } ) @@ -84,6 +85,7 @@ type Node struct { *node.Node Config *node.Config P2PListenAddr string + Enode *enode.Node Eth *eth.Ethereum EthConfig *eth.Config WsClient *ethclient.Client @@ -155,17 +157,6 @@ func (n *Node) Start() error { return err } - // Give this logger context based on the node address so that we can easily - // trace single node execution in the logs. 
We set the logger only on the - // copy, since it is not useful for black box testing and it is also not - // marshalable since the implementation contains unexported fields. - // - // Note unfortunately there are many other loggers created in geth separate - // from this one, which means we still see a lot of output that is not - // attributable to a specific node. - nodeConfigCopy.Logger = log.New("node", n.Address.String()[2:7]) - nodeConfigCopy.Logger.SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) - n.Node, err = node.New(nodeConfigCopy) if err != nil { return err @@ -206,6 +197,7 @@ func (n *Node) Start() error { if err != nil { return err } + _, _, err = core.SetupGenesisBlock(n.Eth.ChainDb(), n.EthConfig.Genesis) if err != nil { return err @@ -222,7 +214,63 @@ func (n *Node) Start() error { if err != nil { return err } - return n.Eth.StartMining() + err = n.Eth.StartMining() + if err != nil { + return err + } + + // Note we need to use the LocalNode from the p2p server because that is + // what is also used by the announce protocol when building enode + // certificates, so that is what is used by the announce protocol to check + // if a validator enode has changed. If we constructed the enode ourselves + // here we could not be sure it matches the enode from the p2p.Server. + n.Enode = n.Server().LocalNode().Node() + + return nil +} + +// Provides a short representation of a hash +func shortAddress(a common.Address) string { + return hexutil.Encode(a[:2]) +} + +func (n *Node) AddPeers(nodes ...*Node) { + // Add the given nodes as peers. Although this means that nodes can reach + // each other nodes don't start sending consensus messages to another node + // until they have received an enode certificate from that node. + for _, no := range nodes { + n.Server().AddPeer(no.Enode, p2p.ValidatorPurpose) + } +} + +// GossipEnodeCertificatge gossips this nodes enode certificates to the rest of +// the network. +func (n *Node) GossipEnodeCertificatge() error { + enodeCertificate := &istanbul.EnodeCertificate{ + EnodeURL: n.Enode.URLv4(), + Version: uint(time.Now().Unix()), + } + enodeCertificateBytes, err := rlp.EncodeToBytes(enodeCertificate) + if err != nil { + return err + } + msg := &istanbul.Message{ + Code: istanbul.EnodeCertificateMsg, + Address: n.Address, + Msg: enodeCertificateBytes, + } + b := n.Eth.Engine().(*backend.Backend) + if err := msg.Sign(b.Sign); err != nil { + return err + } + payload, err := msg.Payload() + if err != nil { + return err + } + // Share enode certificates to the other nodes, nodes wont consider other + // nodes valid validators without seeing an enode certificate message from + // them. + return b.Gossip(payload, istanbul.EnodeCertificateMsg) } // Close shuts down the node and releases all resources and removes the datadir @@ -379,27 +427,11 @@ func NewNetwork(accounts *env.AccountsConfig, gc *genesis.Config, ec *eth.Config network[i] = n } - enodes := make([]*enode.Node, len(network)) - for i, n := range network { - host, port, err := net.SplitHostPort(n.P2PListenAddr) - if err != nil { - return nil, err - } - portNum, err := strconv.Atoi(port) - if err != nil { - return nil, err - } - en := enode.NewV4(&n.Key.PublicKey, net.ParseIP(host), portNum, portNum) - enodes[i] = en - } // Connect nodes to each other, although this means that nodes can reach // each other nodes don't start sending consensus messages to another node // until they have received an enode certificate from that node. 
- for i, en := range enodes { - // Connect to the remaining nodes - for _, n := range network[i+1:] { - n.Server().AddPeer(en, p2p.ValidatorPurpose) - } + for i := range network { + network[i].AddPeers(network[i+1:]...) } // Give nodes some time to connect. Also there is a race condition in @@ -411,36 +443,8 @@ func NewNetwork(accounts *env.AccountsConfig, gc *genesis.Config, ec *eth.Config // bit and cross our fingers. time.Sleep(25 * time.Millisecond) - version := uint(time.Now().Unix()) - - // Share enode certificates between nodes, nodes wont consider other nodes - // valid validators without seeing an enode certificate message from them. for i := range network { - enodeCertificate := &istanbul.EnodeCertificate{ - EnodeURL: enodes[i].URLv4(), - Version: version, - } - enodeCertificateBytes, err := rlp.EncodeToBytes(enodeCertificate) - if err != nil { - return nil, err - } - - b := network[i].Eth.Engine().(*backend.Backend) - msg := &istanbul.Message{ - Code: istanbul.EnodeCertificateMsg, - Address: b.Address(), - Msg: enodeCertificateBytes, - } - // Sign the message - if err := msg.Sign(b.Sign); err != nil { - return nil, err - } - p, err := msg.Payload() - if err != nil { - return nil, err - } - - err = b.Gossip(p, istanbul.EnodeCertificateMsg) + err := network[i].GossipEnodeCertificatge() if err != nil { return nil, err } From 35b3d7e438fe1ea9383b131babd21a26608c29a0 Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Tue, 28 Sep 2021 16:12:45 +0100 Subject: [PATCH 02/21] Fix panic on StopTracking --- test/tracker.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/tracker.go b/test/tracker.go index 04f4b74c9b..2ed7d276ae 100644 --- a/test/tracker.go +++ b/test/tracker.go @@ -197,6 +197,9 @@ func (tr *Tracker) StopTracking() error { tr.sub.Unsubscribe() close(tr.stopCh) tr.wg.Wait() + // Set this to nil to mark the tracker as stopped. This must be done after + // waiting for wg, to avoid a data race in trackTransactions. + tr.sub = nil tr.wg = sync.WaitGroup{} return nil } From f9dbb7f6d369d0f82678e7f5596108886b294862 Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Wed, 29 Sep 2021 15:37:39 +0100 Subject: [PATCH 03/21] Use ephemeral key for validator p2p network --- test/node.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/test/node.go b/test/node.go index 630822a6c0..841ef265ce 100644 --- a/test/node.go +++ b/test/node.go @@ -110,8 +110,13 @@ func NewNode( // Copy the node config so we can modify it without damaging the original ncCopy := *nc - // p2p key and address - ncCopy.P2P.PrivateKey = validatorAccount.PrivateKey + + // p2p key and address, this is not the same as the validator key. + p2pKey, err := crypto.GenerateKey() + if err != nil { + return nil, err + } + ncCopy.P2P.PrivateKey = p2pKey // Make temp datadir datadir, err := ioutil.TempDir("", "celo_datadir") From 2cb021a42715ddbbbeb4db34fc152a075b271489 Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Wed, 29 Sep 2021 22:55:42 +0100 Subject: [PATCH 04/21] Allow configuring the min peers needed to sync In order to run small scale e2e tests reliably we need to be able to let nodes sync from just one other node. This commit allows the minimum sync peers to be configured, where previously it was hardcoded to 5. 
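For illustration only: a minimal sketch of how a small test network could use the new knob, mirroring the test/node.go change further down in this patch. The config literal and values below are assumptions for a tiny test setup, not part of the patch itself.

package main

import (
	"fmt"

	"github.com/celo-org/celo-blockchain/eth"
	"github.com/celo-org/celo-blockchain/eth/downloader"
)

func main() {
	cfg := &eth.Config{
		SyncMode: downloader.FullSync,
		// Let a node start syncing as soon as a single peer is connected.
		// Leaving this at 0 falls back to the previous hard-coded default of 5.
		MinSyncPeers: 1,
	}
	fmt.Println("min sync peers:", cfg.MinSyncPeers)
}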
--- eth/backend.go | 23 ++++++++++++----------- eth/ethconfig/config.go | 4 ++++ eth/handler.go | 26 +++++++++++++------------- eth/sync.go | 19 ++++++++++++++----- test/node.go | 1 + 5 files changed, 44 insertions(+), 29 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index 4104e9d4ca..aacddc63f7 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -216,17 +216,18 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { checkpoint = params.TrustedCheckpoints[genesisHash] } if eth.handler, err = newHandler(&handlerConfig{ - Database: chainDb, - Chain: eth.blockchain, - TxPool: eth.txPool, - Network: config.NetworkId, - Sync: config.SyncMode, - BloomCache: uint64(cacheLimit), - EventMux: eth.eventMux, - Checkpoint: checkpoint, - Whitelist: config.Whitelist, - server: stack.Server(), - proxyServer: stack.ProxyServer(), + Database: chainDb, + Chain: eth.blockchain, + TxPool: eth.txPool, + Network: config.NetworkId, + Sync: config.SyncMode, + BloomCache: uint64(cacheLimit), + EventMux: eth.eventMux, + Checkpoint: checkpoint, + Whitelist: config.Whitelist, + server: stack.Server(), + proxyServer: stack.ProxyServer(), + MinSyncPeers: config.MinSyncPeers, }); err != nil { return nil, err } diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 225762dab9..de7c60420b 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -150,6 +150,10 @@ type Config struct { // E block override (TODO: remove after the fork) OverrideEHardfork *big.Int `toml:",omitempty"` + + // The minimum required peers in order for syncing to be initiated, if left + // at 0 then the default will be used. + MinSyncPeers int `toml:",omitempty"` } // CreateConsensusEngine creates the required type of consensus engine instance for an Ethereum service diff --git a/eth/handler.go b/eth/handler.go index 31ef766bf2..91317ca06a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -78,17 +78,18 @@ type txPool interface { // handlerConfig is the collection of initialization parameters to create a full // node network handler. 
type handlerConfig struct { - Database ethdb.Database // Database for direct sync insertions - Chain *core.BlockChain // Blockchain to serve data from - TxPool txPool // Transaction pool to propagate from - Network uint64 // Network identifier to adfvertise - Sync downloader.SyncMode // Whether to fast or full sync - BloomCache uint64 // Megabytes to alloc for fast sync bloom - EventMux *event.TypeMux // Legacy event mux, deprecate for `feed` - Checkpoint *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges - Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged - server *p2p.Server - proxyServer *p2p.Server + Database ethdb.Database // Database for direct sync insertions + Chain *core.BlockChain // Blockchain to serve data from + TxPool txPool // Transaction pool to propagate from + Network uint64 // Network identifier to adfvertise + Sync downloader.SyncMode // Whether to fast or full sync + BloomCache uint64 // Megabytes to alloc for fast sync bloom + EventMux *event.TypeMux // Legacy event mux, deprecate for `feed` + Checkpoint *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges + Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged + server *p2p.Server + proxyServer *p2p.Server + MinSyncPeers int // The minimum peers required to sstart syncing } type handler struct { @@ -132,7 +133,6 @@ type handler struct { proxyServer *p2p.Server } -// newHandler returns a handler for all Ethereum chain management protocol. func newHandler(config *handlerConfig) (*handler, error) { // Create the protocol manager with the base fields if config.EventMux == nil { @@ -243,7 +243,7 @@ func newHandler(config *handlerConfig) (*handler, error) { return p.RequestTxs(hashes) } h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, h.txpool.AddRemotes, fetchTx) - h.chainSync = newChainSyncer(h) + h.chainSync = newChainSyncer(h, config.MinSyncPeers) return h, nil } diff --git a/eth/sync.go b/eth/sync.go index 254e907d97..62139ea22b 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -164,6 +164,9 @@ type chainSyncer struct { forced bool // true when force timer fired peerEventCh chan struct{} doneCh chan error // non-nil when sync is running + + // The minimum number of peers required to initiate a sync. + minSyncPeers int } // chainSyncOp is a scheduled sync operation. @@ -174,11 +177,17 @@ type chainSyncOp struct { head common.Hash } -// newChainSyncer creates a chainSyncer. -func newChainSyncer(handler *handler) *chainSyncer { +// newChainSyncer creates a chainSyncer, specifying a protocol manager and the +// minimum number of peers required to sync, if minSyncPeers is 0 then the +// default value is used. +func newChainSyncer(handler *handler, minSyncPeers int) *chainSyncer { + if minSyncPeers == 0 { + minSyncPeers = defaultMinSyncPeers + } return &chainSyncer{ - handler: handler, - peerEventCh: make(chan struct{}), + handler: handler, + peerEventCh: make(chan struct{}), + minSyncPeers: minSyncPeers, } } @@ -244,7 +253,7 @@ func (cs *chainSyncer) nextSyncOp() *chainSyncOp { } // Ensure we're at minimum peer count. 
- minPeers := defaultMinSyncPeers + minPeers := cs.minSyncPeers if cs.forced { minPeers = 1 } else if minPeers > cs.handler.maxPeers { diff --git a/test/node.go b/test/node.go index 841ef265ce..a6b700e0e2 100644 --- a/test/node.go +++ b/test/node.go @@ -53,6 +53,7 @@ var ( baseEthConfig = ð.Config{ SyncMode: downloader.FullSync, + MinSyncPeers: 1, DatabaseCache: 256, DatabaseHandles: 256, TxPool: core.DefaultTxPoolConfig, From 727c3e54b7ba70fc471ab718de4d9aac17772bf2 Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Wed, 29 Sep 2021 22:57:25 +0100 Subject: [PATCH 05/21] Allow modification of dial and connection timeouts Now dialHistoryExpiration and inboundThrottleTime can be configured via config. This allows for the writing of fast tests. --- p2p/dial.go | 8 +++++++- p2p/server.go | 29 +++++++++++++++++++++-------- test/node.go | 8 +++++--- 3 files changed, 33 insertions(+), 12 deletions(-) diff --git a/p2p/dial.go b/p2p/dial.go index 70855907d2..f55a5dada5 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -139,6 +139,9 @@ type dialConfig struct { log log.Logger clock mclock.Clock rand *mrand.Rand + + // The time waited before redialling a certain node + dialHistoryExpiration time.Duration } func (cfg dialConfig) withDefaults() dialConfig { @@ -157,6 +160,9 @@ func (cfg dialConfig) withDefaults() dialConfig { seed := int64(binary.BigEndian.Uint64(seedb)) cfg.rand = mrand.New(mrand.NewSource(seed)) } + if cfg.dialHistoryExpiration == 0 { + cfg.dialHistoryExpiration = dialHistoryExpiration + } return cfg } @@ -456,7 +462,7 @@ func (d *dialScheduler) removeFromStaticPool(idx int) { func (d *dialScheduler) startDial(task *dialTask) { d.log.Trace("Starting p2p dial", "id", task.dest.ID(), "ip", task.dest.IP(), "flag", task.flags) hkey := string(task.dest.ID().Bytes()) - d.history.add(hkey, d.clock.Now().Add(dialHistoryExpiration)) + d.history.add(hkey, d.clock.Now().Add(d.dialHistoryExpiration)) d.dialing[task.dest.ID()] = task go func() { task.run(d) diff --git a/p2p/server.go b/p2p/server.go index 73072b45b9..c68f5d3667 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -170,6 +170,15 @@ type Config struct { Logger log.Logger `toml:",omitempty"` clock mclock.Clock + + // DialHistoryExpiration is the time waited between dialling a specific node. + DialHistoryExpiration time.Duration + + // InboundThrottleTime is used to rate limit inbound connection attempts + // from a specific IP. If setting up a small private network this should + // probably be set smaller than DialHistoryExpiration to avoid lots of + // failed dial attempts. + InboundThrottleTime time.Duration } // Server manages all peer connections. 
@@ -525,6 +534,9 @@ func (srv *Server) Start() (err error) { if srv.running { return errors.New("server already running") } + if srv.InboundThrottleTime == 0 { + srv.InboundThrottleTime = inboundThrottleTime + } srv.running = true srv.log = srv.Config.Logger if srv.log == nil { @@ -708,13 +720,14 @@ func (srv *Server) setupDiscovery() error { func (srv *Server) setupDialScheduler() { config := dialConfig{ - self: srv.localnode.ID(), - maxDialPeers: srv.maxDialedConns(), - maxActiveDials: srv.MaxPendingPeers, - log: srv.Logger, - netRestrict: srv.NetRestrict, - dialer: srv.Dialer, - clock: srv.clock, + self: srv.localnode.ID(), + maxDialPeers: srv.maxDialedConns(), + maxActiveDials: srv.MaxPendingPeers, + log: srv.Logger, + netRestrict: srv.NetRestrict, + dialer: srv.Dialer, + clock: srv.clock, + dialHistoryExpiration: srv.DialHistoryExpiration, } if srv.ntab != nil { config.resolver = srv.ntab @@ -1107,7 +1120,7 @@ func (srv *Server) checkInboundConn(remoteIP net.IP) error { if !netutil.IsLAN(remoteIP) && srv.inboundHistory.contains(remoteIP.String()) { return fmt.Errorf("too many attempts") } - srv.inboundHistory.add(remoteIP.String(), now.Add(inboundThrottleTime)) + srv.inboundHistory.add(remoteIP.String(), now.Add(srv.InboundThrottleTime)) return nil } diff --git a/test/node.go b/test/node.go index a6b700e0e2..fceb9bd11e 100644 --- a/test/node.go +++ b/test/node.go @@ -38,9 +38,11 @@ var ( Name: "celo", Version: params.Version, P2P: p2p.Config{ - MaxPeers: 100, - NoDiscovery: true, - ListenAddr: "0.0.0.0:0", + MaxPeers: 100, + NoDiscovery: true, + ListenAddr: "0.0.0.0:0", + InboundThrottleTime: 200 * time.Millisecond, + DialHistoryExpiration: 210 * time.Millisecond, }, NoUSB: true, // It is important that HTTPHost and WSHost remain the same. This From 960df1555a7395cc7cca1f6f2cf06d4fbe85a199 Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Wed, 29 Sep 2021 22:04:00 +0100 Subject: [PATCH 06/21] Fix use of waitgroup in core.Start Ensure that Add is called before Wait. --- consensus/istanbul/core/handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consensus/istanbul/core/handler.go b/consensus/istanbul/core/handler.go index 546252ed51..58689153b0 100644 --- a/consensus/istanbul/core/handler.go +++ b/consensus/istanbul/core/handler.go @@ -47,6 +47,8 @@ func (c *core) Start() error { // Tests will handle events itself, so we have to make subscribeEvents() // be able to call in test. c.subscribeEvents() + + c.handlerWg.Add(1) go c.handleEvents() return nil @@ -95,8 +97,6 @@ func (c *core) handleEvents() { // Clear state defer c.handlerWg.Done() - c.handlerWg.Add(1) - for { logger := c.newLogger("func", "handleEvents") select { From 0e44393cf2c357ced36fa1cfac369d988a415859 Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Wed, 29 Sep 2021 22:06:34 +0100 Subject: [PATCH 07/21] Make backend.Backend wait for goroutines to close backend.Backend.Commit was starting a goroutine but not waiting for it to complete. This commit adds a wait group to wait for the goroutine to complete when backend.Backend is closed. 
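The pattern applied here is the standard one of registering a goroutine with a sync.WaitGroup before it is spawned and waiting on the group during shutdown. A self-contained sketch of that pattern follows; the worker type and method names are illustrative, not the Backend code itself.

package main

import (
	"fmt"
	"sync"
)

type worker struct {
	wg sync.WaitGroup
}

func (w *worker) commit(block int) {
	w.wg.Add(1) // register before the goroutine starts
	go func() {
		defer w.wg.Done()
		fmt.Println("post-commit processing for block", block)
	}()
}

// close returns only once every in-flight post-commit goroutine has finished.
func (w *worker) close() {
	w.wg.Wait()
}

func main() {
	w := &worker{}
	w.commit(1)
	w.close()
}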
--- consensus/istanbul/backend/backend.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/consensus/istanbul/backend/backend.go b/consensus/istanbul/backend/backend.go index 4456dcc660..0a1d3cab5b 100644 --- a/consensus/istanbul/backend/backend.go +++ b/consensus/istanbul/backend/backend.go @@ -211,6 +211,7 @@ type Backend struct { aWallets atomic.Value + wg sync.WaitGroup core istanbulCore.Engine logger log.Logger db ethdb.Database @@ -460,6 +461,7 @@ func (sb *Backend) Close() error { concatenatedErrs = fmt.Errorf("%v; %v", concatenatedErrs, err) } } + sb.wg.Wait() return concatenatedErrs } @@ -550,7 +552,11 @@ func (sb *Backend) Commit(proposal istanbul.Proposal, aggregatedSeal types.Istan return err } } - go sb.onNewConsensusBlock(block, result.Receipts, result.Logs, result.State) + sb.wg.Add(1) + go func() { + defer sb.wg.Done() + sb.onNewConsensusBlock(block, result.Receipts, result.Logs, result.State) + }() return nil } From 1efbe2057b9f7d26534ce3d256888a58c472d2f8 Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Tue, 21 Sep 2021 22:17:09 +0100 Subject: [PATCH 08/21] Ensure the miner is closed Also make miner wait for goroutines to end at Shutdown. --- eth/backend.go | 1 + miner/miner.go | 4 +++- miner/worker.go | 10 +++++++++- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index aacddc63f7..68ee89d409 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -608,6 +608,7 @@ func (s *Ethereum) Stop() error { close(s.closeBloomHandler) s.txPool.Stop() s.miner.Stop() + s.miner.Close() s.blockchain.Stop() s.engine.Close() rawdb.PopUncleanShutdownMarker(s.chainDb) diff --git a/miner/miner.go b/miner/miner.go index 789f71540f..6e22cdf230 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -171,6 +171,7 @@ func (miner *Miner) update() { miner.worker.stop() case <-miner.exitCh: miner.worker.close() + miner.exitCh <- struct{}{} return } } @@ -187,7 +188,8 @@ func (miner *Miner) Stop() { } func (miner *Miner) Close() { - close(miner.exitCh) + miner.exitCh <- struct{}{} + <-miner.exitCh } func (miner *Miner) Mining() bool { diff --git a/miner/worker.go b/miner/worker.go index b55170f5d7..6aa4a854bc 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -84,6 +84,7 @@ type worker struct { // Channels startCh chan struct{} exitCh chan struct{} + wg sync.WaitGroup mu sync.RWMutex // The lock used to protect the validator, txFeeRecipient and extra fields validator common.Address @@ -129,7 +130,11 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus // Subscribe events for blockchain worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) - go worker.mainLoop() + worker.wg.Add(1) + go func() { + defer worker.wg.Done() + worker.mainLoop() + }() return worker } @@ -236,6 +241,7 @@ func (w *worker) isRunning() bool { func (w *worker) close() { atomic.StoreInt32(&w.running, 0) close(w.exitCh) + w.wg.Wait() } // constructAndSubmitNewBlock constructs a new block and if the worker is running, submits @@ -373,6 +379,8 @@ func (w *worker) mainLoop() { var cancel context.CancelFunc var wg sync.WaitGroup + // Ensure that block construction is complete before exiting this function. 
+ defer wg.Wait() txsCh := make(chan core.NewTxsEvent, txChanSize) generateNewBlock := func() { From dd0d0c932be60ec92cefd1cb4e3e731fafbf0dc7 Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Wed, 22 Sep 2021 15:33:24 +0100 Subject: [PATCH 09/21] Fix use of waitgroup in validatorPeerHandler Ensure Add is called before Wait --- consensus/istanbul/backend/peer_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/istanbul/backend/peer_handler.go b/consensus/istanbul/backend/peer_handler.go index 43d27b568f..b962bb4c9f 100644 --- a/consensus/istanbul/backend/peer_handler.go +++ b/consensus/istanbul/backend/peer_handler.go @@ -51,6 +51,7 @@ func (vph *validatorPeerHandler) startThread() error { } vph.threadRunning = true + vph.threadWg.Add(1) go vph.thread() return nil @@ -72,7 +73,6 @@ func (vph *validatorPeerHandler) stopThread() error { } func (vph *validatorPeerHandler) thread() { - vph.threadWg.Add(1) defer vph.threadWg.Done() refreshValidatorPeersTicker := time.NewTicker(1 * time.Minute) From 18bd409bf3c35f49780fb16efba7d01f0efa1882 Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Wed, 22 Sep 2021 22:44:26 +0100 Subject: [PATCH 10/21] Fix data race on current (round state) access core.current was being read via calls to core.ParentCommits from the worker's main loop, and written via calls to core.Stop coming from the miner's update loop. The fix is to add a mutext to synchronise access to current. ================== WARNING: DATA RACE Write at 0x00c00114dd98 by goroutine 329: github.com/celo-org/celo-blockchain/consensus/istanbul/core.(*core).Stop() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/core/handler.go:67 +0x8e github.com/celo-org/celo-blockchain/consensus/istanbul/backend.(*Backend).StopValidating() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/backend/engine.go:707 +0x193 github.com/celo-org/celo-blockchain/miner.(*worker).stop() /home/pierspowlesland/projects/celo-blockchain/miner/worker.go:220 +0xbb github.com/celo-org/celo-blockchain/miner.(*Miner).update() /home/pierspowlesland/projects/celo-blockchain/miner/miner.go:173 +0x6fb Previous read at 0x00c00114dd98 by goroutine 471: github.com/celo-org/celo-blockchain/consensus/istanbul/core.(*core).ParentCommits() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/core/core.go:204 +0x4a github.com/celo-org/celo-blockchain/consensus/istanbul/backend.(*Backend).addParentSeal.func1() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/backend/engine.go:989 +0x38a github.com/celo-org/celo-blockchain/consensus/istanbul/backend.(*Backend).addParentSeal() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/backend/engine.go:1019 +0x3ab github.com/celo-org/celo-blockchain/consensus/istanbul/backend.(*Backend).Prepare() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/backend/engine.go:404 +0x404 github.com/celo-org/celo-blockchain/miner.prepareBlock() /home/pierspowlesland/projects/celo-blockchain/miner/block.go:89 +0x62f github.com/celo-org/celo-blockchain/miner.(*worker).constructAndSubmitNewBlock() /home/pierspowlesland/projects/celo-blockchain/miner/worker.go:242 +0x84 github.com/celo-org/celo-blockchain/miner.(*worker).mainLoop.func1.1() /home/pierspowlesland/projects/celo-blockchain/miner/worker.go:379 +0x5c --- consensus/istanbul/core/core.go | 6 ++++++ consensus/istanbul/core/handler.go | 2 ++ 2 files changed, 8 insertions(+) diff --git 
a/consensus/istanbul/core/core.go b/consensus/istanbul/core/core.go index e8ba04911c..836cfd0fc2 100644 --- a/consensus/istanbul/core/core.go +++ b/consensus/istanbul/core/core.go @@ -127,6 +127,7 @@ type core struct { rsdb RoundStateDB current RoundState + currentMu sync.RWMutex handlerWg *sync.WaitGroup roundChangeSet *roundChangeSet @@ -200,6 +201,11 @@ func (c *core) CurrentView() *istanbul.View { func (c *core) CurrentRoundState() RoundState { return c.current } func (c *core) ParentCommits() MessageSet { + // ParentCommits is called by Prepare which is called by miner.worker the + // main loop, we need to synchronise this access with the write which + // occurs in Stop, which is called from the miner's update loop. + c.currentMu.RLock() + defer c.currentMu.RUnlock() if c.current == nil { return nil } diff --git a/consensus/istanbul/core/handler.go b/consensus/istanbul/core/handler.go index 58689153b0..3b13530741 100644 --- a/consensus/istanbul/core/handler.go +++ b/consensus/istanbul/core/handler.go @@ -62,6 +62,8 @@ func (c *core) Stop() error { // Make sure the handler goroutine exits c.handlerWg.Wait() + c.currentMu.Lock() + defer c.currentMu.Unlock() c.current = nil return nil } From 0722c7d7fa5aca080f8b77912529a01988d26d3e Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Wed, 22 Sep 2021 22:53:13 +0100 Subject: [PATCH 11/21] Fix resendRoundChangeMessageTimer data race core.resetResendRoundChangeTimer was being called from the main engine go-routine and core.stopResendRoundChangeTimer was being called via core.Stop from miner's update go-routine. Both accessed resendRoundChangeMessageTimer without syncronisation. This commit adds a mutex to ensure synchronised access to resendRoundChangeMessageTimer. ================== WARNING: DATA RACE Read at 0x00c01377f568 by goroutine 72: github.com/celo-org/celo-blockchain/consensus/istanbul/core.(*core).stopResendRoundChangeTimer() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/core/core.go:710 +0x47 github.com/celo-org/celo-blockchain/consensus/istanbul/core.(*core).stopAllTimers() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/core/core.go:719 +0x54 github.com/celo-org/celo-blockchain/consensus/istanbul/core.(*core).Stop() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/core/handler.go:61 +0x44 github.com/celo-org/celo-blockchain/consensus/istanbul/backend.(*Backend).StopValidating() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/backend/engine.go:707 +0x193 github.com/celo-org/celo-blockchain/miner.(*worker).stop() /home/pierspowlesland/projects/celo-blockchain/miner/worker.go:220 +0xbb github.com/celo-org/celo-blockchain/miner.(*Miner).update() /home/pierspowlesland/projects/celo-blockchain/miner/miner.go:173 +0x6fb Previous write at 0x00c01377f568 by goroutine 312: github.com/celo-org/celo-blockchain/consensus/istanbul/core.(*core).resetResendRoundChangeTimer() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/core/core.go:780 +0x368 github.com/celo-org/celo-blockchain/consensus/istanbul/core.(*core).resendRoundChangeMessage() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/core/core.go:795 +0x7b github.com/celo-org/celo-blockchain/consensus/istanbul/core.(*core).handleResendRoundChangeEvent() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/core/handler.go:247 +0x353 github.com/celo-org/celo-blockchain/consensus/istanbul/core.(*core).handleEvents() 
/home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/core/handler.go:143 +0x77e --- consensus/istanbul/core/core.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/consensus/istanbul/core/core.go b/consensus/istanbul/core/core.go index 836cfd0fc2..ebb9ba8520 100644 --- a/consensus/istanbul/core/core.go +++ b/consensus/istanbul/core/core.go @@ -115,8 +115,9 @@ type core struct { finalCommittedSub *event.TypeMuxSubscription timeoutSub *event.TypeMuxSubscription - futurePreprepareTimer *time.Timer - resendRoundChangeMessageTimer *time.Timer + futurePreprepareTimer *time.Timer + resendRoundChangeMessageTimer *time.Timer + resendRoundChangeMessageTimerMu sync.Mutex roundChangeTimer *time.Timer roundChangeTimerMu sync.RWMutex @@ -688,6 +689,8 @@ func (c *core) stopRoundChangeTimer() { } func (c *core) stopResendRoundChangeTimer() { + c.resendRoundChangeMessageTimerMu.Lock() + defer c.resendRoundChangeMessageTimerMu.Unlock() if c.resendRoundChangeMessageTimer != nil { c.resendRoundChangeMessageTimer.Stop() c.resendRoundChangeMessageTimer = nil @@ -777,6 +780,8 @@ func (c *core) resetResendRoundChangeTimer() { resendTimeout = maxResendTimeout } view := &istanbul.View{Sequence: c.current.Sequence(), Round: c.current.DesiredRound()} + c.resendRoundChangeMessageTimerMu.Lock() + defer c.resendRoundChangeMessageTimerMu.Unlock() c.resendRoundChangeMessageTimer = time.AfterFunc(resendTimeout, func() { c.sendEvent(resendRoundChangeEvent{view}) }) From f2f2ef1f2f8c1cb754a0758c54ba11eee70817ba Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Wed, 22 Sep 2021 23:02:55 +0100 Subject: [PATCH 12/21] Fix other current (round state) data race The current round state of the engine was being read without synchronisation via core.CurrentView by the miner, while the current round state was also being written in the engine's Stop function. The fix is to use the pre-existing mutex to synchronise accesses through CurrentView. 
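A standalone sketch of the locking pattern used in this commit and the previous one (type and method names are illustrative, not the istanbul core code): readers take the read lock before touching the shared field, and Stop takes the write lock before clearing it.

package main

import "sync"

type view struct{ Sequence, Round uint64 }

type engine struct {
	currentMu sync.RWMutex // guards current
	current   *view
}

// CurrentView is called from the miner's goroutine.
func (e *engine) CurrentView() *view {
	e.currentMu.RLock()
	defer e.currentMu.RUnlock()
	if e.current == nil {
		return nil
	}
	v := *e.current
	return &v
}

// Stop is called from the miner's update loop and clears the round state.
func (e *engine) Stop() {
	e.currentMu.Lock()
	defer e.currentMu.Unlock()
	e.current = nil
}

func main() {
	e := &engine{current: &view{Sequence: 1}}
	go e.Stop()
	_ = e.CurrentView()
}

The race report that prompted the fix follows.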
================== WARNING: DATA RACE Read at 0x00c000bb20b8 by goroutine 506: github.com/celo-org/celo-blockchain/consensus/istanbul/core.(*core).CurrentView() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/core/core.go:195 +0x4a github.com/celo-org/celo-blockchain/consensus/istanbul/backend.waitCoreToReachSequence() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/backend/engine.go:1203 +0x24c github.com/celo-org/celo-blockchain/consensus/istanbul/backend.(*Backend).addParentSeal.func1() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/backend/engine.go:982 +0x130 github.com/celo-org/celo-blockchain/consensus/istanbul/backend.(*Backend).addParentSeal() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/backend/engine.go:1019 +0x3ab github.com/celo-org/celo-blockchain/consensus/istanbul/backend.(*Backend).Prepare() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/backend/engine.go:404 +0x404 github.com/celo-org/celo-blockchain/miner.prepareBlock() /home/pierspowlesland/projects/celo-blockchain/miner/block.go:89 +0x62f github.com/celo-org/celo-blockchain/miner.(*worker).constructAndSubmitNewBlock() /home/pierspowlesland/projects/celo-blockchain/miner/worker.go:242 +0x84 github.com/celo-org/celo-blockchain/miner.(*worker).mainLoop.func1.1() /home/pierspowlesland/projects/celo-blockchain/miner/worker.go:379 +0x5c Previous write at 0x00c000bb20b8 by goroutine 93: github.com/celo-org/celo-blockchain/consensus/istanbul/core.(*core).Stop() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/core/handler.go:67 +0x8e github.com/celo-org/celo-blockchain/consensus/istanbul/backend.(*Backend).StopValidating() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/backend/engine.go:707 +0x193 github.com/celo-org/celo-blockchain/miner.(*worker).stop() /home/pierspowlesland/projects/celo-blockchain/miner/worker.go:220 +0xbb github.com/celo-org/celo-blockchain/miner.(*Miner).update() /home/pierspowlesland/projects/celo-blockchain/miner/miner.go:173 +0x6fb --- consensus/istanbul/core/core.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/consensus/istanbul/core/core.go b/consensus/istanbul/core/core.go index ebb9ba8520..09ceabda65 100644 --- a/consensus/istanbul/core/core.go +++ b/consensus/istanbul/core/core.go @@ -193,6 +193,11 @@ func (c *core) SetAddress(address common.Address) { } func (c *core) CurrentView() *istanbul.View { + // CurrentView is called by Prepare which is called by miner.worker the + // main loop, we need to synchronise this access with the write which occurs + // in Stop, which is called from the miner's update loop. + c.currentMu.RLock() + defer c.currentMu.RUnlock() if c.current == nil { return nil } From bebc4f01759e5315d45bcebd9a7d935d216da12f Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Wed, 22 Sep 2021 23:05:55 +0100 Subject: [PATCH 13/21] Fix data race in downloader The eth.chainSyncer main loop periodically calls eth.chainSyncer.startSync which fires off a sync operation in its own goroutine which will eventually result in Downloader.Cancel being called. The eth.chainSyncer also calls Downloader.Cancel when exiting its main loop. This resulted in a data race on Downloader.ancientLimit, the solution was to add a mutex to lock over Downloader.ancientLimit in Downloader.Cancel. 
================== WARNING: DATA RACE Write at 0x00c01a0e3ce0 by goroutine 466: github.com/celo-org/celo-blockchain/eth/downloader.(*Downloader).Cancel() /home/pierspowlesland/projects/celo-blockchain/eth/downloader/downloader.go:615 +0x68 github.com/celo-org/celo-blockchain/eth/downloader.(*Downloader).synchronise() /home/pierspowlesland/projects/celo-blockchain/eth/downloader/downloader.go:444 +0x601 github.com/celo-org/celo-blockchain/eth/downloader.(*Downloader).Synchronise() /home/pierspowlesland/projects/celo-blockchain/eth/downloader/downloader.go:354 +0xb5 github.com/celo-org/celo-blockchain/eth.(*ProtocolManager).doSync() /home/pierspowlesland/projects/celo-blockchain/eth/sync.go:327 +0x1ad github.com/celo-org/celo-blockchain/eth.(*chainSyncer).startSync.func1() /home/pierspowlesland/projects/celo-blockchain/eth/sync.go:303 +0x59 Previous write at 0x00c01a0e3ce0 by goroutine 182: github.com/celo-org/celo-blockchain/eth/downloader.(*Downloader).Cancel() /home/pierspowlesland/projects/celo-blockchain/eth/downloader/downloader.go:615 +0x68 github.com/celo-org/celo-blockchain/eth/downloader.(*Downloader).Terminate() /home/pierspowlesland/projects/celo-blockchain/eth/downloader/downloader.go:635 +0xcc github.com/celo-org/celo-blockchain/eth.(*chainSyncer).loop() /home/pierspowlesland/projects/celo-blockchain/eth/sync.go:232 +0x65b --- eth/downloader/downloader.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 96546c2ab5..389f714790 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -121,6 +121,7 @@ type Downloader struct { notified int32 committed int32 ancientLimit uint64 // The maximum block number which can be regarded as ancient data. + ancientLimitMu sync.Mutex // Channels headerCh chan dataPack // Channel receiving inbound block headers @@ -646,6 +647,10 @@ func (d *Downloader) cancel() { func (d *Downloader) Cancel() { d.cancel() d.cancelWg.Wait() + d.ancientLimitMu.Lock() + defer d.ancientLimitMu.Unlock() + d.ancientLimit = 0 + log.Debug("Reset ancient limit to zero") } // Terminate interrupts the downloader, canceling all pending operations. From 3c44dfe13cc733f45778040f9de4c8e78ba92e05 Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Wed, 22 Sep 2021 23:19:50 +0100 Subject: [PATCH 14/21] Fix data race on p2p peer purposes p2p.PurposeFlag is not threadsafe but was being accessed from many go-routines without syncronisation via p2p.Peer.HasPurpose. The solution was to lock over the access in HasPurpose using the mutex that already existed to synchronise access to a peer's purposes flag. 
================== WARNING: DATA RACE Read at 0x00c001520680 by goroutine 291: github.com/celo-org/celo-blockchain/p2p.(*Peer).HasPurpose() /home/pierspowlesland/projects/celo-blockchain/p2p/peer.go:167 +0x5fd github.com/celo-org/celo-blockchain/eth.(*peer).PurposeIsSet() /home/pierspowlesland/projects/celo-blockchain/eth/peer.go:696 +0x516 github.com/celo-org/celo-blockchain/eth.(*ProtocolManager).FindPeers() /home/pierspowlesland/projects/celo-blockchain/eth/handler.go:1055 +0x612 github.com/celo-org/celo-blockchain/consensus/istanbul/backend.(*validatorPeerHandler).ReplaceValidatorPeers() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/backend/peer_handler.go:147 +0x201 github.com/celo-org/celo-blockchain/consensus/istanbul/backend/internal/enodes.(*ValidatorEnodeDB).RefreshValPeers() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/backend/internal/enodes/val_enode_db.go:480 +0x641 github.com/celo-org/celo-blockchain/consensus/istanbul/backend.(*Backend).RefreshValPeers() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/backend/backend.go:846 +0x264 github.com/celo-org/celo-blockchain/consensus/istanbul/backend.(*Backend).StartValidating() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/backend/engine.go:691 +0x3a5 github.com/celo-org/celo-blockchain/miner.(*worker).start() /home/pierspowlesland/projects/celo-blockchain/miner/worker.go:210 +0x1a2 github.com/celo-org/celo-blockchain/miner.(*Miner).update() /home/pierspowlesland/projects/celo-blockchain/miner/miner.go:161 +0x6a4 Previous write at 0x00c001520680 by goroutine 343: github.com/celo-org/celo-blockchain/p2p.(*Peer).AddPurpose() /home/pierspowlesland/projects/celo-blockchain/p2p/peer.go:148 +0xd5 github.com/celo-org/celo-blockchain/p2p.(*Server).run.func3() /home/pierspowlesland/projects/celo-blockchain/p2p/server.go:866 +0x2d1 github.com/celo-org/celo-blockchain/p2p.(*Server).run() /home/pierspowlesland/projects/celo-blockchain/p2p/server.go:916 +0x241a --- p2p/peer.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/p2p/peer.go b/p2p/peer.go index 554b70031c..1a166f128c 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -166,6 +166,8 @@ func (p *Peer) RemovePurpose(purpose PurposeFlag) { } func (p *Peer) HasPurpose(purpose PurposeFlag) bool { + p.purposesMu.Lock() + defer p.purposesMu.Unlock() return p.purposes.IsSet(purpose) } From 270f67210d80a7af701867f1585345a92f14000c Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Wed, 22 Sep 2021 23:20:14 +0100 Subject: [PATCH 15/21] Fix data race on announceRunning announceRunning is set in istanbul.Backend.StartAnnouncing and istanbul.Backend.StopAnnouncing and also was being accessed without syncronisation from within istanbul.Backend.newChainHead which is executed repeatedly from within its own go-routine. This commit read locks over announceRunning in istanbul.Backend.newChainHead to avoid a data race. Unfortunately we can't see both sides of this data race, it seems the stack was corrupted somehow since the first stack trace cannot be correct (how can fmt.Fscanf call TestStartStopValidators ?). Ive shortened the stack with an elipsis because it was also very long. 
================== WARNING: DATA RACE Write at 0x00c001448520 by goroutine 354: github.com/celo-org/celo-blockchain/consensus/istanbul/backend.(*Backend).StopAnnouncing() /home/pierspowlesland/projects/celo-blockchain/consensus/istanbul/backend/engine.go:749 +0x12b github.com/celo-org/celo-blockchain/eth.(*Ethereum).stopAnnounce() /home/pierspowlesland/projects/celo-blockchain/eth/backend.go:507 +0x2ad github.com/celo-org/celo-blockchain/eth.(*Ethereum).Stop() /home/pierspowlesland/projects/celo-blockchain/eth/backend.go:575 +0x2ae github.com/celo-org/celo-blockchain/node.(*Node).stopServices() /home/pierspowlesland/projects/celo-blockchain/node/node.go:309 +0x150 github.com/celo-org/celo-blockchain/node.(*Node).Close() /home/pierspowlesland/projects/celo-blockchain/node/node.go:221 +0x1bb github.com/celo-org/celo-blockchain/test.(*Node).Close() /home/pierspowlesland/projects/celo-blockchain/test/node.go:312 +0x3b8 github.com/celo-org/celo-blockchain/test.Network.Shutdown() /home/pierspowlesland/projects/celo-blockchain/test/node.go:498 +0x9a runtime.call32() /usr/local/go/src/runtime/asm_amd64.s:551 +0x3d testing.(*T).FailNow() :1 +0x44 github.com/stretchr/testify/require.NoError() /home/pierspowlesland/go/pkg/mod/github.com/stretchr/testify@v1.4.0/require/require.go:974 +0x104 github.com/celo-org/celo-blockchain/e2e_test_test.TestStartStopValidators() /home/pierspowlesland/projects/celo-blockchain/e2e_test/e2e_test.go:168 +0x15ce fmt.Fscanf() /usr/local/go/src/fmt/scan.go:143 +0xee fmt.Sscanf() /usr/local/go/src/fmt/scan.go:114 +0x191 github.com/syndtr/goleveldb/leveldb/storage.fsParseName() /home/pierspowlesland/go/pkg/mod/github.com/syndtr/goleveldb@v1.0.1-0.20190923125748-758128399b1d/leveldb/storage/file_storage.go:643 +0xa6 github.com/syndtr/goleveldb/leveldb/storage.(*fileStorage).List() /home/pierspowlesland/go/pkg/mod/github.com/syndtr/goleveldb@v1.0.1-0.20190923125748-758128399b1d/leveldb/storage/file_storage.go:458 +0x309 github.com/syndtr/goleveldb/leveldb.(*DB).checkAndCleanFiles() /home/pierspowlesland/go/pkg/mod/github.com/syndtr/goleveldb@v1.0.1-0.20190923125748-758128399b1d/leveldb/db_util.go:52 +0x301 github.com/syndtr/goleveldb/leveldb.openDB() /home/pierspowlesland/go/pkg/mod/github.com/syndtr/goleveldb@v1.0.1-0.20190923125748-758128399b1d/leveldb/db.go:136 +0x9bb fmt.(*ss).doScanf() /usr/local/go/src/fmt/scan.go:1230 +0x411 fmt.Fscanf() ... 
Previous read at 0x00c001448520 by goroutine 203: [failed to restore the stack] --- consensus/istanbul/backend/handler.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/consensus/istanbul/backend/handler.go b/consensus/istanbul/backend/handler.go index ab4caeff2d..8c19df3414 100644 --- a/consensus/istanbul/backend/handler.go +++ b/consensus/istanbul/backend/handler.go @@ -346,6 +346,8 @@ func (sb *Backend) newChainHead(newBlock *types.Block) { sb.logger.Info("Validator Election Results", "address", sb.ValidatorAddress(), "elected", valSetIndex >= 0, "number", newBlock.Number().Uint64()) + sb.announceMu.Lock() + defer sb.announceMu.Unlock() if sb.announceRunning { sb.logger.Trace("At end of epoch and going to refresh validator peers", "new_block_number", newBlock.Number().Uint64()) if err := sb.RefreshValPeers(); err != nil { From 2988ea9e2dd7022949a04a3a586adc9822212cb0 Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Tue, 5 Oct 2021 13:38:47 +0100 Subject: [PATCH 16/21] Add comment to clarify lock over announceRunning --- consensus/istanbul/backend/handler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/consensus/istanbul/backend/handler.go b/consensus/istanbul/backend/handler.go index 8c19df3414..a62c9585e1 100644 --- a/consensus/istanbul/backend/handler.go +++ b/consensus/istanbul/backend/handler.go @@ -346,6 +346,9 @@ func (sb *Backend) newChainHead(newBlock *types.Block) { sb.logger.Info("Validator Election Results", "address", sb.ValidatorAddress(), "elected", valSetIndex >= 0, "number", newBlock.Number().Uint64()) + // We lock here to protect access to announceRunning because + // announceRunning is also accessed in StartAnnouncing and + // StopAnnouncing. sb.announceMu.Lock() defer sb.announceMu.Unlock() if sb.announceRunning { From b428549d10ce75fe9e2545a531f66594a437336a Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Tue, 19 Oct 2021 18:22:21 +0100 Subject: [PATCH 17/21] Remove WaitGroup & go-routine from Backend.Commit Removing the go-routine does not seem to have any effect on the tests. 
--- consensus/istanbul/backend/backend.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/consensus/istanbul/backend/backend.go b/consensus/istanbul/backend/backend.go index 0a1d3cab5b..6e2e788d16 100644 --- a/consensus/istanbul/backend/backend.go +++ b/consensus/istanbul/backend/backend.go @@ -211,7 +211,6 @@ type Backend struct { aWallets atomic.Value - wg sync.WaitGroup core istanbulCore.Engine logger log.Logger db ethdb.Database @@ -461,7 +460,6 @@ func (sb *Backend) Close() error { concatenatedErrs = fmt.Errorf("%v; %v", concatenatedErrs, err) } } - sb.wg.Wait() return concatenatedErrs } @@ -552,12 +550,7 @@ func (sb *Backend) Commit(proposal istanbul.Proposal, aggregatedSeal types.Istan return err } } - sb.wg.Add(1) - go func() { - defer sb.wg.Done() - sb.onNewConsensusBlock(block, result.Receipts, result.Logs, result.State) - }() - + sb.onNewConsensusBlock(block, result.Receipts, result.Logs, result.State) return nil } From 704ec7dfaa22c15b0d585e2b06ec2559d85fbf1b Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Wed, 20 Oct 2021 09:32:08 +0100 Subject: [PATCH 18/21] Remove unused debug function --- test/node.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/test/node.go b/test/node.go index fceb9bd11e..21ddf8b972 100644 --- a/test/node.go +++ b/test/node.go @@ -15,7 +15,6 @@ import ( "github.com/celo-org/celo-blockchain/accounts/keystore" "github.com/celo-org/celo-blockchain/common" - "github.com/celo-org/celo-blockchain/common/hexutil" "github.com/celo-org/celo-blockchain/consensus/istanbul" "github.com/celo-org/celo-blockchain/consensus/istanbul/backend" "github.com/celo-org/celo-blockchain/core" @@ -237,11 +236,6 @@ func (n *Node) Start() error { return nil } -// Provides a short representation of a hash -func shortAddress(a common.Address) string { - return hexutil.Encode(a[:2]) -} - func (n *Node) AddPeers(nodes ...*Node) { // Add the given nodes as peers. Although this means that nodes can reach // each other nodes don't start sending consensus messages to another node From 7c463961f0e1853caec3849c67640b39981a48ee Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Wed, 20 Oct 2021 09:58:36 +0100 Subject: [PATCH 19/21] Add debug stack trace to help track down failures It seems that very occasionally we are hitting this error condition and its not clear why. --- test/tracker.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/tracker.go b/test/tracker.go index 2ed7d276ae..ca0eee1f02 100644 --- a/test/tracker.go +++ b/test/tracker.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "runtime/debug" "sync" ethereum "github.com/celo-org/celo-blockchain" @@ -192,7 +193,7 @@ func (tr *Tracker) await(ctx context.Context, condition func() bool) error { // StopTracking shuts down all the goroutines in the tracker. func (tr *Tracker) StopTracking() error { if tr.sub == nil { - return errors.New("attempted to stop already stopped tracker") + return fmt.Errorf("attempted to stop already stopped tracker - stack: \n%s", string(debug.Stack())) } tr.sub.Unsubscribe() close(tr.stopCh) From e414f5f6636268a9b17a10f4ef76fc10451e79c7 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Thu, 7 Oct 2021 15:47:50 +0200 Subject: [PATCH 20/21] cherry-pick: core: improve shutdown synchronization in BlockChain (#22853) This change removes misuses of sync.WaitGroup in BlockChain. 
Before this change, block insertion modified the WaitGroup counter in order to ensure that Stop would wait for pending operations to complete. This was racy and could even lead to crashes if Stop was called at an unfortunate time. The issue is resolved by adding a specialized 'closable' mutex, which prevents chain modifications after stopping while also synchronizing writers with each other. Co-authored-by: Felix Lange --- core/blockchain.go | 146 +++++++++++++++++++++++++--------------- core/blockchain_test.go | 4 +- internal/syncx/mutex.go | 64 ++++++++++++++++++ 3 files changed, 156 insertions(+), 58 deletions(-) create mode 100644 internal/syncx/mutex.go diff --git a/core/blockchain.go b/core/blockchain.go index a940413996..d5e396c4f3 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -42,6 +42,7 @@ import ( "github.com/celo-org/celo-blockchain/core/vm/vmcontext" "github.com/celo-org/celo-blockchain/ethdb" "github.com/celo-org/celo-blockchain/event" + "github.com/celo-org/celo-blockchain/internal/syncx" "github.com/celo-org/celo-blockchain/log" "github.com/celo-org/celo-blockchain/metrics" "github.com/celo-org/celo-blockchain/params" @@ -84,6 +85,7 @@ var ( errInsertionInterrupted = errors.New("insertion is interrupted") errCommitmentNotFound = errors.New("randomness commitment not found") + errChainStopped = errors.New("blockchain is stopped") ) const ( @@ -187,7 +189,9 @@ type BlockChain struct { scope event.SubscriptionScope genesisBlock *types.Block - chainmu sync.RWMutex // blockchain insertion lock + // This mutex synchronizes chain write operations. + // Readers don't need to take it, they can just read the database. + chainmu *syncx.ClosableMutex currentBlock atomic.Value // Current head of the block chain currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) @@ -200,8 +204,8 @@ type BlockChain struct { txLookupCache *lru.Cache // Cache for the most recent transaction lookup data. futureBlocks *lru.Cache // future blocks are blocks added for later processing - quit chan struct{} // blockchain quit channel - wg sync.WaitGroup // chain processing wait group for shutting down + wg sync.WaitGroup // + quit chan struct{} // shutdown signal, closed in Stop. 
running int32 // 0 if chain is running, 1 when stopped procInterrupt int32 // interrupt signaler for block processing @@ -240,6 +244,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par Preimages: cacheConfig.Preimages, }), quit: make(chan struct{}), + chainmu: syncx.NewClosableMutex(), shouldPreserve: shouldPreserve, bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, @@ -283,6 +288,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par if err := bc.loadLastState(); err != nil { return nil, err } + // Make sure the state associated with the block is available head := bc.CurrentBlock() if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil { @@ -311,6 +317,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } } } + // Ensure that a previous crash in SetHead doesn't leave extra ancients if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 { var ( @@ -362,6 +369,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } } } + // Load any existing snapshot, regenerating it if loading failed if bc.cacheConfig.SnapshotLimit > 0 { // If the chain was rewound past the snapshot persistent layer (causing @@ -377,14 +385,19 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } bc.snaps, _ = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, head.Root(), !bc.cacheConfig.SnapshotWait, true, recover) } - // Take ownership of this particular state - go bc.update() + + // Start future block processor. + bc.wg.Add(1) + go bc.futureBlocksLoop() + + // Start tx indexer/unindexer. if txLookupLimit != nil { bc.txLookupLimit = *txLookupLimit bc.wg.Add(1) go bc.maintainTxIndex(txIndexBlock) } + // If periodic cache journal is required, spin it up. if bc.cacheConfig.TrieCleanRejournal > 0 { if bc.cacheConfig.TrieCleanRejournal < time.Minute { @@ -510,7 +523,9 @@ func (bc *BlockChain) SetHead(head uint64) error { // // The method returns the block number where the requested root cap was found. func (bc *BlockChain) SetHeadBeyondRoot(head uint64, root common.Hash) (uint64, error) { - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return 0, errChainStopped + } defer bc.chainmu.Unlock() // Track the block number of the requested root hash @@ -656,8 +671,11 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { if _, err := trie.NewSecure(block.Root(), bc.stateCache.TrieDB()); err != nil { return err } - // If all checks out, manually set the head block - bc.chainmu.Lock() + + // If all checks out, manually set the head block. + if !bc.chainmu.TryLock() { + return errChainStopped + } bc.currentBlock.Store(block) headBlockGauge.Update(int64(block.NumberU64())) bc.chainmu.Unlock() @@ -725,7 +743,9 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { if err := bc.SetHead(0); err != nil { return err } - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return errChainStopped + } defer bc.chainmu.Unlock() // Prepare the genesis block and reinitialise the chain @@ -755,8 +775,10 @@ func (bc *BlockChain) Export(w io.Writer) error { // ExportN writes a subset of the active chain to the given writer. 
func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { - bc.chainmu.RLock() - defer bc.chainmu.RUnlock() + if !bc.chainmu.TryLock() { + return errChainStopped + } + defer bc.chainmu.Unlock() if first > last { return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last) @@ -998,10 +1020,21 @@ func (bc *BlockChain) Stop() { if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) { return } - // Unsubscribe all subscriptions registered from blockchain + + // Unsubscribe all subscriptions registered from blockchain. bc.scope.Close() + + // Signal shutdown to all goroutines. close(bc.quit) bc.StopInsert() + + // Now wait for all chain modifications to end and persistent goroutines to exit. + // + // Note: Close waits for the mutex to become available, i.e. any running chain + // modification will have exited when Close returns. Since we also called StopInsert, + // the mutex should become available quickly. It cannot be taken again after Close has + // returned. + bc.chainmu.Close() bc.wg.Wait() // Ensure that the entirety of the state snapshot is journalled to disk. @@ -1012,6 +1045,7 @@ func (bc *BlockChain) Stop() { log.Error("Failed to journal state snapshot", "err", err) } } + // Ensure the state of a recent block is also stored to disk before exiting. // We're writing three different states to catch different restart scenarios: // - HEAD: So we don't need to reprocess any blocks in the general case @@ -1155,7 +1189,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // updateHead updates the head fast sync block if the inserted blocks are better // and returns an indicator whether the inserted blocks are canonical. updateHead := func(head *types.Block) bool { - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return false + } + defer bc.chainmu.Unlock() // Rewind may have occurred, skip in that case. if bc.CurrentHeader().Number.Cmp(head.Number()) >= 0 { @@ -1164,11 +1201,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ rawdb.WriteHeadFastBlockHash(bc.db, head.Hash()) bc.currentFastBlock.Store(head) headFastBlockGauge.Update(int64(head.NumberU64())) - bc.chainmu.Unlock() return true } } - bc.chainmu.Unlock() return false } // writeAncient writes blockchain and corresponding receipt chain into ancient store. @@ -1402,8 +1437,9 @@ var lastWrite uint64 // but does not write any state. This is used to construct competing side forks // up to the point where they exceed the canonical total difficulty. func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (err error) { - bc.wg.Add(1) - defer bc.wg.Done() + if bc.insertStopped() { + return errInsertionInterrupted + } batch := bc.db.NewBatch() rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td) @@ -1417,9 +1453,6 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e // writeKnownBlock updates the head block flag with a known block // and introduces chain reorg if necessary. func (bc *BlockChain) writeKnownBlock(block *types.Block) error { - bc.wg.Add(1) - defer bc.wg.Done() - current := bc.CurrentBlock() if block.ParentHash() != current.Hash() { if err := bc.reorg(current, block); err != nil { @@ -1433,7 +1466,9 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error { // InsertPreprocessedBlock inserts a block which is already processed. 
// It can only insert the new Head block func (bc *BlockChain) InsertPreprocessedBlock(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB) error { - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return errInsertionInterrupted + } defer bc.chainmu.Unlock() // check we are trying to insert the NEXT block @@ -1448,8 +1483,9 @@ func (bc *BlockChain) InsertPreprocessedBlock(block *types.Block, receipts []*ty // insertPreprocessedBlock writes the block and all associated state to the database, // but is expects the chain mutex to be held. func (bc *BlockChain) insertPreprocessedBlock(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { - bc.wg.Add(1) - defer bc.wg.Done() + if bc.insertStopped() { + return NonStatTy, errInsertionInterrupted + } randomCommitment := common.Hash{} if istEngine, isIstanbul := bc.engine.(consensus.Istanbul); isIstanbul { @@ -1649,14 +1685,9 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { bc.blockProcFeed.Send(true) defer bc.blockProcFeed.Send(false) - // Remove already known canon-blocks - var ( - block, prev *types.Block - ) - // Do a sanity check that the provided chain is actually ordered and linked + // Do a sanity check that the provided chain is actually ordered and linked. for i := 1; i < len(chain); i++ { - block = chain[i] - prev = chain[i-1] + block, prev := chain[i], chain[i-1] if block.NumberU64() != prev.NumberU64()+1 || block.ParentHash() != prev.Hash() { // Chain broke ancestry, log a message (programming error) and skip insertion log.Error("Non contiguous block insert", "number", block.Number(), "hash", block.Hash(), @@ -1666,14 +1697,13 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { prev.Hash().Bytes()[:4], i, block.NumberU64(), block.Hash().Bytes()[:4], block.ParentHash().Bytes()[:4]) } } - // Pre-checks passed, start the full block imports - bc.wg.Add(1) - bc.chainmu.Lock() - n, err := bc.insertChain(chain, true) - bc.chainmu.Unlock() - bc.wg.Done() - return n, err + // Pre-check passed, start the full block imports. + if !bc.chainmu.TryLock() { + return 0, errChainStopped + } + defer bc.chainmu.Unlock() + return bc.insertChain(chain, true) } // InsertChainWithoutSealVerification works exactly the same @@ -1682,14 +1712,11 @@ func (bc *BlockChain) InsertChainWithoutSealVerification(block *types.Block) (in bc.blockProcFeed.Send(true) defer bc.blockProcFeed.Send(false) - // Pre-checks passed, start the full block imports - bc.wg.Add(1) - bc.chainmu.Lock() - n, err := bc.insertChain(types.Blocks([]*types.Block{block}), false) - bc.chainmu.Unlock() - bc.wg.Done() - - return n, err + if !bc.chainmu.TryLock() { + return 0, errChainStopped + } + defer bc.chainmu.Unlock() + return bc.insertChain(types.Blocks([]*types.Block{block}), false) } // insertChain is the internal implementation of InsertChain, which assumes that @@ -1701,10 +1728,11 @@ func (bc *BlockChain) InsertChainWithoutSealVerification(block *types.Block) (in // is imported, but then new canon-head is added before the actual sidechain // completes, then the historic state could be pruned again func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, error) { - // If the chain is terminating, don't even bother starting up - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + // If the chain is terminating, don't even bother starting up. 
+ if bc.insertStopped() { return 0, nil } + // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) @@ -1739,8 +1767,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er // First block (and state) is known // 1. We did a roll-back, and should now do a re-import // 2. The block is stored as a sidechain, and is lying about it's stateroot, and passes a stateroot - // from the canonical chain, which has not been verified. - // Skip all known blocks that are behind us + // from the canonical chain, which has not been verified. + // Skip all known blocks that are behind us. var ( current = bc.CurrentBlock() localTd = bc.GetTd(current.Hash(), current.NumberU64()) @@ -1856,9 +1884,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er lastCanon = block continue } + // Retrieve the parent block and it's state to execute on top start := time.Now() - parent := it.previous() if parent == nil { parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) @@ -1870,7 +1898,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er // Enable prefetching to pull in trie node paths while processing transactions statedb.StartPrefetcher("chain") activeState = statedb - // If we have a followup block, run that against the current state to pre-cache // transactions and probabilistically some of the account/storage trie nodes. var followupInterrupt uint32 @@ -1888,6 +1915,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er }(time.Now(), followup, throwaway, &followupInterrupt) } } + // Process block using the parent state as reference point substart := time.Now() receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) @@ -1896,6 +1924,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er atomic.StoreUint32(&followupInterrupt, 1) return it.index, err } + // Update the metrics touched during block processing accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them @@ -1970,6 +1999,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er dirty, _ := bc.stateCache.TrieDB().Size() stats.report(chain, it.index, dirty) } + // Any blocks remaining here? The only ones we care about are the future ones if block != nil && errors.Is(err, consensus.ErrFutureBlock) { if err := bc.addFutureBlock(block); err != nil { @@ -2296,7 +2326,10 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { return nil } -func (bc *BlockChain) update() { +// futureBlocksLoop processes the 'future block' queue. 
+func (bc *BlockChain) futureBlocksLoop() { + defer bc.wg.Done() + futureTimer := time.NewTicker(5 * time.Second) defer futureTimer.Stop() for { @@ -2333,6 +2366,7 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) { } rawdb.IndexTransactions(bc.db, from, ancients, bc.quit) } + // indexBlocks reindexes or unindexes transactions depending on user configuration indexBlocks := func(tail *uint64, head uint64, done chan struct{}) { defer func() { done <- struct{}{} }() @@ -2365,6 +2399,7 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) { rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit) } } + // Any reindexing done, start listening to chain events and moving the index window var ( done chan struct{} // Non-nil if background unindexing or reindexing routine is active. @@ -2432,12 +2467,11 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int, co return i, err } - // Make sure only one thread manipulates the chain at once - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return 0, errChainStopped + } defer bc.chainmu.Unlock() - bc.wg.Add(1) - defer bc.wg.Done() _, err := bc.hc.InsertHeaderChain(chain, start) return 0, err } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 128b4049e0..00c7247b46 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -162,7 +162,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { blockchain.reportBlock(block, receipts, err) return err } - blockchain.chainmu.Lock() + blockchain.chainmu.MustLock() rawdb.WriteTd(blockchain.db, block.Hash(), block.NumberU64(), block.TotalDifficulty()) rawdb.WriteBlock(blockchain.db, block) statedb.Commit(false) @@ -180,7 +180,7 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error return err } // Manually insert the header into the database, but don't reorganise (allows subsequent testing) - blockchain.chainmu.Lock() + blockchain.chainmu.MustLock() rawdb.WriteTd(blockchain.db, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Number, big.NewInt(1))) rawdb.WriteHeader(blockchain.db, header) blockchain.chainmu.Unlock() diff --git a/internal/syncx/mutex.go b/internal/syncx/mutex.go new file mode 100644 index 0000000000..96a21986c6 --- /dev/null +++ b/internal/syncx/mutex.go @@ -0,0 +1,64 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package syncx contains exotic synchronization primitives. +package syncx + +// ClosableMutex is a mutex that can also be closed. +// Once closed, it can never be taken again. +type ClosableMutex struct { + ch chan struct{} +} + +func NewClosableMutex() *ClosableMutex { + ch := make(chan struct{}, 1) + ch <- struct{}{} + return &ClosableMutex{ch} +} + +// TryLock attempts to lock cm. 
+// If the mutex is closed, TryLock returns false. +func (cm *ClosableMutex) TryLock() bool { + _, ok := <-cm.ch + return ok +} + +// MustLock locks cm. +// If the mutex is closed, MustLock panics. +func (cm *ClosableMutex) MustLock() { + _, ok := <-cm.ch + if !ok { + panic("mutex closed") + } +} + +// Unlock unlocks cm. +func (cm *ClosableMutex) Unlock() { + select { + case cm.ch <- struct{}{}: + default: + panic("Unlock of already-unlocked ClosableMutex") + } +} + +// Close locks the mutex, then closes it. +func (cm *ClosableMutex) Close() { + _, ok := <-cm.ch + if !ok { + panic("Close of already-closed ClosableMutex") + } + close(cm.ch) +} From 1ab51ee370510db23c90f58ae44be21845157b14 Mon Sep 17 00:00:00 2001 From: Piers Powlesland Date: Thu, 21 Oct 2021 20:27:12 +0100 Subject: [PATCH 21/21] Dont print expected errors on e2e test shutdown The start stop test would sometimes time out while nodes were stopped and the Network.Shutdown method would print errors relating to trying to stop a node twice. The shutdown mechanism has been updated to allow for more flexibility in deciding what errors are printed, allowing us to avoid printing double shutdown errors when we have manually stopped nodes. --- e2e_test/e2e_test.go | 27 +++++++++++++++++------ test/node.go | 51 +++++++++++++++++++++++++++----------------- test/tracker.go | 6 +++--- 3 files changed, 54 insertions(+), 30 deletions(-) diff --git a/e2e_test/e2e_test.go b/e2e_test/e2e_test.go index 0bb31ac8f0..68b29022b2 100644 --- a/e2e_test/e2e_test.go +++ b/e2e_test/e2e_test.go @@ -3,10 +3,12 @@ package e2e_test import ( "context" "errors" + "fmt" "testing" "time" "github.com/celo-org/celo-blockchain/core/types" + "github.com/celo-org/celo-blockchain/node" "github.com/celo-org/celo-blockchain/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -25,9 +27,9 @@ func TestSendCelo(t *testing.T) { accounts := test.Accounts(3) gc, ec, err := test.BuildConfig(accounts) require.NoError(t, err) - network, err := test.NewNetwork(accounts, gc, ec) + network, shutdown, err := test.NewNetwork(accounts, gc, ec) require.NoError(t, err) - defer network.Shutdown() + defer shutdown() ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() @@ -52,9 +54,9 @@ func TestEpochBlockMarshaling(t *testing.T) { // and it needs to be < (epoch -2). ec.Istanbul.Epoch = 6 ec.Istanbul.DefaultLookbackWindow = 3 - network, err := test.NewNetwork(accounts, gc, ec) + network, shutdown, err := test.NewNetwork(accounts, gc, ec) require.NoError(t, err) - defer network.Shutdown() + defer shutdown() ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() @@ -76,10 +78,21 @@ func TestStartStopValidators(t *testing.T) { accounts := test.Accounts(4) gc, ec, err := test.BuildConfig(accounts) require.NoError(t, err) - network, err := test.NewNetwork(accounts, gc, ec) + network, _, err := test.NewNetwork(accounts, gc, ec) require.NoError(t, err) - defer network.Shutdown() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + + // We define our own shutdown function because we don't want to print + // errors about already stopped nodes. Since this test can fail while we + // have stopped nodes. 
+ defer func() { + for _, err := range network.Shutdown() { + if !errors.Is(err, test.ErrTrackerAlreadyStopped) && !errors.Is(err, node.ErrNodeStopped) { + fmt.Println(err.Error()) + } + } + }() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*40) defer cancel() var txs []*types.Transaction diff --git a/test/node.go b/test/node.go index 21ddf8b972..6b74b809a0 100644 --- a/test/node.go +++ b/test/node.go @@ -277,18 +277,19 @@ func (n *Node) GossipEnodeCertificatge() error { // Close shuts down the node and releases all resources and removes the datadir // unless an error is returned, in which case there is no guarantee that all -// resources are released. +// resources are released. It is assumed that this is only called after calling +// Start otherwise it will panic. func (n *Node) Close() error { err := n.Tracker.StopTracking() if err != nil { return err } n.WsClient.Close() - if n.Node != nil { - err = n.Node.Close() // This also shuts down the Eth service + err = n.Node.Close() // This also shuts down the Eth service + if err != nil { + return err } - os.RemoveAll(n.Config.DataDir) - return err + return os.RemoveAll(n.Config.DataDir) } // SendCeloTracked functions like SendCelo but also waits for the transaction to be processed. @@ -396,10 +397,11 @@ func BuildConfig(accounts *env.AccountsConfig) (*genesis.Config, *eth.Config, er // NewNetwork generates a network of nodes that are running and mining. For // each provided validator account a corresponding node is created and each // node is also assigned a developer account, there must be at least as many -// developer accounts provided as validator accounts. If there is an error it -// will be returned immediately, meaning that some nodes may be running and -// others not. -func NewNetwork(accounts *env.AccountsConfig, gc *genesis.Config, ec *eth.Config) (Network, error) { +// developer accounts provided as validator accounts. A shutdown function is +// also returned which will shutdown and clean up the network when called. In +// the case that an error is returned the shutdown function will be nil and so +// no attempt should be made to call it. +func NewNetwork(accounts *env.AccountsConfig, gc *genesis.Config, ec *eth.Config) (Network, func(), error) { // Copy eth istanbul config fields to the genesis istanbul config. // There is a ticket to remove this duplication of config. 
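Not part of the patch itself: purely to illustrate the flexibility the error-returning Shutdown below is meant to give callers, a test that stops nodes manually could wrap the filtering idiom from TestStartStopValidators in a small helper along the following lines. The sketch assumes the same imports as the e2e test file above (testing, errors, and the test and node packages); the helper name is hypothetical.

func shutdownIgnoring(t *testing.T, network test.Network, expected ...error) {
	t.Helper()
	// Report only the shutdown errors the caller did not expect, e.g. the
	// double-stop errors produced for nodes the test already closed itself.
	for _, err := range network.Shutdown() {
		ignored := false
		for _, exp := range expected {
			if errors.Is(err, exp) {
				ignored = true
				break
			}
		}
		if !ignored {
			t.Errorf("unexpected shutdown error: %v", err)
		}
	}
}

// Intended use in a test that manually stops nodes:
//   defer shutdownIgnoring(t, network, test.ErrTrackerAlreadyStopped, node.ErrNodeStopped)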
@@ -414,17 +416,17 @@ func NewNetwork(accounts *env.AccountsConfig, gc *genesis.Config, ec *eth.Config genesis, err := genesis.GenerateGenesis(accounts, gc, "../compiled-system-contracts") if err != nil { - return nil, fmt.Errorf("failed to generate genesis: %v", err) + return nil, nil, fmt.Errorf("failed to generate genesis: %v", err) } va := accounts.ValidatorAccounts() da := accounts.DeveloperAccounts() - network := make([]*Node, len(va)) - for i := range va { + var network Network = make([]*Node, len(va)) + for i := range va { n, err := NewNode(&va[i], &da[i], baseNodeConfig, ec, genesis) if err != nil { - return nil, fmt.Errorf("failed to build node for network: %v", err) + return nil, nil, fmt.Errorf("failed to build node for network: %v", err) } network[i] = n } @@ -448,11 +450,18 @@ func NewNetwork(accounts *env.AccountsConfig, gc *genesis.Config, ec *eth.Config for i := range network { err := network[i].GossipEnodeCertificatge() if err != nil { - return nil, err + network.Shutdown() + return nil, nil, err } } - return network, nil + shutdown := func() { + for _, err := range network.Shutdown() { + fmt.Println(err.Error()) + } + } + + return network, shutdown, nil } // AwaitTransactions ensures that the entire network has processed the provided transactions. @@ -477,17 +486,19 @@ func (n Network) AwaitBlock(ctx context.Context, num uint64) error { return nil } -// Shutdown closes all nodes in the network, any errors that are encountered are -// printed to stdout. -func (n Network) Shutdown() { - for _, node := range n { +// Shutdown closes all nodes in the network, any errors encountered when +// shutting down nodes are returned in a slice. +func (n Network) Shutdown() []error { + var errors []error + for i, node := range n { if node != nil { err := node.Close() if err != nil { - fmt.Printf("error shutting down node %v: %v", node.Address.String(), err) + errors = append(errors, fmt.Errorf("error shutting down node %v index %d: %w", node.Address.String(), i, err)) } } } + return errors } // ValueTransferTransaction builds a signed value transfer transaction from the diff --git a/test/tracker.go b/test/tracker.go index ca0eee1f02..5ca3e325c7 100644 --- a/test/tracker.go +++ b/test/tracker.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "runtime/debug" "sync" ethereum "github.com/celo-org/celo-blockchain" @@ -15,7 +14,8 @@ import ( ) var ( - errStopped = errors.New("transaction tracker closed") + errStopped = errors.New("transaction tracker closed") + ErrTrackerAlreadyStopped = errors.New("attempted to stop already stopped tracker") ) // Tracker tracks processed blocks and transactions through a subscription with @@ -193,7 +193,7 @@ func (tr *Tracker) await(ctx context.Context, condition func() bool) error { // StopTracking shuts down all the goroutines in the tracker. func (tr *Tracker) StopTracking() error { if tr.sub == nil { - return fmt.Errorf("attempted to stop already stopped tracker - stack: \n%s", string(debug.Stack())) + return ErrTrackerAlreadyStopped } tr.sub.Unsubscribe() close(tr.stopCh)
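Not part of the patch series: as a reading aid for the blockchain.go changes above, here is a minimal self-contained sketch of the locking discipline the ClosableMutex enables, with writers bailing out via TryLock and Stop closing the mutex after signalling shutdown. The miniChain type and its methods are illustrative only, and the snippet assumes it is compiled inside the celo-blockchain module, since internal/syncx cannot be imported from outside it.

package main

import (
	"errors"
	"fmt"
	"sync"

	"github.com/celo-org/celo-blockchain/internal/syncx"
)

var errChainStopped = errors.New("blockchain is stopped")

// miniChain mimics only the parts of BlockChain relevant to shutdown.
type miniChain struct {
	chainmu *syncx.ClosableMutex
	quit    chan struct{}
	wg      sync.WaitGroup
}

// write models a chain-mutating operation such as InsertChain: instead of
// racing with shutdown, it refuses to run once the chain has been stopped.
func (c *miniChain) write(name string) error {
	if !c.chainmu.TryLock() {
		return errChainStopped
	}
	defer c.chainmu.Unlock()
	fmt.Println("wrote", name)
	return nil
}

// stop mirrors BlockChain.Stop: signal shutdown, close the mutex (which waits
// for any in-flight writer and can never be taken again), then wait for the
// persistent goroutines tracked by wg.
func (c *miniChain) stop() {
	close(c.quit)
	c.chainmu.Close()
	c.wg.Wait()
}

func main() {
	c := &miniChain{chainmu: syncx.NewClosableMutex(), quit: make(chan struct{})}
	fmt.Println(c.write("block 1")) // <nil>
	c.stop()
	fmt.Println(c.write("block 2")) // blockchain is stopped
}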