-
Notifications
You must be signed in to change notification settings - Fork 106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bulk index findings and sequentially invoke auto-correlations #1355
Changes from 3 commits
e7ba6d7
28530dd
03b86ae
ae32748
1833d6d
3bd7888
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,8 +8,10 @@ package org.opensearch.alerting | |
import org.apache.logging.log4j.LogManager | ||
import org.opensearch.ExceptionsHelper | ||
import org.opensearch.OpenSearchStatusException | ||
import org.opensearch.action.DocWriteRequest | ||
import org.opensearch.action.bulk.BulkRequest | ||
import org.opensearch.action.bulk.BulkResponse | ||
import org.opensearch.action.index.IndexRequest | ||
import org.opensearch.action.index.IndexResponse | ||
import org.opensearch.action.search.SearchAction | ||
import org.opensearch.action.search.SearchRequest | ||
import org.opensearch.action.search.SearchResponse | ||
|
@@ -22,6 +24,7 @@ import org.opensearch.alerting.model.MonitorRunResult | |
import org.opensearch.alerting.model.userErrorMessage | ||
import org.opensearch.alerting.opensearchapi.suspendUntil | ||
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext | ||
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE | ||
import org.opensearch.alerting.util.AlertingException | ||
import org.opensearch.alerting.util.IndexUtils | ||
import org.opensearch.alerting.util.defaultToPerExecutionAction | ||
|
@@ -273,10 +276,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { | |
// If there are no triggers defined, we still want to generate findings | ||
if (monitor.triggers.isEmpty()) { | ||
if (dryrun == false && monitor.id != Monitor.NO_ID) { | ||
docsToQueries.forEach { | ||
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } | ||
createFindings(monitor, monitorCtx, triggeredQueries, it.key, true) | ||
} | ||
createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true) | ||
} | ||
} else { | ||
monitor.triggers.forEach { | ||
|
@@ -365,7 +365,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { | |
trigger: DocumentLevelTrigger, | ||
monitor: Monitor, | ||
idQueryMap: Map<String, DocLevelQuery>, | ||
docsToQueries: Map<String, List<String>>, | ||
docsToQueries: MutableMap<String, MutableList<String>>, | ||
queryToDocIds: Map<DocLevelQuery, Set<String>>, | ||
dryrun: Boolean, | ||
workflowRunContext: WorkflowRunContext?, | ||
|
@@ -374,35 +374,34 @@ object DocumentLevelMonitorRunner : MonitorRunner() { | |
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) | ||
val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds) | ||
|
||
val findings = mutableListOf<String>() | ||
val findingDocPairs = mutableListOf<Pair<String, String>>() | ||
val triggerFindingDocPairs = mutableListOf<Pair<String, String>>() | ||
|
||
// TODO: Implement throttling for findings | ||
docsToQueries.forEach { | ||
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } | ||
val findingId = createFindings( | ||
monitor, | ||
monitorCtx, | ||
triggeredQueries, | ||
it.key, | ||
!dryrun && monitor.id != Monitor.NO_ID, | ||
executionId | ||
) | ||
findings.add(findingId) | ||
val findingToDocPairs = createFindings( | ||
monitor, | ||
monitorCtx, | ||
docsToQueries, | ||
idQueryMap, | ||
!dryrun && monitor.id != Monitor.NO_ID, | ||
executionId | ||
) | ||
|
||
if (triggerResult.triggeredDocs.contains(it.key)) { | ||
findingDocPairs.add(Pair(findingId, it.key)) | ||
findingToDocPairs.forEach { | ||
// Only pick those entries whose docs have triggers associated with them | ||
if (triggerResult.triggeredDocs.contains(it.second)) { | ||
triggerFindingDocPairs.add(Pair(it.first, it.second)) | ||
} | ||
} | ||
|
||
val actionCtx = triggerCtx.copy( | ||
triggeredDocs = triggerResult.triggeredDocs, | ||
relatedFindings = findings, | ||
// confirm if this is right or only trigger-able findings should be present in this list | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NIT: plz add TODO/FIXME and maintain pr in draft if it's not ready to merge There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is the previous behaviour? why are we not just refactoring? is there a behaviour change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change is preserving the existing behavior. Left by mistake, removed the comment. |
||
relatedFindings = findingToDocPairs.map { it.first }, | ||
error = monitorResult.error ?: triggerResult.error | ||
) | ||
|
||
val alerts = mutableListOf<Alert>() | ||
findingDocPairs.forEach { | ||
triggerFindingDocPairs.forEach { | ||
val alert = monitorCtx.alertService!!.composeDocLevelAlert( | ||
listOf(it.first), | ||
listOf(it.second), | ||
|
@@ -461,51 +460,102 @@ object DocumentLevelMonitorRunner : MonitorRunner() { | |
return triggerResult | ||
} | ||
|
||
/** | ||
* 1. Bulk index all findings based on shouldCreateFinding flag | ||
* 2. invoke publishFinding() to kickstart auto-correlations | ||
* 3. Returns a list of pairs for finding id to doc id | ||
*/ | ||
private suspend fun createFindings( | ||
monitor: Monitor, | ||
monitorCtx: MonitorRunnerExecutionContext, | ||
docLevelQueries: List<DocLevelQuery>, | ||
matchingDocId: String, | ||
docsToQueries: MutableMap<String, MutableList<String>>, | ||
idQueryMap: Map<String, DocLevelQuery>, | ||
shouldCreateFinding: Boolean, | ||
workflowExecutionId: String? = null, | ||
): String { | ||
// Before the "|" is the doc id and after the "|" is the index | ||
val docIndex = matchingDocId.split("|") | ||
): List<Pair<String, String>> { | ||
|
||
val finding = Finding( | ||
id = UUID.randomUUID().toString(), | ||
relatedDocIds = listOf(docIndex[0]), | ||
correlatedDocIds = listOf(docIndex[0]), | ||
monitorId = monitor.id, | ||
monitorName = monitor.name, | ||
index = docIndex[1], | ||
docLevelQueries = docLevelQueries, | ||
timestamp = Instant.now(), | ||
executionId = workflowExecutionId | ||
) | ||
val findingDocPairs = mutableListOf<Pair<String, String>>() | ||
val findings = mutableListOf<Finding>() | ||
val indexRequests = mutableListOf<IndexRequest>() | ||
monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is this - Why are we mutating value of monitorCtx object?? that object should only be read from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The intent is to update this value in alerting, when it is updated via _cluster/settings. |
||
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(FINDINGS_INDEXING_BATCH_SIZE) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we aren't supposed to add settings consumer in the middle of monitor execution. plz do at node startup There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved it to registerConsumers() in MonitorRunnerService. |
||
monitorCtx.findingsIndexBatchSize = it | ||
} | ||
|
||
val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() | ||
logger.debug("Findings: $findingStr") | ||
docsToQueries.forEach { | ||
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } | ||
|
||
if (shouldCreateFinding) { | ||
val indexRequest = IndexRequest(monitor.dataSources.findingsIndex) | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
.source(findingStr, XContentType.JSON) | ||
.id(finding.id) | ||
.routing(finding.id) | ||
// Before the "|" is the doc id and after the "|" is the index | ||
val docIndex = it.key.split("|") | ||
|
||
monitorCtx.client!!.suspendUntil<Client, IndexResponse> { | ||
monitorCtx.client!!.index(indexRequest, it) | ||
val finding = Finding( | ||
id = UUID.randomUUID().toString(), | ||
relatedDocIds = listOf(docIndex[0]), | ||
correlatedDocIds = listOf(docIndex[0]), | ||
monitorId = monitor.id, | ||
monitorName = monitor.name, | ||
index = docIndex[1], | ||
docLevelQueries = triggeredQueries, | ||
timestamp = Instant.now(), | ||
executionId = workflowExecutionId | ||
) | ||
findingDocPairs.add(Pair(finding.id, it.key)) | ||
findings.add(finding) | ||
|
||
val findingStr = | ||
finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS) | ||
.string() | ||
logger.debug("Findings: $findingStr") | ||
|
||
if (indexRequests.size > monitorCtx.findingsIndexBatchSize) { | ||
// make bulk indexing call here and flush the indexRequest object | ||
bulkIndexFindings(monitor, monitorCtx, indexRequests) | ||
indexRequests.clear() | ||
} else { | ||
if (shouldCreateFinding) { | ||
indexRequests += IndexRequest(monitor.dataSources.findingsIndex) | ||
.source(findingStr, XContentType.JSON) | ||
.id(finding.id) | ||
.routing(finding.id) | ||
.opType(DocWriteRequest.OpType.INDEX) | ||
} | ||
} | ||
} | ||
|
||
if (indexRequests.size <= monitorCtx.findingsIndexBatchSize) { | ||
bulkIndexFindings(monitor, monitorCtx, indexRequests) | ||
} | ||
|
||
try { | ||
publishFinding(monitor, monitorCtx, finding) | ||
findings.forEach { finding -> | ||
publishFinding(monitor, monitorCtx, finding) | ||
} | ||
} catch (e: Exception) { | ||
// suppress exception | ||
logger.error("Optional finding callback failed", e) | ||
} | ||
return finding.id | ||
return findingDocPairs | ||
} | ||
|
||
private suspend fun bulkIndexFindings( | ||
monitor: Monitor, | ||
monitorCtx: MonitorRunnerExecutionContext, | ||
indexRequests: List<IndexRequest> | ||
) { | ||
if (indexRequests.isNotEmpty()) { | ||
val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { | ||
goyamegh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. dont refresh for every batch. refresh findings index only once after all batches are bulk-ingested There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
} | ||
if (bulkResponse.hasFailures()) { | ||
bulkResponse.items.forEach { item -> | ||
if (item.isFailed) { | ||
goyamegh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
logger.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") | ||
} | ||
} | ||
} else { | ||
logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.") | ||
} | ||
} | ||
} | ||
|
||
private fun publishFinding( | ||
|
@@ -629,7 +679,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { | |
matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields)) | ||
} | ||
} catch (e: Exception) { | ||
logger.warn("Failed to run for shard $shard. Error: ${e.message}") | ||
logger.error("Failed to run for shard $shard. Error: ${e.message}") | ||
} | ||
} | ||
return matchingDocs | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ class AlertingSettings { | |
|
||
companion object { | ||
const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L | ||
const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 10000 | ||
goyamegh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
val ALERTING_MAX_MONITORS = Setting.intSetting( | ||
"plugins.alerting.monitor.max_monitors", | ||
|
@@ -153,5 +154,12 @@ class AlertingSettings { | |
-1L, | ||
Setting.Property.NodeScope, Setting.Property.Dynamic | ||
) | ||
|
||
val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting( | ||
"plugins.alerting.alert_findings_indexing_batch_size", | ||
DEFAULT_FINDINGS_INDEXING_BATCH_SIZE, | ||
0, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. min cant be 0 |
||
Setting.Property.NodeScope, Setting.Property.Dynamic | ||
) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:BULK