Use Scylla API for restore #4192

Open
wants to merge 14 commits into base: ml/scylla-api
2 changes: 1 addition & 1 deletion pkg/cmd/agent/router.go
@@ -116,7 +116,7 @@ const (
//
// ML_TODO: this is tmp solution (#4211) - verify that it's still needed before merging to master.
func objectStorageEndpointDirector(cfg agent.Config, logger log.Logger) func(r *http.Request) {
regex := regexp.MustCompile(`^/storage_service/backup$`)
regex := regexp.MustCompile(`^/storage_service/(backup|restore)$`)
resolver := endpointResolver(cfg)

return func(r *http.Request) {
27 changes: 27 additions & 0 deletions pkg/metrics/restore.go
@@ -15,6 +15,8 @@ type RestoreMetrics struct {
batchSize *prometheus.GaugeVec
remainingBytes *prometheus.GaugeVec
state *prometheus.GaugeVec
restoredBytes *prometheus.GaugeVec
restoreDuration *prometheus.GaugeVec
downloadedBytes *prometheus.GaugeVec
downloadDuration *prometheus.GaugeVec
streamedBytes *prometheus.GaugeVec
@@ -31,6 +33,8 @@ func NewRestoreMetrics() RestoreMetrics {
remainingBytes: g("Remaining bytes of backup to be restored yet.", "remaining_bytes",
"cluster", "snapshot_tag", "location", "dc", "node", "keyspace", "table"),
state: g("Defines current state of the restore process (idle/download/load/error).", "state", "cluster", "location", "snapshot_tag", "host"),
restoredBytes: g("Restored bytes", "restored_bytes", "cluster", "host"),
restoreDuration: g("Restore duration in ms", "restore_duration", "cluster", "host"),
Collaborator:
What do you need or want to use this metric for? What is the purpose of it?

downloadedBytes: g("Downloaded bytes", "downloaded_bytes", "cluster", "location", "host"),
downloadDuration: g("Download duration in ms", "download_duration", "cluster", "location", "host"),
streamedBytes: g("Load&Streamed bytes", "streamed_bytes", "cluster", "host"),
@@ -51,6 +55,8 @@ func (m RestoreMetrics) all() []prometheus.Collector {
m.batchSize,
m.remainingBytes,
m.state,
m.restoredBytes,
m.restoreDuration,
m.downloadedBytes,
m.downloadDuration,
m.streamedBytes,
@@ -155,6 +161,9 @@ const (
RestoreStateLoading
// RestoreStateError means that node ended up with error.
RestoreStateError
// RestoreStateNativeRestore means that node is restoring with
// native Scylla restore task.
RestoreStateNativeRestore
)

// SetRestoreState sets restore "state" metric.
@@ -168,6 +177,24 @@ func (m RestoreMetrics) SetRestoreState(clusterID uuid.UUID, location backupspec
m.state.With(l).Set(float64(state))
}

// IncreaseRestoredBytes increases restore "restored_bytes" metric.
func (m RestoreMetrics) IncreaseRestoredBytes(clusterID uuid.UUID, host string, bytes int64) {
l := prometheus.Labels{
"cluster": clusterID.String(),
"host": host,
}
m.restoredBytes.With(l).Add(float64(bytes))
}

// IncreaseRestoreDuration increases restore "restore_duration" metric.
func (m RestoreMetrics) IncreaseRestoreDuration(clusterID uuid.UUID, host string, d time.Duration) {
l := prometheus.Labels{
"cluster": clusterID.String(),
"host": host,
}
m.restoreDuration.With(l).Add(float64(d.Milliseconds()))
}
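For illustration, a minimal sketch of how these two metrics might be fed once a native restore task finishes. The helper, its placement, and the uuid import path are assumptions, not code from this PR; IncreaseRestoredBytes and IncreaseRestoreDuration are the functions defined above.

package example // hypothetical placement, for illustration only

import (
	"time"

	"github.com/scylladb/scylla-manager/v3/pkg/metrics"
	"github.com/scylladb/scylla-manager/v3/pkg/util/uuid" // assumed import path
)

// recordNativeRestore is a hypothetical helper showing how the new metrics
// could be updated after a native Scylla restore task for a batch finishes.
func recordNativeRestore(m metrics.RestoreMetrics, clusterID uuid.UUID, host string, restoredBytes int64, start time.Time) {
	// Total bytes restored by the finished task.
	m.IncreaseRestoredBytes(clusterID, host, restoredBytes)
	// Wall-clock duration of the task, exported in milliseconds.
	m.IncreaseRestoreDuration(clusterID, host, time.Since(start))
}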

// IncreaseRestoreDownloadedBytes increases restore "downloaded_bytes" metric.
func (m RestoreMetrics) IncreaseRestoreDownloadedBytes(clusterID uuid.UUID, location, host string, bytes int64) {
l := prometheus.Labels{
14 changes: 8 additions & 6 deletions pkg/schema/table/table.go

Some generated files are not rendered by default.

18 changes: 18 additions & 0 deletions pkg/scyllaclient/client_scylla.go
@@ -1176,6 +1176,24 @@ func (c *Client) ScyllaBackup(ctx context.Context, host, endpoint, bucket, prefi
return resp.GetPayload(), nil
}

// ScyllaRestore schedules Scylla restore task and returns its ID.
// tocComponents is a list of sstable.ComponentTOC of the SSTables that should be restored.
func (c *Client) ScyllaRestore(ctx context.Context, host, endpoint, bucket, prefix, keyspace, table string, tocComponents []string) (string, error) {
resp, err := c.scyllaOps.StorageServiceRestorePost(&operations.StorageServiceRestorePostParams{
Context: forceHost(ctx, host),
Endpoint: endpoint,
Bucket: bucket,
Prefix: prefix,
Keyspace: keyspace,
Table: table,
Sstables: tocComponents,
})
if err != nil {
return "", err
}
return resp.GetPayload(), nil
}
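A minimal usage sketch (assumed, not part of this PR): it only shows how the parameters of ScyllaRestore line up. The endpoint, bucket, prefix, keyspace, and table values are placeholders, and the returned task ID would then be tracked through Scylla's task manager API.

package example // hypothetical placement, for illustration only

import (
	"context"

	"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
)

// startNativeRestore schedules a native restore of the given SSTables on one
// host and returns the Scylla task ID to be polled by the caller.
func startNativeRestore(ctx context.Context, c *scyllaclient.Client, host string, tocs []string) (string, error) {
	return c.ScyllaRestore(ctx, host,
		"s3.us-east-1.amazonaws.com", // object storage endpoint configured in Scylla (placeholder)
		"backup-bucket",              // bucket holding the backup (placeholder)
		"remote/sstable/dir",         // prefix of the remote SSTable directory (placeholder)
		"ks", "tbl",                  // keyspace and table to restore into (placeholders)
		tocs,                         // TOC components of the SSTables to restore
	)
}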

// ScyllaTaskState describes Scylla task state.
type ScyllaTaskState string

92 changes: 75 additions & 17 deletions pkg/service/restore/batch.go
@@ -9,12 +9,15 @@ import (

"github.com/pkg/errors"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/sstable"
)

// batchDispatcher is a tool for batching SSTables from
// Workload across different hosts during restore.
// It follows a few rules:
//
// - all SSTables within a batch have the same have the same batchType
Collaborator:

typo

Collaborator:

Suggested change:
// - all SSTables within a batch have the same have the same batchType
// - all SSTables within a batch have the same batchType

//
// - it dispatches batches from the RemoteDirWorkload with the biggest
// initial size first
//
@@ -103,7 +106,7 @@ type workloadProgress struct {
// that are yet to be batched.
type remoteSSTableDirProgress struct {
RemainingSize int64
RemainingSSTables []RemoteSSTable
RemainingSSTables map[batchType][]RemoteSSTable
}

func newWorkloadProgress(workload Workload, locationHosts map[Location][]string) workloadProgress {
@@ -115,7 +118,7 @@ func newWorkloadProgress(workload Workload, locationHosts map[Location][]string)
locationDC[rdw.Location.StringWithoutDC()] = append(locationDC[rdw.Location.StringWithoutDC()], rdw.DC)
p[i] = remoteSSTableDirProgress{
RemainingSize: rdw.Size,
RemainingSSTables: rdw.SSTables,
RemainingSSTables: groupSSTablesByBatchType(rdw.SSTables),
}
}
hostDCAccess := make(map[string][]string)
Expand Down Expand Up @@ -150,11 +153,24 @@ type batch struct {
TableName
*ManifestInfo

batchType batchType
RemoteSSTableDir string
Size int64
SSTables []RemoteSSTable
}

// Dividing batches by batchType simplifies the restore procedure:
// - Files from versioned batches need to be downloaded one by one
// in order to rename them on the fly with Rclone API.
// - Batches with sstable.UUID type can be restored with native Scylla restore API.
type batchType struct {
// All SSTables within a batch have the same ID type
IDType sstable.IDType
// All SSTables within a batch are either versioned or not.
Versioned bool
// In theory, batchType{IDType: sstable.UUID, Versioned: true} shouldn't exist
}
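To make the intent concrete, a tiny predicate sketch (an assumption, not code from this PR) that would live next to batchType in the restore package and relies on the sstable import shown at the top of this file:

// canUseNativeRestore is a hypothetical helper expressing the rule above:
// only batches of non-versioned, UUID-identified SSTables can be handed to
// the native Scylla restore API; versioned files still go through the
// per-file Rclone download-and-rename path.
func canUseNativeRestore(bt batchType) bool {
	return bt.IDType == sstable.UUID && !bt.Versioned
}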

func (b batch) NotVersionedSSTables() []RemoteSSTable {
var ssts []RemoteSSTable
for _, sst := range b.SSTables {
@@ -188,21 +204,36 @@ func (b batch) VersionedSize() int64 {
func (b batch) IDs() []string {
var ids []string
for _, sst := range b.SSTables {
ids = append(ids, sst.ID)
ids = append(ids, sst.ID.ID)
}
return ids
}

// TOC returns a list of batch's sstable.ComponentTOC.
func (b batch) TOC() []string {
out := make([]string, 0, len(b.SSTables))
for _, sst := range b.SSTables {
out = append(out, sst.TOC)
}
return out
}

// ValidateAllDispatched returns error if not all SSTables were dispatched.
func (bd *batchDispatcher) ValidateAllDispatched() error {
bd.mu.Lock()
defer bd.mu.Unlock()

for i, rdp := range bd.workloadProgress.remoteDir {
if rdp.RemainingSize != 0 || len(rdp.RemainingSSTables) != 0 {
failed := rdp.RemainingSize != 0
for _, ssts := range rdp.RemainingSSTables {
Collaborator:

If I understand correctly, we can have a situation where RemainingSize is 0, but there are still some RemainingSSTables? How can this happen?

Collaborator (Author):

It shouldn't. I just wanted to keep checking both the size and the sstable count, to be on the safe side (as it was done before).

To be honest, checking the size seems less robust (maybe a 0-size sstable is not realistic, but perhaps we might encounter some bug where Rclone reports 0 size by mistake).

I think that we can get rid of relying on the remaining size and use the sstable count instead.
How do you find this idea?

Collaborator (@VAveryanov8, Feb 6, 2025):

Yes, I think either one of these will work. If checking the sstable count is safer, then we can use only it.

if len(ssts) != 0 {
failed = true
}
}
if failed {
rdw := bd.workload.RemoteDir[i]
return errors.Errorf("failed to restore sstables from location %s table %s.%s (%d bytes). See logs for more info",
rdw.Location, rdw.Keyspace, rdw.Table, rdw.Size)
rdw.Location, rdw.Keyspace, rdw.Table, rdp.RemainingSize)
}
}
for dc, bytes := range bd.workloadProgress.dcBytesToBeRestored {
@@ -277,6 +308,21 @@ func (bd *batchDispatcher) createBatch(dirIdx int, host string) (batch, bool) {
if shardCnt == 0 {
shardCnt = 1
}

// Choose batch type and candidate sstables
var batchT batchType
var sstables []RemoteSSTable
for bt, ssts := range rdp.RemainingSSTables {
if len(ssts) > 0 {
batchT = bt
sstables = ssts
break
}
}
if len(sstables) == 0 {
return batch{}, false
}

var i int
var size int64
if bd.batchSize == maxBatchSize {
@@ -286,13 +332,13 @@ func (bd *batchDispatcher) createBatch(dirIdx int, host string) (batch, bool) {
sizeLimit := expectedNodeWorkload / 20
for {
for range shardCnt {
if i >= len(rdp.RemainingSSTables) {
if i >= len(sstables) {
break
}
size += rdp.RemainingSSTables[i].Size
size += sstables[i].Size
i++
}
if i >= len(rdp.RemainingSSTables) {
if i >= len(sstables) {
break
}
if size > sizeLimit {
@@ -301,9 +347,9 @@ func (bd *batchDispatcher) createBatch(dirIdx int, host string) (batch, bool) {
}
} else {
// Create batch containing node_shard_count*batch_size sstables.
i = min(bd.batchSize*int(shardCnt), len(rdp.RemainingSSTables))
i = min(bd.batchSize*int(shardCnt), len(sstables))
for j := range i {
size += rdp.RemainingSSTables[j].Size
size += sstables[j].Size
}
}

@@ -312,23 +358,23 @@ func (bd *batchDispatcher) createBatch(dirIdx int, host string) (batch, bool) {
}
// Extend batch if it was to leave less than
// 1 sstable per shard for the next one.
if len(rdp.RemainingSSTables)-i < int(shardCnt) {
for ; i < len(rdp.RemainingSSTables); i++ {
size += rdp.RemainingSSTables[i].Size
if len(sstables)-i < int(shardCnt) {
for ; i < len(sstables); i++ {
size += sstables[i].Size
}
}

sstables := rdp.RemainingSSTables[:i]
rdp.RemainingSSTables = rdp.RemainingSSTables[i:]
rdp.RemainingSSTables[batchT] = sstables[i:]
rdw := bd.workload.RemoteDir[dirIdx]

rdp.RemainingSize -= size
return batch{
TableName: rdw.TableName,
ManifestInfo: rdw.ManifestInfo,
batchType: batchT,
RemoteSSTableDir: rdw.RemoteSSTableDir,
Size: size,
SSTables: sstables,
SSTables: sstables[:i],
}, true
}

@@ -365,7 +411,7 @@ func (bd *batchDispatcher) ReportFailure(host string, b batch) error {
}

rdp := &bd.workloadProgress.remoteDir[dirIdx]
rdp.RemainingSSTables = append(b.SSTables, rdp.RemainingSSTables...)
rdp.RemainingSSTables[b.batchType] = append(b.SSTables, rdp.RemainingSSTables[b.batchType]...)
rdp.RemainingSize += b.Size

bd.wakeUpWaiting()
@@ -394,3 +440,15 @@ func sortWorkload(workload Workload) {
})
}
}

func groupSSTablesByBatchType(sstables []RemoteSSTable) map[batchType][]RemoteSSTable {
out := make(map[batchType][]RemoteSSTable)
for _, sst := range sstables {
bt := batchType{
IDType: sst.ID.Type,
Versioned: sst.Versioned,
}
out[bt] = append(out[bt], sst)
}
return out
}
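As a usage note (illustrative only, not part of this PR), the grouping guarantees that every group is homogeneous, which is what the native-restore and versioned-download paths rely on. A quick sanity check over a RemoteDirWorkload rdw could look like this:

// Illustrative invariant check: every SSTable in a group matches the group's
// key, so a batch never mixes ID types or versioned/non-versioned files.
for bt, ssts := range groupSSTablesByBatchType(rdw.SSTables) {
	for _, sst := range ssts {
		if sst.Versioned != bt.Versioned || sst.ID.Type != bt.IDType {
			panic("grouping invariant violated") // unreachable by construction
		}
	}
}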