[grid] External datastore Redis-backed for Session Queue
Signed-off-by: Viet Nguyen Duc <nguyenducviet4496@gmail.com>
diff --git a/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java b/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java
index 835150e..ef93904 100644
--- a/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java
+++ b/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java
@@ -38,6 +38,7 @@
import java.io.UncheckedIOException;
import java.net.URI;
import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -248,13 +249,15 @@ public LocalDistributor(
this.healthcheckInterval.toMillis(),
TimeUnit.MILLISECONDS);
- // if sessionRequestRetryInterval is 0, we will schedule session creation every 10 millis
+ // Default to 100ms if no interval is specified (was 10ms)
long period =
- sessionRequestRetryInterval.isZero() ? 10 : sessionRequestRetryInterval.toMillis();
- newSessionService.scheduleAtFixedRate(
+ sessionRequestRetryInterval.isZero() ? 100 : sessionRequestRetryInterval.toMillis();
+
+ // Use scheduleWithFixedDelay instead of scheduleAtFixedRate to prevent task pileup
+ newSessionService.scheduleWithFixedDelay(
GuardedRunnable.guard(newSessionRunnable),
- sessionRequestRetryInterval.toMillis(),
- period,
+ period, // Initial delay
+ period, // Subsequent delays
TimeUnit.MILLISECONDS);
new JMXHelper().register(this);
@@ -771,12 +774,47 @@ public void close() {
}
private class NewSessionRunnable implements Runnable {
+ private long backoffMs = 100; // Start with 100ms backoff
+ private static final long MAX_BACKOFF_MS = 5000; // Max 5 seconds backoff
+ private static final long MIN_BACKOFF_MS = 100; // Min 100ms backoff
+ private Instant lastNodeAvailableCheck = Instant.MIN;
+ private boolean hadNodesLastCheck = false;
@Override
public void run() {
Set<RequestId> inQueue;
boolean pollQueue;
+ // Check if we have any available nodes
+ boolean hasNodes = !getAvailableNodes().isEmpty();
+
+ // If we had nodes before but don't now, or vice versa, reset the backoff
+ if (hasNodes != hadNodesLastCheck
+ || Duration.between(lastNodeAvailableCheck, Instant.now()).toMillis() > 5000) {
+ backoffMs = MIN_BACKOFF_MS;
+ }
+
+ hadNodesLastCheck = hasNodes;
+ lastNodeAvailableCheck = Instant.now();
+
+ // If no nodes available, apply backoff before proceeding
+ if (!hasNodes) {
+ try {
+ // Add some jitter to prevent thundering herd
+ long jitter = (long) (Math.random() * backoffMs * 0.1); // Up to 10% jitter
+ Thread.sleep(backoffMs + jitter);
+
+ // Double the backoff for next time, up to the max
+ backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ } else {
+ // Reset backoff when we have nodes
+ backoffMs = MIN_BACKOFF_MS;
+ }
+
if (rejectUnsupportedCaps) {
inQueue =
sessionQueue.getQueueContents().stream()
@@ -801,9 +839,21 @@ public void run() {
Collectors.groupingBy(ImmutableCapabilities::copyOf, Collectors.counting()));
if (!stereotypes.isEmpty()) {
- List<SessionRequest> matchingRequests = sessionQueue.getNextAvailable(stereotypes);
- matchingRequests.forEach(
- req -> sessionCreatorExecutor.execute(() -> handleNewSessionRequest(req)));
+ try {
+ List<SessionRequest> matchingRequests = sessionQueue.getNextAvailable(stereotypes);
+ if (!matchingRequests.isEmpty()) {
+ // Process requests in batch
+ matchingRequests.forEach(
+ req -> sessionCreatorExecutor.execute(() -> handleNewSessionRequest(req)));
+ } else if (backoffMs < MAX_BACKOFF_MS) {
+ // If we didn't get any requests, increase backoff slightly
+ backoffMs = Math.min((long) (backoffMs * 1.5), MAX_BACKOFF_MS);
+ }
+ } catch (Exception e) {
+ LOG.log(Level.SEVERE, "Error processing session requests", e);
+ // On error, back off more aggressively
+ backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS);
+ }
}
}
diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueOptions.java b/java/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueOptions.java
index ef6e74e..8e2c8f2 100644
--- a/java/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueOptions.java
+++ b/java/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueOptions.java
@@ -50,6 +50,12 @@ public NewSessionQueueOptions(Config config) {
public URI getSessionQueueUri() {
+ BaseServerOptions serverOptions = new BaseServerOptions(config);
+ String scheme =
+ config
+ .get(SESSION_QUEUE_SECTION, "scheme")
+ .orElse((serverOptions.isSecure() || serverOptions.isSelfSigned()) ? "https" : "http");
+
Optional<URI> host =
config
.get(SESSION_QUEUE_SECTION, "host")
@@ -72,8 +78,6 @@ public URI getSessionQueueUri() {
return host.get();
}
- BaseServerOptions serverOptions = new BaseServerOptions(config);
- String schema = (serverOptions.isSecure() || serverOptions.isSelfSigned()) ? "https" : "http";
Optional<Integer> port = config.getInt(SESSION_QUEUE_SECTION, "port");
Optional<String> hostname = config.get(SESSION_QUEUE_SECTION, "hostname");
@@ -82,7 +86,7 @@ public URI getSessionQueueUri() {
}
try {
- return new URI(schema, null, hostname.get(), port.get(), "", null, null);
+ return new URI(scheme, null, hostname.get(), port.get(), "", null, null);
} catch (URISyntaxException e) {
throw new ConfigException(
"Session queue server uri configured through host (%s) and port (%d) is not a valid URI",
diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel b/java/src/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel
new file mode 100644
index 0000000..d87d585
--- /dev/null
+++ b/java/src/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel
@@ -0,0 +1,30 @@
+load("@rules_jvm_external//:defs.bzl", "artifact")
+load("//java:defs.bzl", "java_export")
+load("//java:version.bzl", "SE_VERSION")
+
+java_export(
+ name = "redis",
+ srcs = glob(["*.java"]),
+ maven_coordinates = "org.seleniumhq.selenium:selenium-session-queue-redis:%s" % SE_VERSION,
+ pom_template = "//java/src/org/openqa/selenium:template-pom",
+ tags = [
+ "release-artifact",
+ ],
+ visibility = [
+ "//visibility:public",
+ ],
+ exports = [
+ "//java/src/org/openqa/selenium/grid",
+ ],
+ deps = [
+ "//java:auto-service",
+ "//java/src/org/openqa/selenium/grid",
+ "//java/src/org/openqa/selenium/json",
+ "//java/src/org/openqa/selenium/redis",
+ "//java/src/org/openqa/selenium/remote",
+ artifact("com.beust:jcommander"),
+ artifact("com.google.guava:guava"),
+ artifact("io.lettuce:lettuce-core"),
+ artifact("org.redisson:redisson"),
+ ],
+)
diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueue.java b/java/src/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueue.java
new file mode 100644
index 0000000..76a4733
--- /dev/null
+++ b/java/src/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueue.java
@@ -0,0 +1,851 @@
+// Licensed to the Software Freedom Conservancy (SFC) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The SFC licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.openqa.selenium.grid.sessionqueue.redis;
+
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.openqa.selenium.concurrent.ExecutorServices.shutdownGracefully;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import java.io.Closeable;
+import java.net.URI;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import org.openqa.selenium.Capabilities;
+import org.openqa.selenium.SessionNotCreatedException;
+import org.openqa.selenium.concurrent.GuardedRunnable;
+import org.openqa.selenium.grid.config.Config;
+import org.openqa.selenium.grid.data.CreateSessionResponse;
+import org.openqa.selenium.grid.data.RequestId;
+import org.openqa.selenium.grid.data.SessionRequest;
+import org.openqa.selenium.grid.data.SessionRequestCapability;
+import org.openqa.selenium.grid.data.SlotMatcher;
+import org.openqa.selenium.grid.data.TraceSessionRequest;
+import org.openqa.selenium.grid.distributor.config.DistributorOptions;
+import org.openqa.selenium.grid.jmx.JMXHelper;
+import org.openqa.selenium.grid.jmx.ManagedAttribute;
+import org.openqa.selenium.grid.jmx.ManagedService;
+import org.openqa.selenium.grid.log.LoggingOptions;
+import org.openqa.selenium.grid.security.Secret;
+import org.openqa.selenium.grid.security.SecretOptions;
+import org.openqa.selenium.grid.sessionqueue.NewSessionQueue;
+import org.openqa.selenium.grid.sessionqueue.config.NewSessionQueueOptions;
+import org.openqa.selenium.internal.Either;
+import org.openqa.selenium.internal.Require;
+import org.openqa.selenium.json.Json;
+import org.openqa.selenium.redis.GridRedisClient;
+import org.openqa.selenium.remote.http.Contents;
+import org.openqa.selenium.remote.http.HttpResponse;
+import org.openqa.selenium.remote.tracing.Span;
+import org.openqa.selenium.remote.tracing.TraceContext;
+import org.openqa.selenium.remote.tracing.Tracer;
+
+/**
+ * A Redis-backed implementation of the list of new session requests.
+ *
+ * <p>The lifecycle of a request can be described as:
+ *
+ * <ol>
+ * <li>User adds an item on to the queue using {@link #addToQueue(SessionRequest)}. This will
+ * block until the request completes in some way.
+ * <li>If the session request is completed, then {@link #complete(RequestId, Either)} must be
+ * called. This will ensure that {@link #addToQueue(SessionRequest)} returns.
+ * <li>If the request cannot be handled right now, call {@link #retryAddToQueue(SessionRequest)}
+ * to return the session request to the front of the queue.
+ * </ol>
+ *
+ * <p>There is a background thread that will reap {@link SessionRequest}s that have timed out. This
+ * means that a request can either complete by a listener calling {@link #complete(RequestId,
+ * Either)} directly, or by being reaped by the thread.
+ *
+ * <p>Redis persistence ensures that session requests survive restarts and can be processed by
+ * multiple Grid instances.
+ */
+@ManagedService(
+ objectName = "org.seleniumhq.grid:type=SessionQueue,name=RedisBackedSessionQueue",
+ description = "Redis backed session queue")
+public class RedisBackedSessionQueue extends NewSessionQueue implements Closeable {
+
+ private static final Logger LOG = Logger.getLogger(RedisBackedSessionQueue.class.getName());
+ private static final String NAME = "Redis Backed New Session Queue";
+ private static final Json JSON = new Json();
+
+ // Redis keys
+ private static final String QUEUE_KEY = "selenium:session:queue";
+ private static final String REQUEST_KEY_PREFIX = "selenium:session:request:";
+ private static final String DATA_KEY_PREFIX = "selenium:session:data:";
+ private static final String CONTEXT_KEY_PREFIX = "selenium:session:context:";
+
+ private final SlotMatcher slotMatcher;
+ private final GridRedisClient redisClient;
+ private final URI sessionQueueUri;
+ private final Duration requestTimeout;
+ private final Duration maximumResponseDelay;
+ private final int batchSize;
+
+ // In-memory state management (mirrors LocalNewSessionQueue)
+ private final Map<RequestId, Data> requests;
+ private final Map<RequestId, TraceContext> contexts;
+ private final Deque<SessionRequest> queue;
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private final ScheduledExecutorService service =
+ Executors.newSingleThreadScheduledExecutor(
+ r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName(NAME);
+ return thread;
+ });
+
+ public RedisBackedSessionQueue(
+ Tracer tracer,
+ SlotMatcher slotMatcher,
+ URI sessionQueueUri,
+ Duration requestTimeoutCheck,
+ Duration requestTimeout,
+ Duration maximumResponseDelay,
+ Secret registrationSecret,
+ int batchSize) {
+ super(tracer, registrationSecret);
+
+ this.slotMatcher = Require.nonNull("Slot matcher", slotMatcher);
+ this.sessionQueueUri = Require.nonNull("Redis URI", sessionQueueUri);
+ this.redisClient = new GridRedisClient(sessionQueueUri);
+
+ Require.nonNegative("Retry period", requestTimeoutCheck);
+ this.requestTimeout = Require.positive("Request timeout", requestTimeout);
+ this.maximumResponseDelay = Require.positive("Maximum response delay", maximumResponseDelay);
+ this.batchSize = Require.positive("Batch size", batchSize);
+
+ this.requests = new ConcurrentHashMap<>();
+ this.queue = new ConcurrentLinkedDeque<>();
+ this.contexts = new ConcurrentHashMap<>();
+
+ // Restore state from Redis on startup
+ restoreStateFromRedis();
+
+ service.scheduleAtFixedRate(
+ GuardedRunnable.guard(this::timeoutSessions),
+ requestTimeoutCheck.toMillis(),
+ requestTimeoutCheck.toMillis(),
+ MILLISECONDS);
+
+ new JMXHelper().register(this);
+ }
+
+ public static NewSessionQueue create(Config config) {
+ LoggingOptions loggingOptions = new LoggingOptions(config);
+ Tracer tracer = loggingOptions.getTracer();
+
+ NewSessionQueueOptions newSessionQueueOptions = new NewSessionQueueOptions(config);
+ SecretOptions secretOptions = new SecretOptions(config);
+
+ // Use the factory to create a SlotMatcher to avoid circular dependencies
+ SlotMatcher slotMatcher = new DistributorOptions(config).getSlotMatcher();
+
+ return new RedisBackedSessionQueue(
+ tracer,
+ slotMatcher,
+ newSessionQueueOptions.getSessionQueueUri(),
+ newSessionQueueOptions.getSessionRequestTimeoutPeriod(),
+ newSessionQueueOptions.getSessionRequestTimeout(),
+ newSessionQueueOptions.getMaximumResponseDelay(),
+ secretOptions.getRegistrationSecret(),
+ newSessionQueueOptions.getBatchSize());
+ }
+
+ /** Restores in-memory state from Redis on startup */
+ private void restoreStateFromRedis() {
+ try {
+ String queueData = redisClient.get(QUEUE_KEY);
+ LOG.info(
+ "[RedisBackedSessionQueue.restoreStateFromRedis] Raw queue data from Redis: ["
+ + queueData
+ + "]");
+ if (queueData != null && !queueData.isEmpty()) {
+ String[] requestIdArray = queueData.split(",");
+ for (String requestIdStr : requestIdArray) {
+ if (requestIdStr.trim().isEmpty()) continue;
+ try {
+ RequestId requestId = new RequestId(java.util.UUID.fromString(requestIdStr.trim()));
+ String requestKey = REQUEST_KEY_PREFIX + requestIdStr.trim();
+ String dataKey = DATA_KEY_PREFIX + requestIdStr.trim();
+ String requestJson = redisClient.get(requestKey);
+ String dataJson = redisClient.get(dataKey);
+ LOG.info(
+ "[restoreStateFromRedis] Read from Redis: requestKey="
+ + requestKey
+ + ", requestJson="
+ + requestJson);
+ LOG.info(
+ "[restoreStateFromRedis] Read from Redis: dataKey="
+ + dataKey
+ + ", dataJson="
+ + dataJson);
+ if (requestJson != null && dataJson != null) {
+ SessionRequest request = JSON.toType(requestJson, SessionRequest.class);
+ Data data = JSON.toType(dataJson, Data.class); // Deserialize full Data object
+ requests.put(requestId, data);
+ queue.add(request);
+ if (isTimedOut(Instant.now(), data)) {
+ failDueToTimeout(requestId);
+ }
+ }
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Failed to restore request from Redis: " + requestIdStr, e);
+ cleanupRedisRequest(requestIdStr.trim());
+ }
+ }
+ }
+ LOG.info("Restored " + requests.size() + " session requests from Redis");
+ } catch (Exception e) {
+ LOG.log(Level.SEVERE, "Failed to restore state from Redis", e);
+ }
+ }
+
+ /** Persists request state to Redis asynchronously */
+ private void persistToRedis(SessionRequest request, Data data) {
+ String requestKey = REQUEST_KEY_PREFIX + request.getRequestId();
+ String dataKey = DATA_KEY_PREFIX + request.getRequestId();
+ String requestJson = JSON.toJson(request);
+ String dataJson = JSON.toJson(data);
+
+ // Log what we're about to write
+ LOG.info(
+ "[persistToRedis] Writing to Redis: requestKey="
+ + requestKey
+ + ", requestJson="
+ + requestJson);
+ LOG.info("[persistToRedis] Writing to Redis: dataKey=" + dataKey + ", dataJson=" + dataJson);
+
+ // Use mset to write both keys at once
+ Map<String, String> keyValues = new HashMap<>();
+ keyValues.put(requestKey, requestJson);
+ keyValues.put(dataKey, dataJson);
+ redisClient.mset(keyValues);
+
+ // Update the queue key
+ StringBuilder queueBuilder = new StringBuilder();
+ for (SessionRequest req : queue) {
+ queueBuilder.append(req.getRequestId().toString()).append(",");
+ }
+ String queueString =
+ queueBuilder.length() > 0 ? queueBuilder.substring(0, queueBuilder.length() - 1) : "";
+
+ LOG.info(
+ "[persistToRedis] Writing to Redis: QUEUE_KEY="
+ + QUEUE_KEY
+ + ", queueString="
+ + queueString);
+
+ // Update queue using mset
+ Map<String, String> queueUpdate = new HashMap<>();
+ queueUpdate.put(QUEUE_KEY, queueString);
+ redisClient.mset(queueUpdate);
+ }
+
+ /** Removes request from Redis asynchronously */
+ private void removeFromRedis(RequestId requestId) {
+ service.execute(
+ () -> {
+ try {
+ String requestIdStr = requestId.toString();
+ cleanupRedisRequest(requestIdStr);
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Failed to remove request from Redis: " + requestId, e);
+ }
+ });
+ }
+
+ private void cleanupRedisRequest(String requestIdStr) {
+ try {
+ String requestKey = REQUEST_KEY_PREFIX + requestIdStr;
+ String dataKey = DATA_KEY_PREFIX + requestIdStr;
+ String contextKey = CONTEXT_KEY_PREFIX + requestIdStr;
+
+ // Remove all associated data
+ redisClient.del(requestKey, dataKey, contextKey);
+
+ // Remove from queue list
+ String currentQueue = redisClient.get(QUEUE_KEY);
+ if (currentQueue != null && !currentQueue.isEmpty()) {
+ String[] requestIds = currentQueue.split(",");
+ StringBuilder newQueue = new StringBuilder();
+ for (String id : requestIds) {
+ if (!id.trim().equals(requestIdStr)) {
+ if (newQueue.length() > 0) {
+ newQueue.append(",");
+ }
+ newQueue.append(id.trim());
+ }
+ }
+ Map<String, String> queueUpdate = new HashMap<>();
+ queueUpdate.put(QUEUE_KEY, newQueue.toString());
+ redisClient.mset(queueUpdate);
+ int queueCount = newQueue.toString().isEmpty() ? 0 : newQueue.toString().split(",").length;
+ LOG.info(
+ "[RedisBackedSessionQueue.cleanupRedisRequest] Queue after removal: ["
+ + newQueue
+ + "] ("
+ + queueCount
+ + " requests)");
+ }
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Failed to cleanup Redis request: " + requestIdStr, e);
+ }
+ }
+
+ private void timeoutSessions() {
+ Instant now = Instant.now();
+
+ Lock readLock = lock.readLock();
+ readLock.lock();
+ Set<RequestId> ids;
+ try {
+ ids =
+ requests.entrySet().stream()
+ .filter(
+ entry ->
+ queue.stream()
+ .anyMatch(
+ sessionRequest ->
+ sessionRequest.getRequestId().equals(entry.getKey())))
+ .filter(entry -> isTimedOut(now, entry.getValue()))
+ .map(Map.Entry::getKey)
+ .collect(HashSet::new, Set::add, Set::addAll);
+ } finally {
+ readLock.unlock();
+ }
+ ids.forEach(this::failDueToTimeout);
+ }
+
+ private boolean isTimedOut(Instant now, Data data) {
+ return data.endTime.isBefore(now);
+ }
+
+ @Override
+ public boolean peekEmpty() {
+ Lock readLock = lock.readLock();
+ readLock.lock();
+ try {
+ return requests.isEmpty() && queue.isEmpty();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public HttpResponse addToQueue(SessionRequest request) {
+ Require.nonNull("New session request", request);
+ Require.nonNull("Request id", request.getRequestId());
+
+ TraceContext context = TraceSessionRequest.extract(tracer, request);
+ try (Span ignored = context.createSpan("sessionqueue.add_to_queue")) {
+ contexts.put(request.getRequestId(), context);
+ Data data = injectIntoQueue(request);
+
+ if (isTimedOut(Instant.now(), data)) {
+ failDueToTimeout(request.getRequestId());
+ }
+
+ Either<SessionNotCreatedException, CreateSessionResponse> result;
+ try {
+
+ boolean sessionCreated = data.latch.await(requestTimeout.toMillis(), MILLISECONDS);
+
+ if (sessionCreated) {
+ result = data.getResult();
+ } else {
+ result = Either.left(new SessionNotCreatedException("New session request timed out"));
+ }
+ } catch (InterruptedException e) {
+ // the client will never see the session, ensure the session is disposed
+ data.cancel();
+ Thread.currentThread().interrupt();
+ result =
+ Either.left(new SessionNotCreatedException("Interrupted when creating the session", e));
+ } catch (RuntimeException e) {
+ // the client will never see the session, ensure the session is disposed
+ data.cancel();
+ result =
+ Either.left(
+ new SessionNotCreatedException("An error occurred creating the session", e));
+ }
+
+ Lock writeLock = this.lock.writeLock();
+ if (!writeLock.tryLock()) {
+ writeLock.lock();
+ }
+ try {
+ requests.remove(request.getRequestId());
+ queue.remove(request);
+ contexts.remove(request.getRequestId());
+ } finally {
+ writeLock.unlock();
+ }
+
+ // Clean up from Redis
+ removeFromRedis(request.getRequestId());
+
+ HttpResponse res = new HttpResponse();
+ if (result.isRight()) {
+ res.setContent(Contents.bytes(result.right().getDownstreamEncodedResponse()));
+ } else {
+ res.setStatus(HTTP_INTERNAL_ERROR)
+ .setContent(
+ Contents.asJson(
+ ImmutableMap.of(
+ "value",
+ ImmutableMap.of(
+ "error", "session not created",
+ "message", result.left().getMessage(),
+ "stacktrace", result.left().getStackTrace()))));
+ }
+
+ return res;
+ }
+ }
+
+ @VisibleForTesting
+ Data injectIntoQueue(SessionRequest request) {
+ Require.nonNull("Session request", request);
+
+ Data data = new Data(request.getEnqueued(), requestTimeout);
+
+ Lock writeLock = lock.writeLock();
+ if (!writeLock.tryLock()) {
+ writeLock.lock();
+ }
+ try {
+ requests.put(request.getRequestId(), data);
+ queue.addLast(request);
+ } finally {
+ writeLock.unlock();
+ }
+
+ // Persist to Redis asynchronously
+ persistToRedis(request, data);
+
+ return data;
+ }
+
+ @Override
+ public boolean retryAddToQueue(SessionRequest request) {
+ Require.nonNull("New session request", request);
+
+ boolean added;
+ TraceContext context =
+ contexts.getOrDefault(request.getRequestId(), tracer.getCurrentContext());
+ try (Span ignored = context.createSpan("sessionqueue.retry")) {
+ Lock writeLock = lock.writeLock();
+ if (!writeLock.tryLock()) {
+ writeLock.lock();
+ }
+ try {
+ if (!requests.containsKey(request.getRequestId())) {
+ return false;
+ }
+ Data data = requests.get(request.getRequestId());
+ if (isTimedOut(Instant.now(), data)) {
+ // as we try to re-add a session request that has already expired, force session timeout
+ failDueToTimeout(request.getRequestId());
+ // return true to avoid handleNewSessionRequest to call 'complete' an other time
+ return true;
+ } else if (data.isCanceled()) {
+ failDueToCanceled(request.getRequestId());
+ // return true to avoid handleNewSessionRequest to call 'complete' an other time
+ return true;
+ }
+
+ if (queue.contains(request)) {
+ // No need to re-add this
+ return true;
+ } else {
+ added = queue.offerFirst(request);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+
+ return added;
+ }
+ }
+
+ @Override
+ public Optional<SessionRequest> remove(RequestId reqId) {
+ Require.nonNull("Request ID", reqId);
+
+ Lock writeLock = lock.writeLock();
+ if (!writeLock.tryLock()) {
+ writeLock.lock();
+ }
+ try {
+ Iterator<SessionRequest> iterator = queue.iterator();
+ while (iterator.hasNext()) {
+ SessionRequest req = iterator.next();
+ if (reqId.equals(req.getRequestId())) {
+ iterator.remove();
+ // Remove from Redis
+ removeFromRedis(reqId);
+ return Optional.of(req);
+ }
+ }
+ return Optional.empty();
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private volatile long lastNonEmptyQueueTime = System.currentTimeMillis();
+ private volatile boolean wasEmpty = false;
+ private static final long MAX_BACKOFF_MS = 1000; // Maximum 1 second backoff
+ private static final long MIN_BACKOFF_MS = 10; // Minimum 10ms backoff
+
+ @Override
+ public List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes) {
+ Require.nonNull("Stereotypes", stereotypes);
+
+ // Convert maximumResponseDelay to milliseconds for easier comparison
+ long maxDelayMs = maximumResponseDelay.toMillis();
+ long startTime = System.currentTimeMillis();
+ long backoffMs = MIN_BACKOFF_MS;
+
+ // delay the response to avoid heavy polling via http
+ while (maxDelayMs > System.currentTimeMillis() - startTime) {
+ boolean isEmpty = true;
+
+ // Check queue status with read lock
+ Lock readLock = lock.readLock();
+ readLock.lock();
+ try {
+ isEmpty = queue.isEmpty();
+ if (!isEmpty) {
+ lastNonEmptyQueueTime = System.currentTimeMillis();
+ wasEmpty = false;
+ break; // Exit loop if we found requests to process
+ }
+ } finally {
+ readLock.unlock();
+ }
+
+ // If queue is empty, use backoff with jitter
+ if (isEmpty) {
+ long timeSinceLastNonEmpty = System.currentTimeMillis() - lastNonEmptyQueueTime;
+
+ // If queue has been empty for a while, increase backoff
+ if (wasEmpty && timeSinceLastNonEmpty > 100) {
+ backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS);
+ } else {
+ backoffMs = MIN_BACKOFF_MS;
+ }
+
+ // Don't sleep longer than the remaining delay time
+ long remainingDelay = maxDelayMs - (System.currentTimeMillis() - startTime);
+ if (remainingDelay <= 0) {
+ break;
+ }
+
+ long sleepTime = Math.min(backoffMs, remainingDelay);
+ if (sleepTime <= 0) {
+ break;
+ }
+
+ // Add jitter to prevent thundering herd (up to 10% of sleep time)
+ long jitter = (long) (Math.random() * sleepTime * 0.1);
+ wasEmpty = true;
+
+ try {
+ Thread.sleep(sleepTime + jitter);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+
+ Predicate<Capabilities> matchesStereotype =
+ caps ->
+ stereotypes.entrySet().stream()
+ .filter(entry -> entry.getValue() > 0)
+ .anyMatch(
+ entry -> {
+ boolean matches = slotMatcher.matches(entry.getKey(), caps);
+ if (matches) {
+ Long value = entry.getValue();
+ entry.setValue(value - 1);
+ }
+ return matches;
+ });
+
+ Lock writeLock = lock.writeLock();
+ if (!writeLock.tryLock()) {
+ writeLock.lock();
+ }
+ try {
+ List<SessionRequest> availableRequests =
+ queue.stream()
+ .filter(req -> req.getDesiredCapabilities().stream().anyMatch(matchesStereotype))
+ .limit(batchSize)
+ .collect(Collectors.toList());
+
+ availableRequests.removeIf(
+ (req) -> {
+ Data data = this.requests.get(req.getRequestId());
+
+ if (data.isCanceled()) {
+ failDueToCanceled(req.getRequestId());
+ return true;
+ }
+
+ this.remove(req.getRequestId());
+ return false;
+ });
+
+ return availableRequests;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /** Returns true if the session is still valid (not timed out and not canceled) */
+ @Override
+ public boolean complete(
+ RequestId reqId, Either<SessionNotCreatedException, CreateSessionResponse> result) {
+ Require.nonNull("New session request", reqId);
+ Require.nonNull("Result", result);
+ TraceContext context = contexts.getOrDefault(reqId, tracer.getCurrentContext());
+ try (Span ignored = context.createSpan("sessionqueue.completed")) {
+ Data data;
+ Lock writeLock = lock.writeLock();
+ if (!writeLock.tryLock()) {
+ writeLock.lock();
+ }
+ try {
+ data = requests.remove(reqId);
+ queue.removeIf(req -> reqId.equals(req.getRequestId()));
+ contexts.remove(reqId);
+ } finally {
+ writeLock.unlock();
+ }
+
+ if (data == null) {
+ return false;
+ }
+
+ return data.setResult(result);
+ }
+ }
+
+ @Override
+ public int clearQueue() {
+ Lock writeLock = lock.writeLock();
+ if (!writeLock.tryLock()) {
+ writeLock.lock();
+ }
+
+ try {
+ int size = queue.size();
+ queue.clear();
+ requests.forEach(
+ (reqId, data) ->
+ data.setResult(
+ Either.left(new SessionNotCreatedException("Request queue was cleared"))));
+ requests.clear();
+ // Do not clear contexts for strict alignment
+ // Clear Redis asynchronously
+ service.execute(
+ () -> {
+ try {
+ redisClient.del(QUEUE_KEY);
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Failed to clear Redis queue", e);
+ }
+ });
+
+ return size;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public List<SessionRequestCapability> getQueueContents() {
+ Lock readLock = lock.readLock();
+ readLock.lock();
+
+ try {
+ return queue.stream()
+ .map(
+ req -> new SessionRequestCapability(req.getRequestId(), req.getDesiredCapabilities()))
+ .collect(Collectors.toList());
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @ManagedAttribute(name = "NewSessionQueueSize")
+ public int getQueueSize() {
+ return queue.size();
+ }
+
+ @ManagedAttribute(name = "SessionQueueUri")
+ public String getSessionQueueUri() {
+ return sessionQueueUri.toString();
+ }
+
+ @Override
+ public boolean isReady() {
+ try {
+ return redisClient.isOpen();
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ @Override
+ public void close() {
+ shutdownGracefully(NAME, service);
+ try {
+ redisClient.close();
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Failed to close Redis connection", e);
+ }
+ }
+
+ private void failDueToTimeout(RequestId reqId) {
+ complete(reqId, Either.left(new SessionNotCreatedException("Timed out creating session")));
+ }
+
+ private void failDueToCanceled(RequestId reqId) {
+ // this error should never reach the client, as this is a client initiated state
+ complete(reqId, Either.left(new SessionNotCreatedException("Client has gone away")));
+ }
+
+ private static class Data {
+ public Instant endTime;
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private Either<SessionNotCreatedException, CreateSessionResponse> result;
+ private boolean complete;
+ private boolean canceled;
+
+ // No-arg constructor for JSON deserialization
+ public Data() {
+ this.endTime = Instant.now();
+ this.complete = false;
+ this.canceled = false;
+ this.result = Either.left(new SessionNotCreatedException("Session not created"));
+ }
+
+ public Data(Instant enqueued, Duration requestTimeout) {
+ this.endTime = Instant.now().plus(requestTimeout);
+ this.result = Either.left(new SessionNotCreatedException("Session not created"));
+ }
+
+ // Constructor for JSON deserialization
+ public Data(Instant endTime, boolean complete, boolean canceled) {
+ this.endTime = endTime;
+ this.complete = complete;
+ this.canceled = canceled;
+ this.result = Either.left(new SessionNotCreatedException("Session not created"));
+ }
+
+ // Add a constructor for full deserialization
+ public Data(
+ Instant endTime,
+ boolean complete,
+ boolean canceled,
+ Either<SessionNotCreatedException, CreateSessionResponse> result) {
+ this.endTime = endTime;
+ this.complete = complete;
+ this.canceled = canceled;
+ this.result = result;
+ }
+
+ public synchronized Either<SessionNotCreatedException, CreateSessionResponse> getResult() {
+ return result;
+ }
+
+ public synchronized void cancel() {
+ canceled = true;
+ }
+
+ public synchronized boolean isCanceled() {
+ return canceled;
+ }
+
+ public synchronized boolean setResult(
+ Either<SessionNotCreatedException, CreateSessionResponse> result) {
+ if (complete || canceled) {
+ return false;
+ }
+ this.result = result;
+ complete = true;
+ latch.countDown();
+ return true;
+ }
+
+ // Remove static from fromJson method and make it an instance method
+ public Data fromJson(
+ Instant endTime, boolean complete, boolean canceled, Map<String, Object> resultMap) {
+ Either<SessionNotCreatedException, CreateSessionResponse> result;
+ if (resultMap.containsKey("right")) {
+ CreateSessionResponse resp = (CreateSessionResponse) resultMap.get("right");
+ result = Either.right(resp);
+ } else {
+ SessionNotCreatedException ex = (SessionNotCreatedException) resultMap.get("left");
+ result = Either.left(ex);
+ }
+ return new Data(endTime, complete, canceled, result);
+ }
+
+ // Setters for JSON deserialization
+ public void setEndTime(Instant endTime) {
+ this.endTime = endTime;
+ }
+
+ public void setComplete(boolean complete) {
+ this.complete = complete;
+ }
+
+ public void setCanceled(boolean canceled) {
+ this.canceled = canceled;
+ }
+ }
+}
diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/remote/RemoteNewSessionQueue.java b/java/src/org/openqa/selenium/grid/sessionqueue/remote/RemoteNewSessionQueue.java
index 7b5fc00..fba0c16 100644
--- a/java/src/org/openqa/selenium/grid/sessionqueue/remote/RemoteNewSessionQueue.java
+++ b/java/src/org/openqa/selenium/grid/sessionqueue/remote/RemoteNewSessionQueue.java
@@ -68,6 +68,10 @@ public class RemoteNewSessionQueue extends NewSessionQueue {
private static final Json JSON = new Json();
private final HttpClient client;
private final Filter addSecret;
+ private volatile long backoffMs = 100; // Start with 100ms backoff
+ private static final long MAX_BACKOFF_MS = 5000; // Max 5 seconds backoff
+ private static final long MIN_BACKOFF_MS = 100; // Min 100ms backoff
+ private volatile long lastRequestTime = 0;
public RemoteNewSessionQueue(Tracer tracer, HttpClient client, Secret registrationSecret) {
super(tracer, registrationSecret);
@@ -146,17 +150,51 @@ public Optional<SessionRequest> remove(RequestId reqId) {
public List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes) {
Require.nonNull("Stereotypes", stereotypes);
- Map<String, Long> stereotypeJson = new HashMap<>();
- stereotypes.forEach((k, v) -> stereotypeJson.put(JSON.toJson(k), v));
+ // Apply backoff if needed
+ long now = System.currentTimeMillis();
+ long timeSinceLastRequest = now - lastRequestTime;
- HttpRequest upstream =
- new HttpRequest(POST, "/se/grid/newsessionqueue/session/next")
- .setContent(Contents.asJson(stereotypeJson));
+ if (timeSinceLastRequest < backoffMs) {
+ long sleepTime = backoffMs - timeSinceLastRequest;
+ try {
+ // Add some jitter to prevent thundering herd
+ long jitter = (long) (Math.random() * sleepTime * 0.1); // Up to 10% jitter
+ Thread.sleep(sleepTime + jitter);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return List.of();
+ }
+ }
- HttpTracing.inject(tracer, tracer.getCurrentContext(), upstream);
- HttpResponse response = client.with(addSecret).execute(upstream);
+ try {
+ Map<String, Long> stereotypeJson = new HashMap<>();
+ stereotypes.forEach((k, v) -> stereotypeJson.put(JSON.toJson(k), v));
- return Values.get(response, SESSION_REQUEST_TYPE);
+ HttpRequest upstream =
+ new HttpRequest(POST, "/se/grid/newsessionqueue/session/next")
+ .setContent(Contents.asJson(stereotypeJson));
+
+ HttpTracing.inject(tracer, tracer.getCurrentContext(), upstream);
+ HttpResponse response = client.with(addSecret).execute(upstream);
+
+ List<SessionRequest> result = Values.get(response, SESSION_REQUEST_TYPE);
+
+ // If we got results, reduce backoff. Otherwise, increase it.
+ if (result == null || result.isEmpty()) {
+ backoffMs = Math.min((long) (backoffMs * 1.5), MAX_BACKOFF_MS);
+ } else {
+ backoffMs = Math.max(MIN_BACKOFF_MS, backoffMs / 2);
+ }
+
+ return result != null ? result : List.of();
+
+ } catch (Exception e) {
+ // On error, increase backoff more aggressively
+ backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS);
+ throw e;
+ } finally {
+ lastRequestTime = System.currentTimeMillis();
+ }
}
@Override
diff --git a/java/test/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel b/java/test/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel
new file mode 100644
index 0000000..10adbef
--- /dev/null
+++ b/java/test/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel
@@ -0,0 +1,22 @@
+load("@rules_jvm_external//:defs.bzl", "artifact")
+load("//java:defs.bzl", "JUNIT5_DEPS", "java_test_suite")
+
+java_test_suite(
+ name = "MediumTests",
+ size = "medium",
+ srcs = glob(["*Test.java"]),
+ deps = [
+ "//java/src/org/openqa/selenium/events/local",
+ "//java/src/org/openqa/selenium/grid/sessionqueue/redis",
+ "//java/src/org/openqa/selenium/json",
+ "//java/src/org/openqa/selenium/redis",
+ "//java/src/org/openqa/selenium/remote",
+ "//java/test/org/openqa/selenium/remote/tracing:tracing-support",
+ "//java/test/org/openqa/selenium/testing:test-base",
+ artifact("io.lettuce:lettuce-core"),
+ artifact("io.opentelemetry:opentelemetry-api"),
+ artifact("org.junit.jupiter:junit-jupiter-api"),
+ artifact("org.assertj:assertj-core"),
+ artifact("org.mockito:mockito-core"),
+ ] + JUNIT5_DEPS,
+)
diff --git a/java/test/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueueTest.java b/java/test/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueueTest.java
new file mode 100644
index 0000000..81a2acb
--- /dev/null
+++ b/java/test/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueueTest.java
@@ -0,0 +1,216 @@
+// Licensed to the Software Freedom Conservancy (SFC) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The SFC licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.openqa.selenium.grid.sessionqueue.redis;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.net.URI;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.ImmutableCapabilities;
+import org.openqa.selenium.SessionNotCreatedException;
+import org.openqa.selenium.grid.data.CreateSessionResponse;
+import org.openqa.selenium.grid.data.RequestId;
+import org.openqa.selenium.grid.data.Session;
+import org.openqa.selenium.grid.data.SessionId;
+import org.openqa.selenium.grid.data.SessionRequest;
+import org.openqa.selenium.grid.data.SessionRequestCapability;
+import org.openqa.selenium.grid.security.Secret;
+import org.openqa.selenium.internal.Either;
+import org.openqa.selenium.remote.http.Contents;
+import org.openqa.selenium.remote.http.HttpMethod;
+import org.openqa.selenium.remote.http.HttpRequest;
+import org.openqa.selenium.remote.http.HttpResponse;
+import org.openqa.selenium.remote.tracing.DefaultTestTracer;
+import org.openqa.selenium.remote.tracing.Tracer;
+
+class RedisBackedSessionQueueTest {
+
+ private static final Tracer tracer = DefaultTestTracer.createTracer();
+ private static final Secret secret = new Secret("test-secret");
+ private static final URI redisUri = URI.create("redis://localhost:6379");
+ private static final Duration REQUEST_TIMEOUT_CHECK = Duration.ofMillis(50);
+ private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(1);
+ private static final Duration MAX_RESPONSE_DELAY = Duration.ofSeconds(2);
+ private static final int BATCH_SIZE = 3;
+
+ private RedisBackedSessionQueue queue;
+
+ @BeforeEach
+ void setUp() {
+ queue =
+ new RedisBackedSessionQueue(
+ tracer,
+ secret,
+ redisUri,
+ REQUEST_TIMEOUT_CHECK,
+ REQUEST_TIMEOUT,
+ MAX_RESPONSE_DELAY,
+ BATCH_SIZE);
+ }
+
+ @AfterEach
+ void tearDown() {
+ queue.clearQueue();
+ }
+
+ @Test
+ void shouldThrowIllegalArgumentExceptionIfRedisUriIsNull() {
+ assertThatThrownBy(
+ () ->
+ new RedisBackedSessionQueue(
+ tracer,
+ secret,
+ null,
+ REQUEST_TIMEOUT_CHECK,
+ REQUEST_TIMEOUT,
+ MAX_RESPONSE_DELAY,
+ BATCH_SIZE))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void shouldThrowIllegalArgumentExceptionIfTracerIsNull() {
+ assertThatThrownBy(
+ () ->
+ new RedisBackedSessionQueue(
+ null,
+ secret,
+ redisUri,
+ REQUEST_TIMEOUT_CHECK,
+ REQUEST_TIMEOUT,
+ MAX_RESPONSE_DELAY,
+ BATCH_SIZE))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void shouldThrowIllegalArgumentExceptionIfSecretIsNull() {
+ assertThatThrownBy(
+ () ->
+ new RedisBackedSessionQueue(
+ tracer,
+ null,
+ redisUri,
+ REQUEST_TIMEOUT_CHECK,
+ REQUEST_TIMEOUT,
+ MAX_RESPONSE_DELAY,
+ BATCH_SIZE))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void canAddSessionRequestToQueue() {
+ RequestId requestId = new RequestId(UUID.randomUUID());
+ SessionRequest request = createSessionRequest(requestId);
+
+ HttpResponse response = queue.addToQueue(request);
+
+ assertThat(response.getStatus()).isEqualTo(200);
+ }
+
+ @Test
+ void canRemoveSessionRequestFromQueue() {
+ RequestId requestId = new RequestId(UUID.randomUUID());
+ SessionRequest originalRequest = createSessionRequest(requestId);
+
+ queue.addToQueue(originalRequest);
+
+ Optional<SessionRequest> removed = queue.remove(requestId);
+
+ assertThat(removed).isPresent();
+ assertThat(removed.get().getRequestId()).isEqualTo(requestId);
+ }
+
+ @Test
+ void getNextAvailableShouldReturnOldestRequest() {
+ RequestId requestId = new RequestId(UUID.randomUUID());
+ SessionRequest originalRequest = createSessionRequest(requestId);
+
+ queue.addToQueue(originalRequest);
+
+ List<SessionRequest> next = queue.getNextAvailable(Map.of());
+
+ assertThat(next).hasSize(1);
+ assertThat(next.get(0).getRequestId()).isEqualTo(requestId);
+ }
+
+ @Test
+ void completeShouldReturnTrueAndCleanupRequestData() {
+ RequestId requestId = new RequestId(UUID.randomUUID());
+ Session dummySession =
+ new Session(
+ new SessionId("dummy"),
+ "dummy-uri",
+ new ImmutableCapabilities(),
+ new ImmutableCapabilities(),
+ Instant.now());
+ CreateSessionResponse response = new CreateSessionResponse(dummySession, new byte[0]);
+ Either<SessionNotCreatedException, CreateSessionResponse> result = Either.right(response);
+
+ queue.addToQueue(createSessionRequest(requestId));
+
+ boolean completed = queue.complete(requestId, result);
+
+ assertThat(completed).isTrue();
+ }
+
+ @Test
+ void clearQueueShouldRemoveAllRequests() {
+ RequestId requestId1 = new RequestId(UUID.randomUUID());
+ RequestId requestId2 = new RequestId(UUID.randomUUID());
+
+ queue.addToQueue(createSessionRequest(requestId1));
+ queue.addToQueue(createSessionRequest(requestId2));
+
+ int cleared = queue.clearQueue();
+
+ assertThat(cleared).isEqualTo(2);
+ }
+
+ @Test
+ void getQueueContentsShouldReturnAllRequests() {
+ RequestId requestId1 = new RequestId(UUID.randomUUID());
+ RequestId requestId2 = new RequestId(UUID.randomUUID());
+ SessionRequest request1 = createSessionRequest(requestId1);
+ SessionRequest request2 = createSessionRequest(requestId2);
+
+ queue.addToQueue(request1);
+ queue.addToQueue(request2);
+
+ List<SessionRequestCapability> contents = queue.getQueueContents();
+
+ assertThat(contents).hasSize(2);
+ assertThat(contents.get(0).getRequestId()).isEqualTo(requestId1);
+ assertThat(contents.get(1).getRequestId()).isEqualTo(requestId2);
+ }
+
+ private SessionRequest createSessionRequest(RequestId requestId) {
+ HttpRequest httpRequest = new HttpRequest(HttpMethod.POST, "/session");
+ httpRequest.setContent(Contents.utf8String("{\"capabilities\":{\"browserName\":\"chrome\"}}"));
+ return new SessionRequest(requestId, httpRequest, Instant.now());
+ }
+}