Skip to content

Commit

Permalink
Fix a few issues in BatchLogging.groovy (#3443)
Browse files Browse the repository at this point in the history
1. Adds the projectId to BatchLogging's LoggingOptions so the logs
will be fetched from the correct project. Previously I got a lot of
"that resource might not exist" responses from the log read requests.
I'm not sure if there's something special about my environment that
causes me to need the projectId in the options, but I think it
couldn't hurt to have the projectId in general.

2. Removes the old parseOutput function, which relied on the STDERR and
STDOUT prefixes in the payload of the log entries. Batch no longer adds
prefixes so that parsing doesn't work. Now we can distinguish stderr
from stdout by looking at the logEntry's severity.

3. Catches any exception thrown by the log reading code. It could be a
permissions issue like I was seeing, or a thread pool issue as in
#3166. Either way,
something going wrong while trying to read task logs should probably
not stop the whole workflow.

With these changes, the nf-core/methylseq workflow can be run with
the google-batch executor.

Signed-off-by: Aaron Golden <[email protected]>

Signed-off-by: Aaron Golden <[email protected]>
Co-authored-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
aaronegolden and pditommaso authored Dec 3, 2022
1 parent 927c410 commit e2bbcf1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,18 @@ package nextflow.cloud.google.batch.logging
import com.google.cloud.logging.LogEntry
import com.google.cloud.logging.Logging
import com.google.cloud.logging.LoggingOptions
import com.google.cloud.logging.Severity
import groovy.transform.Memoized
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.cloud.google.batch.client.BatchConfig
/**
* Batch logging client
*
* @author Paolo Di Tommaso <[email protected]>
*/
@Slf4j
class BatchLogging {

private static final String STDOUT = 'STDOUT: '
private static final String STDERR = 'STDERR: '

private String mode = STDOUT
private LoggingOptions opts
private String projectId

Expand All @@ -45,8 +43,8 @@ class BatchLogging {

BatchLogging(BatchConfig config) {
final creds = config.googleOpts.credentials
this.opts = LoggingOptions .newBuilder() .setCredentials(creds) .build()
this.projectId = config.googleOpts.projectId
this.opts = LoggingOptions .newBuilder() .setCredentials(creds) .setProjectId(this.projectId) .build()
}

String stdout(String jobId) {
Expand All @@ -57,45 +55,33 @@ class BatchLogging {
return fetchLogs(jobId)[1]
}

@PackageScope String currentMode() { mode }

@Memoized(maxCacheSize = 1000)
@PackageScope List<String> fetchLogs(String uid) {
final stdout = new StringBuilder()
final stderr = new StringBuilder()
try(Logging logging = opts.getService()) {
// use logging here
final filter = "resource.type=generic_task AND logName=projects/${projectId}/logs/batch_task_logs AND labels.job_uid=$uid"
final entries = logging.listLogEntries(
Logging.EntryListOption.filter(filter),
Logging.EntryListOption.pageSize(1000) )

final stdout = new StringBuilder()
final stderr = new StringBuilder()
final page = entries.getValues()
for (LogEntry logEntry : page.iterator()) {
final output = logEntry.payload.data.toString()
parseOutput(output, stdout, stderr)
parseOutput(logEntry, stdout, stderr)
}

return [ stdout.toString(), stderr.toString() ]
} catch (Exception e) {
log.debug "[GOOGLE BATCH] Cannot read logs for job: `$uid` | ${e.message}"
}
return [ stdout.toString(), stderr.toString() ]
}

protected void parseOutput(String output, StringBuilder stdout, StringBuilder stderr) {
// check stderr
def p = output.indexOf(STDERR)
if( p>=0 ) {
mode = STDERR
output = output.substring(p+STDERR.size())
}
else if( (p = output.indexOf(STDOUT))>=0 ) {
mode = STDOUT
output = output.substring(p+STDOUT.size())
}
// now append the result
if( mode==STDOUT )
stdout.append(output)
else
protected void parseOutput(LogEntry logEntry, StringBuilder stdout, StringBuilder stderr) {
final output = logEntry.payload.data.toString()
if (logEntry.severity == Severity.ERROR) {
stderr.append(output)
} else {
stdout.append(output)
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import com.google.cloud.batch.v1.LogsPolicy
import com.google.cloud.batch.v1.Runnable
import com.google.cloud.batch.v1.TaskGroup
import com.google.cloud.batch.v1.TaskSpec
import com.google.cloud.logging.LogEntry
import com.google.cloud.logging.Payload.StringPayload
import com.google.cloud.logging.Severity
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.cloud.google.batch.client.BatchClient
Expand All @@ -43,51 +46,45 @@ class BatchLoggingTest extends Specification {

def 'should parse stdout and stderr' () {
given:
def OUT_TEXT = ' Task action/STARTUP/0/0/group0/0, STDOUT: No user sessions are running outdated binaries.\n'
def ERR_TEXT = ' Task action/STARTUP/0/0/group0/0, STDERR: Oops something has failed. We are sorry.\n'
def OUT_ENTRY1 = LogEntry.newBuilder(StringPayload.of('No user sessions are running outdated binaries.\n')).setSeverity(Severity.INFO).build()
def OUT_ENTRY2 = LogEntry.newBuilder(StringPayload.of('Hello world')).setSeverity(Severity.INFO).build()
def ERR_ENTRY1 = LogEntry.newBuilder(StringPayload.of('Oops something has failed. We are sorry.\n')).setSeverity(Severity.ERROR).build()
def ERR_ENTRY2 = LogEntry.newBuilder(StringPayload.of('blah blah')).setSeverity(Severity.ERROR).build()
and:
def client = new BatchLogging()

when:
def stdout = new StringBuilder()
def stderr = new StringBuilder()
and:
client.parseOutput(OUT_TEXT, stdout, stderr)
client.parseOutput(OUT_ENTRY1, stdout, stderr)
then:
stdout.toString() == 'No user sessions are running outdated binaries.\n'
and:
stderr.toString() == ''
and:
client.currentMode() == 'STDOUT: '

when:
client.parseOutput(ERR_TEXT, stdout, stderr)
client.parseOutput(ERR_ENTRY1, stdout, stderr)
then:
stderr.toString() == 'Oops something has failed. We are sorry.\n'
and:
client.currentMode() == 'STDERR: '

when:
client.parseOutput('blah blah', stdout, stderr)
client.parseOutput(ERR_ENTRY2, stdout, stderr)
then:
// the message is appended to the stderr because no prefix is provided
stderr.toString() == 'Oops something has failed. We are sorry.\nblah blah'
and:
// no change to the stdout
stdout.toString() == 'No user sessions are running outdated binaries.\n'
and:
client.currentMode() == 'STDERR: '

when:
client.parseOutput('STDOUT: Hello world', stdout, stderr)
client.parseOutput(OUT_ENTRY2, stdout, stderr)
then:
// the message is added to the stdout
stdout.toString() == 'No user sessions are running outdated binaries.\nHello world'
and:
// no change to the stderr
stderr.toString() == 'Oops something has failed. We are sorry.\nblah blah'
and:
client.currentMode() == 'STDOUT: '

}

Expand Down

0 comments on commit e2bbcf1

Please sign in to comment.