From 003cfcde3fd3901c1279ba1db3db3a14536248b4 Mon Sep 17 00:00:00 2001 From: larsrc Date: Mon, 18 Jan 2021 08:09:45 -0800 Subject: [PATCH] Allow use of JSON protocol in multiplex workers. RELNOTES: Multiplex persistent workers can now use the JSON protocol. PiperOrigin-RevId: 352415016 --- .../build/lib/worker/WorkerMultiplexer.java | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java index 8b7ce76f78b293..407a0b18c865e5 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java @@ -72,6 +72,8 @@ public class WorkerMultiplexer { * {@code WorkerMultiplexer} gets discarded as well and a new one gets created as needed. */ private Subprocess process; + /** The implementation of the worker protocol (JSON or Proto). */ + private WorkerProtocolImpl workerProtocol; /** InputStream from the worker process. */ private RecordingInputStream recordingStream; /** True if this multiplexer was explicitly destroyed. */ @@ -142,6 +144,18 @@ public synchronized void createProcess(Path workDir) throws IOException { processBuilder.setStderr(logFile.getPathFile()); processBuilder.setEnv(workerKey.getEnv()); this.process = processBuilder.start(); + recordingStream = new RecordingInputStream(process.getInputStream()); + recordingStream.startRecording(4096); + if (workerProtocol == null) { + switch (workerKey.getProtocolFormat()) { + case JSON: + workerProtocol = new JsonWorkerProtocol(process.getOutputStream(), recordingStream); + break; + case PROTO: + workerProtocol = new ProtoWorkerProtocol(process.getOutputStream(), recordingStream); + break; + } + } String id = workerKey.getMnemonic() + "-" + workerKey.hashCode(); // TODO(larsrc): Consider moving sender/receiver threads into separate classes. this.requestSender = @@ -277,8 +291,7 @@ private boolean sendRequest() { return false; } try { - request.writeDelimitedTo(process.getOutputStream()); - process.getOutputStream().flush(); + workerProtocol.putRequest(request); } catch (IOException e) { // We can't know how much of the request was sent, so we have to assume the worker's input // now contains garbage, and this request is lost. @@ -303,11 +316,9 @@ private boolean sendRequest() { * execution cancellation, but only by a call to {@link #destroyProcess()}. */ private boolean readResponse() { - recordingStream = new RecordingInputStream(process.getInputStream()); - recordingStream.startRecording(4096); WorkResponse parsedResponse; try { - parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream); + parsedResponse = workerProtocol.getResponse(); } catch (IOException e) { if (!(e instanceof InterruptedIOException)) { report( @@ -320,7 +331,8 @@ private boolean readResponse() { destroyProcess(); return false; } - // A null parsedResponse can happen if the input stream is closed, in which case we + + // A null parsedResponse can only happen if the input stream is closed, in which case we // drop everything. if (parsedResponse == null) { report(