Skip to content

Commit

Permalink
grpc hot reload
Browse files Browse the repository at this point in the history
  • Loading branch information
michalszynkiewicz committed Jul 7, 2020
1 parent d5c4999 commit 8464e72
Show file tree
Hide file tree
Showing 25 changed files with 824 additions and 104 deletions.
1 change: 1 addition & 0 deletions bom/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,7 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc-protoc-plugin</artifactId>
<version>${project.version}</version>
<classifier>shaded</classifier>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand Down Expand Up @@ -143,27 +142,21 @@ public void accept(Integer integer) {
}

private void startCodeGenWatcher(QuarkusClassLoader classLoader, List<CodeGenData> codeGens) {
Executors.newSingleThreadExecutor().execute(
() -> {
Collection<FSWatchUtil.Watcher> watchers = new ArrayList<>();
for (CodeGenData codeGen : codeGens) {
watchers.add(new FSWatchUtil.Watcher(codeGen.sourceDir, codeGen.provider.inputExtension(),
modifiedPaths -> {
try {
CodeGenerator.trigger(classLoader,
codeGen,
curatedApplication.getAppModel());
} catch (Exception any) {
log.warn("Code generation failed", any);
}
}));
}
try {
FSWatchUtil.observe(watchers, 500);
} catch (InterruptedException e) {
log.debug("Watching for code gen interrupted");
}
});

Collection<FSWatchUtil.Watcher> watchers = new ArrayList<>();
for (CodeGenData codeGen : codeGens) {
watchers.add(new FSWatchUtil.Watcher(codeGen.sourceDir, codeGen.provider.inputExtension(),
modifiedPaths -> {
try {
CodeGenerator.trigger(classLoader,
codeGen,
curatedApplication.getAppModel());
} catch (Exception any) {
log.warn("Code generation failed", any);
}
}));
}
FSWatchUtil.observe(watchers, 500);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.jboss.logging.Logger;

import io.quarkus.bootstrap.runner.Timing;
import io.quarkus.deployment.util.FSWatchUtil;
import io.quarkus.deployment.util.FileUtil;
import io.quarkus.dev.spi.HotReplacementContext;
import io.quarkus.dev.spi.HotReplacementSetup;
Expand Down Expand Up @@ -536,5 +537,6 @@ public void startupFailed() {
@Override
public void close() throws IOException {
compiler.close();
FSWatchUtil.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand All @@ -18,23 +20,32 @@ public class FSWatchUtil {

private static final Logger log = Logger.getLogger(FSWatchUtil.class);

private static final List<ExecutorService> executors = new ArrayList<>();

/**
* in a loop, checks for modifications in the files
*
* @param watchers list of {@link Watcher}s
*/
public static void observe(Collection<Watcher> watchers,
long intervalMs) throws InterruptedException {
Map<Path, Long> lastModified = new HashMap<>();
long intervalMs) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(
() -> doObserve(watchers, intervalMs));
executors.add(executorService);
}

private static void doObserve(Collection<Watcher> watchers, long intervalMs) {
Map<Path, Long> lastModified = new HashMap<>();
long lastCheck = 0;
// we're assuming no changes between the compilation and first execution, so don't trigger the watcher on first run
boolean firstRun = true;
//noinspection InfiniteLoopStatement
while (true) {
for (Watcher watcher : watchers) {
try {
List<Path> matchingPaths = Files.walk(watcher.rootPath)
Path rootPath = watcher.rootPath;
List<Path> matchingPaths = Files.walk(rootPath)
.filter(path -> FilenameUtils.getExtension(path.toString()).equals(watcher.fileExtension))
.collect(Collectors.toList());
List<Path> changedFiles = new ArrayList<>();
Expand All @@ -50,20 +61,29 @@ public static void observe(Collection<Watcher> watchers,
watcher.action.accept(changedFiles);
}
} catch (IOException e) {
log.warn("Failed checking for code gen source modifications", e);
log.debug("Failed checking for code gen source modifications", e);
}
}

long toSleep = intervalMs - (System.currentTimeMillis() - lastCheck);
if (toSleep > 0) {
//noinspection BusyWait
Thread.sleep(toSleep);
try {
//noinspection BusyWait
Thread.sleep(toSleep);
} catch (InterruptedException e) {
log.debug("Watching for code gen interrupted");
}
}
lastCheck = System.currentTimeMillis();
firstRun = false;
}
}

public static void shutdown() {
executors.forEach(ExecutorService::shutdown);
executors.clear();
}

public static class Watcher {
private final Path rootPath;
private final String fileExtension;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class QuarkusPrepare extends QuarkusTask {
public static final String INIT_AND_RUN = "initAndRun";
private Path sourcesDirectory;
private Consumer<Path> sourceRegistrar;
private boolean test = false; // mstodo test task!!!
private boolean test = false;

public QuarkusPrepare() {
super("Quarkus performs pre-build preparations, such as sources generation");
Expand Down
11 changes: 11 additions & 0 deletions extensions/grpc/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc-protoc-plugin</artifactId>
<classifier>shaded</classifier>
</dependency>

<dependency>
Expand Down Expand Up @@ -159,6 +160,16 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public boolean trigger(CodeGenContext context) throws CodeGenException {
.start();
int resultCode = process.waitFor();
if (resultCode != 0) {
throw new CodeGenException("Failed to generate Java classes from proto file: " + protoFiles);
throw new CodeGenException("Failed to generate Java classes from proto files: " + protoFiles +
" to " + outDir.toAbsolutePath().toString());
}
return true;
}
Expand Down Expand Up @@ -164,15 +165,15 @@ private String osClassifier() throws CodeGenException {
}

private static Path prepareQuarkusGrpcExecutable(AppModel appModel, Path buildDir) throws CodeGenException {
Path pluginPath = findArtifactPath(appModel, "io.quarkus", "quarkus-grpc-protoc-plugin", "", "jar");
Path pluginPath = findArtifactPath(appModel, "io.quarkus", "quarkus-grpc-protoc-plugin", "shaded", "jar");
if (pluginPath == null) {
throw new CodeGenException("Failed to find Quarkus gRPC protoc plugin among dependencies");
}

if (OS.determineOS() != OS.WINDOWS) {
return writeScript(buildDir, pluginPath, "#!/bin/sh\n", ".sh");
} else {
return writeScript(buildDir, pluginPath, "", ".cmd");
return writeScript(buildDir, pluginPath, "@echo off\r\n", ".cmd");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ ServiceStartBuildItem build(GrpcServerRecorder recorder, GrpcConfiguration confi
ShutdownContextBuildItem shutdown, List<BindableServiceBuildItem> bindables,
VertxBuildItem vertx) {
if (!bindables.isEmpty()) {
recorder.initializeGrpcServer(vertx.getVertx(), config);
recorder.initializeGrpcServer(vertx.getVertx(), config, shutdown);
return new ServiceStartBuildItem(GRPC_SERVER);
}
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.quarkus.grpc.server.devmode;

import javax.enterprise.context.ApplicationScoped;

import io.grpc.ForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

@ApplicationScoped
public class DevModeTestInterceptor implements ServerInterceptor {

private volatile String lastStatus = "initial";

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
return serverCallHandler
.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(serverCall) {
@Override
protected ServerCall<ReqT, RespT> delegate() {
lastStatus = getStatus();
return super.delegate();
}
}, metadata);
}

public String getLastStatus() {
return lastStatus;
}

private String getStatus() {
return "status";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.quarkus.grpc.server.devmode;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

/**
* @author Michal Szynkiewicz, [email protected]
* <br>
* Date: 4/28/20
*/
@Path("/test")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public class DevModeTestRestResource {

@Inject
DevModeTestInterceptor interceptor;

@GET
public String get() {
return "testresponse";
}

@GET
@Path("/interceptor-status")
public String getInterceptorStatus() {
return interceptor.getLastStatus();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.quarkus.grpc.server.devmode;

import javax.inject.Singleton;

import devmodetest.v1.Devmodetest;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;

/**
* @author Michal Szynkiewicz, [email protected]
* <br>
* Date: 4/28/20
*/
@Singleton
public class DevModeTestService extends GreeterGrpc.GreeterImplBase {

@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
String greeting = "Hello, ";
String response;
if (request.getName().equals("HACK_TO_GET_STATUS_NUMBER")) {
response = Integer.toString(Devmodetest.DevModeResponse.Status.TEST_ONE.getNumber());
} else {
response = greeting + request.getName();
}
responseObserver.onNext(HelloReply.newBuilder().setMessage(response).build());
responseObserver.onCompleted();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.quarkus.grpc.server.devmode;

import java.time.Duration;

import javax.inject.Singleton;

import com.example.test.MutinyStreamsGrpc;
import com.example.test.StreamsOuterClass.Item;

import io.smallrye.mutiny.Multi;

/**
* @author Michal Szynkiewicz, [email protected]
* <br>
* Date: 4/29/20
*/
@Singleton
public class DevModeTestStreamService extends MutinyStreamsGrpc.StreamsImplBase {

public static final String PREFIX = "echo::";

@Override
public Multi<Item> echo(Multi<Item> request) {
return request.flatMap(value -> Multi.createFrom().ticks().every(Duration.ofMillis(20))
.map(whatever -> Item.newBuilder().setName(PREFIX + value.getName()).build()));
}
}
Loading

0 comments on commit 8464e72

Please sign in to comment.