Skip to content

Commit

Permalink
Reduce number of DB queries in proposal building process (#6393)
Browse files Browse the repository at this point in the history
## Motivation

During the proposal building process we do not actually need to query the DB for ATXs. Instead we can just lookup the ATX in the `atxsdata` cache which should be significantly faster and reduce proposal building times.
  • Loading branch information
fasmat committed Oct 18, 2024
1 parent 494035c commit 58b0c6c
Show file tree
Hide file tree
Showing 27 changed files with 693 additions and 361 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ See [RELEASE](./RELEASE.md) for workflow instructions.
* [#6387](https://github.com/spacemeshos/go-spacemesh/pull/6387) Fix an issue were in rare cases invalid proofs for
malicious identities were created.

* [#6393](https://github.com/spacemeshos/go-spacemesh/pull/6393) Further improved proposal building process to avoid
late proposals in 1:N setups and during cyclegap.

## v1.7.4

### Improvements
Expand Down
3 changes: 1 addition & 2 deletions activation/activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/localsql"
"github.com/spacemeshos/go-spacemesh/sql/localsql/nipost"
sqlmocks "github.com/spacemeshos/go-spacemesh/sql/mocks"
"github.com/spacemeshos/go-spacemesh/sql/statesql"
)

Expand Down Expand Up @@ -1396,7 +1395,7 @@ func TestGetPositioningAtx(t *testing.T) {
t.Parallel()
tab := newTestBuilder(t, 1)

db := sqlmocks.NewMockExecutor(gomock.NewController(t))
db := sql.NewMockExecutor(gomock.NewController(t))
tab.Builder.db = db
expected := errors.New("db error")
db.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any()).Return(0, expected)
Expand Down
6 changes: 3 additions & 3 deletions activation/e2e/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestCheckpoint_PublishingSoloATXs(t *testing.T) {
require.NoError(t, err)

cfg := testPostConfig()
db := statesql.InMemory()
db := statesql.InMemoryTest(t)
cdb := datastore.NewCachedDB(db, logger)

opts := testPostSetupOpts(t)
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestCheckpoint_PublishingSoloATXs(t *testing.T) {
require.NoError(t, err)
t.Cleanup(clock.Close)

localDB := localsql.InMemory()
localDB := localsql.InMemoryTest(t)
nb, err := activation.NewNIPostBuilder(
localDB,
svc,
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestCheckpoint_PublishingSoloATXs(t *testing.T) {
poetDb, err = activation.NewPoetDb(newDB, logger.Named("poetDb"))
require.NoError(t, err)
cdb = datastore.NewCachedDB(newDB, logger)
atxdata, err = atxsdata.Warm(newDB, 1, logger)
atxdata, err = atxsdata.Warm(newDB, 1, logger, sig)
poetService = activation.NewPoetServiceWithClient(poetDb, client, poetCfg, logger)
validator = activation.NewValidator(newDB, poetDb, cfg, opts.Scrypt, verifier)
require.NoError(t, err)
Expand Down
89 changes: 72 additions & 17 deletions atxsdata/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"golang.org/x/exp/maps"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/signing"
)

// SAFETY: all exported fields are read-only and are safe to read concurrently.
Expand All @@ -23,7 +24,10 @@ type ATX struct {
func New() *Data {
return &Data{
malicious: map[types.NodeID]struct{}{},
epochs: map[types.EpochID]epochCache{},
epochs: map[types.EpochID]map[types.ATXID]*ATX{},

signers: map[types.NodeID]struct{}{},
managed: map[types.EpochID]map[types.NodeID]types.ATXID{},
}
}

Expand All @@ -32,11 +36,26 @@ type Data struct {

mu sync.RWMutex
malicious map[types.NodeID]struct{}
epochs map[types.EpochID]epochCache
epochs map[types.EpochID]map[types.ATXID]*ATX

signers map[types.NodeID]struct{}
managed map[types.EpochID]map[types.NodeID]types.ATXID
}

type epochCache struct {
index map[types.ATXID]*ATX
func (d *Data) Register(sig *signing.EdSigner) {
if _, exists := d.signers[sig.NodeID()]; exists {
return
}
d.signers[sig.NodeID()] = struct{}{}

// update quick access for newly registered signer
for epoch, ecache := range d.epochs {
for id, atx := range ecache {
if atx.Node == sig.NodeID() {
d.managed[epoch][sig.NodeID()] = id
}
}
}
}

func (d *Data) Evicted() types.EpochID {
Expand All @@ -58,6 +77,11 @@ func (d *Data) EvictEpoch(evict types.EpochID) {
if d.evicted.Load() < evict.Uint32() {
d.evicted.Store(evict.Uint32())
}
for epoch := range d.managed {
if epoch <= evict {
delete(d.managed, epoch)
}
}
for epoch := range d.epochs {
if epoch <= evict {
delete(d.epochs, epoch)
Expand Down Expand Up @@ -92,19 +116,23 @@ func (d *Data) AddAtx(target types.EpochID, id types.ATXID, atx *ATX) bool {
}
ecache, exists := d.epochs[target]
if !exists {
ecache = epochCache{
index: map[types.ATXID]*ATX{},
}
d.epochs[target] = ecache
d.epochs[target] = map[types.ATXID]*ATX{}
ecache = d.epochs[target]
}

if _, exists = ecache.index[id]; exists {
if _, exists = ecache[id]; exists {
return false
}

atxsCounter.WithLabelValues(target.String()).Inc()

ecache.index[id] = atx
ecache[id] = atx
if _, exists = d.signers[atx.Node]; exists {
managedCache, exists := d.managed[target]
if !exists {
d.managed[target] = map[types.NodeID]types.ATXID{}
managedCache = d.managed[target]
}
managedCache[atx.Node] = id
}
return true
}

Expand Down Expand Up @@ -164,21 +192,48 @@ func (d *Data) Get(epoch types.EpochID, atx types.ATXID) *ATX {
if !exists {
return nil
}
data, exists := ecache.index[atx]
data, exists := ecache[atx]
if !exists {
return nil
}
return data
}

// GetByEpochAndNodeID returns atx data by epoch and node id. This query will be slow for nodeIDs that
// are not managed by the node.
func (d *Data) GetByEpochAndNodeID(epoch types.EpochID, node types.NodeID) (types.ATXID, *ATX) {
d.mu.RLock()
defer d.mu.RUnlock()
atxcache, exists := d.managed[epoch]
if exists {
if atxid, exists := atxcache[node]; exists {
atx, exists := d.epochs[epoch][atxid]
if exists {
return atxid, atx
}
}
}

ecache, exists := d.epochs[epoch]
if !exists {
return types.EmptyATXID, nil
}
for id, atx := range ecache {
if atx.Node == node {
return id, atx
}
}
return types.EmptyATXID, nil
}

func (d *Data) Size(target types.EpochID) int {
d.mu.RLock()
defer d.mu.RUnlock()
ecache, exists := d.epochs[target]
if !exists {
return 0
}
return len(ecache.index)
return len(ecache)
}

type lockGuard struct{}
Expand All @@ -203,7 +258,7 @@ func (d *Data) IterateInEpoch(epoch types.EpochID, fn func(types.ATXID, *ATX), f
if !exists {
return
}
for id, atx := range ecache.index {
for id, atx := range ecache {
ok := true
for _, filter := range filters {
ok = ok && filter(d, atx, lockGuard{})
Expand Down Expand Up @@ -250,7 +305,7 @@ func (d *Data) MissingInEpoch(epoch types.EpochID, atxs []types.ATXID) []types.A
}
var missing []types.ATXID
for _, id := range atxs {
if _, exists := ecache.index[id]; !exists {
if _, exists := ecache[id]; !exists {
missing = append(missing, id)
}
}
Expand All @@ -271,7 +326,7 @@ func (d *Data) WeightForSet(epoch types.EpochID, set []types.ATXID) (uint64, []b
}
var weight uint64
for i, id := range set {
if data, exists := ecache.index[id]; exists {
if data, exists := ecache[id]; exists {
weight += data.Weight
used[i] = true
}
Expand Down
Loading

0 comments on commit 58b0c6c

Please sign in to comment.