diff --git a/assembly/assembly-wsmaster-war/src/main/java/org/eclipse/che/api/deploy/jsonrpc/CheMajorWebSocketEndpointExecutorServiceProvider.java b/assembly/assembly-wsmaster-war/src/main/java/org/eclipse/che/api/deploy/jsonrpc/CheMajorWebSocketEndpointExecutorServiceProvider.java index 3a5348d8120..8d557f7d66d 100644 --- a/assembly/assembly-wsmaster-war/src/main/java/org/eclipse/che/api/deploy/jsonrpc/CheMajorWebSocketEndpointExecutorServiceProvider.java +++ b/assembly/assembly-wsmaster-war/src/main/java/org/eclipse/che/api/deploy/jsonrpc/CheMajorWebSocketEndpointExecutorServiceProvider.java @@ -11,16 +11,21 @@ */ package org.eclipse.che.api.deploy.jsonrpc; +import static org.slf4j.LoggerFactory.getLogger; + import java.util.concurrent.ExecutorService; import javax.inject.Inject; import javax.inject.Named; import javax.inject.Singleton; import org.eclipse.che.commons.lang.execution.ExecutorServiceProvider; +import org.slf4j.Logger; /** {@link ExecutorService} provider used in {@link CheMajorWebSocketEndpoint}. */ @Singleton public class CheMajorWebSocketEndpointExecutorServiceProvider extends ExecutorServiceProvider { + private static final Logger LOG = getLogger(ExecutorServiceProvider.class); + public static final String JSON_RPC_MAJOR_CORE_POOL_SIZE_PARAMETER_NAME = "che.core.jsonrpc.processor_core_pool_size"; public static final String JSON_RPC_MAJOR_MAX_POOL_SIZE_PARAMETER_NAME = @@ -33,6 +38,15 @@ public CheMajorWebSocketEndpointExecutorServiceProvider( @Named(JSON_RPC_MAJOR_CORE_POOL_SIZE_PARAMETER_NAME) int corePoolSize, @Named(JSON_RPC_MAJOR_MAX_POOL_SIZE_PARAMETER_NAME) int maxPoolSize, @Named(JSON_RPC_MAJOR_QUEUE_CAPACITY_PARAMETER_NAME) int queueCapacity) { - super(corePoolSize, maxPoolSize, queueCapacity); + super( + corePoolSize, + maxPoolSize, + queueCapacity, + (r, executor) -> + LOG.error( + "Executor on major websocket endpoint rejected to handle the payload {}. Some important messages may be lost. Consider increasing `{}`. Now it's configured to {}", + r, + JSON_RPC_MAJOR_QUEUE_CAPACITY_PARAMETER_NAME, + queueCapacity)); } } diff --git a/core/commons/che-core-commons-lang/src/main/java/org/eclipse/che/commons/lang/execution/ExecutorServiceProvider.java b/core/commons/che-core-commons-lang/src/main/java/org/eclipse/che/commons/lang/execution/ExecutorServiceProvider.java index 25ae5099bc5..092a4024c8c 100644 --- a/core/commons/che-core-commons-lang/src/main/java/org/eclipse/che/commons/lang/execution/ExecutorServiceProvider.java +++ b/core/commons/che-core-commons-lang/src/main/java/org/eclipse/che/commons/lang/execution/ExecutorServiceProvider.java @@ -17,6 +17,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -44,15 +45,18 @@ public class ExecutorServiceProvider implements Provider { private static final Logger LOG = getLogger(ExecutorServiceProvider.class); private final ThreadPoolExecutor executor; - /** * @param corePoolSize - corePoolSize of ThreadPoolExecutor * @param maxPoolSize - maximumPoolSize of ThreadPoolExecutor * @param queueCapacity - queue capacity. if > 0 then this is capacity of {@link * LinkedBlockingQueue} if <=0 then {@link SynchronousQueue} are used. + * @param rejectedExecutionHandler - {@link RejectedExecutionHandler} of ThreadPoolExecutor */ - public ExecutorServiceProvider(int corePoolSize, int maxPoolSize, int queueCapacity) { - + public ExecutorServiceProvider( + int corePoolSize, + int maxPoolSize, + int queueCapacity, + RejectedExecutionHandler rejectedExecutionHandler) { ThreadFactory factory = new ThreadFactoryBuilder() .setUncaughtExceptionHandler(LoggingUncaughtExceptionHandler.getInstance()) @@ -68,11 +72,24 @@ public ExecutorServiceProvider(int corePoolSize, int maxPoolSize, int queueCapac SECONDS, queueCapacity > 0 ? new LinkedBlockingQueue<>(queueCapacity) : new SynchronousQueue<>(), factory); - executor.setRejectedExecutionHandler( - (r, __) -> LOG.warn("Executor rejected to handle the payload {}", r)); + executor.setRejectedExecutionHandler(rejectedExecutionHandler); executor.prestartCoreThread(); } + /** + * @param corePoolSize - corePoolSize of ThreadPoolExecutor + * @param maxPoolSize - maximumPoolSize of ThreadPoolExecutor + * @param queueCapacity - queue capacity. if > 0 then this is capacity of {@link + * LinkedBlockingQueue} if <=0 then {@link SynchronousQueue} are used. + */ + public ExecutorServiceProvider(int corePoolSize, int maxPoolSize, int queueCapacity) { + this( + corePoolSize, + maxPoolSize, + queueCapacity, + (r, e) -> LOG.warn("Executor rejected to handle the payload {}", r)); + } + @Override public ExecutorService get() { return executor;