Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update sweep logic to re-schedule unchanged jobs when SWEEPER_ENABLED is toggled #243

Merged
merged 2 commits into from
Dec 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return getMonitor(monitorId = monitorId)
}

@Suppress("UNCHECKED_CAST")
protected fun updateMonitor(monitor: Monitor, refresh: Boolean = false): Monitor {
val response = client().makeRequest(
"PUT", "${monitor.relativeUrl()}?refresh=$refresh",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ class MonitorRestApiIT : AlertingRestTestCase() {
}
}

/* Enable this test case after issue issue#269 is fixed.
/* Enable this test case after checking for disallowed destination during Monitor creation is added in
fun `test creating a monitor with a disallowed destination type fails`() {
try {
// Create a Chime Destination
Expand All @@ -284,7 +284,8 @@ class MonitorRestApiIT : AlertingRestTestCase() {
chime = chime,
slack = null,
customWebhook = null,
email = null)
email = null
)
val chimeDestination = createDestination(destination = destination)

// Remove Chime from the allow_list
Expand All @@ -293,12 +294,13 @@ class MonitorRestApiIT : AlertingRestTestCase() {
.joinToString(prefix = "[", postfix = "]") { string -> "\"$string\"" }
client().updateSettings(DestinationSettings.ALLOW_LIST.key, allowedDestinations)

createMonitor(randomMonitor(triggers = listOf(randomTrigger(destinationId = chimeDestination.id))))
createMonitor(randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(destinationId = chimeDestination.id))))
fail("Expected 403 Method FORBIDDEN response")
} catch (e: ResponseException) {
assertEquals("Unexpected status", RestStatus.FORBIDDEN, e.response.restStatus())
}
}*/
}
*/

@Throws(Exception::class)
fun `test updating search for a monitor`() {
Expand Down Expand Up @@ -887,23 +889,62 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Scheduled job is not enabled", false, responseMap[ScheduledJobSettings.SWEEPER_ENABLED.key])
assertEquals("Scheduled job index exists but there are no scheduled jobs.", false, responseMap["scheduled_job_index_exists"])
val _nodes = responseMap["_nodes"] as Map<String, Int>
assertEquals("Incorrect number of nodes", numberOfNodes, _nodes["total"])
assertEquals("Failed nodes found during monitor stats call", 0, _nodes["failed"])
assertEquals("More than $numberOfNodes successful node", numberOfNodes, _nodes["successful"])
validateAlertingStatsNodeResponse(_nodes)
}

fun `test monitor stats when disabling and re-enabling scheduled jobs with existing monitor`() {
// Enable Monitor jobs
enableScheduledJob()
val monitorId = createMonitor(randomQueryLevelMonitor(enabled = true), refresh = true).id

var alertingStats = getAlertingStats()
assertEquals("Scheduled job is not enabled", true, alertingStats[ScheduledJobSettings.SWEEPER_ENABLED.key])
assertEquals("Scheduled job index does not exist", true, alertingStats["scheduled_job_index_exists"])
qreshi marked this conversation as resolved.
Show resolved Hide resolved
assertEquals("Scheduled job index is not yellow", "yellow", alertingStats["scheduled_job_index_status"])
assertEquals("Nodes are not on schedule", numberOfNodes, alertingStats["nodes_on_schedule"])

val _nodes = alertingStats["_nodes"] as Map<String, Int>
validateAlertingStatsNodeResponse(_nodes)

assertTrue(
"Monitor [$monitorId] was not found scheduled based on the alerting stats response: $alertingStats",
isMonitorScheduled(monitorId, alertingStats)
)

// Disable Monitor jobs
disableScheduledJob()

alertingStats = getAlertingStats()
assertEquals("Scheduled job is still enabled", false, alertingStats[ScheduledJobSettings.SWEEPER_ENABLED.key])
qreshi marked this conversation as resolved.
Show resolved Hide resolved
assertFalse(
"Monitor [$monitorId] was still scheduled based on the alerting stats response: $alertingStats",
isMonitorScheduled(monitorId, alertingStats)
)

// Re-enable Monitor jobs
enableScheduledJob()

// Sleep briefly so sweep can reschedule the Monitor
Thread.sleep(2000)

alertingStats = getAlertingStats()
assertEquals("Scheduled job is not enabled", true, alertingStats[ScheduledJobSettings.SWEEPER_ENABLED.key])
qreshi marked this conversation as resolved.
Show resolved Hide resolved
assertTrue(
"Monitor [$monitorId] was not re-scheduled based on the alerting stats response: $alertingStats",
isMonitorScheduled(monitorId, alertingStats)
)
}

fun `test monitor stats no jobs`() {
// Disable the Monitor plugin.
// Enable the Monitor plugin.
enableScheduledJob()

val responseMap = getAlertingStats()
// assertEquals("Cluster name is incorrect", responseMap["cluster_name"], "alerting_integTestCluster")
assertEquals("Scheduled job is not enabled", true, responseMap[ScheduledJobSettings.SWEEPER_ENABLED.key])
assertEquals("Scheduled job index exists but there are no scheduled jobs.", false, responseMap["scheduled_job_index_exists"])
val _nodes = responseMap["_nodes"] as Map<String, Int>
assertEquals("Incorrect number of nodes", numberOfNodes, _nodes["total"])
assertEquals("Failed nodes found during monitor stats call", 0, _nodes["failed"])
assertEquals("More than $numberOfNodes successful node", numberOfNodes, _nodes["successful"])
validateAlertingStatsNodeResponse(_nodes)
}

fun `test monitor stats jobs`() {
Expand All @@ -919,9 +960,7 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Nodes are not on schedule", numberOfNodes, responseMap["nodes_on_schedule"])

val _nodes = responseMap["_nodes"] as Map<String, Int>
assertEquals("Incorrect number of nodes", numberOfNodes, _nodes["total"])
assertEquals("Failed nodes found during monitor stats call", 0, _nodes["failed"])
assertEquals("More than $numberOfNodes successful node", numberOfNodes, _nodes["successful"])
validateAlertingStatsNodeResponse(_nodes)
}

@Throws(Exception::class)
Expand Down Expand Up @@ -950,9 +989,7 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Nodes not on schedule", numberOfNodes, responseMap["nodes_on_schedule"])

val _nodes = responseMap["_nodes"] as Map<String, Int>
assertEquals("Incorrect number of nodes", numberOfNodes, _nodes["total"])
assertEquals("Failed nodes found during monitor stats call", 0, _nodes["failed"])
assertEquals("More than $numberOfNodes successful node", numberOfNodes, _nodes["successful"])
validateAlertingStatsNodeResponse(_nodes)
}

fun `test monitor stats incorrect metric`() {
Expand Down Expand Up @@ -1045,4 +1082,23 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus())
}
}

private fun validateAlertingStatsNodeResponse(nodesResponse: Map<String, Int>) {
assertEquals("Incorrect number of nodes", numberOfNodes, nodesResponse["total"])
assertEquals("Failed nodes found during monitor stats call", 0, nodesResponse["failed"])
assertEquals("More than $numberOfNodes successful node", numberOfNodes, nodesResponse["successful"])
}

private fun isMonitorScheduled(monitorId: String, alertingStatsResponse: Map<String, Any>): Boolean {
val nodesInfo = alertingStatsResponse["nodes"] as Map<String, Any>
for (nodeId in nodesInfo.keys) {
val nodeInfo = nodesInfo[nodeId] as Map<String, Any>
val jobsInfo = nodeInfo["jobs_info"] as Map<String, Any>
if (jobsInfo.keys.contains(monitorId)) {
return true
}
}

return false
}
}
42 changes: 39 additions & 3 deletions core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,13 @@ class JobSweeper(
// cancel existing background thread if present
scheduledFullSweep?.cancel()

// Manually sweep all shards before scheduling the background sweep so it picks up any changes immediately
// since the first run of a task submitted with scheduleWithFixedDelay() happens after the interval has passed.
logger.debug("Performing sweep of scheduled jobs.")
fullSweepExecutor.submit {
sweepAllShards()
}

// Setup an anti-entropy/self-healing background sweep, in case a sweep that was triggered by an event fails.
val scheduledSweep = Runnable {
val elapsedTime = getFullSweepElapsedTime()
Expand Down Expand Up @@ -372,13 +379,19 @@ class JobSweeper(
sweptJobs.getOrPut(shardId) { ConcurrentHashMap() }
// Use [compute] to update atomically in case another thread concurrently indexes/deletes the same job
.compute(jobId) { _, currentVersion ->
val jobCurrentlyScheduled = scheduler.scheduledJobs().contains(jobId)

if (newVersion <= (currentVersion ?: Versions.NOT_FOUND)) {
logger.debug("Skipping job $jobId, $newVersion <= $currentVersion")
return@compute currentVersion
if (unchangedJobToBeRescheduled(newVersion, currentVersion, jobCurrentlyScheduled, job)) {
logger.debug("Not skipping job $jobId since it is an unchanged job slated to be rescheduled")
} else {
logger.debug("Skipping job $jobId, $newVersion <= $currentVersion")
return@compute currentVersion
}
}

// deschedule the currently scheduled version
if (scheduler.scheduledJobs().contains(jobId)) {
if (jobCurrentlyScheduled) {
scheduler.deschedule(jobId)
}

Expand All @@ -396,6 +409,29 @@ class JobSweeper(
}
}

/*
* During the job sweep, normally jobs where the currentVersion is equal to the newVersion are skipped since
* there was no change.
*
* However, there exists an edge-case where a job could have been de-scheduled by flipping [SWEEPER_ENABLED]
* to false and then not have undergone any changes when the sweeper is re-enabled. In this case, the job should
* not be skipped so it can be re-scheduled. This utility method checks for this condition so the sweep() method
* can account for it.
*/
private fun unchangedJobToBeRescheduled(
newVersion: JobVersion,
currentVersion: JobVersion?,
jobCurrentlyScheduled: Boolean,
job: ScheduledJob?
): Boolean {
// newVersion should not be [Versions.NOT_FOUND] here since it's passed in from existing search hits
// or successful doc delete operations
val versionWasUnchanged = newVersion == (currentVersion ?: Versions.NOT_FOUND)
val jobEnabled = job?.enabled ?: false

return versionWasUnchanged && !jobCurrentlyScheduled && jobEnabled
}

private fun parseAndSweepJob(
xcp: XContentParser,
shardId: ShardId,
Expand Down