Skip to content

Commit

Permalink
Merge pull request #4429 from ricardomaraschini/release-1.29-carry-over
Browse files Browse the repository at this point in the history
[Backport release-1.29] feat: implement watcher for oci bundles
  • Loading branch information
twz123 authored May 24, 2024
2 parents 8db0036 + d0a7c32 commit bff9835
Showing 1 changed file with 142 additions and 35 deletions.
177 changes: 142 additions & 35 deletions pkg/component/worker/ocibundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,31 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"time"

"github.com/avast/retry-go"
"github.com/containerd/containerd"
"github.com/containerd/containerd/platforms"
"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"

"github.com/k0sproject/k0s/internal/pkg/dir"
"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/component/prober"
"github.com/k0sproject/k0s/pkg/config"
"github.com/k0sproject/k0s/pkg/constant"
"github.com/sirupsen/logrus"
"github.com/k0sproject/k0s/pkg/debounce"
)

// OCIBundleReconciler tries to import OCI bundle into the running containerd instance
type OCIBundleReconciler struct {
k0sVars *config.CfgVars
log *logrus.Entry
k0sVars *config.CfgVars
log *logrus.Entry
alreadyImported map[string]time.Time
mtx sync.Mutex
cancel context.CancelFunc
end chan struct{}
*prober.EventEmitter
}

Expand All @@ -46,60 +54,155 @@ var _ manager.Component = (*OCIBundleReconciler)(nil)
// NewOCIBundleReconciler builds new reconciler
func NewOCIBundleReconciler(vars *config.CfgVars) *OCIBundleReconciler {
return &OCIBundleReconciler{
k0sVars: vars,
log: logrus.WithField("component", "OCIBundleReconciler"),
EventEmitter: prober.NewEventEmitter(),
k0sVars: vars,
log: logrus.WithField("component", "OCIBundleReconciler"),
EventEmitter: prober.NewEventEmitter(),
alreadyImported: map[string]time.Time{},
end: make(chan struct{}),
}
}

func (a *OCIBundleReconciler) Init(_ context.Context) error {
return dir.Init(a.k0sVars.OCIBundleDir, constant.ManifestsDirMode)
}

func (a *OCIBundleReconciler) Start(ctx context.Context) error {
files, err := os.ReadDir(a.k0sVars.OCIBundleDir)
if err != nil {
a.Emit("can't read bundles directory")
return fmt.Errorf("can't read bundles directory")
}
a.EmitWithPayload("importing OCI bundles", files)
if len(files) == 0 {
return nil
}
// loadOne connects to containerd and imports the provided OCI bundle.
func (a *OCIBundleReconciler) loadOne(ctx context.Context, fpath string) error {
var client *containerd.Client
sock := filepath.Join(a.k0sVars.RunDir, "containerd.sock")
err = retry.Do(func() error {
client, err = containerd.New(sock, containerd.WithDefaultNamespace("k8s.io"), containerd.WithDefaultPlatform(platforms.OnlyStrict(platforms.DefaultSpec())))
if err := retry.Do(func() (err error) {
client, err = containerd.New(
sock,
containerd.WithDefaultNamespace("k8s.io"),
containerd.WithDefaultPlatform(
platforms.OnlyStrict(platforms.DefaultSpec()),
),
)
if err != nil {
a.log.WithError(err).Errorf("can't connect to containerd socket %s", sock)
return err
return fmt.Errorf("failed to connect to containerd: %w", err)
}
_, err := client.ListImages(ctx)
if err != nil {
a.log.WithError(err).Errorf("can't use containerd client")
return err
if _, err = client.ListImages(ctx); err != nil {
return fmt.Errorf("failed to communicate with containerd: %w", err)
}
return nil
}, retry.Context(ctx), retry.Delay(time.Second*5))
if err != nil {
a.EmitWithPayload("can't connect to containerd socket", map[string]interface{}{"socket": sock, "error": err})
return fmt.Errorf("can't connect to containerd socket %s: %v", sock, err)
}, retry.Context(ctx), retry.Delay(time.Second*5)); err != nil {
return err
}
defer client.Close()
if err := a.unpackBundle(ctx, client, fpath); err != nil {
return fmt.Errorf("failed to process OCI bundle: %w", err)
}
return nil
}

// loadAll loads all OCI bundle files into containerd. Read all files from the OCI bundle
// directory and loads them one by one. Errors are logged but not returned, upon failure
// in one file this function logs the error and moves to the next file. Files are indexed
// by name and imported only once (if the file has not been modified).
func (a *OCIBundleReconciler) loadAll(ctx context.Context) {
// We are going to consume everything in the directory so we block. This keeps
// things simple and avoid the need to handle two imports of the same file at the
// same time without requiring locks based on file path.
a.mtx.Lock()
defer a.mtx.Unlock()

a.log.Info("Loading OCI bundles directory")
files, err := os.ReadDir(a.k0sVars.OCIBundleDir)
if err != nil {
a.log.WithError(err).Errorf("Failed to read bundles directory")
return
}
a.EmitWithPayload("importing OCI bundles", files)
for _, file := range files {
if err := a.unpackBundle(ctx, client, a.k0sVars.OCIBundleDir+"/"+file.Name()); err != nil {
a.EmitWithPayload("unpacking OCI bundle error", map[string]interface{}{"file": file.Name(), "error": err})
a.log.WithError(err).Errorf("can't unpack bundle %s", file.Name())
return fmt.Errorf("can't unpack bundle %s: %w", file.Name(), err)
fpath := filepath.Join(a.k0sVars.OCIBundleDir, file.Name())
finfo, err := os.Stat(fpath)
if err != nil {
a.log.WithError(err).Errorf("failed to stat %s", fpath)
continue
}

modtime := finfo.ModTime()
if when, ok := a.alreadyImported[fpath]; ok && when.Equal(modtime) {
continue
}

a.log.Infof("Loading OCI bundle %s", fpath)
if err := a.loadOne(ctx, fpath); err != nil {
a.log.WithError(err).Errorf("Failed to load OCI bundle %s", fpath)
continue
}

a.alreadyImported[fpath] = modtime
a.log.Infof("OCI bundle %s loaded", fpath)
}
a.Emit("finished importing OCI bundles")
}

// installWatcher creates a fs watcher on the oci bundle directory. This function calls
// loadAll every time a new file is created or updated on the oci directory. Events are
// debounced with a timeout of 10 seconds. Watcher is started with a buffer so we don't
// miss events.
func (a *OCIBundleReconciler) installWatcher(ctx context.Context) error {
watcher, err := fsnotify.NewBufferedWatcher(10)
if err != nil {
return fmt.Errorf("failed to create watcher: %w", err)
}

if err := watcher.Add(a.k0sVars.OCIBundleDir); err != nil {
return fmt.Errorf("failed to add watcher: %w", err)
}

debouncer := debounce.Debouncer[fsnotify.Event]{
Input: watcher.Events,
Timeout: 10 * time.Second,
Filter: func(item fsnotify.Event) bool {
switch item.Op {
case fsnotify.Remove, fsnotify.Rename:
return false
}
return true
},
Callback: func(ev fsnotify.Event) {
a.loadAll(ctx)
},
}

go func() {
for {
if err, ok := <-watcher.Errors; ok {
a.log.WithError(err).Error("Error watching OCI bundle directory")
continue
}
return
}
}()

go func() {
defer close(a.end)
a.log.Infof("Started to watch events on %s", a.k0sVars.OCIBundleDir)
_ = debouncer.Run(ctx)
if err := watcher.Close(); err != nil {
a.log.Errorf("Failed to close watcher: %s", err)
}
a.EmitWithPayload("unpacked OCI bundle", file.Name())
a.log.Info("OCI bundle watch bouncer ended")
}()

return nil
}

// Starts initiate the OCI bundle loader. It does an initial load of the directory and
// once it is done, it starts a watcher on its own goroutine.
func (a *OCIBundleReconciler) Start(ctx context.Context) error {
ictx, cancel := context.WithCancel(context.Background())
a.cancel = cancel
if err := a.installWatcher(ictx); err != nil {
return fmt.Errorf("failed to install watcher: %w", err)
}
a.Emit("finished importing OCI bundle")
a.loadAll(ictx)
return nil
}

func (a OCIBundleReconciler) unpackBundle(ctx context.Context, client *containerd.Client, bundlePath string) error {
func (a *OCIBundleReconciler) unpackBundle(ctx context.Context, client *containerd.Client, bundlePath string) error {
r, err := os.Open(bundlePath)
if err != nil {
return fmt.Errorf("can't open bundle file %s: %v", bundlePath, err)
Expand Down Expand Up @@ -127,5 +230,9 @@ func (a OCIBundleReconciler) unpackBundle(ctx context.Context, client *container
}

func (a *OCIBundleReconciler) Stop() error {
a.log.Info("Stopping OCI bundle loader watcher")
a.cancel()
<-a.end
a.log.Info("OCI bundle loader stopped")
return nil
}

0 comments on commit bff9835

Please sign in to comment.