diff --git a/java/src/org/openqa/selenium/grid/node/local/LocalNode.java b/java/src/org/openqa/selenium/grid/node/local/LocalNode.java index fc5ab455f0439..832732f7a2714 100644 --- a/java/src/org/openqa/selenium/grid/node/local/LocalNode.java +++ b/java/src/org/openqa/selenium/grid/node/local/LocalNode.java @@ -59,9 +59,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -133,7 +135,7 @@ public class LocalNode extends Node implements Closeable { private final int connectionLimitPerSession; private final boolean bidiEnabled; - private final AtomicBoolean drainAfterSessions = new AtomicBoolean(); + private final boolean drainAfterSessions; private final List factories; private final Cache currentSessions; private final Cache uploadsTempFileSystem; @@ -142,6 +144,7 @@ public class LocalNode extends Node implements Closeable { private final AtomicInteger pendingSessions = new AtomicInteger(); private final AtomicInteger sessionCount = new AtomicInteger(); private final Runnable shutdown; + private final ReadWriteLock drainLock = new ReentrantReadWriteLock(); protected LocalNode( Tracer tracer, @@ -177,7 +180,7 @@ protected LocalNode( this.factories = ImmutableList.copyOf(factories); Require.nonNull("Registration secret", registrationSecret); this.configuredSessionCount = drainAfterSessionCount; - this.drainAfterSessions.set(this.configuredSessionCount > 0); + this.drainAfterSessions = this.configuredSessionCount > 0; this.sessionCount.set(drainAfterSessionCount); this.cdpEnabled = cdpEnabled; this.bidiEnabled = bidiEnabled; @@ -443,6 +446,9 @@ public Either newSession( CreateSessionRequest sessionRequest) { Require.nonNull("Session request", sessionRequest); + Lock lock = drainLock.readLock(); + lock.lock(); + try (Span span = tracer.getCurrentContext().createSpan("node.new_session")) { AttributeMap attributeMap = tracer.createAttributeMap(); attributeMap.put(AttributeKey.LOGGER_CLASS.getKey(), getClass().getName()); @@ -455,13 +461,14 @@ public Either newSession( span.setAttribute("current.session.count", currentSessionCount); attributeMap.put("current.session.count", currentSessionCount); - if (getCurrentSessionCount() >= maxSessionCount) { + if (currentSessionCount >= maxSessionCount) { span.setAttribute(AttributeKey.ERROR.getKey(), true); span.setStatus(Status.RESOURCE_EXHAUSTED); attributeMap.put("max.session.count", maxSessionCount); span.addEvent("Max session count reached", attributeMap); return Either.left(new RetrySessionRequestException("Max session count reached.")); } + if (isDraining()) { span.setStatus( Status.UNAVAILABLE.withDescription( @@ -492,6 +499,15 @@ public Either newSession( new RetrySessionRequestException("No slot matched the requested capabilities.")); } + if (!decrementSessionCount()) { + slotToUse.release(); + span.setAttribute(AttributeKey.ERROR.getKey(), true); + span.setStatus(Status.RESOURCE_EXHAUSTED); + attributeMap.put("drain.after.session.count", configuredSessionCount); + span.addEvent("Drain after session count reached", attributeMap); + return Either.left(new RetrySessionRequestException("Drain after session count reached.")); + } + UUID uuidForSessionDownloads = UUID.randomUUID(); Capabilities desiredCapabilities = sessionRequest.getDesiredCapabilities(); if (managedDownloadsRequested(desiredCapabilities)) { @@ -548,6 +564,7 @@ public Either newSession( return Either.left(possibleSession.left()); } } finally { + lock.unlock(); checkSessionCount(); } } @@ -1020,20 +1037,40 @@ public void drain() { } private void checkSessionCount() { - if (this.drainAfterSessions.get()) { + if (this.drainAfterSessions) { + Lock lock = drainLock.writeLock(); + if (!lock.tryLock()) { + // in case we can't get a write lock another thread does hold a read lock and will call + // checkSessionCount as soon as he releases the read lock. So we do not need to wait here + // for the other session to start and release the lock, just continue and let the other + // session start to drain the node. + return; + } + try { + int remainingSessions = this.sessionCount.get(); + if (remainingSessions <= 0) { + LOG.info( + String.format( + "Draining Node, configured sessions value (%s) has been reached.", + this.configuredSessionCount)); + drain(); + } + } finally { + lock.unlock(); + } + } + } + + private boolean decrementSessionCount() { + if (this.drainAfterSessions) { int remainingSessions = this.sessionCount.decrementAndGet(); LOG.log( Debug.getDebugLogLevel(), "{0} remaining sessions before draining Node", remainingSessions); - if (remainingSessions <= 0) { - LOG.info( - String.format( - "Draining Node, configured sessions value (%s) has been reached.", - this.configuredSessionCount)); - drain(); - } + return remainingSessions >= 0; } + return true; } private Map toJson() {