diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 000000000..6ea02bfba --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,2 @@ +# This should match the owning team set up in https://github.com/orgs/opensearch-project/teams +* @opensearch-project/alerting-plugin \ No newline at end of file diff --git a/.github/workflows/multi-node-test-workflow.yml b/.github/workflows/multi-node-test-workflow.yml index 172800a2d..8bc9d598b 100644 --- a/.github/workflows/multi-node-test-workflow.yml +++ b/.github/workflows/multi-node-test-workflow.yml @@ -3,10 +3,10 @@ name: Multi node test workflow on: pull_request: branches: - - main + - "*" push: branches: - - main + - "*" jobs: build: @@ -32,7 +32,7 @@ jobs: with: java-version: 14 - name: Run integration tests with multi node config - run: ./gradlew integTest -PnumNodes=3 -Dopensearch.version=1.2.0-SNAPSHOT + run: ./gradlew integTest -PnumNodes=3 -Dopensearch.version=1.3.0-SNAPSHOT - name: Pull and Run Docker run: | plugin=`ls alerting/build/distributions/*.zip` diff --git a/.github/workflows/release-workflow.yml b/.github/workflows/release-workflow.yml deleted file mode 100644 index f29d78b6c..000000000 --- a/.github/workflows/release-workflow.yml +++ /dev/null @@ -1,122 +0,0 @@ -name: Release workflow -# This workflow is triggered on creating tags to main or a opendistro release branch -on: - push: - tags: - - 'v*' - -jobs: - build: - strategy: - matrix: - java: [14] - # Job name - name: Build Alerting with JDK ${{ matrix.java }} - # This job runs on Linux - runs-on: ubuntu-latest - steps: - # This step uses the checkout Github action: https://github.com/actions/checkout - - name: Checkout Branch - uses: actions/checkout@v2 - # This step uses the setup-java Github action: https://github.com/actions/setup-java - - name: Set Up JDK ${{ matrix.java }} - uses: actions/setup-java@v1 - with: - java-version: ${{ matrix.java }} - - # Building zip, deb and rpm files - - name: Build with Gradle - run: 
./gradlew build buildDeb buildRpm --no-daemon -Dbuild.snapshot=false - - # Creating artifact path as well as individual folders for rpm, zip and deb assets - - name: Create Artifact Path - run: | - mkdir -p alerting-artifacts - cp alerting/build/distributions/*.zip alerting-artifacts - cp alerting/build/distributions/*.zip alerting-artifacts_zip - cp alerting/build/distributions/*.deb alerting-artifacts - cp alerting/build/distributions/*.deb alerting-artifacts_deb - cp alerting/build/distributions/*.rpm alerting-artifacts - cp alerting/build/distributions/*.rpm alerting-artifacts_rpm - echo "TAG_VERSION=${GITHUB_REF/refs\/tags\//}" >> $GITHUB_ENV - - # AWS authentication - - name: Configure AWS Credentials - uses: aws-actions/configure-aws-credentials@v1 - with: - aws-access-key-id: ${{ secrets.AWS_STAGING_ACCESS_KEY_ID }} - aws-secret-access-key: ${{ secrets.AWS_STAGING_SECRET_ACCESS_KEY }} - aws-region: us-west-2 - - # This step uses the upload-artifact Github action: https://github.com/actions/upload-artifact - - name: Upload Artifacts to S3 - run: | - zip=`ls alerting-artifacts/*.zip` - rpm=`ls alerting-artifacts/*.rpm` - deb=`ls alerting-artifacts/*.deb` - - # Inject the build number before the suffix - zip_outfile=`basename ${zip%.zip}-build-${GITHUB_RUN_NUMBER}.zip` - rpm_outfile=`basename ${rpm%.rpm}-build-${GITHUB_RUN_NUMBER}.rpm` - deb_outfile=`basename ${deb%.deb}-build-${GITHUB_RUN_NUMBER}.deb` - - s3_prefix="s3://staging.artifacts.opendistroforelasticsearch.amazon.com/snapshots/elasticsearch-plugins/alerting/" - - echo "Copying ${zip} to ${s3_prefix}${zip_outfile}" - aws s3 cp --quiet $zip ${s3_prefix}${zip_outfile} - - echo "Copying ${rpm} to ${s3_prefix}${rpm_outfile}" - aws s3 cp --quiet $rpm ${s3_prefix}${rpm_outfile} - - echo "Copying ${deb} to ${s3_prefix}${deb_outfile}" - aws s3 cp --quiet $deb ${s3_prefix}${deb_outfile} - - - name: Create Github Draft Release - id: create_release - uses: actions/create-release@v1.0.0 - env: - GITHUB_TOKEN: ${{ 
secrets.GITHUB_TOKEN }} - with: - tag_name: ${{ github.ref }} - release_name: Release ${{ env.TAG_VERSION }} - draft: true - prerelease: false - - # Upload the release with .zip as asset - - name: Upload Release Asset - uses: actions/upload-release-asset@v1.0.1 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - with: - upload_url: ${{ steps.create_release.outputs.upload_url }} - asset_name: alerting.zip - asset_path: alerting-artifacts_zip - asset_content_type: application/zip - - # Upload the release with .rpm as asset - - name: Upload Release Asset - uses: actions/upload-release-asset@v1.0.1 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - with: - upload_url: ${{ steps.create_release.outputs.upload_url }} - asset_name: alerting.rpm - asset_path: alerting-artifacts_rpm - asset_content_type: application/zip - - # Upload the release with .deb as asset - - name: Upload Release Asset - uses: actions/upload-release-asset@v1.0.1 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - with: - upload_url: ${{ steps.create_release.outputs.upload_url }} - asset_name: alerting.deb - asset_path: alerting-artifacts_deb - asset_content_type: application/zip - - - name: Upload Workflow Artifacts - uses: actions/upload-artifact@v1 - with: - name: alerting-plugin - path: alerting-artifacts diff --git a/.github/workflows/test-workflow.yml b/.github/workflows/test-workflow.yml index 5dba154c3..7c6a82966 100644 --- a/.github/workflows/test-workflow.yml +++ b/.github/workflows/test-workflow.yml @@ -3,12 +3,10 @@ name: Test Workflow on: pull_request: branches: - - main - - opendistro-* + - "*" push: branches: - - main - - opendistro-* + - "*" jobs: build: @@ -31,7 +29,7 @@ jobs: - name: Build and run with Gradle - run: ./gradlew build -Dopensearch.version=1.2.0-SNAPSHOT + run: ./gradlew build -Dopensearch.version=1.3.0-SNAPSHOT # - name: Create Artifact Path # run: | @@ -50,4 +48,4 @@ jobs: # path: alerting-artifacts # Publish to local maven - name: Publish to Maven Local - run: 
./gradlew publishToMavenLocal -Dopensearch.version=1.2.0-SNAPSHOT + run: ./gradlew publishToMavenLocal -Dopensearch.version=1.3.0-SNAPSHOT diff --git a/README.md b/README.md index 91e5fa1c6..5616272a6 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -[![Test Workflow](https://github.com/opensearch-project/alerting/workflows/Test%20Workflow/badge.svg)](https://github.com/opendistro-for-elasticsearch/alerting/actions) -[![codecov](https://codecov.io/gh/opendistro-for-elasticsearch/alerting/branch/main/graph/badge.svg)](https://codecov.io/gh/opendistro-for-elasticsearch/alerting) -[![Documentation](https://img.shields.io/badge/api-reference-blue.svg)](https://opendistro.github.io/for-elasticsearch-docs/docs/alerting/api/) +[![Test Workflow](https://github.com/opensearch-project/alerting/workflows/Test%20Workflow/badge.svg)](https://github.com/opensearch-project/alerting/actions) +[![codecov](https://codecov.io/gh/opensearch-project/alerting/branch/main/graph/badge.svg)](https://codecov.io/gh/opensearch-project/alerting) +[![Documentation](https://img.shields.io/badge/api-reference-blue.svg)](https://opensearch.org/docs/latest/monitoring-plugins/alerting/api/) [![Chat](https://img.shields.io/badge/chat-on%20forums-blue)](https://discuss.opendistrocommunity.dev/c/alerting/) ![PRs welcome!](https://img.shields.io/badge/PRs-welcome!-success) diff --git a/alerting/build.gradle b/alerting/build.gradle index 46ea3e477..2799c9167 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -161,7 +161,7 @@ String bwcFilePath = "src/test/resources/bwc" testClusters { "${baseName}$i" { testDistribution = "ARCHIVE" - versions = ["7.10.2","1.2.0-SNAPSHOT"] + versions = ["7.10.2","1.3.0-SNAPSHOT"] numberOfNodes = 3 plugin(provider(new Callable(){ @Override diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 63a303964..1e719a022 100644 --- 
a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -400,6 +400,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return getMonitor(monitorId = monitorId) } + @Suppress("UNCHECKED_CAST") protected fun updateMonitor(monitor: Monitor, refresh: Boolean = false): Monitor { val response = client().makeRequest( "PUT", "${monitor.relativeUrl()}?refresh=$refresh", diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt index d192b4095..856cc47bf 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt @@ -250,7 +250,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { } } - /* Enable this test case after issue issue#269 is fixed. + /* Enable this test case after checking for disallowed destination during Monitor creation is added. fun `test creating a monitor with a disallowed destination type fails`() { try { // Create a Chime Destination @@ -263,7 +263,8 @@ class MonitorRestApiIT : AlertingRestTestCase() { chime = chime, slack = null, customWebhook = null, - email = null) + email = null + ) val chimeDestination = createDestination(destination = destination) // Remove Chime from the allow_list @@ -272,12 +273,13 @@ class MonitorRestApiIT : AlertingRestTestCase() { .joinToString(prefix = "[", postfix = "]") { string -> "\"$string\"" } client().updateSettings(DestinationSettings.ALLOW_LIST.key, allowedDestinations) - createMonitor(randomMonitor(triggers = listOf(randomTrigger(destinationId = chimeDestination.id)))) + createMonitor(randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(destinationId = chimeDestination.id)))) fail("Expected 403 Method FORBIDDEN response") } catch (e: ResponseException) { 
assertEquals("Unexpected status", RestStatus.FORBIDDEN, e.response.restStatus()) } - }*/ + } + */ @Throws(Exception::class) fun `test updating search for a monitor`() { @@ -866,13 +868,54 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Scheduled job is not enabled", false, responseMap[ScheduledJobSettings.SWEEPER_ENABLED.key]) assertEquals("Scheduled job index exists but there are no scheduled jobs.", false, responseMap["scheduled_job_index_exists"]) val _nodes = responseMap["_nodes"] as Map - assertEquals("Incorrect number of nodes", numberOfNodes, _nodes["total"]) - assertEquals("Failed nodes found during monitor stats call", 0, _nodes["failed"]) - assertEquals("More than $numberOfNodes successful node", numberOfNodes, _nodes["successful"]) + validateAlertingStatsNodeResponse(_nodes) + } + + fun `test monitor stats when disabling and re-enabling scheduled jobs with existing monitor`() { + // Enable Monitor jobs + enableScheduledJob() + val monitorId = createMonitor(randomQueryLevelMonitor(enabled = true), refresh = true).id + + var alertingStats = getAlertingStats() + assertEquals("Scheduled job is not enabled", true, alertingStats[ScheduledJobSettings.SWEEPER_ENABLED.key]) + assertEquals("Scheduled job index does not exist", true, alertingStats["scheduled_job_index_exists"]) + assertEquals("Scheduled job index is not yellow", "yellow", alertingStats["scheduled_job_index_status"]) + assertEquals("Nodes are not on schedule", numberOfNodes, alertingStats["nodes_on_schedule"]) + + val _nodes = alertingStats["_nodes"] as Map + validateAlertingStatsNodeResponse(_nodes) + + assertTrue( + "Monitor [$monitorId] was not found scheduled based on the alerting stats response: $alertingStats", + isMonitorScheduled(monitorId, alertingStats) + ) + + // Disable Monitor jobs + disableScheduledJob() + + alertingStats = getAlertingStats() + assertEquals("Scheduled job is still enabled", false, alertingStats[ScheduledJobSettings.SWEEPER_ENABLED.key]) + 
assertFalse( + "Monitor [$monitorId] was still scheduled based on the alerting stats response: $alertingStats", + isMonitorScheduled(monitorId, alertingStats) + ) + + // Re-enable Monitor jobs + enableScheduledJob() + + // Sleep briefly so sweep can reschedule the Monitor + Thread.sleep(2000) + + alertingStats = getAlertingStats() + assertEquals("Scheduled job is not enabled", true, alertingStats[ScheduledJobSettings.SWEEPER_ENABLED.key]) + assertTrue( + "Monitor [$monitorId] was not re-scheduled based on the alerting stats response: $alertingStats", + isMonitorScheduled(monitorId, alertingStats) + ) } fun `test monitor stats no jobs`() { - // Disable the Monitor plugin. + // Enable the Monitor plugin. enableScheduledJob() val responseMap = getAlertingStats() @@ -880,9 +923,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Scheduled job is not enabled", true, responseMap[ScheduledJobSettings.SWEEPER_ENABLED.key]) assertEquals("Scheduled job index exists but there are no scheduled jobs.", false, responseMap["scheduled_job_index_exists"]) val _nodes = responseMap["_nodes"] as Map - assertEquals("Incorrect number of nodes", numberOfNodes, _nodes["total"]) - assertEquals("Failed nodes found during monitor stats call", 0, _nodes["failed"]) - assertEquals("More than $numberOfNodes successful node", numberOfNodes, _nodes["successful"]) + validateAlertingStatsNodeResponse(_nodes) } fun `test monitor stats jobs`() { @@ -898,9 +939,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Nodes are not on schedule", numberOfNodes, responseMap["nodes_on_schedule"]) val _nodes = responseMap["_nodes"] as Map - assertEquals("Incorrect number of nodes", numberOfNodes, _nodes["total"]) - assertEquals("Failed nodes found during monitor stats call", 0, _nodes["failed"]) - assertEquals("More than $numberOfNodes successful node", numberOfNodes, _nodes["successful"]) + validateAlertingStatsNodeResponse(_nodes) } @Throws(Exception::class) @@ -929,9 +968,7 
@@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Nodes not on schedule", numberOfNodes, responseMap["nodes_on_schedule"]) val _nodes = responseMap["_nodes"] as Map - assertEquals("Incorrect number of nodes", numberOfNodes, _nodes["total"]) - assertEquals("Failed nodes found during monitor stats call", 0, _nodes["failed"]) - assertEquals("More than $numberOfNodes successful node", numberOfNodes, _nodes["successful"]) + validateAlertingStatsNodeResponse(_nodes) } fun `test monitor stats incorrect metric`() { @@ -1024,4 +1061,23 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) } } + + private fun validateAlertingStatsNodeResponse(nodesResponse: Map) { + assertEquals("Incorrect number of nodes", numberOfNodes, nodesResponse["total"]) + assertEquals("Failed nodes found during monitor stats call", 0, nodesResponse["failed"]) + assertEquals("More than $numberOfNodes successful node", numberOfNodes, nodesResponse["successful"]) + } + + private fun isMonitorScheduled(monitorId: String, alertingStatsResponse: Map): Boolean { + val nodesInfo = alertingStatsResponse["nodes"] as Map + for (nodeId in nodesInfo.keys) { + val nodeInfo = nodesInfo[nodeId] as Map + val jobsInfo = nodeInfo["jobs_info"] as Map + if (jobsInfo.keys.contains(monitorId)) { + return true + } + } + + return false + } } diff --git a/build.gradle b/build.gradle index 39b2054d3..348e06055 100644 --- a/build.gradle +++ b/build.gradle @@ -7,7 +7,7 @@ buildscript { apply from: 'build-tools/repositories.gradle' ext { - opensearch_version = System.getProperty("opensearch.version", "1.2.0-SNAPSHOT") + opensearch_version = System.getProperty("opensearch.version", "1.3.0-SNAPSHOT") // 1.0.0 -> 1.0.0.0, and 1.0.0-SNAPSHOT -> 1.0.0.0-SNAPSHOT opensearch_build = opensearch_version.replaceAll(/(\.\d)([^\d]*)$/, '$1.0$2') common_utils_version = System.getProperty("common_utils.version", opensearch_build) diff --git 
a/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt b/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt index a43bd6d42..daae181e8 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt @@ -232,6 +232,13 @@ class JobSweeper( // cancel existing background thread if present scheduledFullSweep?.cancel() + // Manually sweep all shards before scheduling the background sweep so it picks up any changes immediately + // since the first run of a task submitted with scheduleWithFixedDelay() happens after the interval has passed. + logger.debug("Performing sweep of scheduled jobs.") + fullSweepExecutor.submit { + sweepAllShards() + } + // Setup an anti-entropy/self-healing background sweep, in case a sweep that was triggered by an event fails. val scheduledSweep = Runnable { val elapsedTime = getFullSweepElapsedTime() @@ -351,13 +358,19 @@ class JobSweeper( sweptJobs.getOrPut(shardId) { ConcurrentHashMap() } // Use [compute] to update atomically in case another thread concurrently indexes/deletes the same job .compute(jobId) { _, currentVersion -> + val jobCurrentlyScheduled = scheduler.scheduledJobs().contains(jobId) + if (newVersion <= (currentVersion ?: Versions.NOT_FOUND)) { - logger.debug("Skipping job $jobId, $newVersion <= $currentVersion") - return@compute currentVersion + if (unchangedJobToBeRescheduled(newVersion, currentVersion, jobCurrentlyScheduled, job)) { + logger.debug("Not skipping job $jobId since it is an unchanged job slated to be rescheduled") + } else { + logger.debug("Skipping job $jobId, $newVersion <= $currentVersion") + return@compute currentVersion + } } // deschedule the currently scheduled version - if (scheduler.scheduledJobs().contains(jobId)) { + if (jobCurrentlyScheduled) { scheduler.deschedule(jobId) } @@ -375,6 +388,29 @@ class JobSweeper( } } + /* + * During the job sweep, normally jobs where the currentVersion is equal to 
the newVersion are skipped since + * there was no change. + * + * However, there exists an edge-case where a job could have been de-scheduled by flipping [SWEEPER_ENABLED] + * to false and then not have undergone any changes when the sweeper is re-enabled. In this case, the job should + * not be skipped so it can be re-scheduled. This utility method checks for this condition so the sweep() method + * can account for it. + */ + private fun unchangedJobToBeRescheduled( + newVersion: JobVersion, + currentVersion: JobVersion?, + jobCurrentlyScheduled: Boolean, + job: ScheduledJob? + ): Boolean { + // newVersion should not be [Versions.NOT_FOUND] here since it's passed in from existing search hits + // or successful doc delete operations + val versionWasUnchanged = newVersion == (currentVersion ?: Versions.NOT_FOUND) + val jobEnabled = job?.enabled ?: false + + return versionWasUnchanged && !jobCurrentlyScheduled && jobEnabled + } + private fun parseAndSweepJob( xcp: XContentParser, shardId: ShardId, diff --git a/scripts/build.sh b/scripts/build.sh index 46beffd90..8a0c6f996 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -12,11 +12,12 @@ function usage() { echo -e "-v VERSION\t[Required] OpenSearch version." echo -e "-s SNAPSHOT\t[Optional] Build a snapshot, default is 'false'." echo -e "-a ARCHITECTURE\t[Optional] Build architecture, ignored." + echo -e "-p PLATFORM\t[Optional] Platform, ignored." echo -e "-o OUTPUT\t[Optional] Output path, default is 'artifacts'." echo -e "-h help" } -while getopts ":h:v:s:o:a:" arg; do +while getopts ":h:v:s:o:p:a:" arg; do case $arg in h) usage @@ -31,6 +32,9 @@ while getopts ":h:v:s:o:a:" arg; do o) OUTPUT=$OPTARG ;; + p) + PLATFORM=$OPTARG + ;; a) ARCHITECTURE=$OPTARG ;;