Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: track sessions with a group construct #1551

Merged
merged 4 commits into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cache/remotecache/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
v1 "github.com/moby/buildkit/cache/remotecache/v1"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/progress"
Expand All @@ -19,7 +20,7 @@ import (
"github.com/pkg/errors"
)

type ResolveCacheExporterFunc func(ctx context.Context, attrs map[string]string) (Exporter, error)
type ResolveCacheExporterFunc func(ctx context.Context, g session.Group, attrs map[string]string) (Exporter, error)

func oneOffProgress(ctx context.Context, id string) func(err error) error {
pw, _, _ := progress.FromContext(ctx)
Expand Down
3 changes: 2 additions & 1 deletion cache/remotecache/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
v1 "github.com/moby/buildkit/cache/remotecache/v1"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/worker"
Expand All @@ -21,7 +22,7 @@ import (
)

// ResolveCacheImporterFunc returns importer and descriptor.
type ResolveCacheImporterFunc func(ctx context.Context, attrs map[string]string) (Importer, ocispec.Descriptor, error)
type ResolveCacheImporterFunc func(ctx context.Context, g session.Group, attrs map[string]string) (Importer, ocispec.Descriptor, error)

type Importer interface {
Resolve(ctx context.Context, desc ocispec.Descriptor, id string, w worker.Worker) (solver.CacheManager, error)
Expand Down
3 changes: 2 additions & 1 deletion cache/remotecache/inline/inline.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (

"github.com/moby/buildkit/cache/remotecache"
v1 "github.com/moby/buildkit/cache/remotecache/v1"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
digest "github.com/opencontainers/go-digest"
"github.com/sirupsen/logrus"
)

func ResolveCacheExporterFunc() remotecache.ResolveCacheExporterFunc {
return func(ctx context.Context, _ map[string]string) (remotecache.Exporter, error) {
return func(ctx context.Context, _ session.Group, _ map[string]string) (remotecache.Exporter, error) {
return NewExporter(), nil
}
}
Expand Down
13 changes: 7 additions & 6 deletions cache/remotecache/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ const (

// ResolveCacheExporterFunc for "local" cache exporter.
func ResolveCacheExporterFunc(sm *session.Manager) remotecache.ResolveCacheExporterFunc {
return func(ctx context.Context, attrs map[string]string) (remotecache.Exporter, error) {
return func(ctx context.Context, g session.Group, attrs map[string]string) (remotecache.Exporter, error) {
store := attrs[attrDest]
if store == "" {
return nil, errors.New("local cache exporter requires dest")
}
csID := contentStoreIDPrefix + store
cs, err := getContentStore(ctx, sm, csID)
cs, err := getContentStore(ctx, sm, g, csID)
if err != nil {
return nil, err
}
Expand All @@ -38,7 +38,7 @@ func ResolveCacheExporterFunc(sm *session.Manager) remotecache.ResolveCacheExpor

// ResolveCacheImporterFunc for "local" cache importer.
func ResolveCacheImporterFunc(sm *session.Manager) remotecache.ResolveCacheImporterFunc {
return func(ctx context.Context, attrs map[string]string) (remotecache.Importer, specs.Descriptor, error) {
return func(ctx context.Context, g session.Group, attrs map[string]string) (remotecache.Importer, specs.Descriptor, error) {
dgstStr := attrs[attrDigest]
if dgstStr == "" {
return nil, specs.Descriptor{}, errors.New("local cache importer requires explicit digest")
Expand All @@ -49,7 +49,7 @@ func ResolveCacheImporterFunc(sm *session.Manager) remotecache.ResolveCacheImpor
return nil, specs.Descriptor{}, errors.New("local cache importer requires src")
}
csID := contentStoreIDPrefix + store
cs, err := getContentStore(ctx, sm, csID)
cs, err := getContentStore(ctx, sm, g, csID)
if err != nil {
return nil, specs.Descriptor{}, err
}
Expand All @@ -67,8 +67,9 @@ func ResolveCacheImporterFunc(sm *session.Manager) remotecache.ResolveCacheImpor
}
}

func getContentStore(ctx context.Context, sm *session.Manager, storeID string) (content.Store, error) {
sessionID := session.FromContext(ctx)
func getContentStore(ctx context.Context, sm *session.Manager, g session.Group, storeID string) (content.Store, error) {
// TODO: to ensure correct session is detected, new api for finding if storeID is supported is needed
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does it mean by storeID is supported?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When there are multiple sessions daemon could send "detect" request to know which one supports the current storeID. If a specific session does not know about storeID then the next one is tried.

sessionID := g.SessionIterator().NextSession()
if sessionID == "" {
return nil, errors.New("local cache exporter/importer requires session")
}
Expand Down
8 changes: 4 additions & 4 deletions cache/remotecache/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ const (
)

func ResolveCacheExporterFunc(sm *session.Manager, hosts docker.RegistryHosts) remotecache.ResolveCacheExporterFunc {
return func(ctx context.Context, attrs map[string]string) (remotecache.Exporter, error) {
return func(ctx context.Context, g session.Group, attrs map[string]string) (remotecache.Exporter, error) {
ref, err := canonicalizeRef(attrs[attrRef])
if err != nil {
return nil, err
}
remote := resolver.New(ctx, hosts, sm)
remote := resolver.New(hosts, resolver.NewSessionAuthenticator(sm, g))
pusher, err := remote.Pusher(ctx, ref)
if err != nil {
return nil, err
Expand All @@ -47,12 +47,12 @@ func ResolveCacheExporterFunc(sm *session.Manager, hosts docker.RegistryHosts) r
}

func ResolveCacheImporterFunc(sm *session.Manager, cs content.Store, hosts docker.RegistryHosts) remotecache.ResolveCacheImporterFunc {
return func(ctx context.Context, attrs map[string]string) (remotecache.Importer, specs.Descriptor, error) {
return func(ctx context.Context, g session.Group, attrs map[string]string) (remotecache.Importer, specs.Descriptor, error) {
ref, err := canonicalizeRef(attrs[attrRef])
if err != nil {
return nil, specs.Descriptor{}, err
}
remote := resolver.New(ctx, hosts, sm)
remote := resolver.New(hosts, resolver.NewSessionAuthenticator(sm, g))
xref, desc, err := remote.Resolve(ctx, ref)
if err != nil {
return nil, specs.Descriptor{}, err
Expand Down
5 changes: 2 additions & 3 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
if err := translateLegacySolveRequest(req); err != nil {
return nil, err
}
ctx = session.NewContext(ctx, req.Session)

defer func() {
time.AfterFunc(time.Second, c.throttledGC)
Expand Down Expand Up @@ -260,7 +259,7 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
if !ok {
return nil, errors.Errorf("unknown cache exporter: %q", e.Type)
}
cacheExporter, err = cacheExporterFunc(ctx, e.Attrs)
cacheExporter, err = cacheExporterFunc(ctx, session.NewGroup(req.Session), e.Attrs)
if err != nil {
return nil, err
}
Expand All @@ -273,7 +272,7 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
})
}

resp, err := c.solver.Solve(ctx, req.Ref, frontend.SolveRequest{
resp, err := c.solver.Solve(ctx, req.Ref, req.Session, frontend.SolveRequest{
Frontend: req.Frontend,
Definition: req.Definition,
FrontendOpt: req.FrontendAttrs,
Expand Down
4 changes: 2 additions & 2 deletions exporter/containerimage/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (e *imageExporterInstance) Name() string {
return "exporting to image"
}

func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source) (map[string]string, error) {
func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, sessionID string) (map[string]string, error) {
if src.Metadata == nil {
src.Metadata = make(map[string][]byte)
}
Expand Down Expand Up @@ -237,7 +237,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source)
}
}
if e.push {
if err := push.Push(ctx, e.opt.SessionManager, e.opt.ImageWriter.ContentStore(), desc.Digest, targetName, e.insecure, e.opt.RegistryHosts, e.pushByDigest); err != nil {
if err := push.Push(ctx, e.opt.SessionManager, sessionID, e.opt.ImageWriter.ContentStore(), desc.Digest, targetName, e.insecure, e.opt.RegistryHosts, e.pushByDigest); err != nil {
return nil, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type Exporter interface {

type ExporterInstance interface {
Name() string
Export(context.Context, Source) (map[string]string, error)
Export(ctx context.Context, src Source, sessionID string) (map[string]string, error)
}

type Source struct {
Expand Down
31 changes: 12 additions & 19 deletions exporter/local/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/progress"
"github.com/pkg/errors"
"github.com/tonistiigi/fsutil"
fstypes "github.com/tonistiigi/fsutil/types"
"golang.org/x/sync/errgroup"
Expand All @@ -36,33 +35,27 @@ func New(opt Opt) (exporter.Exporter, error) {
}

func (e *localExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
id := session.FromContext(ctx)
if id == "" {
return nil, errors.New("could not access local files without session")
}

timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

caller, err := e.opt.SessionManager.Get(timeoutCtx, id)
if err != nil {
return nil, err
}

li := &localExporterInstance{localExporter: e, caller: caller}
return li, nil
return &localExporterInstance{localExporter: e}, nil
}

type localExporterInstance struct {
*localExporter
caller session.Caller
}

func (e *localExporterInstance) Name() string {
return "exporting to client"
}

func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source) (map[string]string, error) {
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, sessionID string) (map[string]string, error) {

timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID)
if err != nil {
return nil, err
}

isMap := len(inp.Refs) > 0

export := func(ctx context.Context, k string, ref cache.ImmutableRef) func() error {
Expand Down Expand Up @@ -125,7 +118,7 @@ func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source)
}

progress := newProgressHandler(ctx, lbl)
if err := filesync.CopyToCaller(ctx, fs, e.caller, progress); err != nil {
if err := filesync.CopyToCaller(ctx, fs, caller, progress); err != nil {
return err
}
return nil
Expand Down
27 changes: 10 additions & 17 deletions exporter/oci/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,9 @@ func New(opt Opt) (exporter.Exporter, error) {
}

func (e *imageExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
id := session.FromContext(ctx)
if id == "" {
return nil, errors.New("could not access local files without session")
}

timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

caller, err := e.opt.SessionManager.Get(timeoutCtx, id)
if err != nil {
return nil, err
}

var ot *bool
i := &imageExporterInstance{
imageExporter: e,
caller: caller,
layerCompression: blobs.DefaultCompression,
}
for k, v := range opt {
Expand Down Expand Up @@ -110,7 +96,6 @@ func (e *imageExporter) Resolve(ctx context.Context, opt map[string]string) (exp
type imageExporterInstance struct {
*imageExporter
meta map[string][]byte
caller session.Caller
name string
ociTypes bool
layerCompression blobs.CompressionType
Expand All @@ -120,7 +105,7 @@ func (e *imageExporterInstance) Name() string {
return "exporting to oci image format"
}

func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source) (map[string]string, error) {
func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, sessionID string) (map[string]string, error) {
if e.opt.Variant == VariantDocker && len(src.Refs) > 0 {
return nil, errors.Errorf("docker exporter does not currently support exporting manifest lists")
}
Expand Down Expand Up @@ -175,7 +160,15 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source)
return nil, errors.Errorf("invalid variant %q", e.opt.Variant)
}

w, err := filesync.CopyFileWriter(ctx, resp, e.caller)
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID)
if err != nil {
return nil, err
}

w, err := filesync.CopyFileWriter(ctx, resp, caller)
if err != nil {
return nil, err
}
Expand Down
29 changes: 11 additions & 18 deletions exporter/tar/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/progress"
"github.com/pkg/errors"
"github.com/tonistiigi/fsutil"
fstypes "github.com/tonistiigi/fsutil/types"
)
Expand All @@ -34,33 +33,19 @@ func New(opt Opt) (exporter.Exporter, error) {
}

func (e *localExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
id := session.FromContext(ctx)
if id == "" {
return nil, errors.New("could not access local files without session")
}

timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

caller, err := e.opt.SessionManager.Get(timeoutCtx, id)
if err != nil {
return nil, err
}

li := &localExporterInstance{localExporter: e, caller: caller}
li := &localExporterInstance{localExporter: e}
return li, nil
}

type localExporterInstance struct {
*localExporter
caller session.Caller
}

func (e *localExporterInstance) Name() string {
return "exporting to client"
}

func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source) (map[string]string, error) {
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, sessionID string) (map[string]string, error) {
var defers []func()

defer func() {
Expand Down Expand Up @@ -147,7 +132,15 @@ func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source)
fs = d.FS
}

w, err := filesync.CopyFileWriter(ctx, nil, e.caller)
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID)
if err != nil {
return nil, err
}

w, err := filesync.CopyFileWriter(ctx, nil, caller)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
)

type Frontend interface {
Solve(ctx context.Context, llb FrontendLLBBridge, opt map[string]string, inputs map[string]*pb.Definition) (*Result, error)
Solve(ctx context.Context, llb FrontendLLBBridge, opt map[string]string, inputs map[string]*pb.Definition, sid string) (*Result, error)
}

type FrontendLLBBridge interface {
Solve(ctx context.Context, req SolveRequest) (*Result, error)
Solve(ctx context.Context, req SolveRequest, sid string) (*Result, error)
ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt) (digest.Digest, []byte, error)
Exec(ctx context.Context, meta executor.Meta, rootfs cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error
}
Expand Down
Loading