Skip to content

Commit

Permalink
1 goroutine per glob pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
DrJosh9000 committed Oct 30, 2023
1 parent f858cd9 commit d191162
Showing 1 changed file with 101 additions and 128 deletions.
229 changes: 101 additions & 128 deletions agent/artifact_uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ func (a *ArtifactUploader) Collect(ctx context.Context) ([]*api.Artifact, error)
return nil, fmt.Errorf("getting working directory: %w", err)
}

workCh := make(chan string)
var wg sync.WaitGroup

ac := &artifactCollector{
Expand All @@ -125,14 +124,18 @@ func (a *ArtifactUploader) Collect(ctx context.Context) ([]*api.Artifact, error)
seenPaths: make(map[string]bool),
}

// Start N workers in a new context
wctx, cancel := context.WithCancelCause(ctx)
for i := 0; i < runtime.NumCPU(); i++ {
for _, globPath := range strings.Split(a.conf.Paths, ArtifactPathDelimiter) {
globPath := strings.TrimSpace(globPath)
if globPath == "" {
continue
}

wg.Add(1)
go func() {
defer wg.Done()

if err := ac.worker(wctx, workCh); err != nil {
if err := ac.worker(wctx, globPath); err != nil {
cancel(err)
}
}()
Expand All @@ -141,21 +144,6 @@ func (a *ArtifactUploader) Collect(ctx context.Context) ([]*api.Artifact, error)
return nil, err
}

// Push work into the workers
for _, globPath := range strings.Split(a.conf.Paths, ArtifactPathDelimiter) {
globPath = strings.TrimSpace(globPath)
if globPath == "" {
continue
}
select {
case <-ctx.Done():
return nil, ctx.Err()

case workCh <- globPath:
}
}
close(workCh)

// Wait for workers to complete
wg.Wait()

Expand All @@ -172,136 +160,121 @@ type artifactCollector struct {
artifacts []*api.Artifact
}

func (c *artifactCollector) worker(ctx context.Context, workCh <-chan string) error {
for {
// Either read work from workCh until it is closed, or bail if the
// context is finished.
var globPath string
select {
case <-ctx.Done():
return ctx.Err()
func (c *artifactCollector) worker(ctx context.Context, globPath string) error {
c.logger.Debug("Searching for %s", globPath)

case gp, open := <-workCh:
if !open {
return nil
}
globPath = gp
// Resolve the globs (with * and ** in them)
var files []string
if experiments.IsEnabled(ctx, experiments.UseZZGlob) {
// New zzglob library.
pattern, err := zzglob.Parse(globPath)
if err != nil {
return fmt.Errorf("invalid glob pattern: %w", err)
}

c.logger.Debug("Searching for %s", globPath)

// Resolve the globs (with * and ** in them)
var files []string
if experiments.IsEnabled(ctx, experiments.UseZZGlob) {
// New zzglob library.
pattern, err := zzglob.Parse(globPath)
walkDirFunc := func(path string, d fs.DirEntry, err error) error {
if err != nil {
return fmt.Errorf("invalid glob pattern: %w", err)
}

walkDirFunc := func(path string, d fs.DirEntry, err error) error {
if err != nil {
c.logger.Warn("Couldn't walk path %s", path)
return nil
}
if d != nil && d.IsDir() {
c.logger.Warn("Glob pattern %s matched a directory %s", globPath, path)
return nil
}
files = append(files, path)
c.logger.Warn("Couldn't walk path %s", path)
return nil
}
err = pattern.Glob(walkDirFunc, zzglob.TraverseSymlinks(c.conf.GlobResolveFollowSymlinks))
if err != nil {
return fmt.Errorf("globbing pattern: %w", err)
}
} else {
// Old go-zglob library.
globfunc := zglob.Glob
if c.conf.GlobResolveFollowSymlinks {
// Follow symbolic links for files & directories while expanding globs
globfunc = zglob.GlobFollowSymlinks
}
fs, err := globfunc(globPath)
if errors.Is(err, os.ErrNotExist) {
c.logger.Info("File not found: %s", globPath)
continue
}
if err != nil {
return fmt.Errorf("resolving glob: %w", err)
if d != nil && d.IsDir() {
c.logger.Warn("Glob pattern %s matched a directory %s", globPath, path)
return nil
}
files = fs
files = append(files, path)
return nil
}
err = pattern.Glob(walkDirFunc, zzglob.TraverseSymlinks(c.conf.GlobResolveFollowSymlinks))
if err != nil {
return fmt.Errorf("globbing pattern: %w", err)
}
} else {
// Old go-zglob library.
globfunc := zglob.Glob
if c.conf.GlobResolveFollowSymlinks {
// Follow symbolic links for files & directories while expanding globs
globfunc = zglob.GlobFollowSymlinks
}
fs, err := globfunc(globPath)
if errors.Is(err, os.ErrNotExist) {
c.logger.Info("File not found: %s", globPath)
return nil
}
if err != nil {
return fmt.Errorf("resolving glob: %w", err)
}
files = fs
}

// Process each glob match into an api.Artifact
for _, file := range files {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

absolutePath, err := filepath.Abs(file)
if err != nil {
return fmt.Errorf("resolving absolute path for file %s: %w", file, err)
}
// Process each glob match into an api.Artifact
for _, file := range files {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

// dedupe based on resolved absolutePath
c.mu.Lock()
seen := c.seenPaths[absolutePath]
c.seenPaths[absolutePath] = true
c.mu.Unlock()
absolutePath, err := filepath.Abs(file)
if err != nil {
return fmt.Errorf("resolving absolute path for file %s: %w", file, err)
}

if seen {
c.logger.Debug("Skipping duplicate path %s", file)
continue
}
// dedupe based on resolved absolutePath
c.mu.Lock()
seen := c.seenPaths[absolutePath]
c.seenPaths[absolutePath] = true
c.mu.Unlock()

// Ignore directories, we only want files
if isDir(absolutePath) {
c.logger.Debug("Skipping directory %s", file)
continue
}
if seen {
c.logger.Debug("Skipping duplicate path %s", file)
continue
}

if c.conf.UploadSkipSymlinks && isSymlink(absolutePath) {
c.logger.Debug("Skipping symlink %s", file)
continue
}
// Ignore directories, we only want files
if isDir(absolutePath) {
c.logger.Debug("Skipping directory %s", file)
continue
}

// If a glob is absolute, we need to make it relative to the root so that
// it can be combined with the download destination to make a valid path.
// This is possibly weird and crazy, this logic dates back to
// https://github.com/buildkite/agent/commit/8ae46d975aa60d1ae0e2cc0bff7a43d3bf960935
// from 2014, so I'm replicating it here to avoid breaking things
basepath := c.wd
if filepath.IsAbs(globPath) {
basepath = "/"
if runtime.GOOS == "windows" {
basepath = filepath.VolumeName(absolutePath) + "/"
}
}
if c.conf.UploadSkipSymlinks && isSymlink(absolutePath) {
c.logger.Debug("Skipping symlink %s", file)
continue
}

path, err := filepath.Rel(basepath, absolutePath)
if err != nil {
return fmt.Errorf("resolving relative path for file %s: %w", file, err)
// If a glob is absolute, we need to make it relative to the root so that
// it can be combined with the download destination to make a valid path.
// This is possibly weird and crazy, this logic dates back to
// https://github.com/buildkite/agent/commit/8ae46d975aa60d1ae0e2cc0bff7a43d3bf960935
// from 2014, so I'm replicating it here to avoid breaking things
basepath := c.wd
if filepath.IsAbs(globPath) {
basepath = "/"
if runtime.GOOS == "windows" {
basepath = filepath.VolumeName(absolutePath) + "/"
}
}

if experiments.IsEnabled(ctx, experiments.NormalisedUploadPaths) {
// Convert any Windows paths to Unix/URI form
path = filepath.ToSlash(path)
}
path, err := filepath.Rel(basepath, absolutePath)
if err != nil {
return fmt.Errorf("resolving relative path for file %s: %w", file, err)
}

// Build an artifact object using the paths we have.
artifact, err := c.build(path, absolutePath, globPath)
if err != nil {
return fmt.Errorf("building artifact: %w", err)
}
if experiments.IsEnabled(ctx, experiments.NormalisedUploadPaths) {
// Convert any Windows paths to Unix/URI form
path = filepath.ToSlash(path)
}

c.mu.Lock()
c.artifacts = append(c.artifacts, artifact)
c.mu.Unlock()
// Build an artifact object using the paths we have.
artifact, err := c.build(path, absolutePath, globPath)
if err != nil {
return fmt.Errorf("building artifact: %w", err)
}

c.mu.Lock()
c.artifacts = append(c.artifacts, artifact)
c.mu.Unlock()
}
return nil
}

func (a *ArtifactUploader) build(path string, absolutePath string, globPath string) (*api.Artifact, error) {
Expand Down

0 comments on commit d191162

Please sign in to comment.