Skip to content

Commit

Permalink
Clean up CNI NS pool when main app context is cancelled
Browse files Browse the repository at this point in the history
Signed-off-by: Aaron Lehmann <[email protected]>
  • Loading branch information
aaronlehmann committed Sep 12, 2022
1 parent 7586cfe commit 702d105
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 30 deletions.
12 changes: 6 additions & 6 deletions cmd/buildkitd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type workerInitializerOpt struct {
}

type workerInitializer struct {
fn func(c *cli.Context, common workerInitializerOpt) ([]worker.Worker, error)
fn func(ctx context.Context, c *cli.Context, common workerInitializerOpt) ([]worker.Worker, error)
// a lower priority number means the initializer is more preferred
priority int
}
Expand Down Expand Up @@ -261,7 +261,7 @@ func main() {
os.RemoveAll(lockPath)
}()

controller, err := newController(c, &cfg)
controller, err := newController(ctx, c, &cfg)
if err != nil {
return err
}
Expand Down Expand Up @@ -612,7 +612,7 @@ func serverCredentials(cfg config.TLSConfig) (*tls.Config, error) {
return tlsConf, nil
}

func newController(c *cli.Context, cfg *config.Config) (*control.Controller, error) {
func newController(ctx context.Context, c *cli.Context, cfg *config.Config) (*control.Controller, error) {
sessionManager, err := session.NewManager()
if err != nil {
return nil, err
Expand All @@ -631,7 +631,7 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err
}
}

wc, err := newWorkerController(c, workerInitializerOpt{
wc, err := newWorkerController(ctx, c, workerInitializerOpt{
config: cfg,
sessionManager: sessionManager,
traceSocket: traceSocket,
Expand Down Expand Up @@ -686,11 +686,11 @@ func resolverFunc(cfg *config.Config) docker.RegistryHosts {
return resolver.NewRegistryConfig(cfg.Registries)
}

func newWorkerController(c *cli.Context, wiOpt workerInitializerOpt) (*worker.Controller, error) {
func newWorkerController(ctx context.Context, c *cli.Context, wiOpt workerInitializerOpt) (*worker.Controller, error) {
wc := &worker.Controller{}
nWorkers := 0
for _, wi := range workerInitializers {
ws, err := wi.fn(c, wiOpt)
ws, err := wi.fn(ctx, c, wiOpt)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/buildkitd/main_containerd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func applyContainerdFlags(c *cli.Context, cfg *config.Config) error {
return nil
}

func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker.Worker, error) {
func containerdWorkerInitializer(ctx context.Context, c *cli.Context, common workerInitializerOpt) ([]worker.Worker, error) {
if err := applyContainerdFlags(c, common.config); err != nil {
return nil, err
}
Expand Down Expand Up @@ -268,7 +268,7 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
if cfg.Snapshotter != "" {
snapshotter = cfg.Snapshotter
}
opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Rootless, cfg.Labels, dns, nc, common.config.Workers.Containerd.ApparmorProfile, parallelismSem, common.traceSocket, ctd.WithTimeout(60*time.Second))
opt, err := containerd.NewWorkerOpt(ctx, common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Rootless, cfg.Labels, dns, nc, common.config.Workers.Containerd.ApparmorProfile, parallelismSem, common.traceSocket, ctd.WithTimeout(60*time.Second))
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/buildkitd/main_oci_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func applyOCIFlags(c *cli.Context, cfg *config.Config) error {
return nil
}

func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker.Worker, error) {
func ociWorkerInitializer(ctx context.Context, c *cli.Context, common workerInitializerOpt) ([]worker.Worker, error) {
if err := applyOCIFlags(c, common.config); err != nil {
return nil, err
}
Expand Down Expand Up @@ -299,7 +299,7 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker
parallelismSem = semaphore.NewWeighted(int64(cfg.MaxParallelism))
}

opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile, parallelismSem, common.traceSocket, cfg.DefaultCgroupParent)
opt, err := runc.NewWorkerOpt(ctx, common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile, parallelismSem, common.traceSocket, cfg.DefaultCgroupParent)
if err != nil {
return nil, err
}
Expand Down
63 changes: 52 additions & 11 deletions util/network/cniprovider/cni.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Opt struct {
PoolSize int
}

func New(opt Opt) (network.Provider, error) {
func New(ctx context.Context, opt Opt) (network.Provider, error) {
if _, err := os.Stat(opt.ConfigPath); err != nil {
return nil, errors.Wrapf(err, "failed to read cni config %q", opt.ConfigPath)
}
Expand All @@ -54,14 +54,17 @@ func New(opt Opt) (network.Provider, error) {
return nil, err
}

cp := &cniProvider{CNI: cniHandle, root: opt.Root}
cp := &cniProvider{
CNI: cniHandle,
root: opt.Root,
}
cleanOldNamespaces(cp)

cp.nsPool = &cniPool{targetSize: opt.PoolSize, provider: cp}
cp.nsPool = newPool(ctx, opt.PoolSize, cp)
if err := cp.initNetwork(); err != nil {
return nil, err
}
go cp.nsPool.fillPool(context.TODO())
go cp.nsPool.fillPool()
return cp, nil
}

Expand All @@ -87,6 +90,7 @@ func (c *cniProvider) initNetwork() error {
}

type cniPool struct {
ctx context.Context
provider *cniProvider
mu sync.Mutex
targetSize int
Expand All @@ -95,17 +99,44 @@ type cniPool struct {
available []*cniNS
}

func (pool *cniPool) fillPool(ctx context.Context) {
// newPool returns a cniPool configured with the given context, target size
// and provider.
//
// It also starts a background goroutine that blocks until ctx is done. Once
// that happens, the goroutine takes the pool mutex and, for every namespace
// still sitting in the available list, releases it, removes it from the list
// and decrements actualSize. Errors returned by release are discarded.
func newPool(ctx context.Context, targetSize int, provider *cniProvider) *cniPool {
	p := new(cniPool)
	p.ctx = ctx
	p.targetSize = targetSize
	p.provider = provider

	drainOnDone := func() {
		// Wait for the owning context to finish before touching the pool.
		<-ctx.Done()

		p.mu.Lock()
		defer p.mu.Unlock()

		// Pop namespaces from the front one at a time so the bookkeeping
		// (available list and actualSize) is updated after each release.
		for len(p.available) > 0 {
			head := p.available[0]
			_ = head.release()
			p.available = p.available[1:]
			p.actualSize--
		}
	}
	go drainOnDone()

	return p
}

func (pool *cniPool) fillPool() {
for {
select {
case <-pool.ctx.Done():
return
default:
}

pool.mu.Lock()
actualSize := pool.actualSize
pool.mu.Unlock()
if actualSize >= pool.targetSize {
return
}
ns, err := pool.getNew(ctx)
ns, err := pool.getNew(pool.ctx)
if err != nil {
bklog.G(ctx).Errorf("failed to create new network namespace while prefilling pool: %s", err)
bklog.G(pool.ctx).Errorf("failed to create new network namespace while prefilling pool: %s", err)
return
}
pool.put(ns)
Expand Down Expand Up @@ -135,8 +166,13 @@ func (pool *cniPool) getNew(ctx context.Context) (*cniNS, error) {
ns.pool = pool

pool.mu.Lock()
defer pool.mu.Unlock()
select {
case <-pool.ctx.Done():
return nil, errors.New("cni pool is closed")
default:
}
pool.actualSize++
pool.mu.Unlock()
return ns, nil
}

Expand All @@ -145,22 +181,27 @@ func (pool *cniPool) put(ns *cniNS) {
ns.lastUsed = putTime

pool.mu.Lock()
select {
case <-pool.ctx.Done():
_ = ns.release()
default:
}
pool.available = append(pool.available, ns)
actualSize := pool.actualSize
pool.mu.Unlock()

if actualSize > pool.targetSize {
// We have more network namespaces than our target number, so
// schedule a shrinking pass.
time.AfterFunc(aboveTargetGracePeriod, pool.cleanupToDefaultSize)
time.AfterFunc(aboveTargetGracePeriod, pool.cleanupToTargetSize)
}
}

func (pool *cniPool) cleanupToDefaultSize() {
func (pool *cniPool) cleanupToTargetSize() {
var toRelease []*cniNS
defer func() {
for _, poolNS := range toRelease {
poolNS.release()
_ = poolNS.release()
}
}()

Expand Down
7 changes: 4 additions & 3 deletions util/network/netproviders/network.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package netproviders

import (
"context"
"os"

"github.com/moby/buildkit/solver/pb"
Expand All @@ -16,11 +17,11 @@ type Opt struct {

// Providers returns the network provider set.
// When opt.Mode is "auto" or "", resolvedMode is set to either "cni" or "host".
func Providers(opt Opt) (providers map[pb.NetMode]network.Provider, resolvedMode string, err error) {
func Providers(ctx context.Context, opt Opt) (providers map[pb.NetMode]network.Provider, resolvedMode string, err error) {
var defaultProvider network.Provider
switch opt.Mode {
case "cni":
cniProvider, err := cniprovider.New(opt.CNI)
cniProvider, err := cniprovider.New(ctx, opt.CNI)
if err != nil {
return nil, resolvedMode, err
}
Expand All @@ -35,7 +36,7 @@ func Providers(opt Opt) (providers map[pb.NetMode]network.Provider, resolvedMode
resolvedMode = opt.Mode
case "auto", "":
if _, err := os.Stat(opt.CNI.ConfigPath); err == nil {
cniProvider, err := cniprovider.New(opt.CNI)
cniProvider, err := cniprovider.New(ctx, opt.CNI)
if err != nil {
return nil, resolvedMode, err
}
Expand Down
8 changes: 4 additions & 4 deletions worker/containerd/containerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ import (
)

// NewWorkerOpt creates a WorkerOpt.
func NewWorkerOpt(root string, address, snapshotterName, ns string, rootless bool, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted, traceSocket string, opts ...containerd.ClientOpt) (base.WorkerOpt, error) {
func NewWorkerOpt(ctx context.Context, root string, address, snapshotterName, ns string, rootless bool, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted, traceSocket string, opts ...containerd.ClientOpt) (base.WorkerOpt, error) {
opts = append(opts, containerd.WithDefaultNamespace(ns))
client, err := containerd.New(address, opts...)
if err != nil {
return base.WorkerOpt{}, errors.Wrapf(err, "failed to connect client to %q . make sure containerd is running", address)
}
return newContainerd(root, client, snapshotterName, ns, rootless, labels, dns, nopt, apparmorProfile, parallelismSem, traceSocket)
return newContainerd(ctx, root, client, snapshotterName, ns, rootless, labels, dns, nopt, apparmorProfile, parallelismSem, traceSocket)
}

func newContainerd(root string, client *containerd.Client, snapshotterName, ns string, rootless bool, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted, traceSocket string) (base.WorkerOpt, error) {
func newContainerd(ctx context.Context, root string, client *containerd.Client, snapshotterName, ns string, rootless bool, labels map[string]string, dns *oci.DNSConfig, nopt netproviders.Opt, apparmorProfile string, parallelismSem *semaphore.Weighted, traceSocket string) (base.WorkerOpt, error) {
if strings.Contains(snapshotterName, "/") {
return base.WorkerOpt{}, errors.Errorf("bad snapshotter name: %q", snapshotterName)
}
Expand All @@ -57,7 +57,7 @@ func newContainerd(root string, client *containerd.Client, snapshotterName, ns s
return base.WorkerOpt{}, err
}

np, npResolvedMode, err := netproviders.Providers(nopt)
np, npResolvedMode, err := netproviders.Providers(ctx, nopt)
if err != nil {
return base.WorkerOpt{}, err
}
Expand Down
4 changes: 2 additions & 2 deletions worker/runc/runc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ type SnapshotterFactory struct {
}

// NewWorkerOpt creates a WorkerOpt.
func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, processMode oci.ProcessMode, labels map[string]string, idmap *idtools.IdentityMapping, nopt netproviders.Opt, dns *oci.DNSConfig, binary, apparmorProfile string, parallelismSem *semaphore.Weighted, traceSocket, defaultCgroupParent string) (base.WorkerOpt, error) {
func NewWorkerOpt(ctx context.Context, root string, snFactory SnapshotterFactory, rootless bool, processMode oci.ProcessMode, labels map[string]string, idmap *idtools.IdentityMapping, nopt netproviders.Opt, dns *oci.DNSConfig, binary, apparmorProfile string, parallelismSem *semaphore.Weighted, traceSocket, defaultCgroupParent string) (base.WorkerOpt, error) {
var opt base.WorkerOpt
name := "runc-" + snFactory.Name
root = filepath.Join(root, name)
if err := os.MkdirAll(root, 0700); err != nil {
return opt, err
}

np, npResolvedMode, err := netproviders.Providers(nopt)
np, npResolvedMode, err := netproviders.Providers(ctx, nopt)
if err != nil {
return opt, err
}
Expand Down

0 comments on commit 702d105

Please sign in to comment.