Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go 4825 migrate objectidspaceid mappings #2000

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/anytype/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,10 @@ func (c *Config) FSConfig() (FSConfig, error) {
return FSConfig{IPFSStorageAddr: res.CustomFileStorePath}, nil
}

func (c *Config) GetRepoPath() string {
return c.RepoPath
}

func (c *Config) GetConfigPath() string {
return filepath.Join(c.RepoPath, ConfigFileName)
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/lib/localstore/objectstore/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,14 @@ func (w *walletStub) Name() string { return wallet.CName }
func NewStoreFixture(t testing.TB) *StoreFixture {
ctx, cancel := context.WithCancel(context.Background())

walletService := newWalletStub(t)

fullText := ftsearch.TantivyNew()
testApp := &app.App{}

dataStore, err := datastore.NewInMemory()
require.NoError(t, err)

testApp.Register(newWalletStub(t))
testApp.Register(dataStore)
testApp.Register(walletService)
err = fullText.Init(testApp)
require.NoError(t, err)
err = fullText.Run(context.Background())
Expand All @@ -100,7 +98,7 @@ func NewStoreFixture(t testing.TB) *StoreFixture {
fts: fullText,
sourceService: &detailsFromId{},
arenaPool: &anyenc.ArenaPool{},
repoPath: walletService.RepoPath(),
objectStorePath: t.TempDir(),
oldStore: oldStore,
spaceIndexes: map[string]spaceindex.Store{},
techSpaceIdProvider: &stubTechSpaceIdProvider{},
Expand Down
53 changes: 31 additions & 22 deletions pkg/lib/localstore/objectstore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore/anystorehelper"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore/oldstore"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore/spaceindex"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore/spaceresolverstore"
"github.com/anyproto/anytype-heart/pkg/lib/logging"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
)
Expand Down Expand Up @@ -52,7 +53,7 @@ type ObjectStore interface {
GetCrdtDb(spaceId string) anystore.DB

SpaceNameGetter
SpaceIdBinder
spaceresolverstore.Store
CrossSpace
}

Expand Down Expand Up @@ -87,9 +88,11 @@ type TechSpaceIdProvider interface {
}

type dsObjectStore struct {
repoPath string
techSpaceId string
anyStoreConfig anystore.Config
spaceresolverstore.Store

objectStorePath string
techSpaceId string
anyStoreConfig anystore.Config

anyStore anystore.DB
anyStoreLockRemove func() error
Expand All @@ -98,7 +101,6 @@ type dsObjectStore struct {
virtualSpaces anystore.Collection
system anystore.Collection
fulltextQueue anystore.Collection
bindId anystore.Collection

arenaPool *anyenc.ArenaPool

Expand Down Expand Up @@ -147,15 +149,20 @@ func New() ObjectStore {

func (s *dsObjectStore) Init(a *app.App) (err error) {
s.sourceService = app.MustComponent[spaceindex.SourceDetailsFromID](a)

repoPath := app.MustComponent[wallet.Wallet](a).RepoPath()

fts := a.Component(ftsearch.CName)
if fts == nil {
log.Warnf("init objectstore without fulltext")
} else {
s.fts = fts.(ftsearch.FTSearch)
}
s.arenaPool = &anyenc.ArenaPool{}
s.repoPath = app.MustComponent[wallet.Wallet](a).RepoPath()
s.anyStoreConfig = *app.MustComponent[configProvider](a).GetAnyStoreConfig()

cfg := app.MustComponent[configProvider](a)
s.objectStorePath = filepath.Join(repoPath, "objectstore")
s.anyStoreConfig = *cfg.GetAnyStoreConfig()
s.setDefaultConfig()
s.oldStore = app.MustComponent[oldstore.Service](a)
s.techSpaceIdProvider = app.MustComponent[TechSpaceIdProvider](a)
Expand All @@ -170,12 +177,23 @@ func (s *dsObjectStore) Name() (name string) {
func (s *dsObjectStore) Run(ctx context.Context) error {
s.techSpaceId = s.techSpaceIdProvider.TechSpaceId()

dbDir := s.storeRootDir()
err := ensureDirExists(dbDir)
err := ensureDirExists(s.objectStorePath)
if err != nil {
return err
}
return s.openDatabase(ctx, filepath.Join(dbDir, "objects.db"))
err = s.openDatabase(ctx, filepath.Join(s.objectStorePath, "objects.db"))
if err != nil {
return fmt.Errorf("open db: %w", err)
}

store, err := spaceresolverstore.New(s.componentCtx, s.anyStore)
if err != nil {
return fmt.Errorf("new space resolver store: %w", err)
}

s.Store = store

return err
}

func (s *dsObjectStore) setDefaultConfig() {
Expand All @@ -186,10 +204,6 @@ func (s *dsObjectStore) setDefaultConfig() {
s.anyStoreConfig.SQLiteConnectionOptions["synchronous"] = "off"
}

func (s *dsObjectStore) storeRootDir() string {
return filepath.Join(s.repoPath, "objectstore")
}

func ensureDirExists(dir string) error {
_, err := os.Stat(dir)
if errors.Is(err, os.ErrNotExist) {
Expand Down Expand Up @@ -222,10 +236,6 @@ func (s *dsObjectStore) openDatabase(ctx context.Context, path string) error {
if err != nil {
return errors.Join(store.Close(), fmt.Errorf("open virtualSpaces collection: %w", err))
}
bindId, err := store.Collection(ctx, "bindId")
if err != nil {
return errors.Join(store.Close(), fmt.Errorf("open bindId collection: %w", err))
}

s.anyStore = store
s.anyStoreLockRemove = lockRemove
Expand All @@ -234,7 +244,6 @@ func (s *dsObjectStore) openDatabase(ctx context.Context, path string) error {
s.system = system
s.indexerChecksums = indexerChecksums
s.virtualSpaces = virtualSpaces
s.bindId = bindId

return nil
}
Expand All @@ -245,7 +254,7 @@ func (s *dsObjectStore) preloadExistingObjectStores() error {
var err error
s.spaceStoreDirsCheck.Do(func() {
var entries []os.DirEntry
entries, err = os.ReadDir(s.storeRootDir())
entries, err = os.ReadDir(s.objectStorePath)
s.Lock()
defer s.Unlock()
for _, entry := range entries {
Expand Down Expand Up @@ -307,7 +316,7 @@ func (s *dsObjectStore) SpaceIndex(spaceId string) spaceindex.Store {
func (s *dsObjectStore) getOrInitSpaceIndex(spaceId string) spaceindex.Store {
store, ok := s.spaceIndexes[spaceId]
if !ok {
dir := filepath.Join(s.storeRootDir(), spaceId)
dir := filepath.Join(s.objectStorePath, spaceId)
err := ensureDirExists(dir)
if err != nil {
return spaceindex.NewInvalidStore(err)
Expand Down Expand Up @@ -341,7 +350,7 @@ func (s *dsObjectStore) GetCrdtDb(spaceId string) anystore.DB {

db, ok := s.crdtDbs[spaceId]
if !ok {
dir := filepath.Join(s.storeRootDir(), spaceId)
dir := filepath.Join(s.objectStorePath, spaceId)
err := ensureDirExists(dir)
if err != nil {
return nil
Expand Down
48 changes: 0 additions & 48 deletions pkg/lib/localstore/objectstore/space.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,16 @@
package objectstore

import (
"context"
"errors"

anystore "github.com/anyproto/any-store"
"github.com/anyproto/any-store/anyenc"
"github.com/anyproto/any-store/query"

"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/database"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
)

const bindKey = "b"

type SpaceNameGetter interface {
GetSpaceName(spaceId string) string
}

type SpaceIdBinder interface {
BindSpaceId(spaceId, objectId string) error
GetSpaceId(objectId string) (spaceId string, err error)
}

func (d *dsObjectStore) GetSpaceName(spaceId string) string {
records, err := d.SpaceIndex(d.techSpaceId).Query(database.Query{
Filters: []database.FilterRequest{
Expand All @@ -49,37 +35,3 @@ func (d *dsObjectStore) GetSpaceName(spaceId string) string {
}
return spaceName
}

func (d *dsObjectStore) BindSpaceId(spaceId, objectId string) error {
return d.modifyBind(d.componentCtx, objectId, spaceId)
}

func (d *dsObjectStore) GetSpaceId(objectId string) (spaceId string, err error) {
doc, err := d.bindId.FindId(d.componentCtx, objectId)
if err != nil {
if errors.Is(err, anystore.ErrDocNotFound) {
return "", domain.ErrObjectNotFound
}
return "", err
}
return doc.Value().GetString(bindKey), nil
}

func (d *dsObjectStore) modifyBind(ctx context.Context, objectId, spaceId string) error {
tx, err := d.bindId.WriteTx(ctx)
if err != nil {
return err
}
arena := d.arenaPool.Get()
defer d.arenaPool.Put(arena)
mod := query.ModifyFunc(func(a *anyenc.Arena, v *anyenc.Value) (result *anyenc.Value, modified bool, err error) {
v.Set(bindKey, arena.NewString(spaceId))
return v, true, nil
})
_, err = d.bindId.UpsertId(tx.Context(), objectId, mod)
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}
71 changes: 71 additions & 0 deletions pkg/lib/localstore/objectstore/spaceresolverstore/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package spaceresolverstore

import (
"context"
"errors"
"fmt"

anystore "github.com/anyproto/any-store"
"github.com/anyproto/any-store/anyenc"
"github.com/anyproto/any-store/query"

"github.com/anyproto/anytype-heart/core/domain"
)

const bindKey = "b"

type Store interface {
BindSpaceId(spaceId, objectId string) error
GetSpaceId(objectId string) (spaceId string, err error)
}

type dsObjectStore struct {
componentCtx context.Context
collection anystore.Collection
arenaPool *anyenc.ArenaPool
}

func New(componentCtx context.Context, db anystore.DB) (Store, error) {
collection, err := db.Collection(componentCtx, "bindId")
if err != nil {
return nil, fmt.Errorf("open bindId collection: %w", err)
}
return &dsObjectStore{
componentCtx: componentCtx,
arenaPool: &anyenc.ArenaPool{},
collection: collection,
}, nil
}

func (d *dsObjectStore) BindSpaceId(spaceId, objectId string) error {
return d.modifyBind(d.componentCtx, objectId, spaceId)
}

func (d *dsObjectStore) GetSpaceId(objectId string) (spaceId string, err error) {
doc, err := d.collection.FindId(d.componentCtx, objectId)
if err != nil {
if errors.Is(err, anystore.ErrDocNotFound) {
return "", domain.ErrObjectNotFound
}
return "", err
}
return doc.Value().GetString(bindKey), nil
}

func (d *dsObjectStore) modifyBind(ctx context.Context, objectId, spaceId string) error {
tx, err := d.collection.WriteTx(ctx)
if err != nil {
return err
}
arena := d.arenaPool.Get()
defer d.arenaPool.Put(arena)
mod := query.ModifyFunc(func(a *anyenc.Arena, v *anyenc.Value) (result *anyenc.Value, modified bool, err error) {
v.Set(bindKey, arena.NewString(spaceId))
return v, true, nil
})
_, err = d.collection.UpsertId(tx.Context(), objectId, mod)
if err != nil {
return errors.Join(err, tx.Rollback())
}
return tx.Commit()
}
3 changes: 2 additions & 1 deletion space/spacecore/oldstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ const (
type ClientStorage interface {
oldstorage.SpaceStorageProvider
app.ComponentRunnable
GetBinds(spaceId string) ([]string, error)
// GetBoundObjectIds returns list of object ids bound (mapped) to space id
GetBoundObjectIds(spaceId string) ([]string, error)
AllSpaceIds() (ids []string, err error)
DeleteSpaceStorage(ctx context.Context, spaceId string) error
}
Expand Down
55 changes: 55 additions & 0 deletions space/spacecore/storage/badgerstorage/spacestorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,58 @@ func TestSpaceStorage_StoredIds_BigTxn(t *testing.T) {
require.NoError(t, err)
require.Len(t, storedIds, 0)
}

func newServiceFixture(t *testing.T) *storageService {
fx := newFixture(t)
fx.open(t)

t.Cleanup(func() {
fx.stop(t)
})

s := &storageService{
db: fx.db,
keys: newStorageServiceKeys(),
lockedSpaces: map[string]*lockSpace{},
}
return s
}

func TestStorageService_BindSpaceID(t *testing.T) {
fx := newServiceFixture(t)

err := fx.BindSpaceID("spaceId1", "objectId1")
require.NoError(t, err)

spaceId, err := fx.GetSpaceID("objectId1")
require.NoError(t, err)

require.Equal(t, spaceId, "spaceId1")
}

func TestStorageService_GetBoundObjectIds(t *testing.T) {
t.Run("with no bindings", func(t *testing.T) {
fx := newServiceFixture(t)

ids, err := fx.GetBoundObjectIds("spaceId")
require.NoError(t, err)
assert.Empty(t, ids)
})

t.Run("ok", func(t *testing.T) {
fx := newServiceFixture(t)

spaceId := "spaceId1"
err := fx.BindSpaceID(spaceId, "objectId1")
require.NoError(t, err)

err = fx.BindSpaceID(spaceId, "objectId2")
require.NoError(t, err)

ids, err := fx.GetBoundObjectIds(spaceId)
require.NoError(t, err)

assert.ElementsMatch(t, []string{"objectId1", "objectId2"}, ids)
})

}
Loading
Loading