Skip to content

Commit

Permalink
Merge pull request vmware-tanzu#5190 from ywk253100/220808_restic
Browse files Browse the repository at this point in the history
Refactor the restic repo related code for Kopia integration
  • Loading branch information
qiuming-best authored Aug 18, 2022
2 parents 4e25f59 + 047c753 commit a36736e
Show file tree
Hide file tree
Showing 16 changed files with 581 additions and 526 deletions.
3 changes: 3 additions & 0 deletions pkg/apis/velero/v1/backup_repository_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ const (
BackupRepositoryPhaseNew BackupRepositoryPhase = "New"
BackupRepositoryPhaseReady BackupRepositoryPhase = "Ready"
BackupRepositoryPhaseNotReady BackupRepositoryPhase = "NotReady"

BackupRepositoryTypeRestic string = "restic"
BackupRepositoryTypeUnified string = "unified"
)

// BackupRepositoryStatus is the current status of a BackupRepository.
Expand Down
36 changes: 15 additions & 21 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ import (
"github.com/vmware-tanzu/velero/internal/storage"
"github.com/vmware-tanzu/velero/internal/util/managercontroller"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/podvolume"
"github.com/vmware-tanzu/velero/pkg/repository"
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
)

Expand Down Expand Up @@ -248,7 +250,9 @@ type server struct {
logger logrus.FieldLogger
logLevel logrus.Level
pluginRegistry clientmgmt.Registry
resticManager restic.RepositoryManager
repoManager repository.Manager
repoLocker *repository.RepoLocker
repoEnsurer *repository.RepositoryEnsurer
metrics *metrics.ServerMetrics
config serverConfig
mgr manager.Manager
Expand Down Expand Up @@ -536,22 +540,10 @@ func (s *server) initRestic() error {
return err
}

res, err := restic.NewRepositoryManager(
s.ctx,
s.namespace,
s.veleroClient,
s.sharedInformerFactory.Velero().V1().BackupRepositories(),
s.veleroClient.VeleroV1(),
s.mgr.GetClient(),
s.kubeClient.CoreV1(),
s.kubeClient.CoreV1(),
s.credentialFileStore,
s.logger,
)
if err != nil {
return err
}
s.resticManager = res
s.repoLocker = repository.NewRepoLocker()
s.repoEnsurer = repository.NewRepositoryEnsurer(s.sharedInformerFactory.Velero().V1().BackupRepositories(), s.veleroClient.VeleroV1(), s.logger)

s.repoManager = repository.NewManager(s.namespace, s.mgr.GetClient(), s.repoLocker, s.repoEnsurer, s.credentialFileStore, s.logger)

return nil
}
Expand Down Expand Up @@ -643,7 +635,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.discoveryHelper,
client.NewDynamicFactory(s.dynamicClient),
podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),
s.resticManager,
podvolume.NewBackupperFactory(s.repoLocker, s.repoEnsurer, s.veleroClient, s.kubeClient.CoreV1(),
s.kubeClient.CoreV1(), s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger),
s.config.podVolumeOperationTimeout,
s.config.defaultVolumesToRestic,
s.config.clientPageSize,
Expand Down Expand Up @@ -704,7 +697,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
client.NewDynamicFactory(s.dynamicClient),
s.config.restoreResourcePriorities,
s.kubeClient.CoreV1().Namespaces(),
s.resticManager,
podvolume.NewRestorerFactory(s.repoLocker, s.repoEnsurer, s.veleroClient, s.kubeClient.CoreV1(),
s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger),
s.config.podVolumeOperationTimeout,
s.config.resourceTerminatingTimeout,
s.logger,
Expand Down Expand Up @@ -812,15 +806,15 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.logger.Fatal(err, "unable to create controller", "controller", controller.Schedule)
}

if err := controller.NewResticRepoReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.config.defaultResticMaintenanceFrequency, s.resticManager).SetupWithManager(s.mgr); err != nil {
if err := controller.NewResticRepoReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.config.defaultResticMaintenanceFrequency, s.repoManager).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.ResticRepo)
}

if err := controller.NewBackupDeletionReconciler(
s.logger,
s.mgr.GetClient(),
backupTracker,
s.resticManager,
s.repoManager,
s.metrics,
s.discoveryHelper,
newPluginManager,
Expand Down
11 changes: 6 additions & 5 deletions pkg/controller/backup_deletion_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/persistence"
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
Expand All @@ -56,7 +57,7 @@ type backupDeletionReconciler struct {
client.Client
logger logrus.FieldLogger
backupTracker BackupTracker
resticMgr restic.RepositoryManager
repoMgr repository.Manager
metrics *metrics.ServerMetrics
clock clock.Clock
discoveryHelper discovery.Helper
Expand All @@ -69,7 +70,7 @@ func NewBackupDeletionReconciler(
logger logrus.FieldLogger,
client client.Client,
backupTracker BackupTracker,
resticMgr restic.RepositoryManager,
repoMgr repository.Manager,
metrics *metrics.ServerMetrics,
helper discovery.Helper,
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
Expand All @@ -79,7 +80,7 @@ func NewBackupDeletionReconciler(
Client: client,
logger: logger,
backupTracker: backupTracker,
resticMgr: resticMgr,
repoMgr: repoMgr,
metrics: metrics,
clock: clock.RealClock{},
discoveryHelper: helper,
Expand Down Expand Up @@ -435,7 +436,7 @@ func (r *backupDeletionReconciler) deleteExistingDeletionRequests(ctx context.Co
}

func (r *backupDeletionReconciler) deleteResticSnapshots(ctx context.Context, backup *velerov1api.Backup) []error {
if r.resticMgr == nil {
if r.repoMgr == nil {
return nil
}

Expand All @@ -449,7 +450,7 @@ func (r *backupDeletionReconciler) deleteResticSnapshots(ctx context.Context, ba

var errs []error
for _, snapshot := range snapshots {
if err := r.resticMgr.Forget(ctx2, snapshot); err != nil {
if err := r.repoMgr.Forget(ctx2, snapshot); err != nil {
errs = append(errs, err)
}
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/restic_repository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/repository"
repoconfig "github.com/vmware-tanzu/velero/pkg/repository/config"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/util/kube"
Expand All @@ -45,11 +46,11 @@ type ResticRepoReconciler struct {
logger logrus.FieldLogger
clock clock.Clock
defaultMaintenanceFrequency time.Duration
repositoryManager restic.RepositoryManager
repositoryManager repository.Manager
}

func NewResticRepoReconciler(namespace string, logger logrus.FieldLogger, client client.Client,
defaultMaintenanceFrequency time.Duration, repositoryManager restic.RepositoryManager) *ResticRepoReconciler {
defaultMaintenanceFrequency time.Duration, repositoryManager repository.Manager) *ResticRepoReconciler {
c := &ResticRepoReconciler{
client,
namespace,
Expand Down Expand Up @@ -163,7 +164,7 @@ func (r *ResticRepoReconciler) initializeRepo(ctx context.Context, req *velerov1
// ensureRepo checks to see if a repository exists, and attempts to initialize it if
// it does not exist. An error is returned if the repository can't be connected to
// or initialized.
func ensureRepo(repo *velerov1api.BackupRepository, repoManager restic.RepositoryManager) error {
func ensureRepo(repo *velerov1api.BackupRepository, repoManager repository.Manager) error {
if err := repoManager.ConnectToRepo(repo); err != nil {
// If the repository has not yet been initialized, the error message will always include
// the following string. This is the only scenario where we should try to initialize it.
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/restic_repository_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import (
ctrl "sigs.k8s.io/controller-runtime"

velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
resticmokes "github.com/vmware-tanzu/velero/pkg/restic/mocks"
repomokes "github.com/vmware-tanzu/velero/pkg/repository/mocks"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)

const defaultMaintenanceFrequency = 10 * time.Minute

func mockResticRepoReconciler(t *testing.T, rr *velerov1api.BackupRepository, mockOn string, arg interface{}, ret interface{}) *ResticRepoReconciler {
mgr := &resticmokes.RepositoryManager{}
mgr := &repomokes.RepositoryManager{}
if mockOn != "" {
mgr.On(mockOn, arg).Return(ret)
}
Expand Down
188 changes: 188 additions & 0 deletions pkg/repository/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package repository

import (
"context"
"fmt"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/repository/provider"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)

// Manager manages backup repositories.
type Manager interface {
// InitRepo initializes a repo with the specified name and identifier.
InitRepo(repo *velerov1api.BackupRepository) error

// ConnectToRepo runs the 'restic snapshots' command against the
// specified repo, and returns an error if it fails. This is
// intended to be used to ensure that the repo exists/can be
// authenticated to.
ConnectToRepo(repo *velerov1api.BackupRepository) error

// PruneRepo deletes unused data from a repo.
PruneRepo(repo *velerov1api.BackupRepository) error

// UnlockRepo removes stale locks from a repo.
UnlockRepo(repo *velerov1api.BackupRepository) error

// Forget removes a snapshot from the list of
// available snapshots in a repo.
Forget(context.Context, restic.SnapshotIdentifier) error
}

type manager struct {
namespace string
providers map[string]provider.Provider
client client.Client
repoLocker *RepoLocker
repoEnsurer *RepositoryEnsurer
fileSystem filesystem.Interface
log logrus.FieldLogger
}

// NewManager create a new repository manager.
func NewManager(
namespace string,
client client.Client,
repoLocker *RepoLocker,
repoEnsurer *RepositoryEnsurer,
credentialFileStore credentials.FileStore,
log logrus.FieldLogger,
) Manager {
mgr := &manager{
namespace: namespace,
client: client,
providers: map[string]provider.Provider{},
repoLocker: repoLocker,
repoEnsurer: repoEnsurer,
fileSystem: filesystem.NewFileSystem(),
log: log,
}

mgr.providers[velerov1api.BackupRepositoryTypeRestic] = provider.NewResticRepositoryProvider(credentialFileStore, mgr.fileSystem, mgr.log)

return mgr
}

func (m *manager) InitRepo(repo *velerov1api.BackupRepository) error {
m.repoLocker.LockExclusive(repo.Name)
defer m.repoLocker.UnlockExclusive(repo.Name)

prd, err := m.getRepositoryProvider(repo)
if err != nil {
return errors.WithStack(err)
}
param, err := m.assembleRepoParam(repo)
if err != nil {
return errors.WithStack(err)
}
return prd.InitRepo(context.Background(), param)
}

func (m *manager) ConnectToRepo(repo *velerov1api.BackupRepository) error {
m.repoLocker.Lock(repo.Name)
defer m.repoLocker.Unlock(repo.Name)

prd, err := m.getRepositoryProvider(repo)
if err != nil {
return errors.WithStack(err)
}
param, err := m.assembleRepoParam(repo)
if err != nil {
return errors.WithStack(err)
}
return prd.ConnectToRepo(context.Background(), param)
}

func (m *manager) PruneRepo(repo *velerov1api.BackupRepository) error {
m.repoLocker.LockExclusive(repo.Name)
defer m.repoLocker.UnlockExclusive(repo.Name)

prd, err := m.getRepositoryProvider(repo)
if err != nil {
return errors.WithStack(err)
}
param, err := m.assembleRepoParam(repo)
if err != nil {
return errors.WithStack(err)
}
return prd.PruneRepo(context.Background(), param)
}

func (m *manager) UnlockRepo(repo *velerov1api.BackupRepository) error {
m.repoLocker.Lock(repo.Name)
defer m.repoLocker.Unlock(repo.Name)

prd, err := m.getRepositoryProvider(repo)
if err != nil {
return errors.WithStack(err)
}
param, err := m.assembleRepoParam(repo)
if err != nil {
return errors.WithStack(err)
}
return prd.EnsureUnlockRepo(context.Background(), param)
}

func (m *manager) Forget(ctx context.Context, snapshot restic.SnapshotIdentifier) error {
repo, err := m.repoEnsurer.EnsureRepo(ctx, m.namespace, snapshot.VolumeNamespace, snapshot.BackupStorageLocation)
if err != nil {
return err
}

m.repoLocker.LockExclusive(repo.Name)
defer m.repoLocker.UnlockExclusive(repo.Name)

prd, err := m.getRepositoryProvider(repo)
if err != nil {
return errors.WithStack(err)
}
param, err := m.assembleRepoParam(repo)
if err != nil {
return errors.WithStack(err)
}
return prd.Forget(context.Background(), snapshot.SnapshotID, param)
}

func (m *manager) getRepositoryProvider(repo *velerov1api.BackupRepository) (provider.Provider, error) {
switch repo.Spec.RepositoryType {
case "", velerov1api.BackupRepositoryTypeRestic:
return m.providers[velerov1api.BackupRepositoryTypeRestic], nil
default:
return nil, fmt.Errorf("failed to get provider for repository %s", repo.Spec.RepositoryType)
}
}

func (m *manager) assembleRepoParam(repo *velerov1api.BackupRepository) (provider.RepoParam, error) {
bsl := &velerov1api.BackupStorageLocation{}
if err := m.client.Get(context.Background(), client.ObjectKey{m.namespace, repo.Spec.BackupStorageLocation}, bsl); err != nil {
return provider.RepoParam{}, errors.WithStack(err)
}
return provider.RepoParam{
BackupLocation: bsl,
BackupRepo: repo,
}, nil
}
Loading

0 comments on commit a36736e

Please sign in to comment.