Skip to content

Commit

Permalink
oncrpcsvc: do not use FixedThreadPool from grizzly
Browse files Browse the repository at this point in the history
Motivation:
The FixedThreadPool from grizzly has a bad habit not to start a new thread
when one of threads from the pool have died due to RuntimeException or Error.
As a result, we may observe thread starvation or even full service
unavailability.

Modification:
use Executors#newFixedThreadPool which will take care of dead threads
and start new one if required.

Result:
In situations, where worker thread die due to code issues new threads
are started.

Fixes: #68
Acked-by: Albert Rossi
Target: master, 3.0
  • Loading branch information
kofemann committed Nov 1, 2018
1 parent 95628d3 commit 7479eec
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ public class GrizzlyUtils {
*/
final static int MIN_WORKERS = 5;

/**
* Number of available CPUs.
*/
final static int CPUS = Runtime.getRuntime().availableProcessors();

private GrizzlyUtils(){}
Expand Down Expand Up @@ -82,16 +85,21 @@ static private int getSelectorPoolSize(IoStrategy ioStrategy) {
? Math.max(MIN_SELECTORS, CPUS / 4) : Math.max(MIN_WORKERS, CPUS);
}

static private int getWorkerPoolSize(IoStrategy ioStrategy) {
return ioStrategy == WORKER_THREAD
? Math.max(MIN_WORKERS, (CPUS * 2)) : 0;
/**
* Get recommended default number of worker threads. The recommended number is based
* on number of available CPUs and at least equal to {@link #MIN_WORKERS}.
*
* @return recommended number of worker threads.
*/
public static int getDefaultWorkerPoolSize() {
return Math.max(MIN_WORKERS, (CPUS * 2));
}

/**
* Pre-configure Selectors thread pool for given {@link IOStrategy},
* {@code serviceName} and {@code poolSize}. If {@code poolSize} is zero,
* then default value will be used. If {@code poolSize} is smaller than minimal
* allowed number of threads, then {@link MIN_SELECTORS} will be used.
* allowed number of threads, then {@link #MIN_SELECTORS} will be used.
*
* @param ioStrategy to use
* @param serviceName service name (affects thread names)
Expand All @@ -112,36 +120,6 @@ public static ThreadPoolConfig getSelectorPoolCfg(IoStrategy ioStrategy, String
return poolCfg;
}

/**
* Pre-configure Worker thread pool for given {@link IOStrategy},
* {@code serviceName} and {@code poolSize}. If {@code poolSize} is zero,
* then default value will be used. If {@code poolSize} is smaller than minimal
* allowed number of threads, then {@link MIN_WORKERS} will be used.
*
* @param ioStrategy in use
* @param serviceName service name (affects thread names)
* @param poolSize thread pool size. If zero, default thread pool is used.
* @return thread pool configuration or {@code null}, if ioStrategy don't
* supports worker threads.
*/
public static ThreadPoolConfig getWorkerPoolCfg(IoStrategy ioStrategy, String serviceName, int poolSize) {

if (ioStrategy == SAME_THREAD) {
return null;
}

checkArgument(poolSize >= 0, "Negative thread pool size");

final int threadPoolSize = poolSize > 0 ? Math.max(poolSize, MIN_WORKERS) : getWorkerPoolSize(ioStrategy);
final ThreadPoolConfig poolCfg = ThreadPoolConfig.defaultConfig();
poolCfg.setCorePoolSize(threadPoolSize).setMaxPoolSize(threadPoolSize);
if (serviceName != null) {
poolCfg.setPoolName(serviceName + " Worker");
}

return poolCfg;
}

static IOStrategy translate(IoStrategy ioStrategy) {
switch (ioStrategy) {
case SAME_THREAD:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@
package org.dcache.oncrpc4j.rpc;

import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.dcache.oncrpc4j.rpc.gss.GssSessionManager;
import org.glassfish.grizzly.threadpool.FixedThreadPool;
import org.glassfish.grizzly.threadpool.ThreadPoolConfig;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.google.common.base.Preconditions.checkArgument;
import static org.dcache.oncrpc4j.grizzly.GrizzlyUtils.getWorkerPoolCfg;
import static org.dcache.oncrpc4j.grizzly.GrizzlyUtils.getDefaultWorkerPoolSize;
import static org.dcache.oncrpc4j.rpc.net.IpProtocolType.*;


Expand Down Expand Up @@ -251,9 +252,14 @@ public ExecutorService getWorkerThreadExecutorService() {
return _workerThreadExecutionService;
}

ThreadPoolConfig workerPoolConfig = getWorkerPoolCfg(_ioStrategy,
_serviceName, _workerThreadPoolSize);
return new FixedThreadPool(workerPoolConfig);
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(_serviceName + " (%d)")
.build();

int threadPoolSize = _workerThreadPoolSize != 0 ? _workerThreadPoolSize
: getDefaultWorkerPoolSize();

return Executors.newFixedThreadPool(threadPoolSize, threadFactory);
}

public int getSelectorThreadPoolSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,6 @@ public void shouldReturnExpectedValueForSelector() {
assertEquals("Must return provided value", GrizzlyUtils.MIN_SELECTORS + 1, tpc.getMaxPoolSize());
}

@Test
public void shouldSReturnDefaultValueOnZeroForWorker() {
ThreadPoolConfig tpc = GrizzlyUtils.getWorkerPoolCfg(IoStrategy.WORKER_THREAD, "aService", 0);
assertTrue(tpc.getMaxPoolSize() > 0);
}

@Test
public void shouldReturnMinValueIfTooSmallForWorker() {
ThreadPoolConfig tpc = GrizzlyUtils.getWorkerPoolCfg(IoStrategy.WORKER_THREAD, "aService", GrizzlyUtils.MIN_WORKERS - 1);
assertEquals("Must return minimal value", GrizzlyUtils.MIN_WORKERS, tpc.getMaxPoolSize());
}

@Test
public void shouldReturnExpectedValueForWorker() {
ThreadPoolConfig tpc = GrizzlyUtils.getWorkerPoolCfg(IoStrategy.WORKER_THREAD, "aService", GrizzlyUtils.MIN_WORKERS + 1);
assertEquals("Must return provided value", GrizzlyUtils.MIN_WORKERS + 1, tpc.getMaxPoolSize());
}

@Test
public void shouldReturnNullIfNowWorkerThreadConfigured() {
ThreadPoolConfig tpc = GrizzlyUtils.getWorkerPoolCfg(IoStrategy.SAME_THREAD, "aService", 1);
assertNull("Must return null if no worker thread configured", tpc);
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowExceptionIfNegativeSizeProvidedForWorker() {
ThreadPoolConfig tpc = GrizzlyUtils.getWorkerPoolCfg(IoStrategy.WORKER_THREAD, "aService", -1);
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowExceptionIfNegativeSizeProvidedForSelector() {
ThreadPoolConfig tpc = GrizzlyUtils.getSelectorPoolCfg(IoStrategy.WORKER_THREAD, "aService", -1);
Expand Down

0 comments on commit 7479eec

Please sign in to comment.