Skip to content

Commit

Permalink
Update ResponseQueueHanlder to handle multiple requests sequentially (#…
Browse files Browse the repository at this point in the history
…1503)

This update ensures that `ResponseQueueHanlder` counts requests and dispatches responses when request count is greater than 0.
  • Loading branch information
injectives authored Nov 2, 2023
1 parent 99d32cb commit fbdd1ad
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,28 @@
public class ResponseQueueHanlder {
private final Consumer<TestkitResponse> responseWriter;
private final Queue<TestkitResponse> responseQueue = new ArrayDeque<>();
private boolean responseReady;
private int requestCount;

ResponseQueueHanlder(Consumer<TestkitResponse> responseWriter) {
this.responseWriter = responseWriter;
}

public synchronized void setResponseReadyAndDispatchFirst() {
responseReady = true;
dispatchFirst();
public synchronized void increaseRequestCountAndDispatchFirstResponse() {
requestCount++;
dispatchFirstResponse();
}

public synchronized void offerAndDispatchFirst(TestkitResponse response) {
public synchronized void offerAndDispatchFirstResponse(TestkitResponse response) {
responseQueue.offer(response);
if (responseReady) {
dispatchFirst();
if (requestCount > 0) {
dispatchFirstResponse();
}
}

private synchronized void dispatchFirst() {
private synchronized void dispatchFirstResponse() {
var response = responseQueue.poll();
if (response != null) {
responseReady = false;
requestCount--;
responseWriter.accept(response);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
.exceptionally(this::createErrorResponse)
.whenComplete((response, ignored) -> {
if (response != null) {
responseQueueHanlder.offerAndDispatchFirst(response);
responseQueueHanlder.offerAndDispatchFirstResponse(response);
}
});
} catch (Throwable throwable) {
Expand All @@ -106,7 +106,7 @@ private static CompletionStage<TestkitResponse> wrapSyncRequest(
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
var response = createErrorResponse(cause);
responseQueueHanlder.offerAndDispatchFirst(response);
responseQueueHanlder.offerAndDispatchFirstResponse(response);
}

private TestkitResponse createErrorResponse(Throwable throwable) {
Expand Down Expand Up @@ -170,7 +170,7 @@ private void writeAndFlush(TestkitResponse response) {
if (channel == null) {
throw new IllegalStateException("Called before channel is initialized");
}
responseQueueHanlder.offerAndDispatchFirst(response);
responseQueueHanlder.offerAndDispatchFirstResponse(response);
}

public enum BackendMode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public TestkitRequestResponseMapperHandler(Logging logging, ResponseQueueHanlder
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
var testkitMessage = (String) msg;
log.debug("Inbound Testkit message '%s'", testkitMessage.trim());
responseQueueHanlder.setResponseReadyAndDispatchFirst();
responseQueueHanlder.increaseRequestCountAndDispatchFirstResponse();
var testkitRequest = objectMapper.readValue(testkitMessage, TestkitRequest.class);
ctx.fireChannelRead(testkitRequest);
}
Expand Down

0 comments on commit fbdd1ad

Please sign in to comment.