Add log group headers and timestamps to job verification success and failure logs #2461
Conversation
agent/run_job.go (outdated)
@@ -148,6 +157,50 @@ func (r *JobRunner) Run(ctx context.Context) error {
	return nil
}

func (r *JobRunner) prependTimestampForLogs(s string, args ...any) []byte {
Right now this func is both prepending stuff and also sprintf-ing its args... is there a reason that has to happen all in one go? Or can we sprintf prior to passing something to this func?
It just seems easier to call one method rather than two. If you want to sprintf prior to this method, this interface doesn't prevent that; if you don't want to, the suggested interface supports that too.

But I should rename the method: its name only suggests the prepending, not the sprintf-ing.
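For illustration, a minimal sketch of the suggested split, with formatting done by the caller and the helper only prepending. The helper name and timestamp format here are hypothetical, not the PR's actual code:

```go
package main

import (
	"fmt"
	"time"
)

// prependTimestamp only prepends; callers format their message beforehand.
// (Hypothetical helper illustrating the suggested interface.)
func prependTimestamp(s string) []byte {
	return []byte(fmt.Sprintf("[%s] %s", time.Now().Format(time.RFC3339), s))
}

func main() {
	// Sprintf first, then prepend, instead of doing both in one call.
	msg := fmt.Sprintf("verification failed: %v", "signature mismatch")
	fmt.Printf("%s\n", prependTimestamp(msg))
}
```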
s.Logger.Commentf("Job API server listening on %s", s.SocketPath)
s.Logger.Printf("~~~ Job API")
s.Logger.Printf("Server listening on %s", s.SocketPath)
👩🍳💋
agent/run_job.go (outdated)
	time.Now().UnixNano()/int64(time.Millisecond),
	fmt.Sprintf(s, args...),
))
case r.conf.AgentConfiguration.TimestampLines:
Can we have a single stream for processing job output + stuff the job runner wants to pretend is job output? Aside from duplicating the timestamping logic, pushing logs directly into `r.logStreamer` could make header times incorrect with non-ANSI timestamps (skipping the header times streamer), and might also skip the local log output file?
I don't think we can have a single stream. The verification logs happen in the `agent start` process, while most of the other logs are streamed from the `agent bootstrap` process. So I think the best we can do is have two streams that are created from a common method.

We could add something to the local API so that the bootstrap process can stream logs over it to the remote agent API, but creating that for this task seems like a massive yak shave.

> pushing logs directly into r.logStreamer could make header times incorrect with non-ANSI timestamps

I don't really want to carry around the legacy of non-ANSI timestamps. Are you happy for me to remove support for these in this PR?

> might also skip the local log output file?

Yeah, they probably will. I'll look into something for this, and a way to avoid duplicating the timestamp logic, but if the answer to the question above is in the affirmative, then only about one line will be duplicated.
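For reference, a sketch of the two timestamp styles under discussion. The escape sequence follows the `\x1b_bk;t=<ms>\x07` convention suggested by the millisecond arithmetic in the diff above, but treat both formats here as assumptions rather than the agent's authoritative implementation:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	line := "verifying job signature"

	// ANSI-style timestamp: an escape sequence carrying unix milliseconds,
	// parsed by the Buildkite UI and hidden from the rendered log. (Assumed format.)
	ansi := fmt.Sprintf("\x1b_bk;t=%d\x07%s", time.Now().UnixNano()/int64(time.Millisecond), line)

	// Non-ANSI (legacy) timestamp: a visible prefix baked into the log text. (Assumed format.)
	legacy := fmt.Sprintf("[%s] %s", time.Now().Format(time.RFC3339), line)

	fmt.Printf("%q\n%q\n", ansi, legacy)
}
```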
> I don't think we can have a single stream. The verification logs happen in the agent start process, while most of the other logs are streamed from the agent bootstrap process. So I think the best we can do is have two streams that are created from a common method.

Looks like I was wrong about this. I'll do the work to make a single stream.
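A minimal sketch of the single-stream idea: one `io.Writer` front door that applies the per-line transforms once and forwards to the streamer, used both for real job output and for logs the runner wants to appear in the job log. All names here are illustrative, not the PR's actual types:

```go
package main

import (
	"fmt"
	"io"
	"os"
	"time"
)

// timestampingWriter forwards everything it receives to out, prefixing each
// write with a millisecond timestamp, so the timestamping logic lives in
// exactly one place regardless of who produced the bytes.
type timestampingWriter struct {
	out io.Writer
}

func (w timestampingWriter) Write(p []byte) (int, error) {
	if _, err := fmt.Fprintf(w.out, "[%d] %s", time.Now().UnixMilli(), p); err != nil {
		return 0, err
	}
	return len(p), nil
}

func main() {
	// In the agent, out would be the log streamer (and the local log file);
	// os.Stdout stands in for it here.
	var jobLogs io.Writer = timestampingWriter{out: os.Stdout}

	fmt.Fprintln(jobLogs, "~~~ Verifying job signature") // runner-generated
	fmt.Fprintln(jobLogs, "hello from the job")          // job output
}
```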
Force-pushed from a8f8148 to 86f16ad
// The logger to use
logger logger.Logger
// agentLogger is a logger that outputs to the agent logs
agentLogger logger.Logger
To reduce confusion between agent logs and job logs, I renamed the `logger` field to make it clear that it is not the job logs.
@@ -131,9 +131,12 @@ type JobRunner struct {
	// The internal header time streamer
	headerTimesStreamer *headerTimesStreamer

	// The internal log streamer
	// The internal log streamer. Don't write to this directly, use `jobLogs` instead
I toyed with ways to enforce this, but ultimately they proved too invasive.
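One shape such enforcement could take, shown purely as a hypothetical (the PR does not do this): hand callers only the `io.Writer`, keeping the streamer behind an unexported sink so there's no exported path to `Process`.

```go
package main

import (
	"fmt"
	"io"
)

// logStreamer stands in for the agent's LogStreamer.
type logStreamer struct{}

func (ls *logStreamer) Process(output []byte) { fmt.Printf("chunk: %q\n", output) }

// jobLogSink is a hypothetical wrapper: code holding only the io.Writer
// cannot reach Process and bypass the intended write path.
type jobLogSink struct {
	streamer *logStreamer
}

func (s *jobLogSink) Write(p []byte) (int, error) {
	s.streamer.Process(p)
	return len(p), nil
}

func main() {
	var jobLogs io.Writer = &jobLogSink{streamer: &logStreamer{}}
	io.WriteString(jobLogs, "hello job log\n")
}
```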
@@ -102,7 +106,7 @@ func (ls *LogStreamer) FailedChunks() int {
}

// Process streams the output.
func (ls *LogStreamer) Process(output []byte) error {
func (ls *LogStreamer) Process(output []byte) {
The method always returns `nil`, and is not part of an interface.
}

// Waits for all the chunks to be uploaded, then shuts down all the workers
func (ls *LogStreamer) Stop() error {
func (ls *LogStreamer) Stop() {
Same
@@ -204,6 +242,10 @@ func (r *JobRunner) runJob(ctx context.Context) processExit {
func (r *JobRunner) cleanup(ctx context.Context, wg *sync.WaitGroup, exit processExit) {
	finishedAt := time.Now()

	// Flush the job logs. These should have been flushed already if the process started, but if it
	// never started, then logs from prior to the attempt to start the process will still be buffered.
	r.logStreamer.Process(r.output.ReadAndTruncate())
Realising that we need to do this took too long.
// every few seconds and sends it back to Buildkite.
func (r *JobRunner) jobLogStreamer(ctx context.Context, wg *sync.WaitGroup) {
func (r *JobRunner) streamJobLogsAfterProcessStart(ctx context.Context, wg *sync.WaitGroup) {
Renamed this method to make it clearer that it waits for the process to start before shipping logs.
	r.output.Close()
}

// Send the output of the process to the log streamer for processing
r.logStreamer.Process(r.output.ReadAndTruncate())
Because `r.logStreamer.Process` can never error, this is greatly simplified.
We can easily resurrect the error return once we start enforcing job log limits.
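A sketch of what resurrecting the error return could look like once log limits exist. The limit field, byte accounting, and error value are all assumptions for illustration, not the agent's actual design:

```go
package main

import (
	"errors"
	"fmt"
)

var ErrLogLimitExceeded = errors.New("job log limit exceeded")

// LogStreamer is a stub with an assumed byte limit, sketching how Process
// could regain its error return when job log limits are enforced.
type LogStreamer struct {
	limit     int
	bytesSeen int
}

func (ls *LogStreamer) Process(output []byte) error {
	if ls.bytesSeen+len(output) > ls.limit {
		return ErrLogLimitExceeded
	}
	ls.bytesSeen += len(output)
	// ... chunk the output and enqueue it for upload ...
	return nil
}

func main() {
	ls := &LogStreamer{limit: 10}
	fmt.Println(ls.Process([]byte("hello")))      // <nil>
	fmt.Println(ls.Process([]byte("world!!!!!"))) // job log limit exceeded
}
```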
Force-pushed from c1a85e9 to 4eaf8ad
awesome work!
// jobLogs is an io.Writer that sends data to the job logs
jobLogs io.Writer
🥳
☘️
Co-authored-by: Ben Moskovitz <[email protected]>
Force-pushed from 94cf78d to 4619a37
I also had to add a header to the Job API logs, as otherwise they would appear to be part of the verification log group. There are probably other logs like this; we'll need to weed them out eventually.

I also simplified some of the logging. In particular, the signature is no longer printed as part of the log group header, but on a subsequent line. Since the log group will be collapsed unless there is a failure and the agent is configured to block, the signature will not be immediately visible.
Also, for the agent logs, I tried to add fields using the `logger.WithFields` method. These will be displayed in logfmt format (or JSON, if configured to do so), making the logs more structured.
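The agent's own `logger` package renders these fields; as a runnable stand-in, here is the same idea with the standard library's `log/slog` (the field names are made up for illustration):

```go
package main

import "log/slog"

func main() {
	// Attach structured fields once; every subsequent line carries them as
	// key=value pairs (logfmt-style) or JSON, depending on the handler.
	l := slog.Default().With(
		slog.String("job", "0189-example"),
		slog.String("verification", "failed"),
	)
	l.Warn("job verification failed, blocking job")
}
```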
Finally, I've removed the use of `(*JobRunner).logStreamer.Process` to send logs directly to the API, as it bypasses various transformations that happen beforehand (like timestamping). Users will still be able to call `logStreamer.Process`, but should use `(*JobRunner).jobLogs`, which exposes an `io.Writer` interface, instead.

Screenshots
Verification Failure Warning
Before
After
Verification Failure Block
Before
After
Verification Success
Before
After