Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

set the cancelAfterTimeInterval parameter on SearchRequest object in … #1366

Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
Expand Down Expand Up @@ -446,6 +448,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))
sr.source().query(queryBuilder)
}
sr.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
getCancelAfterTimeInterval()
)
val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, executionId)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.util.parseSampleDocTags
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
Expand All @@ -41,6 +42,7 @@ import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.AlertingPluginInterface
Expand Down Expand Up @@ -116,7 +118,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
logger.error("Error setting up alerts and findings indices for monitor: $id", e)
monitorResult = monitorResult.copy(error = AlertingException.wrap(e))
}

try {
validate(monitor)
} catch (e: Exception) {
Expand Down Expand Up @@ -881,7 +882,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
.size(monitorCtx.docLevelMonitorShardFetchSize)
)
.preference(Preference.PRIMARY_FIRST.type())

request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
getCancelAfterTimeInterval()
)
if (monitorCtx.fetchOnlyQueryFieldNames && fieldsToFetch.isNotEmpty()) {
request.source().fetchSource(false)
for (field in fieldsToFetch) {
Expand Down Expand Up @@ -936,7 +939,12 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
"$monitorInputIndices against query index $queryIndices"
)
var response: SearchResponse

try {
searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
getCancelAfterTimeInterval()
)

response = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ data class MonitorRunnerExecutionContext(

@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
@Volatile var indexTimeout: TimeValue? = null,
@Volatile var cancelAfterTimeInterval: TimeValue? = null,
@Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
@Volatile var fetchOnlyQueryFieldNames: Boolean = true,
@Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
Expand Down Expand Up @@ -153,6 +154,9 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
ALERT_BACKOFF_MILLIS.get(monitorCtx.settings),
ALERT_BACKOFF_COUNT.get(monitorCtx.settings)
)

monitorCtx.cancelAfterTimeInterval = SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING.get(monitorCtx.settings)

monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS, ALERT_BACKOFF_COUNT) { millis, count ->
monitorCtx.retryPolicy = BackoffPolicy.constantBackoff(millis, count)
}
Expand All @@ -169,6 +173,9 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
monitorCtx.moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count)
}

monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING) {
monitorCtx.cancelAfterTimeInterval = it
}
monitorCtx.allowList = ALLOW_LIST.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALLOW_LIST) {
monitorCtx.allowList = it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ class TransportGetFindingsSearchAction @Inject constructor(
)
}
searchSourceBuilder.query(queryBuilder).trackTotalHits(true)

client.threadPool().threadContext.stashContext().use {
scope.launch {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.alerting.util

import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.AlertService
import org.opensearch.alerting.MonitorRunnerService
import org.opensearch.alerting.model.AlertContext
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.destination.Destination
Expand All @@ -25,6 +27,7 @@ import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
import org.opensearch.commons.alerting.model.action.ActionExecutionScope
import org.opensearch.commons.alerting.util.isBucketLevelMonitor
import org.opensearch.script.Script
import kotlin.math.max

private val logger = LogManager.getLogger("AlertingUtils")

Expand Down Expand Up @@ -172,6 +175,16 @@ inline fun <T : ThreadContext.StoredContext, R> T.use(block: (T) -> R): R {
}
}

fun getCancelAfterTimeInterval(): Long {
// The default value for the cancelAfterTimeInterval is -1 and so, in this case
// we should ignore processing on the value
val givenInterval = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval!!.minutes
if (givenInterval == -1L) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does -1 get interpreted as "no cancelAfterTimeInterval", i.e. the search is never cancelled? I'm wondering if we should have a sensible default instead of defaulting to no cancellation at all. Could you elaborate on what conditions would necessitate a user changing this setting from -1 to a specific value?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, -1 is the default for search.cancel_after_time_interval. Suppose search.cancel_after_time_interval is set on the cluster to some higher value and the plugin overrides it with a lower value. To avoid that, the plugin should use max(cluster setting search.cancel_after_time_interval, plugin's default minimum value) when overriding the interval before calling _search.

return givenInterval
}
return max(givenInterval, AlertService.ALERTS_SEARCH_TIMEOUT.minutes)
}

/**
* Closes this [AutoCloseable], suppressing possible exception or error thrown by [AutoCloseable.close] function when
* it's being closed due to some other [cause] exception occurred.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class AlertServiceTests : OpenSearchTestCase() {
xContentRegistry = Mockito.mock(NamedXContentRegistry::class.java)
threadPool = Mockito.mock(ThreadPool::class.java)
clusterService = Mockito.mock(ClusterService::class.java)

settings = Settings.builder().build()
val settingSet = hashSetOf<Setting<*>>()
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
Expand Down
Loading