-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move process output handling to separate class & add tests #101
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,12 +25,11 @@ | |
import javax.annotation.Nonnull; | ||
|
||
import java.io.*; | ||
import java.nio.charset.StandardCharsets; | ||
import java.text.SimpleDateFormat; | ||
import java.util.Optional; | ||
import java.util.concurrent.*; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.Function; | ||
import java.util.function.Consumer; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
|
@@ -338,7 +337,7 @@ private String getOriginRemote(File directory, long nativeGitTimeoutInMs) throws | |
} | ||
|
||
/** | ||
* Runs a maven command and returns {@code true} if output was non empty. | ||
* Runs a Git command and returns {@code true} if output was non empty. | ||
* Can be used to short cut reading output from command when we know it may be a rather long one. | ||
* Return true if the result is empty. | ||
**/ | ||
|
@@ -468,12 +467,10 @@ public String run(File directory, long nativeGitTimeoutInMs, String command) thr | |
try { | ||
final StringBuilder commandResult = new StringBuilder(); | ||
|
||
final Function<String, Boolean> stdoutConsumer = line -> { | ||
final Consumer<String> stdoutConsumer = line -> { | ||
if (line != null) { | ||
commandResult.append(line).append("\n"); | ||
commandResult.append(line).append('\n'); | ||
} | ||
// return true to indicate we want to read more content | ||
return true; | ||
}; | ||
runProcess(directory, nativeGitTimeoutInMs, command, stdoutConsumer); | ||
|
||
|
@@ -489,12 +486,9 @@ public boolean runEmpty(File directory, long nativeGitTimeoutInMs, String comman | |
final AtomicBoolean empty = new AtomicBoolean(true); | ||
|
||
try { | ||
final Function<String, Boolean> stdoutConsumer = line -> { | ||
if (line != null) { | ||
empty.set(false); | ||
} | ||
// return false to indicate we don't need to read more content | ||
return false; | ||
final Consumer<String> stdoutConsumer = line -> { | ||
empty.set(false); | ||
// Ignore the content of the line | ||
}; | ||
runProcess(directory, nativeGitTimeoutInMs, command, stdoutConsumer); | ||
} catch (final InterruptedException ex) { | ||
|
@@ -507,75 +501,22 @@ private void runProcess( | |
File directory, | ||
long nativeGitTimeoutInMs, | ||
String command, | ||
final Function<String, Boolean> stdoutConsumer) throws InterruptedException, IOException, GitCommitIdExecutionException { | ||
final Consumer<String> stdoutLineConsumer) throws InterruptedException, IOException, GitCommitIdExecutionException { | ||
|
||
final ProcessBuilder builder = new ProcessBuilder(command.split("\\s")); | ||
final Process proc = builder.directory(directory).start(); | ||
|
||
final ExecutorService executorService = Executors.newFixedThreadPool(2); | ||
final StringBuilder errMsg = new StringBuilder(); | ||
|
||
final Future<Optional<RuntimeException>> stdoutFuture = executorService.submit( | ||
new CallableBufferedStreamReader(proc.getInputStream(), stdoutConsumer)); | ||
final Future<Optional<RuntimeException>> stderrFuture = executorService.submit( | ||
new CallableBufferedStreamReader(proc.getErrorStream(), | ||
line -> { | ||
errMsg.append(line); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This might have been missing a |
||
// return true to indicate we want to read more content | ||
return true; | ||
})); | ||
|
||
if (!proc.waitFor(nativeGitTimeoutInMs, TimeUnit.MILLISECONDS)) { | ||
proc.destroy(); | ||
executorService.shutdownNow(); | ||
throw new RuntimeException(String.format("GIT-Command '%s' did not finish in %d milliseconds", command, nativeGitTimeoutInMs)); | ||
} | ||
|
||
try { | ||
stdoutFuture.get() | ||
.ifPresent(e -> { | ||
throw e; | ||
}); | ||
stderrFuture.get() | ||
.ifPresent(e -> { | ||
throw e; | ||
}); | ||
} catch (final ExecutionException e) { | ||
throw new RuntimeException(String.format("Executing GIT-Command '%s' threw an '%s' exception.", command, e.getMessage()), e); | ||
} | ||
try (ProcessHandler processHandler = new ProcessHandler(proc, stdoutLineConsumer)) { | ||
int exitValue = processHandler.exitValue(nativeGitTimeoutInMs, TimeUnit.MILLISECONDS); | ||
|
||
executorService.shutdown(); | ||
if (proc.exitValue() != 0) { | ||
throw new NativeCommandException(proc.exitValue(), command, directory, "", errMsg.toString()); | ||
} | ||
} | ||
|
||
private static class CallableBufferedStreamReader implements Callable<Optional<RuntimeException>> { | ||
private final InputStream is; | ||
private final Function<String, Boolean> streamConsumer; | ||
|
||
CallableBufferedStreamReader(final InputStream is, final Function<String, Boolean> streamConsumer) { | ||
this.is = is; | ||
this.streamConsumer = streamConsumer; | ||
} | ||
|
||
@Override | ||
public Optional<RuntimeException> call() { | ||
RuntimeException thrownException = null; | ||
try (final BufferedReader br = new BufferedReader( | ||
new InputStreamReader(is, StandardCharsets.UTF_8))) { | ||
for (String line = br.readLine(); | ||
line != null; | ||
line = br.readLine()) { | ||
if (!streamConsumer.apply(line)) { | ||
break; | ||
} | ||
} | ||
} catch (final IOException e) { | ||
thrownException = new RuntimeException(String.format("Executing GIT-Command threw an '%s' exception.", e.getMessage()), e); | ||
if (exitValue != 0) { | ||
throw new NativeCommandException(exitValue, command, directory, "", processHandler.getStderr()); | ||
} | ||
|
||
return Optional.ofNullable(thrownException); | ||
} catch (TimeoutException e) { | ||
throw new RuntimeException(String.format("GIT-Command '%s' did not finish in %d milliseconds", command, nativeGitTimeoutInMs), e); | ||
} catch (ExecutionException e) { | ||
throw new RuntimeException(String.format("Executing GIT-Command '%s' threw an '%s' exception.", command, e.getMessage()), e); | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
/* | ||
* This file is part of git-commit-id-plugin-core by Konrad 'ktoso' Malawski <[email protected]> | ||
* | ||
* git-commit-id-plugin-core is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU Lesser General Public License as published by | ||
* the Free Software Foundation, either version 3 of the License, or | ||
* (at your option) any later version. | ||
* | ||
* git-commit-id-plugin-core is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU Lesser General Public License | ||
* along with git-commit-id-plugin-core. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
|
||
package pl.project13.core; | ||
|
||
import java.io.BufferedReader; | ||
import java.io.InputStream; | ||
import java.io.InputStreamReader; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.Objects; | ||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.ThreadFactory; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.function.Consumer; | ||
import java.util.function.Supplier; | ||
|
||
/** | ||
* Handles waiting for a {@link Process} and reading its stdout and stderr output. | ||
*/ | ||
class ProcessHandler implements AutoCloseable { | ||
private final Process process; | ||
|
||
private final ExecutorService outputReaderExecutor; | ||
private final Future<Void> stdoutFuture; | ||
private final Future<String> stderrFuture; | ||
|
||
private String stderrOutput = null; | ||
|
||
/** | ||
* @param process the process which should be handled | ||
* @param stdoutLineConsumer called asynchronously with the lines read from stdout. The consumer | ||
* must either be thread-safe, or the result it is building must only be used after | ||
* {@link #exitValue(long, TimeUnit)} has returned without throwing an exception. The | ||
* consumer must not block, otherwise this could prevent the process from writing any | ||
* output, causing it to get stuck. | ||
*/ | ||
public ProcessHandler(Process process, Consumer<String> stdoutLineConsumer) { | ||
this.process = Objects.requireNonNull(process); | ||
Objects.requireNonNull(stdoutLineConsumer); | ||
|
||
// 2 threads, one for stdout, one for stderr | ||
// The process output is consumed concurrently by separate threads because otherwise the process | ||
// could get stuck if the output is not consumed and the output buffer is full | ||
ThreadFactory threadFactory = Executors.defaultThreadFactory(); | ||
outputReaderExecutor = Executors.newFixedThreadPool(2, runnable -> { | ||
Thread t = threadFactory.newThread(runnable); | ||
// Don't prevent JVM exit | ||
t.setDaemon(true); | ||
return t; | ||
}); | ||
|
||
String processInfo; | ||
try { | ||
processInfo = this.process.info().command().orElse("?") + " [" + this.process.pid() + "]"; | ||
} catch (UnsupportedOperationException e) { | ||
processInfo = "<unknown-process>"; | ||
} | ||
stdoutFuture = | ||
outputReaderExecutor.submit(new ProcessOutputReader<>("stdout reader (" + processInfo + ")", | ||
this.process.getInputStream(), stdoutLineConsumer, | ||
// Don't create a 'result', `stdoutLineConsumer` will do that itself if needed | ||
() -> null)); | ||
|
||
StringBuilder stderrBuilder = new StringBuilder(); | ||
stderrFuture = | ||
outputReaderExecutor.submit(new ProcessOutputReader<>("stderr reader (" + processInfo + ")", | ||
this.process.getErrorStream(), line -> stderrBuilder.append(line).append('\n'), | ||
stderrBuilder::toString)); | ||
} | ||
|
||
/** | ||
* Waits for the process to finish and returns the exit value. | ||
* | ||
* @throws TimeoutException if waiting for the process to finish times out | ||
*/ | ||
public int exitValue(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException, ExecutionException { | ||
boolean finished = process.waitFor(timeout, timeUnit); | ||
if (finished) { | ||
|
||
outputReaderExecutor.shutdown(); | ||
try { | ||
stdoutFuture.get(); | ||
} catch (ExecutionException e) { | ||
throw new ExecutionException("Failed waiting for stdout", e.getCause()); | ||
} | ||
try { | ||
stderrOutput = stderrFuture.get(); | ||
} catch (ExecutionException e) { | ||
throw new ExecutionException("Failed waiting for stderr", e.getCause()); | ||
} | ||
Comment on lines
+102
to
+109
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not completely sure about this unwrapping with |
||
return process.exitValue(); | ||
} | ||
throw new TimeoutException(); | ||
} | ||
|
||
/** | ||
* Gets the stderr output. Must only be called after {@link #exitValue(long, TimeUnit)} has | ||
* returned successfully. | ||
*/ | ||
public String getStderr() { | ||
if (stderrOutput == null) { | ||
throw new IllegalStateException("Process has not finished"); | ||
} | ||
return stderrOutput; | ||
} | ||
|
||
@Override | ||
public void close() { | ||
// Perform clean-up; has no effect if process or executor have already been stopped | ||
process.destroy(); | ||
outputReaderExecutor.shutdownNow(); | ||
} | ||
|
||
private static class ProcessOutputReader<T> implements Callable<T> { | ||
private final String threadName; | ||
private final InputStream is; | ||
private final Consumer<String> lineConsumer; | ||
private final Supplier<T> resultCreator; | ||
|
||
ProcessOutputReader(String threadName, InputStream is, Consumer<String> lineConsumer, Supplier<T> resultCreator) { | ||
this.threadName = threadName; | ||
this.is = is; | ||
this.lineConsumer = lineConsumer; | ||
this.resultCreator = resultCreator; | ||
} | ||
|
||
@Override | ||
public T call() throws Exception { | ||
Thread.currentThread().setName(threadName); | ||
|
||
try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { | ||
|
||
String line; | ||
while ((line = br.readLine()) != null) { | ||
lineConsumer.accept(line); | ||
} | ||
} | ||
return resultCreator.get(); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed this logic to stop reading from the output. It is always necessary to read from the output, even if the lines are then discarded, otherwise the output buffer can get full and the process gets stuck writing to stdout.