Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modifications to support new StructuredFile ingestor #14

Merged
merged 10 commits into from
May 25, 2017
3 changes: 1 addition & 2 deletions build.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
{"README.md": "README"}
],
"etc": {
"configuration/": "",
"specs/schemas": "etl/etl_specs.d/value_analytics"
"configuration/": ""
}
}
}
19 changes: 16 additions & 3 deletions classes/ETL/Ingestor/ValueAnalyticsGrantsPeopleIngestor.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,18 @@ class ValueAnalyticsGrantsPeopleIngestor extends StructuredFileIngestor
/**
* @see aIngestor::_execute
*/

// @codingStandardsIgnoreLine
protected function _execute()
{
// Prepare SQL statements for updating various people-related tables.
$sourceValues = $this->executionData['sourceValues'];

// If no data was provided in the file, use the StructuredFile endpoint
if ( null === $sourceValues ) {
$sourceValues = $this->sourceEndpoint;
}

$destinationSchema = $this->destinationEndpoint->getSchema();
$destinationTable = $this->etlDestinationTable->getFullName();

Expand Down Expand Up @@ -71,8 +78,14 @@ protected function _execute()
$this->logAndThrowException("Failed to prepare statement. ({$e->getMessage()})");
}

// For every grant in the source data...
$numRecordsProcessed = 0;

if ( $this->getEtlOverseerOptions()->isDryrun() ) {
return $numRecordsProcessed;
}

// For every grant in the source data...

foreach ($sourceValues as $sourceValue) {
// Get the grant ID.
$grantId = ValueAnalyticsDataFinder::findGrant(
Expand Down Expand Up @@ -153,5 +166,5 @@ protected function _execute()
}

return $numRecordsProcessed;
}
}
} // _execute()
} // class ValueAnalyticsGrantsPeopleIngestor
27 changes: 22 additions & 5 deletions classes/ETL/Ingestor/ValueAnalyticsPeopleIngestor.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,20 @@ class ValueAnalyticsPeopleIngestor extends StructuredFileIngestor
/**
* @see aIngestor::_execute
*/

// @codingStandardsIgnoreLine
protected function _execute()
{
// Prepare SQL statements for updating various people-related tables.
$destColumns = $this->executionData['destColumns'];
$destColumnsToSourceKeys = $this->executionData['destColumnsToSourceKeys'];
$sourceValues = $this->executionData['sourceValues'];

// If no data was provided in the file, use the StructuredFile endpoint
if ( null === $sourceValues ) {
$sourceValues = $this->sourceEndpoint;
}

$destinationSchema = $this->destinationEndpoint->getSchema();
$destinationTable = $this->etlDestinationTable->getFullName();

Expand Down Expand Up @@ -147,6 +154,12 @@ protected function _execute()
$this->logAndThrowException("Failed to prepare statement. ({$e->getMessage()})");
}

$numRecordsProcessed = 0;

if ( $this->getEtlOverseerOptions()->isDryrun() ) {
return $numRecordsProcessed;
}

// For every person in the source data...
foreach ($sourceValues as $sourceValue) {
// Check if the person exists already in the database.
Expand Down Expand Up @@ -191,6 +204,7 @@ protected function _execute()
"Error inserting person data."
);
}

}

// Update the person's organization data.
Expand Down Expand Up @@ -236,7 +250,7 @@ protected function _execute()
}
}
}
}
} // foreach ($sourceValue->organizations as $personOrganization)

// Update the person's identifiers.
$personIdentifiersExist = property_exists($sourceValue, 'identifiers');
Expand All @@ -257,8 +271,11 @@ protected function _execute()
}
}
}
}

return count($sourceValues);
}
}
$numRecordsProcessed++;

} // foreach ($sourceValues as $sourceValue)

return $numRecordsProcessed;
} // _execute()
} // class ValueAnalyticsPeopleIngestor
97 changes: 69 additions & 28 deletions configuration/etl/etl.d/value_analytics.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"type": "mysql",
"name": "Value Analytics DB",
"config": "datawarehouse",
"schema": "modw_value_analytics"
"schema": "modw_value_analytics",
"create_schema_if_not_exists": true
}
}
}
Expand All @@ -29,7 +30,7 @@
"type": "jsonfile",
"name": "Value Analytics Organizations Input File",
"path": "value_analytics/organizations.json",
"array_element_schema_path": "value_analytics/organization.schema.json"
"record_schema_path": "value_analytics/organization.schema.json"
}
}
},
Expand All @@ -42,9 +43,14 @@
"type": "jsonfile",
"name": "Value Analytics People Input File (Filtered to Organizations)",
"path": "value_analytics/people.json",
"filter": {
"jq": "map({name: .organizations[].name}) | unique"
}
"filters": [
{
"type": "external",
"name": "jq",
"path": "jq",
"arguments": "'map({name: .organizations[].name}) | unique'"
}
]
}
}
},
Expand All @@ -57,9 +63,14 @@
"type": "jsonfile",
"name": "Value Analytics Grants Input File (Filtered to Organizations)",
"path": "value_analytics/grants.json",
"filter": {
"jq": "map(.people[] | select(has(\"organization\")) | {name: .organization}) | unique"
}
"filters": [
{
"type": "external",
"name": "jq",
"path": "jq",
"arguments" :"'map(.people[] | select(has(\"organization\")) | {name: .organization}) | unique'"
}
]
}
}
},
Expand All @@ -72,9 +83,14 @@
"type": "jsonfile",
"name": "Value Analytics Grants Input File (Filtered to Funding Agencies)",
"path": "value_analytics/grants.json",
"filter": {
"jq": "map({name: .agency}) | unique"
}
"filters": [
{
"type": "external",
"name": "jq",
"path": "jq",
"arguments": "'map({name: .agency}) | unique'"
}
]
}
}
},
Expand All @@ -87,9 +103,14 @@
"type": "jsonfile",
"name": "Value Analytics People Input File (Filtered to Identity Providers)",
"path": "value_analytics/people.json",
"filter": {
"jq": "map(select(has(\"identifiers\")) | {name: .identifiers[].type}) | unique"
}
"filters": [
{
"type": "external",
"name": "jq",
"path": "jq",
"arguments": "'map(select(has(\"identifiers\")) | {name: .identifiers[].type}) | unique'"
}
]
}
}
},
Expand All @@ -102,9 +123,14 @@
"type": "jsonfile",
"name": "Value Analytics People Input File (Filtered to Organization Groups)",
"path": "value_analytics/people.json",
"filter": {
"jq": "map(.organizations[] | select(has(\"groups\")) | {o_name: .name, g_name: .groups[]}) | unique"
}
"filters": [
{
"type": "external",
"name": "jq",
"path": "jq",
"arguments": "'map(.organizations[] | select(has(\"groups\")) | {o_name: .name, g_name: .groups[]}) | unique'"
}
]
}
}
},
Expand Down Expand Up @@ -148,7 +174,7 @@
"type": "jsonfile",
"name": "Value Analytics People Input File",
"path": "value_analytics/people.json",
"array_element_schema_path": "value_analytics/person.schema.json"
"record_schema_path": "value_analytics/person.schema.json"
}
}
},
Expand All @@ -162,9 +188,14 @@
"type": "jsonfile",
"name": "Value Analytics Grants Input File",
"path": "value_analytics/grants.json",
"filter": {
"jq": "map(.people[] | select(has(\"id\") | not)) | unique | map(. + {organizations: [{name: .organization, id: null}]})"
}
"filters": [
{
"type": "external",
"name": "jq",
"path": "jq",
"arguments": "'map(.people[] | select(has(\"id\") | not)) | unique | map(. + {organizations: [{name: .organization, id: null}]})'"
}
]
}
}
},
Expand All @@ -177,9 +208,14 @@
"type": "jsonfile",
"name": "Value Analytics Grants Input File",
"path": "value_analytics/grants.json",
"filter": {
"jq": "map({name: .type}) | unique"
}
"filters": [
{
"type": "external",
"name": "jq",
"path": "jq",
"arguments": "'map({name: .type}) | unique'"
}
]
}
}
},
Expand All @@ -192,9 +228,14 @@
"type": "jsonfile",
"name": "Value Analytics Grants Input File",
"path": "value_analytics/grants.json",
"filter": {
"jq": "map({name: .people[].role}) | unique"
}
"filters": [
{
"type": "external",
"name": "jq",
"path": "jq",
"arguments": "'map({name: .people[].role}) | unique'"
}
]
}
}
},
Expand All @@ -207,7 +248,7 @@
"type": "jsonfile",
"name": "Value Analytics Grants Input File",
"path": "value_analytics/grants.json",
"array_element_schema_path": "value_analytics/grant.schema.json"
"record_schema_path": "value_analytics/grant.schema.json"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
},
{
"name": "primary",
"type": "boolean",
"type": "tinyint(1)",
"nullable": false,
"default": false,
"comment": "An indicator of if this is a person's primary organization."
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"table_definition": {
"name": "va_grant_fact_by_",
"table_prefix": "va_grant_fact_by_",
"engine": "MyISAM",
"comment": "Value Analytics grant facts aggregated by ${AGGREGATION_UNIT}.",
"columns": [
Expand Down Expand Up @@ -66,7 +67,7 @@
},
{
"name": "time_proportional_dollars",
"type": "decimal(14, 4)",
"type": "decimal(14,4)",
"nullable": false,
"comment": "The time-proportional amount of money associated with a set of grants."
},
Expand Down Expand Up @@ -149,7 +150,7 @@
"agency_id": "g.agency_id",
"grant_type_id": "g.grant_type_id",
"grant_id": "g.id",
"pi_id": "COALESCE((SELECT gp.person_id FROM modw_value_analytics.grants_people AS gp WHERE gp.grant_id = g.id AND gp.grant_role_id = (SELECT gr.id FROM modw_value_analytics.grant_roles AS gr WHERE gr.name = \"PI\" LIMIT 1) LIMIT 1), -1)",
"pi_id": "COALESCE((SELECT gp.person_id FROM ${SOURCE_SCHEMA}.grants_people AS gp WHERE gp.grant_id = g.id AND gp.grant_role_id = (SELECT gr.id FROM modw_value_analytics.grant_roles AS gr WHERE gr.name = \"PI\" LIMIT 1) LIMIT 1), -1)",
"num_grants": "COUNT(*)",
"num_grants_started": "SUM(g.start_time_ts BETWEEN ${:PERIOD_START_TS} AND ${:PERIOD_END_TS})",
"num_grants_ended": "SUM(g.end_time_ts BETWEEN ${:PERIOD_START_TS} AND ${:PERIOD_END_TS})",
Expand Down