diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 122c7f414be..63cbd397acb 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -767,6 +767,15 @@ public enum Property { "Resource group name for this TabletServer. Resource groups can be defined to dedicate resources " + " to specific tables (e.g. balancing tablets for table(s) within a group, see TableLoadBalancer).", "4.0.0"), + TSERV_CONDITIONAL_UPDATE_THREADS_ROOT("tserver.conditionalupdate.threads.root", "16", + PropertyType.COUNT, "Numbers of threads for executing conditional updates on the root table.", + "4.0.0"), + TSERV_CONDITIONAL_UPDATE_THREADS_META("tserver.conditionalupdate.threads.meta", "64", + PropertyType.COUNT, + "Numbers of threads for executing conditional updates on the metadata table.", "4.0.0"), + TSERV_CONDITIONAL_UPDATE_THREADS_USER("tserver.conditionalupdate.threads.user", "64", + PropertyType.COUNT, "Numbers of threads for executing conditional updates on user tables.", + "4.0.0"), // accumulo garbage collector properties GC_PREFIX("gc.", null, PropertyType.PREFIX, diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java index b57baf0d5d5..b360e41b6b5 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java @@ -62,6 +62,9 @@ public enum ThreadPoolNames { TSERVER_TABLET_MIGRATION_POOL("accumulo.pool.tserver.tablet.migration"), TSERVER_WAL_CREATOR_POOL("accumulo.pool.tserver.wal.creator"), TSERVER_WAL_SORT_CONCURRENT_POOL("accumulo.pool.tserver.wal.sort.concurrent"), + TSERVER_CONDITIONAL_UPDATE_ROOT_POOL("accumulo.pool.tserver.conditionalupdate.root"), + TSERVER_CONDITIONAL_UPDATE_META_POOL("accumulo.pool.tserver.conditionalupdate.meta"), + TSERVER_CONDITIONAL_UPDATE_USER_POOL("accumulo.pool.tserver.conditionalupdate.user"), UTILITY_CHECK_FILE_TASKS("accumulo.pool.util.check.file.tasks"), UTILITY_VERIFY_TABLET_ASSIGNMENTS("accumulo.pool.util.check.tablet.servers"); diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 44fd7d64e7e..12e2567bdf6 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -28,6 +28,9 @@ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_STATUS_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCHED_FUTURE_CHECKER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_ASSIGNMENT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MIGRATIONS_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL; @@ -343,6 +346,27 @@ public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration conf builder.enableThreadPoolMetrics(); } return builder.build(); + case TSERV_CONDITIONAL_UPDATE_THREADS_ROOT: + builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_ROOT_POOL) + .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); + case TSERV_CONDITIONAL_UPDATE_THREADS_META: + builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_META_POOL) + .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); + case TSERV_CONDITIONAL_UPDATE_THREADS_USER: + builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_USER_POOL) + .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case GC_DELETE_THREADS: return getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build(); default: diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 0b5c2466473..117d87a96d5 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -794,19 +794,30 @@ public List conditionalUpdate(TInfo tinfo, long sessID, } } - ArrayList results = new ArrayList<>(); - - Map> deferred = - conditionalUpdate(cs, updates, results, symbols); + var lambdaCs = cs; + // Conditional updates read data into memory, examine it, and then make an update. This can be + // CPU, I/O, and memory intensive. Using a thread pool directly limits CPU usage and + // indirectly limits memory and I/O usage. + Future> future = + server.resourceManager.executeConditionalUpdate(cs.tableId, () -> { + ArrayList results = new ArrayList<>(); + + Map> deferred = + conditionalUpdate(lambdaCs, updates, results, symbols); + + while (!deferred.isEmpty()) { + deferred = conditionalUpdate(lambdaCs, deferred, results, symbols); + } - while (!deferred.isEmpty()) { - deferred = conditionalUpdate(cs, deferred, results, symbols); - } + return results; + }); - return results; - } catch (IOException ioe) { - throw new TException(ioe); - } catch (Exception e) { + return future.get(); + } catch (ExecutionException | InterruptedException e) { + log.warn("Exception returned for conditionalUpdate. tableId: {}, opid: {}", + cs == null ? null : cs.tableId, opid, e); + throw new TException(e); + } catch (RuntimeException e) { log.warn("Exception returned for conditionalUpdate. tableId: {}, opid: {}", cs == null ? null : cs.tableId, opid, e); throw e; @@ -814,6 +825,7 @@ public List conditionalUpdate(TInfo tinfo, long sessID, if (opid != null) { writeTracker.finishWrite(opid); } + if (cs != null) { server.sessionManager.unreserveSession(sessID); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index ff23188a5ad..f09165c4508 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -27,6 +27,9 @@ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_ASSIGNMENT_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_MIGRATION_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TABLET_ASSIGNMENT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_FILE_RETRIEVER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL; @@ -46,8 +49,10 @@ import java.util.OptionalInt; import java.util.Queue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -62,9 +67,11 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory; import org.apache.accumulo.core.file.blockfile.impl.ScanCacheProvider; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.spi.cache.BlockCache; import org.apache.accumulo.core.spi.cache.BlockCacheManager; import org.apache.accumulo.core.spi.cache.CacheType; @@ -118,6 +125,8 @@ public class TabletServerResourceManager { private final Map scanExecutors; private final Map scanExecutorChoices; + private final Map conditionalMutationExecutors; + private final ConcurrentHashMap activeAssignments; private final FileManager fileManager; @@ -385,6 +394,27 @@ public TabletServerResourceManager(ServerContext context, TabletHostingServer ts memMgmt = new MemoryManagementFramework(); memMgmt.startThreads(); + var rootConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, + Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT, enableMetrics); + modifyThreadPoolSizesAtRuntime( + () -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT), + TSERVER_CONDITIONAL_UPDATE_ROOT_POOL.poolName, rootConditionalPool); + + var metaConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, + Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER, enableMetrics); + modifyThreadPoolSizesAtRuntime( + () -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER), + TSERVER_CONDITIONAL_UPDATE_META_POOL.poolName, metaConditionalPool); + + var userConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, + Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER, enableMetrics); + modifyThreadPoolSizesAtRuntime( + () -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER), + TSERVER_CONDITIONAL_UPDATE_USER_POOL.poolName, userConditionalPool); + + conditionalMutationExecutors = Map.of(Ample.DataLevel.ROOT, rootConditionalPool, + Ample.DataLevel.METADATA, metaConditionalPool, Ample.DataLevel.USER, userConditionalPool); + // We can use the same map for both metadata and normal assignments since the keyspace (extent) // is guaranteed to be unique. Schedule the task once, the task will reschedule itself. ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().schedule( @@ -835,6 +865,10 @@ public void addMigration(KeyExtent tablet, Runnable migrationHandler) { } } + public Future executeConditionalUpdate(TableId tableId, Callable updateTask) { + return conditionalMutationExecutors.get(Ample.DataLevel.of(tableId)).submit(updateTask); + } + public BlockCache getIndexCache() { return _iCache; }