From 6c3a842cb53997a0cb3486ae6929bcd483ca8c5b Mon Sep 17 00:00:00 2001 From: Codie Roelf Date: Mon, 21 Mar 2022 07:45:02 +0200 Subject: [PATCH] update docker, fix contacts table --- Dockerfile | 2 +- README.md | 2 +- rapidpro_to_bigquery.py | 15 +++++++++------ 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/Dockerfile b/Dockerfile index 6f8656b..8543173 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ -FROM praekeltfoundation/python-base +FROM python:3.8.1-slim-buster RUN apt-get update && apt-get -y install cron diff --git a/README.md b/README.md index 08b066d..b5b39a7 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ -# who-rapidpro-bq-sync +# ooy-rapidpro-bq-sync ETL jobs for syncing rapidpro data to BigQuery diff --git a/rapidpro_to_bigquery.py b/rapidpro_to_bigquery.py index c8fe510..23dfd09 100644 --- a/rapidpro_to_bigquery.py +++ b/rapidpro_to_bigquery.py @@ -88,7 +88,8 @@ def get_last_record_date(table, field): query = f"select EXTRACT(DATETIME from max({field})) from {BQ_DATASET}.{table};" for row in bigquery_client.query(query).result(): if row[0]: - return str(row[0].strftime("%Y-%m-%dT%H:%M:%S.%fZ")) + timestamp = row[0] + timedelta(hours=2) + return str(timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ")) def get_flows(): @@ -186,7 +187,8 @@ def upload_to_bigquery(table, data, fields): source_format="NEWLINE_DELIMITED_JSON", write_disposition="WRITE_APPEND", max_bad_records=1, - autodetect=False + autodetect=False, + schema=schema ) job = bigquery_client.load_table_from_json( @@ -205,13 +207,14 @@ def upload_to_bigquery(table, data, fields): last_contact_date_pageviews = get_last_record_date("page_views", "timestamp") fields = rapidpro_client.get_fields().all() log("Start") - print("Fetching page views") + log("Fetching page views") pageviews = get_content_repo_page_views(last_contact_date_pageviews) - print("Fetching flows") + log("Fetching flows") flows = get_flows() - print("Fetching flow runs and values") + log("Fetching flow runs and values...") flow_runs, flow_run_values = get_flow_runs(flows, last_contact_date=last_contact_date_flows) - print("Done with flows") + log(f"flow_runs: {len(flow_runs)}") + log(f"flow_run_values: {len(flow_run_values)}") log("Fetching groups...") groups = get_groups() log(f"Groups: {len(groups)}")