Skip to content

Commit

Permalink
Improve the daml-rpc and daml-tp parallel behavior
Browse files Browse the repository at this point in the history
This puts a guard in place so that the daml-rpc doesn't
submit more transactions than it can reasonably expect
responses from.  This is determined dynamically based on
the number of cores/threads available.

This also improves the responsiveness of the ZmqStream
dramatically, which in turn improves performance
quite a bit.

Signed-off-by: Kevin O'Donnell <[email protected]>
  • Loading branch information
scealiontach committed Dec 13, 2019
1 parent 0a7fe83 commit 3ffc629
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,26 @@ public final Message receive(final long timeout) throws TimeoutException {
return result.getMessage();
}


/**
* Get a message that has been received. If the timeout is expired, return null.
* Also put the resolution of the timer down to millis.
* @param timeout time to wait for a message.
* @return result, a protobuf Message
*/
public final Message receiveNoException(final long timeout) {
SendReceiveThread.MessageWrapper result = null;
try {
result = this.receiveQueue.poll(timeout, TimeUnit.MILLISECONDS);
if (result == null) {
return null;
}
} catch (InterruptedException ie) {
return null;
}
return result.getMessage();

}
/**
* generate a random String, to correlate sent messages. with futures
* @return a random String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;

import com.blockchaintp.sawtooth.daml.messaging.ZmqStream;
Expand Down Expand Up @@ -95,6 +98,18 @@ public final class SawtoothWriteService implements WriteService {

private ExecutorService watchThreadPool = Executors.newWorkStealingPool();

/**
* Lock associated with the Condition.
*/
private final Lock lock = new ReentrantLock();

/**
* Condition to wait for setup to happen.
*/
private final Condition condition = lock.newCondition();

private int outstandingBatches = 0;

/**
* Construct a SawtoothWriteService instance from a concrete stream.
*
Expand Down Expand Up @@ -229,7 +244,6 @@ private Map.Entry<String, ClientBatchSubmitResponse.Status> submissionToResponse

private Future sendToValidator(final Batch batch) {
// Push to TraceTransaction class

try {
Printer includingDefaultValueFields = JsonFormat.printer().preservingProtoFieldNames()
.includingDefaultValueFields();
Expand All @@ -239,15 +253,21 @@ private Future sendToValidator(final Batch batch) {
// Can't do anything so not passing information to tracer
}

lock.lock();

while (this.outstandingBatches > Runtime.getRuntime().availableProcessors()) {
this.condition.awaitUninterruptibly();
}
LOGGER.info(String.format("Batch submission %s", batch.getHeaderSignature()));
ClientBatchSubmitRequest cbsReq = ClientBatchSubmitRequest.newBuilder().addBatches(batch).build();
Future streamToValidator = this.stream.send(Message.MessageType.CLIENT_BATCH_SUBMIT_REQUEST,
cbsReq.toByteString());
Future streamToValidator = this.stream.send(Message.MessageType.CLIENT_BATCH_SUBMIT_REQUEST, cbsReq.toByteString());
this.outstandingBatches++;
this.lock.unlock();
return streamToValidator;
}

private CompletionStage<Map.Entry<String, ClientBatchSubmitResponse.Status>> waitForSubmitResponse(
final Batch batch, final Future streamToValidator) {
private CompletionStage<Map.Entry<String, ClientBatchSubmitResponse.Status>> waitForSubmitResponse(final Batch batch,
final Future streamToValidator) {
return CompletableFuture.supplyAsync(() -> {
try {
return submissionToResponse(batch.getHeaderSignature(), streamToValidator);
Expand All @@ -260,6 +280,10 @@ private CompletionStage<Map.Entry<String, ClientBatchSubmitResponse.Status>> wai

private SubmissionResult batchSubmitToSubmissionResult(
final Map.Entry<String, ClientBatchSubmitResponse.Status> submission) {
lock.lock();
this.outstandingBatches--;
this.condition.signalAll();
lock.unlock();
switch (submission.getValue()) {
case OK:
return new SubmissionResult.Acknowledged$();
Expand Down Expand Up @@ -301,18 +325,29 @@ private Map.Entry<String, ClientBatchStatus.Status> checkBatchWaitForTerminal(
consolidatedStatus = thisStatus.getStatus();
}
}
this.lock.lock();
switch (consolidatedStatus) {
case COMMITTED:
this.outstandingBatches--;
this.condition.signalAll();
lock.unlock();
return Map.entry(batchid, ClientBatchStatus.Status.COMMITTED);
case INVALID:
this.outstandingBatches--;
this.condition.signalAll();
lock.unlock();
return Map.entry(batchid, ClientBatchStatus.Status.INVALID);
case UNKNOWN:
case PENDING:
default:
}
} catch (InvalidProtocolBufferException | InterruptedException | ValidatorConnectionError e) {
this.outstandingBatches--;
this.condition.signalAll();
lock.unlock();
return Map.entry(batchid, ClientBatchStatus.Status.UNKNOWN);
}
this.lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -27,7 +25,6 @@
import com.google.protobuf.InvalidProtocolBufferException;

import sawtooth.sdk.messaging.Future;
import sawtooth.sdk.messaging.Stream;
import sawtooth.sdk.processor.Context;
import sawtooth.sdk.processor.StreamContext;
import sawtooth.sdk.processor.TransactionHandler;
Expand All @@ -45,6 +42,8 @@
*/
public class MTTransactionProcessor implements Runnable {

private static final int LOG_METRICS_INTERVAL = 1000;

private static final int DEFAULT_MAX_THREADS = 10;

private static final Logger LOGGER = Logger.getLogger(MTTransactionProcessor.class.getName());
Expand All @@ -53,13 +52,14 @@ public class MTTransactionProcessor implements Runnable {

private BlockingQueue<Map.Entry<String, TpProcessResponse>> outQueue;

private Stream stream;
private ZmqStream stream;

private ExecutorService executor;

/**
* Constructs a MTTransactionProcessor utilizing the given transaction handler.
* NOTE: The TransactionHandler.apply() method must be thread-safe.
*
* @param txHandler The handler to apply to this processor
* @param address the address of the ZMQ stream
*/
Expand All @@ -73,62 +73,63 @@ public MTTransactionProcessor(final TransactionHandler txHandler, final String a
@Override
public final void run() {
boolean stopped = false;
try {
this.register();
int outStandingTx = 0;
while (!stopped) {
int enqueueCount = 0;
int dequeueCount = 0;
try {
Message inMessage = this.stream.receive(1);
while (inMessage != null) {
if (inMessage.getMessageType() == Message.MessageType.PING_REQUEST) {
LOGGER.fine("Recieved Ping Message.");
PingResponse pingResponse = PingResponse.newBuilder().build();
this.stream.sendBack(Message.MessageType.PING_RESPONSE, inMessage.getCorrelationId(),
pingResponse.toByteString());
} else {
Runnable processMessage = new ProcessRunnable(inMessage, this.handler, this.outQueue);
this.executor.submit(processMessage);
enqueueCount++;
inMessage = this.stream.receive(1);
}
}
if (inMessage == null) {
// Then the validator has disconnected and we need to reregister.
this.register();
}
} catch (TimeoutException e) {
LOGGER.log(Level.FINER, "Nothing to process on this iteration.");
}
Map.Entry<String, TpProcessResponse> outPair = outQueue.poll(1, TimeUnit.MILLISECONDS);
while (outPair != null) {
this.stream.sendBack(Message.MessageType.TP_PROCESS_REQUEST, outPair.getKey(),
outPair.getValue().toByteString());
dequeueCount++;
outPair = outQueue.poll(1, TimeUnit.MILLISECONDS);
}
outStandingTx += enqueueCount;
outStandingTx -= dequeueCount;
if (enqueueCount > 0 || dequeueCount > 0 || outStandingTx > 0) {
LOGGER.info(String.format("Enqueued %s transactions, Dequeued %s responses, outStanding tx=%s", enqueueCount,
dequeueCount, outStandingTx));
}
this.register();
long outStandingTx = 0;
long enqueueCount = 0;
long dequeueCount = 0;
while (!stopped) {
Message inMessage = this.stream.receiveNoException(1);
while (inMessage != null) {
enqueueCount += handleInbound(inMessage);
Map.Entry<String, TpProcessResponse> outPair = outQueue.poll();
dequeueCount += handleOutbound(outPair);
inMessage = this.stream.receiveNoException(1);
}
Map.Entry<String, TpProcessResponse> outPair = outQueue.poll();
while (outPair != null) {
dequeueCount += handleOutbound(outPair);
outPair = outQueue.poll();
}
outStandingTx = enqueueCount - dequeueCount;
if (enqueueCount % LOG_METRICS_INTERVAL == 0) {
LOGGER.info(String.format("Enqueued %s transactions, Dequeued %s responses, outStanding tx=%s", enqueueCount,
dequeueCount, outStandingTx));
}
} catch (InterruptedException e) {
LOGGER.info("Processor interrupted, shutting down");
}

}

private int handleOutbound(final Map.Entry<String, TpProcessResponse> outPair) {
if (outPair != null) {
this.stream.sendBack(Message.MessageType.TP_PROCESS_REQUEST, outPair.getKey(),
outPair.getValue().toByteString());
return 1;
}
return 0;
}

private int handleInbound(final Message inMessage) {
if (inMessage.getMessageType() == Message.MessageType.PING_REQUEST) {
LOGGER.fine("Recieved Ping Message.");
PingResponse pingResponse = PingResponse.newBuilder().build();
this.stream.sendBack(Message.MessageType.PING_RESPONSE, inMessage.getCorrelationId(),
pingResponse.toByteString());
return 0;
} else {
Runnable processMessage = new ProcessRunnable(inMessage, this.handler, this.outQueue);
this.executor.submit(processMessage);
return 1;
}
}

private void register() {
LOGGER.info("Registering TP");
boolean registered = false;
while (!registered) {
try {
TpRegisterRequest registerRequest = TpRegisterRequest.newBuilder()
.setFamily(this.handler.transactionFamilyName()).addAllNamespaces(this.handler.getNameSpaces())
.setVersion(this.handler.getVersion()).setMaxOccupancy(2).build();
.setVersion(this.handler.getVersion()).setMaxOccupancy(Runtime.getRuntime().availableProcessors()).build();
Future fut = this.stream.send(Message.MessageType.TP_REGISTER_REQUEST, registerRequest.toByteString());
fut.getResult();
registered = true;
Expand Down

0 comments on commit 3ffc629

Please sign in to comment.