Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce header regexps #2135

Merged
merged 2 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 38 additions & 45 deletions agent/header_times_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ import (
"github.com/buildkite/agent/v3/status"
)

// If you change header parsing here make sure to change it in the
// buildkite.com frontend logic, too

var (
headerRE = regexp.MustCompile(`^(---|\+\+\+|~~~)\s`)
headerExpansionRE = regexp.MustCompile(`^\^\^\^\s\+\+\+`)
ansiColourRE = regexp.MustCompile(`\x1b\[([;\d]+)?[mK]`)
)

type headerTimesStreamer struct {
// The logger instance to use
logger logger.Logger
Expand Down Expand Up @@ -88,27 +97,33 @@ func (h *headerTimesStreamer) Run(ctx context.Context) {
}

// Scan takes a line of log output and tracks a time if it's a header.
// Returns true for header lines
// Returns true for header lines or header expansion lines.
func (h *headerTimesStreamer) Scan(line string) bool {
// Keep track of how many line scans we need to do
h.scanWaitGroup.Add(1)
defer h.scanWaitGroup.Done()

if isHeader(line) {
h.logger.Debug("[HeaderTimesStreamer] Found header %q", line)

// Acquire a lock on the times and then add the current time to
// our times slice.
h.timesMutex.Lock()
h.times = append(h.times, time.Now().UTC().Format(time.RFC3339Nano))
h.timesMutex.Unlock()
// Make sure all ANSI colours are removed from the string before we
// check to see if it's a header (sometimes a colour escape sequence may
// be the first thing on the line, which will cause the regex to ignore it)
line = ansiColourRE.ReplaceAllString(line, "")

// Add the time to the wait group
h.uploadWaitGroup.Add(1)
return true
if !headerRE.MatchString(line) {
// It's not a header, but could be a header expansion.
return headerExpansionRE.MatchString(line)
}

return false
h.logger.Debug("[HeaderTimesStreamer] Found header %q", line)

// Acquire a lock on the times and then add the current time to
// our times slice.
h.timesMutex.Lock()
h.times = append(h.times, time.Now().UTC().Format(time.RFC3339Nano))
h.timesMutex.Unlock()

// Add the time to the wait group
h.uploadWaitGroup.Add(1)
return true
}

func (h *headerTimesStreamer) Upload(ctx context.Context) {
Expand All @@ -135,15 +150,17 @@ func (h *headerTimesStreamer) Upload(ctx context.Context) {
timesToUpload := len(times)

// Do we even have some times to upload
if timesToUpload > 0 {
// Call our callback with the times for upload
h.logger.Debug("[HeaderTimesStreamer] Uploading header times %d..%d", c, length-1)
h.uploadCallback(ctx, c, length, payload)
h.logger.Debug("[HeaderTimesStreamer] Finished uploading header times %d..%d", c, length-1)

// Decrement the wait group for every time we've uploaded.
h.uploadWaitGroup.Add(timesToUpload * -1)
if timesToUpload == 0 {
return
}

// Call our callback with the times for upload
h.logger.Debug("[HeaderTimesStreamer] Uploading header times %d..%d", c, length-1)
h.uploadCallback(ctx, c, length, payload)
h.logger.Debug("[HeaderTimesStreamer] Finished uploading header times %d..%d", c, length-1)

// Decrement the wait group for every time we've uploaded.
h.uploadWaitGroup.Add(timesToUpload * -1)
}

func (h *headerTimesStreamer) Stop() {
Expand All @@ -159,27 +176,3 @@ func (h *headerTimesStreamer) Stop() {
h.streaming = false
h.streamingMutex.Unlock()
}

// If you change header parsing here make sure to change it in the
// buildkite.com frontend logic, too

var (
headerRegex = regexp.MustCompile(`^(?:---|\+\+\+|~~~)\s(.+)?$`)
headerExpansionRegex = regexp.MustCompile(`^(?:\^\^\^\s+\+\+\+)\s*$`)
ansiColorRegex = regexp.MustCompile(`\x1b\[([;\d]+)?[mK]`)
)

func isHeader(line string) bool {
// Make sure all ANSI colors are removed from the string before we
// check to see if it's a header (sometimes a color escape sequence may
// be the first thing on the line, which will cause the regex to ignore it)
line = ansiColorRegex.ReplaceAllString(line, "")

// To avoid running the regex over every single line, we'll first do a
// length check. Hopefully there are no heeaders over 500 characters!
return len(line) < 500 && headerRegex.MatchString(line)
}

func isHeaderExpansion(line string) bool {
return len(line) < 50 && headerExpansionRegex.MatchString(line)
}
5 changes: 3 additions & 2 deletions agent/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,11 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con
// Use a scanner to process output line by line
err := process.NewScanner(r.logger).ScanLines(pr, func(line string) {
// Send to our header streamer and determine if it's a header
isHeader := r.headerTimesStreamer.Scan(line)
// or header expansion.
isHeaderOrExpansion := r.headerTimesStreamer.Scan(line)

// Prefix non-header log lines with timestamps
if !(isHeaderExpansion(line) || isHeader) {
if !isHeaderOrExpansion {
line = fmt.Sprintf("[%s] %s", time.Now().UTC().Format(time.RFC3339), line)
}

Expand Down