-
Notifications
You must be signed in to change notification settings - Fork 105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implemented draft of Finding data model, a new Input type, and some basic unit tests. #260
Changes from 9 commits
36c3ce5
2bf3e9c
640420f
8adc018
7bc4ae2
fb04986
b184af1
8ea16d3
37f70be
23693e8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.alerting.model | ||
|
||
import org.opensearch.alerting.elasticapi.instant | ||
import org.opensearch.common.io.stream.StreamInput | ||
import org.opensearch.common.io.stream.StreamOutput | ||
import org.opensearch.common.io.stream.Writeable | ||
import org.opensearch.common.xcontent.ToXContent | ||
import org.opensearch.common.xcontent.XContentBuilder | ||
import org.opensearch.common.xcontent.XContentParser | ||
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken | ||
import java.io.IOException | ||
import java.time.Instant | ||
|
||
/** | ||
* A wrapper of the log event that enriches the event by also including information about the monitor it triggered. | ||
*/ | ||
class Finding( | ||
val id: String = NO_ID, | ||
val logEvent: Map<String, Any>, | ||
val monitorId: String, | ||
val monitorName: String, | ||
val queryId: String = NO_ID, | ||
val queryTags: List<String>, | ||
val severity: String, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might need to be an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I implemented this as a string to align with |
||
val timestamp: Instant, | ||
val triggerId: String, | ||
val triggerName: String | ||
) : Writeable, ToXContent { | ||
|
||
@Throws(IOException::class) | ||
constructor(sin: StreamInput) : this( | ||
id = sin.readString(), | ||
logEvent = suppressWarning(sin.readMap()), | ||
monitorId = sin.readString(), | ||
monitorName = sin.readString(), | ||
queryId = sin.readString(), | ||
queryTags = sin.readStringList(), | ||
severity = sin.readString(), | ||
timestamp = sin.readInstant(), | ||
triggerId = sin.readString(), | ||
triggerName = sin.readString() | ||
) | ||
|
||
fun asTemplateArg(): Map<String, Any?> { | ||
return mapOf( | ||
FINDING_ID_FIELD to id, | ||
LOG_EVENT_FIELD to logEvent, | ||
MONITOR_ID_FIELD to monitorId, | ||
MONITOR_NAME_FIELD to monitorName, | ||
QUERY_ID_FIELD to queryId, | ||
QUERY_TAGS_FIELD to queryTags, | ||
SEVERITY_FIELD to severity, | ||
TIMESTAMP_FIELD to timestamp.toEpochMilli(), | ||
TRIGGER_ID_FIELD to triggerId, | ||
TRIGGER_NAME_FIELD to triggerName | ||
) | ||
} | ||
|
||
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { | ||
builder.startObject() | ||
.field(FINDING_ID_FIELD, id) | ||
.field(LOG_EVENT_FIELD, logEvent) | ||
.field(MONITOR_ID_FIELD, monitorId) | ||
.field(MONITOR_NAME_FIELD, monitorName) | ||
.field(QUERY_ID_FIELD, queryId) | ||
.field(QUERY_TAGS_FIELD, queryTags.toTypedArray()) | ||
.field(SEVERITY_FIELD, severity) | ||
.field(TIMESTAMP_FIELD, timestamp) | ||
.field(TRIGGER_ID_FIELD, triggerId) | ||
.field(TRIGGER_NAME_FIELD, triggerName) | ||
builder.endObject() | ||
return builder | ||
} | ||
|
||
@Throws(IOException::class) | ||
override fun writeTo(out: StreamOutput) { | ||
out.writeString(id) | ||
out.writeMap(logEvent) | ||
out.writeString(monitorId) | ||
out.writeString(monitorName) | ||
out.writeString(queryId) | ||
out.writeStringCollection(queryTags) | ||
out.writeString(severity) | ||
out.writeInstant(timestamp) | ||
out.writeString(triggerId) | ||
out.writeString(triggerName) | ||
} | ||
|
||
companion object { | ||
const val FINDING_ID_FIELD = "id" | ||
const val LOG_EVENT_FIELD = "log_event" | ||
const val MONITOR_ID_FIELD = "monitor_id" | ||
const val MONITOR_NAME_FIELD = "monitor_name" | ||
const val QUERY_ID_FIELD = "query_id" | ||
const val QUERY_TAGS_FIELD = "query_tags" | ||
const val SEVERITY_FIELD = "severity" | ||
const val TIMESTAMP_FIELD = "timestamp" | ||
const val TRIGGER_ID_FIELD = "trigger_id" | ||
const val TRIGGER_NAME_FIELD = "trigger_name" | ||
const val NO_ID = "" | ||
|
||
@JvmStatic @JvmOverloads | ||
@Throws(IOException::class) | ||
fun parse(xcp: XContentParser, id: String = NO_ID): Finding { | ||
var logEvent: Map<String, Any> = mapOf() | ||
lateinit var monitorId: String | ||
lateinit var monitorName: String | ||
var queryId: String = NO_ID | ||
val queryTags: MutableList<String> = mutableListOf() | ||
lateinit var severity: String | ||
lateinit var timestamp: Instant | ||
lateinit var triggerId: String | ||
lateinit var triggerName: String | ||
|
||
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) | ||
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { | ||
val fieldName = xcp.currentName() | ||
xcp.nextToken() | ||
|
||
when (fieldName) { | ||
LOG_EVENT_FIELD -> logEvent = xcp.map() | ||
MONITOR_ID_FIELD -> monitorId = xcp.text() | ||
MONITOR_NAME_FIELD -> monitorName = xcp.text() | ||
QUERY_ID_FIELD -> queryId = xcp.text() | ||
QUERY_TAGS_FIELD -> { | ||
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) | ||
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { | ||
queryTags.add(xcp.text()) | ||
} | ||
} | ||
SEVERITY_FIELD -> severity = xcp.text() | ||
TIMESTAMP_FIELD -> timestamp = requireNotNull(xcp.instant()) | ||
TRIGGER_ID_FIELD -> triggerId = xcp.text() | ||
TRIGGER_NAME_FIELD -> triggerName = xcp.text() | ||
} | ||
} | ||
|
||
return Finding( | ||
id = id, | ||
logEvent = logEvent, | ||
monitorId = monitorId, | ||
monitorName = monitorName, | ||
queryId = queryId, | ||
queryTags = queryTags, | ||
severity = severity, | ||
timestamp = timestamp, | ||
triggerId = triggerId, | ||
triggerName = triggerName | ||
) | ||
} | ||
|
||
@JvmStatic | ||
@Throws(IOException::class) | ||
fun readFrom(sin: StreamInput): Finding { | ||
return Finding(sin) | ||
} | ||
|
||
@Suppress("UNCHECKED_CAST") | ||
fun suppressWarning(map: MutableMap<String?, Any?>?): MutableMap<String, Any> { | ||
return map as MutableMap<String, Any> | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.alerting.model.docLevelInput | ||
|
||
import org.opensearch.alerting.core.model.Input | ||
import org.opensearch.common.CheckedFunction | ||
import org.opensearch.common.ParseField | ||
import org.opensearch.common.io.stream.StreamInput | ||
import org.opensearch.common.io.stream.StreamOutput | ||
import org.opensearch.common.xcontent.NamedXContentRegistry | ||
import org.opensearch.common.xcontent.ToXContent | ||
import org.opensearch.common.xcontent.XContentBuilder | ||
import org.opensearch.common.xcontent.XContentParser | ||
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken | ||
import java.io.IOException | ||
|
||
data class DocLevelMonitorInput( | ||
val description: String = NO_DESCRIPTION, | ||
val indices: List<String>, | ||
val queries: List<DocLevelQuery> | ||
) : Input { | ||
|
||
@Throws(IOException::class) | ||
constructor(sin: StreamInput) : this( | ||
sin.readString(), // description | ||
sin.readStringList(), // indices | ||
sin.readList(::DocLevelQuery) // docLevelQueries | ||
) | ||
|
||
fun asTemplateArg(): Map<String, Any?> { | ||
return mapOf( | ||
DESCRIPTION_FIELD to description, | ||
INDICES_FIELD to indices, | ||
QUERIES_FIELD to queries.map { it.asTemplateArg() } | ||
) | ||
} | ||
|
||
override fun name(): String { | ||
return DOC_LEVEL_INPUT_FIELD | ||
} | ||
|
||
@Throws(IOException::class) | ||
override fun writeTo(out: StreamOutput) { | ||
out.writeString(description) | ||
out.writeStringCollection(indices) | ||
out.writeCollection(queries) | ||
} | ||
|
||
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { | ||
builder.startObject() | ||
.startObject(DOC_LEVEL_INPUT_FIELD) | ||
.field(DESCRIPTION_FIELD, description) | ||
.field(INDICES_FIELD, indices.toTypedArray()) | ||
.field(QUERIES_FIELD, queries.toTypedArray()) | ||
.endObject() | ||
.endObject() | ||
return builder | ||
} | ||
|
||
companion object { | ||
const val DESCRIPTION_FIELD = "description" | ||
const val INDICES_FIELD = "indices" | ||
const val DOC_LEVEL_INPUT_FIELD = "doc_level_input" | ||
const val QUERIES_FIELD = "queries" | ||
|
||
const val NO_DESCRIPTION = "" | ||
|
||
val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Input::class.java, ParseField(DOC_LEVEL_INPUT_FIELD), CheckedFunction { parse(it) }) | ||
|
||
@JvmStatic @Throws(IOException::class) | ||
fun parse(xcp: XContentParser): DocLevelMonitorInput { | ||
var description: String = NO_DESCRIPTION | ||
val indices: MutableList<String> = mutableListOf() | ||
val docLevelQueries: MutableList<DocLevelQuery> = mutableListOf() | ||
|
||
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) | ||
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { | ||
val fieldName = xcp.currentName() | ||
xcp.nextToken() | ||
|
||
when (fieldName) { | ||
DESCRIPTION_FIELD -> description = xcp.text() | ||
INDICES_FIELD -> { | ||
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) | ||
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { | ||
indices.add(xcp.text()) | ||
} | ||
} | ||
QUERIES_FIELD -> { | ||
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) | ||
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { | ||
docLevelQueries.add(DocLevelQuery.parse(xcp)) | ||
} | ||
} | ||
} | ||
} | ||
|
||
return DocLevelMonitorInput(description = description, indices = indices, queries = docLevelQueries) | ||
} | ||
|
||
@JvmStatic @Throws(IOException::class) | ||
fun readFrom(sin: StreamInput): DocLevelMonitorInput { | ||
return DocLevelMonitorInput(sin) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More descriptive class names!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name is a place holder that aligns with terminology used in the design doc. Definitely open to changing it once we have a better overall name in mind; but for now, I think it's better to use terminology present in the design doc to help avoid confusion.