From 2b8fc0c488504f257405b4708bf5e96624678410 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 23 Sep 2024 13:22:36 -0400 Subject: [PATCH 01/18] deprecates lru block cache and makes TinyLfu the new default (#4920) If the TinyLfu cache works well as the default going forward then the LRU cache could eventually be dropped. --- .../java/org/apache/accumulo/core/conf/Property.java | 8 +++----- .../file/blockfile/cache/lru/LruBlockCacheManager.java | 2 ++ .../file/blockfile/cache/BlockCacheFactoryTest.java | 1 + .../core/file/blockfile/cache/TestLruBlockCache.java | 1 + .../accumulo/core/file/rfile/AbstractRFileTest.java | 10 +++++----- .../org/apache/accumulo/core/file/rfile/RFileTest.java | 4 ++-- 6 files changed, 14 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 88975feb2c0..eb2ffbfd6df 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -30,6 +30,7 @@ import org.apache.accumulo.core.classloader.ClassLoaderUtil; import org.apache.accumulo.core.data.constraints.NoDeleteConstraint; +import org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iteratorsImpl.system.DeletingIterator; @@ -504,11 +505,8 @@ public enum Property { "Time to wait for clients to continue scans before closing a session.", "1.3.5"), TSERV_DEFAULT_BLOCKSIZE("tserver.default.blocksize", "1M", PropertyType.BYTES, "Specifies a default blocksize for the tserver caches.", "1.3.5"), - TSERV_CACHE_MANAGER_IMPL("tserver.cache.manager.class", - "org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager", PropertyType.STRING, - "Specifies the class name of the block cache factory implementation." 
- + " Alternative implementation is" - + " org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager.", + TSERV_CACHE_MANAGER_IMPL("tserver.cache.manager.class", TinyLfuBlockCacheManager.class.getName(), + PropertyType.STRING, "Specifies the class name of the block cache factory implementation.", "2.0.0"), TSERV_DATACACHE_SIZE("tserver.cache.data.size", "10%", PropertyType.MEMORY, "Specifies the size of the cache for RFile data blocks.", "1.3.5"), diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java index a4132b7d755..76be394e3b2 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Deprecated(since = "3.1.0") public class LruBlockCacheManager extends BlockCacheManager { private static final Logger LOG = LoggerFactory.getLogger(LruBlockCacheManager.class); @@ -32,6 +33,7 @@ public class LruBlockCacheManager extends BlockCacheManager { protected BlockCache createCache(Configuration conf, CacheType type) { LruBlockCacheConfiguration cc = new LruBlockCacheConfiguration(conf, type); LOG.info("Creating {} cache with configuration {}", type, cc); + LOG.warn("This cache implementation is deprecated and will be remove in future releases."); return new LruBlockCache(cc); } diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java index 1cbfbd289fe..8338d2735a7 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java @@ -34,6 +34,7 @@ public class BlockCacheFactoryTest { @Test + @SuppressWarnings("deprecation") public void testCreateLruBlockCacheFactory() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java index bf814e0e4d3..e12dacfe71f 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java @@ -49,6 +49,7 @@ * Tests will ensure it grows and shrinks in size properly, evictions run when they're supposed to * and do what they should, and that cached blocks are accessible when expected to be. 
*/ +@SuppressWarnings("deprecation") public class TestLruBlockCache { @Test diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java index a5fc04a854e..d65991590ad 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java @@ -44,8 +44,7 @@ import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory; -import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache; -import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager; +import org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager; import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachableBuilder; import org.apache.accumulo.core.file.rfile.RFile.FencedReader; @@ -56,6 +55,7 @@ import org.apache.accumulo.core.iteratorsImpl.system.ColumnFamilySkippingIterator; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.sample.impl.SamplerFactory; +import org.apache.accumulo.core.spi.cache.BlockCache; import org.apache.accumulo.core.spi.cache.BlockCacheManager; import org.apache.accumulo.core.spi.cache.CacheType; import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; @@ -164,7 +164,7 @@ public void openReader(boolean cfsi, Range fence) throws IOException { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.TSERV_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); try { manager = BlockCacheManagerFactory.getInstance(cc); } catch (ReflectiveOperationException e) { @@ -174,8 +174,8 @@ public void openReader(boolean cfsi, Range fence) throws IOException { cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(100000000)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(100000000)); manager.start(BlockCacheConfiguration.forTabletServer(cc)); - LruBlockCache indexCache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); - LruBlockCache dataCache = (LruBlockCache) manager.getBlockCache(CacheType.DATA); + BlockCache indexCache = manager.getBlockCache(CacheType.INDEX); + BlockCache dataCache = manager.getBlockCache(CacheType.DATA); CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, accumuloConfiguration.getAllCryptoProperties()); diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 36533596dcc..81c76d58ff6 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -64,7 +64,7 @@ import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory; -import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager; +import 
org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager; import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachableBuilder; import org.apache.accumulo.core.file.rfile.RFile.Reader; @@ -1584,7 +1584,7 @@ private void runVersionTest(int version, ConfigurationCopy aconf) throws Excepti byte[] data = baos.toByteArray(); SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data); FSDataInputStream in2 = new FSDataInputStream(bais); - aconf.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + aconf.set(Property.TSERV_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); aconf.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(100000)); aconf.set(Property.TSERV_DATACACHE_SIZE, Long.toString(100000000)); aconf.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(100000000)); From a8dd117d249e071a6d92258671481e6327e09fab Mon Sep 17 00:00:00 2001 From: Daniel Roberts Date: Mon, 23 Sep 2024 13:40:28 -0400 Subject: [PATCH 02/18] Fixes documentation for changeSecret command (#4901) Updates documentation and log message to instruct users on how to correctly run the changeSecret command. --- core/src/main/java/org/apache/accumulo/core/conf/Property.java | 2 +- .../main/java/org/apache/accumulo/server/init/Initialize.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index f20631f5322..469aa9fca1a 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -116,7 +116,7 @@ public enum Property { + " everywhere. Before using the ChangeSecret tool, make sure Accumulo is not" + " running and you are logged in as the user that controls Accumulo files in" + " HDFS. To use the ChangeSecret tool, run the command: `./bin/accumulo" - + " org.apache.accumulo.server.util.ChangeSecret`.", + + " admin changeSecret`.", "1.3.5"), INSTANCE_VOLUMES("instance.volumes", "", PropertyType.STRING, "A comma separated list of dfs uris to use. Files will be stored across" diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java index 800dcc3a949..bd8d7d3fd51 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java @@ -55,7 +55,6 @@ import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.accumulo.server.security.SecurityUtil; -import org.apache.accumulo.server.util.ChangeSecret; import org.apache.accumulo.server.util.SystemPropUtil; import org.apache.accumulo.start.spi.KeywordExecutable; import org.apache.commons.lang3.StringUtils; @@ -110,7 +109,7 @@ static void checkInit(ZooReaderWriter zoo, VolumeManager fs, InitialConfiguratio System.out.println(); System.out.println(); System.out.println("You can change the instance secret in accumulo by using:"); - System.out.println(" bin/accumulo " + ChangeSecret.class.getName()); + System.out.println(" bin/accumulo admin changeSecret"); System.out.println("You will also need to edit your secret in your configuration" + " file by adding the property instance.secret to your" + " accumulo.properties. 
Without this accumulo will not operate correctly"); From 960cee8d4585985e61c8c3e1d177bbcae98385c6 Mon Sep 17 00:00:00 2001 From: "Dom G." Date: Mon, 23 Sep 2024 14:39:32 -0400 Subject: [PATCH 03/18] Use getDescription for all micrometer instruments (#4886) * Use getDescription for all micrometer instruments * fill in empty metrics descriptions and improve existing descriptions --------- Co-authored-by: Keith Turner --- .../apache/accumulo/core/metrics/Metric.java | 232 ++++++++++-------- .../compaction/PausedCompactionMetrics.java | 4 +- .../server/metrics/ProcessMetrics.java | 8 +- .../server/metrics/ThriftMetrics.java | 7 +- .../apache/accumulo/compactor/Compactor.java | 8 +- .../apache/accumulo/gc/metrics/GcMetrics.java | 30 ++- .../manager/metrics/BalancerMetrics.java | 2 +- .../accumulo/tserver/BlockCacheMetrics.java | 18 +- .../accumulo/tserver/ScanServerMetrics.java | 10 +- .../tserver/metrics/TabletServerMetrics.java | 38 ++- .../metrics/TabletServerMinCMetrics.java | 6 +- .../metrics/TabletServerScanMetrics.java | 35 ++- .../metrics/TabletServerUpdateMetrics.java | 16 +- 13 files changed, 219 insertions(+), 195 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java index 9850d1a0646..27b5e3bf00a 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java @@ -27,180 +27,210 @@ public enum Metric { "Indicates if the server is idle or not. The value will be 1 when idle and 0 when not idle.", MetricCategory.GENERAL_SERVER), LOW_MEMORY("accumulo.detected.low.memory", MetricType.GAUGE, - "Reports 1 when process memory usage is above threshold, 0 when memory is okay.", + "Reports 1 when process memory usage is above the threshold, reports 0 when memory is okay.", MetricCategory.GENERAL_SERVER), // Compactor Metrics - COMPACTOR_MAJC_STUCK("accumulo.compactor.majc.stuck", MetricType.LONG_TASK_TIMER, "", - MetricCategory.COMPACTOR), + COMPACTOR_MAJC_STUCK("accumulo.compactor.majc.stuck", MetricType.LONG_TASK_TIMER, + "Number and duration of stuck major compactions.", MetricCategory.COMPACTOR), COMPACTOR_ENTRIES_READ("accumulo.compactor.entries.read", MetricType.FUNCTION_COUNTER, - "Number of entries read by all threads performing compactions.", MetricCategory.COMPACTOR), + "Number of entries read by all compactions that have run on this compactor.", + MetricCategory.COMPACTOR), COMPACTOR_ENTRIES_WRITTEN("accumulo.compactor.entries.written", MetricType.FUNCTION_COUNTER, - "Number of entries written by all threads performing compactions.", MetricCategory.COMPACTOR), + "Number of entries written by all compactions that have run on this compactor.", + MetricCategory.COMPACTOR), // Fate Metrics FATE_TYPE_IN_PROGRESS("accumulo.fate.ops.in.progress.by.type", MetricType.GAUGE, - "Number of FATE operations in progress. The type is designated by the `op.type` tag.", + "Number of FATE operations in progress. 
The op type is designated by the `op.type` tag.", MetricCategory.FATE), - FATE_OPS("accumulo.fate.ops", MetricType.GAUGE, "Tracks all the current FATE ops in any state.", + FATE_OPS("accumulo.fate.ops", MetricType.GAUGE, + "Number of all the current FATE ops in any state.", MetricCategory.FATE), + FATE_OPS_ACTIVITY("accumulo.fate.ops.activity", MetricType.GAUGE, + "Count of the total number of times fate operations are added, updated, and removed.", MetricCategory.FATE), - FATE_OPS_ACTIVITY("accumulo.fate.ops.activity", MetricType.GAUGE, "", MetricCategory.FATE), - FATE_ERRORS("accumulo.fate.errors", MetricType.GAUGE, "", MetricCategory.FATE), + FATE_ERRORS("accumulo.fate.errors", MetricType.GAUGE, + "Count of errors that occurred when attempting to gather fate metrics.", MetricCategory.FATE), FATE_TX("accumulo.fate.tx", MetricType.GAUGE, "The state is now in a tag (e.g., state=new, state=in.progress, state=failed, etc.).", MetricCategory.FATE), // Garbage Collection Metrics - GC_STARTED("accumulo.gc.started", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), - GC_FINISHED("accumulo.gc.finished", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), - GC_CANDIDATES("accumulo.gc.candidates", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), - GC_IN_USE("accumulo.gc.in.use", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), - GC_DELETED("accumulo.gc.deleted", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), - GC_ERRORS("accumulo.gc.errors", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), - GC_WAL_STARTED("accumulo.gc.wal.started", MetricType.GAUGE, "", + GC_STARTED("accumulo.gc.started", MetricType.GAUGE, "Timestamp GC file collection cycle started.", + MetricCategory.GARBAGE_COLLECTION), + GC_FINISHED("accumulo.gc.finished", MetricType.GAUGE, "Timestamp GC file collect cycle finished.", + MetricCategory.GARBAGE_COLLECTION), + GC_CANDIDATES("accumulo.gc.candidates", MetricType.GAUGE, + "Number of files that are candidates for deletion.", MetricCategory.GARBAGE_COLLECTION), + GC_IN_USE("accumulo.gc.in.use", MetricType.GAUGE, "Number of candidate files still in use.", MetricCategory.GARBAGE_COLLECTION), - GC_WAL_FINISHED("accumulo.gc.wal.finished", MetricType.GAUGE, "", + GC_DELETED("accumulo.gc.deleted", MetricType.GAUGE, "Number of candidate files deleted.", MetricCategory.GARBAGE_COLLECTION), - GC_WAL_CANDIDATES("accumulo.gc.wal.candidates", MetricType.GAUGE, "", + GC_ERRORS("accumulo.gc.errors", MetricType.GAUGE, "Number of candidate deletion errors.", MetricCategory.GARBAGE_COLLECTION), - GC_WAL_IN_USE("accumulo.gc.wal.in.use", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), - GC_WAL_DELETED("accumulo.gc.wal.deleted", MetricType.GAUGE, "", + GC_WAL_STARTED("accumulo.gc.wal.started", MetricType.GAUGE, + "Timestamp GC WAL collection cycle started.", MetricCategory.GARBAGE_COLLECTION), + GC_WAL_FINISHED("accumulo.gc.wal.finished", MetricType.GAUGE, + "Timestamp GC WAL collect cycle finished.", MetricCategory.GARBAGE_COLLECTION), + GC_WAL_CANDIDATES("accumulo.gc.wal.candidates", MetricType.GAUGE, + "Number of files that are candidates for deletion.", MetricCategory.GARBAGE_COLLECTION), + GC_WAL_IN_USE("accumulo.gc.wal.in.use", MetricType.GAUGE, + "Number of wal file candidates that are still in use.", MetricCategory.GARBAGE_COLLECTION), + GC_WAL_DELETED("accumulo.gc.wal.deleted", MetricType.GAUGE, + "Number of candidate wal files deleted.", MetricCategory.GARBAGE_COLLECTION), + GC_WAL_ERRORS("accumulo.gc.wal.errors", MetricType.GAUGE, + 
"Number candidate wal file deletion errors.", MetricCategory.GARBAGE_COLLECTION), + GC_POST_OP_DURATION("accumulo.gc.post.op.duration", MetricType.GAUGE, + "GC metadata table post operation duration in milliseconds.", MetricCategory.GARBAGE_COLLECTION), - GC_WAL_ERRORS("accumulo.gc.wal.errors", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), - GC_POST_OP_DURATION("accumulo.gc.post.op.duration", MetricType.GAUGE, "", + GC_RUN_CYCLE("accumulo.gc.run.cycle", MetricType.GAUGE, + "Count of gc cycle runs. Value is reset on process start.", MetricCategory.GARBAGE_COLLECTION), - GC_RUN_CYCLE("accumulo.gc.run.cycle", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), // Tablet Server Metrics - TSERVER_ENTRIES("accumulo.tserver.entries", MetricType.GAUGE, "", MetricCategory.TABLET_SERVER), - TSERVER_MEM_ENTRIES("accumulo.tserver.entries.mem", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_MAJC_RUNNING("accumulo.tserver.majc.running", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_MAJC_STUCK("accumulo.tserver.majc.stuck", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_MAJC_QUEUED("accumulo.tserver.majc.queued", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_MINC_QUEUED("accumulo.tserver.minc.queued", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_MINC_RUNNING("accumulo.tserver.minc.running", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_MINC_TOTAL("accumulo.tserver.minc.total", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_TABLETS_ONLINE("accumulo.tserver.tablets.online", MetricType.GAUGE, "", + TSERVER_ENTRIES("accumulo.tserver.entries", MetricType.GAUGE, "Number of entries.", MetricCategory.TABLET_SERVER), + TSERVER_MEM_ENTRIES("accumulo.tserver.entries.mem", MetricType.GAUGE, + "Number of entries in memory.", MetricCategory.TABLET_SERVER), + TSERVER_MAJC_RUNNING("accumulo.tserver.majc.running", MetricType.GAUGE, + "Number of active major compactions.", MetricCategory.TABLET_SERVER), + TSERVER_MAJC_STUCK("accumulo.tserver.majc.stuck", MetricType.GAUGE, + "Number and duration of stuck major compactions.", MetricCategory.TABLET_SERVER), + TSERVER_MAJC_QUEUED("accumulo.tserver.majc.queued", MetricType.GAUGE, + "Number of queued major compactions.", MetricCategory.TABLET_SERVER), + TSERVER_MINC_QUEUED("accumulo.tserver.minc.queued", MetricType.GAUGE, + "Number of queued minor compactions.", MetricCategory.TABLET_SERVER), + TSERVER_MINC_RUNNING("accumulo.tserver.minc.running", MetricType.GAUGE, + "Number of active minor compactions.", MetricCategory.TABLET_SERVER), + TSERVER_MINC_TOTAL("accumulo.tserver.minc.total", MetricType.GAUGE, + "Total number of minor compactions performed.", MetricCategory.TABLET_SERVER), + TSERVER_TABLETS_ONLINE("accumulo.tserver.tablets.online", MetricType.GAUGE, + "Number of online tablets.", MetricCategory.TABLET_SERVER), TSERVER_TABLETS_LONG_ASSIGNMENTS("accumulo.tserver.tablets.assignments.warning", MetricType.GAUGE, - "", MetricCategory.TABLET_SERVER), - TSERVER_TABLETS_OPENING("accumulo.tserver.tablets.opening", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_TABLETS_UNOPENED("accumulo.tserver.tablets.unopened", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_TABLETS_FILES("accumulo.tserver.tablets.files", MetricType.GAUGE, "", + "Number of tablet assignments that are taking longer than the configured warning duration.", MetricCategory.TABLET_SERVER), + 
TSERVER_TABLETS_OPENING("accumulo.tserver.tablets.opening", MetricType.GAUGE, + "Number of opening tablets.", MetricCategory.TABLET_SERVER), + TSERVER_TABLETS_UNOPENED("accumulo.tserver.tablets.unopened", MetricType.GAUGE, + "Number of unopened tablets.", MetricCategory.TABLET_SERVER), + TSERVER_TABLETS_FILES("accumulo.tserver.tablets.files", MetricType.GAUGE, + "Number of files per tablet.", MetricCategory.TABLET_SERVER), TSERVER_INGEST_MUTATIONS("accumulo.tserver.ingest.mutations", MetricType.GAUGE, - "Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be derived.", + "Ingest mutation count. The rate can be derived from this metric.", MetricCategory.TABLET_SERVER), TSERVER_INGEST_BYTES("accumulo.tserver.ingest.bytes", MetricType.GAUGE, - "Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be derived.", - MetricCategory.TABLET_SERVER), - TSERVER_HOLD("accumulo.tserver.hold", MetricType.GAUGE, "", MetricCategory.TABLET_SERVER), + "Ingest byte count. The rate can be derived from this metric.", MetricCategory.TABLET_SERVER), + TSERVER_HOLD("accumulo.tserver.hold", MetricType.GAUGE, + "Duration for which commits have been held in milliseconds.", MetricCategory.TABLET_SERVER), // Scan Metrics SCAN_RESERVATION_TOTAL_TIMER("accumulo.scan.reservation.total.timer", MetricType.TIMER, "Time to reserve a tablet's files for scan.", MetricCategory.SCAN_SERVER), SCAN_RESERVATION_WRITEOUT_TIMER("accumulo.scan.reservation.writeout.timer", MetricType.TIMER, - "Time to write out a tablets file reservations for scan", MetricCategory.SCAN_SERVER), + "Time to write out a tablets file reservations for scan.", MetricCategory.SCAN_SERVER), SCAN_RESERVATION_CONFLICT_COUNTER("accumulo.scan.reservation.conflict.count", MetricType.COUNTER, - "", MetricCategory.SCAN_SERVER), + "Count of instances where file reservation attempts for scans encountered conflicts.", + MetricCategory.SCAN_SERVER), SCAN_BUSY_TIMEOUT_COUNT("accumulo.scan.busy.timeout.count", MetricType.COUNTER, "Count of the scans where a busy timeout happened.", MetricCategory.SCAN_SERVER), SCAN_TABLET_METADATA_CACHE("accumulo.scan.tablet.metadata.cache", MetricType.CACHE, "Scan server tablet cache metrics.", MetricCategory.SCAN_SERVER), - SCAN_TIMES("accumulo.scan.times", MetricType.TIMER, "", MetricCategory.SCAN_SERVER), - SCAN_OPEN_FILES("accumulo.scan.files.open", MetricType.GAUGE, "", MetricCategory.SCAN_SERVER), - SCAN_RESULTS("accumulo.scan.result", MetricType.GAUGE, "", MetricCategory.SCAN_SERVER), - SCAN_YIELDS("accumulo.scan.yields", MetricType.GAUGE, "", MetricCategory.SCAN_SERVER), - SCAN_START("accumulo.scan.start", MetricType.COUNTER, "", MetricCategory.SCAN_SERVER), - SCAN_CONTINUE("accumulo.scan.continue", MetricType.COUNTER, "", MetricCategory.SCAN_SERVER), - SCAN_CLOSE("accumulo.scan.close", MetricType.COUNTER, "", MetricCategory.SCAN_SERVER), - SCAN_QUERIES("accumulo.scan.queries", MetricType.GAUGE, "", MetricCategory.SCAN_SERVER), - SCAN_SCANNED_ENTRIES("accumulo.scan.query.scanned.entries", MetricType.GAUGE, - "Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be derived.", + SCAN_TIMES("accumulo.scan.times", MetricType.TIMER, "Scan session lifetime (creation to close).", MetricCategory.SCAN_SERVER), - SCAN_QUERY_SCAN_RESULTS("accumulo.scan.query.results", MetricType.GAUGE, - "Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be derived.", + SCAN_OPEN_FILES("accumulo.scan.files.open", 
MetricType.GAUGE, "Number of files open for scans.", MetricCategory.SCAN_SERVER), - SCAN_QUERY_SCAN_RESULTS_BYTES("accumulo.scan.query.results.bytes", MetricType.GAUGE, - "Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be derived.", + SCAN_RESULTS("accumulo.scan.result", MetricType.GAUGE, "Results per scan.", + MetricCategory.SCAN_SERVER), + SCAN_YIELDS("accumulo.scan.yields", MetricType.GAUGE, "Counts scans that have yielded.", MetricCategory.SCAN_SERVER), - SCAN_PAUSED_FOR_MEM("accumulo.scan.paused.for.memory", MetricType.COUNTER, "", + SCAN_START("accumulo.scan.start", MetricType.COUNTER, + "Number of calls to start a scan or multiscan.", MetricCategory.SCAN_SERVER), + SCAN_CONTINUE("accumulo.scan.continue", MetricType.COUNTER, + "Number of calls to continue a scan or multiscan.", MetricCategory.SCAN_SERVER), + SCAN_CLOSE("accumulo.scan.close", MetricType.COUNTER, + "Number of calls to close a scan or multiscan.", MetricCategory.SCAN_SERVER), + SCAN_QUERIES("accumulo.scan.queries", MetricType.GAUGE, "Number of queries made during scans.", MetricCategory.SCAN_SERVER), - SCAN_RETURN_FOR_MEM("accumulo.scan.return.early.for.memory", MetricType.COUNTER, "", + SCAN_SCANNED_ENTRIES("accumulo.scan.query.scanned.entries", MetricType.GAUGE, + "Count of scanned entries. The rate can be derived from this metric.", MetricCategory.SCAN_SERVER), - SCAN_ZOMBIE_THREADS("accumulo.scan.zombie.threads", MetricType.GAUGE, "", + SCAN_QUERY_SCAN_RESULTS("accumulo.scan.query.results", MetricType.GAUGE, + "Query count. The rate can be derived from this metric.", MetricCategory.SCAN_SERVER), + SCAN_QUERY_SCAN_RESULTS_BYTES("accumulo.scan.query.results.bytes", MetricType.GAUGE, + "Query byte count. The rate can be derived from this metric.", MetricCategory.SCAN_SERVER), + SCAN_PAUSED_FOR_MEM("accumulo.scan.paused.for.memory", MetricType.COUNTER, + "Count of scans paused due to server being low on memory.", MetricCategory.SCAN_SERVER), + SCAN_RETURN_FOR_MEM("accumulo.scan.return.early.for.memory", MetricType.COUNTER, + "Count of scans that returned results early due to server being low on memory.", MetricCategory.SCAN_SERVER), + SCAN_ZOMBIE_THREADS("accumulo.scan.zombie.threads", MetricType.GAUGE, + "Number of scan threads that have no associated client session.", MetricCategory.SCAN_SERVER), // Major Compaction Metrics MAJC_QUEUED("accumulo.tserver.compactions.majc.queued", MetricType.GAUGE, - "The compaction service information is in a tag: `id={i|e}_{compactionServiceName}_{executor_name}`.", + "Number of queued major compactions. The compaction service information is in a tag: `id={i|e}_{compactionServiceName}_{executor_name}`.", MetricCategory.TABLET_SERVER), MAJC_RUNNING("accumulo.tserver.compactions.majc.running", MetricType.GAUGE, - "The compaction service information is in a tag: `id={i|e}_{compactionServiceName}_{executor_name}`.", - MetricCategory.TABLET_SERVER), - MAJC_PAUSED("accumulo.tserver.compactions.majc.paused", MetricType.COUNTER, "", + "Number of running major compactions. 
The compaction service information is in a tag: `id={i|e}_{compactionServiceName}_{executor_name}`.", MetricCategory.TABLET_SERVER), + MAJC_PAUSED("accumulo.tserver.compactions.majc.paused", MetricType.COUNTER, + "Number of paused major compactions.", MetricCategory.TABLET_SERVER), // Minor Compaction Metrics - MINC_QUEUED("accumulo.tserver.compactions.minc.queued", MetricType.TIMER, "", - MetricCategory.TABLET_SERVER), - MINC_RUNNING("accumulo.tserver.compactions.minc.running", MetricType.TIMER, "", - MetricCategory.TABLET_SERVER), - MINC_PAUSED("accumulo.tserver.compactions.minc.paused", MetricType.COUNTER, "", - MetricCategory.TABLET_SERVER), + MINC_QUEUED("accumulo.tserver.compactions.minc.queued", MetricType.TIMER, + "Queued minor compactions time queued.", MetricCategory.TABLET_SERVER), + MINC_RUNNING("accumulo.tserver.compactions.minc.running", MetricType.TIMER, + "Minor compactions time active.", MetricCategory.TABLET_SERVER), + MINC_PAUSED("accumulo.tserver.compactions.minc.paused", MetricType.COUNTER, + "Number of paused minor compactions.", MetricCategory.TABLET_SERVER), // Updates (Ingest) Metrics UPDATE_ERRORS("accumulo.tserver.updates.error", MetricType.GAUGE, - "Type is stored in a tag (e.g., type=permission, type=unknown.tablet, type=constraint.violation).", - MetricCategory.TABLET_SERVER), - UPDATE_COMMIT("accumulo.tserver.updates.commit", MetricType.TIMER, "", - MetricCategory.TABLET_SERVER), - UPDATE_COMMIT_PREP("accumulo.tserver.updates.commit.prep", MetricType.TIMER, "", - MetricCategory.TABLET_SERVER), - UPDATE_WALOG_WRITE("accumulo.tserver.updates.walog.write", MetricType.TIMER, "", + "Count of errors during tablet updates. Type/reason for error is stored in the `type` tag (e.g., type=permission, type=unknown.tablet, type=constraint.violation).", MetricCategory.TABLET_SERVER), + UPDATE_COMMIT("accumulo.tserver.updates.commit", MetricType.TIMER, + "Time taken to commit a mutation.", MetricCategory.TABLET_SERVER), + UPDATE_COMMIT_PREP("accumulo.tserver.updates.commit.prep", MetricType.TIMER, + "Time taken to prepare to commit a single mutation.", MetricCategory.TABLET_SERVER), + UPDATE_WALOG_WRITE("accumulo.tserver.updates.walog.write", MetricType.TIMER, + "Time taken to write a batch of mutations to WAL.", MetricCategory.TABLET_SERVER), UPDATE_MUTATION_ARRAY_SIZE("accumulo.tserver.updates.mutation.arrays.size", - MetricType.DISTRIBUTION_SUMMARY, "", MetricCategory.TABLET_SERVER), + MetricType.DISTRIBUTION_SUMMARY, "Batch size of mutations from client.", + MetricCategory.TABLET_SERVER), // Thrift Metrics - THRIFT_IDLE("accumulo.thrift.idle", MetricType.DISTRIBUTION_SUMMARY, "", MetricCategory.THRIFT), - THRIFT_EXECUTE("accumulo.thrift.execute", MetricType.DISTRIBUTION_SUMMARY, "", - MetricCategory.THRIFT), + THRIFT_IDLE("accumulo.thrift.idle", MetricType.DISTRIBUTION_SUMMARY, + "Time waiting to execute an RPC request.", MetricCategory.THRIFT), + THRIFT_EXECUTE("accumulo.thrift.execute", MetricType.DISTRIBUTION_SUMMARY, + "Time to execute an RPC request.", MetricCategory.THRIFT), // Block Cache Metrics - BLOCKCACHE_INDEX_HITCOUNT("accumulo.blockcache.index.hitcount", MetricType.FUNCTION_COUNTER, "", - MetricCategory.BLOCK_CACHE), + BLOCKCACHE_INDEX_HITCOUNT("accumulo.blockcache.index.hitcount", MetricType.FUNCTION_COUNTER, + "Index block cache hit count.", MetricCategory.BLOCK_CACHE), BLOCKCACHE_INDEX_REQUESTCOUNT("accumulo.blockcache.index.requestcount", - MetricType.FUNCTION_COUNTER, "", MetricCategory.BLOCK_CACHE), + MetricType.FUNCTION_COUNTER, "Index block cache 
request count.", MetricCategory.BLOCK_CACHE), BLOCKCACHE_INDEX_EVICTIONCOUNT("accumulo.blockcache.index.evictioncount", - MetricType.FUNCTION_COUNTER, "", MetricCategory.BLOCK_CACHE), - BLOCKCACHE_DATA_HITCOUNT("accumulo.blockcache.data.hitcount", MetricType.FUNCTION_COUNTER, "", - MetricCategory.BLOCK_CACHE), + MetricType.FUNCTION_COUNTER, "Index block cache eviction count.", MetricCategory.BLOCK_CACHE), + BLOCKCACHE_DATA_HITCOUNT("accumulo.blockcache.data.hitcount", MetricType.FUNCTION_COUNTER, + "Data block cache hit count.", MetricCategory.BLOCK_CACHE), BLOCKCACHE_DATA_REQUESTCOUNT("accumulo.blockcache.data.requestcount", MetricType.FUNCTION_COUNTER, - "", MetricCategory.BLOCK_CACHE), + "Data block cache request count.", MetricCategory.BLOCK_CACHE), BLOCKCACHE_DATA_EVICTIONCOUNT("accumulo.blockcache.data.evictioncount", - MetricType.FUNCTION_COUNTER, "", MetricCategory.BLOCK_CACHE), + MetricType.FUNCTION_COUNTER, "Data block cache eviction count.", MetricCategory.BLOCK_CACHE), BLOCKCACHE_SUMMARY_HITCOUNT("accumulo.blockcache.summary.hitcount", MetricType.FUNCTION_COUNTER, - "", MetricCategory.BLOCK_CACHE), + "Summary block cache hit count.", MetricCategory.BLOCK_CACHE), BLOCKCACHE_SUMMARY_REQUESTCOUNT("accumulo.blockcache.summary.requestcount", - MetricType.FUNCTION_COUNTER, "", MetricCategory.BLOCK_CACHE), + MetricType.FUNCTION_COUNTER, "Summary block cache request count.", + MetricCategory.BLOCK_CACHE), BLOCKCACHE_SUMMARY_EVICTIONCOUNT("accumulo.blockcache.summary.evictioncount", - MetricType.FUNCTION_COUNTER, "", MetricCategory.BLOCK_CACHE), + MetricType.FUNCTION_COUNTER, "Summary block cache eviction count.", + MetricCategory.BLOCK_CACHE), // Manager Metrics MANAGER_BALANCER_MIGRATIONS_NEEDED("accumulo.manager.balancer.migrations.needed", MetricType.GAUGE, - "The number of migrations that need to complete before the system is balanced.", + "The total number of migrations that need to complete before the system is balanced.", MetricCategory.MANAGER); private final String name; diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java index 4c0f69f8663..da3b9fb4372 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java @@ -44,9 +44,9 @@ public void incrementMajCPause() { @Override public void registerMetrics(MeterRegistry registry) { FunctionCounter.builder(MAJC_PAUSED.getName(), majcPauseCount, AtomicLong::get) - .description("major compaction pause count").register(registry); + .description(MAJC_PAUSED.getDescription()).register(registry); FunctionCounter.builder(MINC_PAUSED.getName(), mincPauseCount, AtomicLong::get) - .description("minor compactor pause count").register(registry); + .description(MINC_PAUSED.getDescription()).register(registry); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java index 83def192286..e824da163b2 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java @@ -21,12 +21,12 @@ import static org.apache.accumulo.core.metrics.Metric.LOW_MEMORY; import static org.apache.accumulo.core.metrics.Metric.SERVER_IDLE; -import 
java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.server.ServerContext; +import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; public class ProcessMetrics implements MetricsProducer { @@ -41,8 +41,10 @@ public ProcessMetrics(final ServerContext context) { @Override public void registerMetrics(MeterRegistry registry) { - registry.gauge(LOW_MEMORY.getName(), List.of(), this, this::lowMemDetected); - registry.gauge(SERVER_IDLE.getName(), isIdle, AtomicInteger::get); + Gauge.builder(LOW_MEMORY.getName(), this, this::lowMemDetected) + .description(LOW_MEMORY.getDescription()).register(registry); + Gauge.builder(SERVER_IDLE.getName(), isIdle, AtomicInteger::get) + .description(SERVER_IDLE.getDescription()).register(registry); } private int lowMemDetected(ProcessMetrics processMetrics) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetrics.java index 9fdf98a61c2..b910f57670f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetrics.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetrics.java @@ -43,9 +43,10 @@ public void addExecute(long time) { @Override public void registerMetrics(MeterRegistry registry) { - idle = DistributionSummary.builder(THRIFT_IDLE.getName()).baseUnit("ms").register(registry); - execute = - DistributionSummary.builder(THRIFT_EXECUTE.getName()).baseUnit("ms").register(registry); + idle = DistributionSummary.builder(THRIFT_IDLE.getName()).baseUnit("ms") + .description(THRIFT_IDLE.getDescription()).register(registry); + execute = DistributionSummary.builder(THRIFT_EXECUTE.getName()).baseUnit("ms") + .description(THRIFT_EXECUTE.getDescription()).register(registry); } } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 4658bd8a5a7..1d49cf4f8ff 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -183,14 +183,12 @@ private long getTotalEntriesWritten() { public void registerMetrics(MeterRegistry registry) { super.registerMetrics(registry); FunctionCounter.builder(COMPACTOR_ENTRIES_READ.getName(), this, Compactor::getTotalEntriesRead) - .description("Number of entries read by all compactions that have run on this compactor") - .register(registry); + .description(COMPACTOR_ENTRIES_READ.getDescription()).register(registry); FunctionCounter .builder(COMPACTOR_ENTRIES_WRITTEN.getName(), this, Compactor::getTotalEntriesWritten) - .description("Number of entries written by all compactions that have run on this compactor") - .register(registry); + .description(COMPACTOR_ENTRIES_WRITTEN.getDescription()).register(registry); LongTaskTimer timer = LongTaskTimer.builder(COMPACTOR_MAJC_STUCK.getName()) - .description("Number and duration of stuck major compactions").register(registry); + .description(COMPACTOR_MAJC_STUCK.getDescription()).register(registry); CompactionWatcher.setTimer(timer); } diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java index 9a88f853164..699ac825670 100644 --- 
a/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java @@ -53,41 +53,39 @@ public GcMetrics(SimpleGarbageCollector gc) { @Override public void registerMetrics(MeterRegistry registry) { Gauge.builder(GC_STARTED.getName(), metricValues, v -> v.getLastCollect().getStarted()) - .description("Timestamp GC file collection cycle started").register(registry); + .description(GC_STARTED.getDescription()).register(registry); Gauge.builder(GC_FINISHED.getName(), metricValues, v -> v.getLastCollect().getFinished()) - .description("Timestamp GC file collect cycle finished").register(registry); + .description(GC_FINISHED.getDescription()).register(registry); Gauge.builder(GC_CANDIDATES.getName(), metricValues, v -> v.getLastCollect().getCandidates()) - .description("Number of files that are candidates for deletion").register(registry); + .description(GC_CANDIDATES.getDescription()).register(registry); Gauge.builder(GC_IN_USE.getName(), metricValues, v -> v.getLastCollect().getInUse()) - .description("Number of candidate files still in use").register(registry); + .description(GC_IN_USE.getDescription()).register(registry); Gauge.builder(GC_DELETED.getName(), metricValues, v -> v.getLastCollect().getDeleted()) - .description("Number of candidate files deleted").register(registry); + .description(GC_DELETED.getDescription()).register(registry); Gauge.builder(GC_ERRORS.getName(), metricValues, v -> v.getLastCollect().getErrors()) - .description("Number of candidate deletion errors").register(registry); + .description(GC_ERRORS.getDescription()).register(registry); // WAL metrics Gauges Gauge.builder(GC_WAL_STARTED.getName(), metricValues, v -> v.getLastWalCollect().getStarted()) - .description("Timestamp GC WAL collection cycle started").register(registry); + .description(GC_WAL_STARTED.getDescription()).register(registry); Gauge.builder(GC_WAL_FINISHED.getName(), metricValues, v -> v.getLastWalCollect().getFinished()) - .description("Timestamp GC WAL collect cycle finished").register(registry); + .description(GC_WAL_FINISHED.getDescription()).register(registry); Gauge .builder(GC_WAL_CANDIDATES.getName(), metricValues, v -> v.getLastWalCollect().getCandidates()) - .description("Number of files that are candidates for deletion").register(registry); + .description(GC_WAL_CANDIDATES.getDescription()).register(registry); Gauge.builder(GC_WAL_IN_USE.getName(), metricValues, v -> v.getLastWalCollect().getInUse()) - .description("Number of wal file candidates that are still in use").register(registry); + .description(GC_WAL_IN_USE.getDescription()).register(registry); Gauge.builder(GC_WAL_DELETED.getName(), metricValues, v -> v.getLastWalCollect().getDeleted()) - .description("Number of candidate wal files deleted").register(registry); + .description(GC_WAL_DELETED.getDescription()).register(registry); Gauge.builder(GC_WAL_ERRORS.getName(), metricValues, v -> v.getLastWalCollect().getErrors()) - .description("Number candidate wal file deletion errors").register(registry); + .description(GC_WAL_ERRORS.getDescription()).register(registry); Gauge .builder(GC_POST_OP_DURATION.getName(), metricValues, v -> TimeUnit.NANOSECONDS.toMillis(v.getPostOpDurationNanos())) - .description("GC metadata table post operation duration in milliseconds") - .register(registry); + .description(GC_POST_OP_DURATION.getDescription()).register(registry); Gauge.builder(GC_RUN_CYCLE.getName(), metricValues, GcCycleMetrics::getRunCycleCount) - 
.description("gauge incremented each gc cycle run, rest on process start") - .register(registry); + .description(GC_RUN_CYCLE.getDescription()).register(registry); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/BalancerMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/BalancerMetrics.java index 820f49b139e..9bdb24b94d1 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/BalancerMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/BalancerMetrics.java @@ -48,6 +48,6 @@ public void registerMetrics(MeterRegistry registry) { Gauge .builder(MANAGER_BALANCER_MIGRATIONS_NEEDED.getName(), this, BalancerMetrics::getMigratingCount) - .description("Overall total migrations that need to complete").register(registry); + .description(MANAGER_BALANCER_MIGRATIONS_NEEDED.getDescription()).register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/BlockCacheMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/BlockCacheMetrics.java index b6325654d85..987abc12e7c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/BlockCacheMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/BlockCacheMetrics.java @@ -55,26 +55,26 @@ public void registerMetrics(MeterRegistry registry) { ToDoubleFunction getEvictionCount = cache -> cache.getStats().evictionCount(); FunctionCounter.builder(BLOCKCACHE_INDEX_HITCOUNT.getName(), indexCache, getHitCount) - .description("Index block cache hit count").register(registry); + .description(BLOCKCACHE_INDEX_HITCOUNT.getDescription()).register(registry); FunctionCounter.builder(BLOCKCACHE_INDEX_REQUESTCOUNT.getName(), indexCache, getRequestCount) - .description("Index block cache request count").register(registry); + .description(BLOCKCACHE_INDEX_REQUESTCOUNT.getDescription()).register(registry); FunctionCounter.builder(BLOCKCACHE_INDEX_EVICTIONCOUNT.getName(), indexCache, getEvictionCount) - .description("Index block cache eviction count").register(registry); + .description(BLOCKCACHE_INDEX_EVICTIONCOUNT.getDescription()).register(registry); FunctionCounter.builder(BLOCKCACHE_DATA_HITCOUNT.getName(), dataCache, getHitCount) - .description("Data block cache hit count").register(registry); + .description(BLOCKCACHE_DATA_HITCOUNT.getDescription()).register(registry); FunctionCounter.builder(BLOCKCACHE_DATA_REQUESTCOUNT.getName(), dataCache, getRequestCount) - .description("Data block cache request count").register(registry); + .description(BLOCKCACHE_DATA_REQUESTCOUNT.getDescription()).register(registry); FunctionCounter.builder(BLOCKCACHE_DATA_EVICTIONCOUNT.getName(), dataCache, getEvictionCount) - .description("Data block cache eviction count").register(registry); + .description(BLOCKCACHE_DATA_EVICTIONCOUNT.getDescription()).register(registry); FunctionCounter.builder(BLOCKCACHE_SUMMARY_HITCOUNT.getName(), summaryCache, getHitCount) - .description("Summary block cache hit count").register(registry); + .description(BLOCKCACHE_SUMMARY_HITCOUNT.getDescription()).register(registry); FunctionCounter .builder(BLOCKCACHE_SUMMARY_REQUESTCOUNT.getName(), summaryCache, getRequestCount) - .description("Summary block cache request count").register(registry); + .description(BLOCKCACHE_SUMMARY_REQUESTCOUNT.getDescription()).register(registry); FunctionCounter .builder(BLOCKCACHE_SUMMARY_EVICTIONCOUNT.getName(), summaryCache, getEvictionCount) - .description("Summary block cache eviction 
count").register(registry); + .description(BLOCKCACHE_SUMMARY_EVICTIONCOUNT.getDescription()).register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java index 12d85f1d22d..1f558021e0a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java @@ -56,17 +56,15 @@ public ScanServerMetrics(final LoadingCache tabletMeta @Override public void registerMetrics(MeterRegistry registry) { totalReservationTimer = Timer.builder(SCAN_RESERVATION_TOTAL_TIMER.getName()) - .description("Time to reserve a tablets files for scan").register(registry); + .description(SCAN_RESERVATION_TOTAL_TIMER.getDescription()).register(registry); writeOutReservationTimer = Timer.builder(SCAN_RESERVATION_WRITEOUT_TIMER.getName()) - .description("Time to write out a tablets file reservations for scan").register(registry); + .description(SCAN_RESERVATION_WRITEOUT_TIMER.getDescription()).register(registry); FunctionCounter.builder(SCAN_BUSY_TIMEOUT_COUNT.getName(), busyTimeoutCount, AtomicLong::get) - .description("The number of scans where a busy timeout happened").register(registry); + .description(SCAN_BUSY_TIMEOUT_COUNT.getDescription()).register(registry); FunctionCounter .builder(SCAN_RESERVATION_CONFLICT_COUNTER.getName(), reservationConflictCount, AtomicLong::get) - .description( - "Counts instances where file reservation attempts for scans encountered conflicts") - .register(registry); + .description(SCAN_RESERVATION_CONFLICT_COUNTER.getDescription()).register(registry); if (tabletMetadataCache != null) { Preconditions.checkState(tabletMetadataCache.policy().isRecordingStats(), diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java index 26bbd11adbf..134c604a671 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java @@ -67,61 +67,59 @@ private long getTotalEntriesWritten() { public void registerMetrics(MeterRegistry registry) { FunctionCounter .builder(COMPACTOR_ENTRIES_READ.getName(), this, TabletServerMetrics::getTotalEntriesRead) - .description("Number of entries read by all compactions that have run on this tserver") - .register(registry); + .description(COMPACTOR_ENTRIES_READ.getDescription()).register(registry); FunctionCounter .builder(COMPACTOR_ENTRIES_WRITTEN.getName(), this, TabletServerMetrics::getTotalEntriesWritten) - .description("Number of entries written by all compactions that have run on this tserver") - .register(registry); + .description(COMPACTOR_ENTRIES_WRITTEN.getDescription()).register(registry); LongTaskTimer timer = LongTaskTimer.builder(TSERVER_MAJC_STUCK.getName()) - .description("Number and duration of stuck major compactions").register(registry); + .description(TSERVER_MAJC_STUCK.getDescription()).register(registry); CompactionWatcher.setTimer(timer); Gauge .builder(TSERVER_TABLETS_LONG_ASSIGNMENTS.getName(), util, TabletServerMetricsUtil::getLongTabletAssignments) - .description("Number of tablet assignments that are taking a long time").register(registry); + .description(TSERVER_TABLETS_LONG_ASSIGNMENTS.getDescription()).register(registry); 
Gauge.builder(TSERVER_ENTRIES.getName(), util, TabletServerMetricsUtil::getEntries) - .description("Number of entries").register(registry); + .description(TSERVER_ENTRIES.getDescription()).register(registry); Gauge.builder(TSERVER_MEM_ENTRIES.getName(), util, TabletServerMetricsUtil::getEntriesInMemory) - .description("Number of entries in memory").register(registry); + .description(TSERVER_MEM_ENTRIES.getDescription()).register(registry); Gauge .builder(TSERVER_MAJC_RUNNING.getName(), util, TabletServerMetricsUtil::getMajorCompactions) - .description("Number of active major compactions").register(registry); + .description(TSERVER_MAJC_RUNNING.getDescription()).register(registry); Gauge .builder(TSERVER_MAJC_QUEUED.getName(), util, TabletServerMetricsUtil::getMajorCompactionsQueued) - .description("Number of queued major compactions").register(registry); + .description(TSERVER_MINC_QUEUED.getDescription()).register(registry); Gauge .builder(TSERVER_MINC_RUNNING.getName(), util, TabletServerMetricsUtil::getMinorCompactions) - .description("Number of active minor compactions").register(registry); + .description(TSERVER_MINC_RUNNING.getDescription()).register(registry); Gauge .builder(TSERVER_MINC_QUEUED.getName(), util, TabletServerMetricsUtil::getMinorCompactionsQueued) - .description("Number of queued minor compactions").register(registry); + .description(TSERVER_MINC_QUEUED.getDescription()).register(registry); Gauge.builder(TSERVER_TABLETS_ONLINE.getName(), util, TabletServerMetricsUtil::getOnlineCount) - .description("Number of online tablets").register(registry); + .description(TSERVER_TABLETS_ONLINE.getDescription()).register(registry); Gauge.builder(TSERVER_TABLETS_OPENING.getName(), util, TabletServerMetricsUtil::getOpeningCount) - .description("Number of opening tablets").register(registry); + .description(TSERVER_TABLETS_OPENING.getDescription()).register(registry); Gauge .builder(TSERVER_TABLETS_UNOPENED.getName(), util, TabletServerMetricsUtil::getUnopenedCount) - .description("Number of unopened tablets").register(registry); + .description(TSERVER_TABLETS_UNOPENED.getDescription()).register(registry); Gauge .builder(TSERVER_MINC_TOTAL.getName(), util, TabletServerMetricsUtil::getTotalMinorCompactions) - .description("Total number of minor compactions performed").register(registry); + .description(TSERVER_MINC_TOTAL.getDescription()).register(registry); Gauge .builder(TSERVER_TABLETS_FILES.getName(), util, TabletServerMetricsUtil::getAverageFilesPerTablet) - .description("Number of files per tablet").register(registry); + .description(TSERVER_TABLETS_FILES.getDescription()).register(registry); Gauge.builder(TSERVER_HOLD.getName(), util, TabletServerMetricsUtil::getHoldTime) - .description("Time commits held").register(registry); + .description(TSERVER_HOLD.getDescription()).register(registry); Gauge.builder(TSERVER_INGEST_MUTATIONS.getName(), util, TabletServerMetricsUtil::getIngestCount) - .description("Ingest rate (entries/sec)").register(registry); + .description(TSERVER_INGEST_MUTATIONS.getDescription()).register(registry); Gauge.builder(TSERVER_INGEST_BYTES.getName(), util, TabletServerMetricsUtil::getIngestByteCount) - .description("Ingest rate (bytes/sec)").register(registry); + .description(TSERVER_INGEST_BYTES.getDescription()).register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java index 
7bc3e5bc856..0ce05e77838 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java @@ -44,11 +44,11 @@ public void addQueued(long value) { @Override public void registerMetrics(MeterRegistry registry) { - activeMinc = Timer.builder(MINC_RUNNING.getName()).description("Minor compactions time active") + activeMinc = Timer.builder(MINC_RUNNING.getName()).description(MINC_RUNNING.getDescription()) .register(registry); - queuedMinc = Timer.builder(MINC_QUEUED.getName()) - .description("Queued minor compactions time queued").register(registry); + queuedMinc = Timer.builder(MINC_QUEUED.getName()).description(MINC_QUEUED.getDescription()) + .register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java index d5848722cba..e5b40c98d9f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java @@ -138,40 +138,39 @@ public long getZombieThreadsCount() { @Override public void registerMetrics(MeterRegistry registry) { Gauge.builder(SCAN_OPEN_FILES.getName(), openFiles::get) - .description("Number of files open for scans").register(registry); - scans = Timer.builder(SCAN_TIMES.getName()).description("Scans").register(registry); + .description(SCAN_OPEN_FILES.getDescription()).register(registry); + scans = Timer.builder(SCAN_TIMES.getName()).description(SCAN_TIMES.getDescription()) + .register(registry); resultsPerScan = DistributionSummary.builder(SCAN_RESULTS.getName()) - .description("Results per scan").register(registry); - yields = - DistributionSummary.builder(SCAN_YIELDS.getName()).description("yields").register(registry); + .description(SCAN_RESULTS.getDescription()).register(registry); + yields = DistributionSummary.builder(SCAN_YIELDS.getName()) + .description(SCAN_YIELDS.getDescription()).register(registry); FunctionCounter.builder(SCAN_START.getName(), this.startScanCalls, AtomicLong::get) - .description("calls to start a scan / multiscan").register(registry); + .description(SCAN_START.getDescription()).register(registry); FunctionCounter.builder(SCAN_CONTINUE.getName(), this.continueScanCalls, AtomicLong::get) - .description("calls to continue a scan / multiscan").register(registry); + .description(SCAN_CONTINUE.getDescription()).register(registry); FunctionCounter.builder(SCAN_CLOSE.getName(), this.closeScanCalls, AtomicLong::get) - .description("calls to close a scan / multiscan").register(registry); + .description(SCAN_CLOSE.getDescription()).register(registry); FunctionCounter .builder(SCAN_BUSY_TIMEOUT_COUNT.getName(), this.busyTimeoutCount, AtomicLong::get) - .description("The number of scans where a busy timeout happened").register(registry); + .description(SCAN_BUSY_TIMEOUT_COUNT.getDescription()).register(registry); FunctionCounter.builder(SCAN_QUERIES.getName(), this.lookupCount, LongAdder::sum) - .description("Number of queries").register(registry); + .description(SCAN_QUERIES.getDescription()).register(registry); FunctionCounter.builder(SCAN_SCANNED_ENTRIES.getName(), this.scannedCount, LongAdder::sum) - .description("Scanned rate").register(registry); + .description(SCAN_SCANNED_ENTRIES.getDescription()).register(registry); 
FunctionCounter.builder(SCAN_PAUSED_FOR_MEM.getName(), this.pausedForMemory, AtomicLong::get) - .description("scan paused due to server being low on memory").register(registry); + .description(SCAN_PAUSED_FOR_MEM.getDescription()).register(registry); FunctionCounter .builder(SCAN_RETURN_FOR_MEM.getName(), this.earlyReturnForMemory, AtomicLong::get) - .description("scan returned results early due to server being low on memory") - .register(registry); + .description(SCAN_RETURN_FOR_MEM.getDescription()).register(registry); Gauge.builder(SCAN_QUERY_SCAN_RESULTS.getName(), this.queryResultCount, LongAdder::sum) - .description("Query rate (entries/sec)").register(registry); + .description(SCAN_QUERY_SCAN_RESULTS.getDescription()).register(registry); Gauge.builder(SCAN_QUERY_SCAN_RESULTS_BYTES.getName(), this.queryResultBytes, LongAdder::sum) - .description("Query rate (bytes/sec)").register(registry); + .description(SCAN_QUERY_SCAN_RESULTS_BYTES.getDescription()).register(registry); Gauge .builder(SCAN_ZOMBIE_THREADS.getName(), this, TabletServerScanMetrics::getZombieThreadsCount) - .description("Number of scan threads that have no associated client session") - .register(registry); + .description(SCAN_ZOMBIE_THREADS.getDescription()).register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java index 37beff251c3..52453916cce 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java @@ -76,21 +76,21 @@ public void addMutationArraySize(long value) { @Override public void registerMetrics(MeterRegistry registry) { FunctionCounter.builder(UPDATE_ERRORS.getName(), permissionErrorsCount, AtomicLong::get) - .tags("type", "permission").description("Counts permission errors").register(registry); + .tags("type", "permission").description(UPDATE_ERRORS.getDescription()).register(registry); FunctionCounter.builder(UPDATE_ERRORS.getName(), unknownTabletErrorsCount, AtomicLong::get) - .tags("type", "unknown.tablet").description("Counts unknown tablet errors") + .tags("type", "unknown.tablet").description(UPDATE_ERRORS.getDescription()) .register(registry); FunctionCounter.builder(UPDATE_ERRORS.getName(), constraintViolationsCount, AtomicLong::get) - .tags("type", "constraint.violation").description("Counts constraint violations") + .tags("type", "constraint.violation").description(UPDATE_ERRORS.getDescription()) .register(registry); commitPrepStat = Timer.builder(UPDATE_COMMIT_PREP.getName()) - .description("preparing to commit mutations").register(registry); + .description(UPDATE_COMMIT_PREP.getDescription()).register(registry); walogWriteTimeStat = Timer.builder(UPDATE_WALOG_WRITE.getName()) - .description("writing mutations to WAL").register(registry); - commitTimeStat = Timer.builder(UPDATE_COMMIT.getName()).description("committing mutations") - .register(registry); + .description(UPDATE_WALOG_WRITE.getDescription()).register(registry); + commitTimeStat = Timer.builder(UPDATE_COMMIT.getName()) + .description(UPDATE_COMMIT.getDescription()).register(registry); mutationArraySizeStat = DistributionSummary.builder(UPDATE_MUTATION_ARRAY_SIZE.getName()) - .description("mutation array").register(registry); + .description(UPDATE_MUTATION_ARRAY_SIZE.getDescription()).register(registry); } } From 
8139d1ee232925aa38c6a0a22490af330b7827d7 Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Mon, 23 Sep 2024 16:09:14 -0400 Subject: [PATCH 04/18] fix issue with converting text to string in MetadataConstraints in 3.x (#4897) * fix issue with converting text to string in MetadataConstraints in 3.x * Trivial change to SuspendLocationColumn made the Text name and qualifier string accessible through SuspendLocationColumn; accessed in MetadataConstraints --- .../apache/accumulo/core/metadata/schema/MetadataSchema.java | 5 +++-- .../accumulo/server/constraints/MetadataConstraints.java | 3 +-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index e4356c52474..4d5a3dea3f8 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -262,8 +262,9 @@ public static class LastLocationColumnFamily { */ public static class SuspendLocationColumn { public static final String STR_NAME = "suspend"; - public static final ColumnFQ SUSPEND_COLUMN = - new ColumnFQ(new Text(STR_NAME), new Text("loc")); + public static final Text NAME = new Text(STR_NAME); + public static final String SUSPEND_QUAL = "loc"; + public static final ColumnFQ SUSPEND_COLUMN = new ColumnFQ(NAME, new Text(SUSPEND_QUAL)); } /** diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index 0e18fa0a018..8395a8b407a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@ -377,8 +377,7 @@ private void validateServerFamily(ArrayList violations, ColumnUpdate colu private void validateSuspendLocationFamily(ArrayList violations, ColumnUpdate columnUpdate) { String qualStr = new String(columnUpdate.getColumnQualifier(), UTF_8); - String suspendColQualStr = - new String(SuspendLocationColumn.SUSPEND_COLUMN.getColumnQualifier().getBytes(), UTF_8); + String suspendColQualStr = SuspendLocationColumn.SUSPEND_QUAL; if (qualStr.equals(suspendColQualStr)) { try { From 90bf2f7d095e60dc5be1f1c721589b13b8797c75 Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Tue, 24 Sep 2024 11:11:31 -0400 Subject: [PATCH 05/18] Fix error with Metric description usage --- .../apache/accumulo/tserver/metrics/TabletServerMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java index 134c604a671..7f4567d307d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java @@ -90,7 +90,7 @@ public void registerMetrics(MeterRegistry registry) { Gauge .builder(TSERVER_MAJC_QUEUED.getName(), util, TabletServerMetricsUtil::getMajorCompactionsQueued) - .description(TSERVER_MINC_QUEUED.getDescription()).register(registry); + .description(TSERVER_MAJC_QUEUED.getDescription()).register(registry); Gauge .builder(TSERVER_MINC_RUNNING.getName(), util, 
TabletServerMetricsUtil::getMinorCompactions) .description(TSERVER_MINC_RUNNING.getDescription()).register(registry); From 99326de4c07693c053d6f22181c199c1b1501ed9 Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Wed, 25 Sep 2024 10:20:30 -0400 Subject: [PATCH 06/18] Bug fix admin cmd (#4923) This fixes a bug with the admin "checkTablets" command (and maybe others) where the ZK connection is closed but is attempted to be accessed again leading to an exception. This makes sure the threads that try to access ZK are stopped before closing the connection Co-authored-by: Keith Turner --- .../src/main/java/org/apache/accumulo/server/util/Admin.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index 781037f66e6..992470c1fae 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -508,6 +508,7 @@ public void execute(final String[] args) { log.error("{}", e.getMessage(), e); System.exit(3); } finally { + context.close(); SingletonManager.setMode(Mode.CLOSED); } } From 02193118d4bf39e43bcafd66ff8fa858bfcd0c74 Mon Sep 17 00:00:00 2001 From: "Dom G." Date: Wed, 25 Sep 2024 15:07:42 -0400 Subject: [PATCH 07/18] Improvements to FateMetrics (#4924) * Improvements to FateMetrics --- .../metrics/fate/FateMetricValues.java | 26 ++--- .../manager/metrics/fate/FateMetrics.java | 102 ++++++------------ 2 files changed, 47 insertions(+), 81 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java index 57a561aa7b4..702eaa7e565 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java @@ -19,12 +19,14 @@ package org.apache.accumulo.manager.metrics.fate; import java.util.Collections; +import java.util.EnumMap; import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.accumulo.core.fate.AdminUtil; import org.apache.accumulo.core.fate.ReadOnlyTStore; +import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; import org.apache.accumulo.server.ServerContext; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; @@ -44,12 +46,12 @@ class FateMetricValues { private final long zkFateChildOpsTotal; private final long zkConnectionErrors; - private final Map txStateCounters; + private final EnumMap txStateCounters; private final Map opTypeCounters; private FateMetricValues(final long updateTime, final long currentFateOps, final long zkFateChildOpsTotal, final long zkConnectionErrors, - final Map txStateCounters, final Map opTypeCounters) { + final EnumMap txStateCounters, final Map opTypeCounters) { this.updateTime = updateTime; this.currentFateOps = currentFateOps; this.zkFateChildOpsTotal = zkFateChildOpsTotal; @@ -75,7 +77,7 @@ long getZkConnectionErrors() { * * @return a map of transaction status counters. */ - Map getTxStateCounters() { + EnumMap getTxStateCounters() { return txStateCounters; } @@ -115,9 +117,9 @@ public static FateMetricValues getFromZooKeeper(final ServerContext context, builder.withCurrentFateOps(currFates.size()); // states are enumerated - create new map with counts initialized to 0. 
- Map states = new TreeMap<>(); - for (ReadOnlyTStore.TStatus t : ReadOnlyTStore.TStatus.values()) { - states.put(t.name(), 0L); + EnumMap states = new EnumMap<>(TStatus.class); + for (TStatus t : TStatus.values()) { + states.put(t, 0L); } // op types are dynamic, no count initialization needed - clearing prev values will @@ -126,7 +128,7 @@ public static FateMetricValues getFromZooKeeper(final ServerContext context, for (AdminUtil.TransactionStatus tx : currFates) { - String stateName = tx.getStatus().name(); + TStatus stateName = tx.getStatus(); // incr count for state states.merge(stateName, 1L, Long::sum); @@ -182,15 +184,15 @@ static class Builder { private long zkFateChildOpsTotal = 0; private long zkConnectionErrors = 0; - private final Map txStateCounters; + private final EnumMap txStateCounters; private Map opTypeCounters; Builder() { // states are enumerated - create new map with counts initialized to 0. - txStateCounters = new TreeMap<>(); - for (ReadOnlyTStore.TStatus t : ReadOnlyTStore.TStatus.values()) { - txStateCounters.put(t.name(), 0L); + txStateCounters = new EnumMap<>(TStatus.class); + for (TStatus t : TStatus.values()) { + txStateCounters.put(t, 0L); } opTypeCounters = Collections.emptyMap(); @@ -216,7 +218,7 @@ Builder withZkConnectionErrors(final long value) { return this; } - Builder withTxStateCounters(final Map txStateCounters) { + Builder withTxStateCounters(final EnumMap txStateCounters) { this.txStateCounters.putAll(txStateCounters); return this; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java index ebbbec43166..18e376e48ce 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java @@ -24,7 +24,7 @@ import static org.apache.accumulo.core.metrics.Metric.FATE_TX; import static org.apache.accumulo.core.metrics.Metric.FATE_TYPE_IN_PROGRESS; -import java.util.List; +import java.util.EnumMap; import java.util.Map.Entry; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -33,6 +33,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.ReadOnlyTStore; +import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; import org.apache.accumulo.core.fate.ZooStore; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -41,9 +42,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Metrics; -import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; public class FateMetrics implements MetricsProducer { @@ -60,16 +61,10 @@ public class FateMetrics implements MetricsProducer { private final String fateRootPath; private final long refreshDelay; - private AtomicLong totalCurrentOpsGauge; - private AtomicLong totalOpsGauge; - private AtomicLong fateErrorsGauge; - private AtomicLong newTxGauge; - private AtomicLong submittedTxGauge; - private AtomicLong inProgressTxGauge; - private AtomicLong failedInProgressTxGauge; - private AtomicLong failedTxGauge; - private AtomicLong successfulTxGauge; - private AtomicLong unknownTxGauge; + private final AtomicLong totalCurrentOpsCount = new AtomicLong(0); + 
private final AtomicLong totalOpsCount = new AtomicLong(0); + private final AtomicLong fateErrorsCount = new AtomicLong(0); + private final EnumMap txStatusCounters = new EnumMap<>(TStatus.class); public FateMetrics(final ServerContext context, final long minimumRefreshDelay) { @@ -88,6 +83,10 @@ public FateMetrics(final ServerContext context, final long minimumRefreshDelay) "FATE Metrics - Interrupt received while initializing zoo store"); } + for (TStatus status : TStatus.values()) { + txStatusCounters.put(status, new AtomicLong(0)); + } + } private void update() { @@ -95,70 +94,35 @@ private void update() { FateMetricValues metricValues = FateMetricValues.getFromZooKeeper(context, fateRootPath, zooStore); - totalCurrentOpsGauge.set(metricValues.getCurrentFateOps()); - totalOpsGauge.set(metricValues.getZkFateChildOpsTotal()); - fateErrorsGauge.set(metricValues.getZkConnectionErrors()); - - for (Entry vals : metricValues.getTxStateCounters().entrySet()) { - switch (ReadOnlyTStore.TStatus.valueOf(vals.getKey())) { - case NEW: - newTxGauge.set(vals.getValue()); - break; - case SUBMITTED: - submittedTxGauge.set(vals.getValue()); - break; - case IN_PROGRESS: - inProgressTxGauge.set(vals.getValue()); - break; - case FAILED_IN_PROGRESS: - failedInProgressTxGauge.set(vals.getValue()); - break; - case FAILED: - failedTxGauge.set(vals.getValue()); - break; - case SUCCESSFUL: - successfulTxGauge.set(vals.getValue()); - break; - case UNKNOWN: - unknownTxGauge.set(vals.getValue()); - break; - default: - log.warn("Unhandled status type: {}", vals.getKey()); + totalCurrentOpsCount.set(metricValues.getCurrentFateOps()); + totalOpsCount.set(metricValues.getZkFateChildOpsTotal()); + fateErrorsCount.set(metricValues.getZkConnectionErrors()); + + for (Entry entry : metricValues.getTxStateCounters().entrySet()) { + AtomicLong counter = txStatusCounters.get(entry.getKey()); + if (counter != null) { + counter.set(entry.getValue()); + } else { + log.warn("Unhandled TStatus: {}", entry.getKey()); } } - metricValues.getOpTypeCounters().forEach((name, count) -> { - Metrics.gauge(FATE_TYPE_IN_PROGRESS.getName(), Tags.of(OP_TYPE_TAG, name), count); - }); + metricValues.getOpTypeCounters().forEach((name, count) -> Metrics + .gauge(FATE_TYPE_IN_PROGRESS.getName(), Tags.of(OP_TYPE_TAG, name), count)); } @Override public void registerMetrics(final MeterRegistry registry) { - totalCurrentOpsGauge = registry.gauge(FATE_OPS.getName(), new AtomicLong(0)); - totalOpsGauge = registry.gauge(FATE_OPS_ACTIVITY.getName(), new AtomicLong(0)); - fateErrorsGauge = registry.gauge(FATE_ERRORS.getName(), - List.of(Tag.of("type", "zk.connection")), new AtomicLong(0)); - newTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.NEW.name().toLowerCase())), - new AtomicLong(0)); - submittedTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.SUBMITTED.name().toLowerCase())), - new AtomicLong(0)); - inProgressTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.IN_PROGRESS.name().toLowerCase())), - new AtomicLong(0)); - failedInProgressTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.FAILED_IN_PROGRESS.name().toLowerCase())), - new AtomicLong(0)); - failedTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.FAILED.name().toLowerCase())), - new AtomicLong(0)); - successfulTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", 
ReadOnlyTStore.TStatus.SUCCESSFUL.name().toLowerCase())), - new AtomicLong(0)); - unknownTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.UNKNOWN.name().toLowerCase())), - new AtomicLong(0)); + Gauge.builder(FATE_OPS.getName(), totalCurrentOpsCount, AtomicLong::get) + .description(FATE_OPS.getDescription()).register(registry); + Gauge.builder(FATE_OPS_ACTIVITY.getName(), totalOpsCount, AtomicLong::get) + .description(FATE_OPS_ACTIVITY.getDescription()).register(registry); + Gauge.builder(FATE_ERRORS.getName(), fateErrorsCount, AtomicLong::get) + .description(FATE_ERRORS.getDescription()).tags("type", "zk.connection").register(registry); + + txStatusCounters.forEach((status, counter) -> Gauge + .builder(FATE_TX.getName(), counter, AtomicLong::get).description(FATE_TX.getDescription()) + .tags("state", status.name().toLowerCase()).register(registry)); update(); From fe998f73a0bb7f48c9172249db5e7ae7187c6bd8 Mon Sep 17 00:00:00 2001 From: "Dom G." Date: Thu, 26 Sep 2024 15:24:20 -0400 Subject: [PATCH 08/18] Fix bug with calculating remaining timeout in TabletServerBatchReaderIterator (#4893) --- .../core/clientImpl/TabletServerBatchReaderIterator.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java index f43f571f6c4..bf47678b9a7 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java @@ -616,8 +616,6 @@ private ScanServerData rebinToScanServers(Map>> // get a snapshot of this once,not each time the plugin request it var scanAttemptsSnapshot = scanAttempts.snapshot(); - Duration timeoutLeft = Duration.ofMillis(retryTimeout - startTime.elapsed(MILLISECONDS)); - ScanServerSelector.SelectorParameters params = new ScanServerSelector.SelectorParameters() { @Override public Collection getTablets() { @@ -637,6 +635,7 @@ public Map getHints() { @Override public Optional waitUntil(Supplier> condition, Duration maxWaitTime, String description) { + Duration timeoutLeft = Duration.ofMillis(retryTimeout - startTime.elapsed(MILLISECONDS)); return ThriftScanner.waitUntil(condition, maxWaitTime, description, timeoutLeft, context, tableId, log); } From 2625b747dff9d26bb277d81abf7d6c6e58a563bf Mon Sep 17 00:00:00 2001 From: Daniel Roberts Date: Mon, 30 Sep 2024 12:54:01 +0000 Subject: [PATCH 09/18] Standardize shell command loggers (#4899) * Use the Shell logger for all log messages Switch all remaining shell commands to use the shell logger instead of specific loggers. 
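// Illustrative sketch, not part of the patches above: the counter pattern used in the
// FateMetrics changes earlier in this series - one AtomicLong per enum value held in an
// EnumMap, each exposed as a Micrometer gauge tagged with the value's name. The Status enum,
// the metric name, and SimpleMeterRegistry are stand-ins for this sketch, not Accumulo APIs.
import java.util.EnumMap;
import java.util.concurrent.atomic.AtomicLong;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

class StatusCounterSketch {
  enum Status { NEW, IN_PROGRESS, SUCCESSFUL, FAILED }

  // one counter per status, created up front so updates never allocate or miss a key
  private final EnumMap<Status,AtomicLong> counters = new EnumMap<>(Status.class);

  StatusCounterSketch(MeterRegistry registry) {
    for (Status status : Status.values()) {
      AtomicLong counter = new AtomicLong(0);
      counters.put(status, counter);
      // one gauge per status, distinguished by a "state" tag and read on each scrape
      Gauge.builder("sketch.fate.tx", counter, AtomicLong::get)
          .tags("state", status.name().toLowerCase()).register(registry);
    }
  }

  void set(Status status, long value) {
    counters.get(status).set(value);
  }

  public static void main(String[] args) {
    StatusCounterSketch sketch = new StatusCounterSketch(new SimpleMeterRegistry());
    sketch.set(Status.IN_PROGRESS, 3);
  }
}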
* Adds the shell logger name to the properties file This assists with the deprecation of the `--debug` shell option so users can modify a single line vs needing to discover the logger name * Add core package logger name Adds a logger for the core package so users only need to modify the log levels to recreate 1.10.x shell `-debug` flag functionality --- assemble/conf/log4j2.properties | 6 ++++++ .../shell/commands/ActiveCompactionHelper.java | 7 ++----- .../shell/commands/DeleteTableCommand.java | 5 +---- .../shell/commands/ListTabletsCommand.java | 18 +++++++----------- .../accumulo/shell/commands/MaxRowCommand.java | 10 +++------- .../ShellPluginConfigurationCommand.java | 13 ++++--------- 6 files changed, 23 insertions(+), 36 deletions(-) diff --git a/assemble/conf/log4j2.properties b/assemble/conf/log4j2.properties index 6906e9bb806..9829e61304f 100644 --- a/assemble/conf/log4j2.properties +++ b/assemble/conf/log4j2.properties @@ -33,6 +33,12 @@ appender.console.layout.pattern = %style{%d{ISO8601}}{dim,cyan} %style{[}{red}%s logger.shellaudit.name = org.apache.accumulo.shell.Shell.audit logger.shellaudit.level = warn +logger.core.name = org.apache.accumulo.core +logger.core.level = info + +logger.shell.name = org.apache.accumulo.shell.Shell +logger.shell.level = info + logger.zookeeper.name = org.apache.zookeeper logger.zookeeper.level = error diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java index 8f6b9259ae8..62dd93f2697 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java @@ -32,13 +32,10 @@ import org.apache.accumulo.core.client.admin.ActiveCompaction; import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.util.DurationFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.accumulo.shell.Shell; class ActiveCompactionHelper { - private static final Logger log = LoggerFactory.getLogger(ActiveCompactionHelper.class); - private static final Comparator COMPACTION_AGE_DESCENDING = Comparator.comparingLong(ActiveCompaction::getAge).reversed(); @@ -120,7 +117,7 @@ public static Stream activeCompactionsForServer(String tserver, return instanceOps.getActiveCompactions(tserver).stream().sorted(COMPACTION_AGE_DESCENDING) .map(ActiveCompactionHelper::formatActiveCompactionLine); } catch (Exception e) { - log.debug("Failed to list active compactions for server {}", tserver, e); + Shell.log.debug("Failed to list active compactions for server {}", tserver, e); return Stream.of(tserver + " ERROR " + e.getMessage()); } } diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/DeleteTableCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/DeleteTableCommand.java index 86cd8aa27e9..7d0c90af366 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/DeleteTableCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/DeleteTableCommand.java @@ -28,11 +28,8 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class DeleteTableCommand extends TableOperation { - private static final Logger log = LoggerFactory.getLogger(DeleteTableCommand.class); private Option forceOpt; @@ -78,7 +75,7 
@@ protected void pruneTables(Set tables) { String table = tableNames.next(); Pair qualifiedName = TableNameUtil.qualify(table); if (Namespace.ACCUMULO.name().equals(qualifiedName.getFirst())) { - log.trace("Removing table from deletion set: {}", table); + Shell.log.trace("Removing table from deletion set: {}", table); tableNames.remove(); } } diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ListTabletsCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ListTabletsCommand.java index 71b2a18b75d..93adcfe10a3 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/ListTabletsCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ListTabletsCommand.java @@ -47,8 +47,6 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -58,8 +56,6 @@ */ public class ListTabletsCommand extends Command { - private static final Logger log = LoggerFactory.getLogger(ListTabletsCommand.class); - private Option outputFileOpt; private Option optTablePattern; private Option optHumanReadable; @@ -70,7 +66,7 @@ public class ListTabletsCommand extends Command { public int execute(String fullCommand, CommandLine cl, Shell shellState) throws Exception { final Set tableInfoSet = populateTables(cl, shellState); if (tableInfoSet.isEmpty()) { - log.warn("No tables found that match your criteria"); + Shell.log.warn("No tables found that match your criteria"); return 0; } boolean humanReadable = cl.hasOption(optHumanReadable.getOpt()); @@ -149,7 +145,7 @@ private Set populateTables(final CommandLine cl, final Shell shellSta TableId id = TableId.of(tableIdString); tableSet.add(new TableInfo(name, id)); } else { - log.warn("Table not found: {}", name); + Shell.log.warn("Table not found: {}", name); } }); return tableSet; @@ -162,7 +158,7 @@ private Set populateTables(final CommandLine cl, final Shell shellSta TableId id = TableId.of(idString); tableSet.add(new TableInfo(table, id)); } else { - log.warn("Table not found: {}", table); + Shell.log.warn("Table not found: {}", table); } return tableSet; } @@ -216,14 +212,14 @@ public int hashCode() { private List getTabletRowInfo(Shell shellState, TableInfo tableInfo) throws Exception { - log.trace("scan metadata for tablet info table name: \'{}\', tableId: \'{}\' ", tableInfo.name, - tableInfo.id); + Shell.log.trace("scan metadata for tablet info table name: \'{}\', tableId: \'{}\' ", + tableInfo.name, tableInfo.id); List tResults = getMetadataInfo(shellState, tableInfo); - if (log.isTraceEnabled()) { + if (Shell.log.isTraceEnabled()) { for (TabletRowInfo tabletRowInfo : tResults) { - log.trace("Tablet info: {}", tabletRowInfo); + Shell.log.trace("Tablet info: {}", tabletRowInfo); } } diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/MaxRowCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/MaxRowCommand.java index 32eab84c58b..9c44154d0a4 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/MaxRowCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/MaxRowCommand.java @@ -23,23 +23,19 @@ import org.apache.accumulo.shell.Shell; import org.apache.commons.cli.CommandLine; import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class MaxRowCommand extends ScanCommand { - private static final Logger log = 
LoggerFactory.getLogger(MaxRowCommand.class); - @Override public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws Exception { final String tableName = OptUtil.getTableOpt(cl, shellState); @SuppressWarnings("deprecation") - final org.apache.accumulo.core.util.interpret.ScanInterpreter interpeter = + final org.apache.accumulo.core.util.interpret.ScanInterpreter interpreter = getInterpreter(cl, tableName, shellState); - final Range range = getRange(cl, interpeter); + final Range range = getRange(cl, interpreter); final Authorizations auths = getAuths(cl, shellState); final Text startRow = range.getStartKey() == null ? null : range.getStartKey().getRow(); final Text endRow = range.getEndKey() == null ? null : range.getEndKey().getRow(); @@ -51,7 +47,7 @@ public int execute(final String fullCommand, final CommandLine cl, final Shell s shellState.getWriter().println(max); } } catch (Exception e) { - log.debug("Could not get shell state.", e); + Shell.log.debug("Could not get shell state.", e); } return 0; diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ShellPluginConfigurationCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ShellPluginConfigurationCommand.java index 3614c621db6..06e3dcd19b2 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/ShellPluginConfigurationCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ShellPluginConfigurationCommand.java @@ -34,7 +34,6 @@ import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; -import org.slf4j.LoggerFactory; public abstract class ShellPluginConfigurationCommand extends Command { private Option removePluginOption, pluginClassOption, listPluginOption; @@ -115,20 +114,16 @@ public static Class getPluginClass(final String tableName, pluginClazz = shellState.getClassLoader(cl, shellState).loadClass(ent.getValue()).asSubclass(clazz); } catch (ClassNotFoundException e) { - LoggerFactory.getLogger(ShellPluginConfigurationCommand.class).error("Class not found {}", - e.getMessage()); + Shell.log.error("Class not found {}", e.getMessage()); return null; } catch (ParseException e) { - LoggerFactory.getLogger(ShellPluginConfigurationCommand.class) - .error("Error parsing table: {} {}", Arrays.toString(args), e.getMessage()); + Shell.log.error("Error parsing table: {} {}", Arrays.toString(args), e.getMessage()); return null; } catch (TableNotFoundException e) { - LoggerFactory.getLogger(ShellPluginConfigurationCommand.class) - .error("Table not found: {} {}", tableName, e.getMessage()); + Shell.log.error("Table not found: {} {}", tableName, e.getMessage()); return null; } catch (Exception e) { - LoggerFactory.getLogger(ShellPluginConfigurationCommand.class).error("Error: {}", - e.getMessage()); + Shell.log.error("Error: {}", e.getMessage()); return null; } From 3ec800b9b9225e1b2d31efb9cd269c6c72666ef1 Mon Sep 17 00:00:00 2001 From: Christopher Tubbs Date: Thu, 19 Sep 2024 15:19:45 -0400 Subject: [PATCH 10/18] Fix trivial warnings * Avoid raw generic on ScanSession * Remove unused Logger --- .../src/main/java/org/apache/accumulo/tserver/ScanServer.java | 2 +- .../org/apache/accumulo/tserver/session/SessionManager.java | 2 +- .../org/apache/accumulo/tserver/tablet/SnapshotTablet.java | 4 ---- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 0e74b8c732d..688a08b1712 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -757,7 +757,7 @@ ScanReservation reserveFilesInstrumented(long scanId) throws NoSuchScanIDExcepti } protected ScanReservation reserveFiles(long scanId) throws NoSuchScanIDException { - var session = (ScanSession) sessionManager.getSession(scanId); + var session = (ScanSession) sessionManager.getSession(scanId); if (session == null) { throw new NoSuchScanIDException(); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index 497287cda6f..d1bf4d6f8cf 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@ -350,7 +350,7 @@ private long countZombieScans(long reportTimeMillis) { return Stream.concat(deferredCleanupQueue.stream(), sessions.values().stream()) .filter(session -> { if (session instanceof ScanSession) { - var scanSession = (ScanSession) session; + var scanSession = (ScanSession) session; synchronized (scanSession) { var scanTask = scanSession.getScanTask(); if (scanTask != null && scanSession.getState() == State.REMOVED diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java index 985f7339dd5..c2de47120b0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java @@ -36,8 +36,6 @@ import org.apache.accumulo.tserver.TabletHostingServer; import org.apache.accumulo.tserver.TabletServerResourceManager; import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A tablet that can not be written to and operates off of a snapshot of tablet metadata for its @@ -46,8 +44,6 @@ */ public class SnapshotTablet extends TabletBase { - private static final Logger log = LoggerFactory.getLogger(SnapshotTablet.class); - private final TabletHostingServer server; private final SortedMap files; private final TabletServerResourceManager.TabletResourceManager tabletResources; From 26b3e4d149ab03589eb6d43fd802383f5bae5e66 Mon Sep 17 00:00:00 2001 From: Christopher Tubbs Date: Mon, 30 Sep 2024 18:52:27 -0400 Subject: [PATCH 11/18] Fix more trivial warnings * Remove unused log * Suppress unused private constructors that exist for Gson --- .../java/org/apache/accumulo/core/clientImpl/bulk/Bulk.java | 3 +++ .../java/org/apache/accumulo/core/lock/ServiceLockData.java | 1 + .../core/metadata/schema/ExternalCompactionFinalState.java | 1 + .../org/apache/accumulo/server/metadata/RootGcCandidates.java | 1 + .../accumulo/server/util/fateCommand/FateSummaryReport.java | 1 + .../accumulo/server/util/fateCommand/FateTxnDetails.java | 1 + .../server/util/serviceStatus/ServiceStatusReport.java | 1 + .../accumulo/server/util/serviceStatus/StatusSummary.java | 1 + .../org/apache/accumulo/tserver/log/ResolvedSortedLog.java | 4 ---- 9 files changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/Bulk.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/Bulk.java index de178565ae6..23f32c4f70e 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/Bulk.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/Bulk.java @@ -42,6 +42,7 @@ public static class Mapping { private Collection files; // Gson requires a default constructor when JDK Unsafe usage is disabled + @SuppressWarnings("unused") private Mapping() {} public Mapping(KeyExtent tablet, Files files) { @@ -71,6 +72,7 @@ public static class Tablet { private byte[] prevEndRow; // Gson requires a default constructor when JDK Unsafe usage is disabled + @SuppressWarnings("unused") private Tablet() {} public Tablet(Text endRow, Text prevEndRow) { @@ -111,6 +113,7 @@ public static class FileInfo { long estEntries; // Gson requires a default constructor when JDK Unsafe usage is disabled + @SuppressWarnings("unused") private FileInfo() {} public FileInfo(String fileName, long estFileSize, long estNumEntries) { diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockData.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockData.java index ece32eff00b..442a6192ce3 100644 --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockData.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockData.java @@ -242,6 +242,7 @@ private static class ServiceDescriptorGson { private String group; // default constructor required for Gson + @SuppressWarnings("unused") public ServiceDescriptorGson() {} public ServiceDescriptorGson(UUID uuid, ThriftService service, String address, String group) { diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java index fb4caa47c32..23902d7f511 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java @@ -81,6 +81,7 @@ private static class Extent { String per; // Gson requires a default constructor + @SuppressWarnings("unused") private Extent() {} Extent(KeyExtent extent) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/RootGcCandidates.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/RootGcCandidates.java index 7d3ec1b836a..4b9fcc7932d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/RootGcCandidates.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/RootGcCandidates.java @@ -53,6 +53,7 @@ private static class Data { private SortedMap> candidates; // Gson requires a default constructor when JDK Unsafe usage is disabled + @SuppressWarnings("unused") private Data() {} public Data(int version, SortedMap> candidates) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java index c3e8990b44f..174c70e907e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java @@ -56,6 +56,7 @@ public class FateSummaryReport { private transient Map idsToNameMap; // Gson requires a default constructor when JDK Unsafe usage is disabled + @SuppressWarnings("unused") private 
FateSummaryReport() {} public FateSummaryReport(Map idsToNameMap, diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java index 6a1d14bc1df..78904857f11 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java @@ -39,6 +39,7 @@ public class FateTxnDetails implements Comparable { private List locksWaiting = List.of(); // Default constructor for Gson + @SuppressWarnings("unused") private FateTxnDetails() {} /** diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/serviceStatus/ServiceStatusReport.java b/server/base/src/main/java/org/apache/accumulo/server/util/serviceStatus/ServiceStatusReport.java index 629242eecf2..b323dbaf77f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/serviceStatus/ServiceStatusReport.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/serviceStatus/ServiceStatusReport.java @@ -53,6 +53,7 @@ public class ServiceStatusReport { private Map summaries; // Gson requires a default constructor when JDK Unsafe usage is disabled + @SuppressWarnings("unused") private ServiceStatusReport() {} public ServiceStatusReport(final Map summaries, final boolean noHosts) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/serviceStatus/StatusSummary.java b/server/base/src/main/java/org/apache/accumulo/server/util/serviceStatus/StatusSummary.java index ec5a044f1af..8c28d012a2b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/serviceStatus/StatusSummary.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/serviceStatus/StatusSummary.java @@ -31,6 +31,7 @@ public class StatusSummary { private int errorCount; // Default constructor required for Gson + @SuppressWarnings("unused") private StatusSummary() {} public StatusSummary(ServiceStatusReport.ReportKey serviceType, final Set resourceGroups, diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java index 184b628ad2e..b9faa8c8f76 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java @@ -32,8 +32,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Write ahead logs have two paths in DFS. 
There is the path of the original unsorted walog and the @@ -42,8 +40,6 @@ */ public class ResolvedSortedLog { - private static final Logger log = LoggerFactory.getLogger(ResolvedSortedLog.class); - private final SortedSet children; private final LogEntry origin; private final Path sortedLogDir; From c40645964ef1bd02a5ab7ac04dead53d03274658 Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Wed, 2 Oct 2024 12:40:21 -0400 Subject: [PATCH 12/18] trivial change to how context is closed in Admin.execute --- .../org/apache/accumulo/server/util/Admin.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index 992470c1fae..f72910687a9 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -424,15 +424,13 @@ public void execute(final String[] args) { return; } - ServerContext context = opts.getServerContext(); + try (ServerContext context = opts.getServerContext()) { - AccumuloConfiguration conf = context.getConfiguration(); - // Login as the server on secure HDFS - if (conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { - SecurityUtil.serverLogin(conf); - } - - try { + AccumuloConfiguration conf = context.getConfiguration(); + // Login as the server on secure HDFS + if (conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + SecurityUtil.serverLogin(conf); + } int rc = 0; @@ -508,7 +506,6 @@ public void execute(final String[] args) { log.error("{}", e.getMessage(), e); System.exit(3); } finally { - context.close(); SingletonManager.setMode(Mode.CLOSED); } } From 37346850c05ebab07647e90412b6959c14554f7e Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Thu, 3 Oct 2024 07:45:38 -0400 Subject: [PATCH 13/18] Removed cached configuration times (#4936) Removed maps in LargestFirstMemoryManager that cached time duration property values by table id. #4890 implemented caching for calls to AccumuloConfiguration.getTimeInMillis, so this higher level caching can be removed. Closes #4860 --- .../memory/LargestFirstMemoryManager.java | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java index 42623f70271..b1c361f9aa8 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java @@ -19,7 +19,6 @@ package org.apache.accumulo.tserver.memory; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map.Entry; import java.util.TreeMap; @@ -56,8 +55,6 @@ public class LargestFirstMemoryManager { // The fraction of memory that needs to be used before we begin flushing. 
private double compactionThreshold; private long maxObserved; - private final HashMap mincIdleThresholds = new HashMap<>(); - private final HashMap mincAgeThresholds = new HashMap<>(); private ServerContext context = null; private static class TabletInfo { @@ -140,15 +137,13 @@ public LargestFirstMemoryManager() { @SuppressWarnings("deprecation") protected long getMinCIdleThreshold(KeyExtent extent) { - TableId tableId = extent.tableId(); - return mincIdleThresholds.computeIfAbsent(tableId, tid -> context.getTableConfiguration(tid) - .getTimeInMillis(Property.TABLE_MINC_COMPACT_IDLETIME)); + return context.getTableConfiguration(extent.tableId()) + .getTimeInMillis(Property.TABLE_MINC_COMPACT_IDLETIME); } protected long getMaxAge(KeyExtent extent) { - TableId tableId = extent.tableId(); - return mincAgeThresholds.computeIfAbsent(tableId, tid -> context.getTableConfiguration(tid) - .getTimeInMillis(Property.TABLE_MINC_COMPACT_MAXAGE)); + return context.getTableConfiguration(extent.tableId()) + .getTimeInMillis(Property.TABLE_MINC_COMPACT_MAXAGE); } protected boolean tableExists(TableId tableId) { @@ -168,9 +163,6 @@ public List tabletsToMinorCompact(List tablets) { final int maxMinCs = maxConcurrentMincs * numWaitingMultiplier; - mincIdleThresholds.clear(); - mincAgeThresholds.clear(); - final List tabletsToMinorCompact = new ArrayList<>(); LargestMap largestMemTablets = new LargestMap(maxMinCs); From 0c4c31625b2eb6680eef4818b61b324a1914a794 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Thu, 3 Oct 2024 16:12:00 -0400 Subject: [PATCH 14/18] Enable user to specify specific server for Thrift client calls (#4880) Allow the user to set a system property to the address of a server to use when making calls to the Client Thrift API. Example: org.apache.accumulo.client.rpc.debug.host="localhost:1234" Closes #4823 --- .../core/rpc/clients/TServerClient.java | 77 ++++++++++++++----- .../functional/DebugClientConnectionIT.java | 71 +++++++++++++++++ 2 files changed, 127 insertions(+), 21 deletions(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java index 4027f4b0c98..c09f46ab00c 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java @@ -23,6 +23,8 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -54,6 +56,8 @@ public interface TServerClient { + static final String DEBUG_HOST = "org.apache.accumulo.client.rpc.debug.host"; + Pair getThriftServerConnection(ClientContext context, boolean preferCachedConnections) throws TTransportException; @@ -62,7 +66,9 @@ default Pair getThriftServerConnection(Logger LOG, ThriftClientTypes cachedTransport = context.getTransportPool().getAnyCachedTransport(type); if (cachedTransport != null) { @@ -79,28 +85,40 @@ default Pair getThriftServerConnection(Logger LOG, ThriftClientTypes serverPaths = new ArrayList<>(); - zc.getChildren(tserverZooPath).forEach(tserverAddress -> { - serverPaths.add(tserverZooPath + "/" + tserverAddress); - }); - if (type == ThriftClientTypes.CLIENT) { - 
zc.getChildren(sserverZooPath).forEach(sserverAddress -> { - serverPaths.add(sserverZooPath + "/" + sserverAddress); - }); + if (type == ThriftClientTypes.CLIENT && debugHost != null) { + // add all three paths to the set even though they may not be correct. + // The entire set will be checked in the code below to validate + // that the path is correct and the lock is held and will return the + // correct one. + serverPaths.add(tserverZooPath + "/" + debugHost); + serverPaths.add(sserverZooPath + "/" + debugHost); zc.getChildren(compactorZooPath).forEach(compactorGroup -> { - zc.getChildren(compactorZooPath + "/" + compactorGroup).forEach(compactorAddress -> { - serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" + compactorAddress); - }); + serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" + debugHost); }); - } - - if (serverPaths.isEmpty()) { - if (warned.compareAndSet(false, true)) { - LOG.warn( - "There are no servers serving the {} api: check that zookeeper and accumulo are running.", - type); + } else { + zc.getChildren(tserverZooPath).forEach(tserverAddress -> { + serverPaths.add(tserverZooPath + "/" + tserverAddress); + }); + if (type == ThriftClientTypes.CLIENT) { + zc.getChildren(sserverZooPath).forEach(sserverAddress -> { + serverPaths.add(sserverZooPath + "/" + sserverAddress); + }); + zc.getChildren(compactorZooPath).forEach(compactorGroup -> { + zc.getChildren(compactorZooPath + "/" + compactorGroup).forEach(compactorAddress -> { + serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" + compactorAddress); + }); + }); + } + if (serverPaths.isEmpty()) { + if (warned.compareAndSet(false, true)) { + LOG.warn( + "There are no servers serving the {} api: check that zookeeper and accumulo are running.", + type); + } + throw new TTransportException("There are no servers for type: " + type); } - throw new TTransportException("There are no servers for type: " + type); } + Collections.shuffle(serverPaths, RANDOM.get()); for (String serverPath : serverPaths) { @@ -113,10 +131,19 @@ default Pair getThriftServerConnection(Logger LOG, ThriftClientTypes(tserverClientAddress.toString(), client); } catch (TTransportException e) { - LOG.trace("Error creating transport to {}", tserverClientAddress); + if (type == ThriftClientTypes.CLIENT && debugHost != null) { + LOG.error( + "Error creating transport to debug host: {}. If this server is down, then you will need to remove or change the system property {}.", + debugHost, DEBUG_HOST); + } else { + LOG.trace("Error creating transport to {}", tserverClientAddress); + } continue; } } @@ -127,7 +154,15 @@ default Pair getThriftServerConnection(Logger LOG, ThriftClientTypes R execute(Logger LOG, ClientContext context, Exec exec) diff --git a/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java new file mode 100644 index 00000000000..9769f4c10e4 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.UncheckedIOException; +import java.util.List; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.rpc.clients.TServerClient; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class DebugClientConnectionIT extends AccumuloClusterHarness { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(2); + } + + private List tservers = null; + + @BeforeEach + public void getTServerAddresses() { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + tservers = client.instanceOperations().getTabletServers(); + } + assertNotNull(tservers); + assertEquals(2, tservers.size()); + } + + @Test + public void testPreferredConnection() throws Exception { + System.setProperty(TServerClient.DEBUG_HOST, tservers.get(0)); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + assertNotNull(client.instanceOperations().getSiteConfiguration()); + } + System.setProperty(TServerClient.DEBUG_HOST, tservers.get(1)); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + assertNotNull(client.instanceOperations().getSiteConfiguration()); + } + System.setProperty(TServerClient.DEBUG_HOST, "localhost:1"); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + assertThrows(UncheckedIOException.class, + () -> client.instanceOperations().getSiteConfiguration()); + } + } +} From 73b068d87eb7b9cae934cf3d86ad00cdab34db9c Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 4 Oct 2024 13:26:15 -0400 Subject: [PATCH 15/18] Updates to ConsistencyLevel javadoc to provide more information (#4941) Closes #4478 --- .../java/org/apache/accumulo/core/client/ScannerBase.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java index 13bd9c17faa..a9ae3e561ec 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java @@ -48,7 +48,10 @@ public interface ScannerBase extends Iterable>, AutoCloseable { * Consistency level for the scanner. The default level is IMMEDIATE, which means that this * scanner will see keys and values that have been successfully written to a TabletServer. * EVENTUAL means that the scanner may not see the latest data that was written to a TabletServer, - * but may instead see an older version of data. 
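// Illustrative sketch, not part of the patch above: one way a client might opt in to the
// EVENTUAL consistency level described in the ScannerBase javadoc being updated here. Assumes
// scan server processes are running; the properties file path and table name are placeholders.
import java.util.Map.Entry;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;

class EventualScanSketch {
  public static void main(String[] args) throws Exception {
    try (AccumuloClient client = Accumulo.newClient().from("client.properties").build();
        Scanner scanner = client.createScanner("example_table", Authorizations.EMPTY)) {
      // EVENTUAL scans may be served by scan servers and can return slightly stale data
      scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
      for (Entry<Key,Value> entry : scanner) {
        System.out.println(entry.getKey() + " -> " + entry.getValue());
      }
    }
  }
}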
+ * but may instead see an older version of data. To use the EVENTUAL consistency level, ScanServer + * processes must be running. If not specifically configured, clients will use the default + * settings ({@code ConfigurableScanServerSelector#PROFILES_DEFAULT}). See + * {@code ConfigurableScanServerSelector} for information on how to configure the client. * * @since 2.1.0 */ From 66229f5f76b68f3f75a1a72f3eb882a97117f9d7 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 7 Oct 2024 11:08:39 -0400 Subject: [PATCH 16/18] fixes bug with waitForBalance signaling (#4947) The code that signals that no balancing happened would sometimes skip balancing for user tablets yet still signal that no balancing happened. Modified to only signal when balancing had run on all levels. --- .../main/java/org/apache/accumulo/manager/Manager.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 77c9da57ae5..d02345905b7 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1035,6 +1035,7 @@ private long balanceTablets() { long totalMigrationsOut = 0; final Map> partitionedMigrations = partitionMigrations(migrationsSnapshot()); + int levelsCompleted = 0; for (DataLevel dl : DataLevel.values()) { if (dl == DataLevel.USER && tabletsNotHosted > 0) { @@ -1072,14 +1073,17 @@ private long balanceTablets() { } } while (migrationsOutForLevel > 0 && (dl == DataLevel.ROOT || dl == DataLevel.METADATA)); totalMigrationsOut += migrationsOutForLevel; + + // increment this at end of loop to signal complete run w/o any continue + levelsCompleted++; } balancerMetrics.assignMigratingCount(migrations::size); - if (totalMigrationsOut == 0) { + if (totalMigrationsOut == 0 && levelsCompleted == DataLevel.values().length) { synchronized (balancedNotifier) { balancedNotifier.notifyAll(); } - } else { + } else if (totalMigrationsOut > 0) { nextEvent.event("Migrating %d more tablets, %d total", totalMigrationsOut, migrations.size()); } From bf3e480662b9ec40ff6bd241a4410d74b56b4670 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 7 Oct 2024 11:08:53 -0400 Subject: [PATCH 17/18] removes unused property (#4948) The property instance.volumes.upgrade.relative was used by 2.1.0 upgrade code; it is no longer used and is removed in this change. --- .../main/java/org/apache/accumulo/core/conf/Property.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 38616cc2205..9157fec5ffe 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -171,11 +171,6 @@ public enum Property { + "other reserved characters in a URI use standard URI hex encoding. For " + "example replace commas with %2C.", "1.6.0"), - INSTANCE_VOLUMES_UPGRADE_RELATIVE("instance.volumes.upgrade.relative", "", PropertyType.STRING, - "The volume dfs uri containing relative tablet file paths. Relative paths may exist in the metadata from " - + "versions prior to 1.6.
This property is only required if a relative path is detected " - + "during the upgrade process and will only be used once.", - "2.1.0"), @Experimental // interface uses unstable internal types, use with caution INSTANCE_SECURITY_AUTHENTICATOR("instance.security.authenticator", "org.apache.accumulo.server.security.handler.ZKAuthenticator", PropertyType.CLASSNAME, From 93e44ec2adfd1d1fc308c3f5f054d8548e24592c Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 7 Oct 2024 11:14:54 -0400 Subject: [PATCH 18/18] limit thread and add queue to monitor append (#4879) Add options to configure the AccumuloMonitorAppender as async (default) with a configurable number of concurrent maxThreads and queueSize. When the queue is full the monitor appender will now drop log messages. This implementation uses an executor and thread pool exclusive to the appender, rather than shared using a global one, which is the default behavior for HttpClient. The maxThreads and queueSize options only have an effect if async is true (which is the default). A user may wish to turn async off for this appender, and wrap this appender with AsyncAppender instead, which may have additional options available. * Additional improvements to monitor appender * Bump to log4j 2.24.0 * Use the annotation processor to generate Log4j2Plugins.dat in the monitor jar's META-INF/, rather than rely on Log4j2's deprecated package scanning to find and register the AccumuloMonitorAppender * Simplify the generics for the AccumuloMonitorAppender.Builder * Add cleanup to executor, to make a best effort to not leave it running when the appender is reconstructed due to configuration changes * Simplify "canAppend" logic by just saving the executor as a member, so we can get the queue size from that * Logs stats about dropped messages, errors, and total messages sent Co-authored-by: Christopher Tubbs --- assemble/conf/log4j2-service.properties | 3 + pom.xml | 8 +- .../util/logging/AccumuloMonitorAppender.java | 130 ++++++++++++++++-- 3 files changed, 128 insertions(+), 13 deletions(-) diff --git a/assemble/conf/log4j2-service.properties b/assemble/conf/log4j2-service.properties index 01e5e283147..984c543da49 100644 --- a/assemble/conf/log4j2-service.properties +++ b/assemble/conf/log4j2-service.properties @@ -71,6 +71,9 @@ appender.monitor.type = AccumuloMonitor appender.monitor.name = MonitorLog appender.monitor.filter.threshold.type = ThresholdFilter appender.monitor.filter.threshold.level = warn +#appender.monitor.async = true +#appender.monitor.maxThreads = 2 +#appender.monitor.queueSize = 1024 logger.zookeeper.name = org.apache.zookeeper logger.zookeeper.level = error diff --git a/pom.xml b/pom.xml index 1faae076330..a47435ace4d 100644 --- a/pom.xml +++ b/pom.xml @@ -151,6 +151,7 @@ 5.5.0 2.24.1 3.3.6 + 2.24.0 1.34.1 2.0.9 2.0.12 @@ -198,7 +199,7 @@ org.apache.logging.log4j log4j-bom - 2.23.1 + ${version.log4j} pom import @@ -792,6 +793,11 @@ auto-service ${version.auto-service} + + org.apache.logging.log4j + log4j-core + ${version.log4j} + diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java index 7dfde0f3fa1..f6f575eac09 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java @@ -26,8 +26,15 @@ import java.net.http.HttpClient; 
import java.net.http.HttpRequest; import java.net.http.HttpRequest.BodyPublishers; +import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandlers; import java.util.Optional; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import org.apache.accumulo.core.Constants; @@ -44,6 +51,7 @@ import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.config.Property; import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute; import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory; import com.google.gson.Gson; @@ -60,34 +68,86 @@ public class AccumuloMonitorAppender extends AbstractAppender { @PluginBuilderFactory - public static > B newBuilder() { - return new Builder().asBuilder(); + public static Builder newBuilder() { + return new Builder(); } - public static class Builder> extends AbstractAppender.Builder + public static class Builder extends AbstractAppender.Builder implements org.apache.logging.log4j.core.util.Builder { + @PluginBuilderAttribute + private boolean async = true; + + @PluginBuilderAttribute + private int queueSize = 1024; + + @PluginBuilderAttribute + private int maxThreads = 2; + + public Builder setAsync(boolean async) { + this.async = async; + return this; + } + + public boolean getAsync() { + return async; + } + + public Builder setQueueSize(int size) { + queueSize = size; + return this; + } + + public int getQueueSize() { + return queueSize; + } + + public Builder setMaxThreads(int maxThreads) { + this.maxThreads = maxThreads; + return this; + } + + public int getMaxThreads() { + return maxThreads; + } + @Override public AccumuloMonitorAppender build() { return new AccumuloMonitorAppender(getName(), getFilter(), isIgnoreExceptions(), - getPropertyArray()); + getPropertyArray(), getQueueSize(), getMaxThreads(), getAsync()); } } private final Gson gson = new Gson(); - private final HttpClient httpClient = HttpClient.newHttpClient(); + private final HttpClient httpClient; private final Supplier> monitorLocator; + private final ThreadPoolExecutor executor; + private final boolean async; + private final int queueSize; + private final AtomicLong appends = new AtomicLong(0); + private final AtomicLong discards = new AtomicLong(0); + private final AtomicLong errors = new AtomicLong(0); + private final ConcurrentMap statusCodes = new ConcurrentSkipListMap<>(); private ServerContext context; private String path; private Pair> lastResult = new Pair<>(0L, Optional.empty()); private AccumuloMonitorAppender(final String name, final Filter filter, - final boolean ignoreExceptions, final Property[] properties) { + final boolean ignoreExceptions, final Property[] properties, int queueSize, int maxThreads, + boolean async) { super(name, filter, null, ignoreExceptions, properties); - final ZcStat stat = new ZcStat(); - monitorLocator = () -> { + + this.executor = async ? new ThreadPoolExecutor(0, maxThreads, 30, TimeUnit.SECONDS, + new LinkedBlockingQueue()) : null; + final var builder = HttpClient.newBuilder(); + this.httpClient = (async ? 
builder.executor(executor) : builder).build(); + this.queueSize = queueSize; + this.async = async; + + final var stat = new ZcStat(); + this.monitorLocator = () -> { // lazily set up context/path if (context == null) { context = new ServerContext(SiteConfiguration.auto()); @@ -106,9 +166,24 @@ private AccumuloMonitorAppender(final String name, final Filter filter, }; } + private String getStats() { + return "discards:" + discards.get() + " errors:" + errors.get() + " appends:" + appends.get() + + " statusCodes:" + statusCodes; + } + + private void processResponse(HttpResponse response) { + var statusCode = response.statusCode(); + statusCodes.computeIfAbsent(statusCode, sc -> new AtomicLong()).getAndIncrement(); + if (statusCode >= 400 && statusCode < 600) { + error("Unable to send HTTP in appender [" + getName() + "]. Status: " + statusCode + " " + + getStats()); + } + } + @Override public void append(final LogEvent event) { - monitorLocator.get().ifPresent(uri -> { + appends.getAndIncrement(); + monitorLocator.get().ifPresentOrElse(uri -> { try { var pojo = new SingleLogEvent(); pojo.timestamp = event.getTimeMillis(); @@ -122,14 +197,45 @@ public void append(final LogEvent event) { var req = HttpRequest.newBuilder(uri).POST(BodyPublishers.ofString(jsonEvent, UTF_8)) .setHeader("Content-Type", "application/json").build(); - @SuppressWarnings("unused") - var future = httpClient.sendAsync(req, BodyHandlers.discarding()); + + if (async) { + if (executor.getQueue().size() < queueSize) { + httpClient.sendAsync(req, BodyHandlers.discarding()).thenAccept(this::processResponse) + .exceptionally(e -> { + errors.getAndIncrement(); + error("Unable to send HTTP in appender [" + getName() + "] " + getStats(), event, + e); + return null; + }); + } else { + discards.getAndIncrement(); + error("Unable to send HTTP in appender [" + getName() + "]. Queue full. " + getStats()); + } + } else { + processResponse(httpClient.send(req, BodyHandlers.discarding())); + } } catch (final Exception e) { - error("Unable to send HTTP in appender [" + getName() + "]", event, e); + errors.getAndIncrement(); + error("Unable to send HTTP in appender [" + getName() + "] " + getStats(), event, e); } + }, () -> { + discards.getAndIncrement(); + error("Unable to send HTTP in appender [" + getName() + "]. No monitor is running. " + + getStats()); }); } + @Override + protected boolean stop(long timeout, TimeUnit timeUnit, boolean changeLifeCycleState) { + if (changeLifeCycleState) { + setStopping(); + } + if (executor != null) { + executor.shutdown(); + } + return super.stop(timeout, timeUnit, changeLifeCycleState); + } + @SuppressFBWarnings(value = "INFORMATION_EXPOSURE_THROUGH_AN_ERROR_MESSAGE", justification = "throwable is intended to be printed to output stream, to send to monitor") private static String throwableToStacktrace(Throwable t) {
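(Illustrative aside: the queue-capacity guard and dedicated executor that this patch adds to AccumuloMonitorAppender can be summarized by the stand-alone sketch below. The class and method names are hypothetical and are not Accumulo API; the sketch only mirrors the behavior described in the commit message, where a full queue causes the log message to be dropped and counted rather than blocking the caller.)

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of a bounded async HTTP poster, assuming a JSON endpoint. */
public class BoundedAsyncPoster {

  private final int queueSize;
  private final ThreadPoolExecutor executor;
  private final HttpClient httpClient;
  private final AtomicLong discards = new AtomicLong();
  private final AtomicLong errors = new AtomicLong();

  public BoundedAsyncPoster(int maxThreads, int queueSize) {
    this.queueSize = queueSize;
    // Dedicated executor so sends are not queued on HttpClient's shared default pool.
    this.executor = new ThreadPoolExecutor(0, maxThreads, 30, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>());
    this.httpClient = HttpClient.newBuilder().executor(executor).build();
  }

  /** Posts JSON asynchronously, dropping the message when the work queue is already full. */
  public void post(URI uri, String json) {
    if (executor.getQueue().size() >= queueSize) {
      discards.incrementAndGet(); // drop rather than block the logging caller
      return;
    }
    HttpRequest req = HttpRequest.newBuilder(uri)
        .POST(HttpRequest.BodyPublishers.ofString(json, StandardCharsets.UTF_8))
        .setHeader("Content-Type", "application/json").build();
    httpClient.sendAsync(req, HttpResponse.BodyHandlers.discarding()).exceptionally(e -> {
      errors.incrementAndGet();
      return null;
    });
  }

  public void shutdown() {
    executor.shutdown();
  }
}

A caller would size it with the same kind of values the appender exposes as plugin attributes, for example new BoundedAsyncPoster(2, 1024) to match the commented-out defaults shown in log4j2-service.properties above.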