Skip to content

Commit

Permalink
hare3: support multiple keys (#5126)
Browse files Browse the repository at this point in the history
closes: #5085

this change refactors original hare oracle to accept key that will be used to determine eligibility. and extends hare3 with a collection of keys that can be used to participate from the same physical node.

limitation is that ecvrf and ed signatures are computed sequentially and may cause messages to be delayed, if number of nodes are larger then 100. larger numbers are useful only if we expect it to be used with smallest possible atxs, which is by itself a problem.
  • Loading branch information
dshulyak committed Oct 6, 2023
1 parent 26b8b55 commit d70d8c7
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 104 deletions.
21 changes: 10 additions & 11 deletions hare/eligibility/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,13 @@ func (o *Oracle) resetCacheOnSynced(ctx context.Context) {
}
}

// buildVRFMessage builds the VRF message used as input for the BLS (msg=Beacon##Layer##Round).
// buildVRFMessage builds the VRF message used as input for hare eligibility validation.
func (o *Oracle) buildVRFMessage(ctx context.Context, layer types.LayerID, round uint32) ([]byte, error) {
beacon, err := o.beacons.GetBeacon(layer.GetEpoch())
if err != nil {
return nil, fmt.Errorf("get beacon: %w", err)
}

msg := VrfMessage{Type: types.EligibilityHare, Beacon: beacon, Round: round, Layer: layer}
buf, err := codec.Encode(&msg)
if err != nil {
o.WithContext(ctx).With().Fatal("failed to encode", log.Err(err))
}
return buf, nil
return codec.MustEncode(&VrfMessage{Type: types.EligibilityHare, Beacon: beacon, Round: round, Layer: layer}), nil
}

func (o *Oracle) totalWeight(ctx context.Context, layer types.LayerID) (uint64, error) {
Expand Down Expand Up @@ -344,11 +338,16 @@ func (o *Oracle) CalcEligibility(

// Proof returns the role proof for the current Layer & Round.
func (o *Oracle) Proof(ctx context.Context, layer types.LayerID, round uint32) (types.VrfSignature, error) {
msg, err := o.buildVRFMessage(ctx, layer, round)
beacon, err := o.beacons.GetBeacon(layer.GetEpoch())
if err != nil {
return types.EmptyVrfSignature, err
return types.EmptyVrfSignature, fmt.Errorf("get beacon: %w", err)
}
return o.vrfSigner.Sign(msg), nil
return o.GenVRF(ctx, o.vrfSigner, beacon, layer, round), nil
}

// GenVRF generates vrf for hare eligibility.
func (o *Oracle) GenVRF(ctx context.Context, signer *signing.VRFSigner, beacon types.Beacon, layer types.LayerID, round uint32) types.VrfSignature {
return signer.Sign(codec.MustEncode(&VrfMessage{Type: types.EligibilityHare, Beacon: beacon, Round: round, Layer: layer}))
}

// Returns a map of all active node IDs in the specified layer id.
Expand Down
165 changes: 106 additions & 59 deletions hare3/hare.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/jonboulle/clockwork"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/codec"
Expand Down Expand Up @@ -137,7 +138,6 @@ func New(
pubsub pubsub.PublishSubsciber,
db *datastore.CachedDB,
verifier *signing.EdVerifier,
signer *signing.EdSigner,
oracle oracle,
sync system.SyncStateProvider,
patrol *layerpatrol.LayerPatrol,
Expand All @@ -149,6 +149,7 @@ func New(
cancel: cancel,
results: make(chan ConsensusOutput, 32),
coins: make(chan WeakCoinOutput, 32),
signers: map[string]*signing.EdSigner{},
sessions: map[types.LayerID]*protocol{},

config: DefaultConfig(),
Expand All @@ -159,7 +160,6 @@ func New(
pubsub: pubsub,
db: db,
verifier: verifier,
signer: signer,
oracle: &legacyOracle{
log: zap.NewNop(),
oracle: oracle,
Expand All @@ -183,6 +183,7 @@ type Hare struct {
results chan ConsensusOutput
coins chan WeakCoinOutput
mu sync.Mutex
signers map[string]*signing.EdSigner
sessions map[types.LayerID]*protocol

// options
Expand All @@ -195,13 +196,19 @@ type Hare struct {
pubsub pubsub.PublishSubsciber
db *datastore.CachedDB
verifier *signing.EdVerifier
signer *signing.EdSigner
oracle *legacyOracle
sync system.SyncStateProvider
patrol *layerpatrol.LayerPatrol
tracer Tracer
}

func (h *Hare) Register(signer *signing.EdSigner) {
h.mu.Lock()
defer h.mu.Unlock()
h.log.Info("register signing key", zap.Stringer("node", signer.NodeID()))
h.signers[string(signer.NodeID().Bytes())] = signer
}

func (h *Hare) Results() <-chan ConsensusOutput {
return h.results
}
Expand Down Expand Up @@ -323,15 +330,24 @@ func (h *Hare) onLayer(layer types.LayerID) {
return
}
h.patrol.SetHareInCharge(layer)
proto := newProtocol(h.config.Committee/2 + 1)

h.mu.Lock()
h.sessions[layer] = proto
// signer can't join mid session
s := &session{
lid: layer,
beacon: beacon,
signers: maps.Values(h.signers),
vrfs: make([]*types.HareEligibility, len(h.signers)),
proto: newProtocol(h.config.Committee/2 + 1),
}
h.sessions[layer] = s.proto
h.mu.Unlock()

sessionStart.Inc()
h.tracer.OnStart(layer)
h.log.Debug("registered layer", zap.Uint32("lid", layer.Uint32()))
h.eg.Go(func() error {
if err := h.run(layer, beacon, proto); err != nil {
if err := h.run(s); err != nil {
h.log.Warn("failed",
zap.Uint32("lid", layer.Uint32()),
zap.Error(err),
Expand All @@ -354,56 +370,68 @@ func (h *Hare) onLayer(layer types.LayerID) {
})
}

func (h *Hare) run(layer types.LayerID, beacon types.Beacon, proto *protocol) error {
func (h *Hare) run(session *session) error {
// oracle may load non-negligible amount of data from disk
// we do it before preround starts, so that load can have some slack time
// before it needs to be used in validation
current := IterRound{Round: preround}

start := time.Now()
vrf := h.oracle.active(h.signer.NodeID(), layer, current)
var (
current = IterRound{Round: preround}
start = time.Now()
active bool
)
for i := range session.signers {
session.vrfs[i] = h.oracle.active(session.signers[i], session.beacon, session.lid, current)
active = active || session.vrfs[i] != nil
}
h.tracer.OnActive(session.vrfs)
activeLatency.Observe(time.Since(start).Seconds())
h.tracer.OnActive(vrf)

walltime := h.nodeclock.LayerToTime(layer).Add(h.config.PreroundDelay)
if vrf != nil {
h.log.Debug("active in preround", zap.Uint32("lid", layer.Uint32()))
walltime := h.nodeclock.LayerToTime(session.lid).Add(h.config.PreroundDelay)
if active {
h.log.Debug("active in preround", zap.Uint32("lid", session.lid.Uint32()))
// initial set is not needed if node is not active in preround
select {
case <-h.wallclock.After(walltime.Sub(h.wallclock.Now())):
case <-h.ctx.Done():
return h.ctx.Err()
}
start := time.Now()
proto.OnInitial(h.proposals(layer, beacon))
session.proto.OnInitial(h.proposals(session))
proposalsLatency.Observe(time.Since(start).Seconds())
}
if err := h.onOutput(layer, current, proto.Next(vrf != nil), vrf); err != nil {
if err := h.onOutput(session, current, session.proto.Next(active)); err != nil {
return err
}
result := false
for {
walltime = walltime.Add(h.config.RoundDuration)
current := proto.IterRound
var vrf *types.HareEligibility
if current.IsMessageRound() {
start := time.Now()
vrf = h.oracle.active(h.signer.NodeID(), layer, current)
activeLatency.Observe(time.Since(start).Seconds())
active = false
current = session.proto.IterRound
start = time.Now()

for i := range session.signers {
if current.IsMessageRound() {
session.vrfs[i] = h.oracle.active(session.signers[i], session.beacon, session.lid, current)
} else {
session.vrfs[i] = nil
}
active = active || session.vrfs[i] != nil
}
h.tracer.OnActive(vrf)
h.tracer.OnActive(session.vrfs)
activeLatency.Observe(time.Since(start).Seconds())

select {
case <-h.wallclock.After(walltime.Sub(h.wallclock.Now())):
h.log.Debug("execute round",
zap.Uint32("lid", layer.Uint32()),
zap.Uint8("iter", proto.Iter), zap.Stringer("round", proto.Round),
zap.Bool("active", vrf != nil),
zap.Uint32("lid", session.lid.Uint32()),
zap.Uint8("iter", session.proto.Iter), zap.Stringer("round", session.proto.Round),
zap.Bool("active", active),
)
out := proto.Next(vrf != nil)
out := session.proto.Next(active)
if out.result != nil {
result = true
}
if err := h.onOutput(layer, current, out, vrf); err != nil {
if err := h.onOutput(session, current, out); err != nil {
return err
}
if out.terminated {
Expand All @@ -422,66 +450,77 @@ func (h *Hare) run(layer types.LayerID, beacon types.Beacon, proto *protocol) er
}
}

func (h *Hare) onOutput(layer types.LayerID, ir IterRound, out output, vrf *types.HareEligibility) error {
if out.message != nil {
out.message.Layer = layer
out.message.Eligibility = *vrf
out.message.Sender = h.signer.NodeID()
out.message.Signature = h.signer.Sign(signing.HARE, out.message.ToMetadata().ToBytes())
if err := h.pubsub.Publish(h.ctx, h.config.ProtocolName, out.message.ToBytes()); err != nil {
h.log.Error("failed to publish", zap.Inline(out.message), zap.Error(err))
func (h *Hare) onOutput(session *session, ir IterRound, out output) error {
for i, vrf := range session.vrfs {
if vrf == nil || out.message == nil {
continue
}
msg := *out.message // shallow copy
msg.Layer = session.lid
msg.Eligibility = *vrf
msg.Sender = session.signers[i].NodeID()
msg.Signature = session.signers[i].Sign(signing.HARE, msg.ToMetadata().ToBytes())
if err := h.pubsub.Publish(h.ctx, h.config.ProtocolName, msg.ToBytes()); err != nil {
h.log.Error("failed to publish", zap.Inline(&msg), zap.Error(err))
}
}
h.tracer.OnMessageSent(out.message)
h.log.Debug("round output",
zap.Uint32("lid", layer.Uint32()),
zap.Uint32("lid", session.lid.Uint32()),
zap.Uint8("iter", ir.Iter), zap.Stringer("round", ir.Round),
zap.Inline(&out),
)
h.tracer.OnMessageSent(out.message)
if out.coin != nil {
select {
case <-h.ctx.Done():
return h.ctx.Err()
case h.coins <- WeakCoinOutput{Layer: layer, Coin: *out.coin}:
case h.coins <- WeakCoinOutput{Layer: session.lid, Coin: *out.coin}:
}
sessionCoin.Inc()
}
if out.result != nil {
select {
case <-h.ctx.Done():
return h.ctx.Err()
case h.results <- ConsensusOutput{Layer: layer, Proposals: out.result}:
case h.results <- ConsensusOutput{Layer: session.lid, Proposals: out.result}:
}
sessionResult.Inc()
}
return nil
}

func (h *Hare) proposals(lid types.LayerID, epochBeacon types.Beacon) []types.ProposalID {
func (h *Hare) proposals(session *session) []types.ProposalID {
h.log.Debug("requested proposals",
zap.Uint32("lid", lid.Uint32()),
zap.Stringer("beacon", epochBeacon),
zap.Uint32("lid", session.lid.Uint32()),
zap.Stringer("beacon", session.beacon),
)
props, err := proposals.GetByLayer(h.db, lid)
props, err := proposals.GetByLayer(h.db, session.lid)
if err != nil {
if errors.Is(err, sql.ErrNotFound) {
h.log.Warn("no proposals found for hare, using empty set",
zap.Uint32("lid", lid.Uint32()), zap.Error(err))
zap.Uint32("lid", session.lid.Uint32()), zap.Error(err))
} else {
h.log.Error("failed to get proposals for hare",
zap.Uint32("lid", lid.Uint32()), zap.Error(err))
zap.Uint32("lid", session.lid.Uint32()), zap.Error(err))
}
return []types.ProposalID{}
}
var (
beacon types.Beacon
result []types.ProposalID
beacon types.Beacon
result []types.ProposalID
min, own *types.ActivationTxHeader
)
own, err := h.db.GetEpochAtx(lid.GetEpoch()-1, h.signer.NodeID())
if err != nil {
h.log.Warn("no atxs in the requested epoch",
zap.Uint32("epoch", lid.GetEpoch().Uint32()-1),
zap.Error(err))
for _, signer := range session.signers {
own, err = h.db.GetEpochAtx(session.lid.GetEpoch()-1, signer.NodeID())
if err != nil && !errors.Is(err, sql.ErrNotFound) {
return []types.ProposalID{}
}
if min == nil || (min != nil && own != nil && own.TickHeight() < min.TickHeight()) {
min = own
}
}
if min == nil {
h.log.Debug("no atxs in the requested epoch", zap.Uint32("epoch", session.lid.GetEpoch().Uint32()-1))
return []types.ProposalID{}
}
atxs := map[types.ATXID]int{}
Expand All @@ -508,10 +547,10 @@ func (h *Hare) proposals(lid types.LayerID, epochBeacon types.Beacon) []types.Pr
h.log.Error("atx is not loaded", zap.Error(err), zap.Stringer("atxid", p.AtxID))
return []types.ProposalID{}
}
if hdr.BaseTickHeight >= own.TickHeight() {
if hdr.BaseTickHeight >= min.TickHeight() {
// does not vote for future proposal
h.log.Warn("proposal base tick height too high. skipping",
zap.Uint32("lid", lid.Uint32()),
zap.Uint32("lid", session.lid.Uint32()),
zap.Uint64("proposal_height", hdr.BaseTickHeight),
zap.Uint64("own_height", own.TickHeight()),
)
Expand All @@ -532,14 +571,14 @@ func (h *Hare) proposals(lid types.LayerID, epochBeacon types.Beacon) []types.Pr
} else {
beacon = refBallot.EpochData.Beacon
}
if beacon == epochBeacon {
if beacon == session.beacon {
result = append(result, p.ID())
} else {
h.log.Warn("proposal has different beacon value",
zap.Uint32("lid", lid.Uint32()),
zap.Uint32("lid", session.lid.Uint32()),
zap.Stringer("id", p.ID()),
zap.String("proposal_beacon", beacon.ShortString()),
zap.String("epoch_beacon", epochBeacon.ShortString()),
zap.String("epoch_beacon", session.beacon.ShortString()),
)
}
}
Expand All @@ -553,3 +592,11 @@ func (h *Hare) Stop() {
close(h.coins)
h.log.Info("stopped")
}

type session struct {
proto *protocol
lid types.LayerID
beacon types.Beacon
signers []*signing.EdSigner
vrfs []*types.HareEligibility
}
Loading

0 comments on commit d70d8c7

Please sign in to comment.