Skip to content

Commit

Permalink
Catalog: use config interface (#8569)
Browse files Browse the repository at this point in the history
  • Loading branch information
N-o-Z authored Jan 30, 2025
1 parent 9badc63 commit 8ec58b3
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"github.com/hashicorp/go-multierror"
lru "github.com/hnlq715/golang-lru"
"github.com/rs/xid"
blockfactory "github.com/treeverse/lakefs/modules/block/factory"
"github.com/treeverse/lakefs/pkg/batch"
"github.com/treeverse/lakefs/pkg/block"
blockfactory "github.com/treeverse/lakefs/pkg/block/factory"
"github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/graveler/branch"
Expand Down Expand Up @@ -214,7 +214,7 @@ type WriteRangeRequest struct {
}

type Config struct {
Config *config.BaseConfig
Config config.Config
KVStore kv.Store
WalkerFactory WalkerFactory
SettingsManagerOption settings.ManagerOption
Expand Down Expand Up @@ -313,10 +313,11 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
return nil, fmt.Errorf("build block adapter: %w", err)
}
if cfg.WalkerFactory == nil {
cfg.WalkerFactory = store.NewFactory(cfg.Config)
// TODO(niro): Walkfer factory should be removed from catalog. This is a WA which relies on Blockstore configuration
cfg.WalkerFactory = store.NewFactory(cfg.Config.GetBaseConfig())
}

tierFSParams, err := pyramidparams.NewCommittedTierFSParams(cfg.Config, adapter)
tierFSParams, err := pyramidparams.NewCommittedTierFSParams(cfg.Config.GetBaseConfig(), adapter)
if err != nil {
cancelFn()
return nil, fmt.Errorf("configure tiered FS for committed: %w", err)
Expand Down Expand Up @@ -347,11 +348,12 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
sstableManager := sstable.NewPebbleSSTableRangeManager(pebbleSSTableCache, rangeFS, hashAlg)
sstableMetaManager := sstable.NewPebbleSSTableRangeManager(pebbleSSTableCache, metaRangeFS, hashAlg)

baseCfg := cfg.Config.GetBaseConfig()
committedParams := committed.Params{
MinRangeSizeBytes: cfg.Config.Committed.Permanent.MinRangeSizeBytes,
MaxRangeSizeBytes: cfg.Config.Committed.Permanent.MaxRangeSizeBytes,
RangeSizeEntriesRaggedness: cfg.Config.Committed.Permanent.RangeRaggednessEntries,
MaxUploaders: cfg.Config.Committed.LocalCache.MaxUploadersPerWriter,
MinRangeSizeBytes: baseCfg.Committed.Permanent.MinRangeSizeBytes,
MaxRangeSizeBytes: baseCfg.Committed.Permanent.MaxRangeSizeBytes,
RangeSizeEntriesRaggedness: baseCfg.Committed.Permanent.RangeRaggednessEntries,
MaxUploaders: baseCfg.Committed.LocalCache.MaxUploadersPerWriter,
}
sstableMetaRangeManager, err := committed.NewMetaRangeManager(
committedParams,
Expand All @@ -369,7 +371,7 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
go executor.Run(ctx)

// Setup rate limiter used for background operations
limiter := newLimiter(cfg.Config.Graveler.Background.RateLimit)
limiter := newLimiter(baseCfg.Graveler.Background.RateLimit)

storeLimiter := kv.NewStoreLimiter(cfg.KVStore, limiter)
addressProvider := ident.NewHexAddressProvider()
Expand All @@ -379,21 +381,21 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
KVStore: cfg.KVStore,
KVStoreLimited: storeLimiter,
AddressProvider: addressProvider,
RepositoryCacheConfig: ref.CacheConfig(cfg.Config.Graveler.RepositoryCache),
CommitCacheConfig: ref.CacheConfig(cfg.Config.Graveler.CommitCache),
MaxBatchDelay: cfg.Config.Graveler.MaxBatchDelay,
BranchApproximateOwnershipParams: makeBranchApproximateOwnershipParams(cfg.Config.Graveler.BranchOwnership),
RepositoryCacheConfig: ref.CacheConfig(baseCfg.Graveler.RepositoryCache),
CommitCacheConfig: ref.CacheConfig(baseCfg.Graveler.CommitCache),
MaxBatchDelay: baseCfg.Graveler.MaxBatchDelay,
BranchApproximateOwnershipParams: makeBranchApproximateOwnershipParams(baseCfg.Graveler.BranchOwnership),
})
gcManager := retention.NewGarbageCollectionManager(tierFSParams.Adapter, refManager, cfg.Config.Committed.BlockStoragePrefix)
gcManager := retention.NewGarbageCollectionManager(tierFSParams.Adapter, refManager, baseCfg.Committed.BlockStoragePrefix)
settingManager := settings.NewManager(refManager, cfg.KVStore)
if cfg.SettingsManagerOption != nil {
cfg.SettingsManagerOption(settingManager)
}

protectedBranchesManager := branch.NewProtectionManager(settingManager)
stagingManager := staging.NewManager(ctx, cfg.KVStore, storeLimiter, cfg.Config.Graveler.BatchDBIOTransactionMarkers, executor)
stagingManager := staging.NewManager(ctx, cfg.KVStore, storeLimiter, baseCfg.Graveler.BatchDBIOTransactionMarkers, executor)
var deleteSensor *graveler.DeleteSensor
if cfg.Config.Graveler.CompactionSensorThreshold > 0 {
if baseCfg.Graveler.CompactionSensorThreshold > 0 {
cb := func(repositoryID graveler.RepositoryID, branchID graveler.BranchID, stagingTokenID graveler.StagingToken, inGrace bool) {
logging.FromContext(ctx).WithFields(logging.Fields{
"repositoryID": repositoryID,
Expand All @@ -402,7 +404,7 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
"inGrace": inGrace,
}).Info("Delete sensor callback")
}
deleteSensor = graveler.NewDeleteSensor(cfg.Config.Graveler.CompactionSensorThreshold, cb)
deleteSensor = graveler.NewDeleteSensor(baseCfg.Graveler.CompactionSensorThreshold, cb)
}
gStore := graveler.NewGraveler(committedManager, stagingManager, refManager, gcManager, protectedBranchesManager, deleteSensor)

Expand All @@ -412,8 +414,8 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
return &Catalog{
BlockAdapter: tierFSParams.Adapter,
Store: gStore,
UGCPrepareMaxFileSize: cfg.Config.UGC.PrepareMaxFileSize,
UGCPrepareInterval: cfg.Config.UGC.PrepareInterval,
UGCPrepareMaxFileSize: baseCfg.UGC.PrepareMaxFileSize,
UGCPrepareInterval: baseCfg.UGC.PrepareInterval,
PathProvider: cfg.PathProvider,
BackgroundLimiter: limiter,
walkerFactory: cfg.WalkerFactory,
Expand All @@ -423,7 +425,8 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
KVStoreLimited: storeLimiter,
addressProvider: addressProvider,
deleteSensor: deleteSensor,
signingKey: cfg.Config.Blockstore.Signing.SecretKey,
// TODO(niro): This should be removed - we need to return the signing key dynamically from the blockAdapter
signingKey: baseCfg.Blockstore.Signing.SecretKey,
}, nil
}

Expand Down

0 comments on commit 8ec58b3

Please sign in to comment.