Skip to content

Commit

Permalink
Merge pull request #101 from Marcono1234/process-handling
Browse files Browse the repository at this point in the history
Move process output handling to separate class & add tests
  • Loading branch information
TheSnoozer authored Mar 3, 2024
2 parents d92eba9 + af7092f commit a5f1676
Show file tree
Hide file tree
Showing 3 changed files with 369 additions and 75 deletions.
91 changes: 16 additions & 75 deletions src/main/java/pl/project13/core/NativeGitProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
import javax.annotation.Nonnull;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -338,7 +337,7 @@ private String getOriginRemote(File directory, long nativeGitTimeoutInMs) throws
}

/**
* Runs a maven command and returns {@code true} if output was non empty.
* Runs a Git command and returns {@code true} if output was non empty.
* Can be used to short cut reading output from command when we know it may be a rather long one.
* Return true if the result is empty.
**/
Expand Down Expand Up @@ -468,12 +467,10 @@ public String run(File directory, long nativeGitTimeoutInMs, String command) thr
try {
final StringBuilder commandResult = new StringBuilder();

final Function<String, Boolean> stdoutConsumer = line -> {
final Consumer<String> stdoutConsumer = line -> {
if (line != null) {
commandResult.append(line).append("\n");
commandResult.append(line).append('\n');
}
// return true to indicate we want to read more content
return true;
};
runProcess(directory, nativeGitTimeoutInMs, command, stdoutConsumer);

Expand All @@ -489,12 +486,9 @@ public boolean runEmpty(File directory, long nativeGitTimeoutInMs, String comman
final AtomicBoolean empty = new AtomicBoolean(true);

try {
final Function<String, Boolean> stdoutConsumer = line -> {
if (line != null) {
empty.set(false);
}
// return false to indicate we don't need to read more content
return false;
final Consumer<String> stdoutConsumer = line -> {
empty.set(false);
// Ignore the content of the line
};
runProcess(directory, nativeGitTimeoutInMs, command, stdoutConsumer);
} catch (final InterruptedException ex) {
Expand All @@ -507,75 +501,22 @@ private void runProcess(
File directory,
long nativeGitTimeoutInMs,
String command,
final Function<String, Boolean> stdoutConsumer) throws InterruptedException, IOException, GitCommitIdExecutionException {
final Consumer<String> stdoutLineConsumer) throws InterruptedException, IOException, GitCommitIdExecutionException {

final ProcessBuilder builder = new ProcessBuilder(command.split("\\s"));
final Process proc = builder.directory(directory).start();

final ExecutorService executorService = Executors.newFixedThreadPool(2);
final StringBuilder errMsg = new StringBuilder();

final Future<Optional<RuntimeException>> stdoutFuture = executorService.submit(
new CallableBufferedStreamReader(proc.getInputStream(), stdoutConsumer));
final Future<Optional<RuntimeException>> stderrFuture = executorService.submit(
new CallableBufferedStreamReader(proc.getErrorStream(),
line -> {
errMsg.append(line);
// return true to indicate we want to read more content
return true;
}));

if (!proc.waitFor(nativeGitTimeoutInMs, TimeUnit.MILLISECONDS)) {
proc.destroy();
executorService.shutdownNow();
throw new RuntimeException(String.format("GIT-Command '%s' did not finish in %d milliseconds", command, nativeGitTimeoutInMs));
}

try {
stdoutFuture.get()
.ifPresent(e -> {
throw e;
});
stderrFuture.get()
.ifPresent(e -> {
throw e;
});
} catch (final ExecutionException e) {
throw new RuntimeException(String.format("Executing GIT-Command '%s' threw an '%s' exception.", command, e.getMessage()), e);
}
try (ProcessHandler processHandler = new ProcessHandler(proc, stdoutLineConsumer)) {
int exitValue = processHandler.exitValue(nativeGitTimeoutInMs, TimeUnit.MILLISECONDS);

executorService.shutdown();
if (proc.exitValue() != 0) {
throw new NativeCommandException(proc.exitValue(), command, directory, "", errMsg.toString());
}
}

private static class CallableBufferedStreamReader implements Callable<Optional<RuntimeException>> {
private final InputStream is;
private final Function<String, Boolean> streamConsumer;

CallableBufferedStreamReader(final InputStream is, final Function<String, Boolean> streamConsumer) {
this.is = is;
this.streamConsumer = streamConsumer;
}

@Override
public Optional<RuntimeException> call() {
RuntimeException thrownException = null;
try (final BufferedReader br = new BufferedReader(
new InputStreamReader(is, StandardCharsets.UTF_8))) {
for (String line = br.readLine();
line != null;
line = br.readLine()) {
if (!streamConsumer.apply(line)) {
break;
}
}
} catch (final IOException e) {
thrownException = new RuntimeException(String.format("Executing GIT-Command threw an '%s' exception.", e.getMessage()), e);
if (exitValue != 0) {
throw new NativeCommandException(exitValue, command, directory, "", processHandler.getStderr());
}

return Optional.ofNullable(thrownException);
} catch (TimeoutException e) {
throw new RuntimeException(String.format("GIT-Command '%s' did not finish in %d milliseconds", command, nativeGitTimeoutInMs), e);
} catch (ExecutionException e) {
throw new RuntimeException(String.format("Executing GIT-Command '%s' threw an '%s' exception.", command, e.getMessage()), e);
}
}
}
Expand Down
160 changes: 160 additions & 0 deletions src/main/java/pl/project13/core/ProcessHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* This file is part of git-commit-id-plugin-core by Konrad 'ktoso' Malawski <[email protected]>
*
* git-commit-id-plugin-core is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* git-commit-id-plugin-core is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with git-commit-id-plugin-core. If not, see <http://www.gnu.org/licenses/>.
*/

package pl.project13.core;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* Handles waiting for a {@link Process} and reading its stdout and stderr output.
*/
class ProcessHandler implements AutoCloseable {
private final Process process;

private final ExecutorService outputReaderExecutor;
private final Future<Void> stdoutFuture;
private final Future<String> stderrFuture;

private String stderrOutput = null;

/**
* @param process the process which should be handled
* @param stdoutLineConsumer called asynchronously with the lines read from stdout. The consumer
* must either be thread-safe, or the result it is building must only be used after
* {@link #exitValue(long, TimeUnit)} has returned without throwing an exception. The
* consumer must not block, otherwise this could prevent the process from writing any
* output, causing it to get stuck.
*/
public ProcessHandler(Process process, Consumer<String> stdoutLineConsumer) {
this.process = Objects.requireNonNull(process);
Objects.requireNonNull(stdoutLineConsumer);

// 2 threads, one for stdout, one for stderr
// The process output is consumed concurrently by separate threads because otherwise the process
// could get stuck if the output is not consumed and the output buffer is full
ThreadFactory threadFactory = Executors.defaultThreadFactory();
outputReaderExecutor = Executors.newFixedThreadPool(2, runnable -> {
Thread t = threadFactory.newThread(runnable);
// Don't prevent JVM exit
t.setDaemon(true);
return t;
});

String processInfo;
try {
processInfo = this.process.info().command().orElse("?") + " [" + this.process.pid() + "]";
} catch (UnsupportedOperationException e) {
processInfo = "<unknown-process>";
}
stdoutFuture =
outputReaderExecutor.submit(new ProcessOutputReader<>("stdout reader (" + processInfo + ")",
this.process.getInputStream(), stdoutLineConsumer,
// Don't create a 'result', `stdoutLineConsumer` will do that itself if needed
() -> null));

StringBuilder stderrBuilder = new StringBuilder();
stderrFuture =
outputReaderExecutor.submit(new ProcessOutputReader<>("stderr reader (" + processInfo + ")",
this.process.getErrorStream(), line -> stderrBuilder.append(line).append('\n'),
stderrBuilder::toString));
}

/**
* Waits for the process to finish and returns the exit value.
*
* @throws TimeoutException if waiting for the process to finish times out
*/
public int exitValue(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException, ExecutionException {
boolean finished = process.waitFor(timeout, timeUnit);
if (finished) {

outputReaderExecutor.shutdown();
try {
stdoutFuture.get();
} catch (ExecutionException e) {
throw new ExecutionException("Failed waiting for stdout", e.getCause());
}
try {
stderrOutput = stderrFuture.get();
} catch (ExecutionException e) {
throw new ExecutionException("Failed waiting for stderr", e.getCause());
}
return process.exitValue();
}
throw new TimeoutException();
}

/**
* Gets the stderr output. Must only be called after {@link #exitValue(long, TimeUnit)} has
* returned successfully.
*/
public String getStderr() {
if (stderrOutput == null) {
throw new IllegalStateException("Process has not finished");
}
return stderrOutput;
}

@Override
public void close() {
// Perform clean-up; has no effect if process or executor have already been stopped
process.destroy();
outputReaderExecutor.shutdownNow();
}

private static class ProcessOutputReader<T> implements Callable<T> {
private final String threadName;
private final InputStream is;
private final Consumer<String> lineConsumer;
private final Supplier<T> resultCreator;

ProcessOutputReader(String threadName, InputStream is, Consumer<String> lineConsumer, Supplier<T> resultCreator) {
this.threadName = threadName;
this.is = is;
this.lineConsumer = lineConsumer;
this.resultCreator = resultCreator;
}

@Override
public T call() throws Exception {
Thread.currentThread().setName(threadName);

try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {

String line;
while ((line = br.readLine()) != null) {
lineConsumer.accept(line);
}
}
return resultCreator.get();
}
}
}
Loading

0 comments on commit a5f1676

Please sign in to comment.