Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[grid] ensure --drain-after-session-count is respected with a lot of sessions in the queue #14987

Merged
merged 1 commit into from
Dec 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 49 additions & 12 deletions java/src/org/openqa/selenium/grid/node/local/LocalNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -133,7 +135,7 @@ public class LocalNode extends Node implements Closeable {
private final int connectionLimitPerSession;

private final boolean bidiEnabled;
private final AtomicBoolean drainAfterSessions = new AtomicBoolean();
private final boolean drainAfterSessions;
private final List<SessionSlot> factories;
private final Cache<SessionId, SessionSlot> currentSessions;
private final Cache<SessionId, TemporaryFilesystem> uploadsTempFileSystem;
Expand All @@ -142,6 +144,7 @@ public class LocalNode extends Node implements Closeable {
private final AtomicInteger pendingSessions = new AtomicInteger();
private final AtomicInteger sessionCount = new AtomicInteger();
private final Runnable shutdown;
private final ReadWriteLock drainLock = new ReentrantReadWriteLock();

protected LocalNode(
Tracer tracer,
Expand Down Expand Up @@ -177,7 +180,7 @@ protected LocalNode(
this.factories = ImmutableList.copyOf(factories);
Require.nonNull("Registration secret", registrationSecret);
this.configuredSessionCount = drainAfterSessionCount;
this.drainAfterSessions.set(this.configuredSessionCount > 0);
this.drainAfterSessions = this.configuredSessionCount > 0;
this.sessionCount.set(drainAfterSessionCount);
this.cdpEnabled = cdpEnabled;
this.bidiEnabled = bidiEnabled;
Expand Down Expand Up @@ -443,6 +446,9 @@ public Either<WebDriverException, CreateSessionResponse> newSession(
CreateSessionRequest sessionRequest) {
Require.nonNull("Session request", sessionRequest);

Lock lock = drainLock.readLock();
lock.lock();

try (Span span = tracer.getCurrentContext().createSpan("node.new_session")) {
AttributeMap attributeMap = tracer.createAttributeMap();
attributeMap.put(AttributeKey.LOGGER_CLASS.getKey(), getClass().getName());
Expand All @@ -455,13 +461,14 @@ public Either<WebDriverException, CreateSessionResponse> newSession(
span.setAttribute("current.session.count", currentSessionCount);
attributeMap.put("current.session.count", currentSessionCount);

if (getCurrentSessionCount() >= maxSessionCount) {
if (currentSessionCount >= maxSessionCount) {
span.setAttribute(AttributeKey.ERROR.getKey(), true);
span.setStatus(Status.RESOURCE_EXHAUSTED);
attributeMap.put("max.session.count", maxSessionCount);
span.addEvent("Max session count reached", attributeMap);
return Either.left(new RetrySessionRequestException("Max session count reached."));
}

if (isDraining()) {
span.setStatus(
Status.UNAVAILABLE.withDescription(
Expand Down Expand Up @@ -492,6 +499,15 @@ public Either<WebDriverException, CreateSessionResponse> newSession(
new RetrySessionRequestException("No slot matched the requested capabilities."));
}

if (!decrementSessionCount()) {
slotToUse.release();
span.setAttribute(AttributeKey.ERROR.getKey(), true);
span.setStatus(Status.RESOURCE_EXHAUSTED);
attributeMap.put("drain.after.session.count", configuredSessionCount);
span.addEvent("Drain after session count reached", attributeMap);
return Either.left(new RetrySessionRequestException("Drain after session count reached."));
}

UUID uuidForSessionDownloads = UUID.randomUUID();
Capabilities desiredCapabilities = sessionRequest.getDesiredCapabilities();
if (managedDownloadsRequested(desiredCapabilities)) {
Expand Down Expand Up @@ -548,6 +564,7 @@ public Either<WebDriverException, CreateSessionResponse> newSession(
return Either.left(possibleSession.left());
}
} finally {
lock.unlock();
checkSessionCount();
}
}
Expand Down Expand Up @@ -1020,20 +1037,40 @@ public void drain() {
}

private void checkSessionCount() {
if (this.drainAfterSessions.get()) {
if (this.drainAfterSessions) {
Lock lock = drainLock.writeLock();
if (!lock.tryLock()) {
// in case we can't get a write lock another thread does hold a read lock and will call
// checkSessionCount as soon as he releases the read lock. So we do not need to wait here
// for the other session to start and release the lock, just continue and let the other
// session start to drain the node.
return;
}
try {
int remainingSessions = this.sessionCount.get();
if (remainingSessions <= 0) {
LOG.info(
String.format(
"Draining Node, configured sessions value (%s) has been reached.",
this.configuredSessionCount));
drain();
}
} finally {
lock.unlock();
}
}
}

private boolean decrementSessionCount() {
if (this.drainAfterSessions) {
int remainingSessions = this.sessionCount.decrementAndGet();
LOG.log(
Debug.getDebugLogLevel(),
"{0} remaining sessions before draining Node",
remainingSessions);
if (remainingSessions <= 0) {
LOG.info(
String.format(
"Draining Node, configured sessions value (%s) has been reached.",
this.configuredSessionCount));
drain();
}
return remainingSessions >= 0;
}
return true;
}

private Map<String, Object> toJson() {
Expand Down
Loading