Skip to content

Commit

Permalink
Feat: Migrate the dataset Covid19 Italy from Xenon (#488)
Browse files Browse the repository at this point in the history
  • Loading branch information
Naveen130 authored Oct 6, 2022
1 parent 58cda71 commit 1ca6bd6
Show file tree
Hide file tree
Showing 14 changed files with 489 additions and 202 deletions.
29 changes: 28 additions & 1 deletion datasets/covid19_italy/infra/covid19_italy_dataset.tf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2021 Google LLC
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,3 +24,30 @@ resource "google_bigquery_dataset" "covid19_italy" {
output "bigquery_dataset-covid19_italy-dataset_id" {
value = google_bigquery_dataset.covid19_italy.dataset_id
}

resource "google_bigquery_dataset" "covid19_italy_eu" {
dataset_id = "covid19_italy_eu"
project = var.project_id
description = "COVID-19 Italy data stored in EU region."
location = "EU"
}

output "bigquery_dataset-covid19_italy_eu-dataset_id" {
value = google_bigquery_dataset.covid19_italy_eu.dataset_id
}

resource "google_storage_bucket" "covid19-italy-eu" {
name = "${var.bucket_name_prefix}-covid19-italy-eu"
force_destroy = true
location = "EU"
uniform_bucket_level_access = true
lifecycle {
ignore_changes = [
logging,
]
}
}

output "storage_bucket-covid19-italy-eu-name" {
value = google_storage_bucket.covid19-italy-eu.name
}
31 changes: 22 additions & 9 deletions datasets/covid19_italy/infra/data_by_province_pipeline.tf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2021 Google LLC
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,15 +16,10 @@


resource "google_bigquery_table" "covid19_italy_data_by_province" {
project = var.project_id
dataset_id = "covid19_italy"
table_id = "data_by_province"

project = var.project_id
dataset_id = "covid19_italy"
table_id = "data_by_province"
description = "COVID-19 Italy Data By Province"




depends_on = [
google_bigquery_dataset.covid19_italy
]
Expand All @@ -37,3 +32,21 @@ output "bigquery_table-covid19_italy_data_by_province-table_id" {
output "bigquery_table-covid19_italy_data_by_province-id" {
value = google_bigquery_table.covid19_italy_data_by_province.id
}

resource "google_bigquery_table" "covid19_italy_eu_data_by_province" {
project = var.project_id
dataset_id = "covid19_italy_eu"
table_id = "data_by_province"
description = "COVID-19 Italy Data By Province"
depends_on = [
google_bigquery_dataset.covid19_italy_eu
]
}

output "bigquery_table-covid19_italy_eu_data_by_province-table_id" {
value = google_bigquery_table.covid19_italy_eu_data_by_province.table_id
}

output "bigquery_table-covid19_italy_eu_data_by_province-id" {
value = google_bigquery_table.covid19_italy_eu_data_by_province.id
}
31 changes: 22 additions & 9 deletions datasets/covid19_italy/infra/data_by_region_pipeline.tf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2021 Google LLC
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,15 +16,10 @@


resource "google_bigquery_table" "covid19_italy_data_by_region" {
project = var.project_id
dataset_id = "covid19_italy"
table_id = "data_by_region"

project = var.project_id
dataset_id = "covid19_italy"
table_id = "data_by_region"
description = "COVID-19 Italy Data By Region"




depends_on = [
google_bigquery_dataset.covid19_italy
]
Expand All @@ -37,3 +32,21 @@ output "bigquery_table-covid19_italy_data_by_region-table_id" {
output "bigquery_table-covid19_italy_data_by_region-id" {
value = google_bigquery_table.covid19_italy_data_by_region.id
}

resource "google_bigquery_table" "covid19_italy_eu_data_by_region" {
project = var.project_id
dataset_id = "covid19_italy_eu"
table_id = "data_by_region"
description = "COVID-19 Italy Data By Region"
depends_on = [
google_bigquery_dataset.covid19_italy_eu
]
}

output "bigquery_table-covid19_italy_eu_data_by_region-table_id" {
value = google_bigquery_table.covid19_italy_eu_data_by_region.table_id
}

output "bigquery_table-covid19_italy_eu_data_by_region-id" {
value = google_bigquery_table.covid19_italy_eu_data_by_region.id
}
31 changes: 22 additions & 9 deletions datasets/covid19_italy/infra/national_trends_pipeline.tf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2021 Google LLC
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,15 +16,10 @@


resource "google_bigquery_table" "covid19_italy_national_trends" {
project = var.project_id
dataset_id = "covid19_italy"
table_id = "national_trends"

project = var.project_id
dataset_id = "covid19_italy"
table_id = "national_trends"
description = "COVID-19 Italy National Trends"




depends_on = [
google_bigquery_dataset.covid19_italy
]
Expand All @@ -37,3 +32,21 @@ output "bigquery_table-covid19_italy_national_trends-table_id" {
output "bigquery_table-covid19_italy_national_trends-id" {
value = google_bigquery_table.covid19_italy_national_trends.id
}

resource "google_bigquery_table" "covid19_italy_eu_national_trends" {
project = var.project_id
dataset_id = "covid19_italy_eu"
table_id = "national_trends"
description = "COVID-19 Italy National Trends"
depends_on = [
google_bigquery_dataset.covid19_italy_eu
]
}

output "bigquery_table-covid19_italy_eu_national_trends-table_id" {
value = google_bigquery_table.covid19_italy_eu_national_trends.table_id
}

output "bigquery_table-covid19_italy_eu_national_trends-id" {
value = google_bigquery_table.covid19_italy_eu_national_trends.id
}
2 changes: 1 addition & 1 deletion datasets/covid19_italy/infra/provider.tf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2021 Google LLC
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
5 changes: 4 additions & 1 deletion datasets/covid19_italy/infra/variables.tf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2021 Google LLC
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,4 +20,7 @@ variable "bucket_name_prefix" {}
variable "impersonating_acct" {}
variable "region" {}
variable "env" {}
variable "iam_policies" {
default = {}
}

Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,19 @@ def main(
rename_mappings: dict,
pipeline_name: str,
) -> None:

logging.info(
"Covid-19 Italy process started at "
+ str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
)

logging.info("creating 'files' folder")
pathlib.Path("./files").mkdir(parents=True, exist_ok=True)

logging.info(f"Downloading file {source_url}")
download_file(source_url, source_file)

logging.info(f"Opening file {source_file}")

df = pd.read_csv(str(source_file))

logging.info(f"Transformation Process Starting.. {source_file}")

logging.info(f"Transform: Renaming Headers.. {source_file}")
rename_headers(df, rename_mappings)

logging.info(f"Transform: Creating Geometry Column.. {pipeline_name}")
if pipeline_name == "data_by_province" or pipeline_name == "data_by_region":
df["location_geom"] = (
Expand All @@ -65,24 +57,18 @@ def main(
+ ")"
)
df.location_geom = df.location_geom.replace("POINT( )", "")

logging.info("Transform: Reordering headers..")
df = df[headers]

logging.info(f"Transformation Process complete .. {source_file}")

logging.info(f"Saving to output file.. {target_file}")

try:
save_to_new_file(df, file_path=str(target_file))
except Exception as e:
logging.error(f"Error saving output file: {e}.")

logging.info(
f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}"
)
upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path)

logging.info(
"Covid-19 Italy process completed at "
+ str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2021 Google LLC
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,13 +14,14 @@


from airflow import DAG
from airflow.operators import bash
from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
from airflow.providers.google.cloud.transfers import gcs_to_bigquery

default_args = {
"owner": "Google",
"depends_on_past": False,
"start_date": "2021-04-01",
"start_date": "2022-10-03",
}


Expand Down Expand Up @@ -80,4 +81,39 @@
],
)

data_by_province_transform_csv >> load_data_by_province_to_bq
# Task to copy bq uploadable data file to bucket in EU
copy_data_file_EU = bash.BashOperator(
task_id="copy_data_file_EU",
bash_command="gsutil cp gs://{{ var.value.composer_bucket }}/data/covid19_italy/data_by_province/data_output.csv gs://public-datasets-dev-covid19-italy-eu/province/",
)

# Task to load CSV data to a BigQuery table
load_data_by_province_to_bq_eu = gcs_to_bigquery.GCSToBigQueryOperator(
task_id="load_data_by_province_to_bq_eu",
bucket="public-datasets-dev-covid19-italy-eu",
source_objects="province/data_output.csv",
source_format="CSV",
destination_project_dataset_table="covid19_italy_eu.data_by_province",
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
schema_fields=[
{"name": "date", "type": "TIMESTAMP", "mode": "NULLABLE"},
{"name": "country", "type": "STRING", "mode": "NULLABLE"},
{"name": "region_code", "type": "STRING", "mode": "NULLABLE"},
{"name": "name", "type": "STRING", "mode": "NULLABLE"},
{"name": "province_code", "type": "STRING", "mode": "NULLABLE"},
{"name": "province_name", "type": "STRING", "mode": "NULLABLE"},
{"name": "province_abbreviation", "type": "STRING", "mode": "NULLABLE"},
{"name": "latitude", "type": "FLOAT", "mode": "NULLABLE"},
{"name": "longitude", "type": "FLOAT", "mode": "NULLABLE"},
{"name": "location_geom", "type": "GEOGRAPHY", "mode": "NULLABLE"},
{"name": "confirmed_cases", "type": "INTEGER", "mode": "NULLABLE"},
{"name": "note", "type": "STRING", "mode": "NULLABLE"},
],
)

(
data_by_province_transform_csv
>> copy_data_file_EU
>> [load_data_by_province_to_bq, load_data_by_province_to_bq_eu]
)
Loading

0 comments on commit 1ca6bd6

Please sign in to comment.