diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
index 89ed19c23..bb9f8828f 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
@@ -69,6 +69,7 @@ import org.opensearch.alerting.transport.TransportIndexMonitorAction
 import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
 import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
 import org.opensearch.alerting.transport.TransportSearchMonitorAction
+import org.opensearch.alerting.util.ClusterMetricsCoordinator
 import org.opensearch.alerting.util.DocLevelMonitorQueries
 import org.opensearch.alerting.util.destinationmigration.DestinationMigrationCoordinator
 import org.opensearch.client.Client
@@ -110,6 +111,7 @@ import java.util.function.Supplier
  * It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY], [QueryLevelTrigger.XCONTENT_REGISTRY],
  * [BucketLevelTrigger.XCONTENT_REGISTRY], [ClusterMetricsInput.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
  */
+
 internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, PercolatorPluginExt() {

     override fun getContextWhitelists(): Map<ScriptContext<*>, List<Whitelist>> {
@@ -141,6 +143,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
     lateinit var alertIndices: AlertIndices
     lateinit var clusterService: ClusterService
     lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator
+    lateinit var clusterMetricsCoordinator: ClusterMetricsCoordinator

     override fun getRestHandlers(
         settings: Settings,
@@ -212,10 +215,12 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
         nodeEnvironment: NodeEnvironment,
         namedWriteableRegistry: NamedWriteableRegistry,
         indexNameExpressionResolver: IndexNameExpressionResolver,
-        repositoriesServiceSupplier: Supplier<RepositoriesService>
+        repositoriesServiceSupplier: Supplier<RepositoriesService>,
     ): Collection<Any> {
         // Need to figure out how to use the OpenSearch DI classes rather than handwiring things here.
         val settings = environment.settings()
+
         alertIndices = AlertIndices(settings, client, threadPool, clusterService)
         runner = MonitorRunnerService
             .registerClusterService(clusterService)
@@ -236,9 +241,18 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
         scheduler = JobScheduler(threadPool, runner)
         sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
         destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool, scheduledJobIndices)
+        clusterMetricsCoordinator = ClusterMetricsCoordinator(settings, client, clusterService, threadPool)
         this.threadPool = threadPool
         this.clusterService = clusterService
-        return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
+        return listOf(
+            sweeper,
+            scheduler,
+            runner,
+            scheduledJobIndices,
+            docLevelMonitorQueries,
+            destinationMigrationCoordinator,
+            clusterMetricsCoordinator
+        )
     }

     override fun getSettings(): List<Setting<*>> {
@@ -300,7 +314,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
             AlertingSettings.FINDING_HISTORY_MAX_DOCS,
             AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE,
             AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
-            AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD
+            AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
+            AlertingSettings.METRICS_STORE_TIME,
+            AlertingSettings.METRICS_EXECUTION_FREQUENCY
         )
     }
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/ClusterMetricsDataPoint.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/ClusterMetricsDataPoint.kt
new file mode 100644
index 000000000..1404c2d08
--- /dev/null
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/ClusterMetricsDataPoint.kt
@@ -0,0 +1,72 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.alerting.model
+
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.XContentBuilder
+
+data class ClusterMetricsDataPoint(
+    var metric: MetricType,
+    var timestamp: String,
+    var value: String,
+    var minimum: String? = null,
+    var maximum: String? = null
+) : ToXContent {
+
+    companion object {
+        val TIMESTAMP_FIELD = "timestamp"
+        val VALUE_FIELD = "value"
+        val MINIMUM_FIELD = "minimum"
+        val MAXIMUM_FIELD = "maximum"
+    }
+
+    override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+        val output = builder.startObject()
+            .startObject(metric.metricName)
+            .field(TIMESTAMP_FIELD, timestamp)
+            .field(VALUE_FIELD, value)
+        if (metric === MetricType.JVM_PRESSURE || metric === MetricType.CPU_USAGE) {
+            builder.field(MINIMUM_FIELD, minimum)
+            builder.field(MAXIMUM_FIELD, maximum)
+        }
+        output
+            .endObject()
+            .endObject()
+        return output
+    }
+
+    enum class MetricType(
+        val metricName: String
+    ) {
+        CLUSTER_STATUS(
+            "cluster_status"
+        ),
+        JVM_PRESSURE(
+            "jvm_pressure"
+        ),
+        CPU_USAGE(
+            "cpu_usage"
+        ),
+        UNASSIGNED_SHARDS(
+            "unassigned_shards"
+        ),
+        NUMBER_OF_PENDING_TASKS(
+            "number_of_pending_tasks"
+        ),
+        ACTIVE_SHARDS(
+            "active_shards"
+        ),
+        RELOCATING_SHARDS(
+            "relocating_shards"
+        ),
+        NUMBER_OF_NODES(
+            "number_of_nodes"
+        ),
+        NUMBER_OF_DATA_NODES(
+            "number_of_data_nodes"
+        )
+    }
+}
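For reference, `toXContent()` above nests every field under the metric name, and only `CPU_USAGE` and `JVM_PRESSURE` carry the minimum/maximum fields. A minimal sketch of the resulting document (the timestamp and values below are made up):

```kotlin
// Illustrative only: a CPU_USAGE data point as the coordinator builds it.
val sample = ClusterMetricsDataPoint(
    metric = ClusterMetricsDataPoint.MetricType.CPU_USAGE,
    timestamp = "2022-07-21T17:30:00.000Z", // hypothetical collection time
    value = "23.5",
    minimum = "12",
    maximum = "41"
)
// toXContent() serializes this to:
// {"cpu_usage":{"timestamp":"2022-07-21T17:30:00.000Z","value":"23.5","minimum":"12","maximum":"41"}}
```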
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt
index 1268703c9..846f483b7 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt
@@ -13,14 +13,33 @@ import java.util.concurrent.TimeUnit
 /**
  * settings specific to [AlertingPlugin]. These settings include things like history index max age, request timeout, etc...
  */
+private val log = org.apache.logging.log4j.LogManager.getLogger(AlertingSettings::class.java)
+
 class AlertingSettings {

     companion object {
-
         const val MONITOR_MAX_INPUTS = 1
         const val MONITOR_MAX_TRIGGERS = 10
         const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L

+        val MINIMUM_EXECUTION_FREQUENCY = TimeValue(1, TimeUnit.SECONDS)
+        val MINIMUM_STORAGE_TIME = TimeValue(1, TimeUnit.MINUTES)
+
+        val METRICS_EXECUTION_FREQUENCY_DEFAULT_VALUE = TimeValue(15, TimeUnit.MINUTES)
+        val METRICS_STORE_TIME_DEFAULT_VALUE = TimeValue(7, TimeUnit.DAYS)
+
+        val METRICS_EXECUTION_FREQUENCY_DEFAULT = Setting.positiveTimeSetting(
+            "plugins.alerting.cluster_metrics.execution_frequency",
+            METRICS_EXECUTION_FREQUENCY_DEFAULT_VALUE,
+            Setting.Property.NodeScope, Setting.Property.Dynamic
+        )
+
+        val METRICS_STORE_TIME_DEFAULT = Setting.positiveTimeSetting(
+            "plugins.alerting.cluster_metrics.metrics_history_max_age",
+            METRICS_STORE_TIME_DEFAULT_VALUE,
+            Setting.Property.NodeScope, Setting.Property.Dynamic
+        )
+
         val ALERTING_MAX_MONITORS = Setting.intSetting(
             "plugins.alerting.monitor.max_monitors",
             LegacyOpenDistroAlertingSettings.ALERTING_MAX_MONITORS,
@@ -156,5 +175,70 @@ class AlertingSettings {
             -1L,
             Setting.Property.NodeScope, Setting.Property.Dynamic
         )
+
+        val METRICS_STORE_TIME = Setting.timeSetting(
+            METRICS_STORE_TIME_DEFAULT.key,
+            METRICS_STORE_TIME_DEFAULT,
+            StorageValidator(),
+            Setting.Property.NodeScope, Setting.Property.Dynamic
+        )
+
+        val METRICS_EXECUTION_FREQUENCY = Setting.timeSetting(
+            METRICS_EXECUTION_FREQUENCY_DEFAULT.key,
+            METRICS_EXECUTION_FREQUENCY_DEFAULT,
+            ExecutionValidator(),
+            Setting.Property.NodeScope, Setting.Property.Dynamic
+        )
+
+        internal class ExecutionValidator : Setting.Validator<TimeValue> {
+            override fun validate(value: TimeValue) {}
+
+            override fun validate(value: TimeValue, settings: Map<Setting<*>, Any>) {
+                val storageTime = settings[METRICS_STORE_TIME] as TimeValue
+                log.debug("Validating execution frequency $value against storage time $storageTime.")
+                validateSettings(value, storageTime)
+            }
+
+            override fun settings(): MutableIterator<Setting<*>> {
+                val settings = mutableListOf<Setting<*>>(
+                    METRICS_EXECUTION_FREQUENCY,
+                    METRICS_STORE_TIME
+                )
+                return settings.iterator()
+            }
+        }
+
+        internal class StorageValidator : Setting.Validator<TimeValue> {
+            override fun validate(value: TimeValue) {}
+
+            override fun validate(value: TimeValue, settings: Map<Setting<*>, Any>) {
+                val executionTime = settings[METRICS_EXECUTION_FREQUENCY] as TimeValue
+                log.debug("Validating storage time $value against execution frequency $executionTime.")
+                validateSettings(executionTime, value)
+            }
+
+            override fun settings(): MutableIterator<Setting<*>> {
+                val settings = mutableListOf<Setting<*>>(
+                    METRICS_EXECUTION_FREQUENCY,
+                    METRICS_STORE_TIME
+                )
+                return settings.iterator()
+            }
+        }
+
+        private fun validateSettings(executionFrequency: TimeValue, storageTime: TimeValue) {
+            if (executionFrequency > storageTime) {
+                throw IllegalArgumentException(
+                    "The execution frequency must be less than the storage time."
+                )
+            } else if (executionFrequency < MINIMUM_EXECUTION_FREQUENCY) {
+                throw IllegalArgumentException(
+                    "The execution frequency cannot be less than $MINIMUM_EXECUTION_FREQUENCY."
+                )
+            } else if (storageTime < MINIMUM_STORAGE_TIME) {
+                throw IllegalArgumentException(
+                    "The storage time cannot be less than $MINIMUM_STORAGE_TIME."
+                )
+            }
+        }
     }
 }
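The two validators are symmetric: each lists both settings in `settings()`, so OpenSearch supplies the sibling setting's current value to `validate(value, settings)` and `validateSettings` can enforce the pairwise constraint no matter which setting an update touches. A minimal sketch of the rule itself (illustrative, mirroring `validateSettings` above; not part of the diff):

```kotlin
import org.opensearch.common.unit.TimeValue
import java.util.concurrent.TimeUnit

fun main() {
    // Hypothetical update: collect every 25 minutes but retain only 20 minutes of data.
    val executionFrequency = TimeValue(25, TimeUnit.MINUTES)
    val storageTime = TimeValue(20, TimeUnit.MINUTES)
    // Mirrors validateSettings(): a data point must be collected at least once
    // within the retention window, so this combination is rejected.
    require(executionFrequency <= storageTime) {
        "The execution frequency must be less than the storage time."
    }
}
```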
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/ClusterMetricsCoordinator.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/ClusterMetricsCoordinator.kt
new file mode 100644
index 000000000..53ef1f7b1
--- /dev/null
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/ClusterMetricsCoordinator.kt
@@ -0,0 +1,273 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.alerting.util
+
+import kotlinx.coroutines.CoroutineName
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.launch
+import org.opensearch.action.ActionListener
+import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
+import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest
+import org.opensearch.action.index.IndexRequest
+import org.opensearch.action.index.IndexResponse
+import org.opensearch.alerting.model.ClusterMetricsDataPoint
+import org.opensearch.alerting.opensearchapi.suspendUntil
+import org.opensearch.alerting.settings.AlertingSettings.Companion.METRICS_EXECUTION_FREQUENCY
+import org.opensearch.alerting.settings.AlertingSettings.Companion.METRICS_STORE_TIME
+import org.opensearch.client.Client
+import org.opensearch.client.node.NodeClient
+import org.opensearch.cluster.ClusterChangedEvent
+import org.opensearch.cluster.ClusterStateListener
+import org.opensearch.cluster.service.ClusterService
+import org.opensearch.common.component.LifecycleListener
+import org.opensearch.common.settings.Settings
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.XContentFactory
+import org.opensearch.index.query.QueryBuilders
+import org.opensearch.index.reindex.BulkByScrollResponse
+import org.opensearch.index.reindex.DeleteByQueryAction
+import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
+import org.opensearch.threadpool.Scheduler
+import org.opensearch.threadpool.ThreadPool
+import java.time.Instant
+import java.util.Collections
+import kotlin.coroutines.CoroutineContext
+
+private val log = org.apache.logging.log4j.LogManager.getLogger(ClusterMetricsCoordinator::class.java)
+
+class ClusterMetricsCoordinator(
+    private val settings: Settings,
+    private val client: Client,
+    private val clusterService: ClusterService,
+    private val threadPool: ThreadPool
+) : ClusterStateListener, CoroutineScope, LifecycleListener() {
+
+    @Volatile private var metricsExecutionFrequency = METRICS_EXECUTION_FREQUENCY.get(settings)
+    @Volatile private var metricsStoreTime = METRICS_STORE_TIME.get(settings)
+    private var dataPointCollectionDeletionJob: Scheduler.Cancellable? = null
+
+    companion object {
+        @Volatile
+        var isRunningFlag = false
+            internal set
+        var isDeletionUpdated = false
+            internal set
+        var isCollectionUpdated = false
+            internal set
+    }
+
+    override val coroutineContext: CoroutineContext
+        get() = Dispatchers.Default + CoroutineName("ClusterMetricsCoordinator")
+
+    init {
+        clusterService.addListener(this)
+        clusterService.addLifecycleListener(this)
+        clusterService.clusterSettings.addSettingsUpdateConsumer(METRICS_EXECUTION_FREQUENCY) {
+            metricsExecutionFrequency = it
+            isCollectionUpdated = true
+        }
+        clusterService.clusterSettings.addSettingsUpdateConsumer(METRICS_STORE_TIME) {
+            metricsStoreTime = it
+            isDeletionUpdated = true
+        }
+    }
+
+    override fun clusterChanged(event: ClusterChangedEvent) {
+        val scheduledJobCollection = Runnable {
+            launch {
+                createDocs(client as NodeClient, clusterService)
+                deleteDocs(client)
+            }
+        }
+        if (isCollectionUpdated || isDeletionUpdated) {
+            dataPointCollectionDeletionJob?.cancel()
+            isRunningFlag = false
+            log.info("Detected changes to the cluster metrics settings; cancelled the collection and deletion job and reset the running, deletion, and collection flags.")
+            isDeletionUpdated = false
+            isCollectionUpdated = false
+        }
+        if (event.localNodeMaster() && !isRunningFlag) {
+            log.info("Scheduling cluster metrics collection with an execution frequency of $metricsExecutionFrequency.")
+            dataPointCollectionDeletionJob = threadPool.scheduleWithFixedDelay(
+                scheduledJobCollection,
+                metricsExecutionFrequency,
+                ThreadPool.Names.SYSTEM_WRITE
+            )
+            isRunningFlag = true
+        }
+    }
+
+    private suspend fun createDocs(client: NodeClient, clusterService: ClusterService) {
+        val currentTime = Instant.now().toString()
+        val clusterHealth = client.admin().cluster().health(ClusterHealthRequest()).get().toMap()
+        val nodeStats = client.admin().cluster().nodesStats(NodesStatsRequest().addMetrics("process", "jvm")).get().toMap()
+
+        ClusterMetricsVisualizationIndex.initFunc(client, clusterService)
+
+        val unassignedShards = clusterHealth[ClusterMetricsDataPoint.MetricType.UNASSIGNED_SHARDS.metricName].toString()
+        val clusterStatus = clusterHealth["status"].toString()
+        val numPending = clusterHealth[ClusterMetricsDataPoint.MetricType.NUMBER_OF_PENDING_TASKS.metricName].toString()
+        val activeShards = clusterHealth[ClusterMetricsDataPoint.MetricType.ACTIVE_SHARDS.metricName].toString()
+        val relocatingShards = clusterHealth[ClusterMetricsDataPoint.MetricType.RELOCATING_SHARDS.metricName].toString()
+        val numNodes = clusterHealth[ClusterMetricsDataPoint.MetricType.NUMBER_OF_NODES.metricName].toString()
+        val numDataNodes = clusterHealth[ClusterMetricsDataPoint.MetricType.NUMBER_OF_DATA_NODES.metricName].toString()
+        log.debug(
+            "Collected cluster health at $currentTime: status=$clusterStatus, unassignedShards=$unassignedShards, " +
+                "pendingTasks=$numPending, activeShards=$activeShards, relocatingShards=$relocatingShards, " +
+                "nodes=$numNodes, dataNodes=$numDataNodes"
+        )
+
+        val nodesMap = nodeStats["nodes"] as Map<String, Any>
+        val keys = nodesMap.keys
+        val jvmData = arrayListOf<Int>()
+        val cpuData = arrayListOf<Int>()
+
+        for (key in keys) {
+            val keyData = nodesMap[key] as Map<String, Any>
+            val processMap = keyData["process"] as Map<String, Any>
+            val cpuMap = processMap["cpu"] as Map<String, Any>
+            val percent = cpuMap["percent"]
+            cpuData.add(percent as Int)
+
+            val jvmMap = keyData["jvm"] as Map<String, Any>
+            val memMap = jvmMap["mem"] as Map<String, Any>
+            val pressure = memMap["heap_used_percent"]
+            jvmData.add(pressure as Int)
+        }
+
+        val minimumCPU = Collections.min(cpuData).toString()
+        val maximumCPU = Collections.max(cpuData).toString()
+        val minimumJVM = Collections.min(jvmData).toString()
+        val maximumJVM = Collections.max(jvmData).toString()
+
+        var avgCPUcalc = 0.0
+        var avgJVMcalc = 0.0
+        for (i in cpuData.indices) {
+            avgCPUcalc += cpuData[i]
+            avgJVMcalc += jvmData[i]
+        }
+        avgCPUcalc /= cpuData.size
+        avgJVMcalc /= jvmData.size
+
+        val avgCPU = avgCPUcalc.toString()
+        val avgJVM = avgJVMcalc.toString()
+        log.debug("CPU usage min=$minimumCPU, max=$maximumCPU, avg=$avgCPU; JVM pressure min=$minimumJVM, max=$maximumJVM, avg=$avgJVM")
+
+        val dataPoints = arrayListOf(
+            ClusterMetricsDataPoint(
+                ClusterMetricsDataPoint.MetricType.CLUSTER_STATUS,
+                currentTime,
+                clusterStatus
+            ),
+            ClusterMetricsDataPoint(
+                ClusterMetricsDataPoint.MetricType.UNASSIGNED_SHARDS,
+                currentTime,
+                unassignedShards
+            ),
+            ClusterMetricsDataPoint(
+                ClusterMetricsDataPoint.MetricType.CPU_USAGE,
+                currentTime,
+                avgCPU,
+                minimumCPU,
+                maximumCPU
+            ),
+            ClusterMetricsDataPoint(
+                ClusterMetricsDataPoint.MetricType.JVM_PRESSURE,
+                currentTime,
+                avgJVM,
+                minimumJVM,
+                maximumJVM
+            ),
+            ClusterMetricsDataPoint(
+                ClusterMetricsDataPoint.MetricType.NUMBER_OF_PENDING_TASKS,
+                currentTime,
+                numPending
+            ),
+            ClusterMetricsDataPoint(
+                ClusterMetricsDataPoint.MetricType.ACTIVE_SHARDS,
+                currentTime,
+                activeShards
+            ),
+            ClusterMetricsDataPoint(
+                ClusterMetricsDataPoint.MetricType.RELOCATING_SHARDS,
+                currentTime,
+                relocatingShards
+            ),
+            ClusterMetricsDataPoint(
+                ClusterMetricsDataPoint.MetricType.NUMBER_OF_NODES,
+                currentTime,
+                numNodes
+            ),
+            ClusterMetricsDataPoint(
+                ClusterMetricsDataPoint.MetricType.NUMBER_OF_DATA_NODES,
+                currentTime,
+                numDataNodes
+            )
+        )
+        dataPoints.forEach { clusterMetricsDataPoint ->
+            try {
+                val request = IndexRequest(ClusterMetricsVisualizationIndex.CLUSTER_METRIC_VISUALIZATION_INDEX)
+                    .source(
+                        clusterMetricsDataPoint.toXContent(
+                            XContentFactory.jsonBuilder(),
+                            ToXContent.MapParams(mapOf("with_type" to "true"))
+                        )
+                    )
+                val indexResponse: IndexResponse = client.suspendUntil { client.index(request, it) }
+                val failureReasons = checkShardsFailure(indexResponse)
+                if (failureReasons != null) {
+                    log.error("Failed to index ${clusterMetricsDataPoint.metric}.", failureReasons)
+                }
+            } catch (t: Exception) {
+                log.error("Failed to index ${clusterMetricsDataPoint.metric}.", t)
+            }
+        }
+    }
+
+    private fun checkShardsFailure(response: IndexResponse): String? {
+        val failureReasons = StringBuilder()
+        if (response.shardInfo.failed > 0) {
+            response.shardInfo.failures.forEach { entry ->
+                failureReasons.append(entry.reason())
+            }
+            return failureReasons.toString()
+        }
+        return null
+    }
+
+    private fun deleteDocs(client: NodeClient) {
+        val documentAge = metricsStoreTime.toString()
+
+        ClusterMetricsDataPoint.MetricType.values().forEach {
+            DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
+                .source(ClusterMetricsVisualizationIndex.CLUSTER_METRIC_VISUALIZATION_INDEX)
+                .filter(QueryBuilders.rangeQuery(it.metricName + ".timestamp").lte("now-$documentAge"))
+                .execute(
+                    object : ActionListener<BulkByScrollResponse> {
+                        override fun onResponse(response: BulkByScrollResponse) {
+                            log.info("Deleted ${response.deleted} ${it.metricName} data points older than $documentAge.")
+                        }
+
+                        override fun onFailure(t: Exception) {
+                            log.error("Failed to delete ${it.metricName} data points older than $documentAge.", t)
+                        }
+                    }
+                )
+        }
+    }
+}
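Two details of the coordinator are worth calling out. First, the per-node reduction in `createDocs()` is just a min/max/average over each node's `process.cpu.percent` and `jvm.mem.heap_used_percent`; an equivalent sketch with hypothetical values:

```kotlin
// Illustrative only: hypothetical per-node process.cpu.percent values from nodes stats.
val cpuData = listOf(12, 41, 23)
val minimumCPU = cpuData.minOrNull() // 12
val maximumCPU = cpuData.maxOrNull() // 41
val avgCPU = cpuData.average()       // 25.33...
```

Second, `deleteDocs()` issues one delete-by-query per metric type. The range filter it builds looks like this for `cpu_usage` under the default 7-day retention (illustrative, standalone form of the call above):

```kotlin
import org.opensearch.index.query.QueryBuilders

// Matches every cpu_usage document whose timestamp is older than now-7d.
val staleCpuUsageDocs = QueryBuilders.rangeQuery("cpu_usage.timestamp").lte("now-7d")
```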
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/ClusterMetricsVisualizationIndex.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/ClusterMetricsVisualizationIndex.kt
new file mode 100644
index 000000000..4ad0795a1
--- /dev/null
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/ClusterMetricsVisualizationIndex.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.alerting.util
+
+import org.apache.logging.log4j.LogManager
+import org.opensearch.ResourceAlreadyExistsException
+import org.opensearch.action.admin.indices.create.CreateIndexRequest
+import org.opensearch.action.admin.indices.create.CreateIndexResponse
+import org.opensearch.alerting.opensearchapi.suspendUntil
+import org.opensearch.client.Client
+import org.opensearch.cluster.service.ClusterService
+import org.opensearch.common.settings.Settings
+import org.opensearch.threadpool.ThreadPool
+
+private val log = LogManager.getLogger(ClusterMetricsVisualizationIndex::class.java)
+
+class ClusterMetricsVisualizationIndex(
+    private val client: Client,
+    private val clusterService: ClusterService,
+    private val threadPool: ThreadPool
+) {
+
+    companion object {
+        /** The index name pattern for all cluster metric visualizations indices */
+        val CLUSTER_METRIC_VISUALIZATION_INDEX = ".opendistro-alerting-cluster-metrics"
+
+        @JvmStatic
+        fun clusterMetricsVisualizationsMappings(): String {
+            return ClusterMetricsVisualizationIndex::class.java.classLoader.getResource("mappings/metrics-visualizations.json").readText()
+        }
+
+        suspend fun initFunc(client: Client, clusterService: ClusterService) {
+            if (!clusterMetricsVisualizationIndexExists(clusterService)) {
+                val indexRequest = CreateIndexRequest(CLUSTER_METRIC_VISUALIZATION_INDEX)
+                    .mapping(clusterMetricsVisualizationsMappings())
+                    .settings(
+                        Settings.builder().put("index.hidden", true)
+                            .build()
+                    )
+                try {
+                    val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.admin().indices().create(indexRequest, it) }
+                    if (!createIndexResponse.isAcknowledged) {
+                        log.error("Creation of $CLUSTER_METRIC_VISUALIZATION_INDEX was not acknowledged.")
+                    }
+                } catch (e: ResourceAlreadyExistsException) {
+                    log.info("Index $CLUSTER_METRIC_VISUALIZATION_INDEX already exists.")
+                }
+            }
+        }
+
+        fun clusterMetricsVisualizationIndexExists(clusterService: ClusterService): Boolean {
+            val clusterState = clusterService.state()
+            return clusterState.routingTable.hasIndex(CLUSTER_METRIC_VISUALIZATION_INDEX)
+        }
+    }
+}
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationCoordinator.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationCoordinator.kt
index 5cb37c519..eb22e6275 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationCoordinator.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationCoordinator.kt
@@ -36,8 +36,7 @@ class DestinationMigrationCoordinator(

     private var scheduledMigration: Scheduler.Cancellable? = null

-    @Volatile
-    private var runningLock = false
+    @Volatile private var runningLock = false

     init {
         clusterService.addListener(this)
@@ -46,6 +45,7 @@ class DestinationMigrationCoordinator(

     override fun clusterChanged(event: ClusterChangedEvent) {
         logger.info("Detected cluster change event for destination migration")
+
         if (DestinationMigrationUtilService.finishFlag) {
             logger.info("Reset destination migration process.")
             scheduledMigration?.cancel()
@@ -92,7 +92,6 @@ class DestinationMigrationCoordinator(
                     logger.info("Cancel background destination migration process.")
                     scheduledMigration?.cancel()
                 }
-
                 logger.info("Performing migration of destination data.")
                 DestinationMigrationUtilService.migrateDestinations(client as NodeClient)
             } catch (e: Exception) {
@@ -100,7 +99,6 @@ class DestinationMigrationCoordinator(
                 }
             }
         }
-
        scheduledMigration = threadPool.scheduleWithFixedDelay(scheduledJob, TimeValue.timeValueMinutes(1), ThreadPool.Names.MANAGEMENT)
     }
 }
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TriggerServiceTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TriggerServiceTests.kt
index d4b7d63a4..1fe13970d 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/TriggerServiceTests.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/TriggerServiceTests.kt
@@ -37,9 +37,18 @@ class TriggerServiceTests : OpenSearchTestCase() {
         val trigger = randomBucketLevelTrigger(bucketSelector = bucketSelectorExtAggregationBuilder)
         val monitor = randomBucketLevelMonitor(triggers = listOf(trigger))

-        val inputResultsStr = "{\"_shards\":{\"total\":1,\"failed\":0,\"successful\":1,\"skipped\":0},\"hits\":{\"hits\":[{\"_index\":\"sample-http-responses\",\"_type\":\"http\",\"_source\":{\"status_code\":100,\"http_4xx\":0,\"http_3xx\":0,\"http_5xx\":0,\"http_2xx\":0,\"timestamp\":100000,\"http_1xx\":1},\"_id\":1,\"_score\":1}],\"total\":{\"value\":4,\"relation\":\"eq\"},\"max_score\":1},\"took\":37,\"timed_out\":false,\"aggregations\":{\"status_code\":{\"doc_count_error_upper_bound\":0,\"sum_other_doc_count\":0,\"buckets\":[{\"doc_count\":2,\"key\":100},{\"doc_count\":1,\"key\":102},{\"doc_count\":1,\"key\":201}]},\"${trigger.id}\":{\"parent_bucket_path\":\"status_code\",\"bucket_indices\":[0,1,2]}}}"
-
-        val parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputResultsStr)
+        val inputResultsStr = "{\"_shards\":{\"total\":1,\"failed\":0,\"successful\":1,\"skipped\":0},\"hits\":{\"hits\":[{\"_index\":" +
+            "\"sample-http-responses\",\"_type\":\"http\",\"_source\":{\"status_code\":100,\"http_4xx\":0,\"http_3xx\":0," +
+            "\"http_5xx\":0,\"http_2xx\":0,\"timestamp\":100000,\"http_1xx\":1},\"_id\":1,\"_score\":1}]," +
+            "\"total\":{\"value\":4,\"relation\":\"eq\"},\"max_score\":1},\"took\":37,\"timed_out\":false," +
+            "\"aggregations\":{\"status_code\":{\"doc_count_error_upper_bound\":0,\"sum_other_doc_count\":0," +
+            "\"buckets\":[{\"doc_count\":2,\"key\":100},{\"doc_count\":1,\"key\":102},{\"doc_count\":1,\"key\":201}]}," +
+            "\"${trigger.id}\":{\"parent_bucket_path\":\"status_code\",\"bucket_indices\":[0,1,2]}}}"
+
+        val parser = XContentType.JSON.xContent().createParser(
+            NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
+            inputResultsStr
+        )

         val inputResults = parser.map()
@@ -60,9 +69,28 @@ class TriggerServiceTests : OpenSearchTestCase() {
         val trigger = randomBucketLevelTrigger(bucketSelector = bucketSelectorExtAggregationBuilder)
         val monitor = randomBucketLevelMonitor(triggers = listOf(trigger))

-        val inputResultsStr = "{\"_shards\":{\"total\":1, \"failed\":0, \"successful\":1, \"skipped\":0}, \"hits\":{\"hits\":[{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":100, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":100000, \"http_1xx\":1}, \"_id\":1, \"_score\":1.0}, {\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":102, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":160000, \"http_1xx\":1}, \"_id\":2, \"_score\":1.0}, {\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":100, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":220000, \"http_1xx\":1}, \"_id\":4, \"_score\":1.0}, {\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":201, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":1, \"timestamp\":280000, \"http_1xx\":0}, \"_id\":5, \"_score\":1.0}], \"total\":{\"value\":4, \"relation\":\"eq\"}, \"max_score\":1.0}, \"took\":15, \"timed_out\":false, \"aggregations\":{\"${trigger.id}\":{\"parent_bucket_path\":\"status_code\", \"bucket_indices\":[0, 1, 2]}, \"status_code\":{\"buckets\":[{\"doc_count\":2, \"key\":{\"status_code\":100}}, {\"doc_count\":1, \"key\":{\"status_code\":102}}, {\"doc_count\":1, \"key\":{\"status_code\":201}}], \"after_key\":{\"status_code\":201}}}}"
-
-        val parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputResultsStr)
+        val inputResultsStr = "{\"_shards\":{\"total\":1, \"failed\":0, \"successful\":1, \"skipped\":0}, " +
+            "\"hits\":{\"hits\":[{\"_index\":\"sample-http-responses\", \"_type\":\"http\", " +
+            "\"_source\":{\"status_code\":100, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, " +
+            "\"http_2xx\":0, \"timestamp\":100000, \"http_1xx\":1}, \"_id\":1, \"_score\":1.0}, " +
+            "{\"_index\":\"sample-http-responses\", \"_type\":\"http\", \"_source\":{\"status_code\":102, " +
+            "\"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, \"timestamp\":160000, \"http_1xx\":1}, " +
+            "\"_id\":2, \"_score\":1.0}, {\"_index\":\"sample-http-responses\", " +
+            "\"_type\":\"http\", \"_source\":{\"status_code\":100, " +
+            "\"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":0, " +
+            "\"timestamp\":220000, \"http_1xx\":1}, \"_id\":4, \"_score\":1.0}, {\"_index\":\"sample-http-responses\", " +
+            "\"_type\":\"http\", \"_source\":{\"status_code\":201, \"http_4xx\":0, \"http_3xx\":0, \"http_5xx\":0, \"http_2xx\":1, " +
+            "\"timestamp\":280000, \"http_1xx\":0}, \"_id\":5, \"_score\":1.0}], \"total\":{\"value\":4, \"relation\":\"eq\"}, " +
+            "\"max_score\":1.0}, \"took\":15, \"timed_out\":false, " +
+            "\"aggregations\":{\"${trigger.id}\":{\"parent_bucket_path\":\"status_code\", \"bucket_indices\":[0, 1, 2]}, " +
+            "\"status_code\":{\"buckets\":[{\"doc_count\":2, \"key\":{\"status_code\":100}}, " +
+            "{\"doc_count\":1, \"key\":{\"status_code\":102}}, {\"doc_count\":1, \"key\":{\"status_code\":201}}], " +
+            "\"after_key\":{\"status_code\":201}}}}"
+
+        val parser = XContentType.JSON.xContent().createParser(
+            NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
+            inputResultsStr
+        )

         val inputResults = parser.map()
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/clustermetricscoordinatortest/ClusterMetricsCoordinatorIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/clustermetricscoordinatortest/ClusterMetricsCoordinatorIT.kt
new file mode 100644
index 000000000..cbaf24088
--- /dev/null
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/clustermetricscoordinatortest/ClusterMetricsCoordinatorIT.kt
@@ -0,0 +1,255 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.alerting.util.clustermetricscoordinatortest
+
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.ContentType.APPLICATION_JSON
+import org.apache.http.entity.StringEntity
+import org.junit.After
+import org.junit.Before
+import org.opensearch.alerting.AlertingRestTestCase
+import org.opensearch.alerting.makeRequest
+import org.opensearch.alerting.model.ClusterMetricsDataPoint
+import org.opensearch.alerting.opensearchapi.string
+import org.opensearch.alerting.settings.AlertingSettings
+import org.opensearch.alerting.util.ClusterMetricsVisualizationIndex
+import org.opensearch.client.Response
+import org.opensearch.client.ResponseException
+import org.opensearch.common.xcontent.XContentFactory.jsonBuilder
+import org.opensearch.common.xcontent.XContentType
+import org.opensearch.rest.RestStatus
+import java.time.Instant
+import java.time.temporal.ChronoUnit
+import java.util.Locale
+
+class ClusterMetricsCoordinatorIT : AlertingRestTestCase() {
+
+    @Before
+    fun setup() {
+        // Lower the execution frequency and history max age from their defaults
+        // (15 minutes and 7 days) to 1 minute and 10 minutes so the tests run quickly.
+        generateData()
+        val response = getSettings()
+        val persistentMap = response["persistent"] as Map<String, Any>
+        val executionFrequency = persistentMap["plugins.alerting.cluster_metrics.execution_frequency"].toString()
+        val storageTime = persistentMap["plugins.alerting.cluster_metrics.metrics_history_max_age"].toString()
+        assertEquals("1m", executionFrequency)
+        assertEquals("10m", storageTime)
+    }
+
+    fun `test check name of index`() {
+        // WHEN + THEN, check whether the created index exists and has the name '.opendistro-alerting-cluster-metrics'.
+        val index = ClusterMetricsVisualizationIndex.CLUSTER_METRIC_VISUALIZATION_INDEX
+        val response = client().makeRequest("HEAD", index)
+        assertEquals("Index $index does not exist.", RestStatus.OK, response.restStatus())
+    }
+
+    fun `test check that number of documents is correct`() {
+        // Check that the total number of documents found is divisible by the total number of metric types.
+        val response = getResponse()
+        val hits = getHits(response)
+        val numberOfDocsFound = (hits["total"]?.get("value") as Int)
+        val size = ClusterMetricsDataPoint.MetricType.values().size
+        assertEquals(0, numberOfDocsFound.mod(size))
+        val docs = hits["hits"] as ArrayList<Map<String, Any>>
+
+        // Check that each metric type has the same set of unique timestamps. Only one document should be
+        // created per metric type per execution, so every metric type's number of timestamps must equal
+        // the total number of documents divided by the number of metric types.
+        val mapCheck = hashMapOf<String, Set<Any?>>()
+        ClusterMetricsDataPoint.MetricType.values().forEach { mapCheck[it.metricName] = mutableSetOf() }
+        for (doc in docs) {
+            val source = doc["_source"] as Map<String, Map<String, Any>>
+            val metricType = source.keys.first()
+            try {
+                ClusterMetricsDataPoint.MetricType.valueOf(metricType.uppercase(Locale.getDefault()))
+                mapCheck[metricType] = mapCheck[metricType]?.plus(source[metricType]?.get("timestamp")) as Set<Any?>
+            } catch (e: IllegalArgumentException) {
+                logger.info("Key $metricType does not exist in the MetricType enum class.")
+            }
+        }
+        assertEquals(1, mapCheck.values.toSet().size)
+    }
+
+    fun `test deleting docs from index`() {
+        // Index a cluster_status document older than the 10m retention window, then wait for the
+        // next scheduled run; the deletion job should remove it.
+        val time = getTime()
+        createDoc(time)
+        Thread.sleep(60000)
+        val response = getResponse()
+        val hits = getHits(response)
+        val docs = hits["hits"] as ArrayList<Map<String, Any>>
+        var oldDocExists = false
+
+        for (doc in docs) {
+            val source = doc["_source"] as Map<String, Map<String, Any>>
+            val metricType = source.keys.first()
+            if (metricType == "cluster_status" && source[metricType]?.get("timestamp") == time.toString()) {
+                oldDocExists = true
+                break
+            }
+        }
+        assertFalse("Expected the expired cluster_status document to be deleted.", oldDocExists)
+    }
+
+    fun `test execution frequency of job`() {
+        client().updateSettings("plugins.alerting.cluster_metrics.execution_frequency", "2m")
+        Thread.sleep(300000)
+
+        val response = getResponse()
+        val hits = getHits(response)
+        val docs = hits["hits"] as ArrayList<Map<String, Any>>
+        val times = mutableSetOf<String>()
+
+        for (doc in docs) {
+            val source = doc["_source"] as Map<String, Map<String, Any>>
+            val metricType = source.keys.first()
+            times.add(source[metricType]?.get("timestamp").toString())
+        }
+        // The two most recent collection timestamps should be two minutes apart.
+        val time1 = Instant.parse(times.elementAt(times.size - 1))
+        val time2 = Instant.parse(times.elementAt(times.size - 2))
+        val diff = time2.until(time1, ChronoUnit.MINUTES)
+        assertEquals(2L, diff)
+    }
+
+    fun `test update storage time to less than minimum storage time`() {
+        try {
+            client().updateSettings("plugins.alerting.cluster_metrics.metrics_history_max_age", "30s")
+            fail("Expected a storage time below the minimum to be rejected.")
+        } catch (t: ResponseException) {
+            val responseMap = t.response.asMap()
+            val errMap = responseMap["error"] as Map<String, Any>
+            assertEquals("illegal_argument_exception", errMap["type"])
+        }
+    }
+
+    fun `test update frequency to less than minimum frequency`() {
+        try {
+            client().updateSettings("plugins.alerting.cluster_metrics.execution_frequency", "1ms")
+            fail("Expected an execution frequency below the minimum to be rejected.")
+        } catch (t: ResponseException) {
+            val responseMap = t.response.asMap()
+            val errMap = responseMap["error"] as Map<String, Any>
+            assertEquals("illegal_argument_exception", errMap["type"])
+        }
+    }
+
+    fun `test update execution frequency greater than storage time`() {
+        try {
+            client().updateSettings("plugins.alerting.cluster_metrics.metrics_history_max_age", "20m")
+            client().updateSettings("plugins.alerting.cluster_metrics.execution_frequency", "25m")
+            fail("Expected an execution frequency greater than the storage time to be rejected.")
+        } catch (t: ResponseException) {
+            val responseMap = t.response.asMap()
+            val errMap = responseMap["error"] as Map<String, Any>
+            assertEquals("illegal_argument_exception", errMap["type"])
+        }
+    }
+
+    fun `test successful client setting update`() {
+        client().updateSettings("plugins.alerting.cluster_metrics.metrics_history_max_age", "400m")
+        client().updateSettings("plugins.alerting.cluster_metrics.execution_frequency", "12m")
+        val response = getSettings()
+        val persistentMap = response["persistent"] as Map<String, Any>
+        val executionFrequency = persistentMap["plugins.alerting.cluster_metrics.execution_frequency"].toString()
+        val storageTime = persistentMap["plugins.alerting.cluster_metrics.metrics_history_max_age"].toString()
+        assertEquals("12m", executionFrequency)
+        assertEquals("400m", storageTime)
+    }
+
+    fun `test simultaneously changing execution frequency and storage time where execution less than storage time`() {
+        val settings = jsonBuilder()
+            .startObject()
+            .startObject("persistent")
+            .field("plugins.alerting.cluster_metrics.execution_frequency", "5m")
+            .field("plugins.alerting.cluster_metrics.metrics_history_max_age", "10m")
+            .endObject()
+            .endObject()
+            .string()
+        val response = client().makeRequest("PUT", "_cluster/settings", StringEntity(settings, APPLICATION_JSON))
+        assertEquals(RestStatus.OK, response.restStatus())
+    }
+
+    fun `test simultaneously changing execution frequency and storage time where execution greater than storage time`() {
+        val settings = jsonBuilder()
+            .startObject()
+            .startObject("persistent")
+            .field("plugins.alerting.cluster_metrics.execution_frequency", "10m")
+            .field("plugins.alerting.cluster_metrics.metrics_history_max_age", "5m")
+            .endObject()
+            .endObject()
+            .string()
+        try {
+            client().makeRequest("PUT", "_cluster/settings", StringEntity(settings, APPLICATION_JSON))
+            fail("Expected the combined settings update to be rejected.")
+        } catch (t: ResponseException) {
+            val responseMap = t.response.asMap()
+            val errMap = responseMap["error"] as Map<String, Any>
+            assertEquals("illegal_argument_exception", errMap["type"])
+        }
+    }
+
+    @After
+    // Reset the settings back to their defaults and delete the created index.
+    fun cleanup() {
+        client().updateSettings(
+            "plugins.alerting.cluster_metrics.metrics_history_max_age",
+            AlertingSettings.METRICS_STORE_TIME_DEFAULT_VALUE
+        )
+        client().updateSettings(
+            "plugins.alerting.cluster_metrics.execution_frequency",
+            AlertingSettings.METRICS_EXECUTION_FREQUENCY_DEFAULT_VALUE
+        )
+        client().makeRequest("DELETE", ClusterMetricsVisualizationIndex.CLUSTER_METRIC_VISUALIZATION_INDEX)
+    }
+
+    private fun generateData() {
+        client().updateSettings("plugins.alerting.cluster_metrics.execution_frequency", "1s")
+        client().updateSettings("plugins.alerting.cluster_metrics.metrics_history_max_age", "10m")
+        Thread.sleep(60000)
+        client().updateSettings("plugins.alerting.cluster_metrics.execution_frequency", "1m")
+    }
+
+    private fun getResponse(): Response {
+        val settings = jsonBuilder()
+            .startObject()
+            .field("size", 10000)
+            .endObject()
+            .string()
+        return client().makeRequest(
+            "GET",
+            ".opendistro-alerting-cluster-metrics/_search",
+            StringEntity(settings, ContentType.APPLICATION_JSON)
+        )
+    }
+
+    private fun createDoc(time: Instant) {
+        val doc = jsonBuilder()
+            .startObject()
+            .startObject("cluster_status")
+            .field("timestamp", time.toString())
+            .field("value", "yellow")
+            .endObject()
+            .endObject()
+            .string()
+        client().makeRequest(
+            "POST",
+            ".opendistro-alerting-cluster-metrics/_doc",
+            StringEntity(doc, ContentType.APPLICATION_JSON)
+        )
+    }
+
+    private fun getTime(): Instant {
+        return Instant.now().minus(11, ChronoUnit.MINUTES)
+    }
+
+    private fun getHits(response: Response): Map<String, Map<String, Any>> {
+        val xcp = createParser(XContentType.JSON.xContent(), response.entity.content)
+        return xcp.map()["hits"]!! as Map<String, Map<String, Any>>
+    }
+
+    private fun getSettings(): Map<String, Any> {
+        return client().makeRequest(
+            "GET",
+            "_cluster/settings?flat_settings=true"
+        ).asMap()
+    }
+}
diff --git a/core/src/main/resources/mappings/metrics-visualizations.json b/core/src/main/resources/mappings/metrics-visualizations.json
new file mode 100644
index 000000000..b9a5f1552
--- /dev/null
+++ b/core/src/main/resources/mappings/metrics-visualizations.json
@@ -0,0 +1,119 @@
+{
+  "_meta": {
+    "schema_version": 1
+  },
+  "properties": {
+    "cluster_status": {
+      "properties": {
+        "timestamp": {
+          "type": "date"
+        },
+        "value": {
+          "type": "text",
+          "fields": {
+            "keyword": {
+              "type": "keyword",
+              "ignore_above": 256
+            }
+          }
+        }
+      }
+    },
+    "cpu_usage": {
+      "properties": {
+        "timestamp": {
+          "type": "date"
+        },
+        "value": {
+          "type": "text",
+          "fields": {
+            "keyword": {
+              "type": "keyword",
+              "ignore_above": 256
+            }
+          }
+        }
+      }
+    },
+    "jvm_pressure": {
+      "properties": {
+        "timestamp": {
+          "type": "date"
+        },
+        "value": {
+          "type": "text",
+          "fields": {
+            "keyword": {
+              "type": "keyword",
+              "ignore_above": 256
+            }
+          }
+        }
+      }
+    },
+    "unassigned_shards": {
+      "properties": {
+        "timestamp": {
+          "type": "date"
+        },
+        "value": {
+          "type": "text",
+          "fields": {
+            "keyword": {
+              "type": "keyword",
+              "ignore_above": 256
+            }
+          }
+        }
+      }
+    },
+    "number_of_pending_tasks": {
+      "properties": {
+        "timestamp": {
+          "type": "date"
+        },
+        "value": {
+          "type": "text",
+          "fields": {
+            "keyword": {
+              "type": "keyword",
+              "ignore_above": 256
+            }
+          }
+        }
+      }
+    },
+    "active_shards": {
+      "properties": {
+        "timestamp": {
+          "type": "date"
+        },
+        "value": {
+          "type": "text",
+          "fields": {
+            "keyword": {
+              "type": "keyword",
+              "ignore_above": 256
+            }
+          }
+        }
+      }
+    },
+    "relocating_shards": {
+      "properties": {
+        "timestamp": {
+          "type": "date"
+        },
+        "value": {
+          "type": "text",
+          "fields": {
+            "keyword": {
+              "type": "keyword",
+              "ignore_above": 256
+            }
+          }
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
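For anyone consuming the new hidden index, a minimal sketch (illustrative, not part of the diff; it reuses the `client().makeRequest` helper from the integration test above) of fetching the most recent `cluster_status` data point. The sort works because each metric's `timestamp` is mapped as `date`:

```kotlin
import org.apache.http.entity.ContentType.APPLICATION_JSON
import org.apache.http.entity.StringEntity

// Query the hidden metrics index for the newest cluster_status document.
val query = """
    {
      "size": 1,
      "sort": [{ "cluster_status.timestamp": { "order": "desc" } }],
      "query": { "exists": { "field": "cluster_status.value" } }
    }
""".trimIndent()
val response = client().makeRequest(
    "GET",
    ".opendistro-alerting-cluster-metrics/_search",
    StringEntity(query, APPLICATION_JSON)
)
```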