Merge pull request #3107 from aaronlehmann/net-ns-pool
Add network namespace pool support
tonistiigi authored Sep 15, 2022
2 parents f567525 + f6b002e commit 81b6ff2
Showing 20 changed files with 327 additions and 57 deletions.
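The core of the change is a reusable pool of CNI network namespaces: instead of creating and tearing down a namespace for every build step, a worker can pre-create up to `cniPoolSize` namespaces and hand them out on demand. The sketch below shows roughly how the new option is consumed through the `cniprovider` package touched in this diff; the paths are placeholders and it assumes a working CNI configuration on the host, so treat it as an illustration rather than code from this PR.

```go
// poolexample.go — a minimal sketch, not part of this change.
package main

import (
	"context"
	"log"

	"github.com/moby/buildkit/util/network/cniprovider"
)

func main() {
	// PoolSize is the new knob added by this change; 0 keeps the old
	// behavior of creating a fresh namespace per request.
	provider, err := cniprovider.New(cniprovider.Opt{
		Root:       "/var/lib/buildkit",      // placeholder state directory
		ConfigPath: "/etc/buildkit/cni.json", // placeholder CNI config
		BinaryDir:  "/opt/cni/bin",           // placeholder CNI plugin directory
		PoolSize:   16,                       // keep up to 16 namespaces ready for reuse
	})
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()

	// An empty hostname lets the provider serve the namespace from the pool.
	ns, err := provider.New(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	// Close() now returns a pooled namespace for reuse instead of deleting it.
	if err := ns.Close(); err != nil {
		log.Fatal(err)
	}
}
```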
1 change: 1 addition & 0 deletions cmd/buildkitd/config/config.go
@@ -53,6 +53,7 @@ type NetworkConfig struct {
Mode string `toml:"networkMode"`
CNIConfigPath string `toml:"cniConfigPath"`
CNIBinaryPath string `toml:"cniBinaryPath"`
CNIPoolSize int `toml:"cniPoolSize"`
}

type OCIConfig struct {
1 change: 1 addition & 0 deletions cmd/buildkitd/main.go
@@ -265,6 +265,7 @@ func main() {
if err != nil {
return err
}
defer controller.Close()

controller.Register(server)

9 changes: 9 additions & 0 deletions cmd/buildkitd/main_containerd_worker.go
@@ -90,6 +90,11 @@ func init() {
Usage: "path of cni binary files",
Value: defaultConf.Workers.Containerd.NetworkConfig.CNIBinaryPath,
},
cli.IntFlag{
Name: "containerd-cni-pool-size",
Usage: "size of cni network namespace pool",
Value: defaultConf.Workers.Containerd.NetworkConfig.CNIPoolSize,
},
cli.StringFlag{
Name: "containerd-worker-snapshotter",
Usage: "snapshotter name to use",
@@ -208,6 +213,9 @@ func applyContainerdFlags(c *cli.Context, cfg *config.Config) error {
if c.GlobalIsSet("containerd-cni-config-path") {
cfg.Workers.Containerd.NetworkConfig.CNIConfigPath = c.GlobalString("containerd-cni-config-path")
}
if c.GlobalIsSet("containerd-cni-pool-size") {
cfg.Workers.Containerd.NetworkConfig.CNIPoolSize = c.GlobalInt("containerd-cni-pool-size")
}
if c.GlobalIsSet("containerd-cni-binary-dir") {
cfg.Workers.Containerd.NetworkConfig.CNIBinaryPath = c.GlobalString("containerd-cni-binary-dir")
}
@@ -247,6 +255,7 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
Root: common.config.Root,
ConfigPath: common.config.Workers.Containerd.CNIConfigPath,
BinaryDir: common.config.Workers.Containerd.CNIBinaryPath,
PoolSize: common.config.Workers.Containerd.CNIPoolSize,
},
}

9 changes: 9 additions & 0 deletions cmd/buildkitd/main_oci_worker.go
@@ -101,6 +101,11 @@ func init() {
Usage: "path of cni binary files",
Value: defaultConf.Workers.OCI.NetworkConfig.CNIBinaryPath,
},
cli.IntFlag{
Name: "oci-cni-pool-size",
Usage: "size of cni network namespace pool",
Value: defaultConf.Workers.OCI.NetworkConfig.CNIPoolSize,
},
cli.StringFlag{
Name: "oci-worker-binary",
Usage: "name of specified oci worker binary",
@@ -223,6 +228,9 @@ func applyOCIFlags(c *cli.Context, cfg *config.Config) error {
if c.GlobalIsSet("oci-cni-binary-dir") {
cfg.Workers.OCI.NetworkConfig.CNIBinaryPath = c.GlobalString("oci-cni-binary-dir")
}
if c.GlobalIsSet("oci-cni-pool-size") {
cfg.Workers.OCI.NetworkConfig.CNIPoolSize = c.GlobalInt("oci-cni-pool-size")
}
if c.GlobalIsSet("oci-worker-binary") {
cfg.Workers.OCI.Binary = c.GlobalString("oci-worker-binary")
}
@@ -282,6 +290,7 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker
Root: common.config.Root,
ConfigPath: common.config.Workers.OCI.CNIConfigPath,
BinaryDir: common.config.Workers.OCI.CNIBinaryPath,
PoolSize: common.config.Workers.OCI.CNIPoolSize,
},
}

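Both worker types expose the same knob on the command line: `--containerd-cni-pool-size` and `--oci-cni-pool-size` default to the `cniPoolSize` value from the configuration file and override it when set explicitly, as the applyContainerdFlags and applyOCIFlags hunks above show.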
4 changes: 4 additions & 0 deletions control/control.go
@@ -90,6 +90,10 @@ func NewController(opt Opt) (*Controller, error) {
return c, nil
}

func (c *Controller) Close() error {
return c.opt.WorkerController.Close()
}

func (c *Controller) Register(server *grpc.Server) {
controlapi.RegisterControlServer(server, c)
c.gatewayForwarder.Register(server)
7 changes: 7 additions & 0 deletions docs/buildkitd.toml.md
@@ -57,6 +57,9 @@ insecure-entitlements = [ "network.host", "security.insecure" ]
apparmor-profile = ""
# limit the number of parallel build steps that can run at the same time
max-parallelism = 4
# maintain a pool of reusable CNI network namespaces to amortize the overhead
# of allocating and releasing the namespaces
cniPoolSize = 16
[worker.oci.labels]
"foo" = "bar"
@@ -77,6 +80,10 @@ insecure-entitlements = [ "network.host", "security.insecure" ]
gc = true
# gckeepstorage sets storage limit for default gc profile, in MB.
gckeepstorage = 9000
# maintain a pool of reusable CNI network namespaces to amortize the overhead
# of allocating and releasing the namespaces
cniPoolSize = 16
[worker.containerd.labels]
"foo" = "bar"
2 changes: 1 addition & 1 deletion executor/containerdexecutor/executor.go
@@ -146,7 +146,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M
if !ok {
return errors.Errorf("unknown network mode %s", meta.NetMode)
}
namespace, err := provider.New(meta.Hostname)
namespace, err := provider.New(ctx, meta.Hostname)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion executor/runcexecutor/executor.go
@@ -161,7 +161,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,
if !ok {
return errors.Errorf("unknown network mode %s", meta.NetMode)
}
namespace, err := provider.New(meta.Hostname)
namespace, err := provider.New(ctx, meta.Hostname)
if err != nil {
return err
}
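Both executor hunks above make the same one-line change: they now pass the request context into the provider so namespace creation can emit trace events and honor cancellation. A trimmed sketch of the call pattern the executors follow is below; the surrounding plumbing (resolving the provider for meta.NetMode, building the full runtime spec) is omitted and the function name is invented for illustration.

```go
package example

import (
	"context"

	"github.com/moby/buildkit/util/network"
	specs "github.com/opencontainers/runtime-spec/specs-go"
)

// runWithNetwork acquires a network namespace, attaches it to the container
// spec, and releases it when the step finishes.
func runWithNetwork(ctx context.Context, provider network.Provider, hostname string, spec *specs.Spec) error {
	// The ctx argument is what this change adds to Provider.New.
	ns, err := provider.New(ctx, hostname)
	if err != nil {
		return err
	}
	// For pooled namespaces Close() hands the namespace back to the pool;
	// custom-hostname namespaces are torn down as before.
	defer ns.Close()

	// Attach the network namespace to the container's runtime spec.
	if err := ns.Set(spec); err != nil {
		return err
	}
	// ... create and run the container using spec ...
	return nil
}
```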
177 changes: 172 additions & 5 deletions util/network/cniprovider/cni.go
@@ -5,19 +5,26 @@ import (
"os"
"runtime"
"strings"
"sync"
"time"

cni "github.com/containerd/go-cni"
"github.com/gofrs/flock"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/network"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"
)

const aboveTargetGracePeriod = 5 * time.Minute

type Opt struct {
Root string
ConfigPath string
BinaryDir string
PoolSize int
}

func New(opt Opt) (network.Provider, error) {
@@ -47,16 +54,24 @@ func New(opt Opt) (network.Provider, error) {
return nil, err
}

cp := &cniProvider{CNI: cniHandle, root: opt.Root}
cp := &cniProvider{
CNI: cniHandle,
root: opt.Root,
}
cleanOldNamespaces(cp)

cp.nsPool = &cniPool{targetSize: opt.PoolSize, provider: cp}
if err := cp.initNetwork(); err != nil {
return nil, err
}
go cp.nsPool.fillPool(context.TODO())
return cp, nil
}

type cniProvider struct {
cni.CNI
root string
root string
nsPool *cniPool
}

func (c *cniProvider) initNetwork() error {
@@ -67,19 +82,153 @@ func (c *cniProvider) initNetwork() error {
}
defer l.Unlock()
}
ns, err := c.New("")
ns, err := c.New(context.TODO(), "")
if err != nil {
return err
}
return ns.Close()
}

func (c *cniProvider) New(hostname string) (network.Namespace, error) {
func (c *cniProvider) Close() error {
c.nsPool.close()
return nil
}

type cniPool struct {
provider *cniProvider
mu sync.Mutex
targetSize int
actualSize int
// LIFO: Ordered least recently used to most recently used
available []*cniNS
closed bool
}

func (pool *cniPool) close() {
bklog.L.Debugf("cleaning up cni pool")

pool.mu.Lock()
pool.closed = true
defer pool.mu.Unlock()
for len(pool.available) > 0 {
_ = pool.available[0].release()
pool.available = pool.available[1:]
pool.actualSize--
}
}

func (pool *cniPool) fillPool(ctx context.Context) {
for {
pool.mu.Lock()
if pool.closed {
pool.mu.Unlock()
return
}
actualSize := pool.actualSize
pool.mu.Unlock()
if actualSize >= pool.targetSize {
return
}
ns, err := pool.getNew(ctx)
if err != nil {
bklog.G(ctx).Errorf("failed to create new network namespace while prefilling pool: %s", err)
return
}
pool.put(ns)
}
}

func (pool *cniPool) get(ctx context.Context) (*cniNS, error) {
pool.mu.Lock()
if len(pool.available) > 0 {
ns := pool.available[len(pool.available)-1]
pool.available = pool.available[:len(pool.available)-1]
pool.mu.Unlock()
trace.SpanFromContext(ctx).AddEvent("returning network namespace from pool")
bklog.G(ctx).Debugf("returning network namespace %s from pool", ns.id)
return ns, nil
}
pool.mu.Unlock()

return pool.getNew(ctx)
}

func (pool *cniPool) getNew(ctx context.Context) (*cniNS, error) {
ns, err := pool.provider.newNS(ctx, "")
if err != nil {
return nil, err
}
ns.pool = pool

pool.mu.Lock()
defer pool.mu.Unlock()
if pool.closed {
return nil, errors.New("cni pool is closed")
}
pool.actualSize++
return ns, nil
}

func (pool *cniPool) put(ns *cniNS) {
putTime := time.Now()
ns.lastUsed = putTime

pool.mu.Lock()
defer pool.mu.Unlock()
if pool.closed {
_ = ns.release()
return
}
pool.available = append(pool.available, ns)
actualSize := pool.actualSize

if actualSize > pool.targetSize {
// We have more network namespaces than our target number, so
// schedule a shrinking pass.
time.AfterFunc(aboveTargetGracePeriod, pool.cleanupToTargetSize)
}
}

func (pool *cniPool) cleanupToTargetSize() {
var toRelease []*cniNS
defer func() {
for _, poolNS := range toRelease {
_ = poolNS.release()
}
}()

pool.mu.Lock()
defer pool.mu.Unlock()
for pool.actualSize > pool.targetSize &&
len(pool.available) > 0 &&
time.Since(pool.available[0].lastUsed) >= aboveTargetGracePeriod {
bklog.L.Debugf("releasing network namespace %s since it was last used at %s", pool.available[0].id, pool.available[0].lastUsed)
toRelease = append(toRelease, pool.available[0])
pool.available = pool.available[1:]
pool.actualSize--
}
}

func (c *cniProvider) New(ctx context.Context, hostname string) (network.Namespace, error) {
// We can't use the pool for namespaces that need a custom hostname.
// We also avoid using it on windows because we don't have a cleanup
// mechanism for Windows yet.
if hostname == "" || runtime.GOOS == "windows" {
return c.nsPool.get(ctx)
}
return c.newNS(ctx, hostname)
}

func (c *cniProvider) newNS(ctx context.Context, hostname string) (*cniNS, error) {
id := identity.NewID()
trace.SpanFromContext(ctx).AddEvent("creating new network namespace")
bklog.G(ctx).Debugf("creating new network namespace %s", id)
nativeID, err := createNetNS(c, id)
if err != nil {
return nil, err
}
trace.SpanFromContext(ctx).AddEvent("finished creating network namespace")
bklog.G(ctx).Debugf("finished creating network namespace %s", id)

nsOpts := []cni.NamespaceOpts{}

@@ -97,22 +246,40 @@ func (c *cniProvider) New(hostname string) (network.Namespace, error) {
deleteNetNS(nativeID)
return nil, errors.Wrap(err, "CNI setup error")
}
trace.SpanFromContext(ctx).AddEvent("finished setting up network namespace")
bklog.G(ctx).Debugf("finished setting up network namespace %s", id)

return &cniNS{nativeID: nativeID, id: id, handle: c.CNI, opts: nsOpts}, nil
return &cniNS{
nativeID: nativeID,
id: id,
handle: c.CNI,
opts: nsOpts,
}, nil
}

type cniNS struct {
pool *cniPool
handle cni.CNI
id string
nativeID string
opts []cni.NamespaceOpts
lastUsed time.Time
}

func (ns *cniNS) Set(s *specs.Spec) error {
return setNetNS(s, ns.nativeID)
}

func (ns *cniNS) Close() error {
if ns.pool == nil {
return ns.release()
}
ns.pool.put(ns)
return nil
}

func (ns *cniNS) release() error {
bklog.L.Debugf("releasing cni network namespace %s", ns.id)
err := ns.handle.Remove(context.TODO(), ns.id, ns.nativeID, ns.opts...)
if err1 := unmountNetNS(ns.nativeID); err1 != nil && err == nil {
err = err1
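For readers who want the pooling strategy without the CNI plumbing, here is a dependency-free restatement of what cniPool above does: LIFO reuse of released resources, growth past the target when demand spikes, and a delayed shrink back to the target once extra resources have sat idle for a grace period. The pool and resource types and all names are invented for this sketch; only the overall pattern mirrors the code in this diff.

```go
// pooldemo.go — illustrative only, not buildkit code.
package main

import (
	"fmt"
	"sync"
	"time"
)

type resource struct {
	id       int
	lastUsed time.Time
}

type pool struct {
	mu         sync.Mutex
	targetSize int
	actualSize int
	available  []*resource // LIFO: least recently used first
	nextID     int
}

// get returns the most recently used resource when one is available,
// otherwise it creates a new one (the pool may temporarily exceed targetSize).
func (p *pool) get() *resource {
	p.mu.Lock()
	defer p.mu.Unlock()
	if n := len(p.available); n > 0 {
		r := p.available[n-1]
		p.available = p.available[:n-1]
		return r
	}
	p.nextID++
	p.actualSize++
	return &resource{id: p.nextID}
}

// put returns a resource for reuse; if the pool is above target it schedules
// a shrink pass after a grace period instead of releasing immediately.
func (p *pool) put(r *resource, grace time.Duration) {
	r.lastUsed = time.Now()
	p.mu.Lock()
	defer p.mu.Unlock()
	p.available = append(p.available, r)
	if p.actualSize > p.targetSize {
		time.AfterFunc(grace, func() { p.shrink(grace) })
	}
}

// shrink releases the least recently used resources until the pool is back at
// its target, touching only those idle for at least the grace period.
func (p *pool) shrink(grace time.Duration) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for p.actualSize > p.targetSize && len(p.available) > 0 &&
		time.Since(p.available[0].lastUsed) >= grace {
		fmt.Printf("releasing resource %d\n", p.available[0].id)
		p.available = p.available[1:]
		p.actualSize--
	}
}

func main() {
	p := &pool{targetSize: 2}
	a, b, c := p.get(), p.get(), p.get() // pool grows to 3, above target
	p.put(a, time.Second)
	p.put(b, time.Second)
	p.put(c, time.Second)
	fmt.Println("reused resource", p.get().id) // LIFO: c (id 3) comes back first
	time.Sleep(1500 * time.Millisecond)        // let the shrink pass release one idle resource
}
```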