diff --git a/assemble/conf/log4j2-service.properties b/assemble/conf/log4j2-service.properties index 01e5e283147..984c543da49 100644 --- a/assemble/conf/log4j2-service.properties +++ b/assemble/conf/log4j2-service.properties @@ -71,6 +71,9 @@ appender.monitor.type = AccumuloMonitor appender.monitor.name = MonitorLog appender.monitor.filter.threshold.type = ThresholdFilter appender.monitor.filter.threshold.level = warn +#appender.monitor.async = true +#appender.monitor.maxThreads = 2 +#appender.monitor.queueSize = 1024 logger.zookeeper.name = org.apache.zookeeper logger.zookeeper.level = error diff --git a/assemble/conf/log4j2.properties b/assemble/conf/log4j2.properties index 6906e9bb806..9829e61304f 100644 --- a/assemble/conf/log4j2.properties +++ b/assemble/conf/log4j2.properties @@ -33,6 +33,12 @@ appender.console.layout.pattern = %style{%d{ISO8601}}{dim,cyan} %style{[}{red}%s logger.shellaudit.name = org.apache.accumulo.shell.Shell.audit logger.shellaudit.level = warn +logger.core.name = org.apache.accumulo.core +logger.core.level = info + +logger.shell.name = org.apache.accumulo.shell.Shell +logger.shell.level = info + logger.zookeeper.name = org.apache.zookeeper logger.zookeeper.level = error diff --git a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java index 13bd9c17faa..a9ae3e561ec 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java @@ -48,7 +48,10 @@ public interface ScannerBase extends Iterable>, AutoCloseable { * Consistency level for the scanner. The default level is IMMEDIATE, which means that this * scanner will see keys and values that have been successfully written to a TabletServer. * EVENTUAL means that the scanner may not see the latest data that was written to a TabletServer, - * but may instead see an older version of data. + * but may instead see an older version of data. To use the EVENTUAL consistency level, ScanServer + * processes must be running. If not specifically configured, clients will use the default + * settings ({@code ConfigurableScanServerSelector#PROFILES_DEFAULT}). See + * {@code ConfigurableScanServerSelector} for information on how to configure the client. 
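For illustration only (not part of this change), a minimal client-side sketch of opting into eventual consistency; it assumes a 2.1+ client, running ScanServer processes, and uses placeholder values for the client properties path and table name:

import java.util.Map;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;

public class EventualScanSketch {
  public static void main(String[] args) throws Exception {
    // "client.properties" and "mytable" are placeholders for this sketch.
    try (AccumuloClient client = Accumulo.newClient().from("client.properties").build();
        Scanner scanner = client.createScanner("mytable", Authorizations.EMPTY)) {
      // Route the scan through scan servers; results may lag the most recent writes.
      scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
      for (Map.Entry<Key,Value> entry : scanner) {
        System.out.println(entry.getKey() + " -> " + entry.getValue());
      }
    }
  }
}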
* * @since 2.1.0 */ diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java index 32252239965..bf9f53dd34c 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java @@ -615,8 +615,6 @@ private ScanServerData rebinToScanServers(Map>> // get a snapshot of this once,not each time the plugin request it var scanAttemptsSnapshot = scanAttempts.snapshot(); - Duration timeoutLeft = Duration.ofMillis(retryTimeout - startTime.elapsed(MILLISECONDS)); - ScanServerSelector.SelectorParameters params = new ScanServerSelector.SelectorParameters() { @Override public Collection getTablets() { @@ -636,6 +634,7 @@ public Map getHints() { @Override public Optional waitUntil(Supplier> condition, Duration maxWaitTime, String description) { + Duration timeoutLeft = Duration.ofMillis(retryTimeout - startTime.elapsed(MILLISECONDS)); return ThriftScanner.waitUntil(condition, maxWaitTime, description, timeoutLeft, context, tableId, log); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/Bulk.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/Bulk.java index de178565ae6..23f32c4f70e 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/Bulk.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/Bulk.java @@ -42,6 +42,7 @@ public static class Mapping { private Collection files; // Gson requires a default constructor when JDK Unsafe usage is disabled + @SuppressWarnings("unused") private Mapping() {} public Mapping(KeyExtent tablet, Files files) { @@ -71,6 +72,7 @@ public static class Tablet { private byte[] prevEndRow; // Gson requires a default constructor when JDK Unsafe usage is disabled + @SuppressWarnings("unused") private Tablet() {} public Tablet(Text endRow, Text prevEndRow) { @@ -111,6 +113,7 @@ public static class FileInfo { long estEntries; // Gson requires a default constructor when JDK Unsafe usage is disabled + @SuppressWarnings("unused") private FileInfo() {} public FileInfo(String fileName, long estFileSize, long estNumEntries) { diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 88975feb2c0..9157fec5ffe 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -30,6 +30,7 @@ import org.apache.accumulo.core.classloader.ClassLoaderUtil; import org.apache.accumulo.core.data.constraints.NoDeleteConstraint; +import org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iteratorsImpl.system.DeletingIterator; @@ -132,7 +133,7 @@ public enum Property { + " everywhere. Before using the ChangeSecret tool, make sure Accumulo is not" + " running and you are logged in as the user that controls Accumulo files in" + " HDFS. To use the ChangeSecret tool, run the command: `./bin/accumulo" - + " org.apache.accumulo.server.util.ChangeSecret`.", + + " admin changeSecret`.", "1.3.5"), INSTANCE_VOLUMES("instance.volumes", "", PropertyType.STRING, "A comma separated list of dfs uris to use. 
Files will be stored across" @@ -170,11 +171,6 @@ public enum Property { + "other reserved characters in a URI use standard URI hex encoding. For " + "example replace commas with %2C.", "1.6.0"), - INSTANCE_VOLUMES_UPGRADE_RELATIVE("instance.volumes.upgrade.relative", "", PropertyType.STRING, - "The volume dfs uri containing relative tablet file paths. Relative paths may exist in the metadata from " + "versions prior to 1.6. This property is only required if a relative path is detected " + "during the upgrade process and will only be used once.", - "2.1.0"), @Experimental // interface uses unstable internal types, use with caution INSTANCE_SECURITY_AUTHENTICATOR("instance.security.authenticator", "org.apache.accumulo.server.security.handler.ZKAuthenticator", PropertyType.CLASSNAME, @@ -504,11 +500,8 @@ public enum Property { "Time to wait for clients to continue scans before closing a session.", "1.3.5"), TSERV_DEFAULT_BLOCKSIZE("tserver.default.blocksize", "1M", PropertyType.BYTES, "Specifies a default blocksize for the tserver caches.", "1.3.5"), - TSERV_CACHE_MANAGER_IMPL("tserver.cache.manager.class", - "org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager", PropertyType.STRING, - "Specifies the class name of the block cache factory implementation." + " Alternative implementation is" + " org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager.", + TSERV_CACHE_MANAGER_IMPL("tserver.cache.manager.class", TinyLfuBlockCacheManager.class.getName(), + PropertyType.STRING, "Specifies the class name of the block cache factory implementation.", "2.0.0"), TSERV_DATACACHE_SIZE("tserver.cache.data.size", "10%", PropertyType.MEMORY, "Specifies the size of the cache for RFile data blocks.", "1.3.5"), diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java index a4132b7d755..76be394e3b2 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Deprecated(since = "3.1.0") public class LruBlockCacheManager extends BlockCacheManager { private static final Logger LOG = LoggerFactory.getLogger(LruBlockCacheManager.class); @@ -32,6 +33,7 @@ public class LruBlockCacheManager extends BlockCacheManager { protected BlockCache createCache(Configuration conf, CacheType type) { LruBlockCacheConfiguration cc = new LruBlockCacheConfiguration(conf, type); LOG.info("Creating {} cache with configuration {}", type, cc); + LOG.warn("This cache implementation is deprecated and will be removed in future releases."); return new LruBlockCache(cc); } diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockData.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockData.java index ece32eff00b..442a6192ce3 100644 --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockData.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockData.java @@ -242,6 +242,7 @@ private static class ServiceDescriptorGson { private String group; // default constructor required for Gson + @SuppressWarnings("unused") public ServiceDescriptorGson() {} public ServiceDescriptorGson(UUID uuid, ThriftService service, String address, String group) { diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java index fb4caa47c32..23902d7f511 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java @@ -81,6 +81,7 @@ private static class Extent { String per; // Gson requires a default constructor + @SuppressWarnings("unused") private Extent() {} Extent(KeyExtent extent) { diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index e4356c52474..4d5a3dea3f8 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -262,8 +262,9 @@ public static class LastLocationColumnFamily { */ public static class SuspendLocationColumn { public static final String STR_NAME = "suspend"; - public static final ColumnFQ SUSPEND_COLUMN = - new ColumnFQ(new Text(STR_NAME), new Text("loc")); + public static final Text NAME = new Text(STR_NAME); + public static final String SUSPEND_QUAL = "loc"; + public static final ColumnFQ SUSPEND_COLUMN = new ColumnFQ(NAME, new Text(SUSPEND_QUAL)); } /** diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java index 9850d1a0646..27b5e3bf00a 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java @@ -27,180 +27,210 @@ public enum Metric { "Indicates if the server is idle or not. The value will be 1 when idle and 0 when not idle.", MetricCategory.GENERAL_SERVER), LOW_MEMORY("accumulo.detected.low.memory", MetricType.GAUGE, - "Reports 1 when process memory usage is above threshold, 0 when memory is okay.", + "Reports 1 when process memory usage is above the threshold, reports 0 when memory is okay.", MetricCategory.GENERAL_SERVER), // Compactor Metrics - COMPACTOR_MAJC_STUCK("accumulo.compactor.majc.stuck", MetricType.LONG_TASK_TIMER, "", - MetricCategory.COMPACTOR), + COMPACTOR_MAJC_STUCK("accumulo.compactor.majc.stuck", MetricType.LONG_TASK_TIMER, + "Number and duration of stuck major compactions.", MetricCategory.COMPACTOR), COMPACTOR_ENTRIES_READ("accumulo.compactor.entries.read", MetricType.FUNCTION_COUNTER, - "Number of entries read by all threads performing compactions.", MetricCategory.COMPACTOR), + "Number of entries read by all compactions that have run on this compactor.", + MetricCategory.COMPACTOR), COMPACTOR_ENTRIES_WRITTEN("accumulo.compactor.entries.written", MetricType.FUNCTION_COUNTER, - "Number of entries written by all threads performing compactions.", MetricCategory.COMPACTOR), + "Number of entries written by all compactions that have run on this compactor.", + MetricCategory.COMPACTOR), // Fate Metrics FATE_TYPE_IN_PROGRESS("accumulo.fate.ops.in.progress.by.type", MetricType.GAUGE, - "Number of FATE operations in progress. The type is designated by the `op.type` tag.", + "Number of FATE operations in progress. 
The op type is designated by the `op.type` tag.", MetricCategory.FATE), - FATE_OPS("accumulo.fate.ops", MetricType.GAUGE, "Tracks all the current FATE ops in any state.", + FATE_OPS("accumulo.fate.ops", MetricType.GAUGE, + "Number of all the current FATE ops in any state.", MetricCategory.FATE), + FATE_OPS_ACTIVITY("accumulo.fate.ops.activity", MetricType.GAUGE, + "Count of the total number of times fate operations are added, updated, and removed.", MetricCategory.FATE), - FATE_OPS_ACTIVITY("accumulo.fate.ops.activity", MetricType.GAUGE, "", MetricCategory.FATE), - FATE_ERRORS("accumulo.fate.errors", MetricType.GAUGE, "", MetricCategory.FATE), + FATE_ERRORS("accumulo.fate.errors", MetricType.GAUGE, + "Count of errors that occurred when attempting to gather fate metrics.", MetricCategory.FATE), FATE_TX("accumulo.fate.tx", MetricType.GAUGE, "The state is now in a tag (e.g., state=new, state=in.progress, state=failed, etc.).", MetricCategory.FATE), // Garbage Collection Metrics - GC_STARTED("accumulo.gc.started", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), - GC_FINISHED("accumulo.gc.finished", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), - GC_CANDIDATES("accumulo.gc.candidates", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), - GC_IN_USE("accumulo.gc.in.use", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), - GC_DELETED("accumulo.gc.deleted", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), - GC_ERRORS("accumulo.gc.errors", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), - GC_WAL_STARTED("accumulo.gc.wal.started", MetricType.GAUGE, "", + GC_STARTED("accumulo.gc.started", MetricType.GAUGE, "Timestamp GC file collection cycle started.", + MetricCategory.GARBAGE_COLLECTION), + GC_FINISHED("accumulo.gc.finished", MetricType.GAUGE, "Timestamp GC file collect cycle finished.", + MetricCategory.GARBAGE_COLLECTION), + GC_CANDIDATES("accumulo.gc.candidates", MetricType.GAUGE, + "Number of files that are candidates for deletion.", MetricCategory.GARBAGE_COLLECTION), + GC_IN_USE("accumulo.gc.in.use", MetricType.GAUGE, "Number of candidate files still in use.", MetricCategory.GARBAGE_COLLECTION), - GC_WAL_FINISHED("accumulo.gc.wal.finished", MetricType.GAUGE, "", + GC_DELETED("accumulo.gc.deleted", MetricType.GAUGE, "Number of candidate files deleted.", MetricCategory.GARBAGE_COLLECTION), - GC_WAL_CANDIDATES("accumulo.gc.wal.candidates", MetricType.GAUGE, "", + GC_ERRORS("accumulo.gc.errors", MetricType.GAUGE, "Number of candidate deletion errors.", MetricCategory.GARBAGE_COLLECTION), - GC_WAL_IN_USE("accumulo.gc.wal.in.use", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), - GC_WAL_DELETED("accumulo.gc.wal.deleted", MetricType.GAUGE, "", + GC_WAL_STARTED("accumulo.gc.wal.started", MetricType.GAUGE, + "Timestamp GC WAL collection cycle started.", MetricCategory.GARBAGE_COLLECTION), + GC_WAL_FINISHED("accumulo.gc.wal.finished", MetricType.GAUGE, + "Timestamp GC WAL collect cycle finished.", MetricCategory.GARBAGE_COLLECTION), + GC_WAL_CANDIDATES("accumulo.gc.wal.candidates", MetricType.GAUGE, + "Number of files that are candidates for deletion.", MetricCategory.GARBAGE_COLLECTION), + GC_WAL_IN_USE("accumulo.gc.wal.in.use", MetricType.GAUGE, + "Number of wal file candidates that are still in use.", MetricCategory.GARBAGE_COLLECTION), + GC_WAL_DELETED("accumulo.gc.wal.deleted", MetricType.GAUGE, + "Number of candidate wal files deleted.", MetricCategory.GARBAGE_COLLECTION), + GC_WAL_ERRORS("accumulo.gc.wal.errors", MetricType.GAUGE, + 
"Number candidate wal file deletion errors.", MetricCategory.GARBAGE_COLLECTION), + GC_POST_OP_DURATION("accumulo.gc.post.op.duration", MetricType.GAUGE, + "GC metadata table post operation duration in milliseconds.", MetricCategory.GARBAGE_COLLECTION), - GC_WAL_ERRORS("accumulo.gc.wal.errors", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), - GC_POST_OP_DURATION("accumulo.gc.post.op.duration", MetricType.GAUGE, "", + GC_RUN_CYCLE("accumulo.gc.run.cycle", MetricType.GAUGE, + "Count of gc cycle runs. Value is reset on process start.", MetricCategory.GARBAGE_COLLECTION), - GC_RUN_CYCLE("accumulo.gc.run.cycle", MetricType.GAUGE, "", MetricCategory.GARBAGE_COLLECTION), // Tablet Server Metrics - TSERVER_ENTRIES("accumulo.tserver.entries", MetricType.GAUGE, "", MetricCategory.TABLET_SERVER), - TSERVER_MEM_ENTRIES("accumulo.tserver.entries.mem", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_MAJC_RUNNING("accumulo.tserver.majc.running", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_MAJC_STUCK("accumulo.tserver.majc.stuck", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_MAJC_QUEUED("accumulo.tserver.majc.queued", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_MINC_QUEUED("accumulo.tserver.minc.queued", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_MINC_RUNNING("accumulo.tserver.minc.running", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_MINC_TOTAL("accumulo.tserver.minc.total", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_TABLETS_ONLINE("accumulo.tserver.tablets.online", MetricType.GAUGE, "", + TSERVER_ENTRIES("accumulo.tserver.entries", MetricType.GAUGE, "Number of entries.", MetricCategory.TABLET_SERVER), + TSERVER_MEM_ENTRIES("accumulo.tserver.entries.mem", MetricType.GAUGE, + "Number of entries in memory.", MetricCategory.TABLET_SERVER), + TSERVER_MAJC_RUNNING("accumulo.tserver.majc.running", MetricType.GAUGE, + "Number of active major compactions.", MetricCategory.TABLET_SERVER), + TSERVER_MAJC_STUCK("accumulo.tserver.majc.stuck", MetricType.GAUGE, + "Number and duration of stuck major compactions.", MetricCategory.TABLET_SERVER), + TSERVER_MAJC_QUEUED("accumulo.tserver.majc.queued", MetricType.GAUGE, + "Number of queued major compactions.", MetricCategory.TABLET_SERVER), + TSERVER_MINC_QUEUED("accumulo.tserver.minc.queued", MetricType.GAUGE, + "Number of queued minor compactions.", MetricCategory.TABLET_SERVER), + TSERVER_MINC_RUNNING("accumulo.tserver.minc.running", MetricType.GAUGE, + "Number of active minor compactions.", MetricCategory.TABLET_SERVER), + TSERVER_MINC_TOTAL("accumulo.tserver.minc.total", MetricType.GAUGE, + "Total number of minor compactions performed.", MetricCategory.TABLET_SERVER), + TSERVER_TABLETS_ONLINE("accumulo.tserver.tablets.online", MetricType.GAUGE, + "Number of online tablets.", MetricCategory.TABLET_SERVER), TSERVER_TABLETS_LONG_ASSIGNMENTS("accumulo.tserver.tablets.assignments.warning", MetricType.GAUGE, - "", MetricCategory.TABLET_SERVER), - TSERVER_TABLETS_OPENING("accumulo.tserver.tablets.opening", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_TABLETS_UNOPENED("accumulo.tserver.tablets.unopened", MetricType.GAUGE, "", - MetricCategory.TABLET_SERVER), - TSERVER_TABLETS_FILES("accumulo.tserver.tablets.files", MetricType.GAUGE, "", + "Number of tablet assignments that are taking longer than the configured warning duration.", MetricCategory.TABLET_SERVER), + 
TSERVER_TABLETS_OPENING("accumulo.tserver.tablets.opening", MetricType.GAUGE, + "Number of opening tablets.", MetricCategory.TABLET_SERVER), + TSERVER_TABLETS_UNOPENED("accumulo.tserver.tablets.unopened", MetricType.GAUGE, + "Number of unopened tablets.", MetricCategory.TABLET_SERVER), + TSERVER_TABLETS_FILES("accumulo.tserver.tablets.files", MetricType.GAUGE, + "Number of files per tablet.", MetricCategory.TABLET_SERVER), TSERVER_INGEST_MUTATIONS("accumulo.tserver.ingest.mutations", MetricType.GAUGE, - "Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be derived.", + "Ingest mutation count. The rate can be derived from this metric.", MetricCategory.TABLET_SERVER), TSERVER_INGEST_BYTES("accumulo.tserver.ingest.bytes", MetricType.GAUGE, - "Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be derived.", - MetricCategory.TABLET_SERVER), - TSERVER_HOLD("accumulo.tserver.hold", MetricType.GAUGE, "", MetricCategory.TABLET_SERVER), + "Ingest byte count. The rate can be derived from this metric.", MetricCategory.TABLET_SERVER), + TSERVER_HOLD("accumulo.tserver.hold", MetricType.GAUGE, + "Duration for which commits have been held in milliseconds.", MetricCategory.TABLET_SERVER), // Scan Metrics SCAN_RESERVATION_TOTAL_TIMER("accumulo.scan.reservation.total.timer", MetricType.TIMER, "Time to reserve a tablet's files for scan.", MetricCategory.SCAN_SERVER), SCAN_RESERVATION_WRITEOUT_TIMER("accumulo.scan.reservation.writeout.timer", MetricType.TIMER, - "Time to write out a tablets file reservations for scan", MetricCategory.SCAN_SERVER), + "Time to write out a tablets file reservations for scan.", MetricCategory.SCAN_SERVER), SCAN_RESERVATION_CONFLICT_COUNTER("accumulo.scan.reservation.conflict.count", MetricType.COUNTER, - "", MetricCategory.SCAN_SERVER), + "Count of instances where file reservation attempts for scans encountered conflicts.", + MetricCategory.SCAN_SERVER), SCAN_BUSY_TIMEOUT_COUNT("accumulo.scan.busy.timeout.count", MetricType.COUNTER, "Count of the scans where a busy timeout happened.", MetricCategory.SCAN_SERVER), SCAN_TABLET_METADATA_CACHE("accumulo.scan.tablet.metadata.cache", MetricType.CACHE, "Scan server tablet cache metrics.", MetricCategory.SCAN_SERVER), - SCAN_TIMES("accumulo.scan.times", MetricType.TIMER, "", MetricCategory.SCAN_SERVER), - SCAN_OPEN_FILES("accumulo.scan.files.open", MetricType.GAUGE, "", MetricCategory.SCAN_SERVER), - SCAN_RESULTS("accumulo.scan.result", MetricType.GAUGE, "", MetricCategory.SCAN_SERVER), - SCAN_YIELDS("accumulo.scan.yields", MetricType.GAUGE, "", MetricCategory.SCAN_SERVER), - SCAN_START("accumulo.scan.start", MetricType.COUNTER, "", MetricCategory.SCAN_SERVER), - SCAN_CONTINUE("accumulo.scan.continue", MetricType.COUNTER, "", MetricCategory.SCAN_SERVER), - SCAN_CLOSE("accumulo.scan.close", MetricType.COUNTER, "", MetricCategory.SCAN_SERVER), - SCAN_QUERIES("accumulo.scan.queries", MetricType.GAUGE, "", MetricCategory.SCAN_SERVER), - SCAN_SCANNED_ENTRIES("accumulo.scan.query.scanned.entries", MetricType.GAUGE, - "Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be derived.", + SCAN_TIMES("accumulo.scan.times", MetricType.TIMER, "Scan session lifetime (creation to close).", MetricCategory.SCAN_SERVER), - SCAN_QUERY_SCAN_RESULTS("accumulo.scan.query.results", MetricType.GAUGE, - "Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be derived.", + SCAN_OPEN_FILES("accumulo.scan.files.open", 
MetricType.GAUGE, "Number of files open for scans.", MetricCategory.SCAN_SERVER), - SCAN_QUERY_SCAN_RESULTS_BYTES("accumulo.scan.query.results.bytes", MetricType.GAUGE, - "Prior to 2.1.0 this metric was reported as a rate, it is now the count and the rate can be derived.", + SCAN_RESULTS("accumulo.scan.result", MetricType.GAUGE, "Results per scan.", + MetricCategory.SCAN_SERVER), + SCAN_YIELDS("accumulo.scan.yields", MetricType.GAUGE, "Counts scans that have yielded.", MetricCategory.SCAN_SERVER), - SCAN_PAUSED_FOR_MEM("accumulo.scan.paused.for.memory", MetricType.COUNTER, "", + SCAN_START("accumulo.scan.start", MetricType.COUNTER, + "Number of calls to start a scan or multiscan.", MetricCategory.SCAN_SERVER), + SCAN_CONTINUE("accumulo.scan.continue", MetricType.COUNTER, + "Number of calls to continue a scan or multiscan.", MetricCategory.SCAN_SERVER), + SCAN_CLOSE("accumulo.scan.close", MetricType.COUNTER, + "Number of calls to close a scan or multiscan.", MetricCategory.SCAN_SERVER), + SCAN_QUERIES("accumulo.scan.queries", MetricType.GAUGE, "Number of queries made during scans.", MetricCategory.SCAN_SERVER), - SCAN_RETURN_FOR_MEM("accumulo.scan.return.early.for.memory", MetricType.COUNTER, "", + SCAN_SCANNED_ENTRIES("accumulo.scan.query.scanned.entries", MetricType.GAUGE, + "Count of scanned entries. The rate can be derived from this metric.", MetricCategory.SCAN_SERVER), - SCAN_ZOMBIE_THREADS("accumulo.scan.zombie.threads", MetricType.GAUGE, "", + SCAN_QUERY_SCAN_RESULTS("accumulo.scan.query.results", MetricType.GAUGE, + "Query count. The rate can be derived from this metric.", MetricCategory.SCAN_SERVER), + SCAN_QUERY_SCAN_RESULTS_BYTES("accumulo.scan.query.results.bytes", MetricType.GAUGE, + "Query byte count. The rate can be derived from this metric.", MetricCategory.SCAN_SERVER), + SCAN_PAUSED_FOR_MEM("accumulo.scan.paused.for.memory", MetricType.COUNTER, + "Count of scans paused due to server being low on memory.", MetricCategory.SCAN_SERVER), + SCAN_RETURN_FOR_MEM("accumulo.scan.return.early.for.memory", MetricType.COUNTER, + "Count of scans that returned results early due to server being low on memory.", MetricCategory.SCAN_SERVER), + SCAN_ZOMBIE_THREADS("accumulo.scan.zombie.threads", MetricType.GAUGE, + "Number of scan threads that have no associated client session.", MetricCategory.SCAN_SERVER), // Major Compaction Metrics MAJC_QUEUED("accumulo.tserver.compactions.majc.queued", MetricType.GAUGE, - "The compaction service information is in a tag: `id={i|e}_{compactionServiceName}_{executor_name}`.", + "Number of queued major compactions. The compaction service information is in a tag: `id={i|e}_{compactionServiceName}_{executor_name}`.", MetricCategory.TABLET_SERVER), MAJC_RUNNING("accumulo.tserver.compactions.majc.running", MetricType.GAUGE, - "The compaction service information is in a tag: `id={i|e}_{compactionServiceName}_{executor_name}`.", - MetricCategory.TABLET_SERVER), - MAJC_PAUSED("accumulo.tserver.compactions.majc.paused", MetricType.COUNTER, "", + "Number of running major compactions. 
The compaction service information is in a tag: `id={i|e}_{compactionServiceName}_{executor_name}`.", MetricCategory.TABLET_SERVER), + MAJC_PAUSED("accumulo.tserver.compactions.majc.paused", MetricType.COUNTER, + "Number of paused major compactions.", MetricCategory.TABLET_SERVER), // Minor Compaction Metrics - MINC_QUEUED("accumulo.tserver.compactions.minc.queued", MetricType.TIMER, "", - MetricCategory.TABLET_SERVER), - MINC_RUNNING("accumulo.tserver.compactions.minc.running", MetricType.TIMER, "", - MetricCategory.TABLET_SERVER), - MINC_PAUSED("accumulo.tserver.compactions.minc.paused", MetricType.COUNTER, "", - MetricCategory.TABLET_SERVER), + MINC_QUEUED("accumulo.tserver.compactions.minc.queued", MetricType.TIMER, + "Queued minor compactions time queued.", MetricCategory.TABLET_SERVER), + MINC_RUNNING("accumulo.tserver.compactions.minc.running", MetricType.TIMER, + "Minor compactions time active.", MetricCategory.TABLET_SERVER), + MINC_PAUSED("accumulo.tserver.compactions.minc.paused", MetricType.COUNTER, + "Number of paused minor compactions.", MetricCategory.TABLET_SERVER), // Updates (Ingest) Metrics UPDATE_ERRORS("accumulo.tserver.updates.error", MetricType.GAUGE, - "Type is stored in a tag (e.g., type=permission, type=unknown.tablet, type=constraint.violation).", - MetricCategory.TABLET_SERVER), - UPDATE_COMMIT("accumulo.tserver.updates.commit", MetricType.TIMER, "", - MetricCategory.TABLET_SERVER), - UPDATE_COMMIT_PREP("accumulo.tserver.updates.commit.prep", MetricType.TIMER, "", - MetricCategory.TABLET_SERVER), - UPDATE_WALOG_WRITE("accumulo.tserver.updates.walog.write", MetricType.TIMER, "", + "Count of errors during tablet updates. Type/reason for error is stored in the `type` tag (e.g., type=permission, type=unknown.tablet, type=constraint.violation).", MetricCategory.TABLET_SERVER), + UPDATE_COMMIT("accumulo.tserver.updates.commit", MetricType.TIMER, + "Time taken to commit a mutation.", MetricCategory.TABLET_SERVER), + UPDATE_COMMIT_PREP("accumulo.tserver.updates.commit.prep", MetricType.TIMER, + "Time taken to prepare to commit a single mutation.", MetricCategory.TABLET_SERVER), + UPDATE_WALOG_WRITE("accumulo.tserver.updates.walog.write", MetricType.TIMER, + "Time taken to write a batch of mutations to WAL.", MetricCategory.TABLET_SERVER), UPDATE_MUTATION_ARRAY_SIZE("accumulo.tserver.updates.mutation.arrays.size", - MetricType.DISTRIBUTION_SUMMARY, "", MetricCategory.TABLET_SERVER), + MetricType.DISTRIBUTION_SUMMARY, "Batch size of mutations from client.", + MetricCategory.TABLET_SERVER), // Thrift Metrics - THRIFT_IDLE("accumulo.thrift.idle", MetricType.DISTRIBUTION_SUMMARY, "", MetricCategory.THRIFT), - THRIFT_EXECUTE("accumulo.thrift.execute", MetricType.DISTRIBUTION_SUMMARY, "", - MetricCategory.THRIFT), + THRIFT_IDLE("accumulo.thrift.idle", MetricType.DISTRIBUTION_SUMMARY, + "Time waiting to execute an RPC request.", MetricCategory.THRIFT), + THRIFT_EXECUTE("accumulo.thrift.execute", MetricType.DISTRIBUTION_SUMMARY, + "Time to execute an RPC request.", MetricCategory.THRIFT), // Block Cache Metrics - BLOCKCACHE_INDEX_HITCOUNT("accumulo.blockcache.index.hitcount", MetricType.FUNCTION_COUNTER, "", - MetricCategory.BLOCK_CACHE), + BLOCKCACHE_INDEX_HITCOUNT("accumulo.blockcache.index.hitcount", MetricType.FUNCTION_COUNTER, + "Index block cache hit count.", MetricCategory.BLOCK_CACHE), BLOCKCACHE_INDEX_REQUESTCOUNT("accumulo.blockcache.index.requestcount", - MetricType.FUNCTION_COUNTER, "", MetricCategory.BLOCK_CACHE), + MetricType.FUNCTION_COUNTER, "Index block cache 
request count.", MetricCategory.BLOCK_CACHE), BLOCKCACHE_INDEX_EVICTIONCOUNT("accumulo.blockcache.index.evictioncount", - MetricType.FUNCTION_COUNTER, "", MetricCategory.BLOCK_CACHE), - BLOCKCACHE_DATA_HITCOUNT("accumulo.blockcache.data.hitcount", MetricType.FUNCTION_COUNTER, "", - MetricCategory.BLOCK_CACHE), + MetricType.FUNCTION_COUNTER, "Index block cache eviction count.", MetricCategory.BLOCK_CACHE), + BLOCKCACHE_DATA_HITCOUNT("accumulo.blockcache.data.hitcount", MetricType.FUNCTION_COUNTER, + "Data block cache hit count.", MetricCategory.BLOCK_CACHE), BLOCKCACHE_DATA_REQUESTCOUNT("accumulo.blockcache.data.requestcount", MetricType.FUNCTION_COUNTER, - "", MetricCategory.BLOCK_CACHE), + "Data block cache request count.", MetricCategory.BLOCK_CACHE), BLOCKCACHE_DATA_EVICTIONCOUNT("accumulo.blockcache.data.evictioncount", - MetricType.FUNCTION_COUNTER, "", MetricCategory.BLOCK_CACHE), + MetricType.FUNCTION_COUNTER, "Data block cache eviction count.", MetricCategory.BLOCK_CACHE), BLOCKCACHE_SUMMARY_HITCOUNT("accumulo.blockcache.summary.hitcount", MetricType.FUNCTION_COUNTER, - "", MetricCategory.BLOCK_CACHE), + "Summary block cache hit count.", MetricCategory.BLOCK_CACHE), BLOCKCACHE_SUMMARY_REQUESTCOUNT("accumulo.blockcache.summary.requestcount", - MetricType.FUNCTION_COUNTER, "", MetricCategory.BLOCK_CACHE), + MetricType.FUNCTION_COUNTER, "Summary block cache request count.", + MetricCategory.BLOCK_CACHE), BLOCKCACHE_SUMMARY_EVICTIONCOUNT("accumulo.blockcache.summary.evictioncount", - MetricType.FUNCTION_COUNTER, "", MetricCategory.BLOCK_CACHE), + MetricType.FUNCTION_COUNTER, "Summary block cache eviction count.", + MetricCategory.BLOCK_CACHE), // Manager Metrics MANAGER_BALANCER_MIGRATIONS_NEEDED("accumulo.manager.balancer.migrations.needed", MetricType.GAUGE, - "The number of migrations that need to complete before the system is balanced.", + "The total number of migrations that need to complete before the system is balanced.", MetricCategory.MANAGER); private final String name; diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java index 4027f4b0c98..c09f46ab00c 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java @@ -23,6 +23,8 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -54,6 +56,8 @@ public interface TServerClient { + static final String DEBUG_HOST = "org.apache.accumulo.client.rpc.debug.host"; + Pair getThriftServerConnection(ClientContext context, boolean preferCachedConnections) throws TTransportException; @@ -62,7 +66,9 @@ default Pair getThriftServerConnection(Logger LOG, ThriftClientTypes cachedTransport = context.getTransportPool().getAnyCachedTransport(type); if (cachedTransport != null) { @@ -79,28 +85,40 @@ default Pair getThriftServerConnection(Logger LOG, ThriftClientTypes serverPaths = new ArrayList<>(); - zc.getChildren(tserverZooPath).forEach(tserverAddress -> { - serverPaths.add(tserverZooPath + "/" + tserverAddress); - }); - if (type == ThriftClientTypes.CLIENT) { - zc.getChildren(sserverZooPath).forEach(sserverAddress -> { - serverPaths.add(sserverZooPath + "/" + sserverAddress); - }); + if (type == 
ThriftClientTypes.CLIENT && debugHost != null) { + // add all three paths to the set even though they may not be correct. + // The entire set will be checked in the code below to validate + // that the path is correct and the lock is held and will return the + // correct one. + serverPaths.add(tserverZooPath + "/" + debugHost); + serverPaths.add(sserverZooPath + "/" + debugHost); zc.getChildren(compactorZooPath).forEach(compactorGroup -> { - zc.getChildren(compactorZooPath + "/" + compactorGroup).forEach(compactorAddress -> { - serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" + compactorAddress); - }); + serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" + debugHost); }); - } - - if (serverPaths.isEmpty()) { - if (warned.compareAndSet(false, true)) { - LOG.warn( - "There are no servers serving the {} api: check that zookeeper and accumulo are running.", - type); + } else { + zc.getChildren(tserverZooPath).forEach(tserverAddress -> { + serverPaths.add(tserverZooPath + "/" + tserverAddress); + }); + if (type == ThriftClientTypes.CLIENT) { + zc.getChildren(sserverZooPath).forEach(sserverAddress -> { + serverPaths.add(sserverZooPath + "/" + sserverAddress); + }); + zc.getChildren(compactorZooPath).forEach(compactorGroup -> { + zc.getChildren(compactorZooPath + "/" + compactorGroup).forEach(compactorAddress -> { + serverPaths.add(compactorZooPath + "/" + compactorGroup + "/" + compactorAddress); + }); + }); + } + if (serverPaths.isEmpty()) { + if (warned.compareAndSet(false, true)) { + LOG.warn( + "There are no servers serving the {} api: check that zookeeper and accumulo are running.", + type); + } + throw new TTransportException("There are no servers for type: " + type); } - throw new TTransportException("There are no servers for type: " + type); } + Collections.shuffle(serverPaths, RANDOM.get()); for (String serverPath : serverPaths) { @@ -113,10 +131,19 @@ default Pair getThriftServerConnection(Logger LOG, ThriftClientTypes(tserverClientAddress.toString(), client); } catch (TTransportException e) { - LOG.trace("Error creating transport to {}", tserverClientAddress); + if (type == ThriftClientTypes.CLIENT && debugHost != null) { + LOG.error( + "Error creating transport to debug host: {}. 
If this server is down, then you will need to remove or change the system property {}.", + debugHost, DEBUG_HOST); + } else { + LOG.trace("Error creating transport to {}", tserverClientAddress); + } continue; } } @@ -127,7 +154,15 @@ default Pair getThriftServerConnection(Logger LOG, ThriftClientTypes R execute(Logger LOG, ClientContext context, Exec exec) diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java index 1cbfbd289fe..8338d2735a7 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java @@ -34,6 +34,7 @@ public class BlockCacheFactoryTest { @Test + @SuppressWarnings("deprecation") public void testCreateLruBlockCacheFactory() throws Exception { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java index bf814e0e4d3..e12dacfe71f 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java @@ -49,6 +49,7 @@ * Tests will ensure it grows and shrinks in size properly, evictions run when they're supposed to * and do what they should, and that cached blocks are accessible when expected to be. */ +@SuppressWarnings("deprecation") public class TestLruBlockCache { @Test diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java index a5fc04a854e..d65991590ad 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java @@ -44,8 +44,7 @@ import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory; -import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache; -import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager; +import org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager; import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachableBuilder; import org.apache.accumulo.core.file.rfile.RFile.FencedReader; @@ -56,6 +55,7 @@ import org.apache.accumulo.core.iteratorsImpl.system.ColumnFamilySkippingIterator; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.sample.impl.SamplerFactory; +import org.apache.accumulo.core.spi.cache.BlockCache; import org.apache.accumulo.core.spi.cache.BlockCacheManager; import org.apache.accumulo.core.spi.cache.CacheType; import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; @@ -164,7 +164,7 @@ public void openReader(boolean cfsi, Range fence) throws IOException { DefaultConfiguration dc = DefaultConfiguration.getInstance(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, 
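As a usage note for the debug-host hook added to TServerClient above: the value is read from a JVM system property, so a client process can pin CLIENT-type RPCs to a single server while debugging. A hypothetical sketch, not part of this change (the host:port value is a placeholder):

public class DebugHostSketch {
  public static void main(String[] args) {
    // Same key as TServerClient.DEBUG_HOST; typically supplied as
    // -Dorg.apache.accumulo.client.rpc.debug.host=<host:port> on the client JVM command line.
    System.setProperty("org.apache.accumulo.client.rpc.debug.host", "tserver1.example.com:9997");
    // Build an AccumuloClient as usual; CLIENT-type RPC lookups will only consider this host,
    // and transport errors to it are logged at ERROR with a reminder to unset the property.
  }
}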
LruBlockCacheManager.class.getName()); + cc.set(Property.TSERV_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); try { manager = BlockCacheManagerFactory.getInstance(cc); } catch (ReflectiveOperationException e) { @@ -174,8 +174,8 @@ public void openReader(boolean cfsi, Range fence) throws IOException { cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(100000000)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(100000000)); manager.start(BlockCacheConfiguration.forTabletServer(cc)); - LruBlockCache indexCache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); - LruBlockCache dataCache = (LruBlockCache) manager.getBlockCache(CacheType.DATA); + BlockCache indexCache = manager.getBlockCache(CacheType.INDEX); + BlockCache dataCache = manager.getBlockCache(CacheType.DATA); CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, accumuloConfiguration.getAllCryptoProperties()); diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 36533596dcc..81c76d58ff6 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -64,7 +64,7 @@ import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory; -import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager; +import org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager; import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachableBuilder; import org.apache.accumulo.core.file.rfile.RFile.Reader; @@ -1584,7 +1584,7 @@ private void runVersionTest(int version, ConfigurationCopy aconf) throws Excepti byte[] data = baos.toByteArray(); SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data); FSDataInputStream in2 = new FSDataInputStream(bais); - aconf.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + aconf.set(Property.TSERV_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); aconf.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(100000)); aconf.set(Property.TSERV_DATACACHE_SIZE, Long.toString(100000000)); aconf.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(100000000)); diff --git a/pom.xml b/pom.xml index 148a1b3d344..0406128385d 100644 --- a/pom.xml +++ b/pom.xml @@ -151,6 +151,7 @@ 5.5.0 2.24.1 3.4.0 + 2.24.0 1.34.1 2.0.9 2.0.12 @@ -198,7 +199,7 @@ org.apache.logging.log4j log4j-bom - 2.23.1 + ${version.log4j} pom import @@ -749,6 +750,11 @@ auto-service ${version.auto-service} + + org.apache.logging.log4j + log4j-core + ${version.log4j} + diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java index 4c0f69f8663..da3b9fb4372 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java @@ -44,9 +44,9 @@ public void incrementMajCPause() { @Override public void registerMetrics(MeterRegistry registry) { FunctionCounter.builder(MAJC_PAUSED.getName(), 
majcPauseCount, AtomicLong::get) - .description("major compaction pause count").register(registry); + .description(MAJC_PAUSED.getDescription()).register(registry); FunctionCounter.builder(MINC_PAUSED.getName(), mincPauseCount, AtomicLong::get) - .description("minor compactor pause count").register(registry); + .description(MINC_PAUSED.getDescription()).register(registry); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index 0e18fa0a018..8395a8b407a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@ -377,8 +377,7 @@ private void validateServerFamily(ArrayList violations, ColumnUpdate colu private void validateSuspendLocationFamily(ArrayList violations, ColumnUpdate columnUpdate) { String qualStr = new String(columnUpdate.getColumnQualifier(), UTF_8); - String suspendColQualStr = - new String(SuspendLocationColumn.SUSPEND_COLUMN.getColumnQualifier().getBytes(), UTF_8); + String suspendColQualStr = SuspendLocationColumn.SUSPEND_QUAL; if (qualStr.equals(suspendColQualStr)) { try { diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java index e9babfe59c0..f849599233c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java @@ -54,7 +54,6 @@ import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.accumulo.server.security.SecurityUtil; -import org.apache.accumulo.server.util.ChangeSecret; import org.apache.accumulo.server.util.SystemPropUtil; import org.apache.accumulo.start.spi.KeywordExecutable; import org.apache.commons.lang3.StringUtils; @@ -107,7 +106,7 @@ static void checkInit(ZooReaderWriter zoo, VolumeManager fs, InitialConfiguratio System.out.println(); System.out.println(); System.out.println("You can change the instance secret in accumulo by using:"); - System.out.println(" bin/accumulo " + ChangeSecret.class.getName()); + System.out.println(" bin/accumulo admin changeSecret"); System.out.println("You will also need to edit your secret in your configuration" + " file by adding the property instance.secret to your" + " accumulo.properties. 
Without this accumulo will not operate correctly"); diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/RootGcCandidates.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/RootGcCandidates.java index 7d3ec1b836a..4b9fcc7932d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/RootGcCandidates.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/RootGcCandidates.java @@ -53,6 +53,7 @@ private static class Data { private SortedMap> candidates; // Gson requires a default constructor when JDK Unsafe usage is disabled + @SuppressWarnings("unused") private Data() {} public Data(int version, SortedMap> candidates) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java index 83def192286..e824da163b2 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java @@ -21,12 +21,12 @@ import static org.apache.accumulo.core.metrics.Metric.LOW_MEMORY; import static org.apache.accumulo.core.metrics.Metric.SERVER_IDLE; -import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.server.ServerContext; +import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; public class ProcessMetrics implements MetricsProducer { @@ -41,8 +41,10 @@ public ProcessMetrics(final ServerContext context) { @Override public void registerMetrics(MeterRegistry registry) { - registry.gauge(LOW_MEMORY.getName(), List.of(), this, this::lowMemDetected); - registry.gauge(SERVER_IDLE.getName(), isIdle, AtomicInteger::get); + Gauge.builder(LOW_MEMORY.getName(), this, this::lowMemDetected) + .description(LOW_MEMORY.getDescription()).register(registry); + Gauge.builder(SERVER_IDLE.getName(), isIdle, AtomicInteger::get) + .description(SERVER_IDLE.getDescription()).register(registry); } private int lowMemDetected(ProcessMetrics processMetrics) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetrics.java index 9fdf98a61c2..b910f57670f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetrics.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetrics.java @@ -43,9 +43,10 @@ public void addExecute(long time) { @Override public void registerMetrics(MeterRegistry registry) { - idle = DistributionSummary.builder(THRIFT_IDLE.getName()).baseUnit("ms").register(registry); - execute = - DistributionSummary.builder(THRIFT_EXECUTE.getName()).baseUnit("ms").register(registry); + idle = DistributionSummary.builder(THRIFT_IDLE.getName()).baseUnit("ms") + .description(THRIFT_IDLE.getDescription()).register(registry); + execute = DistributionSummary.builder(THRIFT_EXECUTE.getName()).baseUnit("ms") + .description(THRIFT_EXECUTE.getDescription()).register(registry); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java index c3e8990b44f..174c70e907e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java +++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java @@ -56,6 +56,7 @@ public class FateSummaryReport { private transient Map idsToNameMap; // Gson requires a default constructor when JDK Unsafe usage is disabled + @SuppressWarnings("unused") private FateSummaryReport() {} public FateSummaryReport(Map idsToNameMap, diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java index 6a1d14bc1df..78904857f11 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java @@ -39,6 +39,7 @@ public class FateTxnDetails implements Comparable { private List locksWaiting = List.of(); // Default constructor for Gson + @SuppressWarnings("unused") private FateTxnDetails() {} /** diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/serviceStatus/ServiceStatusReport.java b/server/base/src/main/java/org/apache/accumulo/server/util/serviceStatus/ServiceStatusReport.java index 629242eecf2..b323dbaf77f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/serviceStatus/ServiceStatusReport.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/serviceStatus/ServiceStatusReport.java @@ -53,6 +53,7 @@ public class ServiceStatusReport { private Map summaries; // Gson requires a default constructor when JDK Unsafe usage is disabled + @SuppressWarnings("unused") private ServiceStatusReport() {} public ServiceStatusReport(final Map summaries, final boolean noHosts) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/serviceStatus/StatusSummary.java b/server/base/src/main/java/org/apache/accumulo/server/util/serviceStatus/StatusSummary.java index ec5a044f1af..8c28d012a2b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/serviceStatus/StatusSummary.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/serviceStatus/StatusSummary.java @@ -31,6 +31,7 @@ public class StatusSummary { private int errorCount; // Default constructor required for Gson + @SuppressWarnings("unused") private StatusSummary() {} public StatusSummary(ServiceStatusReport.ReportKey serviceType, final Set resourceGroups, diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 4658bd8a5a7..1d49cf4f8ff 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -183,14 +183,12 @@ private long getTotalEntriesWritten() { public void registerMetrics(MeterRegistry registry) { super.registerMetrics(registry); FunctionCounter.builder(COMPACTOR_ENTRIES_READ.getName(), this, Compactor::getTotalEntriesRead) - .description("Number of entries read by all compactions that have run on this compactor") - .register(registry); + .description(COMPACTOR_ENTRIES_READ.getDescription()).register(registry); FunctionCounter .builder(COMPACTOR_ENTRIES_WRITTEN.getName(), this, Compactor::getTotalEntriesWritten) - .description("Number of entries written by all compactions that have run on this compactor") - .register(registry); + .description(COMPACTOR_ENTRIES_WRITTEN.getDescription()).register(registry); LongTaskTimer timer = 
LongTaskTimer.builder(COMPACTOR_MAJC_STUCK.getName()) - .description("Number and duration of stuck major compactions").register(registry); + .description(COMPACTOR_MAJC_STUCK.getDescription()).register(registry); CompactionWatcher.setTimer(timer); } diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java index 9a88f853164..699ac825670 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java @@ -53,41 +53,39 @@ public GcMetrics(SimpleGarbageCollector gc) { @Override public void registerMetrics(MeterRegistry registry) { Gauge.builder(GC_STARTED.getName(), metricValues, v -> v.getLastCollect().getStarted()) - .description("Timestamp GC file collection cycle started").register(registry); + .description(GC_STARTED.getDescription()).register(registry); Gauge.builder(GC_FINISHED.getName(), metricValues, v -> v.getLastCollect().getFinished()) - .description("Timestamp GC file collect cycle finished").register(registry); + .description(GC_FINISHED.getDescription()).register(registry); Gauge.builder(GC_CANDIDATES.getName(), metricValues, v -> v.getLastCollect().getCandidates()) - .description("Number of files that are candidates for deletion").register(registry); + .description(GC_CANDIDATES.getDescription()).register(registry); Gauge.builder(GC_IN_USE.getName(), metricValues, v -> v.getLastCollect().getInUse()) - .description("Number of candidate files still in use").register(registry); + .description(GC_IN_USE.getDescription()).register(registry); Gauge.builder(GC_DELETED.getName(), metricValues, v -> v.getLastCollect().getDeleted()) - .description("Number of candidate files deleted").register(registry); + .description(GC_DELETED.getDescription()).register(registry); Gauge.builder(GC_ERRORS.getName(), metricValues, v -> v.getLastCollect().getErrors()) - .description("Number of candidate deletion errors").register(registry); + .description(GC_ERRORS.getDescription()).register(registry); // WAL metrics Gauges Gauge.builder(GC_WAL_STARTED.getName(), metricValues, v -> v.getLastWalCollect().getStarted()) - .description("Timestamp GC WAL collection cycle started").register(registry); + .description(GC_WAL_STARTED.getDescription()).register(registry); Gauge.builder(GC_WAL_FINISHED.getName(), metricValues, v -> v.getLastWalCollect().getFinished()) - .description("Timestamp GC WAL collect cycle finished").register(registry); + .description(GC_WAL_FINISHED.getDescription()).register(registry); Gauge .builder(GC_WAL_CANDIDATES.getName(), metricValues, v -> v.getLastWalCollect().getCandidates()) - .description("Number of files that are candidates for deletion").register(registry); + .description(GC_WAL_CANDIDATES.getDescription()).register(registry); Gauge.builder(GC_WAL_IN_USE.getName(), metricValues, v -> v.getLastWalCollect().getInUse()) - .description("Number of wal file candidates that are still in use").register(registry); + .description(GC_WAL_IN_USE.getDescription()).register(registry); Gauge.builder(GC_WAL_DELETED.getName(), metricValues, v -> v.getLastWalCollect().getDeleted()) - .description("Number of candidate wal files deleted").register(registry); + .description(GC_WAL_DELETED.getDescription()).register(registry); Gauge.builder(GC_WAL_ERRORS.getName(), metricValues, v -> v.getLastWalCollect().getErrors()) - .description("Number candidate wal file deletion errors").register(registry); + 
.description(GC_WAL_ERRORS.getDescription()).register(registry); Gauge .builder(GC_POST_OP_DURATION.getName(), metricValues, v -> TimeUnit.NANOSECONDS.toMillis(v.getPostOpDurationNanos())) - .description("GC metadata table post operation duration in milliseconds") - .register(registry); + .description(GC_POST_OP_DURATION.getDescription()).register(registry); Gauge.builder(GC_RUN_CYCLE.getName(), metricValues, GcCycleMetrics::getRunCycleCount) - .description("gauge incremented each gc cycle run, rest on process start") - .register(registry); + .description(GC_RUN_CYCLE.getDescription()).register(registry); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 2c1539e5530..16a215bd8b6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1035,6 +1035,7 @@ private long balanceTablets() { long totalMigrationsOut = 0; final Map> partitionedMigrations = partitionMigrations(migrationsSnapshot()); + int levelsCompleted = 0; for (DataLevel dl : DataLevel.values()) { if (dl == DataLevel.USER && tabletsNotHosted > 0) { @@ -1072,14 +1073,17 @@ private long balanceTablets() { } } while (migrationsOutForLevel > 0 && (dl == DataLevel.ROOT || dl == DataLevel.METADATA)); totalMigrationsOut += migrationsOutForLevel; + + // increment this at end of loop to signal complete run w/o any continue + levelsCompleted++; } balancerMetrics.assignMigratingCount(migrations::size); - if (totalMigrationsOut == 0) { + if (totalMigrationsOut == 0 && levelsCompleted == DataLevel.values().length) { synchronized (balancedNotifier) { balancedNotifier.notifyAll(); } - } else { + } else if (totalMigrationsOut > 0) { nextEvent.event("Migrating %d more tablets, %d total", totalMigrationsOut, migrations.size()); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/BalancerMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/BalancerMetrics.java index 820f49b139e..9bdb24b94d1 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/BalancerMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/BalancerMetrics.java @@ -48,6 +48,6 @@ public void registerMetrics(MeterRegistry registry) { Gauge .builder(MANAGER_BALANCER_MIGRATIONS_NEEDED.getName(), this, BalancerMetrics::getMigratingCount) - .description("Overall total migrations that need to complete").register(registry); + .description(MANAGER_BALANCER_MIGRATIONS_NEEDED.getDescription()).register(registry); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java index 57a561aa7b4..702eaa7e565 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java @@ -19,12 +19,14 @@ package org.apache.accumulo.manager.metrics.fate; import java.util.Collections; +import java.util.EnumMap; import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.accumulo.core.fate.AdminUtil; import org.apache.accumulo.core.fate.ReadOnlyTStore; +import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; import org.apache.accumulo.server.ServerContext; import 
org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; @@ -44,12 +46,12 @@ class FateMetricValues { private final long zkFateChildOpsTotal; private final long zkConnectionErrors; - private final Map txStateCounters; + private final EnumMap txStateCounters; private final Map opTypeCounters; private FateMetricValues(final long updateTime, final long currentFateOps, final long zkFateChildOpsTotal, final long zkConnectionErrors, - final Map txStateCounters, final Map opTypeCounters) { + final EnumMap txStateCounters, final Map opTypeCounters) { this.updateTime = updateTime; this.currentFateOps = currentFateOps; this.zkFateChildOpsTotal = zkFateChildOpsTotal; @@ -75,7 +77,7 @@ long getZkConnectionErrors() { * * @return a map of transaction status counters. */ - Map getTxStateCounters() { + EnumMap getTxStateCounters() { return txStateCounters; } @@ -115,9 +117,9 @@ public static FateMetricValues getFromZooKeeper(final ServerContext context, builder.withCurrentFateOps(currFates.size()); // states are enumerated - create new map with counts initialized to 0. - Map states = new TreeMap<>(); - for (ReadOnlyTStore.TStatus t : ReadOnlyTStore.TStatus.values()) { - states.put(t.name(), 0L); + EnumMap states = new EnumMap<>(TStatus.class); + for (TStatus t : TStatus.values()) { + states.put(t, 0L); } // op types are dynamic, no count initialization needed - clearing prev values will @@ -126,7 +128,7 @@ public static FateMetricValues getFromZooKeeper(final ServerContext context, for (AdminUtil.TransactionStatus tx : currFates) { - String stateName = tx.getStatus().name(); + TStatus stateName = tx.getStatus(); // incr count for state states.merge(stateName, 1L, Long::sum); @@ -182,15 +184,15 @@ static class Builder { private long zkFateChildOpsTotal = 0; private long zkConnectionErrors = 0; - private final Map txStateCounters; + private final EnumMap txStateCounters; private Map opTypeCounters; Builder() { // states are enumerated - create new map with counts initialized to 0. 
- txStateCounters = new TreeMap<>(); - for (ReadOnlyTStore.TStatus t : ReadOnlyTStore.TStatus.values()) { - txStateCounters.put(t.name(), 0L); + txStateCounters = new EnumMap<>(TStatus.class); + for (TStatus t : TStatus.values()) { + txStateCounters.put(t, 0L); } opTypeCounters = Collections.emptyMap(); @@ -216,7 +218,7 @@ Builder withZkConnectionErrors(final long value) { return this; } - Builder withTxStateCounters(final Map txStateCounters) { + Builder withTxStateCounters(final EnumMap txStateCounters) { this.txStateCounters.putAll(txStateCounters); return this; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java index ebbbec43166..18e376e48ce 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java @@ -24,7 +24,7 @@ import static org.apache.accumulo.core.metrics.Metric.FATE_TX; import static org.apache.accumulo.core.metrics.Metric.FATE_TYPE_IN_PROGRESS; -import java.util.List; +import java.util.EnumMap; import java.util.Map.Entry; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -33,6 +33,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.ReadOnlyTStore; +import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; import org.apache.accumulo.core.fate.ZooStore; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -41,9 +42,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Metrics; -import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; public class FateMetrics implements MetricsProducer { @@ -60,16 +61,10 @@ public class FateMetrics implements MetricsProducer { private final String fateRootPath; private final long refreshDelay; - private AtomicLong totalCurrentOpsGauge; - private AtomicLong totalOpsGauge; - private AtomicLong fateErrorsGauge; - private AtomicLong newTxGauge; - private AtomicLong submittedTxGauge; - private AtomicLong inProgressTxGauge; - private AtomicLong failedInProgressTxGauge; - private AtomicLong failedTxGauge; - private AtomicLong successfulTxGauge; - private AtomicLong unknownTxGauge; + private final AtomicLong totalCurrentOpsCount = new AtomicLong(0); + private final AtomicLong totalOpsCount = new AtomicLong(0); + private final AtomicLong fateErrorsCount = new AtomicLong(0); + private final EnumMap txStatusCounters = new EnumMap<>(TStatus.class); public FateMetrics(final ServerContext context, final long minimumRefreshDelay) { @@ -88,6 +83,10 @@ public FateMetrics(final ServerContext context, final long minimumRefreshDelay) "FATE Metrics - Interrupt received while initializing zoo store"); } + for (TStatus status : TStatus.values()) { + txStatusCounters.put(status, new AtomicLong(0)); + } + } private void update() { @@ -95,70 +94,35 @@ private void update() { FateMetricValues metricValues = FateMetricValues.getFromZooKeeper(context, fateRootPath, zooStore); - totalCurrentOpsGauge.set(metricValues.getCurrentFateOps()); - totalOpsGauge.set(metricValues.getZkFateChildOpsTotal()); - fateErrorsGauge.set(metricValues.getZkConnectionErrors()); - - for (Entry vals : 
metricValues.getTxStateCounters().entrySet()) { - switch (ReadOnlyTStore.TStatus.valueOf(vals.getKey())) { - case NEW: - newTxGauge.set(vals.getValue()); - break; - case SUBMITTED: - submittedTxGauge.set(vals.getValue()); - break; - case IN_PROGRESS: - inProgressTxGauge.set(vals.getValue()); - break; - case FAILED_IN_PROGRESS: - failedInProgressTxGauge.set(vals.getValue()); - break; - case FAILED: - failedTxGauge.set(vals.getValue()); - break; - case SUCCESSFUL: - successfulTxGauge.set(vals.getValue()); - break; - case UNKNOWN: - unknownTxGauge.set(vals.getValue()); - break; - default: - log.warn("Unhandled status type: {}", vals.getKey()); + totalCurrentOpsCount.set(metricValues.getCurrentFateOps()); + totalOpsCount.set(metricValues.getZkFateChildOpsTotal()); + fateErrorsCount.set(metricValues.getZkConnectionErrors()); + + for (Entry entry : metricValues.getTxStateCounters().entrySet()) { + AtomicLong counter = txStatusCounters.get(entry.getKey()); + if (counter != null) { + counter.set(entry.getValue()); + } else { + log.warn("Unhandled TStatus: {}", entry.getKey()); } } - metricValues.getOpTypeCounters().forEach((name, count) -> { - Metrics.gauge(FATE_TYPE_IN_PROGRESS.getName(), Tags.of(OP_TYPE_TAG, name), count); - }); + metricValues.getOpTypeCounters().forEach((name, count) -> Metrics + .gauge(FATE_TYPE_IN_PROGRESS.getName(), Tags.of(OP_TYPE_TAG, name), count)); } @Override public void registerMetrics(final MeterRegistry registry) { - totalCurrentOpsGauge = registry.gauge(FATE_OPS.getName(), new AtomicLong(0)); - totalOpsGauge = registry.gauge(FATE_OPS_ACTIVITY.getName(), new AtomicLong(0)); - fateErrorsGauge = registry.gauge(FATE_ERRORS.getName(), - List.of(Tag.of("type", "zk.connection")), new AtomicLong(0)); - newTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.NEW.name().toLowerCase())), - new AtomicLong(0)); - submittedTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.SUBMITTED.name().toLowerCase())), - new AtomicLong(0)); - inProgressTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.IN_PROGRESS.name().toLowerCase())), - new AtomicLong(0)); - failedInProgressTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.FAILED_IN_PROGRESS.name().toLowerCase())), - new AtomicLong(0)); - failedTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.FAILED.name().toLowerCase())), - new AtomicLong(0)); - successfulTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.SUCCESSFUL.name().toLowerCase())), - new AtomicLong(0)); - unknownTxGauge = registry.gauge(FATE_TX.getName(), - List.of(Tag.of("state", ReadOnlyTStore.TStatus.UNKNOWN.name().toLowerCase())), - new AtomicLong(0)); + Gauge.builder(FATE_OPS.getName(), totalCurrentOpsCount, AtomicLong::get) + .description(FATE_OPS.getDescription()).register(registry); + Gauge.builder(FATE_OPS_ACTIVITY.getName(), totalOpsCount, AtomicLong::get) + .description(FATE_OPS_ACTIVITY.getDescription()).register(registry); + Gauge.builder(FATE_ERRORS.getName(), fateErrorsCount, AtomicLong::get) + .description(FATE_ERRORS.getDescription()).tags("type", "zk.connection").register(registry); + + txStatusCounters.forEach((status, counter) -> Gauge + .builder(FATE_TX.getName(), counter, AtomicLong::get).description(FATE_TX.getDescription()) + .tags("state", status.name().toLowerCase()).register(registry)); update(); diff 
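A self-contained sketch of the pattern FateMetrics now uses: one AtomicLong per enum state held in an EnumMap and exposed as a Micrometer gauge distinguished by a state tag. The Status enum and the metric name below are placeholders, not Accumulo's TStatus or Metric constants.

    import java.util.EnumMap;
    import java.util.concurrent.atomic.AtomicLong;

    import io.micrometer.core.instrument.Gauge;
    import io.micrometer.core.instrument.MeterRegistry;
    import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

    public class EnumStatusGaugesSketch {

      enum Status { NEW, IN_PROGRESS, SUCCESSFUL, FAILED } // stand-in for TStatus

      private final EnumMap<Status,AtomicLong> counters = new EnumMap<>(Status.class);

      void register(MeterRegistry registry) {
        for (Status status : Status.values()) {
          counters.put(status, new AtomicLong(0));
        }
        // one gauge per state, distinguished by a "state" tag instead of a field per state
        counters.forEach((status, counter) -> Gauge.builder("fate.tx", counter, AtomicLong::get)
            .tags("state", status.name().toLowerCase()).register(registry));
      }

      void update(EnumMap<Status,Long> latest) {
        latest.forEach((status, value) -> counters.get(status).set(value));
      }

      public static void main(String[] args) {
        new EnumStatusGaugesSketch().register(new SimpleMeterRegistry());
      }
    }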
--git a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java index dce39179103..7619e49bd2c 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java @@ -27,8 +27,15 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpRequest.BodyPublishers; +import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandlers; import java.util.Optional; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import org.apache.accumulo.core.Constants; @@ -45,6 +52,7 @@ import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.config.Property; import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute; import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -59,33 +67,85 @@ public class AccumuloMonitorAppender extends AbstractAppender { @PluginBuilderFactory - public static > B newBuilder() { - return new Builder().asBuilder(); + public static Builder newBuilder() { + return new Builder(); } - public static class Builder> extends AbstractAppender.Builder + public static class Builder extends AbstractAppender.Builder implements org.apache.logging.log4j.core.util.Builder { + @PluginBuilderAttribute + private boolean async = true; + + @PluginBuilderAttribute + private int queueSize = 1024; + + @PluginBuilderAttribute + private int maxThreads = 2; + + public Builder setAsync(boolean async) { + this.async = async; + return this; + } + + public boolean getAsync() { + return async; + } + + public Builder setQueueSize(int size) { + queueSize = size; + return this; + } + + public int getQueueSize() { + return queueSize; + } + + public Builder setMaxThreads(int maxThreads) { + this.maxThreads = maxThreads; + return this; + } + + public int getMaxThreads() { + return maxThreads; + } + @Override public AccumuloMonitorAppender build() { return new AccumuloMonitorAppender(getName(), getFilter(), isIgnoreExceptions(), - getPropertyArray()); + getPropertyArray(), getQueueSize(), getMaxThreads(), getAsync()); } } - private final HttpClient httpClient = HttpClient.newHttpClient(); + private final HttpClient httpClient; private final Supplier> monitorLocator; + private final ThreadPoolExecutor executor; + private final boolean async; + private final int queueSize; + private final AtomicLong appends = new AtomicLong(0); + private final AtomicLong discards = new AtomicLong(0); + private final AtomicLong errors = new AtomicLong(0); + private final ConcurrentMap statusCodes = new ConcurrentSkipListMap<>(); private ServerContext context; private String path; private Pair> lastResult = new Pair<>(0L, Optional.empty()); private AccumuloMonitorAppender(final String name, final Filter filter, - final boolean ignoreExceptions, final Property[] properties) { + final boolean ignoreExceptions, final Property[] properties, int 
queueSize, int maxThreads, + boolean async) { super(name, filter, null, ignoreExceptions, properties); - final ZcStat stat = new ZcStat(); - monitorLocator = () -> { + + this.executor = async ? new ThreadPoolExecutor(0, maxThreads, 30, TimeUnit.SECONDS, + new LinkedBlockingQueue()) : null; + final var builder = HttpClient.newBuilder(); + this.httpClient = (async ? builder.executor(executor) : builder).build(); + this.queueSize = queueSize; + this.async = async; + + final var stat = new ZcStat(); + this.monitorLocator = () -> { // lazily set up context/path if (context == null) { context = new ServerContext(SiteConfiguration.auto()); @@ -104,9 +164,24 @@ private AccumuloMonitorAppender(final String name, final Filter filter, }; } + private String getStats() { + return "discards:" + discards.get() + " errors:" + errors.get() + " appends:" + appends.get() + + " statusCodes:" + statusCodes; + } + + private void processResponse(HttpResponse response) { + var statusCode = response.statusCode(); + statusCodes.computeIfAbsent(statusCode, sc -> new AtomicLong()).getAndIncrement(); + if (statusCode >= 400 && statusCode < 600) { + error("Unable to send HTTP in appender [" + getName() + "]. Status: " + statusCode + " " + + getStats()); + } + } + @Override public void append(final LogEvent event) { - monitorLocator.get().ifPresent(uri -> { + appends.getAndIncrement(); + monitorLocator.get().ifPresentOrElse(uri -> { try { var pojo = new SingleLogEvent(); pojo.timestamp = event.getTimeMillis(); @@ -120,14 +195,45 @@ public void append(final LogEvent event) { var req = HttpRequest.newBuilder(uri).POST(BodyPublishers.ofString(jsonEvent, UTF_8)) .setHeader("Content-Type", "application/json").build(); - @SuppressWarnings("unused") - var future = httpClient.sendAsync(req, BodyHandlers.discarding()); + + if (async) { + if (executor.getQueue().size() < queueSize) { + httpClient.sendAsync(req, BodyHandlers.discarding()).thenAccept(this::processResponse) + .exceptionally(e -> { + errors.getAndIncrement(); + error("Unable to send HTTP in appender [" + getName() + "] " + getStats(), event, + e); + return null; + }); + } else { + discards.getAndIncrement(); + error("Unable to send HTTP in appender [" + getName() + "]. Queue full. " + getStats()); + } + } else { + processResponse(httpClient.send(req, BodyHandlers.discarding())); + } } catch (final Exception e) { - error("Unable to send HTTP in appender [" + getName() + "]", event, e); + errors.getAndIncrement(); + error("Unable to send HTTP in appender [" + getName() + "] " + getStats(), event, e); } + }, () -> { + discards.getAndIncrement(); + error("Unable to send HTTP in appender [" + getName() + "]. No monitor is running. 
" + + getStats()); }); } + @Override + protected boolean stop(long timeout, TimeUnit timeUnit, boolean changeLifeCycleState) { + if (changeLifeCycleState) { + setStopping(); + } + if (executor != null) { + executor.shutdown(); + } + return super.stop(timeout, timeUnit, changeLifeCycleState); + } + @SuppressFBWarnings(value = "INFORMATION_EXPOSURE_THROUGH_AN_ERROR_MESSAGE", justification = "throwable is intended to be printed to output stream, to send to monitor") private static String throwableToStacktrace(Throwable t) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/BlockCacheMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/BlockCacheMetrics.java index b6325654d85..987abc12e7c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/BlockCacheMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/BlockCacheMetrics.java @@ -55,26 +55,26 @@ public void registerMetrics(MeterRegistry registry) { ToDoubleFunction getEvictionCount = cache -> cache.getStats().evictionCount(); FunctionCounter.builder(BLOCKCACHE_INDEX_HITCOUNT.getName(), indexCache, getHitCount) - .description("Index block cache hit count").register(registry); + .description(BLOCKCACHE_INDEX_HITCOUNT.getDescription()).register(registry); FunctionCounter.builder(BLOCKCACHE_INDEX_REQUESTCOUNT.getName(), indexCache, getRequestCount) - .description("Index block cache request count").register(registry); + .description(BLOCKCACHE_INDEX_REQUESTCOUNT.getDescription()).register(registry); FunctionCounter.builder(BLOCKCACHE_INDEX_EVICTIONCOUNT.getName(), indexCache, getEvictionCount) - .description("Index block cache eviction count").register(registry); + .description(BLOCKCACHE_INDEX_EVICTIONCOUNT.getDescription()).register(registry); FunctionCounter.builder(BLOCKCACHE_DATA_HITCOUNT.getName(), dataCache, getHitCount) - .description("Data block cache hit count").register(registry); + .description(BLOCKCACHE_DATA_HITCOUNT.getDescription()).register(registry); FunctionCounter.builder(BLOCKCACHE_DATA_REQUESTCOUNT.getName(), dataCache, getRequestCount) - .description("Data block cache request count").register(registry); + .description(BLOCKCACHE_DATA_REQUESTCOUNT.getDescription()).register(registry); FunctionCounter.builder(BLOCKCACHE_DATA_EVICTIONCOUNT.getName(), dataCache, getEvictionCount) - .description("Data block cache eviction count").register(registry); + .description(BLOCKCACHE_DATA_EVICTIONCOUNT.getDescription()).register(registry); FunctionCounter.builder(BLOCKCACHE_SUMMARY_HITCOUNT.getName(), summaryCache, getHitCount) - .description("Summary block cache hit count").register(registry); + .description(BLOCKCACHE_SUMMARY_HITCOUNT.getDescription()).register(registry); FunctionCounter .builder(BLOCKCACHE_SUMMARY_REQUESTCOUNT.getName(), summaryCache, getRequestCount) - .description("Summary block cache request count").register(registry); + .description(BLOCKCACHE_SUMMARY_REQUESTCOUNT.getDescription()).register(registry); FunctionCounter .builder(BLOCKCACHE_SUMMARY_EVICTIONCOUNT.getName(), summaryCache, getEvictionCount) - .description("Summary block cache eviction count").register(registry); + .description(BLOCKCACHE_SUMMARY_EVICTIONCOUNT.getDescription()).register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 964ddedb4ba..4e105b2320c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -754,7 +754,7 @@ ScanReservation reserveFilesInstrumented(long scanId) throws NoSuchScanIDExcepti } protected ScanReservation reserveFiles(long scanId) throws NoSuchScanIDException { - var session = (ScanSession) sessionManager.getSession(scanId); + var session = (ScanSession) sessionManager.getSession(scanId); if (session == null) { throw new NoSuchScanIDException(); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java index 12d85f1d22d..1f558021e0a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java @@ -56,17 +56,15 @@ public ScanServerMetrics(final LoadingCache tabletMeta @Override public void registerMetrics(MeterRegistry registry) { totalReservationTimer = Timer.builder(SCAN_RESERVATION_TOTAL_TIMER.getName()) - .description("Time to reserve a tablets files for scan").register(registry); + .description(SCAN_RESERVATION_TOTAL_TIMER.getDescription()).register(registry); writeOutReservationTimer = Timer.builder(SCAN_RESERVATION_WRITEOUT_TIMER.getName()) - .description("Time to write out a tablets file reservations for scan").register(registry); + .description(SCAN_RESERVATION_WRITEOUT_TIMER.getDescription()).register(registry); FunctionCounter.builder(SCAN_BUSY_TIMEOUT_COUNT.getName(), busyTimeoutCount, AtomicLong::get) - .description("The number of scans where a busy timeout happened").register(registry); + .description(SCAN_BUSY_TIMEOUT_COUNT.getDescription()).register(registry); FunctionCounter .builder(SCAN_RESERVATION_CONFLICT_COUNTER.getName(), reservationConflictCount, AtomicLong::get) - .description( - "Counts instances where file reservation attempts for scans encountered conflicts") - .register(registry); + .description(SCAN_RESERVATION_CONFLICT_COUNTER.getDescription()).register(registry); if (tabletMetadataCache != null) { Preconditions.checkState(tabletMetadataCache.policy().isRecordingStats(), diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java index 184b628ad2e..b9faa8c8f76 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/ResolvedSortedLog.java @@ -32,8 +32,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Write ahead logs have two paths in DFS. 
There is the path of the original unsorted walog and the @@ -42,8 +40,6 @@ */ public class ResolvedSortedLog { - private static final Logger log = LoggerFactory.getLogger(ResolvedSortedLog.class); - private final SortedSet children; private final LogEntry origin; private final Path sortedLogDir; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java index 42623f70271..b1c361f9aa8 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java @@ -19,7 +19,6 @@ package org.apache.accumulo.tserver.memory; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map.Entry; import java.util.TreeMap; @@ -56,8 +55,6 @@ public class LargestFirstMemoryManager { // The fraction of memory that needs to be used before we begin flushing. private double compactionThreshold; private long maxObserved; - private final HashMap mincIdleThresholds = new HashMap<>(); - private final HashMap mincAgeThresholds = new HashMap<>(); private ServerContext context = null; private static class TabletInfo { @@ -140,15 +137,13 @@ public LargestFirstMemoryManager() { @SuppressWarnings("deprecation") protected long getMinCIdleThreshold(KeyExtent extent) { - TableId tableId = extent.tableId(); - return mincIdleThresholds.computeIfAbsent(tableId, tid -> context.getTableConfiguration(tid) - .getTimeInMillis(Property.TABLE_MINC_COMPACT_IDLETIME)); + return context.getTableConfiguration(extent.tableId()) + .getTimeInMillis(Property.TABLE_MINC_COMPACT_IDLETIME); } protected long getMaxAge(KeyExtent extent) { - TableId tableId = extent.tableId(); - return mincAgeThresholds.computeIfAbsent(tableId, tid -> context.getTableConfiguration(tid) - .getTimeInMillis(Property.TABLE_MINC_COMPACT_MAXAGE)); + return context.getTableConfiguration(extent.tableId()) + .getTimeInMillis(Property.TABLE_MINC_COMPACT_MAXAGE); } protected boolean tableExists(TableId tableId) { @@ -168,9 +163,6 @@ public List tabletsToMinorCompact(List tablets) { final int maxMinCs = maxConcurrentMincs * numWaitingMultiplier; - mincIdleThresholds.clear(); - mincAgeThresholds.clear(); - final List tabletsToMinorCompact = new ArrayList<>(); LargestMap largestMemTablets = new LargestMap(maxMinCs); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java index 26bbd11adbf..7f4567d307d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java @@ -67,61 +67,59 @@ private long getTotalEntriesWritten() { public void registerMetrics(MeterRegistry registry) { FunctionCounter .builder(COMPACTOR_ENTRIES_READ.getName(), this, TabletServerMetrics::getTotalEntriesRead) - .description("Number of entries read by all compactions that have run on this tserver") - .register(registry); + .description(COMPACTOR_ENTRIES_READ.getDescription()).register(registry); FunctionCounter .builder(COMPACTOR_ENTRIES_WRITTEN.getName(), this, TabletServerMetrics::getTotalEntriesWritten) - .description("Number of entries written by all compactions that have run on this tserver") - .register(registry); + 
.description(COMPACTOR_ENTRIES_WRITTEN.getDescription()).register(registry); LongTaskTimer timer = LongTaskTimer.builder(TSERVER_MAJC_STUCK.getName()) - .description("Number and duration of stuck major compactions").register(registry); + .description(TSERVER_MAJC_STUCK.getDescription()).register(registry); CompactionWatcher.setTimer(timer); Gauge .builder(TSERVER_TABLETS_LONG_ASSIGNMENTS.getName(), util, TabletServerMetricsUtil::getLongTabletAssignments) - .description("Number of tablet assignments that are taking a long time").register(registry); + .description(TSERVER_TABLETS_LONG_ASSIGNMENTS.getDescription()).register(registry); Gauge.builder(TSERVER_ENTRIES.getName(), util, TabletServerMetricsUtil::getEntries) - .description("Number of entries").register(registry); + .description(TSERVER_ENTRIES.getDescription()).register(registry); Gauge.builder(TSERVER_MEM_ENTRIES.getName(), util, TabletServerMetricsUtil::getEntriesInMemory) - .description("Number of entries in memory").register(registry); + .description(TSERVER_MEM_ENTRIES.getDescription()).register(registry); Gauge .builder(TSERVER_MAJC_RUNNING.getName(), util, TabletServerMetricsUtil::getMajorCompactions) - .description("Number of active major compactions").register(registry); + .description(TSERVER_MAJC_RUNNING.getDescription()).register(registry); Gauge .builder(TSERVER_MAJC_QUEUED.getName(), util, TabletServerMetricsUtil::getMajorCompactionsQueued) - .description("Number of queued major compactions").register(registry); + .description(TSERVER_MAJC_QUEUED.getDescription()).register(registry); Gauge .builder(TSERVER_MINC_RUNNING.getName(), util, TabletServerMetricsUtil::getMinorCompactions) - .description("Number of active minor compactions").register(registry); + .description(TSERVER_MINC_RUNNING.getDescription()).register(registry); Gauge .builder(TSERVER_MINC_QUEUED.getName(), util, TabletServerMetricsUtil::getMinorCompactionsQueued) - .description("Number of queued minor compactions").register(registry); + .description(TSERVER_MINC_QUEUED.getDescription()).register(registry); Gauge.builder(TSERVER_TABLETS_ONLINE.getName(), util, TabletServerMetricsUtil::getOnlineCount) - .description("Number of online tablets").register(registry); + .description(TSERVER_TABLETS_ONLINE.getDescription()).register(registry); Gauge.builder(TSERVER_TABLETS_OPENING.getName(), util, TabletServerMetricsUtil::getOpeningCount) - .description("Number of opening tablets").register(registry); + .description(TSERVER_TABLETS_OPENING.getDescription()).register(registry); Gauge .builder(TSERVER_TABLETS_UNOPENED.getName(), util, TabletServerMetricsUtil::getUnopenedCount) - .description("Number of unopened tablets").register(registry); + .description(TSERVER_TABLETS_UNOPENED.getDescription()).register(registry); Gauge .builder(TSERVER_MINC_TOTAL.getName(), util, TabletServerMetricsUtil::getTotalMinorCompactions) - .description("Total number of minor compactions performed").register(registry); + .description(TSERVER_MINC_TOTAL.getDescription()).register(registry); Gauge .builder(TSERVER_TABLETS_FILES.getName(), util, TabletServerMetricsUtil::getAverageFilesPerTablet) - .description("Number of files per tablet").register(registry); + .description(TSERVER_TABLETS_FILES.getDescription()).register(registry); Gauge.builder(TSERVER_HOLD.getName(), util, TabletServerMetricsUtil::getHoldTime) - .description("Time commits held").register(registry); + .description(TSERVER_HOLD.getDescription()).register(registry); Gauge.builder(TSERVER_INGEST_MUTATIONS.getName(), util, 
TabletServerMetricsUtil::getIngestCount) - .description("Ingest rate (entries/sec)").register(registry); + .description(TSERVER_INGEST_MUTATIONS.getDescription()).register(registry); Gauge.builder(TSERVER_INGEST_BYTES.getName(), util, TabletServerMetricsUtil::getIngestByteCount) - .description("Ingest rate (bytes/sec)").register(registry); + .description(TSERVER_INGEST_BYTES.getDescription()).register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java index 7bc3e5bc856..0ce05e77838 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java @@ -44,11 +44,11 @@ public void addQueued(long value) { @Override public void registerMetrics(MeterRegistry registry) { - activeMinc = Timer.builder(MINC_RUNNING.getName()).description("Minor compactions time active") + activeMinc = Timer.builder(MINC_RUNNING.getName()).description(MINC_RUNNING.getDescription()) .register(registry); - queuedMinc = Timer.builder(MINC_QUEUED.getName()) - .description("Queued minor compactions time queued").register(registry); + queuedMinc = Timer.builder(MINC_QUEUED.getName()).description(MINC_QUEUED.getDescription()) + .register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java index d5848722cba..e5b40c98d9f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java @@ -138,40 +138,39 @@ public long getZombieThreadsCount() { @Override public void registerMetrics(MeterRegistry registry) { Gauge.builder(SCAN_OPEN_FILES.getName(), openFiles::get) - .description("Number of files open for scans").register(registry); - scans = Timer.builder(SCAN_TIMES.getName()).description("Scans").register(registry); + .description(SCAN_OPEN_FILES.getDescription()).register(registry); + scans = Timer.builder(SCAN_TIMES.getName()).description(SCAN_TIMES.getDescription()) + .register(registry); resultsPerScan = DistributionSummary.builder(SCAN_RESULTS.getName()) - .description("Results per scan").register(registry); - yields = - DistributionSummary.builder(SCAN_YIELDS.getName()).description("yields").register(registry); + .description(SCAN_RESULTS.getDescription()).register(registry); + yields = DistributionSummary.builder(SCAN_YIELDS.getName()) + .description(SCAN_YIELDS.getDescription()).register(registry); FunctionCounter.builder(SCAN_START.getName(), this.startScanCalls, AtomicLong::get) - .description("calls to start a scan / multiscan").register(registry); + .description(SCAN_START.getDescription()).register(registry); FunctionCounter.builder(SCAN_CONTINUE.getName(), this.continueScanCalls, AtomicLong::get) - .description("calls to continue a scan / multiscan").register(registry); + .description(SCAN_CONTINUE.getDescription()).register(registry); FunctionCounter.builder(SCAN_CLOSE.getName(), this.closeScanCalls, AtomicLong::get) - .description("calls to close a scan / multiscan").register(registry); + .description(SCAN_CLOSE.getDescription()).register(registry); FunctionCounter 
.builder(SCAN_BUSY_TIMEOUT_COUNT.getName(), this.busyTimeoutCount, AtomicLong::get) - .description("The number of scans where a busy timeout happened").register(registry); + .description(SCAN_BUSY_TIMEOUT_COUNT.getDescription()).register(registry); FunctionCounter.builder(SCAN_QUERIES.getName(), this.lookupCount, LongAdder::sum) - .description("Number of queries").register(registry); + .description(SCAN_QUERIES.getDescription()).register(registry); FunctionCounter.builder(SCAN_SCANNED_ENTRIES.getName(), this.scannedCount, LongAdder::sum) - .description("Scanned rate").register(registry); + .description(SCAN_SCANNED_ENTRIES.getDescription()).register(registry); FunctionCounter.builder(SCAN_PAUSED_FOR_MEM.getName(), this.pausedForMemory, AtomicLong::get) - .description("scan paused due to server being low on memory").register(registry); + .description(SCAN_PAUSED_FOR_MEM.getDescription()).register(registry); FunctionCounter .builder(SCAN_RETURN_FOR_MEM.getName(), this.earlyReturnForMemory, AtomicLong::get) - .description("scan returned results early due to server being low on memory") - .register(registry); + .description(SCAN_RETURN_FOR_MEM.getDescription()).register(registry); Gauge.builder(SCAN_QUERY_SCAN_RESULTS.getName(), this.queryResultCount, LongAdder::sum) - .description("Query rate (entries/sec)").register(registry); + .description(SCAN_QUERY_SCAN_RESULTS.getDescription()).register(registry); Gauge.builder(SCAN_QUERY_SCAN_RESULTS_BYTES.getName(), this.queryResultBytes, LongAdder::sum) - .description("Query rate (bytes/sec)").register(registry); + .description(SCAN_QUERY_SCAN_RESULTS_BYTES.getDescription()).register(registry); Gauge .builder(SCAN_ZOMBIE_THREADS.getName(), this, TabletServerScanMetrics::getZombieThreadsCount) - .description("Number of scan threads that have no associated client session") - .register(registry); + .description(SCAN_ZOMBIE_THREADS.getDescription()).register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java index 37beff251c3..52453916cce 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java @@ -76,21 +76,21 @@ public void addMutationArraySize(long value) { @Override public void registerMetrics(MeterRegistry registry) { FunctionCounter.builder(UPDATE_ERRORS.getName(), permissionErrorsCount, AtomicLong::get) - .tags("type", "permission").description("Counts permission errors").register(registry); + .tags("type", "permission").description(UPDATE_ERRORS.getDescription()).register(registry); FunctionCounter.builder(UPDATE_ERRORS.getName(), unknownTabletErrorsCount, AtomicLong::get) - .tags("type", "unknown.tablet").description("Counts unknown tablet errors") + .tags("type", "unknown.tablet").description(UPDATE_ERRORS.getDescription()) .register(registry); FunctionCounter.builder(UPDATE_ERRORS.getName(), constraintViolationsCount, AtomicLong::get) - .tags("type", "constraint.violation").description("Counts constraint violations") + .tags("type", "constraint.violation").description(UPDATE_ERRORS.getDescription()) .register(registry); commitPrepStat = Timer.builder(UPDATE_COMMIT_PREP.getName()) - .description("preparing to commit mutations").register(registry); + .description(UPDATE_COMMIT_PREP.getDescription()).register(registry); 
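The recurring change in the metrics classes above moves hard-coded description strings into the Metric enum so that a metric's name and description are defined in one place. A minimal illustration using a placeholder enum rather than Accumulo's actual Metric type:

    import java.util.concurrent.atomic.AtomicLong;

    import io.micrometer.core.instrument.Gauge;
    import io.micrometer.core.instrument.MeterRegistry;
    import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

    public class MetricDescriptionSketch {

      // placeholder for a centrally defined metric catalog
      enum AppMetric {
        QUEUE_DEPTH("app.queue.depth", "Number of items waiting in the queue");

        private final String name;
        private final String description;

        AppMetric(String name, String description) {
          this.name = name;
          this.description = description;
        }

        String metricName() { return name; }
        String description() { return description; }
      }

      public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();
        AtomicLong depth = new AtomicLong();
        // the builder pulls both name and description from the enum constant
        Gauge.builder(AppMetric.QUEUE_DEPTH.metricName(), depth, AtomicLong::get)
            .description(AppMetric.QUEUE_DEPTH.description()).register(registry);
      }
    }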
walogWriteTimeStat = Timer.builder(UPDATE_WALOG_WRITE.getName()) - .description("writing mutations to WAL").register(registry); - commitTimeStat = Timer.builder(UPDATE_COMMIT.getName()).description("committing mutations") - .register(registry); + .description(UPDATE_WALOG_WRITE.getDescription()).register(registry); + commitTimeStat = Timer.builder(UPDATE_COMMIT.getName()) + .description(UPDATE_COMMIT.getDescription()).register(registry); mutationArraySizeStat = DistributionSummary.builder(UPDATE_MUTATION_ARRAY_SIZE.getName()) - .description("mutation array").register(registry); + .description(UPDATE_MUTATION_ARRAY_SIZE.getDescription()).register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index e2a4320d6be..564f25c019d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@ -340,7 +340,7 @@ private long countZombieScans(long reportTimeMillis) { return Stream.concat(deferredCleanupQueue.stream(), sessions.values().stream()) .filter(session -> { if (session instanceof ScanSession) { - var scanSession = (ScanSession) session; + var scanSession = (ScanSession) session; synchronized (scanSession) { var scanTask = scanSession.getScanTask(); if (scanTask != null && scanSession.getState() == State.REMOVED diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java index add98ce3463..781508318e8 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java @@ -35,8 +35,6 @@ import org.apache.accumulo.tserver.TabletHostingServer; import org.apache.accumulo.tserver.TabletServerResourceManager; import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A tablet that can not be written to and operates off of a snapshot of tablet metadata for its @@ -45,8 +43,6 @@ */ public class SnapshotTablet extends TabletBase { - private static final Logger log = LoggerFactory.getLogger(SnapshotTablet.class); - private final TabletHostingServer server; private final SortedMap files; private final TabletServerResourceManager.TabletResourceManager tabletResources; diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java index 8f6b9259ae8..62dd93f2697 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java @@ -32,13 +32,10 @@ import org.apache.accumulo.core.client.admin.ActiveCompaction; import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.util.DurationFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.accumulo.shell.Shell; class ActiveCompactionHelper { - private static final Logger log = LoggerFactory.getLogger(ActiveCompactionHelper.class); - private static final Comparator COMPACTION_AGE_DESCENDING = Comparator.comparingLong(ActiveCompaction::getAge).reversed(); @@ -120,7 +117,7 @@ 
public static Stream activeCompactionsForServer(String tserver, return instanceOps.getActiveCompactions(tserver).stream().sorted(COMPACTION_AGE_DESCENDING) .map(ActiveCompactionHelper::formatActiveCompactionLine); } catch (Exception e) { - log.debug("Failed to list active compactions for server {}", tserver, e); + Shell.log.debug("Failed to list active compactions for server {}", tserver, e); return Stream.of(tserver + " ERROR " + e.getMessage()); } } diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/DeleteTableCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/DeleteTableCommand.java index 86cd8aa27e9..7d0c90af366 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/DeleteTableCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/DeleteTableCommand.java @@ -28,11 +28,8 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class DeleteTableCommand extends TableOperation { - private static final Logger log = LoggerFactory.getLogger(DeleteTableCommand.class); private Option forceOpt; @@ -78,7 +75,7 @@ protected void pruneTables(Set tables) { String table = tableNames.next(); Pair qualifiedName = TableNameUtil.qualify(table); if (Namespace.ACCUMULO.name().equals(qualifiedName.getFirst())) { - log.trace("Removing table from deletion set: {}", table); + Shell.log.trace("Removing table from deletion set: {}", table); tableNames.remove(); } } diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ListTabletsCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ListTabletsCommand.java index 71b2a18b75d..93adcfe10a3 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/ListTabletsCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ListTabletsCommand.java @@ -47,8 +47,6 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -58,8 +56,6 @@ */ public class ListTabletsCommand extends Command { - private static final Logger log = LoggerFactory.getLogger(ListTabletsCommand.class); - private Option outputFileOpt; private Option optTablePattern; private Option optHumanReadable; @@ -70,7 +66,7 @@ public class ListTabletsCommand extends Command { public int execute(String fullCommand, CommandLine cl, Shell shellState) throws Exception { final Set tableInfoSet = populateTables(cl, shellState); if (tableInfoSet.isEmpty()) { - log.warn("No tables found that match your criteria"); + Shell.log.warn("No tables found that match your criteria"); return 0; } boolean humanReadable = cl.hasOption(optHumanReadable.getOpt()); @@ -149,7 +145,7 @@ private Set populateTables(final CommandLine cl, final Shell shellSta TableId id = TableId.of(tableIdString); tableSet.add(new TableInfo(name, id)); } else { - log.warn("Table not found: {}", name); + Shell.log.warn("Table not found: {}", name); } }); return tableSet; @@ -162,7 +158,7 @@ private Set populateTables(final CommandLine cl, final Shell shellSta TableId id = TableId.of(idString); tableSet.add(new TableInfo(table, id)); } else { - log.warn("Table not found: {}", table); + Shell.log.warn("Table not found: {}", table); } return tableSet; } @@ -216,14 +212,14 @@ public int hashCode() { private List getTabletRowInfo(Shell shellState, 
TableInfo tableInfo) throws Exception { - log.trace("scan metadata for tablet info table name: \'{}\', tableId: \'{}\' ", tableInfo.name, - tableInfo.id); + Shell.log.trace("scan metadata for tablet info table name: \'{}\', tableId: \'{}\' ", + tableInfo.name, tableInfo.id); List tResults = getMetadataInfo(shellState, tableInfo); - if (log.isTraceEnabled()) { + if (Shell.log.isTraceEnabled()) { for (TabletRowInfo tabletRowInfo : tResults) { - log.trace("Tablet info: {}", tabletRowInfo); + Shell.log.trace("Tablet info: {}", tabletRowInfo); } } diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/MaxRowCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/MaxRowCommand.java index 044957d02d5..cd967f445dc 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/MaxRowCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/MaxRowCommand.java @@ -23,13 +23,9 @@ import org.apache.accumulo.shell.Shell; import org.apache.commons.cli.CommandLine; import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class MaxRowCommand extends ScanCommand { - private static final Logger log = LoggerFactory.getLogger(MaxRowCommand.class); - @Override public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws Exception { @@ -47,7 +43,7 @@ public int execute(final String fullCommand, final CommandLine cl, final Shell s shellState.getWriter().println(max); } } catch (Exception e) { - log.debug("Could not get shell state.", e); + Shell.log.debug("Could not get shell state.", e); } return 0; diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ShellPluginConfigurationCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ShellPluginConfigurationCommand.java index 3614c621db6..06e3dcd19b2 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/ShellPluginConfigurationCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ShellPluginConfigurationCommand.java @@ -34,7 +34,6 @@ import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; -import org.slf4j.LoggerFactory; public abstract class ShellPluginConfigurationCommand extends Command { private Option removePluginOption, pluginClassOption, listPluginOption; @@ -115,20 +114,16 @@ public static Class getPluginClass(final String tableName, pluginClazz = shellState.getClassLoader(cl, shellState).loadClass(ent.getValue()).asSubclass(clazz); } catch (ClassNotFoundException e) { - LoggerFactory.getLogger(ShellPluginConfigurationCommand.class).error("Class not found {}", - e.getMessage()); + Shell.log.error("Class not found {}", e.getMessage()); return null; } catch (ParseException e) { - LoggerFactory.getLogger(ShellPluginConfigurationCommand.class) - .error("Error parsing table: {} {}", Arrays.toString(args), e.getMessage()); + Shell.log.error("Error parsing table: {} {}", Arrays.toString(args), e.getMessage()); return null; } catch (TableNotFoundException e) { - LoggerFactory.getLogger(ShellPluginConfigurationCommand.class) - .error("Table not found: {} {}", tableName, e.getMessage()); + Shell.log.error("Table not found: {} {}", tableName, e.getMessage()); return null; } catch (Exception e) { - LoggerFactory.getLogger(ShellPluginConfigurationCommand.class).error("Error: {}", - e.getMessage()); + Shell.log.error("Error: {}", e.getMessage()); return null; } diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java new file mode 100644 index 00000000000..9769f4c10e4 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.UncheckedIOException; +import java.util.List; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.rpc.clients.TServerClient; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class DebugClientConnectionIT extends AccumuloClusterHarness { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(2); + } + + private List tservers = null; + + @BeforeEach + public void getTServerAddresses() { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + tservers = client.instanceOperations().getTabletServers(); + } + assertNotNull(tservers); + assertEquals(2, tservers.size()); + } + + @Test + public void testPreferredConnection() throws Exception { + System.setProperty(TServerClient.DEBUG_HOST, tservers.get(0)); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + assertNotNull(client.instanceOperations().getSiteConfiguration()); + } + System.setProperty(TServerClient.DEBUG_HOST, tservers.get(1)); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + assertNotNull(client.instanceOperations().getSiteConfiguration()); + } + System.setProperty(TServerClient.DEBUG_HOST, "localhost:1"); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + assertThrows(UncheckedIOException.class, + () -> client.instanceOperations().getSiteConfiguration()); + } + } +}
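Since the new IT pins client RPCs through a JVM-wide system property and finishes with it pointing at an unreachable address, clearing the property after each test would keep the override from leaking into other tests in the same JVM; a possible addition, not part of the change above:

    import org.apache.accumulo.core.rpc.clients.TServerClient;
    import org.junit.jupiter.api.AfterEach;

    public class DebugHostCleanupSketch {

      @AfterEach
      public void clearDebugHost() {
        // reset the JVM-wide override so later tests connect normally
        System.clearProperty(TServerClient.DEBUG_HOST);
      }
    }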