From b54509749797a98f7795bd3abdb426fb0c7a72fc Mon Sep 17 00:00:00 2001
From: Dmitry Shulyak
Date: Mon, 16 Oct 2023 16:22:42 +0000
Subject: [PATCH] miner: multi signers (#5135)

closes: https://github.com/spacemeshos/go-spacemesh/issues/5087

Data is separated into shared data (beacon and active set) and
signer-specific data.

Both the beacon and the active set are read from the shared data until a
smesher generates a reference ballot. Once a smesher has generated a
ballot, it uses the data recorded in the reference ballot.

The build method now loops over all signers (the signer list is copied at
the start of the layer). Some parts have to run once for all signers,
while others make sense to run in parallel.

serial parts:
- loading shared data (beacon and active set)
- deciding on the mesh hash
- tallying votes & encoding votes (tortoise calls)

parallel parts:
- loading per-signer data (it could also run serially, but it was
  convenient to run it in parallel)
- computing eligibilities (this is done once per node startup)
- selecting txs
- publishing proposals. this is the most important part to run in
  parallel, to avoid blocking serially in Publish while it runs validation

The worker pool (errgroup) is limited to the number of cores, as there are
no network requests during the parallel work.
---
 miner/metrics.go               |   4 +-
 miner/proposal_builder.go      | 427 ++++++++++++++++++++-------------
 miner/proposal_builder_test.go | 153 +++++++++---
 node/node.go                   | 364 +++++++++++++++++++++++-----
 proposals/util.go              |  12 +-
 5 files changed, 697 insertions(+), 263 deletions(-)

diff --git a/miner/metrics.go b/miner/metrics.go
index 2361af1a56..40eb79d4a5 100644
--- a/miner/metrics.go
+++ b/miner/metrics.go
@@ -33,8 +33,8 @@ func (lt *latencyTracker) total() time.Duration {
 func (lt *latencyTracker) MarshalLogObject(encoder log.ObjectEncoder) error {
 	encoder.AddDuration("data", lt.data.Sub(lt.start))
 	encoder.AddDuration("tortoise", lt.tortoise.Sub(lt.data))
-	encoder.AddDuration("txs", lt.txs.Sub(lt.tortoise))
-	encoder.AddDuration("hash", lt.hash.Sub(lt.txs))
+	encoder.AddDuration("hash", lt.hash.Sub(lt.tortoise))
+	encoder.AddDuration("txs", lt.txs.Sub(lt.hash))
 	encoder.AddDuration("publish", lt.publish.Sub(lt.hash))
 	total := lt.total()
 	encoder.AddDuration("total", total)
diff --git a/miner/proposal_builder.go b/miner/proposal_builder.go
index 984536f8aa..6a1d42d826 100644
--- a/miner/proposal_builder.go
+++ b/miner/proposal_builder.go
@@ -6,11 +6,14 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"runtime"
+	"slices"
 	"sort"
+	"sync"
 	"time"
 
 	"golang.org/x/exp/maps"
-	"golang.org/x/exp/slices"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/spacemeshos/go-spacemesh/codec"
 	"github.com/spacemeshos/go-spacemesh/common/types"
@@ -68,23 +71,37 @@ type ProposalBuilder struct {
 	tortoise votesEncoder
 	syncer   system.SyncStateProvider
 
+	mu      sync.Mutex
+	signers map[types.NodeID]*signerSession
+	shared  sharedSession
+}
+
+type signerSession struct {
 	signer *signing.EdSigner
-	session *session
+	log     log.Log
+	session session
+	latency latencyTracker
 }
 
-// session per every signing key for the whole epoch.
-type session struct {
-	epoch     types.EpochID
-	beacon    types.Beacon
-	atx       types.ATXID
-	atxWeight uint64
-	ref       types.BallotID
-	prev      types.LayerID
-	nonce     types.VRFPostIndex
-	active    struct {
+// shared data for all signers in the epoch.
+type sharedSession struct {
+	epoch  types.EpochID
+	beacon types.Beacon
+	active struct {
 		set    types.ATXIDList
 		weight uint64
 	}
+}
+
+// session per every signing key for the whole epoch.
+type session struct {
+	epoch     types.EpochID
+	atx       types.ATXID
+	atxWeight uint64
+	ref       types.BallotID
+	beacon    types.Beacon
+	prev      types.LayerID
+	nonce     types.VRFPostIndex
 	eligibilities struct {
 		proofs map[types.LayerID][]types.VotingEligibility
 		slots  uint32
@@ -132,9 +149,9 @@ type config struct {
 	hdist              uint32
 	minActiveSetWeight uint64
 	networkDelay       time.Duration
-
+	workersLimit       int
 	// used to determine whether a node has enough information on the active set this epoch
-	GoodAtxPercent int
+	goodAtxPercent int
 }
 
 func (c *config) MarshalLogObject(encoder log.ObjectEncoder) error {
@@ -143,7 +160,7 @@
 	encoder.AddUint32("hdist", c.hdist)
 	encoder.AddUint64("min active weight", c.minActiveSetWeight)
 	encoder.AddDuration("network delay", c.networkDelay)
-	encoder.AddInt("good atx percent", c.GoodAtxPercent)
+	encoder.AddInt("good atx percent", c.goodAtxPercent)
 	return nil
 }
@@ -157,6 +174,14 @@ func WithLayerSize(size uint32) Opt {
 	}
 }
 
+// WithWorkersLimit configures the parallelization factor for builder operation when working with
+// more than one signer.
+func WithWorkersLimit(limit int) Opt {
+	return func(pb *ProposalBuilder) {
+		pb.cfg.workersLimit = limit
+	}
+}
+
 // WithLayerPerEpoch defines the number of layers per epoch.
 func WithLayerPerEpoch(layers uint32) Opt {
 	return func(pb *ProposalBuilder) {
 		pb.cfg.layersPerEpoch = layers
 	}
 }
@@ -191,14 +216,23 @@ func WithNetworkDelay(delay time.Duration) Opt {
 
 func WithMinGoodAtxPercent(percent int) Opt {
 	return func(pb *ProposalBuilder) {
-		pb.cfg.GoodAtxPercent = percent
+		pb.cfg.goodAtxPercent = percent
+	}
+}
+
+// WithSigners guarantees that the builder will start execution with the provided list of signers.
+// Must be applied after WithLogger.
+func WithSigners(signers ...*signing.EdSigner) Opt {
+	return func(pb *ProposalBuilder) {
+		for _, signer := range signers {
+			pb.Register(signer)
+		}
 	}
 }
 
 // New creates a struct of block builder type.
 func New(
 	clock layerClock,
-	signer *signing.EdSigner,
 	cdb *datastore.CachedDB,
 	publisher pubsub.Publisher,
 	trtl votesEncoder,
@@ -207,14 +241,17 @@
 	opts ...Opt,
 ) *ProposalBuilder {
 	pb := &ProposalBuilder{
+		cfg: config{
+			workersLimit: runtime.NumCPU(),
+		},
 		logger:    log.NewNop(),
-		signer:    signer,
 		clock:     clock,
 		cdb:       cdb,
 		publisher: publisher,
 		tortoise:  trtl,
 		syncer:    syncer,
 		conState:  conState,
+		signers:   map[types.NodeID]*signerSession{},
 	}
 	for _, opt := range opts {
 		opt(pb)
@@ -222,6 +259,18 @@
 	}
 	return pb
 }
 
+func (pb *ProposalBuilder) Register(signer *signing.EdSigner) {
+	pb.mu.Lock()
+	defer pb.mu.Unlock()
+	_, exist := pb.signers[signer.NodeID()]
+	if !exist {
+		pb.signers[signer.NodeID()] = &signerSession{
+			signer: signer,
+			log:    pb.logger.WithFields(log.String("signer", signer.NodeID().ShortString())),
+		}
+	}
+}
+
 // Start the loop that listens to layers and build proposals.
 func (pb *ProposalBuilder) Run(ctx context.Context) error {
 	next := pb.clock.CurrentLayer().Add(1)
@@ -240,16 +289,16 @@
 			continue
 		}
 		next = current.Add(1)
-		sctx := log.WithNewSessionID(ctx)
-		if current <= types.GetEffectiveGenesis() || !pb.syncer.IsSynced(sctx) {
+		ctx := log.WithNewSessionID(ctx)
+		if current <= types.GetEffectiveGenesis() || !pb.syncer.IsSynced(ctx) {
 			continue
 		}
 		if err := pb.build(ctx, current); err != nil {
 			if errors.Is(err, errAtxNotAvailable) {
 				pb.logger.With().
- Debug("signer is not active in epoch", log.Context(sctx), log.Uint32("lid", current.Uint32()), log.Err(err)) + Debug("signer is not active in epoch", log.Context(ctx), log.Uint32("lid", current.Uint32()), log.Err(err)) } else { - pb.logger.With().Warning("failed to build proposal", log.Context(sctx), log.Uint32("lid", current.Uint32()), log.Err(err)) + pb.logger.With().Warning("failed to build proposal", log.Context(ctx), log.Uint32("lid", current.Uint32()), log.Err(err)) } } } @@ -317,141 +366,174 @@ func (pb *ProposalBuilder) decideMeshHash(ctx context.Context, current types.Lay return mesh } -func (pb *ProposalBuilder) initSessionData(ctx context.Context, lid types.LayerID) error { - if pb.session == nil || pb.session.epoch != lid.GetEpoch() { - pb.session = &session{epoch: lid.GetEpoch()} +func (pb *ProposalBuilder) initSharedData(ctx context.Context, lid types.LayerID) error { + if pb.shared.epoch != lid.GetEpoch() { + pb.shared = sharedSession{epoch: lid.GetEpoch()} } - if pb.session.atx == types.EmptyATXID { - atx, err := atxs.GetByEpochAndNodeID(pb.cdb, pb.session.epoch-1, pb.signer.NodeID()) + if pb.shared.beacon == types.EmptyBeacon { + beacon, err := beacons.Get(pb.cdb, pb.shared.epoch) + if err != nil || beacon == types.EmptyBeacon { + return fmt.Errorf("missing beacon for epoch %d", pb.shared.epoch) + } + pb.shared.beacon = beacon + } + if pb.shared.active.set == nil { + weight, set, err := generateActiveSet( + pb.logger, + pb.cdb, + pb.shared.epoch, + pb.clock.LayerToTime(pb.shared.epoch.FirstLayer()), + pb.cfg.goodAtxPercent, + pb.cfg.networkDelay, + ) + if err != nil { + return err + } + pb.shared.active.set = set + pb.shared.active.weight = weight + } + return nil +} + +func (pb *ProposalBuilder) initSignerData( + ctx context.Context, + ss *signerSession, + lid types.LayerID, +) error { + if ss.session.epoch != lid.GetEpoch() { + ss.session = session{epoch: lid.GetEpoch()} + } + if ss.session.atx == types.EmptyATXID { + atx, err := atxs.GetByEpochAndNodeID(pb.cdb, ss.session.epoch-1, ss.signer.NodeID()) if err != nil { if errors.Is(err, sql.ErrNotFound) { err = errAtxNotAvailable } - return fmt.Errorf("get atx in epoch %v: %w", pb.session.epoch-1, err) + return fmt.Errorf("get atx in epoch %v: %w", ss.session.epoch-1, err) } - pb.session.atx = atx.ID() - pb.session.atxWeight = atx.GetWeight() + ss.session.atx = atx.ID() + ss.session.atxWeight = atx.GetWeight() } - if pb.session.nonce == 0 { - nonce, err := pb.cdb.VRFNonce(pb.signer.NodeID(), pb.session.epoch) + if ss.session.nonce == 0 { + nonce, err := pb.cdb.VRFNonce(ss.signer.NodeID(), ss.session.epoch) if err != nil { return fmt.Errorf("missing nonce: %w", err) } - pb.session.nonce = nonce + ss.session.nonce = nonce } - if pb.session.beacon == types.EmptyBeacon { - beacon, err := beacons.Get(pb.cdb, pb.session.epoch) - if err != nil || beacon == types.EmptyBeacon { - return fmt.Errorf("missing beacon for epoch %d", pb.session.epoch) - } - pb.session.beacon = beacon - } - if pb.session.prev == 0 { - prev, err := ballots.LastInEpoch(pb.cdb, pb.session.atx, pb.session.epoch) - switch { - case err == nil: - pb.session.prev = prev.Layer - case errors.Is(err, sql.ErrNotFound): - default: + if ss.session.prev == 0 { + prev, err := ballots.LastInEpoch(pb.cdb, ss.session.atx, ss.session.epoch) + if err != nil && !errors.Is(err, sql.ErrNotFound) { return err } + if err == nil { + ss.session.prev = prev.Layer + } } - if pb.session.ref == types.EmptyBallotID { - ballot, err := ballots.FirstInEpoch(pb.cdb, pb.session.atx, 
pb.session.epoch) + if ss.session.ref == types.EmptyBallotID { + ballot, err := ballots.FirstInEpoch(pb.cdb, ss.session.atx, ss.session.epoch) if err != nil && !errors.Is(err, sql.ErrNotFound) { return fmt.Errorf("get refballot %w", err) } if errors.Is(err, sql.ErrNotFound) { - weight, set, err := generateActiveSet( - pb.logger, - pb.cdb, - pb.signer.VRFSigner(), - pb.session.epoch, - pb.clock.LayerToTime(pb.session.epoch.FirstLayer()), - pb.cfg.GoodAtxPercent, - pb.cfg.networkDelay, - pb.session.atx, - pb.session.atxWeight, - ) - if err != nil { - return err - } - pb.session.active.set = set - pb.session.active.weight = weight - pb.session.eligibilities.slots = proposals.MustGetNumEligibleSlots( - pb.session.atxWeight, + ss.session.beacon = pb.shared.beacon + ss.session.eligibilities.slots = proposals.MustGetNumEligibleSlots( + ss.session.atxWeight, pb.cfg.minActiveSetWeight, - weight, + pb.shared.active.weight, pb.cfg.layerSize, pb.cfg.layersPerEpoch, ) } else { if ballot.EpochData == nil { - return fmt.Errorf("atx %d created invalid first ballot", pb.session.atx) - } - hash := ballot.EpochData.ActiveSetHash - set, err := activesets.Get(pb.cdb, hash) - if err != nil { - return fmt.Errorf("get activeset %s: %w", hash.String(), err) - } - var weight uint64 - for _, id := range set.Set { - atx, err := pb.cdb.GetAtxHeader(id) - if err != nil { - return err - } - weight += atx.GetWeight() + return fmt.Errorf("atx %d created invalid first ballot", ss.session.atx) } - pb.session.ref = ballot.ID() - pb.session.active.set = set.Set - pb.session.active.weight = weight - pb.session.eligibilities.slots = ballot.EpochData.EligibilityCount + ss.session.ref = ballot.ID() + ss.session.beacon = ballot.EpochData.Beacon + ss.session.eligibilities.slots = ballot.EpochData.EligibilityCount } } - if pb.session.eligibilities.proofs == nil { - pb.session.eligibilities.proofs = calcEligibilityProofs( - pb.signer.VRFSigner(), - pb.session.epoch, - pb.session.beacon, - pb.session.nonce, - pb.session.eligibilities.slots, + if ss.session.eligibilities.proofs == nil { + ss.session.eligibilities.proofs = calcEligibilityProofs( + ss.signer.VRFSigner(), + ss.session.epoch, + ss.session.beacon, + ss.session.nonce, + ss.session.eligibilities.slots, pb.cfg.layersPerEpoch, ) - pb.logger.With().Info("proposal eligibilities for an epoch", log.Inline(pb.session)) + ss.log.With().Info("proposal eligibilities for an epoch", log.Inline(&ss.session)) events.EmitEligibilities( - pb.session.epoch, - pb.session.beacon, - pb.session.atx, - uint32(len(pb.session.active.set)), - pb.session.eligibilities.proofs, + ss.session.epoch, + ss.session.beacon, + ss.session.atx, + uint32(len(pb.shared.active.set)), + ss.session.eligibilities.proofs, ) } return nil } func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error { - latency := latencyTracker{start: time.Now()} - if err := pb.initSessionData(ctx, lid); err != nil { + start := time.Now() + if err := pb.initSharedData(ctx, lid); err != nil { return err } - latency.data = time.Now() - if lid <= pb.session.prev { - return fmt.Errorf("layer %d was already built", lid) + pb.mu.Lock() + // don't accept registration in the middle of computing proposals + signers := maps.Values(pb.signers) + pb.mu.Unlock() + + var eg errgroup.Group + eg.SetLimit(pb.cfg.workersLimit) + for _, ss := range signers { + ss := ss + ss.latency.start = start + eg.Go(func() error { + if err := pb.initSignerData(ctx, ss, lid); err != nil { + if errors.Is(err, errAtxNotAvailable) { + 
ss.log.With().Info("smesher doesn't have atx that targets this epoch", + log.Context(ctx), ss.session.epoch.Field(), + ) + } else { + return err + } + } + if lid <= ss.session.prev { + return fmt.Errorf( + "layer %d was already built by signer %s", + lid, + ss.signer.NodeID().ShortString(), + ) + } + ss.session.prev = lid + ss.latency.data = time.Now() + return nil + }) + } + if err := eg.Wait(); err != nil { + return err } - pb.session.prev = lid - proofs := pb.session.eligibilities.proofs[lid] - if len(proofs) == 0 { - pb.logger.With().Debug("not eligible for proposal in layer", - log.Context(ctx), log.Uint32("lid", lid.Uint32()), - log.Uint32("epoch", lid.GetEpoch().Uint32()), - ) + any := false + for _, ss := range signers { + if n := len(ss.session.eligibilities.proofs[lid]); n == 0 { + ss.log.With().Debug("not eligible for proposal in layer", + log.Context(ctx), + lid.Field(), lid.GetEpoch().Field()) + continue + } else { + ss.log.With().Debug("eligible for proposals in layer", + log.Context(ctx), + lid.Field(), log.Int("num proposals", n), + ) + any = true + } + } + if !any { return nil } - pb.logger.With().Debug("eligible for proposals in layer", - log.Context(ctx), log.Uint32("lid", lid.Uint32()), log.Int("num proposals", len(proofs)), - ) pb.tortoise.TallyVotes(ctx, lid) // TODO(dshulyak) get rid from the EncodeVotesWithCurrent option in a followup @@ -460,45 +542,78 @@ func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error { if err != nil { return fmt.Errorf("encode votes: %w", err) } - latency.tortoise = time.Now() - - txs := pb.conState.SelectProposalTXs(lid, len(proofs)) - latency.txs = time.Now() + for _, ss := range signers { + ss.latency.tortoise = time.Now() + } meshHash := pb.decideMeshHash(ctx, lid) - latency.hash = time.Now() - - proposal := createProposal(pb.session, pb.signer, lid, txs, opinion, proofs, meshHash) + for _, ss := range signers { + ss.latency.hash = time.Now() + } - // needs to be saved before publishing, as we will query it in handler - if pb.session.ref == types.EmptyBallotID { - if err := activesets.Add(pb.cdb, proposal.EpochData.ActiveSetHash, &types.EpochActiveSet{ - Epoch: pb.session.epoch, - Set: pb.session.active.set, - }); err != nil && !errors.Is(err, sql.ErrObjectExists) { - return err + for _, ss := range signers { + proofs := ss.session.eligibilities.proofs[lid] + if len(proofs) == 0 { + ss.log.With().Debug("not eligible for proposal in layer", + log.Context(ctx), + lid.Field(), lid.GetEpoch().Field()) + continue } - } - if err = pb.publisher.Publish(ctx, pubsub.ProposalProtocol, codec.MustEncode(proposal)); err != nil { - return fmt.Errorf( - "failed to publish proposal %d/%s: %w", - proposal.Layer, - proposal.ID(), - err, + ss.log.With().Debug("eligible for proposals in layer", + log.Context(ctx), + lid.Field(), log.Int("num proposals", len(proofs)), ) - } - latency.publish = time.Now() - pb.logger.With(). 
- Info("proposal created", log.Context(ctx), log.Inline(proposal), log.Object("latency", &latency)) - proposalBuild.Observe(latency.total().Seconds()) - events.EmitProposal(lid, proposal.ID()) - events.ReportProposal(events.ProposalCreated, proposal) - return nil + txs := pb.conState.SelectProposalTXs(lid, len(proofs)) + ss.latency.txs = time.Now() + + // needs to be saved before publishing, as we will query it in handler + if ss.session.ref == types.EmptyBallotID { + if err := activesets.Add(pb.cdb, pb.shared.active.set.Hash(), &types.EpochActiveSet{ + Epoch: ss.session.epoch, + Set: pb.shared.active.set, + }); err != nil && !errors.Is(err, sql.ErrObjectExists) { + return err + } + } + + ss := ss + eg.Go(func() error { + proposal := createProposal( + &ss.session, + pb.shared.beacon, + pb.shared.active.set, + ss.signer, + lid, + txs, + opinion, + proofs, + meshHash, + ) + if err := pb.publisher.Publish(ctx, pubsub.ProposalProtocol, codec.MustEncode(proposal)); err != nil { + ss.log.Error("failed to publish proposal", + log.Context(ctx), + log.Uint32("lid", proposal.Layer.Uint32()), + log.Stringer("id", proposal.ID()), + log.Err(err), + ) + } else { + ss.latency.publish = time.Now() + ss.log.With().Info("proposal created", log.Context(ctx), log.Inline(proposal), log.Object("latency", &ss.latency)) + proposalBuild.Observe(ss.latency.total().Seconds()) + events.EmitProposal(lid, proposal.ID()) + events.ReportProposal(events.ProposalCreated, proposal) + } + return nil + }) + } + return eg.Wait() } func createProposal( session *session, + beacon types.Beacon, + activeset types.ATXIDList, signer *signing.EdSigner, lid types.LayerID, txs []types.TransactionID, @@ -524,8 +639,8 @@ func createProposal( if session.ref == types.EmptyBallotID { p.Ballot.RefBallot = types.EmptyBallotID p.Ballot.EpochData = &types.EpochData{ - ActiveSetHash: session.active.set.Hash(), - Beacon: session.beacon, + ActiveSetHash: activeset.Hash(), + Beacon: beacon, EligibilityCount: session.eligibilities.slots, } } else { @@ -576,45 +691,30 @@ func activeSetFromBlock(db sql.Executor, bid types.BlockID) ([]types.ATXID, erro func activesFromFirstBlock( cdb *datastore.CachedDB, - signer *signing.VRFSigner, target types.EpochID, - ownAtx types.ATXID, - ownWeight uint64, ) (uint64, []types.ATXID, error) { set, err := ActiveSetFromEpochFirstBlock(cdb, target) if err != nil { return 0, nil, err } - var ( - totalWeight uint64 - ownIncluded bool - ) + var totalWeight uint64 for _, id := range set { - ownIncluded = ownIncluded || id == ownAtx atx, err := cdb.GetAtxHeader(id) if err != nil { return 0, nil, err } totalWeight += atx.GetWeight() } - if !ownIncluded { - // miner is not included in the active set derived from the epoch's first block - set = append(set, ownAtx) - totalWeight += ownWeight - } return totalWeight, set, nil } func generateActiveSet( logger log.Log, cdb *datastore.CachedDB, - signer *signing.VRFSigner, target types.EpochID, epochStart time.Time, goodAtxPercent int, networkDelay time.Duration, - ownAtx types.ATXID, - ownWeight uint64, ) (uint64, []types.ATXID, error) { var ( totalWeight uint64 @@ -626,12 +726,11 @@ func generateActiveSet( if err != nil { return err } - if grade != good && header.NodeID != signer.NodeID() { + if grade != good { logger.With().Info("atx omitted from active set", header.ID, log.Int("grade", int(grade)), log.Stringer("smesher", header.NodeID), - log.Bool("own", header.NodeID == signer.NodeID()), log.Time("received", header.Received), log.Time("epoch_start", epochStart), ) @@ -652,7 
+751,7 @@ func generateActiveSet( // for all the atx and malfeasance proof. this active set is not usable. // TODO: change after timing info of ATXs and malfeasance proofs is sync'ed from peers as well var err error - totalWeight, set, err = activesFromFirstBlock(cdb, signer, target, ownAtx, ownWeight) + totalWeight, set, err = activesFromFirstBlock(cdb, target) if err != nil { return 0, nil, err } diff --git a/miner/proposal_builder_test.go b/miner/proposal_builder_test.go index 7e42ca624d..9b9b4e829e 100644 --- a/miner/proposal_builder_test.go +++ b/miner/proposal_builder_test.go @@ -1,10 +1,12 @@ package miner import ( + "bytes" "context" "errors" "math/rand" "os" + "sort" "testing" "time" @@ -218,13 +220,20 @@ type step struct { encodeVotesErr, publishErr error - expectProposal *types.Proposal - expectErr string + expectProposal *types.Proposal + expectProposals []*types.Proposal + expectErr string } func TestBuild(t *testing.T) { - signer, err := signing.NewEdSigner(signing.WithKeyFromRand(rand.New(rand.NewSource(10101)))) - require.NoError(t, err) + signers := make([]*signing.EdSigner, 4) + rng := rand.New(rand.NewSource(10101)) + for i := range signers { + signer, err := signing.NewEdSigner(signing.WithKeyFromRand(rng)) + require.NoError(t, err) + signers[i] = signer + } + signer := signers[0] defaults := []Opt{ WithLayerPerEpoch(types.GetLayersPerEpoch()), WithLayerSize(10), @@ -301,6 +310,7 @@ func TestBuild(t *testing.T) { gballot(types.BallotID{1}, types.ATXID{1}, signer.NodeID(), 15, &types.EpochData{ ActiveSetHash: types.ATXIDList{{1}, {2}}.Hash(), EligibilityCount: 5, + Beacon: types.Beacon{1}, }), }, activeset: types.ATXIDList{{1}, {2}}, @@ -320,46 +330,41 @@ func TestBuild(t *testing.T) { steps: []step{ { lid: 15, - expectErr: "atx not available", + expectErr: "missing beacon", + }, + { + lid: 15, + beacon: types.Beacon{1}, + expectErr: "empty active set", }, { lid: 15, atxs: []*types.VerifiedActivationTx{ - gatx(types.ATXID{10}, 2, signer.NodeID(), 1), + gatx(types.ATXID{20}, 2, types.NodeID{20}, 1), }, - expectErr: "missing nonce", }, { lid: 15, atxs: []*types.VerifiedActivationTx{ - gatx(types.ATXID{1}, 2, signer.NodeID(), 1, genAtxWithNonce(777)), + gatx(types.ATXID{10}, 2, signer.NodeID(), 1), }, - expectErr: "missing beacon", + expectErr: "missing nonce", }, { - lid: 16, - beacon: types.Beacon{1}, + lid: 16, + atxs: []*types.VerifiedActivationTx{ + gatx(types.ATXID{1}, 2, signer.NodeID(), 1, genAtxWithNonce(777)), + }, ballots: []*types.Ballot{ gballot(types.BallotID{1}, types.ATXID{10}, signer.NodeID(), 15, &types.EpochData{ ActiveSetHash: types.ATXIDList{{10}, {2}}.Hash(), EligibilityCount: 5, + Beacon: types.Beacon{1}, }), }, - expectErr: "get activeset", - }, - { - lid: 16, - activeset: types.ATXIDList{{10}, {2}}, - expectErr: "get ATXs from DB", - }, - { - lid: 16, - atxs: []*types.VerifiedActivationTx{ - gatx(types.ATXID{2}, 2, types.NodeID{1}, 1), - }, opinion: &types.Opinion{Hash: types.Hash32{1}}, txs: []types.TransactionID{{1}}, - latestComplete: 10, + latestComplete: 14, expectProposal: expectProposal( signer, 16, types.ATXID{10}, types.Opinion{Hash: types.Hash32{1}}, expectRef(types.BallotID{1}), @@ -406,6 +411,7 @@ func TestBuild(t *testing.T) { gballot(types.BallotID{1}, types.ATXID{1}, signer.NodeID(), 15, &types.EpochData{ ActiveSetHash: types.ATXIDList{{1}}.Hash(), EligibilityCount: 10, + Beacon: types.Beacon{1}, }), }, activeset: types.ATXIDList{{1}}, @@ -428,6 +434,7 @@ func TestBuild(t *testing.T) { gballot(types.BallotID{1}, types.ATXID{1}, 
signer.NodeID(), 15, &types.EpochData{ ActiveSetHash: types.ATXIDList{{1}}.Hash(), EligibilityCount: 10, + Beacon: types.Beacon{1}, }), }, activeset: types.ATXIDList{{1}}, @@ -435,7 +442,11 @@ func TestBuild(t *testing.T) { opinion: &types.Opinion{}, txs: []types.TransactionID{}, publishErr: errors.New("test publish"), - expectErr: "test publish", + expectProposal: expectProposal( + signer, 16, types.ATXID{1}, types.Opinion{}, + expectRef(types.BallotID{1}), + expectCounters(signer, 3, types.Beacon{1}, 777, 2, 5), + ), }, }, }, @@ -560,6 +571,7 @@ func TestBuild(t *testing.T) { gballot(types.BallotID{1}, types.ATXID{1}, signer.NodeID(), 15, &types.EpochData{ ActiveSetHash: types.ATXIDList{{1}}.Hash(), EligibilityCount: 10, + Beacon: types.Beacon{1}, }), }, activeset: types.ATXIDList{{1}}, @@ -611,6 +623,7 @@ func TestBuild(t *testing.T) { gballot(types.BallotID{1}, types.ATXID{1}, signer.NodeID(), 15, &types.EpochData{ ActiveSetHash: types.ATXIDList{{1}}.Hash(), EligibilityCount: 10, + Beacon: types.Beacon{1}, }), }, activeset: types.ATXIDList{{1}}, @@ -639,6 +652,45 @@ func TestBuild(t *testing.T) { }, }, }, + { + desc: "multi signers", + opts: []Opt{WithSigners(signers...), WithWorkersLimit(len(signers) / 2)}, + steps: []step{ + { + lid: 15, + beacon: types.Beacon{1}, + atxs: []*types.VerifiedActivationTx{ + gatx(types.ATXID{1}, 2, signers[0].NodeID(), 1, genAtxWithNonce(777)), + gatx(types.ATXID{2}, 2, signers[1].NodeID(), 1, genAtxWithNonce(999)), + }, + opinion: &types.Opinion{Hash: types.Hash32{1}}, + txs: []types.TransactionID{{1}, {2}}, + latestComplete: 14, + expectProposals: []*types.Proposal{ + expectProposal( + signers[0], 15, types.ATXID{1}, types.Opinion{Hash: types.Hash32{1}}, + expectEpochData( + gactiveset(types.ATXID{1}, types.ATXID{2}), + 25, + types.Beacon{1}, + ), + expectTxs([]types.TransactionID{{1}, {2}}), + expectCounters(signers[0], 3, types.Beacon{1}, 777, 0, 6, 9, 12, 16, 18, 20, 23), + ), + expectProposal( + signers[1], 15, types.ATXID{2}, types.Opinion{Hash: types.Hash32{1}}, + expectEpochData( + gactiveset(types.ATXID{1}, types.ATXID{2}), + 25, + types.Beacon{1}, + ), + expectTxs([]types.TransactionID{{1}, {2}}), + expectCounters(signers[1], 3, types.Beacon{1}, 999, 0, 4, 6, 8, 9, 17), + ), + }, + }, + }, + }, } { tc := tc t.Run(tc.desc, func(t *testing.T) { @@ -655,9 +707,10 @@ func TestBuild(t *testing.T) { clock.EXPECT().LayerToTime(gomock.Any()).Return(time.Unix(0, 0)).AnyTimes() - full := append(defaults, tc.opts...) - full = append(full, WithLogger(logtest.New(t))) - builder := New(clock, signer, cdb, publisher, tortoise, syncer, conState, full...) + full := append(defaults, WithLogger(logtest.New(t)), WithSigners(signer)) + full = append(full, tc.opts...) + builder := New(clock, cdb, publisher, tortoise, syncer, conState, full...) + var decoded chan *types.Proposal for _, step := range tc.steps { { if step.beacon != types.EmptyBeacon { @@ -710,33 +763,59 @@ func TestBuild(t *testing.T) { Return(step.opinion, step.encodeVotesErr) } if step.txs != nil { - conState.EXPECT().SelectProposalTXs(step.lid, gomock.Any()).Return(step.txs) + conState.EXPECT(). + SelectProposalTXs(step.lid, gomock.Any()). + Return(step.txs). 
+ AnyTimes() } if step.latestComplete != 0 { tortoise.EXPECT().LatestComplete().Return(step.latestComplete) } } - var decoded *types.Proposal - if step.expectProposal != nil || step.publishErr != nil { + decoded = make( + chan *types.Proposal, + len(signers), + ) // set the maximum possible capacity + if step.expectProposal != nil || len(step.expectProposals) > 0 || + step.publishErr != nil { publisher.EXPECT(). Publish(ctx, pubsub.ProposalProtocol, gomock.Any()). DoAndReturn(func(_ context.Context, _ string, msg []byte) error { var proposal types.Proposal codec.MustDecode(msg, &proposal) proposal.MustInitialize() - decoded = &proposal + select { + case decoded <- &proposal: + default: + require.FailNow( + t, + "blocking in Publish. check decoded channel capacity", + ) + } return step.publishErr - }) + }). + AnyTimes() } err := builder.build(ctx, step.lid) + close(decoded) if len(step.expectErr) > 0 { require.ErrorContains(t, err, step.expectErr) } else { require.NoError(t, err) + expect := step.expectProposals if step.expectProposal != nil { - require.Equal(t, *step.expectProposal, *decoded) - } else { - require.Nil(t, decoded) + expect = []*types.Proposal{step.expectProposal} + } + received := []*types.Proposal{} + for proposal := range decoded { + received = append(received, proposal) + } + sort.Slice(received, func(i, j int) bool { + return bytes.Compare(received[i].SmesherID[:], received[j].SmesherID[:]) == -1 + }) + require.Len(t, received, len(expect)) + for i := range expect { + require.Equal(t, expect[i], received[i], "i=%d", i) } } } @@ -809,7 +888,6 @@ func TestStartStop(t *testing.T) { builder := New( clock, - signer, cdb, publisher, tortoise, @@ -817,6 +895,7 @@ func TestStartStop(t *testing.T) { conState, WithLogger(logtest.New(t)), ) + builder.Register(signer) var ( ctx, cancel = context.WithCancel(context.Background()) eg errgroup.Group diff --git a/node/node.go b/node/node.go index d114f4f9db..9a5bf0f060 100644 --- a/node/node.go +++ b/node/node.go @@ -170,7 +170,10 @@ func GetCommand() *cobra.Command { // This blocks until the context is finished or until an error is produced err = app.Start(ctx) - cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 30*time.Second) + cleanupCtx, cleanupCancel := context.WithTimeout( + context.Background(), + 30*time.Second, + ) defer cleanupCancel() done := make(chan struct{}, 1) // FIXME: per https://github.com/spacemeshos/go-spacemesh/issues/3830 @@ -452,7 +455,9 @@ func (app *App) Initialize() error { // vote against all blocks in that layer. 
so it's important to make sure zdist takes longer than // hare's max time duration to run consensus for a layer maxHareRoundsPerLayer := 1 + app.Config.HARE.LimitIterations*hare.RoundsPerIteration // pre-round + 4 rounds per iteration - maxHareLayerDuration := app.Config.HARE.WakeupDelta + time.Duration(maxHareRoundsPerLayer)*app.Config.HARE.RoundDuration + maxHareLayerDuration := app.Config.HARE.WakeupDelta + time.Duration( + maxHareRoundsPerLayer, + )*app.Config.HARE.RoundDuration if app.Config.LayerDuration*time.Duration(app.Config.Tortoise.Zdist) <= maxHareLayerDuration { app.log.With().Error("incompatible params", log.Uint32("tortoise_zdist", app.Config.Tortoise.Zdist), @@ -553,7 +558,11 @@ func (app *App) initServices(ctx context.Context) error { nipostValidatorLogger := app.addLogger(NipostValidatorLogger, lg) postVerifiers := make([]activation.PostVerifier, 0, app.Config.SMESHING.VerifyingOpts.Workers) lg.Debug("creating post verifier") - verifier, err := activation.NewPostVerifier(app.Config.POST, nipostValidatorLogger.Zap(), verifying.WithPowFlags(app.Config.SMESHING.VerifyingOpts.Flags.Value())) + verifier, err := activation.NewPostVerifier( + app.Config.POST, + nipostValidatorLogger.Zap(), + verifying.WithPowFlags(app.Config.SMESHING.VerifyingOpts.Flags.Value()), + ) lg.With().Debug("created post verifier", log.Err(err)) if err != nil { return err @@ -564,7 +573,13 @@ func (app *App) initServices(ctx context.Context) error { app.postVerifier = activation.NewOffloadingPostVerifier(postVerifiers, nipostValidatorLogger) if app.Config.SMESHING.Start { - app.postSupervisor, err = activation.NewPostSupervisor(app.log.Zap(), app.Config.POSTService, app.Config.POST, app.Config.SMESHING.Opts, app.Config.SMESHING.ProvingOpts) + app.postSupervisor, err = activation.NewPostSupervisor( + app.log.Zap(), + app.Config.POSTService, + app.Config.POST, + app.Config.SMESHING.Opts, + app.Config.SMESHING.ProvingOpts, + ) if err != nil { return fmt.Errorf("start post service: %w", err) } @@ -573,7 +588,12 @@ func (app *App) initServices(ctx context.Context) error { } } - validator := activation.NewValidator(poetDb, app.Config.POST, app.Config.SMESHING.Opts.Scrypt, app.postVerifier) + validator := activation.NewValidator( + poetDb, + app.Config.POST, + app.Config.SMESHING.Opts.Scrypt, + app.postVerifier, + ) app.validator = validator cfg := vm.DefaultConfig() @@ -593,7 +613,11 @@ func (app *App) initServices(ctx context.Context) error { if len(genesisAccts) > 0 { exists, err := state.AccountExists(genesisAccts[0].Address) if err != nil { - return fmt.Errorf("failed to check genesis account %v: %w", genesisAccts[0].Address, err) + return fmt.Errorf( + "failed to check genesis account %v: %w", + genesisAccts[0].Address, + err, + ) } if !exists { if err = state.ApplyGenesis(genesisAccts); err != nil { @@ -607,10 +631,18 @@ func (app *App) initServices(ctx context.Context) error { return errors.New("invalid golden atx id") } - app.edVerifier = signing.NewEdVerifier(signing.WithVerifierPrefix(app.Config.Genesis.GenesisID().Bytes())) + app.edVerifier = signing.NewEdVerifier( + signing.WithVerifierPrefix(app.Config.Genesis.GenesisID().Bytes()), + ) vrfVerifier := signing.NewVRFVerifier() - beaconProtocol := beacon.New(app.host, app.edSgn, app.edVerifier, vrfVerifier, app.cachedDB, app.clock, + beaconProtocol := beacon.New( + app.host, + app.edSgn, + app.edVerifier, + vrfVerifier, + app.cachedDB, + app.clock, beacon.WithContext(ctx), beacon.WithConfig(app.Config.Beacon), 
beacon.WithLogger(app.addLogger(BeaconLogger, lg)), @@ -648,7 +680,12 @@ func (app *App) initServices(ctx context.Context) error { return nil }) - executor := mesh.NewExecutor(app.cachedDB, state, app.conState, app.addLogger(ExecutorLogger, lg)) + executor := mesh.NewExecutor( + app.cachedDB, + state, + app.conState, + app.addLogger(ExecutorLogger, lg), + ) mlog := app.addLogger(MeshLogger, lg) msh, err := mesh.NewMesh(app.cachedDB, app.clock, trtl, executor, app.conState, mlog) if err != nil { @@ -656,7 +693,14 @@ func (app *App) initServices(ctx context.Context) error { } app.eg.Go(func() error { - prune.Prune(ctx, mlog.Zap(), app.db, app.clock, app.Config.Tortoise.Hdist, app.Config.DatabasePruneInterval) + prune.Prune( + ctx, + mlog.Zap(), + app.db, + app.clock, + app.Config.Tortoise.Hdist, + app.Config.DatabasePruneInterval, + ) return nil }) @@ -680,11 +724,23 @@ func (app *App) initServices(ctx context.Context) error { // we can't have an epoch offset which is greater/equal than the number of layers in an epoch if app.Config.HareEligibility.ConfidenceParam >= app.Config.BaseConfig.LayersPerEpoch { - return fmt.Errorf("confidence param should be smaller than layers per epoch. eligibility-confidence-param: %d. layers-per-epoch: %d", - app.Config.HareEligibility.ConfidenceParam, app.Config.BaseConfig.LayersPerEpoch) + return fmt.Errorf( + "confidence param should be smaller than layers per epoch. eligibility-confidence-param: %d. layers-per-epoch: %d", + app.Config.HareEligibility.ConfidenceParam, + app.Config.BaseConfig.LayersPerEpoch, + ) } - proposalListener := proposals.NewHandler(app.cachedDB, app.edVerifier, app.host, fetcherWrapped, beaconProtocol, msh, trtl, vrfVerifier, app.clock, + proposalListener := proposals.NewHandler( + app.cachedDB, + app.edVerifier, + app.host, + fetcherWrapped, + beaconProtocol, + msh, + trtl, + vrfVerifier, + app.clock, proposals.WithLogger(app.addLogger(ProposalListenerLogger, lg)), proposals.WithConfig(proposals.Config{ LayerSize: layerSize, @@ -705,7 +761,15 @@ func (app *App) initServices(ctx context.Context) error { app.addLogger(TxHandlerLogger, lg), ) - app.hOracle = eligibility.New(beaconProtocol, app.cachedDB, vrfVerifier, vrfSigner, app.Config.LayersPerEpoch, app.Config.HareEligibility, app.addLogger(HareOracleLogger, lg)) + app.hOracle = eligibility.New( + beaconProtocol, + app.cachedDB, + vrfVerifier, + vrfSigner, + app.Config.LayersPerEpoch, + app.Config.HareEligibility, + app.addLogger(HareOracleLogger, lg), + ) // TODO: genesisMinerWeight is set to app.Config.SpaceToCommit, because PoET ticks are currently hardcoded to 1 bscfg := app.Config.Bootstrap @@ -717,7 +781,16 @@ func (app *App) initServices(ctx context.Context) error { bootstrap.WithLogger(app.addLogger(BootstrapLogger, lg)), ) - app.certifier = blocks.NewCertifier(app.cachedDB, app.hOracle, app.edSgn.NodeID(), app.edSgn, app.edVerifier, app.host, app.clock, beaconProtocol, trtl, + app.certifier = blocks.NewCertifier( + app.cachedDB, + app.hOracle, + app.edSgn.NodeID(), + app.edSgn, + app.edVerifier, + app.host, + app.clock, + beaconProtocol, + trtl, blocks.WithCertContext(ctx), blocks.WithCertConfig(blocks.CertConfig{ CommitteeSize: app.Config.HARE.N, @@ -744,7 +817,15 @@ func (app *App) initServices(ctx context.Context) error { syncerConf.HareDelayLayers = app.Config.Tortoise.Zdist syncerConf.SyncCertDistance = app.Config.Tortoise.Hdist syncerConf.Standalone = app.Config.Standalone - newSyncer := syncer.NewSyncer(app.cachedDB, app.clock, beaconProtocol, msh, trtl, fetcher, 
patrol, app.certifier, + newSyncer := syncer.NewSyncer( + app.cachedDB, + app.clock, + beaconProtocol, + msh, + trtl, + fetcher, + patrol, + app.certifier, syncer.WithConfig(syncerConf), syncer.WithLogger(app.addLogger(SyncLogger, lg)), ) @@ -753,7 +834,13 @@ func (app *App) initServices(ctx context.Context) error { app.hOracle.SetSync(newSyncer) hareOutputCh := make(chan hare.LayerOutput, app.Config.HARE.LimitConcurrent) - app.blockGen = blocks.NewGenerator(app.cachedDB, executor, msh, fetcherWrapped, app.certifier, patrol, + app.blockGen = blocks.NewGenerator( + app.cachedDB, + executor, + msh, + fetcherWrapped, + app.certifier, + patrol, blocks.WithContext(ctx), blocks.WithConfig(blocks.Config{ BlockGasLimit: app.Config.BlockGasLimit, @@ -761,7 +848,8 @@ func (app *App) initServices(ctx context.Context) error { GenBlockInterval: 500 * time.Millisecond, }), blocks.WithHareOutputChan(hareOutputCh), - blocks.WithGeneratorLogger(app.addLogger(BlockGenLogger, lg))) + blocks.WithGeneratorLogger(app.addLogger(BlockGenLogger, lg)), + ) hareCfg := app.Config.HARE hareCfg.Hdist = app.Config.Tortoise.Hdist @@ -795,7 +883,12 @@ func (app *App) initServices(ctx context.Context) error { app.hare3.Register(app.edSgn) app.hare3.Start() app.eg.Go(func() error { - compat.ReportWeakcoin(ctx, logger, app.hare3.Coins(), tortoiseWeakCoin{db: app.cachedDB, tortoise: trtl}) + compat.ReportWeakcoin( + ctx, + logger, + app.hare3.Coins(), + tortoiseWeakCoin{db: app.cachedDB, tortoise: trtl}, + ) return nil }) app.eg.Go(func() error { @@ -810,7 +903,6 @@ func (app *App) initServices(ctx context.Context) error { } proposalBuilder := miner.New( app.clock, - app.edSgn, app.cachedDB, app.host, trtl, @@ -824,6 +916,7 @@ func (app *App) initServices(ctx context.Context) error { miner.WithMinGoodAtxPercent(minerGoodAtxPct), miner.WithLogger(app.addLogger(ProposalBuilderLogger, lg)), ) + proposalBuilder.Register(app.edSgn) postSetupMgr, err := activation.NewPostSetupManager( app.edSgn.NodeID(), @@ -856,7 +949,11 @@ func (app *App) initServices(ctx context.Context) error { if app.Config.SMESHING.Start { coinbaseAddr, err = types.StringToAddress(app.Config.SMESHING.CoinbaseAccount) if err != nil { - app.log.Panic("failed to parse CoinbaseAccount address `%s`: %v", app.Config.SMESHING.CoinbaseAccount, err) + app.log.Panic( + "failed to parse CoinbaseAccount address `%s`: %v", + app.Config.SMESHING.CoinbaseAccount, + err, + ) } if coinbaseAddr.IsEmpty() { app.log.Panic("invalid coinbase account") @@ -897,15 +994,53 @@ func (app *App) initServices(ctx context.Context) error { trtl, ) fetcher.SetValidators( - fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(atxHandler.HandleSyncedAtx, app.host, lg)), - fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(poetDb.ValidateAndStoreMsg, app.host, lg)), - fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(proposalListener.HandleSyncedBallot, app.host, lg)), - fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(proposalListener.HandleActiveSet, app.host, lg)), - fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(blockHandler.HandleSyncedBlock, app.host, lg)), - fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(proposalListener.HandleSyncedProposal, app.host, lg)), - fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(app.txHandler.HandleBlockTransaction, app.host, lg)), - fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(app.txHandler.HandleProposalTransaction, app.host, lg)), - 
fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(malfeasanceHandler.HandleSyncedMalfeasanceProof, app.host, lg)), + fetch.ValidatorFunc( + pubsub.DropPeerOnSyncValidationReject(atxHandler.HandleSyncedAtx, app.host, lg), + ), + fetch.ValidatorFunc( + pubsub.DropPeerOnSyncValidationReject(poetDb.ValidateAndStoreMsg, app.host, lg), + ), + fetch.ValidatorFunc( + pubsub.DropPeerOnSyncValidationReject( + proposalListener.HandleSyncedBallot, + app.host, + lg, + ), + ), + fetch.ValidatorFunc( + pubsub.DropPeerOnSyncValidationReject(proposalListener.HandleActiveSet, app.host, lg), + ), + fetch.ValidatorFunc( + pubsub.DropPeerOnSyncValidationReject(blockHandler.HandleSyncedBlock, app.host, lg), + ), + fetch.ValidatorFunc( + pubsub.DropPeerOnSyncValidationReject( + proposalListener.HandleSyncedProposal, + app.host, + lg, + ), + ), + fetch.ValidatorFunc( + pubsub.DropPeerOnSyncValidationReject( + app.txHandler.HandleBlockTransaction, + app.host, + lg, + ), + ), + fetch.ValidatorFunc( + pubsub.DropPeerOnSyncValidationReject( + app.txHandler.HandleProposalTransaction, + app.host, + lg, + ), + ), + fetch.ValidatorFunc( + pubsub.DropPeerOnSyncValidationReject( + malfeasanceHandler.HandleSyncedMalfeasanceProof, + app.host, + lg, + ), + ), ) syncHandler := func(_ context.Context, _ p2p.Peer, _ []byte) error { @@ -922,17 +1057,51 @@ func (app *App) initServices(ctx context.Context) error { } if app.Config.Beacon.RoundsNumber > 0 { - app.host.Register(pubsub.BeaconWeakCoinProtocol, pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleWeakCoinProposal), pubsub.WithValidatorInline(true)) - app.host.Register(pubsub.BeaconProposalProtocol, pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleProposal), pubsub.WithValidatorInline(true)) - app.host.Register(pubsub.BeaconFirstVotesProtocol, pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleFirstVotes), pubsub.WithValidatorInline(true)) - app.host.Register(pubsub.BeaconFollowingVotesProtocol, pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleFollowingVotes), pubsub.WithValidatorInline(true)) - } - app.host.Register(pubsub.ProposalProtocol, pubsub.ChainGossipHandler(syncHandler, proposalListener.HandleProposal)) - app.host.Register(pubsub.AtxProtocol, pubsub.ChainGossipHandler(atxSyncHandler, atxHandler.HandleGossipAtx)) - app.host.Register(pubsub.TxProtocol, pubsub.ChainGossipHandler(syncHandler, app.txHandler.HandleGossipTransaction)) - app.host.Register(pubsub.HareProtocol, pubsub.ChainGossipHandler(syncHandler, app.hare.GetHareMsgHandler())) - app.host.Register(pubsub.BlockCertify, pubsub.ChainGossipHandler(syncHandler, app.certifier.HandleCertifyMessage)) - app.host.Register(pubsub.MalfeasanceProof, pubsub.ChainGossipHandler(atxSyncHandler, malfeasanceHandler.HandleMalfeasanceProof)) + app.host.Register( + pubsub.BeaconWeakCoinProtocol, + pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleWeakCoinProposal), + pubsub.WithValidatorInline(true), + ) + app.host.Register( + pubsub.BeaconProposalProtocol, + pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleProposal), + pubsub.WithValidatorInline(true), + ) + app.host.Register( + pubsub.BeaconFirstVotesProtocol, + pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleFirstVotes), + pubsub.WithValidatorInline(true), + ) + app.host.Register( + pubsub.BeaconFollowingVotesProtocol, + pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleFollowingVotes), + pubsub.WithValidatorInline(true), + ) + } + app.host.Register( + pubsub.ProposalProtocol, + 
pubsub.ChainGossipHandler(syncHandler, proposalListener.HandleProposal), + ) + app.host.Register( + pubsub.AtxProtocol, + pubsub.ChainGossipHandler(atxSyncHandler, atxHandler.HandleGossipAtx), + ) + app.host.Register( + pubsub.TxProtocol, + pubsub.ChainGossipHandler(syncHandler, app.txHandler.HandleGossipTransaction), + ) + app.host.Register( + pubsub.HareProtocol, + pubsub.ChainGossipHandler(syncHandler, app.hare.GetHareMsgHandler()), + ) + app.host.Register( + pubsub.BlockCertify, + pubsub.ChainGossipHandler(syncHandler, app.certifier.HandleCertifyMessage), + ) + app.host.Register( + pubsub.MalfeasanceProof, + pubsub.ChainGossipHandler(atxSyncHandler, malfeasanceHandler.HandleMalfeasanceProof), + ) app.proposalBuilder = proposalBuilder app.proposalListener = proposalListener @@ -966,7 +1135,10 @@ func (app *App) launchStandalone(ctx context.Context) error { return nil } if len(app.Config.PoETServers) != 1 { - return fmt.Errorf("to launch in a standalone mode provide single local address for poet: %v", app.Config.PoETServers) + return fmt.Errorf( + "to launch in a standalone mode provide single local address for poet: %v", + app.Config.PoETServers, + ) } value := types.Beacon{} genesis := app.Config.Genesis.GenesisID() @@ -1058,7 +1230,11 @@ func (app *App) startServices(ctx context.Context) error { if app.Config.SMESHING.Start { coinbaseAddr, err := types.StringToAddress(app.Config.SMESHING.CoinbaseAccount) if err != nil { - app.log.Panic("failed to parse CoinbaseAccount address on start `%s`: %v", app.Config.SMESHING.CoinbaseAccount, err) + app.log.Panic( + "failed to parse CoinbaseAccount address on start `%s`: %v", + app.Config.SMESHING.CoinbaseAccount, + err, + ) } if err := app.atxBuilder.StartSmeshing(coinbaseAddr, app.Config.SMESHING.Opts); err != nil { app.log.Panic("failed to start smeshing: %v", err) @@ -1077,44 +1253,100 @@ func (app *App) startServices(ctx context.Context) error { return nil } -func (app *App) initService(ctx context.Context, svc grpcserver.Service) (grpcserver.ServiceAPI, error) { +func (app *App) initService( + ctx context.Context, + svc grpcserver.Service, +) (grpcserver.ServiceAPI, error) { switch svc { case grpcserver.Debug: return grpcserver.NewDebugService(app.db, app.conState, app.host, app.hOracle), nil case grpcserver.GlobalState: return grpcserver.NewGlobalStateService(app.mesh, app.conState), nil case grpcserver.Mesh: - return grpcserver.NewMeshService(app.cachedDB, app.mesh, app.conState, app.clock, app.Config.LayersPerEpoch, app.Config.Genesis.GenesisID(), app.Config.LayerDuration, app.Config.LayerAvgSize, uint32(app.Config.TxsPerProposal)), nil + return grpcserver.NewMeshService( + app.cachedDB, + app.mesh, + app.conState, + app.clock, + app.Config.LayersPerEpoch, + app.Config.Genesis.GenesisID(), + app.Config.LayerDuration, + app.Config.LayerAvgSize, + uint32(app.Config.TxsPerProposal), + ), nil case grpcserver.Node: - return grpcserver.NewNodeService(app.host, app.mesh, app.clock, app.syncer, cmd.Version, cmd.Commit), nil + return grpcserver.NewNodeService( + app.host, + app.mesh, + app.clock, + app.syncer, + cmd.Version, + cmd.Commit, + ), nil case grpcserver.Admin: return grpcserver.NewAdminService(app.db, app.Config.DataDir(), app.host), nil case grpcserver.Smesher: - return grpcserver.NewSmesherService(app.postSetupMgr, app.atxBuilder, app.postSupervisor, app.Config.API.SmesherStreamInterval, app.Config.SMESHING.Opts), nil + return grpcserver.NewSmesherService( + app.postSetupMgr, + app.atxBuilder, + app.postSupervisor, + 
app.Config.API.SmesherStreamInterval, + app.Config.SMESHING.Opts, + ), nil case grpcserver.Post: return app.grpcPostService, nil case grpcserver.Transaction: - return grpcserver.NewTransactionService(app.db, app.host, app.mesh, app.conState, app.syncer, app.txHandler), nil + return grpcserver.NewTransactionService( + app.db, + app.host, + app.mesh, + app.conState, + app.syncer, + app.txHandler, + ), nil case grpcserver.Activation: - return grpcserver.NewActivationService(app.cachedDB, types.ATXID(app.Config.Genesis.GoldenATX())), nil + return grpcserver.NewActivationService( + app.cachedDB, + types.ATXID(app.Config.Genesis.GoldenATX()), + ), nil } return nil, fmt.Errorf("unknown service %s", svc) } -func unaryGrpcLogStart(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { +func unaryGrpcLogStart( + ctx context.Context, + req any, + _ *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, +) (any, error) { ctxzap.Info(ctx, "started unary call") return handler(ctx, req) } -func streamingGrpcLogStart(srv any, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { +func streamingGrpcLogStart( + srv any, + stream grpc.ServerStream, + _ *grpc.StreamServerInfo, + handler grpc.StreamHandler, +) error { ctxzap.Info(stream.Context(), "started streaming call") return handler(srv, stream) } func (app *App) newGrpc(logger log.Log, endpoint string) *grpcserver.Server { - return grpcserver.New(endpoint, logger, - grpc.ChainStreamInterceptor(grpctags.StreamServerInterceptor(), grpczap.StreamServerInterceptor(logger.Zap()), streamingGrpcLogStart), - grpc.ChainUnaryInterceptor(grpctags.UnaryServerInterceptor(), grpczap.UnaryServerInterceptor(logger.Zap()), unaryGrpcLogStart), + return grpcserver.New( + endpoint, + logger, + grpc.ChainStreamInterceptor( + grpctags.StreamServerInterceptor(), + grpczap.StreamServerInterceptor(logger.Zap()), + streamingGrpcLogStart, + ), + grpc.ChainUnaryInterceptor( + grpctags.UnaryServerInterceptor(), + grpczap.UnaryServerInterceptor(logger.Zap()), + unaryGrpcLogStart, + ), grpc.MaxSendMsgSize(app.Config.API.GrpcSendMsgSize), grpc.MaxRecvMsgSize(app.Config.API.GrpcRecvMsgSize), ) @@ -1162,7 +1394,10 @@ func (app *App) startAPIServices(ctx context.Context) error { if len(public) == 0 { return fmt.Errorf("can't start json server without public services") } - app.jsonAPIService = grpcserver.NewJSONHTTPServer(app.Config.API.JSONListener, logger.WithName("JSON")) + app.jsonAPIService = grpcserver.NewJSONHTTPServer( + app.Config.API.JSONListener, + logger.WithName("JSON"), + ) app.jsonAPIService.StartService(ctx, public...) 
} if app.grpcPublicService != nil { @@ -1356,9 +1591,18 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error { } app.db = sqlDB if app.Config.CollectMetrics && app.Config.DatabaseSizeMeteringInterval != 0 { - app.dbMetrics = dbmetrics.NewDBMetricsCollector(ctx, sqlDB, app.addLogger(StateDbLogger, lg), app.Config.DatabaseSizeMeteringInterval) + app.dbMetrics = dbmetrics.NewDBMetricsCollector( + ctx, + sqlDB, + app.addLogger(StateDbLogger, lg), + app.Config.DatabaseSizeMeteringInterval, + ) } - app.cachedDB = datastore.NewCachedDB(sqlDB, app.addLogger(CachedDBLogger, lg), datastore.WithConfig(app.Config.Cache)) + app.cachedDB = datastore.NewCachedDB( + sqlDB, + app.addLogger(CachedDBLogger, lg), + datastore.WithConfig(app.Config.Cache), + ) return nil } @@ -1411,7 +1655,11 @@ func (app *App) startSynchronous(ctx context.Context) (err error) { ) if err := os.MkdirAll(app.Config.DataDir(), 0o700); err != nil { - return fmt.Errorf("data-dir %s not found or could not be created: %w", app.Config.DataDir(), err) + return fmt.Errorf( + "data-dir %s not found or could not be created: %w", + app.Config.DataDir(), + err, + ) } /* Setup monitoring */ diff --git a/proposals/util.go b/proposals/util.go index a5ca4afcf7..1d765c62cd 100644 --- a/proposals/util.go +++ b/proposals/util.go @@ -11,7 +11,10 @@ var ( GetNumEligibleSlots = util.GetNumEligibleSlots ) -func MustGetNumEligibleSlots(weight, minWeight, totalWeight uint64, committeeSize, layersPerEpoch uint32) uint32 { +func MustGetNumEligibleSlots( + weight, minWeight, totalWeight uint64, + committeeSize, layersPerEpoch uint32, +) uint32 { slots, err := GetNumEligibleSlots(weight, minWeight, totalWeight, committeeSize, layersPerEpoch) if err != nil { panic(err) @@ -31,7 +34,12 @@ type VrfMessage struct { } // MustSerializeVRFMessage serializes a message for generating/verifying a VRF signature. -func MustSerializeVRFMessage(beacon types.Beacon, epoch types.EpochID, nonce types.VRFPostIndex, counter uint32) []byte { +func MustSerializeVRFMessage( + beacon types.Beacon, + epoch types.EpochID, + nonce types.VRFPostIndex, + counter uint32, +) []byte { m := VrfMessage{ Type: types.EligibilityVoting, Beacon: beacon,
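
Editorial note: the build flow in this patch runs the shared steps once per
layer and fans the per-signer steps out through an errgroup whose
concurrency is capped by cfg.workersLimit (runtime.NumCPU() by default).
Below is a minimal, self-contained sketch of that pattern. The signer type
and buildLayer function are hypothetical stand-ins, not code from this
patch; errgroup.SetLimit is the real golang.org/x/sync/errgroup API the
patch uses.

	package main

	import (
		"fmt"
		"runtime"

		"golang.org/x/sync/errgroup"
	)

	// signer is a hypothetical stand-in for a registered signing key.
	type signer struct{ id int }

	// buildLayer mirrors the shape of the patched build method: shared work
	// runs once, per-signer work fans out through a size-limited errgroup.
	func buildLayer(signers []*signer) error {
		// Serial part: load shared data (beacon, active set), decide the
		// mesh hash, tally and encode votes. Stubbed as a single value here.
		shared := "shared session data"

		var eg errgroup.Group
		// The parallel part makes no network requests, so capping the pool
		// at the number of cores (the patch's default workersLimit) keeps
		// it CPU-bound.
		eg.SetLimit(runtime.NumCPU())
		for _, s := range signers {
			s := s // capture the loop variable, as the patch does
			eg.Go(func() error {
				// Parallel part: per-signer data, eligibilities, tx
				// selection, and publishing. Stubbed as a print.
				fmt.Printf("signer %d builds a proposal with %q\n", s.id, shared)
				return nil
			})
		}
		// Wait blocks until every goroutine finishes and returns the first
		// non-nil error, matching how build surfaces per-signer failures.
		return eg.Wait()
	}

	func main() {
		if err := buildLayer([]*signer{{1}, {2}, {3}}); err != nil {
			fmt.Println("build failed:", err)
		}
	}

Publishing inside the pool is the case the commit message calls out: a slow
Publish (which runs validation inline) for one signer no longer blocks
proposals for the remaining signers.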