Bulk index findings and sequentially invoke auto-correlations #1355

Merged: 6 commits, merged Feb 6, 2024

Changes from 3 commits
@@ -346,7 +346,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.FINDING_HISTORY_MAX_DOCS,
AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE,
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE
Member: nit: BULK

)
}

@@ -8,8 +8,10 @@ package org.opensearch.alerting
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
@@ -22,6 +24,7 @@ import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
@@ -273,10 +276,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// If there are no triggers defined, we still want to generate findings
if (monitor.triggers.isEmpty()) {
if (dryrun == false && monitor.id != Monitor.NO_ID) {
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
}
createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
}
} else {
monitor.triggers.forEach {
@@ -365,7 +365,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
trigger: DocumentLevelTrigger,
monitor: Monitor,
idQueryMap: Map<String, DocLevelQuery>,
docsToQueries: Map<String, List<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?,
@@ -374,35 +374,34 @@
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)

val findings = mutableListOf<String>()
val findingDocPairs = mutableListOf<Pair<String, String>>()
val triggerFindingDocPairs = mutableListOf<Pair<String, String>>()

// TODO: Implement throttling for findings
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
val findingId = createFindings(
monitor,
monitorCtx,
triggeredQueries,
it.key,
!dryrun && monitor.id != Monitor.NO_ID,
executionId
)
findings.add(findingId)
val findingToDocPairs = createFindings(
monitor,
monitorCtx,
docsToQueries,
idQueryMap,
!dryrun && monitor.id != Monitor.NO_ID,
executionId
)

if (triggerResult.triggeredDocs.contains(it.key)) {
findingDocPairs.add(Pair(findingId, it.key))
findingToDocPairs.forEach {
// Only pick those entries whose docs have triggers associated with them
if (triggerResult.triggeredDocs.contains(it.second)) {
triggerFindingDocPairs.add(Pair(it.first, it.second))
}
}

val actionCtx = triggerCtx.copy(
triggeredDocs = triggerResult.triggeredDocs,
relatedFindings = findings,
// confirm if this is right or only trigger-able findings should be present in this list
Member: nit: please add a TODO/FIXME, and keep the PR in draft if it's not ready to merge.

Member: What is the previous behaviour? Why are we not just refactoring? Is there a behaviour change?

Collaborator (author): This change preserves the existing behavior. The comment was left in by mistake; it has been removed.

relatedFindings = findingToDocPairs.map { it.first },
error = monitorResult.error ?: triggerResult.error
)

val alerts = mutableListOf<Alert>()
findingDocPairs.forEach {
triggerFindingDocPairs.forEach {
val alert = monitorCtx.alertService!!.composeDocLevelAlert(
listOf(it.first),
listOf(it.second),
Expand Down Expand Up @@ -461,51 +460,102 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return triggerResult
}

/**
* 1. Bulk index all findings based on shouldCreateFinding flag
* 2. invoke publishFinding() to kickstart auto-correlations
* 3. Returns a list of pairs for finding id to doc id
*/
private suspend fun createFindings(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docLevelQueries: List<DocLevelQuery>,
matchingDocId: String,
docsToQueries: MutableMap<String, MutableList<String>>,
idQueryMap: Map<String, DocLevelQuery>,
shouldCreateFinding: Boolean,
workflowExecutionId: String? = null,
): String {
// Before the "|" is the doc id and after the "|" is the index
val docIndex = matchingDocId.split("|")
): List<Pair<String, String>> {

val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocIds = listOf(docIndex[0]),
correlatedDocIds = listOf(docIndex[0]),
monitorId = monitor.id,
monitorName = monitor.name,
index = docIndex[1],
docLevelQueries = docLevelQueries,
timestamp = Instant.now(),
executionId = workflowExecutionId
)
val findingDocPairs = mutableListOf<Pair<String, String>>()
val findings = mutableListOf<Finding>()
val indexRequests = mutableListOf<IndexRequest>()
monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings)
Member: What is monitorCtx.findingsIndexBatchSize? Why are we mutating the monitorCtx object? That object should only be read from.

Collaborator (author): The intent is for alerting to pick up the new value when it is updated via _cluster/settings.

monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(FINDINGS_INDEXING_BATCH_SIZE) {
Member: We aren't supposed to add a settings consumer in the middle of monitor execution. Please do it at node startup.

Collaborator (author): Moved it to registerConsumers() in MonitorRunnerService.
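For reference, a startup-time registration along these lines keeps the consumer out of the execution path. This is only a sketch: registerConsumers() in MonitorRunnerService is not part of this diff, so its signature and the surrounding wiring are assumptions based on the discussion above.

// Sketch only: assumed registerConsumers() hook in MonitorRunnerService, run once at
// node startup; monitorCtx is the shared MonitorRunnerExecutionContext shown in this PR.
fun registerConsumers(): MonitorRunnerService {
    monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(
        AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE
    ) { monitorCtx.findingsIndexBatchSize = it }
    return this
}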

monitorCtx.findingsIndexBatchSize = it
}

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
logger.debug("Findings: $findingStr")
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }

if (shouldCreateFinding) {
val indexRequest = IndexRequest(monitor.dataSources.findingsIndex)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(findingStr, XContentType.JSON)
.id(finding.id)
.routing(finding.id)
// Before the "|" is the doc id and after the "|" is the index
val docIndex = it.key.split("|")

monitorCtx.client!!.suspendUntil<Client, IndexResponse> {
monitorCtx.client!!.index(indexRequest, it)
val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocIds = listOf(docIndex[0]),
correlatedDocIds = listOf(docIndex[0]),
monitorId = monitor.id,
monitorName = monitor.name,
index = docIndex[1],
docLevelQueries = triggeredQueries,
timestamp = Instant.now(),
executionId = workflowExecutionId
)
findingDocPairs.add(Pair(finding.id, it.key))
findings.add(finding)

val findingStr =
finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
.string()
logger.debug("Findings: $findingStr")

if (indexRequests.size > monitorCtx.findingsIndexBatchSize) {
// make bulk indexing call here and flush the indexRequest object
bulkIndexFindings(monitor, monitorCtx, indexRequests)
indexRequests.clear()
} else {
if (shouldCreateFinding) {
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
.routing(finding.id)
.opType(DocWriteRequest.OpType.INDEX)
}
}
}

if (indexRequests.size <= monitorCtx.findingsIndexBatchSize) {
bulkIndexFindings(monitor, monitorCtx, indexRequests)
}

try {
publishFinding(monitor, monitorCtx, finding)
findings.forEach { finding ->
publishFinding(monitor, monitorCtx, finding)
}
} catch (e: Exception) {
// suppress exception
logger.error("Optional finding callback failed", e)
}
return finding.id
return findingDocPairs
}

private suspend fun bulkIndexFindings(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
indexRequests: List<IndexRequest>
) {
if (indexRequests.isNotEmpty()) {
val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it)
Member: Don't refresh on every batch; refresh the findings index only once, after all batches have been bulk-ingested.

Collaborator (author): Done.
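The revised commit is not shown in this view. A minimal sketch of "refresh once at the end", reusing the suspendUntil helper already used in this file and the stock refresh action (org.opensearch.action.admin.indices.refresh.RefreshRequest/RefreshResponse), could look like:

// Sketch only: drop WriteRequest.RefreshPolicy.IMMEDIATE from the per-batch BulkRequest
// and refresh the findings index a single time after the last batch has been ingested.
val refreshResponse: RefreshResponse = monitorCtx.client!!.suspendUntil {
    admin().indices().refresh(RefreshRequest(monitor.dataSources.findingsIndex), it)
}
logger.debug("Refreshed findings index on ${refreshResponse.successfulShards} shard(s)")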

}
if (bulkResponse.hasFailures()) {
bulkResponse.items.forEach { item ->
if (item.isFailed) {
logger.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
}
}
} else {
logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
}
}
}

private fun publishFinding(
@@ -629,7 +679,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields))
}
} catch (e: Exception) {
logger.warn("Failed to run for shard $shard. Error: ${e.message}")
logger.error("Failed to run for shard $shard. Error: ${e.message}")
}
}
return matchingDocs
@@ -47,5 +47,6 @@ data class MonitorRunnerExecutionContext(
@Volatile var destinationContextFactory: DestinationContextFactory? = null,

@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
@Volatile var indexTimeout: TimeValue? = null
@Volatile var indexTimeout: TimeValue? = null,
@Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE
)
@@ -17,6 +17,7 @@ class AlertingSettings {

companion object {
const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L
const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 10000

val ALERTING_MAX_MONITORS = Setting.intSetting(
"plugins.alerting.monitor.max_monitors",
@@ -153,5 +154,12 @@
-1L,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting(
"plugins.alerting.alert_findings_indexing_batch_size",
DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
0,
Member: The minimum can't be 0.

Setting.Property.NodeScope, Setting.Property.Dynamic
)
}
}
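The follow-up to the "min cant be 0" comment is not part of this revision. Assuming the same constant and setting key, the registration would simply raise the floor, for example to 1:

// Sketch only: same setting as above with a positive minimum, per the review comment.
val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting(
    "plugins.alerting.alert_findings_indexing_batch_size",
    DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
    1,
    Setting.Property.NodeScope, Setting.Property.Dynamic
)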