From 47b498e1e6108e49c2e4f3ddfcd44d8494dbcdf9 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Mon, 2 Dec 2024 14:56:24 -0500 Subject: [PATCH 01/23] Reimplemented resource group jvm and property overrides (#5126) Reversed some changes made in #5111 and implemented them differently based on conversation on that PR. I was able to handle per-group per-process JVM configuration in the accumulo-env.sh file negating the need for a seperate group specific file. I also included an "includeOptional" property in the accumulo.properties file that will be resolved at runtime to add additional properties to the configuration. Users can change this to "include" if they want the process to fail when the file is missing. Co-authored-by: Christopher Tubbs --- assemble/bin/accumulo-service | 20 ++------ assemble/conf/accumulo-env.sh | 26 +++++++--- assemble/conf/accumulo.properties | 5 ++ assemble/conf/default-group-env.sh | 80 ------------------------------ 4 files changed, 26 insertions(+), 105 deletions(-) delete mode 100644 assemble/conf/default-group-env.sh diff --git a/assemble/bin/accumulo-service b/assemble/bin/accumulo-service index ea77129fe20..ebc486b73ff 100755 --- a/assemble/bin/accumulo-service +++ b/assemble/bin/accumulo-service @@ -70,15 +70,6 @@ function get_group() { echo "${group}" } -function get_group_overrides() { - service="$1" - group="$2" - if [[ -f "${conf}/${group}-group-env.sh" ]]; then - #shellcheck source=../conf/default-group-env.sh - source "${conf}/${group}-group-env.sh" "${service}" - fi -} - function start_service() { local service_type=$1 local service_name=$2 @@ -93,14 +84,6 @@ function start_service() { servers_per_host=${ACCUMULO_CLUSTER_ARG:-1} fi - group=$(get_group "$@") - - # - # Get any resource group overrides, this should - # export ACCUMULO_JAVA_OPTS and PROPERTY_OVERRIDES - # - get_group_overrides "$service_type" "$group" - for ((process_num = 1; process_num <= servers_per_host; process_num++)); do if [[ ${build_service_name} == "true" ]]; then service_name="${service_type}_${group}_${process_num}" @@ -253,6 +236,9 @@ function main() { export conf="${ACCUMULO_CONF_DIR:-${basedir}/conf}" export lib="${basedir}/lib" + group=$(get_group "$@") + export ACCUMULO_RESOURCE_GROUP="$group" + if [[ -f "${conf}/accumulo-env.sh" ]]; then #shellcheck source=../conf/accumulo-env.sh source "${conf}/accumulo-env.sh" diff --git a/assemble/conf/accumulo-env.sh b/assemble/conf/accumulo-env.sh index 7bdc5703809..b39b1b94e7c 100644 --- a/assemble/conf/accumulo-env.sh +++ b/assemble/conf/accumulo-env.sh @@ -89,14 +89,24 @@ JAVA_OPTS=( ## JVM options set for individual applications # cmd is set by calling script that sources this env file #shellcheck disable=SC2154 -case "$cmd" in - manager) JAVA_OPTS=('-Xmx512m' '-Xms512m' "${JAVA_OPTS[@]}") ;; - monitor) JAVA_OPTS=('-Xmx256m' '-Xms256m' "${JAVA_OPTS[@]}") ;; - gc) JAVA_OPTS=('-Xmx256m' '-Xms256m' "${JAVA_OPTS[@]}") ;; - tserver) JAVA_OPTS=('-Xmx768m' '-Xms768m' "${JAVA_OPTS[@]}") ;; - compactor) JAVA_OPTS=('-Xmx256m' '-Xms256m' "${JAVA_OPTS[@]}") ;; - sserver) JAVA_OPTS=('-Xmx512m' '-Xms512m' "${JAVA_OPTS[@]}") ;; - *) JAVA_OPTS=('-Xmx256m' '-Xms64m' "${JAVA_OPTS[@]}") ;; +case "${ACCUMULO_RESOURCE_GROUP:-default}" in + default) + # shellcheck disable=SC2154 + # $cmd is exported in the accumulo script, but not the accumulo-service script + case "$cmd" in + manager) JAVA_OPTS=('-Xmx512m' '-Xms512m' "${JAVA_OPTS[@]}") ;; + monitor) JAVA_OPTS=('-Xmx256m' '-Xms256m' "${JAVA_OPTS[@]}") ;; + gc) JAVA_OPTS=('-Xmx256m' '-Xms256m' 
"${JAVA_OPTS[@]}") ;; + tserver) JAVA_OPTS=('-Xmx768m' '-Xms768m' "${JAVA_OPTS[@]}") ;; + compactor) JAVA_OPTS=('-Xmx256m' '-Xms256m' "${JAVA_OPTS[@]}") ;; + sserver) JAVA_OPTS=('-Xmx512m' '-Xms512m' "${JAVA_OPTS[@]}") ;; + *) JAVA_OPTS=('-Xmx256m' '-Xms64m' "${JAVA_OPTS[@]}") ;; + esac + ;; + *) + echo "ACCUMULO_RESOURCE_GROUP named $ACCUMULO_RESOURCE_GROUP is not configured" + exit 1 + ;; esac ## JVM options set for logging. Review log4j2.properties file to see how they are used. diff --git a/assemble/conf/accumulo.properties b/assemble/conf/accumulo.properties index 8aea4831535..6e6de34cc63 100644 --- a/assemble/conf/accumulo.properties +++ b/assemble/conf/accumulo.properties @@ -31,3 +31,8 @@ instance.secret=DEFAULT ## Set to false if 'accumulo-util build-native' fails tserver.memory.maps.native.enabled=true + +## (optional) include additional property files for a resource group +## based on the ACCUMULO_RESOURCE_GROUP env var set in accumulo-service +#include=group-${env:ACCUMULO_RESOURCE_GROUP}.properties +#includeOptional=group-${env:ACCUMULO_RESOURCE_GROUP}.properties diff --git a/assemble/conf/default-group-env.sh b/assemble/conf/default-group-env.sh deleted file mode 100644 index a1a46260da5..00000000000 --- a/assemble/conf/default-group-env.sh +++ /dev/null @@ -1,80 +0,0 @@ -#! /usr/bin/env bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This script is called from accumulo-service and serves as a mechanism to set -# ACCUMULO_JAVA_OPTS and provide property overrides for the server processes in -# a resource group. This allows the user to set a base configuration in -# accumulo-env.sh and accumulo.properties and then tailor the configuration of -# the individual processes in a resource group. For example, when you have a -# cluster comprised of sets of servers with different amounts of memory, then -# the user can set default JVM heap settings in accumulo-env.sh, and then -# increase or decrease in this file. -# -# This script will export ACCUMULO_JAVA_OPTS and PROPERTY_OVERRIDES variables -# based on the service type for the default resource group. Additional files can -# be created based on the groups configured in cluster.yaml and should be named -# "{groupName}-group-env.sh". The contents of ACCUMULO_JAVA_OPTS will be appended -# to the JAVA_OPTS variable that is created in accumulo-env.sh, allowing the user -# to override any setting for this resource group. Likewise, the contents of -# PROPERTY_OVERRIDES will be appended to the arguments provided to the server -# class allowing the user to override any Accumulo property for this resource group. -# Overriding the bind address and group name properties would not be advisable as -# this could lead to deployment issues. 
- -# Examples: -# -# Override JVM args -# export ACCUMULO_JAVA_OPTS="-Xms1024m -Xmx1024m" -# -# Override Accumulo properties -# export PROPERTY_OVERRIDES=('-o' 'rpc.backlog=1000' '-o' 'tserver.hold.time.max=10m') - -service=$1 - -case "$service" in - manager) - export ACCUMULO_JAVA_OPTS="" - export PROPERTY_OVERRIDES=() - ;; - monitor) - export ACCUMULO_JAVA_OPTS="" - export PROPERTY_OVERRIDES=() - ;; - gc) - export ACCUMULO_JAVA_OPTS="" - export PROPERTY_OVERRIDES=() - ;; - tserver) - export ACCUMULO_JAVA_OPTS="" - export PROPERTY_OVERRIDES=() - ;; - compactor) - export ACCUMULO_JAVA_OPTS="" - export PROPERTY_OVERRIDES=() - ;; - sserver) - export ACCUMULO_JAVA_OPTS="" - export PROPERTY_OVERRIDES=() - ;; - *) - export ACCUMULO_JAVA_OPTS="" - export PROPERTY_OVERRIDES=() - ;; -esac From 39c753e514f4f8142923d62ab33ac3282f42c575 Mon Sep 17 00:00:00 2001 From: Christopher Tubbs Date: Mon, 2 Dec 2024 15:52:47 -0500 Subject: [PATCH 02/23] Simplify ZooKeeperTestingServer (#5125) * Remove automatically-generated client object * Simplify the way new client objects are constructed by using a functional interface to specify the constructor, and then automatically attaching the digest and checking that it's connected before returning * Inline the private constructor into the public one * Remove unused public methods * Remove initPaths (redundant with ZooReaderWriter.mkdirs) Also, make related changes to tests that use ZooKeeperTestingServer: * Remove unnecessary exception handling * Remove unused generated instanceId and path initialization in ZooMutatorIT and ServiceLockIT * Update ServiceLockIT to remove unneeded inner classes and to simplify constructing ZooKeeper clients, relying on the fact that ZooKeeperTestingServer returns clients that have been verified connected, and also have the authentication on them --- .../conf/store/PropCacheCaffeineImplZkIT.java | 57 ++++++------- .../test/conf/store/PropStoreZooKeeperIT.java | 59 ++++++------- .../test/conf/store/ZooBasedConfigIT.java | 57 +++++-------- .../test/fate/zookeeper/ZooMutatorIT.java | 5 -- .../accumulo/test/lock/ServiceLockIT.java | 83 +++--------------- .../zookeeper/ZooKeeperTestingServer.java | 84 +++++++------------ 6 files changed, 116 insertions(+), 229 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/conf/store/PropCacheCaffeineImplZkIT.java b/test/src/main/java/org/apache/accumulo/test/conf/store/PropCacheCaffeineImplZkIT.java index 22d06b83a18..bb96d3c7471 100644 --- a/test/src/main/java/org/apache/accumulo/test/conf/store/PropCacheCaffeineImplZkIT.java +++ b/test/src/main/java/org/apache/accumulo/test/conf/store/PropCacheCaffeineImplZkIT.java @@ -51,7 +51,6 @@ import org.apache.accumulo.server.conf.store.impl.ZooPropLoader; import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZKUtil; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; @@ -83,10 +82,10 @@ public class PropCacheCaffeineImplZkIT { private static File tempDir; @BeforeAll - public static void setupZk() { + public static void setupZk() throws Exception { // using default zookeeper port - we don't have a full configuration testZk = new ZooKeeperTestingServer(tempDir); - zooKeeper = testZk.getZooKeeper(); + zooKeeper = testZk.newClient(); zrw = testZk.getZooReaderWriter(); context = createNiceMock(ServerContext.class); @@ -99,42 +98,34 @@ public static void setupZk() { @AfterAll public static void 
shutdownZK() throws Exception { verify(context); - testZk.close(); + try { + zooKeeper.close(); + } finally { + testZk.close(); + } } @BeforeEach - public void setupZnodes() { - testZk.initPaths(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZCONFIG); - try { - zooKeeper.create(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZTABLES, new byte[0], - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zooKeeper.create(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZTABLES + "/" + tIdA.canonical(), - new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zooKeeper.create( - ZooUtil.getRoot(INSTANCE_ID) + Constants.ZTABLES + "/" + tIdA.canonical() + "/conf", - new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - zooKeeper.create(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZTABLES + "/" + tIdB.canonical(), - new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zooKeeper.create( - ZooUtil.getRoot(INSTANCE_ID) + Constants.ZTABLES + "/" + tIdB.canonical() + "/conf", - new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - } catch (KeeperException ex) { - log.trace("Issue during zk initialization, skipping", ex); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Interrupted during zookeeper path initialization", ex); - } + public void setupZnodes() throws Exception { + zrw.mkdirs(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZCONFIG); + zooKeeper.create(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZTABLES, new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zooKeeper.create(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZTABLES + "/" + tIdA.canonical(), + new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zooKeeper.create( + ZooUtil.getRoot(INSTANCE_ID) + Constants.ZTABLES + "/" + tIdA.canonical() + "/conf", + new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + zooKeeper.create(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZTABLES + "/" + tIdB.canonical(), + new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zooKeeper.create( + ZooUtil.getRoot(INSTANCE_ID) + Constants.ZTABLES + "/" + tIdB.canonical() + "/conf", + new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } @AfterEach - public void cleanupZnodes() { - try { - ZKUtil.deleteRecursive(zooKeeper, "/accumulo"); - } catch (KeeperException | InterruptedException ex) { - throw new IllegalStateException("Failed to clean-up test zooKeeper nodes.", ex); - } + public void cleanupZnodes() throws Exception { + ZKUtil.deleteRecursive(zooKeeper, "/accumulo"); } @Test diff --git a/test/src/main/java/org/apache/accumulo/test/conf/store/PropStoreZooKeeperIT.java b/test/src/main/java/org/apache/accumulo/test/conf/store/PropStoreZooKeeperIT.java index 55c5b193aca..f185f933e09 100644 --- a/test/src/main/java/org/apache/accumulo/test/conf/store/PropStoreZooKeeperIT.java +++ b/test/src/main/java/org/apache/accumulo/test/conf/store/PropStoreZooKeeperIT.java @@ -83,59 +83,52 @@ public class PropStoreZooKeeperIT { private static File tempDir; @BeforeAll - public static void setupZk() { + public static void setupZk() throws Exception { // using default zookeeper port - we don't have a full configuration testZk = new ZooKeeperTestingServer(tempDir); - zooKeeper = testZk.getZooKeeper(); + zooKeeper = testZk.newClient(); ZooUtil.digestAuth(zooKeeper, ZooKeeperTestingServer.SECRET); } @AfterAll public static void shutdownZK() throws Exception { - testZk.close(); + try { + zooKeeper.close(); + } finally { + 
testZk.close(); + } } @BeforeEach - public void setupZnodes() { + public void setupZnodes() throws Exception { + var zrw = testZk.getZooReaderWriter(); instanceId = InstanceId.of(UUID.randomUUID()); context = EasyMock.createNiceMock(ServerContext.class); expect(context.getInstanceID()).andReturn(instanceId).anyTimes(); - expect(context.getZooReaderWriter()).andReturn(testZk.getZooReaderWriter()).anyTimes(); + expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes(); replay(context); - testZk.initPaths(ZooUtil.getRoot(instanceId) + Constants.ZCONFIG); - try { - zooKeeper.create(ZooUtil.getRoot(instanceId) + Constants.ZTABLES, new byte[0], - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zooKeeper.create(ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + tIdA.canonical(), - new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zooKeeper.create( - ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + tIdA.canonical() + "/conf", - new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - zooKeeper.create(ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + tIdB.canonical(), - new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zooKeeper.create( - ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + tIdB.canonical() + "/conf", - new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - } catch (KeeperException ex) { - log.trace("Issue during zk initialization, skipping", ex); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Interrupted during zookeeper path initialization", ex); - } + zrw.mkdirs(ZooUtil.getRoot(instanceId) + Constants.ZCONFIG); + zooKeeper.create(ZooUtil.getRoot(instanceId) + Constants.ZTABLES, new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zooKeeper.create(ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + tIdA.canonical(), + new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zooKeeper.create( + ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + tIdA.canonical() + "/conf", + new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + zooKeeper.create(ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + tIdB.canonical(), + new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zooKeeper.create( + ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + tIdB.canonical() + "/conf", + new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); propStore = ZooPropStore.initialize(instanceId, context.getZooReaderWriter()); } @AfterEach - public void cleanupZnodes() { - try { - ZKUtil.deleteRecursive(zooKeeper, "/accumulo"); - } catch (KeeperException | InterruptedException ex) { - throw new IllegalStateException("Failed to clean-up test zooKeeper nodes.", ex); - } + public void cleanupZnodes() throws Exception { + ZKUtil.deleteRecursive(zooKeeper, "/accumulo"); } /** diff --git a/test/src/main/java/org/apache/accumulo/test/conf/store/ZooBasedConfigIT.java b/test/src/main/java/org/apache/accumulo/test/conf/store/ZooBasedConfigIT.java index 2004d09af8a..285790b4b7e 100644 --- a/test/src/main/java/org/apache/accumulo/test/conf/store/ZooBasedConfigIT.java +++ b/test/src/main/java/org/apache/accumulo/test/conf/store/ZooBasedConfigIT.java @@ -59,7 +59,6 @@ import org.apache.accumulo.server.conf.store.impl.ZooPropStore; import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; 
import org.apache.zookeeper.ZKUtil; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; @@ -99,46 +98,40 @@ public class ZooBasedConfigIT { private static File tempDir; @BeforeAll - public static void setupZk() { - + public static void setupZk() throws Exception { // using default zookeeper port - we don't have a full configuration testZk = new ZooKeeperTestingServer(tempDir); - zooKeeper = testZk.getZooKeeper(); + zooKeeper = testZk.newClient(); ZooUtil.digestAuth(zooKeeper, ZooKeeperTestingServer.SECRET); zrw = testZk.getZooReaderWriter(); } @AfterAll public static void shutdownZK() throws Exception { - testZk.close(); + try { + zooKeeper.close(); + } finally { + testZk.close(); + } } @BeforeEach - public void initPaths() { + public void initPaths() throws Exception { context = createMock(ServerContext.class); - testZk.initPaths(ZooUtil.getRoot(INSTANCE_ID)); + zrw.mkdirs(ZooUtil.getRoot(INSTANCE_ID)); - try { - zooKeeper.create(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZTABLES, new byte[0], - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - zooKeeper.create(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZTABLES + "/" + tidA.canonical(), - new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zooKeeper.create(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZTABLES + "/" + tidB.canonical(), - new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - zooKeeper.create(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZNAMESPACES, new byte[0], - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zooKeeper.create( - ZooUtil.getRoot(INSTANCE_ID) + Constants.ZNAMESPACES + "/" + nsId.canonical(), - new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - } catch (KeeperException ex) { - log.trace("Issue during zk initialization, skipping", ex); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Interrupted during zookeeper path initialization", ex); - } + zooKeeper.create(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZTABLES, new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + zooKeeper.create(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZTABLES + "/" + tidA.canonical(), + new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zooKeeper.create(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZTABLES + "/" + tidB.canonical(), + new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + zooKeeper.create(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZNAMESPACES, new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zooKeeper.create(ZooUtil.getRoot(INSTANCE_ID) + Constants.ZNAMESPACES + "/" + nsId.canonical(), + new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); ticker = new TestTicker(); @@ -169,12 +162,8 @@ public void initPaths() { } @AfterEach - public void cleanupZnodes() { - try { - ZKUtil.deleteRecursive(zooKeeper, "/accumulo"); - } catch (KeeperException | InterruptedException ex) { - throw new IllegalStateException("Failed to clean-up test zooKeeper nodes.", ex); - } + public void cleanupZnodes() throws Exception { + ZKUtil.deleteRecursive(zooKeeper, "/accumulo"); verify(context); } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooMutatorIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooMutatorIT.java index bf860864665..28cebf2b955 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooMutatorIT.java +++ 
b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooMutatorIT.java @@ -26,14 +26,11 @@ import java.io.File; import java.util.ArrayList; import java.util.List; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.harness.WithTestNames; import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; import org.junit.jupiter.api.Tag; @@ -85,8 +82,6 @@ public void concurrentMutatorTest() throws Exception { File newFolder = new File(tempDir, testName() + "/"); assertTrue(newFolder.isDirectory() || newFolder.mkdir(), "failed to create dir: " + newFolder); try (ZooKeeperTestingServer szk = new ZooKeeperTestingServer(newFolder)) { - final var iid = InstanceId.of(UUID.randomUUID()); - szk.initPaths(ZooUtil.getRoot(iid)); ZooReaderWriter zk = szk.getZooReaderWriter(); var executor = Executors.newFixedThreadPool(16); diff --git a/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockIT.java b/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockIT.java index abaa9698276..11e1a29e7a7 100644 --- a/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockIT.java +++ b/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockIT.java @@ -38,10 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; -import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.core.fate.zookeeper.ZooSession; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher; import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; @@ -49,13 +46,10 @@ import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; -import org.apache.accumulo.test.util.Wait; import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; @@ -81,8 +75,6 @@ public class ServiceLockIT { @BeforeAll public static void setup() throws Exception { szk = new ZooKeeperTestingServer(tempDir); - final var iid = InstanceId.of(UUID.randomUUID()); - szk.initPaths(ZooUtil.getRoot(iid)); } @AfterAll @@ -113,14 +105,6 @@ public String create(String path, byte[] data, List acl, CreateMode createM } - private static class ServiceLockWrapper extends ServiceLock { - - protected ServiceLockWrapper(ZooKeeper zookeeper, ServiceLockPath path, UUID uuid) { - super(zookeeper, path, uuid); - } - - } - static class RetryLockWatcher implements AccumuloLockWatcher { private boolean lockHeld = false; @@ -148,19 +132,6 @@ public boolean isLockHeld() { } } - static class ConnectedWatcher implements Watcher { - volatile boolean connected = false; - - @Override - public synchronized void process(WatchedEvent event) { - connected = event.getState() == 
KeeperState.SyncConnected; - } - - public synchronized boolean isConnected() { - return connected; - } - } - static class TestALW implements AccumuloLockWatcher { LockLossReason reason = null; @@ -204,14 +175,13 @@ public synchronized void unableToMonitorLockNode(Exception e) { private static final AtomicInteger pdCount = new AtomicInteger(0); - private static ServiceLock getZooLock(ServiceLockPath parent, UUID uuid) { - var zooKeeper = ZooSession.getAuthenticatedSession(szk.getConn(), 30000, "digest", - "accumulo:secret".getBytes(UTF_8)); - return new ServiceLock(zooKeeper, parent, uuid); + private ServiceLock getZooLock(ServiceLockPath parent, UUID randomUUID) + throws IOException, InterruptedException { + return new ServiceLock(szk.newClient(), parent, randomUUID); } private static ServiceLock getZooLock(ZooKeeperWrapper zkw, ServiceLockPath parent, UUID uuid) { - return new ServiceLockWrapper(zkw, parent, uuid); + return new ServiceLock(zkw, parent, uuid); } @Test @@ -381,11 +351,7 @@ public void testUnexpectedEvent() throws Exception { var parent = ServiceLock .path("/zltestUnexpectedEvent-" + this.hashCode() + "-l" + pdCount.incrementAndGet()); - ConnectedWatcher watcher = new ConnectedWatcher(); - try (ZooKeeper zk = new ZooKeeper(szk.getConn(), 30000, watcher)) { - ZooUtil.digestAuth(zk, "secret"); - - Wait.waitFor(() -> !watcher.isConnected(), 30_000, 200); + try (var zk = szk.newClient()) { zk.create(parent.toString(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -426,16 +392,8 @@ public void testUnexpectedEvent() throws Exception { public void testLockSerial() throws Exception { var parent = ServiceLock.path("/zlretryLockSerial"); - ConnectedWatcher watcher1 = new ConnectedWatcher(); - ConnectedWatcher watcher2 = new ConnectedWatcher(); - try (ZooKeeperWrapper zk1 = new ZooKeeperWrapper(szk.getConn(), 30000, watcher1); - ZooKeeperWrapper zk2 = new ZooKeeperWrapper(szk.getConn(), 30000, watcher2)) { - - ZooUtil.digestAuth(zk1, "secret"); - ZooUtil.digestAuth(zk2, "secret"); - - Wait.waitFor(() -> !watcher1.isConnected(), 30_000, 200); - Wait.waitFor(() -> !watcher2.isConnected(), 30_000, 200); + try (ZooKeeperWrapper zk1 = szk.newClient(ZooKeeperWrapper::new); + ZooKeeperWrapper zk2 = szk.newClient(ZooKeeperWrapper::new)) { // Create the parent node zk1.createOnce(parent.toString(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, @@ -536,12 +494,7 @@ public boolean holdsLock() { @Override public void run() { try { - ConnectedWatcher watcher = new ConnectedWatcher(); - try (ZooKeeperWrapper zk = new ZooKeeperWrapper(szk.getConn(), 30000, watcher)) { - ZooUtil.digestAuth(zk, "secret"); - - Wait.waitFor(() -> !watcher.isConnected(), 30_000, 50); - + try (ZooKeeperWrapper zk = szk.newClient(ZooKeeperWrapper::new)) { ServiceLock zl = getZooLock(zk, parent, uuid); getLockLatch.countDown(); // signal we are done getLockLatch.await(); // wait for others to finish @@ -587,13 +540,7 @@ private int parseLockWorkerName(String child) { public void testLockParallel() throws Exception { var parent = ServiceLock.path("/zlParallel"); - ConnectedWatcher watcher = new ConnectedWatcher(); - try (ZooKeeperWrapper zk = new ZooKeeperWrapper(szk.getConn(), 30000, watcher)) { - ZooUtil.digestAuth(zk, "secret"); - - while (!watcher.isConnected()) { - Thread.sleep(50); - } + try (ZooKeeperWrapper zk = szk.newClient(ZooKeeperWrapper::new)) { // Create the parent node zk.createOnce(parent.toString(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -658,11 +605,7 @@ 
public void testTryLock() throws Exception { ServiceLock zl = getZooLock(parent, UUID.randomUUID()); - ConnectedWatcher watcher = new ConnectedWatcher(); - try (ZooKeeper zk = new ZooKeeper(szk.getConn(), 30000, watcher)) { - ZooUtil.digestAuth(zk, "secret"); - - Wait.waitFor(() -> !watcher.isConnected(), 30_000, 200); + try (var zk = szk.newClient()) { for (int i = 0; i < 10; i++) { zk.create(parent.toString(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, @@ -695,12 +638,8 @@ public void testTryLock() throws Exception { public void testChangeData() throws Exception { var parent = ServiceLock.path("/zltestChangeData-" + this.hashCode() + "-l" + pdCount.incrementAndGet()); - ConnectedWatcher watcher = new ConnectedWatcher(); - try (ZooKeeper zk = new ZooKeeper(szk.getConn(), 30000, watcher)) { - ZooUtil.digestAuth(zk, "secret"); - - Wait.waitFor(() -> !watcher.isConnected(), 30_000, 200); + try (var zk = szk.newClient()) { zk.create(parent.toString(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); ServiceLock zl = getZooLock(parent, UUID.randomUUID()); diff --git a/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooKeeperTestingServer.java b/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooKeeperTestingServer.java index dd83d8cf4b6..ae352ff2354 100644 --- a/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooKeeperTestingServer.java +++ b/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooKeeperTestingServer.java @@ -24,11 +24,10 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReader; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.server.util.PortUtils; import org.apache.curator.test.TestingServer; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,89 +44,70 @@ public class ZooKeeperTestingServer implements AutoCloseable { public static final String SECRET = "secret"; private TestingServer zkServer; - private final ZooKeeper zoo; /** * Instantiate a running zookeeper server - this call will block until the server is ready for * client connections. It will try three times, with a 5 second pause to connect. */ - public ZooKeeperTestingServer(File tmpDir) { - this(tmpDir, PortUtils.getRandomFreePort()); - } - - private ZooKeeperTestingServer(File tmpDir, int port) { - + public ZooKeeperTestingServer(final File tmpDir) { Preconditions.checkArgument(tmpDir.isDirectory()); - + final int port = PortUtils.getRandomFreePort(); try { - - CountDownLatch connectionLatch = new CountDownLatch(1); - // using a random port. The test server allows for auto port // generation, but not with specifying the tmp dir path too. // so, generate our own. 
boolean started = false; int retry = 0; while (!started && retry++ < 3) { - try { - zkServer = new TestingServer(port, tmpDir); zkServer.start(); - started = true; } catch (Exception ex) { log.trace("zookeeper test server start failed attempt {}", retry); } } - log.info("zookeeper connection string:'{}'", zkServer.getConnectString()); - - zoo = new ZooKeeper(zkServer.getConnectString(), 5_000, watchedEvent -> { - if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) { - connectionLatch.countDown(); - } - }); - - connectionLatch.await(); - } catch (Exception ex) { throw new IllegalStateException("Failed to start testing zookeeper", ex); } - } - public ZooKeeper getZooKeeper() { - return zoo; + @FunctionalInterface + public interface ZooKeeperConstructor { + public T construct(String connectString, int sessionTimeout, Watcher watcher) + throws IOException; } - public ZooReaderWriter getZooReaderWriter() { - return new ZooReader(getConn(), 30000).asWriter(SECRET); + /** + * Create a new instance of a ZooKeeper client that is already connected to the testing server + * using the provided constructor that accepts the connection string, the timeout, and a watcher + * used by this class to wait for the client to connect. This can be used to construct a subclass + * of the ZooKeeper client that implements non-standard behavior for a test. + */ + public T newClient(ZooKeeperConstructor f) + throws IOException, InterruptedException { + var connectionLatch = new CountDownLatch(1); + var zoo = f.construct(zkServer.getConnectString(), 30_000, watchedEvent -> { + if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) { + connectionLatch.countDown(); + } + }); + connectionLatch.await(); + ZooUtil.digestAuth(zoo, SECRET); + return zoo; } - public String getConn() { - return zkServer.getConnectString(); + /** + * Create a new instance of a standard ZooKeeper client that is already connected to the testing + * server. + */ + public ZooKeeper newClient() throws IOException, InterruptedException { + return newClient(ZooKeeper::new); } - public void initPaths(String s) { - try { - - String[] paths = s.split("/"); - - String slash = "/"; - String path = ""; - - for (String p : paths) { - if (!p.isEmpty()) { - path = path + slash + p; - log.debug("building default paths, creating node {}", path); - zoo.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - } - - } catch (Exception ex) { - throw new IllegalStateException("Failed to create accumulo initial paths: " + s, ex); - } + public ZooReaderWriter getZooReaderWriter() { + return new ZooReader(zkServer.getConnectString(), 30000).asWriter(SECRET); } @Override From df053cc63e6e998c1ca264d5078cbd2337759356 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Tue, 3 Dec 2024 12:50:26 -0500 Subject: [PATCH 03/23] Manager balancer fixes (#5070) Modified Manager balancer code such that the tservers for the ROOT and METADATA DataLevels are recalculated on each loop to account for any change in available tablet servers, and ignoring any migrations that the balancer may emit for tablets outside of the current DataLevel. 
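
The level check itself is small; as a rough standalone sketch (the class and method names below are invented for illustration — only the Accumulo SPI types and DataLevel.of() are real, and the actual check is inlined in Manager.checkMigrationSanity()):

    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
    import org.apache.accumulo.core.spi.balancer.data.TabletMigration;

    public class LevelFilterSketch {

      // Keep only migrations whose tablet belongs to the level currently being
      // balanced; anything outside that level is dropped here and picked up when
      // the manager loops over the correct level.
      static List<TabletMigration> onlyForLevel(List<TabletMigration> migrations,
          DataLevel level) {
        return migrations.stream()
            .filter(m -> DataLevel.of(m.getTablet().getTable()) == level)
            .collect(Collectors.toList());
      }
    }

In the Manager itself the filter also logs a trace message for each skipped migration rather than silently dropping it, so the balancer's out-of-level requests remain visible in the logs.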
--- .../org/apache/accumulo/manager/Manager.java | 38 +++++++++++++++---- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 9aca12d7ab5..44800d58337 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1045,7 +1045,7 @@ private long balanceTablets() { } // Create a view of the tserver status such that it only contains the tables // for this level in the tableMap. - final SortedMap tserverStatusForLevel = + SortedMap tserverStatusForLevel = createTServerStatusView(dl, tserverStatus); // Construct the Thrift variant of the map above for the BalancerParams final SortedMap tserverStatusForBalancerLevel = @@ -1057,17 +1057,36 @@ private long balanceTablets() { int attemptNum = 0; do { log.debug("Balancing for tables at level {}, times-in-loop: {}", dl, ++attemptNum); - params = BalanceParamsImpl.fromThrift(tserverStatusForBalancerLevel, - tserverStatusForLevel, partitionedMigrations.get(dl)); + + SortedMap statusForBalancerLevel = + tserverStatusForBalancerLevel; + if (attemptNum > 1 && (dl == DataLevel.ROOT || dl == DataLevel.METADATA)) { + // If we are still migrating then perform a re-check on the tablet + // servers to make sure non of them have failed. + Set currentServers = tserverSet.getCurrentServers(); + tserverStatus = gatherTableInformation(currentServers); + // Create a view of the tserver status such that it only contains the tables + // for this level in the tableMap. + tserverStatusForLevel = createTServerStatusView(dl, tserverStatus); + final SortedMap tserverStatusForBalancerLevel2 = + new TreeMap<>(); + tserverStatusForLevel.forEach((tsi, status) -> tserverStatusForBalancerLevel2 + .put(new TabletServerIdImpl(tsi), TServerStatusImpl.fromThrift(status))); + statusForBalancerLevel = tserverStatusForBalancerLevel2; + } + + params = BalanceParamsImpl.fromThrift(statusForBalancerLevel, tserverStatusForLevel, + partitionedMigrations.get(dl)); wait = Math.max(tabletBalancer.balance(params), wait); - migrationsOutForLevel = params.migrationsOut().size(); - for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancerLevel.keySet(), - params.migrationsOut())) { + migrationsOutForLevel = 0; + for (TabletMigration m : checkMigrationSanity(statusForBalancerLevel.keySet(), + params.migrationsOut(), dl)) { final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet()); if (migrations.containsKey(ke)) { log.warn("balancer requested migration more than once, skipping {}", m); continue; } + migrationsOutForLevel++; migrations.put(ke, TabletServerIdImpl.toThrift(m.getNewTabletServer())); log.debug("migration {}", m); } @@ -1091,11 +1110,16 @@ private long balanceTablets() { } private List checkMigrationSanity(Set current, - List migrations) { + List migrations, DataLevel level) { return migrations.stream().filter(m -> { boolean includeMigration = false; if (m.getTablet() == null) { log.error("Balancer gave back a null tablet {}", m); + } else if (DataLevel.of(m.getTablet().getTable()) != level) { + log.trace( + "Balancer wants to move a tablet ({}) outside of the current processing level ({}), " + + "ignoring and should be processed at the correct level ({})", + m.getTablet(), level, DataLevel.of(m.getTablet().getTable())); } else if (m.getNewTabletServer() == null) { log.error("Balancer did not set the destination {}", m); 
} else if (m.getOldTabletServer() == null) { From cd07dc60360b3c14009380b0b0d96f2197757b26 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 4 Dec 2024 13:34:02 -0500 Subject: [PATCH 04/23] improves trace logging in zoocache (#5133) * improves trace logging in zoocache Noticed a lot of tsever lock checking was blocking on zoocache in jstacks. Was not sure why this was happening. Added more detailed logging to zoocache inorder to know what paths are missing in the cache and when a path is removed from the cache. * Update core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java Co-authored-by: Daniel Roberts --------- Co-authored-by: Daniel Roberts --- .../core/fate/zookeeper/ZooCache.java | 45 ++++++++++++++----- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java index e0f00994d5d..86b869fa152 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java @@ -53,6 +53,9 @@ public class ZooCache { private final ZCacheWatcher watcher = new ZCacheWatcher(); private final Watcher externalWatcher; + private static final AtomicLong nextCacheId = new AtomicLong(0); + private final String cacheId = "ZC" + nextCacheId.incrementAndGet(); + private static class ZcNode { final byte[] data; final ZcStat stat; @@ -150,7 +153,7 @@ private class ZCacheWatcher implements Watcher { @Override public void process(WatchedEvent event) { if (log.isTraceEnabled()) { - log.trace("{}", event); + log.trace("{}: {}", cacheId, event); } switch (event.getType()) { @@ -167,26 +170,26 @@ public void process(WatchedEvent event) { // These are ignored, because they are generated by SingletonManager closing // ZooKeepers for ZooSession, and SingletonManager is already responsible for clearing // caches via the registered ZooCacheFactory singleton - log.trace("ZooKeeper connection closed, ignoring; {}", event); + log.trace("{} ZooKeeper connection closed, ignoring; {}", cacheId, event); break; case Disconnected: - log.trace("ZooKeeper connection disconnected, clearing cache; {}", event); + log.trace("{} ZooKeeper connection disconnected, clearing cache; {}", cacheId, event); clear(); break; case SyncConnected: - log.trace("ZooKeeper connection established, ignoring; {}", event); + log.trace("{} ZooKeeper connection established, ignoring; {}", cacheId, event); break; case Expired: - log.trace("ZooKeeper connection expired, clearing cache; {}", event); + log.trace("{} ZooKeeper connection expired, clearing cache; {}", cacheId, event); clear(); break; default: - log.warn("Unhandled {}", event); + log.warn("{} Unhandled {}", cacheId, event); break; } break; default: - log.warn("Unhandled {}", event); + log.warn("{} Unhandled {}", cacheId, event); break; } @@ -206,6 +209,7 @@ public ZooCache(ZooReader reader, Watcher watcher) { this.zReader = reader; nodeCache = new ConcurrentHashMap<>(); this.externalWatcher = watcher; + log.trace("{} created new cache", cacheId, new Exception()); } private abstract static class ZooRunnable { @@ -316,6 +320,8 @@ public List run() throws KeeperException, InterruptedException { return zcNode.getChildren(); } + log.trace("{} {} was not in children cache, looking up in zookeeper", cacheId, zPath); + try { zcNode = nodeCache.compute(zPath, (zp, zcn) -> { // recheck the children now that lock is held on key @@ -387,6 +393,8 
@@ public byte[] run() throws KeeperException, InterruptedException { return zcNode.getData(); } + log.trace("{} {} was not in data cache, looking up in zookeeper", cacheId, zPath); + zcNode = nodeCache.compute(zPath, (zp, zcn) -> { // recheck the now that lock is held on key, it may be present now. Could have been // computed while waiting for lock. @@ -408,7 +416,7 @@ public byte[] run() throws KeeperException, InterruptedException { ZcStat zstat = null; if (stat == null) { if (log.isTraceEnabled()) { - log.trace("zookeeper did not contain {}", zPath); + log.trace("{} zookeeper did not contain {}", cacheId, zPath); } } else { try { @@ -420,7 +428,7 @@ public byte[] run() throws KeeperException, InterruptedException { throw new ZcInterruptedException(e); } if (log.isTraceEnabled()) { - log.trace("zookeeper contained {} {}", zPath, + log.trace("{} zookeeper contained {} {}", cacheId, zPath, (data == null ? null : new String(data, UTF_8))); } } @@ -460,6 +468,7 @@ protected void copyStats(ZcStat userStat, ZcStat cachedStat) { private void remove(String zPath) { nodeCache.remove(zPath); + log.trace("{} removed {} from cache", cacheId, zPath); updateCount.incrementAndGet(); } @@ -470,6 +479,7 @@ public void clear() { Preconditions.checkState(!closed); nodeCache.clear(); updateCount.incrementAndGet(); + log.trace("{} cleared all from cache", cacheId); } public void close() { @@ -514,7 +524,20 @@ boolean childrenCached(String zPath) { */ public void clear(Predicate pathPredicate) { Preconditions.checkState(!closed); - nodeCache.keySet().removeIf(pathPredicate); + + Predicate pathPredicateToUse; + if (log.isTraceEnabled()) { + pathPredicateToUse = path -> { + boolean testResult = pathPredicate.test(path); + if (testResult) { + log.trace("{} removing {} from cache", cacheId, path); + } + return testResult; + }; + } else { + pathPredicateToUse = pathPredicate; + } + nodeCache.keySet().removeIf(pathPredicateToUse); updateCount.incrementAndGet(); } @@ -536,7 +559,7 @@ public Optional getLockData(ServiceLockPath path) { byte[] lockData = get(path + "/" + lockNode); if (log.isTraceEnabled()) { - log.trace("Data from lockNode {} is {}", lockNode, new String(lockData, UTF_8)); + log.trace("{} Data from lockNode {} is {}", cacheId, lockNode, new String(lockData, UTF_8)); } if (lockData == null) { lockData = new byte[0]; From 355b0d726e3bebc2407e5f29e5bd965004f68bef Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Thu, 5 Dec 2024 10:42:28 -0500 Subject: [PATCH 05/23] Fix ACCUMULO_POOL_PREFIX usage for Thread pool names (#5141) Closes #5136 --- .../apache/accumulo/core/util/threads/ThreadPools.java | 8 ++++++-- .../java/org/apache/accumulo/server/rpc/TServerUtils.java | 2 +- .../tserver/compactions/InternalCompactionExecutor.java | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index b2b0bc02db1..240a41459cb 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -399,7 +399,7 @@ public ThreadPoolExecutorBuilder getPoolBuilder(@NonNull final ThreadPoolNames p } /** - * Fet a fluent-style pool builder. + * Get a fluent-style pool builder. * * @param name the pool name - the name trimed and prepended with the ACCUMULO_POOL_PREFIX so that * pool names begin with a consistent prefix. 
@@ -409,7 +409,11 @@ public ThreadPoolExecutorBuilder getPoolBuilder(@NonNull final String name) { if (trimmed.startsWith(ACCUMULO_POOL_PREFIX.poolName)) { return new ThreadPoolExecutorBuilder(trimmed); } else { - return new ThreadPoolExecutorBuilder(ACCUMULO_POOL_PREFIX.poolName + trimmed); + if (trimmed.startsWith(".")) { + return new ThreadPoolExecutorBuilder(ACCUMULO_POOL_PREFIX.poolName + trimmed); + } else { + return new ThreadPoolExecutorBuilder(ACCUMULO_POOL_PREFIX.poolName + "." + trimmed); + } } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index d4220ff6ded..22a191a7a68 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -311,7 +311,7 @@ private static ServerAddress createNonBlockingServer(HostAndPort address, TProce private static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName, final int executorThreads, long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks) { - String poolName = ACCUMULO_POOL_PREFIX.poolName + serverName.toLowerCase() + ".client"; + String poolName = ACCUMULO_POOL_PREFIX.poolName + "." + serverName.toLowerCase() + ".client"; final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().getPoolBuilder(poolName).numCoreThreads(executorThreads) .withTimeOut(threadTimeOut, MILLISECONDS).enableThreadPoolMetrics().build(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java index 66002556a3f..618f6463b32 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java @@ -208,7 +208,7 @@ public SubmittedJob submit(CompactionServiceId csid, CompactionJob job, Compacta public void setThreads(int numThreads) { ThreadPools.resizePool(threadPool, () -> numThreads, - ACCUMULO_POOL_PREFIX.poolName + "accumulo.pool.compaction." + ceid); + ACCUMULO_POOL_PREFIX.poolName + ".accumulo.pool.compaction." + ceid); } @Override From 93365b4f2330cc54268149e6ebc24a87af44e89f Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 6 Dec 2024 08:07:35 -0500 Subject: [PATCH 06/23] Fixed failures in RegexGroupBalanceIT after merging PR #5070 (#5140) After merging #5070 RegexGroupBalanceIT started failing. Both GroupBalancer and HostRegexTableLoadBalancer have logic that throttles the frequency that they can be called do not return any migrations in this scenario. The change in #5070 modified the frequency in which the balancer is called from once for all DataLevel's to once per DataLevel. This caused the GroupBalancer and HostRegexTableLoadBalancer to return migrations for the ROOT DataLevel, but not the METADATA and USER DataLevels. The solution in this commit is to push the DataLevel down to the Balancer in the BalancerParams so that the throttling can be done at the DataLevel level. 
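
As a minimal sketch of that throttling shape (the class and field names here are invented; the real state lives in GroupBalancer.lastRunTimes and HostRegexTableLoadBalancer.lastOOBCheckTimes, keyed by the DataLevel parsed from BalanceParameters.currentLevel()):

    import java.util.EnumMap;
    import java.util.Map;

    import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;

    public class PerLevelThrottleSketch {

      // One timestamp per DataLevel instead of a single shared timestamp, so a
      // ROOT balancing pass no longer consumes the wait window of the METADATA
      // and USER passes.
      private final Map<DataLevel,Long> lastRunTimes = new EnumMap<>(DataLevel.class);
      private final long waitMillis = 60_000;

      boolean shouldBalance(DataLevel level) {
        long now = System.currentTimeMillis();
        if (now - lastRunTimes.getOrDefault(level, 0L) < waitMillis) {
          return false; // this level was balanced too recently
        }
        lastRunTimes.put(level, now);
        return true;
      }
    }

The patch itself uses a HashMap sized to DataLevel.values().length for this state; an EnumMap as in the sketch would behave the same for an enum key.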
--- .../manager/balancer/BalanceParamsImpl.java | 18 +++++++++++---- .../core/spi/balancer/GroupBalancer.java | 10 +++++--- .../balancer/HostRegexTableLoadBalancer.java | 14 +++++++---- .../core/spi/balancer/TableLoadBalancer.java | 7 ++++-- .../core/spi/balancer/TabletBalancer.java | 9 ++++++++ .../BaseHostRegexTableLoadBalancerTest.java | 8 +++++++ .../core/spi/balancer/GroupBalancerTest.java | 11 +++++---- ...xTableLoadBalancerReconfigurationTest.java | 12 ++++++---- .../HostRegexTableLoadBalancerTest.java | 23 +++++++++++-------- .../spi/balancer/SimpleLoadBalancerTest.java | 7 ++++-- .../spi/balancer/TableLoadBalancerTest.java | 5 ++-- .../org/apache/accumulo/manager/Manager.java | 2 +- .../test/ChaoticLoadBalancerTest.java | 4 +++- 13 files changed, 92 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java b/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java index a0c30d43f5d..97b9315c6e6 100644 --- a/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.dataImpl.TabletIdImpl; import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.TabletBalancer; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; @@ -40,35 +41,39 @@ public class BalanceParamsImpl implements TabletBalancer.BalanceParameters { private final List migrationsOut; private final SortedMap thriftCurrentStatus; private final Set thriftCurrentMigrations; + private final DataLevel currentDataLevel; public static BalanceParamsImpl fromThrift(SortedMap currentStatus, SortedMap thriftCurrentStatus, - Set thriftCurrentMigrations) { + Set thriftCurrentMigrations, DataLevel currentLevel) { Set currentMigrations = thriftCurrentMigrations.stream().map(TabletIdImpl::new) .collect(Collectors.toUnmodifiableSet()); return new BalanceParamsImpl(currentStatus, currentMigrations, new ArrayList<>(), - thriftCurrentStatus, thriftCurrentMigrations); + thriftCurrentStatus, thriftCurrentMigrations, currentLevel); } public BalanceParamsImpl(SortedMap currentStatus, - Set currentMigrations, List migrationsOut) { + Set currentMigrations, List migrationsOut, + DataLevel currentLevel) { this.currentStatus = currentStatus; this.currentMigrations = currentMigrations; this.migrationsOut = migrationsOut; this.thriftCurrentStatus = null; this.thriftCurrentMigrations = null; + this.currentDataLevel = currentLevel; } private BalanceParamsImpl(SortedMap currentStatus, Set currentMigrations, List migrationsOut, SortedMap thriftCurrentStatus, - Set thriftCurrentMigrations) { + Set thriftCurrentMigrations, DataLevel currentLevel) { this.currentStatus = currentStatus; this.currentMigrations = currentMigrations; this.migrationsOut = migrationsOut; this.thriftCurrentStatus = thriftCurrentStatus; this.thriftCurrentMigrations = thriftCurrentMigrations; + this.currentDataLevel = currentLevel; } @Override @@ -100,4 +105,9 @@ public void addMigration(KeyExtent extent, TServerInstance oldServer, TServerIns TabletServerId newTsid = new TabletServerIdImpl(newServer); migrationsOut.add(new TabletMigration(id, oldTsid, newTsid)); } + + @Override + 
public String currentLevel() { + return currentDataLevel.name(); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/GroupBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/GroupBalancer.java index 3527ba6f4c1..dc34e704440 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/GroupBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/GroupBalancer.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -68,7 +69,8 @@ public abstract class GroupBalancer implements TabletBalancer { protected BalancerEnvironment environment; private final TableId tableId; - private long lastRun = 0; + + protected final Map lastRunTimes = new HashMap<>(DataLevel.values().length); @Override public void init(BalancerEnvironment balancerEnvironment) { @@ -211,7 +213,9 @@ public long balance(BalanceParameters params) { return 5000; } - if (System.currentTimeMillis() - lastRun < getWaitTime()) { + final DataLevel currentLevel = DataLevel.valueOf(params.currentLevel()); + + if (System.currentTimeMillis() - lastRunTimes.getOrDefault(currentLevel, 0L) < getWaitTime()) { return 5000; } @@ -275,7 +279,7 @@ public long balance(BalanceParameters params) { populateMigrations(tservers.keySet(), params.migrationsOut(), moves); - lastRun = System.currentTimeMillis(); + lastRunTimes.put(currentLevel, System.currentTimeMillis()); return 5000; } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java index 0b89e5d4ddf..cb88ce320c4 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java @@ -51,6 +51,7 @@ import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; import org.apache.accumulo.core.manager.balancer.TableStatisticsImpl; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TableStatistics; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; @@ -181,7 +182,7 @@ static class HrtlbConf { } private static final Set EMPTY_MIGRATIONS = Collections.emptySet(); - private volatile long lastOOBCheck = System.currentTimeMillis(); + protected final Map lastOOBCheckTimes = new HashMap<>(DataLevel.values().length); private Map> pools = new HashMap<>(); private final Map migrationsFromLastPass = new HashMap<>(); private final Map tableToTimeSinceNoMigrations = new HashMap<>(); @@ -394,7 +395,10 @@ public long balance(BalanceParameters params) { Map> currentGrouped = splitCurrentByRegex(params.currentStatus()); - if ((now - this.lastOOBCheck) > myConf.oobCheckMillis) { + final DataLevel currentLevel = DataLevel.valueOf(params.currentLevel()); + + if ((now - this.lastOOBCheckTimes.getOrDefault(currentLevel, System.currentTimeMillis())) + > myConf.oobCheckMillis) { try { // 
Check to see if a tablet is assigned outside the bounds of the pool. If so, migrate it. for (String table : tableIdMap.keySet()) { @@ -454,7 +458,7 @@ public long balance(BalanceParameters params) { } } finally { // this could have taken a while...get a new time - this.lastOOBCheck = System.currentTimeMillis(); + this.lastOOBCheckTimes.put(currentLevel, System.currentTimeMillis()); } } @@ -507,8 +511,8 @@ public long balance(BalanceParameters params) { continue; } ArrayList newMigrations = new ArrayList<>(); - getBalancerForTable(tableId) - .balance(new BalanceParamsImpl(currentView, migrations, newMigrations)); + getBalancerForTable(tableId).balance( + new BalanceParamsImpl(currentView, migrations, newMigrations, DataLevel.of(tableId))); if (newMigrations.isEmpty()) { tableToTimeSinceNoMigrations.remove(tableId); diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java index cb89e5b093a..55a24c30943 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java @@ -30,6 +30,7 @@ import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.slf4j.Logger; @@ -124,10 +125,12 @@ public void getAssignments(AssignmentParameters params) { public long balance(BalanceParameters params) { long minBalanceTime = 5_000; // Iterate over the tables and balance each of them + final DataLevel currentDataLevel = DataLevel.valueOf(params.currentLevel()); for (TableId tableId : environment.getTableIdMap().values()) { ArrayList newMigrations = new ArrayList<>(); - long tableBalanceTime = getBalancerForTable(tableId).balance( - new BalanceParamsImpl(params.currentStatus(), params.currentMigrations(), newMigrations)); + long tableBalanceTime = + getBalancerForTable(tableId).balance(new BalanceParamsImpl(params.currentStatus(), + params.currentMigrations(), newMigrations, currentDataLevel)); if (tableBalanceTime < minBalanceTime) { minBalanceTime = tableBalanceTime; } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java index a7dfcbdc2bb..356bbc72236 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java @@ -25,6 +25,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TabletId; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -93,6 +94,14 @@ interface BalanceParameters { * migrations. */ List migrationsOut(); + + /** + * Return the DataLevel name for which the Manager is currently balancing. Balancers should + * return migrations for tables within the current DataLevel. 
+ * + * @return name of current balancing iteration data level + */ + String currentLevel(); } /** diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/BaseHostRegexTableLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/BaseHostRegexTableLoadBalancerTest.java index c9c478a07fd..38d9297881f 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/BaseHostRegexTableLoadBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/BaseHostRegexTableLoadBalancerTest.java @@ -268,4 +268,12 @@ protected SortedMap createCurrent(int numTservers) } return current; } + + @Override + public long balance(BalanceParameters params) { + long wait = super.balance(params); + super.lastOOBCheckTimes.clear(); + return wait; + } + } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java index 3f85ed3b792..e55eb379d23 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java @@ -40,6 +40,7 @@ import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -83,7 +84,8 @@ public void balance() { } public void balance(final int maxMigrations) { - GroupBalancer balancer = new GroupBalancer(TableId.of("1")) { + TableId tid = TableId.of("1"); + GroupBalancer balancer = new GroupBalancer(tid) { @Override protected Map getLocationProvider() { @@ -106,10 +108,10 @@ protected int getMaxMigrations() { } }; - balance(balancer, maxMigrations); + balance(balancer, maxMigrations, tid); } - public void balance(TabletBalancer balancer, int maxMigrations) { + public void balance(TabletBalancer balancer, int maxMigrations, TableId tid) { while (true) { Set migrations = new HashSet<>(); @@ -121,7 +123,8 @@ public void balance(TabletBalancer balancer, int maxMigrations) { new org.apache.accumulo.core.master.thrift.TabletServerStatus())); } - balancer.balance(new BalanceParamsImpl(current, migrations, migrationsOut)); + balancer + .balance(new BalanceParamsImpl(current, migrations, migrationsOut, DataLevel.of(tid))); assertTrue(migrationsOut.size() <= (maxMigrations + 5), "Max Migration exceeded " + maxMigrations + " " + migrationsOut.size()); diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java index f6b2123b6df..58a89ec6260 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java @@ -43,6 +43,7 @@ import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl; +import 
org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.spi.balancer.data.TabletStatistics; @@ -107,16 +108,19 @@ public void testConfigurationChanges() { // getOnlineTabletsForTable UtilWaitThread.sleep(3000); this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers), - migrations, migrationsOut)); + migrations, migrationsOut, DataLevel.USER)); assertEquals(0, migrationsOut.size()); // Change property, simulate call by TableConfWatcher config.set(HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + BAR.getTableName(), "r01.*"); - // Wait to trigger the out of bounds check and the repool check - UtilWaitThread.sleep(10000); + // calls to balance will clear the lastOOBCheckTimes map + // in the HostRegexTableLoadBalancer. For this test we want + // to get into the out of bounds checking code, so we need to + // populate the map with an older time value + this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2); this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers), - migrations, migrationsOut)); + migrations, migrationsOut, DataLevel.USER)); assertEquals(5, migrationsOut.size()); for (TabletMigration migration : migrationsOut) { assertTrue(migration.getNewTabletServer().getHost().startsWith("192.168.0.1") diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java index 298bb8b995c..4d3162e02d2 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java @@ -48,13 +48,13 @@ import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.spi.balancer.data.TabletStatistics; import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.util.ConfigurationImpl; -import org.apache.accumulo.core.util.UtilWaitThread; import org.junit.jupiter.api.Test; public class HostRegexTableLoadBalancerTest extends BaseHostRegexTableLoadBalancerTest { @@ -98,7 +98,7 @@ public void testBalance() { List migrationsOut = new ArrayList<>(); long wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut)); + migrations, migrationsOut, DataLevel.USER)); assertEquals(20000, wait); // should balance four tablets in one of the tables before reaching max assertEquals(4, migrationsOut.size()); @@ -109,7 +109,7 @@ public void testBalance() { } migrationsOut.clear(); wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut)); + migrations, migrationsOut, DataLevel.USER)); assertEquals(20000, wait); // should balance four tablets in one of the other tables before reaching max assertEquals(4, 
migrationsOut.size()); @@ -120,7 +120,7 @@ public void testBalance() { } migrationsOut.clear(); wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut)); + migrations, migrationsOut, DataLevel.USER)); assertEquals(20000, wait); // should balance four tablets in one of the other tables before reaching max assertEquals(4, migrationsOut.size()); @@ -131,7 +131,7 @@ public void testBalance() { } migrationsOut.clear(); wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut)); + migrations, migrationsOut, DataLevel.USER)); assertEquals(20000, wait); // no more balancing to do assertEquals(0, migrationsOut.size()); @@ -148,7 +148,7 @@ public void testBalanceWithTooManyOutstandingMigrations() { migrations.addAll(tableTablets.get(BAR.getTableName())); long wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut)); + migrations, migrationsOut, DataLevel.USER)); assertEquals(20000, wait); // no migrations should have occurred as 10 is the maxOutstandingMigrations assertEquals(0, migrationsOut.size()); @@ -487,13 +487,16 @@ public void testUnassignedWithNoDefaultPool() { @Test public void testOutOfBoundsTablets() { + // calls to balance will clear the lastOOBCheckTimes map + // in the HostRegexTableLoadBalancer. For this test we want + // to get into the out of bounds checking code, so we need to + // populate the map with an older time value + this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2); init(DEFAULT_TABLE_PROPERTIES); - // Wait to trigger the out of bounds check which will call our version of - // getOnlineTabletsForTable - UtilWaitThread.sleep(11000); Set migrations = new HashSet<>(); List migrationsOut = new ArrayList<>(); - this.balance(new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut)); + this.balance( + new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut, DataLevel.USER)); assertEquals(2, migrationsOut.size()); } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java index 53889be484d..055898928b3 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java @@ -42,6 +42,7 @@ import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl; import org.apache.accumulo.core.master.thrift.TableInfo; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -202,7 +203,8 @@ public void testUnevenAssignment() { // balance until we can't balance no more! while (true) { List migrationsOut = new ArrayList<>(); - balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut)); + balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut, + DataLevel.USER)); if (migrationsOut.isEmpty()) { break; } @@ -244,7 +246,8 @@ public void testUnevenAssignment2() { // balance until we can't balance no more! 
while (true) { List migrationsOut = new ArrayList<>(); - balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut)); + balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut, + DataLevel.USER)); if (migrationsOut.isEmpty()) { break; } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java index 9d856e6052b..8e9aefd0283 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl; import org.apache.accumulo.core.master.thrift.TableInfo; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -141,13 +142,13 @@ public void test() { List migrationsOut = new ArrayList<>(); TableLoadBalancer tls = new TableLoadBalancer(); tls.init(environment); - tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut)); + tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER)); assertEquals(0, migrationsOut.size()); state.put(mkts("10.0.0.2", 2345, "0x02030405"), status()); tls = new TableLoadBalancer(); tls.init(environment); - tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut)); + tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER)); int count = 0; Map movedByTable = new HashMap<>(); movedByTable.put(TableId.of(t1Id), 0); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 44800d58337..55255751531 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1076,7 +1076,7 @@ private long balanceTablets() { } params = BalanceParamsImpl.fromThrift(statusForBalancerLevel, tserverStatusForLevel, - partitionedMigrations.get(dl)); + partitionedMigrations.get(dl), dl); wait = Math.max(tabletBalancer.balance(params), wait); migrationsOutForLevel = 0; for (TabletMigration m : checkMigrationSanity(statusForBalancerLevel.keySet(), diff --git a/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java b/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java index 90a26464173..57fbd33247f 100644 --- a/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java +++ b/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java @@ -40,6 +40,7 @@ import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl; import org.apache.accumulo.core.master.thrift.TableInfo; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -157,7 +158,8 @@ public void testUnevenAssignment() { // amount, or even expected amount List 
migrationsOut = new ArrayList<>(); while (!migrationsOut.isEmpty()) { - balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut)); + balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut, + DataLevel.USER)); } } From 3033a33e012fd66ef2da948d6944bf74bdd0da67 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 6 Dec 2024 14:01:29 +0000 Subject: [PATCH 07/23] Add missing since tag in TabletBalancer, remove unused import --- .../org/apache/accumulo/core/spi/balancer/TabletBalancer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java index 356bbc72236..06235a10a1f 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java @@ -25,7 +25,6 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TabletId; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -100,6 +99,7 @@ interface BalanceParameters { * return migrations for tables within the current DataLevel. * * @return name of current balancing iteration data level + * @since 2.1.4 */ String currentLevel(); } From a2ffa7b956a0ba2b920c0128df194ffd782745cc Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 6 Dec 2024 14:34:07 +0000 Subject: [PATCH 08/23] Add override annotation to BalanceParamsImpl that was lost in merge --- .../apache/accumulo/core/manager/balancer/BalanceParamsImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java b/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java index 8419452c711..ccccecd0f39 100644 --- a/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java @@ -126,6 +126,7 @@ public Map> currentResourceGroups() { return tserverResourceGroups; } + @Override public String currentLevel() { return currentDataLevel.name(); } From 186294dd202b63904281b99f37e3a21d0b1285bc Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 6 Dec 2024 11:40:24 -0500 Subject: [PATCH 09/23] Refactor and simplification of accumulo-cluster (#5116) Modified the argument parsing logic to use getopt, which provides more flexibility: multiple options can be specified, and options can take optional arguments. This allowed me to remove the start-servers and stop-servers commands and use options to start and stop instead. See the usage output for details on the API changes. Modified command execution so that ssh is used for commands, even locally, unless --local is specified. --- assemble/bin/accumulo-cluster | 782 +++++++++++++--------------------- 1 file changed, 290 insertions(+), 492 deletions(-) diff --git a/assemble/bin/accumulo-cluster b/assemble/bin/accumulo-cluster index cd80d7cb0d9..225ab2fd72e 100755 --- a/assemble/bin/accumulo-cluster +++ b/assemble/bin/accumulo-cluster @@ -18,6 +18,14 @@ # under the License.
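As a rough sketch of the getopt pattern described in the commit message above (the --group option, its default value, and the variable names are hypothetical illustrations, not the actual accumulo-cluster interface), a long option with an optional argument can be parsed like this:

# Requires GNU getopt (util-linux); an optional argument must be attached as --group=VALUE.
opts=$(getopt --options "" --longoptions "local,group::" --name "example" -- "$@") || exit 1
eval set -- "$opts"
use_ssh=true
group=""
while true; do
  case "$1" in
    --local) use_ssh=false; shift ;;       # run the command locally instead of over ssh
    --group) group="${2:-all}"; shift 2 ;; # getopt emits an empty argument when --group is given alone
    --) shift; break ;;
  esac
done

The real option set and behavior are defined in the diff that follows.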
# +# +# Environment variables that can be set to influence the behavior +# of this script +# +# ACCUMULO_LOCALHOST_ADDRESSES - set to a space delimited string of localhost names +# and addresses to override the default lookups +# + function print_usage { cat < ( ...) [