Log streamer cleanups
- Use the WaitGroup to count workers instead of pending log uploads, averting any potential races on the WaitGroup
- Signal to workers that work is complete by closing the channel, instead of sending nil several times
- Handle context cancellation in worker with a select (the general shutdown pattern is sketched below)
- Add context arg to Process, and a select to handle it there, too
- Track whether the streamer has been stopped, and use that to return an error from Process.
- Reinstate error handling inside streamJobLogsAfterProcessStart
- Neaten up core of Process loop
- Doc comments
DrJosh9000 committed Nov 15, 2023
1 parent 50b07ee commit 56b1472
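
The first three bullets describe a standard Go worker-pool shutdown pattern. Here is a minimal, self-contained sketch of that pattern, not code from this commit (the pool type and the "chunk" strings are invented for illustration): workers are counted on a WaitGroup, the producer closes the channel to signal that no more work is coming, and each worker also selects on ctx.Done() so cancellation is not missed while waiting.

package main

import (
    "context"
    "fmt"
    "sync"
)

type pool struct {
    queue    chan string
    workerWG sync.WaitGroup
}

func (p *pool) start(ctx context.Context, n int) {
    p.workerWG.Add(n) // count running workers, not pending items
    for i := 0; i < n; i++ {
        go p.worker(ctx, i)
    }
}

func (p *pool) worker(ctx context.Context, id int) {
    defer p.workerWG.Done()
    for {
        select {
        case item, ok := <-p.queue:
            if !ok {
                return // channel closed: no more work is coming
            }
            fmt.Printf("worker %d got %q\n", id, item)
        case <-ctx.Done():
            return // cancelled while waiting for work
        }
    }
}

func (p *pool) stop() {
    close(p.queue)    // one signal for every worker, instead of n nil sends
    p.workerWG.Wait() // block until all workers have returned
}

func main() {
    p := &pool{queue: make(chan string, 8)}
    p.start(context.Background(), 2)
    p.queue <- "chunk 1"
    p.queue <- "chunk 2"
    p.stop()
}

The commit applies the same shape to LogStreamer: workerWG counts workers, Stop closes ls.queue, and the worker's select handles ctx.Done().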
Showing 2 changed files with 80 additions and 52 deletions.
119 changes: 69 additions & 50 deletions agent/log_streamer.go
@@ -14,6 +14,10 @@ import (

const defaultLogMaxSize = 1024 * 1024 * 1024 // 1 GiB

// Returned from Process after Stop has been called.
var errStreamerStopped = errors.New("streamer stopped")

// LogStreamerConfig contains configuration options for the log streamer.
type LogStreamerConfig struct {
// How many log streamer workers are running at any one time
Concurrency int
@@ -25,6 +29,9 @@ type LogStreamerConfig struct {
MaxSizeBytes uint64
}

// LogStreamer divides job log output into chunks (Process), and log streamer
// workers (goroutines created by Start) receive and upload those chunks.
// The actual uploading is performed by the callback.
type LogStreamer struct {
// The configuration
conf LogStreamerConfig
@@ -47,16 +54,17 @@ type LogStreamer struct {
// Each chunk is assigned an order
order uint64

// Every time we add a job to the queue, we increase the wait group
// queue so when the streamer shuts down, we can block until all work
// has been added.
chunkWaitGroup sync.WaitGroup
// Counts workers that are still running
workerWG sync.WaitGroup

// Only allow processing one at a time
processMutex sync.Mutex

// Have we logged a warning about the size?
warnedAboutSize bool

// Have we stopped?
stopped bool
}

type LogStreamerChunk struct {
@@ -73,21 +81,21 @@ type LogStreamerChunk struct {
Size uint64
}

// Creates a new instance of the log streamer
// NewLogStreamer creates a new instance of the log streamer.
func NewLogStreamer(
l logger.Logger,
cb func(context.Context, *LogStreamerChunk) error,
c LogStreamerConfig,
agentLogger logger.Logger,
callback func(context.Context, *LogStreamerChunk) error,
conf LogStreamerConfig,
) *LogStreamer {
return &LogStreamer{
logger: l,
conf: c,
callback: cb,
logger: agentLogger,
conf: conf,
callback: callback,
queue: make(chan *LogStreamerChunk, 1024),
}
}

// Spins up x number of log streamer workers
// Start spins up a number of log streamer workers.
func (ls *LogStreamer) Start(ctx context.Context) error {
if ls.conf.MaxChunkSizeBytes <= 0 {
return errors.New("Maximum chunk size must be more than 0. No logs will be sent.")
@@ -97,6 +105,7 @@ func (ls *LogStreamer) Start(ctx context.Context) error {
ls.conf.MaxSizeBytes = defaultLogMaxSize
}

ls.workerWG.Add(ls.conf.Concurrency)
for i := 0; i < ls.conf.Concurrency; i++ {
go ls.worker(ctx, i)
}
@@ -108,12 +117,18 @@ func (ls *LogStreamer) FailedChunks() int {
return int(atomic.LoadInt32(&ls.chunksFailedCount))
}

// Process streams the output.
func (ls *LogStreamer) Process(output []byte) {
// Process streams the output. It returns an error if the output data cannot be
// processed at all (e.g. the streamer was stopped or a hard limit was reached).
// Transient failures to upload logs are instead handled in the callback.
func (ls *LogStreamer) Process(ctx context.Context, output []byte) error {
// Only allow one streamer process at a time
ls.processMutex.Lock()
defer ls.processMutex.Unlock()

if ls.stopped {
return errStreamerStopped
}

for len(output) > 0 {
// Have we exceeded the max size?
// (This check is also performed on the server side.)
@@ -125,72 +140,81 @@ func (ls *LogStreamer) worker(ctx context.Context, id int) {
humanize.Bytes(ls.bytes), humanize.Bytes(ls.conf.MaxSizeBytes))
ls.warnedAboutSize = true
// In a future version, this will error out, e.g.:
//return fmt.Errorf("job log has exceeded max job log size (%d > %d)", ls.bytes, ls.conf.MaxSizeBytes)
//return fmt.Errorf("%w (%d > %d)", errLogExceededMaxSize, ls.bytes, ls.conf.MaxSizeBytes)
}

// Add another chunk...
ls.chunkWaitGroup.Add(1)

// Find the upper limit of the blob
// The next chunk will be up to MaxChunkSizeBytes in size.
size := ls.conf.MaxChunkSizeBytes
if lenout := uint64(len(output)); size > lenout {
size = lenout
}

// Grab the ≤100kb section of the blob.
// Leave the remainder for the next iteration.
chunk := output[:size]
output = output[size:]

// Take the chunk from the start of output, leave the remainder for the
// next iteration.
ls.order++

// Create the chunk and append it to our list
ls.queue <- &LogStreamerChunk{
Data: chunk,
chunk := &LogStreamerChunk{
Data: output[:size],
Order: ls.order,
Offset: ls.bytes,
Size: size,
}
output = output[size:]

// Save the new amount of bytes
// Stream the chunk onto the queue!
select {
case ls.queue <- chunk:
// Streamed!
case <-ctx.Done(): // pack it up
return ctx.Err()
}
ls.bytes += size
}

return nil
}

// Waits for all the chunks to be uploaded, then shuts down all the workers
// Stop stops the streamer.
func (ls *LogStreamer) Stop() {
ls.logger.Debug("[LogStreamer] Waiting for all the chunks to be uploaded")

ls.chunkWaitGroup.Wait()

ls.logger.Debug("[LogStreamer] Shutting down all workers")

for n := 0; n < ls.conf.Concurrency; n++ {
ls.queue <- nil
ls.processMutex.Lock()
if ls.stopped {
ls.processMutex.Unlock()
return
}
ls.stopped = true
close(ls.queue)
ls.processMutex.Unlock()

ls.logger.Debug("[LogStreamer] Waiting for workers to shut down")
ls.workerWG.Wait()
}

// The actual log streamer worker
func (ls *LogStreamer) worker(ctx context.Context, id int) {
ls.logger.Debug("[LogStreamer/Worker#%d] Worker is starting...", id)

defer ls.logger.Debug("[LogStreamer/Worker#%d] Worker has shutdown", id)
defer ls.workerWG.Done()

ctx, setStat, done := status.AddSimpleItem(ctx, fmt.Sprintf("Log Streamer Worker %d", id))
defer done()
setStat("🏃 Starting...")

for {
setStat("⌚️ Waiting for chunk")
setStat("⌚️ Waiting for a chunk")

// Get the next chunk (pointer) from the queue. This will block
// until something is returned.
chunk := <-ls.queue

// If the next chunk is nil, then there is no more work to do
if chunk == nil {
break
var chunk *LogStreamerChunk
select {
case chunk = <-ls.queue:
if chunk == nil { // channel was closed
return
}
case <-ctx.Done(): // pack it up
return
}

setStat("📨 Passing chunk to callback")
setStat("📨 Uploading chunk")

// Upload the chunk
err := ls.callback(ctx, chunk)
@@ -199,10 +223,5 @@ func (ls *LogStreamer) worker(ctx context.Context, id int) {

ls.logger.Error("Giving up on uploading chunk %d, this will result in only a partial build log on Buildkite", chunk.Order)
}

// Signal to the chunkWaitGroup that this one is done
ls.chunkWaitGroup.Done()
}

ls.logger.Debug("[LogStreamer/Worker#%d] Worker has shutdown", id)
}
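
For reference, a caller-side usage sketch of the reworked API. This is hypothetical code, not from the repository; it uses only identifiers that appear in the diff above, and assumes it lives in the same package with a logger.Logger value l and the output bytes supplied by the caller.

func streamLogs(ctx context.Context, l logger.Logger, output []byte) error {
    callback := func(ctx context.Context, chunk *LogStreamerChunk) error {
        // Upload chunk.Data; chunk.Order, chunk.Offset and chunk.Size
        // describe where it sits in the overall log.
        return nil
    }

    ls := NewLogStreamer(l, callback, LogStreamerConfig{
        Concurrency:       3,
        MaxChunkSizeBytes: 100 * 1024, // up to 100 KiB per chunk
    })

    if err := ls.Start(ctx); err != nil {
        return err
    }

    // Process now takes a context and can fail outright, e.g. after Stop.
    if err := ls.Process(ctx, output); err != nil {
        return err
    }

    // Stop marks the streamer stopped, closes the queue, and waits for the
    // workers to finish.
    ls.Stop()
    return nil
}

Note that after this change Stop can be called more than once safely, and any Process call made after Stop returns errStreamerStopped.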
13 changes: 11 additions & 2 deletions agent/run_job.go
@@ -252,7 +252,7 @@ func (r *JobRunner) cleanup(ctx context.Context, wg *sync.WaitGroup, exit proces
// Flush the job logs. If the process is never started, then logs from prior to the attempt to
// start the process will still be buffered. Also, there may still be logs in the buffer that
// were left behind because the uploader goroutine exited before it could flush them.
r.logStreamer.Process(r.output.ReadAndTruncate())
r.logStreamer.Process(ctx, r.output.ReadAndTruncate())

// Stop the log streamer. This will block until all the chunks have been uploaded
r.logStreamer.Stop()
@@ -360,7 +360,16 @@ func (r *JobRunner) streamJobLogsAfterProcessStart(ctx context.Context, wg *sync
setStat("📨 Sending process output to log streamer")

// Send the output of the process to the log streamer for processing
r.logStreamer.Process(r.output.ReadAndTruncate())
if err := r.logStreamer.Process(ctx, r.output.ReadAndTruncate()); err != nil {
r.agentLogger.Error("Could not stream the log output: %v", err)
// LogStreamer.Process only returns an error when it can no longer
// accept logs (maybe Stop was called, or a hard limit was reached).
// Since we can no longer send logs, Close the buffer, which causes
// future Writes to return io.ErrClosedPipe, typically SIGPIPE-ing
// the running process (if it is still running).
r.output.Close()
return
}

setStat("😴 Sleeping for a bit")

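The new error path above relies on a close-then-fail-writes behaviour: once the output buffer is closed, later writes return io.ErrClosedPipe. The agent's r.output type is not shown in this diff, so as a standalone illustration here is the analogous behaviour of io.Pipe from the standard library, where closing the read side makes subsequent Writes fail with that same error.

package main

import (
    "errors"
    "fmt"
    "io"
)

func main() {
    r, w := io.Pipe()

    // Drain the pipe until the reader is closed.
    go func() { _, _ = io.Copy(io.Discard, r) }()

    _, _ = w.Write([]byte("still streaming\n"))

    // Closing the read side makes every subsequent Write fail.
    _ = r.Close()

    if _, err := w.Write([]byte("too late\n")); errors.Is(err, io.ErrClosedPipe) {
        fmt.Println("write after close:", err)
    }
}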
