Skip to content

Commit

Permalink
Merge pull request #2 from praekeltfoundation/add-content-repo
Browse files Browse the repository at this point in the history
add contentrepo
  • Loading branch information
codiebeulaine authored Mar 1, 2022
2 parents e8b016e + 78f9a20 commit 4916fa9
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 10 deletions.
8 changes: 8 additions & 0 deletions fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@
"exited_on": "TIMESTAMP",
"id": "INTEGER",
}
# BigQuery schema for the content-repo "page_views" table
# (column name -> BigQuery column type, consumed by upload_to_bigquery).
# run_uuid / contact_uuid come from the pageview's optional "data" payload
# and default to empty strings when absent.
PAGEVIEW_FIELDS = {
    "timestamp": "TIMESTAMP",
    "page": "INTEGER",
    "revision": "INTEGER",
    "id": "INTEGER",
    "run_uuid": "STRING",
    "contact_uuid": "STRING",
}
FLOW_RUN_VALUES_FIELDS = {
"input": "STRING",
"time": "TIMESTAMP",
Expand Down
54 changes: 44 additions & 10 deletions rapidpro_to_bigquery.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
import requests
import json
import os
from temba_client.v2 import TembaClient
from google.cloud import bigquery
from google.oauth2 import service_account
from google.api_core.exceptions import BadRequest
import os

from datetime import datetime, timedelta

from fields import (
CONTACT_FIELDS, GROUP_CONTACT_FIELDS, FLOWS_FIELDS,
FLOW_RUNS_FIELDS, FLOW_RUN_VALUES_FIELDS, GROUP_FIELDS)
FLOW_RUNS_FIELDS, FLOW_RUN_VALUES_FIELDS, GROUP_FIELDS, PAGEVIEW_FIELDS)


BQ_KEY_PATH = os.environ.get('BQ_KEY_PATH', "credentials.json")
BQ_DATASET = "one2one-datascience.rapidpro"
RAPIDPRO_URL = "https://one2one.rapidpro.lvcthealth.org/"
RAPIDPRO_TOKEN = os.environ.get('RAPIDPRO_TOKEN', "")
#
CONTENTREPO_TOKEN = os.environ.get('CONTENTREPO_TOKEN', "")

credentials = service_account.Credentials.from_service_account_file(
BQ_KEY_PATH, scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
Expand Down Expand Up @@ -52,7 +56,7 @@ def get_groups():


def get_contacts_and_contact_groups(last_contact_date=None):
rapidpro_contacts = rapidpro_client.get_contacts().all(
rapidpro_contacts = rapidpro_client.get_contacts(after=last_contact_date).all(
retry_on_rate_exceed=True
)

Expand Down Expand Up @@ -107,7 +111,7 @@ def get_flow_runs(flows, last_contact_date=None):
value_records = []

for flow in flows:
for run_batch in rapidpro_client.get_runs(flow=flow["uuid"]).iterfetches(retry_on_rate_exceed=True):
for run_batch in rapidpro_client.get_runs(flow=flow["uuid"], after=last_contact_date).iterfetches(retry_on_rate_exceed=True):
for run in run_batch:

exited_on = None
Expand Down Expand Up @@ -142,6 +146,29 @@ def get_flow_runs(flows, last_contact_date=None):
return records, value_records


def get_content_repo_page_views(last_contact_date=None):
    """Fetch page view records from the content repo API.

    Returns a list of dicts matching PAGEVIEW_FIELDS. run_uuid and
    contact_uuid are filled from the pageview's "data" payload when both
    keys are present, otherwise left as empty strings.

    NOTE(review): last_contact_date is accepted but never sent to the API,
    so every call re-fetches all results — confirm whether the endpoint
    supports a timestamp filter. Only the first page of results is read;
    confirm whether the endpoint paginates via a "next" link.
    """
    records = []
    url = 'http://one2one.content.lvcthealth.org/api/v2/custom/pageviews/'
    headers = {'Authorization': 'token {}'.format(CONTENTREPO_TOKEN)}
    response = requests.get(url, headers=headers)
    # Fail loudly on an HTTP error instead of trying to parse an error page.
    response.raise_for_status()
    results = response.json()['results']
    for result in results:
        data = result.get('data') or {}
        record = {
            "timestamp": result['timestamp'],
            "page": result['page'],
            "revision": result['revision'],
            "id": result['id'],
            # Defaults keep every PAGEVIEW_FIELDS column present in the row.
            "run_uuid": "",
            "contact_uuid": "",
        }
        # BUG FIX: the original condition was
        #   if "run_uuid" and "contact_uuid" in result['data'].keys():
        # where the literal "run_uuid" is always truthy, so only
        # "contact_uuid" was actually checked — a record carrying
        # contact_uuid without run_uuid raised KeyError. Test both keys.
        if "run_uuid" in data and "contact_uuid" in data:
            record['run_uuid'] = data['run_uuid']
            record['contact_uuid'] = data['contact_uuid']
        records.append(record)
    return records



def upload_to_bigquery(table, data, fields):
if table in ["flows", "groups"]:
job_config = bigquery.LoadJobConfig(
Expand All @@ -157,7 +184,7 @@ def upload_to_bigquery(table, data, fields):

job_config = bigquery.LoadJobConfig(
source_format="NEWLINE_DELIMITED_JSON",
write_disposition="WRITE_TRUNCATE",
write_disposition="WRITE_APPEND",
max_bad_records=1,
autodetect=False
)
Expand All @@ -173,20 +200,23 @@ def upload_to_bigquery(table, data, fields):


if __name__ == "__main__":
# last_contact_date_contacts = get_last_record_date("contacts_raw", "modified_on")
# last_contact_date_flows = get_last_record_date("flow_runs", "created_at")
last_contact_date_contacts = get_last_record_date("contacts_raw", "modified_on")
last_contact_date_flows = get_last_record_date("flow_runs", "created_at")
last_contact_date_pageviews = get_last_record_date("page_views", "timestamp")
fields = rapidpro_client.get_fields().all()
log("Start")
print("Fetching page views")
pageviews = get_content_repo_page_views(last_contact_date_pageviews)
print("Fetching flows")
flows = get_flows()
print("Fetching flow runs and values")
flow_runs, flow_run_values = get_flow_runs(flows)
flow_runs, flow_run_values = get_flow_runs(flows, last_contact_date=last_contact_date_flows)
print("Done with flows")
log("Fetching groups...")
groups = get_groups()
log(f"Groups: {len(groups)}")
log("Fetching contacts and contact groups...")
contacts, group_contacts = get_contacts_and_contact_groups()
contacts, group_contacts = get_contacts_and_contact_groups(last_contact_date=last_contact_date_contacts)
log(f"Contacts: {len(contacts)}")
log(f"Group Contacts: {len(group_contacts)}")

Expand All @@ -206,6 +236,10 @@ def upload_to_bigquery(table, data, fields):
"data": flows,
"fields": FLOWS_FIELDS,
},
"page_views": {
"data": pageviews,
"fields": PAGEVIEW_FIELDS,
},
"flow_runs": {
"data": flow_runs,
"fields": FLOW_RUNS_FIELDS,
Expand Down

0 comments on commit 4916fa9

Please sign in to comment.