Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: #7747 sealing: Adding conf variable for capping number of concurrent unsealing jobs #7884

Merged
merged 5 commits into from
Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions documentation/en/default-lotus-miner-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,14 @@
# env var: LOTUS_DAGSTORE_MAXCONCURRENTREADYFETCHES
#MaxConcurrentReadyFetches = 0

# The maximum amount of unseals that can be processed simultaneously
# from the storage subsystem. 0 means unlimited.
# Default value: 0 (unlimited).
#
# type: int
# env var: LOTUS_DAGSTORE_MAXCONCURRENTUNSEALS
#MaxConcurrentUnseals = 5

# The maximum number of simultaneous inflight API calls to the storage
# subsystem.
# Default value: 100.
Expand Down
38 changes: 26 additions & 12 deletions markets/dagstore/miner_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,28 @@ type SectorAccessor interface {
}

type minerAPI struct {
pieceStore piecestore.PieceStore
sa SectorAccessor
throttle throttle.Throttler
readyMgr *shared.ReadyManager
pieceStore piecestore.PieceStore
sa SectorAccessor
throttle throttle.Throttler
unsealThrottle throttle.Throttler
readyMgr *shared.ReadyManager
}

var _ MinerAPI = (*minerAPI)(nil)

func NewMinerAPI(store piecestore.PieceStore, sa SectorAccessor, concurrency int) MinerAPI {
func NewMinerAPI(store piecestore.PieceStore, sa SectorAccessor, concurrency int, unsealConcurrency int) MinerAPI {
var unsealThrottle throttle.Throttler
if unsealConcurrency == 0 {
unsealThrottle = throttle.Noop()
} else {
unsealThrottle = throttle.Fixed(unsealConcurrency)
}
return &minerAPI{
pieceStore: store,
sa: sa,
throttle: throttle.Fixed(concurrency),
readyMgr: shared.NewReadyManager(),
pieceStore: store,
sa: sa,
throttle: throttle.Fixed(concurrency),
unsealThrottle: unsealThrottle,
readyMgr: shared.NewReadyManager(),
}
}

Expand Down Expand Up @@ -152,13 +160,19 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (mo
}

lastErr := xerrors.New("no sectors found to unseal from")

// if there is no unsealed sector containing the piece, just read the piece from the first sector we are able to unseal.
for _, deal := range pieceInfo.Deals {
// Note that if the deal data is not already unsealed, unsealing may
// block for a long time with the current PoRep
//
// This path is unthrottled.
reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
var reader mount.Reader
deal := deal
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
// Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing.
reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
return err
})

if err != nil {
lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)
log.Warn(lastErr.Error())
Expand Down
6 changes: 3 additions & 3 deletions markets/dagstore/miner_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
rpn := &mockRPN{
sectors: mockData,
}
api := NewMinerAPI(ps, rpn, 100)
api := NewMinerAPI(ps, rpn, 100, 5)
require.NoError(t, api.Start(ctx))

// Add deals to piece store
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {

ps := getPieceStore(t)
rpn := &mockRPN{}
api := NewMinerAPI(ps, rpn, 100)
api := NewMinerAPI(ps, rpn, 100, 5)
require.NoError(t, api.Start(ctx))

// Add a deal with data Length 10
Expand All @@ -142,7 +142,7 @@ func TestThrottle(t *testing.T) {
unsealedSectorID: "foo",
},
}
api := NewMinerAPI(ps, rpn, 3)
api := NewMinerAPI(ps, rpn, 3, 5)
require.NoError(t, api.Start(ctx))

// Add a deal with data Length 10
Expand Down
2 changes: 1 addition & 1 deletion markets/dagstore/wrapper_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestShardRegistration(t *testing.T) {
cfg := config.DefaultStorageMiner().DAGStore
cfg.RootDir = t.TempDir()

mapi := NewMinerAPI(ps, &wrappedSA{sa}, 10)
mapi := NewMinerAPI(ps, &wrappedSA{sa}, 10, 5)
dagst, w, err := NewDAGStore(cfg, mapi)
require.NoError(t, err)
require.NotNil(t, dagst)
Expand Down
1 change: 1 addition & 0 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func DefaultStorageMiner() *StorageMiner {
DAGStore: DAGStoreConfig{
MaxConcurrentIndex: 5,
MaxConcurrencyStorageCalls: 100,
MaxConcurrentUnseals: 5,
GCInterval: Duration(1 * time.Minute),
},
}
Expand Down
8 changes: 8 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ type DAGStoreConfig struct {
// Default value: 0 (unlimited).
MaxConcurrentReadyFetches int

// The maximum amount of unseals that can be processed simultaneously
// from the storage subsystem. 0 means unlimited.
// Default value: 0 (unlimited).
MaxConcurrentUnseals int

// The maximum number of simultaneous inflight API calls to the storage
// subsystem.
// Default value: 100.
Expand Down
2 changes: 1 addition & 1 deletion node/modules/storageminer_dagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderP
}
}

mountApi := mdagstore.NewMinerAPI(pieceStore, sa, cfg.MaxConcurrencyStorageCalls)
mountApi := mdagstore.NewMinerAPI(pieceStore, sa, cfg.MaxConcurrencyStorageCalls, cfg.MaxConcurrentUnseals)
ready := make(chan error, 1)
pieceStore.OnReady(func(err error) {
ready <- err
Expand Down