diff --git a/.environment/gitleaks/gitleaks-config.toml b/.environment/gitleaks/gitleaks-config.toml index 354ed292aa2..f0b31d97788 100644 --- a/.environment/gitleaks/gitleaks-config.toml +++ b/.environment/gitleaks/gitleaks-config.toml @@ -120,6 +120,7 @@ title = "PRIME ReportStream Gitleaks Configuration" ] [rules.allowlist] regexes = [ + 'api/lookuptables/list', ' \* \(The older version of this API is "/api/reports"\)', ' \* since this auth (has|uses) a ', ' by option\(\"', diff --git a/prime-router/settings/organizations.yml b/prime-router/settings/organizations.yml index 21dac5d9327..ee9d214bf76 100644 --- a/prime-router/settings/organizations.yml +++ b/prime-router/settings/organizations.yml @@ -1770,6 +1770,9 @@ - name: ignore description: FOR TESTING ONLY jurisdiction: FEDERAL + filters: + - topic: covid-19 + jurisdictionalFilter: [ "matches(ordering_facility_state, IG)" ] senders: - name: ignore-strac # Use this to test sending strac data organizationName: ignore @@ -1807,7 +1810,7 @@ organizationName: ignore topic: covid-19 customerStatus: inactive - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, CSV)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, CSV)" ] timing: operation: MERGE numberPerDay: 1440 # Every minute @@ -1827,7 +1830,7 @@ organizationName: ignore topic: covid-19 customerStatus: inactive - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, HL7)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, HL7)" ] timing: operation: MERGE numberPerDay: 1440 # Every minute @@ -1847,7 +1850,7 @@ organizationName: ignore topic: covid-19 customerStatus: inactive - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, HL7_BATCH)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, HL7_BATCH)" ] timing: operation: MERGE numberPerDay: 1440 # Every minute @@ -1866,7 +1869,7 @@ organizationName: ignore topic: covid-19 customerStatus: inactive - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, REDOX)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, REDOX)" ] timing: operation: MERGE numberPerDay: 1440 # Every minute @@ -1889,7 +1892,7 @@ organizationName: ignore topic: covid-19 customerStatus: inactive - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, AS2)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, AS2)" ] timing: operation: MERGE numberPerDay: 1440 # Every minute @@ -1901,7 +1904,7 @@ organizationName: ignore topic: covid-19 customerStatus: inactive - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, HL7_NULL)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, HL7_NULL)" ] timing: operation: MERGE numberPerDay: 1440 # Every minute @@ -1914,7 +1917,7 @@ organizationName: ignore topic: covid-19 customerStatus: inactive - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, BLOBSTORE)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, BLOBSTORE)" ] timing: operation: MERGE numberPerDay: 1440 # Every minute @@ -1931,7 +1934,7 @@ organizationName: ignore topic: covid-19 customerStatus: inactive - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, SFTP_FAIL)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, SFTP_FAIL)" ] timing: operation: MERGE numberPerDay: 1440 # Every minute @@ -1949,7 +1952,7 @@ organizationName: ignore topic: covid-19 customerStatus: inactive - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, QUALITY_PASS, removed)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, QUALITY_PASS, removed)" ] qualityFilter: - hasValidDataFor(message_id,ordering_facility_county,ordering_facility_state) - hasAtLeastOneOf(message_id,blankField) @@ -1962,7 +1965,7 @@ organizationName: ignore topic: covid-19 customerStatus: inactive - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, QUALITY_REVERSED, kept)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, QUALITY_REVERSED, kept)" ] qualityFilter: - hasValidDataFor(message_id,ordering_facility_county,ordering_facility_state) - hasAtLeastOneOf(message_id,blankField) @@ -1977,7 +1980,7 @@ organizationName: ignore topic: covid-19 customerStatus: inactive - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, QUALITY_ALL)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, QUALITY_ALL)" ] qualityFilter: [ "allowAll()" ] translation: type: CUSTOM @@ -1987,7 +1990,7 @@ organizationName: ignore topic: covid-19 customerStatus: inactive - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, QUALITY_FAIL)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, QUALITY_FAIL)" ] qualityFilter: [ "hasValidDataFor(blankField)", "hasAtLeastOneOf(message_id,blankField)" ] translation: type: CUSTOM @@ -1997,7 +2000,7 @@ organizationName: ignore topic: covid-19 customerStatus: inactive - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, OTC_PROCTORED)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, OTC_PROCTORED)" ] qualityFilter: [ "matches(test_authorized_for_otc, Y)","matches(test_authorized_for_home, Y)","matches(test_authorized_for_unproctored, Y)" ] translation: type: "CUSTOM" @@ -2008,7 +2011,7 @@ organizationName: ignore topic: covid-19 customerStatus: inactive - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, OTC_PROCTORED)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, OTC_PROCTORED)" ] qualityFilter: [ "matches(test_authorized_for_otc, N)","matches(test_authorized_for_home, Y)","matches(test_authorized_for_unproctored, Y)" ] translation: type: "CUSTOM" @@ -2019,7 +2022,7 @@ organizationName: ignore topic: covid-19 customerStatus: inactive - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, OTC_PROCTORED)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, OTC_PROCTORED)" ] qualityFilter: [ "matches(test_authorized_for_otc, N)","matches(test_authorized_for_home, UNK)","matches(test_authorized_for_unproctored, UNK)" ] translation: type: "CUSTOM" @@ -2031,7 +2034,7 @@ organizationName: ignore topic: covid-19 customerStatus: active - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, EVERY_5_MINS)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, EVERY_5_MINS)" ] timing: operation: MERGE numberPerDay: 288 # Every 5 minutes @@ -2050,7 +2053,7 @@ organizationName: ignore topic: covid-19 customerStatus: active - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, EVERY_15_MINS)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, EVERY_15_MINS)" ] timing: operation: MERGE numberPerDay: 96 # Every 15 minutes @@ -2069,7 +2072,7 @@ organizationName: ignore topic: covid-19 customerStatus: active - jurisdictionalFilter: [ "matches(ordering_facility_state, IG)", "matches(ordering_facility_county, EVERY_60_MINS)" ] + jurisdictionalFilter: [ "matches(ordering_facility_county, EVERY_60_MINS)" ] timing: operation: MERGE numberPerDay: 24 # Every 60 minutes diff --git a/prime-router/src/main/kotlin/Metadata.kt b/prime-router/src/main/kotlin/Metadata.kt index 93002548f7b..fb7ce0433ab 100644 --- a/prime-router/src/main/kotlin/Metadata.kt +++ b/prime-router/src/main/kotlin/Metadata.kt @@ -45,7 +45,7 @@ class Metadata : Logging { HashMapper(), NullMapper() ) - private var jurisdictionalFilters = listOf( + private var reportStreamFilterDefinitions = listOf( FilterByCounty(), Matches(), DoesNotMatch(), @@ -252,11 +252,11 @@ class Metadata : Logging { } /* - * JurisdictionalFilters + * ReportStreamFilterDefinitions */ - fun findJurisdictionalFilter(name: String): JurisdictionalFilter? { - return jurisdictionalFilters.find { it.name.equals(name, ignoreCase = true) } + fun findReportStreamFilterDefinitions(name: String): ReportStreamFilterDefinition? { + return reportStreamFilterDefinitions.find { it.name.equals(name, ignoreCase = true) } } /* diff --git a/prime-router/src/main/kotlin/Organization.kt b/prime-router/src/main/kotlin/Organization.kt index 53f909216cd..9c6ce8fad3a 100644 --- a/prime-router/src/main/kotlin/Organization.kt +++ b/prime-router/src/main/kotlin/Organization.kt @@ -9,6 +9,7 @@ open class Organization( val jurisdiction: Jurisdiction, val stateCode: String?, val countyName: String?, + val filters: List? = emptyList(), // one ReportStreamFilters obj per topic. ) { constructor(org: Organization) : this(org.name, org.description, org.jurisdiction, org.stateCode, org.countyName) @@ -53,9 +54,13 @@ class DeepOrganization( jurisdiction: Jurisdiction, stateCode: String? = null, countyName: String? = null, + filters: List? = emptyList(), val senders: List = emptyList(), val receivers: List = emptyList(), -) : Organization(name, description, jurisdiction, stateCode, countyName) { +) : Organization(name, description, jurisdiction, stateCode, countyName, filters) { constructor(org: Organization, senders: List, receivers: List) : - this(org.name, org.description, org.jurisdiction, org.stateCode, org.countyName, senders, receivers) + this( + org.name, org.description, org.jurisdiction, org.stateCode, org.countyName, org.filters, + senders, receivers + ) } \ No newline at end of file diff --git a/prime-router/src/main/kotlin/Receiver.kt b/prime-router/src/main/kotlin/Receiver.kt index d1d7cd52fd7..975ae7d904b 100644 --- a/prime-router/src/main/kotlin/Receiver.kt +++ b/prime-router/src/main/kotlin/Receiver.kt @@ -15,8 +15,12 @@ import java.time.ZoneId * @param topic defines the set of schemas that can translate to each other * @param customerStatus defines if the receiver is fully onboarded * @param translation configuration to translate - * @param jurisdictionalFilter defines the set of elements and regexes that filter the data for this receiver - * @param qualityFilter defines the set of elements and regexes that do qualiyty filtering on the data for this receiver + * @param jurisdictionalFilter defines the geographic region filters for this receiver + * @param qualityFilter defines the filters that remove data, based on quality criteria + * @param routingFilter The original use case was for filters that remove data the + * receiver does not want, based on who sent it. However, its available for any general purpose use. + * @param processingModeFilter defines the filters that is normally set to remove test and debug data. + * @param reverseTheQualityFilter If this is true, then do the NOT of 'qualityFilter'. Like a 'grep -v' * @param deidentify transform * @param timing defines how to delay reports to the org. If null, then send immediately * @param description of the receiver @@ -29,9 +33,10 @@ open class Receiver( val topic: String, val customerStatus: CustomerStatus = CustomerStatus.INACTIVE, val translation: TranslatorConfiguration, - val jurisdictionalFilter: List = emptyList(), - val qualityFilter: List = emptyList(), - // If this is true, then do the NOT of 'qualityFilter'. Like a 'grep -v' + val jurisdictionalFilter: ReportStreamFilter = emptyList(), + val qualityFilter: ReportStreamFilter = emptyList(), + val routingFilter: ReportStreamFilter = emptyList(), + val processingModeFilter: ReportStreamFilter = emptyList(), val reverseTheQualityFilter: Boolean = false, val deidentify: Boolean = false, val timing: Timing? = null, @@ -60,6 +65,8 @@ open class Receiver( copy.translation, copy.jurisdictionalFilter, copy.qualityFilter, + copy.routingFilter, + copy.processingModeFilter, copy.reverseTheQualityFilter, copy.deidentify, copy.timing, diff --git a/prime-router/src/main/kotlin/Report.kt b/prime-router/src/main/kotlin/Report.kt index 1227ac956a9..d0b753875ff 100644 --- a/prime-router/src/main/kotlin/Report.kt +++ b/prime-router/src/main/kotlin/Report.kt @@ -55,26 +55,39 @@ enum class Options { } /** - * QualityFilterResult records the rows filtered out by a quality filter. - * As well as the function name and arguments that did the filtering. + * ReportStreamFilterResult records useful information about rows filtered by one filter call. One filter + * might filter many rows. ReportStreamFilterResult entries are only created when filter logging is on. This is to + * prevent tons of junk logging of jurisdictionalFilters - the vast majority of which typically filter out everything. * * @property receiverName Then intended reciever for the report * @property originalCount The original number of items in the report * @property filterName The name of the filter function that removed the rows * @property filterArgs The arguments used in the filter function * @property filteredRows The row's that were removed from the report, 0 indexed + * @property filteredTrackingElements The trackingElement values of the rows removed. + * Note that we can't guarantee the Sender is sending good unique trackingElement values. */ -data class QualityFilterResult( +data class ReportStreamFilterResult( val receiverName: String, val originalCount: Int, val filterName: String, val filterArgs: List, - val filteredRows: IntArray + val filteredCount: Int, + val filteredTrackingElements: List, ) { + companion object { + // Use this value in logs and user-facing messages if the trackingElement is missing. + val DEFAULT_TRACKING_VALUE = "MissingID" + } + override fun toString(): String { - return "For $receiverName, qualityFilter $filterName, $filterArgs" + - " filtered out Rows ${filteredRows.map{ it + 1 }.joinToString(",")}" + - " reducing the Item count from $originalCount to ${originalCount - filteredRows.size}." + return "For $receiverName, filter $filterName$filterArgs" + + " reduced the item count from $originalCount to ${originalCount - filteredCount}." + + if (filteredTrackingElements.isNullOrEmpty()) { + "" + } else { + " Data with these IDs were filtered out: (${filteredTrackingElements.joinToString(",") })" + } } } @@ -131,9 +144,10 @@ class Report : Logging { val destination: Receiver? /** - * The list of results from quality filters run against the initial report data. + * The list of info about data *removed* from this report by filtering. + * The list has one entry per filter applied, *not* one entry per row removed. */ - val filteredItems: MutableList = mutableListOf() + val filteringResults: MutableList = mutableListOf() /** * The time when the report was created @@ -329,7 +343,7 @@ class Report : Logging { metadata = this.metadata ) copy.itemLineages = createOneToOneItemLineages(this, copy) - copy.filteredItems.addAll(this.filteredItems) + copy.filteringResults.addAll(this.filteringResults) return copy } @@ -379,29 +393,29 @@ class Report : Logging { } fun filter( - filterFunctions: List>>, + filterFunctions: List>>, receiver: Receiver, - isQualityFilter: Boolean, + doLogging: Boolean, + trackingElement: String?, reverseTheFilter: Boolean = false ): Report { - val filteredRows = mutableListOf() - // First, only do detailed logging on qualityFilters. - // But, **don't** do detailed logging if reverseTheFilter is true. - // This is a hack, but its because the logging is nonsensical if the filter is reversed. - // (Its nontrivial to fix the detailed logging of a reversed filter, per deMorgan's law) - val doDetailedFilterLogging = isQualityFilter && !reverseTheFilter + val filteredRows = mutableListOf() val combinedSelection = Selection.withRange(0, table.rowCount()) filterFunctions.forEach { (filterFn, fnArgs) -> - val filterFnSelection = filterFn.getSelection(fnArgs, table, receiver, doDetailedFilterLogging) - if (doDetailedFilterLogging && filterFnSelection.size() < table.rowCount()) { + val filterFnSelection = filterFn.getSelection(fnArgs, table, receiver, doLogging) + if (doLogging && filterFnSelection.size() < table.rowCount()) { val before = Selection.withRange(0, table.rowCount()) + val filteredRowList = before.andNot(filterFnSelection).toList() filteredRows.add( - QualityFilterResult( + ReportStreamFilterResult( receiver.fullName, table.rowCount(), filterFn.name, fnArgs, - before.andNot(filterFnSelection).toArray() + filteredRowList.size, + getValuesInRows( + trackingElement, filteredRowList, ReportStreamFilterResult.DEFAULT_TRACKING_VALUE + ) ) ) } @@ -418,11 +432,35 @@ class Report : Logging { fromThisReport("filter: $filterFunctions"), metadata = this.metadata ) - filteredReport.filteredItems.addAll(filteredRows) + // Write same info to our logs that goes in the json response obj + if (doLogging) + filteredRows.forEach { filterResult -> logger.info(filterResult.toString()) } + filteredReport.filteringResults.addAll(this.filteringResults) // copy ReportStreamFilterResults from prev + filteredReport.filteringResults.addAll(filteredRows) // and add any new ReportStreamFilterResults just created. filteredReport.itemLineages = createItemLineages(finalCombinedSelection, this, filteredReport) return filteredReport } + /** + * Return the values in column [columnName] for these [rows]. If a [default] is specified, + * that value is used for all cases where the value is empty/null/missing for that row in the table. + * If [default] is null, this may return fewer rows than in [rows]. + * + * @return an emptyList if the columnName is null (not an error) + * + */ + fun getValuesInRows(columnName: String?, rows: List, default: String? = null): List { + if (columnName.isNullOrEmpty()) return emptyList() + val columnIndex = this.table.columnIndex(columnName) + return rows.mapNotNull { row -> + val value = this.table.getString(row, columnIndex) + if (value.isNullOrEmpty()) + default // might be null + else + value + } + } + fun deidentify(): Report { val columns = schema.elements.map { if (it.pii == true) { diff --git a/prime-router/src/main/kotlin/ReportStreamFilter.kt b/prime-router/src/main/kotlin/ReportStreamFilter.kt new file mode 100644 index 00000000000..add969dd33a --- /dev/null +++ b/prime-router/src/main/kotlin/ReportStreamFilter.kt @@ -0,0 +1,101 @@ +package gov.cdc.prime.router + +import kotlin.reflect.KProperty1 +import kotlin.reflect.full.memberProperties + +/** + * A ReportStreamFilter is the use (call) of one or more ReportStreamFilterDefinitions. + */ +typealias ReportStreamFilter = List + +/** + * Enum of Fields in the ReportStreamFilters, below. Used to iterate thru all the filters in ReportStreamFilters + */ +enum class ReportStreamFilterType(val field: String) { + JURISDICTIONAL_FILTER("jurisdictionalFilter"), + QUALITY_FILTER("qualityFilter"), + ROUTING_FILTER("routingFilter"), + PROCESSING_MODE_FILTER("processingModeFilter"); + + // Reflection, so that we can write a single routine to handle all types of filters. + val filterProperty = ReportStreamFilters::class.memberProperties.first { it.name == this.field } + as KProperty1 + val receiverFilterProperty = Receiver::class.memberProperties.first { it.name == this.field } + as KProperty1 +} + +/** + * The set of filtering objects available per Organization or per Receiver + * (and someday, per Sender too?) + * + * Examples of FilterTypes that can be applied: + * @param jurisdictionalFilter - used to limit the data received or sent by geographical region + * @param qualityFilter - used to limit the data received or sent, by quality + * @param routingFilter - used to limit the data received or sent, by who sent it. + * @param processingModeFilter - used to limit the data received to be either "Training", "Debug", or "Production" + * We allow a different set of filters per [topic] + */ +data class ReportStreamFilters( + val topic: String, + val jurisdictionalFilter: ReportStreamFilter?, + val qualityFilter: ReportStreamFilter?, + val routingFilter: ReportStreamFilter?, + val processingModeFilter: ReportStreamFilter?, +) { + + companion object { + + // For each 'topic' that ReportStream handles, there is a set of default filters. + // Currently these are defined here in code. + // Each Organization and Receiver can override the defaults. + // todo move all of these to a GLOBAL Setting in the settings table + val defaultCovid19QualityFilter: ReportStreamFilter = listOf( + // valid basic test info + "hasValidDataFor(message_id)", + "hasValidDataFor(equipment_model_name)", + "hasValidDataFor(specimen_type)", + "hasValidDataFor(test_result)", + // valid basic human info + "hasValidDataFor(patient_last_name, patient_first_name)", + "hasValidDataFor(patient_dob)", + // has minimal valid location or other contact info (for contact tracing) + "hasAtLeastOneOf(patient_street,patient_zip_code,patient_phone_number,patient_email)", + // has valid date (for relevance/urgency) + "hasAtLeastOneOf(order_test_date,specimen_collection_date_time,test_result_date)", + // has at least one valid CLIA + "isValidCLIA(testing_lab_clia,reporting_facility_clia)", + ) + private val defaultCovid19Filters = ReportStreamFilters( + topic = "covid-19", + jurisdictionalFilter = listOf("allowNone()"), // Receiver *must* override this to get data! + qualityFilter = defaultCovid19QualityFilter, + routingFilter = listOf("allowAll()"), + processingModeFilter = listOf("doesNotMatch(processing_mode_code, T, D)"), // No Training/Debug data + ) + + private val defaultCsvFileTestFilters = ReportStreamFilters( + topic = "CsvFileTests-topic", + jurisdictionalFilter = listOf("allowAll()"), + qualityFilter = listOf("hasValidDataFor(lab,state,test_time,specimen_id,observation)"), + routingFilter = listOf("allowAll()"), + processingModeFilter = listOf("allowAll()") + ) + + private val defaultTestFilters = ReportStreamFilters( + topic = "test", + jurisdictionalFilter = null, + qualityFilter = listOf("matches(a, no)"), + routingFilter = listOf("matches(b, false)"), + processingModeFilter = listOf("allowAll()") + ) + + /** + * Map from topic-name to a list of filter-function-strings + */ + val defaultFiltersByTopic: Map = mapOf( + defaultCovid19Filters.topic to defaultCovid19Filters, + defaultCsvFileTestFilters.topic to defaultCsvFileTestFilters, + defaultTestFilters.topic to defaultTestFilters, + ) + } +} \ No newline at end of file diff --git a/prime-router/src/main/kotlin/JurisdictionalFilters.kt b/prime-router/src/main/kotlin/ReportStreamFilterDefinition.kt similarity index 61% rename from prime-router/src/main/kotlin/JurisdictionalFilters.kt rename to prime-router/src/main/kotlin/ReportStreamFilterDefinition.kt index 2585d4b99c4..0a6b191efcf 100644 --- a/prime-router/src/main/kotlin/JurisdictionalFilters.kt +++ b/prime-router/src/main/kotlin/ReportStreamFilterDefinition.kt @@ -5,26 +5,31 @@ import tech.tablesaw.api.Table import tech.tablesaw.selection.Selection /** - * A *JurisdictionalFilter* can be used in the jurisdictionalFilter property in an OrganizationService. + * This is a library or toolkit of useful filter definitions. Filters remove "rows" of data. + * (as opposed to Mappers, which manipulate columns of data) + * + * A call to a [ReportStreamFilterDefinition] can be used in the filters property in an Organization * It allowed you to create arbitrarily complex filters on data. - * Each filter in the list does a "and" boolean operation with the other filters in the list. - * Here is an example use: - * `jurisdictionalFilter: { FilterByPatientOrFacilityLoc(AZ, Pima) }` + * Each filter in the list does an "and" boolean operation with the other filters in the list. * - * The name `filterByPatientOrFacility` then maps via pseudo-reflection to an implementation of JurisdictionalFilter - * here. + * Here is an example use: + * ``` + * jurisdictionalFilter: { filterByPatientOrFacilityLoc(AZ, Pima) } + * ``` * - * If you add a implementation here, you have to add it to the list of jurisdictionalFilters in Metadata.kt. + * The name `filterByPatientOrFacility` then maps via pseudo-reflection to an implementation of a + * ReportStreamFilterDef here. * - * A JurisdictionFilter is stateless. It has a name property, which should be used in the filter definition - - * basically a simple way of implementing Reflection. + * If you add an implementation here, you have to add it to the list of reportStreamFilters in Metadata.kt. * - * Currently JurisdictionFilter implements its filtering by just re-using the very rich `Selection` functionality already in tablesaw. + * A ReportStreamFilterDef is stateless. It has a name property, which should be used in the filter definition - + * basically a simple way of implementing Reflection. Currently ReportStreamFilterDef implements its filtering + * by just re-using the very rich `Selection` functionality already in tablesaw. * * Hoping we implement some geospatial searches someday. * */ -interface JurisdictionalFilter : Logging { +interface ReportStreamFilterDefinition : Logging { /** * Name of the filter function */ @@ -40,6 +45,19 @@ interface JurisdictionalFilter : Logging { * tablesaw table. */ fun getSelection(args: List, table: Table, receiver: Receiver, doAuditing: Boolean = true): Selection + + companion object : Logging { + /** + * filterFunction must be of form "funName(arg1, arg2, etc)" + */ + fun parseReportStreamFilter(filterFunction: String): Pair> { + // Using a permissive match in the (arg1, arg2) section, to allow most regexs to be passed as args. + // Somehow this works with "(?i).*Pima.*", I guess because the \\x29 matches rightmost ')' char + val match = Regex("([a-zA-Z0-9]+)\\x28(.*)\\x29").find(filterFunction) + ?: error("ReportStreamFilter field $filterFunction does not parse") + return match.groupValues[1] to match.groupValues[2].split(',').map { it.trim() } + } + } } /** @@ -47,7 +65,7 @@ interface JurisdictionalFilter : Logging { * If the column name does not exist, nothing passes thru the filter. * matches(columnName, regex, regex, regex) */ -class Matches : JurisdictionalFilter { +class Matches : ReportStreamFilterDefinition { override val name = "matches" override fun getSelection( @@ -82,7 +100,7 @@ class Matches : JurisdictionalFilter { * * A row of data is "allowed" if it does not match any of the values, or if the column does not exist */ -class DoesNotMatch : JurisdictionalFilter { +class DoesNotMatch : ReportStreamFilterDefinition { override val name = "doesNotMatch" override fun getSelection( @@ -107,14 +125,6 @@ class DoesNotMatch : JurisdictionalFilter { } else { Selection.withRange(0, table.rowCount()) } - if (selection.size() < table.rowCount()) { - JurisdictionalFilters.logFiltering( - Selection.withRange(0, table.rowCount()), selection, - "$name(${args.joinToString(",")})", - receiver, - doAuditing - ) - } return selection } } @@ -122,7 +132,7 @@ class DoesNotMatch : JurisdictionalFilter { /** * This may or may not be a unicorn. */ -class FilterByCounty : JurisdictionalFilter { +class FilterByCounty : ReportStreamFilterDefinition { override val name = "filterByCounty" override fun getSelection( @@ -174,7 +184,7 @@ class FilterByCounty : JurisdictionalFilter { * Example: * jurisdictionalFilter: orEquals(ordering_facility_state, PA, patient_state, PA) */ -class OrEquals : JurisdictionalFilter { +class OrEquals : ReportStreamFilterDefinition { override val name = "orEquals" override fun getSelection( @@ -205,9 +215,9 @@ class OrEquals : JurisdictionalFilter { } /** - * A filter that filter nothing -- allows all data through + * A filter that filter nothing -- allows all data through. Useful for overriding more strict defaults. */ -class AllowAll : JurisdictionalFilter { +class AllowAll : ReportStreamFilterDefinition { override val name = "allowAll" override fun getSelection( @@ -226,13 +236,34 @@ class AllowAll : JurisdictionalFilter { } } +/** + * A filter that filter everything -- allows no data through. Useful as a default to be overridden. + */ +class AllowNone : ReportStreamFilterDefinition { + override val name = "allowNone" + + override fun getSelection( + args: List, + table: Table, + receiver: Receiver, + doAuditing: Boolean + ): Selection { + // See note on allowAll above on regex weirdness. + if (args.size > 1) error( + "For rcvr ${receiver.fullName} Expecting no args for filter $name." + + " Got ${args.joinToString(",")}" + ) + return Selection.withRange(0, 0) + } +} + /** * Implements a quality check match. If a row has valid data for all the columns, the row is selected. * If any column name does not exist, nothing passes thru the filter. * hasValidDataFor(columnName1, columnName2, columnName3, ...) * If no columns are passed, all rows are selected. So, any number of args is acceptable. */ -class HasValidDataFor : JurisdictionalFilter { +class HasValidDataFor : ReportStreamFilterDefinition { override val name = "hasValidDataFor" override fun getSelection( @@ -248,15 +279,7 @@ class HasValidDataFor : JurisdictionalFilter { if (columnNames.contains(colName)) { val before = Selection.with(*selection.toArray()) // hack way to copy to a new Selection obj selection = selection.andNot(table.stringColumn(colName).isEmptyString) - - JurisdictionalFilters.logFiltering(before, selection, "$name($colName)", receiver, doAuditing) } else { - JurisdictionalFilters.logAllEliminated( - table.rowCount(), - "$name($colName): column not found", - receiver, - doAuditing - ) return Selection.withRange(0, 0) } } @@ -275,7 +298,7 @@ class HasValidDataFor : JurisdictionalFilter { * find no official documentation confirming that, so that is not enforced) * */ -class IsValidCLIA : JurisdictionalFilter { +class IsValidCLIA : ReportStreamFilterDefinition { override val name = "isValidCLIA" override fun getSelection( @@ -287,28 +310,11 @@ class IsValidCLIA : JurisdictionalFilter { if (args.isEmpty()) error("Expecting at least one arg for filter $name. Got none.") var selection = Selection.withRange(0, 0) val columnNames = table.columnNames() - var atLeastOneColumnFound = false args.forEach { colName -> if (columnNames.contains(colName)) { selection = selection.or( table.stringColumn(colName).lengthEquals(10).and(table.stringColumn(colName).isAlphaNumeric) ) - atLeastOneColumnFound = true - } - } - if (!atLeastOneColumnFound) { - JurisdictionalFilters.logAllEliminated( - table.rowCount(), - "$name(${args.joinToString(",")}): none of these columns found.", - receiver, - doAuditing - ) - } else { - if (selection.size() < table.rowCount()) { - JurisdictionalFilters.logFiltering( - Selection.withRange(0, table.rowCount()), selection, - "$name(${args.joinToString(",")})", receiver, doAuditing - ) } } return selection @@ -319,7 +325,7 @@ class IsValidCLIA : JurisdictionalFilter { * hasAtLeastOneOf(columnName1, columnName2, columnName3, ...) * Implements a quality check match. If a row has valid data for any of the columns, the row is selected. */ -class HasAtLeastOneOf : JurisdictionalFilter { +class HasAtLeastOneOf : ReportStreamFilterDefinition { override val name = "hasAtLeastOneOf" override fun getSelection( @@ -331,104 +337,11 @@ class HasAtLeastOneOf : JurisdictionalFilter { if (args.isEmpty()) error("Expecting at least one arg for filter $name. Got none.") var selection = Selection.withRange(0, 0) val columnNames = table.columnNames() - var atLeastOneColumnFound = false args.forEach { colName -> if (columnNames.contains(colName)) { selection = selection.or(table.stringColumn(colName).isNotMissing) - atLeastOneColumnFound = true - } - } - if (!atLeastOneColumnFound) { - JurisdictionalFilters.logAllEliminated( - table.rowCount(), - "$name(${args.joinToString(",")}): none of these columns found.", receiver, doAuditing - ) - } else { - if (selection.size() < table.rowCount()) { - JurisdictionalFilters.logFiltering( - Selection.withRange(0, table.rowCount()), selection, - "$name(${args.joinToString(",")})", receiver, doAuditing - ) } } return selection } -} - -object JurisdictionalFilters : Logging { - // covid-19 default quality check consists of these filters - // todo move this to a GLOBAL Setting in the settings table - val defaultCovid19QualityCheck = listOf( - // valid human and valid test - "hasValidDataFor(" + - "message_id," + - "equipment_model_name," + - "specimen_type," + - "test_result," + - "patient_last_name," + - "patient_first_name," + - "patient_dob" + - ")", - // has minimal valid location or other contact info (for contact tracing) - "hasAtLeastOneOf(patient_street,patient_zip_code,patient_phone_number,patient_email)", - // has valid date (for relevance/urgency) - "hasAtLeastOneOf(order_test_date,specimen_collection_date_time,test_result_date)", - // has at least one valid CLIA - "isValidCLIA(testing_lab_clia,reporting_facility_clia)", - // never send T (Training/Test) or D (Debug) data to the states. - "doesNotMatch(processing_mode_code,T,D)", - ) - - /** - * Map from topic-name to a list of filter-function-strings - */ - val defaultQualityFilters: Map> = mapOf( - "covid-19" to defaultCovid19QualityCheck, - "CsvFileTests-topic" to listOf("hasValidDataFor(lab,state,test_time,specimen_id,observation)"), - ) - - /** - * filterFunction must be of form "funName(arg1, arg2, etc)" - */ - fun parseJurisdictionalFilter(filterFunction: String): Pair> { -// REMOVE val match = Regex("([a-zA-Z0-9]+)\\x28([a-z, \\x2E_\\x2DA-Z0-9]*)\\x29").find(filterFunction) - // Using a permissive match in the (arg1, arg2) section, to allow most regexs to be passed as args. - // Somehow this works with "(?i).*Pima.*", I guess because the \\x29 matches rightmost ')' char - val match = Regex("([a-zA-Z0-9]+)\\x28(.*)\\x29").find(filterFunction) - ?: error("JurisdictionalFilter field $filterFunction does not parse") - return match.groupValues[1] to match.groupValues[2].split(',').map { it.trim() } - } - - fun logAllEliminated(beforeSize: Int, filterDescription: String, receiver: Receiver, doAuditing: Boolean) { - if (!doAuditing) return - logger.warn( - "For ${receiver.fullName}, qualityFilter $filterDescription" + - " reduced the Items from $beforeSize to 0. All rows eliminated" - ) - } - - fun logFiltering( - before: Selection, - after: Selection, - filterDescription: String, - receiver: Receiver, - doAuditing: Boolean - ) { - if (!doAuditing) return - - if (after.size() < before.size()) { - if (after.size() == 0) { - logAllEliminated(before.size(), filterDescription, receiver, true) - } else { - // Note: the expression 'before.andNot(after)' actually changes the 'before' obj! - val beforeSize = before.size() - val eliminatedRows = before.andNot(after) - logger.warn( - "For ${receiver.fullName}, qualityFilter $filterDescription" + - " reduced the Item count from $beforeSize to ${after.size()}. " + - "Row numbers eliminated: ${eliminatedRows.joinToString(",")}" - ) - } - } - } } \ No newline at end of file diff --git a/prime-router/src/main/kotlin/Translator.kt b/prime-router/src/main/kotlin/Translator.kt index b6cd069271d..66bb627a4b4 100644 --- a/prime-router/src/main/kotlin/Translator.kt +++ b/prime-router/src/main/kotlin/Translator.kt @@ -38,10 +38,13 @@ class Translator(private val metadata: Metadata, private val settings: SettingsP (limitReceiversTo.isEmpty() || limitReceiversTo.contains(receiver.fullName)) }.mapNotNull { receiver -> try { - val jurisFilteredReport = filterByJurisdiction(input, receiver) - if (jurisFilteredReport.isEmpty()) return@mapNotNull null - val mappedReport = translateByReceiver(jurisFilteredReport, receiver, defaultValues) - Pair(mappedReport, receiver) + // Filter the report + val filteredReport = filterByAllFilterTypes(settings, input, receiver) ?: return@mapNotNull null + if (filteredReport.isEmpty()) return@mapNotNull Pair(filteredReport, receiver) + + // Translate the filteredReport + val translatedReport = translateByReceiver(filteredReport, receiver, defaultValues) + Pair(translatedReport, receiver) } catch (e: IllegalStateException) { // catching individual translation exceptions enables overall work to continue warnings?.let { @@ -59,70 +62,169 @@ class Translator(private val metadata: Metadata, private val settings: SettingsP } /** - * Determine if a report should be sent to the reciever based on that receiver's - * jurisdiction filter. + * Determine which data in [input] should be sent to the [receiver] based on that receiver's set of filters. + * Apply all the different filter types (at this writing, jurisdictionalFilter, qualityFilter, routingFilter). + * + * @return the filtered Report. Returns null if jurisdictionalFilter had no matches, which is quite common, + * since most geographic locations don't match, and we don't need to log this. Returns empty report if any + * of the later filters removed everything, but even when empty, this report has useful [ReportStreamFilterResult] + * to be logged. */ - private fun filterByJurisdiction(input: Report, receiver: Receiver): Report { - // Filter according to this receiver's desired JurisdictionalFilter patterns - val jurisFilterAndArgs = receiver.jurisdictionalFilter.map { filterSpec -> - val (fnName, fnArgs) = JurisdictionalFilters.parseJurisdictionalFilter(filterSpec) - val filterFn = metadata.findJurisdictionalFilter(fnName) - ?: error("JurisdictionalFilter $fnName is not found") - Pair(filterFn, fnArgs) + fun filterByAllFilterTypes(settings: SettingsProvider, input: Report, receiver: Receiver): Report? { + val organization = settings.findOrganization(receiver.organizationName) + ?: error("No org for ${receiver.fullName}") + + // This has to be the trackingElement of the incoming data, not the outgoing receiver. + var trackingElement = input.schema.trackingElement // might be null + if (!trackingElement.isNullOrBlank() && !input.schema.containsElement(trackingElement)) { + // I've seen cases where the trackingElement is not in the schema(!!) (see az/az-covid-19-csv) + // Nulling this out to avoid exceptions later. + trackingElement = null } - val jurisFilteredReport = input.filter(jurisFilterAndArgs, receiver, isQualityFilter = false) - return jurisFilteredReport + // Do jurisdictionalFiltering on the input + val jurisFilteredReport = filterByOneFilterType( + input, + receiver, + organization, + ReportStreamFilterType.JURISDICTIONAL_FILTER, + trackingElement, + doLogging = false, + ) + // vast majority of receivers will return here, which speeds subsequent filters. + // ok to just return null, we don't need any info about what was eliminated. + if (jurisFilteredReport.isEmpty()) return null + + // Do qualityFiltering on the jurisFilteredReport + val qualityFilteredReport = filterByOneFilterType( + jurisFilteredReport, + receiver, + organization, + ReportStreamFilterType.QUALITY_FILTER, + trackingElement, + doLogging = !receiver.reverseTheQualityFilter + ) + if (qualityFilteredReport.isEmpty()) return qualityFilteredReport + + // Do routingFiltering on the qualityFilteredReport + val routingFilteredReport = filterByOneFilterType( + qualityFilteredReport, + receiver, + organization, + ReportStreamFilterType.ROUTING_FILTER, + trackingElement, + doLogging = true // quality and routing info will go together into the report's filteredItems + ) + if (routingFilteredReport.isEmpty()) return routingFilteredReport + + // Do processingModeFiltering on the routingFilteredReport + val processingModeFilteredReport = filterByOneFilterType( + routingFilteredReport, + receiver, + organization, + ReportStreamFilterType.PROCESSING_MODE_FILTER, + trackingElement, + doLogging = true + ) + if (processingModeFilteredReport.isEmpty()) return processingModeFilteredReport + + return processingModeFilteredReport } /** - * Filter a [input] report for a [receiver] by that receiver's qualityFilter and - * then translate the filtered report based on the receiver's schema. + * Apply a set of ReportStreamFilters associated with a [filterType] to report [input]. eg, Apply one of: + * jurisdictionalFilter, qualityFilter, and routingFilter. + * + * Filter usages can be defined at three different levels: default, organization-level, and receiver-level. + * The [receiver] has only one topic (eg, 'covid-19'), but its [organization] can handle many topics, so + * we must look up default- and organization-level filters per topic. + * Any/all of the three levels are allowed to be null. If all are null, we do no filtering for this filterType. + * + * @return the filtered report. Might be empty. Might be unchanged if no filtering was done. */ - public fun translateByReceiver( + fun filterByOneFilterType( input: Report, receiver: Receiver, - defaultValues: DefaultValues = emptyMap() + organization: Organization, + filterType: ReportStreamFilterType, + trackingElement: String?, + doLogging: Boolean, ): Report { - // Now filter according to this receiver's desired qualityFilter, or default filter if none found. - val qualityFilter = when { - receiver.qualityFilter.isNotEmpty() -> receiver.qualityFilter - JurisdictionalFilters.defaultQualityFilters[receiver.topic] != null -> - JurisdictionalFilters.defaultQualityFilters[receiver.topic]!! + // First, retrieve the default filter for this topic and filterType + val defaultFilters = ReportStreamFilters.defaultFiltersByTopic[receiver.topic] + val defaultFilter = if (defaultFilters != null) + filterType.filterProperty.get(defaultFilters) + else + null + + // Next, retrieve the organization-level filter for this topic and filterType + val orgFilters = organization.filters?.find { it.topic == receiver.topic } + var orgFilter = if (orgFilters != null) + filterType.filterProperty.get(orgFilters) + else + null + if (orgFilter.isNullOrEmpty()) orgFilter = null // force null to avoid empty strings + + // Last, retrieve the receiver-level filter for this filter type. + var receiverFilter: ReportStreamFilter? = filterType.receiverFilterProperty.get(receiver) as ReportStreamFilter + if (receiverFilter.isNullOrEmpty()) receiverFilter = null // force null to be consistent with org and default. + + // Use the "and" of the org filter and receiver filter if either or both exists - and override the default. + // Otherwise use the default. + val filterToApply: ReportStreamFilter = when { + (orgFilter != null && receiverFilter != null) -> orgFilter + receiverFilter + (orgFilter == null && receiverFilter != null) -> receiverFilter + (orgFilter != null && receiverFilter == null) -> orgFilter + (defaultFilter != null) -> defaultFilter else -> { - logger.info("No default qualityFilter found for topic ${receiver.topic}. Not doing qual filtering") - emptyList() + // Probably an error if there's no defaultFilter + logger.error("NOT ${filterType.name} filtering for topic ${receiver.topic}. No filters found.") + emptyList() } } - val qualityFilterAndArgs = qualityFilter.map { filterSpec -> - val (fnName, fnArgs) = JurisdictionalFilters.parseJurisdictionalFilter(filterSpec) - val filterFn = metadata.findJurisdictionalFilter(fnName) - ?: error("qualityFilter $fnName is not found in list of JurisdictionalFilters") + + // This weird obj is of type List>> + val filterAndArgs = filterToApply.map { filterSpec -> + val (fnName, fnArgs) = ReportStreamFilterDefinition.parseReportStreamFilter(filterSpec) + val filterFn = metadata.findReportStreamFilterDefinitions(fnName) + ?: error("Cannot find ReportStreamFilter Definition for $fnName") Pair(filterFn, fnArgs) } - val qualityFilteredReport = input.filter( - qualityFilterAndArgs, + + val filteredReport = input.filter( + filterAndArgs, receiver, - isQualityFilter = true, - receiver.reverseTheQualityFilter + doLogging, + trackingElement, + // the reverseTheQualityFilter flag only applies for qualityFilters + if (filterType == ReportStreamFilterType.QUALITY_FILTER) receiver.reverseTheQualityFilter else false ) - if (qualityFilteredReport.itemCount != input.itemCount) { + if (doLogging && filteredReport.itemCount != input.itemCount) { logger.warn( - "Data quality problem in report ${input.id}, receiver ${receiver.fullName}: " + - "There were ${input.itemCount} rows prior to qualityFilter, and " + - "${qualityFilteredReport.itemCount} rows after qualityFilter." + "Filtering occurred in report ${input.id}, receiver ${receiver.fullName}: " + + "There were ${input.itemCount} rows prior to ${filterType.name}, and " + + "${filteredReport.itemCount} rows after ${filterType.name}." ) } + return filteredReport + } - if (qualityFilteredReport.isEmpty()) return qualityFilteredReport - + /** + * Filter a [input] report for a [receiver] by that receiver's qualityFilter and + * then translate the filtered report based on the receiver's schema. + */ + public fun translateByReceiver( + input: Report, + receiver: Receiver, + defaultValues: DefaultValues = emptyMap() + ): Report { // Apply mapping to change schema - val toReport: Report = if (receiver.schemaName != qualityFilteredReport.schema.name) { + val toReport: Report = if (receiver.schemaName != input.schema.name) { val toSchema = metadata.findSchema(receiver.schemaName) ?: error("${receiver.schemaName} schema is missing from catalog") val receiverDefaults = receiver.translation.defaults val defaults = if (receiverDefaults.isNotEmpty()) receiverDefaults.plus(defaultValues) else defaultValues - val mapping = buildMapping(toSchema, qualityFilteredReport.schema, defaults) + val mapping = buildMapping(toSchema, input.schema, defaults) if (mapping.missing.isNotEmpty()) { error( "Error: To translate to ${receiver.fullName}, ${toSchema.name}, these elements are missing: ${ @@ -132,9 +234,9 @@ class Translator(private val metadata: Metadata, private val settings: SettingsP }" ) } - qualityFilteredReport.applyMapping(mapping) + input.applyMapping(mapping) } else { - qualityFilteredReport + input } // Transform reports @@ -142,12 +244,13 @@ class Translator(private val metadata: Metadata, private val settings: SettingsP if (receiver.deidentify) transformed = transformed.deidentify() var copy = transformed.copy(destination = receiver, bodyFormat = receiver.format) - copy.filteredItems.addAll(qualityFilteredReport.filteredItems) + copy.filteringResults.addAll(input.filteringResults) return copy } /** * Translate one report to another schema. Translate all items. + * @todo get rid of this - not used? */ fun translate(input: Report, toSchema: Schema, defaultValues: DefaultValues = emptyMap()): Report { val mapping = buildMapping(toSchema = toSchema, fromSchema = input.schema, defaultValues) diff --git a/prime-router/src/main/kotlin/azure/ActionHistory.kt b/prime-router/src/main/kotlin/azure/ActionHistory.kt index 37e271c83c8..fea9d106380 100644 --- a/prime-router/src/main/kotlin/azure/ActionHistory.kt +++ b/prime-router/src/main/kotlin/azure/ActionHistory.kt @@ -8,10 +8,10 @@ import com.microsoft.azure.functions.HttpResponseMessage import gov.cdc.prime.router.ClientSource import gov.cdc.prime.router.Options import gov.cdc.prime.router.Organization -import gov.cdc.prime.router.QualityFilterResult import gov.cdc.prime.router.Receiver import gov.cdc.prime.router.Report import gov.cdc.prime.router.ReportId +import gov.cdc.prime.router.ReportStreamFilterResult import gov.cdc.prime.router.ResultDetail import gov.cdc.prime.router.Sender import gov.cdc.prime.router.SettingsProvider @@ -100,7 +100,7 @@ class ActionHistory { /** * List of rows per report that have been filtered out based on quality. */ - val filteredReportRows = mutableMapOf>() + val filteredReportRows = mutableMapOf>() /** * Messages to be queued in an azure queue as part of the result of this action. @@ -339,7 +339,7 @@ class ActionHistory { reportFile.schemaTopic = report.schema.topic reportFile.itemCount = report.itemCount filteredOutReports[reportFile.reportId] = reportFile - filteredReportRows[reportFile.reportId] = report.filteredItems + filteredReportRows[reportFile.reportId] = report.filteringResults } /** @@ -369,7 +369,7 @@ class ActionHistory { reportFile.blobDigest = blobInfo.digest reportFile.itemCount = report.itemCount reportsOut[reportFile.reportId] = reportFile - filteredReportRows[reportFile.reportId] = report.filteredItems + filteredReportRows[reportFile.reportId] = report.filteringResults trackItemLineages(report) trackEvent(event) // to be sent to queue later. } @@ -394,7 +394,7 @@ class ActionHistory { reportFile.blobDigest = blobInfo.digest reportFile.itemCount = report.itemCount reportsOut[reportFile.reportId] = reportFile - filteredReportRows[reportFile.reportId] = report.filteredItems + filteredReportRows[reportFile.reportId] = report.filteringResults trackItemLineages(report) trackEvent(event) // to be sent to queue later. } diff --git a/prime-router/src/main/kotlin/azure/SettingsFacade.kt b/prime-router/src/main/kotlin/azure/SettingsFacade.kt index 48c31d6bede..c84114693d1 100644 --- a/prime-router/src/main/kotlin/azure/SettingsFacade.kt +++ b/prime-router/src/main/kotlin/azure/SettingsFacade.kt @@ -9,6 +9,8 @@ import gov.cdc.prime.router.CustomerStatus import gov.cdc.prime.router.Metadata import gov.cdc.prime.router.Organization import gov.cdc.prime.router.Receiver +import gov.cdc.prime.router.ReportStreamFilter +import gov.cdc.prime.router.ReportStreamFilters import gov.cdc.prime.router.Sender import gov.cdc.prime.router.SettingsProvider import gov.cdc.prime.router.TranslatorConfiguration @@ -298,8 +300,9 @@ class OrganizationAPI jurisdiction: Jurisdiction, stateCode: String?, countyName: String?, + filters: List?, override var meta: SettingMetadata?, -) : Organization(name, description, jurisdiction, stateCode, countyName), SettingAPI { +) : Organization(name, description, jurisdiction, stateCode, countyName, filters), SettingAPI { @get:JsonIgnore override val organizationName: String? = null override fun consistencyErrorMessage(metadata: Metadata): String? { return this.consistencyErrorMessage() } @@ -331,8 +334,10 @@ class ReceiverAPI topic: String, customerStatus: CustomerStatus = CustomerStatus.INACTIVE, translation: TranslatorConfiguration, - jurisdictionalFilter: List = emptyList(), - qualityFilter: List = emptyList(), + jurisdictionalFilter: ReportStreamFilter = emptyList(), + qualityFilter: ReportStreamFilter = emptyList(), + routingFilter: ReportStreamFilter = emptyList(), + processingModeFilter: ReportStreamFilter = emptyList(), reverseTheQualityFilter: Boolean = false, deidentify: Boolean = false, timing: Timing? = null, @@ -347,6 +352,8 @@ class ReceiverAPI translation, jurisdictionalFilter, qualityFilter, + routingFilter, + processingModeFilter, reverseTheQualityFilter, deidentify, timing, diff --git a/prime-router/src/main/kotlin/azure/WorkflowEngine.kt b/prime-router/src/main/kotlin/azure/WorkflowEngine.kt index 9c3da5173ed..40d9fe0050a 100644 --- a/prime-router/src/main/kotlin/azure/WorkflowEngine.kt +++ b/prime-router/src/main/kotlin/azure/WorkflowEngine.kt @@ -420,7 +420,7 @@ class WorkflowEngine( ).partition { (report, _) -> report.isEmpty() } emptyReports.forEach { (report, receiver) -> - if (!report.filteredItems.isEmpty()) { + if (!report.filteringResults.isEmpty()) { actionHistory.trackFilteredReport(report, receiver) } } @@ -466,7 +466,7 @@ class WorkflowEngine( loggerMsg = "Queue: ${event.toQueueMessage()}" } receiver.format.isSingleItemFormat -> { - report.filteredItems.forEach { + report.filteringResults.forEach { val emptyReport = Report( report.schema, emptyList(), @@ -475,7 +475,7 @@ class WorkflowEngine( bodyFormat = report.bodyFormat, metadata = Metadata.getInstance() ) - emptyReport.filteredItems.add(it) + emptyReport.filteringResults.add(it) actionHistory.trackFilteredReport(emptyReport, receiver) } diff --git a/prime-router/src/test/kotlin/JurisdictionalFilterTests.kt b/prime-router/src/test/kotlin/ReportStreamFilterDefinitionTests.kt similarity index 93% rename from prime-router/src/test/kotlin/JurisdictionalFilterTests.kt rename to prime-router/src/test/kotlin/ReportStreamFilterDefinitionTests.kt index 36da0607055..92501ea88a0 100644 --- a/prime-router/src/test/kotlin/JurisdictionalFilterTests.kt +++ b/prime-router/src/test/kotlin/ReportStreamFilterDefinitionTests.kt @@ -11,7 +11,7 @@ import tech.tablesaw.api.StringColumn import tech.tablesaw.api.Table import kotlin.test.Test -class JurisdictionalFilterTests { +class ReportStreamFilterDefinitionTests { private val rcvr = Receiver("name", "org", "topic", CustomerStatus.INACTIVE, "schema", Report.Format.CSV) @@ -354,6 +354,36 @@ class JurisdictionalFilterTests { assertThat(filteredTable).hasRowCount(4) } + @Test + fun `test allowNone`() { + val filter = AllowNone() + val table = Table.create( + StringColumn.create("colA", listOf("A1", "A2", "", "A4")), + StringColumn.create("colB", listOf("B1", "B2", "B3", "B4")), + StringColumn.create("colC", listOf("C1", "C2", "C3", null)) + ) + + // AllowNone takes no args, so passing args is an exception + val colName = listOf("colA", "colB") + assertThat { filter.getSelection(colName, table, rcvr) }.isFailure() + + val emptyArgs = listOf() + val selection = filter.getSelection(emptyArgs, table, rcvr) + val filteredTable = table.where(selection) + // And Then There Were None + assertThat(filteredTable).hasRowCount(0) + + val emptyChairsAndEmptyTables = Table.create( + StringColumn.create("colA"), + StringColumn.create("colB"), + StringColumn.create("colC") + ) + val selection2 = filter.getSelection(emptyArgs, emptyChairsAndEmptyTables, rcvr) + val filteredTable2 = table.where(selection2) + // Nothing will come of nothing + assertThat(filteredTable2).hasRowCount(0) + } + @Test fun `test IsValidCLIA`() { val filter = IsValidCLIA() diff --git a/prime-router/src/test/kotlin/ReportTests.kt b/prime-router/src/test/kotlin/ReportTests.kt index e98786b71a1..028b7a81b0f 100644 --- a/prime-router/src/test/kotlin/ReportTests.kt +++ b/prime-router/src/test/kotlin/ReportTests.kt @@ -38,10 +38,12 @@ class ReportTests { fun `test filter`() { val one = Schema(name = "one", topic = "test", elements = listOf(Element("a"), Element("b"))) val metadata = Metadata(schema = one) - val jurisdictionalFilter = metadata.findJurisdictionalFilter("matches") ?: fail("cannot find filter") + val jurisdictionalFilter = metadata.findReportStreamFilterDefinitions("matches") ?: fail("cannot find filter") val report1 = Report(one, listOf(listOf("1", "2"), listOf("3", "4")), source = TestSource, metadata = metadata) assertThat(report1.itemCount).isEqualTo(2) - val filteredReport = report1.filter(listOf(Pair(jurisdictionalFilter, listOf("a", "1"))), rcvr, false) + val filteredReport = report1.filter( + listOf(Pair(jurisdictionalFilter, listOf("a", "1"))), rcvr, false, one.trackingElement, + ) assertThat(filteredReport.schema).isEqualTo(one) assertThat(filteredReport.itemCount).isEqualTo(1) assertThat(filteredReport.getString(0, "b")).isEqualTo("2") @@ -52,7 +54,7 @@ class ReportTests { fun `test multiarg matches filter`() { val one = Schema(name = "one", topic = "test", elements = listOf(Element("a"), Element("b"))) val metadata = Metadata(schema = one) - val jurisdictionalFilter = metadata.findJurisdictionalFilter("matches") ?: fail("cannot find filter") + val jurisdictionalFilter = metadata.findReportStreamFilterDefinitions("matches") ?: fail("cannot find filter") // each sublist is a row. val report1 = Report( one, listOf(listOf("row1_a", "row1_b"), listOf("row2_a", "row2_b")), source = TestSource, @@ -60,27 +62,29 @@ class ReportTests { ) assertThat(2).isEqualTo(report1.itemCount) val filteredReportA = report1.filter( - listOf(Pair(jurisdictionalFilter, listOf("a", "row1.*", "row2_a"))), rcvr, false + listOf(Pair(jurisdictionalFilter, listOf("a", "row1.*", "row2_a"))), rcvr, false, one.trackingElement ) assertThat(filteredReportA.itemCount).isEqualTo(2) assertThat(filteredReportA.getString(0, "b")).isEqualTo("row1_b") assertThat(filteredReportA.getString(1, "b")).isEqualTo("row2_b") val filteredReportB = report1.filter( - listOf(Pair(jurisdictionalFilter, listOf("a", "row.*"))), rcvr, false + listOf(Pair(jurisdictionalFilter, listOf("a", "row.*"))), rcvr, false, one.trackingElement ) assertThat(filteredReportA.itemCount).isEqualTo(2) assertThat(filteredReportB.getString(0, "b")).isEqualTo("row1_b") assertThat(filteredReportB.getString(1, "b")).isEqualTo("row2_b") val filteredReportC = report1.filter( - listOf(Pair(jurisdictionalFilter, listOf("a", "row1_a", "foo", "bar", "baz"))), rcvr, false + listOf(Pair(jurisdictionalFilter, listOf("a", "row1_a", "foo", "bar", "baz"))), + rcvr, false, one.trackingElement ) assertThat(filteredReportC.itemCount).isEqualTo(1) assertThat(filteredReportC.getString(0, "b")).isEqualTo("row1_b") val filteredReportD = report1.filter( - listOf(Pair(jurisdictionalFilter, listOf("a", "argle", "bargle"))), rcvr, false + listOf(Pair(jurisdictionalFilter, listOf("a", "argle", "bargle"))), + rcvr, false, one.trackingElement ) assertThat(filteredReportD.itemCount).isEqualTo(0) } @@ -318,14 +322,16 @@ class ReportTests { fun `test item lineage after jurisdictional filter`() { val schema = Schema(name = "one", topic = "test", elements = listOf(Element("a")), trackingElement = "a") val metadata = Metadata(schema = schema) - val jurisdictionalFilter = metadata.findJurisdictionalFilter("matches") ?: fail("cannot find filter") + val jurisdictionalFilter = metadata.findReportStreamFilterDefinitions("matches") ?: fail("cannot find filter") // each sublist is a row. val report1 = Report( schema, listOf(listOf("rep1_row1_a"), listOf("rep1_row2_a")), source = TestSource, metadata = metadata ) - val filteredReport = report1.filter(listOf(Pair(jurisdictionalFilter, listOf("a", "rep1_row2_a"))), rcvr, false) + val filteredReport = report1.filter( + listOf(Pair(jurisdictionalFilter, listOf("a", "rep1_row2_a"))), rcvr, false, schema.trackingElement + ) val lineage = filteredReport.itemLineages!! assertThat(lineage.size).isEqualTo(1) @@ -380,7 +386,7 @@ class ReportTests { metadata = metadata ) val metadata = Metadata(schema = schema) - val jurisdictionalFilter = metadata.findJurisdictionalFilter("matches") ?: fail("cannot find filter") + val jurisdictionalFilter = metadata.findReportStreamFilterDefinitions("matches") ?: fail("cannot find filter") // split, merge, split, merge, copy, copy, then filter. val reports1 = report1.split() @@ -389,7 +395,9 @@ class ReportTests { val merge2 = Report.merge(reports2) val copy1 = merge2.copy() val copy2 = copy1.copy() - val filteredReport = copy2.filter(listOf(Pair(jurisdictionalFilter, listOf("a", "aaa"))), rcvr, false) + val filteredReport = copy2.filter( + listOf(Pair(jurisdictionalFilter, listOf("a", "aaa"))), rcvr, false, schema.trackingElement + ) val lineage = filteredReport.itemLineages!! assertThat(lineage.size).isEqualTo(2) diff --git a/prime-router/src/test/kotlin/TranslatorTests.kt b/prime-router/src/test/kotlin/TranslatorTests.kt index 87423824f82..145ee847359 100644 --- a/prime-router/src/test/kotlin/TranslatorTests.kt +++ b/prime-router/src/test/kotlin/TranslatorTests.kt @@ -2,6 +2,7 @@ package gov.cdc.prime.router import assertk.assertThat import assertk.assertions.isEqualTo +import assertk.assertions.isNotNull import assertk.assertions.isTrue import gov.cdc.prime.router.unittest.UnitTestUtils import java.io.ByteArrayInputStream @@ -16,6 +17,13 @@ class TranslatorTests { description: Arizona PHD jurisdiction: STATE stateCode: AZ + filters: + # override the default set for any of the filters, to get a clean test + - topic: test + jurisdictionalFilter: [ "allowAll()" ] + qualityFilter: [ "allowAll()" ] + routingFilter: [ "allowAll()" ] + processingModeFilter: [ "allowAll()" ] receivers: - name: elr organizationName: phd1 @@ -28,8 +36,289 @@ class TranslatorTests { format: CSV """.trimIndent() + /** + * This covers several test cases: + * jurisdictionalFilter : default is missing, but both org and receiver level filters are applied. + * qualityFilter: has a default, and only org level filter is applied (no receiver level filtering) + * routingFilter: has a default, and only receiver level filter is applied (no org level filtering) + */ + private val filterTestYaml = """ + --- + - name: phd + description: Piled Higher and Deeper + jurisdiction: STATE + filters: + - topic: test + jurisdictionalFilter: [ "matches(b,true)" ] + qualityFilter: [ "matches(b,true)" ] + # Missing routingFilter + stateCode: IG + receivers: + - name: elr + organizationName: phd + topic: test + customerStatus: active + jurisdictionalFilter: [ "matches(a,yes)"] + # Missing qualityFilter + routingFilter: [ "matches(a,yes)"] + translation: + type: CUSTOM + schemaName: two + format: CSV + """.trimIndent() + + /** + * This covers several more test cases: + * jurisdictionalFilter : all are missing: default, org, receiver + * qualityFilter: has a default, but org and receiver filters are both missing. AND its reversed! + * routingFilter: has a default, but org and receiver filters are both missing. + */ + private val onlyDefaultFiltersYaml = """ + --- + - name: xyzzy + description: A maze of twisty passages, all alike + jurisdiction: STATE + stateCode: IG + receivers: + - name: elr + organizationName: xyzzy + topic: test + customerStatus: active + reverseTheQualityFilter: true + translation: + type: CUSTOM + schemaName: two + format: CSV + """.trimIndent() + private val one = Schema(name = "one", topic = "test", elements = listOf(Element("a"))) + @Test + fun `test filterByOneFilterType`() { + val mySchema = Schema( + name = "two", topic = "test", trackingElement = "id", + elements = listOf(Element("id"), Element("a"), Element("b")) + ) + val metadata = UnitTestUtils.simpleMetadata.loadSchemas(mySchema) + val settings = FileSettings().also { + it.loadOrganizations(ByteArrayInputStream(filterTestYaml.toByteArray())) + } + val translator = Translator(metadata, settings) + // Table has 4 rows and 2 columns. + val table1 = Report( + mySchema, + listOf( + listOf("0", "yes", "true"), // row 0 + listOf("1", "no", "true"), + listOf("2", "yes", "false"), + listOf("3", "no", "false"), // row 3 + ), + TestSource, + metadata = metadata + ) + val rcvr = settings.findReceiver("phd.elr") + assertThat(rcvr).isNotNull() + val org = settings.findOrganization("phd") + assertThat(org).isNotNull() + // Juris filter: No default exists, both org and receiver exist. + translator.filterByOneFilterType( + table1, rcvr!!, org!!, ReportStreamFilterType.JURISDICTIONAL_FILTER, mySchema.trackingElement, true + ).run { + assertThat(this.itemCount).isEqualTo(1) + assertThat(this.getRow(0)[0]).isEqualTo("0") // row 0 is only one left. + assertThat(this.filteringResults.size).isEqualTo(2) // two rows eliminated, and two filter messages. + assertThat(this.filteringResults[0].filteredCount).isEqualTo(2) // rows 2 and 3 eliminated (zero based) + assertThat(this.filteringResults[0].filteredTrackingElements[0]).isEqualTo("2") + assertThat(this.filteringResults[0].filteredTrackingElements[1]).isEqualTo("3") + assertThat(this.filteringResults[1].filteredCount).isEqualTo(2) // rows 1 and 3 eliminated (zero based) + assertThat(this.filteringResults[1].filteredTrackingElements[0]).isEqualTo("1") + assertThat(this.filteringResults[1].filteredTrackingElements[1]).isEqualTo("3") + } + // Quality filter: Override the default; org filter exists. No receiver filter. + translator.filterByOneFilterType( + table1, rcvr, org, ReportStreamFilterType.QUALITY_FILTER, mySchema.trackingElement, true + ).run { + assertThat(this.itemCount).isEqualTo(2) + assertThat(this.getRow(0)[0]).isEqualTo("0") + assertThat(this.getRow(1)[0]).isEqualTo("1") + assertThat(this.filteringResults.size).isEqualTo(1) // two rows eliminated, but one filter message. + assertThat(this.filteringResults[0].filteredCount).isEqualTo(2) + assertThat(this.filteringResults[0].filteredTrackingElements[0]).isEqualTo("2") + assertThat(this.filteringResults[0].filteredTrackingElements[1]).isEqualTo("3") + } + // Routing filter: Override the default; No org filter. Receiver filter exists. + translator.filterByOneFilterType( + table1, rcvr, org, ReportStreamFilterType.ROUTING_FILTER, mySchema.trackingElement, true + ).run { + assertThat(this.itemCount).isEqualTo(2) + assertThat(this.getRow(0)[0]).isEqualTo("0") + assertThat(this.getRow(1)[0]).isEqualTo("2") + assertThat(this.filteringResults.size).isEqualTo(1) // two rows eliminated, but one filter message. + assertThat(this.filteringResults[0].filteredCount).isEqualTo(2) + assertThat(this.filteringResults[0].filteredTrackingElements[0]).isEqualTo("1") + assertThat(this.filteringResults[0].filteredTrackingElements[1]).isEqualTo("3") + } + } + + @Test + fun `test filterByOneFilterType Defaults`() { + val mySchema = Schema( + name = "two", topic = "test", trackingElement = "id", + elements = listOf(Element("id"), Element("a"), Element("b")) + ) + val metadata = UnitTestUtils.simpleMetadata.loadSchemas(mySchema) + val settings = FileSettings().also { + it.loadOrganizations(ByteArrayInputStream(onlyDefaultFiltersYaml.toByteArray())) + } + val translator = Translator(metadata, settings) + // Table has 4 rows and 2 columns. + val table1 = Report( + mySchema, + listOf( + listOf("0", "yes", "true"), // row 0 + listOf("1", "no", "true"), + listOf("2", "yes", "false"), + listOf("3", "no", "false"), // row 3 + ), + TestSource, + metadata = metadata + ) + val rcvr = settings.findReceiver("xyzzy.elr") + assertThat(rcvr).isNotNull() + val org = settings.findOrganization("xyzzy") + assertThat(org).isNotNull() + // Juris filter: No default, org, or receiver filters exist. No filtering done. + translator.filterByOneFilterType( + table1, rcvr!!, org!!, ReportStreamFilterType.JURISDICTIONAL_FILTER, mySchema.trackingElement, true + ).run { + assertThat(this.itemCount).isEqualTo(4) + // just confirm the first and last rows + assertThat(this.getRow(0)[0]).isEqualTo("0") + assertThat(this.getRow(1)[0]).isEqualTo("1") + assertThat(this.getRow(2)[0]).isEqualTo("2") + assertThat(this.getRow(3)[0]).isEqualTo("3") + assertThat(this.filteringResults.size).isEqualTo(0) // logging turned on, but no rows eliminated. + } + // Quality filter: Default rules apply only. No org or receiver level filters. And its reversed! + translator.filterByOneFilterType( + table1, rcvr, org, ReportStreamFilterType.QUALITY_FILTER, mySchema.trackingElement, false + ).run { + assertThat(this.itemCount).isEqualTo(2) + assertThat(this.getRow(0)[0]).isEqualTo("0") + assertThat(this.getRow(1)[0]).isEqualTo("2") + assertThat(this.filteringResults.size).isEqualTo(0) // no logging done. + } + // Routing filter: Default rules apply only. No org or receiver level filters. No weird reversing. + translator.filterByOneFilterType( + table1, rcvr, org, ReportStreamFilterType.ROUTING_FILTER, mySchema.trackingElement, true + ).run { + assertThat(this.itemCount).isEqualTo(2) + assertThat(this.getRow(0)[0]).isEqualTo("2") + assertThat(this.getRow(1)[0]).isEqualTo("3") + assertThat(this.filteringResults.size).isEqualTo(1) // two rows eliminated, by one rule. + assertThat(this.filteringResults[0].filteredCount).isEqualTo(2) // rows 0 and 1 eliminated (zero based) + assertThat(this.filteringResults[0].filteredTrackingElements[0]).isEqualTo("0") + assertThat(this.filteringResults[0].filteredTrackingElements[1]).isEqualTo("1") + } + } + + @Test + fun `test filterByAllFilterTypes`() { + val mySchema = Schema( + name = "two", topic = "test", trackingElement = "id", + elements = listOf(Element("id"), Element("a"), Element("b")) + ) + val metadata = UnitTestUtils.simpleMetadata.loadSchemas(mySchema) + val settings = FileSettings().also { + it.loadOrganizations(ByteArrayInputStream(filterTestYaml.toByteArray())) + } + val translator = Translator(metadata, settings) + // Table has 4 rows and 2 columns. + val table1 = Report( + mySchema, + listOf( + listOf("0", "yes", "true"), // row 0 + listOf("1", "no", "true"), + listOf("2", "yes", "false"), + listOf("3", "no", "false"), // row 3 + ), + TestSource, + metadata = metadata + ) + val rcvr = settings.findReceiver("phd.elr") + assertThat(rcvr).isNotNull() + // Juris filter eliminates rows 1,2,3 (zero based), but does not create filteredItem entries. + translator.filterByAllFilterTypes(settings, table1, rcvr!!).run { + assertThat(this).isNotNull() + assertThat(this!!.itemCount).isEqualTo(1) + assertThat(this.getRow(0)[0]).isEqualTo("0") // row 0 + assertThat(this.filteringResults.size).isEqualTo(0) // three rows eliminated, but nothing logged. + } + + val settings2 = FileSettings().also { + it.loadOrganizations(ByteArrayInputStream(onlyDefaultFiltersYaml.toByteArray())) + } + val rcvr2 = settings2.findReceiver("xyzzy.elr") + assertThat(rcvr2).isNotNull() + // No juris filtering done. + // Default matches a = "no" in qualityFilter, but its reversed. So original rows 1,3 eliminated, rows 0,2 kept + // But: no logging because its reversed! + // Not done yet! Then keep b = "false" in routingFilter. + // So row 2 is kept ("yes", "false") + translator.filterByAllFilterTypes(settings2, table1, rcvr2!!).run { + assertThat(this).isNotNull() + assertThat(this!!.itemCount).isEqualTo(1) + assertThat(this.getRow(0)[0]).isEqualTo("2") + assertThat(this.filteringResults.size).isEqualTo(1) // three rows eliminated, only routingFilter message. + assertThat(this.filteringResults[0].filteredCount).isEqualTo(1) // rows 0 eliminated + assertThat(this.filteringResults[0].filteredTrackingElements[0]).isEqualTo("0") + } + } + + @Test + fun `test filter with missing tracking element value`() { + val mySchema = Schema( + name = "one", topic = "test", trackingElement = "id", + elements = listOf(Element("id"), Element("a")) + ) + val metadata = UnitTestUtils.simpleMetadata.loadSchemas(mySchema) + val settings = FileSettings().also { + it.loadOrganizations(ByteArrayInputStream(receiversYaml.toByteArray())) + } + val org = settings.findOrganization("phd1") + val translator = Translator(metadata, settings) + // Table has 4 rows and 2 columns. + val table1 = Report( + mySchema, + listOf( + listOf("x", "1"), + listOf("", "1"), // missing trackingElement value + listOf("y", "2"), + listOf("", "2"), // missing trackingElement value + ), + TestSource, + metadata = metadata + ) + val rcvr = settings.findReceiver("phd1.elr") + assertThat(rcvr).isNotNull() + assertThat(org).isNotNull() + // Juris filter keeps data where on a == 1. Run with logging on, to force creation of [filteringResults]. + translator.filterByOneFilterType( + table1, rcvr!!, org!!, ReportStreamFilterType.JURISDICTIONAL_FILTER, mySchema.trackingElement, true + ).run { + assertThat(this).isNotNull() + assertThat(this.itemCount).isEqualTo(2) + assertThat(this.getRow(0)[0]).isEqualTo("x") // row 0 + assertThat(this.getRow(1)[0]).isEqualTo("") // row 0 + assertThat(this.filteringResults.size).isEqualTo(1) // two rows eliminated by one filter + assertThat(this.filteringResults[0].filteredCount).isEqualTo(2) + assertThat(this.filteringResults[0].filteredTrackingElements[0]).isEqualTo("y") + assertThat(this.filteringResults[0].filteredTrackingElements[1]).isEqualTo( + ReportStreamFilterResult.DEFAULT_TRACKING_VALUE + ) + } + } + @Test fun `test buildMapping`() { val two = Schema(name = "two", topic = "test", elements = listOf(Element("a"), Element("b"))) @@ -53,10 +342,12 @@ class TranslatorTests { @Test fun `test buildMapping with default`() { - val two = Schema(name = "two", topic = "test", elements = listOf(Element("a"), Element("b", default = "x"))) - val metadata = UnitTestUtils.simpleMetadata.loadSchemas(one, two) + val twoWithDefault = Schema( + name = "two", topic = "test", elements = listOf(Element("a"), Element("b", default = "x")) + ) + val metadata = UnitTestUtils.simpleMetadata.loadSchemas(one, twoWithDefault) val translator = Translator(metadata, FileSettings()) - translator.buildMapping(fromSchema = one, toSchema = two, defaultValues = mapOf("b" to "foo")).run { + translator.buildMapping(fromSchema = one, toSchema = twoWithDefault, defaultValues = mapOf("b" to "foo")).run { assertThat(useDefault.contains("b")).isTrue() assertThat(useDefault["b"]).isEqualTo("foo") } @@ -80,14 +371,22 @@ class TranslatorTests { } @Test - fun `test filterAndMapByReceiver`() { - val metadata = UnitTestUtils.simpleMetadata + fun `test filterAndTranslateByReceiver`() { + val theSchema = Schema(name = "one", topic = "test", elements = listOf(Element("a"), Element("b"))) + val metadata = UnitTestUtils.simpleMetadata.loadSchemas(theSchema) val settings = FileSettings().also { it.loadOrganizations(ByteArrayInputStream(receiversYaml.toByteArray())) } val translator = Translator(metadata, settings) - val one = Schema(name = "one", topic = "test", elements = listOf(Element("a"), Element("b"))) - val table1 = Report(one, listOf(listOf("1", "2"), listOf("3", "4")), TestSource, metadata = metadata) + val table1 = Report( + theSchema, + listOf( + listOf("1", "2"), // first row of data + listOf("3", "4"), // second row of data + ), + TestSource, + metadata = metadata + ) translator.filterAndTranslateByReceiver(table1, warnings = mutableListOf()).run { assertThat(this.size).isEqualTo(1) val (mappedTable, forReceiver) = this[0]