
Commit

Merge pull request #3 from praekeltfoundation/add-content-repo
update docker, fix contacts table
codiebeulaine authored Mar 23, 2022
2 parents 4916fa9 + 6c3a842 commit 4dd1018
Showing 3 changed files with 11 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
-FROM praekeltfoundation/python-base
+FROM python:3.8.1-slim-buster
 
 RUN apt-get update && apt-get -y install cron
2 changes: 1 addition & 1 deletion README.md
@@ -1,2 +1,2 @@
-# who-rapidpro-bq-sync
+# ooy-rapidpro-bq-sync
 ETL jobs for syncing rapidpro data to BigQuery
15 changes: 9 additions & 6 deletions rapidpro_to_bigquery.py
@@ -88,7 +88,8 @@ def get_last_record_date(table, field):
     query = f"select EXTRACT(DATETIME from max({field})) from {BQ_DATASET}.{table};"
     for row in bigquery_client.query(query).result():
         if row[0]:
-            return str(row[0].strftime("%Y-%m-%dT%H:%M:%S.%fZ"))
+            timestamp = row[0] + timedelta(hours=2)
+            return str(timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"))
 
 
 def get_flows():
@@ -186,7 +187,8 @@ def upload_to_bigquery(table, data, fields):
         source_format="NEWLINE_DELIMITED_JSON",
         write_disposition="WRITE_APPEND",
         max_bad_records=1,
-        autodetect=False
+        autodetect=False,
+        schema=schema
     )
 
     job = bigquery_client.load_table_from_json(
@@ -205,13 +207,14 @@ def upload_to_bigquery(table, data, fields):
 last_contact_date_pageviews = get_last_record_date("page_views", "timestamp")
 fields = rapidpro_client.get_fields().all()
 log("Start")
-print("Fetching page views")
+log("Fetching page views")
 pageviews = get_content_repo_page_views(last_contact_date_pageviews)
-print("Fetching flows")
+log("Fetching flows")
 flows = get_flows()
-print("Fetching flow runs and values")
+log("Fetching flow runs and values...")
 flow_runs, flow_run_values = get_flow_runs(flows, last_contact_date=last_contact_date_flows)
-print("Done with flows")
+log(f"flow_runs: {len(flow_runs)}")
+log(f"flow_run_values: {len(flow_run_values)}")
 log("Fetching groups...")
 groups = get_groups()
 log(f"Groups: {len(groups)}")
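For reference, the first hunk reconstructed as a whole function: a minimal sketch of get_last_record_date after this change, assuming bigquery_client is a standard google-cloud-bigquery client and BQ_DATASET names the target dataset (the placeholder value below is illustrative). The two-hour shift looks like a UTC+2 local-time adjustment, though the diff itself does not say why it was added.

from datetime import timedelta
from google.cloud import bigquery

BQ_DATASET = "rapidpro"  # placeholder; the real dataset name is configured in the script
bigquery_client = bigquery.Client()

def get_last_record_date(table, field):
    # Return the latest value of `field` already synced to BigQuery, or None if the table is empty.
    query = f"select EXTRACT(DATETIME from max({field})) from {BQ_DATASET}.{table};"
    for row in bigquery_client.query(query).result():
        if row[0]:
            # Shift the DATETIME by two hours (assumed UTC+2 local-time adjustment)
            # before formatting it as an ISO-style string.
            timestamp = row[0] + timedelta(hours=2)
            return str(timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"))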
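The second hunk pairs autodetect=False with an explicit schema= argument on the load job. A minimal sketch of that wiring with the google-cloud-bigquery client, reusing the bigquery_client and BQ_DATASET placeholders from the sketch above; the schema construction here is hypothetical, since the diff does not show how the real script builds its SchemaField list.

def upload_to_bigquery(table, data, fields):
    # Hypothetical schema construction: `fields` is assumed here to be an
    # iterable of (name, type) pairs; the real script may build it differently.
    schema = [bigquery.SchemaField(name, field_type) for name, field_type in fields]

    job_config = bigquery.LoadJobConfig(
        source_format="NEWLINE_DELIMITED_JSON",
        write_disposition="WRITE_APPEND",
        max_bad_records=1,
        autodetect=False,
        schema=schema,  # explicit schema instead of autodetection
    )
    job = bigquery_client.load_table_from_json(
        data, f"{BQ_DATASET}.{table}", job_config=job_config
    )
    job.result()  # block until the load finishes and surface any errors

Supplying the schema explicitly avoids the column-type drift that autodetection can introduce when a batch happens to contain only null or ambiguous values.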
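The last hunk swaps the remaining print() calls for the script's log() helper and logs the flow_runs and flow_run_values counts. The helper itself is not part of this diff; a hypothetical stand-in with a timestamp prefix would look something like:

from datetime import datetime

def log(message):
    # Hypothetical stand-in for the script's log() helper: prefix each message
    # with a UTC timestamp so output from the scheduled cron run is easier to follow.
    print(f"{datetime.utcnow().isoformat()} {message}", flush=True)

log("Fetching flows")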
