-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use Scylla API for restore #4192
base: ml/scylla-api
Are you sure you want to change the base?
Changes from all commits
39011de
d8c54f1
c73371c
bd8180b
2efd8b4
55aa3f0
2c5e263
0d62fb0
055d2ad
bb0bba0
63d029a
773cf1e
da7a59b
a21b97c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -9,12 +9,15 @@ import ( | |||||
|
||||||
"github.com/pkg/errors" | ||||||
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" | ||||||
"github.com/scylladb/scylla-manager/v3/pkg/sstable" | ||||||
) | ||||||
|
||||||
// batchDispatcher is a tool for batching SSTables from | ||||||
// Workload across different hosts during restore. | ||||||
// It follows a few rules: | ||||||
// | ||||||
// - all SSTables within a batch have the same have the same batchType | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typo There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
// | ||||||
// - it dispatches batches from the RemoteDirWorkload with the biggest | ||||||
// initial size first | ||||||
// | ||||||
|
@@ -103,7 +106,7 @@ type workloadProgress struct { | |||||
// that are yet to be batched. | ||||||
type remoteSSTableDirProgress struct { | ||||||
RemainingSize int64 | ||||||
RemainingSSTables []RemoteSSTable | ||||||
RemainingSSTables map[batchType][]RemoteSSTable | ||||||
} | ||||||
|
||||||
func newWorkloadProgress(workload Workload, locationHosts map[Location][]string) workloadProgress { | ||||||
|
@@ -115,7 +118,7 @@ func newWorkloadProgress(workload Workload, locationHosts map[Location][]string) | |||||
locationDC[rdw.Location.StringWithoutDC()] = append(locationDC[rdw.Location.StringWithoutDC()], rdw.DC) | ||||||
p[i] = remoteSSTableDirProgress{ | ||||||
RemainingSize: rdw.Size, | ||||||
RemainingSSTables: rdw.SSTables, | ||||||
RemainingSSTables: groupSSTablesByBatchType(rdw.SSTables), | ||||||
} | ||||||
} | ||||||
hostDCAccess := make(map[string][]string) | ||||||
|
@@ -150,11 +153,24 @@ type batch struct { | |||||
TableName | ||||||
*ManifestInfo | ||||||
|
||||||
batchType batchType | ||||||
RemoteSSTableDir string | ||||||
Size int64 | ||||||
SSTables []RemoteSSTable | ||||||
} | ||||||
|
||||||
// Dividing batches by simplifies the restore procedure: | ||||||
// - Files from versioned batches need to be downloaded one by one | ||||||
// in order to rename them on the fly with Rclone API. | ||||||
// - Batches with sstable.UUID type can be restored with native Scylla restore API. | ||||||
type batchType struct { | ||||||
// All SSTables within a batch have the same ID type | ||||||
IDType sstable.IDType | ||||||
// All SSTables within a batch are either versioned or not. | ||||||
Versioned bool | ||||||
// In theory, batchType{IDType: sstable.UUID, Versioned: true} shouldn't exist | ||||||
} | ||||||
|
||||||
func (b batch) NotVersionedSSTables() []RemoteSSTable { | ||||||
var ssts []RemoteSSTable | ||||||
for _, sst := range b.SSTables { | ||||||
|
@@ -188,21 +204,36 @@ func (b batch) VersionedSize() int64 { | |||||
func (b batch) IDs() []string { | ||||||
var ids []string | ||||||
for _, sst := range b.SSTables { | ||||||
ids = append(ids, sst.ID) | ||||||
ids = append(ids, sst.ID.ID) | ||||||
} | ||||||
return ids | ||||||
} | ||||||
|
||||||
// TOC returns a list of batch's sstable.ComponentTOC. | ||||||
func (b batch) TOC() []string { | ||||||
out := make([]string, 0, len(b.SSTables)) | ||||||
for _, sst := range b.SSTables { | ||||||
out = append(out, sst.TOC) | ||||||
} | ||||||
return out | ||||||
} | ||||||
|
||||||
// ValidateAllDispatched returns error if not all SSTables were dispatched. | ||||||
func (bd *batchDispatcher) ValidateAllDispatched() error { | ||||||
bd.mu.Lock() | ||||||
defer bd.mu.Unlock() | ||||||
|
||||||
for i, rdp := range bd.workloadProgress.remoteDir { | ||||||
if rdp.RemainingSize != 0 || len(rdp.RemainingSSTables) != 0 { | ||||||
failed := rdp.RemainingSize != 0 | ||||||
for _, ssts := range rdp.RemainingSSTables { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if i understand correctly, we can have a situation when RemaningSize is 0, but there are still some RemaningSSTables? How this can happen? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It shouldn't. I just wanted to keep on checking both size and sstables count just to be on the safe side (as it was done before). To be honest, checking size seems less robust (maybe 0 size sstable is not realistic, but perhaps we might encounter some bug where Rclone reports 0 size by mistake). I think that we can get rid of relying on remaining size and use sstable count instead. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I think either one of these will work. if checking the sstables count is more safe, then we can use only it. |
||||||
if len(ssts) != 0 { | ||||||
failed = true | ||||||
} | ||||||
} | ||||||
if failed { | ||||||
rdw := bd.workload.RemoteDir[i] | ||||||
return errors.Errorf("failed to restore sstables from location %s table %s.%s (%d bytes). See logs for more info", | ||||||
rdw.Location, rdw.Keyspace, rdw.Table, rdw.Size) | ||||||
rdw.Location, rdw.Keyspace, rdw.Table, rdp.RemainingSize) | ||||||
} | ||||||
} | ||||||
for dc, bytes := range bd.workloadProgress.dcBytesToBeRestored { | ||||||
|
@@ -277,6 +308,21 @@ func (bd *batchDispatcher) createBatch(dirIdx int, host string) (batch, bool) { | |||||
if shardCnt == 0 { | ||||||
shardCnt = 1 | ||||||
} | ||||||
|
||||||
// Choose batch type and candidate sstables | ||||||
var batchT batchType | ||||||
var sstables []RemoteSSTable | ||||||
for bt, ssts := range rdp.RemainingSSTables { | ||||||
if len(ssts) > 0 { | ||||||
batchT = bt | ||||||
sstables = ssts | ||||||
break | ||||||
} | ||||||
} | ||||||
if len(sstables) == 0 { | ||||||
return batch{}, false | ||||||
} | ||||||
|
||||||
var i int | ||||||
var size int64 | ||||||
if bd.batchSize == maxBatchSize { | ||||||
|
@@ -286,13 +332,13 @@ func (bd *batchDispatcher) createBatch(dirIdx int, host string) (batch, bool) { | |||||
sizeLimit := expectedNodeWorkload / 20 | ||||||
for { | ||||||
for range shardCnt { | ||||||
if i >= len(rdp.RemainingSSTables) { | ||||||
if i >= len(sstables) { | ||||||
break | ||||||
} | ||||||
size += rdp.RemainingSSTables[i].Size | ||||||
size += sstables[i].Size | ||||||
i++ | ||||||
} | ||||||
if i >= len(rdp.RemainingSSTables) { | ||||||
if i >= len(sstables) { | ||||||
break | ||||||
} | ||||||
if size > sizeLimit { | ||||||
|
@@ -301,9 +347,9 @@ func (bd *batchDispatcher) createBatch(dirIdx int, host string) (batch, bool) { | |||||
} | ||||||
} else { | ||||||
// Create batch containing node_shard_count*batch_size sstables. | ||||||
i = min(bd.batchSize*int(shardCnt), len(rdp.RemainingSSTables)) | ||||||
i = min(bd.batchSize*int(shardCnt), len(sstables)) | ||||||
for j := range i { | ||||||
size += rdp.RemainingSSTables[j].Size | ||||||
size += sstables[j].Size | ||||||
} | ||||||
} | ||||||
|
||||||
|
@@ -312,23 +358,23 @@ func (bd *batchDispatcher) createBatch(dirIdx int, host string) (batch, bool) { | |||||
} | ||||||
// Extend batch if it was to leave less than | ||||||
// 1 sstable per shard for the next one. | ||||||
if len(rdp.RemainingSSTables)-i < int(shardCnt) { | ||||||
for ; i < len(rdp.RemainingSSTables); i++ { | ||||||
size += rdp.RemainingSSTables[i].Size | ||||||
if len(sstables)-i < int(shardCnt) { | ||||||
for ; i < len(sstables); i++ { | ||||||
size += sstables[i].Size | ||||||
} | ||||||
} | ||||||
|
||||||
sstables := rdp.RemainingSSTables[:i] | ||||||
rdp.RemainingSSTables = rdp.RemainingSSTables[i:] | ||||||
rdp.RemainingSSTables[batchT] = sstables[i:] | ||||||
rdw := bd.workload.RemoteDir[dirIdx] | ||||||
|
||||||
rdp.RemainingSize -= size | ||||||
return batch{ | ||||||
TableName: rdw.TableName, | ||||||
ManifestInfo: rdw.ManifestInfo, | ||||||
batchType: batchT, | ||||||
RemoteSSTableDir: rdw.RemoteSSTableDir, | ||||||
Size: size, | ||||||
SSTables: sstables, | ||||||
SSTables: sstables[:i], | ||||||
}, true | ||||||
} | ||||||
|
||||||
|
@@ -365,7 +411,7 @@ func (bd *batchDispatcher) ReportFailure(host string, b batch) error { | |||||
} | ||||||
|
||||||
rdp := &bd.workloadProgress.remoteDir[dirIdx] | ||||||
rdp.RemainingSSTables = append(b.SSTables, rdp.RemainingSSTables...) | ||||||
rdp.RemainingSSTables[b.batchType] = append(b.SSTables, rdp.RemainingSSTables[b.batchType]...) | ||||||
rdp.RemainingSize += b.Size | ||||||
|
||||||
bd.wakeUpWaiting() | ||||||
|
@@ -394,3 +440,15 @@ func sortWorkload(workload Workload) { | |||||
}) | ||||||
} | ||||||
} | ||||||
|
||||||
func groupSSTablesByBatchType(sstables []RemoteSSTable) map[batchType][]RemoteSSTable { | ||||||
out := make(map[batchType][]RemoteSSTable) | ||||||
for _, sst := range sstables { | ||||||
bt := batchType{ | ||||||
IDType: sst.ID.Type, | ||||||
Versioned: sst.Versioned, | ||||||
} | ||||||
out[bt] = append(out[bt], sst) | ||||||
} | ||||||
return out | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you need or want to use this metric for? What is the purpose of it ?