From 35019925656f61f1e4da657a44e7aa96c9a403a7 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Tue, 14 May 2024 09:49:41 +0200 Subject: [PATCH] WebSockets Next: improve the concurrency limiter - minor refactoring + queue counter is only used for debugging --- .../next/runtime/ConcurrencyLimiter.java | 46 ++++++++----------- .../next/runtime/WebSocketConnectionBase.java | 3 ++ 2 files changed, 23 insertions(+), 26 deletions(-) diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConcurrencyLimiter.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConcurrencyLimiter.java index 8a690d793ce5de..2e05b106482470 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConcurrencyLimiter.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConcurrencyLimiter.java @@ -25,7 +25,8 @@ class ConcurrencyLimiter { ConcurrencyLimiter(WebSocketConnectionBase connection) { this.connection = connection; this.uncompleted = new AtomicLong(); - this.queueCounter = new AtomicLong(); + // Counter is only used for debugging + this.queueCounter = LOG.isDebugEnabled() ? new AtomicLong() : null; this.queue = Queues.createMpscQueue(); } @@ -51,7 +52,7 @@ void run(Context context, Runnable action) { LOG.debugf("Run action: %s", connection); action.run(); } else { - long queueIndex = queueCounter.incrementAndGet(); + long queueIndex = queueCounter != null ? queueCounter.incrementAndGet() : 0l; LOG.debugf("Action queued as %s: %s", queueIndex, connection); queue.offer(new Action(queueIndex, action, context)); // We need to make sure that at least one completion is in flight @@ -76,18 +77,7 @@ void failure(Throwable t) { try { promise.fail(t); } finally { - if (uncompleted.decrementAndGet() == 0) { - return; - } - Action queuedAction = queue.poll(); - assert queuedAction != null; - LOG.debugf("Run action %s from queue: %s", queuedAction.queueIndex, connection); - queuedAction.context.runOnContext(new Handler() { - @Override - public void handle(Void event) { - queuedAction.runnable.run(); - } - }); + tryNext(); } } @@ -95,19 +85,23 @@ void complete() { try { promise.complete(); } finally { - if (uncompleted.decrementAndGet() == 0) { - return; - } - Action queuedAction = queue.poll(); - assert queuedAction != null; - LOG.debugf("Run action %s from queue: %s", queuedAction.queueIndex, connection); - queuedAction.context.runOnContext(new Handler() { - @Override - public void handle(Void event) { - queuedAction.runnable.run(); - } - }); + tryNext(); + } + } + + private void tryNext() { + if (uncompleted.decrementAndGet() == 0) { + return; } + Action queuedAction = queue.poll(); + assert queuedAction != null; + LOG.debugf("Run action %s from queue: %s", queuedAction.queueIndex, connection); + queuedAction.context.runOnContext(new Handler() { + @Override + public void handle(Void event) { + queuedAction.runnable.run(); + } + }); } } diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionBase.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionBase.java index 0887228169bafa..66f201826bd474 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionBase.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionBase.java @@ -93,6 +93,9 @@ public Uni close() { } public Uni close(CloseReason reason) { + if (isClosed()) { + throw new IllegalStateException("Connection already closed: " + toString()); + } return UniHelper.toUni(webSocket().close((short) reason.getCode(), reason.getMessage())); }