From 111b4690b34ae7a45cf8e9d2ecce69ae1f9f8e9b Mon Sep 17 00:00:00 2001 From: "manson.li" Date: Thu, 6 Feb 2025 17:58:42 +0800 Subject: [PATCH] [AMORO-3346] Extract the implementation of OptimizerManager --- .../amoro/server/AmoroServiceContainer.java | 24 +- .../server/DefaultOptimizingService.java | 459 +--------------- .../server/dashboard/DashboardServer.java | 4 +- .../controller/OptimizerController.java | 6 +- .../controller/OptimizerGroupController.java | 7 +- .../resource/DefaultOptimizerManager.java | 492 ++++++++++++++++++ .../server/resource/OptimizerManager.java | 14 + .../amoro/server/AMSManagerTestBase.java | 2 + .../amoro/server/AMSServiceTestBase.java | 7 +- .../apache/amoro/server/AmsEnvironment.java | 1 + .../server/TestDefaultOptimizingService.java | 4 +- 11 files changed, 549 insertions(+), 471 deletions(-) create mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/resource/DefaultOptimizerManager.java diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 15c532d5df..e1eac5bf39 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -42,6 +42,7 @@ import org.apache.amoro.server.persistence.HttpSessionHandlerFactory; import org.apache.amoro.server.persistence.SqlSessionFactoryProvider; import org.apache.amoro.server.resource.ContainerMetadata; +import org.apache.amoro.server.resource.DefaultOptimizerManager; import org.apache.amoro.server.resource.OptimizerManager; import org.apache.amoro.server.resource.ResourceContainers; import org.apache.amoro.server.table.DefaultTableManager; @@ -100,7 +101,7 @@ public class AmoroServiceContainer { private CatalogManager catalogManager; private TableManager tableManager; private TableService tableService; - private DefaultOptimizingService optimizingService; + private DefaultOptimizerManager optimizerManager; private TerminalManager terminalManager; private Configurations serviceConfig; private TServer tableManagementServer; @@ -157,12 +158,10 @@ public void startService() throws Exception { tableManager = new DefaultTableManager(serviceConfig, catalogManager); tableService = new DefaultTableService(serviceConfig, catalogManager); - optimizingService = - new DefaultOptimizingService(serviceConfig, catalogManager, tableManager, tableService); - + optimizerManager = new DefaultOptimizerManager(serviceConfig, catalogManager, tableManager); LOG.info("Setting up AMS table executors..."); AsyncTableExecutors.getInstance().setup(tableService, serviceConfig); - addHandlerChain(optimizingService.getTableRuntimeHandler()); + addHandlerChain(optimizerManager.getTableRuntimeHandler()); addHandlerChain(AsyncTableExecutors.getInstance().getDataExpiringExecutor()); addHandlerChain(AsyncTableExecutors.getInstance().getSnapshotsExpiringExecutor()); addHandlerChain(AsyncTableExecutors.getInstance().getOrphanFilesCleaningExecutor()); @@ -219,10 +218,11 @@ public void dispose() { terminalManager.dispose(); terminalManager = null; } - if (optimizingService != null) { - LOG.info("Stopping optimizing service..."); - optimizingService.dispose(); - optimizingService = null; + + if (optimizerManager != null) { + LOG.info("Stopping optimizing manager..."); + optimizerManager.dispose(); + optimizerManager = null; } if (amsServiceMetrics != null) { @@ -253,7 +253,7 @@ private void startThriftServer(TServer server, String threadName) { private void initHttpService() { DashboardServer dashboardServer = new DashboardServer( - serviceConfig, catalogManager, tableManager, optimizingService, terminalManager); + serviceConfig, catalogManager, tableManager, optimizerManager, terminalManager); RestCatalogService restCatalogService = new RestCatalogService(catalogManager, tableManager); httpServer = @@ -364,7 +364,7 @@ workerThreads, getThriftThreadFactory(Constants.THRIFT_TABLE_SERVICE_NAME)), new OptimizingService.Processor<>( ThriftServiceProxy.createProxy( OptimizingService.Iface.class, - optimizingService, + new DefaultOptimizingService(serviceConfig, optimizerManager, tableService), AmoroRuntimeException::normalize)); optimizingServiceServer = createThriftServer( @@ -559,6 +559,6 @@ public CatalogManager getCatalogManager() { @VisibleForTesting public OptimizerManager getOptimizingService() { - return this.optimizingService; + return this.optimizerManager; } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java index a34e064215..1446cb88a4 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java @@ -18,67 +18,37 @@ package org.apache.amoro.server; -import org.apache.amoro.AmoroTable; import org.apache.amoro.OptimizerProperties; -import org.apache.amoro.ServerTableIdentifier; -import org.apache.amoro.api.CatalogMeta; import org.apache.amoro.api.OptimizerRegisterInfo; import org.apache.amoro.api.OptimizingService; import org.apache.amoro.api.OptimizingTask; import org.apache.amoro.api.OptimizingTaskId; import org.apache.amoro.api.OptimizingTaskResult; import org.apache.amoro.config.Configurations; -import org.apache.amoro.config.TableConfiguration; import org.apache.amoro.exception.ForbiddenException; -import org.apache.amoro.exception.IllegalTaskStateException; -import org.apache.amoro.exception.ObjectNotExistsException; import org.apache.amoro.exception.PluginRetryAuthException; import org.apache.amoro.exception.TaskNotFoundException; -import org.apache.amoro.properties.CatalogMetaProperties; -import org.apache.amoro.resource.Resource; -import org.apache.amoro.resource.ResourceGroup; -import org.apache.amoro.server.catalog.CatalogManager; import org.apache.amoro.server.optimizing.OptimizingProcess; import org.apache.amoro.server.optimizing.OptimizingProcessMeta; import org.apache.amoro.server.optimizing.OptimizingQueue; -import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.optimizing.TaskRuntime; import org.apache.amoro.server.persistence.StatedPersistentBase; import org.apache.amoro.server.persistence.mapper.OptimizerMapper; import org.apache.amoro.server.persistence.mapper.OptimizingMapper; -import org.apache.amoro.server.persistence.mapper.ResourceMapper; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.OptimizerManager; import org.apache.amoro.server.resource.OptimizerThread; -import org.apache.amoro.server.resource.QuotaProvider; -import org.apache.amoro.server.table.MaintainedTableManager; -import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.server.table.TableService; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; -import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList; -import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.amoro.shade.thrift.org.apache.thrift.TException; -import org.apache.amoro.table.TableProperties; -import org.apache.commons.lang3.StringUtils; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.DelayQueue; -import java.util.concurrent.Delayed; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; -import java.util.stream.Collectors; /** * DefaultOptimizingService is implementing the OptimizerManager Thrift service, which manages the @@ -90,106 +60,33 @@ * suspending tasks. */ public class DefaultOptimizingService extends StatedPersistentBase - implements OptimizingService.Iface, OptimizerManager, QuotaProvider { + implements OptimizingService.Iface { private static final Logger LOG = LoggerFactory.getLogger(DefaultOptimizingService.class); private final long optimizerTouchTimeout; - private final long taskAckTimeout; - private final int maxPlanningParallelism; private final long pollingTimeout; - private final Map optimizingQueueByGroup = new ConcurrentHashMap<>(); - private final Map optimizingQueueByToken = new ConcurrentHashMap<>(); private final Map authOptimizers = new ConcurrentHashMap<>(); - private final OptimizerKeeper optimizerKeeper = new OptimizerKeeper(); - private final CatalogManager catalogManager; + private final OptimizerManager optimizerManager; private final TableService tableService; - private final MaintainedTableManager tableManager; - private final RuntimeHandlerChain tableHandlerChain; - private final ExecutorService planExecutor; public DefaultOptimizingService( - Configurations serviceConfig, - CatalogManager catalogManager, - MaintainedTableManager tableManager, - TableService tableService) { - this.optimizerTouchTimeout = - serviceConfig.get(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT).toMillis(); - this.taskAckTimeout = - serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT).toMillis(); - this.maxPlanningParallelism = - serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM); - this.pollingTimeout = - serviceConfig.get(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT).toMillis(); + Configurations serviceConfig, OptimizerManager optimizerManager, TableService tableService) { + this.optimizerTouchTimeout = serviceConfig.get(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT).toMillis(); + this.pollingTimeout = serviceConfig.get(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT).toMillis(); this.tableService = tableService; - this.catalogManager = catalogManager; - this.tableManager = tableManager; - this.tableHandlerChain = new TableRuntimeHandlerImpl(); - this.planExecutor = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setNameFormat("plan-executor-thread-%d") - .setDaemon(true) - .build()); + this.optimizerManager = optimizerManager; } - public RuntimeHandlerChain getTableRuntimeHandler() { - return tableHandlerChain; - } - - private void loadOptimizingQueues(List tableRuntimeMetaList) { - List optimizerGroups = - getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups); - List optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll); - Map> groupToTableRuntimes = - tableRuntimeMetaList.stream() - .collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup)); - optimizerGroups.forEach( - group -> { - String groupName = group.getName(); - List tableRuntimes = groupToTableRuntimes.remove(groupName); - OptimizingQueue optimizingQueue = - new OptimizingQueue( - catalogManager, - group, - this, - planExecutor, - Optional.ofNullable(tableRuntimes).orElseGet(ArrayList::new), - maxPlanningParallelism); - optimizingQueueByGroup.put(groupName, optimizingQueue); - }); - optimizers.forEach(optimizer -> registerOptimizer(optimizer, false)); - groupToTableRuntimes - .keySet() - .forEach(groupName -> LOG.warn("Unloaded task runtime in group {}", groupName)); - } - - private void registerOptimizer(OptimizerInstance optimizer, boolean needPersistent) { - if (needPersistent) { - doAs(OptimizerMapper.class, mapper -> mapper.insertOptimizer(optimizer)); - } - - OptimizingQueue optimizingQueue = optimizingQueueByGroup.get(optimizer.getGroupName()); - optimizingQueue.addOptimizer(optimizer); - authOptimizers.put(optimizer.getToken(), optimizer); - optimizingQueueByToken.put(optimizer.getToken(), optimizingQueue); - optimizerKeeper.keepInTouch(optimizer); - } - - private void unregisterOptimizer(String token) { - doAs(OptimizerMapper.class, mapper -> mapper.deleteOptimizer(token)); - OptimizingQueue optimizingQueue = optimizingQueueByToken.remove(token); - OptimizerInstance optimizer = authOptimizers.remove(token); - if (optimizingQueue != null) { - optimizingQueue.removeOptimizer(optimizer); - } + public OptimizerManager getOptimizerManager() { + return optimizerManager; } @Override public void ping() {} public List> listTasks(String optimizerGroup) { - return getQueueByGroup(optimizerGroup).collectTasks(); + return optimizerManager.getQueueByGroup(optimizerGroup).collectTasks(); } @Override @@ -208,7 +105,7 @@ private OptimizerInstance getAuthenticatedOptimizer(String authToken) { @Override public OptimizingTask pollTask(String authToken, int threadId) { LOG.debug("Optimizer {} (threadId {}) try polling task", authToken, threadId); - OptimizingQueue queue = getQueueByToken(authToken); + OptimizingQueue queue = optimizerManager.getQueueByToken(authToken); return Optional.ofNullable(queue.pollTask(pollingTimeout)) .map(task -> extractOptimizingTask(task, authToken, threadId, queue)) .orElse(null); @@ -231,7 +128,7 @@ private OptimizingTask extractOptimizingTask( @Override public void ackTask(String authToken, int threadId, OptimizingTaskId taskId) { LOG.info("Ack task {} by optimizer {} (threadId {})", taskId, authToken, threadId); - OptimizingQueue queue = getQueueByToken(authToken); + OptimizingQueue queue = optimizerManager.getQueueByToken(authToken); Optional.ofNullable(queue.getTask(taskId)) .orElseThrow(() -> new TaskNotFoundException(taskId)) .ack(getAuthenticatedOptimizer(authToken).getThread(threadId)); @@ -244,7 +141,7 @@ public void completeTask(String authToken, OptimizingTaskResult taskResult) { authToken, taskResult.getThreadId(), taskResult.getTaskId()); - OptimizingQueue queue = getQueueByToken(authToken); + OptimizingQueue queue = optimizerManager.getQueueByToken(authToken); OptimizerThread thread = getAuthenticatedOptimizer(authToken).getThread(taskResult.getThreadId()); Optional.ofNullable(queue.getTask(taskResult.getTaskId())) @@ -270,9 +167,9 @@ public String authenticate(OptimizerRegisterInfo registerInfo) { } }); - OptimizingQueue queue = getQueueByGroup(registerInfo.getGroupName()); + OptimizingQueue queue = optimizerManager.getQueueByGroup(registerInfo.getGroupName()); OptimizerInstance optimizer = new OptimizerInstance(registerInfo, queue.getContainerName()); - registerOptimizer(optimizer, true); + optimizerManager.registerOptimizer(optimizer, true); return optimizer.getToken(); } @@ -295,332 +192,4 @@ public boolean cancelProcess(long processId) throws TException { process.close(); return true; } - - /** - * Get optimizing queue. - * - * @return OptimizeQueueItem - */ - private OptimizingQueue getQueueByGroup(String optimizerGroup) { - return getOptionalQueueByGroup(optimizerGroup) - .orElseThrow(() -> new ObjectNotExistsException("Optimizer group " + optimizerGroup)); - } - - private Optional getOptionalQueueByGroup(String optimizerGroup) { - Preconditions.checkArgument(optimizerGroup != null, "optimizerGroup can not be null"); - return Optional.ofNullable(optimizingQueueByGroup.get(optimizerGroup)); - } - - private OptimizingQueue getQueueByToken(String token) { - Preconditions.checkArgument(token != null, "optimizer token can not be null"); - return Optional.ofNullable(optimizingQueueByToken.get(token)) - .orElseThrow(() -> new PluginRetryAuthException("Optimizer has not been authenticated")); - } - - @Override - public List listOptimizers() { - return ImmutableList.copyOf(authOptimizers.values()); - } - - @Override - public List listOptimizers(String group) { - return authOptimizers.values().stream() - .filter(optimizer -> optimizer.getGroupName().equals(group)) - .collect(Collectors.toList()); - } - - @Override - public void deleteOptimizer(String group, String resourceId) { - List deleteOptimizers = - getAs(OptimizerMapper.class, mapper -> mapper.selectByResourceId(resourceId)); - deleteOptimizers.forEach( - optimizer -> { - String token = optimizer.getToken(); - unregisterOptimizer(token); - }); - } - - @Override - public void createResourceGroup(ResourceGroup resourceGroup) { - doAsTransaction( - () -> { - doAs(ResourceMapper.class, mapper -> mapper.insertResourceGroup(resourceGroup)); - OptimizingQueue optimizingQueue = - new OptimizingQueue( - catalogManager, - resourceGroup, - this, - planExecutor, - new ArrayList<>(), - maxPlanningParallelism); - optimizingQueueByGroup.put(resourceGroup.getName(), optimizingQueue); - }); - } - - @Override - public void deleteResourceGroup(String groupName) { - if (canDeleteResourceGroup(groupName)) { - doAs(ResourceMapper.class, mapper -> mapper.deleteResourceGroup(groupName)); - OptimizingQueue optimizingQueue = optimizingQueueByGroup.remove(groupName); - optimizingQueue.dispose(); - } else { - throw new RuntimeException( - String.format( - "The resource group %s cannot be deleted because it is currently in " + "use.", - groupName)); - } - } - - @Override - public void updateResourceGroup(ResourceGroup resourceGroup) { - Preconditions.checkNotNull(resourceGroup, "The resource group cannot be null."); - Optional.ofNullable(optimizingQueueByGroup.get(resourceGroup.getName())) - .ifPresent(queue -> queue.updateOptimizerGroup(resourceGroup)); - doAs(ResourceMapper.class, mapper -> mapper.updateResourceGroup(resourceGroup)); - } - - @Override - public void createResource(Resource resource) { - doAs(ResourceMapper.class, mapper -> mapper.insertResource(resource)); - } - - @Override - public void deleteResource(String resourceId) { - doAs(ResourceMapper.class, mapper -> mapper.deleteResource(resourceId)); - } - - @Override - public List listResourceGroups() { - return getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups); - } - - @Override - public List listResourceGroups(String containerName) { - return getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups).stream() - .filter(group -> group.getContainer().equals(containerName)) - .collect(Collectors.toList()); - } - - @Override - public ResourceGroup getResourceGroup(String groupName) { - return getAs(ResourceMapper.class, mapper -> mapper.selectResourceGroup(groupName)); - } - - @Override - public List listResourcesByGroup(String groupName) { - return getAs(ResourceMapper.class, mapper -> mapper.selectResourcesByGroup(groupName)); - } - - @Override - public Resource getResource(String resourceId) { - return getAs(ResourceMapper.class, mapper -> mapper.selectResource(resourceId)); - } - - @Override - public void dispose() { - optimizerKeeper.dispose(); - tableHandlerChain.dispose(); - optimizingQueueByGroup.clear(); - optimizingQueueByToken.clear(); - authOptimizers.clear(); - planExecutor.shutdown(); - } - - public boolean canDeleteResourceGroup(String name) { - for (CatalogMeta catalogMeta : catalogManager.listCatalogMetas()) { - if (catalogMeta.getCatalogProperties() != null - && catalogMeta - .getCatalogProperties() - .getOrDefault( - CatalogMetaProperties.TABLE_PROPERTIES_PREFIX - + TableProperties.SELF_OPTIMIZING_GROUP, - TableProperties.SELF_OPTIMIZING_GROUP_DEFAULT) - .equals(name)) { - return false; - } - } - for (OptimizerInstance optimizer : listOptimizers()) { - if (optimizer.getGroupName().equals(name)) { - return false; - } - } - for (ServerTableIdentifier identifier : tableManager.listManagedTables()) { - if (optimizingQueueByGroup.containsKey(name) - && optimizingQueueByGroup.get(name).containsTable(identifier)) { - return false; - } - } - return true; - } - - @Override - public int getTotalQuota(String resourceGroup) { - return authOptimizers.values().stream() - .filter(optimizer -> optimizer.getGroupName().equals(resourceGroup)) - .mapToInt(OptimizerInstance::getThreadCount) - .sum(); - } - - private class TableRuntimeHandlerImpl extends RuntimeHandlerChain { - - @Override - public void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus) { - if (!tableRuntime.getOptimizingStatus().isProcessing()) { - getOptionalQueueByGroup(tableRuntime.getOptimizerGroup()) - .ifPresent(q -> q.refreshTable(tableRuntime)); - } - } - - @Override - public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) { - String originalGroup = originalConfig.getOptimizingConfig().getOptimizerGroup(); - if (!tableRuntime.getOptimizerGroup().equals(originalGroup)) { - getOptionalQueueByGroup(originalGroup).ifPresent(q -> q.releaseTable(tableRuntime)); - } - getOptionalQueueByGroup(tableRuntime.getOptimizerGroup()) - .ifPresent(q -> q.refreshTable(tableRuntime)); - } - - @Override - public void handleTableAdded(AmoroTable table, TableRuntime tableRuntime) { - getOptionalQueueByGroup(tableRuntime.getOptimizerGroup()) - .ifPresent(q -> q.refreshTable(tableRuntime)); - } - - @Override - public void handleTableRemoved(TableRuntime tableRuntime) { - getOptionalQueueByGroup(tableRuntime.getOptimizerGroup()) - .ifPresent(queue -> queue.releaseTable(tableRuntime)); - } - - @Override - protected void initHandler(List tableRuntimeList) { - LOG.info("OptimizerManagementService begin initializing"); - loadOptimizingQueues(tableRuntimeList); - optimizerKeeper.start(); - LOG.info("SuspendingDetector for Optimizer has been started."); - LOG.info("OptimizerManagementService initializing has completed"); - } - - @Override - protected void doDispose() {} - } - - private class OptimizerKeepingTask implements Delayed { - - private final OptimizerInstance optimizerInstance; - private final long lastTouchTime; - - public OptimizerKeepingTask(OptimizerInstance optimizer) { - this.optimizerInstance = optimizer; - this.lastTouchTime = optimizer.getTouchTime(); - } - - public boolean tryKeeping() { - return Objects.equals(optimizerInstance, authOptimizers.get(optimizerInstance.getToken())) - && lastTouchTime != optimizerInstance.getTouchTime(); - } - - @Override - public long getDelay(@NotNull TimeUnit unit) { - return unit.convert( - lastTouchTime + optimizerTouchTimeout - System.currentTimeMillis(), - TimeUnit.MILLISECONDS); - } - - @Override - public int compareTo(@NotNull Delayed o) { - OptimizerKeepingTask another = (OptimizerKeepingTask) o; - return Long.compare(lastTouchTime, another.lastTouchTime); - } - - public String getToken() { - return optimizerInstance.getToken(); - } - - public OptimizingQueue getQueue() { - return optimizingQueueByGroup.get(optimizerInstance.getGroupName()); - } - - public OptimizerInstance getOptimizer() { - return optimizerInstance; - } - } - - private class OptimizerKeeper implements Runnable { - - private volatile boolean stopped = false; - private final Thread thread = new Thread(this, "optimizer-keeper-thread"); - private final DelayQueue suspendingQueue = new DelayQueue<>(); - - public OptimizerKeeper() { - thread.setDaemon(true); - } - - public void keepInTouch(OptimizerInstance optimizerInstance) { - Preconditions.checkNotNull(optimizerInstance, "token can not be null"); - suspendingQueue.add(new OptimizerKeepingTask(optimizerInstance)); - } - - public void start() { - thread.start(); - } - - public void dispose() { - stopped = true; - thread.interrupt(); - } - - @Override - public void run() { - while (!stopped) { - try { - OptimizerKeepingTask keepingTask = suspendingQueue.take(); - String token = keepingTask.getToken(); - boolean isExpired = !keepingTask.tryKeeping(); - Optional.ofNullable(keepingTask.getQueue()) - .ifPresent( - queue -> - queue - .collectTasks(buildSuspendingPredication(authOptimizers.keySet())) - .forEach(task -> retryTask(task, queue))); - if (isExpired) { - LOG.info("Optimizer {} has been expired, unregister it", keepingTask.getOptimizer()); - unregisterOptimizer(token); - } else { - LOG.debug("Optimizer {} is being touched, keep it", keepingTask.getOptimizer()); - keepInTouch(keepingTask.getOptimizer()); - } - } catch (InterruptedException ignored) { - } catch (Throwable t) { - LOG.error("OptimizerKeeper has encountered a problem.", t); - } - } - } - - private void retryTask(TaskRuntime task, OptimizingQueue queue) { - LOG.info( - "Task {} is suspending, since it's optimizer is expired, put it to retry queue, optimizer {}", - task.getTaskId(), - task.getResourceDesc()); - // optimizing task of suspending optimizer would not be counted for retrying - try { - queue.retryTask(task); - } catch (IllegalTaskStateException e) { - LOG.error( - "Retry task {} failed due to {}, will check it in next round", - task.getTaskId(), - e.getMessage()); - } - } - - private Predicate> buildSuspendingPredication(Set activeTokens) { - return task -> - StringUtils.isNotBlank(task.getToken()) - && !activeTokens.contains(task.getToken()) - && task.getStatus() != TaskRuntime.Status.SUCCESS - || task.getStatus() == TaskRuntime.Status.SCHEDULED - && task.getStartTime() + taskAckTimeout < System.currentTimeMillis(); - } - } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java index acb13e55f4..8eb388f903 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java @@ -35,7 +35,6 @@ import org.apache.amoro.exception.ForbiddenException; import org.apache.amoro.exception.SignatureCheckException; import org.apache.amoro.server.AmoroManagementConf; -import org.apache.amoro.server.DefaultOptimizingService; import org.apache.amoro.server.RestCatalogService; import org.apache.amoro.server.catalog.CatalogManager; import org.apache.amoro.server.dashboard.controller.CatalogController; @@ -51,6 +50,7 @@ import org.apache.amoro.server.dashboard.controller.VersionController; import org.apache.amoro.server.dashboard.response.ErrorResponse; import org.apache.amoro.server.dashboard.utils.ParamSignatureCalculator; +import org.apache.amoro.server.resource.OptimizerManager; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.terminal.TerminalManager; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; @@ -99,7 +99,7 @@ public DashboardServer( Configurations serviceConfig, CatalogManager catalogManager, TableManager tableManager, - DefaultOptimizingService optimizerManager, + OptimizerManager optimizerManager, TerminalManager terminalManager) { PlatformFileManager platformFileManager = new PlatformFileManager(); this.catalogController = new CatalogController(catalogManager, platformFileManager); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java index 9e34df3ab3..2288f3e430 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java @@ -22,10 +22,10 @@ import org.apache.amoro.resource.Resource; import org.apache.amoro.resource.ResourceGroup; import org.apache.amoro.resource.ResourceType; -import org.apache.amoro.server.DefaultOptimizingService; import org.apache.amoro.server.dashboard.response.OkResponse; import org.apache.amoro.server.resource.ContainerMetadata; import org.apache.amoro.server.resource.OptimizerInstance; +import org.apache.amoro.server.resource.OptimizerManager; import org.apache.amoro.server.resource.ResourceContainers; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; @@ -36,9 +36,9 @@ /** The controller that handles optimizer requests. */ public class OptimizerController { - private final DefaultOptimizingService optimizerManager; + private final OptimizerManager optimizerManager; - public OptimizerController(DefaultOptimizingService optimizerManager) { + public OptimizerController(OptimizerManager optimizerManager) { this.optimizerManager = optimizerManager; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java index e8b3a6d5a2..5b85e51285 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java @@ -22,7 +22,6 @@ import org.apache.amoro.resource.Resource; import org.apache.amoro.resource.ResourceGroup; import org.apache.amoro.resource.ResourceType; -import org.apache.amoro.server.DefaultOptimizingService; import org.apache.amoro.server.dashboard.model.OptimizerInstanceInfo; import org.apache.amoro.server.dashboard.model.OptimizerResourceInfo; import org.apache.amoro.server.dashboard.model.TableOptimizingInfo; @@ -31,6 +30,7 @@ import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.resource.ContainerMetadata; import org.apache.amoro.server.resource.OptimizerInstance; +import org.apache.amoro.server.resource.OptimizerManager; import org.apache.amoro.server.resource.ResourceContainers; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; @@ -56,10 +56,9 @@ public class OptimizerGroupController { private static final String ALL_GROUP = "all"; private final TableManager tableManager; - private final DefaultOptimizingService optimizerManager; + private final OptimizerManager optimizerManager; - public OptimizerGroupController( - TableManager tableManager, DefaultOptimizingService optimizerManager) { + public OptimizerGroupController(TableManager tableManager, OptimizerManager optimizerManager) { this.tableManager = tableManager; this.optimizerManager = optimizerManager; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/resource/DefaultOptimizerManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/resource/DefaultOptimizerManager.java new file mode 100644 index 0000000000..6f3c503181 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/resource/DefaultOptimizerManager.java @@ -0,0 +1,492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.resource; + +import org.apache.amoro.AmoroTable; +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.api.CatalogMeta; +import org.apache.amoro.config.Configurations; +import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.exception.IllegalTaskStateException; +import org.apache.amoro.exception.ObjectNotExistsException; +import org.apache.amoro.exception.PluginRetryAuthException; +import org.apache.amoro.properties.CatalogMetaProperties; +import org.apache.amoro.resource.Resource; +import org.apache.amoro.resource.ResourceGroup; +import org.apache.amoro.server.AmoroManagementConf; +import org.apache.amoro.server.catalog.CatalogManager; +import org.apache.amoro.server.optimizing.OptimizingQueue; +import org.apache.amoro.server.optimizing.OptimizingStatus; +import org.apache.amoro.server.optimizing.TaskRuntime; +import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.server.persistence.mapper.OptimizerMapper; +import org.apache.amoro.server.persistence.mapper.ResourceMapper; +import org.apache.amoro.server.table.RuntimeHandlerChain; +import org.apache.amoro.server.table.TableManager; +import org.apache.amoro.server.table.TableRuntime; +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; +import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList; +import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.amoro.table.TableProperties; +import org.apache.commons.lang3.StringUtils; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +public class DefaultOptimizerManager extends PersistentBase + implements OptimizerManager, QuotaProvider { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultOptimizerManager.class); + + private final long optimizerTouchTimeout; + private final long taskAckTimeout; + private final int maxPlanningParallelism; + private final Map optimizingQueueByGroup = new ConcurrentHashMap<>(); + private final Map optimizingQueueByToken = new ConcurrentHashMap<>(); + private final Map authOptimizers = new ConcurrentHashMap<>(); + private final OptimizerKeeper optimizerKeeper = new OptimizerKeeper(); + private final CatalogManager catalogManager; + private final TableManager tableManager; + private final RuntimeHandlerChain tableHandlerChain; + private final ExecutorService planExecutor; + + public DefaultOptimizerManager( + Configurations serviceConfig, CatalogManager catalogManager, TableManager tableManager) { + this.optimizerTouchTimeout = serviceConfig.get(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT).toMillis(); + this.taskAckTimeout = serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT).toMillis(); + this.maxPlanningParallelism = + serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM); + this.tableManager = tableManager; + this.catalogManager = catalogManager; + this.tableHandlerChain = new TableRuntimeHandlerImpl(); + this.planExecutor = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("plan-executor-thread-%d") + .setDaemon(true) + .build()); + } + + public RuntimeHandlerChain getTableRuntimeHandler() { + return tableHandlerChain; + } + + private void loadOptimizingQueues(List tableRuntimeMetaList) { + List optimizerGroups = + getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups); + List optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll); + Map> groupToTableRuntimes = + tableRuntimeMetaList.stream() + .collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup)); + optimizerGroups.forEach( + group -> { + String groupName = group.getName(); + List tableRuntimes = groupToTableRuntimes.remove(groupName); + OptimizingQueue optimizingQueue = + new OptimizingQueue( + catalogManager, + group, + this, + planExecutor, + Optional.ofNullable(tableRuntimes).orElseGet(ArrayList::new), + maxPlanningParallelism); + optimizingQueueByGroup.put(groupName, optimizingQueue); + }); + optimizers.forEach(optimizer -> registerOptimizer(optimizer, false)); + groupToTableRuntimes + .keySet() + .forEach(groupName -> LOG.warn("Unloaded task runtime in group {}", groupName)); + } + + @Override + public void registerOptimizer(OptimizerInstance optimizer, boolean needPersistent) { + if (needPersistent) { + doAs(OptimizerMapper.class, mapper -> mapper.insertOptimizer(optimizer)); + } + + OptimizingQueue optimizingQueue = optimizingQueueByGroup.get(optimizer.getGroupName()); + optimizingQueue.addOptimizer(optimizer); + authOptimizers.put(optimizer.getToken(), optimizer); + optimizingQueueByToken.put(optimizer.getToken(), optimizingQueue); + optimizerKeeper.keepInTouch(optimizer); + } + + @Override + public void unregisterOptimizer(String token) { + doAs(OptimizerMapper.class, mapper -> mapper.deleteOptimizer(token)); + OptimizingQueue optimizingQueue = optimizingQueueByToken.remove(token); + OptimizerInstance optimizer = authOptimizers.remove(token); + if (optimizingQueue != null) { + optimizingQueue.removeOptimizer(optimizer); + } + } + + private OptimizerInstance getAuthenticatedOptimizer(String authToken) { + Preconditions.checkArgument(authToken != null, "authToken can not be null"); + return Optional.ofNullable(authOptimizers.get(authToken)) + .orElseThrow(() -> new PluginRetryAuthException("Optimizer has not been authenticated")); + } + + /** + * Get optimizing queue. + * + * @return OptimizeQueueItem + */ + @Override + public OptimizingQueue getQueueByGroup(String optimizerGroup) { + return getOptionalQueueByGroup(optimizerGroup) + .orElseThrow(() -> new ObjectNotExistsException("Optimizer group " + optimizerGroup)); + } + + @Override + public Optional getOptionalQueueByGroup(String optimizerGroup) { + Preconditions.checkArgument(optimizerGroup != null, "optimizerGroup can not be null"); + return Optional.ofNullable(optimizingQueueByGroup.get(optimizerGroup)); + } + + @Override + public OptimizingQueue getQueueByToken(String token) { + Preconditions.checkArgument(token != null, "optimizer token can not be null"); + return Optional.ofNullable(optimizingQueueByToken.get(token)) + .orElseThrow(() -> new PluginRetryAuthException("Optimizer has not been authenticated")); + } + + @Override + public List listOptimizers() { + return ImmutableList.copyOf(authOptimizers.values()); + } + + @Override + public List listOptimizers(String group) { + return authOptimizers.values().stream() + .filter(optimizer -> optimizer.getGroupName().equals(group)) + .collect(Collectors.toList()); + } + + @Override + public void deleteOptimizer(String group, String resourceId) { + List deleteOptimizers = + getAs(OptimizerMapper.class, mapper -> mapper.selectByResourceId(resourceId)); + deleteOptimizers.forEach( + optimizer -> { + String token = optimizer.getToken(); + unregisterOptimizer(token); + }); + } + + @Override + public void createResourceGroup(ResourceGroup resourceGroup) { + doAsTransaction( + () -> { + doAs(ResourceMapper.class, mapper -> mapper.insertResourceGroup(resourceGroup)); + OptimizingQueue optimizingQueue = + new OptimizingQueue( + catalogManager, + resourceGroup, + this, + planExecutor, + new ArrayList<>(), + maxPlanningParallelism); + optimizingQueueByGroup.put(resourceGroup.getName(), optimizingQueue); + }); + } + + @Override + public void deleteResourceGroup(String groupName) { + if (canDeleteResourceGroup(groupName)) { + doAs(ResourceMapper.class, mapper -> mapper.deleteResourceGroup(groupName)); + OptimizingQueue optimizingQueue = optimizingQueueByGroup.remove(groupName); + optimizingQueue.dispose(); + } else { + throw new RuntimeException( + String.format( + "The resource group %s cannot be deleted because it is currently in " + "use.", + groupName)); + } + } + + @Override + public void updateResourceGroup(ResourceGroup resourceGroup) { + Preconditions.checkNotNull(resourceGroup, "The resource group cannot be null."); + Optional.ofNullable(optimizingQueueByGroup.get(resourceGroup.getName())) + .ifPresent(queue -> queue.updateOptimizerGroup(resourceGroup)); + doAs(ResourceMapper.class, mapper -> mapper.updateResourceGroup(resourceGroup)); + } + + @Override + public void createResource(Resource resource) { + doAs(ResourceMapper.class, mapper -> mapper.insertResource(resource)); + } + + @Override + public void deleteResource(String resourceId) { + doAs(ResourceMapper.class, mapper -> mapper.deleteResource(resourceId)); + } + + @Override + public List listResourceGroups() { + return getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups); + } + + @Override + public List listResourceGroups(String containerName) { + return getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups).stream() + .filter(group -> group.getContainer().equals(containerName)) + .collect(Collectors.toList()); + } + + @Override + public ResourceGroup getResourceGroup(String groupName) { + return getAs(ResourceMapper.class, mapper -> mapper.selectResourceGroup(groupName)); + } + + @Override + public List listResourcesByGroup(String groupName) { + return getAs(ResourceMapper.class, mapper -> mapper.selectResourcesByGroup(groupName)); + } + + @Override + public Resource getResource(String resourceId) { + return getAs(ResourceMapper.class, mapper -> mapper.selectResource(resourceId)); + } + + @Override + public void dispose() { + optimizerKeeper.dispose(); + tableHandlerChain.dispose(); + optimizingQueueByGroup.clear(); + optimizingQueueByToken.clear(); + authOptimizers.clear(); + planExecutor.shutdown(); + } + + @Override + public boolean canDeleteResourceGroup(String name) { + for (CatalogMeta catalogMeta : catalogManager.listCatalogMetas()) { + if (catalogMeta.getCatalogProperties() != null + && catalogMeta + .getCatalogProperties() + .getOrDefault( + CatalogMetaProperties.TABLE_PROPERTIES_PREFIX + + TableProperties.SELF_OPTIMIZING_GROUP, + TableProperties.SELF_OPTIMIZING_GROUP_DEFAULT) + .equals(name)) { + return false; + } + } + for (OptimizerInstance optimizer : listOptimizers()) { + if (optimizer.getGroupName().equals(name)) { + return false; + } + } + for (ServerTableIdentifier identifier : tableManager.listManagedTables()) { + if (optimizingQueueByGroup.containsKey(name) + && optimizingQueueByGroup.get(name).containsTable(identifier)) { + return false; + } + } + return true; + } + + private class TableRuntimeHandlerImpl extends RuntimeHandlerChain { + + @Override + public void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus) { + if (!tableRuntime.getOptimizingStatus().isProcessing()) { + getOptionalQueueByGroup(tableRuntime.getOptimizerGroup()) + .ifPresent(q -> q.refreshTable(tableRuntime)); + } + } + + @Override + public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) { + String originalGroup = originalConfig.getOptimizingConfig().getOptimizerGroup(); + if (!tableRuntime.getOptimizerGroup().equals(originalGroup)) { + getOptionalQueueByGroup(originalGroup).ifPresent(q -> q.releaseTable(tableRuntime)); + } + getOptionalQueueByGroup(tableRuntime.getOptimizerGroup()) + .ifPresent(q -> q.refreshTable(tableRuntime)); + } + + @Override + public void handleTableAdded(AmoroTable table, TableRuntime tableRuntime) { + getOptionalQueueByGroup(tableRuntime.getOptimizerGroup()) + .ifPresent(q -> q.refreshTable(tableRuntime)); + } + + @Override + public void handleTableRemoved(TableRuntime tableRuntime) { + getOptionalQueueByGroup(tableRuntime.getOptimizerGroup()) + .ifPresent(queue -> queue.releaseTable(tableRuntime)); + } + + @Override + protected void initHandler(List tableRuntimeList) { + LOG.info("OptimizerManagementService begin initializing"); + loadOptimizingQueues(tableRuntimeList); + optimizerKeeper.start(); + LOG.info("SuspendingDetector for Optimizer has been started."); + LOG.info("OptimizerManagementService initializing has completed"); + } + + @Override + protected void doDispose() {} + } + + private class OptimizerKeepingTask implements Delayed { + + private final OptimizerInstance optimizerInstance; + private final long lastTouchTime; + + public OptimizerKeepingTask(OptimizerInstance optimizer) { + this.optimizerInstance = optimizer; + this.lastTouchTime = optimizer.getTouchTime(); + } + + public boolean tryKeeping() { + return Objects.equals(optimizerInstance, authOptimizers.get(optimizerInstance.getToken())) + && lastTouchTime != optimizerInstance.getTouchTime(); + } + + @Override + public long getDelay(@NotNull TimeUnit unit) { + return unit.convert( + lastTouchTime + optimizerTouchTimeout - System.currentTimeMillis(), + TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(@NotNull Delayed o) { + OptimizerKeepingTask another = (OptimizerKeepingTask) o; + return Long.compare(lastTouchTime, another.lastTouchTime); + } + + public String getToken() { + return optimizerInstance.getToken(); + } + + public OptimizingQueue getQueue() { + return optimizingQueueByGroup.get(optimizerInstance.getGroupName()); + } + + public OptimizerInstance getOptimizer() { + return optimizerInstance; + } + } + + private class OptimizerKeeper implements Runnable { + + private volatile boolean stopped = false; + private final Thread thread = new Thread(this, "optimizer-keeper-thread"); + private final DelayQueue suspendingQueue = new DelayQueue<>(); + + public OptimizerKeeper() { + thread.setDaemon(true); + } + + public void keepInTouch(OptimizerInstance optimizerInstance) { + Preconditions.checkNotNull(optimizerInstance, "token can not be null"); + suspendingQueue.add(new OptimizerKeepingTask(optimizerInstance)); + } + + public void start() { + thread.start(); + } + + public void dispose() { + stopped = true; + thread.interrupt(); + } + + @Override + public void run() { + while (!stopped) { + try { + OptimizerKeepingTask keepingTask = suspendingQueue.take(); + String token = keepingTask.getToken(); + boolean isExpired = !keepingTask.tryKeeping(); + Optional.ofNullable(keepingTask.getQueue()) + .ifPresent( + queue -> + queue + .collectTasks(buildSuspendingPredication(authOptimizers.keySet())) + .forEach(task -> retryTask(task, queue))); + if (isExpired) { + LOG.info("Optimizer {} has been expired, unregister it", keepingTask.getOptimizer()); + unregisterOptimizer(token); + } else { + LOG.debug("Optimizer {} is being touched, keep it", keepingTask.getOptimizer()); + keepInTouch(keepingTask.getOptimizer()); + } + } catch (InterruptedException ignored) { + } catch (Throwable t) { + LOG.error("OptimizerKeeper has encountered a problem.", t); + } + } + } + + private void retryTask(TaskRuntime task, OptimizingQueue queue) { + LOG.info( + "Task {} is suspending, since it's optimizer is expired, put it to retry queue, optimizer {}", + task.getTaskId(), + task.getResourceDesc()); + // optimizing task of suspending optimizer would not be counted for retrying + try { + queue.retryTask(task); + } catch (IllegalTaskStateException e) { + LOG.error( + "Retry task {} failed due to {}, will check it in next round", + task.getTaskId(), + e.getMessage()); + } + } + + private Predicate> buildSuspendingPredication(Set activeTokens) { + return task -> + StringUtils.isNotBlank(task.getToken()) + && !activeTokens.contains(task.getToken()) + && task.getStatus() != TaskRuntime.Status.SUCCESS + || task.getStatus() == TaskRuntime.Status.SCHEDULED + && task.getStartTime() + taskAckTimeout < System.currentTimeMillis(); + } + } + + @Override + public int getTotalQuota(String resourceGroup) { + return authOptimizers.values().stream() + .filter(optimizer -> optimizer.getGroupName().equals(resourceGroup)) + .mapToInt(OptimizerInstance::getThreadCount) + .sum(); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/resource/OptimizerManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/resource/OptimizerManager.java index c7a1daf3c0..530344f6b2 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/resource/OptimizerManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/resource/OptimizerManager.java @@ -19,8 +19,10 @@ package org.apache.amoro.server.resource; import org.apache.amoro.resource.ResourceManager; +import org.apache.amoro.server.optimizing.OptimizingQueue; import java.util.List; +import java.util.Optional; public interface OptimizerManager extends ResourceManager { List listOptimizers(); @@ -28,4 +30,16 @@ public interface OptimizerManager extends ResourceManager { List listOptimizers(String groupName); void deleteOptimizer(String groupName, String resourceId); + + void registerOptimizer(OptimizerInstance optimizer, boolean needPersistent); + + void unregisterOptimizer(String token); + + Optional getOptionalQueueByGroup(String optimizerGroup); + + OptimizingQueue getQueueByGroup(String optimizerGroup); + + OptimizingQueue getQueueByToken(String token); + + boolean canDeleteResourceGroup(String name); } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AMSManagerTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/AMSManagerTestBase.java index da95059bc0..e8d9712690 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSManagerTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSManagerTestBase.java @@ -23,6 +23,7 @@ import org.apache.amoro.server.catalog.DefaultCatalogManager; import org.apache.amoro.server.manager.EventsManager; import org.apache.amoro.server.manager.MetricManager; +import org.apache.amoro.server.resource.DefaultOptimizerManager; import org.apache.amoro.server.table.DefaultTableManager; import org.apache.amoro.server.table.DerbyPersistence; import org.apache.amoro.server.table.TableManager; @@ -37,6 +38,7 @@ public abstract class AMSManagerTestBase { protected static DefaultCatalogManager CATALOG_MANAGER = null; protected static DefaultTableManager TABLE_MANAGER = null; + protected static DefaultOptimizerManager OPTIMIZER_MANAGER = null; @BeforeClass public static void initTableManger() { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java index b1f29861d3..638586fca2 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java @@ -40,12 +40,11 @@ public static void initTableService() { configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, Duration.ofMillis(800L)); TABLE_SERVICE = new DefaultTableService(new Configurations(), CATALOG_MANAGER); OPTIMIZING_SERVICE = - new DefaultOptimizingService( - configurations, CATALOG_MANAGER, TABLE_MANAGER, TABLE_SERVICE); - TABLE_SERVICE.addHandlerChain(OPTIMIZING_SERVICE.getTableRuntimeHandler()); + new DefaultOptimizingService(configurations, OPTIMIZER_MANAGER, TABLE_SERVICE); + TABLE_SERVICE.addHandlerChain(OPTIMIZER_MANAGER.getTableRuntimeHandler()); TABLE_SERVICE.initialize(); try { - OPTIMIZING_SERVICE.createResourceGroup(defaultResourceGroup()); + OPTIMIZER_MANAGER.createResourceGroup(defaultResourceGroup()); } catch (Throwable ignored) { } } catch (Throwable throwable) { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java index 38d95adb07..5393ee06b8 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java @@ -307,6 +307,7 @@ public void stopOptimizer() { field .bind(serviceContainer) .get() + .getOptimizerManager() .listOptimizers() .forEach( resource -> { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java index a97738ae8a..b4847dfa00 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java @@ -111,10 +111,12 @@ public void clear() { toucher = null; } optimizingService() + .getOptimizerManager() .listOptimizers() .forEach( optimizer -> optimizingService() + .getOptimizerManager() .deleteOptimizer(optimizer.getGroupName(), optimizer.getResourceId())); dropTable(); dropDatabase(); @@ -228,7 +230,7 @@ public void testPollTaskThreeTimes() { @Test public void testTouch() throws InterruptedException { - OptimizerInstance optimizer = optimizingService().listOptimizers().get(0); + OptimizerInstance optimizer = optimizingService().getOptimizerManager().listOptimizers().get(0); long oldTouchTime = optimizer.getTouchTime(); Thread.sleep(1); optimizingService().touch(token);