Bulk index findings and sequentially invoke auto-correlations #1355

Merged: eirsep merged 6 commits into opensearch-project:main from goyamegh:main-bulkIndexFindings on Feb 6, 2024.

Commits in this pull request (the changes shown below are from 1 commit):
e7ba6d7  Bulk index findings and sequentially invoke auto-correlations (goyamegh)
28530dd  Merge branch 'opensearch-project:main' into main-bulkIndexFindings (goyamegh)
03b86ae  Bulk index findings in batches of 10000 and make it configurable (goyamegh)
ae32748  Addressing review comments (goyamegh)
1833d6d  Add integ tests to test bulk index findings (goyamegh)
3bd7888  Fix ktlint formatting (goyamegh)
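The PR title and commit messages describe the core pattern: build one IndexRequest per finding, submit them in a single bulk call, and then invoke the auto-correlation callback for each finding sequentially. The following is a minimal, hedged sketch of that flow in Kotlin; SimpleFinding, publishFinding, and bulkIndexAndCorrelate are illustrative stand-ins, not the plugin's actual classes or APIs:

```kotlin
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.client.Client
import java.time.Instant
import java.util.UUID

// Illustrative stand-in for the plugin's Finding model.
data class SimpleFinding(
    val docId: String,
    val id: String = UUID.randomUUID().toString(),
    val timestamp: Instant = Instant.now()
)

// Hypothetical correlation callback; the plugin's real publishFinding() differs.
fun publishFinding(finding: SimpleFinding) {
    println("Publishing finding ${finding.id} for doc ${finding.docId}")
}

fun bulkIndexAndCorrelate(client: Client, findingsIndex: String, findings: List<SimpleFinding>) {
    // 1. Accumulate one IndexRequest per finding instead of indexing them one by one.
    val requests = findings.map { finding ->
        IndexRequest(findingsIndex)
            .id(finding.id)
            .routing(finding.id)
            .source(mapOf("doc_id" to finding.docId, "timestamp" to finding.timestamp.toString()))
    }

    // 2. Submit a single bulk request instead of N individual index calls.
    val response: BulkResponse = client.bulk(BulkRequest().add(requests)).actionGet()
    response.items.filter { it.isFailed }.forEach {
        println("Failed to index finding ${it.id}: ${it.failureMessage}")
    }

    // 3. Kick off auto-correlations sequentially, one finding at a time.
    findings.forEach { publishFinding(it) }
}
```

The actual change in DocumentLevelMonitorRunner (shown in the diff below) builds the Finding documents from docsToQueries and uses the plugin's suspendUntil helper rather than blocking actionGet() calls.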
@@ -8,8 +8,10 @@ package org.opensearch.alerting
 import org.apache.logging.log4j.LogManager
 import org.opensearch.ExceptionsHelper
 import org.opensearch.OpenSearchStatusException
+import org.opensearch.action.DocWriteRequest
+import org.opensearch.action.bulk.BulkRequest
+import org.opensearch.action.bulk.BulkResponse
 import org.opensearch.action.index.IndexRequest
-import org.opensearch.action.index.IndexResponse
 import org.opensearch.action.search.SearchAction
 import org.opensearch.action.search.SearchRequest
 import org.opensearch.action.search.SearchResponse
@@ -273,10 +275,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         // If there are no triggers defined, we still want to generate findings
         if (monitor.triggers.isEmpty()) {
             if (dryrun == false && monitor.id != Monitor.NO_ID) {
-                docsToQueries.forEach {
-                    val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
-                    createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
-                }
+                createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
             }
         } else {
             monitor.triggers.forEach {
@@ -365,7 +364,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         trigger: DocumentLevelTrigger,
         monitor: Monitor,
         idQueryMap: Map<String, DocLevelQuery>,
-        docsToQueries: Map<String, List<String>>,
+        docsToQueries: MutableMap<String, MutableList<String>>,
         queryToDocIds: Map<DocLevelQuery, Set<String>>,
         dryrun: Boolean,
         workflowRunContext: WorkflowRunContext?,
@@ -374,35 +373,34 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
         val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)

-        val findings = mutableListOf<String>()
-        val findingDocPairs = mutableListOf<Pair<String, String>>()
+        val triggerFindingDocPairs = mutableListOf<Pair<String, String>>()

         // TODO: Implement throttling for findings
-        docsToQueries.forEach {
-            val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
-            val findingId = createFindings(
-                monitor,
-                monitorCtx,
-                triggeredQueries,
-                it.key,
-                !dryrun && monitor.id != Monitor.NO_ID,
-                executionId
-            )
-            findings.add(findingId)
+        val findingToDocPairs = createFindings(
+            monitor,
+            monitorCtx,
+            docsToQueries,
+            idQueryMap,
+            !dryrun && monitor.id != Monitor.NO_ID,
+            executionId
+        )

-            if (triggerResult.triggeredDocs.contains(it.key)) {
-                findingDocPairs.add(Pair(findingId, it.key))
+        findingToDocPairs.forEach {
+            // Only pick those entries whose docs have triggers associated with them
+            if (triggerResult.triggeredDocs.contains(it.second)) {
+                triggerFindingDocPairs.add(Pair(it.first, it.second))
             }
         }

         val actionCtx = triggerCtx.copy(
             triggeredDocs = triggerResult.triggeredDocs,
-            relatedFindings = findings,
+            // confirm if this is right or only trigger-able findings should be present in this list
+            relatedFindings = findingToDocPairs.map { it.first },
             error = monitorResult.error ?: triggerResult.error
         )

         val alerts = mutableListOf<Alert>()
-        findingDocPairs.forEach {
+        triggerFindingDocPairs.forEach {
             val alert = monitorCtx.alertService!!.composeDocLevelAlert(
                 listOf(it.first),
                 listOf(it.second),
@@ -461,51 +459,82 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         return triggerResult
     }

+    /**
+     * 1. Bulk index all findings based on shouldCreateFinding flag
+     * 2. invoke publishFinding() to kickstart auto-correlations
+     * 3. Returns a list of pairs for finding id to doc id
+     */
     private suspend fun createFindings(
         monitor: Monitor,
         monitorCtx: MonitorRunnerExecutionContext,
-        docLevelQueries: List<DocLevelQuery>,
-        matchingDocId: String,
+        docsToQueries: MutableMap<String, MutableList<String>>,
+        idQueryMap: Map<String, DocLevelQuery>,
         shouldCreateFinding: Boolean,
         workflowExecutionId: String? = null,
-    ): String {
-        // Before the "|" is the doc id and after the "|" is the index
-        val docIndex = matchingDocId.split("|")
+    ): List<Pair<String, String>> {

-        val finding = Finding(
-            id = UUID.randomUUID().toString(),
-            relatedDocIds = listOf(docIndex[0]),
-            correlatedDocIds = listOf(docIndex[0]),
-            monitorId = monitor.id,
-            monitorName = monitor.name,
-            index = docIndex[1],
-            docLevelQueries = docLevelQueries,
-            timestamp = Instant.now(),
-            executionId = workflowExecutionId
-        )
+        val findingDocPairs = mutableListOf<Pair<String, String>>()
+        val findings = mutableListOf<Finding>()
+        val indexRequests = mutableListOf<IndexRequest>()

-        val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
-        logger.debug("Findings: $findingStr")
+        docsToQueries.forEach {
+            val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
+
+            // Before the "|" is the doc id and after the "|" is the index
+            val docIndex = it.key.split("|")

-        if (shouldCreateFinding) {
-            val indexRequest = IndexRequest(monitor.dataSources.findingsIndex)
-                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
-                .source(findingStr, XContentType.JSON)
-                .id(finding.id)
-                .routing(finding.id)
+            val finding = Finding(
+                id = UUID.randomUUID().toString(),
+                relatedDocIds = listOf(docIndex[0]),
+                correlatedDocIds = listOf(docIndex[0]),
+                monitorId = monitor.id,
+                monitorName = monitor.name,
+                index = docIndex[1],
+                docLevelQueries = triggeredQueries,
+                timestamp = Instant.now(),
+                executionId = workflowExecutionId
+            )
+            findingDocPairs.add(Pair(finding.id, it.key))
+            findings.add(finding)
+
+            val findingStr =
+                finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
+                    .string()
+            logger.debug("Findings: $findingStr")
+
+            if (shouldCreateFinding) {
+                indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
+                    .source(findingStr, XContentType.JSON)
+                    .id(finding.id)
+                    .routing(finding.id)
+                    .opType(DocWriteRequest.OpType.INDEX)
+            }
+        }

-            monitorCtx.client!!.suspendUntil<Client, IndexResponse> {
-                monitorCtx.client!!.index(indexRequest, it)
+        if (indexRequests.isNotEmpty()) {
+            val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
+                bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it)
Review comment on the setRefreshPolicy line above: don't refresh for every batch; refresh the findings index only once after all batches are bulk-ingested.
Reply: Done. (See the hedged sketch after the diff for this refresh-once pattern.)
             }
+            if (bulkResponse.hasFailures()) {
+                bulkResponse.items.forEach { item ->
+                    if (item.isFailed) {
+                        logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
+                    }
+                }
+            } else {
+                logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
+            }
         }

         try {
-            publishFinding(monitor, monitorCtx, finding)
+            findings.forEach { finding ->
+                publishFinding(monitor, monitorCtx, finding)
+            }
         } catch (e: Exception) {
             // suppress exception
             logger.error("Optional finding callback failed", e)
         }
-        return finding.id
+        return findingDocPairs
     }

     private fun publishFinding(
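The review thread on the bulk call above asks not to refresh the findings index for every batch but only once after all batches are bulk-ingested; the "Done" reply suggests this was addressed in a later commit. Below is a hedged sketch of that refresh-once idea, assuming a generic OpenSearch Client, a configurable batch size, and blocking actionGet() calls for brevity; the plugin itself uses a coroutine-based suspendUntil bridge instead, so this is not its actual implementation:

```kotlin
import org.opensearch.action.admin.indices.refresh.RefreshRequest
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.support.WriteRequest
import org.opensearch.client.Client

// Bulk-ingest findings in batches without a per-batch refresh,
// then refresh the findings index once at the end.
fun bulkIndexWithSingleRefresh(
    client: Client,
    findingsIndex: String,
    requests: List<IndexRequest>,
    batchSize: Int = 10000 // assumed default; the PR makes the batch size configurable
) {
    requests.chunked(batchSize).forEach { batch ->
        val bulkRequest = BulkRequest()
            .add(batch)
            .setRefreshPolicy(WriteRequest.RefreshPolicy.NONE) // no refresh per batch
        val response = client.bulk(bulkRequest).actionGet()
        if (response.hasFailures()) {
            response.items.filter { it.isFailed }
                .forEach { println("Failed to index finding ${it.id}: ${it.failureMessage}") }
        }
    }
    // One explicit refresh after all batches, instead of RefreshPolicy.IMMEDIATE per batch.
    client.admin().indices().refresh(RefreshRequest(findingsIndex)).actionGet()
}
```

Skipping the per-batch RefreshPolicy.IMMEDIATE avoids forcing a refresh on every bulk request; one explicit refresh at the end still makes all newly indexed findings searchable.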
Review discussion:

NIT: please add a TODO/FIXME and keep the PR in draft if it's not ready to merge.

What is the previous behaviour? Why are we not just refactoring? Is there a behaviour change?

Reply: This change is preserving the existing behavior. Left by mistake, removed the comment.