Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move process output handling to separate class & add tests #101

Merged
merged 1 commit into from
Mar 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 16 additions & 75 deletions src/main/java/pl/project13/core/NativeGitProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
import javax.annotation.Nonnull;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -338,7 +337,7 @@ private String getOriginRemote(File directory, long nativeGitTimeoutInMs) throws
}

/**
* Runs a maven command and returns {@code true} if output was non empty.
* Runs a Git command and returns {@code true} if output was non empty.
* Can be used to short cut reading output from command when we know it may be a rather long one.
* Return true if the result is empty.
**/
Expand Down Expand Up @@ -468,12 +467,10 @@ public String run(File directory, long nativeGitTimeoutInMs, String command) thr
try {
final StringBuilder commandResult = new StringBuilder();

final Function<String, Boolean> stdoutConsumer = line -> {
final Consumer<String> stdoutConsumer = line -> {
if (line != null) {
commandResult.append(line).append("\n");
commandResult.append(line).append('\n');
}
// return true to indicate we want to read more content
return true;
};
runProcess(directory, nativeGitTimeoutInMs, command, stdoutConsumer);

Expand All @@ -489,12 +486,9 @@ public boolean runEmpty(File directory, long nativeGitTimeoutInMs, String comman
final AtomicBoolean empty = new AtomicBoolean(true);

try {
final Function<String, Boolean> stdoutConsumer = line -> {
if (line != null) {
empty.set(false);
}
// return false to indicate we don't need to read more content
return false;
Comment on lines -496 to -497
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this logic to stop reading from the output. It is always necessary to read from the output, even if the lines are then discarded, otherwise the output buffer can get full and the process gets stuck writing to stdout.

final Consumer<String> stdoutConsumer = line -> {
empty.set(false);
// Ignore the content of the line
};
runProcess(directory, nativeGitTimeoutInMs, command, stdoutConsumer);
} catch (final InterruptedException ex) {
Expand All @@ -507,75 +501,22 @@ private void runProcess(
File directory,
long nativeGitTimeoutInMs,
String command,
final Function<String, Boolean> stdoutConsumer) throws InterruptedException, IOException, GitCommitIdExecutionException {
final Consumer<String> stdoutLineConsumer) throws InterruptedException, IOException, GitCommitIdExecutionException {

final ProcessBuilder builder = new ProcessBuilder(command.split("\\s"));
final Process proc = builder.directory(directory).start();

final ExecutorService executorService = Executors.newFixedThreadPool(2);
final StringBuilder errMsg = new StringBuilder();

final Future<Optional<RuntimeException>> stdoutFuture = executorService.submit(
new CallableBufferedStreamReader(proc.getInputStream(), stdoutConsumer));
final Future<Optional<RuntimeException>> stderrFuture = executorService.submit(
new CallableBufferedStreamReader(proc.getErrorStream(),
line -> {
errMsg.append(line);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might have been missing a \n after every line.

// return true to indicate we want to read more content
return true;
}));

if (!proc.waitFor(nativeGitTimeoutInMs, TimeUnit.MILLISECONDS)) {
proc.destroy();
executorService.shutdownNow();
throw new RuntimeException(String.format("GIT-Command '%s' did not finish in %d milliseconds", command, nativeGitTimeoutInMs));
}

try {
stdoutFuture.get()
.ifPresent(e -> {
throw e;
});
stderrFuture.get()
.ifPresent(e -> {
throw e;
});
} catch (final ExecutionException e) {
throw new RuntimeException(String.format("Executing GIT-Command '%s' threw an '%s' exception.", command, e.getMessage()), e);
}
try (ProcessHandler processHandler = new ProcessHandler(proc, stdoutLineConsumer)) {
int exitValue = processHandler.exitValue(nativeGitTimeoutInMs, TimeUnit.MILLISECONDS);

executorService.shutdown();
if (proc.exitValue() != 0) {
throw new NativeCommandException(proc.exitValue(), command, directory, "", errMsg.toString());
}
}

private static class CallableBufferedStreamReader implements Callable<Optional<RuntimeException>> {
private final InputStream is;
private final Function<String, Boolean> streamConsumer;

CallableBufferedStreamReader(final InputStream is, final Function<String, Boolean> streamConsumer) {
this.is = is;
this.streamConsumer = streamConsumer;
}

@Override
public Optional<RuntimeException> call() {
RuntimeException thrownException = null;
try (final BufferedReader br = new BufferedReader(
new InputStreamReader(is, StandardCharsets.UTF_8))) {
for (String line = br.readLine();
line != null;
line = br.readLine()) {
if (!streamConsumer.apply(line)) {
break;
}
}
} catch (final IOException e) {
thrownException = new RuntimeException(String.format("Executing GIT-Command threw an '%s' exception.", e.getMessage()), e);
if (exitValue != 0) {
throw new NativeCommandException(exitValue, command, directory, "", processHandler.getStderr());
}

return Optional.ofNullable(thrownException);
} catch (TimeoutException e) {
throw new RuntimeException(String.format("GIT-Command '%s' did not finish in %d milliseconds", command, nativeGitTimeoutInMs), e);
} catch (ExecutionException e) {
throw new RuntimeException(String.format("Executing GIT-Command '%s' threw an '%s' exception.", command, e.getMessage()), e);
}
}
}
Expand Down
160 changes: 160 additions & 0 deletions src/main/java/pl/project13/core/ProcessHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* This file is part of git-commit-id-plugin-core by Konrad 'ktoso' Malawski <[email protected]>
*
* git-commit-id-plugin-core is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* git-commit-id-plugin-core is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with git-commit-id-plugin-core. If not, see <http://www.gnu.org/licenses/>.
*/

package pl.project13.core;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* Handles waiting for a {@link Process} and reading its stdout and stderr output.
*/
class ProcessHandler implements AutoCloseable {
private final Process process;

private final ExecutorService outputReaderExecutor;
private final Future<Void> stdoutFuture;
private final Future<String> stderrFuture;

private String stderrOutput = null;

/**
* @param process the process which should be handled
* @param stdoutLineConsumer called asynchronously with the lines read from stdout. The consumer
* must either be thread-safe, or the result it is building must only be used after
* {@link #exitValue(long, TimeUnit)} has returned without throwing an exception. The
* consumer must not block, otherwise this could prevent the process from writing any
* output, causing it to get stuck.
*/
public ProcessHandler(Process process, Consumer<String> stdoutLineConsumer) {
this.process = Objects.requireNonNull(process);
Objects.requireNonNull(stdoutLineConsumer);

// 2 threads, one for stdout, one for stderr
// The process output is consumed concurrently by separate threads because otherwise the process
// could get stuck if the output is not consumed and the output buffer is full
ThreadFactory threadFactory = Executors.defaultThreadFactory();
outputReaderExecutor = Executors.newFixedThreadPool(2, runnable -> {
Thread t = threadFactory.newThread(runnable);
// Don't prevent JVM exit
t.setDaemon(true);
return t;
});

String processInfo;
try {
processInfo = this.process.info().command().orElse("?") + " [" + this.process.pid() + "]";
} catch (UnsupportedOperationException e) {
processInfo = "<unknown-process>";
}
stdoutFuture =
outputReaderExecutor.submit(new ProcessOutputReader<>("stdout reader (" + processInfo + ")",
this.process.getInputStream(), stdoutLineConsumer,
// Don't create a 'result', `stdoutLineConsumer` will do that itself if needed
() -> null));

StringBuilder stderrBuilder = new StringBuilder();
stderrFuture =
outputReaderExecutor.submit(new ProcessOutputReader<>("stderr reader (" + processInfo + ")",
this.process.getErrorStream(), line -> stderrBuilder.append(line).append('\n'),
stderrBuilder::toString));
}

/**
* Waits for the process to finish and returns the exit value.
*
* @throws TimeoutException if waiting for the process to finish times out
*/
public int exitValue(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException, ExecutionException {
boolean finished = process.waitFor(timeout, timeUnit);
if (finished) {

outputReaderExecutor.shutdown();
try {
stdoutFuture.get();
} catch (ExecutionException e) {
throw new ExecutionException("Failed waiting for stdout", e.getCause());
}
try {
stderrOutput = stderrFuture.get();
} catch (ExecutionException e) {
throw new ExecutionException("Failed waiting for stderr", e.getCause());
}
Comment on lines +102 to +109
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not completely sure about this unwrapping with e.getCause() and then rewrapping as ExecutionException. Could also directly propagate e, but then you would have to inspect the stack trace to understand whether the exception occurred for the stdout or stderr thread.

return process.exitValue();
}
throw new TimeoutException();
}

/**
* Gets the stderr output. Must only be called after {@link #exitValue(long, TimeUnit)} has
* returned successfully.
*/
public String getStderr() {
if (stderrOutput == null) {
throw new IllegalStateException("Process has not finished");
}
return stderrOutput;
}

@Override
public void close() {
// Perform clean-up; has no effect if process or executor have already been stopped
process.destroy();
outputReaderExecutor.shutdownNow();
}

private static class ProcessOutputReader<T> implements Callable<T> {
private final String threadName;
private final InputStream is;
private final Consumer<String> lineConsumer;
private final Supplier<T> resultCreator;

ProcessOutputReader(String threadName, InputStream is, Consumer<String> lineConsumer, Supplier<T> resultCreator) {
this.threadName = threadName;
this.is = is;
this.lineConsumer = lineConsumer;
this.resultCreator = resultCreator;
}

@Override
public T call() throws Exception {
Thread.currentThread().setName(threadName);

try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {

String line;
while ((line = br.readLine()) != null) {
lineConsumer.accept(line);
}
}
return resultCreator.get();
}
}
}
Loading